You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
528 lines
18 KiB
528 lines
18 KiB
#!/usr/bin/env python3 |
|
from __future__ import annotations |
|
|
|
import argparse |
|
import configparser |
|
import json |
|
import shutil |
|
import sys |
|
import tarfile |
|
import tempfile |
|
import urllib.error |
|
import urllib.parse |
|
import urllib.request |
|
from collections.abc import Callable |
|
from datetime import datetime |
|
from pathlib import Path |
|
from typing import Any |
|
|
|
DEFAULT_SERVER = "http://127.0.0.1:8080" |
|
DEFAULT_BOTTLES_DIR = Path.home() / ".var/app/com.usebottles.bottles/data/bottles/bottles" |
|
CONF_FILE = Path.home() / ".cellar.conf" |
|
CHUNK_SIZE = 1024 * 1024 |
|
ProgressCallback = Callable[[str, float | None], None] |
|
|
|
|
|
def load_cellar_conf() -> dict[str, str]:
    """Load client settings from the ``[cellar]`` section of ``CONF_FILE``.

    If the file does not exist yet, it is created and populated with the
    built-in defaults. Options (or the whole section) missing from an
    existing file fall back to those same defaults.

    Returns a dict with keys 'server' and 'bottles_dir'; both values have
    surrounding whitespace stripped.
    """
    default_bottles_dir = str(DEFAULT_BOTTLES_DIR)
    parser = configparser.ConfigParser()

    if CONF_FILE.exists():
        parser.read(CONF_FILE)
    else:
        # First run: persist the defaults so the user has a file to edit.
        parser["cellar"] = {
            "server": DEFAULT_SERVER,
            "bottles_dir": default_bottles_dir,
        }
        with CONF_FILE.open("w") as handle:
            parser.write(handle)

    server = parser.get("cellar", "server", fallback=DEFAULT_SERVER)
    bottles_dir = parser.get("cellar", "bottles_dir", fallback=default_bottles_dir)
    return {"server": server.strip(), "bottles_dir": bottles_dir.strip()}
|
|
|
|
|
def request_json(url: str, method: str = "GET", data: bytes | None = None, headers: dict[str, str] | None = None):
    """Send a request to *url* and return the response body parsed as JSON.

    Args:
        url: Address to open with ``urllib.request.urlopen``.
        method: Request method, ``"GET"`` by default.
        data: Optional request body.
        headers: Optional extra request headers.

    Returns:
        The response body decoded as UTF-8 and parsed with ``json.loads``.
    """
    request = urllib.request.Request(url, data=data, headers=headers or {}, method=method)
    with urllib.request.urlopen(request) as response:
        raw_body = response.read()
    return json.loads(raw_body.decode("utf-8"))
|
|
|
|
|
def get_archives(server: str) -> list[dict[str, Any]]:
    """Fetch the archive listing from ``{server}/archives``."""
    listing_url = f"{server}/archives"
    return request_json(listing_url)
|
|
|
|
|
def get_archive(server: str, archive_id: int) -> dict[str, Any]:
    """Fetch the metadata of one archive from ``{server}/archives/{archive_id}``."""
    archive_url = f"{server}/archives/{archive_id}"
    return request_json(archive_url)
|
|
|
|
|
def notify_progress(progress_callback: ProgressCallback | None, message: str, fraction: float | None = None) -> None:
    """Forward a progress update to *progress_callback*, if one was supplied.

    Args:
        progress_callback: Hook to invoke, or None to do nothing.
        message: Human-readable status text.
        fraction: Completion ratio, or None when it is unknown.
    """
    if progress_callback is None:
        return
    progress_callback(message, fraction)
|
|
|
|
|
def print_archives(server: str) -> int:
    """Print a table of the archives reported by *server*.

    Prints "No archives found." when the listing is empty. Long names,
    bottle names and runners are truncated to their column width; missing
    or empty optional fields are shown as "-".

    Returns:
        Always 0.
    """
    rows = get_archives(server)
    if not rows:
        print("No archives found.")
        return 0

    header = f"{'ID':<4} {'Name':<30} {'Bottle':<30} {'Arch':<8} {'Runner':<18} {'Size(MB)':>10}"
    print(header)
    print("-" * 110)

    bytes_per_mb = 1024 * 1024
    for row in rows:
        megabytes = row["size_bytes"] / bytes_per_mb
        archive_id = row["id"]
        name = (row["name"] or "-")[:30]
        bottle = (row.get("bottle_name") or "-")[:30]
        arch = row.get("arch") or "-"
        runner = (row.get("runner") or "-")[:18]
        print(f"{archive_id:<4} {name:<30} {bottle:<30} {arch:<8} {runner:<18} {megabytes:>10.2f}")
    return 0
