diff --git a/slurm_b200 b/slurm_b200
new file mode 100644
index 000000000..daa958bda
--- /dev/null
+++ b/slurm_b200
@@ -0,0 +1,80 @@
+#!/bin/bash
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+
+# This source code is licensed under the BSD-style license found in the
+# LICENSE file in the root directory of this source tree.
+
+# --- This script is optimized for AWS with EFA
+# --- adjust NCCL_BUFFSIZE if you encounter memory
+# --- constraint issues or to tune for improved performance.
+# ---
+
+#SBATCH --job-name=ahmads_titan1
+
+#SBATCH --ntasks=2
+
+#SBATCH --nodes=2
+
+#SBATCH --cpus-per-task=96
+#use this to run with specific nodes:
+# --nodelist=slurm-compute-node-[3-8,22-31,33-55,57-74,76-80,82-92,94,96,98,187,193-194,200,202-203,205,207-212,214-215,217-225,227-234,236,238-248]
+# and --exclude=slurm-compute-node-[node ids list here]
+
+# not needed for b200 .... export NCCL_IB_HCA=^mlx5_0:1
+# export NCCL_TOPO_FILE=/etc/crusoe/nccl_topo/h200-141gb-sxm-ib-cloud-hypervisor.xml
+export UCX_NET_DEVICES=ens7
+
+nodes=( $( scontrol show hostnames $SLURM_JOB_NODELIST ) )
+nodes_array=($nodes)
+head_node=${nodes_array[0]}
+head_node_ip=$(srun --nodes=1 --ntasks=1 -w "$head_node" hostname --ip-address)
+
+echo All nodes: ${nodes[@]}
+echo Node IP: $head_node_ip
+export LOGLEVEL=INFO
+
+# export NCCL_TOPO_FILE=/etc/crusoe/nccl_topo/h200-141gb-sxm-ib-cloud-hypervisor.xml
+# Enable for A100
+# export FI_PROVIDER="efa"
+# Ensure that P2P is available
+# export NCCL_P2P_DISABLE=1
+export NCCL_IB_DISABLE=0
+
+# debugging flags (optional)
+export NCCL_DEBUG=WARN
+export PYTHONFAULTHANDLER=1
+# optional debug settings
+# export NCCL_DEBUG=INFO
+# NCCL_DEBUG_SUBSYS=INIT,GRAPH,ENV
+
+#export LD_LIBRARY_PATH=/opt/amazon/efa/lib:$LD_LIBRARY_PATH
+#export LD_LIBRARY_PATH=/usr/local/lib/:$LD_LIBRARY_PATH
+export CUDA_LAUNCH_BLOCKING=0
+export WANDB_PROJECT="b200_titan"
+# on your cluster you might need these:
+# set the network interface
+export NCCL_SOCKET_IFNAME="eth0,en,eth,em,bond"
+export NCCL_BUFFSIZE=2097152
+#export TORCH_DIST_INIT_BARRIER=1
+export FI_EFA_SET_CUDA_SYNC_MEMOPS=0
+
+TITAN_CONFIG_FILE=${TITAN_CONFIG_FILE:-"./torchtitan/models/llama3/train_configs/llama3_8b.toml"}
+
+# dcgmi profile --pause
+# adjust sbatch --ntasks and sbatch --nodes above and --nnodes below
+# to your specific node count, and update target launch file.
+# This works with multiple nodes now:
+#srun conda run -n ahmads_titan torchrun --nnodes 2 --nproc_per_node 8 --rdzv_id 101 --rdzv_backend c10d --rdzv_endpoint "$head_node_ip:29500" -m torchtitan.train --job.config_file ${TITAN_CONFIG_FILE}
+#exit
+
+# This is the baseline and it works.
+#export NGPU=8
+#srun conda run -n ahmads_titan ./run_train.sh
+
+export LOCAL_WORLD_SIZE=8
+export NUM_HOSTS=8
+export MASTER_ADDR=$head_node_ip
+export MASTER_PORT=12347
+
+srun --nodes=$NUM_HOSTS --ntasks=$NUM_HOSTS conda run -n ahmads_titan python -m torchtitan.train_monarch --job.config_file ./torchtitan/models/llama3/train_configs/llama3_8b.toml
diff --git a/slurm_hyperactor b/slurm_hyperactor
new file mode 100644
index 000000000..16521eb09
--- /dev/null
+++ b/slurm_hyperactor
@@ -0,0 +1,96 @@
+#!/bin/bash
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+
+# This source code is licensed under the BSD-style license found in the
+# LICENSE file in the root directory of this source tree.
+
+# --- This script is optimized for AWS with EFA
+# --- adjust NCCL_BUFFSIZE if you encounter memory
+# --- constraint issues or to tune for improved performance.
+# ---
+
+#SBATCH --job-name=ahmads_titan1
+
+#SBATCH --ntasks=8
+
+#SBATCH --nodes=8
+
+#SBATCH --cpus-per-task=96
+#use this to run with specific nodes:
+# --nodelist=slurm-compute-node-[3-8,22-31,33-55,57-74,76-80,82-92,94,96,98,187,193-194,200,202-203,205,207-212,214-215,217-225,227-234,236,238-248]
+# and --exclude=slurm-compute-node-[node ids list here]
+
+# not needed for b200 .... export NCCL_IB_HCA=^mlx5_0:1
+# export NCCL_TOPO_FILE=/etc/crusoe/nccl_topo/h200-141gb-sxm-ib-cloud-hypervisor.xml
+export UCX_NET_DEVICES=ens7
+
+nodes=( $( scontrol show hostnames $SLURM_JOB_NODELIST ) )
+nodes_array=($nodes)
+head_node=${nodes_array[0]}
+head_node_ip=$(srun --nodes=1 --ntasks=1 -w "$head_node" hostname --ip-address)
+
+echo All nodes: ${nodes[@]}
+echo Node IP: $head_node_ip
+
+node_ips=()
+for node in "${nodes[@]}"
+do
+    node_ip=$(getent hosts $node | awk '{ print $1 }')
+    node_ips+=($node_ip)
+done
+
+echo "All ip addresses"
+echo "${node_ips[@]}"
+
+
+export LOGLEVEL=INFO
+
+# export NCCL_TOPO_FILE=/etc/crusoe/nccl_topo/h200-141gb-sxm-ib-cloud-hypervisor.xml
+# Enable for A100
+# export FI_PROVIDER="efa"
+# Ensure that P2P is available
+# export NCCL_P2P_DISABLE=1
+export NCCL_IB_DISABLE=0
+
+# debugging flags (optional)
+export NCCL_DEBUG=WARN
+export PYTHONFAULTHANDLER=1
+# optional debug settings
+# export NCCL_DEBUG=INFO
+# NCCL_DEBUG_SUBSYS=INIT,GRAPH,ENV
+
+#export LD_LIBRARY_PATH=/opt/amazon/efa/lib:$LD_LIBRARY_PATH
+#export LD_LIBRARY_PATH=/usr/local/lib/:$LD_LIBRARY_PATH
+export CUDA_LAUNCH_BLOCKING=0
+export WANDB_PROJECT="b200_titan"
+# on your cluster you might need these:
+# set the network interface
+export NCCL_SOCKET_IFNAME="eth0,en,eth,em,bond"
+export NCCL_BUFFSIZE=2097152
+#export TORCH_DIST_INIT_BARRIER=1
+export FI_EFA_SET_CUDA_SYNC_MEMOPS=0
+
+TITAN_CONFIG_FILE=${TITAN_CONFIG_FILE:-"./torchtitan/models/llama3/train_configs/llama3_8b.toml"}
+
+# dcgmi profile --pause
+# adjust sbatch --ntasks and sbatch --nodes above and --nnodes below
+# to your specific node count, and update target launch file.
+# This works with multiple nodes now:
+#srun conda run -n ahmads_titan torchrun --nnodes 2 --nproc_per_node 8 --rdzv_id 101 --rdzv_backend c10d --rdzv_endpoint "$head_node_ip:29500" -m torchtitan.train --job.config_file ${TITAN_CONFIG_FILE}
+#exit
+
+# This is the baseline and it works.
+#export NGPU=8
+#srun conda run -n ahmads_titan ./run_train.sh
+
+export LOCAL_WORLD_SIZE=8
+export NUM_HOSTS=8
+export MASTER_ADDR=$head_node_ip
+export MASTER_PORT=12347
+export PYTHONPATH=$PYTHONPATH:$(readlink -f .)
+ +echo "About to run process allocator" +# Run a process allocator in all hosts +srun --nodes=$NUM_HOSTS --ntasks=$NUM_HOSTS conda run -n ahmads_titan3 process_allocator --program=monarch_bootstrap +#python -m torchtitan.train_monarch --job.config_file ./torchtitan/models/llama3/train_configs/llama3_8b.toml diff --git a/torchtitan/train.py b/torchtitan/train.py index 9340671d7..4d8f25f20 100644 --- a/torchtitan/train.py +++ b/torchtitan/train.py @@ -32,6 +32,8 @@ maybe_enable_profiling, ) +logger.info = logger.error + class Trainer(torch.distributed.checkpoint.stateful.Stateful): job_config: JobConfig @@ -536,6 +538,9 @@ def close(self) -> None: config_manager = ConfigManager() config = config_manager.parse_args() trainer: Optional[Trainer] = None + rank = int(os.environ["RANK"]) + if rank == 15: + print(os.environ) try: trainer = Trainer(config) diff --git a/torchtitan/train_monarch.py b/torchtitan/train_monarch.py new file mode 100644 index 000000000..7842a4d4a --- /dev/null +++ b/torchtitan/train_monarch.py @@ -0,0 +1,141 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. +import asyncio +import socket +import importlib +import os +import pickle +import threading +import sys +import time +from datetime import timedelta +from logging import getLogger +from typing import Any, Generator, Iterable, Optional +import torch +import torchtitan.components.ft as ft +import torchtitan.protocols.train_spec as train_spec_module +from monarch._rust_bindings.monarch_hyperactor.proc_mesh import ProcMesh as HyProcMesh +from monarch.actor_mesh import Actor, current_rank, endpoint +from monarch.proc_mesh import proc_mesh, ProcMesh +from monarch_meta._monarch_meta import hyperactor_meta +from torchtitan.config_manager import ConfigManager, JobConfig +from torchtitan.tools.logging import init_logger, logger +from .train import Trainer + +def pretend_you_are_torchrun(global_rank): + """ + Eventually, Monarch should handle all of this, but it's necessary for now because the job is + not running torchrun. Also there are already better ways to avoid hardcoding this, but + it's a demo and we'll live for now. + """ + # task_id = int(os.environ["TW_TASK_ID"]) + # global_rank = task_id * 8 + (global_rank % 8) + task_id = int(os.environ["SLURM_NODEID"]) + local_world_size = int(os.environ["LOCAL_WORLD_SIZE"]) + num_hosts = int(os.environ["NUM_HOSTS"]) + + global_rank = task_id * local_world_size + global_rank + + world_size = num_hosts * local_world_size + local_rank = min(world_size, global_rank % local_world_size) + + group_rank = global_rank // local_world_size + group_world_size = (world_size + local_world_size - 1) // local_world_size + + env = { + # "MASTER_ADDR": get_master_addr(), + # "MASTER_PORT": str(20101), + "RANK": str(global_rank), + "LOCAL_RANK": str(local_rank), + + + # Note that local_world_size is already set. 
+ + "GROUP_RANK": str(group_rank), + "GROUP_WORLD_SIZE": str(group_world_size), + + "ROLE_RANK": str(global_rank), + "ROLE_WORLD_SIZE": str(world_size), + "ROLE_NAME": "rank", + + "WORLD_SIZE": str(world_size), + } + print(f" AHMAD: {global_rank=} {env}") + os.environ.update(env) + if global_rank == 0: + print(f" AHMAD: {global_rank=} {os.environ}") + + +class TrainerActorWrapper(Actor): + def __init__(self, job_config: JobConfig): + self.job_config = job_config + self.rank = current_rank().rank + hostname = socket.gethostname() + print(f" ===> AHMAD: {self.rank} {hostname=} {current_rank()=}") + pretend_you_are_torchrun(self.rank) + + @endpoint + def train(self): + print("Starting training") + pretend_you_are_torchrun(self.rank) + config = self.job_config + trainer: Optional[Trainer] = None + + try: + trainer = Trainer(config) + # trainer = self.trainer + tid = threading.get_native_id() + logger.error(f"AHMAD tid in train: {self.rank=} {tid=}") + trainer.train() + + if config.checkpoint.create_seed_checkpoint: + assert ( + int(os.environ["WORLD_SIZE"]) == 1 + ), "Must create seed checkpoint using a single device, to disable sharding." + assert ( + config.checkpoint.enable_checkpoint + ), "Must enable checkpointing when creating a seed checkpoint." + trainer.checkpointer.save(curr_step=0, force=True) + logger.info("Created seed checkpoint") + else: + trainer.train() + finally: + if trainer: + trainer.close() + + if torch.distributed.is_initialized(): + torch.distributed.destroy_process_group() + logger.info("Process group destroyed.") + print("Done training") + +async def async_main(job_config: JobConfig): + torch.use_deterministic_algorithms(True) + local_world_size = int(os.environ["LOCAL_WORLD_SIZE"]) + num_hosts = int(os.environ["NUM_HOSTS"]) + master_addr = os.environ["MASTER_ADDR"] + master_port = os.environ["MASTER_PORT"] + world_size = local_world_size * num_hosts + + local_proc_mesh = await proc_mesh( + gpus=local_world_size, + env={ + "MASTER_ADDR": master_addr, + "MASTER_PORT": master_port, + }, + ) + print(job_config) + trainer_actor = await local_proc_mesh.spawn( + "trainer_actor", TrainerActorWrapper, job_config + ) + await trainer_actor.train.call() + + +if __name__ == "__main__": + init_logger() + config_manager = ConfigManager() + config = config_manager.parse_args() + asyncio.run(async_main(config)) + sys.exit(0) diff --git a/torchtitan/train_monarch2.py b/torchtitan/train_monarch2.py new file mode 100644 index 000000000..944c4c516 --- /dev/null +++ b/torchtitan/train_monarch2.py @@ -0,0 +1,171 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. 
+import asyncio
+import socket
+import importlib
+import os
+import pickle
+import threading
+import sys
+import time
+from datetime import timedelta
+from logging import getLogger
+from typing import Any, Generator, Iterable, Optional
+import torch
+import torchtitan.components.ft as ft
+import torchtitan.protocols.train_spec as train_spec_module
+from monarch._rust_bindings.monarch_hyperactor.proc_mesh import ProcMesh as HyProcMesh
+from monarch.actor_mesh import Actor, current_rank, endpoint
+from monarch.proc_mesh import proc_mesh, ProcMesh
+from monarch_meta._monarch_meta import hyperactor_meta
+from torchtitan.config_manager import ConfigManager, JobConfig
+from torchtitan.tools.logging import init_logger, logger
+from monarch.allocator import RemoteAllocator, StaticRemoteAllocInitializer, RemoteAllocInitializer
+from monarch._rust_bindings.hyperactor_extension.alloc import (
+    AllocConstraints,
+    AllocSpec,
+)
+from .train import Trainer
+
+def pretend_you_are_torchrun(global_rank):
+    """
+    Eventually, Monarch should handle all of this, but it is necessary for now because the job
+    is not launched via torchrun. There are already better ways to avoid hardcoding this, but
+    this is a demo and it will do for now.
+    """
+    # task_id = int(os.environ["TW_TASK_ID"])
+    # global_rank = task_id * 8 + (global_rank % 8)
+    task_id = int(os.environ["SLURM_NODEID"])
+    local_world_size = int(os.environ["LOCAL_WORLD_SIZE"])
+    num_hosts = int(os.environ["NUM_HOSTS"])
+
+    # Ahmad commented this out because it is only needed when running
+    # multiple controllers.
+    #global_rank = task_id * local_world_size + global_rank
+
+    world_size = num_hosts * local_world_size
+    local_rank = min(world_size, global_rank % local_world_size)
+
+    group_rank = global_rank // local_world_size
+    group_world_size = (world_size + local_world_size - 1) // local_world_size
+
+    env = {
+        # "MASTER_ADDR": get_master_addr(),
+        # "MASTER_PORT": str(20101),
+        "RANK": str(global_rank),
+        "LOCAL_RANK": str(local_rank),
+
+
+        # Note that local_world_size is already set.
+
+        "GROUP_RANK": str(group_rank),
+        "GROUP_WORLD_SIZE": str(group_world_size),
+
+        "ROLE_RANK": str(global_rank),
+        "ROLE_WORLD_SIZE": str(world_size),
+        "ROLE_NAME": "rank",
+
+        "WORLD_SIZE": str(world_size),
+    }
+    print(f" AHMAD: {global_rank=} {env}")
+    os.environ.update(env)
+    if global_rank == 0:
+        print(f" AHMAD: {global_rank=} {os.environ}")
+
+
+class TrainerActorWrapper(Actor):
+    def __init__(self, job_config: JobConfig, env_to_merge={}):
+        self.job_config = job_config
+        self.rank = current_rank().rank
+        hostname = socket.gethostname()
+        print(f" ===> AHMAD: {self.rank} {hostname=} {current_rank()=}")
+        pretend_you_are_torchrun(self.rank)
+        os.environ.update(env_to_merge)
+
+    @endpoint
+    def train(self):
+        print("Starting training")
+        pretend_you_are_torchrun(self.rank)
+        config = self.job_config
+        trainer: Optional[Trainer] = None
+
+        try:
+            trainer = Trainer(config)
+            # trainer = self.trainer
+            tid = threading.get_native_id()
+            logger.error(f"AHMAD tid in train: {self.rank=} {tid=}")
+            trainer.train()
+
+            if config.checkpoint.create_seed_checkpoint:
+                assert (
+                    int(os.environ["WORLD_SIZE"]) == 1
+                ), "Must create seed checkpoint using a single device, to disable sharding."
+                assert (
+                    config.checkpoint.enable_checkpoint
+                ), "Must enable checkpointing when creating a seed checkpoint."
+                trainer.checkpointer.save(curr_step=0, force=True)
+                logger.info("Created seed checkpoint")
+            else:
+                trainer.train()
+        finally:
+            if trainer:
+                trainer.close()
+
+            if torch.distributed.is_initialized():
+                torch.distributed.destroy_process_group()
+                logger.info("Process group destroyed.")
+            print("Done training")
+
+class MyAllocInitializer(RemoteAllocInitializer):
+    def __init__(self, l):
+        super().__init__()
+        self.l = l
+    async def initialize_alloc(self) -> list[str]:
+        return self.l
+
+async def async_main(job_config: JobConfig):
+    torch.use_deterministic_algorithms(True)
+    local_world_size = int(os.environ["LOCAL_WORLD_SIZE"])
+    num_hosts = int(os.environ["NUM_HOSTS"])
+    slurm_job_nodes = os.environ["SLURM_JOB_NODES"]
+    world_size = local_world_size * num_hosts
+
+    node_ips = slurm_job_nodes.split(" ")
+    all_nodes = [f"tcp!{n}:26600" for n in node_ips]
+    mesh = None
+    env_to_merge = {}
+    if len(all_nodes):
+        hosts = num_hosts
+        gpus = local_world_size
+        spec = AllocSpec(AllocConstraints(), host=hosts, gpu=gpus)
+        print(f"AHMAD: {all_nodes=}\n{node_ips=}")
+        allocator = RemoteAllocator(world_id="test_remote_allocator", initializer=MyAllocInitializer(all_nodes))
+        alloc = await allocator.allocate(spec)
+        mesh = await ProcMesh.from_alloc(alloc)
+        env_to_merge["MASTER_ADDR"] = str(node_ips[0])
+        env_to_merge["MASTER_PORT"] = str(12347)
+    else:
+        mesh = await proc_mesh(
+            gpus=local_world_size,
+            env={
+                "MASTER_ADDR": os.environ["MASTER_ADDR"],
+                "MASTER_PORT": os.environ["MASTER_PORT"],
+            },
+        )
+
+    print(job_config)
+    trainer_actor = await mesh.spawn(
+        "trainer_actor", TrainerActorWrapper, job_config, env_to_merge
+    )
+    await trainer_actor.train.call()
+
+
+if __name__ == "__main__":
+    init_logger()
+    config_manager = ConfigManager()
+    config = config_manager.parse_args()
+    asyncio.run(async_main(config))
+    sys.exit(0)
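Note (not part of the patch): the snippet below is a minimal standalone sketch of the torchrun-style rank bookkeeping that pretend_you_are_torchrun performs, worked through for one illustrative case; the concrete values (SLURM_NODEID=1, LOCAL_WORLD_SIZE=8, NUM_HOSTS=8, per-host rank 3) are assumptions for the example, not values produced by the patch.

# Illustrative values only (assumed for this example):
task_id = 1            # int(os.environ["SLURM_NODEID"])
local_world_size = 8   # int(os.environ["LOCAL_WORLD_SIZE"])
num_hosts = 8          # int(os.environ["NUM_HOSTS"])
per_host_rank = 3      # rank reported by current_rank() on this host

global_rank = task_id * local_world_size + per_host_rank   # 1 * 8 + 3 = 11
world_size = num_hosts * local_world_size                  # 8 * 8 = 64
local_rank = min(world_size, global_rank % local_world_size)                # 11 % 8 = 3
group_rank = global_rank // local_world_size                                # 11 // 8 = 1
group_world_size = (world_size + local_world_size - 1) // local_world_size  # 71 // 8 = 8

# These are the values pretend_you_are_torchrun writes into os.environ as
# RANK, WORLD_SIZE, LOCAL_RANK, GROUP_RANK and GROUP_WORLD_SIZE.
assert (global_rank, world_size, local_rank, group_rank, group_world_size) == (11, 64, 3, 1, 8)

In train_monarch2.py the task_id offset is commented out (per the in-code comment, it is only needed when running multiple controllers), so global_rank there is the rank passed in from current_rank() unchanged.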