Compare commits
2 Commits
1c160d5ccb
...
60732f2986
| Author | SHA1 | Date | |
|---|---|---|---|
|
60732f2986
|
|||
|
633983ed98
|
@@ -143,8 +143,8 @@ socket_path = "" # Unix socket path; defaults to <cache_dir>/
|
||||
Clone this repository:
|
||||
|
||||
```bash
|
||||
git clone https://github.com/Uyanide/LRX-CLI.git
|
||||
cd LRX-CLI
|
||||
git clone https://github.com/Uyanide/lrx-cli.git
|
||||
cd lrx-cli
|
||||
```
|
||||
|
||||
Create a virtual environment and install dependencies (for example, using uv):
|
||||
|
||||
+1
-1
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
|
||||
|
||||
[project]
|
||||
name = "lrx-cli"
|
||||
version = "0.7.2"
|
||||
version = "0.7.3"
|
||||
description = "Fetch line-synced lyrics for your music player."
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.13"
|
||||
|
||||
+31
-35
@@ -21,9 +21,9 @@ colorama==0.4.6 ; sys_platform == 'win32' \
|
||||
# via
|
||||
# loguru
|
||||
# pytest
|
||||
cyclopts==4.10.1 \
|
||||
--hash=sha256:35f37257139380a386d9fe4475e1e7c87ca7795765ef4f31abba579fcfcb6ecd \
|
||||
--hash=sha256:ad4e4bb90576412d32276b14a76f55d43353753d16217f2c3cd5bdceba7f15a0
|
||||
cyclopts==4.10.2 \
|
||||
--hash=sha256:a1f2d6f8f7afac9456b48f75a40b36658778ddc9c6d406b520d017ae32c990fe \
|
||||
--hash=sha256:d7b950457ef2563596d56331f80cbbbf86a2772535fb8b315c4f03bc7e6127f1
|
||||
# via lrx-cli
|
||||
dbus-next==0.2.3 \
|
||||
--hash=sha256:58948f9aff9db08316734c0be2a120f6dc502124d9642f55e90ac82ffb16a18b \
|
||||
@@ -79,27 +79,23 @@ packaging==26.0 \
|
||||
--hash=sha256:00243ae351a257117b6a241061796684b084ed1c516a08c48a3f7e147a9d80b4 \
|
||||
--hash=sha256:b36f1fef9334a5588b4166f8bcd26a14e521f2b55e6b9de3aaa80d3ff7a37529
|
||||
# via pytest
|
||||
platformdirs==4.9.4 \
|
||||
--hash=sha256:1ec356301b7dc906d83f371c8f487070e99d3ccf9e501686456394622a01a934 \
|
||||
--hash=sha256:68a9a4619a666ea6439f2ff250c12a853cd1cbd5158d258bd824a7df6be2f868
|
||||
platformdirs==4.9.6 \
|
||||
--hash=sha256:3bfa75b0ad0db84096ae777218481852c0ebc6c727b3168c1b9e0118e458cf0a \
|
||||
--hash=sha256:e61adb1d5e5cb3441b4b7710bea7e4c12250ca49439228cc1021c00dcfac0917
|
||||
# via lrx-cli
|
||||
pluggy==1.6.0 \
|
||||
--hash=sha256:7dcc130b76258d33b90f61b658791dede3486c3e6bfb003ee5c9bfb396dd22f3 \
|
||||
--hash=sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746
|
||||
# via pytest
|
||||
pygments==2.19.2 \
|
||||
--hash=sha256:636cb2477cec7f8952536970bc533bc43743542f70392ae026374600add5b887 \
|
||||
--hash=sha256:86540386c03d588bb81d44bc3928634ff26449851e99741617ecb9037ee5ec0b
|
||||
pygments==2.20.0 \
|
||||
--hash=sha256:6757cd03768053ff99f3039c1a36d6c0aa0b263438fcab17520b30a303a82b5f \
|
||||
--hash=sha256:81a9e26dd42fd28a23a2d169d86d7ac03b46e2f8b59ed4698fb4785f946d0176
|
||||
# via
|
||||
# pytest
|
||||
# rich
|
||||
pytest==9.0.2 \
|
||||
--hash=sha256:711ffd45bf766d5264d487b917733b453d917afd2b0ad65223959f59089f875b \
|
||||
--hash=sha256:75186651a92bd89611d1d9fc20f0b4345fd827c41ccd5c299a868a05d70edf11
|
||||
python-dotenv==1.2.2 \
|
||||
--hash=sha256:1d8214789a24de455a8b8bd8ae6fe3c6b69a5e3d64aa8a8e5d68e694bbcb285a \
|
||||
--hash=sha256:2c371a91fbd7ba082c2c1dc1f8bf89ca22564a087c2c287cd9b662adde799cf3
|
||||
# via lrx-cli
|
||||
pytest==9.0.3 \
|
||||
--hash=sha256:2c5efc453d45394fdd706ade797c0a81091eccd1d6e4bccfcd476e2b8e0ab5d9 \
|
||||
--hash=sha256:b86ada508af81d19edeb213c681b1d48246c1a91d304c6c81a427674c17eb91c
|
||||
rich==14.3.3 \
|
||||
--hash=sha256:793431c1f8619afa7d3b52b2cdec859562b950ea0d4b6b505397612db8d5362d \
|
||||
--hash=sha256:b8daa0b9e4eef54dd8cf7c86c03713f53241884e814f4e2f5fb342fe520f639b
|
||||
@@ -110,25 +106,25 @@ rich-rst==1.3.2 \
|
||||
--hash=sha256:a1196fdddf1e364b02ec68a05e8ff8f6914fee10fbca2e6b6735f166bb0da8d4 \
|
||||
--hash=sha256:a99b4907cbe118cf9d18b0b44de272efa61f15117c61e39ebdc431baf5df722a
|
||||
# via cyclopts
|
||||
ruff==0.15.8 \
|
||||
--hash=sha256:04f79eff02a72db209d47d665ba7ebcad609d8918a134f86cb13dd132159fc89 \
|
||||
--hash=sha256:0f29b989a55572fb885b77464cf24af05500806ab4edf9a0fd8977f9759d85b1 \
|
||||
--hash=sha256:12e617fc01a95e5821648a6df341d80456bd627bfab8a829f7cfc26a14a4b4a3 \
|
||||
--hash=sha256:2033f963c43949d51e6fdccd3946633c6b37c484f5f98c3035f49c27395a8ab8 \
|
||||
--hash=sha256:432701303b26416d22ba696c39f2c6f12499b89093b61360abc34bcc9bf07762 \
|
||||
--hash=sha256:6ee3ae5c65a42f273f126686353f2e08ff29927b7b7e203b711514370d500de3 \
|
||||
--hash=sha256:75e5cd06b1cf3f47a3996cfc999226b19aa92e7cce682dcd62f80d7035f98f49 \
|
||||
--hash=sha256:8d9a5b8ea13f26ae90838afc33f91b547e61b794865374f114f349e9036835fb \
|
||||
--hash=sha256:995f11f63597ee362130d1d5a327a87cb6f3f5eae3094c620bcc632329a4d26e \
|
||||
--hash=sha256:ac51d486bf457cdc985a412fb1801b2dfd1bd8838372fc55de64b1510eff4bec \
|
||||
--hash=sha256:bc1f0a51254ba21767bfa9a8b5013ca8149dcf38092e6a9eb704d876de94dc34 \
|
||||
--hash=sha256:c2a33a529fb3cbc23a7124b5c6ff121e4d6228029cba374777bd7649cc8598b8 \
|
||||
--hash=sha256:c9861eb959edab053c10ad62c278835ee69ca527b6dcd72b47d5c1e5648964f6 \
|
||||
--hash=sha256:cbe05adeba76d58162762d6b239c9056f1a15a55bd4b346cfd21e26cd6ad7bc7 \
|
||||
--hash=sha256:cf891fa8e3bb430c0e7fac93851a5978fc99c8fa2c053b57b118972866f8e5f2 \
|
||||
--hash=sha256:d3e3d0b6ba8dca1b7ef9ab80a28e840a20070c4b62e56d675c24f366ef330570 \
|
||||
--hash=sha256:d910ae974b7a06a33a057cb87d2a10792a3b2b3b35e33d2699fdf63ec8f6b17a \
|
||||
--hash=sha256:fdce027ada77baa448077ccc6ebb2fa9c3c62fd110d8659d601cf2f475858d94
|
||||
ruff==0.15.10 \
|
||||
--hash=sha256:0744e31482f8f7d0d10a11fcbf897af272fefdfcb10f5af907b18c2813ff4d5f \
|
||||
--hash=sha256:0ee3ef42dab7078bda5ff6a1bcba8539e9857deb447132ad5566a038674540d0 \
|
||||
--hash=sha256:136c00ca2f47b0018b073f28cb5c1506642a830ea941a60354b0e8bc8076b151 \
|
||||
--hash=sha256:28cb32d53203242d403d819fd6983152489b12e4a3ae44993543d6fe62ab42ed \
|
||||
--hash=sha256:51cb8cc943e891ba99989dd92d61e29b1d231e14811db9be6440ecf25d5c1609 \
|
||||
--hash=sha256:601d1610a9e1f1c2165a4f561eeaa2e2ea1e97f3287c5aa258d3dab8b57c6188 \
|
||||
--hash=sha256:8154d43684e4333360fedd11aaa40b1b08a4e37d8ffa9d95fee6fa5b37b6fab1 \
|
||||
--hash=sha256:83e1dd04312997c99ea6965df66a14fb4f03ba978564574ffc68b0d61fd3989e \
|
||||
--hash=sha256:8ab88715f3a6deb6bde6c227f3a123410bec7b855c3ae331b4c006189e895cef \
|
||||
--hash=sha256:8b80a2f3c9c8a950d6237f2ca12b206bccff626139be9fa005f14feb881a1ae8 \
|
||||
--hash=sha256:93cc06a19e5155b4441dd72808fdf84290d84ad8a39ca3b0f994363ade4cebb1 \
|
||||
--hash=sha256:a768ff5969b4f44c349d48edf4ab4f91eddb27fd9d77799598e130fb628aa158 \
|
||||
--hash=sha256:b0c52744cf9f143a393e284125d2576140b68264a93c6716464e129a3e9adb48 \
|
||||
--hash=sha256:b1e7c16ea0ff5a53b7c2df52d947e685973049be1cdfe2b59a9c43601897b22e \
|
||||
--hash=sha256:d1f86e67ebfdef88e00faefa1552b5e510e1d35f3be7d423dc7e84e63788c94e \
|
||||
--hash=sha256:d4272e87e801e9a27a2e8df7b21011c909d9ddd82f4f3281d269b6ba19789ca5 \
|
||||
--hash=sha256:e3e53c588164dc025b671c9df2462429d60357ea91af7e92e9d56c565a9f1b07 \
|
||||
--hash=sha256:e59c9bdc056a320fb9ea1700a8d591718b8faf78af065484e801258d3a76bc3f
|
||||
win32-setctime==1.2.0 ; sys_platform == 'win32' \
|
||||
--hash=sha256:95d644c4e708aba81dc3704a116d8cbc974d70b3bdb8be1d150e36be6e9d1390 \
|
||||
--hash=sha256:ae1fdf948f5640aae05c511ade119313fb6a30d7eabe25fef9764dca5873c4c0
|
||||
|
||||
@@ -29,6 +29,7 @@ from .lrc import get_sidecar_path
|
||||
from .watch import WatchCoordinator
|
||||
from .watch.control import ControlClient, parse_delta
|
||||
from .watch.view.pipe import PipeOutput
|
||||
from .watch.view.print import PrintOutput
|
||||
|
||||
|
||||
app = cyclopts.App(
|
||||
@@ -432,6 +433,37 @@ def pipe(
|
||||
logger.info("Watch stopped.")
|
||||
|
||||
|
||||
@watch_app.command(name="print")
|
||||
def watch_print(
|
||||
plain: Annotated[
|
||||
bool,
|
||||
cyclopts.Parameter(
|
||||
name="--plain",
|
||||
negative="",
|
||||
help="Output plain text (strips all tags). Takes priority over --normalize.",
|
||||
),
|
||||
] = False,
|
||||
) -> None:
|
||||
"""Watch active player and print all lyrics to stdout once per track change."""
|
||||
logger.info(
|
||||
"Starting watch print (player filter: {})",
|
||||
_player or "<none>",
|
||||
)
|
||||
output = PrintOutput(plain=plain)
|
||||
try:
|
||||
session = WatchCoordinator(
|
||||
manager,
|
||||
output,
|
||||
player_hint=_player,
|
||||
config=_app_config,
|
||||
)
|
||||
success = asyncio.run(session.run())
|
||||
if not success:
|
||||
sys.exit(1)
|
||||
except KeyboardInterrupt:
|
||||
logger.info("Watch stopped.")
|
||||
|
||||
|
||||
@ctl_app.command
|
||||
def offset(delta: str) -> None:
|
||||
"""Adjust watch offset. Examples: +200, -200, 0."""
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
"""Watch subsystem public exports."""
|
||||
|
||||
from .session import WatchCoordinator
|
||||
|
||||
__all__ = ["WatchCoordinator"]
|
||||
|
||||
@@ -1,4 +1,8 @@
|
||||
"""Unix-socket control channel for communicating with a running watch session."""
|
||||
"""
|
||||
Author: Uyanide pywang0608@foxmail.com
|
||||
Date: 2026-04-10 08:14:58
|
||||
Description: Unix-socket control channel for communicating with a running watch session.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
@@ -40,14 +44,17 @@ class ControlServer:
|
||||
return True
|
||||
|
||||
try:
|
||||
# probe the socket to distinguish a live session from a stale socket file
|
||||
reader, writer = await asyncio.open_unix_connection(str(self._socket_path))
|
||||
writer.close()
|
||||
await writer.wait_closed()
|
||||
# connection succeeded → another watch session is actively listening
|
||||
logger.error(
|
||||
"A watch session is already running. Use 'lrx watch ctl status'."
|
||||
)
|
||||
return False
|
||||
except Exception:
|
||||
# connection refused / file is stale → safe to remove and reuse
|
||||
try:
|
||||
self._socket_path.unlink(missing_ok=True)
|
||||
except Exception:
|
||||
@@ -136,6 +143,8 @@ def parse_delta(raw: str) -> tuple[bool, int | None, str | None]:
|
||||
if value.startswith("+"):
|
||||
return True, int(value[1:]), None
|
||||
if value.startswith("-"):
|
||||
# keep the sign by negating; bare int() would accept "-123" too but
|
||||
# explicit split is clearer about intent and avoids double-negative edge cases
|
||||
return True, -int(value[1:]), None
|
||||
return True, int(value), None
|
||||
except ValueError:
|
||||
|
||||
@@ -1,4 +1,8 @@
|
||||
"""Debounced lyric fetch orchestration for watch session."""
|
||||
"""
|
||||
Author: Uyanide pywang0608@foxmail.com
|
||||
Date: 2026-04-10 08:14:41
|
||||
Description: Debounced lyric fetch orchestration for watch session.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from typing import Awaitable, Callable, Optional
|
||||
@@ -50,6 +54,7 @@ class LyricFetcher:
|
||||
"""Request lyrics for track with debounce collapsing."""
|
||||
self._pending_track = track
|
||||
if self._debounce_task is not None:
|
||||
# cancel any pending debounce window — the new request supersedes it
|
||||
self._debounce_task.cancel()
|
||||
self._debounce_task = asyncio.create_task(self._debounce_then_fetch())
|
||||
|
||||
@@ -61,6 +66,7 @@ class LyricFetcher:
|
||||
return
|
||||
|
||||
if self._fetch_task is not None:
|
||||
# abort any in-flight fetch for a previous track before starting the new one
|
||||
self._fetch_task.cancel()
|
||||
await asyncio.gather(self._fetch_task, return_exceptions=True)
|
||||
|
||||
@@ -68,6 +74,7 @@ class LyricFetcher:
|
||||
|
||||
async def _do_fetch(self, track: TrackMeta) -> None:
|
||||
"""Execute fetch lifecycle callbacks and fetch lyrics for a track."""
|
||||
# callbacks may be plain functions or coroutines — handle both
|
||||
fetching_callback_result = self._on_fetching()
|
||||
if asyncio.iscoroutine(fetching_callback_result):
|
||||
await fetching_callback_result
|
||||
|
||||
@@ -1,4 +1,8 @@
|
||||
"""Player discovery, state monitoring, and active-player selection for watch mode."""
|
||||
"""
|
||||
Author: Uyanide pywang0608@foxmail.com
|
||||
Date: 2026-04-10 08:14:27
|
||||
Description: Player discovery, state monitoring, and active-player selection for watch mode.
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Callable, Optional
|
||||
@@ -219,6 +223,7 @@ class PlayerMonitor:
|
||||
trackid = metadata.get("mpris:trackid")
|
||||
if trackid is not None:
|
||||
trackid = _variant_value(trackid)
|
||||
# normalize Spotify track IDs — the raw MPRIS value varies by client version
|
||||
if isinstance(trackid, str) and trackid.startswith("spotify:track:"):
|
||||
trackid = trackid.removeprefix("spotify:track:")
|
||||
elif isinstance(trackid, str) and trackid.startswith("/com/spotify/track/"):
|
||||
@@ -230,12 +235,14 @@ class PlayerMonitor:
|
||||
length_ms = None
|
||||
length_value = _variant_value(length) if length is not None else None
|
||||
if isinstance(length_value, int):
|
||||
# MPRIS reports length in microseconds; convert to milliseconds
|
||||
length_ms = length_value // 1000
|
||||
|
||||
artist = metadata.get("xesam:artist")
|
||||
artist_v = None
|
||||
artist_value = _variant_value(artist) if artist is not None else None
|
||||
if isinstance(artist_value, list) and artist_value:
|
||||
# xesam:artist is a list; take the first entry as primary artist
|
||||
artist_v = artist_value[0]
|
||||
|
||||
title = metadata.get("xesam:title")
|
||||
@@ -286,10 +293,14 @@ class PlayerMonitor:
|
||||
async def _resolve_well_known_name(self, unique_sender: str) -> str | None:
|
||||
"""Map a DBus unique sender (e.g. :1.42) to a tracked MPRIS bus name."""
|
||||
if unique_sender in self.players:
|
||||
# sender is already a well-known name we track (unlikely but fast path)
|
||||
return unique_sender
|
||||
if not self._bus:
|
||||
return None
|
||||
|
||||
# Seeked signals arrive with the unique connection name (:1.N), not the
|
||||
# well-known bus name (org.mpris.MediaPlayer2.X). Ask D-Bus which
|
||||
# well-known name owns that unique name.
|
||||
for bus_name in self.players:
|
||||
try:
|
||||
reply = await self._bus.call(
|
||||
@@ -325,6 +336,7 @@ class PlayerMonitor:
|
||||
message.interface == "org.freedesktop.DBus"
|
||||
and message.member == "NameOwnerChanged"
|
||||
):
|
||||
# a player appeared or disappeared — rescan the full player list
|
||||
if message.body and str(message.body[0]).startswith(
|
||||
"org.mpris.MediaPlayer2."
|
||||
):
|
||||
@@ -335,7 +347,9 @@ class PlayerMonitor:
|
||||
message.interface == "org.freedesktop.DBus.Properties"
|
||||
and message.member == "PropertiesChanged"
|
||||
):
|
||||
# Message.sender is a DBus unique name, so match by path+iface.
|
||||
# message.sender is a unique connection name, not the well-known bus
|
||||
# name, so we can't filter by sender here — match by object path and
|
||||
# interface instead to scope it to MPRIS Player properties only
|
||||
path_ok = message.path == "/org/mpris/MediaPlayer2"
|
||||
iface = message.body[0] if message.body else None
|
||||
if path_ok and iface == "org.mpris.MediaPlayer2.Player":
|
||||
@@ -348,6 +362,7 @@ class PlayerMonitor:
|
||||
):
|
||||
sender = message.sender or ""
|
||||
if sender and message.body:
|
||||
# MPRIS Seeked position is in microseconds; convert to ms
|
||||
position_us = int(message.body[0])
|
||||
asyncio.create_task(
|
||||
self._handle_seeked_signal(
|
||||
@@ -391,15 +406,19 @@ class ActivePlayerSelector:
|
||||
|
||||
playing = [name for name, st in players.items() if st.status == "Playing"]
|
||||
if len(playing) == 1:
|
||||
# unambiguous — only one player is currently playing
|
||||
return playing[0]
|
||||
|
||||
preferred = preferred_player.lower().strip()
|
||||
# when multiple players are playing, narrow candidates to those; otherwise
|
||||
# fall back to all known players so a paused preferred player still wins
|
||||
candidates = playing if playing else list(players.keys())
|
||||
if preferred:
|
||||
for name in candidates:
|
||||
if preferred in name.lower():
|
||||
return name
|
||||
|
||||
# preserve the last selection to avoid jitter when nothing else changes
|
||||
if last_active and last_active in players:
|
||||
return last_active
|
||||
|
||||
|
||||
@@ -1,8 +1,11 @@
|
||||
"""Watch orchestration with explicit MVVM role boundaries.
|
||||
"""
|
||||
Author: Uyanide pywang0608@foxmail.com
|
||||
Date: 2026-04-10 08:10:52
|
||||
Description: Watch orchestration with explicit MVVM role boundaries.
|
||||
|
||||
- Model: WatchModel stores domain state.
|
||||
- ViewModel: WatchViewModel projects model to output-facing state/signature.
|
||||
- Coordinator: WatchCoordinator wires services and drives async workflows.
|
||||
- Model: WatchModel stores domain state.
|
||||
- ViewModel: WatchViewModel projects model to output-facing state/signature.
|
||||
- Coordinator: WatchCoordinator wires services and drives async workflows.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
@@ -17,7 +20,7 @@ from ..models import TrackMeta
|
||||
from .control import ControlServer
|
||||
from .fetcher import LyricFetcher
|
||||
from ..config import AppConfig
|
||||
from .view import BaseOutput, LyricView, WatchState
|
||||
from .view import BaseOutput, LyricView, WatchState, WatchStatus
|
||||
from .player import ActivePlayerSelector, PlayerMonitor, PlayerTarget
|
||||
from .tracker import PositionTracker
|
||||
|
||||
@@ -28,14 +31,14 @@ class WatchModel:
|
||||
offset_ms: int
|
||||
active_player: str | None
|
||||
active_track_key: str | None
|
||||
status: str
|
||||
status: WatchStatus
|
||||
lyrics: LyricView | None
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.offset_ms = 0
|
||||
self.active_player: str | None = None
|
||||
self.active_track_key: str | None = None
|
||||
self.status: str = "idle"
|
||||
self.status: WatchStatus = WatchStatus.IDLE
|
||||
self.lyrics: LyricView | None = None
|
||||
|
||||
def set_lyrics(self, lyrics: LRCData | None) -> None:
|
||||
@@ -48,6 +51,8 @@ class WatchModel:
|
||||
|
||||
def state_signature(self, track: TrackMeta | None, position_ms: int) -> tuple:
|
||||
"""Build dedupe signature from model state and current lyric cursor."""
|
||||
# prefer trackid when available; fall back to display name for players
|
||||
# that don't expose a stable ID (e.g. some MPRIS implementations)
|
||||
track_key = (
|
||||
track.trackid
|
||||
if track and track.trackid
|
||||
@@ -56,7 +61,8 @@ class WatchModel:
|
||||
else None
|
||||
)
|
||||
|
||||
if self.status != "ok" or self.lyrics is None:
|
||||
if self.status != WatchStatus.OK or self.lyrics is None:
|
||||
# non-OK states don't have cursor position — discriminate by status alone
|
||||
return ("status", self.status, self.active_player, track_key)
|
||||
at_ms = position_ms + self.offset_ms
|
||||
cursor = self.lyrics.signature_cursor(at_ms)
|
||||
@@ -82,7 +88,7 @@ class WatchViewModel:
|
||||
lyrics=self._model.lyrics,
|
||||
position_ms=position_ms,
|
||||
offset_ms=self._model.offset_ms,
|
||||
status=self._model.status, # type: ignore[arg-type]
|
||||
status=self._model.status,
|
||||
)
|
||||
|
||||
|
||||
@@ -164,7 +170,9 @@ class WatchCoordinator:
|
||||
await self._player_monitor.start()
|
||||
await self._tracker.start()
|
||||
self._calibration_task = asyncio.create_task(self._calibration_loop())
|
||||
# emit once at startup so outputs don't sit blank until the first event
|
||||
self._schedule_emit()
|
||||
# block forever; CancelledError from signal handler exits the loop cleanly
|
||||
await asyncio.Event().wait()
|
||||
return True
|
||||
except asyncio.CancelledError:
|
||||
@@ -206,8 +214,10 @@ class WatchCoordinator:
|
||||
if track is None:
|
||||
return False
|
||||
if self._model.lyrics is not None:
|
||||
# lyrics already loaded — nothing to fetch
|
||||
return False
|
||||
if self._model.status == "fetching":
|
||||
if self._model.status == WatchStatus.FETCHING:
|
||||
# a fetch is already in flight — don't queue another
|
||||
return False
|
||||
logger.info("fetching lyrics for track ({}): {}", reason, track.display_name())
|
||||
self._fetcher.request(track)
|
||||
@@ -246,7 +256,7 @@ class WatchCoordinator:
|
||||
)
|
||||
|
||||
if selected is None:
|
||||
self._model.status = "idle"
|
||||
self._model.status = WatchStatus.IDLE
|
||||
self._model.active_track_key = None
|
||||
self._model.set_lyrics(None)
|
||||
self._schedule_emit()
|
||||
@@ -254,7 +264,7 @@ class WatchCoordinator:
|
||||
|
||||
state = self._player_monitor.players.get(selected)
|
||||
if state is None:
|
||||
self._model.status = "idle"
|
||||
self._model.status = WatchStatus.IDLE
|
||||
self._model.active_track_key = None
|
||||
self._model.set_lyrics(None)
|
||||
self._schedule_emit()
|
||||
@@ -272,6 +282,7 @@ class WatchCoordinator:
|
||||
track_changed = track_key != prev_track_key
|
||||
player_changed = selected != prev_player
|
||||
if track_changed or player_changed:
|
||||
# clear stale lyrics immediately so the old track's lines don't flash
|
||||
self._model.set_lyrics(None)
|
||||
|
||||
self._model.active_track_key = track_key
|
||||
@@ -284,27 +295,20 @@ class WatchCoordinator:
|
||||
)
|
||||
)
|
||||
|
||||
if state.status != "Playing":
|
||||
self._model.status = "paused"
|
||||
self._schedule_emit()
|
||||
return
|
||||
|
||||
# only fetch on identity change — calibration ticks must not re-trigger fetches
|
||||
started_fetch = False
|
||||
if track is not None and (player_changed or track_changed):
|
||||
started_fetch = self._request_fetch_for_active_track("track-changed")
|
||||
elif (
|
||||
track is not None
|
||||
and self._model.lyrics is None
|
||||
and self._model.status == "paused"
|
||||
):
|
||||
started_fetch = self._request_fetch_for_active_track("resume-playing")
|
||||
|
||||
# derive status from what actually happened this tick; preserve FETCHING
|
||||
# if an in-flight request was started before this snapshot arrived
|
||||
if self._model.lyrics is not None:
|
||||
self._model.status = "ok"
|
||||
self._model.status = WatchStatus.OK
|
||||
elif started_fetch:
|
||||
self._model.status = "fetching"
|
||||
elif self._model.status != "fetching":
|
||||
self._model.status = "no_lyrics"
|
||||
self._model.status = WatchStatus.FETCHING
|
||||
elif self._model.status != WatchStatus.FETCHING:
|
||||
# don't overwrite FETCHING with NO_LYRICS while a request is in flight
|
||||
self._model.status = WatchStatus.NO_LYRICS
|
||||
self._schedule_emit()
|
||||
|
||||
def _on_seeked(self, bus_name: str, position_ms: int) -> None:
|
||||
@@ -312,29 +316,18 @@ class WatchCoordinator:
|
||||
asyncio.create_task(self._tracker.on_seeked(bus_name, position_ms))
|
||||
|
||||
def _on_playback_status(self, bus_name: str, status: str) -> None:
|
||||
"""React to playback status change and tracker sync."""
|
||||
if bus_name == self._model.active_player:
|
||||
if status == "Playing":
|
||||
started_fetch = self._request_fetch_for_active_track("resume-playing")
|
||||
if self._model.lyrics is not None:
|
||||
self._model.status = "ok"
|
||||
elif started_fetch:
|
||||
self._model.status = "fetching"
|
||||
elif self._model.status != "fetching":
|
||||
self._model.status = "no_lyrics"
|
||||
else:
|
||||
self._model.status = "paused"
|
||||
self._schedule_emit()
|
||||
"""Forward playback status change to position tracker."""
|
||||
asyncio.create_task(self._tracker.on_playback_status(bus_name, status))
|
||||
|
||||
def _on_tracker_tick(self) -> None:
|
||||
"""Emit updates from tracker tick only while lyrics are actively rendering."""
|
||||
if self._model.status == "ok":
|
||||
if self._model.status == WatchStatus.OK and self._output.position_sensitive:
|
||||
self._schedule_emit()
|
||||
|
||||
def _schedule_emit(self) -> None:
|
||||
"""Coalesce frequent events into at most one in-flight emit task."""
|
||||
if self._emit_scheduled:
|
||||
# a task is already queued; it will pick up the latest model state when it runs
|
||||
return
|
||||
self._emit_scheduled = True
|
||||
asyncio.create_task(self._run_scheduled_emit())
|
||||
@@ -344,17 +337,20 @@ class WatchCoordinator:
|
||||
try:
|
||||
await self._emit_state()
|
||||
finally:
|
||||
# release the gate even on error so future events can still schedule
|
||||
self._emit_scheduled = False
|
||||
|
||||
async def _on_fetching(self) -> None:
|
||||
"""Mark model as fetching and emit state."""
|
||||
self._model.status = "fetching"
|
||||
self._model.status = WatchStatus.FETCHING
|
||||
await self._emit_state()
|
||||
|
||||
async def _on_lyrics_update(self, lyrics: Optional[LRCData]) -> None:
|
||||
"""Update model with fetched lyrics and emit state."""
|
||||
self._model.set_lyrics(lyrics)
|
||||
self._model.status = "ok" if lyrics is not None else "no_lyrics"
|
||||
self._model.status = (
|
||||
WatchStatus.OK if lyrics is not None else WatchStatus.NO_LYRICS
|
||||
)
|
||||
logger.info(
|
||||
"lyrics update result: {}",
|
||||
"found" if lyrics is not None else "not found",
|
||||
@@ -365,10 +361,17 @@ class WatchCoordinator:
|
||||
"""Emit output state only when semantic signature changes."""
|
||||
player = self._player_monitor.players.get(self._model.active_player or "")
|
||||
track = player.track if player else None
|
||||
position = await self._tracker.get_position_ms()
|
||||
# position=0 for non-position-sensitive outputs so the signature is stable
|
||||
# across ticks and on_state fires at most once per track+status transition
|
||||
position = (
|
||||
await self._tracker.get_position_ms()
|
||||
if self._output.position_sensitive
|
||||
else 0
|
||||
)
|
||||
|
||||
signature = self._view_model.signature(track, position)
|
||||
if signature == self._last_emit_signature:
|
||||
# state hasn't changed semantically — skip redundant render
|
||||
return
|
||||
self._last_emit_signature = signature
|
||||
state = self._view_model.state(track, position)
|
||||
|
||||
@@ -1,4 +1,8 @@
|
||||
"""Playback position tracking utilities for watch mode."""
|
||||
"""
|
||||
Author: Uyanide pywang0608@foxmail.com
|
||||
Date: 2026-04-10 08:13:35
|
||||
Description: Playback position tracking utilities for watch mode.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
@@ -69,7 +73,10 @@ class PositionTracker:
|
||||
self._is_playing = playback_status == "Playing"
|
||||
status_changed_to_playing = self._is_playing and not was_playing
|
||||
if player_changed or track_changed:
|
||||
# reset to 0 so stale position from a previous track doesn't bleed through
|
||||
self._position_ms = 0
|
||||
# only poll MPRIS when something changed and the player is actually running;
|
||||
# avoids an unnecessary D-Bus round-trip on every calibration-loop tick
|
||||
should_calibrate_now = (
|
||||
self._is_playing
|
||||
and bool(self._active_player)
|
||||
@@ -97,6 +104,7 @@ class PositionTracker:
|
||||
return
|
||||
was_playing = self._is_playing
|
||||
self._is_playing = playback_status == "Playing"
|
||||
# re-anchor last_tick when resuming so the gap while paused isn't counted
|
||||
should_calibrate_now = self._is_playing and not was_playing
|
||||
self._last_tick = time.monotonic()
|
||||
|
||||
@@ -112,10 +120,13 @@ class PositionTracker:
|
||||
async with self._lock:
|
||||
now = time.monotonic()
|
||||
if self._is_playing and self._active_player:
|
||||
# accumulate elapsed wall-clock time as playback position;
|
||||
# seek events and calibration snapshots correct drift periodically
|
||||
delta_ms = int((now - self._last_tick) * 1000)
|
||||
if delta_ms > 0:
|
||||
self._position_ms += delta_ms
|
||||
should_notify = True
|
||||
# always update last_tick so paused time isn't counted on resume
|
||||
self._last_tick = now
|
||||
|
||||
if should_notify and self._on_tick is not None:
|
||||
|
||||
@@ -3,12 +3,20 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from bisect import bisect_right
|
||||
from dataclasses import dataclass
|
||||
from typing import Literal, Optional
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
|
||||
from ...lrc import LRCData, LyricLine
|
||||
from ...models import TrackMeta
|
||||
|
||||
|
||||
class WatchStatus(str, Enum):
|
||||
IDLE = "idle"
|
||||
FETCHING = "fetching"
|
||||
OK = "ok"
|
||||
NO_LYRICS = "no_lyrics"
|
||||
|
||||
|
||||
@dataclass(slots=True, frozen=True)
|
||||
class LyricView:
|
||||
"""View-ready immutable lyric data projected from one normalized LRC object."""
|
||||
@@ -29,13 +37,16 @@ class LyricView:
|
||||
line_index = 0
|
||||
for line in normalized.lines:
|
||||
if not isinstance(line, LyricLine):
|
||||
# skip metadata/tag lines that carry no renderable text
|
||||
continue
|
||||
text = line.text
|
||||
lines.append(text)
|
||||
# use first timestamp; clamp to 0 so bisect always works with non-negative ms
|
||||
timestamp = line.line_times_ms[0] if line.line_times_ms else 0
|
||||
entries.append((max(0, timestamp), line_index))
|
||||
line_index += 1
|
||||
|
||||
# extract timestamps into a flat tuple so bisect_right can binary-search it
|
||||
timestamps = tuple(timestamp for timestamp, _ in entries)
|
||||
return LyricView(
|
||||
normalized=normalized,
|
||||
@@ -47,12 +58,16 @@ class LyricView:
|
||||
def signature_cursor(self, at_ms: int) -> tuple:
|
||||
"""Build a stable cursor signature for dedupe decisions."""
|
||||
if not self.timed_line_entries:
|
||||
# untimed lyrics: signature is the full line set — changes only on track change
|
||||
return ("plain", self.lines)
|
||||
|
||||
first_ts = self.timed_line_entries[0][0]
|
||||
if at_ms < first_ts:
|
||||
# playback hasn't reached the first lyric yet; hold until it does
|
||||
return ("before_first", first_ts)
|
||||
|
||||
# bisect_right gives the insertion point after equal timestamps, so -1 gives
|
||||
# the last line whose timestamp <= at_ms (i.e. the currently active line)
|
||||
idx = bisect_right(self.timestamps, at_ms) - 1
|
||||
if idx < 0:
|
||||
idx = 0
|
||||
@@ -70,10 +85,15 @@ class WatchState:
|
||||
lyrics: Optional[LyricView]
|
||||
position_ms: int
|
||||
offset_ms: int
|
||||
status: Literal["fetching", "ok", "no_lyrics", "paused", "idle"]
|
||||
status: WatchStatus
|
||||
|
||||
|
||||
class BaseOutput(ABC):
|
||||
# When False, the coordinator passes position=0 for signature computation and
|
||||
# skips tracker-tick-driven emits, so on_state fires at most once per
|
||||
# track+status transition rather than on every lyric cursor advance.
|
||||
position_sensitive: bool = True
|
||||
|
||||
@abstractmethod
|
||||
async def on_state(self, state: WatchState) -> None:
|
||||
"""Render or deliver one watch state frame."""
|
||||
|
||||
@@ -1,10 +1,14 @@
|
||||
"""Pipe output implementation for watch mode."""
|
||||
"""
|
||||
Author: Uyanide pywang0608@foxmail.com
|
||||
Date: 2026-04-10 08:15:17
|
||||
Description: Pipe output implementation for watch mode.
|
||||
"""
|
||||
|
||||
from bisect import bisect_right
|
||||
from dataclasses import dataclass
|
||||
import sys
|
||||
|
||||
from . import BaseOutput, WatchState
|
||||
from . import BaseOutput, WatchState, WatchStatus
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
@@ -38,12 +42,14 @@ class PipeOutput(BaseOutput):
|
||||
effective_ms = state.position_ms + state.offset_ms
|
||||
current_line_idx: int | None
|
||||
if entries and effective_ms < entries[0][0]:
|
||||
# Before first timestamp, current lyric is empty and after-window shows upcoming lines.
|
||||
# playback hasn't reached the first lyric yet; treat current slot as empty
|
||||
# so the after-window can show upcoming lines without a "current" anchor
|
||||
current_line_idx = None
|
||||
else:
|
||||
if not entries:
|
||||
current_line_idx = 0
|
||||
else:
|
||||
# bisect_right - 1 gives the last entry whose timestamp <= effective_ms
|
||||
current_entry_idx = (
|
||||
bisect_right(state.lyrics.timestamps, effective_ms) - 1
|
||||
)
|
||||
@@ -54,6 +60,8 @@ class PipeOutput(BaseOutput):
|
||||
out: list[str] = []
|
||||
for rel in range(-self.before, self.after + 1):
|
||||
if current_line_idx is None:
|
||||
# before-first-timestamp: before/current slots are empty; after slots
|
||||
# show lines starting from index 0 (rel=1 → line 0, rel=2 → line 1, …)
|
||||
if rel <= 0:
|
||||
out.append("")
|
||||
continue
|
||||
@@ -70,17 +78,16 @@ class PipeOutput(BaseOutput):
|
||||
|
||||
async def on_state(self, state: WatchState) -> None:
|
||||
"""Render and flush one frame for the latest watch state."""
|
||||
if state.status == "fetching":
|
||||
if state.status == WatchStatus.FETCHING:
|
||||
lines = self._render_status("[fetching...]")
|
||||
elif state.status == "no_lyrics":
|
||||
elif state.status == WatchStatus.NO_LYRICS:
|
||||
lines = self._render_status("[no lyrics]")
|
||||
elif state.status == "paused":
|
||||
lines = self._render_status("[paused]")
|
||||
elif state.status == "idle":
|
||||
elif state.status == WatchStatus.IDLE:
|
||||
lines = self._render_status("[idle]")
|
||||
else:
|
||||
lines = self._render_lyrics(state)
|
||||
|
||||
for line in lines:
|
||||
# no_newline mode lets callers use \r to overwrite the previous frame in-place
|
||||
sys.stdout.write(line + ("\n" if not self.no_newline else ""))
|
||||
sys.stdout.flush()
|
||||
|
||||
@@ -0,0 +1,44 @@
|
||||
"""
|
||||
Author: Uyanide pywang0608@foxmail.com
|
||||
Date: 2026-04-10 08:15:31
|
||||
Description: Print output implementation for watch mode — one shot per track.
|
||||
"""
|
||||
|
||||
import sys
|
||||
|
||||
from . import BaseOutput, WatchState, WatchStatus
|
||||
|
||||
|
||||
class PrintOutput(BaseOutput):
|
||||
"""Emit full lyrics to stdout once per track transition, then stay silent.
|
||||
|
||||
Deduplication is delegated to the coordinator via position_sensitive=False:
|
||||
the coordinator uses a fixed position for signatures, so on_state fires at
|
||||
most once per (status, track_key) transition rather than on every tick.
|
||||
"""
|
||||
|
||||
# fixed position=0 in signatures → coordinator calls on_state only on
|
||||
# track/status transitions, never on lyric cursor advances
|
||||
position_sensitive = False
|
||||
|
||||
plain: bool
|
||||
|
||||
def __init__(self, plain: bool = False) -> None:
|
||||
self.plain = plain
|
||||
|
||||
async def on_state(self, state: WatchState) -> None:
|
||||
if state.status == WatchStatus.FETCHING or state.status == WatchStatus.IDLE:
|
||||
return
|
||||
|
||||
if state.status == WatchStatus.NO_LYRICS:
|
||||
# emit a blank line as a machine-readable sentinel for "track changed, no lyrics"
|
||||
sys.stdout.write("\n")
|
||||
sys.stdout.flush()
|
||||
elif state.status == WatchStatus.OK and state.lyrics is not None:
|
||||
lrc = state.lyrics.normalized
|
||||
if self.plain:
|
||||
text = lrc.to_plain()
|
||||
else:
|
||||
text = str(lrc)
|
||||
sys.stdout.write(text + "\n")
|
||||
sys.stdout.flush()
|
||||
+426
-269
@@ -2,20 +2,22 @@ from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from lrx_cli.lrc import LRCData
|
||||
from lrx_cli.models import TrackMeta
|
||||
from lrx_cli.watch.control import ControlClient, ControlServer, parse_delta
|
||||
from lrx_cli.watch.view import BaseOutput, LyricView, WatchState
|
||||
from lrx_cli.watch.view import BaseOutput, LyricView, WatchState, WatchStatus
|
||||
from lrx_cli.watch.view.pipe import PipeOutput
|
||||
from lrx_cli.watch.view.print import PrintOutput
|
||||
from lrx_cli.watch.player import ActivePlayerSelector, PlayerState, PlayerTarget
|
||||
from lrx_cli.watch.fetcher import LyricFetcher
|
||||
from lrx_cli.config import AppConfig
|
||||
from lrx_cli.watch.tracker import PositionTracker
|
||||
from lrx_cli.watch.session import WatchCoordinator
|
||||
|
||||
|
||||
TEST_CONFIG = AppConfig()
|
||||
BUS = "org.mpris.MediaPlayer2.spotify"
|
||||
|
||||
|
||||
def test_parse_delta_supports_plus_minus_and_reset() -> None:
|
||||
@@ -24,16 +26,17 @@ def test_parse_delta_supports_plus_minus_and_reset() -> None:
|
||||
assert parse_delta("0") == (True, 0, None)
|
||||
|
||||
|
||||
# PlayerTarget
|
||||
|
||||
|
||||
def test_player_target_allows_all_when_hint_empty() -> None:
|
||||
target = PlayerTarget()
|
||||
|
||||
assert target.allows("org.mpris.MediaPlayer2.spotify") is True
|
||||
assert target.allows("org.mpris.MediaPlayer2.mpd") is True
|
||||
|
||||
|
||||
def test_player_target_filters_by_case_insensitive_substring() -> None:
|
||||
target = PlayerTarget("Spot")
|
||||
|
||||
assert target.allows("org.mpris.MediaPlayer2.spotify") is True
|
||||
assert target.allows("org.mpris.MediaPlayer2.mpd") is False
|
||||
|
||||
@@ -43,60 +46,72 @@ def test_player_target_reports_blacklisted_hint() -> None:
|
||||
assert target.validation_error() is not None
|
||||
|
||||
|
||||
def test_player_target_non_blacklisted_hint_is_valid() -> None:
|
||||
target = PlayerTarget("mpd", player_blacklist=("spotify",))
|
||||
assert target.validation_error() is None
|
||||
|
||||
|
||||
# ActivePlayerSelector
|
||||
|
||||
|
||||
def _ps(bus: str, status: str = "Playing") -> PlayerState:
|
||||
return PlayerState(bus_name=bus, status=status, track=TrackMeta(title="T"))
|
||||
|
||||
|
||||
def test_active_player_selector_returns_none_when_no_players() -> None:
|
||||
assert ActivePlayerSelector.select({}, None, "spotify") is None
|
||||
|
||||
|
||||
def test_active_player_selector_prefers_single_playing() -> None:
|
||||
players = {
|
||||
"org.mpris.MediaPlayer2.foo": PlayerState(
|
||||
bus_name="org.mpris.MediaPlayer2.foo",
|
||||
status="Paused",
|
||||
track=TrackMeta(title="A"),
|
||||
),
|
||||
"org.mpris.MediaPlayer2.bar": PlayerState(
|
||||
bus_name="org.mpris.MediaPlayer2.bar",
|
||||
status="Playing",
|
||||
track=TrackMeta(title="B"),
|
||||
),
|
||||
"org.mpris.MediaPlayer2.foo": _ps("org.mpris.MediaPlayer2.foo", "Paused"),
|
||||
"org.mpris.MediaPlayer2.bar": _ps("org.mpris.MediaPlayer2.bar", "Playing"),
|
||||
}
|
||||
assert (
|
||||
ActivePlayerSelector.select(players, None, TEST_CONFIG.general.preferred_player)
|
||||
ActivePlayerSelector.select(players, None, "spotify")
|
||||
== "org.mpris.MediaPlayer2.bar"
|
||||
)
|
||||
|
||||
|
||||
def test_active_player_selector_prefers_keyword_among_multiple_playing() -> None:
|
||||
players = {
|
||||
"org.mpris.MediaPlayer2.foo": _ps("org.mpris.MediaPlayer2.foo"),
|
||||
"org.mpris.MediaPlayer2.spotify": _ps("org.mpris.MediaPlayer2.spotify"),
|
||||
}
|
||||
assert (
|
||||
ActivePlayerSelector.select(players, None, "spotify")
|
||||
== "org.mpris.MediaPlayer2.spotify"
|
||||
)
|
||||
|
||||
|
||||
def test_active_player_selector_uses_last_active_when_no_playing() -> None:
|
||||
players = {
|
||||
"org.mpris.MediaPlayer2.foo": PlayerState(
|
||||
bus_name="org.mpris.MediaPlayer2.foo",
|
||||
status="Paused",
|
||||
track=TrackMeta(title="A"),
|
||||
),
|
||||
"org.mpris.MediaPlayer2.bar": PlayerState(
|
||||
bus_name="org.mpris.MediaPlayer2.bar",
|
||||
status="Stopped",
|
||||
track=TrackMeta(title="B"),
|
||||
),
|
||||
"org.mpris.MediaPlayer2.foo": _ps("org.mpris.MediaPlayer2.foo", "Paused"),
|
||||
"org.mpris.MediaPlayer2.bar": _ps("org.mpris.MediaPlayer2.bar", "Stopped"),
|
||||
}
|
||||
|
||||
assert (
|
||||
ActivePlayerSelector.select(
|
||||
players,
|
||||
"org.mpris.MediaPlayer2.bar",
|
||||
TEST_CONFIG.general.preferred_player,
|
||||
)
|
||||
ActivePlayerSelector.select(players, "org.mpris.MediaPlayer2.bar", "spotify")
|
||||
== "org.mpris.MediaPlayer2.bar"
|
||||
)
|
||||
|
||||
|
||||
def test_active_player_selector_falls_back_to_first_when_no_preference() -> None:
|
||||
players = {
|
||||
"org.mpris.MediaPlayer2.foo": _ps("org.mpris.MediaPlayer2.foo", "Paused"),
|
||||
}
|
||||
result = ActivePlayerSelector.select(players, None, "")
|
||||
assert result == "org.mpris.MediaPlayer2.foo"
|
||||
|
||||
|
||||
# PositionTracker
|
||||
|
||||
|
||||
def test_position_tracker_seeked_calibrates_immediately() -> None:
|
||||
async def _run() -> None:
|
||||
async def _poll(_bus: str):
|
||||
return 1200
|
||||
|
||||
tracker = PositionTracker(_poll, TEST_CONFIG)
|
||||
tracker = PositionTracker(lambda _: asyncio.sleep(0, result=1200), TEST_CONFIG)
|
||||
await tracker.start()
|
||||
await tracker.set_active_player(
|
||||
"org.mpris.MediaPlayer2.foo", "Playing", "track-A"
|
||||
)
|
||||
await tracker.on_seeked("org.mpris.MediaPlayer2.foo", 3500)
|
||||
await tracker.set_active_player(BUS, "Playing", "track-A")
|
||||
await tracker.on_seeked(BUS, 3500)
|
||||
pos = await tracker.get_position_ms()
|
||||
await tracker.stop()
|
||||
assert pos >= 3500
|
||||
@@ -104,74 +119,56 @@ def test_position_tracker_seeked_calibrates_immediately() -> None:
|
||||
asyncio.run(_run())
|
||||
|
||||
|
||||
def test_position_tracker_playback_status_pause_stops_fast_growth() -> None:
|
||||
def test_position_tracker_pause_stops_position_growth() -> None:
|
||||
async def _run() -> None:
|
||||
async def _poll(_bus: str):
|
||||
return 0
|
||||
|
||||
tracker = PositionTracker(_poll, TEST_CONFIG)
|
||||
tracker = PositionTracker(lambda _: asyncio.sleep(0, result=0), TEST_CONFIG)
|
||||
await tracker.start()
|
||||
await tracker.set_active_player(
|
||||
"org.mpris.MediaPlayer2.foo", "Playing", "track-A"
|
||||
)
|
||||
await tracker.set_active_player(BUS, "Playing", "track-A")
|
||||
await asyncio.sleep(0.08)
|
||||
before = await tracker.get_position_ms()
|
||||
|
||||
await tracker.on_playback_status("org.mpris.MediaPlayer2.foo", "Paused")
|
||||
await tracker.on_playback_status(BUS, "Paused")
|
||||
await asyncio.sleep(0.08)
|
||||
after = await tracker.get_position_ms()
|
||||
await tracker.stop()
|
||||
|
||||
assert before > 0
|
||||
assert after - before < 20
|
||||
|
||||
asyncio.run(_run())
|
||||
|
||||
|
||||
def test_position_tracker_playback_status_playing_calibrates_once() -> None:
|
||||
def test_position_tracker_resume_via_playback_status_calibrates() -> None:
|
||||
async def _run() -> None:
|
||||
async def _poll(_bus: str):
|
||||
return 50000
|
||||
|
||||
tracker = PositionTracker(_poll, TEST_CONFIG)
|
||||
tracker = PositionTracker(lambda _: asyncio.sleep(0, result=50000), TEST_CONFIG)
|
||||
await tracker.start()
|
||||
await tracker.set_active_player(
|
||||
"org.mpris.MediaPlayer2.foo", "Paused", "track-A"
|
||||
)
|
||||
await tracker.on_playback_status("org.mpris.MediaPlayer2.foo", "Playing")
|
||||
await tracker.set_active_player(BUS, "Paused", "track-A")
|
||||
await tracker.on_playback_status(BUS, "Playing")
|
||||
pos = await tracker.get_position_ms()
|
||||
await tracker.stop()
|
||||
|
||||
assert pos >= 50000
|
||||
|
||||
asyncio.run(_run())
|
||||
|
||||
|
||||
def test_position_tracker_set_active_player_playing_calibrates_on_resume() -> None:
|
||||
def test_position_tracker_resume_via_set_active_player_calibrates() -> None:
|
||||
async def _run() -> None:
|
||||
async def _poll(_bus: str):
|
||||
return 42000
|
||||
|
||||
tracker = PositionTracker(_poll, TEST_CONFIG)
|
||||
tracker = PositionTracker(lambda _: asyncio.sleep(0, result=42000), TEST_CONFIG)
|
||||
await tracker.start()
|
||||
await tracker.set_active_player(
|
||||
"org.mpris.MediaPlayer2.foo", "Paused", "track-A"
|
||||
)
|
||||
await tracker.set_active_player(
|
||||
"org.mpris.MediaPlayer2.foo", "Playing", "track-A"
|
||||
)
|
||||
await tracker.set_active_player(BUS, "Paused", "track-A")
|
||||
await tracker.set_active_player(BUS, "Playing", "track-A")
|
||||
pos = await tracker.get_position_ms()
|
||||
await tracker.stop()
|
||||
|
||||
assert pos >= 42000
|
||||
|
||||
asyncio.run(_run())
|
||||
|
||||
|
||||
# ControlServer and ControlClient
|
||||
|
||||
|
||||
def test_control_server_and_client_roundtrip(tmp_path: Path) -> None:
|
||||
async def _run() -> None:
|
||||
class _Session:
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
self.offset = 0
|
||||
|
||||
def handle_offset(self, delta: int) -> dict:
|
||||
@@ -183,14 +180,11 @@ def test_control_server_and_client_roundtrip(tmp_path: Path) -> None:
|
||||
|
||||
socket_path = tmp_path / "watch.sock"
|
||||
server = ControlServer(socket_path=str(socket_path))
|
||||
session = _Session()
|
||||
|
||||
await server.start(session) # type: ignore
|
||||
await server.start(_Session()) # type: ignore
|
||||
client = ControlClient(socket_path=str(socket_path))
|
||||
r1 = await client._send_async({"cmd": "offset", "delta": 200})
|
||||
r2 = await client._send_async({"cmd": "status"})
|
||||
await server.stop()
|
||||
|
||||
assert r1 == {"ok": True, "offset_ms": 200}
|
||||
assert r2["ok"] is True
|
||||
assert r2["offset_ms"] == 200
|
||||
@@ -198,260 +192,423 @@ def test_control_server_and_client_roundtrip(tmp_path: Path) -> None:
|
||||
asyncio.run(_run())
|
||||
|
||||
|
||||
def test_pipe_output_prints_fixed_window_for_status(capsys) -> None:
|
||||
output = PipeOutput(before=1, after=1)
|
||||
state = WatchState(
|
||||
track=None,
|
||||
lyrics=None,
|
||||
position_ms=0,
|
||||
offset_ms=0,
|
||||
status="fetching",
|
||||
# PipeOutput
|
||||
|
||||
|
||||
def _pipe_state(
|
||||
status: WatchStatus,
|
||||
lyrics: Optional[LRCData] = None,
|
||||
position_ms: int = 0,
|
||||
offset_ms: int = 0,
|
||||
track: Optional[TrackMeta] = None,
|
||||
) -> WatchState:
|
||||
return WatchState(
|
||||
track=track,
|
||||
lyrics=LyricView.from_lrc(lyrics) if lyrics else None,
|
||||
position_ms=position_ms,
|
||||
offset_ms=offset_ms,
|
||||
status=status,
|
||||
)
|
||||
|
||||
asyncio.run(output.on_state(state))
|
||||
|
||||
printed = capsys.readouterr().out
|
||||
assert printed == "\n[fetching...]\n\n"
|
||||
|
||||
|
||||
def test_pipe_output_uses_context_window_for_lyrics(capsys) -> None:
|
||||
output = PipeOutput(before=1, after=1)
|
||||
lyrics = LRCData("[00:01.00]a\n[00:02.00]b\n[00:03.00]c")
|
||||
state = WatchState(
|
||||
track=TrackMeta(title="Song"),
|
||||
lyrics=LyricView.from_lrc(lyrics),
|
||||
position_ms=2100,
|
||||
offset_ms=0,
|
||||
status="ok",
|
||||
def test_pipe_output_fetching_renders_status_window(capsys) -> None:
|
||||
asyncio.run(
|
||||
PipeOutput(before=1, after=1).on_state(_pipe_state(WatchStatus.FETCHING))
|
||||
)
|
||||
|
||||
asyncio.run(output.on_state(state))
|
||||
|
||||
printed = capsys.readouterr().out
|
||||
assert printed == "a\nb\nc\n"
|
||||
assert capsys.readouterr().out == "\n[fetching...]\n\n"
|
||||
|
||||
|
||||
def test_pipe_output_shows_upcoming_lines_before_first_timestamp(capsys) -> None:
|
||||
output = PipeOutput(before=1, after=1)
|
||||
lyrics = LRCData("[00:02.00]a\n[00:03.00]b")
|
||||
state = WatchState(
|
||||
track=TrackMeta(title="Song"),
|
||||
lyrics=LyricView.from_lrc(lyrics),
|
||||
position_ms=0,
|
||||
offset_ms=0,
|
||||
status="ok",
|
||||
def test_pipe_output_no_lyrics_renders_status_window(capsys) -> None:
|
||||
asyncio.run(
|
||||
PipeOutput(before=1, after=1).on_state(_pipe_state(WatchStatus.NO_LYRICS))
|
||||
)
|
||||
|
||||
asyncio.run(output.on_state(state))
|
||||
|
||||
printed = capsys.readouterr().out
|
||||
assert printed == "\n\na\n"
|
||||
assert capsys.readouterr().out == "\n[no lyrics]\n\n"
|
||||
|
||||
|
||||
def test_pipe_output_first_line_keeps_before_region_empty(capsys) -> None:
|
||||
output = PipeOutput(before=1, after=1)
|
||||
lyrics = LRCData("[00:01.00]a\n[00:02.00]b\n[00:03.00]c")
|
||||
state = WatchState(
|
||||
track=TrackMeta(title="Song"),
|
||||
lyrics=LyricView.from_lrc(lyrics),
|
||||
position_ms=1100,
|
||||
offset_ms=0,
|
||||
status="ok",
|
||||
def test_pipe_output_idle_renders_status_window(capsys) -> None:
|
||||
asyncio.run(PipeOutput(before=1, after=1).on_state(_pipe_state(WatchStatus.IDLE)))
|
||||
assert capsys.readouterr().out == "\n[idle]\n\n"
|
||||
|
||||
|
||||
def test_pipe_output_no_newline_mode(capsys) -> None:
|
||||
asyncio.run(
|
||||
PipeOutput(before=0, after=0, no_newline=True).on_state(
|
||||
_pipe_state(WatchStatus.FETCHING)
|
||||
)
|
||||
)
|
||||
|
||||
asyncio.run(output.on_state(state))
|
||||
|
||||
printed = capsys.readouterr().out
|
||||
assert printed == "\na\nb\n"
|
||||
assert capsys.readouterr().out == "[fetching...]"
|
||||
|
||||
|
||||
def test_pipe_output_last_line_keeps_after_region_empty(capsys) -> None:
|
||||
output = PipeOutput(before=1, after=1)
|
||||
lyrics = LRCData("[00:01.00]a\n[00:02.00]b\n[00:03.00]c")
|
||||
state = WatchState(
|
||||
track=TrackMeta(title="Song"),
|
||||
lyrics=LyricView.from_lrc(lyrics),
|
||||
position_ms=3100,
|
||||
offset_ms=0,
|
||||
status="ok",
|
||||
def test_pipe_output_default_window_shows_current_line(capsys) -> None:
|
||||
lrc = LRCData("[00:01.00]a\n[00:02.00]b\n[00:03.00]c")
|
||||
asyncio.run(
|
||||
PipeOutput().on_state(_pipe_state(WatchStatus.OK, lrc, position_ms=2100))
|
||||
)
|
||||
assert capsys.readouterr().out == "b\n"
|
||||
|
||||
asyncio.run(output.on_state(state))
|
||||
|
||||
printed = capsys.readouterr().out
|
||||
assert printed == "b\nc\n\n"
|
||||
def test_pipe_output_context_window(capsys) -> None:
|
||||
lrc = LRCData("[00:01.00]a\n[00:02.00]b\n[00:03.00]c")
|
||||
asyncio.run(
|
||||
PipeOutput(before=1, after=1).on_state(
|
||||
_pipe_state(WatchStatus.OK, lrc, position_ms=2100)
|
||||
)
|
||||
)
|
||||
assert capsys.readouterr().out == "a\nb\nc\n"
|
||||
|
||||
|
||||
def test_pipe_output_before_region_empty_at_first_line(capsys) -> None:
|
||||
lrc = LRCData("[00:01.00]a\n[00:02.00]b\n[00:03.00]c")
|
||||
asyncio.run(
|
||||
PipeOutput(before=1, after=1).on_state(
|
||||
_pipe_state(WatchStatus.OK, lrc, position_ms=1100)
|
||||
)
|
||||
)
|
||||
assert capsys.readouterr().out == "\na\nb\n"
|
||||
|
||||
|
||||
def test_pipe_output_after_region_empty_at_last_line(capsys) -> None:
|
||||
lrc = LRCData("[00:01.00]a\n[00:02.00]b\n[00:03.00]c")
|
||||
asyncio.run(
|
||||
PipeOutput(before=1, after=1).on_state(
|
||||
_pipe_state(WatchStatus.OK, lrc, position_ms=3100)
|
||||
)
|
||||
)
|
||||
assert capsys.readouterr().out == "b\nc\n\n"
|
||||
|
||||
|
||||
def test_pipe_output_upcoming_lines_before_first_timestamp(capsys) -> None:
|
||||
lrc = LRCData("[00:02.00]a\n[00:03.00]b")
|
||||
asyncio.run(
|
||||
PipeOutput(before=1, after=1).on_state(
|
||||
_pipe_state(WatchStatus.OK, lrc, position_ms=0)
|
||||
)
|
||||
)
|
||||
assert capsys.readouterr().out == "\n\na\n"
|
||||
|
||||
|
||||
def test_pipe_output_offset_ms_shifts_effective_position(capsys) -> None:
|
||||
lrc = LRCData("[00:01.00]a\n[00:02.00]b\n[00:03.00]c")
|
||||
asyncio.run(
|
||||
PipeOutput().on_state(
|
||||
_pipe_state(WatchStatus.OK, lrc, position_ms=1000, offset_ms=1500)
|
||||
)
|
||||
)
|
||||
# effective = 2500 ms → line b
|
||||
assert capsys.readouterr().out == "b\n"
|
||||
|
||||
|
||||
def test_pipe_output_repeated_text_uses_correct_timed_occurrence(capsys) -> None:
|
||||
output = PipeOutput(before=1, after=1)
|
||||
lyrics = LRCData("[00:01.00]A\n[00:02.00]X\n[00:03.00]B\n[00:04.00]X\n[00:05.00]C")
|
||||
state = WatchState(
|
||||
track=TrackMeta(title="Song"),
|
||||
lrc = LRCData("[00:01.00]A\n[00:02.00]X\n[00:03.00]B\n[00:04.00]X\n[00:05.00]C")
|
||||
asyncio.run(
|
||||
PipeOutput(before=1, after=1).on_state(
|
||||
_pipe_state(WatchStatus.OK, lrc, position_ms=4100)
|
||||
)
|
||||
)
|
||||
assert capsys.readouterr().out == "B\nX\nC\n"
|
||||
|
||||
|
||||
# PrintOutput
|
||||
|
||||
|
||||
def _ok_state(lyrics: LRCData, track: Optional[TrackMeta] = None) -> WatchState:
|
||||
return WatchState(
|
||||
track=track or TrackMeta(title="Song", artist="Artist"),
|
||||
lyrics=LyricView.from_lrc(lyrics),
|
||||
position_ms=4100,
|
||||
position_ms=0,
|
||||
offset_ms=0,
|
||||
status="ok",
|
||||
status=WatchStatus.OK,
|
||||
)
|
||||
|
||||
|
||||
def _status_state(status: WatchStatus, track: Optional[TrackMeta] = None) -> WatchState:
|
||||
return WatchState(
|
||||
track=track or TrackMeta(title="Song", artist="Artist"),
|
||||
lyrics=None,
|
||||
position_ms=0,
|
||||
offset_ms=0,
|
||||
status=status,
|
||||
)
|
||||
|
||||
|
||||
def test_print_output_emits_lrc_on_ok(capsys) -> None:
|
||||
asyncio.run(
|
||||
PrintOutput().on_state(_ok_state(LRCData("[00:01.00]Hello\n[00:02.00]World")))
|
||||
)
|
||||
assert capsys.readouterr().out.startswith("[00:01.00]")
|
||||
|
||||
|
||||
def test_print_output_plain_strips_tags(capsys) -> None:
|
||||
asyncio.run(
|
||||
PrintOutput(plain=True).on_state(
|
||||
_ok_state(LRCData("[00:01.00]Hello\n[00:02.00]World"))
|
||||
)
|
||||
)
|
||||
out = capsys.readouterr().out
|
||||
assert "[" not in out
|
||||
assert "Hello" in out
|
||||
|
||||
|
||||
def test_print_output_plain_with_unsynced_lyrics(capsys) -> None:
|
||||
asyncio.run(PrintOutput(plain=True).on_state(_ok_state(LRCData("Hello\nWorld"))))
|
||||
out = capsys.readouterr().out
|
||||
assert "Hello" in out
|
||||
assert "[" not in out
|
||||
|
||||
|
||||
def test_print_output_no_lyrics_emits_blank_line(capsys) -> None:
|
||||
asyncio.run(PrintOutput().on_state(_status_state(WatchStatus.NO_LYRICS)))
|
||||
assert capsys.readouterr().out == "\n"
|
||||
|
||||
|
||||
def test_print_output_fetching_emits_nothing(capsys) -> None:
|
||||
asyncio.run(PrintOutput().on_state(_status_state(WatchStatus.FETCHING)))
|
||||
assert capsys.readouterr().out == ""
|
||||
|
||||
|
||||
def test_print_output_idle_emits_nothing(capsys) -> None:
|
||||
asyncio.run(PrintOutput().on_state(_status_state(WatchStatus.IDLE)))
|
||||
assert capsys.readouterr().out == ""
|
||||
|
||||
|
||||
def test_print_output_is_stateless(capsys) -> None:
|
||||
"""View has no internal deduplication — emits on every call."""
|
||||
output = PrintOutput()
|
||||
state = _ok_state(LRCData("[00:01.00]Hello"))
|
||||
asyncio.run(output.on_state(state))
|
||||
|
||||
printed = capsys.readouterr().out
|
||||
assert printed == "B\nX\nC\n"
|
||||
asyncio.run(output.on_state(state))
|
||||
lines = [ln for ln in capsys.readouterr().out.splitlines() if ln]
|
||||
assert len(lines) == 2
|
||||
|
||||
|
||||
def test_session_fetches_on_resume_playing_without_lyrics() -> None:
|
||||
def test_print_output_position_sensitive_is_false() -> None:
|
||||
assert PrintOutput.position_sensitive is False
|
||||
|
||||
|
||||
# WatchCoordinator
|
||||
|
||||
|
||||
class _CaptureFetcher:
|
||||
def __init__(self) -> None:
|
||||
self.requested: list[str] = []
|
||||
|
||||
def request(self, track: TrackMeta) -> None:
|
||||
self.requested.append(track.display_name())
|
||||
|
||||
async def stop(self) -> None:
|
||||
pass
|
||||
|
||||
|
||||
def _make_coordinator(output: Optional[BaseOutput] = None) -> WatchCoordinator:
|
||||
class _Manager:
|
||||
def fetch_for_track(self, *_a, **_kw):
|
||||
return None
|
||||
|
||||
class _NullOutput(BaseOutput):
|
||||
async def on_state(self, state: WatchState) -> None:
|
||||
pass
|
||||
|
||||
session = WatchCoordinator(
|
||||
_Manager(), # type: ignore
|
||||
output or _NullOutput(),
|
||||
player_hint=None,
|
||||
config=TEST_CONFIG,
|
||||
)
|
||||
session._tracker = PositionTracker(
|
||||
lambda _bus: asyncio.sleep(0, result=0),
|
||||
TEST_CONFIG,
|
||||
)
|
||||
return session
|
||||
|
||||
|
||||
def _pstate(status: str = "Playing", title: str = "Song") -> PlayerState:
|
||||
return PlayerState(
|
||||
bus_name=BUS,
|
||||
status=status,
|
||||
track=TrackMeta(title=title, artist="Artist"),
|
||||
)
|
||||
|
||||
|
||||
def test_coordinator_fetches_on_initial_player() -> None:
|
||||
async def _run() -> None:
|
||||
class _Manager:
|
||||
def fetch_for_track(self, *_args, **_kwargs):
|
||||
return None
|
||||
session = _make_coordinator()
|
||||
fetcher = _CaptureFetcher()
|
||||
session._fetcher = fetcher # type: ignore[assignment]
|
||||
session._player_monitor.players = {BUS: _pstate("Playing")}
|
||||
session._on_player_change()
|
||||
await asyncio.sleep(0)
|
||||
assert fetcher.requested == ["Artist - Song"]
|
||||
assert session._model.status == WatchStatus.FETCHING
|
||||
|
||||
class _Output(BaseOutput):
|
||||
async def on_state(self, state: WatchState) -> None:
|
||||
return None
|
||||
asyncio.run(_run())
|
||||
|
||||
class _Fetcher(LyricFetcher):
|
||||
def __init__(self):
|
||||
async def _fetch(_track: TrackMeta):
|
||||
return None
|
||||
|
||||
async def _on_fetching() -> None:
|
||||
return None
|
||||
def test_coordinator_fetches_while_paused() -> None:
|
||||
"""Fetch starts immediately even when player is paused — no wait for resume."""
|
||||
|
||||
async def _on_result(_lyrics) -> None:
|
||||
return None
|
||||
async def _run() -> None:
|
||||
session = _make_coordinator()
|
||||
fetcher = _CaptureFetcher()
|
||||
session._fetcher = fetcher # type: ignore[assignment]
|
||||
session._player_monitor.players = {BUS: _pstate("Paused")}
|
||||
session._on_player_change()
|
||||
await asyncio.sleep(0)
|
||||
assert fetcher.requested == ["Artist - Song"]
|
||||
|
||||
super().__init__(
|
||||
_fetch, _on_fetching, _on_result, TEST_CONFIG.watch.debounce_ms
|
||||
)
|
||||
self.requested = []
|
||||
asyncio.run(_run())
|
||||
|
||||
def request(self, track: TrackMeta) -> None:
|
||||
self.requested.append(track.display_name())
|
||||
|
||||
session = WatchCoordinator(
|
||||
_Manager(), # type: ignore
|
||||
_Output(),
|
||||
player_hint=None,
|
||||
config=TEST_CONFIG,
|
||||
)
|
||||
fake_fetcher = _Fetcher()
|
||||
session._fetcher = fake_fetcher
|
||||
session._tracker = PositionTracker(
|
||||
lambda _bus: asyncio.sleep(0, result=0),
|
||||
TEST_CONFIG,
|
||||
)
|
||||
def test_coordinator_fetches_on_track_change() -> None:
|
||||
async def _run() -> None:
|
||||
session = _make_coordinator()
|
||||
session._model.active_player = BUS
|
||||
session._model.active_track_key = "Artist - Old Song"
|
||||
session._model.set_lyrics(LRCData("[00:01.00]old"))
|
||||
session._model.status = WatchStatus.OK
|
||||
|
||||
bus_name = "org.mpris.MediaPlayer2.spotify"
|
||||
track = TrackMeta(title="Song", artist="Artist")
|
||||
session._model.active_player = bus_name
|
||||
session._player_monitor.players = {
|
||||
bus_name: PlayerState(bus_name=bus_name, status="Playing", track=track)
|
||||
}
|
||||
session._model.set_lyrics(None)
|
||||
session._model.status = "paused"
|
||||
|
||||
session._on_playback_status(bus_name, "Playing")
|
||||
fetcher = _CaptureFetcher()
|
||||
session._fetcher = fetcher # type: ignore[assignment]
|
||||
session._player_monitor.players = {BUS: _pstate("Playing", title="New Song")}
|
||||
session._on_player_change()
|
||||
await asyncio.sleep(0)
|
||||
|
||||
assert fake_fetcher.requested == ["Artist - Song"]
|
||||
assert session._model.status == "fetching"
|
||||
assert fetcher.requested == ["Artist - New Song"]
|
||||
assert session._model.lyrics is None
|
||||
|
||||
asyncio.run(_run())
|
||||
|
||||
|
||||
def test_session_emit_state_only_when_lyric_cursor_changes() -> None:
|
||||
def test_coordinator_no_refetch_on_calibration_no_lyrics() -> None:
|
||||
"""Calibration with same player/track and no_lyrics must NOT trigger a second fetch."""
|
||||
|
||||
async def _run() -> None:
|
||||
class _Manager:
|
||||
def fetch_for_track(self, *_args, **_kwargs):
|
||||
return None
|
||||
session = _make_coordinator()
|
||||
fetcher = _CaptureFetcher()
|
||||
session._fetcher = fetcher # type: ignore[assignment]
|
||||
session._player_monitor.players = {BUS: _pstate("Playing")}
|
||||
session._on_player_change()
|
||||
await asyncio.sleep(0)
|
||||
assert len(fetcher.requested) == 1
|
||||
|
||||
class _Output(BaseOutput):
|
||||
def __init__(self):
|
||||
self.count = 0
|
||||
session._model.status = WatchStatus.NO_LYRICS
|
||||
session._on_player_change()
|
||||
await asyncio.sleep(0)
|
||||
assert len(fetcher.requested) == 1
|
||||
|
||||
async def on_state(self, state: WatchState) -> None:
|
||||
self.count += 1
|
||||
asyncio.run(_run())
|
||||
|
||||
output = _Output()
|
||||
session = WatchCoordinator(
|
||||
_Manager(), # type: ignore
|
||||
output,
|
||||
player_hint=None,
|
||||
config=TEST_CONFIG,
|
||||
)
|
||||
session._tracker = PositionTracker(
|
||||
lambda _bus: asyncio.sleep(0, result=0),
|
||||
TEST_CONFIG,
|
||||
)
|
||||
|
||||
bus_name = "org.mpris.MediaPlayer2.spotify"
|
||||
track = TrackMeta(title="Song", artist="Artist")
|
||||
session._model.active_player = bus_name
|
||||
def test_coordinator_no_fetch_when_lyrics_present() -> None:
|
||||
async def _run() -> None:
|
||||
session = _make_coordinator()
|
||||
session._model.active_player = BUS
|
||||
session._model.active_track_key = "Artist - Song"
|
||||
session._model.set_lyrics(LRCData("[00:01.00]line"))
|
||||
session._model.status = WatchStatus.OK
|
||||
|
||||
fetcher = _CaptureFetcher()
|
||||
session._fetcher = fetcher # type: ignore[assignment]
|
||||
session._player_monitor.players = {BUS: _pstate("Playing")}
|
||||
session._on_player_change()
|
||||
await asyncio.sleep(0)
|
||||
|
||||
assert fetcher.requested == []
|
||||
assert session._model.status == WatchStatus.OK
|
||||
|
||||
asyncio.run(_run())
|
||||
|
||||
|
||||
def test_coordinator_player_disappears_goes_idle() -> None:
|
||||
async def _run() -> None:
|
||||
session = _make_coordinator()
|
||||
session._model.active_player = BUS
|
||||
session._model.active_track_key = "Artist - Song"
|
||||
session._model.set_lyrics(LRCData("[00:01.00]line"))
|
||||
session._model.status = WatchStatus.OK
|
||||
|
||||
session._player_monitor.players = {}
|
||||
session._on_player_change()
|
||||
await asyncio.sleep(0)
|
||||
|
||||
assert session._model.status == WatchStatus.IDLE
|
||||
assert session._model.lyrics is None
|
||||
assert session._model.active_player is None
|
||||
|
||||
asyncio.run(_run())
|
||||
|
||||
|
||||
def test_coordinator_no_fetch_when_track_is_none() -> None:
|
||||
"""Player present but reports no track metadata → no fetch, status NO_LYRICS."""
|
||||
|
||||
async def _run() -> None:
|
||||
session = _make_coordinator()
|
||||
fetcher = _CaptureFetcher()
|
||||
session._fetcher = fetcher # type: ignore[assignment]
|
||||
session._player_monitor.players = {
|
||||
bus_name: PlayerState(bus_name=bus_name, status="Playing", track=track)
|
||||
BUS: PlayerState(bus_name=BUS, status="Playing", track=None)
|
||||
}
|
||||
session._on_player_change()
|
||||
await asyncio.sleep(0)
|
||||
|
||||
assert fetcher.requested == []
|
||||
assert session._model.status == WatchStatus.NO_LYRICS
|
||||
|
||||
asyncio.run(_run())
|
||||
|
||||
|
||||
def test_coordinator_emit_deduplicates_on_same_cursor() -> None:
|
||||
async def _run() -> None:
|
||||
counts = [0]
|
||||
|
||||
class _CountOutput(BaseOutput):
|
||||
async def on_state(self, state: WatchState) -> None:
|
||||
counts[0] += 1
|
||||
|
||||
session = _make_coordinator(_CountOutput())
|
||||
track = TrackMeta(title="Song", artist="Artist")
|
||||
session._model.active_player = BUS
|
||||
session._player_monitor.players = {
|
||||
BUS: PlayerState(bus_name=BUS, status="Playing", track=track)
|
||||
}
|
||||
session._model.set_lyrics(LRCData("[00:01.00]a\n[00:03.00]b"))
|
||||
session._model.status = "ok"
|
||||
await session._tracker.set_active_player(
|
||||
bus_name,
|
||||
"Playing",
|
||||
"Artist - Song",
|
||||
)
|
||||
session._model.status = WatchStatus.OK
|
||||
await session._tracker.set_active_player(BUS, "Playing", "Artist - Song")
|
||||
|
||||
await session._emit_state()
|
||||
await session._emit_state()
|
||||
await session._emit_state() # emits
|
||||
await session._emit_state() # same cursor → no emit
|
||||
assert counts[0] == 1
|
||||
|
||||
await session._tracker.on_seeked(bus_name, 3200)
|
||||
await session._emit_state()
|
||||
|
||||
assert output.count == 2
|
||||
await session._tracker.on_seeked(BUS, 3200)
|
||||
await session._emit_state() # cursor advanced → emits
|
||||
assert counts[0] == 2
|
||||
|
||||
asyncio.run(_run())
|
||||
|
||||
|
||||
def test_session_emits_when_crossing_first_timestamp() -> None:
|
||||
def test_coordinator_position_insensitive_output_ignores_seeks() -> None:
|
||||
"""With position_sensitive=False, seek events do not trigger re-emit."""
|
||||
|
||||
async def _run() -> None:
|
||||
class _Manager:
|
||||
def fetch_for_track(self, *_args, **_kwargs):
|
||||
return None
|
||||
|
||||
class _Output(BaseOutput):
|
||||
def __init__(self):
|
||||
self.count = 0
|
||||
counts = [0]
|
||||
|
||||
class _CountPrint(PrintOutput):
|
||||
async def on_state(self, state: WatchState) -> None:
|
||||
self.count += 1
|
||||
counts[0] += 1
|
||||
|
||||
output = _Output()
|
||||
session = WatchCoordinator(
|
||||
_Manager(), # type: ignore
|
||||
output,
|
||||
player_hint=None,
|
||||
config=TEST_CONFIG,
|
||||
)
|
||||
session._tracker = PositionTracker(
|
||||
lambda _bus: asyncio.sleep(0, result=0),
|
||||
TEST_CONFIG,
|
||||
)
|
||||
|
||||
bus_name = "org.mpris.MediaPlayer2.spotify"
|
||||
session = _make_coordinator(_CountPrint())
|
||||
track = TrackMeta(title="Song", artist="Artist")
|
||||
session._model.active_player = bus_name
|
||||
session._model.active_player = BUS
|
||||
session._player_monitor.players = {
|
||||
bus_name: PlayerState(bus_name=bus_name, status="Playing", track=track)
|
||||
BUS: PlayerState(bus_name=BUS, status="Playing", track=track)
|
||||
}
|
||||
session._model.set_lyrics(LRCData("[00:02.00]a\n[00:03.00]b"))
|
||||
session._model.status = "ok"
|
||||
await session._tracker.set_active_player(
|
||||
bus_name,
|
||||
"Playing",
|
||||
"Artist - Song",
|
||||
)
|
||||
session._model.set_lyrics(LRCData("[00:01.00]a\n[00:03.00]b"))
|
||||
session._model.status = WatchStatus.OK
|
||||
|
||||
await session._emit_state()
|
||||
await session._tracker.on_seeked(bus_name, 2500)
|
||||
await session._emit_state()
|
||||
await session._emit_state() # emits once
|
||||
assert counts[0] == 1
|
||||
|
||||
assert output.count == 2
|
||||
await session._tracker.on_seeked(BUS, 3200)
|
||||
await session._emit_state() # position fixed at 0 → same signature → no re-emit
|
||||
assert counts[0] == 1
|
||||
|
||||
asyncio.run(_run())
|
||||
|
||||
Reference in New Issue
Block a user