You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
412 lines
14 KiB
412 lines
14 KiB
#!/usr/bin/env python3 |
|
from __future__ import annotations |
|
|
|
import argparse |
|
import json |
|
import shutil |
|
import sys |
|
import tarfile |
|
import tempfile |
|
import urllib.error |
|
import urllib.parse |
|
import urllib.request |
|
from datetime import datetime |
|
from pathlib import Path |
|
from typing import Any |
|
|
|
# Base URL of the archive server; used as the default value of --server.
DEFAULT_SERVER = "http://127.0.0.1:8080"

# Default location of the local Bottles directory, under the user's home.
# NOTE(review): the ~/.var/app/<app-id> layout is presumably the Flatpak data
# directory of the Bottles app -- confirm for other install methods.
DEFAULT_BOTTLES_DIR = Path.home().joinpath(
    ".var", "app", "com.usebottles.bottles", "data", "bottles", "bottles"
)
|
|
|
|
|
def request_json(url: str, method: str = "GET", data: bytes | None = None, headers: dict[str, str] | None = None): |
|
req = urllib.request.Request(url, data=data, method=method) |
|
for key, value in (headers or {}).items(): |
|
req.add_header(key, value) |
|
with urllib.request.urlopen(req) as response: |
|
return json.loads(response.read().decode("utf-8")) |
|
|
|
|
|
def print_archives(server: str) -> int:
    """Fetch ``{server}/archives`` and print the entries as a fixed-width table.

    Args:
        server: Base server URL without a trailing slash.

    Returns:
        Always 0; request failures propagate as exceptions from ``request_json``.
    """
    archives = request_json(f"{server}/archives")
    if not archives:
        print("No archives found.")
        return 0

    header = f"{'ID':<4} {'Name':<30} {'Bottle':<30} {'Arch':<8} {'Runner':<18} {'Size(MB)':>10}"
    print(header)
    print("-" * 110)

    bytes_per_mb = 1024 * 1024
    for entry in archives:
        megabytes = entry["size_bytes"] / bytes_per_mb
        archive_id = entry["id"]
        # Missing or empty text columns are rendered as "-" and long values
        # are cut to the column width so the table stays aligned.
        name = (entry["name"] or "-")[:30]
        bottle = (entry.get("bottle_name") or "-")[:30]
        arch = entry.get("arch") or "-"
        runner = (entry.get("runner") or "-")[:18]
        print(f"{archive_id:<4} {name:<30} {bottle:<30} {arch:<8} {runner:<18} {megabytes:>10.2f}")
    return 0
|
|
|
|
|
def _escape_multipart_param(text: str) -> str:
    """Escape a form field name or filename for use inside a quoted header parameter."""
    # A raw double quote would end the quoted parameter early and a CR/LF would
    # end the header line; percent-encode them the way HTML form submission does.
    return text.replace('"', "%22").replace("\r", "%0D").replace("\n", "%0A")


def upload_archive(server: str, file_path: Path, fields: dict[str, str]) -> int:
    """Upload a file plus metadata to ``{server}/archives`` as multipart/form-data.

    Args:
        server: Base server URL without a trailing slash.
        file_path: Archive to upload; sent as the ``file`` form part.
        fields: Extra form fields. Entries with an empty value are omitted.

    Returns:
        Always 0; failures propagate as exceptions.

    Raises:
        OSError: If ``file_path`` cannot be read.
        urllib.error.URLError: Propagated from ``request_json``.
    """
    boundary = "----BottleArchiveBoundary7MA4YWxkTrZu0gW"
    delimiter = f"--{boundary}\r\n".encode()
    parts: list[bytes] = []

    for field_name, value in fields.items():
        if not value:
            continue
        safe_name = _escape_multipart_param(field_name)
        parts += [
            delimiter,
            f'Content-Disposition: form-data; name="{safe_name}"\r\n\r\n'.encode(),
            value.encode(),
            b"\r\n",
        ]

    filename = file_path.name
    mime = "application/gzip" if filename.endswith((".tar.gz", ".tgz", ".gz")) else "application/octet-stream"
    safe_filename = _escape_multipart_param(filename)
    # NOTE: the whole file is loaded into memory to build the request body.
    parts += [
        delimiter,
        f'Content-Disposition: form-data; name="file"; filename="{safe_filename}"\r\n'.encode(),
        f"Content-Type: {mime}\r\n\r\n".encode(),
        file_path.read_bytes(),
        b"\r\n",
        f"--{boundary}--\r\n".encode(),
    ]

    archive = request_json(
        f"{server}/archives",
        method="POST",
        data=b"".join(parts),
        headers={"Content-Type": f"multipart/form-data; boundary={boundary}"},
    )
    print(f"Uploaded archive #{archive['id']}: {archive['name']}")
    return 0
