I need to add a new agent_run_result lifecycle hook that fires after each pydantic_agent.run() call, allowing plugins to inspect the result and request a transparent retry. This enables plugins to handle things like content-filter false positives, safety checks, or custom validation without touching core.
Please make the following changes:
- code_puppy/callbacks.py
Add "agent_run_result" to the PhaseType Literal and the _callbacks dict (alongside the existing agent_run_start / agent_run_end entries).
Then add this trigger function after on_agent_run_end:
async def on_agent_run_result(
result: Any,
agent_name: str,
model_name: str,
) -> List[Any]:
"""Trigger callbacks after an agent run returns a result.
Fires after ``pydantic_agent.run()`` completes successfully, **before**
the result is handed back to the caller. Plugins can inspect the result
and request an automatic retry (e.g. when an upstream content-filter
produced a false-positive refusal).
Callback signature::
async def my_callback(result, agent_name: str, model_name: str)
-> dict | None
To request a retry, return a dict with::
{
"retry": True,
"prompt": "<message to send on retry>",
"delay": 1.0, # optional, seconds before retry
}
Return ``None`` (or omit a return) to let the result pass through.
The first callback that returns a retry request wins; the agent
replays at most a small fixed number of times to prevent runaway loops.
Args:
result: The ``RunResult`` returned by ``pydantic_agent.run()``.
agent_name: Name of the agent that produced the result.
model_name: Name of the model that was used.
Returns:
List of results from registered callbacks.
"""
return await _trigger_callbacks(
"agent_run_result", result, agent_name, model_name
)
- code_puppy/agents/base_agent.py
Add on_agent_run_result to the existing import from code_puppy.callbacks:
from code_puppy.callbacks import (
on_agent_run_end,
on_agent_run_result, # <-- add this
on_agent_run_start,
)
Inside run_with_mcp, add this new nested function after _run_with_streaming_retry and before run_agent_task:
async def _run_with_result_hooks(run_coro_factory):
"""Run agent, then let plugin hooks inspect/retry the result.
After a successful ``pydantic_agent.run()``, fires the
``agent_run_result`` hook. If any callback returns
``{"retry": True, "prompt": "..."}`` the agent re-runs with
that prompt (message history is carried forward so the model
sees context). Retries are capped to prevent runaway loops.
"""
_MAX_HOOK_RETRIES = 3
result_ = await _run_with_streaming_retry(run_coro_factory)
for _ in range(_MAX_HOOK_RETRIES):
hook_results = await on_agent_run_result(
result_,
agent_name=self.name,
model_name=self.get_model_name(),
)
retry_req = next(
(
r
for r in hook_results
if isinstance(r, dict) and r.get("retry")
),
None,
)
if not retry_req:
break
# A plugin asked us to replay the turn.
retry_prompt = retry_req.get("prompt", "Please continue.")
retry_delay = retry_req.get("delay", 1.0)
if hasattr(result_, "all_messages"):
self.set_message_history(list(result_.all_messages()))
await asyncio.sleep(retry_delay)
result_ = await _run_with_streaming_retry(
lambda _p=retry_prompt: pydantic_agent.run(
_p,
message_history=self.get_message_history(),
usage_limits=usage_limits,
event_stream_handler=stream_handler,
**kwargs,
)
)
return result_
Then in run_agent_task, replace the three existing _run_with_streaming_retry(...) calls (the DBOS+MCP path, the DBOS-only path, and the non-DBOS path) with _run_with_result_hooks(...) — same arguments, just the wrapper name changes. The lambda factories stay identical.
I need to add a new agent_run_result lifecycle hook that fires after each pydantic_agent.run() call, allowing plugins to inspect the result and request a transparent retry. This enables plugins to handle things like content-filter false positives, safety checks, or custom validation without touching core.
Please make the following changes:
Add "agent_run_result" to the PhaseType Literal and the _callbacks dict (alongside the existing agent_run_start / agent_run_end entries).
Then add this trigger function after on_agent_run_end:
Add on_agent_run_result to the existing import from code_puppy.callbacks:
Inside run_with_mcp, add this new nested function after _run_with_streaming_retry and before run_agent_task:
Then in run_agent_task, replace the three existing _run_with_streaming_retry(...) calls (the DBOS+MCP path, the DBOS-only path, and the non-DBOS path) with _run_with_result_hooks(...) — same arguments, just the wrapper name changes. The lambda factories stay identical.