#!/usr/bin/env python3 from __future__ import annotations import argparse import configparser import json import shutil import sys import tarfile import tempfile import urllib.error import urllib.parse import urllib.request from collections.abc import Callable from datetime import datetime from pathlib import Path from typing import Any DEFAULT_SERVER = "http://127.0.0.1:8080" DEFAULT_BOTTLES_DIR = Path.home() / ".var/app/com.usebottles.bottles/data/bottles/bottles" CONF_FILE = Path.home() / ".cellar.conf" CHUNK_SIZE = 1024 * 1024 ProgressCallback = Callable[[str, float | None], None] def load_cellar_conf() -> dict[str, str]: """Read ~/.cellar.conf [cellar], creating it with defaults if absent. Returns a dict with keys 'server' and 'bottles_dir'. """ cfg = configparser.ConfigParser() if not CONF_FILE.exists(): cfg["cellar"] = { "server": DEFAULT_SERVER, "bottles_dir": str(DEFAULT_BOTTLES_DIR), } with CONF_FILE.open("w") as fh: cfg.write(fh) else: cfg.read(CONF_FILE) section = cfg["cellar"] if "cellar" in cfg else {} return { "server": section.get("server", DEFAULT_SERVER).strip(), "bottles_dir": section.get("bottles_dir", str(DEFAULT_BOTTLES_DIR)).strip(), } def request_json(url: str, method: str = "GET", data: bytes | None = None, headers: dict[str, str] | None = None): req = urllib.request.Request(url, data=data, method=method) for key, value in (headers or {}).items(): req.add_header(key, value) with urllib.request.urlopen(req) as response: return json.loads(response.read().decode("utf-8")) def get_archives(server: str) -> list[dict[str, Any]]: return request_json(f"{server}/archives") def get_archive(server: str, archive_id: int) -> dict[str, Any]: return request_json(f"{server}/archives/{archive_id}") def notify_progress(progress_callback: ProgressCallback | None, message: str, fraction: float | None = None) -> None: if progress_callback is not None: progress_callback(message, fraction) def print_archives(server: str) -> int: archives = get_archives(server) if not archives: print("No archives found.") return 0 print(f"{'ID':<4} {'Name':<30} {'Bottle':<30} {'Arch':<8} {'Runner':<18} {'Size(MB)':>10}") print("-" * 110) for item in archives: size_mb = item["size_bytes"] / (1024 * 1024) print( f"{item['id']:<4} " f"{(item['name'] or '-')[:30]:<30} " f"{(item.get('bottle_name') or '-')[:30]:<30} " f"{(item.get('arch') or '-'):<8} " f"{(item.get('runner') or '-')[:18]:<18} " f"{size_mb:>10.2f}" ) return 0 def upload_archive_with_result( server: str, file_path: Path, fields: dict[str, str], progress_callback: ProgressCallback | None = None, ) -> dict[str, Any]: boundary = "----BottleArchiveBoundary7MA4YWxkTrZu0gW" data = [] notify_progress(progress_callback, "Preparing upload…", 0.0) def add_field(field_name: str, value: str): data.extend( [ f"--{boundary}\r\n".encode(), f'Content-Disposition: form-data; name="{field_name}"\r\n\r\n'.encode(), value.encode(), b"\r\n", ] ) for key, value in fields.items(): if value: add_field(key, value) filename = file_path.name mime = "application/gzip" if filename.endswith((".tar.gz", ".tgz", ".gz")) else "application/octet-stream" notify_progress(progress_callback, f"Reading archive {filename}…", 0.25) data.extend( [ f"--{boundary}\r\n".encode(), f'Content-Disposition: form-data; name="file"; filename="{filename}"\r\n'.encode(), f"Content-Type: {mime}\r\n\r\n".encode(), file_path.read_bytes(), b"\r\n", f"--{boundary}--\r\n".encode(), ] ) payload = b"".join(data) notify_progress(progress_callback, "Uploading archive…", 0.7) archive = request_json( f"{server}/archives", method="POST", data=payload, headers={"Content-Type": f"multipart/form-data; boundary={boundary}"}, ) notify_progress(progress_callback, f"Upload completed: {archive['name']}", 1.0) return archive def upload_archive( server: str, file_path: Path, fields: dict[str, str], progress_callback: ProgressCallback | None = None, ) -> int: archive = upload_archive_with_result( server, file_path, fields, progress_callback=progress_callback, ) print(f"Uploaded archive #{archive['id']}: {archive['name']}") return 0 def download_archive( server: str, archive_id: int, output: Path, progress_callback: ProgressCallback | None = None, ) -> int: url = f"{server}/archives/{archive_id}/download" req = urllib.request.Request(url, method="GET") notify_progress(progress_callback, "Preparing download…", 0.0) with urllib.request.urlopen(req) as response: if output.is_dir(): filename = response.headers.get_filename() or f"archive-{archive_id}.bin" destination = output / filename else: destination = output total_bytes = response.headers.get("Content-Length") total = int(total_bytes) if total_bytes and total_bytes.isdigit() else None received = 0 with destination.open("wb") as buffer: while chunk := response.read(CHUNK_SIZE): buffer.write(chunk) received += len(chunk) fraction = (received / total) if total else None notify_progress(progress_callback, f"Downloading archive… {received / (1024 * 1024):.1f} MB", fraction) notify_progress(progress_callback, f"Download completed: {destination}", 1.0) if progress_callback is None: print(f"Downloaded to {destination}") return 0 def find_remote_archive(server: str, bottle_ref: str) -> dict[str, Any]: archives = get_archives(server) bottle_ref_lower = bottle_ref.lower() for item in archives: if (item.get("bottle_name") or "").lower() == bottle_ref_lower: return item for item in archives: if (item.get("name") or "").lower() == bottle_ref_lower: return item raise FileNotFoundError(f"No remote archive found for '{bottle_ref}'") def safe_extract_tar(archive_path: Path, target_dir: Path) -> None: with tarfile.open(archive_path, "r:gz") as tar: for member in tar.getmembers(): member_path = (target_dir / member.name).resolve() if not str(member_path).startswith(str(target_dir.resolve())): raise ValueError("Unsafe archive path detected.") tar.extractall(target_dir, filter="fully_trusted") def install_archive_from_metadata( server: str, archive: dict[str, Any], bottles_dir: Path, replace: bool, progress_callback: ProgressCallback | None = None, bottle_ref: str | None = None, ) -> int: bottle_name = archive.get("bottle_name") or archive.get("name") or bottle_ref or "unknown-bottle" target_dir = bottles_dir / bottle_name if target_dir.exists(): if not replace: notify_progress(progress_callback, f"Bottle already exists: {target_dir}", None) print( f"Bottle already exists: {target_dir}\n" "Use --replace to overwrite it.", file=sys.stderr, ) return 1 notify_progress(progress_callback, f"Removing existing bottle: {target_dir}", None) shutil.rmtree(target_dir) bottles_dir.mkdir(parents=True, exist_ok=True) with tempfile.TemporaryDirectory(prefix="bottle-install-") as tmp_dir: tmp_path = Path(tmp_dir) download_path = tmp_path / f"archive-{archive['id']}.tar.gz" notify_progress(progress_callback, "Downloading archive…", 0.0) download_archive(server, int(archive["id"]), download_path, progress_callback=progress_callback) extract_dir = tmp_path / "extract" extract_dir.mkdir(parents=True, exist_ok=True) notify_progress(progress_callback, "Extracting archive…", None) safe_extract_tar(download_path, extract_dir) candidates = [path for path in extract_dir.iterdir() if path.is_dir()] if not candidates: raise FileNotFoundError("Archive did not contain a bottle directory.") source_dir = candidates[0] bottle_yml = source_dir / "bottle.yml" if not bottle_yml.exists(): raise FileNotFoundError("Archive does not look like a valid bottle backup.") notify_progress(progress_callback, f"Installing into {target_dir}…", None) shutil.move(str(source_dir), str(target_dir)) notify_progress(progress_callback, f"Installed bottle '{bottle_name}' to {target_dir}", 1.0) if progress_callback is None: print(f"Installed bottle '{bottle_name}' to {target_dir}") return 0 def install_archive( server: str, bottle_ref: str, bottles_dir: Path, replace: bool, progress_callback: ProgressCallback | None = None, ) -> int: archive = find_remote_archive(server, bottle_ref) return install_archive_from_metadata( server, archive, bottles_dir, replace, progress_callback=progress_callback, bottle_ref=bottle_ref, ) def parse_bottle_yml(bottle_yml: Path) -> dict[str, str]: data: dict[str, str] = {} wanted_keys = {"Name", "Arch", "Runner", "Environment", "Windows"} for line in bottle_yml.read_text(encoding="utf-8", errors="ignore").splitlines(): if ":" not in line or line.startswith(" "): continue key, value = line.split(":", 1) if key in wanted_keys: data[key] = value.strip().strip("'") return data def collect_local_bottles(bottles_dir: Path) -> list[dict[str, Any]]: if not bottles_dir.exists(): return [] bottles: list[dict[str, Any]] = [] for directory in sorted(path for path in bottles_dir.iterdir() if path.is_dir()): bottle_yml = directory / "bottle.yml" if not bottle_yml.exists(): continue metadata = parse_bottle_yml(bottle_yml) bottles.append( { "name": metadata.get("Name", directory.name), "directory": directory.name, "path": str(directory), "arch": metadata.get("Arch", "-"), "runner": metadata.get("Runner", "-"), "environment": metadata.get("Environment", "-"), "windows": metadata.get("Windows", "-"), } ) return bottles def scan_local_bottles(bottles_dir: Path, as_json: bool) -> int: bottles = collect_local_bottles(bottles_dir) if as_json: print(json.dumps(bottles, indent=2)) return 0 if not bottles: print(f"No local bottles found in {bottles_dir}") return 0 print(f"Local Bottles directory: {bottles_dir}") print(f"{'Dir':<24} {'Name':<28} {'Arch':<8} {'Runner':<18} {'Env':<12} {'Windows':<10}") print("-" * 110) for bottle in bottles: print( f"{bottle['directory'][:24]:<24} " f"{bottle['name'][:28]:<28} " f"{bottle['arch']:<8} " f"{bottle['runner'][:18]:<18} " f"{bottle['environment'][:12]:<12} " f"{bottle['windows'][:10]:<10}" ) return 0 def create_bottle_backup(bottle: dict[str, Any], output_dir: Path | None = None) -> Path: source_dir = Path(bottle["path"]) target_dir = output_dir or Path(tempfile.gettempdir()) timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") archive_name = f"{bottle['directory']}-{timestamp}.tar.gz" archive_path = target_dir / archive_name with tarfile.open(archive_path, "w:gz") as tar: tar.add(source_dir, arcname=source_dir.name) return archive_path def prompt_text(label: str, default: str) -> str: value = input(f"{label} [{default}]: ").strip() return value or default def choose_bottle(bottles: list[dict[str, Any]]) -> dict[str, Any]: print("Available local bottles:") for index, bottle in enumerate(bottles, start=1): print( f" {index}. {bottle['directory']} " f"(arch={bottle['arch']}, runner={bottle['runner']}, windows={bottle['windows']})" ) while True: raw = input("Select a bottle number: ").strip() try: choice = int(raw) except ValueError: print("Please enter a valid number.") continue if 1 <= choice <= len(bottles): return bottles[choice - 1] print("Choice out of range.") def wizard_upload(server: str, bottles_dir: Path) -> int: bottles = collect_local_bottles(bottles_dir) if not bottles: print(f"No local bottles found in {bottles_dir}") return 0 bottle = choose_bottle(bottles) display_name = prompt_text("Archive name", bottle["name"]) bottle_name = prompt_text("Bottle name", bottle["directory"]) description = prompt_text("Description", f"Backup of {bottle['name']}") tags = prompt_text("Tags", "bottles,backup") print(f"\nCreating backup for {bottle['directory']}...") archive_path = create_bottle_backup(bottle) print(f"Backup created: {archive_path}") try: return upload_archive( server, archive_path, { "name": display_name, "bottle_name": bottle_name, "description": description, "tags": tags, "arch": str(bottle.get("arch", "")), "runner": str(bottle.get("runner", "")), "windows_version": str(bottle.get("windows", "")), }, ) finally: archive_path.unlink(missing_ok=True) def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="Minimal client for Bottle Archive Server") parser.add_argument("--server", default=DEFAULT_SERVER, help="Base server URL") subparsers = parser.add_subparsers(dest="command", required=True) subparsers.add_parser("list", help="List uploaded bottle archives") upload_parser = subparsers.add_parser("upload", help="Upload a bottle archive") upload_parser.add_argument("file", type=Path, help="Path to the archive file") upload_parser.add_argument("--name", required=True, help="Display name") upload_parser.add_argument("--bottle-name", help="Bottle name") upload_parser.add_argument("--description", help="Description") upload_parser.add_argument("--tags", help="Tags") upload_parser.add_argument("--arch", help="Architecture") upload_parser.add_argument("--runner", help="Runner") upload_parser.add_argument("--windows-version", help="Windows version") download_parser = subparsers.add_parser("download", help="Download an archive by id") download_parser.add_argument("archive_id", type=int, help="Archive id") download_parser.add_argument("output", type=Path, help="Output file or directory") install_parser = subparsers.add_parser( "install", help="Install a bottle from the server by bottle name or archive name", ) install_parser.add_argument("bottle", help="Bottle name or archive name to install") install_parser.add_argument( "--bottles-dir", type=Path, default=DEFAULT_BOTTLES_DIR, help="Path to the local Bottles directory", ) install_parser.add_argument("--replace", action="store_true", help="Replace existing local bottle") scan_parser = subparsers.add_parser("scan-local", help="List bottles installed on this computer") scan_parser.add_argument( "--bottles-dir", type=Path, default=DEFAULT_BOTTLES_DIR, help="Path to the local Bottles directory", ) scan_parser.add_argument("--json", action="store_true", help="Print results as JSON") wizard_parser = subparsers.add_parser( "wizard-upload", help="Choose a local bottle, create a backup, and upload it to the server", ) wizard_parser.add_argument( "--bottles-dir", type=Path, default=DEFAULT_BOTTLES_DIR, help="Path to the local Bottles directory", ) return parser def main() -> int: parser = build_parser() args = parser.parse_args() server = args.server.rstrip("/") try: if args.command == "list": return print_archives(server) if args.command == "upload": return upload_archive( server, args.file, { "name": args.name, "bottle_name": args.bottle_name or "", "description": args.description or "", "tags": args.tags or "", "arch": args.arch or "", "runner": args.runner or "", "windows_version": args.windows_version or "", }, ) if args.command == "download": return download_archive(server, args.archive_id, args.output) if args.command == "install": return install_archive(server, args.bottle, args.bottles_dir, args.replace) if args.command == "scan-local": return scan_local_bottles(args.bottles_dir, args.json) if args.command == "wizard-upload": return wizard_upload(server, args.bottles_dir) parser.error("Unknown command") return 2 except urllib.error.HTTPError as exc: detail = exc.read().decode("utf-8", errors="ignore") print(f"HTTP {exc.code}: {detail or exc.reason}", file=sys.stderr) return 1 except urllib.error.URLError as exc: print(f"Connection error: {exc.reason}", file=sys.stderr) return 1 except FileNotFoundError as exc: print(f"File error: {exc}", file=sys.stderr) return 1 if __name__ == "__main__": raise SystemExit(main())