#!/usr/bin/env python3
"""
NV12 / mppvideodec decode benchmark for R36S (RK3326 / ArkOS).

Replays the exact same pipeline the app uses (playbin → NV12 appsink with
mppvideodec auto-selected) and reports:
  • Decoder selected (HW vs SW)
  • Decoded frame rate (actual vs stream nominal)
  • Frame interval stddev (jitter)
  • Dropped / late frames
  • A/V sync drift (video PTS vs pipeline clock position)
  • buffer.map() + memmove() time per frame (CPU copy cost)

Run on device (must use same env as the app):
  export LD_LIBRARY_PATH=/home/ark/miniconda3/envs/r36s-dlna-browser/lib
  export GST_PLUGIN_PATH=/usr/lib/aarch64-linux-gnu/gstreamer-1.0
  export LD_PRELOAD=/usr/lib/aarch64-linux-gnu/libgomp.so.1
  export PYTHONPATH=/home/ark/R36SHack/src
  /home/ark/miniconda3/envs/r36s-dlna-browser/bin/python \\
      /home/ark/R36SHack/tests/benchmark_nv12_decode.py [--noscale] [URL]

If no URL is given, reads /tmp/dlna_last_url.txt (auto-written by the app on play).
Pass --noscale to benchmark the unscaled NV12 path.
"""
from __future__ import annotations

import ctypes
import os
import statistics
import sys
import threading
import time
from dataclasses import dataclass, field
from typing import Optional

# ── URL resolution ──────────────────────────────────────────────────────────
# Option flags such as --noscale may appear anywhere on the command line, so
# the URL is the first argument that is not a "--" flag (taking sys.argv[1]
# blindly would try to play "--noscale" as a URI).
_positional_args = [arg for arg in sys.argv[1:] if not arg.startswith("--")]
url: Optional[str] = _positional_args[0] if _positional_args else None
if not url:
    try:
        with open("/tmp/dlna_last_url.txt") as _f:
            url = _f.read().strip() or None
        if url:
            print(f"[auto] URL from /tmp/dlna_last_url.txt: {url}")
    except FileNotFoundError:
        pass
if not url:
    print("Usage: benchmark_nv12_decode.py [--noscale] <URL>")
    print(" Or start the app, play something, then re-run without args.")
    sys.exit(1)

BENCH_SECONDS = 30  # how long to run
WARMUP_FRAMES = 5  # frames to skip before recording stats

# ── GStreamer setup ─────────────────────────────────────────────────────────
# Imported after URL resolution so the usage message works without PyGObject.
import gi

gi.require_version("Gst", "1.0")
gi.require_version("GstApp", "1.0")
gi.require_version("GstVideo", "1.0")
from gi.repository import Gst, GstApp, GstVideo

Gst.init(None)
gst_v = Gst.version()
print(f"GStreamer {gst_v.major}.{gst_v.minor}.{gst_v.micro}")

# ── VPU probe + rank boost (same logic as the app) ──────────────────────────
_HW_DECODER_ELEMENTS = [
    "mppvideodec",
    "v4l2h264dec",
    "v4l2h265dec",
    "v4l2vp8dec",
    "v4l2vp9dec",
]
_HW_VPU_DEVICES = [
    "/dev/vpu_service",
    "/dev/mpp_service",
    "/dev/video10",
    "/dev/video11",
]

hw_decoders: list[str] = []
for dev in _HW_VPU_DEVICES:
    try:
        # Opening the node read/write is the accessibility probe; the
        # descriptor itself is not needed afterwards.
        fd = os.open(dev, os.O_RDWR | os.O_NONBLOCK)
        os.close(fd)
    except OSError:
        continue
    print(f"[HW] VPU device accessible: {dev}")
    # Raise every available hardware decoder above the software decoders so
    # playbin's auto-plugging prefers it.
    for name in _HW_DECODER_ELEMENTS:
        fac = Gst.ElementFactory.find(name)
        if fac is not None:
            fac.set_rank(Gst.Rank.PRIMARY + 1)
            hw_decoders.append(name)
            print(f"[HW] Boosted rank: {name}")
    # One accessible device node is enough; probing the rest would only
    # re-boost the same factories.
    break

if not hw_decoders:
    print("[SW] No VPU device or no gst-mpp plugin — using software decode")

