Skip to content

Commit ccf6e47

Browse files
: actor: port receiver monitoring is made non-optional
Differential Revision: D78528860
1 parent d4ddd12 commit ccf6e47

File tree

4 files changed

+87
-160
lines changed

4 files changed

+87
-160
lines changed

monarch_hyperactor/src/actor_mesh.rs

Lines changed: 43 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -190,14 +190,33 @@ impl PythonActorMesh {
190190
.map(PyActorId::from))
191191
}
192192

193+
// Return a monitored port receiver.
194+
fn supervise_port<'py>(&self, py: Python<'py> , receiver: &PythonPortReceiver) -> PyResult<PyObject> {
195+
let monitor = self._monitor(py)?;
196+
let rx = MonitoredPythonPortReceiver {
197+
inner: receiver.inner(),
198+
monitor,
199+
};
200+
Ok(rx.into_py(py))
201+
}
202+
203+
fn supervise_once_port<'py>(&self, py: Python<'py> , receiver: &PythonOncePortReceiver) -> PyResult<PyObject> {
204+
let monitor = self._monitor(py)?;
205+
let rx = MonitoredPythonOncePortReceiver {
206+
inner: receiver.inner(),
207+
monitor,
208+
};
209+
Ok(rx.into_py(py))
210+
}
211+
193212
// Start monitoring the actor mesh by subscribing to its supervision events. For each supervision
194213
// event, it is consumed by PythonActorMesh first, then gets sent to the monitor for user to consume.
195-
fn monitor<'py>(&self, py: Python<'py>) -> PyResult<PyObject> {
214+
fn _monitor<'py>(&self, py: Python<'py>) -> PyResult<PyActorMeshMonitor> {
196215
let receiver = self.user_monitor_sender.subscribe();
197216
let monitor_instance = PyActorMeshMonitor {
198217
receiver: SharedCell::from(Mutex::new(receiver)),
199218
};
200-
Ok(monitor_instance.into_py(py))
219+
Ok(monitor_instance)
201220
}
202221

203222
#[pyo3(signature = (**kwargs))]
@@ -353,21 +372,21 @@ impl Drop for PythonActorMesh {
353372
name = "ActorMeshMonitor",
354373
module = "monarch._rust_bindings.monarch_hyperactor.actor_mesh"
355374
)]
356-
pub struct PyActorMeshMonitor {
375+
struct PyActorMeshMonitor {
357376
receiver: SharedCell<Mutex<tokio::sync::broadcast::Receiver<Option<ActorSupervisionEvent>>>>,
358377
}
359378

360-
#[pymethods]
361-
impl PyActorMeshMonitor {
362-
fn __aiter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
363-
slf
364-
}
379+
// #[pymethods]
380+
// impl PyActorMeshMonitor {
381+
// fn __aiter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
382+
// slf
383+
// }
365384

366-
pub fn __anext__(&self, py: Python<'_>) -> PyResult<PyObject> {
367-
let receiver = self.receiver.clone();
368-
Ok(pyo3_async_runtimes::tokio::future_into_py(py, get_next(receiver))?.into())
369-
}
370-
}
385+
// pub fn __anext__(&self, py: Python<'_>) -> PyResult<PyObject> {
386+
// let receiver = self.receiver.clone();
387+
// Ok(pyo3_async_runtimes::tokio::future_into_py(py, get_next(receiver))?.into())
388+
// }
389+
// }
371390

