Skip to content

Commit

Permalink
Merge branch 'master' into fetche_tissue_type_from_original_sample
Browse files Browse the repository at this point in the history
  • Loading branch information
rannick authored Nov 27, 2024
2 parents 394b6f3 + 9037ced commit e33c508
Show file tree
Hide file tree
Showing 18 changed files with 144 additions and 177 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 64.5.13
current_version = 64.5.14
commit = True
tag = True
tag_name = v{new_version}
Expand Down
2 changes: 1 addition & 1 deletion cg/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
__title__ = "cg"
__version__ = "64.5.13"
__version__ = "64.5.14"
15 changes: 0 additions & 15 deletions cg/io/config.py

This file was deleted.

99 changes: 63 additions & 36 deletions cg/meta/workflow/nf_analysis.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import copy
import logging
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Iterator
Expand All @@ -20,11 +22,10 @@
from cg.constants.nf_analysis import NfTowerStatus
from cg.constants.tb import AnalysisStatus
from cg.exc import CgError, HousekeeperStoreError, MetricsQCError
from cg.io.config import write_config_nextflow_style
from cg.io.controller import ReadFile, WriteFile
from cg.io.json import read_json
from cg.io.txt import concat_txt, write_txt
from cg.io.yaml import write_yaml_nextflow_style
from cg.io.yaml import read_yaml, write_yaml_nextflow_style
from cg.meta.workflow.analysis import AnalysisAPI
from cg.meta.workflow.nf_handlers import NextflowHandler, NfTowerHandler
from cg.meta.workflow.utils.genome_build_helpers import get_genome_build
Expand Down Expand Up @@ -55,14 +56,15 @@ def __init__(self, config: CGConfig, workflow: Workflow):
super().__init__(workflow=workflow, config=config)
self.workflow: Workflow = workflow
self.root_dir: str | None = None
self.nfcore_workflow_path: str | None = None
self.workflow_bin_path: str | None = None
self.references: str | None = None
self.profile: str | None = None
self.conda_env: str | None = None
self.conda_binary: str | None = None
self.config_platform: str | None = None
self.config_params: str | None = None
self.config_resources: str | None = None
self.platform: str | None = None
self.params: str | None = None
self.config: str | None = None
self.resources: str | None = None
self.tower_binary_path: str | None = None
self.tower_workflow: str | None = None
self.account: str | None = None
Expand Down Expand Up @@ -98,11 +100,6 @@ def sample_sheet_headers(self) -> list[str]:
"""Headers for sample sheet."""
raise NotImplementedError

@property
def is_params_appended_to_nextflow_config(self) -> bool:
"""Return True if parameters should be added into the nextflow config file instead of the params file."""
return False

