Skip to content

Commit

Permalink
Add cli option to update workflow (#98)
Browse files Browse the repository at this point in the history
  • Loading branch information
katrogan authored Apr 3, 2020
1 parent f046647 commit 607222e
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 3 deletions.
2 changes: 1 addition & 1 deletion flytekit/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import absolute_import
import flytekit.plugins

__version__ = '0.6.0'
__version__ = '0.6.1'
22 changes: 21 additions & 1 deletion flytekit/clients/friendly.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,12 +303,13 @@ def list_workflows_paginated(

def get_workflow(self, id):
"""
This returns a single task for a given ID.
This returns a single workflow for a given ID.
:param flytekit.models.core.identifier.Identifier id: The ID representing a given task.
:raises: TODO
:rtype: flytekit.models.admin.workflow.Workflow
"""

return _workflow.Workflow.from_flyte_idl(
super(SynchronousFlyteClient, self).get_workflow(
_common_pb2.ObjectGetRequest(
Expand All @@ -317,6 +318,25 @@ def get_workflow(self, id):
)
)

def update_workflow(self, id, state):
"""
This updates the state of the workflow specified by the ID.
:param id: flytekit.models.core.identifier.Identifier id: The ID representing a given task.
:param int state: Enum value from flytekit.models.workflow.WorkflowState
:return:
"""
if state == "active":
state = _workflow.WorkflowState.ACTIVE
else:
state = _workflow.WorkflowState.ARCHIVED
super(SynchronousFlyteClient, self).update_workflow(
_workflow_pb2.WorkflowUpdateRequest(
id=id.to_flyte_idl(),
state=state,
)
)


####################################################################################################################
#
# Launch Plan Endpoints
Expand Down
10 changes: 10 additions & 0 deletions flytekit/clients/raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,16 @@ def create_workflow(self, workflow_create_request):
"""
return self._stub.CreateWorkflow(workflow_create_request, metadata=self._metadata)

@_handle_rpc_error
def update_workflow(self, update_request):
"""
Allows updates to a workflow for a given identifier. Currently, only the workflow state can updated for
ACTIVE and ARCHIVED values.
:param flyteidl.admin.workflow_pb2.WorkflowUpdateRequest update_request:
:rtype: flyteidl.admin.workflow_pb2.WorkflowPlanUpdateResponse
"""
return self._stub.UpdateWorkflow(update_request, metadata=self._metadata)

@_handle_rpc_error
def list_workflow_ids_paginated(self, identifier_list_request):
"""
Expand Down
23 changes: 23 additions & 0 deletions flytekit/clis/flyte_cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,12 @@ def _render_schedule_expr(lp):
required=True,
help="Whether or not to set schedule as active."
)
_workflow_state_choice = _click.option(
"--state",
type=_click.Choice(["active", "archived"]),
required=True,
help="Whether or not to set a workflow as visible in the UI."
)
_sort_by_option = _click.option(
"--sort-by",
required=False,
Expand Down Expand Up @@ -774,6 +780,23 @@ def get_workflow(urn, host, insecure):
_click.echo("")


@_flyte_cli.command('update-workflow', cls=_FlyteSubCommand)
@_workflow_state_choice
@_urn_option
@_host_option
@_insecure_option
def update_workflow(state, urn, host, insecure):
"""
Update the state of a certain version of a workflow identified by the URN in the form of
``wf:<project>:<domain>:<workflow_name>:<version>``
"""
_welcome_message()
client = _friendly_client.SynchronousFlyteClient(host, insecure=insecure)
client.update_workflow(_identifier.Identifier.from_python_std(urn), state)
_click.echo("Successfully updated {}".format(_tt(urn)))



########################################################################################################################
#
# Launch Plan Commands
Expand Down
18 changes: 18 additions & 0 deletions flytekit/models/admin/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,21 @@ def from_flyte_idl(cls, p):
return cls(
compiled_workflow=_compiler_models.CompiledWorkflowClosure.from_flyte_idl(p.compiled_workflow)
)


class WorkflowState(object):
ACTIVE = _admin_workflow.WORKFLOW_ACTIVE
ARCHIVED = _admin_workflow.WORKFLOW_ARCHIVED

@classmethod
def enum_to_string(cls, val):
"""
:param int val:
:rtype: Text
"""
if val == cls.ACTIVE:
return "WORKFLOW_ACTIVE"
elif val == cls.ARCHIVED:
return "WORKFLOW_ARCHIVED"
else:
return "<UNKNOWN>"
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
]
},
install_requires=[
"flyteidl>=0.17.9,<1.0.0",
"flyteidl>=0.17.25,<1.0.0",
"click>=6.6,<8.0",
"croniter>=0.3.20,<4.0.0",
"deprecation>=2.0,<3.0",
Expand Down

0 comments on commit 607222e

Please sign in to comment.