@@ -150,7 +150,8 @@ async def handle_agent_response(
         await self.update_message_thread(delta)

         # Remove the agent from the active speakers list.
-        self._active_speakers.remove(message.name)
+        if message.name in self._active_speakers:
+            self._active_speakers.remove(message.name)
         if len(self._active_speakers) > 0:
             # If there are still active speakers, return without doing anything.
             return
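
The guard makes the removal idempotent: `list.remove` raises `ValueError` when the element is absent, which can happen if the same agent's response is processed more than once. A minimal sketch of the pattern outside the runtime (the speaker names are made up for illustration):

    active_speakers = ["writer", "critic"]

    # Unguarded, a second removal of "writer" would raise ValueError.
    # Guarded, the removal is safe to run any number of times.
    for name in ("writer", "writer", "critic"):
        if name in active_speakers:
            active_speakers.remove(name)

    print(active_speakers)  # []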

@@ -707,6 +707,7 @@ async def _process_next(self) -> None:
             _warn_if_none(temp_message, "on_send")
         except BaseException as e:
             future.set_exception(e)
+            self._message_queue.task_done()
             return
         if temp_message is DropMessage or isinstance(temp_message, DropMessage):
             event_logger.info(
@@ -718,6 +719,7 @@ async def _process_next(self) -> None:
                 )
             )
             future.set_exception(MessageDroppedException())
+            self._message_queue.task_done()
             return

         message_envelope.message = temp_message
@@ -747,6 +749,7 @@ async def _process_next(self) -> None:
         except BaseException as e:
             # TODO: we should raise the intervention exception to the publisher.
             logger.error(f"Exception raised in in intervention handler: {e}", exc_info=True)
+            self._message_queue.task_done()
             return
         if temp_message is DropMessage or isinstance(temp_message, DropMessage):
             event_logger.info(
@@ -757,6 +760,7 @@ async def _process_next(self) -> None:
                     kind=MessageKind.PUBLISH,
                 )
             )
+            self._message_queue.task_done()
             return

         message_envelope.message = temp_message
@@ -773,6 +777,7 @@ async def _process_next(self) -> None:
         except BaseException as e:
             # TODO: should we raise the exception to sender of the response instead?
             future.set_exception(e)
+            self._message_queue.task_done()
             return
         if temp_message is DropMessage or isinstance(temp_message, DropMessage):
             event_logger.info(
@@ -784,6 +789,7 @@ async def _process_next(self) -> None:
                 )
             )
             future.set_exception(MessageDroppedException())
+            self._message_queue.task_done()
             return
         message_envelope.message = temp_message
         task = asyncio.create_task(self._process_response(message_envelope))
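
Every one of these early returns consumes an item from `self._message_queue` via `get()` without marking it done. With `asyncio.Queue`, `join()` unblocks only after `task_done()` has been called once per dequeued item, so before this change a dropped message or a failing intervention handler could leave any idle-wait built on `join()` hanging forever. A self-contained sketch of that contract, assuming nothing about the runtime beyond its use of `asyncio.Queue` (the names here are illustrative):

    import asyncio

    async def main() -> None:
        queue: asyncio.Queue[str] = asyncio.Queue()
        await queue.put("dropped message")

        async def consumer() -> None:
            item = await queue.get()
            try:
                if item == "dropped message":
                    return  # early exit, mirroring the returns in _process_next
            finally:
                queue.task_done()  # must run on every path, or join() hangs

        asyncio.create_task(consumer())
        await queue.join()  # unblocks only after one task_done() per get()
        print("queue drained")

    asyncio.run(main())

A try/finally around the handler logic, as sketched here, would be an alternative to calling `task_done()` before each return; the patch's approach keeps the change minimal and local to the affected paths.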

@@ -58,21 +58,25 @@ async def get_messages(self) -> List[LLMMessage]:
         """Get at most `token_limit` tokens in recent messages. If the token limit is not
         provided, then return as many messages as the remaining token allowed by the model client."""
         messages = list(self._messages)
+        trimmed = False
         if self._token_limit is None:
             remaining_tokens = self._model_client.remaining_tokens(messages, tools=self._tool_schema)
             while remaining_tokens < 0 and len(messages) > 0:
                 middle_index = len(messages) // 2
                 messages.pop(middle_index)
                 remaining_tokens = self._model_client.remaining_tokens(messages, tools=self._tool_schema)
+                trimmed = True
         else:
             token_count = self._model_client.count_tokens(messages, tools=self._tool_schema)
             while token_count > self._token_limit and len(messages) > 0:
                 middle_index = len(messages) // 2
                 messages.pop(middle_index)
                 token_count = self._model_client.count_tokens(messages, tools=self._tool_schema)
-        if messages and isinstance(messages[0], FunctionExecutionResultMessage):
-            # Handle the first message is a function call result message.
-            # Remove the first message from the list.
+                trimmed = True
+        if trimmed and messages and isinstance(messages[0], FunctionExecutionResultMessage):
+            # Only remove the function result if trimming actually occurred.
+            # Function call results are critical for tool-use flows and must be
+            # preserved when the context fits within the token limit.
             messages = messages[1:]
         return messages

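
The `trimmed` flag fixes an unconditional drop: previously, a `FunctionExecutionResultMessage` at the head of the history was removed even when no trimming happened, discarding a tool result that would have fit within the limit. A rough sketch of the new decision, with plain strings standing in for the real message types and a toy token count invented for illustration:

    def get_messages(history: list[str], token_limit: int, cost: int = 10) -> list[str]:
        messages = list(history)
        trimmed = False
        # Trim from the middle until the (toy) token count fits, as the real code does.
        while len(messages) * cost > token_limit and messages:
            messages.pop(len(messages) // 2)
            trimmed = True
        # Drop a leading function result only if trimming may have orphaned it.
        if trimmed and messages and messages[0] == "func_result":
            messages = messages[1:]
        return messages

    history = ["func_result", "user", "assistant"]
    print(get_messages(history, token_limit=30))  # fits: nothing removed
    print(get_messages(history, token_limit=20))  # trimmed, then leading result dropped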