chore: switch to src layout

This commit is contained in:
2026-04-06 09:15:07 +02:00
parent c5abbff14c
commit 69b7f5c60c
35 changed files with 4 additions and 7 deletions
View File
+10
View File
@@ -0,0 +1,10 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-04-06 08:19:54
Description: The entry point.
"""
from .cli import run
# Entry point for `python -m <package>` — delegates straight to the CLI runner.
if __name__ == "__main__":
    run()
+30
View File
@@ -0,0 +1,30 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-04-06 08:21:01
Description: Credential authenticators for third-party provider APIs
"""
from lrx_cli.authenticators.qqmusic import QQMusicAuthenticator
from .base import BaseAuthenticator
from .spotify import SpotifyAuthenticator
from .musixmatch import MusixmatchAuthenticator
from .dummy import DummyAuthenticator
__all__ = [
"BaseAuthenticator",
"SpotifyAuthenticator",
"MusixmatchAuthenticator",
"QQMusicAuthenticator",
"DummyAuthenticator",
]
def create_authenticators(cache) -> dict[str, BaseAuthenticator]:
    """Factory function to create authenticators with cache access."""
    # Spotify and Musixmatch persist tokens, so they get the shared cache;
    # dummy and qqmusic hold no state.
    authenticators: dict[str, BaseAuthenticator] = {}
    authenticators["dummy"] = DummyAuthenticator()
    authenticators["spotify"] = SpotifyAuthenticator(cache)
    authenticators["musixmatch"] = MusixmatchAuthenticator(cache)
    authenticators["qqmusic"] = QQMusicAuthenticator()
    return authenticators
+32
View File
@@ -0,0 +1,32 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-04-05 03:18:14
Description: Base class for credential authenticators.
"""
from abc import ABC, abstractmethod
from typing import Optional
class BaseAuthenticator(ABC):
    """Contract for one provider's credential lifecycle: obtain, cache, refresh."""

    @property
    @abstractmethod
    def name(self) -> str: ...

    def is_configured(self) -> bool:
        """Report whether the prerequisite config (e.g. env var) is present.

        Defaults to True — authenticators that can obtain credentials
        anonymously should not override this.
        """
        return True

    @abstractmethod
    async def authenticate(self) -> Optional[str]:
        """Produce the currently valid credential string, refreshing if needed.

        Yields None when the credential is unavailable (misconfigured or
        network failure).
        """
        ...
+19
View File
@@ -0,0 +1,19 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-04-05 03:36:44
Description: A dummy authenticator that does nothing and always reports as configured.
"""
from .base import BaseAuthenticator
class DummyAuthenticator(BaseAuthenticator):
    """No-op authenticator: always reports configured, never yields a credential."""

    @property
    def name(self) -> str:
        return "dummy"

    def is_configured(self) -> bool:
        # Nothing to configure for the dummy provider.
        return True

    async def authenticate(self) -> None:
        # There is no credential to produce.
        return None
+157
View File
@@ -0,0 +1,157 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-04-05 03:27:56
Description: Musixmatch authenticator — token management, 401 retry, and cooldown.
"""
import time
from typing import Optional
from urllib.parse import urlencode
import httpx
from loguru import logger
from .base import BaseAuthenticator
from ..cache import CacheEngine
from ..config import HTTP_TIMEOUT, MUSIXMATCH_COOLDOWN_MS, credentials
_MUSIXMATCH_TOKEN_URL = "https://apic-desktop.musixmatch.com/ws/1.1/token.get"
_MXM_HEADERS = {"Cookie": "x-mxm-token-guid="}
_MXM_BASE_PARAMS = {
"format": "json",
"app_id": "web-desktop-app-v1.0",
}
class MusixmatchAuthenticator(BaseAuthenticator):
    """Manages the anonymous Musixmatch user token.

    Responsibilities:
    - obtain and persist an anonymous token via ``token.get``,
    - invalidate and refresh it once on HTTP 401,
    - enter a DB-persisted cooldown when no token can be obtained, so
      concurrent/future runs back off together.
    """

    def __init__(self, cache: CacheEngine) -> None:
        self._cache = cache
        # In-memory copy of the token; the DB row ("musixmatch") survives restarts.
        self._cached_token: Optional[str] = None
        # Unix-ms timestamp until which requests are suppressed (0 = no cooldown).
        self._cooldown_until_ms: int = 0

    @property
    def name(self) -> str:
        return "musixmatch"

    def is_configured(self) -> bool:
        return True  # anonymous token always available

    def is_cooldown(self) -> bool:
        """Return True if Musixmatch requests are blocked due to repeated auth failure."""
        now_ms = int(time.time() * 1000)
        # Fast path: deadline already known in memory.
        if self._cooldown_until_ms > now_ms:
            return True
        # Slow path: another run/process may have persisted a cooldown in the DB.
        data = self._cache.get_credential("musixmatch_cooldown")
        if data:
            until = data.get("until_ms", 0)
            if until > now_ms:
                # Promote the persisted deadline to memory for future fast paths.
                self._cooldown_until_ms = until
                return True
        return False

    def _set_cooldown(self) -> None:
        # Start a cooldown window of MUSIXMATCH_COOLDOWN_MS from now and
        # persist it; the credential row expires together with the window.
        now_ms = int(time.time() * 1000)
        until_ms = now_ms + MUSIXMATCH_COOLDOWN_MS
        self._cooldown_until_ms = until_ms
        self._cache.set_credential(
            "musixmatch_cooldown",
            {"until_ms": until_ms},
            expires_at_ms=until_ms,
        )
        logger.warning("Musixmatch: token unavailable, entering cooldown")

    def _invalidate_token(self) -> None:
        """Discard the current token from memory and DB."""
        self._cached_token = None
        # Store with an already-expired timestamp so get_credential returns None
        self._cache.set_credential("musixmatch", {"token": ""}, expires_at_ms=1)

    async def _fetch_new_token(self) -> Optional[str]:
        """Call token.get and persist the result. Returns token string or None."""
        params = {
            **_MXM_BASE_PARAMS,
            "user_language": "en",
            # Millisecond timestamp — presumably a cache-buster; harmless either way.
            "t": str(int(time.time() * 1000)),
        }
        url = f"{_MUSIXMATCH_TOKEN_URL}?{urlencode(params)}"
        logger.debug("Musixmatch: fetching anonymous token")
        try:
            async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
                resp = await client.get(url, headers=_MXM_HEADERS)
                resp.raise_for_status()
                data = resp.json()
        except Exception as e:
            # Best-effort: any network/HTTP/JSON failure degrades to "no token".
            logger.warning(f"Musixmatch: token fetch failed: {e}")
            return None
        token = (
            data.get("message", {}).get("body", {}).get("user_token")
            if isinstance(data, dict)
            else None
        )
        if not isinstance(token, str) or not token:
            logger.warning("Musixmatch: unexpected token.get response structure")
            return None
        self._cached_token = token
        # No expiry — token is valid until we get a 401
        self._cache.set_credential("musixmatch", {"token": token}, expires_at_ms=None)
        logger.debug("Musixmatch: obtained anonymous token")
        return token

    async def _get_token(self) -> Optional[str]:
        """Return a valid token: env var > memory > DB > fresh fetch."""
        if credentials.MUSIXMATCH_USERTOKEN:
            return credentials.MUSIXMATCH_USERTOKEN
        if self._cached_token:
            return self._cached_token
        data = self._cache.get_credential("musixmatch")
        if data and isinstance(data.get("token"), str) and data["token"]:
            self._cached_token = data["token"]
            return self._cached_token
        return await self._fetch_new_token()

    async def authenticate(self) -> Optional[str]:
        """Return the current token, or None while a cooldown is active."""
        if self.is_cooldown():
            logger.debug("Musixmatch: authenticate called during cooldown")
            return None
        return await self._get_token()

    async def get_json(self, url_base: str, params: dict) -> Optional[dict]:
        """Authenticated GET to a Musixmatch endpoint.

        - Injects format, app_id, and usertoken automatically.
        - On 401: invalidates token, fetches a fresh one, retries once.
        - On failed token fetch (initial or retry): sets cooldown, returns None.
        - On network / HTTP error: raises (callers map this to NETWORK_ERROR).
        - Returns None if cooldown is active.
        """
        if self.is_cooldown():
            logger.debug("Musixmatch: request blocked by cooldown")
            return None
        token = await self._get_token()
        if not token:
            self._set_cooldown()
            return None
        async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
            url = f"{url_base}?{urlencode({**_MXM_BASE_PARAMS, **params, 'usertoken': token})}"
            resp = await client.get(url, headers=_MXM_HEADERS)
            if resp.status_code == 401:
                logger.debug("Musixmatch: 401 received, refreshing token")
                self._invalidate_token()
                token = await self._fetch_new_token()
                if not token:
                    self._set_cooldown()
                    return None
                # Retry exactly once with the fresh token; a second 401 falls
                # through to raise_for_status below and propagates to the caller.
                url = f"{url_base}?{urlencode({**_MXM_BASE_PARAMS, **params, 'usertoken': token})}"
                resp = await client.get(url, headers=_MXM_HEADERS)
            resp.raise_for_status()
            return resp.json()
+25
View File
@@ -0,0 +1,25 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-04-05 03:47:30
Description: QQ Music API authenticator - currently only a proxy.
"""
from typing import Optional
from .base import BaseAuthenticator
from ..config import credentials
class QQMusicAuthenticator(BaseAuthenticator):
    """Proxy-style authenticator: the "credential" is the configured API URL."""

    @property
    def name(self) -> str:
        return "qqmusic"

    def is_configured(self) -> bool:
        # Configured exactly when the QQ Music proxy URL is set.
        return bool(credentials.QQ_MUSIC_API_URL)

    async def authenticate(self) -> Optional[str]:
        return credentials.QQ_MUSIC_API_URL
+202
View File
@@ -0,0 +1,202 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-04-05 03:18:14
Description: Spotify authenticator — TOTP-based access token via SP_DC cookie.
"""
import hashlib
import hmac
import struct
import time
from typing import Optional, Tuple
import httpx
from loguru import logger
from .base import BaseAuthenticator
from ..cache import CacheEngine
from ..config import HTTP_TIMEOUT, UA_BROWSER, credentials
_SPOTIFY_TOKEN_URL = "https://open.spotify.com/api/token"
_SPOTIFY_SERVER_TIME_URL = "https://open.spotify.com/api/server-time"
_SPOTIFY_SECRET_URL = (
"https://raw.githubusercontent.com/xyloflake/spot-secrets-go"
"/refs/heads/main/secrets/secrets.json"
)
SPOTIFY_BASE_HEADERS = {
"User-Agent": UA_BROWSER,
"Referer": "https://open.spotify.com/",
"Origin": "https://open.spotify.com",
"App-Platform": "WebPlayer",
"Spotify-App-Version": "1.2.88.21.g8e037c8f",
}
class SpotifyAuthenticator(BaseAuthenticator):
    """Obtains a Spotify web-player access token using the SP_DC cookie.

    The open.spotify.com token endpoint requires a TOTP proof; the TOTP
    secret (and its version) is downloaded at runtime from a
    community-maintained repository. Tokens are cached both in memory and
    in the credential DB.
    """

    def __init__(self, cache: CacheEngine) -> None:
        self._cache = cache
        # (decoded secret, version) — memoized for the process lifetime.
        self._cached_secret: Optional[Tuple[str, int]] = None
        self._cached_token: Optional[str] = None
        # Unix seconds; 0.0 means "no valid in-memory token".
        self._token_expires_at: float = 0.0

    @property
    def name(self) -> str:
        return "spotify"

    def is_configured(self) -> bool:
        return bool(credentials.SPOTIFY_SP_DC)

    @staticmethod
    def _generate_totp(server_time_s: int, secret: str) -> str:
        """Derive a 6-digit TOTP (30 s step, HMAC-SHA1, dynamic truncation)."""
        counter = server_time_s // 30
        counter_bytes = struct.pack(">Q", counter)
        mac = hmac.new(secret.encode(), counter_bytes, hashlib.sha1).digest()
        # HOTP-style dynamic truncation: low nibble of the last byte selects
        # a 4-byte window; the sign bit of that window is masked off.
        offset = mac[-1] & 0x0F
        binary_code = (
            (mac[offset] & 0x7F) << 24
            | (mac[offset + 1] & 0xFF) << 16
            | (mac[offset + 2] & 0xFF) << 8
            | (mac[offset + 3] & 0xFF)
        )
        return str(binary_code % (10**6)).zfill(6)

    def _load_cached_token(self) -> Optional[str]:
        """Promote a non-expired DB token into the in-memory cache; None on miss."""
        data = self._cache.get_credential("spotify")
        if not data:
            return None
        expires_ms = data.get("accessTokenExpirationTimestampMs", 0)
        if expires_ms <= int(time.time() * 1000):
            logger.debug("Spotify: persisted token expired")
            return None
        token = data.get("accessToken", "")
        if not token:
            return None
        self._cached_token = token
        self._token_expires_at = expires_ms / 1000.0
        logger.debug("Spotify: loaded token from DB cache")
        return token

    def _save_token(self, body: dict) -> None:
        """Persist the full token response; its expiry doubles as the row TTL."""
        expires_ms = body.get("accessTokenExpirationTimestampMs")
        self._cache.set_credential("spotify", body, expires_ms)
        logger.debug("Spotify: token saved to DB cache")

    async def _get_server_time(self, client: httpx.AsyncClient) -> Optional[int]:
        """Fetch Spotify's server time for TOTP counter alignment.

        The value is fed directly into ``_generate_totp`` (``// 30``), so it
        is presumably Unix seconds — confirm against the endpoint if changed.
        """
        try:
            res = await client.get(_SPOTIFY_SERVER_TIME_URL, timeout=HTTP_TIMEOUT)
            res.raise_for_status()
            data = res.json()
            if not isinstance(data, dict) or "serverTime" not in data:
                logger.error(f"Spotify: unexpected server-time response: {data}")
                return None
            server_time = data["serverTime"]
            logger.debug(f"Spotify: server time = {server_time}")
            return server_time
        except Exception as e:
            logger.error(f"Spotify: failed to fetch server time: {e}")
            return None

    async def _get_secret(self, client: httpx.AsyncClient) -> Optional[Tuple[str, int]]:
        """Download and decode the TOTP secret; memoized per process."""
        if self._cached_secret is not None:
            logger.debug("Spotify: using cached TOTP secret")
            return self._cached_secret
        try:
            res = await client.get(_SPOTIFY_SECRET_URL, timeout=HTTP_TIMEOUT)
            res.raise_for_status()
            data = res.json()
            if not isinstance(data, list) or len(data) == 0:
                logger.error(
                    f"Spotify: unexpected secrets response (type={type(data).__name__})"
                )
                return None
            # data[-1] is taken as the current secret (assumed newest entry —
            # verify against the upstream repository's ordering).
            last = data[-1]
            if "secret" not in last or "version" not in last:
                logger.error(f"Spotify: malformed secret entry: {list(last.keys())}")
                return None
            secret_raw = last["secret"]
            version = last["version"]
            # De-obfuscation: XOR each char code with a position-derived byte,
            # then concatenate the resulting decimal numbers into one string.
            secret = "".join(
                str(ord(c) ^ ((i % 33) + 9)) for i, c in enumerate(secret_raw)
            )
            logger.debug(f"Spotify: decoded secret v{version} (len={len(secret)})")
            self._cached_secret = (secret, version)
            return self._cached_secret
        except Exception as e:
            logger.error(f"Spotify: failed to fetch secret: {e}")
            return None

    async def authenticate(self) -> Optional[str]:
        """Return a valid access token, running the TOTP flow when needed."""
        # 30-second safety margin so a token isn't handed out just before expiry.
        if self._cached_token and time.time() < self._token_expires_at - 30:
            logger.debug("Spotify: using in-memory cached token")
            return self._cached_token
        db_token = self._load_cached_token()
        if db_token and time.time() < self._token_expires_at - 30:
            return db_token
        if not credentials.SPOTIFY_SP_DC:
            logger.error("Spotify: SPOTIFY_SP_DC env var not set — cannot authenticate")
            return None
        headers = {
            "Accept": "*/*",
            "Cookie": f"sp_dc={credentials.SPOTIFY_SP_DC}",
            **SPOTIFY_BASE_HEADERS,
        }
        async with httpx.AsyncClient(headers=headers) as client:
            server_time = await self._get_server_time(client)
            if server_time is None:
                return None
            secret_data = await self._get_secret(client)
            if secret_data is None:
                return None
            secret, version = secret_data
            totp = self._generate_totp(server_time, secret)
            logger.debug(f"Spotify: generated TOTP v{version}: {totp}")
            params = {
                "reason": "init",
                "productType": "web-player",
                "totp": totp,
                "totpVer": str(version),
                "totpServer": totp,
            }
            try:
                res = await client.get(
                    _SPOTIFY_TOKEN_URL, params=params, timeout=HTTP_TIMEOUT
                )
                if res.status_code != 200:
                    logger.error(f"Spotify: token request returned {res.status_code}")
                    return None
                body = res.json()
                if not isinstance(body, dict) or "accessToken" not in body:
                    logger.error(
                        f"Spotify: unexpected token response keys: {list(body.keys()) if isinstance(body, dict) else type(body).__name__}"
                    )
                    return None
                token = body["accessToken"]
                if body.get("isAnonymous", False):
                    # Anonymous tokens still work for some endpoints, so this
                    # is a warning rather than a failure.
                    logger.warning(
                        "Spotify: received anonymous token — SP_DC may be invalid"
                    )
                expires_ms = body.get("accessTokenExpirationTimestampMs", 0)
                if expires_ms and expires_ms > int(time.time() * 1000):
                    self._token_expires_at = expires_ms / 1000.0
                else:
                    logger.warning("Spotify: token expiry missing or invalid")
                    # Fall back to a one-hour lifetime when the server omits it.
                    self._token_expires_at = time.time() + 3600
                self._cached_token = token
                self._save_token(body)
                logger.debug("Spotify: obtained access token")
                return token
            except Exception as e:
                logger.error(f"Spotify: token request failed: {e}")
                return None
