You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

528 lines
18 KiB

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import configparser
import json
import shutil
import sys
import tarfile
import tempfile
import urllib.error
import urllib.parse
import urllib.request
from collections.abc import Callable
from datetime import datetime
from pathlib import Path
from typing import Any
# Base URL of the archive server; used as the default for --server and in the generated config file.
DEFAULT_SERVER = "http://127.0.0.1:8080"
# Default local bottles directory (presumably the Flatpak data path of the Bottles app; confirm).
DEFAULT_BOTTLES_DIR = Path.home() / ".var/app/com.usebottles.bottles/data/bottles/bottles"
# Per-user configuration file that load_cellar_conf() reads, creating it when absent.
CONF_FILE = Path.home() / ".cellar.conf"
# Number of bytes read per iteration while streaming a download (1 MiB).
CHUNK_SIZE = 1024 * 1024
# Progress hook signature: called with a status message and a fraction, or None when no fraction is known.
ProgressCallback = Callable[[str, float | None], None]
def load_cellar_conf() -> dict[str, str]:
    """Load the ``[cellar]`` settings from ``CONF_FILE``.

    When the file does not exist yet it is written with the default server
    URL and bottles directory. Options missing from the section fall back to
    those same defaults.

    Returns:
        A dict with the whitespace-stripped values under the keys
        ``'server'`` and ``'bottles_dir'``.
    """
    parser = configparser.ConfigParser()
    if CONF_FILE.exists():
        parser.read(CONF_FILE)
    else:
        # First run: persist the defaults so the user has a file to edit.
        parser["cellar"] = {
            "server": DEFAULT_SERVER,
            "bottles_dir": str(DEFAULT_BOTTLES_DIR),
        }
        with CONF_FILE.open("w") as handle:
            parser.write(handle)

    # An existing file may lack the section; treat that as "no overrides".
    settings = parser["cellar"] if "cellar" in parser else {}
    server = settings.get("server", DEFAULT_SERVER)
    bottles_dir = settings.get("bottles_dir", str(DEFAULT_BOTTLES_DIR))
    return {
        "server": server.strip(),
        "bottles_dir": bottles_dir.strip(),
    }
def request_json(url: str, method: str = "GET", data: bytes | None = None, headers: dict[str, str] | None = None):
    """Perform a request with urllib and return the JSON-decoded response body.

    Args:
        url: Full URL to request.
        method: HTTP method name.
        data: Optional request body.
        headers: Optional extra request headers.

    Returns:
        Whatever ``json.loads`` produces for the UTF-8 decoded body.
    """
    request = urllib.request.Request(url, data=data, method=method)
    if headers:
        for header_name, header_value in headers.items():
            request.add_header(header_name, header_value)
    with urllib.request.urlopen(request) as reply:
        body = reply.read()
    return json.loads(body.decode("utf-8"))
def get_archives(server: str) -> list[dict[str, Any]]:
    """Return the decoded JSON response of ``GET {server}/archives``."""
    endpoint = f"{server}/archives"
    return request_json(endpoint)
def get_archive(server: str, archive_id: int) -> dict[str, Any]:
    """Return the decoded JSON response of ``GET {server}/archives/{archive_id}``."""
    endpoint = f"{server}/archives/{archive_id}"
    return request_json(endpoint)
def notify_progress(progress_callback: ProgressCallback | None, message: str, fraction: float | None = None) -> None:
    """Forward ``message`` and ``fraction`` to the callback, if one was given.

    Does nothing when ``progress_callback`` is ``None``.
    """
    if progress_callback is None:
        return
    progress_callback(message, fraction)
def print_archives(server: str) -> int:
    """Print a fixed-width table of the archives reported by the server.

    Prints ``No archives found.`` when the list is empty. Missing or empty
    optional fields are shown as ``-``; name, bottle name and runner are
    truncated to their column width.

    Returns:
        Always 0.
    """
    archives = get_archives(server)
    if not archives:
        print("No archives found.")
        return 0

    header = f"{'ID':<4} {'Name':<30} {'Bottle':<30} {'Arch':<8} {'Runner':<18} {'Size(MB)':>10}"
    print(header)
    print("-" * 110)

    bytes_per_mb = 1024 * 1024
    for entry in archives:
        size_mb = entry["size_bytes"] / bytes_per_mb
        columns = (
            f"{entry['id']:<4}",
            f"{(entry['name'] or '-')[:30]:<30}",
            f"{(entry.get('bottle_name') or '-')[:30]:<30}",
            f"{(entry.get('arch') or '-'):<8}",
            f"{(entry.get('runner') or '-')[:18]:<18}",
            f"{size_mb:>10.2f}",
        )
        print(" ".join(columns))
    return 0
