You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

679 lines
25 KiB

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import configparser
import json
import shutil
import sys
import tarfile
import tempfile
import urllib.error
import urllib.parse
import urllib.request
from collections.abc import Callable
from datetime import datetime
from pathlib import Path
from typing import Any
DEFAULT_SERVER = "http://127.0.0.1:8080"
DEFAULT_BOTTLES_DIR = Path.home() / ".var/app/com.usebottles.bottles/data/bottles/bottles"
CONF_FILE = Path.home() / ".cellar.conf"
def load_config() -> dict[str, str]:
"""Read ~/.cellar.conf [cellar], creating it with defaults if absent.
Keys returned: 'server', 'bottles_dir'.
"""
cfg = configparser.ConfigParser()
if not CONF_FILE.exists():
cfg["cellar"] = {
"server": DEFAULT_SERVER,
"bottles_dir": str(DEFAULT_BOTTLES_DIR),
}
with CONF_FILE.open("w") as fh:
cfg.write(fh)
print(f"Created default config: {CONF_FILE}", file=sys.stderr)
else:
cfg.read(CONF_FILE)
section = cfg["cellar"] if "cellar" in cfg else {}
return {
"server": section.get("server", DEFAULT_SERVER).strip(),
"bottles_dir": section.get("bottles_dir", str(DEFAULT_BOTTLES_DIR)).strip(),
}
CHUNK_SIZE = 1024 * 1024
ProgressCallback = Callable[[str, float | None], None]
def request_json(url: str, method: str = "GET", data: bytes | None = None, headers: dict[str, str] | None = None):
req = urllib.request.Request(url, data=data, method=method)
for key, value in (headers or {}).items():
req.add_header(key, value)
with urllib.request.urlopen(req) as response:
return json.loads(response.read().decode("utf-8"))
def get_archives(server: str) -> list[dict[str, Any]]:
return request_json(f"{server}/archives")
def get_archive(server: str, archive_id: int) -> dict[str, Any]:
return request_json(f"{server}/archives/{archive_id}")
def notify_progress(progress_callback: ProgressCallback | None, message: str, fraction: float | None = None) -> None:
if progress_callback is not None:
progress_callback(message, fraction)
def print_archives(server: str) -> int:
archives = get_archives(server)
if not archives:
print("No archives found.")
return 0
print(f"{'ID':<4} {'Name':<30} {'Bottle':<30} {'Arch':<8} {'Runner':<18} {'Size(MB)':>10}")
print("-" * 110)
for item in archives:
size_mb = item["size_bytes"] / (1024 * 1024)
print(
f"{item['id']:<4} "
f"{(item['name'] or '-')[:30]:<30} "
f"{(item.get('bottle_name') or '-')[:30]:<30} "
f"{(item.get('arch') or '-'):<8} "
f"{(item.get('runner') or '-')[:18]:<18} "
f"{size_mb:>10.2f}"
)
return 0
def upload_archive_with_result(
server: str,
file_path: Path,
fields: dict[str, str],
progress_callback: ProgressCallback | None = None,
) -> dict[str, Any]:
boundary = "----BottleArchiveBoundary7MA4YWxkTrZu0gW"
data = []
notify_progress(progress_callback, "Preparing upload…", 0.0)
def add_field(field_name: str, value: str):
data.extend(
[
f"--{boundary}\r\n".encode(),
f'Content-Disposition: form-data; name="{field_name}"\r\n\r\n'.encode(),
value.encode(),
b"\r\n",
]
)
for key, value in fields.items():
if value:
add_field(key, value)
filename = file_path.name
mime = "application/gzip" if filename.endswith((".tar.gz", ".tgz", ".gz")) else "application/octet-stream"
notify_progress(progress_callback, f"Reading archive {filename}", 0.25)
data.extend(
[
f"--{boundary}\r\n".encode(),
f'Content-Disposition: form-data; name="file"; filename="{filename}"\r\n'.encode(),
f"Content-Type: {mime}\r\n\r\n".encode(),
file_path.read_bytes(),
b"\r\n",
f"--{boundary}--\r\n".encode(),
]
)
payload = b"".join(data)
notify_progress(progress_callback, "Uploading archive…", 0.7)
archive = request_json(
f"{server}/archives",
method="POST",
data=payload,
headers={"Content-Type": f"multipart/form-data; boundary={boundary}"},
)
notify_progress(progress_callback, f"Upload completed: {archive['name']}", 1.0)
return archive
def upload_archive(
server: str,
file_path: Path,
fields: dict[str, str],
progress_callback: ProgressCallback | None = None,
) -> int:
archive = upload_archive_with_result(
server,
file_path,
fields,
progress_callback=progress_callback,
)
print(f"Uploaded archive #{archive['id']}: {archive['name']}")
return 0
def download_archive(
server: str,
archive_id: int,
output: Path,
progress_callback: ProgressCallback | None = None,
) -> int:
url = f"{server}/archives/{archive_id}/download"
req = urllib.request.Request(url, method="GET")
notify_progress(progress_callback, "Preparing download…", 0.0)
with urllib.request.urlopen(req) as response:
if output.is_dir():
filename = response.headers.get_filename() or f"archive-{archive_id}.bin"
destination = output / filename
else:
destination = output
total_bytes = response.headers.get("Content-Length")
total = int(total_bytes) if total_bytes and total_bytes.isdigit() else None
received = 0
with destination.open("wb") as buffer:
while chunk := response.read(CHUNK_SIZE):
buffer.write(chunk)
received += len(chunk)
fraction = (received / total) if total else None
notify_progress(progress_callback, f"Downloading archive… {received / (1024 * 1024):.1f} MB", fraction)
notify_progress(progress_callback, f"Download completed: {destination}", 1.0)
if progress_callback is None:
print(f"Downloaded to {destination}")
return 0
def find_remote_archive(server: str, bottle_ref: str) -> dict[str, Any]:
archives = get_archives(server)
bottle_ref_lower = bottle_ref.lower()
for item in archives:
if (item.get("bottle_name") or "").lower() == bottle_ref_lower:
return item
for item in archives:
if (item.get("name") or "").lower() == bottle_ref_lower:
return item
raise FileNotFoundError(f"No remote archive found for '{bottle_ref}'")
def safe_extract_tar(archive_path: Path, target_dir: Path) -> None:
with tarfile.open(archive_path, "r:gz") as tar:
for member in tar.getmembers():
member_path = (target_dir / member.name).resolve()
if not str(member_path).startswith(str(target_dir.resolve())):
raise ValueError("Unsafe archive path detected.")
tar.extractall(target_dir, filter="fully_trusted")
def install_archive_from_metadata(
server: str,
archive: dict[str, Any],
bottles_dir: Path,
replace: bool,
progress_callback: ProgressCallback | None = None,
bottle_ref: str | None = None,
) -> int:
bottle_name = archive.get("bottle_name") or archive.get("name") or bottle_ref or "unknown-bottle"
target_dir = bottles_dir / bottle_name
if target_dir.exists():
if not replace:
notify_progress(progress_callback, f"Bottle already exists: {target_dir}", None)
print(
f"Bottle already exists: {target_dir}\n"
"Use --replace to overwrite it.",
file=sys.stderr,
)
return 1
notify_progress(progress_callback, f"Removing existing bottle: {target_dir}", None)
shutil.rmtree(target_dir)
bottles_dir.mkdir(parents=True, exist_ok=True)
with tempfile.TemporaryDirectory(prefix="bottle-install-") as tmp_dir:
tmp_path = Path(tmp_dir)
download_path = tmp_path / f"archive-{archive['id']}.tar.gz"
notify_progress(progress_callback, "Downloading archive…", 0.0)
download_archive(server, int(archive["id"]), download_path, progress_callback=progress_callback)
extract_dir = tmp_path / "extract"
extract_dir.mkdir(parents=True, exist_ok=True)
notify_progress(progress_callback, "Extracting archive…", None)
safe_extract_tar(download_path, extract_dir)
candidates = [path for path in extract_dir.iterdir() if path.is_dir()]
if not candidates:
raise FileNotFoundError("Archive did not contain a bottle directory.")
source_dir = candidates[0]
bottle_yml = source_dir / "bottle.yml"
if not bottle_yml.exists():
raise FileNotFoundError("Archive does not look like a valid bottle backup.")
notify_progress(progress_callback, f"Installing into {target_dir}", None)
shutil.move(str(source_dir), str(target_dir))
notify_progress(progress_callback, f"Installed bottle '{bottle_name}' to {target_dir}", 1.0)
if progress_callback is None:
print(f"Installed bottle '{bottle_name}' to {target_dir}")
return 0
def install_archive(
server: str,
bottle_ref: str,
bottles_dir: Path,
replace: bool,
progress_callback: ProgressCallback | None = None,
) -> int:
archive = find_remote_archive(server, bottle_ref)
return install_archive_from_metadata(
server,
archive,
bottles_dir,
replace,
progress_callback=progress_callback,
bottle_ref=bottle_ref,
)
def parse_bottle_yml(bottle_yml: Path) -> dict[str, str]:
data: dict[str, str] = {}
wanted_keys = {"Name", "Arch", "Runner", "Environment", "Windows"}
for line in bottle_yml.read_text(encoding="utf-8", errors="ignore").splitlines():
if ":" not in line or line.startswith(" "):
continue
key, value = line.split(":", 1)
if key in wanted_keys:
data[key] = value.strip().strip("'")
return data
def collect_local_bottles(bottles_dir: Path) -> list[dict[str, Any]]:
if not bottles_dir.exists():
return []
bottles: list[dict[str, Any]] = []
for directory in sorted(path for path in bottles_dir.iterdir() if path.is_dir()):
bottle_yml = directory / "bottle.yml"
if not bottle_yml.exists():
continue
metadata = parse_bottle_yml(bottle_yml)
bottles.append(
{
"name": metadata.get("Name", directory.name),
"directory": directory.name,
"path": str(directory),
"arch": metadata.get("Arch", "-"),
"runner": metadata.get("Runner", "-"),
"environment": metadata.get("Environment", "-"),
"windows": metadata.get("Windows", "-"),
}
)
return bottles
def scan_local_bottles(bottles_dir: Path, as_json: bool) -> int:
bottles = collect_local_bottles(bottles_dir)
if as_json:
print(json.dumps(bottles, indent=2))
return 0
if not bottles:
print(f"No local bottles found in {bottles_dir}")
return 0
print(f"Local Bottles directory: {bottles_dir}")
print(f"{'Dir':<24} {'Name':<28} {'Arch':<8} {'Runner':<18} {'Env':<12} {'Windows':<10}")
print("-" * 110)
for bottle in bottles:
print(
f"{bottle['directory'][:24]:<24} "
f"{bottle['name'][:28]:<28} "
f"{bottle['arch']:<8} "
f"{bottle['runner'][:18]:<18} "
f"{bottle['environment'][:12]:<12} "
f"{bottle['windows'][:10]:<10}"
)
return 0
def create_bottle_backup(bottle: dict[str, Any], output_dir: Path | None = None) -> Path:
source_dir = Path(bottle["path"])
target_dir = output_dir or Path(tempfile.gettempdir())
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
archive_name = f"{bottle['directory']}-{timestamp}.tar.gz"
archive_path = target_dir / archive_name
with tarfile.open(archive_path, "w:gz") as tar:
tar.add(source_dir, arcname=source_dir.name)
return archive_path
def prompt_text(label: str, default: str) -> str:
value = input(f"{label} [{default}]: ").strip()
return value or default
def choose_bottle(bottles: list[dict[str, Any]]) -> dict[str, Any]:
print("Available local bottles:")
for index, bottle in enumerate(bottles, start=1):
print(
f" {index}. {bottle['directory']} "
f"(arch={bottle['arch']}, runner={bottle['runner']}, windows={bottle['windows']})"
)
while True:
raw = input("Select a bottle number: ").strip()
try:
choice = int(raw)
except ValueError:
print("Please enter a valid number.")
continue
if 1 <= choice <= len(bottles):
return bottles[choice - 1]
print("Choice out of range.")
def wizard_upload(server: str, bottles_dir: Path) -> int:
bottles = collect_local_bottles(bottles_dir)
if not bottles:
print(f"No local bottles found in {bottles_dir}")
return 0
bottle = choose_bottle(bottles)
display_name = prompt_text("Archive name", bottle["name"])
bottle_name = prompt_text("Bottle name", bottle["directory"])
description = prompt_text("Description", f"Backup of {bottle['name']}")
tags = prompt_text("Tags", "bottles,backup")
print(f"\nCreating backup for {bottle['directory']}...")
archive_path = create_bottle_backup(bottle)
print(f"Backup created: {archive_path}")
try:
return upload_archive(
server,
archive_path,
{
"name": display_name,
"bottle_name": bottle_name,
"description": description,
"tags": tags,
"arch": str(bottle.get("arch", "")),
"runner": str(bottle.get("runner", "")),
"windows_version": str(bottle.get("windows", "")),
},
)
finally:
archive_path.unlink(missing_ok=True)
def wizard_install(server: str, bottles_dir: Path, replace: bool) -> int:
archives = get_archives(server)
if not archives:
print("No archives found on the server.")
return 0
print(f"{'#':<4} {'ID':<4} {'Name':<30} {'Bottle':<30} {'Arch':<8} {'Runner':<18} {'Size(MB)':>10}")
print("-" * 114)
for index, item in enumerate(archives, start=1):
size_mb = item["size_bytes"] / (1024 * 1024)
print(
f"{index:<4} "
f"{item['id']:<4} "
f"{(item['name'] or '-')[:30]:<30} "
f"{(item.get('bottle_name') or '-')[:30]:<30} "
f"{(item.get('arch') or '-'):<8} "
f"{(item.get('runner') or '-')[:18]:<18} "
f"{size_mb:>10.2f}"
)
while True:
raw = input("\nSelect an archive number: ").strip()
try:
choice = int(raw)
except ValueError:
print("Please enter a valid number.")
continue
if 1 <= choice <= len(archives):
archive = archives[choice - 1]
break
print("Choice out of range.")
if not replace:
bottle_name = archive.get("bottle_name") or archive.get("name") or "unknown"
target_dir = bottles_dir / bottle_name
if target_dir.exists():
ans = input(f"Bottle '{bottle_name}' already exists at {target_dir}. Replace? [y/N]: ").strip().lower()
replace = ans in ("y", "yes")
return install_archive_from_metadata(server, archive, bottles_dir, replace)
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog="cellar-cli",
description=(
"Cellar — command-line client for the Bottle Archive Server.\n\n"
"Manage backups of Wine prefixes (Bottles) hosted on a remote server.\n"
"You can list, upload, download, and install bottle archives, or run the\n"
"interactive wizard to pick a local bottle, pack it, and upload it in one step."
),
epilog=(
"examples:\n"
" %(prog)s list\n"
" %(prog)s --server http://brain.local:8080 list\n"
" %(prog)s --server http://brain.local:8080 wizard-upload\n"
" %(prog)s upload MyGame.tar.gz --name 'My Game' --tags 'gog,rpg'\n"
" %(prog)s download 3 ~/Downloads/\n"
" %(prog)s install 'My Game' --replace\n"
" %(prog)s scan-local --json\n"
" %(prog)s wizard-install\n"
),
formatter_class=argparse.RawDescriptionHelpFormatter,
)
conf = load_config()
default_server = conf["server"]
default_bottles_dir = Path(conf["bottles_dir"])
parser.add_argument(
"--server",
default=default_server,
metavar="URL",
help=(
f"Base URL of the Bottle Archive Server "
f"(default: {default_server}; override via {CONF_FILE})"
),
)
subparsers = parser.add_subparsers(dest="command", required=True, title="commands")
subparsers.add_parser(
"list",
help="List all archives stored on the server",
description="Fetch and display every bottle archive available on the server, including\nname, source bottle, architecture, runner, and compressed size.",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
upload_parser = subparsers.add_parser(
"upload",
help="Upload a pre-existing archive file to the server",
description=(
"Upload a .tar.gz bottle archive that you already created manually.\n"
"Use 'wizard-upload' instead to let the tool create the archive for you."
),
formatter_class=argparse.RawDescriptionHelpFormatter,
)
upload_parser.add_argument("file", type=Path, help="Path to the .tar.gz archive file to upload")
upload_parser.add_argument("--name", required=True, metavar="TEXT", help="Human-readable display name for the archive (required)")
upload_parser.add_argument("--bottle-name", metavar="TEXT", help="Internal Bottles directory name (defaults to --name)")
upload_parser.add_argument("--description", metavar="TEXT", help="Free-text description shown in the catalogue")
upload_parser.add_argument("--tags", metavar="TAG[,TAG…]", help="Comma-separated list of tags, e.g. 'gog,rpg,win32'")
upload_parser.add_argument("--arch", metavar="ARCH", help="Windows architecture target, e.g. 'win32' or 'win64'")
upload_parser.add_argument("--runner", metavar="NAME", help="Wine/Proton runner used by the bottle, e.g. 'soda-9.0-1'")
upload_parser.add_argument("--windows-version", metavar="VERSION", help="Emulated Windows version, e.g. 'win10'")
download_parser = subparsers.add_parser(
"download",
help="Download a raw archive file from the server",
description="Download the compressed .tar.gz archive for a specific archive ID.\nPass the numeric ID shown by 'list'.",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
download_parser.add_argument("archive_id", type=int, metavar="ID", help="Numeric archive ID (see: cellar-cli list)")
download_parser.add_argument("output", type=Path, metavar="DEST", help="Destination: a file path or an existing directory")
install_parser = subparsers.add_parser(
"install",
help="Download and install a bottle directly into the local Bottles data directory",
description=(
"Fetch the archive that matches BOTTLE (matched against archive name or source\n"
"bottle name) and extract it into the local Bottles directory so it appears\n"
"immediately in the Bottles app."
),
formatter_class=argparse.RawDescriptionHelpFormatter,
)
install_parser.add_argument("bottle", metavar="BOTTLE", help="Archive name or bottle name to search for on the server")
install_parser.add_argument(
"--bottles-dir",
type=Path,
default=default_bottles_dir,
metavar="DIR",
help=f"Local Bottles data directory (default: {default_bottles_dir}; override via {CONF_FILE})",
)
install_parser.add_argument(
"--replace",
action="store_true",
help="Overwrite the local bottle if a directory with the same name already exists",
)
scan_parser = subparsers.add_parser(
"scan-local",
help="List all Wine prefixes (bottles) found on this computer",
description=(
"Scan the local Bottles data directory and print every bottle found,\n"
"including its architecture, runner, environment type, and Windows version."
),
formatter_class=argparse.RawDescriptionHelpFormatter,
)
scan_parser.add_argument(
"--bottles-dir",
type=Path,
default=default_bottles_dir,
metavar="DIR",
help=f"Local Bottles data directory (default: {default_bottles_dir}; override via {CONF_FILE})",
)
scan_parser.add_argument("--json", action="store_true", help="Output the results as a JSON array instead of a table")
wizard_parser = subparsers.add_parser(
"wizard-upload",
help="Interactive wizard: pick a local bottle, pack it, and upload it to the server",
description=(
"Guided upload flow:\n"
" 1. Scan the local Bottles directory and show a numbered list.\n"
" 2. Prompt you to select a bottle.\n"
" 3. Ask for archive name, description, and tags (pre-filled with sensible defaults).\n"
" 4. Create a compressed .tar.gz backup in a temporary directory.\n"
" 5. Upload the archive to the server and report the assigned ID.\n\n"
"The temporary archive file is deleted automatically after upload."
),
formatter_class=argparse.RawDescriptionHelpFormatter,
)
wizard_parser.add_argument(
"--bottles-dir",
type=Path,
default=default_bottles_dir,
metavar="DIR",
help=f"Local Bottles data directory (default: {default_bottles_dir}; override via {CONF_FILE})",
)
wizard_install_parser = subparsers.add_parser(
"wizard-install",
help="Interactive wizard: pick a remote archive and install it locally",
description=(
"Guided install flow:\n"
" 1. Fetch all archives available on the server and show a numbered list.\n"
" 2. Prompt you to select one.\n"
" 3. Download, extract, and install the bottle into the local Bottles directory.\n"
" 4. If the bottle already exists, ask whether to replace it (or use --replace).\n\n"
"The downloaded archive is removed automatically after extraction."
),
formatter_class=argparse.RawDescriptionHelpFormatter,
)
wizard_install_parser.add_argument(
"--bottles-dir",
type=Path,
default=default_bottles_dir,
metavar="DIR",
help=f"Local Bottles data directory (default: {default_bottles_dir}; override via {CONF_FILE})",
)
wizard_install_parser.add_argument(
"--replace",
action="store_true",
help="Overwrite the local bottle without prompting if it already exists",
)
return parser
def main() -> int:
parser = build_parser()
args = parser.parse_args()
server = args.server.rstrip("/")
try:
if args.command == "list":
return print_archives(server)
if args.command == "upload":
return upload_archive(
server,
args.file,
{
"name": args.name,
"bottle_name": args.bottle_name or "",
"description": args.description or "",
"tags": args.tags or "",
"arch": args.arch or "",
"runner": args.runner or "",
"windows_version": args.windows_version or "",
},
)
if args.command == "download":
return download_archive(server, args.archive_id, args.output)
if args.command == "install":
return install_archive(server, args.bottle, args.bottles_dir, args.replace)
if args.command == "scan-local":
return scan_local_bottles(args.bottles_dir, args.json)
if args.command == "wizard-upload":
return wizard_upload(server, args.bottles_dir)
if args.command == "wizard-install":
return wizard_install(server, args.bottles_dir, args.replace)
parser.error("Unknown command")
return 2
except urllib.error.HTTPError as exc:
detail = exc.read().decode("utf-8", errors="ignore")
print(f"HTTP {exc.code}: {detail or exc.reason}", file=sys.stderr)
return 1
except urllib.error.URLError as exc:
print(f"Connection error: {exc.reason}", file=sys.stderr)
return 1
except FileNotFoundError as exc:
print(f"File error: {exc}", file=sys.stderr)
return 1
if __name__ == "__main__":
raise SystemExit(main())