feat: add watch command and pipe view

This commit is contained in:
2026-04-09 10:15:14 +02:00
parent 03970bf17f
commit e6b8583868
13 changed files with 1990 additions and 15 deletions
+20 -14
View File
@@ -85,9 +85,15 @@ class CacheEngine:
self.db_path = db_path self.db_path = db_path
self._init_db() self._init_db()
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(self.db_path)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=5000")
return conn
def _init_db(self) -> None: def _init_db(self) -> None:
"""Create cache tables and run one-time slot/cache migrations.""" """Create cache tables and run one-time slot/cache migrations."""
with sqlite3.connect(self.db_path) as conn: with self._connect() as conn:
conn.execute(""" conn.execute("""
CREATE TABLE IF NOT EXISTS credentials ( CREATE TABLE IF NOT EXISTS credentials (
name TEXT PRIMARY KEY, name TEXT PRIMARY KEY,
@@ -256,7 +262,7 @@ class CacheEngine:
return [] return []
now = int(time.time()) now = int(time.time())
with sqlite3.connect(self.db_path) as conn: with self._connect() as conn:
conn.execute( conn.execute(
"DELETE FROM cache WHERE key = ? AND expires_at IS NOT NULL AND expires_at < ?", "DELETE FROM cache WHERE key = ? AND expires_at IS NOT NULL AND expires_at < ?",
(key, now), (key, now),
@@ -353,7 +359,7 @@ class CacheEngine:
# Convenience for callers that still pass a single negative result. # Convenience for callers that still pass a single negative result.
kinds = [SLOT_SYNCED, SLOT_UNSYNCED] kinds = [SLOT_SYNCED, SLOT_UNSYNCED]
with sqlite3.connect(self.db_path) as conn: with self._connect() as conn:
for kind in kinds: for kind in kinds:
conn.execute( conn.execute(
"""INSERT OR REPLACE INTO cache """INSERT OR REPLACE INTO cache
@@ -386,7 +392,7 @@ class CacheEngine:
def clear_all(self) -> None: def clear_all(self) -> None:
"""Remove every entry from the cache.""" """Remove every entry from the cache."""
with sqlite3.connect(self.db_path) as conn: with self._connect() as conn:
conn.execute("DELETE FROM cache") conn.execute("DELETE FROM cache")
conn.commit() conn.commit()
logger.info("Cache cleared.") logger.info("Cache cleared.")
@@ -396,7 +402,7 @@ class CacheEngine:
if not self._track_has_meta(track): if not self._track_has_meta(track):
logger.info(f"No cache entries found for {track.display_name()}.") logger.info(f"No cache entries found for {track.display_name()}.")
return return
with sqlite3.connect(self.db_path) as conn: with self._connect() as conn:
cur = conn.execute( cur = conn.execute(
f"DELETE FROM cache WHERE {_TRACK_WHERE}", f"DELETE FROM cache WHERE {_TRACK_WHERE}",
_track_where_params(track), _track_where_params(track),
@@ -411,7 +417,7 @@ class CacheEngine:
def prune(self) -> int: def prune(self) -> int:
"""Remove all expired entries. Returns the number of rows deleted.""" """Remove all expired entries. Returns the number of rows deleted."""
with sqlite3.connect(self.db_path) as conn: with self._connect() as conn:
cur = conn.execute( cur = conn.execute(
"DELETE FROM cache WHERE expires_at IS NOT NULL AND expires_at < ?", "DELETE FROM cache WHERE expires_at IS NOT NULL AND expires_at < ?",
(int(time.time()),), (int(time.time()),),
@@ -439,7 +445,7 @@ class CacheEngine:
return None return None
now = int(time.time()) now = int(time.time())
with sqlite3.connect(self.db_path) as conn: with self._connect() as conn:
conn.row_factory = sqlite3.Row conn.row_factory = sqlite3.Row
rows = conn.execute( rows = conn.execute(
f"SELECT status, lyrics, source, confidence FROM cache" f"SELECT status, lyrics, source, confidence FROM cache"
@@ -495,7 +501,7 @@ class CacheEngine:
return [] return []
now = int(time.time()) now = int(time.time())
with sqlite3.connect(self.db_path) as conn: with self._connect() as conn:
conn.row_factory = sqlite3.Row conn.row_factory = sqlite3.Row
rows = conn.execute( rows = conn.execute(
"""SELECT * FROM cache """SELECT * FROM cache
@@ -557,7 +563,7 @@ class CacheEngine:
""" """
if not self._track_has_meta(track): if not self._track_has_meta(track):
return 0 return 0
with sqlite3.connect(self.db_path) as conn: with self._connect() as conn:
cur = conn.execute( cur = conn.execute(
f"UPDATE cache SET confidence = ? WHERE {_TRACK_WHERE} AND source = ?", f"UPDATE cache SET confidence = ? WHERE {_TRACK_WHERE} AND source = ?",
[confidence] + _track_where_params(track) + [source], [confidence] + _track_where_params(track) + [source],
@@ -571,7 +577,7 @@ class CacheEngine:
"""Return all cached rows for a given track (across all sources).""" """Return all cached rows for a given track (across all sources)."""
if not self._track_has_meta(track): if not self._track_has_meta(track):
return [] return []
with sqlite3.connect(self.db_path) as conn: with self._connect() as conn:
conn.row_factory = sqlite3.Row conn.row_factory = sqlite3.Row
return [ return [
dict(r) dict(r)
@@ -586,7 +592,7 @@ class CacheEngine:
def get_credential(self, name: str) -> Optional[dict]: def get_credential(self, name: str) -> Optional[dict]:
"""Return cached credential data if present and not expired.""" """Return cached credential data if present and not expired."""
now_ms = int(time.time() * 1000) now_ms = int(time.time() * 1000)
with sqlite3.connect(self.db_path) as conn: with self._connect() as conn:
conn.row_factory = sqlite3.Row conn.row_factory = sqlite3.Row
row = conn.execute( row = conn.execute(
"SELECT data FROM credentials WHERE name = ? AND (expires_at IS NULL OR expires_at > ?)", "SELECT data FROM credentials WHERE name = ? AND (expires_at IS NULL OR expires_at > ?)",
@@ -603,7 +609,7 @@ class CacheEngine:
self, name: str, data: dict, expires_at_ms: Optional[int] = None self, name: str, data: dict, expires_at_ms: Optional[int] = None
) -> None: ) -> None:
"""Persist credential data, optionally with an expiry timestamp (Unix ms).""" """Persist credential data, optionally with an expiry timestamp (Unix ms)."""
with sqlite3.connect(self.db_path) as conn: with self._connect() as conn:
conn.execute( conn.execute(
"INSERT OR REPLACE INTO credentials (name, data, expires_at) VALUES (?, ?, ?)", "INSERT OR REPLACE INTO credentials (name, data, expires_at) VALUES (?, ?, ?)",
(name, json.dumps(data), expires_at_ms), (name, json.dumps(data), expires_at_ms),
@@ -612,14 +618,14 @@ class CacheEngine:
def query_all(self) -> list[dict]: def query_all(self) -> list[dict]:
"""Return every row in the cache table.""" """Return every row in the cache table."""
with sqlite3.connect(self.db_path) as conn: with self._connect() as conn:
conn.row_factory = sqlite3.Row conn.row_factory = sqlite3.Row
return [dict(r) for r in conn.execute("SELECT * FROM cache").fetchall()] return [dict(r) for r in conn.execute("SELECT * FROM cache").fetchall()]
def stats(self) -> dict: def stats(self) -> dict:
"""Return aggregate cache statistics.""" """Return aggregate cache statistics."""
now = int(time.time()) now = int(time.time())
with sqlite3.connect(self.db_path) as conn: with self._connect() as conn:
total = conn.execute("SELECT COUNT(*) FROM cache").fetchone()[0] total = conn.execute("SELECT COUNT(*) FROM cache").fetchone()[0]
expired = conn.execute( expired = conn.execute(
"SELECT COUNT(*) FROM cache WHERE expires_at IS NOT NULL AND expires_at < ?", "SELECT COUNT(*) FROM cache WHERE expires_at IS NOT NULL AND expires_at < ?",
+102 -1
View File
@@ -7,18 +7,33 @@ Description: CLI interface.
import sys import sys
import time import time
import os import os
import asyncio
import json
from pathlib import Path from pathlib import Path
from typing import Annotated from typing import Annotated
from urllib.parse import quote from urllib.parse import quote
import cyclopts import cyclopts
from loguru import logger from loguru import logger
from .config import DB_PATH, enable_debug from .config import (
DB_PATH,
PLAYER_BLACKLIST,
PREFERRED_PLAYER,
WATCH_CALIBRATION_INTERVAL_S,
WATCH_DEBOUNCE_MS,
WATCH_POSITION_TICK_MS,
WATCH_SOCKET_PATH,
enable_debug,
)
from .models import TrackMeta from .models import TrackMeta
from .mpris import get_current_track from .mpris import get_current_track
from .core import LrcManager from .core import LrcManager
from .fetchers import FetcherMethodType from .fetchers import FetcherMethodType
from .lrc import get_sidecar_path from .lrc import get_sidecar_path
from .watch import WatchCoordinator
from .watch.control import ControlClient, parse_delta
from .watch.options import WatchOptions
from .watch.view.pipe import PipeOutput
app = cyclopts.App( app = cyclopts.App(
@@ -29,6 +44,12 @@ app.register_install_completion_command()
cache_app = cyclopts.App(name="cache", help="Manage the local SQLite cache.") cache_app = cyclopts.App(name="cache", help="Manage the local SQLite cache.")
app.command(cache_app) app.command(cache_app)
watch_app = cyclopts.App(name="watch", help="Watch MPRIS and output lyrics.")
app.command(watch_app)
ctl_app = cyclopts.App(name="ctl", help="Control a running watch session.")
watch_app.command(ctl_app)
# Global state set by the meta launcher # Global state set by the meta launcher
_player: str | None = None _player: str | None = None
@@ -38,6 +59,18 @@ _db_path: str | None = None
manager: LrcManager = None # type: ignore manager: LrcManager = None # type: ignore
def _build_watch_options() -> WatchOptions:
"""Build runtime watch options from CLI composition root."""
return WatchOptions(
preferred_player=PREFERRED_PLAYER,
player_blacklist=tuple(PLAYER_BLACKLIST),
debounce_ms=WATCH_DEBOUNCE_MS,
position_tick_ms=WATCH_POSITION_TICK_MS,
calibration_interval_s=WATCH_CALIBRATION_INTERVAL_S,
socket_path=WATCH_SOCKET_PATH,
)
@app.meta.default @app.meta.default
def launcher( def launcher(
*tokens: Annotated[str, cyclopts.Parameter(show=False, allow_leading_hyphen=True)], *tokens: Annotated[str, cyclopts.Parameter(show=False, allow_leading_hyphen=True)],
@@ -357,6 +390,74 @@ def export(
sys.exit(1) sys.exit(1)
# watch subcommands
@watch_app.command
def pipe(
before: Annotated[
int,
cyclopts.Parameter(
name="--before",
help="Number of lyric lines to show before current line.",
),
] = 0,
after: Annotated[
int,
cyclopts.Parameter(
name="--after",
help="Number of lyric lines to show after current line.",
),
] = 0,
):
"""Watch active player and continuously print lyric window to stdout."""
logger.info(
"Starting watch pipe (player filter: {})",
_player or "<none>",
)
output = PipeOutput(before=max(0, before), after=max(0, after))
options = _build_watch_options()
try:
session = WatchCoordinator(
manager,
output,
player_hint=_player,
options=options,
)
success = asyncio.run(session.run())
if not success:
sys.exit(1)
except KeyboardInterrupt:
logger.info("Watch stopped.")
@ctl_app.command
def offset(delta: str) -> None:
"""Adjust watch offset. Examples: +200, -200, 0."""
parsed_ok, parsed_delta, parse_error = parse_delta(delta)
if not parsed_ok or parsed_delta is None:
logger.error(parse_error or "Invalid offset delta")
sys.exit(1)
response = ControlClient(options=_build_watch_options()).send(
{"cmd": "offset", "delta": parsed_delta}
)
if not response.get("ok"):
logger.error(response.get("error", "Unknown error"))
sys.exit(1)
print(json.dumps(response, indent=2, ensure_ascii=False))
@ctl_app.command
def status() -> None:
"""Print current watch session status as JSON."""
response = ControlClient(options=_build_watch_options()).send({"cmd": "status"})
if not response.get("ok"):
logger.error(response.get("error", "Unknown error"))
sys.exit(1)
print(json.dumps(response, indent=2, ensure_ascii=False))
# cache subcommands # cache subcommands
+11
View File
@@ -68,6 +68,17 @@ MUSIXMATCH_COOLDOWN_MS = 600_000 # 10 minutes
# Player preference (used when multiple MPRIS players are active) # Player preference (used when multiple MPRIS players are active)
PREFERRED_PLAYER = os.environ.get("PREFERRED_PLAYER", "spotify") PREFERRED_PLAYER = os.environ.get("PREFERRED_PLAYER", "spotify")
PLAYER_BLACKLIST = [
s.strip() for s in os.environ.get("PLAYER_BLACKLIST", "").split(",") if s.strip()
]
# Watch mode
WATCH_DEBOUNCE_MS = int(os.environ.get("WATCH_DEBOUNCE_MS", "400"))
WATCH_CALIBRATION_INTERVAL_S = float(
os.environ.get("WATCH_CALIBRATION_INTERVAL_S", "3.0")
)
WATCH_POSITION_TICK_MS = int(os.environ.get("WATCH_POSITION_TICK_MS", "50"))
WATCH_SOCKET_PATH = Path(CACHE_DIR) / "watch.sock"
class _Credentials: class _Credentials:
+5
View File
@@ -0,0 +1,5 @@
"""Watch subsystem public exports."""
from .session import WatchCoordinator
__all__ = ["WatchCoordinator"]
+172
View File
@@ -0,0 +1,172 @@
"""Unix-socket control channel for communicating with a running watch session."""
import asyncio
import json
from pathlib import Path
from typing import Protocol, TypeAlias
from loguru import logger
from .options import WatchOptions
JSONPrimitive: TypeAlias = str | int | float | bool | None
JSONValue: TypeAlias = JSONPrimitive | dict[str, "JSONValue"] | list["JSONValue"]
JSONDict: TypeAlias = dict[str, JSONValue]
class ControlSession(Protocol):
"""Session protocol used by control channel handlers."""
def handle_offset(self, delta: int) -> JSONDict:
"""Apply offset delta and return JSON response payload."""
...
def handle_status(self) -> JSONDict:
"""Return current session status payload."""
...
class ControlServer:
"""Control server that handles offset/status commands over a Unix socket."""
_options: WatchOptions
_socket_path: Path
_server: asyncio.AbstractServer | None
def __init__(
self,
options: WatchOptions,
socket_path: Path | None = None,
) -> None:
"""Initialize control server with explicit socket path or runtime options."""
self._options = options
resolved_socket_path = socket_path or self._options.socket_path
self._socket_path: Path = resolved_socket_path
self._server: asyncio.AbstractServer | None = None
async def start(self, session: ControlSession) -> bool:
"""Start listening for control requests and bind session handlers."""
if not await self._prepare_socket_path():
return False
self._socket_path.parent.mkdir(parents=True, exist_ok=True)
self._server = await asyncio.start_unix_server(
lambda r, w: self._handle(session, r, w),
path=str(self._socket_path),
)
return True
async def _prepare_socket_path(self) -> bool:
"""Ensure socket path is usable and reject when another session is active."""
if not self._socket_path.exists():
return True
try:
reader, writer = await asyncio.open_unix_connection(str(self._socket_path))
writer.close()
await writer.wait_closed()
logger.error(
"A watch session is already running. Use 'lrx watch ctl status'."
)
return False
except Exception:
try:
self._socket_path.unlink(missing_ok=True)
except Exception:
pass
return True
async def stop(self) -> None:
"""Stop control server and remove stale socket path."""
if self._server is not None:
self._server.close()
await self._server.wait_closed()
self._server = None
try:
self._socket_path.unlink(missing_ok=True)
except Exception:
pass
async def _handle(
self,
session: ControlSession,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
) -> None:
"""Handle one control request and send JSON response."""
resp: JSONDict = {"ok": False, "error": "internal error"}
try:
line = await reader.readline()
if not line:
resp = {"ok": False, "error": "empty request"}
else:
req = json.loads(line.decode("utf-8"))
cmd = req.get("cmd")
if cmd == "offset":
delta = int(req.get("delta", 0))
resp = session.handle_offset(delta)
elif cmd == "status":
resp = session.handle_status()
else:
resp = {"ok": False, "error": "unknown command"}
except Exception as e:
resp = {"ok": False, "error": str(e)}
finally:
writer.write((json.dumps(resp) + "\n").encode("utf-8"))
await writer.drain()
writer.close()
await writer.wait_closed()
class ControlClient:
"""Control client used by CLI commands to talk to active watch session."""
_options: WatchOptions
_socket_path: Path
def __init__(
self,
options: WatchOptions,
socket_path: Path | None = None,
) -> None:
"""Initialize control client with explicit socket path or runtime options."""
self._options = options
resolved_socket_path = socket_path or self._options.socket_path
self._socket_path: Path = resolved_socket_path
async def _send_async(self, cmd: JSONDict) -> JSONDict:
"""Send one JSON command to control server and return JSON response."""
if not self._socket_path.exists():
return {"ok": False, "error": "No watch session running."}
try:
reader, writer = await asyncio.open_unix_connection(str(self._socket_path))
except Exception:
return {"ok": False, "error": "No watch session running."}
writer.write((json.dumps(cmd) + "\n").encode("utf-8"))
await writer.drain()
line = await reader.readline()
writer.close()
await writer.wait_closed()
if not line:
return {"ok": False, "error": "Empty response."}
return json.loads(line.decode("utf-8"))
def send(self, cmd: JSONDict) -> JSONDict:
"""Synchronous wrapper around async control request."""
return asyncio.run(self._send_async(cmd))
def parse_delta(raw: str) -> tuple[bool, int | None, str | None]:
"""Parse signed millisecond offset delta string for ctl offset command."""
value = raw.strip()
try:
if value.startswith("+"):
return True, int(value[1:]), None
if value.startswith("-"):
return True, -int(value[1:]), None
return True, int(value), None
except ValueError:
return False, None, f"Invalid offset delta: {raw}"
+80
View File
@@ -0,0 +1,80 @@
"""Debounced lyric fetch orchestration for watch session."""
import asyncio
from typing import Awaitable, Callable, Optional
from ..lrc import LRCData
from ..models import TrackMeta
from .options import WatchOptions
class LyricFetcher:
"""Debounces track updates and runs at most one lyric fetch task at a time."""
_options: WatchOptions
_fetch_func: Callable[[TrackMeta], Awaitable[Optional[LRCData]]]
_on_fetching: Callable[[], Awaitable[None] | None]
_on_result: Callable[[Optional[LRCData]], Awaitable[None] | None]
_debounce_task: asyncio.Task | None
_fetch_task: asyncio.Task | None
_pending_track: TrackMeta | None
def __init__(
self,
fetch_func: Callable[[TrackMeta], Awaitable[Optional[LRCData]]],
on_fetching: Callable[[], Awaitable[None] | None],
on_result: Callable[[Optional[LRCData]], Awaitable[None] | None],
options: WatchOptions,
) -> None:
"""Initialize fetch callbacks and runtime options."""
self._options = options
self._fetch_func = fetch_func
self._on_fetching = on_fetching
self._on_result = on_result
self._debounce_task: asyncio.Task | None = None
self._fetch_task: asyncio.Task | None = None
self._pending_track: TrackMeta | None = None
async def stop(self) -> None:
"""Cancel and await all in-flight debounce/fetch tasks."""
for task in (self._debounce_task, self._fetch_task):
if task is not None:
task.cancel()
await asyncio.gather(
*[t for t in (self._debounce_task, self._fetch_task) if t is not None],
return_exceptions=True,
)
self._debounce_task = None
self._fetch_task = None
def request(self, track: TrackMeta) -> None:
"""Request lyrics for track with debounce collapsing."""
self._pending_track = track
if self._debounce_task is not None:
self._debounce_task.cancel()
self._debounce_task = asyncio.create_task(self._debounce_then_fetch())
async def _debounce_then_fetch(self) -> None:
"""Wait debounce window then start a fresh fetch task for latest pending track."""
await asyncio.sleep(self._options.debounce_ms / 1000.0)
track = self._pending_track
if track is None:
return
if self._fetch_task is not None:
self._fetch_task.cancel()
await asyncio.gather(self._fetch_task, return_exceptions=True)
self._fetch_task = asyncio.create_task(self._do_fetch(track))
async def _do_fetch(self, track: TrackMeta) -> None:
"""Execute fetch lifecycle callbacks and fetch lyrics for a track."""
fetching_callback_result = self._on_fetching()
if asyncio.iscoroutine(fetching_callback_result):
await fetching_callback_result
lyrics = await self._fetch_func(track)
result_callback_result = self._on_result(lyrics)
if asyncio.iscoroutine(result_callback_result):
await result_callback_result
+16
View File
@@ -0,0 +1,16 @@
"""Watch runtime options passed from CLI composition root."""
from dataclasses import dataclass
from pathlib import Path
@dataclass(frozen=True)
class WatchOptions:
"""Runtime settings used by watch components."""
preferred_player: str
player_blacklist: tuple[str, ...]
debounce_ms: int
position_tick_ms: int
calibration_interval_s: float
socket_path: Path
+411
View File
@@ -0,0 +1,411 @@
"""Player discovery, state monitoring, and active-player selection for watch mode."""
from dataclasses import dataclass
from typing import Callable, Optional
import asyncio
from dbus_next.aio.message_bus import MessageBus
from dbus_next.constants import BusType
from dbus_next.message import Message
from loguru import logger
from ..models import TrackMeta
from .options import WatchOptions
def _variant_value(item: object) -> object | None:
"""Extract .value from DBus variant-like objects when available."""
if hasattr(item, "value"):
return getattr(item, "value")
return None
@dataclass(slots=True)
class PlayerState:
"""Current observable state for one MPRIS player."""
bus_name: str
status: str
track: Optional[TrackMeta]
@dataclass(frozen=True, slots=True)
class PlayerTarget:
"""Constraint for choosing which players are visible to watch."""
hint: Optional[str] = None
player_blacklist: tuple[str, ...] = ()
def validation_error(self) -> str | None:
"""Return validation message when hint conflicts with blacklist, else None."""
normalized_hint = self.normalized_hint
if not normalized_hint:
return None
for blocked in self.player_blacklist:
normalized_blocked = blocked.strip().lower()
if not normalized_blocked:
continue
if _keyword_match(normalized_hint, normalized_blocked) or _keyword_match(
normalized_blocked, normalized_hint
):
return (
f"Requested player '{self.hint}' is blocked by "
f"PLAYER_BLACKLIST entry '{blocked}'."
)
return None
@property
def normalized_hint(self) -> str:
"""Return normalized lowercase player hint string."""
return (self.hint or "").strip().lower()
def allows(self, bus_name: str) -> bool:
"""Return whether given MPRIS bus name passes this target constraint."""
normalized_hint = self.normalized_hint
if not normalized_hint:
return True
return _keyword_match(bus_name, normalized_hint)
def _keyword_match(text: str, keyword: str) -> bool:
"""Return True when keyword exists in text, case-insensitively."""
return keyword.strip().lower() in text.lower()
class PlayerMonitor:
"""Tracks MPRIS players and forwards signal-driven state updates to session callbacks."""
_options: WatchOptions
_on_players_changed: Callable[[], None]
_on_seeked: Callable[[str, int], None]
_on_playback_status: Callable[[str, str], None]
_target: PlayerTarget
players: dict[str, PlayerState]
_bus: MessageBus | None
_props_cache: dict[str, object]
def __init__(
self,
on_players_changed: Callable[[], None],
on_seeked: Callable[[str, int], None],
on_playback_status: Callable[[str, str], None],
options: WatchOptions,
target: Optional[PlayerTarget] = None,
) -> None:
"""Initialize monitor callbacks, runtime options, and player target filter."""
self._options = options
self._on_players_changed = on_players_changed
self._on_seeked = on_seeked
self._on_playback_status = on_playback_status
self._target = target or PlayerTarget(
player_blacklist=self._options.player_blacklist
)
self.players: dict[str, PlayerState] = {}
self._bus: MessageBus | None = None
self._props_cache: dict[str, object] = {}
async def start(self) -> None:
"""Start DBus monitoring and populate initial player snapshot."""
self._bus = await MessageBus(bus_type=BusType.SESSION).connect()
self._bus.add_message_handler(self._on_message)
await self._add_match_rules()
await self.refresh()
async def close(self) -> None:
"""Stop DBus monitoring and close bus connection."""
self._props_cache.clear()
if self._bus:
self._bus.disconnect()
self._bus = None
async def _get_player_props(self, bus_name: str) -> object | None:
"""Return cached DBus Properties interface for player, creating it if missing."""
if not self._bus:
return None
if bus_name in self._props_cache:
return self._props_cache[bus_name]
try:
introspection = await self._bus.introspect(
bus_name, "/org/mpris/MediaPlayer2"
)
proxy = self._bus.get_proxy_object(
bus_name, "/org/mpris/MediaPlayer2", introspection
)
props = proxy.get_interface("org.freedesktop.DBus.Properties")
self._props_cache[bus_name] = props
return props
except Exception as e:
logger.debug(f"Failed to prepare DBus props for {bus_name}: {e}")
self._props_cache.pop(bus_name, None)
return None
async def _add_match_rules(self) -> None:
"""Register signal subscriptions needed by monitor."""
if not self._bus:
return
rules = [
"type='signal',interface='org.freedesktop.DBus',member='NameOwnerChanged'",
"type='signal',interface='org.freedesktop.DBus.Properties',member='PropertiesChanged'",
"type='signal',interface='org.mpris.MediaPlayer2.Player',member='Seeked'",
]
for rule in rules:
try:
await self._bus.call(
Message(
destination="org.freedesktop.DBus",
path="/org/freedesktop/DBus",
interface="org.freedesktop.DBus",
member="AddMatch",
signature="s",
body=[rule],
)
)
except Exception as e:
logger.debug(f"Failed to add DBus match rule {rule}: {e}")
async def _list_mpris_players(self) -> list[str]:
"""List visible MPRIS players after applying blacklist and target filter."""
if not self._bus:
return []
try:
reply = await self._bus.call(
Message(
destination="org.freedesktop.DBus",
path="/org/freedesktop/DBus",
interface="org.freedesktop.DBus",
member="ListNames",
)
)
if not reply or not reply.body:
return []
out: list[str] = []
for name in reply.body[0]:
if not name.startswith("org.mpris.MediaPlayer2."):
continue
if any(
x.lower() in name.lower() for x in self._options.player_blacklist
):
continue
if not self._target.allows(name):
continue
out.append(name)
return out
except Exception as e:
logger.debug(f"Failed to list mpris players: {e}")
return []
async def _fetch_player_state(self, bus_name: str) -> Optional[PlayerState]:
"""Read current playback status and metadata from one player service."""
props = await self._get_player_props(bus_name)
if props is None:
return None
try:
status_var = await getattr(props, "call_get")(
"org.mpris.MediaPlayer2.Player", "PlaybackStatus"
)
metadata_var = await getattr(props, "call_get")(
"org.mpris.MediaPlayer2.Player", "Metadata"
)
status = status_var.value if status_var else "Stopped"
track = self._track_from_metadata(
metadata_var.value if metadata_var else {}
)
return PlayerState(bus_name=bus_name, status=status, track=track)
except Exception as e:
logger.debug(f"Failed to read state for {bus_name}: {e}")
self._props_cache.pop(bus_name, None)
return None
def _track_from_metadata(self, metadata: dict[str, object]) -> Optional[TrackMeta]:
"""Build TrackMeta object from MPRIS metadata map."""
if not metadata:
return None
trackid = metadata.get("mpris:trackid")
if trackid is not None:
trackid = _variant_value(trackid)
if isinstance(trackid, str) and trackid.startswith("spotify:track:"):
trackid = trackid.removeprefix("spotify:track:")
elif isinstance(trackid, str) and trackid.startswith("/com/spotify/track/"):
trackid = trackid.removeprefix("/com/spotify/track/")
elif not isinstance(trackid, str):
trackid = None
length = metadata.get("mpris:length")
length_ms = None
length_value = _variant_value(length) if length is not None else None
if isinstance(length_value, int):
length_ms = length_value // 1000
artist = metadata.get("xesam:artist")
artist_v = None
artist_value = _variant_value(artist) if artist is not None else None
if isinstance(artist_value, list) and artist_value:
artist_v = artist_value[0]
title = metadata.get("xesam:title")
album = metadata.get("xesam:album")
url = metadata.get("xesam:url")
title_value = _variant_value(title) if title is not None else None
album_value = _variant_value(album) if album is not None else None
url_value = _variant_value(url) if url is not None else None
return TrackMeta(
trackid=trackid,
length=length_ms,
album=album_value if isinstance(album_value, str) else None,
artist=artist_v,
title=title_value if isinstance(title_value, str) else None,
url=url_value if isinstance(url_value, str) else None,
)
async def refresh(self) -> None:
"""Refresh full player snapshot and notify session when visible set changes."""
players = await self._list_mpris_players()
updated: dict[str, PlayerState] = {}
for bus_name in players:
st = await self._fetch_player_state(bus_name)
if st is not None:
updated[bus_name] = st
before = set(self.players.keys())
after = set(updated.keys())
added = sorted(after - before)
removed = sorted(before - after)
for bus_name in removed:
self._props_cache.pop(bus_name, None)
self.players = updated
if added or removed:
logger.info(
"MPRIS players updated: added={}, removed={}",
added,
removed,
)
self._on_players_changed()
async def _resolve_well_known_name(self, unique_sender: str) -> str | None:
"""Map a DBus unique sender (e.g. :1.42) to a tracked MPRIS bus name."""
if unique_sender in self.players:
return unique_sender
if not self._bus:
return None
for bus_name in self.players:
try:
reply = await self._bus.call(
Message(
destination="org.freedesktop.DBus",
path="/org/freedesktop/DBus",
interface="org.freedesktop.DBus",
member="GetNameOwner",
signature="s",
body=[bus_name],
)
)
if reply and reply.body and str(reply.body[0]) == unique_sender:
return bus_name
except Exception:
continue
return None
async def _handle_seeked_signal(self, sender: str, position_ms: int) -> None:
"""Route Seeked signal to session using well-known bus name when possible."""
bus_name = await self._resolve_well_known_name(sender)
if bus_name is not None:
self._on_seeked(bus_name, position_ms)
return
# If we cannot map sender reliably, force a state refresh to converge.
await self.refresh()
def _on_message(self, message: Message) -> bool:
"""Low-level DBus signal handler for player lifecycle/status/seek events."""
try:
if (
message.interface == "org.freedesktop.DBus"
and message.member == "NameOwnerChanged"
):
if message.body and str(message.body[0]).startswith(
"org.mpris.MediaPlayer2."
):
asyncio.create_task(self.refresh())
return False
if (
message.interface == "org.freedesktop.DBus.Properties"
and message.member == "PropertiesChanged"
):
# Message.sender is a DBus unique name, so match by path+iface.
path_ok = message.path == "/org/mpris/MediaPlayer2"
iface = message.body[0] if message.body else None
if path_ok and iface == "org.mpris.MediaPlayer2.Player":
asyncio.create_task(self.refresh())
return False
if (
message.interface == "org.mpris.MediaPlayer2.Player"
and message.member == "Seeked"
):
sender = message.sender or ""
if sender and message.body:
position_us = int(message.body[0])
asyncio.create_task(
self._handle_seeked_signal(
sender,
max(0, position_us // 1000),
)
)
return False
except Exception as e:
logger.debug(f"PlayerMonitor signal handling error: {e}")
return False
async def get_position_ms(self, bus_name: str) -> Optional[int]:
"""Read player-reported position in milliseconds."""
props = await self._get_player_props(bus_name)
if props is None:
return None
try:
position_var = await getattr(props, "call_get")(
"org.mpris.MediaPlayer2.Player", "Position"
)
if position_var is None:
return None
return max(0, int(position_var.value) // 1000)
except Exception as e:
logger.debug(f"Failed to read position from {bus_name}: {e}")
self._props_cache.pop(bus_name, None)
return None
class ActivePlayerSelector:
@staticmethod
def select(
players: dict[str, PlayerState],
last_active: str | None,
options: WatchOptions,
) -> str | None:
"""Select active player by playing state, preferred keyword, and continuity."""
if not players:
return None
playing = [name for name, st in players.items() if st.status == "Playing"]
if len(playing) == 1:
return playing[0]
preferred = options.preferred_player.lower().strip()
candidates = playing if playing else list(players.keys())
if preferred:
for name in candidates:
if preferred in name.lower():
return name
if last_active and last_active in players:
return last_active
return candidates[0] if candidates else None
+403
View File
@@ -0,0 +1,403 @@
"""Watch orchestration with explicit MVVM role boundaries.
- Model: WatchModel stores domain state.
- ViewModel: WatchViewModel projects model to output-facing state/signature.
- Coordinator: WatchCoordinator wires services and drives async workflows.
"""
import asyncio
from dataclasses import asdict
from typing import Optional, Protocol
from loguru import logger
from ..fetchers import FetcherMethodType
from ..lrc import LRCData
from ..models import LyricResult
from ..models import TrackMeta
from .control import ControlServer
from .fetcher import LyricFetcher
from .options import WatchOptions
from .view import BaseOutput, LyricView, WatchState
from .player import ActivePlayerSelector, PlayerMonitor, PlayerTarget
from .tracker import PositionTracker
class FetchManager(Protocol):
"""Protocol for lyric fetch manager consumed by watch session."""
def fetch_for_track(
self,
track: TrackMeta,
force_method: FetcherMethodType | None = None,
bypass_cache: bool = False,
allow_unsynced: bool = False,
) -> Optional[LyricResult]:
"""Fetch lyrics for one track."""
class WatchModel:
"""Model layer that owns watch state and lyric timeline representation."""
offset_ms: int
active_player: str | None
active_track_key: str | None
status: str
lyrics: LyricView | None
def __init__(self) -> None:
self.offset_ms = 0
self.active_player: str | None = None
self.active_track_key: str | None = None
self.status: str = "idle"
self.lyrics: LyricView | None = None
def set_lyrics(self, lyrics: LRCData | None) -> None:
"""Update lyrics and rebuild projection once per lyric object change."""
if lyrics is None:
self.lyrics = None
return
self.lyrics = LyricView.from_lrc(lyrics)
def state_signature(self, track: TrackMeta | None, position_ms: int) -> tuple:
"""Build dedupe signature from model state and current lyric cursor."""
track_key = (
track.trackid
if track and track.trackid
else track.display_name()
if track
else None
)
if self.status != "ok" or self.lyrics is None:
return ("status", self.status, self.active_player, track_key)
at_ms = position_ms + self.offset_ms
cursor = self.lyrics.signature_cursor(at_ms)
return ("lyrics", self.active_player, track_key, cursor)
class WatchViewModel:
"""ViewModel that projects WatchModel into view-consumable snapshots."""
_model: WatchModel
def __init__(self, model: WatchModel) -> None:
self._model = model
def signature(self, track: TrackMeta | None, position_ms: int) -> tuple:
"""Build dedupe signature for current projected state."""
return self._model.state_signature(track, position_ms)
def state(self, track: TrackMeta | None, position_ms: int) -> WatchState:
"""Project model values into immutable WatchState payload."""
return WatchState(
track=track,
lyrics=self._model.lyrics,
position_ms=position_ms,
offset_ms=self._model.offset_ms,
status=self._model.status, # type: ignore[arg-type]
)
class WatchCoordinator:
"""Application/service orchestration layer for watch runtime."""
_manager: FetchManager
_output: BaseOutput
_options: WatchOptions
_model: WatchModel
_view_model: WatchViewModel
_player_hint: str | None
_last_emit_signature: tuple | None
_target: PlayerTarget
_control: ControlServer
_player_monitor: PlayerMonitor
_tracker: PositionTracker
_fetcher: LyricFetcher
_emit_scheduled: bool
_calibration_task: asyncio.Task | None
def __init__(
self,
manager: FetchManager,
output: BaseOutput,
player_hint: str | None,
options: WatchOptions,
) -> None:
self._manager = manager
self._output = output
self._options = options
self._model = WatchModel()
self._view_model = WatchViewModel(self._model)
self._player_hint = player_hint
self._last_emit_signature: tuple | None = None
self._emit_scheduled = False
self._calibration_task = None
self._target = PlayerTarget(
hint=player_hint,
player_blacklist=self._options.player_blacklist,
)
self._control = ControlServer(options=self._options)
self._player_monitor = PlayerMonitor(
on_players_changed=self._on_player_change,
on_seeked=self._on_seeked,
on_playback_status=self._on_playback_status,
options=self._options,
target=self._target,
)
self._tracker = PositionTracker(
poll_position_ms=self._player_monitor.get_position_ms,
options=self._options,
on_tick=self._on_tracker_tick,
)
self._fetcher = LyricFetcher(
fetch_func=self._fetch_lyrics,
on_fetching=self._on_fetching,
on_result=self._on_lyrics_update,
options=self._options,
)
async def run(self) -> bool:
"""Run watch workflow and return success flag."""
target_issue = self._target.validation_error()
if target_issue:
logger.error(target_issue)
return False
logger.info(
"watch session starting (player filter: {})",
self._player_hint or "<none>",
)
if not await self._control.start(self):
return False
try:
await self._player_monitor.start()
await self._tracker.start()
self._calibration_task = asyncio.create_task(self._calibration_loop())
self._schedule_emit()
await asyncio.Event().wait()
return True
except asyncio.CancelledError:
return True
except Exception as exc:
logger.exception("watch runtime error: {}", exc)
return False
finally:
logger.info("watch session stopping")
if self._calibration_task is not None:
self._calibration_task.cancel()
await asyncio.gather(self._calibration_task, return_exceptions=True)
self._calibration_task = None
await self._fetcher.stop()
await self._tracker.stop()
await self._player_monitor.close()
await self._control.stop()
async def _calibration_loop(self) -> None:
"""Periodically refresh full MPRIS snapshot as fallback calibration."""
interval = max(0.1, self._options.calibration_interval_s)
while True:
await asyncio.sleep(interval)
try:
await self._player_monitor.refresh()
except asyncio.CancelledError:
raise
except Exception as exc:
logger.debug("mpris calibration refresh failed: {}", exc)
def _active_track(self) -> TrackMeta | None:
"""Return active track metadata from selected player."""
player = self._player_monitor.players.get(self._model.active_player or "")
return player.track if player else None
def _request_fetch_for_active_track(self, reason: str) -> bool:
"""Trigger lyric fetch for active track when needed."""
track = self._active_track()
if track is None:
return False
if self._model.lyrics is not None:
return False
if self._model.status == "fetching":
return False
logger.info("fetching lyrics for track ({}): {}", reason, track.display_name())
self._fetcher.request(track)
return True
async def _fetch_lyrics(self, track: TrackMeta) -> Optional[LRCData]:
"""Fetch lyrics in worker thread."""
result = await asyncio.to_thread(
self._manager.fetch_for_track,
track,
None,
False,
True,
)
if result and result.lyrics:
return result.lyrics
return None
def _on_player_change(self) -> None:
"""React to monitor player snapshot change."""
prev_player = self._model.active_player
prev_track_key = self._model.active_track_key
selected = ActivePlayerSelector.select(
self._player_monitor.players,
self._model.active_player,
self._options,
)
self._model.active_player = selected
if selected != prev_player:
logger.info(
"active player changed: {} -> {}",
prev_player or "<none>",
selected or "<none>",
)
if selected is None:
self._model.status = "idle"
self._model.active_track_key = None
self._model.set_lyrics(None)
self._schedule_emit()
return
state = self._player_monitor.players.get(selected)
if state is None:
self._model.status = "idle"
self._model.active_track_key = None
self._model.set_lyrics(None)
self._schedule_emit()
return
track = state.track
track_key = (
track.trackid
if track and track.trackid
else track.display_name()
if track
else None
)
track_changed = track_key != prev_track_key
player_changed = selected != prev_player
if track_changed or player_changed:
self._model.set_lyrics(None)
self._model.active_track_key = track_key
asyncio.create_task(
self._tracker.set_active_player(
selected,
state.status,
track_key,
)
)
if state.status != "Playing":
self._model.status = "paused"
self._schedule_emit()
return
started_fetch = False
if track is not None and (
player_changed or track_changed or self._model.lyrics is None
):
started_fetch = self._request_fetch_for_active_track("track-changed")
if self._model.lyrics is not None:
self._model.status = "ok"
elif started_fetch:
self._model.status = "fetching"
else:
self._model.status = "paused"
self._schedule_emit()
def _on_seeked(self, bus_name: str, position_ms: int) -> None:
"""Forward seek event to tracker."""
asyncio.create_task(self._tracker.on_seeked(bus_name, position_ms))
def _on_playback_status(self, bus_name: str, status: str) -> None:
"""React to playback status change and tracker sync."""
if bus_name == self._model.active_player:
if status == "Playing":
started_fetch = self._request_fetch_for_active_track("resume-playing")
if self._model.lyrics is not None:
self._model.status = "ok"
elif started_fetch:
self._model.status = "fetching"
else:
self._model.status = "paused"
else:
self._model.status = "paused"
self._schedule_emit()
asyncio.create_task(self._tracker.on_playback_status(bus_name, status))
def _on_tracker_tick(self) -> None:
"""Emit updates from tracker tick only while lyrics are actively rendering."""
if self._model.status == "ok":
self._schedule_emit()
def _schedule_emit(self) -> None:
"""Coalesce frequent events into at most one in-flight emit task."""
if self._emit_scheduled:
return
self._emit_scheduled = True
asyncio.create_task(self._run_scheduled_emit())
async def _run_scheduled_emit(self) -> None:
"""Run one coalesced emit and release scheduler gate."""
try:
await self._emit_state()
finally:
self._emit_scheduled = False
async def _on_fetching(self) -> None:
"""Mark model as fetching and emit state."""
self._model.status = "fetching"
await self._emit_state()
async def _on_lyrics_update(self, lyrics: Optional[LRCData]) -> None:
"""Update model with fetched lyrics and emit state."""
self._model.set_lyrics(lyrics)
self._model.status = "ok" if lyrics is not None else "no_lyrics"
logger.info(
"lyrics update result: {}",
"found" if lyrics is not None else "not found",
)
await self._emit_state()
async def _emit_state(self) -> None:
"""Emit output state only when semantic signature changes."""
player = self._player_monitor.players.get(self._model.active_player or "")
track = player.track if player else None
position = await self._tracker.get_position_ms()
signature = self._view_model.signature(track, position)
if signature == self._last_emit_signature:
return
self._last_emit_signature = signature
state = self._view_model.state(track, position)
await self._output.on_state(state)
def handle_offset(self, delta: int) -> dict:
"""Apply offset update requested by control channel."""
self._model.offset_ms += delta
return {"ok": True, "offset_ms": self._model.offset_ms}
def handle_status(self) -> dict:
"""Return status payload for control channel."""
player = self._player_monitor.players.get(self._model.active_player or "")
track = asdict(player.track) if player and player.track else None
return {
"ok": True,
"offset_ms": self._model.offset_ms,
"player": self._model.active_player,
"track": track,
"position_ms": self._tracker.peek_position_ms(),
"lyrics_status": self._model.status,
}
+143
View File
@@ -0,0 +1,143 @@
"""Playback position tracking utilities for watch mode."""
import asyncio
import time
from typing import Awaitable, Callable, Optional
from .options import WatchOptions
class PositionTracker:
"""Maintains an estimated playback position from seek/status events plus local clock."""
_options: WatchOptions
_poll_position_ms: Callable[[str], Awaitable[Optional[int]]]
_active_player: str | None
_is_playing: bool
_track_key: str | None
_position_ms: int
_last_tick: float
_fast_task: asyncio.Task | None
_on_tick: Callable[[], None] | None
_lock: asyncio.Lock
def __init__(
self,
poll_position_ms: Callable[[str], Awaitable[Optional[int]]],
options: WatchOptions,
on_tick: Callable[[], None] | None = None,
) -> None:
"""Initialize tracker with position polling callback and runtime options."""
self._options = options
self._poll_position_ms = poll_position_ms
self._on_tick = on_tick
self._active_player: str | None = None
self._is_playing = False
self._track_key: str | None = None
self._position_ms = 0
self._last_tick = time.monotonic()
self._fast_task: asyncio.Task | None = None
self._lock = asyncio.Lock()
async def start(self) -> None:
"""Start local monotonic position ticking task."""
self._last_tick = time.monotonic()
self._fast_task = asyncio.create_task(self._fast_loop())
async def stop(self) -> None:
"""Stop tracker tasks and await clean cancellation."""
tasks = [t for t in (self._fast_task,) if t is not None]
for task in tasks:
task.cancel()
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
self._fast_task = None
async def set_active_player(
self,
bus_name: str | None,
playback_status: str,
track_key: str | None,
) -> None:
"""Switch active source and calibrate position once when entering a new playing track."""
should_calibrate_now = False
async with self._lock:
player_changed = self._active_player != bus_name
track_changed = self._track_key != track_key
was_playing = self._is_playing
self._active_player = bus_name
self._is_playing = playback_status == "Playing"
status_changed_to_playing = self._is_playing and not was_playing
if player_changed or track_changed:
self._position_ms = 0
should_calibrate_now = (
self._is_playing
and bool(self._active_player)
and (player_changed or track_changed or status_changed_to_playing)
)
self._track_key = track_key
self._last_tick = time.monotonic()
if should_calibrate_now and self._active_player:
await self._calibrate_once(self._active_player)
async def on_seeked(self, bus_name: str, position_ms: int) -> None:
"""Apply explicit seek position update for active player."""
async with self._lock:
if bus_name != self._active_player:
return
self._position_ms = max(0, position_ms)
self._last_tick = time.monotonic()
async def on_playback_status(self, bus_name: str, playback_status: str) -> None:
"""Update playing state and calibrate once on paused-to-playing transition."""
should_calibrate_now = False
async with self._lock:
if bus_name != self._active_player:
return
was_playing = self._is_playing
self._is_playing = playback_status == "Playing"
should_calibrate_now = self._is_playing and not was_playing
self._last_tick = time.monotonic()
if should_calibrate_now:
await self._calibrate_once(bus_name)
async def _fast_loop(self) -> None:
"""Advance position by monotonic clock while active player is playing."""
interval = self._options.position_tick_ms / 1000.0
while True:
await asyncio.sleep(interval)
should_notify = False
async with self._lock:
now = time.monotonic()
if self._is_playing and self._active_player:
delta_ms = int((now - self._last_tick) * 1000)
if delta_ms > 0:
self._position_ms += delta_ms
should_notify = True
self._last_tick = now
if should_notify and self._on_tick is not None:
self._on_tick()
async def _calibrate_once(self, bus_name: str) -> None:
"""Poll player-reported position once and synchronize local tracker state."""
polled = await self._poll_position_ms(bus_name)
if polled is None:
return
async with self._lock:
if bus_name != self._active_player:
return
# Drift correction is signal-assisted; polling is fallback.
self._position_ms = max(0, polled)
self._last_tick = time.monotonic()
async def get_position_ms(self) -> int:
"""Return current tracked position in milliseconds."""
async with self._lock:
return max(0, int(self._position_ms))
def peek_position_ms(self) -> int:
"""Return current tracked position without awaiting lock (best-effort snapshot)."""
return max(0, int(self._position_ms))
+80
View File
@@ -0,0 +1,80 @@
"""Output abstraction types for watch mode rendering."""
from abc import ABC, abstractmethod
from bisect import bisect_right
from dataclasses import dataclass
from typing import Literal, Optional
from ...lrc import LRCData, LyricLine
from ...models import TrackMeta
@dataclass(slots=True, frozen=True)
class LyricView:
"""View-ready immutable lyric data projected from one normalized LRC object."""
normalized: LRCData
lines: tuple[str, ...]
timed_line_entries: tuple[tuple[int, int], ...]
timestamps: tuple[int, ...]
@staticmethod
def from_lrc(lyrics: LRCData) -> "LyricView":
"""Build a view projection once from normalized lyrics."""
normalized = lyrics.normalize()
lines: list[str] = []
entries: list[tuple[int, int]] = []
line_index = 0
for line in normalized.lines:
if not isinstance(line, LyricLine):
continue
text = line.text
lines.append(text)
timestamp = line.line_times_ms[0] if line.line_times_ms else 0
entries.append((max(0, timestamp), line_index))
line_index += 1
timestamps = tuple(timestamp for timestamp, _ in entries)
return LyricView(
normalized=normalized,
lines=tuple(lines),
timed_line_entries=tuple(entries),
timestamps=timestamps,
)
def signature_cursor(self, at_ms: int) -> tuple:
"""Build a stable cursor signature for dedupe decisions."""
if not self.timed_line_entries:
return ("plain", self.lines)
first_ts = self.timed_line_entries[0][0]
if at_ms < first_ts:
return ("before_first", first_ts)
idx = bisect_right(self.timestamps, at_ms) - 1
if idx < 0:
idx = 0
ts, line_idx = self.timed_line_entries[idx]
text = self.lines[line_idx] if line_idx < len(self.lines) else ""
return ("ok", idx, ts, text)
@dataclass(slots=True)
class WatchState:
"""Immutable snapshot payload delivered from session to output implementations."""
track: Optional[TrackMeta]
lyrics: Optional[LyricView]
position_ms: int
offset_ms: int
status: Literal["fetching", "ok", "no_lyrics", "paused", "idle"]
class BaseOutput(ABC):
@abstractmethod
async def on_state(self, state: WatchState) -> None:
"""Render or deliver one watch state frame."""
...
+85
View File
@@ -0,0 +1,85 @@
"""Pipe output implementation for watch mode."""
from bisect import bisect_right
from dataclasses import dataclass
import sys
from . import BaseOutput, WatchState
@dataclass(slots=True)
class PipeOutput(BaseOutput):
"""Render a fixed lyric context window to stdout for streaming/pipe usage."""
before: int = 0
after: int = 0
def _window_size(self) -> int:
"""Return rendered lyric window size."""
return self.before + 1 + self.after
def _render_status(self, message: str) -> list[str]:
"""Render centered status line in fixed-size window."""
lines = [""] * self._window_size()
lines[self.before] = message
return lines
def _render_lyrics(self, state: WatchState) -> list[str]:
"""Render context lines centered on current timed lyric entry."""
if state.lyrics is None:
return self._render_status("[no lyrics]")
all_lines = state.lyrics.lines
if not all_lines:
return self._render_status("[no lyrics]")
entries = state.lyrics.timed_line_entries
effective_ms = state.position_ms + state.offset_ms
current_line_idx: int | None
if entries and effective_ms < entries[0][0]:
# Before first timestamp, current lyric is empty and after-window shows upcoming lines.
current_line_idx = None
else:
if not entries:
current_line_idx = 0
else:
current_entry_idx = (
bisect_right(state.lyrics.timestamps, effective_ms) - 1
)
if current_entry_idx < 0:
current_entry_idx = 0
current_line_idx = entries[current_entry_idx][1]
out: list[str] = []
for rel in range(-self.before, self.after + 1):
if current_line_idx is None:
if rel <= 0:
out.append("")
continue
line_idx = rel - 1
else:
line_idx = current_line_idx + rel
if 0 <= line_idx < len(all_lines):
out.append(all_lines[line_idx])
else:
out.append("")
return out
async def on_state(self, state: WatchState) -> None:
"""Render and flush one frame for the latest watch state."""
if state.status == "fetching":
lines = self._render_status("[fetching...]")
elif state.status == "no_lyrics":
lines = self._render_status("[no lyrics]")
elif state.status == "paused":
lines = self._render_status("[paused]")
elif state.status == "idle":
lines = self._render_status("[idle]")
else:
lines = self._render_lyrics(state)
for line in lines:
print(line)
sys.stdout.flush()
+462
View File
@@ -0,0 +1,462 @@
from __future__ import annotations
import asyncio
from pathlib import Path
from lrx_cli.lrc import LRCData
from lrx_cli.models import TrackMeta
from lrx_cli.watch.control import ControlClient, ControlServer, parse_delta
from lrx_cli.watch.view import BaseOutput, LyricView, WatchState
from lrx_cli.watch.view.pipe import PipeOutput
from lrx_cli.watch.player import ActivePlayerSelector, PlayerState, PlayerTarget
from lrx_cli.watch.fetcher import LyricFetcher
from lrx_cli.watch.options import WatchOptions
from lrx_cli.watch.tracker import PositionTracker
from lrx_cli.watch.session import WatchCoordinator
TEST_WATCH_OPTIONS = WatchOptions(
preferred_player="spotify",
player_blacklist=(),
debounce_ms=400,
position_tick_ms=50,
calibration_interval_s=3.0,
socket_path=Path("/tmp/lrx-watch-test.sock"),
)
def test_parse_delta_supports_plus_minus_and_reset() -> None:
assert parse_delta("+200") == (True, 200, None)
assert parse_delta("-150") == (True, -150, None)
assert parse_delta("0") == (True, 0, None)
def test_player_target_allows_all_when_hint_empty() -> None:
target = PlayerTarget()
assert target.allows("org.mpris.MediaPlayer2.spotify") is True
assert target.allows("org.mpris.MediaPlayer2.mpd") is True
def test_player_target_filters_by_case_insensitive_substring() -> None:
target = PlayerTarget("Spot")
assert target.allows("org.mpris.MediaPlayer2.spotify") is True
assert target.allows("org.mpris.MediaPlayer2.mpd") is False
def test_player_target_reports_blacklisted_hint() -> None:
target = PlayerTarget("spot", player_blacklist=("spotify",))
assert target.validation_error() is not None
def test_active_player_selector_prefers_single_playing() -> None:
players = {
"org.mpris.MediaPlayer2.foo": PlayerState(
bus_name="org.mpris.MediaPlayer2.foo",
status="Paused",
track=TrackMeta(title="A"),
),
"org.mpris.MediaPlayer2.bar": PlayerState(
bus_name="org.mpris.MediaPlayer2.bar",
status="Playing",
track=TrackMeta(title="B"),
),
}
assert (
ActivePlayerSelector.select(players, None, TEST_WATCH_OPTIONS)
== "org.mpris.MediaPlayer2.bar"
)
def test_active_player_selector_uses_last_active_when_no_playing() -> None:
players = {
"org.mpris.MediaPlayer2.foo": PlayerState(
bus_name="org.mpris.MediaPlayer2.foo",
status="Paused",
track=TrackMeta(title="A"),
),
"org.mpris.MediaPlayer2.bar": PlayerState(
bus_name="org.mpris.MediaPlayer2.bar",
status="Stopped",
track=TrackMeta(title="B"),
),
}
assert (
ActivePlayerSelector.select(
players,
"org.mpris.MediaPlayer2.bar",
TEST_WATCH_OPTIONS,
)
== "org.mpris.MediaPlayer2.bar"
)
def test_position_tracker_seeked_calibrates_immediately() -> None:
async def _run() -> None:
async def _poll(_bus: str):
return 1200
tracker = PositionTracker(_poll, TEST_WATCH_OPTIONS)
await tracker.start()
await tracker.set_active_player(
"org.mpris.MediaPlayer2.foo", "Playing", "track-A"
)
await tracker.on_seeked("org.mpris.MediaPlayer2.foo", 3500)
pos = await tracker.get_position_ms()
await tracker.stop()
assert pos >= 3500
asyncio.run(_run())
def test_position_tracker_playback_status_pause_stops_fast_growth() -> None:
async def _run() -> None:
async def _poll(_bus: str):
return 0
tracker = PositionTracker(_poll, TEST_WATCH_OPTIONS)
await tracker.start()
await tracker.set_active_player(
"org.mpris.MediaPlayer2.foo", "Playing", "track-A"
)
await asyncio.sleep(0.08)
before = await tracker.get_position_ms()
await tracker.on_playback_status("org.mpris.MediaPlayer2.foo", "Paused")
await asyncio.sleep(0.08)
after = await tracker.get_position_ms()
await tracker.stop()
assert before > 0
assert after - before < 20
asyncio.run(_run())
def test_position_tracker_playback_status_playing_calibrates_once() -> None:
async def _run() -> None:
async def _poll(_bus: str):
return 50000
tracker = PositionTracker(_poll, TEST_WATCH_OPTIONS)
await tracker.start()
await tracker.set_active_player(
"org.mpris.MediaPlayer2.foo", "Paused", "track-A"
)
await tracker.on_playback_status("org.mpris.MediaPlayer2.foo", "Playing")
pos = await tracker.get_position_ms()
await tracker.stop()
assert pos >= 50000
asyncio.run(_run())
def test_position_tracker_set_active_player_playing_calibrates_on_resume() -> None:
async def _run() -> None:
async def _poll(_bus: str):
return 42000
tracker = PositionTracker(_poll, TEST_WATCH_OPTIONS)
await tracker.start()
await tracker.set_active_player(
"org.mpris.MediaPlayer2.foo", "Paused", "track-A"
)
await tracker.set_active_player(
"org.mpris.MediaPlayer2.foo", "Playing", "track-A"
)
pos = await tracker.get_position_ms()
await tracker.stop()
assert pos >= 42000
asyncio.run(_run())
def test_control_server_and_client_roundtrip(tmp_path: Path) -> None:
async def _run() -> None:
class _Session:
def __init__(self):
self.offset = 0
def handle_offset(self, delta: int) -> dict:
self.offset += delta
return {"ok": True, "offset_ms": self.offset}
def handle_status(self) -> dict:
return {"ok": True, "offset_ms": self.offset, "lyrics_status": "idle"}
socket_path = tmp_path / "watch.sock"
server = ControlServer(socket_path=socket_path, options=TEST_WATCH_OPTIONS)
session = _Session()
await server.start(session)
client = ControlClient(socket_path=socket_path, options=TEST_WATCH_OPTIONS)
r1 = await client._send_async({"cmd": "offset", "delta": 200})
r2 = await client._send_async({"cmd": "status"})
await server.stop()
assert r1 == {"ok": True, "offset_ms": 200}
assert r2["ok"] is True
assert r2["offset_ms"] == 200
asyncio.run(_run())
def test_pipe_output_prints_fixed_window_for_status(capsys) -> None:
output = PipeOutput(before=1, after=1)
state = WatchState(
track=None,
lyrics=None,
position_ms=0,
offset_ms=0,
status="fetching",
)
asyncio.run(output.on_state(state))
printed = capsys.readouterr().out
assert printed == "\n[fetching...]\n\n"
def test_pipe_output_uses_context_window_for_lyrics(capsys) -> None:
output = PipeOutput(before=1, after=1)
lyrics = LRCData("[00:01.00]a\n[00:02.00]b\n[00:03.00]c")
state = WatchState(
track=TrackMeta(title="Song"),
lyrics=LyricView.from_lrc(lyrics),
position_ms=2100,
offset_ms=0,
status="ok",
)
asyncio.run(output.on_state(state))
printed = capsys.readouterr().out
assert printed == "a\nb\nc\n"
def test_pipe_output_shows_upcoming_lines_before_first_timestamp(capsys) -> None:
output = PipeOutput(before=1, after=1)
lyrics = LRCData("[00:02.00]a\n[00:03.00]b")
state = WatchState(
track=TrackMeta(title="Song"),
lyrics=LyricView.from_lrc(lyrics),
position_ms=0,
offset_ms=0,
status="ok",
)
asyncio.run(output.on_state(state))
printed = capsys.readouterr().out
assert printed == "\n\na\n"
def test_pipe_output_first_line_keeps_before_region_empty(capsys) -> None:
output = PipeOutput(before=1, after=1)
lyrics = LRCData("[00:01.00]a\n[00:02.00]b\n[00:03.00]c")
state = WatchState(
track=TrackMeta(title="Song"),
lyrics=LyricView.from_lrc(lyrics),
position_ms=1100,
offset_ms=0,
status="ok",
)
asyncio.run(output.on_state(state))
printed = capsys.readouterr().out
assert printed == "\na\nb\n"
def test_pipe_output_last_line_keeps_after_region_empty(capsys) -> None:
output = PipeOutput(before=1, after=1)
lyrics = LRCData("[00:01.00]a\n[00:02.00]b\n[00:03.00]c")
state = WatchState(
track=TrackMeta(title="Song"),
lyrics=LyricView.from_lrc(lyrics),
position_ms=3100,
offset_ms=0,
status="ok",
)
asyncio.run(output.on_state(state))
printed = capsys.readouterr().out
assert printed == "b\nc\n\n"
def test_pipe_output_repeated_text_uses_correct_timed_occurrence(capsys) -> None:
output = PipeOutput(before=1, after=1)
lyrics = LRCData("[00:01.00]A\n[00:02.00]X\n[00:03.00]B\n[00:04.00]X\n[00:05.00]C")
state = WatchState(
track=TrackMeta(title="Song"),
lyrics=LyricView.from_lrc(lyrics),
position_ms=4100,
offset_ms=0,
status="ok",
)
asyncio.run(output.on_state(state))
printed = capsys.readouterr().out
assert printed == "B\nX\nC\n"
def test_session_fetches_on_resume_playing_without_lyrics() -> None:
async def _run() -> None:
class _Manager:
def fetch_for_track(self, *_args, **_kwargs):
return None
class _Output(BaseOutput):
async def on_state(self, state: WatchState) -> None:
return None
class _Fetcher(LyricFetcher):
def __init__(self):
async def _fetch(_track: TrackMeta):
return None
async def _on_fetching() -> None:
return None
async def _on_result(_lyrics) -> None:
return None
super().__init__(_fetch, _on_fetching, _on_result, TEST_WATCH_OPTIONS)
self.requested = []
def request(self, track: TrackMeta) -> None:
self.requested.append(track.display_name())
session = WatchCoordinator(
_Manager(),
_Output(),
player_hint=None,
options=TEST_WATCH_OPTIONS,
)
fake_fetcher = _Fetcher()
session._fetcher = fake_fetcher
session._tracker = PositionTracker(
lambda _bus: asyncio.sleep(0, result=0),
TEST_WATCH_OPTIONS,
)
bus_name = "org.mpris.MediaPlayer2.spotify"
track = TrackMeta(title="Song", artist="Artist")
session._model.active_player = bus_name
session._player_monitor.players = {
bus_name: PlayerState(bus_name=bus_name, status="Playing", track=track)
}
session._model.set_lyrics(None)
session._model.status = "paused"
session._on_playback_status(bus_name, "Playing")
await asyncio.sleep(0)
assert fake_fetcher.requested == ["Artist - Song"]
assert session._model.status == "fetching"
asyncio.run(_run())
def test_session_emit_state_only_when_lyric_cursor_changes() -> None:
async def _run() -> None:
class _Manager:
def fetch_for_track(self, *_args, **_kwargs):
return None
class _Output(BaseOutput):
def __init__(self):
self.count = 0
async def on_state(self, state: WatchState) -> None:
self.count += 1
output = _Output()
session = WatchCoordinator(
_Manager(),
output,
player_hint=None,
options=TEST_WATCH_OPTIONS,
)
session._tracker = PositionTracker(
lambda _bus: asyncio.sleep(0, result=0),
TEST_WATCH_OPTIONS,
)
bus_name = "org.mpris.MediaPlayer2.spotify"
track = TrackMeta(title="Song", artist="Artist")
session._model.active_player = bus_name
session._player_monitor.players = {
bus_name: PlayerState(bus_name=bus_name, status="Playing", track=track)
}
session._model.set_lyrics(LRCData("[00:01.00]a\n[00:03.00]b"))
session._model.status = "ok"
await session._tracker.set_active_player(
bus_name,
"Playing",
"Artist - Song",
)
await session._emit_state()
await session._emit_state()
await session._tracker.on_seeked(bus_name, 3200)
await session._emit_state()
assert output.count == 2
asyncio.run(_run())
def test_session_emits_when_crossing_first_timestamp() -> None:
async def _run() -> None:
class _Manager:
def fetch_for_track(self, *_args, **_kwargs):
return None
class _Output(BaseOutput):
def __init__(self):
self.count = 0
async def on_state(self, state: WatchState) -> None:
self.count += 1
output = _Output()
session = WatchCoordinator(
_Manager(),
output,
player_hint=None,
options=TEST_WATCH_OPTIONS,
)
session._tracker = PositionTracker(
lambda _bus: asyncio.sleep(0, result=0),
TEST_WATCH_OPTIONS,
)
bus_name = "org.mpris.MediaPlayer2.spotify"
track = TrackMeta(title="Song", artist="Artist")
session._model.active_player = bus_name
session._player_monitor.players = {
bus_name: PlayerState(bus_name=bus_name, status="Playing", track=track)
}
session._model.set_lyrics(LRCData("[00:02.00]a\n[00:03.00]b"))
session._model.status = "ok"
await session._tracker.set_active_player(
bus_name,
"Playing",
"Artist - Song",
)
await session._emit_state()
await session._tracker.on_seeked(bus_name, 2500)
await session._emit_state()
assert output.count == 2
asyncio.run(_run())