|
|
|
|
|
def download_archive(server: str, archive_id: int, output: Path) -> int:
    """Download ``{server}/archives/{archive_id}/download`` to disk.

    Args:
        server: Base server URL without a trailing slash.
        archive_id: Identifier of the archive to fetch.
        output: Destination file, or an existing directory. For a directory
            the file name comes from the response headers, falling back to
            ``archive-<id>.bin``.

    Returns:
        Always 0; failures propagate as exceptions.

    Raises:
        urllib.error.URLError: If the request fails.
        OSError: If the destination cannot be written.
    """
    url = f"{server}/archives/{archive_id}/download"
    req = urllib.request.Request(url, method="GET")
    with urllib.request.urlopen(req) as response:
        if output.is_dir():
            suggested = response.headers.get_filename() or ""
            # The name is chosen by the server: keep only its last path
            # component so it cannot point outside the output directory.
            filename = Path(suggested.replace("\\", "/")).name
            if filename in ("", ".", ".."):
                filename = f"archive-{archive_id}.bin"
            destination = output / filename
        else:
            destination = output

        # Stream to disk instead of holding the whole archive in memory.
        sink = destination.open("wb")
        try:
            with sink:
                shutil.copyfileobj(response, sink)
        except BaseException:
            # Do not leave a truncated download behind.
            destination.unlink(missing_ok=True)
            raise

    print(f"Downloaded to {destination}")
    return 0
|
|
|
|
|
def find_remote_archive(server: str, bottle_ref: str) -> dict[str, Any]:
    """Look up a remote archive by bottle name, then by archive name.

    The comparison is case-insensitive. Every entry is checked against
    ``bottle_name`` first; ``name`` is only considered if none matched.

    Args:
        server: Base server URL without a trailing slash.
        bottle_ref: Bottle name or archive name to search for.

    Returns:
        The first matching entry from ``{server}/archives``.

    Raises:
        FileNotFoundError: If no entry matches ``bottle_ref``.
    """
    archives = request_json(f"{server}/archives")
    wanted = bottle_ref.lower()

    for field in ("bottle_name", "name"):
        match = next(
            (entry for entry in archives if (entry.get(field) or "").lower() == wanted),
            None,
        )
        if match is not None:
            return match

    raise FileNotFoundError(f"No remote archive found for '{bottle_ref}'")
|
|
|
|
|
def safe_extract_tar(archive_path: Path, target_dir: Path) -> None: |
|
with tarfile.open(archive_path, "r:gz") as tar: |
|
for member in tar.getmembers(): |
|
member_path = (target_dir / member.name).resolve() |
|
if not str(member_path).startswith(str(target_dir.resolve())): |
|
raise ValueError("Unsafe archive path detected.") |
|
tar.extractall(target_dir, filter="fully_trusted") |
|
|
|
|
|
def install_archive(server: str, bottle_ref: str, bottles_dir: Path, replace: bool) -> int:
    """Download a remote archive and install it as a local bottle directory.

    The archive is downloaded and unpacked in a temporary directory and
    checked for a ``bottle.yml`` before anything under ``bottles_dir`` is
    touched, so a failed download or an invalid archive never destroys an
    existing bottle.

    Args:
        server: Base server URL without a trailing slash.
        bottle_ref: Bottle name or archive name to look up on the server.
        bottles_dir: Local directory that holds one sub-directory per bottle.
        replace: Overwrite an existing bottle of the same name.

    Returns:
        0 on success; 1 if the bottle name is unsafe or the bottle already
        exists and ``replace`` is false.

    Raises:
        FileNotFoundError: If no matching archive exists or the archive does
            not contain a bottle directory with a ``bottle.yml``.
    """
    archive = find_remote_archive(server, bottle_ref)
    bottle_name = archive.get("bottle_name") or archive.get("name") or bottle_ref

    # The name may come from the server and becomes a directory that can be
    # deleted below, so it must be exactly one plain path component.
    if not bottle_name or bottle_name in (".", "..") or Path(bottle_name).name != bottle_name:
        print(f"Refusing to install bottle with unsafe name: {bottle_name!r}", file=sys.stderr)
        return 1

    target_dir = bottles_dir / bottle_name
    if target_dir.exists() and not replace:
        print(
            f"Bottle already exists: {target_dir}\n"
            "Use --replace to overwrite it.",
            file=sys.stderr,
        )
        return 1

    bottles_dir.mkdir(parents=True, exist_ok=True)

    with tempfile.TemporaryDirectory(prefix="bottle-install-") as tmp_dir:
        tmp_path = Path(tmp_dir)
        download_path = tmp_path / f"archive-{archive['id']}.tar.gz"
        download_archive(server, int(archive["id"]), download_path)

        extract_dir = tmp_path / "extract"
        extract_dir.mkdir(parents=True, exist_ok=True)
        safe_extract_tar(download_path, extract_dir)

        # Sorted so the choice does not depend on directory listing order.
        candidates = sorted(path for path in extract_dir.iterdir() if path.is_dir())
        if not candidates:
            raise FileNotFoundError("Archive did not contain a bottle directory.")

        source_dir = next(
            (path for path in candidates if (path / "bottle.yml").exists()),
            None,
        )
        if source_dir is None:
            raise FileNotFoundError("Archive does not look like a valid bottle backup.")

        # Remove the old bottle only now that its replacement is validated.
        if replace and target_dir.exists():
            shutil.rmtree(target_dir)
        shutil.move(str(source_dir), str(target_dir))

    print(f"Installed bottle '{bottle_name}' to {target_dir}")
    return 0
