"""SQLite-based lyric cache with per-source storage and TTL expiration."""
|
|
|
|
import contextlib
import hashlib
import sqlite3
import time
from typing import Optional

from loguru import logger

from lrcfetch.config import DB_PATH
from lrcfetch.models import TrackMeta, LyricResult, CacheStatus
|
|
|
|
|
|
def _generate_key(track: TrackMeta, source: str) -> str:
    """Generate a unique cache key from track metadata and source.

    The key is scoped by source so that different fetchers can cache
    independently for the same track (e.g. Spotify synced vs Netease unsynced).

    Args:
        track: Track metadata; any subset of its fields may be populated.
        source: Name of the lyric source the key is scoped to.

    Returns:
        A string of the form ``spotify:<id>``, ``<source>:url:<url>``, or
        ``<source>:<sha256 hex digest>``.

    Raises:
        ValueError: If the track carries no usable metadata at all.
    """
    # Spotify tracks always use their track ID as the primary identifier
    if track.trackid and source == "spotify":
        return f"spotify:{track.trackid}"

    # Tag each field before joining so different field combinations cannot
    # collide: previously (artist="A", title="B") and (artist="A", album="B")
    # both hashed the ambiguous string "A|B".
    parts = []
    if track.artist:
        parts.append(f"artist={track.artist}")
    if track.title:
        parts.append(f"title={track.title}")
    if track.album:
        parts.append(f"album={track.album}")
    if track.length:
        parts.append(f"length={track.length}")

    # Fall back to URL for local files
    if not parts and track.url:
        return f"{source}:url:{track.url}"

    if not parts:
        raise ValueError("Insufficient metadata to generate cache key")

    digest = hashlib.sha256("|".join(parts).encode()).hexdigest()
    return f"{source}:{digest}"
