Compare commits

..

2 Commits

Author SHA1 Message Date
Uyanide 60732f2986 feat: add watch print mode
test: refactor test_watch
style: add inline comments for watch
2026-04-10 08:23:54 +02:00
Uyanide 633983ed98 feat: watch mode fetch immediatly on track changes regardless of player status 2026-04-10 08:23:54 +02:00
15 changed files with 672 additions and 369 deletions
+2 -2
View File
@@ -143,8 +143,8 @@ socket_path = "" # Unix socket path; defaults to <cache_dir>/
Clone this repository:
```bash
git clone https://github.com/Uyanide/LRX-CLI.git
cd LRX-CLI
git clone https://github.com/Uyanide/lrx-cli.git
cd lrx-cli
```
Create a virtual environment and install dependencies (for example, using uv):
+1 -1
View File
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "lrx-cli"
version = "0.7.2"
version = "0.7.3"
description = "Fetch line-synced lyrics for your music player."
readme = "README.md"
requires-python = ">=3.13"
+31 -35
View File
@@ -21,9 +21,9 @@ colorama==0.4.6 ; sys_platform == 'win32' \
# via
# loguru
# pytest
cyclopts==4.10.1 \
--hash=sha256:35f37257139380a386d9fe4475e1e7c87ca7795765ef4f31abba579fcfcb6ecd \
--hash=sha256:ad4e4bb90576412d32276b14a76f55d43353753d16217f2c3cd5bdceba7f15a0
cyclopts==4.10.2 \
--hash=sha256:a1f2d6f8f7afac9456b48f75a40b36658778ddc9c6d406b520d017ae32c990fe \
--hash=sha256:d7b950457ef2563596d56331f80cbbbf86a2772535fb8b315c4f03bc7e6127f1
# via lrx-cli
dbus-next==0.2.3 \
--hash=sha256:58948f9aff9db08316734c0be2a120f6dc502124d9642f55e90ac82ffb16a18b \
@@ -79,27 +79,23 @@ packaging==26.0 \
--hash=sha256:00243ae351a257117b6a241061796684b084ed1c516a08c48a3f7e147a9d80b4 \
--hash=sha256:b36f1fef9334a5588b4166f8bcd26a14e521f2b55e6b9de3aaa80d3ff7a37529
# via pytest
platformdirs==4.9.4 \
--hash=sha256:1ec356301b7dc906d83f371c8f487070e99d3ccf9e501686456394622a01a934 \
--hash=sha256:68a9a4619a666ea6439f2ff250c12a853cd1cbd5158d258bd824a7df6be2f868
platformdirs==4.9.6 \
--hash=sha256:3bfa75b0ad0db84096ae777218481852c0ebc6c727b3168c1b9e0118e458cf0a \
--hash=sha256:e61adb1d5e5cb3441b4b7710bea7e4c12250ca49439228cc1021c00dcfac0917
# via lrx-cli
pluggy==1.6.0 \
--hash=sha256:7dcc130b76258d33b90f61b658791dede3486c3e6bfb003ee5c9bfb396dd22f3 \
--hash=sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746
# via pytest
pygments==2.19.2 \
--hash=sha256:636cb2477cec7f8952536970bc533bc43743542f70392ae026374600add5b887 \
--hash=sha256:86540386c03d588bb81d44bc3928634ff26449851e99741617ecb9037ee5ec0b
pygments==2.20.0 \
--hash=sha256:6757cd03768053ff99f3039c1a36d6c0aa0b263438fcab17520b30a303a82b5f \
--hash=sha256:81a9e26dd42fd28a23a2d169d86d7ac03b46e2f8b59ed4698fb4785f946d0176
# via
# pytest
# rich
pytest==9.0.2 \
--hash=sha256:711ffd45bf766d5264d487b917733b453d917afd2b0ad65223959f59089f875b \
--hash=sha256:75186651a92bd89611d1d9fc20f0b4345fd827c41ccd5c299a868a05d70edf11
python-dotenv==1.2.2 \
--hash=sha256:1d8214789a24de455a8b8bd8ae6fe3c6b69a5e3d64aa8a8e5d68e694bbcb285a \
--hash=sha256:2c371a91fbd7ba082c2c1dc1f8bf89ca22564a087c2c287cd9b662adde799cf3
# via lrx-cli
pytest==9.0.3 \
--hash=sha256:2c5efc453d45394fdd706ade797c0a81091eccd1d6e4bccfcd476e2b8e0ab5d9 \
--hash=sha256:b86ada508af81d19edeb213c681b1d48246c1a91d304c6c81a427674c17eb91c
rich==14.3.3 \
--hash=sha256:793431c1f8619afa7d3b52b2cdec859562b950ea0d4b6b505397612db8d5362d \
--hash=sha256:b8daa0b9e4eef54dd8cf7c86c03713f53241884e814f4e2f5fb342fe520f639b
@@ -110,25 +106,25 @@ rich-rst==1.3.2 \
--hash=sha256:a1196fdddf1e364b02ec68a05e8ff8f6914fee10fbca2e6b6735f166bb0da8d4 \
--hash=sha256:a99b4907cbe118cf9d18b0b44de272efa61f15117c61e39ebdc431baf5df722a
# via cyclopts
ruff==0.15.8 \
--hash=sha256:04f79eff02a72db209d47d665ba7ebcad609d8918a134f86cb13dd132159fc89 \
--hash=sha256:0f29b989a55572fb885b77464cf24af05500806ab4edf9a0fd8977f9759d85b1 \
--hash=sha256:12e617fc01a95e5821648a6df341d80456bd627bfab8a829f7cfc26a14a4b4a3 \
--hash=sha256:2033f963c43949d51e6fdccd3946633c6b37c484f5f98c3035f49c27395a8ab8 \
--hash=sha256:432701303b26416d22ba696c39f2c6f12499b89093b61360abc34bcc9bf07762 \
--hash=sha256:6ee3ae5c65a42f273f126686353f2e08ff29927b7b7e203b711514370d500de3 \
--hash=sha256:75e5cd06b1cf3f47a3996cfc999226b19aa92e7cce682dcd62f80d7035f98f49 \
--hash=sha256:8d9a5b8ea13f26ae90838afc33f91b547e61b794865374f114f349e9036835fb \
--hash=sha256:995f11f63597ee362130d1d5a327a87cb6f3f5eae3094c620bcc632329a4d26e \
--hash=sha256:ac51d486bf457cdc985a412fb1801b2dfd1bd8838372fc55de64b1510eff4bec \
--hash=sha256:bc1f0a51254ba21767bfa9a8b5013ca8149dcf38092e6a9eb704d876de94dc34 \
--hash=sha256:c2a33a529fb3cbc23a7124b5c6ff121e4d6228029cba374777bd7649cc8598b8 \
--hash=sha256:c9861eb959edab053c10ad62c278835ee69ca527b6dcd72b47d5c1e5648964f6 \
--hash=sha256:cbe05adeba76d58162762d6b239c9056f1a15a55bd4b346cfd21e26cd6ad7bc7 \
--hash=sha256:cf891fa8e3bb430c0e7fac93851a5978fc99c8fa2c053b57b118972866f8e5f2 \
--hash=sha256:d3e3d0b6ba8dca1b7ef9ab80a28e840a20070c4b62e56d675c24f366ef330570 \
--hash=sha256:d910ae974b7a06a33a057cb87d2a10792a3b2b3b35e33d2699fdf63ec8f6b17a \
--hash=sha256:fdce027ada77baa448077ccc6ebb2fa9c3c62fd110d8659d601cf2f475858d94
ruff==0.15.10 \
--hash=sha256:0744e31482f8f7d0d10a11fcbf897af272fefdfcb10f5af907b18c2813ff4d5f \
--hash=sha256:0ee3ef42dab7078bda5ff6a1bcba8539e9857deb447132ad5566a038674540d0 \
--hash=sha256:136c00ca2f47b0018b073f28cb5c1506642a830ea941a60354b0e8bc8076b151 \
--hash=sha256:28cb32d53203242d403d819fd6983152489b12e4a3ae44993543d6fe62ab42ed \
--hash=sha256:51cb8cc943e891ba99989dd92d61e29b1d231e14811db9be6440ecf25d5c1609 \
--hash=sha256:601d1610a9e1f1c2165a4f561eeaa2e2ea1e97f3287c5aa258d3dab8b57c6188 \
--hash=sha256:8154d43684e4333360fedd11aaa40b1b08a4e37d8ffa9d95fee6fa5b37b6fab1 \
--hash=sha256:83e1dd04312997c99ea6965df66a14fb4f03ba978564574ffc68b0d61fd3989e \
--hash=sha256:8ab88715f3a6deb6bde6c227f3a123410bec7b855c3ae331b4c006189e895cef \
--hash=sha256:8b80a2f3c9c8a950d6237f2ca12b206bccff626139be9fa005f14feb881a1ae8 \
--hash=sha256:93cc06a19e5155b4441dd72808fdf84290d84ad8a39ca3b0f994363ade4cebb1 \
--hash=sha256:a768ff5969b4f44c349d48edf4ab4f91eddb27fd9d77799598e130fb628aa158 \
--hash=sha256:b0c52744cf9f143a393e284125d2576140b68264a93c6716464e129a3e9adb48 \
--hash=sha256:b1e7c16ea0ff5a53b7c2df52d947e685973049be1cdfe2b59a9c43601897b22e \
--hash=sha256:d1f86e67ebfdef88e00faefa1552b5e510e1d35f3be7d423dc7e84e63788c94e \
--hash=sha256:d4272e87e801e9a27a2e8df7b21011c909d9ddd82f4f3281d269b6ba19789ca5 \
--hash=sha256:e3e53c588164dc025b671c9df2462429d60357ea91af7e92e9d56c565a9f1b07 \
--hash=sha256:e59c9bdc056a320fb9ea1700a8d591718b8faf78af065484e801258d3a76bc3f
win32-setctime==1.2.0 ; sys_platform == 'win32' \
--hash=sha256:95d644c4e708aba81dc3704a116d8cbc974d70b3bdb8be1d150e36be6e9d1390 \
--hash=sha256:ae1fdf948f5640aae05c511ade119313fb6a30d7eabe25fef9764dca5873c4c0
+32
View File
@@ -29,6 +29,7 @@ from .lrc import get_sidecar_path
from .watch import WatchCoordinator
from .watch.control import ControlClient, parse_delta
from .watch.view.pipe import PipeOutput
from .watch.view.print import PrintOutput
app = cyclopts.App(
@@ -432,6 +433,37 @@ def pipe(
logger.info("Watch stopped.")
@watch_app.command(name="print")
def watch_print(
plain: Annotated[
bool,
cyclopts.Parameter(
name="--plain",
negative="",
help="Output plain text (strips all tags). Takes priority over --normalize.",
),
] = False,
) -> None:
"""Watch active player and print all lyrics to stdout once per track change."""
logger.info(
"Starting watch print (player filter: {})",
_player or "<none>",
)
output = PrintOutput(plain=plain)
try:
session = WatchCoordinator(
manager,
output,
player_hint=_player,
config=_app_config,
)
success = asyncio.run(session.run())
if not success:
sys.exit(1)
except KeyboardInterrupt:
logger.info("Watch stopped.")
@ctl_app.command
def offset(delta: str) -> None:
"""Adjust watch offset. Examples: +200, -200, 0."""
-2
View File
@@ -1,5 +1,3 @@
"""Watch subsystem public exports."""
from .session import WatchCoordinator
__all__ = ["WatchCoordinator"]
+10 -1
View File
@@ -1,4 +1,8 @@
"""Unix-socket control channel for communicating with a running watch session."""
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-04-10 08:14:58
Description: Unix-socket control channel for communicating with a running watch session.
"""
import asyncio
import json
@@ -40,14 +44,17 @@ class ControlServer:
return True
try:
# probe the socket to distinguish a live session from a stale socket file
reader, writer = await asyncio.open_unix_connection(str(self._socket_path))
writer.close()
await writer.wait_closed()
# connection succeeded → another watch session is actively listening
logger.error(
"A watch session is already running. Use 'lrx watch ctl status'."
)
return False
except Exception:
# connection refused / file is stale → safe to remove and reuse
try:
self._socket_path.unlink(missing_ok=True)
except Exception:
@@ -136,6 +143,8 @@ def parse_delta(raw: str) -> tuple[bool, int | None, str | None]:
if value.startswith("+"):
return True, int(value[1:]), None
if value.startswith("-"):
# keep the sign by negating; bare int() would accept "-123" too but
# explicit split is clearer about intent and avoids double-negative edge cases
return True, -int(value[1:]), None
return True, int(value), None
except ValueError:
+8 -1
View File
@@ -1,4 +1,8 @@
"""Debounced lyric fetch orchestration for watch session."""
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-04-10 08:14:41
Description: Debounced lyric fetch orchestration for watch session.
"""
import asyncio
from typing import Awaitable, Callable, Optional
@@ -50,6 +54,7 @@ class LyricFetcher:
"""Request lyrics for track with debounce collapsing."""
self._pending_track = track
if self._debounce_task is not None:
# cancel any pending debounce window — the new request supersedes it
self._debounce_task.cancel()
self._debounce_task = asyncio.create_task(self._debounce_then_fetch())
@@ -61,6 +66,7 @@ class LyricFetcher:
return
if self._fetch_task is not None:
# abort any in-flight fetch for a previous track before starting the new one
self._fetch_task.cancel()
await asyncio.gather(self._fetch_task, return_exceptions=True)
@@ -68,6 +74,7 @@ class LyricFetcher:
async def _do_fetch(self, track: TrackMeta) -> None:
"""Execute fetch lifecycle callbacks and fetch lyrics for a track."""
# callbacks may be plain functions or coroutines — handle both
fetching_callback_result = self._on_fetching()
if asyncio.iscoroutine(fetching_callback_result):
await fetching_callback_result
+21 -2
View File
@@ -1,4 +1,8 @@
"""Player discovery, state monitoring, and active-player selection for watch mode."""
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-04-10 08:14:27
Description: Player discovery, state monitoring, and active-player selection for watch mode.
"""
from dataclasses import dataclass
from typing import Callable, Optional
@@ -219,6 +223,7 @@ class PlayerMonitor:
trackid = metadata.get("mpris:trackid")
if trackid is not None:
trackid = _variant_value(trackid)
# normalize Spotify track IDs — the raw MPRIS value varies by client version
if isinstance(trackid, str) and trackid.startswith("spotify:track:"):
trackid = trackid.removeprefix("spotify:track:")
elif isinstance(trackid, str) and trackid.startswith("/com/spotify/track/"):
@@ -230,12 +235,14 @@ class PlayerMonitor:
length_ms = None
length_value = _variant_value(length) if length is not None else None
if isinstance(length_value, int):
# MPRIS reports length in microseconds; convert to milliseconds
length_ms = length_value // 1000
artist = metadata.get("xesam:artist")
artist_v = None
artist_value = _variant_value(artist) if artist is not None else None
if isinstance(artist_value, list) and artist_value:
# xesam:artist is a list; take the first entry as primary artist
artist_v = artist_value[0]
title = metadata.get("xesam:title")
@@ -286,10 +293,14 @@ class PlayerMonitor:
async def _resolve_well_known_name(self, unique_sender: str) -> str | None:
"""Map a DBus unique sender (e.g. :1.42) to a tracked MPRIS bus name."""
if unique_sender in self.players:
# sender is already a well-known name we track (unlikely but fast path)
return unique_sender
if not self._bus:
return None
# Seeked signals arrive with the unique connection name (:1.N), not the
# well-known bus name (org.mpris.MediaPlayer2.X). Ask D-Bus which
# well-known name owns that unique name.
for bus_name in self.players:
try:
reply = await self._bus.call(
@@ -325,6 +336,7 @@ class PlayerMonitor:
message.interface == "org.freedesktop.DBus"
and message.member == "NameOwnerChanged"
):
# a player appeared or disappeared — rescan the full player list
if message.body and str(message.body[0]).startswith(
"org.mpris.MediaPlayer2."
):
@@ -335,7 +347,9 @@ class PlayerMonitor:
message.interface == "org.freedesktop.DBus.Properties"
and message.member == "PropertiesChanged"
):
# Message.sender is a DBus unique name, so match by path+iface.
# message.sender is a unique connection name, not the well-known bus
# name, so we can't filter by sender here — match by object path and
# interface instead to scope it to MPRIS Player properties only
path_ok = message.path == "/org/mpris/MediaPlayer2"
iface = message.body[0] if message.body else None
if path_ok and iface == "org.mpris.MediaPlayer2.Player":
@@ -348,6 +362,7 @@ class PlayerMonitor:
):
sender = message.sender or ""
if sender and message.body:
# MPRIS Seeked position is in microseconds; convert to ms
position_us = int(message.body[0])
asyncio.create_task(
self._handle_seeked_signal(
@@ -391,15 +406,19 @@ class ActivePlayerSelector:
playing = [name for name, st in players.items() if st.status == "Playing"]
if len(playing) == 1:
# unambiguous — only one player is currently playing
return playing[0]
preferred = preferred_player.lower().strip()
# when multiple players are playing, narrow candidates to those; otherwise
# fall back to all known players so a paused preferred player still wins
candidates = playing if playing else list(players.keys())
if preferred:
for name in candidates:
if preferred in name.lower():
return name
# preserve the last selection to avoid jitter when nothing else changes
if last_active and last_active in players:
return last_active
+47 -44
View File
@@ -1,8 +1,11 @@
"""Watch orchestration with explicit MVVM role boundaries.
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-04-10 08:10:52
Description: Watch orchestration with explicit MVVM role boundaries.
- Model: WatchModel stores domain state.
- ViewModel: WatchViewModel projects model to output-facing state/signature.
- Coordinator: WatchCoordinator wires services and drives async workflows.
- Model: WatchModel stores domain state.
- ViewModel: WatchViewModel projects model to output-facing state/signature.
- Coordinator: WatchCoordinator wires services and drives async workflows.
"""
import asyncio
@@ -17,7 +20,7 @@ from ..models import TrackMeta
from .control import ControlServer
from .fetcher import LyricFetcher
from ..config import AppConfig
from .view import BaseOutput, LyricView, WatchState
from .view import BaseOutput, LyricView, WatchState, WatchStatus
from .player import ActivePlayerSelector, PlayerMonitor, PlayerTarget
from .tracker import PositionTracker
@@ -28,14 +31,14 @@ class WatchModel:
offset_ms: int
active_player: str | None
active_track_key: str | None
status: str
status: WatchStatus
lyrics: LyricView | None
def __init__(self) -> None:
self.offset_ms = 0
self.active_player: str | None = None
self.active_track_key: str | None = None
self.status: str = "idle"
self.status: WatchStatus = WatchStatus.IDLE
self.lyrics: LyricView | None = None
def set_lyrics(self, lyrics: LRCData | None) -> None:
@@ -48,6 +51,8 @@ class WatchModel:
def state_signature(self, track: TrackMeta | None, position_ms: int) -> tuple:
"""Build dedupe signature from model state and current lyric cursor."""
# prefer trackid when available; fall back to display name for players
# that don't expose a stable ID (e.g. some MPRIS implementations)
track_key = (
track.trackid
if track and track.trackid
@@ -56,7 +61,8 @@ class WatchModel:
else None
)
if self.status != "ok" or self.lyrics is None:
if self.status != WatchStatus.OK or self.lyrics is None:
# non-OK states don't have cursor position — discriminate by status alone
return ("status", self.status, self.active_player, track_key)
at_ms = position_ms + self.offset_ms
cursor = self.lyrics.signature_cursor(at_ms)
@@ -82,7 +88,7 @@ class WatchViewModel:
lyrics=self._model.lyrics,
position_ms=position_ms,
offset_ms=self._model.offset_ms,
status=self._model.status, # type: ignore[arg-type]
status=self._model.status,
)
@@ -164,7 +170,9 @@ class WatchCoordinator:
await self._player_monitor.start()
await self._tracker.start()
self._calibration_task = asyncio.create_task(self._calibration_loop())
# emit once at startup so outputs don't sit blank until the first event
self._schedule_emit()
# block forever; CancelledError from signal handler exits the loop cleanly
await asyncio.Event().wait()
return True
except asyncio.CancelledError:
@@ -206,8 +214,10 @@ class WatchCoordinator:
if track is None:
return False
if self._model.lyrics is not None:
# lyrics already loaded — nothing to fetch
return False
if self._model.status == "fetching":
if self._model.status == WatchStatus.FETCHING:
# a fetch is already in flight — don't queue another
return False
logger.info("fetching lyrics for track ({}): {}", reason, track.display_name())
self._fetcher.request(track)
@@ -246,7 +256,7 @@ class WatchCoordinator:
)
if selected is None:
self._model.status = "idle"
self._model.status = WatchStatus.IDLE
self._model.active_track_key = None
self._model.set_lyrics(None)
self._schedule_emit()
@@ -254,7 +264,7 @@ class WatchCoordinator:
state = self._player_monitor.players.get(selected)
if state is None:
self._model.status = "idle"
self._model.status = WatchStatus.IDLE
self._model.active_track_key = None
self._model.set_lyrics(None)
self._schedule_emit()
@@ -272,6 +282,7 @@ class WatchCoordinator:
track_changed = track_key != prev_track_key
player_changed = selected != prev_player
if track_changed or player_changed:
# clear stale lyrics immediately so the old track's lines don't flash
self._model.set_lyrics(None)
self._model.active_track_key = track_key
@@ -284,27 +295,20 @@ class WatchCoordinator:
)
)
if state.status != "Playing":
self._model.status = "paused"
self._schedule_emit()
return
# only fetch on identity change — calibration ticks must not re-trigger fetches
started_fetch = False
if track is not None and (player_changed or track_changed):
started_fetch = self._request_fetch_for_active_track("track-changed")
elif (
track is not None
and self._model.lyrics is None
and self._model.status == "paused"
):
started_fetch = self._request_fetch_for_active_track("resume-playing")
# derive status from what actually happened this tick; preserve FETCHING
# if an in-flight request was started before this snapshot arrived
if self._model.lyrics is not None:
self._model.status = "ok"
self._model.status = WatchStatus.OK
elif started_fetch:
self._model.status = "fetching"
elif self._model.status != "fetching":
self._model.status = "no_lyrics"
self._model.status = WatchStatus.FETCHING
elif self._model.status != WatchStatus.FETCHING:
# don't overwrite FETCHING with NO_LYRICS while a request is in flight
self._model.status = WatchStatus.NO_LYRICS
self._schedule_emit()
def _on_seeked(self, bus_name: str, position_ms: int) -> None:
@@ -312,29 +316,18 @@ class WatchCoordinator:
asyncio.create_task(self._tracker.on_seeked(bus_name, position_ms))
def _on_playback_status(self, bus_name: str, status: str) -> None:
"""React to playback status change and tracker sync."""
if bus_name == self._model.active_player:
if status == "Playing":
started_fetch = self._request_fetch_for_active_track("resume-playing")
if self._model.lyrics is not None:
self._model.status = "ok"
elif started_fetch:
self._model.status = "fetching"
elif self._model.status != "fetching":
self._model.status = "no_lyrics"
else:
self._model.status = "paused"
self._schedule_emit()
"""Forward playback status change to position tracker."""
asyncio.create_task(self._tracker.on_playback_status(bus_name, status))
def _on_tracker_tick(self) -> None:
"""Emit updates from tracker tick only while lyrics are actively rendering."""
if self._model.status == "ok":
if self._model.status == WatchStatus.OK and self._output.position_sensitive:
self._schedule_emit()
def _schedule_emit(self) -> None:
"""Coalesce frequent events into at most one in-flight emit task."""
if self._emit_scheduled:
# a task is already queued; it will pick up the latest model state when it runs
return
self._emit_scheduled = True
asyncio.create_task(self._run_scheduled_emit())
@@ -344,17 +337,20 @@ class WatchCoordinator:
try:
await self._emit_state()
finally:
# release the gate even on error so future events can still schedule
self._emit_scheduled = False
async def _on_fetching(self) -> None:
"""Mark model as fetching and emit state."""
self._model.status = "fetching"
self._model.status = WatchStatus.FETCHING
await self._emit_state()
async def _on_lyrics_update(self, lyrics: Optional[LRCData]) -> None:
"""Update model with fetched lyrics and emit state."""
self._model.set_lyrics(lyrics)
self._model.status = "ok" if lyrics is not None else "no_lyrics"
self._model.status = (
WatchStatus.OK if lyrics is not None else WatchStatus.NO_LYRICS
)
logger.info(
"lyrics update result: {}",
"found" if lyrics is not None else "not found",
@@ -365,10 +361,17 @@ class WatchCoordinator:
"""Emit output state only when semantic signature changes."""
player = self._player_monitor.players.get(self._model.active_player or "")
track = player.track if player else None
position = await self._tracker.get_position_ms()
# position=0 for non-position-sensitive outputs so the signature is stable
# across ticks and on_state fires at most once per track+status transition
position = (
await self._tracker.get_position_ms()
if self._output.position_sensitive
else 0
)
signature = self._view_model.signature(track, position)
if signature == self._last_emit_signature:
# state hasn't changed semantically — skip redundant render
return
self._last_emit_signature = signature
state = self._view_model.state(track, position)
+12 -1
View File
@@ -1,4 +1,8 @@
"""Playback position tracking utilities for watch mode."""
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-04-10 08:13:35
Description: Playback position tracking utilities for watch mode.
"""
import asyncio
import time
@@ -69,7 +73,10 @@ class PositionTracker:
self._is_playing = playback_status == "Playing"
status_changed_to_playing = self._is_playing and not was_playing
if player_changed or track_changed:
# reset to 0 so stale position from a previous track doesn't bleed through
self._position_ms = 0
# only poll MPRIS when something changed and the player is actually running;
# avoids an unnecessary D-Bus round-trip on every calibration-loop tick
should_calibrate_now = (
self._is_playing
and bool(self._active_player)
@@ -97,6 +104,7 @@ class PositionTracker:
return
was_playing = self._is_playing
self._is_playing = playback_status == "Playing"
# re-anchor last_tick when resuming so the gap while paused isn't counted
should_calibrate_now = self._is_playing and not was_playing
self._last_tick = time.monotonic()
@@ -112,10 +120,13 @@ class PositionTracker:
async with self._lock:
now = time.monotonic()
if self._is_playing and self._active_player:
# accumulate elapsed wall-clock time as playback position;
# seek events and calibration snapshots correct drift periodically
delta_ms = int((now - self._last_tick) * 1000)
if delta_ms > 0:
self._position_ms += delta_ms
should_notify = True
# always update last_tick so paused time isn't counted on resume
self._last_tick = now
if should_notify and self._on_tick is not None:
+22 -2
View File
@@ -3,12 +3,20 @@
from abc import ABC, abstractmethod
from bisect import bisect_right
from dataclasses import dataclass
from typing import Literal, Optional
from enum import Enum
from typing import Optional
from ...lrc import LRCData, LyricLine
from ...models import TrackMeta
class WatchStatus(str, Enum):
IDLE = "idle"
FETCHING = "fetching"
OK = "ok"
NO_LYRICS = "no_lyrics"
@dataclass(slots=True, frozen=True)
class LyricView:
"""View-ready immutable lyric data projected from one normalized LRC object."""
@@ -29,13 +37,16 @@ class LyricView:
line_index = 0
for line in normalized.lines:
if not isinstance(line, LyricLine):
# skip metadata/tag lines that carry no renderable text
continue
text = line.text
lines.append(text)
# use first timestamp; clamp to 0 so bisect always works with non-negative ms
timestamp = line.line_times_ms[0] if line.line_times_ms else 0
entries.append((max(0, timestamp), line_index))
line_index += 1
# extract timestamps into a flat tuple so bisect_right can binary-search it
timestamps = tuple(timestamp for timestamp, _ in entries)
return LyricView(
normalized=normalized,
@@ -47,12 +58,16 @@ class LyricView:
def signature_cursor(self, at_ms: int) -> tuple:
"""Build a stable cursor signature for dedupe decisions."""
if not self.timed_line_entries:
# untimed lyrics: signature is the full line set — changes only on track change
return ("plain", self.lines)
first_ts = self.timed_line_entries[0][0]
if at_ms < first_ts:
# playback hasn't reached the first lyric yet; hold until it does
return ("before_first", first_ts)
# bisect_right gives the insertion point after equal timestamps, so -1 gives
# the last line whose timestamp <= at_ms (i.e. the currently active line)
idx = bisect_right(self.timestamps, at_ms) - 1
if idx < 0:
idx = 0
@@ -70,10 +85,15 @@ class WatchState:
lyrics: Optional[LyricView]
position_ms: int
offset_ms: int
status: Literal["fetching", "ok", "no_lyrics", "paused", "idle"]
status: WatchStatus
class BaseOutput(ABC):
# When False, the coordinator passes position=0 for signature computation and
# skips tracker-tick-driven emits, so on_state fires at most once per
# track+status transition rather than on every lyric cursor advance.
position_sensitive: bool = True
@abstractmethod
async def on_state(self, state: WatchState) -> None:
"""Render or deliver one watch state frame."""
+15 -8
View File
@@ -1,10 +1,14 @@
"""Pipe output implementation for watch mode."""
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-04-10 08:15:17
Description: Pipe output implementation for watch mode.
"""
from bisect import bisect_right
from dataclasses import dataclass
import sys
from . import BaseOutput, WatchState
from . import BaseOutput, WatchState, WatchStatus
@dataclass(slots=True)
@@ -38,12 +42,14 @@ class PipeOutput(BaseOutput):
effective_ms = state.position_ms + state.offset_ms
current_line_idx: int | None
if entries and effective_ms < entries[0][0]:
# Before first timestamp, current lyric is empty and after-window shows upcoming lines.
# playback hasn't reached the first lyric yet; treat current slot as empty
# so the after-window can show upcoming lines without a "current" anchor
current_line_idx = None
else:
if not entries:
current_line_idx = 0
else:
# bisect_right - 1 gives the last entry whose timestamp <= effective_ms
current_entry_idx = (
bisect_right(state.lyrics.timestamps, effective_ms) - 1
)
@@ -54,6 +60,8 @@ class PipeOutput(BaseOutput):
out: list[str] = []
for rel in range(-self.before, self.after + 1):
if current_line_idx is None:
# before-first-timestamp: before/current slots are empty; after slots
# show lines starting from index 0 (rel=1 → line 0, rel=2 → line 1, …)
if rel <= 0:
out.append("")
continue
@@ -70,17 +78,16 @@ class PipeOutput(BaseOutput):
async def on_state(self, state: WatchState) -> None:
"""Render and flush one frame for the latest watch state."""
if state.status == "fetching":
if state.status == WatchStatus.FETCHING:
lines = self._render_status("[fetching...]")
elif state.status == "no_lyrics":
elif state.status == WatchStatus.NO_LYRICS:
lines = self._render_status("[no lyrics]")
elif state.status == "paused":
lines = self._render_status("[paused]")
elif state.status == "idle":
elif state.status == WatchStatus.IDLE:
lines = self._render_status("[idle]")
else:
lines = self._render_lyrics(state)
for line in lines:
# no_newline mode lets callers use \r to overwrite the previous frame in-place
sys.stdout.write(line + ("\n" if not self.no_newline else ""))
sys.stdout.flush()
+44
View File
@@ -0,0 +1,44 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-04-10 08:15:31
Description: Print output implementation for watch mode — one shot per track.
"""
import sys
from . import BaseOutput, WatchState, WatchStatus
class PrintOutput(BaseOutput):
"""Emit full lyrics to stdout once per track transition, then stay silent.
Deduplication is delegated to the coordinator via position_sensitive=False:
the coordinator uses a fixed position for signatures, so on_state fires at
most once per (status, track_key) transition rather than on every tick.
"""
# fixed position=0 in signatures → coordinator calls on_state only on
# track/status transitions, never on lyric cursor advances
position_sensitive = False
plain: bool
def __init__(self, plain: bool = False) -> None:
self.plain = plain
async def on_state(self, state: WatchState) -> None:
if state.status == WatchStatus.FETCHING or state.status == WatchStatus.IDLE:
return
if state.status == WatchStatus.NO_LYRICS:
# emit a blank line as a machine-readable sentinel for "track changed, no lyrics"
sys.stdout.write("\n")
sys.stdout.flush()
elif state.status == WatchStatus.OK and state.lyrics is not None:
lrc = state.lyrics.normalized
if self.plain:
text = lrc.to_plain()
else:
text = str(lrc)
sys.stdout.write(text + "\n")
sys.stdout.flush()
+426 -269
View File
@@ -2,20 +2,22 @@ from __future__ import annotations
import asyncio
from pathlib import Path
from typing import Optional
from lrx_cli.lrc import LRCData
from lrx_cli.models import TrackMeta
from lrx_cli.watch.control import ControlClient, ControlServer, parse_delta
from lrx_cli.watch.view import BaseOutput, LyricView, WatchState
from lrx_cli.watch.view import BaseOutput, LyricView, WatchState, WatchStatus
from lrx_cli.watch.view.pipe import PipeOutput
from lrx_cli.watch.view.print import PrintOutput
from lrx_cli.watch.player import ActivePlayerSelector, PlayerState, PlayerTarget
from lrx_cli.watch.fetcher import LyricFetcher
from lrx_cli.config import AppConfig
from lrx_cli.watch.tracker import PositionTracker
from lrx_cli.watch.session import WatchCoordinator
TEST_CONFIG = AppConfig()
BUS = "org.mpris.MediaPlayer2.spotify"
def test_parse_delta_supports_plus_minus_and_reset() -> None:
@@ -24,16 +26,17 @@ def test_parse_delta_supports_plus_minus_and_reset() -> None:
assert parse_delta("0") == (True, 0, None)
# PlayerTarget
def test_player_target_allows_all_when_hint_empty() -> None:
target = PlayerTarget()
assert target.allows("org.mpris.MediaPlayer2.spotify") is True
assert target.allows("org.mpris.MediaPlayer2.mpd") is True
def test_player_target_filters_by_case_insensitive_substring() -> None:
target = PlayerTarget("Spot")
assert target.allows("org.mpris.MediaPlayer2.spotify") is True
assert target.allows("org.mpris.MediaPlayer2.mpd") is False
@@ -43,60 +46,72 @@ def test_player_target_reports_blacklisted_hint() -> None:
assert target.validation_error() is not None
def test_player_target_non_blacklisted_hint_is_valid() -> None:
target = PlayerTarget("mpd", player_blacklist=("spotify",))
assert target.validation_error() is None
# ActivePlayerSelector
def _ps(bus: str, status: str = "Playing") -> PlayerState:
return PlayerState(bus_name=bus, status=status, track=TrackMeta(title="T"))
def test_active_player_selector_returns_none_when_no_players() -> None:
assert ActivePlayerSelector.select({}, None, "spotify") is None
def test_active_player_selector_prefers_single_playing() -> None:
players = {
"org.mpris.MediaPlayer2.foo": PlayerState(
bus_name="org.mpris.MediaPlayer2.foo",
status="Paused",
track=TrackMeta(title="A"),
),
"org.mpris.MediaPlayer2.bar": PlayerState(
bus_name="org.mpris.MediaPlayer2.bar",
status="Playing",
track=TrackMeta(title="B"),
),
"org.mpris.MediaPlayer2.foo": _ps("org.mpris.MediaPlayer2.foo", "Paused"),
"org.mpris.MediaPlayer2.bar": _ps("org.mpris.MediaPlayer2.bar", "Playing"),
}
assert (
ActivePlayerSelector.select(players, None, TEST_CONFIG.general.preferred_player)
ActivePlayerSelector.select(players, None, "spotify")
== "org.mpris.MediaPlayer2.bar"
)
def test_active_player_selector_prefers_keyword_among_multiple_playing() -> None:
players = {
"org.mpris.MediaPlayer2.foo": _ps("org.mpris.MediaPlayer2.foo"),
"org.mpris.MediaPlayer2.spotify": _ps("org.mpris.MediaPlayer2.spotify"),
}
assert (
ActivePlayerSelector.select(players, None, "spotify")
== "org.mpris.MediaPlayer2.spotify"
)
def test_active_player_selector_uses_last_active_when_no_playing() -> None:
players = {
"org.mpris.MediaPlayer2.foo": PlayerState(
bus_name="org.mpris.MediaPlayer2.foo",
status="Paused",
track=TrackMeta(title="A"),
),
"org.mpris.MediaPlayer2.bar": PlayerState(
bus_name="org.mpris.MediaPlayer2.bar",
status="Stopped",
track=TrackMeta(title="B"),
),
"org.mpris.MediaPlayer2.foo": _ps("org.mpris.MediaPlayer2.foo", "Paused"),
"org.mpris.MediaPlayer2.bar": _ps("org.mpris.MediaPlayer2.bar", "Stopped"),
}
assert (
ActivePlayerSelector.select(
players,
"org.mpris.MediaPlayer2.bar",
TEST_CONFIG.general.preferred_player,
)
ActivePlayerSelector.select(players, "org.mpris.MediaPlayer2.bar", "spotify")
== "org.mpris.MediaPlayer2.bar"
)
def test_active_player_selector_falls_back_to_first_when_no_preference() -> None:
players = {
"org.mpris.MediaPlayer2.foo": _ps("org.mpris.MediaPlayer2.foo", "Paused"),
}
result = ActivePlayerSelector.select(players, None, "")
assert result == "org.mpris.MediaPlayer2.foo"
# PositionTracker
def test_position_tracker_seeked_calibrates_immediately() -> None:
async def _run() -> None:
async def _poll(_bus: str):
return 1200
tracker = PositionTracker(_poll, TEST_CONFIG)
tracker = PositionTracker(lambda _: asyncio.sleep(0, result=1200), TEST_CONFIG)
await tracker.start()
await tracker.set_active_player(
"org.mpris.MediaPlayer2.foo", "Playing", "track-A"
)
await tracker.on_seeked("org.mpris.MediaPlayer2.foo", 3500)
await tracker.set_active_player(BUS, "Playing", "track-A")
await tracker.on_seeked(BUS, 3500)
pos = await tracker.get_position_ms()
await tracker.stop()
assert pos >= 3500
@@ -104,74 +119,56 @@ def test_position_tracker_seeked_calibrates_immediately() -> None:
asyncio.run(_run())
def test_position_tracker_playback_status_pause_stops_fast_growth() -> None:
def test_position_tracker_pause_stops_position_growth() -> None:
async def _run() -> None:
async def _poll(_bus: str):
return 0
tracker = PositionTracker(_poll, TEST_CONFIG)
tracker = PositionTracker(lambda _: asyncio.sleep(0, result=0), TEST_CONFIG)
await tracker.start()
await tracker.set_active_player(
"org.mpris.MediaPlayer2.foo", "Playing", "track-A"
)
await tracker.set_active_player(BUS, "Playing", "track-A")
await asyncio.sleep(0.08)
before = await tracker.get_position_ms()
await tracker.on_playback_status("org.mpris.MediaPlayer2.foo", "Paused")
await tracker.on_playback_status(BUS, "Paused")
await asyncio.sleep(0.08)
after = await tracker.get_position_ms()
await tracker.stop()
assert before > 0
assert after - before < 20
asyncio.run(_run())
def test_position_tracker_playback_status_playing_calibrates_once() -> None:
def test_position_tracker_resume_via_playback_status_calibrates() -> None:
async def _run() -> None:
async def _poll(_bus: str):
return 50000
tracker = PositionTracker(_poll, TEST_CONFIG)
tracker = PositionTracker(lambda _: asyncio.sleep(0, result=50000), TEST_CONFIG)
await tracker.start()
await tracker.set_active_player(
"org.mpris.MediaPlayer2.foo", "Paused", "track-A"
)
await tracker.on_playback_status("org.mpris.MediaPlayer2.foo", "Playing")
await tracker.set_active_player(BUS, "Paused", "track-A")
await tracker.on_playback_status(BUS, "Playing")
pos = await tracker.get_position_ms()
await tracker.stop()
assert pos >= 50000
asyncio.run(_run())
def test_position_tracker_set_active_player_playing_calibrates_on_resume() -> None:
def test_position_tracker_resume_via_set_active_player_calibrates() -> None:
async def _run() -> None:
async def _poll(_bus: str):
return 42000
tracker = PositionTracker(_poll, TEST_CONFIG)
tracker = PositionTracker(lambda _: asyncio.sleep(0, result=42000), TEST_CONFIG)
await tracker.start()
await tracker.set_active_player(
"org.mpris.MediaPlayer2.foo", "Paused", "track-A"
)
await tracker.set_active_player(
"org.mpris.MediaPlayer2.foo", "Playing", "track-A"
)
await tracker.set_active_player(BUS, "Paused", "track-A")
await tracker.set_active_player(BUS, "Playing", "track-A")
pos = await tracker.get_position_ms()
await tracker.stop()
assert pos >= 42000
asyncio.run(_run())
# ControlServer and ControlClient
def test_control_server_and_client_roundtrip(tmp_path: Path) -> None:
async def _run() -> None:
class _Session:
def __init__(self):
def __init__(self) -> None:
self.offset = 0
def handle_offset(self, delta: int) -> dict:
@@ -183,14 +180,11 @@ def test_control_server_and_client_roundtrip(tmp_path: Path) -> None:
socket_path = tmp_path / "watch.sock"
server = ControlServer(socket_path=str(socket_path))
session = _Session()
await server.start(session) # type: ignore
await server.start(_Session()) # type: ignore
client = ControlClient(socket_path=str(socket_path))
r1 = await client._send_async({"cmd": "offset", "delta": 200})
r2 = await client._send_async({"cmd": "status"})
await server.stop()
assert r1 == {"ok": True, "offset_ms": 200}
assert r2["ok"] is True
assert r2["offset_ms"] == 200
@@ -198,260 +192,423 @@ def test_control_server_and_client_roundtrip(tmp_path: Path) -> None:
asyncio.run(_run())
def test_pipe_output_prints_fixed_window_for_status(capsys) -> None:
output = PipeOutput(before=1, after=1)
state = WatchState(
track=None,
lyrics=None,
position_ms=0,
offset_ms=0,
status="fetching",
# PipeOutput
def _pipe_state(
status: WatchStatus,
lyrics: Optional[LRCData] = None,
position_ms: int = 0,
offset_ms: int = 0,
track: Optional[TrackMeta] = None,
) -> WatchState:
return WatchState(
track=track,
lyrics=LyricView.from_lrc(lyrics) if lyrics else None,
position_ms=position_ms,
offset_ms=offset_ms,
status=status,
)
asyncio.run(output.on_state(state))
printed = capsys.readouterr().out
assert printed == "\n[fetching...]\n\n"
def test_pipe_output_uses_context_window_for_lyrics(capsys) -> None:
output = PipeOutput(before=1, after=1)
lyrics = LRCData("[00:01.00]a\n[00:02.00]b\n[00:03.00]c")
state = WatchState(
track=TrackMeta(title="Song"),
lyrics=LyricView.from_lrc(lyrics),
position_ms=2100,
offset_ms=0,
status="ok",
def test_pipe_output_fetching_renders_status_window(capsys) -> None:
asyncio.run(
PipeOutput(before=1, after=1).on_state(_pipe_state(WatchStatus.FETCHING))
)
asyncio.run(output.on_state(state))
printed = capsys.readouterr().out
assert printed == "a\nb\nc\n"
assert capsys.readouterr().out == "\n[fetching...]\n\n"
def test_pipe_output_shows_upcoming_lines_before_first_timestamp(capsys) -> None:
output = PipeOutput(before=1, after=1)
lyrics = LRCData("[00:02.00]a\n[00:03.00]b")
state = WatchState(
track=TrackMeta(title="Song"),
lyrics=LyricView.from_lrc(lyrics),
position_ms=0,
offset_ms=0,
status="ok",
def test_pipe_output_no_lyrics_renders_status_window(capsys) -> None:
asyncio.run(
PipeOutput(before=1, after=1).on_state(_pipe_state(WatchStatus.NO_LYRICS))
)
asyncio.run(output.on_state(state))
printed = capsys.readouterr().out
assert printed == "\n\na\n"
assert capsys.readouterr().out == "\n[no lyrics]\n\n"
def test_pipe_output_first_line_keeps_before_region_empty(capsys) -> None:
output = PipeOutput(before=1, after=1)
lyrics = LRCData("[00:01.00]a\n[00:02.00]b\n[00:03.00]c")
state = WatchState(
track=TrackMeta(title="Song"),
lyrics=LyricView.from_lrc(lyrics),
position_ms=1100,
offset_ms=0,
status="ok",
def test_pipe_output_idle_renders_status_window(capsys) -> None:
asyncio.run(PipeOutput(before=1, after=1).on_state(_pipe_state(WatchStatus.IDLE)))
assert capsys.readouterr().out == "\n[idle]\n\n"
def test_pipe_output_no_newline_mode(capsys) -> None:
asyncio.run(
PipeOutput(before=0, after=0, no_newline=True).on_state(
_pipe_state(WatchStatus.FETCHING)
)
)
asyncio.run(output.on_state(state))
printed = capsys.readouterr().out
assert printed == "\na\nb\n"
assert capsys.readouterr().out == "[fetching...]"
def test_pipe_output_last_line_keeps_after_region_empty(capsys) -> None:
output = PipeOutput(before=1, after=1)
lyrics = LRCData("[00:01.00]a\n[00:02.00]b\n[00:03.00]c")
state = WatchState(
track=TrackMeta(title="Song"),
lyrics=LyricView.from_lrc(lyrics),
position_ms=3100,
offset_ms=0,
status="ok",
def test_pipe_output_default_window_shows_current_line(capsys) -> None:
lrc = LRCData("[00:01.00]a\n[00:02.00]b\n[00:03.00]c")
asyncio.run(
PipeOutput().on_state(_pipe_state(WatchStatus.OK, lrc, position_ms=2100))
)
assert capsys.readouterr().out == "b\n"
asyncio.run(output.on_state(state))
printed = capsys.readouterr().out
assert printed == "b\nc\n\n"
def test_pipe_output_context_window(capsys) -> None:
lrc = LRCData("[00:01.00]a\n[00:02.00]b\n[00:03.00]c")
asyncio.run(
PipeOutput(before=1, after=1).on_state(
_pipe_state(WatchStatus.OK, lrc, position_ms=2100)
)
)
assert capsys.readouterr().out == "a\nb\nc\n"
def test_pipe_output_before_region_empty_at_first_line(capsys) -> None:
lrc = LRCData("[00:01.00]a\n[00:02.00]b\n[00:03.00]c")
asyncio.run(
PipeOutput(before=1, after=1).on_state(
_pipe_state(WatchStatus.OK, lrc, position_ms=1100)
)
)
assert capsys.readouterr().out == "\na\nb\n"
def test_pipe_output_after_region_empty_at_last_line(capsys) -> None:
lrc = LRCData("[00:01.00]a\n[00:02.00]b\n[00:03.00]c")
asyncio.run(
PipeOutput(before=1, after=1).on_state(
_pipe_state(WatchStatus.OK, lrc, position_ms=3100)
)
)
assert capsys.readouterr().out == "b\nc\n\n"
def test_pipe_output_upcoming_lines_before_first_timestamp(capsys) -> None:
lrc = LRCData("[00:02.00]a\n[00:03.00]b")
asyncio.run(
PipeOutput(before=1, after=1).on_state(
_pipe_state(WatchStatus.OK, lrc, position_ms=0)
)
)
assert capsys.readouterr().out == "\n\na\n"
def test_pipe_output_offset_ms_shifts_effective_position(capsys) -> None:
lrc = LRCData("[00:01.00]a\n[00:02.00]b\n[00:03.00]c")
asyncio.run(
PipeOutput().on_state(
_pipe_state(WatchStatus.OK, lrc, position_ms=1000, offset_ms=1500)
)
)
# effective = 2500 ms → line b
assert capsys.readouterr().out == "b\n"
def test_pipe_output_repeated_text_uses_correct_timed_occurrence(capsys) -> None:
output = PipeOutput(before=1, after=1)
lyrics = LRCData("[00:01.00]A\n[00:02.00]X\n[00:03.00]B\n[00:04.00]X\n[00:05.00]C")
state = WatchState(
track=TrackMeta(title="Song"),
lrc = LRCData("[00:01.00]A\n[00:02.00]X\n[00:03.00]B\n[00:04.00]X\n[00:05.00]C")
asyncio.run(
PipeOutput(before=1, after=1).on_state(
_pipe_state(WatchStatus.OK, lrc, position_ms=4100)
)
)
assert capsys.readouterr().out == "B\nX\nC\n"
# PrintOutput
def _ok_state(lyrics: LRCData, track: Optional[TrackMeta] = None) -> WatchState:
return WatchState(
track=track or TrackMeta(title="Song", artist="Artist"),
lyrics=LyricView.from_lrc(lyrics),
position_ms=4100,
position_ms=0,
offset_ms=0,
status="ok",
status=WatchStatus.OK,
)
def _status_state(status: WatchStatus, track: Optional[TrackMeta] = None) -> WatchState:
return WatchState(
track=track or TrackMeta(title="Song", artist="Artist"),
lyrics=None,
position_ms=0,
offset_ms=0,
status=status,
)
def test_print_output_emits_lrc_on_ok(capsys) -> None:
asyncio.run(
PrintOutput().on_state(_ok_state(LRCData("[00:01.00]Hello\n[00:02.00]World")))
)
assert capsys.readouterr().out.startswith("[00:01.00]")
def test_print_output_plain_strips_tags(capsys) -> None:
asyncio.run(
PrintOutput(plain=True).on_state(
_ok_state(LRCData("[00:01.00]Hello\n[00:02.00]World"))
)
)
out = capsys.readouterr().out
assert "[" not in out
assert "Hello" in out
def test_print_output_plain_with_unsynced_lyrics(capsys) -> None:
asyncio.run(PrintOutput(plain=True).on_state(_ok_state(LRCData("Hello\nWorld"))))
out = capsys.readouterr().out
assert "Hello" in out
assert "[" not in out
def test_print_output_no_lyrics_emits_blank_line(capsys) -> None:
asyncio.run(PrintOutput().on_state(_status_state(WatchStatus.NO_LYRICS)))
assert capsys.readouterr().out == "\n"
def test_print_output_fetching_emits_nothing(capsys) -> None:
asyncio.run(PrintOutput().on_state(_status_state(WatchStatus.FETCHING)))
assert capsys.readouterr().out == ""
def test_print_output_idle_emits_nothing(capsys) -> None:
asyncio.run(PrintOutput().on_state(_status_state(WatchStatus.IDLE)))
assert capsys.readouterr().out == ""
def test_print_output_is_stateless(capsys) -> None:
"""View has no internal deduplication — emits on every call."""
output = PrintOutput()
state = _ok_state(LRCData("[00:01.00]Hello"))
asyncio.run(output.on_state(state))
printed = capsys.readouterr().out
assert printed == "B\nX\nC\n"
asyncio.run(output.on_state(state))
lines = [ln for ln in capsys.readouterr().out.splitlines() if ln]
assert len(lines) == 2
def test_session_fetches_on_resume_playing_without_lyrics() -> None:
def test_print_output_position_sensitive_is_false() -> None:
assert PrintOutput.position_sensitive is False
# WatchCoordinator
class _CaptureFetcher:
def __init__(self) -> None:
self.requested: list[str] = []
def request(self, track: TrackMeta) -> None:
self.requested.append(track.display_name())
async def stop(self) -> None:
pass
def _make_coordinator(output: Optional[BaseOutput] = None) -> WatchCoordinator:
class _Manager:
def fetch_for_track(self, *_a, **_kw):
return None
class _NullOutput(BaseOutput):
async def on_state(self, state: WatchState) -> None:
pass
session = WatchCoordinator(
_Manager(), # type: ignore
output or _NullOutput(),
player_hint=None,
config=TEST_CONFIG,
)
session._tracker = PositionTracker(
lambda _bus: asyncio.sleep(0, result=0),
TEST_CONFIG,
)
return session
def _pstate(status: str = "Playing", title: str = "Song") -> PlayerState:
return PlayerState(
bus_name=BUS,
status=status,
track=TrackMeta(title=title, artist="Artist"),
)
def test_coordinator_fetches_on_initial_player() -> None:
async def _run() -> None:
class _Manager:
def fetch_for_track(self, *_args, **_kwargs):
return None
session = _make_coordinator()
fetcher = _CaptureFetcher()
session._fetcher = fetcher # type: ignore[assignment]
session._player_monitor.players = {BUS: _pstate("Playing")}
session._on_player_change()
await asyncio.sleep(0)
assert fetcher.requested == ["Artist - Song"]
assert session._model.status == WatchStatus.FETCHING
class _Output(BaseOutput):
async def on_state(self, state: WatchState) -> None:
return None
asyncio.run(_run())
class _Fetcher(LyricFetcher):
def __init__(self):
async def _fetch(_track: TrackMeta):
return None
async def _on_fetching() -> None:
return None
def test_coordinator_fetches_while_paused() -> None:
"""Fetch starts immediately even when player is paused — no wait for resume."""
async def _on_result(_lyrics) -> None:
return None
async def _run() -> None:
session = _make_coordinator()
fetcher = _CaptureFetcher()
session._fetcher = fetcher # type: ignore[assignment]
session._player_monitor.players = {BUS: _pstate("Paused")}
session._on_player_change()
await asyncio.sleep(0)
assert fetcher.requested == ["Artist - Song"]
super().__init__(
_fetch, _on_fetching, _on_result, TEST_CONFIG.watch.debounce_ms
)
self.requested = []
asyncio.run(_run())
def request(self, track: TrackMeta) -> None:
self.requested.append(track.display_name())
session = WatchCoordinator(
_Manager(), # type: ignore
_Output(),
player_hint=None,
config=TEST_CONFIG,
)
fake_fetcher = _Fetcher()
session._fetcher = fake_fetcher
session._tracker = PositionTracker(
lambda _bus: asyncio.sleep(0, result=0),
TEST_CONFIG,
)
def test_coordinator_fetches_on_track_change() -> None:
async def _run() -> None:
session = _make_coordinator()
session._model.active_player = BUS
session._model.active_track_key = "Artist - Old Song"
session._model.set_lyrics(LRCData("[00:01.00]old"))
session._model.status = WatchStatus.OK
bus_name = "org.mpris.MediaPlayer2.spotify"
track = TrackMeta(title="Song", artist="Artist")
session._model.active_player = bus_name
session._player_monitor.players = {
bus_name: PlayerState(bus_name=bus_name, status="Playing", track=track)
}
session._model.set_lyrics(None)
session._model.status = "paused"
session._on_playback_status(bus_name, "Playing")
fetcher = _CaptureFetcher()
session._fetcher = fetcher # type: ignore[assignment]
session._player_monitor.players = {BUS: _pstate("Playing", title="New Song")}
session._on_player_change()
await asyncio.sleep(0)
assert fake_fetcher.requested == ["Artist - Song"]
assert session._model.status == "fetching"
assert fetcher.requested == ["Artist - New Song"]
assert session._model.lyrics is None
asyncio.run(_run())
def test_session_emit_state_only_when_lyric_cursor_changes() -> None:
def test_coordinator_no_refetch_on_calibration_no_lyrics() -> None:
"""Calibration with same player/track and no_lyrics must NOT trigger a second fetch."""
async def _run() -> None:
class _Manager:
def fetch_for_track(self, *_args, **_kwargs):
return None
session = _make_coordinator()
fetcher = _CaptureFetcher()
session._fetcher = fetcher # type: ignore[assignment]
session._player_monitor.players = {BUS: _pstate("Playing")}
session._on_player_change()
await asyncio.sleep(0)
assert len(fetcher.requested) == 1
class _Output(BaseOutput):
def __init__(self):
self.count = 0
session._model.status = WatchStatus.NO_LYRICS
session._on_player_change()
await asyncio.sleep(0)
assert len(fetcher.requested) == 1
async def on_state(self, state: WatchState) -> None:
self.count += 1
asyncio.run(_run())
output = _Output()
session = WatchCoordinator(
_Manager(), # type: ignore
output,
player_hint=None,
config=TEST_CONFIG,
)
session._tracker = PositionTracker(
lambda _bus: asyncio.sleep(0, result=0),
TEST_CONFIG,
)
bus_name = "org.mpris.MediaPlayer2.spotify"
track = TrackMeta(title="Song", artist="Artist")
session._model.active_player = bus_name
def test_coordinator_no_fetch_when_lyrics_present() -> None:
async def _run() -> None:
session = _make_coordinator()
session._model.active_player = BUS
session._model.active_track_key = "Artist - Song"
session._model.set_lyrics(LRCData("[00:01.00]line"))
session._model.status = WatchStatus.OK
fetcher = _CaptureFetcher()
session._fetcher = fetcher # type: ignore[assignment]
session._player_monitor.players = {BUS: _pstate("Playing")}
session._on_player_change()
await asyncio.sleep(0)
assert fetcher.requested == []
assert session._model.status == WatchStatus.OK
asyncio.run(_run())
def test_coordinator_player_disappears_goes_idle() -> None:
async def _run() -> None:
session = _make_coordinator()
session._model.active_player = BUS
session._model.active_track_key = "Artist - Song"
session._model.set_lyrics(LRCData("[00:01.00]line"))
session._model.status = WatchStatus.OK
session._player_monitor.players = {}
session._on_player_change()
await asyncio.sleep(0)
assert session._model.status == WatchStatus.IDLE
assert session._model.lyrics is None
assert session._model.active_player is None
asyncio.run(_run())
def test_coordinator_no_fetch_when_track_is_none() -> None:
"""Player present but reports no track metadata → no fetch, status NO_LYRICS."""
async def _run() -> None:
session = _make_coordinator()
fetcher = _CaptureFetcher()
session._fetcher = fetcher # type: ignore[assignment]
session._player_monitor.players = {
bus_name: PlayerState(bus_name=bus_name, status="Playing", track=track)
BUS: PlayerState(bus_name=BUS, status="Playing", track=None)
}
session._on_player_change()
await asyncio.sleep(0)
assert fetcher.requested == []
assert session._model.status == WatchStatus.NO_LYRICS
asyncio.run(_run())
def test_coordinator_emit_deduplicates_on_same_cursor() -> None:
async def _run() -> None:
counts = [0]
class _CountOutput(BaseOutput):
async def on_state(self, state: WatchState) -> None:
counts[0] += 1
session = _make_coordinator(_CountOutput())
track = TrackMeta(title="Song", artist="Artist")
session._model.active_player = BUS
session._player_monitor.players = {
BUS: PlayerState(bus_name=BUS, status="Playing", track=track)
}
session._model.set_lyrics(LRCData("[00:01.00]a\n[00:03.00]b"))
session._model.status = "ok"
await session._tracker.set_active_player(
bus_name,
"Playing",
"Artist - Song",
)
session._model.status = WatchStatus.OK
await session._tracker.set_active_player(BUS, "Playing", "Artist - Song")
await session._emit_state()
await session._emit_state()
await session._emit_state() # emits
await session._emit_state() # same cursor → no emit
assert counts[0] == 1
await session._tracker.on_seeked(bus_name, 3200)
await session._emit_state()
assert output.count == 2
await session._tracker.on_seeked(BUS, 3200)
await session._emit_state() # cursor advanced → emits
assert counts[0] == 2
asyncio.run(_run())
def test_session_emits_when_crossing_first_timestamp() -> None:
def test_coordinator_position_insensitive_output_ignores_seeks() -> None:
"""With position_sensitive=False, seek events do not trigger re-emit."""
async def _run() -> None:
class _Manager:
def fetch_for_track(self, *_args, **_kwargs):
return None
class _Output(BaseOutput):
def __init__(self):
self.count = 0
counts = [0]
class _CountPrint(PrintOutput):
async def on_state(self, state: WatchState) -> None:
self.count += 1
counts[0] += 1
output = _Output()
session = WatchCoordinator(
_Manager(), # type: ignore
output,
player_hint=None,
config=TEST_CONFIG,
)
session._tracker = PositionTracker(
lambda _bus: asyncio.sleep(0, result=0),
TEST_CONFIG,
)
bus_name = "org.mpris.MediaPlayer2.spotify"
session = _make_coordinator(_CountPrint())
track = TrackMeta(title="Song", artist="Artist")
session._model.active_player = bus_name
session._model.active_player = BUS
session._player_monitor.players = {
bus_name: PlayerState(bus_name=bus_name, status="Playing", track=track)
BUS: PlayerState(bus_name=BUS, status="Playing", track=track)
}
session._model.set_lyrics(LRCData("[00:02.00]a\n[00:03.00]b"))
session._model.status = "ok"
await session._tracker.set_active_player(
bus_name,
"Playing",
"Artist - Song",
)
session._model.set_lyrics(LRCData("[00:01.00]a\n[00:03.00]b"))
session._model.status = WatchStatus.OK
await session._emit_state()
await session._tracker.on_seeked(bus_name, 2500)
await session._emit_state()
await session._emit_state() # emits once
assert counts[0] == 1
assert output.count == 2
await session._tracker.on_seeked(BUS, 3200)
await session._emit_state() # position fixed at 0 → same signature → no re-emit
assert counts[0] == 1
asyncio.run(_run())
Generated
+1 -1
View File
@@ -153,7 +153,7 @@ wheels = [
[[package]]
name = "lrx-cli"
version = "0.7.2"
version = "0.7.3"
source = { editable = "." }
dependencies = [
{ name = "cyclopts" },