Skip to content

Commit

Permalink
patch(PDC fetching) Except warnings from the dsmc command (#2638) (pa…
Browse files Browse the repository at this point in the history
…tch)

### Changed

- Warning exit codes from the dsmc command no longer raise errors
  • Loading branch information
Vince-janv authored Nov 1, 2023
1 parent 8389d16 commit f3ea276
Show file tree
Hide file tree
Showing 11 changed files with 125 additions and 169 deletions.
2 changes: 1 addition & 1 deletion cg/constants/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
)
from cg.constants.paths import TMP_DIR
from cg.constants.priority import Priority
from cg.constants.process import EXIT_FAIL, EXIT_SUCCESS, RETURN_SUCCESS
from cg.constants.process import EXIT_FAIL, EXIT_SUCCESS
from cg.constants.report import *
from cg.constants.sample_sources import ANALYSIS_SOURCES, METAGENOME_SOURCES
from cg.constants.sequencing import FLOWCELL_Q30_THRESHOLD
Expand Down
7 changes: 0 additions & 7 deletions cg/constants/pdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,3 @@ class DSMCParameters(ListEnum):
ARCHIVE_COMMAND: list = ["archive"]
QUERY_COMMAND: list = ["q", "archive"]
RETRIEVE_COMMAND: list = ["retrieve", "-replace=yes"]


class PDCExitCodes(IntEnum):
"""Exit codes for PDC commands"""