+553
View File
@@ -0,0 +1,553 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-25 10:18:03
Description: SQLite-based lyric cache with per-source storage and TTL expiration.
"""
import json
import sqlite3
import hashlib
import time
from typing import Optional
from loguru import logger
from .lrc import LRCData
from .normalize import normalize_for_match as _normalize_for_match
from .config import (
DURATION_TOLERANCE_MS,
LEGACY_CONFIDENCE_SYNCED,
LEGACY_CONFIDENCE_UNSYNCED,
)
from .models import TrackMeta, LyricResult, CacheStatus
# Fixed WHERE clause for exact track matching. Column names are hardcoded
# literals; only the *values* come from user-supplied params — no injection risk.
_TRACK_WHERE = (
"(? IS NULL OR artist = ?) AND "
"(? IS NULL OR title = ?) AND "
"(? IS NULL OR album = ?)"
)
def _track_where_params(track: TrackMeta) -> list:
return [
track.artist,
track.artist,
track.title,
track.title,
track.album,
track.album,
]
def _generate_key(track: TrackMeta, source: str) -> str:
"""Generate a unique cache key from track metadata and source.
The key is scoped by source so that different fetchers can cache
independently for the same track (e.g. Spotify synced vs Netease unsynced).
"""
# Spotify tracks always use their track ID as the primary identifier
if track.trackid and source == "spotify":
return f"spotify:{track.trackid}"
parts = []
if track.artist:
parts.append(track.artist)
if track.title:
parts.append(track.title)
if track.album:
parts.append(track.album)
if track.length:
parts.append(str(track.length))
# Fall back to URL for local files
if not parts and track.url:
return f"{source}:url:{track.url}"
if not parts:
raise ValueError("Insufficient metadata to generate cache key")
raw = "|".join(parts)
digest = hashlib.sha256(raw.encode()).hexdigest()
return f"{source}:{digest}"
class CacheEngine:
    """SQLite-backed store for per-source lyric results and provider credentials.

    Timestamp conventions (important): the ``cache`` table stores Unix
    *seconds* while the ``credentials`` table stores Unix *milliseconds*.
    Connections are opened per call; each connection's context manager
    commits on success.
    """

    def __init__(self, db_path: str):
        # Path to the SQLite file; schema is created/migrated eagerly.
        self.db_path = db_path
        self._init_db()

    def _init_db(self) -> None:
        """Create or migrate the cache and credentials tables."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS cache (
                    key TEXT PRIMARY KEY,
                    source TEXT NOT NULL,
                    status TEXT NOT NULL,
                    lyrics TEXT,
                    created_at INTEGER NOT NULL,
                    expires_at INTEGER,
                    artist TEXT,
                    title TEXT,
                    album TEXT
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS credentials (
                    name TEXT PRIMARY KEY,
                    data TEXT NOT NULL,
                    expires_at INTEGER
                )
            """)
            # Migrations: add columns introduced after the original schema.
            cols = {r[1] for r in conn.execute("PRAGMA table_info(cache)").fetchall()}
            if "length" not in cols:
                conn.execute("ALTER TABLE cache ADD COLUMN length INTEGER")
            if "confidence" not in cols:
                conn.execute("ALTER TABLE cache ADD COLUMN confidence REAL")
            conn.commit()

    # Read
    def get(self, track: TrackMeta, source: str) -> Optional[LyricResult]:
        """Look up a cached result for *track* from *source*.

        Returns None on cache miss or expiration.
        """
        try:
            key = _generate_key(track, source)
        except ValueError:
            # Not enough metadata to even form a key — treat as a miss.
            return None
        with sqlite3.connect(self.db_path) as conn:
            row = conn.execute(
                "SELECT status, lyrics, source, expires_at, length, confidence FROM cache WHERE key = ?",
                (key,),
            ).fetchone()
            if not row:
                logger.debug(f"Cache miss: {source} / {track.display_name()}")
                return None
            status_str, lyrics, src, expires_at, cached_length, confidence = row
            # Check TTL expiration
            if expires_at and expires_at < int(time.time()):
                logger.debug(f"Cache expired: {source} / {track.display_name()}")
                conn.execute("DELETE FROM cache WHERE key = ?", (key,))
                conn.commit()
                return None
            # Backfill length if the cached row is missing it
            if cached_length is None and track.length is not None:
                conn.execute(
                    "UPDATE cache SET length = ? WHERE key = ?",
                    (track.length, key),
                )
                conn.commit()
            remaining = expires_at - int(time.time()) if expires_at else None
            logger.debug(
                f"Cache hit: {source} / {track.display_name()} "
                f"[{status_str}, ttl={remaining}s]"
            )
            status = CacheStatus(status_str)
            # Rows written before the confidence column existed get legacy defaults.
            if confidence is None:
                if status == CacheStatus.SUCCESS_SYNCED:
                    confidence = LEGACY_CONFIDENCE_SYNCED
                elif status == CacheStatus.SUCCESS_UNSYNCED:
                    confidence = LEGACY_CONFIDENCE_UNSYNCED
                else:
                    confidence = 0.0  # negative statuses: no confidence
            return LyricResult(
                status=status,
                lyrics=LRCData(lyrics) if lyrics else None,
                source=src,
                ttl=remaining,
                confidence=confidence,
            )

    def get_best(self, track: TrackMeta, sources: list[str]) -> Optional[LyricResult]:
        """Return the best cached result across *sources* by confidence.

        Skips negative statuses (NOT_FOUND, NETWORK_ERROR) — those are only
        consulted per-source to avoid redundant fetches.
        """
        best: Optional[LyricResult] = None
        for src in sources:
            cached = self.get(track, src)
            if not cached:
                continue
            if cached.status not in (
                CacheStatus.SUCCESS_SYNCED,
                CacheStatus.SUCCESS_UNSYNCED,
            ):
                continue
            if best is None:
                best = cached
            elif cached.confidence > best.confidence:
                best = cached
            elif (
                # Tie-break: prefer synced lyrics at equal confidence.
                cached.confidence == best.confidence
                and cached.status == CacheStatus.SUCCESS_SYNCED
                and best.status != CacheStatus.SUCCESS_SYNCED
            ):
                best = cached
        return best

    # Write
    def set(
        self,
        track: TrackMeta,
        source: str,
        result: LyricResult,
        ttl_seconds: Optional[int] = None,
    ) -> None:
        """Store a lyric result in the cache.

        ``ttl_seconds=None`` (or 0) means the entry never expires.
        """
        try:
            key = _generate_key(track, source)
        except ValueError:
            logger.warning("Cannot cache: insufficient track metadata.")
            return
        now = int(time.time())
        expires_at = now + ttl_seconds if ttl_seconds else None
        with sqlite3.connect(self.db_path) as conn:
            # INSERT OR REPLACE: a re-fetch for the same key overwrites in place.
            conn.execute(
                """INSERT OR REPLACE INTO cache
                   (key, source, status, lyrics, created_at, expires_at,
                    artist, title, album, length, confidence)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    key,
                    source,
                    result.status.value,
                    str(result.lyrics) if result.lyrics else None,
                    now,
                    expires_at,
                    track.artist,
                    track.title,
                    track.album,
                    track.length,
                    result.confidence,
                ),
            )
            conn.commit()
        logger.debug(
            f"Cached: {source} / {track.display_name()} "
            f"[{result.status.value}, ttl={ttl_seconds}s]"
        )

    # Delete
    def clear_all(self) -> None:
        """Remove every entry from the cache."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("DELETE FROM cache")
            conn.commit()
        logger.info("Cache cleared.")

    def clear_track(self, track: TrackMeta) -> None:
        """Remove all cached entries (every source) for a single track."""
        if not self._track_has_meta(track):
            logger.info(f"No cache entries found for {track.display_name()}.")
            return
        with sqlite3.connect(self.db_path) as conn:
            cur = conn.execute(
                f"DELETE FROM cache WHERE {_TRACK_WHERE}",
                _track_where_params(track),
            )
            conn.commit()
        if cur.rowcount:
            logger.info(
                f"Cleared {cur.rowcount} cache entries for {track.display_name()}."
            )
        else:
            logger.info(f"No cache entries found for {track.display_name()}.")

    def prune(self) -> int:
        """Remove all expired entries. Returns the number of rows deleted."""
        with sqlite3.connect(self.db_path) as conn:
            cur = conn.execute(
                "DELETE FROM cache WHERE expires_at IS NOT NULL AND expires_at < ?",
                (int(time.time()),),
            )
            conn.commit()
            count = cur.rowcount
        logger.info(f"Pruned {count} expired cache entries.")
        return count

    @staticmethod
    def _track_has_meta(track: TrackMeta) -> bool:
        # At least one exact-match column must be present for _TRACK_WHERE
        # to be meaningful (all-NULL would match every row).
        return bool(track.artist or track.title or track.album)

    # Exact cross-source search
    def find_best_positive(self, track: TrackMeta) -> Optional[LyricResult]:
        """Find the best positive (synced/unsynced) cache entry for *track*.

        Uses exact metadata match (artist + title + album) across all sources.
        Returns the highest-confidence entry, or None.
        """
        if not self._track_has_meta(track):
            return None
        now = int(time.time())
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            # Ordering: confidence (legacy rows mapped via CASE), then synced
            # before unsynced, then newest first.
            rows = conn.execute(
                f"SELECT status, lyrics, source, confidence FROM cache"
                f" WHERE {_TRACK_WHERE}"
                " AND status IN (?, ?)"
                " AND (expires_at IS NULL OR expires_at > ?)"
                " ORDER BY COALESCE(confidence,"
                "   CASE status WHEN ? THEN ? ELSE ? END"
                " ) DESC,"
                " CASE status WHEN ? THEN 0 ELSE 1 END,"
                " created_at DESC LIMIT 1",
                _track_where_params(track)
                + [
                    CacheStatus.SUCCESS_SYNCED.value,
                    CacheStatus.SUCCESS_UNSYNCED.value,
                    now,
                    CacheStatus.SUCCESS_SYNCED.value,
                    LEGACY_CONFIDENCE_SYNCED,
                    LEGACY_CONFIDENCE_UNSYNCED,
                    CacheStatus.SUCCESS_SYNCED.value,
                ],
            ).fetchall()
        if not rows:
            return None
        row = dict(rows[0])
        confidence = row["confidence"]
        # Same legacy-row confidence mapping as get().
        if confidence is None:
            confidence = (
                LEGACY_CONFIDENCE_SYNCED
                if row["status"] == CacheStatus.SUCCESS_SYNCED.value
                else LEGACY_CONFIDENCE_UNSYNCED
            )
        return LyricResult(
            status=CacheStatus(row["status"]),
            lyrics=LRCData(row["lyrics"]) if row["lyrics"] else None,
            source="cache-search",
            confidence=confidence,
        )

    # Fuzzy search
    def search_by_meta(
        self,
        title: Optional[str],
        length: Optional[int] = None,
    ) -> list[dict]:
        """Search cache for lyrics matching title with fuzzy normalization.

        Artist is intentionally not filtered here — artist names can differ
        significantly across languages (e.g. Japanese romanization vs. kanji),
        making hard artist filtering unreliable for cross-language queries.

        Ignores artist, album and source. Only returns positive results
        (synced/unsynced) that have not expired. When *length* is provided,
        filters by duration tolerance and sorts by closest match.
        """
        if not title:
            return []
        now = int(time.time())
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            # Full scan of positive rows; title matching happens in Python
            # because normalization cannot be expressed in SQL.
            rows = conn.execute(
                """SELECT * FROM cache
                   WHERE status IN (?, ?)
                   AND (expires_at IS NULL OR expires_at > ?)""",
                (
                    CacheStatus.SUCCESS_SYNCED.value,
                    CacheStatus.SUCCESS_UNSYNCED.value,
                    now,
                ),
            ).fetchall()
        norm_title = _normalize_for_match(title)
        matches: list[dict] = []
        for row in rows:
            row_dict = dict(row)
            # Title must match
            row_title = row_dict.get("title") or ""
            if _normalize_for_match(row_title) != norm_title:
                continue
            matches.append(row_dict)
        # Duration filtering
        if length is not None and matches:
            scored = []
            for m in matches:
                row_len = m.get("length")
                if row_len is not None:
                    diff = abs(row_len - length)
                    if diff <= DURATION_TOLERANCE_MS:
                        scored.append((diff, m))
                else:
                    # No duration info in cache — still a candidate but lower priority
                    scored.append((DURATION_TOLERANCE_MS, m))
            # Sort: closest duration, then highest confidence, then synced
            # first, then newest.
            scored.sort(
                key=lambda x: (
                    x[0],
                    -(x[1].get("confidence") or 0),
                    x[1].get("status") != CacheStatus.SUCCESS_SYNCED.value,
                    -(x[1].get("created_at") or 0),
                )
            )
            matches = [m for _, m in scored]
        return matches

    # Update
    def update_confidence(
        self,
        track: TrackMeta,
        confidence: float,
        source: str,
    ) -> int:
        """Update confidence for a specific source's cache entry matching *track*.

        Returns the number of rows updated.
        """
        if not self._track_has_meta(track):
            return 0
        with sqlite3.connect(self.db_path) as conn:
            cur = conn.execute(
                f"UPDATE cache SET confidence = ? WHERE {_TRACK_WHERE} AND source = ?",
                [confidence] + _track_where_params(track) + [source],
            )
            conn.commit()
            return cur.rowcount

    # Query / inspect
    def query_track(self, track: TrackMeta) -> list[dict]:
        """Return all cached rows for a given track (across all sources)."""
        if not self._track_has_meta(track):
            return []
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            return [
                dict(r)
                for r in conn.execute(
                    f"SELECT * FROM cache WHERE {_TRACK_WHERE}",
                    _track_where_params(track),
                ).fetchall()
            ]

    # Credentials
    def get_credential(self, name: str) -> Optional[dict]:
        """Return cached credential data if present and not expired.

        Credential expiry is stored in Unix *milliseconds* (unlike the cache
        table, which uses seconds).
        """
        now_ms = int(time.time() * 1000)
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            row = conn.execute(
                "SELECT data FROM credentials WHERE name = ? AND (expires_at IS NULL OR expires_at > ?)",
                (name, now_ms),
            ).fetchone()
        if row is None:
            return None
        try:
            return json.loads(row["data"])
        except (json.JSONDecodeError, KeyError):
            # Corrupt row — behave like a miss rather than crashing callers.
            return None

    def set_credential(
        self, name: str, data: dict, expires_at_ms: Optional[int] = None
    ) -> None:
        """Persist credential data, optionally with an expiry timestamp (Unix ms)."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                "INSERT OR REPLACE INTO credentials (name, data, expires_at) VALUES (?, ?, ?)",
                (name, json.dumps(data), expires_at_ms),
            )
            conn.commit()

    def query_all(self) -> list[dict]:
        """Return every row in the cache table."""
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            return [dict(r) for r in conn.execute("SELECT * FROM cache").fetchall()]

    def stats(self) -> dict:
        """Return aggregate cache statistics."""
        now = int(time.time())
        with sqlite3.connect(self.db_path) as conn:
            total = conn.execute("SELECT COUNT(*) FROM cache").fetchone()[0]
            expired = conn.execute(
                "SELECT COUNT(*) FROM cache WHERE expires_at IS NOT NULL AND expires_at < ?",
                (now,),
            ).fetchone()[0]
            by_status = dict(
                conn.execute(
                    "SELECT status, COUNT(*) FROM cache GROUP BY status"
                ).fetchall()
            )
            by_source = dict(
                conn.execute(
                    "SELECT source, COUNT(*) FROM cache GROUP BY source"
                ).fetchall()
            )
            # Source × Status cross-tabulation
            source_status = conn.execute(
                "SELECT source, status, COUNT(*) FROM cache GROUP BY source, status"
            ).fetchall()
            # Confidence buckets (only for positive statuses)
            confidence_rows = conn.execute(
                "SELECT confidence FROM cache WHERE status IN (?, ?)",
                (
                    CacheStatus.SUCCESS_SYNCED.value,
                    CacheStatus.SUCCESS_UNSYNCED.value,
                ),
            ).fetchall()
        # Build source×status table: {source: {status: count}}
        source_status_table: dict[str, dict[str, int]] = {}
        for src, status, count in source_status:
            source_status_table.setdefault(src, {})[status] = count
        # Build confidence buckets
        buckets = {
            "legacy (NULL)": 0,
            "0-24": 0,
            "25-49": 0,
            "50-79": 0,
            "80-99": 0,
            "100": 0,
        }
        for (conf,) in confidence_rows:
            if conf is None:
                buckets["legacy (NULL)"] += 1
            elif conf >= 100:
                buckets["100"] += 1
            elif conf >= 80:
                buckets["80-99"] += 1
            elif conf >= 50:
                buckets["50-79"] += 1
            elif conf >= 25:
                buckets["25-49"] += 1
            else:
                buckets["0-24"] += 1
        return {
            "total": total,
            "expired": expired,
            "active": total - expired,
            "by_status": by_status,
            "by_source": by_source,
            "source_status": source_status_table,
            "confidence_buckets": buckets,
        }
