Skip to content

Commit d826243

Browse files
benjipelletierfacebook-github-bot
authored andcommitted
Make tracer more integrated with Python actors (#490)
Summary: This diff does a few things * removes enter_span, and exit_span in favor for tracer based span creation * Adds ability to refer to tracer from actor `self.tracer` * Adds includable attribute `actor_id` from python side. If not included, rust actor name will be used (on further inspection those seem to be the same, but oh well) Differential Revision: D78113741
1 parent 2d369e1 commit d826243

File tree

4 files changed

+54
-103
lines changed

4 files changed

+54
-103
lines changed

monarch_hyperactor/src/telemetry.rs

Lines changed: 11 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -8,45 +8,12 @@
88

99
#![allow(unsafe_op_in_unsafe_fn)]
1010

11-
use std::cell::Cell;
12-
1311
use hyperactor::clock::ClockKind;
1412
use hyperactor::clock::RealClock;
1513
use hyperactor::clock::SimClock;
1614
use hyperactor_telemetry::swap_telemetry_clock;
1715
use pyo3::prelude::*;
1816
use pyo3::types::PyTraceback;
19-
use tracing::span::EnteredSpan;
20-
// Thread local to store the current span
21-
thread_local! {
22-
static ACTIVE_ACTOR_SPAN: Cell<Option<EnteredSpan>> = const { Cell::new(None) };
23-
}
24-
25-
/// Enter the span stored in the thread local
26-
#[pyfunction]
27-
pub fn enter_span(module_name: String, method_name: String, actor_id: String) -> PyResult<()> {
28-
let mut maybe_span = ACTIVE_ACTOR_SPAN.take();
29-
if maybe_span.is_none() {
30-
maybe_span = Some(
31-
tracing::info_span!(
32-
"py_actor_method",
33-
name = method_name,
34-
target = module_name,
35-
actor_id = actor_id
36-
)
37-
.entered(),
38-
);
39-
}
40-
ACTIVE_ACTOR_SPAN.set(maybe_span);
41-
Ok(())
42-
}
43-
44-
/// Exit the span stored in the thread local
45-
#[pyfunction]
46-
pub fn exit_span() -> PyResult<()> {
47-
ACTIVE_ACTOR_SPAN.replace(None);
48-
Ok(())
49-
}
5017

5118
/// Get the current span ID from the active span
5219
#[pyfunction]
@@ -122,8 +89,17 @@ struct PySpan {
12289
#[pymethods]
12390
impl PySpan {
12491
#[new]
125-
fn new(name: &str) -> Self {
126-
let span = tracing::span!(tracing::Level::DEBUG, "python.span", name = name);
92+
fn new(name: &str, actor_id: Option<&str>) -> Self {
93+
let span = if let Some(actor_id) = actor_id {
94+
tracing::span!(
95+
tracing::Level::DEBUG,
96+
"python.span",
97+
name = name,
98+
actor_id = actor_id
99+
)
100+
} else {
101+
tracing::span!(tracing::Level::DEBUG, "python.span", name = name)
102+
};
127103
let entered_span = span.entered();
128104

129105
Self { span: entered_span }
@@ -147,20 +123,6 @@ pub fn register_python_bindings(module: &Bound<'_, PyModule>) -> PyResult<()> {
147123
module.add_function(f)?;
148124

149125
// Register the span-related functions
150-
let enter_span_fn = wrap_pyfunction!(enter_span, module)?;
151-
enter_span_fn.setattr(
152-
"__module__",
153-
"monarch._rust_bindings.monarch_hyperactor.telemetry",
154-
)?;
155-
module.add_function(enter_span_fn)?;
156-
157-
let exit_span_fn = wrap_pyfunction!(exit_span, module)?;
158-
exit_span_fn.setattr(
159-
"__module__",
160-
"monarch._rust_bindings.monarch_hyperactor.telemetry",
161-
)?;
162-
module.add_function(exit_span_fn)?;
163-
164126
let get_current_span_id_fn = wrap_pyfunction!(get_current_span_id, module)?;
165127
get_current_span_id_fn.setattr(
166128
"__module__",

python/monarch/_rust_bindings/monarch_hyperactor/telemetry.pyi

Lines changed: 13 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -28,35 +28,6 @@ def forward_to_tracing(record: logging.LogRecord) -> None:
2828
"""
2929
...
3030

31-
def enter_span(module_name: str, method_name: str, actor_id: str) -> None:
32-
"""
33-
Enter a tracing span for a Python actor method.
34-
35-
Creates and enters a new tracing span for the current thread that tracks
36-
execution of a Python actor method. The span is stored in thread-local
37-
storage and will be active until exit_span() is called.
38-
39-
If a span is already active for the current thread, this function will
40-
preserve that span and not create a new one.
41-
42-
Args:
43-
- module_name (str): The name of the module containing the actor (used as the target).
44-
- method_name (str): The name of the method being called (used as the span name).
45-
- actor_id (str): The ID of the actor instance (included as a field in the span).
46-
"""
47-
...
48-
49-
def exit_span() -> None:
50-
"""
51-
Exit the current tracing span for a Python actor method.
52-
53-
Exits and drops the tracing span that was previously created by enter_span().
54-
This should be called when the actor method execution is complete.
55-
56-
If no span is currently active for this thread, this function has no effect.
57-
"""
58-
...
59-
6031
def get_current_span_id() -> int:
6132
"""
6233
Get the current span ID from the active span.
@@ -87,12 +58,14 @@ def use_sim_clock() -> None:
8758
...
8859

8960
class PySpan:
90-
def __init__(self, name: str) -> None:
61+
def __init__(self, name: str, actor_id: str | None = None) -> None:
9162
"""
9263
Create a new PySpan.
9364
9465
Args:
9566
- name (str): The name of the span.
67+
- actor_id (str | None, optional): The actor ID associated with the span.
68+
If None, Rust will handle actor identification automatically.
9669
"""
9770
...
9871

@@ -101,3 +74,13 @@ class PySpan:
10174
Exit the span.
10275
"""
10376
...
77+
78+
@property
79+
def actor_id(self) -> str | None:
80+
"""
81+
Get the actor ID associated with this span.
82+
83+
Returns:
84+
- str | None: The actor ID, or None if not set.
85+
"""
86+
...

python/monarch/_src/actor/actor_mesh.py

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -55,17 +55,19 @@
5555
from monarch._rust_bindings.monarch_hyperactor.proc import ActorId
5656
from monarch._rust_bindings.monarch_hyperactor.shape import Point as HyPoint, Shape
5757

58-
from monarch._rust_bindings.monarch_hyperactor.telemetry import enter_span, exit_span
5958
from monarch._src.actor.allocator import LocalAllocator, ProcessAllocator
6059
from monarch._src.actor.future import Future
6160
from monarch._src.actor.pdb_wrapper import PdbWrapper
6261

6362
from monarch._src.actor.pickle import flatten, unpickle
6463

6564
from monarch._src.actor.shape import MeshTrait, NDSlice
65+
from monarch._src.actor.telemetry.rust_span_tracing import get_monarch_tracer
6666

6767
logger: logging.Logger = logging.getLogger(__name__)
6868

69+
TRACER = get_monarch_tracer()
70+
6971
Allocator = ProcessAllocator | LocalAllocator
7072

7173
try:
@@ -668,31 +670,28 @@ async def handle(
668670
if inspect.iscoroutinefunction(the_method):
669671

670672
async def instrumented():
671-
enter_span(
672-
the_method.__module__,
673+
with TRACER.start_as_current_span(
673674
message.method,
674-
str(ctx.mailbox.actor_id),
675-
)
676-
try:
677-
result = await the_method(self.instance, *args, **kwargs)
678-
self._maybe_exit_debugger()
679-
except Exception as e:
680-
logging.critical(
681-
"Unhandled exception in actor endpoint",
682-
exc_info=e,
683-
)
684-
raise e
685-
exit_span()
686-
return result
675+
attributes={"actor_id": str(ctx.mailbox.actor_id)},
676+
):
677+
try:
678+
result = await the_method(self.instance, *args, **kwargs)
679+
self._maybe_exit_debugger()
680+
except Exception as e:
681+
logging.critical(
682+
"Unhandled exception in actor endpoint",
683+
exc_info=e,
684+
)
685+
raise e
686+
return result
687687

688688
result = await instrumented()
689689
else:
690-
enter_span(
691-
the_method.__module__, message.method, str(ctx.mailbox.actor_id)
692-
)
693-
result = the_method(self.instance, *args, **kwargs)
694-
self._maybe_exit_debugger()
695-
exit_span()
690+
with TRACER.start_as_current_span(
691+
message.method, attributes={"actor_id": str(ctx.mailbox.actor_id)}
692+
):
693+
result = the_method(self.instance, *args, **kwargs)
694+
self._maybe_exit_debugger()
696695

697696
if port is not None:
698697
port.send("result", result)
@@ -759,6 +758,10 @@ def logger(cls) -> logging.Logger:
759758
lgr.setLevel(logging.DEBUG)
760759
return lgr
761760

761+
@property
762+
def tracer(self):
763+
return TRACER
764+
762765
@property
763766
def _ndslice(self) -> NDSlice:
764767
raise NotImplementedError(

python/monarch/_src/actor/telemetry/rust_span_tracing.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ def start_span(
8585
record_exception: bool = True,
8686
set_status_on_exception: bool = True,
8787
) -> trace.Span:
88-
return SpanWrapper(name)
88+
actor_id = str(attributes.get("actor_id")) if attributes else None
89+
return SpanWrapper(name, actor_id)
8990

9091
@contextmanager
9192
# pyre-fixme[15]: `start_as_current_span` overrides method defined in `Tracer`
@@ -102,7 +103,9 @@ def start_as_current_span(
102103
set_status_on_exception: bool = True,
103104
end_on_exit: bool = True,
104105
) -> Iterator[trace.Span]:
105-
with SpanWrapper(name) as s:
106+
actor_id = str(attributes.get("actor_id")) if attributes else None
107+
108+
with SpanWrapper(name, actor_id) as s:
106109
with trace.use_span(s):
107110
yield s
108111

0 commit comments

Comments
 (0)