Skip to content

Commit

Permalink
165: Add workflow-level option support (#167)
Browse files Browse the repository at this point in the history
* feat: add workflow_options to process definition

* docs: add workflow_options to README

* docs: update CHANGELOG

* refactor: add properties for workflow_options and task_options

* docs: update CHANGELOG to reflect the refactored Task class properties
  • Loading branch information
pjhartzell authored Nov 22, 2024
1 parent 061090d commit a30192c
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 21 deletions.
14 changes: 13 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,21 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

### Added

- ([#167](https://github.com/stac-utils/stac-task/issues/167)) Adds workflow-level
options to the ProcessDefinition object in a new `workflow_options` field. They are
combined with each task's options, giving precedence to the task options on conflict.
- ([#167](https://github.com/stac-utils/stac-task/issues/167)) Adds a `workflow_options`
property to the `Task` class that returns the `workflow_options` dictionary from the
`ProcessDefinition` object.
- ([#167](https://github.com/stac-utils/stac-task/issues/167)) Adds a `task_options`
property to the `Task` class that returns the task options from the `tasks` dictionary
in the `ProcessDefinition` object.

### Deprecated

- ([#123](https://github.com/stac-utils/stac-task/issues/123)) Bare `ProcessDefinition`
- ([#166](https://github.com/stac-utils/stac-task/issues/123)) Bare `ProcessDefinition`
objects are deprecated in favor of arrays of `ProcessDefinition` objects.

## [0.6.0]
Expand Down
20 changes: 14 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
- [collections](#collections)
- [tasks](#tasks)
- [TaskConfig Object](#taskconfig-object)
- [workflow_options](#workflow_options)
- [Full ProcessDefinition Example](#full-processdefinition-example)
- [Migration](#migration)
- [0.4.x -\> 0.5.x](#04x---05x)
Expand Down Expand Up @@ -76,12 +77,13 @@ Task input is often referred to as a 'payload'.
A Task can be provided additional configuration via the 'process' field in the input
payload.

| Field Name | Type | Description |
| -------------- | ------------------ | ---------------------------------------------- |
| description | string | Description of the process configuration |
| upload_options | `UploadOptions` | An `UploadOptions` object |
| tasks | Map<str, Map> | Dictionary of task configurations. |
| ~~tasks~~ | ~~[`TaskConfig`]~~ | **DEPRECATED** A list of `TaskConfig` objects. |
| Field Name | Type | Description |
| ---------------- | ------------------ | ------------------------------------------------------------------------ |
| description | string | Description of the process configuration |
| upload_options | `UploadOptions` | An `UploadOptions` object |
| tasks | Map<str, Map> | Dictionary of task configurations. |
| ~~tasks~~ | ~~[`TaskConfig`]~~ | **DEPRECATED** A list of `TaskConfig` objects. |
| workflow_options | Map<str, Any> | Dictionary of configuration options applied to all tasks in the workflow |


#### UploadOptions Object
Expand Down Expand Up @@ -162,6 +164,12 @@ for backwards compatibility.
| name | str | **REQUIRED** Name of the task |
| parameters | Map<str, str> | Dictionary of keyword parameters that will be passed to the Task `process` function |

#### workflow_options

The 'workflow_options' field is a dictionary of options that apply to all tasks in the
workflow. The 'workflow_options' dictionary is combined with each task's option
dictionary. If a key in the 'workflow_options' dictionary conflicts with a key in a
task's option dictionary, the task option value takes precedence.

### Full ProcessDefinition Example

Expand Down
49 changes: 35 additions & 14 deletions stactask/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,33 @@ def process_definition(self) -> dict[str, Any]:
return process[0]

@property
def parameters(self) -> dict[str, Any]:
task_configs = self.process_definition.get("tasks", {})
if isinstance(task_configs, list):
def workflow_options(self) -> dict[str, Any]:
workflow_options_ = self.process_definition.get("workflow_options", {})
if not isinstance(workflow_options_, dict):
raise TypeError("unable to parse `workflow_options`: must be type dict")
return workflow_options_

@property
def task_options(self) -> dict[str, Any]:
task_options_ = self.process_definition.get("tasks", {})
if not isinstance(task_options_, (dict, list)):
raise TypeError(
"unable to parse `tasks`: must be type dict or type list (deprecated)"
)

if isinstance(task_options_, list):
warnings.warn(
"task configs is list, use a dictionary instead",
(
"`tasks` as a list of TaskConfig objects will be unsupported in a "
"future version; use a dictionary of task options to remove this "
"warning"
),
DeprecationWarning,
stacklevel=2,
)
task_config_list = [cfg for cfg in task_configs if cfg["name"] == self.name]
task_config_list = [
cfg for cfg in task_options_ if cfg["name"] == self.name
]
if len(task_config_list) == 0:
return {}
else:
Expand All @@ -153,17 +171,20 @@ def parameters(self) -> dict[str, Any]:
if isinstance(parameters, dict):
return parameters
else:
raise ValueError(f"parameters is not a dict: {type(parameters)}")
elif isinstance(task_configs, dict):
config = task_configs.get(self.name, {})
if isinstance(config, dict):
return config
raise TypeError("unable to parse `parameters`: must be type dict")

if isinstance(task_options_, dict):
options = task_options_.get(self.name, {})
if isinstance(options, dict):
return options
else:
raise ValueError(
f"task config for {self.name} is not a dict: {type(config)}"
raise TypeError(
f"unable to parse options for task '{self.name}': must be type dict"
)
else:
raise ValueError(f"unexpected value for 'tasks': {task_configs}")

@property
def parameters(self) -> dict[str, Any]:
return {**self.workflow_options, **self.task_options}

@property
def upload_options(self) -> dict[str, Any]:
Expand Down
34 changes: 34 additions & 0 deletions tests/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,40 @@ def test_deprecated_payload_dict(nothing_task: Task) -> None:
nothing_task.process_definition


def test_workflow_options_append_task_options(nothing_task: Task) -> None:
nothing_task._payload["process"][0]["workflow_options"] = {
"workflow_option": "workflow_option_value"
}
parameters = nothing_task.parameters
assert parameters == {
"do_nothing": True,
"workflow_option": "workflow_option_value",
}


def test_workflow_options_populate_when_no_task_options(nothing_task: Task) -> None:
nothing_task._payload["process"][0]["tasks"].pop("nothing-task")
nothing_task._payload["process"][0]["workflow_options"] = {
"workflow_option": "workflow_option_value"
}
parameters = nothing_task.parameters
assert parameters == {
"workflow_option": "workflow_option_value",
}


def test_task_options_supersede_workflow_options(nothing_task: Task) -> None:
nothing_task._payload["process"][0]["workflow_options"] = {
"do_nothing": False,
"workflow_option": "workflow_option_value",
}
parameters = nothing_task.parameters
assert parameters == {
"do_nothing": True,
"workflow_option": "workflow_option_value",
}


def test_edit_items(nothing_task: Task) -> None:
nothing_task.process_definition["workflow"] = "test-task-workflow"
assert nothing_task._payload["process"][0]["workflow"] == "test-task-workflow"
Expand Down

0 comments on commit a30192c

Please sign in to comment.