(Archiving) Rework to accommodate metadata (#2799) (patch)
### Added

- Metadata in the archival requests towards DDN/Miria.

### Changed

- The archival flow sends one request per file.
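As a rough illustration of the reworked flow, here is an editor's sketch (not the committed code) of what "one request per file" amounts to, using the interfaces visible in the diff below:

```python
# Editor's sketch of the reworked flow, not the committed code - the real
# implementation is SpringArchiveAPI.archive_spring_files_and_add_archives_to_housekeeper
# in cg/meta/archive/archive.py (diff below); interfaces are assumed from that diff.
def archive_one_request_per_file(spring_archive_api, archive_locations, file_limit):
    for archive_location in archive_locations:
        files = spring_archive_api.housekeeper_api.get_non_archived_spring_files(
            tags=[archive_location], limit=file_limit
        )
        for file_and_sample in spring_archive_api.add_samples_to_files(files):
            # One archival request per file; the DDN handler attaches sample
            # metadata (see get_metadata in cg/meta/archive/ddn/utils.py).
            job_id = spring_archive_api.archive_file_to_location(
                file_and_sample=file_and_sample, archive_location=archive_location
            )
            spring_archive_api.housekeeper_api.add_archives(
                files=[file_and_sample.file], archive_task_id=job_id
            )
```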
islean authored Jan 8, 2024
1 parent da5c063 commit cb91eb4
Showing 8 changed files with 103 additions and 44 deletions.
52 changes: 32 additions & 20 deletions cg/meta/archive/archive.py
@@ -40,11 +40,11 @@ def __init__(
self.status_db: Store = status_db
self.data_flow_config: DataFlowConfig = data_flow_config

def archive_files_to_location(
self, files_and_samples: list[FileAndSample], archive_location: ArchiveLocations
def archive_file_to_location(
self, file_and_sample: FileAndSample, archive_location: ArchiveLocations
) -> int:
archive_handler: ArchiveHandler = ARCHIVE_HANDLERS[archive_location](self.data_flow_config)
return archive_handler.archive_files(files_and_samples=files_and_samples)
return archive_handler.archive_file(file_and_sample=file_and_sample)

def archive_spring_files_and_add_archives_to_housekeeper(
self, spring_file_count_limit: int | None
@@ -55,25 +55,37 @@ def archive_spring_files_and_add_archives_to_housekeeper(
LOG.warning("Please do not provide a non-positive integer as limit - exiting.")
return
for archive_location in ArchiveLocations:
files_to_archive: list[File] = self.housekeeper_api.get_non_archived_spring_files(
tags=[archive_location],
limit=spring_file_count_limit,
self.archive_files_to_location(
archive_location=archive_location, file_limit=spring_file_count_limit
)
if files_to_archive:
files_and_samples_for_location = self.add_samples_to_files(files_to_archive)
job_id = self.archive_files_to_location(
files_and_samples=files_and_samples_for_location,
archive_location=archive_location,
)
LOG.info(f"Files submitted to {archive_location} with archival task id {job_id}.")
self.housekeeper_api.add_archives(
files=[
file_and_sample.file for file_and_sample in files_and_samples_for_location
],
archive_task_id=job_id,

def archive_files_to_location(self, archive_location: str, file_limit: int | None) -> None:
"""Archives up to spring file count limit number of files to the provided archive location."""
files_to_archive: list[File] = self.housekeeper_api.get_non_archived_spring_files(
tags=[archive_location],
limit=file_limit,
)
if files_to_archive:
files_and_samples_for_location = self.add_samples_to_files(files_to_archive)
for file_and_sample in files_and_samples_for_location:
self.archive_file(
file_and_sample=file_and_sample, archive_location=archive_location
)
else:
LOG.info(f"No files to archive for location {archive_location}.")

else:
LOG.info(f"No files to archive for location {archive_location}.")

def archive_file(
self, file_and_sample: FileAndSample, archive_location: ArchiveLocations
) -> None:
job_id: int = self.archive_file_to_location(
file_and_sample=file_and_sample, archive_location=archive_location
)
LOG.info(f"File submitted to {archive_location} with archival task id {job_id}.")
self.housekeeper_api.add_archives(
files=[file_and_sample.file],
archive_task_id=job_id,
)

def retrieve_case(self, case_id: str) -> None:
"""Submits jobs to retrieve any archived files belonging to the given case, and updates the Archive entries
12 changes: 12 additions & 0 deletions cg/meta/archive/ddn/constants.py
@@ -49,3 +49,15 @@ class JobStatus(StrEnum):
JobStatus.ON_VALIDATION,
JobStatus.RUNNING,
]

METADATA_LIST = "metadataList"


class MetadataFields(StrEnum):
CUSTOMER_NAME: str = "customer_name"
PREP_CATEGORY: str = "prep_category"
SAMPLE_NAME: str = "sample_name"
SEQUENCED_AT: str = "sequenced_at"
TICKET_NUMBER: str = "ticket_number"
NAME = "metadataName"
VALUE = "value"
29 changes: 15 additions & 14 deletions cg/meta/archive/ddn/ddn_data_flow_client.py
@@ -27,6 +27,7 @@
RefreshPayload,
TransferPayload,
)
from cg.meta.archive.ddn.utils import get_metadata
from cg.meta.archive.models import ArchiveHandler, FileAndSample
from cg.models.cg_config import DataFlowConfig

@@ -100,14 +101,15 @@ def auth_header(self) -> dict[str, str]:
self._refresh_auth_token()
return {"Authorization": f"Bearer {self.auth_token}"}

def archive_files(self, files_and_samples: list[FileAndSample]) -> int:
def archive_file(self, file_and_sample: FileAndSample) -> int:
"""Archives all files provided, to their corresponding destination, as given by sources
and destination in TransferData. Returns the job ID of the archiving task."""
miria_file_data: list[MiriaObject] = self.convert_into_transfer_data(
files_and_samples, is_archiving=True
[file_and_sample], is_archiving=True
)
metadata: list[dict] = get_metadata(file_and_sample.sample)
archival_request: TransferPayload = self.create_transfer_request(
miria_file_data=miria_file_data, is_archiving_request=True
miria_file_data=miria_file_data, is_archiving_request=True, metadata=metadata
)
return archival_request.post_request(
headers=dict(self.headers, **self.auth_header),
@@ -117,15 +119,9 @@ def archive_files(self, files_and_samples: list[FileAndSample]) -> int:
def retrieve_files(self, files_and_samples: list[FileAndSample]) -> int:
"""Retrieves the provided files and stores them in the corresponding sample bundle in
Housekeeper."""
miria_file_data: list[MiriaObject] = []
for file_and_sample in files_and_samples:
LOG.info(
f"Will retrieve file {file_and_sample.file.path} for sample {file_and_sample.sample.internal_id} via Miria."
)
miria_object: MiriaObject = MiriaObject.create_from_file_and_sample(
file=file_and_sample.file, sample=file_and_sample.sample, is_archiving=False
)
miria_file_data.append(miria_object)
miria_file_data: list[MiriaObject] = self.convert_into_transfer_data(
files_and_samples=files_and_samples, is_archiving=False
)
retrieval_request: TransferPayload = self.create_transfer_request(
miria_file_data=miria_file_data, is_archiving_request=False
)
@@ -135,7 +131,10 @@ def create_transfer_request(
).job_id

def create_transfer_request(
self, miria_file_data: list[MiriaObject], is_archiving_request: bool
self,
miria_file_data: list[MiriaObject],
is_archiving_request: bool,
metadata: list[dict] = [],
) -> TransferPayload:
"""Performs the necessary curation of paths for the request to be valid, depending on if
it is an archiving or a retrieve request.
@@ -151,7 +150,9 @@ def create_transfer_request(
)

transfer_request = TransferPayload(
files_to_transfer=miria_file_data, createFolder=is_archiving_request
files_to_transfer=miria_file_data,
createFolder=is_archiving_request,
metadataList=metadata,
)
transfer_request.trim_paths(attribute_to_trim=attribute)
transfer_request.add_repositories(
3 changes: 1 addition & 2 deletions cg/meta/archive/ddn/models.py
@@ -21,7 +21,6 @@ def get_request_log(headers: dict, body: dict):
class MiriaObject(FileTransferData):
"""Model for representing a singular object transfer."""

_metadata = None
destination: str
source: str

@@ -58,6 +57,7 @@ class TransferPayload(BaseModel):
osType: str = OSTYPE
createFolder: bool = True
settings: list[dict] = []
metadataList: list[dict] = []

def trim_paths(self, attribute_to_trim: str):
"""Trims the source path from its root directory for all objects in the transfer."""
@@ -76,7 +76,6 @@ def model_dump(self, **kwargs) -> dict:
"""Creates a correctly structured dict to be used as the request payload."""
payload: dict = super().model_dump(exclude={"files_to_transfer"})
payload["pathInfo"] = [miria_file.model_dump() for miria_file in self.files_to_transfer]
payload["metadataList"] = []
return payload

def post_request(self, url: str, headers: dict) -> "TransferResponse":
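To show where the new `metadataList` field ends up, here is a sketch of the payload shape that `model_dump` above produces for a single-file archival request. The values are editor's placeholders, not repository data; only the keys follow the model.

```python
# Placeholder values only - the keys mirror TransferPayload / model_dump() above;
# real paths, repositories and the OSTYPE constant come from the Data Flow config.
example_payload = {
    "pathInfo": [
        {"source": "<source path>", "destination": "<destination path>"},
    ],
    "osType": "<OSTYPE constant>",
    "createFolder": True,  # True for archiving requests, per create_transfer_request
    "settings": [],
    "metadataList": [
        {"metadataName": "customer_name", "value": "<customer name>"},
        # ...remaining name/value pairs produced by get_metadata()
    ],
}
```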
29 changes: 29 additions & 0 deletions cg/meta/archive/ddn/utils.py
@@ -0,0 +1,29 @@
from cg.meta.archive.ddn.constants import MetadataFields
from cg.store.models import Sample


def get_metadata(sample: Sample) -> list[dict]:
"""Returns metadata generated from a sample."""
metadata: list[dict] = [
{
MetadataFields.NAME.value: MetadataFields.CUSTOMER_NAME.value,
MetadataFields.VALUE.value: sample.customer.name,
},
{
MetadataFields.NAME.value: MetadataFields.PREP_CATEGORY.value,
MetadataFields.VALUE.value: sample.prep_category,
},
{
MetadataFields.NAME.value: MetadataFields.SAMPLE_NAME.value,
MetadataFields.VALUE.value: sample.name,
},
{
MetadataFields.NAME.value: MetadataFields.SEQUENCED_AT.value,
MetadataFields.VALUE.value: sample.last_sequenced_at,
},
{
MetadataFields.NAME.value: MetadataFields.TICKET_NUMBER.value,
MetadataFields.VALUE.value: sample.original_ticket,
},
]
return metadata
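A hypothetical usage example (the values are invented; only the structure and the `metadataName`/`value` keys follow the helper above and the `MetadataFields` constants):

```python
from datetime import datetime

# Hypothetical return value of get_metadata(sample) for an example sample.
example_metadata = [
    {"metadataName": "customer_name", "value": "cust001"},
    {"metadataName": "prep_category", "value": "wgs"},
    {"metadataName": "sample_name", "value": "sample-1"},
    {"metadataName": "sequenced_at", "value": datetime(2024, 1, 8)},
    {"metadataName": "ticket_number", "value": "123456"},
]
```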
2 changes: 1 addition & 1 deletion cg/meta/archive/models.py
@@ -35,7 +35,7 @@ def __init__(self, config: DataFlowConfig):
pass

@abstractmethod
def archive_files(self, files_and_samples: list[FileAndSample]):
def archive_file(self, file_and_sample: FileAndSample):
"""Archives all folders provided, to their corresponding destination,
as given by sources and destination parameter."""
pass
13 changes: 9 additions & 4 deletions tests/meta/archive/test_archive_api.py
@@ -12,6 +12,7 @@
from cg.meta.archive.archive import ARCHIVE_HANDLERS, FileAndSample, SpringArchiveAPI
from cg.meta.archive.ddn.constants import (
FAILED_JOB_STATUSES,
METADATA_LIST,
ONGOING_JOB_STATUSES,
JobStatus,
)
@@ -22,6 +23,7 @@
GetJobStatusResponse,
MiriaObject,
)
from cg.meta.archive.ddn.utils import get_metadata
from cg.meta.archive.models import ArchiveHandler, FileTransferData
from cg.models.cg_config import DataFlowConfig
from cg.store.models import Sample
@@ -141,16 +143,16 @@ def test_call_corresponding_archiving_method(spring_archive_api: SpringArchiveAP
return_value=123,
), mock.patch.object(
DDNDataFlowClient,
"archive_files",
"archive_file",
return_value=123,
) as mock_request_submitter:
# WHEN calling the corresponding archive method
spring_archive_api.archive_files_to_location(
files_and_samples=[file_and_sample], archive_location=ArchiveLocations.KAROLINSKA_BUCKET
spring_archive_api.archive_file_to_location(
file_and_sample=file_and_sample, archive_location=ArchiveLocations.KAROLINSKA_BUCKET
)

# THEN the correct archive function should have been called once
mock_request_submitter.assert_called_once_with(files_and_samples=[file_and_sample])
mock_request_submitter.assert_called_once_with(file_and_sample=file_and_sample)


@pytest.mark.parametrize("limit", [None, -1, 0, 1])
Expand Down Expand Up @@ -184,6 +186,9 @@ def test_archive_all_non_archived_spring_files(

# THEN the DDN archiving function should have been called with the correct destination and source if limit > 0
if limit not in [0, -1]:
sample: Sample = spring_archive_api.status_db.get_sample_by_internal_id(sample_id)
metadata: list[dict] = get_metadata(sample)
archive_request_json[METADATA_LIST] = metadata
mock_request_submitter.assert_called_with(
api_method=APIMethods.POST,
url="some/api/files/archive",
7 changes: 4 additions & 3 deletions tests/meta/archive/test_archiving.py
@@ -18,6 +18,7 @@
)
from cg.meta.archive.ddn.ddn_data_flow_client import DDNDataFlowClient
from cg.meta.archive.ddn.models import MiriaObject, TransferPayload
from cg.meta.archive.ddn.utils import get_metadata
from cg.meta.archive.models import FileAndSample
from cg.models.cg_config import DataFlowConfig
from cg.store import Store
@@ -264,7 +265,7 @@ def test__refresh_auth_token(ddn_dataflow_client: DDNDataFlowClient, ok_response
assert ddn_dataflow_client.token_expiration.second == new_expiration.second


def test_archive_folders(
def test_archive_file(
ddn_dataflow_client: DDNDataFlowClient,
remote_storage_repository: str,
local_storage_repository: str,
@@ -281,7 +282,7 @@ def test_archive_folders(
"api_request_from_content",
return_value=ok_miria_response,
) as mock_request_submitter:
job_id: int = ddn_dataflow_client.archive_files([file_and_sample])
job_id: int = ddn_dataflow_client.archive_file(file_and_sample)

# THEN an integer should be returned
assert isinstance(job_id, int)
@@ -300,7 +301,7 @@ def test_archive_folders(
],
"osType": OSTYPE,
"createFolder": True,
"metadataList": [],
"metadataList": get_metadata(file_and_sample.sample),
"settings": [],
},
verify=False,