SUCCESS: int = 0
NO_FILES_FOUND: int = 8
3 changes: 1 addition & 2 deletions cg/constants/process.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Constants for Processes"""

RETURN_SUCCESS = 0
RETURN_WARNING = 8
EXIT_SUCCESS = 0
EXIT_WARNING = 8
EXIT_FAIL = 1
74 changes: 25 additions & 49 deletions cg/meta/backup/backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@
from cg.constants.constants import FileExtensions, FlowCellStatus
from cg.constants.demultiplexing import DemultiplexingDirsAndFiles
from cg.constants.indexes import ListIndexes
from cg.constants.pdc import PDCExitCodes
from cg.constants.process import RETURN_WARNING
from cg.constants.symbols import NEW_LINE
from cg.exc import ChecksumFailedError, PdcNoFilesMatchingSearchError
from cg.exc import ChecksumFailedError, PdcError, PdcNoFilesMatchingSearchError
from cg.meta.backup.pdc import PdcAPI
from cg.meta.encryption.encryption import EncryptionAPI, SpringEncryptionAPI
from cg.meta.tar.tar import TarAPI
Expand Down Expand Up @@ -86,12 +84,7 @@ def fetch_flow_cell(self, flow_cell: Optional[Flowcell] = None) -> Optional[floa
self.status.session.commit()
LOG.info(f"{flow_cell.name}: retrieving from PDC")

try:
dsmc_output: list[str] = self.query_pdc_for_flow_cell(flow_cell.name)

except PdcNoFilesMatchingSearchError as error:
LOG.error(f"PDC query failed: {error}")
raise error
dsmc_output: list[str] = self.query_pdc_for_flow_cell(flow_cell.name)

archived_key: Path = self.get_archived_encryption_key_path(dsmc_output=dsmc_output)
archived_flow_cell: Path = self.get_archived_flow_cell_path(dsmc_output=dsmc_output)
Expand Down Expand Up @@ -209,18 +202,12 @@ def retrieve_archived_key(self, archived_key: Path, flow_cell: Flowcell, run_dir
archived_file=archived_key,
run_dir=run_dir,
)
except subprocess.CalledProcessError as error:
if error.returncode == RETURN_WARNING:
LOG.warning(
f"WARNING for retrieval of encryption key of flow cell {flow_cell.name}, please check "
"dsmerror.log"
)
else:
LOG.error(f"{flow_cell.name}: key retrieval failed")
if not self.dry_run:
flow_cell.status = FlowCellStatus.REQUESTED
self.status.session.commit()
raise error
except PdcError as error:
LOG.error(f"{flow_cell.name}: key retrieval failed")
if not self.dry_run:
flow_cell.status = FlowCellStatus.REQUESTED
self.status.session.commit()
raise error

def retrieve_archived_flow_cell(
self, archived_flow_cell: Path, flow_cell: Flowcell, run_dir: Path
Expand All @@ -231,23 +218,14 @@ def retrieve_archived_flow_cell(
archived_file=archived_flow_cell,
run_dir=run_dir,
)
except subprocess.CalledProcessError as error:
if error.returncode == RETURN_WARNING:
LOG.warning(
f"WARNING for retrieval of flow cell {flow_cell.name}, please check dsmerror.log"
)
else:
LOG.error(f"{flow_cell.name}: run directory retrieval failed")
if not self.dry_run:
flow_cell.status = FlowCellStatus.REQUESTED
self.status.session.commit()
raise error
if not self.dry_run:
try:
if not self.dry_run:
self._set_flow_cell_status_to_retrieved(flow_cell)
except OperationalError as error:
LOG.error(f"Could not set status for flow cell {flow_cell.name}: {error}")
raise error
except PdcError as error:
LOG.error(f"{flow_cell.name}: run directory retrieval failed")
if not self.dry_run:
flow_cell.status = FlowCellStatus.REQUESTED
self.status.session.commit()
raise error

def _set_flow_cell_status_to_retrieved(self, flow_cell: Flowcell):
flow_cell.status = FlowCellStatus.RETRIEVED
Expand All @@ -257,21 +235,19 @@ def _set_flow_cell_status_to_retrieved(self, flow_cell: Flowcell):
def query_pdc_for_flow_cell(self, flow_cell_id: str) -> list[str]:
"""Query PDC for a given flow cell id.
Raise:
CalledProcessError if an error OTHER THAN no files found is raised.
PdcNoFilesMatchingSearchError if no files are found.
"""
dsmc_output: list[str] = []
for _, encryption_directory in self.encryption_directories:
search_pattern = f"{encryption_directory}*{flow_cell_id}*{FileExtensions.GPG}"
try:
self.pdc.query_pdc(search_pattern)
dsmc_output: list[str] = self.pdc.process.stdout.split(NEW_LINE)
except subprocess.CalledProcessError as error:
if error.returncode != PDCExitCodes.NO_FILES_FOUND:
raise error
LOG.debug(f"No archived files found for PDC query: {search_pattern}")
continue
LOG.info(f"Found archived files for PDC query: {search_pattern}")
return dsmc_output
self.pdc.query_pdc(search_pattern)
if self.pdc.was_file_found(self.pdc.process.stderr):
LOG.info(f"Found archived files for PDC query: {search_pattern}")
return self.pdc.process.stdout.split(NEW_LINE)
LOG.debug(f"No archived files found for PDC query: {search_pattern}")

raise PdcNoFilesMatchingSearchError(
message=f"No flow cell files found at PDC for {flow_cell_id}"
)

def retrieve_archived_file(self, archived_file: Path, run_dir: Path) -> None:
"""Retrieve the archived file from PDC to a flow cell runs directory."""
Expand Down
26 changes: 15 additions & 11 deletions cg/meta/backup/pdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

import logging
from pathlib import Path
from subprocess import CalledProcessError

import psutil

from cg.constants.pdc import DSMCParameters
from cg.constants.process import EXIT_WARNING
from cg.exc import (
DsmcAlreadyRunningError,
FlowCellAlreadyBackedUpError,
Expand All @@ -20,6 +22,7 @@
LOG = logging.getLogger(__name__)

SERVER = "hasta"
NO_FILE_FOUND_ANSWER = "ANS1092W"


class PdcAPI:
Expand Down Expand Up @@ -55,9 +58,7 @@ def query_pdc(self, search_pattern: str) -> None:
"""Query PDC based on a given search pattern."""
command: list = DSMCParameters.QUERY_COMMAND.copy()
command.append(search_pattern)
LOG.debug("Starting DSMC command:")
LOG.debug(f"{self.process.binary} {' '.join(command)}")
self.process.run_command(parameters=command)
self.run_dsmc_command(command=command)

def retrieve_file_from_pdc(self, file_path: str, target_path: str = None) -> None:
"""Retrieve a file from PDC"""
Expand All @@ -76,8 +77,11 @@ def run_dsmc_command(self, command: list) -> None:
LOG.debug(f"{self.process.binary} {' '.join(command)}")
try:
self.process.run_command(parameters=command, dry_run=self.dry_run)
except Exception as error:
raise PdcError(f"{error}") from error
except CalledProcessError as error:
if error.returncode == EXIT_WARNING:
LOG.warning(f"{error}")
return
raise PdcError(message=f"{error}") from error

def validate_is_flow_cell_backup_possible(
self, db_flow_cell: Flowcell, flow_cell_encryption_api: FlowCellEncryptionAPI
Expand All @@ -104,14 +108,9 @@ def backup_flow_cell(
self, files_to_archive: list[Path], store: Store, db_flow_cell: Flowcell
) -> None:
"""Back-up flow cell files."""
archived_file_count: int = 0
for encrypted_file in files_to_archive:
try:
if not self.dry_run:
self.archive_file_to_pdc(file_path=encrypted_file.as_posix())
archived_file_count += 1
except PdcError:
LOG.warning(f"{encrypted_file.as_posix()} cannot be archived")
if archived_file_count == len(files_to_archive) and not self.dry_run:
store.update_flow_cell_has_backup(flow_cell=db_flow_cell, has_backup=True)
LOG.info(f"Flow cell: {db_flow_cell.name} has been backed up")

Expand All @@ -133,3 +132,8 @@ def start_flow_cell_backup(
store=status_db,
db_flow_cell=db_flow_cell,
)

@staticmethod
def was_file_found(dsmc_output: str) -> bool:
"""Check if file was found in PDC."""
return NO_FILE_FOUND_ANSWER not in dsmc_output
6 changes: 3 additions & 3 deletions cg/utils/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import subprocess
from subprocess import CalledProcessError

from cg.constants.process import RETURN_SUCCESS
from cg.constants.process import EXIT_SUCCESS

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -83,7 +83,7 @@ def run_command(self, parameters: list = None, dry_run: bool = False) -> int:
LOG.info("Running command %s", " ".join(command))
if dry_run:
LOG.info("Dry run: process call will not be executed!!")
return RETURN_SUCCESS
return EXIT_SUCCESS

if self.environment:
res = subprocess.run(
Expand All @@ -100,7 +100,7 @@ def run_command(self, parameters: list = None, dry_run: bool = False) -> int:

self.stdout = res.stdout.decode("utf-8").rstrip()
self.stderr = res.stderr.decode("utf-8").rstrip()
if res.returncode != RETURN_SUCCESS:
if res.returncode != EXIT_SUCCESS:
LOG.critical("Call %s exit with a non zero exit code", command)
LOG.critical(self.stderr)
raise CalledProcessError(res.returncode, command)
Expand Down
6 changes: 3 additions & 3 deletions tests/cli/get/test_cli_get_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from click.testing import CliRunner

from cg.cli.get import get
from cg.constants import RETURN_SUCCESS
from cg.constants import EXIT_SUCCESS
from cg.models.cg_config import CGConfig
from cg.store import Store
from cg.store.models import Analysis
Expand All @@ -19,7 +19,7 @@ def test_get_analysis_bad_case(cli_runner: CliRunner, base_context: CGConfig):
result = cli_runner.invoke(get, ["analysis", name], obj=base_context)

# THEN it should error about missing case instead of getting a analysis
assert result.exit_code != RETURN_SUCCESS
assert result.exit_code != EXIT_SUCCESS


def test_get_analysis_required(
Expand All @@ -35,7 +35,7 @@ def test_get_analysis_required(
result = cli_runner.invoke(get, ["analysis", internal_id], obj=base_context)

# THEN it should have been gotten
assert result.exit_code == RETURN_SUCCESS
assert result.exit_code == EXIT_SUCCESS
assert str(analysis.started_at) in result.output
assert analysis.pipeline in result.output
assert analysis.pipeline_version in result.output
24 changes: 15 additions & 9 deletions tests/meta/backup/conftest.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,21 @@
import fnmatch
import subprocess
from pathlib import Path
from subprocess import CompletedProcess
from typing import Callable

import pytest

from cg.constants import FileExtensions
from cg.constants.pdc import PDCExitCodes
from cg.models.cg_config import EncryptionDirectories


@pytest.fixture
def mock_pdc_query_method(archived_flow_cells: list[str]) -> Callable:
"""Returns a mock method mimicking the pattern search made by the dsmc q archive command."""

def mock_method(search_pattern: str) -> list[str]:
match = fnmatch.filter(archived_flow_cells, search_pattern)
if not match:
raise subprocess.CalledProcessError(
cmd="dummy_method", returncode=PDCExitCodes.NO_FILES_FOUND
)
return match[0]
def mock_method(search_pattern: str) -> list[str] | None:
if match := fnmatch.filter(archived_flow_cells, search_pattern):
return match

return mock_method

Expand Down Expand Up @@ -78,3 +73,14 @@ def archived_flow_cell() -> Path:
def archived_key() -> Path:
"""Path of archived key"""
return Path("/path/to/archived/encryption_key.key.gpg")


def create_process_response(
return_code: int, args: str = "", std_out: str = "", std_err: str = ""
) -> CompletedProcess:
return CompletedProcess(
args=args,
returncode=return_code,
stderr=std_err.encode("utf-8"),
stdout=std_out.encode("utf-8"),
)
63 changes: 0 additions & 63 deletions tests/meta/backup/test_meta_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,6 @@ def test_query_pdc_for_flow_cell(
assert fnmatch.filter(
names=caplog.messages, pat=f"Found archived files for PDC query:*{flow_cell_name}*.gpg"
)
# THEN the flow cell is logged as not found for two of the search patterns
assert (
len(
fnmatch.filter(
names=caplog.messages,
pat=f"No archived files found for PDC query: *{flow_cell_name}*{FileExtensions.GPG}",
)
)
== 2
)


def test_get_archived_encryption_key_path(dsmc_q_archive_output: list[str], flow_cell_name: str):
Expand Down Expand Up @@ -393,59 +383,6 @@ def test_fetch_flow_cell_retrieve_specified_flow_cell(
assert result > 0


@mock.patch("cg.meta.backup.backup.BackupAPI.unlink_files")
@mock.patch("cg.meta.backup.backup.BackupAPI.create_rta_complete")
@mock.patch("cg.meta.backup.backup.BackupAPI.get_archived_flow_cell_path")
@mock.patch("cg.meta.backup.backup.BackupAPI.get_archived_encryption_key_path")
@mock.patch("cg.meta.backup.backup.BackupAPI.check_processing")
@mock.patch("cg.meta.backup.backup.BackupAPI.get_first_flow_cell")
@mock.patch("cg.meta.tar.tar.TarAPI")
@mock.patch("cg.store.models.Flowcell")
@mock.patch("cg.meta.backup.pdc.PdcAPI")
@mock.patch("cg.store")
def test_fetch_flow_cell_pdc_retrieval_failed(
mock_store,
mock_pdc,
mock_flow_cell,
mock_tar,
mock_get_first_flow_cell,
mock_check_processing,
mock_get_archived_key,
mock_get_archived_flow_cell,
archived_key,
archived_flow_cell,
cg_context,
caplog,
):
"""Tests the fetch_flow_cell method of the backup API when PDC retrieval failed"""

caplog.set_level(logging.INFO)

# GIVEN we are going to retrieve a flow cell from PDC
backup_api = BackupAPI(
encryption_api=mock.Mock(),
encryption_directories=cg_context.backup.encryption_directories,
status=mock_store,
tar_api=mock_tar,
pdc_api=mock_pdc,
flow_cells_dir="cg_context.flow_cells_dir",
)
mock_flow_cell.status = FlowCellStatus.REQUESTED
mock_flow_cell.sequencer_type = Sequencers.NOVASEQ
backup_api.check_processing.return_value = True
backup_api.get_archived_encryption_key_path.return_value = archived_key
backup_api.get_archived_flow_cell_path.return_value = archived_flow_cell
backup_api.tar_api.run_tar_command.return_value = None

# WHEN the retrieval process fails
mock_pdc.retrieve_file_from_pdc.side_effect = subprocess.CalledProcessError(1, "echo")
with pytest.raises(subprocess.CalledProcessError):
backup_api.fetch_flow_cell(flow_cell=mock_flow_cell)

# THEN the failure to retrieve is logged
assert "retrieval failed" in caplog.text


@mock.patch("cg.meta.backup.backup.BackupAPI.unlink_files")
@mock.patch("cg.meta.backup.backup.BackupAPI.create_rta_complete")
@mock.patch("cg.meta.backup.backup.BackupAPI.query_pdc_for_flow_cell")
Expand Down
Loading

0 comments on commit f3ea276

Please sign in to comment.