|
|
|
|
|
def _unquote_scalar(text: str) -> str:
    """Remove the surrounding quotes from a simple quoted value."""
    if len(text) >= 2 and text[0] == '"' and text[-1] == '"':
        return text[1:-1]
    return text.strip("'")


def parse_bottle_yml(bottle_yml: Path) -> dict[str, str]:
    """Read selected top-level keys from a ``bottle.yml`` without a YAML parser.

    Only unindented ``key: value`` lines are considered, and only the keys
    ``Name``, ``Arch``, ``Runner``, ``Environment`` and ``Windows`` are kept.
    Surrounding single or double quotes are removed from the value.

    Args:
        bottle_yml: Path to the file to read.

    Returns:
        Mapping from each wanted key found in the file to its string value.

    Raises:
        OSError: If the file cannot be read.
    """
    wanted_keys = {"Name", "Arch", "Runner", "Environment", "Windows"}
    data: dict[str, str] = {}

    for line in bottle_yml.read_text(encoding="utf-8", errors="ignore").splitlines():
        # Indented lines belong to nested sections and are skipped.
        if ":" not in line or line.startswith(" "):
            continue
        key, value = line.split(":", 1)
        if key in wanted_keys:
            data[key] = _unquote_scalar(value.strip())

    return data
|
|
|
|
|
def collect_local_bottles(bottles_dir: Path) -> list[dict[str, Any]]:
    """List the bottles found directly under ``bottles_dir``.

    A sub-directory counts as a bottle when it contains a ``bottle.yml``.
    Metadata comes from ``parse_bottle_yml``; missing values default to the
    directory name (for ``name``) or ``"-"``.

    Args:
        bottles_dir: Directory that holds one sub-directory per bottle.

    Returns:
        One dict per bottle, sorted by directory path, with the keys ``name``,
        ``directory``, ``path``, ``arch``, ``runner``, ``environment`` and
        ``windows``. Empty if ``bottles_dir`` is missing or not a directory.
    """
    # is_dir() rather than exists(): a regular file at this path would make
    # iterdir() raise NotADirectoryError.
    if not bottles_dir.is_dir():
        return []

    bottles: list[dict[str, Any]] = []
    for directory in sorted(path for path in bottles_dir.iterdir() if path.is_dir()):
        bottle_yml = directory / "bottle.yml"
        if not bottle_yml.exists():
            continue

        metadata = parse_bottle_yml(bottle_yml)
        bottles.append(
            {
                "name": metadata.get("Name", directory.name),
                "directory": directory.name,
                "path": str(directory),
                "arch": metadata.get("Arch", "-"),
                "runner": metadata.get("Runner", "-"),
                "environment": metadata.get("Environment", "-"),
                "windows": metadata.get("Windows", "-"),
            }
        )

    return bottles
