From 023a6f28b82e4e55c703acabaa8c898f381d1036 Mon Sep 17 00:00:00 2001
From: nik-mosaic
Date: Mon, 18 Dec 2023 23:01:38 -0800
Subject: [PATCH 1/5] fix multigpu trt race condition

---
 .../in_context_learning_evaluation.py         | 76 +++++++++++++++----
 1 file changed, 62 insertions(+), 14 deletions(-)

diff --git a/composer/datasets/in_context_learning_evaluation.py b/composer/datasets/in_context_learning_evaluation.py
index c98cfcd1c6..8090e22af3 100644
--- a/composer/datasets/in_context_learning_evaluation.py
+++ b/composer/datasets/in_context_learning_evaluation.py
@@ -8,6 +8,7 @@
 import os
 import random
 from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+import time
 
 import torch
 import transformers
@@ -21,6 +22,12 @@
 if TYPE_CHECKING:
     import transformers
 
+try:
+    import tensorrt_llm
+    TENSORRT_LLM = True
+except ImportError:
+    TENSORRT_LLM = False
+
 # Allow models to have slightly more tokens than were used in the most verbose CoT in the dataset
 _MAX_ANSWER_BUFFER_LENGTH = 10
 
@@ -160,9 +167,16 @@ def __init__(
             raise MissingConditionalImportError(extra_deps_group='nlp',
                                                 conda_package='datasets',
                                                 conda_channel='conda-forge') from e
-        with dist.local_rank_zero_download_and_wait(destination_path):
-            if dist.get_local_rank() == 0:
+        if TENSORRT_LLM == False:
+            with dist.local_rank_zero_download_and_wait(destination_path):
+                if dist.get_local_rank() == 0:
+                    get_file(dataset_uri, destination_path, overwrite=True)
+        else:
+            if tensorrt_llm.mpi_rank() == 0:
                 get_file(dataset_uri, destination_path, overwrite=True)
+            else:
+                while not os.path.exists(destination_path):
+                    time.sleep(0.1)
         dataset = load_dataset('json', data_files=destination_path, split='train', streaming=False)
         self.samples = self._read_dataset(dataset)
         self.samples = strip_data(self.samples)
@@ -379,9 +393,16 @@ def __init__(
             raise MissingConditionalImportError(extra_deps_group='nlp',
                                                 conda_package='datasets',
                                                 conda_channel='conda-forge') from e
-        with dist.local_rank_zero_download_and_wait(destination_path):
-            if dist.get_local_rank() == 0:
+        if TENSORRT_LLM == False:
+            with dist.local_rank_zero_download_and_wait(destination_path):
+                if dist.get_local_rank() == 0:
+                    get_file(dataset_uri, destination_path, overwrite=True)
+        else:
+            if tensorrt_llm.mpi_rank() == 0:
                 get_file(dataset_uri, destination_path, overwrite=True)
+            else:
+                while not os.path.exists(destination_path):
+                    time.sleep(0.1)
         dataset = load_dataset('json', data_files=destination_path, split='train', streaming=False)
         self.samples = list(
             dataset.map(lambda examples: {
@@ -543,10 +564,16 @@ def __init__(
             raise MissingConditionalImportError(extra_deps_group='nlp',
                                                 conda_package='datasets',
                                                 conda_channel='conda-forge') from e
-
-        with dist.local_rank_zero_download_and_wait(destination_path):
-            if dist.get_local_rank() == 0:
+        if TENSORRT_LLM == False:
+            with dist.local_rank_zero_download_and_wait(destination_path):
+                if dist.get_local_rank() == 0:
+                    get_file(dataset_uri, destination_path, overwrite=True)
+        else:
+            if tensorrt_llm.mpi_rank() == 0:
                 get_file(dataset_uri, destination_path, overwrite=True)
+            else:
+                while not os.path.exists(destination_path):
+                    time.sleep(0.1)
         dataset = load_dataset('json', data_files=destination_path, split='train', streaming=False)
         self.samples = list(
             dataset.map(lambda examples: {
@@ -771,10 +798,16 @@ def __init__(
             raise MissingConditionalImportError(extra_deps_group='nlp',
                                                 conda_package='datasets',
                                                 conda_channel='conda-forge') from e
-
-        with dist.local_rank_zero_download_and_wait(destination_path):
-            if dist.get_local_rank() == 0:
+        if TENSORRT_LLM == False:
+            with dist.local_rank_zero_download_and_wait(destination_path):
+                if dist.get_local_rank() == 0:
+                    get_file(dataset_uri, destination_path, overwrite=True)
+        else:
+            if tensorrt_llm.mpi_rank() == 0:
                 get_file(dataset_uri, destination_path, overwrite=True)
+            else:
+                while not os.path.exists(destination_path):
+                    time.sleep(0.1)
         dataset = load_dataset('json', data_files=destination_path, split='train', streaming=False)
         self.samples = list(
             dataset.map(
@@ -954,9 +987,17 @@ def __init__(
             raise MissingConditionalImportError(extra_deps_group='nlp',
                                                 conda_package='datasets',
                                                 conda_channel='conda-forge') from e
-        with dist.local_rank_zero_download_and_wait(destination_path):
-            if dist.get_local_rank() == 0:
+        if TENSORRT_LLM == False:
+            with dist.local_rank_zero_download_and_wait(destination_path):
+                if dist.get_local_rank() == 0:
+                    get_file(dataset_uri, destination_path, overwrite=True)
+        else:
+            if tensorrt_llm.mpi_rank() == 0:
                 get_file(dataset_uri, destination_path, overwrite=True)
+            else:
+                while not os.path.exists(destination_path):
+                    time.sleep(0.1)
+
         dataset = load_dataset('json', data_files=destination_path, split='train', streaming=False)
         self.samples = list(
             dataset.map(
@@ -1281,9 +1322,16 @@ def partition_dataset_by_category(dataset_uri: str, destination_path: str) -> Di
         raise MissingConditionalImportError(extra_deps_group='nlp',
                                             conda_package='datasets',
                                             conda_channel='conda-forge') from e
-    with dist.local_rank_zero_download_and_wait(destination_path):
-        if dist.get_local_rank() == 0:
+    if TENSORRT_LLM == False:
+        with dist.local_rank_zero_download_and_wait(destination_path):
+            if dist.get_local_rank() == 0:
+                get_file(dataset_uri, destination_path, overwrite=True)
+    else:
+        if tensorrt_llm.mpi_rank() == 0:
             get_file(dataset_uri, destination_path, overwrite=True)
+        else:
+            while not os.path.exists(destination_path):
+                time.sleep(0.1)
     dataset = load_dataset('json', data_files=destination_path, split='train', streaming=False)
     if 'category' not in dataset.features.keys():
         raise Exception(
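Patch 1 works around a download race in multi-GPU TensorRT-LLM runs: composer's `dist` helpers are not usable there (patch 3 below removes `dist.initialize_dist` entirely), so every rank would otherwise call `get_file` on the same destination concurrently. The pattern the patch inlines at each dataset constructor is "MPI rank 0 downloads, all other ranks poll the filesystem". A minimal standalone sketch of that coordination; the helper name and the injected `get_file` callable are illustrative, not part of the patch:

```python
import os
import time


def download_on_rank_zero(dataset_uri, destination_path, mpi_rank, get_file):
    """Let MPI rank 0 download; every other rank waits for the file to appear."""
    if mpi_rank == 0:
        get_file(dataset_uri, destination_path, overwrite=True)
    else:
        # Assumes all ranks share one filesystem; a hardened version would add
        # a timeout so a failed download on rank 0 cannot hang the whole job.
        while not os.path.exists(destination_path):
            time.sleep(0.1)
```
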
From f40c66cf15946200ad020049048dc03437e0d18b Mon Sep 17 00:00:00 2001
From: nik-mosaic
Date: Thu, 11 Jan 2024 01:15:45 -0800
Subject: [PATCH 2/5] Change if and remove padding

---
 .../in_context_learning_evaluation.py         | 23 +++++++++++--------
 1 file changed, 13 insertions(+), 10 deletions(-)

diff --git a/composer/datasets/in_context_learning_evaluation.py b/composer/datasets/in_context_learning_evaluation.py
index 8090e22af3..75eba5ccac 100644
--- a/composer/datasets/in_context_learning_evaluation.py
+++ b/composer/datasets/in_context_learning_evaluation.py
@@ -24,9 +24,12 @@
 
 try:
     import tensorrt_llm
-    TENSORRT_LLM = True
+    if tensorrt_llm.mpi_world_size() > 1:
+        TRTLLM_MULTIGPU = True
+    else:
+        TRTLLM_MULTIGPU = False
 except ImportError:
-    TENSORRT_LLM = False
+    TRTLLM_MULTIGPU = False
 
 # Allow models to have slightly more tokens than were used in the most verbose CoT in the dataset
 _MAX_ANSWER_BUFFER_LENGTH = 10
@@ -167,7 +170,7 @@ def __init__(
             raise MissingConditionalImportError(extra_deps_group='nlp',
                                                 conda_package='datasets',
                                                 conda_channel='conda-forge') from e
-        if TENSORRT_LLM == False:
+        if TRTLLM_MULTIGPU == False:
             with dist.local_rank_zero_download_and_wait(destination_path):
                 if dist.get_local_rank() == 0:
                     get_file(dataset_uri, destination_path, overwrite=True)
@@ -302,8 +305,8 @@ def collate_fn(self, data):
             context_enc = preamble['input_ids'] + context['input_ids']
             inp, _ = _make_padded_input(context_enc, [],
                                         self.max_seq_len - self.max_answer_length,
-                                        self.pad_tok_id,
-                                        padding_side=self.padding_side)
+                                        self.pad_tok_id)
+            # padding_side=self.padding_side)
 
             inputs.append(inp)
             answers.append(aliases)
@@ -393,7 +396,7 @@ def __init__(
             raise MissingConditionalImportError(extra_deps_group='nlp',
                                                 conda_package='datasets',
                                                 conda_channel='conda-forge') from e
-        if TENSORRT_LLM == False:
+        if TRTLLM_MULTIGPU == False:
             with dist.local_rank_zero_download_and_wait(destination_path):
                 if dist.get_local_rank() == 0:
                     get_file(dataset_uri, destination_path, overwrite=True)
@@ -564,7 +567,7 @@ def __init__(
             raise MissingConditionalImportError(extra_deps_group='nlp',
                                                 conda_package='datasets',
                                                 conda_channel='conda-forge') from e
-        if TENSORRT_LLM == False:
+        if TRTLLM_MULTIGPU == False:
             with dist.local_rank_zero_download_and_wait(destination_path):
                 if dist.get_local_rank() == 0:
                     get_file(dataset_uri, destination_path, overwrite=True)
@@ -798,7 +801,7 @@ def __init__(
             raise MissingConditionalImportError(extra_deps_group='nlp',
                                                 conda_package='datasets',
                                                 conda_channel='conda-forge') from e
-        if TENSORRT_LLM == False:
+        if TRTLLM_MULTIGPU == False:
             with dist.local_rank_zero_download_and_wait(destination_path):
                 if dist.get_local_rank() == 0:
                     get_file(dataset_uri, destination_path, overwrite=True)
@@ -987,7 +990,7 @@ def __init__(
             raise MissingConditionalImportError(extra_deps_group='nlp',
                                                 conda_package='datasets',
                                                 conda_channel='conda-forge') from e
-        if TENSORRT_LLM == False:
+        if TRTLLM_MULTIGPU == False:
             with dist.local_rank_zero_download_and_wait(destination_path):
                 if dist.get_local_rank() == 0:
                     get_file(dataset_uri, destination_path, overwrite=True)
@@ -1322,7 +1325,7 @@ def partition_dataset_by_category(dataset_uri: str, destination_path: str) -> Di
         raise MissingConditionalImportError(extra_deps_group='nlp',
                                             conda_package='datasets',
                                             conda_channel='conda-forge') from e
-    if TENSORRT_LLM == False:
+    if TRTLLM_MULTIGPU == False:
         with dist.local_rank_zero_download_and_wait(destination_path):
             if dist.get_local_rank() == 0:
                 get_file(dataset_uri, destination_path, overwrite=True)
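Patch 2 narrows the guard from "tensorrt_llm is importable" to "tensorrt_llm is running with more than one rank", so single-GPU TRT-LLM evaluation keeps the ordinary `dist` download path. The probe runs once at import time; condensed, it is equivalent to this sketch (the `mpi_world_size()` call is the one the patch itself uses):

```python
try:
    import tensorrt_llm

    # mpi_world_size() reports how many MPI ranks TensorRT-LLM was launched with.
    TRTLLM_MULTIGPU = tensorrt_llm.mpi_world_size() > 1
except ImportError:
    TRTLLM_MULTIGPU = False
```
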
From 94e7d3f8cb85e5b50ce4e0ace163f9c59faeff74 Mon Sep 17 00:00:00 2001
From: nik-mosaic
Date: Thu, 11 Jan 2024 04:43:09 -0800
Subject: [PATCH 3/5] Do not initialize dist

---
 composer/trainer/trainer.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/composer/trainer/trainer.py b/composer/trainer/trainer.py
index 8497e1f41b..58a7b37b93 100644
--- a/composer/trainer/trainer.py
+++ b/composer/trainer/trainer.py
@@ -960,7 +960,7 @@ def __init__(
         assert not isinstance(device_train_microbatch_size, str)
 
         # Distributed
-        dist.initialize_dist(device, dist_timeout)
+        # dist.initialize_dist(device, dist_timeout)
 
         # Reproducibility
         rank_zero_seed, seed = _distribute_and_get_random_seed(seed, device)

From 4e0653815379607e5dcbadd2bfcbc49d44005243 Mon Sep 17 00:00:00 2001
From: nik-mosaic
Date: Mon, 11 Mar 2024 11:42:55 -0700
Subject: [PATCH 4/5] Update icl

---
 .../in_context_learning_evaluation.py         | 34 ++++++++-----------
 1 file changed, 15 insertions(+), 19 deletions(-)

diff --git a/composer/datasets/in_context_learning_evaluation.py b/composer/datasets/in_context_learning_evaluation.py
index 313ff464b3..f2caa473a8 100644
--- a/composer/datasets/in_context_learning_evaluation.py
+++ b/composer/datasets/in_context_learning_evaluation.py
@@ -8,8 +8,8 @@
 import json
 import os
 import random
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Union
 import time
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Union
 
 import torch
 from torch.utils.data import DataLoader, Dataset
@@ -217,20 +217,20 @@ def _get_fewshot_sample_idxs(dataset_size: int, num_fewshot: int, example_idx: i
         fewshot_idxs.add(replacement_sample)
     return fewshot_idxs
-
+
 
 def _rank_zero_download(dataset_uri, destination_path):
-    if TRTLLM_MULTIGPU == TRUE:
-        if tensorrt_llm.mpi_rank() == 0:
-            get_file(dataset_uri, destination_path, overwrite=True)
-    else:
-        while not os.path.exists(destination_path):
-            time.sleep(0.1)
-    else:
-    with dist.local_rank_zero_download_and_wait(destination_path):
-        if dist.get_local_rank() == 0:
+    if TRTLLM_MULTIGPU == True:
+        if tensorrt_llm.mpi_rank() == 0:
             get_file(dataset_uri, destination_path, overwrite=True)
+        else:
+            while not os.path.exists(destination_path):
+                time.sleep(0.1)
+    else:
+        with dist.local_rank_zero_download_and_wait(destination_path):
+            if dist.get_local_rank() == 0:
+                get_file(dataset_uri, destination_path, overwrite=True)
+
+
 class InContextLearningDataset(Dataset):
     """
     A base dataset that constructs batches for in-context learning task evaluations.
@@ -426,9 +426,7 @@ def read_dataset(
             assert isinstance(dataset, HFDataset)
             dataset = dataset.map(dataset_parsing_func, remove_columns=dataset.column_names)
         else:
-            with dist.local_rank_zero_download_and_wait(destination_path):
-                if dist.get_local_rank() == 0:
-                    get_file(dataset_uri, destination_path, overwrite=True)
+            _rank_zero_download(dataset_uri, destination_path)
             dataset = load_dataset('json', data_files=destination_path, split='train', streaming=False)
         assert isinstance(dataset, HFDataset)
         return dataset
@@ -1653,9 +1651,7 @@ def partition_dataset_by_category(dataset_uri: str, destination_path: str, hf_lo
         assert hasattr(dataset, 'column_names')
         dataset = dataset.map(dataset_parsing_func, remove_columns=dataset.column_names)
     else:
-        with dist.local_rank_zero_download_and_wait(destination_path):
-            if dist.get_local_rank() == 0:
-                get_file(dataset_uri, destination_path, overwrite=True)
+        _rank_zero_download(dataset_uri, destination_path)
        dataset = load_dataset('json', data_files=destination_path, split='train', streaming=False)
     assert isinstance(dataset, HFDataset) or isinstance(dataset, IterableDataset)
     assert hasattr(dataset, 'features')
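Patch 4 folds the previously duplicated download branch into a single `_rank_zero_download` helper (also fixing the `TRUE` typo and mis-indented `else` blocks in its earlier upstream form), so each call site shrinks to two lines. A representative call site after the refactor, mirroring `read_dataset`; the URI and path values here are hypothetical, and `load_dataset` is the Hugging Face `datasets` loader:

```python
from datasets import load_dataset

from composer.datasets.in_context_learning_evaluation import _rank_zero_download

dataset_uri = 's3://my-bucket/eval.jsonl'  # hypothetical URI
destination_path = '/tmp/eval.jsonl'       # hypothetical local path

# Rank 0 (MPI rank or local rank, depending on the runtime) downloads;
# the remaining ranks block until the file exists.
_rank_zero_download(dataset_uri, destination_path)
dataset = load_dataset('json', data_files=destination_path, split='train', streaming=False)
```
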
From 636457fe64e0eac73e1adf54619c26db53208dde Mon Sep 17 00:00:00 2001
From: nik-mosaic
Date: Mon, 15 Jul 2024 21:04:29 -0700
Subject: [PATCH 5/5] update with small fixes

---
 composer/datasets/in_context_learning_evaluation.py | 2 +-
 composer/trainer/_scaler.py                         | 8 +++++++-
 composer/trainer/trainer.py                         | 8 +++++++-
 3 files changed, 15 insertions(+), 3 deletions(-)

diff --git a/composer/datasets/in_context_learning_evaluation.py b/composer/datasets/in_context_learning_evaluation.py
index 80e2075b01..d7c0672f9c 100644
--- a/composer/datasets/in_context_learning_evaluation.py
+++ b/composer/datasets/in_context_learning_evaluation.py
@@ -724,7 +724,7 @@ def __init__(
         tensor_keys = ['input_ids', 'attention_mask']
         list_keys = ['labels']
         super().__init__(
-            padding_side='left',
+            padding_side='right',
             tokenize_labels=False,
             static_keys=static_keys,
             list_keys=list_keys,
diff --git a/composer/trainer/_scaler.py b/composer/trainer/_scaler.py
index e36057b1b5..eb45443d2b 100644
--- a/composer/trainer/_scaler.py
+++ b/composer/trainer/_scaler.py
@@ -5,9 +5,15 @@
 from typing import Optional, Union
 
 import torch
-from torch.cuda.amp.grad_scaler import GradScaler, OptState, _refresh_per_optimizer_state
+from torch.cuda.amp.grad_scaler import GradScaler, OptState
 from torch.optim import Optimizer
 
+from packaging import version
+if version.parse(torch.__version__) >= version.parse('2.2.9'):
+    from torch.amp.grad_scaler import _refresh_per_optimizer_state  # type: ignore
+else:
+    from torch.cuda.amp.grad_scaler import _refresh_per_optimizer_state  # type: ignore
+
 from composer.utils import dist
 
 __all__ = ['ClosureGradScaler']
diff --git a/composer/trainer/trainer.py b/composer/trainer/trainer.py
index 46843efa50..915e76d95a 100644
--- a/composer/trainer/trainer.py
+++ b/composer/trainer/trainer.py
@@ -42,7 +42,13 @@
 import torch.nn as nn
 import torch.utils.data
 from torch._dynamo import OptimizedModule
-from torch.cuda.amp.grad_scaler import GradScaler, _refresh_per_optimizer_state
+
+from packaging import version
+if version.parse(torch.__version__) >= version.parse('2.2.9'):
+    from torch.amp.grad_scaler import _refresh_per_optimizer_state  # type: ignore
+else:
+    from torch.cuda.amp.grad_scaler import _refresh_per_optimizer_state  # type: ignore
+
 from torch.distributed.fsdp import FullyShardedDataParallel
 from torch.distributed.fsdp._runtime_utils import _post_backward_final_callback
 from torch.distributed.fsdp.sharded_grad_scaler import ShardedGradScaler
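The version gate patch 5 adds in both `_scaler.py` and `trainer.py` exists because `_refresh_per_optimizer_state` is a private PyTorch symbol that moved from `torch.cuda.amp.grad_scaler` to `torch.amp.grad_scaler` in newer releases; the `'2.2.9'` threshold is taken from the patch itself and effectively selects torch 2.3.0 and later. As a standalone sketch of the pattern:

```python
import torch
from packaging import version

# Private symbols can move between modules across torch releases, so resolve
# the import path from the installed version instead of hard-coding one path.
if version.parse(torch.__version__) >= version.parse('2.2.9'):
    from torch.amp.grad_scaler import _refresh_per_optimizer_state  # type: ignore
else:
    from torch.cuda.amp.grad_scaler import _refresh_per_optimizer_state  # type: ignore
```

Because the symbol is private, this kind of gate is brittle by design; pinning the supported torch range in the package metadata is the usual complement to it.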