Skip to content

Integrate async transport with SDK #4615

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 107 commits into
base: srothh/async-transport
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
107 commits
Select commit Hold shift + click to select a range
63b1f24
ref(transport): Added shared sync/async transport superclass and crea…
srothh Jul 11, 2025
666ff3a
ref(transport) Removed Todo and reverted class name change
srothh Jul 11, 2025
748764e
test(transport): Add test for HTTP error status handling
srothh Jul 11, 2025
ee6dbee
test(transport): Restore accidentally removed comments
srothh Jul 11, 2025
19405fd
ref(transport) Refactor class names to reflect previous functionality
srothh Jul 14, 2025
3736c03
ref(transport): Add flush_async in the Transport abc
srothh Jul 17, 2025
3607d44
ref(transport): Move flush_async from ABC
srothh Jul 17, 2025
0ba5a83
ref(transport): add async type annotations to HTTPTransportCore
srothh Jul 23, 2025
9bb628e
ref(transport): Add abstract base class for worker implementation
srothh Jul 14, 2025
a81487e
ref(transport): Add _create_worker factory method to Transport
srothh Jul 14, 2025
8960e6f
ref(worker): Add flush_async method to Worker ABC
srothh Jul 17, 2025
0f7937b
ref(worker): Move worker flush_async from Worker ABC
srothh Jul 17, 2025
268ea1a
ref(worker): Amend function signature for coroutines
srothh Jul 17, 2025
b3c05cc
feat(transport): Add an async task-based worker for transport
srothh Jul 17, 2025
fb0ad18
ref(worker): Make worker work with new ABC interface
srothh Jul 17, 2025
7edbbaf
fix(worker): Check if callbacks from worker queue are coroutines or f…
srothh Jul 17, 2025
0f63d24
ref(worker): Amend return type of submit and flush to accomodate for …
srothh Jul 17, 2025
2430e2e
ref(worker): Add type parameters for AsyncWorker variables
srothh Jul 17, 2025
96fcd85
ref(worker): Remove loop upon killing worker
srothh Jul 17, 2025
331e40b
feat(worker): Enable concurrent callbacks on async task worker
srothh Jul 18, 2025
5f67485
fix(worker): Modify kill behaviour to mirror threaded worker
srothh Jul 18, 2025
97c5e3d
ref(worker): add proper type annotation to worker task list
srothh Jul 21, 2025
8809b08
feat(transport): Add async transport class
srothh Jul 21, 2025
7c5dec0
ref(transport): Fix event loop handling in async transport
srothh Jul 22, 2025
b0390e6
feat(transport): Add kill method for async transport
srothh Jul 22, 2025
f01b00d
ref(transport): Fix type errors in async transport
srothh Jul 23, 2025
23b8ea2
Add silent failing to kill on event loop errors
srothh Jul 24, 2025
176a1d1
ref(transport): Fix event loop check in make_transport
srothh Jul 24, 2025
4fe61bf
ref(transport): Add missing transport instantiation in non-async context
srothh Jul 24, 2025
52c9e36
ref(transport): Fix httpcore async specific request handling
srothh Jul 25, 2025
6d69406
ref(transport): Add gc safety to async kill
srothh Jul 25, 2025
3629609
ref(transport): Add missing httpcore extensions
srothh Jul 25, 2025
21cde52
fix(transport): Fix fallback sync transport creating async worker
srothh Jul 28, 2025
c541bd7
ref(transport): Make kill optionally return a task for async
srothh Jul 29, 2025
2808062
Integrate AsyncHttpTransport as a new experimental option
srothh Jul 22, 2025
ea5f557
ref(transport): Fix type issues in AsyncTransport
srothh Jul 22, 2025
c61eb02
ref(transport): Add missing async transport loop type annotation
srothh Jul 23, 2025
38baead
fix(client): Fix mypy type errors
srothh Jul 23, 2025
e4ed773
ref(client): Fix mypy inheritance type error
srothh Jul 23, 2025
236ae2c
ref(client): Move mypy annotations to correct place
srothh Jul 23, 2025
9dd546c
ref(client): Add event loop handling to client flush/close
srothh Jul 24, 2025
f4ac157
ref(client): Moved close done callback into async task
srothh Jul 24, 2025
ed392e9
ref(client): Move timeout check in client to properly cover async
srothh Jul 25, 2025
50553d4
feat(asyncio): Add patching for loop.close in asyncio
srothh Jul 25, 2025
4a7b8ce
ref(asyncio): Fix mypy type annotation errors
srothh Jul 25, 2025
cd8a35f
ref(client): Fix redundant async flush helper flushes
srothh Jul 25, 2025
b9f2ec7
ref(client): remove wrongful indent
srothh Jul 25, 2025
98d74ed
test(transport): Add inital transport tests
srothh Jul 28, 2025
9df5ec5
fix(requirements): Fix requirements for async transport testing on py…
srothh Jul 28, 2025
23d8740
fix(dependencies): Remove version constraint from httpcore
srothh Jul 28, 2025
a496787
fix(dependencies): Version guards for correct httpcore version
srothh Jul 28, 2025
32a9abd
ref(tox): remove anyio version pin
srothh Jul 28, 2025
a69f7bb
Revert "ref(tox): remove anyio version pin"
srothh Jul 28, 2025
c80b095
ref(dependencies): Revert dependency changes
srothh Jul 28, 2025
5904968
ref(test): Remove flush from async transport test
srothh Jul 28, 2025
09034b7
ref(client): Remove wrongful indents in client sync flush
srothh Jul 28, 2025
21b1cda
fix(testing): Changed httpx anyio version pin to >=3, <5
srothh Jul 29, 2025
9d0cde4
fix(test): Properly modify httpx anyio pin
srothh Jul 29, 2025
f21e2ea
fix(test): Add fastapi anyio pin for <0.8
srothh Jul 29, 2025
76aae83
feat(test): Add tests for specific async transport functionality
srothh Jul 29, 2025
4c1e99b
feat(test): Add flush to async transport test
srothh Jul 29, 2025
6df7037
ref(client): Adapt client for blocking kill in async transport
srothh Jul 29, 2025
25c04fc
ref(client): Fix transport shutdown if loop is not running
srothh Jul 29, 2025
e8d889c
ref(test): Add config tests for SSL/proxy for async
srothh Jul 29, 2025
a644465
feat(transport): Add async transport class
srothh Jul 21, 2025
c935e9e
ref(transport): Fix event loop handling in async transport
srothh Jul 22, 2025
b90daf4
feat(transport): Add kill method for async transport
srothh Jul 22, 2025
e1d7cdb
ref(transport): Fix type errors in async transport
srothh Jul 23, 2025
90346a5
Add silent failing to kill on event loop errors
srothh Jul 24, 2025
47416f4
ref(transport): Fix event loop check in make_transport
srothh Jul 24, 2025
73cdc6d
ref(transport): Add missing transport instantiation in non-async context
srothh Jul 24, 2025
1ae8708
ref(transport): Fix httpcore async specific request handling
srothh Jul 25, 2025
6f18657
ref(transport): Add gc safety to async kill
srothh Jul 25, 2025
87a9b2f
ref(transport): Add missing httpcore extensions
srothh Jul 25, 2025
4fd7fa0
fix(transport): Fix fallback sync transport creating async worker
srothh Jul 28, 2025
69734cd
ref(transport): Make kill optionally return a task for async
srothh Jul 29, 2025
f5ef707
ref(transport): Adapt transport for synchronous flush interface
srothh Jul 30, 2025
fca8740
ref(transport): Fix mypy error
srothh Jul 30, 2025
4b0d09b
Merge branch 'srothh/async-transport' into srothh/async-transport-int…
srothh Jul 30, 2025
e23efd7
ref(transport): Make client work with sync flush changes
srothh Jul 30, 2025
55b606a
ref(client): Properly add client changes for sync flush
srothh Jul 30, 2025
d89abed
feat(transport): Add async transport class
srothh Jul 21, 2025
5e1e0c6
ref(transport): Fix event loop handling in async transport
srothh Jul 22, 2025
8fdf43d
feat(transport): Add kill method for async transport
srothh Jul 22, 2025
6619670
ref(transport): Fix type errors in async transport
srothh Jul 23, 2025
2eee1b1
Add silent failing to kill on event loop errors
srothh Jul 24, 2025
6c787a4
ref(transport): Fix event loop check in make_transport
srothh Jul 24, 2025
b79d346
ref(transport): Add missing transport instantiation in non-async context
srothh Jul 24, 2025
1717888
ref(transport): Fix httpcore async specific request handling
srothh Jul 25, 2025
e1fd57a
ref(transport): Add gc safety to async kill
srothh Jul 25, 2025
b87c68e
ref(transport): Add missing httpcore extensions
srothh Jul 25, 2025
a827d0d
fix(transport): Fix fallback sync transport creating async worker
srothh Jul 28, 2025
ee0b440
ref(transport): Make kill optionally return a task for async
srothh Jul 29, 2025
70f228e
ref(transport): Adapt transport for synchronous flush interface
srothh Jul 30, 2025
328d8ad
ref(transport): Fix mypy error
srothh Jul 30, 2025
ef61134
ref(transport): Make transport loop public
srothh Jul 30, 2025
ad93516
Merge branch 'srothh/async-transport' into srothh/async-transport-int…
srothh Jul 30, 2025
42d3a34
ref(transport): Add import checking for async transport
srothh Jul 30, 2025
10d85f6
ref(transport): Fix typing errors
srothh Jul 30, 2025
aaae195
ref(transport): Fix import checking
srothh Jul 30, 2025
de47da2
Merge branch 'srothh/async-transport' into srothh/async-transport-int…
srothh Jul 30, 2025
6e2c4f6
ref(client): Fix type checking with fallback asynctransport
srothh Jul 30, 2025
9da7be8
ref(asyncio): Refactor loop close patch in asyncio integration
srothh Jul 31, 2025
8a5ab06
ref(client): Split client flush into seperate function for readability
srothh Jul 31, 2025
295a0e9
ref(transport): Refactor async transport to be more aligned with sync
srothh Jul 31, 2025
e754a85
Merge branch 'srothh/async-transport' into srothh/async-transport-int…
srothh Jul 31, 2025
9f226cf
Merge branch 'srothh/async-transport' into srothh/async-transport-int…
srothh Jul 31, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements-testing.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ asttokens
responses
pysocks
socksio
httpcore[http2]
httpcore[http2,asyncio]
setuptools
freezegun
Brotli
Expand Down
3 changes: 2 additions & 1 deletion scripts/populate_tox/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
"pytest-asyncio",
"python-multipart",
"requests",
"anyio<4",
"anyio>=3,<5",
],
# There's an incompatibility between FastAPI's TestClient, which is
# actually Starlette's TestClient, which is actually httpx's Client.
Expand All @@ -106,6 +106,7 @@
# FastAPI versions we use older httpx which still supports the
# deprecated argument.
"<0.110.1": ["httpx<0.28.0"],
"<0.80": ["anyio<4"],
"py3.6": ["aiocontextvars"],
},
},
Expand Down
2 changes: 1 addition & 1 deletion scripts/populate_tox/tox.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ deps =
httpx-v0.25: pytest-httpx==0.25.0
httpx: pytest-httpx
# anyio is a dep of httpx
httpx: anyio<4.0.0
httpx: anyio>=3,<5
httpx-v0.16: httpx~=0.16.0
httpx-v0.18: httpx~=0.18.0
httpx-v0.20: httpx~=0.20.0
Expand Down
118 changes: 97 additions & 21 deletions sentry_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import uuid
import random
import socket
import asyncio
from collections.abc import Mapping
from datetime import datetime, timezone
from importlib import import_module
Expand All @@ -25,7 +26,7 @@
)
from sentry_sdk.serializer import serialize
from sentry_sdk.tracing import trace
from sentry_sdk.transport import HttpTransportCore, make_transport
from sentry_sdk.transport import HttpTransportCore, make_transport, AsyncHttpTransport
from sentry_sdk.consts import (
SPANDATA,
DEFAULT_MAX_VALUE_LENGTH,
Expand Down Expand Up @@ -917,51 +918,126 @@ def get_integration(

return self.integrations.get(integration_name)

def close(
def _close_transport(self) -> Optional[asyncio.Task[None]]:
"""Close transport and return cleanup task if any."""
if self.transport is not None:
cleanup_task = self.transport.kill() # type: ignore
self.transport = None
return cleanup_task
return None

def _close_components(self) -> None:
"""Kill all client components in the correct order."""
self.session_flusher.kill()
if self.log_batcher is not None:
self.log_batcher.kill()
if self.monitor:
self.monitor.kill()

async def _close_components_async(self) -> None:
"""Async version of _close_components that properly awaits transport cleanup."""
self._close_components()
cleanup_task = self._close_transport()
if cleanup_task is not None:
await cleanup_task

def close( # type: ignore[override]
self,
timeout: Optional[float] = None,
callback: Optional[Callable[[int, float], None]] = None,
) -> None:
) -> Optional[asyncio.Task[None]]:
"""
Close the client and shut down the transport. Arguments have the same
semantics as :py:meth:`Client.flush`.
semantics as :py:meth:`Client.flush`. When using the async transport, close needs to be awaited to block.
"""
if self.transport is not None:
self.flush(timeout=timeout, callback=callback)

self.session_flusher.kill()
async def _flush_and_close(
timeout: Optional[float], callback: Optional[Callable[[int, float], None]]
) -> None:

if self.log_batcher is not None:
self.log_batcher.kill()
await self._flush_async(timeout=timeout, callback=callback)
await self._close_components_async()

if self.monitor:
self.monitor.kill()
if self.transport is not None:
if isinstance(self.transport, AsyncHttpTransport) and hasattr(
self.transport, "loop"
):

self.transport.kill()
self.transport = None
try:
flush_task = self.transport.loop.create_task(
_flush_and_close(timeout, callback)
)
except RuntimeError:
# Shutdown the components anyway
self._close_components()
self._close_transport()
logger.warning("Event loop not running, aborting close.")
return None
# Enforce flush before shutdown
return flush_task
else:
self.flush(timeout=timeout, callback=callback)
self._close_components()
self._close_transport()

return None

def flush(
def flush( # type: ignore[override]
self,
timeout: Optional[float] = None,
callback: Optional[Callable[[int, float], None]] = None,
) -> None:
) -> Optional[asyncio.Task[None]]:
"""
Wait for the current events to be sent.
Wait for the current events to be sent. When using the async transport, flush needs to be awaited to block.

:param timeout: Wait for at most `timeout` seconds. If no `timeout` is provided, the `shutdown_timeout` option value is used.

:param callback: Is invoked with the number of pending events and the configured timeout.
"""
if self.transport is not None:
if timeout is None:
timeout = self.options["shutdown_timeout"]
self.session_flusher.flush()
if isinstance(self.transport, AsyncHttpTransport) and hasattr(
self.transport, "loop"
):
try:
return self.transport.loop.create_task(
self._flush_async(timeout, callback)
)
except RuntimeError:
logger.warning("Event loop not running, aborting flush.")
return None
else:
self._flush_sync(timeout, callback)
return None

if self.log_batcher is not None:
self.log_batcher.flush()
def _flush_sync(
self, timeout: Optional[float], callback: Optional[Callable[[int, float], None]]
) -> None:
"""Synchronous flush implementation."""
if timeout is None:
timeout = self.options["shutdown_timeout"]

self._flush_components()
if self.transport is not None:
self.transport.flush(timeout=timeout, callback=callback)

async def _flush_async(
self, timeout: Optional[float], callback: Optional[Callable[[int, float], None]]
) -> None:
"""Asynchronous flush implementation."""
if timeout is None:
timeout = self.options["shutdown_timeout"]

self._flush_components()
if self.transport is not None:
flush_task = self.transport.flush(timeout=timeout, callback=callback) # type: ignore
if flush_task is not None:
await flush_task

def _flush_components(self) -> None:
self.session_flusher.flush()
if self.log_batcher is not None:
self.log_batcher.flush()

def __enter__(self) -> _Client:
return self

Expand Down
1 change: 1 addition & 0 deletions sentry_sdk/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class CompressionAlgo(Enum):
"transport_compression_algo": Optional[CompressionAlgo],
"transport_num_pools": Optional[int],
"transport_http2": Optional[bool],
"transport_async": Optional[bool],
"enable_logs": Optional[bool],
"before_send_log": Optional[Callable[[Log, Hint], Optional[Log]]],
},
Expand Down
43 changes: 43 additions & 0 deletions sentry_sdk/integrations/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from sentry_sdk.consts import OP
from sentry_sdk.integrations import Integration, DidNotEnable
from sentry_sdk.utils import event_from_exception, logger, reraise
from sentry_sdk.transport import AsyncHttpTransport

try:
import asyncio
Expand All @@ -29,6 +30,47 @@ def get_name(coro: Any) -> str:
)


