6 changes: 5 additions & 1 deletion agentops/__init__.py
@@ -26,7 +26,7 @@
from typing import List, Optional, Union, Dict, Any
from agentops.client import Client
from agentops.sdk.core import TraceContext, tracer
from agentops.sdk.decorators import trace, session, agent, task, workflow, operation, tool, guardrail
from agentops.sdk.decorators import trace, session, agent, task, workflow, operation, tool, guardrail, track_endpoint
from agentops.enums import TraceState, SUCCESS, ERROR, UNSET
from opentelemetry.trace.status import StatusCode

@@ -90,6 +90,7 @@ def init(
env_data_opt_out: Optional[bool] = None,
log_level: Optional[Union[str, int]] = None,
fail_safe: Optional[bool] = None,
log_session_replay_url: Optional[bool] = None,
exporter_endpoint: Optional[str] = None,
**kwargs,
):
@@ -117,6 +118,7 @@ def init(
env_data_opt_out (bool): Whether to opt out of collecting environment data.
log_level (str, int): The log level to use for the client. Defaults to 'CRITICAL'.
fail_safe (bool): Whether to suppress errors and continue execution when possible.
log_session_replay_url (bool): Whether to log session replay URLs to the console. Defaults to True.
exporter_endpoint (str, optional): Endpoint for the exporter. If none is provided, key will
be read from the AGENTOPS_EXPORTER_ENDPOINT environment variable.
**kwargs: Additional configuration parameters to be passed to the client.
@@ -159,6 +161,7 @@ def init(
"env_data_opt_out": env_data_opt_out,
"log_level": log_level,
"fail_safe": fail_safe,
"log_session_replay_url": log_session_replay_url,
"exporter_endpoint": exporter_endpoint,
**kwargs,
}
@@ -472,6 +475,7 @@ def extract_key_from_attr(attr_value: str) -> str:
"operation",
"tool",
"guardrail",
"track_endpoint",
# Enums
"TraceState",
"SUCCESS",
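For reference, a minimal usage sketch of the two package-level additions above: `track_endpoint` is re-exported from the top-level `agentops` package, and `init()` accepts the new `log_session_replay_url` flag. The API key value is a placeholder.

```python
import agentops
from agentops import track_endpoint  # newly re-exported alongside trace, agent, task, ...

# Placeholder key; log_session_replay_url=False suppresses the console
# "Session Replay ..." line while traces are still recorded and exported.
agentops.init(api_key="<AGENTOPS_API_KEY>", log_session_replay_url=False)
```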
11 changes: 11 additions & 0 deletions agentops/config.py
@@ -31,6 +31,7 @@ class ConfigDict(TypedDict):
log_level: Optional[Union[str, int]]
fail_safe: Optional[bool]
prefetch_jwt_token: Optional[bool]
log_session_replay_url: Optional[bool]


@dataclass
@@ -115,6 +116,11 @@ class Config:
metadata={"description": "Whether to prefetch JWT token during initialization"},
)

log_session_replay_url: bool = field(
default_factory=lambda: get_env_bool("AGENTOPS_LOG_SESSION_REPLAY_URL", True),
metadata={"description": "Whether to log session replay URLs to the console"},
)

exporter_endpoint: Optional[str] = field(
default_factory=lambda: os.getenv("AGENTOPS_EXPORTER_ENDPOINT", "https://otlp.agentops.ai/v1/traces"),
metadata={
@@ -148,6 +154,7 @@ def configure(
log_level: Optional[Union[str, int]] = None,
fail_safe: Optional[bool] = None,
prefetch_jwt_token: Optional[bool] = None,
log_session_replay_url: Optional[bool] = None,
exporter: Optional[SpanExporter] = None,
processor: Optional[SpanProcessor] = None,
exporter_endpoint: Optional[str] = None,
@@ -213,6 +220,9 @@ def configure(
if prefetch_jwt_token is not None:
self.prefetch_jwt_token = prefetch_jwt_token

if log_session_replay_url is not None:
self.log_session_replay_url = log_session_replay_url

if exporter is not None:
self.exporter = exporter

@@ -243,6 +253,7 @@ def dict(self):
"log_level": self.log_level,
"fail_safe": self.fail_safe,
"prefetch_jwt_token": self.prefetch_jwt_token,
"log_session_replay_url": self.log_session_replay_url,
"exporter": self.exporter,
"processor": self.processor,
"exporter_endpoint": self.exporter_endpoint,
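A short sketch of how the new config field resolves from the environment, based on the `default_factory` above. It assumes `get_env_bool` parses "false" (case-insensitive) as falsy and that `Config()` can be constructed with defaults; an explicit value passed to `init()`/`configure()` takes precedence over the env default.

```python
import os
from agentops.config import Config

# Assumption: get_env_bool treats "false" as False.
os.environ["AGENTOPS_LOG_SESSION_REPLAY_URL"] = "false"
print(Config().log_session_replay_url)  # False — env var default applies
```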
10 changes: 10 additions & 0 deletions agentops/helpers/dashboard.py
@@ -39,6 +39,16 @@ def log_trace_url(span: Union[Span, ReadableSpan], title: Optional[str] = None)

Args:
span: The span to log the URL for.
title: Optional title for the trace.
"""
from agentops import get_client

try:
client = get_client()
if not client.config.log_session_replay_url:
return
except Exception:
return

session_url = get_trace_url(span)
logger.info(colored(f"\x1b[34mSession Replay for {title} trace: {session_url}\x1b[0m", "blue"))
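Note that the gate above fails closed: if the client cannot be resolved, the URL is simply not logged. A hedged sketch of toggling it at runtime, using only names that appear in this diff:

```python
from agentops import get_client
from agentops.helpers.dashboard import log_trace_url

# With the flag off, log_trace_url returns before printing the blue
# "Session Replay for <title> trace: <url>" line; spans are unaffected.
get_client().config.log_session_replay_url = False
```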
2 changes: 2 additions & 0 deletions agentops/sdk/decorators/__init__.py
@@ -16,6 +16,7 @@
tool = create_entity_decorator(SpanKind.TOOL)
operation = task
guardrail = create_entity_decorator(SpanKind.GUARDRAIL)
track_endpoint = create_entity_decorator(SpanKind.HTTP)


# For backward compatibility: @session decorator calls @trace decorator
@@ -46,4 +47,5 @@ def session(*args, **kwargs): # noqa: F811
"operation",
"tool",
"guardrail",
"track_endpoint",
]
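A plausible end-to-end sketch of the new decorator on a Flask view (the route and handler names are invented for illustration). Per the factory changes below, `@track_endpoint` starts a trace per call and adds request/response child spans populated from the Flask request context.

```python
from flask import Flask, jsonify
import agentops
from agentops import track_endpoint

app = Flask(__name__)
agentops.init(api_key="<AGENTOPS_API_KEY>")  # placeholder key

@app.route("/users/<int:user_id>")
@track_endpoint(name="get_user")  # one trace per request, plus request/response spans
def get_user(user_id: int):
    return jsonify({"id": user_id, "name": "Ada"})
```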
188 changes: 185 additions & 3 deletions agentops/sdk/decorators/factory.py
@@ -17,13 +17,15 @@
_process_sync_generator,
_record_entity_input,
_record_entity_output,
_extract_request_data,
_extract_response_data,
)


def create_entity_decorator(entity_kind: str) -> Callable[..., Any]:
"""
Factory that creates decorators for instrumenting functions and classes.
Handles different entity kinds (e.g., SESSION, TASK) and function types (sync, async, generator).
Handles different entity kinds (e.g., SESSION, TASK, HTTP) and function types (sync, async, generator).
"""

def decorator(
@@ -34,9 +36,20 @@ def decorator(
tags: Optional[Union[list, dict]] = None,
cost=None,
spec=None,
capture_request: bool = True,
capture_response: bool = True,
) -> Callable[..., Any]:
if wrapped is None:
return functools.partial(decorator, name=name, version=version, tags=tags, cost=cost, spec=spec)
return functools.partial(
decorator,
name=name,
version=version,
tags=tags,
cost=cost,
spec=spec,
capture_request=capture_request,
capture_response=capture_response,
)

if inspect.isclass(wrapped):
# Class decoration wraps __init__ and aenter/aexit for context management.
@@ -96,7 +109,176 @@ def wrapper(
is_generator = inspect.isgeneratorfunction(wrapped_func)
is_async_generator = inspect.isasyncgenfunction(wrapped_func)

if entity_kind == SpanKind.SESSION:
# Special handling for HTTP entity kind
if entity_kind == SpanKind.HTTP:
if is_generator or is_async_generator:
logger.warning(
f"@track_endpoint on generator '{operation_name}' is not supported. Use @trace instead."
)
return wrapped_func(*args, **kwargs)

if is_async:

async def _wrapped_http_async() -> Any:
trace_context: Optional[TraceContext] = None
try:
# Create main session span
trace_context = tracer.start_trace(trace_name=operation_name, tags=tags)
if not trace_context:
logger.error(
f"Failed to start trace for @track_endpoint '{operation_name}'. Executing without trace."
)
return await wrapped_func(*args, **kwargs)

# Create HTTP request span
if capture_request:
with _create_as_current_span(
f"{operation_name}.request",
SpanKind.HTTP,
version=version,
attributes={SpanAttributes.HTTP_METHOD: "REQUEST"}
if SpanAttributes.HTTP_METHOD
else None,
) as request_span:
try:
request_data = _extract_request_data()
if request_data:
# Set HTTP attributes
if hasattr(SpanAttributes, "HTTP_METHOD") and request_data.get("method"):
request_span.set_attribute(
SpanAttributes.HTTP_METHOD, request_data["method"]
)
if hasattr(SpanAttributes, "HTTP_URL") and request_data.get("url"):
request_span.set_attribute(SpanAttributes.HTTP_URL, request_data["url"])

# Record the full request data
_record_entity_input(request_span, (request_data,), {})
except Exception as e:
logger.warning(f"Failed to record HTTP request for '{operation_name}': {e}")

# Execute the main function
result = await wrapped_func(*args, **kwargs)

# Create HTTP response span
if capture_response:
with _create_as_current_span(
f"{operation_name}.response",
SpanKind.HTTP,
version=version,
attributes={SpanAttributes.HTTP_METHOD: "RESPONSE"}
if SpanAttributes.HTTP_METHOD
else None,
) as response_span:
try:
response_data = _extract_response_data(result)
if response_data:
# Set HTTP attributes
if hasattr(SpanAttributes, "HTTP_STATUS_CODE") and response_data.get(
"status_code"
):
response_span.set_attribute(
SpanAttributes.HTTP_STATUS_CODE, response_data["status_code"]
)

# Record the full response data
_record_entity_output(response_span, response_data)
except Exception as e:
logger.warning(f"Failed to record HTTP response for '{operation_name}': {e}")

tracer.end_trace(trace_context, "Success")
return result
except Exception:
if trace_context:
tracer.end_trace(trace_context, "Indeterminate")
raise
finally:
if trace_context and trace_context.span.is_recording():
logger.warning(
f"Trace for @track_endpoint '{operation_name}' not explicitly ended. Ending as 'Unknown'."
)
tracer.end_trace(trace_context, "Unknown")

return _wrapped_http_async()
else: # Sync function for HTTP
trace_context: Optional[TraceContext] = None
try:
# Create main session span
trace_context = tracer.start_trace(trace_name=operation_name, tags=tags)
if not trace_context:
logger.error(
f"Failed to start trace for @track_endpoint '{operation_name}'. Executing without trace."
)
return wrapped_func(*args, **kwargs)

# Create HTTP request span
if capture_request:
with _create_as_current_span(
f"{operation_name}.request",
SpanKind.HTTP,
version=version,
attributes={SpanAttributes.HTTP_METHOD: "REQUEST"}
if SpanAttributes.HTTP_METHOD
else None,
) as request_span:
try:
request_data = _extract_request_data()
if request_data:
# Set HTTP attributes
if hasattr(SpanAttributes, "HTTP_METHOD") and request_data.get("method"):
request_span.set_attribute(
SpanAttributes.HTTP_METHOD, request_data["method"]
)
if hasattr(SpanAttributes, "HTTP_URL") and request_data.get("url"):
request_span.set_attribute(SpanAttributes.HTTP_URL, request_data["url"])

# Record the full request data
_record_entity_input(request_span, (request_data,), {})
except Exception as e:
logger.warning(f"Failed to record HTTP request for '{operation_name}': {e}")

# Execute the main function
result = wrapped_func(*args, **kwargs)

# Create HTTP response span
if capture_response:
with _create_as_current_span(
f"{operation_name}.response",
SpanKind.HTTP,
version=version,
attributes={SpanAttributes.HTTP_METHOD: "RESPONSE"}
if SpanAttributes.HTTP_METHOD
else None,
) as response_span:
try:
response_data = _extract_response_data(result)
if response_data:
# Set HTTP attributes
if hasattr(SpanAttributes, "HTTP_STATUS_CODE") and response_data.get(
"status_code"
):
response_span.set_attribute(
SpanAttributes.HTTP_STATUS_CODE, response_data["status_code"]
)

# Record the full response data
_record_entity_output(response_span, response_data)
except Exception as e:
logger.warning(f"Failed to record HTTP response for '{operation_name}': {e}")

tracer.end_trace(trace_context, "Success")
return result
except Exception:
if trace_context:
tracer.end_trace(trace_context, "Indeterminate")
raise
finally:
if trace_context and trace_context.span.is_recording():
logger.warning(
f"Trace for @track_endpoint '{operation_name}' not explicitly ended. Ending as 'Unknown'."
)
tracer.end_trace(trace_context, "Unknown")

elif entity_kind == SpanKind.SESSION:
if is_generator or is_async_generator:
logger.warning(
f"@agentops.trace on generator '{operation_name}' creates a single span, not a full trace."
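As a usage note for the `capture_request` / `capture_response` parameters introduced above, either child span can be disabled per endpoint. A hedged sketch (handler name invented):

```python
@track_endpoint(name="export_report", capture_response=False)
def export_report():
    # The request span is still recorded; the response body is not captured.
    ...
```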
54 changes: 54 additions & 0 deletions agentops/sdk/decorators/utility.py
@@ -160,3 +160,57 @@ def _record_entity_output(span: trace.Span, result: Any, entity_kind: str = "ent
logger.debug("Operation output exceeds size limit, not recording")
except Exception as err:
logger.warning(f"Failed to serialize operation output: {err}")


# Helper functions for HTTP request/response data extraction


def _extract_request_data():
"""Extract HTTP request data from the current web framework context."""
request_data = {}

try:
# Try to import Flask and get current request
from flask import request

request_data = {
"method": request.method,
"url": request.url,
"headers": dict(request.headers),
"args": dict(request.args),
"form": dict(request.form) if request.form else None,
"json": request.get_json(silent=True),
"data": request.get_data(as_text=True) if request.content_length else None,
}
except ImportError:
logger.debug("Flask not available for request data extraction")
except Exception as e:
logger.warning(f"Failed to extract request data: {e}")

return request_data


def _extract_response_data(response):
"""Extract HTTP response data from response object."""
response_data = {}

try:
# Handle Flask response objects
from flask import Response

if isinstance(response, Response):
response_data = {
"status_code": response.status_code,
"headers": dict(response.headers),
"data": response.get_data(as_text=True) if response.content_length else None,
}
else:
# Handle cases where response is just data (will be converted to Response by Flask)
response_data = {
"status_code": 200, # Default status for successful responses
"data": str(response) if response is not None else None,
}
except Exception as e:
logger.warning(f"Failed to extract response data: {e}")

return response_data
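A small sketch of what the Flask-backed helpers return, using Flask's `test_request_context` so it runs outside a live server (the expected values in the comments are illustrative):

```python
from flask import Flask
from agentops.sdk.decorators.utility import _extract_request_data, _extract_response_data

app = Flask(__name__)
with app.test_request_context("/users?limit=5", method="GET"):
    req = _extract_request_data()
    # e.g. req["method"] == "GET", req["url"] ends with "/users?limit=5", req["json"] is None

resp = _extract_response_data({"id": 1})
# Non-Response return values fall back to {"status_code": 200, "data": "{'id': 1}"}
```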