Skip to content

: actor: port receiver monitoring is made non-optional #578

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 60 additions & 91 deletions monarch_hyperactor/src/actor_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use pyo3::exceptions::PyEOFError;
use pyo3::exceptions::PyException;
use pyo3::exceptions::PyNotImplementedError;
use pyo3::exceptions::PyRuntimeError;
use pyo3::exceptions::PyTypeError;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::types::PyBytes;
Expand Down Expand Up @@ -192,14 +193,28 @@ impl PythonActorMesh {
.map(PyActorId::from))
}

// Start monitoring the actor mesh by subscribing to its supervision events. For each supervision
// event, it is consumed by PythonActorMesh first, then gets sent to the monitor for user to consume.
fn monitor<'py>(&self, py: Python<'py>) -> PyResult<PyObject> {
let receiver = self.user_monitor_sender.subscribe();
let monitor_instance = PyActorMeshMonitor {
receiver: SharedCell::from(Mutex::new(receiver)),
};
monitor_instance.into_py_any(py)
fn supervise(&self, py: Python<'_>, receiver: Bound<'_, PyAny>) -> PyResult<PyObject> {
if let Ok(r) = receiver.extract::<PyRef<PythonPortReceiver>>() {
let rx = SupervisedPythonPortReceiver {
inner: r.inner(),
monitor: ActorMeshMonitor {
receiver: SharedCell::from(Mutex::new(self.user_monitor_sender.subscribe())),
},
};
rx.into_py_any(py)
} else if let Ok(r) = receiver.extract::<PyRef<PythonOncePortReceiver>>() {
let rx = SupervisedPythonOncePortReceiver {
inner: r.inner(),
monitor: ActorMeshMonitor {
receiver: SharedCell::from(Mutex::new(self.user_monitor_sender.subscribe())),
},
};
rx.into_py_any(py)
} else {
Err(PyTypeError::new_err(
"Expected a PortReceiver or OncePortReceiver",
))
}
}

#[pyo3(signature = (**kwargs))]
Expand Down Expand Up @@ -374,84 +389,46 @@ impl Drop for PythonActorMesh {
}
}

