diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py index 8a97b7afc..16fb06132 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py @@ -1,9 +1,10 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 # Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License. +import logging import os import re -from logging import NOTSET, Logger, getLogger +from logging import Logger, getLogger from typing import ClassVar, Dict, List, NamedTuple, Optional, Type, Union from importlib_metadata import version @@ -22,12 +23,18 @@ AwsMetricAttributesSpanExporterBuilder, ) from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder + +# pylint: disable=line-too-long +from amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor import ( + AwsCloudWatchOtlpBatchLogRecordProcessor, +) from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter from amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter import OTLPAwsSpanExporter from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpSpanExporter from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler from amazon.opentelemetry.distro.scope_based_exporter import ScopeBasedPeriodicExportingMetricReader from amazon.opentelemetry.distro.scope_based_filtering_view import ScopeBasedRetainingView +from opentelemetry._events import set_event_logger_provider from opentelemetry._logs import get_logger_provider, set_logger_provider from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter @@ -42,7 +49,9 @@ _import_id_generator, _import_sampler, _OTelSDKConfigurator, + _patch_basic_config, ) +from opentelemetry.sdk._events import EventLoggerProvider from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, LogExporter from opentelemetry.sdk.environment_variables import ( @@ -197,26 +206,28 @@ def _initialize_components(): def _init_logging( - exporters: Dict[str, Type[LogExporter]], - resource: Resource = None, + exporters: dict[str, Type[LogExporter]], + resource: Optional[Resource] = None, + setup_logging_handler: bool = True, ): - - # Provides a default OTLP log exporter when none is specified. - # This is the behavior for the logs exporters for other languages. 
- if not exporters: - exporters = {"otlp": OTLPLogExporter} - provider = LoggerProvider(resource=resource) set_logger_provider(provider) for _, exporter_class in exporters.items(): - exporter_args: Dict[str, any] = {} - log_exporter = _customize_logs_exporter(exporter_class(**exporter_args), resource) - provider.add_log_record_processor(BatchLogRecordProcessor(exporter=log_exporter)) + exporter_args = {} + log_exporter: LogExporter = _customize_logs_exporter(exporter_class(**exporter_args)) + log_processor = _customize_log_record_processor(log_exporter) + provider.add_log_record_processor(log_processor) + + event_logger_provider = EventLoggerProvider(logger_provider=provider) + set_event_logger_provider(event_logger_provider) - handler = LoggingHandler(level=NOTSET, logger_provider=provider) + if setup_logging_handler: + _patch_basic_config() - getLogger().addHandler(handler) + # Add OTel handler + handler = LoggingHandler(level=logging.NOTSET, logger_provider=provider) + logging.getLogger().addHandler(handler) def _init_tracing( @@ -417,7 +428,14 @@ def _customize_span_exporter(span_exporter: SpanExporter, resource: Resource) -> return AwsMetricAttributesSpanExporterBuilder(span_exporter, resource).build() -def _customize_logs_exporter(log_exporter: LogExporter, resource: Resource) -> LogExporter: +def _customize_log_record_processor(log_exporter: LogExporter): + if isinstance(log_exporter, OTLPAwsLogExporter) and is_agent_observability_enabled(): + return AwsCloudWatchOtlpBatchLogRecordProcessor(exporter=log_exporter) + + return BatchLogRecordProcessor(exporter=log_exporter) + + +def _customize_logs_exporter(log_exporter: LogExporter) -> LogExporter: logs_endpoint = os.environ.get(OTEL_EXPORTER_OTLP_LOGS_ENDPOINT) if _is_aws_otlp_endpoint(logs_endpoint, "logs"): @@ -586,7 +604,7 @@ def _is_lambda_environment(): return AWS_LAMBDA_FUNCTION_NAME_CONFIG in os.environ -def _is_aws_otlp_endpoint(otlp_endpoint: str = None, service: str = "xray") -> bool: +def _is_aws_otlp_endpoint(otlp_endpoint: Optional[str] = None, service: str = "xray") -> bool: """Is the given endpoint an AWS OTLP endpoint?""" pattern = AWS_TRACES_OTLP_ENDPOINT_PATTERN if service == "xray" else AWS_LOGS_OTLP_ENDPOINT_PATTERN diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/_aws_cw_otlp_batch_log_record_processor.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/_aws_cw_otlp_batch_log_record_processor.py new file mode 100644 index 000000000..5ad3c2c8e --- /dev/null +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/_aws_cw_otlp_batch_log_record_processor.py @@ -0,0 +1,249 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +# Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License. 
+
+import logging
+from typing import Mapping, Optional, Sequence, cast
+
+from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
+from opentelemetry.context import _SUPPRESS_INSTRUMENTATION_KEY, attach, detach, set_value
+from opentelemetry.sdk._logs import LogData
+from opentelemetry.sdk._logs._internal.export import BatchLogExportStrategy
+from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
+from opentelemetry.util.types import AnyValue
+
+_logger = logging.getLogger(__name__)
+
+
+class AwsCloudWatchOtlpBatchLogRecordProcessor(BatchLogRecordProcessor):
+    """
+    Custom implementation of BatchLogRecordProcessor that manages log record batching
+    with size-based constraints to prevent exceeding the AWS CloudWatch Logs OTLP endpoint request size limit.
+
+    This processor still exports all logs, but rather than performing exactly one export per batch,
+    it estimates log sizes and performs multiple batch exports, where each exported batch satisfies
+    one additional constraint:
+
+    If the batch to be exported has an estimated data size > 1 MB, it is split into multiple
+    sub-batch exports, each with an estimated data size <= 1 MB.
+
+    The one exception: if a single log is itself estimated at > 1 MB, its sub-batch contains
+    exactly that one log.
+    """
+
+    # OTel log events include fixed metadata attributes, so the metadata size can be
+    # estimated, on a best-effort basis, as:
+    # service.name (255 chars) + cloud.resource_id (max ARN length) + telemetry.xxx (~20 chars) +
+    # common attributes (255 chars) +
+    # scope + flags + traceId + spanId + numeric/timestamp fields + ...
+    # Example log structure:
+    # {
+    #     "resource": {
+    #         "attributes": {
+    #             "aws.local.service": "example-service123",
+    #             "telemetry.sdk.language": "python",
+    #             "service.name": "my-application",
+    #             "cloud.resource_id": "example-resource",
+    #             "aws.log.group.names": "example-log-group",
+    #             "aws.ai.agent.type": "default",
+    #             "telemetry.sdk.version": "1.x.x",
+    #             "telemetry.auto.version": "0.x.x",
+    #             "telemetry.sdk.name": "opentelemetry"
+    #         }
+    #     },
+    #     "scope": {"name": "example.instrumentation.library"},
+    #     "timeUnixNano": 1234567890123456789,
+    #     "observedTimeUnixNano": 1234567890987654321,
+    #     "severityNumber": 9,
+    #     "body": {...},
+    #     "attributes": {...},
+    #     "flags": 1,
+    #     "traceId": "abcd1234efgh5678ijkl9012mnop3456",
+    #     "spanId": "1234abcd5678efgh"
+    # }
+    # 2000 might be a slight overestimate, but it is better to overestimate the size of a log
+    # and pay a small batching performance cost than to underestimate it and risk a large log
+    # being dropped when sent to the AWS OTLP endpoint.
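+    # Illustration of the splitting constraint below (hypothetical estimated sizes, in MB): a
+    # queued batch of [0.4, 0.4, 0.3, 1.2] is exported as three requests: [0.4, 0.4], then [0.3],
+    # then [1.2]. A log that would push the current sub-batch past 1 MB starts a new export, and
+    # an oversized log is always sent alone.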
+    _BASE_LOG_BUFFER_BYTE_SIZE = 2000
+
+    _MAX_LOG_REQUEST_BYTE_SIZE = (
+        1048576  # Maximum uncompressed/unserialized bytes per request.
+        # https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-OTLPEndpoint.html
+    )
+
+    def __init__(
+        self,
+        exporter: OTLPAwsLogExporter,
+        schedule_delay_millis: Optional[float] = None,
+        max_export_batch_size: Optional[int] = None,
+        export_timeout_millis: Optional[float] = None,
+        max_queue_size: Optional[int] = None,
+    ):
+
+        super().__init__(
+            exporter=exporter,
+            schedule_delay_millis=schedule_delay_millis,
+            max_export_batch_size=max_export_batch_size,
+            export_timeout_millis=export_timeout_millis,
+            max_queue_size=max_queue_size,
+        )
+
+        self._exporter = exporter
+
+    def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
+        """
+        Explicitly overrides the upstream _export method to add AWS CloudWatch size-based batching.
+        See:
+        https://github.com/open-telemetry/opentelemetry-python/blob/bb21ebd46d070c359eee286c97bdf53bfd06759d/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py#L143
+
+        Preserves the existing batching behavior, but exports an intermediate sub-batch whenever
+        the estimated data size of the pending batch would reach AWS CloudWatch's maximum request
+        size limit of 1 MB.
+
+        - The estimated data size of each exported batch will typically be <= 1 MB, with one exception:
+        - If the estimated data size of an exported batch is ever > 1 MB, that batch is guaranteed to contain exactly one log.
+        """
+        with self._export_lock:
+            iteration = 0
+            # We could see concurrent export calls from worker and force_flush. We call _should_export_batch
+            # once the lock is obtained to see if we still need to make the requested export.
+            while self._should_export_batch(batch_strategy, iteration):
+                iteration += 1
+                token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
+                try:
+                    batch_length = min(self._max_export_batch_size, len(self._queue))
+                    batch_data_size = 0
+                    batch = []
+
+                    for _ in range(batch_length):
+                        log_data: LogData = self._queue.pop()
+                        log_size = self._estimate_log_size(log_data)
+
+                        # Flush the current sub-batch before it would exceed the 1 MB request limit.
+                        if batch and (batch_data_size + log_size > self._MAX_LOG_REQUEST_BYTE_SIZE):
+                            self._exporter.export(batch)
+                            batch_data_size = 0
+                            batch = []
+
+                        batch_data_size += log_size
+                        batch.append(log_data)
+
+                    if batch:
+                        self._exporter.export(batch)
+                except Exception as exception:  # pylint: disable=broad-exception-caught
+                    _logger.exception("Exception while exporting logs: %s", exception)
+                detach(token)
+
+    def _estimate_log_size(self, log: LogData, depth: int = 3) -> int:  # pylint: disable=too-many-branches
+        """
+        Estimates the size in bytes of a log by calculating the size of its body and its attributes
+        and adding a buffer amount to account for other log metadata information.
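+
+        For example (a hypothetical log, using the default buffer): a log whose body is
+        {"message": "hello"} and which has no attributes is estimated at
+        _BASE_LOG_BUFFER_BYTE_SIZE + len("message") + len("hello") = 2000 + 7 + 5 = 2012 bytes.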
+ + Features: + - Processes complex log structures up to the specified depth limit + - Includes cycle detection to prevent processing the same content more than once + - Returns truncated calculation if depth limit is exceeded + + We set depth to 3 as this is the minimum required depth to estimate our consolidated Gen AI log events: + + Example structure: + { + "output": { + "messages": [ + { + "content": "Hello, World!", + "role": "assistant" + } + ] + }, + "input": { + "messages": [ + { + "content": "Say Hello, World!", + "role": "user" + } + ] + } + } + + Args: + log: The Log object to calculate size for + depth: Maximum depth to traverse in nested structures (default: 3) + + Returns: + int: The estimated size of the log object in bytes + """ + + # Queue contains tuples of (log_content, depth) where: + # - log_content is the current piece of log data being processed + # - depth tracks how many levels deep we've traversed to reach this content + # - body starts at depth 0 since it's an AnyValue object + # - Attributes start at depth -1 since it's a Mapping[str, AnyValue] - when traversed, we will + # start processing its keys at depth 0 + queue = [(log.log_record.body, 0), (log.log_record.attributes, -1)] + + # Track visited complex log contents to avoid calculating the same one more than once + visited = set() + + size: int = self._BASE_LOG_BUFFER_BYTE_SIZE + + while queue: + new_queue = [] + + for data in queue: + # small optimization, can stop calculating the size once it reaches the 1 MB limit. + if size >= self._MAX_LOG_REQUEST_BYTE_SIZE: + return size + + next_val, current_depth = data + + if next_val is None: + continue + + if isinstance(next_val, bytes): + size += len(next_val) + continue + + if isinstance(next_val, (str, float, int, bool)): + size += AwsCloudWatchOtlpBatchLogRecordProcessor._estimate_utf8_size(str(next_val)) + continue + + # next_val must be Sequence["AnyValue"] or Mapping[str, "AnyValue"] + # See: https://github.com/open-telemetry/opentelemetry-python/blob/\ + # 9426d6da834cfb4df7daedd4426bba0aa83165b5/opentelemetry-api/src/opentelemetry/util/types.py#L20 + if current_depth <= depth: + obj_id = id( + next_val + ) # Guaranteed to be unique, see: https://www.w3schools.com/python/ref_func_id.asp + if obj_id in visited: + continue + visited.add(obj_id) + + if isinstance(next_val, Sequence): + for content in next_val: + new_queue.append((cast(AnyValue, content), current_depth + 1)) + + if isinstance(next_val, Mapping): + for key, content in next_val.items(): + size += len(key) + new_queue.append((content, current_depth + 1)) + else: + _logger.debug( + "Max log depth of %s exceeded. 
Log data size will not be accurately calculated.", depth
+                    )
+
+            queue = new_queue
+
+        return size
+
+    @staticmethod
+    def _estimate_utf8_size(s: str) -> int:
+        ascii_count = 0
+        non_ascii_count = 0
+
+        for char in s:
+            if ord(char) < 128:
+                ascii_count += 1
+            else:
+                non_ascii_count += 1
+
+        # Estimate: 1 byte per ASCII char plus an upper bound of 4 bytes per non-ASCII char
+        return ascii_count + (non_ascii_count * 4)
diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/otlp_aws_logs_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/otlp_aws_logs_exporter.py
index 048632c06..16a976d54 100644
--- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/otlp_aws_logs_exporter.py
+++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/otlp_aws_logs_exporter.py
@@ -1,14 +1,41 @@
 # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 # SPDX-License-Identifier: Apache-2.0
+# Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License.
 
-from typing import Dict, Optional
+import gzip
+import logging
+import random
+from io import BytesIO
+from threading import Event
+from time import time
+from typing import Dict, Optional, Sequence
+
+from requests import Response
+from requests.exceptions import ConnectionError as RequestsConnectionError
+from requests.structures import CaseInsensitiveDict
 
 from amazon.opentelemetry.distro.exporter.otlp.aws.common.aws_auth_session import AwsAuthSession
+from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs
 from opentelemetry.exporter.otlp.proto.http import Compression
 from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
+from opentelemetry.sdk._logs import LogData
+from opentelemetry.sdk._logs.export import LogExportResult
+
+_logger = logging.getLogger(__name__)
+_MAX_RETRYS = 6
 
 
 class OTLPAwsLogExporter(OTLPLogExporter):
+    """
+    This exporter extends the functionality of the OTLPLogExporter to allow logs to be exported
+    to the CloudWatch Logs OTLP endpoint https://logs.[AWSRegion].amazonaws.com/v1/logs. It uses the
+    AwsAuthSession to sign the exported request and inject SigV4 authentication directly into its headers.
+
+    See: https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-OTLPEndpoint.html
+    """
+
+    _RETRY_AFTER_HEADER = "Retry-After"  # See: https://opentelemetry.io/docs/specs/otlp/#otlphttp-throttling
+
     def __init__(
         self,
         endpoint: Optional[str] = None,
@@ -34,3 +61,124 @@
             compression=Compression.Gzip,
             session=AwsAuthSession(aws_region=self._aws_region, service="logs"),
         )
+        self._shutdown_event = Event()
+
+    def export(self, batch: Sequence[LogData]) -> LogExportResult:
+        """
+        Exports a log batch with AWS-specific enhancements over the base OTLPLogExporter.
+
+        Key differences from the upstream OTLPLogExporter:
+        1. Respects the Retry-After header from server responses for proper throttling
+        2. Treats HTTP 429 (Too Many Requests) as a retryable status
+        3. 
Always compresses data with gzip before sending
+
+        The upstream implementation does not support the Retry-After header:
+        https://github.com/open-telemetry/opentelemetry-python/blob/acae2c232b101d3e447a82a7161355d66aa06fa2/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py#L167
+        """
+
+        if self._shutdown:
+            _logger.warning("Exporter already shutdown, ignoring batch")
+            return LogExportResult.FAILURE
+
+        serialized_data = encode_logs(batch).SerializeToString()
+        gzip_data = BytesIO()
+        with gzip.GzipFile(fileobj=gzip_data, mode="w") as gzip_stream:
+            gzip_stream.write(serialized_data)
+        data = gzip_data.getvalue()
+
+        deadline_sec = time() + self._timeout
+        retry_num = 0
+
+        # This loop will eventually terminate because:
+        # 1) The export request will eventually either succeed or fail permanently
+        # 2) The maximum number of retries (_MAX_RETRYS = 6) will be reached
+        # 3) The deadline will be exceeded
+        # 4) Non-retryable errors (4xx except 429) exit the loop immediately
+        while True:
+            resp = self._send(data, deadline_sec - time())
+
+            if resp.ok:
+                return LogExportResult.SUCCESS
+
+            backoff_seconds = self._get_retry_delay_sec(resp.headers, retry_num)
+            is_retryable = self._retryable(resp)
+
+            if not is_retryable or retry_num + 1 == _MAX_RETRYS or backoff_seconds > (deadline_sec - time()):
+                _logger.error(
+                    "Failed to export logs batch, code: %s, reason: %s",
+                    resp.status_code,
+                    resp.text,
+                )
+                return LogExportResult.FAILURE
+
+            _logger.warning(
+                "Transient error %s encountered while exporting logs batch, retrying in %.2fs.",
+                resp.reason,
+                backoff_seconds,
+            )
+            # Interruptible sleep: wait() returns True early if shutdown has been signaled.
+            if self._shutdown_event.wait(backoff_seconds):
+                _logger.info("Export interrupted by shutdown")
+                return LogExportResult.FAILURE
+
+            retry_num += 1
+
+    def shutdown(self) -> None:
+        """Shutdown the exporter and interrupt any ongoing retry waits."""
+        self._shutdown_event.set()
+        return super().shutdown()
+
+    def _send(self, serialized_data: bytes, timeout_sec: float):
+        try:
+            response = self._session.post(
+                url=self._endpoint,
+                data=serialized_data,
+                verify=self._certificate_file,
+                timeout=timeout_sec,
+                cert=self._client_cert,
+            )
+            return response
+        except RequestsConnectionError:
+            # Retry the request once on a connection error (e.g. a stale connection the server
+            # closed); a second failure propagates to the caller.
+            response = self._session.post(
+                url=self._endpoint,
+                data=serialized_data,
+                verify=self._certificate_file,
+                timeout=timeout_sec,
+                cert=self._client_cert,
+            )
+            return response
+
+    @staticmethod
+    def _retryable(resp: Response) -> bool:
+        """
+        Logic based on https://opentelemetry.io/docs/specs/otlp/#otlphttp-throttling
+        """
+        return resp.status_code in (429, 503) or OTLPLogExporter._retryable(resp)
+
+    def _get_retry_delay_sec(self, headers: CaseInsensitiveDict, retry_num: int) -> float:
+        """
+        Get the retry delay in seconds from the response headers, falling back to exponential backoff.
+        """
+        # Honor a Retry-After header if present; otherwise use exponential backoff with jitter.
+        retry_after_delay = self._parse_retryable_header(headers.get(self._RETRY_AFTER_HEADER))
+        if retry_after_delay > -1:
+            return retry_after_delay
+        # Multiplying by a random number between 0.8 and 1.2 introduces +/-20% jitter into each backoff.
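+        # Example delays (hypothetical jitter draws): retry_num 0 waits ~0.8-1.2s, 1 waits
+        # ~1.6-2.4s, 2 waits ~3.2-4.8s, doubling up to retry_num 4 (~12.8-19.2s) before the
+        # attempt cap (_MAX_RETRYS) is reached.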
+        return 2**retry_num * random.uniform(0.8, 1.2)
+
+    @staticmethod
+    def _parse_retryable_header(retry_header: Optional[str]) -> float:
+        """
+        Converts the given Retry-After header into a delay in seconds. Returns -1 if the header
+        is missing or cannot be parsed.
+        """
+        if not retry_header:
+            return -1
+
+        try:
+            val = float(retry_header)
+            return val if val >= 0 else -1
+        except ValueError:
+            return -1
diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py
index 7c608e885..7f44b04e4 100644
--- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py
+++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py
@@ -18,6 +18,14 @@
 class OTLPAwsSpanExporter(OTLPSpanExporter):
+    """
+    This exporter extends the functionality of the OTLPSpanExporter to allow spans to be exported
+    to the X-Ray OTLP endpoint https://xray.[AWSRegion].amazonaws.com/v1/traces. It uses the
+    AwsAuthSession to sign the exported request and inject SigV4 authentication directly into its headers.
+
+    See: https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-OTLPEndpoint.html
+    """
+
     def __init__(
         self,
         endpoint: Optional[str] = None,
diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_auth_session.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/common/test_aws_auth_session.py
similarity index 100%
rename from aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_auth_session.py
rename to aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/common/test_aws_auth_session.py
diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/logs/test_aws_cw_otlp_batch_log_record_processor.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/logs/test_aws_cw_otlp_batch_log_record_processor.py
new file mode 100644
index 000000000..2d019bce7
--- /dev/null
+++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/logs/test_aws_cw_otlp_batch_log_record_processor.py
@@ -0,0 +1,268 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+# SPDX-License-Identifier: Apache-2.0 +import time +import unittest +from typing import List +from unittest.mock import MagicMock, patch + +from amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor import ( + AwsCloudWatchOtlpBatchLogRecordProcessor, + BatchLogExportStrategy, +) +from opentelemetry._logs.severity import SeverityNumber +from opentelemetry.sdk._logs import LogData, LogRecord +from opentelemetry.sdk._logs.export import LogExportResult +from opentelemetry.sdk.util.instrumentation import InstrumentationScope +from opentelemetry.trace import TraceFlags +from opentelemetry.util.types import AnyValue + + +class TestAwsBatchLogRecordProcessor(unittest.TestCase): + + def setUp(self): + self.mock_exporter = MagicMock() + self.mock_exporter.export.return_value = LogExportResult.SUCCESS + + self.processor = AwsCloudWatchOtlpBatchLogRecordProcessor(exporter=self.mock_exporter) + + def test_process_log_data_nested_structure(self): + """Tests that the processor correctly handles nested structures (dict/list)""" + log_body = "X" * 400 + log_key = "test" + log_depth = 2 + + nested_dict_log = self.generate_test_log_data( + log_body=log_body, log_key=log_key, log_body_depth=log_depth, count=1, create_map=True + ) + nested_array_log = self.generate_test_log_data( + log_body=log_body, log_key=log_key, log_body_depth=log_depth, count=1, create_map=False + ) + + expected_dict_size = len(log_key) * log_depth + len(log_body) + expected_array_size = len(log_body) + + dict_size = self.processor._estimate_log_size(log=nested_dict_log[0], depth=log_depth) + array_size = self.processor._estimate_log_size(log=nested_array_log[0], depth=log_depth) + + self.assertEqual(dict_size - self.processor._BASE_LOG_BUFFER_BYTE_SIZE, expected_dict_size) + self.assertEqual(array_size - self.processor._BASE_LOG_BUFFER_BYTE_SIZE, expected_array_size) + + def test_process_log_data_with_attributes(self): + """Tests that the processor correctly handles both body and attributes""" + log_body = "test_body" + attr_key = "attr_key" + attr_value = "attr_value" + + record = LogRecord( + timestamp=int(time.time_ns()), + trace_id=0x123456789ABCDEF0123456789ABCDEF0, + span_id=0x123456789ABCDEF0, + trace_flags=TraceFlags(1), + severity_text="INFO", + severity_number=SeverityNumber.INFO, + body=log_body, + attributes={attr_key: attr_value}, + ) + log_data = LogData(log_record=record, instrumentation_scope=InstrumentationScope("test-scope", "1.0.0")) + + expected_size = len(log_body) + len(attr_key) + len(attr_value) + actual_size = self.processor._estimate_log_size(log_data) + + self.assertEqual(actual_size - self.processor._BASE_LOG_BUFFER_BYTE_SIZE, expected_size) + + def test_process_log_data_nested_structure_exceeds_depth(self): + """Tests that the processor cuts off calculation for nested structure that exceeds the depth limit""" + max_depth = 0 + calculated_body = "X" * 400 + log_body = { + "calculated": "X" * 400, + "restOfThisLogWillBeTruncated": {"truncated": {"test": "X" * self.processor._MAX_LOG_REQUEST_BYTE_SIZE}}, + } + + expected_size = self.processor._BASE_LOG_BUFFER_BYTE_SIZE + ( + len("calculated") + len(calculated_body) + len("restOfThisLogWillBeTruncated") + ) + + test_logs = self.generate_test_log_data(log_body=log_body, count=1) + + # Only calculates log size of up to depth of 0 + dict_size = self.processor._estimate_log_size(log=test_logs[0], depth=max_depth) + + self.assertEqual(dict_size, expected_size) + + def 
test_process_log_data_nested_structure_size_exceeds_max_log_size(self): + """Tests that the processor returns prematurely if the size already exceeds _MAX_LOG_REQUEST_BYTE_SIZE""" + # Should stop calculation at bigKey + biggerKey and not calculate the content of biggerKey + log_body = { + "bigKey": "X" * (self.processor._MAX_LOG_REQUEST_BYTE_SIZE), + "biggerKey": "X" * (self.processor._MAX_LOG_REQUEST_BYTE_SIZE * 100), + } + + expected_size = ( + self.processor._BASE_LOG_BUFFER_BYTE_SIZE + + self.processor._MAX_LOG_REQUEST_BYTE_SIZE + + len("bigKey") + + len("biggerKey") + ) + + nest_dict_log = self.generate_test_log_data(log_body=log_body, count=1, create_map=True) + nest_array_log = self.generate_test_log_data(log_body=log_body, count=1, create_map=False) + + dict_size = self.processor._estimate_log_size(log=nest_dict_log[0]) + array_size = self.processor._estimate_log_size(log=nest_array_log[0]) + + self.assertEqual(dict_size, expected_size) + self.assertEqual(array_size, expected_size) + + def test_process_log_data_primitive(self): + + primitives: List[AnyValue] = ["test", b"test", 1, 1.2, True, False, None, "深入 Python", "calfé"] + expected_sizes = [4, 4, 1, 3, 4, 5, 0, 2 * 4 + len(" Python"), 1 * 4 + len("calf")] + + for index, primitive in enumerate(primitives): + log = self.generate_test_log_data(log_body=primitive, count=1) + expected_size = self.processor._BASE_LOG_BUFFER_BYTE_SIZE + expected_sizes[index] + actual_size = self.processor._estimate_log_size(log[0]) + self.assertEqual(actual_size, expected_size) + + def test_process_log_data_with_cycle(self): + """Test that processor handles processing logs with circular references only once""" + cyclic_dict: dict = {"data": "test"} + cyclic_dict["self_ref"] = cyclic_dict + + log = self.generate_test_log_data(log_body=cyclic_dict, count=1) + expected_size = self.processor._BASE_LOG_BUFFER_BYTE_SIZE + len("data") + len("self_ref") + len("test") + actual_size = self.processor._estimate_log_size(log[0]) + self.assertEqual(actual_size, expected_size) + + @patch( + "amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor.attach", + return_value=MagicMock(), + ) + @patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor.detach") + @patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor.set_value") + def test_export_single_batch_under_size_limit(self, _, __, ___): + """Tests that export is only called once if a single batch is under the size limit""" + log_count = 10 + log_body = "test" + test_logs = self.generate_test_log_data(log_body=log_body, count=log_count) + total_data_size = 0 + + for log in test_logs: + size = self.processor._estimate_log_size(log) + total_data_size += size + self.processor._queue.appendleft(log) + + self.processor._export(batch_strategy=BatchLogExportStrategy.EXPORT_ALL) + args, _ = self.mock_exporter.export.call_args + actual_batch = args[0] + + self.assertLess(total_data_size, self.processor._MAX_LOG_REQUEST_BYTE_SIZE) + self.assertEqual(len(self.processor._queue), 0) + self.assertEqual(len(actual_batch), log_count) + self.mock_exporter.export.assert_called_once() + + @patch( + "amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor.attach", + return_value=MagicMock(), + ) + @patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor.detach") + 
@patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor.set_value") + def test_export_single_batch_all_logs_over_size_limit(self, _, __, ___): + """Should make multiple export calls of batch size 1 to export logs of size > 1 MB.""" + + large_log_body = "X" * (self.processor._MAX_LOG_REQUEST_BYTE_SIZE + 1) + test_logs = self.generate_test_log_data(log_body=large_log_body, count=15) + + for log in test_logs: + self.processor._queue.appendleft(log) + + self.processor._export(batch_strategy=BatchLogExportStrategy.EXPORT_ALL) + + self.assertEqual(len(self.processor._queue), 0) + self.assertEqual(self.mock_exporter.export.call_count, len(test_logs)) + + batches = self.mock_exporter.export.call_args_list + + for batch in batches: + self.assertEqual(len(batch[0]), 1) + + @patch( + "amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor.attach", + return_value=MagicMock(), + ) + @patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor.detach") + @patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor.set_value") + def test_export_single_batch_some_logs_over_size_limit(self, _, __, ___): + """Should make calls to export smaller sub-batch logs""" + large_log_body = "X" * (self.processor._MAX_LOG_REQUEST_BYTE_SIZE + 1) + small_log_body = "X" * ( + self.processor._MAX_LOG_REQUEST_BYTE_SIZE // 10 - self.processor._BASE_LOG_BUFFER_BYTE_SIZE + ) + + large_logs = self.generate_test_log_data(log_body=large_log_body, count=3) + small_logs = self.generate_test_log_data(log_body=small_log_body, count=12) + + # 1st, 2nd, 3rd batch = size 1 + # 4th batch = size 10 + # 5th batch = size 2 + test_logs = large_logs + small_logs + + for log in test_logs: + self.processor._queue.appendleft(log) + + self.processor._export(batch_strategy=BatchLogExportStrategy.EXPORT_ALL) + + self.assertEqual(len(self.processor._queue), 0) + self.assertEqual(self.mock_exporter.export.call_count, 5) + + batches = self.mock_exporter.export.call_args_list + + expected_sizes = { + 0: 1, # 1st batch (index 1) should have 1 log + 1: 1, # 2nd batch (index 1) should have 1 log + 2: 1, # 3rd batch (index 2) should have 1 log + 3: 10, # 4th batch (index 3) should have 10 logs + 4: 2, # 5th batch (index 4) should have 2 logs + } + + for index, call in enumerate(batches): + batch = call[0][0] + expected_size = expected_sizes[index] + self.assertEqual(len(batch), expected_size) + + @staticmethod + def generate_test_log_data( + log_body, + log_key="key", + log_body_depth=0, + count=5, + create_map=True, + ) -> List[LogData]: + + def generate_nested_value(depth, value, create_map=True) -> AnyValue: + if depth <= 0: + return value + + if create_map: + return {log_key: generate_nested_value(depth - 1, value, True)} + + return [generate_nested_value(depth - 1, value, False)] + + logs = [] + + for _ in range(count): + record = LogRecord( + timestamp=int(time.time_ns()), + trace_id=0x123456789ABCDEF0123456789ABCDEF0, + span_id=0x123456789ABCDEF0, + trace_flags=TraceFlags(1), + severity_text="INFO", + severity_number=SeverityNumber.INFO, + body=generate_nested_value(log_body_depth, log_body, create_map), + ) + + log_data = LogData(log_record=record, instrumentation_scope=InstrumentationScope("test-scope", "1.0.0")) + logs.append(log_data) + + return logs diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/logs/test_otlp_aws_logs_exporter.py 
b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/logs/test_otlp_aws_logs_exporter.py new file mode 100644 index 000000000..8623a6696 --- /dev/null +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/logs/test_otlp_aws_logs_exporter.py @@ -0,0 +1,224 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +import time +from unittest import TestCase +from unittest.mock import patch + +import requests +from requests.structures import CaseInsensitiveDict + +from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import _MAX_RETRYS, OTLPAwsLogExporter +from opentelemetry._logs.severity import SeverityNumber +from opentelemetry.sdk._logs import LogData, LogRecord +from opentelemetry.sdk._logs.export import LogExportResult +from opentelemetry.sdk.util.instrumentation import InstrumentationScope +from opentelemetry.trace import TraceFlags + + +class TestOTLPAwsLogsExporter(TestCase): + _ENDPOINT = "https://logs.us-west-2.amazonaws.com/v1/logs" + good_response = requests.Response() + good_response.status_code = 200 + + non_retryable_response = requests.Response() + non_retryable_response.status_code = 404 + + retryable_response_no_header = requests.Response() + retryable_response_no_header.status_code = 429 + + retryable_response_header = requests.Response() + retryable_response_header.headers = CaseInsensitiveDict({"Retry-After": "10"}) + retryable_response_header.status_code = 503 + + retryable_response_bad_header = requests.Response() + retryable_response_bad_header.headers = CaseInsensitiveDict({"Retry-After": "-12"}) + retryable_response_bad_header.status_code = 503 + + def setUp(self): + self.logs = self.generate_test_log_data() + self.exporter = OTLPAwsLogExporter(endpoint=self._ENDPOINT) + + @patch("requests.Session.post", return_value=good_response) + def test_export_success(self, mock_request): + """Tests that the exporter always compresses the serialized logs with gzip before exporting.""" + result = self.exporter.export(self.logs) + + mock_request.assert_called_once() + + _, kwargs = mock_request.call_args + data = kwargs.get("data", None) + + self.assertEqual(result, LogExportResult.SUCCESS) + + # Gzip first 10 bytes are reserved for metadata headers: + # https://www.loc.gov/preservation/digital/formats/fdd/fdd000599.shtml?loclr=blogsig + self.assertIsNotNone(data) + self.assertTrue(len(data) >= 10) + self.assertEqual(data[0:2], b"\x1f\x8b") + + @patch("requests.Session.post", return_value=good_response) + def test_should_not_export_if_shutdown(self, mock_request): + """Tests that no export request is made if the exporter is shutdown.""" + self.exporter.shutdown() + result = self.exporter.export(self.logs) + + mock_request.assert_not_called() + self.assertEqual(result, LogExportResult.FAILURE) + + @patch("requests.Session.post", return_value=non_retryable_response) + def test_should_not_export_again_if_not_retryable(self, mock_request): + """Tests that only one export request is made if the response status code is non-retryable.""" + result = self.exporter.export(self.logs) + mock_request.assert_called_once() + + self.assertEqual(result, LogExportResult.FAILURE) + + @patch( + "amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter.Event.wait", + side_effect=lambda x: False, + ) + @patch("requests.Session.post", return_value=retryable_response_no_header) + def 
test_should_export_again_with_backoff_if_retryable_and_no_retry_after_header(self, mock_request, mock_wait):
+        """Tests that multiple export requests are made with exponential backoff delays if the
+        response status code is retryable but there is no Retry-After header."""
+        self.exporter._timeout = 10000  # Large timeout to avoid early exit
+        result = self.exporter.export(self.logs)
+
+        self.assertEqual(mock_wait.call_count, _MAX_RETRYS - 1)
+
+        delays = mock_wait.call_args_list
+
+        for index, delay in enumerate(delays):
+            expected_base = 2**index
+            actual_delay = delay[0][0]
+            # Assert delay is within jitter range: base * [0.8, 1.2]
+            self.assertGreaterEqual(actual_delay, expected_base * 0.8)
+            self.assertLessEqual(actual_delay, expected_base * 1.2)
+
+        self.assertEqual(mock_request.call_count, _MAX_RETRYS)
+        self.assertEqual(result, LogExportResult.FAILURE)
+
+    @patch(
+        "amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter.Event.wait",
+        side_effect=lambda x: False,
+    )
+    @patch(
+        "requests.Session.post",
+        side_effect=[retryable_response_header, retryable_response_header, retryable_response_header, good_response],
+    )
+    def test_should_export_again_with_server_delay_if_retryable_and_retry_after_header(self, mock_request, mock_wait):
+        """Tests that multiple export requests are made with the server's suggested
+        delay if the response status code is retryable and there is a Retry-After header."""
+        self.exporter._timeout = 10000  # Large timeout to avoid early exit
+        result = self.exporter.export(self.logs)
+
+        delays = mock_wait.call_args_list
+
+        for delay in delays:
+            self.assertEqual(delay[0][0], 10)
+
+        self.assertEqual(mock_wait.call_count, 3)
+        self.assertEqual(mock_request.call_count, 4)
+        self.assertEqual(result, LogExportResult.SUCCESS)
+
+    @patch(
+        "amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter.Event.wait",
+        side_effect=lambda x: False,
+    )
+    @patch(
+        "requests.Session.post",
+        side_effect=[
+            retryable_response_bad_header,
+            retryable_response_bad_header,
+            retryable_response_bad_header,
+            good_response,
+        ],
+    )
+    def test_should_export_again_with_backoff_delay_if_retryable_and_bad_retry_after_header(
+        self, mock_request, mock_wait
+    ):
+        """Tests that multiple export requests are made with exponential backoff delays if the response status code is retryable
+ but the Retry-After header is invalid or malformed.""" + self.exporter._timeout = 10000 # Large timeout to avoid early exit + result = self.exporter.export(self.logs) + + delays = mock_wait.call_args_list + + for index, delay in enumerate(delays): + expected_base = 2**index + actual_delay = delay[0][0] + # Assert delay is within jitter range: base * [0.8, 1.2] + self.assertGreaterEqual(actual_delay, expected_base * 0.8) + self.assertLessEqual(actual_delay, expected_base * 1.2) + + self.assertEqual(mock_wait.call_count, 3) + self.assertEqual(mock_request.call_count, 4) + self.assertEqual(result, LogExportResult.SUCCESS) + + @patch("requests.Session.post", side_effect=[requests.exceptions.ConnectionError(), good_response]) + def test_export_connection_error_retry(self, mock_request): + """Tests that the exporter retries on ConnectionError.""" + result = self.exporter.export(self.logs) + + self.assertEqual(mock_request.call_count, 2) + self.assertEqual(result, LogExportResult.SUCCESS) + + @patch( + "amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter.Event.wait", + side_effect=lambda x: False, + ) + @patch("requests.Session.post", return_value=retryable_response_no_header) + def test_should_stop_retrying_when_deadline_exceeded(self, mock_request, mock_wait): + """Tests that the exporter stops retrying when the deadline is exceeded.""" + self.exporter._timeout = 5 # Short timeout to trigger deadline check + + with patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter.time") as mock_time: + # First call returns start time, subsequent calls simulate time passing + mock_time.side_effect = [0, 0, 1, 2, 4, 8] # Exponential backoff would be 1, 2, 4 seconds + + result = self.exporter.export(self.logs) + + # Should stop before max retries due to deadline + self.assertLess(mock_wait.call_count, _MAX_RETRYS) + self.assertLess(mock_request.call_count, _MAX_RETRYS + 1) + self.assertEqual(result, LogExportResult.FAILURE) + + # Verify total time passed is at the timeout limit + self.assertGreaterEqual(5, self.exporter._timeout) + + @patch( + "amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter.Event.wait", + side_effect=lambda x: True, + ) + @patch("requests.Session.post", return_value=retryable_response_no_header) + def test_export_interrupted_by_shutdown(self, mock_request, mock_wait): + """Tests that export can be interrupted by shutdown during retry wait.""" + self.exporter._timeout = 10000 + + result = self.exporter.export(self.logs) + + # Should make one request, then get interrupted during retry wait + self.assertEqual(mock_request.call_count, 1) + self.assertEqual(result, LogExportResult.FAILURE) + mock_wait.assert_called_once() + + @staticmethod + def generate_test_log_data(count=5): + logs = [] + for index in range(count): + record = LogRecord( + timestamp=int(time.time_ns()), + trace_id=int(f"0x{index + 1:032x}", 16), + span_id=int(f"0x{index + 1:016x}", 16), + trace_flags=TraceFlags(1), + severity_text="INFO", + severity_number=SeverityNumber.INFO, + body=f"Test log {index + 1}", + attributes={"test.attribute": f"value-{index + 1}"}, + ) + + log_data = LogData(log_record=record, instrumentation_scope=InstrumentationScope("test-scope", "1.0.0")) + + logs.append(log_data) + + return logs diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_otlp_aws_span_exporter.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/traces/test_otlp_aws_span_exporter.py similarity index 100% 
rename from aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_otlp_aws_span_exporter.py rename to aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/traces/test_otlp_aws_span_exporter.py diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py index b2eb70e60..66e168a57 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py @@ -26,6 +26,7 @@ OtlpLogHeaderSetting, _check_emf_exporter_enabled, _custom_import_sampler, + _customize_log_record_processor, _customize_logs_exporter, _customize_metric_exporters, _customize_resource, @@ -46,6 +47,11 @@ from amazon.opentelemetry.distro.aws_span_metrics_processor import AwsSpanMetricsProcessor from amazon.opentelemetry.distro.exporter.aws.metrics.aws_cloudwatch_emf_exporter import AwsCloudWatchEmfExporter from amazon.opentelemetry.distro.exporter.otlp.aws.common.aws_auth_session import AwsAuthSession + +# pylint: disable=line-too-long +from amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor import ( + AwsCloudWatchOtlpBatchLogRecordProcessor, +) from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter from amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter import OTLPAwsSpanExporter from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpSpanExporter @@ -505,6 +511,7 @@ def test_customize_span_exporter_sigv4(self): OTLPAwsSpanExporter, AwsAuthSession, Compression.NoCompression, + Resource.get_empty(), ) for config in bad_configs: @@ -515,6 +522,7 @@ def test_customize_span_exporter_sigv4(self): OTLPSpanExporter, Session, Compression.NoCompression, + Resource.get_empty(), ) self.assertIsInstance( @@ -618,13 +626,11 @@ def test_customize_logs_exporter_sigv4(self): config, _customize_logs_exporter, OTLPLogExporter(), OTLPLogExporter, Session, Compression.NoCompression ) - self.assertIsInstance( - _customize_logs_exporter(OTLPGrpcLogExporter(), Resource.get_empty()), OTLPGrpcLogExporter - ) + self.assertIsInstance(_customize_logs_exporter(OTLPGrpcLogExporter()), OTLPGrpcLogExporter) # Need to patch all of these to prevent some weird multi-threading error with the LogProvider @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.LoggingHandler", return_value=MagicMock()) - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.getLogger", return_value=MagicMock()) + @patch("logging.getLogger", return_value=MagicMock()) @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._customize_logs_exporter") @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.LoggerProvider", return_value=MagicMock()) @patch( @@ -903,19 +909,13 @@ def test_customize_metric_exporter(self): os.environ.pop("OTEL_METRIC_EXPORT_INTERVAL", None) def customize_exporter_test( - self, - config, - executor, - default_exporter, - expected_exporter_type, - expected_session, - expected_compression, + self, config, executor, default_exporter, expected_exporter_type, expected_session, expected_compression, *args ): for key, value in config.items(): os.environ[key] = value try: - result = executor(default_exporter, Resource.get_empty()) + result = executor(default_exporter, 
*args) self.assertIsInstance(result, expected_exporter_type) self.assertIsInstance(result._session, expected_session) self.assertEqual(result._compression, expected_compression) @@ -1015,6 +1015,18 @@ def test_validate_and_fetch_logs_header(self): # Clean up os.environ.pop(OTEL_EXPORTER_OTLP_LOGS_HEADERS, None) + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.is_agent_observability_enabled") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._is_aws_otlp_endpoint") + def test_customize_log_record_processor_with_agent_observability(self, mock_is_aws_endpoint, mock_is_agent_enabled): + """Test that AwsCloudWatchOtlpBatchLogRecordProcessor is used when agent observability is enabled""" + mock_exporter = MagicMock(spec=OTLPAwsLogExporter) + mock_is_agent_enabled.return_value = True + mock_is_aws_endpoint.return_value = True + + processor = _customize_log_record_processor(mock_exporter) + + self.assertIsInstance(processor, AwsCloudWatchOtlpBatchLogRecordProcessor) + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._validate_and_fetch_logs_header") @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.is_installed") def test_create_emf_exporter(self, mock_is_installed, mock_validate):
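
Usage sketch: a minimal, hypothetical wiring of the new exporter and processor, assuming a
us-east-1 CloudWatch Logs OTLP endpoint. In the distro this wiring is performed by
_init_logging/_customize_log_record_processor (and only when agent observability is enabled),
not by hand:

    from amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor import (
        AwsCloudWatchOtlpBatchLogRecordProcessor,
    )
    from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
    from opentelemetry.sdk._logs import LoggerProvider

    # SigV4-signing exporter pointed at the CloudWatch Logs OTLP endpoint (region assumed).
    exporter = OTLPAwsLogExporter(endpoint="https://logs.us-east-1.amazonaws.com/v1/logs")

    # Size-aware batch processor: splits each export into sub-batches estimated at <= 1 MB.
    provider = LoggerProvider()
    provider.add_log_record_processor(AwsCloudWatchOtlpBatchLogRecordProcessor(exporter=exporter))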