rename to lrx

resolve conflicts
This commit is contained in:
2026-03-31 22:24:08 +02:00
parent 09d7945b27
commit 942615348d
57 changed files with 3107 additions and 3 deletions
+1
View File
@@ -0,0 +1 @@
claude --resume 48d54aac-a89b-48c3-8a76-23e9eb73722d
+1
View File
@@ -0,0 +1 @@
SPOTIFY_SP_DC="<REDACTED — a live sp_dc Spotify session cookie was committed here; revoke it immediately (log out of all Spotify sessions) and keep the replacement only in an untracked .env file>"
+3
View File
@@ -0,0 +1,3 @@
{
"python-envs.defaultEnvManager": "ms-python.python:system"
}
+3
View File
@@ -0,0 +1,3 @@
# Claude.md
The role of this file is to describe common mistakes and confusion points that agents might encounter as they work in this project. If you ever encounter something in the project that surprises you, please alert the developer working with you and indicate that this is the case in this file to help prevent future agents from having the same issue.
View File
+4
View File
@@ -0,0 +1,4 @@
from lrx.cli import run
if __name__ == "__main__":
run()
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
+441
View File
@@ -0,0 +1,441 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-25 10:18:03
Description: SQLite-based lyric cache with per-source storage and TTL expiration
"""
import re
import sqlite3
import hashlib
import time
import unicodedata
from typing import Optional
from loguru import logger
from .config import DB_PATH, DURATION_TOLERANCE_MS
from .models import TrackMeta, LyricResult, CacheStatus
# Punctuation to strip for fuzzy matching (ASCII + fullwidth + CJK brackets/symbols)
_PUNCT_RE = re.compile(
r"[~!@#$%^&*()_+\-=\[\]{}|;:'\",.<>?/\\`"
r"~!@#$%^&*()_+-=【】{}|;:'",。<>?/\`"
r"「」『』《》〈〉〔〕·•‥…—–]"
)
_SPACE_RE = re.compile(r"\s+")
# feat./ft./featuring and everything after (case-insensitive, word boundary)
_FEAT_RE = re.compile(r"\s*(?:\bfeat\.?\b|\bft\.?\b|\bfeaturing\b).*", re.IGNORECASE)
# Multi-artist separators: /, &, ×, x (surrounded by spaces), ;, 、, vs.
_ARTIST_SEP_RE = re.compile(r"\s*(?:[/&;×、]|\bvs\.?\b|\bx\b)\s*", re.IGNORECASE)
def _normalize_for_match(s: str) -> str:
"""Normalize a string for fuzzy comparison.
Lowercases, NFKC-normalizes (fullwidth → halfwidth), strips punctuation,
and collapses whitespace.
"""
s = unicodedata.normalize("NFKC", s).lower()
s = _FEAT_RE.sub("", s)
s = _PUNCT_RE.sub(" ", s)
s = _SPACE_RE.sub(" ", s).strip()
return s
def _normalize_artist(s: str) -> str:
"""Normalize an artist string: split by separators, normalize each, sort.
Splits first (on /, &, ;, ×, 、, vs., x), then strips feat./ft./featuring
from each part individually, so 'A feat. C / B' → ['a', 'b'] not just ['a'].
"""
s = unicodedata.normalize("NFKC", s).lower()
parts = _ARTIST_SEP_RE.split(s)
normed = sorted(
{_normalize_for_match(p) for p in parts if _FEAT_RE.sub("", p).strip()}
)
return "\0".join(normed) if normed else _normalize_for_match(s)
def _generate_key(track: TrackMeta, source: str) -> str:
    """Build the unique cache key for *track* under *source*.

    Keys are scoped by source so that different fetchers cache independently
    for the same track (e.g. Spotify synced vs Netease unsynced).

    Raises:
        ValueError: when the track carries no usable metadata at all.
    """
    # Spotify tracks always use their track ID as the primary identifier.
    if source == "spotify" and track.trackid:
        return f"spotify:{track.trackid}"
    # Assemble the present metadata fields in a fixed order.
    fields = [
        track.artist,
        track.title,
        track.album,
        str(track.length) if track.length else None,
    ]
    present = [f for f in fields if f]
    if present:
        digest = hashlib.sha256("|".join(present).encode()).hexdigest()
        return f"{source}:{digest}"
    # Fall back to the URL for local files with no tags.
    if track.url:
        return f"{source}:url:{track.url}"
    raise ValueError("Insufficient metadata to generate cache key")
class CacheEngine:
    """SQLite-backed lyric cache, keyed per (track, source), with optional TTL.

    A fresh connection is opened per operation; the `with sqlite3.connect(...)`
    blocks provide transaction scoping but explicit commit() calls are kept.
    """

    def __init__(self, db_path: str = DB_PATH):
        # db_path: location of the SQLite file (defaults to the app cache dir).
        self.db_path = db_path
        self._init_db()

    def _init_db(self) -> None:
        """Create or migrate the cache table."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS cache (
                    key TEXT PRIMARY KEY,
                    source TEXT NOT NULL,
                    status TEXT NOT NULL,
                    lyrics TEXT,
                    created_at INTEGER NOT NULL,
                    expires_at INTEGER,
                    artist TEXT,
                    title TEXT,
                    album TEXT,
                    length INTEGER
                )
            """)
            # Migration: add length column if missing (older DBs predate it).
            cols = {r[1] for r in conn.execute("PRAGMA table_info(cache)").fetchall()}
            if "length" not in cols:
                conn.execute("ALTER TABLE cache ADD COLUMN length INTEGER")
            conn.commit()

    # Read

    def get(self, track: TrackMeta, source: str) -> Optional[LyricResult]:
        """Look up a cached result for *track* from *source*.

        Returns None on cache miss or expiration. Expired rows are deleted
        eagerly on read; rows missing a length are backfilled from the track.
        """
        try:
            key = _generate_key(track, source)
        except ValueError:
            # Not enough metadata to even form a key — treat as a miss.
            return None
        with sqlite3.connect(self.db_path) as conn:
            row = conn.execute(
                "SELECT status, lyrics, source, expires_at, length FROM cache WHERE key = ?",
                (key,),
            ).fetchone()
            if not row:
                logger.debug(f"Cache miss: {source} / {track.display_name()}")
                return None
            status_str, lyrics, src, expires_at, cached_length = row
            # Check TTL expiration (expires_at is NULL for never-expiring rows).
            if expires_at and expires_at < int(time.time()):
                logger.debug(f"Cache expired: {source} / {track.display_name()}")
                conn.execute("DELETE FROM cache WHERE key = ?", (key,))
                conn.commit()
                return None
            # Backfill length if the cached row is missing it
            if cached_length is None and track.length is not None:
                conn.execute(
                    "UPDATE cache SET length = ? WHERE key = ?",
                    (track.length, key),
                )
                conn.commit()
            # Remaining TTL in seconds; None means "never expires".
            remaining = expires_at - int(time.time()) if expires_at else None
            logger.debug(
                f"Cache hit: {source} / {track.display_name()} "
                f"[{status_str}, ttl={remaining}s]"
            )
            return LyricResult(
                status=CacheStatus(status_str),
                lyrics=lyrics,
                source=src,
                ttl=remaining,
            )

    def get_best(self, track: TrackMeta, sources: list[str]) -> Optional[LyricResult]:
        """Return the best cached result across *sources* (synced > unsynced).

        Skips negative statuses (NOT_FOUND, NETWORK_ERROR) — those are only
        consulted per-source to avoid redundant fetches.
        """
        best: Optional[LyricResult] = None
        for src in sources:
            cached = self.get(track, src)
            if not cached:
                continue
            if cached.status == CacheStatus.SUCCESS_SYNCED:
                return cached  # Can't do better
            # First unsynced hit wins among unsynced candidates (source order).
            if cached.status == CacheStatus.SUCCESS_UNSYNCED and best is None:
                best = cached
        return best

    # Write

    def set(
        self,
        track: TrackMeta,
        source: str,
        result: LyricResult,
        ttl_seconds: Optional[int] = None,
    ) -> None:
        """Store a lyric result in the cache.

        ttl_seconds=None (or 0) stores a row that never expires. Existing
        rows for the same key are overwritten (INSERT OR REPLACE).
        """
        try:
            key = _generate_key(track, source)
        except ValueError:
            logger.warning("Cannot cache: insufficient track metadata.")
            return
        now = int(time.time())
        # NOTE(review): a ttl_seconds of 0 is treated as "never expires" here.
        expires_at = now + ttl_seconds if ttl_seconds else None
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                """INSERT OR REPLACE INTO cache
                   (key, source, status, lyrics, created_at, expires_at,
                    artist, title, album, length)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    key,
                    source,
                    result.status.value,
                    result.lyrics,
                    now,
                    expires_at,
                    track.artist,
                    track.title,
                    track.album,
                    track.length,
                ),
            )
            conn.commit()
        logger.debug(
            f"Cached: {source} / {track.display_name()} "
            f"[{result.status.value}, ttl={ttl_seconds}s]"
        )

    # Delete

    def clear_all(self) -> None:
        """Remove every entry from the cache."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("DELETE FROM cache")
            conn.commit()
        logger.info("Cache cleared.")

    def clear_track(self, track: TrackMeta) -> None:
        """Remove all cached entries (every source) for a single track.

        Matches by exact artist/title/album equality, not by cache key, so it
        also removes rows written under other sources.
        """
        conditions, params = self._track_where(track)
        if not conditions:
            logger.info(f"No cache entries found for {track.display_name()}.")
            return
        where = " AND ".join(conditions)
        with sqlite3.connect(self.db_path) as conn:
            cur = conn.execute(f"DELETE FROM cache WHERE {where}", params)
            conn.commit()
            if cur.rowcount:
                logger.info(
                    f"Cleared {cur.rowcount} cache entries for {track.display_name()}."
                )
            else:
                logger.info(f"No cache entries found for {track.display_name()}.")

    def prune(self) -> int:
        """Remove all expired entries. Returns the number of rows deleted."""
        with sqlite3.connect(self.db_path) as conn:
            cur = conn.execute(
                "DELETE FROM cache WHERE expires_at IS NOT NULL AND expires_at < ?",
                (int(time.time()),),
            )
            conn.commit()
            count = cur.rowcount
        logger.info(f"Pruned {count} expired cache entries.")
        return count

    @staticmethod
    def _track_where(track: TrackMeta) -> tuple[list[str], list[str]]:
        """Build WHERE conditions to match a track across all sources.

        Only fields present on the track contribute; both lists may be empty.
        The SQL fragments are fixed strings — user data only enters via params.
        """
        conditions: list[str] = []
        params: list[str] = []
        if track.artist:
            conditions.append("artist = ?")
            params.append(track.artist)
        if track.title:
            conditions.append("title = ?")
            params.append(track.title)
        if track.album:
            conditions.append("album = ?")
            params.append(track.album)
        return conditions, params

    # Exact cross-source search

    def find_best_positive(self, track: TrackMeta) -> Optional[LyricResult]:
        """Find the best positive (synced/unsynced) cache entry for *track*.

        Uses exact metadata match (artist + title + album) across all sources.
        Returns synced if available, otherwise unsynced, or None.
        """
        conditions, params = self._track_where(track)
        if not conditions:
            return None
        now = int(time.time())
        conditions.append("status IN (?, ?)")
        params.extend(
            [CacheStatus.SUCCESS_SYNCED.value, CacheStatus.SUCCESS_UNSYNCED.value]
        )
        conditions.append("(expires_at IS NULL OR expires_at > ?)")
        # NOTE(review): passed as str; SQLite's INTEGER column affinity
        # converts it back for the comparison — confirm intended.
        params.append(str(now))
        where = " AND ".join(conditions)
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            # ORDER BY prefers synced rows; LIMIT 1 keeps only the best.
            rows = conn.execute(
                f"SELECT status, lyrics, source FROM cache WHERE {where} "
                "ORDER BY CASE status WHEN ? THEN 0 ELSE 1 END LIMIT 1",
                params + [CacheStatus.SUCCESS_SYNCED.value],
            ).fetchall()
        if not rows:
            return None
        row = dict(rows[0])
        return LyricResult(
            status=CacheStatus(row["status"]),
            lyrics=row["lyrics"],
            source="cache-search",
        )

    # Fuzzy search

    def search_by_meta(
        self,
        artist: Optional[str],
        title: Optional[str],
        length: Optional[int] = None,
    ) -> list[dict]:
        """Search cache for lyrics matching artist/title with fuzzy normalization.

        Ignores album and source. Only returns positive results (synced/unsynced)
        that have not expired. When *length* is provided (milliseconds), filters
        by duration tolerance and sorts by closest match.
        """
        if not title:
            return []
        now = int(time.time())
        # Pull all live positive rows, then filter in Python — the fuzzy
        # normalization cannot be expressed in SQL.
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            rows = conn.execute(
                """SELECT * FROM cache
                   WHERE status IN (?, ?)
                   AND (expires_at IS NULL OR expires_at > ?)""",
                (
                    CacheStatus.SUCCESS_SYNCED.value,
                    CacheStatus.SUCCESS_UNSYNCED.value,
                    now,
                ),
            ).fetchall()
        norm_title = _normalize_for_match(title)
        norm_artist = _normalize_artist(artist) if artist else None
        matches: list[dict] = []
        for row in rows:
            row_dict = dict(row)
            # Title must match
            row_title = row_dict.get("title") or ""
            if _normalize_for_match(row_title) != norm_title:
                continue
            # Artist must match if provided
            if norm_artist:
                row_artist = row_dict.get("artist") or ""
                if _normalize_artist(row_artist) != norm_artist:
                    continue
            matches.append(row_dict)
        # Duration filtering
        if length is not None and matches:
            scored = []
            for m in matches:
                row_len = m.get("length")
                if row_len is not None:
                    diff = abs(row_len - length)
                    if diff <= DURATION_TOLERANCE_MS:
                        scored.append((diff, m))
                else:
                    # No duration info in cache — still a candidate but lower priority
                    scored.append((DURATION_TOLERANCE_MS, m))
            # Sort by duration distance, then synced-before-unsynced.
            scored.sort(
                key=lambda x: (
                    x[0],
                    x[1].get("status") != CacheStatus.SUCCESS_SYNCED.value,
                )
            )
            matches = [m for _, m in scored]
        return matches

    # Query / inspect

    def query_track(self, track: TrackMeta) -> list[dict]:
        """Return all cached rows for a given track (across all sources)."""
        conditions, params = self._track_where(track)
        if not conditions:
            return []
        where = " AND ".join(conditions)
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            return [
                dict(r)
                for r in conn.execute(
                    f"SELECT * FROM cache WHERE {where}", params
                ).fetchall()
            ]

    def query_all(self) -> list[dict]:
        """Return every row in the cache table."""
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            return [dict(r) for r in conn.execute("SELECT * FROM cache").fetchall()]

    def stats(self) -> dict:
        """Return aggregate cache statistics.

        Keys: total, expired, active (= total - expired), by_status, by_source.
        """
        now = int(time.time())
        with sqlite3.connect(self.db_path) as conn:
            total = conn.execute("SELECT COUNT(*) FROM cache").fetchone()[0]
            expired = conn.execute(
                "SELECT COUNT(*) FROM cache WHERE expires_at IS NOT NULL AND expires_at < ?",
                (now,),
            ).fetchone()[0]
            by_status = dict(
                conn.execute(
                    "SELECT status, COUNT(*) FROM cache GROUP BY status"
                ).fetchall()
            )
            by_source = dict(
                conn.execute(
                    "SELECT source, COUNT(*) FROM cache GROUP BY source"
                ).fetchall()
            )
        return {
            "total": total,
            "expired": expired,
            "active": total - expired,
            "by_status": by_status,
            "by_source": by_source,
        }
+426
View File
@@ -0,0 +1,426 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-26 02:04:39
Description: CLI interface
"""
import sys
import time
import os
from pathlib import Path
from typing import Annotated
from urllib.parse import quote
import cyclopts
from loguru import logger
from .config import enable_debug
from .models import TrackMeta, CacheStatus
from .mpris import get_current_track
from .core import LrcManager
from .fetchers import FetcherMethodType
from .lrc import get_sidecar_path
# Top-level cyclopts application; commands register themselves via decorators.
app = cyclopts.App(
    help="LRCFetch — Fetch line-synced lyrics for your music player.",
)
app.register_install_completion_command()

