SDL2/GStreamer DLNA browser for R36S by Matteo Benedetto
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

389 lines
11 KiB

"""Tests for the SDL-rendered GStreamer backend without using a real pipeline."""
import ctypes
from types import SimpleNamespace
import sdl2
from r36s_dlna_browser.player.gstreamer_backend import GStreamerBackend
class FakeMessageType:
ERROR = 1
EOS = 2
BUFFERING = 4
STATE_CHANGED = 8
WARNING = 16
class FakeFlowReturn:
OK = 0
class FakeState:
NULL = 0
READY = 1
PAUSED = 2
PLAYING = 3
class FakeStateChangeReturn:
FAILURE = -1
SUCCESS = 0
class FakeFormat:
TIME = 1
class FakeSeekFlags:
FLUSH = 1
KEY_UNIT = 2
class FakeCaps:
def __init__(self, width, height):
self._width = width
self._height = height
def get_size(self):
return 1
def get_structure(self, _index):
return FakeStructure(self._width, self._height)
class FakeStructure:
def __init__(self, width, height):
self._width = width
self._height = height
def has_field(self, name):
return name in {"width", "height"}
def get_value(self, name):
return self._width if name == "width" else self._height
class FakeMapInfo:
def __init__(self, data: bytes):
self.data = data
self.size = len(data)
class FakeBuffer:
def __init__(self, payload: bytes):
self._payload = payload
def get_size(self):
return len(self._payload)
def extract_dup(self, _offset, _size):
return self._payload
def map(self, _flags):
return True, FakeMapInfo(self._payload)
def unmap(self, _map_info):
pass
class FakeSample:
def __init__(self, width=1280, height=720, payload: bytes | None = None):
self._caps = FakeCaps(width, height)
self._buffer = FakeBuffer(payload or (b"\x00" * (width * height * 4)))
def get_caps(self):
return self._caps
def get_buffer(self):
return self._buffer
class FakeAppSink:
def __init__(self):
self.props = {}
self.connected = []
self.sample = None
def set_property(self, name, value):
self.props[name] = value
def connect(self, signal_name, callback):
self.connected.append((signal_name, callback))
def emit(self, signal_name):
assert signal_name == "pull-sample"
return self.sample
class FakeBus:
def __init__(self):
self.message = None
def timed_pop_filtered(self, *_args):
msg, self.message = self.message, None
return msg
class FakePipeline:
def __init__(self):
self.props = {}
self.state = FakeState.NULL
self.uri = None
self.position = 12 * 1_000_000_000
self.duration = 93 * 1_000_000_000
self.bus = FakeBus()
self.seek_calls = []
def set_state(self, state):
self.state = state
return FakeStateChangeReturn.SUCCESS
def set_property(self, name, value):
self.props[name] = value
if name == "uri":
self.uri = value
def get_property(self, name):
if name == "volume":
return self.props.get("volume", 1.0)
return self.props.get(name)
def get_bus(self):
return self.bus
def query_position(self, _format):
return True, self.position
def query_duration(self, _format):
return True, self.duration
def seek_simple(self, _format, flags, target):
self.seek_calls.append((flags, target))
self.position = target
return True
class FakeMessage:
def __init__(self, msg_type, src=None, buffering=0, state=None, error_text="boom", structure_name=None):
self.type = msg_type
self.src = src
self._buffering = buffering
self._state = state or (FakeState.NULL, FakeState.PLAYING, FakeState.NULL)
self._error_text = error_text
self._structure_name = structure_name
def parse_buffering(self):
return self._buffering
def parse_state_changed(self):
return self._state
def parse_error(self):
return SimpleNamespace(message=self._error_text), None
def get_structure(self):
if not self._structure_name:
return None
return SimpleNamespace(get_name=lambda: self._structure_name)
class FakeMapFlags:
READ = 1
class FakeGst:
State = FakeState
StateChangeReturn = FakeStateChangeReturn
Format = FakeFormat
SeekFlags = FakeSeekFlags
MessageType = FakeMessageType
FlowReturn = FakeFlowReturn
MapFlags = FakeMapFlags
SECOND = 1_000_000_000
MSECOND = 1_000_000
Caps = SimpleNamespace(from_string=lambda value: value)
class _FakeFinfo:
def __init__(self, name):
self.name = name
class FakeVideoInfoValue:
def __init__(self, width, height, stride, fmt="BGRA"):
self.width = width
self.height = height
self.stride = [stride, stride]
self.finfo = _FakeFinfo(fmt)
class FakeVideoInfo:
@staticmethod
def new_from_caps(caps):
structure = caps.get_structure(0)
width = structure.get_value("width")
height = structure.get_value("height")
fmt = "BGRA"
try:
fmt = structure.get_value("format") or fmt
except Exception:
pass
return FakeVideoInfoValue(width, height, width * 4, fmt)
class FakeGstVideo:
VideoInfo = FakeVideoInfo
class TestGStreamerBackend:
def _make_backend(self):
pipeline = FakePipeline()
sink = FakeAppSink()
backend = GStreamerBackend(
gst_module=FakeGst,
gst_video_module=FakeGstVideo,
appsink_factory=lambda: sink,
playbin_factory=lambda: pipeline,
subsystem="x11",
)
backend._start_bus_thread = lambda: None
backend._stop_bus_thread = lambda: None
return backend, pipeline, sink
def test_not_playing_initially(self):
backend, _pipeline, _sink = self._make_backend()
assert not backend.is_playing()
def test_play_sets_uri_and_configures_appsink(self):
backend, pipeline, sink = self._make_backend()
backend.play("http://example.com/video.mp4")
assert backend.is_playing()
assert pipeline.uri == "http://example.com/video.mp4"
assert pipeline.props["video-sink"] is sink
assert sink.props["emit-signals"] is True
assert sink.props["caps"] == "video/x-raw,format=BGRA"
def test_new_sample_marks_frame_dirty_and_updates_resolution(self):
events = []
backend, _pipeline, sink = self._make_backend()
backend.set_event_callback(lambda event, *args: events.append((event, *args)))
sink.sample = FakeSample(width=640, height=360)
assert backend._on_new_sample(sink) == FakeFlowReturn.OK
assert backend.has_new_frame() is True
assert ("resolution", "640x360") in events
def test_toggle_pause_switches_pipeline_state(self):
backend, pipeline, _sink = self._make_backend()
backend.play("http://example.com/video.mp4")
assert backend.toggle_pause() is True
assert pipeline.state == FakeState.PAUSED
assert backend.toggle_pause() is False
assert pipeline.state == FakeState.PLAYING
def test_seek_uses_relative_position(self):
backend, pipeline, _sink = self._make_backend()
backend.play("http://example.com/video.mp4")
backend.seek(10)
assert pipeline.seek_calls == [(FakeSeekFlags.FLUSH | FakeSeekFlags.KEY_UNIT, 22 * 1_000_000_000)]
def test_change_volume_clamps_to_supported_range(self):
backend, pipeline, _sink = self._make_backend()
backend.play("http://example.com/video.mp4")
assert backend.change_volume(15) == 115
assert pipeline.props["volume"] == 1.15
assert backend.change_volume(50) == 130
assert pipeline.props["volume"] == 1.3
def test_eos_notifies_stopped(self):
events = []
backend, pipeline, _sink = self._make_backend()
backend.set_event_callback(lambda event, *args: events.append((event, *args)))
backend.play("http://example.com/video.mp4")
backend._handle_bus_message(FakeMessage(FakeMessageType.EOS, src=pipeline))
assert ("stopped",) in events
assert not backend.is_playing()
assert pipeline.state == FakeState.NULL
def test_buffering_and_metrics_feed_hud(self):
events = []
backend, pipeline, _sink = self._make_backend()
backend.set_event_callback(lambda event, *args: events.append((event, *args)))
backend.play("http://example.com/video.mp4")
backend._resolution = "1280x720"
backend._handle_bus_message(FakeMessage(FakeMessageType.BUFFERING, buffering=76))
backend._emit_playback_metrics()
assert ("buffering", 76) in events
assert ("position", 12.0) in events
assert ("duration", 93.0) in events
assert ("volume", 100) in events
assert ("resolution", "1280x720") in events
def test_state_changed_updates_pause_event(self):
events = []
backend, pipeline, _sink = self._make_backend()
backend.set_event_callback(lambda event, *args: events.append((event, *args)))
backend.play("http://example.com/video.mp4")
backend._handle_bus_message(
FakeMessage(
FakeMessageType.STATE_CHANGED,
src=pipeline,
state=(FakeState.PLAYING, FakeState.PAUSED, FakeState.NULL),
)
)
assert ("paused", True) in events
def test_error_message_propagates(self):
events = []
backend, pipeline, _sink = self._make_backend()
backend.set_event_callback(lambda event, *args: events.append((event, *args)))
backend.play("http://example.com/video.mp4")
backend._handle_bus_message(FakeMessage(FakeMessageType.ERROR, src=pipeline, error_text="decoder fail"))
assert ("error", "decoder fail") in events
assert not backend.is_playing()
def test_stop_sets_pipeline_to_null(self):
backend, pipeline, _sink = self._make_backend()
backend.play("http://example.com/video.mp4")
backend.stop()
assert pipeline.state == FakeState.NULL
assert not backend.is_playing()
def test_render_uploads_latest_frame_and_clears_dirty_flag(self, monkeypatch):
backend, _pipeline, _sink = self._make_backend()
raw_data = b"\x00" * (320 * 180 * 4)
backend._raw_arr = (ctypes.c_ubyte * len(raw_data)).from_buffer_copy(raw_data)
backend._raw_arr_size = len(raw_data)
backend._latest_frame = SimpleNamespace(width=320, height=180, pitch=1280, pixel_format="BGRA", y_size=0, uv_pitch=0, buf_size=len(raw_data))
backend._frame_dirty = True
calls = []
monkeypatch.setattr(sdl2, "SDL_CreateTexture", lambda *_args: object())
monkeypatch.setattr(sdl2, "SDL_UpdateTexture", lambda *_args: 0)
monkeypatch.setattr(sdl2, "SDL_RenderCopy", lambda _renderer, _texture, _src, dst: calls.append((dst.x, dst.y, dst.w, dst.h)))
backend.set_viewport(640, 480, 48, 80, 12, 12)
assert backend.render(object()) is True
assert backend.has_new_frame() is False
assert calls == [(12, 51, 616, 346)]