You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
679 lines
25 KiB
679 lines
25 KiB
#!/usr/bin/env python3 |
|
from __future__ import annotations |
|
|
|
import argparse |
|
import configparser |
|
import json |
|
import shutil |
|
import sys |
|
import tarfile |
|
import tempfile |
|
import urllib.error |
|
import urllib.parse |
|
import urllib.request |
|
from collections.abc import Callable |
|
from datetime import datetime |
|
from pathlib import Path |
|
from typing import Any |
|
|
|
# Server base URL used when the config file does not provide one.
DEFAULT_SERVER = "http://127.0.0.1:8080"

# Default location of the local Bottles data directory.
DEFAULT_BOTTLES_DIR = Path.home().joinpath(
    ".var", "app", "com.usebottles.bottles", "data", "bottles", "bottles"
)

# Per-user INI file read (and created when absent) by load_config().
CONF_FILE = Path.home().joinpath(".cellar.conf")
|
|
|
|
|
def load_config() -> dict[str, str]:
    """Read ~/.cellar.conf [cellar], creating it with defaults if absent.

    Problems with the file never abort the program: when the default file
    cannot be written, or an existing file cannot be parsed, a warning is
    printed to stderr and the built-in defaults are used instead.

    Keys returned: 'server', 'bottles_dir'.
    """
    defaults = {
        "server": DEFAULT_SERVER,
        "bottles_dir": str(DEFAULT_BOTTLES_DIR),
    }
    cfg = configparser.ConfigParser()

    if CONF_FILE.exists():
        try:
            cfg.read(CONF_FILE)
        except (configparser.Error, UnicodeDecodeError) as exc:
            # A hand-edited file may be malformed; this function runs while the
            # argument parser is built, so failing here would break even --help.
            print(f"Warning: ignoring unreadable config {CONF_FILE}: {exc}", file=sys.stderr)
            return defaults
    else:
        cfg["cellar"] = defaults
        try:
            with CONF_FILE.open("w") as fh:
                cfg.write(fh)
        except OSError as exc:
            # Read-only or missing home directory: keep going with the defaults.
            print(f"Warning: could not create {CONF_FILE}: {exc}", file=sys.stderr)
        else:
            print(f"Created default config: {CONF_FILE}", file=sys.stderr)

    section = cfg["cellar"] if "cellar" in cfg else {}
    return {
        "server": section.get("server", defaults["server"]).strip(),
        "bottles_dir": section.get("bottles_dir", defaults["bottles_dir"]).strip(),
    }
|
CHUNK_SIZE = 1024 * 1024 |
|
ProgressCallback = Callable[[str, float | None], None] |
|
|
|
|
|
def request_json(url: str, method: str = "GET", data: bytes | None = None, headers: dict[str, str] | None = None) -> Any:
    """Send a request to *url* and return the response body decoded as JSON.

    Args:
        url: Address to open.
        method: Request method name.
        data: Optional request body.
        headers: Optional extra request headers.

    Returns:
        The UTF-8 response body parsed with ``json.loads``.
    """
    request = urllib.request.Request(url, data=data, headers=dict(headers or {}), method=method)
    with urllib.request.urlopen(request) as response:
        body = response.read()
    return json.loads(body.decode("utf-8"))
|
|
|
|
|
def get_archives(server: str) -> list[dict[str, Any]]:
    """Return the decoded JSON response of ``GET {server}/archives``."""
    endpoint = f"{server}/archives"
    return request_json(endpoint)
|
|
|
|
|
def get_archive(server: str, archive_id: int) -> dict[str, Any]:
    """Return the decoded JSON response of ``GET {server}/archives/{archive_id}``."""
    endpoint = f"{server}/archives/{archive_id}"
    return request_json(endpoint)
|
|
|
|
|
def notify_progress(progress_callback: ProgressCallback | None, message: str, fraction: float | None = None) -> None:
    """Forward a progress update to *progress_callback*, if one was supplied."""
    if progress_callback is None:
        return
    progress_callback(message, fraction)