# Sub-app grouping the cache-maintenance commands under `... cache <cmd>`.
cache_app = cyclopts.App(name="cache", help="Manage the local SQLite cache.")
app.command(cache_app)

# Single orchestrator instance shared by every command in this module.
manager = LrcManager()

# Global state set by the meta launcher
_player: str | None = None
@app.meta.default
def launcher(
    *tokens: Annotated[str, cyclopts.Parameter(show=False, allow_leading_hyphen=True)],
    debug: Annotated[
        bool,
        cyclopts.Parameter(
            name=["--debug", "-d"], negative="", help="Enable debug logging."
        ),
    ] = False,
    player: Annotated[
        str | None,
        cyclopts.Parameter(
            name=["--player", "-p"],
            help="Target a specific MPRIS player using its DBus name or a portion thereof.",
        ),
    ] = None,
):
    """Meta launcher: consumes the global flags, then dispatches *tokens*.

    Runs before any sub-command, so --debug/-p work in front of every command.
    """
    global _player
    if debug:
        enable_debug()
    # Stash the player selection where the commands invoked below can see it.
    _player = player
    app(tokens)
# fetch
@app.command
def fetch(
    *,
    method: Annotated[
        FetcherMethodType | None,
        cyclopts.Parameter(help="Force a specific source."),
    ] = None,
    no_cache: Annotated[
        bool,
        cyclopts.Parameter(
            name="--no-cache", negative="", help="Bypass the cache for this request."
        ),
    ] = False,
    only_synced: Annotated[
        bool,
        cyclopts.Parameter(
            name="--only-synced", negative="", help="Only accept synced (timed) lyrics."
        ),
    ] = False,
):
    """Fetch and print lyrics for the currently playing track.

    Exits 1 when no MPRIS track is active, when no lyrics are found, or when
    --only-synced is set and only unsynced lyrics are available.
    """
    track = get_current_track(_player)
    if not track:
        logger.error("No active playing track found.")
        sys.exit(1)
    logger.info(f"Track: {track.display_name()}")
    result = manager.fetch_for_track(track, force_method=method, bypass_cache=no_cache)
    if not result or not result.lyrics:
        logger.error("No lyrics found.")
        sys.exit(1)
    # --only-synced is applied after the fetch, so an unsynced result is still
    # cached by the pipeline even when it is rejected here.
    if only_synced and result.status != CacheStatus.SUCCESS_SYNCED:
        logger.error("Only unsynced lyrics available (--only-synced requested).")
        sys.exit(1)
    # Lyrics go to stdout (pipe-friendly); all diagnostics go through loguru.
    print(result.lyrics)
# search
@app.command
def search(
*,
title: Annotated[
str | None, cyclopts.Parameter(name=["--title", "-t"], help="Track title.")
] = None,
artist: Annotated[
str | None, cyclopts.Parameter(name=["--artist", "-a"], help="Artist name.")
] = None,
album: Annotated[str | None, cyclopts.Parameter(help="Album name.")] = None,
trackid: Annotated[str | None, cyclopts.Parameter(help="Spotify track ID.")] = None,
length: Annotated[
int | None,
cyclopts.Parameter(
name=["--length", "-l"], help="Track duration in milliseconds."
),
] = None,
url: Annotated[
str | None,
cyclopts.Parameter(
help="Local file URL (file:///...). Mutually exclusive with --path."
),
] = None,
path: Annotated[
str | None,
cyclopts.Parameter(
name=["--path"],
help="Local audio file path. Mutually exclusive with --url.",
),
] = None,
method: Annotated[
FetcherMethodType | None, cyclopts.Parameter(help="Force a specific source.")
] = None,
no_cache: Annotated[
bool,
cyclopts.Parameter(
name="--no-cache", negative="", help="Bypass the cache for this request."
),
] = False,
only_synced: Annotated[
bool,
cyclopts.Parameter(
name="--only-synced", negative="", help="Only accept synced (timed) lyrics."
),
] = False,
):
"""Search for lyrics by metadata (bypasses MPRIS)."""
if url and path:
logger.error("--url and --path are mutually exclusive.")
sys.exit(1)
if path:
resolved = str(Path(path).resolve())
url = "file://" + quote(resolved, safe="/")
track = TrackMeta(
title=title,
artist=artist,
album=album,
trackid=trackid,
length=length,
url=url,
)
logger.info(f"Track: {track.display_name()}")
result = manager.fetch_for_track(track, force_method=method, bypass_cache=no_cache)
if not result or not result.lyrics:
logger.error("No lyrics found.")
sys.exit(1)
if only_synced and result.status != CacheStatus.SUCCESS_SYNCED:
logger.error("Only unsynced lyrics available (--only-synced requested).")
sys.exit(1)
print(result.lyrics)
# export
@app.command
def export(
    *,
    output: Annotated[
        str | None,
        cyclopts.Parameter(
            name=["--output", "-o"],
            help="Output file path (default: same directory as audio file with .lrc extension, or current directory if not available).",
        ),
    ] = None,
    method: Annotated[
        FetcherMethodType | None, cyclopts.Parameter(help="Force a specific source.")
    ] = None,
    no_cache: Annotated[
        bool, cyclopts.Parameter(name="--no-cache", negative="", help="Bypass cache.")
    ] = False,
    overwrite: Annotated[
        bool,
        cyclopts.Parameter(
            name=["--overwrite", "-f"], negative="", help="Overwrite existing file."
        ),
    ] = False,
):
    """Export lyrics of the current track to a .lrc file.

    Output path resolution order: explicit -o, then a sidecar .lrc next to
    the local audio file, then `<artist> - <title>.lrc` in the current dir.
    """
    track = get_current_track(_player)
    if not track:
        logger.error("No active playing track found.")
        sys.exit(1)
    result = manager.fetch_for_track(track, force_method=method, bypass_cache=no_cache)
    if not result or not result.lyrics:
        logger.error("No lyrics available to export.")
        sys.exit(1)
    # Build default output path
    if not output:
        if track.url:
            # Sidecar path = audio path with .lrc extension; None for non-local URLs.
            lrc_path = get_sidecar_path(track.url, ensure_exists=False)
            if lrc_path:
                output = str(lrc_path)
                logger.info(f"Exporting to sidecar path: {output}")
    # Fallback to current directory with sanitized filename
    if not output:
        filename = (
            f"{track.artist} - {track.title}.lrc"
            if track.artist and track.title
            else "lyrics.lrc"
        )
        # Sanitize filename: keep only letters, digits, and " -_." — other
        # characters (slashes, colons, etc.) are dropped, not replaced.
        filename = "".join(
            c for c in filename if c.isalpha() or c.isdigit() or c in " -_."
        ).rstrip()
        output = os.path.join(os.getcwd(), filename)
    if os.path.exists(output) and not overwrite:
        logger.error(f"File exists: {output} (use -f to overwrite)")
        sys.exit(1)
    try:
        with open(output, "w", encoding="utf-8") as f:
            f.write(result.lyrics)
        logger.info(f"Exported lyrics to {output}")
    except Exception as e:
        logger.error(f"Failed to write file: {e}")
        sys.exit(1)
# cache subcommands
@cache_app.command
def query(
    *,
    all: Annotated[  # NOTE(review): shadows builtins.all; CLI flag name is explicit so a rename would be invisible to users
        bool,
        cyclopts.Parameter(name="--all", negative="", help="Dump all cache entries."),
    ] = False,
):
    """Show cached entries for the current track (or every entry with --all)."""
    if all:
        rows = manager.cache.query_all()
        if not rows:
            print("Cache is empty.")
            return
        for row in rows:
            _print_cache_row(row)
            print()
        return
    track = get_current_track(_player)
    if not track:
        logger.error("No active playing track found.")
        sys.exit(1)
    _print_track_cache(track)
@cache_app.command
def clear(
    *,
    all: Annotated[  # NOTE(review): shadows builtins.all (consistent with `query`)
        bool,
        cyclopts.Parameter(name="--all", negative="", help="Clear the entire cache."),
    ] = False,
):
    """Clear cached entries for the current track (or everything with --all)."""
    if all:
        manager.cache.clear_all()
        return
    track = get_current_track(_player)
    if not track:
        logger.error("No active playing track found.")
        sys.exit(1)
    manager.cache.clear_track(track)
@cache_app.command
def prune():
    """Remove expired cache entries (delegates to CacheEngine.prune)."""
    manager.cache.prune()
@cache_app.command
def stats():
    """Show cache statistics (totals plus per-status and per-source counts)."""
    s = manager.cache.stats()
    print("=== Cache Statistics ===")
    print(f"Total entries : {s['total']}")
    print(f"Active : {s['active']}")
    print(f"Expired : {s['expired']}")
    if s["by_status"]:
        print("\nBy status:")
        for status, count in s["by_status"].items():
            print(f" {status}: {count}")
    if s["by_source"]:
        print("\nBy source:")
        for source, count in s["by_source"].items():
            print(f" {source}: {count}")
@cache_app.command
def insert(
    *,
    path: Annotated[
        str | None,
        cyclopts.Parameter(
            name=["--path"],
            help="Path to a local .lrc file to insert instead of reading from stdin.",
        ),
    ] = None,
):
    """Manually insert lyrics into the cache for the current track.

    Lyrics come from --path when given, otherwise from stdin (EOF-terminated).
    """
    track = get_current_track(_player)
    if not track:
        logger.error("No active playing track found.")
        sys.exit(1)
    if path:
        try:
            with open(path, "r", encoding="utf-8") as f:
                lyrics = f.read()
        except Exception as e:
            logger.error(f"Failed to read file: {e}")
            sys.exit(1)
    else:
        logger.info("Reading lyrics from stdin (Ctrl+D to finish)...")
        lyrics = sys.stdin.read()
    manager.manual_insert(track, lyrics)
