init
This commit is contained in:
@@ -0,0 +1,16 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Optional
|
||||
from lrcfetch.models import TrackMeta, LyricResult
|
||||
|
||||
|
||||
class BaseFetcher(ABC):
|
||||
@property
|
||||
@abstractmethod
|
||||
def source_name(self) -> str:
|
||||
"""Name of the fetcher source."""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def fetch(self, track: TrackMeta) -> Optional[LyricResult]:
|
||||
"""Fetch lyrics for the given track. Returns None if unable to fetch."""
|
||||
pass
|
||||
@@ -0,0 +1,99 @@
|
||||
"""Local fetcher — reads lyrics from .lrc sidecar files or embedded audio metadata.
|
||||
|
||||
Priority:
|
||||
1. Same-directory .lrc file (e.g. /path/to/track.lrc)
|
||||
2. Embedded lyrics in audio metadata (FLAC, MP3 USLT/SYLT tags)
|
||||
"""
|
||||
|
||||
import re
|
||||
import os
|
||||
from typing import Optional
|
||||
from loguru import logger
|
||||
from lrcfetch.models import TrackMeta, LyricResult, CacheStatus
|
||||
from lrcfetch.fetchers.base import BaseFetcher
|
||||
from mutagen._file import File
|
||||
from mutagen.flac import FLAC
|
||||
|
||||
# Matches LRC time tags like [00:12.34] or [01:23.456]
|
||||
_LRC_TIME_TAG_RE = re.compile(r"\[\d{2}:\d{2}\.\d{2,3}\]")
|
||||
# Matches time tags that are all zeros
|
||||
_ZERO_TIME_TAG_RE = re.compile(r"^\[00:00\.0{2,3}\]$")
|
||||
|
||||
|
||||
def _detect_sync_status(text: str) -> CacheStatus:
|
||||
"""Determine whether lyrics text contains meaningful LRC time tags.
|
||||
|
||||
Returns UNSYNCED if no tags exist or all tags are [00:00.00].
|
||||
"""
|
||||
tags = _LRC_TIME_TAG_RE.findall(text)
|
||||
if not tags:
|
||||
return CacheStatus.SUCCESS_UNSYNCED
|
||||
for tag in tags:
|
||||
if not _ZERO_TIME_TAG_RE.match(tag):
|
||||
return CacheStatus.SUCCESS_SYNCED
|
||||
return CacheStatus.SUCCESS_UNSYNCED
|
||||
|
||||
|
||||
class LocalFetcher(BaseFetcher):
|
||||
@property
|
||||
def source_name(self) -> str:
|
||||
return "local"
|
||||
|
||||
def fetch(self, track: TrackMeta) -> Optional[LyricResult]:
|
||||
"""Attempt to read lyrics from local filesystem."""
|
||||
if not track.is_local or not track.url:
|
||||
return None
|
||||
|
||||
file_path = track.url.replace("file://", "", 1)
|
||||
if not os.path.exists(file_path):
|
||||
logger.debug(f"Local: file does not exist: {file_path}")
|
||||
return None
|
||||
|
||||
logger.info(f"Local: checking for lyrics near {file_path}")
|
||||
|
||||
# Sidecar .lrc file
|
||||
lrc_path = os.path.splitext(file_path)[0] + ".lrc"
|
||||
if os.path.exists(lrc_path):
|
||||
try:
|
||||
with open(lrc_path, "r", encoding="utf-8") as f:
|
||||
content = f.read().strip()
|
||||
if content:
|
||||
status = _detect_sync_status(content)
|
||||
logger.info(f"Local: found .lrc sidecar ({status.value})")
|
||||
return LyricResult(
|
||||
status=status, lyrics=content, source=self.source_name
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Local: error reading {lrc_path}: {e}")
|
||||
|
||||
# Embedded metadata
|
||||
try:
|
||||
audio = File(file_path)
|
||||
if audio is not None:
|
||||
lyrics = None
|
||||
|
||||
if isinstance(audio, FLAC):
|
||||
# FLAC stores lyrics in vorbis comment tags
|
||||
lyrics = (audio.get("lyrics") or audio.get("unsynclyrics") or [None])[0]
|
||||
elif hasattr(audio, "tags") and audio.tags:
|
||||
# MP3 / other: look for USLT or SYLT ID3 frames
|
||||
for key in audio.tags.keys():
|
||||
if key.startswith("USLT") or key.startswith("SYLT"):
|
||||
lyrics = str(audio.tags[key])
|
||||
break
|
||||
|
||||
if lyrics:
|
||||
status = _detect_sync_status(lyrics)
|
||||
logger.info(f"Local: found embedded lyrics ({status.value})")
|
||||
return LyricResult(
|
||||
status=status,
|
||||
lyrics=lyrics.strip(),
|
||||
source=f"{self.source_name} (embedded)",
|
||||
)
|
||||
else:
|
||||
logger.debug("Local: no embedded lyrics found")
|
||||
except Exception as e:
|
||||
logger.error(f"Local: error reading metadata for {file_path}: {e}")
|
||||
|
||||
logger.debug(f"Local: no lyrics found for {file_path}")
|
||||
return None
|
||||
@@ -0,0 +1,94 @@
|
||||
"""LRCLIB fetcher — queries lrclib.net for synced/plain lyrics.
|
||||
|
||||
Requires complete track metadata (artist, title, album, duration).
|
||||
"""
|
||||
|
||||
import httpx
|
||||
from typing import Optional
|
||||
from loguru import logger
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from lrcfetch.models import TrackMeta, LyricResult, CacheStatus
|
||||
from lrcfetch.fetchers.base import BaseFetcher
|
||||
from lrcfetch.config import (
|
||||
HTTP_TIMEOUT,
|
||||
TTL_UNSYNCED,
|
||||
TTL_NOT_FOUND,
|
||||
TTL_NETWORK_ERROR,
|
||||
LRCLIB_API_URL,
|
||||
UA_LRCFETCH,
|
||||
)
|
||||
|
||||
|
||||
class LrclibFetcher(BaseFetcher):
|
||||
@property
|
||||
def source_name(self) -> str:
|
||||
return "lrclib"
|
||||
|
||||
def fetch(self, track: TrackMeta) -> Optional[LyricResult]:
|
||||
"""Fetch lyrics from LRCLIB. Requires complete metadata."""
|
||||
if not track.is_complete:
|
||||
logger.debug("LRCLIB: skipped — incomplete metadata")
|
||||
return None
|
||||
|
||||
params = {
|
||||
"track_name": track.title,
|
||||
"artist_name": track.artist,
|
||||
"album_name": track.album,
|
||||
"duration": track.length / 1000.0 if track.length else 0,
|
||||
}
|
||||
|
||||
url = f"{LRCLIB_API_URL}?{urlencode(params)}"
|
||||
logger.info(f"LRCLIB: fetching lyrics for {track.display_name()}")
|
||||
|
||||
try:
|
||||
with httpx.Client(timeout=HTTP_TIMEOUT) as client:
|
||||
resp = client.get(url, headers={"User-Agent": UA_LRCFETCH})
|
||||
|
||||
if resp.status_code == 404:
|
||||
logger.debug(f"LRCLIB: not found for {track.display_name()}")
|
||||
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
|
||||
|
||||
if resp.status_code != 200:
|
||||
logger.error(f"LRCLIB: API returned {resp.status_code}")
|
||||
return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
|
||||
|
||||
data = resp.json()
|
||||
|
||||
# Validate response
|
||||
if not isinstance(data, dict):
|
||||
logger.error(f"LRCLIB: unexpected response type: {type(data).__name__}")
|
||||
return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
|
||||
|
||||
synced = data.get("syncedLyrics")
|
||||
unsynced = data.get("plainLyrics")
|
||||
|
||||
if isinstance(synced, str) and synced.strip():
|
||||
logger.info(
|
||||
f"LRCLIB: got synced lyrics ({len(synced.splitlines())} lines)"
|
||||
)
|
||||
return LyricResult(
|
||||
status=CacheStatus.SUCCESS_SYNCED,
|
||||
lyrics=synced.strip(),
|
||||
source=self.source_name,
|
||||
)
|
||||
elif isinstance(unsynced, str) and unsynced.strip():
|
||||
logger.info(
|
||||
f"LRCLIB: got unsynced lyrics ({len(unsynced.splitlines())} lines)"
|
||||
)
|
||||
return LyricResult(
|
||||
status=CacheStatus.SUCCESS_UNSYNCED,
|
||||
lyrics=unsynced.strip(),
|
||||
source=self.source_name,
|
||||
ttl=TTL_UNSYNCED,
|
||||
)
|
||||
else:
|
||||
logger.debug(f"LRCLIB: empty response for {track.display_name()}")
|
||||
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
|
||||
|
||||
except httpx.HTTPError as e:
|
||||
logger.error(f"LRCLIB: HTTP error: {e}")
|
||||
return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
|
||||
except Exception as e:
|
||||
logger.error(f"LRCLIB: unexpected error: {e}")
|
||||
return None
|
||||
@@ -0,0 +1,136 @@
|
||||
"""LRCLIB search fetcher — fuzzy search via lrclib.net /api/search.
|
||||
|
||||
Used when metadata is incomplete (no album or duration) but title is available.
|
||||
Selects the best match by duration when track length is known.
|
||||
"""
|
||||
|
||||
import httpx
|
||||
from typing import Optional
|
||||
from loguru import logger
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from lrcfetch.models import TrackMeta, LyricResult, CacheStatus
|
||||
from lrcfetch.fetchers.base import BaseFetcher
|
||||
from lrcfetch.config import (
|
||||
HTTP_TIMEOUT,
|
||||
TTL_UNSYNCED,
|
||||
TTL_NOT_FOUND,
|
||||
TTL_NETWORK_ERROR,
|
||||
DURATION_TOLERANCE_MS,
|
||||
LRCLIB_SEARCH_URL,
|
||||
UA_LRCFETCH,
|
||||
)
|
||||
|
||||
|
||||
class LrclibSearchFetcher(BaseFetcher):
|
||||
@property
|
||||
def source_name(self) -> str:
|
||||
return "lrclib-search"
|
||||
|
||||
def fetch(self, track: TrackMeta) -> Optional[LyricResult]:
|
||||
"""Search LRCLIB for lyrics. Requires at least a title."""
|
||||
if not track.title:
|
||||
logger.debug("LRCLIB-search: skipped — no title")
|
||||
return None
|
||||
|
||||
params: dict[str, str] = {"track_name": track.title}
|
||||
if track.artist:
|
||||
params["artist_name"] = track.artist
|
||||
if track.album:
|
||||
params["album_name"] = track.album
|
||||
|
||||
url = f"{LRCLIB_SEARCH_URL}?{urlencode(params)}"
|
||||
logger.info(f"LRCLIB-search: searching for {track.display_name()}")
|
||||
|
||||
try:
|
||||
with httpx.Client(timeout=HTTP_TIMEOUT) as client:
|
||||
resp = client.get(url, headers={"User-Agent": UA_LRCFETCH})
|
||||
|
||||
if resp.status_code != 200:
|
||||
logger.error(f"LRCLIB-search: API returned {resp.status_code}")
|
||||
return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
|
||||
|
||||
data = resp.json()
|
||||
|
||||
if not isinstance(data, list) or len(data) == 0:
|
||||
logger.debug(f"LRCLIB-search: no results for {track.display_name()}")
|
||||
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
|
||||
|
||||
logger.debug(f"LRCLIB-search: got {len(data)} candidates")
|
||||
|
||||
# Select best match by duration
|
||||
best = self._select_best(data, track)
|
||||
if best is None:
|
||||
logger.debug("LRCLIB-search: no valid candidate found")
|
||||
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
|
||||
|
||||
# Extract lyrics
|
||||
synced = best.get("syncedLyrics")
|
||||
unsynced = best.get("plainLyrics")
|
||||
|
||||
if isinstance(synced, str) and synced.strip():
|
||||
logger.info(
|
||||
f"LRCLIB-search: got synced lyrics ({len(synced.splitlines())} lines)"
|
||||
)
|
||||
return LyricResult(
|
||||
status=CacheStatus.SUCCESS_SYNCED,
|
||||
lyrics=synced.strip(),
|
||||
source=self.source_name,
|
||||
)
|
||||
elif isinstance(unsynced, str) and unsynced.strip():
|
||||
logger.info(
|
||||
f"LRCLIB-search: got unsynced lyrics ({len(unsynced.splitlines())} lines)"
|
||||
)
|
||||
return LyricResult(
|
||||
status=CacheStatus.SUCCESS_UNSYNCED,
|
||||
lyrics=unsynced.strip(),
|
||||
source=self.source_name,
|
||||
ttl=TTL_UNSYNCED,
|
||||
)
|
||||
else:
|
||||
logger.debug("LRCLIB-search: best candidate has empty lyrics")
|
||||
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
|
||||
|
||||
except httpx.HTTPError as e:
|
||||
logger.error(f"LRCLIB-search: HTTP error: {e}")
|
||||
return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
|
||||
except Exception as e:
|
||||
logger.error(f"LRCLIB-search: unexpected error: {e}")
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _select_best(candidates: list[dict], track: TrackMeta) -> Optional[dict]:
|
||||
"""Pick the best candidate, preferring synced lyrics and closest duration."""
|
||||
if track.length is not None:
|
||||
track_s = track.length / 1000.0
|
||||
best: Optional[dict] = None
|
||||
best_diff = float("inf")
|
||||
|
||||
for item in candidates:
|
||||
if not isinstance(item, dict):
|
||||
continue
|
||||
duration = item.get("duration")
|
||||
if not isinstance(duration, (int, float)):
|
||||
continue
|
||||
diff = abs(duration - track_s) * 1000 # compare in ms
|
||||
if diff > DURATION_TOLERANCE_MS:
|
||||
continue
|
||||
# Prefer synced over unsynced at similar duration
|
||||
has_synced = isinstance(item.get("syncedLyrics"), str) and item["syncedLyrics"].strip()
|
||||
best_synced = best is not None and isinstance(best.get("syncedLyrics"), str) and best["syncedLyrics"].strip()
|
||||
if diff < best_diff or (diff == best_diff and has_synced and not best_synced):
|
||||
best_diff = diff
|
||||
best = item
|
||||
|
||||
if best is not None:
|
||||
logger.debug(f"LRCLIB-search: selected id={best.get('id')} (diff={best_diff:.0f}ms)")
|
||||
return best
|
||||
|
||||
logger.debug(f"LRCLIB-search: no candidate within {DURATION_TOLERANCE_MS}ms")
|
||||
return None
|
||||
|
||||
# No duration — pick first with synced lyrics, or just first
|
||||
for item in candidates:
|
||||
if isinstance(item, dict) and isinstance(item.get("syncedLyrics"), str) and item["syncedLyrics"].strip():
|
||||
return item
|
||||
return candidates[0] if isinstance(candidates[0], dict) else None
|
||||
@@ -0,0 +1,216 @@
|
||||
"""Netease Cloud Music fetcher.
|
||||
|
||||
Uses the public cloudsearch API for searching and the song/lyric API for
|
||||
retrieving lyrics. No authentication required.
|
||||
|
||||
Search results are filtered by duration when the track has a known length
|
||||
to avoid returning lyrics for the wrong version of a song.
|
||||
"""
|
||||
|
||||
import re
|
||||
import httpx
|
||||
from typing import Optional
|
||||
from loguru import logger
|
||||
from lrcfetch.models import TrackMeta, LyricResult, CacheStatus
|
||||
from lrcfetch.fetchers.base import BaseFetcher
|
||||
from lrcfetch.config import (
|
||||
HTTP_TIMEOUT,
|
||||
TTL_NOT_FOUND,
|
||||
TTL_NETWORK_ERROR,
|
||||
DURATION_TOLERANCE_MS,
|
||||
NETEASE_SEARCH_URL,
|
||||
NETEASE_LYRIC_URL,
|
||||
UA_BROWSER,
|
||||
)
|
||||
|
||||
# Matches LRC time tags like [00:12.34] or [01:23.456]
|
||||
_LRC_TIME_TAG_RE = re.compile(r"\[\d{2}:\d{2}\.\d{2,3}\]")
|
||||
# Matches time tags that are all zeros: [00:00.00] or [00:00.000]
|
||||
_ZERO_TIME_TAG_RE = re.compile(r"^\[00:00\.0{2,3}\]")
|
||||
|
||||
_HEADERS = {
|
||||
"User-Agent": UA_BROWSER,
|
||||
"Referer": "https://music.163.com/",
|
||||
}
|
||||
|
||||
|
||||
def _is_synced_lrc(text: str) -> bool:
|
||||
"""Check whether *text* contains actual LRC time tags with non-zero times.
|
||||
|
||||
Returns False if:
|
||||
- No time tags at all
|
||||
- All time tags are [00:00.00] (unsynced disguised as synced)
|
||||
"""
|
||||
lines_with_tags = _LRC_TIME_TAG_RE.findall(text)
|
||||
if not lines_with_tags:
|
||||
return False
|
||||
# Check if ALL tags are zero — if so, it's unsynced
|
||||
for tag in lines_with_tags:
|
||||
if not _ZERO_TIME_TAG_RE.match(tag):
|
||||
return True # Found at least one non-zero tag
|
||||
return False
|
||||
|
||||
|
||||
class NeteaseFetcher(BaseFetcher):
|
||||
@property
|
||||
def source_name(self) -> str:
|
||||
return "netease"
|
||||
|
||||
def _search(self, track: TrackMeta, limit: int = 10) -> Optional[int]:
|
||||
"""Search Netease and return the best-matching song ID.
|
||||
|
||||
When ``track.length`` is available, candidates are ranked by duration
|
||||
difference and only accepted if within ``DURATION_TOLERANCE_MS``.
|
||||
"""
|
||||
query = f"{track.artist or ''} {track.title or ''}".strip()
|
||||
if not query:
|
||||
return None
|
||||
|
||||
logger.debug(f"Netease: searching for '{query}' (limit={limit})")
|
||||
|
||||
try:
|
||||
with httpx.Client(timeout=HTTP_TIMEOUT) as client:
|
||||
resp = client.post(
|
||||
NETEASE_SEARCH_URL,
|
||||
headers=_HEADERS,
|
||||
data={"s": query, "type": "1", "limit": str(limit), "offset": "0"},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
result = resp.json()
|
||||
|
||||
# Validate response
|
||||
if not isinstance(result, dict):
|
||||
logger.error(f"Netease: search returned non-dict: {type(result).__name__}")
|
||||
return None
|
||||
|
||||
result_body = result.get("result")
|
||||
if not isinstance(result_body, dict):
|
||||
logger.debug(f"Netease: search 'result' field missing or invalid")
|
||||
return None
|
||||
|
||||
songs = result_body.get("songs")
|
||||
if not isinstance(songs, list) or len(songs) == 0:
|
||||
logger.debug("Netease: search returned 0 results")
|
||||
return None
|
||||
|
||||
logger.debug(f"Netease: search returned {len(songs)} candidates")
|
||||
|
||||
# Duration-based best-match selection
|
||||
if track.length is not None:
|
||||
track_ms = track.length
|
||||
best_id: Optional[int] = None
|
||||
best_diff = float("inf")
|
||||
|
||||
for song in songs:
|
||||
if not isinstance(song, dict):
|
||||
continue
|
||||
sid = song.get("id")
|
||||
name = song.get("name", "?")
|
||||
duration = song.get("dt") # milliseconds
|
||||
if not isinstance(duration, int):
|
||||
logger.debug(f" candidate {sid} '{name}': no duration, skipped")
|
||||
continue
|
||||
diff = abs(duration - track_ms)
|
||||
logger.debug(
|
||||
f" candidate {sid} '{name}': "
|
||||
f"duration={duration}ms, diff={diff}ms"
|
||||
)
|
||||
if diff < best_diff:
|
||||
best_diff = diff
|
||||
best_id = sid
|
||||
|
||||
if best_id is not None and best_diff <= DURATION_TOLERANCE_MS:
|
||||
logger.debug(
|
||||
f"Netease: selected id={best_id} (diff={best_diff}ms)"
|
||||
)
|
||||
return best_id
|
||||
|
||||
logger.debug(
|
||||
f"Netease: no candidate within {DURATION_TOLERANCE_MS}ms "
|
||||
f"(best diff={best_diff}ms)"
|
||||
)
|
||||
return None
|
||||
|
||||
# No duration info — take the first result
|
||||
first = songs[0]
|
||||
if not isinstance(first, dict) or "id" not in first:
|
||||
logger.error("Netease: first search result has no 'id'")
|
||||
return None
|
||||
logger.debug(
|
||||
f"Netease: no duration available, using first result "
|
||||
f"id={first['id']} '{first.get('name', '?')}'"
|
||||
)
|
||||
return first["id"]
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Netease: search failed: {e}")
|
||||
return None
|
||||
|
||||
def _get_lyric(self, song_id: int) -> Optional[LyricResult]:
|
||||
"""Fetch lyrics for a given Netease song ID."""
|
||||
logger.debug(f"Netease: fetching lyrics for song_id={song_id}")
|
||||
|
||||
try:
|
||||
with httpx.Client(timeout=HTTP_TIMEOUT) as client:
|
||||
resp = client.post(
|
||||
NETEASE_LYRIC_URL,
|
||||
headers=_HEADERS,
|
||||
data={
|
||||
"id": str(song_id),
|
||||
"cp": "false",
|
||||
"tv": "0",
|
||||
"lv": "0",
|
||||
"rv": "0",
|
||||
"kv": "0",
|
||||
"yv": "0",
|
||||
"ytv": "0",
|
||||
"yrv": "0",
|
||||
},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
|
||||
# Validate response
|
||||
if not isinstance(data, dict):
|
||||
logger.error(f"Netease: lyric response is not dict: {type(data).__name__}")
|
||||
return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
|
||||
|
||||
lrc_obj = data.get("lrc")
|
||||
if not isinstance(lrc_obj, dict):
|
||||
logger.debug(f"Netease: no 'lrc' object in response for song_id={song_id}")
|
||||
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
|
||||
|
||||
lrc: str = lrc_obj.get("lyric", "")
|
||||
if not isinstance(lrc, str) or not lrc.strip():
|
||||
logger.debug(f"Netease: empty lyrics for song_id={song_id}")
|
||||
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
|
||||
|
||||
# Determine sync status
|
||||
synced = _is_synced_lrc(lrc)
|
||||
status = CacheStatus.SUCCESS_SYNCED if synced else CacheStatus.SUCCESS_UNSYNCED
|
||||
logger.info(
|
||||
f"Netease: got {status.value} lyrics for song_id={song_id} "
|
||||
f"({len(lrc.splitlines())} lines)"
|
||||
)
|
||||
return LyricResult(
|
||||
status=status, lyrics=lrc.strip(), source=self.source_name
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Netease: lyric fetch failed for song_id={song_id}: {e}")
|
||||
return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
|
||||
|
||||
def fetch(self, track: TrackMeta) -> Optional[LyricResult]:
|
||||
"""Search for the track and fetch its lyrics."""
|
||||
query = f"{track.artist or ''} {track.title or ''}".strip()
|
||||
if not query:
|
||||
logger.debug("Netease: skipped — insufficient metadata")
|
||||
return None
|
||||
|
||||
logger.info(f"Netease: fetching lyrics for {track.display_name()}")
|
||||
song_id = self._search(track)
|
||||
if not song_id:
|
||||
logger.debug(f"Netease: no match found for {track.display_name()}")
|
||||
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
|
||||
|
||||
return self._get_lyric(song_id)
|
||||
@@ -0,0 +1,323 @@
|
||||
"""Spotify fetcher — obtains synced lyrics via Spotify's internal color-lyrics API.
|
||||
|
||||
Authentication flow (mirrors spotify-lyrics Go implementation):
|
||||
1. Fetch server time from Spotify
|
||||
2. Fetch TOTP secret from xyloflake/spot-secrets-go
|
||||
3. Generate a TOTP code and exchange it (with SP_DC cookie) for an access token
|
||||
4. Request lyrics using the access token
|
||||
|
||||
The secret and token are cached on the instance to avoid redundant network
|
||||
calls within the same session.
|
||||
|
||||
Requires SPOTIFY_SP_DC environment variable to be set.
|
||||
"""
|
||||
|
||||
import httpx
|
||||
import time
|
||||
import struct
|
||||
import hmac
|
||||
import hashlib
|
||||
from typing import Optional, Tuple
|
||||
from loguru import logger
|
||||
|
||||
from lrcfetch.models import TrackMeta, LyricResult, CacheStatus
|
||||
from lrcfetch.fetchers.base import BaseFetcher
|
||||
from lrcfetch.config import (
|
||||
HTTP_TIMEOUT,
|
||||
TTL_NOT_FOUND,
|
||||
TTL_NETWORK_ERROR,
|
||||
SPOTIFY_TOKEN_URL,
|
||||
SPOTIFY_LYRICS_URL,
|
||||
SPOTIFY_SERVER_TIME_URL,
|
||||
SPOTIFY_SECRET_URL,
|
||||
SPOTIFY_SP_DC,
|
||||
UA_BROWSER,
|
||||
)
|
||||
|
||||
|
||||
class SpotifyFetcher(BaseFetcher):
|
||||
def __init__(self) -> None:
|
||||
# Session-level caches to avoid refetching within the same run
|
||||
self._cached_secret: Optional[Tuple[str, int]] = None
|
||||
self._cached_token: Optional[str] = None
|
||||
self._token_expires_at: float = 0.0
|
||||
|
||||
@property
|
||||
def source_name(self) -> str:
|
||||
return "spotify"
|
||||
|
||||
# ─── Auth helpers ────────────────────────────────────────────────
|
||||
|
||||
def _get_server_time(self, client: httpx.Client) -> Optional[int]:
|
||||
"""Fetch Spotify's server timestamp (seconds since epoch)."""
|
||||
try:
|
||||
res = client.get(SPOTIFY_SERVER_TIME_URL, timeout=HTTP_TIMEOUT)
|
||||
res.raise_for_status()
|
||||
data = res.json()
|
||||
if not isinstance(data, dict) or "serverTime" not in data:
|
||||
logger.error(f"Spotify: unexpected server-time response: {data}")
|
||||
return None
|
||||
server_time = data["serverTime"]
|
||||
logger.debug(f"Spotify: server time = {server_time}")
|
||||
return server_time
|
||||
except Exception as e:
|
||||
logger.error(f"Spotify: failed to fetch server time: {e}")
|
||||
return None
|
||||
|
||||
def _get_secret(self, client: httpx.Client) -> Optional[Tuple[str, int]]:
|
||||
"""Fetch and decode the TOTP secret. Cached after first success.
|
||||
|
||||
Response format: [{version: int, secret: str}, ...]
|
||||
Each character in *secret* is XOR-decoded with ``(index % 33) + 9``.
|
||||
"""
|
||||
if self._cached_secret is not None:
|
||||
logger.debug("Spotify: using cached TOTP secret")
|
||||
return self._cached_secret
|
||||
|
||||
try:
|
||||
res = client.get(SPOTIFY_SECRET_URL, timeout=HTTP_TIMEOUT)
|
||||
res.raise_for_status()
|
||||
data = res.json()
|
||||
|
||||
if not isinstance(data, list) or len(data) == 0:
|
||||
logger.error(
|
||||
f"Spotify: unexpected secrets response (type={type(data).__name__}, len={len(data) if isinstance(data, list) else '?'})")
|
||||
return None
|
||||
|
||||
last = data[-1]
|
||||
if "secret" not in last or "version" not in last:
|
||||
logger.error(f"Spotify: malformed secret entry: {list(last.keys())}")
|
||||
return None
|
||||
|
||||
secret_raw = last["secret"]
|
||||
version = last["version"]
|
||||
|
||||
# XOR decode
|
||||
parts = []
|
||||
for i, char in enumerate(secret_raw):
|
||||
parts.append(str(ord(char) ^ ((i % 33) + 9)))
|
||||
secret = "".join(parts)
|
||||
|
||||
logger.debug(f"Spotify: decoded secret v{version} (len={len(secret)})")
|
||||
self._cached_secret = (secret, version)
|
||||
return self._cached_secret
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Spotify: failed to fetch secret: {e}")
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _generate_totp(server_time_s: int, secret: str) -> str:
|
||||
"""Generate a 6-digit TOTP code compatible with Spotify's auth.
|
||||
|
||||
Uses HMAC-SHA1 with a 30-second period, matching the Go reference.
|
||||
"""
|
||||
counter = server_time_s // 30
|
||||
counter_bytes = struct.pack(">Q", counter)
|
||||
|
||||
mac = hmac.new(secret.encode(), counter_bytes, hashlib.sha1).digest()
|
||||
|
||||
offset = mac[-1] & 0x0F
|
||||
binary_code = (
|
||||
(mac[offset] & 0x7F) << 24
|
||||
| (mac[offset + 1] & 0xFF) << 16
|
||||
| (mac[offset + 2] & 0xFF) << 8
|
||||
| (mac[offset + 3] & 0xFF)
|
||||
)
|
||||
|
||||
code = binary_code % (10**6)
|
||||
return str(code).zfill(6)
|
||||
|
||||
def _get_token(self) -> Optional[str]:
|
||||
"""Obtain a Spotify access token. Cached until expiry.
|
||||
|
||||
Requires SP_DC cookie (set via SPOTIFY_SP_DC env var).
|
||||
"""
|
||||
# Return cached token if still valid (with 30s safety margin)
|
||||
if self._cached_token and time.time() < self._token_expires_at - 30:
|
||||
logger.debug("Spotify: using cached access token")
|
||||
return self._cached_token
|
||||
|
||||
if not SPOTIFY_SP_DC:
|
||||
logger.error(
|
||||
"Spotify: SPOTIFY_SP_DC env var not set — "
|
||||
"cannot authenticate with Spotify"
|
||||
)
|
||||
return None
|
||||
|
||||
headers = {
|
||||
"User-Agent": UA_BROWSER,
|
||||
"Cookie": f"sp_dc={SPOTIFY_SP_DC}",
|
||||
}
|
||||
|
||||
with httpx.Client(headers=headers) as client:
|
||||
# Step 1: server time
|
||||
server_time = self._get_server_time(client)
|
||||
if server_time is None:
|
||||
return None
|
||||
|
||||
# Step 2: secret
|
||||
secret_data = self._get_secret(client)
|
||||
if secret_data is None:
|
||||
return None
|
||||
|
||||
secret, version = secret_data
|
||||
|
||||
# Step 3: TOTP
|
||||
totp = self._generate_totp(server_time, secret)
|
||||
logger.debug(f"Spotify: generated TOTP v{version}: {totp}")
|
||||
|
||||
# Step 4: exchange for token
|
||||
params = {
|
||||
"reason": "transport",
|
||||
"productType": "web-player",
|
||||
"totp": totp,
|
||||
"totpVer": str(version),
|
||||
"ts": str(int(time.time())),
|
||||
}
|
||||
|
||||
try:
|
||||
res = client.get(SPOTIFY_TOKEN_URL, params=params, timeout=HTTP_TIMEOUT)
|
||||
if res.status_code != 200:
|
||||
logger.error(
|
||||
f"Spotify: token request returned {res.status_code}"
|
||||
)
|
||||
return None
|
||||
|
||||
body = res.json()
|
||||
|
||||
if not isinstance(body, dict) or "accessToken" not in body:
|
||||
logger.error(
|
||||
f"Spotify: unexpected token response keys: {list(body.keys()) if isinstance(body, dict) else type(body).__name__}")
|
||||
return None
|
||||
|
||||
token = body["accessToken"]
|
||||
is_anonymous = body.get("isAnonymous", False)
|
||||
if is_anonymous:
|
||||
logger.warning(
|
||||
"Spotify: received anonymous token — SP_DC may be invalid"
|
||||
)
|
||||
|
||||
# Cache with reported expiry
|
||||
expires_ms = body.get("accessTokenExpirationTimestampMs", 0)
|
||||
if expires_ms and expires_ms > int(time.time() * 1000):
|
||||
self._token_expires_at = expires_ms / 1000.0
|
||||
else:
|
||||
logger.warning("Spotify: token expiry missing or invalid")
|
||||
self._token_expires_at = time.time() + 3600
|
||||
|
||||
self._cached_token = token
|
||||
logger.debug("Spotify: obtained access token")
|
||||
return token
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Spotify: token request failed: {e}")
|
||||
return None
|
||||
|
||||
# ─── Lyrics ──────────────────────────────────────────────────────
|
||||
|
||||
@staticmethod
|
||||
def _format_lrc_line(start_ms: int, words: str) -> str:
|
||||
"""Format a single lyric line as LRC ``[mm:ss.cc]text``."""
|
||||
minutes = start_ms // 60000
|
||||
seconds = (start_ms // 1000) % 60
|
||||
centiseconds = round((start_ms % 1000) / 10.0)
|
||||
return f"[{minutes:02d}:{seconds:02d}.{centiseconds:02.0f}]{words}"
|
||||
|
||||
@staticmethod
|
||||
def _is_truly_synced(lines: list[dict]) -> bool:
|
||||
"""Check if lyrics are actually synced (not all timestamps zero)."""
|
||||
for line in lines:
|
||||
try:
|
||||
ms = int(line.get("startTimeMs", "0"))
|
||||
if ms > 0:
|
||||
return True
|
||||
except (ValueError, TypeError):
|
||||
continue
|
||||
return False
|
||||
|
||||
def fetch(self, track: TrackMeta) -> Optional[LyricResult]:
|
||||
"""Fetch lyrics for a Spotify track by its track ID."""
|
||||
if not track.trackid:
|
||||
logger.debug("Spotify: skipped — no trackid in metadata")
|
||||
return None
|
||||
|
||||
logger.info(f"Spotify: fetching lyrics for trackid={track.trackid}")
|
||||
|
||||
token = self._get_token()
|
||||
if not token:
|
||||
logger.error("Spotify: cannot fetch lyrics without a token")
|
||||
return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
|
||||
|
||||
url = f"{SPOTIFY_LYRICS_URL}{track.trackid}?format=json&market=from_token"
|
||||
headers = {
|
||||
"User-Agent": UA_BROWSER,
|
||||
"Authorization": f"Bearer {token}",
|
||||
"App-Platform": "WebPlayer",
|
||||
}
|
||||
|
||||
try:
|
||||
with httpx.Client(timeout=HTTP_TIMEOUT) as client:
|
||||
res = client.get(url, headers=headers)
|
||||
|
||||
if res.status_code == 404:
|
||||
logger.debug(f"Spotify: 404 for trackid={track.trackid}")
|
||||
return LyricResult(
|
||||
status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND
|
||||
)
|
||||
|
||||
if res.status_code != 200:
|
||||
logger.error(f"Spotify: lyrics API returned {res.status_code}")
|
||||
return LyricResult(
|
||||
status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
|
||||
)
|
||||
|
||||
data = res.json()
|
||||
|
||||
# Validate response structure
|
||||
if not isinstance(data, dict) or "lyrics" not in data:
|
||||
logger.error(f"Spotify: unexpected lyrics response structure")
|
||||
return LyricResult(
|
||||
status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
|
||||
)
|
||||
|
||||
lyrics_data = data["lyrics"]
|
||||
sync_type = lyrics_data.get("syncType", "")
|
||||
lines = lyrics_data.get("lines", [])
|
||||
|
||||
if not isinstance(lines, list) or len(lines) == 0:
|
||||
logger.debug("Spotify: response contained no lyric lines")
|
||||
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
|
||||
|
||||
# Determine sync status
|
||||
# syncType == "LINE_SYNCED" AND at least one non-zero timestamp
|
||||
is_synced = sync_type == "LINE_SYNCED" and self._is_truly_synced(lines)
|
||||
|
||||
# Convert to LRC
|
||||
lrc_lines: list[str] = []
|
||||
for line in lines:
|
||||
words = line.get("words", "")
|
||||
if not isinstance(words, str):
|
||||
continue
|
||||
try:
|
||||
ms = int(line.get("startTimeMs", "0"))
|
||||
except (ValueError, TypeError):
|
||||
ms = 0
|
||||
|
||||
if is_synced:
|
||||
lrc_lines.append(self._format_lrc_line(ms, words))
|
||||
else:
|
||||
# Unsynced: emit with zero timestamps
|
||||
lrc_lines.append(f"[00:00.00]{words}")
|
||||
|
||||
content = "\n".join(lrc_lines)
|
||||
status = CacheStatus.SUCCESS_SYNCED if is_synced else CacheStatus.SUCCESS_UNSYNCED
|
||||
|
||||
logger.info(
|
||||
f"Spotify: got {status.value} lyrics ({len(lrc_lines)} lines)"
|
||||
)
|
||||
return LyricResult(status=status, lyrics=content, source=self.source_name)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Spotify: lyrics fetch failed: {e}")
|
||||
return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
|
||||
Reference in New Issue
Block a user