diff --git a/docs/usage/metrics/open-telemetry.rst b/docs/usage/metrics/open-telemetry.rst index bee92c6457..bb546bd0ad 100644 --- a/docs/usage/metrics/open-telemetry.rst +++ b/docs/usage/metrics/open-telemetry.rst @@ -16,8 +16,7 @@ this package, you should first install the required dependencies: pip install 'litestar[opentelemetry]' Once these requirements are satisfied, you can instrument your Litestar application by creating an instance -of :class:`OpenTelemetryConfig ` and passing the middleware it creates to -the Litestar constructor: +of :class:`OpenTelemetryConfig ` and passing it to the plugin: .. code-block:: python @@ -32,5 +31,110 @@ The above example will work out of the box if you configure a global ``tracer_pr exporter to use these (see the `OpenTelemetry Exporter docs `_ for further details). -You can also pass configuration to the ``OpenTelemetryConfig`` telling it which providers to use. Consult -:class:`reference docs ` regarding the configuration options you can use. +You can also pass configuration to the ``OpenTelemetryConfig`` telling it which providers to use. Consult the +:class:`OpenTelemetryConfig ` reference docs for all available configuration options. + +Configuration options +--------------------- + +Provider configuration +~~~~~~~~~~~~~~~~~~~~~~ + +The following options allow you to configure custom OpenTelemetry providers: + +- ``tracer_provider``: Custom ``TracerProvider`` instance. If omitted, the globally configured provider is used. +- ``meter_provider``: Custom ``MeterProvider`` instance. If omitted, the globally configured provider is used. +- ``meter``: Custom ``Meter`` instance. If omitted, the meter from the provider is used. + +Example with custom tracer provider: + +.. code-block:: python + + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter + from litestar import Litestar + from litestar.plugins.opentelemetry import OpenTelemetryConfig, OpenTelemetryPlugin + + # Configure a custom tracer provider + tracer_provider = TracerProvider() + tracer_provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter())) + + config = OpenTelemetryConfig(tracer_provider=tracer_provider) + app = Litestar(plugins=[OpenTelemetryPlugin(config)]) + +Hook handlers +~~~~~~~~~~~~~ + +Hook handlers allow you to customize span behavior at various points in the request lifecycle: + +- ``server_request_hook_handler``: Called with the server span and ASGI scope for every incoming request. +- ``client_request_hook_handler``: Called with the internal span when the ``receive`` method is invoked. +- ``client_response_hook_handler``: Called with the internal span when the ``send`` method is invoked. + +Example adding custom attributes: + +.. code-block:: python + + from opentelemetry.trace import Span + from litestar.plugins.opentelemetry import OpenTelemetryConfig, OpenTelemetryPlugin + + def request_hook(span: Span, scope: dict) -> None: + span.set_attribute("custom.user_agent", scope.get("headers", {}).get("user-agent", "")) + + config = OpenTelemetryConfig(server_request_hook_handler=request_hook) + app = Litestar(plugins=[OpenTelemetryPlugin(config)]) + +URL filtering +~~~~~~~~~~~~~ + +You can exclude specific URLs from instrumentation: + +- ``exclude``: Pattern or list of patterns to exclude from instrumentation. +- ``exclude_opt_key``: Route option key to disable instrumentation on a per-route basis. +- ``exclude_urls_env_key`` (default ``"LITESTAR"``): Environment variable prefix for excluded URLs. With the default, the environment variable ``LITESTAR_EXCLUDED_URLS`` will be checked. + +Example excluding health check endpoints: + +.. code-block:: python + + from litestar.plugins.opentelemetry import OpenTelemetryConfig, OpenTelemetryPlugin + + config = OpenTelemetryConfig( + exclude=["/health", "/readiness", "/metrics"] + ) + app = Litestar(plugins=[OpenTelemetryPlugin(config)]) + +Advanced options +~~~~~~~~~~~~~~~~ + +- ``scope_span_details_extractor``: Callback that returns a tuple of ``(span_name, attributes)`` for customizing span details from the ASGI scope. +- ``scopes``: ASGI scope types to process (e.g., ``{"http", "websocket"}``). If ``None``, both HTTP and WebSocket are processed. +- ``middleware_class``: Custom middleware class. Must be a subclass of ``OpenTelemetryInstrumentationMiddleware``. + +Litestar-specific spans +------------------------ + +Litestar can automatically create spans for framework events. With :class:`OpenTelemetryConfig `, +all instrumentation options are enabled by default: + +- ``instrument_guards`` (default ``True``): Create ``guard.*`` spans for each guard executed within a request. +- ``instrument_events`` (default ``True``): Create ``event.emit.*`` and ``event.listener.*`` spans for the event emitter. +- ``instrument_lifecycle`` (default ``True``): Wrap application startup/shutdown hooks with ``lifecycle.*`` spans. +- ``instrument_cli`` (default ``True``): Emit ``cli.*`` spans for Litestar CLI commands. + +Example with selective instrumentation: + +.. code-block:: python + + from litestar import Litestar + from litestar.plugins.opentelemetry import OpenTelemetryConfig, OpenTelemetryPlugin + + # Enable only guard and event instrumentation + config = OpenTelemetryConfig( + instrument_guards=True, + instrument_events=True, + instrument_lifecycle=False, + instrument_cli=False + ) + + app = Litestar(plugins=[OpenTelemetryPlugin(config)]) diff --git a/litestar/plugins/opentelemetry/__init__.py b/litestar/plugins/opentelemetry/__init__.py index 983b777754..58b6839251 100644 --- a/litestar/plugins/opentelemetry/__init__.py +++ b/litestar/plugins/opentelemetry/__init__.py @@ -1,4 +1,11 @@ from .config import OpenTelemetryConfig +from .instrumentation import ( + create_span, + instrument_channel_operation, + instrument_dependency, + instrument_guard, + instrument_lifecycle_event, +) from .middleware import OpenTelemetryInstrumentationMiddleware from .plugin import OpenTelemetryPlugin @@ -6,4 +13,9 @@ "OpenTelemetryConfig", "OpenTelemetryInstrumentationMiddleware", "OpenTelemetryPlugin", + "create_span", + "instrument_channel_operation", + "instrument_dependency", + "instrument_guard", + "instrument_lifecycle_event", ) diff --git a/litestar/plugins/opentelemetry/config.py b/litestar/plugins/opentelemetry/config.py index 3371f519c7..7e073938cd 100644 --- a/litestar/plugins/opentelemetry/config.py +++ b/litestar/plugins/opentelemetry/config.py @@ -13,17 +13,16 @@ __all__ = ("OpenTelemetryConfig",) -try: +try: # pragma: no cover - dependency is optional at runtime import opentelemetry # noqa: F401 -except ImportError as e: + from opentelemetry.trace import Span, TracerProvider # pyright: ignore +except ImportError as e: # pragma: no cover raise MissingDependencyException("opentelemetry") from e - -from opentelemetry.trace import Span, TracerProvider # pyright: ignore - if TYPE_CHECKING: from opentelemetry.metrics import Meter, MeterProvider + from litestar.plugins.opentelemetry.plugin import OpenTelemetryPlugin from litestar.types import Scope, Scopes OpenTelemetryHookHandler = Callable[[Span, dict], None] @@ -87,6 +86,18 @@ class OpenTelemetryConfig: Should be a subclass of OpenTelemetry InstrumentationMiddleware][litestar.plugins.opentelemetry.OpenTelemetryInstrumentationMiddleware]. """ + instrument_guards: bool = field(default=True) + """Whether to automatically instrument Litestar route guards.""" + instrument_events: bool = field(default=True) + """Whether to instrument event listeners and emitters.""" + instrument_lifecycle: bool = field(default=True) + """Whether to instrument application lifecycle hooks (startup/shutdown).""" + instrument_middleware: bool = field(default=False) + """Reserved for future middleware-level instrumentation. Currently unused and left for forward compatibility.""" + instrument_cli: bool = field(default=True) + """Whether to instrument Litestar CLI commands (click entrypoints).""" + + _plugin: OpenTelemetryPlugin | None = field(default=None, init=False, repr=False) @property def middleware(self) -> DefineMiddleware: @@ -100,3 +111,17 @@ def middleware(self) -> DefineMiddleware: An instance of ``DefineMiddleware``. """ return DefineMiddleware(self.middleware_class, config=self) + + @property + def plugin(self) -> OpenTelemetryPlugin: + """Convenience accessor to build an :class:`OpenTelemetryPlugin` from this config. + + Returns: + A plugin instance wired with this configuration. + """ + + if self._plugin is None: + from litestar.plugins.opentelemetry.plugin import OpenTelemetryPlugin + + self._plugin = OpenTelemetryPlugin(self) + return self._plugin diff --git a/litestar/plugins/opentelemetry/instrumentation.py b/litestar/plugins/opentelemetry/instrumentation.py new file mode 100644 index 0000000000..3e61a3c59d --- /dev/null +++ b/litestar/plugins/opentelemetry/instrumentation.py @@ -0,0 +1,253 @@ +"""OpenTelemetry instrumentation helpers for Litestar constructs. + +This module provides optional instrumentation for Litestar-specific features like guards, +lifecycle events, dependency injection, and channels. All functions gracefully handle +cases where OpenTelemetry is not installed. +""" + +from __future__ import annotations + +import inspect +from contextlib import asynccontextmanager, contextmanager +from functools import wraps +from typing import TYPE_CHECKING, Any, Callable + +if TYPE_CHECKING: + from collections.abc import AsyncGenerator, Generator + + from litestar.connection import ASGIConnection + from litestar.handlers.base import BaseRouteHandler +__all__ = ( + "create_span", + "instrument_channel_operation", + "instrument_dependency", + "instrument_guard", + "instrument_lifecycle_event", +) + +# Global flag to track if OTEL is available +_OTEL_AVAILABLE: bool | None = None +try: # pragma: no cover - optional dependency + from opentelemetry.trace import TracerProvider as _RuntimeTracerProvider +except ImportError: # pragma: no cover + _RuntimeTracerProvider = object # type: ignore[misc,assignment] + +_CUSTOM_TRACER_PROVIDER: _RuntimeTracerProvider | None = None # pyright: ignore + + +def _get_tracer(name: str) -> Any: + from opentelemetry import trace + + tracer_provider = _CUSTOM_TRACER_PROVIDER or trace.get_tracer_provider() + return tracer_provider.get_tracer(name) + + +def _is_otel_available() -> bool: + """Check if OpenTelemetry is installed and available.""" + global _OTEL_AVAILABLE + if _OTEL_AVAILABLE is not None: + return _OTEL_AVAILABLE + + try: + import opentelemetry # noqa: F401 + + _OTEL_AVAILABLE = True + except ImportError: + _OTEL_AVAILABLE = False + + return _OTEL_AVAILABLE + + +@contextmanager +def create_span(name: str, attributes: dict[str, Any] | None = None) -> Generator[Any, None, None]: + """Create an OTEL span if OpenTelemetry is available. + + Args: + name: The span name + attributes: Optional attributes to add to the span + + Yields: + The span object if OTEL is available, otherwise None + """ + if not _is_otel_available(): + yield None + return + + from opentelemetry.trace import Status, StatusCode + + tracer = _get_tracer(__name__) + with tracer.start_as_current_span(name) as span: + if attributes: + span.set_attributes(attributes) + + try: + yield span + except Exception as exc: + if span.is_recording(): + span.record_exception(exc) + span.set_status(Status(StatusCode.ERROR, str(exc))) + raise + + +@asynccontextmanager +async def create_span_async(name: str, attributes: dict[str, Any] | None = None) -> AsyncGenerator[Any, None]: + """Create an OTEL span if OpenTelemetry is available (async version). + + Args: + name: The span name + attributes: Optional attributes to add to the span + + Yields: + The span object if OTEL is available, otherwise None + """ + if not _is_otel_available(): + yield None + return + + from opentelemetry.trace import Status, StatusCode + + tracer = _get_tracer(__name__) + with tracer.start_as_current_span(name) as span: + if attributes: + span.set_attributes(attributes) + + try: + yield span + except Exception as exc: + if span.is_recording(): + span.record_exception(exc) + span.set_status(Status(StatusCode.ERROR, str(exc))) + raise + + +def instrument_guard(guard_func: Callable) -> Callable: + """Instrument a guard function with OpenTelemetry tracing. + + Args: + guard_func: The guard function to instrument + + Returns: + The instrumented guard function (or original if OTEL not available) + """ + if not _is_otel_available(): + return guard_func + + @wraps(guard_func) + async def instrumented_guard(connection: ASGIConnection, route_handler: BaseRouteHandler) -> None: + guard_name = getattr(guard_func, "__name__", str(guard_func)) + handler_name = route_handler.handler_name if hasattr(route_handler, "handler_name") else str(route_handler) + + async with create_span_async( + f"guard.{guard_name}", + attributes={ + "litestar.guard.name": guard_name, + "litestar.handler.name": handler_name, + "litestar.connection.type": connection.scope.get("type", "unknown"), + }, + ): + await guard_func(connection, route_handler) + + return instrumented_guard + + +def instrument_lifecycle_event(event_name: str) -> Callable: + """Decorator to instrument lifecycle events (startup/shutdown). + + Args: + event_name: Name of the lifecycle event (e.g., "startup", "shutdown") + + Returns: + Decorator function + """ + + def decorator(func: Callable) -> Callable: + if not _is_otel_available(): + return func + + @wraps(func) + async def instrumented_event(*args: Any, **kwargs: Any) -> Any: + func_name = getattr(func, "__name__", str(func)) + + async with create_span_async( + f"lifecycle.{event_name}.{func_name}", + attributes={ + "litestar.lifecycle.event": event_name, + "litestar.lifecycle.handler": func_name, + }, + ): + return await func(*args, **kwargs) + + return instrumented_event + + return decorator + + +def instrument_dependency(dependency_key: str, provider_func: Callable | None = None) -> Callable: + """Instrument a dependency provider with OpenTelemetry tracing. + + Args: + dependency_key: The dependency injection key + provider_func: The provider function to instrument + + Returns: + The instrumented provider function (or original if OTEL not available) + """ + if provider_func is None: + return lambda fn: instrument_dependency(dependency_key, fn) + + if not _is_otel_available(): + return provider_func + + @wraps(provider_func) + async def instrumented_provider(*args: Any, **kwargs: Any) -> Any: + provider_name = getattr(provider_func, "__name__", str(provider_func)) + + async with create_span_async( + f"dependency.{dependency_key}", + attributes={ + "litestar.dependency.key": dependency_key, + "litestar.dependency.provider": provider_name, + }, + ) as span: + result = provider_func(*args, **kwargs) + if inspect.isawaitable(result): + result = await result + + # Add result type as attribute + if span and span.is_recording(): + span.set_attribute("litestar.dependency.result_type", type(result).__name__) + + return result + + return instrumented_provider + + +def instrument_channel_operation(operation: str, channel: str) -> Callable: + """Decorator to instrument channel pub/sub operations. + + Args: + operation: The operation type ("publish", "subscribe") + channel: The channel name + + Returns: + Decorator function + """ + + def decorator(func: Callable) -> Callable: + if not _is_otel_available(): + return func + + @wraps(func) + async def instrumented_operation(*args: Any, **kwargs: Any) -> Any: + async with create_span_async( + f"channel.{operation}", + attributes={ + "litestar.channel.operation": operation, + "litestar.channel.name": channel, + }, + ): + return await func(*args, **kwargs) + + return instrumented_operation + + return decorator diff --git a/litestar/plugins/opentelemetry/middleware.py b/litestar/plugins/opentelemetry/middleware.py index c43ab65337..f821f12108 100644 --- a/litestar/plugins/opentelemetry/middleware.py +++ b/litestar/plugins/opentelemetry/middleware.py @@ -1,6 +1,8 @@ from __future__ import annotations -from typing import TYPE_CHECKING +import copy +import traceback +from typing import TYPE_CHECKING, ClassVar, cast from litestar.exceptions import MissingDependencyException from litestar.middleware.base import AbstractMiddleware @@ -13,7 +15,9 @@ except ImportError as e: raise MissingDependencyException("opentelemetry") from e +from opentelemetry import trace from opentelemetry.instrumentation.asgi import OpenTelemetryMiddleware +from opentelemetry.trace import Status, StatusCode from opentelemetry.util.http import get_excluded_urls if TYPE_CHECKING: @@ -22,7 +26,14 @@ class OpenTelemetryInstrumentationMiddleware(AbstractMiddleware): - """OpenTelemetry Middleware.""" + """OpenTelemetry Middleware with enhanced Litestar instrumentation. + + This middleware extends the standard OpenTelemetry ASGI middleware to provide: + - Exception recording with full stack traces via span.record_exception() + - Enhanced error status tracking on spans + """ + + __singleton_middleware__: ClassVar[OpenTelemetryMiddleware | None] = None def __init__(self, app: ASGIApp, config: OpenTelemetryConfig) -> None: """Middleware that adds OpenTelemetry instrumentation to the application. @@ -32,27 +43,42 @@ def __init__(self, app: ASGIApp, config: OpenTelemetryConfig) -> None: config: An instance of :class:`OpenTelemetryConfig <.plugins.opentelemetry.OpenTelemetryConfig>` """ super().__init__(app=app, scopes=config.scopes, exclude=config.exclude, exclude_opt_key=config.exclude_opt_key) - self.open_telemetry_middleware = OpenTelemetryMiddleware( - app=app, - client_request_hook=config.client_request_hook_handler, # type: ignore[arg-type] - client_response_hook=config.client_response_hook_handler, # type: ignore[arg-type] - default_span_details=config.scope_span_details_extractor, - excluded_urls=get_excluded_urls(config.exclude_urls_env_key), - meter=config.meter, - meter_provider=config.meter_provider, - server_request_hook=config.server_request_hook_handler, - tracer_provider=config.tracer_provider, - ) + self.config = config - async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: - """ASGI callable. + reuse_singleton = config.tracer_provider is None and self.__class__.__singleton_middleware__ is not None - Args: - scope: The ASGI connection scope. - receive: The ASGI receive function. - send: The ASGI send function. + if reuse_singleton: + cloned = cast("OpenTelemetryMiddleware", copy.copy(self.__class__.__singleton_middleware__)) + cloned.app = app + self.open_telemetry_middleware = cloned + else: + self.open_telemetry_middleware = OpenTelemetryMiddleware( + app=app, + client_request_hook=config.client_request_hook_handler, # type: ignore[arg-type] + client_response_hook=config.client_response_hook_handler, # type: ignore[arg-type] + default_span_details=config.scope_span_details_extractor, + excluded_urls=get_excluded_urls(config.exclude_urls_env_key), + meter=config.meter, + meter_provider=config.meter_provider, + server_request_hook=config.server_request_hook_handler, + tracer_provider=config.tracer_provider, + ) + if config.tracer_provider is None: + self.__class__.__singleton_middleware__ = self.open_telemetry_middleware - Returns: - None - """ - await self.open_telemetry_middleware(scope, receive, send) # type: ignore[arg-type] # pyright: ignore[reportGeneralTypeIssues] + async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: + # Import here to avoid circular dependency and ensure OTEL is optional + + try: + await self.open_telemetry_middleware(scope, receive, send) # type: ignore[arg-type] # pyright: ignore[reportGeneralTypeIssues] + except Exception as exc: + current_span = trace.get_current_span() + if current_span.get_span_context().is_valid and current_span.is_recording(): + current_span.record_exception( + exc, + attributes={ + "exception.stacktrace": "".join(traceback.format_exception(type(exc), exc, exc.__traceback__)), + }, + ) + current_span.set_status(Status(StatusCode.ERROR, str(exc))) + raise diff --git a/litestar/plugins/opentelemetry/plugin.py b/litestar/plugins/opentelemetry/plugin.py index 75d3177dcc..0e4754808b 100644 --- a/litestar/plugins/opentelemetry/plugin.py +++ b/litestar/plugins/opentelemetry/plugin.py @@ -1,16 +1,30 @@ from __future__ import annotations -from typing import TYPE_CHECKING +import inspect +from typing import TYPE_CHECKING, Any, cast +from litestar.handlers.base import BaseRouteHandler from litestar.middleware.base import DefineMiddleware from litestar.plugins import InitPlugin from litestar.plugins.opentelemetry.config import OpenTelemetryConfig +from litestar.plugins.opentelemetry.instrumentation import ( + _is_otel_available, + create_span, + create_span_async, + instrument_lifecycle_event, +) from litestar.plugins.opentelemetry.middleware import OpenTelemetryInstrumentationMiddleware if TYPE_CHECKING: from litestar.config.app import AppConfig + from litestar.connection import ASGIConnection + from litestar.types import Scope from litestar.types.composite_types import Middleware +_GUARDS_PATCHED = False +_EVENTS_PATCHED = False +_CLI_PATCHED = False + class OpenTelemetryPlugin(InitPlugin): """OpenTelemetry Plugin.""" @@ -29,9 +43,208 @@ def middleware(self) -> DefineMiddleware: return DefineMiddleware(OpenTelemetryInstrumentationMiddleware, config=self.config) def on_app_init(self, app_config: AppConfig) -> AppConfig: - app_config.middleware, _middleware = self._pop_otel_middleware(app_config.middleware) + app_config.middleware, existing_middleware = self._pop_otel_middleware(app_config.middleware) + + middleware_to_use = existing_middleware or self.middleware + app_config.middleware.insert(0, middleware_to_use) + + if self.config.instrument_guards: + self._instrument_guards() + + if self.config.instrument_events: + self._instrument_events() + + if self.config.instrument_lifecycle: + self._instrument_lifecycle(app_config) + + if self.config.instrument_cli: + self._instrument_cli() + + if self.config.tracer_provider is not None: + # Ensure the configured tracer provider is active so helper instrumentation uses the same provider. If a + # provider is already set, mirror the span processors onto it to avoid the OTEL override warning. + from opentelemetry import trace + + current_provider = trace.get_tracer_provider() + if current_provider is self.config.tracer_provider: + trace.set_tracer_provider(self.config.tracer_provider) + else: + configured_processors = getattr( + getattr(self.config.tracer_provider, "_active_span_processor", None), "_span_processors", None + ) + target_provider = getattr(current_provider, "_tracer_provider", current_provider) + if configured_processors: + for processor in configured_processors: + if hasattr(target_provider, "add_span_processor"): + target_provider.add_span_processor(processor) # pyright: ignore[reportAttributeAccessIssue] + else: # Fallback when provider cannot be mutated + trace.set_tracer_provider(self.config.tracer_provider) + break + else: + # As a last resort, force the configured provider to be used so its span processors are honored. + trace._TRACER_PROVIDER = self.config.tracer_provider + + from litestar.plugins.opentelemetry import instrumentation as otel_instrumentation + + otel_instrumentation._CUSTOM_TRACER_PROVIDER = self.config.tracer_provider + + app_config.after_exception.append(self._record_exception) + return app_config + async def _record_exception(self, exc: Exception, scope: Scope) -> None: + """Record exceptions on the active span if available. + + This hook is registered as an ``after_exception`` handler to ensure span recording even when middleware order + places the OpenTelemetry middleware inside the exception handling middleware. + """ + + from opentelemetry import trace + from opentelemetry.trace import Status, StatusCode + + span = trace.get_current_span() + if span.get_span_context().is_valid and span.is_recording(): + import traceback + + span.record_exception( + exc, + attributes={ + "exception.stacktrace": "".join(traceback.format_exception(type(exc), exc, exc.__traceback__)) + }, + ) + span.set_status(Status(StatusCode.ERROR, str(exc))) + + def _instrument_guards(self) -> None: + global _GUARDS_PATCHED + if _GUARDS_PATCHED: + return + + original_authorize = BaseRouteHandler.authorize_connection + + async def authorize_with_spans(self: BaseRouteHandler, connection: ASGIConnection[Any, Any, Any, Any]) -> None: + if not _is_otel_available() or not self.guards: + return await original_authorize(self, connection) + + scope: dict[str, Any] = ( + cast("dict[str, Any]", connection.scope) if isinstance(getattr(connection, "scope", None), dict) else {} + ) + conn_type = scope.get("type", "unknown") + + for guard in self.guards: + guard_name = getattr(guard, "__name__", str(guard)) + handler_name = getattr(self, "handler_name", str(self)) + + async with create_span_async( + f"guard.{guard_name}", + attributes={ + "litestar.guard.name": guard_name, + "litestar.handler.name": handler_name, + "litestar.connection.type": conn_type, + }, + ): + await guard(connection, self) + return None + + setattr(BaseRouteHandler, "authorize_connection", cast("Any", authorize_with_spans)) + _GUARDS_PATCHED = True + + def _instrument_events(self) -> None: + try: + from litestar.events.emitter import SimpleEventEmitter + except ImportError: # pragma: no cover - emitter may be absent + return + + global _EVENTS_PATCHED + if _EVENTS_PATCHED: + return + + original_emit = SimpleEventEmitter.emit + + def emit_with_span(self: SimpleEventEmitter, event_id: str, *args: Any, **kwargs: Any) -> None: + if not _is_otel_available(): + return original_emit(self, event_id, *args, **kwargs) + + def make_wrapped(listener_name: str, event_id: str, fn: Any) -> Any: + """Create a wrapped listener function with properly bound loop variables.""" + + async def wrapped(*a: Any, **kw: Any) -> None: + async with create_span_async( + f"event.listener.{listener_name}", + attributes={ + "litestar.event.listener": listener_name, + "litestar.event.id": event_id, + }, + ): + result = fn(*a, **kw) + if inspect.isawaitable(result): + await result + + return wrapped + + if listeners := self.listeners.get(event_id): + with create_span(f"event.emit.{event_id}", attributes={"litestar.event.id": event_id}): + for listener in listeners: + fn = listener.fn + listener_name = getattr(fn, "__name__", str(fn)) + wrapped = make_wrapped(listener_name, event_id, fn) + + if self._send_stream is not None: + self._send_stream.send_nowait((wrapped, args, kwargs)) + return None + + return original_emit(self, event_id, *args, **kwargs) + + setattr(SimpleEventEmitter, "emit", cast("Any", emit_with_span)) + _EVENTS_PATCHED = True + + def _instrument_lifecycle(self, app_config: AppConfig) -> None: + if not _is_otel_available(): + return + + app_config.on_startup = [instrument_lifecycle_event("startup")(fn) for fn in app_config.on_startup] + app_config.on_shutdown = [instrument_lifecycle_event("shutdown")(fn) for fn in app_config.on_shutdown] + + def _instrument_cli(self) -> None: + if not _is_otel_available(): + return + + try: + import click + + from litestar.cli.main import litestar_group + except ImportError: # pragma: no cover - CLI not available in some contexts + return + + global _CLI_PATCHED + if _CLI_PATCHED: + return + + original_command_invoke = click.Command.invoke + original_group_invoke = getattr(click.Group, "invoke", None) + + def _span_attrs(ctx: click.Context) -> dict[str, Any]: + return { + "litestar.cli.command": ctx.command_path or ctx.command.name or "cli", + "litestar.cli.params": sorted(ctx.params.keys()), + "litestar.cli.obj_present": ctx.obj is not None, + } + + def invoke_with_span(command: click.Command, ctx: click.Context) -> Any: + with create_span(f"cli.{ctx.command_path or command.name or 'cli'}", attributes=_span_attrs(ctx)): + return original_command_invoke(command, ctx) + + def group_invoke_with_span(command: click.Group, ctx: click.Context) -> Any: + with create_span(f"cli.{ctx.command_path or command.name or 'cli'}", attributes=_span_attrs(ctx)): + if original_group_invoke: + return original_group_invoke(command, ctx) + return original_command_invoke(command, ctx) + + setattr(click.Command, "invoke", cast("Any", invoke_with_span)) + setattr(click.Group, "invoke", cast("Any", group_invoke_with_span)) + _CLI_PATCHED = True + + _ = litestar_group + @staticmethod def _pop_otel_middleware(middlewares: list[Middleware]) -> tuple[list[Middleware], DefineMiddleware | None]: """Get the OpenTelemetry middleware if it is enabled in the application. diff --git a/pyproject.toml b/pyproject.toml index 1abbf424f1..a548f3447f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -525,6 +525,7 @@ known-first-party = ["litestar", "tests", "examples"] "TC", "TRY", "E721", + "RUF029" ] "tests/unit/test_contrib/test_sqlalchemy/**/*.*" = ["UP006"] "tests/unit/test_openapi/test_typescript_converter/test_converter.py" = ["W293"] @@ -532,6 +533,7 @@ known-first-party = ["litestar", "tests", "examples"] "tools/**/*.*" = ["D", "ARG", "EM", "TRY", "G", "FBT"] "tools/prepare_release.py" = ["S603", "S607"] "tests/*" = ["PLC0415", "UP045"] +"litestar/plugins/opentelemetry/*.py" = ["PLW0603","RUF029"] [tool.ruff.format] docstring-code-format = true diff --git a/tests/unit/test_plugins/test_opentelemetry.py b/tests/unit/test_plugins/test_opentelemetry.py new file mode 100644 index 0000000000..f632de9738 --- /dev/null +++ b/tests/unit/test_plugins/test_opentelemetry.py @@ -0,0 +1,453 @@ +"""Tests for enhanced OpenTelemetry instrumentation features.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest + +if TYPE_CHECKING: + pass + +pytestmark = pytest.mark.usefixtures("reset_opentelemetry_tracer_provider") + + +@pytest.fixture +def reset_opentelemetry_tracer_provider() -> None: + """Reset the OpenTelemetry tracer provider after each test.""" + from opentelemetry import trace + + from litestar.plugins.opentelemetry import instrumentation + + # Reset the global tracer provider + if hasattr(trace, "_TRACER_PROVIDER"): + setattr(trace, "_TRACER_PROVIDER", None) + if hasattr(trace, "_TRACER_PROVIDER_SET"): + setattr(trace, "_TRACER_PROVIDER_SET", False) + instrumentation._CUSTOM_TRACER_PROVIDER = None + instrumentation._OTEL_AVAILABLE = None + + +def test_exception_recording_in_middleware() -> None: + """Test that exceptions are recorded with full stack traces.""" + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import SimpleSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + + from litestar import get + from litestar.plugins.opentelemetry import OpenTelemetryConfig + from litestar.status_codes import HTTP_500_INTERNAL_SERVER_ERROR + from litestar.testing import create_test_client + + # Create an in-memory span exporter to capture traces + span_exporter = InMemorySpanExporter() + tracer_provider = TracerProvider() + tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter)) + + @get("/error") + def error_handler() -> None: + raise ValueError("Test error for OTEL") + + with create_test_client( + route_handlers=[error_handler], + plugins=[OpenTelemetryConfig(tracer_provider=tracer_provider).plugin], + ) as client: + response = client.get("/error") + assert response.status_code == HTTP_500_INTERNAL_SERVER_ERROR + + # Verify that spans were created and exceptions were recorded + spans = span_exporter.get_finished_spans() + assert spans + + # Find the span that should have recorded the exception + exception_spans = [span for span in spans if span.events] + assert len(exception_spans) > 0, "Expected at least one span with exception events" + + # Verify exception was recorded + for span in exception_spans: + for event in span.events: + if event.name == "exception": + attrs = event.attributes or {} + assert attrs.get("exception.message") == "Test error for OTEL" + assert attrs.get("exception.type") == "ValueError" + break + else: + continue + break + else: + pytest.fail("No exception event found in spans") + + +def test_exception_recording_with_http_exception() -> None: + """Test that HTTP exceptions are also recorded.""" + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import SimpleSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + + from litestar import get + from litestar.exceptions import NotAuthorizedException + from litestar.plugins.opentelemetry import OpenTelemetryConfig + from litestar.status_codes import HTTP_401_UNAUTHORIZED + from litestar.testing import create_test_client + + span_exporter = InMemorySpanExporter() + tracer_provider = TracerProvider() + tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter)) + + @get("/unauthorized") + def unauthorized_handler() -> None: + raise NotAuthorizedException("Not authorized") + + with create_test_client( + route_handlers=[unauthorized_handler], + plugins=[OpenTelemetryConfig(tracer_provider=tracer_provider).plugin], + ) as client: + response = client.get("/unauthorized") + assert response.status_code == HTTP_401_UNAUTHORIZED + + spans = span_exporter.get_finished_spans() + assert spans + + +def test_middleware_without_exceptions() -> None: + """Test that middleware works correctly when no exceptions occur.""" + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import SimpleSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + + from litestar import get + from litestar.plugins.opentelemetry import OpenTelemetryConfig + from litestar.status_codes import HTTP_200_OK + from litestar.testing import create_test_client + + span_exporter = InMemorySpanExporter() + tracer_provider = TracerProvider() + tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter)) + + @get("/success") + def success_handler() -> dict: + return {"status": "ok"} + + with create_test_client( + route_handlers=[success_handler], + plugins=[OpenTelemetryConfig(tracer_provider=tracer_provider).plugin], + ) as client: + response = client.get("/success") + assert response.status_code == HTTP_200_OK + assert response.json() == {"status": "ok"} + + spans = span_exporter.get_finished_spans() + assert len(spans) > 0 + + # Verify no exception events + for span in spans: + exception_events = [event for event in span.events if event.name == "exception"] + assert len(exception_events) == 0 + + +def test_instrumentation_helpers_without_otel() -> None: + """Test that instrumentation helpers work gracefully without OpenTelemetry installed.""" + from litestar.plugins.opentelemetry import instrumentation + + instrumentation._OTEL_AVAILABLE = False # Simulate missing dependency + + from litestar.plugins.opentelemetry import create_span + + with create_span("test.span") as span: + assert span is None # Should return None when OTEL not available + + instrumentation._OTEL_AVAILABLE = None + + +def test_guard_instrumentation() -> None: + """Test that guard functions can be instrumented.""" + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import SimpleSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + + from litestar import get + from litestar.connection import ASGIConnection + from litestar.exceptions import NotAuthorizedException + from litestar.handlers.base import BaseRouteHandler + from litestar.plugins.opentelemetry import OpenTelemetryConfig, instrument_guard + from litestar.testing import create_test_client + + span_exporter = InMemorySpanExporter() + tracer_provider = TracerProvider() + tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter)) + + @instrument_guard + async def test_guard(connection: ASGIConnection, route_handler: BaseRouteHandler) -> None: + if not connection.headers.get("x-api-key"): + raise NotAuthorizedException("API key required") + + @get("/protected", guards=[test_guard]) + def protected_handler() -> dict: + return {"status": "protected"} + + with create_test_client( + route_handlers=[protected_handler], + plugins=[OpenTelemetryConfig(tracer_provider=tracer_provider).plugin], + ) as client: + # Test without API key (should fail) + response = client.get("/protected") + assert response.status_code == 401 + + # Clear spans + span_exporter.clear() + + # Test with API key (should succeed) + response = client.get("/protected", headers={"x-api-key": "test-key"}) + assert response.status_code == 200 + + spans = span_exporter.get_finished_spans() + guard_spans = [span for span in spans if "guard" in span.name] + assert len(guard_spans) > 0, "Expected at least one guard span" + + +def test_guard_instrumentation_config_opt_in() -> None: + """Guard spans are created when config.instrument_guards is enabled without decorators.""" + + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import SimpleSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + + from litestar import get + from litestar.connection import ASGIConnection + from litestar.exceptions import NotAuthorizedException + from litestar.handlers.base import BaseRouteHandler + from litestar.plugins.opentelemetry import OpenTelemetryConfig + from litestar.testing import create_test_client + + span_exporter = InMemorySpanExporter() + tracer_provider = TracerProvider() + tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter)) + + async def plain_guard(connection: ASGIConnection, route_handler: BaseRouteHandler) -> None: + if not connection.headers.get("x-api-key"): + raise NotAuthorizedException("API key required") + + @get("/protected", guards=[plain_guard]) + def protected_handler() -> dict: + return {"status": "protected"} + + config = OpenTelemetryConfig(tracer_provider=tracer_provider, instrument_guards=True) + + with create_test_client(route_handlers=[protected_handler], plugins=[config.plugin]) as client: + client.get("/protected", headers={"x-api-key": "test"}) + + spans = span_exporter.get_finished_spans() + guard_spans = [span for span in spans if span.name.startswith("guard.")] + assert guard_spans, "Expected guard spans when instrument_guards is enabled" + + +def test_dependency_instrumentation() -> None: + """Test that dependency providers can be instrumented.""" + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import SimpleSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + + from litestar import get + from litestar.di import Provide + from litestar.plugins.opentelemetry import OpenTelemetryConfig, instrument_dependency + from litestar.testing import create_test_client + + span_exporter = InMemorySpanExporter() + tracer_provider = TracerProvider() + tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter)) + + @instrument_dependency("test_dependency") + async def test_dependency_provider() -> dict: + return {"value": "test"} + + @get("/with-dependency") + def handler_with_dependency(test_dependency: dict) -> dict: + return test_dependency + + with create_test_client( + route_handlers=[handler_with_dependency], + dependencies={"test_dependency": Provide(test_dependency_provider)}, + plugins=[OpenTelemetryConfig(tracer_provider=tracer_provider).plugin], + ) as client: + response = client.get("/with-dependency") + assert response.status_code == 200 + assert response.json() == {"value": "test"} + + spans = span_exporter.get_finished_spans() + dependency_spans = [span for span in spans if "dependency" in span.name] + assert len(dependency_spans) > 0, "Expected at least one dependency span" + + +def test_lifecycle_event_instrumentation() -> None: + """Test that lifecycle events can be instrumented.""" + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import SimpleSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + + from litestar.plugins.opentelemetry import OpenTelemetryConfig, instrument_lifecycle_event + from litestar.testing import create_test_client + + span_exporter = InMemorySpanExporter() + tracer_provider = TracerProvider() + tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter)) + + startup_called = False + shutdown_called = False + + @instrument_lifecycle_event("startup") + async def on_startup() -> None: + nonlocal startup_called + startup_called = True + + @instrument_lifecycle_event("shutdown") + async def on_shutdown() -> None: + nonlocal shutdown_called + shutdown_called = True + + with create_test_client( + route_handlers=[], + on_startup=[on_startup], + on_shutdown=[on_shutdown], + plugins=[OpenTelemetryConfig(tracer_provider=tracer_provider).plugin], + ): + pass + + assert startup_called + assert shutdown_called + + spans = span_exporter.get_finished_spans() + lifecycle_spans = [span for span in spans if "lifecycle" in span.name] + assert len(lifecycle_spans) >= 2, "Expected at least startup and shutdown spans" + + +def test_create_span_context_manager() -> None: + """Test the create_span context manager.""" + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import SimpleSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + + from litestar.plugins.opentelemetry import create_span + + span_exporter = InMemorySpanExporter() + tracer_provider = TracerProvider() + tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter)) + + from opentelemetry import trace + + trace._TRACER_PROVIDER = tracer_provider + trace.set_tracer_provider(tracer_provider) + + with create_span("test.operation", attributes={"test.attr": "value"}) as span: + assert span is not None + assert span.is_recording() + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].name == "test.operation" + assert spans[0].attributes is not None + assert spans[0].attributes.get("test.attr") == "value" + + +def test_create_span_with_exception() -> None: + """Test that create_span records exceptions.""" + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import SimpleSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + + from litestar.plugins.opentelemetry import create_span + + span_exporter = InMemorySpanExporter() + tracer_provider = TracerProvider() + tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter)) + + from opentelemetry import trace + + trace._TRACER_PROVIDER = tracer_provider + trace.set_tracer_provider(tracer_provider) + + with pytest.raises(ValueError, match="Test exception"): + with create_span("test.failing_operation"): + raise ValueError("Test exception") + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].name == "test.failing_operation" + + # Check for exception event + exception_events = [event for event in spans[0].events if event.name == "exception"] + assert exception_events + first_event = exception_events[0] + assert first_event.attributes is not None + assert first_event.attributes.get("exception.type") == "ValueError" + assert first_event.attributes.get("exception.message") == "Test exception" + + +def test_cli_instrumentation_creates_span() -> None: + """CLI commands emit spans when CLI instrumentation is enabled.""" + + from click.testing import CliRunner + from opentelemetry import trace + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import SimpleSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + + from litestar.cli.main import litestar_group + from litestar.config.app import AppConfig + from litestar.plugins.opentelemetry import OpenTelemetryConfig, OpenTelemetryPlugin + + exporter = InMemorySpanExporter() + tracer_provider = TracerProvider() + tracer_provider.add_span_processor(SimpleSpanProcessor(exporter)) + trace.set_tracer_provider(tracer_provider) + + plugin = OpenTelemetryPlugin(OpenTelemetryConfig(tracer_provider=tracer_provider, instrument_cli=True)) + plugin.on_app_init(AppConfig()) + + runner = CliRunner() + result = runner.invoke(litestar_group, ["version"]) + assert result.exit_code == 0 + + spans = exporter.get_finished_spans() + assert any(span.name.startswith("cli.") for span in spans), "expected CLI spans when instrument_cli is enabled" + + +def test_exception_stacktrace_recorded_in_middleware() -> None: + """Ensure middleware records stack traces on exceptions.""" + + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import SimpleSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + + from litestar import get + from litestar.plugins.opentelemetry import OpenTelemetryConfig + from litestar.status_codes import HTTP_500_INTERNAL_SERVER_ERROR + from litestar.testing import create_test_client + + exporter = InMemorySpanExporter() + tracer_provider = TracerProvider() + tracer_provider.add_span_processor(SimpleSpanProcessor(exporter)) + + @get("/boom") + def boom() -> None: + raise RuntimeError("kaboom") + + with create_test_client( + route_handlers=[boom], + plugins=[OpenTelemetryConfig(tracer_provider=tracer_provider).plugin], + ) as client: + response = client.get("/boom") + assert response.status_code == HTTP_500_INTERNAL_SERVER_ERROR + + spans = exporter.get_finished_spans() + # find span with exception event + exception_spans = [span for span in spans if any(event.name == "exception" for event in span.events)] + assert exception_spans, "expected exception span" + stacktrace_attrs = [ + (event.attributes or {}).get("exception.stacktrace") + for span in exception_spans + for event in span.events + if event.name == "exception" + ] + assert any(stacktrace_attrs), "stacktrace attribute should be recorded" + first_stacktrace = next(filter(None, stacktrace_attrs)) + assert "RuntimeError" in str(first_stacktrace)