Skip to content

feat: Extract resource arn and remote resource access key for cross-account support #396

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@

# AWS_#_NAME attributes are not supported in Python because they are not part of the Semantic Conventions.
# TODO: Move to Semantic Conventions when these attributes are added.
AWS_AUTH_ACCESS_KEY: str = "aws.auth.account.access_key"
AWS_AUTH_REGION: str = "aws.auth.region"
AWS_SQS_QUEUE_URL: str = "aws.sqs.queue.url"
AWS_SQS_QUEUE_NAME: str = "aws.sqs.queue.name"
AWS_KINESIS_STREAM_ARN: str = "aws.kinesis.stream.arn"
AWS_KINESIS_STREAM_NAME: str = "aws.kinesis.stream.name"
AWS_BEDROCK_DATA_SOURCE_ID: str = "aws.bedrock.data_source.id"
AWS_BEDROCK_KNOWLEDGE_BASE_ID: str = "aws.bedrock.knowledge_base.id"
Expand All @@ -33,3 +36,7 @@
AWS_LAMBDA_FUNCTION_NAME: str = "aws.lambda.function.name"
AWS_LAMBDA_RESOURCEMAPPING_ID: str = "aws.lambda.resource_mapping.id"
AWS_LAMBDA_FUNCTION_ARN: str = "aws.lambda.function.arn"
AWS_DYNAMODB_TABLE_ARN: str = "aws.dynamodb.table.arn"
AWS_REMOTE_RESOURCE_ACCESS_KEY: str = "aws.remote.resource.account.access_key"
AWS_REMOTE_RESOURCE_ACCOUNT_ID: str = "aws.remote.resource.account.id"
AWS_REMOTE_RESOURCE_REGION: str = "aws.remote.resource.region"
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@
from urllib.parse import ParseResult, urlparse

