diff --git a/jukebox/domain/entities/__init__.py b/jukebox/domain/entities/__init__.py index ab41acaa..4ca76fbf 100644 --- a/jukebox/domain/entities/__init__.py +++ b/jukebox/domain/entities/__init__.py @@ -3,13 +3,14 @@ from .disc import Disc, DiscMetadata, DiscOption from .library import Library from .playback_action import PlaybackAction -from .playback_session import PlaybackSession +from .playback_session import PlaybackCommandRetry, PlaybackSession from .tag_event import TagEvent __all__ = [ "CurrentTagAction", "CurrentTagStatus", "PlaybackAction", + "PlaybackCommandRetry", "PlaybackSession", "TagEvent", "Library", diff --git a/jukebox/domain/entities/playback_session.py b/jukebox/domain/entities/playback_session.py index d6f7ee43..bc220b69 100644 --- a/jukebox/domain/entities/playback_session.py +++ b/jukebox/domain/entities/playback_session.py @@ -1,4 +1,32 @@ -from pydantic import BaseModel +from typing import Self + +from pydantic import BaseModel, model_validator + +from .playback_action import PlaybackAction + + +class PlaybackCommandRetry(BaseModel): + """Tracks retry timing for a failed playback command.""" + + action: PlaybackAction + tag_id: str | None = None + first_failed_at: float + last_failed_at: float + attempt_count: int + next_retry_at: float | None + exhausted: bool = False + + @model_validator(mode="after") + def validate_tag_id_matches_action(self) -> Self: + if self.action == PlaybackAction.PLAY: + if self.tag_id is None: + raise ValueError("tag_id is required for PLAY retry") + elif self.tag_id is not None: + raise ValueError("tag_id is only valid for PLAY retry") + return self + + def matches(self, *, action: PlaybackAction, tag_id: str | None = None) -> bool: + return self.action == action and self.tag_id == tag_id class PlaybackSession(BaseModel): @@ -15,3 +43,6 @@ class PlaybackSession(BaseModel): # Timestamp last_event_timestamp: float | None = None + + # Playback command retry state + playback_command_retry: PlaybackCommandRetry | None = None diff --git a/jukebox/domain/use_cases/handle_tag_event.py b/jukebox/domain/use_cases/handle_tag_event.py index b6db6954..63814d1f 100644 --- a/jukebox/domain/use_cases/handle_tag_event.py +++ b/jukebox/domain/use_cases/handle_tag_event.py @@ -1,7 +1,7 @@ import logging -from contextlib import contextmanager +from collections.abc import Callable -from jukebox.domain.entities import CurrentTagAction, PlaybackAction, PlaybackSession, TagEvent +from jukebox.domain.entities import CurrentTagAction, PlaybackAction, PlaybackCommandRetry, PlaybackSession, TagEvent from jukebox.domain.errors import PlaybackError from jukebox.domain.ports import PlayerPort from jukebox.domain.repositories import CurrentTagRepository, LibraryRepository @@ -9,6 +9,7 @@ from jukebox.domain.use_cases.determine_current_tag_action import DetermineCurrentTagAction LOGGER = logging.getLogger("jukebox") +PLAYBACK_RETRY_DELAYS_SECONDS = (0.1, 0.25, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0) class HandleTagEvent: @@ -21,12 +22,16 @@ def __init__( current_tag_repository: CurrentTagRepository, determine_action: DetermineAction, determine_current_tag_action: DetermineCurrentTagAction, + retry_delays_seconds: tuple[float, ...] = PLAYBACK_RETRY_DELAYS_SECONDS, ): self.player = player self.library = library self.current_tag_repository = current_tag_repository self.determine_action = determine_action self.determine_current_tag_action = determine_current_tag_action + if not retry_delays_seconds: + raise ValueError("retry_delays_seconds must not be empty") + self.retry_delays_seconds = retry_delays_seconds def execute(self, tag_event: TagEvent, session: PlaybackSession) -> PlaybackSession: self._apply_current_tag_action_best_effort(tag_event, session) @@ -45,10 +50,20 @@ def execute(self, tag_event: TagEvent, session: PlaybackSession) -> PlaybackSess case PlaybackAction.CONTINUE: # Reset when tag is present session.playing_tag_removed_at = None + if ( + session.playback_command_retry is not None + and session.playback_command_retry.action == PlaybackAction.PAUSE + ): + session.playback_command_retry = None case PlaybackAction.RESUME: - with suppress_playback_error("Playback operation `RESUME` failed; stopping session update"): - self.player.resume() + if self._run_playback_command( + action=PlaybackAction.RESUME, + timestamp=tag_event.timestamp, + session=session, + error_message="Playback operation `RESUME` failed; stopping session update", + command=self.player.resume, + ): session.paused_at = None session.playing_tag_removed_at = None @@ -58,14 +73,21 @@ def execute(self, tag_event: TagEvent, session: PlaybackSession) -> PlaybackSess disc = self.library.get_disc(tag_event.tag_id) if tag_event.tag_id is not None else None if disc is not None: LOGGER.info("Found corresponding disc: %s", disc) - with suppress_playback_error( - f"Playback operation `PLAY` failed for tag_id='{tag_event.tag_id}'; stopping session update" + if self._run_playback_command( + action=PlaybackAction.PLAY, + tag_id=tag_event.tag_id, + timestamp=tag_event.timestamp, + session=session, + error_message=( + f"Playback operation `PLAY` failed for tag_id='{tag_event.tag_id}'; stopping session update" + ), + command=lambda: self.player.play(disc.uri, disc.option.shuffle), ): - self.player.play(disc.uri, disc.option.shuffle) session.playing_tag = tag_event.tag_id session.paused_at = None session.playing_tag_removed_at = None else: + session.playback_command_retry = None LOGGER.warning("No disc found for UID: %s", tag_event.tag_id) case PlaybackAction.WAITING: @@ -76,16 +98,26 @@ def execute(self, tag_event: TagEvent, session: PlaybackSession) -> PlaybackSess LOGGER.debug("Grace period: %.3fs / %gs", grace_period_elapsed, self.determine_action.pause_delay) case PlaybackAction.PAUSE: - with suppress_playback_error("Playback operation `PAUSE` failed; continuing session update"): - self.player.pause() - session.paused_at = tag_event.timestamp + if self._run_playback_command( + action=PlaybackAction.PAUSE, + timestamp=tag_event.timestamp, + session=session, + error_message="Playback operation `PAUSE` failed; stopping session update", + command=self.player.pause, + ): + session.paused_at = tag_event.timestamp case PlaybackAction.STOP: - with suppress_playback_error("Playback operation `STOP` failed; continuing session update"): - self.player.stop() - session.playing_tag = None - session.paused_at = None - session.playing_tag_removed_at = None + if self._run_playback_command( + action=PlaybackAction.STOP, + timestamp=tag_event.timestamp, + session=session, + error_message="Playback operation `STOP` failed; stopping session update", + command=self.player.stop, + ): + session.playing_tag = None + session.paused_at = None + session.playing_tag_removed_at = None case PlaybackAction.IDLE: pass @@ -136,10 +168,86 @@ def _apply_current_tag_action( case CurrentTagAction.KEEP: pass # No state changed + def _run_playback_command( + self, + *, + action: PlaybackAction, + timestamp: float, + session: PlaybackSession, + error_message: str, + command: Callable[[], None], + tag_id: str | None = None, + ) -> bool: + retry = session.playback_command_retry + retry_matches = retry is not None and retry.matches(action=action, tag_id=tag_id) + if retry_matches and retry.exhausted: + LOGGER.debug( + "Skipping playback operation `%s`; retry exhausted after %d attempts", + action.value.upper(), + retry.attempt_count, + ) + return False -@contextmanager -def suppress_playback_error(msg: str): - try: - yield - except PlaybackError: - LOGGER.warning(msg) + if retry_matches and retry.next_retry_at is not None and timestamp < retry.next_retry_at: + LOGGER.debug( + "Skipping playback operation `%s` until retry time %.3f", + action.value.upper(), + retry.next_retry_at, + ) + return False + + try: + command() + except PlaybackError: + retry = self._record_playback_command_failure( + action=action, + timestamp=timestamp, + session=session, + tag_id=tag_id, + ) + if retry.exhausted: + LOGGER.warning("%s; retry exhausted after %d attempts", error_message, retry.attempt_count) + else: + next_retry_at = retry.next_retry_at + if next_retry_at is None: + raise RuntimeError("retry is not exhausted but next_retry_at is missing") + LOGGER.warning("%s; retrying in %.3fs", error_message, next_retry_at - timestamp) + return False + + session.playback_command_retry = None + return True + + def _record_playback_command_failure( + self, + *, + action: PlaybackAction, + timestamp: float, + session: PlaybackSession, + tag_id: str | None = None, + ) -> PlaybackCommandRetry: + existing_retry = session.playback_command_retry + if existing_retry is None or not existing_retry.matches(action=action, tag_id=tag_id): + attempt_count = 1 + first_failed_at = timestamp + else: + attempt_count = existing_retry.attempt_count + 1 + first_failed_at = existing_retry.first_failed_at + + retry_delay = self._retry_delay_for_attempt(attempt_count) + retry = PlaybackCommandRetry( + action=action, + tag_id=tag_id, + first_failed_at=first_failed_at, + last_failed_at=timestamp, + attempt_count=attempt_count, + next_retry_at=None if retry_delay is None else timestamp + retry_delay, + exhausted=retry_delay is None, + ) + session.playback_command_retry = retry + return retry + + def _retry_delay_for_attempt(self, attempt_count: int) -> float | None: + delay_index = attempt_count - 1 + if delay_index >= len(self.retry_delays_seconds): + return None + return self.retry_delays_seconds[delay_index] diff --git a/tests/jukebox/domain/entities/test_playback_session.py b/tests/jukebox/domain/entities/test_playback_session.py new file mode 100644 index 00000000..746cc63e --- /dev/null +++ b/tests/jukebox/domain/entities/test_playback_session.py @@ -0,0 +1,33 @@ +import pytest +from pydantic import ValidationError + +from jukebox.domain.entities import PlaybackAction, PlaybackCommandRetry + + +def make_retry(action: PlaybackAction, tag_id: str | None = None) -> PlaybackCommandRetry: + return PlaybackCommandRetry( + action=action, + tag_id=tag_id, + first_failed_at=100.0, + last_failed_at=100.0, + attempt_count=1, + next_retry_at=100.1, + ) + + +def test_playback_command_retry_requires_tag_id_for_play(): + with pytest.raises(ValidationError, match="tag_id is required for PLAY retry"): + make_retry(PlaybackAction.PLAY) + + +def test_playback_command_retry_rejects_tag_id_for_non_play(): + with pytest.raises(ValidationError, match="tag_id is only valid for PLAY retry"): + make_retry(PlaybackAction.PAUSE, tag_id="test-tag") + + +def test_playback_command_retry_matches_action_and_tag_id(): + retry = make_retry(PlaybackAction.PLAY, tag_id="test-tag") + + assert retry.matches(action=PlaybackAction.PLAY, tag_id="test-tag") is True + assert retry.matches(action=PlaybackAction.PLAY, tag_id="other-tag") is False + assert retry.matches(action=PlaybackAction.PAUSE, tag_id="test-tag") is False diff --git a/tests/jukebox/domain/use_cases/test_handle_tag_event.py b/tests/jukebox/domain/use_cases/test_handle_tag_event.py index b75a9215..3ea2ad82 100644 --- a/tests/jukebox/domain/use_cases/test_handle_tag_event.py +++ b/tests/jukebox/domain/use_cases/test_handle_tag_event.py @@ -2,11 +2,19 @@ import pytest -from jukebox.domain.entities import CurrentTagAction, Disc, DiscMetadata, DiscOption, PlaybackSession, TagEvent +from jukebox.domain.entities import ( + CurrentTagAction, + Disc, + DiscMetadata, + DiscOption, + PlaybackAction, + PlaybackSession, + TagEvent, +) from jukebox.domain.errors import PlaybackError from jukebox.domain.use_cases.determine_action import DetermineAction from jukebox.domain.use_cases.determine_current_tag_action import DetermineCurrentTagAction -from jukebox.domain.use_cases.handle_tag_event import HandleTagEvent, suppress_playback_error +from jukebox.domain.use_cases.handle_tag_event import HandleTagEvent @pytest.fixture @@ -462,6 +470,74 @@ def test_handle_play_action_does_not_update_session_when_player_raises(handle_ta mock_player.play.assert_called_once() assert new_session.playing_tag is None + assert new_session.playback_command_retry is not None + assert new_session.playback_command_retry.action == PlaybackAction.PLAY + assert new_session.playback_command_retry.tag_id == "test-tag" + + +def test_handle_play_failure_does_not_throttle_different_tag(handle_tag_event, mock_player): + mock_player.play.side_effect = [PlaybackError("bad uri"), None] + session = PlaybackSession() + + session = handle_tag_event.execute(TagEvent(tag_id="tag-a", timestamp=100.0), session) + session = handle_tag_event.execute(TagEvent(tag_id="tag-b", timestamp=100.2), session) + + assert mock_player.play.call_count == 2 + assert session.playing_tag == "tag-b" + assert session.playback_command_retry is None + + +def test_handle_play_failure_keeps_retry_after_brief_missed_read(handle_tag_event, mock_player): + mock_player.play.side_effect = PlaybackError("bad uri") + session = PlaybackSession() + + session = handle_tag_event.execute(TagEvent(tag_id="test-tag", timestamp=100.0), session) + session = handle_tag_event.execute(TagEvent(tag_id=None, timestamp=100.04), session) + session = handle_tag_event.execute(TagEvent(tag_id="test-tag", timestamp=100.05), session) + + mock_player.play.assert_called_once() + assert session.playback_command_retry is not None + assert session.playback_command_retry.action == PlaybackAction.PLAY + assert session.playback_command_retry.tag_id == "test-tag" + assert session.playback_command_retry.next_retry_at == pytest.approx(100.1) + + +def test_handle_play_failure_gives_up_after_retry_delays_are_exhausted(handle_tag_event, mock_player): + handle_tag_event.retry_delays_seconds = (0.5,) + mock_player.play.side_effect = PlaybackError("bad uri") + session = PlaybackSession() + + session = handle_tag_event.execute(TagEvent(tag_id="test-tag", timestamp=100.0), session) + session = handle_tag_event.execute(TagEvent(tag_id="test-tag", timestamp=100.5), session) + session = handle_tag_event.execute(TagEvent(tag_id="test-tag", timestamp=101.0), session) + + assert mock_player.play.call_count == 2 + assert session.playback_command_retry is not None + assert session.playback_command_retry.action == PlaybackAction.PLAY + assert session.playback_command_retry.tag_id == "test-tag" + assert session.playback_command_retry.attempt_count == 2 + assert session.playback_command_retry.exhausted is True + assert session.playback_command_retry.next_retry_at is None + + +def test_handle_play_failure_clears_retry_when_unknown_tag_is_read(handle_tag_event, mock_player, mock_library): + known_disc = Disc(uri="uri:tag-a", metadata=DiscMetadata(), option=DiscOption(shuffle=False)) + mock_library.get_disc.side_effect = lambda tag_id: known_disc if tag_id == "tag-a" else None + mock_player.play.side_effect = PlaybackError("bad uri") + session = PlaybackSession() + + session = handle_tag_event.execute(TagEvent(tag_id="tag-a", timestamp=100.0), session) + assert session.playback_command_retry is not None + assert session.playback_command_retry.tag_id == "tag-a" + + session = handle_tag_event.execute(TagEvent(tag_id="unknown-tag", timestamp=100.2), session) + assert session.playback_command_retry is None + + session = handle_tag_event.execute(TagEvent(tag_id="tag-a", timestamp=100.3), session) + + assert mock_player.play.call_count == 2 + assert session.playback_command_retry is not None + assert session.playback_command_retry.tag_id == "tag-a" def test_handle_resume_action_does_not_update_session_when_player_raises(handle_tag_event, mock_player): @@ -475,9 +551,27 @@ def test_handle_resume_action_does_not_update_session_when_player_raises(handle_ mock_player.resume.assert_called_once() assert new_session.paused_at == 60.0 + assert new_session.playback_command_retry is not None + assert new_session.playback_command_retry.action == PlaybackAction.RESUME + assert new_session.playback_command_retry.tag_id is None + + +def test_handle_resume_failure_keeps_retry_after_brief_missed_read(handle_tag_event, mock_player): + mock_player.resume.side_effect = PlaybackError("cannot resume") + session = PlaybackSession(playing_tag="test-tag", paused_at=60.0) + + session = handle_tag_event.execute(TagEvent(tag_id="test-tag", timestamp=100.0), session) + session = handle_tag_event.execute(TagEvent(tag_id=None, timestamp=100.04), session) + session = handle_tag_event.execute(TagEvent(tag_id="test-tag", timestamp=100.05), session) + + mock_player.resume.assert_called_once() + assert session.playback_command_retry is not None + assert session.playback_command_retry.action == PlaybackAction.RESUME + assert session.playback_command_retry.tag_id is None + assert session.playback_command_retry.next_retry_at == pytest.approx(100.1) -def test_handle_pause_action_updates_session_even_when_player_raises(handle_tag_event, mock_player): +def test_handle_pause_action_does_not_update_session_when_player_raises(handle_tag_event, mock_player): mock_player.pause.side_effect = PlaybackError("cannot pause") session = PlaybackSession() session.playing_tag = "test-tag" @@ -487,10 +581,103 @@ def test_handle_pause_action_updates_session_even_when_player_raises(handle_tag_ new_session = handle_tag_event.execute(tag_event, session) mock_player.pause.assert_called_once() - assert new_session.paused_at == 100.0 + assert new_session.paused_at is None + assert new_session.playing_tag == "test-tag" + assert new_session.playing_tag_removed_at == 96.9 + assert new_session.playback_command_retry is not None + assert new_session.playback_command_retry.action == PlaybackAction.PAUSE + assert new_session.playback_command_retry.tag_id is None + assert new_session.playback_command_retry.attempt_count == 1 + assert new_session.playback_command_retry.next_retry_at == pytest.approx(100.1) + + +def test_handle_pause_failure_does_not_retry_before_backoff_expires(handle_tag_event, mock_player): + mock_player.pause.side_effect = PlaybackError("cannot pause") + session = PlaybackSession(playing_tag="test-tag", playing_tag_removed_at=96.9) + + session = handle_tag_event.execute(TagEvent(tag_id=None, timestamp=100.0), session) + session = handle_tag_event.execute(TagEvent(tag_id=None, timestamp=100.05), session) + + mock_player.pause.assert_called_once() + assert session.paused_at is None + assert session.playback_command_retry is not None + assert session.playback_command_retry.attempt_count == 1 + +def test_handle_pause_failure_retries_after_backoff_and_updates_session_on_success(handle_tag_event, mock_player): + mock_player.pause.side_effect = [PlaybackError("cannot pause"), None] + session = PlaybackSession(playing_tag="test-tag", playing_tag_removed_at=96.9) -def test_handle_stop_action_updates_session_even_when_player_raises(handle_tag_event, mock_player): + session = handle_tag_event.execute(TagEvent(tag_id=None, timestamp=100.0), session) + session = handle_tag_event.execute(TagEvent(tag_id=None, timestamp=100.1), session) + + assert mock_player.pause.call_count == 2 + assert session.paused_at == 100.1 + assert session.playback_command_retry is None + + +def test_handle_pause_failure_uses_next_backoff_after_retry_fails(handle_tag_event, mock_player): + mock_player.pause.side_effect = PlaybackError("cannot pause") + session = PlaybackSession(playing_tag="test-tag", playing_tag_removed_at=96.9) + + session = handle_tag_event.execute(TagEvent(tag_id=None, timestamp=100.0), session) + session = handle_tag_event.execute(TagEvent(tag_id=None, timestamp=100.1), session) + + assert mock_player.pause.call_count == 2 + assert session.playback_command_retry is not None + assert session.playback_command_retry.attempt_count == 2 + assert session.playback_command_retry.next_retry_at == pytest.approx(100.35) + + +def test_handle_pause_failure_gives_up_after_retry_delays_are_exhausted(handle_tag_event, mock_player): + handle_tag_event.retry_delays_seconds = (0.5,) + mock_player.pause.side_effect = PlaybackError("cannot pause") + session = PlaybackSession(playing_tag="test-tag", playing_tag_removed_at=96.9) + + session = handle_tag_event.execute(TagEvent(tag_id=None, timestamp=100.0), session) + session = handle_tag_event.execute(TagEvent(tag_id=None, timestamp=100.5), session) + session = handle_tag_event.execute(TagEvent(tag_id=None, timestamp=200.0), session) + + assert mock_player.pause.call_count == 2 + mock_player.stop.assert_not_called() + assert session.paused_at is None + assert session.playback_command_retry is not None + assert session.playback_command_retry.action == PlaybackAction.PAUSE + assert session.playback_command_retry.attempt_count == 2 + assert session.playback_command_retry.exhausted is True + assert session.playback_command_retry.next_retry_at is None + + +def test_persistent_pause_failure_keeps_retrying_pause_after_pause_duration(handle_tag_event, mock_player): + mock_player.pause.side_effect = PlaybackError("cannot pause") + session = PlaybackSession(playing_tag="test-tag", playing_tag_removed_at=96.9) + + session = handle_tag_event.execute(TagEvent(tag_id=None, timestamp=100.0), session) + session = handle_tag_event.execute(TagEvent(tag_id=None, timestamp=100.5), session) + session = handle_tag_event.execute(TagEvent(tag_id=None, timestamp=1000.0), session) + + assert mock_player.pause.call_count == 3 + mock_player.stop.assert_not_called() + assert session.paused_at is None + assert session.playback_command_retry is not None + assert session.playback_command_retry.action == PlaybackAction.PAUSE + assert session.playback_command_retry.attempt_count == 3 + assert session.playback_command_retry.next_retry_at == pytest.approx(1000.5) + + +def test_handle_pause_failure_clears_retry_when_action_changes(handle_tag_event, mock_player): + mock_player.pause.side_effect = PlaybackError("cannot pause") + session = PlaybackSession(playing_tag="test-tag", playing_tag_removed_at=96.9) + + session = handle_tag_event.execute(TagEvent(tag_id=None, timestamp=100.0), session) + session = handle_tag_event.execute(TagEvent(tag_id="test-tag", timestamp=100.2), session) + + mock_player.pause.assert_called_once() + assert session.playback_command_retry is None + assert session.playing_tag_removed_at is None + + +def test_handle_stop_action_does_not_update_session_when_player_raises(handle_tag_event, mock_player): mock_player.stop.side_effect = PlaybackError("cannot stop") session = PlaybackSession() session.playing_tag = "test-tag" @@ -500,31 +687,22 @@ def test_handle_stop_action_updates_session_even_when_player_raises(handle_tag_e new_session = handle_tag_event.execute(tag_event, session) mock_player.stop.assert_called_once() - assert new_session.playing_tag is None - assert new_session.paused_at is None - - -def test_suppress_playback_error_suppresses_playback_error(): - with suppress_playback_error("msg"): - raise PlaybackError("boom") - - -def test_suppress_playback_error_logs_warning(caplog): - with suppress_playback_error("something went wrong"), caplog.at_level("WARNING"): - raise PlaybackError("boom") - - assert "something went wrong" in caplog.text - assert any(r.levelname == "WARNING" for r in caplog.records) - + assert new_session.playing_tag == "test-tag" + assert new_session.paused_at == 49.0 + assert new_session.playback_command_retry is not None + assert new_session.playback_command_retry.action == PlaybackAction.STOP + assert new_session.playback_command_retry.tag_id is None -def test_suppress_playback_error_does_not_suppress_other_exceptions(): - with pytest.raises(RuntimeError), suppress_playback_error("msg"): - raise RuntimeError("not a playback error") +def test_handle_stop_failure_does_not_retry_before_backoff_expires(handle_tag_event, mock_player): + mock_player.stop.side_effect = PlaybackError("cannot stop") + session = PlaybackSession(playing_tag="test-tag", paused_at=49.0) -def test_suppress_playback_error_runs_body_when_no_error(): - executed = False - with suppress_playback_error("msg"): - executed = True + session = handle_tag_event.execute(TagEvent(tag_id=None, timestamp=100.0), session) + session = handle_tag_event.execute(TagEvent(tag_id=None, timestamp=100.05), session) - assert executed is True + mock_player.stop.assert_called_once() + assert session.playing_tag == "test-tag" + assert session.paused_at == 49.0 + assert session.playback_command_retry is not None + assert session.playback_command_retry.attempt_count == 1