+534
View File
@@ -0,0 +1,534 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-26 02:04:39
Description: CLI interface.
"""
import sys
import time
import os
from pathlib import Path
from typing import Annotated
from urllib.parse import quote
import cyclopts
from loguru import logger
from .config import DB_PATH, enable_debug
from .models import TrackMeta, CacheStatus
from .mpris import get_current_track
from .core import LrcManager
from .fetchers import FetcherMethodType
from .lrc import get_sidecar_path
# Top-level cyclopts application; sub-commands register themselves below.
app = cyclopts.App(
    help="LRX-CLI — Fetch line-synced lyrics for your music player.",
)
app.register_install_completion_command()

# "cache" sub-application groups the cache management commands.
cache_app = cyclopts.App(name="cache", help="Manage the local SQLite cache.")
app.command(cache_app)

# Global state set by the meta launcher
_player: str | None = None
_db_path: str | None = None

# Will be initialized before any command runs, safe to set to None here
manager: LrcManager = None  # type: ignore
@app.meta.default
def launcher(
    *tokens: Annotated[str, cyclopts.Parameter(show=False, allow_leading_hyphen=True)],
    debug: Annotated[
        bool,
        cyclopts.Parameter(
            name=["--debug", "-d"], negative="", help="Enable debug logging."
        ),
    ] = False,
    player: Annotated[
        str | None,
        cyclopts.Parameter(
            name=["--player", "-p"],
            help="Target a specific MPRIS player using its DBus name or a portion thereof.",
        ),
    ] = None,
    db_path: Annotated[
        str | None,
        cyclopts.Parameter(
            name=["--db-path", "-c"],
            help=f"Custom path for the cache database file (default: {DB_PATH}).",
        ),
    ] = None,
):
    # Meta-app entry point: runs before every sub-command. It consumes the
    # global options, initializes the module-level _player/_db_path/manager
    # state, then re-dispatches the remaining tokens to the main app.
    global _player, _db_path
    if debug:
        enable_debug()
    _player = player
    # Resolve a user-supplied DB path to an absolute one; otherwise use default.
    _db_path = str(Path(db_path).resolve()) if db_path else DB_PATH
    global manager
    manager = LrcManager(db_path=_db_path)
    # Hand the remaining CLI tokens to the main app for command dispatch.
    app(tokens)
# fetch
@app.command
def fetch(
    *,
    method: Annotated[
        FetcherMethodType | None,
        cyclopts.Parameter(help="Force a specific source."),
    ] = None,
    no_cache: Annotated[
        bool,
        cyclopts.Parameter(
            name="--no-cache", negative="", help="Bypass the cache for this request."
        ),
    ] = False,
    only_synced: Annotated[
        bool,
        cyclopts.Parameter(
            name="--only-synced", negative="", help="Only accept synced (timed) lyrics."
        ),
    ] = False,
    plain: Annotated[
        bool,
        cyclopts.Parameter(
            name="--plain", negative="", help="Output only the raw lyrics without tags."
        ),
    ] = False,
):
    """Fetch and print lyrics for the currently playing track."""
    # Resolve the track from MPRIS; bail out if nothing is playing.
    track = get_current_track(_player)
    if not track:
        logger.error("No active playing track found.")
        sys.exit(1)
    logger.info(f"Track: {track.display_name()}")

    outcome = manager.fetch_for_track(track, force_method=method, bypass_cache=no_cache)
    if not (outcome and outcome.lyrics):
        logger.error("No lyrics found.")
        sys.exit(1)

    # Honor --only-synced: reject a result that carries only plain text.
    is_synced = outcome.status == CacheStatus.SUCCESS_SYNCED
    if only_synced and not is_synced:
        logger.error("Only unsynced lyrics available (--only-synced requested).")
        sys.exit(1)

    print(outcome.lyrics.to_lrc(plain=plain))
# search
@app.command
def search(
    *,
    title: Annotated[
        str | None, cyclopts.Parameter(name=["--title", "-t"], help="Track title.")
    ] = None,
    artist: Annotated[
        str | None, cyclopts.Parameter(name=["--artist", "-a"], help="Artist name.")
    ] = None,
    album: Annotated[str | None, cyclopts.Parameter(help="Album name.")] = None,
    trackid: Annotated[str | None, cyclopts.Parameter(help="Spotify track ID.")] = None,
    length: Annotated[
        int | None,
        cyclopts.Parameter(
            name=["--length", "-l"], help="Track duration in milliseconds."
        ),
    ] = None,
    url: Annotated[
        str | None,
        cyclopts.Parameter(
            help="Local file URL (file:///...). Mutually exclusive with --path."
        ),
    ] = None,
    path: Annotated[
        str | None,
        cyclopts.Parameter(
            name=["--path"],
            help="Local audio file path. Mutually exclusive with --url.",
        ),
    ] = None,
    method: Annotated[
        FetcherMethodType | None, cyclopts.Parameter(help="Force a specific source.")
    ] = None,
    no_cache: Annotated[
        bool,
        cyclopts.Parameter(
            name="--no-cache", negative="", help="Bypass the cache for this request."
        ),
    ] = False,
    only_synced: Annotated[
        bool,
        cyclopts.Parameter(
            name="--only-synced", negative="", help="Only accept synced (timed) lyrics."
        ),
    ] = False,
    plain: Annotated[
        bool,
        cyclopts.Parameter(
            name="--plain", negative="", help="Output only the raw lyrics without tags."
        ),
    ] = False,
):
    """Search for lyrics by metadata (bypasses MPRIS)."""
    if url and path:
        logger.error("--url and --path are mutually exclusive.")
        sys.exit(1)
    if path:
        # Normalize a filesystem path into a percent-encoded file:// URL.
        url = "file://" + quote(str(Path(path).resolve()), safe="/")

    track = TrackMeta(
        title=title,
        artist=artist,
        album=album,
        trackid=trackid,
        length=length,
        url=url,
    )
    logger.info(f"Track: {track.display_name()}")

    outcome = manager.fetch_for_track(track, force_method=method, bypass_cache=no_cache)
    if not (outcome and outcome.lyrics):
        logger.error("No lyrics found.")
        sys.exit(1)
    if only_synced and outcome.status != CacheStatus.SUCCESS_SYNCED:
        logger.error("Only unsynced lyrics available (--only-synced requested).")
        sys.exit(1)

    print(outcome.lyrics.to_lrc(plain=plain))
# export
@app.command
def export(
    *,
    output: Annotated[
        str | None,
        cyclopts.Parameter(
            name=["--output", "-o"],
            help="Output file path (default: same directory as audio file with .lrc extension, or current directory if not available).",
        ),
    ] = None,
    method: Annotated[
        FetcherMethodType | None, cyclopts.Parameter(help="Force a specific source.")
    ] = None,
    no_cache: Annotated[
        bool, cyclopts.Parameter(name="--no-cache", negative="", help="Bypass cache.")
    ] = False,
    overwrite: Annotated[
        bool,
        cyclopts.Parameter(
            name=["--overwrite", "-f"], negative="", help="Overwrite existing file."
        ),
    ] = False,
    plain: Annotated[
        bool,
        cyclopts.Parameter(
            name="--plain", negative="", help="Export only the raw lyrics without tags."
        ),
    ] = False,
):
    """Export lyrics of the current track to a .lrc file.

    Output path resolution order:
      1. --output (the matching extension is appended if missing),
      2. a sidecar path next to the local audio file,
      3. a sanitized "<artist> - <title>" file in the current directory.

    Exits with status 1 when no track is playing, no lyrics are found,
    the target exists without --overwrite, or the write fails.
    """
    track = get_current_track(_player)
    if not track:
        logger.error("No active playing track found.")
        sys.exit(1)
    result = manager.fetch_for_track(track, force_method=method, bypass_cache=no_cache)
    if not result or not result.lyrics:
        logger.error("No lyrics available to export.")
        sys.exit(1)
    # Output file extension: plain export drops LRC timing tags, so use .txt
    ext = ".lrc" if not plain else ".txt"
    if output and not output.endswith(ext):
        output += ext
    # Build default output path (sidecar next to the audio file, if local)
    if not output:
        if track.url:
            lrc_path = get_sidecar_path(track.url, ensure_exists=False, extension=ext)
            if lrc_path:
                output = str(lrc_path)
                logger.info(f"Exporting to sidecar path: {output}")
    # Fallback to current directory with sanitized filename
    if not output:
        filename = (
            f"{track.artist} - {track.title}{ext}"
            if track.artist and track.title
            else "lyrics" + ext
        )
        # Sanitize filename: keep letters, digits, and a few safe symbols only
        filename = "".join(
            c for c in filename if c.isalpha() or c.isdigit() or c in " -_."
        ).rstrip()
        output = os.path.join(os.getcwd(), filename)
    if os.path.exists(output) and not overwrite:
        logger.error(f"File exists: {output} (use -f to overwrite)")
        sys.exit(1)
    try:
        with open(output, "w", encoding="utf-8") as f:
            if plain:
                f.write(result.lyrics.to_plain())
            else:
                f.write(str(result.lyrics))
        logger.info(f"Exported lyrics to {output}")
    except Exception as e:
        logger.error(f"Failed to write file: {e}")
        sys.exit(1)
# cache subcommands
@cache_app.command
def query(
    *,
    all: Annotated[
        bool,
        cyclopts.Parameter(name="--all", negative="", help="Dump all cache entries."),
    ] = False,
):
    """Show cached entries for the current track."""
    if all:
        rows = manager.cache.query_all()
        if not rows:
            print("Cache is empty.")
        else:
            # Blank line between entries keeps the full dump readable.
            for row in rows:
                _print_cache_row(row)
                print()
        return
    track = get_current_track(_player)
    if not track:
        logger.error("No active playing track found.")
        sys.exit(1)
    _print_track_cache(track)
@cache_app.command
def clear(
    *,
    all: Annotated[
        bool,
        cyclopts.Parameter(name="--all", negative="", help="Clear the entire cache."),
    ] = False,
):
    """Clear cached entries for the current track."""
    if all:
        manager.cache.clear_all()
        return
    current = get_current_track(_player)
    if not current:
        logger.error("No active playing track found.")
        sys.exit(1)
    # Without --all, only entries belonging to the active track are removed.
    manager.cache.clear_track(current)
@cache_app.command
def prune():
    """Remove expired cache entries."""
    # Delegates to the cache engine; only rows past their TTL are dropped.
    manager.cache.prune()
@cache_app.command
def stats():
    """Show cache statistics.

    Prints overall entry counts, a source × status table, and the
    confidence distribution of positive (lyrics-bearing) entries.
    """
    s = manager.cache.stats()
    print("=== Cache Statistics ===")
    print(f"Total entries : {s['total']}")
    print(f"Active : {s['active']}")
    print(f"Expired : {s['expired']}")
    # Source × Status table
    table = s.get("source_status", {})
    if table:
        # Union of all statuses seen across sources → table columns
        all_statuses = sorted({st for row in table.values() for st in row})
        # Short labels for column headers
        short = {
            "SUCCESS_SYNCED": "synced",
            "SUCCESS_UNSYNCED": "unsynced",
            "NOT_FOUND": "not_found",
            "NETWORK_ERROR": "net_err",
        }
        headers = [short.get(st, st) for st in all_statuses]
        sources = sorted(table.keys())
        # Column widths (sized to content, with sensible minimums)
        src_w = max(len(src) for src in sources)
        src_w = max(src_w, 6)  # min width for "source" header
        col_w = [max(len(h) if h else 0, 4) for h in headers]
        print(
            f"\n{'source':<{src_w}} "
            + " ".join(f"{h:>{w}}" for h, w in zip(headers, col_w))
        )
        print("-" * src_w + " " + " ".join("-" * w for w in col_w))
        for src in sources:
            counts = [str(table[src].get(st, 0)) for st in all_statuses]
            print(
                f"{src:<{src_w}} "
                + " ".join(f"{c:>{w}}" for c, w in zip(counts, col_w))
            )
        # Per-status totals across all sources
        totals = [
            str(sum(table[src].get(st, 0) for src in sources)) for st in all_statuses
        ]
        print("-" * src_w + " " + " ".join("-" * w for w in col_w))
        print(
            f"{'total':<{src_w}} "
            + " ".join(f"{c:>{w}}" for c, w in zip(totals, col_w))
        )
    # Confidence distribution (positive entries only)
    buckets = s.get("confidence_buckets", {})
    non_empty = {k: v for k, v in buckets.items() if v > 0}
    if non_empty:
        # Align labels to the widest non-empty bucket name
        label_w = max(len(k) for k in non_empty)
        print("\nConfidence distribution (positive entries):")
        for label, count in buckets.items():
            if count > 0:
                print(f" {label:>{label_w}} : {count}")
@cache_app.command
def confidence(
    source: Annotated[
        str, cyclopts.Parameter(help="Source to update (e.g. spotify, netease).")
    ],
    score: Annotated[float, cyclopts.Parameter(help="Confidence score (0-100).")],
):
    """Set confidence score for the current track's cache entry from a specific source."""
    if not 0 <= score <= 100:
        logger.error("Score must be between 0 and 100.")
        sys.exit(1)
    track = get_current_track(_player)
    if not track:
        logger.error("No active playing track found.")
        sys.exit(1)
    # Report whether a matching cache row was actually updated.
    if manager.cache.update_confidence(track, score, source=source):
        print(f"Updated [{source}] confidence to {score:.0f}.")
    else:
        print(f"No cache entry found for [{source}].")
@cache_app.command
def insert(
    *,
    path: Annotated[
        str | None,
        cyclopts.Parameter(
            name=["--path"],
            help="Path to a local .lrc file to insert instead of reading from stdin.",
        ),
    ] = None,
):
    """Manually insert lyrics into the cache for the current track."""
    track = get_current_track(_player)
    if not track:
        logger.error("No active playing track found.")
        sys.exit(1)
    # Lyrics come either from the given file or from stdin.
    if path:
        try:
            with open(path, "r", encoding="utf-8") as fh:
                lyrics = fh.read()
        except Exception as e:
            logger.error(f"Failed to read file: {e}")
            sys.exit(1)
    else:
        logger.info("Reading lyrics from stdin (Ctrl+D to finish)...")
        lyrics = sys.stdin.read()
    manager.manual_insert(track, lyrics)
