From 45d06efd0109b3656a97da36117d8c04842d14b3 Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 6 Jun 2026 23:57:35 +0000 Subject: [PATCH] Fix truncated results for chunked (partial) command responses The server streams large API command results (such as playlist tracks for big playlists) as multiple SuccessResultMessages: every 500 items is sent with partial=True, followed by a final message with partial=False. The client ignored the `partial` flag and resolved the command on the first message, so callers only ever received the first 500 items. Accumulate partial result chunks (keyed by message_id) in both the listening and non-listening code paths, and only return once the final (non-partial) message arrives. Also fix get_playlist_tracks: it accepted a `page` argument that the server endpoint does not use (the server returns the whole listing in one call, streamed in chunks). Replace it with the `force_refresh` argument that the server actually supports. --- music_assistant_client/client.py | 27 +++++++++++++++++++++++++-- music_assistant_client/music.py | 10 +++++++--- 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/music_assistant_client/client.py b/music_assistant_client/client.py index 26da371..0232cf4 100644 --- a/music_assistant_client/client.py +++ b/music_assistant_client/client.py @@ -74,6 +74,8 @@ def __init__( self.connection = WebsocketsConnection(server_url, aiohttp_session, token, ssl_context) self.logger = logging.getLogger(__package__) self._result_futures: dict[str | int, asyncio.Future[Any]] = {} + # buffer for chunked/streamed (partial) command results, keyed by message_id + self._partial_results: dict[str | int, list[Any]] = {} self._subscribers: list[EventSubscriptionType] = [] self._stop_called: bool = False self._loop: asyncio.AbstractEventLoop | None = None @@ -312,6 +314,7 @@ async def send_command( if not self._listening: await self.connection.send_message(command_message.to_dict()) # Read messages until we get the response for our command + partial_result: list[Any] | None = None while True: raw = await self.connection.receive_message() response = parse_message(raw) @@ -321,6 +324,15 @@ async def send_command( if response.message_id != command_message.message_id: continue if isinstance(response, SuccessResultMessage): + if response.partial: + # chunked/streamed result: accumulate and wait for the final message + if partial_result is None: + partial_result = [] + partial_result.extend(response.result) + continue + if partial_result is not None: + partial_result.extend(response.result) + return partial_result return response.result if isinstance(response, ErrorResultMessage): exc = ERROR_MAP[response.error_code] @@ -404,6 +416,7 @@ async def disconnect(self) -> None: # cancel all command-tasks awaiting a result for future in self._result_futures.values(): future.cancel() + self._partial_results.clear() await self.connection.disconnect() def _handle_incoming_message(self, raw: dict[str, Any]) -> None: @@ -418,12 +431,22 @@ def _handle_incoming_message(self, raw: dict[str, Any]) -> None: future = self._result_futures.get(msg.message_id) if future is None: - # no listener for this result + # no listener for this result, discard any buffered partial result + self._partial_results.pop(msg.message_id, None) return if isinstance(msg, SuccessResultMessage): - future.set_result(msg.result) + if msg.partial: + # chunked/streamed result: accumulate and wait for the final message + self._partial_results.setdefault(msg.message_id, []).extend(msg.result) + return + if (partial := self._partial_results.pop(msg.message_id, None)) is not None: + partial.extend(msg.result) + future.set_result(partial) + else: + future.set_result(msg.result) return if isinstance(msg, ErrorResultMessage): + self._partial_results.pop(msg.message_id, None) exc = ERROR_MAP[msg.error_code] future.set_exception(exc(msg.details)) return diff --git a/music_assistant_client/music.py b/music_assistant_client/music.py index 94a2d15..bc86586 100644 --- a/music_assistant_client/music.py +++ b/music_assistant_client/music.py @@ -347,16 +347,20 @@ async def get_playlist_tracks( self, item_id: str, provider_instance_id_or_domain: str, - page: int = 0, + force_refresh: bool = False, ) -> list[Track]: - """Get tracks for given playlist.""" + """ + Get all tracks for given playlist. + + :param force_refresh: Force a refresh of the (cached) playlist tracks. + """ return [ Track.from_dict(obj) for obj in await self.client.send_command( "music/playlists/playlist_tracks", item_id=item_id, provider_instance_id_or_domain=provider_instance_id_or_domain, - page=page, + force_refresh=force_refresh, ) ]