diff --git a/ddtrace/internal/ci_visibility/encoder.py b/ddtrace/internal/ci_visibility/encoder.py index 255ce3cee2d..8890a61c0e4 100644 --- a/ddtrace/internal/ci_visibility/encoder.py +++ b/ddtrace/internal/ci_visibility/encoder.py @@ -32,6 +32,7 @@ from typing import Dict # noqa:F401 from typing import List # noqa:F401 from typing import Optional # noqa:F401 + from typing import Tuple # noqa:F401 from ddtrace._trace.span import Span # noqa:F401 @@ -42,13 +43,15 @@ class CIVisibilityEncoderV01(BufferedEncoder): TEST_SUITE_EVENT_VERSION = 1 TEST_EVENT_VERSION = 2 ENDPOINT_TYPE = ENDPOINT.TEST_CYCLE + _MAX_PAYLOAD_SIZE = 5 * 1024 * 1024 # 5MB def __init__(self, *args): # DEV: args are not used here, but are used by BufferedEncoder's __cinit__() method, # which is called implicitly by Cython. super(CIVisibilityEncoderV01, self).__init__() + self._metadata = {} # type: Dict[str, Dict[str, str]] self._lock = threading.RLock() - self._metadata = {} + self._is_not_xdist_worker = os.getenv("PYTEST_XDIST_WORKER") is None self._init_buffer() def __len__(self): @@ -68,16 +71,18 @@ def put(self, spans): self.buffer.append(spans) def encode_traces(self, traces): - return self._build_payload(traces=traces) + return self._build_payload(traces=traces)[0] def encode(self): with self._lock: + if not self.buffer: + return None, 0 with StopWatch() as sw: - payload = self._build_payload(self.buffer) + payload, count = self._build_payload(self.buffer) record_endpoint_payload_events_serialization_time(endpoint=self.ENDPOINT_TYPE, seconds=sw.elapsed()) - buffer_size = len(self.buffer) - self._init_buffer() - return payload, buffer_size + if count: + self.buffer = self.buffer[count:] + return payload, count def _get_parent_session(self, traces): for trace in traces: @@ -87,29 +92,65 @@ def _get_parent_session(self, traces): return 0 def _build_payload(self, traces): - new_parent_session_span_id = self._get_parent_session(traces) - is_not_xdist_worker = os.getenv("PYTEST_XDIST_WORKER") is 
None - normalized_spans = [ - self._convert_span(span, trace[0].context.dd_origin, new_parent_session_span_id) - for trace in traces - for span in trace - if (is_not_xdist_worker or span.get_tag(EVENT_TYPE) != SESSION_TYPE) - ] - if not normalized_spans: - return None - record_endpoint_payload_events_count(endpoint=ENDPOINT.TEST_CYCLE, count=len(normalized_spans)) + # type: (List[List[Span]]) -> Tuple[Optional[bytes], int] + if not traces: + return None, 0 - # TODO: Split the events in several payloads as needed to avoid hitting the intake's maximum payload size. + new_parent_session_span_id = self._get_parent_session(traces) + return self._send_all_or_half_spans(traces, new_parent_session_span_id) + + def _send_all_or_half_spans(self, traces, new_parent_session_span_id): + # Convert all traces to spans with filtering + all_spans_with_trace_info = self._convert_traces_to_spans(traces, new_parent_session_span_id) + total_traces = len(traces) + + # Get all spans (flattened) + all_spans = [span for _, trace_spans in all_spans_with_trace_info for span in trace_spans] + + if not all_spans: + log.debug("No spans to encode after filtering, returning empty payload") + return None, total_traces + + # Try to fit all spans first (optimistic case) + payload = self._create_payload_from_spans(all_spans) + if len(payload) <= self._MAX_PAYLOAD_SIZE or total_traces <= 1: + record_endpoint_payload_events_count(endpoint=ENDPOINT.TEST_CYCLE, count=len(all_spans)) + return payload, total_traces + + mid = (total_traces + 1) // 2 + return self._send_all_or_half_spans(traces[:mid], new_parent_session_span_id) + + def _convert_traces_to_spans(self, traces, new_parent_session_span_id): + # type: (List[List[Span]], Optional[int]) -> List[Tuple[int, List[Dict[str, Any]]]] + """Convert all traces to spans with xdist filtering applied.""" + all_spans_with_trace_info = [] + for trace_idx, trace in enumerate(traces): + trace_spans = [ + self._convert_span(span, trace[0].context.dd_origin, 
new_parent_session_span_id) + for span in trace + if self._is_not_xdist_worker or span.get_tag(EVENT_TYPE) != SESSION_TYPE + ] + all_spans_with_trace_info.append((trace_idx, trace_spans)) + + return all_spans_with_trace_info + + def _create_payload_from_spans(self, spans): + # type: (List[Dict[str, Any]]) -> bytes + """Create a payload from the given spans.""" return CIVisibilityEncoderV01._pack_payload( - {"version": self.PAYLOAD_FORMAT_VERSION, "metadata": self._metadata, "events": normalized_spans} + { + "version": self.PAYLOAD_FORMAT_VERSION, + "metadata": self._metadata, + "events": spans, + } ) @staticmethod def _pack_payload(payload): return msgpack_packb(payload) - def _convert_span(self, span, dd_origin, new_parent_session_span_id=0): - # type: (Span, str, Optional[int]) -> Dict[str, Any] + def _convert_span(self, span, dd_origin=None, new_parent_session_span_id=0): + # type: (Span, Optional[str], Optional[int]) -> Dict[str, Any] sp = JSONEncoderV2._span_to_dict(span) sp = JSONEncoderV2._normalize_span(sp) sp["type"] = span.get_tag(EVENT_TYPE) or span.span_type @@ -218,7 +259,7 @@ def _build_body(self, data): def _build_data(self, traces): # type: (List[List[Span]]) -> Optional[bytes] normalized_covs = [ - self._convert_span(span, "") + self._convert_span(span) for trace in traces for span in trace if (COVERAGE_TAG_NAME in span.get_tags() or span.get_struct_tag(COVERAGE_TAG_NAME) is not None) @@ -230,14 +271,14 @@ def _build_data(self, traces): return msgpack_packb({"version": self.PAYLOAD_FORMAT_VERSION, "coverages": normalized_covs}) def _build_payload(self, traces): - # type: (List[List[Span]]) -> Optional[bytes] + # type: (List[List[Span]]) -> Tuple[Optional[bytes], int] data = self._build_data(traces) if not data: - return None - return b"\r\n".join(self._build_body(data)) + return None, 0 + return b"\r\n".join(self._build_body(data)), len(data) - def _convert_span(self, span, dd_origin, new_parent_session_span_id=0): - # type: (Span, str, 
Optional[int]) -> Dict[str, Any] + def _convert_span(self, span, dd_origin=None, new_parent_session_span_id=0): + # type: (Span, Optional[str], Optional[int]) -> Dict[str, Any] # DEV: new_parent_session_span_id is unused here, but it is used in super class files: Dict[str, Any] = {} diff --git a/ddtrace/internal/writer/writer.py b/ddtrace/internal/writer/writer.py index 1f78a1e48e3..7afb343bf85 100644 --- a/ddtrace/internal/writer/writer.py +++ b/ddtrace/internal/writer/writer.py @@ -364,7 +364,8 @@ def _write_with_client(self, client, spans=None): def flush_queue(self, raise_exc: bool = False): try: for client in self._clients: - self._flush_queue_with_client(client, raise_exc=raise_exc) + while len(client.encoder) > 0: + self._flush_queue_with_client(client, raise_exc=raise_exc) finally: self._set_drop_rate() diff --git a/releasenotes/notes/fix-ci-visibility-payload-splitting-11d5f9d69ad25a15.yaml b/releasenotes/notes/fix-ci-visibility-payload-splitting-11d5f9d69ad25a15.yaml new file mode 100644 index 00000000000..6e3edda5986 --- /dev/null +++ b/releasenotes/notes/fix-ci-visibility-payload-splitting-11d5f9d69ad25a15.yaml @@ -0,0 +1,3 @@ +fixes: + - | + CI Visibility: Fixes an issue where payloads exceeding 5MB could fail to be sent to the intake. Payloads are now automatically split if they exceed the size limit. 
diff --git a/tests/ci_visibility/test_ci_visibility.py b/tests/ci_visibility/test_ci_visibility.py index a577b2a25d3..6b39c1a7f9a 100644 --- a/tests/ci_visibility/test_ci_visibility.py +++ b/tests/ci_visibility/test_ci_visibility.py @@ -1417,7 +1417,7 @@ def tearDown(self): def assert_test_session_name(self, name): """Check that the payload metadata contains the test session name attributes.""" payload = msgpack.loads( - CIVisibility._instance.tracer._span_aggregator.writer._clients[0].encoder._build_payload([[Span("foo")]]) + CIVisibility._instance.tracer._span_aggregator.writer._clients[0].encoder._build_payload([[Span("foo")]])[0] ) assert payload["metadata"]["test_session_end"] == {"test_session.name": name} assert payload["metadata"]["test_suite_end"] == {"test_session.name": name} @@ -1493,7 +1493,7 @@ def test_set_library_capabilities(self): ) payload = msgpack.loads( - CIVisibility._instance.tracer._span_aggregator.writer._clients[0].encoder._build_payload([[Span("foo")]]) + CIVisibility._instance.tracer._span_aggregator.writer._clients[0].encoder._build_payload([[Span("foo")]])[0] ) assert payload["metadata"]["test"] == { "_dd.library_capabilities.early_flake_detection": "1", diff --git a/tests/ci_visibility/test_encoder.py b/tests/ci_visibility/test_encoder.py index 1c466af16de..18ddc7f7b0e 100644 --- a/tests/ci_visibility/test_encoder.py +++ b/tests/ci_visibility/test_encoder.py @@ -678,3 +678,299 @@ def test_full_encoding_with_parent_session_override(): # Both should use the parent session ID (0xAAAAAA) instead of worker session ID assert test_event[b"content"][b"test_session_id"] == 0xAAAAAA assert session_event[b"content"][b"test_session_id"] == 0xAAAAAA + + +def test_payload_size_splitting_under_limit(monkeypatch): + """Test that payloads under the limit are not split""" + # Mock the payload size limit to a small value for testing + monkeypatch.setattr(CIVisibilityEncoderV01, "_MAX_PAYLOAD_SIZE", 10000) # 10KB + + # Create small traces that won't 
exceed the limit + traces = [] + for i in range(3): + trace = [ + Span(name=f"test.span.{i}", span_id=0xAAAAAA + i, service="test"), + ] + trace[0].set_tag_str("type", "test") + traces.append(trace) + + encoder = CIVisibilityEncoderV01(0, 0) + encoder.set_metadata("*", {"language": "python"}) + for trace in traces: + encoder.put(trace) + + payload, count = encoder.encode() + + # All traces should be processed in one payload + assert count == 3 + assert payload is not None + assert len(encoder) == 0 # Buffer should be empty + + +def test_payload_size_splitting_over_limit(monkeypatch): + """Test that payloads over the limit are split appropriately""" + # Mock the payload size limit to a small value for testing + monkeypatch.setattr(CIVisibilityEncoderV01, "_MAX_PAYLOAD_SIZE", 2000) # 2KB + + # Create traces that will exceed the limit + # Each trace will have metadata to increase payload size + traces = [] + medium_metadata = "x" * 500 # 500 bytes of data per trace + + for i in range(8): # Should trigger splitting + trace = [ + Span(name=f"test.span.{i}", span_id=0xAAAAAA + i, service="test"), + ] + trace[0].set_tag_str("type", "test") + trace[0].set_tag_str("data", medium_metadata) + traces.append(trace) + + encoder = CIVisibilityEncoderV01(0, 0) + encoder.set_metadata("*", {"language": "python"}) + for trace in traces: + encoder.put(trace) + + # First encode should return some traces but not all + payload1, count1 = encoder.encode() + + assert payload1 is not None + assert count1 > 0 + assert count1 < 8 # Should not process all traces due to size limit + assert len(encoder) > 0 # Buffer should have remaining traces + + # Second encode should return remaining traces + payload2, count2 = encoder.encode() + + if payload2 is not None: + assert count2 > 0 + assert count1 + count2 <= 8 # Total processed should not exceed input + + # Eventually all traces should be processed + total_processed = count1 + (count2 if count2 else 0) + remaining_traces = 8 - total_processed + + 
if remaining_traces > 0: + payload3, count3 = encoder.encode() + if payload3 is not None: + total_processed += count3 + + +def test_payload_size_splitting_single_large_trace(monkeypatch): + """Test that a single trace larger than the limit is still processed""" + # Mock the payload size limit to a small value for testing + monkeypatch.setattr(CIVisibilityEncoderV01, "_MAX_PAYLOAD_SIZE", 1000) # 1KB + + # Create one trace that exceeds the limit by itself + large_metadata = "x" * 2000 # 2KB of data (exceeds 1KB limit) + + trace = [ + Span(name="test.large.span", span_id=0xAAAAAA, service="test"), + ] + trace[0].set_tag_str("type", "test") + trace[0].set_tag_str("large_data", large_metadata) + + encoder = CIVisibilityEncoderV01(0, 0) + encoder.set_metadata("*", {"language": "python"}) + encoder.put(trace) + + payload, count = encoder.encode() + + # Even though it exceeds the limit, it should still be processed + assert count == 1 + assert payload is not None + assert len(encoder) == 0 # Buffer should be empty + + +def test_payload_size_splitting_incremental_processing(monkeypatch): + """Test that encoder processes traces incrementally when splitting""" + # Mock the payload size limit to a small value for testing + monkeypatch.setattr(CIVisibilityEncoderV01, "_MAX_PAYLOAD_SIZE", 3000) # 3KB + + # Create multiple medium-sized traces + traces = [] + medium_metadata = "x" * 400 # 400 bytes per trace + + for i in range(10): # Should trigger splitting + trace = [ + Span(name=f"test.span.{i}", span_id=0xAAAAAA + i, service="test"), + ] + trace[0].set_tag_str("type", "test") + trace[0].set_tag_str("medium_data", medium_metadata) + traces.append(trace) + + encoder = CIVisibilityEncoderV01(0, 0) + encoder.set_metadata("*", {"language": "python"}) + for trace in traces: + encoder.put(trace) + + total_processed = 0 + iterations = 0 + max_iterations = 5 # Safety limit to avoid infinite loops + + while len(encoder) > 0 and iterations < max_iterations: + payload, count = 
encoder.encode() + if count == 0: + break + total_processed += count + iterations += 1 + + assert payload is not None + assert count > 0 + + # All traces should eventually be processed + assert total_processed == 10 + assert len(encoder) == 0 + + +def test_payload_size_splitting_empty_traces_handling(monkeypatch): + """Test that empty traces are handled correctly during splitting""" + # Mock the payload size limit to a small value for testing + monkeypatch.setattr(CIVisibilityEncoderV01, "_MAX_PAYLOAD_SIZE", 2000) # 2KB + + # Mix of empty traces and traces with data + traces = [] + metadata = "x" * 300 # 300 bytes per non-empty trace + + for i in range(8): + if i % 2 == 0: + # Empty trace + traces.append([]) + else: + # Trace with data + trace = [ + Span(name=f"test.span.{i}", span_id=0xAAAAAA + i, service="test"), + ] + trace[0].set_tag_str("type", "test") + trace[0].set_tag_str("data", metadata) + traces.append(trace) + + encoder = CIVisibilityEncoderV01(0, 0) + encoder.set_metadata("*", {"language": "python"}) + for trace in traces: + encoder.put(trace) + + payload, count = encoder.encode() + + # Should process all traces (empty traces are counted but don't contribute to payload) + assert count > 0 + assert payload is not None or count == len([t for t in traces if not t]) # All empty traces case + + +def test_payload_size_splitting_with_xdist_filtering(monkeypatch): + """Test payload splitting works correctly with xdist session filtering""" + # Mock the payload size limit to a small value for testing + monkeypatch.setattr(CIVisibilityEncoderV01, "_MAX_PAYLOAD_SIZE", 2000) # 2KB + + # Create traces with session spans that will be filtered in xdist worker mode + traces = [] + metadata = "x" * 300 # 300 bytes per trace + + for i in range(6): + session_span = Span(name=f"test.session.{i}", span_id=0xAAAAAA + i, service="test") + session_span.set_tag(EVENT_TYPE, SESSION_TYPE) + session_span.set_tag_str("data", metadata) + + test_span = Span(name=f"test.case.{i}", 
span_id=0xBBBBBB + i, service="test", span_type="test") + test_span.set_tag(EVENT_TYPE, "test") + test_span.set_tag_str("data", metadata) + + traces.append([session_span, test_span]) + + # Mock xdist worker environment + import os + + original_env = os.getenv("PYTEST_XDIST_WORKER") + os.environ["PYTEST_XDIST_WORKER"] = "gw0" + + try: + encoder = CIVisibilityEncoderV01(0, 0) + encoder.set_metadata("*", {"language": "python"}) + for trace in traces: + encoder.put(trace) + + payload, count = encoder.encode() + + # Should process traces with session spans filtered out + assert count > 0 + assert payload is not None + + if payload: + decoded = msgpack.unpackb(payload, raw=True, strict_map_key=False) + events = decoded[b"events"] + # Should only have test events, no session events + for event in events: + assert event[b"type"] != b"session" + + finally: + # Restore original environment + if original_env is None: + os.environ.pop("PYTEST_XDIST_WORKER", None) + else: + os.environ["PYTEST_XDIST_WORKER"] = original_env + + +def test_payload_size_max_payload_constant(): + """Test that the _MAX_PAYLOAD_SIZE constant is properly defined""" + encoder = CIVisibilityEncoderV01(0, 0) + + # Should be 5MB + assert hasattr(encoder, "_MAX_PAYLOAD_SIZE") + assert encoder._MAX_PAYLOAD_SIZE == 5 * 1024 * 1024 + + +def test_payload_size_splitting_with_multiple_encode_calls(monkeypatch): + """Test that multiple encode calls work correctly with payload splitting""" + # Mock the payload size limit to a small value for testing + monkeypatch.setattr(CIVisibilityEncoderV01, "_MAX_PAYLOAD_SIZE", 1500) # 1.5KB + + # Create traces that will exceed the limit and require splitting + traces = [] + metadata = "x" * 400 # 400 bytes per trace + + for i in range(6): # Should trigger splitting + trace = [ + Span(name=f"test.span.{i}", span_id=0xAAAAAA + i, service="test"), + ] + trace[0].set_tag_str("type", "test") + trace[0].set_tag_str("data", metadata) + traces.append(trace) + + encoder = 
CIVisibilityEncoderV01(0, 0) + encoder.set_metadata("*", {"language": "python"}) + + # Add traces to encoder + for trace in traces: + encoder.put(trace) + + # Verify that encoder has traces + assert len(encoder) == 6 + + # Track all payloads generated + payloads = [] + total_processed = 0 + + # Keep calling encode until all traces are processed + while len(encoder) > 0: + payload, count = encoder.encode() + if count == 0: + break + total_processed += count + if payload: + payloads.append(payload) + + # Verify that multiple payloads were created due to splitting + assert len(payloads) >= 2 # Should have at least 2 payloads due to splitting + + # Verify that all traces were eventually processed + assert total_processed == 6 + + # Verify that encoder buffer is empty after processing + assert len(encoder) == 0 + + # Verify that all payloads are valid msgpack + for payload in payloads: + assert payload is not None + assert isinstance(payload, bytes) + decoded = msgpack.unpackb(payload, raw=True, strict_map_key=False) + assert b"events" in decoded + assert len(decoded[b"events"]) > 0 diff --git a/tests/ci_visibility/test_is_user_provided_service.py b/tests/ci_visibility/test_is_user_provided_service.py index 7a93aff3d1d..2318153196e 100644 --- a/tests/ci_visibility/test_is_user_provided_service.py +++ b/tests/ci_visibility/test_is_user_provided_service.py @@ -27,7 +27,7 @@ def test_is_user_provided_service_false(self): class IsUserProvidedServiceTestTagTestCase(SubprocessTestCase): def assert_is_user_provided_service_equals(self, value): payload = msgpack.loads( - CIVisibility._instance.tracer._span_aggregator.writer._clients[0].encoder._build_payload([[Span("foo")]]) + CIVisibility._instance.tracer._span_aggregator.writer._clients[0].encoder._build_payload([[Span("foo")]])[0] ) assert payload["metadata"]["*"]["_dd.test.is_user_provided_service"] == value diff --git a/tests/utils.py b/tests/utils.py index 1327beaaff6..b69047f8cca 100644 --- a/tests/utils.py +++ 
b/tests/utils.py @@ -622,7 +622,7 @@ def write(self, spans=None): CIVisibilityWriter.write(self, spans=spans) # take a snapshot of the writer buffer for tests to inspect if spans: - self._encoded = self._encoder._build_payload([spans]) + self._encoded = self._encoder._build_payload([spans])[0] class DummyTracer(Tracer):