-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathrecursive_task.py
More file actions
143 lines (107 loc) Β· 4.17 KB
/
recursive_task.py
File metadata and controls
143 lines (107 loc) Β· 4.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
"""Recursive Task Delegation β Nested agent collaboration.
Demonstrates a scenario where Agent A (Manager) delegates a task to
Agent B (Worker). Agent B then asks Agent A for clarification,
Agent A responds, and Agent B completes the task.
Flow:
1. Manager -> Worker: "Generate a report for project X"
2. Worker -> Manager: "What is the deadline?" (Clarification)
3. Manager -> Worker: "Friday"
4. Worker -> Manager: "Report generated..."
"""
import threading
import time
from agent_utils import get_llm_client
from codomyrmex.agents.core import AgentRequest
from codomyrmex.ide.antigravity.agent_relay import AgentRelay
from codomyrmex.ide.antigravity.live_bridge import ClaudeCodeEndpoint
CHANNEL = "recursive-task"
class TaskAgent:
"""Helper to run a task agent with a specific persona."""
def __init__(self, identity, system_prompt):
self.identity = identity
self.system_prompt = system_prompt
self.client = get_llm_client(identity=identity)
self.endpoint = ClaudeCodeEndpoint(
CHANNEL,
identity=identity,
poll_interval=0.5,
claude_client=self.client,
auto_respond=False, # We handle messages manually to inject system prompt
)
# Register handler
self.endpoint.on_message(self._handle_message)
def start(self):
self.endpoint.start()
def stop(self):
self.endpoint.stop()
def send(self, message):
self.endpoint.send(message)
def _handle_message(self, msg):
print(f"\n[{self.identity.title()}] Received from {msg.sender}: {msg.content}")
# Construct prompt with system instruction
# We wrap the user message with the system context
full_prompt = f"System: {self.system_prompt}\n\nUser: {msg.content}"
request = AgentRequest(prompt=full_prompt, context=msg.metadata)
try:
# We explicitly invoke the client with our constructed request
response = self.client.execute_with_session(request, session=None)
if hasattr(response, "is_success") and response.is_success():
return response.content
if hasattr(response, "error"):
return f"Error: {response.error}"
return str(response.content)
except Exception as e:
return f"Error executing agent: {e}"
def run_manager():
"""Manager Agent (Alice)."""
system_prompt = (
"You are a project manager named Alice. You delegated a task 'Generate a report for project X'. "
"If the worker asks for clarification (like deadline), provide reasonable answers (e.g. 'Friday'). "
"keep your answers short (max 1 sentence)."
"If they complete the task, thank them and end the conversation with 'Good job'."
)
agent = TaskAgent("manager", system_prompt)
agent.start()
# Manager initiates
time.sleep(2)
print("\n[Manager] Delegating task...")
agent.send("Generate a report for project X.")
time.sleep(15)
agent.stop()
def run_worker():
"""Worker Agent (Bob)."""
system_prompt = (
"You are a worker named Bob. You received a task. "
"If the task is missing details (like specifically the deadline), ask 'What is the deadline?'. "
"Once you have the deadline, say 'Report generated' and complete the task. "
"Keep your answers short (max 1 sentence)."
)
agent = TaskAgent("worker", system_prompt)
agent.start()
# Worker runs longer to listen
time.sleep(16)
agent.stop()
def main():
# Auto-injected: Load configuration
from pathlib import Path
import yaml
config_path = (
Path(__file__).resolve().parent.parent.parent
/ "config"
/ "agents"
/ "config.yaml"
)
if config_path.exists():
with open(config_path) as f:
yaml.safe_load(f) or {}
print("Loaded config from config/agents/config.yaml")
AgentRelay(CHANNEL).clear()
t1 = threading.Thread(target=run_manager)
t2 = threading.Thread(target=run_worker)
t1.start()
t2.start()
t1.join()
t2.join()
print("\nRecursive task demo complete.")
if __name__ == "__main__":
main()