# helpers
def _print_track_cache(track: TrackMeta) -> None:
    """Print all cached entries for a given track."""
    print(f"Track: {track.display_name()}")
    if track.album:
        print(f"Album: {track.album}")
    if track.length:
        # Track length is stored in milliseconds.
        total = track.length / 1000.0
        minutes = int(total // 60)
        print(f"Duration: {minutes}:{total % 60:05.2f}")
    print()
    rows = manager.cache.query_track(track)
    if not rows:
        print(" (no cache entries)")
        return
    for row in rows:
        _print_cache_row(row, indent=" ")
def _print_cache_row(row: dict, indent: str = "") -> None:
    """Pretty-print a single cache row."""
    now = int(time.time())
    artist = row.get("artist", "")
    title = row.get("title", "")
    # Fall back to the raw cache key when artist/title are incomplete.
    name = f"{artist} - {title}" if artist and title else row.get("key", "?")
    print(f"{indent}[{row.get('source', '?')}] {name}")
    album = row.get("album", "")
    if album:
        print(f"{indent} Album : {album}")
    print(f"{indent} Status : {row.get('status', '?')}")
    created = row.get("created_at", 0)
    if created:
        age = now - created
        print(f"{indent} Cached : {age // 3600}h {(age % 3600) // 60}m ago")
    expires = row.get("expires_at")
    if not expires:
        print(f"{indent} Expires : never")
    elif expires - now > 0:
        remaining = expires - now
        print(
            f"{indent} Expires : in {remaining // 3600}h {(remaining % 3600) // 60}m"
        )
    else:
        print(f"{indent} Expires : EXPIRED")
    lyrics = row.get("lyrics", "")
    if lyrics:
        print(f"{indent} Lyrics : {len(lyrics.splitlines())} lines")
    confidence = row.get("confidence")
    if confidence is None:
        # Rows written before confidence scoring existed carry no score.
        print(f"{indent} Confidence: (legacy)")
    else:
        print(f"{indent} Confidence: {confidence:.0f}")
def run():
    """Console-script entry point: dispatch through the cyclopts meta app."""
    app.meta()
# Support direct execution in addition to the installed console script.
if __name__ == "__main__":
    run()
+110
View File
@@ -0,0 +1,110 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-25 10:17:56
Description: Global configuration constants and logger setup.
"""
import os
import sys
from pathlib import Path
from platformdirs import user_cache_dir, user_config_dir
from dotenv import load_dotenv
from loguru import logger
from importlib.metadata import version
# Application identity (APP_VERSION resolved from installed package metadata)
APP_NAME = "lrx-cli"
APP_AUTHOR = "Uyanide"
APP_VERSION = version(APP_NAME)
# Paths
CACHE_DIR = user_cache_dir(APP_NAME, APP_AUTHOR)
DB_PATH = os.path.join(CACHE_DIR, "cache.db")
# .env loading: per-user config first, then cwd.
# load_dotenv never overrides variables already present in the environment.
_config_env = Path(user_config_dir(APP_NAME, APP_AUTHOR)) / ".env"
load_dotenv(_config_env)  # ~/.config/lrx-cli/.env
load_dotenv()  # .env in cwd (does NOT override existing vars)
# HTTP
HTTP_TIMEOUT = 10.0  # seconds
# Cache TTLs (seconds); None means the entry never expires
TTL_SYNCED = None  # never expires
TTL_UNSYNCED = 86400  # 1 day
TTL_NOT_FOUND = 86400 * 3  # 3 days
TTL_NETWORK_ERROR = 3600  # 1 hour
# Search
DURATION_TOLERANCE_MS = 3000  # max duration mismatch for search matching
# Confidence scoring weights (sum to 100)
SCORE_W_TITLE = 40.0
SCORE_W_ARTIST = 30.0
SCORE_W_ALBUM = 10.0
SCORE_W_DURATION = 10.0
SCORE_W_SYNCED = 10.0
# Confidence thresholds
MIN_CONFIDENCE = 25.0  # below this, candidate is rejected
HIGH_CONFIDENCE = 80.0  # at or above this, stop searching early
# Multi-candidate fetching
MULTI_CANDIDATE_LIMIT = 3  # max candidates to try per search-based fetcher
MULTI_CANDIDATE_DELAY_S = 0.2  # delay between sequential lyric fetches
# Legacy cache rows (no confidence stored) get a base score by sync status
LEGACY_CONFIDENCE_SYNCED = 50.0
LEGACY_CONFIDENCE_UNSYNCED = 40.0
# User-Agents
UA_BROWSER = "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0"
UA_LRX = f"LRX-CLI {APP_VERSION} (https://github.com/Uyanide/lrx-cli)"
# Musixmatch cooldown window
MUSIXMATCH_COOLDOWN_MS = 600_000  # 10 minutes
# Player preference (used when multiple MPRIS players are active)
PREFERRED_PLAYER = os.environ.get("PREFERRED_PLAYER", "spotify")
class _Credentials:
    """Credential config with lazy os.environ reads.

    Stable constants live as module-level names above. Credentials are
    exposed as properties so monkeypatch.setenv / monkeypatch.delenv take
    effect without patching each consumer separately.
    """

    @property
    def SPOTIFY_SP_DC(self) -> str:
        # Read lazily from the environment on every access.
        return os.getenv("SPOTIFY_SP_DC", "")

    @property
    def QQ_MUSIC_API_URL(self) -> str:
        # Trailing slashes are stripped so callers can append paths safely.
        return os.getenv("QQ_MUSIC_API_URL", "").rstrip("/")

    @property
    def MUSIXMATCH_USERTOKEN(self) -> str:
        return os.getenv("MUSIXMATCH_USERTOKEN", "")
# Shared credentials accessor instance.
credentials = _Credentials()
# Ensure the cache directory exists before anything opens the DB file.
os.makedirs(CACHE_DIR, exist_ok=True)
# Logger
_LOG_FORMAT = (
    "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
    "<level>{level: <8}</level> | "
    "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
    "<level>{message}</level>"
)
# Replace loguru's default handler with our format at INFO level.
logger.remove()
logger.add(sys.stderr, format=_LOG_FORMAT, level="INFO")
def enable_debug() -> None:
    """Reconfigure the logger to emit DEBUG-level messages to stderr."""
    # Drop the INFO handler installed at import time, then re-add at DEBUG.
    logger.remove()
    logger.add(sys.stderr, level="DEBUG", format=_LOG_FORMAT)
+234
View File
@@ -0,0 +1,234 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-25 11:09:53
Description: Core orchestrator — coordinates fetchers with cache-aware fallback.
Also handles enrichers & authenticators & …
"""
import asyncio
from typing import Optional
from loguru import logger
from .fetchers import FetcherMethodType, build_plan, create_fetchers
from .fetchers.base import BaseFetcher
from .authenticators import create_authenticators
from .cache import CacheEngine
from .lrc import LRCData
from .config import (
TTL_SYNCED,
TTL_UNSYNCED,
TTL_NOT_FOUND,
TTL_NETWORK_ERROR,
HIGH_CONFIDENCE,
)
from .models import TrackMeta, LyricResult, CacheStatus
from .enrichers import create_enrichers, enrich_track
# Maps CacheStatus to the default TTL (seconds, None = never expires) used
# when storing results that carry no explicit TTL of their own.
_STATUS_TTL: dict[CacheStatus, Optional[int]] = {
    CacheStatus.SUCCESS_SYNCED: TTL_SYNCED,
    CacheStatus.SUCCESS_UNSYNCED: TTL_UNSYNCED,
    CacheStatus.NOT_FOUND: TTL_NOT_FOUND,
    CacheStatus.NETWORK_ERROR: TTL_NETWORK_ERROR,
}
def _is_better(new: LyricResult, old: LyricResult) -> bool:
    """Compare two results: higher confidence wins; synced breaks ties."""
    if new.confidence > old.confidence:
        return True
    if new.confidence < old.confidence:
        return False
    # Tie on confidence — a synced result beats an unsynced one.
    new_synced = new.status == CacheStatus.SUCCESS_SYNCED
    old_synced = old.status == CacheStatus.SUCCESS_SYNCED
    return new_synced and not old_synced
def _normalize_result(result: LyricResult) -> LyricResult:
    """Normalize unsynced lyrics before returning."""
    if result.status != CacheStatus.SUCCESS_UNSYNCED or not result.lyrics:
        # Synced (or empty) results pass through untouched.
        return result
    return LyricResult(
        status=result.status,
        lyrics=result.lyrics.normalize_unsynced(),
        source=result.source,
        ttl=result.ttl,
        confidence=result.confidence,
    )
class LrcManager:
    """Main entry point for fetching lyrics with caching.

    Wires together the cache engine, authenticators, fetchers, and
    enrichers, and orchestrates the group-based fetch pipeline.
    """

    def __init__(self, db_path: str) -> None:
        # Authenticators may use the cache for storage; fetchers and
        # enrichers in turn share the authenticators.
        self.cache = CacheEngine(db_path=db_path)
        self.authenticators = create_authenticators(self.cache)
        self.fetchers = create_fetchers(self.cache, self.authenticators)
        self.enrichers = create_enrichers(self.authenticators)

    async def _run_group(
        self,
        group: list[BaseFetcher],
        track: TrackMeta,
        bypass_cache: bool,
    ) -> list[tuple[str, LyricResult]]:
        """Run one group: cache-check first, then parallel-fetch uncached. Returns (source, result) pairs."""
        cached_results: list[tuple[str, LyricResult]] = []
        need_fetch: list[BaseFetcher] = []
        for fetcher in group:
            source = fetcher.source_name
            if not bypass_cache and not fetcher.self_cached:
                cached = self.cache.get(track, source)
                if cached:
                    if cached.status in (
                        CacheStatus.NOT_FOUND,
                        CacheStatus.NETWORK_ERROR,
                    ):
                        # Negative cache hit: don't re-fetch until it expires.
                        logger.debug(
                            f"[{source}] cache hit: {cached.status.value}, skipping"
                        )
                        continue
                    is_trusted = cached.confidence >= HIGH_CONFIDENCE
                    logger.info(
                        f"[{source}] cache hit: {cached.status.value}"
                        f" (confidence={cached.confidence:.0f})"
                    )
                    cached_results.append((source, cached))
                    # Return immediately on trusted synced cache hit
                    if cached.status == CacheStatus.SUCCESS_SYNCED and is_trusted:
                        return cached_results
                    continue
            elif not fetcher.self_cached:
                logger.debug(f"[{source}] cache bypassed")
            need_fetch.append(fetcher)
        if need_fetch:
            # Launch all uncached fetchers concurrently; consume results as
            # they complete so a trusted hit can cancel the stragglers.
            task_map: dict[asyncio.Task, BaseFetcher] = {
                asyncio.create_task(f.fetch(track, bypass_cache=bypass_cache)): f
                for f in need_fetch
            }
            pending = set(task_map)
            while pending:
                done, pending = await asyncio.wait(
                    pending, return_when=asyncio.FIRST_COMPLETED
                )
                found_trusted = False
                for task in done:
                    fetcher = task_map[task]
                    source = fetcher.source_name
                    try:
                        result = task.result()
                    except Exception as e:
                        # A failing fetcher never aborts the group.
                        logger.error(f"[{source}] fetch raised: {e}")
                        continue
                    if result is None:
                        logger.debug(f"[{source}] returned None")
                        continue
                    if not fetcher.self_cached and not bypass_cache:
                        # Store with the result's own TTL, falling back to
                        # the status-based default.
                        ttl = result.ttl or _STATUS_TTL.get(
                            result.status, TTL_NOT_FOUND
                        )
                        self.cache.set(track, source, result, ttl_seconds=ttl)
                    if result.status in (
                        CacheStatus.SUCCESS_SYNCED,
                        CacheStatus.SUCCESS_UNSYNCED,
                    ):
                        logger.info(
                            f"[{source}] got {result.status.value} lyrics"
                            f" (confidence={result.confidence:.0f})"
                        )
                        cached_results.append((source, result))
                        if (
                            result.status == CacheStatus.SUCCESS_SYNCED
                            and result.confidence >= HIGH_CONFIDENCE
                        ):
                            found_trusted = True
                if found_trusted:
                    # Cancel remaining fetches and wait for them to unwind.
                    for t in pending:
                        t.cancel()
                    await asyncio.gather(*pending, return_exceptions=True)
                    break
        return cached_results

    async def _fetch_for_track(
        self,
        track: TrackMeta,
        force_method: Optional[FetcherMethodType],
        bypass_cache: bool,
    ) -> Optional[LyricResult]:
        """Run the full pipeline: enrich metadata, then try fetcher groups in order."""
        track = await enrich_track(track, self.enrichers)
        logger.info(f"Fetching lyrics for: {track.display_name()}")
        plan = build_plan(self.fetchers, track, force_method)
        if not plan:
            return None
        best_result: Optional[LyricResult] = None
        for group in plan:
            group_results = await self._run_group(group, track, bypass_cache)
            for source, result in group_results:
                if result.status not in (
                    CacheStatus.SUCCESS_SYNCED,
                    CacheStatus.SUCCESS_UNSYNCED,
                ):
                    continue
                is_trusted = result.confidence >= HIGH_CONFIDENCE
                # Trusted synced → return immediately
                if result.status == CacheStatus.SUCCESS_SYNCED and is_trusted:
                    logger.info(
                        f"Returning {result.status.value} lyrics from {source}"
                        f" (confidence={result.confidence:.0f})"
                    )
                    return _normalize_result(result)
                # Otherwise keep the best candidate and keep trying groups.
                if best_result is None or _is_better(result, best_result):
                    best_result = result
        if best_result:
            logger.info(
                f"Returning {best_result.status.value} lyrics from {best_result.source}"
            )
            return _normalize_result(best_result)
        logger.info(f"No lyrics found for {track.display_name()}")
        return None

    def fetch_for_track(
        self,
        track: TrackMeta,
        force_method: Optional[FetcherMethodType] = None,
        bypass_cache: bool = False,
    ) -> Optional[LyricResult]:
        """Fetch lyrics for *track* using the group-based parallel pipeline."""
        return asyncio.run(self._fetch_for_track(track, force_method, bypass_cache))

    def manual_insert(
        self,
        track: TrackMeta,
        lyrics: str,
    ) -> None:
        """Manually insert lyrics into the cache for a track.

        The entry is stored under the "manual" source and never expires.
        """
        track = asyncio.run(enrich_track(track, self.enrichers))
        logger.info(f"Manually inserting lyrics for: {track.display_name()}")
        lrc = LRCData(lyrics)
        result = LyricResult(
            status=lrc.detect_sync_status(),
            lyrics=lrc,
            source="manual",
            ttl=None,
        )
        self.cache.set(track, "manual", result, ttl_seconds=None)
        logger.info("Lyrics inserted into cache.")
+58
View File
@@ -0,0 +1,58 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-31 06:09:11
Description: Metadata enrichment pipeline
"""
from loguru import logger
from .base import BaseEnricher
from .audio_tag import AudioTagEnricher
from .file_name import FileNameEnricher
from .musixmatch import MusixmatchSpotifyEnricher
from ..authenticators import BaseAuthenticator, MusixmatchAuthenticator
from ..models import TrackMeta
# Enrichers run in order; earlier ones have higher priority.
# There are only a few of them, so we can just call them sequentially without worrying about async concurrency or batching.
def create_enrichers(
    authenticators: dict[str, BaseAuthenticator],
) -> list[BaseEnricher]:
    """Instantiate all enrichers."""
    mxm_auth = authenticators["musixmatch"]
    assert isinstance(mxm_auth, MusixmatchAuthenticator)
    # Ordered by priority: file-based enrichers first, Musixmatch last.
    enrichers: list[BaseEnricher] = [
        AudioTagEnricher(),
        FileNameEnricher(),
        MusixmatchSpotifyEnricher(mxm_auth),
    ]
    return enrichers
async def enrich_track(track: TrackMeta, enrichers: list[BaseEnricher]) -> TrackMeta:
    """Run all enrichers and return a track with missing fields filled in.

    Each enricher sees the cumulative state (earlier enrichers' results
    are already applied). A field is only set if it is currently None.
    """
    for enricher in enrichers:
        try:
            still_missing = [
                f for f in enricher.provides if getattr(track, f, None) is None
            ]
            if not still_missing:
                # Everything this enricher could contribute is already set.
                continue
            contributed = await enricher.enrich(track)
        except Exception as exc:
            logger.warning(f"Enricher {enricher.name} failed: {exc}")
            continue
        if not contributed:
            continue
        for field, value in contributed.items():
            # Never overwrite a field an earlier source already filled.
            if getattr(track, field, None) is None:
                setattr(track, field, value)
    return track
+82
View File
@@ -0,0 +1,82 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-31 06:11:27
Description: Enricher that reads metadata from audio file tags.
"""
from typing import Optional
from loguru import logger
from mutagen._file import File, FileType
from .base import BaseEnricher
from ..models import TrackMeta
from ..lrc import get_audio_path
class AudioTagEnricher(BaseEnricher):
    """Extract title, artist, album, and duration from audio file tags."""

    @property
    def name(self) -> str:
        return "audio-tag"

    @property
    def provides(self) -> set[str]:
        return {"title", "artist", "album", "length"}

    async def enrich(self, track: TrackMeta) -> Optional[dict]:
        if not track.is_local or not track.url:
            return None
        audio_path = get_audio_path(track.url, ensure_exists=True)
        if not audio_path:
            return None
        try:
            audio = File(audio_path)
        except Exception as e:
            logger.debug(f"AudioTag: failed to read {audio_path}: {e}")
            return None
        if audio is None:
            return None
        updates: dict = {}
        # Probe vorbis-comment, ID3, and MP4 key variants for each field;
        # only fields the track is still missing get filled.
        tag_keys = (
            ("title", ("title", "TIT2", "\xa9nam")),
            ("artist", ("artist", "TPE1", "\xa9ART")),
            ("album", ("album", "TALB", "\xa9alb")),
        )
        for field, keys in tag_keys:
            value = _first_tag(audio, *keys)
            if value and not getattr(track, field):
                updates[field] = value
        if not track.length and audio.info and hasattr(audio.info, "length"):
            # audio.info.length is in seconds; TrackMeta stores milliseconds.
            duration_ms = int(audio.info.length * 1000)
            if duration_ms > 0:
                updates["length"] = duration_ms
        if updates:
            logger.debug(f"AudioTag: enriched fields: {list(updates.keys())}")
        return updates or None
def _first_tag(audio: FileType, *keys: str) -> Optional[str]:
    """Return the first non-empty string value found among the given tag keys."""
    tags = audio.tags
    if not tags:
        return None
    for key in keys:
        raw = tags.get(key)
        # mutagen returns lists for vorbis comments, single values for ID3.
        if isinstance(raw, list):
            raw = raw[0] if raw else None
        if raw:
            return str(raw).strip()
    return None
+35
View File
@@ -0,0 +1,35 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-31 06:08:16
Description: Base class for metadata enrichers.
"""
from abc import ABC, abstractmethod
from typing import Optional
from ..models import TrackMeta
class BaseEnricher(ABC):
    """Attempts to fill missing fields on a TrackMeta.

    Each enricher inspects the track, and returns a dict of field names
    to values for any fields it can provide. Only fields that are
    currently ``None`` on the track will actually be applied.
    """

    @property
    @abstractmethod
    def name(self) -> str:
        """Short identifier for this enricher (used in log messages)."""
        ...

    @property
    @abstractmethod
    def provides(self) -> set[str]:
        """TrackMeta field names this enricher may be able to fill."""
        ...

    @abstractmethod
    async def enrich(self, track: TrackMeta) -> Optional[dict]:
        """Return a dict of {field_name: value} for fields this enricher can fill.

        Return None or an empty dict if nothing can be contributed.
        """
        ...
+100
View File
@@ -0,0 +1,100 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-31 06:08:44
Description: Enricher that parses metadata from the audio file path.
"""
import re
from typing import Optional
from loguru import logger
from .base import BaseEnricher
from ..models import TrackMeta
from ..lrc import get_audio_path
# Common track-number prefixes: "01 - ", "01. ", "1 - ", etc.
# (1-3 leading digits followed by whitespace, dots, or dashes)
_TRACK_NUM_RE = re.compile(r"^\d{1,3}[\s.\-]+")
class FileNameEnricher(BaseEnricher):
    """Derive artist / title from the file path when tags are unavailable.

    Heuristics (applied to the stem of the filename):
    - "Artist - Title" → artist, title
    - "01 - Title"     → title only (leading track number stripped)
    - "Title"          → title only

    If the *album* is still missing after parsing the filename, the parent
    directory name is used as a guess (common layout: ``Artist/Album/track``).
    """

    @property
    def name(self) -> str:
        return "file-name"

    @property
    def provides(self) -> set[str]:
        return {"artist", "title", "album"}

    async def enrich(self, track: TrackMeta) -> Optional[dict]:
        """Parse metadata out of the local audio file path.

        Returns a dict of still-missing fields, or None when the track is
        not a local file or nothing could be derived.
        """
        if not track.is_local or not track.url:
            return None
        audio_path = get_audio_path(track.url, ensure_exists=False)
        if not audio_path:
            return None
        updates: dict = {}
        stem = audio_path.stem
        # Prefer the spaced separator (" - "); fall back to a bare "-".
        # Both branches shared the same logic, so they are merged here.
        sep = " - " if " - " in stem else ("-" if "-" in stem else None)
        if sep:
            left, right = stem.split(sep, 1)
            left = _TRACK_NUM_RE.sub("", left).strip()
            right = right.strip()
            if left and right:
                # Both sides non-empty after stripping the track number.
                if not track.artist:
                    updates["artist"] = left
                if not track.title:
                    updates["title"] = right
            elif right:
                # Left side was only a track number → right is the title.
                if not track.title:
                    updates["title"] = right
        else:
            # No separator: strip track number, remainder is the title.
            title_guess = _TRACK_NUM_RE.sub("", stem).strip()
            if title_guess and not track.title:
                updates["title"] = title_guess
        # Album fallback: parent directory name (Artist/Album/track layout).
        # (audio_path.parent is equivalent to the former parents[0]; the
        # parents sequence is never empty, so the length check was dead code.)
        if not track.album and "album" not in updates:
            album_dir = audio_path.parent.name
            if album_dir and album_dir not in (".", "/"):
                updates["album"] = album_dir
        if updates:
            logger.debug(f"FileName: enriched fields: {list(updates.keys())}")
        return updates or None
+72
View File
@@ -0,0 +1,72 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-04-05 02:13:49
Description: Musixmatch metadata enricher (matcher.track.get by Spotify track ID).
"""
from typing import Optional
from loguru import logger
from .base import BaseEnricher
from ..authenticators.musixmatch import MusixmatchAuthenticator
from ..models import TrackMeta
_MUSIXMATCH_TRACK_MATCH_URL = (
"https://apic-desktop.musixmatch.com/ws/1.1/matcher.track.get"
)
class MusixmatchSpotifyEnricher(BaseEnricher):
"""Fill title, artist, album, and length from Musixmatch using Spotify track ID."""
def __init__(self, auth: MusixmatchAuthenticator) -> None:
self.auth = auth
@property
def name(self) -> str:
return "musixmatch"
@property
def provides(self) -> set[str]:
return {"title", "artist", "album", "length"}
async def enrich(self, track: TrackMeta) -> Optional[dict]:
if not track.trackid:
return None
logger.debug(f"Musixmatch enricher: looking up trackid={track.trackid}")
try:
data = await self.auth.get_json(
_MUSIXMATCH_TRACK_MATCH_URL,
{"track_spotify_id": track.trackid},
)
except Exception as e:
logger.warning(f"Musixmatch enricher: request failed: {e}")
return None
if data is None:
return None
body = data.get("message", {}).get("body")
t = body.get("track") if isinstance(body, dict) else None
if not isinstance(t, dict):
logger.debug(
f"Musixmatch enricher: no track data for trackid={track.trackid}"
)
return None
updates: dict = {}
if isinstance(t.get("track_name"), str) and t["track_name"]:
updates["title"] = t["track_name"]
if isinstance(t.get("artist_name"), str) and t["artist_name"]:
updates["artist"] = t["artist_name"]
if isinstance(t.get("album_name"), str) and t["album_name"]:
updates["album"] = t["album_name"]
if isinstance(t.get("track_length"), int) and t["track_length"] > 0:
updates["length"] = t["track_length"] * 1000
if updates:
logger.debug(f"Musixmatch enricher: filled {list(updates.keys())}")
return updates or None
+100
View File
@@ -0,0 +1,100 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-25 02:33:26
Description: Fetcher pipeline — registry and types.
"""
from typing import Literal, Optional
from loguru import logger
from .base import BaseFetcher
from .local import LocalFetcher
from .cache_search import CacheSearchFetcher
from .spotify import SpotifyFetcher
from .lrclib import LrclibFetcher
from .lrclib_search import LrclibSearchFetcher
from .musixmatch import MusixmatchFetcher, MusixmatchSpotifyFetcher
from .netease import NeteaseFetcher
from .qqmusic import QQMusicFetcher
from ..authenticators import (
BaseAuthenticator,
SpotifyAuthenticator,
MusixmatchAuthenticator,
QQMusicAuthenticator,
)
from ..cache import CacheEngine
from ..models import TrackMeta
# Union of all valid fetcher source names (also the CLI --method choices).
FetcherMethodType = Literal[
    "local",
    "cache-search",
    "spotify",
    "lrclib",
    "musixmatch-spotify",
    "lrclib-search",
    "netease",
    "qqmusic",
    "musixmatch",
]
# Fetchers within a group run in parallel; groups run sequentially.
# A group that produces any trusted and synced result stops the pipeline.
# Ordering reflects preference: local and cached sources first, then
# remote ID-based lookups, then search-based providers.
_FETCHER_GROUPS: list[list[FetcherMethodType]] = [
    ["local"],
    ["cache-search"],
    ["spotify"],
    ["lrclib", "musixmatch-spotify"],
    ["lrclib-search", "musixmatch"],
    ["netease", "qqmusic"],
]
def create_fetchers(
    cache: CacheEngine,
    authenticators: dict[str, BaseAuthenticator],
) -> dict[FetcherMethodType, BaseFetcher]:
    """Instantiate all fetchers. Returns a dict keyed by source name.

    Raises KeyError if a required authenticator is missing — the
    authenticator registry and this factory must stay in sync.
    """
    # Subscript access for all three (previously qqmusic used .get, which
    # turned a missing authenticator into an opaque isinstance-on-None
    # assertion instead of a clear KeyError).
    spotify_auth = authenticators["spotify"]
    mxm_auth = authenticators["musixmatch"]
    qqmusic_auth = authenticators["qqmusic"]
    # Narrow the types for the fetcher constructors below.
    assert isinstance(spotify_auth, SpotifyAuthenticator)
    assert isinstance(mxm_auth, MusixmatchAuthenticator)
    assert isinstance(qqmusic_auth, QQMusicAuthenticator)
    fetchers: dict[FetcherMethodType, BaseFetcher] = {
        "local": LocalFetcher(),
        "cache-search": CacheSearchFetcher(cache),
        "spotify": SpotifyFetcher(spotify_auth),
        "lrclib": LrclibFetcher(),
        "musixmatch-spotify": MusixmatchSpotifyFetcher(mxm_auth),
        "lrclib-search": LrclibSearchFetcher(),
        "netease": NeteaseFetcher(),
        "qqmusic": QQMusicFetcher(qqmusic_auth),
        "musixmatch": MusixmatchFetcher(mxm_auth),
    }
    return fetchers
def build_plan(
    fetchers: dict[FetcherMethodType, BaseFetcher],
    track: TrackMeta,
    force_method: Optional[FetcherMethodType] = None,
) -> list[list[BaseFetcher]]:
    """Return the fetch plan as a list of groups (each group runs in parallel)."""
    if force_method:
        # A forced method short-circuits the grouped pipeline entirely.
        forced = fetchers.get(force_method)
        if forced is None:
            logger.error(f"Unknown method: {force_method}")
            return []
        return [[forced]]
    plan: list[list[BaseFetcher]] = []
    for methods in _FETCHER_GROUPS:
        # Keep only fetchers that exist and can serve this track.
        available = [
            fetchers[m]
            for m in methods
            if m in fetchers and fetchers[m].is_available(track)
        ]
        if available:
            plan.append(available)
    logger.debug(f"Fetch plan: {[[f.source_name for f in g] for g in plan]}")
    return plan
+35
View File
@@ -0,0 +1,35 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-25 02:33:26
Description: Base fetcher class and common interfaces.
"""
from abc import ABC, abstractmethod
from typing import Optional
from ..models import TrackMeta, LyricResult
class BaseFetcher(ABC):
    """Abstract base class for all lyric fetchers."""

    @property
    @abstractmethod
    def source_name(self) -> str:
        """Name of the fetcher source (used as the cache key and in logs)."""
        pass

    @property
    def self_cached(self) -> bool:
        """True if this fetcher manages its own cache (skip per-source cache check)."""
        return False

    @abstractmethod
    def is_available(self, track: TrackMeta) -> bool:
        """Check if the fetcher is available for the given track (e.g. has required metadata)."""
        pass

    @abstractmethod
    async def fetch(
        self, track: TrackMeta, bypass_cache: bool = False
    ) -> Optional[LyricResult]:
        """Fetch lyrics for the given track. Returns None if unable to fetch."""
        pass
+98
View File
@@ -0,0 +1,98 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-28 05:57:46
Description: Cache-search fetcher — cross-album fuzzy lookup in the local cache.
Searches existing cache entries by artist + title with fuzzy normalization,
ignoring album and source. Useful when the same track appears on different
albums or is played from different players.
"""
from typing import Optional
from loguru import logger
from .base import BaseFetcher
from .selection import SearchCandidate, select_best
from ..models import TrackMeta, LyricResult, CacheStatus
from ..cache import CacheEngine
from ..lrc import LRCData
class CacheSearchFetcher(BaseFetcher):
    """Cross-album fuzzy lookup over entries already in the local cache."""

    def __init__(self, cache: CacheEngine) -> None:
        self._cache = cache

    @property
    def source_name(self) -> str:
        return "cache-search"

    @property
    def self_cached(self) -> bool:
        # Results come straight from the cache, so the manager must not
        # layer its own per-source cache check on top.
        return True

    def is_available(self, track: TrackMeta) -> bool:
        return bool(track.title)

    async def fetch(
        self, track: TrackMeta, bypass_cache: bool = False
    ) -> Optional[LyricResult]:
        """Search the cache for lyrics matching *track* (exact, then fuzzy).

        Returns None when the cache is bypassed, the track has no title,
        or no acceptable candidate is found.
        """
        if bypass_cache:
            logger.debug("Cache-search: bypassed by caller")
            return None
        if not track.title:
            logger.debug("Cache-search: skipped — no title")
            return None
        # Fast path: exact metadata match (artist+title+album), single SQL query
        exact = self._cache.find_best_positive(track)
        if exact:
            logger.info(f"Cache-search: exact hit ({exact.status.value})")
            return exact
        # Slow path: fuzzy cross-album search
        matches = self._cache.search_by_meta(
            title=track.title,
            length=track.length,
        )
        if not matches:
            logger.debug(f"Cache-search: no match for {track.display_name()}")
            return None
        # Pick best by confidence scoring; rows without lyrics are skipped.
        candidates = [
            SearchCandidate(
                item=m,
                duration_ms=float(m["length"]) if m.get("length") else None,
                is_synced=m.get("status") == CacheStatus.SUCCESS_SYNCED.value,
                title=m.get("title"),
                artist=m.get("artist"),
                album=m.get("album"),
            )
            for m in matches
            if m.get("lyrics")
        ]
        best, confidence = select_best(
            candidates,
            track.length,
            title=track.title,
            artist=track.artist,
            album=track.album,
        )
        if not best:
            return None
        status = CacheStatus(best["status"])
        logger.info(
            f"Cache-search: fuzzy hit from [{best.get('source')}] "
            f"album={best.get('album')!r} ({status.value}, confidence={confidence:.0f})"
        )
        # The result is attributed to this fetcher, not the original source.
        return LyricResult(
            status=status,
            lyrics=LRCData(best["lyrics"]),
            source=self.source_name,
            confidence=confidence,
        )
+101
View File
@@ -0,0 +1,101 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-26 02:08:41
Description: Local fetcher — reads lyrics from .lrc sidecar files or embedded audio metadata.
Priority:
1. Same-directory .lrc file (e.g. /path/to/track.lrc)
2. Embedded lyrics in audio metadata (FLAC, MP3 USLT/SYLT tags)
"""
from typing import Optional
from loguru import logger
from mutagen._file import File
from mutagen.flac import FLAC
from .base import BaseFetcher
from ..models import TrackMeta, LyricResult
from ..lrc import get_audio_path, get_sidecar_path, LRCData
class LocalFetcher(BaseFetcher):
    """Reads lyrics from the local filesystem for file-backed tracks.

    Lookup order:
      1. A ``.lrc`` sidecar file next to the audio file.
      2. Lyrics embedded in the audio metadata (FLAC vorbis comments,
         ID3 USLT/SYLT frames for MP3 and similar).
    """

    @property
    def source_name(self) -> str:
        return "local"

    def is_available(self, track: TrackMeta) -> bool:
        # Only meaningful when the track's URL points at a local file.
        return track.is_local

    async def fetch(
        self, track: TrackMeta, bypass_cache: bool = False
    ) -> Optional[LyricResult]:
        """Attempt to read lyrics from local filesystem.

        Returns a LyricResult on a hit, or None when the track is not local,
        the path is invalid, or no lyrics are found anywhere.
        """
        if not track.is_local or not track.url:
            return None
        # Resolve the path without requiring the audio file to exist yet —
        # a sidecar .lrc can still be usable on its own.
        audio_path = get_audio_path(track.url, ensure_exists=False)
        if not audio_path:
            logger.debug(f"Local: audio URL is not a valid file path: {track.url}")
            return None
        # Sidecar lookup: only returns a path when the .lrc file exists.
        lrc_path = get_sidecar_path(
            track.url, ensure_audio_exists=False, ensure_exists=True
        )
        if lrc_path:
            try:
                with open(lrc_path, "r", encoding="utf-8") as f:
                    content = f.read().strip()
                    if content:
                        lrc = LRCData(content)
                        # The sidecar may be plain text or timestamped LRC.
                        status = lrc.detect_sync_status()
                        logger.info(
                            f"Local: found .lrc sidecar ({status.value}) for {audio_path.name}"
                        )
                        return LyricResult(
                            status=status,
                            lyrics=lrc,
                            source=self.source_name,
                        )
            except Exception as e:
                # Unreadable sidecar is logged but not fatal: fall through to
                # the embedded-metadata path below.
                logger.error(f"Local: error reading {lrc_path}: {e}")
        else:
            logger.debug(f"Local: no .lrc sidecar found for {audio_path}")
        # Embedded metadata
        if not audio_path.exists():
            logger.debug(f"Local: audio file does not exist: {audio_path}")
            return None
        try:
            # mutagen's File() auto-detects the container; None for unknown formats.
            audio = File(audio_path)
            if audio is not None:
                lyrics = None
                if isinstance(audio, FLAC):
                    # FLAC stores lyrics in vorbis comment tags
                    # (tag values are lists; take the first entry if present).
                    lyrics = (
                        audio.get("lyrics") or audio.get("unsynclyrics") or [None]
                    )[0]
                elif hasattr(audio, "tags") and audio.tags:
                    # MP3 / other: look for USLT or SYLT ID3 frames
                    # (frame keys carry a language/description suffix, hence startswith).
                    for key in audio.tags.keys():
                        if key.startswith("USLT") or key.startswith("SYLT"):
                            lyrics = str(audio.tags[key])
                            break
                if lyrics:
                    lrc = LRCData(lyrics)
                    status = lrc.detect_sync_status()
                    logger.info(
                        f"Local: found embedded lyrics ({status.value}) for {audio_path.name}"
                    )
                    return LyricResult(
                        status=status,
                        lyrics=lrc,
                        # Distinguish embedded hits from sidecar hits in the source tag.
                        source=f"{self.source_name} (embedded)",
                    )
                else:
                    logger.debug("Local: no embedded lyrics found")
        except Exception as e:
            logger.error(f"Local: error reading metadata for {audio_path}: {e}")
        logger.debug(f"Local: no lyrics found for {audio_path}")
        return None
+102
View File
@@ -0,0 +1,102 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-25 05:23:38
Description: LRCLIB fetcher — queries lrclib.net for synced/plain lyrics.
Requires complete track metadata (artist, title, album, duration).
"""
from typing import Optional
import httpx
from loguru import logger
from urllib.parse import urlencode
from .base import BaseFetcher
from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import LRCData
from ..config import (
HTTP_TIMEOUT,
TTL_UNSYNCED,
TTL_NOT_FOUND,
TTL_NETWORK_ERROR,
UA_LRX,
)
_LRCLIB_API_URL = "https://lrclib.net/api/get"
class LrclibFetcher(BaseFetcher):
    """Exact-match lyrics lookup against the lrclib.net /api/get endpoint.

    Requires complete track metadata; maps HTTP/JSON outcomes onto the
    cache-status sentinels with appropriate TTLs.
    """

    @property
    def source_name(self) -> str:
        return "lrclib"

    def is_available(self, track: TrackMeta) -> bool:
        return track.is_complete

    async def fetch(
        self, track: TrackMeta, bypass_cache: bool = False
    ) -> Optional[LyricResult]:
        """Fetch lyrics from LRCLIB. Requires complete metadata."""
        if not track.is_complete:
            logger.debug("LRCLIB: skipped — incomplete metadata")
            return None
        # lrclib matches on all four fields; duration is sent in seconds.
        query = {
            "track_name": track.title,
            "artist_name": track.artist,
            "album_name": track.album,
            "duration": track.length / 1000.0 if track.length else 0,
        }
        url = f"{_LRCLIB_API_URL}?{urlencode(query)}"
        logger.info(f"LRCLIB: fetching lyrics for {track.display_name()}")
        try:
            async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
                resp = await client.get(url, headers={"User-Agent": UA_LRX})
                if resp.status_code == 404:
                    logger.debug(f"LRCLIB: not found for {track.display_name()}")
                    return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
                if resp.status_code != 200:
                    logger.error(f"LRCLIB: API returned {resp.status_code}")
                    return LyricResult(
                        status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
                    )
                payload = resp.json()
                if not isinstance(payload, dict):
                    logger.error(f"LRCLIB: unexpected response type: {type(payload).__name__}")
                    return LyricResult(
                        status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
                    )
                # Prefer synced lyrics; fall back to plain text.
                synced_text = payload.get("syncedLyrics")
                plain_text = payload.get("plainLyrics")
                if isinstance(synced_text, str) and synced_text.strip():
                    parsed = LRCData(synced_text)
                    logger.info(f"LRCLIB: got synced lyrics ({len(parsed)} lines)")
                    return LyricResult(
                        status=CacheStatus.SUCCESS_SYNCED,
                        lyrics=parsed,
                        source=self.source_name,
                    )
                if isinstance(plain_text, str) and plain_text.strip():
                    parsed = LRCData(plain_text)
                    logger.info(f"LRCLIB: got unsynced lyrics ({len(parsed)} lines)")
                    # Unsynced hits get a TTL so a later synced version can replace them.
                    return LyricResult(
                        status=CacheStatus.SUCCESS_UNSYNCED,
                        lyrics=parsed,
                        source=self.source_name,
                        ttl=TTL_UNSYNCED,
                    )
                logger.debug(f"LRCLIB: empty response for {track.display_name()}")
                return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
        except httpx.HTTPError as e:
            logger.error(f"LRCLIB: HTTP error: {e}")
            return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
        except Exception as e:
            logger.error(f"LRCLIB: unexpected error: {e}")
            return None
+184
View File
@@ -0,0 +1,184 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-25 05:30:50
Description: LRCLIB search fetcher — fuzzy search via lrclib.net /api/search.
Used when metadata is incomplete (no album or duration) but title is available.
"""
import asyncio
import httpx
from typing import Optional
from loguru import logger
from urllib.parse import urlencode
from .base import BaseFetcher
from .selection import SearchCandidate, select_best
from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import LRCData
from ..config import (
HTTP_TIMEOUT,
TTL_UNSYNCED,
TTL_NOT_FOUND,
TTL_NETWORK_ERROR,
UA_LRX,
)
_LRCLIB_SEARCH_URL = "https://lrclib.net/api/search"
class LrclibSearchFetcher(BaseFetcher):
    """Fuzzy lyrics search against lrclib.net /api/search.

    Fires several query variants concurrently (most specific to least),
    de-duplicates results by lrclib track id, then picks the best candidate
    via the shared confidence-scoring helper.
    """

    @property
    def source_name(self) -> str:
        return "lrclib-search"

    def is_available(self, track: TrackMeta) -> bool:
        # Only the title is mandatory; artist/album merely sharpen the queries.
        return bool(track.title)

    def _build_queries(self, track: TrackMeta) -> list[dict[str, str]]:
        """Build up to 4 query param sets, from most specific to least.
        1. title + artist + album (if all present)
        2. title + artist (if artist present)
        3. title + album (if album present)
        4. title only
        """
        assert track.title is not None
        title = track.title
        queries: list[dict[str, str]] = []
        if track.artist and track.album:
            queries.append(
                {
                    "track_name": title,
                    "artist_name": track.artist,
                    "album_name": track.album,
                }
            )
        if track.artist:
            queries.append({"track_name": title, "artist_name": track.artist})
        if track.album:
            queries.append({"track_name": title, "album_name": track.album})
        # Title-only query is always included as the broadest fallback.
        queries.append({"track_name": title})
        return queries

    async def fetch(
        self, track: TrackMeta, bypass_cache: bool = False
    ) -> Optional[LyricResult]:
        """Search lrclib and return the best-scoring result.

        Returns a SUCCESS_* result on a hit, NOT_FOUND/NETWORK_ERROR
        sentinels with a TTL otherwise, or None when the title is missing or
        an unexpected error occurs.
        """
        if not track.title:
            logger.debug("LRCLIB-search: skipped — no title")
            return None
        queries = self._build_queries(track)
        logger.info(f"LRCLIB-search: searching for {track.display_name()}")
        seen_ids: set[int] = set()
        candidates: list[dict] = []
        had_error = False
        try:
            async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:

                async def _query(params: dict[str, str]) -> tuple[list[dict], bool]:
                    # Returns (items, errored) so one failing variant does not
                    # abort the other gathered queries.
                    url = f"{_LRCLIB_SEARCH_URL}?{urlencode(params)}"
                    logger.debug(f"LRCLIB-search: query {params}")
                    try:
                        resp = await client.get(url, headers={"User-Agent": UA_LRX})
                    except httpx.HTTPError as e:
                        logger.error(f"LRCLIB-search: HTTP error: {e}")
                        return [], True
                    if resp.status_code != 200:
                        logger.error(f"LRCLIB-search: API returned {resp.status_code}")
                        return [], True
                    data = resp.json()
                    if not isinstance(data, list):
                        return [], False
                    return [item for item in data if isinstance(item, dict)], False

                # All query variants run concurrently on the same client.
                all_results = await asyncio.gather(*(_query(p) for p in queries))
                for items, err in all_results:
                    if err:
                        had_error = True
                    for item in items:
                        # De-duplicate across variants by lrclib track id;
                        # items without an id are kept unconditionally.
                        item_id = item.get("id")
                        if item_id is not None and item_id in seen_ids:
                            continue
                        if item_id is not None:
                            seen_ids.add(item_id)
                        candidates.append(item)
                if not candidates:
                    # Report NETWORK_ERROR only when at least one variant
                    # failed and none produced results.
                    if had_error:
                        return LyricResult(
                            status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
                        )
                    logger.debug(f"LRCLIB-search: no results for {track.display_name()}")
                    return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
                logger.debug(
                    f"LRCLIB-search: got {len(candidates)} unique candidates "
                    f"from {len(queries)} queries"
                )
                # Map raw API dicts onto the shared scoring shape.
                mapped = [
                    SearchCandidate(
                        item=item,
                        duration_ms=item["duration"] * 1000
                        if isinstance(item.get("duration"), (int, float))
                        else None,
                        is_synced=isinstance(item.get("syncedLyrics"), str)
                        and bool(item["syncedLyrics"].strip()),
                        title=item.get("trackName"),
                        artist=item.get("artistName"),
                        album=item.get("albumName"),
                    )
                    for item in candidates
                ]
                best, confidence = select_best(
                    mapped,
                    track.length,
                    title=track.title,
                    artist=track.artist,
                    album=track.album,
                )
                if best is None:
                    logger.debug("LRCLIB-search: no valid candidate found")
                    return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
                # Prefer synced lyrics from the winner; fall back to plain text.
                synced = best.get("syncedLyrics")
                unsynced = best.get("plainLyrics")
                if isinstance(synced, str) and synced.strip():
                    lyrics = LRCData(synced)
                    logger.info(
                        f"LRCLIB-search: got synced lyrics ({len(lyrics)} lines, confidence={confidence:.0f})"
                    )
                    return LyricResult(
                        status=CacheStatus.SUCCESS_SYNCED,
                        lyrics=lyrics,
                        source=self.source_name,
                        confidence=confidence,
                    )
                elif isinstance(unsynced, str) and unsynced.strip():
                    lyrics = LRCData(unsynced)
                    logger.info(
                        f"LRCLIB-search: got unsynced lyrics ({len(lyrics)} lines, confidence={confidence:.0f})"
                    )
                    # Unsynced hits carry a TTL so a synced source can later win.
                    return LyricResult(
                        status=CacheStatus.SUCCESS_UNSYNCED,
                        lyrics=lyrics,
                        source=self.source_name,
                        ttl=TTL_UNSYNCED,
                        confidence=confidence,
                    )
                else:
                    logger.debug("LRCLIB-search: best candidate has empty lyrics")
                    return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
        except httpx.HTTPError as e:
            logger.error(f"LRCLIB-search: HTTP error: {e}")
            return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
        except Exception as e:
            logger.error(f"LRCLIB-search: unexpected error: {e}")
            return None
+295
View File
@@ -0,0 +1,295 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-04-04 15:28:34
Description: Musixmatch fetchers (desktop API, anonymous or usertoken auth).
Uses the Musixmatch desktop API (apic-desktop.musixmatch.com).
Token and all HTTP calls are managed by MusixmatchAuthenticator.
Two fetchers:
musixmatch-spotify — direct lookup by Spotify track ID (exact, no search)
musixmatch — metadata search + best-candidate fallback
"""
import json
from typing import Optional
from loguru import logger
from .base import BaseFetcher
from .selection import SearchCandidate, select_best
from ..authenticators.musixmatch import MusixmatchAuthenticator
from ..lrc import LRCData
from ..models import CacheStatus, LyricResult, TrackMeta
from ..config import TTL_NETWORK_ERROR, TTL_NOT_FOUND
_MUSIXMATCH_MACRO_URL = "https://apic-desktop.musixmatch.com/ws/1.1/macro.subtitles.get"
_MUSIXMATCH_SEARCH_URL = "https://apic-desktop.musixmatch.com/ws/1.1/track.search"
# Macro-specific params (format/app_id injected by authenticator)
_MXM_MACRO_PARAMS = {
"namespace": "lyrics_richsynched",
"subtitle_format": "mxm",
"optional_calls": "track.richsync",
}
def _format_ts(s: float) -> str:
mm = int(s) // 60
ss = int(s) % 60
cs = min(round((s % 1) * 100), 99)
return f"[{mm:02d}:{ss:02d}.{cs:02d}]"
def _parse_richsync(body: str) -> Optional[str]:
"""Parse richsync JSON body → LRC text. Each entry: {"ts": float, "x": str}."""
try:
data = json.loads(body)
if not isinstance(data, list):
return None
lines = []
for entry in data:
if not isinstance(entry, dict):
continue
ts = entry.get("ts")
x = entry.get("x")
if not isinstance(ts, (int, float)) or not isinstance(x, str):
continue
lines.append(f"{_format_ts(float(ts))}{x}")
return "\n".join(lines) if lines else None
except Exception:
return None
def _parse_subtitle(body: str) -> Optional[str]:
"""Parse subtitle JSON body → LRC text. Each entry: {"text": str, "time": {"total": float}}."""
try:
data = json.loads(body)
if not isinstance(data, list):
return None
lines = []
for entry in data:
if not isinstance(entry, dict):
continue
text = entry.get("text")
time_obj = entry.get("time")
if not isinstance(text, str) or not isinstance(time_obj, dict):
continue
total = time_obj.get("total")
if not isinstance(total, (int, float)):
continue
lines.append(f"{_format_ts(float(total))}{text}")
return "\n".join(lines) if lines else None
except Exception:
return None
async def _fetch_macro(
    auth: MusixmatchAuthenticator,
    params: dict,
) -> Optional[LRCData]:
    """Call macro.subtitles.get via auth.get_json.

    Returns LRCData (richsync preferred over subtitle), or None when no usable
    lyrics are found. Raises on HTTP/network errors.

    Args:
        auth: Authenticator that performs the HTTP call (and injects
            format/app_id per the module header note).
        params: Track selector, e.g. {"track_spotify_id": ...} or
            {"commontrack_id": ...}; merged over _MXM_MACRO_PARAMS.
    """
    logger.debug(f"Musixmatch: macro call with {list(params.keys())}")
    data = await auth.get_json(_MUSIXMATCH_MACRO_URL, {**_MXM_MACRO_PARAMS, **params})
    if data is None:
        return None
    # Musixmatch returns body=[] (not {}) when the track is not found
    body = data.get("message", {}).get("body", {})
    if not isinstance(body, dict):
        return None
    macro_calls = body.get("macro_calls", {})
    if not isinstance(macro_calls, dict):
        return None
    # Prefer richsync (word-level timing)
    richsync_msg = macro_calls.get("track.richsync.get", {}).get("message", {})
    if (
        isinstance(richsync_msg, dict)
        and richsync_msg.get("header", {}).get("status_code") == 200
    ):
        richsync_body = (
            richsync_msg.get("body", {}).get("richsync", {}).get("richsync_body")
        )
        if isinstance(richsync_body, str):
            lrc_text = _parse_richsync(richsync_body)
            if lrc_text:
                lrc = LRCData(lrc_text)
                # Truthiness check guards against an LRCData that parsed to
                # nothing usable; fall through to subtitle in that case.
                if lrc:
                    logger.debug("Musixmatch: got richsync lyrics")
                    return lrc
    # Fall back to subtitle (line-level timing)
    subtitle_msg = macro_calls.get("track.subtitles.get", {}).get("message", {})
    if (
        isinstance(subtitle_msg, dict)
        and subtitle_msg.get("header", {}).get("status_code") == 200
    ):
        subtitle_list = subtitle_msg.get("body", {}).get("subtitle_list", [])
        if isinstance(subtitle_list, list) and subtitle_list:
            # Only the first subtitle entry is considered.
            subtitle_body = subtitle_list[0].get("subtitle", {}).get("subtitle_body")
            if isinstance(subtitle_body, str):
                lrc_text = _parse_subtitle(subtitle_body)
                if lrc_text:
                    lrc = LRCData(lrc_text)
                    if lrc:
                        logger.debug("Musixmatch: got subtitle lyrics")
                        return lrc
    logger.debug("Musixmatch: no usable lyrics in macro response")
    return None
class MusixmatchSpotifyFetcher(BaseFetcher):
    """Direct lookup by Spotify track ID — no search, single request."""

    def __init__(self, auth: MusixmatchAuthenticator) -> None:
        self.auth = auth

    @property
    def source_name(self) -> str:
        return "musixmatch-spotify"

    def is_available(self, track: TrackMeta) -> bool:
        # Needs a Spotify track id and an authenticator not in cooldown.
        return bool(track.trackid) and not self.auth.is_cooldown()

    async def fetch(
        self, track: TrackMeta, bypass_cache: bool = False
    ) -> Optional[LyricResult]:
        """Resolve lyrics through macro.subtitles.get keyed by Spotify id."""
        logger.info(f"Musixmatch-Spotify: fetching lyrics for {track.display_name()}")
        selector = {"track_spotify_id": track.trackid}  # type: ignore[dict-item]
        try:
            lrc = await _fetch_macro(self.auth, selector)
        except AttributeError:
            # NOTE(review): presumably raised when the auth layer lacks a
            # usable credential — treated as a plain miss; confirm in auth impl.
            return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
        except Exception as e:
            logger.error(f"Musixmatch-Spotify: fetch failed: {e}")
            return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
        if lrc is None:
            logger.debug(
                f"Musixmatch-Spotify: no lyrics found for {track.display_name()}"
            )
            return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
        logger.info(f"Musixmatch-Spotify: got SUCCESS_SYNCED lyrics ({len(lrc)} lines)")
        return LyricResult(
            status=CacheStatus.SUCCESS_SYNCED,
            lyrics=lrc,
            source=self.source_name,
        )
class MusixmatchFetcher(BaseFetcher):
    """Metadata search + best-candidate lyric fetch."""

    def __init__(self, auth: MusixmatchAuthenticator) -> None:
        self.auth = auth

    @property
    def source_name(self) -> str:
        return "musixmatch"

    @property
    def requires_auth(self) -> str:
        # Name of the authenticator this fetcher depends on.
        return "musixmatch"

    def is_available(self, track: TrackMeta) -> bool:
        # Needs a title to search; backs off while the auth is in cooldown.
        return bool(track.title) and not self.auth.is_cooldown()

    async def _search(self, track: TrackMeta) -> tuple[Optional[int], float]:
        """Search for track metadata. Raises on network/HTTP errors.

        Returns:
            (commontrack_id, confidence) of the best candidate, or
            (None, 0.0) when nothing suitable was found.
        """
        params: dict = {
            "q_track": track.title or "",
            "page_size": "10",
            "f_has_lyrics": "1",
        }
        if track.artist:
            params["q_artist"] = track.artist
        if track.album:
            params["q_album"] = track.album
        logger.debug(f"Musixmatch: searching for '{track.display_name()}'")
        data = await self.auth.get_json(_MUSIXMATCH_SEARCH_URL, params)
        if data is None:
            return None, 0.0
        track_list = data.get("message", {}).get("body", {}).get("track_list", [])
        if not isinstance(track_list, list) or not track_list:
            logger.debug("Musixmatch: search returned 0 results")
            return None, 0.0
        logger.debug(f"Musixmatch: search returned {len(track_list)} candidates")
        # Map API entries to SearchCandidate; the walrus in the filter binds
        # t to the nested "track" dict, and instrumentals are skipped.
        candidates = [
            SearchCandidate(
                item=int(t["commontrack_id"]),
                duration_ms=(
                    float(t["track_length"]) * 1000 if t.get("track_length") else None
                ),
                is_synced=bool(t.get("has_subtitles") or t.get("has_richsync")),
                title=t.get("track_name"),
                artist=t.get("artist_name"),
                album=t.get("album_name"),
            )
            for item in track_list
            if isinstance(item, dict)
            and isinstance(t := item.get("track", {}), dict)
            and isinstance(t.get("commontrack_id"), int)
            and not t.get("instrumental")
        ]
        best_id, confidence = select_best(
            candidates,
            track.length,
            title=track.title,
            artist=track.artist,
            album=track.album,
        )
        if best_id is not None:
            logger.debug(f"Musixmatch: best candidate id={best_id} ({confidence:.0f})")
        else:
            logger.debug("Musixmatch: no suitable candidate found")
        return best_id, confidence

    async def fetch(
        self, track: TrackMeta, bypass_cache: bool = False
    ) -> Optional[LyricResult]:
        """Search, then fetch lyrics for the winning commontrack_id."""
        logger.info(f"Musixmatch: fetching lyrics for {track.display_name()}")
        try:
            commontrack_id, confidence = await self._search(track)
            if commontrack_id is None:
                logger.debug(f"Musixmatch: no match found for {track.display_name()}")
                return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
            lrc = await _fetch_macro(
                self.auth,
                {"commontrack_id": str(commontrack_id)},
            )
        except AttributeError:
            # NOTE(review): presumably raised when the auth layer lacks a
            # usable credential — treated as a plain miss; confirm in auth impl.
            return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
        except Exception as e:
            logger.error(f"Musixmatch: fetch failed: {e}")
            return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
        if lrc is None:
            logger.debug(f"Musixmatch: no lyrics for commontrack_id={commontrack_id}")
            return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
        logger.info(
            f"Musixmatch: got SUCCESS_SYNCED lyrics "
            f"for commontrack_id={commontrack_id} ({len(lrc)} lines)"
        )
        return LyricResult(
            status=CacheStatus.SUCCESS_SYNCED,
            lyrics=lrc,
            source=self.source_name,
            confidence=confidence,
        )
+200
View File
@@ -0,0 +1,200 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-25 11:04:51
Description: Netease Cloud Music fetcher.
Uses the public cloudsearch API for searching and the song/lyric API for
retrieving lyrics. No authentication required.
"""
import asyncio
from typing import Optional
import httpx
from loguru import logger
from .base import BaseFetcher
from .selection import SearchCandidate, select_ranked
from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import LRCData
from ..config import (
HTTP_TIMEOUT,
TTL_NOT_FOUND,
TTL_NETWORK_ERROR,
MULTI_CANDIDATE_DELAY_S,
UA_BROWSER,
)
_NETEASE_SEARCH_URL = "https://music.163.com/api/cloudsearch/pc"
_NETEASE_LYRIC_URL = "https://interface3.music.163.com/api/song/lyric"
_NETEASE_BASE_HEADERS = {
"User-Agent": UA_BROWSER,
"Referer": "https://music.163.com/",
"Origin": "https://music.163.com",
}
class NeteaseFetcher(BaseFetcher):
    """Netease Cloud Music fetcher: cloudsearch → ranked candidates → lyric API."""

    @property
    def source_name(self) -> str:
        return "netease"

    def is_available(self, track: TrackMeta) -> bool:
        return bool(track.title)

    async def _search(
        self, track: TrackMeta, limit: int = 10
    ) -> list[tuple[int, float]]:
        """Search Netease and return (song_id, confidence) pairs, best first.

        Returns an empty list on any error or when nothing plausible matches;
        callers treat that as NOT_FOUND.
        """
        query = f"{track.artist or ''} {track.title or ''}".strip()
        if not query:
            return []
        logger.debug(f"Netease: searching for '{query}' (limit={limit})")
        try:
            async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
                # type=1 selects song search on the cloudsearch endpoint.
                resp = await client.post(
                    _NETEASE_SEARCH_URL,
                    headers=_NETEASE_BASE_HEADERS,
                    data={"s": query, "type": "1", "limit": str(limit), "offset": "0"},
                )
                resp.raise_for_status()
                result = resp.json()
                if not isinstance(result, dict):
                    logger.error(
                        f"Netease: search returned non-dict: {type(result).__name__}"
                    )
                    return []
                result_body = result.get("result")
                if not isinstance(result_body, dict):
                    logger.debug("Netease: search 'result' field missing or invalid")
                    return []
                songs = result_body.get("songs")
                if not isinstance(songs, list) or len(songs) == 0:
                    logger.debug("Netease: search returned 0 results")
                    return []
                logger.debug(f"Netease: search returned {len(songs)} candidates")
                # Map API entries to SearchCandidate ('dt' is duration in ms,
                # 'ar' the artist list, 'al' the album); the walrus in the
                # filter binds song_id to the integer id.
                candidates = [
                    SearchCandidate(
                        item=song_id,
                        duration_ms=float(song["dt"])
                        if isinstance(song.get("dt"), int)
                        else None,
                        title=song.get("name"),
                        artist=", ".join(a.get("name", "") for a in song.get("ar", []))
                        or None,
                        album=(song.get("al") or {}).get("name"),
                    )
                    for song in songs
                    if isinstance(song, dict) and isinstance(song_id := song.get("id"), int)
                ]
                ranked = select_ranked(
                    candidates,
                    track.length,
                    title=track.title,
                    artist=track.artist,
                    album=track.album,
                )
                if ranked:
                    logger.debug(
                        "Netease: top candidates: "
                        + ", ".join(f"id={i} ({c:.0f})" for i, c in ranked)
                    )
                else:
                    logger.debug("Netease: no suitable candidate found")
                return ranked
        except Exception as e:
            logger.error(f"Netease: search failed: {e}")
            return []

    async def _get_lyric(
        self, song_id: int, confidence: float = 0.0
    ) -> Optional[LyricResult]:
        """Fetch lyrics for one song id and classify the response.

        Returns a SUCCESS_*/NOT_FOUND/NETWORK_ERROR LyricResult; the given
        confidence is attached to successful results.
        """
        logger.debug(f"Netease: fetching lyrics for song_id={song_id}")
        try:
            async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
                resp = await client.post(
                    _NETEASE_LYRIC_URL,
                    headers=_NETEASE_BASE_HEADERS,
                    # Version flags for the various lyric variants; exact
                    # semantics are undocumented here — only "lrc" is consumed.
                    data={
                        "id": str(song_id),
                        "cp": "false",
                        "tv": "0",
                        "lv": "0",
                        "rv": "0",
                        "kv": "0",
                        "yv": "0",
                        "ytv": "0",
                        "yrv": "0",
                    },
                )
                resp.raise_for_status()
                data = resp.json()
                if not isinstance(data, dict):
                    logger.error(
                        f"Netease: lyric response is not dict: {type(data).__name__}"
                    )
                    return LyricResult(
                        status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
                    )
                lrc_obj = data.get("lrc")
                if not isinstance(lrc_obj, dict):
                    logger.debug(
                        f"Netease: no 'lrc' object in response for song_id={song_id}"
                    )
                    return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
                lrc: str = lrc_obj.get("lyric", "")
                if not isinstance(lrc, str) or not lrc.strip():
                    logger.debug(f"Netease: empty lyrics for song_id={song_id}")
                    return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
                lrcdata = LRCData(lrc)
                status = lrcdata.detect_sync_status()
                logger.info(
                    f"Netease: got {status.value} lyrics for song_id={song_id} "
                    f"({len(lrcdata)} lines)"
                )
                return LyricResult(
                    status=status,
                    lyrics=lrcdata,
                    source=self.source_name,
                    confidence=confidence,
                )
        except Exception as e:
            logger.error(f"Netease: lyric fetch failed for song_id={song_id}: {e}")
            return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)

    async def fetch(
        self, track: TrackMeta, bypass_cache: bool = False
    ) -> Optional[LyricResult]:
        """Search, then walk ranked candidates until one yields lyrics."""
        query = f"{track.artist or ''} {track.title or ''}".strip()
        if not query:
            logger.debug("Netease: skipped — insufficient metadata")
            return None
        logger.info(f"Netease: fetching lyrics for {track.display_name()}")
        candidates = await self._search(track)
        if not candidates:
            logger.debug(f"Netease: no match found for {track.display_name()}")
            return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
        for i, (song_id, confidence) in enumerate(candidates):
            if i > 0:
                # Pause between candidate fetches to avoid hammering the API.
                await asyncio.sleep(MULTI_CANDIDATE_DELAY_S)
            result = await self._get_lyric(song_id, confidence=confidence)
            if result is None or result.status == CacheStatus.NETWORK_ERROR:
                # A network failure aborts the walk; later candidates would
                # most likely fail the same way.
                return result
            if result.status != CacheStatus.NOT_FOUND:
                return result
        return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
+175
View File
@@ -0,0 +1,175 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-31 01:54:02
Description: QQ Music fetcher via self-hosted API proxy.
Requires a running qq-music-api instance.
The base URL is read from the QQ_MUSIC_API_URL environment variable.
Search → pick best match → fetch LRC lyrics.
"""
import asyncio
from typing import Optional
import httpx
from loguru import logger
from .base import BaseFetcher
from .selection import SearchCandidate, select_ranked
from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import LRCData
from ..config import (
HTTP_TIMEOUT,
TTL_NOT_FOUND,
TTL_NETWORK_ERROR,
MULTI_CANDIDATE_DELAY_S,
)
_QQ_MUSIC_API_SEARCH_ENDPOINT = "/api/search"
_QQ_MUSIC_API_LYRIC_ENDPOINT = "/api/lyric"
from ..authenticators import QQMusicAuthenticator
class QQMusicFetcher(BaseFetcher):
    """Search-then-fetch lyric provider backed by a self-hosted qq-music-api proxy.

    The proxy base URL comes from the authenticator; per the BaseAuthenticator
    contract, authenticate() may return None, so every request guards against
    a missing base URL instead of building a literal "None/..." URL.
    """

    def __init__(self, auth: QQMusicAuthenticator) -> None:
        self.auth = auth

    @property
    def source_name(self) -> str:
        return "qqmusic"

    def is_available(self, track: TrackMeta) -> bool:
        return bool(track.title) and self.auth.is_configured()

    async def _base_url(self) -> Optional[str]:
        """Resolve the proxy base URL, or None when the authenticator has none.

        Fix: previously the result of authenticate() was interpolated into the
        request URL unchecked; a None return produced requests to "None/api/...".
        """
        base = await self.auth.authenticate()
        if not base:
            logger.error("QQMusic: no API base URL available from authenticator")
            return None
        return base

    async def _search(
        self, track: TrackMeta, limit: int = 10
    ) -> list[tuple[str, float]]:
        """Search the proxy and return (mid, confidence) pairs, best first.

        Returns an empty list on any error; callers treat that as NOT_FOUND.
        """
        query = f"{track.artist or ''} {track.title or ''}".strip()
        if not query:
            return []
        logger.debug(f"QQMusic: searching for '{query}' (limit={limit})")
        try:
            base = await self._base_url()
            if base is None:
                return []
            async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
                resp = await client.get(
                    f"{base}{_QQ_MUSIC_API_SEARCH_ENDPOINT}",
                    params={"keyword": query, "type": "song", "num": limit},
                )
                resp.raise_for_status()
                data = resp.json()
                if data.get("code") != 0:
                    logger.error(f"QQMusic: search API error: {data}")
                    return []
                songs = data.get("data", {}).get("list", [])
                if not songs:
                    logger.debug("QQMusic: search returned 0 results")
                    return []
                logger.debug(f"QQMusic: search returned {len(songs)} candidates")
                # Map API entries to SearchCandidate ('interval' is the track
                # duration in seconds); the walrus binds mid in the filter.
                candidates = [
                    SearchCandidate(
                        item=mid,
                        duration_ms=float(song["interval"]) * 1000
                        if isinstance(song.get("interval"), int)
                        else None,
                        title=song.get("name"),
                        artist=", ".join(s.get("name", "") for s in song.get("singer", []))
                        or None,
                        album=(song.get("album") or {}).get("name"),
                    )
                    for song in songs
                    if isinstance(song, dict) and isinstance(mid := song.get("mid"), str)
                ]
                ranked = select_ranked(
                    candidates,
                    track.length,
                    title=track.title,
                    artist=track.artist,
                    album=track.album,
                )
                if ranked:
                    logger.debug(
                        "QQMusic: top candidates: "
                        + ", ".join(f"mid={m} ({c:.0f})" for m, c in ranked)
                    )
                else:
                    logger.debug("QQMusic: no suitable candidate found")
                return ranked
        except Exception as e:
            logger.error(f"QQMusic: search failed: {e}")
            return []

    async def _get_lyric(
        self, mid: str, confidence: float = 0.0
    ) -> Optional[LyricResult]:
        """Fetch LRC lyrics for one mid and classify the response."""
        logger.debug(f"QQMusic: fetching lyrics for mid={mid}")
        try:
            base = await self._base_url()
            if base is None:
                # Without a base URL no request can be made — report as a
                # network-level failure so the result is retried later.
                return LyricResult(
                    status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
                )
            async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
                resp = await client.get(
                    f"{base}{_QQ_MUSIC_API_LYRIC_ENDPOINT}",
                    params={"mid": mid},
                )
                resp.raise_for_status()
                data = resp.json()
                if data.get("code") != 0:
                    logger.error(f"QQMusic: lyric API error: {data}")
                    return LyricResult(
                        status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
                    )
                lrc = data.get("data", {}).get("lyric", "")
                if not isinstance(lrc, str) or not lrc.strip():
                    logger.debug(f"QQMusic: empty lyrics for mid={mid}")
                    return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
                lrcdata = LRCData(lrc)
                status = lrcdata.detect_sync_status()
                logger.info(
                    f"QQMusic: got {status.value} lyrics for mid={mid} ({len(lrcdata)} lines)"
                )
                return LyricResult(
                    status=status,
                    lyrics=lrcdata,
                    source=self.source_name,
                    confidence=confidence,
                )
        except Exception as e:
            logger.error(f"QQMusic: lyric fetch failed for mid={mid}: {e}")
            return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)

    async def fetch(
        self, track: TrackMeta, bypass_cache: bool = False
    ) -> Optional[LyricResult]:
        """Search, then try ranked candidates until one yields lyrics."""
        if not self.auth.is_configured():
            logger.debug("QQMusic: skipped — Auth not configured")
            return None
        query = f"{track.artist or ''} {track.title or ''}".strip()
        if not query:
            logger.debug("QQMusic: skipped — insufficient metadata")
            return None
        logger.info(f"QQMusic: fetching lyrics for {track.display_name()}")
        candidates = await self._search(track)
        if not candidates:
            logger.debug(f"QQMusic: no match found for {track.display_name()}")
            return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
        for i, (mid, confidence) in enumerate(candidates):
            if i > 0:
                # Pause between candidate fetches to stay polite to the proxy.
                await asyncio.sleep(MULTI_CANDIDATE_DELAY_S)
            result = await self._get_lyric(mid, confidence=confidence)
            if result is None or result.status == CacheStatus.NETWORK_ERROR:
                return result
            if result.status != CacheStatus.NOT_FOUND:
                return result
        return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
