Browse Source

Initial import

main
Matteo Benedetto 9 hours ago
commit
1d89c7fdc7
  1. 104
      .github/copilot-instructions.md
  2. 9
      .gitignore
  3. 21
      LICENSE
  4. 144
      README.md
  5. 61
      docs/development-status.md
  6. 14
      environment.yml
  7. 38
      pyproject.toml
  8. 2
      requirements.txt
  9. 1
      src/r36s_dlna_browser/__init__.py
  10. 22
      src/r36s_dlna_browser/__main__.py
  11. 173
      src/r36s_dlna_browser/app.py
  12. BIN
      src/r36s_dlna_browser/assets/NotoSans-Regular.ttf
  13. BIN
      src/r36s_dlna_browser/assets/icons/audio.png
  14. 8
      src/r36s_dlna_browser/assets/icons/audio.svg
  15. BIN
      src/r36s_dlna_browser/assets/icons/folder.png
  16. 8
      src/r36s_dlna_browser/assets/icons/folder.svg
  17. BIN
      src/r36s_dlna_browser/assets/icons/hud-display.png
  18. 5
      src/r36s_dlna_browser/assets/icons/hud-display.svg
  19. BIN
      src/r36s_dlna_browser/assets/icons/hud-pause.png
  20. 4
      src/r36s_dlna_browser/assets/icons/hud-pause.svg
  21. BIN
      src/r36s_dlna_browser/assets/icons/hud-play.png
  22. 3
      src/r36s_dlna_browser/assets/icons/hud-play.svg
  23. BIN
      src/r36s_dlna_browser/assets/icons/hud-seek.png
  24. 5
      src/r36s_dlna_browser/assets/icons/hud-seek.svg
  25. BIN
      src/r36s_dlna_browser/assets/icons/hud-stop.png
  26. 3
      src/r36s_dlna_browser/assets/icons/hud-stop.svg
  27. BIN
      src/r36s_dlna_browser/assets/icons/hud-volume.png
  28. 5
      src/r36s_dlna_browser/assets/icons/hud-volume.svg
  29. BIN
      src/r36s_dlna_browser/assets/icons/image.png
  30. 10
      src/r36s_dlna_browser/assets/icons/image.svg
  31. BIN
      src/r36s_dlna_browser/assets/icons/playing.png
  32. 6
      src/r36s_dlna_browser/assets/icons/playing.svg
  33. 100
      src/r36s_dlna_browser/assets/icons/review.html
  34. BIN
      src/r36s_dlna_browser/assets/icons/server.png
  35. 12
      src/r36s_dlna_browser/assets/icons/server.svg
  36. BIN
      src/r36s_dlna_browser/assets/icons/video.png
  37. 6
      src/r36s_dlna_browser/assets/icons/video.svg
  38. 0
      src/r36s_dlna_browser/dlna/__init__.py
  39. 150
      src/r36s_dlna_browser/dlna/browser_state.py
  40. 221
      src/r36s_dlna_browser/dlna/client.py
  41. 105
      src/r36s_dlna_browser/dlna/discovery.py
  42. 105
      src/r36s_dlna_browser/dlna/models.py
  43. 0
      src/r36s_dlna_browser/platform/__init__.py
  44. 62
      src/r36s_dlna_browser/platform/controls.py
  45. 32
      src/r36s_dlna_browser/platform/runtime.py
  46. 0
      src/r36s_dlna_browser/player/__init__.py
  47. 46
      src/r36s_dlna_browser/player/backend.py
  48. 440
      src/r36s_dlna_browser/player/gstreamer_backend.py
  49. 0
      src/r36s_dlna_browser/ui/__init__.py
  50. 301
      src/r36s_dlna_browser/ui/screens.py
  51. 463
      src/r36s_dlna_browser/ui/sdl_app.py
  52. 80
      src/r36s_dlna_browser/ui/theme.py
  53. 131
      tests/test_client_parser.py
  54. 79
      tests/test_controls.py
  55. 120
      tests/test_didl_mapping.py
  56. 134
      tests/test_navigation_state.py
  57. 357
      tests/test_player.py
  58. 40
      tests/test_runtime.py
  59. 122
      tests/test_sdl_app.py

104
.github/copilot-instructions.md

@ -0,0 +1,104 @@
# Copilot Instructions
## Product Goal
Build a balanced, production-oriented first milestone for an R36S DLNA browser with Wi-Fi support.
The application must:
- run on R36S-class Linux firmware derived from ArkOS
- use Python for the application logic
- use SDL2 for the user interface and input handling
- discover DLNA/UPnP media servers on the local LAN
- allow hierarchical browsing of remote media content
- support local playback on the device
- stay lightweight enough for RK3326-class hardware
## Balanced Development Plan
Copilot should follow this plan unless the user explicitly changes priorities.
### Phase 1 - Project Bootstrap
- Create a clean Python project layout with a package entrypoint.
- Keep dependencies minimal and explicit.
- Prefer simple run scripts and documentation that also make sense on-device.
### Phase 2 - DLNA Discovery And Browsing
- Implement SSDP discovery for media servers in the LAN.
- Filter and normalize DLNA MediaServer devices.
- Build a ContentDirectory client for browsing containers and media items.
- Support pagination, caching, and safe handling of incomplete metadata.
### Phase 3 - SDL2 User Interface
- Design the UI for 640x480 with readable typography and low visual overhead.
- Optimize navigation for D-pad controls with A/B and shoulder buttons.
- Keep the UI responsive while network operations run asynchronously.
### Phase 4 - Milestone 2: Integrated GStreamer Playback
- Implement playback through GStreamer inside the application process instead of launching an external player subprocess.
- Prefer the system GStreamer stack via `PyGObject` and `GstPlayBin` rather than custom decode loops.
- Prefer decoding video to `GstAppSink` and rendering frames through SDL2 so playback does not depend on a window manager, native overlay sinks, or X11 embedding.
- Support play, pause/resume, stop, relative seek, automatic return to browser on end-of-stream, and explicit error reporting when decode or frame upload fails.
- Keep the architecture ready for follow-up work on on-screen playback HUD, resume state, and device-specific video backend tuning.
### Phase 5 - Milestone 3: SDL Video Viewport, HUD, and Wayland Compatibility
- Reserve an explicit video viewport inside the SDL window and scale decoded video to that viewport instead of using the full window surface.
- Render a full playback HUD in SDL around the video area, including title, state, elapsed/duration, progress, volume, cache/buffer hints, resolution, and visible controls.
- Make the playback HUD controllable with the same device inputs used elsewhere in the app, at minimum covering pause/resume, stop, seek, and volume.
- Prefer SDL backends that work without a window manager on target hardware, especially `kmsdrm` on R36S-class devices.
- If the chosen frame upload path is too slow on target hardware, optimize within the SDL-rendered path rather than falling back to external overlay windows.
### Phase 6 - Device Integration And Hardening
- Handle Wi-Fi instability, timeouts, malformed XML, missing thumbnails, and unsupported media.
- Keep memory use and redraw frequency conservative.
- Document firmware assumptions, runtime prerequisites, and launch steps.
### Phase 7 - Validation
- Add focused tests for parsing, navigation state, and non-UI logic.
- Verify manually on a real LAN and, when possible, on the target device.
- Prefer incremental, testable milestones over broad unfinished features.
## State Tracking Requirements
Copilot must treat development state as a living artifact, not as implicit context.
- Track the current milestone, completed work, in-progress work, blockers, and next steps.
- Keep a repository status file updated during development at `docs/development-status.md`.
- Update that status file whenever a meaningful implementation step is completed, re-scoped, or blocked.
- Record only verified state, not guesses.
- Keep status entries concise, factual, and ordered by relevance to current work.
- If the plan changes, update both the active implementation and the tracked status.
The status file should normally include:
- current milestone
- current architecture decisions
- completed tasks
- tasks in progress
- blockers or open questions
- next recommended actions
## Implementation Preferences
- Favor simple, explicit modules over premature abstraction.
- Separate SDL2 UI concerns from networking and playback concerns.
- Keep async/network code isolated behind small adapters.
- Avoid features that increase complexity before the core browsing flow is stable.
- Prefer integrated GStreamer playback over subprocess control for Milestone 2.
- Prefer SDL-rendered playback from decoded frames over platform-specific window embedding on R36S-class targets.
- For Milestone 3, keep video and HUD in the same SDL render pass so the design remains compatible with DRM/KMS-only environments.
## Quality Bar
- Fix root causes when practical.
- Avoid unrelated refactors.
- Preserve performance on low-end ARM hardware.
- Make every milestone runnable, not just partially scaffolded.
- Treat documentation and development-state tracking as part of the deliverable.

9
.gitignore vendored

@ -0,0 +1,9 @@
__pycache__/
*.py[cod]
.pytest_cache/
.venv/
.mypy_cache/
.ruff_cache/
build/
dist/
*.egg-info/

21
LICENSE

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2026 Matteo Benedetto
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

144
README.md

@ -0,0 +1,144 @@
# R36S DLNA Browser
A lightweight DLNA media browser for the R36S handheld, built with Python, SDL2, and GStreamer.
Author: Matteo Benedetto
## Features
- Discover DLNA/UPnP media servers on local LAN via SSDP
- Browse media libraries hierarchically with D-pad navigation
- Play audio and video locally via integrated GStreamer playback inside the SDL window
- Optimised for 640×480 screen and low-power RK3326 hardware
## System Prerequisites
Install these system packages before running (Ubuntu/Debian-based):
```bash
sudo apt install libsdl2-2.0-0 libsdl2-ttf-2.0-0 python3 python3-pip python3-gi gir1.2-gstreamer-1.0 gstreamer1.0-tools gstreamer1.0-plugins-base gstreamer1.0-plugins-good
```
On Fedora the equivalent runtime packages are:
```bash
sudo dnf install SDL2_ttf python3-gobject gstreamer1 gstreamer1-plugins-base gstreamer1-plugins-good python3 python3-pip
```
The current playback path is SDL-only: GStreamer decodes into frames and the app
renders them inside the SDL renderer. This avoids any dependency on X11,
Wayland embedding, or a window manager and is aligned with DRM/KMS-only targets.
On ArkOS-derived firmware the SDL2 libraries are typically pre-installed.
A TrueType font is bundled inside the package at `src/r36s_dlna_browser/assets/`
so packaged builds do not depend on a system font path. System font fallbacks are
still kept for development machines.
## Quick Start
```bash
cd /path/to/R36SHack
python3 -m pip install -e ".[dev]"
python3 -m r36s_dlna_browser
```
## Miniforge / Conda Deploy
Use Miniforge or Miniconda when you want a reproducible Linux development or
packaging environment with GStreamer and SDL dependencies managed together.
```bash
cd /path/to/R36SHack
conda env create -f environment.yml
conda activate r36s-dlna-browser
python -m r36s_dlna_browser
```
To update an existing environment after dependency changes:
```bash
conda env update -f environment.yml --prune
conda activate r36s-dlna-browser
```
Or without installing:
```bash
cd /path/to/R36SHack
pip install -r requirements.txt
PYTHONPATH=src python3 -m r36s_dlna_browser
```
## Controls
| Key / Button | Action |
|-------------|----------------------|
| Up / Down | Navigate items |
| Left / Right | Page up / down |
| A (Return) | Select / Enter |
| B (Escape) | Back |
| Start | Quit |
During playback, the same controls are remapped to transport actions:
- `A`: pause / resume
- `B`: stop and return to browser
- `Left / Right`: seek `-10s / +10s`
- `Up / Down`: volume `+5 / -5`
- `H` / `Y`: cycle HUD mode (`auto / fixed / hidden`)
## On-Device (R36S / ArkOS)
1. Copy the project to `/roms/ports/` or another writable location.
2. Ensure Wi-Fi is connected.
3. Run via PortMaster or a launch script:
```bash
cd /roms/ports/R36SHack
PYTHONPATH=src python3 -m r36s_dlna_browser
```
## Git / Release Workflow
For publication on a Gitea instance with `tea`, initialize git locally, create a
remote repository, then push the current branch.
```bash
git init
git add .
git commit -m "Initial import"
tea repos create --name R36SHack --description "SDL2/GStreamer DLNA browser for R36S" --owner enne2
git remote add origin git@git.enne2.net:enne2/R36SHack.git
git push -u origin main
```
## Project Structure
```
src/r36s_dlna_browser/
├── __main__.py # Entrypoint
├── app.py # Application lifecycle
├── assets/
│ └── NotoSans-Regular.ttf
├── dlna/
│ ├── discovery.py # SSDP media-server discovery
│ ├── client.py # ContentDirectory browsing via DmsDevice
│ ├── models.py # Domain models for servers/items
│ └── browser_state.py # Navigation stack and cache
├── ui/
│ ├── sdl_app.py # SDL2 window, event loop, rendering
│ ├── screens.py # Screen definitions
│ └── theme.py # Layout constants for 640×480
├── player/
│ ├── backend.py # Player interface
│ └── gstreamer_backend.py # integrated GStreamer playback backend
└── platform/
├── controls.py # Input mapping
└── runtime.py # Runtime detection and logging
```
## License
MIT

61
docs/development-status.md

@ -0,0 +1,61 @@
# Development Status
## Current Milestone
Milestone 3 — SDL Video Viewport, HUD, and Wayland Compatibility
## Current Architecture Decisions
- **Language / UI**: Python 3.9+ with PySDL2 (ctypes wrapper around system SDL2)
- **DLNA Discovery**: Custom SSDP M-SEARCH implementation using asyncio datagrams + aiohttp for device description XML
- **Content Browsing**: Direct SOAP/XML ContentDirectory client with DIDL-Lite parser (no dependency on async-upnp-client browsing at runtime — only aiohttp)
- **Playback**: integrated GStreamer backend via `PyGObject` / `GstPlayBin`, decoding video into `GstAppSink` frames that are uploaded to SDL textures and rendered in the main SDL renderer
- **Playback viewport**: SDL scales decoded video into a dedicated playback viewport in the same render pass as the HUD, with full-width video bounds and a black playback backdrop outside the viewport
- **Concurrency**: Dedicated asyncio event loop in a daemon thread; thread-safe queues bridge it to the SDL2 main loop
- **Input**: Keyboard mapping for desktop testing + SDL2 GameController for R36S D-pad/buttons
- **Font**: Bundled package font preferred first; system font fallback kept only for development hosts
- **UI icons**: prefer bundled monochrome glyphs or a bundled icon font subset instead of depending on OS emoji fonts
- **Playback HUD**: SDL-rendered overlay is now simplified and compact, uses bundled playback icons, uses a smaller dedicated playback font with title ellipsis for 640x480 readability, supports auto-hide or fixed visibility, stays visible while playback is paused, and remains in the same SDL render pass as video
- **Wayland / DRM strategy**: playback no longer depends on native overlay sinks or X11 window handles; R36S-class targets continue to prefer `kmsdrm` when no display server is present
- **Deploy packaging**: a `conda-forge`-oriented `environment.yml` now defines a reproducible Miniforge/Miniconda environment for local development and release preparation
## Completed Tasks
- Phase 1: Project bootstrap (`pyproject.toml`, `requirements.txt`, `README.md`, package layout under `src/`)
- Phase 2: DLNA discovery (`dlna/discovery.py` — SSDP M-SEARCH, friendly-name fetch) and browsing (`dlna/client.py` — SOAP Browse, DIDL-Lite parser with relative-URL resolution + `dlna/models.py` domain models + `dlna/browser_state.py` navigation stack/cache)
- Phase 3: SDL2 UI (`ui/sdl_app.py` — window, event loop, input dispatch; `ui/screens.py` — server list, browse list, playback, error screens; `ui/theme.py` — 640×480 layout constants)
- Phase 4: Playback (`player/backend.py` abstract interface + `player/gstreamer_backend.py` integrated GStreamer backend)
- Phase 5: Device integration (`platform/controls.py` — keyboard + gamecontroller mapping; `platform/runtime.py` — logging, R36S heuristic, SDL env hints)
- Phase 7: Tests — 75 tests across 7 test files all passing (DIDL mapping, SOAP/XML parser, navigation state, playback backend, SDL redraw policy, input controls, runtime environment setup)
- Desktop runtime verification completed: fixed SSDP discovery socket setup for IPv4 and removed pending-task shutdown noise from the async worker thread
- Packaging hardening: bundled a local UI font asset and configured setuptools to ship it with the package
- Real LAN regression fixed: Browse SOAP parser now handles `Result` elements both with and without a namespace, matching responses from the discovered MiniDLNA/Jellyfin servers
- Playback backend pivoted to GStreamer because libmpv continued to create a separate native window on the desktop host instead of remaining embedded in the SDL UI
- Milestone 2 is now implemented with GStreamer: playback uses `GstPlayBin` plus `GstAppSink` instead of an external player, libmpv, or native overlay sinks
- SDL playback flow updated: decoded GStreamer frames are uploaded into SDL textures, playback end-of-stream returns automatically to the browser, and playback controls support pause/resume, relative seek, and volume
- Milestone 3 implemented in code: SDL scales video into a dedicated viewport inside the SDL window, with reserved HUD margins instead of using the whole window area for video
- Playback HUD expanded: progress bar, elapsed/duration, volume, buffer, resolution, and control legends are rendered around the video area and updated from GStreamer bus/pipeline queries
- Playback-page flashing root cause addressed by removing native overlay composition entirely: video and HUD are now rendered together by SDL in one pass, with redraws driven by decoded frame availability and HUD state changes
- Playback HUD simplified: the border around the video area was removed, playback control/status icons were added as bundled SVG+PNG assets, the title/timer top bar no longer overlaps, and playback now supports `auto / fixed / hidden` HUD modes through a dedicated command while staying visible when paused
- Deployment assets added: `.gitignore`, `environment.yml`, and a real `LICENSE` file so the project can be initialized and published as a clean git repository
- Copilot instructions and this status file
## Tasks In Progress
- Verify that the SDL-texture playback path is smooth enough on real host playback and on R36S hardware
- Measure whether BGRA frame upload is acceptable on RK3326 or whether a future YUV texture path is needed
## Blockers Or Open Questions
- `SDL2_ttf` system library needed for text rendering (`sudo dnf install SDL2_ttf` on Fedora, `sudo apt install libsdl2-ttf-2.0-0` on Debian/Ubuntu). The app handles its absence gracefully but will show no text.
- Integrated playback requires system GStreamer plus Python GI bindings (for Fedora: `python3-gobject gstreamer1 gstreamer1-plugins-base gstreamer1-plugins-good`; add codec/plugin packages as needed for target media).
- Root browse verified against two real DLNA servers on the LAN.
- On-device testing on R36S hardware is pending.
- The current SDL-texture path avoids window-manager dependencies but may still need optimization on low-end hardware if BGRA upload cost is too high.
## Next Recommended Actions
1. Run a visual playback smoke test and confirm SDL-rendered video plus HUD eliminates flashing on host playback.
2. Validate the SDL-texture playback path on the target R36S `kmsdrm` backend.
3. Measure CPU/load on RK3326 hardware during audio and video playback.
4. If RGBA upload cost is too high, add a follow-up YUV texture upload path using `SDL_UpdateYUVTexture`.