def upload_archive_with_result(
    server: str,
    file_path: Path,
    fields: dict[str, str],
    progress_callback: ProgressCallback | None = None,
) -> dict[str, Any]:
    """Upload a file plus form fields as multipart/form-data and return the server's reply.

    The whole file is read into memory and sent in a single POST to
    ``{server}/archives``. Fields with an empty value are left out of the
    request body.

    Args:
        server: Base server URL.
        file_path: File to send as the ``file`` part.
        fields: Extra text form fields.
        progress_callback: Optional hook told about each stage.

    Returns:
        The decoded JSON response; its ``name`` key is read for the final
        progress message.
    """
    boundary = "----BottleArchiveBoundary7MA4YWxkTrZu0gW"
    part_delimiter = f"--{boundary}\r\n".encode()

    notify_progress(progress_callback, "Preparing upload…", 0.0)

    parts: list[bytes] = []
    for field_name, value in fields.items():
        if not value:
            continue
        parts += [
            part_delimiter,
            f'Content-Disposition: form-data; name="{field_name}"\r\n\r\n'.encode(),
            value.encode(),
            b"\r\n",
        ]

    filename = file_path.name
    if filename.endswith((".tar.gz", ".tgz", ".gz")):
        mime = "application/gzip"
    else:
        mime = "application/octet-stream"

    notify_progress(progress_callback, f"Reading archive {filename}", 0.25)
    parts += [
        part_delimiter,
        f'Content-Disposition: form-data; name="file"; filename="{filename}"\r\n'.encode(),
        f"Content-Type: {mime}\r\n\r\n".encode(),
        file_path.read_bytes(),
        b"\r\n",
        f"--{boundary}--\r\n".encode(),
    ]
    payload = b"".join(parts)

    notify_progress(progress_callback, "Uploading archive…", 0.7)
    content_type = f"multipart/form-data; boundary={boundary}"
    archive = request_json(
        f"{server}/archives",
        method="POST",
        data=payload,
        headers={"Content-Type": content_type},
    )
    notify_progress(progress_callback, f"Upload completed: {archive['name']}", 1.0)
    return archive
def upload_archive(
    server: str,
    file_path: Path,
    fields: dict[str, str],
    progress_callback: ProgressCallback | None = None,
) -> int:
    """Upload an archive and print a one-line confirmation.

    Delegates the transfer to ``upload_archive_with_result`` and reports the
    ``id`` and ``name`` from its result.

    Returns:
        Always 0.
    """
    result = upload_archive_with_result(server, file_path, fields, progress_callback=progress_callback)
    archive_id, archive_name = result["id"], result["name"]
    print(f"Uploaded archive #{archive_id}: {archive_name}")
    return 0
def download_archive(
    server: str,
    archive_id: int,
    output: Path,
    progress_callback: ProgressCallback | None = None,
) -> int:
    """Stream ``{server}/archives/{archive_id}/download`` to disk.

    Args:
        server: Base server URL.
        archive_id: Id of the archive to fetch.
        output: Destination file, or an existing directory. For a directory
            the file name is taken from the response headers, falling back
            to ``archive-<id>.bin``.
        progress_callback: Optional hook told about each chunk. The fraction
            is ``None`` when the response has no usable Content-Length.

    Returns:
        Always 0. The destination is printed only when no callback is given.
    """
    url = f"{server}/archives/{archive_id}/download"
    req = urllib.request.Request(url, method="GET")
    notify_progress(progress_callback, "Preparing download…", 0.0)
    with urllib.request.urlopen(req) as response:
        if output.is_dir():
            # The file name is chosen by the server. Keep only its final path
            # component so a value such as "../../x" or "/etc/x" cannot place
            # the download outside the requested directory.
            advertised = response.headers.get_filename() or ""
            filename = Path(advertised).name
            if filename in {"", ".", ".."}:
                filename = f"archive-{archive_id}.bin"
            destination = output / filename
        else:
            destination = output

        total_bytes = response.headers.get("Content-Length")
        total = int(total_bytes) if total_bytes and total_bytes.isdigit() else None
        received = 0
        with destination.open("wb") as buffer:
            while chunk := response.read(CHUNK_SIZE):
                buffer.write(chunk)
                received += len(chunk)
                # Clamp: an understated Content-Length must not report >100%.
                fraction = min(received / total, 1.0) if total else None
                notify_progress(progress_callback, f"Downloading archive… {received / (1024 * 1024):.1f} MB", fraction)

    notify_progress(progress_callback, f"Download completed: {destination}", 1.0)
    if progress_callback is None:
        print(f"Downloaded to {destination}")
    return 0