|
|
|
|
|
def upload_archive_with_result(
    server: str,
    file_path: Path,
    fields: dict[str, str],
    progress_callback: ProgressCallback | None = None,
) -> dict[str, Any]:
    """Upload *file_path* to ``{server}/archives`` as multipart/form-data.

    Args:
        server: Base server URL (without trailing slash).
        file_path: Archive file to send as the ``file`` part.
        fields: Extra form fields; entries with an empty value are omitted.
        progress_callback: Optional hook receiving coarse progress updates.

    Returns:
        The decoded JSON response of the server. Its ``name`` key is read
        for the final progress message.

    Note:
        The whole file is read into memory to build the request body.
    """
    import uuid  # local import: only this function needs it

    def quote_param(text: str) -> str:
        # Escape the characters that would terminate or split a quoted
        # Content-Disposition parameter (same scheme browsers use).
        return text.replace('"', "%22").replace("\r", "%0D").replace("\n", "%0A")

    # A per-request random boundary makes an accidental collision with the
    # binary payload practically impossible, unlike a fixed constant.
    boundary = f"----BottleArchiveBoundary{uuid.uuid4().hex}"
    delimiter = f"--{boundary}\r\n".encode()
    parts: list[bytes] = []

    notify_progress(progress_callback, "Preparing upload…", 0.0)

    for key, value in fields.items():
        if not value:
            continue
        parts += [
            delimiter,
            f'Content-Disposition: form-data; name="{quote_param(key)}"\r\n\r\n'.encode(),
            value.encode(),
            b"\r\n",
        ]

    filename = file_path.name
    mime = "application/gzip" if filename.endswith((".tar.gz", ".tgz", ".gz")) else "application/octet-stream"
    notify_progress(progress_callback, f"Reading archive {filename}…", 0.25)
    parts += [
        delimiter,
        f'Content-Disposition: form-data; name="file"; filename="{quote_param(filename)}"\r\n'.encode(),
        f"Content-Type: {mime}\r\n\r\n".encode(),
        file_path.read_bytes(),
        b"\r\n",
        f"--{boundary}--\r\n".encode(),
    ]

    payload = b"".join(parts)
    notify_progress(progress_callback, "Uploading archive…", 0.7)
    archive = request_json(
        f"{server}/archives",
        method="POST",
        data=payload,
        headers={"Content-Type": f"multipart/form-data; boundary={boundary}"},
    )
    notify_progress(progress_callback, f"Upload completed: {archive['name']}", 1.0)
    return archive
|
|
|
|
|
def upload_archive(
    server: str,
    file_path: Path,
    fields: dict[str, str],
    progress_callback: ProgressCallback | None = None,
) -> int:
    """Upload an archive and print a one-line confirmation.

    Thin wrapper around ``upload_archive_with_result`` for command-line use.

    Returns:
        Always 0; failures surface as exceptions from the upload itself.
    """
    result = upload_archive_with_result(server, file_path, fields, progress_callback=progress_callback)
    archive_id = result["id"]
    archive_name = result["name"]
    print(f"Uploaded archive #{archive_id}: {archive_name}")
    return 0
|
|
|
|
|
def download_archive(
    server: str,
    archive_id: int,
    output: Path,
    progress_callback: ProgressCallback | None = None,
) -> int:
    """Stream ``{server}/archives/{archive_id}/download`` to disk.

    Args:
        server: Base server URL (without trailing slash).
        archive_id: Identifier of the archive to fetch.
        output: Destination file, or an existing directory in which the
            server-suggested filename (or ``archive-<id>.bin``) is created.
        progress_callback: Optional hook receiving progress updates. When
            it is None, a confirmation line is printed instead.

    Returns:
        Always 0; failures surface as exceptions.
    """
    url = f"{server}/archives/{archive_id}/download"
    request = urllib.request.Request(url, method="GET")
    notify_progress(progress_callback, "Preparing download…", 0.0)

    with urllib.request.urlopen(request) as response:
        if output.is_dir():
            # The suggested filename is server-controlled: keep only its last
            # path component so it cannot point outside the output directory.
            suggested = Path(response.headers.get_filename() or "").name
            if suggested in {"", ".", ".."}:
                suggested = f"archive-{archive_id}.bin"
            destination = output / suggested
        else:
            destination = output

        total_bytes = response.headers.get("Content-Length")
        total = int(total_bytes) if total_bytes and total_bytes.isdigit() else None
        received = 0

        with destination.open("wb") as buffer:
            try:
                while chunk := response.read(CHUNK_SIZE):
                    buffer.write(chunk)
                    received += len(chunk)
                    # Clamp in case the server sends more than it announced.
                    fraction = min(received / total, 1.0) if total else None
                    notify_progress(
                        progress_callback,
                        f"Downloading archive… {received / (1024 * 1024):.1f} MB",
                        fraction,
                    )
            except BaseException:
                # Do not leave a truncated archive behind on a failed transfer.
                buffer.close()
                try:
                    destination.unlink(missing_ok=True)
                except OSError:
                    pass
                raise

    notify_progress(progress_callback, f"Download completed: {destination}", 1.0)
    if progress_callback is None:
        print(f"Downloaded to {destination}")
    return 0