|
|
|
|
|
def print_archives(server: str) -> int:
    """Print a table of the archives reported by *server*; always returns 0."""
    archives = get_archives(server)
    if not archives:
        print("No archives found.")
        return 0

    bytes_per_mb = 1024 * 1024
    print(f"{'ID':<4} {'Name':<30} {'Bottle':<30} {'Arch':<8} {'Runner':<18} {'Size(MB)':>10}")
    print("-" * 110)
    for entry in archives:
        size_mb = entry["size_bytes"] / bytes_per_mb
        columns = (
            f"{entry['id']:<4}",
            f"{(entry['name'] or '-')[:30]:<30}",
            f"{(entry.get('bottle_name') or '-')[:30]:<30}",
            f"{(entry.get('arch') or '-'):<8}",
            f"{(entry.get('runner') or '-')[:18]:<18}",
            f"{size_mb:>10.2f}",
        )
        print(" ".join(columns))
    return 0
|
|
|
|
|
def upload_archive_with_result(
    server: str,
    file_path: Path,
    fields: dict[str, str],
    progress_callback: ProgressCallback | None = None,
) -> dict[str, Any]:
    """Upload *file_path* to ``{server}/archives`` as multipart/form-data.

    Args:
        server: Base URL of the server (no trailing slash).
        file_path: File sent as the ``file`` part; it is read fully into memory.
        fields: Extra form fields; entries whose value is empty are omitted.
        progress_callback: Optional hook called with a message and a fraction.

    Returns:
        The decoded JSON response of the POST request.
    """
    import uuid  # local import so this function stays self-contained

    def quote_param(text: str) -> str:
        # Percent-encode the characters that would terminate or split a quoted
        # Content-Disposition parameter (double quote, CR, LF).
        return text.replace('"', "%22").replace("\r", "%0D").replace("\n", "%0A")

    # A fresh random boundary per request makes an accidental match with bytes
    # inside the uploaded file practically impossible, unlike a fixed literal.
    boundary = f"----BottleArchiveBoundary{uuid.uuid4().hex}"
    delimiter = f"--{boundary}\r\n".encode()
    parts: list[bytes] = []

    notify_progress(progress_callback, "Preparing upload…", 0.0)

    for key, value in fields.items():
        if not value:
            continue
        parts += [
            delimiter,
            f'Content-Disposition: form-data; name="{quote_param(key)}"\r\n\r\n'.encode(),
            value.encode(),
            b"\r\n",
        ]

    filename = file_path.name
    mime = "application/gzip" if filename.endswith((".tar.gz", ".tgz", ".gz")) else "application/octet-stream"
    notify_progress(progress_callback, f"Reading archive {filename}…", 0.25)
    parts += [
        delimiter,
        f'Content-Disposition: form-data; name="file"; filename="{quote_param(filename)}"\r\n'.encode(),
        f"Content-Type: {mime}\r\n\r\n".encode(),
        file_path.read_bytes(),
        b"\r\n",
        f"--{boundary}--\r\n".encode(),
    ]

    payload = b"".join(parts)
    notify_progress(progress_callback, "Uploading archive…", 0.7)
    archive = request_json(
        f"{server}/archives",
        method="POST",
        data=payload,
        headers={"Content-Type": f"multipart/form-data; boundary={boundary}"},
    )
    notify_progress(progress_callback, f"Upload completed: {archive['name']}", 1.0)
    return archive
|
|
|
|
|
def upload_archive(
    server: str,
    file_path: Path,
    fields: dict[str, str],
    progress_callback: ProgressCallback | None = None,
) -> int:
    """Upload an archive, print the id and name the server reports, return 0."""
    result = upload_archive_with_result(server, file_path, fields, progress_callback=progress_callback)
    archive_id = result['id']
    archive_name = result['name']
    print(f"Uploaded archive #{archive_id}: {archive_name}")
    return 0
