Skip to content

Commit

Permalink
(feat taxprofiler): Add report-deliver (#2912)(minor)
Browse files Browse the repository at this point in the history
### Added

- cg workflow taxprofiler report-deliver case_id
  • Loading branch information
sofstam authored Feb 29, 2024
1 parent bba565f commit d27fe02
Show file tree
Hide file tree
Showing 16 changed files with 483 additions and 203 deletions.
26 changes: 26 additions & 0 deletions cg/cli/workflow/nf_analysis.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
"""CLI options for Nextflow and NF-Tower."""

import logging

import click
from pydantic.v1 import ValidationError

from cg.cli.workflow.commands import ARGUMENT_CASE_ID, OPTION_DRY
from cg.constants.constants import MetaApis
from cg.exc import CgError
from cg.meta.workflow.nf_analysis import NfAnalysisAPI
from cg.models.cg_config import CGConfig

LOG = logging.getLogger(__name__)

OPTION_WORKDIR = click.option(
"--work-dir",
type=click.Path(),
Expand Down Expand Up @@ -91,3 +96,24 @@ def metrics_deliver(context: CGConfig, case_id: str, dry_run: bool) -> None:
analysis_api.validate_qc_metrics(case_id=case_id, dry_run=dry_run)
except CgError as error:
raise click.Abort() from error


@click.command("report-deliver")
@ARGUMENT_CASE_ID
@OPTION_DRY
@click.pass_obj
def report_deliver(context: CGConfig, case_id: str, dry_run: bool) -> None:
"""Create a Housekeeper deliverables file for given case id."""

analysis_api: NfAnalysisAPI = context.meta_apis[MetaApis.ANALYSIS_API]

try:
analysis_api.status_db.verify_case_exists(case_internal_id=case_id)
analysis_api.trailblazer_api.is_latest_analysis_completed(case_id=case_id)
if not dry_run:
analysis_api.report_deliver(case_id=case_id)
else:
LOG.info(f"Dry-run: Would have created delivery files for case {case_id}")
except Exception as error:
LOG.error(f"Could not create report file: {error}")
raise click.Abort()
26 changes: 2 additions & 24 deletions cg/cli/workflow/rnafusion/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
OPTION_USE_NEXTFLOW,
OPTION_WORKDIR,
metrics_deliver,
report_deliver,
)
from cg.cli.workflow.rnafusion.options import (
OPTION_FROM_START,
Expand Down Expand Up @@ -222,30 +223,7 @@ def start_available(context: click.Context, dry_run: bool = False) -> None:


rnafusion.add_command(metrics_deliver)


@rnafusion.command("report-deliver")
@ARGUMENT_CASE_ID
@DRY_RUN
@click.pass_obj
def report_deliver(context: CGConfig, case_id: str, dry_run: bool) -> None:
"""Create a housekeeper deliverables file for given CASE ID."""

analysis_api: RnafusionAnalysisAPI = context.meta_apis[MetaApis.ANALYSIS_API]

try:
analysis_api.status_db.verify_case_exists(case_internal_id=case_id)
analysis_api.trailblazer_api.is_latest_analysis_completed(case_id=case_id)
if not dry_run:
analysis_api.report_deliver(case_id=case_id)
else:
LOG.info("Dry-run")
except (CgError, ValidationError) as error:
LOG.error(f"Could not create report file: {error}")
raise click.Abort()
except Exception as error:
LOG.error(f"Could not create report file: {error}")
raise click.Abort()
rnafusion.add_command(report_deliver)


@rnafusion.command("store-housekeeper")
Expand Down
2 changes: 2 additions & 0 deletions cg/cli/workflow/taxprofiler/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
OPTION_USE_NEXTFLOW,
OPTION_WORKDIR,
metrics_deliver,
report_deliver,
)
from cg.cli.workflow.taxprofiler.options import OPTION_FROM_START, OPTION_INSTRUMENT_PLATFORM
from cg.constants import EXIT_FAIL, EXIT_SUCCESS
Expand All @@ -42,6 +43,7 @@ def taxprofiler(context: click.Context) -> None:

taxprofiler.add_command(resolve_compression)
taxprofiler.add_command(metrics_deliver)
taxprofiler.add_command(report_deliver)


@taxprofiler.command("config-case")
Expand Down
88 changes: 71 additions & 17 deletions cg/meta/workflow/nf_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
MetricsDeliverablesCondition,
)
from cg.models.fastq import FastqFileMeta
from cg.models.nf_analysis import FileDeliverable, PipelineDeliverables
from cg.models.nf_analysis import FileDeliverable, WorkflowDeliverables
from cg.models.rnafusion.rnafusion import CommandArgs
from cg.utils import Process
from cg.store.models import Sample

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -307,27 +308,69 @@ def run_analysis(
dry_run=dry_run,
)

@staticmethod
def get_deliverables_template_content() -> list[dict]:
def get_deliverables_template_content(self) -> list[dict]:
"""Return deliverables file template content."""
raise NotImplementedError

