From 836ed81ecde97f499cb48fcadbc8fff9b2ffd592 Mon Sep 17 00:00:00 2001 From: "Han, Chao1" Date: Wed, 4 Jun 2025 22:13:39 +0800 Subject: [PATCH 1/3] add options for xccl work --- torch/csrc/distributed/c10d/init.cpp | 21 +++++++++++++++++++-- torch/distributed/distributed_c10d.py | 12 +++++++++++- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/torch/csrc/distributed/c10d/init.cpp b/torch/csrc/distributed/c10d/init.cpp index 6f5ae85ca9a6..fe3bc69817b0 100644 --- a/torch/csrc/distributed/c10d/init.cpp +++ b/torch/csrc/distributed/c10d/init.cpp @@ -3372,7 +3372,9 @@ Example:: .def( py::init([](const c10::intrusive_ptr<::c10d::Store>& store, int rank, - int size) { + int size, + c10::intrusive_ptr<::c10d::ProcessGroupXCCL::Options> + options) { // gil_scoped_release is not safe as a call_guard in init. // https://github.com/pybind/pybind11/issues/5473 py::gil_scoped_release nogil{}; @@ -3382,7 +3384,22 @@ Example:: }), py::arg("store"), py::arg("rank"), - py::arg("size")); + py::arg("size"), + py::arg("options"), + R"(Create a new ProcessGroupXCCL instance.)"); + intrusive_ptr_class_<::c10d::ProcessGroupXCCL::Options>( + processGroupXCCL, + "Options", + backendOptions) + .def(py::init<bool>(), py::arg("is_high_priority_stream") = false) + .def_readwrite( + "is_high_priority_stream", + &::c10d::ProcessGroupXCCL::Options::is_high_priority_stream) + .def_readwrite( + "global_ranks_in_group", + &::c10d::ProcessGroupXCCL::Options::global_ranks_in_group) + .def_readwrite( + "group_name", &::c10d::ProcessGroupXCCL::Options::group_name); #endif #ifdef USE_C10D_UCC diff --git a/torch/distributed/distributed_c10d.py b/torch/distributed/distributed_c10d.py index 866658515a74..18ac8344600e 100644 --- a/torch/distributed/distributed_c10d.py +++ b/torch/distributed/distributed_c10d.py @@ -2033,8 +2033,18 @@ def _new_process_group_helper( elif backend_str == Backend.XCCL: if not is_xccl_available(): raise RuntimeError("Distributed package doesn't have XCCL built in") + if 
backend_options is not None: + assert isinstance(backend_options, ProcessGroupXCCL.Options), ( + "Expected backend_options argument to be of type ProcessGroupXCCL.Options" + ) + else: + # default backend_options for XCCL + backend_options = ProcessGroupXCCL.Options() + backend_options.is_high_priority_stream = False + backend_options.global_ranks_in_group = global_ranks_in_group + backend_options.group_name = group_name backend_class = ProcessGroupXCCL( - backend_prefix_store, group_rank, group_size + backend_prefix_store, group_rank, group_size, backend_options ) backend_type = ProcessGroup.BackendType.XCCL else: From 70764ffe33a98425481f6d3f55ff9b67643d2231 Mon Sep 17 00:00:00 2001 From: "Han, Chao1" Date: Mon, 7 Jul 2025 16:31:19 +0800 Subject: [PATCH 2/3] add comm split support --- torch/csrc/distributed/c10d/init.cpp | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/torch/csrc/distributed/c10d/init.cpp b/torch/csrc/distributed/c10d/init.cpp index fe3bc69817b0..d1f6a98b37c9 100644 --- a/torch/csrc/distributed/c10d/init.cpp +++ b/torch/csrc/distributed/c10d/init.cpp @@ -3386,15 +3386,35 @@ Example:: py::arg("rank"), py::arg("size"), py::arg("options"), - R"(Create a new ProcessGroupXCCL instance.)"); + R"(Create a new ProcessGroupXCCL instance.)") + .def( + "comm_split_count", + &::c10d::ProcessGroupXCCL::getCommSplitCounter) + .def_property( + "bound_device_id", + &::c10d::ProcessGroupXCCL::getBoundDeviceId, + &::c10d::ProcessGroupXCCL::setBoundDeviceId, + R"(Return the bound device id.)") + .def( + "perform_nocolor_split", + &::c10d::ProcessGroupXCCL::performNocolorSplit) + .def( + "_is_initialized", + &::c10d::ProcessGroupXCCL::isInitialized, + py::call_guard<py::gil_scoped_release>()); intrusive_ptr_class_<::c10d::ProcessGroupXCCL::Options>( processGroupXCCL, "Options", backendOptions) .def(py::init<bool>(), py::arg("is_high_priority_stream") = false) + .def_readwrite("config", &::c10d::ProcessGroupXCCL::Options::config) .def_readwrite( 
"is_high_priority_stream", &::c10d::ProcessGroupXCCL::Options::is_high_priority_stream) + .def_readwrite( + "split_from", &::c10d::ProcessGroupXCCL::Options::split_from) + .def_readwrite( + "split_color", &::c10d::ProcessGroupXCCL::Options::split_color) .def_readwrite( "global_ranks_in_group", &::c10d::ProcessGroupXCCL::Options::global_ranks_in_group) From aba09612bd9fb1f41d65892bf240b7f0d45bb32e Mon Sep 17 00:00:00 2001 From: "Han, Chao1" Date: Wed, 16 Jul 2025 15:45:10 +0800 Subject: [PATCH 3/3] update --- torch/csrc/distributed/c10d/init.cpp | 33 +++++++++++++++++++++------ torch/distributed/distributed_c10d.py | 15 ++++++++++-- 2 files changed, 39 insertions(+), 9 deletions(-) diff --git a/torch/csrc/distributed/c10d/init.cpp b/torch/csrc/distributed/c10d/init.cpp index d1f6a98b37c9..d6fb2bd047a0 100644 --- a/torch/csrc/distributed/c10d/init.cpp +++ b/torch/csrc/distributed/c10d/init.cpp @@ -3380,17 +3380,38 @@ Example:: py::gil_scoped_release nogil{}; return c10::make_intrusive<::c10d::ProcessGroupXCCL>( - store, rank, size); + store, rank, size, std::move(options)); }), py::arg("store"), py::arg("rank"), py::arg("size"), py::arg("options"), R"(Create a new ProcessGroupXCCL instance.)") - .def( + .def( + py::init([](const c10::intrusive_ptr<::c10d::Store>& store, + int rank, + int size) { + // gil_scoped_release is not safe as a call_guard in init. 
+ // https://github.com/pybind/pybind11/issues/5473 + py::gil_scoped_release nogil{}; + + auto options = ::c10d::ProcessGroupXCCL::Options::create(); + options->is_high_priority_stream = false; + return c10::make_intrusive<::c10d::ProcessGroupXCCL>( + store, rank, size, options); + }), + py::arg("store"), + py::arg("rank"), + py::arg("size"), + R"(Create a new ProcessGroupXCCL instance.)") + .def( "comm_split_count", &::c10d::ProcessGroupXCCL::getCommSplitCounter) - .def_property( + .def_property_readonly( + "options", + &::c10d::ProcessGroupXCCL::getOptions, + R"(Return the options used to create this ProcessGroupXCCL instance.)") + .def_property( "bound_device_id", &::c10d::ProcessGroupXCCL::getBoundDeviceId, &::c10d::ProcessGroupXCCL::setBoundDeviceId, @@ -3398,14 +3419,12 @@ Example:: .def( "perform_nocolor_split", &::c10d::ProcessGroupXCCL::performNocolorSplit) - .def( + .def( "_is_initialized", &::c10d::ProcessGroupXCCL::isInitialized, py::call_guard<py::gil_scoped_release>()); intrusive_ptr_class_<::c10d::ProcessGroupXCCL::Options>( - processGroupXCCL, - "Options", - backendOptions) + processGroupXCCL, "Options", backendOptions) .def(py::init<bool>(), py::arg("is_high_priority_stream") = false) .def_readwrite("config", &::c10d::ProcessGroupXCCL::Options::config) .def_readwrite( diff --git a/torch/distributed/distributed_c10d.py b/torch/distributed/distributed_c10d.py index 18ac8344600e..bfbf1ee73436 100644 --- a/torch/distributed/distributed_c10d.py +++ b/torch/distributed/distributed_c10d.py @@ -5052,7 +5052,7 @@ def split_group( ) parent_group_rank = parent_global_to_group_ranks[global_rank] - parent_backend = parent_pg._get_backend(torch.device("cuda")) + parent_backend = parent_pg._get_backend(device_id) # if the parent backend does not support splitting, raise error # currently this API only support NCCL backend @@ -5133,6 +5133,15 @@ def split_group( backend_class = ProcessGroupNCCL( prefix_store, group_rank, len(my_group), pg_options ) + elif parent_backend_str == Backend.XCCL: + 
backend_type = ProcessGroup.BackendType.XCCL + if not isinstance(pg_options, ProcessGroupXCCL.Options): + raise RuntimeError( + "Expected pg_options argument to be of type ProcessGroupXCCL.Options" + ) + backend_class = ProcessGroupXCCL( + prefix_store, group_rank, len(my_group), pg_options + ) else: assert parent_backend_str.upper() in Backend._plugins, ( f"Unknown c10d backend type {parent_backend_str.upper()}" @@ -5153,7 +5162,9 @@ def split_group( pg._set_default_backend(backend_type) backend_class._set_sequence_number_for_group() - pg._register_backend(torch.device("cuda"), backend_type, backend_class) + pg._register_backend( + torch.accelerator.current_accelerator(), backend_type, backend_class + ) # set group_name and group_desc to backend assert group_name is not None