Skip to content

chore(ci_visibility): split payload in chunks if needed #13825

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 103 additions & 26 deletions ddtrace/internal/ci_visibility/encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from typing import Dict # noqa:F401
from typing import List # noqa:F401
from typing import Optional # noqa:F401
from typing import Tuple # noqa:F401

from ddtrace._trace.span import Span # noqa:F401

Expand All @@ -42,13 +43,15 @@ class CIVisibilityEncoderV01(BufferedEncoder):
TEST_SUITE_EVENT_VERSION = 1
TEST_EVENT_VERSION = 2
ENDPOINT_TYPE = ENDPOINT.TEST_CYCLE
_MAX_PAYLOAD_SIZE = 5 * 1024 * 1024 # 5MB

def __init__(self, *args):
    """Initialize the encoder's lock, metadata map, and trace buffer.

    DEV: ``args`` are not used here, but are used by BufferedEncoder's
    __cinit__() method, which is called implicitly by Cython.
    """
    super(CIVisibilityEncoderV01, self).__init__()
    self._lock = threading.RLock()  # guards buffer access in encode()/put()
    self._metadata = {}  # type: Dict[str, Dict[str, str]]
    # Cached once: xdist workers must not emit session-level events themselves.
    self._is_not_xdist_worker = os.getenv("PYTEST_XDIST_WORKER") is None
    self._init_buffer()

def __len__(self):
Expand All @@ -68,16 +71,18 @@ def put(self, spans):
self.buffer.append(spans)

def encode_traces(self, traces):
    # type: (List[List[Span]]) -> Optional[bytes]
    """Serialize *traces* and return only the payload bytes.

    ``_build_payload`` returns ``(payload, traces_consumed)``; callers of
    this helper only need the payload, so the count is discarded.
    """
    return self._build_payload(traces=traces)[0]

def encode(self):
    # type: () -> Tuple[Optional[bytes], int]
    """Serialize as many buffered traces as fit into one payload.

    Returns ``(payload, count)`` where ``count`` is the number of traces
    consumed from the buffer. Only the consumed prefix is dropped, so any
    traces that did not fit are picked up by the next call.
    """
    with self._lock:
        if not self.buffer:
            return None, 0
        with StopWatch() as sw:
            payload, count = self._build_payload(self.buffer)
        record_endpoint_payload_events_serialization_time(endpoint=self.ENDPOINT_TYPE, seconds=sw.elapsed())
        if count:
            # Keep the unsent remainder for the next encode() call.
            self.buffer = self.buffer[count:]
        return payload, count

def _get_parent_session(self, traces):
for trace in traces:
Expand All @@ -87,29 +92,101 @@ def _get_parent_session(self, traces):
return 0

def _build_payload(self, traces):
    # type: (List[List[Span]]) -> Tuple[Optional[bytes], int]
    """Build a msgpack payload no larger than ``_MAX_PAYLOAD_SIZE``.

    Returns ``(payload, traces_consumed)``. Optimistically packs every
    trace first; if the result exceeds the intake limit, binary-searches
    the largest prefix of traces that fits (splitting only on trace
    boundaries so no trace is ever sent partially).
    """
    new_parent_session_span_id = self._get_parent_session(traces)

    # Convert all traces to span dicts, with xdist session filtering applied.
    all_spans_with_trace_info = self._convert_traces_to_spans(traces, new_parent_session_span_id)
    total_traces = len(traces)

    # Flatten to a single span list for the optimistic single-payload case.
    all_spans = [span for _, trace_spans in all_spans_with_trace_info for span in trace_spans]

    if not all_spans:
        log.debug("No spans to encode after filtering, returning empty payload")
        # Still report all traces as consumed so the caller drains its buffer.
        return None, total_traces

    # Optimistic case: everything fits in one payload.
    payload = self._create_payload_from_spans(all_spans)
    if len(payload) <= self._MAX_PAYLOAD_SIZE:
        record_endpoint_payload_events_count(endpoint=ENDPOINT.TEST_CYCLE, count=len(all_spans))
        return payload, total_traces

    # Payload too big: binary-search the maximum number of traces that fit.
    best_traces_count, best_spans = self._find_max_traces_that_fit(traces, all_spans_with_trace_info)

    record_endpoint_payload_events_count(endpoint=ENDPOINT.TEST_CYCLE, count=len(best_spans))
    return self._create_payload_from_spans(best_spans), best_traces_count

def _convert_traces_to_spans(self, traces, new_parent_session_span_id):
    # type: (List[List[Span]], Optional[int]) -> List[Tuple[int, List[Dict[str, Any]]]]
    """Convert traces to serializable span dicts, keeping per-trace grouping.

    Session-type events are dropped on pytest-xdist workers (only the main
    process reports the session). Returns one ``(trace_index, span_dicts)``
    tuple per input trace so callers can split payloads on trace boundaries.
    """
    all_spans_with_trace_info = []
    for trace_idx, trace in enumerate(traces):
        if not trace:
            # Guard: an empty trace has no first span to read dd_origin from.
            all_spans_with_trace_info.append((trace_idx, []))
            continue
        # dd_origin is per-trace; hoist it out of the comprehension.
        dd_origin = trace[0].context.dd_origin
        trace_spans = [
            self._convert_span(span, dd_origin, new_parent_session_span_id)
            for span in trace
            if self._is_not_xdist_worker or span.get_tag(EVENT_TYPE) != SESSION_TYPE
        ]
        all_spans_with_trace_info.append((trace_idx, trace_spans))

    return all_spans_with_trace_info

