refactor: add LRCData class
This commit is contained in:
+4
-3
@@ -12,6 +12,7 @@ import unicodedata
|
||||
from typing import Optional
|
||||
from loguru import logger
|
||||
|
||||
from .lrc import LRCData
|
||||
from .config import DURATION_TOLERANCE_MS
|
||||
from .models import TrackMeta, LyricResult, CacheStatus
|
||||
|
||||
@@ -161,7 +162,7 @@ class CacheEngine:
|
||||
)
|
||||
return LyricResult(
|
||||
status=CacheStatus(status_str),
|
||||
lyrics=lyrics,
|
||||
lyrics=LRCData(lyrics) if lyrics else None,
|
||||
source=src,
|
||||
ttl=remaining,
|
||||
)
|
||||
@@ -212,7 +213,7 @@ class CacheEngine:
|
||||
key,
|
||||
source,
|
||||
result.status.value,
|
||||
result.lyrics,
|
||||
str(result.lyrics) if result.lyrics else None,
|
||||
now,
|
||||
expires_at,
|
||||
track.artist,
|
||||
@@ -316,7 +317,7 @@ class CacheEngine:
|
||||
row = dict(rows[0])
|
||||
return LyricResult(
|
||||
status=CacheStatus(row["status"]),
|
||||
lyrics=row["lyrics"],
|
||||
lyrics=LRCData(row["lyrics"]) if row["lyrics"] else None,
|
||||
source="cache-search",
|
||||
)
|
||||
|
||||
|
||||
+5
-5
@@ -18,7 +18,7 @@ from .models import TrackMeta, CacheStatus
|
||||
from .mpris import get_current_track
|
||||
from .core import LrcManager
|
||||
from .fetchers import FetcherMethodType
|
||||
from .lrc import get_sidecar_path, print_lyrics, to_plain
|
||||
from .lrc import get_sidecar_path
|
||||
|
||||
|
||||
app = cyclopts.App(
|
||||
@@ -120,7 +120,7 @@ def fetch(
|
||||
logger.error("Only unsynced lyrics available (--only-synced requested).")
|
||||
sys.exit(1)
|
||||
|
||||
print_lyrics(result.lyrics, plain=plain)
|
||||
result.lyrics.print_lyrics(plain=plain)
|
||||
|
||||
|
||||
# search
|
||||
@@ -208,7 +208,7 @@ def search(
|
||||
logger.error("Only unsynced lyrics available (--only-synced requested).")
|
||||
sys.exit(1)
|
||||
|
||||
print_lyrics(result.lyrics, plain=plain)
|
||||
result.lyrics.print_lyrics(plain=plain)
|
||||
|
||||
|
||||
# export
|
||||
@@ -282,9 +282,9 @@ def export(
|
||||
try:
|
||||
with open(output, "w", encoding="utf-8") as f:
|
||||
if plain:
|
||||
f.write(to_plain(result.lyrics))
|
||||
f.write(result.lyrics.to_plain())
|
||||
else:
|
||||
f.write(result.lyrics)
|
||||
f.write(str(result.lyrics))
|
||||
logger.info(f"Exported lyrics to {output}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to write file: {e}")
|
||||
|
||||
+5
-5
@@ -18,7 +18,7 @@ from loguru import logger
|
||||
from .fetchers import FetcherMethodType, create_fetchers
|
||||
from .fetchers.base import BaseFetcher
|
||||
from .cache import CacheEngine
|
||||
from .lrc import normalize_tags, normalize_unsynced, detect_sync_status
|
||||
from .lrc import LRCData
|
||||
from .config import TTL_SYNCED, TTL_UNSYNCED, TTL_NOT_FOUND, TTL_NETWORK_ERROR
|
||||
from .models import TrackMeta, LyricResult, CacheStatus
|
||||
from .enrichers import enrich_track
|
||||
@@ -146,7 +146,7 @@ class LrcManager:
|
||||
):
|
||||
best_result = LyricResult(
|
||||
status=best_result.status,
|
||||
lyrics=normalize_unsynced(best_result.lyrics),
|
||||
lyrics=best_result.lyrics.normalize_unsynced(),
|
||||
source=best_result.source,
|
||||
ttl=best_result.ttl,
|
||||
)
|
||||
@@ -167,10 +167,10 @@ class LrcManager:
|
||||
"""Manually insert lyrics into the cache for a track."""
|
||||
track = enrich_track(track)
|
||||
logger.info(f"Manually inserting lyrics for: {track.display_name()}")
|
||||
lyrics = normalize_tags(lyrics)
|
||||
lrc = LRCData(lyrics)
|
||||
result = LyricResult(
|
||||
status=detect_sync_status(lyrics),
|
||||
lyrics=normalize_tags(lyrics),
|
||||
status=lrc.detect_sync_status(),
|
||||
lyrics=lrc,
|
||||
source="manual",
|
||||
ttl=None,
|
||||
)
|
||||
|
||||
@@ -13,9 +13,11 @@ albums or is played from different players.
|
||||
from typing import Optional
|
||||
from loguru import logger
|
||||
|
||||
|
||||
from .base import BaseFetcher
|
||||
from ..models import TrackMeta, LyricResult, CacheStatus
|
||||
from ..cache import CacheEngine
|
||||
from ..lrc import LRCData
|
||||
|
||||
|
||||
class CacheSearchFetcher(BaseFetcher):
|
||||
@@ -80,6 +82,6 @@ class CacheSearchFetcher(BaseFetcher):
|
||||
)
|
||||
return LyricResult(
|
||||
status=status,
|
||||
lyrics=best["lyrics"],
|
||||
lyrics=LRCData(best["lyrics"]),
|
||||
source=self.source_name,
|
||||
)
|
||||
|
||||
@@ -17,7 +17,7 @@ from mutagen.flac import FLAC
|
||||
|
||||
from .base import BaseFetcher
|
||||
from ..models import TrackMeta, LyricResult
|
||||
from ..lrc import detect_sync_status, normalize_tags, get_audio_path, get_sidecar_path
|
||||
from ..lrc import get_audio_path, get_sidecar_path, LRCData
|
||||
|
||||
|
||||
class LocalFetcher(BaseFetcher):
|
||||
@@ -48,11 +48,15 @@ class LocalFetcher(BaseFetcher):
|
||||
with open(lrc_path, "r", encoding="utf-8") as f:
|
||||
content = f.read().strip()
|
||||
if content:
|
||||
content = normalize_tags(content)
|
||||
status = detect_sync_status(content)
|
||||
logger.info(f"Local: found .lrc sidecar ({status.value})")
|
||||
lrc = LRCData(content)
|
||||
status = lrc.detect_sync_status()
|
||||
logger.info(
|
||||
f"Local: found .lrc sidecar ({status.value}) for {audio_path.name}"
|
||||
)
|
||||
return LyricResult(
|
||||
status=status, lyrics=content, source=self.source_name
|
||||
status=status,
|
||||
lyrics=lrc,
|
||||
source=self.source_name,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Local: error reading {lrc_path}: {e}")
|
||||
@@ -81,12 +85,14 @@ class LocalFetcher(BaseFetcher):
|
||||
break
|
||||
|
||||
if lyrics:
|
||||
lyrics = normalize_tags(lyrics.strip())
|
||||
status = detect_sync_status(lyrics)
|
||||
logger.info(f"Local: found embedded lyrics ({status.value})")
|
||||
lrc = LRCData(lyrics)
|
||||
status = lrc.detect_sync_status()
|
||||
logger.info(
|
||||
f"Local: found embedded lyrics ({status.value}) for {audio_path.name}"
|
||||
)
|
||||
return LyricResult(
|
||||
status=status,
|
||||
lyrics=lyrics,
|
||||
lyrics=lrc,
|
||||
source=f"{self.source_name} (embedded)",
|
||||
)
|
||||
else:
|
||||
|
||||
@@ -15,7 +15,7 @@ from urllib.parse import urlencode
|
||||
|
||||
from .base import BaseFetcher
|
||||
from ..models import TrackMeta, LyricResult, CacheStatus
|
||||
from ..lrc import normalize_tags
|
||||
from ..lrc import LRCData
|
||||
from ..config import (
|
||||
HTTP_TIMEOUT,
|
||||
TTL_UNSYNCED,
|
||||
@@ -79,20 +79,16 @@ class LrclibFetcher(BaseFetcher):
|
||||
unsynced = data.get("plainLyrics")
|
||||
|
||||
if isinstance(synced, str) and synced.strip():
|
||||
lyrics = normalize_tags(synced.strip())
|
||||
logger.info(
|
||||
f"LRCLIB: got synced lyrics ({len(lyrics.splitlines())} lines)"
|
||||
)
|
||||
lyrics = LRCData(synced)
|
||||
logger.info(f"LRCLIB: got synced lyrics ({len(lyrics)} lines)")
|
||||
return LyricResult(
|
||||
status=CacheStatus.SUCCESS_SYNCED,
|
||||
lyrics=lyrics,
|
||||
source=self.source_name,
|
||||
)
|
||||
elif isinstance(unsynced, str) and unsynced.strip():
|
||||
lyrics = normalize_tags(unsynced.strip())
|
||||
logger.info(
|
||||
f"LRCLIB: got unsynced lyrics ({len(lyrics.splitlines())} lines)"
|
||||
)
|
||||
lyrics = LRCData(unsynced)
|
||||
logger.info(f"LRCLIB: got unsynced lyrics ({len(lyrics)} lines)")
|
||||
return LyricResult(
|
||||
status=CacheStatus.SUCCESS_UNSYNCED,
|
||||
lyrics=lyrics,
|
||||
|
||||
@@ -16,7 +16,7 @@ from urllib.parse import urlencode
|
||||
|
||||
from .base import BaseFetcher
|
||||
from ..models import TrackMeta, LyricResult, CacheStatus
|
||||
from ..lrc import normalize_tags
|
||||
from ..lrc import LRCData
|
||||
from ..config import (
|
||||
HTTP_TIMEOUT,
|
||||
TTL_UNSYNCED,
|
||||
@@ -82,20 +82,16 @@ class LrclibSearchFetcher(BaseFetcher):
|
||||
unsynced = best.get("plainLyrics")
|
||||
|
||||
if isinstance(synced, str) and synced.strip():
|
||||
lyrics = normalize_tags(synced.strip())
|
||||
logger.info(
|
||||
f"LRCLIB-search: got synced lyrics ({len(lyrics.splitlines())} lines)"
|
||||
)
|
||||
lyrics = LRCData(synced)
|
||||
logger.info(f"LRCLIB-search: got synced lyrics ({len(lyrics)} lines)")
|
||||
return LyricResult(
|
||||
status=CacheStatus.SUCCESS_SYNCED,
|
||||
lyrics=lyrics,
|
||||
source=self.source_name,
|
||||
)
|
||||
elif isinstance(unsynced, str) and unsynced.strip():
|
||||
lyrics = normalize_tags(unsynced.strip())
|
||||
logger.info(
|
||||
f"LRCLIB-search: got unsynced lyrics ({len(lyrics.splitlines())} lines)"
|
||||
)
|
||||
lyrics = LRCData(unsynced)
|
||||
logger.info(f"LRCLIB-search: got unsynced lyrics ({len(lyrics)} lines)")
|
||||
return LyricResult(
|
||||
status=CacheStatus.SUCCESS_UNSYNCED,
|
||||
lyrics=lyrics,
|
||||
|
||||
@@ -18,7 +18,7 @@ from loguru import logger
|
||||
|
||||
from .base import BaseFetcher
|
||||
from ..models import TrackMeta, LyricResult, CacheStatus
|
||||
from ..lrc import detect_sync_status, normalize_tags
|
||||
from ..lrc import LRCData
|
||||
from ..config import (
|
||||
HTTP_TIMEOUT,
|
||||
TTL_NOT_FOUND,
|
||||
@@ -181,15 +181,13 @@ class NeteaseFetcher(BaseFetcher):
|
||||
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
|
||||
|
||||
# Determine sync status
|
||||
lrc = normalize_tags(lrc)
|
||||
status = detect_sync_status(lrc)
|
||||
lrcdata = LRCData(lrc)
|
||||
status = lrcdata.detect_sync_status()
|
||||
logger.info(
|
||||
f"Netease: got {status.value} lyrics for song_id={song_id} "
|
||||
f"({len(lrc.splitlines())} lines)"
|
||||
)
|
||||
return LyricResult(
|
||||
status=status, lyrics=lrc.strip(), source=self.source_name
|
||||
f"({len(lrcdata)} lines)"
|
||||
)
|
||||
return LyricResult(status=status, lyrics=lrcdata, source=self.source_name)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Netease: lyric fetch failed for song_id={song_id}: {e}")
|
||||
|
||||
@@ -17,7 +17,7 @@ from loguru import logger
|
||||
|
||||
from .base import BaseFetcher
|
||||
from ..models import TrackMeta, LyricResult, CacheStatus
|
||||
from ..lrc import detect_sync_status, normalize_tags
|
||||
from ..lrc import LRCData
|
||||
from ..config import (
|
||||
HTTP_TIMEOUT,
|
||||
TTL_NOT_FOUND,
|
||||
@@ -142,15 +142,13 @@ class QQMusicFetcher(BaseFetcher):
|
||||
logger.debug(f"QQMusic: empty lyrics for mid={mid}")
|
||||
return LyricResult(status=CacheStatus.NOT_FOUND, ttl=TTL_NOT_FOUND)
|
||||
|
||||
lrc = normalize_tags(lrc)
|
||||
status = detect_sync_status(lrc)
|
||||
lrcdata = LRCData(lrc)
|
||||
status = lrcdata.detect_sync_status()
|
||||
logger.info(
|
||||
f"QQMusic: got {status.value} lyrics for mid={mid} "
|
||||
f"({len(lrc.splitlines())} lines)"
|
||||
)
|
||||
return LyricResult(
|
||||
status=status, lyrics=lrc.strip(), source=self.source_name
|
||||
f"({len(lrcdata)} lines)"
|
||||
)
|
||||
return LyricResult(status=status, lyrics=lrcdata, source=self.source_name)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"QQMusic: lyric fetch failed for mid={mid}: {e}")
|
||||
|
||||
@@ -28,7 +28,7 @@ from loguru import logger
|
||||
|
||||
from .base import BaseFetcher
|
||||
from ..models import TrackMeta, LyricResult, CacheStatus
|
||||
from ..lrc import normalize_tags
|
||||
from ..lrc import LRCData
|
||||
from ..config import (
|
||||
HTTP_TIMEOUT,
|
||||
SPOTIFY_APP_VERSION,
|
||||
@@ -358,7 +358,7 @@ class SpotifyFetcher(BaseFetcher):
|
||||
# Unsynced: emit with zero timestamps
|
||||
lrc_lines.append(f"[00:00.00]{words}")
|
||||
|
||||
content = normalize_tags("\n".join(lrc_lines))
|
||||
content = LRCData("\n".join(lrc_lines))
|
||||
status = (
|
||||
CacheStatus.SUCCESS_SYNCED
|
||||
if is_synced
|
||||
|
||||
+159
-128
@@ -70,7 +70,7 @@ def _sanitize_lyric_text(text: str) -> str:
|
||||
return _remove_pattern(text, _WORD_SYNC_TAG_RE)
|
||||
|
||||
|
||||
def _reformat(text: str) -> str:
|
||||
def _reformat(text: str) -> list[str]:
|
||||
"""Parse each line and reformat to standard [mm:ss.cc]...content form.
|
||||
|
||||
Handles any mix of time tag formats on input. Lines with no time tags
|
||||
@@ -99,83 +99,179 @@ def _reformat(text: str) -> str:
|
||||
else:
|
||||
out.append(line)
|
||||
# Empty lines with no tags are also preserved
|
||||
return "\n".join(out)
|
||||
|
||||
# Remove empty lines at the start and end of the whole text, but preserve blank lines in the middle
|
||||
while out and not out[0].strip():
|
||||
out.pop(0)
|
||||
while out and not out[-1].strip():
|
||||
out.pop()
|
||||
|
||||
return out
|
||||
|
||||
|
||||
def _apply_offset(text: str) -> str:
|
||||
"""Parse [offset:±ms] and shift all standard [mm:ss.cc] tags accordingly.
|
||||
class LRCData:
|
||||
_lines: list[str]
|
||||
|
||||
Per LRC spec, positive offset = lyrics appear sooner (subtract from timestamps).
|
||||
"""
|
||||
m = _OFFSET_RE.search(text)
|
||||
if not m:
|
||||
return text
|
||||
offset_ms = int(m.group(1))
|
||||
text = _OFFSET_RE.sub("", text).strip("\n")
|
||||
if offset_ms == 0:
|
||||
return text
|
||||
def __init__(self, text: str | None = None) -> None:
    """Build the stored line list from raw LRC text.

    A missing or empty ``text`` produces an instance with no lines.
    Otherwise the text is run through ``_reformat`` and the resulting
    lines are post-processed in place by ``_apply_offset``.
    """
    self._lines = []
    if text:
        self._lines = _reformat(text)
        self._apply_offset()
|
||||
|
||||
def _shift(match: re.Match) -> str:
|
||||
total_ms = max(
|
||||
0,
|
||||
(int(match.group(1)) * 60 + int(match.group(2))) * 1000
|
||||
+ int(match.group(3)) * 10
|
||||
- offset_ms,
|
||||
def __str__(self) -> str:
|
||||
return "\n".join(self._lines)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"LRCData(lines={self._lines!r})"
|
||||
|
||||
def __bool__(self) -> bool:
|
||||
return len(self._lines) > 0
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self._lines)
|
||||
|
||||
def _apply_offset(self) -> None:
    """Consume ``[offset:±ms]`` tags and shift all standard [mm:ss.cc] tags accordingly.

    Per LRC spec, positive offset = lyrics appear sooner (subtract from timestamps).

    The first offset tag encountered supplies the shift. Every offset tag
    is stripped from the stored lines: a line left blank by the stripping
    is dropped, while any other text sharing the line is kept. Blank lines
    exposed at either end by that removal are trimmed. Shifted timestamps
    never go below ``[00:00.00]``. When no offset tag is present the
    stored lines are left untouched.
    """
    first_match: Optional[re.Match] = None
    kept: list[str] = []
    for line in self._lines:
        found = _OFFSET_RE.search(line)
        if found is None:
            kept.append(line)
            continue
        if first_match is None:
            first_match = found
        # Remove only the tag itself so text sharing the line survives.
        remainder = _OFFSET_RE.sub("", line)
        if remainder.strip():
            kept.append(remainder)
    if first_match is None:
        return

    # Dropping an offset line can expose blank lines at either end.
    start, end = 0, len(kept)
    while start < end and not kept[start].strip():
        start += 1
    while end > start and not kept[end - 1].strip():
        end -= 1
    self._lines = kept[start:end]

    offset_ms = int(first_match.group(1))
    if offset_ms == 0:
        return

    def _shift(match: re.Match) -> str:
        # Groups 1-3 are minutes, seconds and centiseconds of the tag.
        total_ms = max(
            0,
            (int(match.group(1)) * 60 + int(match.group(2))) * 1000
            + int(match.group(3)) * 10
            - offset_ms,
        )
        new_mm = total_ms // 60000
        new_ss = (total_ms % 60000) // 1000
        # Rounding can reach 100; cap so the field stays two digits.
        new_cs = min(round((total_ms % 1000) / 10), 99)
        return f"[{new_mm:02d}:{new_ss:02d}.{new_cs:02d}]"

    self._lines = [_STD_TAG_CAPTURE_RE.sub(_shift, line) for line in self._lines]
|
||||
|
||||
def is_synced(self) -> bool:
    """Tell whether any stored line carries a non-zero time tag.

    Every ``_STD_TAG_CAPTURE_RE`` match in every stored line is
    inspected. The lyrics count as synced as soon as one match has a
    capture group (1, 2 or 3) other than ``"00"``. No tags at all, or
    only all-zero tags, means unsynced.
    """
    zero_tag = ("00", "00", "00")
    return any(
        (tag.group(1), tag.group(2), tag.group(3)) != zero_tag
        for line in self._lines
        for tag in _STD_TAG_CAPTURE_RE.finditer(line)
    )
|
||||
|
||||
def detect_sync_status(self) -> CacheStatus:
    """Translate the result of ``is_synced`` into a cache status.

    Returns ``CacheStatus.SUCCESS_SYNCED`` when ``is_synced()`` is true
    and ``CacheStatus.SUCCESS_UNSYNCED`` otherwise.
    """
    if self.is_synced():
        return CacheStatus.SUCCESS_SYNCED
    return CacheStatus.SUCCESS_UNSYNCED
|
||||
new_mm = total_ms // 60000
|
||||
new_ss = (total_ms % 60000) // 1000
|
||||
new_cs = min(round((total_ms % 1000) / 10), 99)
|
||||
return f"[{new_mm:02d}:{new_ss:02d}.{new_cs:02d}]"
|
||||
|
||||
return _STD_TAG_CAPTURE_RE.sub(_shift, text)
|
||||
def normalize_unsynced(self) -> "LRCData":
    """Return a new ``LRCData`` in which every line starts with ``[00:00.00]``.

    The receiver is not modified; the result is a fresh instance.

    - Non-blank lines: leading standard time tags are stripped with
      ``_remove_pattern`` and ``[00:00.00]`` is prepended.
    - Blank lines before the first non-blank line are skipped.
    - Blank lines after it become a bare ``[00:00.00]``.
    """
    out: list[str] = []
    for line in self._lines:
        if not line.strip():
            # ``out`` stays empty until the first real line is emitted,
            # so this drops leading blanks and keeps interior ones.
            if out:
                out.append("[00:00.00]")
            continue
        cleaned = _remove_pattern(line, _LINE_START_STD_TAGS_RE)
        out.append(f"[00:00.00]{cleaned}")

    # Bypass the parsing constructor: ``out`` is already in final form.
    ret = LRCData()
    ret._lines = out
    return ret
|
||||
|
||||
def normalize_tags(text: str) -> str:
|
||||
"""Normalize LRC to standard form: reformat all tags to [mm:ss.cc], then apply offset."""
|
||||
return _apply_offset(_reformat(text))
|
||||
def to_plain(
|
||||
self,
|
||||
deduplicate: bool = False,
|
||||
) -> str:
|
||||
"""Convert lyrics to plain text with all tags stripped.
|
||||
|
||||
If deduplicate is True, only keep the first line of consecutive lines with the same lyric text (after stripping tags).
|
||||
Otherwise, lines with multiple time tags will be duplicated as many times as the number of tags.
|
||||
Assumes text has been normalized by normalize.
|
||||
"""
|
||||
|
||||
def is_synced(text: str) -> bool:
|
||||
"""Check whether text contains non-zero LRC time tags.
|
||||
if not self.is_synced():
|
||||
return "\n".join(
|
||||
_remove_pattern(line, _LINE_START_TAGS_RE) for line in self._lines
|
||||
).strip("\n")
|
||||
|
||||
Assumes text has been normalized by normalize (standard [mm:ss.cc] format).
|
||||
"""
|
||||
tags = _STD_TAG_RE.findall(text)
|
||||
return bool(tags) and any(tag != "[00:00.00]" for tag in tags)
|
||||
lines = []
|
||||
for line in self._lines:
|
||||
pos = 0
|
||||
cnt = 0
|
||||
plain_line = ""
|
||||
while True:
|
||||
# Only match strictly repeated standard time tags at the start of the line
|
||||
# Lines without any time tags are ignored.
|
||||
# Lyric lines are considered already stripped of whitespaces, so no strips here.
|
||||
m = _STD_TAG_RE.match(line, pos)
|
||||
if not m:
|
||||
plain_line += line[pos:]
|
||||
break
|
||||
pos = m.end()
|
||||
cnt += 1
|
||||
# Also avoid duplicating blank lines
|
||||
if deduplicate or not plain_line:
|
||||
if cnt > 0:
|
||||
lines.append(plain_line)
|
||||
else:
|
||||
for _ in range(cnt):
|
||||
lines.append(plain_line)
|
||||
|
||||
if deduplicate:
|
||||
# Remove consecutive duplicates
|
||||
deduped_lines = []
|
||||
prev_line = None
|
||||
for line in lines:
|
||||
if line != prev_line:
|
||||
deduped_lines.append(line)
|
||||
prev_line = line
|
||||
lines = deduped_lines
|
||||
|
||||
def detect_sync_status(text: str) -> CacheStatus:
|
||||
"""Determine whether lyrics contain meaningful LRC time tags.
|
||||
return "\n".join(lines).strip()
|
||||
|
||||
Assumes text has been normalized by normalize.
|
||||
"""
|
||||
return (
|
||||
CacheStatus.SUCCESS_SYNCED if is_synced(text) else CacheStatus.SUCCESS_UNSYNCED
|
||||
)
|
||||
def print_lyrics(
|
||||
self,
|
||||
plain: bool = False,
|
||||
) -> None:
|
||||
"""Print lyrics, optionally stripping tags.
|
||||
|
||||
|
||||
def normalize_unsynced(lyrics: str) -> str:
|
||||
"""Normalize unsynced lyrics so every line has a [00:00.00] tag.
|
||||
|
||||
Assumes lyrics have been normalized by normalize.
|
||||
- Lines that already have time tags: replace with [00:00.00]
|
||||
- Lines without leading tags: prepend [00:00.00]
|
||||
- Blank lines in middle are converted to [00:00.00]
|
||||
"""
|
||||
out: list[str] = []
|
||||
first = True
|
||||
for line in lyrics.splitlines():
|
||||
stripped = line.strip()
|
||||
if not stripped and not first:
|
||||
out.append("[00:00.00]")
|
||||
continue
|
||||
elif not stripped:
|
||||
# Skip leading blank lines
|
||||
continue
|
||||
first = False
|
||||
cleaned = _remove_pattern(line, _LINE_START_STD_TAGS_RE)
|
||||
out.append(f"[00:00.00]{cleaned}")
|
||||
return "\n".join(out)
|
||||
Assumes text has been normalized by normalize.
|
||||
"""
|
||||
if plain:
|
||||
print(self.to_plain())
|
||||
else:
|
||||
print("\n".join(self._lines))
|
||||
|
||||
|
||||
def get_audio_path(audio_url: str, ensure_exists: bool = False) -> Optional[Path]:
|
||||
@@ -204,68 +300,3 @@ def get_sidecar_path(
|
||||
if ensure_exists and not lrc_path.exists():
|
||||
return None
|
||||
return lrc_path
|
||||
|
||||
|
||||
def to_plain(
|
||||
text: str,
|
||||
deduplicate: bool = False,
|
||||
) -> str:
|
||||
"""Convert lyrics to plain text with all tags stripped.
|
||||
|
||||
If deduplicate is True, only keep the first line of consecutive lines with the same lyric text (after stripping tags).
|
||||
Otherwise, lines with multiple time tags will be duplicated as many times as the number of tags.
|
||||
Assumes text has been normalized by normalize.
|
||||
"""
|
||||
|
||||
if not is_synced(text):
|
||||
# If there are no meaningful time tags, just strip all tags and return
|
||||
return _remove_pattern(text, _LINE_START_TAGS_RE)
|
||||
|
||||
lines = []
|
||||
for line in text.splitlines():
|
||||
pos = 0
|
||||
cnt = 0
|
||||
plain_line = ""
|
||||
while True:
|
||||
# Only match strictly repeated standard time tags at the start of the line
|
||||
# Lines without any time tags are ignored.
|
||||
# Lyric lines are considered already stripped of whitespaces, so no strips here.
|
||||
m = _STD_TAG_RE.match(line, pos)
|
||||
if not m:
|
||||
plain_line += line[pos:]
|
||||
break
|
||||
pos = m.end()
|
||||
cnt += 1
|
||||
# Also avoid dulplicating blank lines
|
||||
if deduplicate or not plain_line:
|
||||
if cnt > 0:
|
||||
lines.append(plain_line)
|
||||
else:
|
||||
for _ in range(cnt):
|
||||
lines.append(plain_line)
|
||||
|
||||
if deduplicate:
|
||||
# Remove consecutive duplicates
|
||||
deduped_lines = []
|
||||
prev_line = None
|
||||
for line in lines:
|
||||
if line != prev_line:
|
||||
deduped_lines.append(line)
|
||||
prev_line = line
|
||||
lines = deduped_lines
|
||||
|
||||
return "\n".join(lines).strip("\n")
|
||||
|
||||
|
||||
def print_lyrics(
|
||||
text: str,
|
||||
plain: bool = False,
|
||||
) -> None:
|
||||
"""Print lyrics, optionally stripping tags.
|
||||
|
||||
Assumes text has been normalized by normalize.
|
||||
"""
|
||||
if plain:
|
||||
print(to_plain(text))
|
||||
else:
|
||||
print(text)
|
||||
|
||||
+7
-2
@@ -4,10 +4,15 @@ Date: 2026-03-25 04:09:36
|
||||
Description: Data models
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
from typing import Optional, TYPE_CHECKING
|
||||
from dataclasses import dataclass
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .lrc import LRCData
|
||||
|
||||
|
||||
class CacheStatus(str, Enum):
|
||||
"""Status of a cached lyric entry."""
|
||||
@@ -54,6 +59,6 @@ class LyricResult:
|
||||
"""Result of a lyric fetch attempt, also used as cache record."""
|
||||
|
||||
status: CacheStatus
|
||||
lyrics: Optional[str] = None
|
||||
lyrics: Optional[LRCData] = None
|
||||
source: Optional[str] = None # Which fetcher produced this result
|
||||
ttl: Optional[int] = None # Hint for cache TTL (seconds)
|
||||
|
||||
Reference in New Issue
Block a user