You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
389 lines
11 KiB
389 lines
11 KiB
"""Tests for the SDL-rendered GStreamer backend without using a real pipeline.""" |
|
|
|
import ctypes |
|
from types import SimpleNamespace |
|
|
|
import sdl2 |
|
|
|
from r36s_dlna_browser.player.gstreamer_backend import GStreamerBackend |
|
|
|
|
|
class FakeMessageType: |
|
ERROR = 1 |
|
EOS = 2 |
|
BUFFERING = 4 |
|
STATE_CHANGED = 8 |
|
WARNING = 16 |
|
|
|
|
|
class FakeFlowReturn: |
|
OK = 0 |
|
|
|
|
|
class FakeState: |
|
NULL = 0 |
|
READY = 1 |
|
PAUSED = 2 |
|
PLAYING = 3 |
|
|
|
|
|
class FakeStateChangeReturn: |
|
FAILURE = -1 |
|
SUCCESS = 0 |
|
|
|
|
|
class FakeFormat: |
|
TIME = 1 |
|
|
|
|
|
class FakeSeekFlags: |
|
FLUSH = 1 |
|
KEY_UNIT = 2 |
|
|
|
|
|
class FakeCaps: |
|
def __init__(self, width, height): |
|
self._width = width |
|
self._height = height |
|
|
|
def get_size(self): |
|
return 1 |
|
|
|
def get_structure(self, _index): |
|
return FakeStructure(self._width, self._height) |
|
|
|
|
|
class FakeStructure: |
|
def __init__(self, width, height): |
|
self._width = width |
|
self._height = height |
|
|
|
def has_field(self, name): |
|
return name in {"width", "height"} |
|
|
|
def get_value(self, name): |
|
return self._width if name == "width" else self._height |
|
|
|
|
|
class FakeMapInfo: |
|
def __init__(self, data: bytes): |
|
self.data = data |
|
self.size = len(data) |
|
|
|
|
|
class FakeBuffer: |
|
def __init__(self, payload: bytes): |
|
self._payload = payload |
|
|
|
def get_size(self): |
|
return len(self._payload) |
|
|
|
def extract_dup(self, _offset, _size): |
|
return self._payload |
|
|
|
def map(self, _flags): |
|
return True, FakeMapInfo(self._payload) |
|
|
|
def unmap(self, _map_info): |
|
pass |
|
|
|
|
|
class FakeSample: |
|
def __init__(self, width=1280, height=720, payload: bytes | None = None): |
|
self._caps = FakeCaps(width, height) |
|
self._buffer = FakeBuffer(payload or (b"\x00" * (width * height * 4))) |
|
|
|
def get_caps(self): |
|
return self._caps |
|
|
|
def get_buffer(self): |
|
return self._buffer |
|
|
|
|
|
class FakeAppSink: |
|
def __init__(self): |
|
self.props = {} |
|
self.connected = [] |
|
self.sample = None |
|
|
|
def set_property(self, name, value): |
|
self.props[name] = value |
|
|
|
def connect(self, signal_name, callback): |
|
self.connected.append((signal_name, callback)) |
|
|
|
def emit(self, signal_name): |
|
assert signal_name == "pull-sample" |
|
return self.sample |
|
|
|
|
|
class FakeBus: |
|
def __init__(self): |
|
self.message = None |
|
|
|
def timed_pop_filtered(self, *_args): |
|
msg, self.message = self.message, None |
|
return msg |
|
|
|
|
|
class FakePipeline: |
|
def __init__(self): |
|
self.props = {} |
|
self.state = FakeState.NULL |
|
self.uri = None |
|
self.position = 12 * 1_000_000_000 |
|
self.duration = 93 * 1_000_000_000 |
|
self.bus = FakeBus() |
|
self.seek_calls = [] |
|
|
|
def set_state(self, state): |
|
self.state = state |
|
return FakeStateChangeReturn.SUCCESS |
|
|
|
def set_property(self, name, value): |
|
self.props[name] = value |
|
if name == "uri": |
|
self.uri = value |
|
|
|
def get_property(self, name): |
|
if name == "volume": |
|
return self.props.get("volume", 1.0) |
|
return self.props.get(name) |
|
|
|
def get_bus(self): |
|
return self.bus |
|
|
|
def query_position(self, _format): |
|
return True, self.position |
|
|
|
def query_duration(self, _format): |
|
return True, self.duration |
|
|
|
def seek_simple(self, _format, flags, target): |
|
self.seek_calls.append((flags, target)) |
|
self.position = target |
|
return True |
|
|
|
|
|
class FakeMessage: |
|
def __init__(self, msg_type, src=None, buffering=0, state=None, error_text="boom", structure_name=None): |
|
self.type = msg_type |
|
self.src = src |
|
self._buffering = buffering |
|
self._state = state or (FakeState.NULL, FakeState.PLAYING, FakeState.NULL) |
|
self._error_text = error_text |
|
self._structure_name = structure_name |
|
|
|
def parse_buffering(self): |
|
return self._buffering |
|
|
|
def parse_state_changed(self): |
|
return self._state |
|
|
|
def parse_error(self): |
|
return SimpleNamespace(message=self._error_text), None |
|
|
|
def get_structure(self): |
|
if not self._structure_name: |
|
return None |
|
return SimpleNamespace(get_name=lambda: self._structure_name) |
|
|
|
|
|
class FakeMapFlags: |
|
READ = 1 |
|
|
|
|
|
class FakeGst: |
|
State = FakeState |
|
StateChangeReturn = FakeStateChangeReturn |
|
Format = FakeFormat |
|
SeekFlags = FakeSeekFlags |
|
MessageType = FakeMessageType |
|
FlowReturn = FakeFlowReturn |
|
MapFlags = FakeMapFlags |
|
SECOND = 1_000_000_000 |
|
MSECOND = 1_000_000 |
|
Caps = SimpleNamespace(from_string=lambda value: value) |
|
|
|
|
|
class _FakeFinfo: |
|
def __init__(self, name): |
|
self.name = name |
|
|
|
|
|
class FakeVideoInfoValue: |
|
def __init__(self, width, height, stride, fmt="BGRA"): |
|
self.width = width |
|
self.height = height |
|
self.stride = [stride, stride] |
|
self.finfo = _FakeFinfo(fmt) |
|
|
|
|
|
class FakeVideoInfo: |
|
@staticmethod |
|
def new_from_caps(caps): |
|
structure = caps.get_structure(0) |
|
width = structure.get_value("width") |
|
height = structure.get_value("height") |
|
fmt = "BGRA" |
|
try: |
|
fmt = structure.get_value("format") or fmt |
|
except Exception: |
|
pass |
|
return FakeVideoInfoValue(width, height, width * 4, fmt) |
|
|
|
|
|
class FakeGstVideo: |
|
VideoInfo = FakeVideoInfo |
|
|
|
|
|
class TestGStreamerBackend: |
|
def _make_backend(self): |
|
pipeline = FakePipeline() |
|
sink = FakeAppSink() |
|
backend = GStreamerBackend( |
|
gst_module=FakeGst, |
|
gst_video_module=FakeGstVideo, |
|
appsink_factory=lambda: sink, |
|
playbin_factory=lambda: pipeline, |
|
subsystem="x11", |
|
) |
|
backend._start_bus_thread = lambda: None |
|
backend._stop_bus_thread = lambda: None |
|
return backend, pipeline, sink |
|
|
|
def test_not_playing_initially(self): |
|
backend, _pipeline, _sink = self._make_backend() |
|
assert not backend.is_playing() |
|
|
|
def test_play_sets_uri_and_configures_appsink(self): |
|
backend, pipeline, sink = self._make_backend() |
|
|
|
backend.play("http://example.com/video.mp4") |
|
|
|
assert backend.is_playing() |
|
assert pipeline.uri == "http://example.com/video.mp4" |
|
assert pipeline.props["video-sink"] is sink |
|
assert sink.props["emit-signals"] is True |
|
assert sink.props["caps"] == "video/x-raw,format=BGRA" |
|
|
|
def test_new_sample_marks_frame_dirty_and_updates_resolution(self): |
|
events = [] |
|
backend, _pipeline, sink = self._make_backend() |
|
backend.set_event_callback(lambda event, *args: events.append((event, *args))) |
|
sink.sample = FakeSample(width=640, height=360) |
|
|
|
assert backend._on_new_sample(sink) == FakeFlowReturn.OK |
|
|
|
assert backend.has_new_frame() is True |
|
assert ("resolution", "640x360") in events |
|
|
|
def test_toggle_pause_switches_pipeline_state(self): |
|
backend, pipeline, _sink = self._make_backend() |
|
backend.play("http://example.com/video.mp4") |
|
|
|
assert backend.toggle_pause() is True |
|
assert pipeline.state == FakeState.PAUSED |
|
assert backend.toggle_pause() is False |
|
assert pipeline.state == FakeState.PLAYING |
|
|
|
def test_seek_uses_relative_position(self): |
|
backend, pipeline, _sink = self._make_backend() |
|
backend.play("http://example.com/video.mp4") |
|
|
|
backend.seek(10) |
|
|
|
assert pipeline.seek_calls == [(FakeSeekFlags.FLUSH | FakeSeekFlags.KEY_UNIT, 22 * 1_000_000_000)] |
|
|
|
def test_change_volume_clamps_to_supported_range(self): |
|
backend, pipeline, _sink = self._make_backend() |
|
backend.play("http://example.com/video.mp4") |
|
|
|
assert backend.change_volume(15) == 115 |
|
assert pipeline.props["volume"] == 1.15 |
|
assert backend.change_volume(50) == 130 |
|
assert pipeline.props["volume"] == 1.3 |
|
|
|
def test_eos_notifies_stopped(self): |
|
events = [] |
|
backend, pipeline, _sink = self._make_backend() |
|
backend.set_event_callback(lambda event, *args: events.append((event, *args))) |
|
backend.play("http://example.com/video.mp4") |
|
|
|
backend._handle_bus_message(FakeMessage(FakeMessageType.EOS, src=pipeline)) |
|
|
|
assert ("stopped",) in events |
|
assert not backend.is_playing() |
|
assert pipeline.state == FakeState.NULL |
|
|
|
def test_buffering_and_metrics_feed_hud(self): |
|
events = [] |
|
backend, pipeline, _sink = self._make_backend() |
|
backend.set_event_callback(lambda event, *args: events.append((event, *args))) |
|
backend.play("http://example.com/video.mp4") |
|
backend._resolution = "1280x720" |
|
|
|
backend._handle_bus_message(FakeMessage(FakeMessageType.BUFFERING, buffering=76)) |
|
backend._emit_playback_metrics() |
|
|
|
assert ("buffering", 76) in events |
|
assert ("position", 12.0) in events |
|
assert ("duration", 93.0) in events |
|
assert ("volume", 100) in events |
|
assert ("resolution", "1280x720") in events |
|
|
|
def test_state_changed_updates_pause_event(self): |
|
events = [] |
|
backend, pipeline, _sink = self._make_backend() |
|
backend.set_event_callback(lambda event, *args: events.append((event, *args))) |
|
backend.play("http://example.com/video.mp4") |
|
|
|
backend._handle_bus_message( |
|
FakeMessage( |
|
FakeMessageType.STATE_CHANGED, |
|
src=pipeline, |
|
state=(FakeState.PLAYING, FakeState.PAUSED, FakeState.NULL), |
|
) |
|
) |
|
|
|
assert ("paused", True) in events |
|
|
|
def test_error_message_propagates(self): |
|
events = [] |
|
backend, pipeline, _sink = self._make_backend() |
|
backend.set_event_callback(lambda event, *args: events.append((event, *args))) |
|
backend.play("http://example.com/video.mp4") |
|
|
|
backend._handle_bus_message(FakeMessage(FakeMessageType.ERROR, src=pipeline, error_text="decoder fail")) |
|
|
|
assert ("error", "decoder fail") in events |
|
assert not backend.is_playing() |
|
|
|
def test_stop_sets_pipeline_to_null(self): |
|
backend, pipeline, _sink = self._make_backend() |
|
backend.play("http://example.com/video.mp4") |
|
|
|
backend.stop() |
|
|
|
assert pipeline.state == FakeState.NULL |
|
assert not backend.is_playing() |
|
|
|
def test_render_uploads_latest_frame_and_clears_dirty_flag(self, monkeypatch): |
|
backend, _pipeline, _sink = self._make_backend() |
|
raw_data = b"\x00" * (320 * 180 * 4) |
|
backend._raw_arr = (ctypes.c_ubyte * len(raw_data)).from_buffer_copy(raw_data) |
|
backend._raw_arr_size = len(raw_data) |
|
backend._latest_frame = SimpleNamespace(width=320, height=180, pitch=1280, pixel_format="BGRA", y_size=0, uv_pitch=0, buf_size=len(raw_data)) |
|
backend._frame_dirty = True |
|
|
|
calls = [] |
|
|
|
monkeypatch.setattr(sdl2, "SDL_CreateTexture", lambda *_args: object()) |
|
monkeypatch.setattr(sdl2, "SDL_UpdateTexture", lambda *_args: 0) |
|
monkeypatch.setattr(sdl2, "SDL_RenderCopy", lambda _renderer, _texture, _src, dst: calls.append((dst.x, dst.y, dst.w, dst.h))) |
|
|
|
backend.set_viewport(640, 480, 48, 80, 12, 12) |
|
|
|
assert backend.render(object()) is True |
|
assert backend.has_new_frame() is False |
|
assert calls == [(12, 51, 616, 346)]
|
|
|