diff --git a/sidecar/src/agentic/farm/executor.rs b/sidecar/src/agentic/farm/executor.rs new file mode 100644 index 000000000..50da39786 --- /dev/null +++ b/sidecar/src/agentic/farm/executor.rs @@ -0,0 +1,126 @@ +//! Task execution and state management for the agent farm + +use std::sync::Arc; +use tokio::sync::{mpsc, Mutex}; +use tokio::task::JoinHandle; + +use super::{Agent, Task, TaskStatus}; +use crate::agentic::tool::ToolBox; + +pub struct TaskExecutor { + agent: Arc>, + tool_box: Arc, + task_rx: mpsc::Receiver, + state_tx: mpsc::Sender, +} + +#[derive(Debug, Clone)] +pub struct TaskState { + pub task_id: String, + pub agent_id: String, + pub status: TaskStatus, + pub result: Option, +} + +impl TaskExecutor { + pub fn new( + agent: Arc>, + tool_box: Arc, + task_rx: mpsc::Receiver, + state_tx: mpsc::Sender, + ) -> Self { + Self { + agent, + tool_box, + task_rx, + state_tx, + } + } + + pub async fn run(mut self) -> JoinHandle<()> { + tokio::spawn(async move { + while let Some(task) = self.task_rx.recv().await { + let agent_id = { + let agent = self.agent.lock().await; + agent.config.id.clone() + }; + + let mut state = TaskState { + task_id: task.id.clone(), + agent_id, + status: TaskStatus::InProgress, + result: None, + }; + + // Notify task started + let _ = self.state_tx.send(state.clone()).await; + + // Execute task using toolbox + let result = self.execute_task(&task).await; + + // Update task state based on result + state.status = match result { + Ok(output) => { + state.result = Some(output); + TaskStatus::Completed + } + Err(e) => TaskStatus::Failed(e), + }; + + // Notify task completion + let _ = self.state_tx.send(state).await; + + // Update agent's task list + if let Ok(mut agent) = self.agent.lock().await { + if let Some(pos) = agent.current_tasks.iter().position(|t| t.id == task.id) { + agent.current_tasks.remove(pos); + } + } + } + }) + } + + async fn execute_task(&self, task: &Task) -> Result { + // Here we would use the toolbox to execute the task based on its requirements + // For now, just simulate task execution + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + Ok(format!("Completed task: {}", task.description)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::agentic::farm::{AgentConfig, Agent}; + + #[tokio::test] + async fn test_task_execution() { + let config = AgentConfig { + id: "test_agent".to_string(), + capabilities: vec!["test".to_string()], + max_concurrent_tasks: 1, + }; + + let (task_tx, task_rx) = mpsc::channel(32); + let agent = Agent { + config, + memory: Default::default(), + current_tasks: Vec::new(), + task_tx, + }; + + let (state_tx, mut state_rx) = mpsc::channel(32); + let tool_box = Arc::new(ToolBox::default()); + + let executor = TaskExecutor::new( + Arc::new(Mutex::new(agent)), + tool_box, + task_rx, + state_tx, + ); + + let _handle = executor.run().await; + + // Test will be expanded with actual task execution + } +} \ No newline at end of file diff --git a/sidecar/src/agentic/farm/mod.rs b/sidecar/src/agentic/farm/mod.rs new file mode 100644 index 000000000..fd4578f14 --- /dev/null +++ b/sidecar/src/agentic/farm/mod.rs @@ -0,0 +1,136 @@ +//! Agent Farm implementation for managing multiple agents and their tasks +//! This module provides functionality for: +//! - Managing a pool of agents +//! - Distributing tasks among agents +//! - Handling inter-agent communication +//! - Managing farm resources and state + +use std::collections::{HashMap, VecDeque}; +use std::sync::{Arc, Mutex}; +use tokio::sync::mpsc; + +use super::memory::base::Memory; +use super::tool; + +#[derive(Debug, Clone)] +pub struct AgentConfig { + pub id: String, + pub capabilities: Vec, + pub max_concurrent_tasks: usize, +} + +#[derive(Debug)] +pub struct Task { + pub id: String, + pub description: String, + pub requirements: Vec, + pub priority: u8, + pub status: TaskStatus, +} + +#[derive(Debug, Clone)] +pub enum TaskStatus { + Pending, + InProgress, + Completed, + Failed(String), +} + +pub struct Agent { + pub config: AgentConfig, + pub memory: Memory, + pub current_tasks: Vec, + pub task_tx: mpsc::Sender, +} + +pub struct AgentFarm { + agents: HashMap>>, + task_queue: VecDeque, + max_agents: usize, +} + +impl AgentFarm { + pub fn new(max_agents: usize) -> Self { + Self { + agents: HashMap::new(), + task_queue: VecDeque::new(), + max_agents, + } + } + + pub fn add_agent(&mut self, config: AgentConfig) -> Result<(), String> { + if self.agents.len() >= self.max_agents { + return Err("Maximum number of agents reached".to_string()); + } + + let (task_tx, mut task_rx) = mpsc::channel(32); + let agent = Agent { + config: config.clone(), + memory: Memory::default(), + current_tasks: Vec::new(), + task_tx, + }; + + self.agents.insert(config.id.clone(), Arc::new(Mutex::new(agent))); + Ok(()) + } + + pub fn submit_task(&mut self, task: Task) { + self.task_queue.push_back(task); + self.distribute_tasks(); + } + + fn distribute_tasks(&mut self) { + while let Some(task) = self.task_queue.pop_front() { + if let Some(agent) = self.find_suitable_agent(&task) { + if let Ok(mut agent) = agent.lock() { + if agent.current_tasks.len() < agent.config.max_concurrent_tasks { + agent.current_tasks.push(task.clone()); + let _ = agent.task_tx.try_send(task); + } else { + self.task_queue.push_front(task); + break; + } + } + } else { + self.task_queue.push_front(task); + break; + } + } + } + + fn find_suitable_agent(&self, task: &Task) -> Option>> { + self.agents.values() + .find(|agent| { + if let Ok(agent) = agent.lock() { + agent.config.capabilities.iter() + .any(|cap| task.requirements.contains(cap)) + && agent.current_tasks.len() < agent.config.max_concurrent_tasks + } else { + false + } + }) + .cloned() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_farm_state_management() { + let tool_box = Arc::new(ToolBox::default()); + let (mut farm, state_tx) = AgentFarm::new(5, tool_box); + + let config = AgentConfig { + id: "agent1".to_string(), + capabilities: vec!["rust".to_string()], + max_concurrent_tasks: 3, + }; + + assert!(farm.add_agent(config, state_tx).await.is_ok()); + + // Add more state management tests + } +} \ No newline at end of file diff --git a/sidecar/src/agentic/mod.rs b/sidecar/src/agentic/mod.rs index ef319335b..80cd17c47 100644 --- a/sidecar/src/agentic/mod.rs +++ b/sidecar/src/agentic/mod.rs @@ -19,6 +19,7 @@ //! //! Nomenclature (cause we keep things professional here, but everyone loves anime and I hate paying tech-debt) //! agent == mecha +pub mod farm; pub mod memory; pub mod swe_bench; pub mod symbol; @@ -37,4 +38,4 @@ pub mod tool; // - each agent and human can interact with each other (should this also be a tool?) // - agent and human have a working memory as well // - End of day is the work togehter on the task and get it done -// - we might need to also lock in resources so human and AI do not override each others work +// - we might need to also lock in resources so human and AI do not override each others work \ No newline at end of file diff --git a/sidecar/src/bin/agent_farm_example.rs b/sidecar/src/bin/agent_farm_example.rs new file mode 100644 index 000000000..a5cdbb427 --- /dev/null +++ b/sidecar/src/bin/agent_farm_example.rs @@ -0,0 +1,60 @@ +use std::sync::Arc; +use tokio; + +use sidecar::agentic::{ + farm::{AgentConfig, AgentFarm, Task, TaskStatus}, + tool::ToolBox, +}; + +#[tokio::main] +async fn main() { + // Initialize toolbox and farm + let tool_box = Arc::new(ToolBox::default()); + let (mut farm, state_tx) = AgentFarm::new(5, tool_box); + + // Create agents with different capabilities + let rust_agent = AgentConfig { + id: "rust_agent".to_string(), + capabilities: vec!["rust".to_string()], + max_concurrent_tasks: 2, + }; + + let python_agent = AgentConfig { + id: "python_agent".to_string(), + capabilities: vec!["python".to_string()], + max_concurrent_tasks: 2, + }; + + // Add agents to farm + farm.add_agent(rust_agent, state_tx.clone()).await.unwrap(); + farm.add_agent(python_agent, state_tx).await.unwrap(); + + // Create some example tasks + let tasks = vec![ + Task { + id: "task1".to_string(), + description: "Implement a Rust function".to_string(), + requirements: vec!["rust".to_string()], + priority: 1, + status: TaskStatus::Pending, + }, + Task { + id: "task2".to_string(), + description: "Write a Python script".to_string(), + requirements: vec!["python".to_string()], + priority: 1, + status: TaskStatus::Pending, + }, + ]; + + // Submit tasks + for task in tasks { + farm.submit_task(task); + } + + // Process state updates + loop { + farm.process_state_updates().await; + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } +} \ No newline at end of file