Files
lrx-cli/src/lrx_cli/watch/fetcher.py
T

90 lines
3.4 KiB
Python

"""
Author: Uyanide pywang0608@foxmail.com
Date: 2026-04-10 08:14:41
Description: Debounced lyric fetch orchestration for watch session.
"""
from __future__ import annotations
import asyncio
from typing import Awaitable, Callable, Optional
from ..lrc import LRCData
from ..models import TrackMeta
class LyricFetcher:
"""Debounces track updates and runs at most one lyric fetch task at a time."""
_watch_debounce_ms: int
_fetch_func: Callable[[TrackMeta], Awaitable[Optional[LRCData]]]
_on_fetching: Callable[[], Awaitable[None] | None]
_on_result: Callable[[Optional[LRCData]], Awaitable[None] | None]
_debounce_task: asyncio.Task | None
_fetch_task: asyncio.Task | None
_pending_track: TrackMeta | None
def __init__(
self,
fetch_func: Callable[[TrackMeta], Awaitable[Optional[LRCData]]],
on_fetching: Callable[[], Awaitable[None] | None],
on_result: Callable[[Optional[LRCData]], Awaitable[None] | None],
watch_debounce_ms: int,
) -> None:
"""Initialize fetch callbacks and runtime options."""
self._watch_debounce_ms = watch_debounce_ms
self._fetch_func = fetch_func
self._on_fetching = on_fetching
self._on_result = on_result
self._debounce_task: asyncio.Task | None = None
self._fetch_task: asyncio.Task | None = None
self._pending_track: TrackMeta | None = None
async def stop(self) -> None:
"""Cancel and await all in-flight debounce/fetch tasks."""
for task in (self._debounce_task, self._fetch_task):
if task is not None:
task.cancel()
await asyncio.gather(
*[t for t in (self._debounce_task, self._fetch_task) if t is not None],
return_exceptions=True,
)
self._debounce_task = None
self._fetch_task = None
def request(self, track: TrackMeta) -> None:
"""Request lyrics for track with debounce collapsing."""
self._pending_track = track
if self._debounce_task is not None:
# cancel any pending debounce window — the new request supersedes it
self._debounce_task.cancel()
self._debounce_task = asyncio.create_task(self._debounce_then_fetch())
async def _debounce_then_fetch(self) -> None:
"""Wait debounce window then start a fresh fetch task for latest pending track."""
await asyncio.sleep(self._watch_debounce_ms / 1000.0)
track = self._pending_track
if track is None:
return
if self._fetch_task is not None:
# abort any in-flight fetch for a previous track before starting the new one
self._fetch_task.cancel()
await asyncio.gather(self._fetch_task, return_exceptions=True)
self._fetch_task = asyncio.create_task(self._do_fetch(track))
async def _do_fetch(self, track: TrackMeta) -> None:
"""Execute fetch lifecycle callbacks and fetch lyrics for a track."""
# callbacks may be plain functions or coroutines — handle both
fetching_callback_result = self._on_fetching()
if asyncio.iscoroutine(fetching_callback_result):
await fetching_callback_result
lyrics = await self._fetch_func(track)
result_callback_result = self._on_result(lyrics)
if asyncio.iscoroutine(result_callback_result):
await result_callback_result