|
|
|
|
|
|
class CacheEngine:
    """SQLite-backed cache for lyric fetch results.

    Each row is keyed by a source-scoped key (see ``_generate_key``) and
    optionally carries an ``expires_at`` UNIX timestamp for TTL expiry.
    Expired rows are evicted lazily on read and in bulk via ``prune``.
    """

    def __init__(self, db_path: str = DB_PATH):
        # Path to the SQLite database file backing the cache.
        self.db_path = db_path
        self._init_db()

    @contextlib.contextmanager
    def _connect(self):
        """Yield a SQLite connection that is closed when the block exits.

        ``with sqlite3.connect(...)`` only manages the transaction, not the
        connection lifetime, so without an explicit close every call leaked
        a file handle until garbage collection.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            yield conn
        finally:
            conn.close()

    def _init_db(self) -> None:
        """Create or migrate the cache table."""
        with self._connect() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS cache (
                    key TEXT PRIMARY KEY,
                    source TEXT NOT NULL,
                    status TEXT NOT NULL,
                    lyrics TEXT,
                    created_at INTEGER NOT NULL,
                    expires_at INTEGER,
                    artist TEXT,
                    title TEXT,
                    album TEXT
                )
            """)
            conn.commit()

    # ------------------------------------------------------------------
    # Read
    # ------------------------------------------------------------------

    def get(self, track: TrackMeta, source: str) -> Optional[LyricResult]:
        """Look up a cached result for *track* from *source*.

        Returns None on cache miss, TTL expiration, or when the track has
        too little metadata to form a key. Expired rows are deleted eagerly.
        """
        try:
            key = _generate_key(track, source)
        except ValueError:
            # Not enough metadata to form a key — treat as a miss.
            return None

        with self._connect() as conn:
            row = conn.execute(
                "SELECT status, lyrics, source, expires_at FROM cache WHERE key = ?",
                (key,),
            ).fetchone()

            if not row:
                logger.debug(f"Cache miss: {source} / {track.display_name()}")
                return None

            status_str, lyrics, src, expires_at = row

            # Check TTL expiration; `is not None` so an epoch-zero timestamp
            # is not silently treated as "no TTL".
            if expires_at is not None and expires_at < int(time.time()):
                logger.debug(f"Cache expired: {source} / {track.display_name()}")
                conn.execute("DELETE FROM cache WHERE key = ?", (key,))
                conn.commit()
                return None

            remaining = expires_at - int(time.time()) if expires_at is not None else None
            logger.debug(
                f"Cache hit: {source} / {track.display_name()} "
                f"[{status_str}, ttl={remaining}s]"
            )
            return LyricResult(
                status=CacheStatus(status_str),
                lyrics=lyrics,
                source=src,
                ttl=remaining,
            )

    def get_best(self, track: TrackMeta, sources: list[str]) -> Optional[LyricResult]:
        """Return the best cached result across *sources* (synced > unsynced).

        Skips negative statuses (NOT_FOUND, NETWORK_ERROR) — those are only
        consulted per-source to avoid redundant fetches.
        """
        best: Optional[LyricResult] = None
        for src in sources:
            cached = self.get(track, src)
            if not cached:
                continue
            if cached.status == CacheStatus.SUCCESS_SYNCED:
                return cached  # Can't do better
            if cached.status == CacheStatus.SUCCESS_UNSYNCED and best is None:
                best = cached
        return best

    # ------------------------------------------------------------------
    # Write
    # ------------------------------------------------------------------

    def set(
        self,
        track: TrackMeta,
        source: str,
        result: LyricResult,
        ttl_seconds: Optional[int] = None,
    ) -> None:
        """Store a lyric result in the cache.

        Args:
            track: Track the lyrics belong to.
            source: Fetcher name the result came from.
            result: The lyric result to persist.
            ttl_seconds: Entry lifetime in seconds; None means never expires.
        """
        try:
            key = _generate_key(track, source)
        except ValueError:
            logger.warning("Cannot cache: insufficient track metadata.")
            return

        now = int(time.time())
        # `is not None` so an explicit ttl_seconds=0 means "expires
        # immediately" rather than silently becoming "never expires".
        expires_at = now + ttl_seconds if ttl_seconds is not None else None

        with self._connect() as conn:
            conn.execute(
                """INSERT OR REPLACE INTO cache
                (key, source, status, lyrics, created_at, expires_at,
                artist, title, album)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    key,
                    source,
                    result.status.value,
                    result.lyrics,
                    now,
                    expires_at,
                    track.artist,
                    track.title,
                    track.album,
                ),
            )
            conn.commit()
        logger.debug(
            f"Cached: {source} / {track.display_name()} "
            f"[{result.status.value}, ttl={ttl_seconds}s]"
        )

    # ------------------------------------------------------------------
    # Delete
    # ------------------------------------------------------------------

    def clear_all(self) -> None:
        """Remove every entry from the cache."""
        with self._connect() as conn:
            conn.execute("DELETE FROM cache")
            conn.commit()
        logger.info("Cache cleared.")

    def clear_track(self, track: TrackMeta) -> None:
        """Remove all cached entries (every source) for a single track."""
        conditions, params = self._track_where(track)
        if not conditions:
            # No identifying metadata — nothing we could safely match on.
            logger.info(f"No cache entries found for {track.display_name()}.")
            return
        where = " AND ".join(conditions)
        with self._connect() as conn:
            cur = conn.execute(f"DELETE FROM cache WHERE {where}", params)
            conn.commit()
        if cur.rowcount:
            logger.info(f"Cleared {cur.rowcount} cache entries for {track.display_name()}.")
        else:
            logger.info(f"No cache entries found for {track.display_name()}.")

    def prune(self) -> int:
        """Remove all expired entries. Returns the number of rows deleted."""
        with self._connect() as conn:
            cur = conn.execute(
                "DELETE FROM cache WHERE expires_at IS NOT NULL AND expires_at < ?",
                (int(time.time()),),
            )
            conn.commit()
            count = cur.rowcount
        logger.info(f"Pruned {count} expired cache entries.")
        return count

    @staticmethod
    def _track_where(track: TrackMeta) -> tuple[list[str], list[str]]:
        """Build WHERE conditions to match a track across all sources.

        Returns a (conditions, params) pair; conditions is empty when the
        track has no artist/title/album metadata to match on.
        """
        conditions: list[str] = []
        params: list[str] = []
        if track.artist:
            conditions.append("artist = ?")
            params.append(track.artist)
        if track.title:
            conditions.append("title = ?")
            params.append(track.title)
        if track.album:
            conditions.append("album = ?")
            params.append(track.album)
        return conditions, params

    # ------------------------------------------------------------------
    # Query / inspect
    # ------------------------------------------------------------------

    def query_track(self, track: TrackMeta) -> list[dict]:
        """Return all cached rows for a given track (across all sources)."""
        conditions, params = self._track_where(track)
        if not conditions:
            return []
        where = " AND ".join(conditions)
        with self._connect() as conn:
            conn.row_factory = sqlite3.Row
            return [dict(r) for r in conn.execute(
                f"SELECT * FROM cache WHERE {where}", params
            ).fetchall()]

    def query_all(self) -> list[dict]:
        """Return every row in the cache table."""
        with self._connect() as conn:
            conn.row_factory = sqlite3.Row
            return [dict(r) for r in conn.execute("SELECT * FROM cache").fetchall()]

    def stats(self) -> dict:
        """Return aggregate cache statistics.

        Keys: ``total``, ``expired``, ``active`` (total - expired), and
        per-``status`` / per-``source`` row counts.
        """
        now = int(time.time())
        with self._connect() as conn:
            total = conn.execute("SELECT COUNT(*) FROM cache").fetchone()[0]
            expired = conn.execute(
                "SELECT COUNT(*) FROM cache WHERE expires_at IS NOT NULL AND expires_at < ?",
                (now,),
            ).fetchone()[0]
            by_status = dict(
                conn.execute(
                    "SELECT status, COUNT(*) FROM cache GROUP BY status"
                ).fetchall()
            )
            by_source = dict(
                conn.execute(
                    "SELECT source, COUNT(*) FROM cache GROUP BY source"
                ).fetchall()
            )
        return {
            "total": total,
            "expired": expired,
            "active": total - expired,
            "by_status": by_status,
            "by_source": by_source,
        }
|