Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
5c9eeb2
WIP Changes for adding Argo Workflow Id to Logs
saig214 Dec 3, 2025
75a9646
Changes to accomodate argo name and node
saig214 Dec 9, 2025
b510358
Resolve Merge Conflict
saig214 Dec 9, 2025
fffc536
Refactored Logging method
saig214 Dec 9, 2025
d66e301
Merge remote-tracking branch 'origin/main' into LH-88
saig214 Dec 9, 2025
3c1dbd9
Pre-commit fixes
saig214 Dec 10, 2025
8396c9d
Removed redundant check
saig214 Dec 10, 2025
f52f10d
Changes to incorporate generic headers + refactors
saig214 Dec 10, 2025
bc41d51
refactor: extract shared ContextVars to break circular imports
saig214 Dec 10, 2025
665f702
refactor: update correlation string handling in AtlanLoggerAdapter
saig214 Dec 10, 2025
db2af9e
pre-commit formatting
saig214 Dec 10, 2025
a8bd523
feat: add CorrelationContextInterceptor for atlan-* field propagation…
saig214 Dec 11, 2025
9a09709
refactor: simplify docstrings in correlation context interceptors
saig214 Dec 11, 2025
2e957ab
feat: enhance correlation context interceptors to include trace_id in…
saig214 Dec 11, 2025
2f3361e
feat: update observability context to support optional trace_id and e…
saig214 Dec 11, 2025
6b63acb
Update temporal.py
saig214 Dec 11, 2025
8699dde
fix: add automatic gRPC message size limit handling for large file up…
SanilK2108 Dec 11, 2025
7f9f08b
Bump version to 1.0.4 (#884)
atlan-ci Dec 11, 2025
d41d160
chore: update readme and other docs (#885)
junaidrahim Dec 11, 2025
9537279
Merge branch 'main' into LH-88
saig214 Dec 12, 2025
f105d0a
Update correlation_context.py
saig214 Dec 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions application_sdk/activities/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,12 @@ async def get_workflow_args(
)
workflow_args["workflow_id"] = workflow_id
workflow_args["workflow_run_id"] = get_workflow_run_id()

# Preserve atlan- prefixed keys from workflow_config for logging context
for key, value in workflow_config.items():
if key.startswith("atlan-") and value:
workflow_args[key] = str(value)

return workflow_args

except Exception as e:
Expand Down
4 changes: 4 additions & 0 deletions application_sdk/clients/temporal.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
WorkerTokenRefreshEventData,
)
from application_sdk.interceptors.cleanup import CleanupInterceptor, cleanup
from application_sdk.interceptors.correlation_context import (
CorrelationContextInterceptor,
)
from application_sdk.interceptors.events import EventInterceptor, publish_event
from application_sdk.interceptors.lock import RedisLockInterceptor
from application_sdk.observability.logger_adaptor import get_logger
Expand Down Expand Up @@ -430,6 +433,7 @@ def create_worker(
max_concurrent_activities=max_concurrent_activities,
activity_executor=activity_executor,
interceptors=[
CorrelationContextInterceptor(),
EventInterceptor(),
CleanupInterceptor(),
RedisLockInterceptor(activities_dict),
Expand Down
143 changes: 143 additions & 0 deletions application_sdk/interceptors/correlation_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
"""Correlation context interceptor for Temporal workflows.

Propagates atlan-* correlation context fields from workflow arguments to activities
via Temporal headers, ensuring all activity logs include correlation identifiers
"""

from dataclasses import replace
from typing import Any, Dict, Optional, Type

from temporalio import workflow
from temporalio.api.common.v1 import Payload
from temporalio.converter import default as default_converter
from temporalio.worker import (
ActivityInboundInterceptor,
ExecuteActivityInput,
ExecuteWorkflowInput,
Interceptor,
StartActivityInput,
WorkflowInboundInterceptor,
WorkflowInterceptorClassInput,
WorkflowOutboundInterceptor,
)

from application_sdk.observability.context import correlation_context
from application_sdk.observability.logger_adaptor import get_logger

logger = get_logger(__name__)

ATLAN_HEADER_PREFIX = "atlan-"


class CorrelationContextOutboundInterceptor(WorkflowOutboundInterceptor):
"""Outbound interceptor that injects atlan-* context into activity headers."""

def __init__(
self,
next: WorkflowOutboundInterceptor,
inbound: "CorrelationContextWorkflowInboundInterceptor",
):
"""Initialize the outbound interceptor."""
super().__init__(next)
self.inbound = inbound

def start_activity(self, input: StartActivityInput) -> workflow.ActivityHandle[Any]:
"""Inject atlan-* headers and trace_id into activity calls."""
try:
if self.inbound.correlation_data:
new_headers: Dict[str, Payload] = dict(input.headers)
payload_converter = default_converter().payload_converter

for key, value in self.inbound.correlation_data.items():
# Include atlan-* prefixed headers and trace_id
if (
key.startswith(ATLAN_HEADER_PREFIX) or key == "trace_id"
) and value:
payload = payload_converter.to_payload(value)
new_headers[key] = payload

input = replace(input, headers=new_headers)
except Exception as e:
logger.warning(f"Failed to inject correlation context headers: {e}")

return self.next.start_activity(input)


class CorrelationContextWorkflowInboundInterceptor(WorkflowInboundInterceptor):
"""Inbound workflow interceptor that extracts atlan-* context from workflow args."""

def __init__(self, next: WorkflowInboundInterceptor):
"""Initialize the inbound interceptor."""
super().__init__(next)
self.correlation_data: Dict[str, str] = {}

def init(self, outbound: WorkflowOutboundInterceptor) -> None:
"""Initialize with correlation context outbound interceptor."""
context_outbound = CorrelationContextOutboundInterceptor(outbound, self)
super().init(context_outbound)

async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any:
"""Execute workflow and extract atlan-* fields and trace_id from arguments."""
try:
if input.args and len(input.args) > 0:
workflow_config = input.args[0]
if isinstance(workflow_config, dict):
# Extract atlan-* prefixed fields
self.correlation_data = {
k: str(v)
for k, v in workflow_config.items()
if k.startswith(ATLAN_HEADER_PREFIX) and v
}
# Extract trace_id separately (not atlan- prefixed)
trace_id = workflow_config.get("trace_id", "")
if trace_id:
self.correlation_data["trace_id"] = str(trace_id)
if self.correlation_data:
correlation_context.set(self.correlation_data)
except Exception as e:
logger.warning(f"Failed to extract correlation context from args: {e}")

return await super().execute_workflow(input)


class CorrelationContextActivityInboundInterceptor(ActivityInboundInterceptor):
"""Activity interceptor that reads atlan-* headers and trace_id, sets correlation_context."""

async def execute_activity(self, input: ExecuteActivityInput) -> Any:
"""Execute activity after extracting atlan-* headers and trace_id."""
try:
atlan_fields: Dict[str, str] = {}
payload_converter = default_converter().payload_converter

for key, payload in input.headers.items():
# Extract atlan-* prefixed headers and trace_id
if key.startswith(ATLAN_HEADER_PREFIX) or key == "trace_id":
value = payload_converter.from_payload(payload, type_hint=str)
atlan_fields[key] = value

if atlan_fields:
correlation_context.set(atlan_fields)

except Exception as e:
logger.warning(f"Failed to extract correlation context from headers: {e}")

return await super().execute_activity(input)


class CorrelationContextInterceptor(Interceptor):
"""Main interceptor for propagating atlan-* correlation context.

Ensures atlan-* fields are propagated from workflow arguments to all activities via Temporal headers.
"""

def workflow_interceptor_class(
self, input: WorkflowInterceptorClassInput
) -> Optional[Type[WorkflowInboundInterceptor]]:
"""Get the workflow interceptor class."""
return CorrelationContextWorkflowInboundInterceptor

def intercept_activity(
self, next: ActivityInboundInterceptor
) -> ActivityInboundInterceptor:
"""Intercept activity executions to read correlation context."""
return CorrelationContextActivityInboundInterceptor(next)
2 changes: 0 additions & 2 deletions application_sdk/interceptors/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ async def execute_activity(self, input: ExecuteActivityInput) -> Any:
Returns:
Any: The result of the activity execution.
"""
# Extract activity information for tracking

start_event = Event(
event_type=EventTypes.APPLICATION_EVENT.value,
event_name=ApplicationEventNames.ACTIVITY_START.value,
Expand Down
18 changes: 18 additions & 0 deletions application_sdk/observability/context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
"""Shared context variables for observability.

This module contains ContextVar definitions that are shared across
multiple observability modules to avoid circular imports.
"""

from contextvars import ContextVar
from typing import Any, Dict

# Context variable for request-scoped data (e.g., request_id from HTTP middleware)
request_context: ContextVar[Dict[str, Any] | None] = ContextVar(
"request_context", default=None
)

# Context variable for correlation context (atlan- prefixed headers for distributed tracing)
correlation_context: ContextVar[Dict[str, Any] | None] = ContextVar(
"correlation_context", default=None
)
Loading
Loading