diff --git a/.claude/resume b/.claude/resume
new file mode 100644
index 0000000..87046c6
--- /dev/null
+++ b/.claude/resume
@@ -0,0 +1 @@
+claude --resume 48d54aac-a89b-48c3-8a76-23e9eb73722d
diff --git a/.env b/.env
new file mode 100644
index 0000000..211472d
--- /dev/null
+++ b/.env
@@ -0,0 +1 @@
+SPOTIFY_SP_DC="AQBzQVulTMDML5PjpVXadGZIgaPqfLPTxbv1ZSJI4ypO3UKA-ZcbDqU2omZybyH_vqlM67PM33_og7UEvrxEktk2u6ND1syOEPiXjPzr600KNOLVXr9hwuFD9RcXgOGm_aLaqUlYJcum9Zchjq-upKLmyEWCESdGoQGQT8_maQ4pZ-6wFiL5GaDn1WgpZgjW71zxCVqf8GuJoscSXWhUPo0jJ-1xQvyvVT3YooIf9i1mOG-vDnAFru6xeaH4O2mVWQN6PJPVYNG582E"
diff --git a/.vscode/settings.json b/.vscode/settings.json
new file mode 100644
index 0000000..acf6d3b
--- /dev/null
+++ b/.vscode/settings.json
@@ -0,0 +1,3 @@
+{
+ "python-envs.defaultEnvManager": "ms-python.python:system"
+}
diff --git a/CLAUDE.md b/CLAUDE.md
new file mode 100644
index 0000000..f42724e
--- /dev/null
+++ b/CLAUDE.md
@@ -0,0 +1,3 @@
+# Claude.md
+
+This file documents common mistakes and confusion points that agents may encounter while working in this project. If something in the project surprises you, alert the developer you are working with and record the issue here so that future agents can avoid it.
diff --git a/lrx/__init__.py b/lrx/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/lrx/__main__.py b/lrx/__main__.py
new file mode 100644
index 0000000..1407b6f
--- /dev/null
+++ b/lrx/__main__.py
@@ -0,0 +1,4 @@
+from lrx.cli import run
+
+if __name__ == "__main__":
+ run()
diff --git a/lrx/__pycache__/__init__.cpython-313.pyc b/lrx/__pycache__/__init__.cpython-313.pyc
new file mode 100644
index 0000000..1a3c3be
Binary files /dev/null and b/lrx/__pycache__/__init__.cpython-313.pyc differ
diff --git a/lrx/__pycache__/__init__.cpython-314.pyc b/lrx/__pycache__/__init__.cpython-314.pyc
new file mode 100644
index 0000000..4ebce98
Binary files /dev/null and b/lrx/__pycache__/__init__.cpython-314.pyc differ
diff --git a/lrx/__pycache__/__main__.cpython-313.pyc b/lrx/__pycache__/__main__.cpython-313.pyc
new file mode 100644
index 0000000..1c980ca
Binary files /dev/null and b/lrx/__pycache__/__main__.cpython-313.pyc differ
diff --git a/lrx/__pycache__/__main__.cpython-314.pyc b/lrx/__pycache__/__main__.cpython-314.pyc
new file mode 100644
index 0000000..e5908b0
Binary files /dev/null and b/lrx/__pycache__/__main__.cpython-314.pyc differ
diff --git a/lrx/__pycache__/cache.cpython-313.pyc b/lrx/__pycache__/cache.cpython-313.pyc
new file mode 100644
index 0000000..670e768
Binary files /dev/null and b/lrx/__pycache__/cache.cpython-313.pyc differ
diff --git a/lrx/__pycache__/cache.cpython-314.pyc b/lrx/__pycache__/cache.cpython-314.pyc
new file mode 100644
index 0000000..2f16fd4
Binary files /dev/null and b/lrx/__pycache__/cache.cpython-314.pyc differ
diff --git a/lrx/__pycache__/cli.cpython-313.pyc b/lrx/__pycache__/cli.cpython-313.pyc
new file mode 100644
index 0000000..6e82c18
Binary files /dev/null and b/lrx/__pycache__/cli.cpython-313.pyc differ
diff --git a/lrx/__pycache__/cli.cpython-314.pyc b/lrx/__pycache__/cli.cpython-314.pyc
new file mode 100644
index 0000000..9ac351b
Binary files /dev/null and b/lrx/__pycache__/cli.cpython-314.pyc differ
diff --git a/lrx/__pycache__/config.cpython-313.pyc b/lrx/__pycache__/config.cpython-313.pyc
new file mode 100644
index 0000000..18c04b3
Binary files /dev/null and b/lrx/__pycache__/config.cpython-313.pyc differ
diff --git a/lrx/__pycache__/core.cpython-313.pyc b/lrx/__pycache__/core.cpython-313.pyc
new file mode 100644
index 0000000..a638a93
Binary files /dev/null and b/lrx/__pycache__/core.cpython-313.pyc differ
diff --git a/lrx/__pycache__/lrc.cpython-313.pyc b/lrx/__pycache__/lrc.cpython-313.pyc
new file mode 100644
index 0000000..1d398ad
Binary files /dev/null and b/lrx/__pycache__/lrc.cpython-313.pyc differ
diff --git a/lrx/__pycache__/lrc.cpython-314.pyc b/lrx/__pycache__/lrc.cpython-314.pyc
new file mode 100644
index 0000000..6864851
Binary files /dev/null and b/lrx/__pycache__/lrc.cpython-314.pyc differ
diff --git a/lrx/__pycache__/models.cpython-313.pyc b/lrx/__pycache__/models.cpython-313.pyc
new file mode 100644
index 0000000..af046de
Binary files /dev/null and b/lrx/__pycache__/models.cpython-313.pyc differ
diff --git a/lrx/__pycache__/models.cpython-314.pyc b/lrx/__pycache__/models.cpython-314.pyc
new file mode 100644
index 0000000..14d49aa
Binary files /dev/null and b/lrx/__pycache__/models.cpython-314.pyc differ
diff --git a/lrx/__pycache__/mpris.cpython-313.pyc b/lrx/__pycache__/mpris.cpython-313.pyc
new file mode 100644
index 0000000..5fb6c2f
Binary files /dev/null and b/lrx/__pycache__/mpris.cpython-313.pyc differ
diff --git a/lrx/cache.py b/lrx/cache.py
new file mode 100644
index 0000000..7e16fc3
--- /dev/null
+++ b/lrx/cache.py
@@ -0,0 +1,441 @@
+"""
+Author: Uyanide pywang0608@foxmail.com
+Date: 2026-03-25 10:18:03
+Description: SQLite-based lyric cache with per-source storage and TTL expiration
+"""
+
+import re
+import sqlite3
+import hashlib
+import time
+import unicodedata
+from typing import Optional
+from loguru import logger
+
+from .config import DB_PATH, DURATION_TOLERANCE_MS
+from .models import TrackMeta, LyricResult, CacheStatus
+
+# Punctuation to strip for fuzzy matching (ASCII + fullwidth + CJK brackets/symbols)
+_PUNCT_RE = re.compile(
+ r"[~!@#$%^&*()_+\-=\[\]{}|;:'\",.<>?/\\`"
+ r"~!@#$%^&*()_+-=【】{}|;:'",。<>?/\`"
+ r"「」『』《》〈〉〔〕·•‥…—–]"
+)
+_SPACE_RE = re.compile(r"\s+")
+# feat./ft./featuring and everything after (case-insensitive, word boundary)
+_FEAT_RE = re.compile(r"\s*(?:\bfeat\.?\b|\bft\.?\b|\bfeaturing\b).*", re.IGNORECASE)
+# Multi-artist separators: /, &, ×, x (surrounded by spaces), ;, 、, vs.
+_ARTIST_SEP_RE = re.compile(r"\s*(?:[/&;×、]|\bvs\.?\b|\bx\b)\s*", re.IGNORECASE)
+
+
+def _normalize_for_match(s: str) -> str:
+ """Normalize a string for fuzzy comparison.
+
+ Lowercases, NFKC-normalizes (fullwidth → halfwidth), strips punctuation,
+ and collapses whitespace.
+ """
+ s = unicodedata.normalize("NFKC", s).lower()
+ s = _FEAT_RE.sub("", s)
+ s = _PUNCT_RE.sub(" ", s)
+ s = _SPACE_RE.sub(" ", s).strip()
+ return s
+
+
+def _normalize_artist(s: str) -> str:
+ """Normalize an artist string: split by separators, normalize each, sort.
+
+ Splits first (on /, &, ;, ×, 、, vs., x), then strips feat./ft./featuring
+ from each part individually, so 'A feat. C / B' → ['a', 'b'] not just ['a'].
+ """
+ s = unicodedata.normalize("NFKC", s).lower()
+ parts = _ARTIST_SEP_RE.split(s)
+ normed = sorted(
+ {_normalize_for_match(p) for p in parts if _FEAT_RE.sub("", p).strip()}
+ )
+ return "\0".join(normed) if normed else _normalize_for_match(s)
+
+
+def _generate_key(track: TrackMeta, source: str) -> str:
+ """Generate a unique cache key from track metadata and source.
+
+ The key is scoped by source so that different fetchers can cache
+ independently for the same track (e.g. Spotify synced vs Netease unsynced).
+ """
+ # Spotify tracks always use their track ID as the primary identifier
+ if track.trackid and source == "spotify":
+ return f"spotify:{track.trackid}"
+
+ parts = []
+ if track.artist:
+ parts.append(track.artist)
+ if track.title:
+ parts.append(track.title)
+ if track.album:
+ parts.append(track.album)
+ if track.length:
+ parts.append(str(track.length))
+
+ # Fall back to URL for local files
+ if not parts and track.url:
+ return f"{source}:url:{track.url}"
+
+ if not parts:
+ raise ValueError("Insufficient metadata to generate cache key")
+
+ raw = "|".join(parts)
+ digest = hashlib.sha256(raw.encode()).hexdigest()
+ return f"{source}:{digest}"
+
+
class CacheEngine:
    """SQLite-backed lyric cache keyed by (track, source).

    One row per cache key; the key embeds the source name so each fetcher
    caches independently. Rows carry an optional TTL (``expires_at``) plus
    denormalized track metadata (artist/title/album/length) to support
    cross-source and fuzzy lookups.

    NOTE(review): ``with sqlite3.connect(...)`` commits/rolls back the
    transaction but does NOT close the connection — connections are only
    released when garbage-collected. Fine under CPython refcounting, but
    worth confirming if this ever runs on another interpreter.
    """

    def __init__(self, db_path: str = DB_PATH):
        # db_path: filesystem path of the SQLite database file.
        self.db_path = db_path
        self._init_db()

    def _init_db(self) -> None:
        """Create or migrate the cache table."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS cache (
                    key TEXT PRIMARY KEY,
                    source TEXT NOT NULL,
                    status TEXT NOT NULL,
                    lyrics TEXT,
                    created_at INTEGER NOT NULL,
                    expires_at INTEGER,
                    artist TEXT,
                    title TEXT,
                    album TEXT,
                    length INTEGER
                )
            """)
            # Migration: add length column if missing (rows created before
            # duration tracking existed).
            cols = {r[1] for r in conn.execute("PRAGMA table_info(cache)").fetchall()}
            if "length" not in cols:
                conn.execute("ALTER TABLE cache ADD COLUMN length INTEGER")
            conn.commit()

    # Read

    def get(self, track: TrackMeta, source: str) -> Optional[LyricResult]:
        """Look up a cached result for *track* from *source*.

        Returns None on cache miss or expiration. Expired rows are deleted
        eagerly; rows predating the ``length`` column are backfilled on read.
        """
        try:
            key = _generate_key(track, source)
        except ValueError:
            # Not enough metadata to even form a key — treat as a miss.
            return None

        with sqlite3.connect(self.db_path) as conn:
            row = conn.execute(
                "SELECT status, lyrics, source, expires_at, length FROM cache WHERE key = ?",
                (key,),
            ).fetchone()

            if not row:
                logger.debug(f"Cache miss: {source} / {track.display_name()}")
                return None

            status_str, lyrics, src, expires_at, cached_length = row

            # Check TTL expiration (expires_at may be NULL = never expires)
            if expires_at and expires_at < int(time.time()):
                logger.debug(f"Cache expired: {source} / {track.display_name()}")
                conn.execute("DELETE FROM cache WHERE key = ?", (key,))
                conn.commit()
                return None

            # Backfill length if the cached row is missing it
            if cached_length is None and track.length is not None:
                conn.execute(
                    "UPDATE cache SET length = ? WHERE key = ?",
                    (track.length, key),
                )
                conn.commit()

            remaining = expires_at - int(time.time()) if expires_at else None
            logger.debug(
                f"Cache hit: {source} / {track.display_name()} "
                f"[{status_str}, ttl={remaining}s]"
            )
            return LyricResult(
                status=CacheStatus(status_str),
                lyrics=lyrics,
                source=src,
                ttl=remaining,
            )

    def get_best(self, track: TrackMeta, sources: list[str]) -> Optional[LyricResult]:
        """Return the best cached result across *sources* (synced > unsynced).

        Skips negative statuses (NOT_FOUND, NETWORK_ERROR) — those are only
        consulted per-source to avoid redundant fetches.
        """
        best: Optional[LyricResult] = None
        for src in sources:
            cached = self.get(track, src)
            if not cached:
                continue
            if cached.status == CacheStatus.SUCCESS_SYNCED:
                return cached  # Can't do better
            if cached.status == CacheStatus.SUCCESS_UNSYNCED and best is None:
                best = cached
        return best

    # Write

    def set(
        self,
        track: TrackMeta,
        source: str,
        result: LyricResult,
        ttl_seconds: Optional[int] = None,
    ) -> None:
        """Store a lyric result in the cache.

        NOTE(review): ``ttl_seconds == 0`` is falsy and therefore treated
        the same as None ("never expires") — confirm this is intended.
        """
        try:
            key = _generate_key(track, source)
        except ValueError:
            logger.warning("Cannot cache: insufficient track metadata.")
            return

        now = int(time.time())
        expires_at = now + ttl_seconds if ttl_seconds else None

        with sqlite3.connect(self.db_path) as conn:
            # INSERT OR REPLACE: a re-fetch for the same (track, source)
            # overwrites the previous row and resets its TTL.
            conn.execute(
                """INSERT OR REPLACE INTO cache
                   (key, source, status, lyrics, created_at, expires_at,
                    artist, title, album, length)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    key,
                    source,
                    result.status.value,
                    result.lyrics,
                    now,
                    expires_at,
                    track.artist,
                    track.title,
                    track.album,
                    track.length,
                ),
            )
            conn.commit()
        logger.debug(
            f"Cached: {source} / {track.display_name()} "
            f"[{result.status.value}, ttl={ttl_seconds}s]"
        )

    # Delete

    def clear_all(self) -> None:
        """Remove every entry from the cache."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("DELETE FROM cache")
            conn.commit()
        logger.info("Cache cleared.")

    def clear_track(self, track: TrackMeta) -> None:
        """Remove all cached entries (every source) for a single track."""
        conditions, params = self._track_where(track)
        if not conditions:
            # No identifying metadata at all — nothing to match against.
            logger.info(f"No cache entries found for {track.display_name()}.")
            return
        where = " AND ".join(conditions)
        with sqlite3.connect(self.db_path) as conn:
            cur = conn.execute(f"DELETE FROM cache WHERE {where}", params)
            conn.commit()
        if cur.rowcount:
            logger.info(
                f"Cleared {cur.rowcount} cache entries for {track.display_name()}."
            )
        else:
            logger.info(f"No cache entries found for {track.display_name()}.")

    def prune(self) -> int:
        """Remove all expired entries. Returns the number of rows deleted."""
        with sqlite3.connect(self.db_path) as conn:
            cur = conn.execute(
                "DELETE FROM cache WHERE expires_at IS NOT NULL AND expires_at < ?",
                (int(time.time()),),
            )
            conn.commit()
        count = cur.rowcount
        logger.info(f"Pruned {count} expired cache entries.")
        return count

    @staticmethod
    def _track_where(track: TrackMeta) -> tuple[list[str], list[str]]:
        """Build WHERE conditions to match a track across all sources.

        Only exact (non-normalized) metadata equality; the length column is
        deliberately excluded so pre-migration rows still match.
        """
        conditions: list[str] = []
        params: list[str] = []
        if track.artist:
            conditions.append("artist = ?")
            params.append(track.artist)
        if track.title:
            conditions.append("title = ?")
            params.append(track.title)
        if track.album:
            conditions.append("album = ?")
            params.append(track.album)
        return conditions, params

    # Exact cross-source search

    def find_best_positive(self, track: TrackMeta) -> Optional[LyricResult]:
        """Find the best positive (synced/unsynced) cache entry for *track*.

        Uses exact metadata match (artist + title + album) across all sources.
        Returns synced if available, otherwise unsynced, or None.
        """
        conditions, params = self._track_where(track)
        if not conditions:
            return None

        now = int(time.time())
        conditions.append("status IN (?, ?)")
        params.extend(
            [CacheStatus.SUCCESS_SYNCED.value, CacheStatus.SUCCESS_UNSYNCED.value]
        )
        conditions.append("(expires_at IS NULL OR expires_at > ?)")
        # NOTE(review): now is bound as a string; SQLite's INTEGER column
        # affinity coerces it for the comparison, so this works — but an int
        # would be cleaner and consistent with the other queries.
        params.append(str(now))

        where = " AND ".join(conditions)
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            # ORDER BY ranks synced (0) before unsynced (1); LIMIT 1 keeps
            # only the best row.
            rows = conn.execute(
                f"SELECT status, lyrics, source FROM cache WHERE {where} "
                "ORDER BY CASE status WHEN ? THEN 0 ELSE 1 END LIMIT 1",
                params + [CacheStatus.SUCCESS_SYNCED.value],
            ).fetchall()

        if not rows:
            return None

        row = dict(rows[0])
        return LyricResult(
            status=CacheStatus(row["status"]),
            lyrics=row["lyrics"],
            source="cache-search",
        )

    # Fuzzy search

    def search_by_meta(
        self,
        artist: Optional[str],
        title: Optional[str],
        length: Optional[int] = None,
    ) -> list[dict]:
        """Search cache for lyrics matching artist/title with fuzzy normalization.

        Ignores album and source. Only returns positive results (synced/unsynced)
        that have not expired. When *length* is provided, filters by duration
        tolerance and sorts by closest match.

        Matching is done in Python (normalized comparison) after pulling all
        non-expired positive rows — acceptable for a personal-scale cache.
        """
        if not title:
            return []

        now = int(time.time())
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            rows = conn.execute(
                """SELECT * FROM cache
                   WHERE status IN (?, ?)
                   AND (expires_at IS NULL OR expires_at > ?)""",
                (
                    CacheStatus.SUCCESS_SYNCED.value,
                    CacheStatus.SUCCESS_UNSYNCED.value,
                    now,
                ),
            ).fetchall()

        norm_title = _normalize_for_match(title)
        norm_artist = _normalize_artist(artist) if artist else None

        matches: list[dict] = []
        for row in rows:
            row_dict = dict(row)
            # Title must match
            row_title = row_dict.get("title") or ""
            if _normalize_for_match(row_title) != norm_title:
                continue
            # Artist must match if provided
            if norm_artist:
                row_artist = row_dict.get("artist") or ""
                if _normalize_artist(row_artist) != norm_artist:
                    continue
            matches.append(row_dict)

        # Duration filtering: keep rows within tolerance, closest first,
        # synced preferred on ties.
        if length is not None and matches:
            scored = []
            for m in matches:
                row_len = m.get("length")
                if row_len is not None:
                    diff = abs(row_len - length)
                    if diff <= DURATION_TOLERANCE_MS:
                        scored.append((diff, m))
                else:
                    # No duration info in cache — still a candidate but lower priority
                    scored.append((DURATION_TOLERANCE_MS, m))
            scored.sort(
                key=lambda x: (
                    x[0],
                    x[1].get("status") != CacheStatus.SUCCESS_SYNCED.value,
                )
            )
            matches = [m for _, m in scored]

        return matches

    # Query / inspect

    def query_track(self, track: TrackMeta) -> list[dict]:
        """Return all cached rows for a given track (across all sources)."""
        conditions, params = self._track_where(track)
        if not conditions:
            return []
        where = " AND ".join(conditions)
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            return [
                dict(r)
                for r in conn.execute(
                    f"SELECT * FROM cache WHERE {where}", params
                ).fetchall()
            ]

    def query_all(self) -> list[dict]:
        """Return every row in the cache table."""
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            return [dict(r) for r in conn.execute("SELECT * FROM cache").fetchall()]

    def stats(self) -> dict:
        """Return aggregate cache statistics.

        Keys: total, expired, active (= total - expired), by_status,
        by_source (both dicts of counts).
        """
        now = int(time.time())
        with sqlite3.connect(self.db_path) as conn:
            total = conn.execute("SELECT COUNT(*) FROM cache").fetchone()[0]
            expired = conn.execute(
                "SELECT COUNT(*) FROM cache WHERE expires_at IS NOT NULL AND expires_at < ?",
                (now,),
            ).fetchone()[0]
            by_status = dict(
                conn.execute(
                    "SELECT status, COUNT(*) FROM cache GROUP BY status"
                ).fetchall()
            )
            by_source = dict(
                conn.execute(
                    "SELECT source, COUNT(*) FROM cache GROUP BY source"
                ).fetchall()
            )
        return {
            "total": total,
            "expired": expired,
            "active": total - expired,
            "by_status": by_status,
            "by_source": by_source,
        }
diff --git a/lrx/cli.py b/lrx/cli.py
new file mode 100644
index 0000000..48d7640
--- /dev/null
+++ b/lrx/cli.py
@@ -0,0 +1,426 @@
+"""
+Author: Uyanide pywang0608@foxmail.com
+Date: 2026-03-26 02:04:39
+Description: CLI interface
+"""
+
+import sys
+import time
+import os
+from pathlib import Path
+from typing import Annotated
+from urllib.parse import quote
+import cyclopts
+from loguru import logger
+
+from .config import enable_debug
+from .models import TrackMeta, CacheStatus
+from .mpris import get_current_track
+from .core import LrcManager
+from .fetchers import FetcherMethodType
+from .lrc import get_sidecar_path
+
+
# Root CLI application; subcommands are registered via decorators below.
app = cyclopts.App(
    help="LRCFetch — Fetch line-synced lyrics for your music player.",
)
app.register_install_completion_command()

# Sub-application grouping the cache maintenance commands ("cache ...").
cache_app = cyclopts.App(name="cache", help="Manage the local SQLite cache.")
app.command(cache_app)

# Module-level singleton, constructed at import time.
# NOTE(review): this opens/creates the SQLite cache as an import side effect.
manager = LrcManager()

# Global state set by the meta launcher: MPRIS player name filter
# (None = auto-select).
_player: str | None = None
+
+
@app.meta.default
def launcher(
    *tokens: Annotated[str, cyclopts.Parameter(show=False, allow_leading_hyphen=True)],
    debug: Annotated[
        bool,
        cyclopts.Parameter(
            name=["--debug", "-d"], negative="", help="Enable debug logging."
        ),
    ] = False,
    player: Annotated[
        str | None,
        cyclopts.Parameter(
            name=["--player", "-p"],
            help="Target a specific MPRIS player using its DBus name or a portion thereof.",
        ),
    ] = None,
):
    """Meta-app entry point: apply global options, then dispatch.

    cyclopts routes every invocation through this launcher first; *tokens*
    is the remaining argv, forwarded to the command app after the global
    --debug/--player flags are processed.
    """
    global _player
    if debug:
        enable_debug()
    # Stash the player filter where the command handlers can read it.
    _player = player
    app(tokens)
+
+
+# fetch
+
+
@app.command
def fetch(
    *,
    method: Annotated[
        FetcherMethodType | None,
        cyclopts.Parameter(help="Force a specific source."),
    ] = None,
    no_cache: Annotated[
        bool,
        cyclopts.Parameter(
            name="--no-cache", negative="", help="Bypass the cache for this request."
        ),
    ] = False,
    only_synced: Annotated[
        bool,
        cyclopts.Parameter(
            name="--only-synced", negative="", help="Only accept synced (timed) lyrics."
        ),
    ] = False,
):
    """Fetch and print lyrics for the currently playing track."""
    current = get_current_track(_player)
    if not current:
        logger.error("No active playing track found.")
        sys.exit(1)

    logger.info(f"Track: {current.display_name()}")

    res = manager.fetch_for_track(current, force_method=method, bypass_cache=no_cache)

    # Bail out if nothing usable came back, or if the caller insisted on
    # timed lyrics and we only got plain text.
    if not res or not res.lyrics:
        logger.error("No lyrics found.")
        sys.exit(1)
    if only_synced and res.status != CacheStatus.SUCCESS_SYNCED:
        logger.error("Only unsynced lyrics available (--only-synced requested).")
        sys.exit(1)

    print(res.lyrics)
+
+
+# search
+
+
@app.command
def search(
    *,
    title: Annotated[
        str | None, cyclopts.Parameter(name=["--title", "-t"], help="Track title.")
    ] = None,
    artist: Annotated[
        str | None, cyclopts.Parameter(name=["--artist", "-a"], help="Artist name.")
    ] = None,
    album: Annotated[str | None, cyclopts.Parameter(help="Album name.")] = None,
    trackid: Annotated[str | None, cyclopts.Parameter(help="Spotify track ID.")] = None,
    length: Annotated[
        int | None,
        cyclopts.Parameter(
            name=["--length", "-l"], help="Track duration in milliseconds."
        ),
    ] = None,
    url: Annotated[
        str | None,
        cyclopts.Parameter(
            help="Local file URL (file:///...). Mutually exclusive with --path."
        ),
    ] = None,
    path: Annotated[
        str | None,
        cyclopts.Parameter(
            name=["--path"],
            help="Local audio file path. Mutually exclusive with --url.",
        ),
    ] = None,
    method: Annotated[
        FetcherMethodType | None, cyclopts.Parameter(help="Force a specific source.")
    ] = None,
    no_cache: Annotated[
        bool,
        cyclopts.Parameter(
            name="--no-cache", negative="", help="Bypass the cache for this request."
        ),
    ] = False,
    only_synced: Annotated[
        bool,
        cyclopts.Parameter(
            name="--only-synced", negative="", help="Only accept synced (timed) lyrics."
        ),
    ] = False,
):
    """Search for lyrics by metadata (bypasses MPRIS)."""
    if url and path:
        logger.error("--url and --path are mutually exclusive.")
        sys.exit(1)

    # A local path is turned into a percent-encoded file:// URL, the same
    # form MPRIS players report.
    if path:
        url = "file://" + quote(str(Path(path).resolve()), safe="/")

    wanted = TrackMeta(
        title=title,
        artist=artist,
        album=album,
        trackid=trackid,
        length=length,
        url=url,
    )
    logger.info(f"Track: {wanted.display_name()}")

    hit = manager.fetch_for_track(wanted, force_method=method, bypass_cache=no_cache)

    if not hit or not hit.lyrics:
        logger.error("No lyrics found.")
        sys.exit(1)
    if only_synced and hit.status != CacheStatus.SUCCESS_SYNCED:
        logger.error("Only unsynced lyrics available (--only-synced requested).")
        sys.exit(1)

    print(hit.lyrics)
+
+
+# export
+
+
@app.command
def export(
    *,
    output: Annotated[
        str | None,
        cyclopts.Parameter(
            name=["--output", "-o"],
            help="Output file path (default: same directory as audio file with .lrc extension, or current directory if not available).",
        ),
    ] = None,
    method: Annotated[
        FetcherMethodType | None, cyclopts.Parameter(help="Force a specific source.")
    ] = None,
    no_cache: Annotated[
        bool, cyclopts.Parameter(name="--no-cache", negative="", help="Bypass cache.")
    ] = False,
    overwrite: Annotated[
        bool,
        cyclopts.Parameter(
            name=["--overwrite", "-f"], negative="", help="Overwrite existing file."
        ),
    ] = False,
):
    """Export lyrics of the current track to a .lrc file."""
    track = get_current_track(_player)
    if not track:
        logger.error("No active playing track found.")
        sys.exit(1)

    result = manager.fetch_for_track(track, force_method=method, bypass_cache=no_cache)
    if not result or not result.lyrics:
        logger.error("No lyrics available to export.")
        sys.exit(1)

    # Build default output path.
    # Preference 1: sidecar .lrc next to the audio file (requires a track URL;
    # assumes get_sidecar_path returns a path-like or None — TODO confirm).
    if not output:
        if track.url:
            lrc_path = get_sidecar_path(track.url, ensure_exists=False)
            if lrc_path:
                output = str(lrc_path)
                logger.info(f"Exporting to sidecar path: {output}")

    # Fallback to current directory with sanitized filename
    if not output:
        filename = (
            f"{track.artist} - {track.title}.lrc"
            if track.artist and track.title
            else "lyrics.lrc"
        )
        # Sanitize filename: keep alphanumerics (any script) plus " -_.".
        # NOTE(review): rstrip() strips whitespace only, so a trailing dot
        # could survive — harmless on POSIX, worth confirming for Windows.
        filename = "".join(
            c for c in filename if c.isalpha() or c.isdigit() or c in " -_."
        ).rstrip()
        output = os.path.join(os.getcwd(), filename)

    if os.path.exists(output) and not overwrite:
        logger.error(f"File exists: {output} (use -f to overwrite)")
        sys.exit(1)

    try:
        with open(output, "w", encoding="utf-8") as f:
            f.write(result.lyrics)
        logger.info(f"Exported lyrics to {output}")
    except Exception as e:
        logger.error(f"Failed to write file: {e}")
        sys.exit(1)
+
+
+# cache subcommands
+
+
@cache_app.command
def query(
    *,
    all: Annotated[
        bool,
        cyclopts.Parameter(name="--all", negative="", help="Dump all cache entries."),
    ] = False,
):
    """Show cached entries for the current track."""
    if not all:
        # Default mode: inspect only the currently playing track.
        track = get_current_track(_player)
        if not track:
            logger.error("No active playing track found.")
            sys.exit(1)
        _print_track_cache(track)
        return

    # --all: dump the entire cache, one blank line between entries.
    rows = manager.cache.query_all()
    if not rows:
        print("Cache is empty.")
        return
    for entry in rows:
        _print_cache_row(entry)
        print()
+
+
@cache_app.command
def clear(
    *,
    all: Annotated[
        bool,
        cyclopts.Parameter(name="--all", negative="", help="Clear the entire cache."),
    ] = False,
):
    """Clear cached entries for the current track."""
    if all:
        manager.cache.clear_all()
    else:
        # Without --all, only the currently playing track is purged.
        track = get_current_track(_player)
        if not track:
            logger.error("No active playing track found.")
            sys.exit(1)
        manager.cache.clear_track(track)
+
+
@cache_app.command
def prune():
    """Remove expired cache entries."""
    # Thin CLI wrapper; the row count is logged by CacheEngine.prune itself.
    manager.cache.prune()
+
+
@cache_app.command
def stats():
    """Show cache statistics."""
    info = manager.cache.stats()
    print("=== Cache Statistics ===")
    print(f"Total entries : {info['total']}")
    print(f"Active : {info['active']}")
    print(f"Expired : {info['expired']}")
    # Breakdown tables, printed only when non-empty.
    for heading, table in (("status", info["by_status"]), ("source", info["by_source"])):
        if not table:
            continue
        print(f"\nBy {heading}:")
        for key, count in table.items():
            print(f" {key}: {count}")
+
+
@cache_app.command
def insert(
    *,
    path: Annotated[
        str | None,
        cyclopts.Parameter(
            name=["--path"],
            help="Path to a local .lrc file to insert instead of reading from stdin.",
        ),
    ] = None,
):
    """Manually insert lyrics into the cache for the current track."""
    track = get_current_track(_player)
    if not track:
        logger.error("No active playing track found.")
        sys.exit(1)

    if path:
        try:
            with open(path, "r", encoding="utf-8") as f:
                lyrics = f.read()
        except Exception as e:
            logger.error(f"Failed to read file: {e}")
            sys.exit(1)
    else:
        # Interactive/pipe mode: slurp all of stdin until EOF.
        logger.info("Reading lyrics from stdin (Ctrl+D to finish)...")
        lyrics = sys.stdin.read()

    manager.manual_insert(track, lyrics)
+
+
+# helpers
+
+
def _print_track_cache(track: TrackMeta) -> None:
    """Print all cached entries for a given track."""
    # Header: identity, album and a mm:ss.ss duration when known.
    print(f"Track: {track.display_name()}")
    if track.album:
        print(f"Album: {track.album}")
    if track.length:
        total = track.length / 1000.0
        minutes = int(total // 60)
        print(f"Duration: {minutes}:{total % 60:05.2f}")
    print()

    entries = manager.cache.query_track(track)
    if not entries:
        print(" (no cache entries)")
        return

    for entry in entries:
        _print_cache_row(entry, indent=" ")
+
+
+def _print_cache_row(row: dict, indent: str = "") -> None:
+ """Pretty-print a single cache row."""
+ now = int(time.time())
+ source = row.get("source", "?")
+ status = row.get("status", "?")
+ artist = row.get("artist", "")
+ title = row.get("title", "")
+ album = row.get("album", "")
+ created = row.get("created_at", 0)
+ expires = row.get("expires_at")
+ lyrics = row.get("lyrics", "")
+
+ name = f"{artist} - {title}" if artist and title else row.get("key", "?")
+ print(f"{indent}[{source}] {name}")
+ if album:
+ print(f"{indent} Album : {album}")
+ print(f"{indent} Status : {status}")
+ if created:
+ age = now - created
+ print(f"{indent} Cached : {age // 3600}h {(age % 3600) // 60}m ago")
+ if expires:
+ remaining = expires - now
+ if remaining > 0:
+ print(
+ f"{indent} Expires : in {remaining // 3600}h {(remaining % 3600) // 60}m"
+ )
+ else:
+ print(f"{indent} Expires : EXPIRED")
+ else:
+ print(f"{indent} Expires : never")
+ if lyrics:
+ line_count = len(lyrics.splitlines())
+ print(f"{indent} Lyrics : {line_count} lines")
+
+
def run():
    """Console-script entry point.

    Dispatches through the meta app so the global --debug/--player options
    are processed by launcher() before the subcommand runs.
    """
    app.meta()
+
+
+if __name__ == "__main__":
+ run()
diff --git a/lrx/config.py b/lrx/config.py
new file mode 100644
index 0000000..5f0f5b8
--- /dev/null
+++ b/lrx/config.py
@@ -0,0 +1,88 @@
+"""
+Author: Uyanide pywang0608@foxmail.com
+Date: 2026-03-25 10:17:56
+Description: Global configuration constants and logger setup
+"""
+
+import os
+import sys
+from pathlib import Path
+from platformdirs import user_cache_dir, user_config_dir
+from dotenv import load_dotenv
+from loguru import logger
+from importlib.metadata import version
+
# Application
APP_NAME = "lrcfetch"
APP_AUTHOR = "Uyanide"
# NOTE(review): version() raises PackageNotFoundError when the package is
# not installed (e.g. running straight from a checkout) — confirm intended.
APP_VERSION = version(APP_NAME)

# Paths (platform-appropriate cache dir, e.g. ~/.cache/lrcfetch on Linux)
CACHE_DIR = user_cache_dir(APP_NAME, APP_AUTHOR)
DB_PATH = os.path.join(CACHE_DIR, "cache.db")

# .env loading — config-dir file first, then cwd; python-dotenv does not
# override variables that are already set, so the first source wins.
_config_env = Path(user_config_dir(APP_NAME, APP_AUTHOR)) / ".env"
load_dotenv(_config_env)  # ~/.config/lrcfetch/.env
load_dotenv()  # .env in cwd (does NOT override existing vars)

# HTTP request timeout (seconds)
HTTP_TIMEOUT = 10.0

# Cache TTLs (seconds); None means the entry never expires
TTL_SYNCED = None  # never expires
TTL_UNSYNCED = 86400  # 1 day
TTL_NOT_FOUND = 86400 * 3  # 3 days
TTL_NETWORK_ERROR = 3600  # 1 hour

# Search
DURATION_TOLERANCE_MS = 3000  # max duration mismatch for search matching

# Spotify related
SPOTIFY_TOKEN_URL = "https://open.spotify.com/api/token"
SPOTIFY_LYRICS_URL = "https://spclient.wg.spotify.com/color-lyrics/v2/track/"
SPOTIFY_SERVER_TIME_URL = "https://open.spotify.com/api/server-time"
SPOTIFY_SECRET_URL = (
    "https://raw.githubusercontent.com/xyloflake/spot-secrets-go"
    "/refs/heads/main/secrets/secrets.json"
)
# Session cookie supplied via environment / .env; empty string when unset.
SPOTIFY_SP_DC = os.environ.get("SPOTIFY_SP_DC", "")
SPOTIFY_TOKEN_CACHE_FILE = os.path.join(CACHE_DIR, "spotify_token.json")
SPOTIFY_APP_VERSION = "1.2.87.284.g3ff41c13"

# Netease api
NETEASE_SEARCH_URL = "https://music.163.com/api/cloudsearch/pc"
NETEASE_LYRIC_URL = "https://interface3.music.163.com/api/song/lyric"

# LRCLIB api
LRCLIB_API_URL = "https://lrclib.net/api/get"
LRCLIB_SEARCH_URL = "https://lrclib.net/api/search"

# QQ Music API (self-hosted proxy); trailing slash stripped for safe joining
QQ_MUSIC_API_URL = os.environ.get("QQ_MUSIC_API_URL", "").rstrip("/")

# Player preference (used when multiple MPRIS players are active)
PREFERRED_PLAYER = os.environ.get("LRCFETCH_PLAYER", "spotify")

# User-Agents
UA_BROWSER = "Mozilla/5.0 (X11; Linux x86_64; rv:148.0) Gecko/20100101 Firefox/148.0"
UA_LRCFETCH = f"LRCFetch {APP_VERSION} (https://github.com/Uyanide/lrcfetch)"

# Import-time side effect: the cache dir must exist before any module opens
# DB_PATH or SPOTIFY_TOKEN_CACHE_FILE.
os.makedirs(CACHE_DIR, exist_ok=True)

# Logger
_LOG_FORMAT = (
    "{time:YYYY-MM-DD HH:mm:ss} | "
    "{level: <8} | "
    "{name}:{function}:{line} - "
    "{message}"
)

# Replace loguru's default sink with our own format at INFO.
logger.remove()
logger.add(sys.stderr, format=_LOG_FORMAT, level="INFO")
+
+
def enable_debug() -> None:
    """Switch logger to DEBUG level."""
    # loguru sinks are replaced rather than mutated: drop every sink and
    # re-add stderr with the same format at DEBUG.
    logger.remove()
    logger.add(sys.stderr, format=_LOG_FORMAT, level="DEBUG")
diff --git a/lrx/core.py b/lrx/core.py
new file mode 100644
index 0000000..04ba9ea
--- /dev/null
+++ b/lrx/core.py
@@ -0,0 +1,178 @@
+"""
+Author: Uyanide pywang0608@foxmail.com
+Date: 2026-03-25 11:09:53
+Description: Core orchestrator — coordinates fetchers with cache-aware fallback
+"""
+
+"""
+Fetch pipeline:
+ 1. Check cache for each source in the fallback sequence
+ 2. For sources without a valid cache hit, call the fetcher
+ 3. Cache every result (success, not-found, or error) per source
+ 4. Return the best result (synced > unsynced > None)
+"""
+
+from typing import Optional
+from loguru import logger
+
+from .fetchers import FetcherMethodType, create_fetchers
+from .fetchers.base import BaseFetcher
+from .cache import CacheEngine
+from .lrc import normalize_tags, normalize_unsynced, detect_sync_status
+from .config import TTL_SYNCED, TTL_UNSYNCED, TTL_NOT_FOUND, TTL_NETWORK_ERROR
+from .models import TrackMeta, LyricResult, CacheStatus
+from .enrichers import enrich_track
+
+
+# Maps CacheStatus to the default TTL used when storing results.
+# A TTL of None means the cache entry never expires (synced lyrics are
+# considered final); negative results are retried after their TTL lapses.
+_STATUS_TTL: dict[CacheStatus, Optional[int]] = {
+    CacheStatus.SUCCESS_SYNCED: TTL_SYNCED,
+    CacheStatus.SUCCESS_UNSYNCED: TTL_UNSYNCED,
+    CacheStatus.NOT_FOUND: TTL_NOT_FOUND,
+    CacheStatus.NETWORK_ERROR: TTL_NETWORK_ERROR,
+}
+
+
+class LrcManager:
+ """Main entry point for fetching lyrics with caching."""
+
+ def __init__(self) -> None:
+ self.cache = CacheEngine()
+ self.fetchers = create_fetchers(self.cache)
+
+ def _build_sequence(
+ self, track: TrackMeta, force_method: Optional[FetcherMethodType] = None
+ ) -> list[BaseFetcher]:
+ """Determine the ordered list of fetchers to try."""
+ if force_method:
+ if force_method not in self.fetchers:
+ logger.error(f"Unknown method: {force_method}")
+ return []
+ return [self.fetchers[force_method]]
+
+ sequence: list[BaseFetcher] = []
+ for method in self.fetchers.keys():
+ if self.fetchers[method].is_available(track):
+ sequence.append(self.fetchers[method])
+
+ logger.debug(f"Fallback sequence: {[f.source_name for f in sequence]}")
+ return sequence
+
+ def fetch_for_track(
+ self,
+ track: TrackMeta,
+ force_method: Optional[FetcherMethodType] = None,
+ bypass_cache: bool = False,
+ ) -> Optional[LyricResult]:
+ """Fetch lyrics for *track* using the fallback pipeline.
+
+ Each source is checked against the cache independently:
+ - Cache hit with synced lyrics → return immediately
+ - Cache hit with negative status (NOT_FOUND / NETWORK_ERROR) → skip source
+ - Cache miss or unsynced → call fetcher, then cache the result
+
+ After all sources are tried, returns the best result found
+ (synced > unsynced > None).
+ """
+ track = enrich_track(track)
+ logger.info(f"Fetching lyrics for: {track.display_name()}")
+
+ sequence = self._build_sequence(track, force_method)
+ if not sequence:
+ return None
+
+ # Best result seen so far (synced wins over unsynced)
+ best_result: Optional[LyricResult] = None
+
+ for fetcher in sequence:
+ source = fetcher.source_name
+
+ # Cache check (skip for fetchers that handle their own caching)
+ if not bypass_cache and not fetcher.self_cached:
+ cached = self.cache.get(track, source)
+ if cached:
+ if cached.status == CacheStatus.SUCCESS_SYNCED:
+ logger.info(f"[{source}] cache hit: synced lyrics")
+ return cached
+ elif cached.status == CacheStatus.SUCCESS_UNSYNCED:
+ logger.debug(
+ f"[{source}] cache hit: unsynced lyrics (continuing)"
+ )
+ if best_result is None:
+ best_result = cached
+ continue # Try next source for synced
+ elif cached.status in (
+ CacheStatus.NOT_FOUND,
+ CacheStatus.NETWORK_ERROR,
+ ):
+ logger.debug(
+ f"[{source}] cache hit: {cached.status.value}, skipping"
+ )
+ continue
+ elif not fetcher.self_cached:
+ logger.debug(f"[{source}] cache bypassed")
+
+ # Fetch
+ logger.debug(f"[{source}] calling fetcher...")
+ result = fetcher.fetch(track, bypass_cache=bypass_cache)
+
+ if not result:
+ logger.debug(f"[{source}] returned None (no result)")
+ continue
+
+ # Cache the result (skip for self-cached fetchers)
+ if not fetcher.self_cached:
+ ttl = result.ttl or _STATUS_TTL.get(result.status, TTL_NOT_FOUND)
+ self.cache.set(track, source, result, ttl_seconds=ttl)
+
+ # Evaluate result
+ if result.status == CacheStatus.SUCCESS_SYNCED:
+ logger.info(f"[{source}] got synced lyrics")
+ return result
+
+ if result.status == CacheStatus.SUCCESS_UNSYNCED:
+ logger.debug(f"[{source}] got unsynced lyrics (continuing)")
+ if best_result is None:
+ best_result = result
+
+ # NOT_FOUND / NETWORK_ERROR: already cached, try next
+
+ # Return best available
+ if best_result:
+ # Normalize unsynced lyrics: set all timestamps to [00:00.00]
+ if (
+ best_result.status == CacheStatus.SUCCESS_UNSYNCED
+ and best_result.lyrics
+ ):
+ best_result = LyricResult(
+ status=best_result.status,
+ lyrics=normalize_unsynced(best_result.lyrics),
+ source=best_result.source,
+ ttl=best_result.ttl,
+ )
+ logger.info(
+ f"Returning unsynced lyrics from {best_result.source} "
+ f"(no synced source found)"
+ )
+ else:
+ logger.info(f"No lyrics found for {track.display_name()}")
+
+ return best_result
+
+ def manual_insert(
+ self,
+ track: TrackMeta,
+ lyrics: str,
+ ) -> None:
+ """Manually insert lyrics into the cache for a track."""
+ track = enrich_track(track)
+ logger.info(f"Manually inserting lyrics for: {track.display_name()}")
+ lyrics = normalize_tags(lyrics)
+ result = LyricResult(
+ status=detect_sync_status(lyrics),
+ lyrics=normalize_tags(lyrics),
+ source="manual",
+ ttl=None,
+ )
+ self.cache.set(track, "manual", result, ttl_seconds=None)
+ logger.info("Lyrics inserted into cache.")
diff --git a/lrx/enrichers/__init__.py b/lrx/enrichers/__init__.py
new file mode 100644
index 0000000..9be0a80
--- /dev/null
+++ b/lrx/enrichers/__init__.py
@@ -0,0 +1,40 @@
+"""
+Author: Uyanide pywang0608@foxmail.com
+Date: 2026-03-31 06:09:11
+Description: Metadata enrichment pipeline
+"""
+
+from loguru import logger
+
+from .base import BaseEnricher
+from .audio_tag import AudioTagEnricher
+from .file_name import FileNameEnricher
+from ..models import TrackMeta
+
+# Enrichers run in order; earlier ones have higher priority.
+_ENRICHERS: list[BaseEnricher] = [
+ AudioTagEnricher(),
+ FileNameEnricher(),
+]
+
+
+def enrich_track(track: TrackMeta) -> TrackMeta:
+ """Run all enrichers and return a track with missing fields filled in.
+
+ Each enricher sees the cumulative state (earlier enrichers' results
+ are already applied). A field is only set if it is currently None.
+ """
+ for enricher in _ENRICHERS:
+ try:
+ result = enricher.enrich(track)
+ except Exception as e:
+ logger.warning(f"Enricher {enricher.name} failed: {e}")
+ continue
+ if not result:
+ continue
+ # Only apply fields that are still None
+ updates = {k: v for k, v in result.items() if getattr(track, k, None) is None}
+ if updates:
+ for k, v in updates.items():
+ setattr(track, k, v)
+ return track
diff --git a/lrx/enrichers/__pycache__/__init__.cpython-313.pyc b/lrx/enrichers/__pycache__/__init__.cpython-313.pyc
new file mode 100644
index 0000000..bd522c1
Binary files /dev/null and b/lrx/enrichers/__pycache__/__init__.cpython-313.pyc differ
diff --git a/lrx/enrichers/__pycache__/audio_tag.cpython-313.pyc b/lrx/enrichers/__pycache__/audio_tag.cpython-313.pyc
new file mode 100644
index 0000000..c65ccca
Binary files /dev/null and b/lrx/enrichers/__pycache__/audio_tag.cpython-313.pyc differ
diff --git a/lrx/enrichers/__pycache__/base.cpython-313.pyc b/lrx/enrichers/__pycache__/base.cpython-313.pyc
new file mode 100644
index 0000000..8a88af1
Binary files /dev/null and b/lrx/enrichers/__pycache__/base.cpython-313.pyc differ
diff --git a/lrx/enrichers/__pycache__/file_name.cpython-313.pyc b/lrx/enrichers/__pycache__/file_name.cpython-313.pyc
new file mode 100644
index 0000000..e514b4f
Binary files /dev/null and b/lrx/enrichers/__pycache__/file_name.cpython-313.pyc differ
diff --git a/lrx/enrichers/audio_tag.py b/lrx/enrichers/audio_tag.py
new file mode 100644
index 0000000..4e9f604
--- /dev/null
+++ b/lrx/enrichers/audio_tag.py
@@ -0,0 +1,78 @@
+"""
+Author: Uyanide pywang0608@foxmail.com
+Date: 2026-03-31 06:11:27
+Description: Enricher that reads metadata from audio file tags (mutagen)
+"""
+
+from typing import Optional
+from loguru import logger
+from mutagen._file import File, FileType
+
+from .base import BaseEnricher
+from ..models import TrackMeta
+from ..lrc import get_audio_path
+
+
+class AudioTagEnricher(BaseEnricher):
+ """Extract title, artist, album, and duration from audio file tags."""
+
+ @property
+ def name(self) -> str:
+ return "audio-tag"
+
+ def enrich(self, track: TrackMeta) -> Optional[dict]:
+ if not track.is_local or not track.url:
+ return None
+
+ audio_path = get_audio_path(track.url, ensure_exists=True)
+ if not audio_path:
+ return None
+
+ try:
+ audio = File(audio_path)
+ except Exception as e:
+ logger.debug(f"AudioTag: failed to read {audio_path}: {e}")
+ return None
+
+ if audio is None:
+ return None
+
+ updates: dict = {}
+
+ # Try common tag names (vorbis comments, ID3, MP4)
+ title = _first_tag(audio, "title", "TIT2", "\xa9nam")
+ if title and not track.title:
+ updates["title"] = title
+
+ artist = _first_tag(audio, "artist", "TPE1", "\xa9ART")
+ if artist and not track.artist:
+ updates["artist"] = artist
+
+ album = _first_tag(audio, "album", "TALB", "\xa9alb")
+ if album and not track.album:
+ updates["album"] = album
+
+ if not track.length and audio.info and hasattr(audio.info, "length"):
+ length_ms = int(audio.info.length * 1000)
+ if length_ms > 0:
+ updates["length"] = length_ms
+
+ if updates:
+ logger.debug(f"AudioTag: enriched fields: {list(updates.keys())}")
+ return updates or None
+
+
+def _first_tag(audio: FileType, *keys: str) -> Optional[str]:
+ """Return the first non-empty string value found among the given tag keys."""
+ if not audio.tags:
+ return None
+ for key in keys:
+ val = audio.tags.get(key)
+ if val is None:
+ continue
+ # mutagen returns lists for vorbis, single values for ID3
+ if isinstance(val, list):
+ val = val[0] if val else None
+ if val:
+ return str(val).strip()
+ return None
diff --git a/lrx/enrichers/base.py b/lrx/enrichers/base.py
new file mode 100644
index 0000000..f0a09da
--- /dev/null
+++ b/lrx/enrichers/base.py
@@ -0,0 +1,31 @@
+"""
+Author: Uyanide pywang0608@foxmail.com
+Date: 2026-03-31 06:08:16
+Description: Base class for metadata enrichers
+"""
+
+from abc import ABC, abstractmethod
+from typing import Optional
+
+from ..models import TrackMeta
+
+
+class BaseEnricher(ABC):
+    """Attempts to fill missing fields on a TrackMeta.
+
+    Each enricher inspects the track, and returns a dict of field names
+    to values for any fields it can provide. Only fields that are
+    currently ``None`` on the track will actually be applied.
+    """
+
+    @property
+    @abstractmethod
+    def name(self) -> str:
+        """Short identifier used in log messages (e.g. "audio-tag")."""
+        ...
+
+    @abstractmethod
+    def enrich(self, track: TrackMeta) -> Optional[dict]:
+        """Return a dict of {field_name: value} for fields this enricher can fill.
+
+        Return None or an empty dict if nothing can be contributed.
+        Implementations may raise; the pipeline logs and skips failures.
+        """
+        ...
diff --git a/lrx/enrichers/file_name.py b/lrx/enrichers/file_name.py
new file mode 100644
index 0000000..150cebd
--- /dev/null
+++ b/lrx/enrichers/file_name.py
@@ -0,0 +1,83 @@
+"""
+Author: Uyanide pywang0608@foxmail.com
+Date: 2026-03-31 06:08:44
+Description: Enricher that parses metadata from the audio file path
+"""
+
+import re
+from typing import Optional
+from loguru import logger
+
+from .base import BaseEnricher
+from ..models import TrackMeta
+from ..lrc import get_audio_path
+
+
+# Common track-number prefixes: "01 - ", "01. ", "1 - ", etc.
+_TRACK_NUM_RE = re.compile(r"^\d{1,3}[\s.\-]+")
+
+
+class FileNameEnricher(BaseEnricher):
+    """Derive artist / title from the file path when tags are unavailable.
+
+    Heuristics (applied to the stem of the filename):
+      - "Artist - Title" → artist, title
+      - "01 - Title" → title only (leading track number stripped)
+      - "Title" → title only
+
+    If artist is still missing after parsing the filename, the parent
+    directory name is used as a guess (common layout: ``Artist/Album/track``).
+    """
+
+    @property
+    def name(self) -> str:
+        return "file-name"
+
+    def enrich(self, track: TrackMeta) -> Optional[dict]:
+        """Return path-derived metadata for a local track, or None."""
+        if not track.is_local or not track.url:
+            return None
+
+        # ensure_exists=False: only the path string is parsed, the file
+        # itself does not have to exist.
+        audio_path = get_audio_path(track.url, ensure_exists=False)
+        if not audio_path:
+            return None
+
+        updates: dict = {}
+        stem = audio_path.stem
+
+        # Try "Artist - Title" split
+        if " - " in stem:
+            left, right = stem.split(" - ", 1)
+            left = _TRACK_NUM_RE.sub("", left).strip()
+            right = right.strip()
+
+            if left and right:
+                # Both sides non-empty after stripping track number
+                if not track.artist:
+                    updates["artist"] = left
+                if not track.title:
+                    updates["title"] = right
+            elif right:
+                # Left was only a track number → right is the title
+                if not track.title:
+                    updates["title"] = right
+        else:
+            # No separator: strip track number, remainder is title
+            title_guess = _TRACK_NUM_RE.sub("", stem).strip()
+            if title_guess and not track.title:
+                updates["title"] = title_guess
+
+        # Use parent directory as artist fallback
+        # Typical layout: /Music/Artist/Album/01 - Track.flac
+        # parents[0] is the containing (album) dir, parents[1] its parent
+        # (artist); shallow paths are skipped by the length check below.
+        if not track.artist and "artist" not in updates:
+            parents = audio_path.parents
+            if len(parents) >= 2:
+                album_dir = parents[0].name
+                artist_dir = parents[1].name
+                if artist_dir and artist_dir not in (".", "/"):
+                    updates["artist"] = artist_dir
+                    if not track.album and album_dir and album_dir != artist_dir:
+                        updates["album"] = album_dir
+
+        if updates:
+            logger.debug(f"FileName: enriched fields: {list(updates.keys())}")
+        return updates or None
diff --git a/lrx/fetchers/__init__.py b/lrx/fetchers/__init__.py
new file mode 100644
index 0000000..75377d6
--- /dev/null
+++ b/lrx/fetchers/__init__.py
@@ -0,0 +1,41 @@
+"""
+Author: Uyanide pywang0608@foxmail.com
+Date: 2026-03-25 02:33:26
+Description: Fetcher pipeline — registry and types
+"""
+
+from typing import Literal
+
+from .base import BaseFetcher
+from .local import LocalFetcher
+from .cache_search import CacheSearchFetcher
+from .spotify import SpotifyFetcher
+from .lrclib import LrclibFetcher
+from .lrclib_search import LrclibSearchFetcher
+from .netease import NeteaseFetcher
+from .qqmusic import QQMusicFetcher
+from ..cache import CacheEngine
+
+# Valid fetcher identifiers; these are also the keys of the registry built
+# by create_fetchers() below.
+FetcherMethodType = Literal[
+    "local",
+    "cache-search",
+    "spotify",
+    "lrclib",
+    "lrclib-search",
+    "netease",
+    "qqmusic",
+]
+
+
+def create_fetchers(cache: CacheEngine) -> dict[FetcherMethodType, BaseFetcher]:
+ """Instantiate all fetchers. Returns a dict keyed by source name."""
+ fetchers: dict[FetcherMethodType, BaseFetcher] = {
+ "local": LocalFetcher(),
+ "cache-search": CacheSearchFetcher(cache),
+ "spotify": SpotifyFetcher(),
+ "lrclib": LrclibFetcher(),
+ "lrclib-search": LrclibSearchFetcher(),
+ "netease": NeteaseFetcher(),
+ "qqmusic": QQMusicFetcher(),
+ }
+ return fetchers
diff --git a/lrx/fetchers/__pycache__/__init__.cpython-313.pyc b/lrx/fetchers/__pycache__/__init__.cpython-313.pyc
new file mode 100644
index 0000000..645408d
Binary files /dev/null and b/lrx/fetchers/__pycache__/__init__.cpython-313.pyc differ
diff --git a/lrx/fetchers/__pycache__/base.cpython-313.pyc b/lrx/fetchers/__pycache__/base.cpython-313.pyc
new file mode 100644
index 0000000..14e0222
Binary files /dev/null and b/lrx/fetchers/__pycache__/base.cpython-313.pyc differ
diff --git a/lrx/fetchers/__pycache__/cache_search.cpython-313.pyc b/lrx/fetchers/__pycache__/cache_search.cpython-313.pyc
new file mode 100644
index 0000000..8036956
Binary files /dev/null and b/lrx/fetchers/__pycache__/cache_search.cpython-313.pyc differ
diff --git a/lrx/fetchers/__pycache__/local.cpython-313.pyc b/lrx/fetchers/__pycache__/local.cpython-313.pyc
new file mode 100644
index 0000000..894ad14
Binary files /dev/null and b/lrx/fetchers/__pycache__/local.cpython-313.pyc differ
diff --git a/lrx/fetchers/__pycache__/lrclib.cpython-313.pyc b/lrx/fetchers/__pycache__/lrclib.cpython-313.pyc
new file mode 100644
index 0000000..eda61f5
Binary files /dev/null and b/lrx/fetchers/__pycache__/lrclib.cpython-313.pyc differ
diff --git a/lrx/fetchers/__pycache__/lrclib_search.cpython-313.pyc b/lrx/fetchers/__pycache__/lrclib_search.cpython-313.pyc
new file mode 100644
index 0000000..c01e5b0
Binary files /dev/null and b/lrx/fetchers/__pycache__/lrclib_search.cpython-313.pyc differ
diff --git a/lrx/fetchers/__pycache__/musixmatch.cpython-313.pyc b/lrx/fetchers/__pycache__/musixmatch.cpython-313.pyc
new file mode 100644
index 0000000..7c09c74
Binary files /dev/null and b/lrx/fetchers/__pycache__/musixmatch.cpython-313.pyc differ
diff --git a/lrx/fetchers/__pycache__/netease.cpython-313.pyc b/lrx/fetchers/__pycache__/netease.cpython-313.pyc
new file mode 100644
index 0000000..9fd6d53
Binary files /dev/null and b/lrx/fetchers/__pycache__/netease.cpython-313.pyc differ
diff --git a/lrx/fetchers/__pycache__/qqmusic.cpython-313.pyc b/lrx/fetchers/__pycache__/qqmusic.cpython-313.pyc
new file mode 100644
index 0000000..a8b62e0
Binary files /dev/null and b/lrx/fetchers/__pycache__/qqmusic.cpython-313.pyc differ
diff --git a/lrx/fetchers/__pycache__/spotify.cpython-313.pyc b/lrx/fetchers/__pycache__/spotify.cpython-313.pyc
new file mode 100644
index 0000000..0b2c8bd
Binary files /dev/null and b/lrx/fetchers/__pycache__/spotify.cpython-313.pyc differ
diff --git a/lrx/fetchers/base.py b/lrx/fetchers/base.py
new file mode 100644
index 0000000..2bf70af
--- /dev/null
+++ b/lrx/fetchers/base.py
@@ -0,0 +1,35 @@
+"""
+Author: Uyanide pywang0608@foxmail.com
+Date: 2026-03-25 02:33:26
+Description: Base fetcher class and common interfaces
+"""
+
+from abc import ABC, abstractmethod
+from typing import Optional
+
+from ..models import TrackMeta, LyricResult
+
+
+class BaseFetcher(ABC):
+    """Abstract interface implemented by every lyrics source."""
+
+    @property
+    @abstractmethod
+    def source_name(self) -> str:
+        """Name of the fetcher source (used as the per-source cache key and in logs)."""
+        pass
+
+    @property
+    def self_cached(self) -> bool:
+        """True if this fetcher manages its own cache (skip per-source cache check)."""
+        return False
+
+    @abstractmethod
+    def is_available(self, track: TrackMeta) -> bool:
+        """Check if the fetcher is available for the given track (e.g. has required metadata)."""
+        pass
+
+    @abstractmethod
+    def fetch(
+        self, track: TrackMeta, bypass_cache: bool = False
+    ) -> Optional[LyricResult]:
+        """Fetch lyrics for the given track. Returns None if unable to fetch.
+
+        Implementations may instead return a LyricResult with a negative
+        status (NOT_FOUND / NETWORK_ERROR) so the orchestrator can cache it.
+        """
+        pass
diff --git a/lrx/fetchers/cache_search.py b/lrx/fetchers/cache_search.py
new file mode 100644
index 0000000..af973c5
--- /dev/null
+++ b/lrx/fetchers/cache_search.py
@@ -0,0 +1,85 @@
+"""
+Author: Uyanide pywang0608@foxmail.com
+Date: 2026-03-28 05:57:46
+Description: Cache-search fetcher — cross-album fuzzy lookup in the local cache
+"""
+
+"""
+Searches existing cache entries by artist + title with fuzzy normalization,
+ignoring album and source. Useful when the same track appears on different
+albums or is played from different players.
+"""
+
+from typing import Optional
+from loguru import logger
+
+from .base import BaseFetcher
+from ..models import TrackMeta, LyricResult, CacheStatus
+from ..cache import CacheEngine
+
+
+class CacheSearchFetcher(BaseFetcher):
+ def __init__(self, cache: CacheEngine) -> None:
+ self._cache = cache
+
+ @property
+ def source_name(self) -> str:
+ return "cache-search"
+
+ @property
+ def self_cached(self) -> bool:
+ return True
+
+ def is_available(self, track: TrackMeta) -> bool:
+ return bool(track.title)
+
+ def fetch(
+ self, track: TrackMeta, bypass_cache: bool = False
+ ) -> Optional[LyricResult]:
+ if bypass_cache:
+ logger.debug("Cache-search: bypassed by caller")
+ return None
+
+ if not track.title:
+ logger.debug("Cache-search: skipped — no title")
+ return None
+
+ # Fast path: exact metadata match (artist+title+album), single SQL query
+ exact = self._cache.find_best_positive(track)
+ if exact:
+ logger.info(f"Cache-search: exact hit ({exact.status.value})")
+ return exact
+
+ # Slow path: fuzzy cross-album search
+ matches = self._cache.search_by_meta(
+ artist=track.artist,
+ title=track.title,
+ length=track.length,
+ )
+
+ if not matches:
+ logger.debug(f"Cache-search: no match for {track.display_name()}")
+ return None
+
+ # Pick best: prefer synced, then first available
+ best = None
+ for m in matches:
+ if m.get("status") == CacheStatus.SUCCESS_SYNCED.value:
+ best = m
+ break
+ if best is None:
+ best = m
+
+ if not best or not best.get("lyrics"):
+ return None
+
+ status = CacheStatus(best["status"])
+ logger.info(
+ f"Cache-search: fuzzy hit from [{best.get('source')}] "
+ f"album={best.get('album')!r} ({status.value})"
+ )
+ return LyricResult(
+ status=status,
+ lyrics=best["lyrics"],
+ source=self.source_name,
+ )
diff --git a/lrx/fetchers/local.py b/lrx/fetchers/local.py
new file mode 100644
index 0000000..82e3ecd
--- /dev/null
+++ b/lrx/fetchers/local.py
@@ -0,0 +1,98 @@
+"""
+Author: Uyanide pywang0608@foxmail.com
+Date: 2026-03-26 02:08:41
+Description: Local fetcher — reads lyrics from .lrc sidecar files or embedded audio metadata
+"""
+
+"""
+Priority:
+ 1. Same-directory .lrc file (e.g. /path/to/track.lrc)
+ 2. Embedded lyrics in audio metadata (FLAC, MP3 USLT/SYLT tags)
+"""
+
+from typing import Optional
+from loguru import logger
+from mutagen._file import File
+from mutagen.flac import FLAC
+
+from .base import BaseFetcher
+from ..models import TrackMeta, LyricResult
+from ..lrc import detect_sync_status, normalize_tags, get_audio_path, get_sidecar_path
+
+
+class LocalFetcher(BaseFetcher):
+    """Reads lyrics from a .lrc sidecar file or embedded audio metadata."""
+
+    @property
+    def source_name(self) -> str:
+        return "local"
+
+    def is_available(self, track: TrackMeta) -> bool:
+        return track.is_local
+
+    def fetch(
+        self, track: TrackMeta, bypass_cache: bool = False
+    ) -> Optional[LyricResult]:
+        """Attempt to read lyrics from local filesystem.
+
+        Priority: sidecar .lrc file next to the audio file, then lyrics
+        embedded in the audio metadata. Returns None (never a negative
+        LyricResult) so the orchestrator moves on without caching a miss.
+        """
+        if not track.is_local or not track.url:
+            return None
+
+        audio_path = get_audio_path(track.url, ensure_exists=False)
+        if not audio_path:
+            logger.debug(f"Local: audio URL is not a valid file path: {track.url}")
+            return None
+
+        # 1. Sidecar .lrc: the audio file itself need not exist, only the .lrc.
+        lrc_path = get_sidecar_path(
+            track.url, ensure_audio_exists=False, ensure_exists=True
+        )
+        if lrc_path:
+            try:
+                with open(lrc_path, "r", encoding="utf-8") as f:
+                    content = f.read().strip()
+                if content:
+                    content = normalize_tags(content)
+                    status = detect_sync_status(content)
+                    logger.info(f"Local: found .lrc sidecar ({status.value})")
+                    return LyricResult(
+                        status=status, lyrics=content, source=self.source_name
+                    )
+            except Exception as e:
+                # Unreadable sidecar is not fatal — fall through to embedded tags.
+                logger.error(f"Local: error reading {lrc_path}: {e}")
+        else:
+            logger.debug(f"Local: no .lrc sidecar found for {audio_path}")
+
+        # 2. Embedded metadata
+        if not audio_path.exists():
+            logger.debug(f"Local: audio file does not exist: {audio_path}")
+            return None
+        try:
+            audio = File(audio_path)
+            if audio is not None:
+                lyrics = None
+
+                if isinstance(audio, FLAC):
+                    # FLAC stores lyrics in vorbis comment tags
+                    lyrics = (
+                        audio.get("lyrics") or audio.get("unsynclyrics") or [None]
+                    )[0]
+                elif hasattr(audio, "tags") and audio.tags:
+                    # MP3 / other: look for USLT or SYLT ID3 frames
+                    # NOTE(review): str() of a SYLT frame may not yield
+                    # LRC-formatted text — confirm with a synced-tag sample.
+                    for key in audio.tags.keys():
+                        if key.startswith("USLT") or key.startswith("SYLT"):
+                            lyrics = str(audio.tags[key])
+                            break
+
+                if lyrics:
+                    lyrics = normalize_tags(lyrics.strip())
+                    status = detect_sync_status(lyrics)
+                    logger.info(f"Local: found embedded lyrics ({status.value})")
+                    return LyricResult(
+                        status=status,
+                        lyrics=lyrics,
+                        source=f"{self.source_name} (embedded)",
+                    )
+                else:
+                    logger.debug("Local: no embedded lyrics found")
+        except Exception as e:
+            logger.error(f"Local: error reading metadata for {audio_path}: {e}")
+
+        logger.debug(f"Local: no lyrics found for {audio_path}")
+        return None
diff --git a/lrx/fetchers/lrclib.py b/lrx/fetchers/lrclib.py
new file mode 100644
index 0000000..94928d1
--- /dev/null
+++ b/lrx/fetchers/lrclib.py
@@ -0,0 +1,111 @@
+"""
+Author: Uyanide pywang0608@foxmail.com
+Date: 2026-03-25 05:23:38
+Description: LRCLIB fetcher — queries lrclib.net for synced/plain lyrics
+"""
+
+"""
+Requires complete track metadata (artist, title, album, duration).
+"""
+
+from typing import Optional
+import httpx
+from loguru import logger
+from urllib.parse import urlencode
+
+from .base import BaseFetcher
+from ..models import TrackMeta, LyricResult, CacheStatus
+from ..lrc import normalize_tags
+from ..config import (
+ HTTP_TIMEOUT,
+ TTL_UNSYNCED,
+ TTL_NOT_FOUND,
+ TTL_NETWORK_ERROR,
+ LRCLIB_API_URL,
+ UA_LRCFETCH,
+)
+
+
+class LrclibFetcher(BaseFetcher):
+ @property
+ def source_name(self) -> str:
+ return "lrclib"
+
+ def is_available(self, track: TrackMeta) -> bool:
+ return track.is_complete
+
+ def fetch(
+ self, track: TrackMeta, bypass_cache: bool = False
+ ) -> Optional[LyricResult]:
+ """Fetch lyrics from LRCLIB. Requires complete metadata."""
+ if not track.is_complete:
+ logger.debug("LRCLIB: skipped — incomplete metadata")
+ return None
+
+ params = {
+ "track_name": track.title,
+ "artist_name": track.artist,
+ "album_name": track.album,
+ "duration": track.length / 1000.0 if track.length else 0,
+ }
+
+ url = f"{LRCLIB_API_URL}?{urlencode(params)}"
+ logger.info(f"LRCLIB: fetching lyrics for {track.display_name()}")
+
+ try:
+ with httpx.Client(timeout=HTTP_TIMEOUT) as client:
+ resp = client.get(url, headers={"User-Agent": UA_LRCFETCH})
+
+ if resp.status_code == 404:
+ logger.debug(f"LRCLIB: not found for {track.display_name()}")
+ return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
+
+ if resp.status_code != 200:
+ logger.error(f"LRCLIB: API returned {resp.status_code}")
+ return LyricResult(
+ status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
+ )
+
+ data = resp.json()
+
+ # Validate response
+ if not isinstance(data, dict):
+ logger.error(f"LRCLIB: unexpected response type: {type(data).__name__}")
+ return LyricResult(
+ status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
+ )
+
+ synced = data.get("syncedLyrics")
+ unsynced = data.get("plainLyrics")
+
+ if isinstance(synced, str) and synced.strip():
+ lyrics = normalize_tags(synced.strip())
+ logger.info(
+ f"LRCLIB: got synced lyrics ({len(lyrics.splitlines())} lines)"
+ )
+ return LyricResult(
+ status=CacheStatus.SUCCESS_SYNCED,
+ lyrics=lyrics,
+ source=self.source_name,
+ )
+ elif isinstance(unsynced, str) and unsynced.strip():
+ lyrics = normalize_tags(unsynced.strip())
+ logger.info(
+ f"LRCLIB: got unsynced lyrics ({len(lyrics.splitlines())} lines)"
+ )
+ return LyricResult(
+ status=CacheStatus.SUCCESS_UNSYNCED,
+ lyrics=lyrics,
+ source=self.source_name,
+ ttl=TTL_UNSYNCED,
+ )
+ else:
+ logger.debug(f"LRCLIB: empty response for {track.display_name()}")
+ return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
+
+ except httpx.HTTPError as e:
+ logger.error(f"LRCLIB: HTTP error: {e}")
+ return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
+ except Exception as e:
+ logger.error(f"LRCLIB: unexpected error: {e}")
+ return None
diff --git a/lrx/fetchers/lrclib_search.py b/lrx/fetchers/lrclib_search.py
new file mode 100644
index 0000000..95f87c2
--- /dev/null
+++ b/lrx/fetchers/lrclib_search.py
@@ -0,0 +1,168 @@
+"""
+Author: Uyanide pywang0608@foxmail.com
+Date: 2026-03-25 05:30:50
+Description: LRCLIB search fetcher — fuzzy search via lrclib.net /api/search
+"""
+
+"""
+Used when metadata is incomplete (no album or duration) but title is available.
+Selects the best match by duration when track length is known.
+"""
+
+import httpx
+from typing import Optional
+from loguru import logger
+from urllib.parse import urlencode
+
+from .base import BaseFetcher
+from ..models import TrackMeta, LyricResult, CacheStatus
+from ..lrc import normalize_tags
+from ..config import (
+ HTTP_TIMEOUT,
+ TTL_UNSYNCED,
+ TTL_NOT_FOUND,
+ TTL_NETWORK_ERROR,
+ DURATION_TOLERANCE_MS,
+ LRCLIB_SEARCH_URL,
+ UA_LRCFETCH,
+)
+
+
+class LrclibSearchFetcher(BaseFetcher):
+    """Fuzzy search against LRCLIB's /api/search endpoint.
+
+    Used when exact metadata is incomplete; candidates are ranked by
+    duration closeness when the track length is known.
+    """
+
+    @property
+    def source_name(self) -> str:
+        return "lrclib-search"
+
+    def is_available(self, track: TrackMeta) -> bool:
+        return bool(track.title)
+
+    def fetch(
+        self, track: TrackMeta, bypass_cache: bool = False
+    ) -> Optional[LyricResult]:
+        """Search LRCLIB for lyrics. Requires at least a title."""
+        if not track.title:
+            logger.debug("LRCLIB-search: skipped — no title")
+            return None
+
+        # Title is mandatory; artist/album narrow the search when known.
+        params: dict[str, str] = {"track_name": track.title}
+        if track.artist:
+            params["artist_name"] = track.artist
+        if track.album:
+            params["album_name"] = track.album
+
+        url = f"{LRCLIB_SEARCH_URL}?{urlencode(params)}"
+        logger.info(f"LRCLIB-search: searching for {track.display_name()}")
+
+        try:
+            with httpx.Client(timeout=HTTP_TIMEOUT) as client:
+                resp = client.get(url, headers={"User-Agent": UA_LRCFETCH})
+
+            if resp.status_code != 200:
+                logger.error(f"LRCLIB-search: API returned {resp.status_code}")
+                return LyricResult(
+                    status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
+                )
+
+            data = resp.json()
+
+            if not isinstance(data, list) or len(data) == 0:
+                logger.debug(f"LRCLIB-search: no results for {track.display_name()}")
+                return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
+
+            logger.debug(f"LRCLIB-search: got {len(data)} candidates")
+
+            # Select best match by duration
+            best = self._select_best(data, track)
+            if best is None:
+                logger.debug("LRCLIB-search: no valid candidate found")
+                return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
+
+            # Extract lyrics
+            synced = best.get("syncedLyrics")
+            unsynced = best.get("plainLyrics")
+
+            if isinstance(synced, str) and synced.strip():
+                lyrics = normalize_tags(synced.strip())
+                logger.info(
+                    f"LRCLIB-search: got synced lyrics ({len(lyrics.splitlines())} lines)"
+                )
+                return LyricResult(
+                    status=CacheStatus.SUCCESS_SYNCED,
+                    lyrics=lyrics,
+                    source=self.source_name,
+                )
+            elif isinstance(unsynced, str) and unsynced.strip():
+                lyrics = normalize_tags(unsynced.strip())
+                logger.info(
+                    f"LRCLIB-search: got unsynced lyrics ({len(lyrics.splitlines())} lines)"
+                )
+                return LyricResult(
+                    status=CacheStatus.SUCCESS_UNSYNCED,
+                    lyrics=lyrics,
+                    source=self.source_name,
+                    ttl=TTL_UNSYNCED,
+                )
+            else:
+                logger.debug("LRCLIB-search: best candidate has empty lyrics")
+                return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
+
+        except httpx.HTTPError as e:
+            logger.error(f"LRCLIB-search: HTTP error: {e}")
+            return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
+        except Exception as e:
+            # Unexpected local failure: left uncached (None).
+            logger.error(f"LRCLIB-search: unexpected error: {e}")
+            return None
+
+    @staticmethod
+    def _select_best(candidates: list[dict], track: TrackMeta) -> Optional[dict]:
+        """Pick the best candidate, preferring synced lyrics and closest duration."""
+        if track.length is not None:
+            track_s = track.length / 1000.0
+            best: Optional[dict] = None
+            best_diff = float("inf")
+
+            for item in candidates:
+                if not isinstance(item, dict):
+                    continue
+                duration = item.get("duration")
+                if not isinstance(duration, (int, float)):
+                    continue
+                diff = abs(duration - track_s) * 1000  # compare in ms
+                if diff > DURATION_TOLERANCE_MS:
+                    continue
+                # Prefer synced over unsynced at similar duration
+                # NOTE: has_synced/best_synced are str-or-bool; only their
+                # truthiness is used below.
+                has_synced = (
+                    isinstance(item.get("syncedLyrics"), str)
+                    and item["syncedLyrics"].strip()
+                )
+                best_synced = (
+                    best is not None
+                    and isinstance(best.get("syncedLyrics"), str)
+                    and best["syncedLyrics"].strip()
+                )
+                # Exact float tie (diff == best_diff) rarely fires; in
+                # practice the first closest candidate wins.
+                if diff < best_diff or (
+                    diff == best_diff and has_synced and not best_synced
+                ):
+                    best_diff = diff
+                    best = item
+
+            if best is not None:
+                logger.debug(
+                    f"LRCLIB-search: selected id={best.get('id')} (diff={best_diff:.0f}ms)"
+                )
+                return best
+
+            logger.debug(
+                f"LRCLIB-search: no candidate within {DURATION_TOLERANCE_MS}ms"
+            )
+            return None
+
+        # No duration — pick first with synced lyrics, or just first
+        for item in candidates:
+            if (
+                isinstance(item, dict)
+                and isinstance(item.get("syncedLyrics"), str)
+                and item["syncedLyrics"].strip()
+            ):
+                return item
+        return candidates[0] if isinstance(candidates[0], dict) else None
diff --git a/lrx/fetchers/netease.py b/lrx/fetchers/netease.py
new file mode 100644
index 0000000..eecc8d7
--- /dev/null
+++ b/lrx/fetchers/netease.py
@@ -0,0 +1,213 @@
+"""
+Author: Uyanide pywang0608@foxmail.com
+Date: 2026-03-25 11:04:51
+Description: Netease Cloud Music fetcher
+"""
+
+"""
+Uses the public cloudsearch API for searching and the song/lyric API for
+retrieving lyrics. No authentication required.
+
+Search results are filtered by duration when the track has a known length
+to avoid returning lyrics for the wrong version of a song.
+"""
+
+from typing import Optional
+import httpx
+from loguru import logger
+
+from .base import BaseFetcher
+from ..models import TrackMeta, LyricResult, CacheStatus
+from ..lrc import detect_sync_status, normalize_tags
+from ..config import (
+ HTTP_TIMEOUT,
+ TTL_NOT_FOUND,
+ TTL_NETWORK_ERROR,
+ DURATION_TOLERANCE_MS,
+ NETEASE_SEARCH_URL,
+ NETEASE_LYRIC_URL,
+ UA_BROWSER,
+)
+
# Browser-like headers sent with every Netease request.
# NOTE(review): the Referer presumably satisfies the API's origin check — confirm.
_HEADERS = {
    "User-Agent": UA_BROWSER,
    "Referer": "https://music.163.com/",
}
+
+
class NeteaseFetcher(BaseFetcher):
    """Lyric fetcher backed by Netease Cloud Music's public web APIs.

    Flow: search for candidates, pick the best match by duration, then
    fetch the LRC body from the lyric API. No authentication is used.
    """

    @property
    def source_name(self) -> str:
        # Stable identifier recorded in LyricResult.source.
        return "netease"

    def is_available(self, track: TrackMeta) -> bool:
        # A title is the minimum metadata needed to build a search query.
        return bool(track.title)

    def _search(self, track: TrackMeta, limit: int = 10) -> Optional[int]:
        """Search Netease and return the best-matching song ID.

        When ``track.length`` is available, candidates are ranked by duration
        difference and only accepted if within ``DURATION_TOLERANCE_MS``.

        Returns:
            The selected song ID, or None when nothing acceptable was found
            or the request failed.
        """
        query = f"{track.artist or ''} {track.title or ''}".strip()
        if not query:
            return None

        logger.debug(f"Netease: searching for '{query}' (limit={limit})")

        try:
            with httpx.Client(timeout=HTTP_TIMEOUT) as client:
                resp = client.post(
                    NETEASE_SEARCH_URL,
                    headers=_HEADERS,
                    # "type": "1" — presumably selects song results; verify against API.
                    data={"s": query, "type": "1", "limit": str(limit), "offset": "0"},
                )
                resp.raise_for_status()
                result = resp.json()

            # Validate response shape defensively — this is an unofficial API.
            if not isinstance(result, dict):
                logger.error(
                    f"Netease: search returned non-dict: {type(result).__name__}"
                )
                return None

            result_body = result.get("result")
            if not isinstance(result_body, dict):
                logger.debug("Netease: search 'result' field missing or invalid")
                return None

            songs = result_body.get("songs")
            if not isinstance(songs, list) or len(songs) == 0:
                logger.debug("Netease: search returned 0 results")
                return None

            logger.debug(f"Netease: search returned {len(songs)} candidates")

            # Duration-based best-match selection
            if track.length is not None:
                track_ms = track.length
                best_id: Optional[int] = None
                best_diff = float("inf")

                for song in songs:
                    if not isinstance(song, dict):
                        continue
                    sid = song.get("id")
                    name = song.get("name", "?")
                    duration = song.get("dt")  # milliseconds
                    if not isinstance(duration, int):
                        logger.debug(
                            f"  candidate {sid} '{name}': no duration, skipped"
                        )
                        continue
                    diff = abs(duration - track_ms)
                    logger.debug(
                        f"  candidate {sid} '{name}': "
                        f"duration={duration}ms, diff={diff}ms"
                    )
                    if diff < best_diff:
                        best_diff = diff
                        best_id = sid

                # Closest candidate is only accepted inside the tolerance window.
                if best_id is not None and best_diff <= DURATION_TOLERANCE_MS:
                    logger.debug(f"Netease: selected id={best_id} (diff={best_diff}ms)")
                    return best_id

                logger.debug(
                    f"Netease: no candidate within {DURATION_TOLERANCE_MS}ms "
                    f"(best diff={best_diff}ms)"
                )
                return None

            # No duration info — take the first result
            first = songs[0]
            if not isinstance(first, dict) or "id" not in first:
                logger.error("Netease: first search result has no 'id'")
                return None
            logger.debug(
                f"Netease: no duration available, using first result "
                f"id={first['id']} '{first.get('name', '?')}'"
            )
            return first["id"]

        except Exception as e:
            # NOTE(review): network failures end up here and surface to
            # fetch() as None, which is then cached as NOT_FOUND rather than
            # NETWORK_ERROR (unlike the LRCLIB fetcher) — confirm intended.
            logger.error(f"Netease: search failed: {e}")
            return None

    def _get_lyric(self, song_id: int) -> Optional[LyricResult]:
        """Fetch lyrics for a given Netease song ID.

        Returns a LyricResult: SUCCESS_* with normalized LRC text,
        NOT_FOUND for missing/empty lyrics, NETWORK_ERROR on failure.
        """
        logger.debug(f"Netease: fetching lyrics for song_id={song_id}")

        try:
            with httpx.Client(timeout=HTTP_TIMEOUT) as client:
                resp = client.post(
                    NETEASE_LYRIC_URL,
                    headers=_HEADERS,
                    # Request flags: presumably toggle translated/romanized
                    # lyric variants — unverified against API docs.
                    data={
                        "id": str(song_id),
                        "cp": "false",
                        "tv": "0",
                        "lv": "0",
                        "rv": "0",
                        "kv": "0",
                        "yv": "0",
                        "ytv": "0",
                        "yrv": "0",
                    },
                )
                resp.raise_for_status()
                data = resp.json()

            # Validate response
            if not isinstance(data, dict):
                logger.error(
                    f"Netease: lyric response is not dict: {type(data).__name__}"
                )
                return LyricResult(
                    status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
                )

            lrc_obj = data.get("lrc")
            if not isinstance(lrc_obj, dict):
                logger.debug(
                    f"Netease: no 'lrc' object in response for song_id={song_id}"
                )
                return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)

            lrc: str = lrc_obj.get("lyric", "")
            if not isinstance(lrc, str) or not lrc.strip():
                logger.debug(f"Netease: empty lyrics for song_id={song_id}")
                return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)

            # Determine sync status
            lrc = normalize_tags(lrc)
            status = detect_sync_status(lrc)
            logger.info(
                f"Netease: got {status.value} lyrics for song_id={song_id} "
                f"({len(lrc.splitlines())} lines)"
            )
            return LyricResult(
                status=status, lyrics=lrc.strip(), source=self.source_name
            )

        except Exception as e:
            logger.error(f"Netease: lyric fetch failed for song_id={song_id}: {e}")
            return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)

    def fetch(
        self, track: TrackMeta, bypass_cache: bool = False
    ) -> Optional[LyricResult]:
        """Search for the track and fetch its lyrics.

        Returns None only when metadata is insufficient to search at all;
        otherwise a LyricResult describing the outcome.
        """
        query = f"{track.artist or ''} {track.title or ''}".strip()
        if not query:
            logger.debug("Netease: skipped — insufficient metadata")
            return None

        logger.info(f"Netease: fetching lyrics for {track.display_name()}")
        song_id = self._search(track)
        if not song_id:
            logger.debug(f"Netease: no match found for {track.display_name()}")
            return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)

        return self._get_lyric(song_id)
diff --git a/lrx/fetchers/qqmusic.py b/lrx/fetchers/qqmusic.py
new file mode 100644
index 0000000..a5d0b63
--- /dev/null
+++ b/lrx/fetchers/qqmusic.py
@@ -0,0 +1,178 @@
+"""
+Author: Uyanide pywang0608@foxmail.com
+Date: 2026-03-31 01:54:02
+Description: QQ Music fetcher via self-hosted API proxy
+"""
+
+"""
+Requires a running qq-music-api instance.
+The base URL is read from the QQ_MUSIC_API_URL environment variable.
+
+Search → pick best match by duration → fetch LRC lyrics.
+"""
+
+from typing import Optional
+import httpx
+from loguru import logger
+
+from .base import BaseFetcher
+from ..models import TrackMeta, LyricResult, CacheStatus
+from ..lrc import detect_sync_status, normalize_tags
+from ..config import (
+ HTTP_TIMEOUT,
+ TTL_NOT_FOUND,
+ TTL_NETWORK_ERROR,
+ DURATION_TOLERANCE_MS,
+ QQ_MUSIC_API_URL,
+)
+
+
class QQMusicFetcher(BaseFetcher):
    """Lyric fetcher backed by a self-hosted qq-music-api proxy.

    Disabled unless the QQ_MUSIC_API_URL environment variable points at a
    running instance.
    """

    @property
    def source_name(self) -> str:
        # Stable identifier recorded in LyricResult.source.
        return "qqmusic"

    def is_available(self, track: TrackMeta) -> bool:
        # Needs both a track title and a configured API base URL.
        return bool(track.title) and bool(QQ_MUSIC_API_URL)

    def _search(self, track: TrackMeta, limit: int = 10) -> Optional[str]:
        """Search QQ Music and return the best-matching song MID.

        Candidates are ranked by duration difference when ``track.length``
        is known and accepted only within ``DURATION_TOLERANCE_MS``;
        otherwise the first result is used. Returns None on failure.
        """
        query = f"{track.artist or ''} {track.title or ''}".strip()
        if not query:
            return None

        logger.debug(f"QQMusic: searching for '{query}' (limit={limit})")

        try:
            with httpx.Client(timeout=HTTP_TIMEOUT) as client:
                resp = client.get(
                    f"{QQ_MUSIC_API_URL}/api/search",
                    params={"keyword": query, "type": "song", "num": limit},
                )
                resp.raise_for_status()
                data = resp.json()

            # code == 0 is the proxy's success marker.
            if data.get("code") != 0:
                logger.error(f"QQMusic: search API error: {data}")
                return None

            songs = data.get("data", {}).get("list", [])
            if not songs:
                logger.debug("QQMusic: search returned 0 results")
                return None

            logger.debug(f"QQMusic: search returned {len(songs)} candidates")

            # Duration-based best-match selection
            if track.length is not None:
                track_ms = track.length
                best_mid: Optional[str] = None
                best_diff = float("inf")

                for song in songs:
                    if not isinstance(song, dict):
                        continue
                    mid = song.get("mid")
                    name = song.get("name", "?")
                    # interval is in seconds
                    interval = song.get("interval")
                    if not isinstance(interval, int):
                        logger.debug(
                            f"  candidate {mid} '{name}': no duration, skipped"
                        )
                        continue
                    duration_ms = interval * 1000
                    diff = abs(duration_ms - track_ms)
                    logger.debug(
                        f"  candidate {mid} '{name}': "
                        f"duration={duration_ms}ms, diff={diff}ms"
                    )
                    if diff < best_diff:
                        best_diff = diff
                        best_mid = mid

                # Closest candidate is only accepted inside the tolerance window.
                if best_mid is not None and best_diff <= DURATION_TOLERANCE_MS:
                    logger.debug(
                        f"QQMusic: selected mid={best_mid} (diff={best_diff}ms)"
                    )
                    return best_mid

                logger.debug(
                    f"QQMusic: no candidate within {DURATION_TOLERANCE_MS}ms "
                    f"(best diff={best_diff}ms)"
                )
                return None

            # No duration info — take the first result
            first = songs[0]
            if not isinstance(first, dict) or "mid" not in first:
                logger.error("QQMusic: first search result has no 'mid'")
                return None
            logger.debug(
                f"QQMusic: no duration available, using first result "
                f"mid={first['mid']} '{first.get('name', '?')}'"
            )
            return first["mid"]

        except Exception as e:
            # NOTE(review): network failures surface to fetch() as None and
            # are cached as NOT_FOUND rather than NETWORK_ERROR — confirm.
            logger.error(f"QQMusic: search failed: {e}")
            return None

    def _get_lyric(self, mid: str) -> Optional[LyricResult]:
        """Fetch lyrics for a given QQ Music song MID.

        Returns a LyricResult: SUCCESS_* with normalized LRC text,
        NOT_FOUND for empty lyrics, NETWORK_ERROR on failure.
        """
        logger.debug(f"QQMusic: fetching lyrics for mid={mid}")

        try:
            with httpx.Client(timeout=HTTP_TIMEOUT) as client:
                resp = client.get(
                    f"{QQ_MUSIC_API_URL}/api/lyric",
                    params={"mid": mid},
                )
                resp.raise_for_status()
                data = resp.json()

            if data.get("code") != 0:
                logger.error(f"QQMusic: lyric API error: {data}")
                return LyricResult(
                    status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
                )

            lrc = data.get("data", {}).get("lyric", "")
            if not isinstance(lrc, str) or not lrc.strip():
                logger.debug(f"QQMusic: empty lyrics for mid={mid}")
                return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)

            # Normalize time tags before probing for sync information.
            lrc = normalize_tags(lrc)
            status = detect_sync_status(lrc)
            logger.info(
                f"QQMusic: got {status.value} lyrics for mid={mid} "
                f"({len(lrc.splitlines())} lines)"
            )
            return LyricResult(
                status=status, lyrics=lrc.strip(), source=self.source_name
            )

        except Exception as e:
            logger.error(f"QQMusic: lyric fetch failed for mid={mid}: {e}")
            return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)

    def fetch(
        self, track: TrackMeta, bypass_cache: bool = False
    ) -> Optional[LyricResult]:
        """Search for the track and fetch its lyrics.

        Returns None when the fetcher cannot run at all (no API URL or no
        usable metadata), otherwise a LyricResult.
        """
        if not QQ_MUSIC_API_URL:
            logger.debug("QQMusic: skipped — QQ_MUSIC_API_URL not configured")
            return None

        query = f"{track.artist or ''} {track.title or ''}".strip()
        if not query:
            logger.debug("QQMusic: skipped — insufficient metadata")
            return None

        logger.info(f"QQMusic: fetching lyrics for {track.display_name()}")
        mid = self._search(track)
        if not mid:
            logger.debug(f"QQMusic: no match found for {track.display_name()}")
            return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)

        return self._get_lyric(mid)
diff --git a/lrx/fetchers/spotify.py b/lrx/fetchers/spotify.py
new file mode 100644
index 0000000..fcd568c
--- /dev/null
+++ b/lrx/fetchers/spotify.py
@@ -0,0 +1,373 @@
+"""
+Author: Uyanide pywang0608@foxmail.com
+Date: 2026-03-25 10:43:21
+Description: Spotify fetcher — obtains synced lyrics via Spotify's internal color-lyrics API.
+"""
+
+"""
+Authentication flow:
+ 1. Fetch server time from Spotify
+ 2. Fetch TOTP secret
+ 3. Generate a TOTP code and exchange it (with SP_DC cookie) for an access token
+ 4. Request lyrics using the access token
+
+The secret and token are cached on the instance to avoid redundant network
+calls within the same session.
+
+Requires SPOTIFY_SP_DC environment variable to be set.
+"""
+
+import httpx
+import json
+import time
+import struct
+import hmac
+import hashlib
+from typing import Optional, Tuple
+from loguru import logger
+
+from .base import BaseFetcher
+from ..models import TrackMeta, LyricResult, CacheStatus
+from ..lrc import normalize_tags
+from ..config import (
+ HTTP_TIMEOUT,
+ SPOTIFY_APP_VERSION,
+ TTL_NOT_FOUND,
+ TTL_NETWORK_ERROR,
+ SPOTIFY_TOKEN_URL,
+ SPOTIFY_LYRICS_URL,
+ SPOTIFY_SERVER_TIME_URL,
+ SPOTIFY_SECRET_URL,
+ SPOTIFY_SP_DC,
+ SPOTIFY_TOKEN_CACHE_FILE,
+ UA_BROWSER,
+)
+
+
class SpotifyFetcher(BaseFetcher):
    """Fetch lyrics from Spotify's internal color-lyrics endpoint.

    Requires a valid SP_DC cookie (SPOTIFY_SP_DC env var). Access tokens are
    cached in memory and persisted to SPOTIFY_TOKEN_CACHE_FILE.
    """

    def __init__(self) -> None:
        # Session-level caches to avoid refetching within the same run
        self._cached_secret: Optional[Tuple[str, int]] = None
        self._cached_token: Optional[str] = None
        self._token_expires_at: float = 0.0

    @property
    def source_name(self) -> str:
        # Stable identifier recorded in LyricResult.source.
        return "spotify"

    def is_available(self, track: TrackMeta) -> bool:
        # Needs a Spotify track ID and the SP_DC auth cookie.
        return bool(track.trackid) and bool(SPOTIFY_SP_DC)

    # ─── Auth helpers ────────────────────────────────────────────────

    def _get_server_time(self, client: httpx.Client) -> Optional[int]:
        """Fetch Spotify's server timestamp (seconds since epoch)."""
        try:
            res = client.get(SPOTIFY_SERVER_TIME_URL, timeout=HTTP_TIMEOUT)
            res.raise_for_status()
            data = res.json()
            if not isinstance(data, dict) or "serverTime" not in data:
                logger.error(f"Spotify: unexpected server-time response: {data}")
                return None
            server_time = data["serverTime"]
            logger.debug(f"Spotify: server time = {server_time}")
            return server_time
        except Exception as e:
            logger.error(f"Spotify: failed to fetch server time: {e}")
            return None

    def _get_secret(self, client: httpx.Client) -> Optional[Tuple[str, int]]:
        """Fetch and decode the TOTP secret. Cached after first success.

        Response format: [{version: int, secret: str}, ...]
        Each character in *secret* is XOR-decoded with ``(index % 33) + 9``.
        """
        if self._cached_secret is not None:
            logger.debug("Spotify: using cached TOTP secret")
            return self._cached_secret

        try:
            res = client.get(SPOTIFY_SECRET_URL, timeout=HTTP_TIMEOUT)
            res.raise_for_status()
            data = res.json()

            if not isinstance(data, list) or len(data) == 0:
                logger.error(
                    f"Spotify: unexpected secrets response (type={type(data).__name__}, len={len(data) if isinstance(data, list) else '?'})"
                )
                return None

            # The last list entry is used — presumably the newest version.
            last = data[-1]
            if "secret" not in last or "version" not in last:
                logger.error(f"Spotify: malformed secret entry: {list(last.keys())}")
                return None

            secret_raw = last["secret"]
            version = last["version"]

            # XOR decode: each char is XORed with (index % 33) + 9 and the
            # decimal results are concatenated into the TOTP key string.
            parts = []
            for i, char in enumerate(secret_raw):
                parts.append(str(ord(char) ^ ((i % 33) + 9)))
            secret = "".join(parts)

            logger.debug(f"Spotify: decoded secret v{version} (len={len(secret)})")
            self._cached_secret = (secret, version)
            return self._cached_secret

        except Exception as e:
            logger.error(f"Spotify: failed to fetch secret: {e}")
            return None

    @staticmethod
    def _generate_totp(server_time_s: int, secret: str) -> str:
        """Generate a 6-digit TOTP code compatible with Spotify's auth.

        Uses HMAC-SHA1 with a 30-second period, matching the Go reference.
        """
        counter = server_time_s // 30
        counter_bytes = struct.pack(">Q", counter)

        mac = hmac.new(secret.encode(), counter_bytes, hashlib.sha1).digest()

        # RFC 6238 / RFC 4226 dynamic truncation.
        offset = mac[-1] & 0x0F
        binary_code = (
            (mac[offset] & 0x7F) << 24
            | (mac[offset + 1] & 0xFF) << 16
            | (mac[offset + 2] & 0xFF) << 8
            | (mac[offset + 3] & 0xFF)
        )

        code = binary_code % (10**6)
        return str(code).zfill(6)

    def _load_cached_token(self) -> Optional[str]:
        """Try to load a still-valid token from the persistent cache file.

        On success also primes the in-memory cache (token + expiry).
        Returns None when the file is missing, unreadable, malformed, or
        the token has expired.
        """
        try:
            with open(SPOTIFY_TOKEN_CACHE_FILE, "r") as f:
                data = json.load(f)
            expires_ms = data.get("accessTokenExpirationTimestampMs", 0)
            if expires_ms <= int(time.time() * 1000):
                logger.debug("Spotify: persisted token expired")
                return None
            token = data.get("accessToken", "")
            if not token:
                return None
            self._cached_token = token
            self._token_expires_at = expires_ms / 1000.0
            logger.debug("Spotify: loaded token from cache file")
            return token
        except (OSError, ValueError, KeyError, AttributeError):
            # Fix: previously only FileNotFoundError was caught, so e.g. a
            # permission error or a non-dict JSON payload crashed the fetch.
            # json.JSONDecodeError is a ValueError subclass.
            return None

    def _save_token(self, body: dict) -> None:
        """Persist the raw token response to disk (best-effort)."""
        try:
            with open(SPOTIFY_TOKEN_CACHE_FILE, "w") as f:
                json.dump(body, f)
            logger.debug("Spotify: token saved to cache file")
        except Exception as e:
            logger.warning(f"Spotify: failed to write token cache: {e}")

    def _get_token(self) -> Optional[str]:
        """Obtain a Spotify access token. Cached in memory and on disk.

        Requires SP_DC cookie (set via SPOTIFY_SP_DC env var).
        """
        # 1. Memory cache (30 s safety margin before expiry)
        if self._cached_token and time.time() < self._token_expires_at - 30:
            logger.debug("Spotify: using in-memory cached token")
            return self._cached_token

        # 2. Disk cache (_load_cached_token refreshes _token_expires_at)
        disk_token = self._load_cached_token()
        if disk_token and time.time() < self._token_expires_at - 30:
            return disk_token

        # 3. Fetch new token
        if not SPOTIFY_SP_DC:
            logger.error(
                "Spotify: SPOTIFY_SP_DC env var not set — "
                "cannot authenticate with Spotify"
            )
            return None

        headers = {
            "User-Agent": UA_BROWSER,
            "Accept": "*/*",
            "Referer": "https://open.spotify.com/",
            "Cookie": f"sp_dc={SPOTIFY_SP_DC}",
        }

        with httpx.Client(headers=headers) as client:
            server_time = self._get_server_time(client)
            if server_time is None:
                return None

            secret_data = self._get_secret(client)
            if secret_data is None:
                return None

            secret, version = secret_data
            totp = self._generate_totp(server_time, secret)
            logger.debug(f"Spotify: generated TOTP v{version}: {totp}")

            params = {
                "reason": "init",
                "productType": "web-player",
                "totp": totp,
                "totpVer": str(version),
                "totpServer": totp,
            }

            try:
                res = client.get(SPOTIFY_TOKEN_URL, params=params, timeout=HTTP_TIMEOUT)
                if res.status_code != 200:
                    logger.error(f"Spotify: token request returned {res.status_code}")
                    return None

                body = res.json()

                if not isinstance(body, dict) or "accessToken" not in body:
                    logger.error(
                        f"Spotify: unexpected token response keys: {list(body.keys()) if isinstance(body, dict) else type(body).__name__}"
                    )
                    return None

                token = body["accessToken"]
                is_anonymous = body.get("isAnonymous", False)
                if is_anonymous:
                    logger.warning(
                        "Spotify: received anonymous token — SP_DC may be invalid"
                    )

                expires_ms = body.get("accessTokenExpirationTimestampMs", 0)
                if expires_ms and expires_ms > int(time.time() * 1000):
                    self._token_expires_at = expires_ms / 1000.0
                else:
                    logger.warning("Spotify: token expiry missing or invalid")
                    # Fall back to a conservative one-hour lifetime.
                    self._token_expires_at = time.time() + 3600

                self._cached_token = token
                # Persist to disk (including anonymous tokens, same as Go ref)
                self._save_token(body)
                logger.debug("Spotify: obtained access token")
                return token

            except Exception as e:
                logger.error(f"Spotify: token request failed: {e}")
                return None

    # ─── Lyrics ──────────────────────────────────────────────────────

    @staticmethod
    def _format_lrc_line(start_ms: int, words: str) -> str:
        """Format a single lyric line as LRC ``[mm:ss.cc]text``.

        Centiseconds are clamped to 99 (same convention as lrc.py) so that
        e.g. 996 ms cannot round up to an invalid ``.100`` component.
        """
        minutes = start_ms // 60000
        seconds = (start_ms // 1000) % 60
        # Fix: round() alone could yield 100 for start_ms % 1000 >= 995,
        # producing a malformed tag like [00:01.100].
        centiseconds = min(round((start_ms % 1000) / 10.0), 99)
        return f"[{minutes:02d}:{seconds:02d}.{centiseconds:02d}]{words}"

    @staticmethod
    def _is_truly_synced(lines: list[dict]) -> bool:
        """Check if lyrics are actually synced (not all timestamps zero)."""
        for line in lines:
            try:
                ms = int(line.get("startTimeMs", "0"))
                if ms > 0:
                    return True
            except (ValueError, TypeError):
                continue
        return False

    def fetch(
        self, track: TrackMeta, bypass_cache: bool = False
    ) -> Optional[LyricResult]:
        """Fetch lyrics for a Spotify track by its track ID.

        Returns None when the track has no trackid; otherwise a LyricResult
        whose status reflects the outcome.
        """
        if not track.trackid:
            logger.debug("Spotify: skipped — no trackid in metadata")
            return None

        logger.info(f"Spotify: fetching lyrics for trackid={track.trackid}")

        token = self._get_token()
        if not token:
            logger.error("Spotify: cannot fetch lyrics without a token")
            return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)

        url = f"{SPOTIFY_LYRICS_URL}{track.trackid}?format=json&vocalRemoval=false&market=from_token"
        headers = {
            "User-Agent": UA_BROWSER,
            "Accept": "application/json",
            "Authorization": f"Bearer {token}",
            "Referer": "https://open.spotify.com/",
            "App-Platform": "WebPlayer",
            "Spotify-App-Version": SPOTIFY_APP_VERSION,
            "Origin": "https://open.spotify.com",
        }

        try:
            with httpx.Client(timeout=HTTP_TIMEOUT) as client:
                res = client.get(url, headers=headers)

                if res.status_code == 404:
                    logger.debug(f"Spotify: 404 for trackid={track.trackid}")
                    return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)

                if res.status_code != 200:
                    logger.error(f"Spotify: lyrics API returned {res.status_code}")
                    return LyricResult(
                        status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
                    )

                data = res.json()

                # Validate response structure
                if not isinstance(data, dict) or "lyrics" not in data:
                    logger.error("Spotify: unexpected lyrics response structure")
                    return LyricResult(
                        status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
                    )

                lyrics_data = data["lyrics"]
                sync_type = lyrics_data.get("syncType", "")
                lines = lyrics_data.get("lines", [])

                if not isinstance(lines, list) or len(lines) == 0:
                    logger.debug("Spotify: response contained no lyric lines")
                    return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)

                # Determine sync status
                # syncType == "LINE_SYNCED" AND at least one non-zero timestamp
                is_synced = sync_type == "LINE_SYNCED" and self._is_truly_synced(lines)

                # Convert to LRC
                lrc_lines: list[str] = []
                for line in lines:
                    words = line.get("words", "")
                    if not isinstance(words, str):
                        continue
                    try:
                        ms = int(line.get("startTimeMs", "0"))
                    except (ValueError, TypeError):
                        ms = 0

                    if is_synced:
                        lrc_lines.append(self._format_lrc_line(ms, words))
                    else:
                        # Unsynced: emit with zero timestamps
                        lrc_lines.append(f"[00:00.00]{words}")

                content = normalize_tags("\n".join(lrc_lines))
                status = (
                    CacheStatus.SUCCESS_SYNCED
                    if is_synced
                    else CacheStatus.SUCCESS_UNSYNCED
                )

                logger.info(f"Spotify: got {status.value} lyrics ({len(lrc_lines)} lines)")
                return LyricResult(status=status, lyrics=content, source=self.source_name)

        except Exception as e:
            logger.error(f"Spotify: lyrics fetch failed: {e}")
            return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
diff --git a/lrx/lrc.py b/lrx/lrc.py
new file mode 100644
index 0000000..6913512
--- /dev/null
+++ b/lrx/lrc.py
@@ -0,0 +1,178 @@
+"""
+Author: Uyanide pywang0608@foxmail.com
+Date: 2026-03-25 21:54:01
+Description: Shared LRC time-tag utilities (definitely overengineered)
+"""
+
+import re
+from pathlib import Path
+from typing import Optional
+from urllib.parse import unquote
+
+from .models import CacheStatus
+
+# Parses any time tag input format:
+# [mm:ss], [mm:ss.c], [mm:ss.cc], [mm:ss.ccc], [mm:ss:cc], …
+_RAW_TAG_RE = re.compile(r"\[(\d{2,}):(\d{2})(?:[.:](\d{1,3}))?\]")
+
+# Standard format after normalization: [mm:ss.cc]
+_STD_TAG_RE = re.compile(r"\[\d{2,}:\d{2}\.\d{2}\]")
+
+# Standard format with capture groups
+_STD_TAG_CAPTURE_RE = re.compile(r"\[(\d{2,}):(\d{2})\.(\d{2})\]")
+
+# Matches a standard time tag at the start of a line
+_LRC_LINE_RE = re.compile(r"^\[\d{2,}:\d{2}\.\d{2}\]", re.MULTILINE)
+
+# [offset:+/-xxx] tag — value in milliseconds
+_OFFSET_RE = re.compile(r"^\[offset:\s*([+-]?\d+)\]\s*$", re.MULTILINE | re.IGNORECASE)
+
+
+def _raw_tag_to_cs(mm: str, ss: str, frac: Optional[str]) -> str:
+ """Convert parsed time tag components to standard [mm:ss.cc] string."""
+ if frac is None:
+ ms = 0
+ else:
+ # cc in [mm:ss:cc] is also treated as centiseconds, per LRC spec
+ # ^
+ # why does this format even exist, idk
+ n = len(frac)
+ if n == 1:
+ ms = int(frac) * 100
+ elif n == 2:
+ ms = int(frac) * 10
+ else:
+ ms = int(frac)
+ cs = min(round(ms / 10), 99)
+ return f"[{mm}:{ss}.{cs:02d}]"
+
+
def _reformat(text: str) -> str:
    """Rewrite every line so its time tags use the standard [mm:ss.cc] form.

    Any mix of input tag formats is accepted; spaces between leading tags
    are dropped. Lines with no time tags (including blank lines) are kept,
    stripped of surrounding whitespace.
    """
    result: list[str] = []
    for raw in text.splitlines():
        stripped = raw.strip()
        converted: list[str] = []
        idx = 0
        length = len(stripped)
        while True:
            # Skip any run of spaces before the next candidate tag.
            while idx < length and stripped[idx] == " ":
                idx += 1
            match = _RAW_TAG_RE.match(stripped, idx)
            if match is None:
                break
            converted.append(
                _raw_tag_to_cs(match.group(1), match.group(2), match.group(3))
            )
            idx = match.end()
        if converted:
            # Note: this would mangle word-synced LRC variants, but those
            # are out of scope for this project.
            result.append("".join(converted) + stripped[idx:].lstrip())
        else:
            result.append(stripped)
    return "\n".join(result)
+
+
def _apply_offset(text: str) -> str:
    """Fold a ``[offset:±ms]`` tag into the timestamps and drop the tag.

    Per LRC convention a positive offset makes lyrics appear sooner, so the
    offset is subtracted from every standard [mm:ss.cc] tag (clamped at 0).
    """
    found = _OFFSET_RE.search(text)
    if found is None:
        return text
    offset_ms = int(found.group(1))
    remainder = _OFFSET_RE.sub("", text).strip("\n")
    if not offset_ms:
        return remainder

    def _rewrite(tag: re.Match) -> str:
        minutes, seconds, centis = (int(g) for g in tag.groups())
        shifted = max(
            0, (minutes * 60 + seconds) * 1000 + centis * 10 - offset_ms
        )
        out_cs = min(round((shifted % 1000) / 10), 99)
        return f"[{shifted // 60000:02d}:{(shifted % 60000) // 1000:02d}.{out_cs:02d}]"

    return _STD_TAG_CAPTURE_RE.sub(_rewrite, remainder)
+
+
def normalize_tags(text: str) -> str:
    """Normalize LRC text: rewrite every tag as [mm:ss.cc], then fold in any [offset:±ms]."""
    reformatted = _reformat(text)
    return _apply_offset(reformatted)
+
+
def is_synced(text: str) -> bool:
    """Return True when the text carries at least one non-zero time tag.

    Assumes text has been normalized by normalize_tags (standard
    [mm:ss.cc] format).
    """
    for tag in _STD_TAG_RE.findall(text):
        if tag != "[00:00.00]":
            return True
    return False
+
+
def detect_sync_status(text: str) -> CacheStatus:
    """Map normalized lyrics to SUCCESS_SYNCED or SUCCESS_UNSYNCED.

    Assumes text has been normalized by normalize_tags.
    """
    if is_synced(text):
        return CacheStatus.SUCCESS_SYNCED
    return CacheStatus.SUCCESS_UNSYNCED
+
+
def normalize_unsynced(lyrics: str) -> str:
    """Force every line to carry exactly one leading [00:00.00] tag.

    Existing leading standard time tags are stripped before the zero tag is
    prepended; blank lines become a bare zero tag.
    """
    zero = "[00:00.00]"
    result: list[str] = []
    for raw in lyrics.splitlines():
        text = raw.strip()
        if not text:
            result.append(zero)
            continue
        # Peel off every leading standard time tag, one at a time.
        while _LRC_LINE_RE.match(text):
            text = _LRC_LINE_RE.sub("", text)
        result.append(zero + text)
    return "\n".join(result)
+
+
def get_audio_path(audio_url: str, ensure_exists: bool = False) -> Optional[Path]:
    """Convert a file:// URL to a filesystem Path.

    Returns None for non-file URLs, or — when *ensure_exists* is set —
    when the target does not exist on disk.
    """
    prefix = "file://"
    if not audio_url.startswith(prefix):
        return None
    candidate = Path(unquote(audio_url[len(prefix):]))
    if ensure_exists and not candidate.exists():
        return None
    return candidate
+
+
def get_sidecar_path(
    audio_url: str, ensure_audio_exists: bool = False, ensure_exists: bool = False
) -> Optional[Path]:
    """Resolve the .lrc sidecar path next to the audio file behind *audio_url*.

    Returns None when the URL is not a valid file:// URL, when
    *ensure_audio_exists* is set and the audio file is missing, or when
    *ensure_exists* is set and the .lrc file is missing.
    """
    audio = get_audio_path(audio_url, ensure_exists=ensure_audio_exists)
    if audio is None:
        return None
    sidecar = audio.with_suffix(".lrc")
    if ensure_exists and not sidecar.exists():
        return None
    return sidecar
diff --git a/lrx/models.py b/lrx/models.py
new file mode 100644
index 0000000..775922a
--- /dev/null
+++ b/lrx/models.py
@@ -0,0 +1,59 @@
+"""
+Author: Uyanide pywang0608@foxmail.com
+Date: 2026-03-25 04:09:36
+Description: Data models
+"""
+
+from enum import Enum
+from typing import Optional
+from dataclasses import dataclass
+
+
class CacheStatus(str, Enum):
    """Status of a cached lyric entry.

    Inherits from str so values serialize naturally (e.g. to JSON).
    """

    SUCCESS_SYNCED = "SUCCESS_SYNCED"  # Lyrics found, with usable time tags
    SUCCESS_UNSYNCED = "SUCCESS_UNSYNCED"  # Lyrics found, plain text only
    NOT_FOUND = "NOT_FOUND"  # Searched, but no lyrics were found
    NETWORK_ERROR = "NETWORK_ERROR"  # Transient fetch failure (separate TTL)
+
+
@dataclass
class TrackMeta:
    """Metadata describing a track obtained from MPRIS or manual input."""

    trackid: Optional[str] = None  # Spotify track ID (without "spotify:track:" prefix)
    length: Optional[int] = None  # Duration in milliseconds
    album: Optional[str] = None
    artist: Optional[str] = None
    title: Optional[str] = None
    url: Optional[str] = None  # Playback URL (file:// for local files)

    @property
    def is_local(self) -> bool:
        """True when the track is a local file (file:// URL)."""
        return self.url is not None and self.url.startswith("file://")

    @property
    def is_complete(self) -> bool:
        """True when all fields required by LRCLIB are present."""
        required = (self.length, self.album, self.title, self.artist)
        return all(required)

    def display_name(self) -> str:
        """Human-readable representation for logging."""
        parts = [piece for piece in (self.artist, self.title) if piece]
        if parts:
            return " - ".join(parts)
        return self.trackid or self.url or "(unknown)"
+
+
@dataclass
class LyricResult:
    """Result of a lyric fetch attempt, also used as cache record."""

    status: CacheStatus  # Outcome of the fetch attempt
    lyrics: Optional[str] = None  # LRC text; set by fetchers only on success
    source: Optional[str] = None  # Which fetcher produced this result
    ttl: Optional[int] = None  # Hint for cache TTL (seconds)
diff --git a/lrx/mpris.py b/lrx/mpris.py
new file mode 100644
index 0000000..c8a2c17
--- /dev/null
+++ b/lrx/mpris.py
@@ -0,0 +1,188 @@
+"""
+Author: Uyanide pywang0608@foxmail.com
+Date: 2026-03-25 04:44:15
+Description: MPRIS integration for fetching track metadata
+"""
+
+import asyncio
+from dbus_next.aio.message_bus import MessageBus
+from dbus_next.constants import BusType
+from dbus_next.message import Message
+from lrx.models import TrackMeta
+from lrx.config import PREFERRED_PLAYER
+from loguru import logger
+from typing import Optional, List, Any
+
+
async def _list_mpris_players(bus: MessageBus) -> List[str]:
    """Return the bus names of all MPRIS media players on the session bus."""
    try:
        reply = await bus.call(
            Message(
                destination="org.freedesktop.DBus",
                path="/org/freedesktop/DBus",
                interface="org.freedesktop.DBus",
                member="ListNames",
            )
        )
        if not reply or not reply.body:
            return []
        # MPRIS players all register under this well-known prefix.
        prefix = "org.mpris.MediaPlayer2."
        return [name for name in reply.body[0] if name.startswith(prefix)]
    except Exception as e:
        logger.error(f"Failed to list DBus names: {e}")
        return []
+
+
async def _get_playback_status(bus: MessageBus, player_name: str) -> Optional[str]:
    """Get PlaybackStatus ('Playing', 'Paused', 'Stopped') for a player."""
    try:
        node = await bus.introspect(player_name, "/org/mpris/MediaPlayer2")
        player = bus.get_proxy_object(
            player_name, "/org/mpris/MediaPlayer2", node
        )
        props = player.get_interface("org.freedesktop.DBus.Properties")
        # call_get is generated at runtime from introspection data,
        # hence the getattr indirection (keeps static checkers quiet).
        variant = await getattr(props, "call_get")(
            "org.mpris.MediaPlayer2.Player", "PlaybackStatus"
        )
        return variant.value if variant else None
    except Exception as e:
        logger.debug(f"Could not get playback status for {player_name}: {e}")
        return None
+
+
async def _select_player(
    bus: MessageBus, specific_player: Optional[str] = None
) -> Optional[str]:
    """Select the best MPRIS player.

    When specific_player is given, filter by name match.
    Otherwise: prefer the currently playing player. If multiple are playing,
    prefer the one matching LRCFETCH_PLAYER env var (default: spotify).
    """
    players = await _list_mpris_players(bus)
    if not players:
        return None

    # Explicit player requested: case-insensitive substring match.
    if specific_player:
        needle = specific_player.lower()
        matches = [name for name in players if needle in name.lower()]
        return matches[0] if matches else None

    # Probe each player's playback state (sequentially; the list is tiny).
    playing = []
    for name in players:
        status = await _get_playback_status(bus, name)
        logger.debug(f"Player {name}: {status}")
        if status == "Playing":
            playing.append(name)

    candidates = playing or players
    if len(candidates) == 1:
        return candidates[0]

    # Multiple candidates: fall back to the configured preference.
    preferred = PREFERRED_PLAYER.lower()
    if preferred:
        hits = [name for name in candidates if preferred in name.lower()]
        if hits:
            return hits[0]
    return candidates[0]
+
+
async def _fetch_metadata_dbus(
    specific_player: Optional[str] = None,
) -> Optional[TrackMeta]:
    """Connect to the session bus, pick a player, and read its track metadata.

    Returns a TrackMeta on success, or None on any failure (connection,
    no player, missing interface, property read error). Errors are logged,
    never raised.
    """
    bus = None
    try:
        bus = await MessageBus(bus_type=BusType.SESSION).connect()
    except Exception as e:
        logger.error(f"Failed to connect to DBus: {e}")
        return None

    try:
        player_name = await _select_player(bus, specific_player)
        if not player_name:
            logger.debug(
                f"No active MPRIS players found via DBus{' for ' + specific_player if specific_player else ''}."
            )
            return None

        logger.debug(f"Using player: {player_name}")

        introspection = await bus.introspect(player_name, "/org/mpris/MediaPlayer2")
        proxy = bus.get_proxy_object(
            player_name, "/org/mpris/MediaPlayer2", introspection
        )

        props_iface = proxy.get_interface("org.freedesktop.DBus.Properties")
        if not props_iface:
            logger.error(f"Player {player_name} doesn't support Properties interface.")
            return None

        try:
            # call_get is generated at runtime by dbus_next, hence getattr.
            # The Metadata property arrives as a Variant wrapping a dict of
            # Variants, so each entry below needs a .value unwrap.
            metadata_var: Any = await getattr(props_iface, "call_get")(
                "org.mpris.MediaPlayer2.Player", "Metadata"
            )
            if not metadata_var:
                logger.error("Empty metadata received.")
                return None

            metadata = metadata_var.value

            # Extract trackid — MPRIS returns either "spotify:track:ID"
            # or a DBus object path like "/com/spotify/track/ID"
            trackid = metadata.get("mpris:trackid", None)
            if trackid:
                trackid = trackid.value
                if isinstance(trackid, str):
                    if trackid.startswith("spotify:track:"):
                        trackid = trackid.removeprefix("spotify:track:")
                    elif trackid.startswith("/com/spotify/track/"):
                        trackid = trackid.removeprefix("/com/spotify/track/")

            # Extract length (usually microseconds) and convert to ms.
            # NOTE(review): a zero-length Variant is skipped by the truthiness
            # check and left as the raw Variant — presumably never happens;
            # verify against real players.
            length = metadata.get("mpris:length", None)
            if length:
                length = length.value // 1000 if isinstance(length.value, int) else None

            album = metadata.get("xesam:album", None)
            album = album.value if album else None

            # xesam:artist is a list of strings; only the first is kept.
            artist = metadata.get("xesam:artist", None)
            artist = (
                artist.value[0]
                if artist and isinstance(artist.value, list) and artist.value
                else None
            )

            title = metadata.get("xesam:title", None)
            title = title.value if title else None

            url = metadata.get("xesam:url", None)
            url = url.value if url else None

            return TrackMeta(
                trackid=trackid,
                length=length,
                album=album,
                artist=artist,
                title=title,
                url=url,
            )
        except Exception as e:
            logger.error(f"Failed to get properties from {player_name}: {e}")
            return None

    finally:
        # Always release the bus connection, whatever path we returned on.
        if bus:
            bus.disconnect()
+
+
def get_current_track(player_name: Optional[str] = None) -> Optional[TrackMeta]:
    """Synchronous entry point: fetch current track metadata via MPRIS.

    Returns None if the async DBus round-trip fails for any reason.
    """
    try:
        meta = asyncio.run(_fetch_metadata_dbus(player_name))
    except Exception as e:
        logger.error(f"DBus async loop failed: {e}")
        return None
    return meta
diff --git a/main.py b/main.py
index a2fdeab..1407b6f 100644
--- a/main.py
+++ b/main.py
@@ -1,4 +1,4 @@
-from lrcfetch.cli import run
+from lrx.cli import run
if __name__ == "__main__":
run()
diff --git a/pyproject.toml b/pyproject.toml
index bb8238d..f61f550 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -3,7 +3,7 @@ requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
-name = "lrcfetch"
+name = "lrx-cli"
version = "0.1.7"
description = "Fetch line-synced lyrics for your music player."
readme = "README.md"
@@ -19,7 +19,7 @@ dependencies = [
]
[project.scripts]
-lrcfetch = "lrcfetch.cli:run"
+lrx = "lrx.cli:run"
[tool.ruff.lint]
ignore = ["E402"]