Skip to content

Commit

Permalink
Add mutant qc (#3300) (minor)
Browse files Browse the repository at this point in the history
### Added

- Added property `is_negative_control` to `Sample` model.

- Defined a mutant specific `store-available` function that calls `run_qc_and_fail_analyses()`
- `run_qc_and_fail_analyses()` performs QC on a case, generates a QC report file, adds the QC summary as a comment on the analysis in Trailblazer, and sets analyses that fail QC as failed in Trailblazer.
- Added a CLI `run-qc` command to manually run QC on a case and generate the QC report file.


### Changed

- Extended `MockLimsApi` with more functionality for testing.
  • Loading branch information
beatrizsavinhas authored Aug 28, 2024
1 parent e663e09 commit 534bf8d
Show file tree
Hide file tree
Showing 27 changed files with 1,643 additions and 15 deletions.
81 changes: 79 additions & 2 deletions cg/apps/lims/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,15 @@
from genologics.lims import Lims
from requests.exceptions import HTTPError

from cg.constants import Priority
from cg.constants.lims import MASTER_STEPS_UDFS, PROP2UDF, DocumentationMethod, LimsArtifactTypes
from cg.constants.constants import ControlOptions, CustomerId
from cg.constants.lims import (
MASTER_STEPS_UDFS,
PROP2UDF,
DocumentationMethod,
LimsArtifactTypes,
LimsProcess,
)
from cg.constants.priority import Priority
from cg.exc import LimsDataError

from .order import OrderHandler
Expand Down Expand Up @@ -478,3 +485,73 @@ def get_latest_rna_input_amount(self, sample_id: str) -> float | None:
)
input_amount: float | None = self._get_last_used_input_amount(input_amounts=input_amounts)
return input_amount

def get_latest_artifact_for_sample(
    self,
    process_type: LimsProcess,
    sample_internal_id: str,
    artifact_type: LimsArtifactTypes | None = LimsArtifactTypes.ANALYTE,
) -> Artifact:
    """Look up the latest artifact of a sample for one process and artifact type.

    Raises:
        LimsDataError: If the LIMS query returns no artifacts.
    """
    matching_artifacts: list[Artifact] = self.get_artifacts(
        process_type=process_type,
        type=artifact_type,
        samplelimsid=sample_internal_id,
    )
    if matching_artifacts:
        return self._get_latest_artifact_from_list(artifact_list=matching_artifacts)

    raise LimsDataError(
        f"No artifacts were found for process {process_type}, type {artifact_type} and sample {sample_internal_id}."
    )

def _get_latest_artifact_from_list(self, artifact_list: list[Artifact]) -> Artifact:
"""Returning the latest artifact in a list of artifacts."""
artifacts = []
for artifact in artifact_list:
date = artifact.parent_process.date_run or datetime.today().strftime("%Y-%m-%d")
artifacts.append((date, artifact.id, artifact))

artifacts.sort()
date, id, latest_artifact = artifacts[-1]
return latest_artifact

def get_internal_negative_control_id_from_sample_in_pool(
    self, sample_internal_id: str, pooling_step: LimsProcess
) -> str:
    """Return the LIMS id of the single internal negative control pooled with the given sample.

    The pool is taken from the latest artifact of the sample for the given pooling step.

    Raises:
        LimsDataError: If the pool holds no internal negative control, or more than one.
    """
    pool_artifact: Artifact = self.get_latest_artifact_for_sample(
        process_type=pooling_step,
        sample_internal_id=sample_internal_id,
    )
    controls: list[Sample] = self._get_negative_controls_from_list(
        samples=pool_artifact.samples
    )

    if len(controls) == 1:
        return controls[0].id

    if not controls:
        raise LimsDataError(
            f"No internal negative controls found in the pool of sample {sample_internal_id}."
        )

    control_ids = [control.id for control in controls]
    raise LimsDataError(
        f"Multiple internal negative control samples found: {' '.join(control_ids)}"
    )

@staticmethod
def _get_negative_controls_from_list(samples: list[Sample]) -> list[Sample]:
    """Keep only the samples that are negative controls owned by the internal customer.

    A sample qualifies when its "Control" UDF equals ControlOptions.NEGATIVE and
    its "customer" UDF equals CustomerId.CG_INTERNAL_CUSTOMER.
    """
    return [
        candidate
        for candidate in samples
        if candidate.udf.get("Control") == ControlOptions.NEGATIVE
        and candidate.udf.get("customer") == CustomerId.CG_INTERNAL_CUSTOMER
    ]
