Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ For any change that affects end users of this package, please add an entry under
If your change does not need a CHANGELOG entry, add the "skip changelog" label to your PR.

## Unreleased
- Add Service and Environment dimensions to EMF metrics when Application Signals EMF export is enabled
([#548](https://github.com/aws-observability/aws-otel-python-instrumentation/pull/548))
- Add Resource and CFN Attributes for Bedrock AgentCore spans
([#495](https://github.com/aws-observability/aws-otel-python-instrumentation/pull/495))
- Add botocore instrumentation extension for Bedrock AgentCore services with span attributes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
import json
import logging
import math
import os
import time
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple

from amazon.opentelemetry.distro._aws_resource_attribute_configurator import get_service_attribute
from opentelemetry.sdk.metrics import Counter
from opentelemetry.sdk.metrics import Histogram as HistogramInstr
from opentelemetry.sdk.metrics import ObservableCounter, ObservableGauge, ObservableUpDownCounter, UpDownCounter
Expand All @@ -28,10 +30,19 @@
)
from opentelemetry.sdk.metrics.view import ExponentialBucketHistogramAggregation
from opentelemetry.sdk.resources import Resource
from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.util.types import Attributes

logger = logging.getLogger(__name__)

# Dimension name constants
SERVICE_DIMENSION_NAME: str = "Service"
ENVIRONMENT_DIMENSION_NAME: str = "Environment"

# Constants
LAMBDA_DEFAULT: str = "lambda:default"
UNKNOWN_SERVICE: str = "UnknownService"


class MetricRecord:
"""The metric data unified representation of all OTel metrics for OTel to CW EMF conversion."""
Expand Down Expand Up @@ -184,6 +195,53 @@ def _get_dimension_names(self, attributes: Attributes) -> List[str]:
# For now, use all attributes as dimensions
return list(attributes.keys())

def _has_dimension_case_insensitive(self, dimension_names: List[str], dimension_to_check: str) -> bool:
"""Check if dimension already exists (case-insensitive match)."""
dimension_lower = dimension_to_check.lower()
return any(dim.lower() == dimension_lower for dim in dimension_names)

@staticmethod
def _is_application_signals_emf_export_enabled() -> bool:
"""Check if Application Signals EMF export is enabled.

Returns True only if BOTH:
- OTEL_AWS_APPLICATION_SIGNALS_ENABLED is true
- OTEL_AWS_APPLICATION_SIGNALS_EMF_EXPORT_ENABLED is true
"""
app_signals_enabled = os.environ.get("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", "false").lower() == "true"
emf_export_enabled = (
os.environ.get("OTEL_AWS_APPLICATION_SIGNALS_EMF_EXPORT_ENABLED", "false").lower() == "true"
)
return app_signals_enabled and emf_export_enabled

def _add_application_signals_dimensions(
self, dimension_names: List[str], emf_log: Dict, resource: Resource
) -> None:
"""Add Service and Environment dimensions if not already present (case-insensitive)."""
if not self._is_application_signals_emf_export_enabled():
return

# Add Service dimension if not already set by user
if not self._has_dimension_case_insensitive(dimension_names, SERVICE_DIMENSION_NAME):
if resource:
service_name, _ = get_service_attribute(resource)
else:
service_name = UNKNOWN_SERVICE
dimension_names.insert(0, SERVICE_DIMENSION_NAME)
emf_log[SERVICE_DIMENSION_NAME] = str(service_name)

# Add Environment dimension if not already set by user
if not self._has_dimension_case_insensitive(dimension_names, ENVIRONMENT_DIMENSION_NAME):
environment_value = None
if resource and resource.attributes:
environment_value = resource.attributes.get(ResourceAttributes.DEPLOYMENT_ENVIRONMENT)
if not environment_value:
environment_value = LAMBDA_DEFAULT
# Insert after Service if it exists, otherwise at the beginning
insert_pos = 1 if SERVICE_DIMENSION_NAME in dimension_names else 0
dimension_names.insert(insert_pos, ENVIRONMENT_DIMENSION_NAME)
emf_log[ENVIRONMENT_DIMENSION_NAME] = str(environment_value)

def _get_attributes_key(self, attributes: Attributes) -> str:
"""
Create a hashable key from attributes for grouping metrics.
Expand Down Expand Up @@ -493,6 +551,9 @@ def _create_emf_log(
for name, value in all_attributes.items():
emf_log[name] = str(value)

# Add Service and Environment dimensions if Application Signals EMF export is enabled
self._add_application_signals_dimensions(dimension_names, emf_log, resource)

# Add CloudWatch Metrics if we have metrics, include dimensions only if they exist
if metric_definitions:
cloudwatch_metric = {"Namespace": self.namespace, "Metrics": metric_definitions}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

import os
import unittest
from unittest.mock import Mock
from unittest.mock import Mock, patch

from amazon.opentelemetry.distro.exporter.aws.metrics.base_emf_exporter import BaseEmfExporter, MetricRecord
from opentelemetry.sdk.metrics.export import MetricExportResult
Expand Down Expand Up @@ -286,6 +287,173 @@ def test_export_failure_handling(self):
result = self.exporter.export(metrics_data)
self.assertEqual(result, MetricExportResult.FAILURE)

def test_has_dimension_case_insensitive(self):
"""Test case-insensitive dimension checking."""
dimension_names = ["Service", "Environment", "operation"]

# Exact match
self.assertTrue(self.exporter._has_dimension_case_insensitive(dimension_names, "Service"))
self.assertTrue(self.exporter._has_dimension_case_insensitive(dimension_names, "Environment"))

# Case variations
self.assertTrue(self.exporter._has_dimension_case_insensitive(dimension_names, "service"))
self.assertTrue(self.exporter._has_dimension_case_insensitive(dimension_names, "SERVICE"))
self.assertTrue(self.exporter._has_dimension_case_insensitive(dimension_names, "environment"))
self.assertTrue(self.exporter._has_dimension_case_insensitive(dimension_names, "ENVIRONMENT"))
self.assertTrue(self.exporter._has_dimension_case_insensitive(dimension_names, "OPERATION"))

# Non-existent dimension
self.assertFalse(self.exporter._has_dimension_case_insensitive(dimension_names, "NotExists"))

# Empty list
self.assertFalse(self.exporter._has_dimension_case_insensitive([], "Service"))

def test_add_application_signals_dimensions_disabled(self):
"""Test that dimensions are not added when feature is disabled."""
# Default exporter has feature disabled
dimension_names = ["operation"]
emf_log = {}
resource = Resource.create({"service.name": "my-service", "deployment.environment": "production"})

self.exporter._add_application_signals_dimensions(dimension_names, emf_log, resource)

# Dimensions should not be added
self.assertEqual(dimension_names, ["operation"])
self.assertNotIn("Service", emf_log)
self.assertNotIn("Environment", emf_log)

@patch.dict(
os.environ,
{"OTEL_AWS_APPLICATION_SIGNALS_ENABLED": "true", "OTEL_AWS_APPLICATION_SIGNALS_EMF_EXPORT_ENABLED": "true"},
)
def test_add_application_signals_dimensions_enabled(self):
"""Test that dimensions are added when feature is enabled."""
exporter = ConcreteEmfExporter(namespace="TestNamespace")
dimension_names = ["operation"]
emf_log = {}
resource = Resource.create({"service.name": "my-service", "deployment.environment": "production"})

exporter._add_application_signals_dimensions(dimension_names, emf_log, resource)

# Service and Environment should be added at the beginning
self.assertEqual(dimension_names, ["Service", "Environment", "operation"])
self.assertEqual(emf_log["Service"], "my-service")
self.assertEqual(emf_log["Environment"], "production")

@patch.dict(
os.environ,
{"OTEL_AWS_APPLICATION_SIGNALS_ENABLED": "true", "OTEL_AWS_APPLICATION_SIGNALS_EMF_EXPORT_ENABLED": "true"},
)
def test_add_application_signals_dimensions_fallback_values(self):
"""Test fallback values when resource attributes are not available."""
exporter = ConcreteEmfExporter(namespace="TestNamespace")
dimension_names = ["operation"]
emf_log = {}
# Resource without deployment.environment
resource = Resource.create({"service.name": "my-service"})

exporter._add_application_signals_dimensions(dimension_names, emf_log, resource)

# Service should use service.name, Environment should fallback to lambda:default
self.assertIn("Service", dimension_names)
self.assertIn("Environment", dimension_names)
self.assertEqual(emf_log["Service"], "my-service")
self.assertEqual(emf_log["Environment"], "lambda:default")

@patch.dict(
os.environ,
{"OTEL_AWS_APPLICATION_SIGNALS_ENABLED": "true", "OTEL_AWS_APPLICATION_SIGNALS_EMF_EXPORT_ENABLED": "true"},
)
def test_add_application_signals_dimensions_no_resource(self):
"""Test fallback when resource is None."""
exporter = ConcreteEmfExporter(namespace="TestNamespace")
dimension_names = ["operation"]
emf_log = {}

exporter._add_application_signals_dimensions(dimension_names, emf_log, None)

# Should use fallback values
self.assertIn("Service", dimension_names)
self.assertIn("Environment", dimension_names)
self.assertEqual(emf_log["Service"], "UnknownService")
self.assertEqual(emf_log["Environment"], "lambda:default")

@patch.dict(
os.environ,
{"OTEL_AWS_APPLICATION_SIGNALS_ENABLED": "true", "OTEL_AWS_APPLICATION_SIGNALS_EMF_EXPORT_ENABLED": "true"},
)
def test_add_application_signals_dimensions_service_already_set(self):
"""Test that Service dimension is not overwritten if already set (case-insensitive)."""
exporter = ConcreteEmfExporter(namespace="TestNamespace")

# User has set "service" (lowercase)
dimension_names = ["service", "operation"]
emf_log = {"service": "user-service"}
resource = Resource.create({"service.name": "my-service", "deployment.environment": "production"})

exporter._add_application_signals_dimensions(dimension_names, emf_log, resource)

# Service should NOT be added (case-insensitive match), but Environment should be
self.assertIn("Environment", dimension_names)
self.assertNotIn("Service", dimension_names) # "Service" not added because "service" exists
self.assertEqual(emf_log.get("service"), "user-service") # User value preserved
self.assertEqual(emf_log.get("Environment"), "production")

@patch.dict(
os.environ,
{"OTEL_AWS_APPLICATION_SIGNALS_ENABLED": "true", "OTEL_AWS_APPLICATION_SIGNALS_EMF_EXPORT_ENABLED": "true"},
)
def test_add_application_signals_dimensions_environment_already_set(self):
"""Test that Environment dimension is not overwritten if already set (case-insensitive)."""
exporter = ConcreteEmfExporter(namespace="TestNamespace")

# User has set "ENVIRONMENT" (uppercase)
dimension_names = ["ENVIRONMENT", "operation"]
emf_log = {"ENVIRONMENT": "user-environment"}
resource = Resource.create({"service.name": "my-service", "deployment.environment": "production"})

exporter._add_application_signals_dimensions(dimension_names, emf_log, resource)

# Environment should NOT be added (case-insensitive match), but Service should be
self.assertIn("Service", dimension_names)
self.assertNotIn("Environment", dimension_names) # "Environment" not added because "ENVIRONMENT" exists
self.assertEqual(emf_log.get("Service"), "my-service")
self.assertEqual(emf_log.get("ENVIRONMENT"), "user-environment") # User value preserved

@patch.dict(
os.environ,
{"OTEL_AWS_APPLICATION_SIGNALS_ENABLED": "true", "OTEL_AWS_APPLICATION_SIGNALS_EMF_EXPORT_ENABLED": "true"},
)
def test_create_emf_log_with_application_signals_enabled(self):
"""Test EMF log creation with Application Signals EMF export enabled."""
exporter = ConcreteEmfExporter(namespace="TestNamespace")

record = exporter._create_metric_record("test_metric", "Count", "Test")
record.value = 50.0
record.timestamp = 1234567890
record.attributes = {"operation": "test"}

records = [record]
resource = Resource.create(
{
"service.name": "test-service",
"deployment.environment": "production",
}
)

result = exporter._create_emf_log(records, resource, 1234567890)

# Check that Service and Environment dimensions were added
self.assertEqual(result["Service"], "test-service")
self.assertEqual(result["Environment"], "production")

# Check CloudWatch metrics dimensions include Service and Environment
cw_metrics = result["_aws"]["CloudWatchMetrics"][0]
dimensions = cw_metrics["Dimensions"][0]
self.assertIn("Service", dimensions)
self.assertIn("Environment", dimensions)
self.assertIn("operation", dimensions)


if __name__ == "__main__":
unittest.main()
Original file line number Diff line number Diff line change
Expand Up @@ -1327,7 +1327,9 @@ def test_create_emf_exporter_lambda_without_valid_headers(
result = _create_emf_exporter()

self.assertEqual(result, mock_exporter_instance)
mock_console_exporter.assert_called_once_with(namespace="test-namespace")
mock_console_exporter.assert_called_once_with(
namespace="test-namespace",
)

@patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._fetch_logs_header")
@patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._is_lambda_environment")
Expand Down Expand Up @@ -1474,7 +1476,9 @@ def test_create_emf_exporter_lambda_without_valid_headers_none_namespace(
result = _create_emf_exporter()

self.assertEqual(result, mock_exporter_instance)
mock_console_exporter.assert_called_once_with(namespace=None)
mock_console_exporter.assert_called_once_with(
namespace=None,
)

@patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._fetch_logs_header")
@patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._is_lambda_environment")
Expand Down
4 changes: 4 additions & 0 deletions lambda-layer/src/otel-instrument
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ if [ -z "${OTEL_AWS_APPLICATION_SIGNALS_ENABLED}" ]; then
export OTEL_AWS_APPLICATION_SIGNALS_ENABLED="true";
fi

if [ -z "${OTEL_AWS_APPLICATION_SIGNALS_EMF_EXPORT_ENABLED}" ]; then
export OTEL_AWS_APPLICATION_SIGNALS_EMF_EXPORT_ENABLED="true";
fi

# - If Application Signals is enabled

if [ "${OTEL_AWS_APPLICATION_SIGNALS_ENABLED}" = "true" ]; then
Expand Down