Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(Archiving) Refactor DDN flow #2766

Merged
merged 5 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cg/meta/archive/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from cg.constants import SequencingFileTag
from cg.constants.archiving import ArchiveLocations
from cg.exc import ArchiveJobFailedError
from cg.meta.archive.ddn_dataflow import DDNDataFlowClient
from cg.meta.archive.ddn.ddn_data_flow_client import DDNDataFlowClient
from cg.meta.archive.models import ArchiveHandler, FileAndSample, SampleAndDestination
from cg.models.cg_config import DataFlowConfig
from cg.store import Store
Expand Down
Empty file.
51 changes: 51 additions & 0 deletions cg/meta/archive/ddn/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from enum import StrEnum

OSTYPE: str = "Unix/MacOS"
ROOT_TO_TRIM: str = "/home"

DESTINATION_ATTRIBUTE: str = "destination"
SOURCE_ATTRIBUTE: str = "source"


class DataflowEndpoints(StrEnum):
"""Enum containing all DDN dataflow endpoints used."""

ARCHIVE_FILES = "files/archive"
DELETE_FILE = "files/delete"
GET_AUTH_TOKEN = "auth/token"
REFRESH_AUTH_TOKEN = "auth/token/refresh"
RETRIEVE_FILES = "files/retrieve"
GET_JOB_STATUS = "activity/jobs/"


class JobStatus(StrEnum):
"""Enum for the different job statuses which can be returned via Miria."""

CANCELED = "Canceled"
COMPLETED = "Completed"
DENIED = "Denied"
CREATION_IN_PROGRESS = "Creation in progress"
IN_QUEUE = "In Queue"
INVALID_LICENSE = "Invalid license"
ON_VALIDATION = "On validation"
REFUSED = "Refused"
RUNNING = "Running"
SUSPENDED = "Suspended"
TERMINATED_ON_ERROR = "Terminated on error"
TERMINATED_ON_WARNING = "Terminated on warning"


FAILED_JOB_STATUSES: list[str] = [
JobStatus.CANCELED,
JobStatus.DENIED,
JobStatus.INVALID_LICENSE,
JobStatus.REFUSED,
JobStatus.TERMINATED_ON_ERROR,
JobStatus.TERMINATED_ON_WARNING,
]
ONGOING_JOB_STATUSES: list[str] = [
islean marked this conversation as resolved.
Show resolved Hide resolved
JobStatus.CREATION_IN_PROGRESS,
JobStatus.IN_QUEUE,
JobStatus.ON_VALIDATION,
JobStatus.RUNNING,
]
Original file line number Diff line number Diff line change
@@ -1,265 +1,37 @@
"""Module for archiving and retrieving folders via DDN Dataflow."""
import logging
from datetime import datetime
from enum import StrEnum
from pathlib import Path
from urllib.parse import urljoin

from housekeeper.store.models import File
from pydantic import BaseModel, Field
from requests.models import Response
from requests import Response