|
|
|
|
|
def download_archive(
    server: str,
    archive_id: int,
    output: Path,
    progress_callback: ProgressCallback | None = None,
) -> int:
    """Stream ``{server}/archives/{archive_id}/download`` to disk.

    Args:
        server: Base URL of the server (no trailing slash).
        archive_id: Id of the archive to fetch.
        output: Destination file, or an existing directory. For a directory the
            file name is taken from the response's Content-Disposition header,
            falling back to ``archive-{archive_id}.bin``.
        progress_callback: Optional hook called with a message and a fraction
            (None when the response carries no usable Content-Length).

    Returns:
        0 once the file has been written.
    """
    url = f"{server}/archives/{archive_id}/download"
    req = urllib.request.Request(url, method="GET")
    notify_progress(progress_callback, "Preparing download…", 0.0)
    with urllib.request.urlopen(req) as response:
        if output.is_dir():
            fallback = f"archive-{archive_id}.bin"
            # The server chooses this name: keep only its last path component so
            # values like "../x" or "/etc/x" cannot place the file outside output.
            suggested = Path(response.headers.get_filename() or fallback).name
            if suggested in ("", ".", ".."):
                suggested = fallback
            destination = output / suggested
        else:
            destination = output

        total_bytes = response.headers.get("Content-Length")
        total = int(total_bytes) if total_bytes and total_bytes.isdigit() else None
        received = 0

        with destination.open("wb") as buffer:
            while chunk := response.read(CHUNK_SIZE):
                buffer.write(chunk)
                received += len(chunk)
                fraction = (received / total) if total else None
                notify_progress(progress_callback, f"Downloading archive… {received / (1024 * 1024):.1f} MB", fraction)

    notify_progress(progress_callback, f"Download completed: {destination}", 1.0)
    # With a callback the caller reports completion itself; otherwise tell the user.
    if progress_callback is None:
        print(f"Downloaded to {destination}")
    return 0
|
|
|
|
|
def find_remote_archive(server: str, bottle_ref: str) -> dict[str, Any]:
    """Find the archive on *server* that matches *bottle_ref*, ignoring case.

    Every archive's ``bottle_name`` is checked first; ``name`` is only
    consulted when no bottle name matched.

    Raises:
        FileNotFoundError: If no archive matches.
    """
    archives = get_archives(server)
    wanted = bottle_ref.lower()

    for field in ("bottle_name", "name"):
        match = next(
            (entry for entry in archives if (entry.get(field) or "").lower() == wanted),
            None,
        )
        if match is not None:
            return match

    raise FileNotFoundError(f"No remote archive found for '{bottle_ref}'")
|
|
|
|
|
def safe_extract_tar(archive_path: Path, target_dir: Path) -> None: |
|
with tarfile.open(archive_path, "r:gz") as tar: |
|
for member in tar.getmembers(): |
|
member_path = (target_dir / member.name).resolve() |
|
if not str(member_path).startswith(str(target_dir.resolve())): |
|
raise ValueError("Unsafe archive path detected.") |
|
tar.extractall(target_dir, filter="fully_trusted") |
|
|
|
|
|
def install_archive_from_metadata(
    server: str,
    archive: dict[str, Any],
    bottles_dir: Path,
    replace: bool,
    progress_callback: ProgressCallback | None = None,
    bottle_ref: str | None = None,
) -> int:
    """Download an archive and install the bottle it contains into *bottles_dir*.

    With *replace* set, an existing bottle is removed only after the new one
    has been downloaded, extracted and checked, so a failed install leaves the
    old bottle in place.

    Args:
        server: Base URL of the server (no trailing slash).
        archive: Archive metadata; ``id`` is required, ``bottle_name`` and
            ``name`` are used to name the installed directory.
        bottles_dir: Directory that holds the local bottles.
        replace: Whether an existing bottle of the same name may be overwritten.
        progress_callback: Optional hook called with a message and a fraction.
        bottle_ref: Name used when the metadata carries neither name field.

    Returns:
        0 on success; 1 when the bottle name is unsafe or the bottle already
        exists and *replace* is false.

    Raises:
        FileNotFoundError: If the archive holds no directory with a bottle.yml.
    """
    bottle_name = archive.get("bottle_name") or archive.get("name") or bottle_ref or "unknown-bottle"

    # The name is remote/user input and becomes a directory that may be deleted
    # below, so it must be one plain path component: no separators, ".", "..".
    if bottle_name in (".", "..") or Path(bottle_name).name != bottle_name:
        message = f"Refusing unsafe bottle name: {bottle_name!r}"
        notify_progress(progress_callback, message, None)
        print(message, file=sys.stderr)
        return 1

    target_dir = bottles_dir / bottle_name

    if target_dir.exists() and not replace:
        notify_progress(progress_callback, f"Bottle already exists: {target_dir}", None)
        print(
            f"Bottle already exists: {target_dir}\n"
            "Use --replace to overwrite it.",
            file=sys.stderr,
        )
        return 1

    bottles_dir.mkdir(parents=True, exist_ok=True)

    with tempfile.TemporaryDirectory(prefix="bottle-install-") as tmp_dir:
        tmp_path = Path(tmp_dir)
        download_path = tmp_path / f"archive-{archive['id']}.tar.gz"
        notify_progress(progress_callback, "Downloading archive…", 0.0)
        download_archive(server, int(archive["id"]), download_path, progress_callback=progress_callback)

        extract_dir = tmp_path / "extract"
        extract_dir.mkdir(parents=True, exist_ok=True)
        notify_progress(progress_callback, "Extracting archive…", None)
        safe_extract_tar(download_path, extract_dir)

        # Sorted so the choice does not depend on directory listing order.
        candidates = sorted(path for path in extract_dir.iterdir() if path.is_dir())
        if not candidates:
            raise FileNotFoundError("Archive did not contain a bottle directory.")

        source_dir = next((path for path in candidates if (path / "bottle.yml").exists()), None)
        if source_dir is None:
            raise FileNotFoundError("Archive does not look like a valid bottle backup.")

        # Only now that a valid replacement exists is the old bottle removed.
        if target_dir.exists():
            notify_progress(progress_callback, f"Removing existing bottle: {target_dir}", None)
            shutil.rmtree(target_dir)

        notify_progress(progress_callback, f"Installing into {target_dir}…", None)
        shutil.move(str(source_dir), str(target_dir))

    notify_progress(progress_callback, f"Installed bottle '{bottle_name}' to {target_dir}", 1.0)
    if progress_callback is None:
        print(f"Installed bottle '{bottle_name}' to {target_dir}")
    return 0