def get_deliverables_for_case(self, case_id: str) -> PipelineDeliverables:
"""Return PipelineDeliverables for a given case."""
deliverable_template: list[dict] = self.get_deliverables_template_content()
sample_id: str = self.status_db.get_samples_by_case_id(case_id).pop().internal_id
def get_bundle_filenames_path(self) -> Path | None:
"""Return bundle filenames path."""
return None

@staticmethod
def get_formatted_file_deliverable(
file_template: dict[str | None, str | None],
case_id: str,
sample_id: str,
sample_name: str,
case_path: str,
) -> FileDeliverable:
"""Return the formatted file deliverable with the case and sample attributes."""
deliverables = file_template.copy()
for deliverable_field, deliverable_value in file_template.items():
if deliverable_value is None:
continue
deliverables[deliverable_field] = (
deliverables[deliverable_field]
.replace("CASEID", case_id)
.replace("SAMPLEID", sample_id)
.replace("SAMPLENAME", sample_name)
.replace("PATHTOCASE", case_path)
)
return FileDeliverable(**deliverables)

def get_deliverables_for_sample(
self, sample: Sample, case_id: str, template: list[dict[str, str]]
) -> list[FileDeliverable]:
"""Return a list of FileDeliverables for each sample."""
sample_id: str = sample.internal_id
sample_name: str = sample.name
case_path = str(self.get_case_path(case_id=case_id))
files: list[FileDeliverable] = []
for file in deliverable_template:
for deliverable_field, deliverable_value in file.items():
if deliverable_value is None:
continue
file[deliverable_field] = file[deliverable_field].replace("CASEID", case_id)
file[deliverable_field] = file[deliverable_field].replace("SAMPLEID", sample_id)
file[deliverable_field] = file[deliverable_field].replace(
"PATHTOCASE", str(self.get_case_path(case_id=case_id))
for file in template:
files.append(
self.get_formatted_file_deliverable(
file_template=file,
case_id=case_id,
sample_id=sample_id,
sample_name=sample_name,
case_path=case_path,
)
files.append(FileDeliverable(**file))
return PipelineDeliverables(files=files)
)
return files

def get_deliverables_for_case(self, case_id: str) -> WorkflowDeliverables:
"""Return workflow deliverables for a given case."""
deliverable_template: list[dict] = self.get_deliverables_template_content()
samples: list[Sample] = self.status_db.get_samples_by_case_id(case_id=case_id)
files: list[FileDeliverable] = []

for sample in samples:
bundles_per_sample = self.get_deliverables_for_sample(
sample=sample, case_id=case_id, template=deliverable_template
)
files.extend(bundle for bundle in bundles_per_sample if bundle not in files)

return WorkflowDeliverables(files=files)

def get_multiqc_json_path(self, case_id: str) -> Path:
"""Return the path of the multiqc_data.json file."""
Expand Down Expand Up @@ -415,3 +458,14 @@ def validate_qc_metrics(self, case_id: str, dry_run: bool = False) -> None:
self.trailblazer_api.set_analysis_status(case_id=case_id, status=AnalysisStatus.ERROR)
raise CgError from error
self.trailblazer_api.set_analysis_status(case_id=case_id, status=AnalysisStatus.COMPLETED)

def report_deliver(self, case_id: str) -> None:
"""Write deliverables file."""
workflow_content: WorkflowDeliverables = self.get_deliverables_for_case(case_id=case_id)
self.write_deliverables_file(
deliverables_content=workflow_content.dict(),
file_path=self.get_deliverables_file_path(case_id=case_id),
)
LOG.info(
f"Writing deliverables file in {self.get_deliverables_file_path(case_id=case_id).as_posix()}"
)
23 changes: 8 additions & 15 deletions cg/meta/workflow/rnafusion.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
from cg.constants import Workflow
from cg.constants.constants import FileFormat, Strandedness
from cg.constants.nf_analysis import MULTIQC_NEXFLOW_CONFIG, RNAFUSION_METRIC_CONDITIONS
from cg.resources import RNAFUSION_BUNDLE_FILENAMES_PATH
from cg.exc import MissingMetrics
from cg.io.controller import ReadFile
from cg.io.json import read_json
from cg.meta.workflow.nf_analysis import NfAnalysisAPI
from cg.models.cg_config import CGConfig
from cg.models.deliverables.metric_deliverables import MetricsBase, MultiqcDataJson
from cg.models.fastq import FastqFileMeta
from cg.models.nf_analysis import PipelineDeliverables
from cg.models.rnafusion.rnafusion import (
RnafusionAnalysis,
RnafusionParameters,
Expand Down Expand Up @@ -56,18 +56,22 @@ def use_read_count_threshold(self) -> bool:
when determining if the analysis for a case should be automatically started."""
return True

@staticmethod
def get_deliverables_template_content() -> list[dict]:
def get_deliverables_template_content(self) -> list[dict]:
"""Return deliverables file template content."""
return ReadFile.get_content_from_file(
file_format=FileFormat.YAML,
file_path=resources.RNAFUSION_BUNDLE_FILENAMES_PATH,
file_path=self.get_bundle_filenames_path(),
)

def get_nextflow_config_content(self) -> str:
"""Return nextflow config content."""
return MULTIQC_NEXFLOW_CONFIG

@staticmethod
def get_bundle_filenames_path() -> Path:
"""Return Rnafusion bundle filenames path."""
return RNAFUSION_BUNDLE_FILENAMES_PATH

def get_sample_sheet_content_per_sample(
self, sample: Sample, case_id: str, strandedness: Strandedness
) -> list[list[str]]:
Expand Down Expand Up @@ -168,17 +172,6 @@ def get_multiqc_json_metrics(self, case_id: str) -> list[MetricsBase]:
)
return metric_base_list

def report_deliver(self, case_id: str) -> None:
"""Create deliverables file."""
deliverables_content: PipelineDeliverables = self.get_deliverables_for_case(case_id=case_id)
self.write_deliverables_file(
deliverables_content=deliverables_content.dict(),
file_path=self.get_deliverables_file_path(case_id=case_id),
)
LOG.info(
f"Writing deliverables file in {self.get_deliverables_file_path(case_id=case_id).as_posix()}"
)

@staticmethod
def ensure_mandatory_metrics_present(metrics: list[MetricsBase]) -> None:
"""Check that all mandatory metrics are present. Raise error if missing."""
Expand Down
16 changes: 15 additions & 1 deletion cg/meta/workflow/taxprofiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@

from cg.constants import Workflow
from cg.constants.nf_analysis import MULTIQC_NEXFLOW_CONFIG
from cg.constants.sequencing import SequencingPlatform
from cg.io.json import read_json
from cg.resources import TAXPROFILER_BUNDLE_FILENAMES_PATH
from cg.constants.sequencing import SequencingPlatform
from cg.constants.constants import FileFormat
from cg.meta.workflow.nf_analysis import NfAnalysisAPI
from cg.models.cg_config import CGConfig
from cg.models.deliverables.metric_deliverables import MetricsBase, MultiqcDataJson
from cg.models.fastq import FastqFileMeta
from cg.io.controller import ReadFile
from cg.models.taxprofiler.taxprofiler import (
TaxprofilerParameters,
TaxprofilerSampleSheetEntry,
Expand Down Expand Up @@ -50,6 +53,10 @@ def get_nextflow_config_content(self) -> str:
"""Return nextflow config content."""
return MULTIQC_NEXFLOW_CONFIG

def get_bundle_filenames_path(self) -> Path:
"""Return Taxprofiler bundle filenames path."""
return TAXPROFILER_BUNDLE_FILENAMES_PATH

def get_sample_sheet_content_per_sample(
self, sample: Sample, instrument_platform: SequencingPlatform.ILLUMINA, fasta: str = ""
) -> list[list[str]]:
Expand Down Expand Up @@ -158,3 +165,10 @@ def parse_multiqc_json_for_sample(sample_name: str, multiqc_json: list[dict]) ->
metrics_values.update(sample_values)

return metrics_values

def get_deliverables_template_content(self) -> list[dict[str, str]]:
"""Return deliverables file template content."""
return ReadFile.get_content_from_file(
file_format=FileFormat.YAML,
file_path=self.get_bundle_filenames_path(),
)
2 changes: 1 addition & 1 deletion cg/models/nf_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def set_path_as_string(cls, file_path: str | Path) -> str | None:
return None


class PipelineDeliverables(BaseModel):
class WorkflowDeliverables(BaseModel):
"""Specification for workflow deliverables."""

files: list[FileDeliverable]
3 changes: 3 additions & 0 deletions cg/models/taxprofiler/taxprofiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ class TaxprofilerParameters(PipelineParameters):
run_kraken2: bool = True
kraken2_save_reads: bool = True
kraken2_save_readclassification: bool = True
run_bracken: bool = True
run_centrifuge: bool = True
centrifuge_save_reads: bool = True
run_krona: bool = True
run_profile_standardisation: bool = True

Expand Down
6 changes: 6 additions & 0 deletions cg/resources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@

RNAFUSION_BUNDLE_FILENAMES: str = Path("resources", "rnafusion_bundle_filenames.csv").as_posix()

TAXPROFILER_BUNDLE_FILENAMES: str = Path("resources", "taxprofiler_bundle_filenames.csv").as_posix()

RNAFUSION_BUNDLE_FILENAMES_PATH: Path = Path(
pkg_resources.resource_filename("cg", RNAFUSION_BUNDLE_FILENAMES)
)

TAXPROFILER_BUNDLE_FILENAMES_PATH: Path = Path(
pkg_resources.resource_filename("cg", TAXPROFILER_BUNDLE_FILENAMES)
)
Loading

0 comments on commit d27fe02

Please sign in to comment.