50 changes: 47 additions & 3 deletions cg/cli/workflow/mutant/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
link,
resolve_compression,
store,
store_available,
)
from cg.constants import EXIT_FAIL, EXIT_SUCCESS
from cg.constants.cli_options import DRY_RUN
from cg.exc import AnalysisNotReadyError, CgError
from cg.meta.workflow.analysis import AnalysisAPI
from cg.meta.workflow.mutant import MutantAnalysisAPI
from cg.models.cg_config import CGConfig
from cg.store.models import Case

LOG = logging.getLogger(__name__)

Expand All @@ -32,7 +32,6 @@ def mutant(context: click.Context) -> None:
mutant.add_command(resolve_compression)
mutant.add_command(link)
mutant.add_command(store)
mutant.add_command(store_available)


@mutant.command("config-case")
Expand Down Expand Up @@ -75,7 +74,6 @@ def start(context: click.Context, dry_run: bool, case_id: str, config_artic: str
context.invoke(link, case_id=case_id, dry_run=dry_run)
context.invoke(config_case, case_id=case_id, dry_run=dry_run)
context.invoke(run, case_id=case_id, dry_run=dry_run, config_artic=config_artic)
context.invoke(store, case_id=case_id, dry_run=dry_run)


@mutant.command("start-available")
Expand All @@ -100,3 +98,49 @@ def start_available(context: click.Context, dry_run: bool = False):
exit_code = EXIT_FAIL
if exit_code:
raise click.Abort


@mutant.command("store-available")
@DRY_RUN
@click.pass_context
def store_available(context: click.Context, dry_run: bool) -> None:
    """Run QC checks and store bundles for all finished analyses in Housekeeper.

    QC is first run on every case returned by get_cases_to_perform_qc_on(), then
    every case returned by get_cases_to_store() is stored. A failure on one case
    does not stop the remaining cases from being processed.

    Raises:
        click.Abort: If QC or storing failed for at least one case.
    """
    analysis_api: MutantAnalysisAPI = context.obj.meta_apis["analysis_api"]

    exit_code: int = EXIT_SUCCESS

    cases_ready_for_qc: list[Case] = analysis_api.get_cases_to_perform_qc_on()
    LOG.info(f"Found {len(cases_ready_for_qc)} cases to perform QC on!")
    for case in cases_ready_for_qc:
        LOG.info(f"Performing QC on case {case.internal_id}.")
        try:
            analysis_api.run_qc_on_case(case=case, dry_run=dry_run)
        except Exception as exception_object:
            # Best effort per case: log the failure instead of dropping it
            # silently, then continue with the remaining cases.
            LOG.error(f"Error performing QC on case {case.internal_id}: {exception_object}")
            exit_code = EXIT_FAIL

    cases_to_store: list[Case] = analysis_api.get_cases_to_store()
    LOG.info(f"Found {len(cases_to_store)} cases to store!")
    for case in cases_to_store:
        LOG.info(f"Storing deliverables for {case.internal_id}")
        try:
            context.invoke(store, case_id=case.internal_id, dry_run=dry_run)
        except Exception as exception_object:
            LOG.error(f"Error storing {case.internal_id}: {exception_object}")
            exit_code = EXIT_FAIL

    if exit_code:
        raise click.Abort


@mutant.command("run-qc")
@DRY_RUN
@ARGUMENT_CASE_ID
@click.pass_context
def run_qc(context: click.Context, case_id: str, dry_run: bool) -> None:
    """Run QC on a single case and generate its QC report file."""
    mutant_api: MutantAnalysisAPI = context.obj.meta_apis["analysis_api"]
    mutant_api.run_qc(case_id=case_id, dry_run=dry_run)
7 changes: 7 additions & 0 deletions cg/constants/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,13 @@ class MicrosaltAppTags(StrEnum):
PREP_CATEGORY: str = "mic"


class MutantQC:
    """Thresholds and report file name used by the mutant QC step."""

    # NOTE(review): presumably upper limits on reads for negative control
    # samples — confirm against the quality controller that consumes them.
    EXTERNAL_NEGATIVE_CONTROL_READS_THRESHOLD: int = 100_000
    INTERNAL_NEGATIVE_CONTROL_READS_THRESHOLD: int = 2_000
    # "TRESHOLD" is misspelled, but the name is part of the public interface.
    FRACTION_OF_SAMPLES_WITH_FAILED_QC_TRESHOLD: float = 0.2
    QUALITY_REPORT_FILE_NAME: str = f"QC_report{FileExtensions.JSON}"


DRY_RUN_MESSAGE = "Dry run: process call will not be executed!"


Expand Down
4 changes: 4 additions & 0 deletions cg/constants/lims.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,7 @@ class DocumentationMethod(StrEnum):
class LimsArtifactTypes(StrEnum):
ANALYTE: str = "Analyte"
RESULT_FILE: str = "ResultFile"


class LimsProcess(StrEnum):
COVID_POOLING_STEP: str = "Pooling and Clean-up (Cov) v1"
1 change: 1 addition & 0 deletions cg/meta/workflow/mutant/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from cg.meta.workflow.mutant.mutant import MutantAnalysisAPI
87 changes: 80 additions & 7 deletions cg/meta/workflow/mutant.py → cg/meta/workflow/mutant/mutant.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import logging
import shutil
from pathlib import Path

from cg.constants import SequencingFileTag, Workflow
from cg.constants.constants import FileFormat
from cg.constants.constants import FileFormat, MutantQC
from cg.constants.tb import AnalysisStatus
from cg.exc import CgError
from cg.io.controller import WriteFile
from cg.meta.workflow.analysis import AnalysisAPI
from cg.meta.workflow.fastq import MutantFastqHandler
from cg.services.sequencing_qc_service.sequencing_qc_service import SequencingQCService
from cg.meta.workflow.mutant.quality_controller.models import MutantQualityResult
from cg.meta.workflow.mutant.quality_controller.quality_controller import MutantQualityController
from cg.models.cg_config import CGConfig
from cg.models.workflow.mutant import MutantSampleConfig
from cg.store.models import Application, Case, Sample
Expand All @@ -24,6 +27,9 @@ def __init__(
):
super().__init__(workflow=workflow, config=config)
self.root_dir = config.mutant.root
self.quality_checker = MutantQualityController(
status_db=config.status_db, lims=config.lims_api
)

@property
def conda_binary(self) -> str:
Expand All @@ -49,9 +55,17 @@ def get_case_path(self, case_id: str) -> Path:
def get_case_output_path(self, case_id: str) -> Path:
    """Return the "results" directory located inside the case directory."""
    case_path: Path = self.get_case_path(case_id=case_id)
    return Path(case_path, "results")

def get_case_results_file_path(self, case: Case) -> Path:
    """Return the path of the results CSV in the case output directory.

    The file name is built from the latest ticket of the case.
    """
    output_dir: Path = self.get_case_output_path(case.internal_id)
    return output_dir / f"sars-cov-2_{case.latest_ticket}_results.csv"

def get_case_fastq_dir(self, case_id: str) -> Path:
    """Return the "fastq" directory located inside the case directory."""
    case_path: Path = self.get_case_path(case_id=case_id)
    return Path(case_path, "fastq")

def get_case_qc_report_path(self, case_id: str) -> Path:
    """Return the path of the QC report file located inside the case directory."""
    return Path(self.get_case_path(case_id=case_id), MutantQC.QUALITY_REPORT_FILE_NAME)

def get_job_ids_path(self, case_id: str) -> Path:
    """Return the path of "trailblazer_config.yaml" in the case output directory."""
    output_dir: Path = self.get_case_output_path(case_id=case_id)
    return Path(output_dir, "trailblazer_config.yaml")

Expand Down Expand Up @@ -188,13 +202,24 @@ def run_analysis(self, case_id: str, dry_run: bool, config_artic: str = None) ->
)

