Rename project to mcp-hal9002

main
Matteo Benedetto 3 months ago
commit
2aec9186a2
  1. 6
      .gitignore
  2. 152
      README.md
  3. 18
      assets/mcp-hal9002-logo.svg
  4. 28
      pyproject.toml
  5. 7
      src/gnome_vte_mcp/__init__.py
  6. 72
      src/gnome_vte_mcp/control.py
  7. 865
      src/gnome_vte_mcp/gui.py
  8. 140
      src/gnome_vte_mcp/server.py

6
.gitignore vendored

@ -0,0 +1,6 @@
.venv/
__pycache__/
*.py[cod]
*.egg-info/
build/
dist/

152
README.md

@ -0,0 +1,152 @@
# mcp-hal9002
![mcp-hal9002 logo](assets/mcp-hal9002-logo.svg)
`mcp-hal9002` is a GTK4 + VTE terminal workspace that can be controlled via MCP, with a local GUI, a Unix-socket control plane, and tools for screenshots and UI debugging.
The final name of the project, the distributable package, and the CLIs matches the root folder: `mcp-hal9002`.
## Goal
The project avoids depending on unofficial GNOME Terminal APIs. Instead, it exposes its own GUI directly, which supports:
- opening tabs
- listing tabs
- focusing a tab
- sending commands to an existing shell
- reading a tab's scrollback
- taking screenshots of the window, of the tab's VTE content, or of the tab container for UI/UX debugging
- closing tabs
## Architecture
- `src/gnome_vte_mcp/gui.py`
  GTK4/VTE application with a local control channel over a Unix socket
- `src/gnome_vte_mcp/server.py`
  MCP server based on `FastMCP` that talks to the GUI
- `src/gnome_vte_mcp/control.py`
  client protocol for the local socket
The MCP server launches the GUI automatically if it is not already running.
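
As a rough illustration of the control plane, the sketch below builds and parses the newline-delimited JSON messages that `control.py` and `gui.py` exchange in this commit. The helper names `encode_request` and `decode_response` are hypothetical and not part of the package; the message fields (`id`, `method`, `params`, `ok`, `result`, `error`) follow the code in this commit.

```python
import json
import uuid
from typing import Any, Optional


def encode_request(method: str, params: Optional[dict[str, Any]] = None) -> bytes:
    """Serialize one control-plane request as a single JSON line (hypothetical helper)."""
    message = {"id": str(uuid.uuid4()), "method": method, "params": params or {}}
    return (json.dumps(message) + "\n").encode("utf-8")


def decode_response(raw: bytes) -> Any:
    """Parse the first JSON line of a reply and raise if the GUI reported an error."""
    response = json.loads(raw.splitlines()[0].decode("utf-8"))
    if not response.get("ok"):
        raise RuntimeError(response.get("error", "Unknown control-plane error"))
    return response.get("result")


# Build a ping request and decode a reply shaped like the one rpc_ping returns.
wire = encode_request("ping")
reply = decode_response(b'{"id": "1", "ok": true, "result": {"status": "ok"}}\n')
```

Each connection carries exactly one request line and one response line, which keeps the client free of any framing logic beyond reading up to the first newline.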
## Usage with uvx
Main entry points:
- `mcp-hal9002` starts the MCP server
- `mcp-hal9002-gui` starts the GTK/VTE GUI
Local examples from the checkout:
```bash
uvx --from . mcp-hal9002
uvx --from . mcp-hal9002-gui
```
Example from the Git repository:
```bash
uvx --from git+https://git.enne2.net/enne2/mcp-hal9002.git mcp-hal9002
```
## Requirements
The following must be available on the system:
- Python 3.12+
- PyGObject (`gi`)
- GTK4
- VTE with GI bindings (`Vte 3.91`)
- the `mcp` Python package
On this machine the required runtime is available.
## Starting the GUI locally
```bash
PYTHONPATH=src python3 -m gnome_vte_mcp.gui
```
To use a custom socket:
```bash
PYTHONPATH=src python3 -m gnome_vte_mcp.gui --socket /tmp/mcp-hal9002-demo.sock
```
## Starting the MCP server
```bash
PYTHONPATH=src python3 -m gnome_vte_mcp.server
```
Optional variable:
```bash
export MCP_HAL9002_SOCKET=/tmp/mcp-hal9002-demo.sock
```
The legacy `GNOME_VTE_MCP_SOCKET` variable is still accepted for compatibility.
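
A minimal sketch of how the socket path could be resolved from these variables. `server.py` is not shown in this part of the diff, so the precedence used here (new variable first, then the legacy one, then the default from `control.py`) is an assumption, and `resolve_socket_path` is a hypothetical name.

```python
import os
from pathlib import Path
from typing import Mapping, Optional

DEFAULT_SOCKET_NAME = "mcp-hal9002.sock"


def resolve_socket_path(env: Optional[Mapping[str, str]] = None) -> Path:
    """Pick the control socket path from the environment (assumed precedence)."""
    env = os.environ if env is None else env
    # Assumption: the new variable wins over the legacy one when both are set.
    for name in ("MCP_HAL9002_SOCKET", "GNOME_VTE_MCP_SOCKET"):
        value = env.get(name)
        if value:
            return Path(value)
    # Same fallback as default_socket_path() in control.py.
    runtime_dir = env.get("XDG_RUNTIME_DIR")
    base = Path(runtime_dir) if runtime_dir else Path("/tmp")
    return base / DEFAULT_SOCKET_NAME
```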
## Available MCP tools
- `open_tab(title=None, cwd=None, command=None)`
- `list_tabs()`
- `focus_tab(tab_id)`
- `exec_command(tab_id, command, newline=True)`
- `read_tab(tab_id, last_n_lines=200)`
- `capture_screenshot(target="window", tab_id=None, path=None, diagnostic_overlay=False)`
- `close_tab(tab_id)`
For `capture_screenshot`:
- `target="window"` tries to capture the whole window, including the GTK titlebar; if the platform does not allow it, it falls back to the rendered content
- `target="tab"` captures only the `Vte.Terminal` widget of the active tab, or of the tab given by `tab_id`, excluding the header bar and tab bar
- `target="tab-container"` captures the tab's GTK container, useful when the border or any scrollbars must be included
- `path` selects the destination PNG file
- `diagnostic_overlay=True` annotates the screenshot with a grid and the target bounds for `window`, `tab` and `tab-container`
The tool returns MCP content of type text + image, following the same approach as `local-image-mcp`, so the agent can inspect the screenshot directly without going through a separate local path.
For every screenshot a JSON sidecar file is also saved with UI debug metadata, including:
- the effective target widget
- the widget allocation relative to the window
- the GSK renderer used for the capture
- surface and scale details
- the requested/applied state of the diagnostic overlay
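
The sidecar can be read back with a few lines of Python. The sketch below assumes the layout written by `_build_screenshot_metadata` in this commit (`target`, `widget.type`, `widget.bounds_in_window`, `renderer.type`, `surface.scale`) and the `.json` suffix derived from the PNG path; `summarize_sidecar` is a hypothetical helper and the sample values are made up for illustration.

```python
import json
import tempfile
from pathlib import Path


def sidecar_path(screenshot: Path) -> Path:
    """The sidecar sits next to the PNG, with the suffix swapped to .json."""
    return screenshot.with_suffix(".json")


def summarize_sidecar(screenshot: Path) -> str:
    """Return a one-line summary of the capture metadata (hypothetical helper)."""
    meta = json.loads(sidecar_path(screenshot).read_text(encoding="utf-8"))
    bounds = meta["widget"]["bounds_in_window"] or {}
    return (
        f"{meta['target']}: {meta['widget']['type']} "
        f"{bounds.get('width')}x{bounds.get('height')} "
        f"via {meta['renderer']['type']} at scale {meta['surface']['scale']}"
    )


# Illustrative sample only; real values depend on the running session.
sample = {
    "target": "tab",
    "widget": {
        "type": "VteTerminal",
        "bounds_in_window": {"x": 0.0, "y": 47.0, "width": 1100, "height": 673},
    },
    "renderer": {"type": "GskGLRenderer"},
    "surface": {"scale": 1.0},
}
with tempfile.TemporaryDirectory() as tmp:
    shot = Path(tmp) / "demo.png"
    sidecar_path(shot).write_text(json.dumps(sample), encoding="utf-8")
    summary = summarize_sidecar(shot)
```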
## First launch and onboarding
The first tab shows a small onboarding panel centered over the terminal, with a summary of the available MCP actions and of the current workspace.
The panel disappears automatically:
- on the first key press inside the terminal
- on the first `exec_command(...)` sent via MCP
## Dynamic tab titles
Tab titles are derived in compact form from:
- the explicit `title`, if provided and not generic
- the initial command or the last command executed
- the `cwd`, when there is no meaningful command
This keeps the `StackSwitcher` more compact and more useful when several tabs are open.
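
The derivation can be sketched as a standalone function. This mirrors `_format_tab_title` from `gui.py` in this commit, extracted so it runs without GTK. The `max_len` parameter and the extra `IndexError` guard for whitespace-only commands are additions made for this sketch.

```python
import os
import shlex
from typing import Optional


def format_tab_title(
    cwd: Optional[str] = None,
    command: Optional[str] = None,
    title: Optional[str] = None,
    max_len: int = 18,
) -> str:
    """Derive a compact tab title: explicit title, then command, then cwd."""
    normalized = (title or "").strip()
    if normalized and normalized.lower() != "shell":
        candidate = normalized
    elif command:
        try:
            first_token = shlex.split(command)[0]
        except (ValueError, IndexError):
            # Unbalanced quotes or a blank command: fall back to a plain split.
            parts = command.strip().split()
            first_token = parts[0] if parts else "shell"
        candidate = os.path.basename(first_token) or first_token or "shell"
    elif cwd:
        expanded = os.path.expanduser(cwd)
        if expanded == os.path.expanduser("~"):
            candidate = "Home"
        else:
            candidate = os.path.basename(expanded.rstrip(os.sep)) or expanded
    else:
        candidate = "shell"
    candidate = candidate.strip() or "shell"
    if len(candidate) > max_len:
        return candidate[: max_len - 3] + "..."
    return candidate
```

With the default limit, `format_tab_title(command="/usr/bin/htop -d 5")` yields `htop`, and a generic `title="shell"` falls through to the command or the working directory.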
## Current limitations
- layouts and persistent sessions are not saved yet
- automatic tab renaming based on the running process or cwd is not handled yet
- the local control channel has no additional authentication yet beyond the Unix socket permissions
- text retrieval uses a local transcript of the shell, but does not yet implement incremental streaming or events
- screenshot capture depends on the widget already being rendered in the current graphical session
- if an already started GUI keeps running, control plane tests will keep using that process: after changes to screenshots or the titlebar, restart the GUI before retesting
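
To make the transcript-based text retrieval concrete, the sketch below reuses the three escape-sequence patterns defined in `gui.py` and applies them to the tail of a transcript, in the same order as `_sanitize_text`. The `tail` helper name and the sample input are illustrative assumptions.

```python
import re

# Patterns copied from gui.py in this commit.
ANSI_ESCAPE_RE = re.compile(r"\x1b\[[0-?]*[ -/]*[@-~]")
OSC_ESCAPE_RE = re.compile(r"\x1b\].*?(?:\x07|\x1b\\)", re.DOTALL)
SINGLE_ESCAPE_RE = re.compile(r"\x1b[@-_]")


def sanitize_text(text: str) -> str:
    """Strip OSC, CSI and single-character escapes, then drop carriage returns."""
    cleaned = OSC_ESCAPE_RE.sub("", text)
    cleaned = ANSI_ESCAPE_RE.sub("", cleaned)
    cleaned = SINGLE_ESCAPE_RE.sub("", cleaned)
    return cleaned.replace("\r", "")


def tail(text: str, last_n_lines: int = 200) -> str:
    """Return the sanitized last N lines of a transcript (hypothetical helper)."""
    lines = text.splitlines()
    return sanitize_text("\n".join(lines[-last_n_lines:]))


# A window-title OSC sequence, a colored prompt, and CRLF line endings.
raw = "\x1b]0;user@host: ~\x07\x1b[1;32m$ ls\x1b[0m\r\nREADME.md\r\n"
clean = tail(raw)
```

Because the transcript is a plain file, every read re-parses it from the start, which is why incremental output is listed as a limitation.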
## Natural next steps
1. add incremental output events and subscriptions
2. introduce persistence of tab/session state
3. improve the UI model with real drag-and-drop tabs and split view
4. add local policies to restrict the allowed commands
5. use the screenshots and JSON sidecars for automatable before/after UI comparisons

18
assets/mcp-hal9002-logo.svg

@ -0,0 +1,18 @@
<svg width="512" height="512" viewBox="0 0 512 512" fill="none" xmlns="http://www.w3.org/2000/svg">
<rect width="512" height="512" rx="112" fill="#111111"/>
<rect x="48" y="48" width="416" height="416" rx="88" fill="url(#bg)"/>
<circle cx="256" cy="220" r="114" fill="#2A0B0B"/>
<circle cx="256" cy="220" r="84" fill="#541111"/>
<circle cx="256" cy="220" r="56" fill="#D92323"/>
<circle cx="256" cy="220" r="28" fill="#FFD7D7" fill-opacity="0.9"/>
<path d="M144 356C144 341.641 155.641 330 170 330H342C356.359 330 368 341.641 368 356V358C368 372.359 356.359 384 342 384H170C155.641 384 144 372.359 144 358V356Z" fill="#161616" stroke="#3A3A3A" stroke-width="4"/>
<path d="M188 357L214 331" stroke="#F04A4A" stroke-width="12" stroke-linecap="round"/>
<path d="M188 331L214 357" stroke="#F04A4A" stroke-width="12" stroke-linecap="round"/>
<path d="M242 358H324" stroke="#F6F6F6" stroke-width="10" stroke-linecap="round"/>
<defs>
<linearGradient id="bg" x1="84" y1="64" x2="432" y2="448" gradientUnits="userSpaceOnUse">
<stop stop-color="#1D1D1D"/>
<stop offset="1" stop-color="#090909"/>
</linearGradient>
</defs>
</svg>


28
pyproject.toml

@ -0,0 +1,28 @@
[build-system]
requires = ["setuptools>=69"]
build-backend = "setuptools.build_meta"
[project]
name = "mcp-hal9002"
version = "0.1.0"
description = "GTK4/VTE terminal workspace controllable through MCP"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"mcp>=1.0.0",
]
[project.urls]
Repository = "https://git.enne2.net/enne2/mcp-hal9002"
[project.scripts]
mcp-hal9002 = "gnome_vte_mcp.server:main"
mcp-hal9002-gui = "gnome_vte_mcp.gui:main"
gnome-vte-mcp-gui = "gnome_vte_mcp.gui:main"
gnome-vte-mcp-server = "gnome_vte_mcp.server:main"
[tool.setuptools]
package-dir = {"" = "src"}
[tool.setuptools.packages.find]
where = ["src"]

7
src/gnome_vte_mcp/__init__.py

@ -0,0 +1,7 @@
"""mcp-hal9002 package."""
__all__ = [
"control",
"gui",
"server",
]

72
src/gnome_vte_mcp/control.py

@ -0,0 +1,72 @@
from __future__ import annotations

import json
import os
import socket
import time
import uuid
from pathlib import Path
from typing import Any

DEFAULT_SOCKET_NAME = "mcp-hal9002.sock"


class ControlError(RuntimeError):
    """Raised when the local GUI control plane returns an error."""


def default_socket_path() -> Path:
    runtime_dir = os.environ.get("XDG_RUNTIME_DIR")
    if runtime_dir:
        return Path(runtime_dir) / DEFAULT_SOCKET_NAME
    return Path("/tmp") / DEFAULT_SOCKET_NAME


def request(method: str, params: dict[str, Any] | None = None, socket_path: str | Path | None = None, timeout: float = 5.0) -> Any:
    path = Path(socket_path) if socket_path else default_socket_path()
    message = {
        "id": str(uuid.uuid4()),
        "method": method,
        "params": params or {},
    }
    payload = (json.dumps(message) + "\n").encode("utf-8")
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
        client.settimeout(timeout)
        client.connect(os.fspath(path))
        client.sendall(payload)
        response_chunks: list[bytes] = []
        while True:
            chunk = client.recv(65536)
            if not chunk:
                break
            response_chunks.append(chunk)
            if b"\n" in chunk:
                break
    if not response_chunks:
        raise ControlError("No response received from GUI controller")
    line = b"".join(response_chunks).splitlines()[0]
    response = json.loads(line.decode("utf-8"))
    if not response.get("ok"):
        raise ControlError(response.get("error", "Unknown control-plane error"))
    return response.get("result")


def wait_for_socket(socket_path: str | Path | None = None, timeout: float = 10.0) -> bool:
    path = Path(socket_path) if socket_path else default_socket_path()
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if path.exists():
            try:
                request("ping", socket_path=path, timeout=1.0)
                return True
            except OSError:
                pass
            except ControlError:
                return True
        time.sleep(0.1)
    return False

865
src/gnome_vte_mcp/gui.py

@ -0,0 +1,865 @@
from __future__ import annotations
import argparse
import cairo
from datetime import datetime
import json
import os
import re
import shlex
import socketserver
import threading
import time
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import gi
gi.require_version("Gdk", "4.0")
gi.require_version("Gsk", "4.0")
gi.require_version("Graphene", "1.0")
gi.require_version("Gtk", "4.0")
gi.require_version("Vte", "3.91")
from gi.repository import Gdk, Gio, GLib, Graphene, Gsk, Gtk, Vte
from .control import default_socket_path
DEFAULT_SCROLLBACK_LINES = 10000
ANSI_ESCAPE_RE = re.compile(r"\x1b\[[0-?]*[ -/]*[@-~]")
OSC_ESCAPE_RE = re.compile(r"\x1b\].*?(?:\x07|\x1b\\)", re.DOTALL)
SINGLE_ESCAPE_RE = re.compile(r"\x1b[@-_]")


@dataclass
class TabState:
    tab_id: str
    title: str
    container: Gtk.Overlay
    scroller: Gtk.ScrolledWindow
    terminal: Vte.Terminal
    welcome_revealer: Gtk.Revealer
    diagnostic_layer: Gtk.DrawingArea
    log_path: Path
    cwd: str
    last_command: str | None


class TerminalWindow(Gtk.ApplicationWindow):
    def __init__(self, app: "TerminalApp") -> None:
        super().__init__(application=app)
        self.set_title("mcp-hal9002")
        self.set_default_size(1100, 720)
        self.set_hide_on_close(False)
        self._app = app
        self._diagnostic_config: dict[str, Any] | None = None
        self._stack = Gtk.Stack()
        self._stack.set_vexpand(True)
        self._stack.set_hexpand(True)
        self._switcher = Gtk.StackSwitcher()
        self._switcher.set_stack(self._stack)
        self._switcher.set_tooltip_text("Switch terminal tabs")
        self._title_label = Gtk.Label(label="MCP HAL9002")
        self._title_label.add_css_class("heading")
        self._subtitle_label = Gtk.Label(label="Remote terminal cockpit")
        self._subtitle_label.add_css_class("caption")
        self._title_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=0)
        self._title_box.set_halign(Gtk.Align.CENTER)
        self._title_box.append(self._title_label)
        self._title_box.append(self._subtitle_label)
        header = Gtk.HeaderBar()
        self._header = header
        header.set_show_title_buttons(True)
        header.set_title_widget(self._title_box)
        add_button = Gtk.Button.new_from_icon_name("tab-new-symbolic")
        add_button.add_css_class("flat")
        add_button.set_tooltip_text("Open a new terminal tab")
        add_button.connect("clicked", self._on_new_tab_clicked)
        header.pack_end(add_button)
        self.set_titlebar(header)
        self._root_overlay = Gtk.Overlay()
        self._root_overlay.set_child(self._stack)
        self._diagnostic_layer = Gtk.DrawingArea()
        self._diagnostic_layer.set_hexpand(True)
        self._diagnostic_layer.set_vexpand(True)
        self._diagnostic_layer.set_can_target(False)
        self._diagnostic_layer.set_draw_func(self._draw_window_diagnostic_overlay)
        self._root_overlay.add_overlay(self._diagnostic_layer)
        self._root_overlay.set_measure_overlay(self._diagnostic_layer, True)
        self._root_overlay.set_clip_overlay(self._diagnostic_layer, False)
        self._diagnostic_layer.set_visible(False)
        self.set_child(self._root_overlay)
        self.sync_header_state(tab_count=0)

    @property
    def stack(self) -> Gtk.Stack:
        return self._stack

    def sync_header_state(self, tab_count: int) -> None:
        if tab_count > 1:
            self._header.set_title_widget(self._switcher)
        else:
            self._header.set_title_widget(self._title_box)

    def set_diagnostic_overlay(self, config: dict[str, Any] | None) -> None:
        self._diagnostic_config = config
        self._diagnostic_layer.set_visible(config is not None)
        self._diagnostic_layer.queue_draw()

    def _draw_window_diagnostic_overlay(
        self,
        _area: Gtk.DrawingArea,
        ctx: Any,
        width: int,
        height: int,
    ) -> None:
        self._app.draw_diagnostic_overlay(
            ctx,
            width,
            height,
            self._diagnostic_config,
        )

    def _on_new_tab_clicked(self, _button: Gtk.Button) -> None:
        self._app.create_tab(title="shell")


class ControlRequestHandler(socketserver.StreamRequestHandler):
    def handle(self) -> None:
        line = self.rfile.readline()
        if not line:
            return
        # Track the request id explicitly so the error path never touches an
        # undefined or non-dict payload.
        request_id = None
        try:
            payload = json.loads(line.decode("utf-8"))
            request_id = payload.get("id")
            method = payload["method"]
            params = payload.get("params", {})
            result = self.server.app.call_on_ui_thread(method, params)
            response = {"id": request_id, "ok": True, "result": result}
        except Exception as exc:  # noqa: BLE001
            response = {"id": request_id, "ok": False, "error": str(exc)}
        self.wfile.write((json.dumps(response) + "\n").encode("utf-8"))
        self.wfile.flush()


class ControlServer(socketserver.ThreadingUnixStreamServer):
    allow_reuse_address = True

    def __init__(self, socket_path: str, app: "TerminalApp") -> None:
        self.app = app
        super().__init__(socket_path, ControlRequestHandler)


class TerminalApp(Gtk.Application):
    def __init__(self, socket_path: Path) -> None:
        super().__init__(application_id="net.enne2.McpHal9002")
        self.socket_path = socket_path
        self.log_dir = self.socket_path.parent / "mcp-hal9002-logs"
        self.screenshot_dir = self.socket_path.parent / "mcp-hal9002-screenshots"
        self.window: TerminalWindow | None = None
        self.tabs: dict[str, TabState] = {}
        self.server: ControlServer | None = None
        self.server_thread: threading.Thread | None = None
        self.connect("activate", self.on_activate)

    def on_activate(self, _app: Gtk.Application) -> None:
        if self.window is None:
            self.window = TerminalWindow(self)
            self.start_control_server()
            self.create_tab(title="shell")
        self.window.present()

    def start_control_server(self) -> None:
        self.socket_path.parent.mkdir(parents=True, exist_ok=True)
        self.log_dir.mkdir(parents=True, exist_ok=True)
        self.screenshot_dir.mkdir(parents=True, exist_ok=True)
        if self.socket_path.exists():
            self.socket_path.unlink()
        self.server = ControlServer(os.fspath(self.socket_path), self)
        self.server_thread = threading.Thread(target=self.server.serve_forever, name="gui-control-server", daemon=True)
        self.server_thread.start()

    def do_shutdown(self) -> None:
        if self.server is not None:
            self.server.shutdown()
            self.server.server_close()
        if self.socket_path.exists():
            self.socket_path.unlink()
        Gtk.Application.do_shutdown(self)

    def call_on_ui_thread(self, method: str, params: dict[str, Any]) -> Any:
        outcome: dict[str, Any] = {}
        event = threading.Event()

        def invoke() -> bool:
            try:
                handler = getattr(self, f"rpc_{method}")
            except AttributeError as exc:
                outcome["error"] = RuntimeError(f"Unknown method: {method}")
            else:
                try:
                    outcome["result"] = handler(**params)
                except Exception as exc:  # noqa: BLE001
                    outcome["error"] = exc
            event.set()
            return False

        GLib.idle_add(invoke)
        event.wait()
        if "error" in outcome:
            raise outcome["error"]
        return outcome.get("result")

    def _format_tab_title(self, cwd: str | None = None, command: str | None = None, title: str | None = None) -> str:
        normalized_title = (title or "").strip()
        if normalized_title and normalized_title.lower() != "shell":
            candidate = normalized_title
        elif command:
            try:
                first_token = shlex.split(command)[0]
            except (ValueError, IndexError):
                # ValueError: unbalanced quotes; IndexError: whitespace-only command.
                first_token = command.strip().split()[0] if command.strip() else "shell"
            candidate = os.path.basename(first_token) or first_token or "shell"
        elif cwd:
            expanded = os.path.expanduser(cwd)
            home = os.path.expanduser("~")
            if expanded == home:
                candidate = "Home"
            else:
                candidate = os.path.basename(expanded.rstrip(os.sep)) or expanded
        else:
            candidate = "shell"
        candidate = candidate.strip() or "shell"
        if len(candidate) > 18:
            return candidate[:15] + "..."
        return candidate

    def _update_tab_title(self, tab: TabState, title: str) -> None:
        tab.title = title
        if self.window is None:
            return
        page = self.window.stack.get_page(tab.container)
        page.set_title(title)

    def _dismiss_onboarding(self, tab: TabState) -> None:
        if tab.welcome_revealer.is_visible() or tab.welcome_revealer.get_reveal_child():
            tab.welcome_revealer.set_reveal_child(False)
            tab.welcome_revealer.set_visible(False)

    def _configure_tab_diagnostic_overlay(self, tab: TabState, config: dict[str, Any] | None) -> None:
        tab.diagnostic_layer._diagnostic_config = config  # type: ignore[attr-defined]
        tab.diagnostic_layer.set_visible(config is not None)
        tab.diagnostic_layer.queue_draw()

    def _annotate_screenshot_with_diagnostics(
        self,
        image_path: Path,
        *,
        width: int,
        height: int,
        label: str,
        bounds_in_window: dict[str, Any] | None,
    ) -> None:
        surface = cairo.ImageSurface.create_from_png(os.fspath(image_path))
        ctx = cairo.Context(surface)
        self.draw_diagnostic_overlay(
            ctx,
            width,
            height,
            {
                "grid_step": 48,
                "bounds": {"x": 0.0, "y": 0.0, "width": width, "height": height},
                "label": label,
            },
        )
        if bounds_in_window is not None:
            alloc_text = (
                f"window alloc {int(bounds_in_window.get('x', 0))},{int(bounds_in_window.get('y', 0))} "
                f"{int(bounds_in_window.get('width', width))}x{int(bounds_in_window.get('height', height))}"
            )
            ctx.set_source_rgba(0.08, 0.08, 0.08, 0.88)
            ctx.rectangle(12.0, max(height - 34.0, 0.0), max(len(alloc_text) * 7.0, 170.0), 22.0)
            ctx.fill()
            ctx.set_source_rgba(1.0, 1.0, 1.0, 0.98)
            ctx.select_font_face("Sans", cairo.FONT_SLANT_NORMAL, cairo.FONT_WEIGHT_NORMAL)
            ctx.set_font_size(12.0)
            ctx.move_to(18.0, max(height - 18.0, 12.0))
            ctx.show_text(alloc_text)
        surface.write_to_png(os.fspath(image_path))

    def draw_diagnostic_overlay(
        self,
        ctx: Any,
        width: int,
        height: int,
        config: dict[str, Any] | None,
    ) -> None:
        if config is None:
            return
        try:
            grid_step = int(config.get("grid_step", 48))
            bounds = config.get("bounds") or {}
            label = config.get("label", "")
            ctx.set_source_rgba(1.0, 1.0, 1.0, 0.12)
            ctx.set_line_width(1.0)
            for x in range(grid_step, width, grid_step):
                ctx.move_to(x + 0.5, 0)
                ctx.line_to(x + 0.5, height)
            for y in range(grid_step, height, grid_step):
                ctx.move_to(0, y + 0.5)
                ctx.line_to(width, y + 0.5)
            ctx.stroke()
            rect_x = float(bounds.get("x", 0.0))
            rect_y = float(bounds.get("y", 0.0))
            rect_width = float(bounds.get("width", width))
            rect_height = float(bounds.get("height", height))
            ctx.set_source_rgba(0.97, 0.36, 0.2, 0.18)
            ctx.rectangle(rect_x, rect_y, rect_width, rect_height)
            ctx.fill()
            ctx.set_source_rgba(0.97, 0.36, 0.2, 0.95)
            ctx.set_line_width(2.0)
            ctx.rectangle(rect_x + 1.0, rect_y + 1.0, max(rect_width - 2.0, 1.0), max(rect_height - 2.0, 1.0))
            ctx.stroke()
            if label:
                label_text = f"{label} {int(rect_width)}x{int(rect_height)} @ {int(rect_x)},{int(rect_y)}"
                text_x = rect_x + 8.0
                text_y = max(rect_y - 10.0, 20.0)
                ctx.set_source_rgba(0.08, 0.08, 0.08, 0.88)
                ctx.rectangle(text_x - 6.0, text_y - 16.0, max(len(label_text) * 7.2, 120.0), 20.0)
                ctx.fill()
                ctx.set_source_rgba(1.0, 1.0, 1.0, 0.98)
                ctx.select_font_face("Sans", 0, 0)
                ctx.set_font_size(12.0)
                ctx.move_to(text_x, text_y)
                ctx.show_text(label_text)
        except Exception:
            return

    def create_tab(self, title: str | None = None, cwd: str | None = None, command: str | None = None) -> dict[str, Any]:
        if self.window is None:
            raise RuntimeError("Window has not been created yet")
        working_directory = cwd or os.path.expanduser("~")
        terminal = Vte.Terminal()
        terminal.set_scrollback_lines(DEFAULT_SCROLLBACK_LINES)
        terminal.set_hexpand(True)
        terminal.set_vexpand(True)
        key_controller = Gtk.EventControllerKey()
        key_controller.connect("key-pressed", self._on_terminal_key_pressed)
        terminal.add_controller(key_controller)
        scroller = Gtk.ScrolledWindow()
        scroller.set_child(terminal)
        tab_overlay = Gtk.Overlay()
        tab_overlay.set_child(scroller)
        diagnostic_layer = Gtk.DrawingArea()
        diagnostic_layer.set_hexpand(True)
        diagnostic_layer.set_vexpand(True)
        diagnostic_layer.set_can_target(False)
        diagnostic_layer._diagnostic_config = None  # type: ignore[attr-defined]
        diagnostic_layer.set_draw_func(self._draw_tab_diagnostic_overlay)
        tab_overlay.add_overlay(diagnostic_layer)
        tab_overlay.set_measure_overlay(diagnostic_layer, True)
        tab_overlay.set_clip_overlay(diagnostic_layer, False)
        diagnostic_layer.set_visible(False)
        welcome_revealer = Gtk.Revealer()
        welcome_revealer.set_transition_type(Gtk.RevealerTransitionType.CROSSFADE)
        welcome_revealer.set_halign(Gtk.Align.CENTER)
        welcome_revealer.set_valign(Gtk.Align.CENTER)
        welcome_revealer.set_can_target(False)
        welcome_card = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=10)
        welcome_card.add_css_class("card")
        welcome_card.set_margin_top(24)
        welcome_card.set_margin_bottom(24)
        welcome_card.set_margin_start(24)
        welcome_card.set_margin_end(24)
        welcome_card.set_size_request(320, -1)
        welcome_title = Gtk.Label(label="Ready to control this shell")
        welcome_title.add_css_class("heading")
        welcome_title.set_wrap(True)
        welcome_body = Gtk.Label(
            label="Use MCP tools to open tabs, send commands, read output, and capture screenshots. Press any key or run a command to dismiss this panel."
        )
        welcome_body.add_css_class("caption")
        welcome_body.set_wrap(True)
        welcome_tip = Gtk.Label(label=f"Current workspace: {working_directory}")
        welcome_tip.add_css_class("caption")
        welcome_tip.set_wrap(True)
        welcome_card.append(welcome_title)
        welcome_card.append(welcome_body)
        welcome_card.append(welcome_tip)
        welcome_revealer.set_child(welcome_card)
        tab_overlay.add_overlay(welcome_revealer)
        tab_id = str(uuid.uuid4())
        tab_title = self._format_tab_title(cwd=working_directory, command=command, title=title)
        log_path = self.log_dir / f"{tab_id}.log"
        log_path.write_text("", encoding="utf-8")
        page_name = tab_id
        self.window.stack.add_titled(tab_overlay, page_name, tab_title)
        self.window.stack.set_visible_child_name(page_name)
        self.window.stack.set_visible_child(tab_overlay)
        self.window.sync_header_state(tab_count=len(self.tabs) + 1)
        tab = TabState(
            tab_id=tab_id,
            title=tab_title,
            container=tab_overlay,
            scroller=scroller,
            terminal=terminal,
            welcome_revealer=welcome_revealer,
            diagnostic_layer=diagnostic_layer,
            log_path=log_path,
            cwd=working_directory,
            last_command=command,
        )
        self.tabs[tab_id] = tab
        show_onboarding = len(self.tabs) == 1 and command is None
        welcome_revealer.set_visible(show_onboarding)
        welcome_revealer.set_reveal_child(show_onboarding)
        terminal.connect("child-exited", self._on_child_exited, tab_id)
        terminal.connect("window-title-changed", self._on_terminal_window_title_changed, tab_id)
        self._spawn_shell(tab, cwd=working_directory, command=command)
        self.window.present()
        return self._serialize_tab(tab)

    def _spawn_shell(self, tab: TabState, cwd: str | None, command: str | None) -> None:
        working_directory = cwd or os.path.expanduser("~")
        envv = [f"{key}={value}" for key, value in os.environ.items()]
        shell = os.environ.get("SHELL", "/bin/bash")
        quoted_log = shlex.quote(os.fspath(tab.log_path))
        script_prefix = f"exec > >(tee -a {quoted_log}) 2>&1; "
        if command:
            argv = ["/bin/bash", "-lc", f"{script_prefix}{command}; exec {shlex.quote(shell)} -i"]
        else:
            argv = ["/bin/bash", "-lc", f"{script_prefix}exec {shlex.quote(shell)} -i"]
        _success, _pid = tab.terminal.spawn_sync(
            Vte.PtyFlags.DEFAULT,
            working_directory,
            argv,
            envv,
            GLib.SpawnFlags.DEFAULT,
            None,
            None,
            None,
        )

    def _on_terminal_key_pressed(
        self,
        controller: Gtk.EventControllerKey,
        _keyval: int,
        _keycode: int,
        _state: Gdk.ModifierType,
    ) -> bool:
        widget = controller.get_widget()
        if widget is None:
            return False
        tab = next((item for item in self.tabs.values() if item.terminal == widget), None)
        if tab is not None:
            self._dismiss_onboarding(tab)
        return False

    def _on_terminal_window_title_changed(self, terminal: Vte.Terminal, tab_id: str) -> None:
        tab = self.tabs.get(tab_id)
        if tab is None:
            return
        terminal_title = terminal.get_window_title() or ""
        terminal_title = terminal_title.strip()
        if terminal_title:
            self._update_tab_title(tab, self._format_tab_title(title=terminal_title))

    def _draw_tab_diagnostic_overlay(
        self,
        area: Gtk.DrawingArea,
        ctx: Any,
        width: int,
        height: int,
    ) -> None:
        config = getattr(area, "_diagnostic_config", None)
        self.draw_diagnostic_overlay(ctx, width, height, config)

    def _on_child_exited(self, terminal: Vte.Terminal, _status: int, tab_id: str) -> None:
        tab = self.tabs.get(tab_id)
        if tab is not None:
            with tab.log_path.open("a", encoding="utf-8") as handle:
                handle.write("\n# process exited\n")
            self._update_tab_title(tab, self._format_tab_title(title=f"{tab.title} exited"))

    def _serialize_tab(self, tab: TabState) -> dict[str, Any]:
        active = False
        if self.window is not None:
            visible = self.window.stack.get_visible_child()
            active = visible == tab.container
        return {
            "tab_id": tab.tab_id,
            "title": tab.title,
            "active": active,
            "rows": tab.terminal.get_row_count(),
            "columns": tab.terminal.get_column_count(),
        }

    def _gtype_name(self, obj: object) -> str:
        gtype = getattr(obj, "__gtype__", None)
        if gtype is not None:
            name = getattr(gtype, "name", None)
            if name:
                return str(name)
        return type(obj).__name__

    def _widget_bounds(self, widget: Gtk.Widget) -> dict[str, float | int] | None:
        width = widget.get_width() or widget.get_allocated_width()
        height = widget.get_height() or widget.get_allocated_height()
        if width <= 0 or height <= 0:
            return None
        bounds: dict[str, float | int] = {
            "x": 0.0,
            "y": 0.0,
            "width": width,
            "height": height,
        }
        if self.window is None or widget == self.window:
            return bounds
        try:
            success, rect = widget.compute_bounds(self.window)
        except TypeError:
            return bounds
        if success:
            bounds["x"] = round(rect.get_x(), 2)
            bounds["y"] = round(rect.get_y(), 2)
            bounds["width"] = round(rect.get_width(), 2)
            bounds["height"] = round(rect.get_height(), 2)
        return bounds

    def _build_screenshot_metadata(
        self,
        *,
        widget: Gtk.Widget,
        target: str,
        renderer: Gsk.Renderer,
        surface: Gdk.Surface,
        path: Path,
        tab_id: str | None,
    ) -> dict[str, Any]:
        return {
            "captured_at": datetime.now().isoformat(timespec="seconds"),
            "target": target,
            "tab_id": tab_id,
            "path": os.fspath(path),
            "widget": {
                "type": self._gtype_name(widget),
                "name": widget.get_name(),
                "visible": widget.get_visible(),
                "mapped": widget.get_mapped(),
                "focusable": widget.get_focusable(),
                "scale_factor": widget.get_scale_factor(),
                "bounds_in_window": self._widget_bounds(widget),
            },
            "window": {
                "type": self._gtype_name(self.window) if self.window is not None else None,
                "title": self.window.get_title() if self.window is not None else None,
                "bounds": self._widget_bounds(self.window) if self.window is not None else None,
            },
            "renderer": {
                "type": self._gtype_name(renderer),
                "realized": renderer.is_realized(),
            },
            "surface": {
                "type": self._gtype_name(surface),
                "scale": surface.get_scale(),
            },
        }

    def _require_tab(self, tab_id: str) -> TabState:
        try:
            return self.tabs[tab_id]
        except KeyError as exc:
            raise RuntimeError(f"Unknown tab_id: {tab_id}") from exc

    def _sanitize_text(self, text: str) -> str:
        cleaned = OSC_ESCAPE_RE.sub("", text)
        cleaned = ANSI_ESCAPE_RE.sub("", cleaned)
        cleaned = SINGLE_ESCAPE_RE.sub("", cleaned)
        cleaned = cleaned.replace("\r", "")
        return cleaned

    def _default_screenshot_path(self, target: str, tab_id: str | None = None) -> Path:
        timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
        suffix = tab_id if tab_id else target
        return self.screenshot_dir / f"{timestamp}-{suffix}.png"

    def _capture_widget_to_png(self, widget: Gtk.Widget, path: Path) -> dict[str, Any]:
        paintable = Gtk.WidgetPaintable.new(widget)
        width = widget.get_width() or widget.get_allocated_width()
        height = widget.get_height() or widget.get_allocated_height()
        if width <= 0 or height <= 0:
            raise RuntimeError("The widget is not ready for capture yet; try again after the window has been presented")
        native = widget.get_native()
        if native is None:
            raise RuntimeError("The widget is not attached to a native surface yet; try again after the window has been presented")
        surface = native.get_surface()
        if surface is None:
            raise RuntimeError("The widget does not have an associated surface yet; try again after the window has been presented")
        node = None
        texture = None
        renderer = None
        viewport = Graphene.Rect()
        viewport.init(0, 0, float(width), float(height))
        main_context = GLib.MainContext.default()
        last_error = "Failed to snapshot the widget into a render node"
        for attempt in range(4):
            while main_context.pending():
                main_context.iteration(False)
            snapshot = Gtk.Snapshot.new()
            paintable.snapshot(snapshot, float(width), float(height))
            node = snapshot.to_node()
            if node is None:
                last_error = "Failed to snapshot the widget into a render node"
                if attempt < 3:
                    time.sleep(0.03)
                    continue
                raise RuntimeError(last_error)
            renderer = Gsk.Renderer.new_for_surface(surface)
            if renderer is None:
                raise RuntimeError("Failed to create a GSK renderer for the widget surface")
            if not renderer.is_realized() and not renderer.realize(surface):
                raise RuntimeError("Failed to realize the GSK renderer for screenshot capture")
            texture = renderer.render_texture(node, viewport)
            if texture is not None:
                break
            renderer.unrealize()
            renderer = None
            last_error = "Failed to render the widget snapshot into a texture"
            if attempt < 3:
                time.sleep(0.03)
                continue
            raise RuntimeError(last_error)
        if renderer is None or texture is None:
            raise RuntimeError(last_error)
        path.parent.mkdir(parents=True, exist_ok=True)
        saved = texture.save_to_png(os.fspath(path))
        metadata = self._build_screenshot_metadata(
            widget=widget,
            target="",
            renderer=renderer,
            surface=surface,
            path=path,
            tab_id=None,
        )
        renderer.unrealize()
        if not saved:
            raise RuntimeError(f"Failed to save screenshot to {path}")
        metadata_path = path.with_suffix(".json")
        metadata_path.write_text(json.dumps(metadata, indent=2), encoding="utf-8")
        return {
            "path": os.fspath(path),
            "metadata_path": os.fspath(metadata_path),
            "metadata": metadata,
            "width": width,
            "height": height,
        }

    def rpc_ping(self) -> dict[str, str]:
        return {"status": "ok"}

    def rpc_list_tabs(self) -> list[dict[str, Any]]:
        return [self._serialize_tab(tab) for tab in self.tabs.values()]

    def rpc_open_tab(self, title: str | None = None, cwd: str | None = None, command: str | None = None) -> dict[str, Any]:
        return self.create_tab(title=title, cwd=cwd, command=command)

    def rpc_focus_tab(self, tab_id: str) -> dict[str, Any]:
        tab = self._require_tab(tab_id)
        if self.window is None:
            raise RuntimeError("Window has not been created yet")
        self.window.stack.set_visible_child(tab.container)
        self.window.present()
        return self._serialize_tab(tab)

    def rpc_exec(self, tab_id: str, command: str, newline: bool = True) -> dict[str, Any]:
        tab = self._require_tab(tab_id)
        payload = command + ("\n" if newline else "")
        with tab.log_path.open("a", encoding="utf-8") as handle:
            handle.write(f"$ {command}\n")
        tab.last_command = command
        self._dismiss_onboarding(tab)
        self._update_tab_title(tab, self._format_tab_title(cwd=tab.cwd, command=command))
        tab.terminal.feed_child(list(payload.encode("utf-8")))
        return {"tab_id": tab_id, "command": command}

    def rpc_read_tab(self, tab_id: str, last_n_lines: int = 200) -> dict[str, Any]:
        tab = self._require_tab(tab_id)
        lines = tab.log_path.read_text(encoding="utf-8").splitlines()
        # lines[-0:] would return the whole transcript, so treat a
        # non-positive count as "no lines".
        tail_lines = lines[-last_n_lines:] if last_n_lines > 0 else []
        text = self._sanitize_text("\n".join(tail_lines))
        return {
            "tab_id": tab_id,
            "line_count": len(lines),
            "text": text,
        }

    def rpc_capture_screenshot(
        self,
        target: str = "window",
        tab_id: str | None = None,
        path: str | None = None,
        diagnostic_overlay: bool = False,
    ) -> dict[str, Any]:
        if self.window is None:
            raise RuntimeError("Window has not been created yet")
        target_widget: Gtk.Widget
        selected_tab_id: str | None = tab_id
        normalized_target = target.strip().lower()
        if normalized_target == "window":
            self.window.present()
            target_widget = self.window
            overlay_scope = "window"
        elif normalized_target in {"active-tab", "active_tab", "tab", "terminal", "vte"}:
            if selected_tab_id is None:
                visible = self.window.stack.get_visible_child()
                if visible is None:
                    raise RuntimeError("No active tab is available")
                tab = next((item for item in self.tabs.values() if item.container == visible), None)
                if tab is None:
                    raise RuntimeError("Unable to resolve the active tab")
            else:
                tab = self._require_tab(selected_tab_id)
                self.window.stack.set_visible_child(tab.container)
            self.window.present()
            target_widget = tab.terminal
            selected_tab_id = tab.tab_id
            normalized_target = "tab"
            overlay_scope = "terminal"
        elif normalized_target in {"tab-container", "tab_container", "scroller"}:
            if selected_tab_id is None:
                visible = self.window.stack.get_visible_child()
                if visible is None:
                    raise RuntimeError("No active tab is available")
                tab = next((item for item in self.tabs.values() if item.container == visible), None)
                if tab is None:
                    raise RuntimeError("Unable to resolve the active tab")
            else:
                tab = self._require_tab(selected_tab_id)
                self.window.stack.set_visible_child(tab.container)
            self.window.present()
            target_widget = tab.container
            selected_tab_id = tab.tab_id
            normalized_target = "tab-container"
            overlay_scope = "tab-container"
        else:
            raise RuntimeError("target must be one of: window, tab, tab-container")
        screenshot_path = Path(path) if path else self._default_screenshot_path(normalized_target, tab_id=selected_tab_id)
        try:
            result = self._capture_widget_to_png(target_widget, screenshot_path)
        except RuntimeError:
            # Only the window target has a fallback: retry with the window's child widget.
            if normalized_target != "window":
                raise
            fallback_widget = self.window.get_child()
            if fallback_widget is None or fallback_widget is target_widget:
                raise
            target_widget = fallback_widget
            result = self._capture_widget_to_png(target_widget, screenshot_path)
        result["metadata"]["target"] = normalized_target
        result["metadata"]["tab_id"] = selected_tab_id
        overlay_applied = False
        if diagnostic_overlay:
            overlay_label = f"{normalized_target} {self._gtype_name(target_widget)}"
            self._annotate_screenshot_with_diagnostics(
                screenshot_path,
                width=result["width"],
                height=result["height"],
                label=overlay_label,
                bounds_in_window=result["metadata"]["widget"].get("bounds_in_window"),
            )
            overlay_applied = True
        result["metadata"]["diagnostic_overlay"] = {
            "requested": diagnostic_overlay,
            "applied": overlay_applied,
            "scope": overlay_scope,
            "mode": "postprocess-cairo",
        }
        # Rewrite the sidecar JSON so it includes the target, tab id and overlay details.
        Path(result["metadata_path"]).write_text(json.dumps(result["metadata"], indent=2), encoding="utf-8")
        result["target"] = normalized_target
        if selected_tab_id is not None:
            result["tab_id"] = selected_tab_id
        return result

    def rpc_close_tab(self, tab_id: str) -> dict[str, Any]:
        tab = self._require_tab(tab_id)
        if self.window is None:
            raise RuntimeError("Window has not been created yet")
        self.window.stack.remove(tab.container)
        del self.tabs[tab_id]
        if tab.log_path.exists():
            tab.log_path.unlink()
        if not self.tabs:
            self.create_tab(title="shell")
        elif self.window is not None:
            self.window.sync_header_state(tab_count=len(self.tabs))
        return {"closed": tab_id}


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="mcp-hal9002 GTK4/VTE terminal workspace controllable through a local Unix socket")
    parser.add_argument("--socket", type=Path, default=default_socket_path(), help="Unix socket path for the local control plane")
    return parser


def main() -> None:
    parser = build_parser()
    args = parser.parse_args()
    app = TerminalApp(socket_path=args.socket)
    app.run([])


if __name__ == "__main__":
    main()
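
`rpc_read_tab` returns the tail of the tab log through a negative slice. Python evaluates `lines[-0:]` as `lines[0:]`, so a count of zero selects the whole list rather than nothing. The following is a minimal sketch of a guarded tail helper; `tail_lines` is a hypothetical name used only for illustration and is not part of the project.

```python
def tail_lines(text: str, last_n_lines: int) -> list[str]:
    """Return at most the last `last_n_lines` lines of `text`."""
    lines = text.splitlines()
    if last_n_lines <= 0:
        # lines[-0:] is lines[0:], i.e. everything, so guard explicitly.
        return []
    # A count larger than the number of lines simply returns all of them.
    return lines[-last_n_lines:]


if __name__ == "__main__":
    log = "$ ls\na.txt\nb.txt\n$ pwd\n/home/user"
    print(tail_lines(log, 2))
    print(tail_lines(log, 0))
```

Returning an empty list for non-positive counts is one possible policy; raising an error on invalid input would be an equally reasonable choice.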

140
src/gnome_vte_mcp/server.py

@ -0,0 +1,140 @@
from __future__ import annotations

import json
import os
import subprocess
import sys
from pathlib import Path
from typing import Any

from mcp.server.fastmcp import FastMCP
from mcp.server.fastmcp.utilities.types import Image

from .control import ControlError, default_socket_path, request, wait_for_socket

mcp = FastMCP("mcp-hal9002")


def _socket_path() -> Path:
    # The legacy GNOME_VTE_MCP_SOCKET variable is still honored after the rename.
    override = os.environ.get("MCP_HAL9002_SOCKET") or os.environ.get("GNOME_VTE_MCP_SOCKET")
    return Path(override) if override else default_socket_path()


def _launch_gui(socket_path: Path) -> None:
    env = os.environ.copy()
    command = [sys.executable, "-m", "gnome_vte_mcp.gui", "--socket", os.fspath(socket_path)]
    subprocess.Popen(  # noqa: S603
        command,
        env=env,
        start_new_session=True,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )


def ensure_gui() -> Path:
    socket_path = _socket_path()
    try:
        request("ping", socket_path=socket_path, timeout=0.5)
        return socket_path
    except (OSError, ControlError):
        pass
    _launch_gui(socket_path)
    if not wait_for_socket(socket_path=socket_path, timeout=10.0):
        raise RuntimeError(
            "The GTK/VTE GUI did not start in time. Ensure the MCP server runs inside a graphical session with DISPLAY or WAYLAND_DISPLAY available."
        )
    return socket_path


def call_gui(method: str, **params: Any) -> Any:
    socket_path = ensure_gui()
    return request(method, params=params, socket_path=socket_path)


@mcp.tool()
def open_tab(title: str | None = None, cwd: str | None = None, command: str | None = None) -> dict[str, Any]:
    """Open a new terminal tab in the GTK/VTE prototype window."""
    return call_gui("open_tab", title=title, cwd=cwd, command=command)


@mcp.tool()
def list_tabs() -> list[dict[str, Any]]:
    """List the currently known tabs managed by the prototype GUI."""
    return call_gui("list_tabs")


@mcp.tool()
def focus_tab(tab_id: str) -> dict[str, Any]:
    """Bring a specific tab to the foreground in the prototype GUI."""
    return call_gui("focus_tab", tab_id=tab_id)


@mcp.tool()
def exec_command(tab_id: str, command: str, newline: bool = True) -> dict[str, Any]:
    """Send a command to an existing tab's shell process."""
    return call_gui("exec", tab_id=tab_id, command=command, newline=newline)


@mcp.tool()
def read_tab(tab_id: str, last_n_lines: int = 200) -> dict[str, Any]:
    """Read the trailing scrollback text from a tab."""
    return call_gui("read_tab", tab_id=tab_id, last_n_lines=last_n_lines)


@mcp.tool()
def close_tab(tab_id: str) -> dict[str, Any]:
    """Close a tab in the prototype GUI."""
    return call_gui("close_tab", tab_id=tab_id)


@mcp.tool()
def capture_screenshot(
    target: str = "window",
    tab_id: str | None = None,
    path: str | None = None,
    diagnostic_overlay: bool = False,
) -> list[Any]:
    """Capture a PNG screenshot of the full window, the VTE content of a tab, or a tab container and return it as MCP image content."""
    result = call_gui(
        "capture_screenshot",
        target=target,
        tab_id=tab_id,
        path=path,
        diagnostic_overlay=diagnostic_overlay,
    )
    summary_parts = [
        f"Captured screenshot {result['path']}",
        f"target={result['target']}",
        f"size={result['width']}x{result['height']}",
        f"widget={result['metadata']['widget']['type']}",
        f"renderer={result['metadata']['renderer']['type']}",
        f"metadata={result['metadata_path']}",
    ]
    if "tab_id" in result:
        summary_parts.append(f"tab_id={result['tab_id']}")
    overlay = result["metadata"].get("diagnostic_overlay", {})
    if overlay.get("requested"):
        summary_parts.append(f"overlay={overlay.get('applied')}")
    bounds = result["metadata"]["widget"].get("bounds_in_window")
    if bounds is not None:
        summary_parts.append(
            f"alloc={bounds['x']},{bounds['y']} {bounds['width']}x{bounds['height']}"
        )
    # Return a one-line summary, the full metadata as JSON, and the image itself.
    return [
        " | ".join(summary_parts),
        json.dumps(result["metadata"], indent=2),
        Image(path=result["path"]),
    ]


def main() -> None:
    mcp.run()


if __name__ == "__main__":
    main()
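
The `target` argument of `capture_screenshot` accepts several spellings, which the GUI collapses to three canonical values. The sketch below mirrors that mapping as a standalone function, based on the alias sets in `rpc_capture_screenshot`. The name `normalize_target`, the two constant names, and the use of `ValueError` are assumptions made for illustration; the GUI itself raises `RuntimeError` and resolves the widget in the same branch.

```python
# Alias sets copied from rpc_capture_screenshot; the constant names are hypothetical.
TAB_ALIASES = {"active-tab", "active_tab", "tab", "terminal", "vte"}
CONTAINER_ALIASES = {"tab-container", "tab_container", "scroller"}


def normalize_target(target: str) -> str:
    """Map a user-supplied target to 'window', 'tab' or 'tab-container'."""
    value = target.strip().lower()
    if value == "window":
        return "window"
    if value in TAB_ALIASES:
        return "tab"
    if value in CONTAINER_ALIASES:
        return "tab-container"
    raise ValueError("target must be one of: window, tab, tab-container")


if __name__ == "__main__":
    print(normalize_target(" VTE "))
    print(normalize_target("scroller"))
```

Keeping the alias sets in one place like this would also allow the error message and the tool docstring to be generated from the same data, though the source does not do so.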