refactor: async parallel fetching

This commit is contained in:
2026-04-03 18:39:31 +02:00
parent c91d5220bd
commit 4c5aba04b0
13 changed files with 452 additions and 256 deletions
+125 -100
View File
@@ -4,18 +4,11 @@ Date: 2026-03-25 11:09:53
Description: Core orchestrator — coordinates fetchers with cache-aware fallback
"""
"""
Fetch pipeline:
1. Check cache for each source in the fallback sequence
2. For sources without a valid cache hit, call the fetcher
3. Cache every result (success, not-found, or error) per source
4. Return the best result by confidence (highest wins)
"""
import asyncio
from typing import Optional
from loguru import logger
from .fetchers import FetcherMethodType, create_fetchers
from .fetchers import FetcherMethodType, build_plan, create_fetchers
from .fetchers.base import BaseFetcher
from .cache import CacheEngine
from .lrc import LRCData
@@ -50,6 +43,19 @@ def _is_better(new: LyricResult, old: LyricResult) -> bool:
)
def _normalize_result(result: LyricResult) -> LyricResult:
    """Return *result* with unsynced lyrics normalized.

    Only results whose status is ``CacheStatus.SUCCESS_UNSYNCED`` and whose
    ``lyrics`` are truthy are rewritten: a new ``LyricResult`` is built with
    ``lyrics.normalize_unsynced()`` and the original status, source, ttl and
    confidence. Every other result is returned unchanged (same object).
    """
    is_unsynced = result.status == CacheStatus.SUCCESS_UNSYNCED
    if not (is_unsynced and result.lyrics):
        # Nothing to normalize: synced, negative, or lyric-less results.
        return result

    normalized_lyrics = result.lyrics.normalize_unsynced()
    return LyricResult(
        status=result.status,
        lyrics=normalized_lyrics,
        source=result.source,
        ttl=result.ttl,
        confidence=result.confidence,
    )
class LrcManager:
"""Main entry point for fetching lyrics with caching."""
@@ -57,54 +63,18 @@ class LrcManager:
self.cache = CacheEngine(db_path=db_path)
self.fetchers = create_fetchers(self.cache)
def _build_sequence(
    self, track: TrackMeta, force_method: Optional[FetcherMethodType] = None
) -> list[BaseFetcher]:
    """Return the fetchers to try for *track*, in order.

    When *force_method* is truthy the result is restricted to that single
    fetcher; a method that is not registered in ``self.fetchers`` is logged
    as an error and yields an empty list. Otherwise every registered fetcher
    whose ``is_available(track)`` is truthy is included, following the
    iteration order of ``self.fetchers``.
    """
    if not force_method:
        # No forced method: keep every fetcher that reports itself available.
        sequence = [
            fetcher
            for fetcher in self.fetchers.values()
            if fetcher.is_available(track)
        ]
        logger.debug(f"Fallback sequence: {[f.source_name for f in sequence]}")
        return sequence

    if force_method in self.fetchers:
        return [self.fetchers[force_method]]

    logger.error(f"Unknown method: {force_method}")
    return []
def fetch_for_track(
async def _run_group(
self,
group: list[BaseFetcher],
track: TrackMeta,
force_method: Optional[FetcherMethodType] = None,
bypass_cache: bool = False,
) -> Optional[LyricResult]:
"""Fetch lyrics for *track* using the fallback pipeline.
bypass_cache: bool,
) -> list[tuple[str, LyricResult]]:
"""Run one group: cache-check first, then parallel-fetch uncached. Returns (source, result) pairs."""
cached_results: list[tuple[str, LyricResult]] = []
need_fetch: list[BaseFetcher] = []
Each source is checked against the cache independently:
- Cache hit with synced lyrics → return immediately
- Cache hit with negative status (NOT_FOUND / NETWORK_ERROR) → skip source
- Cache miss or unsynced → call fetcher, then cache the result
After all sources are tried, returns the best result found
(highest confidence wins).
"""
track = enrich_track(track)
logger.info(f"Fetching lyrics for: {track.display_name()}")
sequence = self._build_sequence(track, force_method)
if not sequence:
return None
# Best result seen so far (highest confidence wins)
best_result: Optional[LyricResult] = None
for fetcher in sequence:
for fetcher in group:
source = fetcher.source_name
# Cache check (skip for fetchers that handle their own caching)
if not bypass_cache and not fetcher.self_cached:
cached = self.cache.get(track, source)
if cached:
@@ -116,75 +86,130 @@ class LrcManager:
f"[{source}] cache hit: {cached.status.value}, skipping"
)
continue
# Positive cache hit — apply the same confidence evaluation
# as fresh fetches so that low-confidence cached results
# don't block better results from later fetchers.
is_trusted = cached.confidence >= HIGH_CONFIDENCE
logger.info(
f"[{source}] cache hit: {cached.status.value}"
f" (confidence={cached.confidence:.0f})"
)
cached_results.append((source, cached))
# Return immediately on trusted synced cache hit
if cached.status == CacheStatus.SUCCESS_SYNCED and is_trusted:
return cached
if best_result is None or _is_better(cached, best_result):
best_result = cached
return cached_results
continue
elif not fetcher.self_cached:
logger.debug(f"[{source}] cache bypassed")
need_fetch.append(fetcher)
# Fetch
logger.debug(f"[{source}] calling fetcher...")
result = fetcher.fetch(track, bypass_cache=bypass_cache)
if need_fetch:
task_map: dict[asyncio.Task, BaseFetcher] = {
asyncio.create_task(f.fetch(track, bypass_cache=bypass_cache)): f
for f in need_fetch
}
pending = set(task_map)
if not result:
logger.debug(f"[{source}] returned None (no result)")
continue
# Cache the result (skip for self-cached fetchers)
if not fetcher.self_cached and not bypass_cache:
ttl = result.ttl or _STATUS_TTL.get(result.status, TTL_NOT_FOUND)
self.cache.set(track, source, result, ttl_seconds=ttl)
# Evaluate result
if result.status in (
CacheStatus.SUCCESS_SYNCED,
CacheStatus.SUCCESS_UNSYNCED,
):
is_trusted = result.confidence >= HIGH_CONFIDENCE
logger.info(
f"[{source}] got {result.status.value} lyrics"
f" (confidence={result.confidence:.0f})"
while pending:
done, pending = await asyncio.wait(
pending, return_when=asyncio.FIRST_COMPLETED
)
found_trusted = False
for task in done:
fetcher = task_map[task]
source = fetcher.source_name
try:
result = task.result()
except Exception as e:
logger.error(f"[{source}] fetch raised: {e}")
continue
if result is None:
logger.debug(f"[{source}] returned None")
continue
if not fetcher.self_cached and not bypass_cache:
ttl = result.ttl or _STATUS_TTL.get(
result.status, TTL_NOT_FOUND
)
self.cache.set(track, source, result, ttl_seconds=ttl)
if result.status in (
CacheStatus.SUCCESS_SYNCED,
CacheStatus.SUCCESS_UNSYNCED,
):
logger.info(
f"[{source}] got {result.status.value} lyrics"
f" (confidence={result.confidence:.0f})"
)
cached_results.append((source, result))
if (
result.status == CacheStatus.SUCCESS_SYNCED
and result.confidence >= HIGH_CONFIDENCE
):
found_trusted = True
if found_trusted:
for t in pending:
t.cancel()
await asyncio.gather(*pending, return_exceptions=True)
break
return cached_results
async def _fetch_for_track(
self,
track: TrackMeta,
force_method: Optional[FetcherMethodType],
bypass_cache: bool,
) -> Optional[LyricResult]:
track = enrich_track(track)
logger.info(f"Fetching lyrics for: {track.display_name()}")
plan = build_plan(self.fetchers, track, force_method)
if not plan:
return None
best_result: Optional[LyricResult] = None
for group in plan:
group_results = await self._run_group(group, track, bypass_cache)
for source, result in group_results:
if result.status not in (
CacheStatus.SUCCESS_SYNCED,
CacheStatus.SUCCESS_UNSYNCED,
):
continue
is_trusted = result.confidence >= HIGH_CONFIDENCE
# Trusted synced → return immediately
if result.status == CacheStatus.SUCCESS_SYNCED and is_trusted:
return result
# Track best result by confidence
logger.info(
f"Returning {result.status.value} lyrics from {source}"
f" (confidence={result.confidence:.0f})"
)
return _normalize_result(result)
if best_result is None or _is_better(result, best_result):
best_result = result
# NOT_FOUND / NETWORK_ERROR: already cached, try next
# Return best available
if best_result:
if (
best_result.status == CacheStatus.SUCCESS_UNSYNCED
and best_result.lyrics
):
best_result = LyricResult(
status=best_result.status,
lyrics=best_result.lyrics.normalize_unsynced(),
source=best_result.source,
ttl=best_result.ttl,
confidence=best_result.confidence,
)
logger.info(
f"Returning {best_result.status.value} lyrics from {best_result.source}"
)
else:
logger.info(f"No lyrics found for {track.display_name()}")
return _normalize_result(best_result)
return best_result
logger.info(f"No lyrics found for {track.display_name()}")
return None
def fetch_for_track(
    self,
    track: TrackMeta,
    force_method: Optional[FetcherMethodType] = None,
    bypass_cache: bool = False,
) -> Optional[LyricResult]:
    """Fetch lyrics for *track* using the group-based parallel pipeline.

    Synchronous wrapper: builds the ``_fetch_for_track`` coroutine with the
    given arguments, drives it to completion with ``asyncio.run`` and
    returns whatever the coroutine returns.

    NOTE: ``asyncio.run`` cannot be used while another event loop is already
    running in the same thread, so this wrapper is meant for synchronous
    callers.
    """
    pipeline = self._fetch_for_track(track, force_method, bypass_cache)
    return asyncio.run(pipeline)
def manual_insert(
self,