[Core-223] Compute Client: Add support for environment variables. (#12356)

GitOrigin-RevId: c9ada4ea35e6d37d6b37978efd61a9bc0ef73fc0
stephencpope authored and Descartes Labs Build committed Dec 11, 2023
1 parent a63e98f commit 1f0c3dd
Showing 5 changed files with 115 additions and 16 deletions.
38 changes: 32 additions & 6 deletions README.md
@@ -17,10 +17,36 @@ The documentation for the latest release can be found at [https://docs.descartes

Changelog
=========
## Unreleased

### Catalog

- The `tags` attributes on Catalog objects can now contain up to 32 elements, each up to 1000 characters long.
But why would you even want to go there?

### Compute

- `Function` and `Job` objects now have a new `environment` attribute which can be used to define environment
variables for the jobs when they are run.
- Breaking Change: The `Function.map` method previously had no bound on how many jobs could be created at one time.
This led to operational problems with very large numbers of jobs. Now it submits jobs in batches (up to 1000
jobs per batch) to avoid request timeouts, and is more robust on retryable errors so that duplicate jobs are not
submitted accidentally. There is still no bound on how many jobs you may create with a single call to `Function.map`.
Additionally, since it is possible that some jobs may be successfully submitted, and others not, the return
value, while still behaving as a list of `Job`s, is now a `JobBulkCreateResult` object which has an `is_success`
and an `error` property which can be used to determine if all submissions were successful, what errors may
have occurred, and what jobs have actually been created. Only if the first batch fails hard will the method
raise an exception.
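
A minimal sketch of the precedence rule that the `Function.__call__` and `Job` docstrings in this commit describe for `environment`: Function-level variables are merged with Job-level ones, and the Job values win. `merge_environment` is a hypothetical helper written only for illustration. It is not part of the client, and the sketch assumes the merge behaves like a plain dictionary update, which the commit itself does not spell out.

```python
from typing import Dict, Optional


def merge_environment(
    function_env: Optional[Dict[str, str]],
    job_env: Optional[Dict[str, str]],
) -> Dict[str, str]:
    """Hypothetical helper: combine Function- and Job-level variables."""
    merged = dict(function_env or {})  # start from the Function's variables
    merged.update(job_env or {})  # Job values override on key collisions
    return merged


function_env = {"FOO": "BAR", "LOG_LEVEL": "INFO"}
job_env = {"FOO": "BAZ"}

effective = merge_environment(function_env, job_env)
print(effective)
```

Under this assumption, a variable set only on the Function (`LOG_LEVEL` here) is still visible to the Job, while a variable set on both takes the Job's value.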

### General

- The minimum required version of `urllib3` has been bumped to 1.26.18 to address a security vulnerability.
- The old client version v1.12.1 is reaching end of life and will no longer be supported as of February 2024.
You can expect the version to stop working at any point after that as legacy backend support is turned off.

## [2.1.2] - 2023-10-31

## Compute
### Compute

- `Function.delete_jobs` was failing to implement the `delete_results` parameter, so job result blobs
were not being deleted. This has been fixed.
@@ -31,24 +57,24 @@ Changelog

## [2.1.1] - 2023-10-16

## Compute
### Compute

- Filtering on datetime attributes (such as `Function.created_at`) didn't previously work with anything
but `datetime` instances. Now it also handles iso format strings and unix timestamps (int or float).

## [2.1.0] - 2023-10-04

## General
### General

- Following our lifecycle policy, client versions v1.11.0 and earlier are no longer supported. They may
cease to work with the Platform at any time.

## Catalog
### Catalog

- The Catalog `Blob` class now has a `get_data()` method which can be used to retrieve the blob
data directly given the id, without having to first retrieve the `Blob` metadata.

## Compute
### Compute

- *Breaking Change* The status values for `Function` and `Job` objects have changed, to provide a
better experience managing the flow of jobs. Please see the updated Compute guide for a full explanation.
@@ -125,7 +151,7 @@ Changelog

- The compute package was restructured to make all the useful and relevant classes available at the top level.

## Utils
### Utils

- Property filters can now be deserialized as well as serialized.

46 changes: 42 additions & 4 deletions descarteslabs/core/compute/function.py
@@ -124,12 +124,14 @@ def __init__(
function: "Function",
args,
kwargs,
environments,
exception: Exception,
reference_id: str,
):
self.function = function
self.args = args
self.kwargs = kwargs
self.environments = environments
self.exception = exception
self.reference_id = reference_id

@@ -251,6 +253,10 @@ class Function(Document):
sortable=True,
doc="Whether the Function will be placed into READY status once building is complete.",
)
environment: Dict[str, str] = Attribute(
dict,
doc="The environment variables to set for Jobs run by the Function.",
)
modified_date: datetime = DatetimeAttribute(
filterable=True,
readonly=True,
@@ -376,7 +382,13 @@ def __init__(
**extra,
)

def __call__(self, *args, tags: List[str] = None, **kwargs):
def __call__(
self,
*args,
tags: List[str] = None,
environment: Dict[str, str] = None,
**kwargs,
):
"""Execute the function with the given arguments.
This call will return a Job object that can be used to monitor the state
@@ -395,9 +407,19 @@ def __call__(self, *args, tags: List[str] = None, **kwargs):
A list of tags to apply to the Job.
kwargs : Any, optional
Keyword arguments to pass to the function.
environment : Dict[str, str], optional
Environment variables to be set in the environment of the running Job.
Will be merged with environment variables set on the Function, with
the Job environment variables taking precedence.
"""
self.save()
job = Job(function_id=self.id, args=args, kwargs=kwargs, tags=tags)
job = Job(
function_id=self.id,
args=args,
kwargs=kwargs,
environment=environment,
tags=tags,
)
job.save()
return job

@@ -1050,6 +1072,7 @@ def map(
kwargs: Iterable[Mapping[str, Any]] = None,
tags: List[str] = None,
batch_size: int = 1000,
environments: Iterable[Mapping[str, str]] = None,
) -> JobBulkCreateResult:
"""Submits multiple jobs efficiently with positional args to each function call.
@@ -1088,6 +1111,10 @@ def map(
kwargs : Iterable[Mapping[str, Any]], optional
An iterable of Mappings with keyword arguments. For each outer element, the Mapping will
be expanded into keyword arguments for the function.
environments : Iterable[Mapping[str, str]], optional
An iterable of Mappings of environment variables to be set in the environment of the
running Jobs. The values for each job will be merged with environment variables set
on the Function, with the Job environment variables taking precedence.
tags : List[str], optional
A list of tags to apply to all jobs submitted.
batch_size : int, default=1000
@@ -1124,19 +1151,29 @@ def map(
"The number of kwargs must match the number of args. "
f"Got {len(args)} args and {len(kwargs)} kwargs."
)
if environments is not None:
environments = [dict(mapping) for mapping in environments]
if len(environments) != len(args):
raise ValueError(
"The number of environments must match the number of args. "
f"Got {len(args)} args and {len(environments)} environments."
)

result = JobBulkCreateResult()

# Send the jobs in batches of batch_size
for index, (positional, named) in enumerate(
for index, (positional, named, env) in enumerate(
itertools.zip_longest(
batched(args, batch_size), batched(kwargs or [], batch_size)
batched(args, batch_size),
batched(kwargs or [], batch_size),
batched(environments or [], batch_size),
)
):
payload = {
"function_id": self.id,
"bulk_args": positional,
"bulk_kwargs": named,
"bulk_environments": env,
"reference_id": str(uuid.uuid4()),
}

@@ -1162,6 +1199,7 @@ def map(
function=self,
args=payload["bulk_args"],
kwargs=payload["bulk_kwargs"],
environments=payload["bulk_environments"],
reference_id=payload["reference_id"],
exception=exc,
)
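
The batching in `map` above can be sketched in isolation as follows. This is a simplified illustration, not the client's implementation: `build_payloads` is a hypothetical name, the `batched` function here is a stand-in for the helper the diff calls (its real definition is not shown in this commit), and the sketch assumes `args`, `kwargs` and `environments` have already been materialized as lists.

```python
import itertools
import uuid
from typing import Any, Dict, Iterator, List, Optional, Sequence


def batched(items: Sequence[Any], size: int) -> Iterator[List[Any]]:
    """Stand-in helper: yield consecutive lists of at most `size` items."""
    for start in range(0, len(items), size):
        yield list(items[start : start + size])


def build_payloads(
    function_id: str,
    args: List[Any],
    kwargs: Optional[List[Dict[str, Any]]] = None,
    environments: Optional[List[Dict[str, str]]] = None,
    batch_size: int = 1000,
) -> List[Dict[str, Any]]:
    """Hypothetical helper: build one request payload per batch."""
    payloads = []
    # zip_longest fills in None for any input that was omitted, so a
    # missing kwargs or environments list becomes None in every payload.
    for positional, named, env in itertools.zip_longest(
        batched(args, batch_size),
        batched(kwargs or [], batch_size),
        batched(environments or [], batch_size),
    ):
        payloads.append(
            {
                "function_id": function_id,
                "bulk_args": positional,
                "bulk_kwargs": named,
                "bulk_environments": env,
                "reference_id": str(uuid.uuid4()),
            }
        )
    return payloads


args = [[i] for i in range(5)]
environments = [{"FOO": str(i)} for i in range(5)]
payloads = build_payloads("some-id", args, environments=environments, batch_size=2)
print([len(p["bulk_args"]) for p in payloads])
```

Because the three inputs are batched with the same `batch_size` and are required to have equal lengths, each payload carries the arguments and the environment for the same jobs at the same positions.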
22 changes: 19 additions & 3 deletions descarteslabs/core/compute/job.py
@@ -16,7 +16,7 @@
import time
import warnings
from datetime import datetime
from typing import TYPE_CHECKING, Dict, List, Optional, Type
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Type

from strenum import StrEnum

@@ -119,7 +119,12 @@ class Job(Document):
sortable=True,
doc="The exit code of the Job.",
)
kwargs: Optional[Dict] = Attribute(dict, doc="The parameters provided to the Job.")
kwargs: Optional[Dict[str, Any]] = Attribute(
dict, doc="The parameters provided to the Job."
)
environment: Optional[Dict[str, str]] = Attribute(
dict, doc="The environment variables provided to the Job."
)
last_completion_date: Optional[datetime] = DatetimeAttribute(
filterable=True,
readonly=True,
@@ -187,6 +192,7 @@ def __init__(
args: Optional[List] = None,
kwargs: Optional[Dict] = None,
client: ComputeClient = None,
environment: Optional[Dict[str, str]] = None,
**extra,
):
"""
@@ -198,12 +204,22 @@ def __init__(
A list of positional arguments to pass to the function.
kwargs : Dict, optional
A dictionary of named arguments to pass to the function.
environment : Dict[str, str], optional
Environment variables to be set in the environment of the running Job.
Will be merged with environment variables set on the Function, with
the Job environment variables taking precedence.
client: ComputeClient, optional
The compute client to use for requests.
If not set, the default client will be used.
"""
self._client = client or ComputeClient.get_default_client()
super().__init__(function_id=function_id, args=args, kwargs=kwargs, **extra)
super().__init__(
function_id=function_id,
args=args,
kwargs=kwargs,
environment=environment,
**extra,
)

# support use of jobs in sets
def __hash__(self):
16 changes: 14 additions & 2 deletions descarteslabs/core/compute/tests/test_function.py
@@ -83,6 +83,7 @@ def generate_function(self, **params):
"retry_count": random.randint(1, 10),
"status": str(FunctionStatus.READY),
}
function.update(**params)
return function


@@ -653,6 +654,7 @@ def test_map(self):
result = fn.map(
[[1, 2], [3, 4]],
kwargs=[{"first": 1, "second": 2}, {"first": 1.0, "second": 2.0}],
environments=[{"FOO": "BAR"}, {"FOO": "BAZ"}],
)
assert result.is_success
assert len(result) == 2
@@ -665,6 +667,7 @@ def test_map(self):
assert request_json == {
"bulk_args": [[1, 2], [3, 4]],
"bulk_kwargs": [{"first": 1, "second": 2}, {"first": 1.0, "second": 2.0}],
"bulk_environments": [{"FOO": "BAR"}, {"FOO": "BAZ"}],
"function_id": "some-id",
}

@@ -676,9 +679,10 @@ def request_callback(request: PreparedRequest):

args = payload["bulk_args"] or []
kwargs = payload["bulk_kwargs"] or []
environments = payload["bulk_environments"] or []

for args, kwargs in itertools.zip_longest(args, kwargs):
jobs.append(self.make_job(args=args, kwargs=kwargs))
for args, kwargs, envs in itertools.zip_longest(args, kwargs, environments):
jobs.append(self.make_job(args=args, kwargs=kwargs, environments=envs))

return (200, {}, json.dumps(jobs))

@@ -762,6 +766,7 @@ def test_map_deprecated(self):
assert request_json == {
"bulk_args": [[1, 2], [3, 4]],
"bulk_kwargs": [{"first": 1, "second": 2}, {"first": 1.0, "second": 2.0}],
"bulk_environments": None,
"function_id": "some-id",
}

@@ -787,16 +792,22 @@ def inner(t):
yield inner(int)
yield inner(float)

def envgenerator():
for i in range(2):
yield {"FOO": str(i)}

fn.map(
generator(),
kwgenerator(),
environments=envgenerator(),
)
request = responses.calls[-1].request
request_json: dict = json.loads(request.body)
assert request_json.pop("reference_id") is not None
assert request_json == {
"bulk_args": [[1, 2], [3, 4]],
"bulk_kwargs": [{"first": 1, "second": 2}, {"first": 1.0, "second": 2.0}],
"bulk_environments": [{"FOO": "0"}, {"FOO": "1"}],
"function_id": "some-id",
}
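
The generator case exercised above works because `map` copies `environments` into a list of dicts before comparing lengths. A minimal sketch of that step, assuming `args` is already a list: `normalize_environments` is a hypothetical name used only here, while the list comprehension and the error message are taken from the `function.py` diff.

```python
from typing import Any, Dict, Iterable, List, Mapping, Optional


def normalize_environments(
    args: List[Any],
    environments: Optional[Iterable[Mapping[str, str]]],
) -> Optional[List[Dict[str, str]]]:
    """Hypothetical helper: materialize and validate per-job environments."""
    if environments is None:
        return None
    # Consuming the iterable once makes generators usable and lets us call len().
    environments = [dict(mapping) for mapping in environments]
    if len(environments) != len(args):
        raise ValueError(
            "The number of environments must match the number of args. "
            f"Got {len(args)} args and {len(environments)} environments."
        )
    return environments


def envgenerator():
    for i in range(2):
        yield {"FOO": str(i)}


args = [[1, 2], [3, 4]]
envs = normalize_environments(args, envgenerator())
print(envs)

mismatch = None
try:
    # One environment for two jobs is rejected before anything is submitted.
    normalize_environments(args, [{"FOO": "0"}])
except ValueError as exc:
    mismatch = str(exc)
print(mismatch)
```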

@@ -820,6 +831,7 @@ def test_map_with_tags(self):
assert request_json == {
"bulk_args": [[1, 2], [3, 4]],
"bulk_kwargs": [{"first": 1, "second": 2}, {"first": 1.0, "second": 2.0}],
"bulk_environments": None,
"function_id": "some-id",
"tags": ["tag1", "tag2"],
}
9 changes: 8 additions & 1 deletion descarteslabs/core/compute/tests/test_job.py
@@ -13,7 +13,12 @@
class TestCreateJob(BaseTestCase):
@responses.activate
def test_create(self):
params = dict(function_id="some-fn", args=[1, 2], kwargs={"key": "blah"})
params = dict(
function_id="some-fn",
args=[1, 2],
kwargs={"key": "blah"},
environment={"FOO": "BAR"},
)
self.mock_job_create(params)

job = Job(**params)
@@ -108,6 +113,7 @@ def test_get(self):
function_id="function-id",
args=[1, 2],
kwargs={"first": "blah", "second": "blah"},
environment={"FOO": "BAR"},
),
)
job = Job.get("some-id")
@@ -121,6 +127,7 @@ def test_get(self):
"function_id": "function-id",
"id": "some-id",
"kwargs": {"first": "blah", "second": "blah"},
"environment": {"FOO": "BAR"},
"last_completion_date": None,
"last_execution_date": None,
"runtime": None,