|
|
|
|
|
def find_remote_archive(server: str, bottle_ref: str) -> dict[str, Any]:
    """Look up a remote archive by bottle name or, failing that, archive name.

    The comparison is case-insensitive. Every archive is checked against
    ``bottle_name`` first; only if none matches is ``name`` consulted.

    Raises:
        FileNotFoundError: If no archive matches *bottle_ref*.
    """
    archives = get_archives(server)
    wanted = bottle_ref.lower()

    for field in ("bottle_name", "name"):
        found = next(
            (entry for entry in archives if (entry.get(field) or "").lower() == wanted),
            None,
        )
        if found is not None:
            return found

    raise FileNotFoundError(f"No remote archive found for '{bottle_ref}'")
|
|
|
|
|
def safe_extract_tar(archive_path: Path, target_dir: Path) -> None: |
|
with tarfile.open(archive_path, "r:gz") as tar: |
|
for member in tar.getmembers(): |
|
member_path = (target_dir / member.name).resolve() |
|
if not str(member_path).startswith(str(target_dir.resolve())): |
|
raise ValueError("Unsafe archive path detected.") |
|
tar.extractall(target_dir, filter="fully_trusted") |
|
|
|
|
|
def install_archive_from_metadata(
    server: str,
    archive: dict[str, Any],
    bottles_dir: Path,
    replace: bool,
    progress_callback: ProgressCallback | None = None,
    bottle_ref: str | None = None,
) -> int:
    """Download an archive and install it as a bottle under *bottles_dir*.

    The bottle directory name is taken from the archive's ``bottle_name``,
    then ``name``, then *bottle_ref*, then ``"unknown-bottle"``.

    Args:
        server: Base server URL (without trailing slash).
        archive: Archive metadata; ``id`` is required.
        bottles_dir: Local directory that holds the bottles.
        replace: Whether an existing bottle of the same name may be replaced.
        progress_callback: Optional hook receiving progress updates. When
            it is None, a confirmation line is printed instead.
        bottle_ref: Name the user asked for, used as a fallback bottle name.

    Returns:
        0 on success, 1 if the bottle exists and *replace* is false.

    Raises:
        ValueError: If the bottle name is not a single path component.
        FileNotFoundError: If the archive holds no directory, or none of its
            top-level directories contains a ``bottle.yml``.
    """
    bottle_name = archive.get("bottle_name") or archive.get("name") or bottle_ref or "unknown-bottle"
    # The name normally comes from server metadata. It must be one plain path
    # component, otherwise target_dir (which may be deleted below) could
    # point outside bottles_dir.
    if bottle_name in {".", ".."} or Path(bottle_name).name != bottle_name:
        raise ValueError(f"Unsafe bottle name: {bottle_name!r}")
    target_dir = bottles_dir / bottle_name

    if target_dir.exists() and not replace:
        notify_progress(progress_callback, f"Bottle already exists: {target_dir}", None)
        print(
            f"Bottle already exists: {target_dir}\n"
            "Use --replace to overwrite it.",
            file=sys.stderr,
        )
        return 1

    bottles_dir.mkdir(parents=True, exist_ok=True)

    with tempfile.TemporaryDirectory(prefix="bottle-install-") as tmp_dir:
        tmp_path = Path(tmp_dir)
        download_path = tmp_path / f"archive-{archive['id']}.tar.gz"
        notify_progress(progress_callback, "Downloading archive…", 0.0)
        download_archive(server, int(archive["id"]), download_path, progress_callback=progress_callback)

        extract_dir = tmp_path / "extract"
        extract_dir.mkdir(parents=True, exist_ok=True)
        notify_progress(progress_callback, "Extracting archive…", None)
        safe_extract_tar(download_path, extract_dir)

        # Sorted so the choice is deterministic when several directories exist.
        candidates = sorted(path for path in extract_dir.iterdir() if path.is_dir())
        if not candidates:
            raise FileNotFoundError("Archive did not contain a bottle directory.")

        source_dir = next((path for path in candidates if (path / "bottle.yml").exists()), None)
        if source_dir is None:
            raise FileNotFoundError("Archive does not look like a valid bottle backup.")

        # Only now, with a validated replacement in hand, remove the old
        # bottle, so a failed download or a bad archive cannot destroy it.
        if target_dir.exists():
            notify_progress(progress_callback, f"Removing existing bottle: {target_dir}", None)
            shutil.rmtree(target_dir)

        notify_progress(progress_callback, f"Installing into {target_dir}…", None)
        shutil.move(str(source_dir), str(target_dir))

    notify_progress(progress_callback, f"Installed bottle '{bottle_name}' to {target_dir}", 1.0)
    if progress_callback is None:
        print(f"Installed bottle '{bottle_name}' to {target_dir}")
    return 0
