Skip to content

Commit

Permalink
[Core-240] Compute Client: Enhance bulk job deletion (#12396)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: e2c5ea858e39fcc9e0c427a539c0311102fe26f0
  • Loading branch information
stephencpope authored and Descartes Labs Build committed Jan 16, 2024
1 parent 6e608d9 commit 1d3d4c5
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 35 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ Changelog
and an `error` property which can be used to determine if all submissions were successful, what errors may
have occurred, and what jobs have actually been created. Only if the first batch fails hard will the method
raise an exception.
- The efficiency of deleting many jobs at once has been significantly improved using `Function.delete` and
`Function.delete_jobs`. It is still possible to encounter request timeouts with very large numbers of jobs;
workarounds are now documented in the API documentation for the `Function.delete_jobs` method.

### General

Expand Down
26 changes: 18 additions & 8 deletions descarteslabs/core/compute/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,9 @@ def delete(self, delete_results: bool = False):
If any jobs are in a running state, the deletion will fail.
Please see the `:meth:~descarteslabs.compute.Function.delete_jobs` method for more
information on deleting large numbers of jobs.
Parameters
----------
delete_results : bool, default=False
Expand All @@ -953,11 +956,7 @@ def delete(self, delete_results: bool = False):
if self.state == DocumentState.NEW:
raise ValueError("Cannot delete a Function that has not been saved")

for job in self.jobs:
try:
job.delete(delete_result=delete_results)
except exceptions.ConflictError:
pass
self.delete_jobs(delete_results=delete_results)

self._client.session.delete(f"/functions/{self.id}")
self._deleted = True
Expand Down Expand Up @@ -1120,8 +1119,8 @@ def map(
batch_size : int, default=1000
The number of jobs to submit in each batch. The maximum batch size is 1000.
Return
------
Returns
------_
JobBulkCreateResult
An object containing the jobs that were submitted and any errors that occurred.
This object is compatible with a list of Job objects for backwards compatibility.
Expand Down Expand Up @@ -1267,7 +1266,7 @@ def delete_jobs(
query: Optional[JobSearch] = None,
job_ids: List[str] = None,
delete_results: bool = False,
):
) -> List[str]:
"""Deletes all non-running jobs for the Function matching the given query.
If both `query` and `job_ids` are None, all jobs for the Function will be deleted.
Expand All @@ -1277,6 +1276,12 @@ def delete_jobs(
Also deletes any job log blobs for the jobs. Use `delete_results=True` to delete the
job result blobs as well.
There is a limit to how many jobs can be deleted in a single request before the request
times out. If you need to delete a large number of jobs and experience timeouts, consider
using a loop to delete batches, using the `query` parameter with a limit (e.g.
``async_func.delete_jobs(async_func.jobs.limit(10000))``, or use the `job_ids`
parameter to limit the number of jobs to delete.
Parameters
----------
query : JobSearch, optional
Expand All @@ -1285,6 +1290,11 @@ def delete_jobs(
List of job ids to delete.
delete_results : bool, default=False
If True, deletes the job result blobs as well.
Returns
-------
List[str]
List of job ids that were deleted.
"""
if self.state != DocumentState.SAVED:
raise ValueError(
Expand Down
34 changes: 7 additions & 27 deletions descarteslabs/core/compute/tests/test_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,22 +447,7 @@ def test_build_log(self):

@responses.activate
def test_delete(self):
self.mock_response(
responses.GET,
"/jobs",
json=self.make_page(
[
self.make_job(id="1", status=JobStatus.SUCCESS),
self.make_job(id="2", status=JobStatus.SUCCESS),
self.make_job(id="3", status=JobStatus.RUNNING),
self.make_job(id="4", status=JobStatus.FAILURE),
]
),
)
self.mock_response(responses.DELETE, "/jobs/1", status=204)
self.mock_response(responses.DELETE, "/jobs/2", status=204)
self.mock_response(responses.DELETE, "/jobs/3", status=204)
self.mock_response(responses.DELETE, "/jobs/4", status=204)
self.mock_response(responses.POST, "/jobs/delete", json=["1", "2", "3"])
self.mock_response(responses.DELETE, "/functions/some-id", status=204)

fn = Function(id="some-id", saved=True)
Expand All @@ -488,7 +473,7 @@ def test_delete_new(self):

@responses.activate
def test_delete_no_jobs(self):
self.mock_response(responses.GET, "/jobs", json=self.make_page([]))
self.mock_response(responses.POST, "/jobs/delete", json=[])
self.mock_response(responses.DELETE, "/functions/some-id", status=204)

fn = Function(id="some-id", saved=True)
Expand All @@ -504,22 +489,17 @@ def test_delete_no_jobs(self):
@responses.activate
def test_delete_failed(self):
self.mock_response(
responses.GET,
"/jobs",
json=self.make_page(
[
self.make_job(id="1", status=JobStatus.SUCCESS),
]
),
responses.DELETE,
"/functions/some-id",
status=400,
)
self.mock_response(responses.DELETE, "/jobs/1", status=400)
self.mock_response(responses.POST, "/jobs/delete", json=[])

fn = Function(id="some-id", saved=True)

with self.assertRaises(Exception):
with self.assertRaises(exceptions.BadRequestError):
fn.delete()

self.assert_url_called("/jobs/1")
assert fn._deleted is False
assert fn.state == "saved"
assert fn.id == "some-id"
Expand Down

0 comments on commit 1d3d4c5

Please sign in to comment.