Skip to content

Commit 66c6560

Browse files
authored
Merge branch 'master' into pagaray_master_tweak
2 parents a513055 + 5dc19c8 commit 66c6560

File tree

8 files changed

+99
-26
lines changed

8 files changed

+99
-26
lines changed

launcher_scripts/conf/config.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ env_vars:
5656
TRANSFORMERS_OFFLINE: 1
5757
TORCH_NCCL_AVOID_RECORD_STREAMS: 1
5858
NCCL_NVLS_ENABLE: 0
59-
NVTE_APPLY_QK_LAYER_SCALING: 1
6059

6160
# GPU Mapping
6261
numa_mapping:

launcher_scripts/conf/data_preparation/generic/custom_dataset.yaml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ bpe_save_dir: ${.custom_dataset_dir}/bpe # Dir to save sentence piece tokenizer
2626
preprocess_data: True # True to preprocess the data from json, jsonl or json.gz files, False otherwise.
2727
raw_dataset_files: # Either a string (path to dataset folder) or a list (of files)
2828
- null # Each file should be input json, jsonl or json.gz file
29+
tokenizer_library: sentencepiece # Name of the tokenizer library, such as "sentencepiece" or "megatron"
30+
tokenizer_type: null # Type of tokenizer to use if not training a tokenizer from scratch, such as "GPT2BPETokenizer"
2931
tokenizer_model: ${.bpe_save_dir}/${data_preparation.train_tokenizer_args.model_prefix}.model # trained SentencePiece tokenizer model
32+
vocab_file: null # Path to a vocab file if using BPE tokenizer. Leave "null" if not using BPE.
33+
merges_file: null # Path to a merges file if using BPE tokenizer. Leave "null" if not using BPE.
3034
preprocess_worker_mapping: ${.custom_dataset_dir}/preprocess_mapping
31-
preprocessed_dir: ${.custom_dataset_dir}/preprocessed
35+
preprocessed_dir: ${.custom_dataset_dir}/preprocessed

launcher_scripts/nemo_launcher/collections/dataprep_scripts/custom_dataprep/preprocess.py

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,41 @@
5050
type=int,
5151
)
5252
parser.add_argument("--bcp", action="store_true", help="Whether on BCP platform")
53+
parser.add_argument(
54+
"--vocab-file",
55+
default=None,
56+
help="If using BPE tokenizer, specify the path to a vocab file. Keep None if not using BPE.",
57+
type=str,
58+
)
59+
parser.add_argument(
60+
"--merges-file",
61+
default=None,
62+
help="If using BPE tokenizer, specify the path to a merges file. Keep None if not using BPE.",
63+
type=str,
64+
)
65+
parser.add_argument(
66+
"--tokenizer-library",
67+
default="sentencepiece",
68+
help="Name of the tokenizer library, such as sentencepiece or megatron",
69+
type=str,
70+
)
71+
parser.add_argument(
72+
"--tokenizer-type",
73+
default=None,
74+
help="Name of the tokenizer type to use, such as GPT2BPETokenizer",
75+
type=str,
76+
)
77+
parser.add_argument(
78+
"--dataset-impl",
79+
default="mmap",
80+
help="Specify how the dataset is stored and will be processed.",
81+
type=str,
82+
)
5383
args, other_args = parser.parse_known_args()
5484

