diff --git a/README.md b/README.md
index 85d085ec..7e685abf 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,4 @@
+[![Downloads](https://static.pepy.tech/badge/dicee)](https://pepy.tech/project/dicee)
 [![Downloads](https://img.shields.io/pypi/dm/dicee)](https://pypi.org/project/dicee/)
 [![Coverage](https://img.shields.io/badge/coverage-54%25-green)](https://dice-group.github.io/dice-embeddings/usage/main.html#coverage-report)
 [![Pypi](https://img.shields.io/badge/pypi-0.1.4-blue)](https://pypi.org/project/dicee/0.1.4/)
diff --git a/dicee/config.py b/dicee/config.py
index 22cd7844..5c5e69a8 100644
--- a/dicee/config.py
+++ b/dicee/config.py
@@ -85,7 +85,6 @@ def __init__(self, **kwargs):
 
         self.label_smoothing_rate: float = 0.0
 
-        self.num_core: int = 0
         """Number of CPUs to be used in the mini-batch loading process"""
 
 
@@ -139,6 +138,9 @@ def __init__(self, **kwargs):
         self.continual_learning=None
         "Path of a pretrained model size of LLM"
 
+        self.auto_batch_finding=False
+        "A flag for using auto batch finding"
+
     def __iter__(self):
         # Iterate
         for k, v in self.__dict__.items():
diff --git a/dicee/models/ensemble.py b/dicee/models/ensemble.py
index e6c1e5de..2cf4774e 100644
--- a/dicee/models/ensemble.py
+++ b/dicee/models/ensemble.py
@@ -1,11 +1,6 @@
 import torch
 import copy
 
-import torch._dynamo
-
-torch._dynamo.config.suppress_errors = True
-
-
 class EnsembleKGE:
     def __init__(self, seed_model):
         self.models = []
@@ -13,13 +8,13 @@ def __init__(self, seed_model):
         self.loss_history = []
         for i in range(torch.cuda.device_count()):
             i_model=copy.deepcopy(seed_model)
-            i_model.to(torch.device(f"cuda:{i}"))
             # TODO: Why we cant send the compile model to cpu ?
-            # i_model = torch.compile(i_model)
+            #i_model = torch.compile(i_model)
+            i_model.to(torch.device(f"cuda:{i}"))
             self.optimizers.append(i_model.configure_optimizers())
             self.models.append(i_model)
         # Maybe use the original model's name ?
-        self.name="TP_"+self.models[0].name
+        self.name=self.models[0].name
         self.train_mode=True
 
     def named_children(self):
@@ -87,7 +82,25 @@ def __call__(self,x_batch):
     def step(self):
         for opt in self.optimizers:
             opt.step()
-
+
+    def get_embeddings(self):
+        entity_embeddings=[]
+        relation_embeddings=[]
+        # () Iterate
+        for trained_model in self.models:
+            entity_emb, relation_ebm = trained_model.get_embeddings()
+            entity_embeddings.append(entity_emb)
+            if relation_ebm is not None:
+                relation_embeddings.append(relation_ebm)
+        # () Concat the embedding vectors horizontally.
+        entity_embeddings=torch.cat(entity_embeddings,dim=1)
+        if relation_embeddings:
+            relation_embeddings=torch.cat(relation_embeddings,dim=1)
+        else:
+            relation_embeddings=None
+
+        return entity_embeddings, relation_embeddings
+
     """
     def __getattr__(self, name):
         # Create a function that will call the same attribute/method on each model
diff --git a/dicee/sanity_checkers.py b/dicee/sanity_checkers.py
index 70dc94b2..1e1463aa 100644
--- a/dicee/sanity_checkers.py
+++ b/dicee/sanity_checkers.py
@@ -32,11 +32,11 @@ def validate_knowledge_graph(args):
 
     elif args.path_single_kg is not None:
         if args.sparql_endpoint is not None or args.path_single_kg is not None:
-            print(f'The dataset_dir and sparql_endpoint arguments '
-                  f'must be None if path_single_kg is given.'
-                  f'***{args.dataset_dir}***\n'
-                  f'***{args.sparql_endpoint}***\n'
-                  f'These two parameters are set to None.')
+            #print(f'The dataset_dir and sparql_endpoint arguments '
+            #      f'must be None if path_single_kg is given.'
+ # f'***{args.dataset_dir}***\n' + # f'***{args.sparql_endpoint}***\n' + # f'These two parameters are set to None.') args.dataset_dir = None args.sparql_endpoint = None @@ -61,11 +61,11 @@ def validate_knowledge_graph(args): f"Use --path_single_kg **folder/dataset.format**, if you have a single file.") if args.sparql_endpoint is not None or args.path_single_kg is not None: - print(f'The sparql_endpoint and path_single_kg arguments ' - f'must be None if dataset_dir is given.' - f'***{args.sparql_endpoint}***\n' - f'***{args.path_single_kg}***\n' - f'These two parameters are set to None.') + #print(f'The sparql_endpoint and path_single_kg arguments ' + # f'must be None if dataset_dir is given.' + # f'***{args.sparql_endpoint}***\n' + # f'***{args.path_single_kg}***\n' + # f'These two parameters are set to None.') args.sparql_endpoint = None args.path_single_kg = None diff --git a/dicee/scripts/run.py b/dicee/scripts/run.py index cc95af50..2b6e6c7a 100755 --- a/dicee/scripts/run.py +++ b/dicee/scripts/run.py @@ -123,6 +123,9 @@ def get_default_arguments(description=None): parser.add_argument("--swa", action="store_true", help="Stochastic weight averaging") + parser.add_argument("--auto_batch_finding", + action="store_true", + help="Find a batch size fitting in GPUs. Only available for TP trainer") parser.add_argument('--degree', type=int, default=0, help='degree for polynomial embeddings') parser.add_argument('--disable_checkpointing', action='store_true', help='Disable creation of checkpoints during training') diff --git a/dicee/static_funcs.py b/dicee/static_funcs.py index e6b20168..7627fa13 100644 --- a/dicee/static_funcs.py +++ b/dicee/static_funcs.py @@ -684,16 +684,15 @@ def download_pretrained_model(url: str) -> str: download_files_from_url(url_to_download_from, destination_folder=dir_name) return dir_name -def write_csv_from_model_parallel(path: str) -> None: +def write_csv_from_model_parallel(path: str) : """Create""" assert os.path.exists(path), "Path does not exist" - # Detect files that start with model_ and end with .pt model_files = [f for f in os.listdir(path) if f.startswith("model_") and f.endswith(".pt")] model_files.sort() # Sort to maintain order if necessary (e.g., model_0.pt, model_1.pt) - entity_csv_path = os.path.join(path, "entity_embeddings.csv") - relation_csv_path = os.path.join(path, "relation_embeddings.csv") + entity_embeddings=[] + relation_embeddings=[] # Process each model file for model_file in model_files: @@ -702,65 +701,50 @@ def write_csv_from_model_parallel(path: str) -> None: model = torch.load(model_path) # Assuming model has a get_embeddings method entity_emb, relation_emb = model["_orig_mod.entity_embeddings.weight"], model["_orig_mod.relation_embeddings.weight"] - # Convert to numpy - entity_emb = entity_emb.numpy() - relation_emb = relation_emb.numpy() - - # Write or append to CSV - if not os.path.exists(entity_csv_path) or not os.path.exists(relation_csv_path): - # If CSV files do not exist, create them - pd.DataFrame(entity_emb).to_csv(entity_csv_path, index=True, header=False) - pd.DataFrame(relation_emb).to_csv(relation_csv_path, index=True, header=False) - else: - # If CSV files exist, concatenate to the existing rows - existing_entity_df = pd.read_csv(entity_csv_path, header=None) - existing_relation_df = pd.read_csv(relation_csv_path, header=None) + entity_embeddings.append(entity_emb) + relation_embeddings.append(relation_emb) - # Concatenate along the columns (axis=1) - new_entity_df = pd.concat([existing_entity_df, 
pd.DataFrame(entity_emb)], axis=1) - new_relation_df = pd.concat([existing_relation_df, pd.DataFrame(relation_emb)], axis=1) + return torch.cat(entity_embeddings, dim=1), torch.cat(relation_embeddings, dim=1) - # Write the updated data back to the CSV files - new_entity_df.to_csv(entity_csv_path, index=False, header=False) - new_relation_df.to_csv(relation_csv_path, index=False, header=False) def from_pretrained_model_write_embeddings_into_csv(path: str) -> None: """ """ assert os.path.exists(path), "Path does not exist" config = load_json(path + '/configuration.json') - if config["trainer"]=="MP": - write_csv_from_model_parallel(path) + entity_csv_path = os.path.join(path, f"{config['model']}_entity_embeddings.csv") + relation_csv_path = os.path.join(path, f"{config['model']}_relation_embeddings.csv") + + if config["trainer"]=="TP": + entity_emb, relation_emb = write_csv_from_model_parallel(path) else: - entity_csv_path = os.path.join(path, f"{config['model']}_entity_embeddings.csv") - relation_csv_path = os.path.join(path, f"{config['model']}_relation_embeddings.csv") # Load model model = torch.load(os.path.join(path, "model.pt")) # Assuming model has a get_embeddings method entity_emb, relation_emb = model["entity_embeddings.weight"], model["relation_embeddings.weight"] - str_entity = pd.read_csv(f"{path}/entity_to_idx.csv", index_col=0)["entity"] - assert str_entity.index.is_monotonic_increasing - str_entity=str_entity.to_list() - # Write entity embeddings with headers and indices - with open(entity_csv_path, "w", newline="") as f: - writer = csv.writer(f) - # Add header (e.g., "", "0", "1", ..., "N") - headers = [""] + [f"{i}" for i in range(entity_emb.size(1))] - writer.writerow(headers) - # Add rows with index - for i_row, (name,row) in enumerate(zip(str_entity,entity_emb)): - writer.writerow([name] + row.tolist()) - str_relations = pd.read_csv(f"{path}/relation_to_idx.csv", index_col=0)["relation"] - assert str_relations.index.is_monotonic_increasing - - # Write relation embeddings with headers and indices - with open(relation_csv_path, "w", newline="") as f: - writer = csv.writer(f) - # Add header (e.g., "", "0", "1", ..., "N") - headers = [""] + [f"{i}" for i in range(relation_emb.size(1))] - writer.writerow(headers) - # Add rows with index - for i_row, (name, row) in enumerate(zip(str_relations,relation_emb)): - writer.writerow([name]+ row.tolist()) + str_entity = pd.read_csv(f"{path}/entity_to_idx.csv", index_col=0)["entity"] + assert str_entity.index.is_monotonic_increasing + str_entity=str_entity.to_list() + # Write entity embeddings with headers and indices + with open(entity_csv_path, "w", newline="") as f: + writer = csv.writer(f) + # Add header (e.g., "", "0", "1", ..., "N") + headers = [""] + [f"{i}" for i in range(entity_emb.size(1))] + writer.writerow(headers) + # Add rows with index + for i_row, (name,row) in enumerate(zip(str_entity,entity_emb)): + writer.writerow([name] + row.tolist()) + str_relations = pd.read_csv(f"{path}/relation_to_idx.csv", index_col=0)["relation"] + assert str_relations.index.is_monotonic_increasing + + # Write relation embeddings with headers and indices + with open(relation_csv_path, "w", newline="") as f: + writer = csv.writer(f) + # Add header (e.g., "", "0", "1", ..., "N") + headers = [""] + [f"{i}" for i in range(relation_emb.size(1))] + writer.writerow(headers) + # Add rows with index + for i_row, (name, row) in enumerate(zip(str_relations,relation_emb)): + writer.writerow([name]+ row.tolist()) """ diff --git 
a/dicee/static_funcs_training.py b/dicee/static_funcs_training.py index 451e24b4..6e08797c 100644 --- a/dicee/static_funcs_training.py +++ b/dicee/static_funcs_training.py @@ -11,7 +11,7 @@ def make_iterable_verbose(iterable_object, verbose, desc="Default", position=Non def evaluate_lp(model, triple_idx, num_entities, er_vocab: Dict[Tuple, List], re_vocab: Dict[Tuple, List], - info='Eval Starts'): + info='Eval Starts', batch_size=128, chunk_size=1000): """ Evaluate model in a standard link prediction task @@ -21,78 +21,182 @@ def evaluate_lp(model, triple_idx, num_entities, er_vocab: Dict[Tuple, List], re :param model: :param triple_idx: :param info: + :param batch_size: + :param chunk_size: :return: """ model.eval() print(info) print(f'Num of triples {len(triple_idx)}') - print('** Evaluation without batching') + print('** Evaluation with batching') hits = dict() reciprocal_ranks = [] # Iterate over test triples all_entities = torch.arange(0, num_entities).long() all_entities = all_entities.reshape(len(all_entities), ) - # Iterating one by one is not good when you are using batch norm - for i in tqdm(range(0, len(triple_idx))): - # (1) Get a triple (head entity, relation, tail entity - data_point = triple_idx[i] - h, r, t = data_point[0], data_point[1], data_point[2] - - # (2) Predict missing heads and tails - x = torch.stack((torch.tensor(h).repeat(num_entities, ), - torch.tensor(r).repeat(num_entities, ), - all_entities), dim=1) - - predictions_tails = model(x) - x = torch.stack((all_entities, - torch.tensor(r).repeat(num_entities, ), - torch.tensor(t).repeat(num_entities) - ), dim=1) - - predictions_heads = model(x) - del x - - # 3. Computed filtered ranks for missing tail entities. - # 3.1. Compute filtered tail entity rankings - filt_tails = er_vocab[(h, r)] - # 3.2 Get the predicted target's score - target_value = predictions_tails[t].item() - # 3.3 Filter scores of all triples containing filtered tail entities - predictions_tails[filt_tails] = -np.Inf - # 3.4 Reset the target's score - predictions_tails[t] = target_value - # 3.5. Sort the score - _, sort_idxs = torch.sort(predictions_tails, descending=True) - sort_idxs = sort_idxs.detach() - filt_tail_entity_rank = np.where(sort_idxs == t)[0][0] - - # 4. Computed filtered ranks for missing head entities. - # 4.1. Retrieve head entities to be filtered - filt_heads = re_vocab[(r, t)] - # 4.2 Get the predicted target's score - target_value = predictions_heads[h].item() - # 4.3 Filter scores of all triples containing filtered head entities. - predictions_heads[filt_heads] = -np.Inf - predictions_heads[h] = target_value - _, sort_idxs = torch.sort(predictions_heads, descending=True) - sort_idxs = sort_idxs.detach() - filt_head_entity_rank = np.where(sort_idxs == h)[0][0] - - # 4. Add 1 to ranks as numpy array first item has the index of 0. - filt_head_entity_rank += 1 - filt_tail_entity_rank += 1 - - rr = 1.0 / filt_head_entity_rank + (1.0 / filt_tail_entity_rank) - # 5. Store reciprocal ranks. - reciprocal_ranks.append(rr) - # print(f'{i}.th triple: mean reciprical rank:{rr}') - - # 4. 
Compute Hit@N - for hits_level in range(1, 11): - res = 1 if filt_head_entity_rank <= hits_level else 0 - res += 1 if filt_tail_entity_rank <= hits_level else 0 - if res > 0: - hits.setdefault(hits_level, []).append(res) + + # Evaluation without Batching + # for i in tqdm(range(0, len(triple_idx))): + # # (1) Get a triple (head entity, relation, tail entity + # data_point = triple_idx[i] + # h, r, t = data_point[0], data_point[1], data_point[2] + + # # (2) Predict missing heads and tails + # x = torch.stack((torch.tensor(h).repeat(num_entities, ), + # torch.tensor(r).repeat(num_entities, ), + # all_entities), dim=1) + + # predictions_tails = model(x) + # x = torch.stack((all_entities, + # torch.tensor(r).repeat(num_entities, ), + # torch.tensor(t).repeat(num_entities) + # ), dim=1) + + # predictions_heads = model(x) + # del x + + # # 3. Computed filtered ranks for missing tail entities. + # # 3.1. Compute filtered tail entity rankings + # filt_tails = er_vocab[(h, r)] + # # 3.2 Get the predicted target's score + # target_value = predictions_tails[t].item() + # # 3.3 Filter scores of all triples containing filtered tail entities + # predictions_tails[filt_tails] = -np.Inf + # # 3.4 Reset the target's score + # predictions_tails[t] = target_value + # # 3.5. Sort the score + # _, sort_idxs = torch.sort(predictions_tails, descending=True) + # sort_idxs = sort_idxs.detach() + # filt_tail_entity_rank = np.where(sort_idxs == t)[0][0] + + # # 4. Computed filtered ranks for missing head entities. + # # 4.1. Retrieve head entities to be filtered + # filt_heads = re_vocab[(r, t)] + # # 4.2 Get the predicted target's score + # target_value = predictions_heads[h].item() + # # 4.3 Filter scores of all triples containing filtered head entities. + # predictions_heads[filt_heads] = -np.Inf + # predictions_heads[h] = target_value + # _, sort_idxs = torch.sort(predictions_heads, descending=True) + # sort_idxs = sort_idxs.detach() + # filt_head_entity_rank = np.where(sort_idxs == h)[0][0] + + # # 4. Add 1 to ranks as numpy array first item has the index of 0. + # filt_head_entity_rank += 1 + # filt_tail_entity_rank += 1 + + # rr = 1.0 / filt_head_entity_rank + (1.0 / filt_tail_entity_rank) + # # 5. Store reciprocal ranks. + # reciprocal_ranks.append(rr) + # # print(f'{i}.th triple: mean reciprical rank:{rr}') + + # # 4. 
Compute Hit@N + # for hits_level in range(1, 11): + # res = 1 if filt_head_entity_rank <= hits_level else 0 + # res += 1 if filt_tail_entity_rank <= hits_level else 0 + # if res > 0: + # hits.setdefault(hits_level, []).append(res) + + # Evaluation with Batching + for batch_start in tqdm(range(0, len(triple_idx), batch_size), desc="Evaluating Batches"): + batch_end = min(batch_start + batch_size, len(triple_idx)) + batch_triples = triple_idx[batch_start:batch_end] + batch_size_current = len(batch_triples) + + # (1) Extract heads, relations, and tails for the batch + h_batch = torch.tensor([data_point[0] for data_point in batch_triples]) + r_batch = torch.tensor([data_point[1] for data_point in batch_triples]) + t_batch = torch.tensor([data_point[2] for data_point in batch_triples]) + + # Initialize score tensors + predictions_tails = torch.zeros(batch_size_current, num_entities) + predictions_heads = torch.zeros(batch_size_current, num_entities) + + # Process entities in chunks to manage memory usage + for chunk_start in range(0, num_entities, chunk_size): + chunk_end = min(chunk_start + chunk_size, num_entities) + entities_chunk = all_entities[chunk_start:chunk_end] + chunk_size_current = entities_chunk.size(0) + + # (2) Predict missing heads and tails + # Prepare input tensors for tail prediction + x_tails = torch.stack(( + h_batch.repeat_interleave(chunk_size_current), + r_batch.repeat_interleave(chunk_size_current), + entities_chunk.repeat(batch_size_current) + ), dim=1) + + # Predict scores for missing tails + preds_tails = model.forward_triples(x_tails) + preds_tails = preds_tails.view(batch_size_current, chunk_size_current) + predictions_tails[:, chunk_start:chunk_end] = preds_tails + del x_tails + + # Prepare input tensors for head prediction + x_heads = torch.stack(( + entities_chunk.repeat(batch_size_current), + r_batch.repeat_interleave(chunk_size_current), + t_batch.repeat_interleave(chunk_size_current) + ), dim=1) + + # Predict scores for missing heads + preds_heads = model.forward_triples(x_heads) + preds_heads = preds_heads.view(batch_size_current, chunk_size_current) + predictions_heads[:, chunk_start:chunk_end] = preds_heads + del x_heads + + # Iterating one by one is not good when you are using batch norm + for i in range(0, batch_size_current): + # (1) Get a triple (head entity, relation, tail entity) + h = h_batch[i].item() + r = r_batch[i].item() + t = t_batch[i].item() + + # 3. Computed filtered ranks for missing tail entities. + # 3.1. Compute filtered tail entity rankings + filt_tails = er_vocab[(h, r)] + filt_tails_set = set(filt_tails) - {t} + filt_tails_indices = list(filt_tails_set) + # 3.2 Get the predicted target's score + target_value = predictions_tails[i, t].item() + # 3.3 Filter scores of all triples containing filtered tail entities + predictions_tails[i, filt_tails_indices] = -np.Inf + # 3.4 Reset the target's score + predictions_tails[i, t] = target_value + # 3.5. Sort the score + _, sort_idxs = torch.sort(predictions_tails[i], descending=True) + sort_idxs = sort_idxs.detach() + filt_tail_entity_rank = np.where(sort_idxs == t)[0][0] + + # 4. Computed filtered ranks for missing head entities. + # 4.1. Retrieve head entities to be filtered + filt_heads = re_vocab[(r, t)] + filt_heads_set = set(filt_heads) - {h} + filt_heads_indices = list(filt_heads_set) + # 4.2 Get the predicted target's score + target_value = predictions_heads[i, h].item() + # 4.3 Filter scores of all triples containing filtered head entities. 
+            predictions_heads[i, filt_heads_indices] = -np.Inf
+            predictions_heads[i, h] = target_value
+            _, sort_idxs = torch.sort(predictions_heads[i], descending=True)
+            sort_idxs = sort_idxs.detach()
+            filt_head_entity_rank = np.where(sort_idxs == h)[0][0]
+
+            # 4. Add 1 to ranks as numpy array first item has the index of 0.
+            filt_head_entity_rank += 1
+            filt_tail_entity_rank += 1
+
+            rr = 1.0 / filt_head_entity_rank + (1.0 / filt_tail_entity_rank)
+            # 5. Store reciprocal ranks.
+            reciprocal_ranks.append(rr)
+            # print(f'{i}.th triple: mean reciprocal rank:{rr}')
+
+            # 4. Compute Hit@N
+            for hits_level in range(1, 11):
+                res = 1 if filt_head_entity_rank <= hits_level else 0
+                res += 1 if filt_tail_entity_rank <= hits_level else 0
+                if res > 0:
+                    hits.setdefault(hits_level, []).append(res)
 
     mean_reciprocal_rank = sum(reciprocal_ranks) / (float(len(triple_idx) * 2))
diff --git a/dicee/trainer/model_parallelism.py b/dicee/trainer/model_parallelism.py
index 38ebd39d..dd4a2929 100644
--- a/dicee/trainer/model_parallelism.py
+++ b/dicee/trainer/model_parallelism.py
@@ -3,6 +3,7 @@
 from ..static_funcs_training import make_iterable_verbose
 from ..models.ensemble import EnsembleKGE
 from typing import Tuple
+import time
 
 def extract_input_outputs(z: list, device=None):
     # pin arrays x,y, which allows us to move them to GPU asynchronously (non_blocking=True)
@@ -23,66 +24,105 @@ def extract_input_outputs(z: list, device=None):
     else:
         raise ValueError('Unexpected batch shape..')
 
-def find_good_batch_size(train_loader,ensemble_model,max_available_gpu_memory:float=0.05):
+
+def find_good_batch_size(train_loader,tp_ensemble_model):
     # () Initial batch size
-    batch_size=train_loader.batch_size
-    print("Automatic batch size finding")
-    for n in range(200):
-        # () Initialize a dataloader with a current batch_size
-        train_dataloaders = torch.utils.data.DataLoader(train_loader.dataset,
-                                                        batch_size=batch_size,
-                                                        shuffle=True,
-                                                        sampler=None,
-                                                        batch_sampler=None,
-                                                        num_workers=0,
-                                                        collate_fn=train_loader.dataset.collate_fn,
-                                                        pin_memory=False, drop_last=False,
-                                                        timeout=0,
-                                                        worker_init_fn=None,
-                                                        persistent_workers=False)
-        loss=None
-        for i, z in enumerate(train_dataloaders):
-            loss = forward_backward_update_loss(z,ensemble_model)
+    initial_batch_size=train_loader.batch_size
+    training_dataset_size=len(train_loader.dataset)
+    if initial_batch_size >= training_dataset_size:
+        return training_dataset_size, None
+    print("Number of training data points:",training_dataset_size)
+
+    def increase_batch_size_until_cuda_out_of_memory(ensemble_model, train_loader, batch_size,delta: int = None):
+        assert delta is not None, "delta must be positive integer"
+        batch_sizes_and_mem_usages = []
+        num_datapoints = len(train_loader.dataset)
+        try:
+            while True:
+                start_time=time.time()
+                # () Initialize a dataloader with a current batch_size
+                train_dataloaders = torch.utils.data.DataLoader(train_loader.dataset,
+                                                                batch_size=batch_size,
+                                                                shuffle=True,
+                                                                sampler=None,
+                                                                batch_sampler=None,
+                                                                num_workers=train_loader.num_workers,
+                                                                collate_fn=train_loader.dataset.collate_fn,
+                                                                pin_memory=False,
+                                                                drop_last=False,
+                                                                timeout=0,
+                                                                worker_init_fn=None,
+                                                                persistent_workers=False)
+
+                batch_loss = None
+                for i, batch_of_training_data in enumerate(train_dataloaders):
+                    batch_loss = forward_backward_update_loss(batch_of_training_data, ensemble_model)
+                    break
+
+                global_free_memory, total_memory = torch.cuda.mem_get_info(device="cuda:0")
+                percentage_used_gpu_memory = (total_memory - global_free_memory) / total_memory
+
+                rt=time.time()-start_time
+                print(f"Random Batch Loss: {batch_loss:0.4}\tGPU Usage: {percentage_used_gpu_memory:0.3}\tRuntime: {rt:.3f}\tBatch Size: {batch_size}")
+
+                global_free_memory, total_memory = torch.cuda.mem_get_info(device="cuda:0")
+                percentage_used_gpu_memory = (total_memory - global_free_memory) / total_memory
+
+                # Store the batch size and the runtime
+                batch_sizes_and_mem_usages.append((batch_size, rt))
+
+                if batch_size < num_datapoints:
+                    # Increase the batch size.
+                    batch_size += int(batch_size / delta)
+                else:
+                    return batch_sizes_and_mem_usages,True
+
+        except torch.OutOfMemoryError:
+            print("torch.OutOfMemoryError caught!")
+            return batch_sizes_and_mem_usages, False
+
+    history_batch_sizes_and_mem_usages=[]
+    batch_size=initial_batch_size
+
+    for delta in range(1,5,1):
+        result,flag= increase_batch_size_until_cuda_out_of_memory(tp_ensemble_model, train_loader, batch_size,delta=delta)
+
+        history_batch_sizes_and_mem_usages.extend(result)
+
+        if flag:
+            batch_size, batch_rt = history_batch_sizes_and_mem_usages[-1]
+        else:
+            # CUDA ERROR Observed
+            batch_size, batch_rt=history_batch_sizes_and_mem_usages[-2]
+
+        if batch_size>=training_dataset_size:
+            batch_size=training_dataset_size
             break
-        global_free_memory, total_memory = torch.cuda.mem_get_info()
-        available_gpu_memory = global_free_memory / total_memory
-        print(f"Random Batch Loss: {loss}\tFree/Total GPU Memory: {available_gpu_memory}\tBatch Size:{batch_size}")
-        # () Stepping criterion
-        if available_gpu_memory > max_available_gpu_memory and batch_size < len(train_loader.dataset) :
-            # Increment the current batch size
-            batch_size+=batch_size
         else:
-            if batch_size >= len(train_loader.dataset):
-                print("Batch size equals to the training dataset size")
-            else:
-                print(f"Max GPU memory used\tFree/Total GPU Memory:{available_gpu_memory}")
+            continue
 
-    return batch_size
+    return batch_size, batch_rt
 
-    raise RuntimeError("The computation should be here!")
-def forward_backward_update_loss(z:Tuple, ensemble_model):
-    # () Get the i-th batch of data points.
+
+def forward_backward_update_loss(z:Tuple, ensemble_model)->float:
+    # () Get a random batch of data points (z).
     x_batch, y_batch = extract_input_outputs(z)
-    # () Move the batch of labels into the master GPU : GPU-0
+    # () Move the batch of labels into the master GPU : GPU-0.
     y_batch = y_batch.to("cuda:0")
-    # () Forward Pass on the batch. Yhat located on the master GPU.
+    # () Forward pass on the batch of input data points (yhat on the master GPU).
     yhat = ensemble_model(x_batch)
-    # () Compute the loss
+    # () Compute the loss.
     loss = torch.nn.functional.binary_cross_entropy_with_logits(yhat, y_batch)
     # () Compute the gradient of the loss w.r.t. parameters.
     loss.backward()
     # () Parameter update.
     ensemble_model.step()
-    # () Report the batch and epoch losses.
-    batch_loss = loss.item()
-    # () Accumulate batch loss
-    return batch_loss
+    return loss.item()
 
 class TensorParallel(AbstractTrainer):
     def __init__(self, args, callbacks):
         super().__init__(args, callbacks)
         self.models=[]
+
     def get_ensemble(self):
         return self.models
@@ -96,31 +136,42 @@ def fit(self, *args, **kwargs):
         self.on_fit_start(self, ensemble_model)
         # () Sanity checking
         assert torch.cuda.device_count()== len(ensemble_model)
-        # ()
+        # () Get DataLoader
         train_dataloader = kwargs['train_dataloaders']
-        # ()
-        train_dataloader = torch.utils.data.DataLoader(train_dataloader.dataset,
-                                                       batch_size=find_good_batch_size(train_dataloader, ensemble_model),
-                                                       shuffle=True,
-                                                       sampler=None,
-                                                       batch_sampler=None,
-                                                       num_workers=self.attributes.num_core,
-                                                       collate_fn=train_dataloader.dataset.collate_fn,
-                                                       pin_memory=False,
-                                                       drop_last=False,
-                                                       timeout=0,
-                                                       worker_init_fn=None,
-                                                       persistent_workers=False)
+        # () Find a batch size so that available GPU memory is *almost* fully used.
+        if self.attributes.auto_batch_finding:
+            batch_size, batch_rt=find_good_batch_size(train_dataloader, ensemble_model)
+
+            train_dataloader = torch.utils.data.DataLoader(train_dataloader.dataset,
+                                                           batch_size=batch_size,
+                                                           shuffle=True,
+                                                           sampler=None,
+                                                           batch_sampler=None,
+                                                           num_workers=self.attributes.num_core,
+                                                           collate_fn=train_dataloader.dataset.collate_fn,
+                                                           pin_memory=False,
+                                                           drop_last=False,
+                                                           timeout=0,
+                                                           worker_init_fn=None,
+                                                           persistent_workers=False)
+            if batch_rt is not None:
+                expected_training_time=batch_rt * len(train_dataloader) * self.attributes.num_epochs
+                print(f"Exp.Training Runtime: {expected_training_time/60 :.3f} in mins\t|\tBatch Size:{batch_size}\t|\tBatch RT:{batch_rt:.3f}\t|\t # of batches:{len(train_dataloader)}\t|\t# of epochs:{self.attributes.num_epochs}")
+
+        # () Number of batches to reach a single epoch.
         num_of_batches = len(train_dataloader)
         # () Start training.
         for epoch in (tqdm_bar := make_iterable_verbose(range(self.attributes.num_epochs),
                                                         verbose=True, position=0, leave=True)):
+            # () Accumulate the batch losses.
             epoch_loss = 0
             # () Iterate over batches.
             for i, z in enumerate(train_dataloader):
+                # () Forward, Loss, Backward, and Update on a given batch of data points.
                 batch_loss = forward_backward_update_loss(z,ensemble_model)
+                # () Accumulate the batch losses to compute the epoch loss.
                 epoch_loss += batch_loss
-
+                # If verbose=True, show info.
                 if hasattr(tqdm_bar, 'set_description_str'):
                     tqdm_bar.set_description_str(f"Epoch:{epoch + 1}")
                     if i > 0:
@@ -128,11 +179,13 @@ def fit(self, *args, **kwargs):
                                                  f"batch={i} | {num_of_batches}, loss_step={batch_loss:.5f}, loss_epoch={epoch_loss / i:.5f}")
                 else:
                     tqdm_bar.set_postfix_str(f"loss_step={batch_loss:.5f}, loss_epoch={batch_loss:.5f}")
+            # Store the epoch loss
             ensemble_model.loss_history.append(epoch_loss)
-
+        # Run on_fit_end callbacks after the training is done.
         self.on_fit_end(self, ensemble_model)
         # TODO: Later, maybe we should write a callback to save the models in disk
         return ensemble_model
+
     """
 
     def batchwisefit(self, *args, **kwargs):
@@ -233,4 +286,4 @@ def torch_buggy_fit(self, *args, **kwargs):
             torch.distributed.destroy_process_group()
         # () .
         self.on_fit_end(self, model)
-    """
\ No newline at end of file
+    """
diff --git a/examples/multi_hop_query_answering/benchmarking.py b/examples/multi_hop_query_answering/benchmarking.py
index 25286986..9987c0cb 100644
--- a/examples/multi_hop_query_answering/benchmarking.py
+++ b/examples/multi_hop_query_answering/benchmarking.py
@@ -31,7 +31,7 @@
     args = Namespace()
     args.model = kge_name
    args.scoring_technique = "KvsAll"
-    args.path_dataset_folder = "KGs/UMLS"
+    args.dataset_dir = "KGs/UMLS"
     args.num_epochs = 20
     args.batch_size = 1024
     args.lr = 0.1
diff --git a/tests/test_saving_embeddings.py b/tests/test_saving_embeddings.py
index 0534b3c4..f83a03e0 100644
--- a/tests/test_saving_embeddings.py
+++ b/tests/test_saving_embeddings.py
@@ -2,6 +2,7 @@
 from dicee.static_funcs import from_pretrained_model_write_embeddings_into_csv
 from dicee.executer import Execute
 from dicee.config import Namespace
+import torch
 
 class TestSavingEmbeddings:
     def test_saving_embeddings(self):
@@ -23,9 +24,10 @@ def test_saving_embeddings(self):
 
     def test_model_parallel_saving_embeddings(self):
         # (1) Train a KGE model
-        import torch
-
         if torch.cuda.is_available():
+            from dicee.config import Namespace
+            from dicee.executer import Execute
+            import os
             args = Namespace()
             args.model = 'Keci'
             args.p = 0
@@ -34,9 +36,12 @@ def test_model_parallel_saving_embeddings(self):
             args.scoring_technique = "KvsAll"
             args.path_single_kg = "KGs/Family/family-benchmark_rich_background.owl"
             args.backend = "rdflib"
+            args.trainer = "TP"
             args.num_epochs = 1
             args.batch_size = 1024
             args.lr = 0.1
-            args.embedding_dim = 512
+            args.embedding_dim = 32
+            args.save_embeddings_as_csv = True
             result = Execute(args).start()
-            from_pretrained_model_write_embeddings_into_csv(result["path_experiment_folder"])
\ No newline at end of file
+            assert os.path.exists(result["path_experiment_folder"] + "/Keci_entity_embeddings.csv")
+            assert os.path.exists(result["path_experiment_folder"] + "/Keci_relation_embeddings.csv")
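
Usage sketch (not part of the patch): the pieces introduced above — the "TP" trainer, the auto_batch_finding flag (exposed on the CLI as --auto_batch_finding), and CSV export of embeddings — can be combined from Python roughly as follows. The dataset path and hyperparameters are illustrative and mirror the updated test; a CUDA-capable machine and a local KGs folder are assumed.

    # Minimal sketch: train Keci with the tensor-parallel trainer, let the trainer
    # search for a batch size that fits the GPUs, and write embeddings to CSV.
    import os
    import torch
    from dicee.executer import Execute
    from dicee.config import Namespace

    if torch.cuda.is_available():
        args = Namespace()
        args.model = 'Keci'
        args.trainer = 'TP'                 # tensor-parallel trainer
        args.auto_batch_finding = True      # new flag added in this patch (TP trainer only)
        args.scoring_technique = 'KvsAll'
        args.path_single_kg = 'KGs/Family/family-benchmark_rich_background.owl'
        args.backend = 'rdflib'
        args.num_epochs = 1
        args.batch_size = 1024              # starting point for the batch-size search
        args.lr = 0.1
        args.embedding_dim = 32
        args.save_embeddings_as_csv = True
        result = Execute(args).start()
        # Keci_entity_embeddings.csv and Keci_relation_embeddings.csv end up here.
        print(os.path.join(result["path_experiment_folder"], "Keci_entity_embeddings.csv"))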