"""Tests for the SDL-rendered GStreamer backend without using a real pipeline.""" import ctypes from types import SimpleNamespace import sdl2 from r36s_dlna_browser.player.gstreamer_backend import GStreamerBackend class FakeMessageType: ERROR = 1 EOS = 2 BUFFERING = 4 STATE_CHANGED = 8 WARNING = 16 class FakeFlowReturn: OK = 0 class FakeState: NULL = 0 READY = 1 PAUSED = 2 PLAYING = 3 class FakeStateChangeReturn: FAILURE = -1 SUCCESS = 0 class FakeFormat: TIME = 1 class FakeSeekFlags: FLUSH = 1 KEY_UNIT = 2 class FakeCaps: def __init__(self, width, height): self._width = width self._height = height def get_size(self): return 1 def get_structure(self, _index): return FakeStructure(self._width, self._height) class FakeStructure: def __init__(self, width, height): self._width = width self._height = height def has_field(self, name): return name in {"width", "height"} def get_value(self, name): return self._width if name == "width" else self._height class FakeMapInfo: def __init__(self, data: bytes): self.data = data self.size = len(data) class FakeBuffer: def __init__(self, payload: bytes): self._payload = payload def get_size(self): return len(self._payload) def extract_dup(self, _offset, _size): return self._payload def map(self, _flags): return True, FakeMapInfo(self._payload) def unmap(self, _map_info): pass class FakeSample: def __init__(self, width=1280, height=720, payload: bytes | None = None): self._caps = FakeCaps(width, height) self._buffer = FakeBuffer(payload or (b"\x00" * (width * height * 4))) def get_caps(self): return self._caps def get_buffer(self): return self._buffer class FakeAppSink: def __init__(self): self.props = {} self.connected = [] self.sample = None def set_property(self, name, value): self.props[name] = value def connect(self, signal_name, callback): self.connected.append((signal_name, callback)) def emit(self, signal_name): assert signal_name == "pull-sample" return self.sample class FakeBus: def __init__(self): self.message = None def timed_pop_filtered(self, *_args): msg, self.message = self.message, None return msg class FakePipeline: def __init__(self): self.props = {} self.state = FakeState.NULL self.uri = None self.position = 12 * 1_000_000_000 self.duration = 93 * 1_000_000_000 self.bus = FakeBus() self.seek_calls = [] def set_state(self, state): self.state = state return FakeStateChangeReturn.SUCCESS def set_property(self, name, value): self.props[name] = value if name == "uri": self.uri = value def get_property(self, name): if name == "volume": return self.props.get("volume", 1.0) return self.props.get(name) def get_bus(self): return self.bus def query_position(self, _format): return True, self.position def query_duration(self, _format): return True, self.duration def seek_simple(self, _format, flags, target): self.seek_calls.append((flags, target)) self.position = target return True class FakeMessage: def __init__(self, msg_type, src=None, buffering=0, state=None, error_text="boom", structure_name=None): self.type = msg_type self.src = src self._buffering = buffering self._state = state or (FakeState.NULL, FakeState.PLAYING, FakeState.NULL) self._error_text = error_text self._structure_name = structure_name def parse_buffering(self): return self._buffering def parse_state_changed(self): return self._state def parse_error(self): return SimpleNamespace(message=self._error_text), None def get_structure(self): if not self._structure_name: return None return SimpleNamespace(get_name=lambda: self._structure_name) class FakeGhostPad: def __init__(self, name, pad): self.name = name def set_active(self, _): pass class FakeBin: def __init__(self, name=""): self.name = name self._elements = [] self._pads = [] self.props = {} def add(self, elem): self._elements.append(elem) def add_pad(self, pad): self._pads.append(pad) def set_property(self, name, value): self.props[name] = value class FakeMapFlags: READ = 1 class FakeGst: State = FakeState StateChangeReturn = FakeStateChangeReturn Format = FakeFormat SeekFlags = FakeSeekFlags MessageType = FakeMessageType FlowReturn = FakeFlowReturn MapFlags = FakeMapFlags SECOND = 1_000_000_000 MSECOND = 1_000_000 Caps = SimpleNamespace(from_string=lambda value: value) GhostPad = SimpleNamespace(new=lambda name, pad: FakeGhostPad(name, pad)) Bin = SimpleNamespace(new=lambda name="": FakeBin(name)) @staticmethod def ElementFactory_make(name, alias=None): return None # signal "not available" so SW fallback kicks in # GStreamer uses Gst.ElementFactory.make, exposed as class attribute below ElementFactory = SimpleNamespace(make=lambda name, alias=None: None) FakeGst.ElementFactory = ElementFactory class _FakeFinfo: def __init__(self, name): self.name = name class FakeVideoInfoValue: def __init__(self, width, height, stride, fmt="BGRA"): self.width = width self.height = height self.stride = [stride, stride] self.finfo = _FakeFinfo(fmt) class FakeVideoInfo: @staticmethod def new_from_caps(caps): structure = caps.get_structure(0) width = structure.get_value("width") height = structure.get_value("height") fmt = "BGRA" try: fmt = structure.get_value("format") or fmt except Exception: pass return FakeVideoInfoValue(width, height, width * 4, fmt) class FakeGstVideo: VideoInfo = FakeVideoInfo class TestGStreamerBackend: def _make_backend(self): pipeline = FakePipeline() sink = FakeAppSink() backend = GStreamerBackend( gst_module=FakeGst, gst_video_module=FakeGstVideo, appsink_factory=lambda: sink, playbin_factory=lambda: pipeline, subsystem="x11", ) backend._start_bus_thread = lambda: None backend._stop_bus_thread = lambda: None return backend, pipeline, sink def test_not_playing_initially(self): backend, _pipeline, _sink = self._make_backend() assert not backend.is_playing() def test_play_sets_uri_and_configures_appsink(self): backend, pipeline, sink = self._make_backend() backend.play("http://example.com/video.mp4") assert backend.is_playing() assert pipeline.uri == "http://example.com/video.mp4" assert pipeline.props["video-sink"] is sink assert sink.props["emit-signals"] is True assert sink.props["caps"] == "video/x-raw,format=BGRA" def test_new_sample_marks_frame_dirty_and_updates_resolution(self): events = [] backend, _pipeline, sink = self._make_backend() backend.set_event_callback(lambda event, *args: events.append((event, *args))) sink.sample = FakeSample(width=640, height=360) assert backend._on_new_sample(sink) == FakeFlowReturn.OK assert backend.has_new_frame() is True assert ("resolution", "640x360") in events def test_toggle_pause_switches_pipeline_state(self): backend, pipeline, _sink = self._make_backend() backend.play("http://example.com/video.mp4") assert backend.toggle_pause() is True assert pipeline.state == FakeState.PAUSED assert backend.toggle_pause() is False assert pipeline.state == FakeState.PLAYING def test_seek_uses_relative_position(self): backend, pipeline, _sink = self._make_backend() backend.play("http://example.com/video.mp4") backend.seek(10) assert pipeline.seek_calls == [(FakeSeekFlags.FLUSH | FakeSeekFlags.KEY_UNIT, 22 * 1_000_000_000)] def test_change_volume_clamps_to_supported_range(self): backend, pipeline, _sink = self._make_backend() backend.play("http://example.com/video.mp4") assert backend.change_volume(15) == 115 assert pipeline.props["volume"] == 1.15 assert backend.change_volume(50) == 130 assert pipeline.props["volume"] == 1.3 def test_eos_notifies_stopped(self): events = [] backend, pipeline, _sink = self._make_backend() backend.set_event_callback(lambda event, *args: events.append((event, *args))) backend.play("http://example.com/video.mp4") backend._handle_bus_message(FakeMessage(FakeMessageType.EOS, src=pipeline)) assert ("stopped",) in events assert not backend.is_playing() assert pipeline.state == FakeState.NULL def test_buffering_and_metrics_feed_hud(self): events = [] backend, pipeline, _sink = self._make_backend() backend.set_event_callback(lambda event, *args: events.append((event, *args))) backend.play("http://example.com/video.mp4") backend._resolution = "1280x720" backend._handle_bus_message(FakeMessage(FakeMessageType.BUFFERING, buffering=76)) backend._emit_playback_metrics() assert ("buffering", 76) in events assert ("position", 12.0) in events assert ("duration", 93.0) in events assert ("volume", 100) in events assert ("resolution", "1280x720") in events def test_state_changed_updates_pause_event(self): events = [] backend, pipeline, _sink = self._make_backend() backend.set_event_callback(lambda event, *args: events.append((event, *args))) backend.play("http://example.com/video.mp4") backend._handle_bus_message( FakeMessage( FakeMessageType.STATE_CHANGED, src=pipeline, state=(FakeState.PLAYING, FakeState.PAUSED, FakeState.NULL), ) ) assert ("paused", True) in events def test_error_message_propagates(self): events = [] backend, pipeline, _sink = self._make_backend() backend.set_event_callback(lambda event, *args: events.append((event, *args))) backend.play("http://example.com/video.mp4") backend._handle_bus_message(FakeMessage(FakeMessageType.ERROR, src=pipeline, error_text="decoder fail")) assert ("error", "decoder fail") in events assert not backend.is_playing() def test_stop_sets_pipeline_to_null(self): backend, pipeline, _sink = self._make_backend() backend.play("http://example.com/video.mp4") backend.stop() assert pipeline.state == FakeState.NULL assert not backend.is_playing() def test_render_uploads_latest_frame_and_clears_dirty_flag(self, monkeypatch): backend, _pipeline, _sink = self._make_backend() raw_data = b"\x00" * (320 * 180 * 4) backend._raw_arr = (ctypes.c_ubyte * len(raw_data)).from_buffer_copy(raw_data) backend._raw_arr_size = len(raw_data) backend._latest_frame = SimpleNamespace(width=320, height=180, pitch=1280, pixel_format="BGRA", y_size=0, uv_pitch=0, buf_size=len(raw_data)) backend._frame_dirty = True calls = [] monkeypatch.setattr(sdl2, "SDL_CreateTexture", lambda *_args: object()) monkeypatch.setattr(sdl2, "SDL_UpdateTexture", lambda *_args: 0) monkeypatch.setattr(sdl2, "SDL_RenderCopy", lambda _renderer, _texture, _src, dst: calls.append((dst.x, dst.y, dst.w, dst.h))) backend.set_viewport(640, 480, 48, 80, 12, 12) assert backend.render(object()) is True assert backend.has_new_frame() is False assert calls == [(12, 51, 616, 346)]