Skip to content

[agent_farm] (Run ID: codestoryai_sidecar_issue_2073_4f8b6300) #2075

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 126 additions & 0 deletions sidecar/src/agentic/farm/executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
//! Task execution and state management for the agent farm

use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
use tokio::task::JoinHandle;

use super::{Agent, Task, TaskStatus};
use crate::agentic::tool::ToolBox;

pub struct TaskExecutor {
agent: Arc<Mutex<Agent>>,
tool_box: Arc<ToolBox>,
task_rx: mpsc::Receiver<Task>,
state_tx: mpsc::Sender<TaskState>,
}

#[derive(Debug, Clone)]
pub struct TaskState {
pub task_id: String,
pub agent_id: String,
pub status: TaskStatus,
pub result: Option<String>,
}

impl TaskExecutor {
pub fn new(
agent: Arc<Mutex<Agent>>,
tool_box: Arc<ToolBox>,
task_rx: mpsc::Receiver<Task>,
state_tx: mpsc::Sender<TaskState>,
) -> Self {
Self {
agent,
tool_box,
task_rx,
state_tx,
}
}

pub async fn run(mut self) -> JoinHandle<()> {
tokio::spawn(async move {
while let Some(task) = self.task_rx.recv().await {
let agent_id = {
let agent = self.agent.lock().await;
agent.config.id.clone()
};

let mut state = TaskState {
task_id: task.id.clone(),
agent_id,
status: TaskStatus::InProgress,
result: None,
};

// Notify task started
let _ = self.state_tx.send(state.clone()).await;

// Execute task using toolbox
let result = self.execute_task(&task).await;

// Update task state based on result
state.status = match result {
Ok(output) => {
state.result = Some(output);
TaskStatus::Completed
}
Err(e) => TaskStatus::Failed(e),
};

// Notify task completion
let _ = self.state_tx.send(state).await;

// Update agent's task list
if let Ok(mut agent) = self.agent.lock().await {
if let Some(pos) = agent.current_tasks.iter().position(|t| t.id == task.id) {
agent.current_tasks.remove(pos);
}
}
}
})
}

async fn execute_task(&self, task: &Task) -> Result<String, String> {
// Here we would use the toolbox to execute the task based on its requirements
// For now, just simulate task execution
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
Ok(format!("Completed task: {}", task.description))
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::agentic::farm::{AgentConfig, Agent};

#[tokio::test]
async fn test_task_execution() {
let config = AgentConfig {
id: "test_agent".to_string(),
capabilities: vec!["test".to_string()],
max_concurrent_tasks: 1,
};

let (task_tx, task_rx) = mpsc::channel(32);
let agent = Agent {
config,
memory: Default::default(),
current_tasks: Vec::new(),
task_tx,
};

let (state_tx, mut state_rx) = mpsc::channel(32);
let tool_box = Arc::new(ToolBox::default());

let executor = TaskExecutor::new(
Arc::new(Mutex::new(agent)),
tool_box,
task_rx,
state_tx,
);

let _handle = executor.run().await;

// Test will be expanded with actual task execution
}
}
136 changes: 136 additions & 0 deletions sidecar/src/agentic/farm/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
//! Agent Farm implementation for managing multiple agents and their tasks
//! This module provides functionality for:
//! - Managing a pool of agents
//! - Distributing tasks among agents
//! - Handling inter-agent communication
//! - Managing farm resources and state

use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;

use super::memory::base::Memory;
use super::tool;

#[derive(Debug, Clone)]
pub struct AgentConfig {
pub id: String,
pub capabilities: Vec<String>,
pub max_concurrent_tasks: usize,
}

#[derive(Debug)]
pub struct Task {
pub id: String,
pub description: String,
pub requirements: Vec<String>,
pub priority: u8,
pub status: TaskStatus,
}

#[derive(Debug, Clone)]
pub enum TaskStatus {
Pending,
InProgress,
Completed,
Failed(String),
}

pub struct Agent {
pub config: AgentConfig,
pub memory: Memory,
pub current_tasks: Vec<Task>,
pub task_tx: mpsc::Sender<Task>,
}

pub struct AgentFarm {
agents: HashMap<String, Arc<Mutex<Agent>>>,
task_queue: VecDeque<Task>,
max_agents: usize,
}

impl AgentFarm {
pub fn new(max_agents: usize) -> Self {
Self {
agents: HashMap::new(),
task_queue: VecDeque::new(),
max_agents,
}
}

pub fn add_agent(&mut self, config: AgentConfig) -> Result<(), String> {
if self.agents.len() >= self.max_agents {
return Err("Maximum number of agents reached".to_string());
}

let (task_tx, mut task_rx) = mpsc::channel(32);
let agent = Agent {
config: config.clone(),
memory: Memory::default(),
current_tasks: Vec::new(),
task_tx,
};

self.agents.insert(config.id.clone(), Arc::new(Mutex::new(agent)));
Ok(())
}

pub fn submit_task(&mut self, task: Task) {
self.task_queue.push_back(task);
self.distribute_tasks();
}

fn distribute_tasks(&mut self) {
while let Some(task) = self.task_queue.pop_front() {
if let Some(agent) = self.find_suitable_agent(&task) {
if let Ok(mut agent) = agent.lock() {
if agent.current_tasks.len() < agent.config.max_concurrent_tasks {
agent.current_tasks.push(task.clone());
let _ = agent.task_tx.try_send(task);
} else {
self.task_queue.push_front(task);
break;
}
}
} else {
self.task_queue.push_front(task);
break;
}
}
}

fn find_suitable_agent(&self, task: &Task) -> Option<Arc<Mutex<Agent>>> {
self.agents.values()
.find(|agent| {
if let Ok(agent) = agent.lock() {
agent.config.capabilities.iter()
.any(|cap| task.requirements.contains(cap))
&& agent.current_tasks.len() < agent.config.max_concurrent_tasks
} else {
false
}
})
.cloned()
}
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn test_farm_state_management() {
let tool_box = Arc::new(ToolBox::default());
let (mut farm, state_tx) = AgentFarm::new(5, tool_box);

let config = AgentConfig {
id: "agent1".to_string(),
capabilities: vec!["rust".to_string()],
max_concurrent_tasks: 3,
};

assert!(farm.add_agent(config, state_tx).await.is_ok());

// Add more state management tests
}
}
3 changes: 2 additions & 1 deletion sidecar/src/agentic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
//!
//! Nomenclature (cause we keep things professional here, but everyone loves anime and I hate paying tech-debt)
//! agent == mecha
pub mod farm;
pub mod memory;
pub mod swe_bench;
pub mod symbol;
Expand All @@ -37,4 +38,4 @@ pub mod tool;
// - each agent and human can interact with each other (should this also be a tool?)
// - agent and human have a working memory as well
// - End of day is the work togehter on the task and get it done
// - we might need to also lock in resources so human and AI do not override each others work
// - we might need to also lock in resources so human and AI do not override each others work
60 changes: 60 additions & 0 deletions sidecar/src/bin/agent_farm_example.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use std::sync::Arc;
use tokio;

use sidecar::agentic::{
farm::{AgentConfig, AgentFarm, Task, TaskStatus},
tool::ToolBox,
};

#[tokio::main]
async fn main() {
// Initialize toolbox and farm
let tool_box = Arc::new(ToolBox::default());
let (mut farm, state_tx) = AgentFarm::new(5, tool_box);

// Create agents with different capabilities
let rust_agent = AgentConfig {
id: "rust_agent".to_string(),
capabilities: vec!["rust".to_string()],
max_concurrent_tasks: 2,
};

let python_agent = AgentConfig {
id: "python_agent".to_string(),
capabilities: vec!["python".to_string()],
max_concurrent_tasks: 2,
};

// Add agents to farm
farm.add_agent(rust_agent, state_tx.clone()).await.unwrap();
farm.add_agent(python_agent, state_tx).await.unwrap();

// Create some example tasks
let tasks = vec![
Task {
id: "task1".to_string(),
description: "Implement a Rust function".to_string(),
requirements: vec!["rust".to_string()],
priority: 1,
status: TaskStatus::Pending,
},
Task {
id: "task2".to_string(),
description: "Write a Python script".to_string(),
requirements: vec!["python".to_string()],
priority: 1,
status: TaskStatus::Pending,
},
];

// Submit tasks
for task in tasks {
farm.submit_task(task);
}

// Process state updates
loop {
farm.process_state_updates().await;
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}