|
|
|
|
|
def install_archive(
    server: str,
    bottle_ref: str,
    bottles_dir: Path,
    replace: bool,
    progress_callback: ProgressCallback | None = None,
) -> int:
    """Resolve *bottle_ref* to a remote archive and install it locally.

    Combines ``find_remote_archive`` and ``install_archive_from_metadata``;
    the return value is whatever the latter returns.
    """
    metadata = find_remote_archive(server, bottle_ref)
    exit_code = install_archive_from_metadata(
        server,
        metadata,
        bottles_dir,
        replace,
        progress_callback=progress_callback,
        bottle_ref=bottle_ref,
    )
    return exit_code
|
|
|
|
|
def parse_bottle_yml(bottle_yml: Path) -> dict[str, str]:
    """Extract a handful of top-level scalar keys from a ``bottle.yml`` file.

    This is a line-based scan, not a YAML parser: lines that start with a
    space or contain no colon are skipped, and only the keys ``Name``,
    ``Arch``, ``Runner``, ``Environment`` and ``Windows`` are kept. Values
    are stripped of whitespace and of surrounding single quotes.

    Returns:
        Mapping of the recognised keys found in the file to their values.
    """
    wanted = frozenset({"Name", "Arch", "Runner", "Environment", "Windows"})
    text = bottle_yml.read_text(encoding="utf-8", errors="ignore")

    parsed: dict[str, str] = {}
    for raw_line in text.splitlines():
        if raw_line.startswith(" "):
            continue
        key, separator, remainder = raw_line.partition(":")
        if separator and key in wanted:
            parsed[key] = remainder.strip().strip("'")
    return parsed
|
|
|
|
|
def collect_local_bottles(bottles_dir: Path) -> list[dict[str, Any]]:
    """List the bottles found directly under *bottles_dir*.

    A subdirectory counts as a bottle when it contains a ``bottle.yml``.
    Results are ordered by directory path; fields absent from the YAML
    default to the directory name (for ``name``) or ``"-"``.

    Returns:
        One dict per bottle with the keys ``name``, ``directory``, ``path``,
        ``arch``, ``runner``, ``environment`` and ``windows``. An empty list
        is returned when *bottles_dir* does not exist.
    """
    if not bottles_dir.exists():
        return []

    found: list[dict[str, Any]] = []
    subdirectories = sorted(entry for entry in bottles_dir.iterdir() if entry.is_dir())
    for directory in subdirectories:
        config_file = directory / "bottle.yml"
        if config_file.exists():
            meta = parse_bottle_yml(config_file)
            found.append(
                dict(
                    name=meta.get("Name", directory.name),
                    directory=directory.name,
                    path=str(directory),
                    arch=meta.get("Arch", "-"),
                    runner=meta.get("Runner", "-"),
                    environment=meta.get("Environment", "-"),
                    windows=meta.get("Windows", "-"),
                )
            )
    return found
