diff --git a/sdk/python/agentfield/agent.py b/sdk/python/agentfield/agent.py index d0fc2031..49746461 100644 --- a/sdk/python/agentfield/agent.py +++ b/sdk/python/agentfield/agent.py @@ -63,6 +63,11 @@ from pydantic import BaseModel, ValidationError from dataclasses import dataclass, field import weakref +from .agent_schema import _AgentSchemaMixin +from .agent_discovery import _AgentDiscoveryMixin +from .agent_serverless import _AgentServerlessMixin +from .agent_pause import _PauseManager +from .agent_vc import _AgentVCMixin if TYPE_CHECKING: from agentfield.harness._result import HarnessResult @@ -347,76 +352,7 @@ def _resolve_callback_url(callback_url: Optional[str], port: int) -> str: return f"http://localhost:{port}" -class _PauseManager: - """Manages pending execution pause futures resolved via webhook callback. - - Each call to ``Agent.pause()`` registers an ``asyncio.Future`` keyed by - ``approval_request_id``. When the webhook route receives a resolution - callback from the control plane it resolves the matching future, unblocking - the caller. - """ - - def __init__(self) -> None: - self._pending: Dict[str, asyncio.Future] = {} - # Also track execution_id → approval_request_id for fallback resolution - self._exec_to_request: Dict[str, str] = {} - self._lock = asyncio.Lock() - - async def register( - self, approval_request_id: str, execution_id: str = "" - ) -> asyncio.Future: - """Register a new pending pause and return the Future to await.""" - async with self._lock: - if approval_request_id in self._pending: - return self._pending[approval_request_id] - loop = asyncio.get_running_loop() - future = loop.create_future() - self._pending[approval_request_id] = future - if execution_id: - self._exec_to_request[execution_id] = approval_request_id - return future - - async def resolve(self, approval_request_id: str, result: "ApprovalResult") -> bool: - """Resolve a pending pause by approval_request_id. 
Returns True if a waiter was found.""" - async with self._lock: - future = self._pending.pop(approval_request_id, None) - # Clean up execution mapping - exec_id = None - for eid, rid in self._exec_to_request.items(): - if rid == approval_request_id: - exec_id = eid - break - if exec_id: - self._exec_to_request.pop(exec_id, None) - if future and not future.done(): - future.set_result(result) - return True - return False - - async def resolve_by_execution_id( - self, execution_id: str, result: "ApprovalResult" - ) -> bool: - """Fallback: resolve by execution_id when approval_request_id is not in the callback.""" - async with self._lock: - request_id = self._exec_to_request.pop(execution_id, None) - if request_id: - future = self._pending.pop(request_id, None) - if future and not future.done(): - future.set_result(result) - return True - return False - - async def cancel_all(self) -> None: - """Cancel all pending futures (for shutdown).""" - async with self._lock: - for future in self._pending.values(): - if not future.done(): - future.cancel() - self._pending.clear() - self._exec_to_request.clear() - - -class Agent(FastAPI): +class Agent(FastAPI,_AgentSchemaMixin,_AgentDiscoveryMixin,_AgentServerlessMixin,_AgentVCMixin): """ AgentField Agent - FastAPI subclass for creating AI agent nodes. 
@@ -792,380 +728,6 @@ def _entry_to_metadata( } return metadata - def _types_to_json_schema(self, input_types: Dict[str, tuple]) -> Dict: - """Convert Python types dict to JSON schema (on-demand generation).""" - properties = {} - required = [] - - for name, (typ, default) in input_types.items(): - properties[name] = self._type_to_json_schema(typ) - if default is ...: # Required field (no default) - required.append(name) - - schema = { - "type": "object", - "properties": properties, - } - if required: - schema["required"] = required - return schema - - def _type_to_json_schema(self, typ: type) -> Dict: - """Convert a Python type to JSON schema.""" - # Handle None/NoneType - if typ is None or typ is type(None): - return {"type": "null"} - - # Handle basic types - type_map = { - str: {"type": "string"}, - int: {"type": "integer"}, - float: {"type": "number"}, - bool: {"type": "boolean"}, - list: {"type": "array"}, - dict: {"type": "object"}, - bytes: {"type": "string", "format": "binary"}, - } - - if typ in type_map: - return type_map[typ] - - # Handle Pydantic models - if hasattr(typ, "model_json_schema"): - return typ.model_json_schema() - - # Handle typing constructs (List, Dict, Optional, etc.) - origin = getattr(typ, "__origin__", None) - if origin is list: - args = getattr(typ, "__args__", (Any,)) - return { - "type": "array", - "items": self._type_to_json_schema(args[0]) if args else {}, - } - if origin is dict: - return {"type": "object", "additionalProperties": True} - if origin is Union: - args = getattr(typ, "__args__", ()) - # Handle Optional (Union with None) - non_none = [a for a in args if a is not type(None)] - if len(non_none) == 1: - return self._type_to_json_schema(non_none[0]) - return {"anyOf": [self._type_to_json_schema(a) for a in args]} - - # Default fallback - return {"type": "object"} - - def _validate_handler_input( - self, data: dict, input_types: Dict[str, tuple] - ) -> dict: - """ - Validate input data against expected types at runtime. 
- - Replaces Pydantic model validation with lightweight runtime validation. - Saves ~1.5-2 KB per handler by not creating Pydantic classes. - - Args: - data: Raw input dict from request body - input_types: Dict mapping field names to (type, default) tuples - - Returns: - Validated dict with type coercion applied - - Raises: - ValueError: If required field is missing or type conversion fails - """ - result = {} - - for name, (expected_type, default) in input_types.items(): - # Check if field is present - if name not in data: - if default is ...: # Required field (no default) - raise ValueError(f"Missing required field: {name}") - result[name] = default - continue - - value = data[name] - - # Handle None values - if value is None: - # Check if Optional type - origin = getattr(expected_type, "__origin__", None) - if origin is Union: - args = getattr(expected_type, "__args__", ()) - if type(None) in args: - result[name] = None - continue - # Not Optional, use default if available - if default is not ...: - result[name] = default - continue - raise ValueError(f"Field '{name}' cannot be None") - - # Type coercion for basic types - try: - # Get the actual type (unwrap Optional) - actual_type = expected_type - origin = getattr(expected_type, "__origin__", None) - if origin is Union: - args = getattr(expected_type, "__args__", ()) - non_none = [a for a in args if a is not type(None)] - if len(non_none) == 1: - actual_type = non_none[0] - - # Basic type coercion - if actual_type is int: - result[name] = int(value) - elif actual_type is float: - result[name] = float(value) - elif actual_type is str: - result[name] = str(value) - elif actual_type is bool: - if isinstance(value, bool): - result[name] = value - elif isinstance(value, str): - result[name] = value.lower() in ("true", "1", "yes") - else: - result[name] = bool(value) - elif ( - actual_type is dict - or getattr(actual_type, "__origin__", None) is dict - ): - if not isinstance(value, dict): - raise ValueError(f"Field 
'{name}' must be a dict") - result[name] = dict(value) - elif ( - actual_type is list - or getattr(actual_type, "__origin__", None) is list - ): - if not isinstance(value, list): - raise ValueError(f"Field '{name}' must be a list") - result[name] = list(value) - elif hasattr(actual_type, "model_validate"): - # Pydantic model - use its validation - result[name] = actual_type.model_validate(value) - else: - # Pass through for complex/unknown types - result[name] = value - except (ValueError, TypeError) as e: - raise ValueError(f"Invalid value for field '{name}': {e}") - - return result - - def handle_serverless( - self, event: dict, adapter: Optional[Callable] = None - ) -> dict: - """ - Universal serverless handler for executing reasoners and skills. - - This method enables agents to run in serverless environments (AWS Lambda, - Google Cloud Functions, Cloud Run, Kubernetes Jobs, etc.) by providing - a simple entry point that parses the event, executes the target function, - and returns the result. 
- - Special Endpoints: - - /discover: Returns agent metadata for AgentField server registration - - /execute: Executes reasoners and skills - - Args: - event (dict): Serverless event containing: - - path: Request path (/discover or /execute) - - action: Alternative to path (discover or execute) - - reasoner: Name of the reasoner to execute (for execution) - - input: Input parameters for the function (for execution) - - Returns: - dict: Execution result with status and output, or discovery metadata - - Example: - ```python - # AWS Lambda handler with API Gateway - from agentfield import Agent - - app = Agent("my_agent", auto_register=False) - - @app.reasoner() - async def analyze(text: str) -> dict: - return {"result": text.upper()} - - def lambda_handler(event, context): - # Handle both discovery and execution - return app.handle_serverless(event) - ``` - """ - import asyncio - - if adapter: - try: - event = adapter(event) or event - except Exception as exc: # pragma: no cover - adapter failures - return { - "statusCode": 400, - "body": {"error": f"serverless adapter failed: {exc}"}, - } - - # Check if this is a discovery request - path = event.get("path") or event.get("rawPath") or "" - action = event.get("action", "") - - if path == "/discover" or path.endswith("/discover") or action == "discover": - # Return agent metadata for AgentField server registration - return self._handle_discovery() - - # Auto-register with AgentField if needed (for execution requests) - if self.auto_register and not self.agentfield_connected: - try: - # Attempt registration (non-blocking) - self.agentfield_handler._register_agent() - self.agentfield_connected = True - except Exception as e: - if self.dev_mode: - log_warn(f"Auto-registration failed: {e}") - - # Serverless invocations arrive via the control plane; mark as connected so - # cross-agent calls can route through the gateway without a lease loop. 
- self.agentfield_connected = True - # Serverless handlers should avoid async execute polling; force sync path. - if getattr(self.async_config, "enable_async_execution", True): - self.async_config.enable_async_execution = False - - # Parse event format for execution - reasoner_name = ( - event.get("reasoner") or event.get("target") or event.get("skill") - ) - if not reasoner_name and path: - # Support paths like /execute/ or /reasoners/ - cleaned_path = path.split("?", 1)[0].strip("/") - parts = cleaned_path.split("/") - if parts and parts[0] not in ("", "discover"): - if len(parts) >= 2 and parts[0] in ("execute", "reasoners", "skills"): - reasoner_name = parts[1] - elif parts[0] in ("execute", "reasoners", "skills"): - reasoner_name = None - elif parts: - reasoner_name = parts[-1] - - input_data = event.get("input") or event.get("input_data", {}) - execution_context_data = ( - event.get("execution_context") or event.get("executionContext") or {} - ) - - if not reasoner_name: - return { - "statusCode": 400, - "body": {"error": "Missing 'reasoner' or 'target' in event"}, - } - - # Create execution context - exec_id = execution_context_data.get( - "execution_id", f"exec_{int(time.time() * 1000)}" - ) - run_id = execution_context_data.get("run_id") or execution_context_data.get( - "workflow_id" - ) - if not run_id: - run_id = f"wf_{int(time.time() * 1000)}" - workflow_id = execution_context_data.get("workflow_id", run_id) - - execution_context = ExecutionContext( - run_id=run_id, - execution_id=exec_id, - agent_instance=self, - agent_node_id=self.node_id, - reasoner_name=reasoner_name, - parent_execution_id=execution_context_data.get("parent_execution_id"), - session_id=execution_context_data.get("session_id"), - actor_id=execution_context_data.get("actor_id"), - caller_did=execution_context_data.get("caller_did"), - target_did=execution_context_data.get("target_did"), - agent_node_did=execution_context_data.get( - "agent_node_did", 
execution_context_data.get("agent_did") - ), - workflow_id=workflow_id, - parent_workflow_id=execution_context_data.get("parent_workflow_id"), - root_workflow_id=execution_context_data.get("root_workflow_id"), - ) - - # Set execution context - self._current_execution_context = execution_context - - try: - # Find and execute the target function - if hasattr(self, reasoner_name): - func = getattr(self, reasoner_name) - - # Execute function (sync or async) - if asyncio.iscoroutinefunction(func): - result = asyncio.run(func(**input_data)) - else: - result = func(**input_data) - - return {"statusCode": 200, "body": result} - else: - return { - "statusCode": 404, - "body": {"error": f"Function '{reasoner_name}' not found"}, - } - - except Exception as e: - return {"statusCode": 500, "body": {"error": str(e)}} - finally: - # Clean up execution context - self._current_execution_context = None - - def _handle_discovery(self) -> dict: - """ - Handle discovery requests for serverless agent registration. - - Returns agent metadata including reasoners, skills, and configuration - for automatic registration with the AgentField server. 
- - Returns: - dict: Agent metadata for registration - """ - return { - "node_id": self.node_id, - "version": self.version, - "deployment_type": "serverless", - "reasoners": [ - { - "id": r["id"], - "input_schema": r.get("input_schema", {}), - "output_schema": r.get("output_schema", {}), - "memory_config": r.get("memory_config", {}), - "tags": r.get("tags", []), - } - for r in self.reasoners - ], - "skills": [ - { - "id": s["id"], - "input_schema": s.get("input_schema", {}), - "tags": s.get("tags", []), - } - for s in self.skills - ], - } - - def _initialize_did_system(self): - """Initialize DID and VC components.""" - try: - # Initialize DID Manager - self.did_manager = DIDManager( - self.agentfield_server, self.node_id, self.api_key - ) - - # Initialize VC Generator - self.vc_generator = VCGenerator(self.agentfield_server, self.api_key) - - if self.dev_mode: - log_debug("DID system initialized") - - except Exception as e: - if self.dev_mode: - log_error(f"Failed to initialize DID system: {e}") - self.did_manager = None - self.vc_generator = None - def _register_memory_event_listeners(self): """Scans for methods decorated with @on_change and registers them as listeners.""" if not self.memory_event_client: @@ -1344,189 +906,6 @@ async def handle_ticket(ticket_id: str): return self._current_execution_context return None - def _populate_execution_context_with_did( - self, execution_context, did_execution_context - ): - """ - Populate the execution context with DID information. 
- - Args: - execution_context: The main ExecutionContext - did_execution_context: The DIDExecutionContext with DID info - """ - if did_execution_context: - execution_context.session_id = did_execution_context.session_id - execution_context.caller_did = did_execution_context.caller_did - execution_context.target_did = did_execution_context.target_did - execution_context.agent_node_did = did_execution_context.agent_node_did - - def _agent_vc_default(self) -> bool: - """Resolve the agent-level VC default, falling back to enabled.""" - return True if self._agent_vc_enabled is None else self._agent_vc_enabled - - def _set_reasoner_vc_override( - self, reasoner_id: str, value: Optional[bool] - ) -> None: - if value is None: - self._reasoner_vc_overrides.pop(reasoner_id, None) - else: - self._reasoner_vc_overrides[reasoner_id] = value - - def _set_skill_vc_override(self, skill_id: str, value: Optional[bool]) -> None: - if value is None: - self._skill_vc_overrides.pop(skill_id, None) - else: - self._skill_vc_overrides[skill_id] = value - - def _effective_component_vc_setting( - self, component_id: str, overrides: Dict[str, bool] - ) -> bool: - if component_id in overrides: - return overrides[component_id] - return self._agent_vc_default() - - def _should_generate_vc( - self, component_id: str, overrides: Dict[str, bool] - ) -> bool: - if ( - not self.did_enabled - or not self.vc_generator - or not self.vc_generator.is_enabled() - ): - return False - return self._effective_component_vc_setting(component_id, overrides) - - def _build_agent_metadata(self) -> Optional[Dict[str, Any]]: - """Build agent metadata (description, tags, author) for registration payload.""" - metadata: Dict[str, Any] = {} - if self.description: - metadata["description"] = self.description - if self.agent_tags: - metadata["tags"] = self.agent_tags - if self.author: - metadata["author"] = self.author - return metadata if metadata else None - - def _build_vc_metadata(self) -> Dict[str, Any]: - """Produce 
a serializable VC policy snapshot for control-plane visibility.""" - effective_reasoners = { - reasoner["id"]: self._effective_component_vc_setting( - reasoner["id"], self._reasoner_vc_overrides - ) - for reasoner in self.reasoners - if "id" in reasoner - } - effective_skills = { - skill["id"]: self._effective_component_vc_setting( - skill["id"], self._skill_vc_overrides - ) - for skill in self.skills - if "id" in skill - } - - return { - "agent_default": self._agent_vc_default(), - "reasoner_overrides": dict(self._reasoner_vc_overrides), - "skill_overrides": dict(self._skill_vc_overrides), - "effective_reasoners": effective_reasoners, - "effective_skills": effective_skills, - } - - async def _generate_vc_async( - self, - vc_generator, - did_execution_context, - function_name, - input_data, - output_data, - status="success", - error_message=None, - duration_ms=0, - ): - """ - Generate VC asynchronously without blocking execution. - - Args: - vc_generator: VCGenerator instance - did_execution_context: DID execution context - function_name: Name of the executed function - input_data: Input data for the execution - output_data: Output data from the execution - status: Execution status - error_message: Error message if any - duration_ms: Execution duration in milliseconds - """ - try: - if vc_generator and vc_generator.is_enabled(): - vc = vc_generator.generate_execution_vc( - execution_context=did_execution_context, - input_data=input_data, - output_data=output_data, - status=status, - error_message=error_message, - duration_ms=duration_ms, - ) - if vc: - log_info(f"Generated VC {vc.vc_id} for {function_name}") - except Exception as e: - log_warn(f"Failed to generate VC for {function_name}: {e}") - - def _build_callback_discovery_payload(self) -> Optional[Dict[str, Any]]: - """Prepare discovery metadata for agent registration.""" - - if not self.callback_candidates: - return None - - payload: Dict[str, Any] = { - "mode": "python-sdk:auto", - "preferred": 
self.base_url, - "callback_candidates": self.callback_candidates, - "container": _is_running_in_container(), - "submitted_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z", - } - - return payload - - def _apply_discovery_response(self, payload: Optional[Dict[str, Any]]) -> None: - """Update agent networking state from AgentField discovery response.""" - - if not payload: - return - - discovery_section = ( - payload.get("callback_discovery") if isinstance(payload, dict) else None - ) - - resolved = None - if isinstance(payload, dict): - resolved = payload.get("resolved_base_url") - if not resolved and isinstance(discovery_section, dict): - resolved = ( - discovery_section.get("resolved") - or discovery_section.get("selected") - or discovery_section.get("preferred") - ) - - if resolved and resolved != self.base_url: - log_debug(f"Applying resolved callback URL from AgentField: {resolved}") - self.base_url = resolved - - if isinstance(discovery_section, dict): - candidates = discovery_section.get("candidates") - if isinstance(candidates, list): - normalized = [] - for candidate in candidates: - if isinstance(candidate, str): - normalized.append(candidate) - # Ensure resolved URL is first when present - if resolved and resolved in normalized: - normalized.remove(resolved) - normalized.insert(0, resolved) - elif resolved: - normalized.insert(0, resolved) - - if normalized: - self.callback_candidates = normalized def _register_agent_with_did(self) -> bool: """ @@ -4531,3 +3910,6 @@ def get_status() -> dict: auto_port=auto_port, **kwargs, ) + + def _is_running_in_container(self) -> bool: + return _is_running_in_container() \ No newline at end of file diff --git a/sdk/python/agentfield/agent_discovery.py b/sdk/python/agentfield/agent_discovery.py new file mode 100644 index 00000000..25891085 --- /dev/null +++ b/sdk/python/agentfield/agent_discovery.py @@ -0,0 +1,95 @@ +from typing import Any, Dict, Optional +from datetime import datetime, timezone 
+from agentfield.logger import log_debug + +class _AgentDiscoveryMixin: + def _handle_discovery(self) -> dict: + """ + Handle discovery requests for serverless agent registration. + + Returns agent metadata including reasoners, skills, and configuration + for automatic registration with the AgentField server. + + Returns: + dict: Agent metadata for registration + """ + return { + "node_id": self.node_id, + "version": self.version, + "deployment_type": "serverless", + "reasoners": [ + { + "id": r["id"], + "input_schema": r.get("input_schema", {}), + "output_schema": r.get("output_schema", {}), + "memory_config": r.get("memory_config", {}), + "tags": r.get("tags", []), + } + for r in self.reasoners + ], + "skills": [ + { + "id": s["id"], + "input_schema": s.get("input_schema", {}), + "tags": s.get("tags", []), + } + for s in self.skills + ], + } + + def _build_callback_discovery_payload(self) -> Optional[Dict[str, Any]]: + """Prepare discovery metadata for agent registration.""" + + if not self.callback_candidates: + return None + + payload: Dict[str, Any] = { + "mode": "python-sdk:auto", + "preferred": self.base_url, + "callback_candidates": self.callback_candidates, + "container": self._is_running_in_container(), + "submitted_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z", + } + + return payload + + def _apply_discovery_response(self, payload: Optional[Dict[str, Any]]) -> None: + """Update agent networking state from AgentField discovery response.""" + + if not payload: + return + + discovery_section = ( + payload.get("callback_discovery") if isinstance(payload, dict) else None + ) + + resolved = None + if isinstance(payload, dict): + resolved = payload.get("resolved_base_url") + if not resolved and isinstance(discovery_section, dict): + resolved = ( + discovery_section.get("resolved") + or discovery_section.get("selected") + or discovery_section.get("preferred") + ) + + if resolved and resolved != self.base_url: + log_debug(f"Applying 
resolved callback URL from AgentField: {resolved}") + self.base_url = resolved + + if isinstance(discovery_section, dict): + candidates = discovery_section.get("candidates") + if isinstance(candidates, list): + normalized = [] + for candidate in candidates: + if isinstance(candidate, str): + normalized.append(candidate) + # Ensure resolved URL is first when present + if resolved and resolved in normalized: + normalized.remove(resolved) + normalized.insert(0, resolved) + elif resolved: + normalized.insert(0, resolved) + + if normalized: + self.callback_candidates = normalized \ No newline at end of file diff --git a/sdk/python/agentfield/agent_pause.py b/sdk/python/agentfield/agent_pause.py new file mode 100644 index 00000000..2d9536b3 --- /dev/null +++ b/sdk/python/agentfield/agent_pause.py @@ -0,0 +1,74 @@ +import asyncio +from typing import Dict +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from agentfield.client import ApprovalResult + +class _PauseManager: + """Manages pending execution pause futures resolved via webhook callback. + + Each call to ``Agent.pause()`` registers an ``asyncio.Future`` keyed by + ``approval_request_id``. When the webhook route receives a resolution + callback from the control plane it resolves the matching future, unblocking + the caller. 
+ """ + + def __init__(self) -> None: + self._pending: Dict[str, asyncio.Future] = {} + # Also track execution_id → approval_request_id for fallback resolution + self._exec_to_request: Dict[str, str] = {} + self._lock = asyncio.Lock() + + async def register( + self, approval_request_id: str, execution_id: str = "" + ) -> asyncio.Future: + """Register a new pending pause and return the Future to await.""" + async with self._lock: + if approval_request_id in self._pending: + return self._pending[approval_request_id] + loop = asyncio.get_running_loop() + future = loop.create_future() + self._pending[approval_request_id] = future + if execution_id: + self._exec_to_request[execution_id] = approval_request_id + return future + + async def resolve(self, approval_request_id: str, result: "ApprovalResult") -> bool: + """Resolve a pending pause by approval_request_id. Returns True if a waiter was found.""" + async with self._lock: + future = self._pending.pop(approval_request_id, None) + # Clean up execution mapping + exec_id = None + for eid, rid in self._exec_to_request.items(): + if rid == approval_request_id: + exec_id = eid + break + if exec_id: + self._exec_to_request.pop(exec_id, None) + if future and not future.done(): + future.set_result(result) + return True + return False + + async def resolve_by_execution_id( + self, execution_id: str, result: "ApprovalResult" + ) -> bool: + """Fallback: resolve by execution_id when approval_request_id is not in the callback.""" + async with self._lock: + request_id = self._exec_to_request.pop(execution_id, None) + if request_id: + future = self._pending.pop(request_id, None) + if future and not future.done(): + future.set_result(result) + return True + return False + + async def cancel_all(self) -> None: + """Cancel all pending futures (for shutdown).""" + async with self._lock: + for future in self._pending.values(): + if not future.done(): + future.cancel() + self._pending.clear() + self._exec_to_request.clear() \ No newline 
at end of file diff --git a/sdk/python/agentfield/agent_schema.py b/sdk/python/agentfield/agent_schema.py new file mode 100644 index 00000000..426cfb50 --- /dev/null +++ b/sdk/python/agentfield/agent_schema.py @@ -0,0 +1,161 @@ +from typing import Any, Dict, Union + +class _AgentSchemaMixin: + def _types_to_json_schema(self, input_types: Dict[str, tuple]) -> Dict: + """Convert Python types dict to JSON schema (on-demand generation).""" + properties = {} + required = [] + + for name, (typ, default) in input_types.items(): + properties[name] = self._type_to_json_schema(typ) + if default is ...: # Required field (no default) + required.append(name) + + schema = { + "type": "object", + "properties": properties, + } + if required: + schema["required"] = required + return schema + + def _type_to_json_schema(self, typ: type) -> Dict: + """Convert a Python type to JSON schema.""" + # Handle None/NoneType + if typ is None or typ is type(None): + return {"type": "null"} + + # Handle basic types + type_map = { + str: {"type": "string"}, + int: {"type": "integer"}, + float: {"type": "number"}, + bool: {"type": "boolean"}, + list: {"type": "array"}, + dict: {"type": "object"}, + bytes: {"type": "string", "format": "binary"}, + } + + if typ in type_map: + return type_map[typ] + + # Handle Pydantic models + if hasattr(typ, "model_json_schema"): + return typ.model_json_schema() + + # Handle typing constructs (List, Dict, Optional, etc.) 
+ origin = getattr(typ, "__origin__", None) + if origin is list: + args = getattr(typ, "__args__", (Any,)) + return { + "type": "array", + "items": self._type_to_json_schema(args[0]) if args else {}, + } + if origin is dict: + return {"type": "object", "additionalProperties": True} + if origin is Union: + args = getattr(typ, "__args__", ()) + # Handle Optional (Union with None) + non_none = [a for a in args if a is not type(None)] + if len(non_none) == 1: + return self._type_to_json_schema(non_none[0]) + return {"anyOf": [self._type_to_json_schema(a) for a in args]} + + # Default fallback + return {"type": "object"} + + def _validate_handler_input( + self, data: dict, input_types: Dict[str, tuple] + ) -> dict: + """ + Validate input data against expected types at runtime. + + Replaces Pydantic model validation with lightweight runtime validation. + Saves ~1.5-2 KB per handler by not creating Pydantic classes. + + Args: + data: Raw input dict from request body + input_types: Dict mapping field names to (type, default) tuples + + Returns: + Validated dict with type coercion applied + + Raises: + ValueError: If required field is missing or type conversion fails + """ + result = {} + + for name, (expected_type, default) in input_types.items(): + # Check if field is present + if name not in data: + if default is ...: # Required field (no default) + raise ValueError(f"Missing required field: {name}") + result[name] = default + continue + + value = data[name] + + # Handle None values + if value is None: + # Check if Optional type + origin = getattr(expected_type, "__origin__", None) + if origin is Union: + args = getattr(expected_type, "__args__", ()) + if type(None) in args: + result[name] = None + continue + # Not Optional, use default if available + if default is not ...: + result[name] = default + continue + raise ValueError(f"Field '{name}' cannot be None") + + # Type coercion for basic types + try: + # Get the actual type (unwrap Optional) + actual_type = 
# --- agentfield/agent_serverless.py (new file added by this diff) ---
from typing import Optional, Callable
import time
from agentfield.execution_context import ExecutionContext
from agentfield.logger import log_warn


class _AgentServerlessMixin:
    """Serverless entry-point support, mixed into ``Agent``.

    Provides :meth:`handle_serverless`, a single synchronous handler suitable
    for AWS Lambda, Cloud Functions, Cloud Run jobs, etc.
    """

    def handle_serverless(
        self, event: dict, adapter: Optional[Callable] = None
    ) -> dict:
        """Universal serverless handler for executing reasoners and skills.

        Parses the incoming ``event``, dispatches either a discovery request
        or a reasoner/skill execution, and returns an HTTP-style result dict.

        Special endpoints:
            - ``/discover``: returns agent metadata for server registration
            - ``/execute`` (or ``reasoner``/``target``/``skill`` keys):
              executes the named function

        Args:
            event: Serverless event. Recognized keys: ``path``/``rawPath``,
                ``action`` (``discover``/``execute``), ``reasoner``/``target``/
                ``skill``, ``input``/``input_data``, and
                ``execution_context``/``executionContext``.
            adapter: Optional callable applied to ``event`` first (e.g. to
                unwrap an API Gateway envelope). A falsy return keeps the
                original event; an exception yields a 400 response.

        Returns:
            dict: ``{"statusCode": ..., "body": ...}`` — 200 with the function
            result, 400 on bad input, 404 for an unknown function, 500 on
            execution error — or the discovery metadata dict.
        """
        import asyncio

        # Normalize the event through the optional adapter first.
        if adapter:
            try:
                event = adapter(event) or event
            except Exception as exc:  # pragma: no cover - adapter failures
                return {
                    "statusCode": 400,
                    "body": {"error": f"serverless adapter failed: {exc}"},
                }

        path = event.get("path") or event.get("rawPath") or ""
        action = event.get("action", "")

        # Discovery request: return agent metadata, no execution.
        if path == "/discover" or path.endswith("/discover") or action == "discover":
            return self._handle_discovery()

        # Best-effort auto-registration for execution requests (non-fatal).
        if self.auto_register and not self.agentfield_connected:
            try:
                self.agentfield_handler._register_agent()
                self.agentfield_connected = True
            except Exception as e:
                if self.dev_mode:
                    log_warn(f"Auto-registration failed: {e}")

        # Serverless invocations arrive via the control plane; mark as
        # connected so cross-agent calls can route through the gateway
        # without a lease loop.
        self.agentfield_connected = True
        # Serverless handlers must avoid async execute polling; force the
        # synchronous execution path.
        if getattr(self.async_config, "enable_async_execution", True):
            self.async_config.enable_async_execution = False

        # Resolve the target function name: explicit keys win over the path.
        reasoner_name = (
            event.get("reasoner") or event.get("target") or event.get("skill")
        )
        if not reasoner_name and path:
            # Support /execute/<name>, /reasoners/<name>, /skills/<name>,
            # or a bare /<name> route. (Fix: the original had an unreachable
            # `reasoner_name = None` branch and a redundant truthiness guard.)
            cleaned_path = path.split("?", 1)[0].strip("/")
            parts = cleaned_path.split("/")
            if parts and parts[0] not in ("", "discover"):
                if parts[0] in ("execute", "reasoners", "skills"):
                    # Prefixed route: the name is the second segment, if any.
                    reasoner_name = parts[1] if len(parts) >= 2 else None
                else:
                    # Bare route: last segment is the target name.
                    reasoner_name = parts[-1]

        input_data = event.get("input") or event.get("input_data", {})
        execution_context_data = (
            event.get("execution_context") or event.get("executionContext") or {}
        )

        if not reasoner_name:
            return {
                "statusCode": 400,
                "body": {"error": "Missing 'reasoner' or 'target' in event"},
            }

        # Build an ExecutionContext, minting ids when the caller supplied none.
        exec_id = execution_context_data.get(
            "execution_id", f"exec_{int(time.time() * 1000)}"
        )
        run_id = execution_context_data.get("run_id") or execution_context_data.get(
            "workflow_id"
        )
        if not run_id:
            run_id = f"wf_{int(time.time() * 1000)}"
        workflow_id = execution_context_data.get("workflow_id", run_id)

        execution_context = ExecutionContext(
            run_id=run_id,
            execution_id=exec_id,
            agent_instance=self,
            agent_node_id=self.node_id,
            reasoner_name=reasoner_name,
            parent_execution_id=execution_context_data.get("parent_execution_id"),
            session_id=execution_context_data.get("session_id"),
            actor_id=execution_context_data.get("actor_id"),
            caller_did=execution_context_data.get("caller_did"),
            target_did=execution_context_data.get("target_did"),
            agent_node_did=execution_context_data.get(
                "agent_node_did", execution_context_data.get("agent_did")
            ),
            workflow_id=workflow_id,
            parent_workflow_id=execution_context_data.get("parent_workflow_id"),
            root_workflow_id=execution_context_data.get("root_workflow_id"),
        )

        # Make the context visible to the executing function.
        self._current_execution_context = execution_context

        try:
            if hasattr(self, reasoner_name):
                func = getattr(self, reasoner_name)
                if asyncio.iscoroutinefunction(func):
                    # NOTE(review): asyncio.run raises RuntimeError if an
                    # event loop is already running in this thread — confirm
                    # all serverless runtimes invoke this from sync code.
                    result = asyncio.run(func(**input_data))
                else:
                    result = func(**input_data)
                return {"statusCode": 200, "body": result}
            return {
                "statusCode": 404,
                "body": {"error": f"Function '{reasoner_name}' not found"},
            }
        except Exception as e:
            return {"statusCode": 500, "body": {"error": str(e)}}
        finally:
            # Always clear the per-invocation context.
            self._current_execution_context = None


# --- agentfield/agent_vc.py (new file added by this diff) ---
from typing import Any, Dict, Optional

from agentfield.logger import log_debug, log_info, log_warn, log_error
from agentfield.did_manager import DIDManager
from agentfield.vc_generator import VCGenerator


class _AgentVCMixin:
    """DID/VC (verifiable-credential) helpers mixed into ``Agent``."""

    def _initialize_did_system(self):
        """Initialize DID and VC components; on any failure disable both."""
        try:
            self.did_manager = DIDManager(
                self.agentfield_server, self.node_id, self.api_key
            )
            self.vc_generator = VCGenerator(self.agentfield_server, self.api_key)
            if self.dev_mode:
                log_debug("DID system initialized")
        except Exception as e:
            if self.dev_mode:
                log_error(f"Failed to initialize DID system: {e}")
            # Leave both unset so _should_generate_vc short-circuits.
            self.did_manager = None
            self.vc_generator = None

    def _populate_execution_context_with_did(
        self, execution_context, did_execution_context
    ):
        """Copy DID fields from the DID context onto the main ExecutionContext.

        Args:
            execution_context: The main ExecutionContext to populate.
            did_execution_context: The DIDExecutionContext carrying DID info
                (no-op when None).
        """
        if did_execution_context:
            execution_context.session_id = did_execution_context.session_id
            execution_context.caller_did = did_execution_context.caller_did
            execution_context.target_did = did_execution_context.target_did
            execution_context.agent_node_did = did_execution_context.agent_node_did

    def _agent_vc_default(self) -> bool:
        """Resolve the agent-level VC default, falling back to enabled."""
        return True if self._agent_vc_enabled is None else self._agent_vc_enabled

    def _set_reasoner_vc_override(
        self, reasoner_id: str, value: Optional[bool]
    ) -> None:
        # None clears the per-reasoner override; a bool sets it.
        if value is None:
            self._reasoner_vc_overrides.pop(reasoner_id, None)
        else:
            self._reasoner_vc_overrides[reasoner_id] = value

    def _set_skill_vc_override(self, skill_id: str, value: Optional[bool]) -> None:
        # None clears the per-skill override; a bool sets it.
        if value is None:
            self._skill_vc_overrides.pop(skill_id, None)
        else:
            self._skill_vc_overrides[skill_id] = value

    def _effective_component_vc_setting(
        self, component_id: str, overrides: Dict[str, bool]
    ) -> bool:
        """Per-component override wins; otherwise the agent-level default."""
        if component_id in overrides:
            return overrides[component_id]
        return self._agent_vc_default()

    def _should_generate_vc(
        self, component_id: str, overrides: Dict[str, bool]
    ) -> bool:
        """True only when DID is enabled, a VC generator exists and is
        enabled, and the effective component setting allows VC generation."""
        if (
            not self.did_enabled
            or not self.vc_generator
            or not self.vc_generator.is_enabled()
        ):
            return False
        return self._effective_component_vc_setting(component_id, overrides)

    def _build_agent_metadata(self) -> Optional[Dict[str, Any]]:
        """Build agent metadata (description, tags, author) for registration payload."""
        metadata: Dict[str, Any] = {}
        if self.description:
            metadata["description"] = self.description
        if self.agent_tags:
            metadata["tags"] = self.agent_tags
        if self.author:
            metadata["author"] = self.author
        # Return None (not {}) so callers can omit the field entirely.
        return metadata if metadata else None

    def _build_vc_metadata(self) -> Dict[str, Any]:
        """Produce a serializable VC policy snapshot for control-plane visibility."""
        effective_reasoners = {
            reasoner["id"]: self._effective_component_vc_setting(
                reasoner["id"], self._reasoner_vc_overrides
            )
            for reasoner in self.reasoners
            if "id" in reasoner
        }
        effective_skills = {
            skill["id"]: self._effective_component_vc_setting(
                skill["id"], self._skill_vc_overrides
            )
            for skill in self.skills
            if "id" in skill
        }

        return {
            "agent_default": self._agent_vc_default(),
            "reasoner_overrides": dict(self._reasoner_vc_overrides),
            "skill_overrides": dict(self._skill_vc_overrides),
            "effective_reasoners": effective_reasoners,
            "effective_skills": effective_skills,
        }

    async def _generate_vc_async(
        self,
        vc_generator,
        did_execution_context,
        function_name,
        input_data,
        output_data,
        status="success",
        error_message=None,
        duration_ms=0,
    ):
        """Generate a VC for a completed execution without blocking it.

        Failures are logged and swallowed — VC generation is best-effort and
        must never break the execution path.

        Args:
            vc_generator: VCGenerator instance (may be None/disabled).
            did_execution_context: DID execution context for the call.
            function_name: Name of the executed function (for logging).
            input_data: Input data for the execution.
            output_data: Output data from the execution.
            status: Execution status (default "success").
            error_message: Error message if any.
            duration_ms: Execution duration in milliseconds.
        """
        try:
            if vc_generator and vc_generator.is_enabled():
                vc = vc_generator.generate_execution_vc(
                    execution_context=did_execution_context,
                    input_data=input_data,
                    output_data=output_data,
                    status=status,
                    error_message=error_message,
                    duration_ms=duration_ms,
                )
                if vc:
                    log_info(f"Generated VC {vc.vc_id} for {function_name}")
        except Exception as e:
            log_warn(f"Failed to generate VC for {function_name}: {e}")