from cg.constants.constants import APIMethods
from cg.exc import ArchiveJobFailedError, DdnDataflowAuthenticationError
from cg.io.controller import APIRequest
from cg.meta.archive.models import (
ArchiveHandler,
FileAndSample,
FileTransferData,
SampleAndDestination,
from cg.meta.archive.ddn.constants import (
DESTINATION_ATTRIBUTE,
FAILED_JOB_STATUSES,
SOURCE_ATTRIBUTE,
DataflowEndpoints,
JobStatus,
)
from cg.meta.archive.ddn.models import (
AuthPayload,
AuthToken,
DeleteFilePayload,
GetJobStatusPayload,
GetJobStatusResponse,
MiriaObject,
RefreshPayload,
TransferPayload,
)
from cg.meta.archive.models import ArchiveHandler, FileAndSample, SampleAndDestination
from cg.models.cg_config import DataFlowConfig
from cg.store.models import Sample

LOG = logging.getLogger(__name__)

OSTYPE: str = "Unix/MacOS"
ROOT_TO_TRIM: str = "/home"

DESTINATION_ATTRIBUTE: str = "destination"
SOURCE_ATTRIBUTE: str = "source"


def get_request_log(headers: dict, body: dict):
return "Sending request with headers: \n" + f"{headers} \n" + "and body: \n" + f"{body}"


class DataflowEndpoints(StrEnum):
"""Enum containing all DDN dataflow endpoints used."""

ARCHIVE_FILES = "files/archive"
DELETE_FILE = "files/delete"
GET_AUTH_TOKEN = "auth/token"
REFRESH_AUTH_TOKEN = "auth/token/refresh"
RETRIEVE_FILES = "files/retrieve"
GET_JOB_STATUS = "activity/jobs/"


class JobStatus(StrEnum):
"""Enum for the different job statuses which can be returned via Miria."""

CANCELED = "Canceled"
COMPLETED = "Completed"
DENIED = "Denied"
CREATION_IN_PROGRESS = "Creation in progress"
IN_QUEUE = "In Queue"
INVALID_LICENSE = "Invalid license"
ON_VALIDATION = "On validation"
REFUSED = "Refused"
RUNNING = "Running"
SUSPENDED = "Suspended"
TERMINATED_ON_ERROR = "Terminated on error"
TERMINATED_ON_WARNING = "Terminated on warning"


FAILED_JOB_STATUSES: list[str] = [
JobStatus.CANCELED,
JobStatus.DENIED,
JobStatus.INVALID_LICENSE,
JobStatus.REFUSED,
JobStatus.TERMINATED_ON_ERROR,
JobStatus.TERMINATED_ON_WARNING,
]
ONGOING_JOB_STATUSES: list[str] = [
JobStatus.CREATION_IN_PROGRESS,
JobStatus.IN_QUEUE,
JobStatus.ON_VALIDATION,
JobStatus.RUNNING,
]


class MiriaObject(FileTransferData):
"""Model for representing a singular object transfer."""

_metadata = None
destination: str
source: str

@classmethod
def create_from_file_and_sample(
cls, file: File, sample: Sample, is_archiving: bool = True
) -> "MiriaObject":
"""Instantiates the class from a File and Sample object."""
if is_archiving:
return cls(destination=sample.internal_id, source=file.full_path)
return cls(destination=file.full_path, source=sample.internal_id)

@classmethod
def create_from_sample_and_destination(
cls, sample_and_destination: SampleAndDestination
) -> "MiriaObject":
"""Instantiates the class from a SampleAndDestination object,
i.e. when we want to fetch a folder containing all spring files for said sample."""
return cls(
destination=sample_and_destination.destination,
source=sample_and_destination.sample.internal_id,
)

def trim_path(self, attribute_to_trim: str):
"""Trims the given attribute (source or destination) from its root directory."""
setattr(
self,
attribute_to_trim,
f"/{Path(getattr(self, attribute_to_trim)).relative_to(ROOT_TO_TRIM)}",
)

def add_repositories(self, source_prefix: str, destination_prefix: str):
"""Prepends the given repositories to the source and destination paths."""
self.source: str = source_prefix + self.source
self.destination: str = destination_prefix + self.destination


class TransferPayload(BaseModel):
"""Model for representing a Dataflow transfer task."""

files_to_transfer: list[MiriaObject]
osType: str = OSTYPE
createFolder: bool = True
settings: list[dict] = []

def trim_paths(self, attribute_to_trim: str):
"""Trims the source path from its root directory for all objects in the transfer."""
for miria_file in self.files_to_transfer:
miria_file.trim_path(attribute_to_trim=attribute_to_trim)

def add_repositories(self, source_prefix: str, destination_prefix: str):
"""Prepends the given repositories to the source and destination paths all objects in the
transfer."""
for miria_file in self.files_to_transfer:
miria_file.add_repositories(
source_prefix=source_prefix, destination_prefix=destination_prefix
)

def model_dump(self, **kwargs) -> dict:
"""Creates a correctly structured dict to be used as the request payload."""
payload: dict = super().model_dump(exclude={"files_to_transfer"})
payload["pathInfo"] = [miria_file.model_dump() for miria_file in self.files_to_transfer]
payload["metadataList"] = []
return payload

def post_request(self, url: str, headers: dict) -> "TransferJob":
"""Sends a request to the given url with, the given headers, and its own content as payload.

Arguments:
url: URL to which the POST goes to.
headers: Headers which are set in the request
Raises:
HTTPError if the response status is not successful.
ValidationError if the response does not conform to the expected response structure.
Returns:
The job ID of the launched transfer task.
"""

LOG.info(get_request_log(headers=headers, body=self.model_dump()))

response: Response = APIRequest.api_request_from_content(
api_method=APIMethods.POST,
url=url,
headers=headers,
json=self.model_dump(),
verify=False,
)
response.raise_for_status()
return TransferJob.model_validate(response.json())


class AuthPayload(BaseModel):
"""Model representing the payload for an Authentication request."""

dbName: str
name: str
password: str
superUser: bool = False


class RefreshPayload(BaseModel):
"""Model representing the payload for Auth-token refresh request."""

refresh: str


class AuthToken(BaseModel):
"""Model representing the response fields from an access request to the Dataflow API."""

access: str
expire: int
refresh: str | None = None


class TransferJob(BaseModel):
"""Model representing th response fields of an archive or retrieve reqeust to the Dataflow
API."""

job_id: int = Field(alias="jobId")


class GetJobStatusResponse(BaseModel):
"""Model representing the response fields from a get_job_status post."""

job_id: int = Field(alias="id")
status: JobStatus


class GetJobStatusPayload(BaseModel):
"""Model representing the payload for a get_job_status request."""

id: int

def get_job_status(self, url: str, headers: dict) -> GetJobStatusResponse:
"""Sends a get request to the given URL with the given headers.
Returns the parsed status response of the task specified in the URL.
Raises:
HTTPError if the response code is not ok.
"""

LOG.info(get_request_log(headers=headers, body=self.model_dump()))

response: Response = APIRequest.api_request_from_content(
api_method=APIMethods.GET,
url=url,
headers=headers,
json=self.model_dump(),
verify=False,
)
response.raise_for_status()
return GetJobStatusResponse.model_validate(response.json())


class DeleteFileResponse(BaseModel):
message: str


class DeleteFilePayload(BaseModel):
global_path: str

def delete_file(self, url: str, headers: dict) -> DeleteFileResponse:
"""Posts to the given URL with the given headers to delete the file or directory at the specified global path.
Returns the parsed response.
Raises:
HTTPError if the response code is not ok.
"""
LOG.info(get_request_log(headers=headers, body=self.model_dump()))

response: Response = APIRequest.api_request_from_content(
api_method=APIMethods.POST,
url=url,
headers=headers,
json=self.model_dump(),
verify=False,
)
response.raise_for_status()
return DeleteFileResponse.model_validate(response.json())


class DDNDataFlowClient(ArchiveHandler):
"""Class for archiving and retrieving folders via DDN Dataflow."""
Expand Down
Loading
Loading