|
|
|
|
|
def install_archive(
    server: str,
    bottle_ref: str,
    bottles_dir: Path,
    replace: bool,
    progress_callback: ProgressCallback | None = None,
) -> int:
    """Look up the archive matching *bottle_ref* on the server and install it.

    Returns the status code of install_archive_from_metadata().
    """
    metadata = find_remote_archive(server, bottle_ref)
    status = install_archive_from_metadata(
        server,
        metadata,
        bottles_dir,
        replace,
        progress_callback=progress_callback,
        bottle_ref=bottle_ref,
    )
    return status
|
|
|
|
|
def parse_bottle_yml(bottle_yml: Path) -> dict[str, str]:
    """Extract a few top-level scalar keys from a bottle.yml file.

    This is a line-based reader, not a YAML parser: only unindented
    ``Key: value`` lines are considered, and only for the keys Name, Arch,
    Runner, Environment and Windows. Surrounding single or double quotes are
    removed from the value.

    Args:
        bottle_yml: Path of the file to read (decoded as UTF-8, undecodable
            bytes are dropped).

    Returns:
        A mapping holding whichever of the wanted keys were found.
    """
    wanted_keys = {"Name", "Arch", "Runner", "Environment", "Windows"}
    data: dict[str, str] = {}

    text = bottle_yml.read_text(encoding="utf-8", errors="ignore")
    for line in text.splitlines():
        # Indented lines belong to nested mappings and are not top-level keys.
        if line.startswith(" "):
            continue
        key, separator, value = line.partition(":")
        if not separator or key not in wanted_keys:
            continue
        value = value.strip()
        if len(value) >= 2 and value[0] == value[-1] == '"':
            # Double-quoted scalar: drop exactly the enclosing pair.
            value = value[1:-1]
        else:
            value = value.strip("'")
        data[key] = value

    return data