# helpers
def _print_track_cache(track: TrackMeta) -> None:
    """Print a track header plus all of its cached rows (across sources)."""
    print(f"Track: {track.display_name()}")
    if track.album:
        print(f"Album: {track.album}")
    if track.length:
        # track.length is in milliseconds (cf. --length help); render as m:ss.cc.
        secs = track.length / 1000.0
        print(f"Duration: {int(secs // 60)}:{secs % 60:05.2f}")
    print()
    rows = manager.cache.query_track(track)
    if not rows:
        print(" (no cache entries)")
        return
    for row in rows:
        _print_cache_row(row, indent=" ")
def _print_cache_row(row: dict, indent: str = "") -> None:
    """Pretty-print a single cache row (dict of the cache table's columns)."""
    now = int(time.time())
    source = row.get("source", "?")
    status = row.get("status", "?")
    artist = row.get("artist", "")
    title = row.get("title", "")
    album = row.get("album", "")
    created = row.get("created_at", 0)
    expires = row.get("expires_at")
    lyrics = row.get("lyrics", "")
    # Fall back to the raw cache key when artist/title metadata is absent.
    name = f"{artist} - {title}" if artist and title else row.get("key", "?")
    print(f"{indent}[{source}] {name}")
    if album:
        print(f"{indent} Album : {album}")
    print(f"{indent} Status : {status}")
    if created:
        age = now - created
        print(f"{indent} Cached : {age // 3600}h {(age % 3600) // 60}m ago")
    if expires:
        remaining = expires - now
        if remaining > 0:
            print(
                f"{indent} Expires : in {remaining // 3600}h {(remaining % 3600) // 60}m"
            )
        else:
            print(f"{indent} Expires : EXPIRED")
    else:
        # NULL expires_at means the row never expires (e.g. synced lyrics).
        print(f"{indent} Expires : never")
    if lyrics:
        line_count = len(lyrics.splitlines())
        print(f"{indent} Lyrics : {line_count} lines")
def run():
    """Console-script entry point: invoke the meta app (global flags first)."""
    app.meta()


if __name__ == "__main__":
    run()
+88
View File
@@ -0,0 +1,88 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-25 10:17:56
Description: Global configuration constants and logger setup
"""
import os
import sys
from pathlib import Path
from platformdirs import user_cache_dir, user_config_dir
from dotenv import load_dotenv
from loguru import logger
from importlib.metadata import version
# Application
APP_NAME = "lrcfetch"
APP_AUTHOR = "Uyanide"
# Robustness: importlib.metadata.version() raises PackageNotFoundError when
# the distribution is not installed (e.g. running straight from a source
# checkout), which would crash every import of this module. Fall back to a
# placeholder version instead.
from importlib.metadata import PackageNotFoundError

try:
    APP_VERSION = version(APP_NAME)
except PackageNotFoundError:
    APP_VERSION = "0.0.0+unknown"

# Paths
CACHE_DIR = user_cache_dir(APP_NAME, APP_AUTHOR)
DB_PATH = os.path.join(CACHE_DIR, "cache.db")

# .env loading
_config_env = Path(user_config_dir(APP_NAME, APP_AUTHOR)) / ".env"
load_dotenv(_config_env)  # ~/.config/lrcfetch/.env
load_dotenv()  # .env in cwd (does NOT override existing vars)

# HTTP
HTTP_TIMEOUT = 10.0

# Cache TTLs (seconds)
TTL_SYNCED = None  # never expires
TTL_UNSYNCED = 86400  # 1 day
TTL_NOT_FOUND = 86400 * 3  # 3 days
TTL_NETWORK_ERROR = 3600  # 1 hour

# Search
DURATION_TOLERANCE_MS = 3000  # max duration mismatch for search matching

# Spotify related
SPOTIFY_TOKEN_URL = "https://open.spotify.com/api/token"
SPOTIFY_LYRICS_URL = "https://spclient.wg.spotify.com/color-lyrics/v2/track/"
SPOTIFY_SERVER_TIME_URL = "https://open.spotify.com/api/server-time"
SPOTIFY_SECRET_URL = (
    "https://raw.githubusercontent.com/xyloflake/spot-secrets-go"
    "/refs/heads/main/secrets/secrets.json"
)
# SECURITY: sp_dc is a live Spotify session cookie. It must only ever come
# from the environment or an untracked .env file — never commit the value.
SPOTIFY_SP_DC = os.environ.get("SPOTIFY_SP_DC", "")
SPOTIFY_TOKEN_CACHE_FILE = os.path.join(CACHE_DIR, "spotify_token.json")
SPOTIFY_APP_VERSION = "1.2.87.284.g3ff41c13"

# Netease api
NETEASE_SEARCH_URL = "https://music.163.com/api/cloudsearch/pc"
NETEASE_LYRIC_URL = "https://interface3.music.163.com/api/song/lyric"

# LRCLIB api
LRCLIB_API_URL = "https://lrclib.net/api/get"
LRCLIB_SEARCH_URL = "https://lrclib.net/api/search"

# QQ Music API (self-hosted proxy)
QQ_MUSIC_API_URL = os.environ.get("QQ_MUSIC_API_URL", "").rstrip("/")

# Player preference (used when multiple MPRIS players are active)
PREFERRED_PLAYER = os.environ.get("LRCFETCH_PLAYER", "spotify")

# User-Agents
UA_BROWSER = "Mozilla/5.0 (X11; Linux x86_64; rv:148.0) Gecko/20100101 Firefox/148.0"
UA_LRCFETCH = f"LRCFetch {APP_VERSION} (https://github.com/Uyanide/lrcfetch)"

# Ensure the cache directory exists before anything writes into it.
os.makedirs(CACHE_DIR, exist_ok=True)
# Logger
# Shared loguru sink format: timestamp | level | module:function:line - message.
_LOG_FORMAT = (
    "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
    "<level>{level: <8}</level> | "
    "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
    "<level>{message}</level>"
)
# Replace loguru's default sink with an INFO-level stderr sink at import time.
logger.remove()
logger.add(sys.stderr, format=_LOG_FORMAT, level="INFO")


def enable_debug() -> None:
    """Switch logger to DEBUG level.

    Drops all existing sinks and re-adds stderr with the shared format.
    """
    logger.remove()
    logger.add(sys.stderr, format=_LOG_FORMAT, level="DEBUG")
+178
View File
@@ -0,0 +1,178 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-25 11:09:53
Description: Core orchestrator — coordinates fetchers with cache-aware fallback
"""
"""
Fetch pipeline:
1. Check cache for each source in the fallback sequence
2. For sources without a valid cache hit, call the fetcher
3. Cache every result (success, not-found, or error) per source
4. Return the best result (synced > unsynced > None)
"""
from typing import Optional
from loguru import logger
from .fetchers import FetcherMethodType, create_fetchers
from .fetchers.base import BaseFetcher
from .cache import CacheEngine
from .lrc import normalize_tags, normalize_unsynced, detect_sync_status
from .config import TTL_SYNCED, TTL_UNSYNCED, TTL_NOT_FOUND, TTL_NETWORK_ERROR
from .models import TrackMeta, LyricResult, CacheStatus
from .enrichers import enrich_track
# Maps CacheStatus to the default TTL used when storing results.
# None means the row never expires (synced lyrics are assumed final).
_STATUS_TTL: dict[CacheStatus, Optional[int]] = {
    CacheStatus.SUCCESS_SYNCED: TTL_SYNCED,
    CacheStatus.SUCCESS_UNSYNCED: TTL_UNSYNCED,
    CacheStatus.NOT_FOUND: TTL_NOT_FOUND,
    CacheStatus.NETWORK_ERROR: TTL_NETWORK_ERROR,
}
class LrcManager:
    """Main entry point for fetching lyrics with caching."""

    def __init__(self) -> None:
        self.cache = CacheEngine()
        # The cache is handed to the fetchers so self-cached ones can use it.
        self.fetchers = create_fetchers(self.cache)

    def _build_sequence(
        self, track: TrackMeta, force_method: Optional[FetcherMethodType] = None
    ) -> list[BaseFetcher]:
        """Determine the ordered list of fetchers to try.

        With *force_method* the sequence is exactly that one fetcher (or empty
        if unknown); otherwise every fetcher that reports is_available(track),
        in the fetchers dict's insertion order.
        """
        if force_method:
            if force_method not in self.fetchers:
                logger.error(f"Unknown method: {force_method}")
                return []
            return [self.fetchers[force_method]]
        sequence: list[BaseFetcher] = []
        for method in self.fetchers.keys():
            if self.fetchers[method].is_available(track):
                sequence.append(self.fetchers[method])
        logger.debug(f"Fallback sequence: {[f.source_name for f in sequence]}")
        return sequence

    def fetch_for_track(
        self,
        track: TrackMeta,
        force_method: Optional[FetcherMethodType] = None,
        bypass_cache: bool = False,
    ) -> Optional[LyricResult]:
        """Fetch lyrics for *track* using the fallback pipeline.

        Each source is checked against the cache independently:
          - Cache hit with synced lyrics → return immediately
          - Cache hit with negative status (NOT_FOUND / NETWORK_ERROR) → skip source
          - Cache miss or unsynced → call fetcher, then cache the result
        After all sources are tried, returns the best result found
        (synced > unsynced > None).
        """
        # Fill in missing metadata (tags / filename) before keying the cache.
        track = enrich_track(track)
        logger.info(f"Fetching lyrics for: {track.display_name()}")
        sequence = self._build_sequence(track, force_method)
        if not sequence:
            return None
        # Best result seen so far (synced wins over unsynced)
        best_result: Optional[LyricResult] = None
        for fetcher in sequence:
            source = fetcher.source_name
            # Cache check (skip for fetchers that handle their own caching)
            if not bypass_cache and not fetcher.self_cached:
                cached = self.cache.get(track, source)
                if cached:
                    if cached.status == CacheStatus.SUCCESS_SYNCED:
                        logger.info(f"[{source}] cache hit: synced lyrics")
                        return cached
                    elif cached.status == CacheStatus.SUCCESS_UNSYNCED:
                        logger.debug(
                            f"[{source}] cache hit: unsynced lyrics (continuing)"
                        )
                        if best_result is None:
                            best_result = cached
                        continue  # Try next source for synced
                    elif cached.status in (
                        CacheStatus.NOT_FOUND,
                        CacheStatus.NETWORK_ERROR,
                    ):
                        # Negative cache: don't hammer a source that recently failed.
                        logger.debug(
                            f"[{source}] cache hit: {cached.status.value}, skipping"
                        )
                        continue
            elif not fetcher.self_cached:
                # Reached only when bypass_cache is set for a normally-cached fetcher.
                logger.debug(f"[{source}] cache bypassed")
            # Fetch
            logger.debug(f"[{source}] calling fetcher...")
            result = fetcher.fetch(track, bypass_cache=bypass_cache)
            if not result:
                logger.debug(f"[{source}] returned None (no result)")
                continue
            # Cache the result (skip for self-cached fetchers)
            if not fetcher.self_cached:
                # NOTE(review): a result.ttl of 0 falls through to the status
                # default because of the `or` — confirm that is intended.
                ttl = result.ttl or _STATUS_TTL.get(result.status, TTL_NOT_FOUND)
                self.cache.set(track, source, result, ttl_seconds=ttl)
            # Evaluate result
            if result.status == CacheStatus.SUCCESS_SYNCED:
                logger.info(f"[{source}] got synced lyrics")
                return result
            if result.status == CacheStatus.SUCCESS_UNSYNCED:
                logger.debug(f"[{source}] got unsynced lyrics (continuing)")
                if best_result is None:
                    best_result = result
            # NOT_FOUND / NETWORK_ERROR: already cached, try next
        # Return best available
        if best_result:
            # Normalize unsynced lyrics: set all timestamps to [00:00.00]
            if (
                best_result.status == CacheStatus.SUCCESS_UNSYNCED
                and best_result.lyrics
            ):
                best_result = LyricResult(
                    status=best_result.status,
                    lyrics=normalize_unsynced(best_result.lyrics),
                    source=best_result.source,
                    ttl=best_result.ttl,
                )
            logger.info(
                f"Returning unsynced lyrics from {best_result.source} "
                f"(no synced source found)"
            )
        else:
            logger.info(f"No lyrics found for {track.display_name()}")
        return best_result

    def manual_insert(
        self,
        track: TrackMeta,
        lyrics: str,
    ) -> None:
        """Manually insert lyrics into the cache for a track.

        Stored under the synthetic source "manual" with no expiry; the sync
        status is detected from the lyric content itself.
        """
        track = enrich_track(track)
        logger.info(f"Manually inserting lyrics for: {track.display_name()}")
        lyrics = normalize_tags(lyrics)
        result = LyricResult(
            status=detect_sync_status(lyrics),
            # NOTE(review): normalize_tags is applied a second time here —
            # presumably idempotent, but the double call looks accidental.
            lyrics=normalize_tags(lyrics),
            source="manual",
            ttl=None,
        )
        self.cache.set(track, "manual", result, ttl_seconds=None)
        logger.info("Lyrics inserted into cache.")
