|
1 | 1 | import subprocess
|
| 2 | +from datetime import datetime |
2 | 3 | from pathlib import Path
|
3 | 4 |
|
4 | 5 | from cg.apps.slurm.slurm_api import SlurmAPI
|
|
11 | 12 | IlluminaRunEncryptionError,
|
12 | 13 | PdcError,
|
13 | 14 | PdcNoFilesMatchingSearchError,
|
14 |
| - ValidationError, |
15 | 15 | )
|
16 | 16 | from cg.meta.backup.backup import LOG
|
17 | 17 | from cg.meta.encryption.encryption import EncryptionAPI
|
18 | 18 | from cg.meta.tar.tar import TarAPI
|
19 | 19 | from cg.models.cg_config import PDCArchivingDirectory
|
20 | 20 | from cg.models.run_devices.illumina_run_directory_data import IlluminaRunDirectoryData
|
21 | 21 | from cg.services.illumina.backup.encrypt_service import IlluminaRunEncryptionService
|
| 22 | +from cg.services.illumina.backup.models import DsmcEncryptionKey, DsmcSequencingFile |
22 | 23 | from cg.services.illumina.backup.utils import (
|
23 | 24 | DsmcOutput,
|
| 25 | + convert_string_to_datetime_object, |
24 | 26 | get_latest_dsmc_archived_sequencing_run,
|
25 | 27 | get_latest_dsmc_encryption_key,
|
26 |
| - contains_dsmc_key, |
27 |
| - contains_dsmc_sequencing_path, |
28 |
| -) |
29 |
| -from cg.services.illumina.file_parsing.models import ( |
30 |
| - DsmcEncryptionKey, |
31 |
| - DsmcSequencingFile, |
| 28 | + is_dsmc_encryption_key, |
| 29 | + is_dsmc_sequencing_path, |
32 | 30 | )
|
33 | 31 | from cg.services.pdc_service.pdc_service import PdcService
|
34 | 32 | from cg.store.models import IlluminaSequencingRun
|
@@ -293,58 +291,71 @@ def parse_dsmc_output_sequencing_path(dsmc_output: list[str]) -> list[DsmcSequen
|
293 | 291 | """Parses the DSMC command output to extract validated sequencing paths."""
|
294 | 292 | validated_responses = []
|
295 | 293 | for line in dsmc_output:
|
296 |
| - if contains_dsmc_sequencing_path(line): |
| 294 | + if is_dsmc_sequencing_path(line): |
297 | 295 | parts = line.split()
|
298 |
| - try: |
299 |
| - query_response = DsmcSequencingFile( |
300 |
| - date=f"{parts[DsmcOutput.DATE_COLUMN_INDEX]} {parts[DsmcOutput.TIME_COLUMN_INDEX]}", |
301 |
| - sequencing_path=parts[DsmcOutput.PATH_COLUMN_INDEX], |
302 |
| - ) |
303 |
| - validated_responses.append(query_response) |
304 |
| - except ValidationError as e: |
305 |
| - LOG.error(f"Validation error for line: {line}\nError: {e}") |
| 296 | + |
| 297 | + fileDateTime: datetime = convert_string_to_datetime_object( |
| 298 | + f"{parts[DsmcOutput.DATE_COLUMN_INDEX]} {parts[DsmcOutput.TIME_COLUMN_INDEX]}" |
| 299 | + ) |
| 300 | + |
| 301 | + query_response = DsmcSequencingFile( |
| 302 | + dateTime=fileDateTime, |
| 303 | + path=Path(parts[DsmcOutput.PATH_COLUMN_INDEX]), |
| 304 | + ) |
| 305 | + validated_responses.append(query_response) |
306 | 306 |
|
307 | 307 | return validated_responses
|
308 | 308 |
|
309 | 309 | @classmethod
|
310 | 310 | def get_latest_archived_sequencing_run_path(cls, dsmc_output: list[str]) -> Path | None:
|
311 | 311 | """Get the path of the archived sequencing run from a PDC query."""
|
312 |
| - validated_sequencing_paths = cls.parse_dsmc_output_sequencing_path(dsmc_output) |
| 312 | + validated_sequencing_paths: list[DsmcSequencingFile] = ( |
| 313 | + cls.parse_dsmc_output_sequencing_path(dsmc_output) |
| 314 | + ) |
313 | 315 |
|
314 |
| - archived_run = get_latest_dsmc_archived_sequencing_run(validated_sequencing_paths) |
| 316 | + archived_run: DsmcSequencingFile = get_latest_dsmc_archived_sequencing_run( |
| 317 | + validated_sequencing_paths |
| 318 | + ) |
315 | 319 |
|
316 | 320 | if archived_run:
|
317 | 321 | LOG.info(f"Sequencing run found: {archived_run}")
|
318 |
| - return archived_run |
| 322 | + return archived_run.path |
319 | 323 |
|
320 | 324 | @staticmethod
|
321 | 325 | def parse_dsmc_output_key_path(dsmc_output: list[str]) -> list[DsmcEncryptionKey]:
|
322 | 326 | """Parses the DSMC command output to extract validated encryption keys."""
|
323 | 327 | validated_responses = []
|
324 | 328 | for line in dsmc_output:
|
325 |
| - if contains_dsmc_key(line): |
326 |
| - parts = line.split() |
327 |
| - try: |
328 |
| - query_response = DsmcEncryptionKey( |
329 |
| - date=f"{parts[DsmcOutput.DATE_COLUMN_INDEX]} {parts[DsmcOutput.TIME_COLUMN_INDEX]}", |
330 |
| - key_path=parts[DsmcOutput.PATH_COLUMN_INDEX], |
331 |
| - ) |
332 |
| - validated_responses.append(query_response) |
333 |
| - except ValidationError as e: |
334 |
| - LOG.error(f"Validation error for line: {line}\nError: {e}") |
| 329 | + if is_dsmc_encryption_key(line): |
| 330 | + parts: list[str] = line.split() |
| 331 | + |
| 332 | + fileDateTime: datetime = convert_string_to_datetime_object( |
| 333 | + f"{parts[DsmcOutput.DATE_COLUMN_INDEX]} {parts[DsmcOutput.TIME_COLUMN_INDEX]}" |
| 334 | + ) |
| 335 | + |
| 336 | + query_response = DsmcEncryptionKey( |
| 337 | + dateTime=fileDateTime, |
| 338 | + path=Path(parts[DsmcOutput.PATH_COLUMN_INDEX]), |
| 339 | + ) |
| 340 | + |
| 341 | + validated_responses.append(query_response) |
335 | 342 |
|
336 | 343 | return validated_responses
|
337 | 344 |
|
338 | 345 | @classmethod
|
339 | 346 | def get_archived_encryption_key_path(cls, dsmc_output: list[str]) -> Path | None:
|
340 | 347 | """Get the encryption key for the archived sequencing run from a PDC query."""
|
341 |
| - validated_encryption_keys = cls.parse_dsmc_output_key_path(dsmc_output) |
| 348 | + validated_encryption_keys: list[DsmcEncryptionKey] = cls.parse_dsmc_output_key_path( |
| 349 | + dsmc_output |
| 350 | + ) |
342 | 351 |
|
343 |
| - archived_encryption_key = get_latest_dsmc_encryption_key(validated_encryption_keys) |
| 352 | + archived_encryption_key: DsmcEncryptionKey = get_latest_dsmc_encryption_key( |
| 353 | + validated_encryption_keys |
| 354 | + ) |
344 | 355 |
|
345 | 356 | if archived_encryption_key:
|
346 | 357 | LOG.info(f"Encryption key found: {archived_encryption_key}")
|
347 |
| - return archived_encryption_key |
| 358 | + return archived_encryption_key.path |
348 | 359 |
|
349 | 360 | def validate_is_run_backup_possible(
|
350 | 361 | self,
|
|
0 commit comments