# ── Pipeline ────────────────────────────────────────────────────────────────
pipeline = Gst.ElementFactory.make("playbin", "player")
if pipeline is None:
    print("[ERR] playbin unavailable — install gst-plugins-base")
    sys.exit(1)

# Target display size — same default as the app (R36S screen is 640×480).
SCALE_W, SCALE_H = 640, 480

appsink = Gst.ElementFactory.make("appsink", "vsink")
if appsink is None:
    # Same failure mode as a missing playbin: report it instead of crashing
    # with AttributeError on the first set_property() call.
    print("[ERR] appsink unavailable — install gst-plugins-base")
    sys.exit(1)
appsink.set_property("emit-signals", True)
appsink.set_property("sync", True)  # keep A/V sync on
appsink.set_property("max-buffers", 2)
appsink.set_property("drop", True)

# video_sink is what we hand to playbin. For HW decode we mirror _create_appsink()
# from the app: wrap videoscale → capsfilter(NV12,640×480) → appsink in a GstBin
# so the Python callback receives 460 KB per frame instead of 3.1 MB (6.7× smaller).
# Pass --noscale to disable this and benchmark the unscaled path.
video_sink = appsink
# Caps offered when frames are delivered unscaled: NV12 preferred, BGRA as
# the fallback format.
_UNSCALED_HW_CAPS = "video/x-raw,format=NV12;video/x-raw,format=BGRA"

if hw_decoders and "--noscale" not in sys.argv:
    scale_el = Gst.ElementFactory.make("videoscale", "vscale")
    cfilt_el = Gst.ElementFactory.make("capsfilter", "vcaps")
    if scale_el is not None and cfilt_el is not None:
        # nearest-neighbour: skips ~56% of source rows so only ~44% of the
        # source cache lines are loaded vs 100% for memmove of the full frame.
        # Bilinear must read adjacent rows too, making it slower than nearest.
        scale_el.set_property("method", 0)
        cfilt_el.set_property(
            "caps",
            Gst.Caps.from_string(
                f"video/x-raw,format=NV12,width={SCALE_W},height={SCALE_H}"
            ),
        )

        bin_ = Gst.Bin.new("vscale-bin")
        for element in (scale_el, cfilt_el, appsink):
            bin_.add(element)
        # link() returns False on failure; stop here with a clear message
        # rather than letting playbin fail later with an opaque error.
        if not (scale_el.link(cfilt_el) and cfilt_el.link(appsink)):
            print("[ERR] could not link videoscale → capsfilter → appsink")
            sys.exit(1)

        # Expose the scaler's sink pad on the bin so playbin can link to it.
        sink_pad = scale_el.get_static_pad("sink")
        ghost = Gst.GhostPad.new("sink", sink_pad)
        ghost.set_active(True)
        bin_.add_pad(ghost)

        video_sink = bin_
        print(f"[scale] videoscale(nearest) → {SCALE_W}×{SCALE_H} NV12")
    else:
        print("[scale] videoscale element unavailable — falling back to unscaled NV12")
        appsink.set_property("caps", Gst.Caps.from_string(_UNSCALED_HW_CAPS))
elif hw_decoders:
    print(f"[scale] --noscale: unscaled NV12 ({SCALE_W}×{SCALE_H} disabled)")
    appsink.set_property("caps", Gst.Caps.from_string(_UNSCALED_HW_CAPS))
else:
    # Software decode path: request BGRA only.
    appsink.set_property("caps", Gst.Caps.from_string("video/x-raw,format=BGRA"))

_sink_desc = (
    "GstBin(videoscale-nearest+capsfilter+appsink)"
    if video_sink is not appsink
    else "appsink"
)
print(f"[caps] video-sink: {_sink_desc}")

pipeline.set_property("video-sink", video_sink)
pipeline.set_property("uri", url)

# Disable subtitles / visualisations, keep audio+video (same flags as app).
PlayFlags = getattr(Gst, "PlayFlags", None)
if PlayFlags is not None:
    flags = int(pipeline.get_property("flags"))
    for req in ("AUDIO", "VIDEO"):
        v = getattr(PlayFlags, req, None)
        if v is not None:
            flags |= int(v)
    for dis in ("TEXT", "VIS"):
        v = getattr(PlayFlags, dis, None)
        if v is not None:
            flags &= ~int(v)
    pipeline.set_property("flags", flags)


