refactor: add LRCData class

This commit is contained in:
2026-04-02 02:57:50 +02:00
parent 83c9553309
commit 8001c3f7e8
15 changed files with 259 additions and 220 deletions
+4 -3
View File
@@ -12,6 +12,7 @@ import unicodedata
from typing import Optional
from loguru import logger
from .lrc import LRCData
from .config import DURATION_TOLERANCE_MS
from .models import TrackMeta, LyricResult, CacheStatus
@@ -161,7 +162,7 @@ class CacheEngine:
)
return LyricResult(
status=CacheStatus(status_str),
lyrics=lyrics,
lyrics=LRCData(lyrics) if lyrics else None,
source=src,
ttl=remaining,
)
@@ -212,7 +213,7 @@ class CacheEngine:
key,
source,
result.status.value,
result.lyrics,
str(result.lyrics) if result.lyrics else None,
now,
expires_at,
track.artist,
@@ -316,7 +317,7 @@ class CacheEngine:
row = dict(rows[0])
return LyricResult(
status=CacheStatus(row["status"]),
lyrics=row["lyrics"],
lyrics=LRCData(row["lyrics"]) if row["lyrics"] else None,
source="cache-search",
)
+5 -5
View File
@@ -18,7 +18,7 @@ from .models import TrackMeta, CacheStatus
from .mpris import get_current_track
from .core import LrcManager
from .fetchers import FetcherMethodType
from .lrc import get_sidecar_path, print_lyrics, to_plain
from .lrc import get_sidecar_path
app = cyclopts.App(
@@ -120,7 +120,7 @@ def fetch(
logger.error("Only unsynced lyrics available (--only-synced requested).")
sys.exit(1)
print_lyrics(result.lyrics, plain=plain)
result.lyrics.print_lyrics(plain=plain)
# search
@@ -208,7 +208,7 @@ def search(
logger.error("Only unsynced lyrics available (--only-synced requested).")
sys.exit(1)
print_lyrics(result.lyrics, plain=plain)
result.lyrics.print_lyrics(plain=plain)
# export
@@ -282,9 +282,9 @@ def export(
try:
with open(output, "w", encoding="utf-8") as f:
if plain:
f.write(to_plain(result.lyrics))
f.write(result.lyrics.to_plain())
else:
f.write(result.lyrics)
f.write(str(result.lyrics))
logger.info(f"Exported lyrics to {output}")
except Exception as e:
logger.error(f"Failed to write file: {e}")
+5 -5
View File
@@ -18,7 +18,7 @@ from loguru import logger
from .fetchers import FetcherMethodType, create_fetchers
from .fetchers.base import BaseFetcher
from .cache import CacheEngine
from .lrc import normalize_tags, normalize_unsynced, detect_sync_status
from .lrc import LRCData
from .config import TTL_SYNCED, TTL_UNSYNCED, TTL_NOT_FOUND, TTL_NETWORK_ERROR
from .models import TrackMeta, LyricResult, CacheStatus
from .enrichers import enrich_track
@@ -146,7 +146,7 @@ class LrcManager:
):
best_result = LyricResult(
status=best_result.status,
lyrics=normalize_unsynced(best_result.lyrics),
lyrics=best_result.lyrics.normalize_unsynced(),
source=best_result.source,
ttl=best_result.ttl,
)
@@ -167,10 +167,10 @@ class LrcManager:
"""Manually insert lyrics into the cache for a track."""
track = enrich_track(track)
logger.info(f"Manually inserting lyrics for: {track.display_name()}")
lyrics = normalize_tags(lyrics)
lrc = LRCData(lyrics)
result = LyricResult(
status=detect_sync_status(lyrics),
lyrics=normalize_tags(lyrics),
status=lrc.detect_sync_status(),
lyrics=lrc,
source="manual",
ttl=None,
)
+3 -1
View File
@@ -13,9 +13,11 @@ albums or is played from different players.
from typing import Optional
from loguru import logger
from .base import BaseFetcher
from ..models import TrackMeta, LyricResult, CacheStatus
from ..cache import CacheEngine
from ..lrc import LRCData
class CacheSearchFetcher(BaseFetcher):
@@ -80,6 +82,6 @@ class CacheSearchFetcher(BaseFetcher):
)
return LyricResult(
status=status,
lyrics=best["lyrics"],
lyrics=LRCData(best["lyrics"]),
source=self.source_name,
)
+15 -9
View File
@@ -17,7 +17,7 @@ from mutagen.flac import FLAC
from .base import BaseFetcher
from ..models import TrackMeta, LyricResult
from ..lrc import detect_sync_status, normalize_tags, get_audio_path, get_sidecar_path
from ..lrc import get_audio_path, get_sidecar_path, LRCData
class LocalFetcher(BaseFetcher):
@@ -48,11 +48,15 @@ class LocalFetcher(BaseFetcher):
with open(lrc_path, "r", encoding="utf-8") as f:
content = f.read().strip()
if content:
content = normalize_tags(content)
status = detect_sync_status(content)
logger.info(f"Local: found .lrc sidecar ({status.value})")
lrc = LRCData(content)
status = lrc.detect_sync_status()
logger.info(
f"Local: found .lrc sidecar ({status.value}) for {audio_path.name}"
)
return LyricResult(
status=status, lyrics=content, source=self.source_name
status=status,
lyrics=lrc,
source=self.source_name,
)
except Exception as e:
logger.error(f"Local: error reading {lrc_path}: {e}")
@@ -81,12 +85,14 @@ class LocalFetcher(BaseFetcher):
break
if lyrics:
lyrics = normalize_tags(lyrics.strip())
status = detect_sync_status(lyrics)
logger.info(f"Local: found embedded lyrics ({status.value})")
lrc = LRCData(lyrics)
status = lrc.detect_sync_status()
logger.info(
f"Local: found embedded lyrics ({status.value}) for {audio_path.name}"
)
return LyricResult(
status=status,
lyrics=lyrics,
lyrics=lrc,
source=f"{self.source_name} (embedded)",
)
else:
+5 -9
View File
@@ -15,7 +15,7 @@ from urllib.parse import urlencode
from .base import BaseFetcher
from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import normalize_tags
from ..lrc import LRCData
from ..config import (
HTTP_TIMEOUT,
TTL_UNSYNCED,
@@ -79,20 +79,16 @@ class LrclibFetcher(BaseFetcher):
unsynced = data.get("plainLyrics")
if isinstance(synced, str) and synced.strip():
lyrics = normalize_tags(synced.strip())
logger.info(
f"LRCLIB: got synced lyrics ({len(lyrics.splitlines())} lines)"
)
lyrics = LRCData(synced)
logger.info(f"LRCLIB: got synced lyrics ({len(lyrics)} lines)")
return LyricResult(
status=CacheStatus.SUCCESS_SYNCED,
lyrics=lyrics,
source=self.source_name,
)
elif isinstance(unsynced, str) and unsynced.strip():
lyrics = normalize_tags(unsynced.strip())
logger.info(
f"LRCLIB: got unsynced lyrics ({len(lyrics.splitlines())} lines)"
)
lyrics = LRCData(unsynced)
logger.info(f"LRCLIB: got unsynced lyrics ({len(lyrics)} lines)")
return LyricResult(
status=CacheStatus.SUCCESS_UNSYNCED,
lyrics=lyrics,
+5 -9
View File
@@ -16,7 +16,7 @@ from urllib.parse import urlencode
from .base import BaseFetcher
from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import normalize_tags
from ..lrc import LRCData
from ..config import (
HTTP_TIMEOUT,
TTL_UNSYNCED,
@@ -82,20 +82,16 @@ class LrclibSearchFetcher(BaseFetcher):
unsynced = best.get("plainLyrics")
if isinstance(synced, str) and synced.strip():
lyrics = normalize_tags(synced.strip())
logger.info(
f"LRCLIB-search: got synced lyrics ({len(lyrics.splitlines())} lines)"
)
lyrics = LRCData(synced)
logger.info(f"LRCLIB-search: got synced lyrics ({len(lyrics)} lines)")
return LyricResult(
status=CacheStatus.SUCCESS_SYNCED,
lyrics=lyrics,
source=self.source_name,
)
elif isinstance(unsynced, str) and unsynced.strip():
lyrics = normalize_tags(unsynced.strip())
logger.info(
f"LRCLIB-search: got unsynced lyrics ({len(lyrics.splitlines())} lines)"
)
lyrics = LRCData(unsynced)
logger.info(f"LRCLIB-search: got unsynced lyrics ({len(lyrics)} lines)")
return LyricResult(
status=CacheStatus.SUCCESS_UNSYNCED,
lyrics=lyrics,
+5 -7
View File
@@ -18,7 +18,7 @@ from loguru import logger
from .base import BaseFetcher
from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import detect_sync_status, normalize_tags
from ..lrc import LRCData
from ..config import (
HTTP_TIMEOUT,
TTL_NOT_FOUND,
@@ -181,15 +181,13 @@ class NeteaseFetcher(BaseFetcher):
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
# Determine sync status
lrc = normalize_tags(lrc)
status = detect_sync_status(lrc)
lrcdata = LRCData(lrc)
status = lrcdata.detect_sync_status()
logger.info(
f"Netease: got {status.value} lyrics for song_id={song_id} "
f"({len(lrc.splitlines())} lines)"
)
return LyricResult(
status=status, lyrics=lrc.strip(), source=self.source_name
f"({len(lrcdata)} lines)"
)
return LyricResult(status=status, lyrics=lrcdata, source=self.source_name)
except Exception as e:
logger.error(f"Netease: lyric fetch failed for song_id={song_id}: {e}")
+5 -7
View File
@@ -17,7 +17,7 @@ from loguru import logger
from .base import BaseFetcher
from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import detect_sync_status, normalize_tags
from ..lrc import LRCData
from ..config import (
HTTP_TIMEOUT,
TTL_NOT_FOUND,
@@ -142,15 +142,13 @@ class QQMusicFetcher(BaseFetcher):
logger.debug(f"QQMusic: empty lyrics for mid={mid}")
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
lrc = normalize_tags(lrc)
status = detect_sync_status(lrc)
lrcdata = LRCData(lrc)
status = lrcdata.detect_sync_status()
logger.info(
f"QQMusic: got {status.value} lyrics for mid={mid} "
f"({len(lrc.splitlines())} lines)"
)
return LyricResult(
status=status, lyrics=lrc.strip(), source=self.source_name
f"({len(lrcdata)} lines)"
)
return LyricResult(status=status, lyrics=lrcdata, source=self.source_name)
except Exception as e:
logger.error(f"QQMusic: lyric fetch failed for mid={mid}: {e}")
+2 -2
View File
@@ -28,7 +28,7 @@ from loguru import logger
from .base import BaseFetcher
from ..models import TrackMeta, LyricResult, CacheStatus
from ..lrc import normalize_tags
from ..lrc import LRCData
from ..config import (
HTTP_TIMEOUT,
SPOTIFY_APP_VERSION,
@@ -358,7 +358,7 @@ class SpotifyFetcher(BaseFetcher):
# Unsynced: emit with zero timestamps
lrc_lines.append(f"[00:00.00]{words}")
content = normalize_tags("\n".join(lrc_lines))
content = LRCData("\n".join(lrc_lines))
status = (
CacheStatus.SUCCESS_SYNCED
if is_synced
+159 -128
View File
@@ -70,7 +70,7 @@ def _sanitize_lyric_text(text: str) -> str:
return _remove_pattern(text, _WORD_SYNC_TAG_RE)
def _reformat(text: str) -> str:
def _reformat(text: str) -> list[str]:
"""Parse each line and reformat to standard [mm:ss.cc]...content form.
Handles any mix of time tag formats on input. Lines with no time tags
@@ -99,83 +99,179 @@ def _reformat(text: str) -> str:
else:
out.append(line)
# Empty lines with no tags are also preserved
return "\n".join(out)
# Remove empty lines at the start and end of the whole text, but preserve blank lines in the middle
while out and not out[0].strip():
out.pop(0)
while out and not out[-1].strip():
out.pop()
return out
def _apply_offset(text: str) -> str:
"""Parse [offset:±ms] and shift all standard [mm:ss.cc] tags accordingly.
class LRCData:
_lines: list[str]
Per LRC spec, positive offset = lyrics appear sooner (subtract from timestamps).
"""
m = _OFFSET_RE.search(text)
if not m:
return text
offset_ms = int(m.group(1))
text = _OFFSET_RE.sub("", text).strip("\n")
if offset_ms == 0:
return text
def __init__(self, text: str | None = None) -> None:
if not text:
self._lines = []
return
self._lines = _reformat(text)
self._apply_offset()
def _shift(match: re.Match) -> str:
total_ms = max(
0,
(int(match.group(1)) * 60 + int(match.group(2))) * 1000
+ int(match.group(3)) * 10
- offset_ms,
def __str__(self) -> str:
return "\n".join(self._lines)
def __repr__(self) -> str:
return f"LRCData(lines={self._lines!r})"
def __bool__(self) -> bool:
return len(self._lines) > 0
def __len__(self) -> int:
return len(self._lines)
def _apply_offset(self):
"""Parse [offset:±ms] and shift all standard [mm:ss.cc] tags accordingly.
Per LRC spec, positive offset = lyrics appear sooner (subtract from timestamps).
"""
m: Optional[re.Match] = None
for i, line in enumerate(self._lines):
m = _OFFSET_RE.search(line)
if m:
self._lines.pop(i)
break
if not m:
return
offset_ms = int(m.group(1))
if offset_ms == 0:
return
def _shift(match: re.Match) -> str:
total_ms = max(
0,
(int(match.group(1)) * 60 + int(match.group(2))) * 1000
+ int(match.group(3)) * 10
- offset_ms,
)
new_mm = total_ms // 60000
new_ss = (total_ms % 60000) // 1000
new_cs = min(round((total_ms % 1000) / 10), 99)
return f"[{new_mm:02d}:{new_ss:02d}.{new_cs:02d}]"
self._lines = [_STD_TAG_CAPTURE_RE.sub(_shift, line) for line in self._lines]
def is_synced(self) -> bool:
"""Check whether text contains non-zero LRC time tags.
Assumes text has been normalized by normalize (standard [mm:ss.cc] format).
"""
for line in self._lines:
for m in _STD_TAG_CAPTURE_RE.finditer(line):
if m.group(1) != "00" or m.group(2) != "00" or m.group(3) != "00":
return True
return False
def detect_sync_status(self) -> CacheStatus:
"""Determine whether lyrics contain meaningful LRC time tags.
Assumes text has been normalized by normalize.
"""
return (
CacheStatus.SUCCESS_SYNCED
if self.is_synced()
else CacheStatus.SUCCESS_UNSYNCED
)
new_mm = total_ms // 60000
new_ss = (total_ms % 60000) // 1000
new_cs = min(round((total_ms % 1000) / 10), 99)
return f"[{new_mm:02d}:{new_ss:02d}.{new_cs:02d}]"
return _STD_TAG_CAPTURE_RE.sub(_shift, text)
def normalize_unsynced(self):
"""Normalize unsynced lyrics so every line has a [00:00.00] tag.
Assumes lyrics have been normalized by normalize.
- Lines that already have time tags: replace with [00:00.00]
- Lines without leading tags: prepend [00:00.00]
- Blank lines in middle are converted to [00:00.00]
"""
out: list[str] = []
first = True
for i, line in enumerate(self._lines):
stripped = line.strip()
if not stripped and not first:
out.append("[00:00.00]")
continue
elif not stripped:
# Skip leading blank lines
continue
first = False
cleaned = _remove_pattern(line, _LINE_START_STD_TAGS_RE)
out.append(f"[00:00.00]{cleaned}")
ret = LRCData()
ret._lines = out
return ret
def normalize_tags(text: str) -> str:
"""Normalize LRC to standard form: reformat all tags to [mm:ss.cc], then apply offset."""
return _apply_offset(_reformat(text))
def to_plain(
self,
deduplicate: bool = False,
) -> str:
"""Convert lyrics to plain text with all tags stripped.
If deduplicate is True, only keep the first line of consecutive lines with the same lyric text (after stripping tags).
Otherwise, lines with multiple time tags will be duplicated as many times as the number of tags.
Assumes text has been normalized by normalize.
"""
def is_synced(text: str) -> bool:
"""Check whether text contains non-zero LRC time tags.
if not self.is_synced():
return "\n".join(
_remove_pattern(line, _LINE_START_TAGS_RE) for line in self._lines
).strip("\n")
Assumes text has been normalized by normalize (standard [mm:ss.cc] format).
"""
tags = _STD_TAG_RE.findall(text)
return bool(tags) and any(tag != "[00:00.00]" for tag in tags)
lines = []
for line in self._lines:
pos = 0
cnt = 0
plain_line = ""
while True:
# Only match strictly repeated standard time tags at the start of the line
# Lines without any time tags are ignored.
# Lyric lines are considered already stripped of whitespaces, so no strips here.
m = _STD_TAG_RE.match(line, pos)
if not m:
plain_line += line[pos:]
break
pos = m.end()
cnt += 1
# Also avoid duplicating blank lines
if deduplicate or not plain_line:
if cnt > 0:
lines.append(plain_line)
else:
for _ in range(cnt):
lines.append(plain_line)
if deduplicate:
# Remove consecutive duplicates
deduped_lines = []
prev_line = None
for line in lines:
if line != prev_line:
deduped_lines.append(line)
prev_line = line
lines = deduped_lines
def detect_sync_status(text: str) -> CacheStatus:
"""Determine whether lyrics contain meaningful LRC time tags.
return "\n".join(lines).strip()
Assumes text has been normalized by normalize.
"""
return (
CacheStatus.SUCCESS_SYNCED if is_synced(text) else CacheStatus.SUCCESS_UNSYNCED
)
def print_lyrics(
self,
plain: bool = False,
) -> None:
"""Print lyrics, optionally stripping tags.
def normalize_unsynced(lyrics: str) -> str:
"""Normalize unsynced lyrics so every line has a [00:00.00] tag.
Assumes lyrics have been normalized by normalize.
- Lines that already have time tags: replace with [00:00.00]
- Lines without leading tags: prepend [00:00.00]
- Blank lines in middle are converted to [00:00.00]
"""
out: list[str] = []
first = True
for line in lyrics.splitlines():
stripped = line.strip()
if not stripped and not first:
out.append("[00:00.00]")
continue
elif not stripped:
# Skip leading blank lines
continue
first = False
cleaned = _remove_pattern(line, _LINE_START_STD_TAGS_RE)
out.append(f"[00:00.00]{cleaned}")
return "\n".join(out)
Assumes text has been normalized by normalize.
"""
if plain:
print(self.to_plain())
else:
print("\n".join(self._lines))
def get_audio_path(audio_url: str, ensure_exists: bool = False) -> Optional[Path]:
@@ -204,68 +300,3 @@ def get_sidecar_path(
if ensure_exists and not lrc_path.exists():
return None
return lrc_path
def to_plain(
text: str,
deduplicate: bool = False,
) -> str:
"""Convert lyrics to plain text with all tags stripped.
If deduplicate is True, only keep the first line of consecutive lines with the same lyric text (after stripping tags).
Otherwise, lines with multiple time tags will be duplicated as many times as the number of tags.
Assumes text has been normalized by normalize.
"""
if not is_synced(text):
# If there are no meaningful time tags, just strip all tags and return
return _remove_pattern(text, _LINE_START_TAGS_RE)
lines = []
for line in text.splitlines():
pos = 0
cnt = 0
plain_line = ""
while True:
# Only match strictly repeated standard time tags at the start of the line
# Lines without any time tags are ignored.
# Lyric lines are considered already stripped of whitespaces, so no strips here.
m = _STD_TAG_RE.match(line, pos)
if not m:
plain_line += line[pos:]
break
pos = m.end()
cnt += 1
# Also avoid duplicating blank lines
if deduplicate or not plain_line:
if cnt > 0:
lines.append(plain_line)
else:
for _ in range(cnt):
lines.append(plain_line)
if deduplicate:
# Remove consecutive duplicates
deduped_lines = []
prev_line = None
for line in lines:
if line != prev_line:
deduped_lines.append(line)
prev_line = line
lines = deduped_lines
return "\n".join(lines).strip("\n")
def print_lyrics(
text: str,
plain: bool = False,
) -> None:
"""Print lyrics, optionally stripping tags.
Assumes text has been normalized by normalize.
"""
if plain:
print(to_plain(text))
else:
print(text)
+7 -2
View File
@@ -4,10 +4,15 @@ Date: 2026-03-25 04:09:36
Description: Data models
"""
from __future__ import annotations
from enum import Enum
from typing import Optional
from typing import Optional, TYPE_CHECKING
from dataclasses import dataclass
if TYPE_CHECKING:
from .lrc import LRCData
class CacheStatus(str, Enum):
"""Status of a cached lyric entry."""
@@ -54,6 +59,6 @@ class LyricResult:
"""Result of a lyric fetch attempt, also used as cache record."""
status: CacheStatus
lyrics: Optional[str] = None
lyrics: Optional[LRCData] = None
source: Optional[str] = None # Which fetcher produced this result
ttl: Optional[int] = None # Hint for cache TTL (seconds)