+211
View File
@@ -0,0 +1,211 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-04-04 11:32:23
Description: Shared candidate-selection logic for search-based fetchers.
Each fetcher maps its API-specific results to SearchCandidate, then calls
select_best() which scores candidates by metadata similarity, duration
proximity, and sync status.
"""
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar
from ..config import (
DURATION_TOLERANCE_MS,
MULTI_CANDIDATE_LIMIT,
SCORE_W_TITLE as _W_TITLE,
SCORE_W_ARTIST as _W_ARTIST,
SCORE_W_ALBUM as _W_ALBUM,
SCORE_W_DURATION as _W_DURATION,
SCORE_W_SYNCED as _W_SYNCED,
MIN_CONFIDENCE,
)
from ..normalize import normalize_for_match, normalize_artist
T = TypeVar("T")
@dataclass
class SearchCandidate(Generic[T]):
    """A normalized search result for best-match selection.

    The generic payload ``item`` is whatever the calling fetcher wants back
    from select_best()/select_ranked() — a raw API dict, a track id, etc.
    All other fields exist purely for scoring against the reference track.

    Attributes:
        item: The original API-specific object (dict, ID, etc.)
        duration_ms: Track duration in milliseconds, or None if unknown.
        is_synced: Whether this candidate is known to have synced lyrics.
        title: Candidate track title for similarity scoring.
        artist: Candidate artist name for similarity scoring.
        album: Candidate album name for similarity scoring.
    """

    item: T
    duration_ms: Optional[float] = None
    is_synced: bool = False
    title: Optional[str] = None
    artist: Optional[str] = None
    album: Optional[str] = None
def _text_similarity(a: str, b: str) -> float:
"""Compare two normalized strings. Returns 0.0-1.0."""
if a == b:
return 1.0
if not a or not b:
return 0.0
# Containment: one is a substring of the other (e.g. "My Love" vs "My Love (Album Version)")
if a in b or b in a:
return min(len(a), len(b)) / max(len(a), len(b))
return 0.0
def _score_candidate(
    c: SearchCandidate[T],
    ref_title: Optional[str],
    ref_artist: Optional[str],
    ref_album: Optional[str],
    ref_length_ms: Optional[int],
) -> float:
    """Score a candidate from 0-100 based on metadata match quality.

    Two independent tiers:
      1. Metadata score — accumulated from the fields available on *both*
         sides, then rescaled to the 0-90 range so missing fields cannot
         inflate the score. A field absent on both sides is excluded
         (neutral); a field present on only one side adds its weight to the
         denominator but nothing to the numerator (asymmetric-absence
         penalty). Weights before rescaling: title 40, artist 30, album 10,
         duration 10 (duration only counts when the reference has one; a
         hard mismatch is pre-filtered before scoring).
      2. Synced bonus — a flat 10 pts whenever the candidate is synced.
    """
    earned = 0.0
    considered = 0.0

    # --- Title ---------------------------------------------------------
    if ref_title is not None or c.title is not None:
        considered += _W_TITLE
        if ref_title is not None and c.title is not None:
            earned += _W_TITLE * _text_similarity(
                normalize_for_match(ref_title), normalize_for_match(c.title)
            )

    # --- Artist --------------------------------------------------------
    if ref_artist is not None or c.artist is not None:
        considered += _W_ARTIST
        if ref_artist is not None and c.artist is not None:
            # Artist-specific normalization first; fall back to the generic
            # similarity only when the normalized forms differ.
            if normalize_artist(ref_artist) == normalize_artist(c.artist):
                earned += _W_ARTIST
            else:
                earned += _W_ARTIST * _text_similarity(
                    normalize_for_match(ref_artist), normalize_for_match(c.artist)
                )

    # --- Album ---------------------------------------------------------
    if ref_album is not None or c.album is not None:
        considered += _W_ALBUM
        if ref_album is not None and c.album is not None:
            earned += _W_ALBUM * _text_similarity(
                normalize_for_match(ref_album), normalize_for_match(c.album)
            )

    # --- Duration ------------------------------------------------------
    # Counted only when the reference track has a duration. A candidate
    # without one contributes nothing to the numerator but its weight still
    # sits in the denominator (penalty for missing verifiable info).
    if ref_length_ms is not None:
        considered += _W_DURATION
        if c.duration_ms is not None:
            delta = abs(c.duration_ms - ref_length_ms)
            if delta <= DURATION_TOLERANCE_MS:
                earned += _W_DURATION * (1.0 - delta / DURATION_TOLERANCE_MS)

    # Rescale the metadata tier onto the full 0-90 range.
    max_metadata = _W_TITLE + _W_ARTIST + _W_ALBUM + _W_DURATION  # 90
    if considered > 0:
        metadata_score = (earned / considered) * max_metadata
    else:
        # Nothing comparable at all — only the synced bonus can contribute.
        metadata_score = 0.0
    synced_bonus = _W_SYNCED if c.is_synced else 0.0
    return metadata_score + synced_bonus
def select_ranked(
    candidates: list[SearchCandidate[T]],
    track_length_ms: Optional[int] = None,
    *,
    title: Optional[str] = None,
    artist: Optional[str] = None,
    album: Optional[str] = None,
    min_confidence: float = MIN_CONFIDENCE,
    max_results: int = MULTI_CANDIDATE_LIMIT,
) -> list[tuple[T, float]]:
    """Score candidates and return top max_results above min_confidence, sorted by score descending."""
    ranked: list[tuple[T, float]] = []
    for cand in candidates:
        # Hard duration filter: when both sides know the duration and they
        # disagree beyond tolerance, the candidate is discarded before scoring.
        if track_length_ms is not None and cand.duration_ms is not None:
            if abs(cand.duration_ms - track_length_ms) > DURATION_TOLERANCE_MS:
                continue
        score = _score_candidate(cand, title, artist, album, track_length_ms)
        if score >= min_confidence:
            ranked.append((cand.item, score))
    ranked.sort(key=lambda pair: pair[1], reverse=True)
    return ranked[:max_results]
def select_best(
    candidates: list[SearchCandidate[T]],
    track_length_ms: Optional[int] = None,
    *,
    title: Optional[str] = None,
    artist: Optional[str] = None,
    album: Optional[str] = None,
    min_confidence: float = MIN_CONFIDENCE,
) -> tuple[Optional[T], float]:
    """Pick the best candidate by confidence scoring.

    Candidates whose duration is known on both sides but differs from the
    reference by more than DURATION_TOLERANCE_MS are excluded before scoring
    (hard filter).

    Returns (item, score). Item is None if no candidate scores above
    min_confidence; the score is then the best score actually seen, or 0.0
    when no candidate survived filtering (fix: previously the internal
    sentinel -1.0 leaked out in that case, inconsistent with the 0.0
    returned for an empty candidate list).
    """
    best_item: Optional[T] = None
    best_score = 0.0
    scored_any = False  # distinguishes "no survivors" from a genuine 0.0 score
    for c in candidates:
        # Hard duration filter: both sides have duration but they don't match → skip.
        if (
            track_length_ms is not None
            and c.duration_ms is not None
            and abs(c.duration_ms - track_length_ms) > DURATION_TOLERANCE_MS
        ):
            continue
        s = _score_candidate(c, title, artist, album, track_length_ms)
        if not scored_any or s > best_score:
            best_score = s
            best_item = c.item
            scored_any = True
    if best_item is None or best_score < min_confidence:
        return None, best_score
    return best_item, best_score
+129
View File
@@ -0,0 +1,129 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-25 10:43:21
Description: Spotify fetcher — obtains synced lyrics via Spotify's internal color-lyrics API.
"""
import httpx
from typing import Optional
from loguru import logger
from .base import BaseFetcher
from ..authenticators.spotify import SpotifyAuthenticator, SPOTIFY_BASE_HEADERS
from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import LRCData
from ..config import HTTP_TIMEOUT, TTL_NOT_FOUND, TTL_NETWORK_ERROR
# Base URL of Spotify's internal color-lyrics endpoint; the trackid is appended.
_SPOTIFY_LYRICS_URL = "https://spclient.wg.spotify.com/color-lyrics/v2/track/"
class SpotifyFetcher(BaseFetcher):
    """Fetches lyrics from Spotify's internal color-lyrics API.

    Only usable for tracks that carry a Spotify trackid and when the
    authenticator can supply a bearer token.
    """

    def __init__(self, auth: SpotifyAuthenticator) -> None:
        self.auth = auth  # token provider; queried on every fetch

    @property
    def source_name(self) -> str:
        return "spotify"

    def is_available(self, track: TrackMeta) -> bool:
        """True when the track has a Spotify trackid and auth is configured."""
        return bool(track.trackid) and self.auth.is_configured()

    @staticmethod
    def _format_lrc_line(start_ms: int, words: str) -> str:
        """Render one lyric line as a standard [mm:ss.cc] LRC line.

        Fix: round the timestamp to centiseconds *before* splitting into
        minutes/seconds so rounding can carry over. The previous per-field
        rounding produced malformed tags near second boundaries, e.g.
        59_996 ms rendered as "[00:59.100]" instead of "[01:00.00]".
        """
        total_cs = round(start_ms / 10)
        minutes, rest_cs = divmod(total_cs, 6000)
        seconds, centis = divmod(rest_cs, 100)
        return f"[{minutes:02d}:{seconds:02d}.{centis:02d}]{words}"

    @staticmethod
    def _is_truly_synced(lines: list[dict]) -> bool:
        """True if at least one line carries a positive start timestamp.

        Guards against responses labelled LINE_SYNCED whose timestamps are
        all zero — those are effectively unsynced.
        """
        for line in lines:
            try:
                ms = int(line.get("startTimeMs", "0"))
                if ms > 0:
                    return True
            except (ValueError, TypeError):
                continue
        return False

    async def fetch(
        self, track: TrackMeta, bypass_cache: bool = False
    ) -> Optional[LyricResult]:
        """Fetch lyrics for *track* from Spotify's color-lyrics endpoint.

        Returns None when the track has no trackid; otherwise a LyricResult
        whose status reflects the outcome. Negative results carry a ttl hint
        so callers can cache them briefly.

        NOTE(review): bypass_cache is unused here — caching appears to be
        handled by the caller; confirm before removing the parameter.
        """
        if not track.trackid:
            logger.debug("Spotify: skipped — no trackid in metadata")
            return None
        logger.info(f"Spotify: fetching lyrics for trackid={track.trackid}")
        token = await self.auth.authenticate()
        if not token:
            logger.error("Spotify: cannot fetch lyrics without a token")
            return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
        url = f"{_SPOTIFY_LYRICS_URL}{track.trackid}?format=json&vocalRemoval=false&market=from_token"
        headers = {
            "Accept": "application/json",
            "Authorization": f"Bearer {token}",
            **SPOTIFY_BASE_HEADERS,
        }
        try:
            async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
                res = await client.get(url, headers=headers)
                # 404 is a definitive "no lyrics for this track".
                if res.status_code == 404:
                    logger.debug(f"Spotify: 404 for trackid={track.trackid}")
                    return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
                if res.status_code != 200:
                    logger.error(f"Spotify: lyrics API returned {res.status_code}")
                    return LyricResult(
                        status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
                    )
                data = res.json()
                if not isinstance(data, dict) or "lyrics" not in data:
                    logger.error("Spotify: unexpected lyrics response structure")
                    return LyricResult(
                        status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR
                    )
                lyrics_data = data["lyrics"]
                sync_type = lyrics_data.get("syncType", "")
                lines = lyrics_data.get("lines", [])
                if not isinstance(lines, list) or len(lines) == 0:
                    logger.debug("Spotify: response contained no lyric lines")
                    return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
                # Trust syncType only when the timestamps are actually non-zero.
                is_synced = sync_type == "LINE_SYNCED" and self._is_truly_synced(lines)
                lrc_lines: list[str] = []
                for line in lines:
                    words = line.get("words", "")
                    if not isinstance(words, str):
                        continue
                    try:
                        ms = int(line.get("startTimeMs", "0"))
                    except (ValueError, TypeError):
                        ms = 0
                    if is_synced:
                        lrc_lines.append(self._format_lrc_line(ms, words))
                    else:
                        # Unsynced lines get a zero tag so downstream code can
                        # treat everything uniformly as LRC.
                        lrc_lines.append(f"[00:00.00]{words}")
                content = LRCData("\n".join(lrc_lines))
                status = (
                    CacheStatus.SUCCESS_SYNCED
                    if is_synced
                    else CacheStatus.SUCCESS_UNSYNCED
                )
                logger.info(f"Spotify: got {status.value} lyrics ({len(lrc_lines)} lines)")
                return LyricResult(status=status, lyrics=content, source=self.source_name)
        except Exception as e:
            logger.error(f"Spotify: lyrics fetch failed: {e}")
            return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
