refactor: async parallel fetching

This commit is contained in:
2026-04-03 18:39:31 +02:00
parent aab23e6b8d
commit 0eabea6a50
13 changed files with 452 additions and 256 deletions
+125 -100
View File
@@ -4,18 +4,11 @@ Date: 2026-03-25 11:09:53
Description: Core orchestrator — coordinates fetchers with cache-aware fallback Description: Core orchestrator — coordinates fetchers with cache-aware fallback
""" """
""" import asyncio
Fetch pipeline:
1. Check cache for each source in the fallback sequence
2. For sources without a valid cache hit, call the fetcher
3. Cache every result (success, not-found, or error) per source
4. Return the best result by confidence (highest wins)
"""
from typing import Optional from typing import Optional
from loguru import logger from loguru import logger
from .fetchers import FetcherMethodType, create_fetchers from .fetchers import FetcherMethodType, build_plan, create_fetchers
from .fetchers.base import BaseFetcher from .fetchers.base import BaseFetcher
from .cache import CacheEngine from .cache import CacheEngine
from .lrc import LRCData from .lrc import LRCData
@@ -50,6 +43,19 @@ def _is_better(new: LyricResult, old: LyricResult) -> bool:
) )
def _normalize_result(result: LyricResult) -> LyricResult:
"""Normalize unsynced lyrics before returning."""
if result.status == CacheStatus.SUCCESS_UNSYNCED and result.lyrics:
return LyricResult(
status=result.status,
lyrics=result.lyrics.normalize_unsynced(),
source=result.source,
ttl=result.ttl,
confidence=result.confidence,
)
return result
class LrcManager: class LrcManager:
"""Main entry point for fetching lyrics with caching.""" """Main entry point for fetching lyrics with caching."""
@@ -57,54 +63,18 @@ class LrcManager:
self.cache = CacheEngine(db_path=db_path) self.cache = CacheEngine(db_path=db_path)
self.fetchers = create_fetchers(self.cache) self.fetchers = create_fetchers(self.cache)
def _build_sequence( async def _run_group(
self, track: TrackMeta, force_method: Optional[FetcherMethodType] = None
) -> list[BaseFetcher]:
"""Determine the ordered list of fetchers to try."""
if force_method:
if force_method not in self.fetchers:
logger.error(f"Unknown method: {force_method}")
return []
return [self.fetchers[force_method]]
sequence: list[BaseFetcher] = []
for method in self.fetchers.keys():
if self.fetchers[method].is_available(track):
sequence.append(self.fetchers[method])
logger.debug(f"Fallback sequence: {[f.source_name for f in sequence]}")
return sequence
def fetch_for_track(
self, self,
group: list[BaseFetcher],
track: TrackMeta, track: TrackMeta,
force_method: Optional[FetcherMethodType] = None, bypass_cache: bool,
bypass_cache: bool = False, ) -> list[tuple[str, LyricResult]]:
) -> Optional[LyricResult]: """Run one group: cache-check first, then parallel-fetch uncached. Returns (source, result) pairs."""
"""Fetch lyrics for *track* using the fallback pipeline. cached_results: list[tuple[str, LyricResult]] = []
need_fetch: list[BaseFetcher] = []
Each source is checked against the cache independently: for fetcher in group:
- Cache hit with synced lyrics → return immediately
- Cache hit with negative status (NOT_FOUND / NETWORK_ERROR) → skip source
- Cache miss or unsynced → call fetcher, then cache the result
After all sources are tried, returns the best result found
(highest confidence wins).
"""
track = enrich_track(track)
logger.info(f"Fetching lyrics for: {track.display_name()}")
sequence = self._build_sequence(track, force_method)
if not sequence:
return None
# Best result seen so far (highest confidence wins)
best_result: Optional[LyricResult] = None
for fetcher in sequence:
source = fetcher.source_name source = fetcher.source_name
# Cache check (skip for fetchers that handle their own caching)
if not bypass_cache and not fetcher.self_cached: if not bypass_cache and not fetcher.self_cached:
cached = self.cache.get(track, source) cached = self.cache.get(track, source)
if cached: if cached:
@@ -116,75 +86,130 @@ class LrcManager:
f"[{source}] cache hit: {cached.status.value}, skipping" f"[{source}] cache hit: {cached.status.value}, skipping"
) )
continue continue
# Positive cache hit — apply the same confidence evaluation
# as fresh fetches so that low-confidence cached results
# don't block better results from later fetchers.
is_trusted = cached.confidence >= HIGH_CONFIDENCE is_trusted = cached.confidence >= HIGH_CONFIDENCE
logger.info( logger.info(
f"[{source}] cache hit: {cached.status.value}" f"[{source}] cache hit: {cached.status.value}"
f" (confidence={cached.confidence:.0f})" f" (confidence={cached.confidence:.0f})"
) )
cached_results.append((source, cached))
# Return immediately on trusted synced cache hit
if cached.status == CacheStatus.SUCCESS_SYNCED and is_trusted: if cached.status == CacheStatus.SUCCESS_SYNCED and is_trusted:
return cached return cached_results
if best_result is None or _is_better(cached, best_result):
best_result = cached
continue continue
elif not fetcher.self_cached: elif not fetcher.self_cached:
logger.debug(f"[{source}] cache bypassed") logger.debug(f"[{source}] cache bypassed")
need_fetch.append(fetcher)
# Fetch if need_fetch:
logger.debug(f"[{source}] calling fetcher...") task_map: dict[asyncio.Task, BaseFetcher] = {
result = fetcher.fetch(track, bypass_cache=bypass_cache) asyncio.create_task(f.fetch(track, bypass_cache=bypass_cache)): f
for f in need_fetch
}
pending = set(task_map)
if not result: while pending:
logger.debug(f"[{source}] returned None (no result)") done, pending = await asyncio.wait(
continue pending, return_when=asyncio.FIRST_COMPLETED
# Cache the result (skip for self-cached fetchers)
if not fetcher.self_cached and not bypass_cache:
ttl = result.ttl or _STATUS_TTL.get(result.status, TTL_NOT_FOUND)
self.cache.set(track, source, result, ttl_seconds=ttl)
# Evaluate result
if result.status in (
CacheStatus.SUCCESS_SYNCED,
CacheStatus.SUCCESS_UNSYNCED,
):
is_trusted = result.confidence >= HIGH_CONFIDENCE
logger.info(
f"[{source}] got {result.status.value} lyrics"
f" (confidence={result.confidence:.0f})"
) )
found_trusted = False
for task in done:
fetcher = task_map[task]
source = fetcher.source_name
try:
result = task.result()
except Exception as e:
logger.error(f"[{source}] fetch raised: {e}")
continue
if result is None:
logger.debug(f"[{source}] returned None")
continue
if not fetcher.self_cached and not bypass_cache:
ttl = result.ttl or _STATUS_TTL.get(
result.status, TTL_NOT_FOUND
)
self.cache.set(track, source, result, ttl_seconds=ttl)
if result.status in (
CacheStatus.SUCCESS_SYNCED,
CacheStatus.SUCCESS_UNSYNCED,
):
logger.info(
f"[{source}] got {result.status.value} lyrics"
f" (confidence={result.confidence:.0f})"
)
cached_results.append((source, result))
if (
result.status == CacheStatus.SUCCESS_SYNCED
and result.confidence >= HIGH_CONFIDENCE
):
found_trusted = True
if found_trusted:
for t in pending:
t.cancel()
await asyncio.gather(*pending, return_exceptions=True)
break
return cached_results
async def _fetch_for_track(
self,
track: TrackMeta,
force_method: Optional[FetcherMethodType],
bypass_cache: bool,
) -> Optional[LyricResult]:
track = enrich_track(track)
logger.info(f"Fetching lyrics for: {track.display_name()}")
plan = build_plan(self.fetchers, track, force_method)
if not plan:
return None
best_result: Optional[LyricResult] = None
for group in plan:
group_results = await self._run_group(group, track, bypass_cache)
for source, result in group_results:
if result.status not in (
CacheStatus.SUCCESS_SYNCED,
CacheStatus.SUCCESS_UNSYNCED,
):
continue
is_trusted = result.confidence >= HIGH_CONFIDENCE
# Trusted synced → return immediately # Trusted synced → return immediately
if result.status == CacheStatus.SUCCESS_SYNCED and is_trusted: if result.status == CacheStatus.SUCCESS_SYNCED and is_trusted:
return result logger.info(
# Track best result by confidence f"Returning {result.status.value} lyrics from {source}"
f" (confidence={result.confidence:.0f})"
)
return _normalize_result(result)
if best_result is None or _is_better(result, best_result): if best_result is None or _is_better(result, best_result):
best_result = result best_result = result
# NOT_FOUND / NETWORK_ERROR: already cached, try next
# Return best available
if best_result: if best_result:
if (
best_result.status == CacheStatus.SUCCESS_UNSYNCED
and best_result.lyrics
):
best_result = LyricResult(
status=best_result.status,
lyrics=best_result.lyrics.normalize_unsynced(),
source=best_result.source,
ttl=best_result.ttl,
confidence=best_result.confidence,
)
logger.info( logger.info(
f"Returning {best_result.status.value} lyrics from {best_result.source}" f"Returning {best_result.status.value} lyrics from {best_result.source}"
) )
else: return _normalize_result(best_result)
logger.info(f"No lyrics found for {track.display_name()}")
return best_result logger.info(f"No lyrics found for {track.display_name()}")
return None
def fetch_for_track(
self,
track: TrackMeta,
force_method: Optional[FetcherMethodType] = None,
bypass_cache: bool = False,
) -> Optional[LyricResult]:
"""Fetch lyrics for *track* using the group-based parallel pipeline."""
return asyncio.run(self._fetch_for_track(track, force_method, bypass_cache))
def manual_insert( def manual_insert(
self, self,
+38 -1
View File
@@ -4,7 +4,8 @@ Date: 2026-03-25 02:33:26
Description: Fetcher pipeline — registry and types Description: Fetcher pipeline — registry and types
""" """
from typing import Literal from typing import Literal, Optional
from loguru import logger
from .base import BaseFetcher from .base import BaseFetcher
from .local import LocalFetcher from .local import LocalFetcher
@@ -15,6 +16,7 @@ from .lrclib_search import LrclibSearchFetcher
from .netease import NeteaseFetcher from .netease import NeteaseFetcher
from .qqmusic import QQMusicFetcher from .qqmusic import QQMusicFetcher
from ..cache import CacheEngine from ..cache import CacheEngine
from ..models import TrackMeta
FetcherMethodType = Literal[ FetcherMethodType = Literal[
"local", "local",
@@ -26,6 +28,15 @@ FetcherMethodType = Literal[
"qqmusic", "qqmusic",
] ]
# Fetchers within a group run in parallel; groups run sequentially.
# A group that produces any positive result stops the pipeline.
_FETCHER_GROUPS: list[list[FetcherMethodType]] = [
["local"],
["cache-search"],
["spotify", "lrclib"],
["lrclib-search", "netease", "qqmusic"],
]
def create_fetchers(cache: CacheEngine) -> dict[FetcherMethodType, BaseFetcher]: def create_fetchers(cache: CacheEngine) -> dict[FetcherMethodType, BaseFetcher]:
"""Instantiate all fetchers. Returns a dict keyed by source name.""" """Instantiate all fetchers. Returns a dict keyed by source name."""
@@ -39,3 +50,29 @@ def create_fetchers(cache: CacheEngine) -> dict[FetcherMethodType, BaseFetcher]:
"qqmusic": QQMusicFetcher(), "qqmusic": QQMusicFetcher(),
} }
return fetchers return fetchers
def build_plan(
fetchers: dict[FetcherMethodType, BaseFetcher],
track: TrackMeta,
force_method: Optional[FetcherMethodType] = None,
) -> list[list[BaseFetcher]]:
"""Return the fetch plan as a list of groups (each group runs in parallel)."""
if force_method:
if force_method not in fetchers:
logger.error(f"Unknown method: {force_method}")
return []
return [[fetchers[force_method]]]
plan: list[list[BaseFetcher]] = []
for group_methods in _FETCHER_GROUPS:
group = [
fetchers[m]
for m in group_methods
if m in fetchers and fetchers[m].is_available(track)
]
if group:
plan.append(group)
logger.debug(f"Fetch plan: {[[f.source_name for f in g] for g in plan]}")
return plan
+1 -1
View File
@@ -28,7 +28,7 @@ class BaseFetcher(ABC):
pass pass
@abstractmethod @abstractmethod
def fetch( async def fetch(
self, track: TrackMeta, bypass_cache: bool = False self, track: TrackMeta, bypass_cache: bool = False
) -> Optional[LyricResult]: ) -> Optional[LyricResult]:
"""Fetch lyrics for the given track. Returns None if unable to fetch.""" """Fetch lyrics for the given track. Returns None if unable to fetch."""
+1 -1
View File
@@ -36,7 +36,7 @@ class CacheSearchFetcher(BaseFetcher):
def is_available(self, track: TrackMeta) -> bool: def is_available(self, track: TrackMeta) -> bool:
return bool(track.title) return bool(track.title)
def fetch( async def fetch(
self, track: TrackMeta, bypass_cache: bool = False self, track: TrackMeta, bypass_cache: bool = False
) -> Optional[LyricResult]: ) -> Optional[LyricResult]:
if bypass_cache: if bypass_cache:
+1 -1
View File
@@ -28,7 +28,7 @@ class LocalFetcher(BaseFetcher):
def is_available(self, track: TrackMeta) -> bool: def is_available(self, track: TrackMeta) -> bool:
return track.is_local return track.is_local
def fetch( async def fetch(
self, track: TrackMeta, bypass_cache: bool = False self, track: TrackMeta, bypass_cache: bool = False
) -> Optional[LyricResult]: ) -> Optional[LyricResult]:
"""Attempt to read lyrics from local filesystem.""" """Attempt to read lyrics from local filesystem."""
+3 -6
View File
@@ -34,7 +34,7 @@ class LrclibFetcher(BaseFetcher):
def is_available(self, track: TrackMeta) -> bool: def is_available(self, track: TrackMeta) -> bool:
return track.is_complete return track.is_complete
def fetch( async def fetch(
self, track: TrackMeta, bypass_cache: bool = False self, track: TrackMeta, bypass_cache: bool = False
) -> Optional[LyricResult]: ) -> Optional[LyricResult]:
"""Fetch lyrics from LRCLIB. Requires complete metadata.""" """Fetch lyrics from LRCLIB. Requires complete metadata."""
@@ -48,13 +48,12 @@ class LrclibFetcher(BaseFetcher):
"album_name": track.album, "album_name": track.album,
"duration": track.length / 1000.0 if track.length else 0, "duration": track.length / 1000.0 if track.length else 0,
} }
url = f"{LRCLIB_API_URL}?{urlencode(params)}" url = f"{LRCLIB_API_URL}?{urlencode(params)}"
logger.info(f"LRCLIB: fetching lyrics for {track.display_name()}") logger.info(f"LRCLIB: fetching lyrics for {track.display_name()}")
try: try:
with httpx.Client(timeout=HTTP_TIMEOUT) as client: async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
resp = client.get(url, headers={"User-Agent": UA_LRX}) resp = await client.get(url, headers={"User-Agent": UA_LRX})
if resp.status_code == 404: if resp.status_code == 404:
logger.debug(f"LRCLIB: not found for {track.display_name()}") logger.debug(f"LRCLIB: not found for {track.display_name()}")
@@ -67,8 +66,6 @@ class LrclibFetcher(BaseFetcher):
) )
data = resp.json() data = resp.json()
# Validate response
if not isinstance(data, dict): if not isinstance(data, dict):
logger.error(f"LRCLIB: unexpected response type: {type(data).__name__}") logger.error(f"LRCLIB: unexpected response type: {type(data).__name__}")
return LyricResult( return LyricResult(
+3 -4
View File
@@ -64,10 +64,9 @@ class LrclibSearchFetcher(BaseFetcher):
return queries return queries
def fetch( async def fetch(
self, track: TrackMeta, bypass_cache: bool = False self, track: TrackMeta, bypass_cache: bool = False
) -> Optional[LyricResult]: ) -> Optional[LyricResult]:
"""Search LRCLIB for lyrics. Requires at least a title."""
if not track.title: if not track.title:
logger.debug("LRCLIB-search: skipped — no title") logger.debug("LRCLIB-search: skipped — no title")
return None return None
@@ -80,11 +79,11 @@ class LrclibSearchFetcher(BaseFetcher):
had_error = False had_error = False
try: try:
with httpx.Client(timeout=HTTP_TIMEOUT) as client: async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
for params in queries: for params in queries:
url = f"{LRCLIB_SEARCH_URL}?{urlencode(params)}" url = f"{LRCLIB_SEARCH_URL}?{urlencode(params)}"
logger.debug(f"LRCLIB-search: query {params}") logger.debug(f"LRCLIB-search: query {params}")
resp = client.get(url, headers={"User-Agent": UA_LRX}) resp = await client.get(url, headers={"User-Agent": UA_LRX})
if resp.status_code != 200: if resp.status_code != 200:
logger.error(f"LRCLIB-search: API returned {resp.status_code}") logger.error(f"LRCLIB-search: API returned {resp.status_code}")
+11 -19
View File
@@ -43,12 +43,9 @@ class NeteaseFetcher(BaseFetcher):
def is_available(self, track: TrackMeta) -> bool: def is_available(self, track: TrackMeta) -> bool:
return bool(track.title) return bool(track.title)
def _search(self, track: TrackMeta, limit: int = 10) -> tuple[Optional[int], float]: async def _search(
"""Search Netease and return the best-matching song ID with confidence. self, track: TrackMeta, limit: int = 10
) -> tuple[Optional[int], float]:
When ``track.length`` is available, candidates are ranked by duration
difference and only accepted if within ``DURATION_TOLERANCE_MS``.
"""
query = f"{track.artist or ''} {track.title or ''}".strip() query = f"{track.artist or ''} {track.title or ''}".strip()
if not query: if not query:
return None, 0.0 return None, 0.0
@@ -56,8 +53,8 @@ class NeteaseFetcher(BaseFetcher):
logger.debug(f"Netease: searching for '{query}' (limit={limit})") logger.debug(f"Netease: searching for '{query}' (limit={limit})")
try: try:
with httpx.Client(timeout=HTTP_TIMEOUT) as client: async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
resp = client.post( resp = await client.post(
NETEASE_SEARCH_URL, NETEASE_SEARCH_URL,
headers=_HEADERS, headers=_HEADERS,
data={"s": query, "type": "1", "limit": str(limit), "offset": "0"}, data={"s": query, "type": "1", "limit": str(limit), "offset": "0"},
@@ -65,7 +62,6 @@ class NeteaseFetcher(BaseFetcher):
resp.raise_for_status() resp.raise_for_status()
result = resp.json() result = resp.json()
# Validate response
if not isinstance(result, dict): if not isinstance(result, dict):
logger.error( logger.error(
f"Netease: search returned non-dict: {type(result).__name__}" f"Netease: search returned non-dict: {type(result).__name__}"
@@ -118,15 +114,14 @@ class NeteaseFetcher(BaseFetcher):
logger.error(f"Netease: search failed: {e}") logger.error(f"Netease: search failed: {e}")
return None, 0.0 return None, 0.0
def _get_lyric( async def _get_lyric(
self, song_id: int, confidence: float = 0.0 self, song_id: int, confidence: float = 0.0
) -> Optional[LyricResult]: ) -> Optional[LyricResult]:
"""Fetch lyrics for a given Netease song ID."""
logger.debug(f"Netease: fetching lyrics for song_id={song_id}") logger.debug(f"Netease: fetching lyrics for song_id={song_id}")
try: try:
with httpx.Client(timeout=HTTP_TIMEOUT) as client: async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
resp = client.post( resp = await client.post(
NETEASE_LYRIC_URL, NETEASE_LYRIC_URL,
headers=_HEADERS, headers=_HEADERS,
data={ data={
@@ -144,7 +139,6 @@ class NeteaseFetcher(BaseFetcher):
resp.raise_for_status() resp.raise_for_status()
data = resp.json() data = resp.json()
# Validate response
if not isinstance(data, dict): if not isinstance(data, dict):
logger.error( logger.error(
f"Netease: lyric response is not dict: {type(data).__name__}" f"Netease: lyric response is not dict: {type(data).__name__}"
@@ -165,7 +159,6 @@ class NeteaseFetcher(BaseFetcher):
logger.debug(f"Netease: empty lyrics for song_id={song_id}") logger.debug(f"Netease: empty lyrics for song_id={song_id}")
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND) return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
# Determine sync status
lrcdata = LRCData(lrc) lrcdata = LRCData(lrc)
status = lrcdata.detect_sync_status() status = lrcdata.detect_sync_status()
logger.info( logger.info(
@@ -183,19 +176,18 @@ class NeteaseFetcher(BaseFetcher):
logger.error(f"Netease: lyric fetch failed for song_id={song_id}: {e}") logger.error(f"Netease: lyric fetch failed for song_id={song_id}: {e}")
return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR) return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
def fetch( async def fetch(
self, track: TrackMeta, bypass_cache: bool = False self, track: TrackMeta, bypass_cache: bool = False
) -> Optional[LyricResult]: ) -> Optional[LyricResult]:
"""Search for the track and fetch its lyrics."""
query = f"{track.artist or ''} {track.title or ''}".strip() query = f"{track.artist or ''} {track.title or ''}".strip()
if not query: if not query:
logger.debug("Netease: skipped — insufficient metadata") logger.debug("Netease: skipped — insufficient metadata")
return None return None
logger.info(f"Netease: fetching lyrics for {track.display_name()}") logger.info(f"Netease: fetching lyrics for {track.display_name()}")
song_id, confidence = self._search(track) song_id, confidence = await self._search(track)
if not song_id: if not song_id:
logger.debug(f"Netease: no match found for {track.display_name()}") logger.debug(f"Netease: no match found for {track.display_name()}")
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND) return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
return self._get_lyric(song_id, confidence=confidence) return await self._get_lyric(song_id, confidence=confidence)
+14 -14
View File
@@ -35,8 +35,9 @@ class QQMusicFetcher(BaseFetcher):
def is_available(self, track: TrackMeta) -> bool: def is_available(self, track: TrackMeta) -> bool:
return bool(track.title) and bool(QQ_MUSIC_API_URL) return bool(track.title) and bool(QQ_MUSIC_API_URL)
def _search(self, track: TrackMeta, limit: int = 10) -> tuple[Optional[str], float]: async def _search(
"""Search QQ Music and return the best-matching song MID with confidence.""" self, track: TrackMeta, limit: int = 10
) -> tuple[Optional[str], float]:
query = f"{track.artist or ''} {track.title or ''}".strip() query = f"{track.artist or ''} {track.title or ''}".strip()
if not query: if not query:
return None, 0.0 return None, 0.0
@@ -44,8 +45,8 @@ class QQMusicFetcher(BaseFetcher):
logger.debug(f"QQMusic: searching for '{query}' (limit={limit})") logger.debug(f"QQMusic: searching for '{query}' (limit={limit})")
try: try:
with httpx.Client(timeout=HTTP_TIMEOUT) as client: async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
resp = client.get( resp = await client.get(
f"{QQ_MUSIC_API_URL}/api/search", f"{QQ_MUSIC_API_URL}/api/search",
params={"keyword": query, "type": "song", "num": limit}, params={"keyword": query, "type": "song", "num": limit},
) )
@@ -97,13 +98,14 @@ class QQMusicFetcher(BaseFetcher):
logger.error(f"QQMusic: search failed: {e}") logger.error(f"QQMusic: search failed: {e}")
return None, 0.0 return None, 0.0
def _get_lyric(self, mid: str, confidence: float = 0.0) -> Optional[LyricResult]: async def _get_lyric(
"""Fetch lyrics for a given QQ Music song MID.""" self, mid: str, confidence: float = 0.0
) -> Optional[LyricResult]:
logger.debug(f"QQMusic: fetching lyrics for mid={mid}") logger.debug(f"QQMusic: fetching lyrics for mid={mid}")
try: try:
with httpx.Client(timeout=HTTP_TIMEOUT) as client: async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
resp = client.get( resp = await client.get(
f"{QQ_MUSIC_API_URL}/api/lyric", f"{QQ_MUSIC_API_URL}/api/lyric",
params={"mid": mid}, params={"mid": mid},
) )
@@ -124,8 +126,7 @@ class QQMusicFetcher(BaseFetcher):
lrcdata = LRCData(lrc) lrcdata = LRCData(lrc)
status = lrcdata.detect_sync_status() status = lrcdata.detect_sync_status()
logger.info( logger.info(
f"QQMusic: got {status.value} lyrics for mid={mid} " f"QQMusic: got {status.value} lyrics for mid={mid} ({len(lrcdata)} lines)"
f"({len(lrcdata)} lines)"
) )
return LyricResult( return LyricResult(
status=status, status=status,
@@ -138,10 +139,9 @@ class QQMusicFetcher(BaseFetcher):
logger.error(f"QQMusic: lyric fetch failed for mid={mid}: {e}") logger.error(f"QQMusic: lyric fetch failed for mid={mid}: {e}")
return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR) return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
def fetch( async def fetch(
self, track: TrackMeta, bypass_cache: bool = False self, track: TrackMeta, bypass_cache: bool = False
) -> Optional[LyricResult]: ) -> Optional[LyricResult]:
"""Search for the track and fetch its lyrics."""
if not QQ_MUSIC_API_URL: if not QQ_MUSIC_API_URL:
logger.debug("QQMusic: skipped — QQ_MUSIC_API_URL not configured") logger.debug("QQMusic: skipped — QQ_MUSIC_API_URL not configured")
return None return None
@@ -152,9 +152,9 @@ class QQMusicFetcher(BaseFetcher):
return None return None
logger.info(f"QQMusic: fetching lyrics for {track.display_name()}") logger.info(f"QQMusic: fetching lyrics for {track.display_name()}")
mid, confidence = self._search(track) mid, confidence = await self._search(track)
if not mid: if not mid:
logger.debug(f"QQMusic: no match found for {track.display_name()}") logger.debug(f"QQMusic: no match found for {track.display_name()}")
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND) return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
return self._get_lyric(mid, confidence=confidence) return await self._get_lyric(mid, confidence=confidence)
+83 -107
View File
@@ -58,67 +58,6 @@ class SpotifyFetcher(BaseFetcher):
def is_available(self, track: TrackMeta) -> bool: def is_available(self, track: TrackMeta) -> bool:
return bool(track.trackid) and bool(SPOTIFY_SP_DC) return bool(track.trackid) and bool(SPOTIFY_SP_DC)
# ─── Auth helpers ────────────────────────────────────────────────
def _get_server_time(self, client: httpx.Client) -> Optional[int]:
"""Fetch Spotify's server timestamp (seconds since epoch)."""
try:
res = client.get(SPOTIFY_SERVER_TIME_URL, timeout=HTTP_TIMEOUT)
res.raise_for_status()
data = res.json()
if not isinstance(data, dict) or "serverTime" not in data:
logger.error(f"Spotify: unexpected server-time response: {data}")
return None
server_time = data["serverTime"]
logger.debug(f"Spotify: server time = {server_time}")
return server_time
except Exception as e:
logger.error(f"Spotify: failed to fetch server time: {e}")
return None
def _get_secret(self, client: httpx.Client) -> Optional[Tuple[str, int]]:
"""Fetch and decode the TOTP secret. Cached after first success.
Response format: [{version: int, secret: str}, ...]
Each character in *secret* is XOR-decoded with ``(index % 33) + 9``.
"""
if self._cached_secret is not None:
logger.debug("Spotify: using cached TOTP secret")
return self._cached_secret
try:
res = client.get(SPOTIFY_SECRET_URL, timeout=HTTP_TIMEOUT)
res.raise_for_status()
data = res.json()
if not isinstance(data, list) or len(data) == 0:
logger.error(
f"Spotify: unexpected secrets response (type={type(data).__name__}, len={len(data) if isinstance(data, list) else '?'})"
)
return None
last = data[-1]
if "secret" not in last or "version" not in last:
logger.error(f"Spotify: malformed secret entry: {list(last.keys())}")
return None
secret_raw = last["secret"]
version = last["version"]
# XOR decode
parts = []
for i, char in enumerate(secret_raw):
parts.append(str(ord(char) ^ ((i % 33) + 9)))
secret = "".join(parts)
logger.debug(f"Spotify: decoded secret v{version} (len={len(secret)})")
self._cached_secret = (secret, version)
return self._cached_secret
except Exception as e:
logger.error(f"Spotify: failed to fetch secret: {e}")
return None
@staticmethod @staticmethod
def _generate_totp(server_time_s: int, secret: str) -> str: def _generate_totp(server_time_s: int, secret: str) -> str:
"""Generate a 6-digit TOTP code compatible with Spotify's auth. """Generate a 6-digit TOTP code compatible with Spotify's auth.
@@ -169,26 +108,90 @@ class SpotifyFetcher(BaseFetcher):
except Exception as e: except Exception as e:
logger.warning(f"Spotify: failed to write token cache: {e}") logger.warning(f"Spotify: failed to write token cache: {e}")
def _get_token(self) -> Optional[str]: @staticmethod
"""Obtain a Spotify access token. Cached in memory and on disk. def _format_lrc_line(start_ms: int, words: str) -> str:
"""Format a single lyric line as LRC ``[mm:ss.cc]text``."""
minutes = start_ms // 60000
seconds = (start_ms // 1000) % 60
centiseconds = round((start_ms % 1000) / 10.0)
return f"[{minutes:02d}:{seconds:02d}.{centiseconds:02.0f}]{words}"
Requires SP_DC cookie (set via SPOTIFY_SP_DC env var). @staticmethod
""" def _is_truly_synced(lines: list[dict]) -> bool:
# 1. Memory cache """Check if lyrics are actually synced (not all timestamps zero)."""
for line in lines:
try:
ms = int(line.get("startTimeMs", "0"))
if ms > 0:
return True
except (ValueError, TypeError):
continue
return False
async def _get_server_time(self, client: httpx.AsyncClient) -> Optional[int]:
try:
res = await client.get(SPOTIFY_SERVER_TIME_URL, timeout=HTTP_TIMEOUT)
res.raise_for_status()
data = res.json()
if not isinstance(data, dict) or "serverTime" not in data:
logger.error(f"Spotify: unexpected server-time response: {data}")
return None
server_time = data["serverTime"]
logger.debug(f"Spotify: server time = {server_time}")
return server_time
except Exception as e:
logger.error(f"Spotify: failed to fetch server time: {e}")
return None
async def _get_secret(self, client: httpx.AsyncClient) -> Optional[Tuple[str, int]]:
if self._cached_secret is not None:
logger.debug("Spotify: using cached TOTP secret")
return self._cached_secret
try:
res = await client.get(SPOTIFY_SECRET_URL, timeout=HTTP_TIMEOUT)
res.raise_for_status()
data = res.json()
if not isinstance(data, list) or len(data) == 0:
logger.error(
f"Spotify: unexpected secrets response (type={type(data).__name__}, len={len(data) if isinstance(data, list) else '?'})"
)
return None
last = data[-1]
if "secret" not in last or "version" not in last:
logger.error(f"Spotify: malformed secret entry: {list(last.keys())}")
return None
secret_raw = last["secret"]
version = last["version"]
parts = []
for i, char in enumerate(secret_raw):
parts.append(str(ord(char) ^ ((i % 33) + 9)))
secret = "".join(parts)
logger.debug(f"Spotify: decoded secret v{version} (len={len(secret)})")
self._cached_secret = (secret, version)
return self._cached_secret
except Exception as e:
logger.error(f"Spotify: failed to fetch secret: {e}")
return None
async def _get_token(self) -> Optional[str]:
if self._cached_token and time.time() < self._token_expires_at - 30: if self._cached_token and time.time() < self._token_expires_at - 30:
logger.debug("Spotify: using in-memory cached token") logger.debug("Spotify: using in-memory cached token")
return self._cached_token return self._cached_token
# 2. Disk cache
disk_token = self._load_cached_token() disk_token = self._load_cached_token()
if disk_token and time.time() < self._token_expires_at - 30: if disk_token and time.time() < self._token_expires_at - 30:
return disk_token return disk_token
# 3. Fetch new token
if not SPOTIFY_SP_DC: if not SPOTIFY_SP_DC:
logger.error( logger.error(
"Spotify: SPOTIFY_SP_DC env var not set — " "Spotify: SPOTIFY_SP_DC env var not set — cannot authenticate with Spotify"
"cannot authenticate with Spotify"
) )
return None return None
@@ -199,12 +202,12 @@ class SpotifyFetcher(BaseFetcher):
"Cookie": f"sp_dc={SPOTIFY_SP_DC}", "Cookie": f"sp_dc={SPOTIFY_SP_DC}",
} }
with httpx.Client(headers=headers) as client: async with httpx.AsyncClient(headers=headers) as client:
server_time = self._get_server_time(client) server_time = await self._get_server_time(client)
if server_time is None: if server_time is None:
return None return None
secret_data = self._get_secret(client) secret_data = await self._get_secret(client)
if secret_data is None: if secret_data is None:
return None return None
@@ -221,7 +224,9 @@ class SpotifyFetcher(BaseFetcher):
} }
try: try:
res = client.get(SPOTIFY_TOKEN_URL, params=params, timeout=HTTP_TIMEOUT) res = await client.get(
SPOTIFY_TOKEN_URL, params=params, timeout=HTTP_TIMEOUT
)
if res.status_code != 200: if res.status_code != 200:
logger.error(f"Spotify: token request returned {res.status_code}") logger.error(f"Spotify: token request returned {res.status_code}")
return None return None
@@ -249,7 +254,6 @@ class SpotifyFetcher(BaseFetcher):
self._token_expires_at = time.time() + 3600 self._token_expires_at = time.time() + 3600
self._cached_token = token self._cached_token = token
# Persist to disk (including anonymous tokens, same as Go ref)
self._save_token(body) self._save_token(body)
logger.debug("Spotify: obtained access token") logger.debug("Spotify: obtained access token")
return token return token
@@ -258,39 +262,16 @@ class SpotifyFetcher(BaseFetcher):
logger.error(f"Spotify: token request failed: {e}") logger.error(f"Spotify: token request failed: {e}")
return None return None
# ─── Lyrics ────────────────────────────────────────────────────── async def fetch(
@staticmethod
def _format_lrc_line(start_ms: int, words: str) -> str:
"""Format a single lyric line as LRC ``[mm:ss.cc]text``."""
minutes = start_ms // 60000
seconds = (start_ms // 1000) % 60
centiseconds = round((start_ms % 1000) / 10.0)
return f"[{minutes:02d}:{seconds:02d}.{centiseconds:02.0f}]{words}"
@staticmethod
def _is_truly_synced(lines: list[dict]) -> bool:
"""Check if lyrics are actually synced (not all timestamps zero)."""
for line in lines:
try:
ms = int(line.get("startTimeMs", "0"))
if ms > 0:
return True
except (ValueError, TypeError):
continue
return False
def fetch(
self, track: TrackMeta, bypass_cache: bool = False self, track: TrackMeta, bypass_cache: bool = False
) -> Optional[LyricResult]: ) -> Optional[LyricResult]:
"""Fetch lyrics for a Spotify track by its track ID."""
if not track.trackid: if not track.trackid:
logger.debug("Spotify: skipped — no trackid in metadata") logger.debug("Spotify: skipped — no trackid in metadata")
return None return None
logger.info(f"Spotify: fetching lyrics for trackid={track.trackid}") logger.info(f"Spotify: fetching lyrics for trackid={track.trackid}")
token = self._get_token() token = await self._get_token()
if not token: if not token:
logger.error("Spotify: cannot fetch lyrics without a token") logger.error("Spotify: cannot fetch lyrics without a token")
return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR) return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
@@ -307,8 +288,8 @@ class SpotifyFetcher(BaseFetcher):
} }
try: try:
with httpx.Client(timeout=HTTP_TIMEOUT) as client: async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
res = client.get(url, headers=headers) res = await client.get(url, headers=headers)
if res.status_code == 404: if res.status_code == 404:
logger.debug(f"Spotify: 404 for trackid={track.trackid}") logger.debug(f"Spotify: 404 for trackid={track.trackid}")
@@ -322,7 +303,6 @@ class SpotifyFetcher(BaseFetcher):
data = res.json() data = res.json()
# Validate response structure
if not isinstance(data, dict) or "lyrics" not in data: if not isinstance(data, dict) or "lyrics" not in data:
logger.error("Spotify: unexpected lyrics response structure") logger.error("Spotify: unexpected lyrics response structure")
return LyricResult( return LyricResult(
@@ -337,11 +317,8 @@ class SpotifyFetcher(BaseFetcher):
logger.debug("Spotify: response contained no lyric lines") logger.debug("Spotify: response contained no lyric lines")
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND) return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
# Determine sync status
# syncType == "LINE_SYNCED" AND at least one non-zero timestamp
is_synced = sync_type == "LINE_SYNCED" and self._is_truly_synced(lines) is_synced = sync_type == "LINE_SYNCED" and self._is_truly_synced(lines)
# Convert to LRC
lrc_lines: list[str] = [] lrc_lines: list[str] = []
for line in lines: for line in lines:
words = line.get("words", "") words = line.get("words", "")
@@ -355,7 +332,6 @@ class SpotifyFetcher(BaseFetcher):
if is_synced: if is_synced:
lrc_lines.append(self._format_lrc_line(ms, words)) lrc_lines.append(self._format_lrc_line(ms, words))
else: else:
# Unsynced: emit with zero timestamps
lrc_lines.append(f"[00:00.00]{words}") lrc_lines.append(f"[00:00.00]{words}")
content = LRCData("\n".join(lrc_lines)) content = LRCData("\n".join(lrc_lines))
+1 -1
View File
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project] [project]
name = "lrx-cli" name = "lrx-cli"
version = "0.3.5" version = "0.4.0"
description = "Fetch line-synced lyrics for your music player." description = "Fetch line-synced lyrics for your music player."
readme = "README.md" readme = "README.md"
requires-python = ">=3.13" requires-python = ">=3.13"
+170
View File
@@ -0,0 +1,170 @@
from __future__ import annotations
import asyncio
from unittest.mock import patch
from lrx_cli.config import HIGH_CONFIDENCE
from lrx_cli.core import LrcManager
from lrx_cli.fetchers.base import BaseFetcher
from lrx_cli.lrc import LRCData
from lrx_cli.models import CacheStatus, LyricResult, TrackMeta
# Helpers
def _track(**kwargs) -> TrackMeta:
    """Build a TrackMeta with sensible defaults; any field may be overridden."""
    fields = {"artist": "Artist", "title": "Song", "album": "Album", "length": 180000}
    fields.update(kwargs)
    return TrackMeta(**fields)  # type: ignore
def _synced(source: str, confidence: float = HIGH_CONFIDENCE) -> LyricResult:
    """A SUCCESS_SYNCED result attributed to *source* with the given confidence."""
    lyrics = LRCData("[00:01.00]lyrics")
    return LyricResult(
        status=CacheStatus.SUCCESS_SYNCED,
        lyrics=lyrics,
        source=source,
        confidence=confidence,
    )
def _unsynced(source: str, confidence: float = 60.0) -> LyricResult:
    """A SUCCESS_UNSYNCED (plain-text) result attributed to *source*."""
    lyrics = LRCData("lyrics")
    return LyricResult(
        status=CacheStatus.SUCCESS_UNSYNCED,
        lyrics=lyrics,
        source=source,
        confidence=confidence,
    )
def _not_found() -> LyricResult:
    """A bare NOT_FOUND result — no lyrics, no source, default TTL."""
    return LyricResult(status=CacheStatus.NOT_FOUND)
class MockFetcher(BaseFetcher):
    """Scripted fetcher for pipeline tests.

    Returns a canned *result* after an optional *delay*, and records two
    observability flags so tests can distinguish "started" from "finished":

    - ``called``: ``fetch()`` was entered (the task was scheduled and run).
    - ``completed``: ``fetch()`` ran to the end without being cancelled.
    """

    def __init__(self, name: str, result: LyricResult | None, delay: float = 0.0):
        self._name = name
        self._result = result
        self._delay = delay
        self.called = False  # fetch() was entered
        self.completed = False  # fetch() finished (not cancelled mid-sleep)

    @property
    def source_name(self) -> str:
        return self._name

    def is_available(self, track: TrackMeta) -> bool:
        return True

    async def fetch(
        self, track: TrackMeta, bypass_cache: bool = False
    ) -> LyricResult | None:
        """Simulate a fetch: optional sleep, then return the canned result."""
        self.called = True
        # No try/except here: a CancelledError raised during the sleep
        # propagates on its own (the removed `except CancelledError: raise`
        # was a no-op), leaving ``completed`` False — exactly what the
        # cancellation tests assert.
        if self._delay:
            await asyncio.sleep(self._delay)
        self.completed = True
        return self._result
def make_manager(tmp_path) -> LrcManager:
    """Create an LrcManager backed by a throwaway cache DB under *tmp_path*."""
    db_file = tmp_path / "cache.db"
    return LrcManager(db_path=str(db_file))
# Between-group termination
def test_unsynced_does_not_stop_next_group(tmp_path):
    """An unsynced hit in group 1 must not short-circuit group 2."""
    first = MockFetcher("a", _unsynced("a"))
    second = MockFetcher("b", _synced("b"))
    mgr = make_manager(tmp_path)
    with patch("lrx_cli.core.build_plan", return_value=[[first], [second]]):
        result = mgr.fetch_for_track(_track())
    assert second.called
    assert result is not None
    assert result.source == "b"
def test_trusted_synced_stops_next_group(tmp_path):
    """A trusted synced hit in group 1 must prevent group 2 from running at all."""
    winner = MockFetcher("a", _synced("a"))
    never_run = MockFetcher("b", _synced("b"))
    mgr = make_manager(tmp_path)
    with patch("lrx_cli.core.build_plan", return_value=[[winner], [never_run]]):
        result = mgr.fetch_for_track(_track())
    assert not never_run.called
    assert result is not None
    assert result.source == "a"
def test_negative_continues_next_group(tmp_path):
    """A NOT_FOUND from group 1 must cause the pipeline to fall through to group 2."""
    miss = MockFetcher("a", _not_found())
    hit = MockFetcher("b", _synced("b"))
    mgr = make_manager(tmp_path)
    with patch("lrx_cli.core.build_plan", return_value=[[miss], [hit]]):
        result = mgr.fetch_for_track(_track())
    assert miss.called
    assert hit.called
    assert result is not None
    assert result.source == "b"
# Within-group behaviour
def test_trusted_synced_cancels_sibling(tmp_path):
    """A fast trusted-synced winner must cancel its slow sibling in the same group.

    If cancellation is broken, this test blocks for the sibling's full
    10-second sleep instead of returning promptly.
    """
    fast = MockFetcher("fast", _synced("fast"))
    slow = MockFetcher("slow", _synced("slow"), delay=10.0)
    mgr = make_manager(tmp_path)
    with patch("lrx_cli.core.build_plan", return_value=[[fast, slow]]):
        result = mgr.fetch_for_track(_track())
    assert fast.called
    # The slow task was started...
    assert slow.called
    # ...but cancelled before its sleep finished.
    assert not slow.completed
    assert result is not None
    assert result.source == "fast"
def test_best_confidence_within_group(tmp_path):
    """Without a trusted synced hit, the group's highest-confidence result wins."""
    weaker = MockFetcher("low", _unsynced("low", confidence=40.0))
    stronger = MockFetcher("high", _unsynced("high", confidence=70.0))
    mgr = make_manager(tmp_path)
    with patch("lrx_cli.core.build_plan", return_value=[[weaker, stronger]]):
        result = mgr.fetch_for_track(_track())
    assert result is not None
    assert result.source == "high"
# Cache interaction
def test_cache_negative_skips_fetch(tmp_path):
    """A cached NOT_FOUND entry must suppress the network fetch entirely."""
    fetcher = MockFetcher("src", _synced("src"))
    mgr = make_manager(tmp_path)
    track = _track()
    # Seed a still-valid negative entry for this source.
    mgr.cache.set(track, "src", _not_found(), ttl_seconds=3600)
    with patch("lrx_cli.core.build_plan", return_value=[[fetcher]]):
        result = mgr.fetch_for_track(track)
    assert not fetcher.called
    assert result is None
def test_cache_trusted_synced_no_fetch(tmp_path):
    """A trusted synced cache hit is served directly; the fetcher never runs."""
    fetcher = MockFetcher("src", None)
    mgr = make_manager(tmp_path)
    track = _track()
    # Seed a still-valid synced entry for this source.
    mgr.cache.set(track, "src", _synced("src"), ttl_seconds=3600)
    with patch("lrx_cli.core.build_plan", return_value=[[fetcher]]):
        result = mgr.fetch_for_track(track)
    assert not fetcher.called
    assert result is not None
    assert result.status == CacheStatus.SUCCESS_SYNCED
Generated
+1 -1
View File
@@ -153,7 +153,7 @@ wheels = [
[[package]] [[package]]
name = "lrx-cli" name = "lrx-cli"
version = "0.3.3" version = "0.4.0"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "cyclopts" }, { name = "cyclopts" },