feat: fetcher: try multiple candidates rather than just the best one

This commit is contained in:
2026-04-03 22:16:49 +02:00
parent 89553a6da6
commit 2f8004581b
8 changed files with 176 additions and 43 deletions
+4
View File
@@ -49,6 +49,10 @@ SCORE_W_SYNCED = 10.0
MIN_CONFIDENCE = 25.0 # below this, candidate is rejected MIN_CONFIDENCE = 25.0 # below this, candidate is rejected
HIGH_CONFIDENCE = 80.0 # at or above this, stop searching early HIGH_CONFIDENCE = 80.0 # at or above this, stop searching early
# Multi-candidate fetching
MULTI_CANDIDATE_LIMIT = 3 # max candidates to try per search-based fetcher
MULTI_CANDIDATE_DELAY_S = 0.2 # delay between sequential lyric fetches
# Legacy cache rows (no confidence stored) get a base score by sync status # Legacy cache rows (no confidence stored) get a base score by sync status
LEGACY_CONFIDENCE_SYNCED = 50.0 LEGACY_CONFIDENCE_SYNCED = 50.0
LEGACY_CONFIDENCE_UNSYNCED = 40.0 LEGACY_CONFIDENCE_UNSYNCED = 40.0
+1 -1
View File
@@ -29,7 +29,7 @@ FetcherMethodType = Literal[
] ]
# Fetchers within a group run in parallel; groups run sequentially. # Fetchers within a group run in parallel; groups run sequentially.
# A group that produces any positive result stops the pipeline. # A group that produces any trusted and synced result stops the pipeline.
_FETCHER_GROUPS: list[list[FetcherMethodType]] = [ _FETCHER_GROUPS: list[list[FetcherMethodType]] = [
["local"], ["local"],
["cache-search"], ["cache-search"],
+30 -19
View File
@@ -12,18 +12,20 @@ Search results are filtered by duration when the track has a known length
to avoid returning lyrics for the wrong version of a song. to avoid returning lyrics for the wrong version of a song.
""" """
import asyncio
from typing import Optional from typing import Optional
import httpx import httpx
from loguru import logger from loguru import logger
from .base import BaseFetcher from .base import BaseFetcher
from .selection import SearchCandidate, select_best from .selection import SearchCandidate, select_ranked
from ..models import TrackMeta, LyricResult, CacheStatus from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import LRCData from ..lrc import LRCData
from ..config import ( from ..config import (
HTTP_TIMEOUT, HTTP_TIMEOUT,
TTL_NOT_FOUND, TTL_NOT_FOUND,
TTL_NETWORK_ERROR, TTL_NETWORK_ERROR,
MULTI_CANDIDATE_DELAY_S,
NETEASE_SEARCH_URL, NETEASE_SEARCH_URL,
NETEASE_LYRIC_URL, NETEASE_LYRIC_URL,
UA_BROWSER, UA_BROWSER,
@@ -45,10 +47,10 @@ class NeteaseFetcher(BaseFetcher):
async def _search( async def _search(
self, track: TrackMeta, limit: int = 10 self, track: TrackMeta, limit: int = 10
) -> tuple[Optional[int], float]: ) -> list[tuple[int, float]]:
query = f"{track.artist or ''} {track.title or ''}".strip() query = f"{track.artist or ''} {track.title or ''}".strip()
if not query: if not query:
return None, 0.0 return []
logger.debug(f"Netease: searching for '{query}' (limit={limit})") logger.debug(f"Netease: searching for '{query}' (limit={limit})")
@@ -66,23 +68,23 @@ class NeteaseFetcher(BaseFetcher):
logger.error( logger.error(
f"Netease: search returned non-dict: {type(result).__name__}" f"Netease: search returned non-dict: {type(result).__name__}"
) )
return None, 0.0 return []
result_body = result.get("result") result_body = result.get("result")
if not isinstance(result_body, dict): if not isinstance(result_body, dict):
logger.debug("Netease: search 'result' field missing or invalid") logger.debug("Netease: search 'result' field missing or invalid")
return None, 0.0 return []
songs = result_body.get("songs") songs = result_body.get("songs")
if not isinstance(songs, list) or len(songs) == 0: if not isinstance(songs, list) or len(songs) == 0:
logger.debug("Netease: search returned 0 results") logger.debug("Netease: search returned 0 results")
return None, 0.0 return []
logger.debug(f"Netease: search returned {len(songs)} candidates") logger.debug(f"Netease: search returned {len(songs)} candidates")
candidates = [ candidates = [
SearchCandidate( SearchCandidate(
item=song.get("id"), item=song_id,
duration_ms=float(song["dt"]) duration_ms=float(song["dt"])
if isinstance(song.get("dt"), int) if isinstance(song.get("dt"), int)
else None, else None,
@@ -92,27 +94,27 @@ class NeteaseFetcher(BaseFetcher):
album=(song.get("al") or {}).get("name"), album=(song.get("al") or {}).get("name"),
) )
for song in songs for song in songs
if isinstance(song, dict) and song.get("id") is not None if isinstance(song, dict) and isinstance(song_id := song.get("id"), int)
] ]
best_id, confidence = select_best( ranked = select_ranked(
candidates, candidates,
track.length, track.length,
title=track.title, title=track.title,
artist=track.artist, artist=track.artist,
album=track.album, album=track.album,
) )
if best_id is not None: if ranked:
logger.debug( logger.debug(
f"Netease: selected id={best_id} (confidence={confidence:.0f})" "Netease: top candidates: "
+ ", ".join(f"id={i} ({c:.0f})" for i, c in ranked)
) )
return best_id, confidence else:
logger.debug("Netease: no suitable candidate found")
logger.debug("Netease: no suitable candidate found") return ranked
return None, 0.0
except Exception as e: except Exception as e:
logger.error(f"Netease: search failed: {e}") logger.error(f"Netease: search failed: {e}")
return None, 0.0 return []
async def _get_lyric( async def _get_lyric(
self, song_id: int, confidence: float = 0.0 self, song_id: int, confidence: float = 0.0
@@ -185,9 +187,18 @@ class NeteaseFetcher(BaseFetcher):
return None return None
logger.info(f"Netease: fetching lyrics for {track.display_name()}") logger.info(f"Netease: fetching lyrics for {track.display_name()}")
song_id, confidence = await self._search(track) candidates = await self._search(track)
if not song_id: if not candidates:
logger.debug(f"Netease: no match found for {track.display_name()}") logger.debug(f"Netease: no match found for {track.display_name()}")
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND) return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
return await self._get_lyric(song_id, confidence=confidence) for i, (song_id, confidence) in enumerate(candidates):
if i > 0:
await asyncio.sleep(MULTI_CANDIDATE_DELAY_S)
result = await self._get_lyric(song_id, confidence=confidence)
if result is None or result.status == CacheStatus.NETWORK_ERROR:
return result
if result.status != CacheStatus.NOT_FOUND:
return result
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
+29 -18
View File
@@ -11,18 +11,20 @@ The base URL is read from the QQ_MUSIC_API_URL environment variable.
Search → pick best match by duration → fetch LRC lyrics. Search → pick best match by duration → fetch LRC lyrics.
""" """
import asyncio
from typing import Optional from typing import Optional
import httpx import httpx
from loguru import logger from loguru import logger
from .base import BaseFetcher from .base import BaseFetcher
from .selection import SearchCandidate, select_best from .selection import SearchCandidate, select_ranked
from ..models import TrackMeta, LyricResult, CacheStatus from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import LRCData from ..lrc import LRCData
from ..config import ( from ..config import (
HTTP_TIMEOUT, HTTP_TIMEOUT,
TTL_NOT_FOUND, TTL_NOT_FOUND,
TTL_NETWORK_ERROR, TTL_NETWORK_ERROR,
MULTI_CANDIDATE_DELAY_S,
QQ_MUSIC_API_URL, QQ_MUSIC_API_URL,
) )
@@ -37,10 +39,10 @@ class QQMusicFetcher(BaseFetcher):
async def _search( async def _search(
self, track: TrackMeta, limit: int = 10 self, track: TrackMeta, limit: int = 10
) -> tuple[Optional[str], float]: ) -> list[tuple[str, float]]:
query = f"{track.artist or ''} {track.title or ''}".strip() query = f"{track.artist or ''} {track.title or ''}".strip()
if not query: if not query:
return None, 0.0 return []
logger.debug(f"QQMusic: searching for '{query}' (limit={limit})") logger.debug(f"QQMusic: searching for '{query}' (limit={limit})")
@@ -55,18 +57,18 @@ class QQMusicFetcher(BaseFetcher):
if data.get("code") != 0: if data.get("code") != 0:
logger.error(f"QQMusic: search API error: {data}") logger.error(f"QQMusic: search API error: {data}")
return None, 0.0 return []
songs = data.get("data", {}).get("list", []) songs = data.get("data", {}).get("list", [])
if not songs: if not songs:
logger.debug("QQMusic: search returned 0 results") logger.debug("QQMusic: search returned 0 results")
return None, 0.0 return []
logger.debug(f"QQMusic: search returned {len(songs)} candidates") logger.debug(f"QQMusic: search returned {len(songs)} candidates")
candidates = [ candidates = [
SearchCandidate( SearchCandidate(
item=song.get("mid"), item=mid,
duration_ms=float(song["interval"]) * 1000 duration_ms=float(song["interval"]) * 1000
if isinstance(song.get("interval"), int) if isinstance(song.get("interval"), int)
else None, else None,
@@ -76,27 +78,27 @@ class QQMusicFetcher(BaseFetcher):
album=(song.get("album") or {}).get("name"), album=(song.get("album") or {}).get("name"),
) )
for song in songs for song in songs
if isinstance(song, dict) and song.get("mid") is not None if isinstance(song, dict) and isinstance(mid := song.get("mid"), str)
] ]
best_mid, confidence = select_best( ranked = select_ranked(
candidates, candidates,
track.length, track.length,
title=track.title, title=track.title,
artist=track.artist, artist=track.artist,
album=track.album, album=track.album,
) )
if best_mid is not None: if ranked:
logger.debug( logger.debug(
f"QQMusic: selected mid={best_mid} (confidence={confidence:.0f})" "QQMusic: top candidates: "
+ ", ".join(f"mid={m} ({c:.0f})" for m, c in ranked)
) )
return best_mid, confidence else:
logger.debug("QQMusic: no suitable candidate found")
logger.debug("QQMusic: no suitable candidate found") return ranked
return None, 0.0
except Exception as e: except Exception as e:
logger.error(f"QQMusic: search failed: {e}") logger.error(f"QQMusic: search failed: {e}")
return None, 0.0 return []
async def _get_lyric( async def _get_lyric(
self, mid: str, confidence: float = 0.0 self, mid: str, confidence: float = 0.0
@@ -152,9 +154,18 @@ class QQMusicFetcher(BaseFetcher):
return None return None
logger.info(f"QQMusic: fetching lyrics for {track.display_name()}") logger.info(f"QQMusic: fetching lyrics for {track.display_name()}")
mid, confidence = await self._search(track) candidates = await self._search(track)
if not mid: if not candidates:
logger.debug(f"QQMusic: no match found for {track.display_name()}") logger.debug(f"QQMusic: no match found for {track.display_name()}")
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND) return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
return await self._get_lyric(mid, confidence=confidence) for i, (mid, confidence) in enumerate(candidates):
if i > 0:
await asyncio.sleep(MULTI_CANDIDATE_DELAY_S)
result = await self._get_lyric(mid, confidence=confidence)
if result is None or result.status == CacheStatus.NETWORK_ERROR:
return result
if result.status != CacheStatus.NOT_FOUND:
return result
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
+27
View File
@@ -11,6 +11,7 @@ from typing import Generic, Optional, TypeVar
from ..config import ( from ..config import (
DURATION_TOLERANCE_MS, DURATION_TOLERANCE_MS,
MULTI_CANDIDATE_LIMIT,
SCORE_W_TITLE as _W_TITLE, SCORE_W_TITLE as _W_TITLE,
SCORE_W_ARTIST as _W_ARTIST, SCORE_W_ARTIST as _W_ARTIST,
SCORE_W_ALBUM as _W_ALBUM, SCORE_W_ALBUM as _W_ALBUM,
@@ -143,6 +144,32 @@ def _score_candidate(
return metadata_score + synced_score return metadata_score + synced_score
def select_ranked(
    candidates: list[SearchCandidate[T]],
    track_length_ms: Optional[int] = None,
    *,
    title: Optional[str] = None,
    artist: Optional[str] = None,
    album: Optional[str] = None,
    min_confidence: float = MIN_CONFIDENCE,
    max_results: int = MULTI_CANDIDATE_LIMIT,
) -> list[tuple[T, float]]:
    """Rank search candidates and return the best few as (item, score) pairs.

    A candidate is dropped outright when both its duration and
    ``track_length_ms`` are known and they differ by more than
    ``DURATION_TOLERANCE_MS``. Remaining candidates are scored with
    ``_score_candidate`` and kept only if the score is at least
    ``min_confidence``.

    Returns at most ``max_results`` pairs, highest score first. Candidates
    with equal scores keep their input order (the sort is stable).
    """
    accepted: list[tuple[T, float]] = []
    for candidate in candidates:
        # Hard duration filter: only applies when both lengths are known.
        duration_known = (
            track_length_ms is not None and candidate.duration_ms is not None
        )
        if (
            duration_known
            and abs(candidate.duration_ms - track_length_ms) > DURATION_TOLERANCE_MS
        ):
            continue

        confidence = _score_candidate(
            candidate, title, artist, album, track_length_ms
        )
        if confidence < min_confidence:
            continue
        accepted.append((candidate.item, confidence))

    ranked = sorted(accepted, key=lambda pair: pair[1], reverse=True)
    return ranked[:max_results]
def select_best( def select_best(
candidates: list[SearchCandidate[T]], candidates: list[SearchCandidate[T]],
track_length_ms: Optional[int] = None, track_length_ms: Optional[int] = None,
+1 -1
View File
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project] [project]
name = "lrx-cli" name = "lrx-cli"
version = "0.4.1" version = "0.4.2"
description = "Fetch line-synced lyrics for your music player." description = "Fetch line-synced lyrics for your music player."
readme = "README.md" readme = "README.md"
requires-python = ">=3.13" requires-python = ">=3.13"
+83 -3
View File
@@ -3,6 +3,7 @@ from __future__ import annotations
from lrx_cli.fetchers.selection import ( from lrx_cli.fetchers.selection import (
SearchCandidate, SearchCandidate,
select_best, select_best,
select_ranked,
_score_candidate, _score_candidate,
_text_similarity, _text_similarity,
MIN_CONFIDENCE, MIN_CONFIDENCE,
@@ -407,9 +408,6 @@ def test_netease_without_ref_metadata_rejects_below_confidence() -> None:
assert best is None assert best is None
# --- Edge cases ---
def test_empty_candidates_returns_none() -> None: def test_empty_candidates_returns_none() -> None:
assert select_best([], track_length_ms=5000) == (None, 0.0) assert select_best([], track_length_ms=5000) == (None, 0.0)
assert select_best([], track_length_ms=None) == (None, 0.0) assert select_best([], track_length_ms=None) == (None, 0.0)
@@ -445,3 +443,85 @@ def test_generic_type_preserved() -> None:
dict_candidates = [SearchCandidate(item={"id": 1}, title="x")] dict_candidates = [SearchCandidate(item={"id": 1}, title="x")]
best, _ = select_best(dict_candidates, title="x") best, _ = select_best(dict_candidates, title="x")
assert best == {"id": 1} assert best == {"id": 1}
def test_select_ranked_empty_input() -> None:
    """No candidates in → empty ranking out."""
    ranked = select_ranked([])
    assert ranked == []
def test_select_ranked_all_below_confidence() -> None:
    """All candidates below threshold → empty list.

    The candidate deliberately carries no duration: select_ranked only applies
    its duration hard filter when both durations are known, so a candidate
    with a wildly different duration would be discarded before scoring and
    the min_confidence threshold would never be exercised.
    """
    candidates = [SearchCandidate(item="x", title="Completely Different")]
    result = select_ranked(
        candidates, 232000, title="My Love", artist="Westlife", min_confidence=90.0
    )
    assert result == []
def test_select_ranked_sorted_descending() -> None:
    """The ranking comes back ordered from highest score to lowest."""
    ranked = select_ranked(
        _netease_candidates(),
        _REF_LENGTH,
        title=_REF_TITLE,
        artist=_REF_ARTIST,
        album=_REF_ALBUM,
    )
    # Need at least two entries for the ordering check to mean anything.
    assert len(ranked) >= 2
    observed = [pair[1] for pair in ranked]
    expected = sorted(observed, reverse=True)
    assert observed == expected
def test_select_ranked_respects_max_results() -> None:
    """The ranking never contains more entries than max_results."""
    limit = 2
    ranked = select_ranked(
        _netease_candidates(),
        _REF_LENGTH,
        title=_REF_TITLE,
        artist=_REF_ARTIST,
        album=_REF_ALBUM,
        max_results=limit,
    )
    assert len(ranked) <= limit
def test_select_ranked_consistent_with_select_best() -> None:
    """The top entry of select_ranked is the same pair select_best returns."""
    pool = _netease_candidates()
    ref = {"title": _REF_TITLE, "artist": _REF_ARTIST, "album": _REF_ALBUM}
    top_item, top_score = select_best(pool, _REF_LENGTH, **ref)  # type: ignore
    ranking = select_ranked(pool, _REF_LENGTH, **ref)  # type: ignore
    assert ranking[0] == (top_item, top_score)
def test_select_ranked_duration_hard_filter_applies() -> None:
    """Candidates whose duration is outside the tolerance never get ranked."""
    ranked = select_ranked(
        _netease_candidates(),
        _REF_LENGTH,
        title=_REF_TITLE,
        artist=_REF_ARTIST,
        album=_REF_ALBUM,
    )
    returned_ids = [pair[0] for pair in ranked]
    # 29809886 (dt=262000, diff=30000ms) and 20707713 (dt=241116, diff=9116ms)
    # both exceed DURATION_TOLERANCE_MS=3000 → must not appear
    for excluded_id in (29809886, 20707713):
        assert excluded_id not in returned_ids
def test_select_ranked_netease_top_is_best_duration_match() -> None:
    """2080607 (diff=59ms) should rank first over 572412968 (diff=1000ms)."""
    ranked = select_ranked(
        _netease_candidates(),
        _REF_LENGTH,
        title=_REF_TITLE,
        artist=_REF_ARTIST,
        album=_REF_ALBUM,
    )
    winner_id, _winner_score = ranked[0]
    assert winner_id == 2080607
Generated
+1 -1
View File
@@ -153,7 +153,7 @@ wheels = [
[[package]] [[package]]
name = "lrx-cli" name = "lrx-cli"
version = "0.4.0" version = "0.4.1"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "cyclopts" }, { name = "cyclopts" },