test: add api fixtures

This commit is contained in:
2026-04-10 08:45:24 +02:00
parent 60732f2986
commit 0f1f5b418a
23 changed files with 1324 additions and 483 deletions
+55 -36
View File
@@ -21,6 +21,38 @@ from ..config import (
_LRCLIB_API_URL = "https://lrclib.net/api/get"
def _parse_lrclib_response(data: dict) -> FetchResult:
    """Split one LRCLIB JSON object into a synced and an unsynced result.

    ``syncedLyrics`` and ``plainLyrics`` are inspected independently. A slot
    is filled only when its value is a non-blank string; otherwise it keeps
    a NOT_FOUND placeholder carrying ``TTL_NOT_FOUND``.
    """

    def _usable_text(key: str):
        # Only non-blank strings count as lyrics.
        value = data.get(key)
        if isinstance(value, str) and value.strip():
            return value
        return None

    def _placeholder() -> LyricResult:
        return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)

    timed_text = _usable_text("syncedLyrics")
    plain_text = _usable_text("plainLyrics")

    if timed_text is None:
        synced_slot = _placeholder()
    else:
        synced_slot = LyricResult(
            status=CacheStatus.SUCCESS_SYNCED,
            lyrics=LRCData(timed_text),
            source="lrclib",
        )

    if plain_text is None:
        unsynced_slot = _placeholder()
    else:
        unsynced_slot = LyricResult(
            status=CacheStatus.SUCCESS_UNSYNCED,
            lyrics=LRCData(plain_text),
            source="lrclib",
            ttl=TTL_UNSYNCED,
        )

    return FetchResult(synced=synced_slot, unsynced=unsynced_slot)
class LrclibFetcher(BaseFetcher):
@property
def source_name(self) -> str:
@@ -29,12 +61,12 @@ class LrclibFetcher(BaseFetcher):
def is_available(self, track: TrackMeta) -> bool:
    """Return ``track.is_complete``; this fetcher only runs on complete metadata."""
    has_full_metadata = track.is_complete
    return has_full_metadata