|
|
|
|
|
def collect_local_bottles(bottles_dir: Path) -> list[dict[str, Any]]:
    """Describe every subdirectory of *bottles_dir* that contains a bottle.yml.

    Returns an empty list when *bottles_dir* does not exist. Entries are
    ordered by sorted directory path; fields missing from bottle.yml fall back
    to the directory name (for ``name``) or ``"-"``.
    """
    if not bottles_dir.exists():
        return []

    def describe(directory: Path) -> dict[str, Any]:
        metadata = parse_bottle_yml(directory / "bottle.yml")
        return {
            "name": metadata.get("Name", directory.name),
            "directory": directory.name,
            "path": str(directory),
            "arch": metadata.get("Arch", "-"),
            "runner": metadata.get("Runner", "-"),
            "environment": metadata.get("Environment", "-"),
            "windows": metadata.get("Windows", "-"),
        }

    subdirectories = sorted(entry for entry in bottles_dir.iterdir() if entry.is_dir())
    return [
        describe(directory)
        for directory in subdirectories
        if (directory / "bottle.yml").exists()
    ]
|
|
|
|
|
def scan_local_bottles(bottles_dir: Path, as_json: bool) -> int:
    """Print the bottles found in *bottles_dir* as JSON or as a table; returns 0."""
    bottles = collect_local_bottles(bottles_dir)

    if as_json:
        print(json.dumps(bottles, indent=2))
    elif not bottles:
        print(f"No local bottles found in {bottles_dir}")
    else:
        print(f"Local Bottles directory: {bottles_dir}")
        print(f"{'Dir':<24} {'Name':<28} {'Arch':<8} {'Runner':<18} {'Env':<12} {'Windows':<10}")
        print("-" * 110)
        for entry in bottles:
            columns = (
                f"{entry['directory'][:24]:<24}",
                f"{entry['name'][:28]:<28}",
                f"{entry['arch']:<8}",
                f"{entry['runner'][:18]:<18}",
                f"{entry['environment'][:12]:<12}",
                f"{entry['windows'][:10]:<10}",
            )
            print(" ".join(columns))
    return 0
|
|
|
|
|
def create_bottle_backup(bottle: dict[str, Any], output_dir: Path | None = None) -> Path:
    """Pack a bottle directory into a timestamped ``.tar.gz`` archive.

    Args:
        bottle: Bottle description; ``path`` is the directory to pack and
            ``directory`` is used as the archive name prefix.
        output_dir: Where to write the archive; created if missing. Defaults
            to the system temporary directory.

    Returns:
        Path of the archive that was written.

    Raises:
        OSError: If the bottle cannot be read or the archive cannot be written;
            a partially written archive is removed before re-raising.
    """
    source_dir = Path(bottle["path"])
    target_dir = output_dir or Path(tempfile.gettempdir())
    target_dir.mkdir(parents=True, exist_ok=True)

    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    archive_path = target_dir / f"{bottle['directory']}-{timestamp}.tar.gz"

    try:
        with tarfile.open(archive_path, "w:gz") as tar:
            # arcname keeps a single top-level directory inside the archive.
            tar.add(source_dir, arcname=source_dir.name)
    except BaseException:
        # Never leave a truncated archive behind (also on Ctrl-C).
        archive_path.unlink(missing_ok=True)
        raise

    return archive_path
|
|
|
|
|
def prompt_text(label: str, default: str) -> str:
    """Prompt for a value, returning *default* when the reply is blank."""
    answer = input(f"{label} [{default}]: ").strip()
    if answer:
        return answer
    return default
|
|
|
|
|
def choose_bottle(bottles: list[dict[str, Any]]) -> dict[str, Any]:
    """List *bottles* with 1-based numbers and return the one the user picks.

    Keeps asking until the reply is an integer within range.
    """
    print("Available local bottles:")
    for number, entry in enumerate(bottles, start=1):
        line = (
            f" {number}. {entry['directory']} "
            f"(arch={entry['arch']}, runner={entry['runner']}, windows={entry['windows']})"
        )
        print(line)

    valid_numbers = range(1, len(bottles) + 1)
    while True:
        reply = input("Select a bottle number: ").strip()
        try:
            picked = int(reply)
        except ValueError:
            print("Please enter a valid number.")
            continue

        if picked not in valid_numbers:
            print("Choice out of range.")
            continue

        return bottles[picked - 1]
