Browse Source

feat: add GTK4 desktop client for browsing and installing archived bottles

master
Matteo Benedetto 2 weeks ago
parent
commit
b8c319d034
  1. 36
      README.md
  2. 113
      client.py
  3. 587
      gtk_client.py

36
README.md

@ -12,6 +12,7 @@ The project currently provides:
- a local `SQLite` database for archive metadata
- local file storage for uploaded bottle backups
- a minimal Python CLI client
- a small `GTK4` desktop client for browsing and installing archived bottles
- a `Docker Compose` setup for running the server quickly
The current goal is simple: make it possible to back up a local Bottles bottle, upload it to a Cellar server, list what is stored, and reinstall a bottle by name.
@ -33,6 +34,7 @@ Implemented features:
- interactive backup + upload wizard from the client
- install a bottle from the server with a simple command:
- `python client.py install <bottle-name>`
- browse server archives from a GTK4 window and install them with visual feedback
Not implemented yet:
@ -53,6 +55,7 @@ Not implemented yet:
- [docker-compose.yml](docker-compose.yml) — local deployment
- [requirements.txt](requirements.txt) — Python dependencies
- [client.py](client.py) — minimal CLI client
- [gtk_client.py](gtk_client.py) — GTK4 desktop client
- [app/main.py](app/main.py) — API routes
- [app/models.py](app/models.py) — SQLAlchemy model
- [app/schemas.py](app/schemas.py) — API response schemas
@ -102,6 +105,15 @@ It can:
- run a wizard to choose a local bottle, create a backup, and upload it
- install a remote bottle by name
The GTK desktop client builds on the same install flow, but exposes it through a small Linux GUI:
- refresh the archive list from the server
- filter archives locally with a quick search field
- see archive metadata at a glance
- show when a bottle is already installed locally
- download and install a bottle locally with progress feedback
- open a separate configuration window to change the server URL and local Bottles path
### Storage format
The current client creates a `.tar.gz` backup of the selected bottle directory.
@ -239,6 +251,30 @@ Use a custom Bottles directory:
`python client.py install Empire-Earth-Gold --bottles-dir /custom/path`
### GTK4 desktop client
Run:
`python gtk_client.py`
The GUI lets you:
- list archives available on the server
- filter archives by name, bottle, runner, tags, and description
- immediately see whether a bottle is already installed on the local system
- download and install a selected bottle locally
- enable overwrite of existing local bottles
- open a separate configuration window to set the server address
The GTK client stores its local settings in:
- `~/.config/cellar/gtk_client.json`
On Linux you need GTK4 bindings available in the system Python. Typical packages are:
- Fedora: `python3-gobject` and `gtk4`
- Debian/Ubuntu: `python3-gi` and `gir1.2-gtk-4.0`
---
## Example workflow

113
client.py

