test: add basic tests for cache-search & network fetchers

test: enable debug
test: add pytest.ini to mark tests that require network
fix: cache-search should not aquire full match of artists due to different translations
fix: replace dynamic f-string SQL in track WHERE clauses with parameterized nullable conditions
refactor: lrc.py should never directly call print, return a str instead
chore: add requirements.txt (via 'uv export')
chore: update README.md with dev instructions
This commit is contained in:
2026-04-05 09:42:42 +02:00
parent 84a3e1076e
commit 0d56cde927
12 changed files with 360 additions and 70 deletions
+55 -58
View File
@@ -13,7 +13,6 @@ from loguru import logger
from .lrc import LRCData
from .normalize import normalize_for_match as _normalize_for_match
from .normalize import normalize_artist as _normalize_artist
from .config import (
DURATION_TOLERANCE_MS,
LEGACY_CONFIDENCE_SYNCED,
@@ -22,6 +21,26 @@ from .config import (
from .models import TrackMeta, LyricResult, CacheStatus
# Fixed WHERE clause for exact track matching. Column names are hardcoded
# literals; only the *values* come from user-supplied params — no injection risk.
_TRACK_WHERE = (
"(? IS NULL OR artist = ?) AND "
"(? IS NULL OR title = ?) AND "
"(? IS NULL OR album = ?)"
)
def _track_where_params(track: TrackMeta) -> list:
return [
track.artist,
track.artist,
track.title,
track.title,
track.album,
track.album,
]
def _generate_key(track: TrackMeta, source: str) -> str:
"""Generate a unique cache key from track metadata and source.
@@ -235,13 +254,14 @@ class CacheEngine:
def clear_track(self, track: TrackMeta) -> None:
"""Remove all cached entries (every source) for a single track."""
conditions, params = self._track_where(track)
if not conditions:
if not self._track_has_meta(track):
logger.info(f"No cache entries found for {track.display_name()}.")
return
where = " AND ".join(conditions)
with sqlite3.connect(self.db_path) as conn:
cur = conn.execute(f"DELETE FROM cache WHERE {where}", params)
cur = conn.execute(
f"DELETE FROM cache WHERE {_TRACK_WHERE}",
_track_where_params(track),
)
conn.commit()
if cur.rowcount:
logger.info(
@@ -263,20 +283,8 @@ class CacheEngine:
return count
@staticmethod
def _track_where(track: TrackMeta) -> tuple[list[str], list[str]]:
"""Build WHERE conditions to match a track across all sources."""
conditions: list[str] = []
params: list[str] = []
if track.artist:
conditions.append("artist = ?")
params.append(track.artist)
if track.title:
conditions.append("title = ?")
params.append(track.title)
if track.album:
conditions.append("album = ?")
params.append(track.album)
return conditions, params
def _track_has_meta(track: TrackMeta) -> bool:
return bool(track.artist or track.title or track.album)
# Exact cross-source search
@@ -286,30 +294,27 @@ class CacheEngine:
Uses exact metadata match (artist + title + album) across all sources.
Returns the highest-confidence entry, or None.
"""
conditions, params = self._track_where(track)
if not conditions:
if not self._track_has_meta(track):
return None
now = int(time.time())
conditions.append("status IN (?, ?)")
params.extend(
[CacheStatus.SUCCESS_SYNCED.value, CacheStatus.SUCCESS_UNSYNCED.value]
)
conditions.append("(expires_at IS NULL OR expires_at > ?)")
params.append(str(now))
where = " AND ".join(conditions)
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
rows = conn.execute(
f"SELECT status, lyrics, source, confidence FROM cache WHERE {where} "
"ORDER BY COALESCE(confidence, "
" CASE status WHEN ? THEN ? ELSE ? END"
") DESC, "
"CASE status WHEN ? THEN 0 ELSE 1 END, "
"created_at DESC LIMIT 1",
params
f"SELECT status, lyrics, source, confidence FROM cache"
f" WHERE {_TRACK_WHERE}"
" AND status IN (?, ?)"
" AND (expires_at IS NULL OR expires_at > ?)"
" ORDER BY COALESCE(confidence,"
" CASE status WHEN ? THEN ? ELSE ? END"
" ) DESC,"
" CASE status WHEN ? THEN 0 ELSE 1 END,"
" created_at DESC LIMIT 1",
_track_where_params(track)
+ [
CacheStatus.SUCCESS_SYNCED.value,
CacheStatus.SUCCESS_UNSYNCED.value,
now,
CacheStatus.SUCCESS_SYNCED.value,
LEGACY_CONFIDENCE_SYNCED,
LEGACY_CONFIDENCE_UNSYNCED,
@@ -339,15 +344,18 @@ class CacheEngine:
def search_by_meta(
self,
artist: Optional[str],
title: Optional[str],
length: Optional[int] = None,
) -> list[dict]:
"""Search cache for lyrics matching artist/title with fuzzy normalization.
"""Search cache for lyrics matching title with fuzzy normalization.
Ignores album and source. Only returns positive results (synced/unsynced)
that have not expired. When *length* is provided, filters by duration
tolerance and sorts by closest match.
Artist is intentionally not filtered here — artist names can differ
significantly across languages (e.g. Japanese romanization vs. kanji),
making hard artist filtering unreliable for cross-language queries.
Ignores artist, album and source. Only returns positive results
(synced/unsynced) that have not expired. When *length* is provided,
filters by duration tolerance and sorts by closest match.
"""
if not title:
return []
@@ -367,7 +375,6 @@ class CacheEngine:
).fetchall()
norm_title = _normalize_for_match(title)
norm_artist = _normalize_artist(artist) if artist else None
matches: list[dict] = []
for row in rows:
@@ -376,11 +383,6 @@ class CacheEngine:
row_title = row_dict.get("title") or ""
if _normalize_for_match(row_title) != norm_title:
continue
# Artist must match if provided
if norm_artist:
row_artist = row_dict.get("artist") or ""
if _normalize_artist(row_artist) != norm_artist:
continue
matches.append(row_dict)
# Duration filtering
@@ -419,16 +421,12 @@ class CacheEngine:
Returns the number of rows updated.
"""
conditions, params = self._track_where(track)
if not conditions:
if not self._track_has_meta(track):
return 0
conditions.append("source = ?")
params.append(source)
where = " AND ".join(conditions)
with sqlite3.connect(self.db_path) as conn:
cur = conn.execute(
f"UPDATE cache SET confidence = ? WHERE {where}",
[confidence] + params,
f"UPDATE cache SET confidence = ? WHERE {_TRACK_WHERE} AND source = ?",
[confidence] + _track_where_params(track) + [source],
)
conn.commit()
return cur.rowcount
@@ -437,16 +435,15 @@ class CacheEngine:
def query_track(self, track: TrackMeta) -> list[dict]:
"""Return all cached rows for a given track (across all sources)."""
conditions, params = self._track_where(track)
if not conditions:
if not self._track_has_meta(track):
return []
where = " AND ".join(conditions)
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
return [
dict(r)
for r in conn.execute(
f"SELECT * FROM cache WHERE {where}", params
f"SELECT * FROM cache WHERE {_TRACK_WHERE}",
_track_where_params(track),
).fetchall()
]