Skip to content

Commit 1b0d02f

Browse files
zdevito authored and facebook-github-bot committed
Define blocking variants of everything in terms of non-blocking variant. (#588)
Summary:
Pull Request resolved: #588

This uses the `PyPythonTask` functionality developed in the last diff to delete all the `_blocking` variants used to expose tokio async work to Python. The new approach is to define the entire operation in async and then wrap it in a future: `Future(async_impl, requires_loop=False)`. If `async_impl` only awaits on (1) `PythonTask` objects and (2) other coroutine functions, then it is correct to pass `requires_loop=False`, which keeps execution speed the same as the blocking version. Otherwise, pass `requires_loop=True`, and the code will work correctly in both async and sync contexts.

Reviewed By: eliothedeman

Differential Revision: D78585722

fbshipit-source-id: 07698ac356bdca01eb83a31ed5b9cf91eddea669
1 parent 411028c commit 1b0d02f

File tree

14 files changed

+137
-563
lines changed

14 files changed

+137
-563
lines changed

monarch_hyperactor/src/actor.rs

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -22,14 +22,13 @@ use hyperactor::Context;
2222
use hyperactor::Handler;
2323
use hyperactor::Instance;
2424
use hyperactor::Named;
25-
use hyperactor::forward;
2625
use hyperactor::message::Bind;
2726
use hyperactor::message::Bindings;
2827
use hyperactor::message::Unbind;
2928
use hyperactor_mesh::comm::multicast::CastInfo;
3029
use monarch_types::PickledPyObject;
3130
use monarch_types::SerializablePyErr;
32-
use pyo3::conversion::IntoPyObjectExt;
31+
use pyo3::IntoPyObjectExt;
3332
use pyo3::exceptions::PyBaseException;
3433
use pyo3::exceptions::PyRuntimeError;
3534
use pyo3::exceptions::PyStopIteration;

monarch_hyperactor/src/alloc.rs

Lines changed: 9 additions & 81 deletions
Original file line number | Diff line number | Diff line change
@@ -37,6 +37,7 @@ use pyo3::prelude::*;
3737
use pyo3::types::PyDict;
3838
use tokio::process::Command;
3939

40+
use crate::actor::PyPythonTask;
4041
use crate::channel::PyChannelAddr;
4142
use crate::runtime::signal_safe_block_on;
4243

@@ -190,37 +191,19 @@ impl PyLocalAllocator {
190191
PyLocalAllocator {}
191192
}
192193

193-
fn allocate_nonblocking<'py>(
194-
&self,
195-
py: Python<'py>,
196-
spec: &PyAllocSpec,
197-
) -> PyResult<Bound<'py, PyAny>> {
194+
fn allocate_nonblocking(&self, spec: &PyAllocSpec) -> PyResult<PyPythonTask> {
198195
// We could use Bound here, and acquire the GIL inside of `future_into_py`, but
199196
// it is rather awkward with the current APIs, and we can anyway support Arc/Mutex
200197
// pretty easily.
201198
let spec = spec.inner.clone();
202-
pyo3_async_runtimes::tokio::future_into_py(py, async move {
199+
PyPythonTask::new(async move {
203200
LocalAllocator
204201
.allocate(spec)
205202
.await
206203
.map(|inner| PyAlloc::new(Box::new(inner)))
207204
.map_err(|e| PyRuntimeError::new_err(format!("{}", e)))
208205
})
209206
}
210-
211-
fn allocate_blocking<'py>(&self, py: Python<'py>, spec: &PyAllocSpec) -> PyResult<PyAlloc> {
212-
// We could use Bound here, and acquire the GIL inside of
213-
// `signal_safe_block_on`, but it is rather awkward with the current
214-
// APIs, and we can anyway support Arc/Mutex pretty easily.
215-
let spec = spec.inner.clone();
216-
signal_safe_block_on(py, async move {
217-
LocalAllocator
218-
.allocate(spec)
219-
.await
220-
.map(|inner| PyAlloc::new(Box::new(inner)))
221-
.map_err(|e| PyRuntimeError::new_err(format!("{:?}", e)))
222-
})?
223-
}
224207
}
225208

226209
#[pyclass(
@@ -237,37 +220,19 @@ impl PySimAllocator {
237220
PySimAllocator {}
238221
}
239222

240-
fn allocate_nonblocking<'py>(
241-
&self,
242-
py: Python<'py>,
243-
spec: &PyAllocSpec,
244-
) -> PyResult<Bound<'py, PyAny>> {
223+
fn allocate_nonblocking(&self, spec: &PyAllocSpec) -> PyResult<PyPythonTask> {
245224
// We could use Bound here, and acquire the GIL inside of `future_into_py`, but
246225
// it is rather awkward with the current APIs, and we can anyway support Arc/Mutex
247226
// pretty easily.
248227
let spec = spec.inner.clone();
249-
pyo3_async_runtimes::tokio::future_into_py(py, async move {
228+
PyPythonTask::new(async move {
250229
SimAllocator
251230
.allocate(spec)
252231
.await
253232
.map(|inner| PyAlloc::new(Box::new(inner)))
254233
.map_err(|e| PyRuntimeError::new_err(format!("{}", e)))
255234
})
256235
}
257-
258-
fn allocate_blocking<'py>(&self, py: Python<'py>, spec: &PyAllocSpec) -> PyResult<PyAlloc> {
259-
// We could use Bound here, and acquire the GIL inside of
260-
// `signal_safe_block_on`, but it is rather awkward with the current
261-
// APIs, and we can anyway support Arc/Mutex pretty easily.
262-
let spec = spec.inner.clone();
263-
signal_safe_block_on(py, async move {
264-
SimAllocator
265-
.allocate(spec)
266-
.await
267-
.map(|inner| PyAlloc::new(Box::new(inner)))
268-
.map_err(|e| PyRuntimeError::new_err(format!("{:?}", e)))
269-
})?
270-
}
271236
}
272237

273238
#[pyclass(
@@ -296,17 +261,13 @@ impl PyProcessAllocator {
296261
}
297262
}
298263

299-
fn allocate_nonblocking<'py>(
300-
&self,
301-
py: Python<'py>,
302-
spec: &PyAllocSpec,
303-
) -> PyResult<Bound<'py, PyAny>> {
264+
fn allocate_nonblocking(&self, spec: &PyAllocSpec) -> PyResult<PyPythonTask> {
304265
// We could use Bound here, and acquire the GIL inside of `future_into_py`, but
305266
// it is rather awkward with the current APIs, and we can anyway support Arc/Mutex
306267
// pretty easily.
307268
let instance = Arc::clone(&self.inner);
308269
let spec = spec.inner.clone();
309-
pyo3_async_runtimes::tokio::future_into_py(py, async move {
270+
PyPythonTask::new(async move {
310271
instance
311272
.lock()
312273
.await
@@ -316,23 +277,6 @@ impl PyProcessAllocator {
316277
.map_err(|e| PyRuntimeError::new_err(format!("{:?}", e)))
317278
})
318279
}
319-
320-
fn allocate_blocking<'py>(&self, py: Python<'py>, spec: &PyAllocSpec) -> PyResult<PyAlloc> {
321-
// We could use Bound here, and acquire the GIL inside of
322-
// `signal_safe_block_on`, but it is rather awkward with the current
323-
// APIs, and we can anyway support Arc/Mutex pretty easily.
324-
let instance = Arc::clone(&self.inner);
325-
let spec = spec.inner.clone();
326-
signal_safe_block_on(py, async move {
327-
instance
328-
.lock()
329-
.await
330-
.allocate(spec)
331-
.await
332-
.map(|inner| PyAlloc::new(Box::new(inner)))
333-
.map_err(|e| PyRuntimeError::new_err(format!("{:?}", e)))
334-
})?
335-
}
336280
}
337281

338282
/// A `[hyperactor_mesh::alloc::RemoteProcessAllocInitializer]` wrapper to enable subclassing from Python.
@@ -486,34 +430,18 @@ impl PyRemoteAllocator {
486430
})
487431
}
488432

