Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up run_analysis method for nf_analysis #2772

Merged
merged 9 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 66 additions & 50 deletions cg/meta/workflow/nf_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,66 @@ def write_trailblazer_config(self, case_id: str, tower_id: str) -> None:
file_path=config_path,
)

def _run_analysis_with_nextflow(
self, case_id: str, command_args: CommandArgs, dry_run: bool
) -> None:
"""Run analysis with given options using Nextflow."""
self.process = Process(
binary=self.nextflow_binary_path,
environment=self.conda_env,
conda_binary=self.conda_binary,
launch_directory=self.get_case_path(case_id=case_id),
)
LOG.info("Pipeline will be executed using Nextflow")
parameters: list[str] = NextflowHandler.get_nextflow_run_parameters(
case_id=case_id,
pipeline_path=self.nfcore_pipeline_path,
root_dir=self.root_dir,
command_args=command_args.dict(),
)
self.process.export_variables(
export=NextflowHandler.get_variables_to_export(),
)
command: str = self.process.get_command(parameters=parameters)
LOG.info(f"{command}")
sbatch_number: int = NextflowHandler.execute_head_job(
case_id=case_id,
case_directory=self.get_case_path(case_id=case_id),
slurm_account=self.account,
email=self.email,
qos=self.get_slurm_qos_for_case(case_id=case_id),
commands=command,
dry_run=dry_run,
)
LOG.info(f"Nextflow head job running as job: {sbatch_number}")

def _run_analysis_with_tower(
self, case_id: str, command_args: CommandArgs, dry_run: bool
) -> None:
"""Run analysis with given options using NF-Tower."""
LOG.info("Pipeline will be executed using Tower")
if command_args.resume:
from_tower_id: int = command_args.id or NfTowerHandler.get_last_tower_id(
case_id=case_id,
trailblazer_config=self.get_trailblazer_config_path(case_id=case_id),
)
LOG.info(f"Pipeline will be resumed from run with Tower id: {from_tower_id}.")
parameters: list[str] = NfTowerHandler.get_tower_relaunch_parameters(
from_tower_id=from_tower_id, command_args=command_args.dict()
)
else:
parameters: list[str] = NfTowerHandler.get_tower_launch_parameters(
tower_pipeline=self.tower_pipeline,
command_args=command_args.dict(),
)
self.process.run_command(parameters=parameters, dry_run=dry_run)
if self.process.stderr:
LOG.error(self.process.stderr)
if not dry_run:
tower_id = NfTowerHandler.get_tower_id(stdout_lines=self.process.stdout_lines())
self.write_trailblazer_config(case_id=case_id, tower_id=tower_id)
LOG.info(self.process.stdout)

def run_analysis(
self,
case_id: str,
Expand All @@ -214,61 +274,17 @@ def run_analysis(
) -> None:
"""Execute run analysis with given options."""
if use_nextflow:
self.process = Process(
binary=self.nextflow_binary_path,
environment=self.conda_env,
conda_binary=self.conda_binary,
launch_directory=self.get_case_path(case_id=case_id),
)
LOG.info("Pipeline will be executed using nextflow")
parameters: list[str] = NextflowHandler.get_nextflow_run_parameters(
self._run_analysis_with_nextflow(
case_id=case_id,
pipeline_path=self.nfcore_pipeline_path,
root_dir=self.root_dir,
command_args=command_args.dict(),
)
self.process.export_variables(
export=NextflowHandler.get_variables_to_export(),
command_args=command_args,
dry_run=dry_run,
)

command = self.process.get_command(parameters=parameters)
LOG.info(f"{command}")
sbatch_number: int = NextflowHandler.execute_head_job(
else:
self._run_analysis_with_tower(
case_id=case_id,
case_directory=self.get_case_path(case_id=case_id),
slurm_account=self.account,
email=self.email,
qos=self.get_slurm_qos_for_case(case_id=case_id),
commands=command,
command_args=command_args,
dry_run=dry_run,
)
LOG.info(f"Nextflow head job running as job {sbatch_number}")

else:
LOG.info("Pipeline will be executed using tower")
if command_args.resume:
from_tower_id: int = command_args.id
if not from_tower_id:
from_tower_id: int = NfTowerHandler.get_last_tower_id(
case_id=case_id,
trailblazer_config=self.get_trailblazer_config_path(case_id=case_id),
)
LOG.info(f"Pipeline will be resumed from run {from_tower_id}.")
parameters: list[str] = NfTowerHandler.get_tower_relaunch_parameters(
from_tower_id=from_tower_id, command_args=command_args.dict()
)
else:
parameters: list[str] = NfTowerHandler.get_tower_launch_parameters(
tower_pipeline=self.tower_pipeline,
command_args=command_args.dict(),
)
self.process.run_command(parameters=parameters, dry_run=dry_run)
if self.process.stderr:
LOG.error(self.process.stderr)
if not dry_run:
tower_id = NfTowerHandler.get_tower_id(stdout_lines=self.process.stdout_lines())
self.write_trailblazer_config(case_id=case_id, tower_id=tower_id)
LOG.info(self.process.stdout)

@staticmethod
def get_deliverables_template_content() -> list[dict]:
Expand Down
4 changes: 2 additions & 2 deletions tests/cli/workflow/rnafusion/test_cli_rnafusion_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def test_with_config_use_nextflow(
assert result.exit_code == EXIT_SUCCESS

# THEN command should use nextflow
assert "using nextflow" in caplog.text
assert "using Nextflow" in caplog.text
assert "path/to/bin/nextflow" in caplog.text
assert "-work-dir" in caplog.text

Expand Down Expand Up @@ -145,7 +145,7 @@ def test_with_config(
assert result.exit_code == EXIT_SUCCESS

# THEN command should use tower
assert "using tower" in caplog.text
assert "using Tower" in caplog.text
assert "path/to/bin/tw launch" in caplog.text
assert "--work-dir" in caplog.text

Expand Down
4 changes: 2 additions & 2 deletions tests/cli/workflow/taxprofiler/test_cli_taxprofiler_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def test_with_config_use_nextflow(
assert result.exit_code == EXIT_SUCCESS

for message in [
"using nextflow",
"using Nextflow",
"path/to/bin/nextflow",
"-work-dir",
"-params-file",
Expand Down Expand Up @@ -97,7 +97,7 @@ def test_with_config_use_tower(
# THEN command should use tower

for message in [
"using tower",
"using Tower",
"path/to/bin/tw launch",
]:
assert message in caplog.text
Expand Down
Loading