|
|
|
|
|
def scan_local_bottles(bottles_dir: Path, as_json: bool) -> int:
    """Print the local bottles as JSON or as a fixed-width table.

    Args:
        bottles_dir: Directory that holds one sub-directory per bottle.
        as_json: Print the list as indented JSON instead of a table.

    Returns:
        Always 0.
    """
    found = collect_local_bottles(bottles_dir)

    if as_json:
        print(json.dumps(found, indent=2))
    elif not found:
        print(f"No local bottles found in {bottles_dir}")
    else:
        print(f"Local Bottles directory: {bottles_dir}")
        print(f"{'Dir':<24} {'Name':<28} {'Arch':<8} {'Runner':<18} {'Env':<12} {'Windows':<10}")
        print("-" * 110)
        for entry in found:
            # Long values are cut to the column width so the table stays aligned.
            columns = (
                f"{entry['directory'][:24]:<24}",
                f"{entry['name'][:28]:<28}",
                f"{entry['arch']:<8}",
                f"{entry['runner'][:18]:<18}",
                f"{entry['environment'][:12]:<12}",
                f"{entry['windows'][:10]:<10}",
            )
            print(" ".join(columns))

    return 0
|
|
|
|
|
def create_bottle_backup(bottle: dict[str, Any], output_dir: Path | None = None) -> Path: |
|
source_dir = Path(bottle["path"]) |
|
target_dir = output_dir or Path(tempfile.gettempdir()) |
|
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") |
|
archive_name = f"{bottle['directory']}-{timestamp}.tar.gz" |
|
archive_path = target_dir / archive_name |
|
|
|
with tarfile.open(archive_path, "w:gz") as tar: |
|
tar.add(source_dir, arcname=source_dir.name) |
|
|
|
return archive_path |
|
|
|
|
|
def prompt_text(label: str, default: str) -> str:
    """Ask for a line of text, returning ``default`` when the answer is blank.

    Args:
        label: Text shown before the bracketed default.
        default: Value returned when only whitespace (or nothing) is entered.

    Returns:
        The entered text with surrounding whitespace removed, or ``default``.
    """
    answer = input(f"{label} [{default}]: ")
    answer = answer.strip()
    if answer:
        return answer
    return default
|
|
|
|
|
def choose_bottle(bottles: list[dict[str, Any]]) -> dict[str, Any]:
    """Print a numbered list of bottles and prompt until one is picked.

    Args:
        bottles: Bottle descriptions with ``directory``, ``arch``, ``runner``
            and ``windows`` keys.

    Returns:
        The entry of ``bottles`` matching the entered 1-based number.
    """
    print("Available local bottles:")
    for position, entry in enumerate(bottles, start=1):
        summary = f"(arch={entry['arch']}, runner={entry['runner']}, windows={entry['windows']})"
        print(f" {position}. {entry['directory']} " + summary)

    # Keep asking until the answer is a number inside the listed range.
    while True:
        answer = input("Select a bottle number: ").strip()
        try:
            number = int(answer)
        except ValueError:
            print("Please enter a valid number.")
            continue

        if number < 1 or number > len(bottles):
            print("Choice out of range.")
            continue

        return bottles[number - 1]
