diff --git a/cg/meta/archive/archive.py b/cg/meta/archive/archive.py index 2f76068b75..4e95b39e63 100644 --- a/cg/meta/archive/archive.py +++ b/cg/meta/archive/archive.py @@ -9,7 +9,7 @@ from cg.constants import SequencingFileTag from cg.constants.archiving import ArchiveLocations from cg.exc import ArchiveJobFailedError -from cg.meta.archive.ddn_dataflow import DDNDataFlowClient +from cg.meta.archive.ddn.ddn_data_flow_client import DDNDataFlowClient from cg.meta.archive.models import ArchiveHandler, FileAndSample, SampleAndDestination from cg.models.cg_config import DataFlowConfig from cg.store import Store diff --git a/cg/meta/archive/ddn/__init__.py b/cg/meta/archive/ddn/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/cg/meta/archive/ddn/constants.py b/cg/meta/archive/ddn/constants.py new file mode 100644 index 0000000000..2267622286 --- /dev/null +++ b/cg/meta/archive/ddn/constants.py @@ -0,0 +1,51 @@ +from enum import StrEnum + +OSTYPE: str = "Unix/MacOS" +ROOT_TO_TRIM: str = "/home" + +DESTINATION_ATTRIBUTE: str = "destination" +SOURCE_ATTRIBUTE: str = "source" + + +class DataflowEndpoints(StrEnum): + """Enum containing all DDN dataflow endpoints used.""" + + ARCHIVE_FILES = "files/archive" + DELETE_FILE = "files/delete" + GET_AUTH_TOKEN = "auth/token" + REFRESH_AUTH_TOKEN = "auth/token/refresh" + RETRIEVE_FILES = "files/retrieve" + GET_JOB_STATUS = "activity/jobs/" + + +class JobStatus(StrEnum): + """Enum for the different job statuses which can be returned via Miria.""" + + CANCELED = "Canceled" + COMPLETED = "Completed" + DENIED = "Denied" + CREATION_IN_PROGRESS = "Creation in progress" + IN_QUEUE = "In Queue" + INVALID_LICENSE = "Invalid license" + ON_VALIDATION = "On validation" + REFUSED = "Refused" + RUNNING = "Running" + SUSPENDED = "Suspended" + TERMINATED_ON_ERROR = "Terminated on error" + TERMINATED_ON_WARNING = "Terminated on warning" + + +FAILED_JOB_STATUSES: list[str] = [ + JobStatus.CANCELED, + JobStatus.DENIED, + JobStatus.INVALID_LICENSE, + JobStatus.REFUSED, + JobStatus.TERMINATED_ON_ERROR, + JobStatus.TERMINATED_ON_WARNING, +] +ONGOING_JOB_STATUSES: list[str] = [ + JobStatus.CREATION_IN_PROGRESS, + JobStatus.IN_QUEUE, + JobStatus.ON_VALIDATION, + JobStatus.RUNNING, +] diff --git a/cg/meta/archive/ddn_dataflow.py b/cg/meta/archive/ddn/ddn_data_flow_client.py similarity index 50% rename from cg/meta/archive/ddn_dataflow.py rename to cg/meta/archive/ddn/ddn_data_flow_client.py index 756a806f5d..a36dbb8303 100644 --- a/cg/meta/archive/ddn_dataflow.py +++ b/cg/meta/archive/ddn/ddn_data_flow_client.py @@ -1,265 +1,37 @@ """Module for archiving and retrieving folders via DDN Dataflow.""" import logging from datetime import datetime -from enum import StrEnum from pathlib import Path from urllib.parse import urljoin from housekeeper.store.models import File -from pydantic import BaseModel, Field -from requests.models import Response +from requests import Response from cg.constants.constants import APIMethods from cg.exc import ArchiveJobFailedError, DdnDataflowAuthenticationError from cg.io.controller import APIRequest -from cg.meta.archive.models import ( - ArchiveHandler, - FileAndSample, - FileTransferData, - SampleAndDestination, +from cg.meta.archive.ddn.constants import ( + DESTINATION_ATTRIBUTE, + FAILED_JOB_STATUSES, + SOURCE_ATTRIBUTE, + DataflowEndpoints, + JobStatus, ) +from cg.meta.archive.ddn.models import ( + AuthPayload, + AuthToken, + DeleteFilePayload, + GetJobStatusPayload, + GetJobStatusResponse, + MiriaObject, + RefreshPayload, 
+ TransferPayload, +) +from cg.meta.archive.models import ArchiveHandler, FileAndSample, SampleAndDestination from cg.models.cg_config import DataFlowConfig -from cg.store.models import Sample LOG = logging.getLogger(__name__) -OSTYPE: str = "Unix/MacOS" -ROOT_TO_TRIM: str = "/home" - -DESTINATION_ATTRIBUTE: str = "destination" -SOURCE_ATTRIBUTE: str = "source" - - -def get_request_log(headers: dict, body: dict): - return "Sending request with headers: \n" + f"{headers} \n" + "and body: \n" + f"{body}" - - -class DataflowEndpoints(StrEnum): - """Enum containing all DDN dataflow endpoints used.""" - - ARCHIVE_FILES = "files/archive" - DELETE_FILE = "files/delete" - GET_AUTH_TOKEN = "auth/token" - REFRESH_AUTH_TOKEN = "auth/token/refresh" - RETRIEVE_FILES = "files/retrieve" - GET_JOB_STATUS = "activity/jobs/" - - -class JobStatus(StrEnum): - """Enum for the different job statuses which can be returned via Miria.""" - - CANCELED = "Canceled" - COMPLETED = "Completed" - DENIED = "Denied" - CREATION_IN_PROGRESS = "Creation in progress" - IN_QUEUE = "In Queue" - INVALID_LICENSE = "Invalid license" - ON_VALIDATION = "On validation" - REFUSED = "Refused" - RUNNING = "Running" - SUSPENDED = "Suspended" - TERMINATED_ON_ERROR = "Terminated on error" - TERMINATED_ON_WARNING = "Terminated on warning" - - -FAILED_JOB_STATUSES: list[str] = [ - JobStatus.CANCELED, - JobStatus.DENIED, - JobStatus.INVALID_LICENSE, - JobStatus.REFUSED, - JobStatus.TERMINATED_ON_ERROR, - JobStatus.TERMINATED_ON_WARNING, -] -ONGOING_JOB_STATUSES: list[str] = [ - JobStatus.CREATION_IN_PROGRESS, - JobStatus.IN_QUEUE, - JobStatus.ON_VALIDATION, - JobStatus.RUNNING, -] - - -class MiriaObject(FileTransferData): - """Model for representing a singular object transfer.""" - - _metadata = None - destination: str - source: str - - @classmethod - def create_from_file_and_sample( - cls, file: File, sample: Sample, is_archiving: bool = True - ) -> "MiriaObject": - """Instantiates the class from a File and Sample object.""" - if is_archiving: - return cls(destination=sample.internal_id, source=file.full_path) - return cls(destination=file.full_path, source=sample.internal_id) - - @classmethod - def create_from_sample_and_destination( - cls, sample_and_destination: SampleAndDestination - ) -> "MiriaObject": - """Instantiates the class from a SampleAndDestination object, - i.e. 
when we want to fetch a folder containing all spring files for said sample.""" - return cls( - destination=sample_and_destination.destination, - source=sample_and_destination.sample.internal_id, - ) - - def trim_path(self, attribute_to_trim: str): - """Trims the given attribute (source or destination) from its root directory.""" - setattr( - self, - attribute_to_trim, - f"/{Path(getattr(self, attribute_to_trim)).relative_to(ROOT_TO_TRIM)}", - ) - - def add_repositories(self, source_prefix: str, destination_prefix: str): - """Prepends the given repositories to the source and destination paths.""" - self.source: str = source_prefix + self.source - self.destination: str = destination_prefix + self.destination - - -class TransferPayload(BaseModel): - """Model for representing a Dataflow transfer task.""" - - files_to_transfer: list[MiriaObject] - osType: str = OSTYPE - createFolder: bool = True - settings: list[dict] = [] - - def trim_paths(self, attribute_to_trim: str): - """Trims the source path from its root directory for all objects in the transfer.""" - for miria_file in self.files_to_transfer: - miria_file.trim_path(attribute_to_trim=attribute_to_trim) - - def add_repositories(self, source_prefix: str, destination_prefix: str): - """Prepends the given repositories to the source and destination paths all objects in the - transfer.""" - for miria_file in self.files_to_transfer: - miria_file.add_repositories( - source_prefix=source_prefix, destination_prefix=destination_prefix - ) - - def model_dump(self, **kwargs) -> dict: - """Creates a correctly structured dict to be used as the request payload.""" - payload: dict = super().model_dump(exclude={"files_to_transfer"}) - payload["pathInfo"] = [miria_file.model_dump() for miria_file in self.files_to_transfer] - payload["metadataList"] = [] - return payload - - def post_request(self, url: str, headers: dict) -> "TransferJob": - """Sends a request to the given url with, the given headers, and its own content as payload. - - Arguments: - url: URL to which the POST goes to. - headers: Headers which are set in the request - Raises: - HTTPError if the response status is not successful. - ValidationError if the response does not conform to the expected response structure. - Returns: - The job ID of the launched transfer task. 
- """ - - LOG.info(get_request_log(headers=headers, body=self.model_dump())) - - response: Response = APIRequest.api_request_from_content( - api_method=APIMethods.POST, - url=url, - headers=headers, - json=self.model_dump(), - verify=False, - ) - response.raise_for_status() - return TransferJob.model_validate(response.json()) - - -class AuthPayload(BaseModel): - """Model representing the payload for an Authentication request.""" - - dbName: str - name: str - password: str - superUser: bool = False - - -class RefreshPayload(BaseModel): - """Model representing the payload for Auth-token refresh request.""" - - refresh: str - - -class AuthToken(BaseModel): - """Model representing the response fields from an access request to the Dataflow API.""" - - access: str - expire: int - refresh: str | None = None - - -class TransferJob(BaseModel): - """Model representing th response fields of an archive or retrieve reqeust to the Dataflow - API.""" - - job_id: int = Field(alias="jobId") - - -class GetJobStatusResponse(BaseModel): - """Model representing the response fields from a get_job_status post.""" - - job_id: int = Field(alias="id") - status: JobStatus - - -class GetJobStatusPayload(BaseModel): - """Model representing the payload for a get_job_status request.""" - - id: int - - def get_job_status(self, url: str, headers: dict) -> GetJobStatusResponse: - """Sends a get request to the given URL with the given headers. - Returns the parsed status response of the task specified in the URL. - Raises: - HTTPError if the response code is not ok. - """ - - LOG.info(get_request_log(headers=headers, body=self.model_dump())) - - response: Response = APIRequest.api_request_from_content( - api_method=APIMethods.GET, - url=url, - headers=headers, - json=self.model_dump(), - verify=False, - ) - response.raise_for_status() - return GetJobStatusResponse.model_validate(response.json()) - - -class DeleteFileResponse(BaseModel): - message: str - - -class DeleteFilePayload(BaseModel): - global_path: str - - def delete_file(self, url: str, headers: dict) -> DeleteFileResponse: - """Posts to the given URL with the given headers to delete the file or directory at the specified global path. - Returns the parsed response. - Raises: - HTTPError if the response code is not ok. 
- """ - LOG.info(get_request_log(headers=headers, body=self.model_dump())) - - response: Response = APIRequest.api_request_from_content( - api_method=APIMethods.POST, - url=url, - headers=headers, - json=self.model_dump(), - verify=False, - ) - response.raise_for_status() - return DeleteFileResponse.model_validate(response.json()) - class DDNDataFlowClient(ArchiveHandler): """Class for archiving and retrieving folders via DDN Dataflow.""" diff --git a/cg/meta/archive/ddn/models.py b/cg/meta/archive/ddn/models.py new file mode 100644 index 0000000000..d8c6e0b0b8 --- /dev/null +++ b/cg/meta/archive/ddn/models.py @@ -0,0 +1,201 @@ +import logging +from pathlib import Path + +from housekeeper.store.models import File +from pydantic import BaseModel, Field +from requests import Response + +from cg.constants.constants import APIMethods +from cg.io.controller import APIRequest +from cg.meta.archive.ddn.constants import OSTYPE, ROOT_TO_TRIM, JobStatus +from cg.meta.archive.models import FileTransferData, SampleAndDestination +from cg.store.models import Sample + +LOG = logging.getLogger(__name__) + + +def get_request_log(headers: dict, body: dict): + return "Sending request with headers: \n" + f"{headers} \n" + "and body: \n" + f"{body}" + + +class MiriaObject(FileTransferData): + """Model for representing a singular object transfer.""" + + _metadata = None + destination: str + source: str + + @classmethod + def create_from_file_and_sample( + cls, file: File, sample: Sample, is_archiving: bool = True + ) -> "MiriaObject": + """Instantiates the class from a File and Sample object.""" + if is_archiving: + return cls(destination=sample.internal_id, source=file.full_path) + return cls(destination=file.full_path, source=sample.internal_id) + + @classmethod + def create_from_sample_and_destination( + cls, sample_and_destination: SampleAndDestination + ) -> "MiriaObject": + """Instantiates the class from a SampleAndDestination object, + i.e. 
when we want to fetch a folder containing all spring files for the given sample."""
+        return cls(
+            destination=sample_and_destination.destination,
+            source=sample_and_destination.sample.internal_id,
+        )
+
+    def trim_path(self, attribute_to_trim: str):
+        """Trims the given attribute (source or destination) from its root directory."""
+        setattr(
+            self,
+            attribute_to_trim,
+            f"/{Path(getattr(self, attribute_to_trim)).relative_to(ROOT_TO_TRIM)}",
+        )
+
+    def add_repositories(self, source_prefix: str, destination_prefix: str):
+        """Prepends the given repositories to the source and destination paths."""
+        self.source: str = source_prefix + self.source
+        self.destination: str = destination_prefix + self.destination
+
+
+class TransferPayload(BaseModel):
+    """Model for representing a Dataflow transfer task."""
+
+    files_to_transfer: list[MiriaObject]
+    osType: str = OSTYPE
+    createFolder: bool = True
+    settings: list[dict] = []
+
+    def trim_paths(self, attribute_to_trim: str):
+        """Trims the source path from its root directory for all objects in the transfer."""
+        for miria_file in self.files_to_transfer:
+            miria_file.trim_path(attribute_to_trim=attribute_to_trim)
+
+    def add_repositories(self, source_prefix: str, destination_prefix: str):
+        """Prepends the given repositories to the source and destination paths of all objects in
+        the transfer."""
+        for miria_file in self.files_to_transfer:
+            miria_file.add_repositories(
+                source_prefix=source_prefix, destination_prefix=destination_prefix
+            )
+
+    def model_dump(self, **kwargs) -> dict:
+        """Creates a correctly structured dict to be used as the request payload."""
+        payload: dict = super().model_dump(exclude={"files_to_transfer"})
+        payload["pathInfo"] = [miria_file.model_dump() for miria_file in self.files_to_transfer]
+        payload["metadataList"] = []
+        return payload
+
+    def post_request(self, url: str, headers: dict) -> "TransferResponse":
+        """Sends a POST request to the given URL with the given headers and its own content as payload.
+
+        Arguments:
+            url: URL to which the POST request is sent.
+            headers: Headers to set in the request.
+        Raises:
+            HTTPError if the response status is not successful.
+            ValidationError if the response does not conform to the expected response structure.
+        Returns:
+            The job ID of the launched transfer task.
+        """
+
+        LOG.info(get_request_log(headers=headers, body=self.model_dump()))
+
+        response: Response = APIRequest.api_request_from_content(
+            api_method=APIMethods.POST,
+            url=url,
+            headers=headers,
+            json=self.model_dump(),
+            verify=False,
+        )
+        response.raise_for_status()
+        return TransferResponse.model_validate(response.json())
+
+
+class TransferResponse(BaseModel):
+    """Model representing the response fields of an archive or retrieve request to the Dataflow
+    API."""
+
+    job_id: int = Field(alias="jobId")
+
+
+class AuthPayload(BaseModel):
+    """Model representing the payload for an authentication request."""
+
+    dbName: str
+    name: str
+    password: str
+    superUser: bool = False
+
+
+class RefreshPayload(BaseModel):
+    """Model representing the payload for an auth-token refresh request."""
+
+    refresh: str
+
+
+class AuthToken(BaseModel):
+    """Model representing the response fields from an access request to the Dataflow API."""
+
+    access: str
+    expire: int
+    refresh: str | None = None
+
+
+class GetJobStatusResponse(BaseModel):
+    """Model representing the response fields from a get_job_status request."""
+
+    job_id: int = Field(alias="id")
+    status: JobStatus
+
+
+class GetJobStatusPayload(BaseModel):
+    """Model representing the payload for a get_job_status request."""
+
+    id: int
+
+    def get_job_status(self, url: str, headers: dict) -> GetJobStatusResponse:
+        """Sends a GET request to the given URL with the given headers.
+        Returns the parsed status response of the task specified in the URL.
+        Raises:
+            HTTPError if the response code is not ok.
+        """
+
+        LOG.info(get_request_log(headers=headers, body=self.model_dump()))
+
+        response: Response = APIRequest.api_request_from_content(
+            api_method=APIMethods.GET,
+            url=url,
+            headers=headers,
+            json=self.model_dump(),
+            verify=False,
+        )
+        response.raise_for_status()
+        return GetJobStatusResponse.model_validate(response.json())
+
+
+class DeleteFileResponse(BaseModel):
+    message: str
+
+
+class DeleteFilePayload(BaseModel):
+    global_path: str
+
+    def delete_file(self, url: str, headers: dict) -> DeleteFileResponse:
+        """Posts to the given URL with the given headers to delete the file or directory at the specified global path.
+        Returns the parsed response.
+        Raises:
+            HTTPError if the response code is not ok.
+ """ + LOG.info(get_request_log(headers=headers, body=self.model_dump())) + + response: Response = APIRequest.api_request_from_content( + api_method=APIMethods.POST, + url=url, + headers=headers, + json=self.model_dump(), + verify=False, + ) + response.raise_for_status() + return DeleteFileResponse.model_validate(response.json()) diff --git a/tests/meta/archive/conftest.py b/tests/meta/archive/conftest.py index dc260eb6ee..993d90eeed 100644 --- a/tests/meta/archive/conftest.py +++ b/tests/meta/archive/conftest.py @@ -16,13 +16,9 @@ from cg.constants.subject import Gender from cg.io.controller import WriteStream from cg.meta.archive.archive import SpringArchiveAPI -from cg.meta.archive.ddn_dataflow import ( - ROOT_TO_TRIM, - AuthToken, - DDNDataFlowClient, - MiriaObject, - TransferPayload, -) +from cg.meta.archive.ddn.constants import ROOT_TO_TRIM +from cg.meta.archive.ddn.ddn_data_flow_client import DDNDataFlowClient +from cg.meta.archive.ddn.models import AuthToken, MiriaObject, TransferPayload from cg.meta.archive.models import FileAndSample from cg.models.cg_config import CGConfig, DataFlowConfig from cg.store import Store @@ -143,7 +139,7 @@ def ddn_dataflow_client(ddn_dataflow_config: DataFlowConfig) -> DDNDataFlowClien }, ).encode() with mock.patch( - "cg.meta.archive.ddn_dataflow.APIRequest.api_request_from_content", + "cg.meta.archive.ddn.ddn_data_flow_client.APIRequest.api_request_from_content", return_value=mock_ddn_auth_success_response, ): return DDNDataFlowClient(ddn_dataflow_config) diff --git a/tests/meta/archive/test_archive_api.py b/tests/meta/archive/test_archive_api.py index 8defc705ba..5e68495bef 100644 --- a/tests/meta/archive/test_archive_api.py +++ b/tests/meta/archive/test_archive_api.py @@ -9,14 +9,16 @@ from cg.constants.housekeeper_tags import SequencingFileTag from cg.io.controller import APIRequest from cg.meta.archive.archive import ARCHIVE_HANDLERS, FileAndSample, SpringArchiveAPI -from cg.meta.archive.ddn_dataflow import ( +from cg.meta.archive.ddn.constants import ( FAILED_JOB_STATUSES, ONGOING_JOB_STATUSES, + JobStatus, +) +from cg.meta.archive.ddn.ddn_data_flow_client import DDNDataFlowClient +from cg.meta.archive.ddn.models import ( AuthToken, - DDNDataFlowClient, GetJobStatusPayload, GetJobStatusResponse, - JobStatus, MiriaObject, ) from cg.meta.archive.models import ArchiveHandler, FileTransferData diff --git a/tests/meta/archive/test_archive_cli.py b/tests/meta/archive/test_archive_cli.py index 8f78d4336a..497775045a 100644 --- a/tests/meta/archive/test_archive_cli.py +++ b/tests/meta/archive/test_archive_cli.py @@ -10,16 +10,18 @@ from cg.constants import EXIT_SUCCESS, SequencingFileTag from cg.constants.archiving import ArchiveLocations from cg.io.controller import APIRequest -from cg.meta.archive.ddn_dataflow import ( +from cg.meta.archive.ddn.constants import ( FAILED_JOB_STATUSES, ONGOING_JOB_STATUSES, + JobStatus, +) +from cg.meta.archive.ddn.ddn_data_flow_client import DDNDataFlowClient +from cg.meta.archive.ddn.models import ( AuthToken, - DDNDataFlowClient, GetJobStatusPayload, GetJobStatusResponse, - JobStatus, - TransferJob, TransferPayload, + TransferResponse, ) from cg.models.cg_config import CGConfig @@ -74,7 +76,7 @@ def test_archive_spring_files_success( "api_request_from_content", return_value=ok_miria_response, ), mock.patch.object( - TransferPayload, "post_request", return_value=TransferJob(jobId=archival_job_id) + TransferPayload, "post_request", return_value=TransferResponse(jobId=archival_job_id) ): result = cli_runner.invoke( 
archive_spring_files, diff --git a/tests/meta/archive/test_archiving.py b/tests/meta/archive/test_archiving.py index 4a1b0ddb94..df82702265 100644 --- a/tests/meta/archive/test_archiving.py +++ b/tests/meta/archive/test_archiving.py @@ -9,22 +9,21 @@ from cg.constants.constants import APIMethods, FileFormat from cg.exc import DdnDataflowAuthenticationError from cg.io.controller import APIRequest, WriteStream -from cg.meta.archive.ddn_dataflow import ( +from cg.meta.archive.ddn.constants import ( DESTINATION_ATTRIBUTE, OSTYPE, ROOT_TO_TRIM, SOURCE_ATTRIBUTE, DataflowEndpoints, - DDNDataFlowClient, - MiriaObject, - TransferPayload, ) +from cg.meta.archive.ddn.ddn_data_flow_client import DDNDataFlowClient +from cg.meta.archive.ddn.models import MiriaObject, TransferPayload from cg.meta.archive.models import FileAndSample, SampleAndDestination from cg.models.cg_config import DataFlowConfig from cg.store import Store from cg.store.models import Sample -FUNCTION_TO_MOCK = "cg.meta.archive.ddn_dataflow.APIRequest.api_request_from_content" +FUNCTION_TO_MOCK = "cg.meta.archive.ddn.ddn_data_flow_client.APIRequest.api_request_from_content" def test_correct_source_root( @@ -168,7 +167,7 @@ def test_ddn_dataflow_client_initialization_invalid_credentials( # WHEN initializing the DDNDataFlowClient class with the invalid credentials with mock.patch( - "cg.meta.archive.ddn_dataflow.APIRequest.api_request_from_content", + "cg.meta.archive.ddn.ddn_data_flow_client.APIRequest.api_request_from_content", return_value=unauthorized_response, ): # THEN an exception should be raised
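
Note for reviewers: a minimal sketch of driving an archival through the relocated models, to sanity-check the new `cg.meta.archive.ddn` import paths. The base URL, auth header, file path, sample ID, and repository prefixes below are hypothetical placeholders; only the module paths, models, constants, and method signatures come from this diff, and production code goes through `DDNDataFlowClient` rather than calling `post_request` directly.

```python
"""Illustrative archive submission via the relocated models (not the client's exact flow).

Assumptions: BASE_URL, HEADERS, the spring file path, the sample ID, and the
repository prefixes are placeholders, not values taken from this diff.
"""
from urllib.parse import urljoin

from cg.meta.archive.ddn.constants import SOURCE_ATTRIBUTE, DataflowEndpoints
from cg.meta.archive.ddn.models import MiriaObject, TransferPayload, TransferResponse

BASE_URL: str = "https://miria.example.com/api/"  # hypothetical instance
HEADERS: dict = {"Authorization": "Bearer <access-token>"}  # hypothetical auth header

# One file to archive: a local spring file going into a per-sample folder.
miria_object = MiriaObject(source="/home/runs/sample.spring", destination="ACC123456")
payload = TransferPayload(files_to_transfer=[miria_object])

# Trim the local root ("/home") from the source path, then prepend repositories
# (prefix strings here are hypothetical).
payload.trim_paths(attribute_to_trim=SOURCE_ATTRIBUTE)
payload.add_repositories(source_prefix="archive@repo:", destination_prefix="cluster@repo:")

# POST to the archive endpoint; the parsed response carries the launched job ID.
job: TransferResponse = payload.post_request(
    url=urljoin(BASE_URL, DataflowEndpoints.ARCHIVE_FILES),
    headers=HEADERS,
)
print(f"Launched archival job {job.job_id}")
```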
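And a companion sketch for polling a launched job with the relocated status models and constants. The same placeholder caveats apply; in particular, appending the job ID to the `activity/jobs/` endpoint is an assumption based on the "task specified in the URL" wording in `get_job_status`.

```python
"""Illustrative job polling via the relocated constants and models.

Assumptions: BASE_URL and HEADERS are placeholders; the URL layout (job ID
appended to the endpoint) is inferred, not confirmed by this diff.
"""
from urllib.parse import urljoin

from cg.exc import ArchiveJobFailedError
from cg.meta.archive.ddn.constants import (
    FAILED_JOB_STATUSES,
    ONGOING_JOB_STATUSES,
    DataflowEndpoints,
)
from cg.meta.archive.ddn.models import GetJobStatusPayload, GetJobStatusResponse

BASE_URL: str = "https://miria.example.com/api/"  # hypothetical instance
HEADERS: dict = {"Authorization": "Bearer <access-token>"}  # hypothetical auth header


def job_is_done(job_id: int) -> bool:
    """Poll one Dataflow job; True when finished, False while still ongoing."""
    payload = GetJobStatusPayload(id=job_id)
    response: GetJobStatusResponse = payload.get_job_status(
        url=urljoin(BASE_URL, f"{DataflowEndpoints.GET_JOB_STATUS}{job_id}"),
        headers=HEADERS,
    )
    # Statuses come from the new constants module: fail fast on terminal errors,
    # report completion only once the job has left every ongoing state.
    if response.status in FAILED_JOB_STATUSES:
        raise ArchiveJobFailedError(f"Job {job_id} ended with status {response.status}")
    return response.status not in ONGOING_JOB_STATUSES
```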