From a5081a9fca628e9a9621c8ae626b6d60aaf578e8 Mon Sep 17 00:00:00 2001 From: hzarka Date: Wed, 11 Mar 2020 22:38:30 +0400 Subject: [PATCH 1/6] fix error --- cadence/connection.py | 3 ++- cadence/workflow.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/cadence/connection.py b/cadence/connection.py index 7afd942..2983d4a 100644 --- a/cadence/connection.py +++ b/cadence/connection.py @@ -1,5 +1,6 @@ from __future__ import annotations +import getpass import os import socket from dataclasses import dataclass @@ -194,7 +195,7 @@ def default_tchannel_headers(): @staticmethod def default_application_headers(): return { - "user-name": os.environ.get("LOGNAME", os.getlogin()), + "user-name": getpass.getuser(), "host-name": socket.gethostname(), # Copied from Java client "cadence-client-library-version": "2.2.0", diff --git a/cadence/workflow.py b/cadence/workflow.py index 08ad63c..ca64824 100644 --- a/cadence/workflow.py +++ b/cadence/workflow.py @@ -194,7 +194,7 @@ def exec_workflow(workflow_client, wm: WorkflowMethod, args, workflow_options: W start_request = create_start_workflow_request(workflow_client, wm, args) start_response, err = workflow_client.service.start_workflow(start_request) if err: - raise Exception(err) + raise Exception(repr(err)) execution = WorkflowExecution(workflow_id=start_request.workflow_id, run_id=start_response.run_id) stub_instance._execution = execution return WorkflowExecutionContext(workflow_type=wm._name, workflow_execution=execution) From cff1b242f7376d627f32c768c08834da3b33e401 Mon Sep 17 00:00:00 2001 From: Chandan Bhattad Date: Thu, 8 Oct 2020 00:33:21 +0400 Subject: [PATCH 2/6] take latest changes from upstream --- README.md | 136 ++-- cadence/activity_loop.py | 12 +- cadence/activity_method.py | 23 + cadence/connection.py | 12 +- cadence/constants.py | 4 + cadence/decision_loop.py | 59 +- cadence/errors.py | 6 + cadence/exception_handling.py | 3 + cadence/ioutils.py | 30 +- cadence/marker.py | 2 +- cadence/samples/hello_activity_async.py | 2 +- cadence/tests/test_activity_method.py | 36 +- cadence/tests/test_clock_decision_context.py | 23 +- cadence/tests/test_decision_context.py | 23 +- cadence/tests/test_decision_events.py | 16 +- cadence/tests/test_decision_loop.py | 39 +- cadence/tests/test_java.py | 4 +- cadence/tests/test_marker.py | 32 +- cadence/tests/test_query_workflow.py | 6 +- cadence/tests/test_version.py | 12 +- cadence/tests/test_workflowservice.py | 13 - cadence/thrift/thrift-parser.js | 641 +++++++++++++++++++ cadence/thrift/thrift-parser.js-LICENSE | 27 + cadence/worker.py | 14 + cadence/workflow.py | 27 +- cadence/workflowservice.py | 9 +- requirements.txt | 1 - setup.py | 12 +- 28 files changed, 1091 insertions(+), 133 deletions(-) create mode 100644 cadence/thrift/thrift-parser.js create mode 100644 cadence/thrift/thrift-parser.js-LICENSE diff --git a/README.md b/README.md index 9ce6522..7c8541b 100644 --- a/README.md +++ b/README.md @@ -1,65 +1,46 @@ -# Python framework for Cadence Workflow Service +# Intro: Fault-Oblivious Stateful Python Code -[Cadence](https://github.com/uber/cadence) is a workflow engine developed at Uber Engineering. With this framework, workflows and activities managed by Cadence can be implemented in Python code. +cadence-python allows you to create Python functions that have their state (local variables etc..) implicitly saved such that if the process/machine fails the state of the function is not lost and can resume from where it left off. -## Status / TODO +This programming model is useful whenever you need to ensure that a function runs to completion. For example: -cadence-python is still under going heavy development. It should be considered EXPERIMENTAL at the moment. A production -version is targeted to be released in ~~September of 2019~~ ~~January 2020~~ March 2020. +- Business logic involving multiple micro services +- CI/CD pipelines +- Data pipelines +- RPA +- ETL +- Marketing automation / Customer journeys / Customer engagement +- Zapier/IFTTT like end user automation. +- Chat bots +- Multi-step forms +- Scheduler/Cron jobs -1.0 -- [x] Tchannel implementation -- [x] Python-friendly wrapper around Cadence's Thrift API -- [x] Author activities in Python -- [x] Start workflows (synchronously) -- [x] Create workflows -- [x] Workflow execution in coroutines -- [x] Invoke activities from workflows -- [x] ActivityCompletionClient heartbeat, complete, complete_exceptionally -- [x] Activity heartbeat, getHeartbeatDetails and doNotCompleteOnReturn -- [x] Activity retry -- [x] Activity getDomain(), getTaskToken(), getWorkflowExecution() -- [x] Signals -- [x] Queries -- [x] Async workflow execution -- [x] await -- [x] now (currentTimeMillis) -- [x] Sleep -- [x] Loggers -- [x] newRandom -- [x] UUID -- [x] Workflow Versioning -- [x] WorkflowClient.newWorkflowStub(Class workflowInterface, String workflowId); +Behind the scenes, cadence-python uses [Cadence](https://github.com/uber/cadence) as its backend. -1.1 -- [ ] ActivityStub and Workflow.newUntypedActivityStub -- [ ] Classes as arguments and return values to/from activity and workflow methods -- [ ] WorkflowStub and WorkflowClient.newUntypedWorkflowStub -- [ ] Custom workflow ids through start() and new_workflow_stub() -- [ ] ContinueAsNew -- [ ] Compatibility with Java client -- [ ] Compatibility with Golang client +For more information about the fault-oblivious programming model refer to the Cadence documentation [here](https://cadenceworkflow.io/docs/03_concepts/01_workflows) -2.0 -- [ ] Sticky workflows +## Install Cadencce -Post 2.0: -- [ ] sideEffect/mutableSideEffect -- [ ] Parallel activity execution -- [ ] Timers -- [ ] Cancellation Scopes -- [ ] Child Workflows -- [ ] Explicit activity ids for activity invocations +``` +wget https://raw.githubusercontent.com/uber/cadence/master/docker/docker-compose.yml +docker-compose up +``` -## Installation +## Register `sample` domain ``` -pip install cadence-client +docker run --network=host --rm ubercadence/cli:master --do sample domain register -rd 1 ``` -## Hello World Sample +## Installation cadence-python ``` +pip install cadence-client==1.0.0b3 +``` + +## Hello World Sample + +```python import sys import logging from cadence.activity_method import activity_method @@ -82,7 +63,7 @@ class GreetingActivities: # Activities Implementation class GreetingActivitiesImpl: def compose_greeting(self, greeting: str, name: str): - return greeting + " " + name + "!" + return f"{greeting} {name}!" # Workflow Interface @@ -99,6 +80,9 @@ class GreetingWorkflowImpl(GreetingWorkflow): self.greeting_activities: GreetingActivities = Workflow.new_activity_stub(GreetingActivities) async def get_greeting(self, name): + # Place any Python code here that you want to ensure is executed to completion. + # Note: code in workflow functions must be deterministic so that the same code paths + # are ran during replay. return await self.greeting_activities.compose_greeting("Hello", name) @@ -118,4 +102,56 @@ if __name__ == '__main__': worker.stop() print("Workers stopped...") sys.exit(0) -``` +``` + +## Status / TODO + +cadence-python is still under going heavy development. It should be considered EXPERIMENTAL at the moment. A production +version is targeted to be released in ~~September of 2019~~ ~~January 2020~~ ~~March 2020~~ April 2020. + +1.0 +- [x] Tchannel implementation +- [x] Python-friendly wrapper around Cadence's Thrift API +- [x] Author activities in Python +- [x] Start workflows (synchronously) +- [x] Create workflows +- [x] Workflow execution in coroutines +- [x] Invoke activities from workflows +- [x] ActivityCompletionClient heartbeat, complete, complete_exceptionally +- [x] Activity heartbeat, getHeartbeatDetails and doNotCompleteOnReturn +- [x] Activity retry +- [x] Activity getDomain(), getTaskToken(), getWorkflowExecution() +- [x] Signals +- [x] Queries +- [x] Async workflow execution +- [x] await +- [x] now (currentTimeMillis) +- [x] Sleep +- [x] Loggers +- [x] newRandom +- [x] UUID +- [x] Workflow Versioning +- [x] WorkflowClient.newWorkflowStub(Class workflowInterface, String workflowId); + +1.1 +- [ ] ActivityStub and Workflow.newUntypedActivityStub +- [ ] Classes as arguments and return values to/from activity and workflow methods +- [ ] WorkflowStub and WorkflowClient.newUntypedWorkflowStub +- [ ] Custom workflow ids through start() and new_workflow_stub() +- [ ] ContinueAsNew +- [ ] Compatibility with Java client +- [ ] Compatibility with Golang client + +2.0 +- [ ] Sticky workflows + +Post 2.0: +- [ ] sideEffect/mutableSideEffect +- [ ] Local activity +- [ ] Parallel activity execution +- [ ] Timers +- [ ] Cancellation Scopes +- [ ] Child Workflows +- [ ] Explicit activity ids for activity invocations + + diff --git a/cadence/activity_loop.py b/cadence/activity_loop.py index e7394cd..b0cdcce 100644 --- a/cadence/activity_loop.py +++ b/cadence/activity_loop.py @@ -6,13 +6,13 @@ from cadence.cadence_types import PollForActivityTaskRequest, TaskListMetadata, TaskList, PollForActivityTaskResponse from cadence.conversions import json_to_args from cadence.workflowservice import WorkflowService -from cadence.worker import Worker +from cadence.worker import Worker, StopRequestedException logger = logging.getLogger(__name__) def activity_task_loop(worker: Worker): - service: WorkflowService = WorkflowService.create(worker.host, worker.port) + service: WorkflowService = WorkflowService.create(worker.host, worker.port, timeout=worker.get_timeout()) worker.manage_service(service) logger.info(f"Activity task worker started: {WorkflowService.get_identity()}") try: @@ -20,6 +20,8 @@ def activity_task_loop(worker: Worker): if worker.is_stop_requested(): return try: + service.set_next_timeout_cb(worker.raise_if_stop_requested) + polling_start = datetime.datetime.now() polling_request = PollForActivityTaskRequest() polling_request.task_list_metadata = TaskListMetadata() @@ -32,6 +34,8 @@ def activity_task_loop(worker: Worker): task, err = service.poll_for_activity_task(polling_request) polling_end = datetime.datetime.now() logger.debug("PollForActivityTask: %dms", (polling_end - polling_start).total_seconds() * 1000) + except StopRequestedException: + return except Exception as ex: logger.error("PollForActivityTask error: %s", ex) continue @@ -75,4 +79,8 @@ def activity_task_loop(worker: Worker): process_end = datetime.datetime.now() logger.info("Process ActivityTask: %dms", (process_end - process_start).total_seconds() * 1000) finally: + try: + service.close() + except: + logger.warning("service.close() failed", exc_info=1) worker.notify_thread_stopped() diff --git a/cadence/activity_method.py b/cadence/activity_method.py index 4357e2c..ba5d813 100644 --- a/cadence/activity_method.py +++ b/cadence/activity_method.py @@ -54,6 +54,8 @@ async def stub_activity_fn(self, *args): assert self._decision_context assert stub_activity_fn._execute_parameters parameters = copy.deepcopy(stub_activity_fn._execute_parameters) + if hasattr(self, "_activity_options") and self._activity_options: + self._activity_options.fill_execute_activity_parameters(parameters) if self._retry_parameters: parameters.retry_parameters = self._retry_parameters parameters.input = args_to_json(args).encode("utf-8") @@ -80,3 +82,24 @@ async def stub_activity_fn(self, *args): raise Exception("activity_method must be called with arguments") else: return wrapper + + +@dataclass +class ActivityOptions: + schedule_to_close_timeout_seconds: int = None + schedule_to_start_timeout_seconds: int = None + start_to_close_timeout_seconds: int = None + heartbeat_timeout_seconds: int = None + task_list: str = None + + def fill_execute_activity_parameters(self, execute_parameters: ExecuteActivityParameters): + if self.schedule_to_close_timeout_seconds is not None: + execute_parameters.schedule_to_close_timeout_seconds = self.schedule_to_close_timeout_seconds + if self.schedule_to_start_timeout_seconds is not None: + execute_parameters.schedule_to_start_timeout_seconds = self.schedule_to_start_timeout_seconds + if self.start_to_close_timeout_seconds is not None: + execute_parameters.start_to_close_timeout_seconds = self.start_to_close_timeout_seconds + if self.heartbeat_timeout_seconds is not None: + execute_parameters.heartbeat_timeout_seconds = self.heartbeat_timeout_seconds + if self.task_list is not None: + execute_parameters.task_list = self.task_list diff --git a/cadence/connection.py b/cadence/connection.py index 2983d4a..973e89c 100644 --- a/cadence/connection.py +++ b/cadence/connection.py @@ -1,11 +1,10 @@ from __future__ import annotations import getpass -import os import socket from dataclasses import dataclass from io import BytesIO -from typing import IO, List, Union, Optional, Dict +from typing import IO, List, Union, Optional, Dict, Callable from cadence.frames import InitReqFrame, Frame, Arg, CallReqFrame, CallReqContinueFrame, CallResFrame, \ CallResContinueFrame, FrameWithArgs, CallFlags, ErrorFrame @@ -300,19 +299,23 @@ class TChannelConnection: s: socket.socket @classmethod - def open(cls, host: object, port: object) -> TChannelConnection: + def open(cls, host: object, port: object, timeout: int = None) -> TChannelConnection: s: socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.settimeout(timeout) s.connect((host, port)) return cls(s) def __init__(self, s: socket): self.s = s self.file = self.s.makefile("rwb") - self.wrapper = IOWrapper(self.file) + self.wrapper = IOWrapper(self.file, socket_=s) self.current_id = -1 self.handshake() + def set_next_timeout_cb(self, cb: Callable): + self.wrapper.set_next_timeout_cb(cb) + def new_id(self): self.current_id += 1 return self.current_id @@ -340,6 +343,7 @@ def read_frame(self): def close(self): self.s.close() + self.wrapper.close() def call_function(self, call: ThriftFunctionCall) -> ThriftFunctionResponse: frames = call.build_frames(self.new_id()) diff --git a/cadence/constants.py b/cadence/constants.py index 24a63ce..55ae6a1 100644 --- a/cadence/constants.py +++ b/cadence/constants.py @@ -1,3 +1,7 @@ CODE_OK = 0x00 CODE_ERROR = 0x01 + +# This should be at least 60 seconds because Cadence will reply after 60 seconds when polling +# if there is nothing pending +DEFAULT_SOCKET_TIMEOUT_SECONDS = 120 diff --git a/cadence/decision_loop.py b/cadence/decision_loop.py index 351496f..afdf0d5 100644 --- a/cadence/decision_loop.py +++ b/cadence/decision_loop.py @@ -4,11 +4,12 @@ import contextvars import datetime import json +import socket import uuid import random import logging import threading -from asyncio.base_futures import CancelledError +from asyncio import CancelledError from asyncio.events import AbstractEventLoop from asyncio.futures import Future from asyncio.tasks import Task @@ -26,7 +27,7 @@ HistoryEvent, EventType, WorkflowType, ScheduleActivityTaskDecisionAttributes, \ CancelWorkflowExecutionDecisionAttributes, StartTimerDecisionAttributes, TimerFiredEventAttributes, \ FailWorkflowExecutionDecisionAttributes, RecordMarkerDecisionAttributes, Header, WorkflowQuery, \ - RespondQueryTaskCompletedRequest, QueryTaskCompletedType, QueryWorkflowResponse + RespondQueryTaskCompletedRequest, QueryTaskCompletedType, QueryWorkflowResponse, DecisionTaskFailedCause from cadence.conversions import json_to_args, args_to_json from cadence.decisions import DecisionId, DecisionTarget from cadence.exception_handling import serialize_exception, deserialize_exception @@ -35,7 +36,7 @@ from cadence.state_machines import ActivityDecisionStateMachine, DecisionStateMachine, CompleteWorkflowStateMachine, \ TimerDecisionStateMachine, MarkerDecisionStateMachine from cadence.tchannel import TChannelException -from cadence.worker import Worker +from cadence.worker import Worker, StopRequestedException from cadence.workflow import QueryMethod from cadence.workflowservice import WorkflowService @@ -536,6 +537,7 @@ class ReplayDecider: decision_events: DecisionEvents = None decisions: OrderedDict[DecisionId, DecisionStateMachine] = field(default_factory=OrderedDict) decision_context: DecisionContext = None + workflow_id: str = None activity_id_to_scheduled_event_id: Dict[str, int] = field(default_factory=dict) @@ -581,7 +583,7 @@ def process_event(self, event: HistoryEvent): def handle_workflow_execution_started(self, event: HistoryEvent): start_event_attributes = event.workflow_execution_started_event_attributes self.decision_context.set_current_run_id(start_event_attributes.original_execution_run_id) - if start_event_attributes.input is None: + if start_event_attributes.input is None or start_event_attributes.input == b'': workflow_input = [] else: workflow_input = json_to_args(start_event_attributes.input) @@ -673,6 +675,11 @@ def handle_activity_task_failed(self, event: HistoryEvent): def handle_activity_task_timed_out(self, event: HistoryEvent): self.decision_context.handle_activity_task_timed_out(event) + def handle_decision_task_failed(self, event: HistoryEvent): + attr = event.decision_task_failed_event_attributes + if attr and attr.cause == DecisionTaskFailedCause.RESET_WORKFLOW: + self.decision_context.set_current_run_id(attr.new_run_id) + def handle_workflow_execution_signaled(self, event: HistoryEvent): signaled_event_attributes = event.workflow_execution_signaled_event_attributes signal_input = signaled_event_attributes.input @@ -811,9 +818,11 @@ def on_timer_canceled(self: ReplayDecider, event: HistoryEvent): event_handlers = { EventType.WorkflowExecutionStarted: ReplayDecider.handle_workflow_execution_started, EventType.WorkflowExecutionCancelRequested: ReplayDecider.handle_workflow_execution_cancel_requested, + EventType.WorkflowExecutionCompleted: noop, EventType.DecisionTaskScheduled: noop, EventType.DecisionTaskStarted: noop, # Filtered by HistoryHelper EventType.DecisionTaskTimedOut: noop, # TODO: check + EventType.DecisionTaskFailed: ReplayDecider.handle_decision_task_failed, EventType.ActivityTaskScheduled: ReplayDecider.handle_activity_task_scheduled, EventType.ActivityTaskStarted: ReplayDecider.handle_activity_task_started, EventType.ActivityTaskCompleted: ReplayDecider.handle_activity_task_completed, @@ -846,26 +855,30 @@ def run(self): logger.info(f"Decision task worker started: {WorkflowService.get_identity()}") event_loop = asyncio.new_event_loop() asyncio.set_event_loop(event_loop) - self.service = WorkflowService.create(self.worker.host, self.worker.port) + self.service = WorkflowService.create(self.worker.host, self.worker.port, timeout=self.worker.get_timeout()) self.worker.manage_service(self.service) while True: - if self.worker.is_stop_requested(): + try: + if self.worker.is_stop_requested(): + return + self.service.set_next_timeout_cb(self.worker.raise_if_stop_requested) + decision_task: PollForDecisionTaskResponse = self.poll() + if not decision_task: + continue + if decision_task.query: + try: + result = self.process_query(decision_task) + self.respond_query(decision_task.task_token, result, None) + except Exception as ex: + logger.error("Error") + self.respond_query(decision_task.task_token, None, serialize_exception(ex)) + else: + decisions = self.process_task(decision_task) + self.respond_decisions(decision_task.task_token, decisions) + except StopRequestedException: return - decision_task: PollForDecisionTaskResponse = self.poll() - if not decision_task: - continue - if decision_task.query: - try: - result = self.process_query(decision_task) - self.respond_query(decision_task.task_token, result, None) - except Exception as ex: - logger.error("Error") - self.respond_query(decision_task.task_token, None, serialize_exception(ex)) - else: - decisions = self.process_task(decision_task) - self.respond_decisions(decision_task.task_token, decisions) finally: - # noinspection PyPep8,PyBroadException + # noinspection PyPep8,PyBroadException try: self.service.close() except: @@ -898,14 +911,16 @@ def poll(self) -> Optional[PollForDecisionTaskResponse]: def process_task(self, decision_task: PollForDecisionTaskResponse) -> List[Decision]: execution_id = str(decision_task.workflow_execution) - decider = ReplayDecider(execution_id, decision_task.workflow_type, self.worker) + decider = ReplayDecider(execution_id, decision_task.workflow_type, self.worker, + workflow_id=decision_task.workflow_execution.workflow_id) decisions: List[Decision] = decider.decide(decision_task.history.events) decider.destroy() return decisions def process_query(self, decision_task: PollForDecisionTaskResponse) -> bytes: execution_id = str(decision_task.workflow_execution) - decider = ReplayDecider(execution_id, decision_task.workflow_type, self.worker) + decider = ReplayDecider(execution_id, decision_task.workflow_type, self.worker, + workflow_id=decision_task.workflow_execution.workflow_id) decider.decide(decision_task.history.events) try: result = decider.query(decision_task, decision_task.query) diff --git a/cadence/errors.py b/cadence/errors.py index f4ecf17..424698d 100644 --- a/cadence/errors.py +++ b/cadence/errors.py @@ -8,6 +8,9 @@ class BadRequestError(Exception): message: str + def __str__(self): + return self.message + @dataclass class InternalServiceError(Exception): @@ -38,6 +41,9 @@ def run_id(self): class EntityNotExistsError(Exception): message: str + def __str__(self): + return self.message + @dataclass class ServiceBusyError(Exception): diff --git a/cadence/exception_handling.py b/cadence/exception_handling.py index 9d53b0a..b4f5eb4 100644 --- a/cadence/exception_handling.py +++ b/cadence/exception_handling.py @@ -49,6 +49,9 @@ def serialize_exception(ex: Exception): def deserialize_exception(details) -> Exception: + """ + TODO: Support built-in types like Exception + """ exception: Exception = None details_dict = json.loads(details) source = details_dict.get("source") diff --git a/cadence/ioutils.py b/cadence/ioutils.py index 3c65e9b..a96eb2f 100644 --- a/cadence/ioutils.py +++ b/cadence/ioutils.py @@ -1,13 +1,30 @@ -from typing import IO +from select import select +from socket import socket +from typing import IO, Callable class IOWrapper: - io_stream: IO - - def __init__(self, io_stream: IO): + def __init__(self, io_stream: IO, socket_: socket = None): self.io_stream = io_stream + self.socket = socket_ + self.next_timeout_cb = None + + def set_next_timeout_cb(self, cb: Callable): + self.next_timeout_cb = cb def read_or_eof(self, size, field): + if self.next_timeout_cb and self.socket: + timeout = self.socket.gettimeout() + self.socket.setblocking(False) + while True: + ready_to_read, _, _ = select([self.socket], [], [], 1) + if ready_to_read: + break + else: + self.next_timeout_cb() + self.next_timeout_cb = None + self.socket.setblocking(True) + self.socket.settimeout(timeout) buf: bytes = self.io_stream.read(size) if len(buf) != size: raise EOFError(field) @@ -31,7 +48,7 @@ def read_string(self, n: int, field: str) -> str: return str(buf, "utf-8") def write_short(self, v: int): - self.io_stream.write(v.to_bytes(2, byteorder='big', signed= False)) + self.io_stream.write(v.to_bytes(2, byteorder='big', signed=False)) def write_long(self, v: int): self.io_stream.write(v.to_bytes(4, byteorder='big', signed=False)) @@ -47,3 +64,6 @@ def write_string(self, s: str): def flush(self): self.io_stream.flush() + + def close(self): + self.io_stream.close() diff --git a/cadence/marker.py b/cadence/marker.py index 1259dfa..6de6940 100644 --- a/cadence/marker.py +++ b/cadence/marker.py @@ -70,7 +70,7 @@ def get_id(self) -> str: class MarkerResult: data: bytes = None access_count: int = 0 - replayed = False + replayed: bool = False @dataclass diff --git a/cadence/samples/hello_activity_async.py b/cadence/samples/hello_activity_async.py index 6243fb0..9e0dc4a 100644 --- a/cadence/samples/hello_activity_async.py +++ b/cadence/samples/hello_activity_async.py @@ -53,7 +53,7 @@ async def get_greeting(self, name): client = WorkflowClient.new_client(domain=DOMAIN) greeting_workflow: GreetingWorkflow = client.new_workflow_stub(GreetingWorkflow) execution = WorkflowClient.start(greeting_workflow.get_greeting, "Python") - print("Started: workflow_id={} run_id={}".format(execution.workflow_id, execution.run_id)) + print("Started: workflow_id={} run_id={}".format(execution.workflow_execution.workflow_id, execution.workflow_execution.run_id)) print("Result: " + str(client.wait_for_close(execution))) print("Stopping workers....") diff --git a/cadence/tests/test_activity_method.py b/cadence/tests/test_activity_method.py index af0a8b7..5c3c739 100644 --- a/cadence/tests/test_activity_method.py +++ b/cadence/tests/test_activity_method.py @@ -2,7 +2,7 @@ from unittest import TestCase from unittest.mock import Mock, MagicMock -from cadence.activity_method import activity_method, ExecuteActivityParameters +from cadence.activity_method import activity_method, ExecuteActivityParameters, ActivityOptions from cadence.decision_loop import DecisionContext from cadence.tests.test_decision_context import run_once @@ -80,6 +80,7 @@ def hello(self): stub = HelloActivities() stub._decision_context = self.decision_context + stub._retry_parameters = None async def fn(): await stub.hello() @@ -100,6 +101,7 @@ def hello(self, arg1): stub = HelloActivities() stub._decision_context = self.decision_context + stub._retry_parameters = None async def fn(): await stub.hello(1) @@ -120,6 +122,7 @@ def hello(self, arg1, arg2): stub = HelloActivities() stub._decision_context = self.decision_context + stub._retry_parameters = None async def fn(): await stub.hello(1, "one") @@ -131,3 +134,34 @@ async def fn(): self.decision_context.schedule_activity_task.assert_called_once() args, kwargs = self.decision_context.schedule_activity_task.call_args_list[0] self.assertEqual(b'[1, "one"]', kwargs["parameters"].input) + + def test_invoke_with_activity_options(self): + class HelloActivities: + @activity_method(task_list="test-tasklist") + def hello(self, arg1, arg2): + pass + + stub = HelloActivities() + stub._decision_context = self.decision_context + stub._retry_parameters = None + stub._activity_options = ActivityOptions(schedule_to_close_timeout_seconds=50, + schedule_to_start_timeout_seconds=100, + start_to_close_timeout_seconds=150, + heartbeat_timeout_seconds=200, + task_list="tasklist-tasklist-tasklist") + + async def fn(): + await stub.hello(1, "one") + + loop = get_event_loop() + self.task = loop.create_task(fn()) + run_once(loop) + + self.decision_context.schedule_activity_task.assert_called_once() + args, kwargs = self.decision_context.schedule_activity_task.call_args_list[0] + parameters: ExecuteActivityParameters = kwargs["parameters"] + self.assertEquals(parameters.schedule_to_close_timeout_seconds, 50) + self.assertEquals(parameters.schedule_to_start_timeout_seconds, 100) + self.assertEquals(parameters.start_to_close_timeout_seconds, 150) + self.assertEquals(parameters.heartbeat_timeout_seconds, 200) + self.assertEquals(parameters.task_list, "tasklist-tasklist-tasklist") diff --git a/cadence/tests/test_clock_decision_context.py b/cadence/tests/test_clock_decision_context.py index dda19c9..2ebe24c 100644 --- a/cadence/tests/test_clock_decision_context.py +++ b/cadence/tests/test_clock_decision_context.py @@ -1,12 +1,14 @@ +import json from typing import Callable from unittest.mock import MagicMock, Mock import pytest from cadence.cadence_types import StartTimerDecisionAttributes, TimerFiredEventAttributes, HistoryEvent, \ - TimerCanceledEventAttributes -from cadence.clock_decision_context import ClockDecisionContext, TimerCancellationHandler + TimerCanceledEventAttributes, EventType, MarkerRecordedEventAttributes, Header +from cadence.clock_decision_context import ClockDecisionContext, TimerCancellationHandler, VERSION_MARKER_NAME from cadence.exceptions import CancellationException +from cadence.marker import MUTABLE_MARKER_HEADER_KEY from cadence.util import OpenRequestInfo TIMER_ID = 20 @@ -26,7 +28,7 @@ def decider(): @pytest.fixture def clock_decision_context(decider): - context = ClockDecisionContext(decider=decider) + context = ClockDecisionContext(decider=decider, decision_context=MagicMock()) context.set_replay_current_time_milliseconds(REPLAY_CURRENT_TIME_MS) return context @@ -146,3 +148,18 @@ def test_handle_timer_canceled(clock_decision_context, decider, request_info): assert args[0] is None assert isinstance(args[1], Exception) + +def test_handle_marker_recorded_version(clock_decision_context): + event = HistoryEvent(event_type=EventType.MarkerRecorded) + event.marker_recorded_event_attributes = MarkerRecordedEventAttributes() + event.marker_recorded_event_attributes.details = "Blahh" + event.marker_recorded_event_attributes.marker_name = VERSION_MARKER_NAME + event.marker_recorded_event_attributes.header = Header() + event.marker_recorded_event_attributes.header.fields[MUTABLE_MARKER_HEADER_KEY] = bytes(json.dumps({ + "id": "the-id", + "eventId": 20, + "accessCount": 0 + }), "utf-8") + clock_decision_context.handle_marker_recorded(event) + assert "the-id" in clock_decision_context.version_handler.mutable_marker_results + assert clock_decision_context.version_handler.mutable_marker_results["the-id"].data == "Blahh" diff --git a/cadence/tests/test_decision_context.py b/cadence/tests/test_decision_context.py index 7a3d6b0..fda0c09 100644 --- a/cadence/tests/test_decision_context.py +++ b/cadence/tests/test_decision_context.py @@ -10,8 +10,9 @@ ActivityTaskCompletedEventAttributes, ActivityTaskFailedEventAttributes, ActivityTaskTimedOutEventAttributes, \ TimeoutType from cadence.decision_loop import DecisionContext, ReplayDecider +from cadence.exception_handling import ExternalException, serialize_exception from cadence.exceptions import NonDeterministicWorkflowException, ActivityTaskFailedException, \ - ActivityTaskTimeoutException + ActivityTaskTimeoutException, ActivityFailureException def run_once(loop): @@ -87,18 +88,23 @@ def test_raise_exception(self): self.assertFalse(self.task.done()) future = self.context.scheduled_activities[20] - exception = Exception("thrown by activity") + exception = DummyUserLevelException("thrown by activity") future.set_exception(exception) run_once(self.event_loop) self.assertTrue(self.task.done()) raised_exception = self.task.exception() - self.assertEqual(exception, raised_exception) + self.assertIsInstance(raised_exception, ActivityFailureException) + self.assertEqual(repr(exception), repr(raised_exception.get_cause())) def tearDown(self) -> None: self.task.cancel() +class DummyUserLevelException(Exception): + pass + + class TestHandleActivityTaskEvents(TestCase): def setUp(self) -> None: self.decider: ReplayDecider = Mock() @@ -135,13 +141,16 @@ def test_activity_task_failed(self): attr.scheduled_event_id = 20 event.activity_task_failed_event_attributes = attr attr.reason = "the-reason" - attr.details = bytes("details", "utf-8") + ex = None + try: + raise DummyUserLevelException("abc") + except Exception as e: + ex = e + attr.details = serialize_exception(ex) self.context.handle_activity_task_failed(event) self.assertTrue(self.future.done()) exception = self.future.exception() - self.assertIsInstance(exception, ActivityTaskFailedException) - self.assertEqual(attr.reason, exception.reason) - self.assertEqual(attr.details, exception.details) + self.assertIsInstance(exception, DummyUserLevelException) self.assertEqual(0, len(self.context.scheduled_activities)) def test_activity_task_timed_out(self): diff --git a/cadence/tests/test_decision_events.py b/cadence/tests/test_decision_events.py index 81f888b..652dba9 100644 --- a/cadence/tests/test_decision_events.py +++ b/cadence/tests/test_decision_events.py @@ -1,11 +1,12 @@ import pytest +from cadence.cadence_types import HistoryEvent, EventType from cadence.decision_loop import DecisionEvents @pytest.fixture() def event_object(): - return object() + return HistoryEvent() @pytest.fixture() @@ -31,3 +32,16 @@ def test_get_optional_event_negative(decision_events): def test_get_optional_event_too_large(decision_events): e = decision_events.get_optional_decision_event(25) assert e is None + + +def test_markers(): + marker = HistoryEvent(event_type=EventType.MarkerRecorded) + events = [ + HistoryEvent(event_type=EventType.WorkflowExecutionStarted), + HistoryEvent(event_type=EventType.ActivityTaskScheduled), + HistoryEvent(event_type=EventType.ActivityTaskCanceled), + marker + ] + decision_events = DecisionEvents(events=[], decision_events=events, replay=True, replay_current_time_milliseconds=0, next_decision_event_id=10) + assert len(decision_events.markers) == 1 + assert id(decision_events.markers[0]) == id(marker) diff --git a/cadence/tests/test_decision_loop.py b/cadence/tests/test_decision_loop.py index 56d4a24..255225c 100644 --- a/cadence/tests/test_decision_loop.py +++ b/cadence/tests/test_decision_loop.py @@ -6,7 +6,9 @@ from cadence.cadence_types import HistoryEvent, EventType, PollForDecisionTaskResponse, \ ScheduleActivityTaskDecisionAttributes, WorkflowExecutionStartedEventAttributes, Decision, \ - ActivityTaskStartedEventAttributes + ActivityTaskStartedEventAttributes, MarkerRecordedEventAttributes, DecisionTaskFailedEventAttributes, \ + DecisionTaskFailedCause +from cadence.clock_decision_context import VERSION_MARKER_NAME from cadence.decision_loop import HistoryHelper, is_decision_event, DecisionTaskLoop, ReplayDecider, DecisionEvents, \ nano_to_milli from cadence.decisions import DecisionId, DecisionTarget @@ -236,6 +238,7 @@ def setUp(self) -> None: worker.get_workflow_method = MagicMock(return_value=(DummyWorkflow, lambda *args: None)) self.decider = ReplayDecider(execution_id="", workflow_type=Mock(), worker=worker) self.decider.event_loop = Mock() + self.decider.process_event = Mock() def test_first_decision_next_decision_id(self): self.decider.process_decision_events(self.decision_events) @@ -309,9 +312,31 @@ def test_process_decision_events_notifies_when_replay(self): replay_current_time_milliseconds=0, next_decision_event_id=5) self.decider.notify_decision_sent = MagicMock() + self.decider.process_event = Mock() self.decider.process_decision_events(decision_events) self.decider.notify_decision_sent.assert_called_once() + def test_process_decision_events_markers_first(self): + self.decider.event_loop = Mock() + marker_event = HistoryEvent(event_type=EventType.MarkerRecorded) + marker_event.marker_recorded_event_attributes = MarkerRecordedEventAttributes() + marker_event.marker_recorded_event_attributes.marker_name = VERSION_MARKER_NAME + events = [ + HistoryEvent(event_type=EventType.WorkflowExecutionStarted, + workflow_execution_started_event_attributes=WorkflowExecutionStartedEventAttributes()), + HistoryEvent(event_type=EventType.DecisionTaskScheduled), + marker_event + ] + decision_events = DecisionEvents([], events, replay=True, + replay_current_time_milliseconds=0, + next_decision_event_id=5) + self.decider.process_event = Mock() + self.decider.process_decision_events(decision_events) + self.decider.process_event.assert_called() + assert len(self.decider.process_event.call_args_list ) == 4 + args, kwargs = self.decider.process_event.call_args_list[0] + assert id(args[0]) == id(marker_event) + def test_activity_task_closed(self): state_machine: DecisionStateMachine = Mock() state_machine.is_done = MagicMock(return_value=True) @@ -341,6 +366,18 @@ def test_handle_activity_task_started(self): args, kwargs = state_machine.handle_started_event.call_args_list[0] self.assertIn(event, args) + def test_handle_decision_task_failed(self): + event = HistoryEvent(event_id=15) + event.event_type = EventType.DecisionTaskFailed + event.decision_task_failed_event_attributes = DecisionTaskFailedEventAttributes() + event.decision_task_failed_event_attributes.cause = DecisionTaskFailedCause.RESET_WORKFLOW + event.decision_task_failed_event_attributes.new_run_id = "the-new-run-id" + self.decider.decision_context = decision_context = MagicMock() + self.decider.handle_decision_task_failed(event) + decision_context.set_current_run_id.assert_called() + args, kwargs = decision_context.set_current_run_id.call_args_list[0] + assert args[0] == "the-new-run-id" + def tearDown(self) -> None: self.decider.destroy() diff --git a/cadence/tests/test_java.py b/cadence/tests/test_java.py index 62cd469..fa014d7 100644 --- a/cadence/tests/test_java.py +++ b/cadence/tests/test_java.py @@ -1,3 +1,5 @@ +# TODO: Re-enable once interoperability is a design goal again +""" from unittest import TestCase import logging.config from uuid import uuid4 @@ -39,4 +41,4 @@ def test(self): greeting = stub.getGreeting("World") self.assertEqual("Hello World", greeting) - +""" diff --git a/cadence/tests/test_marker.py b/cadence/tests/test_marker.py index ee17fc8..9a8f304 100644 --- a/cadence/tests/test_marker.py +++ b/cadence/tests/test_marker.py @@ -5,6 +5,7 @@ from cadence.cadence_types import WorkflowType, DecisionType, MarkerRecordedEventAttributes, Header, HistoryEvent, \ EventType +from cadence.clock_decision_context import VERSION_MARKER_NAME from cadence.decision_loop import ReplayDecider, DecisionContext, DecisionEvents from cadence.decisions import DecisionId, DecisionTarget from cadence.marker import MarkerData, MUTABLE_MARKER_HEADER_KEY, MarkerHandler, MarkerInterface, PlainMarkerData, \ @@ -144,17 +145,28 @@ def test_get_marker_data_lower_access_count(marker_recorded_event, decision_cont assert data is None -def test_handle_replaying_get_from_history(decision_context): +def test_handle_replaying_get_from_history_before_replay(decision_context): def callback(stored): raise Exception("Should not be executed") handler = MarkerHandler(decision_context=decision_context, marker_name="the-marker-name") - handler.mutable_marker_results["the-id"] = MarkerResult(data=b'123', access_count=35) + handler.mutable_marker_results["the-id"] = MarkerResult(data=b'123', access_count=35, replayed=True) ret = handler.handle("the-id", callback) assert ret == b'123' assert len(decision_context.decider.decisions) == 0 +def test_handle_replaying_get_from_history_after_replay(decision_context): + def callback(stored): + raise Exception("Should not be executed") + + handler = MarkerHandler(decision_context=decision_context, marker_name="the-marker-name") + handler.mutable_marker_results["the-id"] = MarkerResult(data=b'123', access_count=35, replayed=False) + ret = handler.handle("the-id", callback) + assert ret == b'123' + assert len(decision_context.decider.decisions) == 1 + + def test_handle_replaying_no_history(decision_context): def callback(stored): raise Exception("Should not be executed") @@ -164,7 +176,7 @@ def callback(stored): handler.mutable_marker_results["the-id"] = MarkerResult(data=b'123', access_count=35) ret = handler.handle("the-id", callback) assert ret == b'123' - assert len(decision_context.decider.decisions) == 0 + assert len(decision_context.decider.decisions) == 1 def test_handle_not_replaying_callback_returns_not_none(decision_context): @@ -189,3 +201,17 @@ def callback(stored): ret = handler.handle("the-id", callback) assert ret == b'123' assert len(decision_context.decider.decisions) == 0 + + +def test_marker_handler_set_data(): + handler = MarkerHandler(decision_context=Mock(), marker_name=VERSION_MARKER_NAME) + handler.set_data("abc", b"stuff") + assert "abc" in handler.mutable_marker_results + assert handler.mutable_marker_results["abc"].data == b"stuff" + + +def test_marker_handler_mark_replayed(): + handler = MarkerHandler(decision_context=Mock(), marker_name=VERSION_MARKER_NAME) + handler.set_data("abc", b"stuff") + handler.mark_replayed("abc") + assert handler.mutable_marker_results["abc"].replayed diff --git a/cadence/tests/test_query_workflow.py b/cadence/tests/test_query_workflow.py index 180989f..944568b 100644 --- a/cadence/tests/test_query_workflow.py +++ b/cadence/tests/test_query_workflow.py @@ -60,7 +60,7 @@ def test_query_workflow(): client = WorkflowClient.new_client(domain=DOMAIN) workflow: TestQueryWorkflow = client.new_workflow_stub(TestQueryWorkflow) - WorkflowClient.start(workflow.get_greetings) + workflow_ec = WorkflowClient.start(workflow.get_greetings) assert workflow.get_message() == "initial-message" workflow.put_message("second-message") @@ -73,5 +73,9 @@ def test_query_workflow(): workflow.put_message("done") + client.wait_for_close(workflow_ec) + + assert workflow.get_message() == "done" + print("Stopping workers") worker.stop() diff --git a/cadence/tests/test_version.py b/cadence/tests/test_version.py index 1693a13..c09a092 100644 --- a/cadence/tests/test_version.py +++ b/cadence/tests/test_version.py @@ -50,10 +50,12 @@ def test_clock_decision_context_get_version(decision_context): def test_clock_decision_context_get_version_stored(decision_context): + # is_replaying=True decision_context.workflow_clock.version_handler.mutable_marker_results["abc"] = MarkerResult(data="3") version = decision_context.workflow_clock.get_version("abc", 1, 5) assert version == 3 - assert len(decision_context.decider.decisions) == 0 + # decision needs to be emitted during replay to make things deterministic + assert len(decision_context.decider.decisions) == 1 @pytest.fixture() @@ -86,10 +88,10 @@ def version_decision_context(version_marker_recorded_event): def test_clock_decision_context_from_replay(version_decision_context): - version_decision_context.workflow_clock.set_replaying(True) - version = version_decision_context.workflow_clock.get_version("abc", 1, 5) - assert version == -1 - assert len(version_decision_context.decider.decisions) == 0 + with pytest.raises(Exception) as exc_info: + version_decision_context.workflow_clock.set_replaying(True) + version = version_decision_context.workflow_clock.get_version("abc", 1, 5) + assert "Version -1 of changeID abc is not supported. Supported version is between 1 and 5" in str(exc_info.value) def test_validate_version(version_decision_context): diff --git a/cadence/tests/test_workflowservice.py b/cadence/tests/test_workflowservice.py index d957221..d0b2bfd 100644 --- a/cadence/tests/test_workflowservice.py +++ b/cadence/tests/test_workflowservice.py @@ -341,19 +341,6 @@ def test_reset_sticky_task_list(self): self.assertIsNone(err) self.assertIsNotNone(response) - def test_query_workflow_timeout(self): - start_response, _ = self.service.start_workflow(self.request) - request = QueryWorkflowRequest() - request.domain = "test-domain" - request.execution = WorkflowExecution() - request.execution.workflow_id = self.request.workflow_id - request.execution.run_id = start_response.run_id - request.query = WorkflowQuery() - request.query.query_type = "getDummy" - request.query.query_args = None - with self.assertRaisesRegex(TChannelException, "timeout") as context: - self.service.query_workflow(request) - def test_describe_workflow_execution(self): start_response, _ = self.service.start_workflow(self.request) request = DescribeWorkflowExecutionRequest() diff --git a/cadence/thrift/thrift-parser.js b/cadence/thrift/thrift-parser.js new file mode 100644 index 0000000..471c791 --- /dev/null +++ b/cadence/thrift/thrift-parser.js @@ -0,0 +1,641 @@ +class ThriftFileParsingError extends SyntaxError { + constructor({ message, context, line }) { + super(message); + this.context = context; + this.line = line; + this.name = 'THRIFT_FILE_PARSING_ERROR'; + } +} + +module.exports = (source, offset = 0) => { + + source += ''; + + let nCount = 0; + let rCount = 0; + + let stack = []; + + const record = char => { + if (char === '\r') rCount++; + else if (char === '\n') nCount++; + }; + const save = () => stack.push({ offset, nCount, rCount }); + const restore = () => ({ offset, nCount, rCount } = stack[stack.length - 1]); + const drop = () => stack.pop(); + + const readAnyOne = (...args) => { + save(); + for (let i = 0; i < args.length; i++) { + try { + let result = args[i](); + drop(); + return result; + } catch (ignore) { + restore(); + continue; + } + } + drop(); + throw 'Unexcepted Token'; + }; + + const readUntilThrow = (transaction, key) => { + let receiver = key ? {} : []; + for (;;) { + try { + save(); + let result = transaction(); + key ? receiver[result[key]] = result : receiver.push(result); + } catch (ignore) { + restore(); + return receiver; + } finally { + drop(); + } + } + }; + + const readKeyword = word => { + for (let i = 0; i < word.length; i++) { + if (source[offset + i] !== word[i]) { + throw 'Unexpected token "' + word + '"'; + } + } + offset += word.length; + readSpace(); + return word; + }; + + const readChar = (char) => { + if (source[offset] !== char) throw 'Unexpected char "' + char + '"'; + offset++; + readSpace(); + return char; + }; + + const readNoop = () => {}; + + const readCommentMultiple = () => { + let i = 0; + if (source[offset + i++] !== '/' || source[offset + i++] !== '*') return false; + do { + record(source[offset + i]); + while (offset + i < source.length && source[offset + i++] !== '*') { + record(source[offset + i]); + } + } while (offset + i < source.length && source[offset + i] !== '/'); + offset += i + 1; + return true; + }; + + const readCommentSharp = () => { + let i = 0; + if (source[offset + i++] !== '#') return false; + while (source[offset + i] !== '\n' && source[offset + i] !== '\r') offset++; + offset += i; + return true; + }; + + const readCommentDoubleSlash = () => { + let i = 0; + if (source[offset + i++] !== '/' || source[offset + i++] !== '/') return false; + while (source[offset + i] !== '\n' && source[offset + i] !== '\r') offset++; + offset += i; + return true; + }; + + const readSpace = () => { + for (;;) { + let byte = source[offset]; + record(byte); + if (byte === '\n' || byte === '\r' || byte === ' ' || byte === '\t') { + offset++; + } else { + if (!readCommentMultiple() && !readCommentSharp() && !readCommentDoubleSlash()) return; + } + } + }; + + const readComma = () => { + if (source[offset] === ',' || source[offset] === ';') { + offset++; + readSpace(); + return ','; + } + }; + + const readTypedef = () => { + let subject = readKeyword('typedef'); + let type = readType(); + let name = readName(); + readComma(); + return {subject, type, name}; + }; + + const readType = () => readAnyOne(readTypeMap, readTypeList, readTypeNormal); + + const readTypeMap = () => { + let name = readKeyword('map'); + readChar('<'); + let keyType = readType(); + readComma(); + let valueType = readType(); + readChar('>'); + return {name, keyType, valueType}; + }; + + const readTypeList = () => { + let name = readAnyOne(() => readKeyword('list'), () => readKeyword('set')); + readChar('<'); + let valueType = readType(); + readChar('>'); + return {name, valueType}; + }; + + const readTypeNormal = () => readName(); + + const readName = () => { + let i = 0; + let byte = source[offset]; + while ( + (byte >= 'a' && byte <= 'z') || + byte === '.' || + byte === '_' || + (byte >= 'A' && byte <= 'Z') || + (byte >= '0' && byte <= '9') + ) byte = source[offset + ++i]; + if (i === 0) throw 'Unexpected token on readName'; + let value = source.slice(offset, offset += i); + readSpace(); + return value; + }; + + const readScope = () => { + let i = 0; + let byte = source[offset]; + while ( + (byte >= 'a' && byte <= 'z') || + byte === '_' || + (byte >= 'A' && byte <= 'Z') || + (byte >= '0' && byte <= '9') || + (byte === '*') || + (byte === '.') + ) byte = source[offset + ++i]; + if (i === 0) throw 'Unexpected token on readScope'; + let value = source.slice(offset, offset += i); + readSpace(); + return value; + }; + + const readNumberSign = () => { + let result; + if (source[offset] === '+' || source[offset] === '-') { + result = source[offset]; + offset++; + } + return result; + }; + + const readIntegerValue = () => { + let result = []; + let sign = readNumberSign(); + if (sign !== void 0) result.push(sign); + + for (; ;) { + let byte = source[offset]; + if ((byte >= '0' && byte <= '9')) { + offset++; + result.push(byte); + } else if ( + byte === 'E' || byte === 'e' || + byte === 'X' || byte === 'x' || + byte === '.' + ) { + throw `Unexpected token ${byte} for integer value`; + } else { + if (result.length) { + readSpace(); + return +result.join(''); + } else { + throw 'Unexpected token ' + byte; + } + } + } + }; + + const readDecimalValue = () => { + let result = []; + let sign = readNumberSign(); + if (sign !== void 0) result.push(sign); + + for (;;) { + let byte = source[offset]; + if ((byte >= '0' && byte <= '9') || byte === '.') { + offset++; + result.push(byte); + } else { + if (result.length) { + readSpace(); + return +result.join(''); + } else { + throw 'Unexpected token ' + byte; + } + } + } + }; + + const readEnotationValue = () => { + let result = []; + if (source[offset] === '-') { + result.push(source[offset]); + offset++; + } + + for (;;) { + let byte = source[offset]; + if ((byte >= '0' && byte <= '9') || byte === '.') { + result.push(byte); + offset++; + } else { + break; + } + } + + if (source[offset] !== 'e' && source[offset] !== 'E') throw 'Unexpected token'; + result.push(source[offset]); + offset++; + + for (;;) { + let byte = source[offset]; + if (byte >= '0' && byte <= '9') { + offset++; + result.push(byte); + } else { + if (result.length) { + readSpace(); + return +result.join(''); + } else { + throw 'Unexpected token ' + byte; + } + } + } + }; + + const readHexadecimalValue = () => { + let result = []; + if (source[offset] === '-') { + result.push(source[offset]); + offset++; + } + + if (source[offset] !== '0') throw 'Unexpected token'; + result.push(source[offset]); + offset++; + + if (source[offset] !== 'x' && source[offset] !== 'X') throw 'Unexpected token'; + result.push(source[offset]); + offset++; + + for (;;) { + let byte = source[offset]; + if ( + (byte >= '0' && byte <= '9') || + (byte >= 'A' && byte <= 'F') || + (byte >= 'a' && byte <= 'f') + ) { + offset++; + result.push(byte); + } else { + if (result.length) { + readSpace(); + return +result.join(''); + } else { + throw 'Unexpected token ' + byte; + } + } + } + }; + + const readBooleanValue = () => JSON.parse(readAnyOne(() => readKeyword('true'), () => readKeyword('false'))); + + const readRefValue = () => { + let list = [readName()]; + readUntilThrow(() => { + readChar('.'); + list.push(readName()); + }); + return {'=': list}; + }; + + const readStringValue = () => { + let receiver = []; + let start; + while (source[offset] != null) { + let byte = source[offset++]; + if (receiver.length) { + if (byte === start) { + receiver.push(byte); + readSpace(); + return receiver.slice(1, -1).join(''); + } else if (byte === '\\') { + receiver.push(byte); + offset++; + receiver.push(source[offset++]); + } else { + receiver.push(byte); + } + } else { + if (byte === '"' || byte === '\'') { + start = byte; + receiver.push(byte); + } else { + throw 'Unexpected token ILLEGAL'; + } + } + } + throw 'Unterminated string value'; + }; + + const readListValue = () => { + readChar('['); + let list = readUntilThrow(() => { + let value = readValue(); + readComma(); + return value; + }); + readChar(']'); + return list; + }; + + const readMapValue = () => { + readChar('{'); + let list = readUntilThrow(() => { + let key = readValue(); + readChar(':'); + let value = readValue(); + readComma(); + return {key, value}; + }); + readChar('}'); + return list; + }; + + const readValue = () => readAnyOne( + readHexadecimalValue, // This coming before readNumberValue is important, unfortunately + readEnotationValue, // This also needs to come before readNumberValue + readDecimalValue, + readIntegerValue, + readStringValue, + readBooleanValue, + readListValue, + readMapValue, + readRefValue + ); + + const readConst = () => { + let subject = readKeyword('const'); + let type = readType(); + let name = readName(); + readChar('='); + let value = readValue(); + readComma(); + return {subject, type, name, value}; + }; + + const readEnum = () => { + let subject = readKeyword('enum'); + let name = readName(); + let items = readEnumBlock(); + return {subject, name, items}; + }; + + const readEnumBlock = () => { + readChar('{'); + let receiver = readUntilThrow(readEnumItem); + readChar('}'); + return receiver; + }; + + const readEnumItem = () => { + let name = readName(); + let value = readEnumValue(); + readComma(); + let result = {name}; + if (value !== void 0) result.value = value; + return result; + }; + + const readEnumValue = () => { + let beginning = offset; + try { + readChar('='); + } catch (ignore) { + offset = beginning; + return; + } + return readAnyOne(readHexadecimalValue, readIntegerValue); + }; + + const readAssign = () => { + try { + save(); + readChar('='); + return readValue(); + } catch (ignore) { + restore(); + } finally { + drop(); + } + }; + + const readStruct = () => { + let subject = readKeyword('struct'); + let name = readName(); + let items = readStructLikeBlock(); + return {subject, name, items}; + }; + + const readStructLikeBlock = () => { + readChar('{'); + let receiver = readUntilThrow(readStructLikeItem); + readChar('}'); + return receiver; + }; + + const readStructLikeItem = () => { + let id; + try { + id = readAnyOne(readHexadecimalValue, readIntegerValue); + readChar(':'); + } catch (err) { + + } + + let option = readAnyOne(() => readKeyword('required'), () => readKeyword('optional'), readNoop); + let type = readType(); + let name = readName(); + let defaultValue = readAssign(); + readComma(); + let result = {type, name}; + if (id !== void 0) result.id = id; + if (option !== void 0) result.option = option; + if (defaultValue !== void 0) result.defaultValue = defaultValue; + return result; + }; + + const readUnion = () => { + let subject = readKeyword('union'); + let name = readName(); + let items = readStructLikeBlock(); + return {subject, name, items}; + }; + + const readException = () => { + let subject = readKeyword('exception'); + let name = readName(); + let items = readStructLikeBlock(); + return {subject, name, items}; + }; + + const readExtends = () => { + try { + save(); + readKeyword('extends'); + let name = readRefValue()['='].join('.'); + return name; + } catch (ignore) { + restore(); + return; + } finally { + drop(); + } + }; + + const readService = () => { + let subject = readKeyword('service'); + let name = readName(); + let extend = readExtends(); // extends is a reserved keyword + let functions = readServiceBlock(); + let result = {subject, name}; + if (extend !== void 0) result.extends = extend; + if (functions !== void 0) result.functions = functions; + return result; + }; + + const readNamespace = () => { + let subject = readKeyword('namespace'); + let name = readScope(); + let serviceName = readRefValue()['='].join('.'); + return {subject, name, serviceName}; + }; + + const readInclude = () => { + let subject = readKeyword('include'); + readSpace(); + let includePath = readQuotation(); + let name = includePath.replace(/^.*?([^/\\]*?)(?:\.thrift)?$/, '$1'); + readSpace(); + return {subject, name, path: includePath}; + }; + + const readQuotation = () => { + let quoteMatch; + if (source[offset] === '"' || source[offset] === '\'') { + quoteMatch = source[offset]; + offset++; + } else { + throw 'include error'; + } + let i = offset; + // Read until it finds a matching quote or end-of-file + while (source[i] !== quoteMatch && source[i] != null) { + i++; + } + if (source[i] === quoteMatch) { + let value = source.slice(offset, i); + offset = i + 1; + return value; + } else { + throw 'include error'; + } + }; + + const readServiceBlock = () => { + readChar('{'); + let receiver = readUntilThrow(readServiceItem, 'name'); + readChar('}'); + return receiver; + }; + + const readOneway = () => readKeyword('oneway'); + + const readServiceItem = () => { + let oneway = !!readAnyOne(readOneway, readNoop); + let type = readType(); + let name = readName(); + let args = readServiceArgs(); + let throws = readServiceThrow(); + readComma(); + return {type, name, args, throws, oneway}; + }; + + const readServiceArgs = () => { + readChar('('); + let receiver = readUntilThrow(readStructLikeItem); + readChar(')'); + readSpace(); + return receiver; + }; + + const readServiceThrow = () => { + try { + save(); + readKeyword('throws'); + return readServiceArgs(); + } catch (ignore) { + restore(); + return []; + } finally { + drop(); + } + }; + + const readSubject = () => { + return readAnyOne(readTypedef, readConst, readEnum, readStruct, readUnion, readException, readService, readNamespace, readInclude); + }; + + const readThrift = () => { + readSpace(); + let storage = {}; + for (;;) { + try { + let block = readSubject(); + let {subject, name} = block; + if (!storage[subject]) storage[subject] = {}; + delete block.subject; + delete block.name; + switch (subject) { + case 'exception': + case 'struct': + case 'union': + storage[subject][name] = block.items; + break; + default: + storage[subject][name] = block; + } + } catch (message) { + let context = source.slice(offset, offset + 50); + let line = Math.max(nCount, rCount) + 1; + console.log("message=" + message + " context=" + context + " line=" + line); + throw new ThriftFileParsingError({ message, context, line }); + } finally { + if (source.length === offset) break; + } + } + return storage; + }; + + return readThrift(); + +}; diff --git a/cadence/thrift/thrift-parser.js-LICENSE b/cadence/thrift/thrift-parser.js-LICENSE new file mode 100644 index 0000000..eaa1b1c --- /dev/null +++ b/cadence/thrift/thrift-parser.js-LICENSE @@ -0,0 +1,27 @@ +I took thrift-parser.js from here: + +https://github.com/eleme/thrift-parser + +The license for which is: + +The MIT License (MIT) + +Copyright (c) <2016-2017> + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/cadence/worker.py b/cadence/worker.py index 31f4caf..75ea9e1 100644 --- a/cadence/worker.py +++ b/cadence/worker.py @@ -5,6 +5,7 @@ import logging import time +from cadence.constants import DEFAULT_SOCKET_TIMEOUT_SECONDS from cadence.conversions import camel_to_snake, snake_to_camel from cadence.workflow import WorkflowMethod, SignalMethod, QueryMethod from cadence.workflowservice import WorkflowService @@ -69,6 +70,7 @@ class Worker: threads_stopped: int = 0 stop_requested: bool = False service_instances: List[WorkflowService] = field(default_factory=list) + timeout: int = DEFAULT_SOCKET_TIMEOUT_SECONDS def register_activities_implementation(self, activities_instance: object, activities_cls_name: str = None): cls_name = activities_cls_name if activities_cls_name else type(activities_instance).__name__ @@ -149,4 +151,16 @@ def get_workflow_method(self, workflow_type_name: str) -> Tuple[type, Callable]: def manage_service(self, service: WorkflowService): self.service_instances.append(service) + def set_timeout(self, timeout): + self.timeout = timeout + def get_timeout(self): + return self.timeout + + def raise_if_stop_requested(self): + if self.is_stop_requested(): + raise StopRequestedException() + + +class StopRequestedException(Exception): + pass diff --git a/cadence/workflow.py b/cadence/workflow.py index ca64824..be570a8 100644 --- a/cadence/workflow.py +++ b/cadence/workflow.py @@ -8,14 +8,14 @@ from typing import Callable, List, Type, Dict, Tuple from uuid import uuid4 -from six import reraise from cadence.activity import ActivityCompletionClient -from cadence.activity_method import RetryParameters +from cadence.activity_method import RetryParameters, ActivityOptions from cadence.cadence_types import WorkflowIdReusePolicy, StartWorkflowExecutionRequest, TaskList, WorkflowType, \ GetWorkflowExecutionHistoryRequest, WorkflowExecution, HistoryEventFilterType, EventType, HistoryEvent, \ StartWorkflowExecutionResponse, SignalWorkflowExecutionRequest, QueryWorkflowRequest, WorkflowQuery, \ QueryWorkflowResponse +from cadence.constants import DEFAULT_SOCKET_TIMEOUT_SECONDS from cadence.conversions import args_to_json, json_to_args from cadence.errors import QueryFailedError from cadence.exception_handling import deserialize_exception @@ -27,13 +27,14 @@ class Workflow: @staticmethod - def new_activity_stub(activities_cls, retry_parameters: RetryParameters = None): + def new_activity_stub(activities_cls, retry_parameters: RetryParameters = None, activity_options: ActivityOptions = None): from cadence.decision_loop import ITask task: ITask = ITask.current() assert task cls = activities_cls() cls._decision_context = task.decider.decision_context cls._retry_parameters = retry_parameters + cls._activity_options = activity_options return cls @staticmethod @@ -88,6 +89,19 @@ def get_logger(name): task: ITask = ITask.current() return task.decider.decision_context.get_logger(name) + @staticmethod + def get_workflow_id(): + from cadence.decision_loop import ITask + task: ITask = ITask.current() + return task.decider.workflow_id + + @staticmethod + def get_execution_id(): + from cadence.decision_loop import ITask + task: ITask = ITask.current() + return task.decider.execution_id + + class WorkflowStub: pass @@ -107,8 +121,8 @@ class WorkflowClient: @classmethod def new_client(cls, host: str = "localhost", port: int = 7933, domain: str = "", - options: WorkflowClientOptions = None) -> WorkflowClient: - service = WorkflowService.create(host, port) + options: WorkflowClientOptions = None, timeout: int = DEFAULT_SOCKET_TIMEOUT_SECONDS) -> WorkflowClient: + service = WorkflowService.create(host, port, timeout=timeout) return cls(service=service, domain=domain, options=options) @classmethod @@ -194,7 +208,7 @@ def exec_workflow(workflow_client, wm: WorkflowMethod, args, workflow_options: W start_request = create_start_workflow_request(workflow_client, wm, args) start_response, err = workflow_client.service.start_workflow(start_request) if err: - raise Exception(repr(err)) + raise Exception(err) execution = WorkflowExecution(workflow_id=start_request.workflow_id, run_id=start_response.run_id) stub_instance._execution = execution return WorkflowExecutionContext(workflow_type=wm._name, workflow_execution=execution) @@ -427,3 +441,4 @@ class WorkflowExecutionTerminatedException(Exception): def __str__(self) -> str: return self.reason + diff --git a/cadence/workflowservice.py b/cadence/workflowservice.py index c094437..54759d4 100644 --- a/cadence/workflowservice.py +++ b/cadence/workflowservice.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Tuple +from typing import Tuple, Callable from uuid import uuid4 import os @@ -31,8 +31,8 @@ class WorkflowService: @classmethod - def create(cls, host: str, port: int): - connection = TChannelConnection.open(host, port) + def create(cls, host: str, port: int, timeout: int = None): + connection = TChannelConnection.open(host, port, timeout=timeout) return cls(connection) @classmethod @@ -170,3 +170,6 @@ def describe_task_list(self, request) -> Tuple[DescribeTaskListResponse, object] def close(self): self.connection.close() + + def set_next_timeout_cb(self, cb: Callable): + self.connection.set_next_timeout_cb(cb) diff --git a/requirements.txt b/requirements.txt index 9d6fbd7..5b30432 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,5 @@ more-itertools==7.0.0 ply==3.11 -six==1.12.0 tblib==1.6.0 thriftrw==1.7.2 dataclasses-json==0.3.8 diff --git a/setup.py b/setup.py index d0ac9e5..ea3d576 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setuptools.setup( name="cadence-client", - version="0.0.4", + version="1.0.0-beta3", author="Mohammed Firdaus", author_email="firdaus.halim@gmail.com", description="Python framework for Cadence Workflow Service", @@ -13,9 +13,17 @@ long_description_content_type="text/markdown", url="https://github.com/firdaus/cadence-python", packages=setuptools.find_packages(exclude=["cadence.tests", "cadence.spikes"]), - install_requires=["more-itertools>=7.0.0", "thriftrw>=1.7.2"], + install_requires=[ + "dataclasses-json>=0.3.8", + "more-itertools>=7.0.0", + "ply>=3.11", + "tblib>=1.6.0", + "thriftrw>=1.7.2", + ], classifiers=[ "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", "License :: OSI Approved :: MIT License", "Operating System :: OS Independent", ], From ef1e8a0375a74abca187fdb0d3027c552e618668 Mon Sep 17 00:00:00 2001 From: Chandan Bhattad Date: Thu, 8 Oct 2020 00:38:13 +0400 Subject: [PATCH 3/6] trying to fix broken pipe error --- cadence/activity_loop.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cadence/activity_loop.py b/cadence/activity_loop.py index b0cdcce..3e3ea42 100644 --- a/cadence/activity_loop.py +++ b/cadence/activity_loop.py @@ -41,7 +41,9 @@ def activity_task_loop(worker: Worker): continue if err: logger.error("PollForActivityTask failed: %s", err) - continue + logger.info(f"trying to restart worker and returning from current method") + worker.start() + return task_token = task.task_token if not task_token: logger.debug("PollForActivityTask has no task_token (expected): %s", task) From e3fb4b14c091e5726c3a5a88aa5995be941b422c Mon Sep 17 00:00:00 2001 From: Chandan Bhattad Date: Thu, 8 Oct 2020 00:39:07 +0400 Subject: [PATCH 4/6] trying to fix broken pipe error --- cadence/activity_loop.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cadence/activity_loop.py b/cadence/activity_loop.py index 3e3ea42..99a9676 100644 --- a/cadence/activity_loop.py +++ b/cadence/activity_loop.py @@ -42,6 +42,7 @@ def activity_task_loop(worker: Worker): if err: logger.error("PollForActivityTask failed: %s", err) logger.info(f"trying to restart worker and returning from current method") + worker.stop() worker.start() return task_token = task.task_token From 6687615bccf606d93ea16f8e0567537c93792381 Mon Sep 17 00:00:00 2001 From: Chandan Bhattad Date: Thu, 8 Oct 2020 00:47:48 +0400 Subject: [PATCH 5/6] trying to fix broken pipe error --- cadence/activity_loop.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cadence/activity_loop.py b/cadence/activity_loop.py index 99a9676..b8d1739 100644 --- a/cadence/activity_loop.py +++ b/cadence/activity_loop.py @@ -38,13 +38,13 @@ def activity_task_loop(worker: Worker): return except Exception as ex: logger.error("PollForActivityTask error: %s", ex) - continue - if err: - logger.error("PollForActivityTask failed: %s", err) logger.info(f"trying to restart worker and returning from current method") worker.stop() worker.start() return + if err: + logger.error("PollForActivityTask failed: %s", err) + continue task_token = task.task_token if not task_token: logger.debug("PollForActivityTask has no task_token (expected): %s", task) From a5efc606737bbed16473be39c46ecb128387c7d2 Mon Sep 17 00:00:00 2001 From: Chandan Bhattad Date: Thu, 8 Oct 2020 00:52:16 +0400 Subject: [PATCH 6/6] trying to fix broken pipe error --- cadence/activity_loop.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/cadence/activity_loop.py b/cadence/activity_loop.py index b8d1739..7c6f261 100644 --- a/cadence/activity_loop.py +++ b/cadence/activity_loop.py @@ -38,10 +38,7 @@ def activity_task_loop(worker: Worker): return except Exception as ex: logger.error("PollForActivityTask error: %s", ex) - logger.info(f"trying to restart worker and returning from current method") - worker.stop() - worker.start() - return + raise if err: logger.error("PollForActivityTask failed: %s", err) continue