Browse Source

perf: replace extract_dup+from_buffer_copy with buffer.map+memmove zero-copy

Instead of extract_dup (GLib alloc+memcpy → Python bytes) followed by
from_buffer_copy (Python bytes → ctypes array) — two 3MB copies per frame —
use Gst.Buffer.map(READ) to get a zero-allocation pointer to the decoded
frame memory, then memmove directly into a pre-allocated reusable ctypes
array (_raw_arr).

This reduces the per-frame copy path from 2 copies (6MB) to 1 memmove
(3MB), with no Python bytes object allocation at all.  The memmove happens
under _frame_lock so render() on the main thread never reads a partial frame.
_raw_arr is allocated on the first frame, re-allocated only when a later
frame is larger than the current buffer (e.g. a switch to a higher
resolution), and otherwise reused for every subsequent frame.  It is dropped
when the backend's frame state is reset.

_Frame no longer carries a pixels field.  Tests updated accordingly.
Benchmark updated to use the same buffer.map+memmove path as the app.
main
Matteo Benedetto 3 months ago
parent
commit
da02e7446f
  1. 122
      src/r36s_dlna_browser/player/gstreamer_backend.py
  2. 35
      tests/benchmark_nv12_decode.py
  3. 23
      tests/test_player.py
  4. 1247
      uv.lock

122
src/r36s_dlna_browser/player/gstreamer_backend.py

