diff --git a/openfl-workspace/torch_cnn_mnist/plan/plan.yaml b/openfl-workspace/torch_cnn_mnist/plan/plan.yaml index cae2fd0028..0dfa740e42 100644 --- a/openfl-workspace/torch_cnn_mnist/plan/plan.yaml +++ b/openfl-workspace/torch_cnn_mnist/plan/plan.yaml @@ -10,16 +10,8 @@ aggregator: rounds_to_train: 2 write_logs: false template: openfl.component.aggregator.Aggregator -assigner: - settings: - task_groups: - - name: learning - percentage: 1.0 - tasks: - - aggregated_model_validation - - train - - locally_tuned_model_validation - template: openfl.component.RandomGroupedAssigner +assigner : + defaults : plan/defaults/assigner.yaml collaborator: settings: db_store_rounds: 1 diff --git a/openfl-workspace/torch_cnn_mnist_fed_eval/plan/plan.yaml b/openfl-workspace/torch_cnn_mnist_fed_eval/plan/plan.yaml index 580ce79760..fdb889e596 100644 --- a/openfl-workspace/torch_cnn_mnist_fed_eval/plan/plan.yaml +++ b/openfl-workspace/torch_cnn_mnist_fed_eval/plan/plan.yaml @@ -32,10 +32,12 @@ network : defaults : plan/defaults/network.yaml assigner : - defaults : plan/defaults/federated-evaluation/assigner.yaml - + defaults : plan/defaults/assigner.yaml + settings : + selected_task_group : evaluation + tasks : - defaults : plan/defaults/federated-evaluation/tasks_torch.yaml + defaults : plan/defaults/tasks_torch.yaml compression_pipeline : defaults : plan/defaults/compression_pipeline.yaml diff --git a/openfl-workspace/workspace/plan/defaults/aggregator.yaml b/openfl-workspace/workspace/plan/defaults/aggregator.yaml index 9fc0481f29..5ef44847f6 100644 --- a/openfl-workspace/workspace/plan/defaults/aggregator.yaml +++ b/openfl-workspace/workspace/plan/defaults/aggregator.yaml @@ -3,3 +3,4 @@ settings : db_store_rounds : 2 persist_checkpoint: True persistent_db_path: local_state/tensor.db + diff --git a/openfl-workspace/workspace/plan/defaults/assigner.yaml b/openfl-workspace/workspace/plan/defaults/assigner.yaml index 6a5903794f..4487d42a59 100644 --- a/openfl-workspace/workspace/plan/defaults/assigner.yaml +++ b/openfl-workspace/workspace/plan/defaults/assigner.yaml @@ -7,3 +7,7 @@ settings : - aggregated_model_validation - train - locally_tuned_model_validation + - name : evaluation + percentage : 1.0 + tasks : + - aggregated_model_validation diff --git a/openfl-workspace/workspace/plan/defaults/federated-evaluation/assigner.yaml b/openfl-workspace/workspace/plan/defaults/federated-evaluation/assigner.yaml deleted file mode 100644 index c660659e83..0000000000 --- a/openfl-workspace/workspace/plan/defaults/federated-evaluation/assigner.yaml +++ /dev/null @@ -1,7 +0,0 @@ -template : openfl.component.RandomGroupedAssigner -settings : - task_groups : - - name : evaluation - percentage : 1.0 - tasks : - - aggregated_model_validation \ No newline at end of file diff --git a/openfl-workspace/workspace/plan/defaults/federated-evaluation/tasks_torch.yaml b/openfl-workspace/workspace/plan/defaults/federated-evaluation/tasks_torch.yaml deleted file mode 100644 index f497ca845c..0000000000 --- a/openfl-workspace/workspace/plan/defaults/federated-evaluation/tasks_torch.yaml +++ /dev/null @@ -1,7 +0,0 @@ -aggregated_model_validation: - function : validate_task - kwargs : - apply : global - metrics : - - acc - \ No newline at end of file diff --git a/openfl/component/__init__.py b/openfl/component/__init__.py index 3b787f87d0..45c5226cf0 100644 --- a/openfl/component/__init__.py +++ b/openfl/component/__init__.py @@ -1,6 +1,7 @@ # Copyright 2020-2024 Intel Corporation # SPDX-License-Identifier: Apache-2.0 +"""OpenFL Component Module.""" from openfl.component.aggregator.aggregator import Aggregator from openfl.component.assigner.assigner import Assigner diff --git a/openfl/component/aggregator/aggregator.py b/openfl/component/aggregator/aggregator.py index d51ea68291..07c1d38ac1 100644 --- a/openfl/component/aggregator/aggregator.py +++ b/openfl/component/aggregator/aggregator.py @@ -84,7 +84,6 @@ def __init__( callbacks: Optional[List] = None, persist_checkpoint=True, persistent_db_path=None, - task_group: str = "learning", ): """Initializes the Aggregator. @@ -111,9 +110,7 @@ def __init__( Defaults to 1. initial_tensor_dict (dict, optional): Initial tensor dictionary. callbacks: List of callbacks to be used during the experiment. - task_group (str, optional): Selected task_group for assignment. """ - self.task_group = task_group self.round_number = 0 self.next_model_round_number = 0 @@ -130,16 +127,21 @@ def __init__( self.straggler_handling_policy = ( straggler_handling_policy or CutoffTimeBasedStragglerHandling() ) - self._end_of_round_check_done = [False] * rounds_to_train - self.stragglers = [] self.rounds_to_train = rounds_to_train + self.assigner = assigner + if self.assigner.is_task_group_evaluation(): + self.rounds_to_train = 1 + logger.info(f"For evaluation tasks setting rounds_to_train = {self.rounds_to_train}") + + self._end_of_round_check_done = [False] * rounds_to_train + self.stragglers = [] # if the collaborator requests a delta, this value is set to true self.authorized_cols = authorized_cols self.uuid = aggregator_uuid self.federation_uuid = federation_uuid - self.assigner = assigner + self.quit_job_sent_to = [] self.tensor_db = TensorDB() @@ -301,8 +303,8 @@ def _load_initial_tensors(self): ) # Check selected task_group before updating round number - if self.task_group == "evaluation": - logger.info(f"Skipping round_number check for {self.task_group} task_group") + if self.assigner.is_task_group_evaluation(): + logger.info("Skipping round_number check for evaluation run") elif round_number > self.round_number: logger.info(f"Starting training from round {round_number} of previously saved model") self.round_number = round_number diff --git a/openfl/component/assigner/__init__.py b/openfl/component/assigner/__init__.py index 980a524b7f..18adaab240 100644 --- a/openfl/component/assigner/__init__.py +++ b/openfl/component/assigner/__init__.py @@ -1,6 +1,7 @@ # Copyright 2020-2024 Intel Corporation # SPDX-License-Identifier: Apache-2.0 +"""OpenFL Assigner Module.""" from openfl.component.assigner.assigner import Assigner from openfl.component.assigner.random_grouped_assigner import RandomGroupedAssigner diff --git a/openfl/component/assigner/assigner.py b/openfl/component/assigner/assigner.py index c86aea1ae1..d49a68ffd6 100644 --- a/openfl/component/assigner/assigner.py +++ b/openfl/component/assigner/assigner.py @@ -4,6 +4,11 @@ """Assigner module.""" +import logging +from functools import wraps + +logger = logging.getLogger(__name__) + class Assigner: r""" @@ -35,18 +40,27 @@ class Assigner: \* - ``tasks`` argument is taken from ``tasks`` section of FL plan YAML file. """ - def __init__(self, tasks, authorized_cols, rounds_to_train, **kwargs): + def __init__( + self, + tasks, + authorized_cols, + rounds_to_train, + selected_task_group: str = "learning", + **kwargs, + ): """Initializes the Assigner. Args: tasks (list of object): List of tasks to assign. authorized_cols (list of str): Collaborators. rounds_to_train (int): Number of training rounds. + selected_task_group (str, optional): Selected task_group. Defaults to "learning". **kwargs: Additional keyword arguments. """ self.tasks = tasks self.authorized_cols = authorized_cols self.rounds = rounds_to_train + self.selected_task_group = selected_task_group self.all_tasks_in_groups = [] self.task_group_collaborators = {} @@ -67,6 +81,16 @@ def get_collaborators_for_task(self, task_name, round_number): """Abstract method.""" raise NotImplementedError + def is_task_group_evaluation(self): + """Check if the selected task group is for 'evaluation' run. + + Returns: + bool: True if the selected task group is 'evaluation', False otherwise. + """ + if hasattr(self, "selected_task_group"): + return self.selected_task_group == "evaluation" + return False + def get_all_tasks_for_round(self, round_number): """Return tasks for the current round. @@ -93,3 +117,37 @@ def get_aggregation_type_for_task(self, task_name): if "aggregation_type" not in self.tasks[task_name]: return None return self.tasks[task_name]["aggregation_type"] + + @classmethod + def task_group_filtering(cls, func): + """Decorator to filter task groups based on selected_task_group. + + This decorator should be applied to define_task_assignments() method + in Assigner subclasses to handle task_group filtering. + """ + + @wraps(func) + def wrapper(self, *args, **kwargs): + # First check if selection of task_group is applicable + if hasattr(self, "selected_task_group"): + # Verify task_groups exists before attempting filtering + if not hasattr(self, "task_groups"): + logger.warning( + "Task group specified for selection but no task_groups found. " + "Skipping filtering. This might be intentional for custom assigners." + ) + return func(self, *args, **kwargs) + + assert self.task_groups, "No task_groups defined in assigner." + + # Perform the filtering + self.task_groups = [ + group for group in self.task_groups if group["name"] == self.selected_task_group + ] + + assert self.task_groups, f"No task groups found for : {self.selected_task_group}" + + # Call the original method + return func(self, *args, **kwargs) + + return wrapper diff --git a/openfl/component/assigner/random_grouped_assigner.py b/openfl/component/assigner/random_grouped_assigner.py index dea00022a4..24931dbd96 100644 --- a/openfl/component/assigner/random_grouped_assigner.py +++ b/openfl/component/assigner/random_grouped_assigner.py @@ -6,7 +6,7 @@ import numpy as np -from openfl.component.assigner.assigner import Assigner +from openfl.component.assigner import Assigner class RandomGroupedAssigner(Assigner): @@ -33,16 +33,19 @@ class RandomGroupedAssigner(Assigner): \* - Plan setting. """ + task_group_filtering = Assigner.task_group_filtering + def __init__(self, task_groups, **kwargs): """Initializes the RandomGroupedAssigner. Args: task_groups (list of object): Task groups to assign. - **kwargs: Additional keyword arguments. + **kwargs: Additional keyword arguments, including mode. """ self.task_groups = task_groups super().__init__(**kwargs) + @task_group_filtering def define_task_assignments(self): """Define task assignments for each round and collaborator. diff --git a/openfl/component/assigner/static_grouped_assigner.py b/openfl/component/assigner/static_grouped_assigner.py index fcb5a59034..885ff1cd8f 100644 --- a/openfl/component/assigner/static_grouped_assigner.py +++ b/openfl/component/assigner/static_grouped_assigner.py @@ -32,6 +32,8 @@ class StaticGroupedAssigner(Assigner): \* - Plan setting. """ + task_group_filtering = Assigner.task_group_filtering + def __init__(self, task_groups, **kwargs): """Initializes the StaticGroupedAssigner. @@ -42,6 +44,7 @@ def __init__(self, task_groups, **kwargs): self.task_groups = task_groups super().__init__(**kwargs) + @task_group_filtering def define_task_assignments(self): """Define task assignments for each round and collaborator. diff --git a/openfl/interface/aggregator.py b/openfl/interface/aggregator.py index 2297cc4d2b..73f25311f7 100644 --- a/openfl/interface/aggregator.py +++ b/openfl/interface/aggregator.py @@ -94,10 +94,10 @@ def start_(plan, authorized_cols, task_group): cols_config_path=Path(authorized_cols).absolute(), ) - # Set task_group in aggregator settings - if "settings" not in parsed_plan.config["aggregator"]: - parsed_plan.config["aggregator"]["settings"] = {} - parsed_plan.config["aggregator"]["settings"]["task_group"] = task_group + # Set selected_task_group in assigner settings + if "settings" not in parsed_plan.config["assigner"]: + parsed_plan.config["assigner"]["settings"] = {} + parsed_plan.config["assigner"]["settings"]["selected_task_group"] = task_group logger.info(f"Setting aggregator to assign: {task_group} task_group") logger.info("🧿 Starting the Aggregator Service.") diff --git a/tests/openfl/component/aggregator/test_aggregator.py b/tests/openfl/component/aggregator/test_aggregator.py index f90b457925..f9883fd7b7 100644 --- a/tests/openfl/component/aggregator/test_aggregator.py +++ b/tests/openfl/component/aggregator/test_aggregator.py @@ -48,16 +48,13 @@ def agg(mocker, model, assigner): 'some_uuid', 'federation_uuid', ['col1', 'col2'], - 'init_state_path', 'best_state_path', 'last_state_path', - assigner, - ) + ) return agg - @pytest.mark.parametrize( 'cert_common_name,collaborator_common_name,authorized_cols,single_cccn,expected_is_valid', [ ('col1', 'col1', ['col1', 'col2'], '', True), diff --git a/tests/openfl/component/assigner/test_assigner.py b/tests/openfl/component/assigner/test_assigner.py index ff95644e91..df084b297d 100644 --- a/tests/openfl/component/assigner/test_assigner.py +++ b/tests/openfl/component/assigner/test_assigner.py @@ -19,7 +19,7 @@ def assigner(): def test_get_aggregation_type_for_task_none(assigner): """Assert that aggregation type of custom task is None.""" - task_name = 'test_name' + task_name = "test_name" tasks = {task_name: {}} assigner = assigner(tasks, None, None) @@ -31,11 +31,9 @@ def test_get_aggregation_type_for_task_none(assigner): def test_get_aggregation_type_for_task(assigner): """Assert that aggregation type of task is getting correctly.""" - task_name = 'test_name' - test_aggregation_type = 'test_aggregation_type' - tasks = {task_name: { - 'aggregation_type': test_aggregation_type - }} + task_name = "test_name" + test_aggregation_type = "test_aggregation_type" + tasks = {task_name: {"aggregation_type": test_aggregation_type}} assigner = assigner(tasks, None, None) aggregation_type = assigner.get_aggregation_type_for_task(task_name) @@ -46,13 +44,23 @@ def test_get_aggregation_type_for_task(assigner): def test_get_all_tasks_for_round(assigner): """Assert that assigner tasks object is list.""" assigner = Assigner(None, None, None) - tasks = assigner.get_all_tasks_for_round('test') + tasks = assigner.get_all_tasks_for_round("test") assert isinstance(tasks, list) +def test_default_task_group(assigner): + """Assert that by default learning task_group is assigned.""" + assigner = Assigner(None,None,None) + assert assigner.selected_task_group == 'learning' -class TestNotImplError(TestCase): +def test_task_group_filtering_no_task_groups(assigner): + """Assert that task_group_filtering does not filter when no task_groups are defined.""" + assigner = Assigner(None,None,None) + assigner.selected_task_group = "test_group" + assigner.define_task_assignments() + assert not hasattr(assigner, "task_groups") +class TestNotImplError(TestCase): def test_define_task_assignments(self): # TODO: define_task_assignments is defined as a mock in multiple fixtures, # which leads the function to behave as a mock here and other tests. @@ -61,9 +69,9 @@ def test_define_task_assignments(self): def test_get_tasks_for_collaborator(self): with self.assertRaises(NotImplementedError): assigner = Assigner(None, None, None) - assigner.get_tasks_for_collaborator('col1', 0) + assigner.get_tasks_for_collaborator("col1", 0) def test_get_collaborators_for_task(self): with self.assertRaises(NotImplementedError): assigner = Assigner(None, None, None) - assigner.get_collaborators_for_task('task_name', 0) + assigner.get_collaborators_for_task("task_name", 0) diff --git a/tests/openfl/component/assigner/test_random_grouped_assigner.py b/tests/openfl/component/assigner/test_random_grouped_assigner.py index 3245f2c036..f547fa6097 100644 --- a/tests/openfl/component/assigner/test_random_grouped_assigner.py +++ b/tests/openfl/component/assigner/test_random_grouped_assigner.py @@ -4,7 +4,7 @@ import pytest -from openfl.component.assigner import RandomGroupedAssigner +from openfl.component.assigner import RandomGroupedAssigner, Assigner ROUNDS_TO_TRAIN = 10 @@ -14,13 +14,20 @@ def task_groups(): """Initialize task groups.""" task_groups = [ { - 'name': 'train_and_validate', + 'name': 'learning', 'percentage': 1.0, 'tasks': [ 'aggregated_model_validation', 'train', 'locally_tuned_model_validation' ] + }, + { + 'name': 'evaluation', + 'percentage': 1.0, + 'tasks': [ + 'aggregated_model_validation' + ] } ] return task_groups @@ -35,12 +42,12 @@ def authorized_cols(): @pytest.fixture def assigner(task_groups, authorized_cols): """Initialize assigner.""" - assigner = RandomGroupedAssigner - - assigner = assigner(task_groups, - tasks=None, - authorized_cols=authorized_cols, - rounds_to_train=ROUNDS_TO_TRAIN) + assigner = RandomGroupedAssigner( + task_groups=task_groups, # Pass task_groups here + tasks=None, + authorized_cols=authorized_cols, + rounds_to_train=ROUNDS_TO_TRAIN + ) return assigner @@ -48,6 +55,29 @@ def test_define_task_assignments(assigner): """Test `define_task_assignments` is working.""" assigner.define_task_assignments() +def test_check_default_task_group(): + """Assert that by default learning task_group is assigned.""" + assigner = Assigner(None, None, None) + assert assigner.selected_task_group == 'learning' + +@pytest.mark.parametrize('round_number', range(ROUNDS_TO_TRAIN)) +def test_get_default_tasks_for_collaborator(assigner, task_groups, + authorized_cols, round_number): + """Test that assigner tasks correspond to task groups defined.""" + tasks = assigner.get_tasks_for_collaborator( + authorized_cols[0], round_number) + assert tasks == task_groups[0]['tasks'] + assert assigner.selected_task_group == task_groups[0]['name'] + +# @pytest.mark.parametrize('round_number', range(ROUNDS_TO_TRAIN)) +# def test_get_filtered_tasks_for_collaborator(assigner, task_groups, +# authorized_cols, round_number): +# """Test that assigner tasks correspond to task groups defined.""" +# assigner.selected_task_group=task_groups[1]['name'] +# assigner.define_task_assignments() +# tasks = assigner.get_tasks_for_collaborator( +# authorized_cols[0], round_number) +# assert tasks == task_groups[1]['tasks'] @pytest.mark.parametrize('round_number', range(ROUNDS_TO_TRAIN)) def test_get_tasks_for_collaborator(assigner, task_groups, diff --git a/tests/openfl/component/assigner/test_static_grouped_assigner.py b/tests/openfl/component/assigner/test_static_grouped_assigner.py index 8703c7b462..2a821fee20 100644 --- a/tests/openfl/component/assigner/test_static_grouped_assigner.py +++ b/tests/openfl/component/assigner/test_static_grouped_assigner.py @@ -20,7 +20,7 @@ def task_groups(authorized_cols): """Initialize task groups.""" task_groups = [ { - 'name': 'train_and_validate', + 'name': 'learning', 'percentage': 1.0, 'collaborators': authorized_cols, 'tasks': [ diff --git a/tests/openfl/interface/test_aggregator_api.py b/tests/openfl/interface/test_aggregator_api.py index 7986634368..9a010e3f10 100644 --- a/tests/openfl/interface/test_aggregator_api.py +++ b/tests/openfl/interface/test_aggregator_api.py @@ -23,9 +23,9 @@ def test_aggregator_start(mock_parse): mock_plan.get = {'task_group': 'learning'}.get # Add the config attribute with proper nesting mock_plan.config = { - 'aggregator': { + 'assigner': { 'settings': { - 'task_group': 'learning' + 'selected_task_group': 'learning' } } } @@ -50,9 +50,9 @@ def test_aggregator_start_illegal_plan(mock_parse, mock_is_directory_traversal): mock_plan.get = {'task_group': 'learning'}.get # Add the config attribute with proper nesting mock_plan.config = { - 'aggregator': { + 'assigner': { 'settings': { - 'task_group': 'learning' + 'selected_task_group': 'learning' } } } @@ -79,9 +79,9 @@ def test_aggregator_start_illegal_cols(mock_parse, mock_is_directory_traversal): mock_plan.get = {'task_group': 'learning'}.get # Add the config attribute with proper nesting mock_plan.config = { - 'aggregator': { + 'assigner': { 'settings': { - 'task_group': 'learning' + 'selected_task_group': 'learning' } } }