Skip to content

Commit a641fce

Browse files
author
ChristianOertlin
authored
add(pacbio run validator) (#3738) (patch)
# Description add run validator for pacbio
1 parent a58c2ad commit a641fce

File tree

15 files changed

+306
-31
lines changed

15 files changed

+306
-31
lines changed

cg/constants/pacbio.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ class PacBioDirsAndFiles:
1212
SMRTLINK_DATASETS_REPORT: str = "smrtlink-datasets.json"
1313
STATISTICS_DIR: str = "statistics"
1414
UNZIPPED_REPORTS_DIR: str = "unzipped_reports"
15+
METADATA_DIR: str = "metadata"
16+
RUN_IS_VALID: str = "is_valid"
1517

1618

1719
class CCSAttributeIDs:
@@ -80,3 +82,6 @@ class PacBioBundleTypes:
8082
PacBioDirsAndFiles.SMRTLINK_DATASETS_REPORT: PacBioBundleTypes.SMRT_CELL,
8183
f"{PacBioDirsAndFiles.HIFI_READS}{FileExtensions.BAM}$": PacBioBundleTypes.SAMPLE,
8284
}
85+
86+
ZIPPED_REPORTS_PATTERN: str = "*reports.zip"
87+
MANIFEST_FILE_PATTERN: str = "*.transferdone"

cg/models/cg_config.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from cg.constants.priority import SlurmQos
2727
from cg.meta.delivery.delivery import DeliveryAPI
2828
from cg.services.analysis_service.analysis_service import AnalysisService
29+
from cg.services.decompression_service.decompressor import Decompressor
2930
from cg.services.fastq_concatenation_service.fastq_concatenation_service import (
3031
FastqConcatenationService,
3132
)
@@ -54,11 +55,15 @@
5455
from cg.services.run_devices.pacbio.run_file_manager.run_file_manager import (
5556
PacBioRunFileManager,
5657
)
58+
from cg.services.run_devices.pacbio.run_validator.pacbio_run_validator import PacBioRunValidator
5759
from cg.services.sequencing_qc_service.sequencing_qc_service import SequencingQCService
5860
from cg.services.slurm_service.slurm_cli_service import SlurmCLIService
5961
from cg.services.slurm_service.slurm_service import SlurmService
6062
from cg.services.slurm_upload_service.slurm_upload_config import SlurmUploadConfig
6163
from cg.services.slurm_upload_service.slurm_upload_service import SlurmUploadService
64+
from cg.services.validate_file_transfer_service.validate_file_transfer_service import (
65+
ValidateFileTransferService,
66+
)
6267
from cg.store.database import initialize_database
6368
from cg.store.store import Store
6469