+315
View File
@@ -0,0 +1,315 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-25 21:54:01
Description: Shared LRC time-tag utilities (definitely overengineered).
"""
import re
from pathlib import Path
from typing import Optional
from urllib.parse import unquote
from .models import CacheStatus
# Parses any time tag input format:
# [mm:ss], [mm:ss.c], [mm:ss.cc], [mm:ss.ccc], [mm:ss:cc], …
# Groups: (minutes, seconds, optional fraction of 1-3 digits)
_RAW_TAG_RE = re.compile(r"\[(\d{2,}):(\d{2})(?:[.:](\d{1,3}))?\]")
# Standard format after normalization: [mm:ss.cc]
# _STD_TAG_RE = re.compile(r"\[\d{2,}:\d{2}\.\d{2}\]")
# Standard format with capture groups (minutes, seconds, centiseconds)
_STD_TAG_CAPTURE_RE = re.compile(r"\[(\d{2,}):(\d{2})\.(\d{2})\]")
# [offset:+/-xxx] tag — value in milliseconds; must occupy a whole line
_OFFSET_RE = re.compile(r"^\[offset:\s*([+-]?\d+)\]\s*$", re.MULTILINE | re.IGNORECASE)
# Any number of ID/Time tags at the start of a line
_LINE_START_TAGS_RE = re.compile(r"^(?:\[[^\]]*\])+", re.MULTILINE)
# Any number of standard time tags at the start of a line
_LINE_START_STD_TAGS_RE = re.compile(r"^(?:\[\d{2,}:\d{2}\.\d{2}\])+", re.MULTILINE)
# Word-level sync tags (stripped by _sanitize_lyric_text, never interpreted):
# <mm:ss>, <mm:ss.c>, <mm:ss.cc>, <mm:ss:cc>, <xx,yy,zz>
_WORD_SYNC_TAG_RE = re.compile(r"<\d{2,}:\d{2}(?:[.:]\d{1,3})?>|<\d+,\d+,\d+>")
# QRC is totally a completely different matter. Since they are still providing standard LRC APIs,
# it might be a good idea to leave this mass to the future :)
def _remove_pattern(text: str, pattern: re.Pattern) -> str:
"""Remove all occurrences of pattern from text, then strip leading/trailing whitespace."""
return pattern.sub("", text).strip()
def _raw_tag_to_ms(mm: str, ss: str, frac: Optional[str]) -> int:
"""Convert parsed time tag components to total milliseconds."""
if frac is None:
ms = 0
else:
n = len(frac)
if n == 1:
ms = int(frac) * 100
elif n == 2:
ms = int(frac) * 10
else:
ms = int(frac)
return (int(mm) * 60 + int(ss)) * 1000 + ms
def _raw_tag_to_cs(mm: str, ss: str, frac: Optional[str]) -> str:
"""Convert parsed time tag components to standard [mm:ss.cc] string."""
if frac is None:
ms = 0
else:
# cc in [mm:ss:cc] is also treated as centiseconds, per LRC spec
# ^
# why does this format even exist, idk
n = len(frac)
if n == 1:
ms = int(frac) * 100
elif n == 2:
ms = int(frac) * 10
else:
ms = int(frac)
cs = min(round(ms / 10), 99)
return f"[{mm}:{ss}.{cs:02d}]"
def _sanitize_lyric_text(text: str) -> str:
    """Drop word-level sync tags (<mm:ss.xx> / <a,b,c>) from a lyric line.

    Line-level time tags are expected to have been stripped already; this
    only removes the inline word-sync markers and trims whitespace.
    """
    return _WORD_SYNC_TAG_RE.sub("", text).strip()
def _reformat(text: str) -> list[str]:
    """Parse each line and reformat to standard [mm:ss.cc]...content form.

    Handles any mix of time tag formats on input. Lines with no time tags
    are stripped of leading/trailing whitespace and passed through unchanged.
    Leading and trailing blank lines are dropped; interior ones are kept.
    """
    out: list[str] = []
    for line in text.splitlines():
        line = line.strip()
        pos = 0
        tags: list[str] = []
        # Consume every leading time tag (whitespace between tags tolerated),
        # converting each to the standard [mm:ss.cc] form.
        while True:
            while pos < len(line) and line[pos].isspace():
                pos += 1
            m = _RAW_TAG_RE.match(line, pos)
            # Non-time tags are passed through as-is, except for leading/trailing whitespace which is stripped.
            if not m:
                # No more tags on this line
                break
            tags.append(_raw_tag_to_cs(m.group(1), m.group(2), m.group(3)))
            pos = m.end()
        if tags:
            # This could break lyric lines of some kind of word-synced LRC format, e.g.
            # [00:01.00]Lyric [00:02.00]line
            # but such format were not planned to be supported in the first place, so…
            out.append(_sanitize_lyric_text("".join(tags) + line[pos:]))
        else:
            out.append(line)
        # Empty lines with no tags are also preserved
    # Remove empty lines at the start and end of the whole text, but preserve blank lines in the middle
    while out and not out[0].strip():
        out.pop(0)
    while out and not out[-1].strip():
        out.pop()
    return out
class LRCData:
    """Normalized LRC lyrics container.

    On construction the raw text is reformatted so that every time tag uses
    the standard [mm:ss.cc] form, and any [offset:±ms] tag is applied to the
    timestamps and removed.
    """

    # Normalized lyric lines; any time tags are in standard [mm:ss.cc] form.
    _lines: list[str]

    def __init__(self, text: str | None = None) -> None:
        """Build from raw LRC text; None or empty yields an empty container."""
        if not text:
            self._lines = []
            return
        self._lines = _reformat(text)
        self._apply_offset()

    def __str__(self) -> str:
        return "\n".join(self._lines)

    def __repr__(self) -> str:
        return f"LRCData(lines={self._lines!r})"

    def __bool__(self) -> bool:
        # Truthy iff at least one (possibly blank) line survived parsing.
        return len(self._lines) > 0

    def __len__(self) -> int:
        return len(self._lines)

    def _apply_offset(self) -> None:
        """Parse [offset:±ms] and shift all standard [mm:ss.cc] tags accordingly.

        Per LRC spec, positive offset = lyrics appear sooner (subtract from timestamps).
        Only the first offset tag found is honored; it is removed from the lines.
        """
        m: Optional[re.Match] = None
        for i, line in enumerate(self._lines):
            m = _OFFSET_RE.search(line)
            if m:
                # Remove the offset line itself before shifting timestamps.
                self._lines.pop(i)
                break
        if not m:
            return
        offset_ms = int(m.group(1))
        if offset_ms == 0:
            return

        def _shift(match: re.Match) -> str:
            # Clamp at zero so early lines cannot end up with negative times.
            total_ms = max(
                0,
                (int(match.group(1)) * 60 + int(match.group(2))) * 1000
                + int(match.group(3)) * 10
                - offset_ms,
            )
            new_mm = total_ms // 60000
            new_ss = (total_ms % 60000) // 1000
            new_cs = min(round((total_ms % 1000) / 10), 99)
            return f"[{new_mm:02d}:{new_ss:02d}.{new_cs:02d}]"

        self._lines = [_STD_TAG_CAPTURE_RE.sub(_shift, line) for line in self._lines]

    def is_synced(self) -> bool:
        """Check whether text contains non-zero LRC time tags.

        Assumes text has been normalized by normalize (standard [mm:ss.cc] format).
        """
        for line in self._lines:
            for m in _STD_TAG_CAPTURE_RE.finditer(line):
                # Any tag other than [00:00.00] counts as real sync info.
                if m.group(1) != "00" or m.group(2) != "00" or m.group(3) != "00":
                    return True
        return False

    def detect_sync_status(self) -> CacheStatus:
        """Determine whether lyrics contain meaningful LRC time tags.

        Assumes text has been normalized by normalize.
        """
        return (
            CacheStatus.SUCCESS_SYNCED
            if self.is_synced()
            else CacheStatus.SUCCESS_UNSYNCED
        )

    def normalize_unsynced(self) -> "LRCData":
        """Normalize unsynced lyrics so every line has a [00:00.00] tag.

        Assumes lyrics have been normalized by normalize.
        - Lines that already have time tags: replace with [00:00.00]
        - Lines without leading tags: prepend [00:00.00]
        - Blank lines in middle are converted to [00:00.00]

        Returns a new LRCData; self is left unmodified.
        """
        out: list[str] = []
        first = True
        for i, line in enumerate(self._lines):
            stripped = line.strip()
            if not stripped and not first:
                # Mid-text blank line → keep it as an empty tagged line.
                out.append("[00:00.00]")
                continue
            elif not stripped:
                # Skip leading blank lines
                continue
            first = False
            cleaned = _remove_pattern(line, _LINE_START_STD_TAGS_RE)
            out.append(f"[00:00.00]{cleaned}")
        ret = LRCData()
        ret._lines = out
        return ret

    def to_plain(
        self,
        deduplicate: bool = False,
    ) -> str:
        """Convert lyrics to plain text with all tags stripped.

        If deduplicate is True, only keep the first line of consecutive lines with the same lyric text (after stripping tags).
        Otherwise, lines with multiple time tags will be duplicated as many times as the number of tags.
        Assumes text has been normalized by normalize.
        """
        if not self.is_synced():
            # Unsynced: strip any leading tags line by line, keep original order.
            return "\n".join(
                _remove_pattern(line, _LINE_START_TAGS_RE) for line in self._lines
            ).strip("\n")
        # Synced: expand each line into (timestamp, lyric) pairs so that
        # lines carrying multiple tags can be re-sorted chronologically.
        tagged_lines = []
        for line in self._lines:
            pos = 0
            tag_ms = []
            while True:
                # Only match strictly repeated standard time tags at the start of the line
                # Lines without any time tags are ignored.
                # Lyric lines are considered already stripped of whitespaces, so no strips here.
                m = _STD_TAG_CAPTURE_RE.match(line, pos)
                if not m:
                    lyric = line[pos:]
                    for tag in tag_ms:
                        tagged_lines.append((tag, lyric))
                    break
                tag_ms.append(_raw_tag_to_ms(m.group(1), m.group(2), m.group(3)))
                pos = m.end()
        sorted_lines = [lyric for _, lyric in sorted(tagged_lines, key=lambda x: x[0])]
        if deduplicate:
            # Remove consecutive duplicates
            deduped_lines = []
            prev_line = None
            for line in sorted_lines:
                if line != prev_line:
                    deduped_lines.append(line)
                prev_line = line
            sorted_lines = deduped_lines
        return "\n".join(sorted_lines).strip()

    def to_lrc(
        self,
        plain: bool = False,
    ) -> str:
        """Return lyrics, optionally stripping tags.

        Assumes text has been normalized by normalize.
        """
        if plain:
            return self.to_plain()
        return "\n".join(self._lines)
def get_audio_path(audio_url: str, ensure_exists: bool = False) -> Optional[Path]:
    """Convert a file:// URL into a local filesystem Path.

    Returns None for non-file URLs, and — when ensure_exists is set — for
    paths that do not exist on disk. Percent-escapes are decoded.
    """
    if not audio_url.startswith("file://"):
        return None
    path = Path(unquote(audio_url[len("file://"):]))
    if ensure_exists and not path.exists():
        return None
    return path
def get_sidecar_path(
    audio_url: str,
    ensure_audio_exists: bool = False,
    ensure_exists: bool = False,
    extension: str = ".lrc",
) -> Optional[Path]:
    """Map a file:// audio URL to its sidecar lyrics file path.

    The sidecar shares the audio file's stem with *extension* substituted.
    Returns None when the URL is not a local file, when ensure_audio_exists
    is set and the audio file is missing, or when ensure_exists is set and
    the sidecar itself is missing.
    """
    local = get_audio_path(audio_url, ensure_exists=ensure_audio_exists)
    if local is None:
        return None
    sidecar = local.with_suffix(extension)
    if ensure_exists and not sidecar.exists():
        return None
    return sidecar
+74
View File
@@ -0,0 +1,74 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-25 04:09:36
Description: Data models.
"""
from __future__ import annotations
from enum import Enum
from typing import Optional, TYPE_CHECKING
from dataclasses import dataclass
from .config import SCORE_W_SYNCED
if TYPE_CHECKING:
from .lrc import LRCData
class CacheStatus(str, Enum):
    """Status of a cached lyric entry.

    Subclasses str so members serialize directly as their string values.
    """

    SUCCESS_SYNCED = "SUCCESS_SYNCED"  # lyrics found, with meaningful time tags
    SUCCESS_UNSYNCED = "SUCCESS_UNSYNCED"  # lyrics found, plain text only
    NOT_FOUND = "NOT_FOUND"  # lookup succeeded but no lyrics exist
    NETWORK_ERROR = "NETWORK_ERROR"  # transient failure; fetchers pair it with a short TTL