372391
impl PyActorMeshMonitor {
373392
pub async fn next(&self) -> PyResult<PyObject> {
@@ -407,28 +426,24 @@ async fn get_next(
407426
Ok(Python::with_gil(|py| supervision_event.into_py(py)))
408427
}
409428

410-
// TODO(albertli): this is temporary remove this when pushing all supervision logic to rust.
429+
// Values of this (private) type can only be created by calling
430+
// `PythonActorMesh::supervise_port()`.
411431
#[pyclass(
412432
name = "MonitoredPortReceiver",
413433
module = "monarch._rust_bindings.monarch_hyperactor.actor_mesh"
414434
)]
415-
pub(super) struct MonitoredPythonPortReceiver {
435+
struct MonitoredPythonPortReceiver {
416436
inner: Arc<tokio::sync::Mutex<PortReceiver<PythonMessage>>>,
417437
monitor: PyActorMeshMonitor,
418438
}
419439

420440
#[pymethods]
421441
impl MonitoredPythonPortReceiver {
422-
#[new]
423-
fn new(receiver: &PythonPortReceiver, monitor: &PyActorMeshMonitor) -> Self {
424-
let inner = receiver.inner();
425-
MonitoredPythonPortReceiver {
426-
inner,
427-
monitor: monitor.clone(),
428-
}
442+
fn __repr__(&self) -> &'static str {
443+
"<MonitoredPortReceiver>"
429444
}
430445

431-
fn recv<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
446+
fn recv<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
432447
let receiver = self.inner.clone();
433448
let monitor = self.monitor.clone();
434449
pyo3_async_runtimes::tokio::future_into_py(py, async move {
@@ -461,24 +476,21 @@ impl MonitoredPythonPortReceiver {
461476
}
462477
}
463478

479+
// Values of this (private) type can only be created by calling
480+
// `PythonActorMesh::supervise_once_port()`.
464481
#[pyclass(
465482
name = "MonitoredOncePortReceiver",
466483
module = "monarch._rust_bindings.monarch_hyperactor.actor_mesh"
467484
)]
468-
pub(super) struct MonitoredPythonOncePortReceiver {
485+
struct MonitoredPythonOncePortReceiver {
469486
inner: Arc<std::sync::Mutex<Option<OncePortReceiver<PythonMessage>>>>,
470487
monitor: PyActorMeshMonitor,
471488
}
472489

473490
#[pymethods]
474491
impl MonitoredPythonOncePortReceiver {
475-
#[new]
476-
fn new(receiver: &PythonOncePortReceiver, monitor: &PyActorMeshMonitor) -> Self {
477-
let inner = receiver.inner();
478-
MonitoredPythonOncePortReceiver {
479-
inner,
480-
monitor: monitor.clone(),
481-
}
492+
fn __repr__(&self) -> &'static str {
493+
"<MonitoredOncePortReceiver>"
482494
}
483495

484496
fn recv<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {

python/monarch/_rust_bindings/monarch_hyperactor/actor_mesh.pyi

Lines changed: 12 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,15 @@ class PythonActorMesh:
8888
"""
8989
...
9090

91-
# TODO(albertli): remove this when pushing all supervision logic to Rust
92-
def monitor(self) -> ActorMeshMonitor:
91+
def supervise_port(self, r: PortReceiver) -> MonitoredPortReceiver:
9392
"""
94-
Returns a supervision monitor for this mesh.
93+
Return a monitored port receiver.
94+
"""
95+
...
96+
97+
def supervise_once_port(self, r: OncePortReceiver) -> MonitoredOncePortReceiver:
98+
"""
99+
Return a monitored once port receiver.
95100
"""
96101
...
97102

@@ -113,31 +118,11 @@ class PythonActorMesh:
113118
"""
114119
...
115120

116-
@final
117-
class ActorMeshMonitor:
118-
def __aiter__(self) -> AsyncIterator["ActorSupervisionEvent"]:
119-
"""
120-
Returns an async iterator for this monitor.
121-
"""
122-
...
123-
124-
async def __anext__(self) -> "ActorSupervisionEvent":
125-
"""
126-
Returns the next proc event in the proc mesh.
127-
"""
128-
...
129-
130121
@final
131122
class MonitoredPortReceiver:
123+
"""A monitored receiver to which PythonMessages are sent. Values
124+
of this type cannot be constructed directly in Python.
132125
"""
133-
A monitored receiver to which PythonMessages are sent.
134-
"""
135-
136-
def __init__(self, receiver: PortReceiver, monitor: ActorMeshMonitor) -> None:
137-
"""
138-
Create a new monitored receiver from a PortReceiver.
139-
"""
140-
...
141126

142127
async def recv(self) -> PythonMessage:
143128
"""Receive a PythonMessage from the port's sender."""
@@ -148,15 +133,9 @@ class MonitoredPortReceiver:
148133

149134
@final
150135
class MonitoredOncePortReceiver:
136+
"""A monitored once receiver to which PythonMessages are sent.
137+
Values of this type cannot be constructed directly in Python.
151138
"""
152-
A variant of monitored PortReceiver that can only receive a single message.
153-
"""
154-
155-
def __init__(self, receiver: OncePortReceiver, monitor: ActorMeshMonitor) -> None:
156-
"""
157-
Create a new monitored receiver from a PortReceiver.
158-
"""
159-
...
160139

161140
async def recv(self) -> PythonMessage:
162141
"""Receive a single PythonMessage from the port's sender."""

0 commit comments

Comments
 (0)