Commit f470409

(Archiving) get job status (#2319) (minor)

### Added

- Support for querying ongoing archivals/retrievals and updating their statuses.

1 parent a318683 · commit f470409

File tree

7 files changed: +438 -55 lines changed


cg/apps/housekeeper/hk.py

Lines changed: 56 additions & 3 deletions
```diff
@@ -17,7 +17,11 @@
 from sqlalchemy.orm import Query
 
 from cg.constants import SequencingFileTag
-from cg.exc import HousekeeperBundleVersionMissingError, HousekeeperFileMissingError
+from cg.exc import (
+    HousekeeperArchiveMissingError,
+    HousekeeperBundleVersionMissingError,
+    HousekeeperFileMissingError,
+)
 
 LOG = logging.getLogger(__name__)
 
@@ -490,7 +494,6 @@ def set_archive_archived_at(self, file_id: int, archiving_task_id: int):
             f"while archiving task id in Housekeeper is {archive.archiving_task_id}."
         )
         self._store.update_archiving_time_stamp(archive=archive)
-        self.commit()
 
     def set_archive_retrieval_task_id(self, file_id: int, retrieval_task_id: int) -> None:
         """Sets the retrieval_task_id for an Archive entry. Raises a ValueError if the given retrieval task id
@@ -499,7 +502,6 @@ def set_archive_retrieval_task_id(self, file_id: int, retrieval_task_id: int) -> None:
         if not archive:
             raise ValueError(f"No Archive entry found for file with id {file_id}.")
         self._store.update_retrieval_task_id(archive=archive, retrieval_task_id=retrieval_task_id)
-        self.commit()
 
     def get_sample_sheets_from_latest_version(self, flow_cell_id: str) -> list[File]:
         """Returns the files tagged with 'samplesheet' for the given bundle."""
@@ -573,3 +575,54 @@ def store_fastq_path_in_housekeeper(
             bundle_name=sample_internal_id,
             tag_names=[SequencingFileTag.FASTQ, flow_cell_id, sample_internal_id],
         )
+
+    def get_archive_entries(
+        self, archival_task_id: int = None, retrieval_task_id: int = None
+    ) -> list[Archive]:
+        """Returns all archives matching the provided task ids. If no task ids are provided, all archive entries are
+        returned. If only an archival_task_id is provided, filtering is only done on that parameter, and vice versa
+        with retrieval_task_id.
+        """
+        return self._store.get_archives(
+            archival_task_id=archival_task_id, retrieval_task_id=retrieval_task_id
+        )
+
+    def set_archived_at(self, archival_task_id: int) -> None:
+        """Sets archived_at to the current time for archive entries with matching archival task id.
+        Raises:
+            HousekeeperArchiveMissingError if no Archive entries match the given archival task id.
+        """
+        archive_entries: list[Archive] = self.get_archive_entries(archival_task_id=archival_task_id)
+        if not archive_entries:
+            raise HousekeeperArchiveMissingError(
+                f"Could not find any archives with archival_task_id {archival_task_id}"
+            )
+        for archive in archive_entries:
+            self.set_archive_archived_at(
+                archiving_task_id=archival_task_id, file_id=archive.file_id
+            )
+        self.commit()
+
+    def set_retrieved_at(self, retrieval_task_id: int) -> None:
+        """Sets retrieved_at to the current time for archive entries with matching retrieval task id.
+        Raises:
+            HousekeeperArchiveMissingError if no Archive entries match the given retrieval task id.
+        """
+        archive_entries: list[Archive] = self.get_archive_entries(
+            retrieval_task_id=retrieval_task_id
+        )
+        if not archive_entries:
+            raise HousekeeperArchiveMissingError(
+                f"Could not find any archives with retrieval_task_id {retrieval_task_id}"
+            )
+        for archive in archive_entries:
+            self.set_archive_retrieved_at(
+                retrieval_task_id=retrieval_task_id, file_id=archive.file_id
+            )
+        self.commit()
+
+    def get_ongoing_archivals(self) -> list[Archive]:
+        return self._store.get_ongoing_archivals()
+
+    def get_ongoing_retrievals(self) -> list[Archive]:
+        return self._store.get_ongoing_retrievals()
```
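The new `HousekeeperAPI` methods form a small query-then-stamp interface: look up `Archive` rows by task id, then mark them archived or retrieved. A minimal sketch of how a caller might drive it — `hk_api` is assumed to be an already-configured `HousekeeperAPI` instance and the task id 1234 is made up for illustration:

```python
# Hypothetical driver for the new HousekeeperAPI methods.
from housekeeper.store.models import Archive

from cg.apps.housekeeper.hk import HousekeeperAPI
from cg.exc import HousekeeperArchiveMissingError


def mark_archival_done(hk_api: HousekeeperAPI, archival_task_id: int) -> None:
    # Fetch every Archive entry created for this archival task.
    entries: list[Archive] = hk_api.get_archive_entries(archival_task_id=archival_task_id)
    print(f"Task {archival_task_id} covers {len(entries)} file(s)")
    try:
        # Stamps archived_at on each matching entry, then commits once.
        hk_api.set_archived_at(archival_task_id)
    except HousekeeperArchiveMissingError:
        # Raised when no Archive entries reference the given task id.
        print(f"No archive entries found for task {archival_task_id}")
```

Note the design change visible in the diff: the per-file setters no longer call `self.commit()`; `set_archived_at`/`set_retrieved_at` now commit once after updating all matching entries, so the batch is persisted atomically.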

cg/exc.py

Lines changed: 6 additions & 0 deletions
```diff
@@ -120,6 +120,12 @@ class HousekeeperBundleVersionMissingError(CgError):
     """
 
 
+class HousekeeperArchiveMissingError(CgError):
+    """
+    Exception raised when an archive is missing in Housekeeper.
+    """
+
+
 class LimsDataError(CgError):
     """
     Error related to missing/incomplete data in LIMS.
```
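Because `HousekeeperArchiveMissingError` subclasses `CgError`, existing broad `except CgError` handlers keep catching it; a quick sketch:

```python
from cg.exc import CgError, HousekeeperArchiveMissingError

try:
    raise HousekeeperArchiveMissingError("Could not find any archives with archival_task_id 1234")
except CgError as error:
    # The broad handler still works; the subclass just allows narrower catches.
    print(type(error).__name__, error)
```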

cg/meta/archive/archive.py

Lines changed: 124 additions & 8 deletions
```diff
@@ -2,7 +2,7 @@
 from pathlib import Path
 from typing import Callable, Optional, Type
 
-from housekeeper.store.models import File
+from housekeeper.store.models import Archive, File
 from pydantic import BaseModel, ConfigDict
 
 from cg.apps.housekeeper.hk import HousekeeperAPI
@@ -16,6 +16,9 @@
 
 LOG = logging.getLogger(__name__)
 DEFAULT_SPRING_ARCHIVE_COUNT = 200
+ARCHIVE_HANDLERS: dict[str, Type[ArchiveHandler]] = {
+    ArchiveLocations.KAROLINSKA_BUCKET: DDNDataFlowClient
+}
 
 
 class ArchiveModels(BaseModel):
@@ -53,11 +56,6 @@ def filter_samples_on_archive_location(
     ]
 
 
-ARCHIVE_HANDLERS: dict[str, Type[ArchiveHandler]] = {
-    ArchiveLocations.KAROLINSKA_BUCKET: DDNDataFlowClient
-}
-
-
 class SpringArchiveAPI:
     """Class handling the archiving of sample SPRING files to an off-premise location for long
     term storage."""
@@ -188,7 +186,125 @@ def add_samples_to_files(self, files_to_archive: list[File]) -> list[FileAndSample]:
         adds it to the list which is returned."""
         files_and_samples: list[FileAndSample] = []
         for file in files_to_archive:
-            sample: Optional[Sample] = self.get_sample(file)
-            if sample:
+            if sample := self.get_sample(file):
                 files_and_samples.append(FileAndSample(file=file, sample=sample))
         return files_and_samples
+
+    def update_status_for_ongoing_tasks(self) -> None:
+        """Updates any completed jobs with a finished timestamp."""
+        self.update_ongoing_archivals()
+        self.update_ongoing_retrievals()
+
+    def update_ongoing_archivals(self) -> None:
+        ongoing_archivals: list[Archive] = self.housekeeper_api.get_ongoing_archivals()
+        archival_ids_per_location: dict[
+            ArchiveLocations, list[int]
+        ] = self.sort_archival_ids_on_archive_location(ongoing_archivals)
+        for archive_location in ArchiveLocations:
+            self.update_archival_jobs_for_archive_location(
+                archive_location=archive_location,
+                job_ids=archival_ids_per_location.get(archive_location),
+            )
+
+    def update_ongoing_retrievals(self) -> None:
+        ongoing_retrievals: list[Archive] = self.housekeeper_api.get_ongoing_retrievals()
+        retrieval_ids_per_location: dict[
+            ArchiveLocations, list[int]
+        ] = self.sort_retrieval_ids_on_archive_location(ongoing_retrievals)
+        for archive_location in ArchiveLocations:
+            self.update_retrieval_jobs_for_archive_location(
+                archive_location=archive_location,
+                job_ids=retrieval_ids_per_location.get(archive_location),
+            )
+
+    def update_archival_jobs_for_archive_location(
+        self, archive_location: ArchiveLocations, job_ids: list[int]
+    ) -> None:
+        for job_id in job_ids:
+            self.update_ongoing_task(
+                task_id=job_id, archive_location=archive_location, is_archival=True
+            )
+
+    def update_retrieval_jobs_for_archive_location(
+        self, archive_location: ArchiveLocations, job_ids: list[int]
+    ) -> None:
+        for job_id in job_ids:
+            self.update_ongoing_task(
+                task_id=job_id, archive_location=archive_location, is_archival=False
+            )
+
+    def update_ongoing_task(
+        self, task_id: int, archive_location: ArchiveLocations, is_archival: bool
+    ) -> None:
+        """Fetches info on an ongoing job and updates the Archive entry in Housekeeper."""
+        archive_handler: ArchiveHandler = ARCHIVE_HANDLERS[archive_location](self.data_flow_config)
+        is_job_done: bool = archive_handler.is_job_done(task_id)
+        if is_job_done:
+            LOG.info(f"Job with id {task_id} has finished, updating Archive entries.")
+            if is_archival:
+                self.housekeeper_api.set_archived_at(task_id)
+            else:
+                self.housekeeper_api.set_retrieved_at(task_id)
+        else:
+            LOG.info(f"Job with id {task_id} has not yet finished.")
+
+    def sort_archival_ids_on_archive_location(
+        self, archive_entries: list[Archive]
+    ) -> dict[ArchiveLocations, list[int]]:
+        """Returns a dictionary with keys being ArchiveLocations and the values being the subset of the given
+        archival jobs which archive files there."""
+
+        jobs_per_location: dict[ArchiveLocations, list[int]] = {}
+        jobs_and_locations: set[
+            tuple[int, ArchiveLocations]
+        ] = self.get_unique_archival_ids_and_their_archive_location(archive_entries)
+
+        for archive_location in ArchiveLocations:
+            jobs_per_location[ArchiveLocations(archive_location)] = [
+                job_and_location[0]
+                for job_and_location in jobs_and_locations
+                if job_and_location[1] == archive_location
+            ]
+        return jobs_per_location
+
+    def get_unique_archival_ids_and_their_archive_location(
+        self, archive_entries: list[Archive]
+    ) -> set[tuple[int, ArchiveLocations]]:
+        return set(
+            [
+                (archive.archiving_task_id, self.get_archive_location_from_file(archive.file))
+                for archive in archive_entries
+            ]
+        )
+
+    def sort_retrieval_ids_on_archive_location(
+        self, archive_entries: list[Archive]
+    ) -> dict[ArchiveLocations, list[int]]:
+        """Returns a dictionary with keys being ArchiveLocations and the values being the subset of the given
+        retrieval jobs which retrieve files archived there."""
+        jobs_per_location: dict[ArchiveLocations, list[int]] = {}
+        jobs_and_locations: set[
+            tuple[int, ArchiveLocations]
+        ] = self.get_unique_retrieval_ids_and_their_archive_location(archive_entries)
+        for archive_location in ArchiveLocations:
+            jobs_per_location[ArchiveLocations(archive_location)] = [
+                job_and_location[0]
+                for job_and_location in jobs_and_locations
+                if job_and_location[1] == archive_location
+            ]
+        return jobs_per_location
+
+    def get_unique_retrieval_ids_and_their_archive_location(
+        self, archive_entries: list[Archive]
+    ) -> set[tuple[int, ArchiveLocations]]:
+        return set(
+            [
+                (archive.retrieval_task_id, self.get_archive_location_from_file(archive.file))
+                for archive in archive_entries
+            ]
+        )
+
+    def get_archive_location_from_file(self, file: File) -> ArchiveLocations:
+        return ArchiveLocations(
+            self.status_db.get_sample_by_internal_id(file.version.bundle.name).archive_location
+        )
```
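The status-update flow groups ongoing `Archive` rows by archive location and polls each job id once per location. The deduplication step matters because several files can share a single archival task. A self-contained sketch of that grouping logic, using stand-in types rather than the real cg models:

```python
# Sketch of the per-location grouping that sort_archival_ids_on_archive_location
# performs; the enum value and the Archive stand-in below are illustrative only.
from enum import StrEnum
from typing import NamedTuple


class ArchiveLocations(StrEnum):
    KAROLINSKA_BUCKET = "karolinska_bucket"


class FakeArchive(NamedTuple):
    archiving_task_id: int
    location: ArchiveLocations


def sort_archival_ids_on_archive_location(
    archives: list[FakeArchive],
) -> dict[ArchiveLocations, list[int]]:
    # Deduplicate (task id, location) pairs first: several files can share
    # one archival task, but each task should be polled only once.
    unique_pairs: set[tuple[int, ArchiveLocations]] = {
        (archive.archiving_task_id, archive.location) for archive in archives
    }
    return {
        location: [task_id for task_id, loc in unique_pairs if loc == location]
        for location in ArchiveLocations
    }


archives = [
    FakeArchive(1, ArchiveLocations.KAROLINSKA_BUCKET),
    FakeArchive(1, ArchiveLocations.KAROLINSKA_BUCKET),  # same task, second file
]
# Task 1 appears once for its location, despite two Archive rows sharing it.
print(sort_archival_ids_on_archive_location(archives))
```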

cg/meta/archive/ddn_dataflow.py

Lines changed: 87 additions & 0 deletions
```diff
@@ -1,4 +1,5 @@
 """Module for archiving and retrieving folders via DDN Dataflow."""
+import logging
 from datetime import datetime
 from enum import StrEnum
 from pathlib import Path
@@ -21,6 +22,8 @@
 from cg.models.cg_config import DataFlowConfig
 from cg.store.models import Sample
 
+LOG = logging.getLogger(__name__)
+
 OSTYPE: str = "Unix/MacOS"
 ROOT_TO_TRIM: str = "/home"
 
@@ -35,6 +38,20 @@ class DataflowEndpoints(StrEnum):
     GET_AUTH_TOKEN = "auth/token"
     REFRESH_AUTH_TOKEN = "auth/token/refresh"
     RETRIEVE_FILES = "files/retrieve"
+    GET_JOB_STATUS = "getJobStatus"
+
+
+class JobDescription(StrEnum):
+    """Enum for the different job statuses which can be returned via Miria."""
+
+    CANCELED = "Canceled"
+    COMPLETED = "Completed"
+    CREATION = "Creation"
+    IN_QUEUE = "In Queue"
+    REFUSED = "Refused"
+    RUNNING = "Running"
+    SUSPENDED = "Suspended"
+    TERMINATED_ON_ERROR = "Terminated on Error"
 
 
 class MiriaObject(FileTransferData):
@@ -158,6 +175,62 @@ class TransferJob(BaseModel):
     job_id: int
 
 
+class SubJob(BaseModel):
+    """Model representing the response fields in a subjob returned in a get_job_status post."""
+
+    subjob_id: int
+    subjob_type: str
+    status: int
+    description: str
+    progress: float
+    total_rate: int
+    throughput: int
+    estimated_end: datetime
+    estimated_left: int
+
+
+class GetJobStatusResponse(BaseModel):
+    """Model representing the response fields from a get_job_status post."""
+
+    request_date: Optional[datetime] = None
+    operation: Optional[str] = None
+    job_id: int
+    type: Optional[str] = None
+    status: Optional[int] = None
+    description: str
+    start_date: Optional[datetime] = None
+    end_date: Optional[datetime] = None
+    durationTime: Optional[int] = None
+    priority: Optional[int] = None
+    progress: Optional[float] = None
+    subjobs: Optional[list[SubJob]] = None
+
+
+class GetJobStatusPayload(BaseModel):
+    """Model representing the payload for a get_job_status request."""
+
+    job_id: int
+    subjob_id: Optional[int] = None
+    related_jobs: Optional[bool] = None
+    main_subjob: Optional[bool] = None
+    debug: Optional[bool] = None
+
+    def post_request(self, url: str, headers: dict) -> GetJobStatusResponse:
+        """Sends a request to the given url with the given headers, and its own content as
+        payload. Returns the parsed job status response.
+        Raises:
+            HTTPError if the response code is not ok.
+        """
+        response: Response = APIRequest.api_request_from_content(
+            api_method=APIMethods.POST,
+            url=url,
+            headers=headers,
+            json=self.model_dump(),
+        )
+        response.raise_for_status()
+        return GetJobStatusResponse.model_validate(response.json())
+
+
 class DDNDataFlowClient(ArchiveHandler):
     """Class for archiving and retrieving folders via DDN Dataflow."""
 
@@ -281,3 +354,17 @@ def convert_into_transfer_data(
             )
             for file_and_sample in files_and_samples
         ]
+
+    def is_job_done(self, job_id: int) -> bool:
+        get_job_status_payload = GetJobStatusPayload(job_id=job_id)
+        get_job_status_response: GetJobStatusResponse = get_job_status_payload.post_request(
+            url=urljoin(self.url, DataflowEndpoints.GET_JOB_STATUS),
+            headers=dict(self.headers, **self.auth_header),
+        )
+        if get_job_status_response.description == JobDescription.COMPLETED:
+            return True
+        LOG.info(
+            f"Job with id {job_id} has not been completed. "
+            f"Current job description is {get_job_status_response.description}"
+        )
+        return False
```
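Since only `job_id` and `description` are required on `GetJobStatusResponse` (everything else is `Optional` with a `None` default), a sparse response still validates. A sketch of parsing one — the JSON below is fabricated for illustration, not a captured Miria response:

```python
from cg.meta.archive.ddn_dataflow import GetJobStatusResponse, JobDescription

fake_response: dict = {
    "job_id": 1234,
    "description": "Running",
    "progress": 0.42,
}

# Only job_id and description are required; all other fields default to None.
status = GetJobStatusResponse.model_validate(fake_response)
print(status.description == JobDescription.COMPLETED)  # False: the job is still running
```

Because `JobDescription` is a `StrEnum`, the plain string `"Running"` from the API compares directly against the enum members, which is what `is_job_done` relies on.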

cg/meta/archive/models.py

Lines changed: 5 additions & 5 deletions
```diff
@@ -53,14 +53,14 @@ def retrieve_samples(self, samples_and_destinations: list[SampleAndDestination]):
         """Retrieves all files for all samples for the given flowcell."""
         pass
 
-    @abstractmethod
-    def retrieve_file(self, file_and_sample: FileAndSample):
-        """Retrieves the specified archived file."""
-        pass
-
     @abstractmethod
     def convert_into_transfer_data(
         self, files_and_samples: list[FileAndSample], is_archiving: bool = True
     ) -> list[FileTransferData]:
         """Converts the provided files_and_samples into a list of objects formatted for the specific archiving flow."""
         pass
+
+    @abstractmethod
+    def is_job_done(self, job_id: int) -> bool:
+        """Returns true if the job has been completed, false otherwise."""
+        pass
```
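Swapping the abstract `retrieve_file` for `is_job_done` changes the contract for every `ArchiveHandler` subclass: concrete handlers such as `DDNDataFlowClient` must now implement `is_job_done` or fail at instantiation. A generic illustration of that enforcement, using a stand-in ABC rather than the real cg class:

```python
# Stand-in ABC demonstrating why adding an @abstractmethod breaks
# incomplete subclasses at instantiation time, not at call time.
from abc import ABC, abstractmethod


class Handler(ABC):
    @abstractmethod
    def is_job_done(self, job_id: int) -> bool:
        """Returns True if the job has been completed, False otherwise."""


class IncompleteHandler(Handler):
    """Deliberately omits is_job_done."""


try:
    IncompleteHandler()
except TypeError as error:
    # TypeError: Can't instantiate abstract class IncompleteHandler ...
    print(error)
```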