# ── Measurement state ───────────────────────────────────────────────────────
@dataclass
class Stats:
    """Per-run measurements, filled by the appsink callback.

    The list fields only receive samples after the first WARMUP_FRAMES
    frames; ``total_frames`` counts every frame, warmup included. Guard
    access with ``lock`` — the callback and the main thread both touch it.
    """

    total_frames: int = 0
    warmup_done: bool = False
    fmt: str = "?"
    width: int = 0
    height: int = 0
    # timing
    frame_wall_times: list[float] = field(default_factory=list)  # monotonic s
    pts_list: list[float] = field(default_factory=list)  # seconds
    copy_times_us: list[float] = field(default_factory=list)  # µs
    # A/V sync: (video_pts_s - pipeline_pos_s) samples
    av_drift_ms: list[float] = field(default_factory=list)
    dropped_frames: int = 0
    lock: threading.Lock = field(default_factory=threading.Lock)
    # Reusable destination buffer for the per-frame copy and its capacity.
    _raw_arr: Optional[ctypes.Array] = field(default=None, repr=False)
    _raw_arr_size: int = 0


stats = Stats()


# ── Callback ────────────────────────────────────────────────────────────────
def _on_sample(sink) -> Gst.FlowReturn:
    """Handle appsink's ``new-sample`` signal and record one frame.

    Pulls the sample, times ``buffer.map(READ)`` plus a ``memmove`` into a
    reused ctypes array, and (after warmup) stores the wall-clock arrival
    time, copy time, buffer PTS and the PTS-minus-pipeline-position drift in
    ``stats``. Always returns ``Gst.FlowReturn.OK`` so streaming continues
    even when a sample cannot be inspected.
    """
    sample = sink.emit("pull-sample")
    if sample is None:
        return Gst.FlowReturn.OK

    buf = sample.get_buffer()
    caps = sample.get_caps()
    if buf is None or caps is None:
        return Gst.FlowReturn.OK

    info = GstVideo.VideoInfo.new_from_caps(caps)
    if info is None:
        return Gst.FlowReturn.OK

    wall_now = time.monotonic()

    # Buffer PTS in seconds
    pts_ns = buf.pts
    pts_s = pts_ns / Gst.SECOND if pts_ns != Gst.CLOCK_TIME_NONE else None

    fmt_str = "BGRA"
    if info.finfo is not None:
        try:
            fmt_str = info.finfo.name.upper()
        except Exception:
            # Best-effort: the format name is only used for display.
            pass

    # Measure buffer.map(READ) + memmove into a pre-allocated ctypes array
    # (same path as the app). Reuse a single ctypes array across frames to
    # avoid per-frame allocation. del is not needed — ctypes array is reused.
    t0 = time.monotonic()
    ok, map_info = buf.map(Gst.MapFlags.READ)
    if not ok:
        return Gst.FlowReturn.OK
    try:
        src_size = map_info.size
        if stats._raw_arr is None or stats._raw_arr_size < src_size:
            # Grow only when a larger frame arrives.
            stats._raw_arr = (ctypes.c_ubyte * src_size)()
            stats._raw_arr_size = src_size
        ctypes.memmove(stats._raw_arr, map_info.data, src_size)
        copy_us = (time.monotonic() - t0) * 1e6
    finally:
        buf.unmap(map_info)

    with stats.lock:
        stats.total_frames += 1
        if stats.total_frames == 1:
            stats.fmt = fmt_str
            stats.width = int(info.width)
            stats.height = int(info.height)
            print(
                f"\n[first frame] fmt={fmt_str} {info.width}x{info.height} "
                f"stride0={info.stride[0]} buf_total={buf.get_size()}"
            )

        if stats.total_frames <= WARMUP_FRAMES:
            return Gst.FlowReturn.OK  # skip warmup frames from stats

        stats.warmup_done = True
        stats.frame_wall_times.append(wall_now)
        stats.copy_times_us.append(copy_us)
        if pts_s is not None:
            stats.pts_list.append(pts_s)

    # A/V sync: query pipeline position and compare to video PTS. The query
    # runs outside the lock so the main thread is never blocked on it.
    if pts_s is not None:
        ok, pos_ns = pipeline.query_position(Gst.Format.TIME)
        if ok and pos_ns >= 0:
            drift_ms = (pts_s - pos_ns / Gst.SECOND) * 1000.0
            with stats.lock:
                stats.av_drift_ms.append(drift_ms)

    return Gst.FlowReturn.OK


