From 9a0ff9151295b1f0536255e84a8ac44eb14206ea Mon Sep 17 00:00:00 2001 From: Benji Pelletier Date: Thu, 24 Jul 2025 14:10:02 -0700 Subject: [PATCH] Use Python spawned threads for worker event loop creation (#626) Summary: Seeing the following error when running paft at large scale: ``` Fatal Python error: PyGILState_Release: thread state 0x7f7271468000 must be current when releasing ``` Seems to be an issue with interpreter shutdown due to the main thread (actor_paft.py) ending before messages are done being handled in background actors. The interpreter begins shutdown because it is not aware of our raw rust thread created to host py actor event loops. * Make the thread needed to handle py actors a Python Thread * Create the asyncio runtime for py actors in python as well * Grab the asyncio runtime to use for task locals through a global Reviewed By: dulinriley Differential Revision: D78745022 --- monarch_hyperactor/src/actor.rs | 21 +++++- monarch_hyperactor/src/config.rs | 3 +- python/monarch/_src/actor/event_loop.py | 97 +++++++++++++++++++++++++ 3 files changed, 119 insertions(+), 2 deletions(-) create mode 100644 python/monarch/_src/actor/event_loop.py diff --git a/monarch_hyperactor/src/actor.rs b/monarch_hyperactor/src/actor.rs index d8cd381d4..f329510db 100644 --- a/monarch_hyperactor/src/actor.rs +++ b/monarch_hyperactor/src/actor.rs @@ -445,7 +445,9 @@ impl PythonActor { self.task_locals.as_ref().unwrap_or_else(|| { // Use shared TaskLocals static SHARED_TASK_LOCALS: OnceLock = OnceLock::new(); - Python::allow_threads(py, || SHARED_TASK_LOCALS.get_or_init(create_task_locals)) + Python::allow_threads(py, || { + SHARED_TASK_LOCALS.get_or_init(create_shared_task_locals) + }) }) } } @@ -491,7 +493,24 @@ impl Actor for PythonActor { } } +/// Get TaskLocals that use the worker event loop created in bootstrap_main.py. +/// This is used for the shared runtime. +fn create_shared_task_locals() -> pyo3_async_runtimes::TaskLocals { + Python::with_gil(|py| { + let bootstrap_module = py.import("monarch._src.actor.event_loop").unwrap(); + + let event_loop = bootstrap_module + .call_method0("get_event_loop") + .expect("Could not find get_event_loop in event_loop.py"); + + pyo3_async_runtimes::TaskLocals::new(event_loop) + .copy_context(py) + .unwrap() + }) +} + /// Create a new TaskLocals with its own asyncio event loop in a dedicated thread. +/// This is used for per-actor runtimes. fn create_task_locals() -> pyo3_async_runtimes::TaskLocals { let (tx, rx) = std::sync::mpsc::channel(); let _ = std::thread::spawn(move || { diff --git a/monarch_hyperactor/src/config.rs b/monarch_hyperactor/src/config.rs index 3cc48d743..497b8f562 100644 --- a/monarch_hyperactor/src/config.rs +++ b/monarch_hyperactor/src/config.rs @@ -16,5 +16,6 @@ use hyperactor::attrs::declare_attrs; // Declare monarch-specific configuration keys declare_attrs! { /// Use a single asyncio runtime for all Python actors, rather than one per actor - pub attr SHARED_ASYNCIO_RUNTIME: bool = false; + /// Note: use shared runtime if you have a lot of Python actors, otherwise too many threads can be spawned + pub attr SHARED_ASYNCIO_RUNTIME: bool = true; } diff --git a/python/monarch/_src/actor/event_loop.py b/python/monarch/_src/actor/event_loop.py new file mode 100644 index 000000000..e09b95231 --- /dev/null +++ b/python/monarch/_src/actor/event_loop.py @@ -0,0 +1,97 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. + +""" +Module for managing the event loop used by Monarch Python actors. +This provides a way to create a Python-aware thread from Rust that runs the worker event loop. +""" + +import asyncio +import logging +import threading +from typing import Optional + +from libfb.py.pyre import none_throws + +logger = logging.getLogger(__name__) + +_event_loop: Optional[asyncio.AbstractEventLoop] = None +_lock = threading.Lock() +_ready = threading.Event() + + +def _initialize_event_loop() -> None: + """ + Internal function to initialize the event loop. + This creates a new thread with an event loop that runs forever. + """ + global _event_loop, _ready + if _event_loop is not None: + return + + # Create a new thread that will host our event loop + def event_loop_thread(): + """Target function for the event loop thread.""" + global _event_loop, _ready + try: + # Create a new event loop + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + _event_loop = loop + _ready.set() + + logger.debug( + f"Python worker event loop thread started: {threading.current_thread().name}" + ) + try: + # Run the event loop forever + loop.run_forever() + finally: + # Clean up when the loop stops + logger.debug("Python worker event loop stopped, closing...") + loop.close() + except Exception as e: + logger.error(f"Error in event loop thread: {e}") + _ready.set() + raise + + # Create and start the thread + threading.Thread( + target=event_loop_thread, + name="asyncio-event-loop", + daemon=True, # Make it a daemon thread so it doesn't block program exit + ).start() + + _ready.wait() # Wait for the event loop to be ready + + if _event_loop is None: + raise RuntimeError("Failed to initialize event loop") + + +def get_event_loop() -> asyncio.AbstractEventLoop: + """ + Get the Python worker event loop. + If no event loop is currently running, this will start a new one. + + Expected to be called from rust code. + """ + global _event_loop + if _event_loop is None: + with _lock: + _initialize_event_loop() + return none_throws(_event_loop) + + +def stop_event_loop() -> None: + """ + Stop the event loop gracefully. + """ + global _event_loop + if _event_loop is not None: + logger.debug("Stopping event loop...") + event_loop = none_throws(_event_loop) + event_loop.call_soon_threadsafe(event_loop.stop)