diff --git a/CHANGELOG.md b/CHANGELOG.md index 83ce97b..241a07b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,9 +7,21 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] +### Added + +- ([#167](https://github.com/stac-utils/stac-task/issues/167)) Adds workflow-level + options to the ProcessDefinition object in a new `workflow_options` field. They are + combined with each task's options, giving precedence to the task options on conflict. +- ([#167](https://github.com/stac-utils/stac-task/issues/167)) Adds a `workflow_options` + property to the `Task` class that returns the `workflow_options` dictionary from the + `ProcessDefinition` object. +- ([#167](https://github.com/stac-utils/stac-task/issues/167)) Adds a `task_options` + property to the `Task` class that returns the task options from the `tasks` dictionary + in the `ProcessDefinition` object. + ### Deprecated -- ([#123](https://github.com/stac-utils/stac-task/issues/123)) Bare `ProcessDefinition` +- ([#166](https://github.com/stac-utils/stac-task/issues/123)) Bare `ProcessDefinition` objects are deprecated in favor of arrays of `ProcessDefinition` objects. ## [0.6.0] diff --git a/README.md b/README.md index b255702..a4e548c 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ - [collections](#collections) - [tasks](#tasks) - [TaskConfig Object](#taskconfig-object) + - [workflow_options](#workflow_options) - [Full ProcessDefinition Example](#full-processdefinition-example) - [Migration](#migration) - [0.4.x -\> 0.5.x](#04x---05x) @@ -76,12 +77,13 @@ Task input is often referred to as a 'payload'. A Task can be provided additional configuration via the 'process' field in the input payload. -| Field Name | Type | Description | -| -------------- | ------------------ | ---------------------------------------------- | -| description | string | Description of the process configuration | -| upload_options | `UploadOptions` | An `UploadOptions` object | -| tasks | Map | Dictionary of task configurations. | -| ~~tasks~~ | ~~[`TaskConfig`]~~ | **DEPRECATED** A list of `TaskConfig` objects. | +| Field Name | Type | Description | +| ---------------- | ------------------ | ------------------------------------------------------------------------ | +| description | string | Description of the process configuration | +| upload_options | `UploadOptions` | An `UploadOptions` object | +| tasks | Map | Dictionary of task configurations. | +| ~~tasks~~ | ~~[`TaskConfig`]~~ | **DEPRECATED** A list of `TaskConfig` objects. | +| workflow_options | Map | Dictionary of configuration options applied to all tasks in the workflow | #### UploadOptions Object @@ -162,6 +164,12 @@ for backwards compatibility. | name | str | **REQUIRED** Name of the task | | parameters | Map | Dictionary of keyword parameters that will be passed to the Task `process` function | +#### workflow_options + +The 'workflow_options' field is a dictionary of options that apply to all tasks in the +workflow. The 'workflow_options' dictionary is combined with each task's option +dictionary. If a key in the 'workflow_options' dictionary conflicts with a key in a +task's option dictionary, the task option value takes precedence. ### Full ProcessDefinition Example diff --git a/stactask/task.py b/stactask/task.py index f6b2b8e..71b3424 100644 --- a/stactask/task.py +++ b/stactask/task.py @@ -136,15 +136,33 @@ def process_definition(self) -> dict[str, Any]: return process[0] @property - def parameters(self) -> dict[str, Any]: - task_configs = self.process_definition.get("tasks", {}) - if isinstance(task_configs, list): + def workflow_options(self) -> dict[str, Any]: + workflow_options_ = self.process_definition.get("workflow_options", {}) + if not isinstance(workflow_options_, dict): + raise TypeError("unable to parse `workflow_options`: must be type dict") + return workflow_options_ + + @property + def task_options(self) -> dict[str, Any]: + task_options_ = self.process_definition.get("tasks", {}) + if not isinstance(task_options_, (dict, list)): + raise TypeError( + "unable to parse `tasks`: must be type dict or type list (deprecated)" + ) + + if isinstance(task_options_, list): warnings.warn( - "task configs is list, use a dictionary instead", + ( + "`tasks` as a list of TaskConfig objects will be unsupported in a " + "future version; use a dictionary of task options to remove this " + "warning" + ), DeprecationWarning, stacklevel=2, ) - task_config_list = [cfg for cfg in task_configs if cfg["name"] == self.name] + task_config_list = [ + cfg for cfg in task_options_ if cfg["name"] == self.name + ] if len(task_config_list) == 0: return {} else: @@ -153,17 +171,20 @@ def parameters(self) -> dict[str, Any]: if isinstance(parameters, dict): return parameters else: - raise ValueError(f"parameters is not a dict: {type(parameters)}") - elif isinstance(task_configs, dict): - config = task_configs.get(self.name, {}) - if isinstance(config, dict): - return config + raise TypeError("unable to parse `parameters`: must be type dict") + + if isinstance(task_options_, dict): + options = task_options_.get(self.name, {}) + if isinstance(options, dict): + return options else: - raise ValueError( - f"task config for {self.name} is not a dict: {type(config)}" + raise TypeError( + f"unable to parse options for task '{self.name}': must be type dict" ) - else: - raise ValueError(f"unexpected value for 'tasks': {task_configs}") + + @property + def parameters(self) -> dict[str, Any]: + return {**self.workflow_options, **self.task_options} @property def upload_options(self) -> dict[str, Any]: diff --git a/tests/test_task.py b/tests/test_task.py index 526976a..cba6594 100644 --- a/tests/test_task.py +++ b/tests/test_task.py @@ -54,6 +54,40 @@ def test_deprecated_payload_dict(nothing_task: Task) -> None: nothing_task.process_definition +def test_workflow_options_append_task_options(nothing_task: Task) -> None: + nothing_task._payload["process"][0]["workflow_options"] = { + "workflow_option": "workflow_option_value" + } + parameters = nothing_task.parameters + assert parameters == { + "do_nothing": True, + "workflow_option": "workflow_option_value", + } + + +def test_workflow_options_populate_when_no_task_options(nothing_task: Task) -> None: + nothing_task._payload["process"][0]["tasks"].pop("nothing-task") + nothing_task._payload["process"][0]["workflow_options"] = { + "workflow_option": "workflow_option_value" + } + parameters = nothing_task.parameters + assert parameters == { + "workflow_option": "workflow_option_value", + } + + +def test_task_options_supersede_workflow_options(nothing_task: Task) -> None: + nothing_task._payload["process"][0]["workflow_options"] = { + "do_nothing": False, + "workflow_option": "workflow_option_value", + } + parameters = nothing_task.parameters + assert parameters == { + "do_nothing": True, + "workflow_option": "workflow_option_value", + } + + def test_edit_items(nothing_task: Task) -> None: nothing_task.process_definition["workflow"] = "test-task-workflow" assert nothing_task._payload["process"][0]["workflow"] == "test-task-workflow"