diff --git a/agentops/instrumentation/__init__.py b/agentops/instrumentation/__init__.py index e4e83d8fb..e9c8f2459 100644 --- a/agentops/instrumentation/__init__.py +++ b/agentops/instrumentation/__init__.py @@ -332,7 +332,7 @@ def _perform_instrumentation(package_name: str): _has_agentic_library = True # Special case: If mem0 is instrumented, also instrument concurrent.futures - if package_name == "mem0" and is_newly_added: + if (package_name == "mem0" or package_name == "autogen") and is_newly_added: try: # Check if concurrent.futures module is available diff --git a/agentops/instrumentation/agentic/ag2/instrumentor.py b/agentops/instrumentation/agentic/ag2/instrumentor.py index ebb9bf594..983c9fb20 100644 --- a/agentops/instrumentation/agentic/ag2/instrumentor.py +++ b/agentops/instrumentation/agentic/ag2/instrumentor.py @@ -11,7 +11,9 @@ from opentelemetry.trace import SpanKind from opentelemetry.metrics import Meter from opentelemetry.instrumentation.utils import unwrap as otel_unwrap - +import contextvars +import threading +from opentelemetry import context as otel_context from agentops.logging import logger from agentops.instrumentation.common import ( CommonInstrumentor, @@ -23,6 +25,7 @@ from agentops.instrumentation.agentic.ag2 import LIBRARY_NAME, LIBRARY_VERSION from agentops.semconv.message import MessageAttributes from agentops.semconv.span_attributes import SpanAttributes +from agentops.semconv.span_kinds import AgentOpsSpanKindValues from agentops.semconv.agent import AgentAttributes from agentops.semconv.workflow import WorkflowAttributes from agentops.semconv.tool import ToolAttributes @@ -52,16 +55,30 @@ def _create_metrics(self, meter: Meter) -> Dict[str, Any]: return StandardMetrics.create_standard_metrics(meter) def _initialize(self, **kwargs): - """Initialize attribute manager.""" + """Initialize attribute manager and AG2-specific concurrent.futures instrumentation.""" self._attribute_manager = SpanAttributeManager(service_name="agentops", 
deployment_environment="production") def _custom_wrap(self, **kwargs): """Perform custom wrapping for AG2 methods.""" + methods_to_wrap = [ ("autogen.agentchat.conversable_agent", "ConversableAgent.__init__", self._agent_init_wrapper), - ("autogen.agentchat.conversable_agent", "ConversableAgent.run", self._agent_run_wrapper), + ("autogen.agentchat.conversable_agent", "ConversableAgent.run", self._agent_run_wrapper_with_context), ("autogen.agentchat.conversable_agent", "ConversableAgent.initiate_chat", self._initiate_chat_wrapper), + ( + "autogen.agentchat.conversable_agent", + "ConversableAgent.a_initiate_chat", + self._async_initiate_chat_wrapper, + ), + ( + "autogen.agentchat.conversable_agent", + "ConversableAgent._generate_oai_reply_from_client", + self._generate_oai_reply_from_client_wrapper, + ), + ("autogen.agentchat.conversable_agent", "ConversableAgent.receive", self._receive_wrapper), + ("autogen.agentchat.conversable_agent", "ConversableAgent.a_receive", self._async_receive_wrapper), ("autogen.agentchat.groupchat", "GroupChatManager.run_chat", self._group_chat_run_wrapper), + ("autogen.agentchat.groupchat", "GroupChatManager.a_run_chat", self._async_group_chat_run_wrapper), ( "autogen.agentchat.conversable_agent", "ConversableAgent.execute_function", @@ -78,7 +95,6 @@ def _custom_wrap(self, **kwargs): for module, method, wrapper_factory in methods_to_wrap: try: wrap_function_wrapper(module, method, wrapper_factory(self._tracer)) - logger.debug(f"Successfully wrapped {method}") except (AttributeError, ModuleNotFoundError) as e: logger.debug(f"Failed to wrap {method}: {e}") @@ -89,7 +105,12 @@ def _custom_unwrap(self, **kwargs): ("autogen.agentchat.conversable_agent", "ConversableAgent.__init__"), ("autogen.agentchat.conversable_agent", "ConversableAgent.run"), ("autogen.agentchat.conversable_agent", "ConversableAgent.initiate_chat"), + ("autogen.agentchat.conversable_agent", "ConversableAgent.a_initiate_chat"), + ("autogen.agentchat.conversable_agent", 
"ConversableAgent._generate_oai_reply_from_client"), + ("autogen.agentchat.conversable_agent", "ConversableAgent.receive"), + ("autogen.agentchat.conversable_agent", "ConversableAgent.a_receive"), ("autogen.agentchat.groupchat", "GroupChatManager.run_chat"), + ("autogen.agentchat.groupchat", "GroupChatManager.a_run_chat"), ("autogen.agentchat.conversable_agent", "ConversableAgent.execute_function"), ("autogen.agentchat.conversable_agent", "ConversableAgent.run_code"), ("autogen.agentchat.groupchat", "GroupChat.select_speaker"), @@ -135,12 +156,132 @@ def wrapper(wrapped, instance, args, kwargs): instance._agentops_metadata = {"name": name, "type": "ConversableAgent", "model": model} return result - except Exception as e: - logger.error(f"Error in agent init instrumentation: {e}") + except Exception: return wrapped(*args, **kwargs) return wrapper + def _generate_oai_reply_from_client_wrapper(self, tracer): + """Wrapper for capturing _generate_oai_reply_from_client method calls with token metrics.""" + + def wrapper(wrapped, instance, args, kwargs): + agent_name = getattr(instance, "name", "unnamed_agent") + + # Get model name from llm_client for span naming + llm_client = args[0] if args else kwargs.get("llm_client") + + # Extract model from _config_list + model_name = "unknown" + if hasattr(llm_client, "_config_list") and llm_client._config_list: + if isinstance(llm_client._config_list, list) and len(llm_client._config_list) > 0: + config = llm_client._config_list[0] + if isinstance(config, dict) and "model" in config: + model_name = config["model"] + + span_name = f"{model_name}.llm" + + with create_span( + tracer, span_name, kind=SpanKind.CLIENT, attribute_manager=self._attribute_manager + ) as span: + # Set span kind for actual LLM client call + span.set_attribute(SpanAttributes.AGENTOPS_SPAN_KIND, AgentOpsSpanKindValues.LLM.value) + span.set_attribute(AgentAttributes.AGENT_NAME, agent_name) + span.set_attribute(SpanAttributes.AGENTOPS_ENTITY_NAME, "llm") + 
span.set_attribute("llm.client_call", "true") + + # Get messages from args + messages = args[1] if len(args) > 1 else kwargs.get("messages", []) + + # Extract input from messages and set gen_ai.prompt + if messages and isinstance(messages, list) and len(messages) > 0: + # Set gen_ai.prompt array with full conversation history + prompt_index = 0 + for msg in messages: + if isinstance(msg, dict) and msg.get("role") in ["user", "assistant", "system"]: + role = msg.get("role") + content = msg.get("content", "") + if content and role: + span.set_attribute( + f"{SpanAttributes.LLM_PROMPTS}.{prompt_index}.content", self._safe_str(content) + ) + span.set_attribute(f"{SpanAttributes.LLM_PROMPTS}.{prompt_index}.role", role) + prompt_index += 1 + + # Set entity input to the latest user message (what triggered this LLM call) + latest_user_message = None + for msg in messages: + if isinstance(msg, dict) and msg.get("role") == "user": + content = msg.get("content", "") + if content: + latest_user_message = content + + if latest_user_message: + span.set_attribute(SpanAttributes.AGENTOPS_ENTITY_INPUT, self._safe_str(latest_user_message)) + + # Call the wrapped method - this is where the actual LLM call happens + result = wrapped(*args, **kwargs) + + # Set the output and gen_ai.completion + if result: + if isinstance(result, dict): + content = result.get("content", "") + if content: + content_str = self._safe_str(content) + span.set_attribute(SpanAttributes.AGENTOPS_ENTITY_OUTPUT, content_str) + span.set_attribute(f"{SpanAttributes.LLM_COMPLETIONS}.0.content", content_str) + span.set_attribute(f"{SpanAttributes.LLM_COMPLETIONS}.0.role", "assistant") + + # If model information is in the result + if "model" in result: + span.set_attribute(SpanAttributes.LLM_RESPONSE_MODEL, result["model"]) + elif isinstance(result, str): + # Handle string result (which is what AG2 returns) + result_str = self._safe_str(result) + + # Set entity output + 
span.set_attribute(SpanAttributes.AGENTOPS_ENTITY_OUTPUT, result_str) + + # Set gen_ai.completion with full content + span.set_attribute(f"{SpanAttributes.LLM_COMPLETIONS}.0.content", result_str) + span.set_attribute(f"{SpanAttributes.LLM_COMPLETIONS}.0.role", "assistant") + + # Try to get token metrics from the client's usage tracking + try: + # The OpenAIWrapper tracks usage in actual_usage_summary and total_usage_summary + if hasattr(llm_client, "actual_usage_summary") and llm_client.actual_usage_summary: + # Get the latest usage + for model, usage in llm_client.actual_usage_summary.items(): + if model != "total_cost" and isinstance(usage, dict): + prompt_tokens = usage.get("prompt_tokens", 0) + completion_tokens = usage.get("completion_tokens", 0) + total_tokens = usage.get("total_tokens", 0) + cost = usage.get("cost", 0.0) + + # Set token usage metrics + if prompt_tokens > 0: + span.set_attribute(SpanAttributes.LLM_USAGE_PROMPT_TOKENS, str(prompt_tokens)) + if completion_tokens > 0: + span.set_attribute( + SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, str(completion_tokens) + ) + if total_tokens > 0: + span.set_attribute(SpanAttributes.LLM_USAGE_TOTAL_TOKENS, str(total_tokens)) + if cost > 0: + span.set_attribute(SpanAttributes.LLM_USAGE_TOOL_COST, str(cost)) + + # Set request/response model + span.set_attribute(SpanAttributes.LLM_REQUEST_MODEL, model) + span.set_attribute(SpanAttributes.LLM_RESPONSE_MODEL, model) + span.set_attribute(SpanAttributes.LLM_SYSTEM, "ag2") + + break # Use the first model's metrics + except Exception as e: + logger.debug(f"[AG2 DEBUG] Could not extract token metrics: {e}") + + return result + + return wrapper + def _initiate_chat_wrapper(self, tracer): """Wrapper for capturing individual chat initiation as a parent span.""" @@ -153,25 +294,54 @@ def wrapper(wrapped, instance, args, kwargs): initiator_name = getattr(instance, "name", "unnamed_initiator") recipient_name = getattr(recipient_agent, "name", "unnamed_agent") - span_name = 
f"ag2.chat.{initiator_name}_to_{recipient_name}" + span_name = f"ag2.chat.{initiator_name}_to_{recipient_name}.workflow" with create_span( tracer, span_name, kind=SpanKind.INTERNAL, attribute_manager=self._attribute_manager ) as span: + # Set span kind as agent for proper categorization + span.set_attribute(SpanAttributes.AGENTOPS_SPAN_KIND, AgentOpsSpanKindValues.AGENT.value) span.set_attribute(AgentAttributes.FROM_AGENT, initiator_name) span.set_attribute(AgentAttributes.TO_AGENT, recipient_name) - span.set_attribute("ag2.chat.type", "individual") - span.set_attribute("ag2.chat.initiator", initiator_name) - span.set_attribute("ag2.chat.recipient", recipient_name) + span.set_attribute("agent.type", "individual") + span.set_attribute("agent.initiator", initiator_name) + span.set_attribute("agent.recipient", recipient_name) - # Extract system messages and LLM configs - self._extract_agent_attributes(span, instance, recipient_agent) + # Set agentops entity attributes + span.set_attribute(SpanAttributes.AGENTOPS_ENTITY_NAME, "agent") # Extract initial message initial_message = kwargs.get("message", "") if initial_message: initial_message = self._safe_str(initial_message) - span.set_attribute("ag2.chat.initial_message", initial_message) + span.set_attribute("agent.initial_message", initial_message) + # Set entity input + span.set_attribute(SpanAttributes.AGENTOPS_ENTITY_INPUT, initial_message) + + # Extract system messages and put them in agent attributes + initiator_system_msg = getattr(instance, "system_message", "") + if initiator_system_msg: + initiator_system_msg = self._safe_str(initiator_system_msg) + span.set_attribute("agent.initiator_system_message", initiator_system_msg) + + recipient_system_msg = getattr(recipient_agent, "system_message", "") + if recipient_system_msg: + recipient_system_msg = self._safe_str(recipient_system_msg) + span.set_attribute("agent.system_instruction", recipient_system_msg) + # Also set in gen_ai for compatibility + 
span.set_attribute(SpanAttributes.LLM_REQUEST_SYSTEM_INSTRUCTION, recipient_system_msg) + + # Extract LLM config and set gen_ai attributes + recipient_llm_config = getattr(recipient_agent, "llm_config", {}) + + if isinstance(recipient_llm_config, dict) and recipient_llm_config: + model = recipient_llm_config.get("model", "unknown") + span.set_attribute(SpanAttributes.LLM_REQUEST_MODEL, model) + span.set_attribute(SpanAttributes.LLM_RESPONSE_MODEL, model) + span.set_attribute(SpanAttributes.LLM_SYSTEM, "ag2") + + # Also set LLM config attributes + self._set_llm_config_attributes(span, recipient_llm_config) result = wrapped(*args, **kwargs) @@ -182,13 +352,180 @@ def wrapper(wrapped, instance, args, kwargs): return wrapper - def _agent_run_wrapper(self, tracer): - """Wrapper for capturing agent run as a summary.""" + def _async_initiate_chat_wrapper(self, tracer): + """Wrapper for capturing async individual chat initiation as a parent span.""" + + async def wrapper(wrapped, instance, args, kwargs): + recipient_agent = args[0] if args else None + if not recipient_agent: + return await wrapped(*args, **kwargs) + + # Get agent names for span identification + initiator_name = getattr(instance, "name", "unnamed_initiator") + recipient_name = getattr(recipient_agent, "name", "unnamed_agent") + + span_name = f"ag2.chat.{initiator_name}_to_{recipient_name}.workflow" + + with create_span( + tracer, span_name, kind=SpanKind.INTERNAL, attribute_manager=self._attribute_manager + ) as span: + # Set span kind as agent for proper categorization + span.set_attribute(SpanAttributes.AGENTOPS_SPAN_KIND, AgentOpsSpanKindValues.AGENT.value) + span.set_attribute(AgentAttributes.FROM_AGENT, initiator_name) + span.set_attribute(AgentAttributes.TO_AGENT, recipient_name) + span.set_attribute("agent.type", "individual_async") + span.set_attribute("agent.initiator", initiator_name) + span.set_attribute("agent.recipient", recipient_name) + span.set_attribute(SpanAttributes.AGENTOPS_ENTITY_NAME, 
"agent") + + # Extract initial message + initial_message = kwargs.get("message", "") + if initial_message: + initial_message = self._safe_str(initial_message) + span.set_attribute("agent.initial_message", initial_message) + span.set_attribute(SpanAttributes.AGENTOPS_ENTITY_INPUT, initial_message) + + # Extract system messages + recipient_system_msg = getattr(recipient_agent, "system_message", "") + if recipient_system_msg: + recipient_system_msg = self._safe_str(recipient_system_msg) + span.set_attribute("agent.system_instruction", recipient_system_msg) + span.set_attribute(SpanAttributes.LLM_REQUEST_SYSTEM_INSTRUCTION, recipient_system_msg) + + # Extract LLM config + recipient_llm_config = getattr(recipient_agent, "llm_config", {}) + if isinstance(recipient_llm_config, dict) and recipient_llm_config: + self._set_llm_config_attributes(span, recipient_llm_config) + + result = await wrapped(*args, **kwargs) + + # Extract chat history after completion + self._extract_chat_history(span, instance, recipient_agent) + + return result + + return wrapper + + def _receive_wrapper(self, tracer): + """Wrapper for capturing message receive events.""" + + def wrapper(wrapped, instance, args, kwargs): + agent_name = getattr(instance, "name", "unnamed_agent") + span_name = f"ag2.agent.{agent_name}.receive" + + with create_span( + tracer, span_name, kind=SpanKind.INTERNAL, attribute_manager=self._attribute_manager + ) as span: + span.set_attribute(SpanAttributes.AGENTOPS_SPAN_KIND, AgentOpsSpanKindValues.AGENT.value) + span.set_attribute(AgentAttributes.AGENT_NAME, agent_name) + span.set_attribute(SpanAttributes.AGENTOPS_ENTITY_NAME, "agent") + + # Get message and sender + message = args[0] if args else kwargs.get("message", "") + sender = args[1] if len(args) > 1 else kwargs.get("sender") + + if sender: + sender_name = getattr(sender, "name", "unknown") + span.set_attribute("agent.sender", sender_name) + + if message: + if isinstance(message, dict): + content = 
message.get("content", "") + if content: + span.set_attribute(SpanAttributes.AGENTOPS_ENTITY_INPUT, self._safe_str(content)) + elif isinstance(message, str): + span.set_attribute(SpanAttributes.AGENTOPS_ENTITY_INPUT, self._safe_str(message)) + + result = wrapped(*args, **kwargs) + return result + + return wrapper + + def _async_receive_wrapper(self, tracer): + """Wrapper for async capturing message reception.""" + + async def wrapper(wrapped, instance, args, kwargs): + agent_name = getattr(instance, "name", "unnamed_agent") + span_name = f"ag2.agent.{agent_name}.async_receive" + + with create_span( + tracer, span_name, kind=SpanKind.INTERNAL, attribute_manager=self._attribute_manager + ) as span: + span.set_attribute(SpanAttributes.AGENTOPS_SPAN_KIND, AgentOpsSpanKindValues.AGENT.value) + span.set_attribute(AgentAttributes.AGENT_NAME, agent_name) + span.set_attribute(SpanAttributes.AGENTOPS_ENTITY_NAME, "agent") + + # Get message from the first argument + message = args[0] if args else None + + # Enhanced message processing + if message: + if isinstance(message, dict): + # Dict message format + sender_name = message.get("name", "unknown") + content = self._extract_message_content(message) + role = message.get("role", "user") + + # Set sender and message attributes + span.set_attribute("agent.sender", sender_name) + span.set_attribute("message.role", role) + + if content: + span.set_attribute(SpanAttributes.AGENTOPS_ENTITY_INPUT, content) + span.set_attribute("message.content", content) + + elif isinstance(message, str): + # String message format + span.set_attribute(SpanAttributes.AGENTOPS_ENTITY_INPUT, message) + span.set_attribute("message.content", message) + + # Get sender from the second argument if available + sender = args[1] if len(args) > 1 else None + if sender and hasattr(sender, "name"): + span.set_attribute("agent.sender_name", sender.name) + + return await wrapped(*args, **kwargs) + + return wrapper + + def _async_group_chat_run_wrapper(self, tracer): 
+ """Wrapper for capturing async group chat execution.""" + + async def wrapper(wrapped, instance, args, kwargs): + with create_span( + tracer, + "ag2.groupchat.run.task.async", + kind=SpanKind.INTERNAL, + attribute_manager=self._attribute_manager, + ) as span: + group_chat = getattr(instance, "groupchat", None) + agents = getattr(group_chat, "agents", []) if group_chat else [] + agent_names = [getattr(agent, "name", f"agent_{i}") for i, agent in enumerate(agents)] + + span.set_attribute(AgentAttributes.AGENT_ROLE, "GroupChatManager") + span.set_attribute(AgentAttributes.AGENT_NAME, getattr(instance, "name", "unnamed_manager")) + span.set_attribute("groupchat.agents", ", ".join(agent_names)) + span.set_attribute("groupchat.agent_count", len(agents)) + + # Capture input message if available + message = kwargs.get("message", "") + if message: + content_to_set = self._extract_message_content(message) + span.set_attribute("groupchat.input_message", content_to_set) + + result = await wrapped(*args, **kwargs) + self._capture_group_chat_summary(span, instance, result) + return result + + return wrapper + + def _agent_run_wrapper_with_context(self, tracer): + """Wrapper for capturing agent run with context propagation and proper span lifecycle.""" def wrapper(wrapped, instance, args, kwargs): agent_name = getattr(instance, "name", "unnamed_agent") agent_type = getattr(instance, "_agentops_metadata", {}).get("type", "ConversableAgent") - span_name = f"ag2.agent.{agent_name}.run" + span_name = f"ag2.agent.{agent_name}.run.workflow" with create_span( tracer, span_name, kind=SpanKind.INTERNAL, attribute_manager=self._attribute_manager @@ -206,20 +543,96 @@ def wrapper(wrapped, instance, args, kwargs): message = kwargs.get("message", "") if message: content_to_set = self._extract_message_content(message) - span.set_attribute("ag2.run.input_message", content_to_set) - - # Initialize completions and prompts count - span.set_attribute(SpanAttributes.LLM_COMPLETIONS, 0) - 
span.set_attribute(SpanAttributes.LLM_PROMPTS, 0) - - response = wrapped(*args, **kwargs) + span.set_attribute("agent.run.input_message", content_to_set) + + # Capture BOTH contextvars and OpenTelemetry context + ctx = contextvars.copy_context() + current_otel_context = otel_context.get_current() + + # Thread tracking for proper span lifecycle + active_threads = [] + + # Store the original Thread.__init__ and start methods + original_thread_init = threading.Thread.__init__ + original_thread_start = threading.Thread.start + + def context_aware_init(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None): + """Modified Thread.__init__ that wraps the target to run in both captured contexts.""" + if kwargs is None: + kwargs = {} + if target and callable(target): + original_target = target + + def wrapped_target(*target_args, **target_kwargs): + # Run in both contextvars AND OpenTelemetry context + def run_with_otel_context(): + # Attach the OpenTelemetry context in the thread + token = otel_context.attach(current_otel_context) + try: + return original_target(*target_args, **target_kwargs) + finally: + otel_context.detach(token) + + # Run with contextvars context + return ctx.run(run_with_otel_context) + + target = wrapped_target + + # Keep original daemon setting but ensure conversations don't run indefinitely + # If daemon was not explicitly set, default to False (AG2's normal behavior) + if daemon is None: + daemon = False + + original_thread_init( + self, group=group, target=target, name=name, args=args, kwargs=kwargs, daemon=daemon + ) + + def context_aware_start(self): + """Modified Thread.start that tracks the thread.""" + active_threads.append(self) + return original_thread_start(self) + + # Temporarily patch Thread.__init__ and start just for this run() call + threading.Thread.__init__ = context_aware_init + threading.Thread.start = context_aware_start + try: + response = wrapped(*args, **kwargs) + except Exception as e: + 
logger.error(f"[AG2 DEBUG] Error in agent.run execution: {e}") + raise + finally: + # Always restore the original Thread methods + try: + threading.Thread.__init__ = original_thread_init + threading.Thread.start = original_thread_start + except Exception as e: + logger.error(f"[AG2 DEBUG] Error restoring Thread methods: {e}") + # Force restore + threading.Thread.__init__ = ( + threading.Thread.__init__.__wrapped__ + if hasattr(threading.Thread.__init__, "__wrapped__") + else original_thread_init + ) + threading.Thread.start = ( + threading.Thread.start.__wrapped__ + if hasattr(threading.Thread.start, "__wrapped__") + else original_thread_start + ) - if hasattr(response, "chat_history"): - self._capture_conversation_summary(span, instance, response) - elif hasattr(response, "get") and callable(response.get): - model_info = response.get("model", "") - if model_info: - span.set_attribute(SpanAttributes.LLM_RESPONSE_MODEL, model_info) + # Try to get final results from response if available + try: + if hasattr(response, "get_chat_results"): + chat_results = response.get_chat_results() + if chat_results: + self._capture_conversation_summary(span, instance, chat_results) + elif hasattr(response, "chat_history"): + self._capture_conversation_summary(span, instance, response) + elif hasattr(response, "get") and callable(response.get): + model_info = response.get("model", "") + if model_info: + span.set_attribute(SpanAttributes.LLM_RESPONSE_MODEL, str(model_info)) + except Exception as e: + logger.debug(f"[AG2 DEBUG] Could not extract final results: {e}") span.set_attribute(WorkflowAttributes.WORKFLOW_STEP_STATUS, "completed") return response @@ -231,7 +644,7 @@ def _group_chat_run_wrapper(self, tracer): def wrapper(wrapped, instance, args, kwargs): with create_span( - tracer, "ag2.groupchat.run", kind=SpanKind.INTERNAL, attribute_manager=self._attribute_manager + tracer, "ag2.groupchat.run.task", kind=SpanKind.INTERNAL, attribute_manager=self._attribute_manager ) as span: 
group_chat = getattr(instance, "groupchat", None) agents = getattr(group_chat, "agents", []) if group_chat else [] @@ -239,18 +652,17 @@ def wrapper(wrapped, instance, args, kwargs): span.set_attribute(AgentAttributes.AGENT_ROLE, "GroupChatManager") span.set_attribute(AgentAttributes.AGENT_NAME, getattr(instance, "name", "unnamed_manager")) - span.set_attribute("ag2.groupchat.agents", ", ".join(agent_names)) - span.set_attribute("ag2.groupchat.agent_count", len(agents)) + span.set_attribute("groupchat.agents", ", ".join(agent_names)) + span.set_attribute("groupchat.agent_count", len(agents)) # Capture input message if available message = kwargs.get("message", "") if message: content_to_set = self._extract_message_content(message) - span.set_attribute("ag2.groupchat.input_message", content_to_set) + span.set_attribute("groupchat.input_message", content_to_set) result = wrapped(*args, **kwargs) self._capture_group_chat_summary(span, instance, result) - return result return wrapper @@ -259,14 +671,17 @@ def _tool_execution_wrapper(self, tracer, tool_type): """Wrapper for capturing tool execution.""" def wrapper(wrapped, instance, args, kwargs): - span_name = f"ag2.tool.{tool_type}" + span_name = f"ag2.tool.{tool_type}.tool_usage" with create_span( - tracer, span_name, kind=SpanKind.INTERNAL, attribute_manager=self._attribute_manager + tracer, span_name, kind=SpanKind.CLIENT, attribute_manager=self._attribute_manager ) as span: + # Set span kind and type as tool for proper categorization + span.set_attribute(SpanAttributes.AGENTOPS_SPAN_KIND, AgentOpsSpanKindValues.TOOL.value) agent_name = getattr(instance, "name", "unnamed_agent") span.set_attribute(AgentAttributes.AGENT_NAME, agent_name) span.set_attribute(ToolAttributes.TOOL_NAME, tool_type) + span.set_attribute(SpanAttributes.AGENTOPS_ENTITY_NAME, "tool") if tool_type == "function" and args: func_call = args[0] @@ -286,8 +701,8 @@ def wrapper(wrapped, instance, args, kwargs): elif tool_type == "code" and args: code 
= args[0] if isinstance(code, str): - span.set_attribute("ag2.tool.code.size", len(code)) - span.set_attribute("ag2.tool.code.language", kwargs.get("lang", "unknown")) + span.set_attribute("tool.code.size", len(code)) + span.set_attribute("tool.code.language", kwargs.get("lang", "unknown")) result = wrapped(*args, **kwargs) @@ -337,7 +752,7 @@ def wrapper(wrapped, instance, args, kwargs): span.set_attribute(SpanAttributes.LLM_RESPONSE_MODEL, meta["model"]) break - span.set_attribute("ag2.groupchat.role", "participant") + span.set_attribute("groupchat.role", "participant") return selected_speaker @@ -360,56 +775,33 @@ def _extract_message_content(self, message): else: return str(message) - def _extract_agent_attributes(self, span, initiator, recipient): - """Extract and set agent attributes on span.""" - # Extract system message from both agents - initiator_system_msg = getattr(initiator, "system_message", "") - if initiator_system_msg: - initiator_system_msg = self._safe_str(initiator_system_msg) - span.set_attribute("ag2.initiator.system_message", initiator_system_msg) - - recipient_system_msg = getattr(recipient, "system_message", "") - if recipient_system_msg: - recipient_system_msg = self._safe_str(recipient_system_msg) - span.set_attribute(SpanAttributes.LLM_REQUEST_SYSTEM_INSTRUCTION, recipient_system_msg) - - # Extract LLM config from both agents - initiator_llm_config = getattr(initiator, "llm_config", {}) - if isinstance(initiator_llm_config, dict) and initiator_llm_config: - model = initiator_llm_config.get("model", "unknown") - span.set_attribute("ag2.initiator.model", model) - - recipient_llm_config = getattr(recipient, "llm_config", {}) - self._set_llm_config_attributes(span, recipient_llm_config) - def _extract_chat_history(self, span, initiator, recipient): """Extract chat history information.""" try: - # Get initiator chat history - initiator_chat_history = getattr(initiator, "chat_history", []) - if initiator_chat_history: - 
span.set_attribute("ag2.initiator.message_count", len(initiator_chat_history)) - # Get recipient chat history recipient_chat_history = getattr(recipient, "chat_history", []) + if recipient_chat_history: message_count = len(recipient_chat_history) - span.set_attribute("ag2.conversation.message_count", message_count) + span.set_attribute("conversation.message_count", message_count) # Record sample of conversation messages if message_count > 0: - self._set_message_attributes(span, recipient_chat_history[0], 0, "prompt") - self._set_message_attributes(span, recipient_chat_history[-1], 0, "completion") + first_msg = recipient_chat_history[0] + last_msg = recipient_chat_history[-1] + + self._set_message_attributes(span, first_msg, 0, "prompt") + self._set_message_attributes(span, last_msg, 0, "completion") # Check for tool usage - last_msg = recipient_chat_history[-1] - span.set_attribute("ag2.chat.used_tools", "tool_calls" in last_msg) + span.set_attribute("chat.used_tools", "tool_calls" in last_msg) # Capture metadata if "metadata" in last_msg and isinstance(last_msg["metadata"], dict): meta = last_msg["metadata"] if "model" in meta: span.set_attribute(SpanAttributes.LLM_RESPONSE_MODEL, meta["model"]) + except Exception as e: logger.debug(f"Could not extract chat history: {e}") @@ -444,11 +836,11 @@ def _process_tool_result(self, span, result, tool_type): if len(result) > 1 and result[1]: stdout = self._safe_str(result[1]) - span.set_attribute("ag2.tool.code.stdout", stdout) + span.set_attribute("tool.code.stdout", stdout) if len(result) > 2 and result[2]: stderr = self._safe_str(result[2]) - span.set_attribute("ag2.tool.code.stderr", stderr) + span.set_attribute("tool.code.stderr", stderr) def _capture_conversation_summary(self, span, agent, response): """Extract and record conversation summary data.""" @@ -458,18 +850,16 @@ def _capture_conversation_summary(self, span, agent, response): try: chat_history = getattr(response, "chat_history", []) message_count = 
len(chat_history) - user_messages = sum(1 for msg in chat_history if msg.get("role") == "user") assistant_messages = sum(1 for msg in chat_history if msg.get("role") == "assistant") - span.set_attribute("ag2.conversation.message_count", message_count) - span.set_attribute("ag2.conversation.user_messages", user_messages) - span.set_attribute("ag2.conversation.assistant_messages", assistant_messages) + span.set_attribute("conversation.message_count", message_count) + span.set_attribute("conversation.user_messages", user_messages) + span.set_attribute("conversation.assistant_messages", assistant_messages) # Set prompts and completions span.set_attribute(SpanAttributes.LLM_PROMPTS, user_messages) span.set_attribute(SpanAttributes.LLM_COMPLETIONS, assistant_messages) - if message_count > 0: for i, msg in enumerate(chat_history[: min(2, message_count)]): self._set_message_attributes(span, msg, i, "prompt") @@ -477,7 +867,7 @@ def _capture_conversation_summary(self, span, agent, response): if message_count > 2: self._set_message_attributes(span, chat_history[-1], 0, "completion") except Exception as e: - logger.error(f"Error capturing conversation summary: {e}") + logger.error(f"[AG2 DEBUG] Error capturing conversation summary: {e}") def _capture_group_chat_summary(self, span, manager, result): """Extract and record group chat summary data.""" @@ -492,14 +882,14 @@ def _capture_group_chat_summary(self, span, manager, result): agent_message_counts[agent_name] = 0 agent_message_counts[agent_name] += 1 - span.set_attribute("ag2.conversation.message_count", message_count) + span.set_attribute("conversation.message_count", message_count) for agent_name, count in agent_message_counts.items(): - span.set_attribute(f"ag2.conversation.agent_messages.{agent_name}", count) + span.set_attribute(f"conversation.agent_messages.{agent_name}", count) if hasattr(manager.groupchat, "speaker_selection_method"): span.set_attribute( - "ag2.groupchat.speaker_selection_method", 
str(manager.groupchat.speaker_selection_method) + "groupchat.speaker_selection_method", str(manager.groupchat.speaker_selection_method) ) if message_count > 0: diff --git a/docs/v2/examples/ag2.mdx b/docs/v2/examples/ag2.mdx index cd28800ea..cb6ccfc65 100644 --- a/docs/v2/examples/ag2.mdx +++ b/docs/v2/examples/ag2.mdx @@ -1,176 +1,185 @@ --- -title: 'AG2 with Mem0 Example' -description: 'Observe an AG2 Agent with memory powered by Mem0 using AgentOps' +title: 'AG2' +description: 'AG2 Async Agent Chat' --- -{/* SOURCE_FILE: examples/ag2/agentchat_with_memory.ipynb */} +{/* SOURCE_FILE: examples/ag2/async_human_input.ipynb */} -_View Notebook on Github_ +_View Notebook on Github_ -# Observe an Agent with memory powered by Mem0 +# AG2 Async Agent Chat with Automated Responses -This notebook demonstrates an intelligent customer service chatbot system that combines: +This notebook demonstrates how to leverage asynchronous programming with AG2 agents +to create automated conversations between AI agents, eliminating the need for human +input while maintaining full traceability. -- AG2 for conversational agents -- Mem0 for memory management +# Overview +This notebook demonstrates a practical example of automated AI-to-AI communication where we: -[Mem0](https://www.mem0.ai/) provides a smart, self-improving memory layer for Large Language Models (LLMs), enabling developers to create personalized AI experiences that evolve with each user interaction. Refer [docs](https://docs.mem0.ai/overview) for more information. +1. Initialize AG2 agents with OpenAI's GPT-4o-mini model +2. Create custom async agents that simulate human-like responses and processing delays +3. Automate the entire conversation flow without requiring manual intervention +4. 
Track all interactions using AgentOps for monitoring and analysis -The implementation showcases how to initialize agents, manage conversation memory, and facilitate multi-agent conversations for enhanced problem-solving in customer support scenarios. - -With AgentOps, you can observe the agent's memory and interactions in real-time, providing insights into how the agent learns and adapts over time. - -## Pre-requisites -- AgentOps API key from [AgentOps](https://app.agentops.ai/). -- Mem0 API key from [Mem0 Platform](https://app.mem0.ai/). -- OpenAI API key from [OpenAI](https://platform.openai.com/). +By using async operations and automated responses, you can create fully autonomous +agent conversations that simulate real-world scenarios. This is particularly useful +for testing, prototyping, and creating demos where you want to showcase agent +capabilities without manual input. ## Installation - -Install required dependencies: ```bash pip - pip install agentops "ag2[openai]" mem0ai python-dotenv + pip install ag2 agentops nest-asyncio python-dotenv ``` ```bash poetry - poetry add agentops ag2 mem0ai python-dotenv - # Note: For ag2[openai] with poetry, you might need to specify openai as an extra or directly.
- # poetry add ag2 -E openai + poetry add ag2 agentops nest-asyncio python-dotenv ``` ```bash uv - uv add agentops "ag2[openai]" mem0ai python-dotenv + uv add ag2 agentops nest-asyncio python-dotenv ``` -## Setup - -```python +``` +import asyncio +from typing import Dict, Optional, Union import os from dotenv import load_dotenv +import nest_asyncio import agentops -from mem0 import MemoryClient -from autogen import ConversableAgent +from autogen import AssistantAgent +from autogen.agentchat.user_proxy_agent import UserProxyAgent +``` + +``` +# Load environment variables for API keys load_dotenv() -os.environ["AGENTOPS_API_KEY"] = os.getenv("AGENTOPS_API_KEY", "your_agentops_api_key_here") +os.environ["AGENTOPS_API_KEY"] = os.getenv("AGENTOPS_API_KEY", "your_api_key_here") os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY", "your_openai_api_key_here") -os.environ["MEM0_API_KEY"] = os.getenv("MEM0_API_KEY", "your_mem0_api_key_here") +# Initialize AgentOps for tracking and monitoring +agentops.init(auto_start_session=False, trace_name="AG2 Async Demo") +tracer = agentops.start_trace(trace_name="AG2 Async Agent Demo", tags=["ag2-async-demo", "agentops-example"]) ```
The entire conversation is then stored in memory using the `memory.add()` method, associated with the identifier "customer_service_bot". - -```python -conversation = [ - { - "role": "assistant", - "content": "Hi, I'm Best Buy's chatbot!\n\nThanks for being a My Best Buy TotalTM member.\n\nWhat can I help you with?", - }, - { - "role": "user", - "content": 'Seeing horizontal lines on our tv. TV model: Sony - 77" Class BRAVIA XR A80K OLED 4K UHD Smart Google TV', - }, - { - "role": "assistant", - "content": "Thanks for being a My Best Buy Totalβ„’ member. I can connect you to an expert immediately - just one perk of your membership!\n\nSelect the button below when you're ready to chat.", - }, - { - "role": "assistant", - "content": "Good evening, thank you for choosing Best Buy, Fnu. My name is Lovely. I hope you are doing well. I'm sorry to hear that you're seeing horizontal lines on your TV.\n\nI'm absolutely committed to exploring all possible ways to assist you to fix this issue.\n\nTo ensure that we are on the right account, may I please have your email address registered with your Best Buy account?", - }, - {"role": "user", "content": "dd@gmail.com"}, - { - "role": "assistant", - "content": "Perfect! Thank you for providing all the details, surely you have made my job easier by doing this. I really appreciate it.\n\nI also want to take a moment to express our heartfelt appreciation for your trust and loyalty. Thank you for being an amazing customer of BestBuy Total.\n\nCould you please help me with the order number or product's details to check it quickly?\n\nSamsung - 49\\" Odyssey OLED G9 (G95SC) DQHD 240Hz 0.03ms G-Sync Compatible Curved Smart Gaming Monitor - Silver - just to confirm this is the item, right?", - }, - {"role": "user", "content": "Order number: 112217629"}, - { - "role": "assistant", - "content": "Superb! Thank you for confirmation.\n\nThank you for your patience. 
After exploring all possible solutions, I can help you to arrange a home repair appointment for your device. Our Geek Squad experts will visit your home to inspect and fix your device.\n\nIt's great that you have a protection plan - rest assured, we've got your back! As a valued Total member, you can avail this service at a minimal service fee. This fee, applicable to all repairs, covers the cost of diagnosing the issue and any small parts needed for the repair. It's part of our 24-month free protection plan.\n\nPlease click here to review the service fee and plan coverage details -\n\nhttps://www.bestbuy.com/site/best-buy-membership/best-buy-protection/pcmcat1608643232014.c?id=pcmcat1608643232014#jl-servicefees\n\nFnu - just to confirm shall I proceed to schedule the appointment?", - }, - {"role": "user", "content": "Yes please"}, - {"role": "assistant", "content": "When should I schedule the appointment?"}, - {"role": "user", "content": "Schedule it for tomorrow please"}, -] - -memory.add(messages=conversation, user_id="customer_service_bot") +# Define an asynchronous function that simulates async processing +async def simulate_async_processing(task_name: str, delay: float = 1.0) -> str: + """ + Simulate some asynchronous processing (e.g., API calls, file operations, etc.) + """ + print(f"πŸ”„ Starting async task: {task_name}") + await asyncio.sleep(delay) # Simulate async work + print(f"βœ… Completed async task: {task_name}") + return f"Processed: {task_name}" ``` -## Agent Inference - -We ask a question to the agent, utilizing mem0 to retrieve relevant memories. The agent then formulates a response based on both the question and the retrieved contextual information. -```python -data = "I forgot the order number, can you quickly tell me?" - -relevant_memories = memory.search(data, user_id="customer_service_bot") -flatten_relevant_memories = "\n".join([m["memory"] for m in relevant_memories]) - -prompt = f\"\"\"Answer the user question considering the memories. 
Keep answers clear and concise. -Memories: -{flatten_relevant_memories} -\n\n -Question: {data} -\"\"\" - -reply = agent.generate_reply(messages=[{"content": prompt, "role": "user"}]) -print(reply) +``` +# Define a custom UserProxyAgent that simulates automated user responses +class AutomatedUserProxyAgent(UserProxyAgent): + def __init__(self, name: str, **kwargs): + super().__init__(name, **kwargs) + self.response_count = 0 + self.predefined_responses = [ + "Yes, please generate interview questions for these topics.", + "The questions look good. Can you make them more specific to senior-level positions?", + "Perfect! These questions are exactly what we need. Thank you!", + ] + + async def a_get_human_input(self, prompt: str) -> str: + # Simulate async processing before responding + await simulate_async_processing(f"Processing user input #{self.response_count + 1}") + + if self.response_count < len(self.predefined_responses): + response = self.predefined_responses[self.response_count] + self.response_count += 1 + print(f"πŸ‘€ User: {response}") + return response + else: + print("πŸ‘€ User: TERMINATE") + return "TERMINATE" + + async def a_receive( + self, + message: Union[Dict, str], + sender, + request_reply: Optional[bool] = None, + silent: Optional[bool] = False, + ): + await super().a_receive(message, sender, request_reply, silent) ``` -## Multi Agent Conversation - -Initialize two AI agents: a "manager" for resolving customer issues and a "customer_bot" for gathering information on customer problems, both using GPT-4. It then retrieves relevant memories for a given question, combining them with the question into a prompt. This prompt can be used by either the manager or customer_bot to generate a contextually informed response. 
- -```python -manager = ConversableAgent( - "manager", - system_message="You are a manager who helps in resolving customer issues.", - llm_config={"config_list": [{"model": "gpt-4o-mini", "api_key": os.environ.get("OPENAI_API_KEY")}]}, - human_input_mode="NEVER", -) - -customer_bot = ConversableAgent( - "customer_bot", - system_message="You are a customer service bot who gathers information on issues customers are facing. Keep answers clear and concise.", - llm_config={"config_list": [{"model": "gpt-4", "api_key": os.environ.get("OPENAI_API_KEY")}]}, - human_input_mode="NEVER", -) -data = "When is the appointment?" +``` +# Define an AssistantAgent that simulates async processing before responding +class AsyncAssistantAgent(AssistantAgent): + async def a_receive( + self, + message: Union[Dict, str], + sender, + request_reply: Optional[bool] = None, + silent: Optional[bool] = False, + ): + # Simulate async processing before responding + await simulate_async_processing("Analyzing request and preparing response", 0.5) + await super().a_receive(message, sender, request_reply, silent) +``` -relevant_memories = memory.search(data, user_id="customer_service_bot") -flatten_relevant_memories = "\n".join([m["memory"] for m in relevant_memories]) -prompt = f\"\"\" -Context: -{flatten_relevant_memories} -\n\n -Question: {data} -\"\"\" +``` +async def main(): + print("πŸš€ Starting AG2 Async Demo") + + # Create agents with automated behavior + user_proxy = AutomatedUserProxyAgent( + name="hiring_manager", + human_input_mode="NEVER", # No human input required + max_consecutive_auto_reply=3, + code_execution_config=False, + is_termination_msg=lambda msg: "TERMINATE" in str(msg.get("content", "")), + ) + + assistant = AsyncAssistantAgent( + name="interview_consultant", + system_message="""You are an expert interview consultant. When given interview topics, + you create thoughtful, relevant questions. You ask for feedback and incorporate it. 
+ When the user is satisfied with the questions, end with 'TERMINATE'.""", + llm_config={"config_list": [{"model": "gpt-4o-mini", "api_key": os.environ.get("OPENAI_API_KEY")}]}, + is_termination_msg=lambda msg: "TERMINATE" in str(msg.get("content", "")), + ) + + try: + print("πŸ€– Initiating automated conversation...") + # Start the automated chat between the user and assistant + await user_proxy.a_initiate_chat( + assistant, + message="""I need help creating interview questions for these topics: + - Resume Review + - Technical Skills Assessment + - Project Discussion + - Job Role Expectations + - Closing Remarks + + Please create 2-3 questions for each topic.""", + max_turns=6, + ) + except Exception as e: + print(f"\n❌ Error occurred: {e}") + finally: + agentops.end_trace(tracer, end_state="Success") + + print("\nπŸŽ‰ Demo completed successfully!") +``` -result = manager.send(prompt, customer_bot, request_reply=True) -agentops.end_trace(tracer, end_state="Success") ``` +# Run the main async demo +nest_asyncio.apply() +asyncio.run(main()) +``` + - + \ No newline at end of file diff --git a/docs/v2/integrations/ag2.mdx b/docs/v2/integrations/ag2.mdx index 8eea55ac6..0e78d3feb 100755 --- a/docs/v2/integrations/ag2.mdx +++ b/docs/v2/integrations/ag2.mdx @@ -97,82 +97,15 @@ user_proxy.initiate_chat( message="How can I implement a basic web scraper in Python?" ) ``` - -```python Multi-Agent Conversation -import os -import agentops -import autogen - -# Initialize AgentOps -agentops.init() - -# Configure your AG2 agents -config_list = [ - { - "model": "gpt-4", - "api_key": os.getenv("OPENAI_API_KEY"), - } -] - -llm_config = { - "config_list": config_list, - "timeout": 60, -} - -# Create a team of agents -researcher = autogen.AssistantAgent( - name="researcher", - llm_config=llm_config, - system_message="You are a researcher who specializes in finding accurate information." 
-) - -coder = autogen.AssistantAgent( - name="coder", - llm_config=llm_config, - system_message="You are an expert programmer who writes clean, efficient code." -) - -critic = autogen.AssistantAgent( - name="critic", - llm_config=llm_config, - system_message="You review solutions and provide constructive feedback." -) - -user_proxy = autogen.UserProxyAgent( - name="user_proxy", - human_input_mode="TERMINATE", - max_consecutive_auto_reply=10, - is_termination_msg=lambda x: x.get("content", "").rstrip().endswith("TERMINATE"), - code_execution_config={"last_n_messages": 3, "work_dir": "coding"}, -) - -# Create a group chat -groupchat = autogen.GroupChat( - agents=[user_proxy, researcher, coder, critic], - messages=[], - max_round=12 -) - -manager = autogen.GroupChatManager( - groupchat=groupchat, - llm_config=llm_config -) - -# Initiate the group chat -user_proxy.initiate_chat( - manager, - message="Create a Python program to analyze sentiment from Twitter data." -) -``` ## Examples - - Multi-agent conversation with memory capabilities + + AG2 Async Agent Chat with Automated Responses - + Demonstrates asynchronous human input with AG2 agents. diff --git a/examples/ag2/ag2_async_agent.ipynb b/examples/ag2/ag2_async_agent.ipynb new file mode 100755 index 000000000..17d6f6982 --- /dev/null +++ b/examples/ag2/ag2_async_agent.ipynb @@ -0,0 +1,236 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "1ad612e0", + "metadata": {}, + "source": [ + "AG2 Async Agent Chat with Automated Responses\n", + "\n", + "This notebook demonstrates how to leverage asynchronous programming with AG2 agents \n", + "to create automated conversations between AI agents, eliminating the need for human \n", + "input while maintaining full traceability.\n", + "\n", + "Overview\n", + "This notebook demonstrates a practical example of automated AI-to-AI communication where we:\n", + "\n", + "1. Initialize AG2 agents with OpenAI's GPT-4o-mini model\n", + "2. 
Create custom async agents that simulate human-like responses and processing delays\n", + "3. Automate the entire conversation flow without requiring manual intervention\n", + "4. Track all interactions using AgentOps for monitoring and analysis\n", + "\n", + "By using async operations and automated responses, you can create fully autonomous \n", + "agent conversations that simulate real-world scenarios. This is particularly useful \n", + "for testing, prototyping, and creating demos where you want to showcase agent \n", + "capabilities without manual input." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "361b3cf5", + "metadata": {}, + "outputs": [], + "source": [ + "%pip install agentops\n", + "%pip install ag2\n", + "%pip install nest-asyncio" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9962270b", + "metadata": {}, + "outputs": [], + "source": [ + "import asyncio\n", + "from typing import Dict, Optional, Union\n", + "import os\n", + "from dotenv import load_dotenv\n", + "import nest_asyncio\n", + "import agentops\n", + "from autogen import AssistantAgent\n", + "from autogen.agentchat.user_proxy_agent import UserProxyAgent" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "60e84ffb", + "metadata": { + "lines_to_next_cell": 1 + }, + "outputs": [], + "source": [ + "# Load environment variables for API keys\n", + "load_dotenv()\n", + "os.environ[\"AGENTOPS_API_KEY\"] = os.getenv(\"AGENTOPS_API_KEY\", \"your_api_key_here\")\n", + "os.environ[\"OPENAI_API_KEY\"] = os.getenv(\"OPENAI_API_KEY\", \"your_openai_api_key_here\")\n", + "# Initialize AgentOps for tracking and monitoring\n", + "agentops.init(auto_start_session=False, trace_name=\"AG2 Async Demo\")\n", + "tracer = agentops.start_trace(trace_name=\"AG2 Async Agent Demo\", tags=[\"ag2-async-demo\", \"agentops-example\"])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8c1dc105", + "metadata": { + 
"lines_to_next_cell": 1 + }, + "outputs": [], + "source": [ + "# Define an asynchronous function that simulates async processing \n", + "async def simulate_async_processing(task_name: str, delay: float = 1.0) -> str:\n", + " \"\"\"\n", + " Simulate some asynchronous processing (e.g., API calls, file operations, etc.)\n", + " \"\"\"\n", + " print(f\"πŸ”„ Starting async task: {task_name}\")\n", + " await asyncio.sleep(delay) # Simulate async work\n", + " print(f\"βœ… Completed async task: {task_name}\")\n", + " return f\"Processed: {task_name}\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0d683b3d", + "metadata": { + "lines_to_next_cell": 1 + }, + "outputs": [], + "source": [ + "# Define a custom UserProxyAgent that simulates automated user responses\n", + "class AutomatedUserProxyAgent(UserProxyAgent):\n", + " def __init__(self, name: str, **kwargs):\n", + " super().__init__(name, **kwargs)\n", + " self.response_count = 0\n", + " self.predefined_responses = [\n", + " \"Yes, please generate interview questions for these topics.\",\n", + " \"The questions look good. Can you make them more specific to senior-level positions?\",\n", + " \"Perfect! These questions are exactly what we need. 
Thank you!\",\n", + " ]\n", + "\n", + " async def a_get_human_input(self, prompt: str) -> str:\n", + " # Simulate async processing before responding\n", + " await simulate_async_processing(f\"Processing user input #{self.response_count + 1}\")\n", + "\n", + " if self.response_count < len(self.predefined_responses):\n", + " response = self.predefined_responses[self.response_count]\n", + " self.response_count += 1\n", + " print(f\"πŸ‘€ User: {response}\")\n", + " return response\n", + " else:\n", + " print(\"πŸ‘€ User: TERMINATE\")\n", + " return \"TERMINATE\"\n", + "\n", + " async def a_receive(\n", + " self,\n", + " message: Union[Dict, str],\n", + " sender,\n", + " request_reply: Optional[bool] = None,\n", + " silent: Optional[bool] = False,\n", + " ):\n", + " await super().a_receive(message, sender, request_reply, silent)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b792d207", + "metadata": {}, + "outputs": [], + "source": [ + "# Define an AssistantAgent that simulates async processing before responding\n", + "class AsyncAssistantAgent(AssistantAgent):\n", + " async def a_receive(\n", + " self,\n", + " message: Union[Dict, str],\n", + " sender,\n", + " request_reply: Optional[bool] = None,\n", + " silent: Optional[bool] = False,\n", + " ):\n", + " # Simulate async processing before responding\n", + " await simulate_async_processing(\"Analyzing request and preparing response\", 0.5)\n", + " await super().a_receive(message, sender, request_reply, silent)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7f8c6c50", + "metadata": { + "lines_to_next_cell": 1 + }, + "outputs": [], + "source": [ + "async def main():\n", + " print(\"πŸš€ Starting AG2 Async Demo\")\n", + "\n", + " # Create agents with automated behavior\n", + " user_proxy = AutomatedUserProxyAgent(\n", + " name=\"hiring_manager\",\n", + " human_input_mode=\"NEVER\", # No human input required\n", + " max_consecutive_auto_reply=3,\n", + " 
code_execution_config=False,\n", + " is_termination_msg=lambda msg: \"TERMINATE\" in str(msg.get(\"content\", \"\")),\n", + " )\n", + "\n", + " assistant = AsyncAssistantAgent(\n", + " name=\"interview_consultant\",\n", + " system_message=\"\"\"You are an expert interview consultant. When given interview topics, \n", + " you create thoughtful, relevant questions. You ask for feedback and incorporate it.\n", + " When the user is satisfied with the questions, end with 'TERMINATE'.\"\"\",\n", + " llm_config={\"config_list\": [{\"model\": \"gpt-4o-mini\", \"api_key\": os.environ.get(\"OPENAI_API_KEY\")}]},\n", + " is_termination_msg=lambda msg: \"TERMINATE\" in str(msg.get(\"content\", \"\")),\n", + " )\n", + "\n", + " try:\n", + " print(\"πŸ€– Initiating automated conversation...\")\n", + " # Start the automated chat between the user and assistant\n", + " await user_proxy.a_initiate_chat(\n", + " assistant,\n", + " message=\"\"\"I need help creating interview questions for these topics:\n", + " - Resume Review\n", + " - Technical Skills Assessment \n", + " - Project Discussion\n", + " - Job Role Expectations\n", + " - Closing Remarks\n", + " \n", + " Please create 2-3 questions for each topic.\"\"\",\n", + " max_turns=6,\n", + " )\n", + " except Exception as e:\n", + " print(f\"\\n❌ Error occurred: {e}\")\n", + " finally:\n", + " agentops.end_trace(tracer, end_state=\"Success\")\n", + "\n", + " print(\"\\nπŸŽ‰ Demo completed successfully!\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e4520d6f", + "metadata": { + "lines_to_next_cell": 2 + }, + "outputs": [], + "source": [ + "# Run the main async demo\n", + "nest_asyncio.apply()\n", + "asyncio.run(main())" + ] + } + ], + "metadata": { + "jupytext": { + "cell_metadata_filter": "-all", + "main_language": "python", + "notebook_metadata_filter": "-all" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples/ag2/async_human_input.py b/examples/ag2/ag2_async_agent.py similarity 
index 71% rename from examples/ag2/async_human_input.py rename to examples/ag2/ag2_async_agent.py index f07ced613..9f05ac205 100644 --- a/examples/ag2/async_human_input.py +++ b/examples/ag2/ag2_async_agent.py @@ -1,36 +1,44 @@ -# Agent Chat with Async Operations +# AG2 Async Agent Chat with Automated Responses # -# We are going to create agents that can perform asynchronous operations and chat with each other. -# This example demonstrates async capabilities without requiring human input. +# This notebook demonstrates how to leverage asynchronous programming with AG2 agents +# to create automated conversations between AI agents, eliminating the need for human +# input while maintaining full traceability. # -# We are going to use AgentOps to monitor the agent's performance and observe their interactions. -# # Install required dependencies +# Overview +# This notebook demonstrates a practical example of automated AI-to-AI communication where we: +# +# 1. Initialize AG2 agents with OpenAI's GPT-4o-mini model +# 2. Create custom async agents that simulate human-like responses and processing delays +# 3. Automate the entire conversation flow without requiring manual intervention +# 4. Track all interactions using AgentOps for monitoring and analysis +# +# By using async operations and automated responses, you can create fully autonomous +# agent conversations that simulate real-world scenarios. This is particularly useful +# for testing, prototyping, and creating demos where you want to showcase agent +# capabilities without manual input. 
+ # %pip install agentops # %pip install ag2 -# %pip install chromadb -# %pip install sentence_transformers -# %pip install tiktoken -# %pip install pypdf # %pip install nest-asyncio + import asyncio from typing import Dict, Optional, Union import os from dotenv import load_dotenv - import nest_asyncio import agentops from autogen import AssistantAgent from autogen.agentchat.user_proxy_agent import UserProxyAgent +# Load environment variables for API keys load_dotenv() os.environ["AGENTOPS_API_KEY"] = os.getenv("AGENTOPS_API_KEY", "your_api_key_here") os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY", "your_openai_api_key_here") - +# Initialize AgentOps for tracking and monitoring agentops.init(auto_start_session=False, trace_name="AG2 Async Demo") tracer = agentops.start_trace(trace_name="AG2 Async Agent Demo", tags=["ag2-async-demo", "agentops-example"]) - -# Define an asynchronous function that simulates async processing +# Define an asynchronous function that simulates async processing async def simulate_async_processing(task_name: str, delay: float = 1.0) -> str: """ Simulate some asynchronous processing (e.g., API calls, file operations, etc.) 
@@ -40,8 +48,7 @@ async def simulate_async_processing(task_name: str, delay: float = 1.0) -> str: print(f"βœ… Completed async task: {task_name}") return f"Processed: {task_name}" - -# Define a custom UserProxyAgent that simulates automated responses +# Define a custom UserProxyAgent that simulates automated user responses class AutomatedUserProxyAgent(UserProxyAgent): def __init__(self, name: str, **kwargs): super().__init__(name, **kwargs) @@ -74,7 +81,7 @@ async def a_receive( ): await super().a_receive(message, sender, request_reply, silent) - +# Define an AssistantAgent that simulates async processing before responding class AsyncAssistantAgent(AssistantAgent): async def a_receive( self, @@ -88,12 +95,8 @@ async def a_receive( await super().a_receive(message, sender, request_reply, silent) -nest_asyncio.apply() - - async def main(): print("πŸš€ Starting AG2 Async Demo") - print("=" * 50) # Create agents with automated behavior user_proxy = AutomatedUserProxyAgent( @@ -115,6 +118,7 @@ async def main(): try: print("πŸ€– Initiating automated conversation...") + # Start the automated chat between the user and assistant await user_proxy.a_initiate_chat( assistant, message="""I need help creating interview questions for these topics: @@ -127,23 +131,25 @@ async def main(): Please create 2-3 questions for each topic.""", max_turns=6, ) + + # Let's check programmatically that spans were recorded in AgentOps + print("\n" + "=" * 50) + print("Now let's verify that our LLM calls were tracked properly...") + try: + agentops.validate_trace_spans(trace_context=tracer) + print("\nβœ… Success! 
All LLM spans were properly recorded in AgentOps.") + except agentops.ValidationError as e: + print(f"\n❌ Error validating spans: {e}") + raise + except Exception as e: print(f"\n❌ Error occurred: {e}") finally: agentops.end_trace(tracer, end_state="Success") - # Validate AgentOps tracking - print("\n" + "=" * 50) - print("πŸ” Validating AgentOps tracking...") - try: - agentops.validate_trace_spans(trace_context=tracer) - print("βœ… Success! All LLM spans were properly recorded in AgentOps.") - except agentops.ValidationError as e: - print(f"❌ Error validating spans: {e}") - raise - print("\nπŸŽ‰ Demo completed successfully!") +# Run the main async demo +nest_asyncio.apply() +asyncio.run(main()) -if __name__ == "__main__": - asyncio.run(main()) diff --git a/examples/ag2/async_human_input.ipynb b/examples/ag2/async_human_input.ipynb deleted file mode 100755 index 39ea10c3c..000000000 --- a/examples/ag2/async_human_input.ipynb +++ /dev/null @@ -1,223 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Agent Chat with Async Human Inputs\n", - "\n", - "We are going to create an agent that can chat with a human asynchronously. The agent will be able to respond to messages from the human and will also be able to send messages to the human.\n", - "\n", - "We are going to use AgentOps to monitor the agent's performance and observe its interactions with the human." 
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "colab": { - "base_uri": "https://localhost:8080/" - }, - "id": "tLIs1YRdr8jM", - "outputId": "909c1c70-1a22-4e9d-b7f4-a40e2d737fb0" - }, - "outputs": [], - "source": [ - "# Install required dependencies\n", - "%pip install agentops\n", - "%pip install ag2\n", - "%pip install chromadb\n", - "%pip install sentence_transformers\n", - "%pip install tiktoken\n", - "%pip install pypdf\n", - "%pip install nest-asyncio" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "yU8zQPetsW28" - }, - "outputs": [], - "source": [ - "import asyncio\n", - "from typing import Dict, Optional, Union\n", - "import os\n", - "from dotenv import load_dotenv\n", - "\n", - "import nest_asyncio\n", - "import agentops\n", - "from autogen import AssistantAgent\n", - "from autogen.agentchat.user_proxy_agent import UserProxyAgent" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "load_dotenv()\n", - "os.environ[\"AGENTOPS_API_KEY\"] = os.getenv(\"AGENTOPS_API_KEY\", \"your_api_key_here\")\n", - "os.environ[\"OPENAI_API_KEY\"] = os.getenv(\"OPENAI_API_KEY\", \"your_openai_api_key_here\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "agentops.init(auto_start_session=False)\n", - "tracer = agentops.start_trace(\n", - " trace_name=\"AG2 Agent chat with Async Human Inputs\", tags=[\"ag2-chat-async-human-inputs\", \"agentops-example\"]\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "R2ITRFQisgzI" - }, - "outputs": [], - "source": [ - "# Define an asynchronous function that simulates some asynchronous task (e.g., I/O operation)\n", - "async def my_asynchronous_function():\n", - " print(\"Start asynchronous function\")\n", - " await asyncio.sleep(2) # Simulate some asynchronous task (e.g., I/O operation)\n", 
- " print(\"End asynchronous function\")\n", - " return \"input\"\n", - "\n", - "\n", - "# Define a custom class `CustomisedUserProxyAgent` that extends `UserProxyAgent`\n", - "class CustomisedUserProxyAgent(UserProxyAgent):\n", - " # Asynchronous function to get human input\n", - " async def a_get_human_input(self, prompt: str) -> str:\n", - " # Call the asynchronous function to get user input asynchronously\n", - " user_input = await my_asynchronous_function()\n", - "\n", - " return user_input\n", - "\n", - " # Asynchronous function to receive a message\n", - " async def a_receive(\n", - " self,\n", - " message: Union[Dict, str],\n", - " sender,\n", - " request_reply: Optional[bool] = None,\n", - " silent: Optional[bool] = False,\n", - " ):\n", - " # Call the superclass method to handle message reception asynchronously\n", - " await super().a_receive(message, sender, request_reply, silent)\n", - "\n", - "\n", - "class CustomisedAssistantAgent(AssistantAgent):\n", - " # Asynchronous function to get human input\n", - " async def a_get_human_input(self, prompt: str) -> str:\n", - " # Call the asynchronous function to get user input asynchronously\n", - " user_input = await my_asynchronous_function()\n", - "\n", - " return user_input\n", - "\n", - " # Asynchronous function to receive a message\n", - " async def a_receive(\n", - " self,\n", - " message: Union[Dict, str],\n", - " sender,\n", - " request_reply: Optional[bool] = None,\n", - " silent: Optional[bool] = False,\n", - " ):\n", - " # Call the superclass method to handle message reception asynchronously\n", - " await super().a_receive(message, sender, request_reply, silent)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "colab": { - "base_uri": "https://localhost:8080/" - }, - "id": "ywFsRNMDteaH", - "outputId": "5716adad-de80-4595-eec9-523b1f1cd313" - }, - "outputs": [], - "source": [ - "nest_asyncio.apply()\n", - "\n", - "\n", - "async def main():\n", - " boss = 
CustomisedUserProxyAgent(\n", - " name=\"boss\",\n", - " human_input_mode=\"ALWAYS\",\n", - " max_consecutive_auto_reply=0,\n", - " code_execution_config=False,\n", - " )\n", - "\n", - " assistant = CustomisedAssistantAgent(\n", - " name=\"assistant\",\n", - " system_message=\"You will provide some agenda, and I will create questions for an interview meeting. Every time when you generate question then you have to ask user for feedback and if user provides the feedback then you have to incorporate that feedback and generate new set of questions and if user don't want to update then terminate the process and exit\",\n", - " llm_config={\"config_list\": [{\"model\": \"gpt-4o-mini\", \"api_key\": os.environ.get(\"OPENAI_API_KEY\")}]},\n", - " )\n", - "\n", - " await boss.a_initiate_chat(\n", - " assistant,\n", - " message=\"Resume Review, Technical Skills Assessment, Project Discussion, Job Role Expectations, Closing Remarks.\",\n", - " n_results=3,\n", - " )\n", - "\n", - "\n", - "await main()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "agentops.end_trace(tracer, end_state=\"Success\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] - } - ], - "metadata": { - "colab": { - "provenance": [], - "toc_visible": true - }, - "front_matter": { - "description": "Async human inputs.", - "tags": [ - "async", - "human" - ] - }, - "kernelspec": { - "display_name": "agentops (3.11.11)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.11.11" - } - }, - "nbformat": 4, - "nbformat_minor": 0 -} diff --git a/examples/ag2/groupchat.ipynb b/examples/ag2/groupchat.ipynb new file mode 100644 index 
000000000..a69825323 --- /dev/null +++ b/examples/ag2/groupchat.ipynb @@ -0,0 +1,166 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "cdf47b93", + "metadata": {}, + "source": [ + "AG2 Multi-Agent Group Chat Example with AgentOps Integration\n", + "\n", + "This script demonstrates how to orchestrate a group of specialized AI agents collaborating on a task using AG2 and AgentOps.\n", + "\n", + "Overview\n", + "This example shows how to:\n", + "1. Initialize multiple AG2 agents with different roles (researcher, coder, critic, and user proxy)\n", + "2. Set up a group chat where agents interact and collaborate to solve a problem\n", + "3. Simulate a human participant using a user proxy agent\n", + "4. Limit the number of chat rounds and user turns for controlled execution\n", + "5. Track and monitor all agent interactions and LLM calls using AgentOps for full traceability\n", + "\n", + "By using group chat and specialized agents, you can model real-world collaborative workflows, automate complex problem solving, and analyze agent behavior in detail." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "49497415", + "metadata": {}, + "outputs": [], + "source": [ + "%pip install agentops\n", + "%pip install ag2\n", + "%pip install nest-asyncio" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5e8f3321", + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import agentops\n", + "import autogen" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "017dd674", + "metadata": {}, + "outputs": [], + "source": [ + "# Initialize AgentOps for tracing and monitoring\n", + "agentops.init(auto_start_session=False, trace_name=\"AG2 Group Chat\")\n", + "tracer = agentops.start_trace(trace_name=\"AG2 Group Chat\", tags=[\"ag2-group-chat\", \"agentops-example\"])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d23930de", + "metadata": {}, + "outputs": [], + "source": [ + "# Configure your AG2 agents with model and API key\n", + "config_list = [\n", + " {\n", + " \"model\": \"gpt-4\",\n", + " \"api_key\": os.getenv(\"OPENAI_API_KEY\"),\n", + " }\n", + "]\n", + "llm_config = {\n", + " \"config_list\": config_list,\n", + " \"timeout\": 60,\n", + "}" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "eeee05a1", + "metadata": {}, + "outputs": [], + "source": [ + "# Create a team of agents with specialized roles\n", + "researcher = autogen.AssistantAgent(\n", + " name=\"researcher\",\n", + " llm_config=llm_config,\n", + " system_message=\"You are a researcher who specializes in finding accurate information.\"\n", + ")\n", + "coder = autogen.AssistantAgent(\n", + " name=\"coder\",\n", + " llm_config=llm_config,\n", + " system_message=\"You are an expert programmer who writes clean, efficient code.\"\n", + ")\n", + "critic = autogen.AssistantAgent(\n", + " name=\"critic\",\n", + " llm_config=llm_config,\n", + " system_message=\"You review solutions and provide constructive feedback.\"\n", + ")" + ] + }, + { + 
"cell_type": "code", + "execution_count": null, + "id": "abd6720b", + "metadata": {}, + "outputs": [], + "source": [ + "# The user proxy agent simulates a human participant in the chat\n", + "user_proxy = autogen.UserProxyAgent(\n", + " name=\"user_proxy\",\n", + " human_input_mode=\"TERMINATE\", # Stops when a message ends with 'TERMINATE'\n", + " max_consecutive_auto_reply=10, # Limits auto-replies before requiring termination\n", + " is_termination_msg=lambda x: x.get(\"content\", \"\").rstrip().endswith(\"TERMINATE\"),\n", + " code_execution_config={\"last_n_messages\": 3, \"work_dir\": \"coding\"},\n", + ")\n", + "# Create a group chat with all agents and set a maximum number of rounds\n", + "groupchat = autogen.GroupChat(\n", + " agents=[user_proxy, researcher, coder, critic],\n", + " messages=[],\n", + " max_round=4 # Limits the total number of chat rounds\n", + ")\n", + "# The manager coordinates the group chat and LLM configuration\n", + "manager = autogen.GroupChatManager(\n", + " groupchat=groupchat,\n", + " llm_config=llm_config\n", + ")\n", + "# Start the group chat with an initial task and a maximum number of user turns\n", + "user_proxy.initiate_chat(\n", + " manager,\n", + " message=\"Create a Python program to analyze sentiment from Twitter data.\",\n", + " max_turns=2, # Limits the number of user turns\n", + ")\n", + "agentops.end_trace(tracer, end_state=\"Success\")" + ] + } + ], + "metadata": { + "jupytext": { + "cell_metadata_filter": "-all", + "main_language": "python", + "notebook_metadata_filter": "-all" + }, + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.13" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples/ag2/groupchat.py 
b/examples/ag2/groupchat.py new file mode 100644 index 000000000..06835805b --- /dev/null +++ b/examples/ag2/groupchat.py @@ -0,0 +1,92 @@ +# AG2 Multi-Agent Group Chat Example with AgentOps Integration +# +# This script demonstrates how to orchestrate a group of specialized AI agents collaborating on a task using AG2 and AgentOps. +# +# Overview +# This example shows how to: +# 1. Initialize multiple AG2 agents with different roles (researcher, coder, critic, and user proxy) +# 2. Set up a group chat where agents interact and collaborate to solve a problem +# 3. Simulate a human participant using a user proxy agent +# 4. Limit the number of chat rounds and user turns for controlled execution +# 5. Track and monitor all agent interactions and LLM calls using AgentOps for full traceability +# +# By using group chat and specialized agents, you can model real-world collaborative workflows, automate complex problem solving, and analyze agent behavior in detail. + +# %pip install agentops +# %pip install ag2 +# %pip install nest-asyncio + +import os +import agentops +import autogen + +# Initialize AgentOps for tracing and monitoring +agentops.init(auto_start_session=False, trace_name="AG2 Group Chat") +tracer = agentops.start_trace(trace_name="AG2 Group Chat", tags=["ag2-group-chat", "agentops-example"]) + +# Configure your AG2 agents with model and API key +config_list = [ + { + "model": "gpt-4", + "api_key": os.getenv("OPENAI_API_KEY"), + } +] +llm_config = { + "config_list": config_list, + "timeout": 60, +} + +# Create a team of agents with specialized roles +researcher = autogen.AssistantAgent( + name="researcher", + llm_config=llm_config, + system_message="You are a researcher who specializes in finding accurate information." +) +coder = autogen.AssistantAgent( + name="coder", + llm_config=llm_config, + system_message="You are an expert programmer who writes clean, efficient code." 
+) +critic = autogen.AssistantAgent( + name="critic", + llm_config=llm_config, + system_message="You review solutions and provide constructive feedback." +) + +# The user proxy agent simulates a human participant in the chat +user_proxy = autogen.UserProxyAgent( + name="user_proxy", + human_input_mode="TERMINATE", # Stops when a message ends with 'TERMINATE' + max_consecutive_auto_reply=10, # Limits auto-replies before requiring termination + is_termination_msg=lambda x: x.get("content", "").rstrip().endswith("TERMINATE"), + code_execution_config={"last_n_messages": 3, "work_dir": "coding"}, +) +# Create a group chat with all agents and set a maximum number of rounds +groupchat = autogen.GroupChat( + agents=[user_proxy, researcher, coder, critic], + messages=[], + max_round=4 # Limits the total number of chat rounds +) +# The manager coordinates the group chat and LLM configuration +manager = autogen.GroupChatManager( + groupchat=groupchat, + llm_config=llm_config +) +# Start the group chat with an initial task and a maximum number of user turns +user_proxy.initiate_chat( + manager, + message="Create a Python program to analyze sentiment from Twitter data.", + max_turns=2, # Limits the number of user turns +) +agentops.end_trace(tracer, end_state="Success") + +# Let's check programmatically that spans were recorded in AgentOps +print("\n" + "=" * 50) +print("Now let's verify that our LLM calls were tracked properly...") +try: + agentops.validate_trace_spans(trace_context=tracer) + print("\nβœ… Success! All LLM spans were properly recorded in AgentOps.") +except agentops.ValidationError as e: + print(f"\n❌ Error validating spans: {e}") + raise +