diff --git a/agentops/__init__.py b/agentops/__init__.py index a03089318..816e77443 100755 --- a/agentops/__init__.py +++ b/agentops/__init__.py @@ -26,7 +26,7 @@ from typing import List, Optional, Union, Dict, Any from agentops.client import Client from agentops.sdk.core import TraceContext, tracer -from agentops.sdk.decorators import trace, session, agent, task, workflow, operation, tool, guardrail +from agentops.sdk.decorators import trace, session, agent, task, workflow, operation, tool, guardrail, track_endpoint from agentops.enums import TraceState, SUCCESS, ERROR, UNSET from opentelemetry.trace.status import StatusCode @@ -90,6 +90,7 @@ def init( env_data_opt_out: Optional[bool] = None, log_level: Optional[Union[str, int]] = None, fail_safe: Optional[bool] = None, + log_session_replay_url: Optional[bool] = None, exporter_endpoint: Optional[str] = None, **kwargs, ): @@ -117,6 +118,7 @@ def init( env_data_opt_out (bool): Whether to opt out of collecting environment data. log_level (str, int): The log level to use for the client. Defaults to 'CRITICAL'. fail_safe (bool): Whether to suppress errors and continue execution when possible. + log_session_replay_url (bool): Whether to log session replay URLs to the console. Defaults to True. exporter_endpoint (str, optional): Endpoint for the exporter. If none is provided, key will be read from the AGENTOPS_EXPORTER_ENDPOINT environment variable. **kwargs: Additional configuration parameters to be passed to the client. @@ -159,6 +161,7 @@ def init( "env_data_opt_out": env_data_opt_out, "log_level": log_level, "fail_safe": fail_safe, + "log_session_replay_url": log_session_replay_url, "exporter_endpoint": exporter_endpoint, **kwargs, } @@ -472,6 +475,7 @@ def extract_key_from_attr(attr_value: str) -> str: "operation", "tool", "guardrail", + "track_endpoint", # Enums "TraceState", "SUCCESS", diff --git a/agentops/config.py b/agentops/config.py index 51fc6b1fc..0a0a57aab 100644 --- a/agentops/config.py +++ b/agentops/config.py @@ -31,6 +31,7 @@ class ConfigDict(TypedDict): log_level: Optional[Union[str, int]] fail_safe: Optional[bool] prefetch_jwt_token: Optional[bool] + log_session_replay_url: Optional[bool] @dataclass @@ -115,6 +116,11 @@ class Config: metadata={"description": "Whether to prefetch JWT token during initialization"}, ) + log_session_replay_url: bool = field( + default_factory=lambda: get_env_bool("AGENTOPS_LOG_SESSION_REPLAY_URL", True), + metadata={"description": "Whether to log session replay URLs to the console"}, + ) + exporter_endpoint: Optional[str] = field( default_factory=lambda: os.getenv("AGENTOPS_EXPORTER_ENDPOINT", "https://otlp.agentops.ai/v1/traces"), metadata={ @@ -148,6 +154,7 @@ def configure( log_level: Optional[Union[str, int]] = None, fail_safe: Optional[bool] = None, prefetch_jwt_token: Optional[bool] = None, + log_session_replay_url: Optional[bool] = None, exporter: Optional[SpanExporter] = None, processor: Optional[SpanProcessor] = None, exporter_endpoint: Optional[str] = None, @@ -213,6 +220,9 @@ def configure( if prefetch_jwt_token is not None: self.prefetch_jwt_token = prefetch_jwt_token + if log_session_replay_url is not None: + self.log_session_replay_url = log_session_replay_url + if exporter is not None: self.exporter = exporter @@ -243,6 +253,7 @@ def dict(self): "log_level": self.log_level, "fail_safe": self.fail_safe, "prefetch_jwt_token": self.prefetch_jwt_token, + "log_session_replay_url": self.log_session_replay_url, "exporter": self.exporter, "processor": self.processor, "exporter_endpoint": self.exporter_endpoint, diff --git a/agentops/helpers/dashboard.py b/agentops/helpers/dashboard.py index ed43b8074..2f7efe6e3 100644 --- a/agentops/helpers/dashboard.py +++ b/agentops/helpers/dashboard.py @@ -39,6 +39,16 @@ def log_trace_url(span: Union[Span, ReadableSpan], title: Optional[str] = None) Args: span: The span to log the URL for. + title: Optional title for the trace. """ + from agentops import get_client + + try: + client = get_client() + if not client.config.log_session_replay_url: + return + except Exception: + return + session_url = get_trace_url(span) logger.info(colored(f"\x1b[34mSession Replay for {title} trace: {session_url}\x1b[0m", "blue")) diff --git a/agentops/sdk/decorators/__init__.py b/agentops/sdk/decorators/__init__.py index d996cc80a..14594329a 100644 --- a/agentops/sdk/decorators/__init__.py +++ b/agentops/sdk/decorators/__init__.py @@ -16,6 +16,7 @@ tool = create_entity_decorator(SpanKind.TOOL) operation = task guardrail = create_entity_decorator(SpanKind.GUARDRAIL) +track_endpoint = create_entity_decorator(SpanKind.HTTP) # For backward compatibility: @session decorator calls @trace decorator @@ -46,4 +47,5 @@ def session(*args, **kwargs): # noqa: F811 "operation", "tool", "guardrail", + "track_endpoint", ] diff --git a/agentops/sdk/decorators/factory.py b/agentops/sdk/decorators/factory.py index c93813abe..6da554e8e 100644 --- a/agentops/sdk/decorators/factory.py +++ b/agentops/sdk/decorators/factory.py @@ -17,13 +17,15 @@ _process_sync_generator, _record_entity_input, _record_entity_output, + _extract_request_data, + _extract_response_data, ) def create_entity_decorator(entity_kind: str) -> Callable[..., Any]: """ Factory that creates decorators for instrumenting functions and classes. - Handles different entity kinds (e.g., SESSION, TASK) and function types (sync, async, generator). + Handles different entity kinds (e.g., SESSION, TASK, HTTP) and function types (sync, async, generator). """ def decorator( @@ -34,9 +36,20 @@ def decorator( tags: Optional[Union[list, dict]] = None, cost=None, spec=None, + capture_request: bool = True, + capture_response: bool = True, ) -> Callable[..., Any]: if wrapped is None: - return functools.partial(decorator, name=name, version=version, tags=tags, cost=cost, spec=spec) + return functools.partial( + decorator, + name=name, + version=version, + tags=tags, + cost=cost, + spec=spec, + capture_request=capture_request, + capture_response=capture_response, + ) if inspect.isclass(wrapped): # Class decoration wraps __init__ and aenter/aexit for context management. @@ -96,7 +109,176 @@ def wrapper( is_generator = inspect.isgeneratorfunction(wrapped_func) is_async_generator = inspect.isasyncgenfunction(wrapped_func) - if entity_kind == SpanKind.SESSION: + # Special handling for HTTP entity kind + if entity_kind == SpanKind.HTTP: + if is_generator or is_async_generator: + logger.warning( + f"@track_endpoint on generator '{operation_name}' is not supported. Use @trace instead." + ) + return wrapped_func(*args, **kwargs) + + if is_async: + + async def _wrapped_http_async() -> Any: + trace_context: Optional[TraceContext] = None + try: + # Create main session span + trace_context = tracer.start_trace(trace_name=operation_name, tags=tags) + if not trace_context: + logger.error( + f"Failed to start trace for @track_endpoint '{operation_name}'. Executing without trace." + ) + return await wrapped_func(*args, **kwargs) + + # Create HTTP request span + if capture_request: + with _create_as_current_span( + f"{operation_name}.request", + SpanKind.HTTP, + version=version, + attributes={SpanAttributes.HTTP_METHOD: "REQUEST"} + if SpanAttributes.HTTP_METHOD + else None, + ) as request_span: + try: + request_data = _extract_request_data() + if request_data: + # Set HTTP attributes + if hasattr(SpanAttributes, "HTTP_METHOD") and request_data.get("method"): + request_span.set_attribute( + SpanAttributes.HTTP_METHOD, request_data["method"] + ) + if hasattr(SpanAttributes, "HTTP_URL") and request_data.get("url"): + request_span.set_attribute(SpanAttributes.HTTP_URL, request_data["url"]) + + # Record the full request data + _record_entity_input(request_span, (request_data,), {}) + except Exception as e: + logger.warning(f"Failed to record HTTP request for '{operation_name}': {e}") + + # Execute the main function + result = await wrapped_func(*args, **kwargs) + + # Create HTTP response span + if capture_response: + with _create_as_current_span( + f"{operation_name}.response", + SpanKind.HTTP, + version=version, + attributes={SpanAttributes.HTTP_METHOD: "RESPONSE"} + if SpanAttributes.HTTP_METHOD + else None, + ) as response_span: + try: + response_data = _extract_response_data(result) + if response_data: + # Set HTTP attributes + if hasattr(SpanAttributes, "HTTP_STATUS_CODE") and response_data.get( + "status_code" + ): + response_span.set_attribute( + SpanAttributes.HTTP_STATUS_CODE, response_data["status_code"] + ) + + # Record the full response data + _record_entity_output(response_span, response_data) + except Exception as e: + logger.warning(f"Failed to record HTTP response for '{operation_name}': {e}") + + tracer.end_trace(trace_context, "Success") + return result + except Exception: + if trace_context: + tracer.end_trace(trace_context, "Indeterminate") + raise + finally: + if trace_context and trace_context.span.is_recording(): + logger.warning( + f"Trace for @track_endpoint '{operation_name}' not explicitly ended. Ending as 'Unknown'." + ) + tracer.end_trace(trace_context, "Unknown") + + return _wrapped_http_async() + else: # Sync function for HTTP + trace_context: Optional[TraceContext] = None + try: + # Create main session span + trace_context = tracer.start_trace(trace_name=operation_name, tags=tags) + if not trace_context: + logger.error( + f"Failed to start trace for @track_endpoint '{operation_name}'. Executing without trace." + ) + return wrapped_func(*args, **kwargs) + + # Create HTTP request span + if capture_request: + with _create_as_current_span( + f"{operation_name}.request", + SpanKind.HTTP, + version=version, + attributes={SpanAttributes.HTTP_METHOD: "REQUEST"} + if SpanAttributes.HTTP_METHOD + else None, + ) as request_span: + try: + request_data = _extract_request_data() + if request_data: + # Set HTTP attributes + if hasattr(SpanAttributes, "HTTP_METHOD") and request_data.get("method"): + request_span.set_attribute( + SpanAttributes.HTTP_METHOD, request_data["method"] + ) + if hasattr(SpanAttributes, "HTTP_URL") and request_data.get("url"): + request_span.set_attribute(SpanAttributes.HTTP_URL, request_data["url"]) + + # Record the full request data + _record_entity_input(request_span, (request_data,), {}) + except Exception as e: + logger.warning(f"Failed to record HTTP request for '{operation_name}': {e}") + + # Execute the main function + result = wrapped_func(*args, **kwargs) + + # Create HTTP response span + if capture_response: + with _create_as_current_span( + f"{operation_name}.response", + SpanKind.HTTP, + version=version, + attributes={SpanAttributes.HTTP_METHOD: "RESPONSE"} + if SpanAttributes.HTTP_METHOD + else None, + ) as response_span: + try: + response_data = _extract_response_data(result) + if response_data: + # Set HTTP attributes + if hasattr(SpanAttributes, "HTTP_STATUS_CODE") and response_data.get( + "status_code" + ): + response_span.set_attribute( + SpanAttributes.HTTP_STATUS_CODE, response_data["status_code"] + ) + + # Record the full response data + _record_entity_output(response_span, response_data) + except Exception as e: + logger.warning(f"Failed to record HTTP response for '{operation_name}': {e}") + + tracer.end_trace(trace_context, "Success") + return result + except Exception: + if trace_context: + tracer.end_trace(trace_context, "Indeterminate") + raise + finally: + if trace_context and trace_context.span.is_recording(): + logger.warning( + f"Trace for @track_endpoint '{operation_name}' not explicitly ended. Ending as 'Unknown'." + ) + tracer.end_trace(trace_context, "Unknown") + + elif entity_kind == SpanKind.SESSION: if is_generator or is_async_generator: logger.warning( f"@agentops.trace on generator '{operation_name}' creates a single span, not a full trace." diff --git a/agentops/sdk/decorators/utility.py b/agentops/sdk/decorators/utility.py index 8abc8820d..e6894ad15 100644 --- a/agentops/sdk/decorators/utility.py +++ b/agentops/sdk/decorators/utility.py @@ -160,3 +160,57 @@ def _record_entity_output(span: trace.Span, result: Any, entity_kind: str = "ent logger.debug("Operation output exceeds size limit, not recording") except Exception as err: logger.warning(f"Failed to serialize operation output: {err}") + + +# Helper functions for HTTP request/response data extraction + + +def _extract_request_data(): + """Extract HTTP request data from the current web framework context.""" + request_data = {} + + try: + # Try to import Flask and get current request + from flask import request + + request_data = { + "method": request.method, + "url": request.url, + "headers": dict(request.headers), + "args": dict(request.args), + "form": dict(request.form) if request.form else None, + "json": request.get_json(silent=True), + "data": request.get_data(as_text=True) if request.content_length else None, + } + except ImportError: + logger.debug("Flask not available for request data extraction") + except Exception as e: + logger.warning(f"Failed to extract request data: {e}") + + return request_data + + +def _extract_response_data(response): + """Extract HTTP response data from response object.""" + response_data = {} + + try: + # Handle Flask response objects + from flask import Response + + if isinstance(response, Response): + response_data = { + "status_code": response.status_code, + "headers": dict(response.headers), + "data": response.get_data(as_text=True) if response.content_length else None, + } + else: + # Handle cases where response is just data (will be converted to Response by Flask) + response_data = { + "status_code": 200, # Default status for successful responses + "data": str(response) if response is not None else None, + } + except Exception as e: + logger.warning(f"Failed to extract response data: {e}") + + return response_data diff --git a/agentops/semconv/span_attributes.py b/agentops/semconv/span_attributes.py index a6be4132a..2c5f8ce1c 100644 --- a/agentops/semconv/span_attributes.py +++ b/agentops/semconv/span_attributes.py @@ -104,3 +104,15 @@ class SpanAttributes: LLM_STREAMING_TIME_TO_GENERATE = "gen_ai.streaming.time_to_generate" LLM_STREAMING_DURATION = "gen_ai.streaming_duration" LLM_STREAMING_CHUNK_COUNT = "gen_ai.streaming.chunk_count" + + # HTTP-specific attributes + HTTP_METHOD = "http.method" + HTTP_URL = "http.url" + HTTP_ROUTE = "http.route" + HTTP_STATUS_CODE = "http.status_code" + HTTP_REQUEST_HEADERS = "http.request.headers" + HTTP_RESPONSE_HEADERS = "http.response.headers" + HTTP_REQUEST_BODY = "http.request.body" + HTTP_RESPONSE_BODY = "http.response.body" + HTTP_USER_AGENT = "http.user_agent" + HTTP_REQUEST_ID = "http.request_id" diff --git a/agentops/semconv/span_kinds.py b/agentops/semconv/span_kinds.py index 75b3c6b97..1a8c9af63 100644 --- a/agentops/semconv/span_kinds.py +++ b/agentops/semconv/span_kinds.py @@ -17,6 +17,7 @@ class AgentOpsSpanKindValues(Enum): CHAIN = "chain" TEXT = "text" GUARDRAIL = "guardrail" + HTTP = "http" UNKNOWN = "unknown" @@ -46,3 +47,4 @@ class SpanKind: CHAIN = AgentOpsSpanKindValues.CHAIN.value TEXT = AgentOpsSpanKindValues.TEXT.value GUARDRAIL = AgentOpsSpanKindValues.GUARDRAIL.value + HTTP = AgentOpsSpanKindValues.HTTP.value diff --git a/docs/mint.json b/docs/mint.json index e26bc058c..ac0052bae 100644 --- a/docs/mint.json +++ b/docs/mint.json @@ -154,6 +154,7 @@ "v2/usage/tracking-agents", "v2/usage/recording-operations", "v2/usage/trace-decorator", + "v2/usage/track-endpoint-decorator", "v2/usage/manual-trace-control", "v2/usage/public-api" ], diff --git a/docs/v2/examples/openai.mdx b/docs/v2/examples/openai.mdx index ca66e9a55..2114f2135 100755 --- a/docs/v2/examples/openai.mdx +++ b/docs/v2/examples/openai.mdx @@ -1,6 +1,6 @@ --- title: 'OpenAI' -description: 'Load the dataset (ensure you're logged in with huggingface-cli if needed)' +description: 'Load the dataset (ensure you are logged in with huggingface-cli if needed)' --- {/* SOURCE_FILE: examples/openai/multi_tool_orchestration.ipynb */} diff --git a/docs/v2/usage/track-endpoint-decorator.mdx b/docs/v2/usage/track-endpoint-decorator.mdx new file mode 100644 index 000000000..d627630c7 --- /dev/null +++ b/docs/v2/usage/track-endpoint-decorator.mdx @@ -0,0 +1,66 @@ +--- +title: "Track Endpoint Decorator" +description: "HTTP endpoint tracing for Flask applications using the @track_endpoint decorator" +--- + +## Overview + +The `@track_endpoint` decorator provides HTTP endpoint tracing for Flask applications with automatic request/response monitoring. It's designed to work seamlessly with Flask and extends the functionality of the basic `@trace` decorator. + +## Quick Example with OpenAI + +Here's a simple Flask endpoint that generates text using OpenAI: + +```python +from flask import Flask, request +from openai import OpenAI +import agentops + +# Initialize AgentOps +agentops.init( + api_key="your-api-key", + auto_start_session=False, # Required for endpoint tracing +) + +app = Flask(__name__) +client = OpenAI() + +@app.route("/api/generate", methods=["POST"]) +@agentops.track_endpoint( + name="generate_text", + tags=["ai", "openai"] +) +def generate_text(): + """Generate text using OpenAI""" + data = request.get_json() + prompt = data.get("prompt", "Hello!") + + # OpenAI call is automatically traced + response = client.chat.completions.create( + model="gpt-4", + messages=[{"role": "user", "content": prompt}], + max_tokens=150 + ) + + return { + "text": response.choices[0].message.content, + "usage": { + "total_tokens": response.usage.total_tokens + } + } + +if __name__ == "__main__": + app.run(debug=True) +``` + +The decorator automatically captures: +- HTTP request data (method, URL, headers, body) +- HTTP response data (status code, headers, body) +- OpenAI API calls and their results +- Any errors that occur during request processing + +You can customize tracing with parameters like: +- `name`: Custom name for the trace +- `tags`: List or dict of tags for categorizing traces +- `capture_request`: Whether to capture request data (default: True) +- `capture_response`: Whether to capture response data (default: True) \ No newline at end of file