Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions music_assistant_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ def __init__(
self.connection = WebsocketsConnection(server_url, aiohttp_session, token, ssl_context)
self.logger = logging.getLogger(__package__)
self._result_futures: dict[str | int, asyncio.Future[Any]] = {}
# buffer for chunked/streamed (partial) command results, keyed by message_id
self._partial_results: dict[str | int, list[Any]] = {}
self._subscribers: list[EventSubscriptionType] = []
self._stop_called: bool = False
self._loop: asyncio.AbstractEventLoop | None = None
Expand Down Expand Up @@ -312,6 +314,7 @@ async def send_command(
if not self._listening:
await self.connection.send_message(command_message.to_dict())
# Read messages until we get the response for our command
partial_result: list[Any] | None = None
while True:
raw = await self.connection.receive_message()
response = parse_message(raw)
Expand All @@ -321,6 +324,15 @@ async def send_command(
if response.message_id != command_message.message_id:
continue
if isinstance(response, SuccessResultMessage):
if response.partial:
# chunked/streamed result: accumulate and wait for the final message
if partial_result is None:
partial_result = []
partial_result.extend(response.result)
continue
if partial_result is not None:
partial_result.extend(response.result)
return partial_result
return response.result
if isinstance(response, ErrorResultMessage):
exc = ERROR_MAP[response.error_code]
Expand Down Expand Up @@ -404,6 +416,7 @@ async def disconnect(self) -> None:
# cancel all command-tasks awaiting a result
for future in self._result_futures.values():
future.cancel()
self._partial_results.clear()
await self.connection.disconnect()

def _handle_incoming_message(self, raw: dict[str, Any]) -> None:
Expand All @@ -418,12 +431,22 @@ def _handle_incoming_message(self, raw: dict[str, Any]) -> None:
future = self._result_futures.get(msg.message_id)

if future is None:
# no listener for this result
# no listener for this result, discard any buffered partial result
self._partial_results.pop(msg.message_id, None)
return
if isinstance(msg, SuccessResultMessage):
future.set_result(msg.result)
if msg.partial:
# chunked/streamed result: accumulate and wait for the final message
self._partial_results.setdefault(msg.message_id, []).extend(msg.result)
return
if (partial := self._partial_results.pop(msg.message_id, None)) is not None:
partial.extend(msg.result)
future.set_result(partial)
else:
future.set_result(msg.result)
return
if isinstance(msg, ErrorResultMessage):
self._partial_results.pop(msg.message_id, None)
exc = ERROR_MAP[msg.error_code]
future.set_exception(exc(msg.details))
return
Expand Down
10 changes: 7 additions & 3 deletions music_assistant_client/music.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,16 +347,20 @@ async def get_playlist_tracks(
self,
item_id: str,
provider_instance_id_or_domain: str,
page: int = 0,
force_refresh: bool = False,
) -> list[Track]:
"""Get tracks for given playlist."""
"""
Get all tracks for given playlist.

:param force_refresh: Force a refresh of the (cached) playlist tracks.
"""
return [
Track.from_dict(obj)
for obj in await self.client.send_command(
"music/playlists/playlist_tracks",
item_id=item_id,
provider_instance_id_or_domain=provider_instance_id_or_domain,
page=page,
force_refresh=force_refresh,
)
]

Expand Down