From 9f24136a4eaf618a683ed63a78498fb6995036e4 Mon Sep 17 00:00:00 2001 From: srothh Date: Mon, 14 Jul 2025 14:18:36 +0200 Subject: [PATCH 01/42] ref(transport): Add abstract base class for worker implementation Add an abstract base class for implementation of the background worker. This was done to provide a shared interface for the current implementation of a threaded worker in the sync context as well as the upcoming async task-based worker implementation. GH-4578 --- sentry_sdk/worker.py | 44 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index d911e15623..510376f381 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -1,4 +1,5 @@ from __future__ import annotations +from abc import ABC, abstractmethod import os import threading @@ -16,7 +17,48 @@ _TERMINATOR = object() -class BackgroundWorker: +class Worker(ABC): + """ + Base class for all workers. + + A worker is used to process events in the background and send them to Sentry. + """ + + @property + @abstractmethod + def is_alive(self) -> bool: + pass + + @abstractmethod + def kill(self) -> None: + pass + + @abstractmethod + def flush( + self, timeout: float, callback: Optional[Callable[[int, float], None]] = None + ) -> None: + """ + Flush the worker. + + This method blocks until the worker has flushed all events or the specified timeout is reached. + """ + pass + + @abstractmethod + def full(self) -> bool: + pass + + @abstractmethod + def submit(self, callback: Callable[[], None]) -> bool: + """ + Schedule a callback to be executed by the worker. + + Returns True if the callback was scheduled, False if the queue is full. 
+ """ + pass + + +class BackgroundWorker(Worker): def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None: self._queue: Queue = Queue(queue_size) self._lock = threading.Lock() From 001f36cbf4cf4f8c40ee49e318f88911eaba7fbd Mon Sep 17 00:00:00 2001 From: srothh Date: Mon, 14 Jul 2025 14:23:23 +0200 Subject: [PATCH 02/42] ref(transport): Add _create_worker factory method to Transport Add a new factory method instead of direct instatiation of the threaded background worker. This allows for easy extension to other types of workers, such as the upcoming task-based async worker. GH-4578 --- sentry_sdk/transport.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index d612028250..f8328cac12 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -28,7 +28,7 @@ from sentry_sdk.consts import EndpointType from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions -from sentry_sdk.worker import BackgroundWorker +from sentry_sdk.worker import BackgroundWorker, Worker from sentry_sdk.envelope import Envelope, Item, PayloadRef from typing import TYPE_CHECKING @@ -173,7 +173,7 @@ def __init__(self: Self, options: Dict[str, Any]) -> None: Transport.__init__(self, options) assert self.parsed_dsn is not None self.options: Dict[str, Any] = options - self._worker = BackgroundWorker(queue_size=options["transport_queue_size"]) + self._worker = self._create_worker(options) self._auth = self.parsed_dsn.to_auth("sentry.python/%s" % VERSION) self._disabled_until: Dict[Optional[str], datetime] = {} # We only use this Retry() class for the `get_retry_after` method it exposes @@ -224,6 +224,10 @@ def __init__(self: Self, options: Dict[str, Any]) -> None: elif self._compression_algo == "br": self._compression_level = 4 + def _create_worker(self: Self, options: Dict[str, Any]) -> Worker: + # For now, we only support the threaded sync background worker. 
+ return BackgroundWorker(queue_size=options["transport_queue_size"]) + def record_lost_event( self: Self, reason: str, From 401b1bcedc0f92322234f23cb45bc2a49715aec1 Mon Sep 17 00:00:00 2001 From: srothh Date: Thu, 17 Jul 2025 11:55:26 +0200 Subject: [PATCH 03/42] ref(worker): Add flush_async method to Worker ABC Add a new flush_async method to worker ABC. This is necessary because the async transport cannot use a synchronous blocking flush. GH-4578 --- sentry_sdk/worker.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index 510376f381..f37f920fe3 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -33,7 +33,6 @@ def is_alive(self) -> bool: def kill(self) -> None: pass - @abstractmethod def flush( self, timeout: float, callback: Optional[Callable[[int, float], None]] = None ) -> None: @@ -41,8 +40,22 @@ def flush( Flush the worker. This method blocks until the worker has flushed all events or the specified timeout is reached. + Default implementation is a no-op, since this method may only be relevant to some workers. + Subclasses should override this method if necessary. """ - pass + return None + + async def flush_async( + self, timeout: float, callback: Optional[Callable[[int, float], None]] = None + ) -> None: + """ + Flush the worker. + + This method can be awaited until the worker has flushed all events or the specified timeout is reached. + Default implementation is a no-op, since this method may only be relevant to some workers. + Subclasses should override this method if necessary. + """ + return None @abstractmethod def full(self) -> bool: From 3f43d8fc0f464a10d568d86bf94ee0ec1346a2ab Mon Sep 17 00:00:00 2001 From: srothh Date: Thu, 17 Jul 2025 12:43:04 +0200 Subject: [PATCH 04/42] ref(worker): Move worker flush_async from Worker ABC Move the flush_async down to the concrete subclass to not break existing testing. 
This makes sense, as this will only really be needed by the async worker anyway and therefore is not shared logic. GH-4578 --- sentry_sdk/worker.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index f37f920fe3..200a9ea914 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -45,18 +45,6 @@ def flush( """ return None - async def flush_async( - self, timeout: float, callback: Optional[Callable[[int, float], None]] = None - ) -> None: - """ - Flush the worker. - - This method can be awaited until the worker has flushed all events or the specified timeout is reached. - Default implementation is a no-op, since this method may only be relevant to some workers. - Subclasses should override this method if necessary. - """ - return None - @abstractmethod def full(self) -> bool: pass From 15fa295611edd059d79f2a6e0aedeb5db1707ca2 Mon Sep 17 00:00:00 2001 From: srothh Date: Thu, 17 Jul 2025 14:28:16 +0200 Subject: [PATCH 05/42] ref(worker): Amend function signature for coroutines Coroutines have a return value, however the current function signature for the worker methods does not accommodate for this. Therefore, this signature was changed. GH-4578 --- sentry_sdk/worker.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index 200a9ea914..7325455f8f 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -34,7 +34,7 @@ def flush( - self, timeout: float, callback: Optional[Callable[[int, float], None]] = None + self, timeout: float, callback: Optional[Callable[[int, float], Any]] = None ) -> None: """ Flush the worker. @@ -50,7 +50,7 @@ def full(self) -> bool: pass @abstractmethod - def submit(self, callback: Callable[[], None]) -> bool: + def submit(self, callback: Callable[[], Any]) -> bool: """ Schedule a callback to be executed by the worker. 
@@ -149,7 +149,7 @@ def _wait_flush(self, timeout: float, callback: Optional[Any]) -> None: pending = self._queue.qsize() + 1 logger.error("flush timed out, dropped %s events", pending) - def submit(self, callback: Callable[[], None]) -> bool: + def submit(self, callback: Callable[[], Any]) -> bool: self._ensure_thread() try: self._queue.put_nowait(callback) From ef780f341f994f3230e95c488f53945296e5adb8 Mon Sep 17 00:00:00 2001 From: srothh Date: Thu, 24 Jul 2025 15:20:55 +0200 Subject: [PATCH 06/42] ref(worker): Add missing docstrings to worker ABC GH-4578 --- sentry_sdk/worker.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index 7325455f8f..555539dc3a 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -27,10 +27,21 @@ class Worker(ABC): @property @abstractmethod def is_alive(self) -> bool: + """ + Checks whether the worker is alive and running. + + Returns True if the worker is alive, False otherwise. + """ pass @abstractmethod def kill(self) -> None: + """ + Kills the worker. + + This method is used to kill the worker. The queue will be drained up to the point where the worker is killed. + The worker will not be able to process any more events. + """ pass def flush( @@ -47,6 +58,11 @@ def flush( @abstractmethod def full(self) -> bool: + """ + Checks whether the worker's queue is full. + + Returns True if the queue is full, False otherwise. + """ pass @abstractmethod From f63e46fabdeffa4a59dc04e71f068f09be79ebf3 Mon Sep 17 00:00:00 2001 From: srothh Date: Thu, 17 Jul 2025 11:41:16 +0200 Subject: [PATCH 07/42] feat(transport): Add an async task-based worker for transport Add a new implementation of the worker interface, implementing the worker as an async task. This is to be used by the upcoming async transport. 
GH-4581 --- sentry_sdk/worker.py | 92 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index 555539dc3a..d74e1ca2ce 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -2,6 +2,7 @@ from abc import ABC, abstractmethod import os import threading +import asyncio from time import sleep, time from sentry_sdk._queue import Queue, FullError @@ -186,3 +187,94 @@ def _target(self) -> None: finally: self._queue.task_done() sleep(0) + + +class AsyncWorker(Worker): + def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None: + self._queue: asyncio.Queue = asyncio.Queue(queue_size) + self._task: Optional[asyncio.Task] = None + # Event loop needs to remain in the same process + self._task_for_pid: Optional[int] = None + self._loop: Optional[asyncio.AbstractEventLoop] = None + + @property + def is_alive(self) -> bool: + if self._task_for_pid != os.getpid(): + return False + if not self._task or not self._loop: + return False + return self._loop.is_running() and not self._task.done() + + def kill(self) -> None: + if self._task: + self._task.cancel() + self._task = None + self._task_for_pid = None + + def start(self) -> None: + if not self.is_alive: + try: + self._loop = asyncio.get_running_loop() + self._task = self._loop.create_task(self._target()) + self._task_for_pid = os.getpid() + except RuntimeError: + # There is no event loop running + self._loop = None + self._task = None + self._task_for_pid = None + + def full(self) -> bool: + return self._queue.full() + + def _ensure_task(self) -> None: + if not self.is_alive: + self.start() + + async def _wait_flush(self, timeout: float, callback: Optional[Any] = None) -> None: + if not self._loop or not self._loop.is_running(): + return + + initial_timeout = min(0.1, timeout) + + # Timeout on the join + try: + await asyncio.wait_for(self._queue.join(), timeout=initial_timeout) + except asyncio.TimeoutError: + pending = 
self._queue.qsize() + 1 + logger.debug("%d event(s) pending on flush", pending) + if callback is not None: + callback(pending, timeout) + + try: + remaining_timeout = timeout - initial_timeout + await asyncio.wait_for(self._queue.join(), timeout=remaining_timeout) + except asyncio.TimeoutError: + pending = self._queue.qsize() + 1 + logger.error("flush timed out, dropped %s events", pending) + + async def flush(self, timeout: float, callback: Optional[Any] = None) -> None: + logger.debug("background worker got flush request") + if self.is_alive and timeout > 0.0: + await self._wait_flush(timeout, callback) + logger.debug("background worker flushed") + + def submit(self, callback: Callable[[], None]) -> bool: + self._ensure_task() + + try: + self._queue.put_nowait(callback) + return True + except asyncio.QueueFull: + return False + + async def _target(self) -> None: + while True: + callback = await self._queue.get() + try: + callback() + except Exception: + logger.error("Failed processing job", exc_info=True) + finally: + self._queue.task_done() + # Yield to let the event loop run other tasks + await asyncio.sleep(0) From 18042718e28908218f66a98ac1f1d4d8a0e877e9 Mon Sep 17 00:00:00 2001 From: srothh Date: Thu, 17 Jul 2025 11:58:20 +0200 Subject: [PATCH 08/42] ref(worker): Make worker work with new ABC interface Refactor the flush method in the async worker to use the async_flush coroutine. 
GH-4581 --- sentry_sdk/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index d74e1ca2ce..c3e596185e 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -252,7 +252,7 @@ async def _wait_flush(self, timeout: float, callback: Optional[Any] = None) -> N pending = self._queue.qsize() + 1 logger.error("flush timed out, dropped %s events", pending) - async def flush(self, timeout: float, callback: Optional[Any] = None) -> None: + async def flush_async(self, timeout: float, callback: Optional[Any] = None) -> None: logger.debug("background worker got flush request") if self.is_alive and timeout > 0.0: await self._wait_flush(timeout, callback) logger.debug("background worker flushed") From 11da869c06d3774026ec3f9d1010a091cbec05b3 Mon Sep 17 00:00:00 2001 From: srothh Date: Thu, 17 Jul 2025 14:00:33 +0200 Subject: [PATCH 09/42] fix(worker): Check if callbacks from worker queue are coroutines or functions Add a check to see whether callbacks are awaitable coroutines or functions, as coroutines need to be awaited. 
GH-4581 --- sentry_sdk/worker.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index c3e596185e..5dce91953e 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -3,6 +3,7 @@ import os import threading import asyncio +import inspect from time import sleep, time from sentry_sdk._queue import Queue, FullError @@ -271,7 +272,12 @@ async def _target(self) -> None: while True: callback = await self._queue.get() try: - callback() + if inspect.iscoroutinefunction(callback): + # Callback is an async coroutine, need to await it + await callback() + else: + # Callback is a sync function, need to call it + callback() except Exception: logger.error("Failed processing job", exc_info=True) finally: From 779a0d6ac83f7192b9fbf31339c716ddab675a30 Mon Sep 17 00:00:00 2001 From: srothh Date: Thu, 17 Jul 2025 14:21:58 +0200 Subject: [PATCH 10/42] ref(worker): Amend return type of submit and flush to accommodate for coroutines Coroutines do not return None, therefore it is necessary to consider this in the callback parameter of the worker. Previously, only callbacks with return type None were accepted. 
GH-4581 --- sentry_sdk/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index 5dce91953e..aa9c5bf1c1 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -259,7 +259,7 @@ async def flush_async(self, timeout: float, callback: Optional[Any] = None) -> N await self._wait_flush(timeout, callback) logger.debug("background worker flushed") - def submit(self, callback: Callable[[], None]) -> bool: + def submit(self, callback: Callable[[], Any]) -> bool: self._ensure_task() try: From 0895d234cf93c404d850104e9a492a69be76e61f Mon Sep 17 00:00:00 2001 From: srothh Date: Thu, 17 Jul 2025 15:53:20 +0200 Subject: [PATCH 11/42] ref(worker): Add type parameters for AsyncWorker variables GH-4581 --- sentry_sdk/worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index aa9c5bf1c1..90813e8544 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -192,8 +192,8 @@ def _target(self) -> None: class AsyncWorker(Worker): def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None: - self._queue: asyncio.Queue = asyncio.Queue(queue_size) - self._task: Optional[asyncio.Task] = None + self._queue: asyncio.Queue[Callable[[], Any]] = asyncio.Queue(queue_size) + self._task: Optional[asyncio.Task[None]] = None # Event loop needs to remain in the same process self._task_for_pid: Optional[int] = None self._loop: Optional[asyncio.AbstractEventLoop] = None From bbf426bee0662be8f02ecc590808a8380ad4b7f5 Mon Sep 17 00:00:00 2001 From: srothh Date: Thu, 17 Jul 2025 16:05:04 +0200 Subject: [PATCH 12/42] ref(worker): Remove loop upon killing worker GH-4581 --- sentry_sdk/worker.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index 90813e8544..f4ae864d4d 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -211,6 +211,7 @@ def kill(self) -> None: self._task.cancel() self._task = None 
self._task_for_pid = None + self._loop = None def start(self) -> None: if not self.is_alive: From 744dc8acbe9a71e26953259802676672c11c569e Mon Sep 17 00:00:00 2001 From: srothh Date: Fri, 18 Jul 2025 11:59:30 +0200 Subject: [PATCH 13/42] feat(worker): Enable concurrent callbacks on async task worker Enable concurrent callbacks on async task worker by firing them as a task rather than awaiting them. A done callback handles the necessary queue and exception logic. GH-4581 --- sentry_sdk/worker.py | 44 +++++++++++++++++++++++++++++++++----------- 1 file changed, 33 insertions(+), 11 deletions(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index f4ae864d4d..c12e73c583 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -197,6 +197,8 @@ def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None: # Event loop needs to remain in the same process self._task_for_pid: Optional[int] = None self._loop: Optional[asyncio.AbstractEventLoop] = None + # Track active callback tasks so they have a strong reference and can be cancelled on kill + self._active_tasks: set[asyncio.Task] = set() @property def is_alive(self) -> bool: @@ -211,6 +213,12 @@ def kill(self) -> None: self._task.cancel() self._task = None self._task_for_pid = None + # Also cancel any active callback tasks + # Avoid modifying the set while cancelling tasks + tasks_to_cancel = set(self._active_tasks) + for task in tasks_to_cancel: + task.cancel() + self._active_tasks.clear() self._loop = None def start(self) -> None: @@ -272,16 +280,30 @@ def submit(self, callback: Callable[[], Any]) -> bool: async def _target(self) -> None: while True: callback = await self._queue.get() - try: - if inspect.iscoroutinefunction(callback): - # Callback is an async coroutine, need to await it - await callback() - else: - # Callback is a sync function, need to call it - callback() - except Exception: - logger.error("Failed processing job", exc_info=True) - finally: - self._queue.task_done() + # Firing 
tasks instead of awaiting them allows for concurrent requests + task = asyncio.create_task(self._process_callback(callback)) + # Create a strong reference to the task so it can be cancelled on kill + # and does not get garbage collected while running + self._active_tasks.add(task) + task.add_done_callback(self._on_task_complete) # Yield to let the event loop run other tasks await asyncio.sleep(0) + + async def _process_callback(self, callback: Callable[[], Any]) -> None: + if inspect.iscoroutinefunction(callback): + # Callback is an async coroutine, need to await it + await callback() + else: + # Callback is a sync function, need to call it + callback() + + def _on_task_complete(self, task: asyncio.Task[None]) -> None: + try: + task.result() + except Exception: + logger.error("Failed processing job", exc_info=True) + finally: + # Mark the task as done and remove it from the active tasks set + # This happens only after the task has completed + self._queue.task_done() + self._active_tasks.discard(task) From fcc8040c31675cae02de137958da95634dedac4b Mon Sep 17 00:00:00 2001 From: srothh Date: Fri, 18 Jul 2025 12:21:22 +0200 Subject: [PATCH 14/42] fix(worker): Modify kill behaviour to mirror threaded worker Changed kill to also use the _TERMINATOR sentinel, so the queue is still drained to this point on kill instead of cancelled immediately. This should also fix potential race conditions with flush_async. 
GH-4581 --- sentry_sdk/worker.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index c12e73c583..91673d7859 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -192,7 +192,7 @@ def _target(self) -> None: class AsyncWorker(Worker): def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None: - self._queue: asyncio.Queue[Callable[[], Any]] = asyncio.Queue(queue_size) + self._queue: asyncio.Queue[Any] = asyncio.Queue(queue_size) self._task: Optional[asyncio.Task[None]] = None # Event loop needs to remain in the same process self._task_for_pid: Optional[int] = None @@ -210,9 +210,10 @@ def is_alive(self) -> bool: def kill(self) -> None: if self._task: - self._task.cancel() - self._task = None - self._task_for_pid = None + try: + self._queue.put_nowait(_TERMINATOR) + except asyncio.QueueFull: + logger.debug("async worker queue full, kill failed") # Also cancel any active callback tasks # Avoid modifying the set while cancelling tasks tasks_to_cancel = set(self._active_tasks) @@ -220,6 +221,8 @@ def kill(self) -> None: task.cancel() self._active_tasks.clear() self._loop = None + self._task = None + self._task_for_pid = None def start(self) -> None: if not self.is_alive: @@ -280,6 +283,9 @@ def submit(self, callback: Callable[[], Any]) -> bool: async def _target(self) -> None: while True: callback = await self._queue.get() + if callback is _TERMINATOR: + self._queue.task_done() + break # Firing tasks instead of awaiting them allows for concurrent requests task = asyncio.create_task(self._process_callback(callback)) # Create a strong reference to the task so it can be cancelled on kill From 9a43d9b7a07b4a5785d79c7d7c5d6a2ad3c5559d Mon Sep 17 00:00:00 2001 From: srothh Date: Mon, 21 Jul 2025 10:17:32 +0200 Subject: [PATCH 15/42] ref(worker): add proper type annotation to worker task list Add proper type annotation to worker task list to fix linting problems GH-4581 --- 
sentry_sdk/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index 91673d7859..3491498b56 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -198,7 +198,7 @@ def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None: self._task_for_pid: Optional[int] = None self._loop: Optional[asyncio.AbstractEventLoop] = None # Track active callback tasks so they have a strong reference and can be cancelled on kill - self._active_tasks: set[asyncio.Task] = set() + self._active_tasks: set[asyncio.Task[None]] = set() @property def is_alive(self) -> bool: From b5eda0e78ed8599c126c7e19def0749cd2fe0da5 Mon Sep 17 00:00:00 2001 From: srothh Date: Wed, 30 Jul 2025 14:07:24 +0200 Subject: [PATCH 16/42] ref(worker): Refactor implementation to incorporate feedback Refactor worker implementation to simplify callback processing, fix pending calculation and improve queue initialisation. GH-4581 --- sentry_sdk/worker.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index 3491498b56..8f4625511a 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -3,7 +3,6 @@ import os import threading import asyncio -import inspect from time import sleep, time from sentry_sdk._queue import Queue, FullError @@ -192,7 +191,7 @@ def _target(self) -> None: class AsyncWorker(Worker): def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None: - self._queue: asyncio.Queue[Any] = asyncio.Queue(queue_size) + self._queue: asyncio.Queue[Any] = None self._task: Optional[asyncio.Task[None]] = None # Event loop needs to remain in the same process self._task_for_pid: Optional[int] = None @@ -228,10 +227,13 @@ def start(self) -> None: if not self.is_alive: try: self._loop = asyncio.get_running_loop() + if self._queue is None: + self._queue = asyncio.Queue(maxsize=self._queue_size) self._task = self._loop.create_task(self._target()) 
self._task_for_pid = os.getpid() except RuntimeError: # There is no event loop running + logger.warning("No event loop running, async worker not started") self._loop = None self._task = None self._task_for_pid = None @@ -253,7 +255,7 @@ async def _wait_flush(self, timeout: float, callback: Optional[Any] = None) -> N try: await asyncio.wait_for(self._queue.join(), timeout=initial_timeout) except asyncio.TimeoutError: - pending = self._queue.qsize() + 1 + pending = self._queue.qsize() + len(self._active_tasks) logger.debug("%d event(s) pending on flush", pending) if callback is not None: callback(pending, timeout) @@ -262,7 +264,7 @@ async def _wait_flush(self, timeout: float, callback: Optional[Any] = None) -> N remaining_timeout = timeout - initial_timeout await asyncio.wait_for(self._queue.join(), timeout=remaining_timeout) except asyncio.TimeoutError: - pending = self._queue.qsize() + 1 + pending = self._queue.qsize() + len(self._active_tasks) logger.error("flush timed out, dropped %s events", pending) async def flush_async(self, timeout: float, callback: Optional[Any] = None) -> None: @@ -296,12 +298,8 @@ async def _target(self) -> None: await asyncio.sleep(0) async def _process_callback(self, callback: Callable[[], Any]) -> None: - if inspect.iscoroutinefunction(callback): - # Callback is an async coroutine, need to await it - await callback() - else: - # Callback is a sync function, need to call it - callback() + # Callback is an async coroutine, need to await it + await callback() def _on_task_complete(self, task: asyncio.Task[None]) -> None: try: From 9e380b89f860d09d44ba7e087a8a3809804b2745 Mon Sep 17 00:00:00 2001 From: srothh Date: Wed, 30 Jul 2025 14:22:23 +0200 Subject: [PATCH 17/42] ref(worker): fix queue initialization GH-4581 --- sentry_sdk/worker.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index 8f4625511a..e9ae58063d 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py 
@@ -191,7 +191,8 @@ def _target(self) -> None: class AsyncWorker(Worker): def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None: - self._queue: asyncio.Queue[Any] = None + self._queue: asyncio.Queue[Any] = asyncio.Queue(maxsize=queue_size) + self._queue_size = queue_size self._task: Optional[asyncio.Task[None]] = None # Event loop needs to remain in the same process self._task_for_pid: Optional[int] = None From ee446215262b17987d418d0b480ebab863a559cd Mon Sep 17 00:00:00 2001 From: srothh Date: Wed, 30 Jul 2025 14:50:29 +0200 Subject: [PATCH 18/42] ref(worker): Add queue as optional to allow for initialisation in start GH-4581 --- sentry_sdk/worker.py | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index e9ae58063d..5d620c4b83 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -191,7 +191,7 @@ def _target(self) -> None: class AsyncWorker(Worker): def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None: - self._queue: asyncio.Queue[Any] = asyncio.Queue(maxsize=queue_size) + self._queue: Optional[asyncio.Queue[Any]] = None self._queue_size = queue_size self._task: Optional[asyncio.Task[None]] = None # Event loop needs to remain in the same process @@ -210,10 +210,11 @@ def is_alive(self) -> bool: def kill(self) -> None: if self._task: - try: - self._queue.put_nowait(_TERMINATOR) - except asyncio.QueueFull: - logger.debug("async worker queue full, kill failed") + if self._queue is not None: + try: + self._queue.put_nowait(_TERMINATOR) + except asyncio.QueueFull: + logger.debug("async worker queue full, kill failed") # Also cancel any active callback tasks # Avoid modifying the set while cancelling tasks tasks_to_cancel = set(self._active_tasks) @@ -240,6 +241,8 @@ def start(self) -> None: self._task_for_pid = None def full(self) -> bool: + if self._queue is None: + return True return self._queue.full() def _ensure_task(self) -> None: @@ -247,7 +250,7 
@@ def _ensure_task(self) -> None: self.start() async def _wait_flush(self, timeout: float, callback: Optional[Any] = None) -> None: - if not self._loop or not self._loop.is_running(): + if not self._loop or not self._loop.is_running() or self._queue is None: return initial_timeout = min(0.1, timeout) @@ -276,7 +279,8 @@ async def flush_async(self, timeout: float, callback: Optional[Any] = None) -> N def submit(self, callback: Callable[[], Any]) -> bool: self._ensure_task() - + if self._queue is None: + return False try: self._queue.put_nowait(callback) return True @@ -284,6 +288,8 @@ def submit(self, callback: Callable[[], Any]) -> bool: return False async def _target(self) -> None: + if self._queue is None: + return while True: callback = await self._queue.get() if callback is _TERMINATOR: @@ -310,5 +316,6 @@ def _on_task_complete(self, task: asyncio.Task[None]) -> None: finally: # Mark the task as done and remove it from the active tasks set # This happens only after the task has completed - self._queue.task_done() + if self._queue is not None: + self._queue.task_done() self._active_tasks.discard(task) From d9f7383a7d83517393514bf82b450d1df57f78f9 Mon Sep 17 00:00:00 2001 From: srothh Date: Wed, 30 Jul 2025 15:01:40 +0200 Subject: [PATCH 19/42] ref(worker): Change to sync flush method that launches task GH-4581 --- sentry_sdk/worker.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index 5d620c4b83..c8dbbb2d73 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -271,11 +271,10 @@ async def _wait_flush(self, timeout: float, callback: Optional[Any] = None) -> N pending = self._queue.qsize() + len(self._active_tasks) logger.error("flush timed out, dropped %s events", pending) - async def flush_async(self, timeout: float, callback: Optional[Any] = None) -> None: - logger.debug("background worker got flush request") - if self.is_alive and timeout > 0.0: - await 
self._wait_flush(timeout, callback) - logger.debug("background worker flushed") + def flush(self, timeout: float, callback: Optional[Any] = None) -> Optional[asyncio.Task[None]]: # type: ignore[override] + if self.is_alive and timeout > 0.0 and self._loop and self._loop.is_running(): + return self._loop.create_task(self._wait_flush(timeout, callback)) + return None def submit(self, callback: Callable[[], Any]) -> bool: self._ensure_task() From d2e647b6553558524f777e9d4d6806e565f09762 Mon Sep 17 00:00:00 2001 From: srothh Date: Wed, 30 Jul 2025 15:56:37 +0200 Subject: [PATCH 20/42] ref(worker): Readd coroutine check for worker callbacks The flush method in the transport enqueues a sync callback for the worker, therefore the check needs to be here after all. GH-4581 --- sentry_sdk/worker.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index c8dbbb2d73..ebf86f412e 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -3,6 +3,7 @@ import os import threading import asyncio +import inspect from time import sleep, time from sentry_sdk._queue import Queue, FullError @@ -305,7 +306,11 @@ async def _target(self) -> None: async def _process_callback(self, callback: Callable[[], Any]) -> None: # Callback is an async coroutine, need to await it - await callback() + if inspect.iscoroutinefunction(callback): + await callback() + else: + # Callback is a sync function, such as _flush_client_reports() + callback() def _on_task_complete(self, task: asyncio.Task[None]) -> None: try: From 859a0e2914c4b8d9a820b7e648dd777300d64e3a Mon Sep 17 00:00:00 2001 From: srothh Date: Thu, 31 Jul 2025 14:06:54 +0200 Subject: [PATCH 21/42] ref(worker): Remove sync callbacks from worker processing for now The callbacks passed to the worker from the transport are all async now, so this is currently not needed. 
GH-4581 --- sentry_sdk/worker.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index ebf86f412e..c8dbbb2d73 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -3,7 +3,6 @@ import os import threading import asyncio -import inspect from time import sleep, time from sentry_sdk._queue import Queue, FullError @@ -306,11 +305,7 @@ async def _target(self) -> None: async def _process_callback(self, callback: Callable[[], Any]) -> None: # Callback is an async coroutine, need to await it - if inspect.iscoroutinefunction(callback): - await callback() - else: - # Callback is a sync function, such as _flush_client_reports() - callback() + await callback() def _on_task_complete(self, task: asyncio.Task[None]) -> None: try: From 4a58ce78f9c649f62cb116293e707233d4f90c05 Mon Sep 17 00:00:00 2001 From: srothh Date: Mon, 21 Jul 2025 14:06:50 +0200 Subject: [PATCH 22/42] feat(transport): Add async transport class Add an implementation of Transport to work with the async background worker and HTTPCore async. 
GH-4582 --- sentry_sdk/transport.py | 182 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 182 insertions(+) diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index f8328cac12..44a7c6eacb 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -6,6 +6,7 @@ import socket import ssl import time +import asyncio from datetime import datetime, timedelta, timezone from collections import defaultdict from urllib.request import getproxies @@ -571,6 +572,187 @@ def flush( self._worker.flush(timeout, callback) +class AsyncHttpTransport(HttpTransportCore): + def __init__(self: Self, options: Dict[str, Any]) -> None: + super().__init__(options) + # Requires event loop at init time + self._loop = asyncio.get_running_loop() + self.background_tasks = set() + + async def _send_envelope(self: Self, envelope: Envelope) -> None: + _prepared_envelope = self._prepare_envelope(envelope) + if _prepared_envelope is None: + return None + envelope, body, headers = _prepared_envelope + await self._send_request( + body.getvalue(), + headers=headers, + endpoint_type=EndpointType.ENVELOPE, + envelope=envelope, + ) + return None + + async def _send_request( + self: Self, + body: bytes, + headers: Dict[str, str], + endpoint_type: EndpointType, + envelope: Optional[Envelope], + ) -> None: + self._update_headers(headers) + try: + response = await self._request( + "POST", + endpoint_type, + body, + headers, + ) + except Exception: + self._handle_request_error(envelope=envelope, loss_reason="network") + raise + try: + self._handle_response(response=response, envelope=envelope) + finally: + response.close() + + async def _request( + self: Self, + method: str, + endpoint_type: EndpointType, + body: Any, + headers: Mapping[str, str], + ) -> httpcore.Response: + return await self._pool.request( + method, + self._auth.get_api_url(endpoint_type), + content=body, + headers=headers, # type: ignore + ) + + def _flush_client_reports(self: Self, force: bool = False) -> None: + 
client_report = self._fetch_pending_client_report(force=force, interval=60) + if client_report is not None: + self.capture_envelope(Envelope(items=[client_report])) + + async def _capture_envelope(self: Self, envelope: Envelope) -> None: + async def send_envelope_wrapper() -> None: + with capture_internal_exceptions(): + await self._send_envelope(envelope) + self._flush_client_reports() + + if not self._worker.submit(send_envelope_wrapper): + self.on_dropped_event("full_queue") + for item in envelope.items: + self.record_lost_event("queue_overflow", item=item) + + def capture_envelope(self: Self, envelope: Envelope) -> None: + # Synchronous entry point + if asyncio.get_running_loop() is not None: + # We are on the main thread running the event loop + task = asyncio.create_task(self._capture_envelope(envelope)) + self.background_tasks.add(task) + task.add_done_callback(self.background_tasks.discard) + else: + # We are in a background thread, not running an event loop, + # have to launch the task on the loop in a threadsafe way. 
+ asyncio.run_coroutine_threadsafe( + self._capture_envelope(envelope), + self._loop, + ) + + async def flush_async( + self: Self, + timeout: float, + callback: Optional[Callable[[int, float], None]] = None, + ) -> None: + logger.debug("Flushing HTTP transport") + + if timeout > 0: + self._worker.submit(lambda: self._flush_client_reports(force=True)) + await self._worker.flush_async(timeout, callback) # type: ignore + + def _get_pool_options(self: Self) -> Dict[str, Any]: + options: Dict[str, Any] = { + "http2": False, # no HTTP2 for now + "retries": 3, + } + + socket_options = ( + self.options["socket_options"] + if self.options["socket_options"] is not None + else [] + ) + + used_options = {(o[0], o[1]) for o in socket_options} + for default_option in KEEP_ALIVE_SOCKET_OPTIONS: + if (default_option[0], default_option[1]) not in used_options: + socket_options.append(default_option) + + options["socket_options"] = socket_options + + ssl_context = ssl.create_default_context() + ssl_context.load_verify_locations( + self.options["ca_certs"] # User-provided bundle from the SDK init + or os.environ.get("SSL_CERT_FILE") + or os.environ.get("REQUESTS_CA_BUNDLE") + or certifi.where() + ) + cert_file = self.options["cert_file"] or os.environ.get("CLIENT_CERT_FILE") + key_file = self.options["key_file"] or os.environ.get("CLIENT_KEY_FILE") + if cert_file is not None: + ssl_context.load_cert_chain(cert_file, key_file) + + options["ssl_context"] = ssl_context + + return options + + def _make_pool( + self: Self, + ) -> Union[ + httpcore.AsyncSOCKSProxy, httpcore.AsyncHTTPProxy, httpcore.AsyncConnectionPool + ]: + if self.parsed_dsn is None: + raise ValueError("Cannot create HTTP-based transport without valid DSN") + proxy = None + no_proxy = self._in_no_proxy(self.parsed_dsn) + + # try HTTPS first + https_proxy = self.options["https_proxy"] + if self.parsed_dsn.scheme == "https" and (https_proxy != ""): + proxy = https_proxy or (not no_proxy and getproxies().get("https")) + + # 
maybe fallback to HTTP proxy + http_proxy = self.options["http_proxy"] + if not proxy and (http_proxy != ""): + proxy = http_proxy or (not no_proxy and getproxies().get("http")) + + opts = self._get_pool_options() + + if proxy: + proxy_headers = self.options["proxy_headers"] + if proxy_headers: + opts["proxy_headers"] = proxy_headers + + if proxy.startswith("socks"): + try: + if "socket_options" in opts: + socket_options = opts.pop("socket_options") + if socket_options: + logger.warning( + "You have defined socket_options but using a SOCKS proxy which doesn't support these. We'll ignore socket_options." + ) + return httpcore.AsyncSOCKSProxy(proxy_url=proxy, **opts) + except RuntimeError: + logger.warning( + "You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. Disabling proxy support.", + proxy, + ) + else: + return httpcore.AsyncHTTPProxy(proxy_url=proxy, **opts) + + return httpcore.AsyncConnectionPool(**opts) + + class HttpTransport(BaseHttpTransport): if TYPE_CHECKING: _pool: Union[PoolManager, ProxyManager] From cbecde7710dbbebdff06d283cce4b3744e9d727b Mon Sep 17 00:00:00 2001 From: srothh Date: Tue, 22 Jul 2025 10:31:45 +0200 Subject: [PATCH 23/42] ref(transport): Fix event loop handling in async transport Async Transport now properly checks for the presence of the event loop in capture_envelope, and drops items in case the event loop is no longer running for some reason.
GH-4582 --- sentry_sdk/transport.py | 39 +++++++++++++++++++++++++-------------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index 44a7c6eacb..fd160f347a 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -29,7 +29,7 @@ from sentry_sdk.consts import EndpointType from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions -from sentry_sdk.worker import BackgroundWorker, Worker +from sentry_sdk.worker import BackgroundWorker, Worker, AsyncWorker from sentry_sdk.envelope import Envelope, Item, PayloadRef from typing import TYPE_CHECKING @@ -225,9 +225,10 @@ def __init__(self: Self, options: Dict[str, Any]) -> None: elif self._compression_algo == "br": self._compression_level = 4 - def _create_worker(self: Self, options: Dict[str, Any]) -> Worker: - # For now, we only support the threaded sync background worker. - return BackgroundWorker(queue_size=options["transport_queue_size"]) + def _create_worker(self, options: dict[str, Any]) -> Worker: + async_enabled = options.get("_experiments", {}).get("transport_async", False) + worker_cls = AsyncWorker if async_enabled else BackgroundWorker + return worker_cls(queue_size=options["transport_queue_size"]) def record_lost_event( self: Self, @@ -647,18 +648,26 @@ async def send_envelope_wrapper() -> None: def capture_envelope(self: Self, envelope: Envelope) -> None: # Synchronous entry point - if asyncio.get_running_loop() is not None: + try: + asyncio.get_running_loop() # We are on the main thread running the event loop task = asyncio.create_task(self._capture_envelope(envelope)) self.background_tasks.add(task) task.add_done_callback(self.background_tasks.discard) - else: + except RuntimeError: # We are in a background thread, not running an event loop, # have to launch the task on the loop in a threadsafe way. 
- asyncio.run_coroutine_threadsafe( - self._capture_envelope(envelope), - self._loop, - ) + if self._loop and self._loop.is_running(): + asyncio.run_coroutine_threadsafe( + self._capture_envelope(envelope), + self._loop, + ) + else: + # The event loop is no longer running + logger.warning("Async Transport is not running in an event loop.") + self.on_dropped_event("no_async_context") + for item in envelope.items: + self.record_lost_event("no_async_context", item=item) async def flush_async( self: Self, @@ -998,11 +1007,13 @@ def make_transport(options: Dict[str, Any]) -> Optional[Transport]: ref_transport = options["transport"] use_http2_transport = options.get("_experiments", {}).get("transport_http2", False) - + use_async_transport = options.get("_experiments", {}).get("transport_async", False) # By default, we use the http transport class - transport_cls: Type[Transport] = ( - Http2Transport if use_http2_transport else HttpTransport - ) + if use_async_transport and asyncio.get_running_loop() is not None: + transport_cls: Type[Transport] = AsyncHttpTransport + else: + use_http2 = use_http2_transport + transport_cls = Http2Transport if use_http2 else HttpTransport if isinstance(ref_transport, Transport): return ref_transport From c8bb55a777940b876870cf64ac2d56abe8715262 Mon Sep 17 00:00:00 2001 From: srothh Date: Tue, 22 Jul 2025 11:10:38 +0200 Subject: [PATCH 24/42] feat(transport): Add kill method for async transport Implement a kill method that properly shuts down the async transport. The httpcore async connection pool needs to be explicitly shutdown at the end of its usage. 
GH-4582 --- sentry_sdk/transport.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index fd160f347a..3a2c0cb2df 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -761,6 +761,16 @@ def _make_pool( return httpcore.AsyncConnectionPool(**opts) + def kill(self: Self) -> None: + + logger.debug("Killing HTTP transport") + self._worker.kill() + for task in self.background_tasks: + task.cancel() + self.background_tasks.clear() + + self._loop.create_task(self._pool.aclose()) # type: ignore + class HttpTransport(BaseHttpTransport): if TYPE_CHECKING: _pool: Union[PoolManager, ProxyManager] From 05a7de7921e0657d103b4d6c296287ad5788e846 Mon Sep 17 00:00:00 2001 From: srothh Date: Wed, 23 Jul 2025 15:50:06 +0200 Subject: [PATCH 25/42] ref(transport): Fix type errors in async transport Fix type errors resulting from async override and missing type definition in the async transport. GH-4582 --- sentry_sdk/transport.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index 3a2c0cb2df..71d563423d 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -578,7 +578,7 @@ def __init__(self: Self, options: Dict[str, Any]) -> None: super().__init__(options) # Requires event loop at init time self._loop = asyncio.get_running_loop() - self.background_tasks = set() + self.background_tasks: set[asyncio.Task[None]] = set() async def _send_envelope(self: Self, envelope: Envelope) -> None: _prepared_envelope = self._prepare_envelope(envelope) @@ -616,7 +616,7 @@ async def _send_request( finally: response.close() - async def _request( + async def _request( # type: ignore[override] self: Self, method: str, endpoint_type: EndpointType, From 38246d03940c638b45c173f513eb61dc4e199cfb Mon Sep 17 00:00:00 2001 From: srothh Date: Thu, 24 Jul 2025 09:44:58 +0200 Subject: [PATCH 26/42] Add silent failing to kill on event loop errors Add a try/except to ensure silent fail on kill
in case the event loop shuts down. GH-4582 --- sentry_sdk/transport.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index 71d563423d..7743910c79 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -768,8 +768,10 @@ def kill(self: Self) -> None: for task in self.background_tasks: task.cancel() self.background_tasks.clear() - - self._loop.create_task(self._pool.aclose()) # type: ignore + try: + self._loop.create_task(self._pool.aclose()) # type: ignore + except RuntimeError: + logger.warning("Event loop not running, aborting kill.") class HttpTransport(BaseHttpTransport): From 8b226cbefa3dd6870ae451b98af4af85c8bbb3e1 Mon Sep 17 00:00:00 2001 From: srothh Date: Thu, 24 Jul 2025 11:27:45 +0200 Subject: [PATCH 27/42] ref(transport): Fix event loop check in make_transport Fix the event loop check in make_transport so that it does not throw a runtime error but rather falls back correctly. GH-4582 --- sentry_sdk/transport.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index 7743910c79..541f71ba53 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -1021,11 +1021,14 @@ def make_transport(options: Dict[str, Any]) -> Optional[Transport]: use_http2_transport = options.get("_experiments", {}).get("transport_http2", False) use_async_transport = options.get("_experiments", {}).get("transport_async", False) # By default, we use the http transport class - if use_async_transport and asyncio.get_running_loop() is not None: - transport_cls: Type[Transport] = AsyncHttpTransport - else: - use_http2 = use_http2_transport - transport_cls = Http2Transport if use_http2 else HttpTransport + if use_async_transport: + try: + asyncio.get_running_loop() + transport_cls: Type[Transport] = AsyncHttpTransport + except RuntimeError: + # No event loop running, fall back to sync transport + 
logger.warning("No event loop running, falling back to sync transport.") + transport_cls = Http2Transport if use_http2_transport else HttpTransport if isinstance(ref_transport, Transport): return ref_transport From 823215e41bb43926d982ea2d8ab74fdb76383c2b Mon Sep 17 00:00:00 2001 From: srothh Date: Thu, 24 Jul 2025 11:45:13 +0200 Subject: [PATCH 28/42] ref(transport): Add missing transport instantiation in non-async context GH-4582 --- sentry_sdk/transport.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index 541f71ba53..832a5d5610 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -1029,6 +1029,8 @@ def make_transport(options: Dict[str, Any]) -> Optional[Transport]: # No event loop running, fall back to sync transport logger.warning("No event loop running, falling back to sync transport.") transport_cls = Http2Transport if use_http2_transport else HttpTransport + else: + transport_cls = Http2Transport if use_http2_transport else HttpTransport if isinstance(ref_transport, Transport): return ref_transport From 4eed4fd6e5e90891c2365b7be811d4b0e7867552 Mon Sep 17 00:00:00 2001 From: srothh Date: Fri, 25 Jul 2025 11:21:37 +0200 Subject: [PATCH 29/42] ref(transport): Fix httpcore async specific request handling GH-4582 --- sentry_sdk/transport.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index 832a5d5610..aff2132941 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -580,6 +580,16 @@ def __init__(self: Self, options: Dict[str, Any]) -> None: self._loop = asyncio.get_running_loop() self.background_tasks: set[asyncio.Task[None]] = set() + def _get_header_value(self: Self, response: Any, header: str) -> Optional[str]: + return next( + ( + val.decode("ascii") + for key, val in response.headers + if key.decode("ascii").lower() == header + ), + None, + ) + async def _send_envelope(self: Self, 
envelope: Envelope) -> None: _prepared_envelope = self._prepare_envelope(envelope) if _prepared_envelope is None: @@ -614,7 +624,7 @@ async def _send_request( try: self._handle_response(response=response, envelope=envelope) finally: - response.close() + await response.aclose() async def _request( # type: ignore[override] self: Self, From afd494d06ab71cfb2b35007ebdc831fd75dc7e74 Mon Sep 17 00:00:00 2001 From: srothh Date: Fri, 25 Jul 2025 12:49:26 +0200 Subject: [PATCH 30/42] ref(transport): Add gc safety to async kill GH-4582 --- sentry_sdk/transport.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index aff2132941..6644a7c4ec 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -779,7 +779,9 @@ def kill(self: Self) -> None: task.cancel() self.background_tasks.clear() try: - self._loop.create_task(self._pool.aclose()) # type: ignore + task = self._loop.create_task(self._pool.aclose()) # type: ignore + self.background_tasks.add(task) + task.add_done_callback(lambda t: self.background_tasks.discard(t)) except RuntimeError: logger.warning("Event loop not running, aborting kill.") From fcc7ac34d11d0143e9cff06796121bd0e0484818 Mon Sep 17 00:00:00 2001 From: srothh Date: Fri, 25 Jul 2025 13:06:14 +0200 Subject: [PATCH 31/42] ref(transport): Add missing httpcore extensions GH-4582 --- sentry_sdk/transport.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index 6644a7c4ec..7793bc020d 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -638,6 +638,14 @@ async def _request( # type: ignore[override] self._auth.get_api_url(endpoint_type), content=body, headers=headers, # type: ignore + extensions={ + "timeout": { + "pool": self.TIMEOUT, + "connect": self.TIMEOUT, + "write": self.TIMEOUT, + "read": self.TIMEOUT, + } + }, ) def _flush_client_reports(self: Self, force: bool = False) -> None: From 
f659514e6b465afb1240a23356935f0aaf443096 Mon Sep 17 00:00:00 2001 From: srothh Date: Mon, 28 Jul 2025 10:17:17 +0200 Subject: [PATCH 32/42] fix(transport): Fix fallback sync transport creating async worker GH-4582 --- sentry_sdk/transport.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index 7793bc020d..0986613cb9 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -227,7 +227,11 @@ def __init__(self: Self, options: Dict[str, Any]) -> None: def _create_worker(self, options: dict[str, Any]) -> Worker: async_enabled = options.get("_experiments", {}).get("transport_async", False) - worker_cls = AsyncWorker if async_enabled else BackgroundWorker + try: + asyncio.get_running_loop() + worker_cls = AsyncWorker if async_enabled else BackgroundWorker + except RuntimeError: + worker_cls = BackgroundWorker return worker_cls(queue_size=options["transport_queue_size"]) def record_lost_event( From 8c542ce10577cf17c173f78cb93d4cab59dc9899 Mon Sep 17 00:00:00 2001 From: srothh Date: Tue, 29 Jul 2025 13:12:12 +0200 Subject: [PATCH 33/42] ref(transport): Make kill optionally return a task for async Make kill optionally return a task for async transport. This allows for a blocking kill operation if the caller is in an async context. 
GH-4582 --- sentry_sdk/transport.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index 0986613cb9..36a779a210 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -783,7 +783,7 @@ def _make_pool( return httpcore.AsyncConnectionPool(**opts) - def kill(self: Self) -> None: + def kill(self: Self) -> Optional[asyncio.Task[None]]: # type: ignore logger.debug("Killing HTTP transport") self._worker.kill() @@ -791,11 +791,11 @@ def kill(self: Self) -> None: task.cancel() self.background_tasks.clear() try: - task = self._loop.create_task(self._pool.aclose()) # type: ignore - self.background_tasks.add(task) - task.add_done_callback(lambda t: self.background_tasks.discard(t)) + # Return the pool cleanup task so caller can await it if needed + return self._loop.create_task(self._pool.aclose()) # type: ignore except RuntimeError: logger.warning("Event loop not running, aborting kill.") + return None class HttpTransport(BaseHttpTransport): From 30dde6714d54ba1ef279a507ea6e84c1f8b2c487 Mon Sep 17 00:00:00 2001 From: srothh Date: Wed, 30 Jul 2025 15:12:20 +0200 Subject: [PATCH 34/42] ref(transport): Adapt transport for synchronous flush interface GH-4582 --- sentry_sdk/transport.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index 36a779a210..bf08b1d1e4 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -691,16 +691,17 @@ def capture_envelope(self: Self, envelope: Envelope) -> None: for item in envelope.items: self.record_lost_event("no_async_context", item=item) - async def flush_async( + def flush( # type: ignore[override] self: Self, timeout: float, callback: Optional[Callable[[int, float], None]] = None, - ) -> None: + ) -> Optional[asyncio.Task[None]]: logger.debug("Flushing HTTP transport") if timeout > 0: self._worker.submit(lambda: self._flush_client_reports(force=True)) - await 
self._worker.flush_async(timeout, callback) # type: ignore + return self._worker.flush(timeout, callback) + return None def _get_pool_options(self: Self) -> Dict[str, Any]: options: Dict[str, Any] = { From 3392e0e1694a71f3067f52e3b1a7da94277382b3 Mon Sep 17 00:00:00 2001 From: srothh Date: Wed, 30 Jul 2025 15:17:18 +0200 Subject: [PATCH 35/42] ref(transport): Fix mypy error GH-4582 --- sentry_sdk/transport.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index bf08b1d1e4..277e0f9597 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -700,7 +700,7 @@ def flush( # type: ignore[override] if timeout > 0: self._worker.submit(lambda: self._flush_client_reports(force=True)) - return self._worker.flush(timeout, callback) + return self._worker.flush(timeout, callback) # type: ignore[func-returns-value] return None def _get_pool_options(self: Self) -> Dict[str, Any]: From 6c85500e93cc43d1d02a749b70d3d1d2d90b2ec0 Mon Sep 17 00:00:00 2001 From: srothh Date: Wed, 30 Jul 2025 16:21:51 +0200 Subject: [PATCH 36/42] ref(transport): Make transport loop public GH-4582 --- sentry_sdk/transport.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index 277e0f9597..fadb20a909 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -581,7 +581,7 @@ class AsyncHttpTransport(HttpTransportCore): def __init__(self: Self, options: Dict[str, Any]) -> None: super().__init__(options) # Requires event loop at init time - self._loop = asyncio.get_running_loop() + self.loop = asyncio.get_running_loop() self.background_tasks: set[asyncio.Task[None]] = set() def _get_header_value(self: Self, response: Any, header: str) -> Optional[str]: @@ -679,10 +679,10 @@ def capture_envelope(self: Self, envelope: Envelope) -> None: except RuntimeError: # We are in a background thread, not running an event loop, # have to launch the task on 
the loop in a threadsafe way. - if self._loop and self._loop.is_running(): + if self.loop and self.loop.is_running(): asyncio.run_coroutine_threadsafe( self._capture_envelope(envelope), - self._loop, + self.loop, ) else: # The event loop is no longer running @@ -793,7 +793,7 @@ def kill(self: Self) -> Optional[asyncio.Task[None]]: # type: ignore self.background_tasks.clear() try: # Return the pool cleanup task so caller can await it if needed - return self._loop.create_task(self._pool.aclose()) # type: ignore + return self.loop.create_task(self._pool.aclose()) # type: ignore except RuntimeError: logger.warning("Event loop not running, aborting kill.") return None From ae5a864f037d3016ec210993d2aac0e0bc4a2701 Mon Sep 17 00:00:00 2001 From: srothh Date: Wed, 30 Jul 2025 16:52:24 +0200 Subject: [PATCH 37/42] ref(transport): Add import checking for async transport GH-4582 --- sentry_sdk/transport.py | 414 +++++++++++++++++++++------------------- 1 file changed, 218 insertions(+), 196 deletions(-) diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index fadb20a909..6158868a16 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -23,6 +23,15 @@ HTTP2_ENABLED = True except ImportError: HTTP2_ENABLED = False + httpcore = None + +try: + import httpcore # noqa: F401 + import anyio # noqa: F401 + + ASYNC_TRANSPORT_ENABLED = True +except ImportError: + ASYNC_TRANSPORT_ENABLED = False import urllib3 import certifi @@ -577,226 +586,239 @@ def flush( self._worker.flush(timeout, callback) -class AsyncHttpTransport(HttpTransportCore): - def __init__(self: Self, options: Dict[str, Any]) -> None: - super().__init__(options) - # Requires event loop at init time - self.loop = asyncio.get_running_loop() - self.background_tasks: set[asyncio.Task[None]] = set() - - def _get_header_value(self: Self, response: Any, header: str) -> Optional[str]: - return next( - ( - val.decode("ascii") - for key, val in response.headers - if key.decode("ascii").lower() == header 
- ), - None, - ) - - async def _send_envelope(self: Self, envelope: Envelope) -> None: - _prepared_envelope = self._prepare_envelope(envelope) - if _prepared_envelope is None: - return None - envelope, body, headers = _prepared_envelope - await self._send_request( - body.getvalue(), - headers=headers, - endpoint_type=EndpointType.ENVELOPE, - envelope=envelope, - ) - return None - - async def _send_request( - self: Self, - body: bytes, - headers: Dict[str, str], - endpoint_type: EndpointType, - envelope: Optional[Envelope], - ) -> None: - self._update_headers(headers) - try: - response = await self._request( - "POST", - endpoint_type, - body, - headers, +if not ASYNC_TRANSPORT_ENABLED: + # Sorry, no AsyncHttpTransport for you + class AsyncHttpTransport(BaseHttpTransport): + def __init__(self: Self, options: Dict[str, Any]) -> None: + super().__init__(options) + logger.warning( + "You tried to use AsyncHttpTransport but don't have httpcore[asyncio] installed. Falling back to sync transport." 
) - except Exception: - self._handle_request_error(envelope=envelope, loss_reason="network") - raise - try: - self._handle_response(response=response, envelope=envelope) - finally: - await response.aclose() - async def _request( # type: ignore[override] - self: Self, - method: str, - endpoint_type: EndpointType, - body: Any, - headers: Mapping[str, str], - ) -> httpcore.Response: - return await self._pool.request( - method, - self._auth.get_api_url(endpoint_type), - content=body, - headers=headers, # type: ignore - extensions={ - "timeout": { - "pool": self.TIMEOUT, - "connect": self.TIMEOUT, - "write": self.TIMEOUT, - "read": self.TIMEOUT, - } - }, - ) +else: - def _flush_client_reports(self: Self, force: bool = False) -> None: - client_report = self._fetch_pending_client_report(force=force, interval=60) - if client_report is not None: - self.capture_envelope(Envelope(items=[client_report])) + class AsyncHttpTransport(HttpTransportCore): + def __init__(self: Self, options: Dict[str, Any]) -> None: + super().__init__(options) + # Requires event loop at init time + self.loop = asyncio.get_running_loop() + self.background_tasks: set[asyncio.Task[None]] = set() - async def _capture_envelope(self: Self, envelope: Envelope) -> None: - async def send_envelope_wrapper() -> None: - with capture_internal_exceptions(): - await self._send_envelope(envelope) - self._flush_client_reports() + def _get_header_value(self: Self, response: Any, header: str) -> Optional[str]: + return next( + ( + val.decode("ascii") + for key, val in response.headers + if key.decode("ascii").lower() == header + ), + None, + ) - if not self._worker.submit(send_envelope_wrapper): - self.on_dropped_event("full_queue") - for item in envelope.items: - self.record_lost_event("queue_overflow", item=item) + async def _send_envelope(self: Self, envelope: Envelope) -> None: + _prepared_envelope = self._prepare_envelope(envelope) + if _prepared_envelope is None: + return None + envelope, body, headers = 
_prepared_envelope + await self._send_request( + body.getvalue(), + headers=headers, + endpoint_type=EndpointType.ENVELOPE, + envelope=envelope, + ) + return None - def capture_envelope(self: Self, envelope: Envelope) -> None: - # Synchronous entry point - try: - asyncio.get_running_loop() - # We are on the main thread running the event loop - task = asyncio.create_task(self._capture_envelope(envelope)) - self.background_tasks.add(task) - task.add_done_callback(self.background_tasks.discard) - except RuntimeError: - # We are in a background thread, not running an event loop, - # have to launch the task on the loop in a threadsafe way. - if self.loop and self.loop.is_running(): - asyncio.run_coroutine_threadsafe( - self._capture_envelope(envelope), - self.loop, + async def _send_request( + self: Self, + body: bytes, + headers: Dict[str, str], + endpoint_type: EndpointType, + envelope: Optional[Envelope], + ) -> None: + self._update_headers(headers) + try: + response = await self._request( + "POST", + endpoint_type, + body, + headers, ) - else: - # The event loop is no longer running - logger.warning("Async Transport is not running in an event loop.") - self.on_dropped_event("no_async_context") - for item in envelope.items: - self.record_lost_event("no_async_context", item=item) + except Exception: + self._handle_request_error(envelope=envelope, loss_reason="network") + raise + try: + self._handle_response(response=response, envelope=envelope) + finally: + await response.aclose() + + async def _request( # type: ignore[override] + self: Self, + method: str, + endpoint_type: EndpointType, + body: Any, + headers: Mapping[str, str], + ) -> httpcore.Response: + return await self._pool.request( + method, + self._auth.get_api_url(endpoint_type), + content=body, + headers=headers, # type: ignore + extensions={ + "timeout": { + "pool": self.TIMEOUT, + "connect": self.TIMEOUT, + "write": self.TIMEOUT, + "read": self.TIMEOUT, + } + }, + ) - def flush( # type: ignore[override] - 
self: Self, - timeout: float, - callback: Optional[Callable[[int, float], None]] = None, - ) -> Optional[asyncio.Task[None]]: - logger.debug("Flushing HTTP transport") + def _flush_client_reports(self: Self, force: bool = False) -> None: + client_report = self._fetch_pending_client_report(force=force, interval=60) + if client_report is not None: + self.capture_envelope(Envelope(items=[client_report])) - if timeout > 0: - self._worker.submit(lambda: self._flush_client_reports(force=True)) - return self._worker.flush(timeout, callback) # type: ignore[func-returns-value] - return None - - def _get_pool_options(self: Self) -> Dict[str, Any]: - options: Dict[str, Any] = { - "http2": False, # no HTTP2 for now - "retries": 3, - } + async def _capture_envelope(self: Self, envelope: Envelope) -> None: + async def send_envelope_wrapper() -> None: + with capture_internal_exceptions(): + await self._send_envelope(envelope) + self._flush_client_reports() - socket_options = ( - self.options["socket_options"] - if self.options["socket_options"] is not None - else [] - ) + if not self._worker.submit(send_envelope_wrapper): + self.on_dropped_event("full_queue") + for item in envelope.items: + self.record_lost_event("queue_overflow", item=item) + + def capture_envelope(self: Self, envelope: Envelope) -> None: + # Synchronous entry point + try: + asyncio.get_running_loop() + # We are on the main thread running the event loop + task = asyncio.create_task(self._capture_envelope(envelope)) + self.background_tasks.add(task) + task.add_done_callback(self.background_tasks.discard) + except RuntimeError: + # We are in a background thread, not running an event loop, + # have to launch the task on the loop in a threadsafe way. 
+ if self.loop and self.loop.is_running(): + asyncio.run_coroutine_threadsafe( + self._capture_envelope(envelope), + self.loop, + ) + else: + # The event loop is no longer running + logger.warning("Async Transport is not running in an event loop.") + self.on_dropped_event("no_async_context") + for item in envelope.items: + self.record_lost_event("no_async_context", item=item) - used_options = {(o[0], o[1]) for o in socket_options} - for default_option in KEEP_ALIVE_SOCKET_OPTIONS: - if (default_option[0], default_option[1]) not in used_options: - socket_options.append(default_option) + def flush( # type: ignore[override] + self: Self, + timeout: float, + callback: Optional[Callable[[int, float], None]] = None, + ) -> Optional[asyncio.Task[None]]: + logger.debug("Flushing HTTP transport") + + if timeout > 0: + self._worker.submit(lambda: self._flush_client_reports(force=True)) + return self._worker.flush(timeout, callback) # type: ignore[func-returns-value] + return None - options["socket_options"] = socket_options + def _get_pool_options(self: Self) -> Dict[str, Any]: + options: Dict[str, Any] = { + "http2": False, # no HTTP2 for now + "retries": 3, + } - ssl_context = ssl.create_default_context() - ssl_context.load_verify_locations( - self.options["ca_certs"] # User-provided bundle from the SDK init - or os.environ.get("SSL_CERT_FILE") - or os.environ.get("REQUESTS_CA_BUNDLE") - or certifi.where() - ) - cert_file = self.options["cert_file"] or os.environ.get("CLIENT_CERT_FILE") - key_file = self.options["key_file"] or os.environ.get("CLIENT_KEY_FILE") - if cert_file is not None: - ssl_context.load_cert_chain(cert_file, key_file) + socket_options = ( + self.options["socket_options"] + if self.options["socket_options"] is not None + else [] + ) - options["ssl_context"] = ssl_context + used_options = {(o[0], o[1]) for o in socket_options} + for default_option in KEEP_ALIVE_SOCKET_OPTIONS: + if (default_option[0], default_option[1]) not in used_options: + 
socket_options.append(default_option) - return options + options["socket_options"] = socket_options - def _make_pool( - self: Self, - ) -> Union[ - httpcore.AsyncSOCKSProxy, httpcore.AsyncHTTPProxy, httpcore.AsyncConnectionPool - ]: - if self.parsed_dsn is None: - raise ValueError("Cannot create HTTP-based transport without valid DSN") - proxy = None - no_proxy = self._in_no_proxy(self.parsed_dsn) + ssl_context = ssl.create_default_context() + ssl_context.load_verify_locations( + self.options["ca_certs"] # User-provided bundle from the SDK init + or os.environ.get("SSL_CERT_FILE") + or os.environ.get("REQUESTS_CA_BUNDLE") + or certifi.where() + ) + cert_file = self.options["cert_file"] or os.environ.get("CLIENT_CERT_FILE") + key_file = self.options["key_file"] or os.environ.get("CLIENT_KEY_FILE") + if cert_file is not None: + ssl_context.load_cert_chain(cert_file, key_file) - # try HTTPS first - https_proxy = self.options["https_proxy"] - if self.parsed_dsn.scheme == "https" and (https_proxy != ""): - proxy = https_proxy or (not no_proxy and getproxies().get("https")) + options["ssl_context"] = ssl_context - # maybe fallback to HTTP proxy - http_proxy = self.options["http_proxy"] - if not proxy and (http_proxy != ""): - proxy = http_proxy or (not no_proxy and getproxies().get("http")) + return options - opts = self._get_pool_options() + def _make_pool( + self: Self, + ) -> Union[ + httpcore.AsyncSOCKSProxy, + httpcore.AsyncHTTPProxy, + httpcore.AsyncConnectionPool, + ]: + if self.parsed_dsn is None: + raise ValueError("Cannot create HTTP-based transport without valid DSN") + proxy = None + no_proxy = self._in_no_proxy(self.parsed_dsn) - if proxy: - proxy_headers = self.options["proxy_headers"] - if proxy_headers: - opts["proxy_headers"] = proxy_headers + # try HTTPS first + https_proxy = self.options["https_proxy"] + if self.parsed_dsn.scheme == "https" and (https_proxy != ""): + proxy = https_proxy or (not no_proxy and getproxies().get("https")) - if 
proxy.startswith("socks"): - try: - if "socket_options" in opts: - socket_options = opts.pop("socket_options") - if socket_options: - logger.warning( - "You have defined socket_options but using a SOCKS proxy which doesn't support these. We'll ignore socket_options." - ) - return httpcore.AsyncSOCKSProxy(proxy_url=proxy, **opts) - except RuntimeError: - logger.warning( - "You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. Disabling proxy support.", - proxy, - ) - else: - return httpcore.AsyncHTTPProxy(proxy_url=proxy, **opts) + # maybe fallback to HTTP proxy + http_proxy = self.options["http_proxy"] + if not proxy and (http_proxy != ""): + proxy = http_proxy or (not no_proxy and getproxies().get("http")) - return httpcore.AsyncConnectionPool(**opts) + opts = self._get_pool_options() - def kill(self: Self) -> Optional[asyncio.Task[None]]: # type: ignore + if proxy: + proxy_headers = self.options["proxy_headers"] + if proxy_headers: + opts["proxy_headers"] = proxy_headers - logger.debug("Killing HTTP transport") - self._worker.kill() - for task in self.background_tasks: - task.cancel() - self.background_tasks.clear() - try: - # Return the pool cleanup task so caller can await it if needed - return self.loop.create_task(self._pool.aclose()) # type: ignore - except RuntimeError: - logger.warning("Event loop not running, aborting kill.") - return None + if proxy.startswith("socks"): + try: + if "socket_options" in opts: + socket_options = opts.pop("socket_options") + if socket_options: + logger.warning( + "You have defined socket_options but using a SOCKS proxy which doesn't support these. We'll ignore socket_options." + ) + return httpcore.AsyncSOCKSProxy(proxy_url=proxy, **opts) + except RuntimeError: + logger.warning( + "You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. 
Disabling proxy support.", + proxy, + ) + else: + return httpcore.AsyncHTTPProxy(proxy_url=proxy, **opts) + + return httpcore.AsyncConnectionPool(**opts) + + def kill(self: Self) -> Optional[asyncio.Task[None]]: # type: ignore + + logger.debug("Killing HTTP transport") + self._worker.kill() + for task in self.background_tasks: + task.cancel() + self.background_tasks.clear() + try: + # Return the pool cleanup task so caller can await it if needed + return self.loop.create_task(self._pool.aclose()) # type: ignore + except RuntimeError: + logger.warning("Event loop not running, aborting kill.") + return None class HttpTransport(BaseHttpTransport): From 9c537e6cb800287e10a13bcf5140195f38aa9099 Mon Sep 17 00:00:00 2001 From: srothh Date: Wed, 30 Jul 2025 16:58:26 +0200 Subject: [PATCH 38/42] ref(transport): Fix typing errors GH-4582 --- sentry_sdk/transport.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index 6158868a16..3049e88e8f 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -23,7 +23,7 @@ HTTP2_ENABLED = True except ImportError: HTTP2_ENABLED = False - httpcore = None + httpcore = None # type: ignore try: import httpcore # noqa: F401 @@ -597,7 +597,7 @@ def __init__(self: Self, options: Dict[str, Any]) -> None: else: - class AsyncHttpTransport(HttpTransportCore): + class AsyncHttpTransport(HttpTransportCore): # type: ignore def __init__(self: Self, options: Dict[str, Any]) -> None: super().__init__(options) # Requires event loop at init time From f7554b206686b69dee1afa7d728d474fdbda42e1 Mon Sep 17 00:00:00 2001 From: srothh Date: Wed, 30 Jul 2025 17:04:56 +0200 Subject: [PATCH 39/42] ref(transport): Fix import checking GH-4582 --- sentry_sdk/transport.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index 3049e88e8f..3f28b38d19 100644 --- a/sentry_sdk/transport.py +++ 
b/sentry_sdk/transport.py @@ -18,18 +18,18 @@ try: import httpcore +except ImportError: + httpcore = None # type: ignore + +try: import h2 # noqa: F401 - HTTP2_ENABLED = True + HTTP2_ENABLED = httpcore is not None except ImportError: HTTP2_ENABLED = False - httpcore = None # type: ignore try: - import httpcore # noqa: F401 - import anyio # noqa: F401 - - ASYNC_TRANSPORT_ENABLED = True + ASYNC_TRANSPORT_ENABLED = httpcore is not None except ImportError: ASYNC_TRANSPORT_ENABLED = False @@ -238,7 +238,11 @@ def _create_worker(self, options: dict[str, Any]) -> Worker: async_enabled = options.get("_experiments", {}).get("transport_async", False) try: asyncio.get_running_loop() - worker_cls = AsyncWorker if async_enabled else BackgroundWorker + worker_cls = ( + AsyncWorker + if async_enabled and ASYNC_TRANSPORT_ENABLED + else BackgroundWorker + ) except RuntimeError: worker_cls = BackgroundWorker return worker_cls(queue_size=options["transport_queue_size"]) From 6cb72adc672de01551a9e251d51eba02e65b3c7b Mon Sep 17 00:00:00 2001 From: srothh Date: Thu, 31 Jul 2025 14:01:15 +0200 Subject: [PATCH 40/42] ref(transport): Refactor async transport to be more aligned with sync GH-4582 --- sentry_sdk/transport.py | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index 3f28b38d19..7f89b01849 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -620,15 +620,14 @@ def _get_header_value(self: Self, response: Any, header: str) -> Optional[str]: async def _send_envelope(self: Self, envelope: Envelope) -> None: _prepared_envelope = self._prepare_envelope(envelope) - if _prepared_envelope is None: - return None - envelope, body, headers = _prepared_envelope - await self._send_request( - body.getvalue(), - headers=headers, - endpoint_type=EndpointType.ENVELOPE, - envelope=envelope, - ) + if _prepared_envelope is not None: + envelope, body, headers = _prepared_envelope + await 
self._send_request( + body.getvalue(), + headers=headers, + endpoint_type=EndpointType.ENVELOPE, + envelope=envelope, + ) return None async def _send_request( @@ -676,7 +675,7 @@ async def _request( # type: ignore[override] }, ) - def _flush_client_reports(self: Self, force: bool = False) -> None: + async def _flush_client_reports(self: Self, force: bool = False) -> None: client_report = self._fetch_pending_client_report(force=force, interval=60) if client_report is not None: self.capture_envelope(Envelope(items=[client_report])) @@ -685,7 +684,7 @@ async def _capture_envelope(self: Self, envelope: Envelope) -> None: async def send_envelope_wrapper() -> None: with capture_internal_exceptions(): await self._send_envelope(envelope) - self._flush_client_reports() + await self._flush_client_reports() if not self._worker.submit(send_envelope_wrapper): self.on_dropped_event("full_queue") @@ -711,9 +710,9 @@ def capture_envelope(self: Self, envelope: Envelope) -> None: else: # The event loop is no longer running logger.warning("Async Transport is not running in an event loop.") - self.on_dropped_event("no_async_context") + self.on_dropped_event("internal_sdk_error") for item in envelope.items: - self.record_lost_event("no_async_context", item=item) + self.record_lost_event("internal_sdk_error", item=item) def flush( # type: ignore[override] self: Self, From 111861b140b62b9784e93e22ee3b92c6b623e84d Mon Sep 17 00:00:00 2001 From: srothh Date: Wed, 13 Aug 2025 15:54:24 +0200 Subject: [PATCH 41/42] ref(transport): Improve transport code quality Refactor transport code based on PR suggestions. 
Furthermore, add a requirement for the async extra in setup.py GH-4582 --- sentry_sdk/transport.py | 69 ++++++++++++++++------------------------- setup.py | 1 + 2 files changed, 27 insertions(+), 43 deletions(-) diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index 7f89b01849..bf13119cff 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -235,17 +235,7 @@ def __init__(self: Self, options: Dict[str, Any]) -> None: self._compression_level = 4 def _create_worker(self, options: dict[str, Any]) -> Worker: - async_enabled = options.get("_experiments", {}).get("transport_async", False) - try: - asyncio.get_running_loop() - worker_cls = ( - AsyncWorker - if async_enabled and ASYNC_TRANSPORT_ENABLED - else BackgroundWorker - ) - except RuntimeError: - worker_cls = BackgroundWorker - return worker_cls(queue_size=options["transport_queue_size"]) + raise NotImplementedError() def record_lost_event( self: Self, @@ -562,6 +552,9 @@ def _send_request( finally: response.close() + def _create_worker(self: Self, options: dict[str, Any]) -> Worker: + return BackgroundWorker(queue_size=options["transport_queue_size"]) + def _flush_client_reports(self: Self, force: bool = False) -> None: client_report = self._fetch_pending_client_report(force=force, interval=60) if client_report is not None: @@ -592,12 +585,7 @@ def flush( if not ASYNC_TRANSPORT_ENABLED: # Sorry, no AsyncHttpTransport for you - class AsyncHttpTransport(BaseHttpTransport): - def __init__(self: Self, options: Dict[str, Any]) -> None: - super().__init__(options) - logger.warning( - "You tried to use AsyncHttpTransport but don't have httpcore[asyncio] installed. Falling back to sync transport." 
- ) + AsyncHttpTransport = BaseHttpTransport else: @@ -608,6 +596,9 @@ def __init__(self: Self, options: Dict[str, Any]) -> None: self.loop = asyncio.get_running_loop() self.background_tasks: set[asyncio.Task[None]] = set() + def _create_worker(self: Self, options: dict[str, Any]) -> Worker: + return AsyncWorker(queue_size=options["transport_queue_size"]) + def _get_header_value(self: Self, response: Any, header: str) -> Optional[str]: return next( ( @@ -680,7 +671,7 @@ async def _flush_client_reports(self: Self, force: bool = False) -> None: if client_report is not None: self.capture_envelope(Envelope(items=[client_report])) - async def _capture_envelope(self: Self, envelope: Envelope) -> None: + def _capture_envelope(self: Self, envelope: Envelope) -> None: async def send_envelope_wrapper() -> None: with capture_internal_exceptions(): await self._send_envelope(envelope) @@ -693,26 +684,14 @@ async def send_envelope_wrapper() -> None: def capture_envelope(self: Self, envelope: Envelope) -> None: # Synchronous entry point - try: - asyncio.get_running_loop() - # We are on the main thread running the event loop - task = asyncio.create_task(self._capture_envelope(envelope)) - self.background_tasks.add(task) - task.add_done_callback(self.background_tasks.discard) - except RuntimeError: - # We are in a background thread, not running an event loop, - # have to launch the task on the loop in a threadsafe way. 
- if self.loop and self.loop.is_running(): - asyncio.run_coroutine_threadsafe( - self._capture_envelope(envelope), - self.loop, - ) - else: - # The event loop is no longer running - logger.warning("Async Transport is not running in an event loop.") - self.on_dropped_event("internal_sdk_error") - for item in envelope.items: - self.record_lost_event("internal_sdk_error", item=item) + if self.loop and self.loop.is_running(): + self.loop.call_soon_threadsafe(self._capture_envelope, envelope) + else: + # The event loop is no longer running + logger.warning("Async Transport is not running in an event loop.") + self.on_dropped_event("internal_sdk_error") + for item in envelope.items: + self.record_lost_event("internal_sdk_error", item=item) def flush( # type: ignore[override] self: Self, @@ -1071,16 +1050,20 @@ def make_transport(options: Dict[str, Any]) -> Optional[Transport]: use_http2_transport = options.get("_experiments", {}).get("transport_http2", False) use_async_transport = options.get("_experiments", {}).get("transport_async", False) # By default, we use the http transport class - if use_async_transport: + transport_cls: Type[Transport] = ( + Http2Transport if use_http2_transport else HttpTransport + ) + if use_async_transport and ASYNC_TRANSPORT_ENABLED: try: asyncio.get_running_loop() - transport_cls: Type[Transport] = AsyncHttpTransport + transport_cls = AsyncHttpTransport except RuntimeError: # No event loop running, fall back to sync transport logger.warning("No event loop running, falling back to sync transport.") - transport_cls = Http2Transport if use_http2_transport else HttpTransport - else: - transport_cls = Http2Transport if use_http2_transport else HttpTransport + elif use_async_transport: + logger.warning( + "You tried to use AsyncHttpTransport but don't have httpcore[asyncio] installed. Falling back to sync transport." 
+ ) if isinstance(ref_transport, Transport): return ref_transport diff --git a/setup.py b/setup.py index e4a29d858a..d2d0bf7bac 100644 --- a/setup.py +++ b/setup.py @@ -60,6 +60,7 @@ def get_file_text(file_name): "flask": ["flask>=0.11", "blinker>=1.1", "markupsafe"], "grpcio": ["grpcio>=1.21.1", "protobuf>=3.8.0"], "http2": ["httpcore[http2]==1.*"], + "asyncio": ["httpcore[asyncio]==1.*"], "httpx": ["httpx>=0.16.0"], "huey": ["huey>=2"], "huggingface_hub": ["huggingface_hub>=0.22"], From 4744817c49d003ece08f2267d8fb0a9cab570d01 Mon Sep 17 00:00:00 2001 From: srothh Date: Wed, 13 Aug 2025 16:06:44 +0200 Subject: [PATCH 42/42] ref(transport): Remove redundant background task set GH-4582 --- sentry_sdk/transport.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index bf13119cff..eec4025048 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -594,7 +594,6 @@ def __init__(self: Self, options: Dict[str, Any]) -> None: super().__init__(options) # Requires event loop at init time self.loop = asyncio.get_running_loop() - self.background_tasks: set[asyncio.Task[None]] = set() def _create_worker(self: Self, options: dict[str, Any]) -> Worker: return AsyncWorker(queue_size=options["transport_queue_size"]) @@ -792,9 +791,6 @@ def kill(self: Self) -> Optional[asyncio.Task[None]]: # type: ignore logger.debug("Killing HTTP transport") self._worker.kill() - for task in self.background_tasks: - task.cancel() - self.background_tasks.clear() try: # Return the pool cleanup task so caller can await it if needed return self.loop.create_task(self._pool.aclose()) # type: ignore