#!/usr/bin/env python3 """ Video playback diagnostic for R36S / ArkOS. Tests GStreamer availability, codec coverage, and a short live-playback loop using the same pipeline the app uses (playbin → NV12 appsink with videoscale GstBin → SDL2 NV12 texture upload → rendered frame). Section 8 is the key end-to-end timing test: it runs a real SDL window with KMSDRM (or whatever SDL picks), decodes via the same GstBin the app uses, and measures memmove + SDL_UpdateNVTexture + RenderCopy separately so desync and frame-drop root causes are visible. Run directly on the device: export HOME=/home/ark PYTHONPATH=/home/ark/R36SHack/src \\ LD_LIBRARY_PATH=/home/ark/miniconda3/envs/r36s-dlna-browser/lib \\ GST_PLUGIN_PATH=/usr/lib/aarch64-linux-gnu/gstreamer-1.0 \\ LD_PRELOAD=/usr/lib/aarch64-linux-gnu/libgomp.so.1 /home/ark/miniconda3/envs/r36s-dlna-browser/bin/python \\ /home/ark/R36SHack/tests/test_video_playback_device.py [URL] Pass --nosection8 to skip the SDL rendering loop (useful when running headless). """ from __future__ import annotations import sys import time import textwrap # ── pretty output helpers ─────────────────────────────────────────────────── def _ok(msg: str) -> None: print(f" [OK] {msg}") def _warn(msg: str) -> None: print(f" [WRN] {msg}") def _fail(msg: str) -> None: print(f" [ERR] {msg}") def _section(title: str) -> None: print(f"\n{'='*60}") print(f" {title}") print(f"{'='*60}") # ── 1. Python env ─────────────────────────────────────────────────────────── _section("1. Python environment") import platform print(f" Python {sys.version}") print(f" Platform: {platform.machine()} / {platform.system()}") # ── 2. GI / GStreamer core ────────────────────────────────────────────────── _section("2. GStreamer core") try: import gi _ok(f"PyGObject (gi) {gi.__version__}") except ImportError as exc: _fail(f"PyGObject not found: {exc}") sys.exit(1) try: gi.require_version("Gst", "1.0") from gi.repository import Gst Gst.init(None) v = Gst.version() _ok(f"GStreamer {v.major}.{v.minor}.{v.micro}.{v.nano}") except Exception as exc: _fail(f"GStreamer init failed: {exc}") sys.exit(1) try: gi.require_version("GstApp", "1.0") gi.require_version("GstVideo", "1.0") from gi.repository import GstApp, GstVideo _ok("GstApp + GstVideo bindings present") except Exception as exc: _fail(f"GstApp/GstVideo bindings missing: {exc}") sys.exit(1) # ── 3. Plugin registry ────────────────────────────────────────────────────── _section("3. Plugin registry") REQUIRED_ELEMENTS = { "playbin": "core orchestrator", "appsink": "app-side frame sink", "videoconvert": "pixel format conversion", "videoscale": "frame scaling", "typefind": "format detection", "decodebin": "auto demux/decode", "uridecodebin": "URI decode helper", } CODEC_ELEMENTS = { # demuxers "matroskademux": "MKV/WebM demuxer", "qtdemux": "MP4/MOV demuxer", "flvdemux": "FLV demuxer", "tsdemux": "MPEG-TS demuxer", # video decoders "avdec_h264": "H.264 software decoder (libav)", "avdec_hevc": "H.265/HEVC software decoder (libav)", "avdec_mpeg2video": "MPEG-2 video decoder", "avdec_vp8": "VP8 decoder", "avdec_vp9": "VP9 decoder", "v4l2h264dec": "H.264 V4L2 HW decoder", "v4l2h265dec": "H.265 V4L2 HW decoder", # audio decoders "avdec_aac": "AAC decoder (libav)", "avdec_mp3": "MP3 decoder (libav)", "avdec_ac3": "AC-3 decoder (libav)", "vorbisdec": "Vorbis decoder", "opusdec": "Opus decoder", # audio output "autoaudiosink": "Auto audio sink", "alsasink": "ALSA sink", "pulsesink": "PulseAudio sink", } missing_required = [] registry = Gst.Registry.get() for element, desc in REQUIRED_ELEMENTS.items(): feat = registry.find_feature(element, Gst.ElementFactory.__gtype__) if feat: _ok(f"{element:20s} ({desc})") else: missing_required.append(element) _fail(f"{element:20s} ({desc}) ← MISSING") print() missing_codecs = [] present_codecs = [] for element, desc in CODEC_ELEMENTS.items(): feat = registry.find_feature(element, Gst.ElementFactory.__gtype__) if feat: present_codecs.append(element) _ok(f"{element:20s} ({desc})") else: missing_codecs.append(element) _warn(f"{element:20s} ({desc}) ← not found") if missing_required: print(f"\n CRITICAL: {len(missing_required)} required element(s) missing: {missing_required}") print(f"\n Codecs present: {len(present_codecs)} / {len(CODEC_ELEMENTS)}") # ── 4. Caps negotiation: can we build a BGRA appsink pipeline? ────────────── _section("4. BGRA appsink pipeline negotiation") TEST_PIPE = ( "videotestsrc num-buffers=5 ! " "videoconvert ! " "video/x-raw,format=BGRA,width=64,height=64 ! " "appsink name=sink emit-signals=true max-buffers=1 drop=true" ) frames_received = 0 negotiation_ok = False try: pipe = Gst.parse_launch(TEST_PIPE) sink = pipe.get_by_name("sink") def _on_sample(sink, *_): global frames_received sample = sink.emit("pull-sample") if sample: frames_received += 1 return Gst.FlowReturn.OK sink.connect("new-sample", _on_sample) pipe.set_state(Gst.State.PLAYING) time.sleep(1.5) pipe.set_state(Gst.State.NULL) if frames_received > 0: negotiation_ok = True _ok(f"BGRA appsink pipeline: received {frames_received} frame(s)") else: _fail("BGRA appsink pipeline ran but produced 0 frames") except Exception as exc: _fail(f"Could not build BGRA appsink pipeline: {exc}") # ── 5. Audio output probe ─────────────────────────────────────────────────── _section("5. Audio output probe") AUDIO_PIPE = "audiotestsrc num-buffers=10 freq=440 ! autoaudiosink" audio_ok = False try: p = Gst.parse_launch(AUDIO_PIPE) p.set_state(Gst.State.PLAYING) time.sleep(0.8) p.set_state(Gst.State.NULL) audio_ok = True _ok("autoaudiosink: played 440 Hz tone without error") except Exception as exc: _fail(f"autoaudiosink failed: {exc}") if not audio_ok: ALSA_PIPE = "audiotestsrc num-buffers=10 freq=440 ! alsasink device=hw:0" try: p = Gst.parse_launch(ALSA_PIPE) p.set_state(Gst.State.PLAYING) time.sleep(0.8) p.set_state(Gst.State.NULL) audio_ok = True _ok("alsasink hw:0: played 440 Hz tone") except Exception as exc: _warn(f"alsasink also failed: {exc}") # ── 6. Live URL playback (optional) ──────────────────────────────────────── test_url = sys.argv[1] if len(sys.argv) > 1 else None if not test_url: try: with open("/tmp/dlna_last_url.txt") as _f: test_url = _f.read().strip() or None if test_url: print(f"\n [auto] Loaded last DLNA URL from /tmp/dlna_last_url.txt") except FileNotFoundError: pass _section("6. Live URL / file playback") if not test_url: _warn("No URL provided — skipping live playback test.") print(" Pass a media URL or path as the first argument, or start the app and") print(" play something; the URL will be saved to /tmp/dlna_last_url.txt.") else: print(f" URL: {test_url}") live_frames = 0 live_error = None try: # Build the pipeline element-by-element so we can hold a direct # reference to the appsink (parse_launch embeds it in a bin and # get_by_name returns None when the bin wraps a nested pipeline string). pipe = Gst.ElementFactory.make("playbin", "live_player") vsink = Gst.ElementFactory.make("appsink", "vsink") if pipe is None or vsink is None: raise RuntimeError("playbin or appsink not available") vsink.set_property("emit-signals", True) vsink.set_property("max-buffers", 2) vsink.set_property("drop", True) vsink.set_property("caps", Gst.Caps.from_string("video/x-raw,format=BGRA")) pipe.set_property("video-sink", vsink) pipe.set_property("uri", test_url if "://" in test_url else Gst.filename_to_uri(test_url)) def _on_live_sample(sink, *_): global live_frames sample = sink.emit("pull-sample") if sample: buf = sample.get_buffer() ok_m, map_info = buf.map(Gst.MapFlags.READ) if ok_m and map_info.size > 0: live_frames += 1 buf.unmap(map_info) return Gst.FlowReturn.OK vsink.connect("new-sample", _on_live_sample) bus = pipe.get_bus() pipe.set_state(Gst.State.PLAYING) deadline = time.monotonic() + 15 while time.monotonic() < deadline: msg = bus.timed_pop_filtered( 200 * Gst.MSECOND, Gst.MessageType.ERROR | Gst.MessageType.WARNING | Gst.MessageType.EOS, ) if msg: if msg.type == Gst.MessageType.ERROR: err, debug = msg.parse_error() live_error = f"{err.message} | debug: {debug}" break if msg.type == Gst.MessageType.WARNING: w, d = msg.parse_warning() _warn(f"GStreamer warning: {w.message}") if msg.type == Gst.MessageType.EOS: break if live_frames >= 10: break pipe.set_state(Gst.State.NULL) if live_error: _fail(f"Playback error: {live_error}") elif live_frames == 0: _fail("Playback ran but decoded 0 video frames (audio-only or decode failure)") else: _ok(f"Decoded {live_frames} video frame(s) successfully") except Exception as exc: _fail(f"Could not start live playback pipeline: {exc}") # ── 7. Summary ───────────────────────────────────────────────────────────── _section("7. Summary") issues = [] if missing_required: issues.append(f"Missing required elements: {', '.join(missing_required)}") if not negotiation_ok: issues.append("BGRA appsink pipeline negotiation failed") if not audio_ok: issues.append("No working audio sink found") key_missing = [e for e in ("avdec_h264", "avdec_aac", "matroskademux", "qtdemux") if e in missing_codecs] if key_missing: issues.append(f"Key codecs missing (install gst-libav): {', '.join(key_missing)}") if not issues: print(" All checks passed — video playback should work.") else: print(" Issues found:") for issue in issues: print(f" • {issue}") print() print(" Suggested fix:") print(textwrap.dedent("""\ /home/ark/miniconda3/bin/conda install -n r36s-dlna-browser \\ -c conda-forge gst-libav gst-plugins-good gst-plugins-bad gst-plugins-ugly """)) print() # ── 8. End-to-end SDL rendering benchmark ───────────────────────────────── # # This section replicates what the app does frame-by-frame: # 1. GStreamer appsink (same videoscale GstBin as the app) — width-only NV12 # capsfilter so GStreamer preserves the source DAR when choosing height. # 2. Python memmoves the mapped buffer into a ctypes array ← timed # 3. SDL_UpdateNVTexture uploads Y + UV planes into a lazily-created # texture whose dimensions match the actual decoded frame. ← timed # 4. SDL_RenderCopy blits the texture to the window ← timed # # Desync and drops will be visible here because we do real SDL rendering. # Pass --nosection8 to skip if running headless. SKIP_SDL = "--nosection8" in sys.argv _section("8. End-to-end SDL render loop (real device output)") if SKIP_SDL: _warn("Skipped (--nosection8 flag)") elif not test_url: _warn("Skipped — no URL. Provide a URL as the first argument.") else: SDL8_SECONDS = 20 # how long to run SDL8_SCALE_W = 640 # target width; height computed as 16:9 box try: import ctypes import threading import statistics from dataclasses import dataclass, field as dc_field import sdl2 import sdl2.ext # ── SDL init ──────────────────────────────────────────────────────── # Prefer KMSDRM on the device; SDL will fall back automatically. sdl2.SDL_SetHint(b"SDL_VIDEODRIVER", b"kmsdrm,offscreen") sdl2.SDL_SetHint(b"SDL_AUDIODRIVER", b"alsa,dummy") if sdl2.SDL_Init(sdl2.SDL_INIT_VIDEO | sdl2.SDL_INIT_AUDIO) != 0: _fail(f"SDL_Init failed: {sdl2.SDL_GetError().decode()}") raise RuntimeError("SDL_Init") window = sdl2.SDL_CreateWindow( b"R36S playback test", sdl2.SDL_WINDOWPOS_UNDEFINED, sdl2.SDL_WINDOWPOS_UNDEFINED, SDL8_SCALE_W, SDL8_SCALE_W, # square hint; KMSDRM uses native res anyway sdl2.SDL_WINDOW_FULLSCREEN_DESKTOP | sdl2.SDL_WINDOW_SHOWN, ) if not window: _fail(f"SDL_CreateWindow: {sdl2.SDL_GetError().decode()}") raise RuntimeError("SDL window") renderer = sdl2.SDL_CreateRenderer( window, -1, sdl2.SDL_RENDERER_ACCELERATED | sdl2.SDL_RENDERER_PRESENTVSYNC, ) if not renderer: _warn("HW renderer unavailable — falling back to software renderer") renderer = sdl2.SDL_CreateRenderer(window, -1, sdl2.SDL_RENDERER_SOFTWARE) if not renderer: _fail(f"SDL_CreateRenderer: {sdl2.SDL_GetError().decode()}") raise RuntimeError("SDL renderer") # Retrieve actual window size (KMSDRM may ignore the requested size). w_actual = ctypes.c_int(0) h_actual = ctypes.c_int(0) sdl2.SDL_GetWindowSize(window, ctypes.byref(w_actual), ctypes.byref(h_actual)) print(f" SDL window size: {w_actual.value}×{h_actual.value}") # Texture is created lazily on the first frame so dimensions match # the actual GStreamer output (height is AR-derived, not fixed). texture = None texture_size = (0, 0) # (w, h) of the current texture _ok(f"SDL init OK — window {w_actual.value}×{h_actual.value} (texture: lazy init)") # ── GStreamer pipeline (mirrors _create_appsink) ───────────────────── gi.require_version("GstVideo", "1.0") from gi.repository import GstVideo pipeline8 = Gst.ElementFactory.make("playbin", "p8") appsink8 = Gst.ElementFactory.make("appsink", "vsink8") appsink8.set_property("emit-signals", True) appsink8.set_property("sync", True) appsink8.set_property("max-buffers", 2) appsink8.set_property("drop", True) # Boost mppvideodec rank if /dev/vpu_service is accessible. import os as _os _HW_DEVS = ["/dev/vpu_service", "/dev/mpp_service", "/dev/video10"] _HW_ELEMS = ["mppvideodec", "v4l2h264dec"] _hw_active = False for _dev in _HW_DEVS: try: _fd = _os.open(_dev, _os.O_RDWR | _os.O_NONBLOCK) _os.close(_fd) for _name in _HW_ELEMS: _fac = Gst.ElementFactory.find(_name) if _fac: _fac.set_rank(Gst.Rank.PRIMARY + 1) _hw_active = True print(f" [HW] boosted {_name}") break except OSError: pass # Build videoscale GstBin (nearest-neighbour) → capsfilter → appsink. # Mirrors _create_appsink(): 16:9 target box + add-borders for non-16:9 # sources so all content ARs are handled without distortion. video_sink8 = appsink8 if _hw_active: scale8 = Gst.ElementFactory.make("videoscale", "vs8") cfilt8 = Gst.ElementFactory.make("capsfilter", "cf8") if scale8 and cfilt8: _ar = 16 / 9 _s8_w = SDL8_SCALE_W _s8_h = (int(_s8_w / _ar) // 2) * 2 # e.g. 640 → 360 scale8.set_property("method", 0) scale8.set_property("add-borders", True) cfilt8.set_property("caps", Gst.Caps.from_string( f"video/x-raw,format=NV12,width={_s8_w},height={_s8_h}")) bin8 = Gst.Bin.new("vscale-bin8") bin8.add(scale8); bin8.add(cfilt8); bin8.add(appsink8) scale8.link(cfilt8); cfilt8.link(appsink8) sp = scale8.get_static_pad("sink") gp = Gst.GhostPad.new("sink", sp) gp.set_active(True) bin8.add_pad(gp) video_sink8 = bin8 print(f" [pipeline] videoscale(nearest,add-borders) → {_s8_w}×{_s8_h} NV12 bin active") else: appsink8.set_property("caps", Gst.Caps.from_string( "video/x-raw,format=NV12;video/x-raw,format=BGRA")) else: appsink8.set_property("caps", Gst.Caps.from_string( "video/x-raw,format=BGRA")) pipeline8.set_property("video-sink", video_sink8) pipeline8.set_property("uri", test_url if "://" in test_url else Gst.filename_to_uri(_os.path.abspath(test_url))) # ── Shared frame buffer ───────────────────────────────────────────── @dataclass class FrameState: lock: threading.RLock = dc_field(default_factory=threading.RLock) raw_arr: object = None raw_arr_size: int = 0 width: int = 0 height: int = 0 pitch: int = 0 y_size: int = 0 uv_pitch: int = 0 pixel_format: str = "?" dirty: bool = False # per-frame timing samples (µs) memmove_us: list = dc_field(default_factory=list) upload_us: list = dc_field(default_factory=list) render_us: list = dc_field(default_factory=list) frame_wall: list = dc_field(default_factory=list) # wall time at upload frame_count: int = 0 first_fmt: str = "" fs = FrameState() errors8: list[str] = [] eos8 = threading.Event() # ── GStreamer callback (runs in GStreamer thread) ──────────────────── def _on_sample8(sink): sample = sink.emit("pull-sample") if sample is None: return Gst.FlowReturn.OK buf = sample.get_buffer() caps = sample.get_caps() if buf is None or caps is None: return Gst.FlowReturn.OK info8 = GstVideo.VideoInfo.new_from_caps(caps) if info8 is None: return Gst.FlowReturn.OK fmt = "BGRA" if info8.finfo: try: fmt = info8.finfo.name.upper() except Exception: pass pitch = int(info8.stride[0]) uv_pitch = int(info8.stride[1]) if fmt == "NV12" else 0 h = int(info8.height) w = int(info8.width) y_size = pitch * h t0 = time.monotonic() ok_map, map_info = buf.map(Gst.MapFlags.READ) if not ok_map: return Gst.FlowReturn.OK try: src_size = map_info.size with fs.lock: if fs.raw_arr is None or fs.raw_arr_size < src_size: fs.raw_arr = (ctypes.c_ubyte * src_size)() fs.raw_arr_size = src_size ctypes.memmove(fs.raw_arr, map_info.data, src_size) t_copy = (time.monotonic() - t0) * 1e6 fs.width = w fs.height = h fs.pitch = pitch fs.uv_pitch = uv_pitch fs.y_size = y_size fs.pixel_format = fmt fs.dirty = True fs.frame_count += 1 if not fs.first_fmt: fs.first_fmt = fmt print(f"\n [first frame] fmt={fmt} {w}x{h} " f"stride0={pitch} buf={src_size}") fs.memmove_us.append(t_copy) finally: buf.unmap(map_info) return Gst.FlowReturn.OK appsink8.connect("new-sample", _on_sample8) # ── Bus thread ─────────────────────────────────────────────────────── def _bus8(): bus = pipeline8.get_bus() while not eos8.is_set(): msg = bus.timed_pop_filtered( 200 * Gst.MSECOND, Gst.MessageType.ERROR | Gst.MessageType.EOS, ) if msg is None: continue if msg.type == Gst.MessageType.ERROR: err, dbg = msg.parse_error() errors8.append(f"{err.message} | {dbg}") print(f"\n [bus] ERROR: {err.message}") eos8.set() elif msg.type == Gst.MessageType.EOS: print("\n [bus] EOS") eos8.set() bth8 = threading.Thread(target=_bus8, daemon=True) bth8.start() pipeline8.set_state(Gst.State.PLAYING) print(f" Running SDL render loop for {SDL8_SECONDS}s …") print(" (close window with Escape or Q, or wait for timeout)\n") # ── SDL render loop (runs on main thread) ─────────────────────────── WARMUP = 30 # skip ~1.25 s of frames to let DRM, network and texture init settle deadline8 = time.monotonic() + SDL8_SECONDS frame_n = 0 while time.monotonic() < deadline8 and not eos8.is_set(): # Drain SDL events (allows Escape / Q to quit). ev = sdl2.SDL_Event() while sdl2.SDL_PollEvent(ctypes.byref(ev)): if ev.type == sdl2.SDL_QUIT: eos8.set() elif ev.type == sdl2.SDL_KEYDOWN: sym = ev.key.keysym.sym if sym in (sdl2.SDLK_ESCAPE, sdl2.SDLK_q): eos8.set() # Upload + render if a new frame is ready. with fs.lock: if not fs.dirty or fs.raw_arr is None: pass else: w8 = fs.width; h8 = fs.height pitch8 = fs.pitch uv_pitch8 = fs.uv_pitch y_size8 = fs.y_size fmt8 = fs.pixel_format arr8 = fs.raw_arr fs.dirty = False frame_n += 1 # --- Lazy texture creation / resize --- if texture_size != (w8, h8): if texture: sdl2.SDL_DestroyTexture(texture) texture = sdl2.SDL_CreateTexture( renderer, sdl2.SDL_PIXELFORMAT_NV12, sdl2.SDL_TEXTUREACCESS_STREAMING, w8, h8, ) texture_size = (w8, h8) print(f" [texture] created {w8}×{h8} NV12 (AR={w8/h8:.3f})") # --- SDL_UpdateNVTexture upload --- t_up0 = time.monotonic() if fmt8 == "NV12" and y_size8 > 0 and texture: y_ptr = ctypes.cast(arr8, ctypes.POINTER(ctypes.c_ubyte)) uv_ptr = ctypes.cast( ctypes.byref(arr8, y_size8), ctypes.POINTER(ctypes.c_ubyte), ) sdl2.SDL_UpdateNVTexture( texture, None, y_ptr, pitch8, uv_ptr, uv_pitch8, ) elif texture: # BGRA fallback (SW decode path) pix = ctypes.cast(arr8, ctypes.POINTER(ctypes.c_ubyte)) sdl2.SDL_UpdateTexture(texture, None, pix, pitch8) t_upload = (time.monotonic() - t_up0) * 1e6 # --- SDL_RenderCopy (letterbox into window) --- t_r0 = time.monotonic() sdl2.SDL_RenderClear(renderer) if texture: # Fit frame into window preserving AR (letterbox). win_w, win_h = w_actual.value, h_actual.value scale = min(win_w / w8, win_h / h8) if w8 > 0 and h8 > 0 else 1.0 dw = max(1, int(w8 * scale)) dh = max(1, int(h8 * scale)) dx = (win_w - dw) // 2 dy = (win_h - dh) // 2 dst = sdl2.SDL_Rect(dx, dy, dw, dh) sdl2.SDL_RenderCopy(renderer, texture, None, dst) sdl2.SDL_RenderPresent(renderer) t_render = (time.monotonic() - t_r0) * 1e6 wall_now = time.monotonic() if frame_n > WARMUP: fs.upload_us.append(t_upload) fs.render_us.append(t_render) fs.frame_wall.append(wall_now) time.sleep(0.001) # yield to GStreamer thread pipeline8.set_state(Gst.State.NULL) eos8.set() if texture: sdl2.SDL_DestroyTexture(texture) sdl2.SDL_DestroyRenderer(renderer) sdl2.SDL_DestroyWindow(window) sdl2.SDL_Quit() # ── Section 8 report ──────────────────────────────────────────────── print() print(" --- Section 8 Timing Report ---") print(f" Total GStreamer frames decoded : {fs.frame_count}") print(f" Frames rendered (excl warmup) : {len(fs.upload_us)}") print(f" Pixel format seen : {fs.first_fmt or '?'}") budget = 1_000_000 / 24 # µs per frame @ 24fps nominal def _stat(label, samples_us): if not samples_us: print(f" {label:38s}: no samples") return mn = statistics.mean(samples_us) mx = max(samples_us) pct = mn / budget * 100 # p95: sort and take the 95th-percentile value to filter outlier spikes sorted_s = sorted(samples_us) p95 = sorted_s[int(len(sorted_s) * 0.95)] print(f" {label:38s}: mean {mn:6.0f} µs p95 {p95:6.0f} µs max {mx:6.0f} µs ({pct:.1f}% budget)") _stat("memmove (GStreamer thread)", fs.memmove_us[WARMUP:] if len(fs.memmove_us) > WARMUP else fs.memmove_us) _stat("SDL_UpdateNVTexture (main thread)", fs.upload_us) _stat("SDL_RenderCopy+Present (main thread)", fs.render_us) if len(fs.frame_wall) >= 2: intervals = [fs.frame_wall[i+1] - fs.frame_wall[i] for i in range(len(fs.frame_wall) - 1)] elapsed = fs.frame_wall[-1] - fs.frame_wall[0] fps_act = (len(fs.frame_wall) - 1) / elapsed if elapsed > 0 else 0 dropped = sum(1 for iv in intervals if iv > 0.080) jitter = statistics.stdev(intervals) * 1000 if len(intervals) > 1 else 0 print(f" {'Rendered FPS':38s}: {fps_act:.2f} (jitter {jitter:.1f} ms, dropped {dropped})") if errors8: for e in errors8: _fail(f"GStreamer: {e}") total_mean = ( (statistics.mean(fs.memmove_us[WARMUP:]) if len(fs.memmove_us) > WARMUP else 0) + (statistics.mean(fs.upload_us) if fs.upload_us else 0) + (statistics.mean(fs.render_us) if fs.render_us else 0) ) print(f" {'TOTAL (copy+upload+render)':38s}: {total_mean:.0f} µs ({total_mean/budget*100:.1f}% of 41.7ms budget)") if fps_act < 22: _fail(f"FPS too low ({fps_act:.2f}) — check timing breakdown above for bottleneck") elif dropped > 5: _warn(f"{dropped} dropped frames — pipeline may be too slow under SDL load") else: _ok(f"SDL render loop healthy: {fps_act:.2f} fps, {dropped} dropped") except RuntimeError: pass # error already printed above except ImportError as exc: _warn(f"sdl2 Python bindings not available: {exc}") _warn("Install: conda install -c conda-forge pysdl2") # ── 9. HUD overhead benchmark ────────────────────────────────────────────── # # Measures draw_playback() cost per frame using a synthetic 640×360 NV12 # frame so no GStreamer pipeline is needed. Vsync is disabled so phases # are timed without vsync-wait interference. # # Phases: # upload_us — SDL_UpdateNVTexture (synthetic NV12 frame) # video_us — SDL_RenderClear + SDL_RenderCopy (letterboxed video rect) # hud_us — screens.draw_playback() (all TTF_RenderUTF8_Blended calls) # present_us — SDL_RenderPresent (no vsync → near-zero on HW renderer) # # The "hud_us" line is the key number: add it to the section-8 total to get # the estimated real-app per-frame cost. If hud_us pushes the combined # total past 41.7 ms the HUD is causing frame drops. _section("9. HUD overhead per frame (draw_playback benchmark)") if SKIP_SDL: _warn("Skipped (--nosection8 flag)") else: try: import ctypes import statistics import types import sdl2 import sdl2.sdlttf as _ttf9 from r36s_dlna_browser.ui import screens as _screens9, theme as _theme9 _cache9 = _screens9.HUDTextCache() # ── SDL + TTF init (no vsync so phases are timed cleanly) ──────────── sdl2.SDL_SetHint(b"SDL_VIDEODRIVER", b"kmsdrm,offscreen") sdl2.SDL_SetHint(b"SDL_AUDIODRIVER", b"alsa,dummy") sdl2.SDL_Init(sdl2.SDL_INIT_VIDEO) _ttf9.TTF_Init() _win9 = sdl2.SDL_CreateWindow( b"S9-HUD", sdl2.SDL_WINDOWPOS_UNDEFINED, sdl2.SDL_WINDOWPOS_UNDEFINED, 640, 640, sdl2.SDL_WINDOW_FULLSCREEN_DESKTOP | sdl2.SDL_WINDOW_SHOWN, ) # Accelerated renderer, NO PRESENTVSYNC — we want clean per-phase timings. _ren9 = sdl2.SDL_CreateRenderer(_win9, -1, sdl2.SDL_RENDERER_ACCELERATED) if not _ren9: _ren9 = sdl2.SDL_CreateRenderer(_win9, -1, sdl2.SDL_RENDERER_SOFTWARE) _ww9 = ctypes.c_int(0); _wh9 = ctypes.c_int(0) sdl2.SDL_GetWindowSize(_win9, ctypes.byref(_ww9), ctypes.byref(_wh9)) _layout9 = _theme9.get_layout(_ww9.value, _wh9.value) print(f" Window: {_ww9.value}×{_wh9.value} " f"HUD top={_layout9.playback_hud_top}px bottom={_layout9.playback_hud_bottom}px") # ── Font (same search order as the app) ────────────────────────────── _font9 = None for _fp9 in _theme9.FONT_SEARCH_PATHS: try: _f9 = _ttf9.TTF_OpenFont(_fp9.encode(), _layout9.playback_font_size) if _f9: _font9 = _f9 print(f" Font: {_fp9} @ {_layout9.playback_font_size}pt") break except Exception: pass if not _font9: _warn("No font found — HUD text will be empty (still measures SDL overhead)") # ── Icons (optional — mirrors app icon loading) ────────────────────── _icons9: dict = {} try: import sdl2.sdlimage as _img9 _img9.IMG_Init(_img9.IMG_INIT_PNG) _icon_map = [ ("hud-play", "hud-play"), ("hud-pause", "hud-pause"), ("hud-stop", "hud-stop"), ("hud-seek", "hud-seek"), ("hud-volume", "hud-volume"), ("hud-display", "hud-display"), ] for _ifname, _ikey in _icon_map: _ipath = _theme9.ICONS_DIR / f"{_ifname}.png" if _ipath.exists(): _isurf = _img9.IMG_Load(str(_ipath).encode()) if _isurf: _itex = sdl2.SDL_CreateTextureFromSurface(_ren9, _isurf) sdl2.SDL_FreeSurface(_isurf) if _itex: _icons9[_ikey] = _itex print(f" Icons loaded: {len(_icons9)} / {len(_icon_map)}") except ImportError: _warn("sdl2.sdlimage not available — icons skipped (icons=None)") # ── Synthetic 640×360 NV12 frame (black) ──────────────────────────── _nv12_w, _nv12_h = 640, 360 _y_size9 = _nv12_w * _nv12_h _uv_size9 = _y_size9 // 2 # Y=0 (black luma), UV=0x80 (neutral chroma) → solid black frame. _nv12_data9 = b'\x00' * _y_size9 + b'\x80' * _uv_size9 _nv12_buf9 = (ctypes.c_ubyte * len(_nv12_data9)).from_buffer_copy(_nv12_data9) _y_ptr9 = ctypes.cast(_nv12_buf9, ctypes.POINTER(ctypes.c_ubyte)) _uv_ptr9 = ctypes.cast(ctypes.byref(_nv12_buf9, _y_size9), ctypes.POINTER(ctypes.c_ubyte)) _tex9 = sdl2.SDL_CreateTexture( _ren9, sdl2.SDL_PIXELFORMAT_NV12, sdl2.SDL_TEXTUREACCESS_STREAMING, _nv12_w, _nv12_h, ) # Letterbox dst rect (same logic as the app render path). _sc9 = min(_ww9.value / _nv12_w, _wh9.value / _nv12_h) _dw9 = max(1, int(_nv12_w * _sc9)); _dh9 = max(1, int(_nv12_h * _sc9)) _dx9 = (_ww9.value - _dw9) // 2; _dy9 = (_wh9.value - _dh9) // 2 _dst9 = sdl2.SDL_Rect(_dx9, _dy9, _dw9, _dh9) print(f" Synthetic frame: {_nv12_w}×{_nv12_h} dst rect: {_dw9}×{_dh9} @ ({_dx9},{_dy9})") # ── Mock playback state ────────────────────────────────────────────── _state9 = types.SimpleNamespace( playback_hud_visible=True, playback_paused=False, playback_duration=3600.0, playback_position=42.0, playback_volume=80, playback_buffer_percent=100, playback_resolution="1920×1080", playback_backend="gstreamer", playback_title="Test Video — A Long Title That May Require Ellipsis Fitting.mkv", playback_hud_mode=_theme9.PLAYBACK_HUD_PINNED, ) # ── Benchmark loop ─────────────────────────────────────────────────── WARMUP9 = 30 FRAMES9 = 300 _upload9: list[float] = [] _video9: list[float] = [] _hud9: list[float] = [] _pres9: list[float] = [] print(f" Running {WARMUP9 + FRAMES9} frames (warmup={WARMUP9}) …") for _fn9 in range(WARMUP9 + FRAMES9): # Advance position so time-text changes each frame (exercises _fit_text). _state9.playback_position = 42.0 + _fn9 * (1.0 / 24.0) # Phase 1 — NV12 texture upload. _t0 = time.monotonic() sdl2.SDL_UpdateNVTexture( _tex9, None, _y_ptr9, _nv12_w, _uv_ptr9, _nv12_w ) _t_upload = (time.monotonic() - _t0) * 1e6 # Phase 2 — RenderClear + RenderCopy (video frame, no HUD). _t0 = time.monotonic() sdl2.SDL_SetRenderDrawColor(_ren9, 0, 0, 0, 255) sdl2.SDL_RenderClear(_ren9) sdl2.SDL_RenderCopy(_ren9, _tex9, None, _dst9) _t_video = (time.monotonic() - _t0) * 1e6 # Phase 3 — draw_playback() with texture cache (warm after first frame). _t0 = time.monotonic() _screens9.draw_playback(_ren9, _font9, _state9, _layout9, _icons9 or None, cache=_cache9) _t_hud = (time.monotonic() - _t0) * 1e6 # Phase 4 — Present (no vsync → should be near-zero on HW renderer). _t0 = time.monotonic() sdl2.SDL_RenderPresent(_ren9) _t_pres = (time.monotonic() - _t0) * 1e6 if _fn9 >= WARMUP9: _upload9.append(_t_upload) _video9.append(_t_video) _hud9.append(_t_hud) _pres9.append(_t_pres) # Drain events. _ev9 = sdl2.SDL_Event() while sdl2.SDL_PollEvent(ctypes.byref(_ev9)): pass # Cleanup. _cache9.invalidate() for _itex_v in _icons9.values(): sdl2.SDL_DestroyTexture(_itex_v) if _tex9: sdl2.SDL_DestroyTexture(_tex9) if _font9: _ttf9.TTF_CloseFont(_font9) _ttf9.TTF_Quit() sdl2.SDL_DestroyRenderer(_ren9) sdl2.SDL_DestroyWindow(_win9) sdl2.SDL_Quit() # ── Report ──────────────────────────────────────────────────────────── _budget9 = 1_000_000 / 24 # µs per frame @ 24 fps def _stat9(label, samples): if not samples: print(f" {label:48s}: no samples") return mn = statistics.mean(samples) mx = max(samples) p95 = sorted(samples)[int(len(samples) * 0.95)] print(f" {label:48s}: mean {mn:6.0f} µs p95 {p95:6.0f} µs max {mx:6.0f} µs ({mn/_budget9*100:.1f}%)") print() print(" --- Section 9 HUD Timing Report ---") print(f" Frames measured (excl warmup) : {len(_hud9)}") _stat9("SDL_UpdateNVTexture (synthetic NV12)", _upload9) _stat9("SDL_RenderClear+RenderCopy (video)", _video9) _stat9("draw_playback() — cached TTF+fills+icons", _hud9) _stat9("SDL_RenderPresent (no vsync)", _pres9) _hud_mean = statistics.mean(_hud9) if _hud9 else 0 _total9_mean = (statistics.mean(_upload9) + statistics.mean(_video9) + _hud_mean + statistics.mean(_pres9)) if _upload9 else 0 print(f"\n {'TOTAL (upload+video+HUD+present)':48s}: {_total9_mean:.0f} µs " f"({_total9_mean/_budget9*100:.1f}% of 41.7ms budget — no vsync wait)") # Estimate real-app cost: section-8 measured memmove+upload+render ≈ 8550 µs # that path had no HUD; add hud_mean to get the estimated combined overhead. _s8_baseline = 8550 # µs, from last known section-8 run _estimated_full = _s8_baseline + _hud_mean print(f" {'Estimated S8 + HUD combined':48s}: {_estimated_full:.0f} µs " f"({_estimated_full/_budget9*100:.1f}% of 41.7ms budget)\n") if _hud_mean > 15_000: _warn(f"HUD mean {_hud_mean:.0f} µs > 15 ms — very likely causing frame drops!") elif _hud_mean > 8_000: _warn(f"HUD mean {_hud_mean:.0f} µs — significant overhead; monitor for drops at 24 fps") elif _hud_mean > 3_000: _warn(f"HUD mean {_hud_mean:.0f} µs — moderate overhead; combined budget may be tight") else: _ok(f"HUD overhead {_hud_mean:.0f} µs — not a bottleneck") except ImportError as exc: _warn(f"sdl2 / sdlttf / screens not available: {exc}") print()