From 6fd789a1ecb8b7229ba7b760200b9b943b87ec09 Mon Sep 17 00:00:00 2001
From: Sachin Gupta
Date: Mon, 30 Dec 2024 18:25:23 +0530
Subject: [PATCH 1/2] Updated to maintain compatibility with PyTorch 2.x

---
 ...rkflow_Interface_Mnist_Implementation_2.py | 84 +++++++++++++------
 .../Global_DP/requirements_global_dp.txt      |  7 +-
 2 files changed, 63 insertions(+), 28 deletions(-)

diff --git a/openfl-tutorials/experimental/workflow/Global_DP/Workflow_Interface_Mnist_Implementation_2.py b/openfl-tutorials/experimental/workflow/Global_DP/Workflow_Interface_Mnist_Implementation_2.py
index af2870b2c3..ddb8fa7385 100644
--- a/openfl-tutorials/experimental/workflow/Global_DP/Workflow_Interface_Mnist_Implementation_2.py
+++ b/openfl-tutorials/experimental/workflow/Global_DP/Workflow_Interface_Mnist_Implementation_2.py
@@ -33,14 +33,18 @@
 momentum = 0.5
 log_interval = 10

+# Fixing the seed for result repeatability: remove below to stop repeatable runs
 random_seed = 5495300300540669060
-g_device = torch.Generator(device="cuda")
-# Uncomment the line below to use g_cpu if not using cuda
-# g_device = torch.Generator()  # noqa: E800
-# NOTE: remove below to stop repeatable runs
+# Determine the device to use (CUDA if available, otherwise CPU)
+device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
+
+# Create a generator for the specified device
+g_device = torch.Generator(device=device)
+
+# Set the seed for the generator
 g_device.manual_seed(random_seed)
-print(f"\n\nWe are using seed: {random_seed}")
+print(f"\n\nWe are using seed: {random_seed} on device: {device}")

 mnist_train = torchvision.datasets.MNIST(
     "files/",
@@ -77,11 +81,10 @@ class GlobalModelTools(object):
     """

     def __init__(
-        self, example_model_state, global_model, collaborator_names, dp_params=None
+        self, example_model_state, global_model, num_collaborators, dp_params=None
     ):
         self.example_state = example_model_state
         self.global_model = global_model
-        self.collaborator_names = collaborator_names
         self.global_optimizer = torch.optim.SGD(
             params=self.global_model.parameters(), lr=1.0
         )  # This choice of optimizer is required for correct model aggregation
@@ -95,10 +98,8 @@ def __init__(
         else:
             sample_rate = dp_params["sample_rate"]
         self.global_data_loader = torch.utils.data.DataLoader(
-            TensorDataset(
-                torch.Tensor(list(range(len(self.collaborator_names)))).to(torch.int)
-            ),
-            batch_size=int(sample_rate * float(len(collaborator_names))),
+            TensorDataset(torch.Tensor(list(range(num_collaborators))).to(torch.int)),
+            batch_size=int(sample_rate * float(num_collaborators)),
             shuffle=True,
         )
         if dp_params is not None:
@@ -113,6 +114,13 @@ def __init__(
                 noise_multiplier=dp_params["noise_multiplier"],
                 max_grad_norm=dp_params["clip_norm"],
             )
+            # NOTE: Added to migrate to torch 2.x versions
+            self.global_optimizer._optimizer_step_pre_hooks = (
+                self.global_optimizer.original_optimizer._optimizer_step_pre_hooks
+            )
+            self.global_optimizer._optimizer_step_post_hooks = (
+                self.global_optimizer.original_optimizer._optimizer_step_post_hooks
+            )

     def populate_model_params_and_gradients(
         self, state_for_params, states_for_gradients
@@ -223,7 +231,7 @@ def inference(network, test_loader, device):
             correct += pred.eq(target.data.view_as(pred)).sum()
     test_loss /= len(test_loader.dataset)
     print(
-        "\nTest set: Avg. loss: {test_loss:.4f},"
+        f"\nTest set: Avg. loss: {test_loss:.4f},"
         f" Accuracy: {correct}/{len(test_loader.dataset)}"
         f" ({(100.0 * correct / len(test_loader.dataset)):.0f})\n"
     )
@@ -338,12 +346,6 @@ def __init__(
             self.dp_params = config["differential_privacy"]
             print(f"Here are dp_params: {self.dp_params}")
             validate_dp_params(self.dp_params)
-        self.global_model_tools = GlobalModelTools(
-            global_model=self.global_model,
-            example_model_state=self.model.state_dict(),
-            collaborator_names=self.collaborator_names,
-            dp_params=self.dp_params,
-        )

     @aggregator
     def start(self):
@@ -596,11 +598,8 @@ def end(self):
     else:
         device = torch.device("cpu")

-    # Setup participants
-    # Set `num_gpus=0.09` to `num_gpus=0.0` in order to run this tutorial on CPU
-    aggregator = Aggregator(num_gpus=0.09)
-
-    # Setup collaborators with private attributes
+    ###### Setup participants in Federation ######
+    # Define collaborators
     collaborator_names = [
         "Portland",
         "Seattle",
@@ -614,9 +613,45 @@ def end(self):
         "Guadalajara",
     ]

+    def callable_to_initialize_aggregator_private_attributes(
+        path, global_model, model, num_collaborators
+    ):
+        """Callable to initialize aggregator private attributes"""
+
+        config = parse_config(path)
+
+        if "differential_privacy" not in config:
+            dp_params = None
+        else:
+            dp_params = config["differential_privacy"]
+            print(f"Here are dp_params: {dp_params}")
+            validate_dp_params(dp_params)
+
+        return {
+            "global_model_tools": GlobalModelTools(
+                global_model=global_model,
+                example_model_state=model.state_dict(),
+                num_collaborators=num_collaborators,
+                dp_params=dp_params,
+            )
+        }
+
+    # Set `num_gpus=0.09` to `num_gpus=0.0` in order to run this tutorial on CPU
+    aggregator = Aggregator(
+        name="agg",
+        private_attributes_callable=callable_to_initialize_aggregator_private_attributes,
+        num_cpus=0.0,
+        num_gpus=0.0,
+        path=args.config_path,
+        global_model=Net(),
+        model=Net(),
+        num_collaborators=len(collaborator_names),
+    )
+
     def callable_to_initialize_collaborator_private_attributes(
         index, n_collaborators, batch_size, train_dataset, test_dataset
     ):
+        """Callable to initialize collaborator private attributes"""
         train = deepcopy(train_dataset)
         test = deepcopy(test_dataset)
         train.data = train_dataset.data[index::n_collaborators]
@@ -633,6 +668,7 @@ def callable_to_initialize_collaborator_private_attributes(
             ),
         }

+    # Setup collaborators with private attributes
     collaborators = []
     for idx, collaborator_name in enumerate(collaborator_names):
         collaborators.append(
@@ -641,7 +677,7 @@ def callable_to_initialize_collaborator_private_attributes(
                 private_attributes_callable=callable_to_initialize_collaborator_private_attributes,
                 # Set `num_gpus=0.09` to `num_gpus=0.0` in order to run this tutorial on CPU
                 num_cpus=0.0,
-                num_gpus=0.09,  # Assuming GPU(s) is available in the machine
+                num_gpus=0.0,  # Set to e.g. 0.09 to use GPU(s) available in the machine
                 index=idx,
                 n_collaborators=len(collaborator_names),
                 batch_size=batch_size_train,
diff --git a/openfl-tutorials/experimental/workflow/Global_DP/requirements_global_dp.txt b/openfl-tutorials/experimental/workflow/Global_DP/requirements_global_dp.txt
index f5118684aa..fd15c0985b 100644
--- a/openfl-tutorials/experimental/workflow/Global_DP/requirements_global_dp.txt
+++ b/openfl-tutorials/experimental/workflow/Global_DP/requirements_global_dp.txt
@@ -1,9 +1,8 @@
 cloudpickle
-matplotlib==3.6.0
+matplotlib==3.10.0
 numpy==1.23.3
 opacus==1.5.1
 pillow==10.3.0
 pyyaml==6.0
-torch==2.2.0
-torchaudio==2.2.0
-torchvision==0.17.0
+torch==2.5.1
+torchvision==0.20.1
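The `_optimizer_step_pre_hooks` / `_optimizer_step_post_hooks` forwarding added to `GlobalModelTools.__init__` above can be exercised outside OpenFL. The sketch below is a minimal illustration only, assuming `torch >= 2.0` and `opacus` 1.5.x as pinned in the updated requirements; the toy model, data, and DP hyperparameter values are placeholders, not values from the tutorial.

```python
# Minimal sketch of the PyTorch 2.x hook forwarding used in this patch.
# Assumptions: torch >= 2.0 and opacus 1.5.x; the model, data, and DP
# settings below are placeholders, not taken from the tutorial.
import torch
from opacus import PrivacyEngine
from torch.utils.data import DataLoader, TensorDataset

model = torch.nn.Linear(4, 2)
optimizer = torch.optim.SGD(model.parameters(), lr=1.0)
loader = DataLoader(
    TensorDataset(torch.randn(8, 4), torch.randint(0, 2, (8,))), batch_size=4
)

# Opacus wraps the plain SGD optimizer in a DPOptimizer that keeps the
# underlying optimizer in `original_optimizer`.
privacy_engine = PrivacyEngine()
model, optimizer, loader = privacy_engine.make_private(
    module=model,
    optimizer=optimizer,
    data_loader=loader,
    noise_multiplier=1.0,
    max_grad_norm=1.0,
)

# torch 2.x keeps per-optimizer step-hook registries on torch.optim.Optimizer
# instances; forwarding them from the wrapped optimizer, as the patch does,
# lets the DPOptimizer wrapper be used wherever those registries are expected.
optimizer._optimizer_step_pre_hooks = (
    optimizer.original_optimizer._optimizer_step_pre_hooks
)
optimizer._optimizer_step_post_hooks = (
    optimizer.original_optimizer._optimizer_step_post_hooks
)

# One DP-SGD step to show the wrapped optimizer is still usable as before.
data, target = torch.randn(4, 4), torch.randint(0, 2, (4,))
optimizer.zero_grad()
torch.nn.functional.cross_entropy(model(data), target).backward()
optimizer.step()
```

In the tutorial the same two assignments are applied to `self.global_optimizer` immediately after `make_private`, which is why they only run when `dp_params` is provided.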
From 0f31ae4065b1247cbf34e323bf90f12db9284eaa Mon Sep 17 00:00:00 2001
From: Sachin Gupta
Date: Mon, 30 Dec 2024 18:53:14 +0530
Subject: [PATCH 2/2] Updates

---
 ...rkflow_Interface_Mnist_Implementation_2.py | 43 ++++++++-----------
 1 file changed, 17 insertions(+), 26 deletions(-)

diff --git a/openfl-tutorials/experimental/workflow/Global_DP/Workflow_Interface_Mnist_Implementation_2.py b/openfl-tutorials/experimental/workflow/Global_DP/Workflow_Interface_Mnist_Implementation_2.py
index ddb8fa7385..54a771ae3d 100644
--- a/openfl-tutorials/experimental/workflow/Global_DP/Workflow_Interface_Mnist_Implementation_2.py
+++ b/openfl-tutorials/experimental/workflow/Global_DP/Workflow_Interface_Mnist_Implementation_2.py
@@ -316,7 +316,7 @@ def parse_config(config_path):
 class FederatedFlow(FLSpec):
     def __init__(
         self,
-        config_path,
+        dp_params,
         model,
         collaborator_names,
         device,
@@ -337,15 +337,7 @@ def __init__(
         # we will set this attribute at the beginning of each round (tracks using
         # indices against the collaborator list)
         self.round_collaborator_idxs = None
-
-        config = parse_config(config_path)
-
-        if "differential_privacy" not in config:
-            self.dp_params = None
-        else:
-            self.dp_params = config["differential_privacy"]
-            print(f"Here are dp_params: {self.dp_params}")
-            validate_dp_params(self.dp_params)
+        self.dp_params = dp_params

     @aggregator
     def start(self):
@@ -380,7 +372,7 @@ def start(self):
         }

         print("\n\n" + 20 * "#")
-        print("Round {self.round}...")
+        print(f"Round {self.round}...")
         print("Training with collaborators: ", self.round_collaborators)
         print(20 * "#" + "\n\n")

@@ -598,6 +590,15 @@ def end(self):
     else:
         device = torch.device("cpu")

+    config = parse_config(args.config_path)
+
+    if "differential_privacy" not in config:
+        dp_params = None
+    else:
+        dp_params = config["differential_privacy"]
+        print(f"Here are dp_params: {dp_params}")
+        validate_dp_params(dp_params)
+
     ###### Setup participants in Federation ######
     # Define collaborators
     collaborator_names = [
@@ -614,26 +615,17 @@ def end(self):
     ]

     def callable_to_initialize_aggregator_private_attributes(
-        path, global_model, model, num_collaborators
+        dp_params, global_model, model, num_collaborators
     ):
         """Callable to initialize aggregator private attributes"""

-        config = parse_config(path)
-
-        if "differential_privacy" not in config:
-            dp_params = None
-        else:
-            dp_params = config["differential_privacy"]
-            print(f"Here are dp_params: {dp_params}")
-            validate_dp_params(dp_params)
-
         return {
             "global_model_tools": GlobalModelTools(
                 global_model=global_model,
                 example_model_state=model.state_dict(),
                 num_collaborators=num_collaborators,
                 dp_params=dp_params,
-            )
+            ),
         }

     # Set `num_gpus=0.09` to `num_gpus=0.0` in order to run this tutorial on CPU
@@ -642,7 +634,7 @@ def callable_to_initialize_aggregator_private_attributes(
         private_attributes_callable=callable_to_initialize_aggregator_private_attributes,
         num_cpus=0.0,
         num_gpus=0.0,
-        path=args.config_path,
+        dp_params=dp_params,
         global_model=Net(),
         model=Net(),
         num_collaborators=len(collaborator_names),
@@ -687,8 +679,7 @@ def callable_to_initialize_collaborator_private_attributes(
         )

     local_runtime = LocalRuntime(
-        aggregator=aggregator, collaborators=collaborators, backend="ray"
-    )
+        aggregator=aggregator, collaborators=collaborators, backend="ray")
     print(f"Local runtime collaborators = {local_runtime.collaborators}")

     best_model = None
@@ -697,7 +688,7 @@ def callable_to_initialize_collaborator_private_attributes(
     total_rounds = 10

     flflow = FederatedFlow(
-        config_path=args.config_path,
+        dp_params=dp_params,
         model=initial_model,
         collaborator_names=collaborator_names,
         device=device,