+40
View File
@@ -0,0 +1,40 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-31 06:09:11
Description: Metadata enrichment pipeline
"""
from loguru import logger
from .base import BaseEnricher
from .audio_tag import AudioTagEnricher
from .file_name import FileNameEnricher
from ..models import TrackMeta
# Enrichers run in order; earlier ones have higher priority because a field
# is only filled while it is still None (tags beat filename guesses).
_ENRICHERS: list[BaseEnricher] = [
    AudioTagEnricher(),
    FileNameEnricher(),
]
def enrich_track(track: TrackMeta) -> TrackMeta:
    """Run every registered enricher and fill missing fields in place.

    Enrichers see the cumulative state — values applied by an earlier
    enricher are visible to later ones — and a field is only written while
    it is still None. The (mutated) track is returned for convenience.
    """
    for enricher in _ENRICHERS:
        try:
            result = enricher.enrich(track)
        except Exception as e:
            # Best effort: one broken enricher must not abort the pipeline.
            logger.warning(f"Enricher {enricher.name} failed: {e}")
            continue
        if not result:
            continue
        # Apply only the fields the track does not already have.
        for field, value in result.items():
            if getattr(track, field, None) is None:
                setattr(track, field, value)
    return track
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
+78
View File
@@ -0,0 +1,78 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-31 06:11:27
Description: Enricher that reads metadata from audio file tags (mutagen)
"""
from typing import Optional
from loguru import logger
from mutagen._file import File, FileType
from .base import BaseEnricher
from ..models import TrackMeta
from ..lrc import get_audio_path
class AudioTagEnricher(BaseEnricher):
    """Extract title, artist, album, and duration from audio file tags."""

    @property
    def name(self) -> str:
        return "audio-tag"

    def enrich(self, track: TrackMeta) -> Optional[dict]:
        # Only local tracks whose URL resolves to an existing file can be read.
        if not track.is_local or not track.url:
            return None
        audio_path = get_audio_path(track.url, ensure_exists=True)
        if not audio_path:
            return None
        try:
            audio = File(audio_path)
        except Exception as e:
            logger.debug(f"AudioTag: failed to read {audio_path}: {e}")
            return None
        if audio is None:
            # mutagen could not determine the file type.
            return None
        updates: dict = {}
        # Try common tag names (vorbis comments, ID3, MP4)
        title = _first_tag(audio, "title", "TIT2", "\xa9nam")
        if title and not track.title:
            updates["title"] = title
        artist = _first_tag(audio, "artist", "TPE1", "\xa9ART")
        if artist and not track.artist:
            updates["artist"] = artist
        album = _first_tag(audio, "album", "TALB", "\xa9alb")
        if album and not track.album:
            updates["album"] = album
        # Duration: mutagen reports seconds (float); track.length is in ms.
        if not track.length and audio.info and hasattr(audio.info, "length"):
            length_ms = int(audio.info.length * 1000)
            if length_ms > 0:
                updates["length"] = length_ms
        if updates:
            logger.debug(f"AudioTag: enriched fields: {list(updates.keys())}")
        # Return None (not {}) when nothing was found, per BaseEnricher contract.
        return updates or None
def _first_tag(audio: FileType, *keys: str) -> Optional[str]:
    """Return the first non-empty string value found among the given tag keys."""
    tags = audio.tags
    if not tags:
        return None
    for key in keys:
        value = tags.get(key)
        if value is None:
            continue
        # Vorbis comments come back as lists; ID3/MP4 frames as single objects.
        if isinstance(value, list):
            value = value[0] if value else None
        if value:
            return str(value).strip()
    return None
+31
View File
@@ -0,0 +1,31 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-31 06:08:16
Description: Base class for metadata enrichers
"""
from abc import ABC, abstractmethod
from typing import Optional
from ..models import TrackMeta
class BaseEnricher(ABC):
    """Attempts to fill missing fields on a TrackMeta.

    Each enricher inspects the track, and returns a dict of field names
    to values for any fields it can provide. Only fields that are
    currently ``None`` on the track will actually be applied.
    """

    @property
    @abstractmethod
    def name(self) -> str: ...
    # ^ Short identifier used in log messages (e.g. "audio-tag").

    @abstractmethod
    def enrich(self, track: TrackMeta) -> Optional[dict]:
        """Return a dict of {field_name: value} for fields this enricher can fill.

        Return None or an empty dict if nothing can be contributed.
        """
        ...
+83
View File
@@ -0,0 +1,83 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-31 06:08:44
Description: Enricher that parses metadata from the audio file path
"""
import re
from typing import Optional
from loguru import logger
from .base import BaseEnricher
from ..models import TrackMeta
from ..lrc import get_audio_path
# Common track-number prefixes: "01 - ", "01. ", "1 - ", etc.
# Matches 1-3 leading digits followed by any run of spaces, dots, or dashes.
_TRACK_NUM_RE = re.compile(r"^\d{1,3}[\s.\-]+")
class FileNameEnricher(BaseEnricher):
    """Derive artist / title from the file path when tags are unavailable.

    Heuristics (applied to the stem of the filename):
      - "Artist - Title" → artist, title
      - "01 - Title" → title only (leading track number stripped)
      - "Title" → title only

    If artist is still missing after parsing the filename, the parent
    directory name is used as a guess (common layout: ``Artist/Album/track``).
    """

    @property
    def name(self) -> str:
        return "file-name"

    def enrich(self, track: TrackMeta) -> Optional[dict]:
        if not track.is_local or not track.url:
            return None
        # The file need not exist: we only parse the path string.
        audio_path = get_audio_path(track.url, ensure_exists=False)
        if not audio_path:
            return None
        updates: dict = {}
        stem = audio_path.stem
        # Try "Artist - Title" split
        if " - " in stem:
            left, right = stem.split(" - ", 1)
            left = _TRACK_NUM_RE.sub("", left).strip()
            right = right.strip()
            if left and right:
                # Both sides non-empty after stripping track number
                if not track.artist:
                    updates["artist"] = left
                if not track.title:
                    updates["title"] = right
            elif right:
                # Left was only a track number → right is the title
                if not track.title:
                    updates["title"] = right
        else:
            # No separator: strip track number, remainder is title
            title_guess = _TRACK_NUM_RE.sub("", stem).strip()
            if title_guess and not track.title:
                updates["title"] = title_guess
        # Use parent directory as artist fallback
        # Typical layout: /Music/Artist/Album/01 - Track.flac
        if not track.artist and "artist" not in updates:
            parents = audio_path.parents
            if len(parents) >= 2:
                # parents[0] is the immediate parent (album dir),
                # parents[1] the grandparent (artist dir).
                album_dir = parents[0].name
                artist_dir = parents[1].name
                if artist_dir and artist_dir not in (".", "/"):
                    updates["artist"] = artist_dir
                # NOTE(review): album guess placed as a sibling of the
                # artist-dir validity check — confirm original nesting.
                if not track.album and album_dir and album_dir != artist_dir:
                    updates["album"] = album_dir
        if updates:
            logger.debug(f"FileName: enriched fields: {list(updates.keys())}")
        return updates or None
+41
View File
@@ -0,0 +1,41 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-25 02:33:26
Description: Fetcher pipeline — registry and types
"""
from typing import Literal
from .base import BaseFetcher
from .local import LocalFetcher
from .cache_search import CacheSearchFetcher
from .spotify import SpotifyFetcher
from .lrclib import LrclibFetcher
from .lrclib_search import LrclibSearchFetcher
from .netease import NeteaseFetcher
from .qqmusic import QQMusicFetcher
from ..cache import CacheEngine
# Canonical source names; the key type of the registry built by
# create_fetchers() below.  Keep this Literal in sync with that dict.
FetcherMethodType = Literal[
    "local",
    "cache-search",
    "spotify",
    "lrclib",
    "lrclib-search",
    "netease",
    "qqmusic",
]
def create_fetchers(cache: CacheEngine) -> dict[FetcherMethodType, BaseFetcher]:
    """Instantiate all fetchers. Returns a dict keyed by source name."""
    # Built step by step; insertion order is preserved and matches the
    # original literal (local first, remote sources after).
    registry: dict[FetcherMethodType, BaseFetcher] = {}
    registry["local"] = LocalFetcher()
    registry["cache-search"] = CacheSearchFetcher(cache)
    registry["spotify"] = SpotifyFetcher()
    registry["lrclib"] = LrclibFetcher()
    registry["lrclib-search"] = LrclibSearchFetcher()
    registry["netease"] = NeteaseFetcher()
    registry["qqmusic"] = QQMusicFetcher()
    return registry
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
+35
View File
@@ -0,0 +1,35 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-25 02:33:26
Description: Base fetcher class and common interfaces
"""
from abc import ABC, abstractmethod
from typing import Optional
from ..models import TrackMeta, LyricResult
class BaseFetcher(ABC):
    """Abstract interface implemented by every lyric source."""

    @property
    @abstractmethod
    def source_name(self) -> str:
        """Name of the fetcher source."""
        pass

    @property
    def self_cached(self) -> bool:
        """True if this fetcher manages its own cache (skip per-source cache check)."""
        return False

    @abstractmethod
    def is_available(self, track: TrackMeta) -> bool:
        """Check if the fetcher is available for the given track (e.g. has required metadata)."""
        pass

    @abstractmethod
    def fetch(
        self, track: TrackMeta, bypass_cache: bool = False
    ) -> Optional[LyricResult]:
        """Fetch lyrics for the given track. Returns None if unable to fetch."""
        pass
+85
View File
@@ -0,0 +1,85 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-28 05:57:46
Description: Cache-search fetcher — cross-album fuzzy lookup in the local cache
"""
"""
Searches existing cache entries by artist + title with fuzzy normalization,
ignoring album and source. Useful when the same track appears on different
albums or is played from different players.
"""
from typing import Optional
from loguru import logger
from .base import BaseFetcher
from ..models import TrackMeta, LyricResult, CacheStatus
from ..cache import CacheEngine
class CacheSearchFetcher(BaseFetcher):
    """Cross-album fuzzy lookup over lyrics already stored in the cache."""

    def __init__(self, cache: CacheEngine) -> None:
        self._cache = cache

    @property
    def source_name(self) -> str:
        return "cache-search"

    @property
    def self_cached(self) -> bool:
        # We read the cache directly; skip the per-source cache check.
        return True

    def is_available(self, track: TrackMeta) -> bool:
        return bool(track.title)

    def fetch(
        self, track: TrackMeta, bypass_cache: bool = False
    ) -> Optional[LyricResult]:
        if bypass_cache:
            logger.debug("Cache-search: bypassed by caller")
            return None
        if not track.title:
            logger.debug("Cache-search: skipped — no title")
            return None
        # Fast path: exact metadata match (artist+title+album), single SQL query
        exact = self._cache.find_best_positive(track)
        if exact:
            logger.info(f"Cache-search: exact hit ({exact.status.value})")
            return exact
        # Slow path: fuzzy cross-album search
        matches = self._cache.search_by_meta(
            artist=track.artist,
            title=track.title,
            length=track.length,
        )
        if not matches:
            logger.debug(f"Cache-search: no match for {track.display_name()}")
            return None
        # Prefer the first synced entry; otherwise fall back to the first match.
        best = next(
            (
                m
                for m in matches
                if m.get("status") == CacheStatus.SUCCESS_SYNCED.value
            ),
            matches[0],
        )
        if not best.get("lyrics"):
            return None
        status = CacheStatus(best["status"])
        logger.info(
            f"Cache-search: fuzzy hit from [{best.get('source')}] "
            f"album={best.get('album')!r} ({status.value})"
        )
        return LyricResult(
            status=status,
            lyrics=best["lyrics"],
            source=self.source_name,
        )
+98
View File
@@ -0,0 +1,98 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-26 02:08:41
Description: Local fetcher — reads lyrics from .lrc sidecar files or embedded audio metadata
"""
"""
Priority:
1. Same-directory .lrc file (e.g. /path/to/track.lrc)
2. Embedded lyrics in audio metadata (FLAC, MP3 USLT/SYLT tags)
"""
from typing import Optional
from loguru import logger
from mutagen._file import File
from mutagen.flac import FLAC
from .base import BaseFetcher
from ..models import TrackMeta, LyricResult
from ..lrc import detect_sync_status, normalize_tags, get_audio_path, get_sidecar_path
class LocalFetcher(BaseFetcher):
    """Read lyrics from the local filesystem for locally-played tracks.

    Priority:
      1. Same-directory ``.lrc`` sidecar file
      2. Lyrics embedded in the audio file's metadata (FLAC vorbis comments,
         MP3 USLT/SYLT ID3 frames)
    """

    @property
    def source_name(self) -> str:
        return "local"

    def is_available(self, track: TrackMeta) -> bool:
        # Only meaningful when the player reports a local file URL.
        return track.is_local

    def fetch(
        self, track: TrackMeta, bypass_cache: bool = False
    ) -> Optional[LyricResult]:
        """Attempt to read lyrics from local filesystem."""
        if not track.is_local or not track.url:
            return None
        audio_path = get_audio_path(track.url, ensure_exists=False)
        if not audio_path:
            logger.debug(f"Local: audio URL is not a valid file path: {track.url}")
            return None
        # 1) Sidecar .lrc next to the audio file.
        lrc_path = get_sidecar_path(
            track.url, ensure_audio_exists=False, ensure_exists=True
        )
        if lrc_path:
            try:
                with open(lrc_path, "r", encoding="utf-8") as f:
                    content = f.read().strip()
                    if content:
                        content = normalize_tags(content)
                        status = detect_sync_status(content)
                        logger.info(f"Local: found .lrc sidecar ({status.value})")
                        return LyricResult(
                            status=status, lyrics=content, source=self.source_name
                        )
            except Exception as e:
                # Fall through to embedded metadata on read failure.
                logger.error(f"Local: error reading {lrc_path}: {e}")
        else:
            logger.debug(f"Local: no .lrc sidecar found for {audio_path}")
        # 2) Embedded metadata — requires the audio file to actually exist.
        if not audio_path.exists():
            logger.debug(f"Local: audio file does not exist: {audio_path}")
            return None
        try:
            audio = File(audio_path)
            if audio is not None:
                lyrics = None
                if isinstance(audio, FLAC):
                    # FLAC stores lyrics in vorbis comment tags
                    lyrics = (
                        audio.get("lyrics") or audio.get("unsynclyrics") or [None]
                    )[0]
                elif hasattr(audio, "tags") and audio.tags:
                    # MP3 / other: look for USLT or SYLT ID3 frames
                    for key in audio.tags.keys():
                        if key.startswith("USLT") or key.startswith("SYLT"):
                            lyrics = str(audio.tags[key])
                            break
                if lyrics:
                    lyrics = normalize_tags(lyrics.strip())
                    status = detect_sync_status(lyrics)
                    logger.info(f"Local: found embedded lyrics ({status.value})")
                    return LyricResult(
                        status=status,
                        lyrics=lyrics,
                        source=f"{self.source_name} (embedded)",
                    )
                else:
                    logger.debug("Local: no embedded lyrics found")
        except Exception as e:
            logger.error(f"Local: error reading metadata for {audio_path}: {e}")
        logger.debug(f"Local: no lyrics found for {audio_path}")
        return None
