From 60732f29860feef0061b17413e08d14a2604fb4a Mon Sep 17 00:00:00 2001 From: Uyanide Date: Fri, 10 Apr 2026 07:45:02 +0200 Subject: [PATCH] feat: add watch print mode test: refactor test_watch style: add inline comments for watch --- pyproject.toml | 2 +- src/lrx_cli/cli.py | 32 ++ src/lrx_cli/watch/__init__.py | 2 - src/lrx_cli/watch/control.py | 11 +- src/lrx_cli/watch/fetcher.py | 9 +- src/lrx_cli/watch/player.py | 23 +- src/lrx_cli/watch/session.py | 36 +- src/lrx_cli/watch/tracker.py | 13 +- src/lrx_cli/watch/view/__init__.py | 12 + src/lrx_cli/watch/view/pipe.py | 13 +- src/lrx_cli/watch/view/print.py | 44 +++ tests/test_watch.py | 559 ++++++++++++++++------------- uv.lock | 2 +- 13 files changed, 497 insertions(+), 261 deletions(-) create mode 100644 src/lrx_cli/watch/view/print.py diff --git a/pyproject.toml b/pyproject.toml index 70ad7e4..ee879bd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "lrx-cli" -version = "0.7.2" +version = "0.7.3" description = "Fetch line-synced lyrics for your music player." readme = "README.md" requires-python = ">=3.13" diff --git a/src/lrx_cli/cli.py b/src/lrx_cli/cli.py index b3a9628..6c25c7b 100644 --- a/src/lrx_cli/cli.py +++ b/src/lrx_cli/cli.py @@ -29,6 +29,7 @@ from .lrc import get_sidecar_path from .watch import WatchCoordinator from .watch.control import ControlClient, parse_delta from .watch.view.pipe import PipeOutput +from .watch.view.print import PrintOutput app = cyclopts.App( @@ -432,6 +433,37 @@ def pipe( logger.info("Watch stopped.") +@watch_app.command(name="print") +def watch_print( + plain: Annotated[ + bool, + cyclopts.Parameter( + name="--plain", + negative="", + help="Output plain text (strips all tags). Takes priority over --normalize.", + ), + ] = False, +) -> None: + """Watch active player and print all lyrics to stdout once per track change.""" + logger.info( + "Starting watch print (player filter: {})", + _player or "", + ) + output = PrintOutput(plain=plain) + try: + session = WatchCoordinator( + manager, + output, + player_hint=_player, + config=_app_config, + ) + success = asyncio.run(session.run()) + if not success: + sys.exit(1) + except KeyboardInterrupt: + logger.info("Watch stopped.") + + @ctl_app.command def offset(delta: str) -> None: """Adjust watch offset. Examples: +200, -200, 0.""" diff --git a/src/lrx_cli/watch/__init__.py b/src/lrx_cli/watch/__init__.py index 06ddf68..f2f968a 100644 --- a/src/lrx_cli/watch/__init__.py +++ b/src/lrx_cli/watch/__init__.py @@ -1,5 +1,3 @@ -"""Watch subsystem public exports.""" - from .session import WatchCoordinator __all__ = ["WatchCoordinator"] diff --git a/src/lrx_cli/watch/control.py b/src/lrx_cli/watch/control.py index f9c4a0a..8c10ae6 100644 --- a/src/lrx_cli/watch/control.py +++ b/src/lrx_cli/watch/control.py @@ -1,4 +1,8 @@ -"""Unix-socket control channel for communicating with a running watch session.""" +""" +Author: Uyanide pywang0608@foxmail.com +Date: 2026-04-10 08:14:58 +Description: Unix-socket control channel for communicating with a running watch session. +""" import asyncio import json @@ -40,14 +44,17 @@ class ControlServer: return True try: + # probe the socket to distinguish a live session from a stale socket file reader, writer = await asyncio.open_unix_connection(str(self._socket_path)) writer.close() await writer.wait_closed() + # connection succeeded → another watch session is actively listening logger.error( "A watch session is already running. Use 'lrx watch ctl status'." ) return False except Exception: + # connection refused / file is stale → safe to remove and reuse try: self._socket_path.unlink(missing_ok=True) except Exception: @@ -136,6 +143,8 @@ def parse_delta(raw: str) -> tuple[bool, int | None, str | None]: if value.startswith("+"): return True, int(value[1:]), None if value.startswith("-"): + # keep the sign by negating; bare int() would accept "-123" too but + # explicit split is clearer about intent and avoids double-negative edge cases return True, -int(value[1:]), None return True, int(value), None except ValueError: diff --git a/src/lrx_cli/watch/fetcher.py b/src/lrx_cli/watch/fetcher.py index db6d3ed..260cacd 100644 --- a/src/lrx_cli/watch/fetcher.py +++ b/src/lrx_cli/watch/fetcher.py @@ -1,4 +1,8 @@ -"""Debounced lyric fetch orchestration for watch session.""" +""" +Author: Uyanide pywang0608@foxmail.com +Date: 2026-04-10 08:14:41 +Description: Debounced lyric fetch orchestration for watch session. +""" import asyncio from typing import Awaitable, Callable, Optional @@ -50,6 +54,7 @@ class LyricFetcher: """Request lyrics for track with debounce collapsing.""" self._pending_track = track if self._debounce_task is not None: + # cancel any pending debounce window — the new request supersedes it self._debounce_task.cancel() self._debounce_task = asyncio.create_task(self._debounce_then_fetch()) @@ -61,6 +66,7 @@ class LyricFetcher: return if self._fetch_task is not None: + # abort any in-flight fetch for a previous track before starting the new one self._fetch_task.cancel() await asyncio.gather(self._fetch_task, return_exceptions=True) @@ -68,6 +74,7 @@ class LyricFetcher: async def _do_fetch(self, track: TrackMeta) -> None: """Execute fetch lifecycle callbacks and fetch lyrics for a track.""" + # callbacks may be plain functions or coroutines — handle both fetching_callback_result = self._on_fetching() if asyncio.iscoroutine(fetching_callback_result): await fetching_callback_result diff --git a/src/lrx_cli/watch/player.py b/src/lrx_cli/watch/player.py index 85f9f12..7c6ca3d 100644 --- a/src/lrx_cli/watch/player.py +++ b/src/lrx_cli/watch/player.py @@ -1,4 +1,8 @@ -"""Player discovery, state monitoring, and active-player selection for watch mode.""" +""" +Author: Uyanide pywang0608@foxmail.com +Date: 2026-04-10 08:14:27 +Description: Player discovery, state monitoring, and active-player selection for watch mode. +""" from dataclasses import dataclass from typing import Callable, Optional @@ -219,6 +223,7 @@ class PlayerMonitor: trackid = metadata.get("mpris:trackid") if trackid is not None: trackid = _variant_value(trackid) + # normalize Spotify track IDs — the raw MPRIS value varies by client version if isinstance(trackid, str) and trackid.startswith("spotify:track:"): trackid = trackid.removeprefix("spotify:track:") elif isinstance(trackid, str) and trackid.startswith("/com/spotify/track/"): @@ -230,12 +235,14 @@ class PlayerMonitor: length_ms = None length_value = _variant_value(length) if length is not None else None if isinstance(length_value, int): + # MPRIS reports length in microseconds; convert to milliseconds length_ms = length_value // 1000 artist = metadata.get("xesam:artist") artist_v = None artist_value = _variant_value(artist) if artist is not None else None if isinstance(artist_value, list) and artist_value: + # xesam:artist is a list; take the first entry as primary artist artist_v = artist_value[0] title = metadata.get("xesam:title") @@ -286,10 +293,14 @@ class PlayerMonitor: async def _resolve_well_known_name(self, unique_sender: str) -> str | None: """Map a DBus unique sender (e.g. :1.42) to a tracked MPRIS bus name.""" if unique_sender in self.players: + # sender is already a well-known name we track (unlikely but fast path) return unique_sender if not self._bus: return None + # Seeked signals arrive with the unique connection name (:1.N), not the + # well-known bus name (org.mpris.MediaPlayer2.X). Ask D-Bus which + # well-known name owns that unique name. for bus_name in self.players: try: reply = await self._bus.call( @@ -325,6 +336,7 @@ class PlayerMonitor: message.interface == "org.freedesktop.DBus" and message.member == "NameOwnerChanged" ): + # a player appeared or disappeared — rescan the full player list if message.body and str(message.body[0]).startswith( "org.mpris.MediaPlayer2." ): @@ -335,7 +347,9 @@ class PlayerMonitor: message.interface == "org.freedesktop.DBus.Properties" and message.member == "PropertiesChanged" ): - # Message.sender is a DBus unique name, so match by path+iface. + # message.sender is a unique connection name, not the well-known bus + # name, so we can't filter by sender here — match by object path and + # interface instead to scope it to MPRIS Player properties only path_ok = message.path == "/org/mpris/MediaPlayer2" iface = message.body[0] if message.body else None if path_ok and iface == "org.mpris.MediaPlayer2.Player": @@ -348,6 +362,7 @@ class PlayerMonitor: ): sender = message.sender or "" if sender and message.body: + # MPRIS Seeked position is in microseconds; convert to ms position_us = int(message.body[0]) asyncio.create_task( self._handle_seeked_signal( @@ -391,15 +406,19 @@ class ActivePlayerSelector: playing = [name for name, st in players.items() if st.status == "Playing"] if len(playing) == 1: + # unambiguous — only one player is currently playing return playing[0] preferred = preferred_player.lower().strip() + # when multiple players are playing, narrow candidates to those; otherwise + # fall back to all known players so a paused preferred player still wins candidates = playing if playing else list(players.keys()) if preferred: for name in candidates: if preferred in name.lower(): return name + # preserve the last selection to avoid jitter when nothing else changes if last_active and last_active in players: return last_active diff --git a/src/lrx_cli/watch/session.py b/src/lrx_cli/watch/session.py index 3e63490..df354a2 100644 --- a/src/lrx_cli/watch/session.py +++ b/src/lrx_cli/watch/session.py @@ -1,8 +1,11 @@ -"""Watch orchestration with explicit MVVM role boundaries. +""" +Author: Uyanide pywang0608@foxmail.com +Date: 2026-04-10 08:10:52 +Description: Watch orchestration with explicit MVVM role boundaries. -- Model: WatchModel stores domain state. -- ViewModel: WatchViewModel projects model to output-facing state/signature. -- Coordinator: WatchCoordinator wires services and drives async workflows. + - Model: WatchModel stores domain state. + - ViewModel: WatchViewModel projects model to output-facing state/signature. + - Coordinator: WatchCoordinator wires services and drives async workflows. """ import asyncio @@ -48,6 +51,8 @@ class WatchModel: def state_signature(self, track: TrackMeta | None, position_ms: int) -> tuple: """Build dedupe signature from model state and current lyric cursor.""" + # prefer trackid when available; fall back to display name for players + # that don't expose a stable ID (e.g. some MPRIS implementations) track_key = ( track.trackid if track and track.trackid @@ -57,6 +62,7 @@ class WatchModel: ) if self.status != WatchStatus.OK or self.lyrics is None: + # non-OK states don't have cursor position — discriminate by status alone return ("status", self.status, self.active_player, track_key) at_ms = position_ms + self.offset_ms cursor = self.lyrics.signature_cursor(at_ms) @@ -164,7 +170,9 @@ class WatchCoordinator: await self._player_monitor.start() await self._tracker.start() self._calibration_task = asyncio.create_task(self._calibration_loop()) + # emit once at startup so outputs don't sit blank until the first event self._schedule_emit() + # block forever; CancelledError from signal handler exits the loop cleanly await asyncio.Event().wait() return True except asyncio.CancelledError: @@ -206,8 +214,10 @@ class WatchCoordinator: if track is None: return False if self._model.lyrics is not None: + # lyrics already loaded — nothing to fetch return False if self._model.status == WatchStatus.FETCHING: + # a fetch is already in flight — don't queue another return False logger.info("fetching lyrics for track ({}): {}", reason, track.display_name()) self._fetcher.request(track) @@ -272,6 +282,7 @@ class WatchCoordinator: track_changed = track_key != prev_track_key player_changed = selected != prev_player if track_changed or player_changed: + # clear stale lyrics immediately so the old track's lines don't flash self._model.set_lyrics(None) self._model.active_track_key = track_key @@ -284,15 +295,19 @@ class WatchCoordinator: ) ) + # only fetch on identity change — calibration ticks must not re-trigger fetches started_fetch = False if track is not None and (player_changed or track_changed): started_fetch = self._request_fetch_for_active_track("track-changed") + # derive status from what actually happened this tick; preserve FETCHING + # if an in-flight request was started before this snapshot arrived if self._model.lyrics is not None: self._model.status = WatchStatus.OK elif started_fetch: self._model.status = WatchStatus.FETCHING elif self._model.status != WatchStatus.FETCHING: + # don't overwrite FETCHING with NO_LYRICS while a request is in flight self._model.status = WatchStatus.NO_LYRICS self._schedule_emit() @@ -306,12 +321,13 @@ class WatchCoordinator: def _on_tracker_tick(self) -> None: """Emit updates from tracker tick only while lyrics are actively rendering.""" - if self._model.status == WatchStatus.OK: + if self._model.status == WatchStatus.OK and self._output.position_sensitive: self._schedule_emit() def _schedule_emit(self) -> None: """Coalesce frequent events into at most one in-flight emit task.""" if self._emit_scheduled: + # a task is already queued; it will pick up the latest model state when it runs return self._emit_scheduled = True asyncio.create_task(self._run_scheduled_emit()) @@ -321,6 +337,7 @@ class WatchCoordinator: try: await self._emit_state() finally: + # release the gate even on error so future events can still schedule self._emit_scheduled = False async def _on_fetching(self) -> None: @@ -344,10 +361,17 @@ class WatchCoordinator: """Emit output state only when semantic signature changes.""" player = self._player_monitor.players.get(self._model.active_player or "") track = player.track if player else None - position = await self._tracker.get_position_ms() + # position=0 for non-position-sensitive outputs so the signature is stable + # across ticks and on_state fires at most once per track+status transition + position = ( + await self._tracker.get_position_ms() + if self._output.position_sensitive + else 0 + ) signature = self._view_model.signature(track, position) if signature == self._last_emit_signature: + # state hasn't changed semantically — skip redundant render return self._last_emit_signature = signature state = self._view_model.state(track, position) diff --git a/src/lrx_cli/watch/tracker.py b/src/lrx_cli/watch/tracker.py index bd565af..91c0f75 100644 --- a/src/lrx_cli/watch/tracker.py +++ b/src/lrx_cli/watch/tracker.py @@ -1,4 +1,8 @@ -"""Playback position tracking utilities for watch mode.""" +""" +Author: Uyanide pywang0608@foxmail.com +Date: 2026-04-10 08:13:35 +Description: Playback position tracking utilities for watch mode. +""" import asyncio import time @@ -69,7 +73,10 @@ class PositionTracker: self._is_playing = playback_status == "Playing" status_changed_to_playing = self._is_playing and not was_playing if player_changed or track_changed: + # reset to 0 so stale position from a previous track doesn't bleed through self._position_ms = 0 + # only poll MPRIS when something changed and the player is actually running; + # avoids an unnecessary D-Bus round-trip on every calibration-loop tick should_calibrate_now = ( self._is_playing and bool(self._active_player) @@ -97,6 +104,7 @@ class PositionTracker: return was_playing = self._is_playing self._is_playing = playback_status == "Playing" + # re-anchor last_tick when resuming so the gap while paused isn't counted should_calibrate_now = self._is_playing and not was_playing self._last_tick = time.monotonic() @@ -112,10 +120,13 @@ class PositionTracker: async with self._lock: now = time.monotonic() if self._is_playing and self._active_player: + # accumulate elapsed wall-clock time as playback position; + # seek events and calibration snapshots correct drift periodically delta_ms = int((now - self._last_tick) * 1000) if delta_ms > 0: self._position_ms += delta_ms should_notify = True + # always update last_tick so paused time isn't counted on resume self._last_tick = now if should_notify and self._on_tick is not None: diff --git a/src/lrx_cli/watch/view/__init__.py b/src/lrx_cli/watch/view/__init__.py index c01e3fc..19fa254 100644 --- a/src/lrx_cli/watch/view/__init__.py +++ b/src/lrx_cli/watch/view/__init__.py @@ -37,13 +37,16 @@ class LyricView: line_index = 0 for line in normalized.lines: if not isinstance(line, LyricLine): + # skip metadata/tag lines that carry no renderable text continue text = line.text lines.append(text) + # use first timestamp; clamp to 0 so bisect always works with non-negative ms timestamp = line.line_times_ms[0] if line.line_times_ms else 0 entries.append((max(0, timestamp), line_index)) line_index += 1 + # extract timestamps into a flat tuple so bisect_right can binary-search it timestamps = tuple(timestamp for timestamp, _ in entries) return LyricView( normalized=normalized, @@ -55,12 +58,16 @@ class LyricView: def signature_cursor(self, at_ms: int) -> tuple: """Build a stable cursor signature for dedupe decisions.""" if not self.timed_line_entries: + # untimed lyrics: signature is the full line set — changes only on track change return ("plain", self.lines) first_ts = self.timed_line_entries[0][0] if at_ms < first_ts: + # playback hasn't reached the first lyric yet; hold until it does return ("before_first", first_ts) + # bisect_right gives the insertion point after equal timestamps, so -1 gives + # the last line whose timestamp <= at_ms (i.e. the currently active line) idx = bisect_right(self.timestamps, at_ms) - 1 if idx < 0: idx = 0 @@ -82,6 +89,11 @@ class WatchState: class BaseOutput(ABC): + # When False, the coordinator passes position=0 for signature computation and + # skips tracker-tick-driven emits, so on_state fires at most once per + # track+status transition rather than on every lyric cursor advance. + position_sensitive: bool = True + @abstractmethod async def on_state(self, state: WatchState) -> None: """Render or deliver one watch state frame.""" diff --git a/src/lrx_cli/watch/view/pipe.py b/src/lrx_cli/watch/view/pipe.py index 0eb5390..19a9af5 100644 --- a/src/lrx_cli/watch/view/pipe.py +++ b/src/lrx_cli/watch/view/pipe.py @@ -1,4 +1,8 @@ -"""Pipe output implementation for watch mode.""" +""" +Author: Uyanide pywang0608@foxmail.com +Date: 2026-04-10 08:15:17 +Description: Pipe output implementation for watch mode. +""" from bisect import bisect_right from dataclasses import dataclass @@ -38,12 +42,14 @@ class PipeOutput(BaseOutput): effective_ms = state.position_ms + state.offset_ms current_line_idx: int | None if entries and effective_ms < entries[0][0]: - # Before first timestamp, current lyric is empty and after-window shows upcoming lines. + # playback hasn't reached the first lyric yet; treat current slot as empty + # so the after-window can show upcoming lines without a "current" anchor current_line_idx = None else: if not entries: current_line_idx = 0 else: + # bisect_right - 1 gives the last entry whose timestamp <= effective_ms current_entry_idx = ( bisect_right(state.lyrics.timestamps, effective_ms) - 1 ) @@ -54,6 +60,8 @@ class PipeOutput(BaseOutput): out: list[str] = [] for rel in range(-self.before, self.after + 1): if current_line_idx is None: + # before-first-timestamp: before/current slots are empty; after slots + # show lines starting from index 0 (rel=1 → line 0, rel=2 → line 1, …) if rel <= 0: out.append("") continue @@ -80,5 +88,6 @@ class PipeOutput(BaseOutput): lines = self._render_lyrics(state) for line in lines: + # no_newline mode lets callers use \r to overwrite the previous frame in-place sys.stdout.write(line + ("\n" if not self.no_newline else "")) sys.stdout.flush() diff --git a/src/lrx_cli/watch/view/print.py b/src/lrx_cli/watch/view/print.py new file mode 100644 index 0000000..1dafdae --- /dev/null +++ b/src/lrx_cli/watch/view/print.py @@ -0,0 +1,44 @@ +""" +Author: Uyanide pywang0608@foxmail.com +Date: 2026-04-10 08:15:31 +Description: Print output implementation for watch mode — one shot per track. +""" + +import sys + +from . import BaseOutput, WatchState, WatchStatus + + +class PrintOutput(BaseOutput): + """Emit full lyrics to stdout once per track transition, then stay silent. + + Deduplication is delegated to the coordinator via position_sensitive=False: + the coordinator uses a fixed position for signatures, so on_state fires at + most once per (status, track_key) transition rather than on every tick. + """ + + # fixed position=0 in signatures → coordinator calls on_state only on + # track/status transitions, never on lyric cursor advances + position_sensitive = False + + plain: bool + + def __init__(self, plain: bool = False) -> None: + self.plain = plain + + async def on_state(self, state: WatchState) -> None: + if state.status == WatchStatus.FETCHING or state.status == WatchStatus.IDLE: + return + + if state.status == WatchStatus.NO_LYRICS: + # emit a blank line as a machine-readable sentinel for "track changed, no lyrics" + sys.stdout.write("\n") + sys.stdout.flush() + elif state.status == WatchStatus.OK and state.lyrics is not None: + lrc = state.lyrics.normalized + if self.plain: + text = lrc.to_plain() + else: + text = str(lrc) + sys.stdout.write(text + "\n") + sys.stdout.flush() diff --git a/tests/test_watch.py b/tests/test_watch.py index 03acf96..d8f8994 100644 --- a/tests/test_watch.py +++ b/tests/test_watch.py @@ -2,20 +2,22 @@ from __future__ import annotations import asyncio from pathlib import Path +from typing import Optional from lrx_cli.lrc import LRCData from lrx_cli.models import TrackMeta from lrx_cli.watch.control import ControlClient, ControlServer, parse_delta from lrx_cli.watch.view import BaseOutput, LyricView, WatchState, WatchStatus from lrx_cli.watch.view.pipe import PipeOutput +from lrx_cli.watch.view.print import PrintOutput from lrx_cli.watch.player import ActivePlayerSelector, PlayerState, PlayerTarget -from lrx_cli.watch.fetcher import LyricFetcher from lrx_cli.config import AppConfig from lrx_cli.watch.tracker import PositionTracker from lrx_cli.watch.session import WatchCoordinator TEST_CONFIG = AppConfig() +BUS = "org.mpris.MediaPlayer2.spotify" def test_parse_delta_supports_plus_minus_and_reset() -> None: @@ -24,16 +26,17 @@ def test_parse_delta_supports_plus_minus_and_reset() -> None: assert parse_delta("0") == (True, 0, None) +# PlayerTarget + + def test_player_target_allows_all_when_hint_empty() -> None: target = PlayerTarget() - assert target.allows("org.mpris.MediaPlayer2.spotify") is True assert target.allows("org.mpris.MediaPlayer2.mpd") is True def test_player_target_filters_by_case_insensitive_substring() -> None: target = PlayerTarget("Spot") - assert target.allows("org.mpris.MediaPlayer2.spotify") is True assert target.allows("org.mpris.MediaPlayer2.mpd") is False @@ -43,60 +46,72 @@ def test_player_target_reports_blacklisted_hint() -> None: assert target.validation_error() is not None +def test_player_target_non_blacklisted_hint_is_valid() -> None: + target = PlayerTarget("mpd", player_blacklist=("spotify",)) + assert target.validation_error() is None + + +# ActivePlayerSelector + + +def _ps(bus: str, status: str = "Playing") -> PlayerState: + return PlayerState(bus_name=bus, status=status, track=TrackMeta(title="T")) + + +def test_active_player_selector_returns_none_when_no_players() -> None: + assert ActivePlayerSelector.select({}, None, "spotify") is None + + def test_active_player_selector_prefers_single_playing() -> None: players = { - "org.mpris.MediaPlayer2.foo": PlayerState( - bus_name="org.mpris.MediaPlayer2.foo", - status="Paused", - track=TrackMeta(title="A"), - ), - "org.mpris.MediaPlayer2.bar": PlayerState( - bus_name="org.mpris.MediaPlayer2.bar", - status="Playing", - track=TrackMeta(title="B"), - ), + "org.mpris.MediaPlayer2.foo": _ps("org.mpris.MediaPlayer2.foo", "Paused"), + "org.mpris.MediaPlayer2.bar": _ps("org.mpris.MediaPlayer2.bar", "Playing"), } assert ( - ActivePlayerSelector.select(players, None, TEST_CONFIG.general.preferred_player) + ActivePlayerSelector.select(players, None, "spotify") == "org.mpris.MediaPlayer2.bar" ) +def test_active_player_selector_prefers_keyword_among_multiple_playing() -> None: + players = { + "org.mpris.MediaPlayer2.foo": _ps("org.mpris.MediaPlayer2.foo"), + "org.mpris.MediaPlayer2.spotify": _ps("org.mpris.MediaPlayer2.spotify"), + } + assert ( + ActivePlayerSelector.select(players, None, "spotify") + == "org.mpris.MediaPlayer2.spotify" + ) + + def test_active_player_selector_uses_last_active_when_no_playing() -> None: players = { - "org.mpris.MediaPlayer2.foo": PlayerState( - bus_name="org.mpris.MediaPlayer2.foo", - status="Paused", - track=TrackMeta(title="A"), - ), - "org.mpris.MediaPlayer2.bar": PlayerState( - bus_name="org.mpris.MediaPlayer2.bar", - status="Stopped", - track=TrackMeta(title="B"), - ), + "org.mpris.MediaPlayer2.foo": _ps("org.mpris.MediaPlayer2.foo", "Paused"), + "org.mpris.MediaPlayer2.bar": _ps("org.mpris.MediaPlayer2.bar", "Stopped"), } - assert ( - ActivePlayerSelector.select( - players, - "org.mpris.MediaPlayer2.bar", - TEST_CONFIG.general.preferred_player, - ) + ActivePlayerSelector.select(players, "org.mpris.MediaPlayer2.bar", "spotify") == "org.mpris.MediaPlayer2.bar" ) +def test_active_player_selector_falls_back_to_first_when_no_preference() -> None: + players = { + "org.mpris.MediaPlayer2.foo": _ps("org.mpris.MediaPlayer2.foo", "Paused"), + } + result = ActivePlayerSelector.select(players, None, "") + assert result == "org.mpris.MediaPlayer2.foo" + + +# PositionTracker + + def test_position_tracker_seeked_calibrates_immediately() -> None: async def _run() -> None: - async def _poll(_bus: str): - return 1200 - - tracker = PositionTracker(_poll, TEST_CONFIG) + tracker = PositionTracker(lambda _: asyncio.sleep(0, result=1200), TEST_CONFIG) await tracker.start() - await tracker.set_active_player( - "org.mpris.MediaPlayer2.foo", "Playing", "track-A" - ) - await tracker.on_seeked("org.mpris.MediaPlayer2.foo", 3500) + await tracker.set_active_player(BUS, "Playing", "track-A") + await tracker.on_seeked(BUS, 3500) pos = await tracker.get_position_ms() await tracker.stop() assert pos >= 3500 @@ -104,74 +119,56 @@ def test_position_tracker_seeked_calibrates_immediately() -> None: asyncio.run(_run()) -def test_position_tracker_playback_status_pause_stops_fast_growth() -> None: +def test_position_tracker_pause_stops_position_growth() -> None: async def _run() -> None: - async def _poll(_bus: str): - return 0 - - tracker = PositionTracker(_poll, TEST_CONFIG) + tracker = PositionTracker(lambda _: asyncio.sleep(0, result=0), TEST_CONFIG) await tracker.start() - await tracker.set_active_player( - "org.mpris.MediaPlayer2.foo", "Playing", "track-A" - ) + await tracker.set_active_player(BUS, "Playing", "track-A") await asyncio.sleep(0.08) before = await tracker.get_position_ms() - - await tracker.on_playback_status("org.mpris.MediaPlayer2.foo", "Paused") + await tracker.on_playback_status(BUS, "Paused") await asyncio.sleep(0.08) after = await tracker.get_position_ms() await tracker.stop() - assert before > 0 assert after - before < 20 asyncio.run(_run()) -def test_position_tracker_playback_status_playing_calibrates_once() -> None: +def test_position_tracker_resume_via_playback_status_calibrates() -> None: async def _run() -> None: - async def _poll(_bus: str): - return 50000 - - tracker = PositionTracker(_poll, TEST_CONFIG) + tracker = PositionTracker(lambda _: asyncio.sleep(0, result=50000), TEST_CONFIG) await tracker.start() - await tracker.set_active_player( - "org.mpris.MediaPlayer2.foo", "Paused", "track-A" - ) - await tracker.on_playback_status("org.mpris.MediaPlayer2.foo", "Playing") + await tracker.set_active_player(BUS, "Paused", "track-A") + await tracker.on_playback_status(BUS, "Playing") pos = await tracker.get_position_ms() await tracker.stop() - assert pos >= 50000 asyncio.run(_run()) -def test_position_tracker_set_active_player_playing_calibrates_on_resume() -> None: +def test_position_tracker_resume_via_set_active_player_calibrates() -> None: async def _run() -> None: - async def _poll(_bus: str): - return 42000 - - tracker = PositionTracker(_poll, TEST_CONFIG) + tracker = PositionTracker(lambda _: asyncio.sleep(0, result=42000), TEST_CONFIG) await tracker.start() - await tracker.set_active_player( - "org.mpris.MediaPlayer2.foo", "Paused", "track-A" - ) - await tracker.set_active_player( - "org.mpris.MediaPlayer2.foo", "Playing", "track-A" - ) + await tracker.set_active_player(BUS, "Paused", "track-A") + await tracker.set_active_player(BUS, "Playing", "track-A") pos = await tracker.get_position_ms() await tracker.stop() - assert pos >= 42000 asyncio.run(_run()) +# ControlServer and ControlClient + + def test_control_server_and_client_roundtrip(tmp_path: Path) -> None: async def _run() -> None: class _Session: - def __init__(self): + def __init__(self) -> None: self.offset = 0 def handle_offset(self, delta: int) -> dict: @@ -183,14 +180,11 @@ def test_control_server_and_client_roundtrip(tmp_path: Path) -> None: socket_path = tmp_path / "watch.sock" server = ControlServer(socket_path=str(socket_path)) - session = _Session() - - await server.start(session) # type: ignore + await server.start(_Session()) # type: ignore client = ControlClient(socket_path=str(socket_path)) r1 = await client._send_async({"cmd": "offset", "delta": 200}) r2 = await client._send_async({"cmd": "status"}) await server.stop() - assert r1 == {"ok": True, "offset_ms": 200} assert r2["ok"] is True assert r2["offset_ms"] == 200 @@ -198,113 +192,203 @@ def test_control_server_and_client_roundtrip(tmp_path: Path) -> None: asyncio.run(_run()) -def test_pipe_output_prints_fixed_window_for_status(capsys) -> None: - output = PipeOutput(before=1, after=1) - state = WatchState( - track=None, - lyrics=None, - position_ms=0, - offset_ms=0, - status="fetching", +# PipeOutput + + +def _pipe_state( + status: WatchStatus, + lyrics: Optional[LRCData] = None, + position_ms: int = 0, + offset_ms: int = 0, + track: Optional[TrackMeta] = None, +) -> WatchState: + return WatchState( + track=track, + lyrics=LyricView.from_lrc(lyrics) if lyrics else None, + position_ms=position_ms, + offset_ms=offset_ms, + status=status, ) - asyncio.run(output.on_state(state)) - printed = capsys.readouterr().out - assert printed == "\n[fetching...]\n\n" - - -def test_pipe_output_uses_context_window_for_lyrics(capsys) -> None: - output = PipeOutput(before=1, after=1) - lyrics = LRCData("[00:01.00]a\n[00:02.00]b\n[00:03.00]c") - state = WatchState( - track=TrackMeta(title="Song"), - lyrics=LyricView.from_lrc(lyrics), - position_ms=2100, - offset_ms=0, - status="ok", +def test_pipe_output_fetching_renders_status_window(capsys) -> None: + asyncio.run( + PipeOutput(before=1, after=1).on_state(_pipe_state(WatchStatus.FETCHING)) ) - - asyncio.run(output.on_state(state)) - - printed = capsys.readouterr().out - assert printed == "a\nb\nc\n" + assert capsys.readouterr().out == "\n[fetching...]\n\n" -def test_pipe_output_shows_upcoming_lines_before_first_timestamp(capsys) -> None: - output = PipeOutput(before=1, after=1) - lyrics = LRCData("[00:02.00]a\n[00:03.00]b") - state = WatchState( - track=TrackMeta(title="Song"), - lyrics=LyricView.from_lrc(lyrics), - position_ms=0, - offset_ms=0, - status="ok", +def test_pipe_output_no_lyrics_renders_status_window(capsys) -> None: + asyncio.run( + PipeOutput(before=1, after=1).on_state(_pipe_state(WatchStatus.NO_LYRICS)) ) - - asyncio.run(output.on_state(state)) - - printed = capsys.readouterr().out - assert printed == "\n\na\n" + assert capsys.readouterr().out == "\n[no lyrics]\n\n" -def test_pipe_output_first_line_keeps_before_region_empty(capsys) -> None: - output = PipeOutput(before=1, after=1) - lyrics = LRCData("[00:01.00]a\n[00:02.00]b\n[00:03.00]c") - state = WatchState( - track=TrackMeta(title="Song"), - lyrics=LyricView.from_lrc(lyrics), - position_ms=1100, - offset_ms=0, - status="ok", +def test_pipe_output_idle_renders_status_window(capsys) -> None: + asyncio.run(PipeOutput(before=1, after=1).on_state(_pipe_state(WatchStatus.IDLE))) + assert capsys.readouterr().out == "\n[idle]\n\n" + + +def test_pipe_output_no_newline_mode(capsys) -> None: + asyncio.run( + PipeOutput(before=0, after=0, no_newline=True).on_state( + _pipe_state(WatchStatus.FETCHING) + ) ) - - asyncio.run(output.on_state(state)) - - printed = capsys.readouterr().out - assert printed == "\na\nb\n" + assert capsys.readouterr().out == "[fetching...]" -def test_pipe_output_last_line_keeps_after_region_empty(capsys) -> None: - output = PipeOutput(before=1, after=1) - lyrics = LRCData("[00:01.00]a\n[00:02.00]b\n[00:03.00]c") - state = WatchState( - track=TrackMeta(title="Song"), - lyrics=LyricView.from_lrc(lyrics), - position_ms=3100, - offset_ms=0, - status="ok", +def test_pipe_output_default_window_shows_current_line(capsys) -> None: + lrc = LRCData("[00:01.00]a\n[00:02.00]b\n[00:03.00]c") + asyncio.run( + PipeOutput().on_state(_pipe_state(WatchStatus.OK, lrc, position_ms=2100)) ) + assert capsys.readouterr().out == "b\n" - asyncio.run(output.on_state(state)) - printed = capsys.readouterr().out - assert printed == "b\nc\n\n" +def test_pipe_output_context_window(capsys) -> None: + lrc = LRCData("[00:01.00]a\n[00:02.00]b\n[00:03.00]c") + asyncio.run( + PipeOutput(before=1, after=1).on_state( + _pipe_state(WatchStatus.OK, lrc, position_ms=2100) + ) + ) + assert capsys.readouterr().out == "a\nb\nc\n" + + +def test_pipe_output_before_region_empty_at_first_line(capsys) -> None: + lrc = LRCData("[00:01.00]a\n[00:02.00]b\n[00:03.00]c") + asyncio.run( + PipeOutput(before=1, after=1).on_state( + _pipe_state(WatchStatus.OK, lrc, position_ms=1100) + ) + ) + assert capsys.readouterr().out == "\na\nb\n" + + +def test_pipe_output_after_region_empty_at_last_line(capsys) -> None: + lrc = LRCData("[00:01.00]a\n[00:02.00]b\n[00:03.00]c") + asyncio.run( + PipeOutput(before=1, after=1).on_state( + _pipe_state(WatchStatus.OK, lrc, position_ms=3100) + ) + ) + assert capsys.readouterr().out == "b\nc\n\n" + + +def test_pipe_output_upcoming_lines_before_first_timestamp(capsys) -> None: + lrc = LRCData("[00:02.00]a\n[00:03.00]b") + asyncio.run( + PipeOutput(before=1, after=1).on_state( + _pipe_state(WatchStatus.OK, lrc, position_ms=0) + ) + ) + assert capsys.readouterr().out == "\n\na\n" + + +def test_pipe_output_offset_ms_shifts_effective_position(capsys) -> None: + lrc = LRCData("[00:01.00]a\n[00:02.00]b\n[00:03.00]c") + asyncio.run( + PipeOutput().on_state( + _pipe_state(WatchStatus.OK, lrc, position_ms=1000, offset_ms=1500) + ) + ) + # effective = 2500 ms → line b + assert capsys.readouterr().out == "b\n" def test_pipe_output_repeated_text_uses_correct_timed_occurrence(capsys) -> None: - output = PipeOutput(before=1, after=1) - lyrics = LRCData("[00:01.00]A\n[00:02.00]X\n[00:03.00]B\n[00:04.00]X\n[00:05.00]C") - state = WatchState( - track=TrackMeta(title="Song"), + lrc = LRCData("[00:01.00]A\n[00:02.00]X\n[00:03.00]B\n[00:04.00]X\n[00:05.00]C") + asyncio.run( + PipeOutput(before=1, after=1).on_state( + _pipe_state(WatchStatus.OK, lrc, position_ms=4100) + ) + ) + assert capsys.readouterr().out == "B\nX\nC\n" + + +# PrintOutput + + +def _ok_state(lyrics: LRCData, track: Optional[TrackMeta] = None) -> WatchState: + return WatchState( + track=track or TrackMeta(title="Song", artist="Artist"), lyrics=LyricView.from_lrc(lyrics), - position_ms=4100, + position_ms=0, offset_ms=0, - status="ok", + status=WatchStatus.OK, ) + +def _status_state(status: WatchStatus, track: Optional[TrackMeta] = None) -> WatchState: + return WatchState( + track=track or TrackMeta(title="Song", artist="Artist"), + lyrics=None, + position_ms=0, + offset_ms=0, + status=status, + ) + + +def test_print_output_emits_lrc_on_ok(capsys) -> None: + asyncio.run( + PrintOutput().on_state(_ok_state(LRCData("[00:01.00]Hello\n[00:02.00]World"))) + ) + assert capsys.readouterr().out.startswith("[00:01.00]") + + +def test_print_output_plain_strips_tags(capsys) -> None: + asyncio.run( + PrintOutput(plain=True).on_state( + _ok_state(LRCData("[00:01.00]Hello\n[00:02.00]World")) + ) + ) + out = capsys.readouterr().out + assert "[" not in out + assert "Hello" in out + + +def test_print_output_plain_with_unsynced_lyrics(capsys) -> None: + asyncio.run(PrintOutput(plain=True).on_state(_ok_state(LRCData("Hello\nWorld")))) + out = capsys.readouterr().out + assert "Hello" in out + assert "[" not in out + + +def test_print_output_no_lyrics_emits_blank_line(capsys) -> None: + asyncio.run(PrintOutput().on_state(_status_state(WatchStatus.NO_LYRICS))) + assert capsys.readouterr().out == "\n" + + +def test_print_output_fetching_emits_nothing(capsys) -> None: + asyncio.run(PrintOutput().on_state(_status_state(WatchStatus.FETCHING))) + assert capsys.readouterr().out == "" + + +def test_print_output_idle_emits_nothing(capsys) -> None: + asyncio.run(PrintOutput().on_state(_status_state(WatchStatus.IDLE))) + assert capsys.readouterr().out == "" + + +def test_print_output_is_stateless(capsys) -> None: + """View has no internal deduplication — emits on every call.""" + output = PrintOutput() + state = _ok_state(LRCData("[00:01.00]Hello")) asyncio.run(output.on_state(state)) - - printed = capsys.readouterr().out - assert printed == "B\nX\nC\n" + asyncio.run(output.on_state(state)) + lines = [ln for ln in capsys.readouterr().out.splitlines() if ln] + assert len(lines) == 2 -# ── WatchCoordinator state machine ─────────────────────────────────────────── +def test_print_output_position_sensitive_is_false() -> None: + assert PrintOutput.position_sensitive is False + + +# WatchCoordinator class _CaptureFetcher: - """Records fetch requests without doing real network calls.""" - def __init__(self) -> None: self.requested: list[str] = [] @@ -315,18 +399,18 @@ class _CaptureFetcher: pass -def _make_coordinator() -> WatchCoordinator: +def _make_coordinator(output: Optional[BaseOutput] = None) -> WatchCoordinator: class _Manager: def fetch_for_track(self, *_a, **_kw): return None - class _Output(BaseOutput): + class _NullOutput(BaseOutput): async def on_state(self, state: WatchState) -> None: pass session = WatchCoordinator( _Manager(), # type: ignore - _Output(), + output or _NullOutput(), player_hint=None, config=TEST_CONFIG, ) @@ -337,9 +421,6 @@ def _make_coordinator() -> WatchCoordinator: return session -BUS = "org.mpris.MediaPlayer2.spotify" - - def _pstate(status: str = "Playing", title: str = "Song") -> PlayerState: return PlayerState( bus_name=BUS, @@ -353,11 +434,9 @@ def test_coordinator_fetches_on_initial_player() -> None: session = _make_coordinator() fetcher = _CaptureFetcher() session._fetcher = fetcher # type: ignore[assignment] - session._player_monitor.players = {BUS: _pstate("Playing")} session._on_player_change() await asyncio.sleep(0) - assert fetcher.requested == ["Artist - Song"] assert session._model.status == WatchStatus.FETCHING @@ -365,17 +444,15 @@ def test_coordinator_fetches_on_initial_player() -> None: def test_coordinator_fetches_while_paused() -> None: - """Fetch is triggered immediately even when player is paused.""" + """Fetch starts immediately even when player is paused — no wait for resume.""" async def _run() -> None: session = _make_coordinator() fetcher = _CaptureFetcher() session._fetcher = fetcher # type: ignore[assignment] - session._player_monitor.players = {BUS: _pstate("Paused")} session._on_player_change() await asyncio.sleep(0) - assert fetcher.requested == ["Artist - Song"] asyncio.run(_run()) @@ -391,13 +468,12 @@ def test_coordinator_fetches_on_track_change() -> None: fetcher = _CaptureFetcher() session._fetcher = fetcher # type: ignore[assignment] - session._player_monitor.players = {BUS: _pstate("Playing", title="New Song")} session._on_player_change() await asyncio.sleep(0) assert fetcher.requested == ["Artist - New Song"] - assert session._model.lyrics is None # cleared on track change + assert session._model.lyrics is None asyncio.run(_run()) @@ -409,17 +485,15 @@ def test_coordinator_no_refetch_on_calibration_no_lyrics() -> None: session = _make_coordinator() fetcher = _CaptureFetcher() session._fetcher = fetcher # type: ignore[assignment] - session._player_monitor.players = {BUS: _pstate("Playing")} - session._on_player_change() # first call: player appears → fetch + session._on_player_change() await asyncio.sleep(0) assert len(fetcher.requested) == 1 - session._model.status = WatchStatus.NO_LYRICS # simulate fetch returned nothing - - session._on_player_change() # calibration: same player/track + session._model.status = WatchStatus.NO_LYRICS + session._on_player_change() await asyncio.sleep(0) - assert len(fetcher.requested) == 1 # no second fetch + assert len(fetcher.requested) == 1 asyncio.run(_run()) @@ -434,7 +508,6 @@ def test_coordinator_no_fetch_when_lyrics_present() -> None: fetcher = _CaptureFetcher() session._fetcher = fetcher # type: ignore[assignment] - session._player_monitor.players = {BUS: _pstate("Playing")} session._on_player_change() await asyncio.sleep(0) @@ -445,99 +518,97 @@ def test_coordinator_no_fetch_when_lyrics_present() -> None: asyncio.run(_run()) -def test_session_emit_state_only_when_lyric_cursor_changes() -> None: +def test_coordinator_player_disappears_goes_idle() -> None: async def _run() -> None: - class _Manager: - def fetch_for_track(self, *_args, **_kwargs): - return None + session = _make_coordinator() + session._model.active_player = BUS + session._model.active_track_key = "Artist - Song" + session._model.set_lyrics(LRCData("[00:01.00]line")) + session._model.status = WatchStatus.OK - class _Output(BaseOutput): - def __init__(self): - self.count = 0 + session._player_monitor.players = {} + session._on_player_change() + await asyncio.sleep(0) - async def on_state(self, state: WatchState) -> None: - self.count += 1 + assert session._model.status == WatchStatus.IDLE + assert session._model.lyrics is None + assert session._model.active_player is None - output = _Output() - session = WatchCoordinator( - _Manager(), # type: ignore - output, - player_hint=None, - config=TEST_CONFIG, - ) - session._tracker = PositionTracker( - lambda _bus: asyncio.sleep(0, result=0), - TEST_CONFIG, - ) + asyncio.run(_run()) - bus_name = "org.mpris.MediaPlayer2.spotify" - track = TrackMeta(title="Song", artist="Artist") - session._model.active_player = bus_name + +def test_coordinator_no_fetch_when_track_is_none() -> None: + """Player present but reports no track metadata → no fetch, status NO_LYRICS.""" + + async def _run() -> None: + session = _make_coordinator() + fetcher = _CaptureFetcher() + session._fetcher = fetcher # type: ignore[assignment] session._player_monitor.players = { - bus_name: PlayerState(bus_name=bus_name, status="Playing", track=track) + BUS: PlayerState(bus_name=BUS, status="Playing", track=None) + } + session._on_player_change() + await asyncio.sleep(0) + + assert fetcher.requested == [] + assert session._model.status == WatchStatus.NO_LYRICS + + asyncio.run(_run()) + + +def test_coordinator_emit_deduplicates_on_same_cursor() -> None: + async def _run() -> None: + counts = [0] + + class _CountOutput(BaseOutput): + async def on_state(self, state: WatchState) -> None: + counts[0] += 1 + + session = _make_coordinator(_CountOutput()) + track = TrackMeta(title="Song", artist="Artist") + session._model.active_player = BUS + session._player_monitor.players = { + BUS: PlayerState(bus_name=BUS, status="Playing", track=track) } session._model.set_lyrics(LRCData("[00:01.00]a\n[00:03.00]b")) session._model.status = WatchStatus.OK - await session._tracker.set_active_player( - bus_name, - "Playing", - "Artist - Song", - ) + await session._tracker.set_active_player(BUS, "Playing", "Artist - Song") - await session._emit_state() - await session._emit_state() + await session._emit_state() # emits + await session._emit_state() # same cursor → no emit + assert counts[0] == 1 - await session._tracker.on_seeked(bus_name, 3200) - await session._emit_state() - - assert output.count == 2 + await session._tracker.on_seeked(BUS, 3200) + await session._emit_state() # cursor advanced → emits + assert counts[0] == 2 asyncio.run(_run()) -def test_session_emits_when_crossing_first_timestamp() -> None: +def test_coordinator_position_insensitive_output_ignores_seeks() -> None: + """With position_sensitive=False, seek events do not trigger re-emit.""" + async def _run() -> None: - class _Manager: - def fetch_for_track(self, *_args, **_kwargs): - return None - - class _Output(BaseOutput): - def __init__(self): - self.count = 0 + counts = [0] + class _CountPrint(PrintOutput): async def on_state(self, state: WatchState) -> None: - self.count += 1 + counts[0] += 1 - output = _Output() - session = WatchCoordinator( - _Manager(), # type: ignore - output, - player_hint=None, - config=TEST_CONFIG, - ) - session._tracker = PositionTracker( - lambda _bus: asyncio.sleep(0, result=0), - TEST_CONFIG, - ) - - bus_name = "org.mpris.MediaPlayer2.spotify" + session = _make_coordinator(_CountPrint()) track = TrackMeta(title="Song", artist="Artist") - session._model.active_player = bus_name + session._model.active_player = BUS session._player_monitor.players = { - bus_name: PlayerState(bus_name=bus_name, status="Playing", track=track) + BUS: PlayerState(bus_name=BUS, status="Playing", track=track) } - session._model.set_lyrics(LRCData("[00:02.00]a\n[00:03.00]b")) + session._model.set_lyrics(LRCData("[00:01.00]a\n[00:03.00]b")) session._model.status = WatchStatus.OK - await session._tracker.set_active_player( - bus_name, - "Playing", - "Artist - Song", - ) - await session._emit_state() - await session._tracker.on_seeked(bus_name, 2500) - await session._emit_state() + await session._emit_state() # emits once + assert counts[0] == 1 - assert output.count == 2 + await session._tracker.on_seeked(BUS, 3200) + await session._emit_state() # position fixed at 0 → same signature → no re-emit + assert counts[0] == 1 asyncio.run(_run()) diff --git a/uv.lock b/uv.lock index dbef3d5..1cf98bf 100644 --- a/uv.lock +++ b/uv.lock @@ -153,7 +153,7 @@ wheels = [ [[package]] name = "lrx-cli" -version = "0.7.2" +version = "0.7.3" source = { editable = "." } dependencies = [ { name = "cyclopts" },