def find_remote_archive(server: str, bottle_ref: str) -> dict[str, Any]:
    """Find the server archive whose bottle name or archive name equals ``bottle_ref``.

    Comparison is case-insensitive. Every archive is checked against
    ``bottle_name`` first; only when none matches is ``name`` tried.

    Raises:
        FileNotFoundError: If no archive matches either field.
    """
    archives = get_archives(server)
    wanted = bottle_ref.lower()
    for field in ("bottle_name", "name"):
        found = next(
            (entry for entry in archives if (entry.get(field) or "").lower() == wanted),
            None,
        )
        if found is not None:
            return found
    raise FileNotFoundError(f"No remote archive found for '{bottle_ref}'")
def safe_extract_tar(archive_path: Path, target_dir: Path) -> None:
with tarfile.open(archive_path, "r:gz") as tar:
for member in tar.getmembers():
member_path = (target_dir / member.name).resolve()
if not str(member_path).startswith(str(target_dir.resolve())):
raise ValueError("Unsafe archive path detected.")
tar.extractall(target_dir, filter="fully_trusted")
def install_archive_from_metadata(
    server: str,
    archive: dict[str, Any],
    bottles_dir: Path,
    replace: bool,
    progress_callback: ProgressCallback | None = None,
    bottle_ref: str | None = None,
) -> int:
    """Download an archive and install the bottle it contains into ``bottles_dir``.

    The target directory name is the first non-empty value among the
    archive's ``bottle_name``, its ``name``, ``bottle_ref`` and the literal
    ``unknown-bottle``. The archive is downloaded, extracted and validated in
    a temporary directory before an existing bottle is touched, so a failed
    download or an invalid archive leaves the current bottle in place.

    Args:
        server: Base server URL.
        archive: Archive metadata; ``id`` is required.
        bottles_dir: Local directory that holds the bottles.
        replace: Whether an existing bottle of the same name may be replaced.
        progress_callback: Optional hook told about each stage.
        bottle_ref: Name the user asked for, used as a naming fallback.

    Returns:
        0 on success; 1 if the name is unsafe or the bottle already exists
        and ``replace`` is false.

    Raises:
        FileNotFoundError: If the archive holds no directory, or none with a
            ``bottle.yml`` file.
    """
    bottle_name = archive.get("bottle_name") or archive.get("name") or bottle_ref or "unknown-bottle"

    # The name usually comes from server metadata and is used as a path for
    # rmtree()/move(); accept a single plain path component only.
    if bottle_name in {".", ".."} or Path(bottle_name).name != bottle_name:
        message = f"Refusing to install bottle with unsafe name: {bottle_name!r}"
        notify_progress(progress_callback, message, None)
        print(message, file=sys.stderr)
        return 1

    target_dir = bottles_dir / bottle_name
    if target_dir.exists() and not replace:
        notify_progress(progress_callback, f"Bottle already exists: {target_dir}", None)
        print(
            f"Bottle already exists: {target_dir}\n"
            "Use --replace to overwrite it.",
            file=sys.stderr,
        )
        return 1

    bottles_dir.mkdir(parents=True, exist_ok=True)
    with tempfile.TemporaryDirectory(prefix="bottle-install-") as tmp_dir:
        tmp_path = Path(tmp_dir)
        download_path = tmp_path / f"archive-{archive['id']}.tar.gz"
        notify_progress(progress_callback, "Downloading archive…", 0.0)
        download_archive(server, int(archive["id"]), download_path, progress_callback=progress_callback)

        extract_dir = tmp_path / "extract"
        extract_dir.mkdir(parents=True, exist_ok=True)
        notify_progress(progress_callback, "Extracting archive…", None)
        safe_extract_tar(download_path, extract_dir)

        # Sorted so the choice does not depend on directory listing order.
        candidates = sorted(path for path in extract_dir.iterdir() if path.is_dir())
        if not candidates:
            raise FileNotFoundError("Archive did not contain a bottle directory.")
        source_dir = next((path for path in candidates if (path / "bottle.yml").exists()), None)
        if source_dir is None:
            raise FileNotFoundError("Archive does not look like a valid bottle backup.")

        # Remove the old bottle only now that a valid replacement is ready.
        if target_dir.exists():
            notify_progress(progress_callback, f"Removing existing bottle: {target_dir}", None)
            shutil.rmtree(target_dir)

        notify_progress(progress_callback, f"Installing into {target_dir}", None)
        shutil.move(str(source_dir), str(target_dir))

    notify_progress(progress_callback, f"Installed bottle '{bottle_name}' to {target_dir}", 1.0)
    if progress_callback is None:
        print(f"Installed bottle '{bottle_name}' to {target_dir}")
    return 0
