2 changes: 1 addition & 1 deletion eval_protocol/integrations/tinker_rollout_processor.py
@@ -152,7 +152,7 @@ async def process_row(row: EvaluationRow) -> EvaluationRow:
# Update row
new_messages = list(row.messages) + [Message(role="assistant", content=assistant_content)]
row.messages = new_messages
- row.execution_metadata.duration_seconds = time.perf_counter() - start_time
+ row.execution_metadata.rollout_duration_seconds = time.perf_counter() - start_time

# Log usage (approximate since Tinker might not return usage stats in same format)
# We can count tokens ourselves
2 changes: 1 addition & 1 deletion eval_protocol/mcp/execution/manager.py
@@ -150,7 +150,7 @@ async def _execute_with_semaphore(idx):
else:
evaluation_row.rollout_status = Status.rollout_running()

- evaluation_row.execution_metadata.duration_seconds = time.perf_counter() - row_start_time
+ evaluation_row.execution_metadata.rollout_duration_seconds = time.perf_counter() - row_start_time

return evaluation_row

14 changes: 13 additions & 1 deletion eval_protocol/models.py
@@ -809,9 +809,21 @@ class ExecutionMetadata(BaseModel):

cost_metrics: Optional[CostMetrics] = Field(default=None, description="Cost breakdown for LLM API calls.")

+ # deprecated: use rollout_duration_seconds and eval_duration_seconds instead
duration_seconds: Optional[float] = Field(
default=None,
description="Processing duration in seconds for this evaluation row. Note that if it gets retried, this will be the duration of the last attempt.",
deprecated=True,
description="[Deprecated] Processing duration in seconds for this evaluation row. Note that if it gets retried, this will be the duration of the last attempt.",
Collaborator Author: gotcha, thanks!

)

+ rollout_duration_seconds: Optional[float] = Field(
Collaborator: Regarding retries, I think it would still be valuable to track total_duration_seconds so that people can get a sense of the wall-clock time for this row. This can be helpful in the UI as well.

Collaborator: Follow-up PR work, though.

Collaborator Author: Makes sense; I think we should track the number of retries as well. For duration, we should probably still only count the last successful attempt. For failures, I think the failure reason matters more.

+ default=None,
+ description="Processing duration in seconds for the rollout of this evaluation row. Note that if it gets retried, this will be the duration of the last attempt.",
+ )

+ eval_duration_seconds: Optional[float] = Field(
+ default=None,
+ description="Processing duration in seconds for the evaluation of this evaluation row. Note that if it gets retried, this will be the duration of the last attempt.",
+ )

experiment_duration_seconds: Optional[float] = Field(
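With the rename, older serialized rows may only carry the deprecated duration_seconds, while new rows populate rollout_duration_seconds (and eval_duration_seconds for the evaluation phase). A minimal backwards-compatible read could look like the sketch below; it assumes EvaluationRow is importable from eval_protocol.models, and the helper name is illustrative rather than part of this PR.

from typing import Optional

from eval_protocol.models import EvaluationRow


def rollout_wall_time(row: EvaluationRow) -> Optional[float]:
    # Prefer the new field; fall back to the deprecated one for older rows.
    meta = row.execution_metadata
    if meta.rollout_duration_seconds is not None:
        return meta.rollout_duration_seconds
    return meta.duration_seconds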
2 changes: 1 addition & 1 deletion eval_protocol/pytest/default_agent_rollout_processor.py
@@ -267,7 +267,7 @@ async def process_row(row: EvaluationRow) -> EvaluationRow:
total_tokens=agent.usage["total_tokens"],
)

- agent.evaluation_row.execution_metadata.duration_seconds = time.perf_counter() - start_time
+ agent.evaluation_row.execution_metadata.rollout_duration_seconds = time.perf_counter() - start_time

return agent.evaluation_row
finally:
@@ -83,7 +83,7 @@ async def process_row(row: EvaluationRow) -> EvaluationRow:
# total_tokens=usage_info.total_tokens or 0,
# )

- row.execution_metadata.duration_seconds = time.perf_counter() - start_time
+ row.execution_metadata.rollout_duration_seconds = time.perf_counter() - start_time

return row

@@ -180,7 +180,7 @@ async def process_row(row: EvaluationRow) -> EvaluationRow:

row.messages = messages

- row.execution_metadata.duration_seconds = time.perf_counter() - start_time
+ row.execution_metadata.rollout_duration_seconds = time.perf_counter() - start_time

default_logger.log(row)
return row
11 changes: 9 additions & 2 deletions eval_protocol/pytest/evaluation_test_utils.py
@@ -42,7 +42,7 @@


async def run_tasks_with_eval_progress(
- pointwise_tasks: list[asyncio.Task[EvaluationRow]], run_idx: int
+ pointwise_tasks: list[asyncio.Task[EvaluationRow]], run_idx: int, disable_tqdm: bool = False
) -> list[EvaluationRow]:
"""
Run evaluation tasks with a progress bar and proper cancellation handling.
@@ -66,6 +66,7 @@ async def run_tasks_with_eval_progress(
miniters=1,
mininterval=0.1,
bar_format="{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]",
+ disable=disable_tqdm,
) as eval_pbar:

async def task_with_progress(task: asyncio.Task[EvaluationRow]) -> EvaluationRow:
@@ -88,7 +89,10 @@ async def task_with_progress(task: asyncio.Task[EvaluationRow]) -> EvaluationRow


async def run_tasks_with_run_progress(
- execute_run_func: Callable[[int, RolloutProcessorConfig], Any], num_runs: int, config: RolloutProcessorConfig
+ execute_run_func: Callable[[int, RolloutProcessorConfig], Any],
+ num_runs: int,
+ config: RolloutProcessorConfig,
+ disable_tqdm: bool = False,
) -> None:
"""
Run tasks with a parallel runs progress bar, preserving original logic.
@@ -108,6 +112,7 @@ async def run_tasks_with_run_progress(
dynamic_ncols=True,
miniters=1,
bar_format="{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]",
+ disable=disable_tqdm,
) as run_pbar:

async def execute_run_with_progress(run_idx: int, config: RolloutProcessorConfig) -> Any:
@@ -330,6 +335,7 @@ async def rollout_processor_with_retry(
fresh_dataset: list[EvaluationRow],
config: RolloutProcessorConfig,
run_idx: int = 0,
+ disable_tqdm: bool = False,
) -> AsyncGenerator[EvaluationRow, None]:
"""
Wrapper around rollout_processor that handles retry logic using the Python backoff library.
@@ -449,6 +455,7 @@ async def execute_row_with_backoff_and_log(
miniters=1,
mininterval=0.1,
bar_format="{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]",
+ disable=disable_tqdm,
) as rollout_pbar:
# Yield results as they complete
for task in asyncio.as_completed(retry_tasks):
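The disable_tqdm flag threaded through this file is forwarded straight into tqdm's own disable keyword, so passing True suppresses the progress bars entirely (useful in CI logs). A rough sketch of that pattern follows, with illustrative names; only tqdm's disable parameter is taken from the library's real API.

import asyncio

from tqdm import tqdm


async def gather_with_progress(tasks: list, disable_tqdm: bool = False) -> list:
    # Wrap task completion in a tqdm bar that can be turned off wholesale.
    results = []
    with tqdm(total=len(tasks), desc="Rollouts", disable=disable_tqdm) as pbar:
        for completed in asyncio.as_completed(tasks):
            results.append(await completed)
            pbar.update(1)
    return results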
8 changes: 4 additions & 4 deletions eval_protocol/pytest/github_action_rollout_processor.py
@@ -162,15 +162,15 @@ def _list_runs():
row.rollout_status = Status.rollout_error(
f"Failed to find workflow run in GHA with rollout_id {row.execution_metadata.rollout_id}"
)
- row.execution_metadata.duration_seconds = time.perf_counter() - start_time
+ row.execution_metadata.rollout_duration_seconds = time.perf_counter() - start_time
return row

run_id = run.get("id")
if not run_id:
row.rollout_status = Status.rollout_error(
f"Failed to find workflow run in GHA with rollout_id {row.execution_metadata.rollout_id}"
)
- row.execution_metadata.duration_seconds = time.perf_counter() - start_time
+ row.execution_metadata.rollout_duration_seconds = time.perf_counter() - start_time
return row

# Poll the specific run until completion
@@ -194,10 +194,10 @@ def _get_run() -> Dict[str, Any]:
row.rollout_status = Status.rollout_error(
f"GitHub Actions run timed out after {self.timeout_seconds} seconds"
)
- row.execution_metadata.duration_seconds = time.perf_counter() - start_time
+ row.execution_metadata.rollout_duration_seconds = time.perf_counter() - start_time
return row

- row.execution_metadata.duration_seconds = time.perf_counter() - start_time
+ row.execution_metadata.rollout_duration_seconds = time.perf_counter() - start_time

def _update_with_trace() -> None:
return update_row_with_remote_trace(row, self._output_data_loader, self.model_base_url)
6 changes: 3 additions & 3 deletions eval_protocol/pytest/openenv_rollout_processor.py
@@ -411,7 +411,7 @@ async def process_row(row: EvaluationRow) -> EvaluationRow:
completion_tokens=usage["completion_tokens"],
total_tokens=usage["total_tokens"],
)
- row.execution_metadata.duration_seconds = time.perf_counter() - start_time
+ row.execution_metadata.rollout_duration_seconds = time.perf_counter() - start_time

# Attach per-step rewards and accumulated token IDs to
# execution_metadata.extra for downstream integrations
@@ -436,14 +436,14 @@ async def process_row(row: EvaluationRow) -> EvaluationRow:
logger.info("[OpenEnvRolloutProcessor] Total reward: %.3f", total_reward)
logger.info(
"[OpenEnvRolloutProcessor] Duration: %.2fs",
- row.execution_metadata.duration_seconds,
+ row.execution_metadata.rollout_duration_seconds,
)
logger.debug("[OpenEnvRolloutProcessor] Messages collected: %d", len(messages))

logger.info(
f"Rollout complete: {len(step_rewards)} steps, "
f"total_reward={total_reward:.2f}, "
f"duration={row.execution_metadata.duration_seconds:.2f}s"
f"duration={row.execution_metadata.rollout_duration_seconds:.2f}s"
)
# Final log with complete message history
if getattr(config, "logger", None):
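Taken together, the processor changes all follow the same timing convention: capture time.perf_counter() when process_row starts and store the elapsed time on the renamed field when it finishes. A condensed sketch of that convention is below, assuming the same EvaluationRow model as above; the body of the rollout itself is elided.

import time

from eval_protocol.models import EvaluationRow


async def process_row(row: EvaluationRow) -> EvaluationRow:
    start_time = time.perf_counter()
    try:
        ...  # perform the model/environment interaction for this row
        return row
    finally:
        # Record wall time for the rollout phase, matching the renamed field.
        row.execution_metadata.rollout_duration_seconds = time.perf_counter() - start_time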