#!/usr/bin/env python3
"""
H.264 FHD decode benchmark for R36S / RK3326.

Compares four decode paths:
  1. SW-fakesink — avdec_h264 → fakesink (pure decode throughput, CPU)
  2. SW-appsink  — avdec_h264 → videoconvert → BGRA → appsink (app path)
  3. HW-fakesink — mppvideodec → fakesink (pure HW decode throughput)
  4. HW-appsink  — mppvideodec → NV12 → appsink (HW decode + zero-copy upload)

Usage:
  # Full auto (generates /tmp/test_fhd.mp4 if missing):
  GST_PLUGIN_PATH=/usr/lib/aarch64-linux-gnu/gstreamer-1.0 \\
  LD_PRELOAD=/usr/lib/aarch64-linux-gnu/libgomp.so.1 \\
  /home/ark/miniconda3/envs/r36s-dlna-browser/bin/python \\
    /home/ark/R36SHack/tests/benchmark_decode.py

  # Use a specific file:
  ... benchmark_decode.py /path/to/video.mp4

Output: text table + /tmp/decode_benchmark.json
"""
from __future__ import annotations

import json
import os
import sys
import threading
import time

TEST_VIDEO = "/tmp/test_fhd.mp4"
GENERATE_DURATION = 10   # seconds of synthetic FHD content to encode
GENERATE_FPS = 30
WARMUP_FRAMES = 10       # frames to discard before measuring


# ── helpers ─────────────────────────────────────────────────────────────────

def _section(title: str) -> None:
    """Print *title* framed by two 60-character '=' rules."""
    print(f"\n{'='*60}")
    print(f" {title}")
    print("=" * 60)


def _parse_cpu_line(line: str) -> tuple[int, int]:
    """Parse an aggregate ``cpu`` line in /proc/stat format.

    Only the first seven counters after the label are used.  ``busy`` is the
    sum of the first three of them (user + nice + system) and ``total`` is the
    sum of all seven.

    Returns:
        ``(busy, total)`` in jiffies.
    """
    fields = [int(x) for x in line.split()[1:8]]
    return sum(fields[:3]), sum(fields)


def _cpu_times() -> tuple[int, int]:
    """Return (user+nice+system, total) jiffies from the first /proc/stat line."""
    with open("/proc/stat") as f:
        return _parse_cpu_line(f.readline())


def _cpu_percent(t0_busy: int, t0_total: int, t1_busy: int, t1_total: int) -> float:
    """Return busy CPU percentage between two ``_cpu_times()`` samples.

    Returns 0.0 when the total counter did not advance between the samples.
    """
    db = t1_busy - t0_busy
    dt = t1_total - t0_total
    return 100.0 * db / dt if dt else 0.0


def _compute_fps(
    frame_times: list[float], total_frames: int, wall: float
) -> tuple[float, int]:
    """Derive throughput from the recorded frame timestamps.

    With at least two post-warmup timestamps the rate is the number of
    intervals divided by the span between first and last timestamp.  Otherwise
    it falls back to ``total_frames / wall`` (warmup frames included).

    Returns:
        ``(fps, frames)`` where ``frames`` is the number of frames the rate
        was computed from.  ``fps`` is 0.0 when the time span is not positive.
    """
    if len(frame_times) >= 2:
        span = frame_times[-1] - frame_times[0]
        fps = (len(frame_times) - 1) / span if span > 0 else 0.0
        return fps, len(frame_times)
    fps = total_frames / wall if wall > 0 else 0.0
    return fps, total_frames


def _gst_quote(value: str) -> str:
    """Quote *value* for use as a property value in a gst-launch description.

    Wraps the value in double quotes and backslash-escapes embedded
    backslashes and double quotes, so paths containing spaces survive
    ``Gst.parse_launch``.
    """
    escaped = value.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


# ── GStreamer ────────────────────────────────────────────────────────────────

def _init_gst():
    """Import and initialise GStreamer lazily; return ``(Gst, GLib)``.

    The import is local so that the module itself can be imported on machines
    without PyGObject.
    """
    import gi
    gi.require_version("Gst", "1.0")
    gi.require_version("GstApp", "1.0")
    # GstApp is imported but not referenced directly.
    # NOTE(review): presumably kept so the appsink bindings are loaded — confirm.
    from gi.repository import Gst, GstApp, GLib  # noqa: F401
    Gst.init(None)
    return Gst, GLib


# ── video generation ─────────────────────────────────────────────────────────

def _generate_test_video(path: str, Gst, GLib) -> None:
    """Encode a synthetic 1920x1080 H.264 clip to *path* via GStreamer.

    Uses ``avenc_h264`` when registered, otherwise ``x264enc``.  Exits the
    process (``SystemExit``) with an explanatory message when neither encoder
    is available or when the encoding pipeline reports an error.
    """
    print(f" Generating synthetic FHD H.264 clip → {path}")
    print(f" (duration={GENERATE_DURATION}s, size=1920x1080, rate={GENERATE_FPS}/1)")

    reg = Gst.Registry.get()
    factory_type = Gst.ElementFactory.__gtype__

    encoder = "avenc_h264 bitrate=4000000"
    if reg.find_feature("avenc_h264", factory_type) is None:
        print(" [WARN] avenc_h264 not found — trying x264enc")
        encoder = "x264enc bitrate=4000 speed-preset=ultrafast"
        if reg.find_feature("x264enc", factory_type) is None:
            sys.exit(" [ERR] No H.264 encoder found. Use ffmpeg to generate the clip:\n"
                     f" ffmpeg -y -f lavfi -i testsrc=duration={GENERATE_DURATION}:"
                     f"size=1920x1080:rate={GENERATE_FPS} -c:v libx264 -preset ultrafast "
                     f"-b:v 4M {path}")

    pipe_str = (
        f"videotestsrc num-buffers={GENERATE_DURATION * GENERATE_FPS} "
        f"! video/x-raw,width=1920,height=1080,framerate={GENERATE_FPS}/1,format=I420 "
        f"! {encoder} "
        # Quoted so that output paths containing spaces parse correctly.
        f"! h264parse ! mp4mux ! filesink location={_gst_quote(path)}"
    )

    pipeline = Gst.parse_launch(pipe_str)
    loop = GLib.MainLoop()
    failures: list[str] = []

    def on_message(bus, msg):
        if msg.type == Gst.MessageType.EOS:
            loop.quit()
        elif msg.type == Gst.MessageType.ERROR:
            err, dbg = msg.parse_error()
            # Only record the failure here; the process exit happens after the
            # main loop has returned, not from inside the signal handler.
            failures.append(f" [ERR] Generation failed: {err} {dbg}")
            loop.quit()

    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", on_message)

    t_start = time.monotonic()
    try:
        pipeline.set_state(Gst.State.PLAYING)
        loop.run()
    finally:
        # Always tear down and balance add_signal_watch(), even on Ctrl-C.
        pipeline.set_state(Gst.State.NULL)
        bus.remove_signal_watch()
    elapsed = time.monotonic() - t_start

    if failures:
        sys.exit(failures[0])

    size = os.path.getsize(path)
    print(f" Generated in {elapsed:.1f}s — {size // 1024}KB")


# ── benchmark runner ─────────────────────────────────────────────────────────

def _run_benchmark(
    label: str,
    video_path: str,
    pipeline_str: str,
    is_appsink: bool,
    Gst,
    GLib,
) -> dict:
    """Run one decode benchmark pass and return its metrics.

    Args:
        label: Human-readable name printed and stored in the result.
        video_path: Path of the clip being decoded.  Not used by this
            function itself (the path is already embedded in *pipeline_str*);
            kept for interface compatibility.
        pipeline_str: gst-launch style pipeline description.  For appsink
            runs it must contain an element named ``bench_sink``; for fakesink
            runs frames are counted through an ``identity`` element named
            ``probe`` with ``signal-handoffs=true``.
        is_appsink: Select the appsink (True) or identity-handoff (False)
            frame counting path.
        Gst, GLib: Modules returned by ``_init_gst()``.

    Returns:
        Dict with keys ``label``, ``fps``, ``frames``, ``wall_s``,
        ``cpu_pct`` and ``error`` (``None`` on a clean EOS, otherwise the
        text of the pipeline error message).
    """
    print(f"\n Running: {label}")

    frame_count = [0]
    frame_times: list[float] = []
    errors: list[str] = []

    def record_frame() -> None:
        # The first WARMUP_FRAMES frames are counted but not timestamped.
        n = frame_count[0]
        frame_count[0] = n + 1
        if n >= WARMUP_FRAMES:
            frame_times.append(time.monotonic())

    pipeline = Gst.parse_launch(pipeline_str)

    if is_appsink:
        sink = pipeline.get_by_name("bench_sink")

        def on_new_sample(s):
            # Pull (and immediately drop) the sample so the sink queue drains.
            s.emit("pull-sample")
            record_frame()
            return Gst.FlowReturn.OK

        sink.set_property("emit-signals", True)
        sink.set_property("sync", False)
        sink.set_property("max-buffers", 4)
        sink.set_property("drop", False)
        sink.connect("new-sample", on_new_sample)
    else:
        # fakesink path: count frames via identity handoff signal
        probe = pipeline.get_by_name("probe")
        if probe is not None:
            def on_handoff(*args):
                record_frame()
            probe.connect("handoff", on_handoff)

    loop = GLib.MainLoop()

    def on_message(bus, msg):
        if msg.type == Gst.MessageType.EOS:
            loop.quit()
        elif msg.type == Gst.MessageType.ERROR:
            err, dbg = msg.parse_error()
            errors.append(str(err))
            print(f" [ERR] {label}: {err}")
            loop.quit()

    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", on_message)

    try:
        cpu_before = _cpu_times()
        t_wall_start = time.monotonic()
        pipeline.set_state(Gst.State.PLAYING)
        loop.run()
        wall = time.monotonic() - t_wall_start
        cpu_after = _cpu_times()
    finally:
        # Always tear down and balance add_signal_watch(), even on Ctrl-C.
        pipeline.set_state(Gst.State.NULL)
        bus.remove_signal_watch()

    cpu_pct = _cpu_percent(*cpu_before, *cpu_after)
    fps, measure_frames = _compute_fps(frame_times, frame_count[0], wall)

    result = {
        "label": label,
        "fps": round(fps, 2),
        "frames": measure_frames,
        "wall_s": round(wall, 2),
        "cpu_pct": round(cpu_pct, 1),
        "error": errors[0] if errors else None,
    }
    print(f" {fps:6.1f} fps {measure_frames} frames {wall:.1f}s wall CPU {cpu_pct:.0f}%")
    return result


# ── main ─────────────────────────────────────────────────────────────────────

def main() -> None:
    """Run every available decode benchmark, print a table and save JSON.

    The clip path is taken from ``sys.argv[1]`` (default ``TEST_VIDEO``); a
    missing or very small file (< 10 000 bytes) is (re)generated in place.
    """
    video_path = sys.argv[1] if len(sys.argv) > 1 else TEST_VIDEO

    _section("GStreamer FHD decode benchmark — R36S / RK3326")

    Gst, GLib = _init_gst()
    print(f" GStreamer {Gst.version_string()}")

    reg = Gst.Registry.get()
    factory_type = Gst.ElementFactory.__gtype__
    mpp_factory = reg.find_feature("mppvideodec", factory_type)
    mpp_ok = mpp_factory is not None
    av_ok = reg.find_feature("avdec_h264", factory_type) is not None
    print(f" avdec_h264: {'✓' if av_ok else '✗ MISSING'}")
    print(f" mppvideodec: {'✓' if mpp_ok else '✗ MISSING (install mpp libs)'}")

    # --- prepare test video -------------------------------------------------
    _section("Test video")
    if not os.path.isfile(video_path) or os.path.getsize(video_path) < 10_000:
        _generate_test_video(video_path, Gst, GLib)
    else:
        size = os.path.getsize(video_path)
        print(f" Using existing: {video_path} ({size // 1024}KB)")

    # Remember the rank the factory had on entry so it can be restored exactly.
    original_mpp_rank = mpp_factory.get_rank() if mpp_ok else None

    def _set_mpp_rank(rank: int) -> None:
        """Temporarily set mppvideodec factory rank to steer decodebin selection."""
        if mpp_factory is not None:
            mpp_factory.set_rank(rank)

    # --- benchmark runs -----------------------------------------------------
    _section("Benchmarks (sync=false, as fast as possible)")

    # Pipeline note: qtdemux has dynamic src pads so we use decodebin.
    # decodebin rank manipulation steers it toward SW or HW decoder.
    GST_RANK_NONE = 0
    MPP_RANK_HIGH = 257  # above avdec_h264 (256)

    # Quoted so that clip paths containing spaces parse correctly.
    source = f"filesrc location={_gst_quote(video_path)} ! decodebin "
    fakesink_pipe = source + "! identity name=probe signal-handoffs=true ! fakesink sync=false"

    # (enabled, mppvideodec rank, label, pipeline, is_appsink)
    plan = [
        # 1. SW fakesink — pure decode throughput
        (av_ok, GST_RANK_NONE,
         "SW fakesink (avdec_h264 → discard)",
         fakesink_pipe, False),
        # 2. SW appsink — full BGRA path (as used by app without HW decode)
        (av_ok, GST_RANK_NONE,
         "SW appsink BGRA (avdec_h264 → videoconvert → BGRA)",
         source + "! videoconvert ! video/x-raw,format=BGRA ! appsink name=bench_sink",
         True),
        # 3. HW fakesink — pure MPP decode throughput
        (mpp_ok, MPP_RANK_HIGH,
         "HW fakesink (mppvideodec → discard)",
         fakesink_pipe, False),
        # 4. HW appsink NV12 — full app path with HW + zero-copy SDL upload
        (mpp_ok, MPP_RANK_HIGH,
         "HW appsink NV12 (mppvideodec → NV12, zero-copy)",
         source + "! appsink name=bench_sink caps=video/x-raw,format=NV12",
         True),
    ]

    results = []
    try:
        for enabled, mpp_rank, label, pipe, is_appsink in plan:
            if not enabled:
                continue
            _set_mpp_rank(mpp_rank)
            results.append(_run_benchmark(
                label, video_path, pipe,
                is_appsink=is_appsink, Gst=Gst, GLib=GLib,
            ))
    finally:
        # Restore whatever rank the factory had before, even if a run raised.
        if original_mpp_rank is not None:
            _set_mpp_rank(original_mpp_rank)

    # --- summary table -------------------------------------------------------
    _section("Results")
    print(f" {'Path':<45} {'FPS':>6} {'CPU%':>5}")
    print(f" {'-'*45} {'-'*6} {'-'*5}")
    for r in results:
        fps_bar = "★" * int(r["fps"] / 5)
        err_mark = " [ERR]" if r.get("error") else ""
        print(f" {r['label']:<45} {r['fps']:>6.1f} {r['cpu_pct']:>4.0f}% {fps_bar}{err_mark}")

    target_fps = GENERATE_FPS
    print(f"\n Target playback FPS: {target_fps}")
    # A run that ended with a pipeline error is not a trustworthy "best".
    clean = [r for r in results if not r.get("error")]
    best = max(clean, key=lambda r: r["fps"]) if clean else None
    if best:
        print(f" Best path: {best['label']} @ {best['fps']:.1f} fps")
        if best["fps"] >= target_fps:
            print(" ✓ Smooth real-time playback is achievable.")
        else:
            print(f" ✗ Cannot sustain {target_fps} fps — consider lower resolution or bitrate.")

    # --- save JSON -----------------------------------------------------------
    out_path = "/tmp/decode_benchmark.json"
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump({"video": video_path, "results": results, "target_fps": target_fps},
                  f, indent=2)
    print(f"\n Results saved: {out_path}")


if __name__ == "__main__":
    main()