def install_archive(
    server: str,
    bottle_ref: str,
    bottles_dir: Path,
    replace: bool,
    progress_callback: ProgressCallback | None = None,
) -> int:
    """Look up ``bottle_ref`` on the server and install the matching archive.

    Returns:
        The status code produced by ``install_archive_from_metadata``.
    """
    metadata = find_remote_archive(server, bottle_ref)
    status = install_archive_from_metadata(
        server,
        metadata,
        bottles_dir,
        replace,
        progress_callback=progress_callback,
        bottle_ref=bottle_ref,
    )
    return status
def parse_bottle_yml(bottle_yml: Path) -> dict[str, str]:
    """Read selected top-level ``key: value`` pairs from a bottle.yml file.

    This is a line-based scan, not a YAML parser. Lines that start with a
    space or contain no colon are skipped. Only the keys Name, Arch, Runner,
    Environment and Windows are kept; values are stripped of surrounding
    whitespace and single quotes.

    Returns:
        A dict holding whichever of the wanted keys were found.
    """
    wanted_keys = {"Name", "Arch", "Runner", "Environment", "Windows"}
    text = bottle_yml.read_text(encoding="utf-8", errors="ignore")
    parsed: dict[str, str] = {}
    for raw_line in text.splitlines():
        if raw_line.startswith(" "):
            continue
        # Split on the first colon only, so values may contain colons.
        key, separator, remainder = raw_line.partition(":")
        if separator and key in wanted_keys:
            parsed[key] = remainder.strip().strip("'")
    return parsed
def collect_local_bottles(bottles_dir: Path) -> list[dict[str, Any]]:
    """List the bottles found directly under ``bottles_dir``.

    A subdirectory counts as a bottle when it contains a ``bottle.yml`` file.
    Subdirectories are visited in sorted order.

    Returns:
        One dict per bottle with the keys name, directory, path, arch,
        runner, environment and windows. Metadata missing from bottle.yml is
        reported as ``-``, and the name falls back to the directory name.
        An empty list is returned when ``bottles_dir`` does not exist.
    """
    if not bottles_dir.exists():
        return []

    metadata_fields = (
        ("arch", "Arch"),
        ("runner", "Runner"),
        ("environment", "Environment"),
        ("windows", "Windows"),
    )
    found: list[dict[str, Any]] = []
    subdirectories = sorted(entry for entry in bottles_dir.iterdir() if entry.is_dir())
    for folder in subdirectories:
        manifest = folder / "bottle.yml"
        if manifest.exists():
            info = parse_bottle_yml(manifest)
            record: dict[str, Any] = {
                "name": info.get("Name", folder.name),
                "directory": folder.name,
                "path": str(folder),
            }
            for field, yml_key in metadata_fields:
                record[field] = info.get(yml_key, "-")
            found.append(record)
    return found
