Skip to content

Commit

Permalink
split massive method
Browse files Browse the repository at this point in the history
  • Loading branch information
fevac committed Dec 14, 2023
1 parent 562eeeb commit 773b792
Showing 1 changed file with 66 additions and 50 deletions.
116 changes: 66 additions & 50 deletions cg/meta/workflow/nf_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,66 @@ def write_trailblazer_config(self, case_id: str, tower_id: str) -> None:
file_path=config_path,
)

def _run_analysis_with_nextflow(
self, case_id: str, command_args: CommandArgs, dry_run: bool
) -> None:
"""Run analysis with given options using Nextflow."""
self.process = Process(
binary=self.nextflow_binary_path,
environment=self.conda_env,
conda_binary=self.conda_binary,
launch_directory=self.get_case_path(case_id=case_id),
)
LOG.info("Pipeline will be executed using nextflow")
parameters: list[str] = NextflowHandler.get_nextflow_run_parameters(
case_id=case_id,
pipeline_path=self.nfcore_pipeline_path,
root_dir=self.root_dir,
command_args=command_args.dict(),
)
self.process.export_variables(
export=NextflowHandler.get_variables_to_export(),
)
command = self.process.get_command(parameters=parameters)
LOG.info(f"{command}")
sbatch_number: int = NextflowHandler.execute_head_job(
case_id=case_id,
case_directory=self.get_case_path(case_id=case_id),
slurm_account=self.account,
email=self.email,
qos=self.get_slurm_qos_for_case(case_id=case_id),
commands=command,
dry_run=dry_run,
)
LOG.info(f"Nextflow head job running as job {sbatch_number}")

def _run_analysis_with_tower(
self, case_id: str, command_args: CommandArgs, dry_run: bool
) -> None:
"""Run analysis with given options using NF-Tower."""
LOG.info("Pipeline will be executed using tower")
if command_args.resume:
from_tower_id: int = command_args.id or NfTowerHandler.get_last_tower_id(
case_id=case_id,
trailblazer_config=self.get_trailblazer_config_path(case_id=case_id),
)
LOG.info(f"Pipeline will be resumed from run {from_tower_id}.")
parameters: list[str] = NfTowerHandler.get_tower_relaunch_parameters(
from_tower_id=from_tower_id, command_args=command_args.dict()
)
else:
parameters: list[str] = NfTowerHandler.get_tower_launch_parameters(
tower_pipeline=self.tower_pipeline,
command_args=command_args.dict(),
)
self.process.run_command(parameters=parameters, dry_run=dry_run)
if self.process.stderr:
LOG.error(self.process.stderr)
if not dry_run:
tower_id = NfTowerHandler.get_tower_id(stdout_lines=self.process.stdout_lines())
self.write_trailblazer_config(case_id=case_id, tower_id=tower_id)
LOG.info(self.process.stdout)

def run_analysis(
self,
case_id: str,
Expand All @@ -210,61 +270,17 @@ def run_analysis(
) -> None:
"""Execute run analysis with given options."""
if use_nextflow:
self.process = Process(
binary=self.nextflow_binary_path,
environment=self.conda_env,
conda_binary=self.conda_binary,
launch_directory=self.get_case_path(case_id=case_id),
)
LOG.info("Pipeline will be executed using nextflow")
parameters: list[str] = NextflowHandler.get_nextflow_run_parameters(
self._run_analysis_with_nextflow(
case_id=case_id,
pipeline_path=self.nfcore_pipeline_path,
root_dir=self.root_dir,
command_args=command_args.dict(),
)
self.process.export_variables(
export=NextflowHandler.get_variables_to_export(),
command_args=command_args,
dry_run=dry_run,
)

command = self.process.get_command(parameters=parameters)
LOG.info(f"{command}")
sbatch_number: int = NextflowHandler.execute_head_job(
else:
self._run_analysis_with_tower(
case_id=case_id,
case_directory=self.get_case_path(case_id=case_id),
slurm_account=self.account,
email=self.email,
qos=self.get_slurm_qos_for_case(case_id=case_id),
commands=command,
command_args=command_args,
dry_run=dry_run,
)
LOG.info(f"Nextflow head job running as job {sbatch_number}")

else:
LOG.info("Pipeline will be executed using tower")
if command_args.resume:
from_tower_id: int = command_args.id
if not from_tower_id:
from_tower_id: int = NfTowerHandler.get_last_tower_id(
case_id=case_id,
trailblazer_config=self.get_trailblazer_config_path(case_id=case_id),
)
LOG.info(f"Pipeline will be resumed from run {from_tower_id}.")
parameters: list[str] = NfTowerHandler.get_tower_relaunch_parameters(
from_tower_id=from_tower_id, command_args=command_args.dict()
)
else:
parameters: list[str] = NfTowerHandler.get_tower_launch_parameters(
tower_pipeline=self.tower_pipeline,
command_args=command_args.dict(),
)
self.process.run_command(parameters=parameters, dry_run=dry_run)
if self.process.stderr:
LOG.error(self.process.stderr)
if not dry_run:
tower_id = NfTowerHandler.get_tower_id(stdout_lines=self.process.stdout_lines())
self.write_trailblazer_config(case_id=case_id, tower_id=tower_id)
LOG.info(self.process.stdout)

@staticmethod
def get_deliverables_template_content() -> list[dict]:
Expand Down

0 comments on commit 773b792

Please sign in to comment.