+111
View File
@@ -0,0 +1,111 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-25 05:23:38
Description: LRCLIB fetcher — queries lrclib.net for synced/plain lyrics
"""
"""
Requires complete track metadata (artist, title, album, duration).
"""
from typing import Optional
import httpx
from loguru import logger
from urllib.parse import urlencode
from .base import BaseFetcher
from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import normalize_tags
from ..config import (
HTTP_TIMEOUT,
TTL_UNSYNCED,
TTL_NOT_FOUND,
TTL_NETWORK_ERROR,
LRCLIB_API_URL,
UA_LRCFETCH,
)
class LrclibFetcher(BaseFetcher):
    """Exact-match lookup against the LRCLIB get endpoint.

    Requires complete track metadata (artist, title, album, duration).
    """

    @property
    def source_name(self) -> str:
        return "lrclib"

    def is_available(self, track: TrackMeta) -> bool:
        return track.is_complete

    def fetch(
        self, track: TrackMeta, bypass_cache: bool = False
    ) -> Optional[LyricResult]:
        """Fetch lyrics from LRCLIB. Requires complete metadata."""
        if not track.is_complete:
            logger.debug("LRCLIB: skipped — incomplete metadata")
            return None
        params = {
            "track_name": track.title,
            "artist_name": track.artist,
            "album_name": track.album,
            # track.length is milliseconds; the API takes seconds.
            "duration": track.length / 1000.0 if track.length else 0,
        }
        url = f"{LRCLIB_API_URL}?{urlencode(params)}"
        logger.info(f"LRCLIB: fetching lyrics for {track.display_name()}")
        try:
            with httpx.Client(timeout=HTTP_TIMEOUT) as client:
                resp = client.get(url, headers={"User-Agent": UA_LRCFETCH})
                # 404 is a definitive "no lyrics" — cache as NOT_FOUND.
                if resp.status_code == 404:
                    logger.debug(f"LRCLIB: not found for {track.display_name()}")
                    return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
                if resp.status_code != 200:
                    logger.error(f"LRCLIB: API returned {resp.status_code}")
                    return LyricResult(
                        status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
                    )
                data = resp.json()
                # Validate response
                if not isinstance(data, dict):
                    logger.error(f"LRCLIB: unexpected response type: {type(data).__name__}")
                    return LyricResult(
                        status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
                    )
                synced = data.get("syncedLyrics")
                unsynced = data.get("plainLyrics")
                # Synced lyrics are preferred and cached without TTL.
                if isinstance(synced, str) and synced.strip():
                    lyrics = normalize_tags(synced.strip())
                    logger.info(
                        f"LRCLIB: got synced lyrics ({len(lyrics.splitlines())} lines)"
                    )
                    return LyricResult(
                        status=CacheStatus.SUCCESS_SYNCED,
                        lyrics=lyrics,
                        source=self.source_name,
                    )
                elif isinstance(unsynced, str) and unsynced.strip():
                    # Unsynced lyrics expire (TTL_UNSYNCED) so a synced version
                    # can be retried later.
                    lyrics = normalize_tags(unsynced.strip())
                    logger.info(
                        f"LRCLIB: got unsynced lyrics ({len(lyrics.splitlines())} lines)"
                    )
                    return LyricResult(
                        status=CacheStatus.SUCCESS_UNSYNCED,
                        lyrics=lyrics,
                        source=self.source_name,
                        ttl=TTL_UNSYNCED,
                    )
                else:
                    logger.debug(f"LRCLIB: empty response for {track.display_name()}")
                    return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
        except httpx.HTTPError as e:
            logger.error(f"LRCLIB: HTTP error: {e}")
            return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
        except Exception as e:
            # Unknown failure: return None (no cache entry) rather than a
            # NETWORK_ERROR placeholder.
            logger.error(f"LRCLIB: unexpected error: {e}")
            return None
+168
View File
@@ -0,0 +1,168 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-25 05:30:50
Description: LRCLIB search fetcher — fuzzy search via lrclib.net /api/search
"""
"""
Used when metadata is incomplete (no album or duration) but title is available.
Selects the best match by duration when track length is known.
"""
import httpx
from typing import Optional
from loguru import logger
from urllib.parse import urlencode
from .base import BaseFetcher
from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import normalize_tags
from ..config import (
HTTP_TIMEOUT,
TTL_UNSYNCED,
TTL_NOT_FOUND,
TTL_NETWORK_ERROR,
DURATION_TOLERANCE_MS,
LRCLIB_SEARCH_URL,
UA_LRCFETCH,
)
class LrclibSearchFetcher(BaseFetcher):
    """Fuzzy search against the LRCLIB search endpoint.

    Used when metadata is incomplete (no album or duration) but title is
    available.  Selects the best match by duration when track length is known.
    """

    @property
    def source_name(self) -> str:
        return "lrclib-search"

    def is_available(self, track: TrackMeta) -> bool:
        return bool(track.title)

    def fetch(
        self, track: TrackMeta, bypass_cache: bool = False
    ) -> Optional[LyricResult]:
        """Search LRCLIB for lyrics. Requires at least a title."""
        if not track.title:
            logger.debug("LRCLIB-search: skipped — no title")
            return None
        # Optional fields narrow the search when present.
        params: dict[str, str] = {"track_name": track.title}
        if track.artist:
            params["artist_name"] = track.artist
        if track.album:
            params["album_name"] = track.album
        url = f"{LRCLIB_SEARCH_URL}?{urlencode(params)}"
        logger.info(f"LRCLIB-search: searching for {track.display_name()}")
        try:
            with httpx.Client(timeout=HTTP_TIMEOUT) as client:
                resp = client.get(url, headers={"User-Agent": UA_LRCFETCH})
                if resp.status_code != 200:
                    logger.error(f"LRCLIB-search: API returned {resp.status_code}")
                    return LyricResult(
                        status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
                    )
                data = resp.json()
                if not isinstance(data, list) or len(data) == 0:
                    logger.debug(f"LRCLIB-search: no results for {track.display_name()}")
                    return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
                logger.debug(f"LRCLIB-search: got {len(data)} candidates")
                # Select best match by duration
                best = self._select_best(data, track)
                if best is None:
                    logger.debug("LRCLIB-search: no valid candidate found")
                    return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
                # Extract lyrics
                synced = best.get("syncedLyrics")
                unsynced = best.get("plainLyrics")
                if isinstance(synced, str) and synced.strip():
                    lyrics = normalize_tags(synced.strip())
                    logger.info(
                        f"LRCLIB-search: got synced lyrics ({len(lyrics.splitlines())} lines)"
                    )
                    return LyricResult(
                        status=CacheStatus.SUCCESS_SYNCED,
                        lyrics=lyrics,
                        source=self.source_name,
                    )
                elif isinstance(unsynced, str) and unsynced.strip():
                    # Unsynced results carry a TTL so a synced version can be
                    # retried later.
                    lyrics = normalize_tags(unsynced.strip())
                    logger.info(
                        f"LRCLIB-search: got unsynced lyrics ({len(lyrics.splitlines())} lines)"
                    )
                    return LyricResult(
                        status=CacheStatus.SUCCESS_UNSYNCED,
                        lyrics=lyrics,
                        source=self.source_name,
                        ttl=TTL_UNSYNCED,
                    )
                else:
                    logger.debug("LRCLIB-search: best candidate has empty lyrics")
                    return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
        except httpx.HTTPError as e:
            logger.error(f"LRCLIB-search: HTTP error: {e}")
            return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
        except Exception as e:
            logger.error(f"LRCLIB-search: unexpected error: {e}")
            return None

    @staticmethod
    def _select_best(candidates: list[dict], track: TrackMeta) -> Optional[dict]:
        """Pick the best candidate, preferring synced lyrics and closest duration."""
        if track.length is not None:
            # Candidate durations are in seconds; track.length is ms.
            track_s = track.length / 1000.0
            best: Optional[dict] = None
            best_diff = float("inf")
            for item in candidates:
                if not isinstance(item, dict):
                    continue
                duration = item.get("duration")
                if not isinstance(duration, (int, float)):
                    continue
                diff = abs(duration - track_s) * 1000  # compare in ms
                if diff > DURATION_TOLERANCE_MS:
                    continue
                # Prefer synced over unsynced at similar duration
                # (NOTE: the synced preference only breaks exact diff ties).
                has_synced = (
                    isinstance(item.get("syncedLyrics"), str)
                    and item["syncedLyrics"].strip()
                )
                best_synced = (
                    best is not None
                    and isinstance(best.get("syncedLyrics"), str)
                    and best["syncedLyrics"].strip()
                )
                if diff < best_diff or (
                    diff == best_diff and has_synced and not best_synced
                ):
                    best_diff = diff
                    best = item
            if best is not None:
                logger.debug(
                    f"LRCLIB-search: selected id={best.get('id')} (diff={best_diff:.0f}ms)"
                )
                return best
            logger.debug(
                f"LRCLIB-search: no candidate within {DURATION_TOLERANCE_MS}ms"
            )
            return None
        # No duration — pick first with synced lyrics, or just first
        for item in candidates:
            if (
                isinstance(item, dict)
                and isinstance(item.get("syncedLyrics"), str)
                and item["syncedLyrics"].strip()
            ):
                return item
        return candidates[0] if isinstance(candidates[0], dict) else None