@property
def is_multiqc_pattern_search_exact(self) -> bool:
"""Return True if only exact pattern search is allowed to collect metrics information from MultiQC file.
Expand Down Expand Up @@ -131,24 +128,20 @@ def get_workflow_version(self, case_id: str) -> str:
"""Get workflow version from config."""
return self.revision

def get_workflow_parameters(self, case_id: str) -> WorkflowParameters:
def get_built_workflow_parameters(self, case_id: str) -> WorkflowParameters:
"""Return workflow parameters."""
raise NotImplementedError

def get_nextflow_config_content(self, case_id: str) -> str:
"""Return nextflow config content."""
config_files_list: list[str] = [
self.config_platform,
self.config_params,
self.config_resources,
self.platform,
self.config,
self.resources,
]
extra_parameters_str: list[str] = [
self.set_cluster_options(case_id=case_id),
]
if self.is_params_appended_to_nextflow_config:
extra_parameters_str.append(
write_config_nextflow_style(self.get_workflow_parameters(case_id=case_id).dict())
)
return concat_txt(
file_paths=config_files_list,
str_content=extra_parameters_str,
Expand Down Expand Up @@ -284,12 +277,12 @@ def verify_deliverables_file_exists(self, case_id: str) -> None:
if not Path(self.get_deliverables_file_path(case_id=case_id)).exists():
raise CgError(f"No deliverables file found for case {case_id}")

def write_params_file(self, case_id: str, workflow_parameters: dict = None) -> None:
def write_params_file(self, case_id: str, replaced_workflow_parameters: dict = None) -> None:
"""Write params-file for analysis."""
LOG.debug("Writing parameters file")
if workflow_parameters:
if replaced_workflow_parameters:
write_yaml_nextflow_style(
content=workflow_parameters,
content=replaced_workflow_parameters,
file_path=self.get_params_file_path(case_id=case_id),
)
else:
Expand Down Expand Up @@ -330,7 +323,7 @@ def write_trailblazer_config(self, case_id: str, tower_id: str) -> None:
file_path=config_path,
)

def create_sample_sheet(self, case_id: str, dry_run: bool):
def create_sample_sheet(self, case_id: str, dry_run: bool) -> None:
"""Create sample sheet for a case."""
sample_sheet_content: list[list[Any]] = self.get_sample_sheet_content(case_id=case_id)
if not dry_run:
Expand All @@ -340,25 +333,59 @@ def create_sample_sheet(self, case_id: str, dry_run: bool):
header=self.sample_sheet_headers,
)

def create_params_file(self, case_id: str, dry_run: bool):
def create_params_file(self, case_id: str, dry_run: bool) -> None:
"""Create parameters file for a case."""
LOG.debug("Getting parameters information")
workflow_parameters = None
if not self.is_params_appended_to_nextflow_config:
workflow_parameters: dict | None = self.get_workflow_parameters(case_id=case_id).dict()
LOG.debug("Getting parameters information built on-the-fly")
built_workflow_parameters: dict | None = self.get_built_workflow_parameters(
case_id=case_id
).model_dump()
LOG.debug("Adding parameters from the pipeline config file if it exist")

workflow_parameters: dict = built_workflow_parameters | (
read_yaml(self.params) if hasattr(self, "params") and self.params else {}
)
replaced_workflow_parameters: dict = self.replace_values_in_params_file(
workflow_parameters=workflow_parameters
)
if not dry_run:
self.write_params_file(case_id=case_id, workflow_parameters=workflow_parameters)
self.write_params_file(
case_id=case_id, replaced_workflow_parameters=replaced_workflow_parameters
)

def replace_values_in_params_file(self, workflow_parameters: dict) -> dict:
    """Return a copy of the parameters with all resolvable placeholders substituted.

    String values may contain placeholders of the form ``{{key}}`` (whitespace inside
    the braces is allowed). Each placeholder whose key exists in the given dictionary
    is replaced with the string form of that key's value. Replacement values are looked
    up in the given, unresolved dictionary, so chained references are resolved by
    repeating passes until a pass changes nothing. Placeholders naming unknown keys are
    left untouched, and the given dictionary is never modified.

    Raises:
        ValueError: if the placeholders never settle, which happens when keys reference
            each other circularly or a key references itself.
    """
    replaced_workflow_parameters: dict = copy.deepcopy(workflow_parameters)
    # Each pass resolves one hop of a reference chain and an acyclic chain has at most
    # one hop per key, so one pass per key plus a final confirming pass is sufficient.
    # Needing more than that means the references are circular and would loop forever.
    max_passes: int = len(workflow_parameters) + 1
    for _ in range(max_passes):
        resolved: bool = True
        for key, value in replaced_workflow_parameters.items():
            new_value: Any = self.replace_params_placeholders(value, workflow_parameters)
            if new_value != value:
                resolved = False
                # Re-assigning an existing key does not resize the dictionary, so it is
                # safe to do while iterating over its items.
                replaced_workflow_parameters[key] = new_value
        if resolved:
            return replaced_workflow_parameters
    raise ValueError(
        "Could not resolve placeholders in the workflow parameters: circular reference detected"
    )


def replace_params_placeholders(self, value: Any, workflow_parameters: dict) -> Any:
    """Replace placeholders in a value with values from the given dictionary.

    Only strings are inspected; any other value is returned unchanged. Every
    ``{{key}}`` occurrence, with or without whitespace inside the braces, is replaced
    by ``str(workflow_parameters[key])`` when the key exists, and kept verbatim when
    it does not.
    """
    if not isinstance(value, str):
        return value

    def substitute(match: re.Match) -> str:
        placeholder: str = match.group(1)
        if placeholder in workflow_parameters:
            return str(workflow_parameters[placeholder])
        # Unknown key: keep the original placeholder text exactly as written.
        return match.group(0)

    # Substituting on the match itself keeps detection and replacement in agreement,
    # so "{{ key }}" is handled the same way as "{{key}}".
    return re.sub(r"{{\s*([^{}\s]+)\s*}}", substitute, value)

def create_nextflow_config(self, case_id: str, dry_run: bool = False) -> None:
"""Create nextflow config file."""
if content := self.get_nextflow_config_content(case_id=case_id):
LOG.debug("Writing nextflow config file")
if dry_run:
return
write_txt(
content=content,
file_path=self.get_nextflow_config_path(case_id=case_id),
)
if not dry_run:
write_txt(
content=content,
file_path=self.get_nextflow_config_path(case_id=case_id),
)

def create_gene_panel(self, case_id: str, dry_run: bool) -> None:
"""Create and write an aggregated gene panel file exported from Scout."""
Expand Down Expand Up @@ -402,7 +429,7 @@ def _run_analysis_with_nextflow(
LOG.info("Workflow will be executed using Nextflow")
parameters: list[str] = NextflowHandler.get_nextflow_run_parameters(
case_id=case_id,
workflow_path=self.nfcore_workflow_path,
workflow_bin_path=self.workflow_bin_path,
root_dir=self.root_dir,
command_args=command_args.dict(),
)
Expand Down
10 changes: 3 additions & 7 deletions cg/meta/workflow/nf_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,7 @@

from cg.apps.slurm.slurm_api import SlurmAPI
from cg.constants.constants import FileExtensions, FileFormat
from cg.constants.nextflow import (
JAVA_MEMORY_HEADJOB,
NXF_JVM_ARGS_ENV,
SlurmHeadJobDefaults,
)
from cg.constants.nextflow import JAVA_MEMORY_HEADJOB, NXF_JVM_ARGS_ENV, SlurmHeadJobDefaults
from cg.io.controller import ReadFile
from cg.models.slurm.sbatch import Sbatch
from cg.utils.utils import build_command_from_dict
Expand Down Expand Up @@ -116,7 +112,7 @@ def get_variables_to_export() -> dict[str, str]:

@classmethod
def get_nextflow_run_parameters(
cls, case_id: str, workflow_path: str, root_dir: str, command_args: dict
cls, case_id: str, workflow_bin_path: str, root_dir: str, command_args: dict
) -> list[str]:
"""Returns a Nextflow run command given a dictionary with arguments."""

Expand All @@ -137,7 +133,7 @@ def get_nextflow_run_parameters(
),
exclude_true=True,
)
return nextflow_options + ["run", workflow_path] + run_options
return nextflow_options + ["run", workflow_bin_path] + run_options

@staticmethod
def get_head_job_sbatch_path(case_directory: Path) -> Path:
Expand Down
27 changes: 11 additions & 16 deletions cg/meta/workflow/raredisease.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ def __init__(
):
super().__init__(config=config, workflow=workflow)
self.root_dir: str = config.raredisease.root
self.nfcore_workflow_path: str = config.raredisease.workflow_path
self.references: str = config.raredisease.references
self.workflow_bin_path: str = config.raredisease.workflow_bin_path
self.profile: str = config.raredisease.profile
self.conda_env: str = config.raredisease.conda_env
self.conda_binary: str = config.raredisease.conda_binary
self.config_platform: str = config.raredisease.config_platform
self.config_params: str = config.raredisease.config_params
self.config_resources: str = config.raredisease.config_resources
self.platform: str = config.raredisease.platform
self.params: str = config.raredisease.params
self.config: str = config.raredisease.config
self.resources: str = config.raredisease.resources
self.tower_binary_path: str = config.tower_binary_path
self.tower_workflow: str = config.raredisease.tower_workflow
self.account: str = config.raredisease.slurm.account
Expand Down Expand Up @@ -96,31 +96,30 @@ def get_target_bed(self, case_id: str, analysis_type: str) -> str:
"""
Return the target bed file from LIMS and use default capture kit for WHOLE_GENOME_SEQUENCING.
"""
target_bed: str = self.get_target_bed_from_lims(case_id=case_id)
if not target_bed:
target_bed_file: str = self.get_target_bed_from_lims(case_id=case_id)
if not target_bed_file:
if analysis_type == AnalysisType.WHOLE_GENOME_SEQUENCING:
return DEFAULT_CAPTURE_KIT
raise ValueError("No capture kit was found in LIMS")
return target_bed
return target_bed_file

def get_germlinecnvcaller_flag(self, analysis_type: str) -> bool:
    """Return True when the analysis type is whole genome sequencing, False otherwise."""
    is_whole_genome: bool = bool(analysis_type == AnalysisType.WHOLE_GENOME_SEQUENCING)
    return is_whole_genome

def get_workflow_parameters(self, case_id: str) -> RarediseaseParameters:
def get_built_workflow_parameters(self, case_id: str) -> RarediseaseParameters:
"""Return parameters."""
analysis_type: AnalysisType = self.get_data_analysis_type(case_id=case_id)
target_bed: str = self.get_target_bed(case_id=case_id, analysis_type=analysis_type)
target_bed_file: str = self.get_target_bed(case_id=case_id, analysis_type=analysis_type)
skip_germlinecnvcaller = self.get_germlinecnvcaller_flag(analysis_type=analysis_type)
outdir = self.get_case_path(case_id=case_id)

return RarediseaseParameters(
local_genomes=str(self.references),
input=self.get_sample_sheet_path(case_id=case_id),
outdir=outdir,
analysis_type=analysis_type,
target_bed=Path(self.references, target_bed).as_posix(),
target_bed_file=target_bed_file,
save_mapped_as_cram=True,
skip_germlinecnvcaller=skip_germlinecnvcaller,
vcfanno_extra_resources=f"{outdir}/{ScoutExportFileName.MANAGED_VARIANTS}",
Expand Down Expand Up @@ -157,10 +156,6 @@ def is_managed_variants_required(self) -> bool:
"""Return True if a managed variants needs to be exported from Scout."""
return True

@property
def root(self) -> str:
return self.config.raredisease.root

def write_managed_variants(self, case_id: str, content: list[str]) -> None:
    """Write the managed variants content into the directory built from the root and the case id."""
    case_directory: Path = Path(self.root, case_id)
    self._write_managed_variants(out_dir=case_directory, content=content)

Expand Down
23 changes: 6 additions & 17 deletions cg/meta/workflow/rnafusion.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ def __init__(
):
super().__init__(config=config, workflow=workflow)
self.root_dir: str = config.rnafusion.root
self.nfcore_workflow_path: str = config.rnafusion.workflow_path
self.references: str = config.rnafusion.references
self.workflow_bin_path: str = config.rnafusion.workflow_bin_path
self.profile: str = config.rnafusion.profile
self.conda_env: str = config.rnafusion.conda_env
self.conda_binary: str = config.rnafusion.conda_binary
self.config_platform: str = config.rnafusion.config_platform
self.config_params: str = config.rnafusion.config_params
self.config_resources: str = config.rnafusion.config_resources
self.platform: str = config.rnafusion.platform
self.params: str = config.rnafusion.params
self.config: str = config.rnafusion.config
self.resources: str = config.rnafusion.resources
self.tower_binary_path: str = config.tower_binary_path
self.tower_workflow: str = config.rnafusion.tower_workflow
self.account: str = config.rnafusion.slurm.account
Expand All @@ -50,11 +50,6 @@ def sample_sheet_headers(self) -> list[str]:
"""Headers for sample sheet."""
return RnafusionSampleSheetEntry.headers()

@property
def is_params_appended_to_nextflow_config(self) -> bool:
"""Return True if parameters should be added into the nextflow config file instead of the params file."""
return False

@property
def is_multiple_samples_allowed(self) -> bool:
"""Return whether the analysis supports multiple samples to be linked to the case."""
Expand Down Expand Up @@ -82,21 +77,15 @@ def get_sample_sheet_content_per_sample(self, case_sample: CaseSample) -> list[l
)
return sample_sheet_entry.reformat_sample_content()

def get_workflow_parameters(
def get_built_workflow_parameters(
self, case_id: str, genomes_base: Path | None = None
) -> RnafusionParameters:
"""Get Rnafusion parameters."""
return RnafusionParameters(
genomes_base=genomes_base or self.get_references_path(),
input=self.get_sample_sheet_path(case_id=case_id),
outdir=self.get_case_path(case_id=case_id),
)

def get_references_path(self, genomes_base: Path | None = None) -> Path:
if genomes_base:
return genomes_base.absolute()
return Path(self.references).absolute()

@staticmethod
def ensure_mandatory_metrics_present(metrics: list[MetricsBase]) -> None:
"""Check that all mandatory metrics are present.
Expand Down
9 changes: 2 additions & 7 deletions cg/meta/workflow/taxprofiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def __init__(
):
super().__init__(config=config, workflow=workflow)
self.root_dir: str = config.taxprofiler.root
self.nfcore_workflow_path: str = config.taxprofiler.workflow_path
self.workflow_bin_path: str = config.taxprofiler.workflow_bin_path
self.conda_env: str = config.taxprofiler.conda_env
self.conda_binary: str = config.taxprofiler.conda_binary
self.profile: str = config.taxprofiler.profile
Expand All @@ -47,11 +47,6 @@ def sample_sheet_headers(self) -> list[str]:
"""Headers for sample sheet."""
return TaxprofilerSampleSheetEntry.headers()

@property
def is_params_appended_to_nextflow_config(self) -> bool:
"""Return True if parameters should be added into the nextflow config file instead of the params file."""
return False

@property
def is_multiqc_pattern_search_exact(self) -> bool:
"""Only exact pattern search is allowed to collect metrics information from multiqc file."""
Expand Down Expand Up @@ -82,7 +77,7 @@ def get_sample_sheet_content_per_sample(self, case_sample: CaseSample) -> list[l
)
return sample_sheet_entry.reformat_sample_content()

def get_workflow_parameters(self, case_id: str) -> TaxprofilerParameters:
def get_built_workflow_parameters(self, case_id: str) -> TaxprofilerParameters:
"""Return Taxprofiler parameters."""
return TaxprofilerParameters(
cluster_options=f"--qos={self.get_slurm_qos_for_case(case_id=case_id)}",
Expand Down
Loading

0 comments on commit e33c508

Please sign in to comment.