|
|
|
|
|
def wizard_upload(server: str, bottles_dir: Path) -> int:
    """Interactively pick a local bottle, pack it and upload it to *server*.

    Returns 0 when there is nothing to upload, otherwise the status code of
    upload_archive(). The temporary archive is deleted whether or not the
    upload succeeds.
    """
    bottles = collect_local_bottles(bottles_dir)
    if not bottles:
        print(f"No local bottles found in {bottles_dir}")
        return 0

    bottle = choose_bottle(bottles)
    # Dict entries are evaluated in order, so the prompts appear in this order.
    fields = {
        "name": prompt_text("Archive name", bottle["name"]),
        "bottle_name": prompt_text("Bottle name", bottle["directory"]),
        "description": prompt_text("Description", f"Backup of {bottle['name']}"),
        "tags": prompt_text("Tags", "bottles,backup"),
    }

    print(f"\nCreating backup for {bottle['directory']}...")
    archive_path = create_bottle_backup(bottle)
    print(f"Backup created: {archive_path}")

    try:
        fields["arch"] = str(bottle.get("arch", ""))
        fields["runner"] = str(bottle.get("runner", ""))
        fields["windows_version"] = str(bottle.get("windows", ""))
        return upload_archive(server, archive_path, fields)
    finally:
        archive_path.unlink(missing_ok=True)
