Skip to content

Commit

Permalink
(Archiving) Retrieve files from DDN in start command (#2674) (minor)
Browse files Browse the repository at this point in the history
### Changed

- Added a check for archived SPRING files related to the case; if any are found, a job is launched to retrieve them.
  • Loading branch information
islean authored Jan 3, 2024
1 parent 7218388 commit aa65526
Show file tree
Hide file tree
Showing 13 changed files with 361 additions and 168 deletions.
4 changes: 3 additions & 1 deletion cg/apps/housekeeper/hk.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ def get_files_from_latest_version(self, bundle_name: str, tags: list[str]) -> Qu
return self.files(version=version.id, tags=tags)

def is_fastq_or_spring_in_all_bundles(self, bundle_names: list[str]) -> bool:
"""Return whether or not all FASTQ/SPRING files are included for the given bundles."""
"""Return whether all FASTQ/SPRING files are included for the given bundles."""
sequencing_files_in_hk: dict[str, bool] = {}
if not bundle_names:
return False
def get_archived_files_for_bundle(
    self, bundle_name: str, tags: list | None = None
) -> list[File]:
    """Return all archived files from the given bundle, tagged with the given tags.

    A ``tags`` value of None is normalized to an empty list before querying the store.
    """
    LOG.debug(f"Getting archived files for bundle {bundle_name}")
    return self._store.get_archived_files_for_bundle(bundle_name=bundle_name, tags=tags or [])

def add_archives(self, files: list[File], archive_task_id: int) -> None:
Expand Down Expand Up @@ -509,6 +510,7 @@ def set_archive_retrieval_task_id(self, file_id: int, retrieval_task_id: int) ->
if not archive:
raise ValueError(f"No Archive entry found for file with id {file_id}.")
self._store.update_retrieval_task_id(archive=archive, retrieval_task_id=retrieval_task_id)
self.commit()

def get_sample_sheets_from_latest_version(self, flow_cell_id: str) -> list[File]:
"""Returns the files tagged with 'samplesheet' for the given bundle."""
Expand Down
101 changes: 39 additions & 62 deletions cg/meta/archive/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
from cg.constants.archiving import ArchiveLocations
from cg.exc import ArchiveJobFailedError
from cg.meta.archive.ddn.ddn_data_flow_client import DDNDataFlowClient
from cg.meta.archive.models import ArchiveHandler, FileAndSample, SampleAndDestination
from cg.meta.archive.models import ArchiveHandler, FileAndSample
from cg.models.cg_config import DataFlowConfig
from cg.store import Store
from cg.store.models import Sample
from cg.store.models import Case, Sample

LOG = logging.getLogger(__name__)
ARCHIVE_HANDLERS: dict[str, Type[ArchiveHandler]] = {
Expand All @@ -29,20 +29,6 @@ class ArchiveModels(BaseModel):
handler: ArchiveHandler


def filter_samples_on_archive_location(
samples_and_destinations: list[SampleAndDestination],
archive_location: ArchiveLocations,
) -> list[SampleAndDestination]:
"""
Returns a list of SampleAndHousekeeperDestinations where the associated sample has a specific archive location.
"""
return [
sample_and_destination
for sample_and_destination in samples_and_destinations
if sample_and_destination.sample.archive_location == archive_location
]


class SpringArchiveAPI:
"""Class handling the archiving of sample SPRING files to an off-premise location for long
term storage."""
Expand Down Expand Up @@ -89,31 +75,45 @@ def archive_spring_files_and_add_archives_to_housekeeper(
else:
LOG.info(f"No files to archive for location {archive_location}.")

def retrieve_samples(self, sample_internal_ids: list[str]) -> None:
"""Retrieves the archived spring files for a list of samples."""
samples: list[Sample] = [
self.status_db.get_sample_by_internal_id(sample_internal_id)
for sample_internal_id in sample_internal_ids
]
samples_and_destinations: list[SampleAndDestination] = self.join_destinations_and_samples(
samples
def retrieve_case(self, case_id: str) -> None:
    """Submit retrieval jobs for any archived files belonging to the given case
    and record the retrieval job id on the corresponding Archive entries."""
    case: Case = self.status_db.get_case_by_internal_id(case_id)
    pending_files: list[File] = self.get_files_to_retrieve(case)
    files_and_samples: list[FileAndSample] = self.add_samples_to_files(files=pending_files)
    self.retrieve_files_from_archive_location(
        files_and_samples=files_and_samples,
        archive_location=case.customer.data_archive_location,
    )
for archive_location in ArchiveLocations:
filtered_samples: list[SampleAndDestination] = filter_samples_on_archive_location(
samples_and_destinations=samples_and_destinations,
archive_location=archive_location,

def get_files_to_retrieve(self, case: Case) -> list[File]:
    """Return the case's archived SPRING files that have no retrieval job id yet."""
    pending: list[File] = []
    for archived_file in self.get_archived_spring_files_for_case(case):
        if archived_file.archive.retrieval_task_id is None:
            pending.append(archived_file)
    return pending

def get_archived_spring_files_for_case(self, case: Case) -> list[File]:
    """Return the SPRING files of all samples linked to the case that are archived,
    i.e. that have entries in the Archive table in Housekeeper."""
    archived_files: list[File] = []
    for link in case.links:
        archived_files.extend(
            self.housekeeper_api.get_archived_files_for_bundle(
                bundle_name=link.sample.internal_id,
                tags=[SequencingFileTag.SPRING],
            )
        )
    return archived_files

def retrieve_files_from_archive_location(
    self, files_and_samples: list[FileAndSample], archive_location: str
) -> None:
    """Launch a retrieval job for the given archived SPRING files and store the
    resulting job id on their Archive entries in Housekeeper."""
    handler_class = ARCHIVE_HANDLERS[archive_location]
    archive_handler: ArchiveHandler = handler_class(self.data_flow_config)
    job_id: int = archive_handler.retrieve_files(files_and_samples)
    LOG.info(f"Retrieval job launched with ID {job_id}")
    retrieved_files: list[File] = [pair.file for pair in files_and_samples]
    self.set_archive_retrieval_task_ids(retrieval_task_id=job_id, files=retrieved_files)

def get_archived_files_from_samples(self, samples: list[Sample]) -> list[File]:
"""Gets archived spring files from the bundles corresponding to the given list of samples."""
Expand All @@ -126,29 +126,6 @@ def get_archived_files_from_samples(self, samples: list[Sample]) -> list[File]:
)
return files

def join_destinations_and_samples(self, samples: list[Sample]) -> list[SampleAndDestination]:
"""Gets all samples and combines them with their desired destination in Housekeeper."""
samples_to_retrieve: list[SampleAndDestination] = []
for sample in samples:
LOG.debug(f"Will try to retrieve sample: {sample.internal_id}.")
destination: str = self.get_destination_from_sample_internal_id(sample.internal_id)
samples_to_retrieve.append(SampleAndDestination(sample=sample, destination=destination))
return samples_to_retrieve

def retrieve_samples_from_archive_location(
self,
samples_and_destinations: list[SampleAndDestination],
archive_location: ArchiveLocations,
):
archive_handler: ArchiveHandler = ARCHIVE_HANDLERS[archive_location](self.data_flow_config)
return archive_handler.retrieve_samples(samples_and_destinations)

def get_destination_from_sample_internal_id(self, sample_internal_id) -> str:
"""Returns where in Housekeeper to put the retrieved spring files for the specified sample."""
return self.housekeeper_api.get_latest_bundle_version(
sample_internal_id
).full_path.as_posix()

def set_archive_retrieval_task_ids(self, retrieval_task_id: int, files: list[File]) -> None:
for file in files:
self.housekeeper_api.set_archive_retrieval_task_id(
Expand Down
15 changes: 9 additions & 6 deletions cg/meta/archive/ddn/ddn_data_flow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
RefreshPayload,
TransferPayload,
)
from cg.meta.archive.models import ArchiveHandler, FileAndSample, SampleAndDestination
from cg.meta.archive.models import ArchiveHandler, FileAndSample
from cg.models.cg_config import DataFlowConfig

LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -114,13 +114,16 @@ def archive_files(self, files_and_samples: list[FileAndSample]) -> int:
url=urljoin(base=self.url, url=DataflowEndpoints.ARCHIVE_FILES),
).job_id

def retrieve_samples(self, samples_and_destinations: list[SampleAndDestination]) -> int:
"""Retrieves all archived files for the provided samples and stores them in the specified location in
def retrieve_files(self, files_and_samples: list[FileAndSample]) -> int:
"""Retrieves the provided files and stores them in the corresponding sample bundle in
Housekeeper."""
miria_file_data: list[MiriaObject] = []
for sample_and_housekeeper_destination in samples_and_destinations:
miria_object: MiriaObject = MiriaObject.create_from_sample_and_destination(
sample_and_housekeeper_destination
for file_and_sample in files_and_samples:
LOG.info(
f"Will retrieve file {file_and_sample.file.path} for sample {file_and_sample.sample.internal_id} via Miria."
)
miria_object: MiriaObject = MiriaObject.create_from_file_and_sample(
file=file_and_sample.file, sample=file_and_sample.sample, is_archiving=False
)
miria_file_data.append(miria_object)
retrieval_request: TransferPayload = self.create_transfer_request(
Expand Down
14 changes: 3 additions & 11 deletions cg/meta/archive/ddn/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from cg.constants.constants import APIMethods
from cg.io.controller import APIRequest
from cg.meta.archive.ddn.constants import OSTYPE, ROOT_TO_TRIM, JobStatus
from cg.meta.archive.models import FileTransferData, SampleAndDestination
from cg.meta.archive.models import FileTransferData
from cg.store.models import Sample

LOG = logging.getLogger(__name__)
Expand All @@ -32,17 +32,9 @@ def create_from_file_and_sample(
"""Instantiates the class from a File and Sample object."""
if is_archiving:
return cls(destination=sample.internal_id, source=file.full_path)
return cls(destination=file.full_path, source=sample.internal_id)

@classmethod
def create_from_sample_and_destination(
cls, sample_and_destination: SampleAndDestination
) -> "MiriaObject":
"""Instantiates the class from a SampleAndDestination object,
i.e. when we want to fetch a folder containing all spring files for said sample."""
return cls(
destination=sample_and_destination.destination,
source=sample_and_destination.sample.internal_id,
destination=Path(file.full_path).parent.as_posix(),
source=Path(sample.internal_id, Path(file.path).name).as_posix(),
)

def trim_path(self, attribute_to_trim: str):
Expand Down
10 changes: 1 addition & 9 deletions cg/meta/archive/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,6 @@ class FileAndSample(BaseModel):
sample: Sample


class SampleAndDestination(BaseModel):
"""Contains a sample and the path to the sample bundle in Housekeeper where any retrieved files should be stored."""

model_config = ConfigDict(arbitrary_types_allowed=True)
sample: Sample
destination: str


class FileTransferData(BaseModel):
"""Base class for classes representing files to be archived."""

Expand Down Expand Up @@ -49,7 +41,7 @@ def archive_files(self, files_and_samples: list[FileAndSample]):
pass

@abstractmethod
def retrieve_files(self, files_and_samples: list[FileAndSample]):
    """Retrieve the given archived files, placing each in its sample's bundle."""
    pass

Expand Down
57 changes: 49 additions & 8 deletions cg/meta/workflow/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from cg.constants.scout import ScoutExportFileName
from cg.exc import AnalysisNotReadyError, BundleAlreadyAddedError, CgDataError, CgError
from cg.io.controller import WriteFile
from cg.meta.archive.archive import SpringArchiveAPI
from cg.meta.meta import MetaAPI
from cg.meta.workflow.fastq import FastqHandler
from cg.models.analysis import AnalysisModel
Expand Down Expand Up @@ -499,10 +500,9 @@ def ensure_flow_cells_on_disk(self, case_id: str) -> None:
self.status_db.request_flow_cells_for_case(case_id)

def is_case_ready_for_analysis(self, case_id: str) -> bool:
if self._is_flow_cell_check_applicable(
case_id
) and not self.status_db.are_all_flow_cells_on_disk(case_id):
LOG.warning(f"Case {case_id} is not ready - all flow cells not present on disk.")
"""Returns True if no files need to be retrieved from an external location and if all Spring files are
decompressed."""
if self.does_any_file_need_to_be_retrieved(case_id):
return False
if self.prepare_fastq_api.is_spring_decompression_needed(
case_id
Expand All @@ -511,13 +511,54 @@ def is_case_ready_for_analysis(self, case_id: str) -> bool:
return False
return True

def does_any_file_need_to_be_retrieved(self, case_id: str) -> bool:
    """Return True if files for the case must be fetched from an external data
    location: flow cells not on disk, or SPRING files still archived."""
    flow_cells_missing: bool = self._is_flow_cell_check_applicable(
        case_id
    ) and not self.status_db.are_all_flow_cells_on_disk(case_id)
    if flow_cells_missing:
        LOG.warning(f"Case {case_id} is not ready - all flow cells not present on disk.")
        return True
    if not self.are_all_spring_files_present(case_id):
        LOG.warning(f"Case {case_id} is not ready - some files are archived.")
        return True
    return False

def prepare_fastq_files(self, case_id: str, dry_run: bool) -> None:
    """Retrieve or decompress Spring files if needed; if the case is not yet
    ready after submitting that work, abort the analysis start.

    Raises:
        AnalysisNotReadyError: if the FASTQ files are not all present on disk.
    """
    self.ensure_files_are_present(case_id)
    self.resolve_decompression(case_id=case_id, dry_run=dry_run)
    if not self.is_case_ready_for_analysis(case_id):
        raise AnalysisNotReadyError("FASTQ files are not present for the analysis to start")

def ensure_files_are_present(self, case_id: str):
"""Checks if any flow cells need to be retrieved and submits a job if that is the case.
Also checks if any spring files are archived and submits a job to retrieve any which are."""
self.ensure_flow_cells_on_disk(case_id)
if not self.are_all_spring_files_present(case_id):
LOG.warning(f"Files are archived for case {case_id}")
spring_archive_api = SpringArchiveAPI(
status_db=self.status_db,
housekeeper_api=self.housekeeper_api,
data_flow_config=self.config.data_flow,
)
spring_archive_api.retrieve_case(case_id)

def are_all_spring_files_present(self, case_id: str) -> bool:
    """Return True if every archived SPRING file of the case has been retrieved,
    i.e. nothing remains only in the customer's external data location."""
    case: Case = self.status_db.get_case_by_internal_id(case_id)
    for link in case.links:
        archived_files = self.housekeeper_api.get_archived_files_for_bundle(
            bundle_name=link.sample.internal_id, tags=[SequencingFileTag.SPRING]
        )
        for file in archived_files:
            if not file.archive.retrieved_at:
                return False
    return True

def get_archive_location_for_case(self, case_id: str) -> str:
    """Return the data archive location configured for the case's customer."""
    case = self.status_db.get_case_by_internal_id(case_id)
    return case.customer.data_archive_location

@staticmethod
def _write_managed_variants(out_dir: Path, content: list[str]) -> None:
Expand Down
2 changes: 2 additions & 0 deletions cg/store/api/add.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ def add_case(
cohorts: list[str] | None = None,
priority: Priority | None = Priority.standard,
synopsis: str | None = None,
customer_id: int | None = None,
) -> Case:
"""Build a new Case record."""

Expand All @@ -222,6 +223,7 @@ def add_case(
priority=priority,
synopsis=synopsis,
tickets=ticket,
customer_id=customer_id,
)

def relate_sample(
Expand Down
15 changes: 13 additions & 2 deletions tests/meta/archive/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from cg.apps.housekeeper.hk import HousekeeperAPI
from cg.constants import SequencingFileTag
from cg.constants.archiving import ArchiveLocations
from cg.constants.constants import FileFormat
from cg.constants.constants import DataDelivery, FileFormat, Pipeline
from cg.constants.subject import Sex
from cg.io.controller import WriteStream
from cg.meta.archive.archive import SpringArchiveAPI
Expand All @@ -22,7 +22,7 @@
from cg.meta.archive.models import FileAndSample
from cg.models.cg_config import CGConfig, DataFlowConfig
from cg.store import Store
from cg.store.models import Customer, Sample
from cg.store.models import Case, Customer, Sample
from tests.store_helpers import StoreHelpers


Expand Down Expand Up @@ -262,6 +262,7 @@ def archive_store(
new_samples[0].customer = customer_ddn
new_samples[1].customer = customer_ddn
new_samples[2].customer = customer_without_ddn

external_app = base_store.get_application_by_tag("WGXCUSC000").versions[0]
wgs_app = base_store.get_application_by_tag("WGSPCFC030").versions[0]
for sample in new_samples:
Expand All @@ -270,6 +271,16 @@ def archive_store(
base_store.session.add(customer_without_ddn)
base_store.session.add_all(new_samples)
base_store.session.commit()
case: Case = base_store.add_case(
data_analysis=Pipeline.MIP_DNA,
data_delivery=DataDelivery.NO_DELIVERY,
name="dummy_name",
ticket="123",
customer_id=customer_ddn.id,
)
base_store.relate_sample(case=case, sample=new_samples[0], status="unknown")
base_store.session.add(case)
base_store.session.commit()
return base_store


Expand Down
Loading

0 comments on commit aa65526

Please sign in to comment.