feat: auth: add auth module

This commit is contained in:
2026-04-05 02:36:10 +02:00
parent 1ed51fdbdb
commit 449952c6c1
19 changed files with 711 additions and 375 deletions
+6 -4
View File
@@ -14,9 +14,9 @@ Sources are queried in order. High-confidence results (exact match or manual ins
2. **Cache Search** — fuzzy cross-album lookup in local cache
3. **Spotify** — synced lyrics via Spotify's API (requires `SPOTIFY_SP_DC` and Spotify trackid)
4. **LRCLIB** — exact match from [lrclib.net](https://lrclib.net) (requires full metadata)
5. **Musixmatch (Spotify)** — Musixmatch API with Spotify trackid (requires `MUSIXMATCH_USERTOKEN` and Spotify trackid)
5. **Musixmatch (Spotify)** — Musixmatch API with Spotify trackid (requires Spotify trackid)
6. **LRCLIB Search** — fuzzy search from lrclib.net (requires at least a title)
7. **Musixmatch** — Musixmatch API with metadata search (requires `MUSIXMATCH_USERTOKEN` and at least a title)
7. **Musixmatch** — Musixmatch API with metadata search (requires at least a title)
8. **Netease** — Netease Cloud Music public API
9. **QQ Music** — QQ Music via self-hosted API proxy (requires `QQ_MUSIC_API_URL` that provides the same interface as [tooplick/qq-music-api](https://github.com/tooplick/qq-music-api))
@@ -35,7 +35,7 @@ See `lrx --help` for full command reference. Common use cases:
targeting a specific player and a source to fetch from:
```bash
lrx --player mpd fetch --method lrclib-search
lrx fetch --player mpd --method lrclib-search
```
- Search by metadata (bypasses MPRIS):
@@ -85,7 +85,7 @@ PREFERRED_PLAYER=spotify
```
- `SPOTIFY_SP_DC` — required for Spotify source. Defaults to empty (disabled Spotify source).
- `MUSIXMATCH_USERTOKEN` — required for Musixmatch sources ([Curators Settings Page](https://curators.musixmatch.com/settings) -> Login (if required) -> "Copy debug info")
- `MUSIXMATCH_USERTOKEN` — optional for Musixmatch sources ([Curators Settings Page](https://curators.musixmatch.com/settings) -> Login (if required) -> "Copy debug info"). If not set, an anonymous token will be fetched at runtime.
- `QQ_MUSIC_API_URL` — required for QQ Music source. Defaults to empty (disabled QQ Music source).
- `PREFERRED_PLAYER` — preferred MPRIS player when multiple are active. Defaults to `spotify`. Only used when no `--player` flag is given and more than one player (or none of them) is currently playing.
@@ -102,4 +102,6 @@ lrx --install-completion
- [librelyrics-spotify](https://github.com/libre-lyrics/librelyrics-spotify)
- [NeteaseCloudMusicAPI](https://www.npmjs.com/package/NeteaseCloudMusicApi?activeTab=readme)
- [qq-music-api](https://github.com/tooplick/qq-music-api)
- [LyricsMPRIS-Rust](https://github.com/BEST8OY/LyricsMPRIS-Rust)
- [onetagger](https://github.com/Marekkon5/onetagger)
- [Rise Media Player](https://github.com/theimpactfulcompany/Rise-Media-Player)
+29
View File
@@ -0,0 +1,29 @@
"""
Author: Uyanide pywang0608@foxmail.com
Description: Credential authenticators for third-party provider APIs
"""
from lrx_cli.authenticators.qqmusic import QQMusicAuthenticator
from .base import BaseAuthenticator
from .spotify import SpotifyAuthenticator
from .musixmatch import MusixmatchAuthenticator
from .dummy import DummyAuthenticator
__all__ = [
"BaseAuthenticator",
"SpotifyAuthenticator",
"MusixmatchAuthenticator",
"QQMusicAuthenticator",
"DummyAuthenticator",
]
def create_authenticators(cache) -> dict[str, BaseAuthenticator]:
"""Factory function to create authenticators with cache access."""
return {
"dummy": DummyAuthenticator(),
"spotify": SpotifyAuthenticator(cache),
"musixmatch": MusixmatchAuthenticator(cache),
"qqmusic": QQMusicAuthenticator(),
}
+32
View File
@@ -0,0 +1,32 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-04-05 03:18:14
Description: Base class for credential authenticators
"""
from abc import ABC, abstractmethod
from typing import Optional
class BaseAuthenticator(ABC):
"""Manages obtaining, caching, and refreshing a credential for one provider."""
@property
@abstractmethod
def name(self) -> str: ...
def is_configured(self) -> bool:
"""True if the prerequisite config (e.g. env var) is present.
Default is True — authenticators that can obtain credentials anonymously
should not override this.
"""
return True
@abstractmethod
async def authenticate(self) -> Optional[str]:
"""Return current valid credential string, refreshing if needed.
Returns None if unavailable (misconfigured or network failure).
"""
...
+19
View File
@@ -0,0 +1,19 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-04-05 03:36:44
Description:
"""
from .base import BaseAuthenticator
class DummyAuthenticator(BaseAuthenticator):
@property
def name(self) -> str:
return "dummy"
def is_configured(self) -> bool:
return True
async def authenticate(self) -> None:
return None
+160
View File
@@ -0,0 +1,160 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-04-05 03:27:56
Description: Musixmatch authenticator — token management, 401 retry, and cooldown
"""
import time
from typing import Optional
from urllib.parse import urlencode
import httpx
from loguru import logger
from .base import BaseAuthenticator
from ..cache import CacheEngine
from ..config import (
HTTP_TIMEOUT,
MUSIXMATCH_TOKEN_URL,
MUSIXMATCH_USERTOKEN,
MUSIXMATCH_COOLDOWN_MS,
)
_MXM_HEADERS = {"Cookie": "x-mxm-token-guid="}
_MXM_BASE_PARAMS = {
"format": "json",
"app_id": "web-desktop-app-v1.0",
}
class MusixmatchAuthenticator(BaseAuthenticator):
def __init__(self, cache: CacheEngine) -> None:
self._cache = cache
self._cached_token: Optional[str] = None
self._cooldown_until_ms: int = 0
@property
def name(self) -> str:
return "musixmatch"
def is_configured(self) -> bool:
return True # anonymous token always available
def is_cooldown(self) -> bool:
"""Return True if Musixmatch requests are blocked due to repeated auth failure."""
now_ms = int(time.time() * 1000)
if self._cooldown_until_ms > now_ms:
return True
data = self._cache.get_credential("musixmatch_cooldown")
if data:
until = data.get("until_ms", 0)
if until > now_ms:
self._cooldown_until_ms = until
return True
return False
def _set_cooldown(self) -> None:
now_ms = int(time.time() * 1000)
until_ms = now_ms + MUSIXMATCH_COOLDOWN_MS
self._cooldown_until_ms = until_ms
self._cache.set_credential(
"musixmatch_cooldown",
{"until_ms": until_ms},
expires_at_ms=until_ms,
)
logger.warning("Musixmatch: token unavailable, entering cooldown for 1 hour")
def _invalidate_token(self) -> None:
"""Discard the current token from memory and DB."""
self._cached_token = None
# Store with an already-expired timestamp so get_credential returns None
self._cache.set_credential("musixmatch", {"token": ""}, expires_at_ms=1)
async def _fetch_new_token(self) -> Optional[str]:
"""Call token.get and persist the result. Returns token string or None."""
params = {
**_MXM_BASE_PARAMS,
"user_language": "en",
"t": str(int(time.time() * 1000)),
}
url = f"{MUSIXMATCH_TOKEN_URL}?{urlencode(params)}"
logger.debug("Musixmatch: fetching anonymous token")
try:
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
resp = await client.get(url, headers=_MXM_HEADERS)
resp.raise_for_status()
data = resp.json()
except Exception as e:
logger.warning(f"Musixmatch: token fetch failed: {e}")
return None
token = (
data.get("message", {}).get("body", {}).get("user_token")
if isinstance(data, dict)
else None
)
if not isinstance(token, str) or not token:
logger.warning("Musixmatch: unexpected token.get response structure")
return None
self._cached_token = token
# No expiry — token is valid until we get a 401
self._cache.set_credential("musixmatch", {"token": token}, expires_at_ms=None)
logger.debug("Musixmatch: obtained anonymous token")
return token
async def _get_token(self) -> Optional[str]:
"""Return a valid token: env var > memory > DB > fresh fetch."""
if MUSIXMATCH_USERTOKEN:
return MUSIXMATCH_USERTOKEN
if self._cached_token:
return self._cached_token
data = self._cache.get_credential("musixmatch")
if data and isinstance(data.get("token"), str) and data["token"]:
self._cached_token = data["token"]
return self._cached_token
return await self._fetch_new_token()
async def authenticate(self) -> Optional[str]:
if self.is_cooldown():
logger.debug("Musixmatch: authenticate called during cooldown")
return None
return await self._get_token()
async def get_json(self, url_base: str, params: dict) -> Optional[dict]:
"""Authenticated GET to a Musixmatch endpoint.
- Injects format, app_id, and usertoken automatically.
- On 401: invalidates token, fetches a fresh one, retries once.
- On failed token fetch (initial or retry): sets cooldown, returns None.
- On network / HTTP error: raises (callers map this to NETWORK_ERROR).
- Returns None if cooldown is active.
"""
if self.is_cooldown():
logger.debug("Musixmatch: request blocked by cooldown")
return None
token = await self._get_token()
if not token:
self._set_cooldown()
return None
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
url = f"{url_base}?{urlencode({**_MXM_BASE_PARAMS, **params, 'usertoken': token})}"
resp = await client.get(url, headers=_MXM_HEADERS)
if resp.status_code == 401:
logger.debug("Musixmatch: 401 received, refreshing token")
self._invalidate_token()
token = await self._fetch_new_token()
if not token:
self._set_cooldown()
return None
url = f"{url_base}?{urlencode({**_MXM_BASE_PARAMS, **params, 'usertoken': token})}"
resp = await client.get(url, headers=_MXM_HEADERS)
resp.raise_for_status()
return resp.json()
+25
View File
@@ -0,0 +1,25 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-04-05 03:47:30
Description: QQ Music API authenticator - currently only a proxy
"""
from typing import Optional
from .base import BaseAuthenticator
from ..config import QQ_MUSIC_API_URL
class QQMusicAuthenticator(BaseAuthenticator):
def __init__(self) -> None:
pass
@property
def name(self) -> str:
return "qqmusic"
def is_configured(self) -> bool:
return bool(QQ_MUSIC_API_URL)
async def authenticate(self) -> Optional[str]:
return QQ_MUSIC_API_URL
+203
View File
@@ -0,0 +1,203 @@
"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-04-05 03:18:14
Description: Spotify authenticator — TOTP-based access token via SP_DC cookie
"""
import hashlib
import hmac
import struct
import time
from typing import Optional, Tuple
import httpx
from loguru import logger
from .base import BaseAuthenticator
from ..cache import CacheEngine
from ..config import (
HTTP_TIMEOUT,
SPOTIFY_SERVER_TIME_URL,
SPOTIFY_SECRET_URL,
SPOTIFY_SP_DC,
SPOTIFY_TOKEN_URL,
UA_BROWSER,
)
_SPOTIFY_BASE_HEADERS = {
"Referer": "https://open.spotify.com/",
"Origin": "https://open.spotify.com",
"App-Platform": "WebPlayer",
"Spotify-App-Version": "1.2.88.21.g8e037c8f",
}
class SpotifyAuthenticator(BaseAuthenticator):
def __init__(self, cache: CacheEngine) -> None:
self._cache = cache
self._cached_secret: Optional[Tuple[str, int]] = None
self._cached_token: Optional[str] = None
self._token_expires_at: float = 0.0
@property
def name(self) -> str:
return "spotify"
def is_configured(self) -> bool:
return bool(SPOTIFY_SP_DC)
@staticmethod
def _generate_totp(server_time_s: int, secret: str) -> str:
counter = server_time_s // 30
counter_bytes = struct.pack(">Q", counter)
mac = hmac.new(secret.encode(), counter_bytes, hashlib.sha1).digest()
offset = mac[-1] & 0x0F
binary_code = (
(mac[offset] & 0x7F) << 24
| (mac[offset + 1] & 0xFF) << 16
| (mac[offset + 2] & 0xFF) << 8
| (mac[offset + 3] & 0xFF)
)
return str(binary_code % (10**6)).zfill(6)
def _load_cached_token(self) -> Optional[str]:
data = self._cache.get_credential("spotify")
if not data:
return None
expires_ms = data.get("accessTokenExpirationTimestampMs", 0)
if expires_ms <= int(time.time() * 1000):
logger.debug("Spotify: persisted token expired")
return None
token = data.get("accessToken", "")
if not token:
return None
self._cached_token = token
self._token_expires_at = expires_ms / 1000.0
logger.debug("Spotify: loaded token from DB cache")
return token
def _save_token(self, body: dict) -> None:
expires_ms = body.get("accessTokenExpirationTimestampMs")
self._cache.set_credential("spotify", body, expires_ms)
logger.debug("Spotify: token saved to DB cache")
async def _get_server_time(self, client: httpx.AsyncClient) -> Optional[int]:
try:
res = await client.get(SPOTIFY_SERVER_TIME_URL, timeout=HTTP_TIMEOUT)
res.raise_for_status()
data = res.json()
if not isinstance(data, dict) or "serverTime" not in data:
logger.error(f"Spotify: unexpected server-time response: {data}")
return None
server_time = data["serverTime"]
logger.debug(f"Spotify: server time = {server_time}")
return server_time
except Exception as e:
logger.error(f"Spotify: failed to fetch server time: {e}")
return None
async def _get_secret(self, client: httpx.AsyncClient) -> Optional[Tuple[str, int]]:
if self._cached_secret is not None:
logger.debug("Spotify: using cached TOTP secret")
return self._cached_secret
try:
res = await client.get(SPOTIFY_SECRET_URL, timeout=HTTP_TIMEOUT)
res.raise_for_status()
data = res.json()
if not isinstance(data, list) or len(data) == 0:
logger.error(
f"Spotify: unexpected secrets response (type={type(data).__name__})"
)
return None
last = data[-1]
if "secret" not in last or "version" not in last:
logger.error(f"Spotify: malformed secret entry: {list(last.keys())}")
return None
secret_raw = last["secret"]
version = last["version"]
secret = "".join(
str(ord(c) ^ ((i % 33) + 9)) for i, c in enumerate(secret_raw)
)
logger.debug(f"Spotify: decoded secret v{version} (len={len(secret)})")
self._cached_secret = (secret, version)
return self._cached_secret
except Exception as e:
logger.error(f"Spotify: failed to fetch secret: {e}")
return None
async def authenticate(self) -> Optional[str]:
if self._cached_token and time.time() < self._token_expires_at - 30:
logger.debug("Spotify: using in-memory cached token")
return self._cached_token
db_token = self._load_cached_token()
if db_token and time.time() < self._token_expires_at - 30:
return db_token
if not SPOTIFY_SP_DC:
logger.error("Spotify: SPOTIFY_SP_DC env var not set — cannot authenticate")
return None
headers = {
"User-Agent": UA_BROWSER,
"Accept": "*/*",
"Cookie": f"sp_dc={SPOTIFY_SP_DC}",
**_SPOTIFY_BASE_HEADERS,
}
async with httpx.AsyncClient(headers=headers) as client:
server_time = await self._get_server_time(client)
if server_time is None:
return None
secret_data = await self._get_secret(client)
if secret_data is None:
return None
secret, version = secret_data
totp = self._generate_totp(server_time, secret)
logger.debug(f"Spotify: generated TOTP v{version}: {totp}")
params = {
"reason": "init",
"productType": "web-player",
"totp": totp,
"totpVer": str(version),
"totpServer": totp,
}
try:
res = await client.get(
SPOTIFY_TOKEN_URL, params=params, timeout=HTTP_TIMEOUT
)
if res.status_code != 200:
logger.error(f"Spotify: token request returned {res.status_code}")
return None
body = res.json()
if not isinstance(body, dict) or "accessToken" not in body:
logger.error(
f"Spotify: unexpected token response keys: {list(body.keys()) if isinstance(body, dict) else type(body).__name__}"
)
return None
token = body["accessToken"]
if body.get("isAnonymous", False):
logger.warning(
"Spotify: received anonymous token — SP_DC may be invalid"
)
expires_ms = body.get("accessTokenExpirationTimestampMs", 0)
if expires_ms and expires_ms > int(time.time() * 1000):
self._token_expires_at = expires_ms / 1000.0
else:
logger.warning("Spotify: token expiry missing or invalid")
self._token_expires_at = time.time() + 3600
self._cached_token = token
self._save_token(body)
logger.debug("Spotify: obtained access token")
return token
except Exception as e:
logger.error(f"Spotify: token request failed: {e}")
return None
+37
View File
@@ -4,6 +4,7 @@ Date: 2026-03-25 10:18:03
Description: SQLite-based lyric cache with per-source storage and TTL expiration
"""
import json
import sqlite3
import hashlib
import time
@@ -74,6 +75,13 @@ class CacheEngine:
album TEXT
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS credentials (
name TEXT PRIMARY KEY,
data TEXT NOT NULL,
expires_at INTEGER
)
""")
# Migrations
cols = {r[1] for r in conn.execute("PRAGMA table_info(cache)").fetchall()}
if "length" not in cols:
@@ -442,6 +450,35 @@ class CacheEngine:
).fetchall()
]
# Credentials
def get_credential(self, name: str) -> Optional[dict]:
"""Return cached credential data if present and not expired."""
now_ms = int(time.time() * 1000)
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
row = conn.execute(
"SELECT data FROM credentials WHERE name = ? AND (expires_at IS NULL OR expires_at > ?)",
(name, now_ms),
).fetchone()
if row is None:
return None
try:
return json.loads(row["data"])
except (json.JSONDecodeError, KeyError):
return None
def set_credential(
self, name: str, data: dict, expires_at_ms: Optional[int] = None
) -> None:
"""Persist credential data, optionally with an expiry timestamp (Unix ms)."""
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"INSERT OR REPLACE INTO credentials (name, data, expires_at) VALUES (?, ?, ?)",
(name, json.dumps(data), expires_at_ms),
)
conn.commit()
def query_all(self) -> list[dict]:
"""Return every row in the cache table."""
with sqlite3.connect(self.db_path) as conn:
+2 -1
View File
@@ -66,7 +66,6 @@ SPOTIFY_SECRET_URL = (
"/refs/heads/main/secrets/secrets.json"
)
SPOTIFY_SP_DC = os.environ.get("SPOTIFY_SP_DC", "")
SPOTIFY_TOKEN_CACHE_FILE = os.path.join(CACHE_DIR, "spotify_token.json")
# Netease api
NETEASE_SEARCH_URL = "https://music.163.com/api/cloudsearch/pc"
@@ -81,11 +80,13 @@ QQ_MUSIC_API_URL = os.environ.get("QQ_MUSIC_API_URL", "").rstrip("/")
# Musixmatch desktop API
MUSIXMATCH_USERTOKEN = os.environ.get("MUSIXMATCH_USERTOKEN", "")
MUSIXMATCH_TOKEN_URL = "https://apic-desktop.musixmatch.com/ws/1.1/token.get"
MUSIXMATCH_SEARCH_URL = "https://apic-desktop.musixmatch.com/ws/1.1/track.search"
MUSIXMATCH_MACRO_URL = "https://apic-desktop.musixmatch.com/ws/1.1/macro.subtitles.get"
MUSIXMATCH_TRACK_MATCH_URL = (
"https://apic-desktop.musixmatch.com/ws/1.1/matcher.track.get"
)
MUSIXMATCH_COOLDOWN_MS = 600_000 # 10 minutes
# Player preference (used when multiple MPRIS players are active)
PREFERRED_PLAYER = os.environ.get("PREFERRED_PLAYER", "spotify")
+7 -4
View File
@@ -10,6 +10,7 @@ from loguru import logger
from .fetchers import FetcherMethodType, build_plan, create_fetchers
from .fetchers.base import BaseFetcher
from .authenticators import create_authenticators
from .cache import CacheEngine
from .lrc import LRCData
from .config import (
@@ -20,7 +21,7 @@ from .config import (
HIGH_CONFIDENCE,
)
from .models import TrackMeta, LyricResult, CacheStatus
from .enrichers import enrich_track
from .enrichers import create_enrichers, enrich_track
# Maps CacheStatus to the default TTL used when storing results
@@ -61,7 +62,9 @@ class LrcManager:
def __init__(self, db_path: str) -> None:
self.cache = CacheEngine(db_path=db_path)
self.fetchers = create_fetchers(self.cache)
self.authenticators = create_authenticators(self.cache)
self.fetchers = create_fetchers(self.cache, self.authenticators)
self.enrichers = create_enrichers(self.authenticators)
async def _run_group(
self,
@@ -161,7 +164,7 @@ class LrcManager:
force_method: Optional[FetcherMethodType],
bypass_cache: bool,
) -> Optional[LyricResult]:
track = await enrich_track(track)
track = await enrich_track(track, self.enrichers)
logger.info(f"Fetching lyrics for: {track.display_name()}")
plan = build_plan(self.fetchers, track, force_method)
@@ -217,7 +220,7 @@ class LrcManager:
lyrics: str,
) -> None:
"""Manually insert lyrics into the cache for a track."""
track = asyncio.run(enrich_track(track))
track = asyncio.run(enrich_track(track, self.enrichers))
logger.info(f"Manually inserting lyrics for: {track.display_name()}")
lrc = LRCData(lyrics)
result = LyricResult(
+16 -7
View File
@@ -10,24 +10,33 @@ from .base import BaseEnricher
from .audio_tag import AudioTagEnricher
from .file_name import FileNameEnricher
from .musixmatch import MusixmatchSpotifyEnricher
from ..authenticators import BaseAuthenticator, MusixmatchAuthenticator
from ..models import TrackMeta
# Enrichers run in order; earlier ones have higher priority.
# There are only a few of them, so we can just call them sequentially without worrying about async concurrency or batching.
_ENRICHERS: list[BaseEnricher] = [
AudioTagEnricher(),
FileNameEnricher(),
MusixmatchSpotifyEnricher(),
]
async def enrich_track(track: TrackMeta) -> TrackMeta:
def create_enrichers(
authenticators: dict[str, BaseAuthenticator],
) -> list[BaseEnricher]:
"""Instantiate all enrichers."""
mxm_auth = authenticators["musixmatch"]
assert isinstance(mxm_auth, MusixmatchAuthenticator)
return [
AudioTagEnricher(),
FileNameEnricher(),
MusixmatchSpotifyEnricher(mxm_auth),
]
async def enrich_track(track: TrackMeta, enrichers: list[BaseEnricher]) -> TrackMeta:
"""Run all enrichers and return a track with missing fields filled in.
Each enricher sees the cumulative state (earlier enrichers' results
are already applied). A field is only set if it is currently None.
"""
for enricher in _ENRICHERS:
for enricher in enrichers:
try:
# Skip if all provided fields are already filled
if all(
+13 -24
View File
@@ -5,30 +5,21 @@ Description: Musixmatch metadata enricher (matcher.track.get by Spotify track ID
"""
from typing import Optional
from urllib.parse import urlencode
import httpx
from loguru import logger
from .base import BaseEnricher
from ..authenticators.musixmatch import MusixmatchAuthenticator
from ..models import TrackMeta
from ..config import (
HTTP_TIMEOUT,
MUSIXMATCH_TRACK_MATCH_URL,
MUSIXMATCH_USERTOKEN,
)
_MXM_HEADERS = {"Cookie": "x-mxm-token-guid="}
_MXM_TRACK_MATCH_BASE_PARAMS = {
"format": "json",
"app_id": "web-desktop-app-v1.0",
"usertoken": MUSIXMATCH_USERTOKEN,
}
from ..config import MUSIXMATCH_TRACK_MATCH_URL
class MusixmatchSpotifyEnricher(BaseEnricher):
"""Fill title, artist, album, and length from Musixmatch using Spotify track ID."""
def __init__(self, auth: MusixmatchAuthenticator) -> None:
self.auth = auth
@property
def name(self) -> str:
return "musixmatch"
@@ -38,25 +29,23 @@ class MusixmatchSpotifyEnricher(BaseEnricher):
return {"title", "artist", "album", "length"}
async def enrich(self, track: TrackMeta) -> Optional[dict]:
if not track.trackid or not MUSIXMATCH_USERTOKEN:
if not track.trackid:
return None
params = {
**_MXM_TRACK_MATCH_BASE_PARAMS,
"track_spotify_id": track.trackid,
}
url = f"{MUSIXMATCH_TRACK_MATCH_URL}?{urlencode(params)}"
logger.debug(f"Musixmatch enricher: looking up trackid={track.trackid}")
try:
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
resp = await client.get(url, headers=_MXM_HEADERS)
resp.raise_for_status()
data = resp.json()
data = await self.auth.get_json(
MUSIXMATCH_TRACK_MATCH_URL,
{"track_spotify_id": track.trackid},
)
except Exception as e:
logger.warning(f"Musixmatch enricher: request failed: {e}")
return None
if data is None:
return None
body = data.get("message", {}).get("body")
t = body.get("track") if isinstance(body, dict) else None
if not isinstance(t, dict):
+20 -5
View File
@@ -16,6 +16,12 @@ from .lrclib_search import LrclibSearchFetcher
from .musixmatch import MusixmatchFetcher, MusixmatchSpotifyFetcher
from .netease import NeteaseFetcher
from .qqmusic import QQMusicFetcher
from ..authenticators import (
BaseAuthenticator,
SpotifyAuthenticator,
MusixmatchAuthenticator,
QQMusicAuthenticator,
)
from ..cache import CacheEngine
from ..models import TrackMeta
@@ -43,18 +49,27 @@ _FETCHER_GROUPS: list[list[FetcherMethodType]] = [
]
def create_fetchers(cache: CacheEngine) -> dict[FetcherMethodType, BaseFetcher]:
def create_fetchers(
cache: CacheEngine,
authenticators: dict[str, BaseAuthenticator],
) -> dict[FetcherMethodType, BaseFetcher]:
"""Instantiate all fetchers. Returns a dict keyed by source name."""
spotify_auth = authenticators["spotify"]
mxm_auth = authenticators["musixmatch"]
qqmusic_auth = authenticators.get("qqmusic")
assert isinstance(spotify_auth, SpotifyAuthenticator)
assert isinstance(mxm_auth, MusixmatchAuthenticator)
assert isinstance(qqmusic_auth, QQMusicAuthenticator)
fetchers: dict[FetcherMethodType, BaseFetcher] = {
"local": LocalFetcher(),
"cache-search": CacheSearchFetcher(cache),
"spotify": SpotifyFetcher(),
"spotify": SpotifyFetcher(spotify_auth),
"lrclib": LrclibFetcher(),
"musixmatch-spotify": MusixmatchSpotifyFetcher(),
"musixmatch-spotify": MusixmatchSpotifyFetcher(mxm_auth),
"lrclib-search": LrclibSearchFetcher(),
"netease": NeteaseFetcher(),
"qqmusic": QQMusicFetcher(),
"musixmatch": MusixmatchFetcher(),
"qqmusic": QQMusicFetcher(qqmusic_auth),
"musixmatch": MusixmatchFetcher(mxm_auth),
}
return fetchers
+83 -105
View File
@@ -6,42 +6,34 @@ Description: Musixmatch fetchers (desktop API, usertoken auth)
"""
Uses the Musixmatch desktop API (apic-desktop.musixmatch.com).
Requires MUSIXMATCH_USERTOKEN from https://curators.musixmatch.com/settings
"Copy debug info" → find UserToken.
Token and all HTTP calls are managed by MusixmatchAuthenticator.
Two fetchers:
musixmatch-spotify — direct lookup by Spotify track ID (exact, no search)
musixmatch — metadata search + multi-candidate fallback
musixmatch — metadata search + best-candidate fallback
"""
import json
from typing import Optional
from urllib.parse import urlencode
import httpx
from loguru import logger
from .base import BaseFetcher
from .selection import SearchCandidate, select_best
from ..authenticators.musixmatch import MusixmatchAuthenticator
from ..lrc import LRCData
from ..models import CacheStatus, LyricResult, TrackMeta
from ..config import (
HTTP_TIMEOUT,
MUSIXMATCH_MACRO_URL,
MUSIXMATCH_SEARCH_URL,
MUSIXMATCH_USERTOKEN,
TTL_NETWORK_ERROR,
TTL_NOT_FOUND,
)
_MXM_HEADERS = {"Cookie": "x-mxm-token-guid="}
_MXM_MACRO_BASE_PARAMS: dict[str, str] = {
"format": "json",
# Macro-specific params (format/app_id injected by authenticator)
_MXM_MACRO_PARAMS = {
"namespace": "lyrics_richsynched",
"subtitle_format": "mxm",
"optional_calls": "track.richsync",
"app_id": "web-desktop-app-v1.0",
}
@@ -96,23 +88,19 @@ def _parse_subtitle(body: str) -> Optional[str]:
async def _fetch_macro(
client: httpx.AsyncClient,
params: dict[str, str],
auth: MusixmatchAuthenticator,
params: dict,
) -> Optional[LRCData]:
"""Call macro.subtitles.get via auth.get_json.
Returns LRCData (richsync preferred over subtitle), or None when no usable
lyrics are found. Raises on HTTP/network errors.
"""
Call macro.subtitles.get with given params merged onto base params.
Returns LRCData on success (richsync preferred over subtitle),
None when the API returns no usable lyrics.
Raises on HTTP/network errors.
"""
merged = {**_MXM_MACRO_BASE_PARAMS, **params}
url = f"{MUSIXMATCH_MACRO_URL}?{urlencode(merged)}"
logger.debug(f"Musixmatch: macro call with {list(params.keys())}")
data = await auth.get_json(MUSIXMATCH_MACRO_URL, {**_MXM_MACRO_PARAMS, **params})
if data is None:
return None
resp = await client.get(url, headers=_MXM_HEADERS)
resp.raise_for_status()
data = resp.json()
# Musixmatch returns body=[] (not {}) when the track is not found
body = data.get("message", {}).get("body", {})
if not isinstance(body, dict):
@@ -162,26 +150,26 @@ async def _fetch_macro(
class MusixmatchSpotifyFetcher(BaseFetcher):
"""Direct lookup by Spotify track ID — no search, single request."""
def __init__(self, auth: MusixmatchAuthenticator) -> None:
self.auth = auth
@property
def source_name(self) -> str:
return "musixmatch-spotify"
def is_available(self, track: TrackMeta) -> bool:
return bool(track.trackid) and bool(MUSIXMATCH_USERTOKEN)
return bool(track.trackid) and not self.auth.is_cooldown()
async def fetch(
self, track: TrackMeta, bypass_cache: bool = False
) -> Optional[LyricResult]:
logger.info(f"Musixmatch-Spotify: fetching lyrics for {track.display_name()}")
try:
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
lrc = await _fetch_macro(
client,
{
"track_spotify_id": track.trackid, # type: ignore[dict-item]
"usertoken": MUSIXMATCH_USERTOKEN,
},
)
lrc = await _fetch_macro(
self.auth,
{"track_spotify_id": track.trackid}, # type: ignore[dict-item]
)
except Exception as e:
logger.error(f"Musixmatch-Spotify: fetch failed: {e}")
return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
@@ -201,21 +189,26 @@ class MusixmatchSpotifyFetcher(BaseFetcher):
class MusixmatchFetcher(BaseFetcher):
"""Metadata search + multi-candidate fallback."""
"""Metadata search + best-candidate lyric fetch."""
def __init__(self, auth: MusixmatchAuthenticator) -> None:
self.auth = auth
@property
def source_name(self) -> str:
return "musixmatch"
@property
def requires_auth(self) -> str:
return "musixmatch"
def is_available(self, track: TrackMeta) -> bool:
return bool(track.title) and bool(MUSIXMATCH_USERTOKEN)
return bool(track.title) and not self.auth.is_cooldown()
async def _search(self, track: TrackMeta) -> tuple[Optional[int], float]:
params: dict[str, str] = {
"format": "json",
"app_id": "web-desktop-app-v1.0",
"""Search for track metadata. Raises on network/HTTP errors."""
params: dict = {
"q_track": track.title or "",
"usertoken": MUSIXMATCH_USERTOKEN,
"page_size": "10",
"f_has_lyrics": "1",
}
@@ -224,79 +217,64 @@ class MusixmatchFetcher(BaseFetcher):
if track.album:
params["q_album"] = track.album
url = f"{MUSIXMATCH_SEARCH_URL}?{urlencode(params)}"
logger.debug(f"Musixmatch: searching for '{track.display_name()}'")
try:
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
resp = await client.get(url, headers=_MXM_HEADERS)
resp.raise_for_status()
data = resp.json()
track_list = data.get("message", {}).get("body", {}).get("track_list", [])
if not isinstance(track_list, list) or not track_list:
logger.debug("Musixmatch: search returned 0 results")
return None, 0.0
logger.debug(f"Musixmatch: search returned {len(track_list)} candidates")
candidates = [
SearchCandidate(
item=int(t["commontrack_id"]),
duration_ms=(
float(t["track_length"]) * 1000
if t.get("track_length")
else None
),
is_synced=bool(t.get("has_subtitles") or t.get("has_richsync")),
title=t.get("track_name"),
artist=t.get("artist_name"),
album=t.get("album_name"),
)
for item in track_list
if isinstance(item, dict)
and isinstance(t := item.get("track", {}), dict)
and isinstance(t.get("commontrack_id"), int)
and not t.get("instrumental")
]
best_id, confidence = select_best(
candidates,
track.length,
title=track.title,
artist=track.artist,
album=track.album,
)
if best_id is not None:
logger.debug(
f"Musixmatch: best candidate id={best_id} ({confidence:.0f})"
)
else:
logger.debug("Musixmatch: no suitable candidate found")
return best_id, confidence
except Exception as e:
logger.error(f"Musixmatch: search failed: {e}")
data = await self.auth.get_json(MUSIXMATCH_SEARCH_URL, params)
if data is None:
return None, 0.0
track_list = data.get("message", {}).get("body", {}).get("track_list", [])
if not isinstance(track_list, list) or not track_list:
logger.debug("Musixmatch: search returned 0 results")
return None, 0.0
logger.debug(f"Musixmatch: search returned {len(track_list)} candidates")
candidates = [
SearchCandidate(
item=int(t["commontrack_id"]),
duration_ms=(
float(t["track_length"]) * 1000 if t.get("track_length") else None
),
is_synced=bool(t.get("has_subtitles") or t.get("has_richsync")),
title=t.get("track_name"),
artist=t.get("artist_name"),
album=t.get("album_name"),
)
for item in track_list
if isinstance(item, dict)
and isinstance(t := item.get("track", {}), dict)
and isinstance(t.get("commontrack_id"), int)
and not t.get("instrumental")
]
best_id, confidence = select_best(
candidates,
track.length,
title=track.title,
artist=track.artist,
album=track.album,
)
if best_id is not None:
logger.debug(f"Musixmatch: best candidate id={best_id} ({confidence:.0f})")
else:
logger.debug("Musixmatch: no suitable candidate found")
return best_id, confidence
async def fetch(
self, track: TrackMeta, bypass_cache: bool = False
) -> Optional[LyricResult]:
logger.info(f"Musixmatch: fetching lyrics for {track.display_name()}")
commontrack_id, confidence = await self._search(track)
if commontrack_id is None:
logger.debug(f"Musixmatch: no match found for {track.display_name()}")
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
try:
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
lrc = await _fetch_macro(
client,
{
"commontrack_id": str(commontrack_id),
"usertoken": MUSIXMATCH_USERTOKEN,
},
)
commontrack_id, confidence = await self._search(track)
if commontrack_id is None:
logger.debug(f"Musixmatch: no match found for {track.display_name()}")
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
lrc = await _fetch_macro(
self.auth,
{"commontrack_id": str(commontrack_id)},
)
except Exception as e:
logger.error(f"Musixmatch: fetch failed: {e}")
return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
+9 -6
View File
@@ -25,17 +25,20 @@ from ..config import (
TTL_NOT_FOUND,
TTL_NETWORK_ERROR,
MULTI_CANDIDATE_DELAY_S,
QQ_MUSIC_API_URL,
)
from ..authenticators import QQMusicAuthenticator
class QQMusicFetcher(BaseFetcher):
def __init__(self, auth: QQMusicAuthenticator) -> None:
self.auth = auth
@property
def source_name(self) -> str:
return "qqmusic"
def is_available(self, track: TrackMeta) -> bool:
return bool(track.title) and bool(QQ_MUSIC_API_URL)
return bool(track.title) and self.auth.is_configured()
async def _search(
self, track: TrackMeta, limit: int = 10
@@ -49,7 +52,7 @@ class QQMusicFetcher(BaseFetcher):
try:
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
resp = await client.get(
f"{QQ_MUSIC_API_URL}/api/search",
f"{self.auth.authenticate()}/api/search",
params={"keyword": query, "type": "song", "num": limit},
)
resp.raise_for_status()
@@ -108,7 +111,7 @@ class QQMusicFetcher(BaseFetcher):
try:
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
resp = await client.get(
f"{QQ_MUSIC_API_URL}/api/lyric",
f"{self.auth.authenticate()}/api/lyric",
params={"mid": mid},
)
resp.raise_for_status()
@@ -144,8 +147,8 @@ class QQMusicFetcher(BaseFetcher):
async def fetch(
self, track: TrackMeta, bypass_cache: bool = False
) -> Optional[LyricResult]:
if not QQ_MUSIC_API_URL:
logger.debug("QQMusic: skipped — QQ_MUSIC_API_URL not configured")
if not self.auth.is_configured():
logger.debug("QQMusic: skipped — Auth not configured")
return None
query = f"{track.artist or ''} {track.title or ''}".strip()
+6 -217
View File
@@ -4,41 +4,19 @@ Date: 2026-03-25 10:43:21
Description: Spotify fetcher — obtains synced lyrics via Spotify's internal color-lyrics API.
"""
"""
Authentication flow:
1. Fetch server time from Spotify
2. Fetch TOTP secret
3. Generate a TOTP code and exchange it (with SP_DC cookie) for an access token
4. Request lyrics using the access token
The secret and token are cached on the instance to avoid redundant network
calls within the same session.
Requires SPOTIFY_SP_DC environment variable to be set.
"""
import httpx
import json
import time
import struct
import hmac
import hashlib
from typing import Optional, Tuple
from typing import Optional
from loguru import logger
from .base import BaseFetcher
from ..authenticators.spotify import SpotifyAuthenticator
from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import LRCData
from ..config import (
HTTP_TIMEOUT,
TTL_NOT_FOUND,
TTL_NETWORK_ERROR,
SPOTIFY_TOKEN_URL,
SPOTIFY_LYRICS_URL,
SPOTIFY_SERVER_TIME_URL,
SPOTIFY_SECRET_URL,
SPOTIFY_SP_DC,
SPOTIFY_TOKEN_CACHE_FILE,
UA_BROWSER,
)
@@ -51,72 +29,18 @@ _SPOTIFY_BASE_HEADERS = {
class SpotifyFetcher(BaseFetcher):
def __init__(self) -> None:
# Session-level caches to avoid refetching within the same run
self._cached_secret: Optional[Tuple[str, int]] = None
self._cached_token: Optional[str] = None
self._token_expires_at: float = 0.0
def __init__(self, auth: SpotifyAuthenticator) -> None:
self.auth = auth
@property
def source_name(self) -> str:
return "spotify"
def is_available(self, track: TrackMeta) -> bool:
return bool(track.trackid) and bool(SPOTIFY_SP_DC)
@staticmethod
def _generate_totp(server_time_s: int, secret: str) -> str:
"""Generate a 6-digit TOTP code compatible with Spotify's auth.
Uses HMAC-SHA1 with a 30-second period, matching the Go reference.
"""
counter = server_time_s // 30
counter_bytes = struct.pack(">Q", counter)
mac = hmac.new(secret.encode(), counter_bytes, hashlib.sha1).digest()
offset = mac[-1] & 0x0F
binary_code = (
(mac[offset] & 0x7F) << 24
| (mac[offset + 1] & 0xFF) << 16
| (mac[offset + 2] & 0xFF) << 8
| (mac[offset + 3] & 0xFF)
)
code = binary_code % (10**6)
return str(code).zfill(6)
def _load_cached_token(self) -> Optional[str]:
"""Try to load a valid token from the persistent cache file."""
try:
with open(SPOTIFY_TOKEN_CACHE_FILE, "r") as f:
data = json.load(f)
expires_ms = data.get("accessTokenExpirationTimestampMs", 0)
if expires_ms <= int(time.time() * 1000):
logger.debug("Spotify: persisted token expired")
return None
token = data.get("accessToken", "")
if not token:
return None
self._cached_token = token
self._token_expires_at = expires_ms / 1000.0
logger.debug("Spotify: loaded token from cache file")
return token
except (FileNotFoundError, json.JSONDecodeError, KeyError):
return None
def _save_token(self, body: dict) -> None:
"""Persist the token response to disk."""
try:
with open(SPOTIFY_TOKEN_CACHE_FILE, "w") as f:
json.dump(body, f)
logger.debug("Spotify: token saved to cache file")
except Exception as e:
logger.warning(f"Spotify: failed to write token cache: {e}")
return bool(track.trackid) and self.auth.is_configured()
@staticmethod
def _format_lrc_line(start_ms: int, words: str) -> str:
"""Format a single lyric line as LRC ``[mm:ss.cc]text``."""
minutes = start_ms // 60000
seconds = (start_ms // 1000) % 60
centiseconds = round((start_ms % 1000) / 10.0)
@@ -124,7 +48,6 @@ class SpotifyFetcher(BaseFetcher):
@staticmethod
def _is_truly_synced(lines: list[dict]) -> bool:
"""Check if lyrics are actually synced (not all timestamps zero)."""
for line in lines:
try:
ms = int(line.get("startTimeMs", "0"))
@@ -134,140 +57,6 @@ class SpotifyFetcher(BaseFetcher):
continue
return False
async def _get_server_time(self, client: httpx.AsyncClient) -> Optional[int]:
try:
res = await client.get(SPOTIFY_SERVER_TIME_URL, timeout=HTTP_TIMEOUT)
res.raise_for_status()
data = res.json()
if not isinstance(data, dict) or "serverTime" not in data:
logger.error(f"Spotify: unexpected server-time response: {data}")
return None
server_time = data["serverTime"]
logger.debug(f"Spotify: server time = {server_time}")
return server_time
except Exception as e:
logger.error(f"Spotify: failed to fetch server time: {e}")
return None
async def _get_secret(self, client: httpx.AsyncClient) -> Optional[Tuple[str, int]]:
if self._cached_secret is not None:
logger.debug("Spotify: using cached TOTP secret")
return self._cached_secret
try:
res = await client.get(SPOTIFY_SECRET_URL, timeout=HTTP_TIMEOUT)
res.raise_for_status()
data = res.json()
if not isinstance(data, list) or len(data) == 0:
logger.error(
f"Spotify: unexpected secrets response (type={type(data).__name__}, len={len(data) if isinstance(data, list) else '?'})"
)
return None
last = data[-1]
if "secret" not in last or "version" not in last:
logger.error(f"Spotify: malformed secret entry: {list(last.keys())}")
return None
secret_raw = last["secret"]
version = last["version"]
parts = []
for i, char in enumerate(secret_raw):
parts.append(str(ord(char) ^ ((i % 33) + 9)))
secret = "".join(parts)
logger.debug(f"Spotify: decoded secret v{version} (len={len(secret)})")
self._cached_secret = (secret, version)
return self._cached_secret
except Exception as e:
logger.error(f"Spotify: failed to fetch secret: {e}")
return None
async def _get_token(self) -> Optional[str]:
if self._cached_token and time.time() < self._token_expires_at - 30:
logger.debug("Spotify: using in-memory cached token")
return self._cached_token
disk_token = self._load_cached_token()
if disk_token and time.time() < self._token_expires_at - 30:
return disk_token
if not SPOTIFY_SP_DC:
logger.error(
"Spotify: SPOTIFY_SP_DC env var not set — cannot authenticate with Spotify"
)
return None
headers = {
"User-Agent": UA_BROWSER,
"Accept": "*/*",
"Cookie": f"sp_dc={SPOTIFY_SP_DC}",
**_SPOTIFY_BASE_HEADERS,
}
async with httpx.AsyncClient(headers=headers) as client:
server_time = await self._get_server_time(client)
if server_time is None:
return None
secret_data = await self._get_secret(client)
if secret_data is None:
return None
secret, version = secret_data
totp = self._generate_totp(server_time, secret)
logger.debug(f"Spotify: generated TOTP v{version}: {totp}")
params = {
"reason": "init",
"productType": "web-player",
"totp": totp,
"totpVer": str(version),
"totpServer": totp,
}
try:
res = await client.get(
SPOTIFY_TOKEN_URL, params=params, timeout=HTTP_TIMEOUT
)
if res.status_code != 200:
logger.error(f"Spotify: token request returned {res.status_code}")
return None
body = res.json()
if not isinstance(body, dict) or "accessToken" not in body:
logger.error(
f"Spotify: unexpected token response keys: {list(body.keys()) if isinstance(body, dict) else type(body).__name__}"
)
return None
token = body["accessToken"]
is_anonymous = body.get("isAnonymous", False)
if is_anonymous:
logger.warning(
"Spotify: received anonymous token — SP_DC may be invalid"
)
expires_ms = body.get("accessTokenExpirationTimestampMs", 0)
if expires_ms and expires_ms > int(time.time() * 1000):
self._token_expires_at = expires_ms / 1000.0
else:
logger.warning("Spotify: token expiry missing or invalid")
self._token_expires_at = time.time() + 3600
self._cached_token = token
self._save_token(body)
logger.debug("Spotify: obtained access token")
return token
except Exception as e:
logger.error(f"Spotify: token request failed: {e}")
return None
async def fetch(
self, track: TrackMeta, bypass_cache: bool = False
) -> Optional[LyricResult]:
@@ -277,7 +66,7 @@ class SpotifyFetcher(BaseFetcher):
logger.info(f"Spotify: fetching lyrics for trackid={track.trackid}")
token = await self._get_token()
token = await self.auth.authenticate()
if not token:
logger.error("Spotify: cannot fetch lyrics without a token")
return LyricResult(status=CacheStatus.NETWORK_ERROR, ttl=TTL_NETWORK_ERROR)
+1 -1
View File
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "lrx-cli"
version = "0.4.6"
version = "0.5.0"
description = "Fetch line-synced lyrics for your music player."
readme = "README.md"
requires-python = ">=3.13"
+42
View File
@@ -334,6 +334,48 @@ def test_update_confidence_returns_zero_for_empty_track(
assert cache_db.update_confidence(empty, 50.0, "s1") == 0
def test_credential_set_and_get_roundtrip(cache_db: CacheEngine) -> None:
cache_db.set_credential("spotify", {"access_token": "tok", "expires_in": 3600})
result = cache_db.get_credential("spotify")
assert result == {"access_token": "tok", "expires_in": 3600}
def test_credential_get_returns_none_on_miss(cache_db: CacheEngine) -> None:
assert cache_db.get_credential("nonexistent") is None
def test_credential_expires_at_respected(
monkeypatch: pytest.MonkeyPatch, cache_db: CacheEngine
) -> None:
# Store with expiry 1000 ms in the future
now_ms = 5_000_000_000
monkeypatch.setattr("lrx_cli.cache.time.time", lambda: now_ms / 1000)
cache_db.set_credential(
"musixmatch", {"user_token": "abc"}, expires_at_ms=now_ms + 1000
)
# Still valid
assert cache_db.get_credential("musixmatch") == {"user_token": "abc"}
# Advance past expiry
monkeypatch.setattr("lrx_cli.cache.time.time", lambda: (now_ms + 2000) / 1000)
assert cache_db.get_credential("musixmatch") is None
def test_credential_no_expiry_never_expires(
monkeypatch: pytest.MonkeyPatch, cache_db: CacheEngine
) -> None:
cache_db.set_credential("spotify", {"token": "forever"}, expires_at_ms=None)
monkeypatch.setattr("lrx_cli.cache.time.time", lambda: 9_999_999_999.0)
assert cache_db.get_credential("spotify") == {"token": "forever"}
def test_credential_set_overwrites_existing(cache_db: CacheEngine) -> None:
cache_db.set_credential("spotify", {"token": "old"})
cache_db.set_credential("spotify", {"token": "new"})
assert cache_db.get_credential("spotify") == {"token": "new"}
def test_query_track_and_stats_return_expected_aggregates(
cache_db: CacheEngine,
) -> None:
Generated
+1 -1
View File
@@ -153,7 +153,7 @@ wheels = [
[[package]]
name = "lrx-cli"
version = "0.4.5"
version = "0.5.0"
source = { editable = "." }
dependencies = [
{ name = "cyclopts" },