def scan_local_bottles(bottles_dir: Path, as_json: bool) -> int:
    """Print the local bottles either as JSON or as a fixed-width table.

    Args:
        bottles_dir: Directory to scan.
        as_json: When true, print the bottle list as indented JSON.

    Returns:
        Always 0.
    """
    bottles = collect_local_bottles(bottles_dir)

    if as_json:
        print(json.dumps(bottles, indent=2))
        return 0

    if not bottles:
        print(f"No local bottles found in {bottles_dir}")
        return 0

    print(f"Local Bottles directory: {bottles_dir}")
    print(f"{'Dir':<24} {'Name':<28} {'Arch':<8} {'Runner':<18} {'Env':<12} {'Windows':<10}")
    print("-" * 110)
    for entry in bottles:
        cells = (
            f"{entry['directory'][:24]:<24}",
            f"{entry['name'][:28]:<28}",
            f"{entry['arch']:<8}",
            f"{entry['runner'][:18]:<18}",
            f"{entry['environment'][:12]:<12}",
            f"{entry['windows'][:10]:<10}",
        )
        print(" ".join(cells))
    return 0
def create_bottle_backup(bottle: dict[str, Any], output_dir: Path | None = None) -> Path:
    """Pack a bottle directory into a timestamped ``.tar.gz`` archive.

    Args:
        bottle: Bottle record; its ``path`` and ``directory`` keys are read.
        output_dir: Where to write the archive. Defaults to the system
            temporary directory.

    Returns:
        Path of the created archive, named
        ``<directory>-<YYYYmmdd-HHMMSS>.tar.gz``.
    """
    source_dir = Path(bottle["path"])
    if output_dir:
        destination_dir = output_dir
    else:
        destination_dir = Path(tempfile.gettempdir())

    stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    archive_path = destination_dir / f"{bottle['directory']}-{stamp}.tar.gz"

    # Store members under the directory's own name rather than its full path.
    with tarfile.open(archive_path, "w:gz") as archive:
        archive.add(source_dir, arcname=source_dir.name)
    return archive_path
def prompt_text(label: str, default: str) -> str:
    """Ask the user for a value, returning ``default`` when the answer is blank.

    The prompt shows the label followed by the default in square brackets;
    the answer is stripped of surrounding whitespace.
    """
    answer = input(f"{label} [{default}]: ").strip()
    if answer:
        return answer
    return default
def choose_bottle(bottles: list[dict[str, Any]]) -> dict[str, Any]:
    """Show a numbered list of bottles and return the one the user picks.

    Keeps prompting until the input is an integer between 1 and the number
    of bottles.
    """
    print("Available local bottles:")
    for number, entry in enumerate(bottles, start=1):
        print(
            f" {number}. {entry['directory']} "
            f"(arch={entry['arch']}, runner={entry['runner']}, windows={entry['windows']})"
        )

    highest = len(bottles)
    while True:
        answer = input("Select a bottle number: ").strip()
        try:
            picked = int(answer)
        except ValueError:
            print("Please enter a valid number.")
        else:
            if 1 <= picked <= highest:
                return bottles[picked - 1]
            print("Choice out of range.")
def wizard_upload(server: str, bottles_dir: Path) -> int:
    """Interactively pick a local bottle, back it up and upload the backup.

    The user chooses a bottle and confirms or overrides the archive name,
    bottle name, description and tags. The backup archive is deleted again
    whether or not the upload succeeds.

    Returns:
        0 when there are no local bottles; otherwise the status code of
        ``upload_archive``.
    """
    bottles = collect_local_bottles(bottles_dir)
    if not bottles:
        print(f"No local bottles found in {bottles_dir}")
        return 0

    selected = choose_bottle(bottles)
    display_name = prompt_text("Archive name", selected["name"])
    bottle_name = prompt_text("Bottle name", selected["directory"])
    description = prompt_text("Description", f"Backup of {selected['name']}")
    tags = prompt_text("Tags", "bottles,backup")

    print(f"\nCreating backup for {selected['directory']}...")
    archive_path = create_bottle_backup(selected)
    print(f"Backup created: {archive_path}")

    try:
        form_fields = {
            "name": display_name,
            "bottle_name": bottle_name,
            "description": description,
            "tags": tags,
            "arch": str(selected.get("arch", "")),
            "runner": str(selected.get("runner", "")),
            "windows_version": str(selected.get("windows", "")),
        }
        return upload_archive(server, archive_path, form_fields)
    finally:
        # The backup is only a transfer artefact; never leave it behind.
        archive_path.unlink(missing_ok=True)
