diff --git a/cg/meta/workflow/nf_analysis.py b/cg/meta/workflow/nf_analysis.py index b679c77194..e8642d0e4a 100644 --- a/cg/meta/workflow/nf_analysis.py +++ b/cg/meta/workflow/nf_analysis.py @@ -205,6 +205,66 @@ def write_trailblazer_config(self, case_id: str, tower_id: str) -> None: file_path=config_path, ) + def _run_analysis_with_nextflow( + self, case_id: str, command_args: CommandArgs, dry_run: bool + ) -> None: + """Run analysis with given options using Nextflow.""" + self.process = Process( + binary=self.nextflow_binary_path, + environment=self.conda_env, + conda_binary=self.conda_binary, + launch_directory=self.get_case_path(case_id=case_id), + ) + LOG.info("Pipeline will be executed using Nextflow") + parameters: list[str] = NextflowHandler.get_nextflow_run_parameters( + case_id=case_id, + pipeline_path=self.nfcore_pipeline_path, + root_dir=self.root_dir, + command_args=command_args.dict(), + ) + self.process.export_variables( + export=NextflowHandler.get_variables_to_export(), + ) + command: str = self.process.get_command(parameters=parameters) + LOG.info(f"{command}") + sbatch_number: int = NextflowHandler.execute_head_job( + case_id=case_id, + case_directory=self.get_case_path(case_id=case_id), + slurm_account=self.account, + email=self.email, + qos=self.get_slurm_qos_for_case(case_id=case_id), + commands=command, + dry_run=dry_run, + ) + LOG.info(f"Nextflow head job running as job: {sbatch_number}") + + def _run_analysis_with_tower( + self, case_id: str, command_args: CommandArgs, dry_run: bool + ) -> None: + """Run analysis with given options using NF-Tower.""" + LOG.info("Pipeline will be executed using Tower") + if command_args.resume: + from_tower_id: int = command_args.id or NfTowerHandler.get_last_tower_id( + case_id=case_id, + trailblazer_config=self.get_trailblazer_config_path(case_id=case_id), + ) + LOG.info(f"Pipeline will be resumed from run with Tower id: {from_tower_id}.") + parameters: list[str] = NfTowerHandler.get_tower_relaunch_parameters( + from_tower_id=from_tower_id, command_args=command_args.dict() + ) + else: + parameters: list[str] = NfTowerHandler.get_tower_launch_parameters( + tower_pipeline=self.tower_pipeline, + command_args=command_args.dict(), + ) + self.process.run_command(parameters=parameters, dry_run=dry_run) + if self.process.stderr: + LOG.error(self.process.stderr) + if not dry_run: + tower_id = NfTowerHandler.get_tower_id(stdout_lines=self.process.stdout_lines()) + self.write_trailblazer_config(case_id=case_id, tower_id=tower_id) + LOG.info(self.process.stdout) + def run_analysis( self, case_id: str, @@ -214,61 +274,17 @@ def run_analysis( ) -> None: """Execute run analysis with given options.""" if use_nextflow: - self.process = Process( - binary=self.nextflow_binary_path, - environment=self.conda_env, - conda_binary=self.conda_binary, - launch_directory=self.get_case_path(case_id=case_id), - ) - LOG.info("Pipeline will be executed using nextflow") - parameters: list[str] = NextflowHandler.get_nextflow_run_parameters( + self._run_analysis_with_nextflow( case_id=case_id, - pipeline_path=self.nfcore_pipeline_path, - root_dir=self.root_dir, - command_args=command_args.dict(), - ) - self.process.export_variables( - export=NextflowHandler.get_variables_to_export(), + command_args=command_args, + dry_run=dry_run, ) - - command = self.process.get_command(parameters=parameters) - LOG.info(f"{command}") - sbatch_number: int = NextflowHandler.execute_head_job( + else: + self._run_analysis_with_tower( case_id=case_id, - case_directory=self.get_case_path(case_id=case_id), - slurm_account=self.account, - email=self.email, - qos=self.get_slurm_qos_for_case(case_id=case_id), - commands=command, + command_args=command_args, dry_run=dry_run, ) - LOG.info(f"Nextflow head job running as job {sbatch_number}") - - else: - LOG.info("Pipeline will be executed using tower") - if command_args.resume: - from_tower_id: int = command_args.id - if not from_tower_id: - from_tower_id: int = NfTowerHandler.get_last_tower_id( - case_id=case_id, - trailblazer_config=self.get_trailblazer_config_path(case_id=case_id), - ) - LOG.info(f"Pipeline will be resumed from run {from_tower_id}.") - parameters: list[str] = NfTowerHandler.get_tower_relaunch_parameters( - from_tower_id=from_tower_id, command_args=command_args.dict() - ) - else: - parameters: list[str] = NfTowerHandler.get_tower_launch_parameters( - tower_pipeline=self.tower_pipeline, - command_args=command_args.dict(), - ) - self.process.run_command(parameters=parameters, dry_run=dry_run) - if self.process.stderr: - LOG.error(self.process.stderr) - if not dry_run: - tower_id = NfTowerHandler.get_tower_id(stdout_lines=self.process.stdout_lines()) - self.write_trailblazer_config(case_id=case_id, tower_id=tower_id) - LOG.info(self.process.stdout) @staticmethod def get_deliverables_template_content() -> list[dict]: diff --git a/tests/cli/workflow/rnafusion/test_cli_rnafusion_run.py b/tests/cli/workflow/rnafusion/test_cli_rnafusion_run.py index a03591193c..f4697319e6 100644 --- a/tests/cli/workflow/rnafusion/test_cli_rnafusion_run.py +++ b/tests/cli/workflow/rnafusion/test_cli_rnafusion_run.py @@ -116,7 +116,7 @@ def test_with_config_use_nextflow( assert result.exit_code == EXIT_SUCCESS # THEN command should use nextflow - assert "using nextflow" in caplog.text + assert "using Nextflow" in caplog.text assert "path/to/bin/nextflow" in caplog.text assert "-work-dir" in caplog.text @@ -145,7 +145,7 @@ def test_with_config( assert result.exit_code == EXIT_SUCCESS # THEN command should use tower - assert "using tower" in caplog.text + assert "using Tower" in caplog.text assert "path/to/bin/tw launch" in caplog.text assert "--work-dir" in caplog.text diff --git a/tests/cli/workflow/taxprofiler/test_cli_taxprofiler_run.py b/tests/cli/workflow/taxprofiler/test_cli_taxprofiler_run.py index 70e14d12b3..e76b54fbbe 100644 --- a/tests/cli/workflow/taxprofiler/test_cli_taxprofiler_run.py +++ b/tests/cli/workflow/taxprofiler/test_cli_taxprofiler_run.py @@ -65,7 +65,7 @@ def test_with_config_use_nextflow( assert result.exit_code == EXIT_SUCCESS for message in [ - "using nextflow", + "using Nextflow", "path/to/bin/nextflow", "-work-dir", "-params-file", @@ -97,7 +97,7 @@ def test_with_config_use_tower( # THEN command should use tower for message in [ - "using tower", + "using Tower", "path/to/bin/tw launch", ]: assert message in caplog.text