def _create_payload_from_spans(self, spans):
    # type: (List[Dict[str, Any]]) -> bytes
    """Pack the given span dicts, plus version and metadata, into msgpack bytes."""
    return CIVisibilityEncoderV01._pack_payload(
        {
            "version": self.PAYLOAD_FORMAT_VERSION,
            "metadata": self._metadata,
            "events": spans,
        }
    )

def _find_max_traces_that_fit(self, traces, all_spans_with_trace_info):
    # type: (List[List[Span]], List[Tuple[int, List[Dict[str, Any]]]]) -> Tuple[int, List[Dict[str, Any]]]
    """Binary-search the largest trace-count prefix whose payload fits the size limit."""
    lo = 1
    hi = len(traces)
    # Report at least one trace so the caller always makes forward progress.
    best_count = 1
    best_subset = []

    while lo <= hi:
        candidate = (lo + hi) // 2
        subset = self._get_spans_for_traces(all_spans_with_trace_info, candidate)

        if not subset:
            # Nothing serializable in this prefix; widen the search upward.
            lo = candidate + 1
            continue

        if len(self._create_payload_from_spans(subset)) <= self._MAX_PAYLOAD_SIZE:
            # Fits -- remember it and probe a larger prefix.
            best_count = candidate
            best_subset = subset
            lo = candidate + 1
        else:
            # Too big -- probe a smaller prefix.
            hi = candidate - 1

    return best_count, best_subset

def _get_spans_for_traces(self, all_spans_with_trace_info, num_traces):
# type: (List[Tuple[int, List[Dict[str, Any]]]], int) -> List[Dict[str, Any]]
"""Get all spans for the first num_traces traces."""
spans_subset = []
for i in range(num_traces):
_, trace_spans = all_spans_with_trace_info[i]
spans_subset.extend(trace_spans)
return spans_subset

@staticmethod
def _pack_payload(payload):
    """Serialize *payload* (a msgpack-compatible dict) to msgpack-encoded bytes."""
    return msgpack_packb(payload)

def _convert_span(self, span, dd_origin, new_parent_session_span_id=0):
# type: (Span, str, Optional[int]) -> Dict[str, Any]
def _convert_span(self, span, dd_origin=None, new_parent_session_span_id=0):
# type: (Span, Optional[str], Optional[int]) -> Dict[str, Any]
sp = JSONEncoderV2._span_to_dict(span)
sp = JSONEncoderV2._normalize_span(sp)
sp["type"] = span.get_tag(EVENT_TYPE) or span.span_type
Expand Down Expand Up @@ -218,7 +295,7 @@ def _build_body(self, data):
def _build_data(self, traces):
# type: (List[List[Span]]) -> Optional[bytes]
normalized_covs = [
self._convert_span(span, "")
self._convert_span(span)
for trace in traces
for span in trace
if (COVERAGE_TAG_NAME in span.get_tags() or span.get_struct_tag(COVERAGE_TAG_NAME) is not None)
Expand All @@ -230,14 +307,14 @@ def _build_data(self, traces):
return msgpack_packb({"version": self.PAYLOAD_FORMAT_VERSION, "coverages": normalized_covs})

def _build_payload(self, traces):
    # type: (List[List[Span]]) -> Tuple[Optional[bytes], int]
    """Build a multipart coverage payload from *traces*.

    Returns ``(payload, count)`` where ``count`` is the number of traces
    consumed from the buffer, mirroring the base encoder's contract
    (``encode()`` slices its buffer by this count).
    """
    data = self._build_data(traces)
    if not data:
        # Consume all traces even when nothing is serializable, so the
        # caller's buffer drains instead of retrying the same traces forever.
        return None, len(traces)
    # BUG FIX: the count must be the number of traces consumed, not the
    # byte length of the serialized data (len(data)), which encode() would
    # otherwise use to slice the trace buffer.
    return b"\r\n".join(self._build_body(data)), len(traces)

def _convert_span(self, span, dd_origin, new_parent_session_span_id=0):
# type: (Span, str, Optional[int]) -> Dict[str, Any]
def _convert_span(self, span, dd_origin=None, new_parent_session_span_id=0):
# type: (Span, Optional[str], Optional[int]) -> Dict[str, Any]
# DEV: new_parent_session_span_id is unused here, but it is used in super class
files: Dict[str, Any] = {}

Expand Down
4 changes: 2 additions & 2 deletions tests/ci_visibility/test_ci_visibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -1417,7 +1417,7 @@ def tearDown(self):
def assert_test_session_name(self, name):
"""Check that the payload metadata contains the test session name attributes."""
payload = msgpack.loads(
CIVisibility._instance.tracer._span_aggregator.writer._clients[0].encoder._build_payload([[Span("foo")]])
CIVisibility._instance.tracer._span_aggregator.writer._clients[0].encoder._build_payload([[Span("foo")]])[0]
)
assert payload["metadata"]["test_session_end"] == {"test_session.name": name}
assert payload["metadata"]["test_suite_end"] == {"test_session.name": name}
Expand Down Expand Up @@ -1493,7 +1493,7 @@ def test_set_library_capabilities(self):
)

payload = msgpack.loads(
CIVisibility._instance.tracer._span_aggregator.writer._clients[0].encoder._build_payload([[Span("foo")]])
CIVisibility._instance.tracer._span_aggregator.writer._clients[0].encoder._build_payload([[Span("foo")]])[0]
)
assert payload["metadata"]["test"] == {
"_dd.library_capabilities.early_flake_detection": "1",
Expand Down
Loading
Loading