Add AwsBatchLogProcessor and OtlpAwsLogExporter Logs Pipeline #402
The first diff updates the distro configurator module (the file defining `AwsOpenTelemetryConfigurator`) to wire in the new logs pipeline:

```diff
@@ -1,10 +1,10 @@
 # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 # SPDX-License-Identifier: Apache-2.0
 # Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License.
+import logging
 import os
 import re
-from logging import NOTSET, Logger, getLogger
-from typing import ClassVar, Dict, List, Type, Union
+from typing import ClassVar, Dict, List, Optional, Type, Union
 
 from importlib_metadata import version
 from typing_extensions import override
@@ -22,12 +22,16 @@
     AwsMetricAttributesSpanExporterBuilder,
 )
 from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder
+from amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor import (
+    AwsCloudWatchOtlpBatchLogRecordProcessor,
+)
+from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
 from amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter import OTLPAwsSpanExporter
 from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpSpanExporter
 from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler
 from amazon.opentelemetry.distro.scope_based_exporter import ScopeBasedPeriodicExportingMetricReader
 from amazon.opentelemetry.distro.scope_based_filtering_view import ScopeBasedRetainingView
 from opentelemetry._events import set_event_logger_provider
 from opentelemetry._logs import get_logger_provider, set_logger_provider
 from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
 from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter
@@ -42,7 +46,9 @@
     _import_id_generator,
     _import_sampler,
     _OTelSDKConfigurator,
+    _patch_basic_config,
 )
+from opentelemetry.sdk._events import EventLoggerProvider
 from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
 from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, LogExporter
 from opentelemetry.sdk.environment_variables import (
@@ -102,7 +108,7 @@
 # UDP package size is not larger than 64KB
 LAMBDA_SPAN_EXPORT_BATCH_SIZE = 10
 
-_logger: Logger = getLogger(__name__)
+_logger: logging.Logger = logging.getLogger(__name__)
 
 
 class AwsOpenTelemetryConfigurator(_OTelSDKConfigurator):
@@ -132,7 +138,7 @@ def _configure(self, **kwargs):
 # The OpenTelemetry Authors code
 # Long term, we wish to contribute this to upstream to improve initialization customizability and reduce dependency on
 # internal logic.
-def _initialize_components():
+def _initialize_components(setup_logging_handler: Optional[bool] = None):
     trace_exporters, metric_exporters, log_exporters = _import_exporters(
         _get_exporter_names("traces"),
         _get_exporter_names("metrics"),
@@ -169,32 +175,37 @@ def _initialize_components():
         resource=resource,
     )
     _init_metrics(metric_exporters, resource)
-    logging_enabled = os.getenv(_OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED, "false")
-    if logging_enabled.strip().lower() == "true":
-        _init_logging(log_exporters, resource)
+
+    if setup_logging_handler is None:
+        setup_logging_handler = (
+            os.getenv(_OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED, "false").strip().lower() == "true"
+        )
+    _init_logging(log_exporters, resource, setup_logging_handler)
 
 
 def _init_logging(
-    exporters: Dict[str, Type[LogExporter]],
-    resource: Resource = None,
+    exporters: dict[str, Type[LogExporter]],
+    resource: Optional[Resource] = None,
+    setup_logging_handler: bool = True,
 ):
 
     # Provides a default OTLP log exporter when none is specified.
     # This is the behavior for the logs exporters for other languages.
     if not exporters:
         exporters = {"otlp": OTLPLogExporter}
 
     provider = LoggerProvider(resource=resource)
     set_logger_provider(provider)
 
     for _, exporter_class in exporters.items():
-        exporter_args: Dict[str, any] = {}
-        log_exporter = _customize_logs_exporter(exporter_class(**exporter_args), resource)
-        provider.add_log_record_processor(BatchLogRecordProcessor(exporter=log_exporter))
+        exporter_args = {}
+        log_exporter: LogExporter = _customize_logs_exporter(exporter_class(**exporter_args))
+        log_processor = _customize_log_record_processor(log_exporter)
+        provider.add_log_record_processor(log_processor)
 
     event_logger_provider = EventLoggerProvider(logger_provider=provider)
     set_event_logger_provider(event_logger_provider)
 
-    handler = LoggingHandler(level=NOTSET, logger_provider=provider)
-
-    getLogger().addHandler(handler)
+    if setup_logging_handler:
+        _patch_basic_config()
+
+        # Add OTel handler
+        handler = LoggingHandler(level=logging.NOTSET, logger_provider=provider)
+        logging.getLogger().addHandler(handler)
 
 
 def _init_tracing(
@@ -383,7 +394,14 @@ def _customize_span_exporter(span_exporter: SpanExporter, resource: Resource) ->
     return AwsMetricAttributesSpanExporterBuilder(span_exporter, resource).build()
 
 
-def _customize_logs_exporter(log_exporter: LogExporter, resource: Resource) -> LogExporter:
+def _customize_log_record_processor(log_exporter: LogExporter):
+    if isinstance(log_exporter, OTLPAwsLogExporter) and is_agent_observability_enabled():
+        return AwsCloudWatchOtlpBatchLogRecordProcessor(exporter=log_exporter)
+
+    return BatchLogRecordProcessor(exporter=log_exporter)
+
+
+def _customize_logs_exporter(log_exporter: LogExporter) -> LogExporter:
    logs_endpoint = os.environ.get(OTEL_EXPORTER_OTLP_LOGS_ENDPOINT)
 
     if _is_aws_otlp_endpoint(logs_endpoint, "logs"):
@@ -532,7 +550,7 @@ def _is_lambda_environment():
     return AWS_LAMBDA_FUNCTION_NAME_CONFIG in os.environ
 
 
-def _is_aws_otlp_endpoint(otlp_endpoint: str = None, service: str = "xray") -> bool:
+def _is_aws_otlp_endpoint(otlp_endpoint: Optional[str] = None, service: str = "xray") -> bool:
     """Is the given endpoint an AWS OTLP endpoint?"""
 
     pattern = AWS_TRACES_OTLP_ENDPOINT_PATTERN if service == "xray" else AWS_LOGS_OTLP_ENDPOINT_PATTERN
```

> **Review comment** on the `_is_aws_otlp_endpoint` signature change: Generally speaking - keep refactors (even small things like this) in separate PRs, it reduces overhead for the reviewer and makes PRs faster overall. Fine to keep this one line change in this PR for now.
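Taken together, the wiring above means the processor choice hinges on two runtime conditions: the logs endpoint must match the AWS OTLP pattern (so `_customize_logs_exporter` returns an `OTLPAwsLogExporter`) and agent observability must be enabled. A hedged sketch of how a reviewer might poke at this locally follows; the configurator module path and the `AGENT_OBSERVABILITY_ENABLED` variable name are assumptions not shown in this diff, and the endpoint URL is illustrative:

```python
# Sketch only: requires the distro (and its AWS dependencies) installed.
import os

from amazon.opentelemetry.distro.aws_opentelemetry_configurator import (  # assumed module path
    _customize_log_record_processor,
    _customize_logs_exporter,
)
from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter

# Point logs at a CloudWatch-style OTLP endpoint and enable agent observability.
# The endpoint env var name comes from the diff; the second name is an assumption.
os.environ["OTEL_EXPORTER_OTLP_LOGS_ENDPOINT"] = "https://logs.us-east-1.amazonaws.com/v1/logs"
os.environ["AGENT_OBSERVABILITY_ENABLED"] = "true"

exporter = _customize_logs_exporter(OTLPLogExporter())
processor = _customize_log_record_processor(exporter)

# With both conditions met, this should be the size-aware processor;
# otherwise it falls back to the plain BatchLogRecordProcessor.
print(type(processor).__name__)
```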
The second file is new: `aws_batch_log_record_processor.py` (the module path matches the import added above), implementing the size-aware batch processor.

```python
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

import logging
from typing import List, Mapping, Optional, Sequence, cast

from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
from opentelemetry.context import _SUPPRESS_INSTRUMENTATION_KEY, attach, detach, set_value
from opentelemetry.sdk._logs import LogData
from opentelemetry.sdk._logs._internal.export import BatchLogExportStrategy
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.util.types import AnyValue

_logger = logging.getLogger(__name__)


class AwsCloudWatchOtlpBatchLogRecordProcessor(BatchLogRecordProcessor):
    """
    Custom implementation of BatchLogRecordProcessor that manages log record batching
    with size-based constraints to prevent exceeding AWS CloudWatch Logs OTLP endpoint request size limits.

    This processor still exports all logs up to _max_export_batch_size, but rather than doing exactly
    one export, it estimates log sizes and performs multiple batch exports,
    where each exported batch satisfies an additional constraint:

    If the batch to be exported has a data size > 1 MB, it is split into multiple
    sub-batch exports, each with a data size <= 1 MB.

    The one exception: if a single log by itself exceeds 1 MB, its sub-batch contains exactly that one log.
    """

    _BASE_LOG_BUFFER_BYTE_SIZE = (
        1000  # Buffer size in bytes to account for log metadata not included in the body or attribute size calculation
    )
    _MAX_LOG_REQUEST_BYTE_SIZE = (
        1048576  # Maximum uncompressed/unserialized bytes per request -
        # https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-OTLPEndpoint.html
    )

    def __init__(
        self,
        exporter: OTLPAwsLogExporter,
        schedule_delay_millis: Optional[float] = None,
        max_export_batch_size: Optional[int] = None,
        export_timeout_millis: Optional[float] = None,
        max_queue_size: Optional[int] = None,
    ):

        super().__init__(
            exporter=exporter,
            schedule_delay_millis=schedule_delay_millis,
            max_export_batch_size=max_export_batch_size,
            export_timeout_millis=export_timeout_millis,
            max_queue_size=max_queue_size,
        )

        self._exporter = exporter

    def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
        """
        Explicitly overrides the upstream _export method to add AWS CloudWatch size-based batching.
        See:
        https://github.com/open-telemetry/opentelemetry-python/blob/bb21ebd46d070c359eee286c97bdf53bfd06759d/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py#L143

        Preserves the existing batching behavior, but eagerly exports smaller sub-batches whenever
        the data size of the batch reaches AWS CloudWatch's maximum request size limit of 1 MB.

        - The data size of exported batches will ALWAYS be <= 1 MB, except for the case below:
        - If the data size of an exported batch is ever > 1 MB, then the batch size is guaranteed to be 1.
        """
        with self._export_lock:
            iteration = 0
            # We could see concurrent export calls from worker and force_flush. We call _should_export_batch
            # once the lock is obtained to see if we still need to make the requested export.
            while self._should_export_batch(batch_strategy, iteration):
                iteration += 1
                token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
                try:
                    batch_length = min(self._max_export_batch_size, len(self._queue))
                    batch_data_size = 0
                    batch = []

                    for _ in range(batch_length):
                        log_data: LogData = self._queue.pop()
                        log_size = self._estimate_log_size(log_data)

                        # Flush the current sub-batch before it would exceed the 1 MB limit.
                        if batch and (batch_data_size + log_size > self._MAX_LOG_REQUEST_BYTE_SIZE):
                            self._exporter.export(batch)
                            batch_data_size = 0
                            batch = []

                        batch_data_size += log_size
                        batch.append(log_data)

                    if batch:
                        self._exporter.export(batch)
                except Exception as exception:  # pylint: disable=broad-exception-caught
                    _logger.exception("Exception while exporting logs: %s", exception)
                detach(token)
```
> **Review comment** on `_estimate_log_size`: I ran out of time and have not reviewed this method.

```python
    def _estimate_log_size(self, log: LogData, depth: int = 3) -> int:
        """
        Estimates the size in bytes of a log by calculating the size of its body and its attributes
        and adding a buffer amount to account for other log metadata information.
        Processes complex log structures up to the specified depth limit.
        If the depth limit of the log structure is exceeded, returns the truncated calculation
        of everything up to that point.

        Args:
            log: The Log object to calculate size for
            depth: Maximum depth to traverse in nested structures (default: 3)

        Returns:
            int: The estimated size of the log object in bytes
        """

        # Use a queue to prevent excessive recursive calls.
        # We calculate based on the size of the log record body and attributes for the log.
        queue: List[tuple[AnyValue, int]] = [(log.log_record.body, 0), (log.log_record.attributes, -1)]

        size: int = self._BASE_LOG_BUFFER_BYTE_SIZE

        while queue:
            new_queue: List[tuple[AnyValue, int]] = []

            for data in queue:
                # Small optimization: stop calculating the size once it reaches the 1 MB limit.
                if size >= self._MAX_LOG_REQUEST_BYTE_SIZE:
                    return size

                next_val, current_depth = data

                if isinstance(next_val, (str, bytes)):
                    size += len(next_val)
                    continue

                # bool must be checked before int, since bool is a subclass of int.
                if isinstance(next_val, bool):
                    size += 4 if next_val else 5  # len("true") / len("false")
                    continue

                if isinstance(next_val, (float, int)):
                    size += len(str(next_val))
                    continue

                if current_depth <= depth:
                    if isinstance(next_val, Sequence):
                        for content in next_val:
                            new_queue.append((cast(AnyValue, content), current_depth + 1))

                    if isinstance(next_val, Mapping):
                        for key, content in next_val.items():
                            size += len(key)
                            new_queue.append((content, current_depth + 1))
                else:
                    _logger.debug(
                        "Max log depth of %s exceeded. Log data size will not be accurately calculated.", depth
                    )

            queue = new_queue

        return size
```
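To make the estimate concrete, here is a worked example of the arithmetic for a flat log, with no SDK objects involved; the numbers simply follow the algorithm above for inputs that never recurse. A body of `"hello"` contributes 5 bytes, an attribute entry `"a": "bcd"` contributes 1 + 3, and the fixed 1000-byte metadata buffer is always added:

```python
# Standalone sketch of the size estimate for a flat body/attributes pair;
# mirrors _estimate_log_size when nothing is nested.
body = "hello"
attributes = {"a": "bcd"}

estimate = 1000 + len(body) + sum(len(key) + len(value) for key, value in attributes.items())
assert estimate == 1009  # 1000 (buffer) + 5 (body) + 1 (key) + 3 (value)
```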
> **Review comment:** General guidance for code within `# The OpenTelemetry Authors code` - this code should be as identical to upstream as possible. If you need to modify this code, you should call a helper function to accomplish your goal. This helps us understand where and why code is different from upstream.
>
> In this case, you are modifying `_init_logging`; when I look at that method, I see it already is substantially different from upstream. I see no explanation given in the code, nor do I see one provided in the previous PR that added this (#358), despite an explicit question raised here: #358 (comment). Why is this code different from upstream?
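For context, a minimal sketch of the convention the reviewer is describing (all names here are illustrative, not the distro's actual API): the upstream-derived body stays byte-for-byte close to upstream, and any distro-specific behavior is funneled through one clearly named helper, so a diff against upstream shows only the helper call.

```python
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, ConsoleLogExporter


def _customize_log_record_processor_sketch(log_exporter):
    # The single, clearly named divergence point: distro-specific processor
    # selection lives here instead of inside the upstream-derived function.
    return BatchLogRecordProcessor(exporter=log_exporter)


def _init_logging_sketch():
    # Mirrors the upstream shape; the only non-upstream line is the helper call.
    provider = LoggerProvider()
    provider.add_log_record_processor(_customize_log_record_processor_sketch(ConsoleLogExporter()))
    return provider


provider = _init_logging_sketch()
```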