diff --git a/lrcfetch/__init__.py b/lrcfetch/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/lrcfetch/__main__.py b/lrcfetch/__main__.py
deleted file mode 100644
index a2fdeab..0000000
--- a/lrcfetch/__main__.py
+++ /dev/null
@@ -1,4 +0,0 @@
-from lrcfetch.cli import run
-
-if __name__ == "__main__":
- run()
diff --git a/lrcfetch/cache.py b/lrcfetch/cache.py
deleted file mode 100644
index 7e16fc3..0000000
--- a/lrcfetch/cache.py
+++ /dev/null
@@ -1,441 +0,0 @@
-"""
-Author: Uyanide pywang0608@foxmail.com
-Date: 2026-03-25 10:18:03
-Description: SQLite-based lyric cache with per-source storage and TTL expiration
-"""
-
-import re
-import sqlite3
-import hashlib
-import time
-import unicodedata
-from typing import Optional
-from loguru import logger
-
-from .config import DB_PATH, DURATION_TOLERANCE_MS
-from .models import TrackMeta, LyricResult, CacheStatus
-
-# Punctuation to strip for fuzzy matching (ASCII + fullwidth + CJK brackets/symbols)
-_PUNCT_RE = re.compile(
- r"[~!@#$%^&*()_+\-=\[\]{}|;:'\",.<>?/\\`"
- r"~!@#$%^&*()_+-=【】{}|;:'",。<>?/\`"
- r"「」『』《》〈〉〔〕·•‥…—–]"
-)
-_SPACE_RE = re.compile(r"\s+")
-# feat./ft./featuring and everything after (case-insensitive, word boundary)
-_FEAT_RE = re.compile(r"\s*(?:\bfeat\.?\b|\bft\.?\b|\bfeaturing\b).*", re.IGNORECASE)
-# Multi-artist separators: /, &, ×, x (surrounded by spaces), ;, 、, vs.
-_ARTIST_SEP_RE = re.compile(r"\s*(?:[/&;×、]|\bvs\.?\b|\bx\b)\s*", re.IGNORECASE)
-
-
-def _normalize_for_match(s: str) -> str:
- """Normalize a string for fuzzy comparison.
-
- Lowercases, NFKC-normalizes (fullwidth → halfwidth), strips punctuation,
- and collapses whitespace.
- """
- s = unicodedata.normalize("NFKC", s).lower()
- s = _FEAT_RE.sub("", s)
- s = _PUNCT_RE.sub(" ", s)
- s = _SPACE_RE.sub(" ", s).strip()
- return s
-
-
-def _normalize_artist(s: str) -> str:
- """Normalize an artist string: split by separators, normalize each, sort.
-
- Splits first (on /, &, ;, ×, 、, vs., x), then strips feat./ft./featuring
- from each part individually, so 'A feat. C / B' → ['a', 'b'] not just ['a'].
- """
- s = unicodedata.normalize("NFKC", s).lower()
- parts = _ARTIST_SEP_RE.split(s)
- normed = sorted(
- {_normalize_for_match(p) for p in parts if _FEAT_RE.sub("", p).strip()}
- )
- return "\0".join(normed) if normed else _normalize_for_match(s)
-
-
-def _generate_key(track: TrackMeta, source: str) -> str:
- """Generate a unique cache key from track metadata and source.
-
- The key is scoped by source so that different fetchers can cache
- independently for the same track (e.g. Spotify synced vs Netease unsynced).
- """
- # Spotify tracks always use their track ID as the primary identifier
- if track.trackid and source == "spotify":
- return f"spotify:{track.trackid}"
-
- parts = []
- if track.artist:
- parts.append(track.artist)
- if track.title:
- parts.append(track.title)
- if track.album:
- parts.append(track.album)
- if track.length:
- parts.append(str(track.length))
-
- # Fall back to URL for local files
- if not parts and track.url:
- return f"{source}:url:{track.url}"
-
- if not parts:
- raise ValueError("Insufficient metadata to generate cache key")
-
- raw = "|".join(parts)
- digest = hashlib.sha256(raw.encode()).hexdigest()
- return f"{source}:{digest}"
-
-
-class CacheEngine:
- def __init__(self, db_path: str = DB_PATH):
- self.db_path = db_path
- self._init_db()
-
- def _init_db(self) -> None:
- """Create or migrate the cache table."""
- with sqlite3.connect(self.db_path) as conn:
- conn.execute("""
- CREATE TABLE IF NOT EXISTS cache (
- key TEXT PRIMARY KEY,
- source TEXT NOT NULL,
- status TEXT NOT NULL,
- lyrics TEXT,
- created_at INTEGER NOT NULL,
- expires_at INTEGER,
- artist TEXT,
- title TEXT,
- album TEXT,
- length INTEGER
- )
- """)
- # Migration: add length column if missing
- cols = {r[1] for r in conn.execute("PRAGMA table_info(cache)").fetchall()}
- if "length" not in cols:
- conn.execute("ALTER TABLE cache ADD COLUMN length INTEGER")
- conn.commit()
-
- # Read
-
- def get(self, track: TrackMeta, source: str) -> Optional[LyricResult]:
- """Look up a cached result for *track* from *source*.
-
- Returns None on cache miss or expiration.
- """
- try:
- key = _generate_key(track, source)
- except ValueError:
- return None
-
- with sqlite3.connect(self.db_path) as conn:
- row = conn.execute(
- "SELECT status, lyrics, source, expires_at, length FROM cache WHERE key = ?",
- (key,),
- ).fetchone()
-
- if not row:
- logger.debug(f"Cache miss: {source} / {track.display_name()}")
- return None
-
- status_str, lyrics, src, expires_at, cached_length = row
-
- # Check TTL expiration
- if expires_at and expires_at < int(time.time()):
- logger.debug(f"Cache expired: {source} / {track.display_name()}")
- conn.execute("DELETE FROM cache WHERE key = ?", (key,))
- conn.commit()
- return None
-
- # Backfill length if the cached row is missing it
- if cached_length is None and track.length is not None:
- conn.execute(
- "UPDATE cache SET length = ? WHERE key = ?",
- (track.length, key),
- )
- conn.commit()
-
- remaining = expires_at - int(time.time()) if expires_at else None
- logger.debug(
- f"Cache hit: {source} / {track.display_name()} "
- f"[{status_str}, ttl={remaining}s]"
- )
- return LyricResult(
- status=CacheStatus(status_str),
- lyrics=lyrics,
- source=src,
- ttl=remaining,
- )
-
- def get_best(self, track: TrackMeta, sources: list[str]) -> Optional[LyricResult]:
- """Return the best cached result across *sources* (synced > unsynced).
-
- Skips negative statuses (NOT_FOUND, NETWORK_ERROR) — those are only
- consulted per-source to avoid redundant fetches.
- """
- best: Optional[LyricResult] = None
- for src in sources:
- cached = self.get(track, src)
- if not cached:
- continue
- if cached.status == CacheStatus.SUCCESS_SYNCED:
- return cached # Can't do better
- if cached.status == CacheStatus.SUCCESS_UNSYNCED and best is None:
- best = cached
- return best
-
- # Write
-
- def set(
- self,
- track: TrackMeta,
- source: str,
- result: LyricResult,
- ttl_seconds: Optional[int] = None,
- ) -> None:
- """Store a lyric result in the cache."""
- try:
- key = _generate_key(track, source)
- except ValueError:
- logger.warning("Cannot cache: insufficient track metadata.")
- return
-
- now = int(time.time())
- expires_at = now + ttl_seconds if ttl_seconds else None
-
- with sqlite3.connect(self.db_path) as conn:
- conn.execute(
- """INSERT OR REPLACE INTO cache
- (key, source, status, lyrics, created_at, expires_at,
- artist, title, album, length)
- VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
- (
- key,
- source,
- result.status.value,
- result.lyrics,
- now,
- expires_at,
- track.artist,
- track.title,
- track.album,
- track.length,
- ),
- )
- conn.commit()
- logger.debug(
- f"Cached: {source} / {track.display_name()} "
- f"[{result.status.value}, ttl={ttl_seconds}s]"
- )
-
- # Delete
-
- def clear_all(self) -> None:
- """Remove every entry from the cache."""
- with sqlite3.connect(self.db_path) as conn:
- conn.execute("DELETE FROM cache")
- conn.commit()
- logger.info("Cache cleared.")
-
- def clear_track(self, track: TrackMeta) -> None:
- """Remove all cached entries (every source) for a single track."""
- conditions, params = self._track_where(track)
- if not conditions:
- logger.info(f"No cache entries found for {track.display_name()}.")
- return
- where = " AND ".join(conditions)
- with sqlite3.connect(self.db_path) as conn:
- cur = conn.execute(f"DELETE FROM cache WHERE {where}", params)
- conn.commit()
- if cur.rowcount:
- logger.info(
- f"Cleared {cur.rowcount} cache entries for {track.display_name()}."
- )
- else:
- logger.info(f"No cache entries found for {track.display_name()}.")
-
- def prune(self) -> int:
- """Remove all expired entries. Returns the number of rows deleted."""
- with sqlite3.connect(self.db_path) as conn:
- cur = conn.execute(
- "DELETE FROM cache WHERE expires_at IS NOT NULL AND expires_at < ?",
- (int(time.time()),),
- )
- conn.commit()
- count = cur.rowcount
- logger.info(f"Pruned {count} expired cache entries.")
- return count
-
- @staticmethod
- def _track_where(track: TrackMeta) -> tuple[list[str], list[str]]:
- """Build WHERE conditions to match a track across all sources."""
- conditions: list[str] = []
- params: list[str] = []
- if track.artist:
- conditions.append("artist = ?")
- params.append(track.artist)
- if track.title:
- conditions.append("title = ?")
- params.append(track.title)
- if track.album:
- conditions.append("album = ?")
- params.append(track.album)
- return conditions, params
-
- # Exact cross-source search
-
- def find_best_positive(self, track: TrackMeta) -> Optional[LyricResult]:
- """Find the best positive (synced/unsynced) cache entry for *track*.
-
- Uses exact metadata match (artist + title + album) across all sources.
- Returns synced if available, otherwise unsynced, or None.
- """
- conditions, params = self._track_where(track)
- if not conditions:
- return None
-
- now = int(time.time())
- conditions.append("status IN (?, ?)")
- params.extend(
- [CacheStatus.SUCCESS_SYNCED.value, CacheStatus.SUCCESS_UNSYNCED.value]
- )
- conditions.append("(expires_at IS NULL OR expires_at > ?)")
- params.append(str(now))
-
- where = " AND ".join(conditions)
- with sqlite3.connect(self.db_path) as conn:
- conn.row_factory = sqlite3.Row
- rows = conn.execute(
- f"SELECT status, lyrics, source FROM cache WHERE {where} "
- "ORDER BY CASE status WHEN ? THEN 0 ELSE 1 END LIMIT 1",
- params + [CacheStatus.SUCCESS_SYNCED.value],
- ).fetchall()
-
- if not rows:
- return None
-
- row = dict(rows[0])
- return LyricResult(
- status=CacheStatus(row["status"]),
- lyrics=row["lyrics"],
- source="cache-search",
- )
-
- # Fuzzy search
-
- def search_by_meta(
- self,
- artist: Optional[str],
- title: Optional[str],
- length: Optional[int] = None,
- ) -> list[dict]:
- """Search cache for lyrics matching artist/title with fuzzy normalization.
-
- Ignores album and source. Only returns positive results (synced/unsynced)
- that have not expired. When *length* is provided, filters by duration
- tolerance and sorts by closest match.
- """
- if not title:
- return []
-
- now = int(time.time())
- with sqlite3.connect(self.db_path) as conn:
- conn.row_factory = sqlite3.Row
- rows = conn.execute(
- """SELECT * FROM cache
- WHERE status IN (?, ?)
- AND (expires_at IS NULL OR expires_at > ?)""",
- (
- CacheStatus.SUCCESS_SYNCED.value,
- CacheStatus.SUCCESS_UNSYNCED.value,
- now,
- ),
- ).fetchall()
-
- norm_title = _normalize_for_match(title)
- norm_artist = _normalize_artist(artist) if artist else None
-
- matches: list[dict] = []
- for row in rows:
- row_dict = dict(row)
- # Title must match
- row_title = row_dict.get("title") or ""
- if _normalize_for_match(row_title) != norm_title:
- continue
- # Artist must match if provided
- if norm_artist:
- row_artist = row_dict.get("artist") or ""
- if _normalize_artist(row_artist) != norm_artist:
- continue
- matches.append(row_dict)
-
- # Duration filtering
- if length is not None and matches:
- scored = []
- for m in matches:
- row_len = m.get("length")
- if row_len is not None:
- diff = abs(row_len - length)
- if diff <= DURATION_TOLERANCE_MS:
- scored.append((diff, m))
- else:
- # No duration info in cache — still a candidate but lower priority
- scored.append((DURATION_TOLERANCE_MS, m))
- scored.sort(
- key=lambda x: (
- x[0],
- x[1].get("status") != CacheStatus.SUCCESS_SYNCED.value,
- )
- )
- matches = [m for _, m in scored]
-
- return matches
-
- # Query / inspect
-
- def query_track(self, track: TrackMeta) -> list[dict]:
- """Return all cached rows for a given track (across all sources)."""
- conditions, params = self._track_where(track)
- if not conditions:
- return []
- where = " AND ".join(conditions)
- with sqlite3.connect(self.db_path) as conn:
- conn.row_factory = sqlite3.Row
- return [
- dict(r)
- for r in conn.execute(
- f"SELECT * FROM cache WHERE {where}", params
- ).fetchall()
- ]
-
- def query_all(self) -> list[dict]:
- """Return every row in the cache table."""
- with sqlite3.connect(self.db_path) as conn:
- conn.row_factory = sqlite3.Row
- return [dict(r) for r in conn.execute("SELECT * FROM cache").fetchall()]
-
- def stats(self) -> dict:
- """Return aggregate cache statistics."""
- now = int(time.time())
- with sqlite3.connect(self.db_path) as conn:
- total = conn.execute("SELECT COUNT(*) FROM cache").fetchone()[0]
- expired = conn.execute(
- "SELECT COUNT(*) FROM cache WHERE expires_at IS NOT NULL AND expires_at < ?",
- (now,),
- ).fetchone()[0]
- by_status = dict(
- conn.execute(
- "SELECT status, COUNT(*) FROM cache GROUP BY status"
- ).fetchall()
- )
- by_source = dict(
- conn.execute(
- "SELECT source, COUNT(*) FROM cache GROUP BY source"
- ).fetchall()
- )
- return {
- "total": total,
- "expired": expired,
- "active": total - expired,
- "by_status": by_status,
- "by_source": by_source,
- }
diff --git a/lrcfetch/cli.py b/lrcfetch/cli.py
deleted file mode 100644
index 48d7640..0000000
--- a/lrcfetch/cli.py
+++ /dev/null
@@ -1,426 +0,0 @@
-"""
-Author: Uyanide pywang0608@foxmail.com
-Date: 2026-03-26 02:04:39
-Description: CLI interface
-"""
-
-import sys
-import time
-import os
-from pathlib import Path
-from typing import Annotated
-from urllib.parse import quote
-import cyclopts
-from loguru import logger
-
-from .config import enable_debug
-from .models import TrackMeta, CacheStatus
-from .mpris import get_current_track
-from .core import LrcManager
-from .fetchers import FetcherMethodType
-from .lrc import get_sidecar_path
-
-
-app = cyclopts.App(
- help="LRCFetch — Fetch line-synced lyrics for your music player.",
-)
-app.register_install_completion_command()
-
-cache_app = cyclopts.App(name="cache", help="Manage the local SQLite cache.")
-app.command(cache_app)
-
-manager = LrcManager()
-
-# Global state set by the meta launcher
-_player: str | None = None
-
-
-@app.meta.default
-def launcher(
- *tokens: Annotated[str, cyclopts.Parameter(show=False, allow_leading_hyphen=True)],
- debug: Annotated[
- bool,
- cyclopts.Parameter(
- name=["--debug", "-d"], negative="", help="Enable debug logging."
- ),
- ] = False,
- player: Annotated[
- str | None,
- cyclopts.Parameter(
- name=["--player", "-p"],
- help="Target a specific MPRIS player using its DBus name or a portion thereof.",
- ),
- ] = None,
-):
- global _player
- if debug:
- enable_debug()
- _player = player
- app(tokens)
-
-
-# fetch
-
-
-@app.command
-def fetch(
- *,
- method: Annotated[
- FetcherMethodType | None,
- cyclopts.Parameter(help="Force a specific source."),
- ] = None,
- no_cache: Annotated[
- bool,
- cyclopts.Parameter(
- name="--no-cache", negative="", help="Bypass the cache for this request."
- ),
- ] = False,
- only_synced: Annotated[
- bool,
- cyclopts.Parameter(
- name="--only-synced", negative="", help="Only accept synced (timed) lyrics."
- ),
- ] = False,
-):
- """Fetch and print lyrics for the currently playing track."""
- track = get_current_track(_player)
-
- if not track:
- logger.error("No active playing track found.")
- sys.exit(1)
-
- logger.info(f"Track: {track.display_name()}")
-
- result = manager.fetch_for_track(track, force_method=method, bypass_cache=no_cache)
-
- if not result or not result.lyrics:
- logger.error("No lyrics found.")
- sys.exit(1)
-
- if only_synced and result.status != CacheStatus.SUCCESS_SYNCED:
- logger.error("Only unsynced lyrics available (--only-synced requested).")
- sys.exit(1)
-
- print(result.lyrics)
-
-
-# search
-
-
-@app.command
-def search(
- *,
- title: Annotated[
- str | None, cyclopts.Parameter(name=["--title", "-t"], help="Track title.")
- ] = None,
- artist: Annotated[
- str | None, cyclopts.Parameter(name=["--artist", "-a"], help="Artist name.")
- ] = None,
- album: Annotated[str | None, cyclopts.Parameter(help="Album name.")] = None,
- trackid: Annotated[str | None, cyclopts.Parameter(help="Spotify track ID.")] = None,
- length: Annotated[
- int | None,
- cyclopts.Parameter(
- name=["--length", "-l"], help="Track duration in milliseconds."
- ),
- ] = None,
- url: Annotated[
- str | None,
- cyclopts.Parameter(
- help="Local file URL (file:///...). Mutually exclusive with --path."
- ),
- ] = None,
- path: Annotated[
- str | None,
- cyclopts.Parameter(
- name=["--path"],
- help="Local audio file path. Mutually exclusive with --url.",
- ),
- ] = None,
- method: Annotated[
- FetcherMethodType | None, cyclopts.Parameter(help="Force a specific source.")
- ] = None,
- no_cache: Annotated[
- bool,
- cyclopts.Parameter(
- name="--no-cache", negative="", help="Bypass the cache for this request."
- ),
- ] = False,
- only_synced: Annotated[
- bool,
- cyclopts.Parameter(
- name="--only-synced", negative="", help="Only accept synced (timed) lyrics."
- ),
- ] = False,
-):
- """Search for lyrics by metadata (bypasses MPRIS)."""
- if url and path:
- logger.error("--url and --path are mutually exclusive.")
- sys.exit(1)
-
- if path:
- resolved = str(Path(path).resolve())
- url = "file://" + quote(resolved, safe="/")
-
- track = TrackMeta(
- title=title,
- artist=artist,
- album=album,
- trackid=trackid,
- length=length,
- url=url,
- )
-
- logger.info(f"Track: {track.display_name()}")
-
- result = manager.fetch_for_track(track, force_method=method, bypass_cache=no_cache)
-
- if not result or not result.lyrics:
- logger.error("No lyrics found.")
- sys.exit(1)
-
- if only_synced and result.status != CacheStatus.SUCCESS_SYNCED:
- logger.error("Only unsynced lyrics available (--only-synced requested).")
- sys.exit(1)
-
- print(result.lyrics)
-
-
-# export
-
-
-@app.command
-def export(
- *,
- output: Annotated[
- str | None,
- cyclopts.Parameter(
- name=["--output", "-o"],
- help="Output file path (default: same directory as audio file with .lrc extension, or current directory if not available).",
- ),
- ] = None,
- method: Annotated[
- FetcherMethodType | None, cyclopts.Parameter(help="Force a specific source.")
- ] = None,
- no_cache: Annotated[
- bool, cyclopts.Parameter(name="--no-cache", negative="", help="Bypass cache.")
- ] = False,
- overwrite: Annotated[
- bool,
- cyclopts.Parameter(
- name=["--overwrite", "-f"], negative="", help="Overwrite existing file."
- ),
- ] = False,
-):
- """Export lyrics of the current track to a .lrc file."""
- track = get_current_track(_player)
- if not track:
- logger.error("No active playing track found.")
- sys.exit(1)
-
- result = manager.fetch_for_track(track, force_method=method, bypass_cache=no_cache)
- if not result or not result.lyrics:
- logger.error("No lyrics available to export.")
- sys.exit(1)
-
- # Build default output path
- if not output:
- if track.url:
- lrc_path = get_sidecar_path(track.url, ensure_exists=False)
- if lrc_path:
- output = str(lrc_path)
- logger.info(f"Exporting to sidecar path: {output}")
-
- # Fallback to current directory with sanitized filename
- if not output:
- filename = (
- f"{track.artist} - {track.title}.lrc"
- if track.artist and track.title
- else "lyrics.lrc"
- )
- # Sanitize filename
- filename = "".join(
- c for c in filename if c.isalpha() or c.isdigit() or c in " -_."
- ).rstrip()
- output = os.path.join(os.getcwd(), filename)
-
- if os.path.exists(output) and not overwrite:
- logger.error(f"File exists: {output} (use -f to overwrite)")
- sys.exit(1)
-
- try:
- with open(output, "w", encoding="utf-8") as f:
- f.write(result.lyrics)
- logger.info(f"Exported lyrics to {output}")
- except Exception as e:
- logger.error(f"Failed to write file: {e}")
- sys.exit(1)
-
-
-# cache subcommands
-
-
-@cache_app.command
-def query(
- *,
- all: Annotated[
- bool,
- cyclopts.Parameter(name="--all", negative="", help="Dump all cache entries."),
- ] = False,
-):
- """Show cached entries for the current track."""
- if all:
- rows = manager.cache.query_all()
- if not rows:
- print("Cache is empty.")
- return
- for row in rows:
- _print_cache_row(row)
- print()
- return
-
- track = get_current_track(_player)
- if not track:
- logger.error("No active playing track found.")
- sys.exit(1)
- _print_track_cache(track)
-
-
-@cache_app.command
-def clear(
- *,
- all: Annotated[
- bool,
- cyclopts.Parameter(name="--all", negative="", help="Clear the entire cache."),
- ] = False,
-):
- """Clear cached entries for the current track."""
- if all:
- manager.cache.clear_all()
- return
-
- track = get_current_track(_player)
- if not track:
- logger.error("No active playing track found.")
- sys.exit(1)
- manager.cache.clear_track(track)
-
-
-@cache_app.command
-def prune():
- """Remove expired cache entries."""
- manager.cache.prune()
-
-
-@cache_app.command
-def stats():
- """Show cache statistics."""
- s = manager.cache.stats()
- print("=== Cache Statistics ===")
- print(f"Total entries : {s['total']}")
- print(f"Active : {s['active']}")
- print(f"Expired : {s['expired']}")
- if s["by_status"]:
- print("\nBy status:")
- for status, count in s["by_status"].items():
- print(f" {status}: {count}")
- if s["by_source"]:
- print("\nBy source:")
- for source, count in s["by_source"].items():
- print(f" {source}: {count}")
-
-
-@cache_app.command
-def insert(
- *,
- path: Annotated[
- str | None,
- cyclopts.Parameter(
- name=["--path"],
- help="Path to a local .lrc file to insert instead of reading from stdin.",
- ),
- ] = None,
-):
- """Manually insert lyrics into the cache for the current track."""
- track = get_current_track(_player)
- if not track:
- logger.error("No active playing track found.")
- sys.exit(1)
-
- if path:
- try:
- with open(path, "r", encoding="utf-8") as f:
- lyrics = f.read()
- except Exception as e:
- logger.error(f"Failed to read file: {e}")
- sys.exit(1)
- else:
- logger.info("Reading lyrics from stdin (Ctrl+D to finish)...")
- lyrics = sys.stdin.read()
-
- manager.manual_insert(track, lyrics)
-
-
-# helpers
-
-
-def _print_track_cache(track: TrackMeta) -> None:
- """Print all cached entries for a given track."""
- print(f"Track: {track.display_name()}")
- if track.album:
- print(f"Album: {track.album}")
- if track.length:
- secs = track.length / 1000.0
- print(f"Duration: {int(secs // 60)}:{secs % 60:05.2f}")
- print()
-
- rows = manager.cache.query_track(track)
- if not rows:
- print(" (no cache entries)")
- return
-
- for row in rows:
- _print_cache_row(row, indent=" ")
-
-
-def _print_cache_row(row: dict, indent: str = "") -> None:
- """Pretty-print a single cache row."""
- now = int(time.time())
- source = row.get("source", "?")
- status = row.get("status", "?")
- artist = row.get("artist", "")
- title = row.get("title", "")
- album = row.get("album", "")
- created = row.get("created_at", 0)
- expires = row.get("expires_at")
- lyrics = row.get("lyrics", "")
-
- name = f"{artist} - {title}" if artist and title else row.get("key", "?")
- print(f"{indent}[{source}] {name}")
- if album:
- print(f"{indent} Album : {album}")
- print(f"{indent} Status : {status}")
- if created:
- age = now - created
- print(f"{indent} Cached : {age // 3600}h {(age % 3600) // 60}m ago")
- if expires:
- remaining = expires - now
- if remaining > 0:
- print(
- f"{indent} Expires : in {remaining // 3600}h {(remaining % 3600) // 60}m"
- )
- else:
- print(f"{indent} Expires : EXPIRED")
- else:
- print(f"{indent} Expires : never")
- if lyrics:
- line_count = len(lyrics.splitlines())
- print(f"{indent} Lyrics : {line_count} lines")
-
-
-def run():
- app.meta()
-
-
-if __name__ == "__main__":
- run()
diff --git a/lrcfetch/config.py b/lrcfetch/config.py
deleted file mode 100644
index 5f0f5b8..0000000
--- a/lrcfetch/config.py
+++ /dev/null
@@ -1,88 +0,0 @@
-"""
-Author: Uyanide pywang0608@foxmail.com
-Date: 2026-03-25 10:17:56
-Description: Global configuration constants and logger setup
-"""
-
-import os
-import sys
-from pathlib import Path
-from platformdirs import user_cache_dir, user_config_dir
-from dotenv import load_dotenv
-from loguru import logger
-from importlib.metadata import version
-
-# Application
-APP_NAME = "lrcfetch"
-APP_AUTHOR = "Uyanide"
-APP_VERSION = version(APP_NAME)
-
-# Paths
-CACHE_DIR = user_cache_dir(APP_NAME, APP_AUTHOR)
-DB_PATH = os.path.join(CACHE_DIR, "cache.db")
-
-# .env loading
-_config_env = Path(user_config_dir(APP_NAME, APP_AUTHOR)) / ".env"
-load_dotenv(_config_env) # ~/.config/lrcfetch/.env
-load_dotenv() # .env in cwd (does NOT override existing vars)
-
-# HTTP
-HTTP_TIMEOUT = 10.0
-
-# Cache TTLs (seconds)
-TTL_SYNCED = None # never expires
-TTL_UNSYNCED = 86400 # 1 day
-TTL_NOT_FOUND = 86400 * 3 # 3 days
-TTL_NETWORK_ERROR = 3600 # 1 hour
-
-# Search
-DURATION_TOLERANCE_MS = 3000 # max duration mismatch for search matching
-
-# Spotify related
-SPOTIFY_TOKEN_URL = "https://open.spotify.com/api/token"
-SPOTIFY_LYRICS_URL = "https://spclient.wg.spotify.com/color-lyrics/v2/track/"
-SPOTIFY_SERVER_TIME_URL = "https://open.spotify.com/api/server-time"
-SPOTIFY_SECRET_URL = (
- "https://raw.githubusercontent.com/xyloflake/spot-secrets-go"
- "/refs/heads/main/secrets/secrets.json"
-)
-SPOTIFY_SP_DC = os.environ.get("SPOTIFY_SP_DC", "")
-SPOTIFY_TOKEN_CACHE_FILE = os.path.join(CACHE_DIR, "spotify_token.json")
-SPOTIFY_APP_VERSION = "1.2.87.284.g3ff41c13"
-
-# Netease api
-NETEASE_SEARCH_URL = "https://music.163.com/api/cloudsearch/pc"
-NETEASE_LYRIC_URL = "https://interface3.music.163.com/api/song/lyric"
-
-# LRCLIB api
-LRCLIB_API_URL = "https://lrclib.net/api/get"
-LRCLIB_SEARCH_URL = "https://lrclib.net/api/search"
-
-# QQ Music API (self-hosted proxy)
-QQ_MUSIC_API_URL = os.environ.get("QQ_MUSIC_API_URL", "").rstrip("/")
-
-# Player preference (used when multiple MPRIS players are active)
-PREFERRED_PLAYER = os.environ.get("LRCFETCH_PLAYER", "spotify")
-
-# User-Agents
-UA_BROWSER = "Mozilla/5.0 (X11; Linux x86_64; rv:148.0) Gecko/20100101 Firefox/148.0"
-UA_LRCFETCH = f"LRCFetch {APP_VERSION} (https://github.com/Uyanide/lrcfetch)"
-
-os.makedirs(CACHE_DIR, exist_ok=True)
-
-# Logger
-_LOG_FORMAT = (
- "{time:YYYY-MM-DD HH:mm:ss} | "
- "{level: <8} | "
- "{name}:{function}:{line} - "
- "{message}"
-)
-
-logger.remove()
-logger.add(sys.stderr, format=_LOG_FORMAT, level="INFO")
-
-
-def enable_debug() -> None:
- """Switch logger to DEBUG level."""
- logger.remove()
- logger.add(sys.stderr, format=_LOG_FORMAT, level="DEBUG")
diff --git a/lrcfetch/core.py b/lrcfetch/core.py
deleted file mode 100644
index 04ba9ea..0000000
--- a/lrcfetch/core.py
+++ /dev/null
@@ -1,178 +0,0 @@
-"""
-Author: Uyanide pywang0608@foxmail.com
-Date: 2026-03-25 11:09:53
-Description: Core orchestrator — coordinates fetchers with cache-aware fallback
-"""
-
-"""
-Fetch pipeline:
- 1. Check cache for each source in the fallback sequence
- 2. For sources without a valid cache hit, call the fetcher
- 3. Cache every result (success, not-found, or error) per source
- 4. Return the best result (synced > unsynced > None)
-"""
-
-from typing import Optional
-from loguru import logger
-
-from .fetchers import FetcherMethodType, create_fetchers
-from .fetchers.base import BaseFetcher
-from .cache import CacheEngine
-from .lrc import normalize_tags, normalize_unsynced, detect_sync_status
-from .config import TTL_SYNCED, TTL_UNSYNCED, TTL_NOT_FOUND, TTL_NETWORK_ERROR
-from .models import TrackMeta, LyricResult, CacheStatus
-from .enrichers import enrich_track
-
-
-# Maps CacheStatus to the default TTL used when storing results
-_STATUS_TTL: dict[CacheStatus, Optional[int]] = {
- CacheStatus.SUCCESS_SYNCED: TTL_SYNCED,
- CacheStatus.SUCCESS_UNSYNCED: TTL_UNSYNCED,
- CacheStatus.NOT_FOUND: TTL_NOT_FOUND,
- CacheStatus.NETWORK_ERROR: TTL_NETWORK_ERROR,
-}
-
-
-class LrcManager:
- """Main entry point for fetching lyrics with caching."""
-
- def __init__(self) -> None:
- self.cache = CacheEngine()
- self.fetchers = create_fetchers(self.cache)
-
- def _build_sequence(
- self, track: TrackMeta, force_method: Optional[FetcherMethodType] = None
- ) -> list[BaseFetcher]:
- """Determine the ordered list of fetchers to try."""
- if force_method:
- if force_method not in self.fetchers:
- logger.error(f"Unknown method: {force_method}")
- return []
- return [self.fetchers[force_method]]
-
- sequence: list[BaseFetcher] = []
- for method in self.fetchers.keys():
- if self.fetchers[method].is_available(track):
- sequence.append(self.fetchers[method])
-
- logger.debug(f"Fallback sequence: {[f.source_name for f in sequence]}")
- return sequence
-
- def fetch_for_track(
- self,
- track: TrackMeta,
- force_method: Optional[FetcherMethodType] = None,
- bypass_cache: bool = False,
- ) -> Optional[LyricResult]:
- """Fetch lyrics for *track* using the fallback pipeline.
-
- Each source is checked against the cache independently:
- - Cache hit with synced lyrics → return immediately
- - Cache hit with negative status (NOT_FOUND / NETWORK_ERROR) → skip source
- - Cache miss or unsynced → call fetcher, then cache the result
-
- After all sources are tried, returns the best result found
- (synced > unsynced > None).
- """
- track = enrich_track(track)
- logger.info(f"Fetching lyrics for: {track.display_name()}")
-
- sequence = self._build_sequence(track, force_method)
- if not sequence:
- return None
-
- # Best result seen so far (synced wins over unsynced)
- best_result: Optional[LyricResult] = None
-
- for fetcher in sequence:
- source = fetcher.source_name
-
- # Cache check (skip for fetchers that handle their own caching)
- if not bypass_cache and not fetcher.self_cached:
- cached = self.cache.get(track, source)
- if cached:
- if cached.status == CacheStatus.SUCCESS_SYNCED:
- logger.info(f"[{source}] cache hit: synced lyrics")
- return cached
- elif cached.status == CacheStatus.SUCCESS_UNSYNCED:
- logger.debug(
- f"[{source}] cache hit: unsynced lyrics (continuing)"
- )
- if best_result is None:
- best_result = cached
- continue # Try next source for synced
- elif cached.status in (
- CacheStatus.NOT_FOUND,
- CacheStatus.NETWORK_ERROR,
- ):
- logger.debug(
- f"[{source}] cache hit: {cached.status.value}, skipping"
- )
- continue
- elif not fetcher.self_cached:
- logger.debug(f"[{source}] cache bypassed")
-
- # Fetch
- logger.debug(f"[{source}] calling fetcher...")
- result = fetcher.fetch(track, bypass_cache=bypass_cache)
-
- if not result:
- logger.debug(f"[{source}] returned None (no result)")
- continue
-
- # Cache the result (skip for self-cached fetchers)
- if not fetcher.self_cached:
- ttl = result.ttl or _STATUS_TTL.get(result.status, TTL_NOT_FOUND)
- self.cache.set(track, source, result, ttl_seconds=ttl)
-
- # Evaluate result
- if result.status == CacheStatus.SUCCESS_SYNCED:
- logger.info(f"[{source}] got synced lyrics")
- return result
-
- if result.status == CacheStatus.SUCCESS_UNSYNCED:
- logger.debug(f"[{source}] got unsynced lyrics (continuing)")
- if best_result is None:
- best_result = result
-
- # NOT_FOUND / NETWORK_ERROR: already cached, try next
-
- # Return best available
- if best_result:
- # Normalize unsynced lyrics: set all timestamps to [00:00.00]
- if (
- best_result.status == CacheStatus.SUCCESS_UNSYNCED
- and best_result.lyrics
- ):
- best_result = LyricResult(
- status=best_result.status,
- lyrics=normalize_unsynced(best_result.lyrics),
- source=best_result.source,
- ttl=best_result.ttl,
- )
- logger.info(
- f"Returning unsynced lyrics from {best_result.source} "
- f"(no synced source found)"
- )
- else:
- logger.info(f"No lyrics found for {track.display_name()}")
-
- return best_result
-
- def manual_insert(
- self,
- track: TrackMeta,
- lyrics: str,
- ) -> None:
- """Manually insert lyrics into the cache for a track."""
- track = enrich_track(track)
- logger.info(f"Manually inserting lyrics for: {track.display_name()}")
- lyrics = normalize_tags(lyrics)
- result = LyricResult(
- status=detect_sync_status(lyrics),
- lyrics=normalize_tags(lyrics),
- source="manual",
- ttl=None,
- )
- self.cache.set(track, "manual", result, ttl_seconds=None)
- logger.info("Lyrics inserted into cache.")
diff --git a/lrcfetch/enrichers/__init__.py b/lrcfetch/enrichers/__init__.py
deleted file mode 100644
index 25309e2..0000000
--- a/lrcfetch/enrichers/__init__.py
+++ /dev/null
@@ -1,39 +0,0 @@
-"""
-Author: Uyanide pywang0608@foxmail.com
-Date: 2026-03-31 06:09:11
-Description: Metadata enrichment pipeline
-"""
-
-from loguru import logger
-
-from .base import BaseEnricher
-from .audio_tag import AudioTagEnricher
-from .file_name import FileNameEnricher
-from ..models import TrackMeta
-
-# Enrichers run in order; earlier ones have higher priority.
-_ENRICHERS: list[BaseEnricher] = [
- AudioTagEnricher(),
- FileNameEnricher(),
-]
-
-
-def enrich_track(track: TrackMeta) -> TrackMeta:
- """Run all enrichers and return a track with missing fields filled in.
-
- Each enricher sees the cumulative state (earlier enrichers' results
- are already applied). A field is only set if it is currently None.
- """
- for enricher in _ENRICHERS:
- try:
- result = enricher.enrich(track)
- except Exception as e:
- logger.warning(f"Enricher {enricher.name} failed: {e}")
- continue
- if not result:
- continue
- # Only apply fields that are still None
- updates = {k: v for k, v in result.items() if getattr(track, k, None) is None}
- if updates:
- track = track.model_copy(update=updates)
- return track
diff --git a/lrcfetch/enrichers/audio_tag.py b/lrcfetch/enrichers/audio_tag.py
deleted file mode 100644
index 4e9f604..0000000
--- a/lrcfetch/enrichers/audio_tag.py
+++ /dev/null
@@ -1,78 +0,0 @@
-"""
-Author: Uyanide pywang0608@foxmail.com
-Date: 2026-03-31 06:11:27
-Description: Enricher that reads metadata from audio file tags (mutagen)
-"""
-
-from typing import Optional
-from loguru import logger
-from mutagen._file import File, FileType
-
-from .base import BaseEnricher
-from ..models import TrackMeta
-from ..lrc import get_audio_path
-
-
-class AudioTagEnricher(BaseEnricher):
- """Extract title, artist, album, and duration from audio file tags."""
-
- @property
- def name(self) -> str:
- return "audio-tag"
-
- def enrich(self, track: TrackMeta) -> Optional[dict]:
- if not track.is_local or not track.url:
- return None
-
- audio_path = get_audio_path(track.url, ensure_exists=True)
- if not audio_path:
- return None
-
- try:
- audio = File(audio_path)
- except Exception as e:
- logger.debug(f"AudioTag: failed to read {audio_path}: {e}")
- return None
-
- if audio is None:
- return None
-
- updates: dict = {}
-
- # Try common tag names (vorbis comments, ID3, MP4)
- title = _first_tag(audio, "title", "TIT2", "\xa9nam")
- if title and not track.title:
- updates["title"] = title
-
- artist = _first_tag(audio, "artist", "TPE1", "\xa9ART")
- if artist and not track.artist:
- updates["artist"] = artist
-
- album = _first_tag(audio, "album", "TALB", "\xa9alb")
- if album and not track.album:
- updates["album"] = album
-
- if not track.length and audio.info and hasattr(audio.info, "length"):
- length_ms = int(audio.info.length * 1000)
- if length_ms > 0:
- updates["length"] = length_ms
-
- if updates:
- logger.debug(f"AudioTag: enriched fields: {list(updates.keys())}")
- return updates or None
-
-
-def _first_tag(audio: FileType, *keys: str) -> Optional[str]:
- """Return the first non-empty string value found among the given tag keys."""
- if not audio.tags:
- return None
- for key in keys:
- val = audio.tags.get(key)
- if val is None:
- continue
- # mutagen returns lists for vorbis, single values for ID3
- if isinstance(val, list):
- val = val[0] if val else None
- if val:
- return str(val).strip()
- return None
diff --git a/lrcfetch/enrichers/base.py b/lrcfetch/enrichers/base.py
deleted file mode 100644
index f0a09da..0000000
--- a/lrcfetch/enrichers/base.py
+++ /dev/null
@@ -1,31 +0,0 @@
-"""
-Author: Uyanide pywang0608@foxmail.com
-Date: 2026-03-31 06:08:16
-Description: Base class for metadata enrichers
-"""
-
-from abc import ABC, abstractmethod
-from typing import Optional
-
-from ..models import TrackMeta
-
-
-class BaseEnricher(ABC):
- """Attempts to fill missing fields on a TrackMeta.
-
- Each enricher inspects the track, and returns a dict of field names
- to values for any fields it can provide. Only fields that are
- currently ``None`` on the track will actually be applied.
- """
-
- @property
- @abstractmethod
- def name(self) -> str: ...
-
- @abstractmethod
- def enrich(self, track: TrackMeta) -> Optional[dict]:
- """Return a dict of {field_name: value} for fields this enricher can fill.
-
- Return None or an empty dict if nothing can be contributed.
- """
- ...
diff --git a/lrcfetch/enrichers/file_name.py b/lrcfetch/enrichers/file_name.py
deleted file mode 100644
index 150cebd..0000000
--- a/lrcfetch/enrichers/file_name.py
+++ /dev/null
@@ -1,83 +0,0 @@
-"""
-Author: Uyanide pywang0608@foxmail.com
-Date: 2026-03-31 06:08:44
-Description: Enricher that parses metadata from the audio file path
-"""
-
-import re
-from typing import Optional
-from loguru import logger
-
-from .base import BaseEnricher
-from ..models import TrackMeta
-from ..lrc import get_audio_path
-
-
-# Common track-number prefixes: "01 - ", "01. ", "1 - ", etc.
-_TRACK_NUM_RE = re.compile(r"^\d{1,3}[\s.\-]+")
-
-
-class FileNameEnricher(BaseEnricher):
- """Derive artist / title from the file path when tags are unavailable.
-
- Heuristics (applied to the stem of the filename):
- - "Artist - Title" → artist, title
- - "01 - Title" → title only (leading track number stripped)
- - "Title" → title only
-
- If artist is still missing after parsing the filename, the parent
- directory name is used as a guess (common layout: ``Artist/Album/track``).
- """
-
- @property
- def name(self) -> str:
- return "file-name"
-
- def enrich(self, track: TrackMeta) -> Optional[dict]:
- if not track.is_local or not track.url:
- return None
-
- audio_path = get_audio_path(track.url, ensure_exists=False)
- if not audio_path:
- return None
-
- updates: dict = {}
- stem = audio_path.stem
-
- # Try "Artist - Title" split
- if " - " in stem:
- left, right = stem.split(" - ", 1)
- left = _TRACK_NUM_RE.sub("", left).strip()
- right = right.strip()
-
- if left and right:
- # Both sides non-empty after stripping track number
- if not track.artist:
- updates["artist"] = left
- if not track.title:
- updates["title"] = right
- elif right:
- # Left was only a track number → right is the title
- if not track.title:
- updates["title"] = right
- else:
- # No separator: strip track number, remainder is title
- title_guess = _TRACK_NUM_RE.sub("", stem).strip()
- if title_guess and not track.title:
- updates["title"] = title_guess
-
- # Use parent directory as artist fallback
- # Typical layout: /Music/Artist/Album/01 - Track.flac
- if not track.artist and "artist" not in updates:
- parents = audio_path.parents
- if len(parents) >= 2:
- album_dir = parents[0].name
- artist_dir = parents[1].name
- if artist_dir and artist_dir not in (".", "/"):
- updates["artist"] = artist_dir
- if not track.album and album_dir and album_dir != artist_dir:
- updates["album"] = album_dir
-
- if updates:
- logger.debug(f"FileName: enriched fields: {list(updates.keys())}")
- return updates or None
diff --git a/lrcfetch/fetchers/__init__.py b/lrcfetch/fetchers/__init__.py
deleted file mode 100644
index 75377d6..0000000
--- a/lrcfetch/fetchers/__init__.py
+++ /dev/null
@@ -1,41 +0,0 @@
-"""
-Author: Uyanide pywang0608@foxmail.com
-Date: 2026-03-25 02:33:26
-Description: Fetcher pipeline — registry and types
-"""
-
-from typing import Literal
-
-from .base import BaseFetcher
-from .local import LocalFetcher
-from .cache_search import CacheSearchFetcher
-from .spotify import SpotifyFetcher
-from .lrclib import LrclibFetcher
-from .lrclib_search import LrclibSearchFetcher
-from .netease import NeteaseFetcher
-from .qqmusic import QQMusicFetcher
-from ..cache import CacheEngine
-
-FetcherMethodType = Literal[
- "local",
- "cache-search",
- "spotify",
- "lrclib",
- "lrclib-search",
- "netease",
- "qqmusic",
-]
-
-
-def create_fetchers(cache: CacheEngine) -> dict[FetcherMethodType, BaseFetcher]:
- """Instantiate all fetchers. Returns a dict keyed by source name."""
- fetchers: dict[FetcherMethodType, BaseFetcher] = {
- "local": LocalFetcher(),
- "cache-search": CacheSearchFetcher(cache),
- "spotify": SpotifyFetcher(),
- "lrclib": LrclibFetcher(),
- "lrclib-search": LrclibSearchFetcher(),
- "netease": NeteaseFetcher(),
- "qqmusic": QQMusicFetcher(),
- }
- return fetchers
diff --git a/lrcfetch/fetchers/base.py b/lrcfetch/fetchers/base.py
deleted file mode 100644
index 2bf70af..0000000
--- a/lrcfetch/fetchers/base.py
+++ /dev/null
@@ -1,35 +0,0 @@
-"""
-Author: Uyanide pywang0608@foxmail.com
-Date: 2026-03-25 02:33:26
-Description: Base fetcher class and common interfaces
-"""
-
-from abc import ABC, abstractmethod
-from typing import Optional
-
-from ..models import TrackMeta, LyricResult
-
-
-class BaseFetcher(ABC):
- @property
- @abstractmethod
- def source_name(self) -> str:
- """Name of the fetcher source."""
- pass
-
- @property
- def self_cached(self) -> bool:
- """True if this fetcher manages its own cache (skip per-source cache check)."""
- return False
-
- @abstractmethod
- def is_available(self, track: TrackMeta) -> bool:
- """Check if the fetcher is available for the given track (e.g. has required metadata)."""
- pass
-
- @abstractmethod
- def fetch(
- self, track: TrackMeta, bypass_cache: bool = False
- ) -> Optional[LyricResult]:
- """Fetch lyrics for the given track. Returns None if unable to fetch."""
- pass
diff --git a/lrcfetch/fetchers/cache_search.py b/lrcfetch/fetchers/cache_search.py
deleted file mode 100644
index af973c5..0000000
--- a/lrcfetch/fetchers/cache_search.py
+++ /dev/null
@@ -1,85 +0,0 @@
-"""
-Author: Uyanide pywang0608@foxmail.com
-Date: 2026-03-28 05:57:46
-Description: Cache-search fetcher — cross-album fuzzy lookup in the local cache
-"""
-
-"""
-Searches existing cache entries by artist + title with fuzzy normalization,
-ignoring album and source. Useful when the same track appears on different
-albums or is played from different players.
-"""
-
-from typing import Optional
-from loguru import logger
-
-from .base import BaseFetcher
-from ..models import TrackMeta, LyricResult, CacheStatus
-from ..cache import CacheEngine
-
-
-class CacheSearchFetcher(BaseFetcher):
- def __init__(self, cache: CacheEngine) -> None:
- self._cache = cache
-
- @property
- def source_name(self) -> str:
- return "cache-search"
-
- @property
- def self_cached(self) -> bool:
- return True
-
- def is_available(self, track: TrackMeta) -> bool:
- return bool(track.title)
-
- def fetch(
- self, track: TrackMeta, bypass_cache: bool = False
- ) -> Optional[LyricResult]:
- if bypass_cache:
- logger.debug("Cache-search: bypassed by caller")
- return None
-
- if not track.title:
- logger.debug("Cache-search: skipped — no title")
- return None
-
- # Fast path: exact metadata match (artist+title+album), single SQL query
- exact = self._cache.find_best_positive(track)
- if exact:
- logger.info(f"Cache-search: exact hit ({exact.status.value})")
- return exact
-
- # Slow path: fuzzy cross-album search
- matches = self._cache.search_by_meta(
- artist=track.artist,
- title=track.title,
- length=track.length,
- )
-
- if not matches:
- logger.debug(f"Cache-search: no match for {track.display_name()}")
- return None
-
- # Pick best: prefer synced, then first available
- best = None
- for m in matches:
- if m.get("status") == CacheStatus.SUCCESS_SYNCED.value:
- best = m
- break
- if best is None:
- best = m
-
- if not best or not best.get("lyrics"):
- return None
-
- status = CacheStatus(best["status"])
- logger.info(
- f"Cache-search: fuzzy hit from [{best.get('source')}] "
- f"album={best.get('album')!r} ({status.value})"
- )
- return LyricResult(
- status=status,
- lyrics=best["lyrics"],
- source=self.source_name,
- )
diff --git a/lrcfetch/fetchers/local.py b/lrcfetch/fetchers/local.py
deleted file mode 100644
index 82e3ecd..0000000
--- a/lrcfetch/fetchers/local.py
+++ /dev/null
@@ -1,98 +0,0 @@
-"""
-Author: Uyanide pywang0608@foxmail.com
-Date: 2026-03-26 02:08:41
-Description: Local fetcher — reads lyrics from .lrc sidecar files or embedded audio metadata
-"""
-
-"""
-Priority:
- 1. Same-directory .lrc file (e.g. /path/to/track.lrc)
- 2. Embedded lyrics in audio metadata (FLAC, MP3 USLT/SYLT tags)
-"""
-
-from typing import Optional
-from loguru import logger
-from mutagen._file import File
-from mutagen.flac import FLAC
-
-from .base import BaseFetcher
-from ..models import TrackMeta, LyricResult
-from ..lrc import detect_sync_status, normalize_tags, get_audio_path, get_sidecar_path
-
-
-class LocalFetcher(BaseFetcher):
- @property
- def source_name(self) -> str:
- return "local"
-
- def is_available(self, track: TrackMeta) -> bool:
- return track.is_local
-
- def fetch(
- self, track: TrackMeta, bypass_cache: bool = False
- ) -> Optional[LyricResult]:
- """Attempt to read lyrics from local filesystem."""
- if not track.is_local or not track.url:
- return None
-
- audio_path = get_audio_path(track.url, ensure_exists=False)
- if not audio_path:
- logger.debug(f"Local: audio URL is not a valid file path: {track.url}")
- return None
-
- lrc_path = get_sidecar_path(
- track.url, ensure_audio_exists=False, ensure_exists=True
- )
- if lrc_path:
- try:
- with open(lrc_path, "r", encoding="utf-8") as f:
- content = f.read().strip()
- if content:
- content = normalize_tags(content)
- status = detect_sync_status(content)
- logger.info(f"Local: found .lrc sidecar ({status.value})")
- return LyricResult(
- status=status, lyrics=content, source=self.source_name
- )
- except Exception as e:
- logger.error(f"Local: error reading {lrc_path}: {e}")
- else:
- logger.debug(f"Local: no .lrc sidecar found for {audio_path}")
-
- # Embedded metadata
- if not audio_path.exists():
- logger.debug(f"Local: audio file does not exist: {audio_path}")
- return None
- try:
- audio = File(audio_path)
- if audio is not None:
- lyrics = None
-
- if isinstance(audio, FLAC):
- # FLAC stores lyrics in vorbis comment tags
- lyrics = (
- audio.get("lyrics") or audio.get("unsynclyrics") or [None]
- )[0]
- elif hasattr(audio, "tags") and audio.tags:
- # MP3 / other: look for USLT or SYLT ID3 frames
- for key in audio.tags.keys():
- if key.startswith("USLT") or key.startswith("SYLT"):
- lyrics = str(audio.tags[key])
- break
-
- if lyrics:
- lyrics = normalize_tags(lyrics.strip())
- status = detect_sync_status(lyrics)
- logger.info(f"Local: found embedded lyrics ({status.value})")
- return LyricResult(
- status=status,
- lyrics=lyrics,
- source=f"{self.source_name} (embedded)",
- )
- else:
- logger.debug("Local: no embedded lyrics found")
- except Exception as e:
- logger.error(f"Local: error reading metadata for {audio_path}: {e}")
-
- logger.debug(f"Local: no lyrics found for {audio_path}")
- return None
diff --git a/lrcfetch/fetchers/lrclib.py b/lrcfetch/fetchers/lrclib.py
deleted file mode 100644
index 94928d1..0000000
--- a/lrcfetch/fetchers/lrclib.py
+++ /dev/null
@@ -1,111 +0,0 @@
-"""
-Author: Uyanide pywang0608@foxmail.com
-Date: 2026-03-25 05:23:38
-Description: LRCLIB fetcher — queries lrclib.net for synced/plain lyrics
-"""
-
-"""
-Requires complete track metadata (artist, title, album, duration).
-"""
-
-from typing import Optional
-import httpx
-from loguru import logger
-from urllib.parse import urlencode
-
-from .base import BaseFetcher
-from ..models import TrackMeta, LyricResult, CacheStatus
-from ..lrc import normalize_tags
-from ..config import (
- HTTP_TIMEOUT,
- TTL_UNSYNCED,
- TTL_NOT_FOUND,
- TTL_NETWORK_ERROR,
- LRCLIB_API_URL,
- UA_LRCFETCH,
-)
-
-
-class LrclibFetcher(BaseFetcher):
- @property
- def source_name(self) -> str:
- return "lrclib"
-
- def is_available(self, track: TrackMeta) -> bool:
- return track.is_complete
-
- def fetch(
- self, track: TrackMeta, bypass_cache: bool = False
- ) -> Optional[LyricResult]:
- """Fetch lyrics from LRCLIB. Requires complete metadata."""
- if not track.is_complete:
- logger.debug("LRCLIB: skipped — incomplete metadata")
- return None
-
- params = {
- "track_name": track.title,
- "artist_name": track.artist,
- "album_name": track.album,
- "duration": track.length / 1000.0 if track.length else 0,
- }
-
- url = f"{LRCLIB_API_URL}?{urlencode(params)}"
- logger.info(f"LRCLIB: fetching lyrics for {track.display_name()}")
-
- try:
- with httpx.Client(timeout=HTTP_TIMEOUT) as client:
- resp = client.get(url, headers={"User-Agent": UA_LRCFETCH})
-
- if resp.status_code == 404:
- logger.debug(f"LRCLIB: not found for {track.display_name()}")
- return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
-
- if resp.status_code != 200:
- logger.error(f"LRCLIB: API returned {resp.status_code}")
- return LyricResult(
- status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
- )
-
- data = resp.json()
-
- # Validate response
- if not isinstance(data, dict):
- logger.error(f"LRCLIB: unexpected response type: {type(data).__name__}")
- return LyricResult(
- status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
- )
-
- synced = data.get("syncedLyrics")
- unsynced = data.get("plainLyrics")
-
- if isinstance(synced, str) and synced.strip():
- lyrics = normalize_tags(synced.strip())
- logger.info(
- f"LRCLIB: got synced lyrics ({len(lyrics.splitlines())} lines)"
- )
- return LyricResult(
- status=CacheStatus.SUCCESS_SYNCED,
- lyrics=lyrics,
- source=self.source_name,
- )
- elif isinstance(unsynced, str) and unsynced.strip():
- lyrics = normalize_tags(unsynced.strip())
- logger.info(
- f"LRCLIB: got unsynced lyrics ({len(lyrics.splitlines())} lines)"
- )
- return LyricResult(
- status=CacheStatus.SUCCESS_UNSYNCED,
- lyrics=lyrics,
- source=self.source_name,
- ttl=TTL_UNSYNCED,
- )
- else:
- logger.debug(f"LRCLIB: empty response for {track.display_name()}")
- return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
-
- except httpx.HTTPError as e:
- logger.error(f"LRCLIB: HTTP error: {e}")
- return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
- except Exception as e:
- logger.error(f"LRCLIB: unexpected error: {e}")
- return None
diff --git a/lrcfetch/fetchers/lrclib_search.py b/lrcfetch/fetchers/lrclib_search.py
deleted file mode 100644
index 95f87c2..0000000
--- a/lrcfetch/fetchers/lrclib_search.py
+++ /dev/null
@@ -1,168 +0,0 @@
-"""
-Author: Uyanide pywang0608@foxmail.com
-Date: 2026-03-25 05:30:50
-Description: LRCLIB search fetcher — fuzzy search via lrclib.net /api/search
-"""
-
-"""
-Used when metadata is incomplete (no album or duration) but title is available.
-Selects the best match by duration when track length is known.
-"""
-
-import httpx
-from typing import Optional
-from loguru import logger
-from urllib.parse import urlencode
-
-from .base import BaseFetcher
-from ..models import TrackMeta, LyricResult, CacheStatus
-from ..lrc import normalize_tags
-from ..config import (
- HTTP_TIMEOUT,
- TTL_UNSYNCED,
- TTL_NOT_FOUND,
- TTL_NETWORK_ERROR,
- DURATION_TOLERANCE_MS,
- LRCLIB_SEARCH_URL,
- UA_LRCFETCH,
-)
-
-
-class LrclibSearchFetcher(BaseFetcher):
- @property
- def source_name(self) -> str:
- return "lrclib-search"
-
- def is_available(self, track: TrackMeta) -> bool:
- return bool(track.title)
-
- def fetch(
- self, track: TrackMeta, bypass_cache: bool = False
- ) -> Optional[LyricResult]:
- """Search LRCLIB for lyrics. Requires at least a title."""
- if not track.title:
- logger.debug("LRCLIB-search: skipped — no title")
- return None
-
- params: dict[str, str] = {"track_name": track.title}
- if track.artist:
- params["artist_name"] = track.artist
- if track.album:
- params["album_name"] = track.album
-
- url = f"{LRCLIB_SEARCH_URL}?{urlencode(params)}"
- logger.info(f"LRCLIB-search: searching for {track.display_name()}")
-
- try:
- with httpx.Client(timeout=HTTP_TIMEOUT) as client:
- resp = client.get(url, headers={"User-Agent": UA_LRCFETCH})
-
- if resp.status_code != 200:
- logger.error(f"LRCLIB-search: API returned {resp.status_code}")
- return LyricResult(
- status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
- )
-
- data = resp.json()
-
- if not isinstance(data, list) or len(data) == 0:
- logger.debug(f"LRCLIB-search: no results for {track.display_name()}")
- return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
-
- logger.debug(f"LRCLIB-search: got {len(data)} candidates")
-
- # Select best match by duration
- best = self._select_best(data, track)
- if best is None:
- logger.debug("LRCLIB-search: no valid candidate found")
- return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
-
- # Extract lyrics
- synced = best.get("syncedLyrics")
- unsynced = best.get("plainLyrics")
-
- if isinstance(synced, str) and synced.strip():
- lyrics = normalize_tags(synced.strip())
- logger.info(
- f"LRCLIB-search: got synced lyrics ({len(lyrics.splitlines())} lines)"
- )
- return LyricResult(
- status=CacheStatus.SUCCESS_SYNCED,
- lyrics=lyrics,
- source=self.source_name,
- )
- elif isinstance(unsynced, str) and unsynced.strip():
- lyrics = normalize_tags(unsynced.strip())
- logger.info(
- f"LRCLIB-search: got unsynced lyrics ({len(lyrics.splitlines())} lines)"
- )
- return LyricResult(
- status=CacheStatus.SUCCESS_UNSYNCED,
- lyrics=lyrics,
- source=self.source_name,
- ttl=TTL_UNSYNCED,
- )
- else:
- logger.debug("LRCLIB-search: best candidate has empty lyrics")
- return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
-
- except httpx.HTTPError as e:
- logger.error(f"LRCLIB-search: HTTP error: {e}")
- return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
- except Exception as e:
- logger.error(f"LRCLIB-search: unexpected error: {e}")
- return None
-
- @staticmethod
- def _select_best(candidates: list[dict], track: TrackMeta) -> Optional[dict]:
- """Pick the best candidate, preferring synced lyrics and closest duration."""
- if track.length is not None:
- track_s = track.length / 1000.0
- best: Optional[dict] = None
- best_diff = float("inf")
-
- for item in candidates:
- if not isinstance(item, dict):
- continue
- duration = item.get("duration")
- if not isinstance(duration, (int, float)):
- continue
- diff = abs(duration - track_s) * 1000 # compare in ms
- if diff > DURATION_TOLERANCE_MS:
- continue
- # Prefer synced over unsynced at similar duration
- has_synced = (
- isinstance(item.get("syncedLyrics"), str)
- and item["syncedLyrics"].strip()
- )
- best_synced = (
- best is not None
- and isinstance(best.get("syncedLyrics"), str)
- and best["syncedLyrics"].strip()
- )
- if diff < best_diff or (
- diff == best_diff and has_synced and not best_synced
- ):
- best_diff = diff
- best = item
-
- if best is not None:
- logger.debug(
- f"LRCLIB-search: selected id={best.get('id')} (diff={best_diff:.0f}ms)"
- )
- return best
-
- logger.debug(
- f"LRCLIB-search: no candidate within {DURATION_TOLERANCE_MS}ms"
- )
- return None
-
- # No duration — pick first with synced lyrics, or just first
- for item in candidates:
- if (
- isinstance(item, dict)
- and isinstance(item.get("syncedLyrics"), str)
- and item["syncedLyrics"].strip()
- ):
- return item
- return candidates[0] if isinstance(candidates[0], dict) else None
diff --git a/lrcfetch/fetchers/netease.py b/lrcfetch/fetchers/netease.py
deleted file mode 100644
index eecc8d7..0000000
--- a/lrcfetch/fetchers/netease.py
+++ /dev/null
@@ -1,213 +0,0 @@
-"""
-Author: Uyanide pywang0608@foxmail.com
-Date: 2026-03-25 11:04:51
-Description: Netease Cloud Music fetcher
-"""
-
-"""
-Uses the public cloudsearch API for searching and the song/lyric API for
-retrieving lyrics. No authentication required.
-
-Search results are filtered by duration when the track has a known length
-to avoid returning lyrics for the wrong version of a song.
-"""
-
-from typing import Optional
-import httpx
-from loguru import logger
-
-from .base import BaseFetcher
-from ..models import TrackMeta, LyricResult, CacheStatus
-from ..lrc import detect_sync_status, normalize_tags
-from ..config import (
- HTTP_TIMEOUT,
- TTL_NOT_FOUND,
- TTL_NETWORK_ERROR,
- DURATION_TOLERANCE_MS,
- NETEASE_SEARCH_URL,
- NETEASE_LYRIC_URL,
- UA_BROWSER,
-)
-
-_HEADERS = {
- "User-Agent": UA_BROWSER,
- "Referer": "https://music.163.com/",
-}
-
-
-class NeteaseFetcher(BaseFetcher):
- @property
- def source_name(self) -> str:
- return "netease"
-
- def is_available(self, track: TrackMeta) -> bool:
- return bool(track.title)
-
- def _search(self, track: TrackMeta, limit: int = 10) -> Optional[int]:
- """Search Netease and return the best-matching song ID.
-
- When ``track.length`` is available, candidates are ranked by duration
- difference and only accepted if within ``DURATION_TOLERANCE_MS``.
- """
- query = f"{track.artist or ''} {track.title or ''}".strip()
- if not query:
- return None
-
- logger.debug(f"Netease: searching for '{query}' (limit={limit})")
-
- try:
- with httpx.Client(timeout=HTTP_TIMEOUT) as client:
- resp = client.post(
- NETEASE_SEARCH_URL,
- headers=_HEADERS,
- data={"s": query, "type": "1", "limit": str(limit), "offset": "0"},
- )
- resp.raise_for_status()
- result = resp.json()
-
- # Validate response
- if not isinstance(result, dict):
- logger.error(
- f"Netease: search returned non-dict: {type(result).__name__}"
- )
- return None
-
- result_body = result.get("result")
- if not isinstance(result_body, dict):
- logger.debug("Netease: search 'result' field missing or invalid")
- return None
-
- songs = result_body.get("songs")
- if not isinstance(songs, list) or len(songs) == 0:
- logger.debug("Netease: search returned 0 results")
- return None
-
- logger.debug(f"Netease: search returned {len(songs)} candidates")
-
- # Duration-based best-match selection
- if track.length is not None:
- track_ms = track.length
- best_id: Optional[int] = None
- best_diff = float("inf")
-
- for song in songs:
- if not isinstance(song, dict):
- continue
- sid = song.get("id")
- name = song.get("name", "?")
- duration = song.get("dt") # milliseconds
- if not isinstance(duration, int):
- logger.debug(
- f" candidate {sid} '{name}': no duration, skipped"
- )
- continue
- diff = abs(duration - track_ms)
- logger.debug(
- f" candidate {sid} '{name}': "
- f"duration={duration}ms, diff={diff}ms"
- )
- if diff < best_diff:
- best_diff = diff
- best_id = sid
-
- if best_id is not None and best_diff <= DURATION_TOLERANCE_MS:
- logger.debug(f"Netease: selected id={best_id} (diff={best_diff}ms)")
- return best_id
-
- logger.debug(
- f"Netease: no candidate within {DURATION_TOLERANCE_MS}ms "
- f"(best diff={best_diff}ms)"
- )
- return None
-
- # No duration info — take the first result
- first = songs[0]
- if not isinstance(first, dict) or "id" not in first:
- logger.error("Netease: first search result has no 'id'")
- return None
- logger.debug(
- f"Netease: no duration available, using first result "
- f"id={first['id']} '{first.get('name', '?')}'"
- )
- return first["id"]
-
- except Exception as e:
- logger.error(f"Netease: search failed: {e}")
- return None
-
- def _get_lyric(self, song_id: int) -> Optional[LyricResult]:
- """Fetch lyrics for a given Netease song ID."""
- logger.debug(f"Netease: fetching lyrics for song_id={song_id}")
-
- try:
- with httpx.Client(timeout=HTTP_TIMEOUT) as client:
- resp = client.post(
- NETEASE_LYRIC_URL,
- headers=_HEADERS,
- data={
- "id": str(song_id),
- "cp": "false",
- "tv": "0",
- "lv": "0",
- "rv": "0",
- "kv": "0",
- "yv": "0",
- "ytv": "0",
- "yrv": "0",
- },
- )
- resp.raise_for_status()
- data = resp.json()
-
- # Validate response
- if not isinstance(data, dict):
- logger.error(
- f"Netease: lyric response is not dict: {type(data).__name__}"
- )
- return LyricResult(
- status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
- )
-
- lrc_obj = data.get("lrc")
- if not isinstance(lrc_obj, dict):
- logger.debug(
- f"Netease: no 'lrc' object in response for song_id={song_id}"
- )
- return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
-
- lrc: str = lrc_obj.get("lyric", "")
- if not isinstance(lrc, str) or not lrc.strip():
- logger.debug(f"Netease: empty lyrics for song_id={song_id}")
- return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
-
- # Determine sync status
- lrc = normalize_tags(lrc)
- status = detect_sync_status(lrc)
- logger.info(
- f"Netease: got {status.value} lyrics for song_id={song_id} "
- f"({len(lrc.splitlines())} lines)"
- )
- return LyricResult(
- status=status, lyrics=lrc.strip(), source=self.source_name
- )
-
- except Exception as e:
- logger.error(f"Netease: lyric fetch failed for song_id={song_id}: {e}")
- return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
-
- def fetch(
- self, track: TrackMeta, bypass_cache: bool = False
- ) -> Optional[LyricResult]:
- """Search for the track and fetch its lyrics."""
- query = f"{track.artist or ''} {track.title or ''}".strip()
- if not query:
- logger.debug("Netease: skipped — insufficient metadata")
- return None
-
- logger.info(f"Netease: fetching lyrics for {track.display_name()}")
- song_id = self._search(track)
- if not song_id:
- logger.debug(f"Netease: no match found for {track.display_name()}")
- return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
-
- return self._get_lyric(song_id)
diff --git a/lrcfetch/fetchers/qqmusic.py b/lrcfetch/fetchers/qqmusic.py
deleted file mode 100644
index a5d0b63..0000000
--- a/lrcfetch/fetchers/qqmusic.py
+++ /dev/null
@@ -1,178 +0,0 @@
-"""
-Author: Uyanide pywang0608@foxmail.com
-Date: 2026-03-31 01:54:02
-Description: QQ Music fetcher via self-hosted API proxy
-"""
-
-"""
-Requires a running qq-music-api instance.
-The base URL is read from the QQ_MUSIC_API_URL environment variable.
-
-Search → pick best match by duration → fetch LRC lyrics.
-"""
-
-from typing import Optional
-import httpx
-from loguru import logger
-
-from .base import BaseFetcher
-from ..models import TrackMeta, LyricResult, CacheStatus
-from ..lrc import detect_sync_status, normalize_tags
-from ..config import (
- HTTP_TIMEOUT,
- TTL_NOT_FOUND,
- TTL_NETWORK_ERROR,
- DURATION_TOLERANCE_MS,
- QQ_MUSIC_API_URL,
-)
-
-
-class QQMusicFetcher(BaseFetcher):
- @property
- def source_name(self) -> str:
- return "qqmusic"
-
- def is_available(self, track: TrackMeta) -> bool:
- return bool(track.title) and bool(QQ_MUSIC_API_URL)
-
- def _search(self, track: TrackMeta, limit: int = 10) -> Optional[str]:
- """Search QQ Music and return the best-matching song MID."""
- query = f"{track.artist or ''} {track.title or ''}".strip()
- if not query:
- return None
-
- logger.debug(f"QQMusic: searching for '{query}' (limit={limit})")
-
- try:
- with httpx.Client(timeout=HTTP_TIMEOUT) as client:
- resp = client.get(
- f"{QQ_MUSIC_API_URL}/api/search",
- params={"keyword": query, "type": "song", "num": limit},
- )
- resp.raise_for_status()
- data = resp.json()
-
- if data.get("code") != 0:
- logger.error(f"QQMusic: search API error: {data}")
- return None
-
- songs = data.get("data", {}).get("list", [])
- if not songs:
- logger.debug("QQMusic: search returned 0 results")
- return None
-
- logger.debug(f"QQMusic: search returned {len(songs)} candidates")
-
- # Duration-based best-match selection
- if track.length is not None:
- track_ms = track.length
- best_mid: Optional[str] = None
- best_diff = float("inf")
-
- for song in songs:
- if not isinstance(song, dict):
- continue
- mid = song.get("mid")
- name = song.get("name", "?")
- # interval is in seconds
- interval = song.get("interval")
- if not isinstance(interval, int):
- logger.debug(
- f" candidate {mid} '{name}': no duration, skipped"
- )
- continue
- duration_ms = interval * 1000
- diff = abs(duration_ms - track_ms)
- logger.debug(
- f" candidate {mid} '{name}': "
- f"duration={duration_ms}ms, diff={diff}ms"
- )
- if diff < best_diff:
- best_diff = diff
- best_mid = mid
-
- if best_mid is not None and best_diff <= DURATION_TOLERANCE_MS:
- logger.debug(
- f"QQMusic: selected mid={best_mid} (diff={best_diff}ms)"
- )
- return best_mid
-
- logger.debug(
- f"QQMusic: no candidate within {DURATION_TOLERANCE_MS}ms "
- f"(best diff={best_diff}ms)"
- )
- return None
-
- # No duration info — take the first result
- first = songs[0]
- if not isinstance(first, dict) or "mid" not in first:
- logger.error("QQMusic: first search result has no 'mid'")
- return None
- logger.debug(
- f"QQMusic: no duration available, using first result "
- f"mid={first['mid']} '{first.get('name', '?')}'"
- )
- return first["mid"]
-
- except Exception as e:
- logger.error(f"QQMusic: search failed: {e}")
- return None
-
- def _get_lyric(self, mid: str) -> Optional[LyricResult]:
- """Fetch lyrics for a given QQ Music song MID."""
- logger.debug(f"QQMusic: fetching lyrics for mid={mid}")
-
- try:
- with httpx.Client(timeout=HTTP_TIMEOUT) as client:
- resp = client.get(
- f"{QQ_MUSIC_API_URL}/api/lyric",
- params={"mid": mid},
- )
- resp.raise_for_status()
- data = resp.json()
-
- if data.get("code") != 0:
- logger.error(f"QQMusic: lyric API error: {data}")
- return LyricResult(
- status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
- )
-
- lrc = data.get("data", {}).get("lyric", "")
- if not isinstance(lrc, str) or not lrc.strip():
- logger.debug(f"QQMusic: empty lyrics for mid={mid}")
- return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
-
- lrc = normalize_tags(lrc)
- status = detect_sync_status(lrc)
- logger.info(
- f"QQMusic: got {status.value} lyrics for mid={mid} "
- f"({len(lrc.splitlines())} lines)"
- )
- return LyricResult(
- status=status, lyrics=lrc.strip(), source=self.source_name
- )
-
- except Exception as e:
- logger.error(f"QQMusic: lyric fetch failed for mid={mid}: {e}")
- return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
-
- def fetch(
- self, track: TrackMeta, bypass_cache: bool = False
- ) -> Optional[LyricResult]:
- """Search for the track and fetch its lyrics."""
- if not QQ_MUSIC_API_URL:
- logger.debug("QQMusic: skipped — QQ_MUSIC_API_URL not configured")
- return None
-
- query = f"{track.artist or ''} {track.title or ''}".strip()
- if not query:
- logger.debug("QQMusic: skipped — insufficient metadata")
- return None
-
- logger.info(f"QQMusic: fetching lyrics for {track.display_name()}")
- mid = self._search(track)
- if not mid:
- logger.debug(f"QQMusic: no match found for {track.display_name()}")
- return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
-
- return self._get_lyric(mid)
diff --git a/lrcfetch/fetchers/spotify.py b/lrcfetch/fetchers/spotify.py
deleted file mode 100644
index fcd568c..0000000
--- a/lrcfetch/fetchers/spotify.py
+++ /dev/null
@@ -1,373 +0,0 @@
-"""
-Author: Uyanide pywang0608@foxmail.com
-Date: 2026-03-25 10:43:21
-Description: Spotify fetcher — obtains synced lyrics via Spotify's internal color-lyrics API.
-"""
-
-"""
-Authentication flow:
- 1. Fetch server time from Spotify
- 2. Fetch TOTP secret
- 3. Generate a TOTP code and exchange it (with SP_DC cookie) for an access token
- 4. Request lyrics using the access token
-
-The secret and token are cached on the instance to avoid redundant network
-calls within the same session.
-
-Requires SPOTIFY_SP_DC environment variable to be set.
-"""
-
-import httpx
-import json
-import time
-import struct
-import hmac
-import hashlib
-from typing import Optional, Tuple
-from loguru import logger
-
-from .base import BaseFetcher
-from ..models import TrackMeta, LyricResult, CacheStatus
-from ..lrc import normalize_tags
-from ..config import (
- HTTP_TIMEOUT,
- SPOTIFY_APP_VERSION,
- TTL_NOT_FOUND,
- TTL_NETWORK_ERROR,
- SPOTIFY_TOKEN_URL,
- SPOTIFY_LYRICS_URL,
- SPOTIFY_SERVER_TIME_URL,
- SPOTIFY_SECRET_URL,
- SPOTIFY_SP_DC,
- SPOTIFY_TOKEN_CACHE_FILE,
- UA_BROWSER,
-)
-
-
-class SpotifyFetcher(BaseFetcher):
- def __init__(self) -> None:
- # Session-level caches to avoid refetching within the same run
- self._cached_secret: Optional[Tuple[str, int]] = None
- self._cached_token: Optional[str] = None
- self._token_expires_at: float = 0.0
-
- @property
- def source_name(self) -> str:
- return "spotify"
-
- def is_available(self, track: TrackMeta) -> bool:
- return bool(track.trackid) and bool(SPOTIFY_SP_DC)
-
- # ─── Auth helpers ────────────────────────────────────────────────
-
- def _get_server_time(self, client: httpx.Client) -> Optional[int]:
- """Fetch Spotify's server timestamp (seconds since epoch)."""
- try:
- res = client.get(SPOTIFY_SERVER_TIME_URL, timeout=HTTP_TIMEOUT)
- res.raise_for_status()
- data = res.json()
- if not isinstance(data, dict) or "serverTime" not in data:
- logger.error(f"Spotify: unexpected server-time response: {data}")
- return None
- server_time = data["serverTime"]
- logger.debug(f"Spotify: server time = {server_time}")
- return server_time
- except Exception as e:
- logger.error(f"Spotify: failed to fetch server time: {e}")
- return None
-
- def _get_secret(self, client: httpx.Client) -> Optional[Tuple[str, int]]:
- """Fetch and decode the TOTP secret. Cached after first success.
-
- Response format: [{version: int, secret: str}, ...]
- Each character in *secret* is XOR-decoded with ``(index % 33) + 9``.
- """
- if self._cached_secret is not None:
- logger.debug("Spotify: using cached TOTP secret")
- return self._cached_secret
-
- try:
- res = client.get(SPOTIFY_SECRET_URL, timeout=HTTP_TIMEOUT)
- res.raise_for_status()
- data = res.json()
-
- if not isinstance(data, list) or len(data) == 0:
- logger.error(
- f"Spotify: unexpected secrets response (type={type(data).__name__}, len={len(data) if isinstance(data, list) else '?'})"
- )
- return None
-
- last = data[-1]
- if "secret" not in last or "version" not in last:
- logger.error(f"Spotify: malformed secret entry: {list(last.keys())}")
- return None
-
- secret_raw = last["secret"]
- version = last["version"]
-
- # XOR decode
- parts = []
- for i, char in enumerate(secret_raw):
- parts.append(str(ord(char) ^ ((i % 33) + 9)))
- secret = "".join(parts)
-
- logger.debug(f"Spotify: decoded secret v{version} (len={len(secret)})")
- self._cached_secret = (secret, version)
- return self._cached_secret
-
- except Exception as e:
- logger.error(f"Spotify: failed to fetch secret: {e}")
- return None
-
- @staticmethod
- def _generate_totp(server_time_s: int, secret: str) -> str:
- """Generate a 6-digit TOTP code compatible with Spotify's auth.
-
- Uses HMAC-SHA1 with a 30-second period, matching the Go reference.
- """
- counter = server_time_s // 30
- counter_bytes = struct.pack(">Q", counter)
-
- mac = hmac.new(secret.encode(), counter_bytes, hashlib.sha1).digest()
-
- offset = mac[-1] & 0x0F
- binary_code = (
- (mac[offset] & 0x7F) << 24
- | (mac[offset + 1] & 0xFF) << 16
- | (mac[offset + 2] & 0xFF) << 8
- | (mac[offset + 3] & 0xFF)
- )
-
- code = binary_code % (10**6)
- return str(code).zfill(6)
-
- def _load_cached_token(self) -> Optional[str]:
- """Try to load a valid token from the persistent cache file."""
- try:
- with open(SPOTIFY_TOKEN_CACHE_FILE, "r") as f:
- data = json.load(f)
- expires_ms = data.get("accessTokenExpirationTimestampMs", 0)
- if expires_ms <= int(time.time() * 1000):
- logger.debug("Spotify: persisted token expired")
- return None
- token = data.get("accessToken", "")
- if not token:
- return None
- self._cached_token = token
- self._token_expires_at = expires_ms / 1000.0
- logger.debug("Spotify: loaded token from cache file")
- return token
- except (FileNotFoundError, json.JSONDecodeError, KeyError):
- return None
-
- def _save_token(self, body: dict) -> None:
- """Persist the token response to disk."""
- try:
- with open(SPOTIFY_TOKEN_CACHE_FILE, "w") as f:
- json.dump(body, f)
- logger.debug("Spotify: token saved to cache file")
- except Exception as e:
- logger.warning(f"Spotify: failed to write token cache: {e}")
-
- def _get_token(self) -> Optional[str]:
- """Obtain a Spotify access token. Cached in memory and on disk.
-
- Requires SP_DC cookie (set via SPOTIFY_SP_DC env var).
- """
- # 1. Memory cache
- if self._cached_token and time.time() < self._token_expires_at - 30:
- logger.debug("Spotify: using in-memory cached token")
- return self._cached_token
-
- # 2. Disk cache
- disk_token = self._load_cached_token()
- if disk_token and time.time() < self._token_expires_at - 30:
- return disk_token
-
- # 3. Fetch new token
- if not SPOTIFY_SP_DC:
- logger.error(
- "Spotify: SPOTIFY_SP_DC env var not set — "
- "cannot authenticate with Spotify"
- )
- return None
-
- headers = {
- "User-Agent": UA_BROWSER,
- "Accept": "*/*",
- "Referer": "https://open.spotify.com/",
- "Cookie": f"sp_dc={SPOTIFY_SP_DC}",
- }
-
- with httpx.Client(headers=headers) as client:
- server_time = self._get_server_time(client)
- if server_time is None:
- return None
-
- secret_data = self._get_secret(client)
- if secret_data is None:
- return None
-
- secret, version = secret_data
- totp = self._generate_totp(server_time, secret)
- logger.debug(f"Spotify: generated TOTP v{version}: {totp}")
-
- params = {
- "reason": "init",
- "productType": "web-player",
- "totp": totp,
- "totpVer": str(version),
- "totpServer": totp,
- }
-
- try:
- res = client.get(SPOTIFY_TOKEN_URL, params=params, timeout=HTTP_TIMEOUT)
- if res.status_code != 200:
- logger.error(f"Spotify: token request returned {res.status_code}")
- return None
-
- body = res.json()
-
- if not isinstance(body, dict) or "accessToken" not in body:
- logger.error(
- f"Spotify: unexpected token response keys: {list(body.keys()) if isinstance(body, dict) else type(body).__name__}"
- )
- return None
-
- token = body["accessToken"]
- is_anonymous = body.get("isAnonymous", False)
- if is_anonymous:
- logger.warning(
- "Spotify: received anonymous token — SP_DC may be invalid"
- )
-
- expires_ms = body.get("accessTokenExpirationTimestampMs", 0)
- if expires_ms and expires_ms > int(time.time() * 1000):
- self._token_expires_at = expires_ms / 1000.0
- else:
- logger.warning("Spotify: token expiry missing or invalid")
- self._token_expires_at = time.time() + 3600
-
- self._cached_token = token
- # Persist to disk (including anonymous tokens, same as Go ref)
- self._save_token(body)
- logger.debug("Spotify: obtained access token")
- return token
-
- except Exception as e:
- logger.error(f"Spotify: token request failed: {e}")
- return None
-
- # ─── Lyrics ──────────────────────────────────────────────────────
-
- @staticmethod
- def _format_lrc_line(start_ms: int, words: str) -> str:
- """Format a single lyric line as LRC ``[mm:ss.cc]text``."""
- minutes = start_ms // 60000
- seconds = (start_ms // 1000) % 60
- centiseconds = round((start_ms % 1000) / 10.0)
- return f"[{minutes:02d}:{seconds:02d}.{centiseconds:02.0f}]{words}"
-
- @staticmethod
- def _is_truly_synced(lines: list[dict]) -> bool:
- """Check if lyrics are actually synced (not all timestamps zero)."""
- for line in lines:
- try:
- ms = int(line.get("startTimeMs", "0"))
- if ms > 0:
- return True
- except (ValueError, TypeError):
- continue
- return False
-
- def fetch(
- self, track: TrackMeta, bypass_cache: bool = False
- ) -> Optional[LyricResult]:
- """Fetch lyrics for a Spotify track by its track ID."""
- if not track.trackid:
- logger.debug("Spotify: skipped — no trackid in metadata")
- return None
-
- logger.info(f"Spotify: fetching lyrics for trackid={track.trackid}")
-
- token = self._get_token()
- if not token:
- logger.error("Spotify: cannot fetch lyrics without a token")
- return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
-
- url = f"{SPOTIFY_LYRICS_URL}{track.trackid}?format=json&vocalRemoval=false&market=from_token"
- headers = {
- "User-Agent": UA_BROWSER,
- "Accept": "application/json",
- "Authorization": f"Bearer {token}",
- "Referer": "https://open.spotify.com/",
- "App-Platform": "WebPlayer",
- "Spotify-App-Version": SPOTIFY_APP_VERSION,
- "Origin": "https://open.spotify.com",
- }
-
- try:
- with httpx.Client(timeout=HTTP_TIMEOUT) as client:
- res = client.get(url, headers=headers)
-
- if res.status_code == 404:
- logger.debug(f"Spotify: 404 for trackid={track.trackid}")
- return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
-
- if res.status_code != 200:
- logger.error(f"Spotify: lyrics API returned {res.status_code}")
- return LyricResult(
- status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
- )
-
- data = res.json()
-
- # Validate response structure
- if not isinstance(data, dict) or "lyrics" not in data:
- logger.error("Spotify: unexpected lyrics response structure")
- return LyricResult(
- status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
- )
-
- lyrics_data = data["lyrics"]
- sync_type = lyrics_data.get("syncType", "")
- lines = lyrics_data.get("lines", [])
-
- if not isinstance(lines, list) or len(lines) == 0:
- logger.debug("Spotify: response contained no lyric lines")
- return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
-
- # Determine sync status
- # syncType == "LINE_SYNCED" AND at least one non-zero timestamp
- is_synced = sync_type == "LINE_SYNCED" and self._is_truly_synced(lines)
-
- # Convert to LRC
- lrc_lines: list[str] = []
- for line in lines:
- words = line.get("words", "")
- if not isinstance(words, str):
- continue
- try:
- ms = int(line.get("startTimeMs", "0"))
- except (ValueError, TypeError):
- ms = 0
-
- if is_synced:
- lrc_lines.append(self._format_lrc_line(ms, words))
- else:
- # Unsynced: emit with zero timestamps
- lrc_lines.append(f"[00:00.00]{words}")
-
- content = normalize_tags("\n".join(lrc_lines))
- status = (
- CacheStatus.SUCCESS_SYNCED
- if is_synced
- else CacheStatus.SUCCESS_UNSYNCED
- )
-
- logger.info(f"Spotify: got {status.value} lyrics ({len(lrc_lines)} lines)")
- return LyricResult(status=status, lyrics=content, source=self.source_name)
-
- except Exception as e:
- logger.error(f"Spotify: lyrics fetch failed: {e}")
- return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
diff --git a/lrcfetch/lrc.py b/lrcfetch/lrc.py
deleted file mode 100644
index 6913512..0000000
--- a/lrcfetch/lrc.py
+++ /dev/null
@@ -1,178 +0,0 @@
-"""
-Author: Uyanide pywang0608@foxmail.com
-Date: 2026-03-25 21:54:01
-Description: Shared LRC time-tag utilities (definitely overengineered)
-"""
-
-import re
-from pathlib import Path
-from typing import Optional
-from urllib.parse import unquote
-
-from .models import CacheStatus
-
-# Parses any time tag input format:
-# [mm:ss], [mm:ss.c], [mm:ss.cc], [mm:ss.ccc], [mm:ss:cc], …
-_RAW_TAG_RE = re.compile(r"\[(\d{2,}):(\d{2})(?:[.:](\d{1,3}))?\]")
-
-# Standard format after normalization: [mm:ss.cc]
-_STD_TAG_RE = re.compile(r"\[\d{2,}:\d{2}\.\d{2}\]")
-
-# Standard format with capture groups
-_STD_TAG_CAPTURE_RE = re.compile(r"\[(\d{2,}):(\d{2})\.(\d{2})\]")
-
-# Matches a standard time tag at the start of a line
-_LRC_LINE_RE = re.compile(r"^\[\d{2,}:\d{2}\.\d{2}\]", re.MULTILINE)
-
-# [offset:+/-xxx] tag — value in milliseconds
-_OFFSET_RE = re.compile(r"^\[offset:\s*([+-]?\d+)\]\s*$", re.MULTILINE | re.IGNORECASE)
-
-
-def _raw_tag_to_cs(mm: str, ss: str, frac: Optional[str]) -> str:
- """Convert parsed time tag components to standard [mm:ss.cc] string."""
- if frac is None:
- ms = 0
- else:
- # cc in [mm:ss:cc] is also treated as centiseconds, per LRC spec
- # ^
- # why does this format even exist, idk
- n = len(frac)
- if n == 1:
- ms = int(frac) * 100
- elif n == 2:
- ms = int(frac) * 10
- else:
- ms = int(frac)
- cs = min(round(ms / 10), 99)
- return f"[{mm}:{ss}.{cs:02d}]"
-
-
-def _reformat(text: str) -> str:
- """Parse each line and reformat to standard [mm:ss.cc]...content form.
-
- Handles any mix of time tag formats on input. Lines with no time tags
- are stripped of leading/trailing whitespace and passed through unchanged.
- """
- out: list[str] = []
- for line in text.splitlines():
- line = line.strip()
- pos = 0
- tags: list[str] = []
- while True:
- while pos < len(line) and line[pos] == " ":
- pos += 1
- m = _RAW_TAG_RE.match(line, pos)
- # Non-time tags are passed through as-is, except for leading/trailing whitespace which is stripped.
- if not m:
- # No more tags on this line
- break
- tags.append(_raw_tag_to_cs(m.group(1), m.group(2), m.group(3)))
- pos = m.end()
- if tags:
- # This could break lyric lines of some kind of word-synced LRC format,
- # but such format were not planned to be supported in the first place, so…
- out.append("".join(tags) + line[pos:].lstrip())
- else:
- out.append(line)
- # Empty lines with no tags are also preserved
- return "\n".join(out)
-
-
-def _apply_offset(text: str) -> str:
- """Parse [offset:±ms] and shift all standard [mm:ss.cc] tags accordingly.
-
- Per LRC spec, positive offset = lyrics appear sooner (subtract from timestamps).
- """
- m = _OFFSET_RE.search(text)
- if not m:
- return text
- offset_ms = int(m.group(1))
- text = _OFFSET_RE.sub("", text).strip("\n")
- if offset_ms == 0:
- return text
-
- def _shift(match: re.Match) -> str:
- total_ms = max(
- 0,
- (int(match.group(1)) * 60 + int(match.group(2))) * 1000
- + int(match.group(3)) * 10
- - offset_ms,
- )
- new_mm = total_ms // 60000
- new_ss = (total_ms % 60000) // 1000
- new_cs = min(round((total_ms % 1000) / 10), 99)
- return f"[{new_mm:02d}:{new_ss:02d}.{new_cs:02d}]"
-
- return _STD_TAG_CAPTURE_RE.sub(_shift, text)
-
-
-def normalize_tags(text: str) -> str:
- """Normalize LRC to standard form: reformat all tags to [mm:ss.cc], then apply offset."""
- return _apply_offset(_reformat(text))
-
-
-def is_synced(text: str) -> bool:
- """Check whether text contains non-zero LRC time tags.
-
- Assumes text has been normalized by normalize_tags (standard [mm:ss.cc] format).
- """
- tags = _STD_TAG_RE.findall(text)
- return bool(tags) and any(tag != "[00:00.00]" for tag in tags)
-
-
-def detect_sync_status(text: str) -> CacheStatus:
- """Determine whether lyrics contain meaningful LRC time tags.
-
- Assumes text has been normalized by normalize_tags.
- """
- return (
- CacheStatus.SUCCESS_SYNCED if is_synced(text) else CacheStatus.SUCCESS_UNSYNCED
- )
-
-
-def normalize_unsynced(lyrics: str) -> str:
- """Normalize unsynced lyrics so every line has a [00:00.00] tag.
-
- - Lines that already have time tags: replace with [00:00.00]
- - Lines without time tags: prepend [00:00.00]
- - Blank lines are converted to [00:00.00]
- """
- out: list[str] = []
- for line in lyrics.splitlines():
- stripped = line.strip()
- if not stripped:
- out.append("[00:00.00]")
- continue
- cleaned = _LRC_LINE_RE.sub("", stripped)
- while _LRC_LINE_RE.match(cleaned):
- cleaned = _LRC_LINE_RE.sub("", cleaned)
- out.append(f"[00:00.00]{cleaned}")
- return "\n".join(out)
-
-
-def get_audio_path(audio_url: str, ensure_exists: bool = False) -> Optional[Path]:
- """Convert file:// URL to Path, return None if invalid or (if ensure_exists) file doesn't exist."""
- if not audio_url.startswith("file://"):
- return None
- file_path = unquote(audio_url.replace("file://", "", 1))
- path = Path(file_path)
- if ensure_exists and not path.exists():
- return None
- return path
-
-
-def get_sidecar_path(
- audio_url: str, ensure_audio_exists: bool = False, ensure_exists: bool = False
-) -> Optional[Path]:
- """Given a file:// URL, return the corresponding .lrc sidecar path.
-
- If ensure_audio_exists is True, return None if the audio file does not exist.
- If ensure_exists is True, return None if the .lrc file does not exist.
- """
- audio_path = get_audio_path(audio_url, ensure_exists=ensure_audio_exists)
- if not audio_path:
- return None
- lrc_path = audio_path.with_suffix(".lrc")
- if ensure_exists and not lrc_path.exists():
- return None
- return lrc_path
diff --git a/lrcfetch/models.py b/lrcfetch/models.py
deleted file mode 100644
index 775922a..0000000
--- a/lrcfetch/models.py
+++ /dev/null
@@ -1,59 +0,0 @@
-"""
-Author: Uyanide pywang0608@foxmail.com
-Date: 2026-03-25 04:09:36
-Description: Data models
-"""
-
-from enum import Enum
-from typing import Optional
-from dataclasses import dataclass
-
-
-class CacheStatus(str, Enum):
- """Status of a cached lyric entry."""
-
- SUCCESS_SYNCED = "SUCCESS_SYNCED"
- SUCCESS_UNSYNCED = "SUCCESS_UNSYNCED"
- NOT_FOUND = "NOT_FOUND"
- NETWORK_ERROR = "NETWORK_ERROR"
-
-
-@dataclass
-class TrackMeta:
- """Metadata describing a track obtained from MPRIS or manual input."""
-
- trackid: Optional[str] = None # Spotify track ID (without "spotify:track:" prefix)
- length: Optional[int] = None # Duration in milliseconds
- album: Optional[str] = None
- artist: Optional[str] = None
- title: Optional[str] = None
- url: Optional[str] = None # Playback URL (file:// for local files)
-
- @property
- def is_local(self) -> bool:
- """True when the track is a local file (file:// URL)."""
- return bool(self.url and self.url.startswith("file://"))
-
- @property
- def is_complete(self) -> bool:
- """True when all fields required by LRCLIB are present."""
- return all([self.length, self.album, self.title, self.artist])
-
- def display_name(self) -> str:
- """Human-readable representation for logging."""
- parts = []
- if self.artist:
- parts.append(self.artist)
- if self.title:
- parts.append(self.title)
- return " - ".join(parts) if parts else self.trackid or self.url or "(unknown)"
-
-
-@dataclass
-class LyricResult:
- """Result of a lyric fetch attempt, also used as cache record."""
-
- status: CacheStatus
- lyrics: Optional[str] = None
- source: Optional[str] = None # Which fetcher produced this result
- ttl: Optional[int] = None # Hint for cache TTL (seconds)
diff --git a/lrcfetch/mpris.py b/lrcfetch/mpris.py
deleted file mode 100644
index 58ef486..0000000
--- a/lrcfetch/mpris.py
+++ /dev/null
@@ -1,188 +0,0 @@
-"""
-Author: Uyanide pywang0608@foxmail.com
-Date: 2026-03-25 04:44:15
-Description: MPRIS integration for fetching track metadata
-"""
-
-import asyncio
-from dbus_next.aio.message_bus import MessageBus
-from dbus_next.constants import BusType
-from dbus_next.message import Message
-from lrcfetch.models import TrackMeta
-from lrcfetch.config import PREFERRED_PLAYER
-from loguru import logger
-from typing import Optional, List, Any
-
-
-async def _list_mpris_players(bus: MessageBus) -> List[str]:
- """List all MPRIS player bus names."""
- try:
- reply = await bus.call(
- Message(
- destination="org.freedesktop.DBus",
- path="/org/freedesktop/DBus",
- interface="org.freedesktop.DBus",
- member="ListNames",
- )
- )
- if not reply or not reply.body:
- return []
- return [
- name for name in reply.body[0] if name.startswith("org.mpris.MediaPlayer2.")
- ]
- except Exception as e:
- logger.error(f"Failed to list DBus names: {e}")
- return []
-
-
-async def _get_playback_status(bus: MessageBus, player_name: str) -> Optional[str]:
- """Get PlaybackStatus ('Playing', 'Paused', 'Stopped') for a player."""
- try:
- introspection = await bus.introspect(player_name, "/org/mpris/MediaPlayer2")
- proxy = bus.get_proxy_object(
- player_name, "/org/mpris/MediaPlayer2", introspection
- )
- props = proxy.get_interface("org.freedesktop.DBus.Properties")
- status_var = await getattr(props, "call_get")(
- "org.mpris.MediaPlayer2.Player", "PlaybackStatus"
- )
- return status_var.value if status_var else None
- except Exception as e:
- logger.debug(f"Could not get playback status for {player_name}: {e}")
- return None
-
-
-async def _select_player(
- bus: MessageBus, specific_player: Optional[str] = None
-) -> Optional[str]:
- """Select the best MPRIS player.
-
- When specific_player is given, filter by name match.
- Otherwise: prefer the currently playing player. If multiple are playing,
- prefer the one matching LRCFETCH_PLAYER env var (default: spotify).
- """
- players = await _list_mpris_players(bus)
- if not players:
- return None
-
- if specific_player:
- players = [p for p in players if specific_player.lower() in p.lower()]
- return players[0] if players else None
-
- # Check playback status for each player
- playing = []
- for p in players:
- status = await _get_playback_status(bus, p)
- logger.debug(f"Player {p}: {status}")
- if status == "Playing":
- playing.append(p)
-
- candidates = playing if playing else players
-
- if len(candidates) == 1:
- return candidates[0]
-
- # Multiple candidates: prefer LRCFETCH_PLAYER
- preferred = PREFERRED_PLAYER.lower()
- if preferred:
- for p in candidates:
- if preferred in p.lower():
- return p
- return candidates[0]
-
-
-async def _fetch_metadata_dbus(
- specific_player: Optional[str] = None,
-) -> Optional[TrackMeta]:
- bus = None
- try:
- bus = await MessageBus(bus_type=BusType.SESSION).connect()
- except Exception as e:
- logger.error(f"Failed to connect to DBus: {e}")
- return None
-
- try:
- player_name = await _select_player(bus, specific_player)
- if not player_name:
- logger.debug(
- f"No active MPRIS players found via DBus{' for ' + specific_player if specific_player else ''}."
- )
- return None
-
- logger.debug(f"Using player: {player_name}")
-
- introspection = await bus.introspect(player_name, "/org/mpris/MediaPlayer2")
- proxy = bus.get_proxy_object(
- player_name, "/org/mpris/MediaPlayer2", introspection
- )
-
- props_iface = proxy.get_interface("org.freedesktop.DBus.Properties")
- if not props_iface:
- logger.error(f"Player {player_name} doesn't support Properties interface.")
- return None
-
- try:
- metadata_var: Any = await getattr(props_iface, "call_get")(
- "org.mpris.MediaPlayer2.Player", "Metadata"
- )
- if not metadata_var:
- logger.error("Empty metadata received.")
- return None
-
- metadata = metadata_var.value
-
- # Extract trackid — MPRIS returns either "spotify:track:ID"
- # or a DBus object path like "/com/spotify/track/ID"
- trackid = metadata.get("mpris:trackid", None)
- if trackid:
- trackid = trackid.value
- if isinstance(trackid, str):
- if trackid.startswith("spotify:track:"):
- trackid = trackid.removeprefix("spotify:track:")
- elif trackid.startswith("/com/spotify/track/"):
- trackid = trackid.removeprefix("/com/spotify/track/")
-
- # Extract length (usually microseconds)
- length = metadata.get("mpris:length", None)
- if length:
- length = length.value // 1000 if isinstance(length.value, int) else None
-
- album = metadata.get("xesam:album", None)
- album = album.value if album else None
-
- artist = metadata.get("xesam:artist", None)
- artist = (
- artist.value[0]
- if artist and isinstance(artist.value, list) and artist.value
- else None
- )
-
- title = metadata.get("xesam:title", None)
- title = title.value if title else None
-
- url = metadata.get("xesam:url", None)
- url = url.value if url else None
-
- return TrackMeta(
- trackid=trackid,
- length=length,
- album=album,
- artist=artist,
- title=title,
- url=url,
- )
- except Exception as e:
- logger.error(f"Failed to get properties from {player_name}: {e}")
- return None
-
- finally:
- if bus:
- bus.disconnect()
-
-
-def get_current_track(player_name: Optional[str] = None) -> Optional[TrackMeta]:
- try:
- return asyncio.run(_fetch_metadata_dbus(player_name))
- except Exception as e:
- logger.error(f"DBus async loop failed: {e}")
- return None