Skip to content

fix stream dependencies in callbacks #246

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions torchft/collectives.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,8 @@ def callback(fut: Future[list[torch.Tensor]]) -> list[torch.Tensor]:
nonlocal tensors, quantized_tensors, world_size, sync_stream

with torch.cuda.stream(sync_stream):
# Setup stream dependency
fut.wait()
# Dequantize the result back to the original precision
fused_dequantize_from_fp8(tensors, quantized_tensors, world_size)
return tensors
Expand Down
11 changes: 8 additions & 3 deletions torchft/local_sgd.py
Original file line number Diff line number Diff line change
Expand Up @@ -513,9 +513,14 @@ def _bucketize_and_allreduce(
)

def callback(fut: torch.futures.Future[torch.Tensor]) -> None:
nonlocal bucket_tensors, flat_buffer
for t, pack_offset, numel in bucket_tensors:
t.copy_(flat_buffer[pack_offset : pack_offset + numel].view_as(t))
with torch.cuda.stream(self._stream) if self._stream else nullcontext():
nonlocal bucket_tensors, flat_buffer
# Setup stream dependency
fut.wait()
for t, pack_offset, numel in bucket_tensors:
t.copy_(
flat_buffer[pack_offset : pack_offset + numel].view_as(t)
)

work = work.then(callback)
self._allreduce_futures.append(work)
Expand Down
2 changes: 2 additions & 0 deletions torchft/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,8 @@ def callback(
# change the stream to avoid making the callback stream
# dependent on process group stream running the allreduce
with torch.cuda.stream(stream) if stream is not None else nullcontext():
# Setup stream dependency
fut.wait()
fut.value()
tensor /= num_participants

Expand Down
9 changes: 4 additions & 5 deletions train_diloco.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
ProcessGroupGloo,
ProcessGroupNCCL,
)
from torchft.checkpointing.pg_transport import PGTransport
from torchft.checkpointing.http_transport import HTTPTransport
from torchft.local_sgd import DiLoCo

logging.basicConfig(level=logging.INFO)
Expand Down Expand Up @@ -67,13 +67,12 @@ def state_dict():
timeout=timedelta(seconds=10),
)
if torch.cuda.is_available() and USE_NCCL
else ProcessGroupGloo(timeout=timedelta(seconds=5))
else ProcessGroupGloo(timeout=timedelta(seconds=10))
)

transport = PGTransport(
pg,
transport = HTTPTransport(
timeout=timedelta(seconds=10),
device=device,
num_chunks=0,
)

manager = Manager(
Expand Down