Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#2396 get the latest archived flowcell #3751

Merged
merged 26 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
6b34f88
Update to get the latest archived sequencing run
RasmusBurge-CG Sep 20, 2024
a6ca173
Update to get the latest archived sequencing run
RasmusBurge-CG Sep 20, 2024
73b1cd5
Working prototype
RasmusBurge-CG Sep 20, 2024
2c8cd16
Functional and working
RasmusBurge-CG Sep 23, 2024
9defde1
Merge branch 'master' into 2396-update-latest-archivedf-lowcell
RasmusBurge-CG Sep 23, 2024
23ab294
Merge branch 'master' into 2396-update-latest-archivedf-lowcell
RasmusBurge-CG Sep 23, 2024
95bad81
Improved after feedback
RasmusBurge-CG Oct 15, 2024
234c748
Merge branch 'master' into 2396-update-latest-archivedf-lowcell
RasmusBurge-CG Oct 15, 2024
d5ce9fe
Code improved after a strong coffee and some deep thought
RasmusBurge-CG Oct 16, 2024
65914eb
Merge branch 'master' into 2396-update-latest-archivedf-lowcell
RasmusBurge-CG Oct 16, 2024
9a89668
Merge branch 'master' into 2396-update-latest-archivedf-lowcell
RasmusBurge-CG Oct 17, 2024
4298118
Rethinking time and dates
RasmusBurge-CG Oct 17, 2024
6ea50be
Updates after feedback
RasmusBurge-CG Oct 18, 2024
954dbaa
Merge branch 'master' into 2396-update-latest-archivedf-lowcell
RasmusBurge-CG Oct 18, 2024
edae9e1
Refactoring code after feedback
RasmusBurge-CG Oct 18, 2024
f40c542
Update cg/services/illumina/backup/utils.py
RasmusBurge-CG Oct 24, 2024
b13b8f8
Update cg/services/illumina/backup/utils.py
RasmusBurge-CG Oct 25, 2024
0ad2e32
Update cg/services/illumina/backup/utils.py
RasmusBurge-CG Oct 25, 2024
4cadc5a
Update cg/services/illumina/backup/utils.py
RasmusBurge-CG Oct 25, 2024
f5c538e
Update cg/services/illumina/backup/utils.py
RasmusBurge-CG Oct 25, 2024
f3807e8
Update cg/services/illumina/backup/utils.py
RasmusBurge-CG Oct 25, 2024
411e19b
Update cg/services/illumina/backup/utils.py
RasmusBurge-CG Oct 25, 2024
120db71
Update cg/services/illumina/backup/utils.py
RasmusBurge-CG Oct 25, 2024
2ca28d1
Merge branch 'master' into 2396-update-latest-archivedf-lowcell
RasmusBurge-CG Oct 25, 2024
f84dbfe
Minor: changed one colon to equal
RasmusBurge-CG Oct 25, 2024
f63831a
Merge branch 'master' into 2396-update-latest-archivedf-lowcell
RasmusBurge-CG Oct 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 58 additions & 22 deletions cg/services/illumina/backup/backup_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,24 @@
IlluminaRunEncryptionError,
PdcError,
PdcNoFilesMatchingSearchError,
ValidationError,
RasmusBurge-CG marked this conversation as resolved.
Show resolved Hide resolved
)
from cg.meta.backup.backup import LOG
from cg.meta.encryption.encryption import EncryptionAPI
from cg.meta.tar.tar import TarAPI
from cg.models.cg_config import PDCArchivingDirectory
from cg.models.run_devices.illumina_run_directory_data import IlluminaRunDirectoryData
from cg.services.illumina.backup.encrypt_service import (
IlluminaRunEncryptionService,
from cg.services.illumina.backup.encrypt_service import IlluminaRunEncryptionService
from cg.services.illumina.backup.utils import (
DsmcOutput,
get_latest_dsmc_archived_sequencing_run,
get_latest_dsmc_encryption_key,
contains_dsmc_key,
contains_dsmc_sequencing_path,
)
from cg.services.illumina.file_parsing.models import (
DsmcEncryptionKey,
DsmcSequencingFile,
)
from cg.services.pdc_service.pdc_service import PdcService
from cg.store.models import IlluminaSequencingRun
Expand Down Expand Up @@ -97,7 +107,7 @@ def fetch_sequencing_run(
)

archived_key: Path = self.get_archived_encryption_key_path(dsmc_output=dsmc_output)
archived_run: Path = self.get_archived_sequencing_run_path(dsmc_output=dsmc_output)
archived_run: Path = self.get_latest_archived_sequencing_run_path(dsmc_output=dsmc_output)

if not self.dry_run:
return self._process_run(
Expand Down Expand Up @@ -278,34 +288,60 @@ def retrieve_archived_file(self, archived_file: Path, run_dir: Path) -> None:
file_path=str(archived_file), target_path=str(retrieved_file)
)

@staticmethod
def parse_dsmc_output_sequencing_path(dsmc_output: list[str]) -> list[DsmcSequencingFile]:
"""Parses the DSMC command output to extract validated sequencing paths."""
validated_responses = []
for line in dsmc_output:
if contains_dsmc_sequencing_path(line):
parts = line.split()
try:
query_response = DsmcSequencingFile(
RasmusBurge-CG marked this conversation as resolved.
Show resolved Hide resolved
date=f"{parts[DsmcOutput.DATE_COLUMN_INDEX]} {parts[DsmcOutput.TIME_COLUMN_INDEX]}",
sequencing_path=parts[DsmcOutput.PATH_COLUMN_INDEX],
)
validated_responses.append(query_response)
except ValidationError as e:
LOG.error(f"Validation error for line: {line}\nError: {e}")
RasmusBurge-CG marked this conversation as resolved.
Show resolved Hide resolved

return validated_responses

@classmethod
RasmusBurge-CG marked this conversation as resolved.
Show resolved Hide resolved
def get_archived_sequencing_run_path(cls, dsmc_output: list[str]) -> Path | None:
def get_latest_archived_sequencing_run_path(cls, dsmc_output: list[str]) -> Path | None:
RasmusBurge-CG marked this conversation as resolved.
Show resolved Hide resolved
ChrOertlin marked this conversation as resolved.
Show resolved Hide resolved
"""Get the path of the archived sequencing run from a PDC query."""
run_line: str = [
row
for row in dsmc_output
if FileExtensions.TAR in row
and FileExtensions.GZIP in row
and FileExtensions.GPG in row
][0]

archived_run = Path(run_line.split()[4])
validated_sequencing_paths = cls.parse_dsmc_output_sequencing_path(dsmc_output)

archived_run = get_latest_dsmc_archived_sequencing_run(validated_sequencing_paths)

if archived_run:
LOG.info(f"Sequencing run found: {archived_run}")
return archived_run

@staticmethod
def parse_dsmc_output_key_path(dsmc_output: list[str]) -> list[DsmcEncryptionKey]:
"""Parses the DSMC command output to extract validated encryption keys."""
validated_responses = []
for line in dsmc_output:
if contains_dsmc_key(line):
parts = line.split()
try:
query_response = DsmcEncryptionKey(
date=f"{parts[DsmcOutput.DATE_COLUMN_INDEX]} {parts[DsmcOutput.TIME_COLUMN_INDEX]}",
key_path=parts[DsmcOutput.PATH_COLUMN_INDEX],
)
validated_responses.append(query_response)
except ValidationError as e:
LOG.error(f"Validation error for line: {line}\nError: {e}")
RasmusBurge-CG marked this conversation as resolved.
Show resolved Hide resolved

return validated_responses

@classmethod
RasmusBurge-CG marked this conversation as resolved.
Show resolved Hide resolved
def get_archived_encryption_key_path(cls, dsmc_output: list[str]) -> Path | None:
"""Get the encryption key for the archived sequencing run from a PDC query."""
encryption_key_line: str = [
row
for row in dsmc_output
if FileExtensions.KEY in row
and FileExtensions.GPG in row
and FileExtensions.GZIP not in row
][0]

archived_encryption_key = Path(encryption_key_line.split()[4])
validated_encryption_keys = cls.parse_dsmc_output_key_path(dsmc_output)

archived_encryption_key = get_latest_dsmc_encryption_key(validated_encryption_keys)

if archived_encryption_key:
LOG.info(f"Encryption key found: {archived_encryption_key}")
return archived_encryption_key
Expand Down
58 changes: 58 additions & 0 deletions cg/services/illumina/backup/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""Helper functions."""
RasmusBurge-CG marked this conversation as resolved.
Show resolved Hide resolved
RasmusBurge-CG marked this conversation as resolved.
Show resolved Hide resolved

from enum import IntEnum
from pathlib import Path

from cg.constants import FileExtensions
from cg.exc import ValidationError
from cg.services.illumina.file_parsing.models import (
DsmcEncryptionKey,
DsmcSequencingFile,
)
from cg.constants import FileExtensions


class DsmcOutput:
DATE_COLUMN_INDEX = 2
TIME_COLUMN_INDEX = 3
PATH_COLUMN_INDEX = 4


def contains_dsmc_key(line: str) -> bool:
if (
FileExtensions.KEY in line
and FileExtensions.GPG in line
and FileExtensions.GZIP not in line
):
return True
return False
RasmusBurge-CG marked this conversation as resolved.
Show resolved Hide resolved


def contains_dsmc_sequencing_path(line: str) -> bool:
if FileExtensions.TAR in line and FileExtensions.GZIP in line and FileExtensions.GPG in line:
return True
return False
RasmusBurge-CG marked this conversation as resolved.
Show resolved Hide resolved


def get_latest_dsmc_archived_sequencing_run(dsmc_files: list[DsmcSequencingFile]) -> Path:
"""Return the latest file path based on the date attribute."""
if not dsmc_files:
return None # Return None if the list is empty
RasmusBurge-CG marked this conversation as resolved.
Show resolved Hide resolved

# Get the file with the latest date
latest_file = max(dsmc_files, key=lambda file: file.date)

# Return the sequencing_path as a Path object
RasmusBurge-CG marked this conversation as resolved.
Show resolved Hide resolved
return Path(latest_file.sequencing_path)


def get_latest_dsmc_encryption_key(dsmc_files: list[DsmcEncryptionKey]) -> Path:
RasmusBurge-CG marked this conversation as resolved.
Show resolved Hide resolved
"""Return the latest file path based on the date attribute."""
if not dsmc_files:
return None # Return None if the list is empty

# Get the file with the latest date
latest_file = max(dsmc_files, key=lambda file: file.date)

# Return the sequencing_path as a Path object
return Path(latest_file.key_path)
41 changes: 40 additions & 1 deletion cg/services/illumina/file_parsing/models.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from datetime import datetime
from pathlib import Path

from pydantic import BaseModel, Field, field_validator

from cg.constants import SequencingRunDataAvailability
from cg.constants import FileExtensions, SequencingRunDataAvailability
from cg.constants.devices import DeviceType
from cg.constants.metrics import DemuxMetricsColumnNames, QualityMetricsColumnNames
from cg.constants.sequencing import Sequencers
Expand All @@ -28,3 +29,41 @@ class DemuxMetrics(BaseModel):
lane: int = Field(..., alias=DemuxMetricsColumnNames.LANE)
sample_internal_id: str = Field(..., alias=DemuxMetricsColumnNames.SAMPLE_INTERNAL_ID)
read_pair_count: int = Field(..., alias=DemuxMetricsColumnNames.READ_PAIR_COUNT)


class DsmcEncryptionKey(BaseModel):
RasmusBurge-CG marked this conversation as resolved.
Show resolved Hide resolved
"""Model representing the response from a PDC query."""

date: str
RasmusBurge-CG marked this conversation as resolved.
Show resolved Hide resolved
key_path: str
RasmusBurge-CG marked this conversation as resolved.
Show resolved Hide resolved

@field_validator("date")
def parse_date(cls, value: str) -> datetime:
return datetime.strptime(value, "%m/%d/%Y %H:%M:%S")

@field_validator("key_path")
def validate_sequencing_path(cls, value: str) -> str:
if not value.endswith(f"{FileExtensions.KEY}{FileExtensions.GPG}"):
raise ValueError(f'"{value}" - is not the path to the Encryption key')
if not Path(value):
raise ValueError(f'"{value}" - is not a valid file path.')
return value
RasmusBurge-CG marked this conversation as resolved.
Show resolved Hide resolved


class DsmcSequencingFile(BaseModel):
"""Model representing the response from a PDC query."""

date: str
sequencing_path: str

@field_validator("date")
def parse_date(cls, value: str) -> datetime:
return datetime.strptime(value, "%m/%d/%Y %H:%M:%S")

@field_validator("sequencing_path")
def validate_sequencing_path(cls, value: str) -> str:
if not value.endswith(f"{FileExtensions.TAR}{FileExtensions.GZIP}{FileExtensions.GPG}"):
raise ValueError(f'"{value}" - is not the path to the archived sequencing file')
if not Path(value):
raise ValueError(f'"{value}" - is not a valid file path.')
return value
RasmusBurge-CG marked this conversation as resolved.
Show resolved Hide resolved
27 changes: 14 additions & 13 deletions tests/fixture_plugins/backup_fixtures/backup_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@
import pytest

from cg.constants import FileExtensions
from cg.services.illumina.backup.backup_service import IlluminaBackupService
from cg.services.pdc_service.pdc_service import PdcService
from cg.meta.encryption.encryption import EncryptionAPI
from cg.meta.tar.tar import TarAPI
from cg.models.cg_config import PDCArchivingDirectory
from cg.models.cg_config import CGConfig
from cg.models.cg_config import CGConfig, PDCArchivingDirectory
from cg.services.illumina.backup.backup_service import IlluminaBackupService
from cg.services.pdc_service.pdc_service import PdcService
from cg.store.store import Store


Expand All @@ -27,22 +26,24 @@ def mock_method(search_pattern: str) -> list[str] | None:

@pytest.fixture
def dsmc_q_archive_output() -> list[str]:
output: str = """IBM Tivoli Storage Manager
output: str = """IBM Spectrum Protect
Command Line Backup-Archive Client Interface
Client Version 7, Release 1, Level 4.0
Client date/time: 09/22/2023 10:42:42
(c) Copyright by IBM Corporation and other(s) 1990, 2015. All Rights Reserved.
Client Version 8, Release 1, Level 11.0
Client date/time: 09/16/2024 09:11:47
(c) Copyright by IBM Corporation and other(s) 1990, 2020. All Rights Reserved.

Node Name: HASTA.SCILIFELAB.SE_CLINICAL
Node Name: INFINITE.IMPROBABILITY.DRIVE
Session established with server BLACKHOLE: Linux/x86_64
Server Version 8, Release 1, Level 9.300
Server date/time: 09/22/2023 10:42:42 Last access: 09/22/2023 10:42:26
Server date/time: 09/16/2024 09:11:48 Last access: 09/16/2024 09:11:19

Accessing as node: SLLCLINICAL
Accessing as node: ArthurDent
RasmusBurge-CG marked this conversation as resolved.
Show resolved Hide resolved
Size Archive Date - Time File - Expires on - Description
---- ------------------- -------------------------------
607 B 04/07/2019 07:30:15 /home/hiseq.clinical/ENCRYPT/190329_A00689_0018_AHVKJCDRXX.key.gpg Never Archive Date: 04/07/2019
1,244,997,334 KB 04/07/2019 04:00:05 /home/hiseq.clinical/ENCRYPT/190329_A00689_0018_AHVKJCDRXX.tar.gz.gpg Never Archive Date: 04/07/2019"""
607 B 12/03/2016 11:51:24 /home/hiseq.clinical/ENCRYPT/161115_ST-E00214_0117_BHVKJCDRXX.key.gpg Never Archive Date: 12/03/2016
323,235,178 B 12/03/2016 11:51:18 /home/hiseq.clinical/ENCRYPT/161115_ST-E00214_0117_BHVKJCDRXX.tar.gz.gpg Never Archive Date: 12/03/2016
607 B 12/03/2016 20:18:46 /home/hiseq.clinical/ENCRYPT/161115_ST-E00214_0118_BHVKJCDRXX.key.gpg Never Archive Date: 12/03/2016
647,336,368 KB 12/03/2016 18:30:04 /home/hiseq.clinical/ENCRYPT/161115_ST-E00214_0118_BHVKJCDRXX.tar.gz.gpg Never Archive Date: 12/03/2016"""
return output.splitlines()


Expand Down
36 changes: 18 additions & 18 deletions tests/services/illumina/backup/test_backup_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
)
from cg.models.cg_config import CGConfig, PDCArchivingDirectory
from cg.services.illumina.backup.backup_service import IlluminaBackupService
from cg.services.illumina.backup.encrypt_service import (
IlluminaRunEncryptionService,
)
from cg.services.illumina.backup.encrypt_service import IlluminaRunEncryptionService
from cg.services.pdc_service.pdc_service import PdcService
from cg.store.models import IlluminaSequencingRun
from cg.store.store import Store
Expand Down Expand Up @@ -60,7 +58,7 @@ def test_query_pdc_for_run(
)


def test_get_archived_encryption_key_path(dsmc_q_archive_output: list[str], flow_cell_name: str):
def test_get_latest_cryption_key_path(dsmc_q_archive_output: list[str], flow_cell_name: str):
"""Tests returning an encryption key path from DSMC output."""
# GIVEN an DSMC output and a flow cell id

Expand All @@ -80,14 +78,14 @@ def test_get_archived_encryption_key_path(dsmc_q_archive_output: list[str], flow
# THEN this method should return a path object
assert isinstance(key_path, Path)

# THEN return the key file name
# THEN the latest key file name should be returned
assert (
key_path.name
== f"190329_A00689_0018_A{flow_cell_name}{FileExtensions.KEY}{FileExtensions.GPG}"
)
== f"161115_ST-E00214_0118_B{flow_cell_name}{FileExtensions.KEY}{FileExtensions.GPG}"
), f'TEST FAILED: "{key_path.name}" is not the latest cryption key path'
RasmusBurge-CG marked this conversation as resolved.
Show resolved Hide resolved


def test_get_archived_run_path(dsmc_q_archive_output: list[str], flow_cell_name: str):
def test_get_latest_archived_run_path(dsmc_q_archive_output: list[str], flow_cell_name: str):
"""Tests returning an Illumina run path from DSMC output."""
# GIVEN an DSMC output and a flow cell id

Expand All @@ -102,16 +100,18 @@ def test_get_archived_run_path(dsmc_q_archive_output: list[str], flow_cell_name:
)

# WHEN getting the run path
runs_path: Path = backup_api.get_archived_sequencing_run_path(dsmc_output=dsmc_q_archive_output)
runs_path: Path = backup_api.get_latest_archived_sequencing_run_path(
dsmc_output=dsmc_q_archive_output
)

# THEN this method should return a path object
assert isinstance(runs_path, Path)

# THEN return the Illumina run file name
# THEN the latest Illumina run file name should be returned
assert (
runs_path.name
== f"190329_A00689_0018_A{flow_cell_name}{FileExtensions.TAR}{FileExtensions.GZIP}{FileExtensions.GPG}"
)
== f"161115_ST-E00214_0118_B{flow_cell_name}{FileExtensions.TAR}{FileExtensions.GZIP}{FileExtensions.GPG}"
), f'TEST FAILED: "{runs_path.name}" is not the latest archived run path'
RasmusBurge-CG marked this conversation as resolved.
Show resolved Hide resolved


def test_maximum_processing_queue_full(store_with_illumina_sequencing_data: Store):
Expand Down Expand Up @@ -280,7 +280,7 @@ def test_fetch_sequencing_run_no_runs_requested(
@mock.patch("cg.services.illumina.backup.backup_service.IlluminaBackupService.create_rta_complete")
@mock.patch("cg.services.illumina.backup.backup_service.IlluminaBackupService.create_copy_complete")
@mock.patch(
"cg.services.illumina.backup.backup_service.IlluminaBackupService.get_archived_sequencing_run_path"
"cg.services.illumina.backup.backup_service.IlluminaBackupService.get_latest_archived_sequencing_run_path"
)
@mock.patch(
"cg.services.illumina.backup.backup_service.IlluminaBackupService.get_archived_encryption_key_path"
Expand All @@ -290,7 +290,7 @@ def test_fetch_sequencing_run_no_runs_requested(
)
def test_fetch_sequencing_run_retrieve_next_run(
mock_get_archived_encryption_key_path,
mock_get_archived_sequencing_run_path,
mock_get_latest_archived_sequencing_run_path,
mock_create_copy_complete,
mock_create_rta_complete,
archived_key,
Expand Down Expand Up @@ -323,7 +323,7 @@ def test_fetch_sequencing_run_retrieve_next_run(
sequencing_run.data_availability = SequencingRunDataAvailability.REQUESTED
backup_api.has_processing_queue_capacity.return_value = True
backup_api.get_archived_encryption_key_path.return_value = archived_key
backup_api.get_archived_sequencing_run_path.return_value = archived_illumina_run
backup_api.get_latest_archived_sequencing_run_path.return_value = archived_illumina_run
backup_api.tar_api.run_tar_command.return_value = None
result = backup_api.fetch_sequencing_run(sequencing_run=None)

Expand All @@ -341,7 +341,7 @@ def test_fetch_sequencing_run_retrieve_next_run(
@mock.patch("cg.services.illumina.backup.backup_service.IlluminaBackupService.create_rta_complete")
@mock.patch("cg.services.illumina.backup.backup_service.IlluminaBackupService.create_copy_complete")
@mock.patch(
"cg.services.illumina.backup.backup_service.IlluminaBackupService.get_archived_sequencing_run_path"
"cg.services.illumina.backup.backup_service.IlluminaBackupService.get_latest_archived_sequencing_run_path"
)
@mock.patch(
"cg.services.illumina.backup.backup_service.IlluminaBackupService.get_archived_encryption_key_path"
Expand Down Expand Up @@ -386,7 +386,7 @@ def test_fetch_sequencing_run_retrieve_specified_run(
sequencing_run.data_availability = SequencingRunDataAvailability.REQUESTED
backup_api.has_processing_queue_capacity.return_value = True
backup_api.get_archived_encryption_key_path.return_value = archived_key
backup_api.get_archived_sequencing_run_path.return_value = archived_illumina_run
backup_api.get_latest_archived_sequencing_run_path.return_value = archived_illumina_run
backup_api.tar_api.run_tar_command.return_value = None
result = backup_api.fetch_sequencing_run(sequencing_run=sequencing_run)

Expand All @@ -413,7 +413,7 @@ def test_fetch_sequencing_run_retrieve_specified_run(
"cg.services.illumina.backup.backup_service.IlluminaBackupService.get_archived_encryption_key_path"
)
@mock.patch(
"cg.services.illumina.backup.backup_service.IlluminaBackupService.get_archived_sequencing_run_path"
"cg.services.illumina.backup.backup_service.IlluminaBackupService.get_latest_archived_sequencing_run_path"
)
def test_fetch_sequencing_run_integration(
mock_sequencing_run_path,
Expand Down
Loading