feat: add exact metadata match for cache search in CacheSearchFetcher

This commit is contained in:
2026-03-28 06:54:30 +01:00
parent 05d7def249
commit a74bf885a2
3 changed files with 57 additions and 3 deletions
+48 -2
View File
@@ -16,7 +16,9 @@ from .config import DB_PATH, DURATION_TOLERANCE_MS
from .models import TrackMeta, LyricResult, CacheStatus
# Punctuation to strip for fuzzy matching (ASCII + common fullwidth)
_PUNCT_RE = re.compile(r"[~!@#$%^&*()_+\-=\[\]{}|;:'\",.<>?/\\`~!@#$%^&*()_+-=【】{}|;:'",。<>?/\`]")
_PUNCT_RE = re.compile(
r"[~!@#$%^&*()_+\-=\[\]{}|;:'\",.<>?/\\`~!@#$%^&*()_+-=【】{}|;:'",。<>?/\`]"
)
_SPACE_RE = re.compile(r"\s+")
@@ -258,6 +260,45 @@ class CacheEngine:
params.append(track.album)
return conditions, params
# Exact cross-source search
def find_best_positive(self, track: TrackMeta) -> Optional[LyricResult]:
"""Find the best positive (synced/unsynced) cache entry for *track*.
Uses exact metadata match (artist + title + album) across all sources.
Returns synced if available, otherwise unsynced, or None.
"""
conditions, params = self._track_where(track)
if not conditions:
return None
now = int(time.time())
conditions.append("status IN (?, ?)")
params.extend(
[CacheStatus.SUCCESS_SYNCED.value, CacheStatus.SUCCESS_UNSYNCED.value]
)
conditions.append("(expires_at IS NULL OR expires_at > ?)")
params.append(str(now))
where = " AND ".join(conditions)
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
rows = conn.execute(
f"SELECT status, lyrics, source FROM cache WHERE {where} "
"ORDER BY CASE status WHEN ? THEN 0 ELSE 1 END LIMIT 1",
params + [CacheStatus.SUCCESS_SYNCED.value],
).fetchall()
if not rows:
return None
row = dict(rows[0])
return LyricResult(
status=CacheStatus(row["status"]),
lyrics=row["lyrics"],
source="cache-search",
)
# Fuzzy search
def search_by_meta(
@@ -318,7 +359,12 @@ class CacheEngine:
else:
# No duration info in cache — still a candidate but lower priority
scored.append((DURATION_TOLERANCE_MS, m))
scored.sort(key=lambda x: (x[0], x[1].get("status") != CacheStatus.SUCCESS_SYNCED.value))
scored.sort(
key=lambda x: (
x[0],
x[1].get("status") != CacheStatus.SUCCESS_SYNCED.value,
)
)
matches = [m for _, m in scored]
return matches