From 22d85c52f330525e63125b4b14a2d4edd51b7fe2 Mon Sep 17 00:00:00 2001
From: zdevito
Date: Fri, 18 Jul 2025 17:43:25 -0700
Subject: [PATCH 1/2] Define blocking variants of everything in terms of non-blocking variant.

This uses the PyPythonTask functionality developed in the last diff to
delete all the `_blocking` variants of the bindings that expose tokio
async work to Python.

The new approach is to define everything as async and then wrap it in a
future: `Future(async_impl, requires_loop=False)`.

If `async_impl` only awaits (1) PythonTask objects and (2) other
coroutine functions, then it is correct to pass `requires_loop=False`,
which keeps execution speed the same as the old blocking version.
Otherwise, pass `requires_loop=True` and the code will work correctly in
both async and sync contexts.

Differential Revision: [D78585722](https://our.internmc.facebook.com/intern/diff/D78585722/)

[ghstack-poisoned]
---
 monarch_hyperactor/src/alloc.rs              |  90 ++----------
 monarch_hyperactor/src/proc_mesh.rs          |  92 ++----------
 monarch_rdma/extension/lib.rs                | 123 ++--------------
 .../monarch_hyperactor/alloc.pyi             |  50 +------
 .../monarch_hyperactor/proc_mesh.pyi         |  37 +----
 .../monarch/_rust_bindings/rdma/__init__.pyi |  37 ++---
 python/monarch/_src/actor/actor_mesh.py     |  17 +--
 python/monarch/_src/actor/allocator.py      |  20 +--
 python/monarch/_src/actor/future.py         |  27 +---
 python/monarch/_src/actor/proc_mesh.py      | 133 ++++++------------
 python/monarch/_src/tensor_engine/rdma.py   |  39 +----
 python/tests/test_allocator.py              |   6 +-
 python/tests/test_python_actors.py          |  21 ++-
 13 files changed, 130 insertions(+), 562 deletions(-)

diff --git a/monarch_hyperactor/src/alloc.rs b/monarch_hyperactor/src/alloc.rs
index 558e20bfa..74f1d4d51 100644
--- a/monarch_hyperactor/src/alloc.rs
+++ b/monarch_hyperactor/src/alloc.rs
@@ -37,6 +37,7 @@ use pyo3::prelude::*;
 use pyo3::types::PyDict;
 use tokio::process::Command;

+use crate::actor::PyPythonTask;
 use crate::channel::PyChannelAddr;
 use crate::runtime::signal_safe_block_on;

@@ -190,16 +191,12 @@ impl PyLocalAllocator {
         PyLocalAllocator {}
     }

-    fn allocate_nonblocking<'py>(
-        &self,
-        py: Python<'py>,
-        spec: &PyAllocSpec,
-    ) -> PyResult<Bound<'py, PyAny>> {
+    fn allocate_nonblocking(&self, spec: &PyAllocSpec) -> PyResult<PyPythonTask> {
         // We could use Bound here, and acquire the GIL inside of `future_into_py`, but
         // it is rather awkward with the current APIs, and we can anyway support Arc/Mutex
         // pretty easily.
         let spec = spec.inner.clone();
-        pyo3_async_runtimes::tokio::future_into_py(py, async move {
+        PyPythonTask::new(async move {
             LocalAllocator
                 .allocate(spec)
                 .await
                 .map(|inner| PyAlloc::new(Box::new(inner)))
                 .map_err(|e| PyRuntimeError::new_err(format!("{}", e)))
         })
     }
-
-    fn allocate_blocking<'py>(&self, py: Python<'py>, spec: &PyAllocSpec) -> PyResult<PyAlloc> {
-        // We could use Bound here, and acquire the GIL inside of
-        // `signal_safe_block_on`, but it is rather awkward with the current
-        // APIs, and we can anyway support Arc/Mutex pretty easily.
-        let spec = spec.inner.clone();
-        signal_safe_block_on(py, async move {
-            LocalAllocator
-                .allocate(spec)
-                .await
-                .map(|inner| PyAlloc::new(Box::new(inner)))
-                .map_err(|e| PyRuntimeError::new_err(format!("{:?}", e)))
-        })?
-    }
 }

 #[pyclass(
@@ -237,16 +220,12 @@ impl PySimAllocator {
         PySimAllocator {}
     }

-    fn allocate_nonblocking<'py>(
-        &self,
-        py: Python<'py>,
-        spec: &PyAllocSpec,
-    ) -> PyResult<Bound<'py, PyAny>> {
+    fn allocate_nonblocking(&self, spec: &PyAllocSpec) -> PyResult<PyPythonTask> {
         // We could use Bound here, and acquire the GIL inside of `future_into_py`, but
         // it is rather awkward with the current APIs, and we can anyway support Arc/Mutex
         // pretty easily.
         let spec = spec.inner.clone();
-        pyo3_async_runtimes::tokio::future_into_py(py, async move {
+        PyPythonTask::new(async move {
             SimAllocator
                 .allocate(spec)
                 .await
                 .map(|inner| PyAlloc::new(Box::new(inner)))
                 .map_err(|e| PyRuntimeError::new_err(format!("{}", e)))
         })
     }
-
-    fn allocate_blocking<'py>(&self, py: Python<'py>, spec: &PyAllocSpec) -> PyResult<PyAlloc> {
-        // We could use Bound here, and acquire the GIL inside of
-        // `signal_safe_block_on`, but it is rather awkward with the current
-        // APIs, and we can anyway support Arc/Mutex pretty easily.
-        let spec = spec.inner.clone();
-        signal_safe_block_on(py, async move {
-            SimAllocator
-                .allocate(spec)
-                .await
-                .map(|inner| PyAlloc::new(Box::new(inner)))
-                .map_err(|e| PyRuntimeError::new_err(format!("{:?}", e)))
-        })?
-    }
 }

 #[pyclass(
@@ -296,17 +261,13 @@ impl PyProcessAllocator {
         }
     }

-    fn allocate_nonblocking<'py>(
-        &self,
-        py: Python<'py>,
-        spec: &PyAllocSpec,
-    ) -> PyResult<Bound<'py, PyAny>> {
+    fn allocate_nonblocking(&self, spec: &PyAllocSpec) -> PyResult<PyPythonTask> {
         // We could use Bound here, and acquire the GIL inside of `future_into_py`, but
         // it is rather awkward with the current APIs, and we can anyway support Arc/Mutex
         // pretty easily.
         let instance = Arc::clone(&self.inner);
         let spec = spec.inner.clone();
-        pyo3_async_runtimes::tokio::future_into_py(py, async move {
+        PyPythonTask::new(async move {
             instance
                 .lock()
                 .await
                 .allocate(spec)
                 .await
                 .map(|inner| PyAlloc::new(Box::new(inner)))
                 .map_err(|e| PyRuntimeError::new_err(format!("{:?}", e)))
         })
     }
-
-    fn allocate_blocking<'py>(&self, py: Python<'py>, spec: &PyAllocSpec) -> PyResult<PyAlloc> {
-        // We could use Bound here, and acquire the GIL inside of
-        // `signal_safe_block_on`, but it is rather awkward with the current
-        // APIs, and we can anyway support Arc/Mutex pretty easily.
-        let instance = Arc::clone(&self.inner);
-        let spec = spec.inner.clone();
-        signal_safe_block_on(py, async move {
-            instance
-                .lock()
-                .await
-                .allocate(spec)
-                .await
-                .map(|inner| PyAlloc::new(Box::new(inner)))
-                .map_err(|e| PyRuntimeError::new_err(format!("{:?}", e)))
-        })?
-    }
 }

 /// A `[hyperactor_mesh::alloc::RemoteProcessAllocInitializer]` wrapper to enable subclassing from Python.
@@ -486,15 +430,11 @@ impl PyRemoteAllocator {
         })
     }

-    fn allocate_nonblocking<'py>(
-        &self,
-        py: Python<'py>,
-        spec: &PyAllocSpec,
-    ) -> PyResult<Bound<'py, PyAny>> {
+    fn allocate_nonblocking(&self, spec: &PyAllocSpec) -> PyResult<PyPythonTask> {
         let spec = spec.inner.clone();
         let mut cloned = self.clone();

-        pyo3_async_runtimes::tokio::future_into_py(py, async move {
+        PyPythonTask::new(async move {
             cloned
                 .allocate(spec)
                 .await
                 .map(|alloc| PyAlloc::new(Box::new(alloc)))
                 .map_err(|e| PyRuntimeError::new_err(format!("{}", e)))
         })
     }
-    fn allocate_blocking<'py>(&self, py: Python<'py>, spec: &PyAllocSpec) -> PyResult<PyAlloc> {
-        let spec = spec.inner.clone();
-        let mut cloned = self.clone();
-
-        signal_safe_block_on(py, async move {
-            cloned
-                .allocate(spec)
-                .await
-                .map(|alloc| PyAlloc::new(Box::new(alloc)))
-                .map_err(|e| PyRuntimeError::new_err(format!("{:?}", e)))
-        })?
-    }
 }

 pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
diff --git a/monarch_hyperactor/src/proc_mesh.rs b/monarch_hyperactor/src/proc_mesh.rs
index 6222a3bfe..e96eb4a0c 100644
--- a/monarch_hyperactor/src/proc_mesh.rs
+++ b/monarch_hyperactor/src/proc_mesh.rs
@@ -38,6 +38,8 @@ use pyo3::types::PyType;
 use tokio::sync::Mutex;
 use tokio::sync::mpsc;

+use crate::actor::PyPythonTask;
+use crate::actor::PythonTask;
 use crate::actor_mesh::PythonActorMesh;
 use crate::alloc::PyAlloc;
 use crate::mailbox::PyMailbox;
@@ -120,7 +122,7 @@ pub struct PyProcMesh {
     unhealthy_event: Arc<Mutex<Option<Unhealthy<ProcEvents>>>>,
 }

-fn allocate_proc_mesh<'py>(py: Python<'py>, alloc: &PyAlloc) -> PyResult<Bound<'py, PyAny>> {
+fn allocate_proc_mesh(alloc: &PyAlloc) -> PyResult<PyPythonTask> {
     let alloc = match alloc.take() {
         Some(alloc) => alloc,
         None => {
@@ -129,7 +131,7 @@ fn allocate_proc_mesh<'py>(py: Python<'py>, alloc: &PyAlloc) -> PyResult<Bound<
         }
     };
-    pyo3_async_runtimes::tokio::future_into_py(py, async move {
+    PyPythonTask::new(async move {
         let world_id = alloc.world_id().clone();
         let mesh = ProcMesh::allocate(alloc)
             .await
@@ -139,24 +141,8 @@ fn allocate_proc_mesh<'py>(py: Python<'py>, alloc: &PyAlloc) -> PyResult<Bound<
     })
 }

-fn allocate_proc_mesh_blocking<'py>(py: Python<'py>, alloc: &PyAlloc) -> PyResult<PyProcMesh> {
-    let alloc = match alloc.take() {
-        Some(alloc) => alloc,
-        None => {
-            return Err(PyException::new_err(
-                "Alloc object already been used".to_string(),
-            ));
-        }
-    };
-    signal_safe_block_on(py, async move {
-        let world_id = alloc.world_id().clone();
-        let mesh = ProcMesh::allocate(alloc)
-            .await
-            .map_err(|err| PyException::new_err(err.to_string()))?;
-        Ok(PyProcMesh::monitored(mesh, world_id))
-    })?
-}
-
 impl PyProcMesh {
     /// Create a new [`PyProcMesh`] with self health status monitoring.
     fn monitored(mut proc_mesh: ProcMesh, world_id: WorldId) -> Self {
@@ -281,71 +265,34 @@ impl PyProcMesh {
         _cls: &Bound<'_, PyType>,
         py: Python<'py>,
         alloc: &PyAlloc,
-    ) -> PyResult<Bound<'py, PyAny>> {
-        allocate_proc_mesh(py, alloc)
-    }
-
-    #[classmethod]
-    fn allocate_blocking<'py>(
-        _cls: &Bound<'_, PyType>,
-        py: Python<'py>,
-        alloc: &PyAlloc,
-    ) -> PyResult<PyProcMesh> {
-        allocate_proc_mesh_blocking(py, alloc)
+    ) -> PyResult<PyPythonTask> {
+        allocate_proc_mesh(alloc)
     }

     fn spawn_nonblocking<'py>(
         &self,
-        py: Python<'py>,
         name: String,
         actor: &Bound<'py, PyType>,
-    ) -> PyResult<Bound<'py, PyAny>> {
+    ) -> PyResult<PyPythonTask> {
         let unhealthy_event = Arc::clone(&self.unhealthy_event);
         let pickled_type = PickledPyObject::pickle(actor.as_any())?;
         let proc_mesh = self.try_inner()?;
         let keepalive = self.keepalive.clone();
-        pyo3_async_runtimes::tokio::future_into_py(py, async move {
+        PyPythonTask::new(async move {
             ensure_mesh_healthy(&unhealthy_event).await?;

             let mailbox = proc_mesh.client().clone();
             let actor_mesh = proc_mesh.spawn(&name, &pickled_type).await?;
             let actor_events = actor_mesh.with_mut(|a| a.events()).await.unwrap().unwrap();
-            let python_actor_mesh = PythonActorMesh::monitored(
+            Ok(PythonActorMesh::monitored(
                 actor_mesh,
                 PyMailbox { inner: mailbox },
                 keepalive,
                 actor_events,
-            );
-            Python::with_gil(|py| python_actor_mesh.into_py_any(py))
+            ))
         })
     }

-    fn spawn_blocking<'py>(
-        &self,
-        py: Python<'py>,
-        name: String,
-        actor: &Bound<'py, PyType>,
-    ) -> PyResult<PyObject> {
-        let unhealthy_event = Arc::clone(&self.unhealthy_event);
-        let pickled_type = PickledPyObject::pickle(actor.as_any())?;
-        let proc_mesh = self.try_inner()?;
-        let keepalive = self.keepalive.clone();
-        signal_safe_block_on(py, async move {
-            ensure_mesh_healthy(&unhealthy_event).await?;
-
-            let mailbox = proc_mesh.client().clone();
-            let actor_mesh = proc_mesh.spawn(&name, &pickled_type).await?;
-            let actor_events = actor_mesh.with_mut(|a| a.events()).await.unwrap().unwrap();
-            let python_actor_mesh = PythonActorMesh::monitored(
-                actor_mesh,
-                PyMailbox { inner: mailbox },
-                keepalive,
-                actor_events,
-            );
-            Python::with_gil(|py| python_actor_mesh.into_py_any(py))
-        })?
-    }
-
     // User can call this to monitor the proc mesh events. This will override
     // the default monitor that exits the client on process crash, so user can
     // handle the process crash in their own way.
@@ -383,27 +330,16 @@ impl PyProcMesh {
         Ok(self.try_inner()?.shape().clone().into())
     }

-    fn stop_nonblocking<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
+    fn stop_nonblocking<'py>(&self) -> PyResult<PyPythonTask> {
         // Clone the necessary fields from self to avoid capturing self in the async block
         let inner = self.inner.clone();
         let proc_events = self.proc_events.clone();

-        pyo3_async_runtimes::tokio::future_into_py(py, async move {
+        Ok(PythonTask::new(async move {
             Self::stop_mesh(inner, proc_events).await?;
-            PyResult::Ok(())
+            Python::with_gil(|py| Ok(py.None()))
         })
-    }
-
-    fn stop_blocking<'py>(&self, py: Python<'py>) -> PyResult<()> {
-        // Clone the necessary fields from self to avoid capturing self in the async block
-        let inner = self.inner.clone();
-        let proc_events = self.proc_events.clone();
-
-        signal_safe_block_on(py, async move {
-            Self::stop_mesh(inner, proc_events)
-                .await
-                .map_err(|e| PyRuntimeError::new_err(format!("{}", e)))
-        })?
+        .into())
     }
 }

diff --git a/monarch_rdma/extension/lib.rs b/monarch_rdma/extension/lib.rs
index 1dc4da813..20fb88e67 100644
--- a/monarch_rdma/extension/lib.rs
+++ b/monarch_rdma/extension/lib.rs
@@ -13,6 +13,7 @@ use hyperactor::Named;
 use hyperactor::ProcId;
 use hyperactor_mesh::RootActorMesh;
 use hyperactor_mesh::shared_cell::SharedCell;
+use monarch_hyperactor::actor::PyPythonTask;
 use monarch_hyperactor::mailbox::PyMailbox;
 use monarch_hyperactor::proc_mesh::PyProcMesh;
 use monarch_hyperactor::runtime::signal_safe_block_on;
@@ -65,26 +66,6 @@ async fn create_rdma_buffer(

 #[pymethods]
 impl PyRdmaBuffer {
-    #[classmethod]
-    fn create_rdma_buffer_blocking<'py>(
-        _cls: &Bound<'_, PyType>,
-        py: Python<'py>,
-        addr: usize,
-        size: usize,
-        proc_id: String,
-        client: PyMailbox,
-    ) -> PyResult<PyRdmaBuffer> {
-        if !ibverbs_supported() {
-            return Err(PyException::new_err(
-                "ibverbs is not supported on this system",
-            ));
-        }
-        signal_safe_block_on(
-            py,
-            create_rdma_buffer(addr, size, proc_id.parse().unwrap(), client),
-        )?
-    }
-
     #[classmethod]
     fn create_rdma_buffer_nonblocking<'py>(
         _cls: &Bound<'_, PyType>,
@@ -93,16 +74,18 @@ impl PyRdmaBuffer {
         size: usize,
         proc_id: String,
         client: PyMailbox,
-    ) -> PyResult<Bound<'py, PyAny>> {
+    ) -> PyResult<PyPythonTask> {
         if !ibverbs_supported() {
             return Err(PyException::new_err(
                 "ibverbs is not supported on this system",
             ));
         }
-        pyo3_async_runtimes::tokio::future_into_py(
-            py,
-            create_rdma_buffer(addr, size, proc_id.parse().unwrap(), client),
-        )
+        PyPythonTask::new(create_rdma_buffer(
+            addr,
+            size,
+            proc_id.parse().unwrap(),
+            client,
+        ))
     }

     #[classmethod]
@@ -136,9 +119,9 @@ impl PyRdmaBuffer {
         local_proc_id: String,
         client: PyMailbox,
         timeout: u64,
-    ) -> PyResult<Bound<'py, PyAny>> {
+    ) -> PyResult<PyPythonTask> {
         let (local_owner_ref, buffer) = setup_rdma_context(self, local_proc_id);
-        pyo3_async_runtimes::tokio::future_into_py(py, async move {
+        PyPythonTask::new(async move {
             let caps = client.get_inner();
             let local_buffer = local_owner_ref.request_buffer(caps, addr, size).await?;
             let _result_ = local_buffer
@@ -149,41 +132,6 @@ impl PyRdmaBuffer {
         })
     }

-    /// Reads data from the local buffer and places it into this remote RDMA buffer.
-    ///
-    /// This operation appears as "read_into" from the caller's perspective (reading from local memory
-    /// into the remote buffer), but internally it's implemented as a "write_from" operation on the
-    /// local buffer since the data flows from the local buffer to the remote one.
-    ///
-    /// This is the blocking version of `read_into`, compatible with non asyncio Python code.
-    ///
-    /// # Arguments
-    /// * `addr` - The address of the local buffer to read from
-    /// * `size` - The size of the data to transfer
-    /// * `local_proc_id` - The process ID where the local buffer resides
-    /// * `client` - The mailbox for communication
-    /// * `timeout` - Maximum time in milliseconds to wait for the operation
-    #[pyo3(signature = (addr, size, local_proc_id, client, timeout))]
-    fn read_into_blocking<'py>(
-        &self,
-        py: Python<'py>,
-        addr: usize,
-        size: usize,
-        local_proc_id: String,
-        client: PyMailbox,
-        timeout: u64,
-    ) -> PyResult<bool> {
-        let (local_owner_ref, buffer) = setup_rdma_context(self, local_proc_id);
-        signal_safe_block_on(py, async move {
-            let caps = client.get_inner();
-            let local_buffer = local_owner_ref.request_buffer(caps, addr, size).await?;
-            local_buffer
-                .write_from(caps, buffer, timeout)
-                .await
-                .map_err(|e| PyException::new_err(format!("failed to read into buffer: {}", e)))
-        })?
-    }
-
     /// Writes data from this remote RDMA buffer into a local buffer.
     ///
     /// This operation appears as "write_from" from the caller's perspective (writing from the remote
@@ -205,9 +153,9 @@ impl PyRdmaBuffer {
         local_proc_id: String,
         client: PyMailbox,
         timeout: u64,
-    ) -> PyResult<Bound<'py, PyAny>> {
+    ) -> PyResult<PyPythonTask> {
         let (local_owner_ref, buffer) = setup_rdma_context(self, local_proc_id);
-        pyo3_async_runtimes::tokio::future_into_py(py, async move {
+        PyPythonTask::new(async move {
             let caps = client.get_inner();
             let local_buffer = local_owner_ref.request_buffer(caps, addr, size).await?;
             let _result_ = local_buffer
@@ -218,41 +166,6 @@ impl PyRdmaBuffer {
         })
     }

-    /// Writes data from this remote RDMA buffer into a local buffer.
-    ///
-    /// This operation appears as "write_from" from the caller's perspective (writing from the remote
-    /// buffer into local memory), but internally it's implemented as a "read_into" operation on the
-    /// local buffer since the data flows from the remote buffer to the local one.
-    ///
-    /// This is the blocking version of `write_from`, compatible with non asyncio Python code.
-    ///
-    /// # Arguments
-    /// * `addr` - The address of the local buffer to write to
-    /// * `size` - The size of the data to transfer
-    /// * `local_proc_id` - The process ID where the local buffer resides
-    /// * `client` - The mailbox for communication
-    /// * `timeout` - Maximum time in milliseconds to wait for the operation
-    #[pyo3(signature = (addr, size, local_proc_id, client, timeout))]
-    fn write_from_blocking<'py>(
-        &self,
-        py: Python<'py>,
-        addr: usize,
-        size: usize,
-        local_proc_id: String,
-        client: PyMailbox,
-        timeout: u64,
-    ) -> PyResult<bool> {
-        let (local_owner_ref, buffer) = setup_rdma_context(self, local_proc_id);
-        signal_safe_block_on(py, async move {
-            let caps = client.get_inner();
-            let local_buffer = local_owner_ref.request_buffer(caps, addr, size).await?;
-            local_buffer
-                .read_into(caps, buffer, timeout)
-                .await
-                .map_err(|e| PyException::new_err(format!("failed to write from buffer: {}", e)))
-        })?
-    }
-
     fn __reduce__(&self) -> PyResult<(PyObject, PyObject)> {
         Python::with_gil(|py| {
             let ctor = py.get_type::<PyRdmaBuffer>().to_object(py);
@@ -272,18 +185,10 @@ impl PyRdmaBuffer {
         Ok(deserialized)
     }

-    fn drop<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
+    fn drop<'py>(&self) -> PyResult<PyPythonTask> {
         // no op with CPUs, currently a stub.
         // TODO - replace with correct GPU behavior.
-        pyo3_async_runtimes::tokio::future_into_py(py, async move { Ok(()) })
-    }
-
-    fn drop_blocking<'py>(&self, py: Python<'py>) -> PyResult<()> {
-        signal_safe_block_on(py, async move {
-            // no op with CPUs, currently a stub.
-            // TODO - replace with correct GPU behavior.
-            Ok(())
-        })?
+        PyPythonTask::new(async move { Ok(()) })
     }
 }

diff --git a/python/monarch/_rust_bindings/monarch_hyperactor/alloc.pyi b/python/monarch/_rust_bindings/monarch_hyperactor/alloc.pyi
index a58cb6674..317902141 100644
--- a/python/monarch/_rust_bindings/monarch_hyperactor/alloc.pyi
+++ b/python/monarch/_rust_bindings/monarch_hyperactor/alloc.pyi
@@ -10,6 +10,8 @@ from datetime import timedelta
 from typing import final, Optional

 from monarch._rust_bindings.monarch_hyperactor.alloc import Alloc, AllocSpec
+from monarch._rust_bindings.monarch_hyperactor.mailbox import Mailbox, PythonTask
+
 from typing_extensions import Self

 class Alloc:
@@ -61,7 +63,7 @@ class ProcessAllocatorBase:
         """
         ...

-    async def allocate_nonblocking(self, spec: AllocSpec) -> Alloc:
+    def allocate_nonblocking(self, spec: AllocSpec) -> PythonTask[Alloc]:
         """
         Allocate a process according to the provided spec.

@@ -70,18 +72,8 @@ class ProcessAllocatorBase:
         """
         ...

-    def allocate_blocking(self, spec: AllocSpec) -> Alloc:
-        """
-        Allocate a process according to the provided spec, blocking until an
-        alloc is returned.
-
-        Arguments:
-        - `spec`: The spec to allocate according to.
-        """
-        ...
-
 class LocalAllocatorBase:
-    async def allocate_nonblocking(self, spec: AllocSpec) -> Alloc:
+    def allocate_nonblocking(self, spec: AllocSpec) -> PythonTask[Alloc]:
         """
         Allocate a process according to the provided spec.

@@ -90,18 +82,8 @@ class LocalAllocatorBase:
         """
         ...

-    def allocate_blocking(self, spec: AllocSpec) -> Alloc:
-        """
-        Allocate a process according to the provided spec, blocking until an
-        alloc is returned.
-
-        Arguments:
-        - `spec`: The spec to allocate according to.
-        """
-        ...
-
 class SimAllocatorBase:
-    async def allocate_nonblocking(self, spec: AllocSpec) -> Alloc:
+    def allocate_nonblocking(self, spec: AllocSpec) -> PythonTask[Alloc]:
         """
         Allocate a process according to the provided spec.

@@ -110,16 +92,6 @@ class SimAllocatorBase:
         """
         ...

-    def allocate_blocking(self, spec: AllocSpec) -> Alloc:
-        """
-        Allocate a process according to the provided spec, blocking until an
-        alloc is returned.
-
-        Arguments:
-        - `spec`: The spec to allocate according to.
-        """
-        ...
-
 class RemoteAllocatorBase:
     def __new__(
         cls,
@@ -138,7 +110,7 @@ class RemoteAllocatorBase:
         """
         ...

-    async def allocate_nonblocking(self, spec: AllocSpec) -> Alloc:
+    def allocate_nonblocking(self, spec: AllocSpec) -> PythonTask[Alloc]:
         """
         Allocate a process according to the provided spec.

@@ -146,13 +118,3 @@ class RemoteAllocatorBase:
         - `spec`: The spec to allocate according to.
         """
         ...
-
-    def allocate_blocking(self, spec: AllocSpec) -> Alloc:
-        """
-        Allocate a process according to the provided spec, blocking until an
-        alloc is returned.
-
-        Arguments:
-        - `spec`: The spec to allocate according to.
-        """
-        ...
diff --git a/python/monarch/_rust_bindings/monarch_hyperactor/proc_mesh.pyi b/python/monarch/_rust_bindings/monarch_hyperactor/proc_mesh.pyi
index 8fffc466b..d4cc4ad9a 100644
--- a/python/monarch/_rust_bindings/monarch_hyperactor/proc_mesh.pyi
+++ b/python/monarch/_rust_bindings/monarch_hyperactor/proc_mesh.pyi
@@ -12,13 +12,13 @@ from monarch._rust_bindings.monarch_hyperactor.actor import Actor
 from monarch._rust_bindings.monarch_hyperactor.actor_mesh import PythonActorMesh
 from monarch._rust_bindings.monarch_hyperactor.alloc import Alloc
-from monarch._rust_bindings.monarch_hyperactor.mailbox import Mailbox
+from monarch._rust_bindings.monarch_hyperactor.mailbox import Mailbox, PythonTask
 from monarch._rust_bindings.monarch_hyperactor.shape import Shape

 @final
 class ProcMesh:
     @classmethod
-    async def allocate_nonblocking(self, alloc: Alloc) -> ProcMesh:
+    def allocate_nonblocking(self, alloc: Alloc) -> PythonTask[ProcMesh]:
         """
         Allocate a process mesh according to the provided alloc.
         Returns when the mesh is fully allocated.
@@ -28,18 +28,9 @@ class ProcMesh:
         """
         ...

-    @classmethod
-    def allocate_blocking(self, alloc: Alloc) -> ProcMesh:
-        """
-        Allocate a process mesh according to the provided alloc.
-        Blocks until the mesh is fully allocated.
-
-        Arguments:
-        - `alloc`: The alloc to allocate according to.
-        """
-        ...
-
-    async def spawn_nonblocking(self, name: str, actor: Type[Actor]) -> PythonActorMesh:
+    def spawn_nonblocking(
+        self, name: str, actor: Type[Actor]
+    ) -> PythonTask[PythonActorMesh]:
         """
         Spawn a new actor on this mesh.

@@ -49,16 +40,6 @@ class ProcMesh:
         """
         ...

-    async def spawn_blocking(self, name: str, actor: Type[Actor]) -> PythonActorMesh:
-        """
-        Spawn a new actor on this mesh. Blocks until the actor is fully spawned.
-
-        Arguments:
-        - `name`: Name of the actor.
-        - `actor`: The type of the actor that will be spawned.
-        """
-        ...
-
     async def monitor(self) -> ProcMeshMonitor:
         """
         Returns a supervision monitor for this mesh.
@@ -81,18 +62,12 @@ class ProcMesh:
         """
         ...

-    async def stop_nonblocking(self) -> None:
+    def stop_nonblocking(self) -> PythonTask[None]:
         """
         Stop the proc mesh.
         """
         ...

-    def stop_blocking(self) -> None:
-        """
-        Stop the proc mesh. Blocks until the mesh is fully stopped.
-        """
-        ...
-
     def __repr__(self) -> str: ...

 @final
diff --git a/python/monarch/_rust_bindings/rdma/__init__.pyi b/python/monarch/_rust_bindings/rdma/__init__.pyi
index 45c53fc69..624128d0f 100644
--- a/python/monarch/_rust_bindings/rdma/__init__.pyi
+++ b/python/monarch/_rust_bindings/rdma/__init__.pyi
@@ -6,6 +6,8 @@

 from typing import Any, final, Optional

+from monarch._rust_bindings.monarch_hyperactor.mailbox import PythonTask
+
 @final
 class _RdmaMemoryRegionView:
     def __init__(self, addr: int, size_in_bytes: int) -> None: ...
@@ -22,47 +24,26 @@ class _RdmaBuffer:
     name: str

     @classmethod
-    def create_rdma_buffer_blocking(
-        cls, addr: int, size: int, proc_id: str, client: Any
-    ) -> _RdmaBuffer: ...
-    @classmethod
-    async def create_rdma_buffer_nonblocking(
+    def create_rdma_buffer_nonblocking(
         cls, addr: int, size: int, proc_id: str, client: Any
-    ) -> Any: ...
-    async def drop(self, client: Any): ...
-    def drop_blocking(self, client: Any): ...
-    async def read_into(
-        self,
-        addr: int,
-        size: int,
-        local_proc_id: str,
-        client: Any,
-        timeout: int,
-    ) -> Any: ...
-    def read_into_blocking(
-        self,
-        addr: int,
-        size: int,
-        local_proc_id: str,
-        client: Any,
-        timeout: int,
-    ) -> Any: ...
-    async def write_from(
+    ) -> PythonTask[Any]: ...
+    def drop(self, client: Any) -> PythonTask[None]: ...
+    def read_into(
         self,
         addr: int,
         size: int,
         local_proc_id: str,
         client: Any,
         timeout: int,
-    ) -> Any: ...
-    def write_from_blocking(
+    ) -> PythonTask[Any]: ...
+    def write_from(
         self,
         addr: int,
         size: int,
         local_proc_id: str,
         client: Any,
         timeout: int,
-    ) -> Any: ...
+    ) -> PythonTask[Any]: ...
     def __reduce__(self) -> tuple: ...
     def __repr__(self) -> str: ...
     @staticmethod
diff --git a/python/monarch/_src/actor/actor_mesh.py b/python/monarch/_src/actor/actor_mesh.py
index c3c3c4a1a..8ee030ca3 100644
--- a/python/monarch/_src/actor/actor_mesh.py
+++ b/python/monarch/_src/actor/actor_mesh.py
@@ -353,18 +353,7 @@ async def process() -> ValueMesh[R]:
             )
             return ValueMesh(call_shape, results)

-        def process_blocking() -> ValueMesh[R]:
-            results: List[R] = [None] * extent.nelements  # pyre-fixme[9]
-            for _ in range(extent.nelements):
-                rank, value = r.recv().get()
-                results[rank] = value
-            call_shape = Shape(
-                extent.labels,
-                NDSlice.new_row_major(extent.sizes),
-            )
-            return ValueMesh(call_shape, results)
-
-        return Future(process, process_blocking)
+        return Future(impl=process, requires_loop=False)

     async def stream(self, *args: P.args, **kwargs: P.kwargs) -> AsyncGenerator[R, R]:
         """
@@ -460,7 +449,7 @@ async def impl() -> A:
                 value = self._combine(value, x)
             return value

-        return Future(impl)
+        return Future(impl=impl)

 class ValueMesh(MeshTrait, Generic[R]):
@@ -673,7 +662,7 @@ def _process(self, msg: PythonMessage) -> R:
             raise ValueError(f"Unexpected message kind: {msg.kind}")

     def recv(self) -> "Future[R]":
-        return Future(lambda: self._recv(), None, requires_loop=False)
+        return Future(impl=lambda: self._recv(), requires_loop=False)

 class RankedPortReceiver(PortReceiver[Tuple[int, R]]):
diff --git a/python/monarch/_src/actor/allocator.py b/python/monarch/_src/actor/allocator.py
index 51a9fd9ac..81894bc32 100644
--- a/python/monarch/_src/actor/allocator.py
+++ b/python/monarch/_src/actor/allocator.py
@@ -42,10 +42,7 @@ def allocate(self, spec: AllocSpec) -> Future[Alloc]:
         Returns:
         - A future that will be fulfilled when the requested allocation is fulfilled.
         """
-        return Future(
-            lambda: self.allocate_nonblocking(spec),
-            lambda: self.allocate_blocking(spec),
-        )
+        return Future(impl=lambda: self.allocate_nonblocking(spec), requires_loop=False)

 @final
@@ -64,10 +61,7 @@ def allocate(self, spec: AllocSpec) -> Future[Alloc]:
         Returns:
         - A future that will be fulfilled when the requested allocation is fulfilled.
         """
-        return Future(
-            lambda: self.allocate_nonblocking(spec),
-            lambda: self.allocate_blocking(spec),
-        )
+        return Future(impl=lambda: self.allocate_nonblocking(spec), requires_loop=False)

 @final
@@ -86,10 +80,7 @@ def allocate(self, spec: AllocSpec) -> Future[Alloc]:
         Returns:
         - A future that will be fulfilled when the requested allocation is fulfilled.
         """
-        return Future(
-            lambda: self.allocate_nonblocking(spec),
-            lambda: self.allocate_blocking(spec),
-        )
+        return Future(impl=lambda: self.allocate_nonblocking(spec), requires_loop=False)

 class RemoteAllocInitializer(abc.ABC):
@@ -235,7 +226,4 @@ def allocate(self, spec: AllocSpec) -> Future[Alloc]:

         Returns:
         - A future that will be fulfilled when the requested allocation is fulfilled.
""" - return Future( - lambda: self.allocate_nonblocking(spec), - lambda: self.allocate_blocking(spec), - ) + return Future(impl=lambda: self.allocate_nonblocking(spec), requires_loop=False) diff --git a/python/monarch/_src/actor/future.py b/python/monarch/_src/actor/future.py index 5a1e9ec8c..767c541a8 100644 --- a/python/monarch/_src/actor/future.py +++ b/python/monarch/_src/actor/future.py @@ -12,14 +12,6 @@ R = TypeVar("R") -def _incomplete(impl, self): - try: - return self._set_result(impl()) - except Exception as e: - self._set_exception(e) - raise - - async def _aincomplete(impl, self): try: return self._set_result(await impl()) @@ -57,14 +49,13 @@ async def _aincomplete(impl, self): class Future(Generic[R]): - def __init__(self, impl, blocking_impl=None, requires_loop=True): - if blocking_impl is None: - blocking_impl = partial(asyncio.run, impl()) - self._get = partial(_incomplete, blocking_impl) + def __init__(self, *, impl, requires_loop=True): self._aget = partial(_aincomplete, impl) self._requires_loop = requires_loop def get(self, timeout: Optional[float] = None) -> R: + if asyncio._get_running_loop() is not None: + raise RuntimeError("get() cannot be called from within an async context") if timeout is not None: return asyncio.run(asyncio.wait_for(self._aget(self), timeout)) if not self._requires_loop: @@ -77,29 +68,23 @@ def get(self, timeout: Optional[float] = None) -> R: ) except StopIteration as e: return e.value - return self._get(self) + return asyncio.run(self._aget(self)) def __await__(self) -> Generator[R, None, R]: return self._aget(self).__await__() def _set_result(self, result): - def f(self): - return result - async def af(self): return result - self._get, self._aget = f, af + self._aget = af return result def _set_exception(self, e): - def f(self): - raise e - async def af(self): raise e - self._get, self._aget = f, af + self._aget = af # compatibility with old tensor engine Future objects # hopefully we do not need done(), add_callback because diff --git a/python/monarch/_src/actor/proc_mesh.py b/python/monarch/_src/actor/proc_mesh.py index 600930bc5..ee633db75 100644 --- a/python/monarch/_src/actor/proc_mesh.py +++ b/python/monarch/_src/actor/proc_mesh.py @@ -36,7 +36,13 @@ ProcMeshMonitor, ) from monarch._rust_bindings.monarch_hyperactor.shape import Shape, Slice -from monarch._src.actor.actor_mesh import _Actor, _ActorMeshRefImpl, Actor, ActorMeshRef +from monarch._src.actor.actor_mesh import ( + _Actor, + _ActorMeshRefImpl, + Actor, + ActorMeshRef, + fake_sync_state, +) from monarch._src.actor.allocator import LocalAllocator, ProcessAllocator, SimAllocator from monarch._src.actor.code_sync import ( CodeSyncMeshClient, @@ -85,10 +91,6 @@ async def _allocate_nonblocking(alloc: Alloc) -> "ProcMesh": return ProcMesh(await HyProcMesh.allocate_nonblocking(alloc)) -def _allocate_blocking(alloc: Alloc) -> "ProcMesh": - return ProcMesh(HyProcMesh.allocate_blocking(alloc)) - - class ProcMesh(MeshTrait): def __init__( self, @@ -108,13 +110,19 @@ def __init__( self._logging_mesh_client: Optional[LoggingMeshClient] = None self._maybe_device_mesh: Optional["DeviceMesh"] = _device_mesh self._stopped = False - if _mock_shape is None and HAS_TENSOR_ENGINE: - # type: ignore[21] - self._rdma_manager = self._spawn_blocking("rdma_manager", RDMAManager) - if not _is_initializing_debugger and _mock_shape is None: - self._debug_manager = self._spawn_blocking( - _DEBUG_MANAGER_ACTOR_NAME, DebugManager, debug_client() - ) + + # This code is unsafe in async contexts, but we currently 
do it all over the place + # we need to refactor this by moving it to the first time we try to spawn on the mesh. + # Right now we simply preserve the previous behavior and disable the check that prevents + # end users from doing the same. + with fake_sync_state(): + if _mock_shape is None and HAS_TENSOR_ENGINE: + # type: ignore[21] + self._rdma_manager = self.spawn("rdma_manager", RDMAManager).get() + if not _is_initializing_debugger and _mock_shape is None: + self._debug_manager = self.spawn( + _DEBUG_MANAGER_ACTOR_NAME, DebugManager, debug_client() + ).get() @property def _shape(self) -> Shape: @@ -140,8 +148,8 @@ def spawn(self, name: str, Class: Type[T], *args: Any, **kwargs: Any) -> Future[ if self._mock_shape is not None: raise NotImplementedError("NYI: spawn on slice of a proc mesh.") return Future( - lambda: self._spawn_nonblocking(name, Class, *args, **kwargs), - lambda: self._spawn_blocking(name, Class, *args, **kwargs), + impl=lambda: self._spawn_nonblocking(name, Class, *args, **kwargs), + requires_loop=False, ) async def monitor(self) -> ProcMeshMonitor: @@ -165,29 +173,10 @@ async def monitor_loop(monitor): @classmethod def from_alloc(self, alloc: Alloc) -> Future["ProcMesh"]: return Future( - lambda: _allocate_nonblocking(alloc), - lambda: _allocate_blocking(alloc), + impl=lambda: _allocate_nonblocking(alloc), + requires_loop=False, ) - def _spawn_blocking( - self, name: str, Class: Type[T], *args: Any, **kwargs: Any - ) -> T: - if not issubclass(Class, Actor): - raise ValueError( - f"{Class} must subclass monarch.service.Actor to spawn it." - ) - - actor_mesh = self._proc_mesh.spawn_blocking(name, _Actor) - service = ActorMeshRef( - Class, - _ActorMeshRefImpl.from_hyperactor_mesh(self._mailbox, actor_mesh, self), - self._mailbox, - ) - # useful to have this separate, because eventually we can reconstitute ActorMeshRef objects across pickling by - # doing `ActorMeshRef(Class, actor_handle)` but not calling _create. - service._create(args, kwargs) - return cast(T, service) - def __repr__(self) -> str: return repr(self._proc_mesh) @@ -201,7 +190,6 @@ async def _spawn_nonblocking( raise ValueError( f"{Class} must subclass monarch.service.Actor to spawn it." ) - actor_mesh = await self._proc_mesh.spawn_nonblocking(name, _Actor) service = ActorMeshRef( Class, @@ -248,7 +236,7 @@ async def sync_workspace(self, auto_reload: bool = False) -> None: proc_mesh=self._proc_mesh, ) # TODO(agallagher): Merge this into the `CodeSyncMeshClient` actor. 
-            self._auto_reload_actor = self._spawn_blocking(
+            self._auto_reload_actor = await self._spawn_nonblocking(
                 "auto_reload",
                 AutoReloadActor,
             )
@@ -298,13 +286,9 @@ async def _stop_nonblocking() -> None:
             await self._proc_mesh.stop_nonblocking()
             self._stopped = True

-        def _stop_blocking() -> None:
-            self._proc_mesh.stop_blocking()
-            self._stopped = True
-
         return Future(
-            lambda: _stop_nonblocking(),
-            lambda: _stop_blocking(),
+            impl=lambda: _stop_nonblocking(),
+            requires_loop=False,
         )

     async def __aexit__(
@@ -328,17 +312,6 @@ def __del__(self) -> None:

 async def local_proc_mesh_nonblocking(
-    *, gpus: Optional[int] = None, hosts: int = 1
-) -> ProcMesh:
-    if gpus is None:
-        gpus = _local_device_count()
-    spec = AllocSpec(AllocConstraints(), gpus=gpus, hosts=hosts)
-    allocator = LocalAllocator()
-    alloc = await allocator.allocate(spec)
-    return await ProcMesh.from_alloc(alloc)
-
-
-def local_proc_mesh_blocking(
     *,
     gpus: Optional[int] = None,
     hosts: int = 1,
@@ -348,17 +321,18 @@ def local_proc_mesh_blocking(
         gpus = _local_device_count()
     spec = AllocSpec(AllocConstraints(), gpus=gpus, hosts=hosts)
     allocator = LocalAllocator()
-    alloc = allocator.allocate(spec).get()
+    alloc = await allocator.allocate(spec)
+    proc_mesh = HyProcMesh.allocate_nonblocking(alloc)
     return ProcMesh(
-        HyProcMesh.allocate_blocking(alloc),
+        await proc_mesh,
         _is_initializing_debugger=_is_initializing_debugger,
     )

 def local_proc_mesh(*, gpus: Optional[int] = None, hosts: int = 1) -> Future[ProcMesh]:
     return Future(
-        lambda: local_proc_mesh_nonblocking(gpus=gpus, hosts=hosts),
-        lambda: local_proc_mesh_blocking(gpus=gpus, hosts=hosts),
+        impl=lambda: local_proc_mesh_nonblocking(gpus=gpus, hosts=hosts),
+        requires_loop=False,
     )

@@ -373,19 +347,10 @@ async def sim_proc_mesh_nonblocking(
     return await ProcMesh.from_alloc(alloc)

-def sim_proc_mesh_blocking(*, gpus: Optional[int] = None, hosts: int = 1) -> ProcMesh:
-    if gpus is None:
-        gpus = _local_device_count()
-    spec = AllocSpec(AllocConstraints(), gpus=gpus, hosts=hosts)
-    allocator = SimAllocator()
-    alloc = allocator.allocate(spec).get()
-    return ProcMesh.from_alloc(alloc).get()
-
-
 def sim_proc_mesh(*, gpus: Optional[int] = None, hosts: int = 1) -> Future[ProcMesh]:
     return Future(
-        lambda: sim_proc_mesh_nonblocking(gpus=gpus, hosts=hosts),
-        lambda: sim_proc_mesh_blocking(gpus=gpus, hosts=hosts),
+        impl=lambda: sim_proc_mesh_nonblocking(gpus=gpus, hosts=hosts),
+        requires_loop=False,
     )

@@ -424,29 +389,12 @@ async def proc_mesh_nonblocking(
     return await ProcMesh.from_alloc(alloc)

-def proc_mesh_blocking(
-    *, gpus: Optional[int] = None, hosts: int = 1, env: Optional[dict[str, str]] = None
-) -> ProcMesh:
-    if gpus is None:
-        gpus = _local_device_count()
-    # gpus must come last in this order
-    # because test_remote_function_all_gather expects that hosts comes before gpus
-    # in the order of the dimensions.
-    spec = AllocSpec(AllocConstraints(), hosts=hosts, gpus=gpus)
-    env = env or {}
-    cmd, args, base_env = _get_bootstrap_args()
-    env.update(base_env)
-    allocator = ProcessAllocator(cmd, args, env)
-    alloc = allocator.allocate(spec).get()
-    return ProcMesh.from_alloc(alloc).get()
-
-
 def proc_mesh(
     *, gpus: Optional[int] = None, hosts: int = 1, env: Optional[dict[str, str]] = None
 ) -> Future[ProcMesh]:
     return Future(
-        lambda: proc_mesh_nonblocking(gpus=gpus, hosts=hosts, env=env),
-        lambda: proc_mesh_blocking(gpus=gpus, hosts=hosts, env=env),
+        impl=lambda: proc_mesh_nonblocking(gpus=gpus, hosts=hosts, env=env),
+        requires_loop=False,
     )

@@ -460,9 +408,12 @@ def proc_mesh(
 def _get_debug_proc_mesh() -> "ProcMesh":
     global _debug_proc_mesh
     if _debug_proc_mesh is None:
-        _debug_proc_mesh = local_proc_mesh_blocking(
-            gpus=1, hosts=1, _is_initializing_debugger=True
-        )
+        _debug_proc_mesh = Future(
+            impl=lambda: local_proc_mesh_nonblocking(
+                gpus=1, hosts=1, _is_initializing_debugger=True
+            ),
+            requires_loop=False,
+        ).get()
     return _debug_proc_mesh

diff --git a/python/monarch/_src/tensor_engine/rdma.py b/python/monarch/_src/tensor_engine/rdma.py
index d8f99478d..11f504c63 100644
--- a/python/monarch/_src/tensor_engine/rdma.py
+++ b/python/monarch/_src/tensor_engine/rdma.py
@@ -73,18 +73,13 @@ def __init__(self, data: torch.Tensor) -> None:
         size = storage.element_size() * data.numel()
         ctx = MonarchContext.get()
         f = Future(
-            lambda: _RdmaBuffer.create_rdma_buffer_nonblocking(
-                addr=addr,
-                size=size,
-                proc_id=ctx.proc_id,
-                client=ctx.mailbox,
-            ),
-            lambda: _RdmaBuffer.create_rdma_buffer_blocking(
+            impl=lambda: _RdmaBuffer.create_rdma_buffer_nonblocking(
                 addr=addr,
                 size=size,
                 proc_id=ctx.proc_id,
                 client=ctx.mailbox,
             ),
+            requires_loop=False,
         )
         self._buffer: _RdmaBuffer = f.get()
         # TODO - specific exception
@@ -137,20 +132,7 @@ async def read_into_nonblocking() -> Optional[int]:
                 dst_gpu.copy_(dst)
             return res

-        def read_into_blocking() -> Optional[int]:
-            res = self._buffer.read_into_blocking(
-                addr=addr,
-                size=size,
-                local_proc_id=MonarchContext.get().proc_id,
-                client=MonarchContext.get().mailbox,
-                timeout=timeout,
-            )
-            # TODO - remove this once GPU support is added.
-            if dst_gpu is not None:
-                dst_gpu.copy_(dst)
-            return res
-
-        return Future(read_into_nonblocking, read_into_blocking)
+        return Future(impl=read_into_nonblocking, requires_loop=False)

     def write_from(
         self, src: torch.Tensor, offset: int = 0, timeout: int = 3
@@ -194,17 +176,4 @@ async def write_from_nonblocking() -> None:
                 src_gpu.copy_(src)
             return res

-        def write_from_blocking() -> None:
-            res = self._buffer.write_from_blocking(
-                addr=addr,
-                size=size,
-                local_proc_id=MonarchContext.get().proc_id,
-                client=MonarchContext.get().mailbox,
-                timeout=timeout,
-            )
-            # TODO - remove this once GPU support is added.
-            if src_gpu is not None:
-                src_gpu.copy_(src)
-            return res
-
-        return Future(write_from_nonblocking, write_from_blocking)
+        return Future(impl=write_from_nonblocking, requires_loop=False)
diff --git a/python/tests/test_allocator.py b/python/tests/test_allocator.py
index f363f1a03..e3b8650c3 100644
--- a/python/tests/test_allocator.py
+++ b/python/tests/test_allocator.py
@@ -204,7 +204,7 @@ async def test_allocate_2d_mesh(self) -> None:

         self.assert_computed_world_size(values, world_size)

-    async def test_stop_proc_mesh_blocking(self) -> None:
+    def test_stop_proc_mesh_blocking(self) -> None:
         spec = AllocSpec(AllocConstraints(), host=2, gpu=4)
         with remote_process_allocator() as host1, remote_process_allocator() as host2:
             allocator = RemoteAllocator(
@@ -212,8 +212,8 @@ async def test_stop_proc_mesh_blocking(self) -> None:
                 initializer=StaticRemoteAllocInitializer(host1, host2),
                 heartbeat_interval=_100_MILLISECONDS,
             )
-            alloc = await allocator.allocate(spec)
-            proc_mesh = await ProcMesh.from_alloc(alloc)
+            alloc = allocator.allocate(spec).get()
+            proc_mesh = ProcMesh.from_alloc(alloc).get()
             actor = proc_mesh.spawn("test_actor", TestActor).get()
             proc_mesh.stop().get()
             with self.assertRaises(
diff --git a/python/tests/test_python_actors.py b/python/tests/test_python_actors.py
index dbda9c463..57513d46b 100644
--- a/python/tests/test_python_actors.py
+++ b/python/tests/test_python_actors.py
@@ -519,7 +519,7 @@ async def awaitit(f):
     return await f

-def test_actor_future():
+def test_actor_future() -> None:
     v = 0

     async def incr():
@@ -529,32 +529,31 @@ async def incr():

     # can use async implementation from sync
     # if no non-blocking is provided
-    f = Future(incr)
+    f = Future(impl=incr, requires_loop=False)
     assert f.get() == 1
     assert v == 1
     assert f.get() == 1
     assert asyncio.run(awaitit(f)) == 1

-    f = Future(incr)
+    f = Future(impl=incr, requires_loop=False)
     assert asyncio.run(awaitit(f)) == 2
     assert f.get() == 2

-    def incr2():
+    async def incr2():
         nonlocal v
         v += 2
         return v

     # Use non-blocking optimization if provided
-    f = Future(incr, incr2)
+    f = Future(impl=incr2)
     assert f.get() == 4
-    assert asyncio.run(awaitit(f)) == 4

     async def nope():
         nonlocal v
         v += 1
         raise ValueError("nope")

-    f = Future(nope)
+    f = Future(impl=nope, requires_loop=False)

     with pytest.raises(ValueError):
         f.get()
@@ -571,12 +570,12 @@ async def nope():

     assert v == 5

-    def nope2():
+    async def nope2():
         nonlocal v
         v += 1
         raise ValueError("nope")

-    f = Future(incr, nope2)
+    f = Future(impl=nope2)

     with pytest.raises(ValueError):
         f.get()
@@ -598,7 +597,7 @@ def nope2():
     async def seven():
         return 7

-    f = Future(seven)
+    f = Future(impl=seven, requires_loop=False)

     assert 7 == f.get(timeout=0.001)

@@ -606,7 +605,7 @@ async def neverfinish():
         f = asyncio.Future()
         await f

-    f = Future(neverfinish)
+    f = Future(impl=neverfinish, requires_loop=True)

     with pytest.raises(asyncio.exceptions.TimeoutError):
         f.get(timeout=0.1)

From 6988f5009763f6efda40c67dedbb3a76bcf1f42d Mon Sep 17 00:00:00 2001
From: zdevito
Date: Fri, 18 Jul 2025 19:33:46 -0700
Subject: [PATCH 2/2] Update on "Define blocking variants of everything in terms of non-blocking variant."

This uses the PyPythonTask functionality developed in the last diff to
delete all the `_blocking` variants of the bindings that expose tokio
async work to Python.

The new approach is to define everything as async and then wrap it in a
future: `Future(async_impl, requires_loop=False)`.
If `async_impl` only awaits (1) PythonTask objects and (2) other
coroutine functions, then it is correct to pass `requires_loop=False`,
which keeps execution speed the same as the old blocking version.
Otherwise, pass `requires_loop=True` and the code will work correctly in
both async and sync contexts.

Differential Revision: [D78585722](https://our.internmc.facebook.com/intern/diff/D78585722/)

[ghstack-poisoned]
---
 python/tests/test_allocator.py | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git a/python/tests/test_allocator.py b/python/tests/test_allocator.py
index e3b8650c3..a92af9592 100644
--- a/python/tests/test_allocator.py
+++ b/python/tests/test_allocator.py
@@ -31,6 +31,9 @@
     ChannelAddr,
     ChannelTransport,
 )
+
+from monarch._src.actor.actor_mesh import fake_sync_state
+
 from monarch._src.actor.allocator import (
     ALLOC_LABEL_PROC_MESH_NAME,
     RemoteAllocator,
@@ -204,7 +207,7 @@ async def test_allocate_2d_mesh(self) -> None:

         self.assert_computed_world_size(values, world_size)

-    def test_stop_proc_mesh_blocking(self) -> None:
+    async def test_stop_proc_mesh_blocking(self) -> None:
         spec = AllocSpec(AllocConstraints(), host=2, gpu=4)
         with remote_process_allocator() as host1, remote_process_allocator() as host2:
             allocator = RemoteAllocator(
@@ -212,10 +215,14 @@ def test_stop_proc_mesh_blocking(self) -> None:
                 initializer=StaticRemoteAllocInitializer(host1, host2),
                 heartbeat_interval=_100_MILLISECONDS,
             )
-            alloc = allocator.allocate(spec).get()
-            proc_mesh = ProcMesh.from_alloc(alloc).get()
-            actor = proc_mesh.spawn("test_actor", TestActor).get()
-            proc_mesh.stop().get()
+
+            alloc = await allocator.allocate(spec)
+            proc_mesh = await ProcMesh.from_alloc(alloc)
+            # XXX - it is not clear why this is trying to use
+            # async code in a sync context.
+            with fake_sync_state():
+                actor = proc_mesh.spawn("test_actor", TestActor).get()
+                proc_mesh.stop().get()
             with self.assertRaises(
                 RuntimeError, msg="`ProcMesh` has already been stopped"
            ):
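
---

A minimal standalone sketch of the calling convention this stack settles on, using only the `Future` API from `python/monarch/_src/actor/future.py` as changed above (the `load_weights` coroutine is a hypothetical stand-in for an `async_impl` that only awaits PythonTask objects or other coroutines):

```python
import asyncio

from monarch._src.actor.future import Future


async def load_weights() -> int:
    # Hypothetical async_impl: it awaits nothing that needs a running
    # event loop, so requires_loop=False is the correct (fast) choice.
    return 42


# Sync context: get() drives the coroutine to completion directly,
# without creating an asyncio event loop.
assert Future(impl=load_weights, requires_loop=False).get() == 42


# Async context: the same Future is awaitable as-is.
async def main() -> None:
    assert await Future(impl=load_weights, requires_loop=False) == 42


asyncio.run(main())
```

With `requires_loop=False`, `Future.get()` steps the coroutine itself rather than spinning up an event loop via `asyncio.run`, which is what keeps the sync path as fast as the deleted `_blocking` bindings; with `requires_loop=True` it falls back to `asyncio.run(...)`, and calling `get()` from inside a running loop raises a `RuntimeError` in either case.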