def get_cases_to_store(self) -> list[Case]:
    """Return cases for which the analysis is complete on Trailblazer and a QC report has been generated.

    Only running cases of this workflow are considered. A case qualifies when
    its latest analysis is completed and its QC report file exists on disk.
    """
    cases_to_store: list[Case] = [
        case
        for case in self.status_db.get_running_cases_in_workflow(workflow=self.workflow)
        if self.trailblazer_api.is_latest_analysis_completed(case.internal_id)
        and self.get_case_qc_report_path(case_id=case.internal_id).exists()
    ]
    return cases_to_store

def get_cases_to_perform_qc_on(self) -> list[Case]:
    """Return running cases whose latest analysis is completed and that have no QC report file yet."""
    pending_cases: list[Case] = []
    for case in self.status_db.get_running_cases_in_workflow(self.workflow):
        if not self.trailblazer_api.is_latest_analysis_completed(case.internal_id):
            continue
        if self.get_case_qc_report_path(case_id=case.internal_id).exists():
            continue
        pending_cases.append(case)
    return pending_cases

def get_metadata_for_nanopore_sample(self, sample: Sample) -> list[dict]:
return [
Expand Down Expand Up @@ -249,3 +274,51 @@ def link_nanopore_fastq_for_sample(
LOG.info(f"Concatenation in progress for sample {sample.internal_id}.")
self.fastq_handler.concatenate(read_paths, concatenated_path)
self.fastq_handler.remove_files(read_paths)

def run_qc_on_case(self, case: Case, dry_run: bool) -> None:
    """Run QC on a case, report the QC summary on Trailblazer and fail the analysis if QC fails.

    When dry_run is set, nothing is written to Trailblazer.

    Raises:
        CgError: If the QC result could not be produced. Unless dry_run is set,
            a comment is added on Trailblazer and the analysis status is set to
            ERROR before raising.
    """
    try:
        qc_result: MutantQualityResult = self.get_qc_result(case=case)
    except Exception as exception:
        error_message: str = f"Could not perform QC on case {case.internal_id}: {exception}"
        LOG.error(error_message)
        if not dry_run:
            self.trailblazer_api.add_comment(
                case_id=case.internal_id, comment="ERROR: Could not perform QC on case"
            )
            self.trailblazer_api.set_analysis_status(
                case_id=case.internal_id, status=AnalysisStatus.ERROR
            )
        # Chain explicitly so the traceback shows the underlying QC failure as the cause.
        raise CgError(error_message) from exception

    if dry_run:
        return

    self.report_qc_on_trailblazer(case=case, qc_result=qc_result)
    if not qc_result.passes_qc:
        self.trailblazer_api.set_analysis_status(
            case_id=case.internal_id, status=AnalysisStatus.FAILED
        )

def get_qc_result(self, case: Case) -> MutantQualityResult:
    """Return the quality control result of a case.

    The quality checker is handed the case together with the paths of its
    results file and its QC report.
    """
    results_path: Path = self.get_case_results_file_path(case=case)
    report_path: Path = self.get_case_qc_report_path(case_id=case.internal_id)
    return self.quality_checker.get_quality_control_result(
        case=case,
        case_results_file_path=results_path,
        case_qc_report_path=report_path,
    )

def report_qc_on_trailblazer(self, case: Case, qc_result: MutantQualityResult) -> None:
    """Add the QC summary as a Trailblazer comment, appending the QC report path when QC failed."""
    qc_report_path: Path = self.get_case_qc_report_path(case_id=case.internal_id)

    message = qc_result.summary
    if qc_result.passes_qc:
        message = message + ""
    else:
        message = message + f" QC report: {qc_report_path}"
    self.trailblazer_api.add_comment(case_id=case.internal_id, comment=message)

def run_qc(self, case_id: str, dry_run: bool) -> None:
    """Fetch the case with the given internal id and run QC on it."""
    LOG.info(f"Running QC on case {case_id}.")
    self.run_qc_on_case(
        case=self.status_db.get_case_by_internal_id(case_id),
        dry_run=dry_run,
    )
1 change: 1 addition & 0 deletions cg/meta/workflow/mutant/quality_controller/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from cg.meta.workflow.mutant.quality_controller.quality_controller import MutantQualityController
50 changes: 50 additions & 0 deletions cg/meta/workflow/mutant/quality_controller/metrics_parser_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from pathlib import Path

from pydantic import TypeAdapter
from cg.io.csv import read_csv
from typing import Any

from cg.meta.workflow.mutant.quality_controller.models import ParsedSampleResults
from cg.store.models import Case


def parse_samples_results(case: Case, results_file_path: Path) -> dict[str, ParsedSampleResults]:
    """Parse the results file of a case into ParsedSampleResults keyed by sample internal id."""
    parsed_results: list[ParsedSampleResults] = _get_validated_results_list(
        results_file_path=results_file_path
    )
    return _get_samples_results(case=case, results_list=parsed_results)


def _get_validated_results_list(results_file_path: Path) -> list[ParsedSampleResults]:
    """Read the results CSV as a list of row dicts and validate the rows as ParsedSampleResults."""
    rows: list[dict[Any, Any]] = read_csv(file_path=results_file_path, read_to_dict=True)
    return TypeAdapter(list[ParsedSampleResults]).validate_python(rows)


def _get_sample_name_to_id_mapping(case: Case) -> dict[str, str]:
    """Map the name of each sample in the case to its internal id."""
    return {sample.name: sample.internal_id for sample in case.samples}


def _get_samples_results(
    case: Case, results_list: list[ParsedSampleResults]
) -> dict[str, ParsedSampleResults]:
    """Key each parsed result by the internal id of the case sample with the same name.

    Raises:
        KeyError: If a result carries a sample name that no sample in the case has.
    """
    internal_id_by_name: dict[str, str] = _get_sample_name_to_id_mapping(case=case)
    return {internal_id_by_name[parsed.sample_name]: parsed for parsed in results_list}
Loading

0 comments on commit 534bf8d

Please sign in to comment.