diff --git a/.github/workflows/task_runner_fedeval_dws_e2e.yml b/.github/workflows/task_runner_fedeval_dws_e2e.yml new file mode 100644 index 0000000000..85e26a4443 --- /dev/null +++ b/.github/workflows/task_runner_fedeval_dws_e2e.yml @@ -0,0 +1,181 @@ +--- +# Task Runner E2E Evaluation tests for dockerized approach + +name: Task_Runner_FedEval_DWS_E2E # Please do not modify the name as it is used in the composite action + +on: + workflow_dispatch: + inputs: + num_rounds: + description: "Number of rounds to train" + required: false + default: "5" + type: string + num_collaborators: + description: "Number of collaborators" + required: false + default: "2" + type: string + job_to_run: + description: "Job to run (tls, non_tls, no_client_auth, all)" + required: true + default: "all" + type: choice + options: + - tls + - non_tls + - no_client_auth + - all + +permissions: + contents: read + +# Environment variables common for all the jobs +env: + NUM_ROUNDS: ${{ inputs.num_rounds || '5' }} + NUM_COLLABORATORS: ${{ inputs.num_collaborators || '2' }} + JOB_TO_RUN: ${{ inputs.job_to_run || 'all' }} + +jobs: + input_selection: + if: | + (github.event_name == 'schedule' && github.repository_owner == 'securefederatedai') || + (github.event_name == 'workflow_dispatch') + name: Input value selection + runs-on: ubuntu-22.04 + outputs: + selected_jobs: ${{ env.JOB_TO_RUN }} + steps: + - name: Job to select input values + id: input_selection + run: | + echo "jobs_to_run=${{ env.JOB_TO_RUN }}" + + test_with_tls_dockerized_ws: + name: tr_tls_dockerized_ws + needs: input_selection + runs-on: ubuntu-22.04 + timeout-minutes: 15 + if: needs.input_selection.outputs.selected_jobs == 'tls' || needs.input_selection.outputs.selected_jobs == 'all' + strategy: + matrix: + model_name: ["keras_cnn_mnist"] + python_version: ["3.10"] + fail-fast: false # do not immediately fail if one of the combinations fail + + env: + MODEL_NAME: ${{ matrix.model_name }} + PYTHON_VERSION: ${{ 
matrix.python_version }} + + steps: + - name: Checkout OpenFL repository + id: checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} + + - name: Run Task Runner E2E tests with TLS + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/tr_with_fedeval_tests.py \ + -m task_runner_dockerized_ws --model_name ${{ env.MODEL_NAME }} \ + --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} + echo "Task runner end to end test run completed" + + - name: Post test run + uses: ./.github/actions/tr_post_test_run + if: ${{ always() }} + with: + test_type: "tr_tls_dockerized_ws" + + test_with_non_tls_dockerized_ws: + name: tr_non_tls_dockerized_ws + needs: input_selection + runs-on: ubuntu-22.04 + timeout-minutes: 15 + if: needs.input_selection.outputs.selected_jobs == 'non_tls' || needs.input_selection.outputs.selected_jobs == 'all' + strategy: + matrix: + model_name: ["keras_cnn_mnist"] + python_version: ["3.10"] + fail-fast: false # do not immediately fail if one of the combinations fail + + env: + MODEL_NAME: ${{ matrix.model_name }} + PYTHON_VERSION: ${{ matrix.python_version }} + + steps: + - name: Checkout OpenFL repository + id: checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} + + - name: Run Task Runner E2E tests without TLS + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/tr_with_fedeval_tests.py \ + -m task_runner_dockerized_ws --model_name ${{ env.MODEL_NAME }} \ + --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} --disable_tls + echo "Task runner end to end test 
run completed" + + - name: Post test run + uses: ./.github/actions/tr_post_test_run + if: ${{ always() }} + with: + test_type: "tr_non_tls_dockerized_ws" + + test_with_no_client_auth_dockerized_ws: + name: tr_no_client_auth_dockerized_ws + needs: input_selection + runs-on: ubuntu-22.04 + timeout-minutes: 15 + if: needs.input_selection.outputs.selected_jobs == 'no_client_auth' || needs.input_selection.outputs.selected_jobs == 'all' + strategy: + matrix: + model_name: ["keras_cnn_mnist"] + python_version: ["3.10"] + fail-fast: false # do not immediately fail if one of the combinations fail + + env: + MODEL_NAME: ${{ matrix.model_name }} + PYTHON_VERSION: ${{ matrix.python_version }} + + steps: + - name: Checkout OpenFL repository + id: checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} + + - name: Run Task Runner E2E tests without client auth + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/tr_with_fedeval_tests.py \ + -m task_runner_dockerized_ws --model_name ${{ env.MODEL_NAME }} \ + --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} --disable_client_auth + echo "Task runner end to end test run completed" + + - name: Post test run + uses: ./.github/actions/tr_post_test_run + if: ${{ always() }} + with: + test_type: "tr_no_client_auth_dockerized_ws" \ No newline at end of file diff --git a/.github/workflows/task_runner_fedeval_e2e.yml b/.github/workflows/task_runner_fedeval_e2e.yml new file mode 100644 index 0000000000..fce6408686 --- /dev/null +++ b/.github/workflows/task_runner_fedeval_e2e.yml @@ -0,0 +1,158 @@ +--- +# Task Runner E2E tests with evaluation with bare metal approach + +name: Task_Runner_FedEval_E2E # Please do not modify the name as it is used in the composite action + +on: + schedule: + - cron: "0 0 * 
* *" # Run every day at midnight + workflow_dispatch: + inputs: + num_rounds: + description: "Number of rounds to train" + required: false + default: "5" + type: string + num_collaborators: + description: "Number of collaborators" + required: false + default: "2" + type: string + +permissions: + contents: read + +# Environment variables common for all the jobs +env: + NUM_ROUNDS: ${{ inputs.num_rounds || '5' }} + NUM_COLLABORATORS: ${{ inputs.num_collaborators || '2' }} + +jobs: + test_with_tls: + name: tr_tls + runs-on: ubuntu-22.04 + timeout-minutes: 30 + strategy: + matrix: + # Models like XGBoost (xgb_higgs) and torch_cnn_histology require runners with higher memory and CPU to run. + # Thus these models are excluded from the matrix for now. + model_name: ["torch_cnn_mnist", "keras_cnn_mnist"] + python_version: ["3.10"] + fail-fast: false # do not immediately fail if one of the combinations fail + + env: + MODEL_NAME: ${{ matrix.model_name }} + PYTHON_VERSION: ${{ matrix.python_version }} + + steps: + - name: Checkout OpenFL repository + id: checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} + + - name: Run Task Runner E2E tests with TLS + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/tr_with_fedeval_tests.py \ + -m task_runner_basic --model_name ${{ env.MODEL_NAME }} \ + --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} + echo "Task runner end to end test run completed" + + - name: Post test run + uses: ./.github/actions/tr_post_test_run + if: ${{ always() }} + with: + test_type: "tr_tls" + + test_with_non_tls: + name: tr_non_tls + runs-on: ubuntu-22.04 + timeout-minutes: 30 + strategy: + matrix: + # Testing this scenario only for torch_cnn_mnist model and python 3.10 + # If required, this can be 
extended to other models and python versions + model_name: ["torch_cnn_mnist"] + python_version: ["3.10"] + fail-fast: false # do not immediately fail if one of the combinations fail + + env: + MODEL_NAME: ${{ matrix.model_name }} + PYTHON_VERSION: ${{ matrix.python_version }} + + steps: + - name: Checkout OpenFL repository + id: checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} + + - name: Run Task Runner E2E tests without TLS + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/tr_with_fedeval_tests.py \ + -m task_runner_basic --model_name ${{ env.MODEL_NAME }} \ + --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} --disable_tls + echo "Task runner end to end test run completed" + + - name: Post test run + uses: ./.github/actions/tr_post_test_run + if: ${{ always() }} + with: + test_type: "tr_non_tls" + + test_with_no_client_auth: + name: tr_no_client_auth + runs-on: ubuntu-22.04 + timeout-minutes: 30 + strategy: + matrix: + # Testing this scenario for keras_cnn_mnist model and python 3.10 + # If required, this can be extended to other models and python versions + model_name: ["keras_cnn_mnist"] + python_version: ["3.10"] + fail-fast: false # do not immediately fail if one of the combinations fail + + env: + MODEL_NAME: ${{ matrix.model_name }} + PYTHON_VERSION: ${{ matrix.python_version }} + + steps: + - name: Checkout OpenFL repository + id: checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} + + - name: Run Task Runner E2E tests without client auth + id: run_tests + run: | + python -m pytest -s 
tests/end_to_end/test_suites/tr_with_fedeval_tests.py \ + -m task_runner_basic --model_name ${{ env.MODEL_NAME }} \ + --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} --disable_client_auth + echo "Task runner end to end test run completed" + + - name: Post test run + uses: ./.github/actions/tr_post_test_run + if: ${{ always() }} + with: + test_type: "tr_no_client_auth" \ No newline at end of file diff --git a/tests/end_to_end/models/aggregator.py b/tests/end_to_end/models/aggregator.py index 795a15bed3..29c184781c 100644 --- a/tests/end_to_end/models/aggregator.py +++ b/tests/end_to_end/models/aggregator.py @@ -19,18 +19,20 @@ class Aggregator(): 2. Starting the aggregator """ - def __init__(self, agg_domain_name=None, workspace_path=None, container_id=None): + def __init__(self, agg_domain_name=None, workspace_path=None, container_id=None, eval_scope=False): """ Initialize the Aggregator class Args: agg_domain_name (str): Aggregator domain name workspace_path (str): Workspace path container_id (str): Container ID + eval_scope (bool, optional): Scope of aggregator is evaluation. Default is False. 
""" self.name = "aggregator" self.agg_domain_name = agg_domain_name self.workspace_path = workspace_path self.container_id = container_id + self.eval_scope = eval_scope def generate_sign_request(self): """ @@ -63,8 +65,11 @@ def start(self, res_file, with_docker=False): log.info(f"Starting {self.name}") res_file = res_file if not with_docker else os.path.basename(res_file) error_msg = "Failed to start the aggregator" + command = "fx aggregator start" + if self.eval_scope: + command = f"{command} --task_group evaluation" fh.run_command( - "fx aggregator start", + command=command, error_msg=error_msg, container_id=self.container_id, workspace_path=self.workspace_path if not with_docker else "", diff --git a/tests/end_to_end/models/model_owner.py b/tests/end_to_end/models/model_owner.py index f2d77f360a..d03b493cbb 100644 --- a/tests/end_to_end/models/model_owner.py +++ b/tests/end_to_end/models/model_owner.py @@ -122,7 +122,7 @@ def certify_collaborator(self, collaborator_name, zip_name): raise e return True - def modify_plan(self, param_config, plan_path): + def modify_plan(self, param_config, plan_path, eval_scope=False): """ Modify the plan to train the model Args: @@ -138,7 +138,7 @@ def modify_plan(self, param_config, plan_path): try: with open(plan_file) as fp: - data = yaml.load(fp, Loader=yaml.FullLoader) + data = yaml.safe_load(fp) # NOTE: If more parameters need to be modified, add them here data["aggregator"]["settings"]["rounds_to_train"] = int(self.rounds_to_train) @@ -153,15 +153,26 @@ def modify_plan(self, param_config, plan_path): data["network"]["settings"]["require_client_auth"] = param_config.require_client_auth data["network"]["settings"]["use_tls"] = param_config.use_tls + if eval_scope: + # Remove all existing task_groups and set num_rounds to 1 + data["assigner"]["settings"]["task_groups"] = [] + # Add new task_groups for evaluation scope with task as aggregated_model_validation + new_task_group = { + "name": "evaluation", + "percentage": 1.0, 
+ "tasks": ["aggregated_model_validation"] + } + data["assigner"]["settings"]["task_groups"].append(new_task_group) + data["aggregator"]["settings"]["rounds_to_train"] = 1 + with open(plan_file, "w+") as write_file: yaml.dump(data, write_file) - log.info(f"Modified the plan with provided parameters.") except Exception as e: log.error(f"Failed to modify the plan: {e}") raise ex.PlanModificationException(f"Failed to modify the plan: {e}") - def initialize_plan(self, agg_domain_name): + def initialize_plan(self, agg_domain_name, initial_model_path=None): """ Initialize the plan Args: @@ -170,6 +181,8 @@ def initialize_plan(self, agg_domain_name): try: log.info("Initializing the plan. It will take some time to complete..") cmd = f"fx plan initialize -a {agg_domain_name}" + if initial_model_path: + cmd += f" -i {initial_model_path}" error_msg="Failed to initialize the plan" return_code, output, error = fh.run_command( cmd, @@ -271,7 +284,7 @@ def register_collaborators(self, plan_path, num_collaborators=None): # This way even if there is a mismatch with some models having it blank # and others having values, it will be consistent with open(cols_file, "r", encoding="utf-8") as f: - doc = yaml.load(f, Loader=yaml.FullLoader) + doc = yaml.safe_load(f) doc["collaborators"] = [] # Create empty list diff --git a/tests/end_to_end/test_suites/memory_logs_tests.py b/tests/end_to_end/test_suites/memory_logs_tests.py index e4b8c3f128..09083c9512 100644 --- a/tests/end_to_end/test_suites/memory_logs_tests.py +++ b/tests/end_to_end/test_suites/memory_logs_tests.py @@ -68,11 +68,6 @@ def _log_memory_usage(request, fed_obj): # Verify the aggregator memory logs aggregator_memory_usage_file = constants.AGG_MEM_USAGE_JSON.format(fed_obj.workspace_path) - if request.config.test_env == "task_runner_dockerized_ws": - ssh.copy_file_from_docker( - "aggregator", f"/workspace/logs/aggregator_memory_usage.json", aggregator_memory_usage_file - ) - assert os.path.exists( 
aggregator_memory_usage_file ), "Aggregator memory usage file is not available" diff --git a/tests/end_to_end/test_suites/task_runner_tests.py b/tests/end_to_end/test_suites/task_runner_tests.py index a6df29af3a..5520099279 100644 --- a/tests/end_to_end/test_suites/task_runner_tests.py +++ b/tests/end_to_end/test_suites/task_runner_tests.py @@ -3,12 +3,14 @@ import pytest import logging +import os from tests.end_to_end.utils.tr_common_fixtures import ( fx_federation_tr, fx_federation_tr_dws, ) from tests.end_to_end.utils import federation_helper as fed_helper +from tests.end_to_end.utils.summary_helper import get_aggregated_accuracy log = logging.getLogger(__name__) @@ -32,6 +34,10 @@ def test_federation_via_native(request, fx_federation_tr): num_rounds=request.config.num_rounds, ), "Federation completion failed" + metric_file_path = os.path.join(fx_federation_tr.aggregator.workspace_path, "logs", "aggregator_metrics.txt") + model_accuracy = get_aggregated_accuracy(metric_file_path) + log.info(f"Model accuracy post {request.config.num_rounds} rounds: {model_accuracy}") + @pytest.mark.task_runner_dockerized_ws def test_federation_via_dockerized_workspace(request, fx_federation_tr_dws): @@ -53,3 +59,7 @@ def test_federation_via_dockerized_workspace(request, fx_federation_tr_dws): test_env=request.config.test_env, num_rounds=request.config.num_rounds, ), "Federation completion failed" + + metric_file_path = os.path.join(fx_federation_tr_dws.aggregator.workspace_path, "logs", "aggregator_metrics.txt") + model_accuracy = get_aggregated_accuracy(metric_file_path) + log.info(f"Model accuracy post {request.config.num_rounds} rounds: {model_accuracy}") diff --git a/tests/end_to_end/test_suites/tr_with_fedeval_tests.py b/tests/end_to_end/test_suites/tr_with_fedeval_tests.py new file mode 100644 index 0000000000..71d4ce4b8f --- /dev/null +++ b/tests/end_to_end/test_suites/tr_with_fedeval_tests.py @@ -0,0 +1,107 @@ +# Copyright 2020-2025 Intel Corporation +# 
SPDX-License-Identifier: Apache-2.0 + +import pytest +import os +import logging + +from tests.end_to_end.utils.tr_common_fixtures import ( + fx_federation_tr, + fx_federation_tr_dws, +) +from tests.end_to_end.utils import federation_helper as fed_helper +from tests.end_to_end.utils.tr_workspace import create_tr_workspace, create_tr_dws_workspace +from tests.end_to_end.utils.summary_helper import get_aggregated_accuracy + +log = logging.getLogger(__name__) + + +@pytest.mark.task_runner_basic +def test_eval_federation_via_native(request, fx_federation_tr): + """ + Test learning and evaluation steps via native task runner. + Args: + request (Fixture): Pytest fixture + fx_federation_tr (Fixture): Pytest fixture for native task runner + """ + # Start the federation + results = fed_helper.run_federation(fx_federation_tr) + + # Verify the completion of the federation run + assert fed_helper.verify_federation_run_completion( + fx_federation_tr, + results, + test_env=request.config.test_env, + num_rounds=request.config.num_rounds, + ), "Federation completion failed" + + # Set the best model path in request. 
It is used during plan initialization for evaluation step + request.config.best_model_path = os.path.join(fx_federation_tr.aggregator.workspace_path, "save", "best.pbuf") + metric_file_path = os.path.join(fx_federation_tr.aggregator.workspace_path, "logs", "aggregator_metrics.txt") + model_accuracy = get_aggregated_accuracy(metric_file_path) + log.info(f"Model accuracy post {request.config.num_rounds} rounds: {model_accuracy}") + # Create new workspace with evaluation scope + new_fed_obj = create_tr_workspace(request, eval_scope=True) + + results_new = fed_helper.run_federation(new_fed_obj) + # Verify the completion of the federation run + assert fed_helper.verify_federation_run_completion( + new_fed_obj, + results_new, + test_env=request.config.test_env, + num_rounds=1, + ), "Federation completion failed" + + new_metric_file_path = os.path.join(new_fed_obj.aggregator.workspace_path, "logs", "aggregator_metrics.txt") + model_accuracy_eval = get_aggregated_accuracy(new_metric_file_path) + log.info(f"Model accuracy during evaluation only on prev trained model is : {model_accuracy_eval}") + + # verify that the model accuracy is similar to the previous model accuracy max of 1% difference + assert abs(model_accuracy - model_accuracy_eval) <= 0.01, "Model accuracy is not similar to the previous model accuracy" + + +@pytest.mark.task_runner_dockerized_ws +def test_eval_federation_via_dockerized_workspace(request, fx_federation_tr_dws): + """ + Test learning and evaluation steps via dockerized workspace. 
+ Args: + request (Fixture): Pytest fixture + fx_federation_tr_dws (Fixture): Pytest fixture for dockerized workspace + """ + # Start the federation + results = fed_helper.run_federation_for_dws( + fx_federation_tr_dws, use_tls=request.config.use_tls + ) + + # Verify the completion of the federation run + assert fed_helper.verify_federation_run_completion( + fx_federation_tr_dws, + results, + test_env=request.config.test_env, + num_rounds=request.config.num_rounds, + ), "Federation completion failed" + + # Set the best model path in request. It is used during plan initialization for evaluation step + request.config.best_model_path = os.path.join(fx_federation_tr_dws.aggregator.workspace_path, "save", "best.pbuf") + metric_file_path = os.path.join(fx_federation_tr_dws.aggregator.workspace_path, "logs", "aggregator_metrics.txt") + model_accuracy = get_aggregated_accuracy(metric_file_path) + log.info(f"Model accuracy post {request.config.num_rounds} rounds: {model_accuracy}") + + # Create new workspace with evaluation scope + new_fed_obj = create_tr_dws_workspace(request, eval_scope=True) + + results_new = fed_helper.run_federation_for_dws(new_fed_obj, use_tls=request.config.use_tls) + # Verify the completion of the federation run + assert fed_helper.verify_federation_run_completion( + new_fed_obj, + results_new, + test_env=request.config.test_env, + num_rounds=1, + ), "Federation completion failed" + + new_metric_file_path = os.path.join(new_fed_obj.aggregator.workspace_path, "logs", "aggregator_metrics.txt") + model_accuracy_eval = get_aggregated_accuracy(new_metric_file_path) + log.info(f"Model accuracy during evaluation only on prev trained model is : {model_accuracy_eval}") + + # verify that the model accuracy is similar to the previous model accuracy max of 1% difference + assert abs(model_accuracy - model_accuracy_eval) <= 0.01, "Model accuracy is not similar to the previous model accuracy" diff --git a/tests/end_to_end/utils/docker_helper.py 
b/tests/end_to_end/utils/docker_helper.py index d8178652c0..f69e494cbb 100644 --- a/tests/end_to_end/utils/docker_helper.py +++ b/tests/end_to_end/utils/docker_helper.py @@ -94,7 +94,7 @@ def start_docker_container( docker_participant_path = mount_mapping[0].split(":")[1] else: local_participant_path = os.path.join(local_bind_path, container_name, "workspace") - docker_participant_path = f"{workspace_path}/{container_name}/workspace" + docker_participant_path = "/workspace" volumes = { local_participant_path: {"bind": docker_participant_path, "mode": "rw"}, diff --git a/tests/end_to_end/utils/federation_helper.py b/tests/end_to_end/utils/federation_helper.py index 9bc3a4dcd1..067d95918d 100644 --- a/tests/end_to_end/utils/federation_helper.py +++ b/tests/end_to_end/utils/federation_helper.py @@ -134,7 +134,7 @@ def _create_tarball(collaborator_name, data_file_path, local_bind_path, add_data local_bind_path, collaborator_name ) client_cert_entries = "" - tarfiles = f"cert_col_{collaborator_name}.tar plan/data.yaml" + tarfiles = f"cert_{collaborator_name}.tar plan/data.yaml" # If TLS is enabled, client certificates and signed certificates are also included if use_tls: client_cert_entries = [ @@ -262,7 +262,7 @@ def run_federation(fed_obj, install_dependencies=True, with_docker=False): ), with_docker=with_docker, ) - for participant in fed_obj.collaborators + [fed_obj.aggregator] + for participant in [fed_obj.aggregator] + fed_obj.collaborators ] # Result will contain response files for all the participants. 
@@ -287,7 +287,7 @@ def run_federation_for_dws(fed_obj, use_tls): results = [ executor.submit( run_command, - command=f"tar -xf /workspace/certs.tar", + command=f"tar -xf /workspace/cert_{participant.name}.tar", workspace_path="", error_msg=f"Failed to extract certificates for {participant.name}", container_id=participant.container_id, @@ -448,7 +448,7 @@ def _verify_completion_for_participant( return True -def federation_env_setup_and_validate(request): +def federation_env_setup_and_validate(request, eval_scope=False): """ Setup the federation environment and validate the configurations Args: @@ -470,10 +470,19 @@ def federation_env_setup_and_validate(request): local_bind_path = os.path.join( home_dir, request.config.results_dir, request.config.model_name ) + num_rounds = request.config.num_rounds + + if eval_scope: + local_bind_path = f"{local_bind_path}_eval" + num_rounds = 1 + log.info(f"Running evaluation for the model: {request.config.model_name}") + workspace_path = local_bind_path + # if path exists delete it + if os.path.exists(workspace_path): + shutil.rmtree(workspace_path) if test_env == "task_runner_dockerized_ws": - agg_domain_name = "aggregator" # Cleanup docker containers dh.cleanup_docker_containers() @@ -483,7 +492,7 @@ def federation_env_setup_and_validate(request): log.info( f"Running federation setup using {test_env} API on single machine with below configurations:\n" f"\tNumber of collaborators: {request.config.num_collaborators}\n" - f"\tNumber of rounds: {request.config.num_rounds}\n" + f"\tNumber of rounds: {num_rounds}\n" f"\tModel name: {request.config.model_name}\n" f"\tClient authentication: {request.config.require_client_auth}\n" f"\tTLS: {request.config.use_tls}\n" @@ -731,7 +740,7 @@ def download_data(collaborators, model_name, local_bind_path): log.info("Downloading the data for the model. 
This will take some time to complete based on the data size ..") try: command = ["python", constants.DATA_SETUP_FILE, str(len(collaborators))] - subprocess.run(command, cwd=local_bind_path, check=True) + subprocess.run(command, cwd=local_bind_path, check=True) # nosec B603 except Exception: raise ex.DataSetupException(f"Failed to download data for {model_name}") @@ -824,22 +833,12 @@ def start_docker_containers_for_dws( """ for participant in participants: try: - if participant.name == "aggregator": - local_ws_path = f"{local_bind_path}/aggregator/workspace" - local_cert_tar = "cert_agg.tar" - else: - local_ws_path = f"{local_bind_path}/{participant.name}/workspace" - local_cert_tar = f"cert_col_{participant.name}.tar" - # In case of dockerized workspace, the workspace gets created inside folder with image name container = dh.start_docker_container( container_name=participant.name, workspace_path=workspace_path, local_bind_path=local_bind_path, image=image_name, - mount_mapping=[ - f"{local_ws_path}/{local_cert_tar}:/{image_name}/certs.tar" - ], ) participant.container_id = container.id except Exception as e: diff --git a/tests/end_to_end/utils/summary_helper.py b/tests/end_to_end/utils/summary_helper.py index ed94d424c2..15b267ee6b 100644 --- a/tests/end_to_end/utils/summary_helper.py +++ b/tests/end_to_end/utils/summary_helper.py @@ -11,19 +11,25 @@ import tests.end_to_end.utils.constants as constants from tests.end_to_end.utils.generate_report import convert_to_json -# Initialize the XML parser -parser = etree.XMLParser(recover=True, encoding="utf-8") - result_path = os.path.join(Path().home(), "results") -result_xml = os.path.join(result_path, "results.xml") -if not os.path.exists(result_xml): - print(f"Results XML file not found at {result_xml}. Exiting...") - exit(1) -tree = defused_parse(result_xml, parser=parser) +def initialize_xml_parser(): + """ + Initialize the XML parser and parse the results XML file. 
+ Returns: + testsuites: the root element of the parsed XML tree + """ + parser = etree.XMLParser(recover=True, encoding="utf-8") + result_xml = os.path.join(result_path, "results.xml") + if not os.path.exists(result_xml): + print(f"Results XML file not found at {result_xml}. Exiting...") + exit(1) + + tree = defused_parse(result_xml, parser=parser) -# Get the root element -testsuites = tree.getroot() + # Get the root element + testsuites = tree.getroot() + return testsuites def get_aggregated_accuracy(agg_log_file): @@ -79,6 +85,8 @@ def get_testcase_result(): """ database_list = [] status = None + # Initialize the XML parser + testsuites = initialize_xml_parser() # Iterate over each testsuite in testsuites for testsuite in testsuites: # Populate testcase details in a dictionary diff --git a/tests/end_to_end/utils/tr_common_fixtures.py b/tests/end_to_end/utils/tr_common_fixtures.py index 88e02e631c..992fa6c605 100644 --- a/tests/end_to_end/utils/tr_common_fixtures.py +++ b/tests/end_to_end/utils/tr_common_fixtures.py @@ -2,23 +2,8 @@ # SPDX-License-Identifier: Apache-2.0 import pytest -import collections -import concurrent.futures -import logging -import tests.end_to_end.utils.constants as constants -import tests.end_to_end.utils.federation_helper as fh -import tests.end_to_end.utils.ssh_helper as ssh -from tests.end_to_end.models import aggregator as agg_model, model_owner as mo_model - -log = logging.getLogger(__name__) - - -# Define a named tuple to store the objects for model owner, aggregator, and collaborators -federation_fixture = collections.namedtuple( - "federation_fixture", - "model_owner, aggregator, collaborators, workspace_path, local_bind_path", -) +from tests.end_to_end.utils.tr_workspace import create_tr_workspace, create_tr_dws_workspace @pytest.fixture(scope="function") @@ -30,92 +15,12 @@ def fx_federation_tr(request): Args: request: pytest request object. Model name is passed as a parameter to the fixture from test cases. 
Returns: - federation_fixture: Named tuple containing the objects for model owner, aggregator, and collaborators + federation_details: Named tuple containing the objects for model owner, aggregator, and collaborators Note: As this is a function level fixture, thus no import is required at test level. """ request.config.test_env = "task_runner_basic" - - collaborators = [] - executor = concurrent.futures.ThreadPoolExecutor() - - model_name, workspace_path, local_bind_path, agg_domain_name = ( - fh.federation_env_setup_and_validate(request) - ) - - agg_workspace_path = constants.AGG_WORKSPACE_PATH.format(workspace_path) - - # Create model owner object and the workspace for the model - # Workspace name will be same as the model name - model_owner = mo_model.ModelOwner( - model_name, request.config.log_memory_usage, workspace_path=agg_workspace_path - ) - - # Create workspace for given model name - fh.create_persistent_store(model_owner.name, local_bind_path) - - model_owner.create_workspace() - fh.add_local_workspace_permission(local_bind_path) - - # Modify the plan - plan_path = constants.AGG_PLAN_PATH.format(local_bind_path) - model_owner.modify_plan(param_config=request.config, plan_path=plan_path) - - # Initialize the plan - model_owner.initialize_plan(agg_domain_name=agg_domain_name) - - # Certify the workspace in case of TLS - # Register the collaborators in case of non-TLS - if request.config.use_tls: - model_owner.certify_workspace() - else: - model_owner.register_collaborators(plan_path, request.config.num_collaborators) - - # Create the objects for aggregator and collaborators - # Workspace path for aggregator is uniform in case of docker or task_runner - # But, for collaborators, it is different - aggregator = agg_model.Aggregator( - agg_domain_name=agg_domain_name, - workspace_path=agg_workspace_path, - container_id=model_owner.container_id, # None in case of non-docker environment - ) - - # Generate the sign request and certify the aggregator in case of 
TLS - if request.config.use_tls: - aggregator.generate_sign_request() - model_owner.certify_aggregator(agg_domain_name) - - # Export the workspace - # By default the workspace will be exported to workspace.zip - model_owner.export_workspace() - - futures = [ - executor.submit( - fh.setup_collaborator, - index, - workspace_path=workspace_path, - local_bind_path=local_bind_path, - ) - for index in range(1, request.config.num_collaborators+1) - ] - collaborators = [f.result() for f in futures] - - # Data setup requires total no of collaborators, thus keeping the function call outside of the loop - if model_name.lower() == "xgb_higgs": - fh.setup_collaborator_data(collaborators, model_name, local_bind_path) - - if request.config.use_tls: - fh.setup_pki_for_collaborators(collaborators, model_owner, local_bind_path) - fh.import_pki_for_collaborators(collaborators, local_bind_path) - - # Return the federation fixture - return federation_fixture( - model_owner=model_owner, - aggregator=aggregator, - collaborators=collaborators, - workspace_path=workspace_path, - local_bind_path=local_bind_path, - ) + return create_tr_workspace(request) @pytest.fixture(scope="function") @@ -127,115 +32,9 @@ def fx_federation_tr_dws(request): Args: request: pytest request object. Model name is passed as a parameter to the fixture from test cases. Returns: - federation_fixture: Named tuple containing the objects for model owner, aggregator, and collaborators + federation_details: Named tuple containing the objects for model owner, aggregator, and collaborators Note: As this is a function level fixture, thus no import is required at test level. 
""" request.config.test_env = "task_runner_dockerized_ws" - - collaborators = [] - executor = concurrent.futures.ThreadPoolExecutor() - - model_name, workspace_path, local_bind_path, agg_domain_name = ( - fh.federation_env_setup_and_validate(request) - ) - - agg_workspace_path = constants.AGG_WORKSPACE_PATH.format(workspace_path) - - # Create model owner object and the workspace for the model - # Workspace name will be same as the model name - model_owner = mo_model.ModelOwner( - model_name, request.config.log_memory_usage, workspace_path=agg_workspace_path - ) - - # Create workspace for given model name - fh.create_persistent_store(model_owner.name, local_bind_path) - - model_owner.create_workspace() - fh.add_local_workspace_permission(local_bind_path) - - # Modify the plan - plan_path = constants.AGG_PLAN_PATH.format(local_bind_path) - model_owner.modify_plan(param_config=request.config, plan_path=plan_path) - - # Initialize the plan - model_owner.initialize_plan(agg_domain_name=agg_domain_name) - - # Command 'fx workspace dockerize --save ..' will use the workspace name for image name - # which is 'workspace' in this case. 
- model_owner.dockerize_workspace() - image_name = "workspace" - - # Certify the workspace in case of TLS - # Register the collaborators in case of non-TLS - if request.config.use_tls: - model_owner.certify_workspace() - else: - model_owner.register_collaborators(plan_path, request.config.num_collaborators) - - # Create the objects for aggregator and collaborators - # Workspace path for aggregator is uniform in case of docker or task_runner - # But, for collaborators, it is different - aggregator = agg_model.Aggregator( - agg_domain_name=agg_domain_name, - workspace_path=agg_workspace_path, - container_id=model_owner.container_id, # None in case of non-docker environment - ) - - futures = [ - executor.submit( - fh.setup_collaborator, - index, - workspace_path=workspace_path, - local_bind_path=local_bind_path, - ) - for index in range(1, request.config.num_collaborators+1) - ] - collaborators = [f.result() for f in futures] - - if request.config.use_tls: - fh.setup_pki_for_collaborators(collaborators, model_owner, local_bind_path) - - # Data setup requires total no of collaborators, thus keeping the function call outside of the loop - if model_name.lower() == "xgb_higgs": - fh.setup_collaborator_data(collaborators, model_name, local_bind_path) - - # Note: In case of multiple machines setup, scp the created tar for collaborators to the other machine(s) - fh.create_tarball_for_collaborators( - collaborators, local_bind_path, use_tls=request.config.use_tls, - add_data=True if model_name.lower() == "xgb_higgs" else False - ) - - # Generate the sign request and certify the aggregator in case of TLS - if request.config.use_tls: - aggregator.generate_sign_request() - model_owner.certify_aggregator(agg_domain_name) - - local_agg_ws_path = constants.AGG_WORKSPACE_PATH.format(local_bind_path) - - # Note: In case of multiple machines setup, scp this tar to the other machine(s) - return_code, output, error = ssh.run_command( - f"tar -cf cert_agg.tar plan cert save", 
# Copyright 2020-2025 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import collections
import concurrent.futures
import logging

