Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

workflows: update durabletask dependency #757

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ Flask>=1.1
# needed for auto fix
ruff===0.2.2
# needed for dapr-ext-workflow
durabletask>=0.1.1a1
durabletask-dapr >= 0.2.0a3
1 change: 1 addition & 0 deletions examples/workflow/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def send_alert(ctx, message: str):
except Exception:
pass
if not status or status.runtime_status.name != 'RUNNING':
# TODO update to use reuse_id_policy
instance_id = wf_client.schedule_new_workflow(
workflow=status_monitor_workflow,
input=JobStatus(job_id=job_id, is_healthy=True),
Expand Down
1 change: 1 addition & 0 deletions examples/workflow/task_chaining.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
except Exception as e:
yield ctx.call_activity(error_handler, input=str(e))
raise
# TODO update to set custom status
return [result1, result2, result3]


Expand Down
28 changes: 25 additions & 3 deletions ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
from datetime import datetime
from typing import Any, Optional, TypeVar


from durabletask import client
import durabletask.internal.orchestrator_service_pb2 as pb

from dapr.ext.workflow.workflow_state import WorkflowState
from dapr.ext.workflow.workflow_context import Workflow
Expand Down Expand Up @@ -78,6 +80,7 @@ def schedule_new_workflow(
input: Optional[TInput] = None,
instance_id: Optional[str] = None,
start_at: Optional[datetime] = None,
reuse_id_policy: Optional[pb.OrchestrationIdReusePolicy] = None,
) -> str:
"""Schedules a new workflow instance for execution.

Expand All @@ -90,6 +93,8 @@ def schedule_new_workflow(
start_at: The time when the workflow instance should start executing.
If not specified or if a date-time in the past is specified, the workflow instance will
be scheduled immediately.
reuse_id_policy: Optional policy to reuse the workflow id when there is a conflict with
an existing workflow instance.

Returns:
The ID of the scheduled workflow instance.
Expand All @@ -100,9 +105,14 @@ def schedule_new_workflow(
input=input,
instance_id=instance_id,
start_at=start_at,
reuse_id_policy=reuse_id_policy,
)
return self.__obj.schedule_new_orchestration(
workflow.__name__, input=input, instance_id=instance_id, start_at=start_at
workflow.__name__,
input=input,
instance_id=instance_id,
start_at=start_at,
reuse_id_policy=reuse_id_policy,
)

def get_workflow_state(
Expand Down Expand Up @@ -208,7 +218,9 @@ def raise_workflow_event(
"""
return self.__obj.raise_orchestration_event(instance_id, event_name, data=data)

def terminate_workflow(self, instance_id: str, *, output: Optional[Any] = None):
def terminate_workflow(
self, instance_id: str, *, output: Optional[Any] = None, recursive: bool = True
):
"""Terminates a running workflow instance and updates its runtime status to
WorkflowRuntimeStatus.Terminated This method internally enqueues a "terminate" message in
the task hub. When the task hub worker processes this message, it will update the runtime
Expand All @@ -226,9 +238,10 @@ def terminate_workflow(self, instance_id: str, *, output: Optional[Any] = None):
Args:
instance_id: The ID of the workflow instance to terminate.
output: The optional output to set for the terminated workflow instance.
recursive: The optional flag to terminate all child workflows.

"""
return self.__obj.terminate_orchestration(instance_id, output=output)
return self.__obj.terminate_orchestration(instance_id, output=output, recursive=recursive)

def pause_workflow(self, instance_id: str):
"""Suspends a workflow instance, halting processing of it until resume_workflow is used to
Expand All @@ -246,3 +259,12 @@ def resume_workflow(self, instance_id: str):
instance_id: The instance ID of the workflow to resume.
"""
return self.__obj.resume_orchestration(instance_id)

def purge_workflow(self, instance_id: str, recursive: bool = True):
"""Purge data from a workflow instance.

Args:
instance_id: The instance ID of the workflow to purge.
recursive: The optional flag to also purge data from all child workflows.
"""
return self.__obj.purge_orchestration(instance_id, recursive)
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ def current_utc_datetime(self) -> datetime:
def is_replaying(self) -> bool:
return self.__obj.is_replaying

def set_custom_status(self, custom_status: str) -> None:
self._logger.debug(f'{self.instance_id}: Setting custom status to {custom_status}')
self.__obj.set_custom_status(custom_status)

def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task:
self._logger.debug(f'{self.instance_id}: Creating timer to fire at {fire_at} time')
return self.__obj.create_timer(fire_at)
Expand Down
5 changes: 5 additions & 0 deletions ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@
"""
pass

@abstractmethod
def set_custom_status(self, custom_status: str) -> None:
"""Set the custom status."""
pass

Check warning on line 90 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py

View check run for this annotation

Codecov / codecov/patch

ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py#L90

Added line #L90 was not covered by tests

@abstractmethod
def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task:
"""Create a Timer Task to fire after at the specified deadline.
Expand Down
2 changes: 1 addition & 1 deletion ext/dapr-ext-workflow/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ packages = find_namespace:
include_package_data = True
install_requires =
dapr-dev >= 1.13.0rc1.dev
durabletask >= 0.1.1a1
durabletask-dapr >= 0.2.0a3

[options.packages.find]
include =
Expand Down
8 changes: 8 additions & 0 deletions ext/dapr-ext-workflow/tests/test_dapr_workflow_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
mock_create_timer = 'create_timer'
mock_call_activity = 'call_activity'
mock_call_sub_orchestrator = 'call_sub_orchestrator'
mock_custom_status = 'custom_status'


class FakeOrchestrationContext:
def __init__(self):
self.instance_id = mock_instance_id
self.custom_status = None

def create_timer(self, fire_at):
return mock_create_timer
Expand All @@ -40,6 +42,9 @@ def call_activity(self, activity, input):
def call_sub_orchestrator(self, orchestrator, input, instance_id):
return mock_call_sub_orchestrator

def set_custom_status(self, custom_status):
self.custom_status = custom_status


class DaprWorkflowContextTest(unittest.TestCase):
def mock_client_activity(ctx: WorkflowActivityContext, input):
Expand All @@ -65,3 +70,6 @@ def test_workflow_context_functions(self):

create_timer_result = dapr_wf_ctx.create_timer(mock_date_time)
assert create_timer_result == mock_create_timer

dapr_wf_ctx.set_custom_status(mock_custom_status)
assert fakeContext.custom_status == mock_custom_status
43 changes: 30 additions & 13 deletions ext/dapr-ext-workflow/tests/test_workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,26 @@
from unittest import mock
from dapr.ext.workflow.dapr_workflow_client import DaprWorkflowClient
from durabletask import client
import durabletask.internal.orchestrator_service_pb2 as pb

mock_schedule_result = 'workflow001'
mock_raise_event_result = 'event001'
mock_terminate_result = 'terminate001'
mock_suspend_result = 'suspend001'
mock_resume_result = 'resume001'
mockInstanceId = 'instance001'
mock_purge_result = 'purge001'
mock_instance_id = 'instance001'


class FakeTaskHubGrpcClient:
def schedule_new_orchestration(self, workflow, input, instance_id, start_at):
def schedule_new_orchestration(
self,
workflow,
input,
instance_id,
start_at,
reuse_id_policy: Union[pb.OrchestrationIdReusePolicy, None] = None,
):
return mock_schedule_result

def get_orchestration_state(self, instance_id, fetch_payloads):
Expand All @@ -49,7 +58,9 @@ def raise_orchestration_event(
):
return mock_raise_event_result

def terminate_orchestration(self, instance_id: str, *, output: Union[Any, None] = None):
def terminate_orchestration(
self, instance_id: str, *, output: Union[Any, None] = None, recursive: bool = True
):
return mock_terminate_result

def suspend_orchestration(self, instance_id: str):
Expand All @@ -58,6 +69,9 @@ def suspend_orchestration(self, instance_id: str):
def resume_orchestration(self, instance_id: str):
return mock_resume_result

def purge_orchestration(self, instance_id: str, recursive: bool = True):
return mock_purge_result

def _inner_get_orchestration_state(self, instance_id, state: client.OrchestrationStatus):
return client.OrchestrationState(
instance_id=instance_id,
Expand Down Expand Up @@ -87,35 +101,38 @@ def test_client_functions(self):
assert actual_schedule_result == mock_schedule_result

actual_get_result = wfClient.get_workflow_state(
instance_id=mockInstanceId, fetch_payloads=True
instance_id=mock_instance_id, fetch_payloads=True
)
assert actual_get_result.runtime_status.name == 'PENDING'
assert actual_get_result.instance_id == mockInstanceId
assert actual_get_result.instance_id == mock_instance_id

actual_wait_start_result = wfClient.wait_for_workflow_start(
instance_id=mockInstanceId, timeout_in_seconds=30
instance_id=mock_instance_id, timeout_in_seconds=30
)
assert actual_wait_start_result.runtime_status.name == 'RUNNING'
assert actual_wait_start_result.instance_id == mockInstanceId
assert actual_wait_start_result.instance_id == mock_instance_id

actual_wait_completion_result = wfClient.wait_for_workflow_completion(
instance_id=mockInstanceId, timeout_in_seconds=30
instance_id=mock_instance_id, timeout_in_seconds=30
)
assert actual_wait_completion_result.runtime_status.name == 'COMPLETED'
assert actual_wait_completion_result.instance_id == mockInstanceId
assert actual_wait_completion_result.instance_id == mock_instance_id

actual_raise_event_result = wfClient.raise_workflow_event(
instance_id=mockInstanceId, event_name='test_event', data='test_data'
instance_id=mock_instance_id, event_name='test_event', data='test_data'
)
assert actual_raise_event_result == mock_raise_event_result

actual_terminate_result = wfClient.terminate_workflow(
instance_id=mockInstanceId, output='test_output'
instance_id=mock_instance_id, output='test_output'
)
assert actual_terminate_result == mock_terminate_result

actual_suspend_result = wfClient.pause_workflow(instance_id=mockInstanceId)
actual_suspend_result = wfClient.pause_workflow(instance_id=mock_instance_id)
assert actual_suspend_result == mock_suspend_result

actual_resume_result = wfClient.resume_workflow(instance_id=mockInstanceId)
actual_resume_result = wfClient.resume_workflow(instance_id=mock_instance_id)
assert actual_resume_result == mock_resume_result

actual_purge_result = wfClient.purge_workflow(instance_id=mock_instance_id)
assert actual_purge_result == mock_purge_result
4 changes: 2 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ include_package_data = True
zip_safe = False
install_requires =
protobuf >= 4.22
grpcio >= 1.37.0
grpcio-status>=1.37.0
grpcio >= 1.67.0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@berndverst any objections on bumping the grpc library version?
This has been updated in the durable task fork and it's a dependency here.

grpcio-status>=1.67.0
aiohttp >= 3.9.0b0
python-dateutil >= 2.8.1
typing-extensions>=4.4.0
Expand Down
2 changes: 1 addition & 1 deletion tools/requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
grpcio-tools>=1.57.0
grpcio-tools>=1.67.0
Loading