5585
workers_per_node = args.workers_per_node # local world size
5686
if args.bcp:
57-
global_rank = int(os.environ.get("OMPI_COMM_WORLD_RANK", 0))
87+
global_rank = int(os.environ.get("RANK", 0))
5888
task_id = global_rank // workers_per_node
5989
rank = global_rank % workers_per_node
6090
else: # on slurm based platforms
@@ -82,12 +112,25 @@
82112
print(
83113
f" ****** Task ID {task_id:02d} Rank {rank:02d} starts to preprocess {os.path.basename(split)}..."
84114
)
85-
input_arg = ["--input", split]
86-
output_arg = [
87-
"--output-prefix",
88-
os.path.join(args.output_path, os.path.basename(split)),
115+
input_arg = split
116+
output_arg = os.path.join(args.output_path, os.path.basename(split))
117+
118+
flags = [
119+
f"--input={split}",
120+
f"--output-prefix={output_arg}",
121+
f"--dataset-impl={args.dataset_impl}",
122+
f"--tokenizer-library={args.tokenizer_library}",
123+
f"--tokenizer-type={args.tokenizer_type}",
89124
]
90-
subprocess.check_call(cmd + input_arg + output_arg + other_args)
125+
126+
if args.vocab_file and args.merges_file:
127+
flags += [
128+
f"--vocab={args.vocab_file}",
129+
f"--merge-file={args.merges_file}",
130+
f"--append-eod",
131+
]
132+
133+
subprocess.check_call(cmd + flags + other_args)
91134
print(
92135
f" ****** Task ID {task_id:02d} Rank {rank:02d} finished preprocessing {os.path.basename(split)}..."
93136
)

launcher_scripts/nemo_launcher/collections/dataprep_scripts/mc4_dataprep/preprocess.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858

5959
workers_per_node = args.workers_per_node # local world size
6060
if args.bcp:
61-
global_rank = int(os.environ.get("OMPI_COMM_WORLD_RANK", 0))
61+
global_rank = int(os.environ.get("RANK", 0))
6262
task_id = global_rank // workers_per_node
6363
rank = global_rank % workers_per_node
6464
else: # on slurm based platforms

launcher_scripts/nemo_launcher/collections/dataprep_scripts/pile_dataprep/preprocess.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,11 +106,16 @@ def main(cfg):
106106
elif cfg.get("cluster_type") in ["bcp", "k8s"]:
107107
file_numbers = cfg.get("file_numbers")
108108
files_list = utils.convert_file_numbers(file_numbers)
109-
# Assumes launched via mpirun:
110-
# mpirun -N <nnodes> -npernode 1 ...
111-
wrank = int(os.environ.get("OMPI_COMM_WORLD_RANK", 0))
112-
wsize = int(os.environ.get("OMPI_COMM_WORLD_SIZE", 0))
113-
lrank = int(os.environ.get("OMPI_COMM_WORLD_LOCAL_RANK", 0))
109+
if cfg.get("cluster_type") == "bcp":
110+
wrank = int(os.environ.get("RANK", 0))
111+
wsize = int(os.environ.get("WORLD_SIZE", 0))
112+
lrank = int(os.environ.get("LOCAL_RANK", 0))
113+
else:
114+
# Assumes launched via mpirun:
115+
# mpirun -N <nnodes> -npernode 1 ...
116+
wrank = int(os.environ.get("OMPI_COMM_WORLD_RANK", 0))
117+
wsize = int(os.environ.get("OMPI_COMM_WORLD_SIZE", 0))
118+
lrank = int(os.environ.get("OMPI_COMM_WORLD_LOCAL_RANK", 0))
114119

115120
if lrank == 0:
116121
# Compile once per node. Should be one container instance per node.

launcher_scripts/nemo_launcher/collections/eval_harness/download.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ def parse_args(parser_main):
2626
# parser = argparse.ArgumentParser()
2727
parser = parser_main.add_argument_group(title="download-tasks")
2828
parser.add_argument("--tasks", default="all_tasks")
29-
parser.add_argument("--cache_dir", default="")
29+
parser.add_argument("--cache-dir", default="")
3030
# return parser.parse_args()
3131
return parser_main
3232

launcher_scripts/nemo_launcher/core/data_stages.py

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,6 @@ def _make_cluster_parameters(
152152
env_vars[
153153
"PYTHONPATH"
154154
] = f"{self._launcher_scripts_path}:${{PYTHONPATH}}" # Required by pile download
155-
env_vars["NGC_ARRAY_TYPE"] = "MPIJob" # Required by BCP
156155
setup = [f"export {k}={v}" for k, v in env_vars.items()]
157156

158157
cluster_parameters = {}
@@ -369,7 +368,6 @@ def _make_private_cluster_parameters(self, cluster: str, sub_stage: str) -> Dict
369368
return {
370369
"nodes": node_array_size,
371370
"ntasks_per_node": bcp_preproc_npernode,
372-
"bcp_launcher": "'mpirun --allow-run-as-root'",
373371
}
374372
return {}
375373

@@ -499,7 +497,6 @@ def _make_private_cluster_parameters(self, cluster: str, sub_stage: str) -> Dict
499497
return {
500498
"nodes": node_array_size,
501499
"ntasks_per_node": ntasks_per_node,
502-
"bcp_launcher": "'mpirun --allow-run-as-root'",
503500
}
504501
return {}
505502

@@ -588,6 +585,26 @@ def _make_sub_stages(self) -> List[str]:
588585
sub_stages += ["preprocess"]
589586
return sub_stages
590587

588+
def _filter_raw_json_files(self, raw_dataset_files: list) -> List:
589+
"""
590+
Filter the input dataset files to only include json files and derivatives.
591+
592+
:param list raw_dataset_files: List of the raw dataset files specified in the config
593+
:return: a list of only the json files in the dataset.
594+
:rtype: list
595+
"""
596+
if isinstance(raw_dataset_files, omegaconf.listconfig.ListConfig):
597+
return raw_dataset_files
598+
599+
filtered_files = []
600+
601+
for raw_file in os.listdir(raw_dataset_files):
602+
# Only select files that end in .json, .jsonl, or .json.gz
603+
if not Path(raw_file).suffix.lower() in [".json", ".jsonl", "json.gz"]:
604+
continue
605+
filtered_files.append(os.path.join(raw_dataset_files, raw_file))
606+
return filtered_files
607+
591608
def setup_folder_and_data(self) -> None:
592609
"""Setup job/data folders and fine-tuning/prompt-learning dataset"""
593610
job_path = self.get_job_path()
@@ -602,11 +619,8 @@ def setup_folder_and_data(self) -> None:
602619
preprocess_worker_mapping = data_cfg.get("preprocess_worker_mapping")
603620

604621
if data_cfg.get("preprocess_data", False):
605-
if not isinstance(raw_dataset_files, omegaconf.listconfig.ListConfig):
606-
raw_dataset_files = [
607-
os.path.join(raw_dataset_files, raw_file)
608-
for raw_file in os.listdir(raw_dataset_files)
609-
]
622+
raw_dataset_files = self._filter_raw_json_files(raw_dataset_files)
623+
610624
# Sort list of files in directory by size
611625
sorted_files = sorted(raw_dataset_files, key=lambda x: os.stat(x).st_size)
612626
file_sizes = [os.stat(x).st_size for x in sorted_files]
@@ -674,14 +688,14 @@ def _make_private_cluster_parameters(self, cluster: str, sub_stage: str) -> Dict
674688
return {
675689
"nodes": node_array_size,
676690
"ntasks_per_node": ntasks_per_node,
677-
"bcp_launcher": "'mpirun --allow-run-as-root'",
678691
}
679692
return {}
680693

681694
def _make_sub_stage_command(self, sub_stage: str) -> List[str]:
682695
"""Make a command of the specified sub-stage"""
683696
data_cfg = self.stage_cfg
684697
run_cfg = data_cfg.get("run")
698+
cluster_type = self.cfg.cluster_type
685699

686700
if sub_stage == "train_tokenizer":
687701
bpe_save_dir = Path(data_cfg.get("bpe_save_dir"))
@@ -699,14 +713,23 @@ def _make_sub_stage_command(self, sub_stage: str) -> List[str]:
699713
output_path=data_cfg.get("preprocessed_dir"),
700714
workers_per_node=run_cfg.get("workers_per_node"),
701715
worker_mapping_file=data_cfg.get("preprocess_worker_mapping"),
702-
tokenizer_library="sentencepiece",
716+
tokenizer_library=data_cfg.get("tokenizer_library"),
703717
tokenizer_model=data_cfg.get("tokenizer_model"),
718+
tokenizer_type=data_cfg.get("tokenizer_type"),
704719
dataset_impl="mmap",
705720
log_interval="2000",
706721
apply_ftfy="store_true",
707722
workers=run_cfg.get("cpus_per_node") // run_cfg.get("workers_per_node"),
708723
)
709724

725+
if cluster_type == "bcp":
726+
args += create_args_list(bcp="store_true")
727+
728+
if data_cfg.vocab_file and data_cfg.merges_file:
729+
args += create_args_list(
730+
vocab_file=data_cfg.vocab_file, merges_file=data_cfg.merges_file
731+
)
732+
710733
sub_stage_command = [f"python3 -u {code_path}", *args]
711734
sub_stage_command = " \\\n ".join(sub_stage_command)
712735
return [sub_stage_command]

launcher_scripts/tests/unit_tests/config_tests/test_main_config.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ def test_config(self):
6363
TRANSFORMERS_OFFLINE: 1
6464
TORCH_NCCL_AVOID_RECORD_STREAMS: 1
6565
NCCL_NVLS_ENABLE: 0
66-
NVTE_APPLY_QK_LAYER_SCALING: 1
6766
6867
# GPU Mapping
6968
numa_mapping:

0 commit comments

Comments
 (0)