|
|
|
|
|
def wizard_install(server: str, bottles_dir: Path, replace: bool) -> int:
    """Interactively pick an archive from *server* and install it locally.

    Args:
        server: Base URL of the server (no trailing slash).
        bottles_dir: Directory that holds the local bottles.
        replace: When false and the bottle already exists, the user is asked
            whether to replace it.

    Returns:
        0 when the server has no archives, otherwise the status code of
        install_archive_from_metadata().
    """
    archives = get_archives(server)
    if not archives:
        print("No archives found on the server.")
        return 0

    print(f"{'#':<4} {'ID':<4} {'Name':<30} {'Bottle':<30} {'Arch':<8} {'Runner':<18} {'Size(MB)':>10}")
    print("-" * 114)
    for index, item in enumerate(archives, start=1):
        size_mb = item["size_bytes"] / (1024 * 1024)
        print(
            f"{index:<4} "
            f"{item['id']:<4} "
            f"{(item['name'] or '-')[:30]:<30} "
            f"{(item.get('bottle_name') or '-')[:30]:<30} "
            f"{(item.get('arch') or '-'):<8} "
            f"{(item.get('runner') or '-')[:18]:<18} "
            f"{size_mb:>10.2f}"
        )

    while True:
        raw = input("\nSelect an archive number: ").strip()
        try:
            choice = int(raw)
        except ValueError:
            print("Please enter a valid number.")
            continue
        if 1 <= choice <= len(archives):
            archive = archives[choice - 1]
            break
        print("Choice out of range.")

    if not replace:
        # Use the same fallback name as install_archive_from_metadata() so the
        # directory checked here is the one the install will actually write.
        bottle_name = archive.get("bottle_name") or archive.get("name") or "unknown-bottle"
        target_dir = bottles_dir / bottle_name
        if target_dir.exists():
            ans = input(f"Bottle '{bottle_name}' already exists at {target_dir}. Replace? [y/N]: ").strip().lower()
            replace = ans in ("y", "yes")

    return install_archive_from_metadata(server, archive, bottles_dir, replace)
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
    """Build the ``cellar-cli`` argument parser with all of its sub-commands.

    The defaults for ``--server`` and ``--bottles-dir`` come from
    load_config(), so building the parser reads the config file.

    Returns:
        The configured parser; the chosen sub-command is stored in ``command``.
    """
    raw_help = argparse.RawDescriptionHelpFormatter

    def help_safe(value: object) -> str:
        # argparse %-formats help strings, so a literal "%" coming from a
        # configured URL or path has to be doubled to be shown verbatim.
        return str(value).replace("%", "%%")

    parser = argparse.ArgumentParser(
        prog="cellar-cli",
        description=(
            "Cellar — command-line client for the Bottle Archive Server.\n\n"
            "Manage backups of Wine prefixes (Bottles) hosted on a remote server.\n"
            "You can list, upload, download, and install bottle archives, or run the\n"
            "interactive wizard to pick a local bottle, pack it, and upload it in one step."
        ),
        epilog=(
            "examples:\n"
            " %(prog)s list\n"
            " %(prog)s --server http://brain.local:8080 list\n"
            " %(prog)s --server http://brain.local:8080 wizard-upload\n"
            " %(prog)s upload MyGame.tar.gz --name 'My Game' --tags 'gog,rpg'\n"
            " %(prog)s download 3 ~/Downloads/\n"
            " %(prog)s install 'My Game' --replace\n"
            " %(prog)s scan-local --json\n"
            " %(prog)s wizard-install\n"
        ),
        formatter_class=raw_help,
    )
    conf = load_config()
    default_server = conf["server"]
    default_bottles_dir = Path(conf["bottles_dir"])
    conf_file_text = help_safe(CONF_FILE)

    def add_bottles_dir(subparser: argparse.ArgumentParser) -> None:
        # Shared --bottles-dir option used by every command that touches local bottles.
        subparser.add_argument(
            "--bottles-dir",
            type=Path,
            default=default_bottles_dir,
            metavar="DIR",
            help=(
                f"Local Bottles data directory (default: {help_safe(default_bottles_dir)}; "
                f"override via {conf_file_text})"
            ),
        )

    parser.add_argument(
        "--server",
        default=default_server,
        metavar="URL",
        help=(
            f"Base URL of the Bottle Archive Server "
            f"(default: {help_safe(default_server)}; override via {conf_file_text})"
        ),
    )

    subparsers = parser.add_subparsers(dest="command", required=True, title="commands")

    subparsers.add_parser(
        "list",
        help="List all archives stored on the server",
        description="Fetch and display every bottle archive available on the server, including\nname, source bottle, architecture, runner, and compressed size.",
        formatter_class=raw_help,
    )

    upload_parser = subparsers.add_parser(
        "upload",
        help="Upload a pre-existing archive file to the server",
        description=(
            "Upload a .tar.gz bottle archive that you already created manually.\n"
            "Use 'wizard-upload' instead to let the tool create the archive for you."
        ),
        formatter_class=raw_help,
    )
    upload_parser.add_argument("file", type=Path, help="Path to the .tar.gz archive file to upload")
    upload_parser.add_argument("--name", required=True, metavar="TEXT", help="Human-readable display name for the archive (required)")
    upload_parser.add_argument("--bottle-name", metavar="TEXT", help="Internal Bottles directory name (defaults to --name)")
    upload_parser.add_argument("--description", metavar="TEXT", help="Free-text description shown in the catalogue")
    upload_parser.add_argument("--tags", metavar="TAG[,TAG…]", help="Comma-separated list of tags, e.g. 'gog,rpg,win32'")
    upload_parser.add_argument("--arch", metavar="ARCH", help="Windows architecture target, e.g. 'win32' or 'win64'")
    upload_parser.add_argument("--runner", metavar="NAME", help="Wine/Proton runner used by the bottle, e.g. 'soda-9.0-1'")
    upload_parser.add_argument("--windows-version", metavar="VERSION", help="Emulated Windows version, e.g. 'win10'")

    download_parser = subparsers.add_parser(
        "download",
        help="Download a raw archive file from the server",
        description="Download the compressed .tar.gz archive for a specific archive ID.\nPass the numeric ID shown by 'list'.",
        formatter_class=raw_help,
    )
    download_parser.add_argument("archive_id", type=int, metavar="ID", help="Numeric archive ID (see: cellar-cli list)")
    download_parser.add_argument("output", type=Path, metavar="DEST", help="Destination: a file path or an existing directory")

    install_parser = subparsers.add_parser(
        "install",
        help="Download and install a bottle directly into the local Bottles data directory",
        description=(
            "Fetch the archive that matches BOTTLE (matched against archive name or source\n"
            "bottle name) and extract it into the local Bottles directory so it appears\n"
            "immediately in the Bottles app."
        ),
        formatter_class=raw_help,
    )
    install_parser.add_argument("bottle", metavar="BOTTLE", help="Archive name or bottle name to search for on the server")
    add_bottles_dir(install_parser)
    install_parser.add_argument(
        "--replace",
        action="store_true",
        help="Overwrite the local bottle if a directory with the same name already exists",
    )

    scan_parser = subparsers.add_parser(
        "scan-local",
        help="List all Wine prefixes (bottles) found on this computer",
        description=(
            "Scan the local Bottles data directory and print every bottle found,\n"
            "including its architecture, runner, environment type, and Windows version."
        ),
        formatter_class=raw_help,
    )
    add_bottles_dir(scan_parser)
    scan_parser.add_argument("--json", action="store_true", help="Output the results as a JSON array instead of a table")

    wizard_parser = subparsers.add_parser(
        "wizard-upload",
        help="Interactive wizard: pick a local bottle, pack it, and upload it to the server",
        description=(
            "Guided upload flow:\n"
            " 1. Scan the local Bottles directory and show a numbered list.\n"
            " 2. Prompt you to select a bottle.\n"
            " 3. Ask for archive name, description, and tags (pre-filled with sensible defaults).\n"
            " 4. Create a compressed .tar.gz backup in a temporary directory.\n"
            " 5. Upload the archive to the server and report the assigned ID.\n\n"
            "The temporary archive file is deleted automatically after upload."
        ),
        formatter_class=raw_help,
    )
    add_bottles_dir(wizard_parser)

    wizard_install_parser = subparsers.add_parser(
        "wizard-install",
        help="Interactive wizard: pick a remote archive and install it locally",
        description=(
            "Guided install flow:\n"
            " 1. Fetch all archives available on the server and show a numbered list.\n"
            " 2. Prompt you to select one.\n"
            " 3. Download, extract, and install the bottle into the local Bottles directory.\n"
            " 4. If the bottle already exists, ask whether to replace it (or use --replace).\n\n"
            "The downloaded archive is removed automatically after extraction."
        ),
        formatter_class=raw_help,
    )
    add_bottles_dir(wizard_install_parser)
    wizard_install_parser.add_argument(
        "--replace",
        action="store_true",
        help="Overwrite the local bottle without prompting if it already exists",
    )

    return parser
