Skip to content

Commit 1926b12

Browse files
authored
Add _sync_closure call to reduce load from wait_for_completion calls (#114)
1 parent 597a7b6 commit 1926b12

File tree

6 files changed

+37
-5
lines changed

6 files changed

+37
-5
lines changed

flytekit/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22

33
import flytekit.plugins
44

5-
__version__ = '0.8.0b1'
5+
__version__ = '0.8.0b2'

flytekit/common/mixins/artifact.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,14 @@ def sync(self):
5151
"""
5252
pass
5353

54+
@_abc.abstractmethod
55+
def _sync_closure(self):
56+
"""
57+
Syncs the closure of the underlying execution artifact with the state observed by the platform.
58+
:rtype: None
59+
"""
60+
pass
61+
5462
def wait_for_completion(self, timeout=None, poll_interval=None):
5563
"""
5664
:param datetime.timedelta timeout: Amount of time to wait until the execution has completed before timing
@@ -64,10 +72,11 @@ def wait_for_completion(self, timeout=None, poll_interval=None):
6472
else:
6573
time_to_give_up = _datetime.datetime.utcnow() + timeout
6674

67-
self.sync()
75+
self._sync_closure()
6876
while _datetime.datetime.utcnow() < time_to_give_up:
6977
if self.is_complete:
78+
self.sync()
7079
return
7180
_time.sleep(poll_interval.total_seconds())
72-
self.sync()
81+
self._sync_closure()
7382
raise _user_exceptions.FlyteTimeout("Execution {} did not complete before timeout.".format(self))

flytekit/common/nodes.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,3 +414,11 @@ def sync(self):
414414
_task_executions.SdkTaskExecution.promote_from_model(te) for te in ne.get_task_executions()
415415
]
416416
# TODO: Sub-workflows too once implemented
417+
418+
def _sync_closure(self):
419+
"""
420+
Syncs the closure of the underlying execution artifact with the state observed by the platform.
421+
:rtype: None
422+
"""
423+
ne = _engine_loader.get_engine().get_node_execution(self)
424+
ne.sync()

flytekit/common/tasks/executions.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,4 +106,11 @@ def promote_from_model(cls, base_model):
106106
)
107107

108108
def sync(self):
109+
self._sync_closure()
110+
111+
def _sync_closure(self):
112+
"""
113+
Syncs the closure of the underlying execution artifact with the state observed by the platform.
114+
:rtype: None
115+
"""
109116
_engine_loader.get_engine().get_task_execution(self).sync()

flytekit/common/workflow_execution.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,9 +123,17 @@ def sync(self):
123123
:rtype: None
124124
"""
125125
if not self.is_complete or self._node_executions is None:
126-
_engine_loader.get_engine().get_workflow_execution(self).sync()
126+
self._sync_closure()
127127
self._node_executions = self.get_node_executions()
128128

129+
def _sync_closure(self):
130+
"""
131+
Syncs the closure of the underlying execution artifact with the state observed by the platform.
132+
:rtype: None
133+
"""
134+
if not self.is_complete:
135+
_engine_loader.get_engine().get_workflow_execution(self).sync()
136+
129137
def get_node_executions(self, filters=None):
130138
"""
131139
:param list[flytekit.models.filters.Filter] filters:

flytekit/plugins/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
_lazy_loader.LazyLoadPlugin(
2727
"sidecar",
28-
["k8s-proto>=0.0.2,<1.0.0"],
28+
["k8s-proto>=0.0.3,<1.0.0"],
2929
[k8s, flyteidl]
3030
)
3131

0 commit comments

Comments
 (0)