Skip to content

Commit

Permalink
Add automatic quality control of analyses for microbial samples (#2754)…
Browse files Browse the repository at this point in the history
…(minor)
  • Loading branch information
seallard authored Jan 2, 2024
1 parent ea9e2ae commit 0ae1afe
Show file tree
Hide file tree
Showing 24 changed files with 1,327 additions and 303 deletions.
16 changes: 5 additions & 11 deletions cg/cli/workflow/microsalt/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from cg.meta.workflow.analysis import AnalysisAPI
from cg.meta.workflow.microsalt import MicrosaltAnalysisAPI
from cg.models.cg_config import CGConfig
from cg.store.models import Sample
from cg.store.models import Case, Sample

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -218,21 +218,15 @@ def start_available(context: click.Context, dry_run: bool = False):
raise click.Abort


@microsalt.command("qc-microsalt")
@microsalt.command("qc")
@ARGUMENT_UNIQUE_IDENTIFIER
@click.pass_context
def qc_microsalt(context: click.Context, unique_id: str) -> None:
"""Perform QC on a microsalt case."""
analysis_api: MicrosaltAnalysisAPI = context.obj.meta_apis["analysis_api"]
metrics_file_path: Path = analysis_api.get_metrics_file_path(unique_id)
try:
analysis_api.microsalt_qc(
case_id=unique_id,
run_dir_path=analysis_api.get_latest_case_path(case_id=unique_id),
lims_project=analysis_api.get_project(
analysis_api.status_db.get_case_by_internal_id(internal_id=unique_id)
.samples[0]
.internal_id
),
)
LOG.info(f"Performing QC on case {unique_id}")
analysis_api.quality_checker.quality_control(metrics_file_path)
except IndexError:
LOG.error(f"No existing analysis directories found for case {unique_id}.")
7 changes: 6 additions & 1 deletion cg/constants/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,15 @@ class APIMethods(StrEnum):


class MicrosaltQC:
QC_PERCENT_THRESHOLD_MWX: float = 0.1
AVERAGE_COVERAGE_THRESHOLD: int = 10
MWX_THRESHOLD_SAMPLES_PASSING: float = 0.9
COVERAGE_10X_THRESHOLD: float = 0.75
DUPLICATION_RATE_THRESHOLD: float = 0.8
INSERT_SIZE_THRESHOLD: int = 100
MAPPED_RATE_THRESHOLD: float = 0.3
NEGATIVE_CONTROL_READS_THRESHOLD: float = 0.2
TARGET_READS: int = 6000000
TARGET_READS_FAIL_THRESHOLD: float = 0.7


class MicrosaltAppTags(StrEnum):
Expand Down
12 changes: 12 additions & 0 deletions cg/exc.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,18 @@ class LimsDataError(CgError):
"""


class MicrosaltError(CgError):
"""
Error related to Microsalt analysis.
"""


class MissingAnalysisDir(CgError):
"""
Error related to missing analysis.
"""


class OrderError(CgError):
"""
Exception related to orders.
Expand Down
1 change: 1 addition & 0 deletions cg/meta/workflow/microsalt/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from cg.meta.workflow.microsalt.microsalt import MicrosaltAnalysisAPI
4 changes: 4 additions & 0 deletions cg/meta/workflow/microsalt/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from cg.constants.constants import FileExtensions


QUALITY_REPORT_FILE_NAME: str = f"QC_done{FileExtensions.JSON}"
2 changes: 2 additions & 0 deletions cg/meta/workflow/microsalt/metrics_parser/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from cg.meta.workflow.microsalt.metrics_parser.metrics_parser import MetricsParser
from cg.meta.workflow.microsalt.metrics_parser.models import QualityMetrics, SampleMetrics
12 changes: 12 additions & 0 deletions cg/meta/workflow/microsalt/metrics_parser/metrics_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from pathlib import Path

from cg.io.json import read_json
from cg.meta.workflow.microsalt.metrics_parser.models import QualityMetrics


class MetricsParser:
@staticmethod
def parse(file_path: Path) -> QualityMetrics:
data = read_json(file_path)
formatted_data = {"samples": data}
return QualityMetrics.model_validate(formatted_data)
27 changes: 27 additions & 0 deletions cg/meta/workflow/microsalt/metrics_parser/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from typing import Annotated
from pydantic import BaseModel, BeforeValidator


def empty_str_to_none(v: str) -> str | None:
return v or None


class PicardMarkduplicate(BaseModel):
insert_size: Annotated[int | None, BeforeValidator(empty_str_to_none)]
duplication_rate: Annotated[float | None, BeforeValidator(empty_str_to_none)]


class MicrosaltSamtoolsStats(BaseModel):
total_reads: Annotated[int | None, BeforeValidator(empty_str_to_none)]
mapped_rate: Annotated[float | None, BeforeValidator(empty_str_to_none)]
average_coverage: Annotated[float | None, BeforeValidator(empty_str_to_none)]
coverage_10x: Annotated[float | None, BeforeValidator(empty_str_to_none)]


class SampleMetrics(BaseModel):
picard_markduplicate: PicardMarkduplicate
microsalt_samtools_stats: MicrosaltSamtoolsStats


class QualityMetrics(BaseModel):
samples: dict[str, SampleMetrics]
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
""" API to manage Microsalt Analyses
Organism - Fallback based on reference, ‘Other species’ and ‘Comment’. Default to “Unset”.
Priority = Default to empty string. Weird response. Typically “standard” or “research”.
Reference = Defaults to “None”
Method: Outputted as “1273:23”. Defaults to “Not in LIMS”
Date: Returns latest == most recent date. Outputted as DT object “YYYY MM DD”. Defaults to
datetime.min"""
import glob
import logging
import os
Expand All @@ -17,13 +10,14 @@
import click

from cg.constants import EXIT_FAIL, EXIT_SUCCESS, Pipeline, Priority
from cg.constants.constants import MicrosaltAppTags, MicrosaltQC
from cg.exc import CgDataError
from cg.io.json import read_json, write_json
from cg.constants.constants import FileExtensions
from cg.constants.tb import AnalysisStatus
from cg.exc import CgDataError, MissingAnalysisDir
from cg.meta.workflow.analysis import AnalysisAPI
from cg.meta.workflow.fastq import MicrosaltFastqHandler
from cg.meta.workflow.microsalt.quality_controller import QualityController
from cg.meta.workflow.microsalt.quality_controller.models import QualityResult
from cg.models.cg_config import CGConfig
from cg.models.orders.sample_base import ControlEnum
from cg.store.models import Case, Sample
from cg.utils import Process

Expand All @@ -37,6 +31,7 @@ def __init__(self, config: CGConfig, pipeline: Pipeline = Pipeline.MICROSALT):
super().__init__(pipeline, config)
self.root_dir = config.microsalt.root
self.queries_path = config.microsalt.queries_path
self.quality_checker = QualityController(config.status_db)

@property
def use_read_count_threshold(self) -> bool:
Expand All @@ -60,33 +55,6 @@ def process(self) -> Process:
)
return self._process