@ -10,12 +10,15 @@ import tempfile
import urllib.error
import urllib.parse
import urllib.request
from collections.abc import Callable
from datetime import datetime
from pathlib import Path
from typing import Any
DEFAULT_SERVER = "http://127.0.0.1:8080"
DEFAULT_BOTTLES_DIR = Path.home() / ".var/app/com.usebottles.bottles/data/bottles/bottles"
CHUNK_SIZE = 1024 * 1024
ProgressCallback = Callable[[str, float | None], None]
def request_json(url: str, method: str = "GET", data: bytes | None = None, headers: dict[str, str] | None = None):
@ -26,8 +29,21 @@ def request_json(url: str, method: str = "GET", data: bytes | None = None, heade
return json.loads(response.read().decode("utf-8"))
def get_archives(server: str) -> list[dict[str, Any]]:
return request_json(f"{server}/archives")
def get_archive(server: str, archive_id: int) -> dict[str, Any]:
return request_json(f"{server}/archives/{archive_id}")
def notify_progress(progress_callback: ProgressCallback | None, message: str, fraction: float | None = None) -> None:
if progress_callback is not None:
progress_callback(message, fraction)
def print_archives(server: str) -> int:
archives = request_json(f"{server}/archives")
archives = get_archives(server)
if not archives:
print("No archives found.")
return 0
@ -47,10 +63,17 @@ def print_archives(server: str) -> int:
return 0
def upload_archive(server: str, file_path: Path, fields: dict[str, str]) -> int:
def upload_archive_with_result(
server: str,
file_path: Path,
fields: dict[str, str],
progress_callback: ProgressCallback | None = None,
) -> dict[str, Any]:
boundary = "----BottleArchiveBoundary7MA4YWxkTrZu0gW"
data = []
notify_progress(progress_callback, "Preparing upload…", 0.0)
def add_field(field_name: str, value: str):
data.extend(
[
@ -67,6 +90,7 @@ def upload_archive(server: str, file_path: Path, fields: dict[str, str]) -> int:
filename = file_path.name
mime = "application/gzip" if filename.endswith((".tar.gz", ".tgz", ".gz")) else "application/octet-stream"
notify_progress(progress_callback, f"Reading archive {filename}", 0.25)
data.extend(
[
f"--{boundary}\r\n".encode(),
@ -79,32 +103,68 @@ def upload_archive(server: str, file_path: Path, fields: dict[str, str]) -> int:
)
payload = b"".join(data)
notify_progress(progress_callback, "Uploading archive…", 0.7)
archive = request_json(
f"{server}/archives",
method="POST",
data=payload,
headers={"Content-Type": f"multipart/form-data; boundary={boundary}"},
)
notify_progress(progress_callback, f"Upload completed: {archive['name']}", 1.0)
return archive
def upload_archive(
server: str,
file_path: Path,
fields: dict[str, str],
progress_callback: ProgressCallback | None = None,
) -> int:
archive = upload_archive_with_result(
server,
file_path,
fields,
progress_callback=progress_callback,
)
print(f"Uploaded archive #{archive['id']}: {archive['name']}")
return 0
def download_archive(server: str, archive_id: int, output: Path) -> int:
def download_archive(
server: str,
archive_id: int,
output: Path,
progress_callback: ProgressCallback | None = None,
) -> int:
url = f"{server}/archives/{archive_id}/download"
req = urllib.request.Request(url, method="GET")
notify_progress(progress_callback, "Preparing download…", 0.0)
with urllib.request.urlopen(req) as response:
if output.is_dir():
filename = response.headers.get_filename() or f"archive-{archive_id}.bin"
destination = output / filename
else:
destination = output
destination.write_bytes(response.read())
print(f"Downloaded to {destination}")
total_bytes = response.headers.get("Content-Length")
total = int(total_bytes) if total_bytes and total_bytes.isdigit() else None
received = 0
with destination.open("wb") as buffer:
while chunk := response.read(CHUNK_SIZE):
buffer.write(chunk)
received += len(chunk)
fraction = (received / total) if total else None
notify_progress(progress_callback, f"Downloading archive… {received / (1024 * 1024):.1f} MB", fraction)
notify_progress(progress_callback, f"Download completed: {destination}", 1.0)
if progress_callback is None:
print(f"Downloaded to {destination}")
return 0
def find_remote_archive(server: str, bottle_ref: str) -> dict[str, Any]:
archives = request_json(f"{server}/archives")
archives = get_archives(server)
bottle_ref_lower = bottle_ref.lower()
for item in archives:
@ -126,19 +186,27 @@ def safe_extract_tar(archive_path: Path, target_dir: Path) -> None:
tar.extractall(target_dir, filter="fully_trusted")
def install_archive(server: str, bottle_ref: str, bottles_dir: Path, replace: bool) -> int:
archive = find_remote_archive(server, bottle_ref)
bottle_name = archive.get("bottle_name") or archive.get("name") or bottle_ref
def install_archive_from_metadata(
server: str,
archive: dict[str, Any],
bottles_dir: Path,
replace: bool,
progress_callback: ProgressCallback | None = None,
bottle_ref: str | None = None,
) -> int:
bottle_name = archive.get("bottle_name") or archive.get("name") or bottle_ref or "unknown-bottle"
target_dir = bottles_dir / bottle_name
if target_dir.exists():
if not replace:
notify_progress(progress_callback, f"Bottle already exists: {target_dir}", None)
print(
f"Bottle already exists: {target_dir}\n"
"Use --replace to overwrite it.",
file=sys.stderr,
)
return 1
notify_progress(progress_callback, f"Removing existing bottle: {target_dir}", None)
shutil.rmtree(target_dir)
bottles_dir.mkdir(parents=True, exist_ok=True)
@ -146,10 +214,12 @@ def install_archive(server: str, bottle_ref: str, bottles_dir: Path, replace: bo
with tempfile.TemporaryDirectory(prefix="bottle-install-") as tmp_dir:
tmp_path = Path(tmp_dir)
download_path = tmp_path / f"archive-{archive['id']}.tar.gz"
download_archive(server, int(archive["id"]), download_path)
notify_progress(progress_callback, "Downloading archive…", 0.0)
download_archive(server, int(archive["id"]), download_path, progress_callback=progress_callback)
extract_dir = tmp_path / "extract"
extract_dir.mkdir(parents=True, exist_ok=True)
notify_progress(progress_callback, "Extracting archive…", None)
safe_extract_tar(download_path, extract_dir)
candidates = [path for path in extract_dir.iterdir() if path.is_dir()]
@ -161,12 +231,33 @@ def install_archive(server: str, bottle_ref: str, bottles_dir: Path, replace: bo
if not bottle_yml.exists():
raise FileNotFoundError("Archive does not look like a valid bottle backup.")
notify_progress(progress_callback, f"Installing into {target_dir}", None)
shutil.move(str(source_dir), str(target_dir))
print(f"Installed bottle '{bottle_name}' to {target_dir}")
notify_progress(progress_callback, f"Installed bottle '{bottle_name}' to {target_dir}", 1.0)
if progress_callback is None:
print(f"Installed bottle '{bottle_name}' to {target_dir}")
return 0
def install_archive(
server: str,
bottle_ref: str,
bottles_dir: Path,
replace: bool,
progress_callback: ProgressCallback | None = None,
) -> int:
archive = find_remote_archive(server, bottle_ref)
return install_archive_from_metadata(
server,
archive,
bottles_dir,
replace,
progress_callback=progress_callback,
bottle_ref=bottle_ref,
)
def parse_bottle_yml(bottle_yml: Path) -> dict[str, str]:
data: dict[str, str] = {}
wanted_keys = {"Name", "Arch", "Runner", "Environment", "Windows"}

587
gtk_client.py

@ -0,0 +1,587 @@
#!/usr/bin/env python3
from __future__ import annotations
import json
import threading
import urllib.error
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any, Callable
try:
import gi # type: ignore[import-not-found]
except ImportError as exc:
raise SystemExit(
"PyGObject / GTK4 non trovato. Installa i pacchetti di sistema 'python3-gobject' e 'gtk4' "
"oppure l'equivalente della tua distribuzione."
) from exc
from client import (
DEFAULT_BOTTLES_DIR,
DEFAULT_SERVER,
collect_local_bottles,
get_archives,
install_archive_from_metadata,
)
gi.require_version("Gtk", "4.0")
from gi.repository import GLib, Gtk # type: ignore[import-not-found]
CONFIG_PATH = Path.home() / ".config" / "cellar" / "gtk_client.json"
@dataclass
class AppConfig:
server_url: str = DEFAULT_SERVER
bottles_dir: str = str(DEFAULT_BOTTLES_DIR)
replace_existing: bool = False
def load_config() -> AppConfig:
if not CONFIG_PATH.exists():
return AppConfig()
try:
raw = json.loads(CONFIG_PATH.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
return AppConfig()
return AppConfig(
server_url=str(raw.get("server_url") or DEFAULT_SERVER).rstrip("/"),
bottles_dir=str(raw.get("bottles_dir") or DEFAULT_BOTTLES_DIR),
replace_existing=bool(raw.get("replace_existing", False)),
)
def save_config(config: AppConfig) -> None:
CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
CONFIG_PATH.write_text(json.dumps(asdict(config), indent=2), encoding="utf-8")
def format_size(size_bytes: int | None) -> str:
if not size_bytes:
return "-"
size = float(size_bytes)
units = ["B", "KB", "MB", "GB"]
for unit in units:
if size < 1024 or unit == units[-1]:
return f"{size:.1f} {unit}"
size /= 1024
return f"{size_bytes} B"
def archive_matches_query(archive: dict[str, Any], query: str) -> bool:
normalized = query.strip().lower()
if not normalized:
return True
searchable_fields = (
archive.get("name"),
archive.get("bottle_name"),
archive.get("description"),
archive.get("tags"),
archive.get("runner"),
archive.get("arch"),
archive.get("windows_version"),
)
haystack = " ".join(str(value or "") for value in searchable_fields).lower()
return normalized in haystack
def normalize_name(value: str | None) -> str:
return (value or "").strip().lower()
class SettingsWindow(Gtk.Window):
def __init__(self, parent: "CellarWindow", config: AppConfig, on_save: Callable[[AppConfig], None]):
super().__init__(title="Configurazione")
self.parent = parent
self.on_save = on_save
self.set_transient_for(parent)
self.set_modal(True)
self.set_default_size(520, 180)
root = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=12)
root.set_margin_top(16)
root.set_margin_bottom(16)
root.set_margin_start(16)
root.set_margin_end(16)
description = Gtk.Label(
label="Imposta il server Cellar e la cartella locale di Bottles usata per installazione e upload.",
wrap=True,
xalign=0,
)
root.append(description)
grid = Gtk.Grid(column_spacing=12, row_spacing=12)
server_label = Gtk.Label(label="Server", xalign=0)
self.server_entry = Gtk.Entry(text=config.server_url)
self.server_entry.set_hexpand(True)
grid.attach(server_label, 0, 0, 1, 1)
grid.attach(self.server_entry, 1, 0, 1, 1)
bottles_label = Gtk.Label(label="Cartella Bottles", xalign=0)
self.bottles_entry = Gtk.Entry(text=config.bottles_dir)
self.bottles_entry.set_hexpand(True)
grid.attach(bottles_label, 0, 1, 1, 1)
grid.attach(self.bottles_entry, 1, 1, 1, 1)
root.append(grid)
button_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=12)
button_box.set_halign(Gtk.Align.END)
cancel_button = Gtk.Button(label="Annulla")
cancel_button.connect("clicked", lambda *_: self.close())
save_button = Gtk.Button(label="Salva")
save_button.add_css_class("suggested-action")
save_button.connect("clicked", self._save)
button_box.append(cancel_button)
button_box.append(save_button)
root.append(button_box)
self.set_child(root)
def _save(self, *_args: object) -> None:
server_url = self.server_entry.get_text().strip().rstrip("/")
bottles_dir = self.bottles_entry.get_text().strip()
if not server_url:
self.parent.show_message("L'indirizzo del server non può essere vuoto.")
return
config = AppConfig(
server_url=server_url,
bottles_dir=str(Path(bottles_dir).expanduser()) if bottles_dir else str(DEFAULT_BOTTLES_DIR),
replace_existing=self.parent.replace_switch.get_active(),
)
save_config(config)
self.on_save(config)
self.close()
class ArchiveRow(Gtk.ListBoxRow):
def __init__(
self,
archive: dict[str, Any],
on_install: Callable[[dict[str, Any]], None],
is_installed: bool,
replace_enabled: bool,
):
super().__init__()
self.archive = archive
root = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=12)
root.set_margin_top(10)
root.set_margin_bottom(10)
root.set_margin_start(12)
root.set_margin_end(12)
text_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=4)
text_box.set_hexpand(True)
title = archive.get("name") or archive.get("bottle_name") or "Archivio senza nome"
bottle_name = archive.get("bottle_name") or "-"
runner = archive.get("runner") or "-"
arch = archive.get("arch") or "-"
windows_version = archive.get("windows_version") or "-"
created_at = archive.get("created_at") or "-"
tags = archive.get("tags") or "-"
description = archive.get("description") or ""
size = format_size(archive.get("size_bytes"))
title_label = Gtk.Label(xalign=0)
title_label.set_markup(f"<b>{GLib.markup_escape_text(title)}</b>")
details_text = (
f"Bottle: {bottle_name} • Runner: {runner} • Arch: {arch}"
f"Windows: {windows_version} • Size: {size} • Tag: {tags} • Creato: {created_at}"
)
if description:
details_text = f"{details_text}\n{description}"
details_label = Gtk.Label(
xalign=0,
wrap=True,
selectable=True,
label=details_text,
)
details_label.add_css_class("dim-label")
status_label = Gtk.Label(xalign=0)
if is_installed:
status_label.set_markup("<span foreground='#2e7d32'><b>Già installata sul sistema</b></span>")
else:
status_label.set_markup("<span foreground='#666666'>Non installata localmente</span>")
text_box.append(title_label)
text_box.append(details_label)
text_box.append(status_label)
button_label = "Scarica e installa"
if is_installed and replace_enabled:
button_label = "Reinstalla"
elif is_installed:
button_label = "Già installata"
self.install_button = Gtk.Button(label=button_label)
if not is_installed or replace_enabled:
self.install_button.add_css_class("suggested-action")
self.install_button.set_valign(Gtk.Align.CENTER)
self.install_button.connect("clicked", lambda *_: on_install(self.archive))
self.install_button.set_sensitive((not is_installed) or replace_enabled)
root.append(text_box)
root.append(self.install_button)
self.set_child(root)
def set_sensitive_state(self, enabled: bool) -> None:
self.install_button.set_sensitive(enabled)
class CellarWindow(Gtk.ApplicationWindow):
def __init__(self, app: Gtk.Application):
super().__init__(application=app, title="Cellar GTK")
self.config = load_config()
self.settings_window: SettingsWindow | None = None
self.rows: list[ArchiveRow] = []
self.all_archives: list[dict[str, Any]] = []
self.local_bottle_names: set[str] = set()
self.busy = False
self.set_default_size(980, 700)
root = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=12)
root.set_margin_top(12)
root.set_margin_bottom(12)
root.set_margin_start(12)
root.set_margin_end(12)
header = Gtk.HeaderBar()
header.set_title_widget(Gtk.Label(label="Cellar"))
self.refresh_button = Gtk.Button(label="Aggiorna")
self.refresh_button.connect("clicked", lambda *_: self.refresh_archives())
header.pack_start(self.refresh_button)
self.settings_button = Gtk.Button(label="Configurazione")
self.settings_button.connect("clicked", lambda *_: self.open_settings())
header.pack_end(self.settings_button)
self.set_titlebar(header)
info_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6)
info_box.add_css_class("card")
info_box.set_margin_bottom(6)
info_box.set_margin_top(6)
info_box.set_margin_start(6)
info_box.set_margin_end(6)
self.server_label = Gtk.Label(xalign=0, selectable=True)
self.bottles_label = Gtk.Label(xalign=0, selectable=True)
self.replace_switch = Gtk.Switch(active=self.config.replace_existing)
self.replace_switch.connect("notify::active", self._store_replace_preference)
replace_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=12)
replace_box.append(Gtk.Label(label="Sostituisci installazioni esistenti", xalign=0))
replace_box.append(self.replace_switch)
info_box.append(self.server_label)
info_box.append(self.bottles_label)
info_box.append(replace_box)
search_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=12)
search_box.append(Gtk.Label(label="Cerca", xalign=0))
self.search_entry = Gtk.SearchEntry()
self.search_entry.set_hexpand(True)
self.search_entry.set_placeholder_text("Nome, bottle, tag, runner, descrizione…")
self.search_entry.connect("search-changed", self._on_search_changed)
search_box.append(self.search_entry)
clear_search_button = Gtk.Button(label="Pulisci")
clear_search_button.connect("clicked", lambda *_: self.search_entry.set_text(""))
search_box.append(clear_search_button)
self.summary_label = Gtk.Label(xalign=0)
self.status_label = Gtk.Label(xalign=0, wrap=True)
self.progress_bar = Gtk.ProgressBar()
self.progress_bar.set_hexpand(True)
self.progress_bar.set_show_text(False)
self.spinner = Gtk.Spinner()
status_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=12)
status_box.append(self.spinner)
status_box.append(self.progress_bar)
root.append(info_box)
root.append(search_box)
root.append(self.summary_label)
root.append(self.status_label)
root.append(status_box)
self.placeholder_label = Gtk.Label(
label="Nessun archivio disponibile. Premi 'Aggiorna' per leggere il catalogo dal server.",
wrap=True,
justify=Gtk.Justification.CENTER,
)
self.placeholder_label.set_vexpand(True)
self.listbox = Gtk.ListBox()
self.listbox.set_selection_mode(Gtk.SelectionMode.NONE)
self.listbox.add_css_class("boxed-list")
scroller = Gtk.ScrolledWindow()
scroller.set_vexpand(True)
scroller.set_hexpand(True)
scroller.set_child(self.listbox)
self.content_stack = Gtk.Stack()
self.content_stack.add_named(self.placeholder_label, "placeholder")
self.content_stack.add_named(scroller, "list")
root.append(self.content_stack)
self.set_child(root)
self.update_info_labels()
self.refresh_local_bottles()
self.set_status("Pronto.", 0.0)
self.update_summary(0, 0)
self.refresh_archives()
def _store_replace_preference(self, *_args: object) -> None:
self.config.replace_existing = self.replace_switch.get_active()
save_config(self.config)
self.apply_archive_filter()
def update_info_labels(self) -> None:
self.server_label.set_text(f"Server: {self.config.server_url}")
self.bottles_label.set_text(f"Cartella locale: {self.config.bottles_dir}")
self.replace_switch.set_active(self.config.replace_existing)
def update_summary(self, visible_count: int, total_count: int) -> None:
if total_count == 0:
self.summary_label.set_text("Nessun archivio caricato.")
return
if visible_count == total_count:
self.summary_label.set_text(f"Archivi visibili: {visible_count}")
return
self.summary_label.set_text(f"Archivi visibili: {visible_count} su {total_count}")
def open_settings(self) -> None:
if self.settings_window is not None:
self.settings_window.present()
return
self.settings_window = SettingsWindow(self, self.config, self.apply_config)
self.settings_window.connect("close-request", self._on_settings_closed)
self.settings_window.present()
def _on_settings_closed(self, window: Gtk.Window) -> bool:
if window is self.settings_window:
self.settings_window = None
return False
def apply_config(self, config: AppConfig) -> None:
self.config = config
self.update_info_labels()
self.refresh_local_bottles()
self.refresh_archives()
def refresh_local_bottles(self) -> None:
bottles_dir = Path(self.config.bottles_dir).expanduser()
local_bottles = collect_local_bottles(bottles_dir)
known_names: set[str] = set()
for bottle in local_bottles:
known_names.add(normalize_name(bottle.get("directory")))
known_names.add(normalize_name(bottle.get("name")))
self.local_bottle_names = {name for name in known_names if name}
def is_archive_installed(self, archive: dict[str, Any]) -> bool:
candidates = {
normalize_name(archive.get("bottle_name")),
normalize_name(archive.get("name")),
}
candidates.discard("")
return any(candidate in self.local_bottle_names for candidate in candidates)
def set_busy(self, busy: bool) -> None:
self.busy = busy
self.refresh_button.set_sensitive(not busy)
self.settings_button.set_sensitive(not busy)
self.search_entry.set_sensitive(not busy)
for row in self.rows:
row.set_sensitive_state(not busy)
if busy:
self.spinner.start()
else:
self.spinner.stop()
def set_status(self, message: str, fraction: float | None = None) -> None:
self.status_label.set_text(message)
if fraction is None:
self.progress_bar.pulse()
else:
self.progress_bar.set_fraction(max(0.0, min(1.0, fraction)))
def show_message(self, message: str, title: str = "Cellar") -> None:
dialog = Gtk.MessageDialog(
transient_for=self,
modal=True,
buttons=Gtk.ButtonsType.OK,
text=title,
secondary_text=message,
)
dialog.connect("response", lambda d, *_: d.close())
dialog.present()
def clear_rows(self) -> None:
self.rows.clear()
child = self.listbox.get_first_child()
while child is not None:
next_child = child.get_next_sibling()
self.listbox.remove(child)
child = next_child
def render_archives(self, archives: list[dict[str, Any]]) -> None:
self.clear_rows()
replace_enabled = self.replace_switch.get_active()
for archive in archives:
row = ArchiveRow(
archive,
self.install_archive,
is_installed=self.is_archive_installed(archive),
replace_enabled=replace_enabled,
)
row.set_sensitive_state(not self.busy)
self.rows.append(row)
self.listbox.append(row)
if self.all_archives and not archives:
self.placeholder_label.set_text("Nessun archivio corrisponde al filtro corrente.")
elif not self.all_archives:
self.placeholder_label.set_text("Nessun archivio disponibile. Premi 'Aggiorna' per leggere il catalogo dal server.")
self.content_stack.set_visible_child_name("list" if archives else "placeholder")
self.update_summary(len(archives), len(self.all_archives))
def apply_archive_filter(self) -> None:
query = self.search_entry.get_text()
filtered = [archive for archive in self.all_archives if archive_matches_query(archive, query)]
self.render_archives(filtered)
def _on_search_changed(self, *_args: object) -> None:
self.apply_archive_filter()
def run_background(self, worker: Callable[[], None]) -> None:
thread = threading.Thread(target=worker, daemon=True)
thread.start()
def refresh_archives(self) -> None:
if self.busy:
return
self.set_busy(True)
self.set_status("Caricamento catalogo dal server…", None)
def worker() -> None:
try:
archives = get_archives(self.config.server_url)
except Exception as exc:
GLib.idle_add(self._handle_error, exc)
return
GLib.idle_add(self._finish_refresh, archives)
self.run_background(worker)
def _finish_refresh(self, archives: list[dict[str, Any]]) -> bool:
self.all_archives = archives
self.refresh_local_bottles()
self.apply_archive_filter()
message = f"Trovati {len(archives)} archivi sul server." if archives else "Nessun archivio trovato sul server."
self.set_status(message, 1.0)
self.set_busy(False)
return False
def install_archive(self, archive: dict[str, Any]) -> None:
if self.busy:
return
archive_name = archive.get("name") or archive.get("bottle_name") or f"#{archive.get('id', '?')}"
bottles_dir = Path(self.config.bottles_dir).expanduser()
replace_existing = self.replace_switch.get_active()
self.set_busy(True)
self.set_status(f"Avvio installazione di {archive_name}", 0.0)
def progress(message: str, fraction: float | None) -> None:
GLib.idle_add(self.set_status, message, fraction)
def worker() -> None:
try:
result = install_archive_from_metadata(
self.config.server_url,
archive,
bottles_dir,
replace_existing,
progress_callback=progress,
)
GLib.idle_add(self._finish_install, archive_name, result)
except Exception as exc:
GLib.idle_add(self._handle_error, exc)
self.run_background(worker)
def _finish_install(self, archive_name: str, result: int) -> bool:
self.set_busy(False)
if result == 0:
self.refresh_local_bottles()
self.apply_archive_filter()
self.set_status(f"Installazione completata: {archive_name}", 1.0)
self.show_message(f"Installazione completata con successo: {archive_name}")
else:
self.set_status(f"Installazione non completata: {archive_name}", 0.0)
self.show_message(
"L'installazione non è stata completata. Controlla se la bottle esiste già oppure abilita la sostituzione.",
title="Installazione interrotta",
)
return False
def _handle_error(self, exc: Exception) -> bool:
self.set_busy(False)
self.set_status("Operazione non riuscita.", 0.0)
if isinstance(exc, urllib.error.HTTPError):
detail = exc.read().decode("utf-8", errors="ignore")
message = f"HTTP {exc.code}: {detail or exc.reason}"
elif isinstance(exc, urllib.error.URLError):
message = f"Errore di connessione: {exc.reason}"
else:
message = str(exc)
self.show_message(message, title="Errore")
return False
class CellarApplication(Gtk.Application):
def __init__(self) -> None:
super().__init__(application_id="com.cellar.gtkclient")
def do_activate(self) -> None:
window = self.props.active_window
if window is None:
window = CellarWindow(self)
window.present()
def main() -> int:
app = CellarApplication()
return app.run(None)
if __name__ == "__main__":
raise SystemExit(main())
Loading…
Cancel
Save