Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 43 additions & 17 deletions code_puppy/agents/base_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@
UsageLimitExceeded,
UsageLimits,
)

from code_puppy.llm_retry import (
LLMRetryConfig,
RetryExhaustedError,
llm_run_with_retry,
)
from pydantic_ai.durable_exec.dbos import DBOSAgent
from pydantic_ai.messages import (
ModelMessage,
Expand Down Expand Up @@ -1890,6 +1896,7 @@ async def run_agent_task():
)

usage_limits = UsageLimits(request_limit=get_message_limit())
retry_config = LLMRetryConfig()

# Handle MCP servers - add them temporarily when using DBOS
if (
Expand All @@ -1905,35 +1912,44 @@ async def run_agent_task():
try:
# Set the workflow ID for DBOS context so DBOS and Code Puppy ID match
with SetWorkflowID(group_id):
result_ = await pydantic_agent.run(
prompt_payload,
message_history=self.get_message_history(),
usage_limits=usage_limits,
event_stream_handler=event_stream_handler,
**kwargs,
result_ = await llm_run_with_retry(
lambda: pydantic_agent.run(
prompt_payload,
message_history=self.get_message_history(),
usage_limits=usage_limits,
event_stream_handler=event_stream_handler,
**kwargs,
),
config=retry_config,
)
return result_
finally:
# Always restore original toolsets
pydantic_agent._toolsets = original_toolsets
elif get_use_dbos():
with SetWorkflowID(group_id):
result_ = await pydantic_agent.run(
result_ = await llm_run_with_retry(
lambda: pydantic_agent.run(
prompt_payload,
message_history=self.get_message_history(),
usage_limits=usage_limits,
event_stream_handler=event_stream_handler,
**kwargs,
),
config=retry_config,
)
return result_
else:
# Non-DBOS path (MCP servers are already included)
result_ = await llm_run_with_retry(
lambda: pydantic_agent.run(
prompt_payload,
message_history=self.get_message_history(),
usage_limits=usage_limits,
event_stream_handler=event_stream_handler,
**kwargs,
)
return result_
else:
# Non-DBOS path (MCP servers are already included)
result_ = await pydantic_agent.run(
prompt_payload,
message_history=self.get_message_history(),
usage_limits=usage_limits,
event_stream_handler=event_stream_handler,
**kwargs,
),
config=retry_config,
)
return result_
except* UsageLimitExceeded as ule:
Expand All @@ -1942,6 +1958,16 @@ async def run_agent_task():
"The agent has reached its usage limit. You can ask it to continue by saying 'please continue' or similar.",
group_id=group_id,
)
except* RetryExhaustedError as retry_error:
emit_info(
f"API request failed after retries: {str(retry_error)}",
group_id=group_id,
)
emit_info(
"The API may be experiencing high load. Try again in a moment, "
"or switch models with /model.",
group_id=group_id,
)
except* mcp.shared.exceptions.McpError as mcp_error:
emit_info(f"MCP server error: {str(mcp_error)}", group_id=group_id)
emit_info(f"{str(mcp_error)}", group_id=group_id)
Expand Down
4 changes: 4 additions & 0 deletions code_puppy/callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
"register_model_providers",
"message_history_processor_start",
"message_history_processor_end",
"api_retry_start",
"api_retry_end",
]
CallbackFunc = Callable[..., Any]

Expand Down Expand Up @@ -68,6 +70,8 @@
"register_model_providers": [],
"message_history_processor_start": [],
"message_history_processor_end": [],
"api_retry_start": [],
"api_retry_end": [],
}

logger = logging.getLogger(__name__)
Expand Down
Loading
Loading