refactor: large-scale refactor of selection & fetchers

This commit is contained in:
2026-04-06 13:37:51 +02:00
parent 69b7f5c60c
commit 0c85af534e
23 changed files with 794 additions and 364 deletions
+51 -32
View File
@@ -1,7 +1,8 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-25 10:18:03
Description: SQLite-based lyric cache with per-source storage and TTL expiration.
Description: SQLite-based lyric cache with per-source storage, TTL expiration,
and lightweight schema migrations (including confidence versioning).
"""
import json
@@ -13,11 +14,7 @@ from loguru import logger
from .lrc import LRCData
from .normalize import normalize_for_match as _normalize_for_match
from .config import (
DURATION_TOLERANCE_MS,
LEGACY_CONFIDENCE_SYNCED,
LEGACY_CONFIDENCE_UNSYNCED,
)
from .config import DURATION_TOLERANCE_MS, LEGACY_CONFIDENCE, CONFIDENCE_ALGO_VERSION
from .models import TrackMeta, LyricResult, CacheStatus
@@ -79,7 +76,14 @@ class CacheEngine:
self._init_db()
def _init_db(self) -> None:
"""Create or migrate the cache and credentials tables."""
"""Create or migrate cache schema and credentials table.
Migration notes:
- Add structural columns introduced after initial releases.
- When introducing confidence versioning, rebalance legacy unsynced
confidence (+10, capped at 100) and stamp migrated rows with the
current algorithm version.
"""
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS cache (
@@ -91,7 +95,10 @@ class CacheEngine:
expires_at INTEGER,
artist TEXT,
title TEXT,
album TEXT
album TEXT,
length INTEGER,
confidence REAL,
confidence_version INTEGER
)
""")
conn.execute("""
@@ -101,12 +108,28 @@ class CacheEngine:
expires_at INTEGER
)
""")
# Migrations
# Incremental, idempotent migrations for existing databases.
cols = {r[1] for r in conn.execute("PRAGMA table_info(cache)").fetchall()}
if "length" not in cols:
conn.execute("ALTER TABLE cache ADD COLUMN length INTEGER")
if "confidence" not in cols:
conn.execute("ALTER TABLE cache ADD COLUMN confidence REAL")
if "confidence_version" not in cols:
conn.execute("ALTER TABLE cache ADD COLUMN confidence_version INTEGER")
# First-time confidence-version migration: boost unsynced rows
# from older scoring assumptions while preserving upper bound.
conn.execute(
"""
UPDATE cache
SET confidence = MIN(100.0, COALESCE(confidence, ?) + 10.0)
WHERE status = ?
""",
(LEGACY_CONFIDENCE, CacheStatus.SUCCESS_UNSYNCED.value),
)
conn.execute(
"UPDATE cache SET confidence_version = ? WHERE confidence_version IS NULL",
(CONFIDENCE_ALGO_VERSION,),
)
conn.commit()
# Read
@@ -155,10 +178,8 @@ class CacheEngine:
)
status = CacheStatus(status_str)
if confidence is None:
if status == CacheStatus.SUCCESS_SYNCED:
confidence = LEGACY_CONFIDENCE_SYNCED
elif status == CacheStatus.SUCCESS_UNSYNCED:
confidence = LEGACY_CONFIDENCE_UNSYNCED
if status in (CacheStatus.SUCCESS_SYNCED, CacheStatus.SUCCESS_UNSYNCED):
confidence = LEGACY_CONFIDENCE
else:
confidence = 0.0 # negative statuses: no confidence
@@ -207,7 +228,11 @@ class CacheEngine:
result: LyricResult,
ttl_seconds: Optional[int] = None,
) -> None:
"""Store a lyric result in the cache."""
"""Store a lyric result in the cache.
New/updated rows are tagged with the current confidence algorithm
version so future migrations can be applied deterministically.
"""
try:
key = _generate_key(track, source)
except ValueError:
@@ -221,8 +246,8 @@ class CacheEngine:
conn.execute(
"""INSERT OR REPLACE INTO cache
(key, source, status, lyrics, created_at, expires_at,
artist, title, album, length, confidence)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
artist, title, album, length, confidence, confidence_version)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
key,
source,
@@ -235,6 +260,7 @@ class CacheEngine:
track.album,
track.length,
result.confidence,
CONFIDENCE_ALGO_VERSION,
),
)
conn.commit()
@@ -288,7 +314,9 @@ class CacheEngine:
# Exact cross-source search
def find_best_positive(self, track: TrackMeta) -> Optional[LyricResult]:
def find_best_positive(
self, track: TrackMeta, status: CacheStatus
) -> Optional[LyricResult]:
"""Find the best positive (synced/unsynced) cache entry for *track*.
Uses exact metadata match (artist + title + album) across all sources.
@@ -303,21 +331,16 @@ class CacheEngine:
rows = conn.execute(
f"SELECT status, lyrics, source, confidence FROM cache"
f" WHERE {_TRACK_WHERE}"
" AND status IN (?, ?)"
" AND status = ?"
" AND (expires_at IS NULL OR expires_at > ?)"
" ORDER BY COALESCE(confidence,"
" CASE status WHEN ? THEN ? ELSE ? END"
" ) DESC,"
" ORDER BY COALESCE(confidence, ?) DESC,"
" CASE status WHEN ? THEN 0 ELSE 1 END,"
" created_at DESC LIMIT 1",
" created_at DESC",
_track_where_params(track)
+ [
CacheStatus.SUCCESS_SYNCED.value,
CacheStatus.SUCCESS_UNSYNCED.value,
status.value,
now,
CacheStatus.SUCCESS_SYNCED.value,
LEGACY_CONFIDENCE_SYNCED,
LEGACY_CONFIDENCE_UNSYNCED,
LEGACY_CONFIDENCE,
CacheStatus.SUCCESS_SYNCED.value,
],
).fetchall()
@@ -328,11 +351,7 @@ class CacheEngine:
row = dict(rows[0])
confidence = row["confidence"]
if confidence is None:
confidence = (
LEGACY_CONFIDENCE_SYNCED
if row["status"] == CacheStatus.SUCCESS_SYNCED.value
else LEGACY_CONFIDENCE_UNSYNCED
)
confidence = LEGACY_CONFIDENCE
return LyricResult(
status=CacheStatus(row["status"]),
lyrics=LRCData(row["lyrics"]) if row["lyrics"] else None,
+35 -16
View File
@@ -14,7 +14,7 @@ import cyclopts
from loguru import logger
from .config import DB_PATH, enable_debug
from .models import TrackMeta, CacheStatus
from .models import TrackMeta
from .mpris import get_current_track
from .core import LrcManager
from .fetchers import FetcherMethodType
@@ -88,10 +88,12 @@ def fetch(
name="--no-cache", negative="", help="Bypass the cache for this request."
),
] = False,
only_synced: Annotated[
allow_unsynced: Annotated[
bool,
cyclopts.Parameter(
name="--only-synced", negative="", help="Only accept synced (timed) lyrics."
name="--allow-unsynced",
negative="",
help="Allow unsynced lyrics (will be displayed with all time tags set to [00:00.00]).",
),
] = False,
plain: Annotated[
@@ -110,16 +112,17 @@ def fetch(
logger.info(f"Track: {track.display_name()}")
result = manager.fetch_for_track(track, force_method=method, bypass_cache=no_cache)
result = manager.fetch_for_track(
track,
force_method=method,
bypass_cache=no_cache,
allow_unsynced=allow_unsynced,
)
if not result or not result.lyrics:
logger.error("No lyrics found.")
sys.exit(1)
if only_synced and result.status != CacheStatus.SUCCESS_SYNCED:
logger.error("Only unsynced lyrics available (--only-synced requested).")
sys.exit(1)
print(result.lyrics.to_lrc(plain=plain))
@@ -165,10 +168,12 @@ def search(
name="--no-cache", negative="", help="Bypass the cache for this request."
),
] = False,
only_synced: Annotated[
allow_unsynced: Annotated[
bool,
cyclopts.Parameter(
name="--only-synced", negative="", help="Only accept synced (timed) lyrics."
name="--allow-unsynced",
negative="",
help="Allow unsynced lyrics (will be displayed with all time tags set to [00:00.00]).",
),
] = False,
plain: Annotated[
@@ -198,16 +203,17 @@ def search(
logger.info(f"Track: {track.display_name()}")
result = manager.fetch_for_track(track, force_method=method, bypass_cache=no_cache)
result = manager.fetch_for_track(
track,
force_method=method,
bypass_cache=no_cache,
allow_unsynced=allow_unsynced,
)
if not result or not result.lyrics:
logger.error("No lyrics found.")
sys.exit(1)
if only_synced and result.status != CacheStatus.SUCCESS_SYNCED:
logger.error("Only unsynced lyrics available (--only-synced requested).")
sys.exit(1)
print(result.lyrics.to_lrc(plain=plain))
@@ -236,6 +242,14 @@ def export(
name=["--overwrite", "-f"], negative="", help="Overwrite existing file."
),
] = False,
allow_unsynced: Annotated[
bool,
cyclopts.Parameter(
name="--allow-unsynced",
negative="",
help="Allow unsynced lyrics (will be exported with all time tags set to [00:00.00] if --plain is not present).",
),
] = False,
plain: Annotated[
bool,
cyclopts.Parameter(
@@ -249,7 +263,12 @@ def export(
logger.error("No active playing track found.")
sys.exit(1)
result = manager.fetch_for_track(track, force_method=method, bypass_cache=no_cache)
result = manager.fetch_for_track(
track,
force_method=method,
bypass_cache=no_cache,
allow_unsynced=allow_unsynced,
)
if not result or not result.lyrics:
logger.error("No lyrics available to export.")
sys.exit(1)
+3 -3
View File
@@ -44,9 +44,10 @@ SCORE_W_ARTIST = 30.0
SCORE_W_ALBUM = 10.0
SCORE_W_DURATION = 10.0
SCORE_W_SYNCED = 10.0
CONFIDENCE_ALGO_VERSION = 1
# Confidence thresholds
MIN_CONFIDENCE = 25.0 # below this, candidate is rejected
MIN_CONFIDENCE = 40.0 # below this, candidate is rejected
HIGH_CONFIDENCE = 80.0 # at or above this, stop searching early
# Multi-candidate fetching
@@ -54,8 +55,7 @@ MULTI_CANDIDATE_LIMIT = 3 # max candidates to try per search-based fetcher
MULTI_CANDIDATE_DELAY_S = 0.2 # delay between sequential lyric fetches
# Legacy cache rows (no confidence stored) get a base score by sync status
LEGACY_CONFIDENCE_SYNCED = 50.0
LEGACY_CONFIDENCE_UNSYNCED = 40.0
LEGACY_CONFIDENCE = 50.0
# User-Agents
UA_BROWSER = "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0"
+129 -36
View File
@@ -10,7 +10,7 @@ from typing import Optional
from loguru import logger
from .fetchers import FetcherMethodType, build_plan, create_fetchers
from .fetchers.base import BaseFetcher
from .fetchers.base import BaseFetcher, FetchResult
from .authenticators import create_authenticators
from .cache import CacheEngine
from .lrc import LRCData
@@ -34,28 +34,91 @@ _STATUS_TTL: dict[CacheStatus, Optional[int]] = {
}
def _is_better(new: LyricResult, old: LyricResult) -> bool:
"""Compare two results: higher confidence wins; synced breaks ties."""
def _is_better(new: LyricResult, old: LyricResult, allow_unsynced: bool) -> bool:
    """Decide whether *new* should replace *old* as the current best result.

    Higher confidence wins; on a confidence tie, a synced result beats an
    unsynced one.  When *allow_unsynced* is False, an unsynced result ranks
    strictly below any synced one, regardless of confidence.
    """
    positive = (CacheStatus.SUCCESS_SYNCED, CacheStatus.SUCCESS_UNSYNCED)
    # A non-positive newcomer can never win.
    if new.status not in positive:
        return False
    # Any positive newcomer beats a non-positive incumbent.
    if old.status not in positive:
        return True
    new_synced = new.status == CacheStatus.SUCCESS_SYNCED
    old_synced = old.status == CacheStatus.SUCCESS_SYNCED
    # Without allow_unsynced, sync status dominates confidence entirely
    # (only relevant when exactly one side is synced).
    if not allow_unsynced and new_synced != old_synced:
        return new_synced
    # Confidence decides when it differs.
    if new.confidence != old.confidence:
        return new.confidence > old.confidence
    # Equal confidence: synced breaks the tie.
    return new_synced and not old_synced
def _normalize_result(result: LyricResult) -> LyricResult:
    """Return *result*, normalizing the lyric body for unsynced hits.

    Unsynced results with lyrics get ``normalize_unsynced()`` applied to the
    lyric data; every other result passes through unchanged.
    """
    if result.status != CacheStatus.SUCCESS_UNSYNCED or not result.lyrics:
        return result
    return LyricResult(
        status=result.status,
        lyrics=result.lyrics.normalize_unsynced(),
        source=result.source,
        ttl=result.ttl,
        confidence=result.confidence,
    )
def _pick_for_return(
    result: FetchResult,
    allow_unsynced: bool,
) -> Optional[LyricResult]:
    """Choose the lyric result from *result* that enters final selection.

    The synced slot is always eligible; the unsynced slot only when
    *allow_unsynced* is set.  Among eligible slots, ``_is_better`` decides.
    """
    eligible: list[LyricResult] = []
    synced_slot = result.synced
    if synced_slot is not None and synced_slot.status == CacheStatus.SUCCESS_SYNCED:
        eligible.append(synced_slot)
    unsynced_slot = result.unsynced
    if (
        allow_unsynced
        and unsynced_slot is not None
        and unsynced_slot.status == CacheStatus.SUCCESS_UNSYNCED
    ):
        eligible.append(unsynced_slot)
    best: Optional[LyricResult] = None
    for candidate in eligible:
        # allow_unsynced=True here: eligibility was already filtered above.
        if best is None or _is_better(candidate, best, allow_unsynced=True):
            best = candidate
    return best
def _pick_for_cache(result: FetchResult) -> Optional[LyricResult]:
    """Reduce a two-slot FetchResult to one row for the single-slot cache.

    Preference order: best positive result (via ``_is_better``), then
    NETWORK_ERROR, then NOT_FOUND.  NETWORK_ERROR wins over NOT_FOUND so a
    transient failure is not pinned under the longer not-found TTL when the
    two slots disagree.
    """
    slots = [slot for slot in (result.synced, result.unsynced) if slot is not None]
    if not slots:
        return None
    positive = (CacheStatus.SUCCESS_SYNCED, CacheStatus.SUCCESS_UNSYNCED)
    best: Optional[LyricResult] = None
    for slot in slots:
        if slot.status not in positive:
            continue
        if best is None or _is_better(slot, best, allow_unsynced=True):
            best = slot
    if best is not None:
        return best
    # No positive result: fall back by status preference.
    for wanted in (CacheStatus.NETWORK_ERROR, CacheStatus.NOT_FOUND):
        for slot in slots:
            if slot.status == wanted:
                return slot
    return None
class LrcManager:
@@ -72,6 +135,7 @@ class LrcManager:
group: list[BaseFetcher],
track: TrackMeta,
bypass_cache: bool,
allow_unsynced: bool,
) -> list[tuple[str, LyricResult]]:
"""Run one group: cache-check first, then parallel-fetch uncached. Returns (source, result) pairs."""
cached_results: list[tuple[str, LyricResult]] = []
@@ -129,25 +193,30 @@ class LrcManager:
logger.debug(f"[{source}] returned None")
continue
if not fetcher.self_cached and not bypass_cache:
ttl = result.ttl or _STATUS_TTL.get(
result.status, TTL_NOT_FOUND
)
self.cache.set(track, source, result, ttl_seconds=ttl)
if result.status in (
CacheStatus.SUCCESS_SYNCED,
CacheStatus.SUCCESS_UNSYNCED,
):
logger.info(
f"[{source}] got {result.status.value} lyrics"
f" (confidence={result.confidence:.0f})"
)
cached_results.append((source, result))
cache_result = _pick_for_cache(result)
return_result = _pick_for_return(result, allow_unsynced)
if (
result.status == CacheStatus.SUCCESS_SYNCED
and result.confidence >= HIGH_CONFIDENCE
cache_result is not None
and not fetcher.self_cached
and not bypass_cache
):
ttl = cache_result.ttl or _STATUS_TTL.get(
cache_result.status, TTL_NOT_FOUND
)
self.cache.set(track, source, cache_result, ttl_seconds=ttl)
if return_result is not None:
logger.info(
f"[{source}] got {return_result.status.value} lyrics"
f" (confidence={return_result.confidence:.0f})"
)
cached_results.append((source, return_result))
if (
return_result is not None
and return_result.status == CacheStatus.SUCCESS_SYNCED
and return_result.confidence >= HIGH_CONFIDENCE
):
found_trusted = True
@@ -164,6 +233,7 @@ class LrcManager:
track: TrackMeta,
force_method: Optional[FetcherMethodType],
bypass_cache: bool,
allow_unsynced: bool,
) -> Optional[LyricResult]:
track = await enrich_track(track, self.enrichers)
logger.info(f"Fetching lyrics for: {track.display_name()}")
@@ -175,7 +245,12 @@ class LrcManager:
best_result: Optional[LyricResult] = None
for group in plan:
group_results = await self._run_group(group, track, bypass_cache)
group_results = await self._run_group(
group,
track,
bypass_cache,
allow_unsynced,
)
for source, result in group_results:
if result.status not in (
@@ -192,16 +267,26 @@ class LrcManager:
f"Returning {result.status.value} lyrics from {source}"
f" (confidence={result.confidence:.0f})"
)
return _normalize_result(result)
return result
if best_result is None or _is_better(result, best_result):
if best_result is None or _is_better(
result, best_result, allow_unsynced
):
best_result = result
if best_result:
if (
best_result.status == CacheStatus.SUCCESS_UNSYNCED
and not allow_unsynced
):
logger.info(
f"Unsynced lyrics found from {best_result.source}, but unsynced results are not allowed"
)
return None
logger.info(
f"Returning {best_result.status.value} lyrics from {best_result.source}"
)
return _normalize_result(best_result)
return best_result
logger.info(f"No lyrics found for {track.display_name()}")
return None
@@ -211,9 +296,17 @@ class LrcManager:
track: TrackMeta,
force_method: Optional[FetcherMethodType] = None,
bypass_cache: bool = False,
allow_unsynced: bool = False,
) -> Optional[LyricResult]:
"""Fetch lyrics for *track* using the group-based parallel pipeline."""
return asyncio.run(self._fetch_for_track(track, force_method, bypass_cache))
return asyncio.run(
self._fetch_for_track(
track,
force_method,
bypass_cache,
allow_unsynced,
)
)
def manual_insert(
self,
+29 -4
View File
@@ -6,8 +6,35 @@ Description: Base fetcher class and common interfaces.
from abc import ABC, abstractmethod
from typing import Optional
from dataclasses import dataclass
from ..models import TrackMeta, LyricResult
from ..models import CacheStatus, TrackMeta, LyricResult
@dataclass(frozen=True, slots=True)
class FetchResult:
    """Per-fetcher outcome with independent synced/unsynced slots.

    Either slot may be ``None`` when the fetcher has nothing to report for
    that sync flavour.
    """

    # Best synced lyric result from this fetcher, if any.
    synced: Optional[LyricResult] = None
    # Best unsynced lyric result from this fetcher, if any.
    unsynced: Optional[LyricResult] = None

    @staticmethod
    def _uniform(status: CacheStatus) -> "FetchResult":
        # Internal helper: both slots carry *status* and no lyric data.
        return FetchResult(
            synced=LyricResult(status=status, lyrics=None, source=None),
            unsynced=LyricResult(status=status, lyrics=None, source=None),
        )

    @staticmethod
    def from_not_found() -> "FetchResult":
        """Result whose both slots report NOT_FOUND."""
        return FetchResult._uniform(CacheStatus.NOT_FOUND)

    @staticmethod
    def from_network_error() -> "FetchResult":
        """Result whose both slots report NETWORK_ERROR."""
        return FetchResult._uniform(CacheStatus.NETWORK_ERROR)
class BaseFetcher(ABC):
@@ -28,8 +55,6 @@ class BaseFetcher(ABC):
pass
@abstractmethod
async def fetch(
self, track: TrackMeta, bypass_cache: bool = False
) -> Optional[LyricResult]:
async def fetch(self, track: TrackMeta, bypass_cache: bool = False) -> FetchResult:
"""Fetch lyrics for the given track. Returns None if unable to fetch."""
pass
+65 -43
View File
@@ -12,7 +12,7 @@ from typing import Optional
from loguru import logger
from .base import BaseFetcher
from .base import BaseFetcher, FetchResult
from .selection import SearchCandidate, select_best
from ..models import TrackMeta, LyricResult, CacheStatus
from ..cache import CacheEngine
@@ -34,65 +34,87 @@ class CacheSearchFetcher(BaseFetcher):
def is_available(self, track: TrackMeta) -> bool:
return bool(track.title)
async def fetch(
self, track: TrackMeta, bypass_cache: bool = False
) -> Optional[LyricResult]:
if bypass_cache:
logger.debug("Cache-search: bypassed by caller")
return None
if not track.title:
logger.debug("Cache-search: skipped — no title")
return None
# Fast path: exact metadata match (artist+title+album), single SQL query
exact = self._cache.find_best_positive(track)
if exact:
logger.info(f"Cache-search: exact hit ({exact.status.value})")
return exact
# Slow path: fuzzy cross-album search
matches = self._cache.search_by_meta(
title=track.title,
length=track.length,
def _get_exact(self, track: TrackMeta, synced: bool) -> Optional[LyricResult]:
    """Exact-metadata cache lookup for one sync flavour.

    Returns the cached positive entry only when it actually carries
    lyric data; otherwise None.
    """
    wanted = CacheStatus.SUCCESS_SYNCED if synced else CacheStatus.SUCCESS_UNSYNCED
    hit = self._cache.find_best_positive(track, wanted)
    if not hit or hit.lyrics is None:
        return None
    flavour = "synced" if synced else "unsynced"
    logger.info(f"Cache-search: exact {flavour} hit ({hit.status.value})")
    return hit
if not matches:
logger.debug(f"Cache-search: no match for {track.display_name()}")
return None
# Pick best by confidence scoring
candidates = [
def _get_fuzzy(
    self, matches: list, track: TrackMeta, synced: bool
) -> Optional[LyricResult]:
    """Fuzzy cross-album cache selection for one sync flavour.

    Filters *matches* down to rows that carry lyrics AND have the requested
    sync status, scores them with ``select_best``, and wraps the winner in a
    LyricResult.  Returns None when nothing qualifies.

    Fixes two defects in the previous version:
    - a stray leftover positional argument made the ``select_best`` call
      malformed;
    - the filter ``a and b or c`` parsed as ``(a and b) or c`` (``and``
      binds tighter than ``or``), letting unsynced rows WITHOUT lyrics slip
      into the candidate pool and potentially win selection only to be
      discarded afterwards.
    """
    wanted = (
        CacheStatus.SUCCESS_SYNCED if synced else CacheStatus.SUCCESS_UNSYNCED
    )
    filtered = [
        SearchCandidate(
            item=m,
            duration_ms=float(m["length"]) if m.get("length") else None,
            is_synced=synced,
            title=m.get("title"),
            artist=m.get("artist"),
            album=m.get("album"),
        )
        for m in matches
        # Both conditions must hold: the row has lyrics and the right status.
        if m.get("lyrics") and m.get("status") == wanted.value
    ]
    best, confidence = select_best(
        filtered,
        track.length,
        title=track.title,
        artist=track.artist,
        album=track.album,
    )
    if not best or best.get("lyrics") is None:
        return None
    logger.info(
        f"Cache-search: fuzzy {'synced' if synced else 'unsynced'} hit from "
        f"[{best.get('source')}] album={best.get('album')!r} (confidence={confidence:.0f})"
    )
    return LyricResult(
        status=wanted,
        lyrics=LRCData(best["lyrics"]),
        source=self.source_name,
        confidence=confidence,
    )
if not best:
return None
async def fetch(self, track: TrackMeta, bypass_cache: bool = False) -> FetchResult:
if bypass_cache:
logger.debug("Cache-search: bypassed by caller")
return FetchResult()
status = CacheStatus(best["status"])
logger.info(
f"Cache-search: fuzzy hit from [{best.get('source')}] "
f"album={best.get('album')!r} ({status.value}, confidence={confidence:.0f})"
)
return LyricResult(
status=status,
lyrics=LRCData(best["lyrics"]),
source=self.source_name,
confidence=confidence,
)
if not track.title:
logger.debug("Cache-search: skipped — no title")
return FetchResult()
res_synced: Optional[LyricResult] = None
res_unsynced: Optional[LyricResult] = None
# Fast path: exact metadata match (artist+title+album), single SQL query
res_synced = self._get_exact(track, synced=True)
res_unsynced = self._get_exact(track, synced=False)
if res_synced and res_unsynced:
return FetchResult(synced=res_synced, unsynced=res_unsynced)
# Slow path: fuzzy cross-album search
matches = self._cache.search_by_meta(title=track.title, length=track.length)
if not matches:
logger.debug(f"Cache-search: no match for {track.display_name()}")
return FetchResult(synced=res_synced, unsynced=res_unsynced)
if not res_synced:
res_synced = self._get_fuzzy(matches, track, synced=True)
if not res_unsynced:
res_unsynced = self._get_fuzzy(matches, track, synced=False)
return FetchResult(synced=res_synced, unsynced=res_unsynced)
+60 -45
View File
@@ -12,8 +12,8 @@ from loguru import logger
from mutagen._file import File
from mutagen.flac import FLAC
from .base import BaseFetcher
from ..models import TrackMeta, LyricResult
from .base import BaseFetcher, FetchResult
from ..models import CacheStatus, TrackMeta, LyricResult
from ..lrc import get_audio_path, get_sidecar_path, LRCData
@@ -25,17 +25,18 @@ class LocalFetcher(BaseFetcher):
def is_available(self, track: TrackMeta) -> bool:
return track.is_local
async def fetch(
self, track: TrackMeta, bypass_cache: bool = False
) -> Optional[LyricResult]:
async def fetch(self, track: TrackMeta, bypass_cache: bool = False) -> FetchResult:
"""Attempt to read lyrics from local filesystem."""
if not track.is_local or not track.url:
return None
return FetchResult()
audio_path = get_audio_path(track.url, ensure_exists=False)
if not audio_path:
logger.debug(f"Local: audio URL is not a valid file path: {track.url}")
return None
return FetchResult()
synced_result: Optional[LyricResult] = None
unsynced_result: Optional[LyricResult] = None
lrc_path = get_sidecar_path(
track.url, ensure_audio_exists=False, ensure_exists=True
@@ -50,11 +51,19 @@ class LocalFetcher(BaseFetcher):
logger.info(
f"Local: found .lrc sidecar ({status.value}) for {audio_path.name}"
)
return LyricResult(
status=status,
lyrics=lrc,
source=self.source_name,
)
if status == CacheStatus.SUCCESS_SYNCED:
synced_result = LyricResult(
status=status,
lyrics=lrc,
source=f"{self.source_name} (sidecar)",
)
else:
unsynced_result = LyricResult(
status=status,
lyrics=lrc,
source=f"{self.source_name} (sidecar)",
)
except Exception as e:
logger.error(f"Local: error reading {lrc_path}: {e}")
else:
@@ -63,39 +72,45 @@ class LocalFetcher(BaseFetcher):
# Embedded metadata
if not audio_path.exists():
logger.debug(f"Local: audio file does not exist: {audio_path}")
return None
try:
audio = File(audio_path)
if audio is not None:
lyrics = None
else:
try:
audio = File(audio_path)
if audio is not None:
lyrics = None
if isinstance(audio, FLAC):
# FLAC stores lyrics in vorbis comment tags
lyrics = (
audio.get("lyrics") or audio.get("unsynclyrics") or [None]
)[0]
elif hasattr(audio, "tags") and audio.tags:
# MP3 / other: look for USLT or SYLT ID3 frames
for key in audio.tags.keys():
if key.startswith("USLT") or key.startswith("SYLT"):
lyrics = str(audio.tags[key])
break
if isinstance(audio, FLAC):
# FLAC stores lyrics in vorbis comment tags
lyrics = (
audio.get("lyrics") or audio.get("unsynclyrics") or [None]
)[0]
elif hasattr(audio, "tags") and audio.tags:
# MP3 / other: look for USLT or SYLT ID3 frames
for key in audio.tags.keys():
if key.startswith("USLT") or key.startswith("SYLT"):
lyrics = str(audio.tags[key])
break
if lyrics:
lrc = LRCData(lyrics)
status = lrc.detect_sync_status()
logger.info(
f"Local: found embedded lyrics ({status.value}) for {audio_path.name}"
)
return LyricResult(
status=status,
lyrics=lrc,
source=f"{self.source_name} (embedded)",
)
else:
logger.debug("Local: no embedded lyrics found")
except Exception as e:
logger.error(f"Local: error reading metadata for {audio_path}: {e}")
if lyrics:
lrc = LRCData(lyrics)
status = lrc.detect_sync_status()
logger.info(
f"Local: found embedded lyrics ({status.value}) for {audio_path.name}"
)
if status == CacheStatus.SUCCESS_SYNCED and not synced_result:
synced_result = LyricResult(
status=status,
lyrics=lrc,
source=f"{self.source_name} (embedded)",
)
elif not unsynced_result:
unsynced_result = LyricResult(
status=status,
lyrics=lrc,
source=f"{self.source_name} (embedded)",
)
else:
logger.debug("Local: no embedded lyrics found")
except Exception as e:
logger.error(f"Local: error reading metadata for {audio_path}: {e}")
logger.debug(f"Local: no lyrics found for {audio_path}")
return None
return FetchResult(synced=synced_result, unsynced=unsynced_result)
+21 -22
View File
@@ -5,19 +5,17 @@ Description: LRCLIB fetcher — queries lrclib.net for synced/plain lyrics.
Requires complete track metadata (artist, title, album, duration).
"""
from typing import Optional
import httpx
from loguru import logger
from urllib.parse import urlencode
from .base import BaseFetcher
from .base import BaseFetcher, FetchResult
from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import LRCData
from ..config import (
HTTP_TIMEOUT,
TTL_UNSYNCED,
TTL_NOT_FOUND,
TTL_NETWORK_ERROR,
UA_LRX,
)
@@ -32,13 +30,11 @@ class LrclibFetcher(BaseFetcher):
def is_available(self, track: TrackMeta) -> bool:
return track.is_complete
async def fetch(
self, track: TrackMeta, bypass_cache: bool = False
) -> Optional[LyricResult]:
async def fetch(self, track: TrackMeta, bypass_cache: bool = False) -> FetchResult:
"""Fetch lyrics from LRCLIB. Requires complete metadata."""
if not track.is_complete:
logger.debug("LRCLIB: skipped — incomplete metadata")
return None
return FetchResult()
params = {
"track_name": track.title,
@@ -55,48 +51,51 @@ class LrclibFetcher(BaseFetcher):
if resp.status_code == 404:
logger.debug(f"LRCLIB: not found for {track.display_name()}")
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
return FetchResult.from_not_found()
if resp.status_code != 200:
logger.error(f"LRCLIB: API returned {resp.status_code}")
return LyricResult(
status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
)
return FetchResult.from_network_error()
data = resp.json()
if not isinstance(data, dict):
logger.error(f"LRCLIB: unexpected response type: {type(data).__name__}")
return LyricResult(
status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
)
return FetchResult.from_network_error()
synced = data.get("syncedLyrics")
unsynced = data.get("plainLyrics")
res_synced: LyricResult = LyricResult(
status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND
)
res_unsynced: LyricResult = LyricResult(
status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND
)
if isinstance(synced, str) and synced.strip():
lyrics = LRCData(synced)
logger.info(f"LRCLIB: got synced lyrics ({len(lyrics)} lines)")
return LyricResult(
res_synced = LyricResult(
status=CacheStatus.SUCCESS_SYNCED,
lyrics=lyrics,
source=self.source_name,
)
elif isinstance(unsynced, str) and unsynced.strip():
if isinstance(unsynced, str) and unsynced.strip():
lyrics = LRCData(unsynced)
logger.info(f"LRCLIB: got unsynced lyrics ({len(lyrics)} lines)")
return LyricResult(
res_unsynced = LyricResult(
status=CacheStatus.SUCCESS_UNSYNCED,
lyrics=lyrics,
source=self.source_name,
ttl=TTL_UNSYNCED,
)
else:
logger.debug(f"LRCLIB: empty response for {track.display_name()}")
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
return FetchResult(synced=res_synced, unsynced=res_unsynced)
except httpx.HTTPError as e:
logger.error(f"LRCLIB: HTTP error: {e}")
return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
return FetchResult.from_network_error()
except Exception as e:
logger.error(f"LRCLIB: unexpected error: {e}")
return None
return FetchResult()
+21 -20
View File
@@ -7,11 +7,10 @@ Description: LRCLIB search fetcher — fuzzy search via lrclib.net /api/search.
import asyncio
import httpx
from typing import Optional
from loguru import logger
from urllib.parse import urlencode
from .base import BaseFetcher
from .base import BaseFetcher, FetchResult
from .selection import SearchCandidate, select_best
from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import LRCData
@@ -19,7 +18,6 @@ from ..config import (
HTTP_TIMEOUT,
TTL_UNSYNCED,
TTL_NOT_FOUND,
TTL_NETWORK_ERROR,
UA_LRX,
)
@@ -62,12 +60,10 @@ class LrclibSearchFetcher(BaseFetcher):
return queries
async def fetch(
self, track: TrackMeta, bypass_cache: bool = False
) -> Optional[LyricResult]:
async def fetch(self, track: TrackMeta, bypass_cache: bool = False) -> FetchResult:
if not track.title:
logger.debug("LRCLIB-search: skipped — no title")
return None
return FetchResult()
queries = self._build_queries(track)
logger.info(f"LRCLIB-search: searching for {track.display_name()}")
@@ -110,11 +106,9 @@ class LrclibSearchFetcher(BaseFetcher):
if not candidates:
if had_error:
return LyricResult(
status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
)
return FetchResult.from_network_error()
logger.debug(f"LRCLIB-search: no results for {track.display_name()}")
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
return FetchResult.from_not_found()
logger.debug(
f"LRCLIB-search: got {len(candidates)} unique candidates "
@@ -144,41 +138,48 @@ class LrclibSearchFetcher(BaseFetcher):
)
if best is None:
logger.debug("LRCLIB-search: no valid candidate found")
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
return FetchResult.from_not_found()
synced = best.get("syncedLyrics")
unsynced = best.get("plainLyrics")
res_synced: LyricResult = LyricResult(
status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND
)
res_unsynced: LyricResult = LyricResult(
status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND
)
if isinstance(synced, str) and synced.strip():
lyrics = LRCData(synced)
logger.info(
f"LRCLIB-search: got synced lyrics ({len(lyrics)} lines, confidence={confidence:.0f})"
)
return LyricResult(
res_synced = LyricResult(
status=CacheStatus.SUCCESS_SYNCED,
lyrics=lyrics,
source=self.source_name,
confidence=confidence,
)
elif isinstance(unsynced, str) and unsynced.strip():
if isinstance(unsynced, str) and unsynced.strip():
lyrics = LRCData(unsynced)
logger.info(
f"LRCLIB-search: got unsynced lyrics ({len(lyrics)} lines, confidence={confidence:.0f})"
)
return LyricResult(
res_unsynced = LyricResult(
status=CacheStatus.SUCCESS_UNSYNCED,
lyrics=lyrics,
source=self.source_name,
ttl=TTL_UNSYNCED,
confidence=confidence,
)
else:
logger.debug("LRCLIB-search: best candidate has empty lyrics")
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
return FetchResult(synced=res_synced, unsynced=res_unsynced)
except httpx.HTTPError as e:
logger.error(f"LRCLIB-search: HTTP error: {e}")
return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
return FetchResult.from_network_error()
except Exception as e:
logger.error(f"LRCLIB-search: unexpected error: {e}")
return None
return FetchResult()
+28 -24
View File
@@ -15,12 +15,11 @@ import json
from typing import Optional
from loguru import logger
from .base import BaseFetcher
from .base import BaseFetcher, FetchResult
from .selection import SearchCandidate, select_best
from ..authenticators.musixmatch import MusixmatchAuthenticator
from ..lrc import LRCData
from ..models import CacheStatus, LyricResult, TrackMeta
from ..config import TTL_NETWORK_ERROR, TTL_NOT_FOUND
_MUSIXMATCH_MACRO_URL = "https://apic-desktop.musixmatch.com/ws/1.1/macro.subtitles.get"
_MUSIXMATCH_SEARCH_URL = "https://apic-desktop.musixmatch.com/ws/1.1/track.search"
@@ -156,9 +155,7 @@ class MusixmatchSpotifyFetcher(BaseFetcher):
def is_available(self, track: TrackMeta) -> bool:
return bool(track.trackid) and not self.auth.is_cooldown()
async def fetch(
self, track: TrackMeta, bypass_cache: bool = False
) -> Optional[LyricResult]:
async def fetch(self, track: TrackMeta, bypass_cache: bool = False) -> FetchResult:
logger.info(f"Musixmatch-Spotify: fetching lyrics for {track.display_name()}")
try:
@@ -167,22 +164,27 @@ class MusixmatchSpotifyFetcher(BaseFetcher):
{"track_spotify_id": track.trackid}, # type: ignore[dict-item]
)
except AttributeError:
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
return FetchResult.from_not_found()
except Exception as e:
logger.error(f"Musixmatch-Spotify: fetch failed: {e}")
return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
return FetchResult.from_network_error()
if lrc is None:
logger.debug(
f"Musixmatch-Spotify: no lyrics found for {track.display_name()}"
)
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
return FetchResult.from_not_found()
logger.info(f"Musixmatch-Spotify: got SUCCESS_SYNCED lyrics ({len(lrc)} lines)")
return LyricResult(
status=CacheStatus.SUCCESS_SYNCED,
lyrics=lrc,
source=self.source_name,
return FetchResult(
synced=LyricResult(
status=CacheStatus.SUCCESS_SYNCED,
lyrics=lrc,
source=self.source_name,
),
# Fetching unsynced lyrics is not possible with current endpoint,
# so no need to cache NOT_FOUND to avoid repeated failed attempts
unsynced=None,
)
@@ -258,38 +260,40 @@ class MusixmatchFetcher(BaseFetcher):
logger.debug("Musixmatch: no suitable candidate found")
return best_id, confidence
async def fetch(
self, track: TrackMeta, bypass_cache: bool = False
) -> Optional[LyricResult]:
async def fetch(self, track: TrackMeta, bypass_cache: bool = False) -> FetchResult:
logger.info(f"Musixmatch: fetching lyrics for {track.display_name()}")
try:
commontrack_id, confidence = await self._search(track)
if commontrack_id is None:
logger.debug(f"Musixmatch: no match found for {track.display_name()}")
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
return FetchResult.from_not_found()
lrc = await _fetch_macro(
self.auth,
{"commontrack_id": str(commontrack_id)},
)
except AttributeError:
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
return FetchResult.from_not_found()
except Exception as e:
logger.error(f"Musixmatch: fetch failed: {e}")
return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
return FetchResult.from_network_error()
if lrc is None:
logger.debug(f"Musixmatch: no lyrics for commontrack_id={commontrack_id}")
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
return FetchResult.from_not_found()
logger.info(
f"Musixmatch: got SUCCESS_SYNCED lyrics "
f"for commontrack_id={commontrack_id} ({len(lrc)} lines)"
)
return LyricResult(
status=CacheStatus.SUCCESS_SYNCED,
lyrics=lrc,
source=self.source_name,
confidence=confidence,
return FetchResult(
synced=LyricResult(
status=CacheStatus.SUCCESS_SYNCED,
lyrics=lrc,
source=self.source_name,
confidence=confidence,
),
# Same as above
unsynced=None,
)
+59 -25
View File
@@ -8,18 +8,16 @@ Description: Netease Cloud Music fetcher.
"""
import asyncio
from typing import Optional
import httpx
from loguru import logger
from .base import BaseFetcher
from .base import BaseFetcher, FetchResult
from .selection import SearchCandidate, select_ranked
from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import LRCData
from ..config import (
HTTP_TIMEOUT,
TTL_NOT_FOUND,
TTL_NETWORK_ERROR,
MULTI_CANDIDATE_DELAY_S,
UA_BROWSER,
)
@@ -112,9 +110,7 @@ class NeteaseFetcher(BaseFetcher):
logger.error(f"Netease: search failed: {e}")
return []
async def _get_lyric(
self, song_id: int, confidence: float = 0.0
) -> Optional[LyricResult]:
async def _get_lyric(self, song_id: int, confidence: float = 0.0) -> FetchResult:
logger.debug(f"Netease: fetching lyrics for song_id={song_id}")
try:
@@ -141,21 +137,19 @@ class NeteaseFetcher(BaseFetcher):
logger.error(
f"Netease: lyric response is not dict: {type(data).__name__}"
)
return LyricResult(
status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
)
return FetchResult.from_network_error()
lrc_obj = data.get("lrc")
if not isinstance(lrc_obj, dict):
logger.debug(
f"Netease: no 'lrc' object in response for song_id={song_id}"
)
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
return FetchResult.from_not_found()
lrc: str = lrc_obj.get("lyric", "")
if not isinstance(lrc, str) or not lrc.strip():
logger.debug(f"Netease: empty lyrics for song_id={song_id}")
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
return FetchResult.from_not_found()
lrcdata = LRCData(lrc)
status = lrcdata.detect_sync_status()
@@ -163,38 +157,78 @@ class NeteaseFetcher(BaseFetcher):
f"Netease: got {status.value} lyrics for song_id={song_id} "
f"({len(lrcdata)} lines)"
)
return LyricResult(
status=status,
lyrics=lrcdata,
source=self.source_name,
confidence=confidence,
not_found = LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
if status == CacheStatus.SUCCESS_SYNCED:
return FetchResult(
synced=LyricResult(
status=CacheStatus.SUCCESS_SYNCED,
lyrics=lrcdata,
source=self.source_name,
confidence=confidence,
),
unsynced=not_found,
)
return FetchResult(
synced=not_found,
unsynced=LyricResult(
status=CacheStatus.SUCCESS_UNSYNCED,
lyrics=lrcdata,
source=self.source_name,
confidence=confidence,
),
)
except Exception as e:
logger.error(f"Netease: lyric fetch failed for song_id={song_id}: {e}")
return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
return FetchResult.from_network_error()
async def fetch(
self, track: TrackMeta, bypass_cache: bool = False
) -> Optional[LyricResult]:
async def fetch(self, track: TrackMeta, bypass_cache: bool = False) -> FetchResult:
query = f"{track.artist or ''} {track.title or ''}".strip()
if not query:
logger.debug("Netease: skipped — insufficient metadata")
return None
return FetchResult()
logger.info(f"Netease: fetching lyrics for {track.display_name()}")
candidates = await self._search(track)
if not candidates:
logger.debug(f"Netease: no match found for {track.display_name()}")
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
return FetchResult.from_not_found()
res_synced: LyricResult = LyricResult(
status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND
)
res_unsynced: LyricResult = LyricResult(
status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND
)
for i, (song_id, confidence) in enumerate(candidates):
if i > 0:
await asyncio.sleep(MULTI_CANDIDATE_DELAY_S)
result = await self._get_lyric(song_id, confidence=confidence)
if result is None or result.status == CacheStatus.NETWORK_ERROR:
if result.synced and result.synced.status == CacheStatus.NETWORK_ERROR:
return result
if result.status != CacheStatus.NOT_FOUND:
if result.unsynced and result.unsynced.status == CacheStatus.NETWORK_ERROR:
return result
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
if (
res_synced.status == CacheStatus.NOT_FOUND
and result.synced
and result.synced.status == CacheStatus.SUCCESS_SYNCED
):
res_synced = result.synced
if (
res_unsynced.status == CacheStatus.NOT_FOUND
and result.unsynced
and result.unsynced.status == CacheStatus.SUCCESS_UNSYNCED
):
res_unsynced = result.unsynced
# Netease API is quite expensive, so we stop after finding synced lyrics,
# instead of trying to find both synced and unsynced versions
if (
res_synced.status == CacheStatus.SUCCESS_SYNCED
# and res_unsynced.status == CacheStatus.SUCCESS_UNSYNCED
):
break
return FetchResult(synced=res_synced, unsynced=res_unsynced)
+59 -25
View File
@@ -10,18 +10,16 @@ Description: QQ Music fetcher via self-hosted API proxy.
"""
import asyncio
from typing import Optional
import httpx
from loguru import logger
from .base import BaseFetcher
from .base import BaseFetcher, FetchResult
from .selection import SearchCandidate, select_ranked
from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import LRCData
from ..config import (
HTTP_TIMEOUT,
TTL_NOT_FOUND,
TTL_NETWORK_ERROR,
MULTI_CANDIDATE_DELAY_S,
)
@@ -104,9 +102,7 @@ class QQMusicFetcher(BaseFetcher):
logger.error(f"QQMusic: search failed: {e}")
return []
async def _get_lyric(
self, mid: str, confidence: float = 0.0
) -> Optional[LyricResult]:
async def _get_lyric(self, mid: str, confidence: float = 0.0) -> FetchResult:
logger.debug(f"QQMusic: fetching lyrics for mid={mid}")
try:
@@ -120,56 +116,94 @@ class QQMusicFetcher(BaseFetcher):
if data.get("code") != 0:
logger.error(f"QQMusic: lyric API error: {data}")
return LyricResult(
status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
)
return FetchResult.from_network_error()
lrc = data.get("data", {}).get("lyric", "")
if not isinstance(lrc, str) or not lrc.strip():
logger.debug(f"QQMusic: empty lyrics for mid={mid}")
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
return FetchResult.from_not_found()
lrcdata = LRCData(lrc)
status = lrcdata.detect_sync_status()
logger.info(
f"QQMusic: got {status.value} lyrics for mid={mid} ({len(lrcdata)} lines)"
)
return LyricResult(
status=status,
lyrics=lrcdata,
source=self.source_name,
confidence=confidence,
not_found = LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
if status == CacheStatus.SUCCESS_SYNCED:
return FetchResult(
synced=LyricResult(
status=CacheStatus.SUCCESS_SYNCED,
lyrics=lrcdata,
source=self.source_name,
confidence=confidence,
),
unsynced=not_found,
)
return FetchResult(
synced=not_found,
unsynced=LyricResult(
status=CacheStatus.SUCCESS_UNSYNCED,
lyrics=lrcdata,
source=self.source_name,
confidence=confidence,
),
)
except Exception as e:
logger.error(f"QQMusic: lyric fetch failed for mid={mid}: {e}")
return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
return FetchResult.from_network_error()
async def fetch(
self, track: TrackMeta, bypass_cache: bool = False
) -> Optional[LyricResult]:
async def fetch(self, track: TrackMeta, bypass_cache: bool = False) -> FetchResult:
if not self.auth.is_configured():
logger.debug("QQMusic: skipped — Auth not configured")
return None
return FetchResult()
query = f"{track.artist or ''} {track.title or ''}".strip()
if not query:
logger.debug("QQMusic: skipped — insufficient metadata")
return None
return FetchResult()
logger.info(f"QQMusic: fetching lyrics for {track.display_name()}")
candidates = await self._search(track)
if not candidates:
logger.debug(f"QQMusic: no match found for {track.display_name()}")
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
return FetchResult.from_not_found()
res_synced: LyricResult = LyricResult(
status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND
)
res_unsynced: LyricResult = LyricResult(
status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND
)
for i, (mid, confidence) in enumerate(candidates):
if i > 0:
await asyncio.sleep(MULTI_CANDIDATE_DELAY_S)
result = await self._get_lyric(mid, confidence=confidence)
if result is None or result.status == CacheStatus.NETWORK_ERROR:
if result.synced and result.synced.status == CacheStatus.NETWORK_ERROR:
return result
if result.status != CacheStatus.NOT_FOUND:
if result.unsynced and result.unsynced.status == CacheStatus.NETWORK_ERROR:
return result
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
if (
res_synced.status == CacheStatus.NOT_FOUND
and result.synced
and result.synced.status == CacheStatus.SUCCESS_SYNCED
):
res_synced = result.synced
if (
res_unsynced.status == CacheStatus.NOT_FOUND
and result.unsynced
and result.unsynced.status == CacheStatus.SUCCESS_UNSYNCED
):
res_unsynced = result.unsynced
# QQMusic API is quite expensive, so we stop after finding synced lyrics,
# instead of trying to find both synced and unsynced versions
if (
res_synced.status == CacheStatus.SUCCESS_SYNCED
# and res_unsynced.status == CacheStatus.SUCCESS_UNSYNCED
):
break
return FetchResult(synced=res_synced, unsynced=res_unsynced)
+10 -9
View File
@@ -70,14 +70,12 @@ def _score_candidate(
Scoring works in two tiers:
1. **Metadata score** — computed from fields available on *both* sides,
then rescaled to fill the 0-90 range so that missing fields don't
inflate the score. Fields missing on both sides are simply excluded
from the calculation (neutral). Fields present on only one side
contribute 0 to the numerator but their weight still counts in the
denominator (penalty for asymmetric absence).
2. **Synced bonus** — a flat 10 pts, always applied independently.
Metadata score — computed from fields available on both sides,
then rescaled to fill the 0-90 range so that missing fields don't
inflate the score. Fields missing on both sides are simply excluded
from the calculation (neutral). Fields present on only one side
contribute 0 to the numerator but their weight still counts in the
denominator (penalty for asymmetric absence).
Field weights (before rescaling):
- Title: 40
@@ -141,7 +139,10 @@ def _score_candidate(
metadata_score = 0.0
# Synced bonus (always 10 pts, independent of metadata)
synced_score = _W_SYNCED if c.is_synced else 0.0
# synced_score = _W_SYNCED if c.is_synced else 0.0
# EDIT: synced or not should not affect the score that indicates metadata similarity.
# Always apply synced bonus regardless of is_synced.
synced_score = _W_SYNCED
return metadata_score + synced_score
+28 -18
View File
@@ -5,14 +5,13 @@ Description: Spotify fetcher — obtains synced lyrics via Spotify's internal co
"""
import httpx
from typing import Optional
from loguru import logger
from .base import BaseFetcher
from .base import BaseFetcher, FetchResult
from ..authenticators.spotify import SpotifyAuthenticator, SPOTIFY_BASE_HEADERS
from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import LRCData
from ..config import HTTP_TIMEOUT, TTL_NOT_FOUND, TTL_NETWORK_ERROR
from ..config import HTTP_TIMEOUT, TTL_NOT_FOUND
_SPOTIFY_LYRICS_URL = "https://spclient.wg.spotify.com/color-lyrics/v2/track/"
@@ -46,19 +45,17 @@ class SpotifyFetcher(BaseFetcher):
continue
return False
async def fetch(
self, track: TrackMeta, bypass_cache: bool = False
) -> Optional[LyricResult]:
async def fetch(self, track: TrackMeta, bypass_cache: bool = False) -> FetchResult:
if not track.trackid:
logger.debug("Spotify: skipped — no trackid in metadata")
return None
return FetchResult()
logger.info(f"Spotify: fetching lyrics for trackid={track.trackid}")
token = await self.auth.authenticate()
if not token:
logger.error("Spotify: cannot fetch lyrics without a token")
return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
return FetchResult.from_network_error()
url = f"{_SPOTIFY_LYRICS_URL}{track.trackid}?format=json&vocalRemoval=false&market=from_token"
headers = {
@@ -73,21 +70,17 @@ class SpotifyFetcher(BaseFetcher):
if res.status_code == 404:
logger.debug(f"Spotify: 404 for trackid={track.trackid}")
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
return FetchResult.from_not_found()
if res.status_code != 200:
logger.error(f"Spotify: lyrics API returned {res.status_code}")
return LyricResult(
status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
)
return FetchResult.from_network_error()
data = res.json()
if not isinstance(data, dict) or "lyrics" not in data:
logger.error("Spotify: unexpected lyrics response structure")
return LyricResult(
status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
)
return FetchResult.from_network_error()
lyrics_data = data["lyrics"]
sync_type = lyrics_data.get("syncType", "")
@@ -95,7 +88,7 @@ class SpotifyFetcher(BaseFetcher):
if not isinstance(lines, list) or len(lines) == 0:
logger.debug("Spotify: response contained no lyric lines")
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
return FetchResult.from_not_found()
is_synced = sync_type == "LINE_SYNCED" and self._is_truly_synced(lines)
@@ -122,8 +115,25 @@ class SpotifyFetcher(BaseFetcher):
)
logger.info(f"Spotify: got {status.value} lyrics ({len(lrc_lines)} lines)")
return LyricResult(status=status, lyrics=content, source=self.source_name)
not_found = LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
if is_synced:
return FetchResult(
synced=LyricResult(
status=CacheStatus.SUCCESS_SYNCED,
lyrics=content,
source=self.source_name,
),
unsynced=not_found,
)
return FetchResult(
synced=not_found,
unsynced=LyricResult(
status=CacheStatus.SUCCESS_UNSYNCED,
lyrics=content,
source=self.source_name,
),
)
except Exception as e:
logger.error(f"Spotify: lyrics fetch failed: {e}")
return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
return FetchResult.from_network_error()
+8 -2
View File
@@ -271,6 +271,9 @@ class LRCData:
return "\n".join(sorted_lines).strip()
def to_unsynced(self):
return LRCData(self.to_plain())
def to_lrc(
self,
plain: bool = False,
@@ -279,9 +282,12 @@ class LRCData:
Assumes text has been normalized by normalize.
"""
ret = self
if not self.is_synced():
ret = self.normalize_unsynced()
if plain:
return self.to_plain()
return "\n".join(self._lines)
return ret.to_plain()
return "\n".join(ret._lines)
def get_audio_path(audio_url: str, ensure_exists: bool = False) -> Optional[Path]:
-5
View File
@@ -10,8 +10,6 @@ from enum import Enum
from typing import Optional, TYPE_CHECKING
from dataclasses import dataclass
from .config import SCORE_W_SYNCED
if TYPE_CHECKING:
from .lrc import LRCData
@@ -69,6 +67,3 @@ class LyricResult:
def __post_init__(self) -> None:
if self.status in (CacheStatus.NOT_FOUND, CacheStatus.NETWORK_ERROR):
self.confidence = 0.0
if self.status is CacheStatus.SUCCESS_UNSYNCED and self.confidence == 100.0:
# Fix: remove inflated confidence for unsynced results
self.confidence = 100 - SCORE_W_SYNCED