
feat: Enhance MCP HAL9002 with command tracking and GUI integration

- Added support for command start and end markers in the terminal GUI.
- Implemented command event logging with timestamps and exit codes.
- Introduced manual submission handling for commands in the terminal.
- Enhanced the GUI to manage command execution status and results.
- Added new RPC methods for querying command results and statuses.
- Updated the pyproject.toml to include pycairo as a dependency.
- Created configuration files for VS Code to streamline development.
- Added usage rules for MCP in the GitHub repository.
main
Matteo Benedetto 3 months ago
parent
commit
0ce2af33a8
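  1. .github/copilot-instructions.md (7 lines changed)
  2. .vscode/mcp.json (18 lines changed)
  3. .vscode/settings.json (3 lines changed)
  4. README.md (52 lines changed)
  5. pyproject.toml (3 lines changed)
  6. src/gnome_vte_mcp/gui.py (513 lines changed)
  7. src/gnome_vte_mcp/server.py (391 lines changed)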

7
.github/copilot-instructions.md

@ -0,0 +1,7 @@
# MCP usage rules for this workspace
- When this workspace MCP server `mcpHal9002` is available in the tool runtime, use its tools directly instead of calling the local Python module as a fallback.
- Only fall back to local Python entrypoints when the required MCP capability is not exposed in the current tool runtime.
- If a behavior looks inconsistent with the current source code after changes, suspect a stale VS Code MCP process or cached tool schema first.
- In that case, explicitly warn the user that the issue might be caused by VS Code not having restarted or refreshed the MCP server yet.
- Prefer asking the user to run `MCP: List Servers`, restart `mcpHal9002`, or `MCP: Reset Cached Tools` before diagnosing deeper runtime mismatches.

18
.vscode/mcp.json vendored

@ -0,0 +1,18 @@
{
  "servers": {
    "mcpHal9002": {
      "type": "stdio",
      "command": "${workspaceFolder}/.venv/bin/python",
      "args": [
        "-m",
        "gnome_vte_mcp.server"
      ],
      "env": {
        "PYTHONPATH": "${workspaceFolder}/src"
      },
      "dev": {
        "watch": "${workspaceFolder}/src/**/*.py"
      }
    }
  }
}
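To make the effect of this configuration concrete, the following sketch computes the command line and environment that the `mcpHal9002` entry describes for a given workspace folder. It is only an illustration: VS Code performs the `${workspaceFolder}` substitution itself, the helpers `expand` and `effective_launch` are hypothetical names, and the workspace path is an invented example.

```python
import json

# The server entry from .vscode/mcp.json, inlined so the sketch is self-contained
# (the "dev" section is omitted because it does not affect the launch command).
MCP_JSON = """
{
  "servers": {
    "mcpHal9002": {
      "type": "stdio",
      "command": "${workspaceFolder}/.venv/bin/python",
      "args": ["-m", "gnome_vte_mcp.server"],
      "env": {"PYTHONPATH": "${workspaceFolder}/src"}
    }
  }
}
"""


def expand(value: str, workspace: str) -> str:
    """Hypothetical stand-in for VS Code's ${workspaceFolder} substitution."""
    return value.replace("${workspaceFolder}", workspace)


def effective_launch(config: dict, name: str, workspace: str) -> tuple[list[str], dict[str, str]]:
    """Return the argv and extra environment described by one server entry."""
    server = config["servers"][name]
    argv = [expand(server["command"], workspace), *server.get("args", [])]
    env = {key: expand(val, workspace) for key, val in server.get("env", {}).items()}
    return argv, env


# "/home/user/project" is an assumed example path, not taken from the repository.
argv, env = effective_launch(json.loads(MCP_JSON), "mcpHal9002", "/home/user/project")
print(argv)
print(env)
```

Running the printed command by hand with the printed `PYTHONPATH` is one way to check that the virtualenv can start the server outside VS Code.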

3
.vscode/settings.json vendored

@ -0,0 +1,3 @@
{
  "chat.mcp.autostart": true
}

52
README.md

@ -32,18 +32,18 @@ Il progetto evita di dipendere da API non ufficiali di GNOME Terminal. Invece es
The MCP server launches the GUI automatically if it is not already running.
The GUI is treated as a window managed by the MCP server: the intended flow is to open it, bring it to the foreground, and close it through MCP tools, not through a separate CLI.
## Usage with uvx
Main entry points:
- `mcp-hal9002` starts the MCP server
- `mcp-hal9002-gui` starts the GTK/VTE GUI
Local examples from the checkout:
```bash
uvx --from . mcp-hal9002
uvx --from . mcp-hal9002-gui
```
Example from a Git repository:
@ -52,11 +52,25 @@ Example from a Git repository:
uvx --from git+https://git.enne2.net/enne2/mcp-hal9002.git mcp-hal9002
```
## Direct use in VS Code with Copilot
The workspace includes a [ .vscode/mcp.json ] configuration that registers `mcp-hal9002` as a `stdio` MCP server for GitHub Copilot, using the Python interpreter of the local virtualenv and `PYTHONPATH=${workspaceFolder}/src`.
The workspace also contains [ .vscode/settings.json ] with `chat.mcp.autostart=true`, so VS Code can restart the server automatically when the configuration changes.
To use it in Copilot:
1. open this workspace in VS Code
2. make sure `.venv` exists and contains the project dependencies
3. open Chat and confirm the MCP server trust prompt when VS Code shows it
4. if the tools do not appear right away, run `MCP: List Servers` or `MCP: Reset Cached Tools`
## Requirements
The following must be available on the system:
- Python 3.12+
- the Python package `pycairo`
- PyGObject (`gi`)
- GTK4
- VTE with GI bindings (`Vte 3.91`)
@ -64,7 +78,9 @@ The following must be available on the system:
On this machine the required runtime is available.
## Running the GUI locally
## Debugging the GUI locally
The GUI still has an executable module for debugging and development, but the normal path is to drive it from the MCP server with `open_gui()` and close it with `close_gui()`.
```bash
PYTHONPATH=src python3 -m gnome_vte_mcp.gui
@ -92,13 +108,37 @@ The legacy variable `GNOME_VTE_MCP_SOCKET` is still accepted for compatibility.
## Available MCP tools
- `gui_status()`
- `open_gui()`
- `open_tab(title=None, cwd=None, command=None)`
- `list_tabs()`
- `focus_tab(tab_id)`
- `exec_command(tab_id, command, newline=True)`
- `exec_command(tab_id, command, newline=True, poll_interval=0.1)`
- `read_tab(tab_id, last_n_lines=200)`
- `read_last_command_result(tab_id)`
- `wait_for_running_command(tab_id, timeout=None, poll_interval=0.1)`
- `wait_for_command_result(tab_id, after_sequence=None, timeout=None, poll_interval=0.1)`
- `capture_screenshot(target="window", tab_id=None, path=None, diagnostic_overlay=False)`
- `close_tab(tab_id)`
- `close_gui()`
GUI lifecycle:
- `open_gui()` starts the GUI if it is not running and brings the shared window to the foreground
- `gui_status()` returns the status without creating new instances
- `close_gui()` closes the shared window if it is running
- all other tools that need the GUI keep reusing the same instance through the local socket
For reading command text:
- `read_tab(...)` returns the recent scrollback, useful for raw debugging
- `exec_command(...)` writes the text into the terminal, then blocks indefinitely until the user manually presses `Enter` in the GUI; the `newline` parameter is ignored and kept only for compatibility
- after any manual edits in the GUI, press `Enter` yourself to actually unblock `exec_command(...)` and send the command to the shell
- for commands that open a delegated interactive session such as `ssh`, `arca`, or a subshell, the tab is not treated as blocked: you can keep writing new commands in the same session without waiting for the local shell to return
- `wait_for_running_command(...)` waits for the command already running in the tab to complete, for the case where the terminal is busy and you cannot send a new command yet
- if the tab is inside a delegated interactive session, `wait_for_running_command(...)` fails explicitly, because tracked completion becomes available again only once you exit that session
- `wait_for_command_result(...)` blocks until the command that follows the `after_sequence` returned by `exec_command(...)` completes, and returns `command`, `cwd`, `cwd_after`, `started_at`, `finished_at`, `duration_seconds`, `exit_code`, and `text`; if `timeout` is omitted or `<= 0`, the wait is indefinite
- `read_last_command_result(tab_id)` returns the last completed command with the same time and path metadata
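The timeout rule described for `wait_for_command_result(...)` (no timeout, or a value `<= 0`, means wait indefinitely) can be sketched as a small polling loop. This is a minimal illustration and not the server's implementation: `wait_for_result` and its `fetch` callback are hypothetical names, and the fake result source only simulates a command that finishes on the third poll.

```python
from __future__ import annotations

import time
from typing import Any, Callable


def wait_for_result(
    fetch: Callable[[], dict[str, Any] | None],
    timeout: float | None = None,
    poll_interval: float = 0.1,
) -> dict[str, Any]:
    """Poll `fetch` until it returns a result; timeout None or <= 0 waits forever."""
    deadline = None if timeout is None or timeout <= 0 else time.monotonic() + timeout
    while True:
        result = fetch()
        if result is not None:
            return result
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError("command did not finish before the timeout")
        time.sleep(poll_interval)


# Fake result source (an assumption for the demo): nothing on the first two
# polls, then a completed-command payload.
polls = iter([None, None, {"sequence": 4, "exit_code": 0, "text": "ok"}])
result = wait_for_result(lambda: next(polls), timeout=5, poll_interval=0.01)
print(result["exit_code"])
```

Using a monotonic clock for the deadline keeps the wait unaffected by wall-clock adjustments while the command runs.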
For `capture_screenshot`:
@ -108,7 +148,7 @@ For `capture_screenshot`:
- `path` lets you choose the destination PNG file
- `diagnostic_overlay=True` annotates the screenshot with a grid and the bounds of the target for `window`, `tab`, and `tab-container`
The tool returns MCP content of type text + image, following the same approach as `local-image-mcp`, so the agent can inspect the screenshot directly without going through a separate local path.
The tool returns a JSON-serializable payload with the path, dimensions, and capture metadata, so the MCP bridge does not depend on custom image types.
For each screenshot a sidecar JSON file is also saved with UI debug metadata, including:
@ -125,7 +165,7 @@ La prima tab mostra un piccolo pannello di onboarding centrato sopra il terminal
The panel disappears automatically:
- on the first key press inside the terminal
- on the first `exec_command(...)` sent via MCP
- on the first `exec_command(...)` sent via MCP or on the first command run manually
## Dynamic tab titles

3
pyproject.toml

@ -10,6 +10,7 @@ readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"mcp>=1.0.0",
"pycairo>=1.26.0",
]
[project.urls]
@ -17,8 +18,6 @@ Repository = "https://git.enne2.net/enne2/mcp-hal9002"
[project.scripts]
mcp-hal9002 = "gnome_vte_mcp.server:main"
mcp-hal9002-gui = "gnome_vte_mcp.gui:main"
gnome-vte-mcp-gui = "gnome_vte_mcp.gui:main"
gnome-vte-mcp-server = "gnome_vte_mcp.server:main"
[tool.setuptools]

513
src/gnome_vte_mcp/gui.py

@ -1,6 +1,7 @@
from __future__ import annotations
import argparse
import base64
import cairo
from datetime import datetime
import json
@ -32,6 +33,8 @@ DEFAULT_SCROLLBACK_LINES = 10000
ANSI_ESCAPE_RE = re.compile(r"\x1b\[[0-?]*[ -/]*[@-~]")
OSC_ESCAPE_RE = re.compile(r"\x1b\].*?(?:\x07|\x1b\\)", re.DOTALL)
SINGLE_ESCAPE_RE = re.compile(r"\x1b[@-_]")
COMMAND_START_MARKER = "__MCP_HAL9002_CMD_START__"
COMMAND_END_MARKER = "__MCP_HAL9002_CMD_END__"
@dataclass
@ -44,8 +47,26 @@ class TabState:
welcome_revealer: Gtk.Revealer
diagnostic_layer: Gtk.DrawingArea
log_path: Path
command_events_path: Path
history_path: Path
shell_rc_path: Path
cwd: str
last_command: str | None
last_command_token: str | None
pending_submit_id: str | None
pending_submit_text: str | None
pending_submit_requested_at: str | None
last_manual_submit_id: str | None
last_manual_submit_text: str | None
last_manual_submit_at: str | None
current_execution_submission_id: str | None
current_execution_command: str | None
current_execution_started_at: str | None
current_execution_after_sequence: int | None
delegated_session_submission_id: str | None
delegated_session_command: str | None
delegated_session_started_at: str | None
delegated_session_after_sequence: int | None
class TerminalWindow(Gtk.ApplicationWindow):
@ -426,7 +447,12 @@ class TerminalApp(Gtk.Application):
tab_id = str(uuid.uuid4())
tab_title = self._format_tab_title(cwd=working_directory, command=command, title=title)
log_path = self.log_dir / f"{tab_id}.log"
command_events_path = self.log_dir / f"{tab_id}.commands.tsv"
history_path = self.log_dir / f"{tab_id}.history"
shell_rc_path = self.log_dir / f"{tab_id}.bashrc"
log_path.write_text("", encoding="utf-8")
command_events_path.write_text("", encoding="utf-8")
history_path.write_text("", encoding="utf-8")
page_name = tab_id
self.window.stack.add_titled(tab_overlay, page_name, tab_title)
self.window.stack.set_visible_child_name(page_name)
@ -442,8 +468,26 @@ class TerminalApp(Gtk.Application):
welcome_revealer=welcome_revealer,
diagnostic_layer=diagnostic_layer,
log_path=log_path,
command_events_path=command_events_path,
history_path=history_path,
shell_rc_path=shell_rc_path,
cwd=working_directory,
last_command=command,
last_command=None,
last_command_token=None,
pending_submit_id=None,
pending_submit_text=None,
pending_submit_requested_at=None,
last_manual_submit_id=None,
last_manual_submit_text=None,
last_manual_submit_at=None,
current_execution_submission_id=None,
current_execution_command=None,
current_execution_started_at=None,
current_execution_after_sequence=None,
delegated_session_submission_id=None,
delegated_session_command=None,
delegated_session_started_at=None,
delegated_session_after_sequence=None,
)
self.tabs[tab_id] = tab
show_onboarding = len(self.tabs) == 1 and command is None
@ -452,20 +496,116 @@ class TerminalApp(Gtk.Application):
terminal.connect("child-exited", self._on_child_exited, tab_id)
terminal.connect("window-title-changed", self._on_terminal_window_title_changed, tab_id)
self._spawn_shell(tab, cwd=working_directory, command=command)
self._write_shell_rc_file(tab)
self._spawn_shell(tab, cwd=working_directory)
if command:
GLib.timeout_add(150, self._feed_terminal_input, tab_id, command + "\n")
self.window.present()
return self._serialize_tab(tab)
def _spawn_shell(self, tab: TabState, cwd: str | None, command: str | None) -> None:
def _command_start_marker(self, token: str) -> str:
return f"{COMMAND_START_MARKER}:{token}"
def _command_end_marker_prefix(self, token: str) -> str:
return f"{COMMAND_END_MARKER}:{token}:"
def _shell_hook_rc_content(self, tab: TabState) -> str:
transcript_file = shlex.quote(os.fspath(tab.log_path))
events_file = shlex.quote(os.fspath(tab.command_events_path))
history_file = shlex.quote(os.fspath(tab.history_path))
return f"""# Auto-generated by mcp-hal9002
[[ -f ~/.bashrc ]] && source ~/.bashrc
export HISTFILE={history_file}
export HISTSIZE=50000
export HISTFILESIZE=50000
export HISTCONTROL=
shopt -s histappend cmdhist lithist
export __MCP_HAL9002_TRANSCRIPT_FILE={transcript_file}
export __MCP_HAL9002_EVENTS_FILE={events_file}
export __MCP_HAL9002_ACTIVE_TOKEN=""
export __MCP_HAL9002_ACTIVE_STARTED_AT=""
export __MCP_HAL9002_ACTIVE_CWD=""
export __MCP_HAL9002_LAST_HISTORY_NUM=""
export __MCP_HAL9002_SEQUENCE=0
export __MCP_HAL9002_IN_HOOK=0
__mcp_hal9002_b64() {{
printf '%s' "$1" | base64 | tr -d '\\n'
}}
__mcp_hal9002_debug_trap() {{
[[ "${{__MCP_HAL9002_IN_HOOK:-0}}" = 1 ]] && return 0
case "$BASH_COMMAND" in
__mcp_hal9002_*|history*|builtin\\ history* ) return 0 ;;
esac
if [[ -z "${{__MCP_HAL9002_ACTIVE_TOKEN:-}}" ]]; then
__MCP_HAL9002_ACTIVE_TOKEN="${{EPOCHREALTIME:-0}}-$$-$RANDOM"
__MCP_HAL9002_ACTIVE_STARTED_AT="${{EPOCHREALTIME:-0}}"
__MCP_HAL9002_ACTIVE_CWD="$PWD"
printf '%s\\n' "{COMMAND_START_MARKER}:${{__MCP_HAL9002_ACTIVE_TOKEN}}" >> "$__MCP_HAL9002_TRANSCRIPT_FILE"
fi
return 0
}}
__mcp_hal9002_prompt_hook() {{
local exit_code=$?
local finished_at="${{EPOCHREALTIME:-0}}"
local hist_line=""
local hist_num=""
local command=""
local seq=""
__MCP_HAL9002_IN_HOOK=1
hist_line=$(HISTTIMEFORMAT= builtin history 1 2>/dev/null || true)
__MCP_HAL9002_IN_HOOK=0
if [[ -n "$hist_line" && "$hist_line" =~ ^[[:space:]]*([0-9]+)[[:space:]](.*)$ ]]; then
hist_num="${{BASH_REMATCH[1]}}"
command="${{BASH_REMATCH[2]}}"
fi
if [[ -n "${{__MCP_HAL9002_ACTIVE_TOKEN:-}}" && -n "$hist_num" && "$hist_num" != "${{__MCP_HAL9002_LAST_HISTORY_NUM:-}}" ]]; then
__MCP_HAL9002_LAST_HISTORY_NUM="$hist_num"
__MCP_HAL9002_SEQUENCE=$(( ${{__MCP_HAL9002_SEQUENCE:-0}} + 1 ))
seq="${{__MCP_HAL9002_SEQUENCE}}"
printf '%s\\n' "{COMMAND_END_MARKER}:${{__MCP_HAL9002_ACTIVE_TOKEN}}:$exit_code" >> "$__MCP_HAL9002_TRANSCRIPT_FILE"
printf '%s\\t%s\\t%s\\t%s\\t%s\\t%s\\t%s\\t%s\\n' \
"$seq" \
"${{__MCP_HAL9002_ACTIVE_TOKEN}}" \
"$exit_code" \
"${{__MCP_HAL9002_ACTIVE_STARTED_AT:-0}}" \
"$finished_at" \
"$(__mcp_hal9002_b64 "${{__MCP_HAL9002_ACTIVE_CWD:-$PWD}}")" \
"$(__mcp_hal9002_b64 "$PWD")" \
"$(__mcp_hal9002_b64 "$command")" >> "$__MCP_HAL9002_EVENTS_FILE"
fi
__MCP_HAL9002_ACTIVE_TOKEN=""
__MCP_HAL9002_ACTIVE_STARTED_AT=""
__MCP_HAL9002_ACTIVE_CWD=""
return 0
}}
trap '__mcp_hal9002_debug_trap' DEBUG
PROMPT_COMMAND='__mcp_hal9002_prompt_hook'
"""
def _write_shell_rc_file(self, tab: TabState) -> None:
tab.shell_rc_path.write_text(self._shell_hook_rc_content(tab), encoding="utf-8")
def _spawn_shell(self, tab: TabState, cwd: str | None) -> None:
working_directory = cwd or os.path.expanduser("~")
envv = [f"{key}={value}" for key, value in os.environ.items()]
shell = os.environ.get("SHELL", "/bin/bash")
quoted_log = shlex.quote(os.fspath(tab.log_path))
script_prefix = f"exec > >(tee -a {quoted_log}) 2>&1; "
if command:
argv = ["/bin/bash", "-lc", f"{script_prefix}{command}; exec {shlex.quote(shell)} -i"]
else:
argv = ["/bin/bash", "-lc", f"{script_prefix}exec {shlex.quote(shell)} -i"]
argv = [
"/bin/bash",
"-lc",
f"{script_prefix}exec /bin/bash --rcfile {shlex.quote(os.fspath(tab.shell_rc_path))} -i",
]
_success, _pid = tab.terminal.spawn_sync(
Vte.PtyFlags.DEFAULT,
@ -478,10 +618,61 @@ class TerminalApp(Gtk.Application):
None,
)
    def _feed_terminal_input(self, tab_id: str, payload: str) -> bool:
        tab = self.tabs.get(tab_id)
        if tab is None:
            return False
        tab.terminal.paste_text(payload)
        return False

    def _looks_like_interactive_handoff(self, command: str) -> bool:
        try:
            tokens = shlex.split(command)
        except ValueError:
            tokens = command.strip().split()
        if not tokens:
            return False
        executable = os.path.basename(tokens[0])
        if executable in {"arca", "bash", "sh", "zsh", "fish", "tmux", "screen", "nu"}:
            return True
        if executable in {"python", "python3", "ipython", "bpython", "node"}:
            return len(tokens) == 1
        if executable not in {"ssh", "mosh"}:
            return False
        options_with_value = {
            "-B", "-b", "-c", "-D", "-E", "-e", "-F", "-I", "-i", "-J", "-L", "-l", "-m",
            "-O", "-o", "-p", "-Q", "-R", "-S", "-W", "-w",
        }
        destination_seen = False
        index = 1
        while index < len(tokens):
            token = tokens[index]
            if not destination_seen:
                if token == "--":
                    destination_seen = True
                    index += 1
                    continue
                if token.startswith("-"):
                    if token in options_with_value:
                        index += 2
                        continue
                    if len(token) == 2 and token[0] == "-" and token in options_with_value:
                        index += 2
                        continue
                    index += 1
                    continue
                destination_seen = True
                index += 1
                continue
            return False
        return destination_seen
def _on_terminal_key_pressed(
self,
controller: Gtk.EventControllerKey,
_keyval: int,
keyval: int,
_keycode: int,
_state: Gdk.ModifierType,
) -> bool:
@ -491,6 +682,27 @@ class TerminalApp(Gtk.Application):
tab = next((item for item in self.tabs.values() if item.terminal == widget), None)
if tab is not None:
self._dismiss_onboarding(tab)
if keyval in {Gdk.KEY_Return, Gdk.KEY_KP_Enter, Gdk.KEY_ISO_Enter} and tab.pending_submit_id is not None:
submitted_at = datetime.now().astimezone().isoformat(timespec="milliseconds")
after_sequence = self._latest_command_sequence(tab)
delegated = self._looks_like_interactive_handoff(tab.pending_submit_text or "")
tab.last_manual_submit_id = tab.pending_submit_id
tab.last_manual_submit_text = tab.pending_submit_text
tab.last_manual_submit_at = submitted_at
if delegated:
tab.delegated_session_submission_id = tab.pending_submit_id
tab.delegated_session_command = tab.pending_submit_text
tab.delegated_session_started_at = submitted_at
tab.delegated_session_after_sequence = after_sequence
self._clear_current_execution(tab)
else:
tab.current_execution_submission_id = tab.pending_submit_id
tab.current_execution_command = tab.pending_submit_text
tab.current_execution_started_at = submitted_at
tab.current_execution_after_sequence = after_sequence
tab.pending_submit_id = None
tab.pending_submit_text = None
tab.pending_submit_requested_at = None
return False
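To see what the key-press handler decides for typical commands, the heuristic behind `_looks_like_interactive_handoff` can be restated as a standalone function and run on a few inputs. This is a simplified sketch, not the method itself: the name `looks_interactive` and the module-level constants are hypothetical, the unreachable second options check is left out, and `--` is not special-cased.

```python
import os
import shlex

SHELLS = {"arca", "bash", "sh", "zsh", "fish", "tmux", "screen", "nu"}
REPLS = {"python", "python3", "ipython", "bpython", "node"}
SSH_OPTIONS_WITH_VALUE = {
    "-B", "-b", "-c", "-D", "-E", "-e", "-F", "-I", "-i", "-J", "-L", "-l", "-m",
    "-O", "-o", "-p", "-Q", "-R", "-S", "-W", "-w",
}


def looks_interactive(command: str) -> bool:
    """Guess whether a command hands the terminal over to an interactive session."""
    try:
        tokens = shlex.split(command)
    except ValueError:
        # Unbalanced quotes: fall back to plain whitespace splitting.
        tokens = command.strip().split()
    if not tokens:
        return False
    executable = os.path.basename(tokens[0])
    if executable in SHELLS:
        return True
    if executable in REPLS:
        # A bare interpreter is a REPL; with arguments it runs a script.
        return len(tokens) == 1
    if executable not in {"ssh", "mosh"}:
        return False
    index = 1
    while index < len(tokens):
        token = tokens[index]
        if token.startswith("-"):
            # Skip the option, plus its value when it takes one.
            index += 2 if token in SSH_OPTIONS_WITH_VALUE else 1
            continue
        # First non-option token is the destination; any token after it is a
        # remote command, which makes the call non-interactive.
        return index == len(tokens) - 1
    return False


for sample in ["ssh user@host", "ssh -p 2222 user@host", "ssh user@host uptime", "python3 script.py"]:
    print(sample, "->", looks_interactive(sample))
```

The distinction matters because a delegated session never returns to the local prompt hook, so no completion event can be recorded for it until the session ends.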
def _on_terminal_window_title_changed(self, terminal: Vte.Terminal, tab_id: str) -> None:
@ -620,11 +832,196 @@ class TerminalApp(Gtk.Application):
cleaned = cleaned.replace("\r", "")
return cleaned
    def _parse_epoch(self, value: str) -> float:
        return float(value.replace(",", "."))

    def _parse_command_events(self, tab: TabState) -> list[dict[str, Any]]:
        if not tab.command_events_path.exists():
            return []
        events: list[dict[str, Any]] = []
        for raw_line in tab.command_events_path.read_text(encoding="utf-8").splitlines():
            if not raw_line.strip():
                continue
            parts = raw_line.split("\t")
            if len(parts) != 8:
                continue
            try:
                started_epoch = self._parse_epoch(parts[3])
                finished_epoch = self._parse_epoch(parts[4])
                event = {
                    "sequence": int(parts[0]),
                    "token": parts[1],
                    "exit_code": int(parts[2]),
                    "started_epoch": started_epoch,
                    "finished_epoch": finished_epoch,
                    "started_at": datetime.fromtimestamp(started_epoch).astimezone().isoformat(timespec="seconds"),
                    "finished_at": datetime.fromtimestamp(finished_epoch).astimezone().isoformat(timespec="seconds"),
                    "duration_seconds": round(max(finished_epoch - started_epoch, 0.0), 6),
                    "cwd": base64.b64decode(parts[5]).decode("utf-8"),
                    "cwd_after": base64.b64decode(parts[6]).decode("utf-8"),
                    "command": base64.b64decode(parts[7]).decode("utf-8"),
                }
            except Exception:
                continue
            events.append(event)
        return events

    def _latest_command_event(self, tab: TabState) -> dict[str, Any] | None:
        events = self._parse_command_events(tab)
        return events[-1] if events else None

    def _latest_command_sequence(self, tab: TabState) -> int:
        latest = self._latest_command_event(tab)
        return int(latest["sequence"]) if latest is not None else 0

    def _manual_submit_status(self, tab: TabState, submission_id: str) -> dict[str, Any]:
        if tab.last_manual_submit_id == submission_id:
            return {
                "tab_id": tab.tab_id,
                "submission_id": submission_id,
                "submitted": True,
                "submitted_at": tab.last_manual_submit_at,
                "written_text": tab.last_manual_submit_text,
                "current_sequence": self._latest_command_sequence(tab),
            }
        if tab.pending_submit_id == submission_id:
            return {
                "tab_id": tab.tab_id,
                "submission_id": submission_id,
                "submitted": False,
                "requested_at": tab.pending_submit_requested_at,
                "written_text": tab.pending_submit_text,
                "current_sequence": self._latest_command_sequence(tab),
            }
        raise RuntimeError(f"Unknown manual submission id for tab_id={tab.tab_id}: {submission_id}")

    def _clear_current_execution(self, tab: TabState) -> None:
        tab.current_execution_submission_id = None
        tab.current_execution_command = None
        tab.current_execution_started_at = None
        tab.current_execution_after_sequence = None

    def _clear_delegated_session(self, tab: TabState) -> None:
        tab.delegated_session_submission_id = None
        tab.delegated_session_command = None
        tab.delegated_session_started_at = None
        tab.delegated_session_after_sequence = None

    def _delegated_session_status(self, tab: TabState) -> dict[str, Any]:
        if tab.delegated_session_submission_id is None:
            return {
                "tab_id": tab.tab_id,
                "state": "idle",
                "current_sequence": self._latest_command_sequence(tab),
            }
        after_sequence = int(tab.delegated_session_after_sequence or 0)
        current_sequence = self._latest_command_sequence(tab)
        if current_sequence > after_sequence:
            self._clear_delegated_session(tab)
            return {
                "tab_id": tab.tab_id,
                "state": "idle",
                "current_sequence": current_sequence,
            }
        return {
            "tab_id": tab.tab_id,
            "state": "interactive-session",
            "submission_id": tab.delegated_session_submission_id,
            "command": tab.delegated_session_command,
            "started_at": tab.delegated_session_started_at,
            "after_sequence": after_sequence,
            "current_sequence": current_sequence,
        }

    def _running_command_status(self, tab: TabState) -> dict[str, Any]:
        delegated_status = self._delegated_session_status(tab)
        if delegated_status["state"] == "interactive-session":
            return delegated_status
        if tab.current_execution_submission_id is None:
            return {
                "tab_id": tab.tab_id,
                "state": "idle",
                "current_sequence": self._latest_command_sequence(tab),
            }
        after_sequence = int(tab.current_execution_after_sequence or 0)
        current_sequence = self._latest_command_sequence(tab)
        status = {
            "tab_id": tab.tab_id,
            "submission_id": tab.current_execution_submission_id,
            "command": tab.current_execution_command,
            "started_at": tab.current_execution_started_at,
            "after_sequence": after_sequence,
            "current_sequence": current_sequence,
        }
        if current_sequence > after_sequence:
            latest = self._latest_command_event(tab)
            status["state"] = "completed"
            if latest is not None:
                status["sequence"] = latest["sequence"]
                status["finished_at"] = latest["finished_at"]
                status["exit_code"] = latest["exit_code"]
            return status
        status["state"] = "running"
        return status

    def _default_screenshot_path(self, target: str, tab_id: str | None = None) -> Path:
        timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
        suffix = tab_id if tab_id else target
        return self.screenshot_dir / f"{timestamp}-{suffix}.png"

    def _extract_command_result(self, tab: TabState, token: str) -> dict[str, Any]:
        lines = self._sanitize_text(tab.log_path.read_text(encoding="utf-8")).splitlines()
        start_marker = self._command_start_marker(token)
        end_prefix = self._command_end_marker_prefix(token)
        start_index: int | None = None
        end_index: int | None = None
        exit_code: int | None = None
        for index, line in enumerate(lines):
            if line == start_marker:
                start_index = index
                end_index = None
                exit_code = None
                continue
            if start_index is not None and line.startswith(end_prefix):
                end_index = index
                try:
                    exit_code = int(line[len(end_prefix):])
                except ValueError:
                    exit_code = None
        if start_index is None:
            raise RuntimeError("No tracked command result is available for this tab yet")
        if end_index is None:
            raise RuntimeError("The last tracked command is still running or has not produced a completion marker yet")
        text = "\n".join(lines[start_index + 1:end_index]).strip("\n")
        latest = next((event for event in reversed(self._parse_command_events(tab)) if event["token"] == token), None)
        if latest is None:
            raise RuntimeError("Tracked command metadata is not available for this tab yet")
        tab.last_command = str(latest["command"])
        tab.last_command_token = token
        return {
            "tab_id": tab.tab_id,
            "sequence": latest["sequence"],
            "command": latest["command"],
            "cwd": latest["cwd"],
            "cwd_after": latest["cwd_after"],
            "started_at": latest["started_at"],
            "finished_at": latest["finished_at"],
            "started_epoch": latest["started_epoch"],
            "finished_epoch": latest["finished_epoch"],
            "duration_seconds": latest["duration_seconds"],
            "exit_code": exit_code,
            "text": text,
        }
def _capture_widget_to_png(self, widget: Gtk.Widget, path: Path) -> dict[str, Any]:
paintable = Gtk.WidgetPaintable.new(widget)
width = widget.get_width() or widget.get_allocated_width()
@ -709,6 +1106,39 @@ class TerminalApp(Gtk.Application):
def rpc_ping(self) -> dict[str, str]:
return {"status": "ok"}
def _visible_tab(self) -> TabState | None:
if self.window is None:
return None
visible = self.window.stack.get_visible_child()
if visible is None:
return None
return next((item for item in self.tabs.values() if item.container == visible), None)
def _quit_application(self) -> bool:
self.quit()
return False
def rpc_gui_status(self) -> dict[str, Any]:
visible_tab = self._visible_tab()
return {
"tab_count": len(self.tabs),
"active_tab_id": visible_tab.tab_id if visible_tab is not None else None,
"tab_ids": list(self.tabs.keys()),
"window_title": self.window.get_title() if self.window is not None else None,
}
def rpc_show_window(self) -> dict[str, Any]:
if self.window is None:
raise RuntimeError("Window has not been created yet")
self.window.present()
return self.rpc_gui_status()
def rpc_close_window(self) -> dict[str, Any]:
status = self.rpc_gui_status()
GLib.idle_add(self._quit_application)
status["closed"] = True
return status
def rpc_list_tabs(self) -> list[dict[str, Any]]:
return [self._serialize_tab(tab) for tab in self.tabs.values()]
@ -725,26 +1155,73 @@ class TerminalApp(Gtk.Application):
def rpc_exec(self, tab_id: str, command: str, newline: bool = True) -> dict[str, Any]:
tab = self._require_tab(tab_id)
payload = command + ("\n" if newline else "")
with tab.log_path.open("a", encoding="utf-8") as handle:
handle.write(f"$ {command}\n")
tab.last_command = command
running_status = self._running_command_status(tab)
if running_status["state"] == "running":
raise RuntimeError(
f"tab_id={tab_id} already has a command in progress; use wait_for_running_command before sending another command"
)
if running_status["state"] == "completed":
self._clear_current_execution(tab)
if running_status["state"] == "interactive-session":
self._clear_current_execution(tab)
if tab.pending_submit_id is not None:
raise RuntimeError(
f"tab_id={tab_id} already has a pending manual submit request; press Enter in the GUI before writing another command"
)
submission_id = str(uuid.uuid4())
requested_at = datetime.now().astimezone().isoformat(timespec="milliseconds")
payload = command
self._dismiss_onboarding(tab)
self._update_tab_title(tab, self._format_tab_title(cwd=tab.cwd, command=command))
tab.terminal.feed_child(list(payload.encode("utf-8")))
return {"tab_id": tab_id, "command": command}
if self.window is not None:
self.window.stack.set_visible_child(tab.container)
self.window.present()
tab.terminal.grab_focus()
tab.pending_submit_id = submission_id
tab.pending_submit_text = command
tab.pending_submit_requested_at = requested_at
tab.terminal.paste_text(payload)
return {
"tab_id": tab_id,
"written_text": command,
"awaiting_manual_submit": True,
"after_sequence": self._latest_command_sequence(tab),
"submission_id": submission_id,
"requested_at": requested_at,
"newline_ignored": newline,
}
def rpc_manual_submit_status(self, tab_id: str, submission_id: str) -> dict[str, Any]:
tab = self._require_tab(tab_id)
return self._manual_submit_status(tab, submission_id)
def rpc_running_command_status(self, tab_id: str) -> dict[str, Any]:
tab = self._require_tab(tab_id)
return self._running_command_status(tab)
def rpc_read_tab(self, tab_id: str, last_n_lines: int = 200) -> dict[str, Any]:
tab = self._require_tab(tab_id)
lines = tab.log_path.read_text(encoding="utf-8").splitlines()
tail_lines = lines[-last_n_lines:]
filtered_lines = [
line
for line in lines
if not line.startswith(f"{COMMAND_START_MARKER}:") and not line.startswith(f"{COMMAND_END_MARKER}:")
]
tail_lines = filtered_lines[-last_n_lines:]
text = self._sanitize_text("\n".join(tail_lines))
return {
"tab_id": tab_id,
"line_count": len(lines),
"line_count": len(filtered_lines),
"text": text,
}
def rpc_read_last_command_result(self, tab_id: str) -> dict[str, Any]:
tab = self._require_tab(tab_id)
latest = self._latest_command_event(tab)
if latest is None:
raise RuntimeError("No tracked command result is available for this tab yet")
return self._extract_command_result(tab, str(latest["token"]))
def rpc_capture_screenshot(
self,
target: str = "window",

391
src/gnome_vte_mcp/server.py

@ -1,19 +1,33 @@
from __future__ import annotations
import base64
import fcntl
import json
import os
import re
import shutil
import subprocess
import sys
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import Any
from mcp.server.fastmcp import FastMCP
from mcp.server.fastmcp.utilities.types import Image
from .control import ControlError, default_socket_path, request, wait_for_socket
mcp = FastMCP("mcp-hal9002")
COMMAND_START_MARKER = "__MCP_HAL9002_CMD_START__"
COMMAND_END_MARKER = "__MCP_HAL9002_CMD_END__"
ANSI_ESCAPE_RE = re.compile(r"\x1b\[[0-?]*[ -/]*[@-~]")
OSC_ESCAPE_RE = re.compile(r"\x1b\].*?(?:\x07|\x1b\\)", re.DOTALL)
SINGLE_ESCAPE_RE = re.compile(r"\x1b[@-_]")
_LAUNCH_THREAD_LOCK = threading.Lock()
def _socket_path() -> Path:
@ -21,9 +35,194 @@ def _socket_path() -> Path:
return Path(override) if override else default_socket_path()
def _launch_lock_path(socket_path: Path) -> Path:
return socket_path.with_suffix(".launch.lock")
def _log_dir(socket_path: Path) -> Path:
return socket_path.parent / "mcp-hal9002-logs"
def _tab_log_path(socket_path: Path, tab_id: str) -> Path:
return _log_dir(socket_path) / f"{tab_id}.log"
def _tab_events_path(socket_path: Path, tab_id: str) -> Path:
return _log_dir(socket_path) / f"{tab_id}.commands.tsv"
def _sanitize_text(text: str) -> str:
cleaned = OSC_ESCAPE_RE.sub("", text)
cleaned = ANSI_ESCAPE_RE.sub("", cleaned)
cleaned = SINGLE_ESCAPE_RE.sub("", cleaned)
return cleaned.replace("\r", "")
def _parse_epoch(value: str) -> float:
return float(value.replace(",", "."))
def _parse_command_events(socket_path: Path, tab_id: str) -> list[dict[str, Any]]:
path = _tab_events_path(socket_path, tab_id)
if not path.exists():
return []
events: list[dict[str, Any]] = []
for raw_line in path.read_text(encoding="utf-8").splitlines():
if not raw_line.strip():
continue
parts = raw_line.split("\t")
if len(parts) != 8:
continue
try:
started_epoch = _parse_epoch(parts[3])
finished_epoch = _parse_epoch(parts[4])
events.append(
{
"sequence": int(parts[0]),
"token": parts[1],
"exit_code": int(parts[2]),
"started_epoch": started_epoch,
"finished_epoch": finished_epoch,
"started_at": datetime.fromtimestamp(started_epoch).astimezone().isoformat(timespec="seconds"),
"finished_at": datetime.fromtimestamp(finished_epoch).astimezone().isoformat(timespec="seconds"),
"duration_seconds": round(max(finished_epoch - started_epoch, 0.0), 6),
"cwd": base64.b64decode(parts[5]).decode("utf-8"),
"cwd_after": base64.b64decode(parts[6]).decode("utf-8"),
"command": base64.b64decode(parts[7]).decode("utf-8"),
}
)
except Exception:
continue
return events
def _latest_command_event(socket_path: Path, tab_id: str) -> dict[str, Any] | None:
events = _parse_command_events(socket_path, tab_id)
return events[-1] if events else None
def _extract_command_text(socket_path: Path, tab_id: str, token: str) -> str:
path = _tab_log_path(socket_path, tab_id)
if not path.exists():
raise RuntimeError(f"No transcript log exists yet for tab_id: {tab_id}")
lines = _sanitize_text(path.read_text(encoding="utf-8")).splitlines()
start_marker = f"{COMMAND_START_MARKER}:{token}"
end_prefix = f"{COMMAND_END_MARKER}:{token}:"
start_index: int | None = None
end_index: int | None = None
for index, line in enumerate(lines):
if line == start_marker:
start_index = index
end_index = None
continue
if start_index is not None and line.startswith(end_prefix):
end_index = index
if start_index is None:
raise RuntimeError("No tracked command output is available for this tab yet")
if end_index is None:
raise RuntimeError("The tracked command has not finished yet")
return "\n".join(lines[start_index + 1:end_index]).strip("\n")
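The extraction logic above can be exercised on an in-memory transcript. The sketch below assumes placeholder marker strings (`__CMD_START__` and `__CMD_END__` are hypothetical; the real values of `COMMAND_START_MARKER` and `COMMAND_END_MARKER` are not shown in this diff) and skips the escape-sequence sanitizing step.

```python
START = "__CMD_START__"  # hypothetical stand-in for COMMAND_START_MARKER
END = "__CMD_END__"      # hypothetical stand-in for COMMAND_END_MARKER


def extract_between_markers(transcript: str, token: str) -> str:
    """Return the lines between the last start marker for `token` and its end marker."""
    lines = transcript.splitlines()
    start_marker = f"{START}:{token}"
    end_prefix = f"{END}:{token}:"
    start_index = None
    end_index = None
    for index, line in enumerate(lines):
        if line == start_marker:
            # A newer start marker invalidates any end marker seen so far.
            start_index = index
            end_index = None
            continue
        if start_index is not None and line.startswith(end_prefix):
            end_index = index
    if start_index is None:
        raise LookupError("no start marker found for this token")
    if end_index is None:
        raise LookupError("start marker found, but the command has not finished")
    return "\n".join(lines[start_index + 1:end_index]).strip("\n")


transcript = "prompt$ ls\n__CMD_START__:t1\nfile_a\nfile_b\n__CMD_END__:t1:0\nprompt$ "
print(extract_between_markers(transcript, "t1"))
```

The end marker carries a trailing field after the token (the prefix ends in `:`), which is why it is matched with `startswith` while the start marker is matched exactly.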
def _build_command_result(socket_path: Path, tab_id: str, event: dict[str, Any]) -> dict[str, Any]:
return {
"tab_id": tab_id,
"sequence": event["sequence"],
"command": event["command"],
"cwd": event["cwd"],
"cwd_after": event["cwd_after"],
"started_at": event["started_at"],
"finished_at": event["finished_at"],
"started_epoch": event["started_epoch"],
"finished_epoch": event["finished_epoch"],
"duration_seconds": event["duration_seconds"],
"exit_code": event["exit_code"],
"text": _extract_command_text(socket_path, tab_id, str(event["token"])),
}
def _wait_for_manual_submit(
tab_id: str,
submission_id: str,
*,
poll_interval: float = 0.1,
) -> dict[str, Any]:
socket_path = ensure_gui()
while True:
status = request("manual_submit_status", params={"tab_id": tab_id, "submission_id": submission_id}, socket_path=socket_path)
if bool(status.get("submitted")):
return status
time.sleep(poll_interval)
def _running_command_status(tab_id: str) -> dict[str, Any]:
return call_gui("running_command_status", tab_id=tab_id)
def _package_root() -> Path:
return Path(__file__).resolve().parents[1]
def _python_supports_gui(executable: str) -> bool:
try:
result = subprocess.run( # noqa: S603
[
executable,
"-c",
"import cairo, gi; gi.require_version('Gtk', '4.0'); gi.require_version('Vte', '3.91')",
],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
timeout=5,
check=False,
)
except (OSError, subprocess.SubprocessError):
return False
return result.returncode == 0
def _gui_python_executable() -> str:
base_prefix = Path(sys.base_prefix)
candidates = [
sys.executable,
shutil.which("python3"),
shutil.which("python"),
os.fspath(base_prefix / "bin" / "python3"),
os.fspath(base_prefix / "bin" / "python"),
"/usr/bin/python3",
"/usr/bin/python",
]
seen: set[str] = set()
for candidate in candidates:
if not candidate or candidate in seen:
continue
seen.add(candidate)
if _python_supports_gui(candidate):
return candidate
raise RuntimeError(
"Could not find a Python interpreter with GTK/VTE bindings available. Install PyGObject/VTE for a system Python or run the server in an environment where gi and cairo are importable."
)
def _is_gui_running(socket_path: Path, timeout: float = 0.5) -> bool:
try:
request("ping", socket_path=socket_path, timeout=timeout)
except (OSError, ControlError):
return False
return True
def _launch_gui(socket_path: Path) -> None:
python_executable = _gui_python_executable()
env = os.environ.copy()
command = [sys.executable, "-m", "gnome_vte_mcp.gui", "--socket", os.fspath(socket_path)]
package_root = os.fspath(_package_root())
existing_python_path = env.get("PYTHONPATH")
env["PYTHONPATH"] = package_root if not existing_python_path else os.pathsep.join([package_root, existing_python_path])
command = [python_executable, "-m", "gnome_vte_mcp.gui", "--socket", os.fspath(socket_path)]
subprocess.Popen( # noqa: S603
command,
env=env,
@ -35,25 +234,69 @@ def _launch_gui(socket_path: Path) -> None:
def ensure_gui() -> Path:
socket_path = _socket_path()
try:
request("ping", socket_path=socket_path, timeout=0.5)
if _is_gui_running(socket_path):
return socket_path
except (OSError, ControlError):
pass
_launch_gui(socket_path)
if not wait_for_socket(socket_path=socket_path, timeout=10.0):
raise RuntimeError(
"The GTK/VTE GUI did not start in time. Ensure the MCP server runs inside a graphical session with DISPLAY or WAYLAND_DISPLAY available."
)
lock_path = _launch_lock_path(socket_path)
lock_path.parent.mkdir(parents=True, exist_ok=True)
with _LAUNCH_THREAD_LOCK:
with lock_path.open("a+", encoding="utf-8") as lock_handle:
fcntl.flock(lock_handle.fileno(), fcntl.LOCK_EX)
if _is_gui_running(socket_path):
return socket_path
_launch_gui(socket_path)
if not wait_for_socket(socket_path=socket_path, timeout=10.0):
raise RuntimeError(
"The GTK/VTE GUI did not start in time. Ensure the MCP server runs inside a graphical session with DISPLAY or WAYLAND_DISPLAY available."
)
return socket_path
def call_gui(method: str, **params: Any) -> Any:
socket_path = ensure_gui()
def call_gui(method: str, *, ensure_running: bool = True, **params: Any) -> Any:
socket_path = ensure_gui() if ensure_running else _socket_path()
return request(method, params=params, socket_path=socket_path)
@mcp.tool()
def gui_status() -> dict[str, Any]:
"""Return GUI lifecycle state without starting a new window."""
socket_path = _socket_path()
if not _is_gui_running(socket_path):
return {
"running": False,
"socket_path": os.fspath(socket_path),
}
result = call_gui("gui_status", ensure_running=False)
result["running"] = True
result["socket_path"] = os.fspath(socket_path)
return result
@mcp.tool()
def open_gui() -> dict[str, Any]:
"""Start the GUI if needed and present the shared window."""
ensure_gui()
return call_gui("show_window")
@mcp.tool()
def close_gui() -> dict[str, Any]:
"""Close the shared GUI window if it is currently running."""
socket_path = _socket_path()
if not _is_gui_running(socket_path):
return {
"closed": False,
"running": False,
"socket_path": os.fspath(socket_path),
}
result = call_gui("close_window", ensure_running=False)
result["socket_path"] = os.fspath(socket_path)
return result
@mcp.tool()
def open_tab(title: str | None = None, cwd: str | None = None, command: str | None = None) -> dict[str, Any]:
"""Open a new terminal tab in the GTK/VTE prototype window."""
@ -73,9 +316,34 @@ def focus_tab(tab_id: str) -> dict[str, Any]:
@mcp.tool()
def exec_command(tab_id: str, command: str, newline: bool = True) -> dict[str, Any]:
"""Send a command to an existing tab's shell process."""
return call_gui("exec", tab_id=tab_id, command=command, newline=newline)
def exec_command(
tab_id: str,
command: str,
newline: bool = True,
poll_interval: float = 0.1,
) -> dict[str, Any]:
"""Write command text into an existing tab and block until the user manually presses Enter in the GUI.
The tool blocks indefinitely until manual submission is detected; it does not wait for the command to complete.
Use wait_for_command_result() to wait for the command to finish and collect output.
"""
result = call_gui("exec", tab_id=tab_id, command=command, newline=newline)
submitted = _wait_for_manual_submit(
tab_id,
str(result["submission_id"]),
poll_interval=poll_interval,
)
return {
"tab_id": tab_id,
"written_text": result["written_text"],
"submission_id": result["submission_id"],
"requested_at": result["requested_at"],
"submitted_at": submitted["submitted_at"],
"submitted_manually": True,
"after_sequence": result["after_sequence"],
"current_sequence": submitted["current_sequence"],
"newline_ignored": result["newline_ignored"],
}
@mcp.tool()
@ -84,6 +352,76 @@ def read_tab(tab_id: str, last_n_lines: int = 200) -> dict[str, Any]:
return call_gui("read_tab", tab_id=tab_id, last_n_lines=last_n_lines)
@mcp.tool()
def read_last_command_result(tab_id: str) -> dict[str, Any]:
"""Read the last completed command result for a tab, including cwd and execution timestamps."""
socket_path = ensure_gui()
event = _latest_command_event(socket_path, tab_id)
if event is None:
raise RuntimeError("No tracked command result is available for this tab yet")
return _build_command_result(socket_path, tab_id, event)
@mcp.tool()
def wait_for_command_result(tab_id: str, after_sequence: int | None = None, timeout: float | None = None, poll_interval: float = 0.1) -> dict[str, Any]:
"""Block until the next completed command is observed in a tab and return its tracked result.
When `timeout` is omitted or set to a non-positive value, wait indefinitely.
"""
socket_path = ensure_gui()
baseline = after_sequence
if baseline is None:
latest = _latest_command_event(socket_path, tab_id)
baseline = int(latest["sequence"]) if latest is not None else 0
deadline = None if timeout is None or timeout <= 0 else time.monotonic() + timeout
while deadline is None or time.monotonic() < deadline:
latest = _latest_command_event(socket_path, tab_id)
if latest is not None and int(latest["sequence"]) > baseline:
return _build_command_result(socket_path, tab_id, latest)
time.sleep(poll_interval)
raise RuntimeError(f"Timed out waiting for a completed command on tab_id={tab_id} after sequence {baseline}")
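The deadline handling shared by the waiting tools can be isolated as a small helper. This is a sketch, not code from the commit: `poll_until` and `probe` are hypothetical names, and it raises `TimeoutError` where the tools above raise `RuntimeError`.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def poll_until(
    probe: Callable[[], Optional[T]],
    timeout: Optional[float] = None,
    poll_interval: float = 0.1,
) -> T:
    """Call `probe` repeatedly until it returns a non-None value.

    A timeout of None or a non-positive number means "wait indefinitely",
    matching the convention documented for wait_for_command_result.
    """
    # time.monotonic() is unaffected by wall-clock adjustments.
    deadline = None if timeout is None or timeout <= 0 else time.monotonic() + timeout
    while deadline is None or time.monotonic() < deadline:
        value = probe()
        if value is not None:
            return value
        time.sleep(poll_interval)
    raise TimeoutError(f"probe returned no result within {timeout} seconds")
```

A caller would pass a closure that returns the latest event once its sequence number exceeds the baseline, and `None` otherwise.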
@mcp.tool()
def wait_for_running_command(tab_id: str, timeout: float | None = None, poll_interval: float = 0.1) -> dict[str, Any]:
"""Block until the command currently executing in a tab finishes and return its tracked result.
Use this when a command has already been manually submitted and the terminal is still busy.
When `timeout` is omitted or set to a non-positive value, wait indefinitely.
"""
socket_path = ensure_gui()
deadline = None if timeout is None or timeout <= 0 else time.monotonic() + timeout
saw_running = False
while deadline is None or time.monotonic() < deadline:
status = request("running_command_status", params={"tab_id": tab_id}, socket_path=socket_path)
state = str(status.get("state"))
if state == "completed":
event = _latest_command_event(socket_path, tab_id)
if event is None:
raise RuntimeError(f"Command completion was reported for tab_id={tab_id}, but no tracked result is available yet")
return _build_command_result(socket_path, tab_id, event)
if state == "running":
saw_running = True
time.sleep(poll_interval)
continue
if state == "interactive-session":
raise RuntimeError(
f"tab_id={tab_id} is inside an interactive delegated session ({status.get('command')}); tracked command completion is unavailable until that session exits"
)
if state == "idle":
if saw_running:
event = _latest_command_event(socket_path, tab_id)
if event is not None:
return _build_command_result(socket_path, tab_id, event)
raise RuntimeError(f"No tracked command is currently running on tab_id={tab_id}")
raise RuntimeError(f"Unexpected running command state for tab_id={tab_id}: {state}")
raise RuntimeError(f"Timed out waiting for the running command on tab_id={tab_id} to finish")
@mcp.tool()
def close_tab(tab_id: str) -> dict[str, Any]:
"""Close a tab in the prototype GUI."""
@ -96,8 +434,8 @@ def capture_screenshot(
tab_id: str | None = None,
path: str | None = None,
diagnostic_overlay: bool = False,
) -> list[Any]:
"""Capture a PNG screenshot of the full window, the VTE content of a tab, or a tab container and return it as MCP image content."""
) -> dict[str, Any]:
"""Capture a PNG screenshot of the full window, the VTE content of a tab, or a tab container and return JSON-serializable metadata."""
result = call_gui(
"capture_screenshot",
target=target,
@ -125,11 +463,18 @@ def capture_screenshot(
f"alloc={bounds['x']},{bounds['y']} {bounds['width']}x{bounds['height']}"
)
return [
" | ".join(summary_parts),
json.dumps(result["metadata"], indent=2),
Image(path=result["path"]),
]
response = {
"summary": " | ".join(summary_parts),
"path": result["path"],
"metadata_path": result["metadata_path"],
"target": result["target"],
"width": result["width"],
"height": result["height"],
"metadata": result["metadata"],
}
if "tab_id" in result:
response["tab_id"] = result["tab_id"]
return response
def main() -> None:
