#!/usr/bin/env python3
"""
Video playback diagnostic for R36S / ArkOS.
Tests GStreamer availability, codec coverage, and a short live-playback loop
using the same pipeline the app uses (playbin → NV12 appsink with videoscale
GstBin → SDL2 NV12 texture upload → rendered frame).
Section 8 is the key end-to-end timing test: it runs a real SDL window with
KMSDRM (or whatever SDL picks), decodes via the same GstBin the app uses, and
measures memmove + SDL_UpdateNVTexture + RenderCopy separately so desync and
frame-drop root causes are visible.
Run directly on the device:
export HOME=/home/ark PYTHONPATH=/home/ark/R36SHack/src \\
LD_LIBRARY_PATH=/home/ark/miniconda3/envs/r36s-dlna-browser/lib \\
GST_PLUGIN_PATH=/usr/lib/aarch64-linux-gnu/gstreamer-1.0 \\
LD_PRELOAD=/usr/lib/aarch64-linux-gnu/libgomp.so.1 \\
/home/ark/miniconda3/envs/r36s-dlna-browser/bin/python \\
/home/ark/R36SHack/tests/test_video_playback_device.py [URL]
Pass --nosection8 to skip the SDL rendering loop (useful when running headless).
"""
from __future__ import annotations
import sys
import time
import textwrap
# ── pretty output helpers ───────────────────────────────────────────────────
def _ok(msg: str) -> None:
print(f" [OK] {msg}")
def _warn(msg: str) -> None:
print(f" [WRN] {msg}")
def _fail(msg: str) -> None:
print(f" [ERR] {msg}")
def _section(title: str) -> None:
print(f"\n{'='*60}")
print(f" {title}")
print(f"{'='*60}")
# ── 1. Python env ───────────────────────────────────────────────────────────
_section("1. Python environment")
import platform

# Record interpreter and host architecture so device logs are self-describing.
print(" Python " + sys.version)
print(" Platform: " + platform.machine() + " / " + platform.system())
# ── 2. GI / GStreamer core ──────────────────────────────────────────────────
_section("2. GStreamer core")

# PyGObject is a hard requirement — bail out immediately when absent.
try:
    import gi
except ImportError as err:
    _fail(f"PyGObject not found: {err}")
    sys.exit(1)
else:
    _ok(f"PyGObject (gi) {gi.__version__}")

# Core GStreamer must both import and initialize.
try:
    gi.require_version("Gst", "1.0")
    from gi.repository import Gst
    Gst.init(None)
except Exception as err:
    _fail(f"GStreamer init failed: {err}")
    sys.exit(1)
else:
    ver = Gst.version()
    _ok(f"GStreamer {ver.major}.{ver.minor}.{ver.micro}.{ver.nano}")

# The appsink/video-info helpers used by the later sections.
try:
    gi.require_version("GstApp", "1.0")
    gi.require_version("GstVideo", "1.0")
    from gi.repository import GstApp, GstVideo
except Exception as err:
    _fail(f"GstApp/GstVideo bindings missing: {err}")
    sys.exit(1)
else:
    _ok("GstApp + GstVideo bindings present")

# ── 3. Plugin registry ──────────────────────────────────────────────────────
_section("3. Plugin registry")
# Elements the app cannot function without, mapped to a human-readable role.
REQUIRED_ELEMENTS = dict(
    playbin="core orchestrator",
    appsink="app-side frame sink",
    videoconvert="pixel format conversion",
    videoscale="frame scaling",
    typefind="format detection",
    decodebin="auto demux/decode",
    uridecodebin="URI decode helper",
)

# Format-coverage elements: absence only narrows which media will play.
CODEC_ELEMENTS = dict(
    # demuxers
    matroskademux="MKV/WebM demuxer",
    qtdemux="MP4/MOV demuxer",
    flvdemux="FLV demuxer",
    tsdemux="MPEG-TS demuxer",
    # video decoders
    avdec_h264="H.264 software decoder (libav)",
    avdec_hevc="H.265/HEVC software decoder (libav)",
    avdec_mpeg2video="MPEG-2 video decoder",
    avdec_vp8="VP8 decoder",
    avdec_vp9="VP9 decoder",
    v4l2h264dec="H.264 V4L2 HW decoder",
    v4l2h265dec="H.265 V4L2 HW decoder",
    # audio decoders
    avdec_aac="AAC decoder (libav)",
    avdec_mp3="MP3 decoder (libav)",
    avdec_ac3="AC-3 decoder (libav)",
    vorbisdec="Vorbis decoder",
    opusdec="Opus decoder",
    # audio output
    autoaudiosink="Auto audio sink",
    alsasink="ALSA sink",
    pulsesink="PulseAudio sink",
)
missing_required = []
registry = Gst.Registry.get()

# Required elements: any miss here is fatal for the app.
for name, role in REQUIRED_ELEMENTS.items():
    if registry.find_feature(name, Gst.ElementFactory.__gtype__) is None:
        missing_required.append(name)
        _fail(f"{name:20s} ({role}) ← MISSING")
    else:
        _ok(f"{name:20s} ({role})")
print()

# Codec elements: misses just reduce format coverage.
missing_codecs = []
present_codecs = []
for name, role in CODEC_ELEMENTS.items():
    if registry.find_feature(name, Gst.ElementFactory.__gtype__) is None:
        missing_codecs.append(name)
        _warn(f"{name:20s} ({role}) ← not found")
    else:
        present_codecs.append(name)
        _ok(f"{name:20s} ({role})")

if missing_required:
    print(f"\n CRITICAL: {len(missing_required)} required element(s) missing: {missing_required}")
print(f"\n Codecs present: {len(present_codecs)} / {len(CODEC_ELEMENTS)}")
# ── 4. Caps negotiation: can we build a BGRA appsink pipeline? ──────────────
_section("4. BGRA appsink pipeline negotiation")
TEST_PIPE = (
    "videotestsrc num-buffers=5 ! "
    "videoconvert ! "
    "video/x-raw,format=BGRA,width=64,height=64 ! "
    "appsink name=sink emit-signals=true max-buffers=1 drop=true"
)
frames_received = 0
negotiation_ok = False
try:
    test_pipe = Gst.parse_launch(TEST_PIPE)
    test_sink = test_pipe.get_by_name("sink")

    def _count_frame(s, *_):
        """Count each sample the appsink delivers."""
        global frames_received
        if s.emit("pull-sample") is not None:
            frames_received += 1
        return Gst.FlowReturn.OK

    test_sink.connect("new-sample", _count_frame)
    test_pipe.set_state(Gst.State.PLAYING)
    time.sleep(1.5)  # give the 5-buffer test source time to run
    test_pipe.set_state(Gst.State.NULL)
    if frames_received:
        negotiation_ok = True
        _ok(f"BGRA appsink pipeline: received {frames_received} frame(s)")
    else:
        _fail("BGRA appsink pipeline ran but produced 0 frames")
except Exception as err:
    _fail(f"Could not build BGRA appsink pipeline: {err}")
# ── 5. Audio output probe ───────────────────────────────────────────────────
_section("5. Audio output probe")
AUDIO_PIPE = "audiotestsrc num-buffers=10 freq=440 ! autoaudiosink"
audio_ok = False

# First choice: let GStreamer pick the sink.
try:
    tone = Gst.parse_launch(AUDIO_PIPE)
    tone.set_state(Gst.State.PLAYING)
    time.sleep(0.8)
    tone.set_state(Gst.State.NULL)
except Exception as err:
    _fail(f"autoaudiosink failed: {err}")
else:
    audio_ok = True
    _ok("autoaudiosink: played 440 Hz tone without error")

# Fallback: talk to ALSA hw:0 directly.
if not audio_ok:
    ALSA_PIPE = "audiotestsrc num-buffers=10 freq=440 ! alsasink device=hw:0"
    try:
        tone = Gst.parse_launch(ALSA_PIPE)
        tone.set_state(Gst.State.PLAYING)
        time.sleep(0.8)
        tone.set_state(Gst.State.NULL)
    except Exception as err:
        _warn(f"alsasink also failed: {err}")
    else:
        audio_ok = True
        _ok("alsasink hw:0: played 440 Hz tone")
# ── 6. Live URL playback (optional) ────────────────────────────────────────
test_url = sys.argv[1] if len(sys.argv) > 1 else None
if not test_url:
try:
with open("/tmp/dlna_last_url.txt") as _f:
test_url = _f.read().strip() or None
if test_url:
print(f"\n [auto] Loaded last DLNA URL from /tmp/dlna_last_url.txt")
except FileNotFoundError:
pass
_section("6. Live URL / file playback")
if not test_url:
_warn("No URL provided — skipping live playback test.")
print(" Pass a media URL or path as the first argument, or start the app and")
print(" play something; the URL will be saved to /tmp/dlna_last_url.txt.")
else:
print(f" URL: {test_url}")
live_frames = 0
live_error = None
try:
# Build the pipeline element-by-element so we can hold a direct
# reference to the appsink (parse_launch embeds it in a bin and
# get_by_name returns None when the bin wraps a nested pipeline string).
pipe = Gst.ElementFactory.make("playbin", "live_player")
vsink = Gst.ElementFactory.make("appsink", "vsink")
if pipe is None or vsink is None:
raise RuntimeError("playbin or appsink not available")
vsink.set_property("emit-signals", True)
vsink.set_property("max-buffers", 2)
vsink.set_property("drop", True)
vsink.set_property("caps", Gst.Caps.from_string("video/x-raw,format=BGRA"))
pipe.set_property("video-sink", vsink)
pipe.set_property("uri", test_url if "://" in test_url else Gst.filename_to_uri(test_url))
def _on_live_sample(sink, *_):
global live_frames
sample = sink.emit("pull-sample")
if sample:
buf = sample.get_buffer()
info = buf.map(Gst.MapFlags.READ)
if info.size > 0:
live_frames += 1
buf.unmap(info)
return Gst.FlowReturn.OK
vsink.connect("new-sample", _on_live_sample)
bus = pipe.get_bus()
pipe.set_state(Gst.State.PLAYING)
deadline = time.monotonic() + 15
while time.monotonic() < deadline:
msg = bus.timed_pop_filtered(
200 * Gst.MSECOND,
Gst.MessageType.ERROR | Gst.MessageType.WARNING | Gst.MessageType.EOS,
)
if msg:
if msg.type == Gst.MessageType.ERROR:
err, debug = msg.parse_error()
live_error = f"{err.message} | debug: {debug}"
break
if msg.type == Gst.MessageType.WARNING:
w, d = msg.parse_warning()
_warn(f"GStreamer warning: {w.message}")
if msg.type == Gst.MessageType.EOS:
break
if live_frames >= 10:
break
pipe.set_state(Gst.State.NULL)
if live_error:
_fail(f"Playback error: {live_error}")
elif live_frames == 0:
_fail("Playback ran but decoded 0 video frames (audio-only or decode failure)")
else:
_ok(f"Decoded {live_frames} video frame(s) successfully")
except Exception as exc:
_fail(f"Could not start live playback pipeline: {exc}")
# ── 7. Summary ─────────────────────────────────────────────────────────────
_section("7. Summary")
issues = []
if missing_required:
    issues.append(f"Missing required elements: {', '.join(missing_required)}")
if not negotiation_ok:
    issues.append("BGRA appsink pipeline negotiation failed")
if not audio_ok:
    issues.append("No working audio sink found")
# Codecs whose absence most commonly breaks playback outright.
_KEY_CODECS = ("avdec_h264", "avdec_aac", "matroskademux", "qtdemux")
key_missing = [codec for codec in _KEY_CODECS if codec in missing_codecs]
if key_missing:
    issues.append(f"Key codecs missing (install gst-libav): {', '.join(key_missing)}")
if issues:
    print(" Issues found:")
    for issue in issues:
        print(issue)
    print()
    print(" Suggested fix:")
    print(textwrap.dedent("""\
/home/ark/miniconda3/bin/conda install -n r36s-dlna-browser \\
-c conda-forge gst-libav gst-plugins-good gst-plugins-bad gst-plugins-ugly
"""))
else:
    print(" All checks passed — video playback should work.")
