From 56d8817eab67b5e21137165ec9386e64faa2f819 Mon Sep 17 00:00:00 2001 From: "Chaurasiya, Payal" Date: Wed, 22 Jan 2025 02:52:17 -0800 Subject: [PATCH] Add tests for Fed EVal Signed-off-by: Chaurasiya, Payal --- .../workflows/task_runner_eval_dws_e2e.yml | 150 +++++++++++ .github/workflows/task_runner_eval_e2e.yml | 158 ++++++++++++ tests/end_to_end/models/aggregator.py | 8 +- tests/end_to_end/models/model_owner.py | 14 +- .../test_suites/task_runner_tests.py | 10 + .../test_suites/tr_with_eval_tests.py | 107 ++++++++ tests/end_to_end/utils/federation_helper.py | 15 +- tests/end_to_end/utils/summary_helper.py | 28 ++- tests/end_to_end/utils/tr_common_fixtures.py | 206 +-------------- tests/end_to_end/utils/tr_workspace.py | 236 ++++++++++++++++++ 10 files changed, 716 insertions(+), 216 deletions(-) create mode 100644 .github/workflows/task_runner_eval_dws_e2e.yml create mode 100644 .github/workflows/task_runner_eval_e2e.yml create mode 100644 tests/end_to_end/test_suites/tr_with_eval_tests.py create mode 100644 tests/end_to_end/utils/tr_workspace.py diff --git a/.github/workflows/task_runner_eval_dws_e2e.yml b/.github/workflows/task_runner_eval_dws_e2e.yml new file mode 100644 index 0000000000..44f73b8c8d --- /dev/null +++ b/.github/workflows/task_runner_eval_dws_e2e.yml @@ -0,0 +1,150 @@ +--- +# Task Runner E2E tests for dockerized approach + +name: Task_Runner_Eval_DWS_E2E # Please do not modify the name as it is used in the composite action + +on: + workflow_dispatch: + inputs: + num_rounds: + description: "Number of rounds to train" + required: false + default: "5" + type: string + num_collaborators: + description: "Number of collaborators" + required: false + default: "2" + type: string + +permissions: + contents: read + +# Environment variables common for all the jobs +env: + NUM_ROUNDS: ${{ inputs.num_rounds || '5' }} + NUM_COLLABORATORS: ${{ inputs.num_collaborators || '2' }} + +jobs: + test_with_tls_dockerized_ws: + name: tr_tls_dockerized_ws + runs-on: 
ubuntu-22.04 + timeout-minutes: 15 + strategy: + matrix: + model_name: ["keras_cnn_mnist"] + python_version: ["3.10", "3.11", "3.12"] + fail-fast: false # do not immediately fail if one of the combinations fail + + env: + MODEL_NAME: ${{ matrix.model_name }} + PYTHON_VERSION: ${{ matrix.python_version }} + + steps: + - name: Checkout OpenFL repository + id: checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} + + - name: Run Task Runner E2E tests with TLS + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/tr_with_eval_tests.py \ + -m task_runner_dockerized_ws --model_name ${{ env.MODEL_NAME }} \ + --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} + echo "Task runner end to end test run completed" + + - name: Post test run + uses: ./.github/actions/tr_post_test_run + if: ${{ always() }} + with: + test_type: "tr_tls_dockerized_ws" + + test_with_non_tls_dockerized_ws: + name: tr_non_tls_dockerized_ws + runs-on: ubuntu-22.04 + timeout-minutes: 15 + strategy: + matrix: + model_name: ["keras_cnn_mnist"] + python_version: ["3.10"] + fail-fast: false # do not immediately fail if one of the combinations fail + + env: + MODEL_NAME: ${{ matrix.model_name }} + PYTHON_VERSION: ${{ matrix.python_version }} + + steps: + - name: Checkout OpenFL repository + id: checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} + + - name: Run Task Runner E2E tests without TLS + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/tr_with_eval_tests.py \ + -m task_runner_dockerized_ws --model_name ${{ env.MODEL_NAME }} \ + 
--num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} --disable_tls + echo "Task runner end to end test run completed" + + - name: Post test run + uses: ./.github/actions/tr_post_test_run + if: ${{ always() }} + with: + test_type: "tr_non_tls_dockerized_ws" + + test_with_no_client_auth_dockerized_ws: + name: tr_no_client_auth_dockerized_ws + runs-on: ubuntu-22.04 + timeout-minutes: 15 + strategy: + matrix: + model_name: ["keras_cnn_mnist"] + python_version: ["3.10"] + fail-fast: false # do not immediately fail if one of the combinations fail + + env: + MODEL_NAME: ${{ matrix.model_name }} + PYTHON_VERSION: ${{ matrix.python_version }} + + steps: + - name: Checkout OpenFL repository + id: checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} + + - name: Run Task Runner E2E tests without TLS + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/tr_with_eval_tests.py \ + -m task_runner_dockerized_ws --model_name ${{ env.MODEL_NAME }} \ + --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} --disable_client_auth + echo "Task runner end to end test run completed" + + - name: Post test run + uses: ./.github/actions/tr_post_test_run + if: ${{ always() }} + with: + test_type: "tr_no_client_auth_dockerized_ws" \ No newline at end of file diff --git a/.github/workflows/task_runner_eval_e2e.yml b/.github/workflows/task_runner_eval_e2e.yml new file mode 100644 index 0000000000..201c1cdff2 --- /dev/null +++ b/.github/workflows/task_runner_eval_e2e.yml @@ -0,0 +1,158 @@ +--- +# Task Runner E2E tests for bare metal approach + +name: Task_Runner_Eval_E2E # Please do not modify the name as it is used in the composite action + +on: + schedule: + - cron: "0 0 * * *" # Run every day at midnight + 
workflow_dispatch: + inputs: + num_rounds: + description: "Number of rounds to train" + required: false + default: "5" + type: string + num_collaborators: + description: "Number of collaborators" + required: false + default: "2" + type: string + +permissions: + contents: read + +# Environment variables common for all the jobs +env: + NUM_ROUNDS: ${{ inputs.num_rounds || '5' }} + NUM_COLLABORATORS: ${{ inputs.num_collaborators || '2' }} + +jobs: + test_with_tls: + name: tr_tls + runs-on: ubuntu-22.04 + timeout-minutes: 30 + strategy: + matrix: + # Models like XGBoost (xgb_higgs) and torch_cnn_histology require runners with higher memory and CPU to run. + # Thus these models are excluded from the matrix for now. + model_name: ["torch_cnn_mnist", "keras_cnn_mnist"] + python_version: ["3.10", "3.11", "3.12"] + fail-fast: false # do not immediately fail if one of the combinations fail + + env: + MODEL_NAME: ${{ matrix.model_name }} + PYTHON_VERSION: ${{ matrix.python_version }} + + steps: + - name: Checkout OpenFL repository + id: checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} + + - name: Run Task Runner E2E tests with TLS + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/tr_with_eval_tests.py \ + -m task_runner_basic --model_name ${{ env.MODEL_NAME }} \ + --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} + echo "Task runner end to end test run completed" + + - name: Post test run + uses: ./.github/actions/tr_post_test_run + if: ${{ always() }} + with: + test_type: "tr_tls" + + test_with_non_tls: + name: tr_non_tls + runs-on: ubuntu-22.04 + timeout-minutes: 30 + strategy: + matrix: + # Testing this scenario only for torch_cnn_mnist model and python 3.10 + # If required, this can be extended to other models and 
python versions + model_name: ["torch_cnn_mnist"] + python_version: ["3.10"] + fail-fast: false # do not immediately fail if one of the combinations fail + + env: + MODEL_NAME: ${{ matrix.model_name }} + PYTHON_VERSION: ${{ matrix.python_version }} + + steps: + - name: Checkout OpenFL repository + id: checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} + + - name: Run Task Runner E2E tests without TLS + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/tr_with_eval_tests.py \ + -m task_runner_basic --model_name ${{ env.MODEL_NAME }} \ + --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} --disable_tls + echo "Task runner end to end test run completed" + + - name: Post test run + uses: ./.github/actions/tr_post_test_run + if: ${{ always() }} + with: + test_type: "tr_non_tls" + + test_with_no_client_auth: + name: tr_no_client_auth + runs-on: ubuntu-22.04 + timeout-minutes: 30 + strategy: + matrix: + # Testing this scenario for keras_cnn_mnist model and python 3.10 + # If required, this can be extended to other models and python versions + model_name: ["keras_cnn_mnist"] + python_version: ["3.10"] + fail-fast: false # do not immediately fail if one of the combinations fail + + env: + MODEL_NAME: ${{ matrix.model_name }} + PYTHON_VERSION: ${{ matrix.python_version }} + + steps: + - name: Checkout OpenFL repository + id: checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} + + - name: Run Task Runner E2E tests without TLS + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/tr_with_eval_tests.py \ + -m 
task_runner_basic --model_name ${{ env.MODEL_NAME }} \ + --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} --disable_client_auth + echo "Task runner end to end test run completed" + + - name: Post test run + uses: ./.github/actions/tr_post_test_run + if: ${{ always() }} + with: + test_type: "tr_no_client_auth" \ No newline at end of file diff --git a/tests/end_to_end/models/aggregator.py b/tests/end_to_end/models/aggregator.py index 795a15bed3..0cfc700455 100644 --- a/tests/end_to_end/models/aggregator.py +++ b/tests/end_to_end/models/aggregator.py @@ -19,7 +19,7 @@ class Aggregator(): 2. Starting the aggregator """ - def __init__(self, agg_domain_name=None, workspace_path=None, container_id=None): + def __init__(self, agg_domain_name=None, workspace_path=None, container_id=None, eval_scope=False): """ Initialize the Aggregator class Args: @@ -31,6 +31,7 @@ def __init__(self, agg_domain_name=None, workspace_path=None, container_id=None) self.agg_domain_name = agg_domain_name self.workspace_path = workspace_path self.container_id = container_id + self.eval_scope = eval_scope def generate_sign_request(self): """ @@ -63,8 +64,11 @@ def start(self, res_file, with_docker=False): log.info(f"Starting {self.name}") res_file = res_file if not with_docker else os.path.basename(res_file) error_msg = "Failed to start the aggregator" + command = "fx aggregator start" + if self.eval_scope: + command = f"{command} --task_group evaluation" fh.run_command( - "fx aggregator start", + command=command, error_msg=error_msg, container_id=self.container_id, workspace_path=self.workspace_path if not with_docker else "", diff --git a/tests/end_to_end/models/model_owner.py b/tests/end_to_end/models/model_owner.py index f2d77f360a..e9d665cb40 100644 --- a/tests/end_to_end/models/model_owner.py +++ b/tests/end_to_end/models/model_owner.py @@ -122,7 +122,7 @@ def certify_collaborator(self, collaborator_name, zip_name): raise e return True - def 
modify_plan(self, param_config, plan_path): + def modify_plan(self, param_config, plan_path, eval_scope=False): """ Modify the plan to train the model Args: @@ -153,9 +153,19 @@ def modify_plan(self, param_config, plan_path): data["network"]["settings"]["require_client_auth"] = param_config.require_client_auth data["network"]["settings"]["use_tls"] = param_config.use_tls + if eval_scope: + # Remove all existing task_groups + data['assigner']['settings']['task_groups'] = [] + # Add new task_groups for evaluation scope with task as aggregated_model_validation + new_task_group = { + "name": "evaluation", + "percentage": 1.0, + "tasks": ["aggregated_model_validation"] + } + data['assigner']['settings']['task_groups'].append(new_task_group) + with open(plan_file, "w+") as write_file: yaml.dump(data, write_file) - log.info(f"Modified the plan with provided parameters.") except Exception as e: log.error(f"Failed to modify the plan: {e}") diff --git a/tests/end_to_end/test_suites/task_runner_tests.py b/tests/end_to_end/test_suites/task_runner_tests.py index a6df29af3a..63a018877e 100644 --- a/tests/end_to_end/test_suites/task_runner_tests.py +++ b/tests/end_to_end/test_suites/task_runner_tests.py @@ -3,12 +3,14 @@ import pytest import logging +import os from tests.end_to_end.utils.tr_common_fixtures import ( fx_federation_tr, fx_federation_tr_dws, ) from tests.end_to_end.utils import federation_helper as fed_helper +from tests.end_to_end.utils.summary_helper import get_aggregated_accuracy log = logging.getLogger(__name__) @@ -32,6 +34,10 @@ def test_federation_via_native(request, fx_federation_tr): num_rounds=request.config.num_rounds, ), "Federation completion failed" + metric_file_path = os.path.join(fx_federation_tr.aggregator.workspace_path, "logs", "aggregator_metrics.txt") + model_accuracy = get_aggregated_accuracy(metric_file_path) + log.info(f"Model accuracy post {request.config.num_rounds} rounds: {model_accuracy}") + @pytest.mark.task_runner_dockerized_ws def 
test_federation_via_dockerized_workspace(request, fx_federation_tr_dws): @@ -53,3 +59,7 @@ def test_federation_via_dockerized_workspace(request, fx_federation_tr_dws): test_env=request.config.test_env, num_rounds=request.config.num_rounds, ), "Federation completion failed" + + metric_file_path = os.path.join(fx_federation_tr.aggregator.workspace_path, "logs", "aggregator_metrics.txt") + model_accuracy = get_aggregated_accuracy(metric_file_path) + log.info(f"Model accuracy post {request.config.num_rounds} rounds: {model_accuracy}") diff --git a/tests/end_to_end/test_suites/tr_with_eval_tests.py b/tests/end_to_end/test_suites/tr_with_eval_tests.py new file mode 100644 index 0000000000..2c7c97546f --- /dev/null +++ b/tests/end_to_end/test_suites/tr_with_eval_tests.py @@ -0,0 +1,107 @@ +# Copyright 2020-2023 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import pytest +import os +import logging + +from tests.end_to_end.utils.tr_common_fixtures import ( + fx_federation_tr, + fx_federation_tr_dws, +) +from tests.end_to_end.utils import federation_helper as fed_helper +from tests.end_to_end.utils.tr_workspace import create_tr_workspace, create_tr_workspace_dws +from tests.end_to_end.utils.summary_helper import get_aggregated_accuracy + +log = logging.getLogger(__name__) + + +@pytest.mark.task_runner_basic +def test_eval_federation_via_native(request, fx_federation_tr): + """ + Test federation via native task runner. 
+ Args: + request (Fixture): Pytest fixture + fx_federation_tr (Fixture): Pytest fixture for native task runner + """ + # Start the federation + results = fed_helper.run_federation(fx_federation_tr) + + # Verify the completion of the federation run + assert fed_helper.verify_federation_run_completion( + fx_federation_tr, + results, + test_env=request.config.test_env, + num_rounds=request.config.num_rounds, + ), "Federation completion failed" + + # Set the best model path + request.config.best_model_path = os.path.join(fx_federation_tr.aggregator.workspace_path, "save", "best.pbuf") + metric_file_path = os.path.join(fx_federation_tr.aggregator.workspace_path, "logs", "aggregator_metrics.txt") + model_accuracy = get_aggregated_accuracy(metric_file_path) + log.info(f"Model accuracy post {request.config.num_rounds} rounds: {model_accuracy}") + # Create new workspace with evaluation scope + new_fed_obj = create_tr_workspace(request, eval_scope=True) + + results_new = fed_helper.run_federation(new_fed_obj) + # Verify the completion of the federation run + assert fed_helper.verify_federation_run_completion( + new_fed_obj, + results_new, + test_env=request.config.test_env, + num_rounds=1, + ), "Federation completion failed" + + new_metric_file_path = os.path.join(new_fed_obj.aggregator.workspace_path, "logs", "aggregator_metrics.txt") + model_accuracy_eval = get_aggregated_accuracy(new_metric_file_path) + log.info(f"Model accuracy during evaluation only on prev trained model is : {model_accuracy_eval})") + + # verify that the model accuracy is similar to the previous model accuracy max of 1% difference + assert abs(model_accuracy - model_accuracy_eval) <= 0.01, "Model accuracy is not similar to the previous model accuracy" + + +@pytest.mark.task_runner_dockerized_ws +def test_eval_federation_via_dockerized_workspace(request, fx_federation_tr_dws): + """ + Test federation via dockerized workspace. 
+ Args: + request (Fixture): Pytest fixture + fx_federation_tr_dws (Fixture): Pytest fixture for dockerized workspace + """ + # Start the federation + results = fed_helper.run_federation_for_dws( + fx_federation_tr_dws, use_tls=request.config.use_tls + ) + + # Verify the completion of the federation run + assert fed_helper.verify_federation_run_completion( + fx_federation_tr_dws, + results, + test_env=request.config.test_env, + num_rounds=request.config.num_rounds, + ), "Federation completion failed" + + # Set the best model path + request.config.best_model_path = os.path.join(fx_federation_tr.aggregator.workspace_path, "save", "best.pbuf") + metric_file_path = os.path.join(fx_federation_tr.aggregator.workspace_path, "logs", "aggregator_metrics.txt") + model_accuracy = get_aggregated_accuracy(metric_file_path) + log.info(f"Model accuracy post {request.config.num_rounds} rounds: {model_accuracy}") + + # Create new workspace with evaluation scope + new_fed_obj = create_tr_workspace_dws(request, eval_scope=True) + + results_new = fed_helper.run_federation_for_dws(new_fed_obj) + # Verify the completion of the federation run + assert fed_helper.verify_federation_run_completion( + new_fed_obj, + results_new, + test_env=request.config.test_env, + num_rounds=1, + ), "Federation completion failed" + + new_metric_file_path = os.path.join(new_fed_obj.aggregator.workspace_path, "logs", "aggregator_metrics.txt") + model_accuracy_eval = get_aggregated_accuracy(new_metric_file_path) + log.info(f"Model accuracy during evaluation only on prev trained model is : {model_accuracy_eval})") + + # verify that the model accuracy is similar to the previous model accuracy max of 1% difference + assert abs(model_accuracy - model_accuracy_eval) <= 0.01, "Model accuracy is not similar to the previous model accuracy" diff --git a/tests/end_to_end/utils/federation_helper.py b/tests/end_to_end/utils/federation_helper.py index 9bc3a4dcd1..39f47638c7 100644 --- 
a/tests/end_to_end/utils/federation_helper.py +++ b/tests/end_to_end/utils/federation_helper.py @@ -448,7 +448,7 @@ def _verify_completion_for_participant( return True -def federation_env_setup_and_validate(request): +def federation_env_setup_and_validate(request, eval_scope=False): """ Setup the federation environment and validate the configurations Args: @@ -470,10 +470,19 @@ def federation_env_setup_and_validate(request): local_bind_path = os.path.join( home_dir, request.config.results_dir, request.config.model_name ) + num_rounds = request.config.num_rounds + + if eval_scope: + local_bind_path = f"{local_bind_path}_eval" + num_rounds = 1 + log.info(f"Running evaluation for the model: {request.config.model_name}") + workspace_path = local_bind_path + # if path exists delete it + if os.path.exists(workspace_path): + shutil.rmtree(workspace_path) if test_env == "task_runner_dockerized_ws": - agg_domain_name = "aggregator" # Cleanup docker containers dh.cleanup_docker_containers() @@ -483,7 +492,7 @@ def federation_env_setup_and_validate(request): log.info( f"Running federation setup using {test_env} API on single machine with below configurations:\n" f"\tNumber of collaborators: {request.config.num_collaborators}\n" - f"\tNumber of rounds: {request.config.num_rounds}\n" + f"\tNumber of rounds: {num_rounds}\n" f"\tModel name: {request.config.model_name}\n" f"\tClient authentication: {request.config.require_client_auth}\n" f"\tTLS: {request.config.use_tls}\n" diff --git a/tests/end_to_end/utils/summary_helper.py b/tests/end_to_end/utils/summary_helper.py index 0ed8aa0a3a..d47abd1f61 100644 --- a/tests/end_to_end/utils/summary_helper.py +++ b/tests/end_to_end/utils/summary_helper.py @@ -11,19 +11,25 @@ import tests.end_to_end.utils.constants as constants from tests.end_to_end.utils.generate_report import convert_to_json -# Initialize the XML parser -parser = etree.XMLParser(recover=True, encoding="utf-8") - result_path = os.path.join(Path().home(), "results") 
-result_xml = os.path.join(result_path, "results.xml") -if not os.path.exists(result_xml): - print(f"Results XML file not found at {result_xml}. Exiting...") - exit(1) -tree = defused_parse(result_xml, parser=parser) +def initialize_xml_parser(): + """ + Initialize the XML parser and parse the results XML file. + Returns: + testsuites: the root element of the parsed XML tree + """ + parser = etree.XMLParser(recover=True, encoding="utf-8") + result_xml = os.path.join(result_path, "results.xml") + if not os.path.exists(result_xml): + print(f"Results XML file not found at {result_xml}. Exiting...") + exit(1) + + tree = defused_parse(result_xml, parser=parser) -# Get the root element -testsuites = tree.getroot() + # Get the root element + testsuites = tree.getroot() + return testsuites, result_path def get_aggregated_accuracy(agg_log_file): @@ -75,6 +81,8 @@ def get_testcase_result(): """ database_list = [] status = None + # Initialize the XML parser + testsuites = initialize_xml_parser() # Iterate over each testsuite in testsuites for testsuite in testsuites: # Populate testcase details in a dictionary diff --git a/tests/end_to_end/utils/tr_common_fixtures.py b/tests/end_to_end/utils/tr_common_fixtures.py index 88e02e631c..54bd6e3339 100644 --- a/tests/end_to_end/utils/tr_common_fixtures.py +++ b/tests/end_to_end/utils/tr_common_fixtures.py @@ -2,25 +2,19 @@ # SPDX-License-Identifier: Apache-2.0 import pytest -import collections -import concurrent.futures +pass +pass import logging import tests.end_to_end.utils.constants as constants import tests.end_to_end.utils.federation_helper as fh import tests.end_to_end.utils.ssh_helper as ssh from tests.end_to_end.models import aggregator as agg_model, model_owner as mo_model +from tests.end_to_end.utils.tr_workspace import create_tr_workspace, create_tr_workspace_dws log = logging.getLogger(__name__) -# Define a named tuple to store the objects for model owner, aggregator, and collaborators -federation_fixture = 
collections.namedtuple( - "federation_fixture", - "model_owner, aggregator, collaborators, workspace_path, local_bind_path", -) - - @pytest.fixture(scope="function") def fx_federation_tr(request): """ @@ -30,92 +24,12 @@ def fx_federation_tr(request): Args: request: pytest request object. Model name is passed as a parameter to the fixture from test cases. Returns: - federation_fixture: Named tuple containing the objects for model owner, aggregator, and collaborators + federation_details: Named tuple containing the objects for model owner, aggregator, and collaborators Note: As this is a function level fixture, thus no import is required at test level. """ request.config.test_env = "task_runner_basic" - - collaborators = [] - executor = concurrent.futures.ThreadPoolExecutor() - - model_name, workspace_path, local_bind_path, agg_domain_name = ( - fh.federation_env_setup_and_validate(request) - ) - - agg_workspace_path = constants.AGG_WORKSPACE_PATH.format(workspace_path) - - # Create model owner object and the workspace for the model - # Workspace name will be same as the model name - model_owner = mo_model.ModelOwner( - model_name, request.config.log_memory_usage, workspace_path=agg_workspace_path - ) - - # Create workspace for given model name - fh.create_persistent_store(model_owner.name, local_bind_path) - - model_owner.create_workspace() - fh.add_local_workspace_permission(local_bind_path) - - # Modify the plan - plan_path = constants.AGG_PLAN_PATH.format(local_bind_path) - model_owner.modify_plan(param_config=request.config, plan_path=plan_path) - - # Initialize the plan - model_owner.initialize_plan(agg_domain_name=agg_domain_name) - - # Certify the workspace in case of TLS - # Register the collaborators in case of non-TLS - if request.config.use_tls: - model_owner.certify_workspace() - else: - model_owner.register_collaborators(plan_path, request.config.num_collaborators) - - # Create the objects for aggregator and collaborators - # Workspace path for 
aggregator is uniform in case of docker or task_runner - # But, for collaborators, it is different - aggregator = agg_model.Aggregator( - agg_domain_name=agg_domain_name, - workspace_path=agg_workspace_path, - container_id=model_owner.container_id, # None in case of non-docker environment - ) - - # Generate the sign request and certify the aggregator in case of TLS - if request.config.use_tls: - aggregator.generate_sign_request() - model_owner.certify_aggregator(agg_domain_name) - - # Export the workspace - # By default the workspace will be exported to workspace.zip - model_owner.export_workspace() - - futures = [ - executor.submit( - fh.setup_collaborator, - index, - workspace_path=workspace_path, - local_bind_path=local_bind_path, - ) - for index in range(1, request.config.num_collaborators+1) - ] - collaborators = [f.result() for f in futures] - - # Data setup requires total no of collaborators, thus keeping the function call outside of the loop - if model_name.lower() == "xgb_higgs": - fh.setup_collaborator_data(collaborators, model_name, local_bind_path) - - if request.config.use_tls: - fh.setup_pki_for_collaborators(collaborators, model_owner, local_bind_path) - fh.import_pki_for_collaborators(collaborators, local_bind_path) - - # Return the federation fixture - return federation_fixture( - model_owner=model_owner, - aggregator=aggregator, - collaborators=collaborators, - workspace_path=workspace_path, - local_bind_path=local_bind_path, - ) + return create_tr_workspace(request) @pytest.fixture(scope="function") @@ -127,115 +41,9 @@ def fx_federation_tr_dws(request): Args: request: pytest request object. Model name is passed as a parameter to the fixture from test cases. 
Returns: - federation_fixture: Named tuple containing the objects for model owner, aggregator, and collaborators + federation_details: Named tuple containing the objects for model owner, aggregator, and collaborators Note: As this is a function level fixture, thus no import is required at test level. """ request.config.test_env = "task_runner_dockerized_ws" - - collaborators = [] - executor = concurrent.futures.ThreadPoolExecutor() - - model_name, workspace_path, local_bind_path, agg_domain_name = ( - fh.federation_env_setup_and_validate(request) - ) - - agg_workspace_path = constants.AGG_WORKSPACE_PATH.format(workspace_path) - - # Create model owner object and the workspace for the model - # Workspace name will be same as the model name - model_owner = mo_model.ModelOwner( - model_name, request.config.log_memory_usage, workspace_path=agg_workspace_path - ) - - # Create workspace for given model name - fh.create_persistent_store(model_owner.name, local_bind_path) - - model_owner.create_workspace() - fh.add_local_workspace_permission(local_bind_path) - - # Modify the plan - plan_path = constants.AGG_PLAN_PATH.format(local_bind_path) - model_owner.modify_plan(param_config=request.config, plan_path=plan_path) - - # Initialize the plan - model_owner.initialize_plan(agg_domain_name=agg_domain_name) - - # Command 'fx workspace dockerize --save ..' will use the workspace name for image name - # which is 'workspace' in this case. 
- model_owner.dockerize_workspace() - image_name = "workspace" - - # Certify the workspace in case of TLS - # Register the collaborators in case of non-TLS - if request.config.use_tls: - model_owner.certify_workspace() - else: - model_owner.register_collaborators(plan_path, request.config.num_collaborators) - - # Create the objects for aggregator and collaborators - # Workspace path for aggregator is uniform in case of docker or task_runner - # But, for collaborators, it is different - aggregator = agg_model.Aggregator( - agg_domain_name=agg_domain_name, - workspace_path=agg_workspace_path, - container_id=model_owner.container_id, # None in case of non-docker environment - ) - - futures = [ - executor.submit( - fh.setup_collaborator, - index, - workspace_path=workspace_path, - local_bind_path=local_bind_path, - ) - for index in range(1, request.config.num_collaborators+1) - ] - collaborators = [f.result() for f in futures] - - if request.config.use_tls: - fh.setup_pki_for_collaborators(collaborators, model_owner, local_bind_path) - - # Data setup requires total no of collaborators, thus keeping the function call outside of the loop - if model_name.lower() == "xgb_higgs": - fh.setup_collaborator_data(collaborators, model_name, local_bind_path) - - # Note: In case of multiple machines setup, scp the created tar for collaborators to the other machine(s) - fh.create_tarball_for_collaborators( - collaborators, local_bind_path, use_tls=request.config.use_tls, - add_data=True if model_name.lower() == "xgb_higgs" else False - ) - - # Generate the sign request and certify the aggregator in case of TLS - if request.config.use_tls: - aggregator.generate_sign_request() - model_owner.certify_aggregator(agg_domain_name) - - local_agg_ws_path = constants.AGG_WORKSPACE_PATH.format(local_bind_path) - - # Note: In case of multiple machines setup, scp this tar to the other machine(s) - return_code, output, error = ssh.run_command( - f"tar -cf cert_agg.tar plan cert save", 
import collections
import concurrent.futures
import logging
import os
import shutil

import tests.end_to_end.utils.constants as constants
import tests.end_to_end.utils.federation_helper as fh
import tests.end_to_end.utils.ssh_helper as ssh
from tests.end_to_end.models import aggregator as agg_model, model_owner as mo_model

log = logging.getLogger(__name__)


# Named tuple bundling the objects a test needs to drive a federation run:
# model owner, aggregator, collaborators and the two workspace paths.
federation_details = collections.namedtuple(
    "federation_details",
    "model_owner, aggregator, collaborators, workspace_path, local_bind_path",
)

# Internal bundle returned by _setup_workspace_and_plan (private to this module).
_workspace_setup = collections.namedtuple(
    "_workspace_setup",
    "model_name, workspace_path, local_bind_path, agg_domain_name, "
    "agg_workspace_path, model_owner, plan_path",
)


def _setup_workspace_and_plan(request, eval_scope):
    """Perform the setup steps shared by the bare-metal and dockerized flows.

    Validates the federation environment, creates the model owner and its
    workspace, modifies and initializes the plan and, for evaluation runs,
    swaps the freshly initialized model for the best model of a previous run.

    Args:
        request: pytest request fixture carrying the test configuration.
        eval_scope (bool): True when setting up a single-round evaluation run.

    Returns:
        _workspace_setup: names, paths and the ModelOwner object needed by the
        caller to finish federation setup.
    """
    model_name, workspace_path, local_bind_path, agg_domain_name = (
        fh.federation_env_setup_and_validate(request, eval_scope)
    )

    agg_workspace_path = constants.AGG_WORKSPACE_PATH.format(workspace_path)

    # Workspace name will be same as the model name.
    model_owner = mo_model.ModelOwner(
        model_name, request.config.log_memory_usage, workspace_path=agg_workspace_path
    )

    # Create workspace for given model name.
    fh.create_persistent_store(model_owner.name, local_bind_path)
    model_owner.create_workspace()
    fh.add_local_workspace_permission(local_bind_path)

    # Modify the plan.
    plan_path = constants.AGG_PLAN_PATH.format(local_bind_path)
    param_config = request.config

    if eval_scope:
        log.info("Setting up evaluation scope, so update the plan for 1 round")
        # NOTE: param_config aliases the shared pytest config, so this mutation
        # is visible to every later helper — intentional, so the whole run
        # agrees on a single evaluation round.
        param_config.num_rounds = 1

    model_owner.modify_plan(param_config, plan_path=plan_path, eval_scope=eval_scope)

    # Initialize the plan.
    model_owner.initialize_plan(agg_domain_name=agg_domain_name)

    if eval_scope:
        # Remove the initial model and replace it with the best model of a
        # previous round, so evaluation scores already-trained weights.
        init_model_path = os.path.join(agg_workspace_path, "save", "init.pbuf")
        os.remove(init_model_path)
        shutil.copy(request.config.best_model_path, init_model_path)

    return _workspace_setup(
        model_name=model_name,
        workspace_path=workspace_path,
        local_bind_path=local_bind_path,
        agg_domain_name=agg_domain_name,
        agg_workspace_path=agg_workspace_path,
        model_owner=model_owner,
        plan_path=plan_path,
    )


def _start_collaborators(request, workspace_path, local_bind_path):
    """Set up all collaborators concurrently and return them as a list.

    Args:
        request: pytest request fixture carrying the test configuration.
        workspace_path (str): remote/base workspace path.
        local_bind_path (str): local bind-mount path.

    Returns:
        list: collaborator objects, one per configured collaborator.
    """
    # Context manager guarantees the pool is shut down once results are in
    # (the original code leaked the executor).
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(
                fh.setup_collaborator,
                index,
                workspace_path=workspace_path,
                local_bind_path=local_bind_path,
            )
            for index in range(1, request.config.num_collaborators + 1)
        ]
        return [future.result() for future in futures]


def create_tr_workspace(request, eval_scope=False):
    """Create a task-runner federation workspace (non-dockerized flow).

    Args:
        request: pytest request fixture carrying the test configuration.
        eval_scope (bool): True for a single-round evaluation run.

    Returns:
        federation_details: model owner, aggregator, collaborators and paths.
    """
    setup = _setup_workspace_and_plan(request, eval_scope)
    model_owner = setup.model_owner

    # Certify the workspace in case of TLS,
    # register the collaborators in case of non-TLS.
    if request.config.use_tls:
        model_owner.certify_workspace()
    else:
        model_owner.register_collaborators(setup.plan_path, request.config.num_collaborators)

    # Workspace path for aggregator is uniform in case of docker or
    # task_runner, but for collaborators it is different.
    aggregator = agg_model.Aggregator(
        agg_domain_name=setup.agg_domain_name,
        workspace_path=setup.agg_workspace_path,
        container_id=model_owner.container_id,  # None in case of non-docker environment
        eval_scope=eval_scope,
    )

    # Generate the sign request and certify the aggregator in case of TLS.
    if request.config.use_tls:
        aggregator.generate_sign_request()
        model_owner.certify_aggregator(setup.agg_domain_name)

    # Export the workspace; by default it is exported to workspace.zip.
    model_owner.export_workspace()

    collaborators = _start_collaborators(request, setup.workspace_path, setup.local_bind_path)

    # Data setup requires total no of collaborators, thus keeping the function
    # call outside of the loop.
    if setup.model_name.lower() == "xgb_higgs":
        fh.setup_collaborator_data(collaborators, setup.model_name, setup.local_bind_path)

    if request.config.use_tls:
        fh.setup_pki_for_collaborators(collaborators, model_owner, setup.local_bind_path)
        fh.import_pki_for_collaborators(collaborators, setup.local_bind_path)

    # Return the federation fixture.
    return federation_details(
        model_owner=model_owner,
        aggregator=aggregator,
        collaborators=collaborators,
        workspace_path=setup.workspace_path,
        local_bind_path=setup.local_bind_path,
    )


def create_tr_workspace_dws(request, eval_scope=False):
    """Create a task-runner federation workspace (dockerized workspace flow).

    Args:
        request: pytest request fixture carrying the test configuration.
        eval_scope (bool): True for a single-round evaluation run.

    Returns:
        federation_details: model owner, aggregator, collaborators and paths.
    """
    setup = _setup_workspace_and_plan(request, eval_scope)
    model_owner = setup.model_owner

    # Command 'fx workspace dockerize --save ..' will use the workspace name
    # for image name, which is 'workspace' in this case.
    model_owner.dockerize_workspace()
    image_name = "workspace"

    # Certify the workspace in case of TLS,
    # register the collaborators in case of non-TLS.
    if request.config.use_tls:
        model_owner.certify_workspace()
    else:
        model_owner.register_collaborators(setup.plan_path, request.config.num_collaborators)

    # Workspace path for aggregator is uniform in case of docker or
    # task_runner, but for collaborators it is different.
    aggregator = agg_model.Aggregator(
        agg_domain_name=setup.agg_domain_name,
        workspace_path=setup.agg_workspace_path,
        container_id=model_owner.container_id,  # None in case of non-docker environment
        eval_scope=eval_scope,
    )

    collaborators = _start_collaborators(request, setup.workspace_path, setup.local_bind_path)

    if request.config.use_tls:
        fh.setup_pki_for_collaborators(collaborators, model_owner, setup.local_bind_path)

    # Data setup requires total no of collaborators, thus keeping the function
    # call outside of the loop.
    if setup.model_name.lower() == "xgb_higgs":
        fh.setup_collaborator_data(collaborators, setup.model_name, setup.local_bind_path)

    # Note: in case of multiple machines setup, scp the created tar for
    # collaborators to the other machine(s).
    fh.create_tarball_for_collaborators(
        collaborators,
        setup.local_bind_path,
        use_tls=request.config.use_tls,
        add_data=setup.model_name.lower() == "xgb_higgs",
    )

    # Generate the sign request and certify the aggregator in case of TLS.
    if request.config.use_tls:
        aggregator.generate_sign_request()
        model_owner.certify_aggregator(setup.agg_domain_name)

    local_agg_ws_path = constants.AGG_WORKSPACE_PATH.format(setup.local_bind_path)

    # Note: in case of multiple machines setup, scp this tar to the other machine(s).
    return_code, _, error = ssh.run_command(
        "tar -cf cert_agg.tar plan cert save", work_dir=local_agg_ws_path
    )
    if return_code != 0:
        raise Exception(f"Failed to create tar for aggregator: {error}")

    # Note: in case of multiple machines setup, scp this workspace tar to the
    # other machine(s) so that docker load can load the image.
    model_owner.load_workspace(workspace_tar_name=f"{image_name}.tar")

    fh.start_docker_containers_for_dws(
        participants=[aggregator] + collaborators,
        workspace_path=setup.workspace_path,
        local_bind_path=setup.local_bind_path,
        image_name=image_name,
    )

    # Return the federation fixture.
    return federation_details(
        model_owner=model_owner,
        aggregator=aggregator,
        collaborators=collaborators,
        workspace_path=setup.workspace_path,
        local_bind_path=setup.local_bind_path,
    )