|
|
|
|
|
def scan_local_bottles(bottles_dir: Path, as_json: bool) -> int:
    """Print the local bottles as JSON or as a fixed-width table.

    Args:
        bottles_dir: Local directory that holds the bottles.
        as_json: Emit the raw list as indented JSON instead of a table.

    Returns:
        Always 0.
    """
    bottles = collect_local_bottles(bottles_dir)

    if as_json:
        print(json.dumps(bottles, indent=2))
    elif not bottles:
        print(f"No local bottles found in {bottles_dir}")
    else:
        print(f"Local Bottles directory: {bottles_dir}")
        print(f"{'Dir':<24} {'Name':<28} {'Arch':<8} {'Runner':<18} {'Env':<12} {'Windows':<10}")
        print("-" * 110)
        for entry in bottles:
            directory = entry["directory"][:24]
            name = entry["name"][:28]
            arch = entry["arch"]
            runner = entry["runner"][:18]
            environment = entry["environment"][:12]
            windows = entry["windows"][:10]
            print(f"{directory:<24} {name:<28} {arch:<8} {runner:<18} {environment:<12} {windows:<10}")
    return 0
|
|
|
|
|
def create_bottle_backup(bottle: dict[str, Any], output_dir: Path | None = None) -> Path:
    """Pack a bottle directory into a timestamped ``.tar.gz`` archive.

    Args:
        bottle: Bottle description; ``path`` (the bottle directory) and
            ``directory`` (used as the archive name prefix) are read.
        output_dir: Where to write the archive. It is created if missing.
            Defaults to the system temporary directory.

    Returns:
        Path of the archive that was written. The bottle directory is stored
        under its own name as the single top-level entry.

    Raises:
        OSError: If the bottle directory cannot be read or the archive
            cannot be written; no partial archive is left behind.
    """
    source_dir = Path(bottle["path"])
    target_dir = Path(output_dir) if output_dir else Path(tempfile.gettempdir())
    target_dir.mkdir(parents=True, exist_ok=True)

    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    archive_path = target_dir / f"{bottle['directory']}-{timestamp}.tar.gz"

    try:
        with tarfile.open(archive_path, "w:gz") as tar:
            tar.add(source_dir, arcname=source_dir.name)
    except BaseException:
        # Remove the half-written archive so callers never see a corrupt file.
        archive_path.unlink(missing_ok=True)
        raise

    return archive_path
|
|
|
|
|
def prompt_text(label: str, default: str) -> str:
    """Ask the user for a line of text, showing *default* in brackets.

    Returns:
        The stripped answer, or *default* when the answer is empty.
    """
    answer = input(f"{label} [{default}]: ").strip()
    if answer:
        return answer
    return default
|
|
|
|
|
def choose_bottle(bottles: list[dict[str, Any]]) -> dict[str, Any]:
    """Show a numbered menu of *bottles* and return the one the user picks.

    The prompt repeats until the user enters a number between 1 and
    ``len(bottles)``; non-numeric and out-of-range answers are reported
    and asked again.
    """
    print("Available local bottles:")
    for number, entry in enumerate(bottles, 1):
        print(
            f"  {number}. {entry['directory']} "
            f"(arch={entry['arch']}, runner={entry['runner']}, windows={entry['windows']})"
        )

    highest = len(bottles)
    while True:
        answer = input("Select a bottle number: ").strip()
        try:
            selected = int(answer)
        except ValueError:
            print("Please enter a valid number.")
        else:
            if 1 <= selected <= highest:
                return bottles[selected - 1]
            print("Choice out of range.")
