diff --git a/requirements-testing.txt b/requirements-testing.txt
index 8e7bc47be0..e9a972680c 100644
--- a/requirements-testing.txt
+++ b/requirements-testing.txt
@@ -11,7 +11,7 @@ asttokens
 responses
 pysocks
 socksio
-httpcore[http2]
+httpcore[http2,asyncio]
 setuptools
 freezegun
 Brotli
diff --git a/scripts/populate_tox/config.py b/scripts/populate_tox/config.py
index 78bed91475..06eac2aa83 100644
--- a/scripts/populate_tox/config.py
+++ b/scripts/populate_tox/config.py
@@ -96,7 +96,7 @@
                 "pytest-asyncio",
                 "python-multipart",
                 "requests",
-                "anyio<4",
+                "anyio>=3,<5",
             ],
             # There's an incompatibility between FastAPI's TestClient, which is
             # actually Starlette's TestClient, which is actually httpx's Client.
@@ -106,6 +106,7 @@
             # FastAPI versions we use older httpx which still supports the
             # deprecated argument.
             "<0.110.1": ["httpx<0.28.0"],
+            "<0.80": ["anyio<4"],
             "py3.6": ["aiocontextvars"],
         },
     },
diff --git a/scripts/populate_tox/tox.jinja b/scripts/populate_tox/tox.jinja
index 66b1d7885a..514566ea46 100644
--- a/scripts/populate_tox/tox.jinja
+++ b/scripts/populate_tox/tox.jinja
@@ -207,7 +207,7 @@ deps =
     httpx-v0.25: pytest-httpx==0.25.0
     httpx: pytest-httpx
     # anyio is a dep of httpx
-    httpx: anyio<4.0.0
+    httpx: anyio>=3,<5
     httpx-v0.16: httpx~=0.16.0
     httpx-v0.18: httpx~=0.18.0
     httpx-v0.20: httpx~=0.20.0
diff --git a/sentry_sdk/client.py b/sentry_sdk/client.py
index ddddab488b..23af1111a7 100644
--- a/sentry_sdk/client.py
+++ b/sentry_sdk/client.py
@@ -3,6 +3,7 @@
 import uuid
 import random
 import socket
+import asyncio
 from collections.abc import Mapping
 from datetime import datetime, timezone
 from importlib import import_module
@@ -25,7 +26,7 @@
 )
 from sentry_sdk.serializer import serialize
 from sentry_sdk.tracing import trace
-from sentry_sdk.transport import BaseHttpTransport, make_transport
+from sentry_sdk.transport import HttpTransportCore, make_transport, AsyncHttpTransport
 from sentry_sdk.consts import (
     SPANDATA,
     DEFAULT_MAX_VALUE_LENGTH,
@@ -406,7 +407,7 @@ def _capture_envelope(envelope: Envelope) -> None:
             self.monitor
             or self.log_batcher
             or has_profiling_enabled(self.options)
-            or isinstance(self.transport, BaseHttpTransport)
+            or isinstance(self.transport, HttpTransportCore)
         ):
             # If we have anything on that could spawn a background thread, we
             # need to check if it's safe to use them.
@@ -917,51 +918,126 @@ def get_integration(
 
         return self.integrations.get(integration_name)
 
-    def close(
+    def _close_transport(self) -> Optional[asyncio.Task[None]]:
+        """Close transport and return cleanup task if any."""
+        if self.transport is not None:
+            cleanup_task = self.transport.kill()  # type: ignore
+            self.transport = None
+            return cleanup_task
+        return None
+
+    def _close_components(self) -> None:
+        """Kill all client components in the correct order."""
+        self.session_flusher.kill()
+        if self.log_batcher is not None:
+            self.log_batcher.kill()
+        if self.monitor:
+            self.monitor.kill()
+
+    async def _close_components_async(self) -> None:
+        """Async version of _close_components that properly awaits transport cleanup."""
+        self._close_components()
+        cleanup_task = self._close_transport()
+        if cleanup_task is not None:
+            await cleanup_task
+
+    def close(  # type: ignore[override]
         self,
         timeout: Optional[float] = None,
         callback: Optional[Callable[[int, float], None]] = None,
-    ) -> None:
+    ) -> Optional[asyncio.Task[None]]:
         """
         Close the client and shut down the transport. Arguments have the same
-        semantics as :py:meth:`Client.flush`.
+        semantics as :py:meth:`Client.flush`. When using the async transport,
+        the returned task must be awaited for the close to block.
         """
-        if self.transport is not None:
-            self.flush(timeout=timeout, callback=callback)
-            self.session_flusher.kill()
-
-        if self.log_batcher is not None:
-            self.log_batcher.kill()
-
-        if self.monitor:
-            self.monitor.kill()
-
-        self.transport.kill()
-        self.transport = None
+
+        async def _flush_and_close(
+            timeout: Optional[float], callback: Optional[Callable[[int, float], None]]
+        ) -> None:
+            await self._flush_async(timeout=timeout, callback=callback)
+            await self._close_components_async()
+
+        if self.transport is not None:
+            if isinstance(self.transport, AsyncHttpTransport) and hasattr(
+                self.transport, "loop"
+            ):
+                try:
+                    flush_task = self.transport.loop.create_task(
+                        _flush_and_close(timeout, callback)
+                    )
+                except RuntimeError:
+                    # Shut down the components anyway
+                    self._close_components()
+                    self._close_transport()
+                    logger.warning("Event loop not running, aborting close.")
+                    return None
+                # Enforce the flush before shutdown
+                return flush_task
+            else:
+                self.flush(timeout=timeout, callback=callback)
+                self._close_components()
+                self._close_transport()
+
+        return None
 
-    def flush(
+    def flush(  # type: ignore[override]
         self,
         timeout: Optional[float] = None,
         callback: Optional[Callable[[int, float], None]] = None,
-    ) -> None:
+    ) -> Optional[asyncio.Task[None]]:
         """
-        Wait for the current events to be sent.
+        Wait for the current events to be sent. When using the async transport,
+        the returned task must be awaited for the flush to block.
 
         :param timeout: Wait for at most `timeout` seconds. If no `timeout` is provided, the `shutdown_timeout` option value is used.
 
         :param callback: Is invoked with the number of pending events and the configured timeout.
         """
         if self.transport is not None:
-            if timeout is None:
-                timeout = self.options["shutdown_timeout"]
-            self.session_flusher.flush()
-
-            if self.log_batcher is not None:
-                self.log_batcher.flush()
-
+            if isinstance(self.transport, AsyncHttpTransport) and hasattr(
+                self.transport, "loop"
+            ):
+                try:
+                    return self.transport.loop.create_task(
+                        self._flush_async(timeout, callback)
+                    )
+                except RuntimeError:
+                    logger.warning("Event loop not running, aborting flush.")
+                    return None
+            else:
+                self._flush_sync(timeout, callback)
+        return None
+
+    def _flush_sync(
+        self, timeout: Optional[float], callback: Optional[Callable[[int, float], None]]
+    ) -> None:
+        """Synchronous flush implementation."""
+        if timeout is None:
+            timeout = self.options["shutdown_timeout"]
+
+        self._flush_components()
+        if self.transport is not None:
             self.transport.flush(timeout=timeout, callback=callback)
 
+    async def _flush_async(
+        self, timeout: Optional[float], callback: Optional[Callable[[int, float], None]]
+    ) -> None:
+        """Asynchronous flush implementation."""
+        if timeout is None:
+            timeout = self.options["shutdown_timeout"]
+
+        self._flush_components()
+        if self.transport is not None:
+            flush_task = self.transport.flush(timeout=timeout, callback=callback)  # type: ignore
+            if flush_task is not None:
+                await flush_task
+
+    def _flush_components(self) -> None:
+        self.session_flusher.flush()
+        if self.log_batcher is not None:
+            self.log_batcher.flush()
+
     def __enter__(self) -> _Client:
         return self
diff --git a/sentry_sdk/consts.py b/sentry_sdk/consts.py
index 29a5b21434..ed179a572f 100644
--- a/sentry_sdk/consts.py
+++ b/sentry_sdk/consts.py
@@ -78,6 +78,7 @@ class CompressionAlgo(Enum):
         "transport_compression_algo": Optional[CompressionAlgo],
         "transport_num_pools": Optional[int],
         "transport_http2": Optional[bool],
+        "transport_async": Optional[bool],
     },
     total=False,
 )
diff --git a/sentry_sdk/integrations/asyncio.py b/sentry_sdk/integrations/asyncio.py
index 719cbba1a8..5188e8efba 100644
--- a/sentry_sdk/integrations/asyncio.py
+++ b/sentry_sdk/integrations/asyncio.py
@@ -5,6 +5,7 @@
 from sentry_sdk.consts import OP
 from sentry_sdk.integrations import Integration, DidNotEnable
 from sentry_sdk.utils import event_from_exception, logger, reraise
+from sentry_sdk.transport import AsyncHttpTransport
 
 try:
     import asyncio
@@ -29,6 +30,47 @@ def get_name(coro: Any) -> str:
     )
 
 
+def patch_loop_close() -> None:
+    """Patch loop.close to flush pending events before shutdown."""
+    # The atexit shutdown hook runs after the event loop is closed.
+    # Therefore, loop.close has to be patched to ensure that pending
+    # events are flushed before the interpreter shuts down.
+    try:
+        loop = asyncio.get_running_loop()
+    except RuntimeError:
+        # No running loop → cannot patch now
+        return
+
+    if getattr(loop, "_sentry_flush_patched", False):
+        return
+
+    async def _flush() -> None:
+        client = sentry_sdk.get_client()
+        if not client:
+            return
+
+        try:
+            if not isinstance(client.transport, AsyncHttpTransport):
+                return
+
+            task = client.close()  # type: ignore
+            if task is not None:
+                await task
+        except Exception:
+            logger.warning("Sentry flush failed during loop shutdown", exc_info=True)
+
+    orig_close = loop.close
+
+    def _patched_close() -> None:
+        try:
+            loop.run_until_complete(_flush())
+        finally:
+            orig_close()
+
+    loop.close = _patched_close  # type: ignore
+    loop._sentry_flush_patched = True  # type: ignore
+
+
 def patch_asyncio() -> None:
     orig_task_factory = None
     try:
@@ -124,3 +166,4 @@ class AsyncioIntegration(Integration):
     @staticmethod
     def setup_once() -> None:
         patch_asyncio()
+        patch_loop_close()
diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py
index ac7a8c3522..318ec9b659 100644
--- a/sentry_sdk/transport.py
+++ b/sentry_sdk/transport.py
@@ -6,6 +6,8 @@
 import socket
 import ssl
 import time
+import asyncio
+
 from datetime import datetime, timedelta, timezone
 from collections import defaultdict
 from urllib.request import getproxies
@@ -17,18 +19,29 @@
 
 try:
     import httpcore
+except ImportError:
+    httpcore = None  # type: ignore
+
+try:
     import h2  # noqa: F401
 
-    HTTP2_ENABLED = True
+    HTTP2_ENABLED = httpcore is not None
 except ImportError:
     HTTP2_ENABLED = False
 
+try:
+    import anyio  # noqa: F401
+
+    ASYNC_TRANSPORT_ENABLED = httpcore is not None
+except ImportError:
+    ASYNC_TRANSPORT_ENABLED = False
+
 import urllib3
 import certifi
 
 from sentry_sdk.consts import EndpointType
 from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions
-from sentry_sdk.worker import BackgroundWorker
+from sentry_sdk.worker import BackgroundWorker, Worker, AsyncWorker
 from sentry_sdk.envelope import Envelope, Item, PayloadRef
 
 from typing import TYPE_CHECKING
@@ -162,8 +175,8 @@ def _parse_rate_limits(
             continue
 
 
-class BaseHttpTransport(Transport):
-    """The base HTTP transport."""
+class HttpTransportCore(Transport):
+    """Shared base class for sync and async transports."""
 
     TIMEOUT = 30  # seconds
 
@@ -173,7 +186,7 @@ def __init__(self: Self, options: Dict[str, Any]) -> None:
         Transport.__init__(self, options)
         assert self.parsed_dsn is not None
         self.options: Dict[str, Any] = options
-        self._worker = BackgroundWorker(queue_size=options["transport_queue_size"])
+        self._worker = self._create_worker(options)
         self._auth = self.parsed_dsn.to_auth("sentry.python/%s" % VERSION)
         self._disabled_until: Dict[Optional[str], datetime] = {}
         # We only use this Retry() class for the `get_retry_after` method it exposes
@@ -224,6 +237,19 @@ def __init__(self: Self, options: Dict[str, Any]) -> None:
         elif self._compression_algo == "br":
             self._compression_level = 4
 
+    def _create_worker(self, options: dict[str, Any]) -> Worker:
+        async_enabled = options.get("_experiments", {}).get("transport_async", False)
+        try:
+            asyncio.get_running_loop()
+            worker_cls = (
+                AsyncWorker
+                if async_enabled and ASYNC_TRANSPORT_ENABLED
+                else BackgroundWorker
+            )
+        except RuntimeError:
+            worker_cls = BackgroundWorker
+        return worker_cls(queue_size=options["transport_queue_size"])
+
     def record_lost_event(
         self: Self,
         reason: str,
@@ -286,12 +312,8 @@ def _update_rate_limits(
                 seconds=retry_after
             )
 
-    def _send_request(
-        self: Self,
-        body: bytes,
-        headers: Dict[str, str],
-        endpoint_type: EndpointType = EndpointType.ENVELOPE,
-        envelope: Optional[Envelope] = None,
+    def _handle_request_error(
+        self: Self, envelope: Optional[Envelope], loss_reason: str = "network"
     ) -> None:
         def record_loss(reason: str) -> None:
             if envelope is None:
@@ -300,45 +322,45 @@ def record_loss(reason: str) -> None:
                 for item in envelope.items:
                     self.record_lost_event(reason, item=item)
 
+        self.on_dropped_event(loss_reason)
+        record_loss("network_error")
+
+    def _handle_response(
+        self: Self,
+        response: Union[urllib3.BaseHTTPResponse, httpcore.Response],
+        envelope: Optional[Envelope],
+    ) -> None:
+        self._update_rate_limits(response)
+
+        if response.status == 429:
+            # If we hit a 429, something was rate limited, but we already
+            # acted on it in `self._update_rate_limits`. Note that we do not
+            # want to record event loss here, as an outcome will already have
+            # been recorded in Relay.
+            self.on_dropped_event("status_429")
+
+        elif response.status >= 300 or response.status < 200:
+            logger.error(
+                "Unexpected status code: %s (body: %s)",
+                response.status,
+                getattr(response, "data", getattr(response, "content", None)),
+            )
+            self._handle_request_error(
+                envelope=envelope, loss_reason="status_{}".format(response.status)
+            )
+
+    def _update_headers(
+        self: Self,
+        headers: Dict[str, str],
+    ) -> None:
         headers.update(
             {
                 "User-Agent": str(self._auth.client),
                 "X-Sentry-Auth": str(self._auth.to_header()),
             }
         )
-        try:
-            response = self._request(
-                "POST",
-                endpoint_type,
-                body,
-                headers,
-            )
-        except Exception:
-            self.on_dropped_event("network")
-            record_loss("network_error")
-            raise
-
-        try:
-            self._update_rate_limits(response)
-
-            if response.status == 429:
-                # if we hit a 429. Something was rate limited but we already
-                # acted on this in `self._update_rate_limits`. Note that we
-                # do not want to record event loss here as we will have recorded
-                # an outcome in relay already.
-                self.on_dropped_event("status_429")
-                pass
-
-            elif response.status >= 300 or response.status < 200:
-                logger.error(
-                    "Unexpected status code: %s (body: %s)",
-                    response.status,
-                    getattr(response, "data", getattr(response, "content", None)),
-                )
-                self.on_dropped_event("status_{}".format(response.status))
-                record_loss("network_error")
-        finally:
-            response.close()
 
     def on_dropped_event(self: Self, _reason: str) -> None:
         return None
@@ -375,11 +397,6 @@ def _fetch_pending_client_report(
             type="client_report",
         )
 
-    def _flush_client_reports(self: Self, force: bool = False) -> None:
-        client_report = self._fetch_pending_client_report(force=force, interval=60)
-        if client_report is not None:
-            self.capture_envelope(Envelope(items=[client_report]))
-
     def _check_disabled(self: Self, category: EventDataCategory) -> bool:
         def _disabled(bucket: Optional[EventDataCategory]) -> bool:
             ts = self._disabled_until.get(bucket)
@@ -398,7 +415,9 @@ def _is_worker_full(self: Self) -> bool:
     def is_healthy(self: Self) -> bool:
         return not (self._is_worker_full() or self._is_rate_limited())
 
-    def _send_envelope(self: Self, envelope: Envelope) -> None:
+    def _prepare_envelope(
+        self: Self, envelope: Envelope
+    ) -> Optional[Tuple[Envelope, io.BytesIO, Dict[str, str]]]:
 
         # remove all items from the envelope which are over quota
         new_items = []
@@ -442,13 +461,7 @@ def _send_envelope(self: Self, envelope: Envelope) -> None:
         if content_encoding:
             headers["Content-Encoding"] = content_encoding
 
-        self._send_request(
-            body.getvalue(),
-            headers=headers,
-            endpoint_type=EndpointType.ENVELOPE,
-            envelope=envelope,
-        )
-        return None
+        return envelope, body, headers
 
     def _serialize_envelope(
         self: Self, envelope: Envelope
@@ -494,6 +507,9 @@ def _make_pool(
         httpcore.SOCKSProxy,
         httpcore.HTTPProxy,
         httpcore.ConnectionPool,
+        httpcore.AsyncSOCKSProxy,
+        httpcore.AsyncHTTPProxy,
+        httpcore.AsyncConnectionPool,
     ]:
         raise NotImplementedError()
 
@@ -506,6 +522,54 @@ def _request(
     ) -> Union[urllib3.BaseHTTPResponse, httpcore.Response]:
         raise NotImplementedError()
 
+    def kill(self: Self) -> None:
+        logger.debug("Killing HTTP transport")
+        self._worker.kill()
+
+
+class BaseHttpTransport(HttpTransportCore):
+    """The base HTTP transport."""
+
+    def _send_envelope(self: Self, envelope: Envelope) -> None:
+        _prepared_envelope = self._prepare_envelope(envelope)
+        if _prepared_envelope is not None:
+            envelope, body, headers = _prepared_envelope
+            self._send_request(
+                body.getvalue(),
+                headers=headers,
+                endpoint_type=EndpointType.ENVELOPE,
+                envelope=envelope,
+            )
+        return None
+
+    def _send_request(
+        self: Self,
+        body: bytes,
+        headers: Dict[str, str],
+        endpoint_type: EndpointType,
+        envelope: Optional[Envelope],
+    ) -> None:
+        self._update_headers(headers)
+        try:
+            response = self._request(
+                "POST",
+                endpoint_type,
+                body,
+                headers,
+            )
+        except Exception:
+            self._handle_request_error(envelope=envelope, loss_reason="network")
+            raise
+        try:
+            self._handle_response(response=response, envelope=envelope)
+        finally:
+            response.close()
+
+    def _flush_client_reports(self: Self, force: bool = False) -> None:
+        client_report = self._fetch_pending_client_report(force=force, interval=60)
+        if client_report is not None:
+            self.capture_envelope(Envelope(items=[client_report]))
+
     def capture_envelope(self: Self, envelope: Envelope) -> None:
         def send_envelope_wrapper() -> None:
             with capture_internal_exceptions():
@@ -528,10 +592,6 @@ def flush(
             self._worker.submit(lambda: self._flush_client_reports(force=True))
             self._worker.flush(timeout, callback)
 
-    def kill(self: Self) -> None:
-        logger.debug("Killing HTTP transport")
-        self._worker.kill()
-
 
 class HttpTransport(BaseHttpTransport):
     if TYPE_CHECKING:
@@ -639,6 +699,240 @@ def _request(
         )
 
 
+if not ASYNC_TRANSPORT_ENABLED:
+    # Sorry, no AsyncHttpTransport for you
+    class AsyncHttpTransport(HttpTransport):
+        def __init__(self: Self, options: Dict[str, Any]) -> None:
+            super().__init__(options)
+            logger.warning(
+                "You tried to use AsyncHttpTransport but don't have httpcore[asyncio] installed. Falling back to sync transport."
+            )
+
+else:
+
+    class AsyncHttpTransport(HttpTransportCore):  # type: ignore
+        def __init__(self: Self, options: Dict[str, Any]) -> None:
+            super().__init__(options)
+            # Requires event loop at init time
+            self.loop = asyncio.get_running_loop()
+            self.background_tasks: set[asyncio.Task[None]] = set()
+
+        def _get_header_value(self: Self, response: Any, header: str) -> Optional[str]:
+            return next(
+                (
+                    val.decode("ascii")
+                    for key, val in response.headers
+                    if key.decode("ascii").lower() == header
+                ),
+                None,
+            )
+
+        async def _send_envelope(self: Self, envelope: Envelope) -> None:
+            _prepared_envelope = self._prepare_envelope(envelope)
+            if _prepared_envelope is not None:
+                envelope, body, headers = _prepared_envelope
+                await self._send_request(
+                    body.getvalue(),
+                    headers=headers,
+                    endpoint_type=EndpointType.ENVELOPE,
+                    envelope=envelope,
+                )
+            return None
+
+        async def _send_request(
+            self: Self,
+            body: bytes,
+            headers: Dict[str, str],
+            endpoint_type: EndpointType,
+            envelope: Optional[Envelope],
+        ) -> None:
+            self._update_headers(headers)
+            try:
+                response = await self._request(
+                    "POST",
+                    endpoint_type,
+                    body,
+                    headers,
+                )
+            except Exception:
+                self._handle_request_error(envelope=envelope, loss_reason="network")
+                raise
+            try:
+                self._handle_response(response=response, envelope=envelope)
+            finally:
+                await response.aclose()
+
+        async def _request(  # type: ignore[override]
+            self: Self,
+            method: str,
+            endpoint_type: EndpointType,
+            body: Any,
+            headers: Mapping[str, str],
+        ) -> httpcore.Response:
+            return await self._pool.request(
+                method,
+                self._auth.get_api_url(endpoint_type),
+                content=body,
+                headers=headers,  # type: ignore
+                extensions={
+                    "timeout": {
+                        "pool": self.TIMEOUT,
+                        "connect": self.TIMEOUT,
+                        "write": self.TIMEOUT,
+                        "read": self.TIMEOUT,
+                    }
+                },
+            )
+
+        async def _flush_client_reports(self: Self, force: bool = False) -> None:
+            client_report = self._fetch_pending_client_report(force=force, interval=60)
+            if client_report is not None:
+                self.capture_envelope(Envelope(items=[client_report]))
+
+        async def _capture_envelope(self: Self, envelope: Envelope) -> None:
+            async def send_envelope_wrapper() -> None:
+                with capture_internal_exceptions():
+                    await self._send_envelope(envelope)
+                    await self._flush_client_reports()
+
+            if not self._worker.submit(send_envelope_wrapper):
+                self.on_dropped_event("full_queue")
+                for item in envelope.items:
+                    self.record_lost_event("queue_overflow", item=item)
+
+        def capture_envelope(self: Self, envelope: Envelope) -> None:
+            # Synchronous entry point
+            try:
+                asyncio.get_running_loop()
+                # We are on the main thread running the event loop
+                task = asyncio.create_task(self._capture_envelope(envelope))
+                self.background_tasks.add(task)
+                task.add_done_callback(self.background_tasks.discard)
+            except RuntimeError:
+                # We are in a background thread, not running an event loop,
+                # have to launch the task on the loop in a threadsafe way.
+                if self.loop and self.loop.is_running():
+                    asyncio.run_coroutine_threadsafe(
+                        self._capture_envelope(envelope),
+                        self.loop,
+                    )
+                else:
+                    # The event loop is no longer running
+                    logger.warning("Async Transport is not running in an event loop.")
+                    self.on_dropped_event("internal_sdk_error")
+                    for item in envelope.items:
+                        self.record_lost_event("internal_sdk_error", item=item)
+
+        def flush(  # type: ignore[override]
+            self: Self,
+            timeout: float,
+            callback: Optional[Callable[[int, float], None]] = None,
+        ) -> Optional[asyncio.Task[None]]:
+            logger.debug("Flushing HTTP transport")
+
+            if timeout > 0:
+                self._worker.submit(lambda: self._flush_client_reports(force=True))
+                return self._worker.flush(timeout, callback)  # type: ignore[func-returns-value]
+            return None
+
+        def _get_pool_options(self: Self) -> Dict[str, Any]:
+            options: Dict[str, Any] = {
+                "http2": False,  # no HTTP2 for now
+                "retries": 3,
+            }
+
+            socket_options = (
+                self.options["socket_options"]
+                if self.options["socket_options"] is not None
+                else []
+            )
+
+            used_options = {(o[0], o[1]) for o in socket_options}
+            for default_option in KEEP_ALIVE_SOCKET_OPTIONS:
+                if (default_option[0], default_option[1]) not in used_options:
+                    socket_options.append(default_option)
+
+            options["socket_options"] = socket_options
+
+            ssl_context = ssl.create_default_context()
+            ssl_context.load_verify_locations(
+                self.options["ca_certs"]  # User-provided bundle from the SDK init
+                or os.environ.get("SSL_CERT_FILE")
+                or os.environ.get("REQUESTS_CA_BUNDLE")
+                or certifi.where()
+            )
+            cert_file = self.options["cert_file"] or os.environ.get("CLIENT_CERT_FILE")
+            key_file = self.options["key_file"] or os.environ.get("CLIENT_KEY_FILE")
+            if cert_file is not None:
+                ssl_context.load_cert_chain(cert_file, key_file)
+
+            options["ssl_context"] = ssl_context
+
+            return options
+
+        def _make_pool(
+            self: Self,
+        ) -> Union[
+            httpcore.AsyncSOCKSProxy,
+            httpcore.AsyncHTTPProxy,
+            httpcore.AsyncConnectionPool,
+        ]:
+            if self.parsed_dsn is None:
+                raise ValueError("Cannot create HTTP-based transport without valid DSN")
+            proxy = None
+            no_proxy = self._in_no_proxy(self.parsed_dsn)
+
+            # try HTTPS first
+            https_proxy = self.options["https_proxy"]
+            if self.parsed_dsn.scheme == "https" and (https_proxy != ""):
+                proxy = https_proxy or (not no_proxy and getproxies().get("https"))
+
+            # maybe fallback to HTTP proxy
+            http_proxy = self.options["http_proxy"]
+            if not proxy and (http_proxy != ""):
+                proxy = http_proxy or (not no_proxy and getproxies().get("http"))
+
+            opts = self._get_pool_options()
+
+            if proxy:
+                proxy_headers = self.options["proxy_headers"]
+                if proxy_headers:
+                    opts["proxy_headers"] = proxy_headers
+
+                if proxy.startswith("socks"):
+                    try:
+                        if "socket_options" in opts:
+                            socket_options = opts.pop("socket_options")
+                            if socket_options:
+                                logger.warning(
+                                    "You have defined socket_options but using a SOCKS proxy which doesn't support these. We'll ignore socket_options."
+                                )
+                        return httpcore.AsyncSOCKSProxy(proxy_url=proxy, **opts)
+                    except RuntimeError:
+                        logger.warning(
+                            "You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. Disabling proxy support.",
+                            proxy,
+                        )
+                else:
+                    return httpcore.AsyncHTTPProxy(proxy_url=proxy, **opts)
+
+            return httpcore.AsyncConnectionPool(**opts)
+
+        def kill(self: Self) -> Optional[asyncio.Task[None]]:  # type: ignore
+            logger.debug("Killing HTTP transport")
+            self._worker.kill()
+            for task in self.background_tasks:
+                task.cancel()
+            self.background_tasks.clear()
+            try:
+                # Return the pool cleanup task so the caller can await it if needed
+                return self.loop.create_task(self._pool.aclose())  # type: ignore
+            except RuntimeError:
+                logger.warning("Event loop not running, aborting kill.")
+                return None
+
+
 if not HTTP2_ENABLED:
     # Sorry, no Http2Transport for you
     class Http2Transport(HttpTransport):
@@ -778,11 +1072,30 @@ def make_transport(options: Dict[str, Any]) -> Optional[Transport]:
     ref_transport = options["transport"]
 
     use_http2_transport = options.get("_experiments", {}).get("transport_http2", False)
-
+    use_async_transport = options.get("_experiments", {}).get("transport_async", False)
     # By default, we use the http transport class
-    transport_cls: Type[Transport] = (
-        Http2Transport if use_http2_transport else HttpTransport
-    )
+    sync_transport_cls = Http2Transport if use_http2_transport else HttpTransport
+    if use_async_transport:
+        try:
+            asyncio.get_running_loop()
+            # The AsyncioIntegration is required for AsyncHttpTransport,
+            # as it patches the event loop's close method for this transport.
+            if any(
+                integration.__class__.__name__ == "AsyncioIntegration"
+                for integration in options.get("integrations", [])
+            ):
+                transport_cls: Type[Transport] = AsyncHttpTransport
+            else:
+                logger.warning(
+                    "AsyncHttpTransport requires the AsyncioIntegration to be enabled, falling back to sync transport."
+                )
+                transport_cls = sync_transport_cls
+        except RuntimeError:
+            # No event loop running, fall back to the sync transport
+            logger.warning("No event loop running, falling back to sync transport.")
+            transport_cls = sync_transport_cls
+    else:
+        transport_cls = sync_transport_cls
 
     if isinstance(ref_transport, Transport):
         return ref_transport
diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py
index d911e15623..c8dbbb2d73 100644
--- a/sentry_sdk/worker.py
+++ b/sentry_sdk/worker.py
@@ -1,6 +1,8 @@
 from __future__ import annotations
+from abc import ABC, abstractmethod
 import os
 import threading
+import asyncio
 from time import sleep, time
 
 from sentry_sdk._queue import Queue, FullError
@@ -16,7 +18,65 @@
 _TERMINATOR = object()
 
 
-class BackgroundWorker:
+class Worker(ABC):
+    """
+    Base class for all workers.
+
+    A worker is used to process events in the background and send them to Sentry.
+    """
+
+    @property
+    @abstractmethod
+    def is_alive(self) -> bool:
+        """
+        Checks whether the worker is alive and running.
+
+        Returns True if the worker is alive, False otherwise.
+        """
+        pass
+
+    @abstractmethod
+    def kill(self) -> None:
+        """
+        Kills the worker.
+
+        The queue will be drained up to the point where the worker is killed,
+        after which the worker will not process any more events.
+        """
+        pass
+
+    def flush(
+        self, timeout: float, callback: Optional[Callable[[int, float], Any]] = None
+    ) -> None:
+        """
+        Flush the worker.
+
+        This method blocks until the worker has flushed all events or the specified timeout is reached.
+        The default implementation is a no-op, since this method is only relevant to some workers.
+        Subclasses should override this method if necessary.
+        """
+        return None
+
+    @abstractmethod
+    def full(self) -> bool:
+        """
+        Checks whether the worker's queue is full.
+
+        Returns True if the queue is full, False otherwise.
+        """
+        pass
+
+    @abstractmethod
+    def submit(self, callback: Callable[[], Any]) -> bool:
+        """
+        Schedule a callback to be executed by the worker.
+
+        Returns True if the callback was scheduled, False if the queue is full.
+        """
+        pass
+
+
+class BackgroundWorker(Worker):
     def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None:
         self._queue: Queue = Queue(queue_size)
         self._lock = threading.Lock()
@@ -106,7 +166,7 @@ def _wait_flush(self, timeout: float, callback: Optional[Any]) -> None:
             pending = self._queue.qsize() + 1
             logger.error("flush timed out, dropped %s events", pending)
 
-    def submit(self, callback: Callable[[], None]) -> bool:
+    def submit(self, callback: Callable[[], Any]) -> bool:
         self._ensure_thread()
         try:
             self._queue.put_nowait(callback)
@@ -127,3 +187,134 @@ def _target(self) -> None:
                 finally:
                     self._queue.task_done()
                 sleep(0)
+
+
+class AsyncWorker(Worker):
+    def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None:
+        self._queue: Optional[asyncio.Queue[Any]] = None
+        self._queue_size = queue_size
+        self._task: Optional[asyncio.Task[None]] = None
+        # Event loop needs to remain in the same process
+        self._task_for_pid: Optional[int] = None
+        self._loop: Optional[asyncio.AbstractEventLoop] = None
+        # Track active callback tasks so they have a strong reference and can be cancelled on kill
+        self._active_tasks: set[asyncio.Task[None]] = set()
+
+    @property
+    def is_alive(self) -> bool:
+        if self._task_for_pid != os.getpid():
+            return False
+        if not self._task or not self._loop:
+            return False
+        return self._loop.is_running() and not self._task.done()
+
+    def kill(self) -> None:
+        if self._task:
+            if self._queue is not None:
+                try:
+                    self._queue.put_nowait(_TERMINATOR)
+                except asyncio.QueueFull:
+                    logger.debug("async worker queue full, kill failed")
+            # Also cancel any active callback tasks
+            # Avoid modifying the set while cancelling tasks
+            tasks_to_cancel = set(self._active_tasks)
+            for task in tasks_to_cancel:
+                task.cancel()
+            self._active_tasks.clear()
+            self._loop = None
+            self._task = None
+            self._task_for_pid = None
+
+    def start(self) -> None:
+        if not self.is_alive:
+            try:
+                self._loop = asyncio.get_running_loop()
+                if self._queue is None:
+                    self._queue = asyncio.Queue(maxsize=self._queue_size)
+                self._task = self._loop.create_task(self._target())
+                self._task_for_pid = os.getpid()
+            except RuntimeError:
+                # There is no event loop running
+                logger.warning("No event loop running, async worker not started")
+                self._loop = None
+                self._task = None
+                self._task_for_pid = None
+
+    def full(self) -> bool:
+        if self._queue is None:
+            return True
+        return self._queue.full()
+
+    def _ensure_task(self) -> None:
+        if not self.is_alive:
+            self.start()
+
+    async def _wait_flush(self, timeout: float, callback: Optional[Any] = None) -> None:
+        if not self._loop or not self._loop.is_running() or self._queue is None:
+            return
+
+        initial_timeout = min(0.1, timeout)
+
+        # Timeout on the join
+        try:
+            await asyncio.wait_for(self._queue.join(), timeout=initial_timeout)
+        except asyncio.TimeoutError:
+            pending = self._queue.qsize() + len(self._active_tasks)
+            logger.debug("%d event(s) pending on flush", pending)
+            if callback is not None:
+                callback(pending, timeout)
+
+            try:
+                remaining_timeout = timeout - initial_timeout
+                await asyncio.wait_for(self._queue.join(), timeout=remaining_timeout)
+            except asyncio.TimeoutError:
+                pending = self._queue.qsize() + len(self._active_tasks)
+                logger.error("flush timed out, dropped %s events", pending)
+
+    def flush(self, timeout: float, callback: Optional[Any] = None) -> Optional[asyncio.Task[None]]:  # type: ignore[override]
+        if self.is_alive and timeout > 0.0 and self._loop and self._loop.is_running():
+            return self._loop.create_task(self._wait_flush(timeout, callback))
+        return None
+
+    def submit(self, callback: Callable[[], Any]) -> bool:
+        self._ensure_task()
+        if self._queue is None:
+            return False
+        try:
+            self._queue.put_nowait(callback)
+            return True
+        except asyncio.QueueFull:
+            return False
+
+    async def _target(self) -> None:
+        if self._queue is None:
+            return
+        while True:
+            callback = await self._queue.get()
+            if callback is _TERMINATOR:
+                self._queue.task_done()
+                break
+            # Firing tasks instead of awaiting them allows for concurrent requests
+            task = asyncio.create_task(self._process_callback(callback))
+            # Create a strong reference to the task so it can be cancelled on kill
+            # and does not get garbage collected while running
+            self._active_tasks.add(task)
+            task.add_done_callback(self._on_task_complete)
+            # Yield to let the event loop run other tasks
+            await asyncio.sleep(0)
+
+    async def _process_callback(self, callback: Callable[[], Any]) -> None:
+        # The callback is an async function; calling it returns a coroutine
+        # which has to be awaited
+        await callback()
+
+    def _on_task_complete(self, task: asyncio.Task[None]) -> None:
+        try:
+            task.result()
+        except Exception:
+            logger.error("Failed processing job", exc_info=True)
+        finally:
+            # Mark the task as done and remove it from the active tasks set
+            # This happens only after the task has completed
+            if self._queue is not None:
+                self._queue.task_done()
+            self._active_tasks.discard(task)
diff --git a/tests/integrations/asyncio/test_asyncio.py b/tests/integrations/asyncio/test_asyncio.py
index 2ae71f8f43..5c329f8185 100644
--- a/tests/integrations/asyncio/test_asyncio.py
+++ b/tests/integrations/asyncio/test_asyncio.py
@@ -377,3 +377,52 @@ async def test_span_origin(
 
     assert event["contexts"]["trace"]["origin"] == "manual"
     assert event["spans"][0]["origin"] == "auto.function.asyncio"
+
+
+@minimum_python_38
+def test_loop_close_patching(sentry_init):
+    sentry_init(integrations=[AsyncioIntegration()])
+
+    loop = asyncio.new_event_loop()
+    asyncio.set_event_loop(loop)
+
+    try:
+        with patch("asyncio.get_running_loop", return_value=loop):
+            assert not hasattr(loop, "_sentry_flush_patched")
+            AsyncioIntegration.setup_once()
+            assert hasattr(loop, "_sentry_flush_patched")
+            assert loop._sentry_flush_patched is True
+
+    finally:
+        if not loop.is_closed():
+            loop.close()
+
+
+@minimum_python_38
+def test_loop_close_flushes_async_transport(sentry_init):
+    from sentry_sdk.transport import AsyncHttpTransport
+    from unittest.mock import Mock, AsyncMock
+
+    sentry_init(integrations=[AsyncioIntegration()])
+
+    loop = asyncio.new_event_loop()
+    asyncio.set_event_loop(loop)
+
+    try:
+        with patch("asyncio.get_running_loop", return_value=loop):
+            AsyncioIntegration.setup_once()
+
+            mock_client = Mock()
+            mock_transport = Mock(spec=AsyncHttpTransport)
+            mock_client.transport = mock_transport
+            mock_client.close = AsyncMock(return_value=None)
+
+            with patch("sentry_sdk.get_client", return_value=mock_client):
+                loop.close()
+
+            mock_client.close.assert_called_once()
+            mock_client.close.assert_awaited_once()
+
+    finally:
+        if not loop.is_closed():
+            loop.close()
diff --git a/tests/test_client.py b/tests/test_client.py
index b69a6a0f3f..990e27e8b5 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -23,9 +23,11 @@
 from sentry_sdk.spotlight import DEFAULT_SPOTLIGHT_URL
 from sentry_sdk.utils import capture_internal_exception
 from sentry_sdk.integrations.executing import ExecutingIntegration
-from sentry_sdk.transport import Transport
+from sentry_sdk.integrations.asyncio import AsyncioIntegration
+from sentry_sdk.transport import Transport, AsyncHttpTransport
 from sentry_sdk.serializer import MAX_DATABAG_BREADTH
 from sentry_sdk.consts import DEFAULT_MAX_BREADCRUMBS, DEFAULT_MAX_VALUE_LENGTH
+from sentry_sdk._compat import PY38
 from sentry_sdk.types import Event
 
@@ -1492,3 +1494,323 @@ def test_keep_alive(env_value, arg_value, expected_value):
     )
 
     assert transport_cls.options["keep_alive"] is expected_value
+
+
+@pytest.mark.parametrize(
+    "testcase",
+    [
+        {
+            "dsn": "http://foo@sentry.io/123",
+            "env_http_proxy": None,
+            "env_https_proxy": None,
+            "arg_http_proxy": "http://localhost/123",
+            "arg_https_proxy": None,
+            "expected_proxy_scheme": "http",
+        },
+        {
+            "dsn": "https://foo@sentry.io/123",
+            "env_http_proxy": None,
+            "env_https_proxy": None,
+            "arg_http_proxy": "https://localhost/123",
+            "arg_https_proxy": None,
+            "expected_proxy_scheme": "https",
+        },
+        {
+            "dsn": "http://foo@sentry.io/123",
+            "env_http_proxy": None,
+            "env_https_proxy": None,
+            "arg_http_proxy": "http://localhost/123",
+            "arg_https_proxy": "https://localhost/123",
+            "expected_proxy_scheme": "http",
+        },
+        {
+            "dsn": "https://foo@sentry.io/123",
+            "env_http_proxy": None,
+            "env_https_proxy": None,
+            "arg_http_proxy": "http://localhost/123",
+            "arg_https_proxy": "https://localhost/123",
+            "expected_proxy_scheme": "https",
+        },
+        {
+            "dsn": "https://foo@sentry.io/123",
+            "env_http_proxy": None,
+            "env_https_proxy": None,
+            "arg_http_proxy": "http://localhost/123",
+            "arg_https_proxy": None,
+            "expected_proxy_scheme": "http",
+        },
+        {
+            "dsn": "http://foo@sentry.io/123",
+            "env_http_proxy": None,
+            "env_https_proxy": None,
+            "arg_http_proxy": None,
+            "arg_https_proxy": None,
+            "expected_proxy_scheme": None,
+        },
+        {
+            "dsn": "http://foo@sentry.io/123",
+            "env_http_proxy": "http://localhost/123",
+            "env_https_proxy": None,
+            "arg_http_proxy": None,
+            "arg_https_proxy": None,
+            "expected_proxy_scheme": "http",
+        },
+        {
+            "dsn": "https://foo@sentry.io/123",
+            "env_http_proxy": None,
+            "env_https_proxy": "https://localhost/123",
+            "arg_http_proxy": None,
+            "arg_https_proxy": None,
+            "expected_proxy_scheme": "https",
+        },
+        {
+            "dsn": "https://foo@sentry.io/123",
+            "env_http_proxy": "http://localhost/123",
+            "env_https_proxy": None,
+            "arg_http_proxy": None,
+            "arg_https_proxy": None,
+            "expected_proxy_scheme": "http",
+        },
+        {
+            "dsn": "https://foo@sentry.io/123",
+            "env_http_proxy": "http://localhost/123",
+            "env_https_proxy": "https://localhost/123",
+            "arg_http_proxy": "",
+            "arg_https_proxy": "",
+            "expected_proxy_scheme": None,
+        },
+        {
+            "dsn": "https://foo@sentry.io/123",
+            "env_http_proxy": "http://localhost/123",
+            "env_https_proxy": "https://localhost/123",
+            "arg_http_proxy": None,
+            "arg_https_proxy": None,
+            "expected_proxy_scheme": "https",
+        },
+        {
+            "dsn": "https://foo@sentry.io/123",
+            "env_http_proxy": "http://localhost/123",
+            "env_https_proxy": None,
+            "arg_http_proxy": None,
+            "arg_https_proxy": None,
+            "expected_proxy_scheme": "http",
+        },
+        {
+            "dsn": "https://foo@sentry.io/123",
+            "env_http_proxy": "http://localhost/123",
+            "env_https_proxy": "https://localhost/123",
+            "arg_http_proxy": None,
+            "arg_https_proxy": "",
+            "expected_proxy_scheme": "http",
+        },
+        {
+            "dsn": "https://foo@sentry.io/123",
+            "env_http_proxy": "http://localhost/123",
+            "env_https_proxy": "https://localhost/123",
+            "arg_http_proxy": "",
+            "arg_https_proxy": None,
+            "expected_proxy_scheme": "https",
+        },
+        {
+            "dsn": "https://foo@sentry.io/123",
+            "env_http_proxy": None,
+            "env_https_proxy": "https://localhost/123",
+            "arg_http_proxy": None,
+            "arg_https_proxy": "",
+            "expected_proxy_scheme": None,
+        },
+        {
+            "dsn": "http://foo@sentry.io/123",
+            "env_http_proxy": "http://localhost/123",
+            "env_https_proxy": "https://localhost/123",
+            "arg_http_proxy": None,
+            "arg_https_proxy": None,
+            "expected_proxy_scheme": "http",
+        },
+        # NO_PROXY testcases
+        {
+            "dsn": "http://foo@sentry.io/123",
+            "env_http_proxy": "http://localhost/123",
+            "env_https_proxy": None,
+            "env_no_proxy": "sentry.io,example.com",
+            "arg_http_proxy": None,
+            "arg_https_proxy": None,
+            "expected_proxy_scheme": None,
+        },
+        {
+            "dsn": "https://foo@sentry.io/123",
+            "env_http_proxy": None,
+            "env_https_proxy": "https://localhost/123",
+            "env_no_proxy": "example.com,sentry.io",
+            "arg_http_proxy": None,
+            "arg_https_proxy": None,
+            "expected_proxy_scheme": None,
+        },
+        {
+            "dsn": "http://foo@sentry.io/123",
+            "env_http_proxy": None,
+            "env_https_proxy": None,
+            "env_no_proxy": "sentry.io,example.com",
+            "arg_http_proxy": "http://localhost/123",
+            "arg_https_proxy": None,
+            "expected_proxy_scheme": "http",
+        },
+        {
+            "dsn": "https://foo@sentry.io/123",
+            "env_http_proxy": None,
+            "env_https_proxy": None,
+            "env_no_proxy": "sentry.io,example.com",
+            "arg_http_proxy": None,
+            "arg_https_proxy": "https://localhost/123",
+            "expected_proxy_scheme": "https",
+        },
+        {
+            "dsn": "https://foo@sentry.io/123",
+            "env_http_proxy": None,
+            "env_https_proxy": None,
+            "env_no_proxy": "sentry.io,example.com",
+            "arg_http_proxy": None,
+            "arg_https_proxy": "https://localhost/123",
+            "expected_proxy_scheme": "https",
+            "arg_proxy_headers": {"Test-Header": "foo-bar"},
+        },
+    ],
+)
+@pytest.mark.asyncio
+@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+")
+async def test_async_proxy(monkeypatch, testcase):
+    # These are the same cases as the sync proxy tests, but they need to run
+    # in an event loop and respect the shutdown behavior of the async transport
+    if testcase["env_http_proxy"] is not None:
+        monkeypatch.setenv("HTTP_PROXY", testcase["env_http_proxy"])
+    if testcase["env_https_proxy"] is not None:
+        monkeypatch.setenv("HTTPS_PROXY", testcase["env_https_proxy"])
+    if testcase.get("env_no_proxy") is not None:
+        monkeypatch.setenv("NO_PROXY", testcase["env_no_proxy"])
+
+    kwargs = {
+        "_experiments": {"transport_async": True},
+        "integrations": [AsyncioIntegration()],
+    }
+
+    if testcase["arg_http_proxy"] is not None:
+        kwargs["http_proxy"] = testcase["arg_http_proxy"]
+    if testcase["arg_https_proxy"] is not None:
+        kwargs["https_proxy"] = testcase["arg_https_proxy"]
+    if testcase.get("arg_proxy_headers") is not None:
+        kwargs["proxy_headers"] = testcase["arg_proxy_headers"]
+
+    client = Client(testcase["dsn"], **kwargs)
+    assert isinstance(client.transport, AsyncHttpTransport)
+
+    proxy = getattr(
+        client.transport._pool,
+        "proxy",
+        getattr(client.transport._pool, "_proxy_url", None),
+    )
+    if testcase["expected_proxy_scheme"] is None:
+        assert proxy is None
+    else:
+        scheme = (
+            proxy.scheme.decode("ascii")
+            if isinstance(proxy.scheme, bytes)
+            else proxy.scheme
+        )
+        assert scheme == testcase["expected_proxy_scheme"]
+
+    if testcase.get("arg_proxy_headers") is not None:
+        proxy_headers = dict(
+            (k.decode("ascii"), v.decode("ascii"))
+            for k, v in client.transport._pool._proxy_headers
+        )
+        assert proxy_headers == testcase["arg_proxy_headers"]
+
+    await client.close()
+
+
+@pytest.mark.parametrize(
+    "testcase",
+    [
+        {
+            "dsn": "https://foo@sentry.io/123",
+            "arg_http_proxy": "http://localhost/123",
+            "arg_https_proxy": None,
+            "should_be_socks_proxy": False,
+        },
+        {
+            "dsn": "https://foo@sentry.io/123",
+            "arg_http_proxy": "socks4a://localhost/123",
+            "arg_https_proxy": None,
+            "should_be_socks_proxy": True,
+        },
+        {
+            "dsn": "https://foo@sentry.io/123",
+            "arg_http_proxy": "socks4://localhost/123",
+            "arg_https_proxy": None,
+            "should_be_socks_proxy": True,
+        },
+        {
+            "dsn": "https://foo@sentry.io/123",
+            "arg_http_proxy": "socks5h://localhost/123",
+            "arg_https_proxy": None,
+            "should_be_socks_proxy": True,
+        },
+        {
+            "dsn": "https://foo@sentry.io/123",
+            "arg_http_proxy": "socks5://localhost/123",
+            "arg_https_proxy": None,
+            "should_be_socks_proxy": True,
+        },
+        {
+            "dsn": "https://foo@sentry.io/123",
+            "arg_http_proxy": None,
+            "arg_https_proxy": "socks4a://localhost/123",
+            "should_be_socks_proxy": True,
+        },
+        {
+            "dsn": "https://foo@sentry.io/123",
+            "arg_http_proxy": None,
+            "arg_https_proxy": "socks4://localhost/123",
+            "should_be_socks_proxy": True,
+        },
+        {
+            "dsn": "https://foo@sentry.io/123",
+            "arg_http_proxy": None,
+            "arg_https_proxy": "socks5h://localhost/123",
+            "should_be_socks_proxy": True,
+        },
+        {
+            "dsn": "https://foo@sentry.io/123",
+            "arg_http_proxy": None,
+            "arg_https_proxy": "socks5://localhost/123",
+            "should_be_socks_proxy": True,
+        },
+    ],
+)
+@pytest.mark.asyncio
+@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+")
+async def test_async_socks_proxy(testcase):
+    # These are the same cases as the sync SOCKS proxy tests, but they need to
+    # run in an event loop and respect the shutdown behavior of the async
+    # transport
+    kwargs = {
+        "_experiments": {"transport_async": True},
+        "integrations": [AsyncioIntegration()],
+    }
+
+    if testcase["arg_http_proxy"] is not None:
+        kwargs["http_proxy"] = testcase["arg_http_proxy"]
+    if testcase["arg_https_proxy"] is not None:
+        kwargs["https_proxy"] = testcase["arg_https_proxy"]
+
+    client = Client(testcase["dsn"], **kwargs)
+    assert isinstance(client.transport, AsyncHttpTransport)
+
+    assert ("socks" in str(type(client.transport._pool)).lower()) == testcase[
+        "should_be_socks_proxy"
+    ], (
+        f"Expected {kwargs} to result in SOCKS == {testcase['should_be_socks_proxy']} "
+        f"but got {str(type(client.transport._pool))}"
+    )
+
+    await client.close()
diff --git a/tests/test_transport.py b/tests/test_transport.py
index 300251fc0c..ce23c97c94 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -3,6 +3,8 @@
 import os
 import socket
 import sys
+import asyncio
+import threading
 from collections import defaultdict
 from datetime import datetime, timedelta, timezone
 from unittest import mock
@@ -28,8 +30,10 @@
 from sentry_sdk.transport import (
     KEEP_ALIVE_SOCKET_OPTIONS,
     _parse_rate_limits,
+    AsyncHttpTransport,
 )
 from sentry_sdk.integrations.logging import LoggingIntegration, ignore_logger
+from sentry_sdk.integrations.asyncio import AsyncioIntegration
 
 server = None
@@ -145,6 +149,89 @@ def test_transport_works(
     assert any("Sending envelope" in record.msg for record in caplog.records) == debug
 
 
+@pytest.mark.asyncio
+@pytest.mark.parametrize("debug", (True, False))
+@pytest.mark.parametrize("client_flush_method", ["close", "flush"])
+@pytest.mark.parametrize("use_pickle", (True, False))
+@pytest.mark.parametrize("compression_level", (0, 9, None))
+@pytest.mark.parametrize("compression_algo", ("gzip", "br", "", None))
+@pytest.mark.skipif(not PY38, reason="Async transport only supported in Python 3.8+")
+async def test_transport_works_async(
+    capturing_server,
+    request,
+    capsys,
+    caplog,
+    debug,
+    make_client,
+    client_flush_method,
+    use_pickle,
+    compression_level,
+    compression_algo,
+):
+    caplog.set_level(logging.DEBUG)
+
+    experiments = {}
+    if compression_level is not None:
+        experiments["transport_compression_level"] = compression_level
+
+    if compression_algo is not None:
+        experiments["transport_compression_algo"] = compression_algo
+
+    # Enable the async transport
+    experiments["transport_async"] = True
+
+    client = make_client(
+        debug=debug,
+        _experiments=experiments,
+        integrations=[AsyncioIntegration()],
+    )
+
+    if use_pickle:
+        client = pickle.loads(pickle.dumps(client))
+
+    # Verify we're using the async transport
+    assert isinstance(
+        client.transport, AsyncHttpTransport
+    ), "Expected AsyncHttpTransport"
+
+    sentry_sdk.get_global_scope().set_client(client)
+    request.addfinalizer(lambda: sentry_sdk.get_global_scope().set_client(None))
+
+    add_breadcrumb(
+        level="info", message="i like bread", timestamp=datetime.now(timezone.utc)
+    )
+    capture_message("löl")
+
+    if client_flush_method == "close":
+        await client.close(timeout=2.0)
+    if client_flush_method == "flush":
+        await client.flush(timeout=2.0)
+
+    out, err = capsys.readouterr()
+    assert not err and not out
+    assert capturing_server.captured
+    should_compress = (
+        # default is to compress with brotli if available, gzip otherwise
+        (compression_level is None)
+        or (
+            # setting compression level to 0 means don't compress
+            compression_level > 0
+        )
+    ) and (
+        # if we couldn't resolve to a known algo, we don't compress
+        compression_algo != ""
+    )
+
+    assert capturing_server.captured[0].compressed == should_compress
+
+    assert any("Sending envelope" in record.msg for record in caplog.records) == debug
+
+    # After a flush the worker task is still running, but the end of the test
+    # will shut down the event loop, so explicitly close the client to clean
+    # up the worker task
+    if client_flush_method == "flush":
+        await client.close(timeout=2.0)
+
+
 @pytest.mark.parametrize(
     "num_pools,expected_num_pools",
     (
@@ -640,3 +727,216 @@ def test_record_lost_event_transaction_item(capturing_server, make_client, span_
         "reason": "test",
         "quantity": span_count + 1,
     } in discarded_events
+
+
+def test_handle_unexpected_status_invokes_handle_request_error(
+    make_client, monkeypatch
+):
+    client = make_client()
+    transport = client.transport
+
+    monkeypatch.setattr(transport._worker, "submit", lambda fn: fn() or True)
+
+    def stub_request(method, endpoint, body=None, headers=None):
+        class MockResponse:
+            def __init__(self):
+                self.status = 500  # integer status code
+                self.data = b"server error"
+                self.headers = {}
+
+            def close(self):
+                pass
+
+        return MockResponse()
+
+    monkeypatch.setattr(transport, "_request", stub_request)
+
+    seen = []
+    monkeypatch.setattr(
+        transport,
+        "_handle_request_error",
+        lambda envelope, loss_reason: seen.append(loss_reason),
+    )
+
+    client.capture_event({"message": "test"})
+    client.flush()
+
+    assert seen == ["status_500"]
+
+
+def test_handle_request_error_basic_coverage(make_client, monkeypatch):
+    client = make_client()
+    transport = client.transport
+
+    monkeypatch.setattr(transport._worker, "submit", lambda fn: fn() or True)
+
+    # Track method calls
+    calls = []
+
+    def mock_on_dropped_event(reason):
+        calls.append(("on_dropped_event", reason))
+
+    def mock_record_lost_event(reason, data_category=None, item=None):
+        calls.append(("record_lost_event", reason, data_category, item))
+
+    monkeypatch.setattr(transport, "on_dropped_event", mock_on_dropped_event)
+    monkeypatch.setattr(transport, "record_lost_event", mock_record_lost_event)
+
+    # Test case 1: envelope is None
+    transport._handle_request_error(envelope=None, loss_reason="test_reason")
+
+    assert len(calls) == 2
+    assert calls[0] == ("on_dropped_event", "test_reason")
+    assert calls[1] == ("record_lost_event", "network_error", "error", None)
+
+    # Reset
+    calls.clear()
+
+    # Test case 2: envelope with items
+    envelope = Envelope()
+    envelope.add_item(mock.MagicMock())  # Simple mock item
+    envelope.add_item(mock.MagicMock())  # Another mock item
+
+    transport._handle_request_error(envelope=envelope, loss_reason="connection_error")
+
+    assert len(calls) == 3
+    assert calls[0] == ("on_dropped_event", "connection_error")
+    assert calls[1][0:2] == ("record_lost_event", "network_error")
+    assert calls[2][0:2] == ("record_lost_event", "network_error")
+
+
+@pytest.mark.asyncio
+@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+")
+async def test_async_transport_background_thread_capture(
+    capturing_server, make_client, caplog
+):
+    """Test that capture_envelope from a background thread uses run_coroutine_threadsafe."""
+    caplog.set_level(logging.DEBUG)
+    experiments = {"transport_async": True}
+    client = make_client(_experiments=experiments, integrations=[AsyncioIntegration()])
+    assert isinstance(client.transport, AsyncHttpTransport)
+    sentry_sdk.get_global_scope().set_client(client)
+    captured_from_thread = []
+    exception_from_thread = []
+
+    def background_thread_work():
+        try:
+            # This should use the run_coroutine_threadsafe path
+            capture_message("from background thread")
+            captured_from_thread.append(True)
+        except Exception as e:
+            exception_from_thread.append(e)
+
+    thread = threading.Thread(target=background_thread_work)
+    thread.start()
+    thread.join()
+    assert not exception_from_thread
+    assert captured_from_thread
+    await client.close(timeout=2.0)
+    assert capturing_server.captured
+
+
+@pytest.mark.asyncio
+@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+")
+async def test_async_transport_event_loop_closed_scenario(
+    capturing_server, make_client, caplog
+):
+    """Test behavior when trying to capture after the event loop context ends."""
+    caplog.set_level(logging.DEBUG)
+    experiments = {"transport_async": True}
+    client = make_client(_experiments=experiments, integrations=[AsyncioIntegration()])
+    sentry_sdk.get_global_scope().set_client(client)
+    original_loop = client.transport.loop
+
+    with mock.patch("asyncio.get_running_loop", side_effect=RuntimeError("no loop")):
+        with mock.patch.object(client.transport.loop, "is_running", return_value=False):
+            with mock.patch("sentry_sdk.transport.logger") as mock_logger:
+                # This should trigger the "no async context" path
+                capture_message("after loop closed")
+
+                mock_logger.warning.assert_called_with(
+                    "Async Transport is not running in an event loop."
+                )
+
+    client.transport.loop = original_loop
+    await client.close(timeout=2.0)
+
+
+@pytest.mark.asyncio
+@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+")
+async def test_async_transport_concurrent_requests(
+    capturing_server, make_client, caplog
+):
+    """Test multiple simultaneous envelope submissions."""
+    caplog.set_level(logging.DEBUG)
+    experiments = {"transport_async": True}
+    client = make_client(_experiments=experiments, integrations=[AsyncioIntegration()])
+    assert isinstance(client.transport, AsyncHttpTransport)
+    sentry_sdk.get_global_scope().set_client(client)
+
+    num_messages = 15
+
+    async def send_message(i):
+        capture_message(f"concurrent message {i}")
+
+    tasks = [send_message(i) for i in range(num_messages)]
+    await asyncio.gather(*tasks)
+    transport = client.transport
+    await client.close(timeout=2.0)
+    assert len(transport.background_tasks) == 0
+    assert len(capturing_server.captured) == num_messages
+
+
+@pytest.mark.asyncio
+@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+")
+async def test_async_transport_rate_limiting_with_concurrency(
+    capturing_server, make_client, request
+):
+    """Test async transport rate limiting with concurrent requests."""
+    experiments = {"transport_async": True}
+    client = make_client(_experiments=experiments, integrations=[AsyncioIntegration()])
+
+    assert isinstance(client.transport, AsyncHttpTransport)
+    sentry_sdk.get_global_scope().set_client(client)
+    request.addfinalizer(lambda: sentry_sdk.get_global_scope().set_client(None))
+    capturing_server.respond_with(
+        code=429, headers={"X-Sentry-Rate-Limits": "60:error:organization"}
+    )
+
+    # Send one request first to trigger rate limiting
+    capture_message("initial message")
+    await asyncio.sleep(0.1)  # Wait for the request to execute
+    assert client.transport._check_disabled("error") is True
+    capturing_server.clear_captured()
+
+    async def send_message(i):
+        capture_message(f"message {i}")
+        await asyncio.sleep(0.01)
+
+    await asyncio.gather(*[send_message(i) for i in range(5)])
+    await asyncio.sleep(0.1)
+    # New requests should be dropped due to rate limiting
+    assert len(capturing_server.captured) == 0
+    await client.close(timeout=2.0)
+
+
+@pytest.mark.asyncio
+@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+")
+async def test_async_two_way_ssl_authentication():
+    current_dir = os.path.dirname(__file__)
+    cert_file = f"{current_dir}/test.pem"
+    key_file = f"{current_dir}/test.key"
+
+    client = Client(
+        "https://foo@sentry.io/123",
+        cert_file=cert_file,
+        key_file=key_file,
+        _experiments={"transport_async": True},
+        integrations=[AsyncioIntegration()],
+    )
+    assert isinstance(client.transport, AsyncHttpTransport)
+
+    options = client.transport._get_pool_options()
+    assert options["ssl_context"] is not None
+
+    await client.close()
diff --git a/tox.ini b/tox.ini
index ac35660ccb..30fdb7a17e 100644
--- a/tox.ini
+++ b/tox.ini
@@ -378,7 +378,7 @@ deps =
    httpx-v0.25: pytest-httpx==0.25.0
    httpx: pytest-httpx
    # anyio is a dep of httpx
-    httpx: anyio<4.0.0
+    httpx: anyio>=3,<5
    httpx-v0.16: httpx~=0.16.0
    httpx-v0.18: httpx~=0.18.0
    httpx-v0.20: httpx~=0.20.0
@@ -686,10 +686,11 @@ deps =
    fastapi: pytest-asyncio
    fastapi: python-multipart
    fastapi: requests
-    fastapi: anyio<4
+    fastapi: anyio>=3,<5
    fastapi-v0.79.1: httpx<0.28.0
    fastapi-v0.91.0: httpx<0.28.0
    fastapi-v0.103.2: httpx<0.28.0
+    fastapi-v0.79.1: anyio<4
    py3.6-fastapi: aiocontextvars
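
Usage note (not part of the patch): based on the options exercised in the tests above, the async transport is opt-in and experimental. It requires a running event loop at init time, the AsyncioIntegration, and the `transport_async` experiment flag; `flush()` and `close()` then return tasks that must be awaited to block. A minimal sketch of how an application would opt in, with a placeholder DSN:

    import asyncio

    import sentry_sdk
    from sentry_sdk.integrations.asyncio import AsyncioIntegration


    async def main() -> None:
        # init() must run while the event loop is running; otherwise
        # make_transport() falls back to the sync transport.
        sentry_sdk.init(
            dsn="https://examplePublicKey@o0.ingest.sentry.io/0",  # placeholder DSN
            integrations=[AsyncioIntegration()],  # required by AsyncHttpTransport
            _experiments={"transport_async": True},  # opt in to the async transport
        )

        sentry_sdk.capture_message("hello from the async transport")

        # On the async transport, close() returns an asyncio.Task; on the
        # sync fallback it returns None, so guard before awaiting.
        task = sentry_sdk.get_client().close(timeout=2.0)
        if task is not None:
            await task


    asyncio.run(main())

If no loop is running when the SDK initializes, or httpcore/anyio are missing, `make_transport` logs a warning and falls back to the sync transport, which is why the sketch guards the await on the value returned by `close()`.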