Skip to content

Commit

Permalink
new post process all (#3845)(minor)
Browse files Browse the repository at this point in the history
## Description
Closes Clinical-Genomics/add-new-tech#118
Create a command `post-process all` that will post-process all instrument runs regardless of the instrument. Currently only works for Pacbio. The instrument can be added as a flag so that only runs of a given instrument are post-processed.

### Added

- Cli command function `post_process_all_runs` in `cg/cli/post_process/post_process.py`.
- Utils functions and a model to get and filter the runs to post-process in `cg/cli/post_process/utils.py`
- Tests and fixtures for these utils functions
- Abstract service family `RunNamesService` with the child `PacbioRunNamesService` in charge of generating sequencing run names
- Tests and fixtures for this new service
- New property `run_names_services` in the `CGConfig` class to access the run names service from the config
- New check function `is_run_processed`, defined abstractly in the post-processing class and implemented in the Pacbio child class, that checks if a run has been post-processed
  • Loading branch information
diitaz93 authored Oct 17, 2024
1 parent f80c621 commit f8a6ac8
Show file tree
Hide file tree
Showing 25 changed files with 422 additions and 75 deletions.
38 changes: 35 additions & 3 deletions cg/cli/post_process/post_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@

import click

from cg.cli.post_process.utils import get_post_processing_service_from_run_name
from cg.cli.post_process.utils import (
UnprocessedRunInfo,
get_post_processing_service_from_run_name,
get_unprocessed_runs_info,
)
from cg.cli.utils import CLICK_CONTEXT_SETTINGS
from cg.constants.cli_options import DRY_RUN
from cg.models.cg_config import CGConfig
Expand All @@ -23,7 +27,7 @@ def post_process_group():
@DRY_RUN
@click.argument("run-name")
@click.pass_obj
def post_process_sequencing_run(context: CGConfig, run_name: str, dry_run: bool):
def post_process_run(context: CGConfig, run_name: str, dry_run: bool) -> None:
"""Post-process a sequencing run from the PacBio instrument.
run-name is the full name of the sequencing unit of run. For example:
Expand All @@ -35,4 +39,32 @@ def post_process_sequencing_run(context: CGConfig, run_name: str, dry_run: bool)
post_processing_service.post_process(run_name=run_name, dry_run=dry_run)


post_process_group.add_command(post_process_sequencing_run)
@post_process_group.command(name="all")
@DRY_RUN
@click.option(
    "--instrument",
    type=click.Choice(["pacbio", "all"]),
    default="all",
    show_default=True,
    help="The instrument for which the runs will be post-processed. Choose 'all' to post-process runs from all instruments.",
    required=False,
)
@click.pass_obj
def post_process_all_runs(context: CGConfig, instrument: str, dry_run: bool) -> None:
    """Post-process all available runs."""
    # Gather every run that still needs post-processing for the requested instrument(s).
    pending_runs: list[UnprocessedRunInfo] = get_unprocessed_runs_info(
        context=context, instrument=instrument
    )
    failure_count: int = 0
    for run in pending_runs:
        try:
            run.post_processing_service.post_process(run_name=run.name, dry_run=dry_run)
        except Exception as error:
            # A failing run is logged and counted, but the remaining runs are still attempted.
            LOG.error(f"Could not post-process {run.instrument} run {run.name}: {error}")
            failure_count += 1
    # Abort only after every run has been tried, if at least one of them failed.
    if failure_count > 0:
        raise click.Abort


post_process_group.add_command(post_process_run)
post_process_group.add_command(post_process_all_runs)
58 changes: 58 additions & 0 deletions cg/cli/post_process/utils.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,30 @@
"""Utility functions for the post-process command."""

import logging

from pydantic import BaseModel, ConfigDict

from cg.exc import CgError
from cg.models.cg_config import CGConfig
from cg.services.run_devices.abstract_classes import PostProcessingService
from cg.services.run_devices.pacbio.post_processing_service import PacBioPostProcessingService
from cg.services.run_devices.run_names.service import RunNamesService
from cg.utils.mapping import get_item_by_pattern_in_source

LOG = logging.getLogger(__name__)

PATTERN_TO_DEVICE_MAP: dict[str, str] = {
r"^r\d+_\d+_\d+/(1|2)_[^/]+$": "pacbio",
}


class UnprocessedRunInfo(BaseModel):
    """Information about a run that has not been post-processed yet."""

    # Needed for the PostProcessingService field, which is presumably not a pydantic type.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Name of the run, as passed to the post-processing service.
    name: str
    # Service used to post-process the run.
    post_processing_service: PostProcessingService
    # Name of the instrument the run belongs to.
    instrument: str


def get_post_processing_service_from_run_name(
context: CGConfig, run_name: str
) -> PacBioPostProcessingService:
Expand All @@ -21,3 +38,44 @@ def get_post_processing_service_from_run_name(
f"Run name {run_name} does not match with any known sequencing run name pattern"
) from error
return getattr(context.post_processing_services, device)


def get_unprocessed_runs_info(context: CGConfig, instrument: str) -> list[UnprocessedRunInfo]:
    """Collect the runs pending post-processing for one instrument or for all of them."""
    unprocessed: list[UnprocessedRunInfo] = []
    for instrument_name in _instruments_to_check(instrument):
        # Both service containers expose one attribute per instrument name.
        names_service: RunNamesService = getattr(context.run_names_services, instrument_name)
        candidate_names = names_service.get_run_names()
        processing_service = getattr(context.post_processing_services, instrument_name)
        unprocessed += _get_unprocessed_runs_from_run_names(
            run_names=candidate_names,
            post_processing_service=processing_service,
            instrument_name=instrument_name,
        )
    return unprocessed


def _instruments_to_check(instrument: str) -> list[str]:
"""Return a list of instruments to check for unprocessed runs."""
possible_instruments: list[str] = ["pacbio"] # Add more instruments here
return [instrument] if instrument != "all" else possible_instruments


def _get_unprocessed_runs_from_run_names(
    run_names: list[str], post_processing_service: PostProcessingService, instrument_name
) -> list[UnprocessedRunInfo]:
    """Build the info objects for the given run names that are not post-processed yet.

    The post-processing service decides, per run name, whether the run is already processed.
    """
    LOG.debug(f"Adding {instrument_name} run names to the post-processing list")
    pending: list[UnprocessedRunInfo] = []
    for name in run_names:
        if not post_processing_service.is_run_processed(name):
            run_info = UnprocessedRunInfo(
                name=name,
                post_processing_service=post_processing_service,
                instrument=instrument_name,
            )
            pending.append(run_info)
        else:
            LOG.debug(f"Run {name} has already been post-processed. Skipping")
    return pending
40 changes: 24 additions & 16 deletions cg/models/cg_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from pathlib import Path
from typing import Any

from pydantic import BaseModel, EmailStr, Field, ConfigDict
from pydantic import BaseModel, ConfigDict, EmailStr, Field
from typing_extensions import Literal

from cg.apps.coverage import ChanjoAPI
Expand Down Expand Up @@ -47,21 +47,14 @@
from cg.services.run_devices.pacbio.housekeeper_service.pacbio_houskeeper_service import (
PacBioHousekeeperService,
)
from cg.services.run_devices.pacbio.metrics_parser.metrics_parser import (
PacBioMetricsParser,
)
from cg.services.run_devices.pacbio.post_processing_service import (
PacBioPostProcessingService,
)
from cg.services.run_devices.pacbio.metrics_parser.metrics_parser import PacBioMetricsParser
from cg.services.run_devices.pacbio.post_processing_service import PacBioPostProcessingService
from cg.services.run_devices.pacbio.run_data_generator.pacbio_run_data_generator import (
PacBioRunDataGenerator,
)
from cg.services.run_devices.pacbio.run_file_manager.run_file_manager import (
PacBioRunFileManager,
)
from cg.services.run_devices.pacbio.run_validator.pacbio_run_validator import (
PacBioRunValidator,
)
from cg.services.run_devices.pacbio.run_file_manager.run_file_manager import PacBioRunFileManager
from cg.services.run_devices.pacbio.run_validator.pacbio_run_validator import PacBioRunValidator
from cg.services.run_devices.run_names.pacbio import PacbioRunNamesService
from cg.services.sequencing_qc_service.sequencing_qc_service import SequencingQCService
from cg.services.slurm_service.slurm_cli_service import SlurmCLIService
from cg.services.slurm_service.slurm_service import SlurmService
Expand Down Expand Up @@ -360,11 +353,14 @@ class RunInstruments(BaseModel):
illumina: IlluminaConfig


class RunNamesServices(BaseModel):
    """Container of the run names services, one attribute per instrument."""

    # Needed for the service field, which is presumably not a pydantic type.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    pacbio: PacbioRunNamesService


class PostProcessingServices(BaseModel):
pacbio: PacBioPostProcessingService

class Config:
arbitrary_types_allowed = True
model_config = ConfigDict(arbitrary_types_allowed=True)


class CGConfig(BaseModel):
Expand Down Expand Up @@ -427,6 +423,7 @@ class CGConfig(BaseModel):
pdc_service_: PdcService | None = None
post_processing_services_: PostProcessingServices | None = None
pigz: CommonAppConfig | None = None
run_names_services_: RunNamesServices | None = None
sample_sheet_api_: IlluminaSampleSheetService | None = None
scout: CommonAppConfig = None
scout_api_: ScoutAPI = None
Expand Down Expand Up @@ -633,6 +630,17 @@ def pdc_service(self) -> PdcService:
self.pdc_service_ = service
return service

@property
def run_names_services(self) -> RunNamesServices:
    """Return the run names services, creating and caching them on first access."""
    cached = self.run_names_services_
    if cached is not None:
        return cached
    LOG.debug("Instantiating run directory names services")
    # The PacBio service is built from the PacBio data directory in the instrument config.
    pacbio_service = PacbioRunNamesService(self.run_instruments.pacbio.data_dir)
    created = RunNamesServices(pacbio=pacbio_service)
    self.run_names_services_ = created
    return created

@property
def sample_sheet_api(self) -> IlluminaSampleSheetService:
sample_sheet_api = self.__dict__.get("sample_sheet_api_")
Expand Down
14 changes: 12 additions & 2 deletions cg/services/run_devices/abstract_classes.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
"""Post-processing service abstract classes."""

import logging
from abc import ABC, abstractmethod
from pathlib import Path

from cg.services.run_devices.abstract_models import PostProcessingDTOs, RunData, RunMetrics
from cg.services.run_devices.constants import POST_PROCESSING_COMPLETED

LOG = logging.getLogger(__name__)


class RunDataGenerator(ABC):
"""Abstract class that holds functionality to create a run data object."""
Expand Down Expand Up @@ -83,8 +86,15 @@ def post_process(self, run_name: str, dry_run: bool = False):
"""Store sequencing metrics in StatusDB and relevant files in Housekeeper."""
pass

@abstractmethod
def is_run_processed(self, run_name: str) -> bool:
    """Return whether the run with the given name has already been post-processed."""

@staticmethod
def _touch_post_processing_complete(run_data: RunData) -> None:
def _touch_post_processing_complete(run_data: RunData, dry_run: bool = False) -> None:
"""Touch the post-processing complete file."""
processing_complete_file = Path(run_data.full_path, POST_PROCESSING_COMPLETED)
processing_complete_file.touch()
if not dry_run:
processing_complete_file.touch()
LOG.debug(f"Post-processing complete for {run_data.full_path}")
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
"""Module for the Pacbio database store service."""

import logging
from datetime import datetime

Expand Down Expand Up @@ -85,4 +87,5 @@ def store_post_processing_data(self, run_data: PacBioRunData, dry_run: bool = Fa
f"Dry run, no entries will be added to database for SMRT cell {run_data.full_path}."
)
return
LOG.debug(f"Data stored in statusDB for run {run_data.sequencing_run_name}")
self.store.commit_to_store()
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
"""Module for the Pacbio data transfer object creation service."""

import logging

from pydantic import ValidationError

from cg.services.run_devices.abstract_classes import PostProcessingDataTransferService
Expand All @@ -17,12 +21,12 @@
get_sequencing_run_dto,
get_smrt_cell_dto,
)
from cg.services.run_devices.pacbio.metrics_parser.metrics_parser import (
PacBioMetricsParser,
)
from cg.services.run_devices.pacbio.metrics_parser.metrics_parser import PacBioMetricsParser
from cg.services.run_devices.pacbio.metrics_parser.models import PacBioMetrics
from cg.services.run_devices.pacbio.run_data_generator.run_data import PacBioRunData

LOG = logging.getLogger(__name__)


class PacBioDataTransferService(PostProcessingDataTransferService):
def __init__(self, metrics_service: PacBioMetricsParser):
Expand All @@ -41,6 +45,7 @@ def get_post_processing_dtos(self, run_data: PacBioRunData) -> PacBioDTOs:
sample_sequencing_metrics_dtos: list[PacBioSampleSequencingMetricsDTO] = (
get_sample_sequencing_metrics_dtos(metrics.samples)
)
LOG.debug("DTOs created")
return PacBioDTOs(
run_device=smrt_cell_dto,
sequencing_run=sequencing_run_dto,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,10 @@
PostProcessingStoreFileError,
)
from cg.services.run_devices.pacbio.housekeeper_service.models import PacBioFileData
from cg.services.run_devices.pacbio.metrics_parser.metrics_parser import (
PacBioMetricsParser,
)
from cg.services.run_devices.pacbio.metrics_parser.metrics_parser import PacBioMetricsParser
from cg.services.run_devices.pacbio.metrics_parser.models import PacBioMetrics
from cg.services.run_devices.pacbio.run_data_generator.run_data import PacBioRunData
from cg.services.run_devices.pacbio.run_file_manager.run_file_manager import (
PacBioRunFileManager,
)
from cg.services.run_devices.pacbio.run_file_manager.run_file_manager import PacBioRunFileManager
from cg.utils.mapping import get_item_by_pattern_in_source

LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -64,6 +60,7 @@ def store_files_in_housekeeper(self, run_data: PacBioRunData, dry_run: bool = Fa
file_path=bundle_info.file_path,
tags=bundle_info.tags,
)
LOG.debug(f"Files stored in Housekeeper for run {run_data.sequencing_run_name}")

@staticmethod
def _get_bundle_type_for_file(file_path: Path) -> str:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
"""Module for the Pacbio sequencing metrics parsing service."""

import logging
from pathlib import Path

from pydantic import ValidationError
Expand Down Expand Up @@ -26,6 +29,8 @@
from cg.services.run_devices.pacbio.run_data_generator.run_data import PacBioRunData
from cg.services.run_devices.pacbio.run_file_manager.run_file_manager import PacBioRunFileManager

LOG = logging.getLogger(__name__)


class PacBioMetricsParser(PostProcessingMetricsParser):
"""Class for parsing PacBio sequencing metrics."""
Expand Down Expand Up @@ -59,7 +64,7 @@ def parse_metrics(self, run_data: PacBioRunData) -> PacBioMetrics:
metrics_files=metrics_files, file_name=PacBioDirsAndFiles.BARCODES_REPORT
)
sample_metrics: list[SampleMetrics] = get_parsed_sample_metrics(metrics_files)

LOG.debug(f"All metrics parsed for run {run_data.sequencing_run_name}")
return PacBioMetrics(
read=read_metrics,
control=control_metrics,
Expand Down
11 changes: 9 additions & 2 deletions cg/services/run_devices/pacbio/post_processing_service.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging
from pathlib import Path

from cg.services.run_devices.abstract_classes import PostProcessingService
from cg.services.run_devices.constants import POST_PROCESSING_COMPLETED
from cg.services.run_devices.error_handler import handle_post_processing_errors
from cg.services.run_devices.exc import (
PostProcessingError,
Expand Down Expand Up @@ -49,11 +51,16 @@ def __init__(
to_raise=PostProcessingError,
)
def post_process(self, run_name: str, dry_run: bool = False) -> None:
LOG.info(f"Starting PacBio post-processing for run: {run_name}")
LOG.info(f"Starting Pacbio post-processing for run: {run_name}")
run_data: PacBioRunData = self.run_data_generator.get_run_data(
run_name=run_name, sequencing_dir=self.sequencing_dir
)
self.run_validator.ensure_post_processing_can_start(run_data)
self.store_service.store_post_processing_data(run_data=run_data, dry_run=dry_run)
self.hk_service.store_files_in_housekeeper(run_data=run_data, dry_run=dry_run)
self._touch_post_processing_complete(run_data)
self._touch_post_processing_complete(run_data=run_data, dry_run=dry_run)

def is_run_processed(self, run_name: str) -> bool:
    """Tell whether the post-processing completed file exists in the run directory."""
    # The run directory is the run name resolved under the sequencing directory.
    run_directory = Path(self.sequencing_dir, run_name)
    completed_flag = run_directory / POST_PROCESSING_COMPLETED
    return completed_flag.exists()
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from pathlib import Path
import logging
from pathlib import Path

from cg.constants.constants import FileFormat
from cg.constants.pacbio import PacBioDirsAndFiles
from cg.services.decompression_service.decompressor import Decompressor
Expand Down Expand Up @@ -49,18 +50,17 @@ def ensure_post_processing_can_start(self, run_data: PacBioRunData) -> None:
source_dir=run_data.full_path,
manifest_file_format=FileFormat.TXT,
)

self.decompressor.decompress(
source_path=paths_information.decompression_target,
destination_path=paths_information.decompression_destination,
)
self._touch_is_validated(run_data.full_path)
LOG.debug(f"Run for {run_data.full_path} is validated.")

@staticmethod
def _touch_is_validated(run_path: Path):
Path(run_path, PacBioDirsAndFiles.RUN_IS_VALID).touch()

@staticmethod
def _is_validated(run_path: Path) -> bool:
    """Return True if the validation flag file exists in the run path."""
    validation_flag = Path(run_path, PacBioDirsAndFiles.RUN_IS_VALID)
    return validation_flag.exists()

@staticmethod
def _touch_is_validated(run_path: Path) -> None:
    """Create the validation flag file in the run path."""
    validation_flag = Path(run_path, PacBioDirsAndFiles.RUN_IS_VALID)
    validation_flag.touch()
Empty file.
Loading

0 comments on commit f8a6ac8

Please sign in to comment.