Skip to content

(6/final) Updates rdma.py with tensor_engine RDMA #582

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions monarch_rdma/extension/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,26 @@ impl PyRdmaBuffer {
))
}

#[classmethod]
fn create_rdma_buffer_blocking<'py>(
_cls: &Bound<'_, PyType>,
py: Python<'py>,
addr: usize,
size: usize,
proc_id: String,
client: PyMailbox,
) -> PyResult<PyRdmaBuffer> {
if !ibverbs_supported() {
return Err(PyException::new_err(
"ibverbs is not supported on this system",
));
}
signal_safe_block_on(
py,
create_rdma_buffer(addr, size, proc_id.parse().unwrap(), client),
)?
}

#[classmethod]
fn rdma_supported<'py>(_cls: &Bound<'_, PyType>, _py: Python<'py>) -> bool {
ibverbs_supported()
Expand Down
4 changes: 4 additions & 0 deletions python/monarch/_rust_bindings/rdma/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ async def create_rdma_manager_nonblocking(proc_mesh: Any) -> Optional[_RdmaManag
class _RdmaBuffer:
name: str

@classmethod
def create_rdma_buffer_blocking(
cls, addr: int, size: int, proc_id: str, client: Any
) -> _RdmaBuffer: ...
@classmethod
def create_rdma_buffer_nonblocking(
cls, addr: int, size: int, proc_id: str, client: Any
Expand Down
15 changes: 8 additions & 7 deletions python/monarch/_src/actor/proc_mesh.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

# pyre-strict

import logging
import os
import sys
import warnings
Expand All @@ -24,7 +25,6 @@
)

from monarch._rust_bindings.monarch_extension.logging import LoggingMeshClient

from monarch._rust_bindings.monarch_hyperactor.alloc import ( # @manual=//monarch/monarch_extension:monarch_extension
Alloc,
AllocConstraints,
Expand Down Expand Up @@ -63,13 +63,14 @@

HAS_TENSOR_ENGINE = False
try:
# TODO: while the tensor_engine submodule doesn't exist yet, use the
# available of monarch.rdma as a proxy.
# type: ignore
from monarch.rdma import RDMAManager # @manual
from monarch._rust_bindings.rdma import ( # type: ignore[import]
_RdmaManager,
create_rdma_manager_blocking,
)

HAS_TENSOR_ENGINE = True
except ImportError:
logging.warning("RDMA is not available on this platform")
pass


Expand Down Expand Up @@ -102,7 +103,7 @@ def __init__(
self._proc_mesh = hy_proc_mesh
self._mock_shape: Optional[Shape] = _mock_shape
# type: ignore[21]
self._rdma_manager: Optional["RDMAManager"] = None
self._rdma_manager: Optional["_RdmaManager"] = None
self._debug_manager: Optional[DebugManager] = None
self._mailbox: Mailbox = self._proc_mesh.client
self._code_sync_client: Optional[CodeSyncMeshClient] = None
Expand All @@ -117,7 +118,7 @@ def __init__(
with fake_sync_state():
if _mock_shape is None and HAS_TENSOR_ENGINE:
# type: ignore[21]
self._rdma_manager = self.spawn("rdma_manager", RDMAManager).get()
self._rdma_manager = create_rdma_manager_blocking(self._proc_mesh)
if not _is_initializing_debugger and _mock_shape is None:
self._debug_manager = self.spawn(
_DEBUG_MANAGER_ACTOR_NAME, DebugManager, debug_client()
Expand Down
29 changes: 15 additions & 14 deletions python/monarch/_src/tensor_engine/rdma.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@
from typing import Optional

import torch
from monarch._rust_bindings.rdma import _RdmaBuffer

try:
from monarch._rust_bindings.rdma import _RdmaBuffer
except ImportError as e:
logging.error("RDMA is not available: {}".format(e))
raise e
from monarch._src.actor.actor_mesh import MonarchContext
from monarch._src.actor.future import Future

from monarch.actor import MonarchContext


# RDMARead/WriteTransferWarnings are warnings that are only printed once per process.
# Remove these once GPU support is added.
Expand All @@ -30,7 +33,7 @@ class RDMAWriteTransferWarning(Warning):
warnings.simplefilter("once", RDMAWriteTransferWarning)


def rdma_supported():
def is_available():
return _RdmaBuffer.rdma_supported()


Expand All @@ -52,7 +55,9 @@ def __init__(self, data: torch.Tensor) -> None:

TODO: Create TensorBuffer, which will be main user API supporting non-contiguous , multi-byte-per-elment tensors
"""
assert _RdmaBuffer.rdma_supported()
assert (
is_available()
), "Tried to create an RDMABuffer, but RDMA is not available on this platform."

if data.device.type != "cpu":
# TODO - CUDA support for RDMABuffer exists at the Rust layer, but
Expand All @@ -72,16 +77,12 @@ def __init__(self, data: torch.Tensor) -> None:
addr: int = storage.data_ptr()
size = storage.element_size() * data.numel()
ctx = MonarchContext.get()
f = Future(
impl=lambda: _RdmaBuffer.create_rdma_buffer_nonblocking(
addr=addr,
size=size,
proc_id=ctx.proc_id,
client=ctx.mailbox,
),
requires_loop=False,
self._buffer: _RdmaBuffer = _RdmaBuffer.create_rdma_buffer_blocking(
addr=addr,
size=size,
proc_id=ctx.proc_id,
client=ctx.mailbox,
)
self._buffer: _RdmaBuffer = f.get()
# TODO - specific exception
except Exception as e:
logging.error("Failed to create buffer %s", e)
Expand Down
161 changes: 0 additions & 161 deletions python/monarch/rdma.py

This file was deleted.

23 changes: 23 additions & 0 deletions python/monarch/tensor_engine/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""
Monarch Tensor Engine API - Public interface for tensor engine functionality.
"""

from monarch._src.tensor_engine.rdma import (
is_available,
RDMABuffer,
RDMAReadTransferWarning,
RDMAWriteTransferWarning,
)

__all__ = [
"is_available",
"RDMABuffer",
"RDMAReadTransferWarning",
"RDMAWriteTransferWarning",
]
Loading
Loading