|
|
|
|
|
def wizard_upload(server: str, bottles_dir: Path) -> int:
    """Interactively pick a local bottle, back it up and upload the backup.

    The backup archive is deleted afterwards whether or not the upload
    succeeded.

    Args:
        server: Base server URL without a trailing slash.
        bottles_dir: Directory that holds one sub-directory per bottle.

    Returns:
        0 when there are no local bottles; otherwise the result of
        ``upload_archive``.
    """
    bottles = collect_local_bottles(bottles_dir)
    if not bottles:
        print(f"No local bottles found in {bottles_dir}")
        return 0

    selected = choose_bottle(bottles)
    display_name = prompt_text("Archive name", selected["name"])
    bottle_name = prompt_text("Bottle name", selected["directory"])
    description = prompt_text("Description", f"Backup of {selected['name']}")
    tags = prompt_text("Tags", "bottles,backup")

    form_fields = {
        "name": display_name,
        "bottle_name": bottle_name,
        "description": description,
        "tags": tags,
        "arch": str(selected.get("arch", "")),
        "runner": str(selected.get("runner", "")),
        "windows_version": str(selected.get("windows", "")),
    }

    print(f"\nCreating backup for {selected['directory']}...")
    backup_path = create_bottle_backup(selected)
    print(f"Backup created: {backup_path}")

    try:
        return upload_archive(server, backup_path, form_fields)
    finally:
        # The backup is only a temporary upload artifact.
        backup_path.unlink(missing_ok=True)
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser with one sub-command per client action.

    Returns:
        Parser with a global ``--server`` option and the sub-commands
        ``list``, ``upload``, ``download``, ``install``, ``scan-local`` and
        ``wizard-upload``; the chosen one is stored in ``command``.
    """

    def add_bottles_dir_option(command_parser: argparse.ArgumentParser) -> None:
        # Shared by every sub-command that works on the local Bottles directory.
        command_parser.add_argument(
            "--bottles-dir",
            type=Path,
            default=DEFAULT_BOTTLES_DIR,
            help="Path to the local Bottles directory",
        )

    parser = argparse.ArgumentParser(description="Minimal client for Bottle Archive Server")
    parser.add_argument("--server", default=DEFAULT_SERVER, help="Base server URL")
    commands = parser.add_subparsers(dest="command", required=True)

    commands.add_parser("list", help="List uploaded bottle archives")

    upload = commands.add_parser("upload", help="Upload a bottle archive")
    upload.add_argument("file", type=Path, help="Path to the archive file")
    upload.add_argument("--name", required=True, help="Display name")
    optional_metadata = (
        ("--bottle-name", "Bottle name"),
        ("--description", "Description"),
        ("--tags", "Tags"),
        ("--arch", "Architecture"),
        ("--runner", "Runner"),
        ("--windows-version", "Windows version"),
    )
    for flag, help_text in optional_metadata:
        upload.add_argument(flag, help=help_text)

    download = commands.add_parser("download", help="Download an archive by id")
    download.add_argument("archive_id", type=int, help="Archive id")
    download.add_argument("output", type=Path, help="Output file or directory")

    install = commands.add_parser(
        "install",
        help="Install a bottle from the server by bottle name or archive name",
    )
    install.add_argument("bottle", help="Bottle name or archive name to install")
    add_bottles_dir_option(install)
    install.add_argument("--replace", action="store_true", help="Replace existing local bottle")

    scan = commands.add_parser("scan-local", help="List bottles installed on this computer")
    add_bottles_dir_option(scan)
    scan.add_argument("--json", action="store_true", help="Print results as JSON")

    wizard = commands.add_parser(
        "wizard-upload",
        help="Choose a local bottle, create a backup, and upload it to the server",
    )
    add_bottles_dir_option(wizard)

    return parser
|
|
|
|
|
def main() -> int:
    """Parse the command line, run the selected command and map errors to exit codes.

    Expected failures are reported on stderr instead of as tracebacks.

    Returns:
        The command's own return value on success, 1 on a reported error,
        130 when interrupted with Ctrl-C.
    """
    parser = build_parser()
    args = parser.parse_args()
    # Strip trailing slashes so URL building never produces "//".
    server = args.server.rstrip("/")

    try:
        if args.command == "list":
            return print_archives(server)
        if args.command == "upload":
            return upload_archive(
                server,
                args.file,
                {
                    "name": args.name,
                    "bottle_name": args.bottle_name or "",
                    "description": args.description or "",
                    "tags": args.tags or "",
                    "arch": args.arch or "",
                    "runner": args.runner or "",
                    "windows_version": args.windows_version or "",
                },
            )
        if args.command == "download":
            return download_archive(server, args.archive_id, args.output)
        if args.command == "install":
            return install_archive(server, args.bottle, args.bottles_dir, args.replace)
        if args.command == "scan-local":
            return scan_local_bottles(args.bottles_dir, args.json)
        if args.command == "wizard-upload":
            return wizard_upload(server, args.bottles_dir)
        # parser.error() exits the process; the return is a defensive fallback.
        parser.error("Unknown command")
        return 2
    except urllib.error.HTTPError as exc:
        # Must precede URLError, of which HTTPError is a subclass.
        detail = exc.read().decode("utf-8", errors="ignore")
        print(f"HTTP {exc.code}: {detail or exc.reason}", file=sys.stderr)
        return 1
    except urllib.error.URLError as exc:
        print(f"Connection error: {exc.reason}", file=sys.stderr)
        return 1
    except json.JSONDecodeError as exc:
        # Must precede ValueError, of which JSONDecodeError is a subclass.
        print(f"Invalid JSON response from server: {exc}", file=sys.stderr)
        return 1
    except tarfile.TarError as exc:
        print(f"Archive error: {exc}", file=sys.stderr)
        return 1
    except ValueError as exc:
        print(f"Error: {exc}", file=sys.stderr)
        return 1
    except FileNotFoundError as exc:
        print(f"File error: {exc}", file=sys.stderr)
        return 1
    except OSError as exc:
        print(f"I/O error: {exc}", file=sys.stderr)
        return 1
    except EOFError:
        # Raised by input() when stdin is closed during an interactive prompt.
        print("\nInput ended unexpectedly.", file=sys.stderr)
        return 1
    except KeyboardInterrupt:
        print("\nAborted.", file=sys.stderr)
        return 130
|
|
|
|
|
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|
|
|