Commit 7dd0b25
supervision: new enum Unhealthy (#569)
Summary:

Pull Request resolved: #569

- Introduced a new type, `supervision::Unhealthy`:

  ```rust
  #[derive(Debug, Clone)]
  pub(crate) enum Unhealthy<Event> {
      SoFarSoGood,    // Still healthy
      StreamClosed,   // Event stream closed
      Crashed(Event), // Bad health event received
  }
  ```

- In `PyProcMesh`, replaced:
  - from: `Arc<Mutex<Option<Option<ProcEvent>>>>`
  - to: `Arc<Mutex<Unhealthy<ProcEvent>>>`
- In `PythonActorMesh`, replaced:
  - from: `Arc<Mutex<Option<Option<ActorSupervisionEvent>>>>`
  - to: `Arc<Mutex<Unhealthy<ActorSupervisionEvent>>>`

Reviewed By: pablorfb-meta

Differential Revision: D78498195

fbshipit-source-id: 36622496f3ff632e9286ea45592c3032542d449d
1 parent 0aae80c commit 7dd0b25
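
Why the enum beats the nested `Option`: under the old `Option<Option<Event>>` encoding the reader has to remember that `None` means healthy and `Some(None)` means the event stream closed, while the new variants name each state. A minimal standalone sketch of the two encodings side by side (`DemoEvent` is a made-up stand-in for `ProcEvent`/`ActorSupervisionEvent`, not part of the commit):

```rust
#[derive(Debug, Clone)]
struct DemoEvent(String); // hypothetical stand-in for a real supervision event

#[derive(Debug, Clone)]
enum Unhealthy<Event> {
    SoFarSoGood,
    StreamClosed,
    Crashed(Event),
}

// Old encoding: the meaning of each nesting level is implicit.
fn describe_old(state: &Option<Option<DemoEvent>>) -> String {
    match state {
        None => "healthy".to_string(),
        Some(None) => "event stream closed".to_string(),
        Some(Some(e)) => format!("crashed: {:?}", e),
    }
}

// New encoding: each arm names its meaning.
fn describe_new(state: &Unhealthy<DemoEvent>) -> String {
    match state {
        Unhealthy::SoFarSoGood => "healthy".to_string(),
        Unhealthy::StreamClosed => "event stream closed".to_string(),
        Unhealthy::Crashed(e) => format!("crashed: {:?}", e),
    }
}

fn main() {
    let event = DemoEvent("actor panicked".into());
    // The two encodings classify the same states identically.
    assert_eq!(describe_old(&None), describe_new(&Unhealthy::SoFarSoGood));
    assert_eq!(describe_old(&Some(None)), describe_new(&Unhealthy::StreamClosed));
    assert_eq!(
        describe_old(&Some(Some(event.clone()))),
        describe_new(&Unhealthy::Crashed(event))
    );
    println!("encodings agree");
}
```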

3 files changed: +71 -36 lines changed

monarch_hyperactor/src/actor_mesh.rs

Lines changed: 32 additions & 17 deletions
```diff
@@ -46,6 +46,7 @@ use crate::proc_mesh::Keepalive;
 use crate::selection::PySelection;
 use crate::shape::PyShape;
 use crate::supervision::SupervisionError;
+use crate::supervision::Unhealthy;
 
 #[pyclass(
     name = "PythonActorMesh",
@@ -55,7 +56,7 @@ pub struct PythonActorMesh {
     inner: SharedCell<RootActorMesh<'static, PythonActor>>,
     client: PyMailbox,
     _keepalive: Keepalive,
-    unhealthy_event: Arc<std::sync::Mutex<Option<Option<ActorSupervisionEvent>>>>,
+    unhealthy_event: Arc<std::sync::Mutex<Unhealthy<ActorSupervisionEvent>>>,
     user_monitor_sender: tokio::sync::broadcast::Sender<Option<ActorSupervisionEvent>>,
     monitor: tokio::task::JoinHandle<()>,
 }
@@ -71,11 +72,11 @@ impl PythonActorMesh {
     ) -> Self {
         let (user_monitor_sender, _) =
             tokio::sync::broadcast::channel::<Option<ActorSupervisionEvent>>(1);
-        let unhealthy_event = Arc::new(std::sync::Mutex::new(None));
+        let unhealthy_event = Arc::new(std::sync::Mutex::new(Unhealthy::SoFarSoGood));
         let monitor = tokio::spawn(Self::actor_mesh_monitor(
             events,
             user_monitor_sender.clone(),
-            unhealthy_event.clone(),
+            Arc::clone(&unhealthy_event),
         ));
         Self {
             inner,
@@ -92,15 +93,19 @@ impl PythonActorMesh {
     async fn actor_mesh_monitor(
         mut events: ActorSupervisionEvents,
         user_sender: tokio::sync::broadcast::Sender<Option<ActorSupervisionEvent>>,
-        unhealthy_event: Arc<std::sync::Mutex<Option<Option<ActorSupervisionEvent>>>>,
+        unhealthy_event: Arc<std::sync::Mutex<Unhealthy<ActorSupervisionEvent>>>,
     ) {
         loop {
             let event = events.next().await;
             let mut inner_unhealthy_event = unhealthy_event.lock().unwrap();
-            *inner_unhealthy_event = Some(event.clone());
+            match &event {
+                None => *inner_unhealthy_event = Unhealthy::StreamClosed,
+                Some(event) => *inner_unhealthy_event = Unhealthy::Crashed(event.clone()),
+            }
 
-            // Ignore the sender error when there is no receiver, which happens when there
-            // is no active requests to this mesh.
+            // Ignore the sender error when there is no receiver,
+            // which happens when there is no active requests to this
+            // mesh.
             let _ = user_sender.send(event.clone());
 
             if event.is_none() {
@@ -132,11 +137,20 @@ impl PythonActorMesh {
             .unhealthy_event
             .lock()
             .expect("failed to acquire unhealthy_event lock");
-        if let Some(ref event) = *unhealthy_event {
-            return Err(PyRuntimeError::new_err(format!(
-                "actor mesh is unhealthy with reason: {:?}",
-                event
-            )));
+
+        match &*unhealthy_event {
+            Unhealthy::SoFarSoGood => (),
+            Unhealthy::Crashed(event) => {
+                return Err(PyRuntimeError::new_err(format!(
+                    "actor mesh is unhealthy with reason: {:?}",
+                    event
+                )));
+            }
+            Unhealthy::StreamClosed => {
+                return Err(PyRuntimeError::new_err(
+                    "actor mesh is stopped due to proc mesh shutdown".to_string(),
+                ));
+            }
         }
 
         self.try_inner()?
@@ -156,15 +170,16 @@ impl PythonActorMesh {
             .lock()
             .expect("failed to acquire unhealthy_event lock");
 
-        Ok(unhealthy_event.as_ref().map(|event| match event {
-            None => PyActorSupervisionEvent {
+        match &*unhealthy_event {
+            Unhealthy::SoFarSoGood => Ok(None),
+            Unhealthy::StreamClosed => Ok(Some(PyActorSupervisionEvent {
                 // Dummy actor as place holder to indicate the whole mesh is stopped
                 // TODO(albertli): remove this when pushing all supervision logic to rust.
                 actor_id: id!(default[0].actor[0]).into(),
                 actor_status: "actor mesh is stopped due to proc mesh shutdown".to_string(),
-            },
-            Some(event) => PyActorSupervisionEvent::from(event.clone()),
-        }))
+            })),
+            Unhealthy::Crashed(event) => Ok(Some(PyActorSupervisionEvent::from(event.clone()))),
+        }
     }
 
     // Consider defining a "PythonActorRef", which carries specifically
```
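
The monitor above follows a pattern both meshes share: a spawned task owns the supervision event stream and folds every event into an `Arc<Mutex<Unhealthy<_>>>` cell, and readers snapshot the cell before doing work. A rough std-only sketch of that shape, with a thread plus an `mpsc` channel standing in for the tokio task and `ActorSupervisionEvents` (both stand-ins are assumptions of this sketch, not the crate's API):

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

#[derive(Debug)]
enum Unhealthy<Event> {
    SoFarSoGood,
    StreamClosed,
    Crashed(Event),
}

fn main() {
    let health = Arc::new(Mutex::new(Unhealthy::SoFarSoGood));
    let (tx, rx) = mpsc::channel::<String>(); // stand-in for the event stream

    let monitor = {
        let health = Arc::clone(&health);
        thread::spawn(move || loop {
            match rx.recv() {
                // A received event marks the mesh crashed.
                Ok(event) => *health.lock().unwrap() = Unhealthy::Crashed(event),
                // A closed channel maps to StreamClosed, as in actor_mesh_monitor.
                Err(_) => {
                    *health.lock().unwrap() = Unhealthy::StreamClosed;
                    break;
                }
            }
        })
    };

    tx.send("proc exited unexpectedly".to_string()).unwrap();

    // Poll until the monitor has folded the event into the shared cell.
    while !matches!(&*health.lock().unwrap(), Unhealthy::Crashed(_)) {
        thread::yield_now();
    }
    println!("reader observes: {:?}", *health.lock().unwrap());

    // Dropping the sender closes the stream; the cell ends up StreamClosed.
    drop(tx);
    monitor.join().unwrap();
    assert!(matches!(&*health.lock().unwrap(), Unhealthy::StreamClosed));
}
```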

monarch_hyperactor/src/proc_mesh.rs

Lines changed: 19 additions & 19 deletions
```diff
@@ -46,6 +46,7 @@ use crate::mailbox::PyMailbox;
 use crate::runtime::signal_safe_block_on;
 use crate::shape::PyShape;
 use crate::supervision::SupervisionError;
+use crate::supervision::Unhealthy;
 
 // A wrapper around `ProcMesh` which keeps track of all `RootActorMesh`s that it spawns.
 pub struct TrackedProcMesh {
@@ -119,7 +120,7 @@ pub struct PyProcMesh {
     proc_events: SharedCell<Mutex<ProcEvents>>,
     user_monitor_receiver: SharedCell<Mutex<mpsc::UnboundedReceiver<ProcEvent>>>,
     user_monitor_registered: Arc<AtomicBool>,
-    unhealthy_event: Arc<Mutex<Option<Option<ProcEvent>>>>,
+    unhealthy_event: Arc<Mutex<Unhealthy<ProcEvent>>>,
 }
 
 fn allocate_proc_mesh(alloc: &PyAlloc) -> PyResult<PyPythonTask> {
@@ -146,15 +147,15 @@ impl PyProcMesh {
         let proc_events = SharedCell::from(Mutex::new(proc_mesh.events().unwrap()));
         let (user_sender, user_receiver) = mpsc::unbounded_channel::<ProcEvent>();
         let user_monitor_registered = Arc::new(AtomicBool::new(false));
-        let unhealthy_event = Arc::new(Mutex::new(None));
+        let unhealthy_event = Arc::new(Mutex::new(Unhealthy::SoFarSoGood));
         let monitor = tokio::spawn(Self::default_proc_mesh_monitor(
             proc_events
                 .borrow()
                 .expect("borrowing immediately after creation"),
             world_id,
             user_sender,
-            user_monitor_registered.clone(),
-            unhealthy_event.clone(),
+            Arc::clone(&user_monitor_registered),
+            Arc::clone(&unhealthy_event),
         ));
         Self {
             inner: SharedCell::from(TrackedProcMesh::from(proc_mesh)),
@@ -172,7 +173,7 @@ impl PyProcMesh {
         world_id: WorldId,
         user_sender: mpsc::UnboundedSender<ProcEvent>,
         user_monitor_registered: Arc<AtomicBool>,
-        unhealthy_event: Arc<Mutex<Option<Option<ProcEvent>>>>,
+        unhealthy_event: Arc<Mutex<Unhealthy<ProcEvent>>>,
     ) {
         loop {
             let mut proc_events = events.lock().await;
@@ -181,15 +182,15 @@ impl PyProcMesh {
                 let mut inner_unhealthy_event = unhealthy_event.lock().await;
                 match event {
                     None => {
-                        *inner_unhealthy_event = Some(None);
+                        *inner_unhealthy_event = Unhealthy::StreamClosed;
                         tracing::info!("ProcMesh {}: alloc has stopped", world_id);
                         break;
                     }
                     Some(event) => match event {
                         // Graceful stops can be ignored.
                         ProcEvent::Stopped(_, ProcStopReason::Stopped) => continue,
                         event => {
-                            *inner_unhealthy_event = Some(Some(event.clone()));
+                            *inner_unhealthy_event = Unhealthy::Crashed(event.clone());
                             tracing::info!("ProcMesh {}: {}", world_id, event);
                             if user_monitor_registered.load(std::sync::atomic::Ordering::SeqCst) {
                                 if user_sender.send(event).is_err() {
@@ -202,7 +203,7 @@ impl PyProcMesh {
             }
             _ = events.preempted() => {
                 let mut inner_unhealthy_event = unhealthy_event.lock().await;
-                *inner_unhealthy_event = Some(None);
+                *inner_unhealthy_event = Unhealthy::StreamClosed;
                 tracing::info!("ProcMesh {}: is stopped", world_id);
                 break;
             }
@@ -243,19 +244,18 @@ impl PyProcMesh {
     }
 }
 
-// Return with error if the mesh is unhealthy.
-async fn ensure_mesh_healthy(
-    unhealthy_event: &Mutex<Option<Option<ProcEvent>>>,
-) -> Result<(), PyErr> {
+async fn ensure_mesh_healthy(unhealthy_event: &Mutex<Unhealthy<ProcEvent>>) -> Result<(), PyErr> {
     let locked = unhealthy_event.lock().await;
-    if let Some(event) = &*locked {
-        let msg = match event {
-            Some(e) => format!("proc mesh is stopped with reason: {:?}", e),
-            None => "proc mesh is stopped with reason: alloc is stopped".to_string(),
-        };
-        return Err(SupervisionError::new_err(msg));
+    match &*locked {
+        Unhealthy::SoFarSoGood => Ok(()),
+        Unhealthy::StreamClosed => Err(SupervisionError::new_err(
+            "proc mesh is stopped with reason: alloc is stopped".to_string(),
+        )),
+        Unhealthy::Crashed(event) => Err(SupervisionError::new_err(format!(
+            "proc mesh is stopped with reason: {:?}",
+            event
+        ))),
     }
-    Ok(())
 }
 
 #[pymethods]
```
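
The reshaped `ensure_mesh_healthy` is the gate that mesh operations run before touching the mesh: snapshot the cell, then map each variant to `Ok(())` or an error. A dependency-free sketch of the same shape, substituting plain `String` errors for `PyErr`/`SupervisionError` (that substitution, and the hypothetical `spawn_gated` caller, are this sketch's assumptions):

```rust
use std::fmt::Debug;

#[derive(Debug)]
enum Unhealthy<Event> {
    SoFarSoGood,
    StreamClosed,
    Crashed(Event),
}

// Map the health cell to a Result, mirroring ensure_mesh_healthy's match.
fn ensure_healthy<E: Debug>(state: &Unhealthy<E>) -> Result<(), String> {
    match state {
        Unhealthy::SoFarSoGood => Ok(()),
        Unhealthy::StreamClosed => {
            Err("proc mesh is stopped with reason: alloc is stopped".to_string())
        }
        Unhealthy::Crashed(event) => {
            Err(format!("proc mesh is stopped with reason: {:?}", event))
        }
    }
}

// Hypothetical entry point: check health first, then do the real work.
fn spawn_gated<E: Debug>(state: &Unhealthy<E>) -> Result<&'static str, String> {
    ensure_healthy(state)?;
    Ok("spawned")
}

fn main() {
    assert_eq!(spawn_gated::<()>(&Unhealthy::SoFarSoGood), Ok("spawned"));
    assert!(spawn_gated::<()>(&Unhealthy::StreamClosed).is_err());
    let crashed: Unhealthy<&str> = Unhealthy::Crashed("proc 3 exited: killed");
    println!("{}", spawn_gated(&crashed).unwrap_err());
}
```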

monarch_hyperactor/src/supervision.rs

Lines changed: 20 additions & 0 deletions
```diff
@@ -23,3 +23,23 @@ pub fn register_python_bindings(module: &Bound<'_, PyModule>) -> PyResult<()> {
     module.add("SupervisionError", py.get_type::<SupervisionError>())?;
     Ok(())
 }
+
+// Shared between mesh types.
+#[derive(Debug, Clone)]
+pub(crate) enum Unhealthy<Event> {
+    SoFarSoGood,    // Still healthy
+    StreamClosed,   // Event stream closed
+    Crashed(Event), // Bad health event received
+}
+
+impl<Event> Unhealthy<Event> {
+    #[allow(dead_code)] // No uses yet.
+    pub(crate) fn is_healthy(&self) -> bool {
+        matches!(self, Unhealthy::SoFarSoGood)
+    }
+
+    #[allow(dead_code)] // No uses yet.
+    pub(crate) fn is_crashed(&self) -> bool {
+        matches!(self, Unhealthy::Crashed(_))
+    }
+}
```
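
Two design points in this new module: `Unhealthy` is generic over `Event`, so one definition serves both `Unhealthy<ProcEvent>` and `Unhealthy<ActorSupervisionEvent>`, and the `matches!`-based helpers (currently `#[allow(dead_code)]`) give future call sites a boolean view without a full `match`. A small usage sketch (the `&str` and `u32` payloads are placeholders, not real event types):

```rust
#[derive(Debug)]
enum Unhealthy<Event> {
    SoFarSoGood,
    StreamClosed,
    Crashed(Event),
}

impl<Event> Unhealthy<Event> {
    fn is_healthy(&self) -> bool {
        matches!(self, Unhealthy::SoFarSoGood)
    }

    fn is_crashed(&self) -> bool {
        matches!(self, Unhealthy::Crashed(_))
    }
}

fn main() {
    // One generic definition covers different event types.
    let proc_state: Unhealthy<&str> = Unhealthy::SoFarSoGood;
    let actor_state: Unhealthy<u32> = Unhealthy::Crashed(42);

    assert!(proc_state.is_healthy() && !proc_state.is_crashed());
    assert!(actor_state.is_crashed() && !actor_state.is_healthy());

    // A closed stream is neither healthy nor crashed.
    let closed: Unhealthy<&str> = Unhealthy::StreamClosed;
    assert!(!closed.is_healthy() && !closed.is_crashed());
    println!("helpers classify all three states as expected");
}
```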
