Skip to content
This repository was archived by the owner on Sep 18, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 86 additions & 50 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,65 +1,46 @@
# Python framework for Cadence Workflow Service
# Intro: Fault-Oblivious Stateful Python Code

[Cadence](https://github.com/uber/cadence) is a workflow engine developed at Uber Engineering. With this framework, workflows and activities managed by Cadence can be implemented in Python code.
cadence-python allows you to create Python functions that have their state (local variables etc..) implicitly saved such that if the process/machine fails the state of the function is not lost and can resume from where it left off.

## Status / TODO
This programming model is useful whenever you need to ensure that a function runs to completion. For example:

cadence-python is still under going heavy development. It should be considered EXPERIMENTAL at the moment. A production
version is targeted to be released in ~~September of 2019~~ ~~January 2020~~ March 2020.
- Business logic involving multiple micro services
- CI/CD pipelines
- Data pipelines
- RPA
- ETL
- Marketing automation / Customer journeys / Customer engagement
- Zapier/IFTTT like end user automation.
- Chat bots
- Multi-step forms
- Scheduler/Cron jobs

1.0
- [x] Tchannel implementation
- [x] Python-friendly wrapper around Cadence's Thrift API
- [x] Author activities in Python
- [x] Start workflows (synchronously)
- [x] Create workflows
- [x] Workflow execution in coroutines
- [x] Invoke activities from workflows
- [x] ActivityCompletionClient heartbeat, complete, complete_exceptionally
- [x] Activity heartbeat, getHeartbeatDetails and doNotCompleteOnReturn
- [x] Activity retry
- [x] Activity getDomain(), getTaskToken(), getWorkflowExecution()
- [x] Signals
- [x] Queries
- [x] Async workflow execution
- [x] await
- [x] now (currentTimeMillis)
- [x] Sleep
- [x] Loggers
- [x] newRandom
- [x] UUID
- [x] Workflow Versioning
- [x] WorkflowClient.newWorkflowStub(Class workflowInterface, String workflowId);
Behind the scenes, cadence-python uses [Cadence](https://github.com/uber/cadence) as its backend.

1.1
- [ ] ActivityStub and Workflow.newUntypedActivityStub
- [ ] Classes as arguments and return values to/from activity and workflow methods
- [ ] WorkflowStub and WorkflowClient.newUntypedWorkflowStub
- [ ] Custom workflow ids through start() and new_workflow_stub()
- [ ] ContinueAsNew
- [ ] Compatibility with Java client
- [ ] Compatibility with Golang client
For more information about the fault-oblivious programming model refer to the Cadence documentation [here](https://cadenceworkflow.io/docs/03_concepts/01_workflows)

2.0
- [ ] Sticky workflows
## Install Cadencce

Post 2.0:
- [ ] sideEffect/mutableSideEffect
- [ ] Parallel activity execution
- [ ] Timers
- [ ] Cancellation Scopes
- [ ] Child Workflows
- [ ] Explicit activity ids for activity invocations
```
wget https://raw.githubusercontent.com/uber/cadence/master/docker/docker-compose.yml
docker-compose up
```

## Installation
## Register `sample` domain

```
pip install cadence-client
docker run --network=host --rm ubercadence/cli:master --do sample domain register -rd 1
```

## Hello World Sample
## Installation cadence-python

```
pip install cadence-client==1.0.0b3
```

## Hello World Sample

```python
import sys
import logging
from cadence.activity_method import activity_method
Expand All @@ -82,7 +63,7 @@ class GreetingActivities:
# Activities Implementation
class GreetingActivitiesImpl:
def compose_greeting(self, greeting: str, name: str):
return greeting + " " + name + "!"
return f"{greeting} {name}!"


# Workflow Interface
Expand All @@ -99,6 +80,9 @@ class GreetingWorkflowImpl(GreetingWorkflow):
self.greeting_activities: GreetingActivities = Workflow.new_activity_stub(GreetingActivities)

async def get_greeting(self, name):
# Place any Python code here that you want to ensure is executed to completion.
# Note: code in workflow functions must be deterministic so that the same code paths
# are ran during replay.
return await self.greeting_activities.compose_greeting("Hello", name)


Expand All @@ -118,4 +102,56 @@ if __name__ == '__main__':
worker.stop()
print("Workers stopped...")
sys.exit(0)
```
```

## Status / TODO

cadence-python is still under going heavy development. It should be considered EXPERIMENTAL at the moment. A production
version is targeted to be released in ~~September of 2019~~ ~~January 2020~~ ~~March 2020~~ April 2020.

1.0
- [x] Tchannel implementation
- [x] Python-friendly wrapper around Cadence's Thrift API
- [x] Author activities in Python
- [x] Start workflows (synchronously)
- [x] Create workflows
- [x] Workflow execution in coroutines
- [x] Invoke activities from workflows
- [x] ActivityCompletionClient heartbeat, complete, complete_exceptionally
- [x] Activity heartbeat, getHeartbeatDetails and doNotCompleteOnReturn
- [x] Activity retry
- [x] Activity getDomain(), getTaskToken(), getWorkflowExecution()
- [x] Signals
- [x] Queries
- [x] Async workflow execution
- [x] await
- [x] now (currentTimeMillis)
- [x] Sleep
- [x] Loggers
- [x] newRandom
- [x] UUID
- [x] Workflow Versioning
- [x] WorkflowClient.newWorkflowStub(Class workflowInterface, String workflowId);

1.1
- [ ] ActivityStub and Workflow.newUntypedActivityStub
- [ ] Classes as arguments and return values to/from activity and workflow methods
- [ ] WorkflowStub and WorkflowClient.newUntypedWorkflowStub
- [ ] Custom workflow ids through start() and new_workflow_stub()
- [ ] ContinueAsNew
- [ ] Compatibility with Java client
- [ ] Compatibility with Golang client

2.0
- [ ] Sticky workflows

Post 2.0:
- [ ] sideEffect/mutableSideEffect
- [ ] Local activity
- [ ] Parallel activity execution
- [ ] Timers
- [ ] Cancellation Scopes
- [ ] Child Workflows
- [ ] Explicit activity ids for activity invocations


14 changes: 11 additions & 3 deletions cadence/activity_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,22 @@
from cadence.cadence_types import PollForActivityTaskRequest, TaskListMetadata, TaskList, PollForActivityTaskResponse
from cadence.conversions import json_to_args
from cadence.workflowservice import WorkflowService
from cadence.worker import Worker
from cadence.worker import Worker, StopRequestedException

logger = logging.getLogger(__name__)


def activity_task_loop(worker: Worker):
service: WorkflowService = WorkflowService.create(worker.host, worker.port)
service: WorkflowService = WorkflowService.create(worker.host, worker.port, timeout=worker.get_timeout())
worker.manage_service(service)
logger.info(f"Activity task worker started: {WorkflowService.get_identity()}")
try:
while True:
if worker.is_stop_requested():
return
try:
service.set_next_timeout_cb(worker.raise_if_stop_requested)

polling_start = datetime.datetime.now()
polling_request = PollForActivityTaskRequest()
polling_request.task_list_metadata = TaskListMetadata()
Expand All @@ -32,9 +34,11 @@ def activity_task_loop(worker: Worker):
task, err = service.poll_for_activity_task(polling_request)
polling_end = datetime.datetime.now()
logger.debug("PollForActivityTask: %dms", (polling_end - polling_start).total_seconds() * 1000)
except StopRequestedException:
return
except Exception as ex:
logger.error("PollForActivityTask error: %s", ex)
continue
raise
if err:
logger.error("PollForActivityTask failed: %s", err)
continue
Expand Down Expand Up @@ -75,4 +79,8 @@ def activity_task_loop(worker: Worker):
process_end = datetime.datetime.now()
logger.info("Process ActivityTask: %dms", (process_end - process_start).total_seconds() * 1000)
finally:
try:
service.close()
except:
logger.warning("service.close() failed", exc_info=1)
worker.notify_thread_stopped()
23 changes: 23 additions & 0 deletions cadence/activity_method.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ async def stub_activity_fn(self, *args):
assert self._decision_context
assert stub_activity_fn._execute_parameters
parameters = copy.deepcopy(stub_activity_fn._execute_parameters)
if hasattr(self, "_activity_options") and self._activity_options:
self._activity_options.fill_execute_activity_parameters(parameters)
if self._retry_parameters:
parameters.retry_parameters = self._retry_parameters
parameters.input = args_to_json(args).encode("utf-8")
Expand All @@ -80,3 +82,24 @@ async def stub_activity_fn(self, *args):
raise Exception("activity_method must be called with arguments")
else:
return wrapper


@dataclass
class ActivityOptions:
schedule_to_close_timeout_seconds: int = None
schedule_to_start_timeout_seconds: int = None
start_to_close_timeout_seconds: int = None
heartbeat_timeout_seconds: int = None
task_list: str = None

def fill_execute_activity_parameters(self, execute_parameters: ExecuteActivityParameters):
if self.schedule_to_close_timeout_seconds is not None:
execute_parameters.schedule_to_close_timeout_seconds = self.schedule_to_close_timeout_seconds
if self.schedule_to_start_timeout_seconds is not None:
execute_parameters.schedule_to_start_timeout_seconds = self.schedule_to_start_timeout_seconds
if self.start_to_close_timeout_seconds is not None:
execute_parameters.start_to_close_timeout_seconds = self.start_to_close_timeout_seconds
if self.heartbeat_timeout_seconds is not None:
execute_parameters.heartbeat_timeout_seconds = self.heartbeat_timeout_seconds
if self.task_list is not None:
execute_parameters.task_list = self.task_list
15 changes: 10 additions & 5 deletions cadence/connection.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from __future__ import annotations

import os
import getpass
import socket
from dataclasses import dataclass
from io import BytesIO
from typing import IO, List, Union, Optional, Dict
from typing import IO, List, Union, Optional, Dict, Callable

from cadence.frames import InitReqFrame, Frame, Arg, CallReqFrame, CallReqContinueFrame, CallResFrame, \
CallResContinueFrame, FrameWithArgs, CallFlags, ErrorFrame
Expand Down Expand Up @@ -194,7 +194,7 @@ def default_tchannel_headers():
@staticmethod
def default_application_headers():
return {
"user-name": os.environ.get("LOGNAME", os.getlogin()),
"user-name": getpass.getuser(),
"host-name": socket.gethostname(),
# Copied from Java client
"cadence-client-library-version": "2.2.0",
Expand Down Expand Up @@ -299,19 +299,23 @@ class TChannelConnection:
s: socket.socket

@classmethod
def open(cls, host: object, port: object) -> TChannelConnection:
def open(cls, host: object, port: object, timeout: int = None) -> TChannelConnection:
s: socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(timeout)
s.connect((host, port))
return cls(s)

def __init__(self, s: socket):
self.s = s
self.file = self.s.makefile("rwb")
self.wrapper = IOWrapper(self.file)
self.wrapper = IOWrapper(self.file, socket_=s)
self.current_id = -1

self.handshake()

def set_next_timeout_cb(self, cb: Callable):
self.wrapper.set_next_timeout_cb(cb)

def new_id(self):
self.current_id += 1
return self.current_id
Expand Down Expand Up @@ -339,6 +343,7 @@ def read_frame(self):

def close(self):
self.s.close()
self.wrapper.close()

def call_function(self, call: ThriftFunctionCall) -> ThriftFunctionResponse:
frames = call.build_frames(self.new_id())
Expand Down
4 changes: 4 additions & 0 deletions cadence/constants.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@

CODE_OK = 0x00
CODE_ERROR = 0x01

# This should be at least 60 seconds because Cadence will reply after 60 seconds when polling
# if there is nothing pending
DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
Loading