+213
View File
@@ -0,0 +1,213 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-25 11:04:51
Description: Netease Cloud Music fetcher
"""
"""
Uses the public cloudsearch API for searching and the song/lyric API for
retrieving lyrics. No authentication required.
Search results are filtered by duration when the track has a known length
to avoid returning lyrics for the wrong version of a song.
"""
from typing import Optional
import httpx
from loguru import logger
from .base import BaseFetcher
from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import detect_sync_status, normalize_tags
from ..config import (
HTTP_TIMEOUT,
TTL_NOT_FOUND,
TTL_NETWORK_ERROR,
DURATION_TOLERANCE_MS,
NETEASE_SEARCH_URL,
NETEASE_LYRIC_URL,
UA_BROWSER,
)
# Browser-like headers for all Netease requests; the Referer mimics requests
# originating from music.163.com.
_HEADERS = {
    "User-Agent": UA_BROWSER,
    "Referer": "https://music.163.com/",
}
class NeteaseFetcher(BaseFetcher):
    """Netease Cloud Music fetcher (public search + lyric APIs, no auth)."""

    @property
    def source_name(self) -> str:
        return "netease"

    def is_available(self, track: TrackMeta) -> bool:
        return bool(track.title)

    def _search(self, track: TrackMeta, limit: int = 10) -> Optional[int]:
        """Search Netease and return the best-matching song ID.

        When ``track.length`` is available, candidates are ranked by duration
        difference and only accepted if within ``DURATION_TOLERANCE_MS``.
        """
        query = f"{track.artist or ''} {track.title or ''}".strip()
        if not query:
            return None
        logger.debug(f"Netease: searching for '{query}' (limit={limit})")
        try:
            with httpx.Client(timeout=HTTP_TIMEOUT) as client:
                resp = client.post(
                    NETEASE_SEARCH_URL,
                    headers=_HEADERS,
                    # type "1" = song search on this endpoint.
                    data={"s": query, "type": "1", "limit": str(limit), "offset": "0"},
                )
                resp.raise_for_status()
                result = resp.json()
                # Validate response
                if not isinstance(result, dict):
                    logger.error(
                        f"Netease: search returned non-dict: {type(result).__name__}"
                    )
                    return None
                result_body = result.get("result")
                if not isinstance(result_body, dict):
                    logger.debug("Netease: search 'result' field missing or invalid")
                    return None
                songs = result_body.get("songs")
                if not isinstance(songs, list) or len(songs) == 0:
                    logger.debug("Netease: search returned 0 results")
                    return None
                logger.debug(f"Netease: search returned {len(songs)} candidates")
                # Duration-based best-match selection
                if track.length is not None:
                    track_ms = track.length
                    best_id: Optional[int] = None
                    best_diff = float("inf")
                    for song in songs:
                        if not isinstance(song, dict):
                            continue
                        sid = song.get("id")
                        name = song.get("name", "?")
                        duration = song.get("dt")  # milliseconds
                        if not isinstance(duration, int):
                            logger.debug(
                                f" candidate {sid} '{name}': no duration, skipped"
                            )
                            continue
                        diff = abs(duration - track_ms)
                        logger.debug(
                            f" candidate {sid} '{name}': "
                            f"duration={duration}ms, diff={diff}ms"
                        )
                        if diff < best_diff:
                            best_diff = diff
                            best_id = sid
                    if best_id is not None and best_diff <= DURATION_TOLERANCE_MS:
                        logger.debug(f"Netease: selected id={best_id} (diff={best_diff}ms)")
                        return best_id
                    logger.debug(
                        f"Netease: no candidate within {DURATION_TOLERANCE_MS}ms "
                        f"(best diff={best_diff}ms)"
                    )
                    return None
                # No duration info — take the first result
                first = songs[0]
                if not isinstance(first, dict) or "id" not in first:
                    logger.error("Netease: first search result has no 'id'")
                    return None
                logger.debug(
                    f"Netease: no duration available, using first result "
                    f"id={first['id']} '{first.get('name', '?')}'"
                )
                return first["id"]
        except Exception as e:
            logger.error(f"Netease: search failed: {e}")
            return None

    def _get_lyric(self, song_id: int) -> Optional[LyricResult]:
        """Fetch lyrics for a given Netease song ID."""
        logger.debug(f"Netease: fetching lyrics for song_id={song_id}")
        try:
            with httpx.Client(timeout=HTTP_TIMEOUT) as client:
                resp = client.post(
                    NETEASE_LYRIC_URL,
                    headers=_HEADERS,
                    # Only the plain LRC variant is requested; the other
                    # version flags (translated/romanized/karaoke) are "0".
                    data={
                        "id": str(song_id),
                        "cp": "false",
                        "tv": "0",
                        "lv": "0",
                        "rv": "0",
                        "kv": "0",
                        "yv": "0",
                        "ytv": "0",
                        "yrv": "0",
                    },
                )
                resp.raise_for_status()
                data = resp.json()
                # Validate response
                if not isinstance(data, dict):
                    logger.error(
                        f"Netease: lyric response is not dict: {type(data).__name__}"
                    )
                    return LyricResult(
                        status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
                    )
                lrc_obj = data.get("lrc")
                if not isinstance(lrc_obj, dict):
                    logger.debug(
                        f"Netease: no 'lrc' object in response for song_id={song_id}"
                    )
                    return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
                lrc: str = lrc_obj.get("lyric", "")
                if not isinstance(lrc, str) or not lrc.strip():
                    logger.debug(f"Netease: empty lyrics for song_id={song_id}")
                    return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
                # Determine sync status
                lrc = normalize_tags(lrc)
                status = detect_sync_status(lrc)
                logger.info(
                    f"Netease: got {status.value} lyrics for song_id={song_id} "
                    f"({len(lrc.splitlines())} lines)"
                )
                return LyricResult(
                    status=status, lyrics=lrc.strip(), source=self.source_name
                )
        except Exception as e:
            logger.error(f"Netease: lyric fetch failed for song_id={song_id}: {e}")
            return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)

    def fetch(
        self, track: TrackMeta, bypass_cache: bool = False
    ) -> Optional[LyricResult]:
        """Search for the track and fetch its lyrics."""
        query = f"{track.artist or ''} {track.title or ''}".strip()
        if not query:
            logger.debug("Netease: skipped — insufficient metadata")
            return None
        logger.info(f"Netease: fetching lyrics for {track.display_name()}")
        song_id = self._search(track)
        if not song_id:
            logger.debug(f"Netease: no match found for {track.display_name()}")
            return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
        return self._get_lyric(song_id)
+178
View File
@@ -0,0 +1,178 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-31 01:54:02
Description: QQ Music fetcher via self-hosted API proxy
"""
"""
Requires a running qq-music-api instance.
The base URL is read from the QQ_MUSIC_API_URL environment variable.
Search → pick best match by duration → fetch LRC lyrics.
"""
from typing import Optional
import httpx
from loguru import logger
from .base import BaseFetcher
from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import detect_sync_status, normalize_tags
from ..config import (
HTTP_TIMEOUT,
TTL_NOT_FOUND,
TTL_NETWORK_ERROR,
DURATION_TOLERANCE_MS,
QQ_MUSIC_API_URL,
)
class QQMusicFetcher(BaseFetcher):
    """QQ Music fetcher via a self-hosted qq-music-api proxy.

    Disabled unless ``QQ_MUSIC_API_URL`` is configured.  The proxy wraps
    every response as ``{"code": int, "data": ...}``; code 0 means success.
    """

    @property
    def source_name(self) -> str:
        return "qqmusic"

    def is_available(self, track: TrackMeta) -> bool:
        return bool(track.title) and bool(QQ_MUSIC_API_URL)

    def _search(self, track: TrackMeta, limit: int = 10) -> Optional[str]:
        """Search QQ Music and return the best-matching song MID."""
        query = f"{track.artist or ''} {track.title or ''}".strip()
        if not query:
            return None
        logger.debug(f"QQMusic: searching for '{query}' (limit={limit})")
        try:
            with httpx.Client(timeout=HTTP_TIMEOUT) as client:
                resp = client.get(
                    f"{QQ_MUSIC_API_URL}/api/search",
                    params={"keyword": query, "type": "song", "num": limit},
                )
                resp.raise_for_status()
                data = resp.json()
                if data.get("code") != 0:
                    logger.error(f"QQMusic: search API error: {data}")
                    return None
                songs = data.get("data", {}).get("list", [])
                if not songs:
                    logger.debug("QQMusic: search returned 0 results")
                    return None
                logger.debug(f"QQMusic: search returned {len(songs)} candidates")
                # Duration-based best-match selection
                if track.length is not None:
                    track_ms = track.length
                    best_mid: Optional[str] = None
                    best_diff = float("inf")
                    for song in songs:
                        if not isinstance(song, dict):
                            continue
                        mid = song.get("mid")
                        name = song.get("name", "?")
                        # interval is in seconds
                        interval = song.get("interval")
                        if not isinstance(interval, int):
                            logger.debug(
                                f" candidate {mid} '{name}': no duration, skipped"
                            )
                            continue
                        duration_ms = interval * 1000
                        diff = abs(duration_ms - track_ms)
                        logger.debug(
                            f" candidate {mid} '{name}': "
                            f"duration={duration_ms}ms, diff={diff}ms"
                        )
                        if diff < best_diff:
                            best_diff = diff
                            best_mid = mid
                    if best_mid is not None and best_diff <= DURATION_TOLERANCE_MS:
                        logger.debug(
                            f"QQMusic: selected mid={best_mid} (diff={best_diff}ms)"
                        )
                        return best_mid
                    logger.debug(
                        f"QQMusic: no candidate within {DURATION_TOLERANCE_MS}ms "
                        f"(best diff={best_diff}ms)"
                    )
                    return None
                # No duration info — take the first result
                first = songs[0]
                if not isinstance(first, dict) or "mid" not in first:
                    logger.error("QQMusic: first search result has no 'mid'")
                    return None
                logger.debug(
                    f"QQMusic: no duration available, using first result "
                    f"mid={first['mid']} '{first.get('name', '?')}'"
                )
                return first["mid"]
        except Exception as e:
            logger.error(f"QQMusic: search failed: {e}")
            return None

    def _get_lyric(self, mid: str) -> Optional[LyricResult]:
        """Fetch lyrics for a given QQ Music song MID."""
        logger.debug(f"QQMusic: fetching lyrics for mid={mid}")
        try:
            with httpx.Client(timeout=HTTP_TIMEOUT) as client:
                resp = client.get(
                    f"{QQ_MUSIC_API_URL}/api/lyric",
                    params={"mid": mid},
                )
                resp.raise_for_status()
                data = resp.json()
                if data.get("code") != 0:
                    logger.error(f"QQMusic: lyric API error: {data}")
                    return LyricResult(
                        status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
                    )
                lrc = data.get("data", {}).get("lyric", "")
                if not isinstance(lrc, str) or not lrc.strip():
                    logger.debug(f"QQMusic: empty lyrics for mid={mid}")
                    return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
                lrc = normalize_tags(lrc)
                status = detect_sync_status(lrc)
                logger.info(
                    f"QQMusic: got {status.value} lyrics for mid={mid} "
                    f"({len(lrc.splitlines())} lines)"
                )
                return LyricResult(
                    status=status, lyrics=lrc.strip(), source=self.source_name
                )
        except Exception as e:
            logger.error(f"QQMusic: lyric fetch failed for mid={mid}: {e}")
            return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)

    def fetch(
        self, track: TrackMeta, bypass_cache: bool = False
    ) -> Optional[LyricResult]:
        """Search for the track and fetch its lyrics."""
        if not QQ_MUSIC_API_URL:
            logger.debug("QQMusic: skipped — QQ_MUSIC_API_URL not configured")
            return None
        query = f"{track.artist or ''} {track.title or ''}".strip()
        if not query:
            logger.debug("QQMusic: skipped — insufficient metadata")
            return None
        logger.info(f"QQMusic: fetching lyrics for {track.display_name()}")
        mid = self._search(track)
        if not mid:
            logger.debug(f"QQMusic: no match found for {track.display_name()}")
            return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
        return self._get_lyric(mid)