489-
fn allocate_nonblocking<'py>(
490-
&self,
491-
py: Python<'py>,
492-
spec: &PyAllocSpec,
493-
) -> PyResult<Bound<'py, PyAny>> {
433+
fn allocate_nonblocking(&self, spec: &PyAllocSpec) -> PyResult<PyPythonTask> {
494434
let spec = spec.inner.clone();
495435
let mut cloned = self.clone();
496436

497-
pyo3_async_runtimes::tokio::future_into_py(py, async move {
437+
PyPythonTask::new(async move {
498438
cloned
499439
.allocate(spec)
500440
.await
501441
.map(|alloc| PyAlloc::new(Box::new(alloc)))
502442
.map_err(|e| PyRuntimeError::new_err(format!("{}", e)))
503443
})
504444
}
505-
fn allocate_blocking<'py>(&self, py: Python<'py>, spec: &PyAllocSpec) -> PyResult<PyAlloc> {
506-
let spec = spec.inner.clone();
507-
let mut cloned = self.clone();
508-
509-
signal_safe_block_on(py, async move {
510-
cloned
511-
.allocate(spec)
512-
.await
513-
.map(|alloc| PyAlloc::new(Box::new(alloc)))
514-
.map_err(|e| PyRuntimeError::new_err(format!("{:?}", e)))
515-
})?
516-
}
517445
}
518446

519447
pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {

monarch_hyperactor/src/proc_mesh.rs

Lines changed: 14 additions & 78 deletions
Original file line number | Diff line number | Diff line change
@@ -38,6 +38,8 @@ use pyo3::types::PyType;
3838
use tokio::sync::Mutex;
3939
use tokio::sync::mpsc;
4040

41+
use crate::actor::PyPythonTask;
42+
use crate::actor::PythonTask;
4143
use crate::actor_mesh::PythonActorMesh;
4244
use crate::alloc::PyAlloc;
4345
use crate::mailbox::PyMailbox;
@@ -120,7 +122,7 @@ pub struct PyProcMesh {
120122
unhealthy_event: Arc<Mutex<Option<Option<ProcEvent>>>>,
121123
}
122124

123-
fn allocate_proc_mesh<'py>(py: Python<'py>, alloc: &PyAlloc) -> PyResult<Bound<'py, PyAny>> {
125+
fn allocate_proc_mesh(alloc: &PyAlloc) -> PyResult<PyPythonTask> {
124126
let alloc = match alloc.take() {
125127
Some(alloc) => alloc,
126128
None => {
@@ -129,7 +131,7 @@ fn allocate_proc_mesh<'py>(py: Python<'py>, alloc: &PyAlloc) -> PyResult<Bound<'
129131
));
130132
}
131133
};
132-
pyo3_async_runtimes::tokio::future_into_py(py, async move {
134+
PyPythonTask::new(async move {
133135
let world_id = alloc.world_id().clone();
134136
let mesh = ProcMesh::allocate(alloc)
135137
.await
@@ -138,24 +140,6 @@ fn allocate_proc_mesh<'py>(py: Python<'py>, alloc: &PyAlloc) -> PyResult<Bound<'
138140
})
139141
}
140142

141-
fn allocate_proc_mesh_blocking<'py>(py: Python<'py>, alloc: &PyAlloc) -> PyResult<PyProcMesh> {
142-
let alloc = match alloc.take() {
143-
Some(alloc) => alloc,
144-
None => {
145-
return Err(PyException::new_err(
146-
"Alloc object already been used".to_string(),
147-
));
148-
}
149-
};
150-
signal_safe_block_on(py, async move {
151-
let world_id = alloc.world_id().clone();
152-
let mesh = ProcMesh::allocate(alloc)
153-
.await
154-
.map_err(|err| PyException::new_err(err.to_string()))?;
155-
Ok(PyProcMesh::monitored(mesh, world_id))
156-
})?
157-
}
158-
159143
impl PyProcMesh {
160144
/// Create a new [`PyProcMesh`] with self health status monitoring.
161145
fn monitored(mut proc_mesh: ProcMesh, world_id: WorldId) -> Self {
@@ -281,71 +265,34 @@ impl PyProcMesh {
281265
_cls: &Bound<'_, PyType>,
282266
py: Python<'py>,
283267
alloc: &PyAlloc,
284-
) -> PyResult<Bound<'py, PyAny>> {
285-
allocate_proc_mesh(py, alloc)
286-
}
287-
288-
#[classmethod]
289-
fn allocate_blocking<'py>(
290-
_cls: &Bound<'_, PyType>,
291-
py: Python<'py>,
292-
alloc: &PyAlloc,
293-
) -> PyResult<PyProcMesh> {
294-
allocate_proc_mesh_blocking(py, alloc)
268+
) -> PyResult<PyPythonTask> {
269+
allocate_proc_mesh(alloc)
295270
}
296271