async def fetch(self, track: TrackMeta, bypass_cache: bool = False) -> FetchResult:
"""Fetch lyrics from LRCLIB. Requires complete metadata."""
if not track.is_complete:
logger.debug("LRCLIB: skipped — incomplete metadata")
return FetchResult()
async def _api_get(
self,
client: httpx.AsyncClient,
track: TrackMeta,
) -> httpx.Response:
"""Issue one LRCLIB get request using the same path as production fetch."""
params = {
"track_name": track.title,
"artist_name": track.artist,
@@ -42,11 +74,19 @@ class LrclibFetcher(BaseFetcher):
"duration": track.length / 1000.0 if track.length else 0,
}
url = f"{_LRCLIB_API_URL}?{urlencode(params)}"
return await client.get(url, headers={"User-Agent": UA_LRX})
async def fetch(self, track: TrackMeta, bypass_cache: bool = False) -> FetchResult:
"""Fetch lyrics from LRCLIB. Requires complete metadata."""
if not track.is_complete:
logger.debug("LRCLIB: skipped — incomplete metadata")
return FetchResult()
logger.info(f"LRCLIB: fetching lyrics for {track.display_name()}")
try:
async with httpx.AsyncClient(timeout=self._general.http_timeout) as client:
resp = await client.get(url, headers={"User-Agent": UA_LRX})
resp = await self._api_get(client, track)
if resp.status_code == 404:
logger.debug(f"LRCLIB: not found for {track.display_name()}")
@@ -60,37 +100,16 @@ class LrclibFetcher(BaseFetcher):
if not isinstance(data, dict):
logger.error(f"LRCLIB: unexpected response type: {type(data).__name__}")
return FetchResult.from_network_error()
synced = data.get("syncedLyrics")
unsynced = data.get("plainLyrics")
res_synced: LyricResult = LyricResult(
status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND
)
res_unsynced: LyricResult = LyricResult(
status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND
)
if isinstance(synced, str) and synced.strip():
lyrics = LRCData(synced)
logger.info(f"LRCLIB: got synced lyrics ({len(lyrics)} lines)")
res_synced = LyricResult(
status=CacheStatus.SUCCESS_SYNCED,
lyrics=lyrics,
source=self.source_name,
result = _parse_lrclib_response(data)
if result.synced and result.synced.lyrics:
logger.info(
f"LRCLIB: got synced lyrics ({len(result.synced.lyrics)} lines)"
)
if isinstance(unsynced, str) and unsynced.strip():
lyrics = LRCData(unsynced)
logger.info(f"LRCLIB: got unsynced lyrics ({len(lyrics)} lines)")
res_unsynced = LyricResult(
status=CacheStatus.SUCCESS_UNSYNCED,
lyrics=lyrics,
source=self.source_name,
ttl=TTL_UNSYNCED,
if result.unsynced and result.unsynced.lyrics:
logger.info(
f"LRCLIB: got unsynced lyrics ({len(result.unsynced.lyrics)} lines)"
)
return FetchResult(synced=res_synced, unsynced=res_unsynced)
return result
except httpx.HTTPError as e:
logger.error(f"LRCLIB: HTTP error: {e}")
+68 -49
View File
@@ -23,6 +23,24 @@ from ..config import (
_LRCLIB_SEARCH_URL = "https://lrclib.net/api/search"
def _parse_lrclib_search_results(items: list[dict]) -> list[SearchCandidate[dict]]:
"""Map LRCLIB search JSON items to normalized SearchCandidate entries."""
return [
SearchCandidate(
item=item,
duration_ms=item["duration"] * 1000
if isinstance(item.get("duration"), (int, float))
else None,
is_synced=isinstance(item.get("syncedLyrics"), str)
and bool(item["syncedLyrics"].strip()),
title=item.get("trackName"),
artist=item.get("artistName"),
album=item.get("albumName"),
)
for item in items
]
class LrclibSearchFetcher(BaseFetcher):
@property
def source_name(self) -> str:
@@ -59,49 +77,63 @@ class LrclibSearchFetcher(BaseFetcher):
return queries
async def _api_query(
self,
client: httpx.AsyncClient,
params: dict[str, str],
) -> tuple[list[dict], bool]:
"""Issue one LRCLIB search query using production request path."""
url = f"{_LRCLIB_SEARCH_URL}?{urlencode(params)}"
logger.debug(f"LRCLIB-search: query {params}")
try:
resp = await client.get(url, headers={"User-Agent": UA_LRX})
except httpx.HTTPError as e:
logger.error(f"LRCLIB-search: HTTP error: {e}")
return [], True
if resp.status_code != 200:
logger.error(f"LRCLIB-search: API returned {resp.status_code}")
return [], True
data = resp.json()
if not isinstance(data, list):
return [], False
return [item for item in data if isinstance(item, dict)], False
async def _api_candidates(
self,
client: httpx.AsyncClient,
track: TrackMeta,
) -> tuple[list[dict], bool]:
"""Request and merge LRCLIB-search candidates using built-in query strategy."""
queries = self._build_queries(track)
all_results = await asyncio.gather(
*(self._api_query(client, p) for p in queries)
)
seen_ids: set[int] = set()
candidates: list[dict] = []
had_error = False
for items, err in all_results:
if err:
had_error = True
for item in items:
item_id = item.get("id")
if item_id is not None and item_id in seen_ids:
continue
if item_id is not None:
seen_ids.add(item_id)
candidates.append(item)
return candidates, had_error
async def fetch(self, track: TrackMeta, bypass_cache: bool = False) -> FetchResult:
if not track.title:
logger.debug("LRCLIB-search: skipped — no title")
return FetchResult()
queries = self._build_queries(track)
logger.info(f"LRCLIB-search: searching for {track.display_name()}")
seen_ids: set[int] = set()
candidates: list[dict] = []
had_error = False
try:
async with httpx.AsyncClient(timeout=self._general.http_timeout) as client:
async def _query(params: dict[str, str]) -> tuple[list[dict], bool]:
url = f"{_LRCLIB_SEARCH_URL}?{urlencode(params)}"
logger.debug(f"LRCLIB-search: query {params}")
try:
resp = await client.get(url, headers={"User-Agent": UA_LRX})
except httpx.HTTPError as e:
logger.error(f"LRCLIB-search: HTTP error: {e}")
return [], True
if resp.status_code != 200:
logger.error(f"LRCLIB-search: API returned {resp.status_code}")
return [], True
data = resp.json()
if not isinstance(data, list):
return [], False
return [item for item in data if isinstance(item, dict)], False
all_results = await asyncio.gather(*(_query(p) for p in queries))
for items, err in all_results:
if err:
had_error = True
for item in items:
item_id = item.get("id")
if item_id is not None and item_id in seen_ids:
continue
if item_id is not None:
seen_ids.add(item_id)
candidates.append(item)
candidates, had_error = await self._api_candidates(client, track)
if not candidates:
if had_error:
@@ -111,23 +143,10 @@ class LrclibSearchFetcher(BaseFetcher):
logger.debug(
f"LRCLIB-search: got {len(candidates)} unique candidates "
f"from {len(queries)} queries"
f"from {len(self._build_queries(track))} queries"
)
mapped = [
SearchCandidate(
item=item,
duration_ms=item["duration"] * 1000
if isinstance(item.get("duration"), (int, float))
else None,
is_synced=isinstance(item.get("syncedLyrics"), str)
and bool(item["syncedLyrics"].strip()),
title=item.get("trackName"),
artist=item.get("artistName"),
album=item.get("albumName"),
)
for item in candidates
]
mapped = _parse_lrclib_search_results(candidates)
best, confidence = select_best(
mapped,
track.length,
+113 -53
View File
@@ -83,21 +83,8 @@ def _parse_subtitle(body: str) -> Optional[str]:
return None
async def _fetch_macro(
auth: MusixmatchAuthenticator,
params: dict,
) -> Optional[LRCData]:
"""Call macro.subtitles.get via auth.get_json.
Returns LRCData (richsync preferred over subtitle), or None when no usable
lyrics are found. Raises on HTTP/network errors.
"""
logger.debug(f"Musixmatch: macro call with {list(params.keys())}")
data = await auth.get_json(_MUSIXMATCH_MACRO_URL, {**_MXM_MACRO_PARAMS, **params})
if data is None:
return None
# Musixmatch returns body=[] (not {}) when the track is not found
def _parse_mxm_macro(data: dict) -> LRCData | None:
"""Parse macro.subtitles.get payload into LRCData (richsync preferred)."""
body = data.get("message", {}).get("body", {})
if not isinstance(body, dict):
return None
@@ -105,7 +92,6 @@ async def _fetch_macro(
if not isinstance(macro_calls, dict):
return None
# Prefer richsync (word-level timing)
richsync_msg = macro_calls.get("track.richsync.get", {}).get("message", {})
if (
isinstance(richsync_msg, dict)
@@ -119,10 +105,8 @@ async def _fetch_macro(
if lrc_text:
lrc = LRCData(lrc_text)
if lrc:
logger.debug("Musixmatch: got richsync lyrics")
return lrc
# Fall back to subtitle (line-level timing)
subtitle_msg = macro_calls.get("track.subtitles.get", {}).get("message", {})
if (
isinstance(subtitle_msg, dict)
@@ -136,13 +120,36 @@ async def _fetch_macro(
if lrc_text:
lrc = LRCData(lrc_text)
if lrc:
logger.debug("Musixmatch: got subtitle lyrics")
return lrc
logger.debug("Musixmatch: no usable lyrics in macro response")
return None
def _parse_mxm_search(data: dict) -> list[SearchCandidate[int]]:
"""Parse track.search payload to normalized candidates."""
track_list = data.get("message", {}).get("body", {}).get("track_list", [])
if not isinstance(track_list, list) or not track_list:
return []
return [
SearchCandidate(
item=int(t["commontrack_id"]),
duration_ms=(
float(t["track_length"]) * 1000 if t.get("track_length") else None
),
is_synced=bool(t.get("has_subtitles") or t.get("has_richsync")),
title=t.get("track_name"),
artist=t.get("artist_name"),
album=t.get("album_name"),
)
for item in track_list
if isinstance(item, dict)
and isinstance(t := item.get("track", {}), dict)
and isinstance(t.get("commontrack_id"), int)
and not t.get("instrumental")
]
class MusixmatchSpotifyFetcher(BaseFetcher):
"""Direct lookup by Spotify track ID — no search, single request."""
@@ -158,14 +165,36 @@ class MusixmatchSpotifyFetcher(BaseFetcher):
def is_available(self, track: TrackMeta) -> bool:
    """Usable only when the track has an ID and the authenticator is not cooling down."""
    if not track.trackid:
        return False
    return not self._auth.is_cooldown()
async def _api_macro(self, params: dict) -> dict | None:
    """Call the macro endpoint via the authenticator.

    ``params`` are layered on top of ``_MXM_MACRO_PARAMS``, so caller-supplied
    keys win on conflict.
    """
    query = {**_MXM_MACRO_PARAMS, **params}
    return await self._auth.get_json(_MUSIXMATCH_MACRO_URL, query)
async def _api_macro_track(self, track: TrackMeta) -> dict | None:
"""Request macro payload for one track using Spotify ID lookup path."""
if not track.trackid:
return None
return await self._api_macro({"track_spotify_id": track.trackid})
async def _fetch_macro(self, params: dict) -> LRCData | None:
    """Run one macro request and parse its payload into lyrics.

    Returns ``None`` when the request yields no payload or when
    ``_parse_mxm_macro`` finds nothing usable in it.
    """
    logger.debug(f"Musixmatch: macro call with {list(params.keys())}")
    payload = await self._api_macro(params)
    if payload is None:
        return None

    parsed = _parse_mxm_macro(payload)
    if parsed is not None:
        logger.debug("Musixmatch: parsed macro lyrics")
        return parsed

    logger.debug("Musixmatch: no usable lyrics in macro response")
    return None
async def fetch(self, track: TrackMeta, bypass_cache: bool = False) -> FetchResult:
logger.info(f"Musixmatch-Spotify: fetching lyrics for {track.display_name()}")
try:
lrc = await _fetch_macro(
self._auth,
{"track_spotify_id": track.trackid}, # type: ignore[dict-item]
)
lrc = await self._fetch_macro({"track_spotify_id": track.trackid}) # type: ignore[dict-item]
except AttributeError:
return FetchResult.from_not_found()
except Exception as e:
@@ -210,9 +239,13 @@ class MusixmatchFetcher(BaseFetcher):
def is_available(self, track: TrackMeta) -> bool:
    """Usable only when the track has a title and the authenticator is not cooling down."""
    if not track.title:
        return False
    return not self._auth.is_cooldown()
async def _search(self, track: TrackMeta) -> tuple[Optional[int], float]:
"""Search for track metadata. Raises on network/HTTP errors."""
params: dict = {
async def _api_search(self, params: dict) -> dict | None:
    """Send *params* to the search endpoint via the authenticator and return its result."""
    payload = await self._auth.get_json(_MUSIXMATCH_SEARCH_URL, params)
    return payload
def _build_search_params(self, track: TrackMeta) -> dict[str, str]:
"""Build Musixmatch search params for one track."""
params: dict[str, str] = {
"q_track": track.title or "",
"page_size": "10",
"f_has_lyrics": "1",
@@ -221,36 +254,66 @@ class MusixmatchFetcher(BaseFetcher):
params["q_artist"] = track.artist
if track.album:
params["q_album"] = track.album
return params
async def _api_search_track(self, track: TrackMeta) -> dict | None:
"""Request search payload for one track using production path."""
return await self._api_search(self._build_search_params(track))
async def _api_macro(self, params: dict) -> dict | None:
    """Call the macro endpoint via the authenticator.

    ``params`` are layered on top of ``_MXM_MACRO_PARAMS``, so caller-supplied
    keys win on conflict.
    """
    query = {**_MXM_MACRO_PARAMS, **params}
    return await self._auth.get_json(_MUSIXMATCH_MACRO_URL, query)
async def _api_macro_track(self, track: TrackMeta) -> dict | None:
"""Request macro payload for top-ranked search candidate of one track."""
search_data = await self._api_search_track(track)
if search_data is None:
return None
candidates = _parse_mxm_search(search_data)
if not candidates:
return None
commontrack_id, _confidence = select_best(
candidates,
track.length,
title=track.title,
artist=track.artist,
album=track.album,
)
if commontrack_id is None:
return None
return await self._api_macro({"commontrack_id": str(commontrack_id)})
async def _fetch_macro(self, params: dict) -> LRCData | None:
    """Run one macro request and parse its payload into lyrics.

    Returns ``None`` when the request yields no payload or when
    ``_parse_mxm_macro`` finds nothing usable in it.
    """
    logger.debug(f"Musixmatch: macro call with {list(params.keys())}")
    payload = await self._api_macro(params)
    if payload is None:
        return None

    parsed = _parse_mxm_macro(payload)
    if parsed is not None:
        logger.debug("Musixmatch: parsed macro lyrics")
        return parsed

    logger.debug("Musixmatch: no usable lyrics in macro response")
    return None
async def _search(self, track: TrackMeta) -> tuple[Optional[int], float]:
"""Search for track metadata. Raises on network/HTTP errors."""
logger.debug(f"Musixmatch: searching for '{track.display_name()}'")
data = await self._auth.get_json(_MUSIXMATCH_SEARCH_URL, params)
data = await self._api_search_track(track)
if data is None:
return None, 0.0
track_list = data.get("message", {}).get("body", {}).get("track_list", [])
if not isinstance(track_list, list) or not track_list:
candidates = _parse_mxm_search(data)
if not candidates:
logger.debug("Musixmatch: search returned 0 results")
return None, 0.0
logger.debug(f"Musixmatch: search returned {len(track_list)} candidates")
candidates = [
SearchCandidate(
item=int(t["commontrack_id"]),
duration_ms=(
float(t["track_length"]) * 1000 if t.get("track_length") else None
),
is_synced=bool(t.get("has_subtitles") or t.get("has_richsync")),
title=t.get("track_name"),
artist=t.get("artist_name"),
album=t.get("album_name"),
)
for item in track_list
if isinstance(item, dict)
and isinstance(t := item.get("track", {}), dict)
and isinstance(t.get("commontrack_id"), int)
and not t.get("instrumental")
]
logger.debug(f"Musixmatch: search returned {len(candidates)} candidates")
best_id, confidence = select_best(
candidates,
@@ -274,10 +337,7 @@ class MusixmatchFetcher(BaseFetcher):
logger.debug(f"Musixmatch: no match found for {track.display_name()}")
return FetchResult.from_not_found()
lrc = await _fetch_macro(
self._auth,
{"commontrack_id": str(commontrack_id)},
)
lrc = await self._fetch_macro({"commontrack_id": str(commontrack_id)})
except AttributeError:
return FetchResult.from_not_found()
except Exception as e:
+129 -66
View File
@@ -30,6 +30,42 @@ _NETEASE_BASE_HEADERS = {
}
def _parse_netease_search(data: dict) -> list[SearchCandidate[int]]:
"""Parse Netease search response into scored candidates."""
result_body = data.get("result")
if not isinstance(result_body, dict):
return []
songs = result_body.get("songs")
if not isinstance(songs, list) or len(songs) == 0:
return []
return [
SearchCandidate(
item=song_id,
duration_ms=float(song["dt"]) if isinstance(song.get("dt"), int) else None,
title=song.get("name"),
artist=", ".join(a.get("name", "") for a in song.get("ar", [])) or None,
album=(song.get("al") or {}).get("name"),
)
for song in songs
if isinstance(song, dict) and isinstance(song_id := song.get("id"), int)
]
def _parse_netease_lyrics(data: dict) -> LRCData | None:
"""Parse Netease lyric response to LRCData."""
lrc_obj = data.get("lrc")
if not isinstance(lrc_obj, dict):
return None
lrc = lrc_obj.get("lyric", "")
if not isinstance(lrc, str) or not lrc.strip():
return None
return LRCData(lrc)
class NeteaseFetcher(BaseFetcher):
@property
def source_name(self) -> str:
@@ -38,6 +74,88 @@ class NeteaseFetcher(BaseFetcher):
def is_available(self, track: TrackMeta) -> bool:
    """A track is searchable as soon as it has a non-empty title."""
    return True if track.title else False
async def _api_search(
    self,
    client: httpx.AsyncClient,
    query: str,
    limit: int,
) -> dict | None:
    """POST one search request and return the decoded JSON object.

    Raises whatever ``raise_for_status`` raises on an error status; returns
    ``None`` when the decoded body is not a dict.
    """
    form = {"s": query, "type": "1", "limit": str(limit), "offset": "0"}
    response = await client.post(
        _NETEASE_SEARCH_URL,
        headers=_NETEASE_BASE_HEADERS,
        data=form,
    )
    response.raise_for_status()
    payload = response.json()
    return payload if isinstance(payload, dict) else None
async def _api_search_track(
self,
client: httpx.AsyncClient,
track: TrackMeta,
limit: int,
) -> dict | None:
"""Request Netease search payload for one track using production query strategy."""
query = f"{track.artist or ''} {track.title or ''}".strip()
if not query:
return None
return await self._api_search(client, query, limit)
async def _api_lyric(
    self,
    client: httpx.AsyncClient,
    song_id: int,
) -> dict | None:
    """POST one lyric request for *song_id* and return the decoded JSON object.

    Raises whatever ``raise_for_status`` raises on an error status; returns
    ``None`` when the decoded body is not a dict.
    """
    form = {
        "id": str(song_id),
        "cp": "false",
        # Every remaining form field is sent as the string "0".
        **dict.fromkeys(("tv", "lv", "rv", "kv", "yv", "ytv", "yrv"), "0"),
    }
    response = await client.post(
        _NETEASE_LYRIC_URL,
        headers=_NETEASE_BASE_HEADERS,
        data=form,
    )
    response.raise_for_status()
    payload = response.json()
    return payload if isinstance(payload, dict) else None
async def _api_lyric_track(
self,
client: httpx.AsyncClient,
track: TrackMeta,
limit: int,
) -> dict | None:
"""Request lyric payload for top-ranked candidate of a track."""
search_data = await self._api_search_track(client, track, limit)
if search_data is None:
return None
candidates = _parse_netease_search(search_data)
if not candidates:
return None
ranked = select_ranked(
candidates,
track.length,
title=track.title,
artist=track.artist,
album=track.album,
)
if not ranked:
return None
top_song_id = ranked[0][0]
return await self._api_lyric(client, top_song_id)
async def _search(
self, track: TrackMeta, limit: int = 10
) -> list[tuple[int, float]]:
@@ -49,46 +167,18 @@ class NeteaseFetcher(BaseFetcher):
try:
async with httpx.AsyncClient(timeout=self._general.http_timeout) as client:
resp = await client.post(
_NETEASE_SEARCH_URL,
headers=_NETEASE_BASE_HEADERS,
data={"s": query, "type": "1", "limit": str(limit), "offset": "0"},
)
resp.raise_for_status()
result = resp.json()
result = await self._api_search_track(client, track, limit)
if not isinstance(result, dict):
logger.error(
f"Netease: search returned non-dict: {type(result).__name__}"
)
if result is None:
logger.error("Netease: search returned non-dict payload")
return []
result_body = result.get("result")
if not isinstance(result_body, dict):
logger.debug("Netease: search 'result' field missing or invalid")
return []
songs = result_body.get("songs")
if not isinstance(songs, list) or len(songs) == 0:
candidates = _parse_netease_search(result)
if not candidates:
logger.debug("Netease: search returned 0 results")
return []
logger.debug(f"Netease: search returned {len(songs)} candidates")
candidates = [
SearchCandidate(
item=song_id,
duration_ms=float(song["dt"])
if isinstance(song.get("dt"), int)
else None,
title=song.get("name"),
artist=", ".join(a.get("name", "") for a in song.get("ar", []))
or None,
album=(song.get("al") or {}).get("name"),
)
for song in songs
if isinstance(song, dict) and isinstance(song_id := song.get("id"), int)
]
logger.debug(f"Netease: search returned {len(candidates)} candidates")
ranked = select_ranked(
candidates,
track.length,
@@ -114,43 +204,16 @@ class NeteaseFetcher(BaseFetcher):
try:
async with httpx.AsyncClient(timeout=self._general.http_timeout) as client:
resp = await client.post(
_NETEASE_LYRIC_URL,
headers=_NETEASE_BASE_HEADERS,
data={
"id": str(song_id),
"cp": "false",
"tv": "0",
"lv": "0",
"rv": "0",
"kv": "0",
"yv": "0",
"ytv": "0",
"yrv": "0",
},
)
resp.raise_for_status()
data = resp.json()
data = await self._api_lyric(client, song_id)
if not isinstance(data, dict):
logger.error(
f"Netease: lyric response is not dict: {type(data).__name__}"
)
if data is None:
logger.error("Netease: lyric response is not dict")
return FetchResult.from_network_error()
lrc_obj = data.get("lrc")
if not isinstance(lrc_obj, dict):
logger.debug(
f"Netease: no 'lrc' object in response for song_id={song_id}"
)
return FetchResult.from_not_found()
lrc: str = lrc_obj.get("lyric", "")
if not isinstance(lrc, str) or not lrc.strip():
lrcdata = _parse_netease_lyrics(data)
if lrcdata is None:
logger.debug(f"Netease: empty lyrics for song_id={song_id}")
return FetchResult.from_not_found()
lrcdata = LRCData(lrc)
status = lrcdata.detect_sync_status()
logger.info(
f"Netease: got {status.value} lyrics for song_id={song_id} "
+134 -98
View File
@@ -10,11 +10,11 @@ Description: QQ Music fetcher via self-hosted API proxy.
"""
import asyncio
import httpx
from loguru import logger
from .base import BaseFetcher, FetchResult
from .selection import SearchCandidate, select_ranked
from ..authenticators import QQMusicAuthenticator
from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import LRCData
from ..config import (
@@ -23,9 +23,40 @@ from ..config import (
MULTI_CANDIDATE_DELAY_S,
)
_QQ_MUSIC_API_SEARCH_ENDPOINT = "/api/search"
_QQ_MUSIC_API_LYRIC_ENDPOINT = "/api/lyric"
from ..authenticators import QQMusicAuthenticator
def _parse_qq_search(data: dict) -> list[SearchCandidate[str]]:
"""Parse QQMusic search response into normalized candidates."""
if data.get("code") != 0:
return []
songs = data.get("data", {}).get("list", [])
if not isinstance(songs, list):
return []
return [
SearchCandidate(
item=mid,
duration_ms=float(song["interval"]) * 1000
if isinstance(song.get("interval"), int)
else None,
title=song.get("name"),
artist=", ".join(s.get("name", "") for s in song.get("singer", [])) or None,
album=(song.get("album") or {}).get("name"),
)
for song in songs
if isinstance(song, dict) and isinstance(mid := song.get("mid"), str)
]
def _parse_qq_lyrics(data: dict) -> LRCData | None:
"""Parse QQMusic lyric response to LRCData."""
if data.get("code") != 0:
return None
lrc = data.get("data", {}).get("lyric", "")
if not isinstance(lrc, str) or not lrc.strip():
return None
return LRCData(lrc)
class QQMusicFetcher(BaseFetcher):
@@ -41,119 +72,124 @@ class QQMusicFetcher(BaseFetcher):
def is_available(self, track: TrackMeta) -> bool:
    """Usable only when the track has a title and the authenticator is configured."""
    if not track.title:
        return False
    return self._auth.is_configured()
async def _api_search(
self,
track: TrackMeta,
limit: int,
) -> dict | None:
"""Return raw QQMusic search payload for one track."""
query = f"{track.artist or ''} {track.title or ''}".strip()
if not query:
return None
data = await self._auth.search(query, limit)
if not isinstance(data, dict):
return None
return data
async def _api_lyric(
self,
mid: str,
) -> dict | None:
"""Return raw QQMusic lyric payload for one song MID."""
data = await self._auth.get_lyric(mid)
if not isinstance(data, dict):
return None
return data
async def _api_lyric_track(
self,
track: TrackMeta,
limit: int,
) -> dict | None:
"""Return raw QQMusic lyric payload for top-ranked search candidate."""
search_data = await self._api_search(track, limit)
if search_data is None:
return None
candidates = _parse_qq_search(search_data)
if not candidates:
return None
ranked = select_ranked(
candidates,
track.length,
title=track.title,
artist=track.artist,
album=track.album,
)
if not ranked:
return None
mid = ranked[0][0]
return await self._api_lyric(mid)
async def _search(
self, track: TrackMeta, limit: int = 10
) -> list[tuple[str, float]]:
query = f"{track.artist or ''} {track.title or ''}".strip()
if not query:
search_data = await self._api_search(track, limit)
if search_data is None:
return []
query = f"{track.artist or ''} {track.title or ''}".strip()
logger.debug(f"QQMusic: searching for '{query}' (limit={limit})")
try:
async with httpx.AsyncClient(timeout=self._general.http_timeout) as client:
resp = await client.get(
f"{await self._auth.authenticate()}{_QQ_MUSIC_API_SEARCH_ENDPOINT}",
params={"keyword": query, "type": "song", "num": limit},
)
resp.raise_for_status()
data = resp.json()
if data.get("code") != 0:
logger.error(f"QQMusic: search API error: {data}")
return []
songs = data.get("data", {}).get("list", [])
if not songs:
logger.debug("QQMusic: search returned 0 results")
return []
logger.debug(f"QQMusic: search returned {len(songs)} candidates")
candidates = [
SearchCandidate(
item=mid,
duration_ms=float(song["interval"]) * 1000
if isinstance(song.get("interval"), int)
else None,
title=song.get("name"),
artist=", ".join(s.get("name", "") for s in song.get("singer", []))
or None,
album=(song.get("album") or {}).get("name"),
)
for song in songs
if isinstance(song, dict) and isinstance(mid := song.get("mid"), str)
]
ranked = select_ranked(
candidates,
track.length,
title=track.title,
artist=track.artist,
album=track.album,
)
if ranked:
logger.debug(
"QQMusic: top candidates: "
+ ", ".join(f"mid={m} ({c:.0f})" for m, c in ranked)
)
else:
logger.debug("QQMusic: no suitable candidate found")
return ranked
except Exception as e:
logger.error(f"QQMusic: search failed: {e}")
candidates = _parse_qq_search(search_data)
if not candidates:
logger.debug("QQMusic: search returned 0 results")
return []
logger.debug(f"QQMusic: search returned {len(candidates)} candidates")
ranked = select_ranked(
candidates,
track.length,
title=track.title,
artist=track.artist,
album=track.album,
)
if ranked:
logger.debug(
"QQMusic: top candidates: "
+ ", ".join(f"mid={m} ({c:.0f})" for m, c in ranked)
)
else:
logger.debug("QQMusic: no suitable candidate found")
return ranked
async def _get_lyric(self, mid: str, confidence: float = 0.0) -> FetchResult:
logger.debug(f"QQMusic: fetching lyrics for mid={mid}")
data = await self._api_lyric(mid)
if data is None:
return FetchResult.from_network_error()
try:
async with httpx.AsyncClient(timeout=self._general.http_timeout) as client:
resp = await client.get(
f"{await self._auth.authenticate()}{_QQ_MUSIC_API_LYRIC_ENDPOINT}",
params={"mid": mid},
)
resp.raise_for_status()
data = resp.json()
lrcdata = _parse_qq_lyrics(data)
if lrcdata is None:
logger.debug(f"QQMusic: empty lyrics for mid={mid}")
return FetchResult.from_not_found()
if data.get("code") != 0:
logger.error(f"QQMusic: lyric API error: {data}")
return FetchResult.from_network_error()
lrc = data.get("data", {}).get("lyric", "")
if not isinstance(lrc, str) or not lrc.strip():
logger.debug(f"QQMusic: empty lyrics for mid={mid}")
return FetchResult.from_not_found()
lrcdata = LRCData(lrc)
status = lrcdata.detect_sync_status()
logger.info(
f"QQMusic: got {status.value} lyrics for mid={mid} ({len(lrcdata)} lines)"
)
not_found = LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
if status == CacheStatus.SUCCESS_SYNCED:
return FetchResult(
synced=LyricResult(
status=CacheStatus.SUCCESS_SYNCED,
lyrics=lrcdata,
source=self.source_name,
confidence=confidence,
),
unsynced=not_found,
)
status = lrcdata.detect_sync_status()
logger.info(
f"QQMusic: got {status.value} lyrics for mid={mid} ({len(lrcdata)} lines)"
)
not_found = LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
if status == CacheStatus.SUCCESS_SYNCED:
return FetchResult(
synced=not_found,
unsynced=LyricResult(
status=CacheStatus.SUCCESS_UNSYNCED,
synced=LyricResult(
status=CacheStatus.SUCCESS_SYNCED,
lyrics=lrcdata,
source=self.source_name,
confidence=confidence,
),
unsynced=not_found,
)
except Exception as e:
logger.error(f"QQMusic: lyric fetch failed for mid={mid}: {e}")
return FetchResult.from_network_error()
return FetchResult(
synced=not_found,
unsynced=LyricResult(
status=CacheStatus.SUCCESS_UNSYNCED,
lyrics=lrcdata,
source=self.source_name,
confidence=confidence,
),
)
async def fetch(self, track: TrackMeta, bypass_cache: bool = False) -> FetchResult:
if not self._auth.is_configured():
+84 -98
View File
@@ -4,16 +4,66 @@ Date: 2026-03-25 10:43:21
Description: Spotify fetcher — obtains synced lyrics via Spotify's internal color-lyrics API.
"""
import httpx
from loguru import logger
from .base import BaseFetcher, FetchResult
from ..authenticators.spotify import SpotifyAuthenticator, SPOTIFY_BASE_HEADERS
from ..authenticators.spotify import SpotifyAuthenticator
from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import LRCData
from ..config import GeneralConfig, TTL_NOT_FOUND
_SPOTIFY_LYRICS_URL = "https://spclient.wg.spotify.com/color-lyrics/v2/track/"
def _format_lrc_line(start_ms: int, words: str) -> str:
minutes = start_ms // 60000
seconds = (start_ms // 1000) % 60
centiseconds = round((start_ms % 1000) / 10.0)
return f"[{minutes:02d}:{seconds:02d}.{centiseconds:02.0f}]{words}"
def _is_truly_synced(lines: list[dict]) -> bool:
for line in lines:
try:
ms = int(line.get("startTimeMs", "0"))
if ms > 0:
return True
except (ValueError, TypeError):
continue
return False
def _parse_spotify_lyrics(data: dict) -> LRCData | None:
"""Parse Spotify color-lyrics payload to LRCData."""
lyrics_data = data.get("lyrics")
if not isinstance(lyrics_data, dict):
return None
sync_type = lyrics_data.get("syncType", "")
lines = lyrics_data.get("lines", [])
if not isinstance(lines, list) or len(lines) == 0:
return None
is_synced = sync_type == "LINE_SYNCED" and _is_truly_synced(lines)
lrc_lines: list[str] = []
for line in lines:
if not isinstance(line, dict):
continue
words = line.get("words", "")
if not isinstance(words, str):
continue
try:
ms = int(line.get("startTimeMs", "0"))
except (ValueError, TypeError):
ms = 0
if is_synced:
lrc_lines.append(_format_lrc_line(ms, words))
else:
lrc_lines.append(f"[00:00.00]{words}")
if not lrc_lines:
return None
return LRCData("\n".join(lrc_lines))
class SpotifyFetcher(BaseFetcher):
@@ -29,23 +79,14 @@ class SpotifyFetcher(BaseFetcher):
def is_available(self, track: TrackMeta) -> bool:
    """Usable only when the track has an ID and the authenticator is configured."""
    if not track.trackid:
        return False
    return self._auth.is_configured()
@staticmethod
def _format_lrc_line(start_ms: int, words: str) -> str:
minutes = start_ms // 60000
seconds = (start_ms // 1000) % 60
centiseconds = round((start_ms % 1000) / 10.0)
return f"[{minutes:02d}:{seconds:02d}.{centiseconds:02.0f}]{words}"
@staticmethod
def _is_truly_synced(lines: list[dict]) -> bool:
for line in lines:
try:
ms = int(line.get("startTimeMs", "0"))
if ms > 0:
return True
except (ValueError, TypeError):
continue
return False
async def _api_lyrics(self, track: TrackMeta) -> dict | None:
"""Return raw Spotify lyrics payload for one track using production auth path."""
if not track.trackid:
return None
data = await self._auth.get_lyrics(track.trackid)
if not isinstance(data, dict):
return None
return data
async def fetch(self, track: TrackMeta, bypass_cache: bool = False) -> FetchResult:
if not track.trackid:
@@ -54,88 +95,33 @@ class SpotifyFetcher(BaseFetcher):
logger.info(f"Spotify: fetching lyrics for trackid={track.trackid}")
token = await self._auth.authenticate()
if not token:
logger.error("Spotify: cannot fetch lyrics without a token")
return FetchResult.from_network_error()
data = await self._api_lyrics(track)
if data is None:
logger.debug(f"Spotify: no lyrics payload for trackid={track.trackid}")
return FetchResult.from_not_found()
url = f"{_SPOTIFY_LYRICS_URL}{track.trackid}?format=json&vocalRemoval=false&market=from_token"
headers = {
"Accept": "application/json",
"Authorization": f"Bearer {token}",
**SPOTIFY_BASE_HEADERS,
}
content = _parse_spotify_lyrics(data)
if content is None:
logger.debug("Spotify: response contained no parseable lyric lines")
return FetchResult.from_not_found()
try:
async with httpx.AsyncClient(timeout=self._general.http_timeout) as client:
res = await client.get(url, headers=headers)
if res.status_code == 404:
logger.debug(f"Spotify: 404 for trackid={track.trackid}")
return FetchResult.from_not_found()
if res.status_code != 200:
logger.error(f"Spotify: lyrics API returned {res.status_code}")
return FetchResult.from_network_error()
data = res.json()
if not isinstance(data, dict) or "lyrics" not in data:
logger.error("Spotify: unexpected lyrics response structure")
return FetchResult.from_network_error()
lyrics_data = data["lyrics"]
sync_type = lyrics_data.get("syncType", "")
lines = lyrics_data.get("lines", [])
if not isinstance(lines, list) or len(lines) == 0:
logger.debug("Spotify: response contained no lyric lines")
return FetchResult.from_not_found()
is_synced = sync_type == "LINE_SYNCED" and self._is_truly_synced(lines)
lrc_lines: list[str] = []
for line in lines:
words = line.get("words", "")
if not isinstance(words, str):
continue
try:
ms = int(line.get("startTimeMs", "0"))
except (ValueError, TypeError):
ms = 0
if is_synced:
lrc_lines.append(self._format_lrc_line(ms, words))
else:
lrc_lines.append(f"[00:00.00]{words}")
content = LRCData("\n".join(lrc_lines))
status = (
CacheStatus.SUCCESS_SYNCED
if is_synced
else CacheStatus.SUCCESS_UNSYNCED
)
logger.info(f"Spotify: got {status.value} lyrics ({len(lrc_lines)} lines)")
not_found = LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
if is_synced:
return FetchResult(
synced=LyricResult(
status=CacheStatus.SUCCESS_SYNCED,
lyrics=content,
source=self.source_name,
),
unsynced=not_found,
)
status = content.detect_sync_status()
logger.info(f"Spotify: got {status.value} lyrics ({len(content)} lines)")
not_found = LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
if status == CacheStatus.SUCCESS_SYNCED:
return FetchResult(
synced=not_found,
unsynced=LyricResult(
status=CacheStatus.SUCCESS_UNSYNCED,
synced=LyricResult(
status=CacheStatus.SUCCESS_SYNCED,
lyrics=content,
source=self.source_name,
),
unsynced=not_found,
)
except Exception as e:
logger.error(f"Spotify: lyrics fetch failed: {e}")
return FetchResult.from_network_error()
return FetchResult(
synced=not_found,
unsynced=LyricResult(
status=CacheStatus.SUCCESS_UNSYNCED,
lyrics=content,
source=self.source_name,
),
)