appsink.connect("new-sample", _on_sample)

# ── Bus watcher ─────────────────────────────────────────────────────────────
errors: list[str] = []
warnings: list[str] = []
eos_reached = threading.Event()


def _bus_thread():
    """Poll the pipeline bus until ``eos_reached`` is set.

    ERROR and EOS messages set ``eos_reached`` (which also ends the main
    loop); ERROR and WARNING texts are collected in ``errors`` / ``warnings``
    for the final report.
    """
    bus = pipeline.get_bus()
    wanted = Gst.MessageType.ERROR | Gst.MessageType.WARNING | Gst.MessageType.EOS
    while not eos_reached.is_set():
        # Short timeout so the loop notices eos_reached being set elsewhere.
        msg = bus.timed_pop_filtered(200 * Gst.MSECOND, wanted)
        if msg is None:
            continue
        if msg.type == Gst.MessageType.EOS:
            print("\n[bus] EOS")
            eos_reached.set()
        elif msg.type == Gst.MessageType.ERROR:
            err, dbg = msg.parse_error()
            errors.append(f"{err.message} | {dbg}")
            print(f"\n[bus] ERROR: {err.message}")
            eos_reached.set()
        elif msg.type == Gst.MessageType.WARNING:
            warn, _dbg = msg.parse_warning()
            warnings.append(warn.message)
            print(f"\n[bus] WARNING: {warn.message}")


bt = threading.Thread(target=_bus_thread, daemon=True)
bt.start()

# ── Run ─────────────────────────────────────────────────────────────────────
print(f"\nRunning benchmark for {BENCH_SECONDS}s (warmup: {WARMUP_FRAMES} frames)...")
print("Press Ctrl+C to stop early.\n")

ret = pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
    print("[ERR] Pipeline failed to start")
    sys.exit(1)

deadline = time.monotonic() + BENCH_SECONDS
try:
    while time.monotonic() < deadline and not eos_reached.is_set():
        with stats.lock:
            n = len(stats.frame_wall_times)
            current_fmt = stats.fmt
        elapsed = BENCH_SECONDS - (deadline - time.monotonic())
        print(
            f"\r elapsed={elapsed:5.1f}s frames={n:4d} fmt={current_fmt}",
            end="",
            flush=True,
        )
        time.sleep(0.5)
except KeyboardInterrupt:
    print("\n[interrupted]")

print()
pipeline.set_state(Gst.State.NULL)
eos_reached.set()
# The watcher polls every 200 ms, so it exits well within this timeout.
bt.join(timeout=1.0)

# ── Report ──────────────────────────────────────────────────────────────────
# A wall-clock gap longer than this between delivered frames is counted as a
# likely dropped frame.
_DROP_INTERVAL_S = 0.080

print("\n" + "=" * 62)
print(" BENCHMARK RESULTS")
print("=" * 62)

with stats.lock:
    wall_times = list(stats.frame_wall_times)
    copy_us = list(stats.copy_times_us)
    pts_list = list(stats.pts_list)
    av_drifts = list(stats.av_drift_ms)
    total = stats.total_frames
    fmt = stats.fmt
    frame_w, frame_h = stats.width, stats.height

scale_active = video_sink is not appsink
scale_desc = (
    f"videoscale → {SCALE_W}×{SCALE_H} NV12"
    if scale_active
    else "none (--noscale or SW)"
)
decoder_desc = (
    "HW (" + ", ".join(hw_decoders) + ")" if hw_decoders else "SW (avdec_*)"
)

print(f" URL : {url}")
print(f" Scale : {scale_desc}")
print(f" Format : {fmt} {frame_w}x{frame_h}")
print(f" Decoder : {decoder_desc}")
# total_frames counts every delivered frame, so the warmup frames are
# included in this number, not excluded.
print(f" Total frames decoded : {total} (incl. {min(total, WARMUP_FRAMES)} warmup)")
print(f" Measured frames : {len(wall_times)}")