@@ -612,6 +617,11 @@ def get_pacbio_post_processing_service(self) -> PacBioPostProcessingService:
612617
LOG.debug("Instantiating PacBio post-processing service")
613618
run_data_generator = PacBioRunDataGenerator()
614619
file_manager = PacBioRunFileManager()
620+
run_validator = PacBioRunValidator(
621+
file_manager=file_manager,
622+
decompressor=Decompressor(),
623+
file_transfer_validator=ValidateFileTransferService(),
624+
)
615625
metrics_parser = PacBioMetricsParser(file_manager=file_manager)
616626
transfer_service = PacBioDataTransferService(metrics_service=metrics_parser)
617627
store_service = PacBioStoreService(
@@ -623,6 +633,7 @@ def get_pacbio_post_processing_service(self) -> PacBioPostProcessingService:
623633
metrics_parser=metrics_parser,
624634
)
625635
return PacBioPostProcessingService(
636+
run_validator=run_validator,
626637
run_data_generator=run_data_generator,
627638
hk_service=hk_service,
628639
store_service=store_service,

cg/services/decompression_service/decompressor.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ class Decompressor:
66

77
def decompress(self, source_path: Path, destination_path: Path) -> Path:
88
"""Decompress a file to a directory."""
9+
if not source_path.exists():
10+
raise FileNotFoundError(f"Source path {source_path} does not exist.")
911
with zipfile.ZipFile(source_path, "r") as zip_file:
1012
zip_file.extractall(destination_path)
1113
if not self._decompressed_path_exists(destination_path):

cg/services/run_devices/abstract_classes.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,15 @@ def get_run_data(self, run_name: str, sequencing_dir: str) -> RunData:
1616
pass
1717

1818

19+
class RunValidator(ABC):
20+
"""Abstract class that holds functionality to validate a run and ensure it can start post processing."""
21+
22+
@abstractmethod
23+
def ensure_post_processing_can_start(self, run_data: RunData):
24+
"""Ensure a post processing run can start."""
25+
pass
26+
27+
1928
class RunFileManager(ABC):
2029
"""Abstract class that manages files related to an instrument run."""
2130

@@ -79,12 +88,3 @@ def _touch_post_processing_complete(run_data: RunData) -> None:
7988
"""Touch the post-processing complete file."""
8089
processing_complete_file = Path(run_data.full_path, POST_PROCESSING_COMPLETED)
8190
processing_complete_file.touch()
82-
83-
84-
class FileTransferValidationService(ABC):
85-
"""Abstract class that validates file transfers for instrument runs from NAS to Hasta."""
86-
87-
@abstractmethod
88-
def validate_file_transfer(self, run_data: RunData):
89-
"""Validate an instrument run transfer."""
90-
pass

cg/services/run_devices/pacbio/post_processing_service.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
PacBioRunDataGenerator,
2121
)
2222
from cg.services.run_devices.pacbio.run_data_generator.run_data import PacBioRunData
23+
from cg.services.run_devices.pacbio.run_validator.pacbio_run_validator import PacBioRunValidator
2324

2425
LOG = logging.getLogger(__name__)
2526

@@ -29,11 +30,13 @@ class PacBioPostProcessingService(PostProcessingService):
2930

3031
def __init__(
3132
self,
33+
run_validator: PacBioRunValidator,
3234
run_data_generator: PacBioRunDataGenerator,
3335
hk_service: PacBioHousekeeperService,
3436
store_service: PacBioStoreService,
3537
sequencing_dir: str,
3638
):
39+
self.run_validator: PacBioRunValidator = run_validator
3740
self.run_data_generator: PacBioRunDataGenerator = run_data_generator
3841
self.hk_service: PacBioHousekeeperService = hk_service
3942
self.store_service: PacBioStoreService = store_service
@@ -52,6 +55,7 @@ def post_process(self, run_name: str, dry_run: bool = False) -> None:
5255
run_data: PacBioRunData = self.run_data_generator.get_run_data(
5356
run_name=run_name, sequencing_dir=self.sequencing_dir
5457
)
58+
self.run_validator.ensure_post_processing_can_start(run_data)
5559
self.store_service.store_post_processing_data(run_data=run_data, dry_run=dry_run)
5660
self.hk_service.store_files_in_housekeeper(run_data=run_data, dry_run=dry_run)
57-
self._touch_post_processing_complete(run_data=run_data)
61+
self._touch_post_processing_complete(run_data)
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from pathlib import Path
2+
3+
from pydantic import BaseModel
4+
5+
6+
class PacBioRunValidatorFiles(BaseModel):
7+
manifest_file: Path
8+
decompression_target: Path
9+
decompression_destination: Path

cg/services/run_devices/pacbio/run_file_manager/run_file_manager.py

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
from pathlib import Path
22

33
from cg.constants import FileExtensions
4-
from cg.constants.pacbio import PacBioDirsAndFiles
4+
from cg.constants.pacbio import PacBioDirsAndFiles, MANIFEST_FILE_PATTERN, ZIPPED_REPORTS_PATTERN
55
from cg.services.run_devices.abstract_classes import RunFileManager
66
from cg.services.run_devices.error_handler import handle_post_processing_errors
77
from cg.services.run_devices.exc import PostProcessingRunFileManagerError
88
from cg.services.run_devices.pacbio.run_data_generator.run_data import PacBioRunData
9+
from cg.services.run_devices.pacbio.run_file_manager.models import PacBioRunValidatorFiles
910
from cg.services.run_devices.validators import validate_files_or_directories_exist
1011
from cg.utils.files import get_files_matching_pattern
1112