14
environment.yml

@ -0,0 +1,14 @@
name: r36s-dlna-browser
channels:
- conda-forge
dependencies:
- python=3.11
- pip
- pygobject
- gstreamer
- gst-plugins-base
- gst-plugins-good
- sdl2
- sdl2_ttf
- pip:
- -e .[dev]

38
pyproject.toml

@ -0,0 +1,38 @@
[build-system]
requires = ["setuptools>=68.0", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "r36s-dlna-browser"
version = "0.1.0"
description = "DLNA media browser for R36S handheld (ArkOS-derived Linux)"
readme = "README.md"
requires-python = ">=3.9"
license = "MIT"
authors = [
{name = "Matteo Benedetto"},
]
dependencies = [
"PySDL2>=0.9.16",
"async-upnp-client>=0.38.0",
]
[project.optional-dependencies]
dev = [
"pytest>=7.0",
"pytest-asyncio>=0.21",
]
[project.scripts]
r36s-dlna = "r36s_dlna_browser.__main__:main"
[tool.setuptools.packages.find]
where = ["src"]
[tool.setuptools.package-data]
"r36s_dlna_browser" = ["assets/*.ttf", "assets/icons/*.png"]
[tool.pytest.ini_options]
testpaths = ["tests"]
asyncio_mode = "auto"

2
requirements.txt

@ -0,0 +1,2 @@
PySDL2>=0.9.16
async-upnp-client>=0.38.0

1
src/r36s_dlna_browser/__init__.py

@ -0,0 +1 @@
"""R36S DLNA Browser – lightweight DLNA media browser for R36S handhelds."""

22
src/r36s_dlna_browser/__main__.py

@ -0,0 +1,22 @@
"""Entrypoint for ``python -m r36s_dlna_browser``."""
from __future__ import annotations
import sys
def main() -> None:
from r36s_dlna_browser.app import Application
app = Application()
try:
app.run()
except KeyboardInterrupt:
pass
finally:
app.shutdown()
sys.exit(0)
if __name__ == "__main__":
main()

173
src/r36s_dlna_browser/app.py

@ -0,0 +1,173 @@
"""Application lifecycle – wires together UI, DLNA networking and playback."""
from __future__ import annotations
import asyncio
import logging
import queue
import threading
from typing import Any
from r36s_dlna_browser.dlna.browser_state import BrowserState
from r36s_dlna_browser.dlna.discovery import DLNADiscovery
from r36s_dlna_browser.dlna.client import DLNAClient
from r36s_dlna_browser.player.gstreamer_backend import GStreamerBackend
from r36s_dlna_browser.platform.runtime import configure_logging, sdl_env_setup
from r36s_dlna_browser.ui.sdl_app import SDLApp
log = logging.getLogger(__name__)
class Application:
"""Top-level application that owns the event loop, UI and networking."""
def __init__(self) -> None:
configure_logging()
sdl_env_setup()
self.ui_queue: queue.Queue[Any] = queue.Queue()
self.cmd_queue: queue.Queue[Any] = queue.Queue()
self.browser_state = BrowserState()
self.discovery = DLNADiscovery()
self.client = DLNAClient()
self.player = GStreamerBackend()
self.player.set_event_callback(self._handle_player_event)
self._async_loop: asyncio.AbstractEventLoop | None = None
self._net_thread: threading.Thread | None = None
self._shutting_down = False
# ── public API ───────────────────────────────────────────────
def run(self) -> None:
"""Start the networking thread and enter the SDL2 main loop."""
self._start_network_thread()
self.browser_state.loading = True
self._schedule(("discover",))
sdl = SDLApp(
ui_queue=self.ui_queue,
cmd_queue=self.cmd_queue,
browser_state=self.browser_state,
player=self.player,
)
sdl.run() # blocks until quit
def shutdown(self) -> None:
self._shutting_down = True
self._schedule(("shutdown",))
if self._net_thread and self._net_thread.is_alive():
self._net_thread.join(timeout=2)
self.player.shutdown()
log.info("Application shut down.")
# ── networking thread ────────────────────────────────────────
def _start_network_thread(self) -> None:
self._net_thread = threading.Thread(
target=self._run_async_loop, daemon=True, name="dlna-net"
)
self._net_thread.start()
def _run_async_loop(self) -> None:
self._async_loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._async_loop)
try:
self._async_loop.run_until_complete(self._command_dispatcher())
finally:
pending = [
task
for task in asyncio.all_tasks(self._async_loop)
if not task.done()
]
for task in pending:
task.cancel()
if pending:
self._async_loop.run_until_complete(
asyncio.gather(*pending, return_exceptions=True)
)
self._async_loop.close()
async def _command_dispatcher(self) -> None:
"""Poll cmd_queue for commands from the UI thread and dispatch them."""
loop = asyncio.get_running_loop()
while not self._shutting_down:
try:
cmd = await loop.run_in_executor(None, self.cmd_queue.get, True, 0.1)
except queue.Empty:
continue
try:
await self._handle_command(cmd)
except Exception as exc:
log.exception("Error handling command %s", cmd)
self.browser_state.loading = False
self.ui_queue.put(("error", str(exc)))
async def _handle_command(self, cmd: tuple) -> None:
action = cmd[0]
if action == "discover":
servers = await self.discovery.find_servers()
self.browser_state.set_servers(servers)
self.browser_state.loading = False
self.ui_queue.put(("servers_updated",))
elif action == "browse":
server_location = cmd[1]
object_id = cmd[2]
items = await self.client.browse(server_location, object_id)
self.browser_state.set_items(items, object_id)
self.browser_state.loading = False
self.ui_queue.put(("items_updated",))
elif action == "play":
url = cmd[1]
title = cmd[2] if len(cmd) > 2 else ""
self.player.play(url)
self.browser_state.playback_paused = False
self.ui_queue.put(("playback_started", title))
elif action == "stop":
self.player.stop()
elif action == "toggle_pause":
paused = self.player.toggle_pause()
self.browser_state.playback_paused = paused
self.ui_queue.put(("playback_paused" if paused else "playback_resumed",))
elif action == "seek":
self.player.seek(cmd[1])
elif action == "volume":
volume = self.player.change_volume(cmd[1])
self.browser_state.playback_volume = volume
elif action == "shutdown":
self._shutting_down = True
self.player.shutdown()
# ── helpers ──────────────────────────────────────────────────
def _schedule(self, cmd: tuple) -> None:
self.cmd_queue.put(cmd)
def _handle_player_event(self, event: str, *args: Any) -> None:
if event == "stopped":
self.browser_state.playback_paused = False
self.ui_queue.put(("playback_stopped",))
elif event == "position" and args:
self.browser_state.playback_position = float(args[0])
elif event == "duration" and args:
self.browser_state.playback_duration = float(args[0])
elif event == "paused" and args:
self.browser_state.playback_paused = bool(args[0])
elif event == "volume" and args:
self.browser_state.playback_volume = int(args[0])
elif event == "buffering" and args:
self.browser_state.playback_buffer_percent = int(args[0])
elif event == "cache" and args:
self.browser_state.playback_cache_seconds = float(args[0])
elif event == "resolution" and args:
self.browser_state.playback_resolution = str(args[0])
elif event == "error" and args:
self.browser_state.playback_paused = False
self.ui_queue.put(("error", str(args[0])))

BIN
src/r36s_dlna_browser/assets/NotoSans-Regular.ttf

Binary file not shown.

BIN
src/r36s_dlna_browser/assets/icons/audio.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 998 B

8
src/r36s_dlna_browser/assets/icons/audio.svg

@ -0,0 +1,8 @@
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 64 64" width="64" height="64">
<!-- notehead -->
<ellipse cx="20" cy="50" rx="13" ry="9" transform="rotate(-18 20 50)" fill="#DCDCDC"/>
<!-- stem -->
<rect x="31" y="10" width="5" height="42" rx="2" fill="#DCDCDC"/>
<!-- flag -->
<path d="M36 10 Q54 16 52 32 Q44 24 36 28 Z" fill="#DCDCDC"/>
</svg>

After

Width:  |  Height:  |  Size: 364 B

BIN
src/r36s_dlna_browser/assets/icons/folder.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 532 B

8
src/r36s_dlna_browser/assets/icons/folder.svg

@ -0,0 +1,8 @@
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 64 64" width="64" height="64">
<!-- folder tab -->
<rect x="6" y="10" width="24" height="15" rx="4" fill="#DCDCDC"/>
<!-- separator so tab reads separately from body -->
<rect x="6" y="22" width="52" height="3" fill="#181820"/>
<!-- folder body -->
<rect x="6" y="22" width="52" height="34" rx="5" fill="#DCDCDC"/>
</svg>

After

Width:  |  Height:  |  Size: 387 B

BIN
src/r36s_dlna_browser/assets/icons/hud-display.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 636 B

5
src/r36s_dlna_browser/assets/icons/hud-display.svg

@ -0,0 +1,5 @@
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 64 64" width="64" height="64">
<rect x="10" y="14" width="44" height="28" rx="4" fill="none" stroke="#DCDCDC" stroke-width="4"/>
<rect x="26" y="46" width="12" height="4" rx="2" fill="#DCDCDC"/>
<rect x="20" y="52" width="24" height="4" rx="2" fill="#DCDCDC"/>
</svg>

After

Width:  |  Height:  |  Size: 326 B

BIN
src/r36s_dlna_browser/assets/icons/hud-pause.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 322 B

4
src/r36s_dlna_browser/assets/icons/hud-pause.svg

@ -0,0 +1,4 @@
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 64 64" width="64" height="64">
<rect x="18" y="14" width="10" height="36" rx="3" fill="#DCDCDC"/>
<rect x="36" y="14" width="10" height="36" rx="3" fill="#DCDCDC"/>
</svg>

After

Width:  |  Height:  |  Size: 228 B

BIN
src/r36s_dlna_browser/assets/icons/hud-play.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 410 B

3
src/r36s_dlna_browser/assets/icons/hud-play.svg

@ -0,0 +1,3 @@
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 64 64" width="64" height="64">
<polygon points="20,14 50,32 20,50" fill="#DCDCDC"/>
</svg>

After

Width:  |  Height:  |  Size: 145 B

BIN
src/r36s_dlna_browser/assets/icons/hud-seek.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 610 B

5
src/r36s_dlna_browser/assets/icons/hud-seek.svg

@ -0,0 +1,5 @@
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 64 64" width="64" height="64">
<polygon points="10,32 26,20 26,44" fill="#DCDCDC"/>
<polygon points="26,32 42,20 42,44" fill="#DCDCDC"/>
<polygon points="54,32 38,20 38,44" fill="#DCDCDC" opacity="0.4"/>
</svg>

After

Width:  |  Height:  |  Size: 269 B

BIN
src/r36s_dlna_browser/assets/icons/hud-stop.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 338 B

3
src/r36s_dlna_browser/assets/icons/hud-stop.svg

@ -0,0 +1,3 @@
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 64 64" width="64" height="64">
<rect x="18" y="18" width="28" height="28" rx="4" fill="#DCDCDC"/>
</svg>

After

Width:  |  Height:  |  Size: 159 B

BIN
src/r36s_dlna_browser/assets/icons/hud-volume.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.1 KiB

5
src/r36s_dlna_browser/assets/icons/hud-volume.svg

@ -0,0 +1,5 @@
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 64 64" width="64" height="64">
<polygon points="12,26 22,26 34,16 34,48 22,38 12,38" fill="#DCDCDC"/>
<path d="M41 24 C46 27, 46 37, 41 40" fill="none" stroke="#DCDCDC" stroke-width="4" stroke-linecap="round"/>
<path d="M46 18 C55 23, 55 41, 46 46" fill="none" stroke="#DCDCDC" stroke-width="4" stroke-linecap="round"/>
</svg>

After

Width:  |  Height:  |  Size: 385 B

BIN
src/r36s_dlna_browser/assets/icons/image.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.2 KiB

10
src/r36s_dlna_browser/assets/icons/image.svg

@ -0,0 +1,10 @@
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 64 64" width="64" height="64">
<!-- photo frame -->
<rect x="4" y="8" width="56" height="48" rx="6" fill="#DCDCDC"/>
<!-- photo area (dark inner) -->
<rect x="10" y="14" width="44" height="36" rx="3" fill="#181820"/>
<!-- mountain silhouette -->
<polygon points="10,50 22,30 34,42 44,28 54,50" fill="#DCDCDC"/>
<!-- sun -->
<circle cx="44" cy="22" r="6" fill="#DCDCDC"/>
</svg>

After

Width:  |  Height:  |  Size: 447 B

BIN
src/r36s_dlna_browser/assets/icons/playing.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 378 B

6
src/r36s_dlna_browser/assets/icons/playing.svg

@ -0,0 +1,6 @@
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 64 64" width="64" height="64">
<!-- equalizer bars: left short, center tall, right medium -->
<rect x="8" y="34" width="13" height="22" rx="3" fill="#DCDCDC"/>
<rect x="26" y="14" width="13" height="42" rx="3" fill="#DCDCDC"/>
<rect x="44" y="24" width="13" height="32" rx="3" fill="#DCDCDC"/>
</svg>

After

Width:  |  Height:  |  Size: 363 B

100
src/r36s_dlna_browser/assets/icons/review.html