def patch_loop_close() -> None:
"""Patch loop.close to flush pending events before shutdown."""
# Atexit shutdown hook happens after the event loop is closed.
# Therefore, it is necessary to patch the loop.close method to ensure
# that pending events are flushed before the interpreter shuts down.
try:
loop = asyncio.get_running_loop()
except RuntimeError:
# No running loop → cannot patch now
return

if getattr(loop, "_sentry_flush_patched", False):
return

async def _flush() -> None:
client = sentry_sdk.get_client()
if not client:
return

try:
if not isinstance(client.transport, AsyncHttpTransport):
return

task = client.close() # type: ignore
if task is not None:
await task
except Exception:
logger.warning("Sentry flush failed during loop shutdown", exc_info=True)

orig_close = loop.close

def _patched_close() -> None:
try:
loop.run_until_complete(_flush())
finally:
orig_close()

loop.close = _patched_close # type: ignore
loop._sentry_flush_patched = True # type: ignore


def patch_asyncio() -> None:
orig_task_factory = None
try:
Expand Down Expand Up @@ -124,3 +166,4 @@ class AsyncioIntegration(Integration):
@staticmethod
def setup_once() -> None:
patch_asyncio()
patch_loop_close()
3 changes: 3 additions & 0 deletions sentry_sdk/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import ssl
import time
import asyncio