import tests.end_to_end.utils.constants as constants
import tests.end_to_end.utils.federation_helper as fh
import tests.end_to_end.utils.ssh_helper as ssh
from tests.end_to_end.models import aggregator as agg_model, model_owner as mo_model

log = logging.getLogger(__name__)


# Define a named tuple to store the objects for model owner, aggregator, and collaborators
federation_details = collections.namedtuple(
    "federation_details",
    "model_owner, aggregator, collaborators, workspace_path, local_bind_path",
)

# Internal bundle of the values produced by the shared workspace prologue.
_WorkspaceSetup = collections.namedtuple(
    "_WorkspaceSetup",
    "model_owner, model_name, workspace_path, local_bind_path, "
    "agg_domain_name, agg_workspace_path, plan_path",
)


def _prepare_model_owner(request, eval_scope):
    """Create the model owner, its workspace, and an initialized plan.

    Args:
        request (object): Pytest request object.
        eval_scope (bool): If True, the plan is modified for a single round and
            initialized from ``request.config.best_model_path``.

    Returns:
        _WorkspaceSetup: Model owner plus the names and paths the callers need.
    """
    model_name, workspace_path, local_bind_path, agg_domain_name = (
        fh.federation_env_setup_and_validate(request, eval_scope)
    )

    agg_workspace_path = constants.AGG_WORKSPACE_PATH.format(workspace_path)

    # Create model owner object and the workspace for the model
    # Workspace name will be same as the model name
    model_owner = mo_model.ModelOwner(
        model_name, request.config.log_memory_usage, workspace_path=agg_workspace_path
    )

    # Create workspace for given model name
    fh.create_persistent_store(model_owner.name, local_bind_path)

    model_owner.create_workspace()
    fh.add_local_workspace_permission(local_bind_path)

    # Modify the plan
    plan_path = constants.AGG_PLAN_PATH.format(local_bind_path)

    # NOTE: param_config is the same object as request.config (no copy is made),
    # so the num_rounds override below is written onto request.config itself.
    param_config = request.config

    initial_model_path = None
    if eval_scope:
        log.info("Setting up evaluation scope, update the plan for 1 round and initial model to previous experiment best model")
        param_config.num_rounds = 1
        initial_model_path = request.config.best_model_path

    model_owner.modify_plan(param_config, plan_path=plan_path, eval_scope=eval_scope)

    # Initialize the plan
    model_owner.initialize_plan(
        agg_domain_name=agg_domain_name, initial_model_path=initial_model_path
    )

    return _WorkspaceSetup(
        model_owner=model_owner,
        model_name=model_name,
        workspace_path=workspace_path,
        local_bind_path=local_bind_path,
        agg_domain_name=agg_domain_name,
        agg_workspace_path=agg_workspace_path,
        plan_path=plan_path,
    )


