feat: implement cache-search fetcher for cross-album fuzzy lookup

This commit is contained in:
2026-03-28 06:21:31 +01:00
parent 4182229ae2
commit 05d7def249
3 changed files with 181 additions and 11 deletions
+103 -6
View File
@@ -4,15 +4,33 @@ Date: 2026-03-25 10:18:03
Description: SQLite-based lyric cache with per-source storage and TTL expiration
"""
import re
import sqlite3
import hashlib
import time
import unicodedata
from typing import Optional
from loguru import logger
from .config import DB_PATH
from .config import DB_PATH, DURATION_TOLERANCE_MS
from .models import TrackMeta, LyricResult, CacheStatus
# Punctuation to strip for fuzzy matching (ASCII + common fullwidth)
_PUNCT_RE = re.compile(r"[~!@#$%^&*()_+\-=\[\]{}|;:'\",.<>?/\\`~!@#$%^&*()_+-=【】{}|;:'",。<>?/\`]")
_SPACE_RE = re.compile(r"\s+")
def _normalize_for_match(s: str) -> str:
    """Canonicalize *s* for fuzzy cache matching.

    Applies NFKC normalization (folding fullwidth characters to their
    halfwidth equivalents) and lowercasing, removes punctuation via
    ``_PUNCT_RE``, then collapses runs of whitespace to single spaces
    and trims the ends.
    """
    folded = unicodedata.normalize("NFKC", s).lower()
    without_punct = _PUNCT_RE.sub("", folded)
    return _SPACE_RE.sub(" ", without_punct).strip()
def _generate_key(track: TrackMeta, source: str) -> str:
"""Generate a unique cache key from track metadata and source.
@@ -64,9 +82,14 @@ class CacheEngine:
expires_at INTEGER,
artist TEXT,
title TEXT,
album TEXT
album TEXT,
length INTEGER
)
""")
# Migration: add length column if missing
cols = {r[1] for r in conn.execute("PRAGMA table_info(cache)").fetchall()}
if "length" not in cols:
conn.execute("ALTER TABLE cache ADD COLUMN length INTEGER")
conn.commit()
# Read
@@ -83,7 +106,7 @@ class CacheEngine:
with sqlite3.connect(self.db_path) as conn:
row = conn.execute(
"SELECT status, lyrics, source, expires_at FROM cache WHERE key = ?",
"SELECT status, lyrics, source, expires_at, length FROM cache WHERE key = ?",
(key,),
).fetchone()
@@ -91,7 +114,7 @@ class CacheEngine:
logger.debug(f"Cache miss: {source} / {track.display_name()}")
return None
status_str, lyrics, src, expires_at = row
status_str, lyrics, src, expires_at, cached_length = row
# Check TTL expiration
if expires_at and expires_at < int(time.time()):
@@ -100,6 +123,14 @@ class CacheEngine:
conn.commit()
return None
# Backfill length if the cached row is missing it
if cached_length is None and track.length is not None:
conn.execute(
"UPDATE cache SET length = ? WHERE key = ?",
(track.length, key),
)
conn.commit()
remaining = expires_at - int(time.time()) if expires_at else None
logger.debug(
f"Cache hit: {source} / {track.display_name()} "
@@ -152,8 +183,8 @@ class CacheEngine:
conn.execute(
"""INSERT OR REPLACE INTO cache
(key, source, status, lyrics, created_at, expires_at,
artist, title, album)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
artist, title, album, length)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
key,
source,
@@ -164,6 +195,7 @@ class CacheEngine:
track.artist,
track.title,
track.album,
track.length,
),
)
conn.commit()
@@ -226,6 +258,71 @@ class CacheEngine:
params.append(track.album)
return conditions, params
# Fuzzy search
def search_by_meta(
    self,
    artist: Optional[str],
    title: Optional[str],
    length: Optional[int] = None,
) -> list[dict]:
    """Search cache for lyrics matching artist/title with fuzzy normalization.

    Ignores album and source. Only returns positive results (synced/unsynced)
    that have not expired. When *length* is provided, filters by duration
    tolerance and sorts by closest match.
    """
    # A title is mandatory for any meaningful lookup.
    if not title:
        return []

    cutoff = int(time.time())
    with sqlite3.connect(self.db_path) as conn:
        conn.row_factory = sqlite3.Row
        # TTL and status filtering happen in SQL; fuzzy matching below
        # must run in Python because it needs Unicode normalization.
        candidates = [
            dict(r)
            for r in conn.execute(
                """SELECT * FROM cache
                WHERE status IN (?, ?)
                AND (expires_at IS NULL OR expires_at > ?)""",
                (
                    CacheStatus.SUCCESS_SYNCED.value,
                    CacheStatus.SUCCESS_UNSYNCED.value,
                    cutoff,
                ),
            ).fetchall()
        ]

    want_title = _normalize_for_match(title)
    want_artist = _normalize_for_match(artist) if artist else None

    def _is_match(row: dict) -> bool:
        # Title must always match; artist only when the caller supplied one.
        if _normalize_for_match(row.get("title") or "") != want_title:
            return False
        if want_artist is None:
            return True
        return _normalize_for_match(row.get("artist") or "") == want_artist

    matches = [row for row in candidates if _is_match(row)]

    # Without a target duration (or with nothing left), return as-is.
    if length is None or not matches:
        return matches

    scored: list[tuple[int, dict]] = []
    for row in matches:
        row_len = row.get("length")
        if row_len is None:
            # No duration stored — keep as a candidate, but at the worst
            # acceptable distance so known-length rows rank ahead of it.
            scored.append((DURATION_TOLERANCE_MS, row))
        else:
            delta = abs(row_len - length)
            if delta <= DURATION_TOLERANCE_MS:
                scored.append((delta, row))
    # Closest duration first; among ties, synced lyrics beat unsynced.
    scored.sort(
        key=lambda item: (
            item[0],
            item[1].get("status") != CacheStatus.SUCCESS_SYNCED.value,
        )
    )
    return [row for _, row in scored]
# Query / inspect
def query_track(self, track: TrackMeta) -> list[dict]: