diff --git a/src/conversation.rs b/src/conversation.rs index 22c569a..8ce7fab 100644 --- a/src/conversation.rs +++ b/src/conversation.rs @@ -27,7 +27,8 @@ use codex_core::{ openai_models::models_manager::ModelsManager, protocol::{ AgentMessageContentDeltaEvent, AgentMessageEvent, AgentReasoningEvent, - AgentReasoningSectionBreakEvent, ApplyPatchApprovalRequestEvent, ElicitationAction, + AgentReasoningRawContentEvent, AgentReasoningSectionBreakEvent, + ApplyPatchApprovalRequestEvent, ElicitationAction, ErrorEvent, Event, EventMsg, ExecApprovalRequestEvent, ExecCommandBeginEvent, ExecCommandEndEvent, ExecCommandOutputDeltaEvent, ExitedReviewModeEvent, FileChange, ItemCompletedEvent, ItemStartedEvent, ListCustomPromptsResponseEvent, McpInvocation, @@ -64,6 +65,37 @@ use crate::{ static APPROVAL_PRESETS: LazyLock> = LazyLock::new(builtin_approval_presets); const INIT_COMMAND_PROMPT: &str = include_str!("./prompt_for_init_command.md"); +fn codex_reasoning_meta( + kind: &str, + item_id: Option<&str>, + summary_index: Option, + content_index: Option, + section_break: bool, + source: Option<&str>, +) -> Meta { + let mut codex_meta = serde_json::Map::new(); + codex_meta.insert("reasoning_kind".to_string(), json!(kind)); + if let Some(item_id) = item_id { + codex_meta.insert("item_id".to_string(), json!(item_id)); + } + if let Some(summary_index) = summary_index { + codex_meta.insert("summary_index".to_string(), json!(summary_index)); + } + if let Some(content_index) = content_index { + codex_meta.insert("content_index".to_string(), json!(content_index)); + } + if section_break { + codex_meta.insert("section_break".to_string(), json!(true)); + } + if let Some(source) = source { + codex_meta.insert("source".to_string(), json!(source)); + } + Meta::from_iter([( + "codex".to_string(), + serde_json::Value::Object(codex_meta), + )]) +} + /// Trait for abstracting over the `CodexConversation` to make testing easier. #[async_trait::async_trait] pub trait CodexConversationImpl { @@ -305,11 +337,13 @@ struct PromptState { submission_id: String, seen_message_deltas: bool, seen_reasoning_deltas: bool, + show_raw_agent_reasoning: bool, } impl PromptState { fn new( conversation: Arc, + show_raw_agent_reasoning: bool, response_tx: oneshot::Sender>, submission_id: String, ) -> Self { @@ -322,6 +356,7 @@ impl PromptState { submission_id, seen_message_deltas: false, seen_reasoning_deltas: false, + show_raw_agent_reasoning, } } @@ -385,17 +420,69 @@ impl PromptState { self.seen_message_deltas = true; client.send_agent_text(delta).await; } - EventMsg::ReasoningContentDelta(ReasoningContentDeltaEvent { thread_id, turn_id, item_id, delta, summary_index: index }) - | EventMsg::ReasoningRawContentDelta(ReasoningRawContentDeltaEvent { thread_id, turn_id, item_id, delta, content_index: index }) => { + EventMsg::ReasoningContentDelta(ReasoningContentDeltaEvent { + thread_id, + turn_id, + item_id, + delta, + summary_index: index, + }) => { info!("Agent reasoning content delta received: thread_id: {thread_id}, turn_id: {turn_id}, item_id: {item_id}, index: {index}, delta: {delta:?}"); self.seen_reasoning_deltas = true; - client.send_agent_thought(delta).await; + client + .send_agent_thought_with_meta( + delta, + Some(codex_reasoning_meta( + "summary", + Some(item_id.as_str()), + Some(index), + None, + false, + None, + )), + ) + .await; + } + EventMsg::ReasoningRawContentDelta(ReasoningRawContentDeltaEvent { + thread_id, + turn_id, + item_id, + delta, + content_index: index, + }) => { + info!("Agent reasoning content delta received: thread_id: {thread_id}, turn_id: {turn_id}, item_id: {item_id}, index: {index}, delta: {delta:?}"); + self.seen_reasoning_deltas = true; + client + .send_agent_thought_with_meta( + delta, + Some(codex_reasoning_meta( + "raw", + Some(item_id.as_str()), + None, + Some(index), + false, + None, + )), + ) + .await; } EventMsg::AgentReasoningSectionBreak(AgentReasoningSectionBreakEvent { item_id, summary_index}) => { info!("Agent reasoning section break received: item_id: {item_id}, index: {summary_index}"); // Make sure the section heading actually get spacing self.seen_reasoning_deltas = true; - client.send_agent_thought("\n\n").await; + client + .send_agent_thought_with_meta( + "\n\n", + Some(codex_reasoning_meta( + "summary", + Some(item_id.as_str()), + Some(summary_index), + None, + true, + None, + )), + ) + .await; } EventMsg::AgentMessage(AgentMessageEvent { message }) => { info!("Agent message (non-delta) received: {message:?}"); @@ -408,7 +495,37 @@ impl PromptState { info!("Agent reasoning (non-delta) received: {text:?}"); // We didn't receive this message via streaming if !std::mem::take(&mut self.seen_reasoning_deltas) { - client.send_agent_thought(text).await; + client + .send_agent_thought_with_meta( + text, + Some(codex_reasoning_meta( + "summary", + None, + None, + None, + false, + Some("non_delta"), + )), + ) + .await; + } + } + EventMsg::AgentReasoningRawContent(AgentReasoningRawContentEvent { text }) => { + info!("Agent reasoning raw content (non-delta) received: {text:?}"); + if !std::mem::take(&mut self.seen_reasoning_deltas) { + client + .send_agent_thought_with_meta( + text, + Some(codex_reasoning_meta( + "raw", + None, + None, + None, + false, + Some("non_delta_raw"), + )), + ) + .await; } } EventMsg::PlanUpdate(UpdatePlanArgs { explanation, plan }) => { @@ -576,7 +693,6 @@ impl PromptState { } // Ignore these events - EventMsg::AgentReasoningRawContent(..) // In the future we can use this to update usage stats | EventMsg::TokenCount(..) // we already have a way to diff the turn, so ignore @@ -1315,12 +1431,17 @@ fn parse_command_tool_call(parsed_cmd: Vec, cwd: &Path) -> ParseC struct TaskState { response_tx: Option>>, + show_raw_agent_reasoning: bool, } impl TaskState { - fn new(response_tx: oneshot::Sender>) -> Self { + fn new( + show_raw_agent_reasoning: bool, + response_tx: oneshot::Sender>, + ) -> Self { Self { response_tx: Some(response_tx), + show_raw_agent_reasoning, } } @@ -1341,7 +1462,24 @@ impl TaskState { client.send_agent_text(message).await; } EventMsg::AgentReasoning(AgentReasoningEvent { text }) => { - client.send_agent_thought(text).await; + let kind = if self.show_raw_agent_reasoning { + "raw" + } else { + "summary" + }; + client + .send_agent_thought_with_meta( + text, + Some(codex_reasoning_meta( + kind, + None, + None, + None, + false, + Some("non_delta"), + )), + ) + .await; } EventMsg::UndoStarted(event) => { client @@ -1516,10 +1654,10 @@ impl SessionClient { .await; } - async fn send_agent_thought(&self, text: impl Into) { - self.send_notification(SessionUpdate::AgentThoughtChunk(ContentChunk::new( - text.into().into(), - ))) + async fn send_agent_thought_with_meta(&self, text: impl Into, meta: Option) { + self.send_notification(SessionUpdate::AgentThoughtChunk( + ContentChunk::new(text.into().into()).meta(meta), + )) .await; } @@ -1918,9 +2056,13 @@ impl ConversationActor { info!("Starting to wait for conversation events for submission_id: {submission_id}"); let state = match op { - Op::Compact | Op::Undo => SubmissionState::Task(TaskState::new(response_tx)), + Op::Compact | Op::Undo => SubmissionState::Task(TaskState::new( + self.config.show_raw_agent_reasoning, + response_tx, + )), _ => SubmissionState::Prompt(PromptState::new( self.conversation.clone(), + self.config.show_raw_agent_reasoning, response_tx, submission_id.clone(), )), @@ -2715,6 +2857,197 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_reasoning_meta_streams() -> anyhow::Result<()> { + let (session_id, client, _, message_tx, local_set) = setup(vec![]).await?; + let (prompt_response_tx, prompt_response_rx) = tokio::sync::oneshot::channel(); + + message_tx.send(ConversationMessage::Prompt { + request: PromptRequest::new(session_id.clone(), vec!["reasoning meta".into()]), + response_tx: prompt_response_tx, + })?; + + tokio::try_join!( + async { + let stop_reason = prompt_response_rx.await??.await??; + assert_eq!(stop_reason, StopReason::EndTurn); + drop(message_tx); + anyhow::Ok(()) + }, + async { + local_set.await; + anyhow::Ok(()) + } + )?; + + let notifications = client.notifications.lock().unwrap(); + assert_eq!(notifications.len(), 3, "notifications don't match {notifications:?}"); + + let summary_meta = match ¬ifications[0].update { + SessionUpdate::AgentThoughtChunk(chunk) => chunk.meta.as_ref(), + _ => None, + } + .expect("missing summary meta"); + let summary_codex = summary_meta + .get("codex") + .and_then(|value| value.as_object()) + .expect("missing summary codex meta"); + assert_eq!( + summary_codex.get("reasoning_kind"), + Some(&serde_json::json!("summary")) + ); + assert_eq!( + summary_codex.get("summary_index"), + Some(&serde_json::json!(2)) + ); + let summary_item_id = summary_codex + .get("item_id") + .and_then(|value| value.as_str()) + .unwrap_or_default(); + assert!(summary_item_id.starts_with("item-")); + + let raw_meta = match ¬ifications[1].update { + SessionUpdate::AgentThoughtChunk(chunk) => chunk.meta.as_ref(), + _ => None, + } + .expect("missing raw meta"); + let raw_codex = raw_meta + .get("codex") + .and_then(|value| value.as_object()) + .expect("missing raw codex meta"); + assert_eq!( + raw_codex.get("reasoning_kind"), + Some(&serde_json::json!("raw")) + ); + assert_eq!( + raw_codex.get("content_index"), + Some(&serde_json::json!(7)) + ); + let raw_item_id = raw_codex + .get("item_id") + .and_then(|value| value.as_str()) + .unwrap_or_default(); + assert!(raw_item_id.starts_with("item-")); + + let break_meta = match ¬ifications[2].update { + SessionUpdate::AgentThoughtChunk(chunk) => chunk.meta.as_ref(), + _ => None, + } + .expect("missing section break meta"); + let break_codex = break_meta + .get("codex") + .and_then(|value| value.as_object()) + .expect("missing section break codex meta"); + assert_eq!( + break_codex.get("reasoning_kind"), + Some(&serde_json::json!("summary")) + ); + assert_eq!( + break_codex.get("summary_index"), + Some(&serde_json::json!(3)) + ); + assert_eq!( + break_codex.get("section_break"), + Some(&serde_json::json!(true)) + ); + + Ok(()) + } + + #[tokio::test] + async fn test_reasoning_meta_non_delta() -> anyhow::Result<()> { + let (session_id, client, _, message_tx, local_set) = setup(vec![]).await?; + let (prompt_response_tx, prompt_response_rx) = tokio::sync::oneshot::channel(); + + message_tx.send(ConversationMessage::Prompt { + request: PromptRequest::new(session_id.clone(), vec!["reasoning nondelta".into()]), + response_tx: prompt_response_tx, + })?; + + tokio::try_join!( + async { + let stop_reason = prompt_response_rx.await??.await??; + assert_eq!(stop_reason, StopReason::EndTurn); + drop(message_tx); + anyhow::Ok(()) + }, + async { + local_set.await; + anyhow::Ok(()) + } + )?; + + let notifications = client.notifications.lock().unwrap(); + assert_eq!(notifications.len(), 1, "notifications don't match {notifications:?}"); + + let meta = match ¬ifications[0].update { + SessionUpdate::AgentThoughtChunk(chunk) => chunk.meta.as_ref(), + _ => None, + } + .expect("missing meta"); + let codex = meta + .get("codex") + .and_then(|value| value.as_object()) + .expect("missing codex meta"); + assert_eq!( + codex.get("reasoning_kind"), + Some(&serde_json::json!("summary")) + ); + assert_eq!( + codex.get("source"), + Some(&serde_json::json!("non_delta")) + ); + + Ok(()) + } + + #[tokio::test] + async fn test_reasoning_meta_raw_non_delta() -> anyhow::Result<()> { + let (session_id, client, _, message_tx, local_set) = setup(vec![]).await?; + let (prompt_response_tx, prompt_response_rx) = tokio::sync::oneshot::channel(); + + message_tx.send(ConversationMessage::Prompt { + request: PromptRequest::new(session_id.clone(), vec!["reasoning raw nondelta".into()]), + response_tx: prompt_response_tx, + })?; + + tokio::try_join!( + async { + let stop_reason = prompt_response_rx.await??.await??; + assert_eq!(stop_reason, StopReason::EndTurn); + drop(message_tx); + anyhow::Ok(()) + }, + async { + local_set.await; + anyhow::Ok(()) + } + )?; + + let notifications = client.notifications.lock().unwrap(); + assert_eq!(notifications.len(), 1, "notifications don't match {notifications:?}"); + + let meta = match ¬ifications[0].update { + SessionUpdate::AgentThoughtChunk(chunk) => chunk.meta.as_ref(), + _ => None, + } + .expect("missing meta"); + let codex = meta + .get("codex") + .and_then(|value| value.as_object()) + .expect("missing codex meta"); + assert_eq!( + codex.get("reasoning_kind"), + Some(&serde_json::json!("raw")) + ); + assert_eq!( + codex.get("source"), + Some(&serde_json::json!("non_delta_raw")) + ); + + Ok(()) + } + async fn setup( custom_prompts: Vec, ) -> anyhow::Result<( @@ -2807,6 +3140,98 @@ mod tests { }) .join("\n"); + if prompt == "reasoning meta" { + let item_id = format!("item-{id}"); + self.op_tx + .send(Event { + id: id.to_string(), + msg: EventMsg::ReasoningContentDelta( + ReasoningContentDeltaEvent { + thread_id: id.to_string(), + turn_id: id.to_string(), + item_id: item_id.clone(), + delta: "summary delta".to_string(), + summary_index: 2, + }, + ), + }) + .unwrap(); + self.op_tx + .send(Event { + id: id.to_string(), + msg: EventMsg::ReasoningRawContentDelta( + ReasoningRawContentDeltaEvent { + thread_id: id.to_string(), + turn_id: id.to_string(), + item_id: item_id.clone(), + delta: "raw delta".to_string(), + content_index: 7, + }, + ), + }) + .unwrap(); + self.op_tx + .send(Event { + id: id.to_string(), + msg: EventMsg::AgentReasoningSectionBreak( + AgentReasoningSectionBreakEvent { + item_id, + summary_index: 3, + }, + ), + }) + .unwrap(); + self.op_tx + .send(Event { + id: id.to_string(), + msg: EventMsg::TaskComplete(TaskCompleteEvent { + last_agent_message: None, + }), + }) + .unwrap(); + return Ok(id.to_string()); + } + + if prompt == "reasoning nondelta" { + self.op_tx + .send(Event { + id: id.to_string(), + msg: EventMsg::AgentReasoning(AgentReasoningEvent { + text: "non-delta reasoning".to_string(), + }), + }) + .unwrap(); + self.op_tx + .send(Event { + id: id.to_string(), + msg: EventMsg::TaskComplete(TaskCompleteEvent { + last_agent_message: None, + }), + }) + .unwrap(); + return Ok(id.to_string()); + } + + if prompt == "reasoning raw nondelta" { + self.op_tx + .send(Event { + id: id.to_string(), + msg: EventMsg::AgentReasoningRawContent(AgentReasoningRawContentEvent { + text: "non-delta raw reasoning".to_string(), + }), + }) + .unwrap(); + self.op_tx + .send(Event { + id: id.to_string(), + msg: EventMsg::TaskComplete(TaskCompleteEvent { + last_agent_message: None, + }), + }) + .unwrap(); + return Ok(id.to_string()); + } + self.op_tx .send(Event { id: id.to_string(),