Skip to content

Use Python spawned threads for worker event loop creation #626

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion monarch_hyperactor/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,9 @@ impl PythonActor {
self.task_locals.as_ref().unwrap_or_else(|| {
// Use shared TaskLocals
static SHARED_TASK_LOCALS: OnceLock<pyo3_async_runtimes::TaskLocals> = OnceLock::new();
Python::allow_threads(py, || SHARED_TASK_LOCALS.get_or_init(create_task_locals))
Python::allow_threads(py, || {
SHARED_TASK_LOCALS.get_or_init(create_shared_task_locals)
})
})
}
}
Expand Down Expand Up @@ -491,7 +493,24 @@ impl Actor for PythonActor {
}
}

/// Build the [`pyo3_async_runtimes::TaskLocals`] used by the shared asyncio runtime.
///
/// Imports the Python module `monarch._src.actor.event_loop`, calls its
/// `get_event_loop()` function, and wraps the returned event loop in a
/// `TaskLocals` with the Python context copied via `copy_context`.
///
/// The GIL is acquired internally through `Python::with_gil`.
///
/// # Panics
///
/// Panics if the module cannot be imported, if `get_event_loop()` is missing
/// or raises, or if copying the context fails.
fn create_shared_task_locals() -> pyo3_async_runtimes::TaskLocals {
    Python::with_gil(|py| {
        let event_loop_module = py
            .import("monarch._src.actor.event_loop")
            .expect("failed to import Python module monarch._src.actor.event_loop");

        // `call_method0` fails both when the attribute does not exist and when
        // the Python function itself raises, so the message covers both cases.
        let event_loop = event_loop_module
            .call_method0("get_event_loop")
            .expect("monarch._src.actor.event_loop.get_event_loop() is missing or raised");

        pyo3_async_runtimes::TaskLocals::new(event_loop)
            .copy_context(py)
            .expect("failed to copy the Python context into TaskLocals")
    })
}

/// Create a new TaskLocals with its own asyncio event loop in a dedicated thread.
/// This is used for per-actor runtimes.
fn create_task_locals() -> pyo3_async_runtimes::TaskLocals {
let (tx, rx) = std::sync::mpsc::channel();
let _ = std::thread::spawn(move || {
Expand Down
3 changes: 2 additions & 1 deletion monarch_hyperactor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@ use hyperactor::attrs::declare_attrs;
// Declare monarch-specific configuration keys
declare_attrs! {
/// Use a single asyncio runtime for all Python actors, rather than one per actor
pub attr SHARED_ASYNCIO_RUNTIME: bool = false;
/// Note: prefer the shared runtime when there are many Python actors; otherwise too many threads can be spawned
pub attr SHARED_ASYNCIO_RUNTIME: bool = true;
}
97 changes: 97 additions & 0 deletions python/monarch/_src/actor/event_loop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""
Module for managing the event loop used by Monarch Python actors.
This provides a way to create a Python-aware thread from Rust that runs the worker event loop.
"""

import asyncio
import logging
import threading
from typing import Optional

from libfb.py.pyre import none_throws

logger = logging.getLogger(__name__)

_event_loop: Optional[asyncio.AbstractEventLoop] = None
_lock = threading.Lock()
_ready = threading.Event()


def _initialize_event_loop() -> None:
    """
    Start the background thread that owns the worker event loop.

    Does nothing if ``_event_loop`` is already set. Otherwise it spawns a
    daemon thread named "asyncio-event-loop" that creates a new asyncio event
    loop, publishes it in ``_event_loop`` and runs it forever. This function
    then blocks until that thread signals ``_ready``.

    ``get_event_loop`` calls this while holding ``_lock``; the function itself
    takes no lock.

    Raises:
        RuntimeError: if the thread signalled readiness without publishing a
            loop (i.e. loop creation failed inside the thread).
    """
    if _event_loop is not None:
        return

    # A previous failed attempt leaves `_ready` set. Clear it so the wait below
    # really waits for the thread started by *this* call instead of returning
    # immediately and reporting a spurious failure.
    _ready.clear()

    def event_loop_thread() -> None:
        """Target function for the event loop thread."""
        global _event_loop
        try:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

            # Publish the loop before signalling, so the waiter sees it.
            _event_loop = loop
            _ready.set()

            logger.debug(
                "Python worker event loop thread started: %s",
                threading.current_thread().name,
            )
            try:
                loop.run_forever()
            finally:
                # Reached once the loop is stopped (or run_forever raised).
                logger.debug("Python worker event loop stopped, closing...")
                loop.close()
        except Exception as e:
            logger.error("Error in event loop thread: %s", e)
            # Wake the initializer even on failure so it does not wait forever;
            # it detects the failure through `_event_loop` still being None.
            _ready.set()
            raise

    threading.Thread(
        target=event_loop_thread,
        name="asyncio-event-loop",
        daemon=True,  # Daemon thread so it doesn't block program exit
    ).start()

    _ready.wait()  # Block until the thread reports success or failure

    if _event_loop is None:
        raise RuntimeError("Failed to initialize event loop")


def get_event_loop() -> asyncio.AbstractEventLoop:
    """
    Return the Python worker event loop, starting it on first use.

    When ``_event_loop`` is already set it is returned directly. Otherwise
    ``_initialize_event_loop`` is run under ``_lock`` and the resulting loop is
    returned.

    Expected to be called from rust code.
    """
    # Fast path: the loop already exists, no locking needed.
    if _event_loop is not None:
        return none_throws(_event_loop)

    # Slow path: serialize initialization; the initializer re-checks
    # `_event_loop` itself, so a caller that lost the race is a no-op.
    with _lock:
        _initialize_event_loop()
    return none_throws(_event_loop)


def stop_event_loop() -> None:
    """
    Ask the worker event loop to stop.

    Schedules ``loop.stop()`` on the loop through ``call_soon_threadsafe`` and
    returns without waiting for the loop to actually stop. Does nothing when no
    loop has been created or when the loop has already been closed, so
    repeated calls are safe.

    The module-level ``_event_loop`` reference is left untouched.
    """
    # Snapshot the global once so the check and the use see the same object.
    loop = _event_loop
    if loop is None:
        return

    if loop.is_closed():
        # `call_soon_threadsafe` raises RuntimeError on a closed loop; a loop
        # that has already been stopped and closed needs no further action.
        logger.debug("Event loop already closed, nothing to stop")
        return

    logger.debug("Stopping event loop...")
    loop.call_soon_threadsafe(loop.stop)
Loading