@@ -29,10 +30,22 @@ def get_files_to_store(self, run_data: PacBioRunData) -> list[Path]:
2930
run_path: Path = run_data.full_path
3031
return self.get_files_to_parse(run_data) + self._get_hifi_read_files(run_path)
3132

32-
@staticmethod
33-
def _get_ccs_report_file(run_path: Path) -> Path:
33+
@handle_post_processing_errors(
34+
to_except=(FileNotFoundError,), to_raise=PostProcessingRunFileManagerError
35+
)
36+
def get_run_validation_files(self, run_data: PacBioRunData) -> PacBioRunValidatorFiles:
37+
manifest_file: Path = self._get_manifest_file(run_data.full_path)
38+
decompression_target: Path = self._get_zipped_reports_file(run_data.full_path)
39+
decompression_destination: Path = self._get_unzipped_reports_dir(run_data.full_path)
40+
return PacBioRunValidatorFiles(
41+
manifest_file=manifest_file,
42+
decompression_target=decompression_target,
43+
decompression_destination=decompression_destination,
44+
)
45+
46+
def _get_ccs_report_file(self, run_path: Path) -> Path:
3447
"""Return the path to the CCS report file."""
35-
statistics_dir: Path = Path(run_path, PacBioDirsAndFiles.STATISTICS_DIR)
48+
statistics_dir: Path = self._get_statistics_dir(run_path)
3649
files: list[Path] = get_files_matching_pattern(
3750
directory=statistics_dir, pattern=f"*{PacBioDirsAndFiles.CCS_REPORT_SUFFIX}"
3851
)
@@ -42,9 +55,7 @@ def _get_ccs_report_file(run_path: Path) -> Path:
4255

4356
def _get_report_files(self, run_path: Path) -> list[Path]:
4457
"""Return the paths to the unzipped report files."""
45-
unzipped_dir: Path = Path(
46-
run_path, PacBioDirsAndFiles.STATISTICS_DIR, PacBioDirsAndFiles.UNZIPPED_REPORTS_DIR
47-
)
58+
unzipped_dir: Path = self._get_unzipped_reports_dir(run_path)
4859
report_files: list[Path] = [
4960
Path(unzipped_dir, PacBioDirsAndFiles.CONTROL_REPORT),
5061
Path(unzipped_dir, PacBioDirsAndFiles.LOADING_REPORT),
@@ -64,3 +75,28 @@ def _get_hifi_read_files(run_path: Path) -> list[Path]:
6475
)
6576
validate_files_or_directories_exist(bam_files)
6677
return bam_files
78+
79+
@staticmethod
80+
def _get_unzipped_reports_dir(run_path) -> Path:
81+
return Path(
82+
run_path, PacBioDirsAndFiles.STATISTICS_DIR, PacBioDirsAndFiles.UNZIPPED_REPORTS_DIR
83+
)
84+
85+
@staticmethod
86+
def _get_statistics_dir(run_path) -> Path:
87+
return Path(run_path, PacBioDirsAndFiles.STATISTICS_DIR)
88+
89+
@staticmethod
90+
def _get_manifest_file(run_path) -> Path:
91+
file_list: list[Path] = get_files_matching_pattern(
92+
directory=Path(run_path, PacBioDirsAndFiles.METADATA_DIR), pattern=MANIFEST_FILE_PATTERN
93+
)
94+
if not file_list:
95+
raise FileNotFoundError(f"No Manifest file found in {run_path}")
96+
return file_list[0]
97+
98+
def _get_zipped_reports_file(self, run_path) -> Path:
99+
return get_files_matching_pattern(
100+
directory=self._get_statistics_dir(run_path),
101+
pattern=ZIPPED_REPORTS_PATTERN,
102+
)[0]
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
from pathlib import Path
2+
import logging
3+
from cg.constants.constants import FileFormat
4+
from cg.constants.pacbio import PacBioDirsAndFiles
5+
from cg.services.decompression_service.decompressor import Decompressor
6+
from cg.services.run_devices.abstract_classes import RunValidator
7+
from cg.services.run_devices.pacbio.run_data_generator.run_data import PacBioRunData
8+
from cg.services.run_devices.pacbio.run_file_manager.models import PacBioRunValidatorFiles
9+
from cg.services.run_devices.pacbio.run_file_manager.run_file_manager import PacBioRunFileManager
10+
from cg.services.validate_file_transfer_service.validate_file_transfer_service import (
11+
ValidateFileTransferService,
12+
)
13+
14+
LOG = logging.getLogger(__name__)
15+
16+
17+
class PacBioRunValidator(RunValidator):
18+
"""
19+
PacBio run validator.
20+
Ensure that the post-processing of a pacbio run can start.
21+
"""
22+
23+
def __init__(
24+
self,
25+
decompressor: Decompressor,
26+
file_transfer_validator: ValidateFileTransferService,
27+
file_manager: PacBioRunFileManager,
28+
):
29+
self.decompressor = decompressor
30+
self.file_transfer_validator = file_transfer_validator
31+
self.file_manager = file_manager
32+
33+
def ensure_post_processing_can_start(self, run_data: PacBioRunData) -> None:
34+
"""
35+
Ensure that a post-processing run can start.
36+
1. Check if all files are present listed in a manifest file.
37+
2. Decompresses the zipped reports.
38+
3. Touches a file to indicate that the run is validated
39+
4. Skips validation if the run is already validated
40+
"""
41+
if self._is_validated(run_data.full_path):
42+
LOG.debug(f"Run for {run_data.full_path} is validated.")
43+
return
44+
paths_information: PacBioRunValidatorFiles = self.file_manager.get_run_validation_files(
45+
run_data
46+
)
47+
self.file_transfer_validator.validate_file_transfer(
48+
manifest_file=paths_information.manifest_file,
49+
source_dir=run_data.full_path,
50+
manifest_file_format=FileFormat.TXT,
51+
)
52+
53+
self.decompressor.decompress(
54+
source_path=paths_information.decompression_target,
55+
destination_path=paths_information.decompression_destination,
56+
)
57+
self._touch_is_validated(run_data.full_path)
58+
LOG.debug(f"Run for {run_data.full_path} is validated.")
59+
60+
@staticmethod
61+
def _touch_is_validated(run_path: Path):
62+
Path(run_path, PacBioDirsAndFiles.RUN_IS_VALID).touch()
63+
64+
@staticmethod
65+
def _is_validated(run_path: Path) -> bool:
66+
return Path(run_path, PacBioDirsAndFiles.RUN_IS_VALID).exists()

