
(Archiving) Modify according to documentation #2692

Merged · 13 commits · Nov 27, 2023
75 changes: 24 additions & 51 deletions cg/meta/archive/ddn_dataflow.py
@@ -7,7 +7,7 @@
from urllib.parse import urljoin

from housekeeper.store.models import File
from pydantic import BaseModel
from pydantic import BaseModel, Field
from requests.models import Response

from cg.constants.constants import APIMethods
@@ -38,20 +38,24 @@ class DataflowEndpoints(StrEnum):
GET_AUTH_TOKEN = "auth/token"
REFRESH_AUTH_TOKEN = "auth/token/refresh"
RETRIEVE_FILES = "files/retrieve"
GET_JOB_STATUS = "getJobStatus"
GET_JOB_STATUS = "activity/jobs/"


class JobDescription(StrEnum):
class JobStatus(StrEnum):
"""Enum for the different job statuses which can be returned via Miria."""

CANCELED = "Canceled"
COMPLETED = "Completed"
CREATION = "Creation"
DENIED = "Denied"
CREATION_IN_PROGRESS = "Creation in progress"
IN_QUEUE = "In Queue"
INVALID_LICENSE = "Invalid license"
ON_VALIDATION = "On validation"
REFUSED = "Refused"
RUNNING = "Running"
SUSPENDED = "Suspended"
TERMINATED_ON_ERROR = "Terminated on Error"
TERMINATED_ON_ERROR = "Terminated on error"
TERMINATED_ON_WARNING = "Terminated on warning"
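Since `JobStatus` is a `StrEnum`, its members compare equal to the plain status strings Miria returns, which is what the status check further down in this file relies on. A quick, self-contained illustration (assuming Python 3.11+ for `enum.StrEnum`; the stand-in enum here is cut down, not the real class):

```python
from enum import StrEnum  # Python 3.11+; adjust the import if cg sources StrEnum elsewhere


class Status(StrEnum):  # cut-down stand-in for JobStatus above
    COMPLETED = "Completed"


# StrEnum members are plain strings, so a raw status value from the API
# compares directly against the enum member without casting.
print("Completed" == Status.COMPLETED)  # True
print("Running" == Status.COMPLETED)    # False
```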


class MiriaObject(FileTransferData):
@@ -101,6 +105,7 @@ class TransferPayload(BaseModel):
files_to_transfer: list[MiriaObject]
osType: str = OSTYPE
createFolder: bool = False
settings: list[dict] = []

def trim_paths(self, attribute_to_trim: str):
"""Trims the source path from its root directory for all objects in the transfer."""
@@ -172,60 +177,28 @@ class TransferJob(BaseModel):
"""Model representing th response fields of an archive or retrieve reqeust to the Dataflow
API."""

job_id: int


class SubJob(BaseModel):
"""Model representing the response fields in a subjob returned in a get_job_status post."""

subjob_id: int
subjob_type: str
status: int
description: str
progress: float
total_rate: int
throughput: int
estimated_end: datetime
estimated_left: int
job_id: int = Field(alias="jobId")


class GetJobStatusResponse(BaseModel):
"""Model representing the response fields from a get_job_status post."""

request_date: Optional[datetime] = None
operation: Optional[str] = None
job_id: int
type: Optional[str] = None
status: Optional[int] = None
description: str
start_date: Optional[datetime] = None
end_date: Optional[datetime] = None
durationTime: Optional[int] = None
priority: Optional[int] = None
progress: Optional[float] = None
subjobs: Optional[list[SubJob]] = None
job_id: int = Field(serialization_alias="id")
status: str


class GetJobStatusPayload(BaseModel):
"""Model representing the payload for a get_job_status request."""

job_id: int
subjob_id: Optional[int] = None
related_jobs: Optional[bool] = None
main_subjob: Optional[bool] = None
debug: Optional[bool] = None
id: int

def post_request(self, url: str, headers: dict) -> GetJobStatusResponse:
"""Sends a request to the given url with the given headers, and its own content as
payload. Returns the job ID of the launched transfer task.
def get_status(self, url: str, headers: dict) -> GetJobStatusResponse:
"""Sends a get request to the given url with the given headers.
Returns the parsed status response of the task specified in the URL.
Raises:
HTTPError if the response code is not ok.
"""
HTTPError if the response code is not ok."""
response: Response = APIRequest.api_request_from_content(
api_method=APIMethods.POST,
url=url,
headers=headers,
json=self.model_dump(),
api_method=APIMethods.GET, url=url, headers=headers, json={}
)
response.raise_for_status()
return GetJobStatusResponse.model_validate(response.json())
@@ -356,15 +329,15 @@ def convert_into_transfer_data(
]

def is_job_done(self, job_id: int) -> bool:
get_job_status_payload = GetJobStatusPayload(job_id=job_id)
get_job_status_response: GetJobStatusResponse = get_job_status_payload.post_request(
url=urljoin(self.url, DataflowEndpoints.GET_JOB_STATUS),
get_job_status_payload = GetJobStatusPayload(id=job_id)
get_job_status_response: GetJobStatusResponse = get_job_status_payload.get_status(
url=urljoin(self.url, DataflowEndpoints.GET_JOB_STATUS + str(job_id)),
headers=dict(self.headers, **self.auth_header),
)
if get_job_status_response.description == JobDescription.COMPLETED:
if get_job_status_response.status == JobStatus.COMPLETED:
return True
LOG.info(
f"Job with id {job_id} has not been completed. "
f"Current job description is {get_job_status_response.description}"
f"Current job description is {get_job_status_response.status}"
)
return False
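For reviewers following along, a minimal sketch of how the reworked status check fits together after this change. Only `DataflowEndpoints`, `GetJobStatusPayload`, `GetJobStatusResponse`, and `JobStatus` come from this diff; the base URL, headers, and token are illustrative placeholders, and the real client derives them from its DDN config and auth handling:

```python
from urllib.parse import urljoin

from cg.meta.archive.ddn_dataflow import (
    DataflowEndpoints,
    GetJobStatusPayload,
    GetJobStatusResponse,
    JobStatus,
)

# Illustrative values only; the real client builds these from its configuration.
base_url = "https://miria.example.com/api/"
headers = {"Content-Type": "application/json", "Authorization": "Bearer <token>"}
job_id = 123

# The job id is now part of the URL (activity/jobs/<id>) instead of the request body.
url = urljoin(base_url, DataflowEndpoints.GET_JOB_STATUS + str(job_id))

# get_status issues a GET with an empty JSON body and parses the response into
# the slimmed-down GetJobStatusResponse (job_id and status only).
payload = GetJobStatusPayload(id=job_id)
status_response: GetJobStatusResponse = payload.get_status(url=url, headers=headers)

# JobStatus is a StrEnum, so the raw status string compares directly.
print(status_response.status == JobStatus.COMPLETED)
```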
15 changes: 4 additions & 11 deletions tests/meta/archive/conftest.py
@@ -45,13 +45,13 @@ def ddn_dataflow_config(

@pytest.fixture
def ok_ddn_response(ok_response: Response):
ok_response._content = b'{"job_id": "123"}'
ok_response._content = b'{"jobId": "123"}'
return ok_response


@pytest.fixture
def ok_ddn_job_status_response(ok_response: Response):
ok_response._content = b'{"job_id": "123", "description": "Completed"}'
ok_response._content = b'{"jobId": "123", "status": "Completed"}'
return ok_response


@@ -69,15 +69,7 @@ def archive_request_json(
}
],
"metadataList": [],
}


@pytest.fixture
def get_job_status_request_json(
remote_storage_repository: str, local_storage_repository: str, trimmed_local_path: str
) -> dict:
return {
"job_id": 123,
"settings": [],
}


@@ -97,6 +89,7 @@ def retrieve_request_json(
}
],
"metadataList": [],
"settings": [],
}
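On the fixture change above: `ok_ddn_response` now carries a `jobId` key because `TransferJob` reads the id through a Pydantic alias. A minimal sketch of that mapping, assuming Pydantic v2 semantics (cut-down model for illustration, not the real class):

```python
from pydantic import BaseModel, Field


class TransferJob(BaseModel):
    # Same field definition as in ddn_dataflow.py: validated from the "jobId" key.
    job_id: int = Field(alias="jobId")


# The fixture body {"jobId": "123"} populates job_id, and Pydantic coerces
# the string "123" to the int 123.
print(TransferJob.model_validate({"jobId": "123"}).job_id)  # 123
```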


16 changes: 8 additions & 8 deletions tests/meta/archive/test_archive_api.py
@@ -19,7 +19,7 @@
DDNDataFlowClient,
GetJobStatusPayload,
GetJobStatusResponse,
JobDescription,
JobStatus,
MiriaObject,
)
from cg.meta.archive.models import ArchiveHandler, FileTransferData
@@ -222,7 +222,7 @@ def test_archive_all_non_archived_spring_files(

@pytest.mark.parametrize(
"job_status, should_date_be_set",
[(JobDescription.COMPLETED, True), (JobDescription.RUNNING, False)],
[(JobStatus.COMPLETED, True), (JobStatus.RUNNING, False)],
)
def test_get_archival_status(
spring_archive_api: SpringArchiveAPI,
Expand All @@ -232,7 +232,7 @@ def test_get_archival_status(
header_with_test_auth_token,
test_auth_token: AuthToken,
archival_job_id: int,
job_status: JobDescription,
job_status: JobStatus,
should_date_be_set: bool,
):
# GIVEN a file with an ongoing archival
Expand All @@ -250,8 +250,8 @@ def test_get_archival_status(
return_value=ok_ddn_job_status_response,
), mock.patch.object(
GetJobStatusPayload,
"post_request",
return_value=GetJobStatusResponse(job_id=archival_job_id, description=job_status),
"get_status",
return_value=GetJobStatusResponse(job_id=archival_job_id, status=job_status),
):
spring_archive_api.update_ongoing_task(
task_id=archival_job_id,
@@ -265,7 +265,7 @@

@pytest.mark.parametrize(
"job_status, should_date_be_set",
[(JobDescription.COMPLETED, True), (JobDescription.RUNNING, False)],
[(JobStatus.COMPLETED, True), (JobStatus.RUNNING, False)],
)
def test_get_retrieval_status(
spring_archive_api: SpringArchiveAPI,
@@ -296,8 +296,8 @@
return_value=ok_ddn_job_status_response,
), mock.patch.object(
GetJobStatusPayload,
"post_request",
return_value=GetJobStatusResponse(job_id=retrieval_job_id, description=job_status),
"get_status",
return_value=GetJobStatusResponse(job_id=retrieval_job_id, status=job_status),
):
spring_archive_api.update_ongoing_task(
task_id=retrieval_job_id,
2 changes: 2 additions & 0 deletions tests/meta/archive/test_archiving.py
@@ -303,6 +303,7 @@ def test_archive_folders(
"osType": OSTYPE,
"createFolder": False,
"metadataList": [],
"settings": [],
},
)

@@ -349,6 +350,7 @@ def test_retrieve_samples(
"osType": OSTYPE,
"createFolder": False,
"metadataList": [],
"settings": [],
},
)
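The `"settings": []` entries added to the expected request bodies here and in `conftest.py` follow from the new `settings: list[dict] = []` field on `TransferPayload`: a defaulted field is still part of `model_dump()` output unless explicitly excluded. A trimmed-down sketch of that behaviour (stand-in model, not the real `TransferPayload`):

```python
from pydantic import BaseModel


class PayloadSketch(BaseModel):
    # Stand-in for TransferPayload with only the fields needed to show the point.
    createFolder: bool = False
    settings: list[dict] = []


# The defaulted field appears in the dumped payload, which is why the expected
# request JSON in these tests now also includes "settings": [].
print(PayloadSketch().model_dump())  # {'createFolder': False, 'settings': []}
```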
