Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions ddtrace/testing/internal/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

DEFAULT_TIMEOUT_SECONDS = 15.0
MAX_ATTEMPTS = 5
MAX_RETRY_AFTER_SECONDS = 120.0

log = logging.getLogger(__name__)

Expand All @@ -51,6 +52,7 @@ class BackendResult:
parsed_response: t.Any = None
is_gzip_response: bool = False
elapsed_seconds: float = 0.0
retry_after_seconds: t.Optional[float] = None

def on_error_raise_exception(self) -> None:
if self.error_type:
Expand All @@ -64,7 +66,13 @@ class Subdomain(str, Enum):
CICOVREPRT = "ci-intake"


RETRIABLE_ERRORS = {ErrorType.TIMEOUT, ErrorType.NETWORK, ErrorType.CODE_5XX, ErrorType.BAD_JSON}
RETRIABLE_ERRORS = {
ErrorType.TIMEOUT,
ErrorType.NETWORK,
ErrorType.CODE_5XX,
ErrorType.BAD_JSON,
ErrorType.RATE_LIMITED,
}


class BackendConnectorSetup:
Expand Down Expand Up @@ -286,6 +294,23 @@ def _do_single_request(
result.error_description = f"{result.response.status} {result.response.reason}"
if result.response.status >= 500:
result.error_type = ErrorType.CODE_5XX
elif result.response.status == 429:
result.error_type = ErrorType.RATE_LIMITED
reset_header = result.response.headers.get("X-RateLimit-Reset")
if reset_header is not None:
try:
reset_value = int(reset_header)
now = int(time.time())
if reset_value > now:
# Unix timestamp: wait until that point in time
delay = float(reset_value - now)
else:
# Duration in seconds
delay = float(reset_value)
# Cap to avoid unreasonable waits (e.g. expired timestamp misread as duration)
result.retry_after_seconds = min(delay, MAX_RETRY_AFTER_SECONDS)
except ValueError:
pass # Fall back to exponential backoff in the retry loop
elif result.response.status >= 400:
result.error_type = ErrorType.CODE_4XX
else:
Expand Down Expand Up @@ -352,7 +377,10 @@ def request(
)

if result.error_type and result.error_type in RETRIABLE_ERRORS and attempts_so_far < max_attempts:
delay_seconds = random.uniform(0, (1.618 ** (attempts_so_far - 1))) # nosec: B311
if result.retry_after_seconds is not None:
delay_seconds = result.retry_after_seconds
else:
delay_seconds = random.uniform(0, (1.618 ** (attempts_so_far - 1))) # nosec: B311
log.debug(
"Retrying %s %s in %.3f seconds (%d attempts so far)", method, path, delay_seconds, attempts_so_far
)
Expand Down
7 changes: 5 additions & 2 deletions ddtrace/testing/internal/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class ErrorType(str, Enum):
TIMEOUT = "timeout"
NETWORK = "network"
CODE_4XX = "status_code_4xx_response"
RATE_LIMITED = "rate_limited"
CODE_5XX = "status_code_5xx_response"
BAD_JSON = "bad_json"
UNKNOWN = "unknown"
Expand Down Expand Up @@ -206,7 +207,7 @@ def record_event_payload_error(self, endpoint: str, error: ErrorType) -> None:
# `endpoint_payload.requests_errors` accepts a different set of error types, so we need to convert them here.
if error == ErrorType.TIMEOUT:
endpoint_error = "timeout"
elif error in (ErrorType.CODE_4XX, ErrorType.CODE_5XX):
elif error in (ErrorType.CODE_4XX, ErrorType.RATE_LIMITED, ErrorType.CODE_5XX):
endpoint_error = "status_code"
else:
endpoint_error = "network"
Expand Down Expand Up @@ -313,4 +314,6 @@ def record_request(
self.record_error(error)

def record_error(self, error: ErrorType) -> None:
self.telemetry_api.add_count_metric(self.error, 1, {"error_type": error})
# Map RATE_LIMITED to the same telemetry value as CODE_4XX for cross-language consistency
error_type = ErrorType.CODE_4XX if error == ErrorType.RATE_LIMITED else error
self.telemetry_api.add_count_metric(self.error, 1, {"error_type": error_type})
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
fixes:
- |
CI Visibility: Fixes an issue where HTTP 429 (Too Many Requests) responses from the Datadog
backend were treated as non-retriable errors, causing CI visibility data to be dropped when
the backend applied rate limiting. The backend connector now retries on 429 responses and
respects the ``X-RateLimit-Reset`` header when present to determine the retry delay.
233 changes: 233 additions & 0 deletions tests/testing/internal/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from ddtrace.testing.internal.errors import SetupError
from ddtrace.testing.internal.http import DEFAULT_TIMEOUT_SECONDS
from ddtrace.testing.internal.http import MAX_RETRY_AFTER_SECONDS
from ddtrace.testing.internal.http import BackendConnector
from ddtrace.testing.internal.http import BackendConnectorAgentlessSetup
from ddtrace.testing.internal.http import BackendConnectorEVPProxySetup
Expand Down Expand Up @@ -376,6 +377,238 @@ def test_post_json_unknown_error(self, mock_time: Mock, mock_sleep: Mock, mock_h
call(seconds=0.0, response_bytes=None, compressed_response=False, error=ErrorType.UNKNOWN),
]

@patch("http.client.HTTPSConnection")
@patch("time.sleep")
@patch("time.perf_counter", return_value=0.0)
def test_post_json_rate_limited_retry_then_ok(
    self, mock_time: Mock, mock_sleep: Mock, mock_https_connection: Mock
) -> None:
    """A single 429 answer is retried once; the follow-up 200 completes the request."""
    # First response from the backend: rate-limited, no reset header.
    rate_limited = Mock()
    rate_limited.headers = {}
    rate_limited.read.return_value = b"Rate limited"
    rate_limited.status = 429
    rate_limited.reason = "Too Many Requests"

    # Second response: a successful JSON body.
    success = Mock()
    success.headers = {"Content-Length": 14}
    success.read.return_value = b'{"answer": 42}'
    success.status = 200

    connection = Mock()
    connection.getresponse.side_effect = [rate_limited, success]
    mock_https_connection.return_value = connection

    telemetry = Mock()

    connector = BackendConnector(url="https://api.example.com")
    result = connector.post_json("/v1/some-endpoint", data={"question": 1}, telemetry=telemetry)

    expected_request = call(
        "POST", "/v1/some-endpoint", body=b'{"question": 1}', headers={"Content-Type": "application/json"}
    )
    # The request is issued twice: the original attempt plus one retry, with one sleep in between.
    assert connection.request.call_args_list == [expected_request] * 2
    assert len(mock_sleep.call_args_list) == 1

    assert result.error_type is None
    assert result.parsed_response == {"answer": 42}

    assert telemetry.record_request.call_args_list == [
        call(seconds=0.0, response_bytes=0, compressed_response=False, error=ErrorType.RATE_LIMITED),
        call(seconds=0.0, response_bytes=14, compressed_response=False, error=None),
    ]

@patch("http.client.HTTPSConnection")
@patch("time.sleep")
@patch("time.perf_counter", return_value=0.0)
def test_post_json_rate_limited_retry_limit(
    self, mock_time: Mock, mock_sleep: Mock, mock_https_connection: Mock
) -> None:
    """Persistent 429 responses stop after the maximum number of attempts and surface the error."""
    # Every attempt is answered with the same rate-limited response.
    rate_limited = Mock()
    rate_limited.headers = {}
    rate_limited.read.return_value = b"Rate limited"
    rate_limited.status = 429
    rate_limited.reason = "Too Many Requests"

    connection = Mock()
    connection.getresponse.return_value = rate_limited
    mock_https_connection.return_value = connection

    telemetry = Mock()

    connector = BackendConnector(url="https://api.example.com")
    result = connector.post_json("/v1/some-endpoint", data={"question": 1}, telemetry=telemetry)

    expected_request = call(
        "POST", "/v1/some-endpoint", body=b'{"question": 1}', headers={"Content-Type": "application/json"}
    )
    # Five attempts in total; a sleep separates each pair of attempts (hence four sleeps).
    assert connection.request.call_args_list == [expected_request] * 5
    assert len(mock_sleep.call_args_list) == 4

    assert result.error_type is ErrorType.RATE_LIMITED
    assert result.error_description == "429 Too Many Requests"

    expected_record = call(seconds=0.0, response_bytes=0, compressed_response=False, error=ErrorType.RATE_LIMITED)
    assert telemetry.record_request.call_args_list == [expected_record] * 5

@patch("http.client.HTTPSConnection")
@patch("time.sleep")
@patch("time.time", return_value=1700000000)
@patch("time.perf_counter", return_value=0.0)
def test_post_json_rate_limited_uses_header_unix_timestamp(
    self, mock_perf: Mock, mock_time: Mock, mock_sleep: Mock, mock_https_connection: Mock
) -> None:
    """When X-RateLimit-Reset is a future Unix timestamp, sleep until that point."""
    now = 1700000000  # matches the patched time.time()
    reset_at = now + 60  # 60 seconds in the future

    rate_limited = Mock()
    rate_limited.headers = {"X-RateLimit-Reset": str(reset_at)}
    rate_limited.read.return_value = b"Rate limited"
    rate_limited.status = 429
    rate_limited.reason = "Too Many Requests"

    success = Mock()
    success.headers = {"Content-Length": 14}
    success.read.return_value = b'{"answer": 42}'
    success.status = 200

    connection = Mock()
    connection.getresponse.side_effect = [rate_limited, success]
    mock_https_connection.return_value = connection

    connector = BackendConnector(url="https://api.example.com")
    result = connector.post_json("/v1/some-endpoint", data={"question": 1}, telemetry=Mock())

    assert result.error_type is None
    # The delay is the difference between the reset timestamp and "now".
    mock_sleep.assert_called_once_with(60.0)

@patch("http.client.HTTPSConnection")
@patch("time.sleep")
@patch("time.time", return_value=1700000000)
@patch("time.perf_counter", return_value=0.0)
def test_post_json_rate_limited_uses_header_duration(
    self, mock_perf: Mock, mock_time: Mock, mock_sleep: Mock, mock_https_connection: Mock
) -> None:
    """When X-RateLimit-Reset is a small value (≤ current time), treat it as a duration in seconds."""
    rate_limited = Mock()
    rate_limited.headers = {"X-RateLimit-Reset": "30"}
    rate_limited.read.return_value = b"Rate limited"
    rate_limited.status = 429
    rate_limited.reason = "Too Many Requests"

    success = Mock()
    success.headers = {"Content-Length": 14}
    success.read.return_value = b'{"answer": 42}'
    success.status = 200

    connection = Mock()
    connection.getresponse.side_effect = [rate_limited, success]
    mock_https_connection.return_value = connection

    connector = BackendConnector(url="https://api.example.com")
    result = connector.post_json("/v1/some-endpoint", data={"question": 1}, telemetry=Mock())

    assert result.error_type is None
    # "30" is far below the patched current time, so it is interpreted as a 30-second wait.
    mock_sleep.assert_called_once_with(30.0)

@patch("http.client.HTTPSConnection")
@patch("time.sleep")
@patch("time.time", return_value=1700000000)
@patch("time.perf_counter", return_value=0.0)
def test_post_json_rate_limited_caps_retry_delay(
    self, mock_perf: Mock, mock_time: Mock, mock_sleep: Mock, mock_https_connection: Mock
) -> None:
    """Retry delay is capped at 120 seconds to avoid unreasonable waits."""
    now = 1700000000  # matches the patched time.time()
    reset_at = now + 600  # 600 seconds out — well past the 120s cap

    rate_limited = Mock()
    rate_limited.headers = {"X-RateLimit-Reset": str(reset_at)}
    rate_limited.read.return_value = b"Rate limited"
    rate_limited.status = 429
    rate_limited.reason = "Too Many Requests"

    success = Mock()
    success.headers = {"Content-Length": 14}
    success.read.return_value = b'{"answer": 42}'
    success.status = 200

    connection = Mock()
    connection.getresponse.side_effect = [rate_limited, success]
    mock_https_connection.return_value = connection

    connector = BackendConnector(url="https://api.example.com")
    result = connector.post_json("/v1/some-endpoint", data={"question": 1}, telemetry=Mock())

    assert result.error_type is None
    # The header asked for 600s, but the connector sleeps at most MAX_RETRY_AFTER_SECONDS.
    mock_sleep.assert_called_once_with(MAX_RETRY_AFTER_SECONDS)

@patch("http.client.HTTPSConnection")
@patch("random.uniform", return_value=0.5)
@patch("time.sleep")
@patch("time.perf_counter", return_value=0.0)
def test_post_json_rate_limited_falls_back_to_exponential_backoff_without_header(
    self, mock_perf: Mock, mock_sleep: Mock, mock_uniform: Mock, mock_https_connection: Mock
) -> None:
    """When no X-RateLimit-Reset header is present, exponential backoff is used."""
    # 429 with an empty header set — no retry-delay hint from the backend.
    rate_limited = Mock()
    rate_limited.headers = {}
    rate_limited.read.return_value = b"Rate limited"
    rate_limited.status = 429
    rate_limited.reason = "Too Many Requests"

    success = Mock()
    success.headers = {"Content-Length": 14}
    success.read.return_value = b'{"answer": 42}'
    success.status = 200

    connection = Mock()
    connection.getresponse.side_effect = [rate_limited, success]
    mock_https_connection.return_value = connection

    connector = BackendConnector(url="https://api.example.com")
    result = connector.post_json("/v1/some-endpoint", data={"question": 1}, telemetry=Mock())

    assert result.error_type is None
    # The jittered backoff (patched random.uniform) supplies the delay.
    mock_uniform.assert_called_once()
    mock_sleep.assert_called_once_with(0.5)

@patch("http.client.HTTPSConnection")
@patch("random.uniform", return_value=0.5)
@patch("time.sleep")
@patch("time.perf_counter", return_value=0.0)
def test_post_json_rate_limited_falls_back_to_exponential_backoff_with_invalid_header(
    self, mock_perf: Mock, mock_sleep: Mock, mock_uniform: Mock, mock_https_connection: Mock
) -> None:
    """When X-RateLimit-Reset header is non-numeric, exponential backoff is used."""
    # The reset header cannot be parsed as an integer, so it must be ignored.
    rate_limited = Mock()
    rate_limited.headers = {"X-RateLimit-Reset": "not-a-number"}
    rate_limited.read.return_value = b"Rate limited"
    rate_limited.status = 429
    rate_limited.reason = "Too Many Requests"

    success = Mock()
    success.headers = {"Content-Length": 14}
    success.read.return_value = b'{"answer": 42}'
    success.status = 200

    connection = Mock()
    connection.getresponse.side_effect = [rate_limited, success]
    mock_https_connection.return_value = connection

    connector = BackendConnector(url="https://api.example.com")
    result = connector.post_json("/v1/some-endpoint", data={"question": 1}, telemetry=Mock())

    assert result.error_type is None
    # The jittered backoff (patched random.uniform) supplies the delay instead.
    mock_uniform.assert_called_once()
    mock_sleep.assert_called_once_with(0.5)

@patch("http.client.HTTPSConnection")
@patch("uuid.uuid4")
def test_post_files_multiple_files(self, mock_uuid: Mock, mock_https_connection: Mock) -> None:
Expand Down
31 changes: 29 additions & 2 deletions tests/testing/internal/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ def telemetry_api() -> t.Generator[TelemetryAPI, None, None]:
api = TelemetryAPI(connector_setup=Mock())

mock_writer = Mock()
api.writer = mock_writer
api.writer = mock_writer # type: ignore[assignment]

yield api
yield api # type: ignore[misc]


class TestTelemetry:
Expand Down Expand Up @@ -91,6 +91,32 @@ def test_record_request_without_response_bytes(self, telemetry_api: TelemetryAPI
call(CIVISIBILITY, "known_tests.request_ms", 1.41, ()),
]

def test_record_request_rate_limited_maps_to_4xx(self, telemetry_api: TelemetryAPI) -> None:
    """RATE_LIMITED is emitted as status_code_4xx_response for cross-language consistency."""
    metrics = telemetry_api.with_request_metric_names(
        count="known_tests.request",
        duration="known_tests.request_ms",
        response_bytes="known_tests.response_bytes",
        error="known_tests.request_errors",
    )

    metrics.record_request(
        seconds=1.41,
        response_bytes=42,
        compressed_response=False,
        error=ErrorType.RATE_LIMITED,
    )

    # The error metric carries the CODE_4XX tag even though the recorded error was RATE_LIMITED.
    expected_error_tags = (("error_type", ErrorType.CODE_4XX.value),)
    assert telemetry_api.writer.add_count_metric.call_args_list == [
        call(CIVISIBILITY, "known_tests.request", 1, ()),
        call(CIVISIBILITY, "known_tests.request_errors", 1, expected_error_tags),
    ]

def test_record_request_without_error(self, telemetry_api: TelemetryAPI) -> None:
request_telemetry = telemetry_api.with_request_metric_names(
count="known_tests.request",
Expand Down Expand Up @@ -315,6 +341,7 @@ def test_record_event_payload_ok(self, telemetry_api: TelemetryAPI) -> None:
(ErrorType.NETWORK, "network"),
(ErrorType.CODE_4XX, "status_code"),
(ErrorType.CODE_5XX, "status_code"),
(ErrorType.RATE_LIMITED, "status_code"),
(ErrorType.BAD_JSON, "network"),
(ErrorType.UNKNOWN, "network"),
],
Expand Down
Loading