297272
fn spawn_nonblocking<'py>(
298273
&self,
299-
py: Python<'py>,
300274
name: String,
301275
actor: &Bound<'py, PyType>,
302-
) -> PyResult<Bound<'py, PyAny>> {
276+
) -> PyResult<PyPythonTask> {
303277
let unhealthy_event = Arc::clone(&self.unhealthy_event);
304278
let pickled_type = PickledPyObject::pickle(actor.as_any())?;
305279
let proc_mesh = self.try_inner()?;
306280
let keepalive = self.keepalive.clone();
307-
pyo3_async_runtimes::tokio::future_into_py(py, async move {
281+
PyPythonTask::new(async move {
308282
ensure_mesh_healthy(&unhealthy_event).await?;
309283

310284
let mailbox = proc_mesh.client().clone();
311285
let actor_mesh = proc_mesh.spawn(&name, &pickled_type).await?;
312286
let actor_events = actor_mesh.with_mut(|a| a.events()).await.unwrap().unwrap();
313-
let python_actor_mesh = PythonActorMesh::monitored(
287+
Ok(PythonActorMesh::monitored(
314288
actor_mesh,
315289
PyMailbox { inner: mailbox },
316290
keepalive,
317291
actor_events,
318-
);
319-
Python::with_gil(|py| python_actor_mesh.into_py_any(py))
292+
))
320293
})
321294
}
322295

323-
fn spawn_blocking<'py>(
324-
&self,
325-
py: Python<'py>,
326-
name: String,
327-
actor: &Bound<'py, PyType>,
328-
) -> PyResult<PyObject> {
329-
let unhealthy_event = Arc::clone(&self.unhealthy_event);
330-
let pickled_type = PickledPyObject::pickle(actor.as_any())?;
331-
let proc_mesh = self.try_inner()?;
332-
let keepalive = self.keepalive.clone();
333-
signal_safe_block_on(py, async move {
334-
ensure_mesh_healthy(&unhealthy_event).await?;
335-
336-
let mailbox = proc_mesh.client().clone();
337-
let actor_mesh = proc_mesh.spawn(&name, &pickled_type).await?;
338-
let actor_events = actor_mesh.with_mut(|a| a.events()).await.unwrap().unwrap();
339-
let python_actor_mesh = PythonActorMesh::monitored(
340-
actor_mesh,
341-
PyMailbox { inner: mailbox },
342-
keepalive,
343-
actor_events,
344-
);
345-
Python::with_gil(|py| python_actor_mesh.into_py_any(py))
346-
})?
347-
}
348-
349296
// User can call this to monitor the proc mesh events. This will override
350297
// the default monitor that exits the client on process crash, so user can
351298
// handle the process crash in their own way.
@@ -383,27 +330,16 @@ impl PyProcMesh {
383330
Ok(self.try_inner()?.shape().clone().into())
384331
}
385332

386-
fn stop_nonblocking<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
333+
fn stop_nonblocking<'py>(&self) -> PyResult<PyPythonTask> {
387334
// Clone the necessary fields from self to avoid capturing self in the async block
388335
let inner = self.inner.clone();
389336
let proc_events = self.proc_events.clone();
390337

391-
pyo3_async_runtimes::tokio::future_into_py(py, async move {
338+
Ok(PythonTask::new(async move {
392339
Self::stop_mesh(inner, proc_events).await?;
393-
PyResult::Ok(())
340+
Python::with_gil(|py| Ok(py.None()))
394341
})
395-
}
396-
397-
fn stop_blocking<'py>(&self, py: Python<'py>) -> PyResult<()> {
398-
// Clone the necessary fields from self to avoid capturing self in the async block
399-
let inner = self.inner.clone();
400-
let proc_events = self.proc_events.clone();
401-
402-
signal_safe_block_on(py, async move {
403-
Self::stop_mesh(inner, proc_events)
404-
.await
405-
.map_err(|e| PyRuntimeError::new_err(format!("{}", e)))
406-
})?
342+
.into())
407343
}
408344
}
409345

0 commit comments

Comments
 (0)