|
|
|
|
|
def main() -> int:
    """Parse the command line, run the selected command, return an exit status.

    Expected failures are reported on stderr instead of as tracebacks.

    Returns:
        The command's own status code, 1 for a reported error, or 130 when the
        user aborts with Ctrl-C or end-of-input.
    """
    parser = build_parser()
    args = parser.parse_args()
    server = args.server.rstrip("/")

    # Each entry reads its own args lazily: the attributes only exist for the
    # sub-command that defines them.
    commands = {
        "list": lambda: print_archives(server),
        "upload": lambda: upload_archive(
            server,
            args.file,
            {
                "name": args.name,
                "bottle_name": args.bottle_name or "",
                "description": args.description or "",
                "tags": args.tags or "",
                "arch": args.arch or "",
                "runner": args.runner or "",
                "windows_version": args.windows_version or "",
            },
        ),
        "download": lambda: download_archive(server, args.archive_id, args.output),
        "install": lambda: install_archive(server, args.bottle, args.bottles_dir, args.replace),
        "scan-local": lambda: scan_local_bottles(args.bottles_dir, args.json),
        "wizard-upload": lambda: wizard_upload(server, args.bottles_dir),
        "wizard-install": lambda: wizard_install(server, args.bottles_dir, args.replace),
    }

    try:
        handler = commands.get(args.command)
        if handler is None:
            parser.error("Unknown command")  # raises SystemExit
            return 2
        return handler()
    # Handler order matters: HTTPError < URLError < OSError, and
    # FileNotFoundError < OSError, so the most specific classes come first.
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="ignore")
        print(f"HTTP {exc.code}: {detail or exc.reason}", file=sys.stderr)
        return 1
    except urllib.error.URLError as exc:
        print(f"Connection error: {exc.reason}", file=sys.stderr)
        return 1
    except FileNotFoundError as exc:
        print(f"File error: {exc}", file=sys.stderr)
        return 1
    except OSError as exc:
        # Permission problems, full disks and other I/O failures.
        print(f"OS error: {exc}", file=sys.stderr)
        return 1
    except (ValueError, tarfile.TarError) as exc:
        # Unsafe or corrupt archives and undecodable JSON responses.
        print(f"Error: {exc}", file=sys.stderr)
        return 1
    except (KeyboardInterrupt, EOFError):
        # The user left an interactive prompt with Ctrl-C / Ctrl-D.
        print("\nAborted.", file=sys.stderr)
        return 130
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())
|
|
|