@dataclass
class TrackMeta:
    """Metadata describing a track obtained from MPRIS or manual input."""

    trackid: Optional[str] = None  # Spotify track ID (without "spotify:track:" prefix)
    length: Optional[int] = None  # Duration in milliseconds
    album: Optional[str] = None
    artist: Optional[str] = None
    title: Optional[str] = None
    url: Optional[str] = None  # Playback URL (file:// for local files)

    @property
    def is_local(self) -> bool:
        """True when the track is a local file (file:// URL)."""
        if not self.url:
            return False
        return self.url.startswith("file://")

    @property
    def is_complete(self) -> bool:
        """True when all fields required by LRCLIB are present."""
        required = (self.length, self.album, self.title, self.artist)
        return all(required)

    def display_name(self) -> str:
        """Human-readable "Artist - Title" representation for logging."""
        parts = [p for p in (self.artist, self.title) if p]
        if parts:
            return " - ".join(parts)
        return self.trackid or self.url or "(unknown)"
@dataclass
class LyricResult:
    """Result of a lyric fetch attempt, also used as cache record."""

    status: CacheStatus
    lyrics: Optional[LRCData] = None
    source: Optional[str] = None  # Which fetcher produced this result
    ttl: Optional[int] = None  # Hint for cache TTL (seconds)
    confidence: float = 100.0  # 0-100 selection confidence (100 = trusted/exact)

    def __post_init__(self) -> None:
        failure_states = (CacheStatus.NOT_FOUND, CacheStatus.NETWORK_ERROR)
        if self.status in failure_states:
            # Failed lookups carry no usable confidence.
            self.confidence = 0.0
        elif self.status is CacheStatus.SUCCESS_UNSYNCED and self.confidence == 100.0:
            # An unsynced result is never a "perfect" match: dock the
            # synced-bonus weight from the default confidence.
            self.confidence = 100 - SCORE_W_SYNCED
