diff --git a/ddtrace/testing/internal/http.py b/ddtrace/testing/internal/http.py index 235fb733787..f1542b0c441 100644 --- a/ddtrace/testing/internal/http.py +++ b/ddtrace/testing/internal/http.py @@ -31,6 +31,7 @@ DEFAULT_TIMEOUT_SECONDS = 15.0 MAX_ATTEMPTS = 5 +MAX_RETRY_AFTER_SECONDS = 120.0 log = logging.getLogger(__name__) @@ -51,6 +52,7 @@ class BackendResult: parsed_response: t.Any = None is_gzip_response: bool = False elapsed_seconds: float = 0.0 + retry_after_seconds: t.Optional[float] = None def on_error_raise_exception(self) -> None: if self.error_type: @@ -64,7 +66,13 @@ class Subdomain(str, Enum): CICOVREPRT = "ci-intake" -RETRIABLE_ERRORS = {ErrorType.TIMEOUT, ErrorType.NETWORK, ErrorType.CODE_5XX, ErrorType.BAD_JSON} +RETRIABLE_ERRORS = { + ErrorType.TIMEOUT, + ErrorType.NETWORK, + ErrorType.CODE_5XX, + ErrorType.BAD_JSON, + ErrorType.RATE_LIMITED, +} class BackendConnectorSetup: @@ -286,6 +294,23 @@ def _do_single_request( result.error_description = f"{result.response.status} {result.response.reason}" if result.response.status >= 500: result.error_type = ErrorType.CODE_5XX + elif result.response.status == 429: + result.error_type = ErrorType.RATE_LIMITED + reset_header = result.response.headers.get("X-RateLimit-Reset") + if reset_header is not None: + try: + reset_value = int(reset_header) + now = int(time.time()) + if reset_value > now: + # Unix timestamp: wait until that point in time + delay = float(reset_value - now) + else: + # Duration in seconds + delay = float(reset_value) + # Clamp to [0, cap]: sleep() rejects negatives; stale timestamps read as huge durations 
+ result.retry_after_seconds = max(0.0, min(delay, MAX_RETRY_AFTER_SECONDS)) + except ValueError: + pass # Fall back to exponential backoff in the retry loop elif result.response.status >= 400: result.error_type = ErrorType.CODE_4XX else: @@ -352,7 +377,10 @@ def request( ) if result.error_type and result.error_type in RETRIABLE_ERRORS and attempts_so_far < max_attempts: - delay_seconds = random.uniform(0, (1.618 ** (attempts_so_far - 1))) # nosec: B311 + if result.retry_after_seconds is not None: + delay_seconds = result.retry_after_seconds + else: + delay_seconds = random.uniform(0, (1.618 ** (attempts_so_far - 1))) # nosec: B311 log.debug( "Retrying %s %s in %.3f seconds (%d attempts so far)", method, path, delay_seconds, attempts_so_far ) diff --git a/ddtrace/testing/internal/telemetry.py b/ddtrace/testing/internal/telemetry.py index c165134da5a..0569bb0a61d 100644 --- a/ddtrace/testing/internal/telemetry.py +++ b/ddtrace/testing/internal/telemetry.py @@ -28,6 +28,7 @@ class ErrorType(str, Enum): TIMEOUT = "timeout" NETWORK = "network" CODE_4XX = "status_code_4xx_response" + RATE_LIMITED = "rate_limited" CODE_5XX = "status_code_5xx_response" BAD_JSON = "bad_json" UNKNOWN = "unknown" @@ -206,7 +207,7 @@ def record_event_payload_error(self, endpoint: str, error: ErrorType) -> None: # `endpoint_payload.requests_errors` accepts a different set of error types, so we need to convert them here. 
if error == ErrorType.TIMEOUT: endpoint_error = "timeout" - elif error in (ErrorType.CODE_4XX, ErrorType.CODE_5XX): + elif error in (ErrorType.CODE_4XX, ErrorType.RATE_LIMITED, ErrorType.CODE_5XX): endpoint_error = "status_code" else: endpoint_error = "network" @@ -313,4 +314,6 @@ def record_request( self.record_error(error) def record_error(self, error: ErrorType) -> None: - self.telemetry_api.add_count_metric(self.error, 1, {"error_type": error}) + # Map RATE_LIMITED to the same telemetry value as CODE_4XX for cross-language consistency + error_type = ErrorType.CODE_4XX if error == ErrorType.RATE_LIMITED else error + self.telemetry_api.add_count_metric(self.error, 1, {"error_type": error_type}) diff --git a/releasenotes/notes/ci-visibility-handle-rate-limiting-d7df3d047661bbd9.yaml b/releasenotes/notes/ci-visibility-handle-rate-limiting-d7df3d047661bbd9.yaml new file mode 100644 index 00000000000..24f061c83b7 --- /dev/null +++ b/releasenotes/notes/ci-visibility-handle-rate-limiting-d7df3d047661bbd9.yaml @@ -0,0 +1,7 @@ +--- +fixes: + - | + CI Visibility: Fixes an issue where HTTP 429 (Too Many Requests) responses from the Datadog + backend were treated as non-retriable errors, causing CI visibility data to be dropped when + the backend applied rate limiting. The backend connector now retries on 429 responses and + respects the ``X-RateLimit-Reset`` header when present to determine the retry delay. 
diff --git a/tests/testing/internal/test_http.py b/tests/testing/internal/test_http.py index 5bba6aad287..36388eb1d9e 100644 --- a/tests/testing/internal/test_http.py +++ b/tests/testing/internal/test_http.py @@ -10,6 +10,7 @@ from ddtrace.testing.internal.errors import SetupError from ddtrace.testing.internal.http import DEFAULT_TIMEOUT_SECONDS +from ddtrace.testing.internal.http import MAX_RETRY_AFTER_SECONDS from ddtrace.testing.internal.http import BackendConnector from ddtrace.testing.internal.http import BackendConnectorAgentlessSetup from ddtrace.testing.internal.http import BackendConnectorEVPProxySetup @@ -376,6 +377,238 @@ def test_post_json_unknown_error(self, mock_time: Mock, mock_sleep: Mock, mock_h call(seconds=0.0, response_bytes=None, compressed_response=False, error=ErrorType.UNKNOWN), ] + @patch("http.client.HTTPSConnection") + @patch("time.sleep") + @patch("time.perf_counter", return_value=0.0) + def test_post_json_rate_limited_retry_then_ok( + self, mock_time: Mock, mock_sleep: Mock, mock_https_connection: Mock + ) -> None: + mock_response_429 = Mock() + mock_response_429.headers = {} + mock_response_429.read.return_value = b"Rate limited" + mock_response_429.status = 429 + mock_response_429.reason = "Too Many Requests" + + mock_response_ok = Mock() + mock_response_ok.headers = {"Content-Length": 14} + mock_response_ok.read.return_value = b'{"answer": 42}' + mock_response_ok.status = 200 + + mock_conn = Mock() + mock_conn.getresponse.side_effect = [mock_response_429, mock_response_ok] + mock_https_connection.return_value = mock_conn + + mock_telemetry = Mock() + + connector = BackendConnector(url="https://api.example.com") + result = connector.post_json("/v1/some-endpoint", data={"question": 1}, telemetry=mock_telemetry) + + assert mock_conn.request.call_args_list == [ + call("POST", "/v1/some-endpoint", body=b'{"question": 1}', headers={"Content-Type": "application/json"}), + call("POST", "/v1/some-endpoint", body=b'{"question": 1}', 
headers={"Content-Type": "application/json"}), + ] + assert len(mock_sleep.call_args_list) == 1 + + assert result.error_type is None + assert result.parsed_response == {"answer": 42} + + assert mock_telemetry.record_request.call_args_list == [ + call(seconds=0.0, response_bytes=0, compressed_response=False, error=ErrorType.RATE_LIMITED), + call(seconds=0.0, response_bytes=14, compressed_response=False, error=None), + ] + + @patch("http.client.HTTPSConnection") + @patch("time.sleep") + @patch("time.perf_counter", return_value=0.0) + def test_post_json_rate_limited_retry_limit( + self, mock_time: Mock, mock_sleep: Mock, mock_https_connection: Mock + ) -> None: + mock_response_429 = Mock() + mock_response_429.headers = {} + mock_response_429.read.return_value = b"Rate limited" + mock_response_429.status = 429 + mock_response_429.reason = "Too Many Requests" + + mock_conn = Mock() + mock_conn.getresponse.return_value = mock_response_429 + mock_https_connection.return_value = mock_conn + + mock_telemetry = Mock() + + connector = BackendConnector(url="https://api.example.com") + result = connector.post_json("/v1/some-endpoint", data={"question": 1}, telemetry=mock_telemetry) + + assert mock_conn.request.call_args_list == [ + call("POST", "/v1/some-endpoint", body=b'{"question": 1}', headers={"Content-Type": "application/json"}), + call("POST", "/v1/some-endpoint", body=b'{"question": 1}', headers={"Content-Type": "application/json"}), + call("POST", "/v1/some-endpoint", body=b'{"question": 1}', headers={"Content-Type": "application/json"}), + call("POST", "/v1/some-endpoint", body=b'{"question": 1}', headers={"Content-Type": "application/json"}), + call("POST", "/v1/some-endpoint", body=b'{"question": 1}', headers={"Content-Type": "application/json"}), + ] + assert len(mock_sleep.call_args_list) == 4 + + assert result.error_type is ErrorType.RATE_LIMITED + assert result.error_description == "429 Too Many Requests" + + assert mock_telemetry.record_request.call_args_list 
== [ + call(seconds=0.0, response_bytes=0, compressed_response=False, error=ErrorType.RATE_LIMITED), + call(seconds=0.0, response_bytes=0, compressed_response=False, error=ErrorType.RATE_LIMITED), + call(seconds=0.0, response_bytes=0, compressed_response=False, error=ErrorType.RATE_LIMITED), + call(seconds=0.0, response_bytes=0, compressed_response=False, error=ErrorType.RATE_LIMITED), + call(seconds=0.0, response_bytes=0, compressed_response=False, error=ErrorType.RATE_LIMITED), + ] + + @patch("http.client.HTTPSConnection") + @patch("time.sleep") + @patch("time.time", return_value=1700000000) + @patch("time.perf_counter", return_value=0.0) + def test_post_json_rate_limited_uses_header_unix_timestamp( + self, mock_perf: Mock, mock_time: Mock, mock_sleep: Mock, mock_https_connection: Mock + ) -> None: + """When X-RateLimit-Reset is a future Unix timestamp, sleep until that point.""" + reset_timestamp = 1700000000 + 60 # 60 seconds in the future + + mock_response_429 = Mock() + mock_response_429.headers = {"X-RateLimit-Reset": str(reset_timestamp)} + mock_response_429.read.return_value = b"Rate limited" + mock_response_429.status = 429 + mock_response_429.reason = "Too Many Requests" + + mock_response_ok = Mock() + mock_response_ok.headers = {"Content-Length": 14} + mock_response_ok.read.return_value = b'{"answer": 42}' + mock_response_ok.status = 200 + + mock_conn = Mock() + mock_conn.getresponse.side_effect = [mock_response_429, mock_response_ok] + mock_https_connection.return_value = mock_conn + + connector = BackendConnector(url="https://api.example.com") + result = connector.post_json("/v1/some-endpoint", data={"question": 1}, telemetry=Mock()) + + assert result.error_type is None + mock_sleep.assert_called_once_with(60.0) + + @patch("http.client.HTTPSConnection") + @patch("time.sleep") + @patch("time.time", return_value=1700000000) + @patch("time.perf_counter", return_value=0.0) + def test_post_json_rate_limited_uses_header_duration( + self, mock_perf: Mock, 
mock_time: Mock, mock_sleep: Mock, mock_https_connection: Mock + ) -> None: + """When X-RateLimit-Reset is a small value (≤ current time), treat it as a duration in seconds.""" + mock_response_429 = Mock() + mock_response_429.headers = {"X-RateLimit-Reset": "30"} + mock_response_429.read.return_value = b"Rate limited" + mock_response_429.status = 429 + mock_response_429.reason = "Too Many Requests" + + mock_response_ok = Mock() + mock_response_ok.headers = {"Content-Length": 14} + mock_response_ok.read.return_value = b'{"answer": 42}' + mock_response_ok.status = 200 + + mock_conn = Mock() + mock_conn.getresponse.side_effect = [mock_response_429, mock_response_ok] + mock_https_connection.return_value = mock_conn + + connector = BackendConnector(url="https://api.example.com") + result = connector.post_json("/v1/some-endpoint", data={"question": 1}, telemetry=Mock()) + + assert result.error_type is None + mock_sleep.assert_called_once_with(30.0) + + @patch("http.client.HTTPSConnection") + @patch("time.sleep") + @patch("time.time", return_value=1700000000) + @patch("time.perf_counter", return_value=0.0) + def test_post_json_rate_limited_caps_retry_delay( + self, mock_perf: Mock, mock_time: Mock, mock_sleep: Mock, mock_https_connection: Mock + ) -> None: + """Retry delay is capped at 120 seconds to avoid unreasonable waits.""" + reset_timestamp = 1700000000 + 600 # 600 seconds in the future, exceeds 120s cap + + mock_response_429 = Mock() + mock_response_429.headers = {"X-RateLimit-Reset": str(reset_timestamp)} + mock_response_429.read.return_value = b"Rate limited" + mock_response_429.status = 429 + mock_response_429.reason = "Too Many Requests" + + mock_response_ok = Mock() + mock_response_ok.headers = {"Content-Length": 14} + mock_response_ok.read.return_value = b'{"answer": 42}' + mock_response_ok.status = 200 + + mock_conn = Mock() + mock_conn.getresponse.side_effect = [mock_response_429, mock_response_ok] + mock_https_connection.return_value = mock_conn + + 
connector = BackendConnector(url="https://api.example.com") + result = connector.post_json("/v1/some-endpoint", data={"question": 1}, telemetry=Mock()) + + assert result.error_type is None + mock_sleep.assert_called_once_with(MAX_RETRY_AFTER_SECONDS) + + @patch("http.client.HTTPSConnection") + @patch("random.uniform", return_value=0.5) + @patch("time.sleep") + @patch("time.perf_counter", return_value=0.0) + def test_post_json_rate_limited_falls_back_to_exponential_backoff_without_header( + self, mock_perf: Mock, mock_sleep: Mock, mock_uniform: Mock, mock_https_connection: Mock + ) -> None: + """When no X-RateLimit-Reset header is present, exponential backoff is used.""" + mock_response_429 = Mock() + mock_response_429.headers = {} + mock_response_429.read.return_value = b"Rate limited" + mock_response_429.status = 429 + mock_response_429.reason = "Too Many Requests" + + mock_response_ok = Mock() + mock_response_ok.headers = {"Content-Length": 14} + mock_response_ok.read.return_value = b'{"answer": 42}' + mock_response_ok.status = 200 + + mock_conn = Mock() + mock_conn.getresponse.side_effect = [mock_response_429, mock_response_ok] + mock_https_connection.return_value = mock_conn + + connector = BackendConnector(url="https://api.example.com") + result = connector.post_json("/v1/some-endpoint", data={"question": 1}, telemetry=Mock()) + + assert result.error_type is None + mock_uniform.assert_called_once() + mock_sleep.assert_called_once_with(0.5) + + @patch("http.client.HTTPSConnection") + @patch("random.uniform", return_value=0.5) + @patch("time.sleep") + @patch("time.perf_counter", return_value=0.0) + def test_post_json_rate_limited_falls_back_to_exponential_backoff_with_invalid_header( + self, mock_perf: Mock, mock_sleep: Mock, mock_uniform: Mock, mock_https_connection: Mock + ) -> None: + """When X-RateLimit-Reset header is non-numeric, exponential backoff is used.""" + mock_response_429 = Mock() + mock_response_429.headers = {"X-RateLimit-Reset": "not-a-number"} 
+ mock_response_429.read.return_value = b"Rate limited" + mock_response_429.status = 429 + mock_response_429.reason = "Too Many Requests" + + mock_response_ok = Mock() + mock_response_ok.headers = {"Content-Length": 14} + mock_response_ok.read.return_value = b'{"answer": 42}' + mock_response_ok.status = 200 + + mock_conn = Mock() + mock_conn.getresponse.side_effect = [mock_response_429, mock_response_ok] + mock_https_connection.return_value = mock_conn + + connector = BackendConnector(url="https://api.example.com") + result = connector.post_json("/v1/some-endpoint", data={"question": 1}, telemetry=Mock()) + + assert result.error_type is None + mock_uniform.assert_called_once() + mock_sleep.assert_called_once_with(0.5) + @patch("http.client.HTTPSConnection") @patch("uuid.uuid4") def test_post_files_multiple_files(self, mock_uuid: Mock, mock_https_connection: Mock) -> None: diff --git a/tests/testing/internal/test_telemetry.py b/tests/testing/internal/test_telemetry.py index 1986b288162..788d117193f 100644 --- a/tests/testing/internal/test_telemetry.py +++ b/tests/testing/internal/test_telemetry.py @@ -26,9 +26,9 @@ def telemetry_api() -> t.Generator[TelemetryAPI, None, None]: api = TelemetryAPI(connector_setup=Mock()) mock_writer = Mock() - api.writer = mock_writer + api.writer = mock_writer # type: ignore[assignment] - yield api + yield api # type: ignore[misc] class TestTelemetry: @@ -91,6 +91,32 @@ def test_record_request_without_response_bytes(self, telemetry_api: TelemetryAPI call(CIVISIBILITY, "known_tests.request_ms", 1.41, ()), ] + def test_record_request_rate_limited_maps_to_4xx(self, telemetry_api: TelemetryAPI) -> None: + """RATE_LIMITED is emitted as status_code_4xx_response for cross-language consistency.""" + request_telemetry = telemetry_api.with_request_metric_names( + count="known_tests.request", + duration="known_tests.request_ms", + response_bytes="known_tests.response_bytes", + error="known_tests.request_errors", + ) + + 
request_telemetry.record_request( + seconds=1.41, + response_bytes=42, + compressed_response=False, + error=ErrorType.RATE_LIMITED, + ) + + assert telemetry_api.writer.add_count_metric.call_args_list == [ + call(CIVISIBILITY, "known_tests.request", 1, ()), + call( + CIVISIBILITY, + "known_tests.request_errors", + 1, + (("error_type", ErrorType.CODE_4XX.value),), + ), + ] + def test_record_request_without_error(self, telemetry_api: TelemetryAPI) -> None: request_telemetry = telemetry_api.with_request_metric_names( count="known_tests.request", @@ -315,6 +341,7 @@ def test_record_event_payload_ok(self, telemetry_api: TelemetryAPI) -> None: (ErrorType.NETWORK, "network"), (ErrorType.CODE_4XX, "status_code"), (ErrorType.CODE_5XX, "status_code"), + (ErrorType.RATE_LIMITED, "status_code"), (ErrorType.BAD_JSON, "network"), (ErrorType.UNKNOWN, "network"), ],