print()
# ── 8. End-to-end SDL rendering benchmark ─────────────────────────────────
#
# This section replicates what the app does frame-by-frame:
# 1. GStreamer appsink (same videoscale GstBin as the app) — width-only NV12
#    capsfilter so GStreamer preserves the source DAR when choosing height.
# 2. Python memmoves the mapped buffer into a ctypes array            ← timed
# 3. SDL_UpdateNVTexture uploads Y + UV planes into a lazily-created
#    texture whose dimensions match the actual decoded frame.         ← timed
# 4. SDL_RenderCopy blits the texture to the window                   ← timed
#
# Desync and drops will be visible here because we do real SDL rendering.
# Pass --nosection8 to skip if running headless.
SKIP_SDL = "--nosection8" in sys.argv
_section("8. End-to-end SDL render loop (real device output)")
if SKIP_SDL:
    _warn("Skipped (--nosection8 flag)")
elif not test_url:
    # test_url was resolved in section 6 (CLI arg or /tmp/dlna_last_url.txt).
    _warn("Skipped — no URL. Provide a URL as the first argument.")
else:
    SDL8_SECONDS = 20   # how long to run the live render loop
    SDL8_SCALE_W = 640  # width fed into capsfilter; height derived from source DAR
    try:
        # Heavy imports deferred so sections 1-7 still run without pysdl2.
        import ctypes
        import threading
        import statistics
        from dataclasses import dataclass, field as dc_field
        import sdl2
        import sdl2.ext
        # ── SDL init ────────────────────────────────────────────────────────
        # Prefer KMSDRM on the device; SDL will fall back automatically.
        sdl2.SDL_SetHint(b"SDL_VIDEODRIVER", b"kmsdrm,offscreen")
        sdl2.SDL_SetHint(b"SDL_AUDIODRIVER", b"alsa,dummy")
        if sdl2.SDL_Init(sdl2.SDL_INIT_VIDEO | sdl2.SDL_INIT_AUDIO) != 0:
            _fail(f"SDL_Init failed: {sdl2.SDL_GetError().decode()}")
            raise RuntimeError("SDL_Init")  # caught by the RuntimeError handler below
        window = sdl2.SDL_CreateWindow(
            b"R36S playback test",
            sdl2.SDL_WINDOWPOS_UNDEFINED, sdl2.SDL_WINDOWPOS_UNDEFINED,
            SDL8_SCALE_W, SDL8_SCALE_W,  # square hint; KMSDRM uses native res anyway
            sdl2.SDL_WINDOW_FULLSCREEN_DESKTOP | sdl2.SDL_WINDOW_SHOWN,
        )
        if not window:
            _fail(f"SDL_CreateWindow: {sdl2.SDL_GetError().decode()}")
            raise RuntimeError("SDL window")
        renderer = sdl2.SDL_CreateRenderer(
            window, -1,
            sdl2.SDL_RENDERER_ACCELERATED | sdl2.SDL_RENDERER_PRESENTVSYNC,
        )
        if not renderer:
            _warn("HW renderer unavailable — falling back to software renderer")
            renderer = sdl2.SDL_CreateRenderer(window, -1, sdl2.SDL_RENDERER_SOFTWARE)
        if not renderer:
            _fail(f"SDL_CreateRenderer: {sdl2.SDL_GetError().decode()}")
            raise RuntimeError("SDL renderer")
        # Retrieve actual window size (KMSDRM may ignore the requested size).
        w_actual = ctypes.c_int(0)
        h_actual = ctypes.c_int(0)
        sdl2.SDL_GetWindowSize(window, ctypes.byref(w_actual), ctypes.byref(h_actual))
        print(f" SDL window size: {w_actual.value}×{h_actual.value}")
        # Texture is created lazily on the first frame so dimensions match
        # the actual GStreamer output (height is AR-derived, not fixed).
        texture = None
        texture_size = (0, 0)  # (w, h) of the current texture
        _ok(f"SDL init OK — window {w_actual.value}×{h_actual.value} (texture: lazy init)")
        # ── GStreamer pipeline (mirrors _create_appsink) ─────────────────────
        gi.require_version("GstVideo", "1.0")  # re-require is harmless; already done in section 2
        from gi.repository import GstVideo
        pipeline8 = Gst.ElementFactory.make("playbin", "p8")
        appsink8 = Gst.ElementFactory.make("appsink", "vsink8")
        appsink8.set_property("emit-signals", True)
        appsink8.set_property("sync", True)  # real-time pacing (section 6 left sync default)
        appsink8.set_property("max-buffers", 2)
        appsink8.set_property("drop", True)
        # Boost mppvideodec rank if /dev/vpu_service is accessible.
        import os as _os
        _HW_DEVS = ["/dev/vpu_service", "/dev/mpp_service", "/dev/video10"]
        _HW_ELEMS = ["mppvideodec", "v4l2h264dec"]
        _hw_active = False
        for _dev in _HW_DEVS:
            try:
                # Probe: can we open the device R/W? Close immediately.
                _fd = _os.open(_dev, _os.O_RDWR | _os.O_NONBLOCK)
                _os.close(_fd)
                for _name in _HW_ELEMS:
                    _fac = Gst.ElementFactory.find(_name)
                    if _fac:
                        # PRIMARY+1 outranks the libav software decoders.
                        _fac.set_rank(Gst.Rank.PRIMARY + 1)
                        _hw_active = True
                        print(f" [HW] boosted {_name}")
                # NOTE(review): this `break` exits the *device* loop after the
                # first openable device — the original indentation was lost, so
                # confirm this nesting matches the intended behaviour.
                break
            except OSError:
                pass
        # Build videoscale GstBin (nearest-neighbour) → capsfilter → appsink.
        # Only width is fixed in the capsfilter; GStreamer derives height from
        # the source's display aspect ratio (same strategy as _create_appsink).
        video_sink8 = appsink8
        if _hw_active:
            scale8 = Gst.ElementFactory.make("videoscale", "vs8")
            cfilt8 = Gst.ElementFactory.make("capsfilter", "cf8")
            if scale8 and cfilt8:
                scale8.set_property("method", 0)  # nearest-neighbour
                cfilt8.set_property("caps", Gst.Caps.from_string(
                    f"video/x-raw,format=NV12,width={SDL8_SCALE_W}"))
                bin8 = Gst.Bin.new("vscale-bin8")
                bin8.add(scale8); bin8.add(cfilt8); bin8.add(appsink8)
                scale8.link(cfilt8); cfilt8.link(appsink8)
                # Ghost pad exposes the scaler's sink pad as the bin's input.
                sp = scale8.get_static_pad("sink")
                gp = Gst.GhostPad.new("sink", sp)
                gp.set_active(True)
                bin8.add_pad(gp)
                video_sink8 = bin8
                print(f" [pipeline] videoscale(nearest) width={SDL8_SCALE_W} NV12 bin active (height=AR-derived)")
            else:
                # Scaling elements missing: accept NV12 or BGRA straight from playbin.
                appsink8.set_property("caps", Gst.Caps.from_string(
                    "video/x-raw,format=NV12;video/x-raw,format=BGRA"))
        else:
            # Software decode path: request BGRA directly.
            appsink8.set_property("caps", Gst.Caps.from_string(
                "video/x-raw,format=BGRA"))
        pipeline8.set_property("video-sink", video_sink8)
        pipeline8.set_property("uri", test_url if "://" in test_url else Gst.filename_to_uri(_os.path.abspath(test_url)))
        # ── Shared frame buffer ─────────────────────────────────────────────
        @dataclass
        class FrameState:
            """Latest decoded frame plus timing accumulators, shared between
            the GStreamer streaming thread and the SDL render loop (guarded
            by ``lock``)."""
            lock: threading.RLock = dc_field(default_factory=threading.RLock)
            raw_arr: object = None   # ctypes (c_ubyte * n) staging copy of the frame bytes
            raw_arr_size: int = 0    # allocated size of raw_arr, bytes (grows, never shrinks)
            width: int = 0
            height: int = 0
            pitch: int = 0           # stride of plane 0 (Y plane, or BGRA row)
            y_size: int = 0          # byte offset of the UV plane (pitch * height)
            uv_pitch: int = 0        # stride of the NV12 UV plane (0 for BGRA)
            pixel_format: str = "?"
            dirty: bool = False      # True when a new frame is awaiting upload
            # per-frame timing samples (µs)
            memmove_us: list = dc_field(default_factory=list)
            upload_us: list = dc_field(default_factory=list)
            render_us: list = dc_field(default_factory=list)
            frame_wall: list = dc_field(default_factory=list)  # wall time at upload
            frame_count: int = 0
            first_fmt: str = ""      # pixel format of the first decoded frame

        fs = FrameState()
        errors8: list[str] = []      # bus ERROR messages, appended by the bus thread
        eos8 = threading.Event()     # set on EOS/ERROR/quit to stop all loops
        # ── GStreamer callback (runs in GStreamer thread) ────────────────────
        def _on_sample8(sink):
            """Pull a decoded frame, copy it into ``fs.raw_arr`` under
            ``fs.lock`` and record the copy time; the SDL main loop picks it
            up via ``fs.dirty``. Runs on the GStreamer streaming thread."""
            sample = sink.emit("pull-sample")
            if sample is None:
                return Gst.FlowReturn.OK
            buf = sample.get_buffer()
            caps = sample.get_caps()
            if buf is None or caps is None:
                return Gst.FlowReturn.OK
            info8 = GstVideo.VideoInfo.new_from_caps(caps)
            if info8 is None:
                return Gst.FlowReturn.OK
            # Default to BGRA when the format info is missing/unreadable.
            fmt = "BGRA"
            if info8.finfo:
                try:
                    fmt = info8.finfo.name.upper()
                except Exception:
                    pass
            pitch = int(info8.stride[0])
            uv_pitch = int(info8.stride[1]) if fmt == "NV12" else 0
            h = int(info8.height)
            w = int(info8.width)
            y_size = pitch * h  # offset of the UV plane within the buffer
            t0 = time.monotonic()
            ok_map, map_info = buf.map(Gst.MapFlags.READ)
            if not ok_map:
                return Gst.FlowReturn.OK
            try:
                src_size = map_info.size
                with fs.lock:
                    # Grow (never shrink) the staging array to fit this frame.
                    if fs.raw_arr is None or fs.raw_arr_size < src_size:
                        fs.raw_arr = (ctypes.c_ubyte * src_size)()
                        fs.raw_arr_size = src_size
                    ctypes.memmove(fs.raw_arr, map_info.data, src_size)
                    t_copy = (time.monotonic() - t0) * 1e6  # µs, includes map()
                    fs.width = w
                    fs.height = h
                    fs.pitch = pitch
                    fs.uv_pitch = uv_pitch
                    fs.y_size = y_size
                    fs.pixel_format = fmt
                    fs.dirty = True
                    fs.frame_count += 1
                    if not fs.first_fmt:
                        fs.first_fmt = fmt
                        print(f"\n [first frame] fmt={fmt} {w}x{h} "
                              f"stride0={pitch} buf={src_size}")
                    fs.memmove_us.append(t_copy)
            finally:
                buf.unmap(map_info)
            return Gst.FlowReturn.OK
        appsink8.connect("new-sample", _on_sample8)

        # ── Bus thread ───────────────────────────────────────────────────────
        def _bus8():
            """Watch the pipeline bus; set ``eos8`` on ERROR or EOS so both
            this thread and the SDL render loop shut down."""
            bus = pipeline8.get_bus()
            while not eos8.is_set():
                msg = bus.timed_pop_filtered(
                    200 * Gst.MSECOND,
                    Gst.MessageType.ERROR | Gst.MessageType.EOS,
                )
                if msg is None:
                    continue
                if msg.type == Gst.MessageType.ERROR:
                    err, dbg = msg.parse_error()
                    errors8.append(f"{err.message} | {dbg}")
                    print(f"\n [bus] ERROR: {err.message}")
                    eos8.set()
                elif msg.type == Gst.MessageType.EOS:
                    print("\n [bus] EOS")
                    eos8.set()

        bth8 = threading.Thread(target=_bus8, daemon=True)
        bth8.start()
        pipeline8.set_state(Gst.State.PLAYING)
        print(f" Running SDL render loop for {SDL8_SECONDS}s …")
        print(" (close window with Escape or Q, or wait for timeout)\n")
        # ── SDL render loop (runs on main thread) ───────────────────────────
        WARMUP = 5  # first frames excluded from the timing statistics
        deadline8 = time.monotonic() + SDL8_SECONDS
        frame_n = 0
        while time.monotonic() < deadline8 and not eos8.is_set():
            # Drain SDL events (allows Escape / Q to quit).
            ev = sdl2.SDL_Event()
            while sdl2.SDL_PollEvent(ctypes.byref(ev)):
                if ev.type == sdl2.SDL_QUIT:
                    eos8.set()
                elif ev.type == sdl2.SDL_KEYDOWN:
                    sym = ev.key.keysym.sym
                    if sym in (sdl2.SDLK_ESCAPE, sdl2.SDLK_q):
                        eos8.set()
            # Upload + render if a new frame is ready.
            # NOTE(review): fs.lock is held for the whole upload+render below,
            # so the GStreamer callback blocks meanwhile — confirm this is the
            # intended timing-isolation behaviour rather than a bottleneck.
            with fs.lock:
                if not fs.dirty or fs.raw_arr is None:
                    pass
                else:
                    w8 = fs.width; h8 = fs.height
                    pitch8 = fs.pitch
                    uv_pitch8 = fs.uv_pitch
                    y_size8 = fs.y_size
                    fmt8 = fs.pixel_format
                    arr8 = fs.raw_arr
                    fs.dirty = False
                    frame_n += 1
                    # --- Lazy texture creation / resize ---
                    if texture_size != (w8, h8):
                        if texture:
                            sdl2.SDL_DestroyTexture(texture)
                        texture = sdl2.SDL_CreateTexture(
                            renderer,
                            sdl2.SDL_PIXELFORMAT_NV12,
                            sdl2.SDL_TEXTUREACCESS_STREAMING,
                            w8, h8,
                        )
                        texture_size = (w8, h8)
                        print(f" [texture] created {w8}×{h8} NV12 (AR={w8/h8:.3f})")
                    # --- SDL_UpdateNVTexture upload ---
                    t_up0 = time.monotonic()
                    if fmt8 == "NV12" and y_size8 > 0 and texture:
                        # Y plane starts at offset 0, UV plane at y_size8.
                        y_ptr = ctypes.cast(arr8, ctypes.POINTER(ctypes.c_ubyte))
                        uv_ptr = ctypes.cast(
                            ctypes.byref(arr8, y_size8),
                            ctypes.POINTER(ctypes.c_ubyte),
                        )
                        sdl2.SDL_UpdateNVTexture(
                            texture, None, y_ptr, pitch8, uv_ptr, uv_pitch8,
                        )
                    elif texture:
                        # BGRA fallback (SW decode path)
                        pix = ctypes.cast(arr8, ctypes.POINTER(ctypes.c_ubyte))
                        sdl2.SDL_UpdateTexture(texture, None, pix, pitch8)
                    t_upload = (time.monotonic() - t_up0) * 1e6
                    # --- SDL_RenderCopy (letterbox into window) ---
                    t_r0 = time.monotonic()
                    sdl2.SDL_RenderClear(renderer)
                    if texture:
                        # Fit frame into window preserving AR (letterbox).
                        win_w, win_h = w_actual.value, h_actual.value
                        scale = min(win_w / w8, win_h / h8) if w8 > 0 and h8 > 0 else 1.0
                        dw = max(1, int(w8 * scale))
                        dh = max(1, int(h8 * scale))
                        dx = (win_w - dw) // 2
                        dy = (win_h - dh) // 2
                        dst = sdl2.SDL_Rect(dx, dy, dw, dh)
                        sdl2.SDL_RenderCopy(renderer, texture, None, dst)
                    sdl2.SDL_RenderPresent(renderer)
                    t_render = (time.monotonic() - t_r0) * 1e6
                    wall_now = time.monotonic()
                    if frame_n > WARMUP:
                        fs.upload_us.append(t_upload)
                        fs.render_us.append(t_render)
                        fs.frame_wall.append(wall_now)
            time.sleep(0.001)  # yield to GStreamer thread
        # ── Teardown ────────────────────────────────────────────────────────
        pipeline8.set_state(Gst.State.NULL)
        eos8.set()
        if texture:
            sdl2.SDL_DestroyTexture(texture)
        sdl2.SDL_DestroyRenderer(renderer)
        sdl2.SDL_DestroyWindow(window)
        sdl2.SDL_Quit()
        # ── Section 8 report ────────────────────────────────────────────────
        print()
        print(" --- Section 8 Timing Report ---")
        print(f" Total GStreamer frames decoded : {fs.frame_count}")
        print(f" Frames rendered (excl warmup) : {len(fs.upload_us)}")
        print(f" Pixel format seen : {fs.first_fmt or '?'}")
        budget = 1_000_000 / 24  # µs per frame @ 24fps nominal

        def _stat(label, samples_us):
            """Print mean/max of *samples_us* (µs) and mean as % of budget."""
            if not samples_us:
                print(f" {label:38s}: no samples")
                return
            mn = statistics.mean(samples_us)
            mx = max(samples_us)
            pct = mn / budget * 100
            print(f" {label:38s}: mean {mn:6.0f} µs max {mx:6.0f} µs ({pct:.1f}% budget)")

        _stat("memmove (GStreamer thread)", fs.memmove_us[WARMUP:] if len(fs.memmove_us) > WARMUP else fs.memmove_us)
        _stat("SDL_UpdateNVTexture (main thread)", fs.upload_us)
        _stat("SDL_RenderCopy+Present (main thread)", fs.render_us)
        if len(fs.frame_wall) >= 2:
            intervals = [fs.frame_wall[i+1] - fs.frame_wall[i]
                         for i in range(len(fs.frame_wall) - 1)]
            elapsed = fs.frame_wall[-1] - fs.frame_wall[0]
            fps_act = (len(fs.frame_wall) - 1) / elapsed if elapsed > 0 else 0
            dropped = sum(1 for iv in intervals if iv > 0.080)  # gap > 80 ms counts as a drop
            jitter = statistics.stdev(intervals) * 1000 if len(intervals) > 1 else 0
            print(f" {'Rendered FPS':38s}: {fps_act:.2f} (jitter {jitter:.1f} ms, dropped {dropped})")
        if errors8:
            for e in errors8:
                _fail(f"GStreamer: {e}")
        total_mean = (
            (statistics.mean(fs.memmove_us[WARMUP:]) if len(fs.memmove_us) > WARMUP else 0) +
            (statistics.mean(fs.upload_us) if fs.upload_us else 0) +
            (statistics.mean(fs.render_us) if fs.render_us else 0)
        )
        print(f" {'TOTAL (copy+upload+render)':38s}: {total_mean:.0f} µs ({total_mean/budget*100:.1f}% of 41.7ms budget)")
        # NOTE(review): fps_act / dropped are only bound inside the
        # `len(fs.frame_wall) >= 2` branch above; with fewer than two rendered
        # frames the checks below raise NameError, which neither except clause
        # below catches. Confirm intended nesting or add a guard.
        if fps_act < 22:
            _fail(f"FPS too low ({fps_act:.2f}) — check timing breakdown above for bottleneck")
        elif dropped > 5:
            _warn(f"{dropped} dropped frames — pipeline may be too slow under SDL load")
        else:
            _ok(f"SDL render loop healthy: {fps_act:.2f} fps, {dropped} dropped")
    except RuntimeError:
        # SDL init/window/renderer failure — the message was already printed
        # by _fail() right before the raise.
        pass
    except ImportError as exc:
        _warn(f"sdl2 Python bindings not available: {exc}")
        _warn("Install: conda install -c conda-forge pysdl2")
print()