+373
View File
@@ -0,0 +1,373 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-25 10:43:21
Description: Spotify fetcher — obtains synced lyrics via Spotify's internal color-lyrics API.
"""
"""
Authentication flow:
1. Fetch server time from Spotify
2. Fetch TOTP secret
3. Generate a TOTP code and exchange it (with SP_DC cookie) for an access token
4. Request lyrics using the access token
The secret and token are cached on the instance to avoid redundant network
calls within the same session.
Requires SPOTIFY_SP_DC environment variable to be set.
"""
import httpx
import json
import time
import struct
import hmac
import hashlib
from typing import Optional, Tuple
from loguru import logger
from .base import BaseFetcher
from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import normalize_tags
from ..config import (
HTTP_TIMEOUT,
SPOTIFY_APP_VERSION,
TTL_NOT_FOUND,
TTL_NETWORK_ERROR,
SPOTIFY_TOKEN_URL,
SPOTIFY_LYRICS_URL,
SPOTIFY_SERVER_TIME_URL,
SPOTIFY_SECRET_URL,
SPOTIFY_SP_DC,
SPOTIFY_TOKEN_CACHE_FILE,
UA_BROWSER,
)
class SpotifyFetcher(BaseFetcher):
    def __init__(self) -> None:
        # Session-level caches to avoid refetching within the same run
        self._cached_secret: Optional[Tuple[str, int]] = None  # (decoded secret, version)
        self._cached_token: Optional[str] = None  # bearer access token
        self._token_expires_at: float = 0.0  # epoch seconds; 0.0 = no token held
    @property
    def source_name(self) -> str:
        # Registry key / cache-source tag for this fetcher.
        return "spotify"
    def is_available(self, track: TrackMeta) -> bool:
        # Needs a Spotify track id and the SP_DC cookie to be configured.
        return bool(track.trackid) and bool(SPOTIFY_SP_DC)
# ─── Auth helpers ────────────────────────────────────────────────
    def _get_server_time(self, client: httpx.Client) -> Optional[int]:
        """Fetch Spotify's server timestamp (seconds since epoch).

        The server time (not local time) feeds TOTP generation so the code
        stays valid even with local clock skew.  Returns None on any failure.
        """
        try:
            res = client.get(SPOTIFY_SERVER_TIME_URL, timeout=HTTP_TIMEOUT)
            res.raise_for_status()
            data = res.json()
            if not isinstance(data, dict) or "serverTime" not in data:
                logger.error(f"Spotify: unexpected server-time response: {data}")
                return None
            server_time = data["serverTime"]
            logger.debug(f"Spotify: server time = {server_time}")
            return server_time
        except Exception as e:
            logger.error(f"Spotify: failed to fetch server time: {e}")
            return None
    def _get_secret(self, client: httpx.Client) -> Optional[Tuple[str, int]]:
        """Fetch and decode the TOTP secret. Cached after first success.

        Response format: [{version: int, secret: str}, ...]
        Each character in *secret* is XOR-decoded with ``(index % 33) + 9``.

        Returns:
            (decoded_secret, version) tuple, or None on failure.  The decoded
            secret is the concatenation of the per-character XOR results as
            decimal strings.
        """
        if self._cached_secret is not None:
            logger.debug("Spotify: using cached TOTP secret")
            return self._cached_secret
        try:
            res = client.get(SPOTIFY_SECRET_URL, timeout=HTTP_TIMEOUT)
            res.raise_for_status()
            data = res.json()
            if not isinstance(data, list) or len(data) == 0:
                logger.error(
                    f"Spotify: unexpected secrets response (type={type(data).__name__}, len={len(data) if isinstance(data, list) else '?'})"
                )
                return None
            # The newest secret is the last list entry.
            last = data[-1]
            if "secret" not in last or "version" not in last:
                logger.error(f"Spotify: malformed secret entry: {list(last.keys())}")
                return None
            secret_raw = last["secret"]
            version = last["version"]
            # XOR decode
            parts = []
            for i, char in enumerate(secret_raw):
                parts.append(str(ord(char) ^ ((i % 33) + 9)))
            secret = "".join(parts)
            logger.debug(f"Spotify: decoded secret v{version} (len={len(secret)})")
            self._cached_secret = (secret, version)
            return self._cached_secret
        except Exception as e:
            logger.error(f"Spotify: failed to fetch secret: {e}")
            return None
@staticmethod
def _generate_totp(server_time_s: int, secret: str) -> str:
"""Generate a 6-digit TOTP code compatible with Spotify's auth.
Uses HMAC-SHA1 with a 30-second period, matching the Go reference.
"""
counter = server_time_s // 30
counter_bytes = struct.pack(">Q", counter)
mac = hmac.new(secret.encode(), counter_bytes, hashlib.sha1).digest()
offset = mac[-1] & 0x0F
binary_code = (
(mac[offset] & 0x7F) << 24
| (mac[offset + 1] & 0xFF) << 16
| (mac[offset + 2] & 0xFF) << 8
| (mac[offset + 3] & 0xFF)
)
code = binary_code % (10**6)
return str(code).zfill(6)
def _load_cached_token(self) -> Optional[str]:
    """Try to load a still-valid token from the persistent cache file.

    On success the in-memory token and expiry are refreshed as well.
    Returns None when the file is missing, unreadable, or expired.
    """
    try:
        with open(SPOTIFY_TOKEN_CACHE_FILE, "r") as fh:
            cached = json.load(fh)
    except (FileNotFoundError, json.JSONDecodeError, KeyError):
        return None
    expires_ms = cached.get("accessTokenExpirationTimestampMs", 0)
    if expires_ms <= int(time.time() * 1000):
        logger.debug("Spotify: persisted token expired")
        return None
    token = cached.get("accessToken", "")
    if not token:
        return None
    self._cached_token = token
    self._token_expires_at = expires_ms / 1000.0
    logger.debug("Spotify: loaded token from cache file")
    return token
def _save_token(self, body: dict) -> None:
    """Persist the raw token response to disk (best effort; failures only warn)."""
    try:
        with open(SPOTIFY_TOKEN_CACHE_FILE, "w") as fh:
            json.dump(body, fh)
        logger.debug("Spotify: token saved to cache file")
    except Exception as e:
        logger.warning(f"Spotify: failed to write token cache: {e}")
def _get_token(self) -> Optional[str]:
    """Obtain a Spotify access token. Cached in memory and on disk.
    Requires SP_DC cookie (set via SPOTIFY_SP_DC env var).

    Resolution order: in-memory cache → disk cache → fresh fetch using a
    TOTP challenge built from Spotify's server time and rotating secret.
    Returns None when authentication is impossible or the fetch fails.
    """
    # 1. Memory cache
    # 30 s safety margin so a token about to expire is never handed out.
    if self._cached_token and time.time() < self._token_expires_at - 30:
        logger.debug("Spotify: using in-memory cached token")
        return self._cached_token
    # 2. Disk cache
    # _load_cached_token() refreshes _token_expires_at on success, so the
    # same margin check applies afterwards.
    disk_token = self._load_cached_token()
    if disk_token and time.time() < self._token_expires_at - 30:
        return disk_token
    # 3. Fetch new token
    if not SPOTIFY_SP_DC:
        logger.error(
            "Spotify: SPOTIFY_SP_DC env var not set — "
            "cannot authenticate with Spotify"
        )
        return None
    headers = {
        "User-Agent": UA_BROWSER,
        "Accept": "*/*",
        "Referer": "https://open.spotify.com/",
        "Cookie": f"sp_dc={SPOTIFY_SP_DC}",
    }
    with httpx.Client(headers=headers) as client:
        # Server time + decoded secret are both required to mint the TOTP.
        server_time = self._get_server_time(client)
        if server_time is None:
            return None
        secret_data = self._get_secret(client)
        if secret_data is None:
            return None
        secret, version = secret_data
        totp = self._generate_totp(server_time, secret)
        logger.debug(f"Spotify: generated TOTP v{version}: {totp}")
        params = {
            "reason": "init",
            "productType": "web-player",
            "totp": totp,
            "totpVer": str(version),
            "totpServer": totp,
        }
        try:
            res = client.get(SPOTIFY_TOKEN_URL, params=params, timeout=HTTP_TIMEOUT)
            if res.status_code != 200:
                logger.error(f"Spotify: token request returned {res.status_code}")
                return None
            body = res.json()
            if not isinstance(body, dict) or "accessToken" not in body:
                logger.error(
                    f"Spotify: unexpected token response keys: {list(body.keys()) if isinstance(body, dict) else type(body).__name__}"
                )
                return None
            token = body["accessToken"]
            is_anonymous = body.get("isAnonymous", False)
            if is_anonymous:
                # Deliberately continue: anonymous tokens are cached and
                # persisted too (see _save_token call below, same as Go ref).
                logger.warning(
                    "Spotify: received anonymous token — SP_DC may be invalid"
                )
            expires_ms = body.get("accessTokenExpirationTimestampMs", 0)
            if expires_ms and expires_ms > int(time.time() * 1000):
                self._token_expires_at = expires_ms / 1000.0
            else:
                # Fall back to a conservative 1-hour lifetime.
                logger.warning("Spotify: token expiry missing or invalid")
                self._token_expires_at = time.time() + 3600
            self._cached_token = token
            # Persist to disk (including anonymous tokens, same as Go ref)
            self._save_token(body)
            logger.debug("Spotify: obtained access token")
            return token
        except Exception as e:
            logger.error(f"Spotify: token request failed: {e}")
            return None
# ─── Lyrics ──────────────────────────────────────────────────────
@staticmethod
def _format_lrc_line(start_ms: int, words: str) -> str:
"""Format a single lyric line as LRC ``[mm:ss.cc]text``."""
minutes = start_ms // 60000
seconds = (start_ms // 1000) % 60
centiseconds = round((start_ms % 1000) / 10.0)
return f"[{minutes:02d}:{seconds:02d}.{centiseconds:02.0f}]{words}"
@staticmethod
def _is_truly_synced(lines: list[dict]) -> bool:
"""Check if lyrics are actually synced (not all timestamps zero)."""
for line in lines:
try:
ms = int(line.get("startTimeMs", "0"))
if ms > 0:
return True
except (ValueError, TypeError):
continue
return False
def fetch(
    self, track: TrackMeta, bypass_cache: bool = False
) -> Optional[LyricResult]:
    """Fetch lyrics for a Spotify track by its track ID.

    Args:
        track: Track metadata; only ``trackid`` is required here.
        bypass_cache: Unused in this method — presumably honored by the
            caller's cache layer (NOTE(review): confirm).

    Returns:
        None when the track carries no trackid; otherwise a LyricResult
        with status SUCCESS_SYNCED / SUCCESS_UNSYNCED / NOT_FOUND /
        NETWORK_ERROR and, on success, normalized LRC text.
    """
    if not track.trackid:
        logger.debug("Spotify: skipped — no trackid in metadata")
        return None
    logger.info(f"Spotify: fetching lyrics for trackid={track.trackid}")
    token = self._get_token()
    if not token:
        logger.error("Spotify: cannot fetch lyrics without a token")
        return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
    url = f"{SPOTIFY_LYRICS_URL}{track.trackid}?format=json&vocalRemoval=false&market=from_token"
    headers = {
        "User-Agent": UA_BROWSER,
        "Accept": "application/json",
        "Authorization": f"Bearer {token}",
        "Referer": "https://open.spotify.com/",
        "App-Platform": "WebPlayer",
        "Spotify-App-Version": SPOTIFY_APP_VERSION,
        "Origin": "https://open.spotify.com",
    }
    try:
        with httpx.Client(timeout=HTTP_TIMEOUT) as client:
            res = client.get(url, headers=headers)
            # 404 means "no lyrics for this track", not a transport failure.
            if res.status_code == 404:
                logger.debug(f"Spotify: 404 for trackid={track.trackid}")
                return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
            if res.status_code != 200:
                logger.error(f"Spotify: lyrics API returned {res.status_code}")
                return LyricResult(
                    status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
                )
            data = res.json()
            # Validate response structure
            if not isinstance(data, dict) or "lyrics" not in data:
                logger.error("Spotify: unexpected lyrics response structure")
                return LyricResult(
                    status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
                )
            lyrics_data = data["lyrics"]
            sync_type = lyrics_data.get("syncType", "")
            lines = lyrics_data.get("lines", [])
            if not isinstance(lines, list) or len(lines) == 0:
                logger.debug("Spotify: response contained no lyric lines")
                return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
            # Determine sync status
            # syncType == "LINE_SYNCED" AND at least one non-zero timestamp
            is_synced = sync_type == "LINE_SYNCED" and self._is_truly_synced(lines)
            # Convert to LRC
            lrc_lines: list[str] = []
            for line in lines:
                words = line.get("words", "")
                # Skip entries whose "words" field isn't a string.
                if not isinstance(words, str):
                    continue
                try:
                    ms = int(line.get("startTimeMs", "0"))
                except (ValueError, TypeError):
                    ms = 0
                if is_synced:
                    lrc_lines.append(self._format_lrc_line(ms, words))
                else:
                    # Unsynced: emit with zero timestamps
                    lrc_lines.append(f"[00:00.00]{words}")
            content = normalize_tags("\n".join(lrc_lines))
            status = (
                CacheStatus.SUCCESS_SYNCED
                if is_synced
                else CacheStatus.SUCCESS_UNSYNCED
            )
            logger.info(f"Spotify: got {status.value} lyrics ({len(lrc_lines)} lines)")
            return LyricResult(status=status, lyrics=content, source=self.source_name)
    except Exception as e:
        logger.error(f"Spotify: lyrics fetch failed: {e}")
        return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
+178
View File
@@ -0,0 +1,178 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-25 21:54:01
Description: Shared LRC time-tag utilities (definitely overengineered)
"""
import re
from pathlib import Path
from typing import Optional
from urllib.parse import unquote
from .models import CacheStatus
# Parses any time tag input format:
# [mm:ss], [mm:ss.c], [mm:ss.cc], [mm:ss.ccc], [mm:ss:cc], …
# Note: \d{2,} allows minutes of three or more digits (tracks > 99 min).
_RAW_TAG_RE = re.compile(r"\[(\d{2,}):(\d{2})(?:[.:](\d{1,3}))?\]")
# Standard format after normalization: [mm:ss.cc]
_STD_TAG_RE = re.compile(r"\[\d{2,}:\d{2}\.\d{2}\]")
# Standard format with capture groups (minutes, seconds, centiseconds)
_STD_TAG_CAPTURE_RE = re.compile(r"\[(\d{2,}):(\d{2})\.(\d{2})\]")
# Matches a standard time tag at the start of a line
_LRC_LINE_RE = re.compile(r"^\[\d{2,}:\d{2}\.\d{2}\]", re.MULTILINE)
# [offset:+/-xxx] tag — value in milliseconds
_OFFSET_RE = re.compile(r"^\[offset:\s*([+-]?\d+)\]\s*$", re.MULTILINE | re.IGNORECASE)
def _raw_tag_to_cs(mm: str, ss: str, frac: Optional[str]) -> str:
"""Convert parsed time tag components to standard [mm:ss.cc] string."""
if frac is None:
ms = 0
else:
# cc in [mm:ss:cc] is also treated as centiseconds, per LRC spec
# ^
# why does this format even exist, idk
n = len(frac)
if n == 1:
ms = int(frac) * 100
elif n == 2:
ms = int(frac) * 10
else:
ms = int(frac)
cs = min(round(ms / 10), 99)
return f"[{mm}:{ss}.{cs:02d}]"
def _reformat(text: str) -> str:
    """Rewrite every time tag to the standard [mm:ss.cc] form, line by line.

    Accepts any mix of input tag formats (possibly several tags per line,
    separated by spaces). Lines without time tags — including blank lines —
    are only stripped of surrounding whitespace and otherwise passed through.
    """

    def _normalize_line(raw: str) -> str:
        stripped = raw.strip()
        cursor = 0
        converted: list[str] = []
        while True:
            # Skip spaces between consecutive leading tags.
            while cursor < len(stripped) and stripped[cursor] == " ":
                cursor += 1
            match = _RAW_TAG_RE.match(stripped, cursor)
            if match is None:
                break
            converted.append(
                _raw_tag_to_cs(match.group(1), match.group(2), match.group(3))
            )
            cursor = match.end()
        if not converted:
            return stripped
        # Word-synced LRC variants (tags mid-line) are intentionally
        # unsupported; everything after the leading tags is plain content.
        return "".join(converted) + stripped[cursor:].lstrip()

    return "\n".join(_normalize_line(line) for line in text.splitlines())
