Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

workflows: update durabletask dependency #757

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ Flask>=1.1
# needed for auto fix
ruff===0.2.2
# needed for dapr-ext-workflow
durabletask>=0.1.1a1
durabletask-dapr >= 0.2.0a3
1 change: 1 addition & 0 deletions examples/workflow/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def send_alert(ctx, message: str):
except Exception:
pass
if not status or status.runtime_status.name != 'RUNNING':
# TODO update to use reuse_id_policy
instance_id = wf_client.schedule_new_workflow(
workflow=status_monitor_workflow,
input=JobStatus(job_id=job_id, is_healthy=True),
Expand Down
1 change: 1 addition & 0 deletions examples/workflow/task_chaining.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
except Exception as e:
yield ctx.call_activity(error_handler, input=str(e))
raise
# TODO update to set custom status
return [result1, result2, result3]


Expand Down
28 changes: 25 additions & 3 deletions ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
from datetime import datetime
from typing import Any, Optional, TypeVar


from durabletask import client
import durabletask.internal.orchestrator_service_pb2 as pb

from dapr.ext.workflow.workflow_state import WorkflowState
from dapr.ext.workflow.workflow_context import Workflow
Expand Down Expand Up @@ -78,6 +80,7 @@ def schedule_new_workflow(
input: Optional[TInput] = None,
instance_id: Optional[str] = None,
start_at: Optional[datetime] = None,
reuse_id_policy: Optional[pb.OrchestrationIdReusePolicy] = None,
) -> str:
"""Schedules a new workflow instance for execution.

Expand All @@ -90,6 +93,8 @@ def schedule_new_workflow(
start_at: The time when the workflow instance should start executing.
If not specified or if a date-time in the past is specified, the workflow instance will
be scheduled immediately.
reuse_id_policy: Optional policy to reuse the workflow id when there is a conflict with
an existing workflow instance.

Returns:
The ID of the scheduled workflow instance.
Expand All @@ -100,9 +105,14 @@ def schedule_new_workflow(
input=input,
instance_id=instance_id,
start_at=start_at,
reuse_id_policy=reuse_id_policy,
)
return self.__obj.schedule_new_orchestration(
workflow.__name__, input=input, instance_id=instance_id, start_at=start_at
workflow.__name__,
input=input,
instance_id=instance_id,
start_at=start_at,
reuse_id_policy=reuse_id_policy,
)

def get_workflow_state(
Expand Down Expand Up @@ -208,7 +218,9 @@ def raise_workflow_event(
"""
return self.__obj.raise_orchestration_event(instance_id, event_name, data=data)

def terminate_workflow(self, instance_id: str, *, output: Optional[Any] = None):
def terminate_workflow(
self, instance_id: str, *, output: Optional[Any] = None, recursive: bool = True
):
"""Terminates a running workflow instance and updates its runtime status to
WorkflowRuntimeStatus.Terminated This method internally enqueues a "terminate" message in
the task hub. When the task hub worker processes this message, it will update the runtime
Expand All @@ -226,9 +238,10 @@ def terminate_workflow(self, instance_id: str, *, output: Optional[Any] = None):
Args:
instance_id: The ID of the workflow instance to terminate.
output: The optional output to set for the terminated workflow instance.
recursive: The optional flag to terminate all child workflows.

"""
return self.__obj.terminate_orchestration(instance_id, output=output)
return self.__obj.terminate_orchestration(instance_id, output=output, recursive=recursive)

def pause_workflow(self, instance_id: str):
"""Suspends a workflow instance, halting processing of it until resume_workflow is used to
Expand All @@ -246,3 +259,12 @@ def resume_workflow(self, instance_id: str):
instance_id: The instance ID of the workflow to resume.
"""
return self.__obj.resume_orchestration(instance_id)

def purge_workflow(self, instance_id: str, recursive: bool = True):
"""Purge data from a workflow instance.

Args:
instance_id: The instance ID of the workflow to purge.
recursive: The optional flag to also purge data from all child workflows.
"""
return self.__obj.purge_orchestration(instance_id, recursive)
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ def current_utc_datetime(self) -> datetime:
def is_replaying(self) -> bool:
return self.__obj.is_replaying

def set_custom_status(self, custom_status: str) -> None:
self._logger.debug(f'{self.instance_id}: Setting custom status to {custom_status}')
self.__obj.set_custom_status(custom_status)

def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task:
self._logger.debug(f'{self.instance_id}: Creating timer to fire at {fire_at} time')
return self.__obj.create_timer(fire_at)
Expand Down
5 changes: 5 additions & 0 deletions ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ def is_replaying(self) -> bool:
"""
pass

@abstractmethod
def set_custom_status(self, custom_status: str) -> None:
"""Set the custom status."""
pass

@abstractmethod
def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task:
"""Create a Timer Task to fire after at the specified deadline.
Expand Down
2 changes: 1 addition & 1 deletion ext/dapr-ext-workflow/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ packages = find_namespace:
include_package_data = True
install_requires =
dapr-dev >= 1.13.0rc1.dev
durabletask >= 0.1.1a1
durabletask-dapr >= 0.2.0a3

[options.packages.find]
include =
Expand Down
8 changes: 8 additions & 0 deletions ext/dapr-ext-workflow/tests/test_dapr_workflow_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
mock_create_timer = 'create_timer'
mock_call_activity = 'call_activity'
mock_call_sub_orchestrator = 'call_sub_orchestrator'
mock_custom_status = 'custom_status'


class FakeOrchestrationContext:
def __init__(self):
self.instance_id = mock_instance_id
self.custom_status = None

def create_timer(self, fire_at):
return mock_create_timer
Expand All @@ -40,6 +42,9 @@ def call_activity(self, activity, input):
def call_sub_orchestrator(self, orchestrator, input, instance_id):
return mock_call_sub_orchestrator

def set_custom_status(self, custom_status):
self.custom_status = custom_status


class DaprWorkflowContextTest(unittest.TestCase):
def mock_client_activity(ctx: WorkflowActivityContext, input):
Expand All @@ -65,3 +70,6 @@ def test_workflow_context_functions(self):

create_timer_result = dapr_wf_ctx.create_timer(mock_date_time)
assert create_timer_result == mock_create_timer

dapr_wf_ctx.set_custom_status(mock_custom_status)
assert fakeContext.custom_status == mock_custom_status
7 changes: 7 additions & 0 deletions ext/dapr-ext-workflow/tests/test_workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
mock_terminate_result = 'terminate001'
mock_suspend_result = 'suspend001'
mock_resume_result = 'resume001'
mock_purge_result = 'purge001'
mockInstanceId = 'instance001'


Expand Down Expand Up @@ -58,6 +59,9 @@ def suspend_orchestration(self, instance_id: str):
def resume_orchestration(self, instance_id: str):
return mock_resume_result

def purge_orchestration(self, instance_id: str, recursive: bool = True):
return mock_purge_result

def _inner_get_orchestration_state(self, instance_id, state: client.OrchestrationStatus):
return client.OrchestrationState(
instance_id=instance_id,
Expand Down Expand Up @@ -119,3 +123,6 @@ def test_client_functions(self):

actual_resume_result = wfClient.resume_workflow(instance_id=mockInstanceId)
assert actual_resume_result == mock_resume_result

actual_purge_result = wfClient.purge_workflow(instance_id=mockInstanceId)
assert actual_purge_result == mock_purge_result
4 changes: 2 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ include_package_data = True
zip_safe = False
install_requires =
protobuf >= 4.22
grpcio >= 1.37.0
grpcio-status>=1.37.0
grpcio >= 1.67.0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@berndverst any objections on bumping the grpc library version?
This has been updated in the durable task fork and it's a dependency here.

grpcio-status>=1.67.0
aiohttp >= 3.9.0b0
python-dateutil >= 2.8.1
typing-extensions>=4.4.0
Expand Down
2 changes: 1 addition & 1 deletion tools/requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
grpcio-tools>=1.57.0
grpcio-tools>=1.67.0
Loading