Commit

Merge branch 'master' into parse-concentration
islean authored Dec 15, 2023
2 parents 7bae2a9 + cf2dca5 commit e6a9ff5
Showing 39 changed files with 750 additions and 521 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 54.4.1
current_version = 54.4.8
commit = True
tag = True
tag_name = v{new_version}
2 changes: 1 addition & 1 deletion cg/__init__.py
@@ -1,2 +1,2 @@
__title__ = "cg"
__version__ = "54.4.1"
__version__ = "54.4.8"
11 changes: 0 additions & 11 deletions cg/constants/cgstats.py

This file was deleted.

13 changes: 13 additions & 0 deletions cg/constants/housekeeper_tags.py
@@ -194,4 +194,17 @@ class BalsamicProtectedTags:
["gisaid-log"],
["gisaid-csv"],
],
str(Pipeline.RNAFUSION): [
[AnalysisTag.FUSION, AnalysisTag.ARRIBA],
[AnalysisTag.FUSION, AnalysisTag.STARFUSION],
[AnalysisTag.FUSION, AnalysisTag.FUSIONCATCHER],
[AnalysisTag.FUSIONINSPECTOR],
[AnalysisTag.FUSIONREPORT, AnalysisTag.RESEARCH],
[AnalysisTag.FUSIONINSPECTOR_HTML, AnalysisTag.RESEARCH],
[AnalysisTag.ARRIBA_VISUALIZATION, AnalysisTag.RESEARCH],
[AnalysisTag.MULTIQC_HTML, AnalysisTag.RNA],
[HK_DELIVERY_REPORT_TAG],
[AnalysisTag.VCF_FUSION],
[AnalysisTag.GENE_COUNTS],
],
}
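Each inner list above is a tag combination: a Housekeeper file is treated as protected when its own tag set covers at least one of the listed combinations. A minimal sketch of that matching rule (the helper name and the plain-string tags are illustrative, not the repository's actual implementation):

def is_protected(file_tags: set[str], protected_combinations: list[list[str]]) -> bool:
    """Return True if the file's tags cover any protected tag combination."""
    return any(set(combination).issubset(file_tags) for combination in protected_combinations)

# A file tagged fusion + arriba (plus anything else) matches ["fusion", "arriba"]:
assert is_protected({"fusion", "arriba", "ACC123"}, [["fusion", "arriba"]])
assert not is_protected({"fusion"}, [["fusion", "arriba"]])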
8 changes: 8 additions & 0 deletions cg/io/gzip.py
@@ -0,0 +1,8 @@
import gzip
from pathlib import Path


def read_gzip_first_line(file_path: Path) -> str:
"""Return first line of gzip file."""
with gzip.open(file_path) as file:
return file.readline().decode()
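A quick usage sketch for the helper above; since gzip.open defaults to binary mode, the line is decoded to str before being returned. The FASTQ path is a made-up example:

from pathlib import Path

first_line: str = read_gzip_first_line(Path("sample_R1_001.fastq.gz"))
assert first_line.startswith("@")  # FASTQ records begin with an "@" header line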
2 changes: 1 addition & 1 deletion cg/meta/archive/archive.py
@@ -9,7 +9,7 @@
from cg.constants import SequencingFileTag
from cg.constants.archiving import ArchiveLocations
from cg.exc import ArchiveJobFailedError
from cg.meta.archive.ddn_dataflow import DDNDataFlowClient
from cg.meta.archive.ddn.ddn_data_flow_client import DDNDataFlowClient
from cg.meta.archive.models import ArchiveHandler, FileAndSample, SampleAndDestination
from cg.models.cg_config import DataFlowConfig
from cg.store import Store
Empty file added cg/meta/archive/ddn/__init__.py
51 changes: 51 additions & 0 deletions cg/meta/archive/ddn/constants.py
@@ -0,0 +1,51 @@
from enum import StrEnum

OSTYPE: str = "Unix/MacOS"
ROOT_TO_TRIM: str = "/home"

DESTINATION_ATTRIBUTE: str = "destination"
SOURCE_ATTRIBUTE: str = "source"


class DataflowEndpoints(StrEnum):
"""Enum containing all DDN dataflow endpoints used."""

ARCHIVE_FILES = "files/archive"
DELETE_FILE = "files/delete"
GET_AUTH_TOKEN = "auth/token"
REFRESH_AUTH_TOKEN = "auth/token/refresh"
RETRIEVE_FILES = "files/retrieve"
GET_JOB_STATUS = "activity/jobs/"


class JobStatus(StrEnum):
"""Enum for the different job statuses which can be returned via Miria."""

CANCELED = "Canceled"
COMPLETED = "Completed"
DENIED = "Denied"
CREATION_IN_PROGRESS = "Creation in progress"
IN_QUEUE = "In Queue"
INVALID_LICENSE = "Invalid license"
ON_VALIDATION = "On validation"
REFUSED = "Refused"
RUNNING = "Running"
SUSPENDED = "Suspended"
TERMINATED_ON_ERROR = "Terminated on error"
TERMINATED_ON_WARNING = "Terminated on warning"


FAILED_JOB_STATUSES: list[str] = [
JobStatus.CANCELED,
JobStatus.DENIED,
JobStatus.INVALID_LICENSE,
JobStatus.REFUSED,
JobStatus.TERMINATED_ON_ERROR,
JobStatus.TERMINATED_ON_WARNING,
]
ONGOING_JOB_STATUSES: list[str] = [
JobStatus.CREATION_IN_PROGRESS,
JobStatus.IN_QUEUE,
JobStatus.ON_VALIDATION,
JobStatus.RUNNING,
]
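COMPLETED and SUSPENDED appear in neither list. A hypothetical polling helper built on these constants (not part of the diff) could classify statuses like this, under the assumption that anything neither failed nor ongoing counts as done:

def classify_job_status(status: JobStatus) -> str:
    """Map a Miria job status onto a coarse archiving outcome."""
    if status in FAILED_JOB_STATUSES:
        return "failed"
    if status in ONGOING_JOB_STATUSES:
        return "ongoing"
    return "done"  # assumption: e.g. Completed (and also Suspended) ends up here

assert classify_job_status(JobStatus.RUNNING) == "ongoing"
assert classify_job_status(JobStatus.COMPLETED) == "done"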
cg/meta/archive/ddn_dataflow.py → cg/meta/archive/ddn/ddn_data_flow_client.py
@@ -1,265 +1,37 @@
"""Module for archiving and retrieving folders via DDN Dataflow."""
import logging
from datetime import datetime
from enum import StrEnum
from pathlib import Path
from urllib.parse import urljoin

from housekeeper.store.models import File
from pydantic import BaseModel, Field
from requests.models import Response
from requests import Response

from cg.constants.constants import APIMethods
from cg.exc import ArchiveJobFailedError, DdnDataflowAuthenticationError
from cg.io.controller import APIRequest
from cg.meta.archive.models import (
ArchiveHandler,
FileAndSample,
FileTransferData,
SampleAndDestination,
)
from cg.meta.archive.ddn.constants import (
DESTINATION_ATTRIBUTE,
FAILED_JOB_STATUSES,
SOURCE_ATTRIBUTE,
DataflowEndpoints,
JobStatus,
)
from cg.meta.archive.ddn.models import (
AuthPayload,
AuthToken,
DeleteFilePayload,
GetJobStatusPayload,
GetJobStatusResponse,
MiriaObject,
RefreshPayload,
TransferPayload,
)
from cg.meta.archive.models import ArchiveHandler, FileAndSample, SampleAndDestination
from cg.models.cg_config import DataFlowConfig
from cg.store.models import Sample

LOG = logging.getLogger(__name__)

OSTYPE: str = "Unix/MacOS"
ROOT_TO_TRIM: str = "/home"

DESTINATION_ATTRIBUTE: str = "destination"
SOURCE_ATTRIBUTE: str = "source"


def get_request_log(headers: dict, body: dict):
return "Sending request with headers: \n" + f"{headers} \n" + "and body: \n" + f"{body}"


class DataflowEndpoints(StrEnum):
"""Enum containing all DDN dataflow endpoints used."""

ARCHIVE_FILES = "files/archive"
DELETE_FILE = "files/delete"
GET_AUTH_TOKEN = "auth/token"
REFRESH_AUTH_TOKEN = "auth/token/refresh"
RETRIEVE_FILES = "files/retrieve"
GET_JOB_STATUS = "activity/jobs/"


class JobStatus(StrEnum):
"""Enum for the different job statuses which can be returned via Miria."""

CANCELED = "Canceled"
COMPLETED = "Completed"
DENIED = "Denied"
CREATION_IN_PROGRESS = "Creation in progress"
IN_QUEUE = "In Queue"
INVALID_LICENSE = "Invalid license"
ON_VALIDATION = "On validation"
REFUSED = "Refused"
RUNNING = "Running"
SUSPENDED = "Suspended"
TERMINATED_ON_ERROR = "Terminated on error"
TERMINATED_ON_WARNING = "Terminated on warning"


FAILED_JOB_STATUSES: list[str] = [
JobStatus.CANCELED,
JobStatus.DENIED,
JobStatus.INVALID_LICENSE,
JobStatus.REFUSED,
JobStatus.TERMINATED_ON_ERROR,
JobStatus.TERMINATED_ON_WARNING,
]
ONGOING_JOB_STATUSES: list[str] = [
JobStatus.CREATION_IN_PROGRESS,
JobStatus.IN_QUEUE,
JobStatus.ON_VALIDATION,
JobStatus.RUNNING,
]


class MiriaObject(FileTransferData):
"""Model for representing a singular object transfer."""

_metadata = None
destination: str
source: str

@classmethod
def create_from_file_and_sample(
cls, file: File, sample: Sample, is_archiving: bool = True
) -> "MiriaObject":
"""Instantiates the class from a File and Sample object."""
if is_archiving:
return cls(destination=sample.internal_id, source=file.full_path)
return cls(destination=file.full_path, source=sample.internal_id)

@classmethod
def create_from_sample_and_destination(
cls, sample_and_destination: SampleAndDestination
) -> "MiriaObject":
"""Instantiates the class from a SampleAndDestination object,
i.e. when we want to fetch a folder containing all spring files for said sample."""
return cls(
destination=sample_and_destination.destination,
source=sample_and_destination.sample.internal_id,
)

def trim_path(self, attribute_to_trim: str):
"""Trims the given attribute (source or destination) from its root directory."""
setattr(
self,
attribute_to_trim,
f"/{Path(getattr(self, attribute_to_trim)).relative_to(ROOT_TO_TRIM)}",
)

def add_repositories(self, source_prefix: str, destination_prefix: str):
"""Prepends the given repositories to the source and destination paths."""
self.source: str = source_prefix + self.source
self.destination: str = destination_prefix + self.destination
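
# Illustrative walk-through of the two helpers above, using made-up values
# (paths and repository prefixes are hypothetical, not production settings):
example = MiriaObject(source="/home/runs/sample_1/file.spring", destination="ACC123456")
example.trim_path(attribute_to_trim=SOURCE_ATTRIBUTE)
assert example.source == "/runs/sample_1/file.spring"  # the "/home" root is trimmed
example.add_repositories(source_prefix="archive@repo:", destination_prefix="cluster@repo:")
assert example.source == "archive@repo:/runs/sample_1/file.spring"
assert example.destination == "cluster@repo:ACC123456"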


class TransferPayload(BaseModel):
"""Model for representing a Dataflow transfer task."""

files_to_transfer: list[MiriaObject]
osType: str = OSTYPE
createFolder: bool = True
settings: list[dict] = []

def trim_paths(self, attribute_to_trim: str):
"""Trims the source path from its root directory for all objects in the transfer."""
for miria_file in self.files_to_transfer:
miria_file.trim_path(attribute_to_trim=attribute_to_trim)

def add_repositories(self, source_prefix: str, destination_prefix: str):
"""Prepends the given repositories to the source and destination paths all objects in the
transfer."""
for miria_file in self.files_to_transfer:
miria_file.add_repositories(
source_prefix=source_prefix, destination_prefix=destination_prefix
)

def model_dump(self, **kwargs) -> dict:
"""Creates a correctly structured dict to be used as the request payload."""
payload: dict = super().model_dump(exclude={"files_to_transfer"})
payload["pathInfo"] = [miria_file.model_dump() for miria_file in self.files_to_transfer]
payload["metadataList"] = []
return payload

def post_request(self, url: str, headers: dict) -> "TransferJob":
"""Sends a request to the given url with, the given headers, and its own content as payload.
Arguments:
url: URL to which the POST goes to.
headers: Headers which are set in the request
Raises:
HTTPError if the response status is not successful.
ValidationError if the response does not conform to the expected response structure.
Returns:
A TransferJob holding the job ID of the launched transfer task.
"""

LOG.info(get_request_log(headers=headers, body=self.model_dump()))

response: Response = APIRequest.api_request_from_content(
api_method=APIMethods.POST,
url=url,
headers=headers,
json=self.model_dump(),
verify=False,
)
response.raise_for_status()
return TransferJob.model_validate(response.json())
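
# Hypothetical shape of the payload produced by the model_dump override
# above for a single-file transfer (values are illustrative):
#   TransferPayload(
#       files_to_transfer=[MiriaObject(source="/runs/a.spring", destination="ACC1")]
#   ).model_dump() == {
#       "osType": "Unix/MacOS",
#       "createFolder": True,
#       "settings": [],
#       "pathInfo": [{"destination": "ACC1", "source": "/runs/a.spring"}],
#       "metadataList": [],
#   }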


class AuthPayload(BaseModel):
"""Model representing the payload for an Authentication request."""

dbName: str
name: str
password: str
superUser: bool = False


class RefreshPayload(BaseModel):
"""Model representing the payload for Auth-token refresh request."""

refresh: str


class AuthToken(BaseModel):
"""Model representing the response fields from an access request to the Dataflow API."""

access: str
expire: int
refresh: str | None = None
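
# Hypothetical token exchange built from the three auth models above
# (all values are made up; the dict mimics the expected response shape):
#   AuthPayload(dbName="production", name="svc-archive", password="...")
#   token = AuthToken.model_validate({"access": "eyJ...", "expire": 1700000000, "refresh": "eyJ..."})
#   RefreshPayload(refresh=token.refresh)  # sent when the access token expires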


class TransferJob(BaseModel):
"""Model representing th response fields of an archive or retrieve reqeust to the Dataflow
API."""

job_id: int = Field(alias="jobId")


class GetJobStatusResponse(BaseModel):
"""Model representing the response fields from a get_job_status post."""

job_id: int = Field(alias="id")
status: JobStatus


class GetJobStatusPayload(BaseModel):
"""Model representing the payload for a get_job_status request."""

id: int

def get_job_status(self, url: str, headers: dict) -> GetJobStatusResponse:
"""Sends a get request to the given URL with the given headers.
Returns the parsed status response of the task specified in the URL.
Raises:
HTTPError if the response code is not ok.
"""

LOG.info(get_request_log(headers=headers, body=self.model_dump()))

response: Response = APIRequest.api_request_from_content(
api_method=APIMethods.GET,
url=url,
headers=headers,
json=self.model_dump(),
verify=False,
)
response.raise_for_status()
return GetJobStatusResponse.model_validate(response.json())
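
# Hypothetical polling call (URL, job id, and token are made up):
#   payload = GetJobStatusPayload(id=1234)
#   response = payload.get_job_status(
#       url="https://miria.example.com/api/activity/jobs/1234",
#       headers={"Authorization": "Bearer <access-token>"},
#   )
#   if response.status in FAILED_JOB_STATUSES:
#       raise ArchiveJobFailedError(f"Job {response.job_id} failed")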


class DeleteFileResponse(BaseModel):
message: str


class DeleteFilePayload(BaseModel):
global_path: str

def delete_file(self, url: str, headers: dict) -> DeleteFileResponse:
"""Posts to the given URL with the given headers to delete the file or directory at the specified global path.
Returns the parsed response.
Raises:
HTTPError if the response code is not ok.
"""
LOG.info(get_request_log(headers=headers, body=self.model_dump()))

response: Response = APIRequest.api_request_from_content(
api_method=APIMethods.POST,
url=url,
headers=headers,
json=self.model_dump(),
verify=False,
)
response.raise_for_status()
return DeleteFileResponse.model_validate(response.json())


class DDNDataFlowClient(ArchiveHandler):
"""Class for archiving and retrieving folders via DDN Dataflow."""