@ -0,0 +1,100 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>R36S Icons Review</title>
<style>
body { background: #181820; margin: 0; padding: 40px; font-family: sans-serif; color: #dcdcdc; }
h1 { font-size: 18px; margin-bottom: 32px; }
.grid { display: flex; gap: 40px; flex-wrap: wrap; align-items: flex-start; }
.item { display: flex; flex-direction: column; align-items: center; gap: 12px; }
.item img { image-rendering: pixelated; }
.label { font-size: 13px; color: #888; }
.sizes { display: flex; gap: 16px; align-items: center; }
/* simulate a list row */
.row { background: #18182018; display: flex; align-items: center; gap: 8px;
padding: 4px 8px; border-radius: 3px; width: 300px; }
.row.hi { background: #3250a0; }
.row span { font-size: 14px; color: #dcdcdc; }
.rows { display: flex; flex-direction: column; gap: 0px; margin-top: 40px; }
</style>
</head>
<body>
<h1>R36S DLNA Browser — Icons Review (64×64)</h1>
<div class="grid">
<div class="item">
<div class="sizes">
<img src="folder.png" width="64" height="64" alt="folder">
<img src="folder.png" width="32" height="32" alt="folder">
<img src="folder.png" width="20" height="20" alt="folder">
</div>
<div class="label">folder</div>
</div>
<div class="item">
<div class="sizes">
<img src="audio.png" width="64" height="64" alt="audio">
<img src="audio.png" width="32" height="32" alt="audio">
<img src="audio.png" width="20" height="20" alt="audio">
</div>
<div class="label">audio</div>
</div>
<div class="item">
<div class="sizes">
<img src="video.png" width="64" height="64" alt="video">
<img src="video.png" width="32" height="32" alt="video">
<img src="video.png" width="20" height="20" alt="video">
</div>
<div class="label">video</div>
</div>
<div class="item">
<div class="sizes">
<img src="image.png" width="64" height="64" alt="image">
<img src="image.png" width="32" height="32" alt="image">
<img src="image.png" width="20" height="20" alt="image">
</div>
<div class="label">image</div>
</div>
<div class="item">
<div class="sizes">
<img src="playing.png" width="64" height="64" alt="playing">
<img src="playing.png" width="32" height="32" alt="playing">
<img src="playing.png" width="20" height="20" alt="playing">
</div>
<div class="label">playing</div>
</div>
<div class="item">
<div class="sizes">
<img src="server.png" width="64" height="64" alt="server">
<img src="server.png" width="32" height="32" alt="server">
<img src="server.png" width="20" height="20" alt="server">
</div>
<div class="label">server</div>
</div>
</div>
<div class="rows">
<h2 style="font-size:14px; margin-bottom:8px;">Simulated list rows (20px icon)</h2>
<div class="row hi">
<img src="folder.png" width="20" height="20"><span>Music</span>
</div>
<div class="row">
<img src="folder.png" width="20" height="20"><span>Videos</span>
</div>
<div class="row">
<img src="audio.png" width="20" height="20"><span>Song Title.flac</span>
</div>
<div class="row">
<img src="video.png" width="20" height="20"><span>Movie.mkv</span>
</div>
<div class="row">
<img src="image.png" width="20" height="20"><span>photo.jpg</span>
</div>
<div class="row">
<img src="playing.png" width="20" height="20"><span>Now Playing.mkv</span>
</div>
<div class="row">
<img src="server.png" width="20" height="20"><span>MiniDLNA [10.0.0.181]</span>
</div>
</div>
</body>
</html>

BIN
src/r36s_dlna_browser/assets/icons/server.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.3 KiB

12
src/r36s_dlna_browser/assets/icons/server.svg

@ -0,0 +1,12 @@
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 64 64" width="64" height="64">
<!-- broadcast dot -->
<circle cx="32" cy="36" r="5" fill="#DCDCDC"/>
<!-- mast -->
<rect x="30" y="40" width="4" height="14" rx="2" fill="#DCDCDC"/>
<!-- base -->
<rect x="18" y="52" width="28" height="6" rx="3" fill="#DCDCDC"/>
<!-- inner signal arc -->
<path d="M20 30 Q32 20 44 30" stroke="#DCDCDC" stroke-width="4.5" fill="none" stroke-linecap="round"/>
<!-- outer signal arc -->
<path d="M10 22 Q32 8 54 22" stroke="#DCDCDC" stroke-width="4.5" fill="none" stroke-linecap="round"/>
</svg>

After

Width:  |  Height:  |  Size: 598 B

BIN
src/r36s_dlna_browser/assets/icons/video.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.0 KiB

6
src/r36s_dlna_browser/assets/icons/video.svg

@ -0,0 +1,6 @@
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 64 64" width="64" height="64">
<!-- rounded screen -->
<rect x="4" y="8" width="56" height="48" rx="7" fill="#DCDCDC"/>
<!-- play triangle cutout (dark) -->
<polygon points="22,17 22,47 50,32" fill="#181820"/>
</svg>

After

Width:  |  Height:  |  Size: 278 B

0
src/r36s_dlna_browser/dlna/__init__.py

150
src/r36s_dlna_browser/dlna/browser_state.py

@ -0,0 +1,150 @@
"""Navigation stack, pagination and in-memory cache for the browser."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import List, Optional
from r36s_dlna_browser.dlna.models import MediaServer, MediaItem
from r36s_dlna_browser.ui import theme
@dataclass
class _Level:
"""One level of the navigation stack."""
server_location: str
object_id: str
items: List[MediaItem] = field(default_factory=list)
cursor: int = 0
scroll_offset: int = 0
class BrowserState:
"""Tracks the server list, the current navigation stack and cursor position."""
def __init__(self) -> None:
self.servers: List[MediaServer] = []
self.server_cursor: int = 0
self.server_scroll_offset: int = 0
self._stack: List[_Level] = []
self.loading: bool = False
self.error: str = ""
self.playback_title: str = ""
self.playback_paused: bool = False
self.playback_position: float = 0.0
self.playback_duration: float = 0.0
self.playback_volume: int = 100
self.playback_cache_seconds: float = 0.0
self.playback_buffer_percent: int = 0
self.playback_resolution: str = ""
self.playback_hud_visible: bool = True
self.playback_hud_mode: int = theme.PLAYBACK_HUD_AUTO
self.playback_backend: str = "gstreamer-sdl"
self.playback_video_driver: str = ""
# ── server list ──────────────────────────────────────────────
def set_servers(self, servers: List[MediaServer]) -> None:
self.servers = servers
self.server_cursor = 0
self.server_scroll_offset = 0
@property
def selected_server(self) -> Optional[MediaServer]:
if 0 <= self.server_cursor < len(self.servers):
return self.servers[self.server_cursor]
return None
# ── browse stack ─────────────────────────────────────────────
@property
def in_browse_mode(self) -> bool:
return len(self._stack) > 0
@property
def current_level(self) -> Optional[_Level]:
return self._stack[-1] if self._stack else None
@property
def current_items(self) -> List[MediaItem]:
lv = self.current_level
return lv.items if lv else []
@property
def cursor(self) -> int:
lv = self.current_level
return lv.cursor if lv else 0
@cursor.setter
def cursor(self, value: int) -> None:
lv = self.current_level
if lv:
lv.cursor = max(0, min(value, len(lv.items) - 1)) if lv.items else 0
@property
def scroll_offset(self) -> int:
lv = self.current_level
return lv.scroll_offset if lv else 0
@scroll_offset.setter
def scroll_offset(self, value: int) -> None:
lv = self.current_level
if lv:
lv.scroll_offset = value
def selected_item(self) -> Optional[MediaItem]:
items = self.current_items
c = self.cursor
if 0 <= c < len(items):
return items[c]
return None
def set_items(self, items: List[MediaItem], object_id: str) -> None:
"""Push or update items for the given object_id."""
lv = self.current_level
if lv and lv.object_id == object_id:
lv.items = items
lv.cursor = 0
lv.scroll_offset = 0
else:
# Find server_location from the stack or current server
loc = ""
if self._stack:
loc = self._stack[-1].server_location
elif self.selected_server:
loc = self.selected_server.location
self._stack.append(_Level(
server_location=loc,
object_id=object_id,
items=items,
))
def go_back(self) -> bool:
"""Pop one level. Returns True if we went back, False if at root."""
if self._stack:
self._stack.pop()
return True
return False
def enter_server(self, server: MediaServer) -> str:
"""Start browsing a server. Returns the root object_id."""
self._stack.clear()
self._stack.append(_Level(
server_location=server.location, object_id="0"
))
return "0"
def reset(self) -> None:
self._stack.clear()
self.error = ""
self.playback_title = ""
self.playback_paused = False
self.playback_position = 0.0
self.playback_duration = 0.0
self.playback_cache_seconds = 0.0
self.playback_buffer_percent = 0
self.playback_resolution = ""
self.playback_hud_visible = True
self.playback_hud_mode = theme.PLAYBACK_HUD_AUTO

221
src/r36s_dlna_browser/dlna/client.py

@ -0,0 +1,221 @@
"""ContentDirectory client – browse containers and items via UPnP."""
from __future__ import annotations
import logging
from typing import List
from xml.etree import ElementTree
import aiohttp
from r36s_dlna_browser.dlna.models import MediaItem, ItemType, classify_upnp_class
log = logging.getLogger(__name__)
_NS = {
"s": "http://schemas.xmlsoap.org/soap/envelope/",
"u": "urn:schemas-upnp-org:service:ContentDirectory:1",
"didl": "urn:schemas-upnp-org:metadata-1-0/DIDL-Lite/",
"dc": "http://purl.org/dc/elements/1.1/",
"upnp": "urn:schemas-upnp-org:metadata-1-0/upnp/",
}
_BROWSE_BODY = """<?xml version="1.0" encoding="utf-8"?>
<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/"
s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<s:Body>
<u:Browse xmlns:u="urn:schemas-upnp-org:service:ContentDirectory:1">
<ObjectID>{object_id}</ObjectID>
<BrowseFlag>BrowseDirectChildren</BrowseFlag>
<Filter>*</Filter>
<StartingIndex>{start}</StartingIndex>
<RequestedCount>{count}</RequestedCount>
<SortCriteria></SortCriteria>
</u:Browse>
</s:Body>
</s:Envelope>"""
def _text(el: ElementTree.Element | None) -> str:
if el is not None and el.text:
return el.text.strip()
return ""
def _local_name(tag: str) -> str:
return tag.rsplit("}", 1)[-1]
def _extract_browse_result(xml_text: str) -> str | None:
"""Extract the Browse Result payload from a SOAP response.
DLNA servers are inconsistent here: some return a namespaced Result element,
some return an unqualified Result element, and some may embed the DIDL as
children instead of escaped text. This helper accepts all three shapes.
"""
try:
soap = ElementTree.fromstring(xml_text)
except ElementTree.ParseError as exc:
log.warning("SOAP parse error: %s", exc)
return None
for element in soap.iter():
if _local_name(element.tag) != "Result":
continue
if element.text and element.text.strip():
return element.text.strip()
if len(element):
return "".join(
ElementTree.tostring(child, encoding="unicode")
for child in element
).strip()
return None
def _parse_didl(didl_xml: str, base_url: str) -> List[MediaItem]:
"""Parse a DIDL-Lite XML fragment into MediaItem objects."""
items: list[MediaItem] = []
try:
root = ElementTree.fromstring(didl_xml)
except ElementTree.ParseError as exc:
log.warning("DIDL parse error: %s", exc)
return items
for container in root.findall("didl:container", _NS):
title = _text(container.find("dc:title", _NS))
obj_id = container.get("id", "")
parent_id = container.get("parentID", "0")
child_count = int(container.get("childCount", "0") or "0")
items.append(MediaItem(
object_id=obj_id,
title=title,
item_type=ItemType.CONTAINER,
parent_id=parent_id,
child_count=child_count,
))
for item_el in root.findall("didl:item", _NS):
title = _text(item_el.find("dc:title", _NS))
obj_id = item_el.get("id", "")
parent_id = item_el.get("parentID", "0")
upnp_class = _text(item_el.find("upnp:class", _NS))
album_art = _text(item_el.find("upnp:albumArtURI", _NS))
resource_url = ""
mime_type = ""
duration = ""
size = 0
res_el = item_el.find("didl:res", _NS)
if res_el is not None:
resource_url = res_el.text.strip() if res_el.text else ""
protocol_info = res_el.get("protocolInfo", "")
# Extract mime from protocolInfo: "http-get:*:audio/mpeg:*"
parts = protocol_info.split(":")
if len(parts) >= 3:
mime_type = parts[2]
duration = res_el.get("duration", "")
size = int(res_el.get("size", "0") or "0")
# Resolve relative URLs
if resource_url and not resource_url.startswith(("http://", "https://")):
resource_url = base_url.rstrip("/") + "/" + resource_url.lstrip("/")
if album_art and not album_art.startswith(("http://", "https://")):
album_art = base_url.rstrip("/") + "/" + album_art.lstrip("/")
items.append(MediaItem(
object_id=obj_id,
title=title,
item_type=classify_upnp_class(upnp_class),
parent_id=parent_id,
resource_url=resource_url,
mime_type=mime_type,
size=size,
duration=duration,
album_art_url=album_art,
))
return items
async def _get_content_directory_url(
session: aiohttp.ClientSession, location: str
) -> str | None:
"""Fetch device XML and find the ContentDirectory control URL."""
try:
async with session.get(location, timeout=aiohttp.ClientTimeout(total=5)) as resp:
xml_text = await resp.text()
root = ElementTree.fromstring(xml_text)
ns = {"d": "urn:schemas-upnp-org:device-1-0"}
for svc in root.findall(".//d:service", ns):
svc_type = _text(svc.find("d:serviceType", ns))
if "ContentDirectory" in svc_type:
ctrl = _text(svc.find("d:controlURL", ns))
if ctrl:
# Make absolute
from urllib.parse import urljoin
return urljoin(location, ctrl)
except Exception as exc:
log.warning("Could not get ContentDirectory URL from %s: %s", location, exc)
return None
class DLNAClient:
"""Browses a DLNA server's ContentDirectory."""
def __init__(self) -> None:
self._control_urls: dict[str, str] = {} # location → controlURL cache
async def browse(
self,
server_location: str,
object_id: str = "0",
start: int = 0,
count: int = 200,
) -> List[MediaItem]:
async with aiohttp.ClientSession() as session:
ctrl_url = self._control_urls.get(server_location)
if not ctrl_url:
ctrl_url = await _get_content_directory_url(session, server_location)
if not ctrl_url:
log.error("No ContentDirectory for %s", server_location)
return []
self._control_urls[server_location] = ctrl_url
body = _BROWSE_BODY.format(
object_id=object_id, start=start, count=count
)
headers = {
"Content-Type": 'text/xml; charset="utf-8"',
"SOAPAction": '"urn:schemas-upnp-org:service:ContentDirectory:1#Browse"',
}
try:
async with session.post(
ctrl_url,
data=body,
headers=headers,
timeout=aiohttp.ClientTimeout(total=10),
) as resp:
xml_text = await resp.text()
except Exception as exc:
log.error("Browse request failed: %s", exc)
return []
# Extract the DIDL-Lite Result from the SOAP response
didl_xml = _extract_browse_result(xml_text)
if not didl_xml:
log.warning(
"Empty browse result for object_id=%s via %s",
object_id,
ctrl_url,
)
return []
# Derive base URL from server location for relative URL resolution
from urllib.parse import urlparse
parsed = urlparse(server_location)
base_url = f"{parsed.scheme}://{parsed.netloc}"
return _parse_didl(didl_xml, base_url)

105
src/r36s_dlna_browser/dlna/discovery.py

@ -0,0 +1,105 @@
"""SSDP-based discovery of DLNA MediaServers on the local network."""
from __future__ import annotations
import asyncio
import logging
import socket
from typing import List
from xml.etree import ElementTree
import aiohttp
from r36s_dlna_browser.dlna.models import MediaServer
log = logging.getLogger(__name__)
SSDP_ADDR = "239.255.255.250"
SSDP_PORT = 1900
MEDIA_SERVER_TYPE = "urn:schemas-upnp-org:device:MediaServer:1"
SSDP_MX = 3
SEARCH_TIMEOUT = 5.0
_SSDP_SEARCH = (
"M-SEARCH * HTTP/1.1\r\n"
f"HOST: {SSDP_ADDR}:{SSDP_PORT}\r\n"
'MAN: "ssdp:discover"\r\n'
f"MX: {SSDP_MX}\r\n"
f"ST: {MEDIA_SERVER_TYPE}\r\n"
"\r\n"
).encode()
class _SSDPProtocol(asyncio.DatagramProtocol):
"""Collects M-SEARCH responses."""
def __init__(self) -> None:
self.responses: list[tuple[str, str]] = [] # (location, server)
def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None:
try:
text = data.decode("utf-8", errors="replace")
except Exception:
return
headers: dict[str, str] = {}
for line in text.split("\r\n"):
if ":" in line:
key, _, val = line.partition(":")
headers[key.strip().upper()] = val.strip()
location = headers.get("LOCATION", "")
st = headers.get("ST", "")
if location and MEDIA_SERVER_TYPE in st:
self.responses.append((location, headers.get("SERVER", "")))
def error_received(self, exc: Exception) -> None:
log.debug("SSDP error: %s", exc)
def connection_lost(self, exc: Exception | None) -> None:
pass
async def _fetch_friendly_name(session: aiohttp.ClientSession, location: str) -> str:
"""Fetch the device description XML and extract friendlyName."""
try:
async with session.get(location, timeout=aiohttp.ClientTimeout(total=5)) as resp:
xml_text = await resp.text()
root = ElementTree.fromstring(xml_text)
ns = {"d": "urn:schemas-upnp-org:device-1-0"}
el = root.find(".//d:device/d:friendlyName", ns)
if el is not None and el.text:
return el.text.strip()
except Exception as exc:
log.debug("Could not fetch description from %s: %s", location, exc)
return location
class DLNADiscovery:
"""Find DLNA MediaServers via SSDP M-SEARCH."""
async def find_servers(self, timeout: float = SEARCH_TIMEOUT) -> List[MediaServer]:
loop = asyncio.get_running_loop()
transport, protocol = await loop.create_datagram_endpoint(
_SSDPProtocol,
local_addr=("0.0.0.0", 0),
family=socket.AF_INET,
)
assert isinstance(protocol, _SSDPProtocol)
try:
transport.sendto(_SSDP_SEARCH, (SSDP_ADDR, SSDP_PORT))
await asyncio.sleep(timeout)
finally:
transport.close()
seen: dict[str, str] = {}
for loc, srv in protocol.responses:
if loc not in seen:
seen[loc] = srv
servers: list[MediaServer] = []
if seen:
async with aiohttp.ClientSession() as session:
for loc in seen:
name = await _fetch_friendly_name(session, loc)
servers.append(MediaServer(friendly_name=name, location=loc))
log.info("Discovered %d DLNA server(s).", len(servers))
return servers

105
src/r36s_dlna_browser/dlna/models.py

@ -0,0 +1,105 @@
"""Domain models for DLNA servers, containers and media items."""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
class ItemType(Enum):
CONTAINER = "container"
AUDIO = "audio"
VIDEO = "video"
IMAGE = "image"
UNKNOWN = "unknown"
@dataclass
class MediaServer:
"""Represents a discovered DLNA MediaServer on the LAN."""
friendly_name: str
location: str # description URL
udn: str = ""
icon_url: str = ""
def __str__(self) -> str:
return self.friendly_name or self.location
@dataclass
class MediaItem:
"""A single content-directory entry (container or leaf item)."""
object_id: str
title: str
item_type: ItemType = ItemType.UNKNOWN
parent_id: str = "0"
resource_url: str = ""
mime_type: str = ""
size: int = 0
duration: str = ""
album_art_url: str = ""
child_count: int = 0
@property
def is_container(self) -> bool:
return self.item_type == ItemType.CONTAINER
def __str__(self) -> str:
return self.title or self.object_id
def classify_upnp_class(upnp_class: str) -> ItemType:
"""Map a UPnP class string to our ItemType enum."""
c = upnp_class.lower()
if "container" in c:
return ItemType.CONTAINER
if "audio" in c:
return ItemType.AUDIO
if "video" in c:
return ItemType.VIDEO
if "image" in c:
return ItemType.IMAGE
return ItemType.UNKNOWN
def parse_didl_item(didl_item: dict) -> MediaItem:
"""Build a MediaItem from a parsed DIDL-Lite dict (as returned by async-upnp-client)."""
title = didl_item.get("title", "")
object_id = didl_item.get("id", didl_item.get("@id", ""))
parent_id = didl_item.get("parent_id", didl_item.get("@parentID", "0"))
upnp_class = didl_item.get("upnp_class", didl_item.get("class", ""))
item_type = classify_upnp_class(upnp_class)
child_count = int(didl_item.get("child_count", didl_item.get("@childCount", 0)) or 0)
resource_url = ""
mime_type = ""
size = 0
duration = ""
resources = didl_item.get("resources", didl_item.get("res", []))
if isinstance(resources, dict):
resources = [resources]
if resources:
res = resources[0]
if isinstance(res, dict):
resource_url = res.get("url", res.get("uri", res.get("#text", "")))
mime_type = res.get("protocol_info", res.get("@protocolInfo", ""))
size = int(res.get("size", res.get("@size", 0)) or 0)
duration = res.get("duration", res.get("@duration", ""))
elif isinstance(res, str):
resource_url = res
album_art = didl_item.get("album_art_uri", didl_item.get("albumArtURI", ""))
return MediaItem(
object_id=str(object_id),
title=str(title),
item_type=item_type,
parent_id=str(parent_id),
resource_url=str(resource_url),
mime_type=str(mime_type),
size=size,
duration=str(duration),
album_art_url=str(album_art),
child_count=child_count,
)

0
src/r36s_dlna_browser/platform/__init__.py

62
src/r36s_dlna_browser/platform/controls.py

@ -0,0 +1,62 @@
"""Input mapping – translates SDL2 events to abstract actions."""
from __future__ import annotations
import ctypes
from enum import Enum, auto
from typing import Optional
import sdl2
class Action(Enum):
UP = auto()
DOWN = auto()
LEFT = auto()
RIGHT = auto()
CONFIRM = auto() # A / Return
BACK = auto() # B / Escape
PAGE_UP = auto() # L shoulder / PageUp
PAGE_DOWN = auto() # R shoulder / PageDown
HUD_MODE = auto() # Y / H
QUIT = auto() # Start / Q
# Keyboard mapping (for desktop testing)
_KEY_MAP: dict[int, Action] = {
sdl2.SDLK_UP: Action.UP,
sdl2.SDLK_DOWN: Action.DOWN,
sdl2.SDLK_LEFT: Action.PAGE_UP,
sdl2.SDLK_RIGHT: Action.PAGE_DOWN,
sdl2.SDLK_RETURN: Action.CONFIRM,
sdl2.SDLK_SPACE: Action.CONFIRM,
sdl2.SDLK_ESCAPE: Action.BACK,
sdl2.SDLK_BACKSPACE: Action.BACK,
sdl2.SDLK_q: Action.QUIT,
sdl2.SDLK_h: Action.HUD_MODE,
sdl2.SDLK_PAGEUP: Action.PAGE_UP,
sdl2.SDLK_PAGEDOWN: Action.PAGE_DOWN,
}
# GameController button mapping (R36S / generic controller)
_BUTTON_MAP: dict[int, Action] = {
sdl2.SDL_CONTROLLER_BUTTON_DPAD_UP: Action.UP,
sdl2.SDL_CONTROLLER_BUTTON_DPAD_DOWN: Action.DOWN,
sdl2.SDL_CONTROLLER_BUTTON_DPAD_LEFT: Action.PAGE_UP,
sdl2.SDL_CONTROLLER_BUTTON_DPAD_RIGHT: Action.PAGE_DOWN,
sdl2.SDL_CONTROLLER_BUTTON_A: Action.CONFIRM,
sdl2.SDL_CONTROLLER_BUTTON_B: Action.BACK,
sdl2.SDL_CONTROLLER_BUTTON_LEFTSHOULDER: Action.PAGE_UP,
sdl2.SDL_CONTROLLER_BUTTON_RIGHTSHOULDER: Action.PAGE_DOWN,
sdl2.SDL_CONTROLLER_BUTTON_Y: Action.HUD_MODE,
sdl2.SDL_CONTROLLER_BUTTON_START: Action.QUIT,
}
def map_key(event: sdl2.SDL_Event) -> Optional[Action]:
"""Map an SDL2 event to an abstract Action, or None."""
if event.type == sdl2.SDL_KEYDOWN:
return _KEY_MAP.get(event.key.keysym.sym)
if event.type == sdl2.SDL_CONTROLLERBUTTONDOWN:
return _BUTTON_MAP.get(event.cbutton.button)
return None

32
src/r36s_dlna_browser/platform/runtime.py

@ -0,0 +1,32 @@
"""Runtime detection, logging setup and platform helpers."""
from __future__ import annotations
import logging
import os
import platform
import sys
log = logging.getLogger(__name__)
def is_r36s() -> bool:
"""Heuristic: running on aarch64/armv7l with a small screen."""
return platform.machine() in ("aarch64", "armv7l")
def configure_logging() -> None:
level_name = os.environ.get("R36S_LOG_LEVEL", "INFO").upper()
level = getattr(logging, level_name, logging.INFO)
fmt = "%(asctime)s %(name)s %(levelname)s %(message)s"
logging.basicConfig(level=level, format=fmt, stream=sys.stderr)
def sdl_env_setup() -> None:
"""Set SDL environment hints for the R36S if needed."""
if is_r36s():
# Force fbdev or kmsdrm if no display server
if not os.environ.get("DISPLAY") and not os.environ.get("WAYLAND_DISPLAY"):
os.environ.setdefault("SDL_VIDEODRIVER", "kmsdrm")
os.environ.setdefault("SDL_AUDIODRIVER", "alsa")

0
src/r36s_dlna_browser/player/__init__.py

46
src/r36s_dlna_browser/player/backend.py

@ -0,0 +1,46 @@
"""Abstract player interface."""
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Callable
class PlayerBackend(ABC):
"""Minimal playback interface used by the application."""
@abstractmethod
def attach_window(self, window: object) -> None: ...
@abstractmethod
def set_event_callback(self, callback: Callable[..., None] | None) -> None: ...
@abstractmethod
def set_viewport(self, width: int, height: int, top: int, bottom: int, left: int, right: int) -> None: ...
@abstractmethod
def has_new_frame(self) -> bool: ...
@abstractmethod
def render(self, renderer: object) -> bool: ...
@abstractmethod
def play(self, url: str) -> None: ...
@abstractmethod
def stop(self) -> None: ...
@abstractmethod
def toggle_pause(self) -> bool: ...
@abstractmethod
def seek(self, seconds: int) -> None: ...
@abstractmethod
def change_volume(self, delta: int) -> int: ...
@abstractmethod
def shutdown(self) -> None: ...
@abstractmethod
def is_playing(self) -> bool: ...

440
src/r36s_dlna_browser/player/gstreamer_backend.py

@ -0,0 +1,440 @@
"""Integrated GStreamer playback backend rendered through SDL textures."""
from __future__ import annotations
import ctypes
import logging
import os
import threading
import time
from dataclasses import dataclass
from typing import Any, Callable
import gi
import sdl2
from sdl2 import version as sdl_version
from r36s_dlna_browser.player.backend import PlayerBackend
gi.require_version("Gst", "1.0")
gi.require_version("GstApp", "1.0")
gi.require_version("GstVideo", "1.0")
from gi.repository import Gst, GstApp, GstVideo
log = logging.getLogger(__name__)
Gst.init(None)
def _wm_subsystem_name(subsystem: int) -> str:
mapping = {
sdl2.SDL_SYSWM_UNKNOWN: "unknown",
sdl2.SDL_SYSWM_WINDOWS: "windows",
sdl2.SDL_SYSWM_X11: "x11",
sdl2.SDL_SYSWM_DIRECTFB: "directfb",
sdl2.SDL_SYSWM_COCOA: "cocoa",
sdl2.SDL_SYSWM_UIKIT: "uikit",
sdl2.SDL_SYSWM_WAYLAND: "wayland",
sdl2.SDL_SYSWM_MIR: "mir",
sdl2.SDL_SYSWM_WINRT: "winrt",
sdl2.SDL_SYSWM_ANDROID: "android",
sdl2.SDL_SYSWM_VIVANTE: "vivante",
sdl2.SDL_SYSWM_OS2: "os2",
sdl2.SDL_SYSWM_HAIKU: "haiku",
sdl2.SDL_SYSWM_KMSDRM: "kmsdrm",
sdl2.SDL_SYSWM_RISCOS: "riscos",
}
return mapping.get(subsystem, f"wm-{subsystem}")
def _resolve_window_subsystem(window: object) -> str:
if not window:
return "unbound"
info = sdl2.SDL_SysWMinfo()
sdl_version.SDL_VERSION(info.version)
if not sdl2.SDL_GetWindowWMInfo(window, ctypes.byref(info)):
return "unknown"
return _wm_subsystem_name(info.subsystem)
def _as_uri(url: str) -> str:
if "://" in url:
return url
return Gst.filename_to_uri(os.path.abspath(url))
class _NoopCallback:
def __call__(self, *args: Any) -> None:
return None
@dataclass
class _Frame:
width: int
height: int
pitch: int
pixels: bytes
class GStreamerBackend(PlayerBackend):
"""Decode video with GStreamer and render frames through the SDL renderer."""
def __init__(
self,
gst_module=Gst,
gst_app_module=GstApp,
gst_video_module=GstVideo,
appsink_factory: Callable[[], Any] | None = None,
playbin_factory: Callable[[], Any] | None = None,
subsystem: str = "test",
) -> None:
self._gst = gst_module
self._gst_app = gst_app_module
self._gst_video = gst_video_module
self._appsink_factory = appsink_factory or self._default_appsink_factory
self._playbin_factory = playbin_factory or self._default_playbin_factory
self._event_callback: Callable[..., None] = _NoopCallback()
self._window = None
self._wm_subsystem = subsystem
self._viewport = (640, 480, 0, 0, 0, 0)
self._pipeline = None
self._video_sink = None
self._bus = None
self._bus_thread = None
self._bus_running = False
self._lock = threading.RLock()
self._frame_lock = threading.RLock()
self._is_playing = False
self._paused = False
self._volume = 100
self._buffer_percent = 0
self._last_query = 0.0
self._latest_frame: _Frame | None = None
self._frame_dirty = False
self._texture = None
self._texture_renderer = None
self._texture_size = (0, 0)
self._resolution = ""
def attach_window(self, window: object) -> None:
self._window = window
self._wm_subsystem = _resolve_window_subsystem(window)
log.info("Playback window subsystem: %s", self._wm_subsystem)
def set_event_callback(self, callback: Callable[..., None] | None) -> None:
self._event_callback = callback or _NoopCallback()
def set_viewport(self, width: int, height: int, top: int, bottom: int, left: int, right: int) -> None:
self._viewport = (width, height, top, bottom, left, right)
def has_new_frame(self) -> bool:
with self._frame_lock:
return self._frame_dirty
def render(self, renderer: object) -> bool:
with self._frame_lock:
frame = self._latest_frame
if frame is None:
return False
if self._texture is None or self._texture_renderer != renderer or self._texture_size != (frame.width, frame.height):
self._destroy_texture()
self._texture = sdl2.SDL_CreateTexture(
renderer,
sdl2.SDL_PIXELFORMAT_BGRA32,
sdl2.SDL_TEXTUREACCESS_STREAMING,
frame.width,
frame.height,
)
if not self._texture:
log.error("Could not create SDL texture for video frame.")
return False
self._texture_renderer = renderer
self._texture_size = (frame.width, frame.height)
if self._frame_dirty:
pixel_buffer = ctypes.create_string_buffer(frame.pixels)
result = sdl2.SDL_UpdateTexture(self._texture, None, pixel_buffer, frame.pitch)
if result != 0:
log.error("Could not upload SDL video texture: %s", sdl2.SDL_GetError())
return False
self._frame_dirty = False
dst = self._fit_frame_to_viewport(frame.width, frame.height)
sdl2.SDL_RenderCopy(renderer, self._texture, None, dst)
return True
def play(self, url: str) -> None:
if not url:
raise ValueError("Playback URL is empty.")
pipeline = self._ensure_pipeline()
uri = _as_uri(url)
with self._lock:
pipeline.set_state(self._gst.State.NULL)
pipeline.set_property("uri", uri)
self._paused = False
self._buffer_percent = 100
self._clear_frames()
result = pipeline.set_state(self._gst.State.PLAYING)
if result == self._gst.StateChangeReturn.FAILURE:
raise RuntimeError(f"Failed to start SDL-rendered GStreamer playback: {uri}")
self._set_playing(True, notify=False)
self._event_callback("buffering", 100)
log.info("Starting SDL-rendered GStreamer playback: %s", url)
def stop(self) -> None:
with self._lock:
if self._pipeline is None:
self._set_playing(False, notify=False)
return
self._pipeline.set_state(self._gst.State.NULL)
self._paused = False
self._clear_frames()
self._set_playing(False, notify=True)
def toggle_pause(self) -> bool:
with self._lock:
if self._pipeline is None:
return False
self._paused = not self._paused
target = self._gst.State.PAUSED if self._paused else self._gst.State.PLAYING
self._pipeline.set_state(target)
self._event_callback("paused", self._paused)
return self._paused
def seek(self, seconds: int) -> None:
with self._lock:
if self._pipeline is None or seconds == 0:
return
ok_pos, current = self._pipeline.query_position(self._gst.Format.TIME)
current_ns = current if ok_pos else 0
target_ns = max(0, current_ns + int(seconds * self._gst.SECOND))
self._pipeline.seek_simple(
self._gst.Format.TIME,
self._gst.SeekFlags.FLUSH | self._gst.SeekFlags.KEY_UNIT,
target_ns,
)
def change_volume(self, delta: int) -> int:
with self._lock:
if self._pipeline is None:
return self._volume
self._volume = max(0, min(130, self._volume + delta))
self._pipeline.set_property("volume", self._volume / 100.0)
self._event_callback("volume", self._volume)
return self._volume
def shutdown(self) -> None:
self._stop_bus_thread()
with self._lock:
if self._pipeline is not None:
self._pipeline.set_state(self._gst.State.NULL)
self._pipeline = None
self._video_sink = None
self._bus = None
self._clear_frames()
self._set_playing(False, notify=False)
def is_playing(self) -> bool:
return self._is_playing
def _default_playbin_factory(self):
pipeline = self._gst.ElementFactory.make("playbin", "player")
if pipeline is None:
raise RuntimeError("GStreamer playbin is not available.")
return pipeline
def _default_appsink_factory(self):
sink = self._gst.ElementFactory.make("appsink", "video-output")
if sink is None:
raise RuntimeError("GStreamer appsink is not available.")
return sink
def _ensure_pipeline(self):
with self._lock:
if self._pipeline is not None:
return self._pipeline
self._pipeline = self._playbin_factory()
self._video_sink = self._create_appsink()
self._pipeline.set_property("video-sink", self._video_sink)
self._pipeline.set_property("volume", self._volume / 100.0)
self._bus = self._pipeline.get_bus()
self._start_bus_thread()
return self._pipeline
def _create_appsink(self):
sink = self._appsink_factory()
sink.set_property("emit-signals", True)
sink.set_property("sync", True)
sink.set_property("max-buffers", 2)
sink.set_property("drop", True)
sink.set_property("caps", self._gst.Caps.from_string("video/x-raw,format=BGRA"))
sink.connect("new-sample", self._on_new_sample)
return sink
def _on_new_sample(self, sink) -> Any:
sample = sink.emit("pull-sample")
if sample is None:
return self._flow_ok()
buffer = sample.get_buffer()
caps = sample.get_caps()
if buffer is None or caps is None:
return self._flow_ok()
info = self._gst_video.VideoInfo.new_from_caps(caps)
if info is None:
return self._flow_ok()
width = int(info.width)
height = int(info.height)
pitch = int(info.stride[0]) if info.stride else width * 4
pixels = buffer.extract_dup(0, buffer.get_size())
resolution = f"{width}x{height}"
with self._frame_lock:
self._latest_frame = _Frame(width=width, height=height, pitch=pitch, pixels=pixels)
self._frame_dirty = True
if resolution != self._resolution:
self._resolution = resolution
self._event_callback("resolution", resolution)
return self._flow_ok()
def _flow_ok(self) -> Any:
flow_return = getattr(self._gst, "FlowReturn", None)
if flow_return is None:
return 0
return flow_return.OK
def _start_bus_thread(self) -> None:
if self._bus_thread is not None and self._bus_thread.is_alive():
return
self._bus_running = True
self._bus_thread = threading.Thread(target=self._poll_bus, name="gst-bus", daemon=True)
self._bus_thread.start()
def _stop_bus_thread(self) -> None:
self._bus_running = False
if self._bus_thread and self._bus_thread.is_alive():
self._bus_thread.join(timeout=1.0)
self._bus_thread = None
def _poll_bus(self) -> None:
while self._bus_running:
bus = self._bus
if bus is None:
time.sleep(0.05)
continue
message = bus.timed_pop_filtered(
100 * self._gst.MSECOND,
self._gst.MessageType.ERROR
| self._gst.MessageType.EOS
| self._gst.MessageType.BUFFERING
| self._gst.MessageType.STATE_CHANGED,
)
if message is not None:
self._handle_bus_message(message)
if time.monotonic() - self._last_query >= 0.25:
self._last_query = time.monotonic()
self._emit_playback_metrics()
def _handle_bus_message(self, message) -> None:
msg_type = message.type
if msg_type == self._gst.MessageType.EOS:
log.info("SDL-rendered GStreamer playback reached end-of-stream")
with self._lock:
if self._pipeline is not None:
self._pipeline.set_state(self._gst.State.NULL)
self._clear_frames()
self._set_playing(False, notify=True)
return
if msg_type == self._gst.MessageType.ERROR:
err, debug = message.parse_error()
text = err.message if hasattr(err, "message") else str(err)
if debug:
text = f"{text} ({debug})"
log.error("GStreamer playback error: %s", text)
with self._lock:
if self._pipeline is not None:
self._pipeline.set_state(self._gst.State.NULL)
self._clear_frames()
self._set_playing(False, notify=False)
self._event_callback("error", text)
return
if msg_type == self._gst.MessageType.BUFFERING:
percent = int(message.parse_buffering())
self._buffer_percent = percent
self._event_callback("buffering", percent)
return
if msg_type == self._gst.MessageType.STATE_CHANGED and message.src == self._pipeline:
_old, new, _pending = message.parse_state_changed()
paused = new == self._gst.State.PAUSED
if new == self._gst.State.PLAYING:
self._paused = False
self._event_callback("paused", False)
elif paused:
self._paused = True
self._event_callback("paused", True)
def _emit_playback_metrics(self) -> None:
with self._lock:
pipeline = self._pipeline
if pipeline is None or not self._is_playing:
return
ok_pos, position = pipeline.query_position(self._gst.Format.TIME)
ok_dur, duration = pipeline.query_duration(self._gst.Format.TIME)
volume = int(round(float(pipeline.get_property("volume") or 1.0) * 100))
if ok_pos:
self._event_callback("position", position / self._gst.SECOND)
if ok_dur and duration > 0:
self._event_callback("duration", duration / self._gst.SECOND)
self._event_callback("volume", volume)
self._event_callback("buffering", self._buffer_percent)
if self._resolution:
self._event_callback("resolution", self._resolution)
def _fit_frame_to_viewport(self, frame_width: int, frame_height: int) -> sdl2.SDL_Rect:
width, height, top, bottom, left, right = self._viewport
viewport_w = max(1, width - left - right)
viewport_h = max(1, height - top - bottom)
if frame_width <= 0 or frame_height <= 0:
return sdl2.SDL_Rect(left, top, viewport_w, viewport_h)
scale = min(viewport_w / frame_width, viewport_h / frame_height)
draw_w = max(1, int(frame_width * scale))
draw_h = max(1, int(frame_height * scale))
draw_x = left + (viewport_w - draw_w) // 2
draw_y = top + (viewport_h - draw_h) // 2
return sdl2.SDL_Rect(draw_x, draw_y, draw_w, draw_h)
def _destroy_texture(self) -> None:
if self._texture is not None:
sdl2.SDL_DestroyTexture(self._texture)
self._texture = None
self._texture_renderer = None
self._texture_size = (0, 0)
def _clear_frames(self) -> None:
with self._frame_lock:
self._latest_frame = None
self._frame_dirty = False
self._resolution = ""
self._destroy_texture()
def _set_playing(self, value: bool, notify: bool) -> None:
changed = self._is_playing != value
self._is_playing = value
if changed and notify:
self._event_callback("stopped" if not value else "started")

0
src/r36s_dlna_browser/ui/__init__.py

301
src/r36s_dlna_browser/ui/screens.py

@ -0,0 +1,301 @@
"""Screen rendering helpers for the SDL2 UI."""
from __future__ import annotations
import ctypes
from typing import TYPE_CHECKING
import sdl2
try:
import sdl2.sdlttf as ttf
except ImportError:
ttf = None # type: ignore[assignment]
from r36s_dlna_browser.ui import theme
if TYPE_CHECKING:
from r36s_dlna_browser.dlna.browser_state import BrowserState
def _color(rgba: tuple) -> sdl2.SDL_Color:
return sdl2.SDL_Color(rgba[0], rgba[1], rgba[2], rgba[3] if len(rgba) > 3 else 255)
def _render_text(
renderer: ctypes.c_void_p,
font: ctypes.c_void_p,
text: str,
x: int,
y: int,
color: tuple,
max_w: int = 0,
) -> None:
if not text or not font or not ttf:
return
encoded = text.encode("utf-8")
surf = ttf.TTF_RenderUTF8_Blended(font, encoded, _color(color))
if not surf:
return
tex = sdl2.SDL_CreateTextureFromSurface(renderer, surf)
w, h = surf.contents.w, surf.contents.h
sdl2.SDL_FreeSurface(surf)
if not tex:
return
if max_w and w > max_w:
w = max_w
dst = sdl2.SDL_Rect(x, y, w, h)
src = sdl2.SDL_Rect(0, 0, w, h)
sdl2.SDL_RenderCopy(renderer, tex, src, dst)
sdl2.SDL_DestroyTexture(tex)
def _fit_text(font: ctypes.c_void_p, text: str, max_w: int) -> str:
if not text or not font or not ttf or max_w <= 0:
return text
width = ctypes.c_int()
height = ctypes.c_int()
encoded = text.encode("utf-8")
if ttf.TTF_SizeUTF8(font, encoded, ctypes.byref(width), ctypes.byref(height)) == 0 and width.value <= max_w:
return text
ellipsis = "..."
ellipsis_width = ctypes.c_int()
if ttf.TTF_SizeUTF8(font, ellipsis.encode("utf-8"), ctypes.byref(ellipsis_width), ctypes.byref(height)) != 0:
return text
if ellipsis_width.value >= max_w:
return ""
trimmed = text
while trimmed:
trimmed = trimmed[:-1]
candidate = (trimmed.rstrip() + ellipsis).encode("utf-8")
if ttf.TTF_SizeUTF8(font, candidate, ctypes.byref(width), ctypes.byref(height)) == 0 and width.value <= max_w:
return trimmed.rstrip() + ellipsis
return ellipsis if ellipsis_width.value <= max_w else ""
def _fill_rect(renderer, x: int, y: int, w: int, h: int, color: tuple) -> None:
sdl2.SDL_SetRenderDrawBlendMode(renderer, sdl2.SDL_BLENDMODE_BLEND)
sdl2.SDL_SetRenderDrawColor(renderer, color[0], color[1], color[2],
color[3] if len(color) > 3 else 255)
rect = sdl2.SDL_Rect(x, y, w, h)
sdl2.SDL_RenderFillRect(renderer, rect)
def _draw_rect(renderer, x: int, y: int, w: int, h: int, color: tuple) -> None:
sdl2.SDL_SetRenderDrawBlendMode(renderer, sdl2.SDL_BLENDMODE_BLEND)
sdl2.SDL_SetRenderDrawColor(renderer, color[0], color[1], color[2],
color[3] if len(color) > 3 else 255)
rect = sdl2.SDL_Rect(x, y, w, h)
sdl2.SDL_RenderDrawRect(renderer, rect)
def _render_icon(renderer, texture, x: int, y: int) -> None:
"""Blit a pre-loaded SDL texture as an ICON_SIZE square."""
if not texture:
return
s = theme.ICON_SIZE
dst = sdl2.SDL_Rect(x, y, s, s)
sdl2.SDL_RenderCopy(renderer, texture, None, dst)
def _render_icon_small(renderer, texture, x: int, y: int) -> None:
if not texture:
return
s = theme.PLAYBACK_HUD_ICON_SIZE
dst = sdl2.SDL_Rect(x, y, s, s)
sdl2.SDL_RenderCopy(renderer, texture, None, dst)
def _format_time(seconds: float) -> str:
total = max(0, int(seconds))
mins, secs = divmod(total, 60)
hours, mins = divmod(mins, 60)
if hours:
return f"{hours}:{mins:02d}:{secs:02d}"
return f"{mins:02d}:{secs:02d}"
def _draw_progress_bar(renderer, x: int, y: int, w: int, h: int, progress: float) -> None:
_fill_rect(renderer, x, y, w, h, theme.PLAYBACK_OVERLAY_DIM)
fill_w = max(0, min(w, int(w * max(0.0, min(1.0, progress)))))
if fill_w:
_fill_rect(renderer, x, y, fill_w, h, theme.PLAYBACK_OVERLAY_ACCENT)
# ── Public drawing functions ─────────────────────────────────────
def draw_header(renderer, font, title: str) -> None:
_fill_rect(renderer, 0, 0, theme.SCREEN_W, theme.HEADER_H, theme.HEADER_BG)
_render_text(renderer, font, title, theme.LIST_PAD_LEFT, 8,
theme.TEXT_COLOR, theme.SCREEN_W - theme.LIST_PAD_LEFT * 2)
def draw_status_bar(renderer, font, text: str) -> None:
y = theme.SCREEN_H - theme.STATUS_H
_fill_rect(renderer, 0, y, theme.SCREEN_W, theme.STATUS_H, theme.STATUS_BG)
_render_text(renderer, font, text, theme.LIST_PAD_LEFT, y + 6,
theme.DIM_TEXT, theme.SCREEN_W - theme.LIST_PAD_LEFT * 2)
def draw_server_list(renderer, font, state: "BrowserState", icons: dict | None = None) -> None:
draw_header(renderer, font, "DLNA Servers")
if not state.servers:
msg = "Searching..." if state.loading else "No servers found. Press A to refresh."
_render_text(renderer, font, msg, theme.LIST_PAD_LEFT, theme.LIST_TOP + 20, theme.DIM_TEXT)
draw_status_bar(renderer, font, "A=Select Start=Quit")
return
srv_icon = icons.get("server") if icons else None
icon_gap = theme.ICON_SIZE + 4 if srv_icon else 0
text_x = theme.LIST_PAD_LEFT + icon_gap
vis = theme.VISIBLE_ITEMS
offset = state.server_scroll_offset
for i in range(vis):
idx = offset + i
if idx >= len(state.servers):
break
srv = state.servers[idx]
y = theme.LIST_TOP + i * theme.LIST_ITEM_H
if idx == state.server_cursor:
_fill_rect(renderer, 0, y, theme.SCREEN_W, theme.LIST_ITEM_H, theme.HIGHLIGHT_BG)
color = theme.HIGHLIGHT_TEXT
else:
color = theme.TEXT_COLOR
icon_y = y + (theme.LIST_ITEM_H - theme.ICON_SIZE) // 2
_render_icon(renderer, srv_icon, theme.LIST_PAD_LEFT, icon_y)
_render_text(renderer, font, str(srv), text_x, y + 4,
color, theme.SCREEN_W - text_x - theme.LIST_PAD_RIGHT)
draw_status_bar(renderer, font, f"A=Browse B=Refresh Start=Quit ({len(state.servers)} servers)")
def draw_browse_list(renderer, font, state: "BrowserState", icons: dict | None = None) -> None:
lv = state.current_level
title = "Browsing"
if lv:
title = f"/{lv.object_id}" if lv.object_id != "0" else "/"
draw_header(renderer, font, title)
items = state.current_items
if not items:
msg = "Loading..." if state.loading else "Empty folder."
_render_text(renderer, font, msg, theme.LIST_PAD_LEFT, theme.LIST_TOP + 20, theme.DIM_TEXT)
draw_status_bar(renderer, font, "B=Back")
return
has_icons = bool(icons)
icon_gap = theme.ICON_SIZE + 4 if has_icons else 0
text_x = theme.LIST_PAD_LEFT + icon_gap
vis = theme.VISIBLE_ITEMS
offset = state.scroll_offset
cursor = state.cursor
for i in range(vis):
idx = offset + i
if idx >= len(items):
break
item = items[idx]
y = theme.LIST_TOP + i * theme.LIST_ITEM_H
if idx == cursor:
_fill_rect(renderer, 0, y, theme.SCREEN_W, theme.LIST_ITEM_H, theme.HIGHLIGHT_BG)
color = theme.HIGHLIGHT_TEXT
else:
color = theme.TEXT_COLOR
icon_y = y + (theme.LIST_ITEM_H - theme.ICON_SIZE) // 2
if has_icons:
ico = icons.get(item.item_type) # type: ignore[call-overload]
_render_icon(renderer, ico, theme.LIST_PAD_LEFT, icon_y)
_render_text(renderer, font, item.title, text_x, y + 4,
color, theme.SCREEN_W - text_x - theme.LIST_PAD_RIGHT)
draw_status_bar(
renderer, font,
f"A=Open B=Back L/R=Page Start=Quit ({cursor+1}/{len(items)})"
)
def draw_playback(renderer, font, state: "BrowserState", icons: dict | None = None) -> None:
if not state.playback_hud_visible:
return
top_h = theme.PLAYBACK_HUD_TOP
bottom_h = theme.PLAYBACK_HUD_BOTTOM
width = theme.SCREEN_W
height = theme.SCREEN_H
left = theme.PLAYBACK_HUD_SIDE
right = theme.PLAYBACK_HUD_SIDE
status = "Paused" if state.playback_paused else "Playing"
total = max(0.0, state.playback_duration)
current = max(0.0, min(state.playback_position, total if total else state.playback_position))
progress = (current / total) if total > 0 else 0.0
progress_x = left
progress_w = width - left - right
status_icon = icons.get("hud-pause" if state.playback_paused else "hud-play") if icons else None
volume_icon = icons.get("hud-volume") if icons else None
seek_icon = icons.get("hud-seek") if icons else None
stop_icon = icons.get("hud-stop") if icons else None
hud_icon = icons.get("hud-display") if icons else None
hud_mode = {
theme.PLAYBACK_HUD_AUTO: "AUTO",
theme.PLAYBACK_HUD_PINNED: "FIXED",
theme.PLAYBACK_HUD_HIDDEN: "HIDDEN",
}.get(state.playback_hud_mode, "AUTO")
time_text = f"{_format_time(current)} / {_format_time(total)}"
time_w = 156
time_x = width - left - time_w
title_x = left + 98
title_w = max(0, time_x - title_x - 12)
title_text = _fit_text(font, state.playback_title or "...", title_w)
_fill_rect(renderer, 0, 0, width, top_h, theme.PLAYBACK_OVERLAY_BG)
_fill_rect(renderer, 0, height - bottom_h, width, bottom_h, theme.PLAYBACK_OVERLAY_BG)
_render_icon_small(renderer, status_icon, left, 7)
_render_text(renderer, font, status, left + 22, 5, theme.TEXT_COLOR, 70)
_render_text(renderer, font, title_text,
title_x, 5, theme.TEXT_COLOR, title_w)
_render_text(renderer, font, time_text,
time_x, 5, theme.DIM_TEXT, time_w)
progress_y = height - bottom_h + 10
_draw_progress_bar(renderer, progress_x, progress_y, progress_w, theme.PLAYBACK_PROGRESS_H, progress)
info_y = height - bottom_h + 23
_render_icon_small(renderer, volume_icon, left, info_y + 1)
_render_text(renderer, font, f"{state.playback_volume}%", left + 22, info_y - 1, theme.TEXT_COLOR, 64)
buffer_text = f"Buffer {state.playback_buffer_percent}%" if state.playback_buffer_percent < 100 else state.playback_resolution or ""
_render_text(renderer, font, buffer_text, left + 88, info_y - 1, theme.DIM_TEXT, 180)
_render_text(renderer, font, hud_mode, width - 70, info_y - 1, theme.DIM_TEXT, 60)
controls_y = height - 20
cx = left
_render_icon_small(renderer, volume_icon, cx, controls_y - 1)
_render_text(renderer, font, "U/D", cx + 20, controls_y - 2, theme.TEXT_COLOR, 38)
cx += 70
_render_icon_small(renderer, status_icon, cx, controls_y - 1)
_render_text(renderer, font, "A", cx + 20, controls_y - 2, theme.TEXT_COLOR, 18)
cx += 44
_render_icon_small(renderer, seek_icon, cx, controls_y - 1)
_render_text(renderer, font, "L/R", cx + 20, controls_y - 2, theme.TEXT_COLOR, 34)
cx += 62
_render_icon_small(renderer, stop_icon, cx, controls_y - 1)
_render_text(renderer, font, "B", cx + 20, controls_y - 2, theme.TEXT_COLOR, 18)
cx += 42
_render_icon_small(renderer, hud_icon, cx, controls_y - 1)
_render_text(renderer, font, "Y", cx + 20, controls_y - 2, theme.TEXT_COLOR, 18)
def draw_error(renderer, font, state: "BrowserState") -> None:
draw_header(renderer, font, "Error")
_render_text(renderer, font, state.error or "Unknown error",
theme.LIST_PAD_LEFT, theme.SCREEN_H // 2 - 20, theme.ERROR_COLOR,
theme.SCREEN_W - theme.LIST_PAD_LEFT * 2)
draw_status_bar(renderer, font, "A=OK")

463
src/r36s_dlna_browser/ui/sdl_app.py

@ -0,0 +1,463 @@
"""SDL2 window, event loop, rendering and input handling."""
from __future__ import annotations
import ctypes
import logging
import os
import queue
import time
from enum import Enum, auto
from typing import Any
import sdl2
try:
import sdl2.sdlttf as ttf
except ImportError:
ttf = None # type: ignore[assignment]
try:
import sdl2.sdlimage as sdl_img
except ImportError:
sdl_img = None # type: ignore[assignment]
from r36s_dlna_browser.dlna.browser_state import BrowserState
from r36s_dlna_browser.dlna.models import ItemType, MediaItem
from r36s_dlna_browser.player.backend import PlayerBackend
from r36s_dlna_browser.platform.controls import Action, map_key
from r36s_dlna_browser.ui import theme
from r36s_dlna_browser.ui import screens
log = logging.getLogger(__name__)
class Screen(Enum):
SERVERS = auto()
BROWSE = auto()
PLAYBACK = auto()
ERROR = auto()
class SDLApp:
"""Owns the SDL2 window and drives the main loop."""
def __init__(
self,
ui_queue: queue.Queue[Any],
cmd_queue: queue.Queue[Any],
browser_state: BrowserState,
player: PlayerBackend,
) -> None:
self._ui_q = ui_queue
self._cmd_q = cmd_queue
self._state = browser_state
self._player = player
self._screen = Screen.SERVERS
self._running = False
self._window = None
self._renderer = None
self._font = None
self._playback_font = None
self._icons: dict = {}
self._playback_clear_once = False
self._needs_redraw = True
self._last_playback_draw_at = 0.0
self._last_playback_snapshot: tuple[Any, ...] | None = None
self._last_playback_interaction_at = 0.0
# ── lifecycle ────────────────────────────────────────────────
def run(self) -> None:
self._init_sdl()
self._running = True
try:
while self._running:
self._poll_events()
self._process_ui_queue()
self._refresh_playback_hud_visibility()
if self._should_draw():
self._draw()
sdl2.SDL_Delay(theme.FRAME_DELAY_MS)
finally:
self._cleanup()
def _init_sdl(self) -> None:
sdl2.SDL_Init(sdl2.SDL_INIT_VIDEO | sdl2.SDL_INIT_GAMECONTROLLER)
if ttf:
ttf.TTF_Init()
self._window = sdl2.SDL_CreateWindow(
b"R36S DLNA Browser",
sdl2.SDL_WINDOWPOS_CENTERED,
sdl2.SDL_WINDOWPOS_CENTERED,
theme.SCREEN_W,
theme.SCREEN_H,
sdl2.SDL_WINDOW_SHOWN,
)
self._renderer = sdl2.SDL_CreateRenderer(
self._window, -1,
sdl2.SDL_RENDERER_ACCELERATED | sdl2.SDL_RENDERER_PRESENTVSYNC,
)
# Open first available gamecontroller
for i in range(sdl2.SDL_NumJoysticks()):
if sdl2.SDL_IsGameController(i):
sdl2.SDL_GameControllerOpen(i)
break
self._font = self._load_font(theme.FONT_SIZE)
self._playback_font = self._load_font(theme.PLAYBACK_FONT_SIZE)
if not self._font:
log.error("Could not load any font – UI text will be missing.")
self._icons = self._load_icons()
self._player.attach_window(self._window)
self._sync_player_viewport()
def _load_icons(self) -> dict:
icons: dict = {}
if not sdl_img:
return icons
sdl_img.IMG_Init(sdl_img.IMG_INIT_PNG)
mapping = [
("folder", ItemType.CONTAINER),
("audio", ItemType.AUDIO),
("video", ItemType.VIDEO),
("image", ItemType.IMAGE),
("playing", "playing"),
("server", "server"),
("hud-play", "hud-play"),
("hud-pause", "hud-pause"),
("hud-stop", "hud-stop"),
("hud-seek", "hud-seek"),
("hud-volume", "hud-volume"),
("hud-display", "hud-display"),
]
for fname, key in mapping:
path = theme.ICONS_DIR / f"{fname}.png"
if path.exists():
surf = sdl_img.IMG_Load(str(path).encode())
if surf:
tex = sdl2.SDL_CreateTextureFromSurface(self._renderer, surf)
sdl2.SDL_FreeSurface(surf)
if tex:
icons[key] = tex
log.debug("Loaded icon: %s", fname)
log.info("Loaded %d icon(s).", len(icons))
return icons
def _load_font(self, size: int):
if not ttf:
log.warning("SDL2_ttf not available – text rendering disabled.")
return None
for path in theme.FONT_SEARCH_PATHS:
expanded = os.path.expanduser(path)
if os.path.isfile(expanded):
f = ttf.TTF_OpenFont(expanded.encode(), size)
if f:
log.info("Loaded font: %s", expanded)
return f
return None
def _cleanup(self) -> None:
for tex in self._icons.values():
sdl2.SDL_DestroyTexture(tex)
self._icons = {}
if sdl_img:
sdl_img.IMG_Quit()
if self._font and ttf:
ttf.TTF_CloseFont(self._font)
if self._playback_font and ttf:
ttf.TTF_CloseFont(self._playback_font)
if self._renderer:
sdl2.SDL_DestroyRenderer(self._renderer)
if self._window:
sdl2.SDL_DestroyWindow(self._window)
if ttf:
ttf.TTF_Quit()
sdl2.SDL_Quit()
# ── events ───────────────────────────────────────────────────
def _poll_events(self) -> None:
event = sdl2.SDL_Event()
while sdl2.SDL_PollEvent(ctypes.byref(event)):
if event.type == sdl2.SDL_QUIT:
self._running = False
return
if event.type == sdl2.SDL_WINDOWEVENT and event.window.event in (
sdl2.SDL_WINDOWEVENT_MOVED,
sdl2.SDL_WINDOWEVENT_SIZE_CHANGED,
sdl2.SDL_WINDOWEVENT_RESIZED,
):
if self._window and event.window.windowID == sdl2.SDL_GetWindowID(self._window):
self._sync_player_viewport()
self._mark_dirty()
action = map_key(event)
if action is not None:
self._handle_action(action)
def _process_ui_queue(self) -> None:
while True:
try:
msg = self._ui_q.get_nowait()
except queue.Empty:
break
self._handle_ui_message(msg)
def _handle_ui_message(self, msg: tuple) -> None:
kind = msg[0]
if kind == "servers_updated":
self._state.loading = False
self._screen = Screen.SERVERS
elif kind == "items_updated":
self._state.loading = False
self._screen = Screen.BROWSE
elif kind == "playback_started":
self._state.playback_title = msg[1] if len(msg) > 1 else ""
self._state.playback_paused = False
self._show_playback_hud(force=True)
self._playback_clear_once = True
self._screen = Screen.PLAYBACK
elif kind == "playback_paused":
self._state.playback_paused = True
self._show_playback_hud(force=True)
elif kind == "playback_resumed":
self._state.playback_paused = False
self._show_playback_hud(force=True)
elif kind == "playback_stopped":
self._state.playback_paused = False
self._screen = Screen.BROWSE if self._state.in_browse_mode else Screen.SERVERS
elif kind == "error":
self._state.error = msg[1] if len(msg) > 1 else "Unknown error"
self._state.loading = False
self._screen = Screen.ERROR
self._mark_dirty()
# ── input dispatch ───────────────────────────────────────────
def _handle_action(self, action: Action) -> None:
if action == Action.QUIT:
self._running = False
return
if self._screen == Screen.ERROR:
if action in (Action.CONFIRM, Action.BACK):
self._state.error = ""
self._screen = Screen.BROWSE if self._state.in_browse_mode else Screen.SERVERS
self._mark_dirty()
return
if self._screen == Screen.PLAYBACK:
if action == Action.BACK:
self._cmd_q.put(("stop",))
elif action == Action.CONFIRM:
self._cmd_q.put(("toggle_pause",))
elif action == Action.UP:
self._cmd_q.put(("volume", 5))
elif action == Action.DOWN:
self._cmd_q.put(("volume", -5))
elif action == Action.PAGE_UP:
self._cmd_q.put(("seek", -10))
elif action == Action.PAGE_DOWN:
self._cmd_q.put(("seek", 10))
elif action == Action.HUD_MODE:
self._cycle_playback_hud_mode()
self._mark_dirty()
self._show_playback_hud(force=action != Action.HUD_MODE)
return
if self._screen == Screen.SERVERS:
self._handle_server_action(action)
elif self._screen == Screen.BROWSE:
self._handle_browse_action(action)
def _handle_server_action(self, action: Action) -> None:
n = len(self._state.servers)
if action == Action.UP and n:
self._state.server_cursor = max(0, self._state.server_cursor - 1)
self._adjust_server_scroll()
self._mark_dirty()
elif action == Action.DOWN and n:
self._state.server_cursor = min(n - 1, self._state.server_cursor + 1)
self._adjust_server_scroll()
self._mark_dirty()
elif action == Action.CONFIRM:
srv = self._state.selected_server
if srv:
obj_id = self._state.enter_server(srv)
self._state.loading = True
self._cmd_q.put(("browse", srv.location, obj_id))
else:
# No server selected – refresh
self._state.loading = True
self._cmd_q.put(("discover",))
self._mark_dirty()
elif action == Action.BACK:
# Refresh server list
self._state.loading = True
self._cmd_q.put(("discover",))
self._mark_dirty()
def _handle_browse_action(self, action: Action) -> None:
items = self._state.current_items
n = len(items)
if action == Action.UP and n:
self._state.cursor -= 1
self._adjust_browse_scroll()
self._mark_dirty()
elif action == Action.DOWN and n:
self._state.cursor += 1
self._adjust_browse_scroll()
self._mark_dirty()
elif action == Action.PAGE_UP and n:
self._state.cursor = max(0, self._state.cursor - theme.PAGE_SIZE)
self._adjust_browse_scroll()
self._mark_dirty()
elif action == Action.PAGE_DOWN and n:
self._state.cursor = min(n - 1, self._state.cursor + theme.PAGE_SIZE)
self._adjust_browse_scroll()
self._mark_dirty()
elif action == Action.CONFIRM:
item = self._state.selected_item()
if item:
if item.is_container:
self._state.loading = True
lv = self._state.current_level
loc = lv.server_location if lv else ""
self._cmd_q.put(("browse", loc, item.object_id))
elif item.resource_url:
self._cmd_q.put(("play", item.resource_url, item.title))
self._mark_dirty()
elif action == Action.BACK:
if not self._state.go_back():
self._screen = Screen.SERVERS
else:
self._screen = Screen.BROWSE
self._mark_dirty()
def _adjust_server_scroll(self) -> None:
vis = theme.VISIBLE_ITEMS
cur = self._state.server_cursor
off = self._state.server_scroll_offset
if cur < off:
self._state.server_scroll_offset = cur
elif cur >= off + vis:
self._state.server_scroll_offset = cur - vis + 1
def _adjust_browse_scroll(self) -> None:
vis = theme.VISIBLE_ITEMS
cur = self._state.cursor
off = self._state.scroll_offset
if cur < off:
self._state.scroll_offset = cur
elif cur >= off + vis:
self._state.scroll_offset = cur - vis + 1
# ── drawing ──────────────────────────────────────────────────
def _draw(self) -> None:
r = self._renderer
if self._screen == Screen.PLAYBACK:
sdl2.SDL_SetRenderDrawColor(r, *theme.PLAYBACK_BG_COLOR)
else:
sdl2.SDL_SetRenderDrawColor(r, *theme.BG_COLOR)
sdl2.SDL_RenderClear(r)
self._playback_clear_once = False
f = self._font
if self._screen == Screen.SERVERS:
screens.draw_server_list(r, f, self._state, self._icons)
elif self._screen == Screen.BROWSE:
screens.draw_browse_list(r, f, self._state, self._icons)
elif self._screen == Screen.PLAYBACK:
self._player.render(r)
screens.draw_playback(r, self._playback_font or f, self._state, self._icons)
elif self._screen == Screen.ERROR:
screens.draw_error(r, f, self._state)
sdl2.SDL_RenderPresent(r)
self._needs_redraw = False
if self._screen == Screen.PLAYBACK:
self._last_playback_draw_at = time.monotonic()
self._last_playback_snapshot = self._playback_snapshot()
def _mark_dirty(self) -> None:
self._needs_redraw = True
def _playback_snapshot(self) -> tuple[Any, ...]:
return (
self._state.playback_title,
self._state.playback_paused,
round(self._state.playback_position, 1),
round(self._state.playback_duration, 1),
self._state.playback_volume,
self._state.playback_buffer_percent,
self._state.playback_resolution,
self._state.playback_backend,
self._state.playback_hud_visible,
self._state.playback_hud_mode,
)
def _refresh_playback_hud_visibility(self) -> None:
if self._screen != Screen.PLAYBACK:
return
if self._state.playback_hud_mode == theme.PLAYBACK_HUD_HIDDEN:
return
if self._state.playback_hud_mode == theme.PLAYBACK_HUD_PINNED or self._state.playback_paused:
if not self._state.playback_hud_visible:
self._state.playback_hud_visible = True
self._mark_dirty()
return
visible = (time.monotonic() - self._last_playback_interaction_at) < (theme.PLAYBACK_HUD_AUTOHIDE_MS / 1000.0)
if visible != self._state.playback_hud_visible:
self._state.playback_hud_visible = visible
self._mark_dirty()
def _show_playback_hud(self, force: bool = False) -> None:
self._last_playback_interaction_at = time.monotonic()
if force or self._state.playback_hud_mode != theme.PLAYBACK_HUD_HIDDEN:
if not self._state.playback_hud_visible:
self._state.playback_hud_visible = True
self._mark_dirty()
def _cycle_playback_hud_mode(self) -> None:
next_mode = {
theme.PLAYBACK_HUD_AUTO: theme.PLAYBACK_HUD_PINNED,
theme.PLAYBACK_HUD_PINNED: theme.PLAYBACK_HUD_HIDDEN,
theme.PLAYBACK_HUD_HIDDEN: theme.PLAYBACK_HUD_AUTO,
}
self._state.playback_hud_mode = next_mode[self._state.playback_hud_mode]
self._state.playback_hud_visible = self._state.playback_hud_mode != theme.PLAYBACK_HUD_HIDDEN
self._last_playback_interaction_at = time.monotonic()
def _should_draw(self) -> bool:
if self._needs_redraw:
return True
if self._screen != Screen.PLAYBACK:
return False
if self._player.has_new_frame():
return True
now = time.monotonic()
if now - self._last_playback_draw_at < (theme.PLAYBACK_HUD_REFRESH_MS / 1000.0):
return False
return self._playback_snapshot() != self._last_playback_snapshot
def _sync_player_viewport(self) -> None:
width = ctypes.c_int()
height = ctypes.c_int()
if self._window:
sdl2.SDL_GetWindowSize(self._window, ctypes.byref(width), ctypes.byref(height))
if width.value <= 0 or height.value <= 0:
width.value = theme.SCREEN_W
height.value = theme.SCREEN_H
self._player.set_viewport(
width.value,
height.value,
theme.PLAYBACK_HUD_TOP,
theme.PLAYBACK_HUD_BOTTOM,
theme.PLAYBACK_VIDEO_LEFT,
width.value - theme.PLAYBACK_VIDEO_RIGHT,
)

80
src/r36s_dlna_browser/ui/theme.py

@ -0,0 +1,80 @@
"""UI layout constants for a 640×480 display."""
from __future__ import annotations
from pathlib import Path
# Screen
SCREEN_W = 640
SCREEN_H = 480
# Colors (R, G, B, A)
BG_COLOR = (24, 24, 32, 255)
PLAYBACK_BG_COLOR = (0, 0, 0, 255)
TEXT_COLOR = (220, 220, 220, 255)
HIGHLIGHT_BG = (50, 80, 160, 255)
HIGHLIGHT_TEXT = (255, 255, 255, 255)
DIM_TEXT = (140, 140, 150, 255)
ERROR_COLOR = (220, 60, 60, 255)
HEADER_BG = (32, 32, 48, 255)
STATUS_BG = (20, 20, 28, 255)
# Typography
FONT_SIZE = 20
HEADER_FONT_SIZE = 24
PLAYBACK_FONT_SIZE = 14
# Layout
HEADER_H = 40
STATUS_H = 32
LIST_TOP = HEADER_H
LIST_BOTTOM = SCREEN_H - STATUS_H
LIST_ITEM_H = 28
VISIBLE_ITEMS = (LIST_BOTTOM - LIST_TOP) // LIST_ITEM_H
LIST_PAD_LEFT = 12
LIST_PAD_RIGHT = 12
# Page jump size (L/R shoulder)
PAGE_SIZE = VISIBLE_ITEMS
# Timing
TARGET_FPS = 30
FRAME_DELAY_MS = 1000 // TARGET_FPS
PLAYBACK_HUD_REFRESH_MS = 250
PLAYBACK_HUD_AUTOHIDE_MS = 2500
PLAYBACK_HUD_HIDDEN = 0
PLAYBACK_HUD_AUTO = 1
PLAYBACK_HUD_PINNED = 2
PACKAGE_ROOT = Path(__file__).resolve().parents[1]
ASSETS_DIR = PACKAGE_ROOT / "assets"
ICONS_DIR = ASSETS_DIR / "icons"
ICON_SIZE = 20
# Playback HUD / video viewport
PLAYBACK_HUD_TOP = 30
PLAYBACK_HUD_BOTTOM = 50
PLAYBACK_HUD_SIDE = 10
PLAYBACK_PROGRESS_H = 8
PLAYBACK_OVERLAY_BG = (10, 14, 22, 170)
PLAYBACK_OVERLAY_ACCENT = (84, 124, 224, 255)
PLAYBACK_OVERLAY_DIM = (220, 220, 220, 72)
PLAYBACK_PILL_BG = (8, 12, 18, 210)
PLAYBACK_HUD_ICON_SIZE = 16
PLAYBACK_VIDEO_TOP = PLAYBACK_HUD_TOP
PLAYBACK_VIDEO_BOTTOM = SCREEN_H - PLAYBACK_HUD_BOTTOM
PLAYBACK_VIDEO_LEFT = 0
PLAYBACK_VIDEO_RIGHT = SCREEN_W
# Font search paths (in order)
FONT_SEARCH_PATHS = [
str(ASSETS_DIR / "NotoSans-Regular.ttf"),
"/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
"/usr/share/fonts/TTF/DejaVuSans.ttf",
"/usr/share/fonts/dejavu-sans-fonts/DejaVuSans.ttf",
"/usr/share/fonts/google-noto-vf/NotoSans[wght].ttf",
"/usr/share/fonts/liberation-sans-fonts/LiberationSans-Regular.ttf",
]

131
tests/test_client_parser.py

@ -0,0 +1,131 @@
"""Tests for the DLNAClient SOAP and DIDL-Lite parsers."""
from r36s_dlna_browser.dlna.client import _extract_browse_result, _parse_didl
from r36s_dlna_browser.dlna.models import ItemType
_SAMPLE_DIDL = """\
<DIDL-Lite xmlns="urn:schemas-upnp-org:metadata-1-0/DIDL-Lite/"
xmlns:dc="http://purl.org/dc/elements/1.1/"
xmlns:upnp="urn:schemas-upnp-org:metadata-1-0/upnp/">
<container id="1" parentID="0" childCount="3">
<dc:title>Music</dc:title>
<upnp:class>object.container.storageFolder</upnp:class>
</container>
<item id="100" parentID="1">
<dc:title>Song.mp3</dc:title>
<upnp:class>object.item.audioItem.musicTrack</upnp:class>
<upnp:albumArtURI>http://srv/art.jpg</upnp:albumArtURI>
<res protocolInfo="http-get:*:audio/mpeg:*" size="4000000" duration="0:03:21">http://srv/song.mp3</res>
</item>
<item id="200" parentID="1">
<dc:title>Video.mkv</dc:title>
<upnp:class>object.item.videoItem</upnp:class>
<res protocolInfo="http-get:*:video/x-matroska:*">http://srv/video.mkv</res>
</item>
</DIDL-Lite>
"""
class TestParseDIDL:
def test_parses_container(self):
items = _parse_didl(_SAMPLE_DIDL, "http://srv")
containers = [i for i in items if i.is_container]
assert len(containers) == 1
assert containers[0].title == "Music"
assert containers[0].child_count == 3
def test_parses_audio_item(self):
items = _parse_didl(_SAMPLE_DIDL, "http://srv")
audio = [i for i in items if i.item_type == ItemType.AUDIO]
assert len(audio) == 1
assert audio[0].title == "Song.mp3"
assert audio[0].resource_url == "http://srv/song.mp3"
assert audio[0].mime_type == "audio/mpeg"
assert audio[0].size == 4000000
assert audio[0].duration == "0:03:21"
assert audio[0].album_art_url == "http://srv/art.jpg"
def test_parses_video_item(self):
items = _parse_didl(_SAMPLE_DIDL, "http://srv")
video = [i for i in items if i.item_type == ItemType.VIDEO]
assert len(video) == 1
assert video[0].title == "Video.mkv"
assert "video/x-matroska" in video[0].mime_type
def test_total_count(self):
items = _parse_didl(_SAMPLE_DIDL, "http://srv")
assert len(items) == 3
def test_empty_didl(self):
empty = '<DIDL-Lite xmlns="urn:schemas-upnp-org:metadata-1-0/DIDL-Lite/"></DIDL-Lite>'
assert _parse_didl(empty, "http://srv") == []
def test_malformed_xml(self):
assert _parse_didl("<<<not xml>>>", "http://srv") == []
def test_relative_url_resolution(self):
didl = """\
<DIDL-Lite xmlns="urn:schemas-upnp-org:metadata-1-0/DIDL-Lite/"
xmlns:dc="http://purl.org/dc/elements/1.1/"
xmlns:upnp="urn:schemas-upnp-org:metadata-1-0/upnp/">
<item id="300" parentID="1">
<dc:title>Relative</dc:title>
<upnp:class>object.item.audioItem</upnp:class>
<res protocolInfo="http-get:*:audio/flac:*">media/track.flac</res>
</item>
</DIDL-Lite>
"""
items = _parse_didl(didl, "http://myserver:8200")
assert items[0].resource_url == "http://myserver:8200/media/track.flac"
class TestExtractBrowseResult:
def test_extracts_unnamespaced_result(self):
soap = """\
<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/">
<s:Body>
<u:BrowseResponse xmlns:u="urn:schemas-upnp-org:service:ContentDirectory:1">
<Result>&lt;DIDL-Lite xmlns="urn:schemas-upnp-org:metadata-1-0/DIDL-Lite/"&gt;&lt;container id="1" parentID="0" /&gt;&lt;/DIDL-Lite&gt;</Result>
</u:BrowseResponse>
</s:Body>
</s:Envelope>
"""
result = _extract_browse_result(soap)
assert result is not None
assert "DIDL-Lite" in result
assert 'container id="1"' in result
def test_extracts_namespaced_result(self):
soap = """\
<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" xmlns:u="urn:schemas-upnp-org:service:ContentDirectory:1">
<s:Body>
<u:BrowseResponse>
<u:Result>&lt;DIDL-Lite xmlns="urn:schemas-upnp-org:metadata-1-0/DIDL-Lite/"&gt;&lt;/DIDL-Lite&gt;</u:Result>
</u:BrowseResponse>
</s:Body>
</s:Envelope>
"""
assert _extract_browse_result(soap) is not None
def test_extracts_embedded_xml_result(self):
soap = """\
<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/">
<s:Body>
<BrowseResponse>
<Result>
<DIDL-Lite xmlns="urn:schemas-upnp-org:metadata-1-0/DIDL-Lite/">
<container id="1" parentID="0" />
</DIDL-Lite>
</Result>
</BrowseResponse>
</s:Body>
</s:Envelope>
"""
result = _extract_browse_result(soap)
assert result is not None
assert "DIDL-Lite" in result
def test_returns_none_when_missing(self):
soap = "<Envelope><Body><BrowseResponse /></Body></Envelope>"
assert _extract_browse_result(soap) is None

79
tests/test_controls.py

@ -0,0 +1,79 @@
"""Tests for platform/controls input mapping."""
import ctypes
from unittest.mock import MagicMock
import sdl2
from r36s_dlna_browser.platform.controls import Action, map_key
def _make_key_event(sym: int) -> sdl2.SDL_Event:
event = sdl2.SDL_Event()
event.type = sdl2.SDL_KEYDOWN
event.key.keysym.sym = sym
return event
def _make_button_event(button: int) -> sdl2.SDL_Event:
event = sdl2.SDL_Event()
event.type = sdl2.SDL_CONTROLLERBUTTONDOWN
event.cbutton.button = button
return event
class TestKeyMapping:
def test_up(self):
assert map_key(_make_key_event(sdl2.SDLK_UP)) == Action.UP
def test_down(self):
assert map_key(_make_key_event(sdl2.SDLK_DOWN)) == Action.DOWN
def test_confirm_return(self):
assert map_key(_make_key_event(sdl2.SDLK_RETURN)) == Action.CONFIRM
def test_confirm_space(self):
assert map_key(_make_key_event(sdl2.SDLK_SPACE)) == Action.CONFIRM
def test_back_escape(self):
assert map_key(_make_key_event(sdl2.SDLK_ESCAPE)) == Action.BACK
def test_quit(self):
assert map_key(_make_key_event(sdl2.SDLK_q)) == Action.QUIT
def test_hud_mode(self):
assert map_key(_make_key_event(sdl2.SDLK_h)) == Action.HUD_MODE
def test_page_up(self):
assert map_key(_make_key_event(sdl2.SDLK_PAGEUP)) == Action.PAGE_UP
def test_page_down(self):
assert map_key(_make_key_event(sdl2.SDLK_PAGEDOWN)) == Action.PAGE_DOWN
def test_unmapped_key(self):
assert map_key(_make_key_event(sdl2.SDLK_z)) is None
def test_wrong_event_type(self):
event = sdl2.SDL_Event()
event.type = sdl2.SDL_MOUSEMOTION
assert map_key(event) is None
class TestButtonMapping:
def test_dpad_up(self):
assert map_key(_make_button_event(sdl2.SDL_CONTROLLER_BUTTON_DPAD_UP)) == Action.UP
def test_button_a(self):
assert map_key(_make_button_event(sdl2.SDL_CONTROLLER_BUTTON_A)) == Action.CONFIRM
def test_button_b(self):
assert map_key(_make_button_event(sdl2.SDL_CONTROLLER_BUTTON_B)) == Action.BACK
def test_start(self):
assert map_key(_make_button_event(sdl2.SDL_CONTROLLER_BUTTON_START)) == Action.QUIT
def test_left_shoulder(self):
assert map_key(_make_button_event(sdl2.SDL_CONTROLLER_BUTTON_LEFTSHOULDER)) == Action.PAGE_UP
def test_button_y(self):
assert map_key(_make_button_event(sdl2.SDL_CONTROLLER_BUTTON_Y)) == Action.HUD_MODE

120
tests/test_didl_mapping.py

@ -0,0 +1,120 @@
"""Tests for DIDL-Lite parsing and domain model mapping."""
from r36s_dlna_browser.dlna.models import (
ItemType,
MediaItem,
classify_upnp_class,
parse_didl_item,
)
class TestClassifyUpnpClass:
def test_container(self):
assert classify_upnp_class("object.container.storageFolder") == ItemType.CONTAINER
def test_audio(self):
assert classify_upnp_class("object.item.audioItem.musicTrack") == ItemType.AUDIO
def test_video(self):
assert classify_upnp_class("object.item.videoItem") == ItemType.VIDEO
def test_image(self):
assert classify_upnp_class("object.item.imageItem.photo") == ItemType.IMAGE
def test_unknown(self):
assert classify_upnp_class("object.item.something") == ItemType.UNKNOWN
def test_empty(self):
assert classify_upnp_class("") == ItemType.UNKNOWN
class TestParseDIDLItem:
def test_audio_item(self):
didl = {
"id": "42",
"title": "My Song",
"parent_id": "10",
"upnp_class": "object.item.audioItem.musicTrack",
"resources": [
{
"url": "http://server/song.mp3",
"protocol_info": "http-get:*:audio/mpeg:*",
"size": "5000000",
"duration": "0:03:45",
}
],
"album_art_uri": "http://server/art.jpg",
}
item = parse_didl_item(didl)
assert item.object_id == "42"
assert item.title == "My Song"
assert item.item_type == ItemType.AUDIO
assert item.resource_url == "http://server/song.mp3"
assert item.mime_type == "http-get:*:audio/mpeg:*"
assert item.size == 5000000
assert item.duration == "0:03:45"
assert item.album_art_url == "http://server/art.jpg"
assert not item.is_container
def test_container_item(self):
didl = {
"id": "5",
"title": "Music",
"parent_id": "0",
"upnp_class": "object.container.storageFolder",
"child_count": "12",
}
item = parse_didl_item(didl)
assert item.object_id == "5"
assert item.title == "Music"
assert item.item_type == ItemType.CONTAINER
assert item.child_count == 12
assert item.is_container
def test_missing_resources(self):
didl = {"id": "99", "title": "NoRes", "upnp_class": "object.item.audioItem"}
item = parse_didl_item(didl)
assert item.resource_url == ""
assert item.size == 0
def test_single_resource_dict(self):
didl = {
"id": "1",
"title": "Track",
"upnp_class": "object.item.audioItem",
"resources": {"url": "http://srv/track.flac", "size": "100"},
}
item = parse_didl_item(didl)
assert item.resource_url == "http://srv/track.flac"
assert item.size == 100
def test_resource_string(self):
didl = {
"id": "2",
"title": "Track2",
"upnp_class": "object.item.videoItem",
"resources": ["http://srv/vid.mp4"],
}
item = parse_didl_item(didl)
assert item.resource_url == "http://srv/vid.mp4"
def test_alt_key_names(self):
"""Handles alternative key formats (@id, @parentID, class, etc.)."""
didl = {
"@id": "7",
"title": "Alt",
"@parentID": "3",
"class": "object.container",
"@childCount": "5",
}
item = parse_didl_item(didl)
assert item.object_id == "7"
assert item.parent_id == "3"
assert item.item_type == ItemType.CONTAINER
assert item.child_count == 5
def test_empty_dict(self):
item = parse_didl_item({})
assert item.object_id == ""
assert item.title == ""
assert item.item_type == ItemType.UNKNOWN

134
tests/test_navigation_state.py

@ -0,0 +1,134 @@
"""Tests for BrowserState navigation stack and pagination."""
from r36s_dlna_browser.dlna.browser_state import BrowserState
from r36s_dlna_browser.dlna.models import MediaServer, MediaItem, ItemType
def _server(name: str = "TestServer") -> MediaServer:
return MediaServer(friendly_name=name, location=f"http://{name}:8080/desc.xml")
def _items(n: int, container: bool = False) -> list[MediaItem]:
t = ItemType.CONTAINER if container else ItemType.AUDIO
return [
MediaItem(
object_id=str(i),
title=f"Item {i}",
item_type=t,
resource_url="" if container else f"http://srv/file{i}.mp3",
)
for i in range(n)
]
class TestServerList:
def test_empty_initial(self):
s = BrowserState()
assert s.servers == []
assert s.selected_server is None
def test_set_servers(self):
s = BrowserState()
servers = [_server("A"), _server("B")]
s.set_servers(servers)
assert len(s.servers) == 2
assert s.server_cursor == 0
assert s.selected_server.friendly_name == "A"
def test_cursor_bounds(self):
s = BrowserState()
s.set_servers([_server("A"), _server("B"), _server("C")])
s.server_cursor = 2
assert s.selected_server.friendly_name == "C"
s.server_cursor = 5 # out of bounds
assert s.selected_server is None
class TestBrowseStack:
def test_not_in_browse_initially(self):
s = BrowserState()
assert not s.in_browse_mode
def test_enter_server(self):
s = BrowserState()
s.set_servers([_server()])
obj_id = s.enter_server(s.selected_server)
assert obj_id == "0"
assert s.in_browse_mode
assert s.current_level.object_id == "0"
def test_set_items_and_cursor(self):
s = BrowserState()
s.set_servers([_server()])
s.enter_server(s.selected_server)
items = _items(10)
s.set_items(items, "0")
assert len(s.current_items) == 10
assert s.cursor == 0
s.cursor = 3
assert s.cursor == 3
assert s.selected_item().title == "Item 3"
def test_cursor_clamps(self):
s = BrowserState()
s.set_servers([_server()])
s.enter_server(s.selected_server)
s.set_items(_items(5), "0")
s.cursor = 100
assert s.cursor == 4 # clamped to last
s.cursor = -5
assert s.cursor == 0
def test_push_and_pop(self):
s = BrowserState()
s.set_servers([_server()])
s.enter_server(s.selected_server)
s.set_items(_items(3, container=True), "0")
# Drill into child
s.set_items(_items(5), "1")
assert len(s.current_items) == 5
# Go back
assert s.go_back()
assert len(s.current_items) == 3
assert s.current_level.object_id == "0"
# Go back again exits browse
assert s.go_back()
assert not s.in_browse_mode
def test_go_back_at_root(self):
s = BrowserState()
assert not s.go_back()
def test_reset(self):
s = BrowserState()
s.set_servers([_server()])
s.enter_server(s.selected_server)
s.set_items(_items(5), "0")
s.error = "fail"
s.playback_title = "song"
s.playback_paused = True
s.reset()
assert not s.in_browse_mode
assert s.error == ""
assert s.playback_title == ""
assert not s.playback_paused
class TestScrollOffset:
def test_scroll_offset_default(self):
s = BrowserState()
s.set_servers([_server()])
s.enter_server(s.selected_server)
s.set_items(_items(3), "0")
assert s.scroll_offset == 0
def test_scroll_offset_set(self):
s = BrowserState()
s.set_servers([_server()])
s.enter_server(s.selected_server)
s.set_items(_items(30), "0")
s.scroll_offset = 10
assert s.scroll_offset == 10

357
tests/test_player.py

@ -0,0 +1,357 @@
"""Tests for the SDL-rendered GStreamer backend without using a real pipeline."""
import ctypes
from types import SimpleNamespace
import sdl2
from r36s_dlna_browser.player.gstreamer_backend import GStreamerBackend
class FakeMessageType:
ERROR = 1
EOS = 2
BUFFERING = 4
STATE_CHANGED = 8
class FakeFlowReturn:
OK = 0
class FakeState:
NULL = 0
READY = 1
PAUSED = 2
PLAYING = 3
class FakeStateChangeReturn:
FAILURE = -1
SUCCESS = 0
class FakeFormat:
TIME = 1
class FakeSeekFlags:
FLUSH = 1
KEY_UNIT = 2
class FakeCaps:
def __init__(self, width, height):
self._width = width
self._height = height
def get_size(self):
return 1
def get_structure(self, _index):
return FakeStructure(self._width, self._height)
class FakeStructure:
def __init__(self, width, height):
self._width = width
self._height = height
def has_field(self, name):
return name in {"width", "height"}
def get_value(self, name):
return self._width if name == "width" else self._height
class FakeBuffer:
def __init__(self, payload: bytes):
self._payload = payload
def get_size(self):
return len(self._payload)
def extract_dup(self, _offset, _size):
return self._payload
class FakeSample:
def __init__(self, width=1280, height=720, payload: bytes | None = None):
self._caps = FakeCaps(width, height)
self._buffer = FakeBuffer(payload or (b"\x00" * (width * height * 4)))
def get_caps(self):
return self._caps
def get_buffer(self):
return self._buffer
class FakeAppSink:
def __init__(self):
self.props = {}
self.connected = []
self.sample = None
def set_property(self, name, value):
self.props[name] = value
def connect(self, signal_name, callback):
self.connected.append((signal_name, callback))
def emit(self, signal_name):
assert signal_name == "pull-sample"
return self.sample
class FakeBus:
def __init__(self):
self.message = None
def timed_pop_filtered(self, *_args):
msg, self.message = self.message, None
return msg
class FakePipeline:
def __init__(self):
self.props = {}
self.state = FakeState.NULL
self.uri = None
self.position = 12 * 1_000_000_000
self.duration = 93 * 1_000_000_000
self.bus = FakeBus()
self.seek_calls = []
def set_state(self, state):
self.state = state
return FakeStateChangeReturn.SUCCESS
def set_property(self, name, value):
self.props[name] = value
if name == "uri":
self.uri = value
def get_property(self, name):
if name == "volume":
return self.props.get("volume", 1.0)
return self.props.get(name)
def get_bus(self):
return self.bus
def query_position(self, _format):
return True, self.position
def query_duration(self, _format):
return True, self.duration
def seek_simple(self, _format, flags, target):
self.seek_calls.append((flags, target))
self.position = target
return True
class FakeMessage:
def __init__(self, msg_type, src=None, buffering=0, state=None, error_text="boom", structure_name=None):
self.type = msg_type
self.src = src
self._buffering = buffering
self._state = state or (FakeState.NULL, FakeState.PLAYING, FakeState.NULL)
self._error_text = error_text
self._structure_name = structure_name
def parse_buffering(self):
return self._buffering
def parse_state_changed(self):
return self._state
def parse_error(self):
return SimpleNamespace(message=self._error_text), None
def get_structure(self):
if not self._structure_name:
return None
return SimpleNamespace(get_name=lambda: self._structure_name)
class FakeGst:
State = FakeState
StateChangeReturn = FakeStateChangeReturn
Format = FakeFormat
SeekFlags = FakeSeekFlags
MessageType = FakeMessageType
FlowReturn = FakeFlowReturn
SECOND = 1_000_000_000
MSECOND = 1_000_000
Caps = SimpleNamespace(from_string=lambda value: value)
class FakeVideoInfoValue:
def __init__(self, width, height, stride):
self.width = width
self.height = height
self.stride = [stride]
class FakeVideoInfo:
@staticmethod
def new_from_caps(caps):
structure = caps.get_structure(0)
width = structure.get_value("width")
height = structure.get_value("height")
return FakeVideoInfoValue(width, height, width * 4)
class FakeGstVideo:
VideoInfo = FakeVideoInfo
class TestGStreamerBackend:
def _make_backend(self):
pipeline = FakePipeline()
sink = FakeAppSink()
backend = GStreamerBackend(
gst_module=FakeGst,
gst_video_module=FakeGstVideo,
appsink_factory=lambda: sink,
playbin_factory=lambda: pipeline,
subsystem="x11",
)
backend._start_bus_thread = lambda: None
backend._stop_bus_thread = lambda: None
return backend, pipeline, sink
def test_not_playing_initially(self):
backend, _pipeline, _sink = self._make_backend()
assert not backend.is_playing()
def test_play_sets_uri_and_configures_appsink(self):
backend, pipeline, sink = self._make_backend()
backend.play("http://example.com/video.mp4")
assert backend.is_playing()
assert pipeline.uri == "http://example.com/video.mp4"
assert pipeline.props["video-sink"] is sink
assert sink.props["emit-signals"] is True
assert sink.props["caps"] == "video/x-raw,format=BGRA"
def test_new_sample_marks_frame_dirty_and_updates_resolution(self):
events = []
backend, _pipeline, sink = self._make_backend()
backend.set_event_callback(lambda event, *args: events.append((event, *args)))
sink.sample = FakeSample(width=640, height=360)
assert backend._on_new_sample(sink) == FakeFlowReturn.OK
assert backend.has_new_frame() is True
assert ("resolution", "640x360") in events
def test_toggle_pause_switches_pipeline_state(self):
backend, pipeline, _sink = self._make_backend()
backend.play("http://example.com/video.mp4")
assert backend.toggle_pause() is True
assert pipeline.state == FakeState.PAUSED
assert backend.toggle_pause() is False
assert pipeline.state == FakeState.PLAYING
def test_seek_uses_relative_position(self):
backend, pipeline, _sink = self._make_backend()
backend.play("http://example.com/video.mp4")
backend.seek(10)
assert pipeline.seek_calls == [(FakeSeekFlags.FLUSH | FakeSeekFlags.KEY_UNIT, 22 * 1_000_000_000)]
def test_change_volume_clamps_to_supported_range(self):
backend, pipeline, _sink = self._make_backend()
backend.play("http://example.com/video.mp4")
assert backend.change_volume(15) == 115
assert pipeline.props["volume"] == 1.15
assert backend.change_volume(50) == 130
assert pipeline.props["volume"] == 1.3
def test_eos_notifies_stopped(self):
events = []
backend, pipeline, _sink = self._make_backend()
backend.set_event_callback(lambda event, *args: events.append((event, *args)))
backend.play("http://example.com/video.mp4")
backend._handle_bus_message(FakeMessage(FakeMessageType.EOS, src=pipeline))
assert ("stopped",) in events
assert not backend.is_playing()
assert pipeline.state == FakeState.NULL
def test_buffering_and_metrics_feed_hud(self):
events = []
backend, pipeline, _sink = self._make_backend()
backend.set_event_callback(lambda event, *args: events.append((event, *args)))
backend.play("http://example.com/video.mp4")
backend._resolution = "1280x720"
backend._handle_bus_message(FakeMessage(FakeMessageType.BUFFERING, buffering=76))
backend._emit_playback_metrics()
assert ("buffering", 76) in events
assert ("position", 12.0) in events
assert ("duration", 93.0) in events
assert ("volume", 100) in events
assert ("resolution", "1280x720") in events
def test_state_changed_updates_pause_event(self):
events = []
backend, pipeline, _sink = self._make_backend()
backend.set_event_callback(lambda event, *args: events.append((event, *args)))
backend.play("http://example.com/video.mp4")
backend._handle_bus_message(
FakeMessage(
FakeMessageType.STATE_CHANGED,
src=pipeline,
state=(FakeState.PLAYING, FakeState.PAUSED, FakeState.NULL),
)
)
assert ("paused", True) in events
def test_error_message_propagates(self):
events = []
backend, pipeline, _sink = self._make_backend()
backend.set_event_callback(lambda event, *args: events.append((event, *args)))
backend.play("http://example.com/video.mp4")
backend._handle_bus_message(FakeMessage(FakeMessageType.ERROR, src=pipeline, error_text="decoder fail"))
assert ("error", "decoder fail") in events
assert not backend.is_playing()
def test_stop_sets_pipeline_to_null(self):
backend, pipeline, _sink = self._make_backend()
backend.play("http://example.com/video.mp4")
backend.stop()
assert pipeline.state == FakeState.NULL
assert not backend.is_playing()
def test_render_uploads_latest_frame_and_clears_dirty_flag(self, monkeypatch):
backend, _pipeline, _sink = self._make_backend()
backend._latest_frame = SimpleNamespace(width=320, height=180, pitch=1280, pixels=b"\x00" * (320 * 180 * 4))
backend._frame_dirty = True
calls = []
monkeypatch.setattr(sdl2, "SDL_CreateTexture", lambda *_args: object())
monkeypatch.setattr(sdl2, "SDL_UpdateTexture", lambda *_args: 0)
monkeypatch.setattr(sdl2, "SDL_RenderCopy", lambda _renderer, _texture, _src, dst: calls.append((dst.x, dst.y, dst.w, dst.h)))
backend.set_viewport(640, 480, 48, 80, 12, 12)
assert backend.render(object()) is True
assert backend.has_new_frame() is False
assert calls == [(12, 51, 616, 346)]

40
tests/test_runtime.py

@ -0,0 +1,40 @@
"""Tests for runtime environment setup helpers."""
import os
from r36s_dlna_browser.platform import runtime
def test_wayland_does_not_override_video_driver(monkeypatch):
monkeypatch.setenv("WAYLAND_DISPLAY", "wayland-0")
monkeypatch.setenv("DISPLAY", ":0")
monkeypatch.delenv("SDL_VIDEODRIVER", raising=False)
monkeypatch.setattr(runtime, "is_r36s", lambda: False)
runtime.sdl_env_setup()
assert "SDL_VIDEODRIVER" not in os.environ
def test_respects_existing_sdl_videodriver(monkeypatch):
monkeypatch.setenv("WAYLAND_DISPLAY", "wayland-0")
monkeypatch.setenv("DISPLAY", ":0")
monkeypatch.setenv("SDL_VIDEODRIVER", "wayland")
monkeypatch.setattr(runtime, "is_r36s", lambda: False)
runtime.sdl_env_setup()
assert os.environ["SDL_VIDEODRIVER"] == "wayland"
def test_r36s_sets_alsa_audio_driver(monkeypatch):
monkeypatch.delenv("SDL_AUDIODRIVER", raising=False)
monkeypatch.delenv("SDL_VIDEODRIVER", raising=False)
monkeypatch.delenv("DISPLAY", raising=False)
monkeypatch.delenv("WAYLAND_DISPLAY", raising=False)
monkeypatch.setattr(runtime, "is_r36s", lambda: True)
runtime.sdl_env_setup()
assert os.environ["SDL_AUDIODRIVER"] == "alsa"
assert os.environ["SDL_VIDEODRIVER"] == "kmsdrm"

122
tests/test_sdl_app.py

@ -0,0 +1,122 @@
from __future__ import annotations
from queue import Queue
from r36s_dlna_browser.dlna.browser_state import BrowserState
from r36s_dlna_browser.platform.controls import Action
from r36s_dlna_browser.ui import theme
from r36s_dlna_browser.ui import sdl_app as sdl_app_module
from r36s_dlna_browser.ui.sdl_app import SDLApp, Screen
class DummyPlayer:
def __init__(self) -> None:
self._new_frame = False
def attach_window(self, _window) -> None:
pass
def set_viewport(self, *_args) -> None:
pass
def has_new_frame(self) -> bool:
return self._new_frame
def render(self, _renderer) -> bool:
self._new_frame = False
return True
def _make_app() -> SDLApp:
return SDLApp(Queue(), Queue(), BrowserState(), DummyPlayer())
def test_non_playback_draws_only_when_dirty() -> None:
app = _make_app()
app._screen = Screen.SERVERS
app._needs_redraw = False
assert app._should_draw() is False
app._mark_dirty()
assert app._should_draw() is True
def test_playback_snapshot_change_waits_for_refresh_interval(monkeypatch) -> None:
app = _make_app()
app._screen = Screen.PLAYBACK
app._needs_redraw = False
app._last_playback_snapshot = app._playback_snapshot()
app._last_playback_draw_at = 10.0
app._state.playback_position = 1.0
monkeypatch.setattr(sdl_app_module.time, "monotonic", lambda: 10.1)
assert app._should_draw() is False
def test_playback_snapshot_change_redraws_after_refresh_interval(monkeypatch) -> None:
app = _make_app()
app._screen = Screen.PLAYBACK
app._needs_redraw = False
app._last_playback_snapshot = app._playback_snapshot()
app._last_playback_draw_at = 10.0
app._state.playback_position = 1.0
monkeypatch.setattr(sdl_app_module.time, "monotonic", lambda: 10.3)
assert app._should_draw() is True
def test_playback_snapshot_without_changes_does_not_redraw(monkeypatch) -> None:
app = _make_app()
app._screen = Screen.PLAYBACK
app._needs_redraw = False
app._last_playback_snapshot = app._playback_snapshot()
app._last_playback_draw_at = 10.0
monkeypatch.setattr(sdl_app_module.time, "monotonic", lambda: 10.5)
assert app._should_draw() is False
def test_playback_draws_immediately_for_new_video_frame() -> None:
app = _make_app()
app._screen = Screen.PLAYBACK
app._needs_redraw = False
app._player._new_frame = True
assert app._should_draw() is True
def test_hud_mode_cycles_auto_fixed_hidden(monkeypatch) -> None:
app = _make_app()
app._screen = Screen.PLAYBACK
monkeypatch.setattr(sdl_app_module.time, "monotonic", lambda: 12.0)
app._handle_action(Action.HUD_MODE)
assert app._state.playback_hud_mode == theme.PLAYBACK_HUD_PINNED
assert app._state.playback_hud_visible is True
app._handle_action(Action.HUD_MODE)
assert app._state.playback_hud_mode == theme.PLAYBACK_HUD_HIDDEN
assert app._state.playback_hud_visible is False
app._handle_action(Action.HUD_MODE)
assert app._state.playback_hud_mode == theme.PLAYBACK_HUD_AUTO
assert app._state.playback_hud_visible is True
def test_paused_playback_keeps_hud_visible_in_auto_mode(monkeypatch) -> None:
app = _make_app()
app._screen = Screen.PLAYBACK
app._state.playback_hud_mode = theme.PLAYBACK_HUD_AUTO
app._state.playback_hud_visible = False
app._state.playback_paused = True
monkeypatch.setattr(sdl_app_module.time, "monotonic", lambda: 99.0)
app._refresh_playback_hud_visibility()
assert app._state.playback_hud_visible is True
Loading…
Cancel
Save