From 4d48e2153345ea06d23ad1c4d35bd1d407acdd40 Mon Sep 17 00:00:00 2001 From: Uyanide Date: Tue, 31 Mar 2026 22:33:03 +0200 Subject: [PATCH] delete old files --- lrcfetch/__init__.py | 0 lrcfetch/__main__.py | 4 - lrcfetch/cache.py | 441 ----------------------------- lrcfetch/cli.py | 426 ---------------------------- lrcfetch/config.py | 88 ------ lrcfetch/core.py | 178 ------------ lrcfetch/enrichers/__init__.py | 39 --- lrcfetch/enrichers/audio_tag.py | 78 ----- lrcfetch/enrichers/base.py | 31 -- lrcfetch/enrichers/file_name.py | 83 ------ lrcfetch/fetchers/__init__.py | 41 --- lrcfetch/fetchers/base.py | 35 --- lrcfetch/fetchers/cache_search.py | 85 ------ lrcfetch/fetchers/local.py | 98 ------- lrcfetch/fetchers/lrclib.py | 111 -------- lrcfetch/fetchers/lrclib_search.py | 168 ----------- lrcfetch/fetchers/netease.py | 213 -------------- lrcfetch/fetchers/qqmusic.py | 178 ------------ lrcfetch/fetchers/spotify.py | 373 ------------------------ lrcfetch/lrc.py | 178 ------------ lrcfetch/models.py | 59 ---- lrcfetch/mpris.py | 188 ------------ 22 files changed, 3095 deletions(-) delete mode 100644 lrcfetch/__init__.py delete mode 100644 lrcfetch/__main__.py delete mode 100644 lrcfetch/cache.py delete mode 100644 lrcfetch/cli.py delete mode 100644 lrcfetch/config.py delete mode 100644 lrcfetch/core.py delete mode 100644 lrcfetch/enrichers/__init__.py delete mode 100644 lrcfetch/enrichers/audio_tag.py delete mode 100644 lrcfetch/enrichers/base.py delete mode 100644 lrcfetch/enrichers/file_name.py delete mode 100644 lrcfetch/fetchers/__init__.py delete mode 100644 lrcfetch/fetchers/base.py delete mode 100644 lrcfetch/fetchers/cache_search.py delete mode 100644 lrcfetch/fetchers/local.py delete mode 100644 lrcfetch/fetchers/lrclib.py delete mode 100644 lrcfetch/fetchers/lrclib_search.py delete mode 100644 lrcfetch/fetchers/netease.py delete mode 100644 lrcfetch/fetchers/qqmusic.py delete mode 100644 lrcfetch/fetchers/spotify.py delete mode 100644 lrcfetch/lrc.py delete mode 100644 lrcfetch/models.py delete mode 100644 lrcfetch/mpris.py diff --git a/lrcfetch/__init__.py b/lrcfetch/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/lrcfetch/__main__.py b/lrcfetch/__main__.py deleted file mode 100644 index a2fdeab..0000000 --- a/lrcfetch/__main__.py +++ /dev/null @@ -1,4 +0,0 @@ -from lrcfetch.cli import run - -if __name__ == "__main__": - run() diff --git a/lrcfetch/cache.py b/lrcfetch/cache.py deleted file mode 100644 index 7e16fc3..0000000 --- a/lrcfetch/cache.py +++ /dev/null @@ -1,441 +0,0 @@ -""" -Author: Uyanide pywang0608@foxmail.com -Date: 2026-03-25 10:18:03 -Description: SQLite-based lyric cache with per-source storage and TTL expiration -""" - -import re -import sqlite3 -import hashlib -import time -import unicodedata -from typing import Optional -from loguru import logger - -from .config import DB_PATH, DURATION_TOLERANCE_MS -from .models import TrackMeta, LyricResult, CacheStatus - -# Punctuation to strip for fuzzy matching (ASCII + fullwidth + CJK brackets/symbols) -_PUNCT_RE = re.compile( - r"[~!@#$%^&*()_+\-=\[\]{}|;:'\",.<>?/\\`" - r"~!@#$%^&*()_+-=【】{}|;:'",。<>?/\`" - r"「」『』《》〈〉〔〕·•‥…—–]" -) -_SPACE_RE = re.compile(r"\s+") -# feat./ft./featuring and everything after (case-insensitive, word boundary) -_FEAT_RE = re.compile(r"\s*(?:\bfeat\.?\b|\bft\.?\b|\bfeaturing\b).*", re.IGNORECASE) -# Multi-artist separators: /, &, ×, x (surrounded by spaces), ;, 、, vs. -_ARTIST_SEP_RE = re.compile(r"\s*(?:[/&;×、]|\bvs\.?\b|\bx\b)\s*", re.IGNORECASE) - - -def _normalize_for_match(s: str) -> str: - """Normalize a string for fuzzy comparison. - - Lowercases, NFKC-normalizes (fullwidth → halfwidth), strips punctuation, - and collapses whitespace. - """ - s = unicodedata.normalize("NFKC", s).lower() - s = _FEAT_RE.sub("", s) - s = _PUNCT_RE.sub(" ", s) - s = _SPACE_RE.sub(" ", s).strip() - return s - - -def _normalize_artist(s: str) -> str: - """Normalize an artist string: split by separators, normalize each, sort. - - Splits first (on /, &, ;, ×, 、, vs., x), then strips feat./ft./featuring - from each part individually, so 'A feat. C / B' → ['a', 'b'] not just ['a']. - """ - s = unicodedata.normalize("NFKC", s).lower() - parts = _ARTIST_SEP_RE.split(s) - normed = sorted( - {_normalize_for_match(p) for p in parts if _FEAT_RE.sub("", p).strip()} - ) - return "\0".join(normed) if normed else _normalize_for_match(s) - - -def _generate_key(track: TrackMeta, source: str) -> str: - """Generate a unique cache key from track metadata and source. - - The key is scoped by source so that different fetchers can cache - independently for the same track (e.g. Spotify synced vs Netease unsynced). - """ - # Spotify tracks always use their track ID as the primary identifier - if track.trackid and source == "spotify": - return f"spotify:{track.trackid}" - - parts = [] - if track.artist: - parts.append(track.artist) - if track.title: - parts.append(track.title) - if track.album: - parts.append(track.album) - if track.length: - parts.append(str(track.length)) - - # Fall back to URL for local files - if not parts and track.url: - return f"{source}:url:{track.url}" - - if not parts: - raise ValueError("Insufficient metadata to generate cache key") - - raw = "|".join(parts) - digest = hashlib.sha256(raw.encode()).hexdigest() - return f"{source}:{digest}" - - -class CacheEngine: - def __init__(self, db_path: str = DB_PATH): - self.db_path = db_path - self._init_db() - - def _init_db(self) -> None: - """Create or migrate the cache table.""" - with sqlite3.connect(self.db_path) as conn: - conn.execute(""" - CREATE TABLE IF NOT EXISTS cache ( - key TEXT PRIMARY KEY, - source TEXT NOT NULL, - status TEXT NOT NULL, - lyrics TEXT, - created_at INTEGER NOT NULL, - expires_at INTEGER, - artist TEXT, - title TEXT, - album TEXT, - length INTEGER - ) - """) - # Migration: add length column if missing - cols = {r[1] for r in conn.execute("PRAGMA table_info(cache)").fetchall()} - if "length" not in cols: - conn.execute("ALTER TABLE cache ADD COLUMN length INTEGER") - conn.commit() - - # Read - - def get(self, track: TrackMeta, source: str) -> Optional[LyricResult]: - """Look up a cached result for *track* from *source*. - - Returns None on cache miss or expiration. - """ - try: - key = _generate_key(track, source) - except ValueError: - return None - - with sqlite3.connect(self.db_path) as conn: - row = conn.execute( - "SELECT status, lyrics, source, expires_at, length FROM cache WHERE key = ?", - (key,), - ).fetchone() - - if not row: - logger.debug(f"Cache miss: {source} / {track.display_name()}") - return None - - status_str, lyrics, src, expires_at, cached_length = row - - # Check TTL expiration - if expires_at and expires_at < int(time.time()): - logger.debug(f"Cache expired: {source} / {track.display_name()}") - conn.execute("DELETE FROM cache WHERE key = ?", (key,)) - conn.commit() - return None - - # Backfill length if the cached row is missing it - if cached_length is None and track.length is not None: - conn.execute( - "UPDATE cache SET length = ? WHERE key = ?", - (track.length, key), - ) - conn.commit() - - remaining = expires_at - int(time.time()) if expires_at else None - logger.debug( - f"Cache hit: {source} / {track.display_name()} " - f"[{status_str}, ttl={remaining}s]" - ) - return LyricResult( - status=CacheStatus(status_str), - lyrics=lyrics, - source=src, - ttl=remaining, - ) - - def get_best(self, track: TrackMeta, sources: list[str]) -> Optional[LyricResult]: - """Return the best cached result across *sources* (synced > unsynced). - - Skips negative statuses (NOT_FOUND, NETWORK_ERROR) — those are only - consulted per-source to avoid redundant fetches. - """ - best: Optional[LyricResult] = None - for src in sources: - cached = self.get(track, src) - if not cached: - continue - if cached.status == CacheStatus.SUCCESS_SYNCED: - return cached # Can't do better - if cached.status == CacheStatus.SUCCESS_UNSYNCED and best is None: - best = cached - return best - - # Write - - def set( - self, - track: TrackMeta, - source: str, - result: LyricResult, - ttl_seconds: Optional[int] = None, - ) -> None: - """Store a lyric result in the cache.""" - try: - key = _generate_key(track, source) - except ValueError: - logger.warning("Cannot cache: insufficient track metadata.") - return - - now = int(time.time()) - expires_at = now + ttl_seconds if ttl_seconds else None - - with sqlite3.connect(self.db_path) as conn: - conn.execute( - """INSERT OR REPLACE INTO cache - (key, source, status, lyrics, created_at, expires_at, - artist, title, album, length) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", - ( - key, - source, - result.status.value, - result.lyrics, - now, - expires_at, - track.artist, - track.title, - track.album, - track.length, - ), - ) - conn.commit() - logger.debug( - f"Cached: {source} / {track.display_name()} " - f"[{result.status.value}, ttl={ttl_seconds}s]" - ) - - # Delete - - def clear_all(self) -> None: - """Remove every entry from the cache.""" - with sqlite3.connect(self.db_path) as conn: - conn.execute("DELETE FROM cache") - conn.commit() - logger.info("Cache cleared.") - - def clear_track(self, track: TrackMeta) -> None: - """Remove all cached entries (every source) for a single track.""" - conditions, params = self._track_where(track) - if not conditions: - logger.info(f"No cache entries found for {track.display_name()}.") - return - where = " AND ".join(conditions) - with sqlite3.connect(self.db_path) as conn: - cur = conn.execute(f"DELETE FROM cache WHERE {where}", params) - conn.commit() - if cur.rowcount: - logger.info( - f"Cleared {cur.rowcount} cache entries for {track.display_name()}." - ) - else: - logger.info(f"No cache entries found for {track.display_name()}.") - - def prune(self) -> int: - """Remove all expired entries. Returns the number of rows deleted.""" - with sqlite3.connect(self.db_path) as conn: - cur = conn.execute( - "DELETE FROM cache WHERE expires_at IS NOT NULL AND expires_at < ?", - (int(time.time()),), - ) - conn.commit() - count = cur.rowcount - logger.info(f"Pruned {count} expired cache entries.") - return count - - @staticmethod - def _track_where(track: TrackMeta) -> tuple[list[str], list[str]]: - """Build WHERE conditions to match a track across all sources.""" - conditions: list[str] = [] - params: list[str] = [] - if track.artist: - conditions.append("artist = ?") - params.append(track.artist) - if track.title: - conditions.append("title = ?") - params.append(track.title) - if track.album: - conditions.append("album = ?") - params.append(track.album) - return conditions, params - - # Exact cross-source search - - def find_best_positive(self, track: TrackMeta) -> Optional[LyricResult]: - """Find the best positive (synced/unsynced) cache entry for *track*. - - Uses exact metadata match (artist + title + album) across all sources. - Returns synced if available, otherwise unsynced, or None. - """ - conditions, params = self._track_where(track) - if not conditions: - return None - - now = int(time.time()) - conditions.append("status IN (?, ?)") - params.extend( - [CacheStatus.SUCCESS_SYNCED.value, CacheStatus.SUCCESS_UNSYNCED.value] - ) - conditions.append("(expires_at IS NULL OR expires_at > ?)") - params.append(str(now)) - - where = " AND ".join(conditions) - with sqlite3.connect(self.db_path) as conn: - conn.row_factory = sqlite3.Row - rows = conn.execute( - f"SELECT status, lyrics, source FROM cache WHERE {where} " - "ORDER BY CASE status WHEN ? THEN 0 ELSE 1 END LIMIT 1", - params + [CacheStatus.SUCCESS_SYNCED.value], - ).fetchall() - - if not rows: - return None - - row = dict(rows[0]) - return LyricResult( - status=CacheStatus(row["status"]), - lyrics=row["lyrics"], - source="cache-search", - ) - - # Fuzzy search - - def search_by_meta( - self, - artist: Optional[str], - title: Optional[str], - length: Optional[int] = None, - ) -> list[dict]: - """Search cache for lyrics matching artist/title with fuzzy normalization. - - Ignores album and source. Only returns positive results (synced/unsynced) - that have not expired. When *length* is provided, filters by duration - tolerance and sorts by closest match. - """ - if not title: - return [] - - now = int(time.time()) - with sqlite3.connect(self.db_path) as conn: - conn.row_factory = sqlite3.Row - rows = conn.execute( - """SELECT * FROM cache - WHERE status IN (?, ?) - AND (expires_at IS NULL OR expires_at > ?)""", - ( - CacheStatus.SUCCESS_SYNCED.value, - CacheStatus.SUCCESS_UNSYNCED.value, - now, - ), - ).fetchall() - - norm_title = _normalize_for_match(title) - norm_artist = _normalize_artist(artist) if artist else None - - matches: list[dict] = [] - for row in rows: - row_dict = dict(row) - # Title must match - row_title = row_dict.get("title") or "" - if _normalize_for_match(row_title) != norm_title: - continue - # Artist must match if provided - if norm_artist: - row_artist = row_dict.get("artist") or "" - if _normalize_artist(row_artist) != norm_artist: - continue - matches.append(row_dict) - - # Duration filtering - if length is not None and matches: - scored = [] - for m in matches: - row_len = m.get("length") - if row_len is not None: - diff = abs(row_len - length) - if diff <= DURATION_TOLERANCE_MS: - scored.append((diff, m)) - else: - # No duration info in cache — still a candidate but lower priority - scored.append((DURATION_TOLERANCE_MS, m)) - scored.sort( - key=lambda x: ( - x[0], - x[1].get("status") != CacheStatus.SUCCESS_SYNCED.value, - ) - ) - matches = [m for _, m in scored] - - return matches - - # Query / inspect - - def query_track(self, track: TrackMeta) -> list[dict]: - """Return all cached rows for a given track (across all sources).""" - conditions, params = self._track_where(track) - if not conditions: - return [] - where = " AND ".join(conditions) - with sqlite3.connect(self.db_path) as conn: - conn.row_factory = sqlite3.Row - return [ - dict(r) - for r in conn.execute( - f"SELECT * FROM cache WHERE {where}", params - ).fetchall() - ] - - def query_all(self) -> list[dict]: - """Return every row in the cache table.""" - with sqlite3.connect(self.db_path) as conn: - conn.row_factory = sqlite3.Row - return [dict(r) for r in conn.execute("SELECT * FROM cache").fetchall()] - - def stats(self) -> dict: - """Return aggregate cache statistics.""" - now = int(time.time()) - with sqlite3.connect(self.db_path) as conn: - total = conn.execute("SELECT COUNT(*) FROM cache").fetchone()[0] - expired = conn.execute( - "SELECT COUNT(*) FROM cache WHERE expires_at IS NOT NULL AND expires_at < ?", - (now,), - ).fetchone()[0] - by_status = dict( - conn.execute( - "SELECT status, COUNT(*) FROM cache GROUP BY status" - ).fetchall() - ) - by_source = dict( - conn.execute( - "SELECT source, COUNT(*) FROM cache GROUP BY source" - ).fetchall() - ) - return { - "total": total, - "expired": expired, - "active": total - expired, - "by_status": by_status, - "by_source": by_source, - } diff --git a/lrcfetch/cli.py b/lrcfetch/cli.py deleted file mode 100644 index 48d7640..0000000 --- a/lrcfetch/cli.py +++ /dev/null @@ -1,426 +0,0 @@ -""" -Author: Uyanide pywang0608@foxmail.com -Date: 2026-03-26 02:04:39 -Description: CLI interface -""" - -import sys -import time -import os -from pathlib import Path -from typing import Annotated -from urllib.parse import quote -import cyclopts -from loguru import logger - -from .config import enable_debug -from .models import TrackMeta, CacheStatus -from .mpris import get_current_track -from .core import LrcManager -from .fetchers import FetcherMethodType -from .lrc import get_sidecar_path - - -app = cyclopts.App( - help="LRCFetch — Fetch line-synced lyrics for your music player.", -) -app.register_install_completion_command() - -cache_app = cyclopts.App(name="cache", help="Manage the local SQLite cache.") -app.command(cache_app) - -manager = LrcManager() - -# Global state set by the meta launcher -_player: str | None = None - - -@app.meta.default -def launcher( - *tokens: Annotated[str, cyclopts.Parameter(show=False, allow_leading_hyphen=True)], - debug: Annotated[ - bool, - cyclopts.Parameter( - name=["--debug", "-d"], negative="", help="Enable debug logging." - ), - ] = False, - player: Annotated[ - str | None, - cyclopts.Parameter( - name=["--player", "-p"], - help="Target a specific MPRIS player using its DBus name or a portion thereof.", - ), - ] = None, -): - global _player - if debug: - enable_debug() - _player = player - app(tokens) - - -# fetch - - -@app.command -def fetch( - *, - method: Annotated[ - FetcherMethodType | None, - cyclopts.Parameter(help="Force a specific source."), - ] = None, - no_cache: Annotated[ - bool, - cyclopts.Parameter( - name="--no-cache", negative="", help="Bypass the cache for this request." - ), - ] = False, - only_synced: Annotated[ - bool, - cyclopts.Parameter( - name="--only-synced", negative="", help="Only accept synced (timed) lyrics." - ), - ] = False, -): - """Fetch and print lyrics for the currently playing track.""" - track = get_current_track(_player) - - if not track: - logger.error("No active playing track found.") - sys.exit(1) - - logger.info(f"Track: {track.display_name()}") - - result = manager.fetch_for_track(track, force_method=method, bypass_cache=no_cache) - - if not result or not result.lyrics: - logger.error("No lyrics found.") - sys.exit(1) - - if only_synced and result.status != CacheStatus.SUCCESS_SYNCED: - logger.error("Only unsynced lyrics available (--only-synced requested).") - sys.exit(1) - - print(result.lyrics) - - -# search - - -@app.command -def search( - *, - title: Annotated[ - str | None, cyclopts.Parameter(name=["--title", "-t"], help="Track title.") - ] = None, - artist: Annotated[ - str | None, cyclopts.Parameter(name=["--artist", "-a"], help="Artist name.") - ] = None, - album: Annotated[str | None, cyclopts.Parameter(help="Album name.")] = None, - trackid: Annotated[str | None, cyclopts.Parameter(help="Spotify track ID.")] = None, - length: Annotated[ - int | None, - cyclopts.Parameter( - name=["--length", "-l"], help="Track duration in milliseconds." - ), - ] = None, - url: Annotated[ - str | None, - cyclopts.Parameter( - help="Local file URL (file:///...). Mutually exclusive with --path." - ), - ] = None, - path: Annotated[ - str | None, - cyclopts.Parameter( - name=["--path"], - help="Local audio file path. Mutually exclusive with --url.", - ), - ] = None, - method: Annotated[ - FetcherMethodType | None, cyclopts.Parameter(help="Force a specific source.") - ] = None, - no_cache: Annotated[ - bool, - cyclopts.Parameter( - name="--no-cache", negative="", help="Bypass the cache for this request." - ), - ] = False, - only_synced: Annotated[ - bool, - cyclopts.Parameter( - name="--only-synced", negative="", help="Only accept synced (timed) lyrics." - ), - ] = False, -): - """Search for lyrics by metadata (bypasses MPRIS).""" - if url and path: - logger.error("--url and --path are mutually exclusive.") - sys.exit(1) - - if path: - resolved = str(Path(path).resolve()) - url = "file://" + quote(resolved, safe="/") - - track = TrackMeta( - title=title, - artist=artist, - album=album, - trackid=trackid, - length=length, - url=url, - ) - - logger.info(f"Track: {track.display_name()}") - - result = manager.fetch_for_track(track, force_method=method, bypass_cache=no_cache) - - if not result or not result.lyrics: - logger.error("No lyrics found.") - sys.exit(1) - - if only_synced and result.status != CacheStatus.SUCCESS_SYNCED: - logger.error("Only unsynced lyrics available (--only-synced requested).") - sys.exit(1) - - print(result.lyrics) - - -# export - - -@app.command -def export( - *, - output: Annotated[ - str | None, - cyclopts.Parameter( - name=["--output", "-o"], - help="Output file path (default: same directory as audio file with .lrc extension, or current directory if not available).", - ), - ] = None, - method: Annotated[ - FetcherMethodType | None, cyclopts.Parameter(help="Force a specific source.") - ] = None, - no_cache: Annotated[ - bool, cyclopts.Parameter(name="--no-cache", negative="", help="Bypass cache.") - ] = False, - overwrite: Annotated[ - bool, - cyclopts.Parameter( - name=["--overwrite", "-f"], negative="", help="Overwrite existing file." - ), - ] = False, -): - """Export lyrics of the current track to a .lrc file.""" - track = get_current_track(_player) - if not track: - logger.error("No active playing track found.") - sys.exit(1) - - result = manager.fetch_for_track(track, force_method=method, bypass_cache=no_cache) - if not result or not result.lyrics: - logger.error("No lyrics available to export.") - sys.exit(1) - - # Build default output path - if not output: - if track.url: - lrc_path = get_sidecar_path(track.url, ensure_exists=False) - if lrc_path: - output = str(lrc_path) - logger.info(f"Exporting to sidecar path: {output}") - - # Fallback to current directory with sanitized filename - if not output: - filename = ( - f"{track.artist} - {track.title}.lrc" - if track.artist and track.title - else "lyrics.lrc" - ) - # Sanitize filename - filename = "".join( - c for c in filename if c.isalpha() or c.isdigit() or c in " -_." - ).rstrip() - output = os.path.join(os.getcwd(), filename) - - if os.path.exists(output) and not overwrite: - logger.error(f"File exists: {output} (use -f to overwrite)") - sys.exit(1) - - try: - with open(output, "w", encoding="utf-8") as f: - f.write(result.lyrics) - logger.info(f"Exported lyrics to {output}") - except Exception as e: - logger.error(f"Failed to write file: {e}") - sys.exit(1) - - -# cache subcommands - - -@cache_app.command -def query( - *, - all: Annotated[ - bool, - cyclopts.Parameter(name="--all", negative="", help="Dump all cache entries."), - ] = False, -): - """Show cached entries for the current track.""" - if all: - rows = manager.cache.query_all() - if not rows: - print("Cache is empty.") - return - for row in rows: - _print_cache_row(row) - print() - return - - track = get_current_track(_player) - if not track: - logger.error("No active playing track found.") - sys.exit(1) - _print_track_cache(track) - - -@cache_app.command -def clear( - *, - all: Annotated[ - bool, - cyclopts.Parameter(name="--all", negative="", help="Clear the entire cache."), - ] = False, -): - """Clear cached entries for the current track.""" - if all: - manager.cache.clear_all() - return - - track = get_current_track(_player) - if not track: - logger.error("No active playing track found.") - sys.exit(1) - manager.cache.clear_track(track) - - -@cache_app.command -def prune(): - """Remove expired cache entries.""" - manager.cache.prune() - - -@cache_app.command -def stats(): - """Show cache statistics.""" - s = manager.cache.stats() - print("=== Cache Statistics ===") - print(f"Total entries : {s['total']}") - print(f"Active : {s['active']}") - print(f"Expired : {s['expired']}") - if s["by_status"]: - print("\nBy status:") - for status, count in s["by_status"].items(): - print(f" {status}: {count}") - if s["by_source"]: - print("\nBy source:") - for source, count in s["by_source"].items(): - print(f" {source}: {count}") - - -@cache_app.command -def insert( - *, - path: Annotated[ - str | None, - cyclopts.Parameter( - name=["--path"], - help="Path to a local .lrc file to insert instead of reading from stdin.", - ), - ] = None, -): - """Manually insert lyrics into the cache for the current track.""" - track = get_current_track(_player) - if not track: - logger.error("No active playing track found.") - sys.exit(1) - - if path: - try: - with open(path, "r", encoding="utf-8") as f: - lyrics = f.read() - except Exception as e: - logger.error(f"Failed to read file: {e}") - sys.exit(1) - else: - logger.info("Reading lyrics from stdin (Ctrl+D to finish)...") - lyrics = sys.stdin.read() - - manager.manual_insert(track, lyrics) - - -# helpers - - -def _print_track_cache(track: TrackMeta) -> None: - """Print all cached entries for a given track.""" - print(f"Track: {track.display_name()}") - if track.album: - print(f"Album: {track.album}") - if track.length: - secs = track.length / 1000.0 - print(f"Duration: {int(secs // 60)}:{secs % 60:05.2f}") - print() - - rows = manager.cache.query_track(track) - if not rows: - print(" (no cache entries)") - return - - for row in rows: - _print_cache_row(row, indent=" ") - - -def _print_cache_row(row: dict, indent: str = "") -> None: - """Pretty-print a single cache row.""" - now = int(time.time()) - source = row.get("source", "?") - status = row.get("status", "?") - artist = row.get("artist", "") - title = row.get("title", "") - album = row.get("album", "") - created = row.get("created_at", 0) - expires = row.get("expires_at") - lyrics = row.get("lyrics", "") - - name = f"{artist} - {title}" if artist and title else row.get("key", "?") - print(f"{indent}[{source}] {name}") - if album: - print(f"{indent} Album : {album}") - print(f"{indent} Status : {status}") - if created: - age = now - created - print(f"{indent} Cached : {age // 3600}h {(age % 3600) // 60}m ago") - if expires: - remaining = expires - now - if remaining > 0: - print( - f"{indent} Expires : in {remaining // 3600}h {(remaining % 3600) // 60}m" - ) - else: - print(f"{indent} Expires : EXPIRED") - else: - print(f"{indent} Expires : never") - if lyrics: - line_count = len(lyrics.splitlines()) - print(f"{indent} Lyrics : {line_count} lines") - - -def run(): - app.meta() - - -if __name__ == "__main__": - run() diff --git a/lrcfetch/config.py b/lrcfetch/config.py deleted file mode 100644 index 5f0f5b8..0000000 --- a/lrcfetch/config.py +++ /dev/null @@ -1,88 +0,0 @@ -""" -Author: Uyanide pywang0608@foxmail.com -Date: 2026-03-25 10:17:56 -Description: Global configuration constants and logger setup -""" - -import os -import sys -from pathlib import Path -from platformdirs import user_cache_dir, user_config_dir -from dotenv import load_dotenv -from loguru import logger -from importlib.metadata import version - -# Application -APP_NAME = "lrcfetch" -APP_AUTHOR = "Uyanide" -APP_VERSION = version(APP_NAME) - -# Paths -CACHE_DIR = user_cache_dir(APP_NAME, APP_AUTHOR) -DB_PATH = os.path.join(CACHE_DIR, "cache.db") - -# .env loading -_config_env = Path(user_config_dir(APP_NAME, APP_AUTHOR)) / ".env" -load_dotenv(_config_env) # ~/.config/lrcfetch/.env -load_dotenv() # .env in cwd (does NOT override existing vars) - -# HTTP -HTTP_TIMEOUT = 10.0 - -# Cache TTLs (seconds) -TTL_SYNCED = None # never expires -TTL_UNSYNCED = 86400 # 1 day -TTL_NOT_FOUND = 86400 * 3 # 3 days -TTL_NETWORK_ERROR = 3600 # 1 hour - -# Search -DURATION_TOLERANCE_MS = 3000 # max duration mismatch for search matching - -# Spotify related -SPOTIFY_TOKEN_URL = "https://open.spotify.com/api/token" -SPOTIFY_LYRICS_URL = "https://spclient.wg.spotify.com/color-lyrics/v2/track/" -SPOTIFY_SERVER_TIME_URL = "https://open.spotify.com/api/server-time" -SPOTIFY_SECRET_URL = ( - "https://raw.githubusercontent.com/xyloflake/spot-secrets-go" - "/refs/heads/main/secrets/secrets.json" -) -SPOTIFY_SP_DC = os.environ.get("SPOTIFY_SP_DC", "") -SPOTIFY_TOKEN_CACHE_FILE = os.path.join(CACHE_DIR, "spotify_token.json") -SPOTIFY_APP_VERSION = "1.2.87.284.g3ff41c13" - -# Netease api -NETEASE_SEARCH_URL = "https://music.163.com/api/cloudsearch/pc" -NETEASE_LYRIC_URL = "https://interface3.music.163.com/api/song/lyric" - -# LRCLIB api -LRCLIB_API_URL = "https://lrclib.net/api/get" -LRCLIB_SEARCH_URL = "https://lrclib.net/api/search" - -# QQ Music API (self-hosted proxy) -QQ_MUSIC_API_URL = os.environ.get("QQ_MUSIC_API_URL", "").rstrip("/") - -# Player preference (used when multiple MPRIS players are active) -PREFERRED_PLAYER = os.environ.get("LRCFETCH_PLAYER", "spotify") - -# User-Agents -UA_BROWSER = "Mozilla/5.0 (X11; Linux x86_64; rv:148.0) Gecko/20100101 Firefox/148.0" -UA_LRCFETCH = f"LRCFetch {APP_VERSION} (https://github.com/Uyanide/lrcfetch)" - -os.makedirs(CACHE_DIR, exist_ok=True) - -# Logger -_LOG_FORMAT = ( - "{time:YYYY-MM-DD HH:mm:ss} | " - "{level: <8} | " - "{name}:{function}:{line} - " - "{message}" -) - -logger.remove() -logger.add(sys.stderr, format=_LOG_FORMAT, level="INFO") - - -def enable_debug() -> None: - """Switch logger to DEBUG level.""" - logger.remove() - logger.add(sys.stderr, format=_LOG_FORMAT, level="DEBUG") diff --git a/lrcfetch/core.py b/lrcfetch/core.py deleted file mode 100644 index 04ba9ea..0000000 --- a/lrcfetch/core.py +++ /dev/null @@ -1,178 +0,0 @@ -""" -Author: Uyanide pywang0608@foxmail.com -Date: 2026-03-25 11:09:53 -Description: Core orchestrator — coordinates fetchers with cache-aware fallback -""" - -""" -Fetch pipeline: - 1. Check cache for each source in the fallback sequence - 2. For sources without a valid cache hit, call the fetcher - 3. Cache every result (success, not-found, or error) per source - 4. Return the best result (synced > unsynced > None) -""" - -from typing import Optional -from loguru import logger - -from .fetchers import FetcherMethodType, create_fetchers -from .fetchers.base import BaseFetcher -from .cache import CacheEngine -from .lrc import normalize_tags, normalize_unsynced, detect_sync_status -from .config import TTL_SYNCED, TTL_UNSYNCED, TTL_NOT_FOUND, TTL_NETWORK_ERROR -from .models import TrackMeta, LyricResult, CacheStatus -from .enrichers import enrich_track - - -# Maps CacheStatus to the default TTL used when storing results -_STATUS_TTL: dict[CacheStatus, Optional[int]] = { - CacheStatus.SUCCESS_SYNCED: TTL_SYNCED, - CacheStatus.SUCCESS_UNSYNCED: TTL_UNSYNCED, - CacheStatus.NOT_FOUND: TTL_NOT_FOUND, - CacheStatus.NETWORK_ERROR: TTL_NETWORK_ERROR, -} - - -class LrcManager: - """Main entry point for fetching lyrics with caching.""" - - def __init__(self) -> None: - self.cache = CacheEngine() - self.fetchers = create_fetchers(self.cache) - - def _build_sequence( - self, track: TrackMeta, force_method: Optional[FetcherMethodType] = None - ) -> list[BaseFetcher]: - """Determine the ordered list of fetchers to try.""" - if force_method: - if force_method not in self.fetchers: - logger.error(f"Unknown method: {force_method}") - return [] - return [self.fetchers[force_method]] - - sequence: list[BaseFetcher] = [] - for method in self.fetchers.keys(): - if self.fetchers[method].is_available(track): - sequence.append(self.fetchers[method]) - - logger.debug(f"Fallback sequence: {[f.source_name for f in sequence]}") - return sequence - - def fetch_for_track( - self, - track: TrackMeta, - force_method: Optional[FetcherMethodType] = None, - bypass_cache: bool = False, - ) -> Optional[LyricResult]: - """Fetch lyrics for *track* using the fallback pipeline. - - Each source is checked against the cache independently: - - Cache hit with synced lyrics → return immediately - - Cache hit with negative status (NOT_FOUND / NETWORK_ERROR) → skip source - - Cache miss or unsynced → call fetcher, then cache the result - - After all sources are tried, returns the best result found - (synced > unsynced > None). - """ - track = enrich_track(track) - logger.info(f"Fetching lyrics for: {track.display_name()}") - - sequence = self._build_sequence(track, force_method) - if not sequence: - return None - - # Best result seen so far (synced wins over unsynced) - best_result: Optional[LyricResult] = None - - for fetcher in sequence: - source = fetcher.source_name - - # Cache check (skip for fetchers that handle their own caching) - if not bypass_cache and not fetcher.self_cached: - cached = self.cache.get(track, source) - if cached: - if cached.status == CacheStatus.SUCCESS_SYNCED: - logger.info(f"[{source}] cache hit: synced lyrics") - return cached - elif cached.status == CacheStatus.SUCCESS_UNSYNCED: - logger.debug( - f"[{source}] cache hit: unsynced lyrics (continuing)" - ) - if best_result is None: - best_result = cached - continue # Try next source for synced - elif cached.status in ( - CacheStatus.NOT_FOUND, - CacheStatus.NETWORK_ERROR, - ): - logger.debug( - f"[{source}] cache hit: {cached.status.value}, skipping" - ) - continue - elif not fetcher.self_cached: - logger.debug(f"[{source}] cache bypassed") - - # Fetch - logger.debug(f"[{source}] calling fetcher...") - result = fetcher.fetch(track, bypass_cache=bypass_cache) - - if not result: - logger.debug(f"[{source}] returned None (no result)") - continue - - # Cache the result (skip for self-cached fetchers) - if not fetcher.self_cached: - ttl = result.ttl or _STATUS_TTL.get(result.status, TTL_NOT_FOUND) - self.cache.set(track, source, result, ttl_seconds=ttl) - - # Evaluate result - if result.status == CacheStatus.SUCCESS_SYNCED: - logger.info(f"[{source}] got synced lyrics") - return result - - if result.status == CacheStatus.SUCCESS_UNSYNCED: - logger.debug(f"[{source}] got unsynced lyrics (continuing)") - if best_result is None: - best_result = result - - # NOT_FOUND / NETWORK_ERROR: already cached, try next - - # Return best available - if best_result: - # Normalize unsynced lyrics: set all timestamps to [00:00.00] - if ( - best_result.status == CacheStatus.SUCCESS_UNSYNCED - and best_result.lyrics - ): - best_result = LyricResult( - status=best_result.status, - lyrics=normalize_unsynced(best_result.lyrics), - source=best_result.source, - ttl=best_result.ttl, - ) - logger.info( - f"Returning unsynced lyrics from {best_result.source} " - f"(no synced source found)" - ) - else: - logger.info(f"No lyrics found for {track.display_name()}") - - return best_result - - def manual_insert( - self, - track: TrackMeta, - lyrics: str, - ) -> None: - """Manually insert lyrics into the cache for a track.""" - track = enrich_track(track) - logger.info(f"Manually inserting lyrics for: {track.display_name()}") - lyrics = normalize_tags(lyrics) - result = LyricResult( - status=detect_sync_status(lyrics), - lyrics=normalize_tags(lyrics), - source="manual", - ttl=None, - ) - self.cache.set(track, "manual", result, ttl_seconds=None) - logger.info("Lyrics inserted into cache.") diff --git a/lrcfetch/enrichers/__init__.py b/lrcfetch/enrichers/__init__.py deleted file mode 100644 index 25309e2..0000000 --- a/lrcfetch/enrichers/__init__.py +++ /dev/null @@ -1,39 +0,0 @@ -""" -Author: Uyanide pywang0608@foxmail.com -Date: 2026-03-31 06:09:11 -Description: Metadata enrichment pipeline -""" - -from loguru import logger - -from .base import BaseEnricher -from .audio_tag import AudioTagEnricher -from .file_name import FileNameEnricher -from ..models import TrackMeta - -# Enrichers run in order; earlier ones have higher priority. -_ENRICHERS: list[BaseEnricher] = [ - AudioTagEnricher(), - FileNameEnricher(), -] - - -def enrich_track(track: TrackMeta) -> TrackMeta: - """Run all enrichers and return a track with missing fields filled in. - - Each enricher sees the cumulative state (earlier enrichers' results - are already applied). A field is only set if it is currently None. - """ - for enricher in _ENRICHERS: - try: - result = enricher.enrich(track) - except Exception as e: - logger.warning(f"Enricher {enricher.name} failed: {e}") - continue - if not result: - continue - # Only apply fields that are still None - updates = {k: v for k, v in result.items() if getattr(track, k, None) is None} - if updates: - track = track.model_copy(update=updates) - return track diff --git a/lrcfetch/enrichers/audio_tag.py b/lrcfetch/enrichers/audio_tag.py deleted file mode 100644 index 4e9f604..0000000 --- a/lrcfetch/enrichers/audio_tag.py +++ /dev/null @@ -1,78 +0,0 @@ -""" -Author: Uyanide pywang0608@foxmail.com -Date: 2026-03-31 06:11:27 -Description: Enricher that reads metadata from audio file tags (mutagen) -""" - -from typing import Optional -from loguru import logger -from mutagen._file import File, FileType - -from .base import BaseEnricher -from ..models import TrackMeta -from ..lrc import get_audio_path - - -class AudioTagEnricher(BaseEnricher): - """Extract title, artist, album, and duration from audio file tags.""" - - @property - def name(self) -> str: - return "audio-tag" - - def enrich(self, track: TrackMeta) -> Optional[dict]: - if not track.is_local or not track.url: - return None - - audio_path = get_audio_path(track.url, ensure_exists=True) - if not audio_path: - return None - - try: - audio = File(audio_path) - except Exception as e: - logger.debug(f"AudioTag: failed to read {audio_path}: {e}") - return None - - if audio is None: - return None - - updates: dict = {} - - # Try common tag names (vorbis comments, ID3, MP4) - title = _first_tag(audio, "title", "TIT2", "\xa9nam") - if title and not track.title: - updates["title"] = title - - artist = _first_tag(audio, "artist", "TPE1", "\xa9ART") - if artist and not track.artist: - updates["artist"] = artist - - album = _first_tag(audio, "album", "TALB", "\xa9alb") - if album and not track.album: - updates["album"] = album - - if not track.length and audio.info and hasattr(audio.info, "length"): - length_ms = int(audio.info.length * 1000) - if length_ms > 0: - updates["length"] = length_ms - - if updates: - logger.debug(f"AudioTag: enriched fields: {list(updates.keys())}") - return updates or None - - -def _first_tag(audio: FileType, *keys: str) -> Optional[str]: - """Return the first non-empty string value found among the given tag keys.""" - if not audio.tags: - return None - for key in keys: - val = audio.tags.get(key) - if val is None: - continue - # mutagen returns lists for vorbis, single values for ID3 - if isinstance(val, list): - val = val[0] if val else None - if val: - return str(val).strip() - return None diff --git a/lrcfetch/enrichers/base.py b/lrcfetch/enrichers/base.py deleted file mode 100644 index f0a09da..0000000 --- a/lrcfetch/enrichers/base.py +++ /dev/null @@ -1,31 +0,0 @@ -""" -Author: Uyanide pywang0608@foxmail.com -Date: 2026-03-31 06:08:16 -Description: Base class for metadata enrichers -""" - -from abc import ABC, abstractmethod -from typing import Optional - -from ..models import TrackMeta - - -class BaseEnricher(ABC): - """Attempts to fill missing fields on a TrackMeta. - - Each enricher inspects the track, and returns a dict of field names - to values for any fields it can provide. Only fields that are - currently ``None`` on the track will actually be applied. - """ - - @property - @abstractmethod - def name(self) -> str: ... - - @abstractmethod - def enrich(self, track: TrackMeta) -> Optional[dict]: - """Return a dict of {field_name: value} for fields this enricher can fill. - - Return None or an empty dict if nothing can be contributed. - """ - ... diff --git a/lrcfetch/enrichers/file_name.py b/lrcfetch/enrichers/file_name.py deleted file mode 100644 index 150cebd..0000000 --- a/lrcfetch/enrichers/file_name.py +++ /dev/null @@ -1,83 +0,0 @@ -""" -Author: Uyanide pywang0608@foxmail.com -Date: 2026-03-31 06:08:44 -Description: Enricher that parses metadata from the audio file path -""" - -import re -from typing import Optional -from loguru import logger - -from .base import BaseEnricher -from ..models import TrackMeta -from ..lrc import get_audio_path - - -# Common track-number prefixes: "01 - ", "01. ", "1 - ", etc. -_TRACK_NUM_RE = re.compile(r"^\d{1,3}[\s.\-]+") - - -class FileNameEnricher(BaseEnricher): - """Derive artist / title from the file path when tags are unavailable. - - Heuristics (applied to the stem of the filename): - - "Artist - Title" → artist, title - - "01 - Title" → title only (leading track number stripped) - - "Title" → title only - - If artist is still missing after parsing the filename, the parent - directory name is used as a guess (common layout: ``Artist/Album/track``). - """ - - @property - def name(self) -> str: - return "file-name" - - def enrich(self, track: TrackMeta) -> Optional[dict]: - if not track.is_local or not track.url: - return None - - audio_path = get_audio_path(track.url, ensure_exists=False) - if not audio_path: - return None - - updates: dict = {} - stem = audio_path.stem - - # Try "Artist - Title" split - if " - " in stem: - left, right = stem.split(" - ", 1) - left = _TRACK_NUM_RE.sub("", left).strip() - right = right.strip() - - if left and right: - # Both sides non-empty after stripping track number - if not track.artist: - updates["artist"] = left - if not track.title: - updates["title"] = right - elif right: - # Left was only a track number → right is the title - if not track.title: - updates["title"] = right - else: - # No separator: strip track number, remainder is title - title_guess = _TRACK_NUM_RE.sub("", stem).strip() - if title_guess and not track.title: - updates["title"] = title_guess - - # Use parent directory as artist fallback - # Typical layout: /Music/Artist/Album/01 - Track.flac - if not track.artist and "artist" not in updates: - parents = audio_path.parents - if len(parents) >= 2: - album_dir = parents[0].name - artist_dir = parents[1].name - if artist_dir and artist_dir not in (".", "/"): - updates["artist"] = artist_dir - if not track.album and album_dir and album_dir != artist_dir: - updates["album"] = album_dir - - if updates: - logger.debug(f"FileName: enriched fields: {list(updates.keys())}") - return updates or None diff --git a/lrcfetch/fetchers/__init__.py b/lrcfetch/fetchers/__init__.py deleted file mode 100644 index 75377d6..0000000 --- a/lrcfetch/fetchers/__init__.py +++ /dev/null @@ -1,41 +0,0 @@ -""" -Author: Uyanide pywang0608@foxmail.com -Date: 2026-03-25 02:33:26 -Description: Fetcher pipeline — registry and types -""" - -from typing import Literal - -from .base import BaseFetcher -from .local import LocalFetcher -from .cache_search import CacheSearchFetcher -from .spotify import SpotifyFetcher -from .lrclib import LrclibFetcher -from .lrclib_search import LrclibSearchFetcher -from .netease import NeteaseFetcher -from .qqmusic import QQMusicFetcher -from ..cache import CacheEngine - -FetcherMethodType = Literal[ - "local", - "cache-search", - "spotify", - "lrclib", - "lrclib-search", - "netease", - "qqmusic", -] - - -def create_fetchers(cache: CacheEngine) -> dict[FetcherMethodType, BaseFetcher]: - """Instantiate all fetchers. Returns a dict keyed by source name.""" - fetchers: dict[FetcherMethodType, BaseFetcher] = { - "local": LocalFetcher(), - "cache-search": CacheSearchFetcher(cache), - "spotify": SpotifyFetcher(), - "lrclib": LrclibFetcher(), - "lrclib-search": LrclibSearchFetcher(), - "netease": NeteaseFetcher(), - "qqmusic": QQMusicFetcher(), - } - return fetchers diff --git a/lrcfetch/fetchers/base.py b/lrcfetch/fetchers/base.py deleted file mode 100644 index 2bf70af..0000000 --- a/lrcfetch/fetchers/base.py +++ /dev/null @@ -1,35 +0,0 @@ -""" -Author: Uyanide pywang0608@foxmail.com -Date: 2026-03-25 02:33:26 -Description: Base fetcher class and common interfaces -""" - -from abc import ABC, abstractmethod -from typing import Optional - -from ..models import TrackMeta, LyricResult - - -class BaseFetcher(ABC): - @property - @abstractmethod - def source_name(self) -> str: - """Name of the fetcher source.""" - pass - - @property - def self_cached(self) -> bool: - """True if this fetcher manages its own cache (skip per-source cache check).""" - return False - - @abstractmethod - def is_available(self, track: TrackMeta) -> bool: - """Check if the fetcher is available for the given track (e.g. has required metadata).""" - pass - - @abstractmethod - def fetch( - self, track: TrackMeta, bypass_cache: bool = False - ) -> Optional[LyricResult]: - """Fetch lyrics for the given track. Returns None if unable to fetch.""" - pass diff --git a/lrcfetch/fetchers/cache_search.py b/lrcfetch/fetchers/cache_search.py deleted file mode 100644 index af973c5..0000000 --- a/lrcfetch/fetchers/cache_search.py +++ /dev/null @@ -1,85 +0,0 @@ -""" -Author: Uyanide pywang0608@foxmail.com -Date: 2026-03-28 05:57:46 -Description: Cache-search fetcher — cross-album fuzzy lookup in the local cache -""" - -""" -Searches existing cache entries by artist + title with fuzzy normalization, -ignoring album and source. Useful when the same track appears on different -albums or is played from different players. -""" - -from typing import Optional -from loguru import logger - -from .base import BaseFetcher -from ..models import TrackMeta, LyricResult, CacheStatus -from ..cache import CacheEngine - - -class CacheSearchFetcher(BaseFetcher): - def __init__(self, cache: CacheEngine) -> None: - self._cache = cache - - @property - def source_name(self) -> str: - return "cache-search" - - @property - def self_cached(self) -> bool: - return True - - def is_available(self, track: TrackMeta) -> bool: - return bool(track.title) - - def fetch( - self, track: TrackMeta, bypass_cache: bool = False - ) -> Optional[LyricResult]: - if bypass_cache: - logger.debug("Cache-search: bypassed by caller") - return None - - if not track.title: - logger.debug("Cache-search: skipped — no title") - return None - - # Fast path: exact metadata match (artist+title+album), single SQL query - exact = self._cache.find_best_positive(track) - if exact: - logger.info(f"Cache-search: exact hit ({exact.status.value})") - return exact - - # Slow path: fuzzy cross-album search - matches = self._cache.search_by_meta( - artist=track.artist, - title=track.title, - length=track.length, - ) - - if not matches: - logger.debug(f"Cache-search: no match for {track.display_name()}") - return None - - # Pick best: prefer synced, then first available - best = None - for m in matches: - if m.get("status") == CacheStatus.SUCCESS_SYNCED.value: - best = m - break - if best is None: - best = m - - if not best or not best.get("lyrics"): - return None - - status = CacheStatus(best["status"]) - logger.info( - f"Cache-search: fuzzy hit from [{best.get('source')}] " - f"album={best.get('album')!r} ({status.value})" - ) - return LyricResult( - status=status, - lyrics=best["lyrics"], - source=self.source_name, - ) diff --git a/lrcfetch/fetchers/local.py b/lrcfetch/fetchers/local.py deleted file mode 100644 index 82e3ecd..0000000 --- a/lrcfetch/fetchers/local.py +++ /dev/null @@ -1,98 +0,0 @@ -""" -Author: Uyanide pywang0608@foxmail.com -Date: 2026-03-26 02:08:41 -Description: Local fetcher — reads lyrics from .lrc sidecar files or embedded audio metadata -""" - -""" -Priority: - 1. Same-directory .lrc file (e.g. /path/to/track.lrc) - 2. Embedded lyrics in audio metadata (FLAC, MP3 USLT/SYLT tags) -""" - -from typing import Optional -from loguru import logger -from mutagen._file import File -from mutagen.flac import FLAC - -from .base import BaseFetcher -from ..models import TrackMeta, LyricResult -from ..lrc import detect_sync_status, normalize_tags, get_audio_path, get_sidecar_path - - -class LocalFetcher(BaseFetcher): - @property - def source_name(self) -> str: - return "local" - - def is_available(self, track: TrackMeta) -> bool: - return track.is_local - - def fetch( - self, track: TrackMeta, bypass_cache: bool = False - ) -> Optional[LyricResult]: - """Attempt to read lyrics from local filesystem.""" - if not track.is_local or not track.url: - return None - - audio_path = get_audio_path(track.url, ensure_exists=False) - if not audio_path: - logger.debug(f"Local: audio URL is not a valid file path: {track.url}") - return None - - lrc_path = get_sidecar_path( - track.url, ensure_audio_exists=False, ensure_exists=True - ) - if lrc_path: - try: - with open(lrc_path, "r", encoding="utf-8") as f: - content = f.read().strip() - if content: - content = normalize_tags(content) - status = detect_sync_status(content) - logger.info(f"Local: found .lrc sidecar ({status.value})") - return LyricResult( - status=status, lyrics=content, source=self.source_name - ) - except Exception as e: - logger.error(f"Local: error reading {lrc_path}: {e}") - else: - logger.debug(f"Local: no .lrc sidecar found for {audio_path}") - - # Embedded metadata - if not audio_path.exists(): - logger.debug(f"Local: audio file does not exist: {audio_path}") - return None - try: - audio = File(audio_path) - if audio is not None: - lyrics = None - - if isinstance(audio, FLAC): - # FLAC stores lyrics in vorbis comment tags - lyrics = ( - audio.get("lyrics") or audio.get("unsynclyrics") or [None] - )[0] - elif hasattr(audio, "tags") and audio.tags: - # MP3 / other: look for USLT or SYLT ID3 frames - for key in audio.tags.keys(): - if key.startswith("USLT") or key.startswith("SYLT"): - lyrics = str(audio.tags[key]) - break - - if lyrics: - lyrics = normalize_tags(lyrics.strip()) - status = detect_sync_status(lyrics) - logger.info(f"Local: found embedded lyrics ({status.value})") - return LyricResult( - status=status, - lyrics=lyrics, - source=f"{self.source_name} (embedded)", - ) - else: - logger.debug("Local: no embedded lyrics found") - except Exception as e: - logger.error(f"Local: error reading metadata for {audio_path}: {e}") - - logger.debug(f"Local: no lyrics found for {audio_path}") - return None diff --git a/lrcfetch/fetchers/lrclib.py b/lrcfetch/fetchers/lrclib.py deleted file mode 100644 index 94928d1..0000000 --- a/lrcfetch/fetchers/lrclib.py +++ /dev/null @@ -1,111 +0,0 @@ -""" -Author: Uyanide pywang0608@foxmail.com -Date: 2026-03-25 05:23:38 -Description: LRCLIB fetcher — queries lrclib.net for synced/plain lyrics -""" - -""" -Requires complete track metadata (artist, title, album, duration). -""" - -from typing import Optional -import httpx -from loguru import logger -from urllib.parse import urlencode - -from .base import BaseFetcher -from ..models import TrackMeta, LyricResult, CacheStatus -from ..lrc import normalize_tags -from ..config import ( - HTTP_TIMEOUT, - TTL_UNSYNCED, - TTL_NOT_FOUND, - TTL_NETWORK_ERROR, - LRCLIB_API_URL, - UA_LRCFETCH, -) - - -class LrclibFetcher(BaseFetcher): - @property - def source_name(self) -> str: - return "lrclib" - - def is_available(self, track: TrackMeta) -> bool: - return track.is_complete - - def fetch( - self, track: TrackMeta, bypass_cache: bool = False - ) -> Optional[LyricResult]: - """Fetch lyrics from LRCLIB. Requires complete metadata.""" - if not track.is_complete: - logger.debug("LRCLIB: skipped — incomplete metadata") - return None - - params = { - "track_name": track.title, - "artist_name": track.artist, - "album_name": track.album, - "duration": track.length / 1000.0 if track.length else 0, - } - - url = f"{LRCLIB_API_URL}?{urlencode(params)}" - logger.info(f"LRCLIB: fetching lyrics for {track.display_name()}") - - try: - with httpx.Client(timeout=HTTP_TIMEOUT) as client: - resp = client.get(url, headers={"User-Agent": UA_LRCFETCH}) - - if resp.status_code == 404: - logger.debug(f"LRCLIB: not found for {track.display_name()}") - return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND) - - if resp.status_code != 200: - logger.error(f"LRCLIB: API returned {resp.status_code}") - return LyricResult( - status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR - ) - - data = resp.json() - - # Validate response - if not isinstance(data, dict): - logger.error(f"LRCLIB: unexpected response type: {type(data).__name__}") - return LyricResult( - status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR - ) - - synced = data.get("syncedLyrics") - unsynced = data.get("plainLyrics") - - if isinstance(synced, str) and synced.strip(): - lyrics = normalize_tags(synced.strip()) - logger.info( - f"LRCLIB: got synced lyrics ({len(lyrics.splitlines())} lines)" - ) - return LyricResult( - status=CacheStatus.SUCCESS_SYNCED, - lyrics=lyrics, - source=self.source_name, - ) - elif isinstance(unsynced, str) and unsynced.strip(): - lyrics = normalize_tags(unsynced.strip()) - logger.info( - f"LRCLIB: got unsynced lyrics ({len(lyrics.splitlines())} lines)" - ) - return LyricResult( - status=CacheStatus.SUCCESS_UNSYNCED, - lyrics=lyrics, - source=self.source_name, - ttl=TTL_UNSYNCED, - ) - else: - logger.debug(f"LRCLIB: empty response for {track.display_name()}") - return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND) - - except httpx.HTTPError as e: - logger.error(f"LRCLIB: HTTP error: {e}") - return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR) - except Exception as e: - logger.error(f"LRCLIB: unexpected error: {e}") - return None diff --git a/lrcfetch/fetchers/lrclib_search.py b/lrcfetch/fetchers/lrclib_search.py deleted file mode 100644 index 95f87c2..0000000 --- a/lrcfetch/fetchers/lrclib_search.py +++ /dev/null @@ -1,168 +0,0 @@ -""" -Author: Uyanide pywang0608@foxmail.com -Date: 2026-03-25 05:30:50 -Description: LRCLIB search fetcher — fuzzy search via lrclib.net /api/search -""" - -""" -Used when metadata is incomplete (no album or duration) but title is available. -Selects the best match by duration when track length is known. -""" - -import httpx -from typing import Optional -from loguru import logger -from urllib.parse import urlencode - -from .base import BaseFetcher -from ..models import TrackMeta, LyricResult, CacheStatus -from ..lrc import normalize_tags -from ..config import ( - HTTP_TIMEOUT, - TTL_UNSYNCED, - TTL_NOT_FOUND, - TTL_NETWORK_ERROR, - DURATION_TOLERANCE_MS, - LRCLIB_SEARCH_URL, - UA_LRCFETCH, -) - - -class LrclibSearchFetcher(BaseFetcher): - @property - def source_name(self) -> str: - return "lrclib-search" - - def is_available(self, track: TrackMeta) -> bool: - return bool(track.title) - - def fetch( - self, track: TrackMeta, bypass_cache: bool = False - ) -> Optional[LyricResult]: - """Search LRCLIB for lyrics. Requires at least a title.""" - if not track.title: - logger.debug("LRCLIB-search: skipped — no title") - return None - - params: dict[str, str] = {"track_name": track.title} - if track.artist: - params["artist_name"] = track.artist - if track.album: - params["album_name"] = track.album - - url = f"{LRCLIB_SEARCH_URL}?{urlencode(params)}" - logger.info(f"LRCLIB-search: searching for {track.display_name()}") - - try: - with httpx.Client(timeout=HTTP_TIMEOUT) as client: - resp = client.get(url, headers={"User-Agent": UA_LRCFETCH}) - - if resp.status_code != 200: - logger.error(f"LRCLIB-search: API returned {resp.status_code}") - return LyricResult( - status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR - ) - - data = resp.json() - - if not isinstance(data, list) or len(data) == 0: - logger.debug(f"LRCLIB-search: no results for {track.display_name()}") - return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND) - - logger.debug(f"LRCLIB-search: got {len(data)} candidates") - - # Select best match by duration - best = self._select_best(data, track) - if best is None: - logger.debug("LRCLIB-search: no valid candidate found") - return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND) - - # Extract lyrics - synced = best.get("syncedLyrics") - unsynced = best.get("plainLyrics") - - if isinstance(synced, str) and synced.strip(): - lyrics = normalize_tags(synced.strip()) - logger.info( - f"LRCLIB-search: got synced lyrics ({len(lyrics.splitlines())} lines)" - ) - return LyricResult( - status=CacheStatus.SUCCESS_SYNCED, - lyrics=lyrics, - source=self.source_name, - ) - elif isinstance(unsynced, str) and unsynced.strip(): - lyrics = normalize_tags(unsynced.strip()) - logger.info( - f"LRCLIB-search: got unsynced lyrics ({len(lyrics.splitlines())} lines)" - ) - return LyricResult( - status=CacheStatus.SUCCESS_UNSYNCED, - lyrics=lyrics, - source=self.source_name, - ttl=TTL_UNSYNCED, - ) - else: - logger.debug("LRCLIB-search: best candidate has empty lyrics") - return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND) - - except httpx.HTTPError as e: - logger.error(f"LRCLIB-search: HTTP error: {e}") - return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR) - except Exception as e: - logger.error(f"LRCLIB-search: unexpected error: {e}") - return None - - @staticmethod - def _select_best(candidates: list[dict], track: TrackMeta) -> Optional[dict]: - """Pick the best candidate, preferring synced lyrics and closest duration.""" - if track.length is not None: - track_s = track.length / 1000.0 - best: Optional[dict] = None - best_diff = float("inf") - - for item in candidates: - if not isinstance(item, dict): - continue - duration = item.get("duration") - if not isinstance(duration, (int, float)): - continue - diff = abs(duration - track_s) * 1000 # compare in ms - if diff > DURATION_TOLERANCE_MS: - continue - # Prefer synced over unsynced at similar duration - has_synced = ( - isinstance(item.get("syncedLyrics"), str) - and item["syncedLyrics"].strip() - ) - best_synced = ( - best is not None - and isinstance(best.get("syncedLyrics"), str) - and best["syncedLyrics"].strip() - ) - if diff < best_diff or ( - diff == best_diff and has_synced and not best_synced - ): - best_diff = diff - best = item - - if best is not None: - logger.debug( - f"LRCLIB-search: selected id={best.get('id')} (diff={best_diff:.0f}ms)" - ) - return best - - logger.debug( - f"LRCLIB-search: no candidate within {DURATION_TOLERANCE_MS}ms" - ) - return None - - # No duration — pick first with synced lyrics, or just first - for item in candidates: - if ( - isinstance(item, dict) - and isinstance(item.get("syncedLyrics"), str) - and item["syncedLyrics"].strip() - ): - return item - return candidates[0] if isinstance(candidates[0], dict) else None diff --git a/lrcfetch/fetchers/netease.py b/lrcfetch/fetchers/netease.py deleted file mode 100644 index eecc8d7..0000000 --- a/lrcfetch/fetchers/netease.py +++ /dev/null @@ -1,213 +0,0 @@ -""" -Author: Uyanide pywang0608@foxmail.com -Date: 2026-03-25 11:04:51 -Description: Netease Cloud Music fetcher -""" - -""" -Uses the public cloudsearch API for searching and the song/lyric API for -retrieving lyrics. No authentication required. - -Search results are filtered by duration when the track has a known length -to avoid returning lyrics for the wrong version of a song. -""" - -from typing import Optional -import httpx -from loguru import logger - -from .base import BaseFetcher -from ..models import TrackMeta, LyricResult, CacheStatus -from ..lrc import detect_sync_status, normalize_tags -from ..config import ( - HTTP_TIMEOUT, - TTL_NOT_FOUND, - TTL_NETWORK_ERROR, - DURATION_TOLERANCE_MS, - NETEASE_SEARCH_URL, - NETEASE_LYRIC_URL, - UA_BROWSER, -) - -_HEADERS = { - "User-Agent": UA_BROWSER, - "Referer": "https://music.163.com/", -} - - -class NeteaseFetcher(BaseFetcher): - @property - def source_name(self) -> str: - return "netease" - - def is_available(self, track: TrackMeta) -> bool: - return bool(track.title) - - def _search(self, track: TrackMeta, limit: int = 10) -> Optional[int]: - """Search Netease and return the best-matching song ID. - - When ``track.length`` is available, candidates are ranked by duration - difference and only accepted if within ``DURATION_TOLERANCE_MS``. - """ - query = f"{track.artist or ''} {track.title or ''}".strip() - if not query: - return None - - logger.debug(f"Netease: searching for '{query}' (limit={limit})") - - try: - with httpx.Client(timeout=HTTP_TIMEOUT) as client: - resp = client.post( - NETEASE_SEARCH_URL, - headers=_HEADERS, - data={"s": query, "type": "1", "limit": str(limit), "offset": "0"}, - ) - resp.raise_for_status() - result = resp.json() - - # Validate response - if not isinstance(result, dict): - logger.error( - f"Netease: search returned non-dict: {type(result).__name__}" - ) - return None - - result_body = result.get("result") - if not isinstance(result_body, dict): - logger.debug("Netease: search 'result' field missing or invalid") - return None - - songs = result_body.get("songs") - if not isinstance(songs, list) or len(songs) == 0: - logger.debug("Netease: search returned 0 results") - return None - - logger.debug(f"Netease: search returned {len(songs)} candidates") - - # Duration-based best-match selection - if track.length is not None: - track_ms = track.length - best_id: Optional[int] = None - best_diff = float("inf") - - for song in songs: - if not isinstance(song, dict): - continue - sid = song.get("id") - name = song.get("name", "?") - duration = song.get("dt") # milliseconds - if not isinstance(duration, int): - logger.debug( - f" candidate {sid} '{name}': no duration, skipped" - ) - continue - diff = abs(duration - track_ms) - logger.debug( - f" candidate {sid} '{name}': " - f"duration={duration}ms, diff={diff}ms" - ) - if diff < best_diff: - best_diff = diff - best_id = sid - - if best_id is not None and best_diff <= DURATION_TOLERANCE_MS: - logger.debug(f"Netease: selected id={best_id} (diff={best_diff}ms)") - return best_id - - logger.debug( - f"Netease: no candidate within {DURATION_TOLERANCE_MS}ms " - f"(best diff={best_diff}ms)" - ) - return None - - # No duration info — take the first result - first = songs[0] - if not isinstance(first, dict) or "id" not in first: - logger.error("Netease: first search result has no 'id'") - return None - logger.debug( - f"Netease: no duration available, using first result " - f"id={first['id']} '{first.get('name', '?')}'" - ) - return first["id"] - - except Exception as e: - logger.error(f"Netease: search failed: {e}") - return None - - def _get_lyric(self, song_id: int) -> Optional[LyricResult]: - """Fetch lyrics for a given Netease song ID.""" - logger.debug(f"Netease: fetching lyrics for song_id={song_id}") - - try: - with httpx.Client(timeout=HTTP_TIMEOUT) as client: - resp = client.post( - NETEASE_LYRIC_URL, - headers=_HEADERS, - data={ - "id": str(song_id), - "cp": "false", - "tv": "0", - "lv": "0", - "rv": "0", - "kv": "0", - "yv": "0", - "ytv": "0", - "yrv": "0", - }, - ) - resp.raise_for_status() - data = resp.json() - - # Validate response - if not isinstance(data, dict): - logger.error( - f"Netease: lyric response is not dict: {type(data).__name__}" - ) - return LyricResult( - status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR - ) - - lrc_obj = data.get("lrc") - if not isinstance(lrc_obj, dict): - logger.debug( - f"Netease: no 'lrc' object in response for song_id={song_id}" - ) - return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND) - - lrc: str = lrc_obj.get("lyric", "") - if not isinstance(lrc, str) or not lrc.strip(): - logger.debug(f"Netease: empty lyrics for song_id={song_id}") - return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND) - - # Determine sync status - lrc = normalize_tags(lrc) - status = detect_sync_status(lrc) - logger.info( - f"Netease: got {status.value} lyrics for song_id={song_id} " - f"({len(lrc.splitlines())} lines)" - ) - return LyricResult( - status=status, lyrics=lrc.strip(), source=self.source_name - ) - - except Exception as e: - logger.error(f"Netease: lyric fetch failed for song_id={song_id}: {e}") - return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR) - - def fetch( - self, track: TrackMeta, bypass_cache: bool = False - ) -> Optional[LyricResult]: - """Search for the track and fetch its lyrics.""" - query = f"{track.artist or ''} {track.title or ''}".strip() - if not query: - logger.debug("Netease: skipped — insufficient metadata") - return None - - logger.info(f"Netease: fetching lyrics for {track.display_name()}") - song_id = self._search(track) - if not song_id: - logger.debug(f"Netease: no match found for {track.display_name()}") - return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND) - - return self._get_lyric(song_id) diff --git a/lrcfetch/fetchers/qqmusic.py b/lrcfetch/fetchers/qqmusic.py deleted file mode 100644 index a5d0b63..0000000 --- a/lrcfetch/fetchers/qqmusic.py +++ /dev/null @@ -1,178 +0,0 @@ -""" -Author: Uyanide pywang0608@foxmail.com -Date: 2026-03-31 01:54:02 -Description: QQ Music fetcher via self-hosted API proxy -""" - -""" -Requires a running qq-music-api instance. -The base URL is read from the QQ_MUSIC_API_URL environment variable. - -Search → pick best match by duration → fetch LRC lyrics. -""" - -from typing import Optional -import httpx -from loguru import logger - -from .base import BaseFetcher -from ..models import TrackMeta, LyricResult, CacheStatus -from ..lrc import detect_sync_status, normalize_tags -from ..config import ( - HTTP_TIMEOUT, - TTL_NOT_FOUND, - TTL_NETWORK_ERROR, - DURATION_TOLERANCE_MS, - QQ_MUSIC_API_URL, -) - - -class QQMusicFetcher(BaseFetcher): - @property - def source_name(self) -> str: - return "qqmusic" - - def is_available(self, track: TrackMeta) -> bool: - return bool(track.title) and bool(QQ_MUSIC_API_URL) - - def _search(self, track: TrackMeta, limit: int = 10) -> Optional[str]: - """Search QQ Music and return the best-matching song MID.""" - query = f"{track.artist or ''} {track.title or ''}".strip() - if not query: - return None - - logger.debug(f"QQMusic: searching for '{query}' (limit={limit})") - - try: - with httpx.Client(timeout=HTTP_TIMEOUT) as client: - resp = client.get( - f"{QQ_MUSIC_API_URL}/api/search", - params={"keyword": query, "type": "song", "num": limit}, - ) - resp.raise_for_status() - data = resp.json() - - if data.get("code") != 0: - logger.error(f"QQMusic: search API error: {data}") - return None - - songs = data.get("data", {}).get("list", []) - if not songs: - logger.debug("QQMusic: search returned 0 results") - return None - - logger.debug(f"QQMusic: search returned {len(songs)} candidates") - - # Duration-based best-match selection - if track.length is not None: - track_ms = track.length - best_mid: Optional[str] = None - best_diff = float("inf") - - for song in songs: - if not isinstance(song, dict): - continue - mid = song.get("mid") - name = song.get("name", "?") - # interval is in seconds - interval = song.get("interval") - if not isinstance(interval, int): - logger.debug( - f" candidate {mid} '{name}': no duration, skipped" - ) - continue - duration_ms = interval * 1000 - diff = abs(duration_ms - track_ms) - logger.debug( - f" candidate {mid} '{name}': " - f"duration={duration_ms}ms, diff={diff}ms" - ) - if diff < best_diff: - best_diff = diff - best_mid = mid - - if best_mid is not None and best_diff <= DURATION_TOLERANCE_MS: - logger.debug( - f"QQMusic: selected mid={best_mid} (diff={best_diff}ms)" - ) - return best_mid - - logger.debug( - f"QQMusic: no candidate within {DURATION_TOLERANCE_MS}ms " - f"(best diff={best_diff}ms)" - ) - return None - - # No duration info — take the first result - first = songs[0] - if not isinstance(first, dict) or "mid" not in first: - logger.error("QQMusic: first search result has no 'mid'") - return None - logger.debug( - f"QQMusic: no duration available, using first result " - f"mid={first['mid']} '{first.get('name', '?')}'" - ) - return first["mid"] - - except Exception as e: - logger.error(f"QQMusic: search failed: {e}") - return None - - def _get_lyric(self, mid: str) -> Optional[LyricResult]: - """Fetch lyrics for a given QQ Music song MID.""" - logger.debug(f"QQMusic: fetching lyrics for mid={mid}") - - try: - with httpx.Client(timeout=HTTP_TIMEOUT) as client: - resp = client.get( - f"{QQ_MUSIC_API_URL}/api/lyric", - params={"mid": mid}, - ) - resp.raise_for_status() - data = resp.json() - - if data.get("code") != 0: - logger.error(f"QQMusic: lyric API error: {data}") - return LyricResult( - status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR - ) - - lrc = data.get("data", {}).get("lyric", "") - if not isinstance(lrc, str) or not lrc.strip(): - logger.debug(f"QQMusic: empty lyrics for mid={mid}") - return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND) - - lrc = normalize_tags(lrc) - status = detect_sync_status(lrc) - logger.info( - f"QQMusic: got {status.value} lyrics for mid={mid} " - f"({len(lrc.splitlines())} lines)" - ) - return LyricResult( - status=status, lyrics=lrc.strip(), source=self.source_name - ) - - except Exception as e: - logger.error(f"QQMusic: lyric fetch failed for mid={mid}: {e}") - return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR) - - def fetch( - self, track: TrackMeta, bypass_cache: bool = False - ) -> Optional[LyricResult]: - """Search for the track and fetch its lyrics.""" - if not QQ_MUSIC_API_URL: - logger.debug("QQMusic: skipped — QQ_MUSIC_API_URL not configured") - return None - - query = f"{track.artist or ''} {track.title or ''}".strip() - if not query: - logger.debug("QQMusic: skipped — insufficient metadata") - return None - - logger.info(f"QQMusic: fetching lyrics for {track.display_name()}") - mid = self._search(track) - if not mid: - logger.debug(f"QQMusic: no match found for {track.display_name()}") - return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND) - - return self._get_lyric(mid) diff --git a/lrcfetch/fetchers/spotify.py b/lrcfetch/fetchers/spotify.py deleted file mode 100644 index fcd568c..0000000 --- a/lrcfetch/fetchers/spotify.py +++ /dev/null @@ -1,373 +0,0 @@ -""" -Author: Uyanide pywang0608@foxmail.com -Date: 2026-03-25 10:43:21 -Description: Spotify fetcher — obtains synced lyrics via Spotify's internal color-lyrics API. -""" - -""" -Authentication flow: - 1. Fetch server time from Spotify - 2. Fetch TOTP secret - 3. Generate a TOTP code and exchange it (with SP_DC cookie) for an access token - 4. Request lyrics using the access token - -The secret and token are cached on the instance to avoid redundant network -calls within the same session. - -Requires SPOTIFY_SP_DC environment variable to be set. -""" - -import httpx -import json -import time -import struct -import hmac -import hashlib -from typing import Optional, Tuple -from loguru import logger - -from .base import BaseFetcher -from ..models import TrackMeta, LyricResult, CacheStatus -from ..lrc import normalize_tags -from ..config import ( - HTTP_TIMEOUT, - SPOTIFY_APP_VERSION, - TTL_NOT_FOUND, - TTL_NETWORK_ERROR, - SPOTIFY_TOKEN_URL, - SPOTIFY_LYRICS_URL, - SPOTIFY_SERVER_TIME_URL, - SPOTIFY_SECRET_URL, - SPOTIFY_SP_DC, - SPOTIFY_TOKEN_CACHE_FILE, - UA_BROWSER, -) - - -class SpotifyFetcher(BaseFetcher): - def __init__(self) -> None: - # Session-level caches to avoid refetching within the same run - self._cached_secret: Optional[Tuple[str, int]] = None - self._cached_token: Optional[str] = None - self._token_expires_at: float = 0.0 - - @property - def source_name(self) -> str: - return "spotify" - - def is_available(self, track: TrackMeta) -> bool: - return bool(track.trackid) and bool(SPOTIFY_SP_DC) - - # ─── Auth helpers ──────────────────────────────────────────────── - - def _get_server_time(self, client: httpx.Client) -> Optional[int]: - """Fetch Spotify's server timestamp (seconds since epoch).""" - try: - res = client.get(SPOTIFY_SERVER_TIME_URL, timeout=HTTP_TIMEOUT) - res.raise_for_status() - data = res.json() - if not isinstance(data, dict) or "serverTime" not in data: - logger.error(f"Spotify: unexpected server-time response: {data}") - return None - server_time = data["serverTime"] - logger.debug(f"Spotify: server time = {server_time}") - return server_time - except Exception as e: - logger.error(f"Spotify: failed to fetch server time: {e}") - return None - - def _get_secret(self, client: httpx.Client) -> Optional[Tuple[str, int]]: - """Fetch and decode the TOTP secret. Cached after first success. - - Response format: [{version: int, secret: str}, ...] - Each character in *secret* is XOR-decoded with ``(index % 33) + 9``. - """ - if self._cached_secret is not None: - logger.debug("Spotify: using cached TOTP secret") - return self._cached_secret - - try: - res = client.get(SPOTIFY_SECRET_URL, timeout=HTTP_TIMEOUT) - res.raise_for_status() - data = res.json() - - if not isinstance(data, list) or len(data) == 0: - logger.error( - f"Spotify: unexpected secrets response (type={type(data).__name__}, len={len(data) if isinstance(data, list) else '?'})" - ) - return None - - last = data[-1] - if "secret" not in last or "version" not in last: - logger.error(f"Spotify: malformed secret entry: {list(last.keys())}") - return None - - secret_raw = last["secret"] - version = last["version"] - - # XOR decode - parts = [] - for i, char in enumerate(secret_raw): - parts.append(str(ord(char) ^ ((i % 33) + 9))) - secret = "".join(parts) - - logger.debug(f"Spotify: decoded secret v{version} (len={len(secret)})") - self._cached_secret = (secret, version) - return self._cached_secret - - except Exception as e: - logger.error(f"Spotify: failed to fetch secret: {e}") - return None - - @staticmethod - def _generate_totp(server_time_s: int, secret: str) -> str: - """Generate a 6-digit TOTP code compatible with Spotify's auth. - - Uses HMAC-SHA1 with a 30-second period, matching the Go reference. - """ - counter = server_time_s // 30 - counter_bytes = struct.pack(">Q", counter) - - mac = hmac.new(secret.encode(), counter_bytes, hashlib.sha1).digest() - - offset = mac[-1] & 0x0F - binary_code = ( - (mac[offset] & 0x7F) << 24 - | (mac[offset + 1] & 0xFF) << 16 - | (mac[offset + 2] & 0xFF) << 8 - | (mac[offset + 3] & 0xFF) - ) - - code = binary_code % (10**6) - return str(code).zfill(6) - - def _load_cached_token(self) -> Optional[str]: - """Try to load a valid token from the persistent cache file.""" - try: - with open(SPOTIFY_TOKEN_CACHE_FILE, "r") as f: - data = json.load(f) - expires_ms = data.get("accessTokenExpirationTimestampMs", 0) - if expires_ms <= int(time.time() * 1000): - logger.debug("Spotify: persisted token expired") - return None - token = data.get("accessToken", "") - if not token: - return None - self._cached_token = token - self._token_expires_at = expires_ms / 1000.0 - logger.debug("Spotify: loaded token from cache file") - return token - except (FileNotFoundError, json.JSONDecodeError, KeyError): - return None - - def _save_token(self, body: dict) -> None: - """Persist the token response to disk.""" - try: - with open(SPOTIFY_TOKEN_CACHE_FILE, "w") as f: - json.dump(body, f) - logger.debug("Spotify: token saved to cache file") - except Exception as e: - logger.warning(f"Spotify: failed to write token cache: {e}") - - def _get_token(self) -> Optional[str]: - """Obtain a Spotify access token. Cached in memory and on disk. - - Requires SP_DC cookie (set via SPOTIFY_SP_DC env var). - """ - # 1. Memory cache - if self._cached_token and time.time() < self._token_expires_at - 30: - logger.debug("Spotify: using in-memory cached token") - return self._cached_token - - # 2. Disk cache - disk_token = self._load_cached_token() - if disk_token and time.time() < self._token_expires_at - 30: - return disk_token - - # 3. Fetch new token - if not SPOTIFY_SP_DC: - logger.error( - "Spotify: SPOTIFY_SP_DC env var not set — " - "cannot authenticate with Spotify" - ) - return None - - headers = { - "User-Agent": UA_BROWSER, - "Accept": "*/*", - "Referer": "https://open.spotify.com/", - "Cookie": f"sp_dc={SPOTIFY_SP_DC}", - } - - with httpx.Client(headers=headers) as client: - server_time = self._get_server_time(client) - if server_time is None: - return None - - secret_data = self._get_secret(client) - if secret_data is None: - return None - - secret, version = secret_data - totp = self._generate_totp(server_time, secret) - logger.debug(f"Spotify: generated TOTP v{version}: {totp}") - - params = { - "reason": "init", - "productType": "web-player", - "totp": totp, - "totpVer": str(version), - "totpServer": totp, - } - - try: - res = client.get(SPOTIFY_TOKEN_URL, params=params, timeout=HTTP_TIMEOUT) - if res.status_code != 200: - logger.error(f"Spotify: token request returned {res.status_code}") - return None - - body = res.json() - - if not isinstance(body, dict) or "accessToken" not in body: - logger.error( - f"Spotify: unexpected token response keys: {list(body.keys()) if isinstance(body, dict) else type(body).__name__}" - ) - return None - - token = body["accessToken"] - is_anonymous = body.get("isAnonymous", False) - if is_anonymous: - logger.warning( - "Spotify: received anonymous token — SP_DC may be invalid" - ) - - expires_ms = body.get("accessTokenExpirationTimestampMs", 0) - if expires_ms and expires_ms > int(time.time() * 1000): - self._token_expires_at = expires_ms / 1000.0 - else: - logger.warning("Spotify: token expiry missing or invalid") - self._token_expires_at = time.time() + 3600 - - self._cached_token = token - # Persist to disk (including anonymous tokens, same as Go ref) - self._save_token(body) - logger.debug("Spotify: obtained access token") - return token - - except Exception as e: - logger.error(f"Spotify: token request failed: {e}") - return None - - # ─── Lyrics ────────────────────────────────────────────────────── - - @staticmethod - def _format_lrc_line(start_ms: int, words: str) -> str: - """Format a single lyric line as LRC ``[mm:ss.cc]text``.""" - minutes = start_ms // 60000 - seconds = (start_ms // 1000) % 60 - centiseconds = round((start_ms % 1000) / 10.0) - return f"[{minutes:02d}:{seconds:02d}.{centiseconds:02.0f}]{words}" - - @staticmethod - def _is_truly_synced(lines: list[dict]) -> bool: - """Check if lyrics are actually synced (not all timestamps zero).""" - for line in lines: - try: - ms = int(line.get("startTimeMs", "0")) - if ms > 0: - return True - except (ValueError, TypeError): - continue - return False - - def fetch( - self, track: TrackMeta, bypass_cache: bool = False - ) -> Optional[LyricResult]: - """Fetch lyrics for a Spotify track by its track ID.""" - if not track.trackid: - logger.debug("Spotify: skipped — no trackid in metadata") - return None - - logger.info(f"Spotify: fetching lyrics for trackid={track.trackid}") - - token = self._get_token() - if not token: - logger.error("Spotify: cannot fetch lyrics without a token") - return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR) - - url = f"{SPOTIFY_LYRICS_URL}{track.trackid}?format=json&vocalRemoval=false&market=from_token" - headers = { - "User-Agent": UA_BROWSER, - "Accept": "application/json", - "Authorization": f"Bearer {token}", - "Referer": "https://open.spotify.com/", - "App-Platform": "WebPlayer", - "Spotify-App-Version": SPOTIFY_APP_VERSION, - "Origin": "https://open.spotify.com", - } - - try: - with httpx.Client(timeout=HTTP_TIMEOUT) as client: - res = client.get(url, headers=headers) - - if res.status_code == 404: - logger.debug(f"Spotify: 404 for trackid={track.trackid}") - return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND) - - if res.status_code != 200: - logger.error(f"Spotify: lyrics API returned {res.status_code}") - return LyricResult( - status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR - ) - - data = res.json() - - # Validate response structure - if not isinstance(data, dict) or "lyrics" not in data: - logger.error("Spotify: unexpected lyrics response structure") - return LyricResult( - status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR - ) - - lyrics_data = data["lyrics"] - sync_type = lyrics_data.get("syncType", "") - lines = lyrics_data.get("lines", []) - - if not isinstance(lines, list) or len(lines) == 0: - logger.debug("Spotify: response contained no lyric lines") - return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND) - - # Determine sync status - # syncType == "LINE_SYNCED" AND at least one non-zero timestamp - is_synced = sync_type == "LINE_SYNCED" and self._is_truly_synced(lines) - - # Convert to LRC - lrc_lines: list[str] = [] - for line in lines: - words = line.get("words", "") - if not isinstance(words, str): - continue - try: - ms = int(line.get("startTimeMs", "0")) - except (ValueError, TypeError): - ms = 0 - - if is_synced: - lrc_lines.append(self._format_lrc_line(ms, words)) - else: - # Unsynced: emit with zero timestamps - lrc_lines.append(f"[00:00.00]{words}") - - content = normalize_tags("\n".join(lrc_lines)) - status = ( - CacheStatus.SUCCESS_SYNCED - if is_synced - else CacheStatus.SUCCESS_UNSYNCED - ) - - logger.info(f"Spotify: got {status.value} lyrics ({len(lrc_lines)} lines)") - return LyricResult(status=status, lyrics=content, source=self.source_name) - - except Exception as e: - logger.error(f"Spotify: lyrics fetch failed: {e}") - return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR) diff --git a/lrcfetch/lrc.py b/lrcfetch/lrc.py deleted file mode 100644 index 6913512..0000000 --- a/lrcfetch/lrc.py +++ /dev/null @@ -1,178 +0,0 @@ -""" -Author: Uyanide pywang0608@foxmail.com -Date: 2026-03-25 21:54:01 -Description: Shared LRC time-tag utilities (definitely overengineered) -""" - -import re -from pathlib import Path -from typing import Optional -from urllib.parse import unquote - -from .models import CacheStatus - -# Parses any time tag input format: -# [mm:ss], [mm:ss.c], [mm:ss.cc], [mm:ss.ccc], [mm:ss:cc], … -_RAW_TAG_RE = re.compile(r"\[(\d{2,}):(\d{2})(?:[.:](\d{1,3}))?\]") - -# Standard format after normalization: [mm:ss.cc] -_STD_TAG_RE = re.compile(r"\[\d{2,}:\d{2}\.\d{2}\]") - -# Standard format with capture groups -_STD_TAG_CAPTURE_RE = re.compile(r"\[(\d{2,}):(\d{2})\.(\d{2})\]") - -# Matches a standard time tag at the start of a line -_LRC_LINE_RE = re.compile(r"^\[\d{2,}:\d{2}\.\d{2}\]", re.MULTILINE) - -# [offset:+/-xxx] tag — value in milliseconds -_OFFSET_RE = re.compile(r"^\[offset:\s*([+-]?\d+)\]\s*$", re.MULTILINE | re.IGNORECASE) - - -def _raw_tag_to_cs(mm: str, ss: str, frac: Optional[str]) -> str: - """Convert parsed time tag components to standard [mm:ss.cc] string.""" - if frac is None: - ms = 0 - else: - # cc in [mm:ss:cc] is also treated as centiseconds, per LRC spec - # ^ - # why does this format even exist, idk - n = len(frac) - if n == 1: - ms = int(frac) * 100 - elif n == 2: - ms = int(frac) * 10 - else: - ms = int(frac) - cs = min(round(ms / 10), 99) - return f"[{mm}:{ss}.{cs:02d}]" - - -def _reformat(text: str) -> str: - """Parse each line and reformat to standard [mm:ss.cc]...content form. - - Handles any mix of time tag formats on input. Lines with no time tags - are stripped of leading/trailing whitespace and passed through unchanged. - """ - out: list[str] = [] - for line in text.splitlines(): - line = line.strip() - pos = 0 - tags: list[str] = [] - while True: - while pos < len(line) and line[pos] == " ": - pos += 1 - m = _RAW_TAG_RE.match(line, pos) - # Non-time tags are passed through as-is, except for leading/trailing whitespace which is stripped. - if not m: - # No more tags on this line - break - tags.append(_raw_tag_to_cs(m.group(1), m.group(2), m.group(3))) - pos = m.end() - if tags: - # This could break lyric lines of some kind of word-synced LRC format, - # but such format were not planned to be supported in the first place, so… - out.append("".join(tags) + line[pos:].lstrip()) - else: - out.append(line) - # Empty lines with no tags are also preserved - return "\n".join(out) - - -def _apply_offset(text: str) -> str: - """Parse [offset:±ms] and shift all standard [mm:ss.cc] tags accordingly. - - Per LRC spec, positive offset = lyrics appear sooner (subtract from timestamps). - """ - m = _OFFSET_RE.search(text) - if not m: - return text - offset_ms = int(m.group(1)) - text = _OFFSET_RE.sub("", text).strip("\n") - if offset_ms == 0: - return text - - def _shift(match: re.Match) -> str: - total_ms = max( - 0, - (int(match.group(1)) * 60 + int(match.group(2))) * 1000 - + int(match.group(3)) * 10 - - offset_ms, - ) - new_mm = total_ms // 60000 - new_ss = (total_ms % 60000) // 1000 - new_cs = min(round((total_ms % 1000) / 10), 99) - return f"[{new_mm:02d}:{new_ss:02d}.{new_cs:02d}]" - - return _STD_TAG_CAPTURE_RE.sub(_shift, text) - - -def normalize_tags(text: str) -> str: - """Normalize LRC to standard form: reformat all tags to [mm:ss.cc], then apply offset.""" - return _apply_offset(_reformat(text)) - - -def is_synced(text: str) -> bool: - """Check whether text contains non-zero LRC time tags. - - Assumes text has been normalized by normalize_tags (standard [mm:ss.cc] format). - """ - tags = _STD_TAG_RE.findall(text) - return bool(tags) and any(tag != "[00:00.00]" for tag in tags) - - -def detect_sync_status(text: str) -> CacheStatus: - """Determine whether lyrics contain meaningful LRC time tags. - - Assumes text has been normalized by normalize_tags. - """ - return ( - CacheStatus.SUCCESS_SYNCED if is_synced(text) else CacheStatus.SUCCESS_UNSYNCED - ) - - -def normalize_unsynced(lyrics: str) -> str: - """Normalize unsynced lyrics so every line has a [00:00.00] tag. - - - Lines that already have time tags: replace with [00:00.00] - - Lines without time tags: prepend [00:00.00] - - Blank lines are converted to [00:00.00] - """ - out: list[str] = [] - for line in lyrics.splitlines(): - stripped = line.strip() - if not stripped: - out.append("[00:00.00]") - continue - cleaned = _LRC_LINE_RE.sub("", stripped) - while _LRC_LINE_RE.match(cleaned): - cleaned = _LRC_LINE_RE.sub("", cleaned) - out.append(f"[00:00.00]{cleaned}") - return "\n".join(out) - - -def get_audio_path(audio_url: str, ensure_exists: bool = False) -> Optional[Path]: - """Convert file:// URL to Path, return None if invalid or (if ensure_exists) file doesn't exist.""" - if not audio_url.startswith("file://"): - return None - file_path = unquote(audio_url.replace("file://", "", 1)) - path = Path(file_path) - if ensure_exists and not path.exists(): - return None - return path - - -def get_sidecar_path( - audio_url: str, ensure_audio_exists: bool = False, ensure_exists: bool = False -) -> Optional[Path]: - """Given a file:// URL, return the corresponding .lrc sidecar path. - - If ensure_audio_exists is True, return None if the audio file does not exist. - If ensure_exists is True, return None if the .lrc file does not exist. - """ - audio_path = get_audio_path(audio_url, ensure_exists=ensure_audio_exists) - if not audio_path: - return None - lrc_path = audio_path.with_suffix(".lrc") - if ensure_exists and not lrc_path.exists(): - return None - return lrc_path diff --git a/lrcfetch/models.py b/lrcfetch/models.py deleted file mode 100644 index 775922a..0000000 --- a/lrcfetch/models.py +++ /dev/null @@ -1,59 +0,0 @@ -""" -Author: Uyanide pywang0608@foxmail.com -Date: 2026-03-25 04:09:36 -Description: Data models -""" - -from enum import Enum -from typing import Optional -from dataclasses import dataclass - - -class CacheStatus(str, Enum): - """Status of a cached lyric entry.""" - - SUCCESS_SYNCED = "SUCCESS_SYNCED" - SUCCESS_UNSYNCED = "SUCCESS_UNSYNCED" - NOT_FOUND = "NOT_FOUND" - NETWORK_ERROR = "NETWORK_ERROR" - - -@dataclass -class TrackMeta: - """Metadata describing a track obtained from MPRIS or manual input.""" - - trackid: Optional[str] = None # Spotify track ID (without "spotify:track:" prefix) - length: Optional[int] = None # Duration in milliseconds - album: Optional[str] = None - artist: Optional[str] = None - title: Optional[str] = None - url: Optional[str] = None # Playback URL (file:// for local files) - - @property - def is_local(self) -> bool: - """True when the track is a local file (file:// URL).""" - return bool(self.url and self.url.startswith("file://")) - - @property - def is_complete(self) -> bool: - """True when all fields required by LRCLIB are present.""" - return all([self.length, self.album, self.title, self.artist]) - - def display_name(self) -> str: - """Human-readable representation for logging.""" - parts = [] - if self.artist: - parts.append(self.artist) - if self.title: - parts.append(self.title) - return " - ".join(parts) if parts else self.trackid or self.url or "(unknown)" - - -@dataclass -class LyricResult: - """Result of a lyric fetch attempt, also used as cache record.""" - - status: CacheStatus - lyrics: Optional[str] = None - source: Optional[str] = None # Which fetcher produced this result - ttl: Optional[int] = None # Hint for cache TTL (seconds) diff --git a/lrcfetch/mpris.py b/lrcfetch/mpris.py deleted file mode 100644 index 58ef486..0000000 --- a/lrcfetch/mpris.py +++ /dev/null @@ -1,188 +0,0 @@ -""" -Author: Uyanide pywang0608@foxmail.com -Date: 2026-03-25 04:44:15 -Description: MPRIS integration for fetching track metadata -""" - -import asyncio -from dbus_next.aio.message_bus import MessageBus -from dbus_next.constants import BusType -from dbus_next.message import Message -from lrcfetch.models import TrackMeta -from lrcfetch.config import PREFERRED_PLAYER -from loguru import logger -from typing import Optional, List, Any - - -async def _list_mpris_players(bus: MessageBus) -> List[str]: - """List all MPRIS player bus names.""" - try: - reply = await bus.call( - Message( - destination="org.freedesktop.DBus", - path="/org/freedesktop/DBus", - interface="org.freedesktop.DBus", - member="ListNames", - ) - ) - if not reply or not reply.body: - return [] - return [ - name for name in reply.body[0] if name.startswith("org.mpris.MediaPlayer2.") - ] - except Exception as e: - logger.error(f"Failed to list DBus names: {e}") - return [] - - -async def _get_playback_status(bus: MessageBus, player_name: str) -> Optional[str]: - """Get PlaybackStatus ('Playing', 'Paused', 'Stopped') for a player.""" - try: - introspection = await bus.introspect(player_name, "/org/mpris/MediaPlayer2") - proxy = bus.get_proxy_object( - player_name, "/org/mpris/MediaPlayer2", introspection - ) - props = proxy.get_interface("org.freedesktop.DBus.Properties") - status_var = await getattr(props, "call_get")( - "org.mpris.MediaPlayer2.Player", "PlaybackStatus" - ) - return status_var.value if status_var else None - except Exception as e: - logger.debug(f"Could not get playback status for {player_name}: {e}") - return None - - -async def _select_player( - bus: MessageBus, specific_player: Optional[str] = None -) -> Optional[str]: - """Select the best MPRIS player. - - When specific_player is given, filter by name match. - Otherwise: prefer the currently playing player. If multiple are playing, - prefer the one matching LRCFETCH_PLAYER env var (default: spotify). - """ - players = await _list_mpris_players(bus) - if not players: - return None - - if specific_player: - players = [p for p in players if specific_player.lower() in p.lower()] - return players[0] if players else None - - # Check playback status for each player - playing = [] - for p in players: - status = await _get_playback_status(bus, p) - logger.debug(f"Player {p}: {status}") - if status == "Playing": - playing.append(p) - - candidates = playing if playing else players - - if len(candidates) == 1: - return candidates[0] - - # Multiple candidates: prefer LRCFETCH_PLAYER - preferred = PREFERRED_PLAYER.lower() - if preferred: - for p in candidates: - if preferred in p.lower(): - return p - return candidates[0] - - -async def _fetch_metadata_dbus( - specific_player: Optional[str] = None, -) -> Optional[TrackMeta]: - bus = None - try: - bus = await MessageBus(bus_type=BusType.SESSION).connect() - except Exception as e: - logger.error(f"Failed to connect to DBus: {e}") - return None - - try: - player_name = await _select_player(bus, specific_player) - if not player_name: - logger.debug( - f"No active MPRIS players found via DBus{' for ' + specific_player if specific_player else ''}." - ) - return None - - logger.debug(f"Using player: {player_name}") - - introspection = await bus.introspect(player_name, "/org/mpris/MediaPlayer2") - proxy = bus.get_proxy_object( - player_name, "/org/mpris/MediaPlayer2", introspection - ) - - props_iface = proxy.get_interface("org.freedesktop.DBus.Properties") - if not props_iface: - logger.error(f"Player {player_name} doesn't support Properties interface.") - return None - - try: - metadata_var: Any = await getattr(props_iface, "call_get")( - "org.mpris.MediaPlayer2.Player", "Metadata" - ) - if not metadata_var: - logger.error("Empty metadata received.") - return None - - metadata = metadata_var.value - - # Extract trackid — MPRIS returns either "spotify:track:ID" - # or a DBus object path like "/com/spotify/track/ID" - trackid = metadata.get("mpris:trackid", None) - if trackid: - trackid = trackid.value - if isinstance(trackid, str): - if trackid.startswith("spotify:track:"): - trackid = trackid.removeprefix("spotify:track:") - elif trackid.startswith("/com/spotify/track/"): - trackid = trackid.removeprefix("/com/spotify/track/") - - # Extract length (usually microseconds) - length = metadata.get("mpris:length", None) - if length: - length = length.value // 1000 if isinstance(length.value, int) else None - - album = metadata.get("xesam:album", None) - album = album.value if album else None - - artist = metadata.get("xesam:artist", None) - artist = ( - artist.value[0] - if artist and isinstance(artist.value, list) and artist.value - else None - ) - - title = metadata.get("xesam:title", None) - title = title.value if title else None - - url = metadata.get("xesam:url", None) - url = url.value if url else None - - return TrackMeta( - trackid=trackid, - length=length, - album=album, - artist=artist, - title=title, - url=url, - ) - except Exception as e: - logger.error(f"Failed to get properties from {player_name}: {e}") - return None - - finally: - if bus: - bus.disconnect() - - -def get_current_track(player_name: Optional[str] = None) -> Optional[TrackMeta]: - try: - return asyncio.run(_fetch_metadata_dbus(player_name)) - except Exception as e: - logger.error(f"DBus async loop failed: {e}") - return None