Skip to content

Commit

Permalink
change results retrieval (#12144)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 721f1fe6c99998f029d3db16034056cd61df320b
  • Loading branch information
philnugent authored and Descartes Labs Build committed Sep 7, 2023
1 parent a8a3859 commit cbad043
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 45 deletions.
29 changes: 25 additions & 4 deletions descarteslabs/core/compute/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from typing import TYPE_CHECKING, Dict, List, Optional, Type

from strenum import StrEnum

from ..catalog import Blob, CatalogClient, DeletedObjectError, StorageType
from ..common.client import (
Attribute,
DatetimeAttribute,
Expand Down Expand Up @@ -152,6 +152,17 @@ def __init__(
"""
super().__init__(function_id=function_id, args=args, kwargs=kwargs, **extra)

def _get_result_namespace(self):
"""Returns the namespace for the Job result blob."""
client = ComputeClient.get_default_client()
auth = client.auth

namespace = f"{auth.namespace}"
if auth.payload["org"]:
namespace = f"{auth.payload['org']}:{namespace}"

return namespace

@property
def function(self) -> "Function":
"""Returns the Function the Job belongs to."""
Expand Down Expand Up @@ -257,9 +268,19 @@ def result(self, cast_type: Optional[Type[Serializable]] = None):
ValueError
When `cast_type` does not implement Serializable.
"""
client = ComputeClient.get_default_client()
response = client.session.get(f"/jobs/{self.id}/result", stream=True)
result = response.content
try:
client = CatalogClient(auth=ComputeClient.get_default_client().auth)

result = Blob.get_data(
name=f"{self.function_id}/{self.id}",
namespace=self._get_result_namespace(),
storage_type=StorageType.COMPUTE,
client=client,
)
except ValueError:
raise
except DeletedObjectError:
raise

if not result:
return None
Expand Down
27 changes: 0 additions & 27 deletions descarteslabs/core/compute/tests/test_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,33 +666,6 @@ def test_map_with_tags(self):
"tags": ["tag1", "tag2"],
}

@responses.activate
def test_results(self):
self.mock_response(
responses.GET,
"/jobs",
json=self.make_page(
[
self.make_job(id="1", status=JobStatus.SUCCESS),
self.make_job(id="2", status=JobStatus.SUCCESS),
]
),
)
self.mock_response(responses.GET, "/jobs/1/result", body=json.dumps(1))
self.mock_response(responses.GET, "/jobs/2/result", body=json.dumps(2))

fn = Function(id="some-id", saved=True)
assert fn.results() == [1, 2]
self.assert_url_called(
"/jobs",
params={
"filter": [
{"op": "eq", "name": "function_id", "val": "some-id"},
{"op": "eq", "name": "status", "val": "success"},
]
},
)

@responses.activate
def test_wait_for_completion(self):
self.mock_response(
Expand Down
42 changes: 28 additions & 14 deletions descarteslabs/core/compute/tests/test_job.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
from datetime import timezone
from unittest import mock

import responses

Expand Down Expand Up @@ -159,33 +160,46 @@ def test_delete_failed(self):
assert job.state == "saved"

@responses.activate
def test_result_empty(self):
self.mock_response(responses.GET, "/jobs/some-id/result", body=None)
job = Job(id="some-id", function_id="some-fn", saved=True)
assert job.result() is None

@responses.activate
def test_result_json(self):
body = json.dumps({"test": "blah"}).encode()
self.mock_response(responses.GET, "/jobs/some-id/result", body=body)
@mock.patch(
"descarteslabs.compute.job.Job._get_result_namespace",
return_value="some-org:some-user",
)
@mock.patch(
"descarteslabs.catalog.blob.Blob.get_data",
return_value=json.dumps({"test": "blah"}).encode(),
)
def test_result_json(self, mock_get_data, mock_get_result_namespace):
job = Job(id="some-id", function_id="some-fn", saved=True)
assert job.result() == {"test": "blah"}

@responses.activate
def test_result_float(self):
body = json.dumps(15.68).encode()
self.mock_response(responses.GET, "/jobs/some-id/result", body=body)
@mock.patch(
"descarteslabs.compute.job.Job._get_result_namespace",
return_value="some-org:some-user",
)
@mock.patch(
"descarteslabs.catalog.blob.Blob.get_data",
return_value=json.dumps(15.68).encode(),
)
def test_result_float(self, mock_get_data, mock_get_result_namespace):
job = Job(id="some-id", function_id="some-fn", saved=True)
assert job.result() == 15.68

@responses.activate
def test_result_cast(self):
@mock.patch(
"descarteslabs.compute.job.Job._get_result_namespace",
return_value="some-org:some-user",
)
@mock.patch(
"descarteslabs.catalog.blob.Blob.get_data",
return_value="blah",
)
def test_result_cast(self, mock_get_data, mock_get_result_namespace):
class CustomString:
@classmethod
def deserialize(cls, data: bytes):
return "custom"

self.mock_response(responses.GET, "/jobs/some-id/result", body="blah")
job = Job(id="some-id", function_id="some-fn", saved=True)
assert job.result(CustomString) == "custom"

Expand Down

0 comments on commit cbad043

Please sign in to comment.