def _certify_or_register(request, setup):
    """Certify the workspace in case of TLS, otherwise register the collaborators."""
    if request.config.use_tls:
        setup.model_owner.certify_workspace()
    else:
        setup.model_owner.register_collaborators(
            setup.plan_path, request.config.num_collaborators
        )


def _create_aggregator(setup, eval_scope):
    """Build the aggregator object bound to the model owner's workspace."""
    # Workspace path for aggregator is uniform in case of docker or task_runner
    # But, for collaborators, it is different
    return agg_model.Aggregator(
        agg_domain_name=setup.agg_domain_name,
        workspace_path=setup.agg_workspace_path,
        container_id=setup.model_owner.container_id,  # None in case of non-docker environment
        eval_scope=eval_scope,
    )


def _setup_collaborators(request, setup):
    """Set up collaborators 1..num_collaborators in parallel and return them in index order."""
    # The context manager shuts the pool down once every future has been
    # collected, so worker threads are not left behind after each call.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(
                fh.setup_collaborator,
                index,
                workspace_path=setup.workspace_path,
                local_bind_path=setup.local_bind_path,
            )
            for index in range(1, request.config.num_collaborators + 1)
        ]
        return [future.result() for future in futures]


def create_tr_workspace(request, eval_scope=False):
    """Create a task runner workspace.

    Args:
        request (object): Pytest request object.
        eval_scope (bool, optional): If True, sets up the evaluation scope for a single round. Defaults to False.

    Returns:
        tuple: A named tuple containing the objects for model owner, aggregator, and collaborators.
    """
    setup = _prepare_model_owner(request, eval_scope)
    model_owner = setup.model_owner
    use_tls = request.config.use_tls

    _certify_or_register(request, setup)
    aggregator = _create_aggregator(setup, eval_scope)

    # Generate the sign request and certify the aggregator in case of TLS
    if use_tls:
        aggregator.generate_sign_request()
        model_owner.certify_aggregator(setup.agg_domain_name)

    # Export the workspace
    # By default the workspace will be exported to workspace.zip
    model_owner.export_workspace()

    collaborators = _setup_collaborators(request, setup)

    # Data setup requires total no of collaborators, thus keeping the function call outside of the loop
    if setup.model_name.lower() == "xgb_higgs":
        fh.setup_collaborator_data(collaborators, setup.model_name, setup.local_bind_path)

    if use_tls:
        fh.setup_pki_for_collaborators(collaborators, model_owner, setup.local_bind_path)
        fh.import_pki_for_collaborators(collaborators, setup.local_bind_path)

    # Return the federation fixture
    return federation_details(
        model_owner=model_owner,
        aggregator=aggregator,
        collaborators=collaborators,
        workspace_path=setup.workspace_path,
        local_bind_path=setup.local_bind_path,
    )