from datetime import datetime, timedelta, timezone
from collections import defaultdict
from urllib.request import getproxies
Expand All @@ -29,6 +30,8 @@
HTTP2_ENABLED = False

try:
import anyio # noqa: F401

ASYNC_TRANSPORT_ENABLED = httpcore is not None
except ImportError:
ASYNC_TRANSPORT_ENABLED = False
Expand Down
49 changes: 49 additions & 0 deletions tests/integrations/asyncio/test_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,3 +377,52 @@ async def test_span_origin(

assert event["contexts"]["trace"]["origin"] == "manual"
assert event["spans"][0]["origin"] == "auto.function.asyncio"


@minimum_python_38
def test_loop_close_patching(sentry_init):
sentry_init(integrations=[AsyncioIntegration()])

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

try:
with patch("asyncio.get_running_loop", return_value=loop):
assert not hasattr(loop, "_sentry_flush_patched")
AsyncioIntegration.setup_once()
assert hasattr(loop, "_sentry_flush_patched")
assert loop._sentry_flush_patched is True

finally:
if not loop.is_closed():
loop.close()


@minimum_python_38
def test_loop_close_flushes_async_transport(sentry_init):
from sentry_sdk.transport import AsyncHttpTransport
from unittest.mock import Mock, AsyncMock

sentry_init(integrations=[AsyncioIntegration()])

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

try:
with patch("asyncio.get_running_loop", return_value=loop):
AsyncioIntegration.setup_once()

mock_client = Mock()
mock_transport = Mock(spec=AsyncHttpTransport)
mock_client.transport = mock_transport
mock_client.close = AsyncMock(return_value=None)

with patch("sentry_sdk.get_client", return_value=mock_client):
loop.close()

mock_client.close.assert_called_once()
mock_client.close.assert_awaited_once()

except Exception:
if not loop.is_closed():
loop.close()
Loading
Loading