feat: lrclib-search fetcher now do multiple request to cover more cases
refactor: abstract selection logic & test dafür
This commit is contained in:
@@ -1 +0,0 @@
|
||||
claude --resume 48d54aac-a89b-48c3-8a76-23e9eb73722d
|
||||
@@ -15,6 +15,7 @@ from loguru import logger
|
||||
|
||||
|
||||
from .base import BaseFetcher
|
||||
from .selection import SearchCandidate, select_best
|
||||
from ..models import TrackMeta, LyricResult, CacheStatus
|
||||
from ..cache import CacheEngine
|
||||
from ..lrc import LRCData
|
||||
@@ -64,15 +65,17 @@ class CacheSearchFetcher(BaseFetcher):
|
||||
return None
|
||||
|
||||
# Pick best: prefer synced, then first available
|
||||
best = None
|
||||
for m in matches:
|
||||
if m.get("status") == CacheStatus.SUCCESS_SYNCED.value:
|
||||
best = m
|
||||
break
|
||||
if best is None:
|
||||
best = m
|
||||
candidates = [
|
||||
SearchCandidate(
|
||||
item=m,
|
||||
is_synced=m.get("status") == CacheStatus.SUCCESS_SYNCED.value,
|
||||
)
|
||||
for m in matches
|
||||
if m.get("lyrics")
|
||||
]
|
||||
best = select_best(candidates, track.length)
|
||||
|
||||
if not best or not best.get("lyrics"):
|
||||
if not best:
|
||||
return None
|
||||
|
||||
status = CacheStatus(best["status"])
|
||||
|
||||
@@ -15,6 +15,7 @@ from loguru import logger
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from .base import BaseFetcher
|
||||
from .selection import SearchCandidate, select_best
|
||||
from ..models import TrackMeta, LyricResult, CacheStatus
|
||||
from ..lrc import LRCData
|
||||
from ..config import (
|
||||
@@ -22,7 +23,6 @@ from ..config import (
|
||||
TTL_UNSYNCED,
|
||||
TTL_NOT_FOUND,
|
||||
TTL_NETWORK_ERROR,
|
||||
DURATION_TOLERANCE_MS,
|
||||
LRCLIB_SEARCH_URL,
|
||||
UA_LRX,
|
||||
)
|
||||
@@ -36,6 +36,34 @@ class LrclibSearchFetcher(BaseFetcher):
|
||||
def is_available(self, track: TrackMeta) -> bool:
|
||||
return bool(track.title)
|
||||
|
||||
def _build_queries(self, track: TrackMeta) -> list[dict[str, str]]:
|
||||
"""Build up to 4 query param sets, from most specific to least.
|
||||
|
||||
1. title + artist + album (if all present)
|
||||
2. title + artist (if artist present)
|
||||
3. title + album (if album present)
|
||||
4. title only
|
||||
"""
|
||||
assert track.title is not None
|
||||
title = track.title
|
||||
queries: list[dict[str, str]] = []
|
||||
|
||||
if track.artist and track.album:
|
||||
queries.append(
|
||||
{
|
||||
"track_name": title,
|
||||
"artist_name": track.artist,
|
||||
"album_name": track.album,
|
||||
}
|
||||
)
|
||||
if track.artist:
|
||||
queries.append({"track_name": title, "artist_name": track.artist})
|
||||
if track.album:
|
||||
queries.append({"track_name": title, "album_name": track.album})
|
||||
queries.append({"track_name": title})
|
||||
|
||||
return queries
|
||||
|
||||
def fetch(
|
||||
self, track: TrackMeta, bypass_cache: bool = False
|
||||
) -> Optional[LyricResult]:
|
||||
@@ -44,40 +72,68 @@ class LrclibSearchFetcher(BaseFetcher):
|
||||
logger.debug("LRCLIB-search: skipped — no title")
|
||||
return None
|
||||
|
||||
params: dict[str, str] = {"track_name": track.title}
|
||||
if track.artist:
|
||||
params["artist_name"] = track.artist
|
||||
if track.album:
|
||||
params["album_name"] = track.album
|
||||
|
||||
url = f"{LRCLIB_SEARCH_URL}?{urlencode(params)}"
|
||||
queries = self._build_queries(track)
|
||||
logger.info(f"LRCLIB-search: searching for {track.display_name()}")
|
||||
|
||||
seen_ids: set[int] = set()
|
||||
candidates: list[dict] = []
|
||||
had_error = False
|
||||
|
||||
try:
|
||||
with httpx.Client(timeout=HTTP_TIMEOUT) as client:
|
||||
resp = client.get(url, headers={"User-Agent": UA_LRX})
|
||||
for params in queries:
|
||||
url = f"{LRCLIB_SEARCH_URL}?{urlencode(params)}"
|
||||
logger.debug(f"LRCLIB-search: query {params}")
|
||||
resp = client.get(url, headers={"User-Agent": UA_LRX})
|
||||
|
||||
if resp.status_code != 200:
|
||||
logger.error(f"LRCLIB-search: API returned {resp.status_code}")
|
||||
return LyricResult(
|
||||
status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
|
||||
)
|
||||
if resp.status_code != 200:
|
||||
logger.error(f"LRCLIB-search: API returned {resp.status_code}")
|
||||
had_error = True
|
||||
continue
|
||||
|
||||
data = resp.json()
|
||||
data = resp.json()
|
||||
if not isinstance(data, list):
|
||||
continue
|
||||
|
||||
if not isinstance(data, list) or len(data) == 0:
|
||||
for item in data:
|
||||
if not isinstance(item, dict):
|
||||
continue
|
||||
item_id = item.get("id")
|
||||
if item_id is not None and item_id in seen_ids:
|
||||
continue
|
||||
if item_id is not None:
|
||||
seen_ids.add(item_id)
|
||||
candidates.append(item)
|
||||
|
||||
if not candidates:
|
||||
if had_error:
|
||||
return LyricResult(
|
||||
status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
|
||||
)
|
||||
logger.debug(f"LRCLIB-search: no results for {track.display_name()}")
|
||||
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
|
||||
|
||||
logger.debug(f"LRCLIB-search: got {len(data)} candidates")
|
||||
logger.debug(
|
||||
f"LRCLIB-search: got {len(candidates)} unique candidates "
|
||||
f"from {len(queries)} queries"
|
||||
)
|
||||
|
||||
# Select best match by duration
|
||||
best = self._select_best(data, track)
|
||||
mapped = [
|
||||
SearchCandidate(
|
||||
item=item,
|
||||
duration_ms=item["duration"] * 1000
|
||||
if isinstance(item.get("duration"), (int, float))
|
||||
else None,
|
||||
is_synced=isinstance(item.get("syncedLyrics"), str)
|
||||
and bool(item["syncedLyrics"].strip()),
|
||||
)
|
||||
for item in candidates
|
||||
]
|
||||
best = select_best(mapped, track.length)
|
||||
if best is None:
|
||||
logger.debug("LRCLIB-search: no valid candidate found")
|
||||
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
|
||||
|
||||
# Extract lyrics
|
||||
synced = best.get("syncedLyrics")
|
||||
unsynced = best.get("plainLyrics")
|
||||
|
||||
@@ -108,57 +164,3 @@ class LrclibSearchFetcher(BaseFetcher):
|
||||
except Exception as e:
|
||||
logger.error(f"LRCLIB-search: unexpected error: {e}")
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _select_best(candidates: list[dict], track: TrackMeta) -> Optional[dict]:
|
||||
"""Pick the best candidate, preferring synced lyrics and closest duration."""
|
||||
if track.length is not None:
|
||||
track_s = track.length / 1000.0
|
||||
best: Optional[dict] = None
|
||||
best_diff = float("inf")
|
||||
|
||||
for item in candidates:
|
||||
if not isinstance(item, dict):
|
||||
continue
|
||||
duration = item.get("duration")
|
||||
if not isinstance(duration, (int, float)):
|
||||
continue
|
||||
diff = abs(duration - track_s) * 1000 # compare in ms
|
||||
if diff > DURATION_TOLERANCE_MS:
|
||||
continue
|
||||
# Prefer synced over unsynced at similar duration
|
||||
has_synced = (
|
||||
isinstance(item.get("syncedLyrics"), str)
|
||||
and item["syncedLyrics"].strip()
|
||||
)
|
||||
best_synced = (
|
||||
best is not None
|
||||
and isinstance(best.get("syncedLyrics"), str)
|
||||
and best["syncedLyrics"].strip()
|
||||
)
|
||||
if diff < best_diff or (
|
||||
diff == best_diff and has_synced and not best_synced
|
||||
):
|
||||
best_diff = diff
|
||||
best = item
|
||||
|
||||
if best is not None:
|
||||
logger.debug(
|
||||
f"LRCLIB-search: selected id={best.get('id')} (diff={best_diff:.0f}ms)"
|
||||
)
|
||||
return best
|
||||
|
||||
logger.debug(
|
||||
f"LRCLIB-search: no candidate within {DURATION_TOLERANCE_MS}ms"
|
||||
)
|
||||
return None
|
||||
|
||||
# No duration — pick first with synced lyrics, or just first
|
||||
for item in candidates:
|
||||
if (
|
||||
isinstance(item, dict)
|
||||
and isinstance(item.get("syncedLyrics"), str)
|
||||
and item["syncedLyrics"].strip()
|
||||
):
|
||||
return item
|
||||
return candidates[0] if isinstance(candidates[0], dict) else None
|
||||
|
||||
+16
-45
@@ -17,13 +17,13 @@ import httpx
|
||||
from loguru import logger
|
||||
|
||||
from .base import BaseFetcher
|
||||
from .selection import SearchCandidate, select_best
|
||||
from ..models import TrackMeta, LyricResult, CacheStatus
|
||||
from ..lrc import LRCData
|
||||
from ..config import (
|
||||
HTTP_TIMEOUT,
|
||||
TTL_NOT_FOUND,
|
||||
TTL_NETWORK_ERROR,
|
||||
DURATION_TOLERANCE_MS,
|
||||
NETEASE_SEARCH_URL,
|
||||
NETEASE_LYRIC_URL,
|
||||
UA_BROWSER,
|
||||
@@ -84,52 +84,23 @@ class NeteaseFetcher(BaseFetcher):
|
||||
|
||||
logger.debug(f"Netease: search returned {len(songs)} candidates")
|
||||
|
||||
# Duration-based best-match selection
|
||||
if track.length is not None:
|
||||
track_ms = track.length
|
||||
best_id: Optional[int] = None
|
||||
best_diff = float("inf")
|
||||
|
||||
for song in songs:
|
||||
if not isinstance(song, dict):
|
||||
continue
|
||||
sid = song.get("id")
|
||||
name = song.get("name", "?")
|
||||
duration = song.get("dt") # milliseconds
|
||||
if not isinstance(duration, int):
|
||||
logger.debug(
|
||||
f" candidate {sid} '{name}': no duration, skipped"
|
||||
)
|
||||
continue
|
||||
diff = abs(duration - track_ms)
|
||||
logger.debug(
|
||||
f" candidate {sid} '{name}': "
|
||||
f"duration={duration}ms, diff={diff}ms"
|
||||
)
|
||||
if diff < best_diff:
|
||||
best_diff = diff
|
||||
best_id = sid
|
||||
|
||||
if best_id is not None and best_diff <= DURATION_TOLERANCE_MS:
|
||||
logger.debug(f"Netease: selected id={best_id} (diff={best_diff}ms)")
|
||||
return best_id
|
||||
|
||||
logger.debug(
|
||||
f"Netease: no candidate within {DURATION_TOLERANCE_MS}ms "
|
||||
f"(best diff={best_diff}ms)"
|
||||
candidates = [
|
||||
SearchCandidate(
|
||||
item=song.get("id"),
|
||||
duration_ms=float(song["dt"])
|
||||
if isinstance(song.get("dt"), int)
|
||||
else None,
|
||||
)
|
||||
return None
|
||||
for song in songs
|
||||
if isinstance(song, dict) and song.get("id") is not None
|
||||
]
|
||||
best_id = select_best(candidates, track.length)
|
||||
if best_id is not None:
|
||||
logger.debug(f"Netease: selected id={best_id}")
|
||||
return best_id
|
||||
|
||||
# No duration info — take the first result
|
||||
first = songs[0]
|
||||
if not isinstance(first, dict) or "id" not in first:
|
||||
logger.error("Netease: first search result has no 'id'")
|
||||
return None
|
||||
logger.debug(
|
||||
f"Netease: no duration available, using first result "
|
||||
f"id={first['id']} '{first.get('name', '?')}'"
|
||||
)
|
||||
return first["id"]
|
||||
logger.debug("Netease: no suitable candidate found")
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Netease: search failed: {e}")
|
||||
|
||||
+16
-49
@@ -16,13 +16,13 @@ import httpx
|
||||
from loguru import logger
|
||||
|
||||
from .base import BaseFetcher
|
||||
from .selection import SearchCandidate, select_best
|
||||
from ..models import TrackMeta, LyricResult, CacheStatus
|
||||
from ..lrc import LRCData
|
||||
from ..config import (
|
||||
HTTP_TIMEOUT,
|
||||
TTL_NOT_FOUND,
|
||||
TTL_NETWORK_ERROR,
|
||||
DURATION_TOLERANCE_MS,
|
||||
QQ_MUSIC_API_URL,
|
||||
)
|
||||
|
||||
@@ -63,56 +63,23 @@ class QQMusicFetcher(BaseFetcher):
|
||||
|
||||
logger.debug(f"QQMusic: search returned {len(songs)} candidates")
|
||||
|
||||
# Duration-based best-match selection
|
||||
if track.length is not None:
|
||||
track_ms = track.length
|
||||
best_mid: Optional[str] = None
|
||||
best_diff = float("inf")
|
||||
|
||||
for song in songs:
|
||||
if not isinstance(song, dict):
|
||||
continue
|
||||
mid = song.get("mid")
|
||||
name = song.get("name", "?")
|
||||
# interval is in seconds
|
||||
interval = song.get("interval")
|
||||
if not isinstance(interval, int):
|
||||
logger.debug(
|
||||
f" candidate {mid} '{name}': no duration, skipped"
|
||||
)
|
||||
continue
|
||||
duration_ms = interval * 1000
|
||||
diff = abs(duration_ms - track_ms)
|
||||
logger.debug(
|
||||
f" candidate {mid} '{name}': "
|
||||
f"duration={duration_ms}ms, diff={diff}ms"
|
||||
)
|
||||
if diff < best_diff:
|
||||
best_diff = diff
|
||||
best_mid = mid
|
||||
|
||||
if best_mid is not None and best_diff <= DURATION_TOLERANCE_MS:
|
||||
logger.debug(
|
||||
f"QQMusic: selected mid={best_mid} (diff={best_diff}ms)"
|
||||
)
|
||||
return best_mid
|
||||
|
||||
logger.debug(
|
||||
f"QQMusic: no candidate within {DURATION_TOLERANCE_MS}ms "
|
||||
f"(best diff={best_diff}ms)"
|
||||
candidates = [
|
||||
SearchCandidate(
|
||||
item=song.get("mid"),
|
||||
duration_ms=float(song["interval"]) * 1000
|
||||
if isinstance(song.get("interval"), int)
|
||||
else None,
|
||||
)
|
||||
return None
|
||||
for song in songs
|
||||
if isinstance(song, dict) and song.get("mid") is not None
|
||||
]
|
||||
best_mid = select_best(candidates, track.length)
|
||||
if best_mid is not None:
|
||||
logger.debug(f"QQMusic: selected mid={best_mid}")
|
||||
return best_mid
|
||||
|
||||
# No duration info — take the first result
|
||||
first = songs[0]
|
||||
if not isinstance(first, dict) or "mid" not in first:
|
||||
logger.error("QQMusic: first search result has no 'mid'")
|
||||
return None
|
||||
logger.debug(
|
||||
f"QQMusic: no duration available, using first result "
|
||||
f"mid={first['mid']} '{first.get('name', '?')}'"
|
||||
)
|
||||
return first["mid"]
|
||||
logger.debug("QQMusic: no suitable candidate found")
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"QQMusic: search failed: {e}")
|
||||
|
||||
@@ -0,0 +1,68 @@
|
||||
"""
|
||||
Shared candidate-selection logic for search-based fetchers.
|
||||
|
||||
Each fetcher maps its API-specific results to SearchCandidate, then calls
|
||||
select_best() which handles duration filtering and synced preference uniformly.
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Generic, Optional, TypeVar
|
||||
|
||||
from ..config import DURATION_TOLERANCE_MS
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
@dataclass
|
||||
class SearchCandidate(Generic[T]):
|
||||
"""A normalized search result for best-match selection.
|
||||
|
||||
Attributes:
|
||||
item: The original API-specific object (dict, ID, etc.)
|
||||
duration_ms: Track duration in milliseconds, or None if unknown.
|
||||
is_synced: Whether this candidate is known to have synced lyrics.
|
||||
"""
|
||||
|
||||
item: T
|
||||
duration_ms: Optional[float] = None
|
||||
is_synced: bool = False
|
||||
|
||||
|
||||
def select_best(
|
||||
candidates: list[SearchCandidate[T]],
|
||||
track_length_ms: Optional[int] = None,
|
||||
tolerance_ms: float = DURATION_TOLERANCE_MS,
|
||||
) -> Optional[T]:
|
||||
"""Pick the best candidate by duration proximity and sync preference.
|
||||
|
||||
When track_length_ms is available:
|
||||
- Filter by tolerance_ms
|
||||
- Pick closest duration, prefer synced at equal distance
|
||||
When track_length_ms is unavailable:
|
||||
- Pick first synced candidate, or first overall
|
||||
"""
|
||||
if track_length_ms is not None:
|
||||
best: Optional[SearchCandidate[T]] = None
|
||||
best_diff = float("inf")
|
||||
|
||||
for c in candidates:
|
||||
if c.duration_ms is None:
|
||||
continue
|
||||
diff = abs(c.duration_ms - track_length_ms)
|
||||
if diff > tolerance_ms:
|
||||
continue
|
||||
if diff < best_diff or (
|
||||
diff == best_diff
|
||||
and c.is_synced
|
||||
and (best is None or not best.is_synced)
|
||||
):
|
||||
best_diff = diff
|
||||
best = c
|
||||
|
||||
return best.item if best is not None else None
|
||||
|
||||
# No duration — prefer synced, fallback to first
|
||||
for c in candidates:
|
||||
if c.is_synced:
|
||||
return c.item
|
||||
return candidates[0].item if candidates else None
|
||||
@@ -0,0 +1,92 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from lrx_cli.fetchers.selection import SearchCandidate, select_best
|
||||
|
||||
|
||||
def test_picks_closest_duration_within_tolerance() -> None:
|
||||
candidates = [
|
||||
SearchCandidate(item="far", duration_ms=10000.0),
|
||||
SearchCandidate(item="close", duration_ms=5100.0),
|
||||
SearchCandidate(item="exact", duration_ms=5000.0),
|
||||
]
|
||||
assert select_best(candidates, track_length_ms=5000) == "exact"
|
||||
|
||||
|
||||
def test_filters_out_candidates_beyond_tolerance() -> None:
|
||||
candidates = [
|
||||
SearchCandidate(item="too_far", duration_ms=100000.0),
|
||||
]
|
||||
assert select_best(candidates, track_length_ms=5000, tolerance_ms=2000) is None
|
||||
|
||||
|
||||
def test_prefers_synced_at_equal_duration() -> None:
|
||||
candidates = [
|
||||
SearchCandidate(item="unsynced", duration_ms=5000.0, is_synced=False),
|
||||
SearchCandidate(item="synced", duration_ms=5000.0, is_synced=True),
|
||||
]
|
||||
assert select_best(candidates, track_length_ms=5000) == "synced"
|
||||
|
||||
|
||||
def test_closer_duration_wins_over_synced() -> None:
|
||||
candidates = [
|
||||
SearchCandidate(item="synced_far", duration_ms=6000.0, is_synced=True),
|
||||
SearchCandidate(item="unsynced_close", duration_ms=5001.0, is_synced=False),
|
||||
]
|
||||
assert select_best(candidates, track_length_ms=5000) == "unsynced_close"
|
||||
|
||||
|
||||
def test_skips_candidates_without_duration_when_track_length_given() -> None:
|
||||
candidates = [
|
||||
SearchCandidate(item="no_dur", duration_ms=None),
|
||||
SearchCandidate(item="has_dur", duration_ms=5000.0),
|
||||
]
|
||||
assert select_best(candidates, track_length_ms=5000) == "has_dur"
|
||||
|
||||
|
||||
def test_returns_none_when_all_lack_duration_and_track_length_given() -> None:
|
||||
candidates = [
|
||||
SearchCandidate(item="a", duration_ms=None),
|
||||
SearchCandidate(item="b", duration_ms=None),
|
||||
]
|
||||
assert select_best(candidates, track_length_ms=5000) is None
|
||||
|
||||
|
||||
def test_prefers_synced_when_no_track_length() -> None:
|
||||
candidates = [
|
||||
SearchCandidate(item="unsynced", is_synced=False),
|
||||
SearchCandidate(item="synced", is_synced=True),
|
||||
]
|
||||
assert select_best(candidates, track_length_ms=None) == "synced"
|
||||
|
||||
|
||||
def test_falls_back_to_first_when_none_synced() -> None:
|
||||
candidates = [
|
||||
SearchCandidate(item="first"),
|
||||
SearchCandidate(item="second"),
|
||||
]
|
||||
assert select_best(candidates, track_length_ms=None) == "first"
|
||||
|
||||
|
||||
def test_empty_candidates_returns_none() -> None:
|
||||
assert select_best([], track_length_ms=5000) is None
|
||||
assert select_best([], track_length_ms=None) is None
|
||||
|
||||
|
||||
def test_single_candidate_within_tolerance() -> None:
|
||||
candidates = [SearchCandidate(item="only", duration_ms=5000.0)]
|
||||
assert select_best(candidates, track_length_ms=5000) == "only"
|
||||
|
||||
|
||||
def test_single_candidate_beyond_tolerance() -> None:
|
||||
candidates = [SearchCandidate(item="only", duration_ms=99999.0)]
|
||||
assert select_best(candidates, track_length_ms=5000, tolerance_ms=1000) is None
|
||||
|
||||
|
||||
def test_generic_type_preserved() -> None:
|
||||
"""select_best returns the same type as SearchCandidate.item."""
|
||||
int_candidates = [SearchCandidate(item=42, duration_ms=5000.0)]
|
||||
assert select_best(int_candidates, track_length_ms=5000) == 42
|
||||
|
||||
dict_candidates = [SearchCandidate(item={"id": 1}, duration_ms=5000.0)]
|
||||
result = select_best(dict_candidates, track_length_ms=5000)
|
||||
assert result == {"id": 1}
|
||||
Reference in New Issue
Block a user