#[pyclass(
name = "ActorMeshMonitor",
module = "monarch._rust_bindings.monarch_hyperactor.actor_mesh"
)]
pub struct PyActorMeshMonitor {
#[derive(Debug, Clone)]
struct ActorMeshMonitor {
receiver: SharedCell<Mutex<tokio::sync::broadcast::Receiver<Option<ActorSupervisionEvent>>>>,
}

#[pymethods]
impl PyActorMeshMonitor {
fn __aiter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
slf
}

pub fn __anext__(&self, py: Python<'_>) -> PyResult<PyObject> {
impl ActorMeshMonitor {
pub async fn next(&self) -> PyActorSupervisionEvent {
let receiver = self.receiver.clone();
Ok(pyo3_async_runtimes::tokio::future_into_py(py, get_next(receiver))?.into())
}
}

impl PyActorMeshMonitor {
pub async fn next(&self) -> PyResult<PyObject> {
get_next(self.receiver.clone()).await
}
}

impl Clone for PyActorMeshMonitor {
fn clone(&self) -> Self {
Self {
receiver: self.receiver.clone(),
let receiver = receiver
.borrow()
.expect("`Actor mesh receiver` is shutdown");
let mut receiver = receiver.lock().await;
let event = receiver.recv().await.unwrap();
match event {
None => PyActorSupervisionEvent {
// Dummy actor as place holder to indicate the whole mesh is stopped
// TODO(albertli): remove this when pushing all supervision logic to rust.
actor_id: id!(default[0].actor[0]).into(),
actor_status: "actor mesh is stopped due to proc mesh shutdown".to_string(),
},
Some(event) => PyActorSupervisionEvent::from(event.clone()),
}
}
}

async fn get_next(
receiver: SharedCell<Mutex<tokio::sync::broadcast::Receiver<Option<ActorSupervisionEvent>>>>,
) -> PyResult<PyObject> {
let receiver = receiver.clone();

let receiver = receiver
.borrow()
.expect("`Actor mesh receiver` is shutdown");
let mut receiver = receiver.lock().await;
let event = receiver.recv().await.unwrap();

let supervision_event = match event {
None => PyActorSupervisionEvent {
// Dummy actor as place holder to indicate the whole mesh is stopped
// TODO(albertli): remove this when pushing all supervision logic to rust.
actor_id: id!(default[0].actor[0]).into(),
actor_status: "actor mesh is stopped due to proc mesh shutdown".to_string(),
},
Some(event) => PyActorSupervisionEvent::from(event.clone()),
};
tracing::info!("recv supervision event: {supervision_event:?}");

Python::with_gil(|py| supervision_event.into_py_any(py))
}

// TODO(albertli): this is temporary remove this when pushing all supervision logic to rust.
// Values of this type can only be created by calling
// `PythonActorMesh::supervise()`.
#[pyclass(
name = "MonitoredPortReceiver",
name = "SupervisedPortReceiver",
module = "monarch._rust_bindings.monarch_hyperactor.actor_mesh"
)]
pub(super) struct MonitoredPythonPortReceiver {
struct SupervisedPythonPortReceiver {
inner: Arc<tokio::sync::Mutex<PortReceiver<PythonMessage>>>,
monitor: PyActorMeshMonitor,
monitor: ActorMeshMonitor,
}

#[pymethods]
impl MonitoredPythonPortReceiver {
#[new]
fn new(receiver: &PythonPortReceiver, monitor: &PyActorMeshMonitor) -> Self {
let inner = receiver.inner();
MonitoredPythonPortReceiver {
inner,
monitor: monitor.clone(),
}
impl SupervisedPythonPortReceiver {
fn __repr__(&self) -> &'static str {
"<SupervisedPortReceiver>"
}

fn recv_task(&mut self) -> PyPythonTask {
Expand All @@ -464,10 +441,8 @@ impl MonitoredPythonPortReceiver {
result.map_err(|err| PyErr::new::<PyEOFError, _>(format!("port closed: {}", err)))
}
event = monitor.next() => {
let event = event.expect("supervision event should not be None");
Python::with_gil(|py| {
let e = event.downcast_bound::<PyActorSupervisionEvent>(py)?;
Err(PyErr::new::<SupervisionError, _>(format!("supervision error: {:?}", e)))
Err(PyErr::new::<SupervisionError, _>(format!("supervision error: {:?}", event)))
})
}
};
Expand All @@ -476,24 +451,21 @@ impl MonitoredPythonPortReceiver {
}
}

// Values of this type can only be created by calling
// `PythonActorMesh::supervise()`.
#[pyclass(
name = "MonitoredOncePortReceiver",
name = "SupervisedOncePortReceiver",
module = "monarch._rust_bindings.monarch_hyperactor.actor_mesh"
)]
pub(super) struct MonitoredPythonOncePortReceiver {
struct SupervisedPythonOncePortReceiver {
inner: Arc<std::sync::Mutex<Option<OncePortReceiver<PythonMessage>>>>,
monitor: PyActorMeshMonitor,
monitor: ActorMeshMonitor,
}

#[pymethods]
impl MonitoredPythonOncePortReceiver {
#[new]
fn new(receiver: &PythonOncePortReceiver, monitor: &PyActorMeshMonitor) -> Self {
let inner = receiver.inner();
MonitoredPythonOncePortReceiver {
inner,
monitor: monitor.clone(),
}
impl SupervisedPythonOncePortReceiver {
fn __repr__(&self) -> &'static str {
"<SupervisedOncePortReceiver>"
}

fn recv_task(&mut self) -> PyResult<PyPythonTask> {
Expand All @@ -507,10 +479,8 @@ impl MonitoredPythonOncePortReceiver {
result.map_err(|err| PyErr::new::<PyEOFError, _>(format!("port closed: {}", err)))
}
event = monitor.next() => {
let event = event.expect("supervision event should not be None");
Python::with_gil(|py| {
let e = event.downcast_bound::<PyActorSupervisionEvent>(py)?;
Err(PyErr::new::<SupervisionError, _>(format!("supervision error: {:?}", e)))
Err(PyErr::new::<SupervisionError, _>(format!("supervision error: {:?}", event)))
})
}
};
Expand Down Expand Up @@ -556,9 +526,8 @@ impl From<ActorSupervisionEvent> for PyActorSupervisionEvent {
pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
hyperactor_mod.add_class::<PythonActorMesh>()?;
hyperactor_mod.add_class::<PythonActorMeshRef>()?;
hyperactor_mod.add_class::<PyActorMeshMonitor>()?;
hyperactor_mod.add_class::<MonitoredPythonPortReceiver>()?;
hyperactor_mod.add_class::<MonitoredPythonOncePortReceiver>()?;
hyperactor_mod.add_class::<SupervisedPythonPortReceiver>()?;
hyperactor_mod.add_class::<SupervisedPythonOncePortReceiver>()?;
hyperactor_mod.add_class::<PyActorSupervisionEvent>()?;
Ok(())
}
53 changes: 17 additions & 36 deletions python/monarch/_rust_bindings/monarch_hyperactor/actor_mesh.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,17 @@ class PythonActorMesh:
"""
...

# TODO(albertli): remove this when pushing all supervision logic to Rust
def monitor(self) -> ActorMeshMonitor:
"""
Returns a supervision monitor for this mesh.
def supervise(
self, r: PortReceiver | OncePortReceiver
) -> SupervisedPortReceiver | SupervisedOncePortReceiver:
"""Return a monitored port receiver.

A monitored port receiver behaves like a regular port receiver
but also observes the health of the actor mesh associated with
the sender. If the actor mesh becomes unhealthy, the receiver
will yield a supervision error instead of waiting indefinitely
for a message.

"""
...

Expand Down Expand Up @@ -129,42 +136,16 @@ class PythonActorMesh:
...

@final
class ActorMeshMonitor:
def __aiter__(self) -> AsyncIterator["ActorSupervisionEvent"]:
"""
Returns an async iterator for this monitor.
"""
...

async def __anext__(self) -> "ActorSupervisionEvent":
"""
Returns the next proc event in the proc mesh.
"""
...

@final
class MonitoredPortReceiver(PortReceiverBase):
"""
A monitored receiver to which PythonMessages are sent.
class SupervisedPortReceiver(PortReceiverBase):
"""A monitored receiver to which PythonMessages are sent. Values
of this type cannot be constructed directly in Python.
"""

def __init__(self, receiver: PortReceiver, monitor: ActorMeshMonitor) -> None:
"""
Create a new monitored receiver from a PortReceiver.
"""
...

@final
class MonitoredOncePortReceiver(PortReceiverBase):
class SupervisedOncePortReceiver(PortReceiverBase):
"""A monitored once receiver to which PythonMessages are sent.
Values of this type cannot be constructed directly in Python.
"""
A variant of monitored PortReceiver that can only receive a single message.
"""

def __init__(self, receiver: OncePortReceiver, monitor: ActorMeshMonitor) -> None:
"""
Create a new monitored receiver from a PortReceiver.
"""
...

@final
class ActorSupervisionEvent:
Expand Down
49 changes: 17 additions & 32 deletions python/monarch/_src/actor/actor_mesh.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,12 @@
PythonMessage,
PythonMessageKind,
)
from monarch._rust_bindings.monarch_hyperactor.actor_mesh import (
ActorMeshMonitor,
MonitoredOncePortReceiver,
MonitoredPortReceiver,
PythonActorMesh,
)
from monarch._rust_bindings.monarch_hyperactor.actor_mesh import PythonActorMesh
from monarch._rust_bindings.monarch_hyperactor.mailbox import (
Mailbox,
OncePortReceiver,
OncePortRef,
PortReceiver as HyPortReceiver,
PortRef,
)

Expand Down Expand Up @@ -319,6 +315,9 @@ def _send(
def _port(self, once: bool = False) -> "PortTuple[R]":
pass

def _supervise(self, r: HyPortReceiver | OncePortReceiver) -> Any:
return r

# the following are all 'adverbs' or different ways to handle the
# return values of this endpoint. Adverbs should only ever take *args, **kwargs
# of the original call. If we want to add syntax sugar for something that needs additional
Expand Down Expand Up @@ -401,6 +400,10 @@ def __init__(
self._signature: inspect.Signature = inspect.signature(impl)
self._mailbox = mailbox

def _supervise(self, r: HyPortReceiver | OncePortReceiver) -> Any:
mesh = self._actor_mesh._actor_mesh
return r if mesh is None else mesh.supervise(r)

def _send(
self,
args: Tuple[Any, ...],
Expand Down Expand Up @@ -432,12 +435,12 @@ def _send(
return Extent(shape.labels, shape.ndslice.sizes)

def _port(self, once: bool = False) -> "PortTuple[R]":
monitor = (
None
if self._actor_mesh._actor_mesh is None
else self._actor_mesh._actor_mesh.monitor()
)
return PortTuple.create(self._mailbox, monitor, once)
p, r = PortTuple.create(self._mailbox, once)
if TYPE_CHECKING:
assert isinstance(
r._receiver, (HyPortReceiver | OncePortReceiver)
), "unexpected receiver type"
return PortTuple(p, PortReceiver(self._mailbox, self._supervise(r._receiver)))


class Accumulator(Generic[P, R, A]):
Expand Down Expand Up @@ -591,18 +594,9 @@ class PortTuple(NamedTuple, Generic[R]):
receiver: "PortReceiver[R]"

@staticmethod
def create(
mailbox: Mailbox, monitor: Optional[ActorMeshMonitor], once: bool = False
) -> "PortTuple[Any]":
def create(mailbox: Mailbox, once: bool = False) -> "PortTuple[Any]":
handle, receiver = mailbox.open_once_port() if once else mailbox.open_port()
port_ref = handle.bind()
if monitor is not None:
receiver = (
MonitoredOncePortReceiver(receiver, monitor)
if isinstance(receiver, OncePortReceiver)
else MonitoredPortReceiver(receiver, monitor)
)

return PortTuple(
Port(port_ref, mailbox, rank=None),
PortReceiver(mailbox, receiver),
Expand All @@ -614,18 +608,9 @@ class PortTuple(NamedTuple):
receiver: "PortReceiver[Any]"

@staticmethod
def create(
mailbox: Mailbox, monitor: Optional[ActorMeshMonitor], once: bool = False
) -> "PortTuple[Any]":
def create(mailbox: Mailbox, once: bool = False) -> "PortTuple[Any]":
handle, receiver = mailbox.open_once_port() if once else mailbox.open_port()
port_ref = handle.bind()
if monitor is not None:
receiver = (
MonitoredOncePortReceiver(receiver, monitor)
if isinstance(receiver, OncePortReceiver)
else MonitoredPortReceiver(receiver, monitor)
)

return PortTuple(
Port(port_ref, mailbox, rank=None),
PortReceiver(mailbox, receiver),
Expand Down
Loading
Loading