220 changes: 162 additions & 58 deletions src/arc_agi_benchmarking/adapters/openai_base.py
@@ -28,14 +28,20 @@ def __init__(self, text):
self.role = "assistant"
self.type = "message"

class _ResponsesReasoning:
def __init__(self, summary=None):
self.summary = summary

class _ResponsesResponse:
def __init__(self, model_name, content, usage_data, response_id, finish_reason="stop"):
def __init__(self, model_name, content, usage_data, response_id, finish_reason="stop", reasoning=None):
self.id = response_id or "stream-response"
self.model = model_name
self.object = "response"
self.output = [_ResponsesOutput(content)]
self.output_text = content
self.finish_reason = finish_reason
self.usage = usage_data
self.reasoning = reasoning


class OpenAIBaseAdapter(ProviderAdapter, abc.ABC):
@@ -108,29 +114,18 @@ def _chat_completion_stream(self, messages: List[Dict[str, str]]) -> ChatComplet
content_chunks = []
last_chunk = None
finish_reason = "stop"
chunk_count = 0

for chunk in stream:
last_chunk = chunk
chunk_count += 1


# Extract content from chunk
delta_content = ""
if chunk.choices:
delta_content = chunk.choices[0].delta.content or ""
if delta_content:
content_chunks.append(delta_content)

if chunk.choices and chunk.choices[0].delta.content:
content_chunks.append(chunk.choices[0].delta.content)

# Track finish reason
if chunk.choices and chunk.choices[0].finish_reason:
finish_reason = chunk.choices[0].finish_reason

if chunk_count % 200 == 0 and logger.isEnabledFor(logging.DEBUG):
logger.debug(
f"Streaming progress: {chunk_count} chunks received; "
f"latest chunk chars={len(delta_content)}"
)


# Build final response
final_content = ''.join(content_chunks)

@@ -146,9 +141,7 @@ def _chat_completion_stream(self, messages: List[Dict[str, str]]) -> ChatComplet
total_tokens=0
)

logger.debug(
f"Streaming complete. Chunks received: {chunk_count}, Content length: {len(final_content)}"
)
logger.debug(f"Streaming complete. Content length: {len(final_content)}")

return ChatCompletion(
id=response_id,
@@ -183,6 +176,8 @@ def _responses(self, messages: List[Dict[str, str]]) -> Any:
"""
Make a call to the OpenAI Responses API
"""
# Ensure verbosity is set to 'high' for detailed output
self._ensure_verbosity()

resp = self.client.responses.create(
model=self.model_config.model_name,
@@ -211,7 +206,10 @@ def _responses_stream(self, messages: List[Dict[str, str]]) -> Any:
Make a streaming call to the OpenAI Responses API and return the final response.
"""
logger.debug(f"Starting streaming responses for model: {self.model_config.model_name}")


# Ensure verbosity is set to 'high' for detailed output
self._ensure_verbosity()

# Prepare kwargs for streaming, removing 'stream' to avoid duplication
stream_kwargs = {k: v for k, v in self.model_config.kwargs.items() if k != 'stream'}

@@ -226,55 +224,104 @@ def _responses_stream(self, messages: List[Dict[str, str]]) -> Any:

# Process the stream and collect data
content_chunks = []
reasoning_chunks = []
response_id = None
finish_reason = "stop"
usage_data = None
chunk_count = 0

for chunk in stream:
chunk_count += 1

for chunk in stream:
# Extract response ID
if chunk.type == 'response.created':
response_id = chunk.response.id
# Extract content deltas

# Extract output text deltas
if chunk.type == 'response.output_text.delta':
delta = chunk.delta or ""
if delta:
content_chunks.append(delta)
delta_length = len(delta)
else:
delta_length = 0

content_chunks.append(chunk.delta)

# Extract reasoning deltas
if chunk.type == 'response.reasoning.delta':
reasoning_chunks.append(chunk.delta)

# Track finish reason
if hasattr(chunk, 'finish_reason') and chunk.finish_reason:
finish_reason = chunk.finish_reason
# Extract usage data

# Extract usage data from response object
if hasattr(chunk, 'response') and chunk.response:
usage_data = self._get_usage(chunk.response)

if chunk_count % 10 == 0 and logger.isEnabledFor(logging.DEBUG):
logger.debug(
f"Streaming progress: {chunk_count} chunks received; "
f"last chunk type={getattr(chunk, 'type', 'unknown')}, chars={delta_length}"
)

# Always retrieve final response to get complete output array with reasoning
reasoning_summary = None
if response_id:
try:
final_response = self.client.responses.retrieve(response_id)

# Parse the output array to extract reasoning content
if hasattr(final_response, 'output') and final_response.output:
logger.debug(f"Parsing output array with {len(final_response.output)} items")
for idx, output_item in enumerate(final_response.output):
item_type = getattr(output_item, 'type', 'UNKNOWN')
logger.debug(f"Output item {idx}: type={item_type}")

# Look for reasoning blocks (type: "reasoning")
if hasattr(output_item, 'type') and output_item.type == 'reasoning':
logger.debug(f"Found reasoning block at index {idx}")

# Try different fields for reasoning content
# 1. Try summary first (plain text summary)
if hasattr(output_item, 'summary') and output_item.summary:
reasoning_summary = output_item.summary
logger.debug(f"Got reasoning from 'summary' field, length: {len(reasoning_summary)}")
# 2. Try content (may be None or empty)
elif hasattr(output_item, 'content') and output_item.content:
content = output_item.content
logger.debug(f"Reasoning content type: {type(content)}")

# content can be a string or array of content objects
if isinstance(content, list):
reasoning_texts = [c.text if hasattr(c, 'text') else str(c) for c in content]
reasoning_summary = '\n'.join(reasoning_texts)
elif isinstance(content, str):
reasoning_summary = content
elif hasattr(content, 'text'):
reasoning_summary = content.text
else:
logger.warning(f"Unknown reasoning content structure: {content}")

logger.debug(f"Reasoning summary length: {len(reasoning_summary) if reasoning_summary else 0}")
else:
logger.warning(f"Reasoning block has no content attribute")

# Fallback: use streamed reasoning chunks if output array didn't have it
if not reasoning_summary and reasoning_chunks:
reasoning_summary = ''.join(reasoning_chunks)

# Get usage if we didn't get it during streaming
if usage_data is None:
usage_data = self._get_usage(final_response)

except Exception as e:
logger.warning(f"Failed to retrieve final response for reasoning: {e}")
# Fallback to streamed reasoning chunks
if reasoning_chunks:
reasoning_summary = ''.join(reasoning_chunks)

# Build final response
final_content = ''.join(content_chunks)
response_id = response_id or f"stream-{int(time.time())}"

logger.debug(
f"Streaming complete. Chunks received: {chunk_count}, Content length: {len(final_content)}"
)


logger.debug(f"Streaming complete. Content length: {len(final_content)}, Reasoning captured: {reasoning_summary is not None}")

# Create reasoning object if we captured a summary
reasoning_obj = _ResponsesReasoning(summary=reasoning_summary) if reasoning_summary is not None else None

return _ResponsesResponse(
model_name=self.model_config.model_name,
content=final_content,
usage_data=usage_data,
response_id=response_id,
finish_reason=finish_reason
finish_reason=finish_reason,
reasoning=reasoning_obj
)

except Exception as e:
@@ -352,17 +399,63 @@ def _get_usage(self, response: Any) -> Usage:
)
)

def _get_reasoning_summary(self, response: Any) -> Optional[List[Dict[str, Any]]]:
def _get_reasoning_summary(self, response: Any) -> Optional[str]:
"""
Extract reasoning summary from the response if available (primarily for Responses API).
Extract and normalize reasoning summary from the response if available (Responses API).
"""
reasoning_summary = None
if self.model_config.api_type == APIType.RESPONSES:
# Safely access potential reasoning summary
if hasattr(response, 'reasoning') and response.reasoning and hasattr(response.reasoning, 'summary'):
reasoning_summary = response.reasoning.summary # Will be None if not present
# Chat Completions API does not currently provide a separate summary field
return reasoning_summary
if self.model_config.api_type != APIType.RESPONSES:
return None

summary = None
try:
# Primary location: top-level reasoning object
reasoning_obj = getattr(response, 'reasoning', None)
if reasoning_obj and hasattr(reasoning_obj, 'summary'):
summary = reasoning_obj.summary

# Fallback: some providers nest reasoning on output items
if summary is None and hasattr(response, 'output') and response.output:
for output in response.output:
nested_reasoning = getattr(output, 'reasoning', None)
if nested_reasoning and hasattr(nested_reasoning, 'summary'):
summary = nested_reasoning.summary
break
except Exception as exc:
logger.debug(f"Unable to extract reasoning summary: {exc}", exc_info=True)

return self._coerce_reasoning_summary_to_text(summary)

def _coerce_reasoning_summary_to_text(self, summary: Any) -> Optional[str]:
"""
Normalize reasoning summaries of various shapes (str/list/dict/obj) into a string.
"""
if summary is None:
return None

if isinstance(summary, str):
clean = summary.strip()
return clean or None

if isinstance(summary, list):
parts: List[str] = []
for item in summary:
if isinstance(item, str):
parts.append(item)
elif isinstance(item, dict):
text = item.get("text") or item.get("message") or item.get("content")
if text:
parts.append(str(text))
elif hasattr(item, "text"):
text = getattr(item, "text", None)
if text:
parts.append(str(text))
merged = "\n\n".join([p.strip() for p in parts if p and str(p).strip()])
return merged or None

try:
return str(summary)
except Exception:
return None

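For reference, a minimal usage sketch of the normalization helper above. The input shapes (plain string, list of dicts and strings) simply mirror the branches the helper handles and are assumptions for illustration; `adapter` stands for any configured concrete subclass of `OpenAIBaseAdapter`.

```python
# Hypothetical inputs; `adapter` is assumed to be a concrete OpenAIBaseAdapter subclass.
adapter._coerce_reasoning_summary_to_text(None)                       # -> None
adapter._coerce_reasoning_summary_to_text("  step-by-step plan  ")    # -> "step-by-step plan"
adapter._coerce_reasoning_summary_to_text(
    [{"text": "Inspect the grid."}, "Then apply the rule."]
)  # -> "Inspect the grid.\n\nThen apply the rule."
```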
def _get_content(self, response: Any) -> str:
"""
Expand Down Expand Up @@ -398,7 +491,18 @@ def _normalize_to_responses_kwargs(self):
if "max_tokens" in self.model_config.kwargs:
self.model_config.kwargs["max_output_tokens"] = self.model_config.kwargs.pop("max_tokens")
if "max_completion_tokens" in self.model_config.kwargs:
self.model_config.kwargs["max_output_tokens"] = self.model_config.kwargs.pop("max_completion_tokens")
self.model_config.kwargs["max_output_tokens"] = self.model_config.kwargs.pop("max_completion_tokens")

def _ensure_verbosity(self):
"""
Ensure text.verbosity is set to 'high' for Responses API calls.
This ensures detailed output is returned, especially for reasoning models.
"""
if self.model_config.api_type == APIType.RESPONSES:
if "text" not in self.model_config.kwargs:
self.model_config.kwargs["text"] = {}
if "verbosity" not in self.model_config.kwargs["text"]:
self.model_config.kwargs["text"]["verbosity"] = "high"

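To illustrate the kwargs mutation above, a small sketch of the intended before/after state. The config values are made up, and it assumes the adapter's `api_type` is `APIType.RESPONSES` (otherwise the method is a no-op).

```python
# Hypothetical starting kwargs on a Responses-API model config.
adapter.model_config.kwargs = {"max_output_tokens": 4096}

adapter._ensure_verbosity()
# -> {"max_output_tokens": 4096, "text": {"verbosity": "high"}}

# A verbosity that is already set is left untouched.
adapter.model_config.kwargs["text"]["verbosity"] = "low"
adapter._ensure_verbosity()
# -> {"max_output_tokens": 4096, "text": {"verbosity": "low"}}
```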
def _calculate_cost(self, response: Any) -> Cost:
"""Calculate usage costs, validate token counts, and return a Cost object."""