finish renaming
This commit is contained in:
@@ -0,0 +1,4 @@
|
||||
from lrx_cli.cli import run
|
||||
|
||||
if __name__ == "__main__":
|
||||
run()
|
||||
@@ -0,0 +1,441 @@
|
||||
"""
|
||||
Author: Uyanide pywang0608@foxmail.com
|
||||
Date: 2026-03-25 10:18:03
|
||||
Description: SQLite-based lyric cache with per-source storage and TTL expiration
|
||||
"""
|
||||
|
||||
import re
|
||||
import sqlite3
|
||||
import hashlib
|
||||
import time
|
||||
import unicodedata
|
||||
from typing import Optional
|
||||
from loguru import logger
|
||||
|
||||
from .config import DB_PATH, DURATION_TOLERANCE_MS
|
||||
from .models import TrackMeta, LyricResult, CacheStatus
|
||||
|
||||
# Punctuation to strip for fuzzy matching (ASCII + fullwidth + CJK brackets/symbols)
|
||||
_PUNCT_RE = re.compile(
|
||||
r"[~!@#$%^&*()_+\-=\[\]{}|;:'\",.<>?/\\`"
|
||||
r"~!@#$%^&*()_+-=【】{}|;:'",。<>?/\`"
|
||||
r"「」『』《》〈〉〔〕·•‥…—–]"
|
||||
)
|
||||
_SPACE_RE = re.compile(r"\s+")
|
||||
# feat./ft./featuring and everything after (case-insensitive, word boundary)
|
||||
_FEAT_RE = re.compile(r"\s*(?:\bfeat\.?\b|\bft\.?\b|\bfeaturing\b).*", re.IGNORECASE)
|
||||
# Multi-artist separators: /, &, ×, x (surrounded by spaces), ;, 、, vs.
|
||||
_ARTIST_SEP_RE = re.compile(r"\s*(?:[/&;×、]|\bvs\.?\b|\bx\b)\s*", re.IGNORECASE)
|
||||
|
||||
|
||||
def _normalize_for_match(s: str) -> str:
|
||||
"""Normalize a string for fuzzy comparison.
|
||||
|
||||
Lowercases, NFKC-normalizes (fullwidth → halfwidth), strips punctuation,
|
||||
and collapses whitespace.
|
||||
"""
|
||||
s = unicodedata.normalize("NFKC", s).lower()
|
||||
s = _FEAT_RE.sub("", s)
|
||||
s = _PUNCT_RE.sub(" ", s)
|
||||
s = _SPACE_RE.sub(" ", s).strip()
|
||||
return s
|
||||
|
||||
|
||||
def _normalize_artist(s: str) -> str:
|
||||
"""Normalize an artist string: split by separators, normalize each, sort.
|
||||
|
||||
Splits first (on /, &, ;, ×, 、, vs., x), then strips feat./ft./featuring
|
||||
from each part individually, so 'A feat. C / B' → ['a', 'b'] not just ['a'].
|
||||
"""
|
||||
s = unicodedata.normalize("NFKC", s).lower()
|
||||
parts = _ARTIST_SEP_RE.split(s)
|
||||
normed = sorted(
|
||||
{_normalize_for_match(p) for p in parts if _FEAT_RE.sub("", p).strip()}
|
||||
)
|
||||
return "\0".join(normed) if normed else _normalize_for_match(s)
|
||||
|
||||
|
||||
def _generate_key(track: TrackMeta, source: str) -> str:
|
||||
"""Generate a unique cache key from track metadata and source.
|
||||
|
||||
The key is scoped by source so that different fetchers can cache
|
||||
independently for the same track (e.g. Spotify synced vs Netease unsynced).
|
||||
"""
|
||||
# Spotify tracks always use their track ID as the primary identifier
|
||||
if track.trackid and source == "spotify":
|
||||
return f"spotify:{track.trackid}"
|
||||
|
||||
parts = []
|
||||
if track.artist:
|
||||
parts.append(track.artist)
|
||||
if track.title:
|
||||
parts.append(track.title)
|
||||
if track.album:
|
||||
parts.append(track.album)
|
||||
if track.length:
|
||||
parts.append(str(track.length))
|
||||
|
||||
# Fall back to URL for local files
|
||||
if not parts and track.url:
|
||||
return f"{source}:url:{track.url}"
|
||||
|
||||
if not parts:
|
||||
raise ValueError("Insufficient metadata to generate cache key")
|
||||
|
||||
raw = "|".join(parts)
|
||||
digest = hashlib.sha256(raw.encode()).hexdigest()
|
||||
return f"{source}:{digest}"
|
||||
|
||||
|
||||
class CacheEngine:
|
||||
def __init__(self, db_path: str = DB_PATH):
|
||||
self.db_path = db_path
|
||||
self._init_db()
|
||||
|
||||
def _init_db(self) -> None:
|
||||
"""Create or migrate the cache table."""
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS cache (
|
||||
key TEXT PRIMARY KEY,
|
||||
source TEXT NOT NULL,
|
||||
status TEXT NOT NULL,
|
||||
lyrics TEXT,
|
||||
created_at INTEGER NOT NULL,
|
||||
expires_at INTEGER,
|
||||
artist TEXT,
|
||||
title TEXT,
|
||||
album TEXT,
|
||||
length INTEGER
|
||||
)
|
||||
""")
|
||||
# Migration: add length column if missing
|
||||
cols = {r[1] for r in conn.execute("PRAGMA table_info(cache)").fetchall()}
|
||||
if "length" not in cols:
|
||||
conn.execute("ALTER TABLE cache ADD COLUMN length INTEGER")
|
||||
conn.commit()
|
||||
|
||||
# Read
|
||||
|
||||
def get(self, track: TrackMeta, source: str) -> Optional[LyricResult]:
|
||||
"""Look up a cached result for *track* from *source*.
|
||||
|
||||
Returns None on cache miss or expiration.
|
||||
"""
|
||||
try:
|
||||
key = _generate_key(track, source)
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
row = conn.execute(
|
||||
"SELECT status, lyrics, source, expires_at, length FROM cache WHERE key = ?",
|
||||
(key,),
|
||||
).fetchone()
|
||||
|
||||
if not row:
|
||||
logger.debug(f"Cache miss: {source} / {track.display_name()}")
|
||||
return None
|
||||
|
||||
status_str, lyrics, src, expires_at, cached_length = row
|
||||
|
||||
# Check TTL expiration
|
||||
if expires_at and expires_at < int(time.time()):
|
||||
logger.debug(f"Cache expired: {source} / {track.display_name()}")
|
||||
conn.execute("DELETE FROM cache WHERE key = ?", (key,))
|
||||
conn.commit()
|
||||
return None
|
||||
|
||||
# Backfill length if the cached row is missing it
|
||||
if cached_length is None and track.length is not None:
|
||||
conn.execute(
|
||||
"UPDATE cache SET length = ? WHERE key = ?",
|
||||
(track.length, key),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
remaining = expires_at - int(time.time()) if expires_at else None
|
||||
logger.debug(
|
||||
f"Cache hit: {source} / {track.display_name()} "
|
||||
f"[{status_str}, ttl={remaining}s]"
|
||||
)
|
||||
return LyricResult(
|
||||
status=CacheStatus(status_str),
|
||||
lyrics=lyrics,
|
||||
source=src,
|
||||
ttl=remaining,
|
||||
)
|
||||
|
||||
def get_best(self, track: TrackMeta, sources: list[str]) -> Optional[LyricResult]:
|
||||
"""Return the best cached result across *sources* (synced > unsynced).
|
||||
|
||||
Skips negative statuses (NOT_FOUND, NETWORK_ERROR) — those are only
|
||||
consulted per-source to avoid redundant fetches.
|
||||
"""
|
||||
best: Optional[LyricResult] = None
|
||||
for src in sources:
|
||||
cached = self.get(track, src)
|
||||
if not cached:
|
||||
continue
|
||||
if cached.status == CacheStatus.SUCCESS_SYNCED:
|
||||
return cached # Can't do better
|
||||
if cached.status == CacheStatus.SUCCESS_UNSYNCED and best is None:
|
||||
best = cached
|
||||
return best
|
||||
|
||||
# Write
|
||||
|
||||
def set(
|
||||
self,
|
||||
track: TrackMeta,
|
||||
source: str,
|
||||
result: LyricResult,
|
||||
ttl_seconds: Optional[int] = None,
|
||||
) -> None:
|
||||
"""Store a lyric result in the cache."""
|
||||
try:
|
||||
key = _generate_key(track, source)
|
||||
except ValueError:
|
||||
logger.warning("Cannot cache: insufficient track metadata.")
|
||||
return
|
||||
|
||||
now = int(time.time())
|
||||
expires_at = now + ttl_seconds if ttl_seconds else None
|
||||
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.execute(
|
||||
"""INSERT OR REPLACE INTO cache
|
||||
(key, source, status, lyrics, created_at, expires_at,
|
||||
artist, title, album, length)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
|
||||
(
|
||||
key,
|
||||
source,
|
||||
result.status.value,
|
||||
result.lyrics,
|
||||
now,
|
||||
expires_at,
|
||||
track.artist,
|
||||
track.title,
|
||||
track.album,
|
||||
track.length,
|
||||
),
|
||||
)
|
||||
conn.commit()
|
||||
logger.debug(
|
||||
f"Cached: {source} / {track.display_name()} "
|
||||
f"[{result.status.value}, ttl={ttl_seconds}s]"
|
||||
)
|
||||
|
||||
# Delete
|
||||
|
||||
def clear_all(self) -> None:
|
||||
"""Remove every entry from the cache."""
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.execute("DELETE FROM cache")
|
||||
conn.commit()
|
||||
logger.info("Cache cleared.")
|
||||
|
||||
def clear_track(self, track: TrackMeta) -> None:
|
||||
"""Remove all cached entries (every source) for a single track."""
|
||||
conditions, params = self._track_where(track)
|
||||
if not conditions:
|
||||
logger.info(f"No cache entries found for {track.display_name()}.")
|
||||
return
|
||||
where = " AND ".join(conditions)
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
cur = conn.execute(f"DELETE FROM cache WHERE {where}", params)
|
||||
conn.commit()
|
||||
if cur.rowcount:
|
||||
logger.info(
|
||||
f"Cleared {cur.rowcount} cache entries for {track.display_name()}."
|
||||
)
|
||||
else:
|
||||
logger.info(f"No cache entries found for {track.display_name()}.")
|
||||
|
||||
def prune(self) -> int:
|
||||
"""Remove all expired entries. Returns the number of rows deleted."""
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
cur = conn.execute(
|
||||
"DELETE FROM cache WHERE expires_at IS NOT NULL AND expires_at < ?",
|
||||
(int(time.time()),),
|
||||
)
|
||||
conn.commit()
|
||||
count = cur.rowcount
|
||||
logger.info(f"Pruned {count} expired cache entries.")
|
||||
return count
|
||||
|
||||
@staticmethod
|
||||
def _track_where(track: TrackMeta) -> tuple[list[str], list[str]]:
|
||||
"""Build WHERE conditions to match a track across all sources."""
|
||||
conditions: list[str] = []
|
||||
params: list[str] = []
|
||||
if track.artist:
|
||||
conditions.append("artist = ?")
|
||||
params.append(track.artist)
|
||||
if track.title:
|
||||
conditions.append("title = ?")
|
||||
params.append(track.title)
|
||||
if track.album:
|
||||
conditions.append("album = ?")
|
||||
params.append(track.album)
|
||||
return conditions, params
|
||||
|
||||
# Exact cross-source search
|
||||
|
||||
def find_best_positive(self, track: TrackMeta) -> Optional[LyricResult]:
|
||||
"""Find the best positive (synced/unsynced) cache entry for *track*.
|
||||
|
||||
Uses exact metadata match (artist + title + album) across all sources.
|
||||
Returns synced if available, otherwise unsynced, or None.
|
||||
"""
|
||||
conditions, params = self._track_where(track)
|
||||
if not conditions:
|
||||
return None
|
||||
|
||||
now = int(time.time())
|
||||
conditions.append("status IN (?, ?)")
|
||||
params.extend(
|
||||
[CacheStatus.SUCCESS_SYNCED.value, CacheStatus.SUCCESS_UNSYNCED.value]
|
||||
)
|
||||
conditions.append("(expires_at IS NULL OR expires_at > ?)")
|
||||
params.append(str(now))
|
||||
|
||||
where = " AND ".join(conditions)
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
rows = conn.execute(
|
||||
f"SELECT status, lyrics, source FROM cache WHERE {where} "
|
||||
"ORDER BY CASE status WHEN ? THEN 0 ELSE 1 END LIMIT 1",
|
||||
params + [CacheStatus.SUCCESS_SYNCED.value],
|
||||
).fetchall()
|
||||
|
||||
if not rows:
|
||||
return None
|
||||
|
||||
row = dict(rows[0])
|
||||
return LyricResult(
|
||||
status=CacheStatus(row["status"]),
|
||||
lyrics=row["lyrics"],
|
||||
source="cache-search",
|
||||
)
|
||||
|
||||
# Fuzzy search
|
||||
|
||||
def search_by_meta(
|
||||
self,
|
||||
artist: Optional[str],
|
||||
title: Optional[str],
|
||||
length: Optional[int] = None,
|
||||
) -> list[dict]:
|
||||
"""Search cache for lyrics matching artist/title with fuzzy normalization.
|
||||
|
||||
Ignores album and source. Only returns positive results (synced/unsynced)
|
||||
that have not expired. When *length* is provided, filters by duration
|
||||
tolerance and sorts by closest match.
|
||||
"""
|
||||
if not title:
|
||||
return []
|
||||
|
||||
now = int(time.time())
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
rows = conn.execute(
|
||||
"""SELECT * FROM cache
|
||||
WHERE status IN (?, ?)
|
||||
AND (expires_at IS NULL OR expires_at > ?)""",
|
||||
(
|
||||
CacheStatus.SUCCESS_SYNCED.value,
|
||||
CacheStatus.SUCCESS_UNSYNCED.value,
|
||||
now,
|
||||
),
|
||||
).fetchall()
|
||||
|
||||
norm_title = _normalize_for_match(title)
|
||||
norm_artist = _normalize_artist(artist) if artist else None
|
||||
|
||||
matches: list[dict] = []
|
||||
for row in rows:
|
||||
row_dict = dict(row)
|
||||
# Title must match
|
||||
row_title = row_dict.get("title") or ""
|
||||
if _normalize_for_match(row_title) != norm_title:
|
||||
continue
|
||||
# Artist must match if provided
|
||||
if norm_artist:
|
||||
row_artist = row_dict.get("artist") or ""
|
||||
if _normalize_artist(row_artist) != norm_artist:
|
||||
continue
|
||||
matches.append(row_dict)
|
||||
|
||||
# Duration filtering
|
||||
if length is not None and matches:
|
||||
scored = []
|
||||
for m in matches:
|
||||
row_len = m.get("length")
|
||||
if row_len is not None:
|
||||
diff = abs(row_len - length)
|
||||
if diff <= DURATION_TOLERANCE_MS:
|
||||
scored.append((diff, m))
|
||||
else:
|
||||
# No duration info in cache — still a candidate but lower priority
|
||||
scored.append((DURATION_TOLERANCE_MS, m))
|
||||
scored.sort(
|
||||
key=lambda x: (
|
||||
x[0],
|
||||
x[1].get("status") != CacheStatus.SUCCESS_SYNCED.value,
|
||||
)
|
||||
)
|
||||
matches = [m for _, m in scored]
|
||||
|
||||
return matches
|
||||
|
||||
# Query / inspect
|
||||
|
||||
def query_track(self, track: TrackMeta) -> list[dict]:
|
||||
"""Return all cached rows for a given track (across all sources)."""
|
||||
conditions, params = self._track_where(track)
|
||||
if not conditions:
|
||||
return []
|
||||
where = " AND ".join(conditions)
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
return [
|
||||
dict(r)
|
||||
for r in conn.execute(
|
||||
f"SELECT * FROM cache WHERE {where}", params
|
||||
).fetchall()
|
||||
]
|
||||
|
||||
def query_all(self) -> list[dict]:
|
||||
"""Return every row in the cache table."""
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
return [dict(r) for r in conn.execute("SELECT * FROM cache").fetchall()]
|
||||
|
||||
def stats(self) -> dict:
|
||||
"""Return aggregate cache statistics."""
|
||||
now = int(time.time())
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
total = conn.execute("SELECT COUNT(*) FROM cache").fetchone()[0]
|
||||
expired = conn.execute(
|
||||
"SELECT COUNT(*) FROM cache WHERE expires_at IS NOT NULL AND expires_at < ?",
|
||||
(now,),
|
||||
).fetchone()[0]
|
||||
by_status = dict(
|
||||
conn.execute(
|
||||
"SELECT status, COUNT(*) FROM cache GROUP BY status"
|
||||
).fetchall()
|
||||
)
|
||||
by_source = dict(
|
||||
conn.execute(
|
||||
"SELECT source, COUNT(*) FROM cache GROUP BY source"
|
||||
).fetchall()
|
||||
)
|
||||
return {
|
||||
"total": total,
|
||||
"expired": expired,
|
||||
"active": total - expired,
|
||||
"by_status": by_status,
|
||||
"by_source": by_source,
|
||||
}
|
||||
+426
@@ -0,0 +1,426 @@
|
||||
"""
|
||||
Author: Uyanide pywang0608@foxmail.com
|
||||
Date: 2026-03-26 02:04:39
|
||||
Description: CLI interface
|
||||
"""
|
||||
|
||||
import sys
|
||||
import time
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Annotated
|
||||
from urllib.parse import quote
|
||||
import cyclopts
|
||||
from loguru import logger
|
||||
|
||||
from .config import enable_debug
|
||||
from .models import TrackMeta, CacheStatus
|
||||
from .mpris import get_current_track
|
||||
from .core import LrcManager
|
||||
from .fetchers import FetcherMethodType
|
||||
from .lrc import get_sidecar_path
|
||||
|
||||
|
||||
app = cyclopts.App(
|
||||
help="LRX-CLI — Fetch line-synced lyrics for your music player.",
|
||||
)
|
||||
app.register_install_completion_command()
|
||||
|
||||
cache_app = cyclopts.App(name="cache", help="Manage the local SQLite cache.")
|
||||
app.command(cache_app)
|
||||
|
||||
manager = LrcManager()
|
||||
|
||||
# Global state set by the meta launcher
|
||||
_player: str | None = None
|
||||
|
||||
|
||||
@app.meta.default
|
||||
def launcher(
|
||||
*tokens: Annotated[str, cyclopts.Parameter(show=False, allow_leading_hyphen=True)],
|
||||
debug: Annotated[
|
||||
bool,
|
||||
cyclopts.Parameter(
|
||||
name=["--debug", "-d"], negative="", help="Enable debug logging."
|
||||
),
|
||||
] = False,
|
||||
player: Annotated[
|
||||
str | None,
|
||||
cyclopts.Parameter(
|
||||
name=["--player", "-p"],
|
||||
help="Target a specific MPRIS player using its DBus name or a portion thereof.",
|
||||
),
|
||||
] = None,
|
||||
):
|
||||
global _player
|
||||
if debug:
|
||||
enable_debug()
|
||||
_player = player
|
||||
app(tokens)
|
||||
|
||||
|
||||
# fetch
|
||||
|
||||
|
||||
@app.command
|
||||
def fetch(
|
||||
*,
|
||||
method: Annotated[
|
||||
FetcherMethodType | None,
|
||||
cyclopts.Parameter(help="Force a specific source."),
|
||||
] = None,
|
||||
no_cache: Annotated[
|
||||
bool,
|
||||
cyclopts.Parameter(
|
||||
name="--no-cache", negative="", help="Bypass the cache for this request."
|
||||
),
|
||||
] = False,
|
||||
only_synced: Annotated[
|
||||
bool,
|
||||
cyclopts.Parameter(
|
||||
name="--only-synced", negative="", help="Only accept synced (timed) lyrics."
|
||||
),
|
||||
] = False,
|
||||
):
|
||||
"""Fetch and print lyrics for the currently playing track."""
|
||||
track = get_current_track(_player)
|
||||
|
||||
if not track:
|
||||
logger.error("No active playing track found.")
|
||||
sys.exit(1)
|
||||
|
||||
logger.info(f"Track: {track.display_name()}")
|
||||
|
||||
result = manager.fetch_for_track(track, force_method=method, bypass_cache=no_cache)
|
||||
|
||||
if not result or not result.lyrics:
|
||||
logger.error("No lyrics found.")
|
||||
sys.exit(1)
|
||||
|
||||
if only_synced and result.status != CacheStatus.SUCCESS_SYNCED:
|
||||
logger.error("Only unsynced lyrics available (--only-synced requested).")
|
||||
sys.exit(1)
|
||||
|
||||
print(result.lyrics)
|
||||
|
||||
|
||||
# search
|
||||
|
||||
|
||||
@app.command
|
||||
def search(
|
||||
*,
|
||||
title: Annotated[
|
||||
str | None, cyclopts.Parameter(name=["--title", "-t"], help="Track title.")
|
||||
] = None,
|
||||
artist: Annotated[
|
||||
str | None, cyclopts.Parameter(name=["--artist", "-a"], help="Artist name.")
|
||||
] = None,
|
||||
album: Annotated[str | None, cyclopts.Parameter(help="Album name.")] = None,
|
||||
trackid: Annotated[str | None, cyclopts.Parameter(help="Spotify track ID.")] = None,
|
||||
length: Annotated[
|
||||
int | None,
|
||||
cyclopts.Parameter(
|
||||
name=["--length", "-l"], help="Track duration in milliseconds."
|
||||
),
|
||||
] = None,
|
||||
url: Annotated[
|
||||
str | None,
|
||||
cyclopts.Parameter(
|
||||
help="Local file URL (file:///...). Mutually exclusive with --path."
|
||||
),
|
||||
] = None,
|
||||
path: Annotated[
|
||||
str | None,
|
||||
cyclopts.Parameter(
|
||||
name=["--path"],
|
||||
help="Local audio file path. Mutually exclusive with --url.",
|
||||
),
|
||||
] = None,
|
||||
method: Annotated[
|
||||
FetcherMethodType | None, cyclopts.Parameter(help="Force a specific source.")
|
||||
] = None,
|
||||
no_cache: Annotated[
|
||||
bool,
|
||||
cyclopts.Parameter(
|
||||
name="--no-cache", negative="", help="Bypass the cache for this request."
|
||||
),
|
||||
] = False,
|
||||
only_synced: Annotated[
|
||||
bool,
|
||||
cyclopts.Parameter(
|
||||
name="--only-synced", negative="", help="Only accept synced (timed) lyrics."
|
||||
),
|
||||
] = False,
|
||||
):
|
||||
"""Search for lyrics by metadata (bypasses MPRIS)."""
|
||||
if url and path:
|
||||
logger.error("--url and --path are mutually exclusive.")
|
||||
sys.exit(1)
|
||||
|
||||
if path:
|
||||
resolved = str(Path(path).resolve())
|
||||
url = "file://" + quote(resolved, safe="/")
|
||||
|
||||
track = TrackMeta(
|
||||
title=title,
|
||||
artist=artist,
|
||||
album=album,
|
||||
trackid=trackid,
|
||||
length=length,
|
||||
url=url,
|
||||
)
|
||||
|
||||
logger.info(f"Track: {track.display_name()}")
|
||||
|
||||
result = manager.fetch_for_track(track, force_method=method, bypass_cache=no_cache)
|
||||
|
||||
if not result or not result.lyrics:
|
||||
logger.error("No lyrics found.")
|
||||
sys.exit(1)
|
||||
|
||||
if only_synced and result.status != CacheStatus.SUCCESS_SYNCED:
|
||||
logger.error("Only unsynced lyrics available (--only-synced requested).")
|
||||
sys.exit(1)
|
||||
|
||||
print(result.lyrics)
|
||||
|
||||
|
||||
# export
|
||||
|
||||
|
||||
@app.command
|
||||
def export(
|
||||
*,
|
||||
output: Annotated[
|
||||
str | None,
|
||||
cyclopts.Parameter(
|
||||
name=["--output", "-o"],
|
||||
help="Output file path (default: same directory as audio file with .lrc extension, or current directory if not available).",
|
||||
),
|
||||
] = None,
|
||||
method: Annotated[
|
||||
FetcherMethodType | None, cyclopts.Parameter(help="Force a specific source.")
|
||||
] = None,
|
||||
no_cache: Annotated[
|
||||
bool, cyclopts.Parameter(name="--no-cache", negative="", help="Bypass cache.")
|
||||
] = False,
|
||||
overwrite: Annotated[
|
||||
bool,
|
||||
cyclopts.Parameter(
|
||||
name=["--overwrite", "-f"], negative="", help="Overwrite existing file."
|
||||
),
|
||||
] = False,
|
||||
):
|
||||
"""Export lyrics of the current track to a .lrc file."""
|
||||
track = get_current_track(_player)
|
||||
if not track:
|
||||
logger.error("No active playing track found.")
|
||||
sys.exit(1)
|
||||
|
||||
result = manager.fetch_for_track(track, force_method=method, bypass_cache=no_cache)
|
||||
if not result or not result.lyrics:
|
||||
logger.error("No lyrics available to export.")
|
||||
sys.exit(1)
|
||||
|
||||
# Build default output path
|
||||
if not output:
|
||||
if track.url:
|
||||
lrc_path = get_sidecar_path(track.url, ensure_exists=False)
|
||||
if lrc_path:
|
||||
output = str(lrc_path)
|
||||
logger.info(f"Exporting to sidecar path: {output}")
|
||||
|
||||
# Fallback to current directory with sanitized filename
|
||||
if not output:
|
||||
filename = (
|
||||
f"{track.artist} - {track.title}.lrc"
|
||||
if track.artist and track.title
|
||||
else "lyrics.lrc"
|
||||
)
|
||||
# Sanitize filename
|
||||
filename = "".join(
|
||||
c for c in filename if c.isalpha() or c.isdigit() or c in " -_."
|
||||
).rstrip()
|
||||
output = os.path.join(os.getcwd(), filename)
|
||||
|
||||
if os.path.exists(output) and not overwrite:
|
||||
logger.error(f"File exists: {output} (use -f to overwrite)")
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
with open(output, "w", encoding="utf-8") as f:
|
||||
f.write(result.lyrics)
|
||||
logger.info(f"Exported lyrics to {output}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to write file: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
# cache subcommands
|
||||
|
||||
|
||||
@cache_app.command
|
||||
def query(
|
||||
*,
|
||||
all: Annotated[
|
||||
bool,
|
||||
cyclopts.Parameter(name="--all", negative="", help="Dump all cache entries."),
|
||||
] = False,
|
||||
):
|
||||
"""Show cached entries for the current track."""
|
||||
if all:
|
||||
rows = manager.cache.query_all()
|
||||
if not rows:
|
||||
print("Cache is empty.")
|
||||
return
|
||||
for row in rows:
|
||||
_print_cache_row(row)
|
||||
print()
|
||||
return
|
||||
|
||||
track = get_current_track(_player)
|
||||
if not track:
|
||||
logger.error("No active playing track found.")
|
||||
sys.exit(1)
|
||||
_print_track_cache(track)
|
||||
|
||||
|
||||
@cache_app.command
|
||||
def clear(
|
||||
*,
|
||||
all: Annotated[
|
||||
bool,
|
||||
cyclopts.Parameter(name="--all", negative="", help="Clear the entire cache."),
|
||||
] = False,
|
||||
):
|
||||
"""Clear cached entries for the current track."""
|
||||
if all:
|
||||
manager.cache.clear_all()
|
||||
return
|
||||
|
||||
track = get_current_track(_player)
|
||||
if not track:
|
||||
logger.error("No active playing track found.")
|
||||
sys.exit(1)
|
||||
manager.cache.clear_track(track)
|
||||
|
||||
|
||||
@cache_app.command
|
||||
def prune():
|
||||
"""Remove expired cache entries."""
|
||||
manager.cache.prune()
|
||||
|
||||
|
||||
@cache_app.command
|
||||
def stats():
|
||||
"""Show cache statistics."""
|
||||
s = manager.cache.stats()
|
||||
print("=== Cache Statistics ===")
|
||||
print(f"Total entries : {s['total']}")
|
||||
print(f"Active : {s['active']}")
|
||||
print(f"Expired : {s['expired']}")
|
||||
if s["by_status"]:
|
||||
print("\nBy status:")
|
||||
for status, count in s["by_status"].items():
|
||||
print(f" {status}: {count}")
|
||||
if s["by_source"]:
|
||||
print("\nBy source:")
|
||||
for source, count in s["by_source"].items():
|
||||
print(f" {source}: {count}")
|
||||
|
||||
|
||||
@cache_app.command
|
||||
def insert(
|
||||
*,
|
||||
path: Annotated[
|
||||
str | None,
|
||||
cyclopts.Parameter(
|
||||
name=["--path"],
|
||||
help="Path to a local .lrc file to insert instead of reading from stdin.",
|
||||
),
|
||||
] = None,
|
||||
):
|
||||
"""Manually insert lyrics into the cache for the current track."""
|
||||
track = get_current_track(_player)
|
||||
if not track:
|
||||
logger.error("No active playing track found.")
|
||||
sys.exit(1)
|
||||
|
||||
if path:
|
||||
try:
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
lyrics = f.read()
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to read file: {e}")
|
||||
sys.exit(1)
|
||||
else:
|
||||
logger.info("Reading lyrics from stdin (Ctrl+D to finish)...")
|
||||
lyrics = sys.stdin.read()
|
||||
|
||||
manager.manual_insert(track, lyrics)
|
||||
|
||||
|
||||
# helpers
|
||||
|
||||
|
||||
def _print_track_cache(track: TrackMeta) -> None:
|
||||
"""Print all cached entries for a given track."""
|
||||
print(f"Track: {track.display_name()}")
|
||||
if track.album:
|
||||
print(f"Album: {track.album}")
|
||||
if track.length:
|
||||
secs = track.length / 1000.0
|
||||
print(f"Duration: {int(secs // 60)}:{secs % 60:05.2f}")
|
||||
print()
|
||||
|
||||
rows = manager.cache.query_track(track)
|
||||
if not rows:
|
||||
print(" (no cache entries)")
|
||||
return
|
||||
|
||||
for row in rows:
|
||||
_print_cache_row(row, indent=" ")
|
||||
|
||||
|
||||
def _print_cache_row(row: dict, indent: str = "") -> None:
|
||||
"""Pretty-print a single cache row."""
|
||||
now = int(time.time())
|
||||
source = row.get("source", "?")
|
||||
status = row.get("status", "?")
|
||||
artist = row.get("artist", "")
|
||||
title = row.get("title", "")
|
||||
album = row.get("album", "")
|
||||
created = row.get("created_at", 0)
|
||||
expires = row.get("expires_at")
|
||||
lyrics = row.get("lyrics", "")
|
||||
|
||||
name = f"{artist} - {title}" if artist and title else row.get("key", "?")
|
||||
print(f"{indent}[{source}] {name}")
|
||||
if album:
|
||||
print(f"{indent} Album : {album}")
|
||||
print(f"{indent} Status : {status}")
|
||||
if created:
|
||||
age = now - created
|
||||
print(f"{indent} Cached : {age // 3600}h {(age % 3600) // 60}m ago")
|
||||
if expires:
|
||||
remaining = expires - now
|
||||
if remaining > 0:
|
||||
print(
|
||||
f"{indent} Expires : in {remaining // 3600}h {(remaining % 3600) // 60}m"
|
||||
)
|
||||
else:
|
||||
print(f"{indent} Expires : EXPIRED")
|
||||
else:
|
||||
print(f"{indent} Expires : never")
|
||||
if lyrics:
|
||||
line_count = len(lyrics.splitlines())
|
||||
print(f"{indent} Lyrics : {line_count} lines")
|
||||
|
||||
|
||||
def run():
|
||||
app.meta()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
run()
|
||||
@@ -0,0 +1,88 @@
|
||||
"""
|
||||
Author: Uyanide pywang0608@foxmail.com
|
||||
Date: 2026-03-25 10:17:56
|
||||
Description: Global configuration constants and logger setup
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from platformdirs import user_cache_dir, user_config_dir
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from importlib.metadata import version
|
||||
|
||||
# Application
|
||||
APP_NAME = "lrx-cli"
|
||||
APP_AUTHOR = "Uyanide"
|
||||
APP_VERSION = version(APP_NAME)
|
||||
|
||||
# Paths
|
||||
CACHE_DIR = user_cache_dir(APP_NAME, APP_AUTHOR)
|
||||
DB_PATH = os.path.join(CACHE_DIR, "cache.db")
|
||||
|
||||
# .env loading
|
||||
_config_env = Path(user_config_dir(APP_NAME, APP_AUTHOR)) / ".env"
|
||||
load_dotenv(_config_env) # ~/.config/lrx-cli/.env
|
||||
load_dotenv() # .env in cwd (does NOT override existing vars)
|
||||
|
||||
# HTTP
|
||||
HTTP_TIMEOUT = 10.0
|
||||
|
||||
# Cache TTLs (seconds)
|
||||
TTL_SYNCED = None # never expires
|
||||
TTL_UNSYNCED = 86400 # 1 day
|
||||
TTL_NOT_FOUND = 86400 * 3 # 3 days
|
||||
TTL_NETWORK_ERROR = 3600 # 1 hour
|
||||
|
||||
# Search
|
||||
DURATION_TOLERANCE_MS = 3000 # max duration mismatch for search matching
|
||||
|
||||
# Spotify related
|
||||
SPOTIFY_TOKEN_URL = "https://open.spotify.com/api/token"
|
||||
SPOTIFY_LYRICS_URL = "https://spclient.wg.spotify.com/color-lyrics/v2/track/"
|
||||
SPOTIFY_SERVER_TIME_URL = "https://open.spotify.com/api/server-time"
|
||||
SPOTIFY_SECRET_URL = (
|
||||
"https://raw.githubusercontent.com/xyloflake/spot-secrets-go"
|
||||
"/refs/heads/main/secrets/secrets.json"
|
||||
)
|
||||
SPOTIFY_SP_DC = os.environ.get("SPOTIFY_SP_DC", "")
|
||||
SPOTIFY_TOKEN_CACHE_FILE = os.path.join(CACHE_DIR, "spotify_token.json")
|
||||
SPOTIFY_APP_VERSION = "1.2.87.284.g3ff41c13"
|
||||
|
||||
# Netease api
|
||||
NETEASE_SEARCH_URL = "https://music.163.com/api/cloudsearch/pc"
|
||||
NETEASE_LYRIC_URL = "https://interface3.music.163.com/api/song/lyric"
|
||||
|
||||
# LRCLIB api
|
||||
LRCLIB_API_URL = "https://lrclib.net/api/get"
|
||||
LRCLIB_SEARCH_URL = "https://lrclib.net/api/search"
|
||||
|
||||
# QQ Music API (self-hosted proxy)
|
||||
QQ_MUSIC_API_URL = os.environ.get("QQ_MUSIC_API_URL", "").rstrip("/")
|
||||
|
||||
# Player preference (used when multiple MPRIS players are active)
|
||||
PREFERRED_PLAYER = os.environ.get("PREFERRED_PLAYER", "spotify")
|
||||
|
||||
# User-Agents
|
||||
UA_BROWSER = "Mozilla/5.0 (X11; Linux x86_64; rv:148.0) Gecko/20100101 Firefox/148.0"
|
||||
UA_LRX = f"LRX-CLI {APP_VERSION} (https://github.com/Uyanide/lrx-cli)"
|
||||
|
||||
os.makedirs(CACHE_DIR, exist_ok=True)
|
||||
|
||||
# Logger
|
||||
_LOG_FORMAT = (
|
||||
"<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
|
||||
"<level>{level: <8}</level> | "
|
||||
"<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
|
||||
"<level>{message}</level>"
|
||||
)
|
||||
|
||||
logger.remove()
|
||||
logger.add(sys.stderr, format=_LOG_FORMAT, level="INFO")
|
||||
|
||||
|
||||
def enable_debug() -> None:
|
||||
"""Switch logger to DEBUG level."""
|
||||
logger.remove()
|
||||
logger.add(sys.stderr, format=_LOG_FORMAT, level="DEBUG")
|
||||
+178
@@ -0,0 +1,178 @@
|
||||
"""
|
||||
Author: Uyanide pywang0608@foxmail.com
|
||||
Date: 2026-03-25 11:09:53
|
||||
Description: Core orchestrator — coordinates fetchers with cache-aware fallback
|
||||
"""
|
||||
|
||||
"""
|
||||
Fetch pipeline:
|
||||
1. Check cache for each source in the fallback sequence
|
||||
2. For sources without a valid cache hit, call the fetcher
|
||||
3. Cache every result (success, not-found, or error) per source
|
||||
4. Return the best result (synced > unsynced > None)
|
||||
"""
|
||||
|
||||
from typing import Optional
|
||||
from loguru import logger
|
||||
|
||||
from .fetchers import FetcherMethodType, create_fetchers
|
||||
from .fetchers.base import BaseFetcher
|
||||
from .cache import CacheEngine
|
||||
from .lrc import normalize_tags, normalize_unsynced, detect_sync_status
|
||||
from .config import TTL_SYNCED, TTL_UNSYNCED, TTL_NOT_FOUND, TTL_NETWORK_ERROR
|
||||
from .models import TrackMeta, LyricResult, CacheStatus
|
||||
from .enrichers import enrich_track
|
||||
|
||||
|
||||
# Maps CacheStatus to the default TTL used when storing results
|
||||
_STATUS_TTL: dict[CacheStatus, Optional[int]] = {
|
||||
CacheStatus.SUCCESS_SYNCED: TTL_SYNCED,
|
||||
CacheStatus.SUCCESS_UNSYNCED: TTL_UNSYNCED,
|
||||
CacheStatus.NOT_FOUND: TTL_NOT_FOUND,
|
||||
CacheStatus.NETWORK_ERROR: TTL_NETWORK_ERROR,
|
||||
}
|
||||
|
||||
|
||||
class LrcManager:
|
||||
"""Main entry point for fetching lyrics with caching."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.cache = CacheEngine()
|
||||
self.fetchers = create_fetchers(self.cache)
|
||||
|
||||
def _build_sequence(
|
||||
self, track: TrackMeta, force_method: Optional[FetcherMethodType] = None
|
||||
) -> list[BaseFetcher]:
|
||||
"""Determine the ordered list of fetchers to try."""
|
||||
if force_method:
|
||||
if force_method not in self.fetchers:
|
||||
logger.error(f"Unknown method: {force_method}")
|
||||
return []
|
||||
return [self.fetchers[force_method]]
|
||||
|
||||
sequence: list[BaseFetcher] = []
|
||||
for method in self.fetchers.keys():
|
||||
if self.fetchers[method].is_available(track):
|
||||
sequence.append(self.fetchers[method])
|
||||
|
||||
logger.debug(f"Fallback sequence: {[f.source_name for f in sequence]}")
|
||||
return sequence
|
||||
|
||||
def fetch_for_track(
|
||||
self,
|
||||
track: TrackMeta,
|
||||
force_method: Optional[FetcherMethodType] = None,
|
||||
bypass_cache: bool = False,
|
||||
) -> Optional[LyricResult]:
|
||||
"""Fetch lyrics for *track* using the fallback pipeline.
|
||||
|
||||
Each source is checked against the cache independently:
|
||||
- Cache hit with synced lyrics → return immediately
|
||||
- Cache hit with negative status (NOT_FOUND / NETWORK_ERROR) → skip source
|
||||
- Cache miss or unsynced → call fetcher, then cache the result
|
||||
|
||||
After all sources are tried, returns the best result found
|
||||
(synced > unsynced > None).
|
||||
"""
|
||||
track = enrich_track(track)
|
||||
logger.info(f"Fetching lyrics for: {track.display_name()}")
|
||||
|
||||
sequence = self._build_sequence(track, force_method)
|
||||
if not sequence:
|
||||
return None
|
||||
|
||||
# Best result seen so far (synced wins over unsynced)
|
||||
best_result: Optional[LyricResult] = None
|
||||
|
||||
for fetcher in sequence:
|
||||
source = fetcher.source_name
|
||||
|
||||
# Cache check (skip for fetchers that handle their own caching)
|
||||
if not bypass_cache and not fetcher.self_cached:
|
||||
cached = self.cache.get(track, source)
|
||||
if cached:
|
||||
if cached.status == CacheStatus.SUCCESS_SYNCED:
|
||||
logger.info(f"[{source}] cache hit: synced lyrics")
|
||||
return cached
|
||||
elif cached.status == CacheStatus.SUCCESS_UNSYNCED:
|
||||
logger.debug(
|
||||
f"[{source}] cache hit: unsynced lyrics (continuing)"
|
||||
)
|
||||
if best_result is None:
|
||||
best_result = cached
|
||||
continue # Try next source for synced
|
||||
elif cached.status in (
|
||||
CacheStatus.NOT_FOUND,
|
||||
CacheStatus.NETWORK_ERROR,
|
||||
):
|
||||
logger.debug(
|
||||
f"[{source}] cache hit: {cached.status.value}, skipping"
|
||||
)
|
||||
continue
|
||||
elif not fetcher.self_cached:
|
||||
logger.debug(f"[{source}] cache bypassed")
|
||||
|
||||
# Fetch
|
||||
logger.debug(f"[{source}] calling fetcher...")
|
||||
result = fetcher.fetch(track, bypass_cache=bypass_cache)
|
||||
|
||||
if not result:
|
||||
logger.debug(f"[{source}] returned None (no result)")
|
||||
continue
|
||||
|
||||
# Cache the result (skip for self-cached fetchers)
|
||||
if not fetcher.self_cached:
|
||||
ttl = result.ttl or _STATUS_TTL.get(result.status, TTL_NOT_FOUND)
|
||||
self.cache.set(track, source, result, ttl_seconds=ttl)
|
||||
|
||||
# Evaluate result
|
||||
if result.status == CacheStatus.SUCCESS_SYNCED:
|
||||
logger.info(f"[{source}] got synced lyrics")
|
||||
return result
|
||||
|
||||
if result.status == CacheStatus.SUCCESS_UNSYNCED:
|
||||
logger.debug(f"[{source}] got unsynced lyrics (continuing)")
|
||||
if best_result is None:
|
||||
best_result = result
|
||||
|
||||
# NOT_FOUND / NETWORK_ERROR: already cached, try next
|
||||
|
||||
# Return best available
|
||||
if best_result:
|
||||
# Normalize unsynced lyrics: set all timestamps to [00:00.00]
|
||||
if (
|
||||
best_result.status == CacheStatus.SUCCESS_UNSYNCED
|
||||
and best_result.lyrics
|
||||
):
|
||||
best_result = LyricResult(
|
||||
status=best_result.status,
|
||||
lyrics=normalize_unsynced(best_result.lyrics),
|
||||
source=best_result.source,
|
||||
ttl=best_result.ttl,
|
||||
)
|
||||
logger.info(
|
||||
f"Returning unsynced lyrics from {best_result.source} "
|
||||
f"(no synced source found)"
|
||||
)
|
||||
else:
|
||||
logger.info(f"No lyrics found for {track.display_name()}")
|
||||
|
||||
return best_result
|
||||
|
||||
def manual_insert(
|
||||
self,
|
||||
track: TrackMeta,
|
||||
lyrics: str,
|
||||
) -> None:
|
||||
"""Manually insert lyrics into the cache for a track."""
|
||||
track = enrich_track(track)
|
||||
logger.info(f"Manually inserting lyrics for: {track.display_name()}")
|
||||
lyrics = normalize_tags(lyrics)
|
||||
result = LyricResult(
|
||||
status=detect_sync_status(lyrics),
|
||||
lyrics=normalize_tags(lyrics),
|
||||
source="manual",
|
||||
ttl=None,
|
||||
)
|
||||
self.cache.set(track, "manual", result, ttl_seconds=None)
|
||||
logger.info("Lyrics inserted into cache.")
|
||||
@@ -0,0 +1,40 @@
|
||||
"""
|
||||
Author: Uyanide pywang0608@foxmail.com
|
||||
Date: 2026-03-31 06:09:11
|
||||
Description: Metadata enrichment pipeline
|
||||
"""
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from .base import BaseEnricher
|
||||
from .audio_tag import AudioTagEnricher
|
||||
from .file_name import FileNameEnricher
|
||||
from ..models import TrackMeta
|
||||
|
||||
# Enrichers run in order; earlier ones have higher priority.
|
||||
_ENRICHERS: list[BaseEnricher] = [
|
||||
AudioTagEnricher(),
|
||||
FileNameEnricher(),
|
||||
]
|
||||
|
||||
|
||||
def enrich_track(track: TrackMeta) -> TrackMeta:
|
||||
"""Run all enrichers and return a track with missing fields filled in.
|
||||
|
||||
Each enricher sees the cumulative state (earlier enrichers' results
|
||||
are already applied). A field is only set if it is currently None.
|
||||
"""
|
||||
for enricher in _ENRICHERS:
|
||||
try:
|
||||
result = enricher.enrich(track)
|
||||
except Exception as e:
|
||||
logger.warning(f"Enricher {enricher.name} failed: {e}")
|
||||
continue
|
||||
if not result:
|
||||
continue
|
||||
# Only apply fields that are still None
|
||||
updates = {k: v for k, v in result.items() if getattr(track, k, None) is None}
|
||||
if updates:
|
||||
for k, v in updates.items():
|
||||
setattr(track, k, v)
|
||||
return track
|
||||
@@ -0,0 +1,78 @@
|
||||
"""
|
||||
Author: Uyanide pywang0608@foxmail.com
|
||||
Date: 2026-03-31 06:11:27
|
||||
Description: Enricher that reads metadata from audio file tags (mutagen)
|
||||
"""
|
||||
|
||||
from typing import Optional
|
||||
from loguru import logger
|
||||
from mutagen._file import File, FileType
|
||||
|
||||
from .base import BaseEnricher
|
||||
from ..models import TrackMeta
|
||||
from ..lrc import get_audio_path
|
||||
|
||||
|
||||
class AudioTagEnricher(BaseEnricher):
|
||||
"""Extract title, artist, album, and duration from audio file tags."""
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return "audio-tag"
|
||||
|
||||
def enrich(self, track: TrackMeta) -> Optional[dict]:
|
||||
if not track.is_local or not track.url:
|
||||
return None
|
||||
|
||||
audio_path = get_audio_path(track.url, ensure_exists=True)
|
||||
if not audio_path:
|
||||
return None
|
||||
|
||||
try:
|
||||
audio = File(audio_path)
|
||||
except Exception as e:
|
||||
logger.debug(f"AudioTag: failed to read {audio_path}: {e}")
|
||||
return None
|
||||
|
||||
if audio is None:
|
||||
return None
|
||||
|
||||
updates: dict = {}
|
||||
|
||||
# Try common tag names (vorbis comments, ID3, MP4)
|
||||
title = _first_tag(audio, "title", "TIT2", "\xa9nam")
|
||||
if title and not track.title:
|
||||
updates["title"] = title
|
||||
|
||||
artist = _first_tag(audio, "artist", "TPE1", "\xa9ART")
|
||||
if artist and not track.artist:
|
||||
updates["artist"] = artist
|
||||
|
||||
album = _first_tag(audio, "album", "TALB", "\xa9alb")
|
||||
if album and not track.album:
|
||||
updates["album"] = album
|
||||
|
||||
if not track.length and audio.info and hasattr(audio.info, "length"):
|
||||
length_ms = int(audio.info.length * 1000)
|
||||
if length_ms > 0:
|
||||
updates["length"] = length_ms
|
||||
|
||||
if updates:
|
||||
logger.debug(f"AudioTag: enriched fields: {list(updates.keys())}")
|
||||
return updates or None
|
||||
|
||||
|
||||
def _first_tag(audio: FileType, *keys: str) -> Optional[str]:
|
||||
"""Return the first non-empty string value found among the given tag keys."""
|
||||
if not audio.tags:
|
||||
return None
|
||||
for key in keys:
|
||||
val = audio.tags.get(key)
|
||||
if val is None:
|
||||
continue
|
||||
# mutagen returns lists for vorbis, single values for ID3
|
||||
if isinstance(val, list):
|
||||
val = val[0] if val else None
|
||||
if val:
|
||||
return str(val).strip()
|
||||
return None
|
||||
@@ -0,0 +1,31 @@
|
||||
"""
|
||||
Author: Uyanide pywang0608@foxmail.com
|
||||
Date: 2026-03-31 06:08:16
|
||||
Description: Base class for metadata enrichers
|
||||
"""
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Optional
|
||||
|
||||
from ..models import TrackMeta
|
||||
|
||||
|
||||
class BaseEnricher(ABC):
|
||||
"""Attempts to fill missing fields on a TrackMeta.
|
||||
|
||||
Each enricher inspects the track, and returns a dict of field names
|
||||
to values for any fields it can provide. Only fields that are
|
||||
currently ``None`` on the track will actually be applied.
|
||||
"""
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def name(self) -> str: ...
|
||||
|
||||
@abstractmethod
|
||||
def enrich(self, track: TrackMeta) -> Optional[dict]:
|
||||
"""Return a dict of {field_name: value} for fields this enricher can fill.
|
||||
|
||||
Return None or an empty dict if nothing can be contributed.
|
||||
"""
|
||||
...
|
||||
@@ -0,0 +1,83 @@
|
||||
"""
|
||||
Author: Uyanide pywang0608@foxmail.com
|
||||
Date: 2026-03-31 06:08:44
|
||||
Description: Enricher that parses metadata from the audio file path
|
||||
"""
|
||||
|
||||
import re
|
||||
from typing import Optional
|
||||
from loguru import logger
|
||||
|
||||
from .base import BaseEnricher
|
||||
from ..models import TrackMeta
|
||||
from ..lrc import get_audio_path
|
||||
|
||||
|
||||
# Common track-number prefixes: "01 - ", "01. ", "1 - ", etc.
|
||||
_TRACK_NUM_RE = re.compile(r"^\d{1,3}[\s.\-]+")
|
||||
|
||||
|
||||
class FileNameEnricher(BaseEnricher):
|
||||
"""Derive artist / title from the file path when tags are unavailable.
|
||||
|
||||
Heuristics (applied to the stem of the filename):
|
||||
- "Artist - Title" → artist, title
|
||||
- "01 - Title" → title only (leading track number stripped)
|
||||
- "Title" → title only
|
||||
|
||||
If artist is still missing after parsing the filename, the parent
|
||||
directory name is used as a guess (common layout: ``Artist/Album/track``).
|
||||
"""
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return "file-name"
|
||||
|
||||
def enrich(self, track: TrackMeta) -> Optional[dict]:
|
||||
if not track.is_local or not track.url:
|
||||
return None
|
||||
|
||||
audio_path = get_audio_path(track.url, ensure_exists=False)
|
||||
if not audio_path:
|
||||
return None
|
||||
|
||||
updates: dict = {}
|
||||
stem = audio_path.stem
|
||||
|
||||
# Try "Artist - Title" split
|
||||
if " - " in stem:
|
||||
left, right = stem.split(" - ", 1)
|
||||
left = _TRACK_NUM_RE.sub("", left).strip()
|
||||
right = right.strip()
|
||||
|
||||
if left and right:
|
||||
# Both sides non-empty after stripping track number
|
||||
if not track.artist:
|
||||
updates["artist"] = left
|
||||
if not track.title:
|
||||
updates["title"] = right
|
||||
elif right:
|
||||
# Left was only a track number → right is the title
|
||||
if not track.title:
|
||||
updates["title"] = right
|
||||
else:
|
||||
# No separator: strip track number, remainder is title
|
||||
title_guess = _TRACK_NUM_RE.sub("", stem).strip()
|
||||
if title_guess and not track.title:
|
||||
updates["title"] = title_guess
|
||||
|
||||
# Use parent directory as artist fallback
|
||||
# Typical layout: /Music/Artist/Album/01 - Track.flac
|
||||
if not track.artist and "artist" not in updates:
|
||||
parents = audio_path.parents
|
||||
if len(parents) >= 2:
|
||||
album_dir = parents[0].name
|
||||
artist_dir = parents[1].name
|
||||
if artist_dir and artist_dir not in (".", "/"):
|
||||
updates["artist"] = artist_dir
|
||||
if not track.album and album_dir and album_dir != artist_dir:
|
||||
updates["album"] = album_dir
|
||||
|
||||
if updates:
|
||||
logger.debug(f"FileName: enriched fields: {list(updates.keys())}")
|
||||
return updates or None
|
||||
@@ -0,0 +1,41 @@
|
||||
"""
|
||||
Author: Uyanide pywang0608@foxmail.com
|
||||
Date: 2026-03-25 02:33:26
|
||||
Description: Fetcher pipeline — registry and types
|
||||
"""
|
||||
|
||||
from typing import Literal
|
||||
|
||||
from .base import BaseFetcher
|
||||
from .local import LocalFetcher
|
||||
from .cache_search import CacheSearchFetcher
|
||||
from .spotify import SpotifyFetcher
|
||||
from .lrclib import LrclibFetcher
|
||||
from .lrclib_search import LrclibSearchFetcher
|
||||
from .netease import NeteaseFetcher
|
||||
from .qqmusic import QQMusicFetcher
|
||||
from ..cache import CacheEngine
|
||||
|
||||
FetcherMethodType = Literal[
|
||||
"local",
|
||||
"cache-search",
|
||||
"spotify",
|
||||
"lrclib",
|
||||
"lrclib-search",
|
||||
"netease",
|
||||
"qqmusic",
|
||||
]
|
||||
|
||||
|
||||
def create_fetchers(cache: CacheEngine) -> dict[FetcherMethodType, BaseFetcher]:
|
||||
"""Instantiate all fetchers. Returns a dict keyed by source name."""
|
||||
fetchers: dict[FetcherMethodType, BaseFetcher] = {
|
||||
"local": LocalFetcher(),
|
||||
"cache-search": CacheSearchFetcher(cache),
|
||||
"spotify": SpotifyFetcher(),
|
||||
"lrclib": LrclibFetcher(),
|
||||
"lrclib-search": LrclibSearchFetcher(),
|
||||
"netease": NeteaseFetcher(),
|
||||
"qqmusic": QQMusicFetcher(),
|
||||
}
|
||||
return fetchers
|
||||
@@ -0,0 +1,35 @@
|
||||
"""
|
||||
Author: Uyanide pywang0608@foxmail.com
|
||||
Date: 2026-03-25 02:33:26
|
||||
Description: Base fetcher class and common interfaces
|
||||
"""
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Optional
|
||||
|
||||
from ..models import TrackMeta, LyricResult
|
||||
|
||||
|
||||
class BaseFetcher(ABC):
|
||||
@property
|
||||
@abstractmethod
|
||||
def source_name(self) -> str:
|
||||
"""Name of the fetcher source."""
|
||||
pass
|
||||
|
||||
@property
|
||||
def self_cached(self) -> bool:
|
||||
"""True if this fetcher manages its own cache (skip per-source cache check)."""
|
||||
return False
|
||||
|
||||
@abstractmethod
|
||||
def is_available(self, track: TrackMeta) -> bool:
|
||||
"""Check if the fetcher is available for the given track (e.g. has required metadata)."""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def fetch(
|
||||
self, track: TrackMeta, bypass_cache: bool = False
|
||||
) -> Optional[LyricResult]:
|
||||
"""Fetch lyrics for the given track. Returns None if unable to fetch."""
|
||||
pass
|
||||
@@ -0,0 +1,85 @@
|
||||
"""
|
||||
Author: Uyanide pywang0608@foxmail.com
|
||||
Date: 2026-03-28 05:57:46
|
||||
Description: Cache-search fetcher — cross-album fuzzy lookup in the local cache
|
||||
"""
|
||||
|
||||
"""
|
||||
Searches existing cache entries by artist + title with fuzzy normalization,
|
||||
ignoring album and source. Useful when the same track appears on different
|
||||
albums or is played from different players.
|
||||
"""
|
||||
|
||||
from typing import Optional
|
||||
from loguru import logger
|
||||
|
||||
from .base import BaseFetcher
|
||||
from ..models import TrackMeta, LyricResult, CacheStatus
|
||||
from ..cache import CacheEngine
|
||||
|
||||
|
||||
class CacheSearchFetcher(BaseFetcher):
|
||||
def __init__(self, cache: CacheEngine) -> None:
|
||||
self._cache = cache
|
||||
|
||||
@property
|
||||
def source_name(self) -> str:
|
||||
return "cache-search"
|
||||
|
||||
@property
|
||||
def self_cached(self) -> bool:
|
||||
return True
|
||||
|
||||
def is_available(self, track: TrackMeta) -> bool:
|
||||
return bool(track.title)
|
||||
|
||||
def fetch(
|
||||
self, track: TrackMeta, bypass_cache: bool = False
|
||||
) -> Optional[LyricResult]:
|
||||
if bypass_cache:
|
||||
logger.debug("Cache-search: bypassed by caller")
|
||||
return None
|
||||
|
||||
if not track.title:
|
||||
logger.debug("Cache-search: skipped — no title")
|
||||
return None
|
||||
|
||||
# Fast path: exact metadata match (artist+title+album), single SQL query
|
||||
exact = self._cache.find_best_positive(track)
|
||||
if exact:
|
||||
logger.info(f"Cache-search: exact hit ({exact.status.value})")
|
||||
return exact
|
||||
|
||||
# Slow path: fuzzy cross-album search
|
||||
matches = self._cache.search_by_meta(
|
||||
artist=track.artist,
|
||||
title=track.title,
|
||||
length=track.length,
|
||||
)
|
||||
|
||||
if not matches:
|
||||
logger.debug(f"Cache-search: no match for {track.display_name()}")
|
||||
return None
|
||||
|
||||
# Pick best: prefer synced, then first available
|
||||
best = None
|
||||
for m in matches:
|
||||
if m.get("status") == CacheStatus.SUCCESS_SYNCED.value:
|
||||
best = m
|
||||
break
|
||||
if best is None:
|
||||
best = m
|
||||
|
||||
if not best or not best.get("lyrics"):
|
||||
return None
|
||||
|
||||
status = CacheStatus(best["status"])
|
||||
logger.info(
|
||||
f"Cache-search: fuzzy hit from [{best.get('source')}] "
|
||||
f"album={best.get('album')!r} ({status.value})"
|
||||
)
|
||||
return LyricResult(
|
||||
status=status,
|
||||
lyrics=best["lyrics"],
|
||||
source=self.source_name,
|
||||
)
|
||||
@@ -0,0 +1,98 @@
|
||||
"""
|
||||
Author: Uyanide pywang0608@foxmail.com
|
||||
Date: 2026-03-26 02:08:41
|
||||
Description: Local fetcher — reads lyrics from .lrc sidecar files or embedded audio metadata
|
||||
"""
|
||||
|
||||
"""
|
||||
Priority:
|
||||
1. Same-directory .lrc file (e.g. /path/to/track.lrc)
|
||||
2. Embedded lyrics in audio metadata (FLAC, MP3 USLT/SYLT tags)
|
||||
"""
|
||||
|
||||
from typing import Optional
|
||||
from loguru import logger
|
||||
from mutagen._file import File
|
||||
from mutagen.flac import FLAC
|
||||
|
||||
from .base import BaseFetcher
|
||||
from ..models import TrackMeta, LyricResult
|
||||
from ..lrc import detect_sync_status, normalize_tags, get_audio_path, get_sidecar_path
|
||||
|
||||
|
||||
class LocalFetcher(BaseFetcher):
|
||||
@property
|
||||
def source_name(self) -> str:
|
||||
return "local"
|
||||
|
||||
def is_available(self, track: TrackMeta) -> bool:
|
||||
return track.is_local
|
||||
|
||||
def fetch(
|
||||
self, track: TrackMeta, bypass_cache: bool = False
|
||||
) -> Optional[LyricResult]:
|
||||
"""Attempt to read lyrics from local filesystem."""
|
||||
if not track.is_local or not track.url:
|
||||
return None
|
||||
|
||||
audio_path = get_audio_path(track.url, ensure_exists=False)
|
||||
if not audio_path:
|
||||
logger.debug(f"Local: audio URL is not a valid file path: {track.url}")
|
||||
return None
|
||||
|
||||
lrc_path = get_sidecar_path(
|
||||
track.url, ensure_audio_exists=False, ensure_exists=True
|
||||
)
|
||||
if lrc_path:
|
||||
try:
|
||||
with open(lrc_path, "r", encoding="utf-8") as f:
|
||||
content = f.read().strip()
|
||||
if content:
|
||||
content = normalize_tags(content)
|
||||
status = detect_sync_status(content)
|
||||
logger.info(f"Local: found .lrc sidecar ({status.value})")
|
||||
return LyricResult(
|
||||
status=status, lyrics=content, source=self.source_name
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Local: error reading {lrc_path}: {e}")
|
||||
else:
|
||||
logger.debug(f"Local: no .lrc sidecar found for {audio_path}")
|
||||
|
||||
# Embedded metadata
|
||||
if not audio_path.exists():
|
||||
logger.debug(f"Local: audio file does not exist: {audio_path}")
|
||||
return None
|
||||
try:
|
||||
audio = File(audio_path)
|
||||
if audio is not None:
|
||||
lyrics = None
|
||||
|
||||
if isinstance(audio, FLAC):
|
||||
# FLAC stores lyrics in vorbis comment tags
|
||||
lyrics = (
|
||||
audio.get("lyrics") or audio.get("unsynclyrics") or [None]
|
||||
)[0]
|
||||
elif hasattr(audio, "tags") and audio.tags:
|
||||
# MP3 / other: look for USLT or SYLT ID3 frames
|
||||
for key in audio.tags.keys():
|
||||
if key.startswith("USLT") or key.startswith("SYLT"):
|
||||
lyrics = str(audio.tags[key])
|
||||
break
|
||||
|
||||
if lyrics:
|
||||
lyrics = normalize_tags(lyrics.strip())
|
||||
status = detect_sync_status(lyrics)
|
||||
logger.info(f"Local: found embedded lyrics ({status.value})")
|
||||
return LyricResult(
|
||||
status=status,
|
||||
lyrics=lyrics,
|
||||
source=f"{self.source_name} (embedded)",
|
||||
)
|
||||
else:
|
||||
logger.debug("Local: no embedded lyrics found")
|
||||
except Exception as e:
|
||||
logger.error(f"Local: error reading metadata for {audio_path}: {e}")
|
||||
|
||||
logger.debug(f"Local: no lyrics found for {audio_path}")
|
||||
return None
|
||||
@@ -0,0 +1,111 @@
|
||||
"""
|
||||
Author: Uyanide pywang0608@foxmail.com
|
||||
Date: 2026-03-25 05:23:38
|
||||
Description: LRCLIB fetcher — queries lrclib.net for synced/plain lyrics
|
||||
"""
|
||||
|
||||
"""
|
||||
Requires complete track metadata (artist, title, album, duration).
|
||||
"""
|
||||
|
||||
from typing import Optional
|
||||
import httpx
|
||||
from loguru import logger
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from .base import BaseFetcher
|
||||
from ..models import TrackMeta, LyricResult, CacheStatus
|
||||
from ..lrc import normalize_tags
|
||||
from ..config import (
|
||||
HTTP_TIMEOUT,
|
||||
TTL_UNSYNCED,
|
||||
TTL_NOT_FOUND,
|
||||
TTL_NETWORK_ERROR,
|
||||
LRCLIB_API_URL,
|
||||
UA_LRX,
|
||||
)
|
||||
|
||||
|
||||
class LrclibFetcher(BaseFetcher):
|
||||
@property
|
||||
def source_name(self) -> str:
|
||||
return "lrclib"
|
||||
|
||||
def is_available(self, track: TrackMeta) -> bool:
|
||||
return track.is_complete
|
||||
|
||||
def fetch(
|
||||
self, track: TrackMeta, bypass_cache: bool = False
|
||||
) -> Optional[LyricResult]:
|
||||
"""Fetch lyrics from LRCLIB. Requires complete metadata."""
|
||||
if not track.is_complete:
|
||||
logger.debug("LRCLIB: skipped — incomplete metadata")
|
||||
return None
|
||||
|
||||
params = {
|
||||
"track_name": track.title,
|
||||
"artist_name": track.artist,
|
||||
"album_name": track.album,
|
||||
"duration": track.length / 1000.0 if track.length else 0,
|
||||
}
|
||||
|
||||
url = f"{LRCLIB_API_URL}?{urlencode(params)}"
|
||||
logger.info(f"LRCLIB: fetching lyrics for {track.display_name()}")
|
||||
|
||||
try:
|
||||
with httpx.Client(timeout=HTTP_TIMEOUT) as client:
|
||||
resp = client.get(url, headers={"User-Agent": UA_LRX})
|
||||
|
||||
if resp.status_code == 404:
|
||||
logger.debug(f"LRCLIB: not found for {track.display_name()}")
|
||||
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
|
||||
|
||||
if resp.status_code != 200:
|
||||
logger.error(f"LRCLIB: API returned {resp.status_code}")
|
||||
return LyricResult(
|
||||
status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
|
||||
)
|
||||
|
||||
data = resp.json()
|
||||
|
||||
# Validate response
|
||||
if not isinstance(data, dict):
|
||||
logger.error(f"LRCLIB: unexpected response type: {type(data).__name__}")
|
||||
return LyricResult(
|
||||
status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
|
||||
)
|
||||
|
||||
synced = data.get("syncedLyrics")
|
||||
unsynced = data.get("plainLyrics")
|
||||
|
||||
if isinstance(synced, str) and synced.strip():
|
||||
lyrics = normalize_tags(synced.strip())
|
||||
logger.info(
|
||||
f"LRCLIB: got synced lyrics ({len(lyrics.splitlines())} lines)"
|
||||
)
|
||||
return LyricResult(
|
||||
status=CacheStatus.SUCCESS_SYNCED,
|
||||
lyrics=lyrics,
|
||||
source=self.source_name,
|
||||
)
|
||||
elif isinstance(unsynced, str) and unsynced.strip():
|
||||
lyrics = normalize_tags(unsynced.strip())
|
||||
logger.info(
|
||||
f"LRCLIB: got unsynced lyrics ({len(lyrics.splitlines())} lines)"
|
||||
)
|
||||
return LyricResult(
|
||||
status=CacheStatus.SUCCESS_UNSYNCED,
|
||||
lyrics=lyrics,
|
||||
source=self.source_name,
|
||||
ttl=TTL_UNSYNCED,
|
||||
)
|
||||
else:
|
||||
logger.debug(f"LRCLIB: empty response for {track.display_name()}")
|
||||
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
|
||||
|
||||
except httpx.HTTPError as e:
|
||||
logger.error(f"LRCLIB: HTTP error: {e}")
|
||||
return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
|
||||
except Exception as e:
|
||||
logger.error(f"LRCLIB: unexpected error: {e}")
|
||||
return None
|
||||
@@ -0,0 +1,168 @@
|
||||
"""
|
||||
Author: Uyanide pywang0608@foxmail.com
|
||||
Date: 2026-03-25 05:30:50
|
||||
Description: LRCLIB search fetcher — fuzzy search via lrclib.net /api/search
|
||||
"""
|
||||
|
||||
"""
|
||||
Used when metadata is incomplete (no album or duration) but title is available.
|
||||
Selects the best match by duration when track length is known.
|
||||
"""
|
||||
|
||||
import httpx
|
||||
from typing import Optional
|
||||
from loguru import logger
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from .base import BaseFetcher
|
||||
from ..models import TrackMeta, LyricResult, CacheStatus
|
||||
from ..lrc import normalize_tags
|
||||
from ..config import (
|
||||
HTTP_TIMEOUT,
|
||||
TTL_UNSYNCED,
|
||||
TTL_NOT_FOUND,
|
||||
TTL_NETWORK_ERROR,
|
||||
DURATION_TOLERANCE_MS,
|
||||
LRCLIB_SEARCH_URL,
|
||||
UA_LRX,
|
||||
)
|
||||
|
||||
|
||||
class LrclibSearchFetcher(BaseFetcher):
|
||||
@property
|
||||
def source_name(self) -> str:
|
||||
return "lrclib-search"
|
||||
|
||||
def is_available(self, track: TrackMeta) -> bool:
|
||||
return bool(track.title)
|
||||
|
||||
def fetch(
|
||||
self, track: TrackMeta, bypass_cache: bool = False
|
||||
) -> Optional[LyricResult]:
|
||||
"""Search LRCLIB for lyrics. Requires at least a title."""
|
||||
if not track.title:
|
||||
logger.debug("LRCLIB-search: skipped — no title")
|
||||
return None
|
||||
|
||||
params: dict[str, str] = {"track_name": track.title}
|
||||
if track.artist:
|
||||
params["artist_name"] = track.artist
|
||||
if track.album:
|
||||
params["album_name"] = track.album
|
||||
|
||||
url = f"{LRCLIB_SEARCH_URL}?{urlencode(params)}"
|
||||
logger.info(f"LRCLIB-search: searching for {track.display_name()}")
|
||||
|
||||
try:
|
||||
with httpx.Client(timeout=HTTP_TIMEOUT) as client:
|
||||
resp = client.get(url, headers={"User-Agent": UA_LRX})
|
||||
|
||||
if resp.status_code != 200:
|
||||
logger.error(f"LRCLIB-search: API returned {resp.status_code}")
|
||||
return LyricResult(
|
||||
status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
|
||||
)
|
||||
|
||||
data = resp.json()
|
||||
|
||||
if not isinstance(data, list) or len(data) == 0:
|
||||
logger.debug(f"LRCLIB-search: no results for {track.display_name()}")
|
||||
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
|
||||
|
||||
logger.debug(f"LRCLIB-search: got {len(data)} candidates")
|
||||
|
||||
# Select best match by duration
|
||||
best = self._select_best(data, track)
|
||||
if best is None:
|
||||
logger.debug("LRCLIB-search: no valid candidate found")
|
||||
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
|
||||
|
||||
# Extract lyrics
|
||||
synced = best.get("syncedLyrics")
|
||||
unsynced = best.get("plainLyrics")
|
||||
|
||||
if isinstance(synced, str) and synced.strip():
|
||||
lyrics = normalize_tags(synced.strip())
|
||||
logger.info(
|
||||
f"LRCLIB-search: got synced lyrics ({len(lyrics.splitlines())} lines)"
|
||||
)
|
||||
return LyricResult(
|
||||
status=CacheStatus.SUCCESS_SYNCED,
|
||||
lyrics=lyrics,
|
||||
source=self.source_name,
|
||||
)
|
||||
elif isinstance(unsynced, str) and unsynced.strip():
|
||||
lyrics = normalize_tags(unsynced.strip())
|
||||
logger.info(
|
||||
f"LRCLIB-search: got unsynced lyrics ({len(lyrics.splitlines())} lines)"
|
||||
)
|
||||
return LyricResult(
|
||||
status=CacheStatus.SUCCESS_UNSYNCED,
|
||||
lyrics=lyrics,
|
||||
source=self.source_name,
|
||||
ttl=TTL_UNSYNCED,
|
||||
)
|
||||
else:
|
||||
logger.debug("LRCLIB-search: best candidate has empty lyrics")
|
||||
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
|
||||
|
||||
except httpx.HTTPError as e:
|
||||
logger.error(f"LRCLIB-search: HTTP error: {e}")
|
||||
return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
|
||||
except Exception as e:
|
||||
logger.error(f"LRCLIB-search: unexpected error: {e}")
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _select_best(candidates: list[dict], track: TrackMeta) -> Optional[dict]:
|
||||
"""Pick the best candidate, preferring synced lyrics and closest duration."""
|
||||
if track.length is not None:
|
||||
track_s = track.length / 1000.0
|
||||
best: Optional[dict] = None
|
||||
best_diff = float("inf")
|
||||
|
||||
for item in candidates:
|
||||
if not isinstance(item, dict):
|
||||
continue
|
||||
duration = item.get("duration")
|
||||
if not isinstance(duration, (int, float)):
|
||||
continue
|
||||
diff = abs(duration - track_s) * 1000 # compare in ms
|
||||
if diff > DURATION_TOLERANCE_MS:
|
||||
continue
|
||||
# Prefer synced over unsynced at similar duration
|
||||
has_synced = (
|
||||
isinstance(item.get("syncedLyrics"), str)
|
||||
and item["syncedLyrics"].strip()
|
||||
)
|
||||
best_synced = (
|
||||
best is not None
|
||||
and isinstance(best.get("syncedLyrics"), str)
|
||||
and best["syncedLyrics"].strip()
|
||||
)
|
||||
if diff < best_diff or (
|
||||
diff == best_diff and has_synced and not best_synced
|
||||
):
|
||||
best_diff = diff
|
||||
best = item
|
||||
|
||||
if best is not None:
|
||||
logger.debug(
|
||||
f"LRCLIB-search: selected id={best.get('id')} (diff={best_diff:.0f}ms)"
|
||||
)
|
||||
return best
|
||||
|
||||
logger.debug(
|
||||
f"LRCLIB-search: no candidate within {DURATION_TOLERANCE_MS}ms"
|
||||
)
|
||||
return None
|
||||
|
||||
# No duration — pick first with synced lyrics, or just first
|
||||
for item in candidates:
|
||||
if (
|
||||
isinstance(item, dict)
|
||||
and isinstance(item.get("syncedLyrics"), str)
|
||||
and item["syncedLyrics"].strip()
|
||||
):
|
||||
return item
|
||||
return candidates[0] if isinstance(candidates[0], dict) else None
|
||||
@@ -0,0 +1,213 @@
|
||||
"""
|
||||
Author: Uyanide pywang0608@foxmail.com
|
||||
Date: 2026-03-25 11:04:51
|
||||
Description: Netease Cloud Music fetcher
|
||||
"""
|
||||
|
||||
"""
|
||||
Uses the public cloudsearch API for searching and the song/lyric API for
|
||||
retrieving lyrics. No authentication required.
|
||||
|
||||
Search results are filtered by duration when the track has a known length
|
||||
to avoid returning lyrics for the wrong version of a song.
|
||||
"""
|
||||
|
||||
from typing import Optional
|
||||
import httpx
|
||||
from loguru import logger
|
||||
|
||||
from .base import BaseFetcher
|
||||
from ..models import TrackMeta, LyricResult, CacheStatus
|
||||
from ..lrc import detect_sync_status, normalize_tags
|
||||
from ..config import (
|
||||
HTTP_TIMEOUT,
|
||||
TTL_NOT_FOUND,
|
||||
TTL_NETWORK_ERROR,
|
||||
DURATION_TOLERANCE_MS,
|
||||
NETEASE_SEARCH_URL,
|
||||
NETEASE_LYRIC_URL,
|
||||
UA_BROWSER,
|
||||
)
|
||||
|
||||
_HEADERS = {
|
||||
"User-Agent": UA_BROWSER,
|
||||
"Referer": "https://music.163.com/",
|
||||
}
|
||||
|
||||
|
||||
class NeteaseFetcher(BaseFetcher):
|
||||
@property
|
||||
def source_name(self) -> str:
|
||||
return "netease"
|
||||
|
||||
def is_available(self, track: TrackMeta) -> bool:
|
||||
return bool(track.title)
|
||||
|
||||
def _search(self, track: TrackMeta, limit: int = 10) -> Optional[int]:
|
||||
"""Search Netease and return the best-matching song ID.
|
||||
|
||||
When ``track.length`` is available, candidates are ranked by duration
|
||||
difference and only accepted if within ``DURATION_TOLERANCE_MS``.
|
||||
"""
|
||||
query = f"{track.artist or ''} {track.title or ''}".strip()
|
||||
if not query:
|
||||
return None
|
||||
|
||||
logger.debug(f"Netease: searching for '{query}' (limit={limit})")
|
||||
|
||||
try:
|
||||
with httpx.Client(timeout=HTTP_TIMEOUT) as client:
|
||||
resp = client.post(
|
||||
NETEASE_SEARCH_URL,
|
||||
headers=_HEADERS,
|
||||
data={"s": query, "type": "1", "limit": str(limit), "offset": "0"},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
result = resp.json()
|
||||
|
||||
# Validate response
|
||||
if not isinstance(result, dict):
|
||||
logger.error(
|
||||
f"Netease: search returned non-dict: {type(result).__name__}"
|
||||
)
|
||||
return None
|
||||
|
||||
result_body = result.get("result")
|
||||
if not isinstance(result_body, dict):
|
||||
logger.debug("Netease: search 'result' field missing or invalid")
|
||||
return None
|
||||
|
||||
songs = result_body.get("songs")
|
||||
if not isinstance(songs, list) or len(songs) == 0:
|
||||
logger.debug("Netease: search returned 0 results")
|
||||
return None
|
||||
|
||||
logger.debug(f"Netease: search returned {len(songs)} candidates")
|
||||
|
||||
# Duration-based best-match selection
|
||||
if track.length is not None:
|
||||
track_ms = track.length
|
||||
best_id: Optional[int] = None
|
||||
best_diff = float("inf")
|
||||
|
||||
for song in songs:
|
||||
if not isinstance(song, dict):
|
||||
continue
|
||||
sid = song.get("id")
|
||||
name = song.get("name", "?")
|
||||
duration = song.get("dt") # milliseconds
|
||||
if not isinstance(duration, int):
|
||||
logger.debug(
|
||||
f" candidate {sid} '{name}': no duration, skipped"
|
||||
)
|
||||
continue
|
||||
diff = abs(duration - track_ms)
|
||||
logger.debug(
|
||||
f" candidate {sid} '{name}': "
|
||||
f"duration={duration}ms, diff={diff}ms"
|
||||
)
|
||||
if diff < best_diff:
|
||||
best_diff = diff
|
||||
best_id = sid
|
||||
|
||||
if best_id is not None and best_diff <= DURATION_TOLERANCE_MS:
|
||||
logger.debug(f"Netease: selected id={best_id} (diff={best_diff}ms)")
|
||||
return best_id
|
||||
|
||||
logger.debug(
|
||||
f"Netease: no candidate within {DURATION_TOLERANCE_MS}ms "
|
||||
f"(best diff={best_diff}ms)"
|
||||
)
|
||||
return None
|
||||
|
||||
# No duration info — take the first result
|
||||
first = songs[0]
|
||||
if not isinstance(first, dict) or "id" not in first:
|
||||
logger.error("Netease: first search result has no 'id'")
|
||||
return None
|
||||
logger.debug(
|
||||
f"Netease: no duration available, using first result "
|
||||
f"id={first['id']} '{first.get('name', '?')}'"
|
||||
)
|
||||
return first["id"]
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Netease: search failed: {e}")
|
||||
return None
|
||||
|
||||
def _get_lyric(self, song_id: int) -> Optional[LyricResult]:
|
||||
"""Fetch lyrics for a given Netease song ID."""
|
||||
logger.debug(f"Netease: fetching lyrics for song_id={song_id}")
|
||||
|
||||
try:
|
||||
with httpx.Client(timeout=HTTP_TIMEOUT) as client:
|
||||
resp = client.post(
|
||||
NETEASE_LYRIC_URL,
|
||||
headers=_HEADERS,
|
||||
data={
|
||||
"id": str(song_id),
|
||||
"cp": "false",
|
||||
"tv": "0",
|
||||
"lv": "0",
|
||||
"rv": "0",
|
||||
"kv": "0",
|
||||
"yv": "0",
|
||||
"ytv": "0",
|
||||
"yrv": "0",
|
||||
},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
|
||||
# Validate response
|
||||
if not isinstance(data, dict):
|
||||
logger.error(
|
||||
f"Netease: lyric response is not dict: {type(data).__name__}"
|
||||
)
|
||||
return LyricResult(
|
||||
status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
|
||||
)
|
||||
|
||||
lrc_obj = data.get("lrc")
|
||||
if not isinstance(lrc_obj, dict):
|
||||
logger.debug(
|
||||
f"Netease: no 'lrc' object in response for song_id={song_id}"
|
||||
)
|
||||
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
|
||||
|
||||
lrc: str = lrc_obj.get("lyric", "")
|
||||
if not isinstance(lrc, str) or not lrc.strip():
|
||||
logger.debug(f"Netease: empty lyrics for song_id={song_id}")
|
||||
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
|
||||
|
||||
# Determine sync status
|
||||
lrc = normalize_tags(lrc)
|
||||
status = detect_sync_status(lrc)
|
||||
logger.info(
|
||||
f"Netease: got {status.value} lyrics for song_id={song_id} "
|
||||
f"({len(lrc.splitlines())} lines)"
|
||||
)
|
||||
return LyricResult(
|
||||
status=status, lyrics=lrc.strip(), source=self.source_name
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Netease: lyric fetch failed for song_id={song_id}: {e}")
|
||||
return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
|
||||
|
||||
def fetch(
|
||||
self, track: TrackMeta, bypass_cache: bool = False
|
||||
) -> Optional[LyricResult]:
|
||||
"""Search for the track and fetch its lyrics."""
|
||||
query = f"{track.artist or ''} {track.title or ''}".strip()
|
||||
if not query:
|
||||
logger.debug("Netease: skipped — insufficient metadata")
|
||||
return None
|
||||
|
||||
logger.info(f"Netease: fetching lyrics for {track.display_name()}")
|
||||
song_id = self._search(track)
|
||||
if not song_id:
|
||||
logger.debug(f"Netease: no match found for {track.display_name()}")
|
||||
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
|
||||
|
||||
return self._get_lyric(song_id)
|
||||
@@ -0,0 +1,178 @@
|
||||
"""
|
||||
Author: Uyanide pywang0608@foxmail.com
|
||||
Date: 2026-03-31 01:54:02
|
||||
Description: QQ Music fetcher via self-hosted API proxy
|
||||
"""
|
||||
|
||||
"""
|
||||
Requires a running qq-music-api instance.
|
||||
The base URL is read from the QQ_MUSIC_API_URL environment variable.
|
||||
|
||||
Search → pick best match by duration → fetch LRC lyrics.
|
||||
"""
|
||||
|
||||
from typing import Optional
|
||||
import httpx
|
||||
from loguru import logger
|
||||
|
||||
from .base import BaseFetcher
|
||||
from ..models import TrackMeta, LyricResult, CacheStatus
|
||||
from ..lrc import detect_sync_status, normalize_tags
|
||||
from ..config import (
|
||||
HTTP_TIMEOUT,
|
||||
TTL_NOT_FOUND,
|
||||
TTL_NETWORK_ERROR,
|
||||
DURATION_TOLERANCE_MS,
|
||||
QQ_MUSIC_API_URL,
|
||||
)
|
||||
|
||||
|
||||
class QQMusicFetcher(BaseFetcher):
|
||||
@property
|
||||
def source_name(self) -> str:
|
||||
return "qqmusic"
|
||||
|
||||
def is_available(self, track: TrackMeta) -> bool:
|
||||
return bool(track.title) and bool(QQ_MUSIC_API_URL)
|
||||
|
||||
def _search(self, track: TrackMeta, limit: int = 10) -> Optional[str]:
|
||||
"""Search QQ Music and return the best-matching song MID."""
|
||||
query = f"{track.artist or ''} {track.title or ''}".strip()
|
||||
if not query:
|
||||
return None
|
||||
|
||||
logger.debug(f"QQMusic: searching for '{query}' (limit={limit})")
|
||||
|
||||
try:
|
||||
with httpx.Client(timeout=HTTP_TIMEOUT) as client:
|
||||
resp = client.get(
|
||||
f"{QQ_MUSIC_API_URL}/api/search",
|
||||
params={"keyword": query, "type": "song", "num": limit},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
|
||||
if data.get("code") != 0:
|
||||
logger.error(f"QQMusic: search API error: {data}")
|
||||
return None
|
||||
|
||||
songs = data.get("data", {}).get("list", [])
|
||||
if not songs:
|
||||
logger.debug("QQMusic: search returned 0 results")
|
||||
return None
|
||||
|
||||
logger.debug(f"QQMusic: search returned {len(songs)} candidates")
|
||||
|
||||
# Duration-based best-match selection
|
||||
if track.length is not None:
|
||||
track_ms = track.length
|
||||
best_mid: Optional[str] = None
|
||||
best_diff = float("inf")
|
||||
|
||||
for song in songs:
|
||||
if not isinstance(song, dict):
|
||||
continue
|
||||
mid = song.get("mid")
|
||||
name = song.get("name", "?")
|
||||
# interval is in seconds
|
||||
interval = song.get("interval")
|
||||
if not isinstance(interval, int):
|
||||
logger.debug(
|
||||
f" candidate {mid} '{name}': no duration, skipped"
|
||||
)
|
||||
continue
|
||||
duration_ms = interval * 1000
|
||||
diff = abs(duration_ms - track_ms)
|
||||
logger.debug(
|
||||
f" candidate {mid} '{name}': "
|
||||
f"duration={duration_ms}ms, diff={diff}ms"
|
||||
)
|
||||
if diff < best_diff:
|
||||
best_diff = diff
|
||||
best_mid = mid
|
||||
|
||||
if best_mid is not None and best_diff <= DURATION_TOLERANCE_MS:
|
||||
logger.debug(
|
||||
f"QQMusic: selected mid={best_mid} (diff={best_diff}ms)"
|
||||
)
|
||||
return best_mid
|
||||
|
||||
logger.debug(
|
||||
f"QQMusic: no candidate within {DURATION_TOLERANCE_MS}ms "
|
||||
f"(best diff={best_diff}ms)"
|
||||
)
|
||||
return None
|
||||
|
||||
# No duration info — take the first result
|
||||
first = songs[0]
|
||||
if not isinstance(first, dict) or "mid" not in first:
|
||||
logger.error("QQMusic: first search result has no 'mid'")
|
||||
return None
|
||||
logger.debug(
|
||||
f"QQMusic: no duration available, using first result "
|
||||
f"mid={first['mid']} '{first.get('name', '?')}'"
|
||||
)
|
||||
return first["mid"]
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"QQMusic: search failed: {e}")
|
||||
return None
|
||||
|
||||
def _get_lyric(self, mid: str) -> Optional[LyricResult]:
|
||||
"""Fetch lyrics for a given QQ Music song MID."""
|
||||
logger.debug(f"QQMusic: fetching lyrics for mid={mid}")
|
||||
|
||||
try:
|
||||
with httpx.Client(timeout=HTTP_TIMEOUT) as client:
|
||||
resp = client.get(
|
||||
f"{QQ_MUSIC_API_URL}/api/lyric",
|
||||
params={"mid": mid},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
|
||||
if data.get("code") != 0:
|
||||
logger.error(f"QQMusic: lyric API error: {data}")
|
||||
return LyricResult(
|
||||
status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
|
||||
)
|
||||
|
||||
lrc = data.get("data", {}).get("lyric", "")
|
||||
if not isinstance(lrc, str) or not lrc.strip():
|
||||
logger.debug(f"QQMusic: empty lyrics for mid={mid}")
|
||||
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
|
||||
|
||||
lrc = normalize_tags(lrc)
|
||||
status = detect_sync_status(lrc)
|
||||
logger.info(
|
||||
f"QQMusic: got {status.value} lyrics for mid={mid} "
|
||||
f"({len(lrc.splitlines())} lines)"
|
||||
)
|
||||
return LyricResult(
|
||||
status=status, lyrics=lrc.strip(), source=self.source_name
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"QQMusic: lyric fetch failed for mid={mid}: {e}")
|
||||
return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
|
||||
|
||||
def fetch(
|
||||
self, track: TrackMeta, bypass_cache: bool = False
|
||||
) -> Optional[LyricResult]:
|
||||
"""Search for the track and fetch its lyrics."""
|
||||
if not QQ_MUSIC_API_URL:
|
||||
logger.debug("QQMusic: skipped — QQ_MUSIC_API_URL not configured")
|
||||
return None
|
||||
|
||||
query = f"{track.artist or ''} {track.title or ''}".strip()
|
||||
if not query:
|
||||
logger.debug("QQMusic: skipped — insufficient metadata")
|
||||
return None
|
||||
|
||||
logger.info(f"QQMusic: fetching lyrics for {track.display_name()}")
|
||||
mid = self._search(track)
|
||||
if not mid:
|
||||
logger.debug(f"QQMusic: no match found for {track.display_name()}")
|
||||
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
|
||||
|
||||
return self._get_lyric(mid)
|
||||
@@ -0,0 +1,373 @@
|
||||
"""
|
||||
Author: Uyanide pywang0608@foxmail.com
|
||||
Date: 2026-03-25 10:43:21
|
||||
Description: Spotify fetcher — obtains synced lyrics via Spotify's internal color-lyrics API.
|
||||
"""
|
||||
|
||||
"""
|
||||
Authentication flow:
|
||||
1. Fetch server time from Spotify
|
||||
2. Fetch TOTP secret
|
||||
3. Generate a TOTP code and exchange it (with SP_DC cookie) for an access token
|
||||
4. Request lyrics using the access token
|
||||
|
||||
The secret and token are cached on the instance to avoid redundant network
|
||||
calls within the same session.
|
||||
|
||||
Requires SPOTIFY_SP_DC environment variable to be set.
|
||||
"""
|
||||
|
||||
import httpx
|
||||
import json
|
||||
import time
|
||||
import struct
|
||||
import hmac
|
||||
import hashlib
|
||||
from typing import Optional, Tuple
|
||||
from loguru import logger
|
||||
|
||||
from .base import BaseFetcher
|
||||
from ..models import TrackMeta, LyricResult, CacheStatus
|
||||
from ..lrc import normalize_tags
|
||||
from ..config import (
|
||||
HTTP_TIMEOUT,
|
||||
SPOTIFY_APP_VERSION,
|
||||
TTL_NOT_FOUND,
|
||||
TTL_NETWORK_ERROR,
|
||||
SPOTIFY_TOKEN_URL,
|
||||
SPOTIFY_LYRICS_URL,
|
||||
SPOTIFY_SERVER_TIME_URL,
|
||||
SPOTIFY_SECRET_URL,
|
||||
SPOTIFY_SP_DC,
|
||||
SPOTIFY_TOKEN_CACHE_FILE,
|
||||
UA_BROWSER,
|
||||
)
|
||||
|
||||
|
||||
class SpotifyFetcher(BaseFetcher):
|
||||
def __init__(self) -> None:
|
||||
# Session-level caches to avoid refetching within the same run
|
||||
self._cached_secret: Optional[Tuple[str, int]] = None
|
||||
self._cached_token: Optional[str] = None
|
||||
self._token_expires_at: float = 0.0
|
||||
|
||||
@property
|
||||
def source_name(self) -> str:
|
||||
return "spotify"
|
||||
|
||||
def is_available(self, track: TrackMeta) -> bool:
|
||||
return bool(track.trackid) and bool(SPOTIFY_SP_DC)
|
||||
|
||||
# ─── Auth helpers ────────────────────────────────────────────────
|
||||
|
||||
def _get_server_time(self, client: httpx.Client) -> Optional[int]:
|
||||
"""Fetch Spotify's server timestamp (seconds since epoch)."""
|
||||
try:
|
||||
res = client.get(SPOTIFY_SERVER_TIME_URL, timeout=HTTP_TIMEOUT)
|
||||
res.raise_for_status()
|
||||
data = res.json()
|
||||
if not isinstance(data, dict) or "serverTime" not in data:
|
||||
logger.error(f"Spotify: unexpected server-time response: {data}")
|
||||
return None
|
||||
server_time = data["serverTime"]
|
||||
logger.debug(f"Spotify: server time = {server_time}")
|
||||
return server_time
|
||||
except Exception as e:
|
||||
logger.error(f"Spotify: failed to fetch server time: {e}")
|
||||
return None
|
||||
|
||||
def _get_secret(self, client: httpx.Client) -> Optional[Tuple[str, int]]:
|
||||
"""Fetch and decode the TOTP secret. Cached after first success.
|
||||
|
||||
Response format: [{version: int, secret: str}, ...]
|
||||
Each character in *secret* is XOR-decoded with ``(index % 33) + 9``.
|
||||
"""
|
||||
if self._cached_secret is not None:
|
||||
logger.debug("Spotify: using cached TOTP secret")
|
||||
return self._cached_secret
|
||||
|
||||
try:
|
||||
res = client.get(SPOTIFY_SECRET_URL, timeout=HTTP_TIMEOUT)
|
||||
res.raise_for_status()
|
||||
data = res.json()
|
||||
|
||||
if not isinstance(data, list) or len(data) == 0:
|
||||
logger.error(
|
||||
f"Spotify: unexpected secrets response (type={type(data).__name__}, len={len(data) if isinstance(data, list) else '?'})"
|
||||
)
|
||||
return None
|
||||
|
||||
last = data[-1]
|
||||
if "secret" not in last or "version" not in last:
|
||||
logger.error(f"Spotify: malformed secret entry: {list(last.keys())}")
|
||||
return None
|
||||
|
||||
secret_raw = last["secret"]
|
||||
version = last["version"]
|
||||
|
||||
# XOR decode
|
||||
parts = []
|
||||
for i, char in enumerate(secret_raw):
|
||||
parts.append(str(ord(char) ^ ((i % 33) + 9)))
|
||||
secret = "".join(parts)
|
||||
|
||||
logger.debug(f"Spotify: decoded secret v{version} (len={len(secret)})")
|
||||
self._cached_secret = (secret, version)
|
||||
return self._cached_secret
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Spotify: failed to fetch secret: {e}")
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _generate_totp(server_time_s: int, secret: str) -> str:
|
||||
"""Generate a 6-digit TOTP code compatible with Spotify's auth.
|
||||
|
||||
Uses HMAC-SHA1 with a 30-second period, matching the Go reference.
|
||||
"""
|
||||
counter = server_time_s // 30
|
||||
counter_bytes = struct.pack(">Q", counter)
|
||||
|
||||
mac = hmac.new(secret.encode(), counter_bytes, hashlib.sha1).digest()
|
||||
|
||||
offset = mac[-1] & 0x0F
|
||||
binary_code = (
|
||||
(mac[offset] & 0x7F) << 24
|
||||
| (mac[offset + 1] & 0xFF) << 16
|
||||
| (mac[offset + 2] & 0xFF) << 8
|
||||
| (mac[offset + 3] & 0xFF)
|
||||
)
|
||||
|
||||
code = binary_code % (10**6)
|
||||
return str(code).zfill(6)
|
||||
|
||||
def _load_cached_token(self) -> Optional[str]:
|
||||
"""Try to load a valid token from the persistent cache file."""
|
||||
try:
|
||||
with open(SPOTIFY_TOKEN_CACHE_FILE, "r") as f:
|
||||
data = json.load(f)
|
||||
expires_ms = data.get("accessTokenExpirationTimestampMs", 0)
|
||||
if expires_ms <= int(time.time() * 1000):
|
||||
logger.debug("Spotify: persisted token expired")
|
||||
return None
|
||||
token = data.get("accessToken", "")
|
||||
if not token:
|
||||
return None
|
||||
self._cached_token = token
|
||||
self._token_expires_at = expires_ms / 1000.0
|
||||
logger.debug("Spotify: loaded token from cache file")
|
||||
return token
|
||||
except (FileNotFoundError, json.JSONDecodeError, KeyError):
|
||||
return None
|
||||
|
||||
def _save_token(self, body: dict) -> None:
|
||||
"""Persist the token response to disk."""
|
||||
try:
|
||||
with open(SPOTIFY_TOKEN_CACHE_FILE, "w") as f:
|
||||
json.dump(body, f)
|
||||
logger.debug("Spotify: token saved to cache file")
|
||||
except Exception as e:
|
||||
logger.warning(f"Spotify: failed to write token cache: {e}")
|
||||
|
||||
def _get_token(self) -> Optional[str]:
|
||||
"""Obtain a Spotify access token. Cached in memory and on disk.
|
||||
|
||||
Requires SP_DC cookie (set via SPOTIFY_SP_DC env var).
|
||||
"""
|
||||
# 1. Memory cache
|
||||
if self._cached_token and time.time() < self._token_expires_at - 30:
|
||||
logger.debug("Spotify: using in-memory cached token")
|
||||
return self._cached_token
|
||||
|
||||
# 2. Disk cache
|
||||
disk_token = self._load_cached_token()
|
||||
if disk_token and time.time() < self._token_expires_at - 30:
|
||||
return disk_token
|
||||
|
||||
# 3. Fetch new token
|
||||
if not SPOTIFY_SP_DC:
|
||||
logger.error(
|
||||
"Spotify: SPOTIFY_SP_DC env var not set — "
|
||||
"cannot authenticate with Spotify"
|
||||
)
|
||||
return None
|
||||
|
||||
headers = {
|
||||
"User-Agent": UA_BROWSER,
|
||||
"Accept": "*/*",
|
||||
"Referer": "https://open.spotify.com/",
|
||||
"Cookie": f"sp_dc={SPOTIFY_SP_DC}",
|
||||
}
|
||||
|
||||
with httpx.Client(headers=headers) as client:
|
||||
server_time = self._get_server_time(client)
|
||||
if server_time is None:
|
||||
return None
|
||||
|
||||
secret_data = self._get_secret(client)
|
||||
if secret_data is None:
|
||||
return None
|
||||
|
||||
secret, version = secret_data
|
||||
totp = self._generate_totp(server_time, secret)
|
||||
logger.debug(f"Spotify: generated TOTP v{version}: {totp}")
|
||||
|
||||
params = {
|
||||
"reason": "init",
|
||||
"productType": "web-player",
|
||||
"totp": totp,
|
||||
"totpVer": str(version),
|
||||
"totpServer": totp,
|
||||
}
|
||||
|
||||
try:
|
||||
res = client.get(SPOTIFY_TOKEN_URL, params=params, timeout=HTTP_TIMEOUT)
|
||||
if res.status_code != 200:
|
||||
logger.error(f"Spotify: token request returned {res.status_code}")
|
||||
return None
|
||||
|
||||
body = res.json()
|
||||
|
||||
if not isinstance(body, dict) or "accessToken" not in body:
|
||||
logger.error(
|
||||
f"Spotify: unexpected token response keys: {list(body.keys()) if isinstance(body, dict) else type(body).__name__}"
|
||||
)
|
||||
return None
|
||||
|
||||
token = body["accessToken"]
|
||||
is_anonymous = body.get("isAnonymous", False)
|
||||
if is_anonymous:
|
||||
logger.warning(
|
||||
"Spotify: received anonymous token — SP_DC may be invalid"
|
||||
)
|
||||
|
||||
expires_ms = body.get("accessTokenExpirationTimestampMs", 0)
|
||||
if expires_ms and expires_ms > int(time.time() * 1000):
|
||||
self._token_expires_at = expires_ms / 1000.0
|
||||
else:
|
||||
logger.warning("Spotify: token expiry missing or invalid")
|
||||
self._token_expires_at = time.time() + 3600
|
||||
|
||||
self._cached_token = token
|
||||
# Persist to disk (including anonymous tokens, same as Go ref)
|
||||
self._save_token(body)
|
||||
logger.debug("Spotify: obtained access token")
|
||||
return token
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Spotify: token request failed: {e}")
|
||||
return None
|
||||
|
||||
# ─── Lyrics ──────────────────────────────────────────────────────
|
||||
|
||||
@staticmethod
|
||||
def _format_lrc_line(start_ms: int, words: str) -> str:
|
||||
"""Format a single lyric line as LRC ``[mm:ss.cc]text``."""
|
||||
minutes = start_ms // 60000
|
||||
seconds = (start_ms // 1000) % 60
|
||||
centiseconds = round((start_ms % 1000) / 10.0)
|
||||
return f"[{minutes:02d}:{seconds:02d}.{centiseconds:02.0f}]{words}"
|
||||
|
||||
@staticmethod
|
||||
def _is_truly_synced(lines: list[dict]) -> bool:
|
||||
"""Check if lyrics are actually synced (not all timestamps zero)."""
|
||||
for line in lines:
|
||||
try:
|
||||
ms = int(line.get("startTimeMs", "0"))
|
||||
if ms > 0:
|
||||
return True
|
||||
except (ValueError, TypeError):
|
||||
continue
|
||||
return False
|
||||
|
||||
def fetch(
|
||||
self, track: TrackMeta, bypass_cache: bool = False
|
||||
) -> Optional[LyricResult]:
|
||||
"""Fetch lyrics for a Spotify track by its track ID."""
|
||||
if not track.trackid:
|
||||
logger.debug("Spotify: skipped — no trackid in metadata")
|
||||
return None
|
||||
|
||||
logger.info(f"Spotify: fetching lyrics for trackid={track.trackid}")
|
||||
|
||||
token = self._get_token()
|
||||
if not token:
|
||||
logger.error("Spotify: cannot fetch lyrics without a token")
|
||||
return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
|
||||
|
||||
url = f"{SPOTIFY_LYRICS_URL}{track.trackid}?format=json&vocalRemoval=false&market=from_token"
|
||||
headers = {
|
||||
"User-Agent": UA_BROWSER,
|
||||
"Accept": "application/json",
|
||||
"Authorization": f"Bearer {token}",
|
||||
"Referer": "https://open.spotify.com/",
|
||||
"App-Platform": "WebPlayer",
|
||||
"Spotify-App-Version": SPOTIFY_APP_VERSION,
|
||||
"Origin": "https://open.spotify.com",
|
||||
}
|
||||
|
||||
try:
|
||||
with httpx.Client(timeout=HTTP_TIMEOUT) as client:
|
||||
res = client.get(url, headers=headers)
|
||||
|
||||
if res.status_code == 404:
|
||||
logger.debug(f"Spotify: 404 for trackid={track.trackid}")
|
||||
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
|
||||
|
||||
if res.status_code != 200:
|
||||
logger.error(f"Spotify: lyrics API returned {res.status_code}")
|
||||
return LyricResult(
|
||||
status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
|
||||
)
|
||||
|
||||
data = res.json()
|
||||
|
||||
# Validate response structure
|
||||
if not isinstance(data, dict) or "lyrics" not in data:
|
||||
logger.error("Spotify: unexpected lyrics response structure")
|
||||
return LyricResult(
|
||||
status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
|
||||
)
|
||||
|
||||
lyrics_data = data["lyrics"]
|
||||
sync_type = lyrics_data.get("syncType", "")
|
||||
lines = lyrics_data.get("lines", [])
|
||||
|
||||
if not isinstance(lines, list) or len(lines) == 0:
|
||||
logger.debug("Spotify: response contained no lyric lines")
|
||||
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
|
||||
|
||||
# Determine sync status
|
||||
# syncType == "LINE_SYNCED" AND at least one non-zero timestamp
|
||||
is_synced = sync_type == "LINE_SYNCED" and self._is_truly_synced(lines)
|
||||
|
||||
# Convert to LRC
|
||||
lrc_lines: list[str] = []
|
||||
for line in lines:
|
||||
words = line.get("words", "")
|
||||
if not isinstance(words, str):
|
||||
continue
|
||||
try:
|
||||
ms = int(line.get("startTimeMs", "0"))
|
||||
except (ValueError, TypeError):
|
||||
ms = 0
|
||||
|
||||
if is_synced:
|
||||
lrc_lines.append(self._format_lrc_line(ms, words))
|
||||
else:
|
||||
# Unsynced: emit with zero timestamps
|
||||
lrc_lines.append(f"[00:00.00]{words}")
|
||||
|
||||
content = normalize_tags("\n".join(lrc_lines))
|
||||
status = (
|
||||
CacheStatus.SUCCESS_SYNCED
|
||||
if is_synced
|
||||
else CacheStatus.SUCCESS_UNSYNCED
|
||||
)
|
||||
|
||||
logger.info(f"Spotify: got {status.value} lyrics ({len(lrc_lines)} lines)")
|
||||
return LyricResult(status=status, lyrics=content, source=self.source_name)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Spotify: lyrics fetch failed: {e}")
|
||||
return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
|
||||
+178
@@ -0,0 +1,178 @@
|
||||
"""
|
||||
Author: Uyanide pywang0608@foxmail.com
|
||||
Date: 2026-03-25 21:54:01
|
||||
Description: Shared LRC time-tag utilities (definitely overengineered)
|
||||
"""
|
||||
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
from urllib.parse import unquote
|
||||
|
||||
from .models import CacheStatus
|
||||
|
||||
# Parses any time tag input format:
|
||||
# [mm:ss], [mm:ss.c], [mm:ss.cc], [mm:ss.ccc], [mm:ss:cc], …
|
||||
_RAW_TAG_RE = re.compile(r"\[(\d{2,}):(\d{2})(?:[.:](\d{1,3}))?\]")
|
||||
|
||||
# Standard format after normalization: [mm:ss.cc]
|
||||
_STD_TAG_RE = re.compile(r"\[\d{2,}:\d{2}\.\d{2}\]")
|
||||
|
||||
# Standard format with capture groups
|
||||
_STD_TAG_CAPTURE_RE = re.compile(r"\[(\d{2,}):(\d{2})\.(\d{2})\]")
|
||||
|
||||
# Matches a standard time tag at the start of a line
|
||||
_LRC_LINE_RE = re.compile(r"^\[\d{2,}:\d{2}\.\d{2}\]", re.MULTILINE)
|
||||
|
||||
# [offset:+/-xxx] tag — value in milliseconds
|
||||
_OFFSET_RE = re.compile(r"^\[offset:\s*([+-]?\d+)\]\s*$", re.MULTILINE | re.IGNORECASE)
|
||||
|
||||
|
||||
def _raw_tag_to_cs(mm: str, ss: str, frac: Optional[str]) -> str:
|
||||
"""Convert parsed time tag components to standard [mm:ss.cc] string."""
|
||||
if frac is None:
|
||||
ms = 0
|
||||
else:
|
||||
# cc in [mm:ss:cc] is also treated as centiseconds, per LRC spec
|
||||
# ^
|
||||
# why does this format even exist, idk
|
||||
n = len(frac)
|
||||
if n == 1:
|
||||
ms = int(frac) * 100
|
||||
elif n == 2:
|
||||
ms = int(frac) * 10
|
||||
else:
|
||||
ms = int(frac)
|
||||
cs = min(round(ms / 10), 99)
|
||||
return f"[{mm}:{ss}.{cs:02d}]"
|
||||
|
||||
|
||||
def _reformat(text: str) -> str:
|
||||
"""Parse each line and reformat to standard [mm:ss.cc]...content form.
|
||||
|
||||
Handles any mix of time tag formats on input. Lines with no time tags
|
||||
are stripped of leading/trailing whitespace and passed through unchanged.
|
||||
"""
|
||||
out: list[str] = []
|
||||
for line in text.splitlines():
|
||||
line = line.strip()
|
||||
pos = 0
|
||||
tags: list[str] = []
|
||||
while True:
|
||||
while pos < len(line) and line[pos] == " ":
|
||||
pos += 1
|
||||
m = _RAW_TAG_RE.match(line, pos)
|
||||
# Non-time tags are passed through as-is, except for leading/trailing whitespace which is stripped.
|
||||
if not m:
|
||||
# No more tags on this line
|
||||
break
|
||||
tags.append(_raw_tag_to_cs(m.group(1), m.group(2), m.group(3)))
|
||||
pos = m.end()
|
||||
if tags:
|
||||
# This could break lyric lines of some kind of word-synced LRC format,
|
||||
# but such format were not planned to be supported in the first place, so…
|
||||
out.append("".join(tags) + line[pos:].lstrip())
|
||||
else:
|
||||
out.append(line)
|
||||
# Empty lines with no tags are also preserved
|
||||
return "\n".join(out)
|
||||
|
||||
|
||||
def _apply_offset(text: str) -> str:
|
||||
"""Parse [offset:±ms] and shift all standard [mm:ss.cc] tags accordingly.
|
||||
|
||||
Per LRC spec, positive offset = lyrics appear sooner (subtract from timestamps).
|
||||
"""
|
||||
m = _OFFSET_RE.search(text)
|
||||
if not m:
|
||||
return text
|
||||
offset_ms = int(m.group(1))
|
||||
text = _OFFSET_RE.sub("", text).strip("\n")
|
||||
if offset_ms == 0:
|
||||
return text
|
||||
|
||||
def _shift(match: re.Match) -> str:
|
||||
total_ms = max(
|
||||
0,
|
||||
(int(match.group(1)) * 60 + int(match.group(2))) * 1000
|
||||
+ int(match.group(3)) * 10
|
||||
- offset_ms,
|
||||
)
|
||||
new_mm = total_ms // 60000
|
||||
new_ss = (total_ms % 60000) // 1000
|
||||
new_cs = min(round((total_ms % 1000) / 10), 99)
|
||||
return f"[{new_mm:02d}:{new_ss:02d}.{new_cs:02d}]"
|
||||
|
||||
return _STD_TAG_CAPTURE_RE.sub(_shift, text)
|
||||
|
||||
|
||||
def normalize_tags(text: str) -> str:
|
||||
"""Normalize LRC to standard form: reformat all tags to [mm:ss.cc], then apply offset."""
|
||||
return _apply_offset(_reformat(text))
|
||||
|
||||
|
||||
def is_synced(text: str) -> bool:
|
||||
"""Check whether text contains non-zero LRC time tags.
|
||||
|
||||
Assumes text has been normalized by normalize_tags (standard [mm:ss.cc] format).
|
||||
"""
|
||||
tags = _STD_TAG_RE.findall(text)
|
||||
return bool(tags) and any(tag != "[00:00.00]" for tag in tags)
|
||||
|
||||
|
||||
def detect_sync_status(text: str) -> CacheStatus:
|
||||
"""Determine whether lyrics contain meaningful LRC time tags.
|
||||
|
||||
Assumes text has been normalized by normalize_tags.
|
||||
"""
|
||||
return (
|
||||
CacheStatus.SUCCESS_SYNCED if is_synced(text) else CacheStatus.SUCCESS_UNSYNCED
|
||||
)
|
||||
|
||||
|
||||
def normalize_unsynced(lyrics: str) -> str:
|
||||
"""Normalize unsynced lyrics so every line has a [00:00.00] tag.
|
||||
|
||||
- Lines that already have time tags: replace with [00:00.00]
|
||||
- Lines without time tags: prepend [00:00.00]
|
||||
- Blank lines are converted to [00:00.00]
|
||||
"""
|
||||
out: list[str] = []
|
||||
for line in lyrics.splitlines():
|
||||
stripped = line.strip()
|
||||
if not stripped:
|
||||
out.append("[00:00.00]")
|
||||
continue
|
||||
cleaned = _LRC_LINE_RE.sub("", stripped)
|
||||
while _LRC_LINE_RE.match(cleaned):
|
||||
cleaned = _LRC_LINE_RE.sub("", cleaned)
|
||||
out.append(f"[00:00.00]{cleaned}")
|
||||
return "\n".join(out)
|
||||
|
||||
|
||||
def get_audio_path(audio_url: str, ensure_exists: bool = False) -> Optional[Path]:
|
||||
"""Convert file:// URL to Path, return None if invalid or (if ensure_exists) file doesn't exist."""
|
||||
if not audio_url.startswith("file://"):
|
||||
return None
|
||||
file_path = unquote(audio_url.replace("file://", "", 1))
|
||||
path = Path(file_path)
|
||||
if ensure_exists and not path.exists():
|
||||
return None
|
||||
return path
|
||||
|
||||
|
||||
def get_sidecar_path(
|
||||
audio_url: str, ensure_audio_exists: bool = False, ensure_exists: bool = False
|
||||
) -> Optional[Path]:
|
||||
"""Given a file:// URL, return the corresponding .lrc sidecar path.
|
||||
|
||||
If ensure_audio_exists is True, return None if the audio file does not exist.
|
||||
If ensure_exists is True, return None if the .lrc file does not exist.
|
||||
"""
|
||||
audio_path = get_audio_path(audio_url, ensure_exists=ensure_audio_exists)
|
||||
if not audio_path:
|
||||
return None
|
||||
lrc_path = audio_path.with_suffix(".lrc")
|
||||
if ensure_exists and not lrc_path.exists():
|
||||
return None
|
||||
return lrc_path
|
||||
@@ -0,0 +1,59 @@
|
||||
"""
|
||||
Author: Uyanide pywang0608@foxmail.com
|
||||
Date: 2026-03-25 04:09:36
|
||||
Description: Data models
|
||||
"""
|
||||
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
class CacheStatus(str, Enum):
|
||||
"""Status of a cached lyric entry."""
|
||||
|
||||
SUCCESS_SYNCED = "SUCCESS_SYNCED"
|
||||
SUCCESS_UNSYNCED = "SUCCESS_UNSYNCED"
|
||||
NOT_FOUND = "NOT_FOUND"
|
||||
NETWORK_ERROR = "NETWORK_ERROR"
|
||||
|
||||
|
||||
@dataclass
|
||||
class TrackMeta:
|
||||
"""Metadata describing a track obtained from MPRIS or manual input."""
|
||||
|
||||
trackid: Optional[str] = None # Spotify track ID (without "spotify:track:" prefix)
|
||||
length: Optional[int] = None # Duration in milliseconds
|
||||
album: Optional[str] = None
|
||||
artist: Optional[str] = None
|
||||
title: Optional[str] = None
|
||||
url: Optional[str] = None # Playback URL (file:// for local files)
|
||||
|
||||
@property
|
||||
def is_local(self) -> bool:
|
||||
"""True when the track is a local file (file:// URL)."""
|
||||
return bool(self.url and self.url.startswith("file://"))
|
||||
|
||||
@property
|
||||
def is_complete(self) -> bool:
|
||||
"""True when all fields required by LRCLIB are present."""
|
||||
return all([self.length, self.album, self.title, self.artist])
|
||||
|
||||
def display_name(self) -> str:
|
||||
"""Human-readable representation for logging."""
|
||||
parts = []
|
||||
if self.artist:
|
||||
parts.append(self.artist)
|
||||
if self.title:
|
||||
parts.append(self.title)
|
||||
return " - ".join(parts) if parts else self.trackid or self.url or "(unknown)"
|
||||
|
||||
|
||||
@dataclass
|
||||
class LyricResult:
|
||||
"""Result of a lyric fetch attempt, also used as cache record."""
|
||||
|
||||
status: CacheStatus
|
||||
lyrics: Optional[str] = None
|
||||
source: Optional[str] = None # Which fetcher produced this result
|
||||
ttl: Optional[int] = None # Hint for cache TTL (seconds)
|
||||
@@ -0,0 +1,188 @@
|
||||
"""
|
||||
Author: Uyanide pywang0608@foxmail.com
|
||||
Date: 2026-03-25 04:44:15
|
||||
Description: MPRIS integration for fetching track metadata
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from dbus_next.aio.message_bus import MessageBus
|
||||
from dbus_next.constants import BusType
|
||||
from dbus_next.message import Message
|
||||
from lrx_cli.models import TrackMeta
|
||||
from lrx_cli.config import PREFERRED_PLAYER
|
||||
from loguru import logger
|
||||
from typing import Optional, List, Any
|
||||
|
||||
|
||||
async def _list_mpris_players(bus: MessageBus) -> List[str]:
|
||||
"""List all MPRIS player bus names."""
|
||||
try:
|
||||
reply = await bus.call(
|
||||
Message(
|
||||
destination="org.freedesktop.DBus",
|
||||
path="/org/freedesktop/DBus",
|
||||
interface="org.freedesktop.DBus",
|
||||
member="ListNames",
|
||||
)
|
||||
)
|
||||
if not reply or not reply.body:
|
||||
return []
|
||||
return [
|
||||
name for name in reply.body[0] if name.startswith("org.mpris.MediaPlayer2.")
|
||||
]
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to list DBus names: {e}")
|
||||
return []
|
||||
|
||||
|
||||
async def _get_playback_status(bus: MessageBus, player_name: str) -> Optional[str]:
|
||||
"""Get PlaybackStatus ('Playing', 'Paused', 'Stopped') for a player."""
|
||||
try:
|
||||
introspection = await bus.introspect(player_name, "/org/mpris/MediaPlayer2")
|
||||
proxy = bus.get_proxy_object(
|
||||
player_name, "/org/mpris/MediaPlayer2", introspection
|
||||
)
|
||||
props = proxy.get_interface("org.freedesktop.DBus.Properties")
|
||||
status_var = await getattr(props, "call_get")(
|
||||
"org.mpris.MediaPlayer2.Player", "PlaybackStatus"
|
||||
)
|
||||
return status_var.value if status_var else None
|
||||
except Exception as e:
|
||||
logger.debug(f"Could not get playback status for {player_name}: {e}")
|
||||
return None
|
||||
|
||||
|
||||
async def _select_player(
|
||||
bus: MessageBus, specific_player: Optional[str] = None
|
||||
) -> Optional[str]:
|
||||
"""Select the best MPRIS player.
|
||||
|
||||
When specific_player is given, filter by name match.
|
||||
Otherwise: prefer the currently playing player. If multiple are playing,
|
||||
prefer the one matching PREFERRED_PLAYER env var (default: spotify).
|
||||
"""
|
||||
players = await _list_mpris_players(bus)
|
||||
if not players:
|
||||
return None
|
||||
|
||||
if specific_player:
|
||||
players = [p for p in players if specific_player.lower() in p.lower()]
|
||||
return players[0] if players else None
|
||||
|
||||
# Check playback status for each player
|
||||
playing = []
|
||||
for p in players:
|
||||
status = await _get_playback_status(bus, p)
|
||||
logger.debug(f"Player {p}: {status}")
|
||||
if status == "Playing":
|
||||
playing.append(p)
|
||||
|
||||
candidates = playing if playing else players
|
||||
|
||||
if len(candidates) == 1:
|
||||
return candidates[0]
|
||||
|
||||
# Multiple candidates: prefer PREFERRED_PLAYER
|
||||
preferred = PREFERRED_PLAYER.lower()
|
||||
if preferred:
|
||||
for p in candidates:
|
||||
if preferred in p.lower():
|
||||
return p
|
||||
return candidates[0]
|
||||
|
||||
|
||||
async def _fetch_metadata_dbus(
|
||||
specific_player: Optional[str] = None,
|
||||
) -> Optional[TrackMeta]:
|
||||
bus = None
|
||||
try:
|
||||
bus = await MessageBus(bus_type=BusType.SESSION).connect()
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to connect to DBus: {e}")
|
||||
return None
|
||||
|
||||
try:
|
||||
player_name = await _select_player(bus, specific_player)
|
||||
if not player_name:
|
||||
logger.debug(
|
||||
f"No active MPRIS players found via DBus{' for ' + specific_player if specific_player else ''}."
|
||||
)
|
||||
return None
|
||||
|
||||
logger.debug(f"Using player: {player_name}")
|
||||
|
||||
introspection = await bus.introspect(player_name, "/org/mpris/MediaPlayer2")
|
||||
proxy = bus.get_proxy_object(
|
||||
player_name, "/org/mpris/MediaPlayer2", introspection
|
||||
)
|
||||
|
||||
props_iface = proxy.get_interface("org.freedesktop.DBus.Properties")
|
||||
if not props_iface:
|
||||
logger.error(f"Player {player_name} doesn't support Properties interface.")
|
||||
return None
|
||||
|
||||
try:
|
||||
metadata_var: Any = await getattr(props_iface, "call_get")(
|
||||
"org.mpris.MediaPlayer2.Player", "Metadata"
|
||||
)
|
||||
if not metadata_var:
|
||||
logger.error("Empty metadata received.")
|
||||
return None
|
||||
|
||||
metadata = metadata_var.value
|
||||
|
||||
# Extract trackid — MPRIS returns either "spotify:track:ID"
|
||||
# or a DBus object path like "/com/spotify/track/ID"
|
||||
trackid = metadata.get("mpris:trackid", None)
|
||||
if trackid:
|
||||
trackid = trackid.value
|
||||
if isinstance(trackid, str):
|
||||
if trackid.startswith("spotify:track:"):
|
||||
trackid = trackid.removeprefix("spotify:track:")
|
||||
elif trackid.startswith("/com/spotify/track/"):
|
||||
trackid = trackid.removeprefix("/com/spotify/track/")
|
||||
|
||||
# Extract length (usually microseconds)
|
||||
length = metadata.get("mpris:length", None)
|
||||
if length:
|
||||
length = length.value // 1000 if isinstance(length.value, int) else None
|
||||
|
||||
album = metadata.get("xesam:album", None)
|
||||
album = album.value if album else None
|
||||
|
||||
artist = metadata.get("xesam:artist", None)
|
||||
artist = (
|
||||
artist.value[0]
|
||||
if artist and isinstance(artist.value, list) and artist.value
|
||||
else None
|
||||
)
|
||||
|
||||
title = metadata.get("xesam:title", None)
|
||||
title = title.value if title else None
|
||||
|
||||
url = metadata.get("xesam:url", None)
|
||||
url = url.value if url else None
|
||||
|
||||
return TrackMeta(
|
||||
trackid=trackid,
|
||||
length=length,
|
||||
album=album,
|
||||
artist=artist,
|
||||
title=title,
|
||||
url=url,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get properties from {player_name}: {e}")
|
||||
return None
|
||||
|
||||
finally:
|
||||
if bus:
|
||||
bus.disconnect()
|
||||
|
||||
|
||||
def get_current_track(player_name: Optional[str] = None) -> Optional[TrackMeta]:
|
||||
try:
|
||||
return asyncio.run(_fetch_metadata_dbus(player_name))
|
||||
except Exception as e:
|
||||
logger.error(f"DBus async loop failed: {e}")
|
||||
return None
|
||||
Reference in New Issue
Block a user