def get_case_path(self, case_id: str) -> list[Path]:
"""Returns all paths associated with the case or single sample analysis."""
case_obj: Case = self.status_db.get_case_by_internal_id(internal_id=case_id)
lims_project: str = self.get_project(case_obj.links[0].sample.internal_id)
lims_project_dir_path: Path = Path(self.root_dir, "results", lims_project)

case_directories: list[Path] = [
Path(path) for path in glob.glob(f"{lims_project_dir_path}*", recursive=True)
]

return sorted(case_directories, key=os.path.getctime, reverse=True)

def get_latest_case_path(self, case_id: str) -> Path | None:
"""Return latest run dir for a microbial case, if no path found it returns None."""
lims_project: str = self.get_project(
self.status_db.get_case_by_internal_id(internal_id=case_id).links[0].sample.internal_id
)

return next(
(
path
for path in self.get_case_path(case_id=case_id)
if lims_project + "_" in str(path)
),
None,
)

def clean_run_dir(self, case_id: str, yes: bool, case_path: list[Path] | Path) -> int:
"""Remove workflow run directories for a MicroSALT case."""

Expand Down Expand Up @@ -167,10 +135,7 @@ def get_samples(self, case_id: str, sample_id: str | None = None) -> list[Sample
def get_lims_comment(self, sample_id: str) -> str:
"""Returns the comment associated with a sample stored in lims"""
comment: str = self.lims_api.get_sample_comment(sample_id) or ""
if re.match(r"\w{4}\d{2,3}", comment):
return comment

return ""
return comment if re.match(r"\w{4}\d{2,3}", comment) else ""

def get_organism(self, sample_obj: Sample) -> str:
"""Organism
Expand Down Expand Up @@ -282,94 +247,71 @@ def get_case_id_from_case(self, unique_id: str) -> tuple[str, None]:
case_id = case_obj.internal_id
return case_id, None

def microsalt_qc(self, case_id: str, run_dir_path: Path, lims_project: str) -> bool:
"""Check if given microSALT case passes QC check."""
failed_samples: dict = {}
case_qc: dict = read_json(file_path=Path(run_dir_path, f"{lims_project}.json"))
def get_cases_to_store(self) -> list[Case]:
cases_qc_ready: list[Case] = self.get_completed_cases()
cases_to_store: list[Case] = []
LOG.info(f"Found {len(cases_qc_ready)} cases to perform QC on!")

for case in cases_qc_ready:
case_run_dir: Path | None = self.get_case_path(case.internal_id)
LOG.info(f"Checking QC for case {case.internal_id} in {case_run_dir}")

if self.quality_checker.is_qc_required(case_run_dir):
LOG.info(f"QC required for case {case.internal_id}")
metrics_file_path = self.get_metrics_file_path(case.internal_id)

if not metrics_file_path.exists():
continue

result: QualityResult = self.quality_checker.quality_control(metrics_file_path)
self.trailblazer_api.add_comment(case_id=case.internal_id, comment=result.summary)
if result.passes_qc:
cases_to_store.append(case)
else:
self.trailblazer_api.set_analysis_status(
case_id=case.internal_id, status=AnalysisStatus.FAILED
)
else:
cases_to_store.append(case)

for sample_id in case_qc:
sample: Sample = self.status_db.get_sample_by_internal_id(internal_id=sample_id)
sample_check: dict | None = self.qc_sample_check(
sample=sample,
sample_qc=case_qc[sample_id],
)
if sample_check is not None:
failed_samples[sample_id] = sample_check

return self.qc_case_check(
case_id=case_id,
failed_samples=failed_samples,
number_of_samples=len(case_qc),
run_dir_path=run_dir_path,
)
return cases_to_store

def qc_case_check(
self, case_id: str, failed_samples: dict, number_of_samples: int, run_dir_path: Path
) -> bool:
"""Perform the final QC check for a microbial case based on failed samples."""
qc_pass: bool = True

for sample_id in failed_samples:
sample: Sample = self.status_db.get_sample_by_internal_id(internal_id=sample_id)
if sample.control == ControlEnum.negative:
qc_pass = False
if sample.application_version.application.tag == MicrosaltAppTags.MWRNXTR003:
qc_pass = False

# Check if more than 10% of MWX samples failed
if len(failed_samples) / number_of_samples > MicrosaltQC.QC_PERCENT_THRESHOLD_MWX:
qc_pass = False

if not qc_pass:
LOG.warning(
f"Case {case_id} failed QC, see {run_dir_path}/QC_done.json for more information."
)
else:
LOG.info(f"Case {case_id} passed QC.")
def get_completed_cases(self) -> list[Case]:
"""Return cases that are completed in trailblazer."""
return [
case
for case in self.status_db.get_running_cases_in_pipeline(self.pipeline)
if self.trailblazer_api.is_latest_analysis_completed(case.internal_id)
]

self.create_qc_done_file(
run_dir_path=run_dir_path,
failed_samples=failed_samples,
)
return qc_pass

def create_qc_done_file(self, run_dir_path: Path, failed_samples: dict) -> None:
"""Creates a QC_done when a QC check is performed."""
write_json(file_path=run_dir_path.joinpath("QC_done.json"), content=failed_samples)

def qc_sample_check(self, sample: Sample, sample_qc: dict) -> dict | None:
"""Perform a QC on a sample."""
if sample.control == ControlEnum.negative:
reads_pass: bool = self.check_external_negative_control_sample(sample)
if not reads_pass:
LOG.warning(f"Negative control sample {sample.internal_id} failed QC.")
return {"Passed QC Reads": reads_pass}
else:
reads_pass: bool = sample.sequencing_qc
coverage_10x_pass: bool = self.check_coverage_10x(
sample_name=sample.internal_id, sample_qc=sample_qc
)
if not reads_pass or not coverage_10x_pass:
LOG.warning(f"Sample {sample.internal_id} failed QC.")
return {"Passed QC Reads": reads_pass, "Passed Coverage 10X": coverage_10x_pass}

def check_coverage_10x(self, sample_name: str, sample_qc: dict) -> bool:
"""Check if a sample passed the coverage_10x criteria."""
try:
return (
sample_qc["microsalt_samtools_stats"]["coverage_10x"]
>= MicrosaltQC.COVERAGE_10X_THRESHOLD
)
except TypeError as e:
LOG.error(
f"There is no 10X coverage value for sample {sample_name}, setting qc to fail for this sample"
)
LOG.error(f"See error: {e}")
return False

def check_external_negative_control_sample(self, sample: Sample) -> bool:
"""Check if external negative control passed read check"""
return sample.reads < (
sample.application_version.application.target_reads
* MicrosaltQC.NEGATIVE_CONTROL_READS_THRESHOLD
)
def get_metrics_file_path(self, case_id: str) -> Path:
"""Return path to metrics file for a case."""
project_id: str = self.get_project_id(case_id)
case_run_dir: Path = self.get_case_path(case_id)
return Path(case_run_dir, f"{project_id}{FileExtensions.JSON}")

def extract_project_id(self, sample_id: str) -> str:
return sample_id.rsplit("A", maxsplit=1)[0]

def get_project_id(self, case_id: str) -> str:
case: Case = self.status_db.get_case_by_internal_id(case_id)
sample_id: str = case.links[0].sample.internal_id
return self.extract_project_id(sample_id)

def get_results_dir(self) -> Path:
return Path(self.root_dir, "results")

def get_analyses_result_dirs(self, case_id: str) -> list[str]:
project_id: str = self.get_project_id(case_id)
results_dir: Path = self.get_results_dir()
matches: list[str] = [d for d in os.listdir(results_dir) if d.startswith(project_id)]
if not matches:
LOG.error(f"No result directory found for {case_id} with project id {project_id}")
raise MissingAnalysisDir
return matches

def get_case_path(self, case_id: str) -> Path:
results_dir: Path = self.get_results_dir()
matching_cases: list[str] = self.get_analyses_result_dirs(case_id)
case_dir: str = max(matching_cases, default=None)
return Path(results_dir, case_dir)
1 change: 1 addition & 0 deletions cg/meta/workflow/microsalt/quality_controller/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from cg.meta.workflow.microsalt.quality_controller.quality_controller import QualityController
Loading

0 comments on commit 0ae1afe

Please sign in to comment.