actual_fps = 0.0
if len(wall_times) >= 2:
    elapsed_wall = wall_times[-1] - wall_times[0]
    actual_fps = (len(wall_times) - 1) / elapsed_wall if elapsed_wall > 0 else 0.0
    intervals = [later - earlier for earlier, later in zip(wall_times, wall_times[1:])]
    mean_ms = statistics.mean(intervals) * 1000
    stdev_ms = statistics.stdev(intervals) * 1000 if len(intervals) > 1 else 0.0
    max_ms = max(intervals) * 1000
    min_ms = min(intervals) * 1000
    drops = sum(1 for iv in intervals if iv > _DROP_INTERVAL_S)

    # Nominal stream rate from buffer timestamps. The median interval is used
    # because any dropped frame stretches the gap between two delivered PTS
    # values and would drag a mean-based estimate towards the delivered rate.
    nominal_fps = 0.0
    if len(pts_list) >= 2:
        pts_intervals = [later - earlier for earlier, later in zip(pts_list, pts_list[1:])]
        typical_pts_interval = statistics.median(pts_intervals)
        if typical_pts_interval > 0:
            nominal_fps = 1.0 / typical_pts_interval
    nominal_txt = f"{1000 / nominal_fps:.1f} ms" if nominal_fps > 0 else "n/a"

    print("\n --- Frame rate ---")
    print(f" Actual FPS : {actual_fps:.2f}")
    print(f" Frame interval mean : {mean_ms:.1f} ms (nominal {nominal_txt})")
    print(f" Frame interval stdev : {stdev_ms:.1f} ms (jitter)")
    print(f" Frame interval min : {min_ms:.1f} ms")
    print(f" Frame interval max : {max_ms:.1f} ms")
    print(
        f" Likely dropped frames: {drops} "
        f"(intervals > {_DROP_INTERVAL_S * 1000:.0f} ms)"
    )

    if len(pts_list) >= 2:
        print("\n --- Stream timestamps ---")
        print(f" Nominal stream FPS : {nominal_fps:.2f}")
        if nominal_fps <= 0:
            # Without a usable PTS interval there is nothing to compare to;
            # do not blame the decoder.
            print(" Decode pace : n/a (stream rate could not be determined)")
        else:
            pts_rate = actual_fps / nominal_fps
            if pts_rate < 0.90:
                print(
                    f" [WARNING] delivering at {pts_rate*100:.0f}% of stream rate"
                    " — decoder is slow"
                )
            else:
                print(f" Decode pace : {pts_rate*100:.0f}% of stream rate (OK)")
else:
    print(" Not enough frames to compute FPS — did playback start?")

if copy_us:
    mean_copy = statistics.mean(copy_us)
    max_copy = max(copy_us)
    print("\n --- CPU copy cost (buffer.map + memmove) ---")
    print(f" Mean copy time : {mean_copy:.0f} µs")
    print(f" Max copy time : {max_copy:.0f} µs")
    # Per-frame time budget at the measured rate; assume 30 fps when the
    # rate could not be measured.
    budget_us = 1_000_000 / (actual_fps if actual_fps > 0 else 30)
    copy_pct = mean_copy / budget_us * 100
    print(f" Copy % of frame budget: {copy_pct:.1f}%")

if av_drifts:
    mean_drift = statistics.mean(av_drifts)
    stdev_drift = statistics.stdev(av_drifts) if len(av_drifts) > 1 else 0.0
    min_drift = min(av_drifts)
    max_drift = max(av_drifts)
    print("\n --- A/V sync (video PTS - pipeline clock) ---")
    print(f" Mean drift : {mean_drift:+.1f} ms")
    print(f" Drift stdev : {stdev_drift:.1f} ms")
    print(f" Drift range : {min_drift:+.1f} ms .. {max_drift:+.1f} ms")
    if abs(mean_drift) > 100:
        print(" [WARNING] Large mean drift — audio/video desync likely")
    if stdev_drift > 50:
        print(" [WARNING] High drift variance — intermittent desync")

if warnings:
    print(f"\n GStreamer warnings ({len(warnings)}):")
    for w_msg in warnings[:5]:
        print(f" • {w_msg}")

if errors:
    print("\n GStreamer errors:")
    for e_msg in errors:
        print(f" • {e_msg}")

print()