from amazon.opentelemetry.distro._aws_attribute_keys import (
AWS_AUTH_ACCESS_KEY,
AWS_AUTH_REGION,
AWS_BEDROCK_AGENT_ID,
AWS_BEDROCK_DATA_SOURCE_ID,
AWS_BEDROCK_GUARDRAIL_ARN,
AWS_BEDROCK_GUARDRAIL_ID,
AWS_BEDROCK_KNOWLEDGE_BASE_ID,
AWS_CLOUDFORMATION_PRIMARY_IDENTIFIER,
AWS_DYNAMODB_TABLE_ARN,
AWS_KINESIS_STREAM_ARN,
AWS_KINESIS_STREAM_NAME,
AWS_LAMBDA_FUNCTION_ARN,
AWS_LAMBDA_FUNCTION_NAME,
Expand All @@ -22,7 +26,10 @@
AWS_REMOTE_DB_USER,
AWS_REMOTE_ENVIRONMENT,
AWS_REMOTE_OPERATION,
AWS_REMOTE_RESOURCE_ACCESS_KEY,
AWS_REMOTE_RESOURCE_ACCOUNT_ID,
AWS_REMOTE_RESOURCE_IDENTIFIER,
AWS_REMOTE_RESOURCE_REGION,
AWS_REMOTE_RESOURCE_TYPE,
AWS_REMOTE_SERVICE,
AWS_SECRETSMANAGER_SECRET_ARN,
Expand Down Expand Up @@ -56,6 +63,7 @@
SERVICE_METRIC,
MetricAttributeGenerator,
)
from amazon.opentelemetry.distro.regional_resource_arn_parser import RegionalResourceArnParser
from amazon.opentelemetry.distro.sqs_url_parser import SqsUrlParser
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import BoundedAttributes, ReadableSpan
Expand Down Expand Up @@ -148,7 +156,11 @@ def _generate_dependency_metric_attributes(span: ReadableSpan, resource: Resourc
_set_service(resource, span, attributes)
_set_egress_operation(span, attributes)
_set_remote_service_and_operation(span, attributes)
_set_remote_type_and_identifier(span, attributes)
is_remote_identifier_present = _set_remote_type_and_identifier(span, attributes)
if is_remote_identifier_present:
is_remote_account_id_present = _set_remote_account_id_and_region(span, attributes)
if not is_remote_account_id_present:
_set_remote_access_key_and_region(span, attributes)
_set_remote_environment(span, attributes)
_set_remote_db_user(span, attributes)
_set_span_kind_for_dependency(span, attributes)
Expand Down Expand Up @@ -383,7 +395,7 @@ def _generate_remote_operation(span: ReadableSpan) -> str:


# pylint: disable=too-many-branches,too-many-statements
def _set_remote_type_and_identifier(span: ReadableSpan, attributes: BoundedAttributes) -> None:
def _set_remote_type_and_identifier(span: ReadableSpan, attributes: BoundedAttributes) -> bool:
"""
Remote resource attributes {@link AwsAttributeKeys#AWS_REMOTE_RESOURCE_TYPE} and {@link
AwsAttributeKeys#AWS_REMOTE_RESOURCE_IDENTIFIER} are used to store information about the resource associated with
Expand All @@ -403,9 +415,23 @@ def _set_remote_type_and_identifier(span: ReadableSpan, attributes: BoundedAttri
if is_key_present(span, _AWS_TABLE_NAMES) and len(span.attributes.get(_AWS_TABLE_NAMES)) == 1:
remote_resource_type = _NORMALIZED_DYNAMO_DB_SERVICE_NAME + "::Table"
remote_resource_identifier = _escape_delimiters(span.attributes.get(_AWS_TABLE_NAMES)[0])
elif is_key_present(span, AWS_DYNAMODB_TABLE_ARN):
remote_resource_type = _NORMALIZED_DYNAMO_DB_SERVICE_NAME + "::Table"
remote_resource_identifier = _escape_delimiters(
RegionalResourceArnParser.extract_dynamodb_table_name_from_arn(
span.attributes.get(AWS_DYNAMODB_TABLE_ARN)
)
)
elif is_key_present(span, AWS_KINESIS_STREAM_NAME):
remote_resource_type = _NORMALIZED_KINESIS_SERVICE_NAME + "::Stream"
remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_KINESIS_STREAM_NAME))
elif is_key_present(span, AWS_KINESIS_STREAM_ARN):
remote_resource_type = _NORMALIZED_KINESIS_SERVICE_NAME + "::Stream"
remote_resource_identifier = _escape_delimiters(
RegionalResourceArnParser.extract_kinesis_stream_name_from_arn(
span.attributes.get(AWS_KINESIS_STREAM_ARN)
)
)
elif is_key_present(span, _AWS_BUCKET_NAME):
remote_resource_type = _NORMALIZED_S3_SERVICE_NAME + "::Bucket"
remote_resource_identifier = _escape_delimiters(span.attributes.get(_AWS_BUCKET_NAME))
Expand Down Expand Up @@ -442,27 +468,35 @@ def _set_remote_type_and_identifier(span: ReadableSpan, attributes: BoundedAttri
remote_resource_identifier = _escape_delimiters(span.attributes.get(GEN_AI_REQUEST_MODEL))
elif is_key_present(span, AWS_SECRETSMANAGER_SECRET_ARN):
remote_resource_type = _NORMALIZED_SECRETSMANAGER_SERVICE_NAME + "::Secret"
remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_SECRETSMANAGER_SECRET_ARN)).split(
":"
)[-1]
remote_resource_identifier = _escape_delimiters(
RegionalResourceArnParser.extract_resource_name_from_arn(
span.attributes.get(AWS_SECRETSMANAGER_SECRET_ARN)
)
)
cloudformation_primary_identifier = _escape_delimiters(span.attributes.get(AWS_SECRETSMANAGER_SECRET_ARN))
elif is_key_present(span, AWS_SNS_TOPIC_ARN):
remote_resource_type = _NORMALIZED_SNS_SERVICE_NAME + "::Topic"
remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_SNS_TOPIC_ARN)).split(":")[-1]
remote_resource_identifier = _escape_delimiters(
RegionalResourceArnParser.extract_resource_name_from_arn(span.attributes.get(AWS_SNS_TOPIC_ARN))
)
cloudformation_primary_identifier = _escape_delimiters(span.attributes.get(AWS_SNS_TOPIC_ARN))
elif is_key_present(span, AWS_STEPFUNCTIONS_STATEMACHINE_ARN):
remote_resource_type = _NORMALIZED_STEPFUNCTIONS_SERVICE_NAME + "::StateMachine"
remote_resource_identifier = _escape_delimiters(
span.attributes.get(AWS_STEPFUNCTIONS_STATEMACHINE_ARN)
).split(":")[-1]
RegionalResourceArnParser.extract_resource_name_from_arn(
span.attributes.get(AWS_STEPFUNCTIONS_STATEMACHINE_ARN)
)
)
cloudformation_primary_identifier = _escape_delimiters(
span.attributes.get(AWS_STEPFUNCTIONS_STATEMACHINE_ARN)
)
elif is_key_present(span, AWS_STEPFUNCTIONS_ACTIVITY_ARN):
remote_resource_type = _NORMALIZED_STEPFUNCTIONS_SERVICE_NAME + "::Activity"
remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_STEPFUNCTIONS_ACTIVITY_ARN)).split(
":"
)[-1]
remote_resource_identifier = _escape_delimiters(
RegionalResourceArnParser.extract_resource_name_from_arn(
span.attributes.get(AWS_STEPFUNCTIONS_ACTIVITY_ARN)
)
)
cloudformation_primary_identifier = _escape_delimiters(span.attributes.get(AWS_STEPFUNCTIONS_ACTIVITY_ARN))
elif is_key_present(span, AWS_LAMBDA_FUNCTION_NAME):
# For non-Invoke Lambda operations, treat Lambda as a resource,
Expand Down Expand Up @@ -491,6 +525,48 @@ def _set_remote_type_and_identifier(span: ReadableSpan, attributes: BoundedAttri
attributes[AWS_REMOTE_RESOURCE_TYPE] = remote_resource_type
attributes[AWS_REMOTE_RESOURCE_IDENTIFIER] = remote_resource_identifier
attributes[AWS_CLOUDFORMATION_PRIMARY_IDENTIFIER] = cloudformation_primary_identifier
return True
return False


def _set_remote_account_id_and_region(span: ReadableSpan, attributes: BoundedAttributes) -> bool:
    """
    Derive the remote resource's account id and region from the span and store them in ``attributes``.

    When the span carries an SQS queue URL, both values are parsed from that URL and no ARN is consulted.
    Otherwise the first ARN attribute present on the span (checked in the fixed order below) is parsed.
    The two attributes are written only when both values were resolved.

    Returns True when AWS_REMOTE_RESOURCE_ACCOUNT_ID and AWS_REMOTE_RESOURCE_REGION were set, False otherwise.
    """
    account_id: Optional[str] = None
    region: Optional[str] = None

    if is_key_present(span, AWS_SQS_QUEUE_URL):
        sqs_url = _escape_delimiters(span.attributes.get(AWS_SQS_QUEUE_URL))
        account_id = SqsUrlParser.get_account_id(sqs_url)
        region = SqsUrlParser.get_region(sqs_url)
    else:
        # Order matters: the first ARN attribute found on the span wins.
        candidate_arn_keys = (
            AWS_DYNAMODB_TABLE_ARN,
            AWS_KINESIS_STREAM_ARN,
            AWS_SNS_TOPIC_ARN,
            AWS_SECRETSMANAGER_SECRET_ARN,
            AWS_STEPFUNCTIONS_STATEMACHINE_ARN,
            AWS_STEPFUNCTIONS_ACTIVITY_ARN,
            AWS_BEDROCK_GUARDRAIL_ARN,
            AWS_LAMBDA_FUNCTION_ARN,
        )
        matched_key = next((key for key in candidate_arn_keys if is_key_present(span, key)), None)
        if matched_key is not None:
            resource_arn = span.attributes.get(matched_key)
            account_id = RegionalResourceArnParser.get_account_id(resource_arn)
            region = RegionalResourceArnParser.get_region(resource_arn)

    # Both pieces are required; a partial result is treated as "not found".
    if account_id is None or region is None:
        return False

    attributes[AWS_REMOTE_RESOURCE_ACCOUNT_ID] = account_id
    attributes[AWS_REMOTE_RESOURCE_REGION] = region
    return True


def _set_remote_access_key_and_region(span: ReadableSpan, attributes: BoundedAttributes) -> None:
    """
    Copy the caller's auth access key and auth region from the span onto the remote-resource attributes.

    Each value is copied independently, and only when its source attribute is present on the span.
    """
    source_to_target = (
        (AWS_AUTH_ACCESS_KEY, AWS_REMOTE_RESOURCE_ACCESS_KEY),
        (AWS_AUTH_REGION, AWS_REMOTE_RESOURCE_REGION),
    )
    for source_key, target_key in source_to_target:
        if is_key_present(span, source_key):
            attributes[target_key] = span.attributes.get(source_key)


def _set_remote_environment(span: ReadableSpan, attributes: BoundedAttributes) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,15 @@ def get_aws_region() -> str:
"AWS region not found. Please set AWS_REGION environment variable or configure AWS CLI with 'aws configure'."
)
return None


def is_account_id(input_str: str) -> bool:
    """Return True when ``input_str`` consists solely of ASCII decimal digits.

    ``None``, non-string values and the empty string yield False.

    A plain ``int(input_str)`` conversion is deliberately not used for the check: ``int`` also
    accepts a leading sign, surrounding whitespace, underscore separators and non-ASCII digits
    (for example ``"-1"``, ``" 12 "`` and ``"1_000"``), none of which can be an account id.
    """
    if not isinstance(input_str, str):
        return False

    # isascii() rules out characters such as superscripts or Arabic-Indic digits,
    # which isdigit() alone would accept.
    return input_str.isascii() and input_str.isdigit()
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@
# Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License.
import importlib

from botocore.exceptions import ClientError

from amazon.opentelemetry.distro._aws_attribute_keys import (
AWS_AUTH_ACCESS_KEY,
AWS_AUTH_REGION,
AWS_DYNAMODB_TABLE_ARN,
AWS_KINESIS_STREAM_ARN,
AWS_KINESIS_STREAM_NAME,
AWS_LAMBDA_FUNCTION_ARN,
AWS_LAMBDA_FUNCTION_NAME,
Expand All @@ -20,7 +26,14 @@
_BedrockAgentRuntimeExtension,
_BedrockExtension,
)
from opentelemetry.instrumentation.botocore.extensions import _KNOWN_EXTENSIONS
from opentelemetry.instrumentation.botocore import (
BotocoreInstrumentor,
_apply_response_attributes,
_determine_call_context,
_safe_invoke,
)
from opentelemetry.instrumentation.botocore.extensions import _KNOWN_EXTENSIONS, _find_extension
from opentelemetry.instrumentation.botocore.extensions.dynamodb import _DynamoDbExtension
from opentelemetry.instrumentation.botocore.extensions.lmbd import _LambdaExtension
from opentelemetry.instrumentation.botocore.extensions.sns import _SnsExtension
from opentelemetry.instrumentation.botocore.extensions.sqs import _SqsExtension
Expand All @@ -30,6 +43,11 @@
_BotocoreInstrumentorContext,
_BotoResultT,
)
from opentelemetry.instrumentation.botocore.utils import get_server_attributes
from opentelemetry.instrumentation.utils import (
is_instrumentation_enabled,
suppress_http_instrumentation,
)
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace.span import Span

Expand All @@ -39,6 +57,7 @@ def _apply_botocore_instrumentation_patches() -> None:

Adds patches to provide additional support and Java parity for Kinesis, S3, and SQS.
"""
_apply_botocore_api_call_patch()
_apply_botocore_kinesis_patch()
_apply_botocore_s3_patch()
_apply_botocore_sqs_patch()
Expand All @@ -47,6 +66,7 @@ def _apply_botocore_instrumentation_patches() -> None:
_apply_botocore_sns_patch()
_apply_botocore_stepfunctions_patch()
_apply_botocore_lambda_patch()
_apply_botocore_dynamodb_patch()


def _apply_botocore_lambda_patch() -> None:
Expand Down Expand Up @@ -208,6 +228,112 @@ def _apply_botocore_bedrock_patch() -> None:
# bedrock-runtime is handled by upstream


def _apply_botocore_dynamodb_patch() -> None:
    """Botocore instrumentation patch for DynamoDB

    Replaces the upstream ``_DynamoDbExtension.on_success`` hook with a wrapper. The wrapper first
    delegates to the original hook, then looks for ``TableArn`` under the ``Table`` key of the
    response and, when a non-empty value is found, records it on the span under the
    ``AWS_DYNAMODB_TABLE_ARN`` attribute key.
    """
    upstream_on_success = _DynamoDbExtension.on_success

    def patch_on_success(self, span: Span, result: _BotoResultT, instrumentor_context: _BotocoreInstrumentorContext):
        # Let the upstream extension record its own attributes first.
        upstream_on_success(self, span, result, instrumentor_context)

        arn = result.get("Table", {}).get("TableArn")
        if not arn:
            # Response carries no table ARN; nothing to add.
            return
        span.set_attribute(AWS_DYNAMODB_TABLE_ARN, arn)

    _DynamoDbExtension.on_success = patch_on_success


def _apply_botocore_api_call_patch() -> None:
    """Replace ``BotocoreInstrumentor._patched_api_call`` with a variant that also records auth details.

    The replacement adds the ``AWS_AUTH_REGION`` attribute (taken from the call context's region) and,
    when the client exposes credentials with an access key, the ``AWS_AUTH_ACCESS_KEY`` attribute to
    the attributes the span is started with.
    """

    def patched_api_call(self, original_func, instance, args, kwargs):
        """Botocore instrumentation patch to capture AWS authentication details

        This patch extends the upstream implementation to include additional AWS authentication
        attributes:
        - aws.auth.account.access_key
        - aws.auth.region

        Note: Current implementation duplicates upstream code in v1.33.x-0.54bx. Future improvements should:
        1. Propose refactoring upstream _patched_api_call into smaller components
        2. Apply targeted patches to these components to reduce code duplication

        Reference: https://github.com/open-telemetry/opentelemetry-python-contrib/blob/release/v1.33.x-0.54bx/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py#L263
        """
        # The three guards below fall through to the un-instrumented call:
        # instrumentation disabled, no call context, or the extension opts out of tracing.
        if not is_instrumentation_enabled():
            return original_func(*args, **kwargs)

        call_context = _determine_call_context(instance, args)
        if call_context is None:
            return original_func(*args, **kwargs)

        extension = _find_extension(call_context)
        if not extension.should_trace_service_call():
            return original_func(*args, **kwargs)

        attributes = {
            SpanAttributes.RPC_SYSTEM: "aws-api",
            SpanAttributes.RPC_SERVICE: call_context.service_id,
            SpanAttributes.RPC_METHOD: call_context.operation,
            # TODO: update when semantic conventions exist
            "aws.region": call_context.region,
            **get_server_attributes(call_context.endpoint_url),
            # Same value as "aws.region" above, exposed under the auth-specific key.
            AWS_AUTH_REGION: call_context.region,
        }
        # NOTE(review): _get_credentials is a private method of the client instance —
        # confirm it is available on every supported botocore version.
        credentials = instance._get_credentials()

        # The access key is recorded only when both credentials and a key are available.
        if credentials is not None:
            access_key = credentials.access_key
            if access_key is not None:
                attributes[AWS_AUTH_ACCESS_KEY] = access_key

        _safe_invoke(extension.extract_attributes, attributes)
        end_span_on_exit = extension.should_end_span_on_exit()

        tracer = self._get_tracer(extension)
        event_logger = self._get_event_logger(extension)
        meter = self._get_meter(extension)
        metrics = self._get_metrics(extension, meter)
        instrumentor_ctx = _BotocoreInstrumentorContext(
            event_logger=event_logger,
            metrics=metrics,
        )
        with tracer.start_as_current_span(
            call_context.span_name,
            kind=call_context.span_kind,
            attributes=attributes,
            # tracing streaming services require to close the span manually
            # at a later time after the stream has been consumed
            end_on_exit=end_span_on_exit,
        ) as span:
            _safe_invoke(extension.before_service_call, span, instrumentor_ctx)
            self._call_request_hook(span, call_context)

            try:
                with suppress_http_instrumentation():
                    result = None
                    try:
                        result = original_func(*args, **kwargs)
                    except ClientError as error:
                        # Use the error's `response` (None when absent) so response attributes
                        # are still applied before on_error runs and the error is re-raised.
                        result = getattr(error, "response", None)
                        _apply_response_attributes(span, result)
                        _safe_invoke(extension.on_error, span, error, instrumentor_ctx)
                        raise
                    _apply_response_attributes(span, result)
                    _safe_invoke(extension.on_success, span, result, instrumentor_ctx)
            finally:
                # Runs on both the success and the error path.
                # NOTE(review): `result` is first bound inside the `with` above; if entering
                # suppress_http_instrumentation() itself raised, `result` would be unbound here — confirm.
                _safe_invoke(extension.after_service_call, instrumentor_ctx)
                self._call_response_hook(span, call_context, result)

            return result

    # Install the replacement on the instrumentor class.
    BotocoreInstrumentor._patched_api_call = patched_api_call


# The OpenTelemetry Authors code
def _lazy_load(module, cls):
"""Clone of upstream opentelemetry.instrumentation.botocore.extensions.lazy_load
Expand Down Expand Up @@ -265,3 +391,6 @@ def extract_attributes(self, attributes: _AttributeMapT):
stream_name = self._call_context.params.get("StreamName")
if stream_name:
attributes[AWS_KINESIS_STREAM_NAME] = stream_name
stream_arn = self._call_context.params.get("StreamARN")
if stream_arn:
attributes[AWS_KINESIS_STREAM_ARN] = stream_arn
Loading