From e6b8583868afee407c8e9bdac80342df99e2ca43 Mon Sep 17 00:00:00 2001 From: Uyanide Date: Thu, 9 Apr 2026 10:15:14 +0200 Subject: [PATCH] feat: add watch command and pipe view --- src/lrx_cli/cache.py | 34 ++- src/lrx_cli/cli.py | 103 ++++++- src/lrx_cli/config.py | 11 + src/lrx_cli/watch/__init__.py | 5 + src/lrx_cli/watch/control.py | 172 +++++++++++ src/lrx_cli/watch/fetcher.py | 80 +++++ src/lrx_cli/watch/options.py | 16 + src/lrx_cli/watch/player.py | 411 +++++++++++++++++++++++++ src/lrx_cli/watch/session.py | 403 +++++++++++++++++++++++++ src/lrx_cli/watch/tracker.py | 143 +++++++++ src/lrx_cli/watch/view/__init__.py | 80 +++++ src/lrx_cli/watch/view/pipe.py | 85 ++++++ tests/test_watch.py | 462 +++++++++++++++++++++++++++++ 13 files changed, 1990 insertions(+), 15 deletions(-) create mode 100644 src/lrx_cli/watch/__init__.py create mode 100644 src/lrx_cli/watch/control.py create mode 100644 src/lrx_cli/watch/fetcher.py create mode 100644 src/lrx_cli/watch/options.py create mode 100644 src/lrx_cli/watch/player.py create mode 100644 src/lrx_cli/watch/session.py create mode 100644 src/lrx_cli/watch/tracker.py create mode 100644 src/lrx_cli/watch/view/__init__.py create mode 100644 src/lrx_cli/watch/view/pipe.py create mode 100644 tests/test_watch.py diff --git a/src/lrx_cli/cache.py b/src/lrx_cli/cache.py index d7accb7..a46cc8e 100644 --- a/src/lrx_cli/cache.py +++ b/src/lrx_cli/cache.py @@ -85,9 +85,15 @@ class CacheEngine: self.db_path = db_path self._init_db() + def _connect(self) -> sqlite3.Connection: + conn = sqlite3.connect(self.db_path) + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA busy_timeout=5000") + return conn + def _init_db(self) -> None: """Create cache tables and run one-time slot/cache migrations.""" - with sqlite3.connect(self.db_path) as conn: + with self._connect() as conn: conn.execute(""" CREATE TABLE IF NOT EXISTS credentials ( name TEXT PRIMARY KEY, @@ -256,7 +262,7 @@ class CacheEngine: return [] now = int(time.time()) - with sqlite3.connect(self.db_path) as conn: + with self._connect() as conn: conn.execute( "DELETE FROM cache WHERE key = ? AND expires_at IS NOT NULL AND expires_at < ?", (key, now), @@ -353,7 +359,7 @@ class CacheEngine: # Convenience for callers that still pass a single negative result. kinds = [SLOT_SYNCED, SLOT_UNSYNCED] - with sqlite3.connect(self.db_path) as conn: + with self._connect() as conn: for kind in kinds: conn.execute( """INSERT OR REPLACE INTO cache @@ -386,7 +392,7 @@ class CacheEngine: def clear_all(self) -> None: """Remove every entry from the cache.""" - with sqlite3.connect(self.db_path) as conn: + with self._connect() as conn: conn.execute("DELETE FROM cache") conn.commit() logger.info("Cache cleared.") @@ -396,7 +402,7 @@ class CacheEngine: if not self._track_has_meta(track): logger.info(f"No cache entries found for {track.display_name()}.") return - with sqlite3.connect(self.db_path) as conn: + with self._connect() as conn: cur = conn.execute( f"DELETE FROM cache WHERE {_TRACK_WHERE}", _track_where_params(track), @@ -411,7 +417,7 @@ class CacheEngine: def prune(self) -> int: """Remove all expired entries. Returns the number of rows deleted.""" - with sqlite3.connect(self.db_path) as conn: + with self._connect() as conn: cur = conn.execute( "DELETE FROM cache WHERE expires_at IS NOT NULL AND expires_at < ?", (int(time.time()),), @@ -439,7 +445,7 @@ class CacheEngine: return None now = int(time.time()) - with sqlite3.connect(self.db_path) as conn: + with self._connect() as conn: conn.row_factory = sqlite3.Row rows = conn.execute( f"SELECT status, lyrics, source, confidence FROM cache" @@ -495,7 +501,7 @@ class CacheEngine: return [] now = int(time.time()) - with sqlite3.connect(self.db_path) as conn: + with self._connect() as conn: conn.row_factory = sqlite3.Row rows = conn.execute( """SELECT * FROM cache @@ -557,7 +563,7 @@ class CacheEngine: """ if not self._track_has_meta(track): return 0 - with sqlite3.connect(self.db_path) as conn: + with self._connect() as conn: cur = conn.execute( f"UPDATE cache SET confidence = ? WHERE {_TRACK_WHERE} AND source = ?", [confidence] + _track_where_params(track) + [source], @@ -571,7 +577,7 @@ class CacheEngine: """Return all cached rows for a given track (across all sources).""" if not self._track_has_meta(track): return [] - with sqlite3.connect(self.db_path) as conn: + with self._connect() as conn: conn.row_factory = sqlite3.Row return [ dict(r) @@ -586,7 +592,7 @@ class CacheEngine: def get_credential(self, name: str) -> Optional[dict]: """Return cached credential data if present and not expired.""" now_ms = int(time.time() * 1000) - with sqlite3.connect(self.db_path) as conn: + with self._connect() as conn: conn.row_factory = sqlite3.Row row = conn.execute( "SELECT data FROM credentials WHERE name = ? AND (expires_at IS NULL OR expires_at > ?)", @@ -603,7 +609,7 @@ class CacheEngine: self, name: str, data: dict, expires_at_ms: Optional[int] = None ) -> None: """Persist credential data, optionally with an expiry timestamp (Unix ms).""" - with sqlite3.connect(self.db_path) as conn: + with self._connect() as conn: conn.execute( "INSERT OR REPLACE INTO credentials (name, data, expires_at) VALUES (?, ?, ?)", (name, json.dumps(data), expires_at_ms), @@ -612,14 +618,14 @@ class CacheEngine: def query_all(self) -> list[dict]: """Return every row in the cache table.""" - with sqlite3.connect(self.db_path) as conn: + with self._connect() as conn: conn.row_factory = sqlite3.Row return [dict(r) for r in conn.execute("SELECT * FROM cache").fetchall()] def stats(self) -> dict: """Return aggregate cache statistics.""" now = int(time.time()) - with sqlite3.connect(self.db_path) as conn: + with self._connect() as conn: total = conn.execute("SELECT COUNT(*) FROM cache").fetchone()[0] expired = conn.execute( "SELECT COUNT(*) FROM cache WHERE expires_at IS NOT NULL AND expires_at < ?", diff --git a/src/lrx_cli/cli.py b/src/lrx_cli/cli.py index 555ff60..fc8f19e 100644 --- a/src/lrx_cli/cli.py +++ b/src/lrx_cli/cli.py @@ -7,18 +7,33 @@ Description: CLI interface. import sys import time import os +import asyncio +import json from pathlib import Path from typing import Annotated from urllib.parse import quote import cyclopts from loguru import logger -from .config import DB_PATH, enable_debug +from .config import ( + DB_PATH, + PLAYER_BLACKLIST, + PREFERRED_PLAYER, + WATCH_CALIBRATION_INTERVAL_S, + WATCH_DEBOUNCE_MS, + WATCH_POSITION_TICK_MS, + WATCH_SOCKET_PATH, + enable_debug, +) from .models import TrackMeta from .mpris import get_current_track from .core import LrcManager from .fetchers import FetcherMethodType from .lrc import get_sidecar_path +from .watch import WatchCoordinator +from .watch.control import ControlClient, parse_delta +from .watch.options import WatchOptions +from .watch.view.pipe import PipeOutput app = cyclopts.App( @@ -29,6 +44,12 @@ app.register_install_completion_command() cache_app = cyclopts.App(name="cache", help="Manage the local SQLite cache.") app.command(cache_app) +watch_app = cyclopts.App(name="watch", help="Watch MPRIS and output lyrics.") +app.command(watch_app) + +ctl_app = cyclopts.App(name="ctl", help="Control a running watch session.") +watch_app.command(ctl_app) + # Global state set by the meta launcher _player: str | None = None @@ -38,6 +59,18 @@ _db_path: str | None = None manager: LrcManager = None # type: ignore +def _build_watch_options() -> WatchOptions: + """Build runtime watch options from CLI composition root.""" + return WatchOptions( + preferred_player=PREFERRED_PLAYER, + player_blacklist=tuple(PLAYER_BLACKLIST), + debounce_ms=WATCH_DEBOUNCE_MS, + position_tick_ms=WATCH_POSITION_TICK_MS, + calibration_interval_s=WATCH_CALIBRATION_INTERVAL_S, + socket_path=WATCH_SOCKET_PATH, + ) + + @app.meta.default def launcher( *tokens: Annotated[str, cyclopts.Parameter(show=False, allow_leading_hyphen=True)], @@ -357,6 +390,74 @@ def export( sys.exit(1) +# watch subcommands + + +@watch_app.command +def pipe( + before: Annotated[ + int, + cyclopts.Parameter( + name="--before", + help="Number of lyric lines to show before current line.", + ), + ] = 0, + after: Annotated[ + int, + cyclopts.Parameter( + name="--after", + help="Number of lyric lines to show after current line.", + ), + ] = 0, +): + """Watch active player and continuously print lyric window to stdout.""" + logger.info( + "Starting watch pipe (player filter: {})", + _player or "", + ) + output = PipeOutput(before=max(0, before), after=max(0, after)) + options = _build_watch_options() + try: + session = WatchCoordinator( + manager, + output, + player_hint=_player, + options=options, + ) + success = asyncio.run(session.run()) + if not success: + sys.exit(1) + except KeyboardInterrupt: + logger.info("Watch stopped.") + + +@ctl_app.command +def offset(delta: str) -> None: + """Adjust watch offset. Examples: +200, -200, 0.""" + parsed_ok, parsed_delta, parse_error = parse_delta(delta) + if not parsed_ok or parsed_delta is None: + logger.error(parse_error or "Invalid offset delta") + sys.exit(1) + + response = ControlClient(options=_build_watch_options()).send( + {"cmd": "offset", "delta": parsed_delta} + ) + if not response.get("ok"): + logger.error(response.get("error", "Unknown error")) + sys.exit(1) + print(json.dumps(response, indent=2, ensure_ascii=False)) + + +@ctl_app.command +def status() -> None: + """Print current watch session status as JSON.""" + response = ControlClient(options=_build_watch_options()).send({"cmd": "status"}) + if not response.get("ok"): + logger.error(response.get("error", "Unknown error")) + sys.exit(1) + print(json.dumps(response, indent=2, ensure_ascii=False)) + + # cache subcommands diff --git a/src/lrx_cli/config.py b/src/lrx_cli/config.py index 5b49c31..5994b3d 100644 --- a/src/lrx_cli/config.py +++ b/src/lrx_cli/config.py @@ -68,6 +68,17 @@ MUSIXMATCH_COOLDOWN_MS = 600_000 # 10 minutes # Player preference (used when multiple MPRIS players are active) PREFERRED_PLAYER = os.environ.get("PREFERRED_PLAYER", "spotify") +PLAYER_BLACKLIST = [ + s.strip() for s in os.environ.get("PLAYER_BLACKLIST", "").split(",") if s.strip() +] + +# Watch mode +WATCH_DEBOUNCE_MS = int(os.environ.get("WATCH_DEBOUNCE_MS", "400")) +WATCH_CALIBRATION_INTERVAL_S = float( + os.environ.get("WATCH_CALIBRATION_INTERVAL_S", "3.0") +) +WATCH_POSITION_TICK_MS = int(os.environ.get("WATCH_POSITION_TICK_MS", "50")) +WATCH_SOCKET_PATH = Path(CACHE_DIR) / "watch.sock" class _Credentials: diff --git a/src/lrx_cli/watch/__init__.py b/src/lrx_cli/watch/__init__.py new file mode 100644 index 0000000..06ddf68 --- /dev/null +++ b/src/lrx_cli/watch/__init__.py @@ -0,0 +1,5 @@ +"""Watch subsystem public exports.""" + +from .session import WatchCoordinator + +__all__ = ["WatchCoordinator"] diff --git a/src/lrx_cli/watch/control.py b/src/lrx_cli/watch/control.py new file mode 100644 index 0000000..ad020e2 --- /dev/null +++ b/src/lrx_cli/watch/control.py @@ -0,0 +1,172 @@ +"""Unix-socket control channel for communicating with a running watch session.""" + +import asyncio +import json +from pathlib import Path +from typing import Protocol, TypeAlias + +from loguru import logger + +from .options import WatchOptions + + +JSONPrimitive: TypeAlias = str | int | float | bool | None +JSONValue: TypeAlias = JSONPrimitive | dict[str, "JSONValue"] | list["JSONValue"] +JSONDict: TypeAlias = dict[str, JSONValue] + + +class ControlSession(Protocol): + """Session protocol used by control channel handlers.""" + + def handle_offset(self, delta: int) -> JSONDict: + """Apply offset delta and return JSON response payload.""" + ... + + def handle_status(self) -> JSONDict: + """Return current session status payload.""" + ... + + +class ControlServer: + """Control server that handles offset/status commands over a Unix socket.""" + + _options: WatchOptions + _socket_path: Path + _server: asyncio.AbstractServer | None + + def __init__( + self, + options: WatchOptions, + socket_path: Path | None = None, + ) -> None: + """Initialize control server with explicit socket path or runtime options.""" + self._options = options + resolved_socket_path = socket_path or self._options.socket_path + self._socket_path: Path = resolved_socket_path + self._server: asyncio.AbstractServer | None = None + + async def start(self, session: ControlSession) -> bool: + """Start listening for control requests and bind session handlers.""" + if not await self._prepare_socket_path(): + return False + + self._socket_path.parent.mkdir(parents=True, exist_ok=True) + self._server = await asyncio.start_unix_server( + lambda r, w: self._handle(session, r, w), + path=str(self._socket_path), + ) + return True + + async def _prepare_socket_path(self) -> bool: + """Ensure socket path is usable and reject when another session is active.""" + if not self._socket_path.exists(): + return True + + try: + reader, writer = await asyncio.open_unix_connection(str(self._socket_path)) + writer.close() + await writer.wait_closed() + logger.error( + "A watch session is already running. Use 'lrx watch ctl status'." + ) + return False + except Exception: + try: + self._socket_path.unlink(missing_ok=True) + except Exception: + pass + return True + + async def stop(self) -> None: + """Stop control server and remove stale socket path.""" + if self._server is not None: + self._server.close() + await self._server.wait_closed() + self._server = None + try: + self._socket_path.unlink(missing_ok=True) + except Exception: + pass + + async def _handle( + self, + session: ControlSession, + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter, + ) -> None: + """Handle one control request and send JSON response.""" + resp: JSONDict = {"ok": False, "error": "internal error"} + try: + line = await reader.readline() + if not line: + resp = {"ok": False, "error": "empty request"} + else: + req = json.loads(line.decode("utf-8")) + cmd = req.get("cmd") + if cmd == "offset": + delta = int(req.get("delta", 0)) + resp = session.handle_offset(delta) + elif cmd == "status": + resp = session.handle_status() + else: + resp = {"ok": False, "error": "unknown command"} + except Exception as e: + resp = {"ok": False, "error": str(e)} + finally: + writer.write((json.dumps(resp) + "\n").encode("utf-8")) + await writer.drain() + writer.close() + await writer.wait_closed() + + +class ControlClient: + """Control client used by CLI commands to talk to active watch session.""" + + _options: WatchOptions + _socket_path: Path + + def __init__( + self, + options: WatchOptions, + socket_path: Path | None = None, + ) -> None: + """Initialize control client with explicit socket path or runtime options.""" + self._options = options + resolved_socket_path = socket_path or self._options.socket_path + self._socket_path: Path = resolved_socket_path + + async def _send_async(self, cmd: JSONDict) -> JSONDict: + """Send one JSON command to control server and return JSON response.""" + if not self._socket_path.exists(): + return {"ok": False, "error": "No watch session running."} + + try: + reader, writer = await asyncio.open_unix_connection(str(self._socket_path)) + except Exception: + return {"ok": False, "error": "No watch session running."} + + writer.write((json.dumps(cmd) + "\n").encode("utf-8")) + await writer.drain() + line = await reader.readline() + writer.close() + await writer.wait_closed() + if not line: + return {"ok": False, "error": "Empty response."} + return json.loads(line.decode("utf-8")) + + def send(self, cmd: JSONDict) -> JSONDict: + """Synchronous wrapper around async control request.""" + return asyncio.run(self._send_async(cmd)) + + +def parse_delta(raw: str) -> tuple[bool, int | None, str | None]: + """Parse signed millisecond offset delta string for ctl offset command.""" + value = raw.strip() + try: + if value.startswith("+"): + return True, int(value[1:]), None + if value.startswith("-"): + return True, -int(value[1:]), None + return True, int(value), None + except ValueError: + return False, None, f"Invalid offset delta: {raw}" diff --git a/src/lrx_cli/watch/fetcher.py b/src/lrx_cli/watch/fetcher.py new file mode 100644 index 0000000..e0f0cdd --- /dev/null +++ b/src/lrx_cli/watch/fetcher.py @@ -0,0 +1,80 @@ +"""Debounced lyric fetch orchestration for watch session.""" + +import asyncio +from typing import Awaitable, Callable, Optional + +from ..lrc import LRCData +from ..models import TrackMeta +from .options import WatchOptions + + +class LyricFetcher: + """Debounces track updates and runs at most one lyric fetch task at a time.""" + + _options: WatchOptions + _fetch_func: Callable[[TrackMeta], Awaitable[Optional[LRCData]]] + _on_fetching: Callable[[], Awaitable[None] | None] + _on_result: Callable[[Optional[LRCData]], Awaitable[None] | None] + _debounce_task: asyncio.Task | None + _fetch_task: asyncio.Task | None + _pending_track: TrackMeta | None + + def __init__( + self, + fetch_func: Callable[[TrackMeta], Awaitable[Optional[LRCData]]], + on_fetching: Callable[[], Awaitable[None] | None], + on_result: Callable[[Optional[LRCData]], Awaitable[None] | None], + options: WatchOptions, + ) -> None: + """Initialize fetch callbacks and runtime options.""" + self._options = options + self._fetch_func = fetch_func + self._on_fetching = on_fetching + self._on_result = on_result + self._debounce_task: asyncio.Task | None = None + self._fetch_task: asyncio.Task | None = None + self._pending_track: TrackMeta | None = None + + async def stop(self) -> None: + """Cancel and await all in-flight debounce/fetch tasks.""" + for task in (self._debounce_task, self._fetch_task): + if task is not None: + task.cancel() + await asyncio.gather( + *[t for t in (self._debounce_task, self._fetch_task) if t is not None], + return_exceptions=True, + ) + self._debounce_task = None + self._fetch_task = None + + def request(self, track: TrackMeta) -> None: + """Request lyrics for track with debounce collapsing.""" + self._pending_track = track + if self._debounce_task is not None: + self._debounce_task.cancel() + self._debounce_task = asyncio.create_task(self._debounce_then_fetch()) + + async def _debounce_then_fetch(self) -> None: + """Wait debounce window then start a fresh fetch task for latest pending track.""" + await asyncio.sleep(self._options.debounce_ms / 1000.0) + track = self._pending_track + if track is None: + return + + if self._fetch_task is not None: + self._fetch_task.cancel() + await asyncio.gather(self._fetch_task, return_exceptions=True) + + self._fetch_task = asyncio.create_task(self._do_fetch(track)) + + async def _do_fetch(self, track: TrackMeta) -> None: + """Execute fetch lifecycle callbacks and fetch lyrics for a track.""" + fetching_callback_result = self._on_fetching() + if asyncio.iscoroutine(fetching_callback_result): + await fetching_callback_result + + lyrics = await self._fetch_func(track) + + result_callback_result = self._on_result(lyrics) + if asyncio.iscoroutine(result_callback_result): + await result_callback_result diff --git a/src/lrx_cli/watch/options.py b/src/lrx_cli/watch/options.py new file mode 100644 index 0000000..1dc5358 --- /dev/null +++ b/src/lrx_cli/watch/options.py @@ -0,0 +1,16 @@ +"""Watch runtime options passed from CLI composition root.""" + +from dataclasses import dataclass +from pathlib import Path + + +@dataclass(frozen=True) +class WatchOptions: + """Runtime settings used by watch components.""" + + preferred_player: str + player_blacklist: tuple[str, ...] + debounce_ms: int + position_tick_ms: int + calibration_interval_s: float + socket_path: Path diff --git a/src/lrx_cli/watch/player.py b/src/lrx_cli/watch/player.py new file mode 100644 index 0000000..ccf3a8a --- /dev/null +++ b/src/lrx_cli/watch/player.py @@ -0,0 +1,411 @@ +"""Player discovery, state monitoring, and active-player selection for watch mode.""" + +from dataclasses import dataclass +from typing import Callable, Optional +import asyncio + +from dbus_next.aio.message_bus import MessageBus +from dbus_next.constants import BusType +from dbus_next.message import Message +from loguru import logger + +from ..models import TrackMeta +from .options import WatchOptions + + +def _variant_value(item: object) -> object | None: + """Extract .value from DBus variant-like objects when available.""" + if hasattr(item, "value"): + return getattr(item, "value") + return None + + +@dataclass(slots=True) +class PlayerState: + """Current observable state for one MPRIS player.""" + + bus_name: str + status: str + track: Optional[TrackMeta] + + +@dataclass(frozen=True, slots=True) +class PlayerTarget: + """Constraint for choosing which players are visible to watch.""" + + hint: Optional[str] = None + player_blacklist: tuple[str, ...] = () + + def validation_error(self) -> str | None: + """Return validation message when hint conflicts with blacklist, else None.""" + normalized_hint = self.normalized_hint + if not normalized_hint: + return None + for blocked in self.player_blacklist: + normalized_blocked = blocked.strip().lower() + if not normalized_blocked: + continue + if _keyword_match(normalized_hint, normalized_blocked) or _keyword_match( + normalized_blocked, normalized_hint + ): + return ( + f"Requested player '{self.hint}' is blocked by " + f"PLAYER_BLACKLIST entry '{blocked}'." + ) + return None + + @property + def normalized_hint(self) -> str: + """Return normalized lowercase player hint string.""" + return (self.hint or "").strip().lower() + + def allows(self, bus_name: str) -> bool: + """Return whether given MPRIS bus name passes this target constraint.""" + normalized_hint = self.normalized_hint + if not normalized_hint: + return True + return _keyword_match(bus_name, normalized_hint) + + +def _keyword_match(text: str, keyword: str) -> bool: + """Return True when keyword exists in text, case-insensitively.""" + return keyword.strip().lower() in text.lower() + + +class PlayerMonitor: + """Tracks MPRIS players and forwards signal-driven state updates to session callbacks.""" + + _options: WatchOptions + _on_players_changed: Callable[[], None] + _on_seeked: Callable[[str, int], None] + _on_playback_status: Callable[[str, str], None] + _target: PlayerTarget + players: dict[str, PlayerState] + _bus: MessageBus | None + _props_cache: dict[str, object] + + def __init__( + self, + on_players_changed: Callable[[], None], + on_seeked: Callable[[str, int], None], + on_playback_status: Callable[[str, str], None], + options: WatchOptions, + target: Optional[PlayerTarget] = None, + ) -> None: + """Initialize monitor callbacks, runtime options, and player target filter.""" + self._options = options + self._on_players_changed = on_players_changed + self._on_seeked = on_seeked + self._on_playback_status = on_playback_status + self._target = target or PlayerTarget( + player_blacklist=self._options.player_blacklist + ) + self.players: dict[str, PlayerState] = {} + self._bus: MessageBus | None = None + self._props_cache: dict[str, object] = {} + + async def start(self) -> None: + """Start DBus monitoring and populate initial player snapshot.""" + self._bus = await MessageBus(bus_type=BusType.SESSION).connect() + self._bus.add_message_handler(self._on_message) + await self._add_match_rules() + await self.refresh() + + async def close(self) -> None: + """Stop DBus monitoring and close bus connection.""" + self._props_cache.clear() + if self._bus: + self._bus.disconnect() + self._bus = None + + async def _get_player_props(self, bus_name: str) -> object | None: + """Return cached DBus Properties interface for player, creating it if missing.""" + if not self._bus: + return None + if bus_name in self._props_cache: + return self._props_cache[bus_name] + + try: + introspection = await self._bus.introspect( + bus_name, "/org/mpris/MediaPlayer2" + ) + proxy = self._bus.get_proxy_object( + bus_name, "/org/mpris/MediaPlayer2", introspection + ) + props = proxy.get_interface("org.freedesktop.DBus.Properties") + self._props_cache[bus_name] = props + return props + except Exception as e: + logger.debug(f"Failed to prepare DBus props for {bus_name}: {e}") + self._props_cache.pop(bus_name, None) + return None + + async def _add_match_rules(self) -> None: + """Register signal subscriptions needed by monitor.""" + if not self._bus: + return + rules = [ + "type='signal',interface='org.freedesktop.DBus',member='NameOwnerChanged'", + "type='signal',interface='org.freedesktop.DBus.Properties',member='PropertiesChanged'", + "type='signal',interface='org.mpris.MediaPlayer2.Player',member='Seeked'", + ] + for rule in rules: + try: + await self._bus.call( + Message( + destination="org.freedesktop.DBus", + path="/org/freedesktop/DBus", + interface="org.freedesktop.DBus", + member="AddMatch", + signature="s", + body=[rule], + ) + ) + except Exception as e: + logger.debug(f"Failed to add DBus match rule {rule}: {e}") + + async def _list_mpris_players(self) -> list[str]: + """List visible MPRIS players after applying blacklist and target filter.""" + if not self._bus: + return [] + try: + reply = await self._bus.call( + Message( + destination="org.freedesktop.DBus", + path="/org/freedesktop/DBus", + interface="org.freedesktop.DBus", + member="ListNames", + ) + ) + if not reply or not reply.body: + return [] + out: list[str] = [] + for name in reply.body[0]: + if not name.startswith("org.mpris.MediaPlayer2."): + continue + if any( + x.lower() in name.lower() for x in self._options.player_blacklist + ): + continue + if not self._target.allows(name): + continue + out.append(name) + return out + except Exception as e: + logger.debug(f"Failed to list mpris players: {e}") + return [] + + async def _fetch_player_state(self, bus_name: str) -> Optional[PlayerState]: + """Read current playback status and metadata from one player service.""" + props = await self._get_player_props(bus_name) + if props is None: + return None + try: + status_var = await getattr(props, "call_get")( + "org.mpris.MediaPlayer2.Player", "PlaybackStatus" + ) + metadata_var = await getattr(props, "call_get")( + "org.mpris.MediaPlayer2.Player", "Metadata" + ) + status = status_var.value if status_var else "Stopped" + track = self._track_from_metadata( + metadata_var.value if metadata_var else {} + ) + return PlayerState(bus_name=bus_name, status=status, track=track) + except Exception as e: + logger.debug(f"Failed to read state for {bus_name}: {e}") + self._props_cache.pop(bus_name, None) + return None + + def _track_from_metadata(self, metadata: dict[str, object]) -> Optional[TrackMeta]: + """Build TrackMeta object from MPRIS metadata map.""" + if not metadata: + return None + trackid = metadata.get("mpris:trackid") + if trackid is not None: + trackid = _variant_value(trackid) + if isinstance(trackid, str) and trackid.startswith("spotify:track:"): + trackid = trackid.removeprefix("spotify:track:") + elif isinstance(trackid, str) and trackid.startswith("/com/spotify/track/"): + trackid = trackid.removeprefix("/com/spotify/track/") + elif not isinstance(trackid, str): + trackid = None + + length = metadata.get("mpris:length") + length_ms = None + length_value = _variant_value(length) if length is not None else None + if isinstance(length_value, int): + length_ms = length_value // 1000 + + artist = metadata.get("xesam:artist") + artist_v = None + artist_value = _variant_value(artist) if artist is not None else None + if isinstance(artist_value, list) and artist_value: + artist_v = artist_value[0] + + title = metadata.get("xesam:title") + album = metadata.get("xesam:album") + url = metadata.get("xesam:url") + + title_value = _variant_value(title) if title is not None else None + album_value = _variant_value(album) if album is not None else None + url_value = _variant_value(url) if url is not None else None + + return TrackMeta( + trackid=trackid, + length=length_ms, + album=album_value if isinstance(album_value, str) else None, + artist=artist_v, + title=title_value if isinstance(title_value, str) else None, + url=url_value if isinstance(url_value, str) else None, + ) + + async def refresh(self) -> None: + """Refresh full player snapshot and notify session when visible set changes.""" + players = await self._list_mpris_players() + updated: dict[str, PlayerState] = {} + for bus_name in players: + st = await self._fetch_player_state(bus_name) + if st is not None: + updated[bus_name] = st + + before = set(self.players.keys()) + after = set(updated.keys()) + added = sorted(after - before) + removed = sorted(before - after) + + for bus_name in removed: + self._props_cache.pop(bus_name, None) + + self.players = updated + + if added or removed: + logger.info( + "MPRIS players updated: added={}, removed={}", + added, + removed, + ) + + self._on_players_changed() + + async def _resolve_well_known_name(self, unique_sender: str) -> str | None: + """Map a DBus unique sender (e.g. :1.42) to a tracked MPRIS bus name.""" + if unique_sender in self.players: + return unique_sender + if not self._bus: + return None + + for bus_name in self.players: + try: + reply = await self._bus.call( + Message( + destination="org.freedesktop.DBus", + path="/org/freedesktop/DBus", + interface="org.freedesktop.DBus", + member="GetNameOwner", + signature="s", + body=[bus_name], + ) + ) + if reply and reply.body and str(reply.body[0]) == unique_sender: + return bus_name + except Exception: + continue + return None + + async def _handle_seeked_signal(self, sender: str, position_ms: int) -> None: + """Route Seeked signal to session using well-known bus name when possible.""" + bus_name = await self._resolve_well_known_name(sender) + if bus_name is not None: + self._on_seeked(bus_name, position_ms) + return + + # If we cannot map sender reliably, force a state refresh to converge. + await self.refresh() + + def _on_message(self, message: Message) -> bool: + """Low-level DBus signal handler for player lifecycle/status/seek events.""" + try: + if ( + message.interface == "org.freedesktop.DBus" + and message.member == "NameOwnerChanged" + ): + if message.body and str(message.body[0]).startswith( + "org.mpris.MediaPlayer2." + ): + asyncio.create_task(self.refresh()) + return False + + if ( + message.interface == "org.freedesktop.DBus.Properties" + and message.member == "PropertiesChanged" + ): + # Message.sender is a DBus unique name, so match by path+iface. + path_ok = message.path == "/org/mpris/MediaPlayer2" + iface = message.body[0] if message.body else None + if path_ok and iface == "org.mpris.MediaPlayer2.Player": + asyncio.create_task(self.refresh()) + return False + + if ( + message.interface == "org.mpris.MediaPlayer2.Player" + and message.member == "Seeked" + ): + sender = message.sender or "" + if sender and message.body: + position_us = int(message.body[0]) + asyncio.create_task( + self._handle_seeked_signal( + sender, + max(0, position_us // 1000), + ) + ) + return False + except Exception as e: + logger.debug(f"PlayerMonitor signal handling error: {e}") + return False + + async def get_position_ms(self, bus_name: str) -> Optional[int]: + """Read player-reported position in milliseconds.""" + props = await self._get_player_props(bus_name) + if props is None: + return None + try: + position_var = await getattr(props, "call_get")( + "org.mpris.MediaPlayer2.Player", "Position" + ) + if position_var is None: + return None + return max(0, int(position_var.value) // 1000) + except Exception as e: + logger.debug(f"Failed to read position from {bus_name}: {e}") + self._props_cache.pop(bus_name, None) + return None + + +class ActivePlayerSelector: + @staticmethod + def select( + players: dict[str, PlayerState], + last_active: str | None, + options: WatchOptions, + ) -> str | None: + """Select active player by playing state, preferred keyword, and continuity.""" + if not players: + return None + + playing = [name for name, st in players.items() if st.status == "Playing"] + if len(playing) == 1: + return playing[0] + + preferred = options.preferred_player.lower().strip() + candidates = playing if playing else list(players.keys()) + if preferred: + for name in candidates: + if preferred in name.lower(): + return name + + if last_active and last_active in players: + return last_active + + return candidates[0] if candidates else None diff --git a/src/lrx_cli/watch/session.py b/src/lrx_cli/watch/session.py new file mode 100644 index 0000000..983b3f7 --- /dev/null +++ b/src/lrx_cli/watch/session.py @@ -0,0 +1,403 @@ +"""Watch orchestration with explicit MVVM role boundaries. + +- Model: WatchModel stores domain state. +- ViewModel: WatchViewModel projects model to output-facing state/signature. +- Coordinator: WatchCoordinator wires services and drives async workflows. +""" + +import asyncio +from dataclasses import asdict +from typing import Optional, Protocol + +from loguru import logger + +from ..fetchers import FetcherMethodType +from ..lrc import LRCData +from ..models import LyricResult +from ..models import TrackMeta +from .control import ControlServer +from .fetcher import LyricFetcher +from .options import WatchOptions +from .view import BaseOutput, LyricView, WatchState +from .player import ActivePlayerSelector, PlayerMonitor, PlayerTarget +from .tracker import PositionTracker + + +class FetchManager(Protocol): + """Protocol for lyric fetch manager consumed by watch session.""" + + def fetch_for_track( + self, + track: TrackMeta, + force_method: FetcherMethodType | None = None, + bypass_cache: bool = False, + allow_unsynced: bool = False, + ) -> Optional[LyricResult]: + """Fetch lyrics for one track.""" + + +class WatchModel: + """Model layer that owns watch state and lyric timeline representation.""" + + offset_ms: int + active_player: str | None + active_track_key: str | None + status: str + lyrics: LyricView | None + + def __init__(self) -> None: + self.offset_ms = 0 + self.active_player: str | None = None + self.active_track_key: str | None = None + self.status: str = "idle" + self.lyrics: LyricView | None = None + + def set_lyrics(self, lyrics: LRCData | None) -> None: + """Update lyrics and rebuild projection once per lyric object change.""" + if lyrics is None: + self.lyrics = None + return + + self.lyrics = LyricView.from_lrc(lyrics) + + def state_signature(self, track: TrackMeta | None, position_ms: int) -> tuple: + """Build dedupe signature from model state and current lyric cursor.""" + track_key = ( + track.trackid + if track and track.trackid + else track.display_name() + if track + else None + ) + + if self.status != "ok" or self.lyrics is None: + return ("status", self.status, self.active_player, track_key) + at_ms = position_ms + self.offset_ms + cursor = self.lyrics.signature_cursor(at_ms) + return ("lyrics", self.active_player, track_key, cursor) + + +class WatchViewModel: + """ViewModel that projects WatchModel into view-consumable snapshots.""" + + _model: WatchModel + + def __init__(self, model: WatchModel) -> None: + self._model = model + + def signature(self, track: TrackMeta | None, position_ms: int) -> tuple: + """Build dedupe signature for current projected state.""" + return self._model.state_signature(track, position_ms) + + def state(self, track: TrackMeta | None, position_ms: int) -> WatchState: + """Project model values into immutable WatchState payload.""" + return WatchState( + track=track, + lyrics=self._model.lyrics, + position_ms=position_ms, + offset_ms=self._model.offset_ms, + status=self._model.status, # type: ignore[arg-type] + ) + + +class WatchCoordinator: + """Application/service orchestration layer for watch runtime.""" + + _manager: FetchManager + _output: BaseOutput + _options: WatchOptions + _model: WatchModel + _view_model: WatchViewModel + _player_hint: str | None + _last_emit_signature: tuple | None + _target: PlayerTarget + _control: ControlServer + _player_monitor: PlayerMonitor + _tracker: PositionTracker + _fetcher: LyricFetcher + _emit_scheduled: bool + _calibration_task: asyncio.Task | None + + def __init__( + self, + manager: FetchManager, + output: BaseOutput, + player_hint: str | None, + options: WatchOptions, + ) -> None: + self._manager = manager + self._output = output + self._options = options + self._model = WatchModel() + self._view_model = WatchViewModel(self._model) + self._player_hint = player_hint + self._last_emit_signature: tuple | None = None + self._emit_scheduled = False + self._calibration_task = None + + self._target = PlayerTarget( + hint=player_hint, + player_blacklist=self._options.player_blacklist, + ) + + self._control = ControlServer(options=self._options) + self._player_monitor = PlayerMonitor( + on_players_changed=self._on_player_change, + on_seeked=self._on_seeked, + on_playback_status=self._on_playback_status, + options=self._options, + target=self._target, + ) + self._tracker = PositionTracker( + poll_position_ms=self._player_monitor.get_position_ms, + options=self._options, + on_tick=self._on_tracker_tick, + ) + self._fetcher = LyricFetcher( + fetch_func=self._fetch_lyrics, + on_fetching=self._on_fetching, + on_result=self._on_lyrics_update, + options=self._options, + ) + + async def run(self) -> bool: + """Run watch workflow and return success flag.""" + target_issue = self._target.validation_error() + if target_issue: + logger.error(target_issue) + return False + + logger.info( + "watch session starting (player filter: {})", + self._player_hint or "", + ) + + if not await self._control.start(self): + return False + try: + await self._player_monitor.start() + await self._tracker.start() + self._calibration_task = asyncio.create_task(self._calibration_loop()) + self._schedule_emit() + await asyncio.Event().wait() + return True + except asyncio.CancelledError: + return True + except Exception as exc: + logger.exception("watch runtime error: {}", exc) + return False + finally: + logger.info("watch session stopping") + if self._calibration_task is not None: + self._calibration_task.cancel() + await asyncio.gather(self._calibration_task, return_exceptions=True) + self._calibration_task = None + await self._fetcher.stop() + await self._tracker.stop() + await self._player_monitor.close() + await self._control.stop() + + async def _calibration_loop(self) -> None: + """Periodically refresh full MPRIS snapshot as fallback calibration.""" + interval = max(0.1, self._options.calibration_interval_s) + while True: + await asyncio.sleep(interval) + try: + await self._player_monitor.refresh() + except asyncio.CancelledError: + raise + except Exception as exc: + logger.debug("mpris calibration refresh failed: {}", exc) + + def _active_track(self) -> TrackMeta | None: + """Return active track metadata from selected player.""" + player = self._player_monitor.players.get(self._model.active_player or "") + return player.track if player else None + + def _request_fetch_for_active_track(self, reason: str) -> bool: + """Trigger lyric fetch for active track when needed.""" + track = self._active_track() + if track is None: + return False + if self._model.lyrics is not None: + return False + if self._model.status == "fetching": + return False + logger.info("fetching lyrics for track ({}): {}", reason, track.display_name()) + self._fetcher.request(track) + return True + + async def _fetch_lyrics(self, track: TrackMeta) -> Optional[LRCData]: + """Fetch lyrics in worker thread.""" + result = await asyncio.to_thread( + self._manager.fetch_for_track, + track, + None, + False, + True, + ) + if result and result.lyrics: + return result.lyrics + return None + + def _on_player_change(self) -> None: + """React to monitor player snapshot change.""" + prev_player = self._model.active_player + prev_track_key = self._model.active_track_key + + selected = ActivePlayerSelector.select( + self._player_monitor.players, + self._model.active_player, + self._options, + ) + self._model.active_player = selected + + if selected != prev_player: + logger.info( + "active player changed: {} -> {}", + prev_player or "", + selected or "", + ) + + if selected is None: + self._model.status = "idle" + self._model.active_track_key = None + self._model.set_lyrics(None) + self._schedule_emit() + return + + state = self._player_monitor.players.get(selected) + if state is None: + self._model.status = "idle" + self._model.active_track_key = None + self._model.set_lyrics(None) + self._schedule_emit() + return + + track = state.track + track_key = ( + track.trackid + if track and track.trackid + else track.display_name() + if track + else None + ) + + track_changed = track_key != prev_track_key + player_changed = selected != prev_player + if track_changed or player_changed: + self._model.set_lyrics(None) + + self._model.active_track_key = track_key + + asyncio.create_task( + self._tracker.set_active_player( + selected, + state.status, + track_key, + ) + ) + + if state.status != "Playing": + self._model.status = "paused" + self._schedule_emit() + return + + started_fetch = False + if track is not None and ( + player_changed or track_changed or self._model.lyrics is None + ): + started_fetch = self._request_fetch_for_active_track("track-changed") + + if self._model.lyrics is not None: + self._model.status = "ok" + elif started_fetch: + self._model.status = "fetching" + else: + self._model.status = "paused" + self._schedule_emit() + + def _on_seeked(self, bus_name: str, position_ms: int) -> None: + """Forward seek event to tracker.""" + asyncio.create_task(self._tracker.on_seeked(bus_name, position_ms)) + + def _on_playback_status(self, bus_name: str, status: str) -> None: + """React to playback status change and tracker sync.""" + if bus_name == self._model.active_player: + if status == "Playing": + started_fetch = self._request_fetch_for_active_track("resume-playing") + if self._model.lyrics is not None: + self._model.status = "ok" + elif started_fetch: + self._model.status = "fetching" + else: + self._model.status = "paused" + else: + self._model.status = "paused" + self._schedule_emit() + asyncio.create_task(self._tracker.on_playback_status(bus_name, status)) + + def _on_tracker_tick(self) -> None: + """Emit updates from tracker tick only while lyrics are actively rendering.""" + if self._model.status == "ok": + self._schedule_emit() + + def _schedule_emit(self) -> None: + """Coalesce frequent events into at most one in-flight emit task.""" + if self._emit_scheduled: + return + self._emit_scheduled = True + asyncio.create_task(self._run_scheduled_emit()) + + async def _run_scheduled_emit(self) -> None: + """Run one coalesced emit and release scheduler gate.""" + try: + await self._emit_state() + finally: + self._emit_scheduled = False + + async def _on_fetching(self) -> None: + """Mark model as fetching and emit state.""" + self._model.status = "fetching" + await self._emit_state() + + async def _on_lyrics_update(self, lyrics: Optional[LRCData]) -> None: + """Update model with fetched lyrics and emit state.""" + self._model.set_lyrics(lyrics) + self._model.status = "ok" if lyrics is not None else "no_lyrics" + logger.info( + "lyrics update result: {}", + "found" if lyrics is not None else "not found", + ) + await self._emit_state() + + async def _emit_state(self) -> None: + """Emit output state only when semantic signature changes.""" + player = self._player_monitor.players.get(self._model.active_player or "") + track = player.track if player else None + position = await self._tracker.get_position_ms() + + signature = self._view_model.signature(track, position) + if signature == self._last_emit_signature: + return + self._last_emit_signature = signature + state = self._view_model.state(track, position) + await self._output.on_state(state) + + def handle_offset(self, delta: int) -> dict: + """Apply offset update requested by control channel.""" + self._model.offset_ms += delta + return {"ok": True, "offset_ms": self._model.offset_ms} + + def handle_status(self) -> dict: + """Return status payload for control channel.""" + player = self._player_monitor.players.get(self._model.active_player or "") + track = asdict(player.track) if player and player.track else None + return { + "ok": True, + "offset_ms": self._model.offset_ms, + "player": self._model.active_player, + "track": track, + "position_ms": self._tracker.peek_position_ms(), + "lyrics_status": self._model.status, + } diff --git a/src/lrx_cli/watch/tracker.py b/src/lrx_cli/watch/tracker.py new file mode 100644 index 0000000..c0438bf --- /dev/null +++ b/src/lrx_cli/watch/tracker.py @@ -0,0 +1,143 @@ +"""Playback position tracking utilities for watch mode.""" + +import asyncio +import time +from typing import Awaitable, Callable, Optional + +from .options import WatchOptions + + +class PositionTracker: + """Maintains an estimated playback position from seek/status events plus local clock.""" + + _options: WatchOptions + _poll_position_ms: Callable[[str], Awaitable[Optional[int]]] + _active_player: str | None + _is_playing: bool + _track_key: str | None + _position_ms: int + _last_tick: float + _fast_task: asyncio.Task | None + _on_tick: Callable[[], None] | None + _lock: asyncio.Lock + + def __init__( + self, + poll_position_ms: Callable[[str], Awaitable[Optional[int]]], + options: WatchOptions, + on_tick: Callable[[], None] | None = None, + ) -> None: + """Initialize tracker with position polling callback and runtime options.""" + self._options = options + self._poll_position_ms = poll_position_ms + self._on_tick = on_tick + self._active_player: str | None = None + self._is_playing = False + self._track_key: str | None = None + self._position_ms = 0 + self._last_tick = time.monotonic() + self._fast_task: asyncio.Task | None = None + self._lock = asyncio.Lock() + + async def start(self) -> None: + """Start local monotonic position ticking task.""" + self._last_tick = time.monotonic() + self._fast_task = asyncio.create_task(self._fast_loop()) + + async def stop(self) -> None: + """Stop tracker tasks and await clean cancellation.""" + tasks = [t for t in (self._fast_task,) if t is not None] + for task in tasks: + task.cancel() + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) + self._fast_task = None + + async def set_active_player( + self, + bus_name: str | None, + playback_status: str, + track_key: str | None, + ) -> None: + """Switch active source and calibrate position once when entering a new playing track.""" + should_calibrate_now = False + async with self._lock: + player_changed = self._active_player != bus_name + track_changed = self._track_key != track_key + was_playing = self._is_playing + self._active_player = bus_name + self._is_playing = playback_status == "Playing" + status_changed_to_playing = self._is_playing and not was_playing + if player_changed or track_changed: + self._position_ms = 0 + should_calibrate_now = ( + self._is_playing + and bool(self._active_player) + and (player_changed or track_changed or status_changed_to_playing) + ) + self._track_key = track_key + self._last_tick = time.monotonic() + + if should_calibrate_now and self._active_player: + await self._calibrate_once(self._active_player) + + async def on_seeked(self, bus_name: str, position_ms: int) -> None: + """Apply explicit seek position update for active player.""" + async with self._lock: + if bus_name != self._active_player: + return + self._position_ms = max(0, position_ms) + self._last_tick = time.monotonic() + + async def on_playback_status(self, bus_name: str, playback_status: str) -> None: + """Update playing state and calibrate once on paused-to-playing transition.""" + should_calibrate_now = False + async with self._lock: + if bus_name != self._active_player: + return + was_playing = self._is_playing + self._is_playing = playback_status == "Playing" + should_calibrate_now = self._is_playing and not was_playing + self._last_tick = time.monotonic() + + if should_calibrate_now: + await self._calibrate_once(bus_name) + + async def _fast_loop(self) -> None: + """Advance position by monotonic clock while active player is playing.""" + interval = self._options.position_tick_ms / 1000.0 + while True: + await asyncio.sleep(interval) + should_notify = False + async with self._lock: + now = time.monotonic() + if self._is_playing and self._active_player: + delta_ms = int((now - self._last_tick) * 1000) + if delta_ms > 0: + self._position_ms += delta_ms + should_notify = True + self._last_tick = now + + if should_notify and self._on_tick is not None: + self._on_tick() + + async def _calibrate_once(self, bus_name: str) -> None: + """Poll player-reported position once and synchronize local tracker state.""" + polled = await self._poll_position_ms(bus_name) + if polled is None: + return + async with self._lock: + if bus_name != self._active_player: + return + # Drift correction is signal-assisted; polling is fallback. + self._position_ms = max(0, polled) + self._last_tick = time.monotonic() + + async def get_position_ms(self) -> int: + """Return current tracked position in milliseconds.""" + async with self._lock: + return max(0, int(self._position_ms)) + + def peek_position_ms(self) -> int: + """Return current tracked position without awaiting lock (best-effort snapshot).""" + return max(0, int(self._position_ms)) diff --git a/src/lrx_cli/watch/view/__init__.py b/src/lrx_cli/watch/view/__init__.py new file mode 100644 index 0000000..699ca1e --- /dev/null +++ b/src/lrx_cli/watch/view/__init__.py @@ -0,0 +1,80 @@ +"""Output abstraction types for watch mode rendering.""" + +from abc import ABC, abstractmethod +from bisect import bisect_right +from dataclasses import dataclass +from typing import Literal, Optional + +from ...lrc import LRCData, LyricLine +from ...models import TrackMeta + + +@dataclass(slots=True, frozen=True) +class LyricView: + """View-ready immutable lyric data projected from one normalized LRC object.""" + + normalized: LRCData + lines: tuple[str, ...] + timed_line_entries: tuple[tuple[int, int], ...] + timestamps: tuple[int, ...] + + @staticmethod + def from_lrc(lyrics: LRCData) -> "LyricView": + """Build a view projection once from normalized lyrics.""" + normalized = lyrics.normalize() + + lines: list[str] = [] + entries: list[tuple[int, int]] = [] + + line_index = 0 + for line in normalized.lines: + if not isinstance(line, LyricLine): + continue + text = line.text + lines.append(text) + timestamp = line.line_times_ms[0] if line.line_times_ms else 0 + entries.append((max(0, timestamp), line_index)) + line_index += 1 + + timestamps = tuple(timestamp for timestamp, _ in entries) + return LyricView( + normalized=normalized, + lines=tuple(lines), + timed_line_entries=tuple(entries), + timestamps=timestamps, + ) + + def signature_cursor(self, at_ms: int) -> tuple: + """Build a stable cursor signature for dedupe decisions.""" + if not self.timed_line_entries: + return ("plain", self.lines) + + first_ts = self.timed_line_entries[0][0] + if at_ms < first_ts: + return ("before_first", first_ts) + + idx = bisect_right(self.timestamps, at_ms) - 1 + if idx < 0: + idx = 0 + + ts, line_idx = self.timed_line_entries[idx] + text = self.lines[line_idx] if line_idx < len(self.lines) else "" + return ("ok", idx, ts, text) + + +@dataclass(slots=True) +class WatchState: + """Immutable snapshot payload delivered from session to output implementations.""" + + track: Optional[TrackMeta] + lyrics: Optional[LyricView] + position_ms: int + offset_ms: int + status: Literal["fetching", "ok", "no_lyrics", "paused", "idle"] + + +class BaseOutput(ABC): + @abstractmethod + async def on_state(self, state: WatchState) -> None: + """Render or deliver one watch state frame.""" + ... diff --git a/src/lrx_cli/watch/view/pipe.py b/src/lrx_cli/watch/view/pipe.py new file mode 100644 index 0000000..e105510 --- /dev/null +++ b/src/lrx_cli/watch/view/pipe.py @@ -0,0 +1,85 @@ +"""Pipe output implementation for watch mode.""" + +from bisect import bisect_right +from dataclasses import dataclass +import sys + +from . import BaseOutput, WatchState + + +@dataclass(slots=True) +class PipeOutput(BaseOutput): + """Render a fixed lyric context window to stdout for streaming/pipe usage.""" + + before: int = 0 + after: int = 0 + + def _window_size(self) -> int: + """Return rendered lyric window size.""" + return self.before + 1 + self.after + + def _render_status(self, message: str) -> list[str]: + """Render centered status line in fixed-size window.""" + lines = [""] * self._window_size() + lines[self.before] = message + return lines + + def _render_lyrics(self, state: WatchState) -> list[str]: + """Render context lines centered on current timed lyric entry.""" + if state.lyrics is None: + return self._render_status("[no lyrics]") + + all_lines = state.lyrics.lines + if not all_lines: + return self._render_status("[no lyrics]") + entries = state.lyrics.timed_line_entries + + effective_ms = state.position_ms + state.offset_ms + current_line_idx: int | None + if entries and effective_ms < entries[0][0]: + # Before first timestamp, current lyric is empty and after-window shows upcoming lines. + current_line_idx = None + else: + if not entries: + current_line_idx = 0 + else: + current_entry_idx = ( + bisect_right(state.lyrics.timestamps, effective_ms) - 1 + ) + if current_entry_idx < 0: + current_entry_idx = 0 + current_line_idx = entries[current_entry_idx][1] + + out: list[str] = [] + for rel in range(-self.before, self.after + 1): + if current_line_idx is None: + if rel <= 0: + out.append("") + continue + line_idx = rel - 1 + else: + line_idx = current_line_idx + rel + + if 0 <= line_idx < len(all_lines): + out.append(all_lines[line_idx]) + else: + out.append("") + + return out + + async def on_state(self, state: WatchState) -> None: + """Render and flush one frame for the latest watch state.""" + if state.status == "fetching": + lines = self._render_status("[fetching...]") + elif state.status == "no_lyrics": + lines = self._render_status("[no lyrics]") + elif state.status == "paused": + lines = self._render_status("[paused]") + elif state.status == "idle": + lines = self._render_status("[idle]") + else: + lines = self._render_lyrics(state) + + for line in lines: + print(line) + sys.stdout.flush() diff --git a/tests/test_watch.py b/tests/test_watch.py new file mode 100644 index 0000000..c381cb3 --- /dev/null +++ b/tests/test_watch.py @@ -0,0 +1,462 @@ +from __future__ import annotations + +import asyncio +from pathlib import Path + +from lrx_cli.lrc import LRCData +from lrx_cli.models import TrackMeta +from lrx_cli.watch.control import ControlClient, ControlServer, parse_delta +from lrx_cli.watch.view import BaseOutput, LyricView, WatchState +from lrx_cli.watch.view.pipe import PipeOutput +from lrx_cli.watch.player import ActivePlayerSelector, PlayerState, PlayerTarget +from lrx_cli.watch.fetcher import LyricFetcher +from lrx_cli.watch.options import WatchOptions +from lrx_cli.watch.tracker import PositionTracker +from lrx_cli.watch.session import WatchCoordinator + + +TEST_WATCH_OPTIONS = WatchOptions( + preferred_player="spotify", + player_blacklist=(), + debounce_ms=400, + position_tick_ms=50, + calibration_interval_s=3.0, + socket_path=Path("/tmp/lrx-watch-test.sock"), +) + + +def test_parse_delta_supports_plus_minus_and_reset() -> None: + assert parse_delta("+200") == (True, 200, None) + assert parse_delta("-150") == (True, -150, None) + assert parse_delta("0") == (True, 0, None) + + +def test_player_target_allows_all_when_hint_empty() -> None: + target = PlayerTarget() + + assert target.allows("org.mpris.MediaPlayer2.spotify") is True + assert target.allows("org.mpris.MediaPlayer2.mpd") is True + + +def test_player_target_filters_by_case_insensitive_substring() -> None: + target = PlayerTarget("Spot") + + assert target.allows("org.mpris.MediaPlayer2.spotify") is True + assert target.allows("org.mpris.MediaPlayer2.mpd") is False + + +def test_player_target_reports_blacklisted_hint() -> None: + target = PlayerTarget("spot", player_blacklist=("spotify",)) + assert target.validation_error() is not None + + +def test_active_player_selector_prefers_single_playing() -> None: + players = { + "org.mpris.MediaPlayer2.foo": PlayerState( + bus_name="org.mpris.MediaPlayer2.foo", + status="Paused", + track=TrackMeta(title="A"), + ), + "org.mpris.MediaPlayer2.bar": PlayerState( + bus_name="org.mpris.MediaPlayer2.bar", + status="Playing", + track=TrackMeta(title="B"), + ), + } + assert ( + ActivePlayerSelector.select(players, None, TEST_WATCH_OPTIONS) + == "org.mpris.MediaPlayer2.bar" + ) + + +def test_active_player_selector_uses_last_active_when_no_playing() -> None: + players = { + "org.mpris.MediaPlayer2.foo": PlayerState( + bus_name="org.mpris.MediaPlayer2.foo", + status="Paused", + track=TrackMeta(title="A"), + ), + "org.mpris.MediaPlayer2.bar": PlayerState( + bus_name="org.mpris.MediaPlayer2.bar", + status="Stopped", + track=TrackMeta(title="B"), + ), + } + + assert ( + ActivePlayerSelector.select( + players, + "org.mpris.MediaPlayer2.bar", + TEST_WATCH_OPTIONS, + ) + == "org.mpris.MediaPlayer2.bar" + ) + + +def test_position_tracker_seeked_calibrates_immediately() -> None: + async def _run() -> None: + async def _poll(_bus: str): + return 1200 + + tracker = PositionTracker(_poll, TEST_WATCH_OPTIONS) + await tracker.start() + await tracker.set_active_player( + "org.mpris.MediaPlayer2.foo", "Playing", "track-A" + ) + await tracker.on_seeked("org.mpris.MediaPlayer2.foo", 3500) + pos = await tracker.get_position_ms() + await tracker.stop() + assert pos >= 3500 + + asyncio.run(_run()) + + +def test_position_tracker_playback_status_pause_stops_fast_growth() -> None: + async def _run() -> None: + async def _poll(_bus: str): + return 0 + + tracker = PositionTracker(_poll, TEST_WATCH_OPTIONS) + await tracker.start() + await tracker.set_active_player( + "org.mpris.MediaPlayer2.foo", "Playing", "track-A" + ) + await asyncio.sleep(0.08) + before = await tracker.get_position_ms() + + await tracker.on_playback_status("org.mpris.MediaPlayer2.foo", "Paused") + await asyncio.sleep(0.08) + after = await tracker.get_position_ms() + await tracker.stop() + + assert before > 0 + assert after - before < 20 + + asyncio.run(_run()) + + +def test_position_tracker_playback_status_playing_calibrates_once() -> None: + async def _run() -> None: + async def _poll(_bus: str): + return 50000 + + tracker = PositionTracker(_poll, TEST_WATCH_OPTIONS) + await tracker.start() + await tracker.set_active_player( + "org.mpris.MediaPlayer2.foo", "Paused", "track-A" + ) + await tracker.on_playback_status("org.mpris.MediaPlayer2.foo", "Playing") + pos = await tracker.get_position_ms() + await tracker.stop() + + assert pos >= 50000 + + asyncio.run(_run()) + + +def test_position_tracker_set_active_player_playing_calibrates_on_resume() -> None: + async def _run() -> None: + async def _poll(_bus: str): + return 42000 + + tracker = PositionTracker(_poll, TEST_WATCH_OPTIONS) + await tracker.start() + await tracker.set_active_player( + "org.mpris.MediaPlayer2.foo", "Paused", "track-A" + ) + await tracker.set_active_player( + "org.mpris.MediaPlayer2.foo", "Playing", "track-A" + ) + pos = await tracker.get_position_ms() + await tracker.stop() + + assert pos >= 42000 + + asyncio.run(_run()) + + +def test_control_server_and_client_roundtrip(tmp_path: Path) -> None: + async def _run() -> None: + class _Session: + def __init__(self): + self.offset = 0 + + def handle_offset(self, delta: int) -> dict: + self.offset += delta + return {"ok": True, "offset_ms": self.offset} + + def handle_status(self) -> dict: + return {"ok": True, "offset_ms": self.offset, "lyrics_status": "idle"} + + socket_path = tmp_path / "watch.sock" + server = ControlServer(socket_path=socket_path, options=TEST_WATCH_OPTIONS) + session = _Session() + + await server.start(session) + client = ControlClient(socket_path=socket_path, options=TEST_WATCH_OPTIONS) + r1 = await client._send_async({"cmd": "offset", "delta": 200}) + r2 = await client._send_async({"cmd": "status"}) + await server.stop() + + assert r1 == {"ok": True, "offset_ms": 200} + assert r2["ok"] is True + assert r2["offset_ms"] == 200 + + asyncio.run(_run()) + + +def test_pipe_output_prints_fixed_window_for_status(capsys) -> None: + output = PipeOutput(before=1, after=1) + state = WatchState( + track=None, + lyrics=None, + position_ms=0, + offset_ms=0, + status="fetching", + ) + + asyncio.run(output.on_state(state)) + + printed = capsys.readouterr().out + assert printed == "\n[fetching...]\n\n" + + +def test_pipe_output_uses_context_window_for_lyrics(capsys) -> None: + output = PipeOutput(before=1, after=1) + lyrics = LRCData("[00:01.00]a\n[00:02.00]b\n[00:03.00]c") + state = WatchState( + track=TrackMeta(title="Song"), + lyrics=LyricView.from_lrc(lyrics), + position_ms=2100, + offset_ms=0, + status="ok", + ) + + asyncio.run(output.on_state(state)) + + printed = capsys.readouterr().out + assert printed == "a\nb\nc\n" + + +def test_pipe_output_shows_upcoming_lines_before_first_timestamp(capsys) -> None: + output = PipeOutput(before=1, after=1) + lyrics = LRCData("[00:02.00]a\n[00:03.00]b") + state = WatchState( + track=TrackMeta(title="Song"), + lyrics=LyricView.from_lrc(lyrics), + position_ms=0, + offset_ms=0, + status="ok", + ) + + asyncio.run(output.on_state(state)) + + printed = capsys.readouterr().out + assert printed == "\n\na\n" + + +def test_pipe_output_first_line_keeps_before_region_empty(capsys) -> None: + output = PipeOutput(before=1, after=1) + lyrics = LRCData("[00:01.00]a\n[00:02.00]b\n[00:03.00]c") + state = WatchState( + track=TrackMeta(title="Song"), + lyrics=LyricView.from_lrc(lyrics), + position_ms=1100, + offset_ms=0, + status="ok", + ) + + asyncio.run(output.on_state(state)) + + printed = capsys.readouterr().out + assert printed == "\na\nb\n" + + +def test_pipe_output_last_line_keeps_after_region_empty(capsys) -> None: + output = PipeOutput(before=1, after=1) + lyrics = LRCData("[00:01.00]a\n[00:02.00]b\n[00:03.00]c") + state = WatchState( + track=TrackMeta(title="Song"), + lyrics=LyricView.from_lrc(lyrics), + position_ms=3100, + offset_ms=0, + status="ok", + ) + + asyncio.run(output.on_state(state)) + + printed = capsys.readouterr().out + assert printed == "b\nc\n\n" + + +def test_pipe_output_repeated_text_uses_correct_timed_occurrence(capsys) -> None: + output = PipeOutput(before=1, after=1) + lyrics = LRCData("[00:01.00]A\n[00:02.00]X\n[00:03.00]B\n[00:04.00]X\n[00:05.00]C") + state = WatchState( + track=TrackMeta(title="Song"), + lyrics=LyricView.from_lrc(lyrics), + position_ms=4100, + offset_ms=0, + status="ok", + ) + + asyncio.run(output.on_state(state)) + + printed = capsys.readouterr().out + assert printed == "B\nX\nC\n" + + +def test_session_fetches_on_resume_playing_without_lyrics() -> None: + async def _run() -> None: + class _Manager: + def fetch_for_track(self, *_args, **_kwargs): + return None + + class _Output(BaseOutput): + async def on_state(self, state: WatchState) -> None: + return None + + class _Fetcher(LyricFetcher): + def __init__(self): + async def _fetch(_track: TrackMeta): + return None + + async def _on_fetching() -> None: + return None + + async def _on_result(_lyrics) -> None: + return None + + super().__init__(_fetch, _on_fetching, _on_result, TEST_WATCH_OPTIONS) + self.requested = [] + + def request(self, track: TrackMeta) -> None: + self.requested.append(track.display_name()) + + session = WatchCoordinator( + _Manager(), + _Output(), + player_hint=None, + options=TEST_WATCH_OPTIONS, + ) + fake_fetcher = _Fetcher() + session._fetcher = fake_fetcher + session._tracker = PositionTracker( + lambda _bus: asyncio.sleep(0, result=0), + TEST_WATCH_OPTIONS, + ) + + bus_name = "org.mpris.MediaPlayer2.spotify" + track = TrackMeta(title="Song", artist="Artist") + session._model.active_player = bus_name + session._player_monitor.players = { + bus_name: PlayerState(bus_name=bus_name, status="Playing", track=track) + } + session._model.set_lyrics(None) + session._model.status = "paused" + + session._on_playback_status(bus_name, "Playing") + await asyncio.sleep(0) + + assert fake_fetcher.requested == ["Artist - Song"] + assert session._model.status == "fetching" + + asyncio.run(_run()) + + +def test_session_emit_state_only_when_lyric_cursor_changes() -> None: + async def _run() -> None: + class _Manager: + def fetch_for_track(self, *_args, **_kwargs): + return None + + class _Output(BaseOutput): + def __init__(self): + self.count = 0 + + async def on_state(self, state: WatchState) -> None: + self.count += 1 + + output = _Output() + session = WatchCoordinator( + _Manager(), + output, + player_hint=None, + options=TEST_WATCH_OPTIONS, + ) + session._tracker = PositionTracker( + lambda _bus: asyncio.sleep(0, result=0), + TEST_WATCH_OPTIONS, + ) + + bus_name = "org.mpris.MediaPlayer2.spotify" + track = TrackMeta(title="Song", artist="Artist") + session._model.active_player = bus_name + session._player_monitor.players = { + bus_name: PlayerState(bus_name=bus_name, status="Playing", track=track) + } + session._model.set_lyrics(LRCData("[00:01.00]a\n[00:03.00]b")) + session._model.status = "ok" + await session._tracker.set_active_player( + bus_name, + "Playing", + "Artist - Song", + ) + + await session._emit_state() + await session._emit_state() + + await session._tracker.on_seeked(bus_name, 3200) + await session._emit_state() + + assert output.count == 2 + + asyncio.run(_run()) + + +def test_session_emits_when_crossing_first_timestamp() -> None: + async def _run() -> None: + class _Manager: + def fetch_for_track(self, *_args, **_kwargs): + return None + + class _Output(BaseOutput): + def __init__(self): + self.count = 0 + + async def on_state(self, state: WatchState) -> None: + self.count += 1 + + output = _Output() + session = WatchCoordinator( + _Manager(), + output, + player_hint=None, + options=TEST_WATCH_OPTIONS, + ) + session._tracker = PositionTracker( + lambda _bus: asyncio.sleep(0, result=0), + TEST_WATCH_OPTIONS, + ) + + bus_name = "org.mpris.MediaPlayer2.spotify" + track = TrackMeta(title="Song", artist="Artist") + session._model.active_player = bus_name + session._player_monitor.players = { + bus_name: PlayerState(bus_name=bus_name, status="Playing", track=track) + } + session._model.set_lyrics(LRCData("[00:02.00]a\n[00:03.00]b")) + session._model.status = "ok" + await session._tracker.set_active_player( + bus_name, + "Playing", + "Artist - Song", + ) + + await session._emit_state() + await session._tracker.on_seeked(bus_name, 2500) + await session._emit_state() + + assert output.count == 2 + + asyncio.run(_run())