tests/fixture_plugins/pacbio_fixtures/name_fixtures.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,11 @@ def pac_bio_smrt_cell_name() -> str:
88
return "1_A01"
99

1010

11+
@pytest.fixture
12+
def pac_bio_another_smrt_cell_name() -> str:
13+
return "1_B01"
14+
15+
1116
@pytest.fixture
1217
def pac_bio_test_run_name() -> str:
1318
"""Return the name of a PacBio SMRT cell."""

tests/fixture_plugins/pacbio_fixtures/path_fixtures.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import pytest
66

77
from cg.constants.pacbio import PacBioDirsAndFiles
8+
from cg.services.run_devices.pacbio.run_file_manager.models import PacBioRunValidatorFiles
89

910

1011
# Directory fixtures
@@ -38,6 +39,14 @@ def pac_bio_smrt_cell_dir_1_a01(pac_bio_test_run_dir: Path, pac_bio_smrt_cell_na
3839
return Path(pac_bio_test_run_dir, pac_bio_smrt_cell_name)
3940

4041

42+
@pytest.fixture
43+
def pac_bio_smrt_cell_dir_1_b01(
44+
pac_bio_test_run_dir: Path, pac_bio_another_smrt_cell_name: str
45+
) -> Path:
46+
"""Return the path to a PacBio SMRT cell directory."""
47+
return Path(pac_bio_test_run_dir, pac_bio_another_smrt_cell_name)
48+
49+
4150
@pytest.fixture
4251
def pac_bio_hifi_reads_dir(pac_bio_smrt_cell_dir_1_a01: Path) -> Path:
4352
"""Return the path to a PacBio HiFi reads directory."""
@@ -50,12 +59,30 @@ def pac_bio_run_statistics_dir(pac_bio_smrt_cell_dir_1_a01: Path) -> Path:
5059
return Path(pac_bio_smrt_cell_dir_1_a01, PacBioDirsAndFiles.STATISTICS_DIR)
5160

5261

62+
@pytest.fixture
63+
def pac_bio_run_statistics_dir_1_b01(pac_bio_smrt_cell_dir_1_b01: Path) -> Path:
64+
"""Return the path to the PacBio SMRT cell statistics directory."""
65+
return Path(pac_bio_smrt_cell_dir_1_b01, PacBioDirsAndFiles.STATISTICS_DIR)
66+
67+
5368
@pytest.fixture
5469
def pac_bio_run_reports_dir(pac_bio_run_statistics_dir: Path) -> Path:
5570
"""Return the path to the PacBio SMRT cell unzipped_reports directory"""
5671
return Path(pac_bio_run_statistics_dir, PacBioDirsAndFiles.UNZIPPED_REPORTS_DIR)
5772

5873

74+
@pytest.fixture
75+
def pac_bio_run_reports_dir_1_b01(pac_bio_run_statistics_dir_1_b01: Path) -> Path:
76+
"""Return the path to the PacBio SMRT cell unzipped_reports directory"""
77+
return Path(pac_bio_run_statistics_dir_1_b01, PacBioDirsAndFiles.UNZIPPED_REPORTS_DIR)
78+
79+
80+
@pytest.fixture
81+
def pac_bio_run_metadata_dir_1_b01(pac_bio_smrt_cell_dir_1_b01: Path) -> Path:
82+
"""Return the path to the PacBio SMRT cell metadata directory"""
83+
return Path(pac_bio_smrt_cell_dir_1_b01, PacBioDirsAndFiles.METADATA_DIR)
84+
85+
5986
@pytest.fixture
6087
def pac_bio_wrong_metrics_file(pac_bio_wrong_metrics_dir: Path) -> Path:
6188
"""Return the path to a temporary PacBio statistics directory."""
@@ -100,6 +127,18 @@ def pac_bio_smrtlink_datasets_report_file(pac_bio_run_reports_dir: Path) -> Path
100127
return Path(pac_bio_run_reports_dir, PacBioDirsAndFiles.SMRTLINK_DATASETS_REPORT)
101128

102129

130+
@pytest.fixture
131+
def pac_bio_transferdone_file_1_b01(pac_bio_run_metadata_dir_1_b01: Path) -> Path:
132+
"""Return the path to the PacBio SMRTLink datasets report file."""
133+
return Path(pac_bio_run_metadata_dir_1_b01, "m84202_240522_155607_s2.transferdone")
134+
135+
136+
@pytest.fixture
137+
def pac_bio_zipped_reports_file_1_b01(pac_bio_run_statistics_dir_1_b01: Path) -> Path:
138+
"""Return the path to the PacBio SMRTLink datasets report file."""
139+
return Path(pac_bio_run_statistics_dir_1_b01, "m84202_240522_155607_s2.reports.zip")
140+
141+
103142
@pytest.fixture
104143
def pac_bio_report_files_to_parse(
105144
pac_bio_ccs_report_file: Path,
@@ -122,3 +161,17 @@ def pac_bio_report_files_to_parse(
122161
def pac_bio_hifi_read_file(pac_bio_hifi_reads_dir: Path, pac_bio_1_a01_cell_full_name: str) -> Path:
123162
"""Return the PacBio HiFi read file."""
124163
return Path(pac_bio_hifi_reads_dir, f"{pac_bio_1_a01_cell_full_name}.hifi_reads.bam")
164+
165+
166+
@pytest.fixture
167+
def expected_1_b01_run_validation_files(
168+
pac_bio_transferdone_file_1_b01: Path,
169+
pac_bio_zipped_reports_file_1_b01: Path,
170+
pac_bio_run_reports_dir_1_b01: Path,
171+
) -> PacBioRunValidatorFiles:
172+
"""Return the expected run validation files for the 1_B01 SMRT cell."""
173+
return PacBioRunValidatorFiles(
174+
manifest_file=pac_bio_transferdone_file_1_b01,
175+
decompression_target=pac_bio_zipped_reports_file_1_b01,
176+
decompression_destination=pac_bio_run_reports_dir_1_b01,
177+
)

0 commit comments

Comments
 (0)