|
|
|
|
|
def wizard_upload(server: str, bottles_dir: Path) -> int:
    """Interactively back up one local bottle and upload it to *server*.

    The user picks a bottle, confirms or edits the archive metadata, then a
    temporary backup archive is created and uploaded. The temporary archive
    is deleted afterwards whether or not the upload succeeded.

    Returns:
        0 when there is nothing to upload, otherwise the result of
        ``upload_archive``.
    """
    bottles = collect_local_bottles(bottles_dir)
    if not bottles:
        print(f"No local bottles found in {bottles_dir}")
        return 0

    chosen = choose_bottle(bottles)
    display_name = prompt_text("Archive name", chosen["name"])
    bottle_name = prompt_text("Bottle name", chosen["directory"])
    description = prompt_text("Description", f"Backup of {chosen['name']}")
    tags = prompt_text("Tags", "bottles,backup")

    print(f"\nCreating backup for {chosen['directory']}...")
    archive_path = create_bottle_backup(chosen)
    print(f"Backup created: {archive_path}")

    try:
        form_fields = {
            "name": display_name,
            "bottle_name": bottle_name,
            "description": description,
            "tags": tags,
            "arch": str(chosen.get("arch", "")),
            "runner": str(chosen.get("runner", "")),
            "windows_version": str(chosen.get("windows", "")),
        }
        return upload_archive(server, archive_path, form_fields)
    finally:
        # The backup only exists to be uploaded; never leave it behind.
        archive_path.unlink(missing_ok=True)
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser with all sub-commands.

    Sub-commands: ``list``, ``upload``, ``download``, ``install``,
    ``scan-local`` and ``wizard-upload``. A sub-command is required.
    """

    def add_bottles_dir_option(subparser: argparse.ArgumentParser) -> None:
        # Shared by every sub-command that works on the local Bottles folder.
        subparser.add_argument(
            "--bottles-dir",
            type=Path,
            default=DEFAULT_BOTTLES_DIR,
            help="Path to the local Bottles directory",
        )

    parser = argparse.ArgumentParser(description="Minimal client for Bottle Archive Server")
    parser.add_argument("--server", default=DEFAULT_SERVER, help="Base server URL")
    commands = parser.add_subparsers(dest="command", required=True)

    commands.add_parser("list", help="List uploaded bottle archives")

    upload = commands.add_parser("upload", help="Upload a bottle archive")
    upload.add_argument("file", type=Path, help="Path to the archive file")
    upload.add_argument("--name", required=True, help="Display name")
    optional_upload_flags = (
        ("--bottle-name", "Bottle name"),
        ("--description", "Description"),
        ("--tags", "Tags"),
        ("--arch", "Architecture"),
        ("--runner", "Runner"),
        ("--windows-version", "Windows version"),
    )
    for flag, help_text in optional_upload_flags:
        upload.add_argument(flag, help=help_text)

    download = commands.add_parser("download", help="Download an archive by id")
    download.add_argument("archive_id", type=int, help="Archive id")
    download.add_argument("output", type=Path, help="Output file or directory")

    install = commands.add_parser(
        "install",
        help="Install a bottle from the server by bottle name or archive name",
    )
    install.add_argument("bottle", help="Bottle name or archive name to install")
    add_bottles_dir_option(install)
    install.add_argument("--replace", action="store_true", help="Replace existing local bottle")

    scan = commands.add_parser("scan-local", help="List bottles installed on this computer")
    add_bottles_dir_option(scan)
    scan.add_argument("--json", action="store_true", help="Print results as JSON")

    wizard = commands.add_parser(
        "wizard-upload",
        help="Choose a local bottle, create a backup, and upload it to the server",
    )
    add_bottles_dir_option(wizard)

    return parser
|
|
|
|
|
def main() -> int:
    """Parse the command line, run the selected sub-command, return its status.

    Network, file and archive errors are reported on stderr as a single
    line instead of a traceback.

    Returns:
        The sub-command's exit code, or 1 when a handled error occurred.
    """
    parser = build_parser()
    args = parser.parse_args()
    server = args.server.rstrip("/")

    try:
        if args.command == "list":
            return print_archives(server)
        if args.command == "upload":
            return upload_archive(
                server,
                args.file,
                {
                    "name": args.name,
                    "bottle_name": args.bottle_name or "",
                    "description": args.description or "",
                    "tags": args.tags or "",
                    "arch": args.arch or "",
                    "runner": args.runner or "",
                    "windows_version": args.windows_version or "",
                },
            )
        if args.command == "download":
            return download_archive(server, args.archive_id, args.output)
        if args.command == "install":
            return install_archive(server, args.bottle, args.bottles_dir, args.replace)
        if args.command == "scan-local":
            return scan_local_bottles(args.bottles_dir, args.json)
        if args.command == "wizard-upload":
            return wizard_upload(server, args.bottles_dir)
        # parser.error() exits the process; the return only documents intent.
        parser.error("Unknown command")
        return 2
    except urllib.error.HTTPError as exc:
        # Must precede URLError: HTTPError is a subclass of it.
        detail = exc.read().decode("utf-8", errors="ignore")
        print(f"HTTP {exc.code}: {detail or exc.reason}", file=sys.stderr)
        return 1
    except urllib.error.URLError as exc:
        print(f"Connection error: {exc.reason}", file=sys.stderr)
        return 1
    except FileNotFoundError as exc:
        print(f"File error: {exc}", file=sys.stderr)
        return 1
    except (OSError, ValueError, tarfile.TarError) as exc:
        # Covers unsafe/corrupt archives, malformed JSON responses and other
        # I/O failures (permissions, dropped connections while streaming).
        print(f"Error: {exc}", file=sys.stderr)
        return 1
|
|
|
|
|
# Run the CLI when executed as a script and use main()'s result as exit status.
if __name__ == "__main__":
    sys.exit(main())
|
|
|