def _apply_offset(text: str) -> str:
    """Apply an ``[offset:±ms]`` tag by shifting every standard [mm:ss.cc] tag.

    Per LRC convention a positive offset makes lyrics appear sooner, so it is
    subtracted from each timestamp (clamped at zero). The offset tag itself
    is removed from the output.
    """
    found = _OFFSET_RE.search(text)
    if found is None:
        return text
    delta = int(found.group(1))
    text = _OFFSET_RE.sub("", text).strip("\n")
    if delta == 0:
        return text

    def _shift_tag(tag: re.Match) -> str:
        mm, ss, cc = (int(g) for g in tag.groups())
        ms = max(0, (mm * 60 + ss) * 1000 + cc * 10 - delta)
        centis = min(round((ms % 1000) / 10), 99)
        return f"[{ms // 60000:02d}:{(ms % 60000) // 1000:02d}.{centis:02d}]"

    return _STD_TAG_CAPTURE_RE.sub(_shift_tag, text)
def normalize_tags(text: str) -> str:
    """Normalize LRC text: standardize every tag to [mm:ss.cc], then fold in any [offset:] tag."""
    reformatted = _reformat(text)
    return _apply_offset(reformatted)
def is_synced(text: str) -> bool:
    """Return True when the text carries at least one non-zero LRC time tag.

    Expects input already normalized by normalize_tags ([mm:ss.cc] form).
    """
    found = _STD_TAG_RE.findall(text)
    if not found:
        return False
    return any(tag != "[00:00.00]" for tag in found)
def detect_sync_status(text: str) -> CacheStatus:
    """Classify normalized lyrics as synced or unsynced based on their time tags."""
    if is_synced(text):
        return CacheStatus.SUCCESS_SYNCED
    return CacheStatus.SUCCESS_UNSYNCED
def normalize_unsynced(lyrics: str) -> str:
    """Force every line to carry exactly one leading [00:00.00] tag.

    Existing leading time tags are discarded (stacked tags included),
    untagged lines get the zero tag prepended, and blank lines collapse
    to a bare zero tag.
    """
    normalized: list[str] = []
    for raw in lyrics.splitlines():
        text = raw.strip()
        if not text:
            normalized.append("[00:00.00]")
            continue
        # Peel off stacked leading tags one at a time.
        while _LRC_LINE_RE.match(text):
            text = _LRC_LINE_RE.sub("", text)
        normalized.append(f"[00:00.00]{text}")
    return "\n".join(normalized)
def get_audio_path(audio_url: str, ensure_exists: bool = False) -> Optional[Path]:
    """Convert a file:// URL to a local Path.

    Returns None for non-file URLs, or (when ensure_exists is set) when the
    file is not present on disk. Percent-escapes in the URL are decoded.
    """
    prefix = "file://"
    if not audio_url.startswith(prefix):
        return None
    path = Path(unquote(audio_url[len(prefix):]))
    if ensure_exists and not path.exists():
        return None
    return path
def get_sidecar_path(
    audio_url: str, ensure_audio_exists: bool = False, ensure_exists: bool = False
) -> Optional[Path]:
    """Map a file:// audio URL to its sibling ``.lrc`` sidecar path.

    Returns None when the URL is not a local file, when (ensure_audio_exists)
    the audio file is missing, or when (ensure_exists) the sidecar is missing.
    """
    audio = get_audio_path(audio_url, ensure_exists=ensure_audio_exists)
    if audio is None:
        return None
    sidecar = audio.with_suffix(".lrc")
    if ensure_exists and not sidecar.exists():
        return None
    return sidecar
+59
View File
@@ -0,0 +1,59 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-25 04:09:36
Description: Data models
"""
from enum import Enum
from typing import Optional
from dataclasses import dataclass
class CacheStatus(str, Enum):
    """Status of a cached lyric entry."""

    # Lyrics found with usable (non-zero) LRC timestamps.
    SUCCESS_SYNCED = "SUCCESS_SYNCED"
    # Lyrics found, but plain text only (all timestamps zero / absent).
    SUCCESS_UNSYNCED = "SUCCESS_UNSYNCED"
    # Provider responded, but has no lyrics for this track.
    NOT_FOUND = "NOT_FOUND"
    # Transient failure (HTTP error, auth failure, malformed response).
    NETWORK_ERROR = "NETWORK_ERROR"
@dataclass
class TrackMeta:
    """Metadata describing a track obtained from MPRIS or manual input."""

    trackid: Optional[str] = None  # Spotify track ID (without "spotify:track:" prefix)
    length: Optional[int] = None  # Duration in milliseconds
    album: Optional[str] = None
    artist: Optional[str] = None
    title: Optional[str] = None
    url: Optional[str] = None  # Playback URL (file:// for local files)

    @property
    def is_local(self) -> bool:
        """True when the track is a local file (file:// URL)."""
        if not self.url:
            return False
        return self.url.startswith("file://")

    @property
    def is_complete(self) -> bool:
        """True when all fields required by LRCLIB are present."""
        required = (self.length, self.album, self.title, self.artist)
        return all(required)

    def display_name(self) -> str:
        """Human-readable representation for logging."""
        parts = [piece for piece in (self.artist, self.title) if piece]
        if parts:
            return " - ".join(parts)
        return self.trackid or self.url or "(unknown)"
@dataclass
class LyricResult:
    """Result of a lyric fetch attempt, also used as cache record."""

    status: CacheStatus
    lyrics: Optional[str] = None  # Normalized LRC text; typically None for non-success statuses
    source: Optional[str] = None  # Which fetcher produced this result
    ttl: Optional[int] = None  # Hint for cache TTL (seconds)
+188
View File
@@ -0,0 +1,188 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-25 04:44:15
Description: MPRIS integration for fetching track metadata
"""
import asyncio
from dbus_next.aio.message_bus import MessageBus
from dbus_next.constants import BusType
from dbus_next.message import Message
from lrx.models import TrackMeta
from lrx.config import PREFERRED_PLAYER
from loguru import logger
from typing import Optional, List, Any
async def _list_mpris_players(bus: MessageBus) -> List[str]:
    """Return the bus names of all MPRIS players on the session bus.

    Any DBus failure is logged and reported as an empty list.
    """
    try:
        reply = await bus.call(
            Message(
                destination="org.freedesktop.DBus",
                path="/org/freedesktop/DBus",
                interface="org.freedesktop.DBus",
                member="ListNames",
            )
        )
        if not reply or not reply.body:
            return []
        prefix = "org.mpris.MediaPlayer2."
        return [name for name in reply.body[0] if name.startswith(prefix)]
    except Exception as exc:
        logger.error(f"Failed to list DBus names: {exc}")
        return []
async def _get_playback_status(bus: MessageBus, player_name: str) -> Optional[str]:
    """Return a player's PlaybackStatus ('Playing', 'Paused', 'Stopped'), or None on error."""
    try:
        introspection = await bus.introspect(player_name, "/org/mpris/MediaPlayer2")
        proxy = bus.get_proxy_object(
            player_name, "/org/mpris/MediaPlayer2", introspection
        )
        properties = proxy.get_interface("org.freedesktop.DBus.Properties")
        # call_get resolved via getattr to sidestep static-typing complaints.
        getter = getattr(properties, "call_get")
        variant = await getter("org.mpris.MediaPlayer2.Player", "PlaybackStatus")
        return variant.value if variant else None
    except Exception as exc:
        logger.debug(f"Could not get playback status for {player_name}: {exc}")
        return None
async def _select_player(
    bus: MessageBus, specific_player: Optional[str] = None
) -> Optional[str]:
    """Pick the most relevant MPRIS player bus name, or None.

    With specific_player: first case-insensitive substring match wins.
    Otherwise: players that are currently Playing are preferred; among
    multiple candidates, one matching PREFERRED_PLAYER wins, else the first.
    """
    players = await _list_mpris_players(bus)
    if not players:
        return None
    if specific_player:
        needle = specific_player.lower()
        matches = [name for name in players if needle in name.lower()]
        return matches[0] if matches else None
    # Probe playback status for each player.
    playing: list[str] = []
    for name in players:
        status = await _get_playback_status(bus, name)
        logger.debug(f"Player {name}: {status}")
        if status == "Playing":
            playing.append(name)
    candidates = playing if playing else players
    if len(candidates) == 1:
        return candidates[0]
    # Multiple candidates: fall back to the configured preference.
    preferred = PREFERRED_PLAYER.lower()
    if preferred:
        for name in candidates:
            if preferred in name.lower():
                return name
    return candidates[0]
async def _fetch_metadata_dbus(
    specific_player: Optional[str] = None,
) -> Optional[TrackMeta]:
    """Connect to the session bus, select a player, and read its Metadata.

    Returns a TrackMeta on success, or None on any DBus failure or when no
    suitable player is available. The bus connection is always torn down.
    """
    bus = None
    try:
        bus = await MessageBus(bus_type=BusType.SESSION).connect()
    except Exception as e:
        logger.error(f"Failed to connect to DBus: {e}")
        return None
    try:
        player_name = await _select_player(bus, specific_player)
        if not player_name:
            logger.debug(
                f"No active MPRIS players found via DBus{' for ' + specific_player if specific_player else ''}."
            )
            return None
        logger.debug(f"Using player: {player_name}")
        introspection = await bus.introspect(player_name, "/org/mpris/MediaPlayer2")
        proxy = bus.get_proxy_object(
            player_name, "/org/mpris/MediaPlayer2", introspection
        )
        props_iface = proxy.get_interface("org.freedesktop.DBus.Properties")
        if not props_iface:
            logger.error(f"Player {player_name} doesn't support Properties interface.")
            return None
        try:
            # Raw Properties.Get call; every value comes back wrapped in a
            # dbus Variant, hence the repeated .value unwrapping below.
            metadata_var: Any = await getattr(props_iface, "call_get")(
                "org.mpris.MediaPlayer2.Player", "Metadata"
            )
            if not metadata_var:
                logger.error("Empty metadata received.")
                return None
            metadata = metadata_var.value
            # Extract trackid — MPRIS returns either "spotify:track:ID"
            # or a DBus object path like "/com/spotify/track/ID"
            trackid = metadata.get("mpris:trackid", None)
            if trackid:
                trackid = trackid.value
            if isinstance(trackid, str):
                if trackid.startswith("spotify:track:"):
                    trackid = trackid.removeprefix("spotify:track:")
                elif trackid.startswith("/com/spotify/track/"):
                    trackid = trackid.removeprefix("/com/spotify/track/")
            # Extract length (usually microseconds)
            # Converted to milliseconds; dropped when the value isn't an int.
            length = metadata.get("mpris:length", None)
            if length:
                length = length.value // 1000 if isinstance(length.value, int) else None
            album = metadata.get("xesam:album", None)
            album = album.value if album else None
            # xesam:artist is a list of strings; keep only the first entry.
            artist = metadata.get("xesam:artist", None)
            artist = (
                artist.value[0]
                if artist and isinstance(artist.value, list) and artist.value
                else None
            )
            title = metadata.get("xesam:title", None)
            title = title.value if title else None
            url = metadata.get("xesam:url", None)
            url = url.value if url else None
            return TrackMeta(
                trackid=trackid,
                length=length,
                album=album,
                artist=artist,
                title=title,
                url=url,
            )
        except Exception as e:
            logger.error(f"Failed to get properties from {player_name}: {e}")
            return None
    finally:
        if bus:
            bus.disconnect()
def get_current_track(player_name: Optional[str] = None) -> Optional[TrackMeta]:
    """Synchronously fetch the current track's metadata via MPRIS, or None on failure."""
    try:
        meta = asyncio.run(_fetch_metadata_dbus(player_name))
    except Exception as exc:
        logger.error(f"DBus async loop failed: {exc}")
        return None
    return meta
+1 -1
View File
@@ -1,4 +1,4 @@
from lrcfetch.cli import run from lrx.cli import run
if __name__ == "__main__": if __name__ == "__main__":
run() run()
+2 -2
View File
@@ -3,7 +3,7 @@ requires = ["hatchling"]
build-backend = "hatchling.build" build-backend = "hatchling.build"
[project] [project]
name = "lrcfetch" name = "lrx-cli"
version = "0.1.7" version = "0.1.7"
description = "Fetch line-synced lyrics for your music player." description = "Fetch line-synced lyrics for your music player."
readme = "README.md" readme = "README.md"
@@ -19,7 +19,7 @@ dependencies = [
] ]
[project.scripts] [project.scripts]
lrcfetch = "lrcfetch.cli:run" lrx = "lrx.cli:run"
[tool.ruff.lint] [tool.ruff.lint]
ignore = ["E402"] ignore = ["E402"]