def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser.

    Defines a global ``--server`` option and the subcommands ``list``,
    ``upload``, ``download``, ``install``, ``scan-local`` and
    ``wizard-upload``. The chosen subcommand is stored in ``command``.
    """
    parser = argparse.ArgumentParser(description="Minimal client for Bottle Archive Server")
    parser.add_argument("--server", default=DEFAULT_SERVER, help="Base server URL")
    commands = parser.add_subparsers(dest="command", required=True)

    def add_bottles_dir_option(target: argparse.ArgumentParser) -> None:
        # Shared by every subcommand that works on the local bottles directory.
        target.add_argument(
            "--bottles-dir",
            type=Path,
            default=DEFAULT_BOTTLES_DIR,
            help="Path to the local Bottles directory",
        )

    commands.add_parser("list", help="List uploaded bottle archives")

    upload = commands.add_parser("upload", help="Upload a bottle archive")
    upload.add_argument("file", type=Path, help="Path to the archive file")
    upload.add_argument("--name", required=True, help="Display name")
    optional_upload_flags = (
        ("--bottle-name", "Bottle name"),
        ("--description", "Description"),
        ("--tags", "Tags"),
        ("--arch", "Architecture"),
        ("--runner", "Runner"),
        ("--windows-version", "Windows version"),
    )
    for flag, description in optional_upload_flags:
        upload.add_argument(flag, help=description)

    download = commands.add_parser("download", help="Download an archive by id")
    download.add_argument("archive_id", type=int, help="Archive id")
    download.add_argument("output", type=Path, help="Output file or directory")

    install = commands.add_parser(
        "install",
        help="Install a bottle from the server by bottle name or archive name",
    )
    install.add_argument("bottle", help="Bottle name or archive name to install")
    add_bottles_dir_option(install)
    install.add_argument("--replace", action="store_true", help="Replace existing local bottle")

    scan = commands.add_parser("scan-local", help="List bottles installed on this computer")
    add_bottles_dir_option(scan)
    scan.add_argument("--json", action="store_true", help="Print results as JSON")

    wizard = commands.add_parser(
        "wizard-upload",
        help="Choose a local bottle, create a backup, and upload it to the server",
    )
    add_bottles_dir_option(wizard)

    return parser
def main() -> int:
    """Parse the command line, run the selected subcommand and return its exit status.

    HTTP errors, connection errors and ``FileNotFoundError`` are reported on
    stderr and turned into exit status 1. Other exceptions propagate.
    """
    parser = build_parser()
    args = parser.parse_args()
    server = args.server.rstrip("/")

    def run_upload() -> int:
        form_fields = {
            "name": args.name,
            "bottle_name": args.bottle_name or "",
            "description": args.description or "",
            "tags": args.tags or "",
            "arch": args.arch or "",
            "runner": args.runner or "",
            "windows_version": args.windows_version or "",
        }
        return upload_archive(server, args.file, form_fields)

    # Each handler reads its own arguments lazily, so only the attributes of
    # the selected subcommand are ever accessed.
    handlers = {
        "list": lambda: print_archives(server),
        "upload": run_upload,
        "download": lambda: download_archive(server, args.archive_id, args.output),
        "install": lambda: install_archive(server, args.bottle, args.bottles_dir, args.replace),
        "scan-local": lambda: scan_local_bottles(args.bottles_dir, args.json),
        "wizard-upload": lambda: wizard_upload(server, args.bottles_dir),
    }

    try:
        handler = handlers.get(args.command)
        if handler is None:
            parser.error("Unknown command")
            return 2
        return handler()
    except urllib.error.HTTPError as exc:
        # Must precede URLError: HTTPError is a subclass of it.
        detail = exc.read().decode("utf-8", errors="ignore")
        print(f"HTTP {exc.code}: {detail or exc.reason}", file=sys.stderr)
        return 1
    except urllib.error.URLError as exc:
        print(f"Connection error: {exc.reason}", file=sys.stderr)
        return 1
    except FileNotFoundError as exc:
        print(f"File error: {exc}", file=sys.stderr)
        return 1
# When run as a script, execute the CLI and exit with main()'s return value.
if __name__ == "__main__":
    raise SystemExit(main())