From 0eabea6a50413f0759497ca533954e9f6408651d Mon Sep 17 00:00:00 2001 From: Uyanide Date: Fri, 3 Apr 2026 18:39:31 +0200 Subject: [PATCH] refactor: async parallel fetching --- lrx_cli/core.py | 225 +++++++++++++++++------------- lrx_cli/fetchers/__init__.py | 39 +++++- lrx_cli/fetchers/base.py | 2 +- lrx_cli/fetchers/cache_search.py | 2 +- lrx_cli/fetchers/local.py | 2 +- lrx_cli/fetchers/lrclib.py | 9 +- lrx_cli/fetchers/lrclib_search.py | 7 +- lrx_cli/fetchers/netease.py | 30 ++-- lrx_cli/fetchers/qqmusic.py | 28 ++-- lrx_cli/fetchers/spotify.py | 190 +++++++++++-------------- pyproject.toml | 2 +- tests/test_pipeline.py | 170 ++++++++++++++++++++++ uv.lock | 2 +- 13 files changed, 452 insertions(+), 256 deletions(-) create mode 100644 tests/test_pipeline.py diff --git a/lrx_cli/core.py b/lrx_cli/core.py index d8559de..36aacbf 100644 --- a/lrx_cli/core.py +++ b/lrx_cli/core.py @@ -4,18 +4,11 @@ Date: 2026-03-25 11:09:53 Description: Core orchestrator — coordinates fetchers with cache-aware fallback """ -""" -Fetch pipeline: - 1. Check cache for each source in the fallback sequence - 2. For sources without a valid cache hit, call the fetcher - 3. Cache every result (success, not-found, or error) per source - 4. 
Return the best result by confidence (highest wins) -""" - +import asyncio from typing import Optional from loguru import logger -from .fetchers import FetcherMethodType, create_fetchers +from .fetchers import FetcherMethodType, build_plan, create_fetchers from .fetchers.base import BaseFetcher from .cache import CacheEngine from .lrc import LRCData @@ -50,6 +43,19 @@ def _is_better(new: LyricResult, old: LyricResult) -> bool: ) +def _normalize_result(result: LyricResult) -> LyricResult: + """Normalize unsynced lyrics before returning.""" + if result.status == CacheStatus.SUCCESS_UNSYNCED and result.lyrics: + return LyricResult( + status=result.status, + lyrics=result.lyrics.normalize_unsynced(), + source=result.source, + ttl=result.ttl, + confidence=result.confidence, + ) + return result + + class LrcManager: """Main entry point for fetching lyrics with caching.""" @@ -57,54 +63,18 @@ class LrcManager: self.cache = CacheEngine(db_path=db_path) self.fetchers = create_fetchers(self.cache) - def _build_sequence( - self, track: TrackMeta, force_method: Optional[FetcherMethodType] = None - ) -> list[BaseFetcher]: - """Determine the ordered list of fetchers to try.""" - if force_method: - if force_method not in self.fetchers: - logger.error(f"Unknown method: {force_method}") - return [] - return [self.fetchers[force_method]] - - sequence: list[BaseFetcher] = [] - for method in self.fetchers.keys(): - if self.fetchers[method].is_available(track): - sequence.append(self.fetchers[method]) - - logger.debug(f"Fallback sequence: {[f.source_name for f in sequence]}") - return sequence - - def fetch_for_track( + async def _run_group( self, + group: list[BaseFetcher], track: TrackMeta, - force_method: Optional[FetcherMethodType] = None, - bypass_cache: bool = False, - ) -> Optional[LyricResult]: - """Fetch lyrics for *track* using the fallback pipeline. 
+ bypass_cache: bool, + ) -> list[tuple[str, LyricResult]]: + """Run one group: cache-check first, then parallel-fetch uncached. Returns (source, result) pairs.""" + cached_results: list[tuple[str, LyricResult]] = [] + need_fetch: list[BaseFetcher] = [] - Each source is checked against the cache independently: - - Cache hit with synced lyrics → return immediately - - Cache hit with negative status (NOT_FOUND / NETWORK_ERROR) → skip source - - Cache miss or unsynced → call fetcher, then cache the result - - After all sources are tried, returns the best result found - (highest confidence wins). - """ - track = enrich_track(track) - logger.info(f"Fetching lyrics for: {track.display_name()}") - - sequence = self._build_sequence(track, force_method) - if not sequence: - return None - - # Best result seen so far (highest confidence wins) - best_result: Optional[LyricResult] = None - - for fetcher in sequence: + for fetcher in group: source = fetcher.source_name - - # Cache check (skip for fetchers that handle their own caching) if not bypass_cache and not fetcher.self_cached: cached = self.cache.get(track, source) if cached: @@ -116,75 +86,130 @@ class LrcManager: f"[{source}] cache hit: {cached.status.value}, skipping" ) continue - - # Positive cache hit — apply the same confidence evaluation - # as fresh fetches so that low-confidence cached results - # don't block better results from later fetchers. 
is_trusted = cached.confidence >= HIGH_CONFIDENCE logger.info( f"[{source}] cache hit: {cached.status.value}" f" (confidence={cached.confidence:.0f})" ) + cached_results.append((source, cached)) + # Return immediately on trusted synced cache hit if cached.status == CacheStatus.SUCCESS_SYNCED and is_trusted: - return cached - if best_result is None or _is_better(cached, best_result): - best_result = cached + return cached_results continue elif not fetcher.self_cached: logger.debug(f"[{source}] cache bypassed") + need_fetch.append(fetcher) - # Fetch - logger.debug(f"[{source}] calling fetcher...") - result = fetcher.fetch(track, bypass_cache=bypass_cache) + if need_fetch: + task_map: dict[asyncio.Task, BaseFetcher] = { + asyncio.create_task(f.fetch(track, bypass_cache=bypass_cache)): f + for f in need_fetch + } + pending = set(task_map) - if not result: - logger.debug(f"[{source}] returned None (no result)") - continue - - # Cache the result (skip for self-cached fetchers) - if not fetcher.self_cached and not bypass_cache: - ttl = result.ttl or _STATUS_TTL.get(result.status, TTL_NOT_FOUND) - self.cache.set(track, source, result, ttl_seconds=ttl) - - # Evaluate result - if result.status in ( - CacheStatus.SUCCESS_SYNCED, - CacheStatus.SUCCESS_UNSYNCED, - ): - is_trusted = result.confidence >= HIGH_CONFIDENCE - logger.info( - f"[{source}] got {result.status.value} lyrics" - f" (confidence={result.confidence:.0f})" + while pending: + done, pending = await asyncio.wait( + pending, return_when=asyncio.FIRST_COMPLETED ) + found_trusted = False + for task in done: + fetcher = task_map[task] + source = fetcher.source_name + try: + result = task.result() + except Exception as e: + logger.error(f"[{source}] fetch raised: {e}") + continue + + if result is None: + logger.debug(f"[{source}] returned None") + continue + + if not fetcher.self_cached and not bypass_cache: + ttl = result.ttl or _STATUS_TTL.get( + result.status, TTL_NOT_FOUND + ) + self.cache.set(track, source, 
result, ttl_seconds=ttl) + + if result.status in ( + CacheStatus.SUCCESS_SYNCED, + CacheStatus.SUCCESS_UNSYNCED, + ): + logger.info( + f"[{source}] got {result.status.value} lyrics" + f" (confidence={result.confidence:.0f})" + ) + cached_results.append((source, result)) + + if ( + result.status == CacheStatus.SUCCESS_SYNCED + and result.confidence >= HIGH_CONFIDENCE + ): + found_trusted = True + + if found_trusted: + for t in pending: + t.cancel() + await asyncio.gather(*pending, return_exceptions=True) + break + + return cached_results + + async def _fetch_for_track( + self, + track: TrackMeta, + force_method: Optional[FetcherMethodType], + bypass_cache: bool, + ) -> Optional[LyricResult]: + track = enrich_track(track) + logger.info(f"Fetching lyrics for: {track.display_name()}") + + plan = build_plan(self.fetchers, track, force_method) + if not plan: + return None + + best_result: Optional[LyricResult] = None + + for group in plan: + group_results = await self._run_group(group, track, bypass_cache) + + for source, result in group_results: + if result.status not in ( + CacheStatus.SUCCESS_SYNCED, + CacheStatus.SUCCESS_UNSYNCED, + ): + continue + + is_trusted = result.confidence >= HIGH_CONFIDENCE + # Trusted synced → return immediately if result.status == CacheStatus.SUCCESS_SYNCED and is_trusted: - return result - # Track best result by confidence + logger.info( + f"Returning {result.status.value} lyrics from {source}" + f" (confidence={result.confidence:.0f})" + ) + return _normalize_result(result) + if best_result is None or _is_better(result, best_result): best_result = result - # NOT_FOUND / NETWORK_ERROR: already cached, try next - - # Return best available if best_result: - if ( - best_result.status == CacheStatus.SUCCESS_UNSYNCED - and best_result.lyrics - ): - best_result = LyricResult( - status=best_result.status, - lyrics=best_result.lyrics.normalize_unsynced(), - source=best_result.source, - ttl=best_result.ttl, - confidence=best_result.confidence, - ) 
logger.info( f"Returning {best_result.status.value} lyrics from {best_result.source}" ) - else: - logger.info(f"No lyrics found for {track.display_name()}") + return _normalize_result(best_result) - return best_result + logger.info(f"No lyrics found for {track.display_name()}") + return None + + def fetch_for_track( + self, + track: TrackMeta, + force_method: Optional[FetcherMethodType] = None, + bypass_cache: bool = False, + ) -> Optional[LyricResult]: + """Fetch lyrics for *track* using the group-based parallel pipeline.""" + return asyncio.run(self._fetch_for_track(track, force_method, bypass_cache)) def manual_insert( self, diff --git a/lrx_cli/fetchers/__init__.py b/lrx_cli/fetchers/__init__.py index 75377d6..53f0c29 100644 --- a/lrx_cli/fetchers/__init__.py +++ b/lrx_cli/fetchers/__init__.py @@ -4,7 +4,8 @@ Date: 2026-03-25 02:33:26 Description: Fetcher pipeline — registry and types """ -from typing import Literal +from typing import Literal, Optional +from loguru import logger from .base import BaseFetcher from .local import LocalFetcher @@ -15,6 +16,7 @@ from .lrclib_search import LrclibSearchFetcher from .netease import NeteaseFetcher from .qqmusic import QQMusicFetcher from ..cache import CacheEngine +from ..models import TrackMeta FetcherMethodType = Literal[ "local", @@ -26,6 +28,15 @@ FetcherMethodType = Literal[ "qqmusic", ] +# Fetchers within a group run in parallel; groups run sequentially. +# A group that produces any positive result stops the pipeline. +_FETCHER_GROUPS: list[list[FetcherMethodType]] = [ + ["local"], + ["cache-search"], + ["spotify", "lrclib"], + ["lrclib-search", "netease", "qqmusic"], +] + def create_fetchers(cache: CacheEngine) -> dict[FetcherMethodType, BaseFetcher]: """Instantiate all fetchers. 
Returns a dict keyed by source name.""" @@ -39,3 +50,29 @@ def create_fetchers(cache: CacheEngine) -> dict[FetcherMethodType, BaseFetcher]: "qqmusic": QQMusicFetcher(), } return fetchers + + +def build_plan( + fetchers: dict[FetcherMethodType, BaseFetcher], + track: TrackMeta, + force_method: Optional[FetcherMethodType] = None, +) -> list[list[BaseFetcher]]: + """Return the fetch plan as a list of groups (each group runs in parallel).""" + if force_method: + if force_method not in fetchers: + logger.error(f"Unknown method: {force_method}") + return [] + return [[fetchers[force_method]]] + + plan: list[list[BaseFetcher]] = [] + for group_methods in _FETCHER_GROUPS: + group = [ + fetchers[m] + for m in group_methods + if m in fetchers and fetchers[m].is_available(track) + ] + if group: + plan.append(group) + + logger.debug(f"Fetch plan: {[[f.source_name for f in g] for g in plan]}") + return plan diff --git a/lrx_cli/fetchers/base.py b/lrx_cli/fetchers/base.py index 2bf70af..4bd7501 100644 --- a/lrx_cli/fetchers/base.py +++ b/lrx_cli/fetchers/base.py @@ -28,7 +28,7 @@ class BaseFetcher(ABC): pass @abstractmethod - def fetch( + async def fetch( self, track: TrackMeta, bypass_cache: bool = False ) -> Optional[LyricResult]: """Fetch lyrics for the given track. 
Returns None if unable to fetch.""" diff --git a/lrx_cli/fetchers/cache_search.py b/lrx_cli/fetchers/cache_search.py index 119ab3a..c29d525 100644 --- a/lrx_cli/fetchers/cache_search.py +++ b/lrx_cli/fetchers/cache_search.py @@ -36,7 +36,7 @@ class CacheSearchFetcher(BaseFetcher): def is_available(self, track: TrackMeta) -> bool: return bool(track.title) - def fetch( + async def fetch( self, track: TrackMeta, bypass_cache: bool = False ) -> Optional[LyricResult]: if bypass_cache: diff --git a/lrx_cli/fetchers/local.py b/lrx_cli/fetchers/local.py index ea3e832..837aea8 100644 --- a/lrx_cli/fetchers/local.py +++ b/lrx_cli/fetchers/local.py @@ -28,7 +28,7 @@ class LocalFetcher(BaseFetcher): def is_available(self, track: TrackMeta) -> bool: return track.is_local - def fetch( + async def fetch( self, track: TrackMeta, bypass_cache: bool = False ) -> Optional[LyricResult]: """Attempt to read lyrics from local filesystem.""" diff --git a/lrx_cli/fetchers/lrclib.py b/lrx_cli/fetchers/lrclib.py index c236fe9..011c52c 100644 --- a/lrx_cli/fetchers/lrclib.py +++ b/lrx_cli/fetchers/lrclib.py @@ -34,7 +34,7 @@ class LrclibFetcher(BaseFetcher): def is_available(self, track: TrackMeta) -> bool: return track.is_complete - def fetch( + async def fetch( self, track: TrackMeta, bypass_cache: bool = False ) -> Optional[LyricResult]: """Fetch lyrics from LRCLIB. 
Requires complete metadata.""" @@ -48,13 +48,12 @@ class LrclibFetcher(BaseFetcher): "album_name": track.album, "duration": track.length / 1000.0 if track.length else 0, } - url = f"{LRCLIB_API_URL}?{urlencode(params)}" logger.info(f"LRCLIB: fetching lyrics for {track.display_name()}") try: - with httpx.Client(timeout=HTTP_TIMEOUT) as client: - resp = client.get(url, headers={"User-Agent": UA_LRX}) + async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client: + resp = await client.get(url, headers={"User-Agent": UA_LRX}) if resp.status_code == 404: logger.debug(f"LRCLIB: not found for {track.display_name()}") @@ -67,8 +66,6 @@ class LrclibFetcher(BaseFetcher): ) data = resp.json() - - # Validate response if not isinstance(data, dict): logger.error(f"LRCLIB: unexpected response type: {type(data).__name__}") return LyricResult( diff --git a/lrx_cli/fetchers/lrclib_search.py b/lrx_cli/fetchers/lrclib_search.py index a09820d..6cabb51 100644 --- a/lrx_cli/fetchers/lrclib_search.py +++ b/lrx_cli/fetchers/lrclib_search.py @@ -64,10 +64,9 @@ class LrclibSearchFetcher(BaseFetcher): return queries - def fetch( + async def fetch( self, track: TrackMeta, bypass_cache: bool = False ) -> Optional[LyricResult]: - """Search LRCLIB for lyrics. 
Requires at least a title.""" if not track.title: logger.debug("LRCLIB-search: skipped — no title") return None @@ -80,11 +79,11 @@ class LrclibSearchFetcher(BaseFetcher): had_error = False try: - with httpx.Client(timeout=HTTP_TIMEOUT) as client: + async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client: for params in queries: url = f"{LRCLIB_SEARCH_URL}?{urlencode(params)}" logger.debug(f"LRCLIB-search: query {params}") - resp = client.get(url, headers={"User-Agent": UA_LRX}) + resp = await client.get(url, headers={"User-Agent": UA_LRX}) if resp.status_code != 200: logger.error(f"LRCLIB-search: API returned {resp.status_code}") diff --git a/lrx_cli/fetchers/netease.py b/lrx_cli/fetchers/netease.py index d054a3f..adc182a 100644 --- a/lrx_cli/fetchers/netease.py +++ b/lrx_cli/fetchers/netease.py @@ -43,12 +43,9 @@ class NeteaseFetcher(BaseFetcher): def is_available(self, track: TrackMeta) -> bool: return bool(track.title) - def _search(self, track: TrackMeta, limit: int = 10) -> tuple[Optional[int], float]: - """Search Netease and return the best-matching song ID with confidence. - - When ``track.length`` is available, candidates are ranked by duration - difference and only accepted if within ``DURATION_TOLERANCE_MS``. 
- """ + async def _search( + self, track: TrackMeta, limit: int = 10 + ) -> tuple[Optional[int], float]: query = f"{track.artist or ''} {track.title or ''}".strip() if not query: return None, 0.0 @@ -56,8 +53,8 @@ class NeteaseFetcher(BaseFetcher): logger.debug(f"Netease: searching for '{query}' (limit={limit})") try: - with httpx.Client(timeout=HTTP_TIMEOUT) as client: - resp = client.post( + async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client: + resp = await client.post( NETEASE_SEARCH_URL, headers=_HEADERS, data={"s": query, "type": "1", "limit": str(limit), "offset": "0"}, @@ -65,7 +62,6 @@ class NeteaseFetcher(BaseFetcher): resp.raise_for_status() result = resp.json() - # Validate response if not isinstance(result, dict): logger.error( f"Netease: search returned non-dict: {type(result).__name__}" @@ -118,15 +114,14 @@ class NeteaseFetcher(BaseFetcher): logger.error(f"Netease: search failed: {e}") return None, 0.0 - def _get_lyric( + async def _get_lyric( self, song_id: int, confidence: float = 0.0 ) -> Optional[LyricResult]: - """Fetch lyrics for a given Netease song ID.""" logger.debug(f"Netease: fetching lyrics for song_id={song_id}") try: - with httpx.Client(timeout=HTTP_TIMEOUT) as client: - resp = client.post( + async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client: + resp = await client.post( NETEASE_LYRIC_URL, headers=_HEADERS, data={ @@ -144,7 +139,6 @@ class NeteaseFetcher(BaseFetcher): resp.raise_for_status() data = resp.json() - # Validate response if not isinstance(data, dict): logger.error( f"Netease: lyric response is not dict: {type(data).__name__}" @@ -165,7 +159,6 @@ class NeteaseFetcher(BaseFetcher): logger.debug(f"Netease: empty lyrics for song_id={song_id}") return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND) - # Determine sync status lrcdata = LRCData(lrc) status = lrcdata.detect_sync_status() logger.info( @@ -183,19 +176,18 @@ class NeteaseFetcher(BaseFetcher): logger.error(f"Netease: lyric fetch failed for 
song_id={song_id}: {e}") return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR) - def fetch( + async def fetch( self, track: TrackMeta, bypass_cache: bool = False ) -> Optional[LyricResult]: - """Search for the track and fetch its lyrics.""" query = f"{track.artist or ''} {track.title or ''}".strip() if not query: logger.debug("Netease: skipped — insufficient metadata") return None logger.info(f"Netease: fetching lyrics for {track.display_name()}") - song_id, confidence = self._search(track) + song_id, confidence = await self._search(track) if not song_id: logger.debug(f"Netease: no match found for {track.display_name()}") return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND) - return self._get_lyric(song_id, confidence=confidence) + return await self._get_lyric(song_id, confidence=confidence) diff --git a/lrx_cli/fetchers/qqmusic.py b/lrx_cli/fetchers/qqmusic.py index dfaef78..21dbe04 100644 --- a/lrx_cli/fetchers/qqmusic.py +++ b/lrx_cli/fetchers/qqmusic.py @@ -35,8 +35,9 @@ class QQMusicFetcher(BaseFetcher): def is_available(self, track: TrackMeta) -> bool: return bool(track.title) and bool(QQ_MUSIC_API_URL) - def _search(self, track: TrackMeta, limit: int = 10) -> tuple[Optional[str], float]: - """Search QQ Music and return the best-matching song MID with confidence.""" + async def _search( + self, track: TrackMeta, limit: int = 10 + ) -> tuple[Optional[str], float]: query = f"{track.artist or ''} {track.title or ''}".strip() if not query: return None, 0.0 @@ -44,8 +45,8 @@ class QQMusicFetcher(BaseFetcher): logger.debug(f"QQMusic: searching for '{query}' (limit={limit})") try: - with httpx.Client(timeout=HTTP_TIMEOUT) as client: - resp = client.get( + async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client: + resp = await client.get( f"{QQ_MUSIC_API_URL}/api/search", params={"keyword": query, "type": "song", "num": limit}, ) @@ -97,13 +98,14 @@ class QQMusicFetcher(BaseFetcher): logger.error(f"QQMusic: search failed: 
{e}") return None, 0.0 - def _get_lyric(self, mid: str, confidence: float = 0.0) -> Optional[LyricResult]: - """Fetch lyrics for a given QQ Music song MID.""" + async def _get_lyric( + self, mid: str, confidence: float = 0.0 + ) -> Optional[LyricResult]: logger.debug(f"QQMusic: fetching lyrics for mid={mid}") try: - with httpx.Client(timeout=HTTP_TIMEOUT) as client: - resp = client.get( + async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client: + resp = await client.get( f"{QQ_MUSIC_API_URL}/api/lyric", params={"mid": mid}, ) @@ -124,8 +126,7 @@ class QQMusicFetcher(BaseFetcher): lrcdata = LRCData(lrc) status = lrcdata.detect_sync_status() logger.info( - f"QQMusic: got {status.value} lyrics for mid={mid} " - f"({len(lrcdata)} lines)" + f"QQMusic: got {status.value} lyrics for mid={mid} ({len(lrcdata)} lines)" ) return LyricResult( status=status, @@ -138,10 +139,9 @@ class QQMusicFetcher(BaseFetcher): logger.error(f"QQMusic: lyric fetch failed for mid={mid}: {e}") return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR) - def fetch( + async def fetch( self, track: TrackMeta, bypass_cache: bool = False ) -> Optional[LyricResult]: - """Search for the track and fetch its lyrics.""" if not QQ_MUSIC_API_URL: logger.debug("QQMusic: skipped — QQ_MUSIC_API_URL not configured") return None @@ -152,9 +152,9 @@ class QQMusicFetcher(BaseFetcher): return None logger.info(f"QQMusic: fetching lyrics for {track.display_name()}") - mid, confidence = self._search(track) + mid, confidence = await self._search(track) if not mid: logger.debug(f"QQMusic: no match found for {track.display_name()}") return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND) - return self._get_lyric(mid, confidence=confidence) + return await self._get_lyric(mid, confidence=confidence) diff --git a/lrx_cli/fetchers/spotify.py b/lrx_cli/fetchers/spotify.py index 8175959..aedcfce 100644 --- a/lrx_cli/fetchers/spotify.py +++ b/lrx_cli/fetchers/spotify.py @@ -58,67 +58,6 @@ class 
SpotifyFetcher(BaseFetcher): def is_available(self, track: TrackMeta) -> bool: return bool(track.trackid) and bool(SPOTIFY_SP_DC) - # ─── Auth helpers ──────────────────────────────────────────────── - - def _get_server_time(self, client: httpx.Client) -> Optional[int]: - """Fetch Spotify's server timestamp (seconds since epoch).""" - try: - res = client.get(SPOTIFY_SERVER_TIME_URL, timeout=HTTP_TIMEOUT) - res.raise_for_status() - data = res.json() - if not isinstance(data, dict) or "serverTime" not in data: - logger.error(f"Spotify: unexpected server-time response: {data}") - return None - server_time = data["serverTime"] - logger.debug(f"Spotify: server time = {server_time}") - return server_time - except Exception as e: - logger.error(f"Spotify: failed to fetch server time: {e}") - return None - - def _get_secret(self, client: httpx.Client) -> Optional[Tuple[str, int]]: - """Fetch and decode the TOTP secret. Cached after first success. - - Response format: [{version: int, secret: str}, ...] - Each character in *secret* is XOR-decoded with ``(index % 33) + 9``. 
- """ - if self._cached_secret is not None: - logger.debug("Spotify: using cached TOTP secret") - return self._cached_secret - - try: - res = client.get(SPOTIFY_SECRET_URL, timeout=HTTP_TIMEOUT) - res.raise_for_status() - data = res.json() - - if not isinstance(data, list) or len(data) == 0: - logger.error( - f"Spotify: unexpected secrets response (type={type(data).__name__}, len={len(data) if isinstance(data, list) else '?'})" - ) - return None - - last = data[-1] - if "secret" not in last or "version" not in last: - logger.error(f"Spotify: malformed secret entry: {list(last.keys())}") - return None - - secret_raw = last["secret"] - version = last["version"] - - # XOR decode - parts = [] - for i, char in enumerate(secret_raw): - parts.append(str(ord(char) ^ ((i % 33) + 9))) - secret = "".join(parts) - - logger.debug(f"Spotify: decoded secret v{version} (len={len(secret)})") - self._cached_secret = (secret, version) - return self._cached_secret - - except Exception as e: - logger.error(f"Spotify: failed to fetch secret: {e}") - return None - @staticmethod def _generate_totp(server_time_s: int, secret: str) -> str: """Generate a 6-digit TOTP code compatible with Spotify's auth. @@ -169,26 +108,90 @@ class SpotifyFetcher(BaseFetcher): except Exception as e: logger.warning(f"Spotify: failed to write token cache: {e}") - def _get_token(self) -> Optional[str]: - """Obtain a Spotify access token. Cached in memory and on disk. + @staticmethod + def _format_lrc_line(start_ms: int, words: str) -> str: + """Format a single lyric line as LRC ``[mm:ss.cc]text``.""" + minutes = start_ms // 60000 + seconds = (start_ms // 1000) % 60 + centiseconds = round((start_ms % 1000) / 10.0) + return f"[{minutes:02d}:{seconds:02d}.{centiseconds:02.0f}]{words}" - Requires SP_DC cookie (set via SPOTIFY_SP_DC env var). - """ - # 1. 
Memory cache + @staticmethod + def _is_truly_synced(lines: list[dict]) -> bool: + """Check if lyrics are actually synced (not all timestamps zero).""" + for line in lines: + try: + ms = int(line.get("startTimeMs", "0")) + if ms > 0: + return True + except (ValueError, TypeError): + continue + return False + + async def _get_server_time(self, client: httpx.AsyncClient) -> Optional[int]: + try: + res = await client.get(SPOTIFY_SERVER_TIME_URL, timeout=HTTP_TIMEOUT) + res.raise_for_status() + data = res.json() + if not isinstance(data, dict) or "serverTime" not in data: + logger.error(f"Spotify: unexpected server-time response: {data}") + return None + server_time = data["serverTime"] + logger.debug(f"Spotify: server time = {server_time}") + return server_time + except Exception as e: + logger.error(f"Spotify: failed to fetch server time: {e}") + return None + + async def _get_secret(self, client: httpx.AsyncClient) -> Optional[Tuple[str, int]]: + if self._cached_secret is not None: + logger.debug("Spotify: using cached TOTP secret") + return self._cached_secret + + try: + res = await client.get(SPOTIFY_SECRET_URL, timeout=HTTP_TIMEOUT) + res.raise_for_status() + data = res.json() + + if not isinstance(data, list) or len(data) == 0: + logger.error( + f"Spotify: unexpected secrets response (type={type(data).__name__}, len={len(data) if isinstance(data, list) else '?'})" + ) + return None + + last = data[-1] + if "secret" not in last or "version" not in last: + logger.error(f"Spotify: malformed secret entry: {list(last.keys())}") + return None + + secret_raw = last["secret"] + version = last["version"] + + parts = [] + for i, char in enumerate(secret_raw): + parts.append(str(ord(char) ^ ((i % 33) + 9))) + secret = "".join(parts) + + logger.debug(f"Spotify: decoded secret v{version} (len={len(secret)})") + self._cached_secret = (secret, version) + return self._cached_secret + + except Exception as e: + logger.error(f"Spotify: failed to fetch secret: {e}") + return None + 
+ async def _get_token(self) -> Optional[str]: if self._cached_token and time.time() < self._token_expires_at - 30: logger.debug("Spotify: using in-memory cached token") return self._cached_token - # 2. Disk cache disk_token = self._load_cached_token() if disk_token and time.time() < self._token_expires_at - 30: return disk_token - # 3. Fetch new token if not SPOTIFY_SP_DC: logger.error( - "Spotify: SPOTIFY_SP_DC env var not set — " - "cannot authenticate with Spotify" + "Spotify: SPOTIFY_SP_DC env var not set — cannot authenticate with Spotify" ) return None @@ -199,12 +202,12 @@ class SpotifyFetcher(BaseFetcher): "Cookie": f"sp_dc={SPOTIFY_SP_DC}", } - with httpx.Client(headers=headers) as client: - server_time = self._get_server_time(client) + async with httpx.AsyncClient(headers=headers) as client: + server_time = await self._get_server_time(client) if server_time is None: return None - secret_data = self._get_secret(client) + secret_data = await self._get_secret(client) if secret_data is None: return None @@ -221,7 +224,9 @@ class SpotifyFetcher(BaseFetcher): } try: - res = client.get(SPOTIFY_TOKEN_URL, params=params, timeout=HTTP_TIMEOUT) + res = await client.get( + SPOTIFY_TOKEN_URL, params=params, timeout=HTTP_TIMEOUT + ) if res.status_code != 200: logger.error(f"Spotify: token request returned {res.status_code}") return None @@ -249,7 +254,6 @@ class SpotifyFetcher(BaseFetcher): self._token_expires_at = time.time() + 3600 self._cached_token = token - # Persist to disk (including anonymous tokens, same as Go ref) self._save_token(body) logger.debug("Spotify: obtained access token") return token @@ -258,39 +262,16 @@ class SpotifyFetcher(BaseFetcher): logger.error(f"Spotify: token request failed: {e}") return None - # ─── Lyrics ────────────────────────────────────────────────────── - - @staticmethod - def _format_lrc_line(start_ms: int, words: str) -> str: - """Format a single lyric line as LRC ``[mm:ss.cc]text``.""" - minutes = start_ms // 60000 - seconds 
= (start_ms // 1000) % 60 - centiseconds = round((start_ms % 1000) / 10.0) - return f"[{minutes:02d}:{seconds:02d}.{centiseconds:02.0f}]{words}" - - @staticmethod - def _is_truly_synced(lines: list[dict]) -> bool: - """Check if lyrics are actually synced (not all timestamps zero).""" - for line in lines: - try: - ms = int(line.get("startTimeMs", "0")) - if ms > 0: - return True - except (ValueError, TypeError): - continue - return False - - def fetch( + async def fetch( self, track: TrackMeta, bypass_cache: bool = False ) -> Optional[LyricResult]: - """Fetch lyrics for a Spotify track by its track ID.""" if not track.trackid: logger.debug("Spotify: skipped — no trackid in metadata") return None logger.info(f"Spotify: fetching lyrics for trackid={track.trackid}") - token = self._get_token() + token = await self._get_token() if not token: logger.error("Spotify: cannot fetch lyrics without a token") return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR) @@ -307,8 +288,8 @@ class SpotifyFetcher(BaseFetcher): } try: - with httpx.Client(timeout=HTTP_TIMEOUT) as client: - res = client.get(url, headers=headers) + async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client: + res = await client.get(url, headers=headers) if res.status_code == 404: logger.debug(f"Spotify: 404 for trackid={track.trackid}") @@ -322,7 +303,6 @@ class SpotifyFetcher(BaseFetcher): data = res.json() - # Validate response structure if not isinstance(data, dict) or "lyrics" not in data: logger.error("Spotify: unexpected lyrics response structure") return LyricResult( @@ -337,11 +317,8 @@ class SpotifyFetcher(BaseFetcher): logger.debug("Spotify: response contained no lyric lines") return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND) - # Determine sync status - # syncType == "LINE_SYNCED" AND at least one non-zero timestamp is_synced = sync_type == "LINE_SYNCED" and self._is_truly_synced(lines) - # Convert to LRC lrc_lines: list[str] = [] for line in lines: words = 
line.get("words", "") @@ -355,7 +332,6 @@ class SpotifyFetcher(BaseFetcher): if is_synced: lrc_lines.append(self._format_lrc_line(ms, words)) else: - # Unsynced: emit with zero timestamps lrc_lines.append(f"[00:00.00]{words}") content = LRCData("\n".join(lrc_lines)) diff --git a/pyproject.toml b/pyproject.toml index b906f60..296b6ff 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "lrx-cli" -version = "0.3.5" +version = "0.4.0" description = "Fetch line-synced lyrics for your music player." readme = "README.md" requires-python = ">=3.13" diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py new file mode 100644 index 0000000..b3ca397 --- /dev/null +++ b/tests/test_pipeline.py @@ -0,0 +1,170 @@ +from __future__ import annotations + +import asyncio +from unittest.mock import patch + +from lrx_cli.config import HIGH_CONFIDENCE +from lrx_cli.core import LrcManager +from lrx_cli.fetchers.base import BaseFetcher +from lrx_cli.lrc import LRCData +from lrx_cli.models import CacheStatus, LyricResult, TrackMeta + + +# Helpers + + +def _track(**kwargs) -> TrackMeta: + defaults = dict(artist="Artist", title="Song", album="Album", length=180000) + defaults.update(kwargs) + return TrackMeta(**defaults) # type: ignore + + +def _synced(source: str, confidence: float = HIGH_CONFIDENCE) -> LyricResult: + return LyricResult( + status=CacheStatus.SUCCESS_SYNCED, + lyrics=LRCData("[00:01.00]lyrics"), + source=source, + confidence=confidence, + ) + + +def _unsynced(source: str, confidence: float = 60.0) -> LyricResult: + return LyricResult( + status=CacheStatus.SUCCESS_UNSYNCED, + lyrics=LRCData("lyrics"), + source=source, + confidence=confidence, + ) + + +def _not_found() -> LyricResult: + return LyricResult(status=CacheStatus.NOT_FOUND) + + +class MockFetcher(BaseFetcher): + def __init__(self, name: str, result: LyricResult | None, delay: float = 0.0): + self._name = name + self._result = result + self._delay = 
delay
+        self.called = False  # set as soon as fetch() is entered
+        self.completed = False  # set only if the (optionally delayed) fetch finished
+
+    @property
+    def source_name(self) -> str:
+        return self._name
+
+    def is_available(self, track: TrackMeta) -> bool:
+        # Always eligible — plan composition is controlled by patching build_plan.
+        return True
+
+    async def fetch(
+        self, track: TrackMeta, bypass_cache: bool = False
+    ) -> LyricResult | None:
+        """Return the scripted result, optionally after an asyncio sleep.
+
+        If cancelled during the sleep, CancelledError propagates and
+        `completed` stays False — tests read this flag to prove that a
+        slow sibling task really was cancelled.
+        """
+        self.called = True
+        try:
+            if self._delay:
+                await asyncio.sleep(self._delay)
+            self.completed = True
+            return self._result
+        except asyncio.CancelledError:
+            # NOTE(review): this handler only re-raises — it is a no-op kept to
+            # make the cancellation path explicit; behaviour is identical without it.
+            raise
+
+
+def make_manager(tmp_path) -> LrcManager:
+    """Build an LrcManager with an isolated on-disk cache under pytest's tmp_path."""
+    return LrcManager(db_path=str(tmp_path / "cache.db"))
+
+
+# Between-group termination
+
+
+def test_unsynced_does_not_stop_next_group(tmp_path):
+    """Unsynced result should not stop the pipeline — next group must still run."""
+    a = MockFetcher("a", _unsynced("a"))
+    b = MockFetcher("b", _synced("b"))
+    manager = make_manager(tmp_path)
+    # Force a two-group plan: [a] runs first, [b] only if the pipeline continues.
+    with patch("lrx_cli.core.build_plan", return_value=[[a], [b]]):
+        result = manager.fetch_for_track(_track())
+    assert b.called
+    assert result is not None
+    assert result.source == "b"
+
+
+def test_trusted_synced_stops_next_group(tmp_path):
+    """Trusted synced from group1 must prevent group2 from running."""
+    a = MockFetcher("a", _synced("a"))
+    b = MockFetcher("b", _synced("b"))
+    manager = make_manager(tmp_path)
+    with patch("lrx_cli.core.build_plan", return_value=[[a], [b]]):
+        result = manager.fetch_for_track(_track())
+    assert not b.called
+    assert result is not None
+    assert result.source == "a"
+
+
+def test_negative_continues_next_group(tmp_path):
+    """NOT_FOUND from group1 must cause group2 to be tried."""
+    a = MockFetcher("a", _not_found())
+    b = MockFetcher("b", _synced("b"))
+    manager = make_manager(tmp_path)
+    with patch("lrx_cli.core.build_plan", return_value=[[a], [b]]):
+        result = manager.fetch_for_track(_track())
+    assert a.called
+    assert b.called
+    assert result is not None
+    assert result.source == "b"
+
+
+# Within-group behaviour
+
+
+def test_trusted_synced_cancels_sibling(tmp_path):
+    """When a fast fetcher
returns trusted synced, the slow sibling must be cancelled.
+    If cancellation is broken this test will block for 10 seconds."""
+    fast = MockFetcher("fast", _synced("fast"))
+    slow = MockFetcher("slow", _synced("slow"), delay=10.0)
+    manager = make_manager(tmp_path)
+    # Both fetchers run in the SAME group, so they are started concurrently.
+    with patch("lrx_cli.core.build_plan", return_value=[[fast, slow]]):
+        result = manager.fetch_for_track(_track())
+    assert fast.called
+    assert slow.called  # task was started
+    assert not slow.completed  # but cancelled before finishing
+    assert result is not None
+    assert result.source == "fast"
+
+
+def test_best_confidence_within_group(tmp_path):
+    """When no trusted synced result, highest-confidence result from group is returned."""
+    low = MockFetcher("low", _unsynced("low", confidence=40.0))
+    high = MockFetcher("high", _unsynced("high", confidence=70.0))
+    manager = make_manager(tmp_path)
+    with patch("lrx_cli.core.build_plan", return_value=[[low, high]]):
+        result = manager.fetch_for_track(_track())
+    assert result is not None
+    assert result.source == "high"
+
+
+# Cache interaction
+
+
+def test_cache_negative_skips_fetch(tmp_path):
+    """A cached NOT_FOUND entry must prevent the fetcher from being called."""
+    fetcher = MockFetcher("src", _synced("src"))
+    manager = make_manager(tmp_path)
+    track = _track()
+    # Pre-seed the cache with a fresh negative entry for this source.
+    manager.cache.set(track, "src", _not_found(), ttl_seconds=3600)
+    with patch("lrx_cli.core.build_plan", return_value=[[fetcher]]):
+        result = manager.fetch_for_track(track)
+    assert not fetcher.called
+    assert result is None
+
+
+def test_cache_trusted_synced_no_fetch(tmp_path):
+    """A trusted synced cache hit must be returned without calling the fetcher."""
+    # result=None means this fetcher would yield nothing if it were ever called.
+    fetcher = MockFetcher("src", None)
+    manager = make_manager(tmp_path)
+    track = _track()
+    # Pre-seed the cache with a trusted synced hit for this source.
+    manager.cache.set(track, "src", _synced("src"), ttl_seconds=3600)
+    with patch("lrx_cli.core.build_plan", return_value=[[fetcher]]):
+        result = manager.fetch_for_track(track)
+    assert not fetcher.called
+    assert result is not None
+    assert
result.status == CacheStatus.SUCCESS_SYNCED  # cached status round-trips unchanged
diff --git a/uv.lock b/uv.lock
index 9fc4d6d..6f37c3f 100644
--- a/uv.lock
+++ b/uv.lock
@@ -153,7 +153,7 @@ wheels = [
 
 [[package]]
 name = "lrx-cli"
-version = "0.3.3"
+version = "0.4.0"
 source = { editable = "." }
 dependencies = [
     { name = "cyclopts" },