def create_tr_dws_workspace(request, eval_scope=False):
    """Run task runner experiment through dockerized workspace.

    Args:
        request (object): Pytest request object.
        eval_scope (bool, optional): If True, sets up the evaluation scope for a single round. Defaults to False.

    Returns:
        tuple: A named tuple containing the objects for model owner, aggregator, and collaborators.

    Raises:
        Exception: If creating the aggregator tar archive returns a non-zero code.
    """
    setup = _prepare_model_owner(request, eval_scope)
    model_owner = setup.model_owner
    local_bind_path = setup.local_bind_path
    use_tls = request.config.use_tls

    # Command 'fx workspace dockerize --save ..' will use the workspace name for image name
    # which is 'workspace' in this case.
    model_owner.dockerize_workspace()
    image_name = "workspace"

    _certify_or_register(request, setup)
    aggregator = _create_aggregator(setup, eval_scope)

    collaborators = _setup_collaborators(request, setup)

    if use_tls:
        fh.setup_pki_for_collaborators(collaborators, model_owner, local_bind_path)

    # Data setup requires total no of collaborators, thus keeping the function call outside of the loop
    needs_data = setup.model_name.lower() == "xgb_higgs"
    if needs_data:
        fh.setup_collaborator_data(collaborators, setup.model_name, local_bind_path)

    # Note: In case of multiple machines setup, scp the created tar for collaborators to the other machine(s)
    fh.create_tarball_for_collaborators(
        collaborators, local_bind_path, use_tls=use_tls, add_data=needs_data
    )

    # Generate the sign request and certify the aggregator in case of TLS
    if use_tls:
        aggregator.generate_sign_request()
        model_owner.certify_aggregator(setup.agg_domain_name)

    local_agg_ws_path = constants.AGG_WORKSPACE_PATH.format(local_bind_path)

    # Note: In case of multiple machines setup, scp this tar to the other machine(s)
    return_code, _, error = ssh.run_command(
        "tar -cf cert_aggregator.tar plan cert save", work_dir=local_agg_ws_path
    )
    if return_code != 0:
        raise Exception(f"Failed to create tar for aggregator: {error}")

    # Note: In case of multiple machines setup, scp this workspace tar
    # to the other machine(s) so that docker load can load the image.
    model_owner.load_workspace(workspace_tar_name=f"{image_name}.tar")

    fh.start_docker_containers_for_dws(
        participants=[aggregator] + collaborators,
        workspace_path=setup.workspace_path,
        local_bind_path=local_bind_path,
        image_name=image_name,
    )

    # Return the federation fixture
    return federation_details(
        model_owner=model_owner,
        aggregator=aggregator,
        collaborators=collaborators,
        workspace_path=setup.workspace_path,
        local_bind_path=local_bind_path,
    )