Skip to content

Commit ec167bd

Browse files
Add condition to generation of dynamic job spec (#109)
1 parent 07f8e80 commit ec167bd

File tree

3 files changed

+76
-2
lines changed

3 files changed

+76
-2
lines changed

flytekit/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
from __future__ import absolute_import
22
import flytekit.plugins
33

4-
__version__ = '0.7.0'
4+
__version__ = '0.7.1'

flytekit/common/tasks/sdk_dynamic.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ def _produce_dynamic_job_spec(self, context, inputs):
198198
if not node_output.sdk_node.id:
199199
node_output.sdk_node.assign_id_and_return(node.id)
200200

201+
if len(sub_task_node.inputs) > 0:
201202
# Upload inputs to working directory under /array_job.input_ref/inputs.pb
202203
input_path = _os.path.join(node.id, _constants.INPUT_FILE_NAME)
203204
generated_files[input_path] = _literal_models.LiteralMap(
@@ -280,7 +281,7 @@ def execute(self, context, inputs):
280281
spec, generated_files = self._produce_dynamic_job_spec(context, inputs)
281282

282283
# If no sub-tasks are requested to run, just produce an outputs file like any other single-step tasks.
283-
if len(generated_files) == 0:
284+
if len(generated_files) == 0 and len(spec.nodes) == 0:
284285
return {
285286
_constants.OUTPUT_FILE_NAME: _literal_models.LiteralMap(
286287
literals={binding.var: binding.binding.to_literal_model() for binding in spec.outputs})

tests/flytekit/unit/models/test_dynamic_wfs.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,3 +52,76 @@ def test_dynamic_launch_plan_yielding():
5252
assert dj_spec.outputs[0].var == "out"
5353
assert dj_spec.outputs[0].binding.promise.node_id == node_id
5454
assert dj_spec.outputs[0].binding.promise.var == "task_output"
55+
56+
57+
@_tasks.python_task
58+
def empty_task(wf_params):
59+
wf_params.logging.info("Running empty task")
60+
61+
62+
@_workflow.workflow_class()
63+
class EmptyWorkflow(object):
64+
empty_task_task_execution = empty_task()
65+
66+
67+
constant_workflow_lp = EmptyWorkflow.create_launch_plan()
68+
69+
70+
@_tasks.outputs(out=_Types.Integer)
71+
@_tasks.dynamic_task
72+
def lp_yield_empty_wf(wf_params, out):
73+
wf_params.logging.info("Running inner task... yielding a launchplan for empty workflow")
74+
constant_lp_yielding_task_execution = constant_workflow_lp()
75+
yield constant_lp_yielding_task_execution
76+
out.set(42)
77+
78+
79+
def test_dynamic_launch_plan_yielding_of_constant_workflow():
80+
outputs = lp_yield_empty_wf.unit_test()
81+
# TODO: Currently, Flytekit will not return early and not do anything if there are any workflow nodes detected
82+
# in the output of a dynamic task.
83+
dj_spec = outputs[_sdk_constants.FUTURES_FILE_NAME]
84+
85+
assert len(dj_spec.nodes) == 1
86+
assert len(dj_spec.outputs) == 1
87+
assert dj_spec.outputs[0].var == "out"
88+
assert len(outputs.keys()) == 1
89+
90+
91+
@_tasks.inputs(num=_Types.Integer)
92+
@_tasks.python_task
93+
def log_only_task(wf_params, num):
94+
wf_params.logging.info("{} was called".format(num))
95+
96+
97+
@_workflow.workflow_class()
98+
class InputOnlyWorkflow(object):
99+
a = _workflow.Input(_Types.Integer, default=5, help="Input for inner workflow")
100+
log_only_task_execution = log_only_task(num=a)
101+
102+
103+
input_only_workflow_lp = InputOnlyWorkflow.create_launch_plan()
104+
105+
106+
@_tasks.dynamic_task
107+
def lp_yield_input_only_wf(wf_params):
108+
wf_params.logging.info("Running inner task... yielding a launchplan for input only workflow")
109+
input_only_workflow_lp_execution = input_only_workflow_lp()
110+
yield input_only_workflow_lp_execution
111+
112+
113+
def test_dynamic_launch_plan_yielding_of_input_only_workflow():
114+
outputs = lp_yield_input_only_wf.unit_test()
115+
# TODO: Currently, Flytekit will not return early and not do anything if there are any workflow nodes detected
116+
# in the output of a dynamic task.
117+
dj_spec = outputs[_sdk_constants.FUTURES_FILE_NAME]
118+
119+
assert len(dj_spec.nodes) == 1
120+
assert len(dj_spec.outputs) == 0
121+
assert len(outputs.keys()) == 2
122+
123+
# Using the id of the launch plan node, and then appending /inputs.pb to the string, should give you in the outputs
124+
# map the LiteralMap of the inputs of that node
125+
input_key = "{}/inputs.pb".format(dj_spec.nodes[0].id)
126+
lp_input_map = outputs[input_key]
127+
assert lp_input_map.literals['a'] is not None

0 commit comments

Comments
 (0)