Global_DP Tutorial: Update dependency and resolve serialization issue with ray backend #2

Open · wants to merge 2 commits into base: develop
@@ -33,14 +33,18 @@
momentum = 0.5
log_interval = 10

# Fixing the seed for result repetition: remove below to stop repeatable runs
random_seed = 5495300300540669060

g_device = torch.Generator(device="cuda")
# Uncomment the line below to use g_cpu if not using cuda
# g_device = torch.Generator() # noqa: E800
# NOTE: remove below to stop repeatable runs
# Determine the device to use (CUDA if available, otherwise CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Create a generator for the specified device
g_device = torch.Generator(device=device)

# Set the seed for the generator
g_device.manual_seed(random_seed)
print(f"\n\nWe are using seed: {random_seed}")
print(f"\n\nWe are using seed: {random_seed} on device: {device}")

mnist_train = torchvision.datasets.MNIST(
"files/",
@@ -77,11 +81,10 @@ class GlobalModelTools(object):
"""

def __init__(
self, example_model_state, global_model, collaborator_names, dp_params=None
self, example_model_state, global_model, num_collaborators, dp_params=None
):
self.example_state = example_model_state
self.global_model = global_model
self.collaborator_names = collaborator_names
self.global_optimizer = torch.optim.SGD(
params=self.global_model.parameters(), lr=1.0
) # This choice of optimizer is required for correct model aggregation
@@ -95,10 +98,8 @@ def __init__(
else:
sample_rate = dp_params["sample_rate"]
self.global_data_loader = torch.utils.data.DataLoader(
TensorDataset(
torch.Tensor(list(range(len(self.collaborator_names)))).to(torch.int)
),
batch_size=int(sample_rate * float(len(collaborator_names))),
TensorDataset(torch.Tensor(list(range(num_collaborators))).to(torch.int)),
batch_size=int(sample_rate * float(num_collaborators)),
shuffle=True,
)
if dp_params is not None:
@@ -113,6 +114,13 @@ def __init__(
noise_multiplier=dp_params["noise_multiplier"],
max_grad_norm=dp_params["clip_norm"],
)
# NOTE: Added to migrate to torch 2.x versions
self.global_optimizer._optimizer_step_pre_hooks = (
self.global_optimizer.original_optimizer._optimizer_step_pre_hooks
)
self.global_optimizer._optimizer_step_post_hooks = (
self.global_optimizer.original_optimizer._optimizer_step_post_hooks
)
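For context (not part of this change): on torch 2.x, Optimizer.step() consults the per-instance hook registries _optimizer_step_pre_hooks and _optimizer_step_post_hooks, which Optimizer.__init__ creates; the DP wrapper used here exposes the underlying SGD optimizer as original_optimizer, so the two assignments above simply forward those registries to the wrapper. A minimal sketch showing that plain torch 2.x optimizers already carry them:

import torch

model = torch.nn.Linear(2, 2)
base_opt = torch.optim.SGD(model.parameters(), lr=1.0)

# Created by Optimizer.__init__ on torch >= 2.0 and read by step():
print(hasattr(base_opt, "_optimizer_step_pre_hooks"))   # True
print(hasattr(base_opt, "_optimizer_step_post_hooks"))  # True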

def populate_model_params_and_gradients(
self, state_for_params, states_for_gradients
@@ -223,7 +231,7 @@ def inference(network, test_loader, device):
correct += pred.eq(target.data.view_as(pred)).sum()
test_loss /= len(test_loader.dataset)
print(
"\nTest set: Avg. loss: {test_loss:.4f},"
f"\nTest set: Avg. loss: {test_loss:.4f},"
f" Accuracy: {correct}/{len(test_loader.dataset)}"
f" ({(100.0 * correct / len(test_loader.dataset)):.0f})\n"
)
@@ -308,7 +316,7 @@ def parse_config(config_path):
class FederatedFlow(FLSpec):
def __init__(
self,
config_path,
dp_params,
model,
collaborator_names,
device,
@@ -329,21 +337,7 @@ def __init__(
# we will set this attribute at the beginning of each round (tracks using
# indices against the collaborator list)
self.round_collaborator_idxs = None

config = parse_config(config_path)

if "differential_privacy" not in config:
self.dp_params = None
else:
self.dp_params = config["differential_privacy"]
print(f"Here are dp_params: {self.dp_params}")
validate_dp_params(self.dp_params)
self.global_model_tools = GlobalModelTools(
global_model=self.global_model,
example_model_state=self.model.state_dict(),
collaborator_names=self.collaborator_names,
dp_params=self.dp_params,
)
self.dp_params = dp_params

@aggregator
def start(self):
@@ -378,7 +372,7 @@ def start(self):
}

print("\n\n" + 20 * "#")
print("Round {self.round}...")
print(f"Round {self.round}...")
print("Training with collaborators: ", self.round_collaborators)
print(20 * "#" + "\n\n")

@@ -596,11 +590,17 @@ def end(self):
else:
device = torch.device("cpu")

# Setup participants
# Set `num_gpus=0.09` to `num_gpus=0.0` in order to run this tutorial on CPU
aggregator = Aggregator(num_gpus=0.09)
config = parse_config(args.config_path)

# Setup collaborators with private attributes
if "differential_privacy" not in config:
dp_params = None
else:
dp_params = config["differential_privacy"]
print(f"Here are dp_params: {dp_params}")
validate_dp_params(dp_params)

###### Setup participants in Federation ######
# Define collaborators
collaborator_names = [
"Portland",
"Seattle",
@@ -614,9 +614,36 @@
"Guadalajara",
]

def callable_to_initialize_aggregator_private_attributes(
dp_params, global_model, model, num_collaborators
):
"""Callable to initialize aggregator private attributes"""

return {
"global_model_tools": GlobalModelTools(
global_model=global_model,
example_model_state=model.state_dict(),
num_collaborators=num_collaborators,
dp_params=dp_params,
),
}

# Set `num_gpus=0.0` to a fractional value such as `num_gpus=0.09` to run this tutorial on GPU
aggregator = Aggregator(
name="agg",
private_attributes_callable=callable_to_initialize_aggregator_private_attributes,
num_cpus=0.0,
num_gpus=0.0,
dp_params=dp_params,
global_model=Net(),
model=Net(),
num_collaborators=len(collaborator_names),
)
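For context (not part of this diff): the point of building GlobalModelTools inside the aggregator's private-attributes callable, rather than in FederatedFlow.__init__ as before, is that the ray backend pickles the flow object to ship it to actor processes, and state such as the DP-wrapped optimizer can make that pickling fail. Private attributes are only attached while the owning participant's steps run, so they stay out of the serialized flow. A generic, self-contained sketch of the kind of failure this avoids (the Flow class and the lambda are illustrative stand-ins, not code from this tutorial):

import pickle

class Flow:
    def __init__(self, heavy):
        self.heavy = heavy  # stand-in for a member that does not pickle

flow = Flow(heavy=lambda x: x)  # lambdas are a classic non-picklable member
try:
    pickle.dumps(flow)
except Exception as err:
    print("flow is not serializable:", err)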

def callable_to_initialize_collaborator_private_attributes(
index, n_collaborators, batch_size, train_dataset, test_dataset
):
"""Callable to initialize collaborator private attributes"""
train = deepcopy(train_dataset)
test = deepcopy(test_dataset)
train.data = train_dataset.data[index::n_collaborators]
@@ -633,6 +660,7 @@ def callable_to_initialize_collaborator_private_attributes(
),
}

# Setup collaborators with private attributes
collaborators = []
for idx, collaborator_name in enumerate(collaborator_names):
collaborators.append(
@@ -641,7 +669,7 @@ def callable_to_initialize_collaborator_private_attributes(
private_attributes_callable=callable_to_initialize_collaborator_private_attributes,
# Set `num_gpus=0.09` to `num_gpus=0.0` in order to run this tutorial on CPU
num_cpus=0.0,
num_gpus=0.09, # Assuming GPU(s) is available in the machine
num_gpus=0.0, # Set to a fractional value (e.g. 0.09) if a GPU is available in the machine
index=idx,
n_collaborators=len(collaborator_names),
batch_size=batch_size_train,
@@ -651,8 +679,7 @@ def callable_to_initialize_collaborator_private_attributes(
)

local_runtime = LocalRuntime(
aggregator=aggregator, collaborators=collaborators, backend="ray"
)
aggregator=aggregator, collaborators=collaborators, backend="ray")
print(f"Local runtime collaborators = {local_runtime.collaborators}")

best_model = None
@@ -661,7 +688,7 @@ def callable_to_initialize_collaborator_private_attributes(
total_rounds = 10

flflow = FederatedFlow(
config_path=args.config_path,
dp_params=dp_params,
model=initial_model,
collaborator_names=collaborator_names,
device=device,
@@ -1,9 +1,8 @@
cloudpickle
matplotlib==3.6.0
matplotlib==3.10.0
numpy==1.23.3
opacus==1.5.1
pillow==10.3.0
pyyaml==6.0
torch==2.2.0
torchaudio==2.2.0
torchvision==0.17.0
torch==2.5.1
torchvision==0.20.1
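An optional sanity check, not part of this change, to confirm the updated pins resolved together in the environment (torch 2.5.1 is the build that pairs with torchvision 0.20.1):

from importlib.metadata import version

for pkg in ("cloudpickle", "matplotlib", "numpy", "opacus", "torch", "torchvision"):
    print(f"{pkg}=={version(pkg)}")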