+190
View File
@@ -0,0 +1,190 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-03-25 04:44:15
Description: MPRIS integration for fetching track metadata.
"""
import asyncio
from dbus_next.aio.message_bus import MessageBus
from dbus_next.constants import BusType
from dbus_next.message import Message
from lrx_cli.models import TrackMeta
from lrx_cli.config import PREFERRED_PLAYER
from loguru import logger
from typing import Optional, List, Any
async def _list_mpris_players(bus: MessageBus) -> List[str]:
    """Return every session-bus name that belongs to an MPRIS media player."""
    prefix = "org.mpris.MediaPlayer2."
    try:
        reply = await bus.call(
            Message(
                destination="org.freedesktop.DBus",
                path="/org/freedesktop/DBus",
                interface="org.freedesktop.DBus",
                member="ListNames",
            )
        )
        if not reply or not reply.body:
            return []
        # ListNames returns a single array of bus names in body[0].
        return [name for name in reply.body[0] if name.startswith(prefix)]
    except Exception as e:
        logger.error(f"Failed to list DBus names: {e}")
        return []
async def _get_playback_status(bus: MessageBus, player_name: str) -> Optional[str]:
    """Query a player's PlaybackStatus property.

    Returns 'Playing', 'Paused', or 'Stopped', or None when the property
    could not be read.
    """
    try:
        introspection = await bus.introspect(player_name, "/org/mpris/MediaPlayer2")
        proxy = bus.get_proxy_object(
            player_name, "/org/mpris/MediaPlayer2", introspection
        )
        props = proxy.get_interface("org.freedesktop.DBus.Properties")
        # NOTE(review): call_get is generated dynamically by dbus_next,
        # presumably fetched via getattr to appease static analysis.
        getter = getattr(props, "call_get")
        variant = await getter("org.mpris.MediaPlayer2.Player", "PlaybackStatus")
        return variant.value if variant else None
    except Exception as e:
        logger.debug(f"Could not get playback status for {player_name}: {e}")
        return None
async def _select_player(
    bus: MessageBus, specific_player: Optional[str] = None
) -> Optional[str]:
    """Select the best MPRIS player.

    When specific_player is given, filter by name match.
    Otherwise: prefer the currently playing player. If multiple are playing,
    prefer the one matching PREFERRED_PLAYER env var (default: spotify).
    """
    players = await _list_mpris_players(bus)
    if not players:
        return None
    if specific_player:
        needle = specific_player.lower()
        matches = [p for p in players if needle in p.lower()]
        return matches[0] if matches else None
    # Narrow to players that are actively playing (logging each status).
    playing: List[str] = []
    for candidate in players:
        status = await _get_playback_status(bus, candidate)
        logger.debug(f"Player {candidate}: {status}")
        if status == "Playing":
            playing.append(candidate)
    pool = playing or players
    if len(pool) == 1:
        return pool[0]
    # Multiple candidates: fall back to the configured preference.
    preferred = PREFERRED_PLAYER.lower()
    if preferred:
        favorite = next((p for p in pool if preferred in p.lower()), None)
        if favorite:
            return favorite
    return pool[0]
async def _fetch_metadata_dbus(
    specific_player: Optional[str] = None,
) -> Optional[TrackMeta]:
    """Connect to the session bus, pick an MPRIS player, and read its Metadata.

    Returns a TrackMeta built from the player's MPRIS properties, or None on
    any failure (no bus, no player, unreadable metadata).
    """
    bus = None
    try:
        bus = await MessageBus(bus_type=BusType.SESSION).connect()
    except Exception as e:
        logger.error(f"Failed to connect to DBus: {e}")
        return None
    try:
        player_name = await _select_player(bus, specific_player)
        if not player_name:
            logger.debug(
                f"No active MPRIS players found via DBus{' for ' + specific_player if specific_player else ''}."
            )
            return None
        logger.debug(f"Using player: {player_name}")
        introspection = await bus.introspect(player_name, "/org/mpris/MediaPlayer2")
        proxy = bus.get_proxy_object(
            player_name, "/org/mpris/MediaPlayer2", introspection
        )
        props_iface = proxy.get_interface("org.freedesktop.DBus.Properties")
        if not props_iface:
            logger.error(f"Player {player_name} doesn't support Properties interface.")
            return None
        try:
            # call_get is generated dynamically by dbus_next's proxy object.
            metadata_var: Any = await getattr(props_iface, "call_get")(
                "org.mpris.MediaPlayer2.Player", "Metadata"
            )
            if not metadata_var:
                logger.error("Empty metadata received.")
                return None
            # Metadata values are DBus Variants; .value unwraps each below.
            metadata = metadata_var.value
            # Extract trackid — MPRIS returns either "spotify:track:ID"
            # or a DBus object path like "/com/spotify/track/ID"
            trackid = metadata.get("mpris:trackid", None)
            if trackid:
                trackid = trackid.value
            if isinstance(trackid, str):
                if trackid.startswith("spotify:track:"):
                    trackid = trackid.removeprefix("spotify:track:")
                elif trackid.startswith("/com/spotify/track/"):
                    trackid = trackid.removeprefix("/com/spotify/track/")
                else:
                    # Non-Spotify trackids are of no use downstream → drop.
                    trackid = None
            # Extract length (usually microseconds)
            length = metadata.get("mpris:length", None)
            if length:
                # µs → ms; non-int payloads are treated as missing.
                length = length.value // 1000 if isinstance(length.value, int) else None
            album = metadata.get("xesam:album", None)
            album = album.value if album else None
            # xesam:artist is a list of strings; only the first entry is kept.
            artist = metadata.get("xesam:artist", None)
            artist = (
                artist.value[0]
                if artist and isinstance(artist.value, list) and artist.value
                else None
            )
            title = metadata.get("xesam:title", None)
            title = title.value if title else None
            url = metadata.get("xesam:url", None)
            url = url.value if url else None
            return TrackMeta(
                trackid=trackid,
                length=length,
                album=album,
                artist=artist,
                title=title,
                url=url,
            )
        except Exception as e:
            logger.error(f"Failed to get properties from {player_name}: {e}")
            return None
    finally:
        # Always tear down the bus connection, even on early returns.
        if bus:
            bus.disconnect()
def get_current_track(player_name: Optional[str] = None) -> Optional[TrackMeta]:
    """Synchronously fetch the current track's metadata via MPRIS.

    Runs the async DBus workflow in a fresh event loop. When player_name is
    given, selection is restricted to players whose bus name contains it.
    Returns None on any failure.
    """
    try:
        return asyncio.run(_fetch_metadata_dbus(player_name))
    except Exception as e:
        logger.error(f"DBus async loop failed: {e}")
        return None
+48
View File
@@ -0,0 +1,48 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-04-02 05:24:27
Description: Shared text normalization utilities for fuzzy matching.
Used by cache key generation, cache search, and candidate selection scoring.
"""
import re
import unicodedata
# Punctuation to strip for fuzzy matching (ASCII + fullwidth + CJK brackets/symbols)
_PUNCT_RE = re.compile(
    r"[~!@#$%^&*()_+\-=\[\]{}|;:'\",.<>?/\\`"
    r"~!@#$%^&*()_+-=【】{}|;:'",。<>?/\`"
    r"「」『』《》〈〉〔〕·•‥…—–]"
)
# Any run of whitespace (collapsed to a single space by callers)
_SPACE_RE = re.compile(r"\s+")
# feat./ft./featuring and everything after (case-insensitive, word boundary)
_FEAT_RE = re.compile(r"\s*(?:\bfeat\.?\b|\bft\.?\b|\bfeaturing\b).*", re.IGNORECASE)
# Multi-artist separators: /, &, ×, x (surrounded by spaces), ;, 、, vs.
_ARTIST_SEP_RE = re.compile(r"\s*(?:[/&;×、]|\bvs\.?\b|\bx\b)\s*", re.IGNORECASE)
def normalize_for_match(s: str) -> str:
    """Normalize a string for fuzzy comparison.

    Lowercases, NFKC-normalizes (fullwidth → halfwidth), drops feat./ft.
    credits, replaces punctuation with spaces, and collapses whitespace.
    """
    text = unicodedata.normalize("NFKC", s).lower()
    text = _FEAT_RE.sub("", text)
    text = _PUNCT_RE.sub(" ", text)
    return _SPACE_RE.sub(" ", text).strip()
def normalize_artist(s: str) -> str:
    """Normalize an artist string: split by separators, normalize each, sort.

    Splits first (on /, &, ;, ×, 、, vs., x), then strips feat./ft./featuring
    from each part individually, so 'A feat. C / B' → ['a', 'b'] not just ['a'].
    """
    lowered = unicodedata.normalize("NFKC", s).lower()
    pieces = _ARTIST_SEP_RE.split(lowered)
    # Keep only parts that still contain something after removing credits.
    kept = {normalize_for_match(p) for p in pieces if _FEAT_RE.sub("", p).strip()}
    if not kept:
        # Nothing survived splitting (e.g. the input was only separators or
        # credits): fall back to plain normalization of the whole string.
        return normalize_for_match(s)
    return "\0".join(sorted(kept))