diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index f8328cac12..7f89b01849 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -6,6 +6,7 @@ import socket import ssl import time +import asyncio from datetime import datetime, timedelta, timezone from collections import defaultdict from urllib.request import getproxies @@ -17,18 +18,27 @@ try: import httpcore +except ImportError: + httpcore = None # type: ignore + +try: import h2 # noqa: F401 - HTTP2_ENABLED = True + HTTP2_ENABLED = httpcore is not None except ImportError: HTTP2_ENABLED = False +try: + ASYNC_TRANSPORT_ENABLED = httpcore is not None +except ImportError: + ASYNC_TRANSPORT_ENABLED = False + import urllib3 import certifi from sentry_sdk.consts import EndpointType from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions -from sentry_sdk.worker import BackgroundWorker, Worker +from sentry_sdk.worker import BackgroundWorker, Worker, AsyncWorker from sentry_sdk.envelope import Envelope, Item, PayloadRef from typing import TYPE_CHECKING @@ -224,9 +234,18 @@ def __init__(self: Self, options: Dict[str, Any]) -> None: elif self._compression_algo == "br": self._compression_level = 4 - def _create_worker(self: Self, options: Dict[str, Any]) -> Worker: - # For now, we only support the threaded sync background worker. - return BackgroundWorker(queue_size=options["transport_queue_size"]) + def _create_worker(self, options: dict[str, Any]) -> Worker: + async_enabled = options.get("_experiments", {}).get("transport_async", False) + try: + asyncio.get_running_loop() + worker_cls = ( + AsyncWorker + if async_enabled and ASYNC_TRANSPORT_ENABLED + else BackgroundWorker + ) + except RuntimeError: + worker_cls = BackgroundWorker + return worker_cls(queue_size=options["transport_queue_size"]) def record_lost_event( self: Self, @@ -571,6 +590,240 @@ def flush( self._worker.flush(timeout, callback) +if not ASYNC_TRANSPORT_ENABLED: + # Sorry, no AsyncHttpTransport for you + class AsyncHttpTransport(BaseHttpTransport): + def __init__(self: Self, options: Dict[str, Any]) -> None: + super().__init__(options) + logger.warning( + "You tried to use AsyncHttpTransport but don't have httpcore[asyncio] installed. Falling back to sync transport." + ) + +else: + + class AsyncHttpTransport(HttpTransportCore): # type: ignore + def __init__(self: Self, options: Dict[str, Any]) -> None: + super().__init__(options) + # Requires event loop at init time + self.loop = asyncio.get_running_loop() + self.background_tasks: set[asyncio.Task[None]] = set() + + def _get_header_value(self: Self, response: Any, header: str) -> Optional[str]: + return next( + ( + val.decode("ascii") + for key, val in response.headers + if key.decode("ascii").lower() == header + ), + None, + ) + + async def _send_envelope(self: Self, envelope: Envelope) -> None: + _prepared_envelope = self._prepare_envelope(envelope) + if _prepared_envelope is not None: + envelope, body, headers = _prepared_envelope + await self._send_request( + body.getvalue(), + headers=headers, + endpoint_type=EndpointType.ENVELOPE, + envelope=envelope, + ) + return None + + async def _send_request( + self: Self, + body: bytes, + headers: Dict[str, str], + endpoint_type: EndpointType, + envelope: Optional[Envelope], + ) -> None: + self._update_headers(headers) + try: + response = await self._request( + "POST", + endpoint_type, + body, + headers, + ) + except Exception: + self._handle_request_error(envelope=envelope, loss_reason="network") + raise + try: + self._handle_response(response=response, envelope=envelope) + finally: + await response.aclose() + + async def _request( # type: ignore[override] + self: Self, + method: str, + endpoint_type: EndpointType, + body: Any, + headers: Mapping[str, str], + ) -> httpcore.Response: + return await self._pool.request( + method, + self._auth.get_api_url(endpoint_type), + content=body, + headers=headers, # type: ignore + extensions={ + "timeout": { + "pool": self.TIMEOUT, + "connect": self.TIMEOUT, + "write": self.TIMEOUT, + "read": self.TIMEOUT, + } + }, + ) + + async def _flush_client_reports(self: Self, force: bool = False) -> None: + client_report = self._fetch_pending_client_report(force=force, interval=60) + if client_report is not None: + self.capture_envelope(Envelope(items=[client_report])) + + async def _capture_envelope(self: Self, envelope: Envelope) -> None: + async def send_envelope_wrapper() -> None: + with capture_internal_exceptions(): + await self._send_envelope(envelope) + await self._flush_client_reports() + + if not self._worker.submit(send_envelope_wrapper): + self.on_dropped_event("full_queue") + for item in envelope.items: + self.record_lost_event("queue_overflow", item=item) + + def capture_envelope(self: Self, envelope: Envelope) -> None: + # Synchronous entry point + try: + asyncio.get_running_loop() + # We are on the main thread running the event loop + task = asyncio.create_task(self._capture_envelope(envelope)) + self.background_tasks.add(task) + task.add_done_callback(self.background_tasks.discard) + except RuntimeError: + # We are in a background thread, not running an event loop, + # have to launch the task on the loop in a threadsafe way. + if self.loop and self.loop.is_running(): + asyncio.run_coroutine_threadsafe( + self._capture_envelope(envelope), + self.loop, + ) + else: + # The event loop is no longer running + logger.warning("Async Transport is not running in an event loop.") + self.on_dropped_event("internal_sdk_error") + for item in envelope.items: + self.record_lost_event("internal_sdk_error", item=item) + + def flush( # type: ignore[override] + self: Self, + timeout: float, + callback: Optional[Callable[[int, float], None]] = None, + ) -> Optional[asyncio.Task[None]]: + logger.debug("Flushing HTTP transport") + + if timeout > 0: + self._worker.submit(lambda: self._flush_client_reports(force=True)) + return self._worker.flush(timeout, callback) # type: ignore[func-returns-value] + return None + + def _get_pool_options(self: Self) -> Dict[str, Any]: + options: Dict[str, Any] = { + "http2": False, # no HTTP2 for now + "retries": 3, + } + + socket_options = ( + self.options["socket_options"] + if self.options["socket_options"] is not None + else [] + ) + + used_options = {(o[0], o[1]) for o in socket_options} + for default_option in KEEP_ALIVE_SOCKET_OPTIONS: + if (default_option[0], default_option[1]) not in used_options: + socket_options.append(default_option) + + options["socket_options"] = socket_options + + ssl_context = ssl.create_default_context() + ssl_context.load_verify_locations( + self.options["ca_certs"] # User-provided bundle from the SDK init + or os.environ.get("SSL_CERT_FILE") + or os.environ.get("REQUESTS_CA_BUNDLE") + or certifi.where() + ) + cert_file = self.options["cert_file"] or os.environ.get("CLIENT_CERT_FILE") + key_file = self.options["key_file"] or os.environ.get("CLIENT_KEY_FILE") + if cert_file is not None: + ssl_context.load_cert_chain(cert_file, key_file) + + options["ssl_context"] = ssl_context + + return options + + def _make_pool( + self: Self, + ) -> Union[ + httpcore.AsyncSOCKSProxy, + httpcore.AsyncHTTPProxy, + httpcore.AsyncConnectionPool, + ]: + if self.parsed_dsn is None: + raise ValueError("Cannot create HTTP-based transport without valid DSN") + proxy = None + no_proxy = self._in_no_proxy(self.parsed_dsn) + + # try HTTPS first + https_proxy = self.options["https_proxy"] + if self.parsed_dsn.scheme == "https" and (https_proxy != ""): + proxy = https_proxy or (not no_proxy and getproxies().get("https")) + + # maybe fallback to HTTP proxy + http_proxy = self.options["http_proxy"] + if not proxy and (http_proxy != ""): + proxy = http_proxy or (not no_proxy and getproxies().get("http")) + + opts = self._get_pool_options() + + if proxy: + proxy_headers = self.options["proxy_headers"] + if proxy_headers: + opts["proxy_headers"] = proxy_headers + + if proxy.startswith("socks"): + try: + if "socket_options" in opts: + socket_options = opts.pop("socket_options") + if socket_options: + logger.warning( + "You have defined socket_options but using a SOCKS proxy which doesn't support these. We'll ignore socket_options." + ) + return httpcore.AsyncSOCKSProxy(proxy_url=proxy, **opts) + except RuntimeError: + logger.warning( + "You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. Disabling proxy support.", + proxy, + ) + else: + return httpcore.AsyncHTTPProxy(proxy_url=proxy, **opts) + + return httpcore.AsyncConnectionPool(**opts) + + def kill(self: Self) -> Optional[asyncio.Task[None]]: # type: ignore + + logger.debug("Killing HTTP transport") + self._worker.kill() + for task in self.background_tasks: + task.cancel() + self.background_tasks.clear() + try: + # Return the pool cleanup task so caller can await it if needed + return self.loop.create_task(self._pool.aclose()) # type: ignore + except RuntimeError: + logger.warning("Event loop not running, aborting kill.") + return None + + class HttpTransport(BaseHttpTransport): if TYPE_CHECKING: _pool: Union[PoolManager, ProxyManager] @@ -816,11 +1069,18 @@ def make_transport(options: Dict[str, Any]) -> Optional[Transport]: ref_transport = options["transport"] use_http2_transport = options.get("_experiments", {}).get("transport_http2", False) - + use_async_transport = options.get("_experiments", {}).get("transport_async", False) # By default, we use the http transport class - transport_cls: Type[Transport] = ( - Http2Transport if use_http2_transport else HttpTransport - ) + if use_async_transport: + try: + asyncio.get_running_loop() + transport_cls: Type[Transport] = AsyncHttpTransport + except RuntimeError: + # No event loop running, fall back to sync transport + logger.warning("No event loop running, falling back to sync transport.") + transport_cls = Http2Transport if use_http2_transport else HttpTransport + else: + transport_cls = Http2Transport if use_http2_transport else HttpTransport if isinstance(ref_transport, Transport): return ref_transport