@ -145,14 +145,12 @@ class _Frame:
width: int
height: int
pitch: int
pixels: bytes
pixel_format: str = "BGRA" # "BGRA" or "NV12"
# For NV12: pixels holds the FULL raw buffer (Y + interleaved UV); y_size is
# the byte offset where the UV plane starts. uv_pitch is the UV plane stride.
# Storing the whole buffer in one object avoids two separate bytes slices (each
# a copy) and lets render() do a single from_buffer_copy instead of two.
# For NV12: y_size is the byte offset where the UV plane starts.
y_size: int = 0
uv_pitch: int = 0
# Total raw buffer size (Y+UV for NV12, full frame for BGRA).
buf_size: int = 0
class GStreamerBackend(PlayerBackend):
@ -197,6 +195,10 @@ class GStreamerBackend(PlayerBackend):
self._resolution = ""
self._hw_decoders: list | None = None # None = not yet probed
self._frame_count = 0 # total frames decoded
# Reusable ctypes buffer that receives each decoded frame (NV12 or BGRA) via a single memmove.
# Allocated on the first frame and grown only when a larger frame arrives; otherwise reused to avoid per-frame allocation.
self._raw_arr: ctypes.Array | None = None
self._raw_arr_size: int = 0
def attach_window(self, window: object) -> None:
self._window = window
@ -248,24 +250,15 @@ class GStreamerBackend(PlayerBackend):
if self._frame_dirty:
if frame.pixel_format == "NV12" and frame.y_size > 0:
# NV12 upload via SDL_UpdateNVTexture.
# ONE from_buffer_copy of the full Y+UV buffer, then use
# ctypes.byref(arr, offset) to address Y at 0 and UV at y_size.
# This avoids the two extra bytes slices that were previously
# created in _on_new_sample, cutting per-frame copies from 5 to 2.
# _raw_arr already holds the frame data from _on_new_sample's
# memmove — pass pointers directly, no per-frame allocation.
try:
raw_buf = frame.pixels
arr = (ctypes.c_ubyte * len(raw_buf)).from_buffer_copy(raw_buf)
arr = self._raw_arr
y_ptr = ctypes.cast(arr, ctypes.POINTER(ctypes.c_ubyte))
uv_ptr = ctypes.cast(
ctypes.byref(arr, frame.y_size),
ctypes.POINTER(ctypes.c_ubyte),
)
log.debug(
"SDL_UpdateNVTexture: %dx%d buf=%d y_size=%d pitch=%d uv_pitch=%d",
frame.width, frame.height,
len(raw_buf), frame.y_size,
frame.pitch, frame.uv_pitch,
)
result = sdl2.SDL_UpdateNVTexture(
self._texture, None,
y_ptr, frame.pitch,
@ -279,7 +272,7 @@ class GStreamerBackend(PlayerBackend):
)
return False
else:
pixel_buffer = ctypes.create_string_buffer(frame.pixels)
pixel_buffer = ctypes.cast(self._raw_arr, ctypes.POINTER(ctypes.c_ubyte))
result = sdl2.SDL_UpdateTexture(self._texture, None, pixel_buffer, frame.pitch)
if result != 0:
log.error(
@ -465,49 +458,62 @@ class GStreamerBackend(PlayerBackend):
pass
if fmt_str == "NV12":
# NV12: Y plane (stride[0]) followed immediately by interleaved UV plane (stride[1]).
# Store the WHOLE raw buffer in pixels without slicing — slicing bytes
# creates two extra copies (2 MB + 1 MB) that we can avoid. render()
# uses a single from_buffer_copy of the full buffer and ctypes.byref to
# address the UV plane at the y_size byte offset.
pitch = int(info.stride[0])
# NV12: Y plane (stride[0]) followed by interleaved UV plane (stride[1]).
pitch = int(info.stride[0])
uv_pitch = int(info.stride[1])
y_size = pitch * height
raw = buffer.extract_dup(0, buffer.get_size())
if self._frame_count == 0:
log.info(
"First NV12 frame: %dx%d y_pitch=%d uv_pitch=%d "
"y_size=%d buf_total=%d",
width, height, pitch, uv_pitch, y_size, len(raw),
)
frame = _Frame(
width=width, height=height,
pitch=pitch, pixels=raw,
pixel_format="NV12",
y_size=y_size, uv_pitch=uv_pitch,
)
y_size = pitch * height
buf_size = buffer.get_size()
else:
pitch = int(info.stride[0]) if info.stride else width * 4
pixels = buffer.extract_dup(0, buffer.get_size())
if self._frame_count == 0:
log.info(
"First %s frame: %dx%d pitch=%d buf_total=%d",
fmt_str, width, height, pitch, buffer.get_size(),
)
frame = _Frame(width=width, height=height, pitch=pitch, pixels=pixels)
uv_pitch = 0
y_size = 0
buf_size = buffer.get_size()
with self._frame_lock:
self._frame_count += 1
if self._frame_count <= 3 or self._frame_count % 300 == 0:
log.debug("Frame #%d fmt=%s %dx%d", self._frame_count, fmt_str, width, height)
prev_res = self._resolution
self._latest_frame = frame
self._frame_dirty = True
if resolution != prev_res:
self._resolution = resolution
if prev_res:
log.info("Resolution changed: %s -> %s", prev_res, resolution)
self._event_callback("resolution", resolution)
# Map the GStreamer buffer (zero-copy access to decoded frame memory).
ok, map_info = buffer.map(self._gst.MapFlags.READ)
if not ok:
return self._flow_ok()
try:
src_size = map_info.size
# memmove into _raw_arr inside the frame lock so render() never
# reads a partially-written buffer from the main thread.
with self._frame_lock:
if self._raw_arr is None or self._raw_arr_size < src_size:
self._raw_arr = (ctypes.c_ubyte * src_size)()
self._raw_arr_size = src_size
ctypes.memmove(self._raw_arr, map_info.data, src_size)
self._frame_count += 1
if self._frame_count == 1:
log.info(
"First %s frame: %dx%d pitch=%d uv_pitch=%d "
"y_size=%d buf_total=%d (alloc ctypes buf %d)",
fmt_str, width, height, pitch, uv_pitch,
y_size, buf_size, src_size,
)
elif self._frame_count <= 3 or self._frame_count % 300 == 0:
log.debug("Frame #%d fmt=%s %dx%d", self._frame_count, fmt_str, width, height)
if fmt_str == "NV12":
frame = _Frame(
width=width, height=height, pitch=pitch,
pixel_format="NV12",
y_size=y_size, uv_pitch=uv_pitch, buf_size=buf_size,
)
else:
frame = _Frame(width=width, height=height, pitch=pitch,
buf_size=buf_size)
prev_res = self._resolution
self._latest_frame = frame
self._frame_dirty = True
if resolution != prev_res:
self._resolution = resolution
if prev_res:
log.info("Resolution changed: %s -> %s", prev_res, resolution)
self._event_callback("resolution", resolution)
finally:
buffer.unmap(map_info)
return self._flow_ok()
@ -647,6 +653,8 @@ class GStreamerBackend(PlayerBackend):
self._frame_dirty = False
self._resolution = ""
self._frame_count = 0
self._raw_arr = None
self._raw_arr_size = 0
self._destroy_texture()
def _set_playing(self, value: bool, notify: bool) -> None:

35
tests/benchmark_nv12_decode.py

@ -153,6 +153,8 @@ class Stats:
lock: threading.Lock = field(default_factory=threading.Lock)
stats = Stats()
stats._raw_arr = None
stats._raw_arr_size = 0
# ── Callback ────────────────────────────────────────────────────────────────
@ -183,23 +185,22 @@ def _on_sample(sink) -> Gst.FlowReturn:
except Exception:
pass
# Measure extract_dup (GStreamer buf → Python bytes) + from_buffer_copy
# (Python bytes → ctypes array for SDL upload). del objects immediately
# after timing so CPython's ref-counting frees the 3+ MB allocations at
# once rather than letting them accumulate across frames (OOM on 1 GB device).
# Measure buffer.map(READ) + memmove into a pre-allocated ctypes array
# (same path as the app). Reuse a single ctypes array across frames to
# avoid per-frame allocation. del is not needed — ctypes array is reused.
t0 = time.monotonic()
raw = buf.extract_dup(0, buf.get_size())
extract_us = (time.monotonic() - t0) * 1e6
if fmt_str == "NV12":
y_size = int(info.stride[0]) * int(info.height)
t1 = time.monotonic()
arr = (ctypes.c_ubyte * len(raw)).from_buffer_copy(raw)
copy_us = extract_us + (time.monotonic() - t1) * 1e6
del arr # free 3 MB ctypes array immediately
else:
copy_us = extract_us
del raw # free 3 MB bytes object immediately
ok, map_info = buf.map(Gst.MapFlags.READ)
if not ok:
return Gst.FlowReturn.OK
try:
src_size = map_info.size
if not hasattr(stats, '_raw_arr') or stats._raw_arr_size < src_size:
stats._raw_arr = (ctypes.c_ubyte * src_size)()
stats._raw_arr_size = src_size
ctypes.memmove(stats._raw_arr, map_info.data, src_size)
copy_us = (time.monotonic() - t0) * 1e6
finally:
buf.unmap(map_info)
with stats.lock:
stats.total_frames += 1
@ -341,7 +342,7 @@ else:
if copy_us:
mean_copy = statistics.mean(copy_us)
max_copy = max(copy_us)
print(f"\n --- CPU copy cost (from_buffer_copy) ---")
print(f"\n --- CPU copy cost (buffer.map + memmove) ---")
print(f" Mean copy time : {mean_copy:.0f} µs")
print(f" Max copy time : {max_copy:.0f} µs")
budget_us = 1_000_000 / (actual_fps if len(wall_times) >= 2 and actual_fps > 0 else 30)

23
tests/test_player.py

@ -13,6 +13,7 @@ class FakeMessageType:
EOS = 2
BUFFERING = 4
STATE_CHANGED = 8
WARNING = 16
class FakeFlowReturn:
@ -64,6 +65,12 @@ class FakeStructure:
return self._width if name == "width" else self._height
class FakeMapInfo:
def __init__(self, data: bytes):
self.data = data
self.size = len(data)
class FakeBuffer:
def __init__(self, payload: bytes):
self._payload = payload
@ -74,6 +81,12 @@ class FakeBuffer:
def extract_dup(self, _offset, _size):
return self._payload
def map(self, _flags):
return True, FakeMapInfo(self._payload)
def unmap(self, _map_info):
pass
class FakeSample:
def __init__(self, width=1280, height=720, payload: bytes | None = None):
@ -176,6 +189,10 @@ class FakeMessage:
return SimpleNamespace(get_name=lambda: self._structure_name)
class FakeMapFlags:
READ = 1
class FakeGst:
State = FakeState
StateChangeReturn = FakeStateChangeReturn
@ -183,6 +200,7 @@ class FakeGst:
SeekFlags = FakeSeekFlags
MessageType = FakeMessageType
FlowReturn = FakeFlowReturn
MapFlags = FakeMapFlags
SECOND = 1_000_000_000
MSECOND = 1_000_000
Caps = SimpleNamespace(from_string=lambda value: value)
@ -352,7 +370,10 @@ class TestGStreamerBackend:
def test_render_uploads_latest_frame_and_clears_dirty_flag(self, monkeypatch):
backend, _pipeline, _sink = self._make_backend()
backend._latest_frame = SimpleNamespace(width=320, height=180, pitch=1280, pixels=b"\x00" * (320 * 180 * 4), pixel_format="BGRA", uv_pixels=None, uv_pitch=0)
raw_data = b"\x00" * (320 * 180 * 4)
backend._raw_arr = (ctypes.c_ubyte * len(raw_data)).from_buffer_copy(raw_data)
backend._raw_arr_size = len(raw_data)
backend._latest_frame = SimpleNamespace(width=320, height=180, pitch=1280, pixel_format="BGRA", y_size=0, uv_pitch=0, buf_size=len(raw_data))
backend._frame_dirty = True
calls = []

1247
uv.lock

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save