Compare commits

..

2 Commits

Author SHA1 Message Date
Uyanide d2a3e64b89 feat: config file
refactor: as the config module changed
test: add test for config
test: add test for local fetcher and local enrichers
test: add test for manual insertion
fix: some random bugs left by the last commit
2026-04-09 22:42:41 +02:00
Uyanide e6b8583868 feat: add watch command and pipe view 2026-04-09 22:42:41 +02:00
37 changed files with 2574 additions and 263 deletions
+52 -31
View File
@@ -21,7 +21,7 @@ highest-confidence result wins.
1. **Local** — sidecar `.lrc` files or embedded audio metadata (FLAC, MP3) 1. **Local** — sidecar `.lrc` files or embedded audio metadata (FLAC, MP3)
2. **Cache Search** — fuzzy cross-album lookup in local cache 2. **Cache Search** — fuzzy cross-album lookup in local cache
3. **Spotify** — synced lyrics via Spotify's API 3. **Spotify** — synced lyrics via Spotify's API
(requires `SPOTIFY_SP_DC` and Spotify trackid) (requires `credentials.spotify_sp_dc` and Spotify trackid)
4. **LRCLIB** — exact match from [lrclib.net](https://lrclib.net) 4. **LRCLIB** — exact match from [lrclib.net](https://lrclib.net)
(requires full metadata) (requires full metadata)
5. **Musixmatch (Spotify)** — Musixmatch API with Spotify trackid 5. **Musixmatch (Spotify)** — Musixmatch API with Spotify trackid
@@ -30,7 +30,7 @@ highest-confidence result wins.
7. **Musixmatch** — Musixmatch API with metadata search (requires at least a title) 7. **Musixmatch** — Musixmatch API with metadata search (requires at least a title)
8. **Netease** — Netease Cloud Music public API 8. **Netease** — Netease Cloud Music public API
9. **QQ Music** — QQ Music via self-hosted API proxy 9. **QQ Music** — QQ Music via self-hosted API proxy
(requires `QQ_MUSIC_API_URL` that provides the same interface as [tooplick/qq-music-api](https://github.com/tooplick/qq-music-api)) (requires `credentials.qq_music_api_url`; compatible with [tooplick/qq-music-api](https://github.com/tooplick/qq-music-api))
> I'm aware that Spotify's lyrics are provided by Musixmatch, but the fact is > I'm aware that Spotify's lyrics are provided by Musixmatch, but the fact is
> that Musixmatch's own search will yield different (and more) results than > that Musixmatch's own search will yield different (and more) results than
@@ -46,7 +46,7 @@ See `lrx --help` for full command reference. Common use cases:
lrx fetch lrx fetch
``` ```
targeting a specific player and a source to fetch from: targeting a specific player and source:
```bash ```bash
lrx fetch --player mpd --method lrclib-search lrx fetch --player mpd --method lrclib-search
@@ -73,6 +73,21 @@ See `lrx --help` for full command reference. Common use cases:
lrx export --output /path/to/lyrics.lrc lrx export --output /path/to/lyrics.lrc
``` ```
- Watch active player and stream lyrics continuously to stdout:
```bash
lrx watch pipe
lrx watch pipe --before 1 --after 2 # show context lines
```
Control a running watch session:
```bash
lrx watch ctl status # print session status as JSON
lrx watch ctl offset +200 # shift lyrics forward 200 ms
lrx watch ctl offset -150
```
- Cache management: - Cache management:
```bash ```bash
@@ -83,40 +98,46 @@ See `lrx --help` for full command reference. Common use cases:
lrx cache confidence spotify 100 # manually set confidence for a source lrx cache confidence spotify 100 # manually set confidence for a source
``` ```
## Configuration
Set credentials via environment variable or `.env` file:
- `~/.config/lrx/.env` — user-level
- `.env` in working directory — project-local
- Shell environment — highest priority
```env
SPOTIFY_SP_DC=your_cookie_value
MUSIXMATCH_USERTOKEN=your_musixmatch_usertoken
QQ_MUSIC_API_URL=https://api.example.com
PREFERRED_PLAYER=spotify
```
- `SPOTIFY_SP_DC` — required for Spotify source. Defaults to empty
(disabled Spotify source).
- `MUSIXMATCH_USERTOKEN` — optional for Musixmatch sources
([Curators Settings Page](https://curators.musixmatch.com/settings)
-> Login (if required)
-> "Copy debug info").
If not set, an anonymous token will be fetched at runtime.
- `QQ_MUSIC_API_URL` — required for QQ Music source. Defaults to empty
(disabled QQ Music source).
- `PREFERRED_PLAYER` — preferred MPRIS player when multiple are active.
Defaults to `spotify`. Only used when no `--player` flag is given and more
than one player (or none of them) is currently playing.
Shell completion (zsh/fish/bash): Shell completion (zsh/fish/bash):
```bash ```bash
lrx --install-completion lrx --install-completion
``` ```
## Configuration
Configuration is read from `~/.config/lrx-cli/config.toml`. The file is
optional; all values have defaults. Unknown keys are rejected with an error.
```toml
[general]
preferred_player = "spotify" # preferred MPRIS player when multiple are active
player_blacklist = ["firefox", "zen", "chrome", "chromium", "vivaldi", "edge", "opera", "mpv"]
http_timeout = 10.0 # seconds
[credentials]
spotify_sp_dc = "" # required for Spotify source
musixmatch_usertoken = "" # optional; anonymous token fetched if empty
qq_music_api_url = "" # required for QQ Music source
[watch]
debounce_ms = 400 # ms to wait after a track change before fetching
calibration_interval_s = 3.0 # seconds between full MPRIS position recalibrations
position_tick_ms = 50 # ms between local position ticks
socket_path = "" # Unix socket path; defaults to <cache_dir>/watch.sock
```
**Credentials:**
- `spotify_sp_dc` — `SP_DC` cookie from a logged-in Spotify web session. Required
for the Spotify source; leave empty to disable it.
- `musixmatch_usertoken` — found at
[Curators Settings Page](https://curators.musixmatch.com/settings) → Login → "Copy debug info".
If empty, an anonymous token is fetched at runtime.
- `qq_music_api_url` — base URL of a self-hosted
[qq-music-api](https://github.com/tooplick/qq-music-api) (compatible) instance. Required
for the QQ Music source; leave empty to disable it.
## Development ## Development
Clone this repository: Clone this repository:
+2 -3
View File
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project] [project]
name = "lrx-cli" name = "lrx-cli"
version = "0.6.4" version = "0.6.5"
description = "Fetch line-synced lyrics for your music player." description = "Fetch line-synced lyrics for your music player."
readme = "README.md" readme = "README.md"
requires-python = ">=3.13" requires-python = ">=3.13"
@@ -14,8 +14,7 @@ dependencies = [
"httpx>=0.28.1", "httpx>=0.28.1",
"loguru>=0.7.3", "loguru>=0.7.3",
"mutagen>=1.47.0", "mutagen>=1.47.0",
"platformdirs>=4.9.4", "platformdirs>=4.9.6",
"python-dotenv>=1.2.2",
] ]
[project.scripts] [project.scripts]
+9 -6
View File
@@ -10,6 +10,7 @@ from .base import BaseAuthenticator
from .spotify import SpotifyAuthenticator from .spotify import SpotifyAuthenticator
from .musixmatch import MusixmatchAuthenticator from .musixmatch import MusixmatchAuthenticator
from .dummy import DummyAuthenticator from .dummy import DummyAuthenticator
from ..config import AppConfig
__all__ = [ __all__ = [
"BaseAuthenticator", "BaseAuthenticator",
@@ -20,11 +21,13 @@ __all__ = [
] ]
def create_authenticators(cache) -> dict[str, BaseAuthenticator]: def create_authenticators(cache, config: AppConfig) -> dict[str, BaseAuthenticator]:
"""Factory function to create authenticators with cache access.""" """Factory function to create authenticators with injected config."""
return { return {
"dummy": DummyAuthenticator(), "dummy": DummyAuthenticator(cache, config.credentials, config.general),
"spotify": SpotifyAuthenticator(cache), "spotify": SpotifyAuthenticator(cache, config.credentials, config.general),
"musixmatch": MusixmatchAuthenticator(cache), "musixmatch": MusixmatchAuthenticator(
"qqmusic": QQMusicAuthenticator(), cache, config.credentials, config.general
),
"qqmusic": QQMusicAuthenticator(cache, config.credentials, config.general),
} }
+10
View File
@@ -7,10 +7,20 @@ Description: Base class for credential authenticators.
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Optional from typing import Optional
from ..cache import CacheEngine
from ..config import CredentialConfig, GeneralConfig
class BaseAuthenticator(ABC): class BaseAuthenticator(ABC):
"""Manages obtaining, caching, and refreshing a credential for one provider.""" """Manages obtaining, caching, and refreshing a credential for one provider."""
def __init__(
self, cache: CacheEngine, credentials: CredentialConfig, general: GeneralConfig
) -> None:
self._cache = cache
self._credentials = credentials
self._general = general
@property @property
@abstractmethod @abstractmethod
def name(self) -> str: ... def name(self) -> str: ...
+9 -7
View File
@@ -12,7 +12,7 @@ from loguru import logger
from .base import BaseAuthenticator from .base import BaseAuthenticator
from ..cache import CacheEngine from ..cache import CacheEngine
from ..config import HTTP_TIMEOUT, MUSIXMATCH_COOLDOWN_MS, credentials from ..config import CredentialConfig, GeneralConfig, MUSIXMATCH_COOLDOWN_MS
_MUSIXMATCH_TOKEN_URL = "https://apic-desktop.musixmatch.com/ws/1.1/token.get" _MUSIXMATCH_TOKEN_URL = "https://apic-desktop.musixmatch.com/ws/1.1/token.get"
@@ -24,8 +24,10 @@ _MXM_BASE_PARAMS = {
class MusixmatchAuthenticator(BaseAuthenticator): class MusixmatchAuthenticator(BaseAuthenticator):
def __init__(self, cache: CacheEngine) -> None: def __init__(
self._cache = cache self, cache: CacheEngine, credentials: CredentialConfig, general: GeneralConfig
) -> None:
super().__init__(cache, credentials, general)
self._cached_token: Optional[str] = None self._cached_token: Optional[str] = None
self._cooldown_until_ms: int = 0 self._cooldown_until_ms: int = 0
@@ -77,7 +79,7 @@ class MusixmatchAuthenticator(BaseAuthenticator):
logger.debug("Musixmatch: fetching anonymous token") logger.debug("Musixmatch: fetching anonymous token")
try: try:
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client: async with httpx.AsyncClient(timeout=self._general.http_timeout) as client:
resp = await client.get(url, headers=_MXM_HEADERS) resp = await client.get(url, headers=_MXM_HEADERS)
resp.raise_for_status() resp.raise_for_status()
data = resp.json() data = resp.json()
@@ -102,8 +104,8 @@ class MusixmatchAuthenticator(BaseAuthenticator):
async def _get_token(self) -> Optional[str]: async def _get_token(self) -> Optional[str]:
"""Return a valid token: env var > memory > DB > fresh fetch.""" """Return a valid token: env var > memory > DB > fresh fetch."""
if credentials.MUSIXMATCH_USERTOKEN: if self._credentials.musixmatch_usertoken:
return credentials.MUSIXMATCH_USERTOKEN return self._credentials.musixmatch_usertoken
if self._cached_token: if self._cached_token:
return self._cached_token return self._cached_token
@@ -139,7 +141,7 @@ class MusixmatchAuthenticator(BaseAuthenticator):
self._set_cooldown() self._set_cooldown()
return None return None
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client: async with httpx.AsyncClient(timeout=self._general.http_timeout) as client:
url = f"{url_base}?{urlencode({**_MXM_BASE_PARAMS, **params, 'usertoken': token})}" url = f"{url_base}?{urlencode({**_MXM_BASE_PARAMS, **params, 'usertoken': token})}"
resp = await client.get(url, headers=_MXM_HEADERS) resp = await client.get(url, headers=_MXM_HEADERS)
+8 -5
View File
@@ -7,19 +7,22 @@ Description: QQ Music API authenticator - currently only a proxy.
from typing import Optional from typing import Optional
from .base import BaseAuthenticator from .base import BaseAuthenticator
from ..config import credentials from ..cache import CacheEngine
from ..config import CredentialConfig, GeneralConfig
class QQMusicAuthenticator(BaseAuthenticator): class QQMusicAuthenticator(BaseAuthenticator):
def __init__(self) -> None: def __init__(
pass self, cache: CacheEngine, credentials: CredentialConfig, general: GeneralConfig
) -> None:
super().__init__(cache, credentials, general)
@property @property
def name(self) -> str: def name(self) -> str:
return "qqmusic" return "qqmusic"
def is_configured(self) -> bool: def is_configured(self) -> bool:
return bool(credentials.QQ_MUSIC_API_URL) return bool(self._credentials.qq_music_api_url)
async def authenticate(self) -> Optional[str]: async def authenticate(self) -> Optional[str]:
return credentials.QQ_MUSIC_API_URL return self._credentials.qq_music_api_url.rstrip("/") or None
+18 -10
View File
@@ -14,7 +14,7 @@ from loguru import logger
from .base import BaseAuthenticator from .base import BaseAuthenticator
from ..cache import CacheEngine from ..cache import CacheEngine
from ..config import HTTP_TIMEOUT, UA_BROWSER, credentials from ..config import CredentialConfig, GeneralConfig, UA_BROWSER
_SPOTIFY_TOKEN_URL = "https://open.spotify.com/api/token" _SPOTIFY_TOKEN_URL = "https://open.spotify.com/api/token"
_SPOTIFY_SERVER_TIME_URL = "https://open.spotify.com/api/server-time" _SPOTIFY_SERVER_TIME_URL = "https://open.spotify.com/api/server-time"
@@ -32,8 +32,10 @@ SPOTIFY_BASE_HEADERS = {
class SpotifyAuthenticator(BaseAuthenticator): class SpotifyAuthenticator(BaseAuthenticator):
def __init__(self, cache: CacheEngine) -> None: def __init__(
self._cache = cache self, cache: CacheEngine, credentials: CredentialConfig, general: GeneralConfig
) -> None:
super().__init__(cache, credentials, general)
self._cached_secret: Optional[Tuple[str, int]] = None self._cached_secret: Optional[Tuple[str, int]] = None
self._cached_token: Optional[str] = None self._cached_token: Optional[str] = None
self._token_expires_at: float = 0.0 self._token_expires_at: float = 0.0
@@ -43,7 +45,7 @@ class SpotifyAuthenticator(BaseAuthenticator):
return "spotify" return "spotify"
def is_configured(self) -> bool: def is_configured(self) -> bool:
return bool(credentials.SPOTIFY_SP_DC) return bool(self._credentials.spotify_sp_dc)
@staticmethod @staticmethod
def _generate_totp(server_time_s: int, secret: str) -> str: def _generate_totp(server_time_s: int, secret: str) -> str:
@@ -82,7 +84,9 @@ class SpotifyAuthenticator(BaseAuthenticator):
async def _get_server_time(self, client: httpx.AsyncClient) -> Optional[int]: async def _get_server_time(self, client: httpx.AsyncClient) -> Optional[int]:
try: try:
res = await client.get(_SPOTIFY_SERVER_TIME_URL, timeout=HTTP_TIMEOUT) res = await client.get(
_SPOTIFY_SERVER_TIME_URL, timeout=self._general.http_timeout
)
res.raise_for_status() res.raise_for_status()
data = res.json() data = res.json()
if not isinstance(data, dict) or "serverTime" not in data: if not isinstance(data, dict) or "serverTime" not in data:
@@ -100,7 +104,9 @@ class SpotifyAuthenticator(BaseAuthenticator):
logger.debug("Spotify: using cached TOTP secret") logger.debug("Spotify: using cached TOTP secret")
return self._cached_secret return self._cached_secret
try: try:
res = await client.get(_SPOTIFY_SECRET_URL, timeout=HTTP_TIMEOUT) res = await client.get(
_SPOTIFY_SECRET_URL, timeout=self._general.http_timeout
)
res.raise_for_status() res.raise_for_status()
data = res.json() data = res.json()
if not isinstance(data, list) or len(data) == 0: if not isinstance(data, list) or len(data) == 0:
@@ -133,13 +139,13 @@ class SpotifyAuthenticator(BaseAuthenticator):
if db_token and time.time() < self._token_expires_at - 30: if db_token and time.time() < self._token_expires_at - 30:
return db_token return db_token
if not credentials.SPOTIFY_SP_DC: if not self._credentials.spotify_sp_dc:
logger.error("Spotify: SPOTIFY_SP_DC env var not set — cannot authenticate") logger.error("Spotify: spotify_sp_dc not configured — cannot authenticate")
return None return None
headers = { headers = {
"Accept": "*/*", "Accept": "*/*",
"Cookie": f"sp_dc={credentials.SPOTIFY_SP_DC}", "Cookie": f"sp_dc={self._credentials.spotify_sp_dc}",
**SPOTIFY_BASE_HEADERS, **SPOTIFY_BASE_HEADERS,
} }
@@ -166,7 +172,9 @@ class SpotifyAuthenticator(BaseAuthenticator):
try: try:
res = await client.get( res = await client.get(
_SPOTIFY_TOKEN_URL, params=params, timeout=HTTP_TIMEOUT _SPOTIFY_TOKEN_URL,
params=params,
timeout=self._general.http_timeout,
) )
if res.status_code != 200: if res.status_code != 200:
logger.error(f"Spotify: token request returned {res.status_code}") logger.error(f"Spotify: token request returned {res.status_code}")
+20 -14
View File
@@ -85,9 +85,15 @@ class CacheEngine:
self.db_path = db_path self.db_path = db_path
self._init_db() self._init_db()
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(self.db_path)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=5000")
return conn
def _init_db(self) -> None: def _init_db(self) -> None:
"""Create cache tables and run one-time slot/cache migrations.""" """Create cache tables and run one-time slot/cache migrations."""
with sqlite3.connect(self.db_path) as conn: with self._connect() as conn:
conn.execute(""" conn.execute("""
CREATE TABLE IF NOT EXISTS credentials ( CREATE TABLE IF NOT EXISTS credentials (
name TEXT PRIMARY KEY, name TEXT PRIMARY KEY,
@@ -256,7 +262,7 @@ class CacheEngine:
return [] return []
now = int(time.time()) now = int(time.time())
with sqlite3.connect(self.db_path) as conn: with self._connect() as conn:
conn.execute( conn.execute(
"DELETE FROM cache WHERE key = ? AND expires_at IS NOT NULL AND expires_at < ?", "DELETE FROM cache WHERE key = ? AND expires_at IS NOT NULL AND expires_at < ?",
(key, now), (key, now),
@@ -353,7 +359,7 @@ class CacheEngine:
# Convenience for callers that still pass a single negative result. # Convenience for callers that still pass a single negative result.
kinds = [SLOT_SYNCED, SLOT_UNSYNCED] kinds = [SLOT_SYNCED, SLOT_UNSYNCED]
with sqlite3.connect(self.db_path) as conn: with self._connect() as conn:
for kind in kinds: for kind in kinds:
conn.execute( conn.execute(
"""INSERT OR REPLACE INTO cache """INSERT OR REPLACE INTO cache
@@ -386,7 +392,7 @@ class CacheEngine:
def clear_all(self) -> None: def clear_all(self) -> None:
"""Remove every entry from the cache.""" """Remove every entry from the cache."""
with sqlite3.connect(self.db_path) as conn: with self._connect() as conn:
conn.execute("DELETE FROM cache") conn.execute("DELETE FROM cache")
conn.commit() conn.commit()
logger.info("Cache cleared.") logger.info("Cache cleared.")
@@ -396,7 +402,7 @@ class CacheEngine:
if not self._track_has_meta(track): if not self._track_has_meta(track):
logger.info(f"No cache entries found for {track.display_name()}.") logger.info(f"No cache entries found for {track.display_name()}.")
return return
with sqlite3.connect(self.db_path) as conn: with self._connect() as conn:
cur = conn.execute( cur = conn.execute(
f"DELETE FROM cache WHERE {_TRACK_WHERE}", f"DELETE FROM cache WHERE {_TRACK_WHERE}",
_track_where_params(track), _track_where_params(track),
@@ -411,7 +417,7 @@ class CacheEngine:
def prune(self) -> int: def prune(self) -> int:
"""Remove all expired entries. Returns the number of rows deleted.""" """Remove all expired entries. Returns the number of rows deleted."""
with sqlite3.connect(self.db_path) as conn: with self._connect() as conn:
cur = conn.execute( cur = conn.execute(
"DELETE FROM cache WHERE expires_at IS NOT NULL AND expires_at < ?", "DELETE FROM cache WHERE expires_at IS NOT NULL AND expires_at < ?",
(int(time.time()),), (int(time.time()),),
@@ -439,7 +445,7 @@ class CacheEngine:
return None return None
now = int(time.time()) now = int(time.time())
with sqlite3.connect(self.db_path) as conn: with self._connect() as conn:
conn.row_factory = sqlite3.Row conn.row_factory = sqlite3.Row
rows = conn.execute( rows = conn.execute(
f"SELECT status, lyrics, source, confidence FROM cache" f"SELECT status, lyrics, source, confidence FROM cache"
@@ -495,7 +501,7 @@ class CacheEngine:
return [] return []
now = int(time.time()) now = int(time.time())
with sqlite3.connect(self.db_path) as conn: with self._connect() as conn:
conn.row_factory = sqlite3.Row conn.row_factory = sqlite3.Row
rows = conn.execute( rows = conn.execute(
"""SELECT * FROM cache """SELECT * FROM cache
@@ -557,7 +563,7 @@ class CacheEngine:
""" """
if not self._track_has_meta(track): if not self._track_has_meta(track):
return 0 return 0
with sqlite3.connect(self.db_path) as conn: with self._connect() as conn:
cur = conn.execute( cur = conn.execute(
f"UPDATE cache SET confidence = ? WHERE {_TRACK_WHERE} AND source = ?", f"UPDATE cache SET confidence = ? WHERE {_TRACK_WHERE} AND source = ?",
[confidence] + _track_where_params(track) + [source], [confidence] + _track_where_params(track) + [source],
@@ -571,7 +577,7 @@ class CacheEngine:
"""Return all cached rows for a given track (across all sources).""" """Return all cached rows for a given track (across all sources)."""
if not self._track_has_meta(track): if not self._track_has_meta(track):
return [] return []
with sqlite3.connect(self.db_path) as conn: with self._connect() as conn:
conn.row_factory = sqlite3.Row conn.row_factory = sqlite3.Row
return [ return [
dict(r) dict(r)
@@ -586,7 +592,7 @@ class CacheEngine:
def get_credential(self, name: str) -> Optional[dict]: def get_credential(self, name: str) -> Optional[dict]:
"""Return cached credential data if present and not expired.""" """Return cached credential data if present and not expired."""
now_ms = int(time.time() * 1000) now_ms = int(time.time() * 1000)
with sqlite3.connect(self.db_path) as conn: with self._connect() as conn:
conn.row_factory = sqlite3.Row conn.row_factory = sqlite3.Row
row = conn.execute( row = conn.execute(
"SELECT data FROM credentials WHERE name = ? AND (expires_at IS NULL OR expires_at > ?)", "SELECT data FROM credentials WHERE name = ? AND (expires_at IS NULL OR expires_at > ?)",
@@ -603,7 +609,7 @@ class CacheEngine:
self, name: str, data: dict, expires_at_ms: Optional[int] = None self, name: str, data: dict, expires_at_ms: Optional[int] = None
) -> None: ) -> None:
"""Persist credential data, optionally with an expiry timestamp (Unix ms).""" """Persist credential data, optionally with an expiry timestamp (Unix ms)."""
with sqlite3.connect(self.db_path) as conn: with self._connect() as conn:
conn.execute( conn.execute(
"INSERT OR REPLACE INTO credentials (name, data, expires_at) VALUES (?, ?, ?)", "INSERT OR REPLACE INTO credentials (name, data, expires_at) VALUES (?, ?, ?)",
(name, json.dumps(data), expires_at_ms), (name, json.dumps(data), expires_at_ms),
@@ -612,14 +618,14 @@ class CacheEngine:
def query_all(self) -> list[dict]: def query_all(self) -> list[dict]:
"""Return every row in the cache table.""" """Return every row in the cache table."""
with sqlite3.connect(self.db_path) as conn: with self._connect() as conn:
conn.row_factory = sqlite3.Row conn.row_factory = sqlite3.Row
return [dict(r) for r in conn.execute("SELECT * FROM cache").fetchall()] return [dict(r) for r in conn.execute("SELECT * FROM cache").fetchall()]
def stats(self) -> dict: def stats(self) -> dict:
"""Return aggregate cache statistics.""" """Return aggregate cache statistics."""
now = int(time.time()) now = int(time.time())
with sqlite3.connect(self.db_path) as conn: with self._connect() as conn:
total = conn.execute("SELECT COUNT(*) FROM cache").fetchone()[0] total = conn.execute("SELECT COUNT(*) FROM cache").fetchone()[0]
expired = conn.execute( expired = conn.execute(
"SELECT COUNT(*) FROM cache WHERE expires_at IS NOT NULL AND expires_at < ?", "SELECT COUNT(*) FROM cache WHERE expires_at IS NOT NULL AND expires_at < ?",
+118 -10
View File
@@ -7,18 +7,28 @@ Description: CLI interface.
import sys import sys
import time import time
import os import os
import asyncio
import json
from pathlib import Path from pathlib import Path
from typing import Annotated from typing import Annotated
from urllib.parse import quote from urllib.parse import quote
import cyclopts import cyclopts
from loguru import logger from loguru import logger
from .config import DB_PATH, enable_debug from .config import (
DB_PATH,
AppConfig,
load_config,
enable_debug,
)
from .models import TrackMeta from .models import TrackMeta
from .mpris import get_current_track from .mpris import get_current_track
from .core import LrcManager from .core import LrcManager
from .fetchers import FetcherMethodType from .fetchers import FetcherMethodType
from .lrc import get_sidecar_path from .lrc import get_sidecar_path
from .watch import WatchCoordinator
from .watch.control import ControlClient, parse_delta
from .watch.view.pipe import PipeOutput
app = cyclopts.App( app = cyclopts.App(
@@ -29,10 +39,17 @@ app.register_install_completion_command()
cache_app = cyclopts.App(name="cache", help="Manage the local SQLite cache.") cache_app = cyclopts.App(name="cache", help="Manage the local SQLite cache.")
app.command(cache_app) app.command(cache_app)
watch_app = cyclopts.App(name="watch", help="Watch MPRIS and output lyrics.")
app.command(watch_app)
ctl_app = cyclopts.App(name="ctl", help="Control a running watch session.")
watch_app.command(ctl_app)
# Global state set by the meta launcher # Global state set by the meta launcher
_player: str | None = None _player: str | None = None
_db_path: str | None = None _db_path: str | None = None
_app_config: AppConfig = AppConfig()
# Will be initialized before any command runs, safe to set to None here # Will be initialized before any command runs, safe to set to None here
manager: LrcManager = None # type: ignore manager: LrcManager = None # type: ignore
@@ -62,13 +79,13 @@ def launcher(
), ),
] = None, ] = None,
): ):
global _player, _db_path global _player, _db_path, _app_config, manager
if debug: if debug:
enable_debug() enable_debug()
_player = player _player = player
_db_path = str(Path(db_path).resolve()) if db_path else DB_PATH _db_path = str(Path(db_path).resolve()) if db_path else DB_PATH
global manager _app_config = load_config()
manager = LrcManager(db_path=_db_path) manager = LrcManager(db_path=_db_path, config=_app_config)
app(tokens) app(tokens)
@@ -114,7 +131,11 @@ def fetch(
] = False, ] = False,
): ):
"""Fetch and print lyrics for the currently playing track.""" """Fetch and print lyrics for the currently playing track."""
track = get_current_track(_player) track = get_current_track(
_player,
preferred_player=_app_config.general.preferred_player,
player_blacklist=_app_config.general.player_blacklist,
)
if not track: if not track:
logger.error("No active playing track found.") logger.error("No active playing track found.")
@@ -298,7 +319,11 @@ def export(
] = False, ] = False,
): ):
"""Export lyrics of the current track to a .lrc file.""" """Export lyrics of the current track to a .lrc file."""
track = get_current_track(_player) track = get_current_track(
_player,
preferred_player=_app_config.general.preferred_player,
player_blacklist=_app_config.general.player_blacklist,
)
if not track: if not track:
logger.error("No active playing track found.") logger.error("No active playing track found.")
sys.exit(1) sys.exit(1)
@@ -357,6 +382,73 @@ def export(
sys.exit(1) sys.exit(1)
# watch subcommands
@watch_app.command
def pipe(
before: Annotated[
int,
cyclopts.Parameter(
name="--before",
help="Number of lyric lines to show before current line.",
),
] = 0,
after: Annotated[
int,
cyclopts.Parameter(
name="--after",
help="Number of lyric lines to show after current line.",
),
] = 0,
):
"""Watch active player and continuously print lyric window to stdout."""
logger.info(
"Starting watch pipe (player filter: {})",
_player or "<none>",
)
output = PipeOutput(before=max(0, before), after=max(0, after))
try:
session = WatchCoordinator(
manager,
output,
player_hint=_player,
config=_app_config,
)
success = asyncio.run(session.run())
if not success:
sys.exit(1)
except KeyboardInterrupt:
logger.info("Watch stopped.")
@ctl_app.command
def offset(delta: str) -> None:
"""Adjust watch offset. Examples: +200, -200, 0."""
parsed_ok, parsed_delta, parse_error = parse_delta(delta)
if not parsed_ok or parsed_delta is None:
logger.error(parse_error or "Invalid offset delta")
sys.exit(1)
response = ControlClient(config=_app_config).send(
{"cmd": "offset", "delta": parsed_delta}
)
if not response.get("ok"):
logger.error(response.get("error", "Unknown error"))
sys.exit(1)
print(json.dumps(response, indent=2, ensure_ascii=False))
@ctl_app.command
def status() -> None:
"""Print current watch session status as JSON."""
response = ControlClient(config=_app_config).send({"cmd": "status"})
if not response.get("ok"):
logger.error(response.get("error", "Unknown error"))
sys.exit(1)
print(json.dumps(response, indent=2, ensure_ascii=False))
# cache subcommands # cache subcommands
@@ -379,7 +471,11 @@ def query(
print() print()
return return
track = get_current_track(_player) track = get_current_track(
_player,
preferred_player=_app_config.general.preferred_player,
player_blacklist=_app_config.general.player_blacklist,
)
if not track: if not track:
logger.error("No active playing track found.") logger.error("No active playing track found.")
sys.exit(1) sys.exit(1)
@@ -399,7 +495,11 @@ def clear(
manager.cache.clear_all() manager.cache.clear_all()
return return
track = get_current_track(_player) track = get_current_track(
_player,
preferred_player=_app_config.general.preferred_player,
player_blacklist=_app_config.general.player_blacklist,
)
if not track: if not track:
logger.error("No active playing track found.") logger.error("No active playing track found.")
sys.exit(1) sys.exit(1)
@@ -489,7 +589,11 @@ def confidence(
logger.error("Score must be between 0 and 100.") logger.error("Score must be between 0 and 100.")
sys.exit(1) sys.exit(1)
track = get_current_track(_player) track = get_current_track(
_player,
preferred_player=_app_config.general.preferred_player,
player_blacklist=_app_config.general.player_blacklist,
)
if not track: if not track:
logger.error("No active playing track found.") logger.error("No active playing track found.")
sys.exit(1) sys.exit(1)
@@ -513,7 +617,11 @@ def insert(
] = None, ] = None,
): ):
"""Manually insert lyrics into the cache for the current track.""" """Manually insert lyrics into the cache for the current track."""
track = get_current_track(_player) track = get_current_track(
_player,
preferred_player=_app_config.general.preferred_player,
player_blacklist=_app_config.general.player_blacklist,
)
if not track: if not track:
logger.error("No active playing track found.") logger.error("No active playing track found.")
sys.exit(1) sys.exit(1)
+130 -37
View File
@@ -1,14 +1,18 @@
""" """
Author: Uyanide pywang0608@foxmail.com Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-25 10:17:56 Date: 2026-03-25 10:17:56
Description: Global configuration constants and logger setup. Description: Global configuration constants, typed config dataclasses, and logger setup.
""" """
import dataclasses
import os import os
import sys import sys
import tomllib
from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from typing import Any, get_type_hints
from platformdirs import user_cache_dir, user_config_dir from platformdirs import user_cache_dir, user_config_dir
from dotenv import load_dotenv
from loguru import logger from loguru import logger
from importlib.metadata import version from importlib.metadata import version
@@ -24,13 +28,7 @@ DB_PATH = os.path.join(CACHE_DIR, "cache.db")
SLOT_SYNCED = "SYNCED" SLOT_SYNCED = "SYNCED"
SLOT_UNSYNCED = "UNSYNCED" SLOT_UNSYNCED = "UNSYNCED"
# .env loading _WATCH_SOCKET_PATH = str(Path(CACHE_DIR) / "watch.sock")
_config_env = Path(user_config_dir(APP_NAME, APP_AUTHOR)) / ".env"
load_dotenv(_config_env) # ~/.config/lrx-cli/.env
load_dotenv() # .env in cwd (does NOT override existing vars)
# HTTP
HTTP_TIMEOUT = 10.0
# Cache TTLs (seconds) # Cache TTLs (seconds)
TTL_SYNCED = None # never expires TTL_SYNCED = None # never expires
@@ -66,36 +64,131 @@ UA_LRX = f"LRX-CLI {APP_VERSION} (https://github.com/Uyanide/lrx-cli)"
MUSIXMATCH_COOLDOWN_MS = 600_000 # 10 minutes MUSIXMATCH_COOLDOWN_MS = 600_000 # 10 minutes
# Player preference (used when multiple MPRIS players are active)
PREFERRED_PLAYER = os.environ.get("PREFERRED_PLAYER", "spotify")
class _Credentials:
"""Credential config with lazy os.environ reads.
Stable constants live as module-level names above.
Credentials are @property so monkeypatch.setenv / monkeypatch.delenv
affect them without needing to patch each consumer separately.
"""
@property
def SPOTIFY_SP_DC(self) -> str:
return os.environ.get("SPOTIFY_SP_DC", "")
@property
def QQ_MUSIC_API_URL(self) -> str:
return os.environ.get("QQ_MUSIC_API_URL", "").rstrip("/")
@property
def MUSIXMATCH_USERTOKEN(self) -> str:
return os.environ.get("MUSIXMATCH_USERTOKEN", "")
credentials = _Credentials()
os.makedirs(CACHE_DIR, exist_ok=True) os.makedirs(CACHE_DIR, exist_ok=True)
# Logger
DEFAULT_PREFERRED_PLAYER = "spotify"
DEFAULT_PLAYER_BLACKLIST: tuple[str, ...] = (
"firefox",
"zen",
"chrome",
"chromium",
"vivaldi",
"edge",
"opera",
"mpv",
)
@dataclass(frozen=True)
class GeneralConfig:
preferred_player: str = DEFAULT_PREFERRED_PLAYER
player_blacklist: tuple[str, ...] = DEFAULT_PLAYER_BLACKLIST
http_timeout: float = 10.0
@dataclass(frozen=True)
class CredentialConfig:
spotify_sp_dc: str = ""
musixmatch_usertoken: str = ""
qq_music_api_url: str = ""
@dataclass(frozen=True)
class WatchConfig:
debounce_ms: int = 400
calibration_interval_s: float = 3.0
position_tick_ms: int = 50
socket_path: str = field(default_factory=lambda: _WATCH_SOCKET_PATH)
@dataclass(frozen=True)
class AppConfig:
general: GeneralConfig = field(default_factory=GeneralConfig)
credentials: CredentialConfig = field(default_factory=CredentialConfig)
watch: WatchConfig = field(default_factory=WatchConfig)
_CONFIG_PATH = Path(user_config_dir(APP_NAME, APP_AUTHOR)) / "config.toml"
def _coerce(val: Any, hint: Any, section: str, name: str) -> Any:
"""Coerce and validate one TOML value against its declared field type."""
if hint is str:
if not isinstance(val, str):
raise ValueError(
f"[{section}].{name}: expected str, got {type(val).__name__}"
)
return val
if hint is int:
if not isinstance(val, int) or isinstance(val, bool):
raise ValueError(
f"[{section}].{name}: expected int, got {type(val).__name__}"
)
return val
if hint is float:
if isinstance(val, bool):
raise ValueError(f"[{section}].{name}: expected float, got bool")
if isinstance(val, (int, float)):
return float(val)
raise ValueError(
f"[{section}].{name}: expected float, got {type(val).__name__}"
)
origin = getattr(hint, "__origin__", None)
if origin is tuple:
if not isinstance(val, list):
raise ValueError(
f"[{section}].{name}: expected array, got {type(val).__name__}"
)
for i, item in enumerate(val):
if not isinstance(item, str):
raise ValueError(
f"[{section}].{name}[{i}]: expected str, got {type(item).__name__}"
)
return tuple(val)
raise ValueError(f"[{section}].{name}: unsupported field type {hint!r}")
def _parse_section(raw: dict[str, Any], cls: type, section: str) -> Any:
"""Parse one TOML section dict into a frozen dataclass, rejecting unknown keys."""
fields_map = {f.name: f for f in dataclasses.fields(cls)}
hints = get_type_hints(cls)
unknown = set(raw) - set(fields_map)
if unknown:
raise ValueError(
f"Unknown config keys in [{section}]: {', '.join(sorted(unknown))}"
)
kwargs: dict[str, Any] = {}
for name, f in fields_map.items():
if name not in raw:
if f.default is not dataclasses.MISSING:
kwargs[name] = f.default
elif f.default_factory is not dataclasses.MISSING: # type: ignore[misc]
kwargs[name] = f.default_factory()
continue
kwargs[name] = _coerce(raw[name], hints[name], section, name)
return cls(**kwargs)
def load_config(path: Path | None = None) -> AppConfig:
"""Load AppConfig from TOML file; return all-defaults when file is absent."""
resolved = path or _CONFIG_PATH
if not resolved.exists():
return AppConfig()
with open(resolved, "rb") as f:
data = tomllib.load(f)
return AppConfig(
general=_parse_section(data.get("general", {}), GeneralConfig, "general"),
credentials=_parse_section(
data.get("credentials", {}), CredentialConfig, "credentials"
),
watch=_parse_section(data.get("watch", {}), WatchConfig, "watch"),
)
_LOG_FORMAT = ( _LOG_FORMAT = (
"<green>{time:YYYY-MM-DD HH:mm:ss}</green> | " "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
"<level>{level: <8}</level> | " "<level>{level: <8}</level> | "
+4 -3
View File
@@ -22,6 +22,7 @@ from .config import (
HIGH_CONFIDENCE, HIGH_CONFIDENCE,
SLOT_SYNCED, SLOT_SYNCED,
SLOT_UNSYNCED, SLOT_UNSYNCED,
AppConfig,
) )
from .models import TrackMeta, LyricResult, CacheStatus from .models import TrackMeta, LyricResult, CacheStatus
from .enrichers import create_enrichers, enrich_track from .enrichers import create_enrichers, enrich_track
@@ -92,10 +93,10 @@ def _has_negative_for_both_slots(cached_rows: list[LyricResult]) -> bool:
class LrcManager: class LrcManager:
"""Main entry point for fetching lyrics with caching.""" """Main entry point for fetching lyrics with caching."""
def __init__(self, db_path: str) -> None: def __init__(self, db_path: str, config: AppConfig = AppConfig()) -> None:
self.cache = CacheEngine(db_path=db_path) self.cache = CacheEngine(db_path=db_path)
self.authenticators = create_authenticators(self.cache) self.authenticators = create_authenticators(self.cache, config)
self.fetchers = create_fetchers(self.cache, self.authenticators) self.fetchers = create_fetchers(self.cache, self.authenticators, config)
self.enrichers = create_enrichers(self.authenticators) self.enrichers = create_enrichers(self.authenticators)
async def _run_group( async def _run_group(
+13 -11
View File
@@ -23,6 +23,7 @@ from ..authenticators import (
QQMusicAuthenticator, QQMusicAuthenticator,
) )
from ..cache import CacheEngine from ..cache import CacheEngine
from ..config import AppConfig
from ..models import TrackMeta from ..models import TrackMeta
FetcherMethodType = Literal[ FetcherMethodType = Literal[
@@ -52,26 +53,27 @@ _FETCHER_GROUPS: list[list[FetcherMethodType]] = [
def create_fetchers( def create_fetchers(
cache: CacheEngine, cache: CacheEngine,
authenticators: dict[str, BaseAuthenticator], authenticators: dict[str, BaseAuthenticator],
config: AppConfig,
) -> dict[FetcherMethodType, BaseFetcher]: ) -> dict[FetcherMethodType, BaseFetcher]:
"""Instantiate all fetchers. Returns a dict keyed by source name.""" """Instantiate all fetchers. Returns a dict keyed by source name."""
spotify_auth = authenticators["spotify"] spotify_auth = authenticators["spotify"]
mxm_auth = authenticators["musixmatch"] mxm_auth = authenticators["musixmatch"]
qqmusic_auth = authenticators.get("qqmusic") qqmusic_auth = authenticators["qqmusic"]
assert isinstance(spotify_auth, SpotifyAuthenticator) assert isinstance(spotify_auth, SpotifyAuthenticator)
assert isinstance(mxm_auth, MusixmatchAuthenticator) assert isinstance(mxm_auth, MusixmatchAuthenticator)
assert isinstance(qqmusic_auth, QQMusicAuthenticator) assert isinstance(qqmusic_auth, QQMusicAuthenticator)
fetchers: dict[FetcherMethodType, BaseFetcher] = { g = config.general
"local": LocalFetcher(), return {
"local": LocalFetcher(g),
"cache-search": CacheSearchFetcher(cache), "cache-search": CacheSearchFetcher(cache),
"spotify": SpotifyFetcher(spotify_auth), "spotify": SpotifyFetcher(g, spotify_auth),
"lrclib": LrclibFetcher(), "lrclib": LrclibFetcher(g),
"musixmatch-spotify": MusixmatchSpotifyFetcher(mxm_auth), "musixmatch-spotify": MusixmatchSpotifyFetcher(g, mxm_auth),
"lrclib-search": LrclibSearchFetcher(), "lrclib-search": LrclibSearchFetcher(g),
"netease": NeteaseFetcher(), "netease": NeteaseFetcher(g),
"qqmusic": QQMusicFetcher(qqmusic_auth), "qqmusic": QQMusicFetcher(g, qqmusic_auth),
"musixmatch": MusixmatchFetcher(mxm_auth), "musixmatch": MusixmatchFetcher(g, mxm_auth),
} }
return fetchers
def build_plan( def build_plan(
+8
View File
@@ -8,6 +8,8 @@ from abc import ABC, abstractmethod
from typing import Optional from typing import Optional
from dataclasses import dataclass from dataclasses import dataclass
from ..authenticators.base import BaseAuthenticator
from ..config import GeneralConfig
from ..models import CacheStatus, TrackMeta, LyricResult from ..models import CacheStatus, TrackMeta, LyricResult
@@ -38,6 +40,12 @@ class FetchResult:
class BaseFetcher(ABC): class BaseFetcher(ABC):
def __init__(
self, general: GeneralConfig, auth: Optional[BaseAuthenticator] = None
) -> None:
self._general = general
self._auth = auth
@property @property
@abstractmethod @abstractmethod
def source_name(self) -> str: def source_name(self) -> str:
+1 -2
View File
@@ -13,7 +13,6 @@ from .base import BaseFetcher, FetchResult
from ..models import TrackMeta, LyricResult, CacheStatus from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import LRCData from ..lrc import LRCData
from ..config import ( from ..config import (
HTTP_TIMEOUT,
TTL_UNSYNCED, TTL_UNSYNCED,
TTL_NOT_FOUND, TTL_NOT_FOUND,
UA_LRX, UA_LRX,
@@ -46,7 +45,7 @@ class LrclibFetcher(BaseFetcher):
logger.info(f"LRCLIB: fetching lyrics for {track.display_name()}") logger.info(f"LRCLIB: fetching lyrics for {track.display_name()}")
try: try:
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client: async with httpx.AsyncClient(timeout=self._general.http_timeout) as client:
resp = await client.get(url, headers={"User-Agent": UA_LRX}) resp = await client.get(url, headers={"User-Agent": UA_LRX})
if resp.status_code == 404: if resp.status_code == 404:
+1 -2
View File
@@ -15,7 +15,6 @@ from .selection import SearchCandidate, select_best
from ..models import TrackMeta, LyricResult, CacheStatus from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import LRCData from ..lrc import LRCData
from ..config import ( from ..config import (
HTTP_TIMEOUT,
TTL_UNSYNCED, TTL_UNSYNCED,
TTL_NOT_FOUND, TTL_NOT_FOUND,
UA_LRX, UA_LRX,
@@ -73,7 +72,7 @@ class LrclibSearchFetcher(BaseFetcher):
had_error = False had_error = False
try: try:
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client: async with httpx.AsyncClient(timeout=self._general.http_timeout) as client:
async def _query(params: dict[str, str]) -> tuple[list[dict], bool]: async def _query(params: dict[str, str]) -> tuple[list[dict], bool]:
url = f"{_LRCLIB_SEARCH_URL}?{urlencode(params)}" url = f"{_LRCLIB_SEARCH_URL}?{urlencode(params)}"
+14 -9
View File
@@ -18,6 +18,7 @@ from loguru import logger
from .base import BaseFetcher, FetchResult from .base import BaseFetcher, FetchResult
from .selection import SearchCandidate, select_best from .selection import SearchCandidate, select_best
from ..authenticators.musixmatch import MusixmatchAuthenticator from ..authenticators.musixmatch import MusixmatchAuthenticator
from ..config import GeneralConfig
from ..lrc import LRCData from ..lrc import LRCData
from ..models import CacheStatus, LyricResult, TrackMeta from ..models import CacheStatus, LyricResult, TrackMeta
@@ -145,22 +146,24 @@ async def _fetch_macro(
class MusixmatchSpotifyFetcher(BaseFetcher): class MusixmatchSpotifyFetcher(BaseFetcher):
"""Direct lookup by Spotify track ID — no search, single request.""" """Direct lookup by Spotify track ID — no search, single request."""
def __init__(self, auth: MusixmatchAuthenticator) -> None: _auth: MusixmatchAuthenticator
self.auth = auth
def __init__(self, general: GeneralConfig, auth: MusixmatchAuthenticator) -> None:
super().__init__(general, auth)
@property @property
def source_name(self) -> str: def source_name(self) -> str:
return "musixmatch-spotify" return "musixmatch-spotify"
def is_available(self, track: TrackMeta) -> bool: def is_available(self, track: TrackMeta) -> bool:
return bool(track.trackid) and not self.auth.is_cooldown() return bool(track.trackid) and not self._auth.is_cooldown()
async def fetch(self, track: TrackMeta, bypass_cache: bool = False) -> FetchResult: async def fetch(self, track: TrackMeta, bypass_cache: bool = False) -> FetchResult:
logger.info(f"Musixmatch-Spotify: fetching lyrics for {track.display_name()}") logger.info(f"Musixmatch-Spotify: fetching lyrics for {track.display_name()}")
try: try:
lrc = await _fetch_macro( lrc = await _fetch_macro(
self.auth, self._auth,
{"track_spotify_id": track.trackid}, # type: ignore[dict-item] {"track_spotify_id": track.trackid}, # type: ignore[dict-item]
) )
except AttributeError: except AttributeError:
@@ -191,8 +194,10 @@ class MusixmatchSpotifyFetcher(BaseFetcher):
class MusixmatchFetcher(BaseFetcher): class MusixmatchFetcher(BaseFetcher):
"""Metadata search + best-candidate lyric fetch.""" """Metadata search + best-candidate lyric fetch."""
def __init__(self, auth: MusixmatchAuthenticator) -> None: _auth: MusixmatchAuthenticator
self.auth = auth
def __init__(self, general: GeneralConfig, auth: MusixmatchAuthenticator) -> None:
super().__init__(general, auth)
@property @property
def source_name(self) -> str: def source_name(self) -> str:
@@ -203,7 +208,7 @@ class MusixmatchFetcher(BaseFetcher):
return "musixmatch" return "musixmatch"
def is_available(self, track: TrackMeta) -> bool: def is_available(self, track: TrackMeta) -> bool:
return bool(track.title) and not self.auth.is_cooldown() return bool(track.title) and not self._auth.is_cooldown()
async def _search(self, track: TrackMeta) -> tuple[Optional[int], float]: async def _search(self, track: TrackMeta) -> tuple[Optional[int], float]:
"""Search for track metadata. Raises on network/HTTP errors.""" """Search for track metadata. Raises on network/HTTP errors."""
@@ -218,7 +223,7 @@ class MusixmatchFetcher(BaseFetcher):
params["q_album"] = track.album params["q_album"] = track.album
logger.debug(f"Musixmatch: searching for '{track.display_name()}'") logger.debug(f"Musixmatch: searching for '{track.display_name()}'")
data = await self.auth.get_json(_MUSIXMATCH_SEARCH_URL, params) data = await self._auth.get_json(_MUSIXMATCH_SEARCH_URL, params)
if data is None: if data is None:
return None, 0.0 return None, 0.0
@@ -270,7 +275,7 @@ class MusixmatchFetcher(BaseFetcher):
return FetchResult.from_not_found() return FetchResult.from_not_found()
lrc = await _fetch_macro( lrc = await _fetch_macro(
self.auth, self._auth,
{"commontrack_id": str(commontrack_id)}, {"commontrack_id": str(commontrack_id)},
) )
except AttributeError: except AttributeError:
+2 -3
View File
@@ -16,7 +16,6 @@ from .selection import SearchCandidate, select_ranked
from ..models import TrackMeta, LyricResult, CacheStatus from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import LRCData from ..lrc import LRCData
from ..config import ( from ..config import (
HTTP_TIMEOUT,
TTL_NOT_FOUND, TTL_NOT_FOUND,
MULTI_CANDIDATE_DELAY_S, MULTI_CANDIDATE_DELAY_S,
UA_BROWSER, UA_BROWSER,
@@ -49,7 +48,7 @@ class NeteaseFetcher(BaseFetcher):
logger.debug(f"Netease: searching for '{query}' (limit={limit})") logger.debug(f"Netease: searching for '{query}' (limit={limit})")
try: try:
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client: async with httpx.AsyncClient(timeout=self._general.http_timeout) as client:
resp = await client.post( resp = await client.post(
_NETEASE_SEARCH_URL, _NETEASE_SEARCH_URL,
headers=_NETEASE_BASE_HEADERS, headers=_NETEASE_BASE_HEADERS,
@@ -114,7 +113,7 @@ class NeteaseFetcher(BaseFetcher):
logger.debug(f"Netease: fetching lyrics for song_id={song_id}") logger.debug(f"Netease: fetching lyrics for song_id={song_id}")
try: try:
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client: async with httpx.AsyncClient(timeout=self._general.http_timeout) as client:
resp = await client.post( resp = await client.post(
_NETEASE_LYRIC_URL, _NETEASE_LYRIC_URL,
headers=_NETEASE_BASE_HEADERS, headers=_NETEASE_BASE_HEADERS,
+11 -9
View File
@@ -18,7 +18,7 @@ from .selection import SearchCandidate, select_ranked
from ..models import TrackMeta, LyricResult, CacheStatus from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import LRCData from ..lrc import LRCData
from ..config import ( from ..config import (
HTTP_TIMEOUT, GeneralConfig,
TTL_NOT_FOUND, TTL_NOT_FOUND,
MULTI_CANDIDATE_DELAY_S, MULTI_CANDIDATE_DELAY_S,
) )
@@ -29,15 +29,17 @@ from ..authenticators import QQMusicAuthenticator
class QQMusicFetcher(BaseFetcher): class QQMusicFetcher(BaseFetcher):
def __init__(self, auth: QQMusicAuthenticator) -> None: _auth: QQMusicAuthenticator
self.auth = auth
def __init__(self, general: GeneralConfig, auth: QQMusicAuthenticator) -> None:
super().__init__(general, auth)
@property @property
def source_name(self) -> str: def source_name(self) -> str:
return "qqmusic" return "qqmusic"
def is_available(self, track: TrackMeta) -> bool: def is_available(self, track: TrackMeta) -> bool:
return bool(track.title) and self.auth.is_configured() return bool(track.title) and self._auth.is_configured()
async def _search( async def _search(
self, track: TrackMeta, limit: int = 10 self, track: TrackMeta, limit: int = 10
@@ -49,9 +51,9 @@ class QQMusicFetcher(BaseFetcher):
logger.debug(f"QQMusic: searching for '{query}' (limit={limit})") logger.debug(f"QQMusic: searching for '{query}' (limit={limit})")
try: try:
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client: async with httpx.AsyncClient(timeout=self._general.http_timeout) as client:
resp = await client.get( resp = await client.get(
f"{await self.auth.authenticate()}{_QQ_MUSIC_API_SEARCH_ENDPOINT}", f"{await self._auth.authenticate()}{_QQ_MUSIC_API_SEARCH_ENDPOINT}",
params={"keyword": query, "type": "song", "num": limit}, params={"keyword": query, "type": "song", "num": limit},
) )
resp.raise_for_status() resp.raise_for_status()
@@ -106,9 +108,9 @@ class QQMusicFetcher(BaseFetcher):
logger.debug(f"QQMusic: fetching lyrics for mid={mid}") logger.debug(f"QQMusic: fetching lyrics for mid={mid}")
try: try:
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client: async with httpx.AsyncClient(timeout=self._general.http_timeout) as client:
resp = await client.get( resp = await client.get(
f"{await self.auth.authenticate()}{_QQ_MUSIC_API_LYRIC_ENDPOINT}", f"{await self._auth.authenticate()}{_QQ_MUSIC_API_LYRIC_ENDPOINT}",
params={"mid": mid}, params={"mid": mid},
) )
resp.raise_for_status() resp.raise_for_status()
@@ -154,7 +156,7 @@ class QQMusicFetcher(BaseFetcher):
return FetchResult.from_network_error() return FetchResult.from_network_error()
async def fetch(self, track: TrackMeta, bypass_cache: bool = False) -> FetchResult: async def fetch(self, track: TrackMeta, bypass_cache: bool = False) -> FetchResult:
if not self.auth.is_configured(): if not self._auth.is_configured():
logger.debug("QQMusic: skipped — Auth not configured") logger.debug("QQMusic: skipped — Auth not configured")
return FetchResult() return FetchResult()
+8 -6
View File
@@ -11,21 +11,23 @@ from .base import BaseFetcher, FetchResult
from ..authenticators.spotify import SpotifyAuthenticator, SPOTIFY_BASE_HEADERS from ..authenticators.spotify import SpotifyAuthenticator, SPOTIFY_BASE_HEADERS
from ..models import TrackMeta, LyricResult, CacheStatus from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import LRCData from ..lrc import LRCData
from ..config import HTTP_TIMEOUT, TTL_NOT_FOUND from ..config import GeneralConfig, TTL_NOT_FOUND
_SPOTIFY_LYRICS_URL = "https://spclient.wg.spotify.com/color-lyrics/v2/track/" _SPOTIFY_LYRICS_URL = "https://spclient.wg.spotify.com/color-lyrics/v2/track/"
class SpotifyFetcher(BaseFetcher): class SpotifyFetcher(BaseFetcher):
def __init__(self, auth: SpotifyAuthenticator) -> None: def __init__(self, general: GeneralConfig, auth: SpotifyAuthenticator) -> None:
self.auth = auth super().__init__(general, auth)
_auth: SpotifyAuthenticator
@property @property
def source_name(self) -> str: def source_name(self) -> str:
return "spotify" return "spotify"
def is_available(self, track: TrackMeta) -> bool: def is_available(self, track: TrackMeta) -> bool:
return bool(track.trackid) and self.auth.is_configured() return bool(track.trackid) and self._auth.is_configured()
@staticmethod @staticmethod
def _format_lrc_line(start_ms: int, words: str) -> str: def _format_lrc_line(start_ms: int, words: str) -> str:
@@ -52,7 +54,7 @@ class SpotifyFetcher(BaseFetcher):
logger.info(f"Spotify: fetching lyrics for trackid={track.trackid}") logger.info(f"Spotify: fetching lyrics for trackid={track.trackid}")
token = await self.auth.authenticate() token = await self._auth.authenticate()
if not token: if not token:
logger.error("Spotify: cannot fetch lyrics without a token") logger.error("Spotify: cannot fetch lyrics without a token")
return FetchResult.from_network_error() return FetchResult.from_network_error()
@@ -65,7 +67,7 @@ class SpotifyFetcher(BaseFetcher):
} }
try: try:
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client: async with httpx.AsyncClient(timeout=self._general.http_timeout) as client:
res = await client.get(url, headers=headers) res = await client.get(url, headers=headers)
if res.status_code == 404: if res.status_code == 404:
+32 -13
View File
@@ -8,14 +8,17 @@ import asyncio
from dbus_next.aio.message_bus import MessageBus from dbus_next.aio.message_bus import MessageBus
from dbus_next.constants import BusType from dbus_next.constants import BusType
from dbus_next.message import Message from dbus_next.message import Message
from lrx_cli.config import DEFAULT_PLAYER_BLACKLIST, DEFAULT_PREFERRED_PLAYER
from lrx_cli.models import TrackMeta from lrx_cli.models import TrackMeta
from lrx_cli.config import PREFERRED_PLAYER
from loguru import logger from loguru import logger
from typing import Optional, List, Any from typing import Optional, List, Any
async def _list_mpris_players(bus: MessageBus) -> List[str]: async def _list_mpris_players(
"""List all MPRIS player bus names.""" bus: MessageBus,
player_blacklist: tuple[str, ...],
) -> List[str]:
"""List all MPRIS player bus names, excluding blacklisted entries."""
try: try:
reply = await bus.call( reply = await bus.call(
Message( Message(
@@ -28,7 +31,10 @@ async def _list_mpris_players(bus: MessageBus) -> List[str]:
if not reply or not reply.body: if not reply or not reply.body:
return [] return []
return [ return [
name for name in reply.body[0] if name.startswith("org.mpris.MediaPlayer2.") name
for name in reply.body[0]
if name.startswith("org.mpris.MediaPlayer2.")
and not any(x.lower() in name.lower() for x in player_blacklist)
] ]
except Exception as e: except Exception as e:
logger.error(f"Failed to list DBus names: {e}") logger.error(f"Failed to list DBus names: {e}")
@@ -53,15 +59,18 @@ async def _get_playback_status(bus: MessageBus, player_name: str) -> Optional[st
async def _select_player( async def _select_player(
bus: MessageBus, specific_player: Optional[str] = None bus: MessageBus,
specific_player: Optional[str],
preferred_player: str,
player_blacklist: tuple[str, ...],
) -> Optional[str]: ) -> Optional[str]:
"""Select the best MPRIS player. """Select the best MPRIS player.
When specific_player is given, filter by name match. When specific_player is given, filter by name match.
Otherwise: prefer the currently playing player. If multiple are playing, Otherwise: prefer the currently playing player. If multiple are playing,
prefer the one matching PREFERRED_PLAYER env var (default: spotify). prefer the one matching preferred_player (default: spotify).
""" """
players = await _list_mpris_players(bus) players = await _list_mpris_players(bus, player_blacklist)
if not players: if not players:
return None return None
@@ -82,8 +91,8 @@ async def _select_player(
if len(candidates) == 1: if len(candidates) == 1:
return candidates[0] return candidates[0]
# Multiple candidates: prefer PREFERRED_PLAYER # Multiple candidates: prefer preferred_player
preferred = PREFERRED_PLAYER.lower() preferred = preferred_player.lower()
if preferred: if preferred:
for p in candidates: for p in candidates:
if preferred in p.lower(): if preferred in p.lower():
@@ -92,7 +101,9 @@ async def _select_player(
async def _fetch_metadata_dbus( async def _fetch_metadata_dbus(
specific_player: Optional[str] = None, specific_player: Optional[str],
preferred_player: str,
player_blacklist: tuple[str, ...],
) -> Optional[TrackMeta]: ) -> Optional[TrackMeta]:
bus = None bus = None
try: try:
@@ -102,7 +113,9 @@ async def _fetch_metadata_dbus(
return None return None
try: try:
player_name = await _select_player(bus, specific_player) player_name = await _select_player(
bus, specific_player, preferred_player, player_blacklist
)
if not player_name: if not player_name:
logger.debug( logger.debug(
f"No active MPRIS players found via DBus{' for ' + specific_player if specific_player else ''}." f"No active MPRIS players found via DBus{' for ' + specific_player if specific_player else ''}."
@@ -182,9 +195,15 @@ async def _fetch_metadata_dbus(
bus.disconnect() bus.disconnect()
def get_current_track(player_name: Optional[str] = None) -> Optional[TrackMeta]: def get_current_track(
player_name: Optional[str] = None,
preferred_player: str = DEFAULT_PREFERRED_PLAYER,
player_blacklist: tuple[str, ...] = DEFAULT_PLAYER_BLACKLIST,
) -> Optional[TrackMeta]:
try: try:
return asyncio.run(_fetch_metadata_dbus(player_name)) return asyncio.run(
_fetch_metadata_dbus(player_name, preferred_player, player_blacklist)
)
except Exception as e: except Exception as e:
logger.error(f"DBus async loop failed: {e}") logger.error(f"DBus async loop failed: {e}")
return None return None
+5
View File
@@ -0,0 +1,5 @@
"""Watch subsystem public exports."""
from .session import WatchCoordinator
__all__ = ["WatchCoordinator"]
+152
View File
@@ -0,0 +1,152 @@
"""Unix-socket control channel for communicating with a running watch session."""
import asyncio
import json
from pathlib import Path
from typing import TYPE_CHECKING
from loguru import logger
from ..config import AppConfig
if TYPE_CHECKING:
from .session import WatchCoordinator
class ControlServer:
"""Control server that handles offset/status commands over a Unix socket."""
_socket_path: Path
_server: asyncio.AbstractServer | None
def __init__(
self,
config: AppConfig,
socket_path: Path | None = None,
) -> None:
"""Initialize control server with socket path from config or explicit override."""
self._socket_path: Path = socket_path or Path(config.watch.socket_path)
self._server: asyncio.AbstractServer | None = None
async def start(self, session: "WatchCoordinator") -> bool:
"""Start listening for control requests and bind session handlers."""
if not await self._prepare_socket_path():
return False
self._socket_path.parent.mkdir(parents=True, exist_ok=True)
self._server = await asyncio.start_unix_server(
lambda r, w: self._handle(session, r, w),
path=str(self._socket_path),
)
return True
async def _prepare_socket_path(self) -> bool:
"""Ensure socket path is usable and reject when another session is active."""
if not self._socket_path.exists():
return True
try:
reader, writer = await asyncio.open_unix_connection(str(self._socket_path))
writer.close()
await writer.wait_closed()
logger.error(
"A watch session is already running. Use 'lrx watch ctl status'."
)
return False
except Exception:
try:
self._socket_path.unlink(missing_ok=True)
except Exception:
pass
return True
async def stop(self) -> None:
"""Stop control server and remove stale socket path."""
if self._server is not None:
self._server.close()
await self._server.wait_closed()
self._server = None
try:
self._socket_path.unlink(missing_ok=True)
except Exception:
pass
async def _handle(
self,
session: "WatchCoordinator",
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
) -> None:
"""Handle one control request and send JSON response."""
resp: dict[str, object] = {"ok": False, "error": "internal error"}
try:
line = await reader.readline()
if not line:
resp = {"ok": False, "error": "empty request"}
else:
req = json.loads(line.decode("utf-8"))
cmd = req.get("cmd")
if cmd == "offset":
delta = int(req.get("delta", 0))
resp = session.handle_offset(delta)
elif cmd == "status":
resp = session.handle_status()
else:
resp = {"ok": False, "error": "unknown command"}
except Exception as e:
resp = {"ok": False, "error": str(e)}
finally:
writer.write((json.dumps(resp) + "\n").encode("utf-8"))
await writer.drain()
writer.close()
await writer.wait_closed()
class ControlClient:
"""Control client used by CLI commands to talk to active watch session."""
_socket_path: Path
def __init__(
self,
config: AppConfig,
socket_path: Path | None = None,
) -> None:
"""Initialize control client with socket path from config or explicit override."""
self._socket_path: Path = socket_path or Path(config.watch.socket_path)
async def _send_async(self, cmd: dict[str, object]) -> dict[str, object]:
"""Send one JSON command to control server and return JSON response."""
if not self._socket_path.exists():
return {"ok": False, "error": "No watch session running."}
try:
reader, writer = await asyncio.open_unix_connection(str(self._socket_path))
except Exception:
return {"ok": False, "error": "No watch session running."}
writer.write((json.dumps(cmd) + "\n").encode("utf-8"))
await writer.drain()
line = await reader.readline()
writer.close()
await writer.wait_closed()
if not line:
return {"ok": False, "error": "Empty response."}
return json.loads(line.decode("utf-8"))
def send(self, cmd: dict[str, object]) -> dict[str, object]:
"""Synchronous wrapper around async control request."""
return asyncio.run(self._send_async(cmd))
def parse_delta(raw: str) -> tuple[bool, int | None, str | None]:
"""Parse signed millisecond offset delta string for ctl offset command."""
value = raw.strip()
try:
if value.startswith("+"):
return True, int(value[1:]), None
if value.startswith("-"):
return True, -int(value[1:]), None
return True, int(value), None
except ValueError:
return False, None, f"Invalid offset delta: {raw}"
+80
View File
@@ -0,0 +1,80 @@
"""Debounced lyric fetch orchestration for watch session."""
import asyncio
from typing import Awaitable, Callable, Optional
from ..config import AppConfig
from ..lrc import LRCData
from ..models import TrackMeta
class LyricFetcher:
"""Debounces track updates and runs at most one lyric fetch task at a time."""
_config: AppConfig
_fetch_func: Callable[[TrackMeta], Awaitable[Optional[LRCData]]]
_on_fetching: Callable[[], Awaitable[None] | None]
_on_result: Callable[[Optional[LRCData]], Awaitable[None] | None]
_debounce_task: asyncio.Task | None
_fetch_task: asyncio.Task | None
_pending_track: TrackMeta | None
def __init__(
self,
fetch_func: Callable[[TrackMeta], Awaitable[Optional[LRCData]]],
on_fetching: Callable[[], Awaitable[None] | None],
on_result: Callable[[Optional[LRCData]], Awaitable[None] | None],
config: AppConfig,
) -> None:
"""Initialize fetch callbacks and runtime options."""
self._config = config
self._fetch_func = fetch_func
self._on_fetching = on_fetching
self._on_result = on_result
self._debounce_task: asyncio.Task | None = None
self._fetch_task: asyncio.Task | None = None
self._pending_track: TrackMeta | None = None
async def stop(self) -> None:
"""Cancel and await all in-flight debounce/fetch tasks."""
for task in (self._debounce_task, self._fetch_task):
if task is not None:
task.cancel()
await asyncio.gather(
*[t for t in (self._debounce_task, self._fetch_task) if t is not None],
return_exceptions=True,
)
self._debounce_task = None
self._fetch_task = None
def request(self, track: TrackMeta) -> None:
"""Request lyrics for track with debounce collapsing."""
self._pending_track = track
if self._debounce_task is not None:
self._debounce_task.cancel()
self._debounce_task = asyncio.create_task(self._debounce_then_fetch())
async def _debounce_then_fetch(self) -> None:
"""Wait debounce window then start a fresh fetch task for latest pending track."""
await asyncio.sleep(self._config.watch.debounce_ms / 1000.0)
track = self._pending_track
if track is None:
return
if self._fetch_task is not None:
self._fetch_task.cancel()
await asyncio.gather(self._fetch_task, return_exceptions=True)
self._fetch_task = asyncio.create_task(self._do_fetch(track))
async def _do_fetch(self, track: TrackMeta) -> None:
"""Execute fetch lifecycle callbacks and fetch lyrics for a track."""
fetching_callback_result = self._on_fetching()
if asyncio.iscoroutine(fetching_callback_result):
await fetching_callback_result
lyrics = await self._fetch_func(track)
result_callback_result = self._on_result(lyrics)
if asyncio.iscoroutine(result_callback_result):
await result_callback_result
+412
View File
@@ -0,0 +1,412 @@
"""Player discovery, state monitoring, and active-player selection for watch mode."""
from dataclasses import dataclass
from typing import Callable, Optional
import asyncio
from dbus_next.aio.message_bus import MessageBus
from dbus_next.constants import BusType
from dbus_next.message import Message
from loguru import logger
from ..config import AppConfig
from ..models import TrackMeta
def _variant_value(item: object) -> object | None:
"""Extract .value from DBus variant-like objects when available."""
if hasattr(item, "value"):
return getattr(item, "value")
return None
@dataclass(slots=True)
class PlayerState:
"""Current observable state for one MPRIS player."""
bus_name: str
status: str
track: Optional[TrackMeta]
@dataclass(frozen=True, slots=True)
class PlayerTarget:
"""Constraint for choosing which players are visible to watch."""
hint: Optional[str] = None
player_blacklist: tuple[str, ...] = ()
def validation_error(self) -> str | None:
"""Return validation message when hint conflicts with blacklist, else None."""
normalized_hint = self.normalized_hint
if not normalized_hint:
return None
for blocked in self.player_blacklist:
normalized_blocked = blocked.strip().lower()
if not normalized_blocked:
continue
if _keyword_match(normalized_hint, normalized_blocked) or _keyword_match(
normalized_blocked, normalized_hint
):
return (
f"Requested player '{self.hint}' is blocked by "
f"PLAYER_BLACKLIST entry '{blocked}'."
)
return None
@property
def normalized_hint(self) -> str:
"""Return normalized lowercase player hint string."""
return (self.hint or "").strip().lower()
def allows(self, bus_name: str) -> bool:
"""Return whether given MPRIS bus name passes this target constraint."""
normalized_hint = self.normalized_hint
if not normalized_hint:
return True
return _keyword_match(bus_name, normalized_hint)
def _keyword_match(text: str, keyword: str) -> bool:
"""Return True when keyword exists in text, case-insensitively."""
return keyword.strip().lower() in text.lower()
class PlayerMonitor:
"""Tracks MPRIS players and forwards signal-driven state updates to session callbacks."""
_config: AppConfig
_on_players_changed: Callable[[], None]
_on_seeked: Callable[[str, int], None]
_on_playback_status: Callable[[str, str], None]
_target: PlayerTarget
players: dict[str, PlayerState]
_bus: MessageBus | None
_props_cache: dict[str, object]
def __init__(
self,
on_players_changed: Callable[[], None],
on_seeked: Callable[[str, int], None],
on_playback_status: Callable[[str, str], None],
config: AppConfig,
target: Optional[PlayerTarget] = None,
) -> None:
"""Initialize monitor callbacks, runtime options, and player target filter."""
self._config = config
self._on_players_changed = on_players_changed
self._on_seeked = on_seeked
self._on_playback_status = on_playback_status
self._target = target or PlayerTarget(
player_blacklist=self._config.general.player_blacklist
)
self.players: dict[str, PlayerState] = {}
self._bus: MessageBus | None = None
self._props_cache: dict[str, object] = {}
async def start(self) -> None:
"""Start DBus monitoring and populate initial player snapshot."""
self._bus = await MessageBus(bus_type=BusType.SESSION).connect()
self._bus.add_message_handler(self._on_message)
await self._add_match_rules()
await self.refresh()
async def close(self) -> None:
"""Stop DBus monitoring and close bus connection."""
self._props_cache.clear()
if self._bus:
self._bus.disconnect()
self._bus = None
async def _get_player_props(self, bus_name: str) -> object | None:
"""Return cached DBus Properties interface for player, creating it if missing."""
if not self._bus:
return None
if bus_name in self._props_cache:
return self._props_cache[bus_name]
try:
introspection = await self._bus.introspect(
bus_name, "/org/mpris/MediaPlayer2"
)
proxy = self._bus.get_proxy_object(
bus_name, "/org/mpris/MediaPlayer2", introspection
)
props = proxy.get_interface("org.freedesktop.DBus.Properties")
self._props_cache[bus_name] = props
return props
except Exception as e:
logger.debug(f"Failed to prepare DBus props for {bus_name}: {e}")
self._props_cache.pop(bus_name, None)
return None
async def _add_match_rules(self) -> None:
"""Register signal subscriptions needed by monitor."""
if not self._bus:
return
rules = [
"type='signal',interface='org.freedesktop.DBus',member='NameOwnerChanged'",
"type='signal',interface='org.freedesktop.DBus.Properties',member='PropertiesChanged'",
"type='signal',interface='org.mpris.MediaPlayer2.Player',member='Seeked'",
]
for rule in rules:
try:
await self._bus.call(
Message(
destination="org.freedesktop.DBus",
path="/org/freedesktop/DBus",
interface="org.freedesktop.DBus",
member="AddMatch",
signature="s",
body=[rule],
)
)
except Exception as e:
logger.debug(f"Failed to add DBus match rule {rule}: {e}")
async def _list_mpris_players(self) -> list[str]:
"""List visible MPRIS players after applying blacklist and target filter."""
if not self._bus:
return []
try:
reply = await self._bus.call(
Message(
destination="org.freedesktop.DBus",
path="/org/freedesktop/DBus",
interface="org.freedesktop.DBus",
member="ListNames",
)
)
if not reply or not reply.body:
return []
out: list[str] = []
for name in reply.body[0]:
if not name.startswith("org.mpris.MediaPlayer2."):
continue
if any(
x.lower() in name.lower()
for x in self._config.general.player_blacklist
):
continue
if not self._target.allows(name):
continue
out.append(name)
return out
except Exception as e:
logger.debug(f"Failed to list mpris players: {e}")
return []
async def _fetch_player_state(self, bus_name: str) -> Optional[PlayerState]:
"""Read current playback status and metadata from one player service."""
props = await self._get_player_props(bus_name)
if props is None:
return None
try:
status_var = await getattr(props, "call_get")(
"org.mpris.MediaPlayer2.Player", "PlaybackStatus"
)
metadata_var = await getattr(props, "call_get")(
"org.mpris.MediaPlayer2.Player", "Metadata"
)
status = status_var.value if status_var else "Stopped"
track = self._track_from_metadata(
metadata_var.value if metadata_var else {}
)
return PlayerState(bus_name=bus_name, status=status, track=track)
except Exception as e:
logger.debug(f"Failed to read state for {bus_name}: {e}")
self._props_cache.pop(bus_name, None)
return None
def _track_from_metadata(self, metadata: dict[str, object]) -> Optional[TrackMeta]:
"""Build TrackMeta object from MPRIS metadata map."""
if not metadata:
return None
trackid = metadata.get("mpris:trackid")
if trackid is not None:
trackid = _variant_value(trackid)
if isinstance(trackid, str) and trackid.startswith("spotify:track:"):
trackid = trackid.removeprefix("spotify:track:")
elif isinstance(trackid, str) and trackid.startswith("/com/spotify/track/"):
trackid = trackid.removeprefix("/com/spotify/track/")
elif not isinstance(trackid, str):
trackid = None
length = metadata.get("mpris:length")
length_ms = None
length_value = _variant_value(length) if length is not None else None
if isinstance(length_value, int):
length_ms = length_value // 1000
artist = metadata.get("xesam:artist")
artist_v = None
artist_value = _variant_value(artist) if artist is not None else None
if isinstance(artist_value, list) and artist_value:
artist_v = artist_value[0]
title = metadata.get("xesam:title")
album = metadata.get("xesam:album")
url = metadata.get("xesam:url")
title_value = _variant_value(title) if title is not None else None
album_value = _variant_value(album) if album is not None else None
url_value = _variant_value(url) if url is not None else None
return TrackMeta(
trackid=trackid,
length=length_ms,
album=album_value if isinstance(album_value, str) else None,
artist=artist_v,
title=title_value if isinstance(title_value, str) else None,
url=url_value if isinstance(url_value, str) else None,
)
async def refresh(self) -> None:
"""Refresh full player snapshot and notify session when visible set changes."""
players = await self._list_mpris_players()
updated: dict[str, PlayerState] = {}
for bus_name in players:
st = await self._fetch_player_state(bus_name)
if st is not None:
updated[bus_name] = st
before = set(self.players.keys())
after = set(updated.keys())
added = sorted(after - before)
removed = sorted(before - after)
for bus_name in removed:
self._props_cache.pop(bus_name, None)
self.players = updated
if added or removed:
logger.info(
"MPRIS players updated: added={}, removed={}",
added,
removed,
)
self._on_players_changed()
async def _resolve_well_known_name(self, unique_sender: str) -> str | None:
"""Map a DBus unique sender (e.g. :1.42) to a tracked MPRIS bus name."""
if unique_sender in self.players:
return unique_sender
if not self._bus:
return None
for bus_name in self.players:
try:
reply = await self._bus.call(
Message(
destination="org.freedesktop.DBus",
path="/org/freedesktop/DBus",
interface="org.freedesktop.DBus",
member="GetNameOwner",
signature="s",
body=[bus_name],
)
)
if reply and reply.body and str(reply.body[0]) == unique_sender:
return bus_name
except Exception:
continue
return None
async def _handle_seeked_signal(self, sender: str, position_ms: int) -> None:
"""Route Seeked signal to session using well-known bus name when possible."""
bus_name = await self._resolve_well_known_name(sender)
if bus_name is not None:
self._on_seeked(bus_name, position_ms)
return
# If we cannot map sender reliably, force a state refresh to converge.
await self.refresh()
def _on_message(self, message: Message) -> bool:
"""Low-level DBus signal handler for player lifecycle/status/seek events."""
try:
if (
message.interface == "org.freedesktop.DBus"
and message.member == "NameOwnerChanged"
):
if message.body and str(message.body[0]).startswith(
"org.mpris.MediaPlayer2."
):
asyncio.create_task(self.refresh())
return False
if (
message.interface == "org.freedesktop.DBus.Properties"
and message.member == "PropertiesChanged"
):
# Message.sender is a DBus unique name, so match by path+iface.
path_ok = message.path == "/org/mpris/MediaPlayer2"
iface = message.body[0] if message.body else None
if path_ok and iface == "org.mpris.MediaPlayer2.Player":
asyncio.create_task(self.refresh())
return False
if (
message.interface == "org.mpris.MediaPlayer2.Player"
and message.member == "Seeked"
):
sender = message.sender or ""
if sender and message.body:
position_us = int(message.body[0])
asyncio.create_task(
self._handle_seeked_signal(
sender,
max(0, position_us // 1000),
)
)
return False
except Exception as e:
logger.debug(f"PlayerMonitor signal handling error: {e}")
return False
async def get_position_ms(self, bus_name: str) -> Optional[int]:
"""Read player-reported position in milliseconds."""
props = await self._get_player_props(bus_name)
if props is None:
return None
try:
position_var = await getattr(props, "call_get")(
"org.mpris.MediaPlayer2.Player", "Position"
)
if position_var is None:
return None
return max(0, int(position_var.value) // 1000)
except Exception as e:
logger.debug(f"Failed to read position from {bus_name}: {e}")
self._props_cache.pop(bus_name, None)
return None
class ActivePlayerSelector:
@staticmethod
def select(
players: dict[str, PlayerState],
last_active: str | None,
config: AppConfig,
) -> str | None:
"""Select active player by playing state, preferred keyword, and continuity."""
if not players:
return None
playing = [name for name, st in players.items() if st.status == "Playing"]
if len(playing) == 1:
return playing[0]
preferred = config.general.preferred_player.lower().strip()
candidates = playing if playing else list(players.keys())
if preferred:
for name in candidates:
if preferred in name.lower():
return name
if last_active and last_active in players:
return last_active
return candidates[0] if candidates else None
+389
View File
@@ -0,0 +1,389 @@
"""Watch orchestration with explicit MVVM role boundaries.
- Model: WatchModel stores domain state.
- ViewModel: WatchViewModel projects model to output-facing state/signature.
- Coordinator: WatchCoordinator wires services and drives async workflows.
"""
import asyncio
from dataclasses import asdict
from typing import Optional
from loguru import logger
from ..core import LrcManager
from ..lrc import LRCData
from ..models import TrackMeta
from .control import ControlServer
from .fetcher import LyricFetcher
from ..config import AppConfig
from .view import BaseOutput, LyricView, WatchState
from .player import ActivePlayerSelector, PlayerMonitor, PlayerTarget
from .tracker import PositionTracker
class WatchModel:
"""Model layer that owns watch state and lyric timeline representation."""
offset_ms: int
active_player: str | None
active_track_key: str | None
status: str
lyrics: LyricView | None
def __init__(self) -> None:
self.offset_ms = 0
self.active_player: str | None = None
self.active_track_key: str | None = None
self.status: str = "idle"
self.lyrics: LyricView | None = None
def set_lyrics(self, lyrics: LRCData | None) -> None:
"""Update lyrics and rebuild projection once per lyric object change."""
if lyrics is None:
self.lyrics = None
return
self.lyrics = LyricView.from_lrc(lyrics)
def state_signature(self, track: TrackMeta | None, position_ms: int) -> tuple:
"""Build dedupe signature from model state and current lyric cursor."""
track_key = (
track.trackid
if track and track.trackid
else track.display_name()
if track
else None
)
if self.status != "ok" or self.lyrics is None:
return ("status", self.status, self.active_player, track_key)
at_ms = position_ms + self.offset_ms
cursor = self.lyrics.signature_cursor(at_ms)
return ("lyrics", self.active_player, track_key, cursor)
class WatchViewModel:
"""ViewModel that projects WatchModel into view-consumable snapshots."""
_model: WatchModel
def __init__(self, model: WatchModel) -> None:
self._model = model
def signature(self, track: TrackMeta | None, position_ms: int) -> tuple:
"""Build dedupe signature for current projected state."""
return self._model.state_signature(track, position_ms)
def state(self, track: TrackMeta | None, position_ms: int) -> WatchState:
"""Project model values into immutable WatchState payload."""
return WatchState(
track=track,
lyrics=self._model.lyrics,
position_ms=position_ms,
offset_ms=self._model.offset_ms,
status=self._model.status, # type: ignore[arg-type]
)
class WatchCoordinator:
"""Application/service orchestration layer for watch runtime."""
_manager: LrcManager
_output: BaseOutput
_config: AppConfig
_model: WatchModel
_view_model: WatchViewModel
_player_hint: str | None
_last_emit_signature: tuple | None
_target: PlayerTarget
_control: ControlServer
_player_monitor: PlayerMonitor
_tracker: PositionTracker
_fetcher: LyricFetcher
_emit_scheduled: bool
_calibration_task: asyncio.Task | None
def __init__(
self,
manager: LrcManager,
output: BaseOutput,
player_hint: str | None,
config: AppConfig,
) -> None:
self._manager = manager
self._output = output
self._config = config
self._model = WatchModel()
self._view_model = WatchViewModel(self._model)
self._player_hint = player_hint
self._last_emit_signature: tuple | None = None
self._emit_scheduled = False
self._calibration_task = None
self._target = PlayerTarget(
hint=player_hint,
player_blacklist=self._config.general.player_blacklist,
)
self._control = ControlServer(config=self._config)
self._player_monitor = PlayerMonitor(
on_players_changed=self._on_player_change,
on_seeked=self._on_seeked,
on_playback_status=self._on_playback_status,
config=self._config,
target=self._target,
)
self._tracker = PositionTracker(
poll_position_ms=self._player_monitor.get_position_ms,
config=self._config,
on_tick=self._on_tracker_tick,
)
self._fetcher = LyricFetcher(
fetch_func=self._fetch_lyrics,
on_fetching=self._on_fetching,
on_result=self._on_lyrics_update,
config=self._config,
)
async def run(self) -> bool:
"""Run watch workflow and return success flag."""
target_issue = self._target.validation_error()
if target_issue:
logger.error(target_issue)
return False
logger.info(
"watch session starting (player filter: {})",
self._player_hint or "<none>",
)
if not await self._control.start(self):
return False
try:
await self._player_monitor.start()
await self._tracker.start()
self._calibration_task = asyncio.create_task(self._calibration_loop())
self._schedule_emit()
await asyncio.Event().wait()
return True
except asyncio.CancelledError:
return True
except Exception as exc:
logger.exception("watch runtime error: {}", exc)
return False
finally:
logger.info("watch session stopping")
if self._calibration_task is not None:
self._calibration_task.cancel()
await asyncio.gather(self._calibration_task, return_exceptions=True)
self._calibration_task = None
await self._fetcher.stop()
await self._tracker.stop()
await self._player_monitor.close()
await self._control.stop()
async def _calibration_loop(self) -> None:
"""Periodically refresh full MPRIS snapshot as fallback calibration."""
interval = max(0.1, self._config.watch.calibration_interval_s)
while True:
await asyncio.sleep(interval)
try:
await self._player_monitor.refresh()
except asyncio.CancelledError:
raise
except Exception as exc:
logger.debug("mpris calibration refresh failed: {}", exc)
def _active_track(self) -> TrackMeta | None:
"""Return active track metadata from selected player."""
player = self._player_monitor.players.get(self._model.active_player or "")
return player.track if player else None
def _request_fetch_for_active_track(self, reason: str) -> bool:
"""Trigger lyric fetch for active track when needed."""
track = self._active_track()
if track is None:
return False
if self._model.lyrics is not None:
return False
if self._model.status == "fetching":
return False
logger.info("fetching lyrics for track ({}): {}", reason, track.display_name())
self._fetcher.request(track)
return True
async def _fetch_lyrics(self, track: TrackMeta) -> Optional[LRCData]:
"""Fetch lyrics in worker thread."""
result = await asyncio.to_thread(
self._manager.fetch_for_track,
track,
None,
False,
False,
)
if result and result.lyrics:
return result.lyrics
return None
def _on_player_change(self) -> None:
"""React to monitor player snapshot change."""
prev_player = self._model.active_player
prev_track_key = self._model.active_track_key
selected = ActivePlayerSelector.select(
self._player_monitor.players,
self._model.active_player,
self._config,
)
self._model.active_player = selected
if selected != prev_player:
logger.info(
"active player changed: {} -> {}",
prev_player or "<none>",
selected or "<none>",
)
if selected is None:
self._model.status = "idle"
self._model.active_track_key = None
self._model.set_lyrics(None)
self._schedule_emit()
return
state = self._player_monitor.players.get(selected)
if state is None:
self._model.status = "idle"
self._model.active_track_key = None
self._model.set_lyrics(None)
self._schedule_emit()
return
track = state.track
track_key = (
track.trackid
if track and track.trackid
else track.display_name()
if track
else None
)
track_changed = track_key != prev_track_key
player_changed = selected != prev_player
if track_changed or player_changed:
self._model.set_lyrics(None)
self._model.active_track_key = track_key
asyncio.create_task(
self._tracker.set_active_player(
selected,
state.status,
track_key,
)
)
if state.status != "Playing":
self._model.status = "paused"
self._schedule_emit()
return
started_fetch = False
if track is not None and (
player_changed or track_changed or self._model.lyrics is None
):
started_fetch = self._request_fetch_for_active_track("track-changed")
if self._model.lyrics is not None:
self._model.status = "ok"
elif started_fetch:
self._model.status = "fetching"
elif self._model.status != "fetching":
self._model.status = "no_lyrics"
self._schedule_emit()
def _on_seeked(self, bus_name: str, position_ms: int) -> None:
"""Forward seek event to tracker."""
asyncio.create_task(self._tracker.on_seeked(bus_name, position_ms))
def _on_playback_status(self, bus_name: str, status: str) -> None:
"""React to playback status change and tracker sync."""
if bus_name == self._model.active_player:
if status == "Playing":
started_fetch = self._request_fetch_for_active_track("resume-playing")
if self._model.lyrics is not None:
self._model.status = "ok"
elif started_fetch:
self._model.status = "fetching"
elif self._model.status != "fetching":
self._model.status = "no_lyrics"
else:
self._model.status = "paused"
self._schedule_emit()
asyncio.create_task(self._tracker.on_playback_status(bus_name, status))
def _on_tracker_tick(self) -> None:
"""Emit updates from tracker tick only while lyrics are actively rendering."""
if self._model.status == "ok":
self._schedule_emit()
def _schedule_emit(self) -> None:
"""Coalesce frequent events into at most one in-flight emit task."""
if self._emit_scheduled:
return
self._emit_scheduled = True
asyncio.create_task(self._run_scheduled_emit())
async def _run_scheduled_emit(self) -> None:
"""Run one coalesced emit and release scheduler gate."""
try:
await self._emit_state()
finally:
self._emit_scheduled = False
async def _on_fetching(self) -> None:
"""Mark model as fetching and emit state."""
self._model.status = "fetching"
await self._emit_state()
async def _on_lyrics_update(self, lyrics: Optional[LRCData]) -> None:
"""Update model with fetched lyrics and emit state."""
self._model.set_lyrics(lyrics)
self._model.status = "ok" if lyrics is not None else "no_lyrics"
logger.info(
"lyrics update result: {}",
"found" if lyrics is not None else "not found",
)
await self._emit_state()
async def _emit_state(self) -> None:
"""Emit output state only when semantic signature changes."""
player = self._player_monitor.players.get(self._model.active_player or "")
track = player.track if player else None
position = await self._tracker.get_position_ms()
signature = self._view_model.signature(track, position)
if signature == self._last_emit_signature:
return
self._last_emit_signature = signature
state = self._view_model.state(track, position)
await self._output.on_state(state)
def handle_offset(self, delta: int) -> dict:
"""Apply offset update requested by control channel."""
self._model.offset_ms += delta
return {"ok": True, "offset_ms": self._model.offset_ms}
def handle_status(self) -> dict:
"""Return status payload for control channel."""
player = self._player_monitor.players.get(self._model.active_player or "")
track = asdict(player.track) if player and player.track else None
return {
"ok": True,
"offset_ms": self._model.offset_ms,
"player": self._model.active_player,
"track": track,
"position_ms": self._tracker.peek_position_ms(),
"lyrics_status": self._model.status,
}
+143
View File
@@ -0,0 +1,143 @@
"""Playback position tracking utilities for watch mode."""
import asyncio
import time
from typing import Awaitable, Callable, Optional
from ..config import AppConfig
class PositionTracker:
"""Maintains an estimated playback position from seek/status events plus local clock."""
_config: AppConfig
_poll_position_ms: Callable[[str], Awaitable[Optional[int]]]
_active_player: str | None
_is_playing: bool
_track_key: str | None
_position_ms: int
_last_tick: float
_fast_task: asyncio.Task | None
_on_tick: Callable[[], None] | None
_lock: asyncio.Lock
def __init__(
self,
poll_position_ms: Callable[[str], Awaitable[Optional[int]]],
config: AppConfig,
on_tick: Callable[[], None] | None = None,
) -> None:
"""Initialize tracker with position polling callback and runtime options."""
self._config = config
self._poll_position_ms = poll_position_ms
self._on_tick = on_tick
self._active_player: str | None = None
self._is_playing = False
self._track_key: str | None = None
self._position_ms = 0
self._last_tick = time.monotonic()
self._fast_task: asyncio.Task | None = None
self._lock = asyncio.Lock()
async def start(self) -> None:
"""Start local monotonic position ticking task."""
self._last_tick = time.monotonic()
self._fast_task = asyncio.create_task(self._fast_loop())
async def stop(self) -> None:
"""Stop tracker tasks and await clean cancellation."""
tasks = [t for t in (self._fast_task,) if t is not None]
for task in tasks:
task.cancel()
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
self._fast_task = None
async def set_active_player(
self,
bus_name: str | None,
playback_status: str,
track_key: str | None,
) -> None:
"""Switch active source and calibrate position once when entering a new playing track."""
should_calibrate_now = False
async with self._lock:
player_changed = self._active_player != bus_name
track_changed = self._track_key != track_key
was_playing = self._is_playing
self._active_player = bus_name
self._is_playing = playback_status == "Playing"
status_changed_to_playing = self._is_playing and not was_playing
if player_changed or track_changed:
self._position_ms = 0
should_calibrate_now = (
self._is_playing
and bool(self._active_player)
and (player_changed or track_changed or status_changed_to_playing)
)
self._track_key = track_key
self._last_tick = time.monotonic()
if should_calibrate_now and self._active_player:
await self._calibrate_once(self._active_player)
async def on_seeked(self, bus_name: str, position_ms: int) -> None:
"""Apply explicit seek position update for active player."""
async with self._lock:
if bus_name != self._active_player:
return
self._position_ms = max(0, position_ms)
self._last_tick = time.monotonic()
async def on_playback_status(self, bus_name: str, playback_status: str) -> None:
"""Update playing state and calibrate once on paused-to-playing transition."""
should_calibrate_now = False
async with self._lock:
if bus_name != self._active_player:
return
was_playing = self._is_playing
self._is_playing = playback_status == "Playing"
should_calibrate_now = self._is_playing and not was_playing
self._last_tick = time.monotonic()
if should_calibrate_now:
await self._calibrate_once(bus_name)
async def _fast_loop(self) -> None:
"""Advance position by monotonic clock while active player is playing."""
interval = self._config.watch.position_tick_ms / 1000.0
while True:
await asyncio.sleep(interval)
should_notify = False
async with self._lock:
now = time.monotonic()
if self._is_playing and self._active_player:
delta_ms = int((now - self._last_tick) * 1000)
if delta_ms > 0:
self._position_ms += delta_ms
should_notify = True
self._last_tick = now
if should_notify and self._on_tick is not None:
self._on_tick()
async def _calibrate_once(self, bus_name: str) -> None:
"""Poll player-reported position once and synchronize local tracker state."""
polled = await self._poll_position_ms(bus_name)
if polled is None:
return
async with self._lock:
if bus_name != self._active_player:
return
# Drift correction is signal-assisted; polling is fallback.
self._position_ms = max(0, polled)
self._last_tick = time.monotonic()
async def get_position_ms(self) -> int:
"""Return current tracked position in milliseconds."""
async with self._lock:
return max(0, int(self._position_ms))
def peek_position_ms(self) -> int:
"""Return current tracked position without awaiting lock (best-effort snapshot)."""
return max(0, int(self._position_ms))
+80
View File
@@ -0,0 +1,80 @@
"""Output abstraction types for watch mode rendering."""
from abc import ABC, abstractmethod
from bisect import bisect_right
from dataclasses import dataclass
from typing import Literal, Optional
from ...lrc import LRCData, LyricLine
from ...models import TrackMeta
@dataclass(slots=True, frozen=True)
class LyricView:
"""View-ready immutable lyric data projected from one normalized LRC object."""
normalized: LRCData
lines: tuple[str, ...]
timed_line_entries: tuple[tuple[int, int], ...]
timestamps: tuple[int, ...]
@staticmethod
def from_lrc(lyrics: LRCData) -> "LyricView":
"""Build a view projection once from normalized lyrics."""
normalized = lyrics.normalize()
lines: list[str] = []
entries: list[tuple[int, int]] = []
line_index = 0
for line in normalized.lines:
if not isinstance(line, LyricLine):
continue
text = line.text
lines.append(text)
timestamp = line.line_times_ms[0] if line.line_times_ms else 0
entries.append((max(0, timestamp), line_index))
line_index += 1
timestamps = tuple(timestamp for timestamp, _ in entries)
return LyricView(
normalized=normalized,
lines=tuple(lines),
timed_line_entries=tuple(entries),
timestamps=timestamps,
)
def signature_cursor(self, at_ms: int) -> tuple:
"""Build a stable cursor signature for dedupe decisions."""
if not self.timed_line_entries:
return ("plain", self.lines)
first_ts = self.timed_line_entries[0][0]
if at_ms < first_ts:
return ("before_first", first_ts)
idx = bisect_right(self.timestamps, at_ms) - 1
if idx < 0:
idx = 0
ts, line_idx = self.timed_line_entries[idx]
text = self.lines[line_idx] if line_idx < len(self.lines) else ""
return ("ok", idx, ts, text)
@dataclass(slots=True)
class WatchState:
"""Immutable snapshot payload delivered from session to output implementations."""
track: Optional[TrackMeta]
lyrics: Optional[LyricView]
position_ms: int
offset_ms: int
status: Literal["fetching", "ok", "no_lyrics", "paused", "idle"]
class BaseOutput(ABC):
@abstractmethod
async def on_state(self, state: WatchState) -> None:
"""Render or deliver one watch state frame."""
...
+85
View File
@@ -0,0 +1,85 @@
"""Pipe output implementation for watch mode."""
from bisect import bisect_right
from dataclasses import dataclass
import sys
from . import BaseOutput, WatchState
@dataclass(slots=True)
class PipeOutput(BaseOutput):
"""Render a fixed lyric context window to stdout for streaming/pipe usage."""
before: int = 0
after: int = 0
def _window_size(self) -> int:
"""Return rendered lyric window size."""
return self.before + 1 + self.after
def _render_status(self, message: str) -> list[str]:
"""Render centered status line in fixed-size window."""
lines = [""] * self._window_size()
lines[self.before] = message
return lines
def _render_lyrics(self, state: WatchState) -> list[str]:
"""Render context lines centered on current timed lyric entry."""
if state.lyrics is None:
return self._render_status("[no lyrics]")
all_lines = state.lyrics.lines
if not all_lines:
return self._render_status("[no lyrics]")
entries = state.lyrics.timed_line_entries
effective_ms = state.position_ms + state.offset_ms
current_line_idx: int | None
if entries and effective_ms < entries[0][0]:
# Before first timestamp, current lyric is empty and after-window shows upcoming lines.
current_line_idx = None
else:
if not entries:
current_line_idx = 0
else:
current_entry_idx = (
bisect_right(state.lyrics.timestamps, effective_ms) - 1
)
if current_entry_idx < 0:
current_entry_idx = 0
current_line_idx = entries[current_entry_idx][1]
out: list[str] = []
for rel in range(-self.before, self.after + 1):
if current_line_idx is None:
if rel <= 0:
out.append("")
continue
line_idx = rel - 1
else:
line_idx = current_line_idx + rel
if 0 <= line_idx < len(all_lines):
out.append(all_lines[line_idx])
else:
out.append("")
return out
async def on_state(self, state: WatchState) -> None:
"""Render and flush one frame for the latest watch state."""
if state.status == "fetching":
lines = self._render_status("[fetching...]")
elif state.status == "no_lyrics":
lines = self._render_status("[no lyrics]")
elif state.status == "paused":
lines = self._render_status("[paused]")
elif state.status == "idle":
lines = self._render_status("[idle]")
else:
lines = self._render_lyrics(state)
for line in lines:
print(line)
sys.stdout.flush()
View File
-10
View File
@@ -1,13 +1,3 @@
import pytest
from lrx_cli.config import enable_debug from lrx_cli.config import enable_debug
enable_debug() enable_debug()
@pytest.fixture
def no_credentials(monkeypatch):
"""Clear all credential env vars so only anonymous fetchers are active."""
monkeypatch.delenv("SPOTIFY_SP_DC", raising=False)
monkeypatch.delenv("QQ_MUSIC_API_URL", raising=False)
monkeypatch.delenv("MUSIXMATCH_USERTOKEN", raising=False)
+10 -8
View File
@@ -1,16 +1,18 @@
import os
import pytest import pytest
from lrx_cli.config import load_config
_credentials = load_config().credentials
requires_spotify = pytest.mark.skipif( requires_spotify = pytest.mark.skipif(
not os.environ.get("SPOTIFY_SP_DC"), not _credentials.spotify_sp_dc,
reason="requires SPOTIFY_SP_DC", reason="requires credentials.spotify_sp_dc in config.toml",
) )
requires_qq_music = pytest.mark.skipif( requires_qq_music = pytest.mark.skipif(
not os.environ.get("QQ_MUSIC_API_URL"), not _credentials.qq_music_api_url,
reason="requires QQ_MUSIC_API_URL", reason="requires credentials.qq_music_api_url in config.toml",
) )
requires_musixmatch_token = pytest.mark.skipif( requires_musixmatch_token = pytest.mark.skipif(
not os.environ.get("MUSIXMATCH_USERTOKEN"), not _credentials.musixmatch_usertoken,
reason="requires MUSIXMATCH_USERTOKEN", reason="requires credentials.musixmatch_usertoken in config.toml",
) )
+59
View File
@@ -0,0 +1,59 @@
import pytest
from lrx_cli.config import AppConfig, CredentialConfig, WatchConfig, load_config
def test_missing_file_returns_defaults(tmp_path):
assert load_config(tmp_path / "nonexistent.toml") == AppConfig()
def test_empty_file_returns_defaults(tmp_path):
p = tmp_path / "config.toml"
p.write_text("")
assert load_config(p) == AppConfig()
def test_partial_section_keeps_other_defaults(tmp_path):
p = tmp_path / "config.toml"
p.write_bytes(b"[watch]\ndebounce_ms = 200\n")
cfg = load_config(p)
assert cfg.watch.debounce_ms == 200
assert cfg.watch.calibration_interval_s == WatchConfig().calibration_interval_s
def test_credentials_roundtrip(tmp_path):
p = tmp_path / "config.toml"
p.write_bytes(
b"[credentials]\n"
b'spotify_sp_dc = "abc"\n'
b'qq_music_api_url = "http://localhost:3000"\n'
)
assert load_config(p).credentials == CredentialConfig(
spotify_sp_dc="abc", qq_music_api_url="http://localhost:3000"
)
def test_int_coerced_to_float(tmp_path):
p = tmp_path / "config.toml"
p.write_bytes(b"[general]\nhttp_timeout = 5\n")
assert load_config(p).general.http_timeout == 5.0
def test_unknown_key_raises(tmp_path):
p = tmp_path / "config.toml"
p.write_bytes(b"[general]\ntypo_key = 1\n")
with pytest.raises(ValueError, match="Unknown config keys"):
load_config(p)
def test_wrong_type_raises(tmp_path):
p = tmp_path / "config.toml"
p.write_bytes(b"[watch]\ndebounce_ms = true\n")
with pytest.raises(ValueError, match="expected int"):
load_config(p)
def test_app_config_is_frozen():
cfg = AppConfig()
with pytest.raises(Exception):
cfg.general = None # type: ignore[misc]
+23 -15
View File
@@ -1,14 +1,16 @@
from pathlib import Path
import pytest
from dataclasses import replace from dataclasses import replace
from pathlib import Path
import pytest
from lrx_cli.config import AppConfig, load_config
from lrx_cli.core import LrcManager
from lrx_cli.fetchers import FetcherMethodType from lrx_cli.fetchers import FetcherMethodType
from lrx_cli.models import TrackMeta from lrx_cli.models import TrackMeta
from lrx_cli.core import LrcManager
from tests.marks import ( from tests.marks import (
requires_spotify,
requires_qq_music,
requires_musixmatch_token, requires_musixmatch_token,
requires_qq_music,
requires_spotify,
) )
SAMPLE_SPOTIFY_TRACK: TrackMeta = TrackMeta( SAMPLE_SPOTIFY_TRACK: TrackMeta = TrackMeta(
@@ -33,7 +35,14 @@ SAMPLE_SPOTIFY_TRACK_ALBUM_ARTIST_MODIFIED = replace(
@pytest.fixture @pytest.fixture
def lrc_manager(tmp_path: Path) -> LrcManager: def lrc_manager(tmp_path: Path) -> LrcManager:
return LrcManager(str(tmp_path / "cache.db")) """LrcManager with empty credentials (no auth required)."""
return LrcManager(str(tmp_path / "cache.db"), AppConfig())
@pytest.fixture
def cred_lrc_manager(tmp_path: Path) -> LrcManager:
"""LrcManager with credentials from config.toml (for CI/network tests)."""
return LrcManager(str(tmp_path / "cache.db"), load_config())
def _fetch_and_assert( def _fetch_and_assert(
@@ -112,7 +121,6 @@ def test_cache_search_fetcher_prefer_better_match(lrc_manager: LrcManager):
], ],
) )
def test_anonymous_remote_fetchers( def test_anonymous_remote_fetchers(
no_credentials,
lrc_manager: LrcManager, lrc_manager: LrcManager,
method: FetcherMethodType, method: FetcherMethodType,
expect_fail: bool, expect_fail: bool,
@@ -122,18 +130,18 @@ def test_anonymous_remote_fetchers(
@pytest.mark.network @pytest.mark.network
@requires_spotify @requires_spotify
def test_spotify_fetcher(lrc_manager: LrcManager): def test_spotify_fetcher(cred_lrc_manager: LrcManager):
_fetch_and_assert(lrc_manager, "spotify") _fetch_and_assert(cred_lrc_manager, "spotify")
@pytest.mark.network @pytest.mark.network
@requires_qq_music @requires_qq_music
def test_qqmusic_fetcher(lrc_manager: LrcManager): def test_qqmusic_fetcher(cred_lrc_manager: LrcManager):
_fetch_and_assert(lrc_manager, "qqmusic") _fetch_and_assert(cred_lrc_manager, "qqmusic")
@pytest.mark.network @pytest.mark.network
def test_musixmatch_anonymous_fetcher(no_credentials, lrc_manager: LrcManager): def test_musixmatch_anonymous_fetcher(lrc_manager: LrcManager):
# These fetchers should be tested in a single test to share the same usertoken # These fetchers should be tested in a single test to share the same usertoken
# Otherwise the second may fail due to rate limits # Otherwise the second may fail due to rate limits
_fetch_and_assert(lrc_manager, "musixmatch", expect_fail=False) _fetch_and_assert(lrc_manager, "musixmatch", expect_fail=False)
@@ -142,9 +150,9 @@ def test_musixmatch_anonymous_fetcher(no_credentials, lrc_manager: LrcManager):
@pytest.mark.network @pytest.mark.network
@requires_musixmatch_token @requires_musixmatch_token
def test_musixmatch_fetcher(lrc_manager: LrcManager): def test_musixmatch_fetcher(cred_lrc_manager: LrcManager):
_fetch_and_assert(lrc_manager, "musixmatch") _fetch_and_assert(cred_lrc_manager, "musixmatch")
_fetch_and_assert(lrc_manager, "musixmatch-spotify") _fetch_and_assert(cred_lrc_manager, "musixmatch-spotify")
def test_local_fetcher(lrc_manager: LrcManager): def test_local_fetcher(lrc_manager: LrcManager):
+123
View File
@@ -0,0 +1,123 @@
from __future__ import annotations
import asyncio
from pathlib import Path
from lrx_cli.config import AppConfig
from lrx_cli.enrichers.audio_tag import AudioTagEnricher
from lrx_cli.enrichers.file_name import FileNameEnricher
from lrx_cli.models import CacheStatus, TrackMeta
from lrx_cli.fetchers.local import LocalFetcher
_GENERAL = AppConfig().general
def _local_track(path: Path) -> TrackMeta:
return TrackMeta(url=f"file://{path}")
def test_local_fetcher_unavailable_for_non_local_track():
fetcher = LocalFetcher(_GENERAL)
assert not fetcher.is_available(TrackMeta(title="Song", artist="Artist"))
def test_local_fetcher_available_for_local_track(tmp_path):
fetcher = LocalFetcher(_GENERAL)
assert fetcher.is_available(_local_track(tmp_path / "song.flac"))
def test_local_fetcher_returns_empty_for_non_file_url():
fetcher = LocalFetcher(_GENERAL)
track = TrackMeta(url="https://example.com/song.mp3")
result = asyncio.run(fetcher.fetch(track))
assert result.synced is None and result.unsynced is None
def test_local_fetcher_reads_synced_sidecar(tmp_path):
audio = tmp_path / "song.flac"
lrc = audio.with_suffix(".lrc")
lrc.write_text("[00:01.00]Hello\n[00:03.00]World\n")
fetcher = LocalFetcher(_GENERAL)
result = asyncio.run(fetcher.fetch(_local_track(audio)))
assert result.synced is not None
assert result.synced.status == CacheStatus.SUCCESS_SYNCED
assert result.synced.source is not None
assert "sidecar" in result.synced.source
def test_local_fetcher_reads_unsynced_sidecar(tmp_path):
audio = tmp_path / "song.flac"
lrc = audio.with_suffix(".lrc")
lrc.write_text("Hello\nWorld\n")
fetcher = LocalFetcher(_GENERAL)
result = asyncio.run(fetcher.fetch(_local_track(audio)))
assert result.unsynced is not None
assert result.synced is None
def test_local_fetcher_empty_sidecar_ignored(tmp_path):
audio = tmp_path / "song.flac"
(audio.with_suffix(".lrc")).write_text(" ")
fetcher = LocalFetcher(_GENERAL)
result = asyncio.run(fetcher.fetch(_local_track(audio)))
assert result.synced is None and result.unsynced is None
def _enrich(path: str, **existing) -> dict | None:
enricher = FileNameEnricher()
track = TrackMeta(url=f"file://{path}", **existing)
return asyncio.run(enricher.enrich(track))
def test_filename_enricher_artist_title_split(tmp_path):
result = _enrich(str(tmp_path / "Utada Hikaru - First Love.flac"))
assert result == {
"artist": "Utada Hikaru",
"title": "First Love",
"album": tmp_path.name,
}
def test_filename_enricher_track_number_prefix(tmp_path):
# "01. Title" — no " - " separator, regex strips leading "01. "
result = _enrich(str(tmp_path / "01. First Love.flac"))
assert result and result.get("title") == "First Love"
assert "artist" not in result
def test_filename_enricher_title_only(tmp_path):
result = _enrich(str(tmp_path / "First Love.flac"))
assert result and result.get("title") == "First Love"
def test_filename_enricher_does_not_overwrite_existing_fields(tmp_path):
result = _enrich(
str(tmp_path / "Artist - Title.flac"),
artist="Existing Artist",
title="Existing Title",
)
assert result is None or ("artist" not in result and "title" not in result)
def test_filename_enricher_non_local_returns_none():
enricher = FileNameEnricher()
track = TrackMeta(title="Song", artist="Artist")
assert asyncio.run(enricher.enrich(track)) is None
def test_audio_tag_enricher_non_local_returns_none():
enricher = AudioTagEnricher()
track = TrackMeta(title="Song", artist="Artist")
assert asyncio.run(enricher.enrich(track)) is None
def test_audio_tag_enricher_missing_file_returns_none(tmp_path):
enricher = AudioTagEnricher()
track = _local_track(tmp_path / "nonexistent.flac")
assert asyncio.run(enricher.enrich(track)) is None
+50
View File
@@ -277,3 +277,53 @@ def test_unsynced_cache_only_still_fetches_when_unsynced_disallowed(tmp_path):
assert fetcher.called assert fetcher.called
assert result is not None assert result is not None
assert result.status == CacheStatus.SUCCESS_SYNCED assert result.status == CacheStatus.SUCCESS_SYNCED
# manual_insert
def test_manual_insert_synced_stored_with_correct_status(tmp_path):
manager = make_manager(tmp_path)
manager.manual_insert(_track(), "[00:01.00]Hello\n[00:03.00]World\n")
rows = manager.cache.query_track(_track())
assert any(r["status"] == CacheStatus.SUCCESS_SYNCED.value for r in rows)
def test_manual_insert_unsynced_stored_with_correct_status(tmp_path):
manager = make_manager(tmp_path)
manager.manual_insert(_track(), "Hello\nWorld\n")
rows = manager.cache.query_track(_track())
assert any(r["status"] == CacheStatus.SUCCESS_UNSYNCED.value for r in rows)
def test_manual_insert_source_and_ttl(tmp_path):
manager = make_manager(tmp_path)
manager.manual_insert(_track(), "[00:01.00]line\n")
rows = manager.cache.query_track(_track())
assert all(r["source"] == "manual" for r in rows)
assert all(r["expires_at"] is None for r in rows)
def test_manual_insert_overwrites_previous_entry(tmp_path):
manager = make_manager(tmp_path)
track = _track()
manager.manual_insert(track, "[00:01.00]old\n")
manager.manual_insert(track, "[00:01.00]new\n")
best = manager.cache.get_best(track, ["manual"])
assert best is not None
assert str(best.lyrics) == "[00:01.00]new"
def test_manual_insert_is_returned_by_fetch(tmp_path):
manager = make_manager(tmp_path)
track = _track()
manager.manual_insert(track, "[00:01.00]cached\n")
result = manager.fetch_for_track(track)
assert result is not None
assert result.lyrics is not None
assert str(result.lyrics) == "[00:01.00]cached"
+455
View File
@@ -0,0 +1,455 @@
from __future__ import annotations
import asyncio
from pathlib import Path
from lrx_cli.lrc import LRCData
from lrx_cli.models import TrackMeta
from lrx_cli.watch.control import ControlClient, ControlServer, parse_delta
from lrx_cli.watch.view import BaseOutput, LyricView, WatchState
from lrx_cli.watch.view.pipe import PipeOutput
from lrx_cli.watch.player import ActivePlayerSelector, PlayerState, PlayerTarget
from lrx_cli.watch.fetcher import LyricFetcher
from lrx_cli.config import AppConfig
from lrx_cli.watch.tracker import PositionTracker
from lrx_cli.watch.session import WatchCoordinator
TEST_CONFIG = AppConfig()
def test_parse_delta_supports_plus_minus_and_reset() -> None:
assert parse_delta("+200") == (True, 200, None)
assert parse_delta("-150") == (True, -150, None)
assert parse_delta("0") == (True, 0, None)
def test_player_target_allows_all_when_hint_empty() -> None:
target = PlayerTarget()
assert target.allows("org.mpris.MediaPlayer2.spotify") is True
assert target.allows("org.mpris.MediaPlayer2.mpd") is True
def test_player_target_filters_by_case_insensitive_substring() -> None:
target = PlayerTarget("Spot")
assert target.allows("org.mpris.MediaPlayer2.spotify") is True
assert target.allows("org.mpris.MediaPlayer2.mpd") is False
def test_player_target_reports_blacklisted_hint() -> None:
target = PlayerTarget("spot", player_blacklist=("spotify",))
assert target.validation_error() is not None
def test_active_player_selector_prefers_single_playing() -> None:
players = {
"org.mpris.MediaPlayer2.foo": PlayerState(
bus_name="org.mpris.MediaPlayer2.foo",
status="Paused",
track=TrackMeta(title="A"),
),
"org.mpris.MediaPlayer2.bar": PlayerState(
bus_name="org.mpris.MediaPlayer2.bar",
status="Playing",
track=TrackMeta(title="B"),
),
}
assert (
ActivePlayerSelector.select(players, None, TEST_CONFIG)
== "org.mpris.MediaPlayer2.bar"
)
def test_active_player_selector_uses_last_active_when_no_playing() -> None:
players = {
"org.mpris.MediaPlayer2.foo": PlayerState(
bus_name="org.mpris.MediaPlayer2.foo",
status="Paused",
track=TrackMeta(title="A"),
),
"org.mpris.MediaPlayer2.bar": PlayerState(
bus_name="org.mpris.MediaPlayer2.bar",
status="Stopped",
track=TrackMeta(title="B"),
),
}
assert (
ActivePlayerSelector.select(
players,
"org.mpris.MediaPlayer2.bar",
TEST_CONFIG,
)
== "org.mpris.MediaPlayer2.bar"
)
def test_position_tracker_seeked_calibrates_immediately() -> None:
async def _run() -> None:
async def _poll(_bus: str):
return 1200
tracker = PositionTracker(_poll, TEST_CONFIG)
await tracker.start()
await tracker.set_active_player(
"org.mpris.MediaPlayer2.foo", "Playing", "track-A"
)
await tracker.on_seeked("org.mpris.MediaPlayer2.foo", 3500)
pos = await tracker.get_position_ms()
await tracker.stop()
assert pos >= 3500
asyncio.run(_run())
def test_position_tracker_playback_status_pause_stops_fast_growth() -> None:
async def _run() -> None:
async def _poll(_bus: str):
return 0
tracker = PositionTracker(_poll, TEST_CONFIG)
await tracker.start()
await tracker.set_active_player(
"org.mpris.MediaPlayer2.foo", "Playing", "track-A"
)
await asyncio.sleep(0.08)
before = await tracker.get_position_ms()
await tracker.on_playback_status("org.mpris.MediaPlayer2.foo", "Paused")
await asyncio.sleep(0.08)
after = await tracker.get_position_ms()
await tracker.stop()
assert before > 0
assert after - before < 20
asyncio.run(_run())
def test_position_tracker_playback_status_playing_calibrates_once() -> None:
async def _run() -> None:
async def _poll(_bus: str):
return 50000
tracker = PositionTracker(_poll, TEST_CONFIG)
await tracker.start()
await tracker.set_active_player(
"org.mpris.MediaPlayer2.foo", "Paused", "track-A"
)
await tracker.on_playback_status("org.mpris.MediaPlayer2.foo", "Playing")
pos = await tracker.get_position_ms()
await tracker.stop()
assert pos >= 50000
asyncio.run(_run())
def test_position_tracker_set_active_player_playing_calibrates_on_resume() -> None:
async def _run() -> None:
async def _poll(_bus: str):
return 42000
tracker = PositionTracker(_poll, TEST_CONFIG)
await tracker.start()
await tracker.set_active_player(
"org.mpris.MediaPlayer2.foo", "Paused", "track-A"
)
await tracker.set_active_player(
"org.mpris.MediaPlayer2.foo", "Playing", "track-A"
)
pos = await tracker.get_position_ms()
await tracker.stop()
assert pos >= 42000
asyncio.run(_run())
def test_control_server_and_client_roundtrip(tmp_path: Path) -> None:
async def _run() -> None:
class _Session:
def __init__(self):
self.offset = 0
def handle_offset(self, delta: int) -> dict:
self.offset += delta
return {"ok": True, "offset_ms": self.offset}
def handle_status(self) -> dict:
return {"ok": True, "offset_ms": self.offset, "lyrics_status": "idle"}
socket_path = tmp_path / "watch.sock"
server = ControlServer(socket_path=socket_path, config=TEST_CONFIG)
session = _Session()
await server.start(session) # type: ignore
client = ControlClient(socket_path=socket_path, config=TEST_CONFIG)
r1 = await client._send_async({"cmd": "offset", "delta": 200})
r2 = await client._send_async({"cmd": "status"})
await server.stop()
assert r1 == {"ok": True, "offset_ms": 200}
assert r2["ok"] is True
assert r2["offset_ms"] == 200
asyncio.run(_run())
def test_pipe_output_prints_fixed_window_for_status(capsys) -> None:
output = PipeOutput(before=1, after=1)
state = WatchState(
track=None,
lyrics=None,
position_ms=0,
offset_ms=0,
status="fetching",
)
asyncio.run(output.on_state(state))
printed = capsys.readouterr().out
assert printed == "\n[fetching...]\n\n"
def test_pipe_output_uses_context_window_for_lyrics(capsys) -> None:
output = PipeOutput(before=1, after=1)
lyrics = LRCData("[00:01.00]a\n[00:02.00]b\n[00:03.00]c")
state = WatchState(
track=TrackMeta(title="Song"),
lyrics=LyricView.from_lrc(lyrics),
position_ms=2100,
offset_ms=0,
status="ok",
)
asyncio.run(output.on_state(state))
printed = capsys.readouterr().out
assert printed == "a\nb\nc\n"
def test_pipe_output_shows_upcoming_lines_before_first_timestamp(capsys) -> None:
output = PipeOutput(before=1, after=1)
lyrics = LRCData("[00:02.00]a\n[00:03.00]b")
state = WatchState(
track=TrackMeta(title="Song"),
lyrics=LyricView.from_lrc(lyrics),
position_ms=0,
offset_ms=0,
status="ok",
)
asyncio.run(output.on_state(state))
printed = capsys.readouterr().out
assert printed == "\n\na\n"
def test_pipe_output_first_line_keeps_before_region_empty(capsys) -> None:
output = PipeOutput(before=1, after=1)
lyrics = LRCData("[00:01.00]a\n[00:02.00]b\n[00:03.00]c")
state = WatchState(
track=TrackMeta(title="Song"),
lyrics=LyricView.from_lrc(lyrics),
position_ms=1100,
offset_ms=0,
status="ok",
)
asyncio.run(output.on_state(state))
printed = capsys.readouterr().out
assert printed == "\na\nb\n"
def test_pipe_output_last_line_keeps_after_region_empty(capsys) -> None:
output = PipeOutput(before=1, after=1)
lyrics = LRCData("[00:01.00]a\n[00:02.00]b\n[00:03.00]c")
state = WatchState(
track=TrackMeta(title="Song"),
lyrics=LyricView.from_lrc(lyrics),
position_ms=3100,
offset_ms=0,
status="ok",
)
asyncio.run(output.on_state(state))
printed = capsys.readouterr().out
assert printed == "b\nc\n\n"
def test_pipe_output_repeated_text_uses_correct_timed_occurrence(capsys) -> None:
output = PipeOutput(before=1, after=1)
lyrics = LRCData("[00:01.00]A\n[00:02.00]X\n[00:03.00]B\n[00:04.00]X\n[00:05.00]C")
state = WatchState(
track=TrackMeta(title="Song"),
lyrics=LyricView.from_lrc(lyrics),
position_ms=4100,
offset_ms=0,
status="ok",
)
asyncio.run(output.on_state(state))
printed = capsys.readouterr().out
assert printed == "B\nX\nC\n"
def test_session_fetches_on_resume_playing_without_lyrics() -> None:
async def _run() -> None:
class _Manager:
def fetch_for_track(self, *_args, **_kwargs):
return None
class _Output(BaseOutput):
async def on_state(self, state: WatchState) -> None:
return None
class _Fetcher(LyricFetcher):
def __init__(self):
async def _fetch(_track: TrackMeta):
return None
async def _on_fetching() -> None:
return None
async def _on_result(_lyrics) -> None:
return None
super().__init__(_fetch, _on_fetching, _on_result, TEST_CONFIG)
self.requested = []
def request(self, track: TrackMeta) -> None:
self.requested.append(track.display_name())
session = WatchCoordinator(
_Manager(), # type: ignore
_Output(),
player_hint=None,
config=TEST_CONFIG,
)
fake_fetcher = _Fetcher()
session._fetcher = fake_fetcher
session._tracker = PositionTracker(
lambda _bus: asyncio.sleep(0, result=0),
TEST_CONFIG,
)
bus_name = "org.mpris.MediaPlayer2.spotify"
track = TrackMeta(title="Song", artist="Artist")
session._model.active_player = bus_name
session._player_monitor.players = {
bus_name: PlayerState(bus_name=bus_name, status="Playing", track=track)
}
session._model.set_lyrics(None)
session._model.status = "paused"
session._on_playback_status(bus_name, "Playing")
await asyncio.sleep(0)
assert fake_fetcher.requested == ["Artist - Song"]
assert session._model.status == "fetching"
asyncio.run(_run())
def test_session_emit_state_only_when_lyric_cursor_changes() -> None:
async def _run() -> None:
class _Manager:
def fetch_for_track(self, *_args, **_kwargs):
return None
class _Output(BaseOutput):
def __init__(self):
self.count = 0
async def on_state(self, state: WatchState) -> None:
self.count += 1
output = _Output()
session = WatchCoordinator(
_Manager(), # type: ignore
output,
player_hint=None,
config=TEST_CONFIG,
)
session._tracker = PositionTracker(
lambda _bus: asyncio.sleep(0, result=0),
TEST_CONFIG,
)
bus_name = "org.mpris.MediaPlayer2.spotify"
track = TrackMeta(title="Song", artist="Artist")
session._model.active_player = bus_name
session._player_monitor.players = {
bus_name: PlayerState(bus_name=bus_name, status="Playing", track=track)
}
session._model.set_lyrics(LRCData("[00:01.00]a\n[00:03.00]b"))
session._model.status = "ok"
await session._tracker.set_active_player(
bus_name,
"Playing",
"Artist - Song",
)
await session._emit_state()
await session._emit_state()
await session._tracker.on_seeked(bus_name, 3200)
await session._emit_state()
assert output.count == 2
asyncio.run(_run())
def test_session_emits_when_crossing_first_timestamp() -> None:
async def _run() -> None:
class _Manager:
def fetch_for_track(self, *_args, **_kwargs):
return None
class _Output(BaseOutput):
def __init__(self):
self.count = 0
async def on_state(self, state: WatchState) -> None:
self.count += 1
output = _Output()
session = WatchCoordinator(
_Manager(), # type: ignore
output,
player_hint=None,
config=TEST_CONFIG,
)
session._tracker = PositionTracker(
lambda _bus: asyncio.sleep(0, result=0),
TEST_CONFIG,
)
bus_name = "org.mpris.MediaPlayer2.spotify"
track = TrackMeta(title="Song", artist="Artist")
session._model.active_player = bus_name
session._player_monitor.players = {
bus_name: PlayerState(bus_name=bus_name, status="Playing", track=track)
}
session._model.set_lyrics(LRCData("[00:02.00]a\n[00:03.00]b"))
session._model.status = "ok"
await session._tracker.set_active_player(
bus_name,
"Playing",
"Artist - Song",
)
await session._emit_state()
await session._tracker.on_seeked(bus_name, 2500)
await session._emit_state()
assert output.count == 2
asyncio.run(_run())
Generated
+33 -44
View File
@@ -43,7 +43,7 @@ wheels = [
[[package]] [[package]]
name = "cyclopts" name = "cyclopts"
version = "4.10.1" version = "4.10.2"
source = { registry = "https://pypi.org/simple" } source = { registry = "https://pypi.org/simple" }
dependencies = [ dependencies = [
{ name = "attrs" }, { name = "attrs" },
@@ -51,9 +51,9 @@ dependencies = [
{ name = "rich" }, { name = "rich" },
{ name = "rich-rst" }, { name = "rich-rst" },
] ]
sdist = { url = "https://files.pythonhosted.org/packages/6c/c4/2ce2ca1451487dc7d59f09334c3fa1182c46cfcf0a2d5f19f9b26d53ac74/cyclopts-4.10.1.tar.gz", hash = "sha256:ad4e4bb90576412d32276b14a76f55d43353753d16217f2c3cd5bdceba7f15a0", size = 166623, upload-time = "2026-03-23T14:43:01.098Z" } sdist = { url = "https://files.pythonhosted.org/packages/66/2c/fced34890f6e5a93a4b7afb2c71e8eee2a0719fb26193a0abf159ecb714d/cyclopts-4.10.2.tar.gz", hash = "sha256:d7b950457ef2563596d56331f80cbbbf86a2772535fb8b315c4f03bc7e6127f1", size = 166664, upload-time = "2026-04-08T23:57:45.805Z" }
wheels = [ wheels = [
{ url = "https://files.pythonhosted.org/packages/8a/0b/2261922126b2e50c601fe22d7ff5194e0a4d50e654836260c0665e24d862/cyclopts-4.10.1-py3-none-any.whl", hash = "sha256:35f37257139380a386d9fe4475e1e7c87ca7795765ef4f31abba579fcfcb6ecd", size = 204331, upload-time = "2026-03-23T14:43:02.625Z" }, { url = "https://files.pythonhosted.org/packages/b4/bd/05055d8360cef0757d79367157f3b15c0a0715e81e08f86a04018ec045f0/cyclopts-4.10.2-py3-none-any.whl", hash = "sha256:a1f2d6f8f7afac9456b48f75a40b36658778ddc9c6d406b520d017ae32c990fe", size = 204314, upload-time = "2026-04-08T23:57:46.969Z" },
] ]
[[package]] [[package]]
@@ -153,7 +153,7 @@ wheels = [
[[package]] [[package]]
name = "lrx-cli" name = "lrx-cli"
version = "0.6.4" version = "0.6.5"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "cyclopts" }, { name = "cyclopts" },
@@ -162,7 +162,6 @@ dependencies = [
{ name = "loguru" }, { name = "loguru" },
{ name = "mutagen" }, { name = "mutagen" },
{ name = "platformdirs" }, { name = "platformdirs" },
{ name = "python-dotenv" },
] ]
[package.dev-dependencies] [package.dev-dependencies]
@@ -178,8 +177,7 @@ requires-dist = [
{ name = "httpx", specifier = ">=0.28.1" }, { name = "httpx", specifier = ">=0.28.1" },
{ name = "loguru", specifier = ">=0.7.3" }, { name = "loguru", specifier = ">=0.7.3" },
{ name = "mutagen", specifier = ">=1.47.0" }, { name = "mutagen", specifier = ">=1.47.0" },
{ name = "platformdirs", specifier = ">=4.9.4" }, { name = "platformdirs", specifier = ">=4.9.6" },
{ name = "python-dotenv", specifier = ">=1.2.2" },
] ]
[package.metadata.requires-dev] [package.metadata.requires-dev]
@@ -229,11 +227,11 @@ wheels = [
[[package]] [[package]]
name = "platformdirs" name = "platformdirs"
version = "4.9.4" version = "4.9.6"
source = { registry = "https://pypi.org/simple" } source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/19/56/8d4c30c8a1d07013911a8fdbd8f89440ef9f08d07a1b50ab8ca8be5a20f9/platformdirs-4.9.4.tar.gz", hash = "sha256:1ec356301b7dc906d83f371c8f487070e99d3ccf9e501686456394622a01a934", size = 28737, upload-time = "2026-03-05T18:34:13.271Z" } sdist = { url = "https://files.pythonhosted.org/packages/9f/4a/0883b8e3802965322523f0b200ecf33d31f10991d0401162f4b23c698b42/platformdirs-4.9.6.tar.gz", hash = "sha256:3bfa75b0ad0db84096ae777218481852c0ebc6c727b3168c1b9e0118e458cf0a", size = 29400, upload-time = "2026-04-09T00:04:10.812Z" }
wheels = [ wheels = [
{ url = "https://files.pythonhosted.org/packages/63/d7/97f7e3a6abb67d8080dd406fd4df842c2be0efaf712d1c899c32a075027c/platformdirs-4.9.4-py3-none-any.whl", hash = "sha256:68a9a4619a666ea6439f2ff250c12a853cd1cbd5158d258bd824a7df6be2f868", size = 21216, upload-time = "2026-03-05T18:34:12.172Z" }, { url = "https://files.pythonhosted.org/packages/75/a6/a0a304dc33b49145b21f4808d763822111e67d1c3a32b524a1baf947b6e1/platformdirs-4.9.6-py3-none-any.whl", hash = "sha256:e61adb1d5e5cb3441b4b7710bea7e4c12250ca49439228cc1021c00dcfac0917", size = 21348, upload-time = "2026-04-09T00:04:09.463Z" },
] ]
[[package]] [[package]]
@@ -247,16 +245,16 @@ wheels = [
[[package]] [[package]]
name = "pygments" name = "pygments"
version = "2.19.2" version = "2.20.0"
source = { registry = "https://pypi.org/simple" } source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/b0/77/a5b8c569bf593b0140bde72ea885a803b82086995367bf2037de0159d924/pygments-2.19.2.tar.gz", hash = "sha256:636cb2477cec7f8952536970bc533bc43743542f70392ae026374600add5b887", size = 4968631, upload-time = "2025-06-21T13:39:12.283Z" } sdist = { url = "https://files.pythonhosted.org/packages/c3/b2/bc9c9196916376152d655522fdcebac55e66de6603a76a02bca1b6414f6c/pygments-2.20.0.tar.gz", hash = "sha256:6757cd03768053ff99f3039c1a36d6c0aa0b263438fcab17520b30a303a82b5f", size = 4955991, upload-time = "2026-03-29T13:29:33.898Z" }
wheels = [ wheels = [
{ url = "https://files.pythonhosted.org/packages/c7/21/705964c7812476f378728bdf590ca4b771ec72385c533964653c68e86bdc/pygments-2.19.2-py3-none-any.whl", hash = "sha256:86540386c03d588bb81d44bc3928634ff26449851e99741617ecb9037ee5ec0b", size = 1225217, upload-time = "2025-06-21T13:39:07.939Z" }, { url = "https://files.pythonhosted.org/packages/f4/7e/a72dd26f3b0f4f2bf1dd8923c85f7ceb43172af56d63c7383eb62b332364/pygments-2.20.0-py3-none-any.whl", hash = "sha256:81a9e26dd42fd28a23a2d169d86d7ac03b46e2f8b59ed4698fb4785f946d0176", size = 1231151, upload-time = "2026-03-29T13:29:30.038Z" },
] ]
[[package]] [[package]]
name = "pytest" name = "pytest"
version = "9.0.2" version = "9.0.3"
source = { registry = "https://pypi.org/simple" } source = { registry = "https://pypi.org/simple" }
dependencies = [ dependencies = [
{ name = "colorama", marker = "sys_platform == 'win32'" }, { name = "colorama", marker = "sys_platform == 'win32'" },
@@ -265,18 +263,9 @@ dependencies = [
{ name = "pluggy" }, { name = "pluggy" },
{ name = "pygments" }, { name = "pygments" },
] ]
sdist = { url = "https://files.pythonhosted.org/packages/d1/db/7ef3487e0fb0049ddb5ce41d3a49c235bf9ad299b6a25d5780a89f19230f/pytest-9.0.2.tar.gz", hash = "sha256:75186651a92bd89611d1d9fc20f0b4345fd827c41ccd5c299a868a05d70edf11", size = 1568901, upload-time = "2025-12-06T21:30:51.014Z" } sdist = { url = "https://files.pythonhosted.org/packages/7d/0d/549bd94f1a0a402dc8cf64563a117c0f3765662e2e668477624baeec44d5/pytest-9.0.3.tar.gz", hash = "sha256:b86ada508af81d19edeb213c681b1d48246c1a91d304c6c81a427674c17eb91c", size = 1572165, upload-time = "2026-04-07T17:16:18.027Z" }
wheels = [ wheels = [
{ url = "https://files.pythonhosted.org/packages/3b/ab/b3226f0bd7cdcf710fbede2b3548584366da3b19b5021e74f5bde2a8fa3f/pytest-9.0.2-py3-none-any.whl", hash = "sha256:711ffd45bf766d5264d487b917733b453d917afd2b0ad65223959f59089f875b", size = 374801, upload-time = "2025-12-06T21:30:49.154Z" }, { url = "https://files.pythonhosted.org/packages/d4/24/a372aaf5c9b7208e7112038812994107bc65a84cd00e0354a88c2c77a617/pytest-9.0.3-py3-none-any.whl", hash = "sha256:2c5efc453d45394fdd706ade797c0a81091eccd1d6e4bccfcd476e2b8e0ab5d9", size = 375249, upload-time = "2026-04-07T17:16:16.13Z" },
]
[[package]]
name = "python-dotenv"
version = "1.2.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/82/ed/0301aeeac3e5353ef3d94b6ec08bbcabd04a72018415dcb29e588514bba8/python_dotenv-1.2.2.tar.gz", hash = "sha256:2c371a91fbd7ba082c2c1dc1f8bf89ca22564a087c2c287cd9b662adde799cf3", size = 50135, upload-time = "2026-03-01T16:00:26.196Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/0b/d7/1959b9648791274998a9c3526f6d0ec8fd2233e4d4acce81bbae76b44b2a/python_dotenv-1.2.2-py3-none-any.whl", hash = "sha256:1d8214789a24de455a8b8bd8ae6fe3c6b69a5e3d64aa8a8e5d68e694bbcb285a", size = 22101, upload-time = "2026-03-01T16:00:25.09Z" },
] ]
[[package]] [[package]]
@@ -307,27 +296,27 @@ wheels = [
[[package]] [[package]]
name = "ruff" name = "ruff"
version = "0.15.8" version = "0.15.10"
source = { registry = "https://pypi.org/simple" } source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/14/b0/73cf7550861e2b4824950b8b52eebdcc5adc792a00c514406556c5b80817/ruff-0.15.8.tar.gz", hash = "sha256:995f11f63597ee362130d1d5a327a87cb6f3f5eae3094c620bcc632329a4d26e", size = 4610921, upload-time = "2026-03-26T18:39:38.675Z" } sdist = { url = "https://files.pythonhosted.org/packages/e7/d9/aa3f7d59a10ef6b14fe3431706f854dbf03c5976be614a9796d36326810c/ruff-0.15.10.tar.gz", hash = "sha256:d1f86e67ebfdef88e00faefa1552b5e510e1d35f3be7d423dc7e84e63788c94e", size = 4631728, upload-time = "2026-04-09T14:06:09.884Z" }
wheels = [ wheels = [
{ url = "https://files.pythonhosted.org/packages/4a/92/c445b0cd6da6e7ae51e954939cb69f97e008dbe750cfca89b8cedc081be7/ruff-0.15.8-py3-none-linux_armv6l.whl", hash = "sha256:cbe05adeba76d58162762d6b239c9056f1a15a55bd4b346cfd21e26cd6ad7bc7", size = 10527394, upload-time = "2026-03-26T18:39:41.566Z" }, { url = "https://files.pythonhosted.org/packages/eb/00/a1c2fdc9939b2c03691edbda290afcd297f1f389196172826b03d6b6a595/ruff-0.15.10-py3-none-linux_armv6l.whl", hash = "sha256:0744e31482f8f7d0d10a11fcbf897af272fefdfcb10f5af907b18c2813ff4d5f", size = 10563362, upload-time = "2026-04-09T14:06:21.189Z" },
{ url = "https://files.pythonhosted.org/packages/eb/92/f1c662784d149ad1414cae450b082cf736430c12ca78367f20f5ed569d65/ruff-0.15.8-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:d3e3d0b6ba8dca1b7ef9ab80a28e840a20070c4b62e56d675c24f366ef330570", size = 10905693, upload-time = "2026-03-26T18:39:30.364Z" }, { url = "https://files.pythonhosted.org/packages/5c/15/006990029aea0bebe9d33c73c3e28c80c391ebdba408d1b08496f00d422d/ruff-0.15.10-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:b1e7c16ea0ff5a53b7c2df52d947e685973049be1cdfe2b59a9c43601897b22e", size = 10951122, upload-time = "2026-04-09T14:06:02.236Z" },
{ url = "https://files.pythonhosted.org/packages/ca/f2/7a631a8af6d88bcef997eb1bf87cc3da158294c57044aafd3e17030613de/ruff-0.15.8-py3-none-macosx_11_0_arm64.whl", hash = "sha256:6ee3ae5c65a42f273f126686353f2e08ff29927b7b7e203b711514370d500de3", size = 10323044, upload-time = "2026-03-26T18:39:33.37Z" }, { url = "https://files.pythonhosted.org/packages/f2/c0/4ac978fe874d0618c7da647862afe697b281c2806f13ce904ad652fa87e4/ruff-0.15.10-py3-none-macosx_11_0_arm64.whl", hash = "sha256:93cc06a19e5155b4441dd72808fdf84290d84ad8a39ca3b0f994363ade4cebb1", size = 10314005, upload-time = "2026-04-09T14:06:00.026Z" },
{ url = "https://files.pythonhosted.org/packages/67/18/1bf38e20914a05e72ef3b9569b1d5c70a7ef26cd188d69e9ca8ef588d5bf/ruff-0.15.8-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:fdce027ada77baa448077ccc6ebb2fa9c3c62fd110d8659d601cf2f475858d94", size = 10629135, upload-time = "2026-03-26T18:39:44.142Z" }, { url = "https://files.pythonhosted.org/packages/da/73/c209138a5c98c0d321266372fc4e33ad43d506d7e5dd817dd89b60a8548f/ruff-0.15.10-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:83e1dd04312997c99ea6965df66a14fb4f03ba978564574ffc68b0d61fd3989e", size = 10643450, upload-time = "2026-04-09T14:05:42.137Z" },
{ url = "https://files.pythonhosted.org/packages/d2/e9/138c150ff9af60556121623d41aba18b7b57d95ac032e177b6a53789d279/ruff-0.15.8-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:12e617fc01a95e5821648a6df341d80456bd627bfab8a829f7cfc26a14a4b4a3", size = 10348041, upload-time = "2026-03-26T18:39:52.178Z" }, { url = "https://files.pythonhosted.org/packages/ec/76/0deec355d8ec10709653635b1f90856735302cb8e149acfdf6f82a5feb70/ruff-0.15.10-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:8154d43684e4333360fedd11aaa40b1b08a4e37d8ffa9d95fee6fa5b37b6fab1", size = 10379597, upload-time = "2026-04-09T14:05:49.984Z" },
{ url = "https://files.pythonhosted.org/packages/02/f1/5bfb9298d9c323f842c5ddeb85f1f10ef51516ac7a34ba446c9347d898df/ruff-0.15.8-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:432701303b26416d22ba696c39f2c6f12499b89093b61360abc34bcc9bf07762", size = 11121987, upload-time = "2026-03-26T18:39:55.195Z" }, { url = "https://files.pythonhosted.org/packages/dc/be/86bba8fc8798c081e28a4b3bb6d143ccad3fd5f6f024f02002b8f08a9fa3/ruff-0.15.10-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:8ab88715f3a6deb6bde6c227f3a123410bec7b855c3ae331b4c006189e895cef", size = 11146645, upload-time = "2026-04-09T14:06:12.246Z" },
{ url = "https://files.pythonhosted.org/packages/10/11/6da2e538704e753c04e8d86b1fc55712fdbdcc266af1a1ece7a51fff0d10/ruff-0.15.8-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:d910ae974b7a06a33a057cb87d2a10792a3b2b3b35e33d2699fdf63ec8f6b17a", size = 11951057, upload-time = "2026-03-26T18:39:19.18Z" }, { url = "https://files.pythonhosted.org/packages/a8/89/140025e65911b281c57be1d385ba1d932c2366ca88ae6663685aed8d4881/ruff-0.15.10-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:a768ff5969b4f44c349d48edf4ab4f91eddb27fd9d77799598e130fb628aa158", size = 12030289, upload-time = "2026-04-09T14:06:04.776Z" },
{ url = "https://files.pythonhosted.org/packages/83/f0/c9208c5fd5101bf87002fed774ff25a96eea313d305f1e5d5744698dc314/ruff-0.15.8-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:2033f963c43949d51e6fdccd3946633c6b37c484f5f98c3035f49c27395a8ab8", size = 11464613, upload-time = "2026-03-26T18:40:06.301Z" }, { url = "https://files.pythonhosted.org/packages/88/de/ddacca9545a5e01332567db01d44bd8cf725f2db3b3d61a80550b48308ea/ruff-0.15.10-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:0ee3ef42dab7078bda5ff6a1bcba8539e9857deb447132ad5566a038674540d0", size = 11496266, upload-time = "2026-04-09T14:05:55.485Z" },
{ url = "https://files.pythonhosted.org/packages/f8/22/d7f2fabdba4fae9f3b570e5605d5eb4500dcb7b770d3217dca4428484b17/ruff-0.15.8-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0f29b989a55572fb885b77464cf24af05500806ab4edf9a0fd8977f9759d85b1", size = 11257557, upload-time = "2026-03-26T18:39:57.972Z" }, { url = "https://files.pythonhosted.org/packages/bc/bb/7ddb00a83760ff4a83c4e2fc231fd63937cc7317c10c82f583302e0f6586/ruff-0.15.10-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:51cb8cc943e891ba99989dd92d61e29b1d231e14811db9be6440ecf25d5c1609", size = 11256418, upload-time = "2026-04-09T14:05:57.69Z" },
{ url = "https://files.pythonhosted.org/packages/71/8c/382a9620038cf6906446b23ce8632ab8c0811b8f9d3e764f58bedd0c9a6f/ruff-0.15.8-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:ac51d486bf457cdc985a412fb1801b2dfd1bd8838372fc55de64b1510eff4bec", size = 11169440, upload-time = "2026-03-26T18:39:22.205Z" }, { url = "https://files.pythonhosted.org/packages/dc/8d/55de0d35aacf6cd50b6ee91ee0f291672080021896543776f4170fc5c454/ruff-0.15.10-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:e59c9bdc056a320fb9ea1700a8d591718b8faf78af065484e801258d3a76bc3f", size = 11288416, upload-time = "2026-04-09T14:05:44.695Z" },
{ url = "https://files.pythonhosted.org/packages/4d/0d/0994c802a7eaaf99380085e4e40c845f8e32a562e20a38ec06174b52ef24/ruff-0.15.8-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:c9861eb959edab053c10ad62c278835ee69ca527b6dcd72b47d5c1e5648964f6", size = 10605963, upload-time = "2026-03-26T18:39:46.682Z" }, { url = "https://files.pythonhosted.org/packages/68/cf/9438b1a27426ec46a80e0a718093c7f958ef72f43eb3111862949ead3cc1/ruff-0.15.10-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:136c00ca2f47b0018b073f28cb5c1506642a830ea941a60354b0e8bc8076b151", size = 10621053, upload-time = "2026-04-09T14:05:52.782Z" },
{ url = "https://files.pythonhosted.org/packages/19/aa/d624b86f5b0aad7cef6bbf9cd47a6a02dfdc4f72c92a337d724e39c9d14b/ruff-0.15.8-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:8d9a5b8ea13f26ae90838afc33f91b547e61b794865374f114f349e9036835fb", size = 10357484, upload-time = "2026-03-26T18:39:49.176Z" }, { url = "https://files.pythonhosted.org/packages/4c/50/e29be6e2c135e9cd4cb15fbade49d6a2717e009dff3766dd080fcb82e251/ruff-0.15.10-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:8b80a2f3c9c8a950d6237f2ca12b206bccff626139be9fa005f14feb881a1ae8", size = 10378302, upload-time = "2026-04-09T14:06:14.361Z" },
{ url = "https://files.pythonhosted.org/packages/35/c3/e0b7835d23001f7d999f3895c6b569927c4d39912286897f625736e1fd04/ruff-0.15.8-py3-none-musllinux_1_2_i686.whl", hash = "sha256:c2a33a529fb3cbc23a7124b5c6ff121e4d6228029cba374777bd7649cc8598b8", size = 10830426, upload-time = "2026-03-26T18:40:03.702Z" }, { url = "https://files.pythonhosted.org/packages/18/2f/e0b36a6f99c51bb89f3a30239bc7bf97e87a37ae80aa2d6542d6e5150364/ruff-0.15.10-py3-none-musllinux_1_2_i686.whl", hash = "sha256:e3e53c588164dc025b671c9df2462429d60357ea91af7e92e9d56c565a9f1b07", size = 10850074, upload-time = "2026-04-09T14:06:16.581Z" },
{ url = "https://files.pythonhosted.org/packages/f0/51/ab20b322f637b369383adc341d761eaaa0f0203d6b9a7421cd6e783d81b9/ruff-0.15.8-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:75e5cd06b1cf3f47a3996cfc999226b19aa92e7cce682dcd62f80d7035f98f49", size = 11345125, upload-time = "2026-03-26T18:39:27.799Z" }, { url = "https://files.pythonhosted.org/packages/11/08/874da392558ce087a0f9b709dc6ec0d60cbc694c1c772dab8d5f31efe8cb/ruff-0.15.10-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:b0c52744cf9f143a393e284125d2576140b68264a93c6716464e129a3e9adb48", size = 11358051, upload-time = "2026-04-09T14:06:18.948Z" },
{ url = "https://files.pythonhosted.org/packages/37/e6/90b2b33419f59d0f2c4c8a48a4b74b460709a557e8e0064cf33ad894f983/ruff-0.15.8-py3-none-win32.whl", hash = "sha256:bc1f0a51254ba21767bfa9a8b5013ca8149dcf38092e6a9eb704d876de94dc34", size = 10571959, upload-time = "2026-03-26T18:39:36.117Z" }, { url = "https://files.pythonhosted.org/packages/e4/46/602938f030adfa043e67112b73821024dc79f3ab4df5474c25fa4c1d2d14/ruff-0.15.10-py3-none-win32.whl", hash = "sha256:d4272e87e801e9a27a2e8df7b21011c909d9ddd82f4f3281d269b6ba19789ca5", size = 10588964, upload-time = "2026-04-09T14:06:07.14Z" },
{ url = "https://files.pythonhosted.org/packages/1f/a2/ef467cb77099062317154c63f234b8a7baf7cb690b99af760c5b68b9ee7f/ruff-0.15.8-py3-none-win_amd64.whl", hash = "sha256:04f79eff02a72db209d47d665ba7ebcad609d8918a134f86cb13dd132159fc89", size = 11743893, upload-time = "2026-03-26T18:39:25.01Z" }, { url = "https://files.pythonhosted.org/packages/25/b6/261225b875d7a13b33a6d02508c39c28450b2041bb01d0f7f1a83d569512/ruff-0.15.10-py3-none-win_amd64.whl", hash = "sha256:28cb32d53203242d403d819fd6983152489b12e4a3ae44993543d6fe62ab42ed", size = 11745044, upload-time = "2026-04-09T14:05:39.473Z" },
{ url = "https://files.pythonhosted.org/packages/15/e2/77be4fff062fa78d9b2a4dea85d14785dac5f1d0c1fb58ed52331f0ebe28/ruff-0.15.8-py3-none-win_arm64.whl", hash = "sha256:cf891fa8e3bb430c0e7fac93851a5978fc99c8fa2c053b57b118972866f8e5f2", size = 11048175, upload-time = "2026-03-26T18:40:01.06Z" }, { url = "https://files.pythonhosted.org/packages/58/ed/dea90a65b7d9e69888890fb14c90d7f51bf0c1e82ad800aeb0160e4bacfd/ruff-0.15.10-py3-none-win_arm64.whl", hash = "sha256:601d1610a9e1f1c2165a4f561eeaa2e2ea1e97f3287c5aa258d3dab8b57c6188", size = 11035607, upload-time = "2026-04-09T14:05:47.593Z" },
] ]
[[package]] [[package]]