
Commit

Merge pull request #276 from dice-group/tensor_parallel
Tensor parallel
Demirrr authored Nov 26, 2024
2 parents f38bfa8 + 9ccee78 commit f53c1e7
Showing 5 changed files with 74 additions and 76 deletions.
4 changes: 3 additions & 1 deletion dicee/config.py
@@ -85,7 +85,6 @@ def __init__(self, **kwargs):

self.label_smoothing_rate: float = 0.0


self.num_core: int = 0
"""Number of CPUs to be used in the mini-batch loading process"""

@@ -139,6 +138,9 @@ def __init__(self, **kwargs):
self.continual_learning=None
"Path of a pretrained model size of LLM"

self.auto_batch_finding=False
"A flag for using auto batch finding"

def __iter__(self):
# Iterate
for k, v in self.__dict__.items():
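Note: auto_batch_finding defaults to False, so existing configurations keep a fixed batch size. A minimal sketch of enabling it programmatically, mirroring the Namespace/Execute workflow in the test at the end of this diff (the hyperparameter values here are placeholders, not part of this PR):

from dicee.config import Namespace
from dicee.executer import Execute

args = Namespace()
args.model = "Keci"
args.trainer = "TP"                 # auto batch finding is only honoured by the TP trainer
args.auto_batch_finding = True      # new flag added in this PR
args.scoring_technique = "KvsAll"
args.path_single_kg = "KGs/Family/family-benchmark_rich_background.owl"
args.backend = "rdflib"
args.num_epochs = 1
args.batch_size = 64                # starting batch size; grown automatically during training
args.embedding_dim = 32
Execute(args).start()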
3 changes: 3 additions & 0 deletions dicee/scripts/run.py
@@ -123,6 +123,9 @@ def get_default_arguments(description=None):
parser.add_argument("--swa",
action="store_true",
help="Stochastic weight averaging")
parser.add_argument("--auto_batch_finding",
action="store_true",
help="Find a batch size fitting in GPUs. Only available for TP trainer")
parser.add_argument('--degree', type=int, default=0,
help='degree for polynomial embeddings')

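Note: since this is a plain store_true switch, a command-line run only needs the extra flag. A hedged example (the dicee entry point and the flags other than --auto_batch_finding are assumptions inferred from this parser and the test below, not shown in this diff):

dicee --trainer TP --auto_batch_finding --model Keci --path_single_kg KGs/Family/family-benchmark_rich_background.owl --backend rdflib --num_epochs 1 --batch_size 64 --embedding_dim 32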
86 changes: 35 additions & 51 deletions dicee/static_funcs.py
@@ -684,16 +684,15 @@ def download_pretrained_model(url: str) -> str:
download_files_from_url(url_to_download_from, destination_folder=dir_name)
return dir_name

def write_csv_from_model_parallel(path: str) -> None:
def write_csv_from_model_parallel(path: str) :
"""Create"""
assert os.path.exists(path), "Path does not exist"

# Detect files that start with model_ and end with .pt
model_files = [f for f in os.listdir(path) if f.startswith("model_") and f.endswith(".pt")]
model_files.sort() # Sort to maintain order if necessary (e.g., model_0.pt, model_1.pt)

entity_csv_path = os.path.join(path, "entity_embeddings.csv")
relation_csv_path = os.path.join(path, "relation_embeddings.csv")
entity_embeddings=[]
relation_embeddings=[]

# Process each model file
for model_file in model_files:
@@ -702,65 +701,50 @@ def write_csv_from_model_parallel(path: str) -> None:
model = torch.load(model_path)
# Assuming model has a get_embeddings method
entity_emb, relation_emb = model["_orig_mod.entity_embeddings.weight"], model["_orig_mod.relation_embeddings.weight"]
# Convert to numpy
entity_emb = entity_emb.numpy()
relation_emb = relation_emb.numpy()

# Write or append to CSV
if not os.path.exists(entity_csv_path) or not os.path.exists(relation_csv_path):
# If CSV files do not exist, create them
pd.DataFrame(entity_emb).to_csv(entity_csv_path, index=True, header=False)
pd.DataFrame(relation_emb).to_csv(relation_csv_path, index=True, header=False)
else:
# If CSV files exist, concatenate to the existing rows
existing_entity_df = pd.read_csv(entity_csv_path, header=None)
existing_relation_df = pd.read_csv(relation_csv_path, header=None)
entity_embeddings.append(entity_emb)
relation_embeddings.append(relation_emb)

# Concatenate along the columns (axis=1)
new_entity_df = pd.concat([existing_entity_df, pd.DataFrame(entity_emb)], axis=1)
new_relation_df = pd.concat([existing_relation_df, pd.DataFrame(relation_emb)], axis=1)
return torch.cat(entity_embeddings, dim=1), torch.cat(relation_embeddings, dim=1)

# Write the updated data back to the CSV files
new_entity_df.to_csv(entity_csv_path, index=False, header=False)
new_relation_df.to_csv(relation_csv_path, index=False, header=False)

def from_pretrained_model_write_embeddings_into_csv(path: str) -> None:
""" """
assert os.path.exists(path), "Path does not exist"
config = load_json(path + '/configuration.json')
if config["trainer"]=="MP":
write_csv_from_model_parallel(path)
entity_csv_path = os.path.join(path, f"{config['model']}_entity_embeddings.csv")
relation_csv_path = os.path.join(path, f"{config['model']}_relation_embeddings.csv")

if config["trainer"]=="TP":
entity_emb, relation_emb = write_csv_from_model_parallel(path)
else:
entity_csv_path = os.path.join(path, f"{config['model']}_entity_embeddings.csv")
relation_csv_path = os.path.join(path, f"{config['model']}_relation_embeddings.csv")
# Load model
model = torch.load(os.path.join(path, "model.pt"))
# Assuming model has a get_embeddings method
entity_emb, relation_emb = model["entity_embeddings.weight"], model["relation_embeddings.weight"]
str_entity = pd.read_csv(f"{path}/entity_to_idx.csv", index_col=0)["entity"]
assert str_entity.index.is_monotonic_increasing
str_entity=str_entity.to_list()
# Write entity embeddings with headers and indices
with open(entity_csv_path, "w", newline="") as f:
writer = csv.writer(f)
# Add header (e.g., "", "0", "1", ..., "N")
headers = [""] + [f"{i}" for i in range(entity_emb.size(1))]
writer.writerow(headers)
# Add rows with index
for i_row, (name,row) in enumerate(zip(str_entity,entity_emb)):
writer.writerow([name] + row.tolist())
str_relations = pd.read_csv(f"{path}/relation_to_idx.csv", index_col=0)["relation"]
assert str_relations.index.is_monotonic_increasing

# Write relation embeddings with headers and indices
with open(relation_csv_path, "w", newline="") as f:
writer = csv.writer(f)
# Add header (e.g., "", "0", "1", ..., "N")
headers = [""] + [f"{i}" for i in range(relation_emb.size(1))]
writer.writerow(headers)
# Add rows with index
for i_row, (name, row) in enumerate(zip(str_relations,relation_emb)):
writer.writerow([name]+ row.tolist())
str_entity = pd.read_csv(f"{path}/entity_to_idx.csv", index_col=0)["entity"]
assert str_entity.index.is_monotonic_increasing
str_entity=str_entity.to_list()
# Write entity embeddings with headers and indices
with open(entity_csv_path, "w", newline="") as f:
writer = csv.writer(f)
# Add header (e.g., "", "0", "1", ..., "N")
headers = [""] + [f"{i}" for i in range(entity_emb.size(1))]
writer.writerow(headers)
# Add rows with index
for i_row, (name,row) in enumerate(zip(str_entity,entity_emb)):
writer.writerow([name] + row.tolist())
str_relations = pd.read_csv(f"{path}/relation_to_idx.csv", index_col=0)["relation"]
assert str_relations.index.is_monotonic_increasing

# Write relation embeddings with headers and indices
with open(relation_csv_path, "w", newline="") as f:
writer = csv.writer(f)
# Add header (e.g., "", "0", "1", ..., "N")
headers = [""] + [f"{i}" for i in range(relation_emb.size(1))]
writer.writerow(headers)
# Add rows with index
for i_row, (name, row) in enumerate(zip(str_relations,relation_emb)):
writer.writerow([name]+ row.tolist())

"""
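Note: the per-shard CSV appending is gone. write_csv_from_model_parallel now only loads every model_*.pt shard and returns the entity and relation matrices concatenated along the embedding dimension; from_pretrained_model_write_embeddings_into_csv then writes the labelled CSVs once, for both the TP and the single-model case. A minimal sketch of the shard concatenation, assuming two hypothetical shards whose rows are aligned by entity index:

import torch

# Hypothetical shards: 5 entities, 4 embedding dimensions per shard,
# e.g. "_orig_mod.entity_embeddings.weight" taken from model_0.pt and model_1.pt.
shard_0 = torch.randn(5, 4)
shard_1 = torch.randn(5, 4)

# Tensor-parallel training splits the embedding dimension across devices,
# so the full embedding is the column-wise concatenation of the shards.
entity_emb = torch.cat([shard_0, shard_1], dim=1)
assert entity_emb.shape == (5, 8)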
44 changes: 24 additions & 20 deletions dicee/trainer/model_parallelism.py
@@ -29,8 +29,8 @@ def find_good_batch_size(train_loader,ensemble_model, max_available_gpu_memory:f
if batch_size >= len(train_loader.dataset):
return batch_size
first_batch_size = train_loader.batch_size

print("Automatic batch size finding")
num_datapoints=len(train_loader.dataset)
print(f"Increment the batch size by {first_batch_size} until the Free/Total GPU memory is reached to {1-max_available_gpu_memory} or batch_size={num_datapoints} is achieved.")
while True:
# () Initialize a dataloader with a current batch_size
train_dataloaders = torch.utils.data.DataLoader(train_loader.dataset,
@@ -52,15 +52,16 @@ def find_good_batch_size(train_loader,ensemble_model, max_available_gpu_memory:f
avg_global_free_memory.append(global_free_memory / total_memory)
if i==3:
break

avg_global_free_memory=sum(avg_global_free_memory)/len(avg_global_free_memory)
print(f"Random Batch Loss: {loss}\tFree/Total GPU Memory: {avg_global_free_memory}\tBatch Size:{batch_size}")
# () Stepping criterion
if avg_global_free_memory > max_available_gpu_memory and batch_size < len(train_loader.dataset) :
# Increment the current batch size
batch_size+=first_batch_size
if avg_global_free_memory > max_available_gpu_memory and batch_size < num_datapoints :
if batch_size+first_batch_size <= num_datapoints:
batch_size+=first_batch_size
else:
batch_size=num_datapoints
else:
if batch_size >= len(train_loader.dataset):
assert batch_size<=num_datapoints
if batch_size == num_datapoints:
print("Batch size equals to the training dataset size")
else:
print(f"Max GPU memory used\tFree/Total GPU Memory:{avg_global_free_memory}")
@@ -88,6 +89,7 @@ class TensorParallel(AbstractTrainer):
def __init__(self, args, callbacks):
super().__init__(args, callbacks)
self.models=[]

def get_ensemble(self):
return self.models

@@ -104,18 +106,20 @@ def fit(self, *args, **kwargs):
# ()
train_dataloader = kwargs['train_dataloaders']
# ()
train_dataloader = torch.utils.data.DataLoader(train_dataloader.dataset,
batch_size=find_good_batch_size(train_dataloader, ensemble_model),
shuffle=True,
sampler=None,
batch_sampler=None,
num_workers=self.attributes.num_core,
collate_fn=train_dataloader.dataset.collate_fn,
pin_memory=False,
drop_last=False,
timeout=0,
worker_init_fn=None,
persistent_workers=False)
if self.attributes.auto_batch_finding:
train_dataloader = torch.utils.data.DataLoader(train_dataloader.dataset,
batch_size=find_good_batch_size(train_dataloader, ensemble_model),
shuffle=True,
sampler=None,
batch_sampler=None,
num_workers=self.attributes.num_core,
collate_fn=train_dataloader.dataset.collate_fn,
pin_memory=False,
drop_last=False,
timeout=0,
worker_init_fn=None,
persistent_workers=False)

num_of_batches = len(train_dataloader)
# () Start training.
for epoch in (tqdm_bar := make_iterable_verbose(range(self.attributes.num_epochs),
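Note: the batch-size search now clamps the increment so it never overshoots the dataset size. The loop starts from the user-supplied batch size, runs a few mini-batches, averages the global free/total GPU memory ratio, and keeps adding the initial batch size until that ratio falls to max_available_gpu_memory or the batch covers the whole dataset. A condensed sketch of the stepping criterion, with the memory probe stubbed out as an assumption:

def sketch_find_good_batch_size(num_datapoints, first_batch_size, max_available_gpu_memory, probe_free_ratio):
    """Grow the batch size until GPU memory is nearly exhausted or the dataset is covered.

    probe_free_ratio(batch_size) stands in for the real loop that runs a few
    mini-batches and averages the global free/total GPU memory ratio.
    """
    batch_size = first_batch_size
    while True:
        free_ratio = probe_free_ratio(batch_size)
        if free_ratio > max_available_gpu_memory and batch_size < num_datapoints:
            # Clamp the increment so the batch never exceeds the dataset size.
            batch_size = min(batch_size + first_batch_size, num_datapoints)
        else:
            return batch_size

# With 10_000 datapoints, a starting batch of 1_024 and a probe that always
# reports 90% free memory, the search stops at the dataset size.
print(sketch_find_good_batch_size(10_000, 1_024, 0.1, lambda b: 0.9))   # 10000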
13 changes: 9 additions & 4 deletions tests/test_saving_embeddings.py
@@ -2,6 +2,7 @@
from dicee.static_funcs import from_pretrained_model_write_embeddings_into_csv
from dicee.executer import Execute
from dicee.config import Namespace
import torch

class TestSavingEmbeddings:
def test_saving_embeddings(self):
@@ -23,9 +24,10 @@ def test_saving_embeddings(self):

def test_model_parallel_saving_embeddings(self):
# (1) Train a KGE model
import torch

if torch.cuda.is_available():
from dicee.config import Namespace
from dicee.executer import Execute
import os
args = Namespace()
args.model = 'Keci'
args.p = 0
@@ -34,9 +36,12 @@ def test_model_parallel_saving_embeddings(self):
args.scoring_technique = "KvsAll"
args.path_single_kg = "KGs/Family/family-benchmark_rich_background.owl"
args.backend = "rdflib"
args.trainer = "TP"
args.num_epochs = 1
args.batch_size = 1024
args.lr = 0.1
args.embedding_dim = 512
args.embedding_dim = 32
args.save_embeddings_as_csv = True
result = Execute(args).start()
from_pretrained_model_write_embeddings_into_csv(result["path_experiment_folder"])
assert os.path.exists(result["path_experiment_folder"] + "/Keci_entity_embeddings.csv")
assert os.path.exists(result["path_experiment_folder"] + "/Keci_relation_embeddings.csv")
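Note: given the layout written above (a first column of entity/relation names followed by numeric dimension headers), the exported CSVs can be read back directly. A small sanity check, with the experiment folder path as a placeholder for result["path_experiment_folder"]:

import pandas as pd

# Placeholder for result["path_experiment_folder"] returned by Execute(...).start().
path_experiment_folder = "Experiments/2024-11-26_12-00-00"
entity_df = pd.read_csv(f"{path_experiment_folder}/Keci_entity_embeddings.csv", index_col=0)
relation_df = pd.read_csv(f"{path_experiment_folder}/Keci_relation_embeddings.csv", index_col=0)
print(entity_df.shape, relation_df.shape)   # (num_entities, embedding_dim), (num_relations, embedding_dim)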
