#!/usr/bin/env python3 """ Video playback diagnostic for R36S / ArkOS. Tests GStreamer availability, codec coverage, and a short live-playback loop using the same pipeline the app uses (playbin → NV12 appsink with videoscale GstBin → SDL2 NV12 texture upload → rendered frame). Section 8 is the key end-to-end timing test: it runs a real SDL window with KMSDRM (or whatever SDL picks), decodes via the same GstBin the app uses, and measures memmove + SDL_UpdateNVTexture + RenderCopy separately so desync and frame-drop root causes are visible. Run directly on the device: export HOME=/home/ark PYTHONPATH=/home/ark/R36SHack/src \\ LD_LIBRARY_PATH=/home/ark/miniconda3/envs/r36s-dlna-browser/lib \\ GST_PLUGIN_PATH=/usr/lib/aarch64-linux-gnu/gstreamer-1.0 \\ LD_PRELOAD=/usr/lib/aarch64-linux-gnu/libgomp.so.1 /home/ark/miniconda3/envs/r36s-dlna-browser/bin/python \\ /home/ark/R36SHack/tests/test_video_playback_device.py [URL] Pass --nosection8 to skip the SDL rendering loop (useful when running headless). """ from __future__ import annotations import sys import time import textwrap # ── pretty output helpers ─────────────────────────────────────────────────── def _ok(msg: str) -> None: print(f" [OK] {msg}") def _warn(msg: str) -> None: print(f" [WRN] {msg}") def _fail(msg: str) -> None: print(f" [ERR] {msg}") def _section(title: str) -> None: print(f"\n{'='*60}") print(f" {title}") print(f"{'='*60}") # ── 1. Python env ─────────────────────────────────────────────────────────── _section("1. Python environment") import platform print(f" Python {sys.version}") print(f" Platform: {platform.machine()} / {platform.system()}") # ── 2. GI / GStreamer core ────────────────────────────────────────────────── _section("2. GStreamer core") try: import gi _ok(f"PyGObject (gi) {gi.__version__}") except ImportError as exc: _fail(f"PyGObject not found: {exc}") sys.exit(1) try: gi.require_version("Gst", "1.0") from gi.repository import Gst Gst.init(None) v = Gst.version() _ok(f"GStreamer {v.major}.{v.minor}.{v.micro}.{v.nano}") except Exception as exc: _fail(f"GStreamer init failed: {exc}") sys.exit(1) try: gi.require_version("GstApp", "1.0") gi.require_version("GstVideo", "1.0") from gi.repository import GstApp, GstVideo _ok("GstApp + GstVideo bindings present") except Exception as exc: _fail(f"GstApp/GstVideo bindings missing: {exc}") sys.exit(1) # ── 3. Plugin registry ────────────────────────────────────────────────────── _section("3. Plugin registry") REQUIRED_ELEMENTS = { "playbin": "core orchestrator", "appsink": "app-side frame sink", "videoconvert": "pixel format conversion", "videoscale": "frame scaling", "typefind": "format detection", "decodebin": "auto demux/decode", "uridecodebin": "URI decode helper", } CODEC_ELEMENTS = { # demuxers "matroskademux": "MKV/WebM demuxer", "qtdemux": "MP4/MOV demuxer", "flvdemux": "FLV demuxer", "tsdemux": "MPEG-TS demuxer", # video decoders "avdec_h264": "H.264 software decoder (libav)", "avdec_hevc": "H.265/HEVC software decoder (libav)", "avdec_mpeg2video": "MPEG-2 video decoder", "avdec_vp8": "VP8 decoder", "avdec_vp9": "VP9 decoder", "v4l2h264dec": "H.264 V4L2 HW decoder", "v4l2h265dec": "H.265 V4L2 HW decoder", # audio decoders "avdec_aac": "AAC decoder (libav)", "avdec_mp3": "MP3 decoder (libav)", "avdec_ac3": "AC-3 decoder (libav)", "vorbisdec": "Vorbis decoder", "opusdec": "Opus decoder", # audio output "autoaudiosink": "Auto audio sink", "alsasink": "ALSA sink", "pulsesink": "PulseAudio sink", } missing_required = [] registry = Gst.Registry.get() for element, desc in REQUIRED_ELEMENTS.items(): feat = registry.find_feature(element, Gst.ElementFactory.__gtype__) if feat: _ok(f"{element:20s} ({desc})") else: missing_required.append(element) _fail(f"{element:20s} ({desc}) ← MISSING") print() missing_codecs = [] present_codecs = [] for element, desc in CODEC_ELEMENTS.items(): feat = registry.find_feature(element, Gst.ElementFactory.__gtype__) if feat: present_codecs.append(element) _ok(f"{element:20s} ({desc})") else: missing_codecs.append(element) _warn(f"{element:20s} ({desc}) ← not found") if missing_required: print(f"\n CRITICAL: {len(missing_required)} required element(s) missing: {missing_required}") print(f"\n Codecs present: {len(present_codecs)} / {len(CODEC_ELEMENTS)}") # ── 4. Caps negotiation: can we build a BGRA appsink pipeline? ────────────── _section("4. BGRA appsink pipeline negotiation") TEST_PIPE = ( "videotestsrc num-buffers=5 ! " "videoconvert ! " "video/x-raw,format=BGRA,width=64,height=64 ! " "appsink name=sink emit-signals=true max-buffers=1 drop=true" ) frames_received = 0 negotiation_ok = False try: pipe = Gst.parse_launch(TEST_PIPE) sink = pipe.get_by_name("sink") def _on_sample(sink, *_): global frames_received sample = sink.emit("pull-sample") if sample: frames_received += 1 return Gst.FlowReturn.OK sink.connect("new-sample", _on_sample) pipe.set_state(Gst.State.PLAYING) time.sleep(1.5) pipe.set_state(Gst.State.NULL) if frames_received > 0: negotiation_ok = True _ok(f"BGRA appsink pipeline: received {frames_received} frame(s)") else: _fail("BGRA appsink pipeline ran but produced 0 frames") except Exception as exc: _fail(f"Could not build BGRA appsink pipeline: {exc}") # ── 5. Audio output probe ─────────────────────────────────────────────────── _section("5. Audio output probe") AUDIO_PIPE = "audiotestsrc num-buffers=10 freq=440 ! autoaudiosink" audio_ok = False try: p = Gst.parse_launch(AUDIO_PIPE) p.set_state(Gst.State.PLAYING) time.sleep(0.8) p.set_state(Gst.State.NULL) audio_ok = True _ok("autoaudiosink: played 440 Hz tone without error") except Exception as exc: _fail(f"autoaudiosink failed: {exc}") if not audio_ok: ALSA_PIPE = "audiotestsrc num-buffers=10 freq=440 ! alsasink device=hw:0" try: p = Gst.parse_launch(ALSA_PIPE) p.set_state(Gst.State.PLAYING) time.sleep(0.8) p.set_state(Gst.State.NULL) audio_ok = True _ok("alsasink hw:0: played 440 Hz tone") except Exception as exc: _warn(f"alsasink also failed: {exc}") # ── 6. Live URL playback (optional) ──────────────────────────────────────── test_url = sys.argv[1] if len(sys.argv) > 1 else None if not test_url: try: with open("/tmp/dlna_last_url.txt") as _f: test_url = _f.read().strip() or None if test_url: print(f"\n [auto] Loaded last DLNA URL from /tmp/dlna_last_url.txt") except FileNotFoundError: pass _section("6. Live URL / file playback") if not test_url: _warn("No URL provided — skipping live playback test.") print(" Pass a media URL or path as the first argument, or start the app and") print(" play something; the URL will be saved to /tmp/dlna_last_url.txt.") else: print(f" URL: {test_url}") live_frames = 0 live_error = None try: # Build the pipeline element-by-element so we can hold a direct # reference to the appsink (parse_launch embeds it in a bin and # get_by_name returns None when the bin wraps a nested pipeline string). pipe = Gst.ElementFactory.make("playbin", "live_player") vsink = Gst.ElementFactory.make("appsink", "vsink") if pipe is None or vsink is None: raise RuntimeError("playbin or appsink not available") vsink.set_property("emit-signals", True) vsink.set_property("max-buffers", 2) vsink.set_property("drop", True) vsink.set_property("caps", Gst.Caps.from_string("video/x-raw,format=BGRA")) pipe.set_property("video-sink", vsink) pipe.set_property("uri", test_url if "://" in test_url else Gst.filename_to_uri(test_url)) def _on_live_sample(sink, *_): global live_frames sample = sink.emit("pull-sample") if sample: buf = sample.get_buffer() info = buf.map(Gst.MapFlags.READ) if info.size > 0: live_frames += 1 buf.unmap(info) return Gst.FlowReturn.OK vsink.connect("new-sample", _on_live_sample) bus = pipe.get_bus() pipe.set_state(Gst.State.PLAYING) deadline = time.monotonic() + 15 while time.monotonic() < deadline: msg = bus.timed_pop_filtered( 200 * Gst.MSECOND, Gst.MessageType.ERROR | Gst.MessageType.WARNING | Gst.MessageType.EOS, ) if msg: if msg.type == Gst.MessageType.ERROR: err, debug = msg.parse_error() live_error = f"{err.message} | debug: {debug}" break if msg.type == Gst.MessageType.WARNING: w, d = msg.parse_warning() _warn(f"GStreamer warning: {w.message}") if msg.type == Gst.MessageType.EOS: break if live_frames >= 10: break pipe.set_state(Gst.State.NULL) if live_error: _fail(f"Playback error: {live_error}") elif live_frames == 0: _fail("Playback ran but decoded 0 video frames (audio-only or decode failure)") else: _ok(f"Decoded {live_frames} video frame(s) successfully") except Exception as exc: _fail(f"Could not start live playback pipeline: {exc}") # ── 7. Summary ───────────────────────────────────────────────────────────── _section("7. Summary") issues = [] if missing_required: issues.append(f"Missing required elements: {', '.join(missing_required)}") if not negotiation_ok: issues.append("BGRA appsink pipeline negotiation failed") if not audio_ok: issues.append("No working audio sink found") key_missing = [e for e in ("avdec_h264", "avdec_aac", "matroskademux", "qtdemux") if e in missing_codecs] if key_missing: issues.append(f"Key codecs missing (install gst-libav): {', '.join(key_missing)}") if not issues: print(" All checks passed — video playback should work.") else: print(" Issues found:") for issue in issues: print(f" • {issue}") print() print(" Suggested fix:") print(textwrap.dedent("""\ /home/ark/miniconda3/bin/conda install -n r36s-dlna-browser \\ -c conda-forge gst-libav gst-plugins-good gst-plugins-bad gst-plugins-ugly """)) print() # ── 8. End-to-end SDL rendering benchmark ───────────────────────────────── # # This section replicates what the app does frame-by-frame: # 1. GStreamer appsink (same videoscale GstBin as the app) delivers NV12 at 640×480 # 2. Python memmoves the mapped buffer into a ctypes array ← timed # 3. SDL_UpdateNVTexture uploads Y + UV planes ← timed # 4. SDL_RenderCopy blits the texture to the window ← timed # # Desync and drops will be visible here because we do real SDL rendering. # Pass --nosection8 to skip if running headless. SKIP_SDL = "--nosection8" in sys.argv _section("8. End-to-end SDL render loop (real device output)") if SKIP_SDL: _warn("Skipped (--nosection8 flag)") elif not test_url: _warn("Skipped — no URL. Provide a URL as the first argument.") else: SDL8_SECONDS = 20 # how long to run SDL8_SCALE_W = 640 SDL8_SCALE_H = 480 try: import ctypes import threading import statistics from dataclasses import dataclass, field as dc_field import sdl2 import sdl2.ext # ── SDL init ──────────────────────────────────────────────────────── # Prefer KMSDRM on the device; SDL will fall back automatically. sdl2.SDL_SetHint(b"SDL_VIDEODRIVER", b"kmsdrm,offscreen") sdl2.SDL_SetHint(b"SDL_AUDIODRIVER", b"alsa,dummy") if sdl2.SDL_Init(sdl2.SDL_INIT_VIDEO | sdl2.SDL_INIT_AUDIO) != 0: _fail(f"SDL_Init failed: {sdl2.SDL_GetError().decode()}") raise RuntimeError("SDL_Init") window = sdl2.SDL_CreateWindow( b"R36S playback test", sdl2.SDL_WINDOWPOS_UNDEFINED, sdl2.SDL_WINDOWPOS_UNDEFINED, SDL8_SCALE_W, SDL8_SCALE_H, sdl2.SDL_WINDOW_FULLSCREEN_DESKTOP | sdl2.SDL_WINDOW_SHOWN, ) if not window: _fail(f"SDL_CreateWindow: {sdl2.SDL_GetError().decode()}") raise RuntimeError("SDL window") renderer = sdl2.SDL_CreateRenderer( window, -1, sdl2.SDL_RENDERER_ACCELERATED | sdl2.SDL_RENDERER_PRESENTVSYNC, ) if not renderer: _warn("HW renderer unavailable — falling back to software renderer") renderer = sdl2.SDL_CreateRenderer(window, -1, sdl2.SDL_RENDERER_SOFTWARE) if not renderer: _fail(f"SDL_CreateRenderer: {sdl2.SDL_GetError().decode()}") raise RuntimeError("SDL renderer") # Retrieve actual window size (KMSDRM may ignore the requested size). w_actual = ctypes.c_int(0) h_actual = ctypes.c_int(0) sdl2.SDL_GetWindowSize(window, ctypes.byref(w_actual), ctypes.byref(h_actual)) print(f" SDL window size: {w_actual.value}×{h_actual.value}") texture = sdl2.SDL_CreateTexture( renderer, sdl2.SDL_PIXELFORMAT_NV12, sdl2.SDL_TEXTUREACCESS_STREAMING, SDL8_SCALE_W, SDL8_SCALE_H, ) if not texture: _fail(f"SDL_CreateTexture NV12: {sdl2.SDL_GetError().decode()}") raise RuntimeError("SDL texture") _ok(f"SDL init OK — window {w_actual.value}×{h_actual.value}, NV12 texture {SDL8_SCALE_W}×{SDL8_SCALE_H}") # ── GStreamer pipeline (mirrors _create_appsink) ───────────────────── gi.require_version("GstVideo", "1.0") from gi.repository import GstVideo pipeline8 = Gst.ElementFactory.make("playbin", "p8") appsink8 = Gst.ElementFactory.make("appsink", "vsink8") appsink8.set_property("emit-signals", True) appsink8.set_property("sync", True) appsink8.set_property("max-buffers", 2) appsink8.set_property("drop", True) # Boost mppvideodec rank if /dev/vpu_service is accessible. import os as _os _HW_DEVS = ["/dev/vpu_service", "/dev/mpp_service", "/dev/video10"] _HW_ELEMS = ["mppvideodec", "v4l2h264dec"] _hw_active = False for _dev in _HW_DEVS: try: _fd = _os.open(_dev, _os.O_RDWR | _os.O_NONBLOCK) _os.close(_fd) for _name in _HW_ELEMS: _fac = Gst.ElementFactory.find(_name) if _fac: _fac.set_rank(Gst.Rank.PRIMARY + 1) _hw_active = True print(f" [HW] boosted {_name}") break except OSError: pass # Build videoscale GstBin (nearest-neighbour) → capsfilter → appsink. video_sink8 = appsink8 if _hw_active: scale8 = Gst.ElementFactory.make("videoscale", "vs8") cfilt8 = Gst.ElementFactory.make("capsfilter", "cf8") if scale8 and cfilt8: scale8.set_property("method", 0) # nearest-neighbour cfilt8.set_property("caps", Gst.Caps.from_string( f"video/x-raw,format=NV12,width={SDL8_SCALE_W},height={SDL8_SCALE_H}")) bin8 = Gst.Bin.new("vscale-bin8") bin8.add(scale8); bin8.add(cfilt8); bin8.add(appsink8) scale8.link(cfilt8); cfilt8.link(appsink8) sp = scale8.get_static_pad("sink") gp = Gst.GhostPad.new("sink", sp) gp.set_active(True) bin8.add_pad(gp) video_sink8 = bin8 print(f" [pipeline] videoscale(nearest)→{SDL8_SCALE_W}×{SDL8_SCALE_H} NV12 bin active") else: appsink8.set_property("caps", Gst.Caps.from_string( "video/x-raw,format=NV12;video/x-raw,format=BGRA")) else: appsink8.set_property("caps", Gst.Caps.from_string( "video/x-raw,format=BGRA")) pipeline8.set_property("video-sink", video_sink8) pipeline8.set_property("uri", test_url if "://" in test_url else Gst.filename_to_uri(_os.path.abspath(test_url))) # ── Shared frame buffer ───────────────────────────────────────────── @dataclass class FrameState: lock: threading.RLock = dc_field(default_factory=threading.RLock) raw_arr: object = None raw_arr_size: int = 0 width: int = 0 height: int = 0 pitch: int = 0 y_size: int = 0 uv_pitch: int = 0 pixel_format: str = "?" dirty: bool = False # per-frame timing samples (µs) memmove_us: list = dc_field(default_factory=list) upload_us: list = dc_field(default_factory=list) render_us: list = dc_field(default_factory=list) frame_wall: list = dc_field(default_factory=list) # wall time at upload frame_count: int = 0 first_fmt: str = "" fs = FrameState() errors8: list[str] = [] eos8 = threading.Event() # ── GStreamer callback (runs in GStreamer thread) ──────────────────── def _on_sample8(sink): sample = sink.emit("pull-sample") if sample is None: return Gst.FlowReturn.OK buf = sample.get_buffer() caps = sample.get_caps() if buf is None or caps is None: return Gst.FlowReturn.OK info8 = GstVideo.VideoInfo.new_from_caps(caps) if info8 is None: return Gst.FlowReturn.OK fmt = "BGRA" if info8.finfo: try: fmt = info8.finfo.name.upper() except Exception: pass pitch = int(info8.stride[0]) uv_pitch = int(info8.stride[1]) if fmt == "NV12" else 0 h = int(info8.height) w = int(info8.width) y_size = pitch * h t0 = time.monotonic() ok_map, map_info = buf.map(Gst.MapFlags.READ) if not ok_map: return Gst.FlowReturn.OK try: src_size = map_info.size with fs.lock: if fs.raw_arr is None or fs.raw_arr_size < src_size: fs.raw_arr = (ctypes.c_ubyte * src_size)() fs.raw_arr_size = src_size ctypes.memmove(fs.raw_arr, map_info.data, src_size) t_copy = (time.monotonic() - t0) * 1e6 fs.width = w fs.height = h fs.pitch = pitch fs.uv_pitch = uv_pitch fs.y_size = y_size fs.pixel_format = fmt fs.dirty = True fs.frame_count += 1 if not fs.first_fmt: fs.first_fmt = fmt print(f"\n [first frame] fmt={fmt} {w}x{h} " f"stride0={pitch} buf={src_size}") fs.memmove_us.append(t_copy) finally: buf.unmap(map_info) return Gst.FlowReturn.OK appsink8.connect("new-sample", _on_sample8) # ── Bus thread ─────────────────────────────────────────────────────── def _bus8(): bus = pipeline8.get_bus() while not eos8.is_set(): msg = bus.timed_pop_filtered( 200 * Gst.MSECOND, Gst.MessageType.ERROR | Gst.MessageType.EOS, ) if msg is None: continue if msg.type == Gst.MessageType.ERROR: err, dbg = msg.parse_error() errors8.append(f"{err.message} | {dbg}") print(f"\n [bus] ERROR: {err.message}") eos8.set() elif msg.type == Gst.MessageType.EOS: print("\n [bus] EOS") eos8.set() bth8 = threading.Thread(target=_bus8, daemon=True) bth8.start() pipeline8.set_state(Gst.State.PLAYING) print(f" Running SDL render loop for {SDL8_SECONDS}s …") print(" (close window with Escape or Q, or wait for timeout)\n") # ── SDL render loop (runs on main thread) ─────────────────────────── WARMUP = 5 deadline8 = time.monotonic() + SDL8_SECONDS frame_n = 0 while time.monotonic() < deadline8 and not eos8.is_set(): # Drain SDL events (allows Escape / Q to quit). ev = sdl2.SDL_Event() while sdl2.SDL_PollEvent(ctypes.byref(ev)): if ev.type == sdl2.SDL_QUIT: eos8.set() elif ev.type == sdl2.SDL_KEYDOWN: sym = ev.key.keysym.sym if sym in (sdl2.SDLK_ESCAPE, sdl2.SDLK_q): eos8.set() # Upload + render if a new frame is ready. with fs.lock: if not fs.dirty or fs.raw_arr is None: pass else: w8 = fs.width; h8 = fs.height pitch8 = fs.pitch uv_pitch8 = fs.uv_pitch y_size8 = fs.y_size fmt8 = fs.pixel_format arr8 = fs.raw_arr fs.dirty = False frame_n += 1 # --- SDL_UpdateNVTexture upload --- t_up0 = time.monotonic() if fmt8 == "NV12" and y_size8 > 0: y_ptr = ctypes.cast(arr8, ctypes.POINTER(ctypes.c_ubyte)) uv_ptr = ctypes.cast( ctypes.byref(arr8, y_size8), ctypes.POINTER(ctypes.c_ubyte), ) sdl2.SDL_UpdateNVTexture( texture, None, y_ptr, pitch8, uv_ptr, uv_pitch8, ) else: # BGRA fallback (SW decode path) pix = ctypes.cast(arr8, ctypes.POINTER(ctypes.c_ubyte)) sdl2.SDL_UpdateTexture(texture, None, pix, pitch8) t_upload = (time.monotonic() - t_up0) * 1e6 # --- SDL_RenderCopy --- t_r0 = time.monotonic() sdl2.SDL_RenderClear(renderer) sdl2.SDL_RenderCopy(renderer, texture, None, None) sdl2.SDL_RenderPresent(renderer) t_render = (time.monotonic() - t_r0) * 1e6 wall_now = time.monotonic() if frame_n > WARMUP: fs.upload_us.append(t_upload) fs.render_us.append(t_render) fs.frame_wall.append(wall_now) time.sleep(0.001) # yield to GStreamer thread pipeline8.set_state(Gst.State.NULL) eos8.set() sdl2.SDL_DestroyTexture(texture) sdl2.SDL_DestroyRenderer(renderer) sdl2.SDL_DestroyWindow(window) sdl2.SDL_Quit() # ── Section 8 report ──────────────────────────────────────────────── print() print(" --- Section 8 Timing Report ---") print(f" Total GStreamer frames decoded : {fs.frame_count}") print(f" Frames rendered (excl warmup) : {len(fs.upload_us)}") print(f" Pixel format seen : {fs.first_fmt or '?'}") budget = 1_000_000 / 24 # µs per frame @ 24fps nominal def _stat(label, samples_us): if not samples_us: print(f" {label:38s}: no samples") return mn = statistics.mean(samples_us) mx = max(samples_us) pct = mn / budget * 100 print(f" {label:38s}: mean {mn:6.0f} µs max {mx:6.0f} µs ({pct:.1f}% budget)") _stat("memmove (GStreamer thread)", fs.memmove_us[WARMUP:] if len(fs.memmove_us) > WARMUP else fs.memmove_us) _stat("SDL_UpdateNVTexture (main thread)", fs.upload_us) _stat("SDL_RenderCopy+Present (main thread)", fs.render_us) if len(fs.frame_wall) >= 2: intervals = [fs.frame_wall[i+1] - fs.frame_wall[i] for i in range(len(fs.frame_wall) - 1)] elapsed = fs.frame_wall[-1] - fs.frame_wall[0] fps_act = (len(fs.frame_wall) - 1) / elapsed if elapsed > 0 else 0 dropped = sum(1 for iv in intervals if iv > 0.080) jitter = statistics.stdev(intervals) * 1000 if len(intervals) > 1 else 0 print(f" {'Rendered FPS':38s}: {fps_act:.2f} (jitter {jitter:.1f} ms, dropped {dropped})") if errors8: for e in errors8: _fail(f"GStreamer: {e}") total_mean = ( (statistics.mean(fs.memmove_us[WARMUP:]) if len(fs.memmove_us) > WARMUP else 0) + (statistics.mean(fs.upload_us) if fs.upload_us else 0) + (statistics.mean(fs.render_us) if fs.render_us else 0) ) print(f" {'TOTAL (copy+upload+render)':38s}: {total_mean:.0f} µs ({total_mean/budget*100:.1f}% of 41.7ms budget)") if fps_act < 22: _fail(f"FPS too low ({fps_act:.2f}) — check timing breakdown above for bottleneck") elif dropped > 5: _warn(f"{dropped} dropped frames — pipeline may be too slow under SDL load") else: _ok(f"SDL render loop healthy: {fps_act:.2f} fps, {dropped} dropped") except RuntimeError: pass # error already printed above except ImportError as exc: _warn(f"sdl2 Python bindings not available: {exc}") _warn("Install: conda install -c conda-forge pysdl2") print()