Compare commits

...

2 Commits

Author SHA1 Message Date
Uyanide 59c6b711f4 fix: --no-cache now also skips cache writing 2026-04-02 03:56:04 +02:00
Uyanide c864da8187 refactor: add LRCData class 2026-04-02 03:56:04 +02:00
15 changed files with 260 additions and 221 deletions
+4 -3
View File
@@ -12,6 +12,7 @@ import unicodedata
from typing import Optional from typing import Optional
from loguru import logger from loguru import logger
from .lrc import LRCData
from .config import DURATION_TOLERANCE_MS from .config import DURATION_TOLERANCE_MS
from .models import TrackMeta, LyricResult, CacheStatus from .models import TrackMeta, LyricResult, CacheStatus
@@ -161,7 +162,7 @@ class CacheEngine:
) )
return LyricResult( return LyricResult(
status=CacheStatus(status_str), status=CacheStatus(status_str),
lyrics=lyrics, lyrics=LRCData(lyrics) if lyrics else None,
source=src, source=src,
ttl=remaining, ttl=remaining,
) )
@@ -212,7 +213,7 @@ class CacheEngine:
key, key,
source, source,
result.status.value, result.status.value,
result.lyrics, str(result.lyrics) if result.lyrics else None,
now, now,
expires_at, expires_at,
track.artist, track.artist,
@@ -316,7 +317,7 @@ class CacheEngine:
row = dict(rows[0]) row = dict(rows[0])
return LyricResult( return LyricResult(
status=CacheStatus(row["status"]), status=CacheStatus(row["status"]),
lyrics=row["lyrics"], lyrics=LRCData(row["lyrics"]) if row["lyrics"] else None,
source="cache-search", source="cache-search",
) )
+5 -5
View File
@@ -18,7 +18,7 @@ from .models import TrackMeta, CacheStatus
from .mpris import get_current_track from .mpris import get_current_track
from .core import LrcManager from .core import LrcManager
from .fetchers import FetcherMethodType from .fetchers import FetcherMethodType
from .lrc import get_sidecar_path, print_lyrics, to_plain from .lrc import get_sidecar_path
app = cyclopts.App( app = cyclopts.App(
@@ -120,7 +120,7 @@ def fetch(
logger.error("Only unsynced lyrics available (--only-synced requested).") logger.error("Only unsynced lyrics available (--only-synced requested).")
sys.exit(1) sys.exit(1)
print_lyrics(result.lyrics, plain=plain) result.lyrics.print_lyrics(plain=plain)
# search # search
@@ -208,7 +208,7 @@ def search(
logger.error("Only unsynced lyrics available (--only-synced requested).") logger.error("Only unsynced lyrics available (--only-synced requested).")
sys.exit(1) sys.exit(1)
print_lyrics(result.lyrics, plain=plain) result.lyrics.print_lyrics(plain=plain)
# export # export
@@ -282,9 +282,9 @@ def export(
try: try:
with open(output, "w", encoding="utf-8") as f: with open(output, "w", encoding="utf-8") as f:
if plain: if plain:
f.write(to_plain(result.lyrics)) f.write(result.lyrics.to_plain())
else: else:
f.write(result.lyrics) f.write(str(result.lyrics))
logger.info(f"Exported lyrics to {output}") logger.info(f"Exported lyrics to {output}")
except Exception as e: except Exception as e:
logger.error(f"Failed to write file: {e}") logger.error(f"Failed to write file: {e}")
+6 -6
View File
@@ -18,7 +18,7 @@ from loguru import logger
from .fetchers import FetcherMethodType, create_fetchers from .fetchers import FetcherMethodType, create_fetchers
from .fetchers.base import BaseFetcher from .fetchers.base import BaseFetcher
from .cache import CacheEngine from .cache import CacheEngine
from .lrc import normalize_tags, normalize_unsynced, detect_sync_status from .lrc import LRCData
from .config import TTL_SYNCED, TTL_UNSYNCED, TTL_NOT_FOUND, TTL_NETWORK_ERROR from .config import TTL_SYNCED, TTL_UNSYNCED, TTL_NOT_FOUND, TTL_NETWORK_ERROR
from .models import TrackMeta, LyricResult, CacheStatus from .models import TrackMeta, LyricResult, CacheStatus
from .enrichers import enrich_track from .enrichers import enrich_track
@@ -121,7 +121,7 @@ class LrcManager:
continue continue
# Cache the result (skip for self-cached fetchers) # Cache the result (skip for self-cached fetchers)
if not fetcher.self_cached: if not fetcher.self_cached and not bypass_cache:
ttl = result.ttl or _STATUS_TTL.get(result.status, TTL_NOT_FOUND) ttl = result.ttl or _STATUS_TTL.get(result.status, TTL_NOT_FOUND)
self.cache.set(track, source, result, ttl_seconds=ttl) self.cache.set(track, source, result, ttl_seconds=ttl)
@@ -146,7 +146,7 @@ class LrcManager:
): ):
best_result = LyricResult( best_result = LyricResult(
status=best_result.status, status=best_result.status,
lyrics=normalize_unsynced(best_result.lyrics), lyrics=best_result.lyrics.normalize_unsynced(),
source=best_result.source, source=best_result.source,
ttl=best_result.ttl, ttl=best_result.ttl,
) )
@@ -167,10 +167,10 @@ class LrcManager:
"""Manually insert lyrics into the cache for a track.""" """Manually insert lyrics into the cache for a track."""
track = enrich_track(track) track = enrich_track(track)
logger.info(f"Manually inserting lyrics for: {track.display_name()}") logger.info(f"Manually inserting lyrics for: {track.display_name()}")
lyrics = normalize_tags(lyrics) lrc = LRCData(lyrics)
result = LyricResult( result = LyricResult(
status=detect_sync_status(lyrics), status=lrc.detect_sync_status(),
lyrics=normalize_tags(lyrics), lyrics=lrc,
source="manual", source="manual",
ttl=None, ttl=None,
) )
+3 -1
View File
@@ -13,9 +13,11 @@ albums or is played from different players.
from typing import Optional from typing import Optional
from loguru import logger from loguru import logger
from .base import BaseFetcher from .base import BaseFetcher
from ..models import TrackMeta, LyricResult, CacheStatus from ..models import TrackMeta, LyricResult, CacheStatus
from ..cache import CacheEngine from ..cache import CacheEngine
from ..lrc import LRCData
class CacheSearchFetcher(BaseFetcher): class CacheSearchFetcher(BaseFetcher):
@@ -80,6 +82,6 @@ class CacheSearchFetcher(BaseFetcher):
) )
return LyricResult( return LyricResult(
status=status, status=status,
lyrics=best["lyrics"], lyrics=LRCData(best["lyrics"]),
source=self.source_name, source=self.source_name,
) )
+15 -9
View File
@@ -17,7 +17,7 @@ from mutagen.flac import FLAC
from .base import BaseFetcher from .base import BaseFetcher
from ..models import TrackMeta, LyricResult from ..models import TrackMeta, LyricResult
from ..lrc import detect_sync_status, normalize_tags, get_audio_path, get_sidecar_path from ..lrc import get_audio_path, get_sidecar_path, LRCData
class LocalFetcher(BaseFetcher): class LocalFetcher(BaseFetcher):
@@ -48,11 +48,15 @@ class LocalFetcher(BaseFetcher):
with open(lrc_path, "r", encoding="utf-8") as f: with open(lrc_path, "r", encoding="utf-8") as f:
content = f.read().strip() content = f.read().strip()
if content: if content:
content = normalize_tags(content) lrc = LRCData(content)
status = detect_sync_status(content) status = lrc.detect_sync_status()
logger.info(f"Local: found .lrc sidecar ({status.value})") logger.info(
f"Local: found .lrc sidecar ({status.value}) for {audio_path.name}"
)
return LyricResult( return LyricResult(
status=status, lyrics=content, source=self.source_name status=status,
lyrics=lrc,
source=self.source_name,
) )
except Exception as e: except Exception as e:
logger.error(f"Local: error reading {lrc_path}: {e}") logger.error(f"Local: error reading {lrc_path}: {e}")
@@ -81,12 +85,14 @@ class LocalFetcher(BaseFetcher):
break break
if lyrics: if lyrics:
lyrics = normalize_tags(lyrics.strip()) lrc = LRCData(lyrics)
status = detect_sync_status(lyrics) status = lrc.detect_sync_status()
logger.info(f"Local: found embedded lyrics ({status.value})") logger.info(
f"Local: found embedded lyrics ({status.value}) for {audio_path.name}"
)
return LyricResult( return LyricResult(
status=status, status=status,
lyrics=lyrics, lyrics=lrc,
source=f"{self.source_name} (embedded)", source=f"{self.source_name} (embedded)",
) )
else: else:
+5 -9
View File
@@ -15,7 +15,7 @@ from urllib.parse import urlencode
from .base import BaseFetcher from .base import BaseFetcher
from ..models import TrackMeta, LyricResult, CacheStatus from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import normalize_tags from ..lrc import LRCData
from ..config import ( from ..config import (
HTTP_TIMEOUT, HTTP_TIMEOUT,
TTL_UNSYNCED, TTL_UNSYNCED,
@@ -79,20 +79,16 @@ class LrclibFetcher(BaseFetcher):
unsynced = data.get("plainLyrics") unsynced = data.get("plainLyrics")
if isinstance(synced, str) and synced.strip(): if isinstance(synced, str) and synced.strip():
lyrics = normalize_tags(synced.strip()) lyrics = LRCData(synced)
logger.info( logger.info(f"LRCLIB: got synced lyrics ({len(lyrics)} lines)")
f"LRCLIB: got synced lyrics ({len(lyrics.splitlines())} lines)"
)
return LyricResult( return LyricResult(
status=CacheStatus.SUCCESS_SYNCED, status=CacheStatus.SUCCESS_SYNCED,
lyrics=lyrics, lyrics=lyrics,
source=self.source_name, source=self.source_name,
) )
elif isinstance(unsynced, str) and unsynced.strip(): elif isinstance(unsynced, str) and unsynced.strip():
lyrics = normalize_tags(unsynced.strip()) lyrics = LRCData(unsynced)
logger.info( logger.info(f"LRCLIB: got unsynced lyrics ({len(lyrics)} lines)")
f"LRCLIB: got unsynced lyrics ({len(lyrics.splitlines())} lines)"
)
return LyricResult( return LyricResult(
status=CacheStatus.SUCCESS_UNSYNCED, status=CacheStatus.SUCCESS_UNSYNCED,
lyrics=lyrics, lyrics=lyrics,
+5 -9
View File
@@ -16,7 +16,7 @@ from urllib.parse import urlencode
from .base import BaseFetcher from .base import BaseFetcher
from ..models import TrackMeta, LyricResult, CacheStatus from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import normalize_tags from ..lrc import LRCData
from ..config import ( from ..config import (
HTTP_TIMEOUT, HTTP_TIMEOUT,
TTL_UNSYNCED, TTL_UNSYNCED,
@@ -82,20 +82,16 @@ class LrclibSearchFetcher(BaseFetcher):
unsynced = best.get("plainLyrics") unsynced = best.get("plainLyrics")
if isinstance(synced, str) and synced.strip(): if isinstance(synced, str) and synced.strip():
lyrics = normalize_tags(synced.strip()) lyrics = LRCData(synced)
logger.info( logger.info(f"LRCLIB-search: got synced lyrics ({len(lyrics)} lines)")
f"LRCLIB-search: got synced lyrics ({len(lyrics.splitlines())} lines)"
)
return LyricResult( return LyricResult(
status=CacheStatus.SUCCESS_SYNCED, status=CacheStatus.SUCCESS_SYNCED,
lyrics=lyrics, lyrics=lyrics,
source=self.source_name, source=self.source_name,
) )
elif isinstance(unsynced, str) and unsynced.strip(): elif isinstance(unsynced, str) and unsynced.strip():
lyrics = normalize_tags(unsynced.strip()) lyrics = LRCData(unsynced)
logger.info( logger.info(f"LRCLIB-search: got unsynced lyrics ({len(lyrics)} lines)")
f"LRCLIB-search: got unsynced lyrics ({len(lyrics.splitlines())} lines)"
)
return LyricResult( return LyricResult(
status=CacheStatus.SUCCESS_UNSYNCED, status=CacheStatus.SUCCESS_UNSYNCED,
lyrics=lyrics, lyrics=lyrics,
+5 -7
View File
@@ -18,7 +18,7 @@ from loguru import logger
from .base import BaseFetcher from .base import BaseFetcher
from ..models import TrackMeta, LyricResult, CacheStatus from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import detect_sync_status, normalize_tags from ..lrc import LRCData
from ..config import ( from ..config import (
HTTP_TIMEOUT, HTTP_TIMEOUT,
TTL_NOT_FOUND, TTL_NOT_FOUND,
@@ -181,15 +181,13 @@ class NeteaseFetcher(BaseFetcher):
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND) return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
# Determine sync status # Determine sync status
lrc = normalize_tags(lrc) lrcdata = LRCData(lrc)
status = detect_sync_status(lrc) status = lrcdata.detect_sync_status()
logger.info( logger.info(
f"Netease: got {status.value} lyrics for song_id={song_id} " f"Netease: got {status.value} lyrics for song_id={song_id} "
f"({len(lrc.splitlines())} lines)" f"({len(lrcdata)} lines)"
)
return LyricResult(
status=status, lyrics=lrc.strip(), source=self.source_name
) )
return LyricResult(status=status, lyrics=lrcdata, source=self.source_name)
except Exception as e: except Exception as e:
logger.error(f"Netease: lyric fetch failed for song_id={song_id}: {e}") logger.error(f"Netease: lyric fetch failed for song_id={song_id}: {e}")
+5 -7
View File
@@ -17,7 +17,7 @@ from loguru import logger
from .base import BaseFetcher from .base import BaseFetcher
from ..models import TrackMeta, LyricResult, CacheStatus from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import detect_sync_status, normalize_tags from ..lrc import LRCData
from ..config import ( from ..config import (
HTTP_TIMEOUT, HTTP_TIMEOUT,
TTL_NOT_FOUND, TTL_NOT_FOUND,
@@ -142,15 +142,13 @@ class QQMusicFetcher(BaseFetcher):
logger.debug(f"QQMusic: empty lyrics for mid={mid}") logger.debug(f"QQMusic: empty lyrics for mid={mid}")
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND) return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
lrc = normalize_tags(lrc) lrcdata = LRCData(lrc)
status = detect_sync_status(lrc) status = lrcdata.detect_sync_status()
logger.info( logger.info(
f"QQMusic: got {status.value} lyrics for mid={mid} " f"QQMusic: got {status.value} lyrics for mid={mid} "
f"({len(lrc.splitlines())} lines)" f"({len(lrcdata)} lines)"
)
return LyricResult(
status=status, lyrics=lrc.strip(), source=self.source_name
) )
return LyricResult(status=status, lyrics=lrcdata, source=self.source_name)
except Exception as e: except Exception as e:
logger.error(f"QQMusic: lyric fetch failed for mid={mid}: {e}") logger.error(f"QQMusic: lyric fetch failed for mid={mid}: {e}")
+2 -2
View File
@@ -28,7 +28,7 @@ from loguru import logger
from .base import BaseFetcher from .base import BaseFetcher
from ..models import TrackMeta, LyricResult, CacheStatus from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import normalize_tags from ..lrc import LRCData
from ..config import ( from ..config import (
HTTP_TIMEOUT, HTTP_TIMEOUT,
SPOTIFY_APP_VERSION, SPOTIFY_APP_VERSION,
@@ -358,7 +358,7 @@ class SpotifyFetcher(BaseFetcher):
# Unsynced: emit with zero timestamps # Unsynced: emit with zero timestamps
lrc_lines.append(f"[00:00.00]{words}") lrc_lines.append(f"[00:00.00]{words}")
content = normalize_tags("\n".join(lrc_lines)) content = LRCData("\n".join(lrc_lines))
status = ( status = (
CacheStatus.SUCCESS_SYNCED CacheStatus.SUCCESS_SYNCED
if is_synced if is_synced
+94 -63
View File
@@ -70,7 +70,7 @@ def _sanitize_lyric_text(text: str) -> str:
return _remove_pattern(text, _WORD_SYNC_TAG_RE) return _remove_pattern(text, _WORD_SYNC_TAG_RE)
def _reformat(text: str) -> str: def _reformat(text: str) -> list[str]:
"""Parse each line and reformat to standard [mm:ss.cc]...content form. """Parse each line and reformat to standard [mm:ss.cc]...content form.
Handles any mix of time tag formats on input. Lines with no time tags Handles any mix of time tag formats on input. Lines with no time tags
@@ -99,21 +99,54 @@ def _reformat(text: str) -> str:
else: else:
out.append(line) out.append(line)
# Empty lines with no tags are also preserved # Empty lines with no tags are also preserved
return "\n".join(out)
# Remove empty lines at the start and end of the whole text, but preserve blank lines in the middle
while out and not out[0].strip():
out.pop(0)
while out and not out[-1].strip():
out.pop()
return out
def _apply_offset(text: str) -> str: class LRCData:
_lines: list[str]
def __init__(self, text: str | None = None) -> None:
if not text:
self._lines = []
return
self._lines = _reformat(text)
self._apply_offset()
def __str__(self) -> str:
return "\n".join(self._lines)
def __repr__(self) -> str:
return f"LRCData(lines={self._lines!r})"
def __bool__(self) -> bool:
return len(self._lines) > 0
def __len__(self) -> int:
return len(self._lines)
def _apply_offset(self):
"""Parse [offset:±ms] and shift all standard [mm:ss.cc] tags accordingly. """Parse [offset:±ms] and shift all standard [mm:ss.cc] tags accordingly.
Per LRC spec, positive offset = lyrics appear sooner (subtract from timestamps). Per LRC spec, positive offset = lyrics appear sooner (subtract from timestamps).
""" """
m = _OFFSET_RE.search(text) m: Optional[re.Match] = None
for i, line in enumerate(self._lines):
m = _OFFSET_RE.search(line)
if m:
self._lines.pop(i)
break
if not m: if not m:
return text return
offset_ms = int(m.group(1)) offset_ms = int(m.group(1))
text = _OFFSET_RE.sub("", text).strip("\n")
if offset_ms == 0: if offset_ms == 0:
return text return
def _shift(match: re.Match) -> str: def _shift(match: re.Match) -> str:
total_ms = max( total_ms = max(
@@ -127,34 +160,31 @@ def _apply_offset(text: str) -> str:
new_cs = min(round((total_ms % 1000) / 10), 99) new_cs = min(round((total_ms % 1000) / 10), 99)
return f"[{new_mm:02d}:{new_ss:02d}.{new_cs:02d}]" return f"[{new_mm:02d}:{new_ss:02d}.{new_cs:02d}]"
return _STD_TAG_CAPTURE_RE.sub(_shift, text) self._lines = [_STD_TAG_CAPTURE_RE.sub(_shift, line) for line in self._lines]
def is_synced(self) -> bool:
def normalize_tags(text: str) -> str:
"""Normalize LRC to standard form: reformat all tags to [mm:ss.cc], then apply offset."""
return _apply_offset(_reformat(text))
def is_synced(text: str) -> bool:
"""Check whether text contains non-zero LRC time tags. """Check whether text contains non-zero LRC time tags.
Assumes text has been normalized by normalize (standard [mm:ss.cc] format). Assumes text has been normalized by normalize (standard [mm:ss.cc] format).
""" """
tags = _STD_TAG_RE.findall(text) for line in self._lines:
return bool(tags) and any(tag != "[00:00.00]" for tag in tags) for m in _STD_TAG_CAPTURE_RE.finditer(line):
if m.group(1) != "00" or m.group(2) != "00" or m.group(3) != "00":
return True
return False
def detect_sync_status(self) -> CacheStatus:
def detect_sync_status(text: str) -> CacheStatus:
"""Determine whether lyrics contain meaningful LRC time tags. """Determine whether lyrics contain meaningful LRC time tags.
Assumes text has been normalized by normalize. Assumes text has been normalized by normalize.
""" """
return ( return (
CacheStatus.SUCCESS_SYNCED if is_synced(text) else CacheStatus.SUCCESS_UNSYNCED CacheStatus.SUCCESS_SYNCED
if self.is_synced()
else CacheStatus.SUCCESS_UNSYNCED
) )
def normalize_unsynced(self):
def normalize_unsynced(lyrics: str) -> str:
"""Normalize unsynced lyrics so every line has a [00:00.00] tag. """Normalize unsynced lyrics so every line has a [00:00.00] tag.
Assumes lyrics have been normalized by normalize. Assumes lyrics have been normalized by normalize.
@@ -164,7 +194,7 @@ def normalize_unsynced(lyrics: str) -> str:
""" """
out: list[str] = [] out: list[str] = []
first = True first = True
for line in lyrics.splitlines(): for i, line in enumerate(self._lines):
stripped = line.strip() stripped = line.strip()
if not stripped and not first: if not stripped and not first:
out.append("[00:00.00]") out.append("[00:00.00]")
@@ -175,39 +205,12 @@ def normalize_unsynced(lyrics: str) -> str:
first = False first = False
cleaned = _remove_pattern(line, _LINE_START_STD_TAGS_RE) cleaned = _remove_pattern(line, _LINE_START_STD_TAGS_RE)
out.append(f"[00:00.00]{cleaned}") out.append(f"[00:00.00]{cleaned}")
return "\n".join(out) ret = LRCData()
ret._lines = out
return ret
def get_audio_path(audio_url: str, ensure_exists: bool = False) -> Optional[Path]:
"""Convert file:// URL to Path, return None if invalid or (if ensure_exists) file doesn't exist."""
if not audio_url.startswith("file://"):
return None
file_path = unquote(audio_url.replace("file://", "", 1))
path = Path(file_path)
if ensure_exists and not path.exists():
return None
return path
def get_sidecar_path(
audio_url: str, ensure_audio_exists: bool = False, ensure_exists: bool = False
) -> Optional[Path]:
"""Given a file:// URL, return the corresponding .lrc sidecar path.
If ensure_audio_exists is True, return None if the audio file does not exist.
If ensure_exists is True, return None if the .lrc file does not exist.
"""
audio_path = get_audio_path(audio_url, ensure_exists=ensure_audio_exists)
if not audio_path:
return None
lrc_path = audio_path.with_suffix(".lrc")
if ensure_exists and not lrc_path.exists():
return None
return lrc_path
def to_plain( def to_plain(
text: str, self,
deduplicate: bool = False, deduplicate: bool = False,
) -> str: ) -> str:
"""Convert lyrics to plain text with all tags stripped. """Convert lyrics to plain text with all tags stripped.
@@ -217,12 +220,13 @@ def to_plain(
Assumes text has been normalized by normalize. Assumes text has been normalized by normalize.
""" """
if not is_synced(text): if not self.is_synced():
# If there are no meaningful time tags, just strip all tags and return return "\n".join(
return _remove_pattern(text, _LINE_START_TAGS_RE) _remove_pattern(line, _LINE_START_TAGS_RE) for line in self._lines
).strip("\n")
lines = [] lines = []
for line in text.splitlines(): for line in self._lines:
pos = 0 pos = 0
cnt = 0 cnt = 0
plain_line = "" plain_line = ""
@@ -254,11 +258,10 @@ def to_plain(
prev_line = line prev_line = line
lines = deduped_lines lines = deduped_lines
return "\n".join(lines).strip("\n") return "\n".join(lines).strip()
def print_lyrics( def print_lyrics(
text: str, self,
plain: bool = False, plain: bool = False,
) -> None: ) -> None:
"""Print lyrics, optionally stripping tags. """Print lyrics, optionally stripping tags.
@@ -266,6 +269,34 @@ def print_lyrics(
Assumes text has been normalized by normalize. Assumes text has been normalized by normalize.
""" """
if plain: if plain:
print(to_plain(text)) print(self.to_plain())
else: else:
print(text) print("\n".join(self._lines))
def get_audio_path(audio_url: str, ensure_exists: bool = False) -> Optional[Path]:
"""Convert file:// URL to Path, return None if invalid or (if ensure_exists) file doesn't exist."""
if not audio_url.startswith("file://"):
return None
file_path = unquote(audio_url.replace("file://", "", 1))
path = Path(file_path)
if ensure_exists and not path.exists():
return None
return path
def get_sidecar_path(
audio_url: str, ensure_audio_exists: bool = False, ensure_exists: bool = False
) -> Optional[Path]:
"""Given a file:// URL, return the corresponding .lrc sidecar path.
If ensure_audio_exists is True, return None if the audio file does not exist.
If ensure_exists is True, return None if the .lrc file does not exist.
"""
audio_path = get_audio_path(audio_url, ensure_exists=ensure_audio_exists)
if not audio_path:
return None
lrc_path = audio_path.with_suffix(".lrc")
if ensure_exists and not lrc_path.exists():
return None
return lrc_path
+7 -2
View File
@@ -4,10 +4,15 @@ Date: 2026-03-25 04:09:36
Description: Data models Description: Data models
""" """
from __future__ import annotations
from enum import Enum from enum import Enum
from typing import Optional from typing import Optional, TYPE_CHECKING
from dataclasses import dataclass from dataclasses import dataclass
if TYPE_CHECKING:
from .lrc import LRCData
class CacheStatus(str, Enum): class CacheStatus(str, Enum):
"""Status of a cached lyric entry.""" """Status of a cached lyric entry."""
@@ -54,6 +59,6 @@ class LyricResult:
"""Result of a lyric fetch attempt, also used as cache record.""" """Result of a lyric fetch attempt, also used as cache record."""
status: CacheStatus status: CacheStatus
lyrics: Optional[str] = None lyrics: Optional[LRCData] = None
source: Optional[str] = None # Which fetcher produced this result source: Optional[str] = None # Which fetcher produced this result
ttl: Optional[int] = None # Hint for cache TTL (seconds) ttl: Optional[int] = None # Hint for cache TTL (seconds)
+1 -1
View File
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project] [project]
name = "lrx-cli" name = "lrx-cli"
version = "0.2.0" version = "0.2.1"
description = "Fetch line-synced lyrics for your music player." description = "Fetch line-synced lyrics for your music player."
readme = "README.md" readme = "README.md"
requires-python = ">=3.13" requires-python = ">=3.13"
+3 -3
View File
@@ -100,7 +100,7 @@ def test_set_and_get_roundtrip_with_ttl(
assert cached is not None assert cached is not None
assert cached.status is CacheStatus.SUCCESS_SYNCED assert cached.status is CacheStatus.SUCCESS_SYNCED
assert cached.lyrics == "[00:01.00]line" assert str(cached.lyrics) == "[00:01.00]line"
assert cached.source == "lrclib" assert cached.source == "lrclib"
assert cached.ttl == 120 assert cached.ttl == 120
@@ -181,7 +181,7 @@ def test_get_best_prefers_synced_over_unsynced_and_negative(
assert best is not None assert best is not None
assert best.status is CacheStatus.SUCCESS_SYNCED assert best.status is CacheStatus.SUCCESS_SYNCED
assert best.lyrics == "synced" assert str(best.lyrics) == "synced"
def test_clear_track_and_clear_all_affect_expected_rows(cache_db: CacheEngine) -> None: def test_clear_track_and_clear_all_affect_expected_rows(cache_db: CacheEngine) -> None:
@@ -239,7 +239,7 @@ def test_find_best_positive_uses_exact_match_and_prefers_synced(
assert best is not None assert best is not None
assert best.status is CacheStatus.SUCCESS_SYNCED assert best.status is CacheStatus.SUCCESS_SYNCED
assert best.lyrics == "s" assert str(best.lyrics) == "s"
# find_best_positive always reports cache-search source # find_best_positive always reports cache-search source
assert best.source == "cache-search" assert best.source == "cache-search"
+34 -28
View File
@@ -1,15 +1,13 @@
from __future__ import annotations from __future__ import annotations
from lrx_cli.lrc import ( from lrx_cli.lrc import LRCData
detect_sync_status,
is_synced,
normalize_tags,
normalize_unsynced,
to_plain,
)
from lrx_cli.models import CacheStatus from lrx_cli.models import CacheStatus
def _normalize(text: str) -> str:
return str(LRCData(text))
def test_normalize_tags_supports_all_raw_time_formats() -> None: def test_normalize_tags_supports_all_raw_time_formats() -> None:
raw = "\n".join( raw = "\n".join(
[ [
@@ -21,7 +19,7 @@ def test_normalize_tags_supports_all_raw_time_formats() -> None:
] ]
) )
normalized = normalize_tags(raw) normalized = _normalize(raw)
assert normalized == "\n".join( assert normalized == "\n".join(
[ [
@@ -37,7 +35,7 @@ def test_normalize_tags_supports_all_raw_time_formats() -> None:
def test_normalize_tags_keeps_non_timed_lines_trimmed_and_unchanged() -> None: def test_normalize_tags_keeps_non_timed_lines_trimmed_and_unchanged() -> None:
raw = " plain line \n\n [ar:Meta Header] " raw = " plain line \n\n [ar:Meta Header] "
normalized = normalize_tags(raw) normalized = _normalize(raw)
assert normalized == "plain line\n\n[ar:Meta Header]" assert normalized == "plain line\n\n[ar:Meta Header]"
@@ -51,7 +49,7 @@ def test_normalize_tags_removes_word_sync_patterns() -> None:
"[00:05.00]<1,2,3>baz" "[00:05.00]<1,2,3>baz"
) )
normalized = normalize_tags(raw) normalized = _normalize(raw)
assert normalized == "\n".join( assert normalized == "\n".join(
[ [
@@ -67,14 +65,14 @@ def test_normalize_tags_removes_word_sync_patterns() -> None:
def test_normalize_tags_keeps_midline_timestamps_as_is() -> None: def test_normalize_tags_keeps_midline_timestamps_as_is() -> None:
raw = "[00:01.00]Lyric [00:02.00]line" raw = "[00:01.00]Lyric [00:02.00]line"
normalized = normalize_tags(raw) normalized = _normalize(raw)
assert normalized == "[00:01.00]Lyric [00:02.00]line" assert normalized == "[00:01.00]Lyric [00:02.00]line"
def test_normalize_tags_applies_positive_and_negative_offset_per_spec() -> None: def test_normalize_tags_applies_positive_and_negative_offset_per_spec() -> None:
positive = normalize_tags("[offset:+1000]\n[00:10.00]line") positive = _normalize("[offset:+1000]\n[00:10.00]line")
negative = normalize_tags("[offset:-500]\n[00:10.00]line") negative = _normalize("[offset:-500]\n[00:10.00]line")
assert positive == "[00:09.00]line" assert positive == "[00:09.00]line"
assert negative == "[00:10.50]line" assert negative == "[00:10.50]line"
@@ -83,7 +81,7 @@ def test_normalize_tags_applies_positive_and_negative_offset_per_spec() -> None:
def test_normalize_tags_accepts_leading_spaces_and_tabs_before_tags() -> None: def test_normalize_tags_accepts_leading_spaces_and_tabs_before_tags() -> None:
raw = "\t [00:01.2] hello" raw = "\t [00:01.2] hello"
normalized = normalize_tags(raw) normalized = _normalize(raw)
assert normalized == "[00:01.20]hello" assert normalized == "[00:01.20]hello"
@@ -91,7 +89,7 @@ def test_normalize_tags_accepts_leading_spaces_and_tabs_before_tags() -> None:
def test_normalize_tags_handles_consecutive_start_tags_with_spaces_between() -> None: def test_normalize_tags_handles_consecutive_start_tags_with_spaces_between() -> None:
raw = "[00:01] [00:02.3] chorus" raw = "[00:01] [00:02.3] chorus"
normalized = normalize_tags(raw) normalized = _normalize(raw)
assert normalized == "[00:01.00][00:02.30]chorus" assert normalized == "[00:01.00][00:02.30]chorus"
@@ -99,7 +97,7 @@ def test_normalize_tags_handles_consecutive_start_tags_with_spaces_between() ->
def test_normalize_tags_preserves_non_leading_raw_like_tags() -> None: def test_normalize_tags_preserves_non_leading_raw_like_tags() -> None:
raw = "intro [00:01]line" raw = "intro [00:01]line"
normalized = normalize_tags(raw) normalized = _normalize(raw)
assert normalized == "intro [00:01]line" assert normalized == "intro [00:01]line"
@@ -107,7 +105,7 @@ def test_normalize_tags_preserves_non_leading_raw_like_tags() -> None:
def test_normalize_tags_removes_offset_tag_line_even_without_lyrics() -> None: def test_normalize_tags_removes_offset_tag_line_even_without_lyrics() -> None:
raw = "[offset:+500]" raw = "[offset:+500]"
normalized = normalize_tags(raw) normalized = _normalize(raw)
assert normalized == "" assert normalized == ""
@@ -117,20 +115,20 @@ def test_is_synced_and_detect_sync_status_follow_non_zero_rule() -> None:
unsynced_text = "[00:00.00]a\n[00:00.00]b" unsynced_text = "[00:00.00]a\n[00:00.00]b"
synced_text = "[00:00.00]a\n[00:01.00]b" synced_text = "[00:00.00]a\n[00:01.00]b"
assert is_synced(plain_text) is False assert LRCData(plain_text).is_synced() is False
assert detect_sync_status(plain_text) is CacheStatus.SUCCESS_UNSYNCED assert LRCData(plain_text).detect_sync_status() is CacheStatus.SUCCESS_UNSYNCED
assert is_synced(unsynced_text) is False assert LRCData(unsynced_text).is_synced() is False
assert detect_sync_status(unsynced_text) is CacheStatus.SUCCESS_UNSYNCED assert LRCData(unsynced_text).detect_sync_status() is CacheStatus.SUCCESS_UNSYNCED
assert is_synced(synced_text) is True assert LRCData(synced_text).is_synced() is True
assert detect_sync_status(synced_text) is CacheStatus.SUCCESS_SYNCED assert LRCData(synced_text).detect_sync_status() is CacheStatus.SUCCESS_SYNCED
def test_normalize_unsynced_covers_documented_blank_and_tag_rules() -> None: def test_normalize_unsynced_covers_documented_blank_and_tag_rules() -> None:
lyrics = "\n[00:12.34]first\nsecond\n\n[00:00.00]third" lyrics = "\n[00:12.34]first\nsecond\n\n[00:00.00]third"
normalized = normalize_unsynced(lyrics) normalized = str(LRCData(lyrics).normalize_unsynced())
assert normalized == "\n".join( assert normalized == "\n".join(
[ [
@@ -152,7 +150,7 @@ def test_to_plain_duplicates_lines_by_leading_repeated_timestamps() -> None:
] ]
) )
plain = to_plain(text) plain = LRCData(text).to_plain()
# In synced mode, lines with standard tags are kept (including [00:00.00]), # In synced mode, lines with standard tags are kept (including [00:00.00]),
# while lines without leading standard tags are ignored. # while lines without leading standard tags are ignored.
@@ -171,7 +169,7 @@ def test_to_plain_deduplicate_collapses_only_consecutive_equals() -> None:
] ]
) )
plain = to_plain(text, deduplicate=True) plain = LRCData(text).to_plain(deduplicate=True)
assert plain == "\n".join(["hello", "", "world", "hello"]) assert plain == "\n".join(["hello", "", "world", "hello"])
@@ -179,7 +177,7 @@ def test_to_plain_deduplicate_collapses_only_consecutive_equals() -> None:
def test_to_plain_fallback_for_non_synced_text_strips_start_tags() -> None: def test_to_plain_fallback_for_non_synced_text_strips_start_tags() -> None:
text = "\n".join(["[ar:Artist]", "[00:00.00]only-zero", "plain line"]) text = "\n".join(["[ar:Artist]", "[00:00.00]only-zero", "plain line"])
plain = to_plain(text) plain = LRCData(text).to_plain()
assert plain == "only-zero\nplain line" assert plain == "only-zero\nplain line"
@@ -187,6 +185,14 @@ def test_to_plain_fallback_for_non_synced_text_strips_start_tags() -> None:
def test_to_plain_trims_leading_and_trailing_blank_lines() -> None: def test_to_plain_trims_leading_and_trailing_blank_lines() -> None:
text = "\n\n[00:01.00]line1\n\n[00:01.00]\n[00:02.00]line2\nline3\n \n" text = "\n\n[00:01.00]line1\n\n[00:01.00]\n[00:02.00]line2\nline3\n \n"
plain = to_plain(text) plain = LRCData(text).to_plain()
assert plain == "line1\n\nline2" assert plain == "line1\n\nline2"
def test_reformat_pipeline_trims_outer_blanks_and_preserves_inner_blanks() -> None:
text = "\n\n[00:01]a\n\n[00:02]b\n\n"
normalized = str(LRCData(text))
assert normalized == "[00:01.00]a\n\n[00:02.00]b"