Skip to content

Commit 06b9997

Browse files
authored
Synchronous-proxy example in Ruby (#138)
* initial commit; broken and a bit messy with logging * save current code while I go work on adding SignalExternalWorkflow support * initial work to support SignalExternalWorkflow * define the serializer and hook it up * stub in what I think is the correct work for each event type * some fixes per antstorm advice * initial attempt at integration test * docs on testing and an improvement to existing test * encode the signal payload using correct helper * return a Future and fulfill it correctly upon completion * get the \*event_id from the right field in the command structure * modify test to verify the signal is only received once * test for failure to deliver a signal to external workflow * do not discard the failure command otherwise non-deterministic * simplify test workflow by eliminating unnecessary timer * oops, had double call to #schedule_command so signals were sent twice * edit description of example * split to separate files and improve test coverage * change method signature for consistency and a few other cleanups * oops, fix EventType name to match correct constant * cleanup to work with new #signal_external_workflow call * improve readme * add custom errors and set to nonretryable for upstream error handling * finish error handling logic * improve how errors are propagated, detected, and logged * improve the README to explain the pattern and give another example * small spelling fix * make code more idiomatic for Ruby * make Ruby a bit more idiomatic * switch to CAP_SNAKECASE for constants * remove task_queue setting since default is set in configuration.rb * remove unused argument * remove antiquated error handling and reraise exceptions instead * loop using retry until no exceptions are received
1 parent 25a666f commit 06b9997

File tree

8 files changed

+479
-0
lines changed

8 files changed

+479
-0
lines changed

examples/synchronous-proxy/README.md

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
# Purpose
2+
3+
This pattern is used when a non-workflow process needs to advance a workflow state
4+
machine from its initial state to its terminal state. It does this by adding input
5+
data to the workflow (via Signals) and receiving new information back from the
6+
workflow (when a secondary proxy workflow exits and returns a value).
7+
8+
The only way to add information to a workflow is via a Signal.
9+
10+
There are two ways to
11+
get information out of a workflow. One, the workflow has a Query handler and can respond
12+
to queries. However, this is limited in that Queries may not modify the state of the
13+
workflow itself. Two, the workflow can exit and return a result to its caller. This
14+
second approach is leveraged by the pattern to get information back from the primary
15+
workflow. This information could be used to determine branching behavior for the
16+
non-workflow caller.
17+
18+
The flow of calls is outlined in the diagram below.
19+
20+
![Flow Diagram](flow.png)
21+
22+
# Explanation
23+
24+
The primary use-case for this pattern is for a non-workflow process to *send and receive* data
25+
to and from a workflow. Note that a Temporal client may send a signal to a workflow but the
26+
information transfer is one-way (i.e. fire and forget). There is no mechanism for a workflow
27+
to send a signal to a non-workflow. A keen observer would note that a Query can be used to
28+
ask for information, however a Query is supposed to be idempotent and *should not cause any
29+
state change* in the workflow itself. Also, Queries imply polling for a result which is slow
30+
and inefficient. Therefore, it is not a mechanism for sending new information
31+
into a workflow and receiving a response.
32+
33+
So, the non-workflow process can communicate to a workflow by:
34+
35+
a) Starting that workflow, and
36+
37+
b) Communicating with the workflow by creating proxy workflows to signal the main workflow and
38+
then block for a response. When these proxy workflows exit, they can return the response to the
39+
caller.
40+
41+
In the real world, this pattern could be utilized for managing an interaction via a series of
42+
web pages. Imagine that a user lands on a home page and clicks a link to apply for a library
43+
card. The link hits the web application's controller and can now start the
44+
`ApplyForLibraryCardWorkflow`. The workflow ID could be returned back in a response to the caller
45+
as a session value, for example.
46+
47+
On the next page, the user can fill out the application for the library card by providing their
48+
name, address, and phone number. Upon submission of this form (via POST), the web application
49+
controller can 1) lookup the associated workflow from the session, and 2) create the
50+
`SubmitPersonalDetailsWorkflow` workflow and pass in the form data. This workflow packages up
51+
the data and signals it to the `ApplyForLibraryCardWorkflow` and waits for a response via another
52+
signal. The main workflow applies the appropriate business logic to the payload and advances its
53+
state. It then signals back to the proxy workflow the result of its work and then blocks to
54+
await new data.
55+
56+
Depending on the response from the `ApplyForLibraryCardWorkflow`, the controller can render a page
57+
to continue the application or ask for the user to correct some bad input.
58+
59+
Continue and repeat this action via the web application controller(s) as it moves the user
60+
through the entire library card application journey. By its nature, web applications are all stateless
61+
and asynchronous, so the state and behavior are encapsulated by the workflow and its associated
62+
activity outcomes. The only state outside of the workflow that the web application cares about is the
63+
session information so it can match the user back to the correct workflow.
64+
65+
# Execution
66+
67+
Open two shells / terminal windows. In one, execute:
68+
```shell
69+
ruby worker/worker.rb
70+
```
71+
In the second, execute:
72+
```shell
73+
ruby ui/main.rb
74+
```
75+
In the shell running `ui` it will ask a series of questions. Answer the questions and the
76+
program will send the appropriate signals around to complete the process. Upon completion it
77+
prints a success message.
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
module SynchronousProxy
2+
class RegisterEmailActivity < Temporal::Activity
3+
def execute(email)
4+
logger.info "activity: registered email #{email}"
5+
nil
6+
end
7+
end
8+
9+
class ValidateSizeActivity < Temporal::Activity
10+
InvalidSize = Class.new(StandardError)
11+
12+
retry_policy(
13+
interval: 1,
14+
backoff: 1,
15+
max_attempts: 3,
16+
non_retriable_errors: [InvalidSize])
17+
18+
def execute(size)
19+
logger.info "activity: validate size #{size}"
20+
return nil if TShirtSizes.include?(size)
21+
22+
raise InvalidSize.new("#{size} is not a valid size choice.")
23+
end
24+
end
25+
26+
class ValidateColorActivity < Temporal::Activity
27+
InvalidColor = Class.new(StandardError)
28+
29+
retry_policy(
30+
interval: 1,
31+
backoff: 1,
32+
max_attempts: 3,
33+
non_retriable_errors: [InvalidColor])
34+
35+
def execute(color)
36+
logger.info "activity: validate color #{color}"
37+
return nil if TShirtColors.include?(color)
38+
39+
raise InvalidColor.new("#{color} is not a valid color choice.")
40+
end
41+
end
42+
43+
class ScheduleDeliveryActivity < Temporal::Activity
44+
def execute(order)
45+
delivery_date = Time.now + (2 * 60 * 60 * 24)
46+
logger.info "activity: scheduled delivery for order #{order} at #{delivery_date}"
47+
delivery_date
48+
end
49+
end
50+
51+
class SendDeliveryEmailActivity < Temporal::Activity
52+
def execute(order, order_id, delivery_date)
53+
logger.info "email to: #{order.email}, order: #{order}, scheduled delivery: #{delivery_date}"
54+
nil
55+
end
56+
end
57+
end
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
require 'bundler'
2+
Bundler.require :default
3+
4+
require 'temporal'
5+
6+
Temporal.configure do |config|
7+
config.host = 'localhost'
8+
config.port = 7233
9+
config.namespace = 'ruby-samples'
10+
config.task_queue = 'ui-driven'
11+
end
12+
13+
begin
14+
Temporal.register_namespace('ruby-samples', 'A safe space for playing with Temporal Ruby')
15+
rescue Temporal::NamespaceAlreadyExistsFailure
16+
nil # service was already registered
17+
end

examples/synchronous-proxy/flow.png

107 KB
Loading
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
module SynchronousProxy
2+
module Proxy
3+
# We support talking between two workflows using these helper methods. Each workflow
4+
# that wants to communicate will include this module. Unlike the Go examples which use
5+
# channels, the Ruby support is via #on_signal (to receive), Temporal.signal_workflow
6+
# (to send), and #wait_for (to block and wait for incoming signal).
7+
#
8+
# The basic trick is that we register a single #on_signal signal handler per workflow
9+
# via the #setup_signal_handler method. Each incoming signal is parsed to determine if
10+
# it's a request or response and then the appropriate ivar is set. After the signal handler
11+
# runs, this client executes the block attached to the #wait_for method to see if it
12+
# returns true. If the block evaluates that it has received a value into the ivar, it
13+
# returns true and unblocks.
14+
#
15+
module Communications
16+
REQUEST_SIGNAL_NAME = "proxy-request-signal".freeze
17+
RESPONSE_SIGNAL_NAME = "proxy-response-signal".freeze
18+
19+
SignalDetails = Struct.new(
20+
:name, :key, :value, :error, :calling_workflow_id,
21+
keyword_init: true
22+
) do
23+
def error?
24+
key == "error"
25+
end
26+
27+
def to_input
28+
[calling_workflow_id, name, key, value]
29+
end
30+
31+
def self.from_input(input)
32+
new(name: input[1], key: input[2], value: input[3], calling_workflow_id: input[0])
33+
end
34+
end
35+
36+
def setup_signal_handler
37+
w_id = workflow.metadata.id
38+
logger.info("#{self.class.name}#setup_signal_handler, Setup signal handler for workflow #{w_id}")
39+
40+
workflow.on_signal do |signal, input|
41+
logger.info("#{self.class.name}#setup_signal_handler, Received signal for workflow #{w_id}, signal #{signal}, input #{input.inspect}")
42+
details = SignalDetails.from_input(input)
43+
44+
case signal
45+
when REQUEST_SIGNAL_NAME
46+
@request_signal = details
47+
48+
when RESPONSE_SIGNAL_NAME
49+
@response_signal = details
50+
51+
else
52+
logger.warn "#{self.class.name}#setup_signal_handler, Unknown signal received"
53+
end
54+
end
55+
end
56+
57+
def wait_for_response
58+
# #workflow is defined as part of the Temporal::Workflow class and is therefore available to
59+
# any methods inside the class plus methods that are included from a Module like this one
60+
workflow.wait_for { !!@response_signal }
61+
end
62+
63+
def wait_for_request
64+
workflow.wait_for { !!@request_signal }
65+
end
66+
67+
def send_error_response(target_workflow_id, err)
68+
w_id = workflow.metadata.id
69+
70+
logger.info("#{self.class.name}#send_error_response, Sending error response from #{w_id} to #{target_workflow_id}")
71+
logger.info("#{self.class.name}#send_error_response, err is #{err.inspect}")
72+
details = SignalDetails.new(key: "error", value: err, calling_workflow_id: w_id)
73+
workflow.signal_external_workflow(workflow, RESPONSE_SIGNAL_NAME, target_workflow_id, "", details.to_input)
74+
nil
75+
end
76+
77+
def send_response(target_workflow_id, key, value)
78+
w_id = workflow.metadata.id
79+
80+
logger.info("#{self.class.name}#send_response, Sending response from #{w_id} to #{target_workflow_id}")
81+
details = SignalDetails.new(key: key, value: value, calling_workflow_id: w_id)
82+
workflow.signal_external_workflow(workflow, RESPONSE_SIGNAL_NAME, target_workflow_id, "", details.to_input)
83+
nil
84+
end
85+
86+
def send_request(target_workflow_id, key, value)
87+
w_id = workflow.metadata.id
88+
89+
logger.info("#{self.class.name}#send_request, Sending request from #{w_id} to #{target_workflow_id}, key #{key}, value #{value}, calling workflow #{w_id}")
90+
details = SignalDetails.new(key: key, value: value, calling_workflow_id: w_id)
91+
workflow.signal_external_workflow(workflow, REQUEST_SIGNAL_NAME, target_workflow_id, "", details.to_input)
92+
nil
93+
end
94+
95+
def receive_response(description="unknown")
96+
@response_signal = nil
97+
w_id = workflow.metadata.id
98+
Temporal.logger.info("#{self.class.name}#receive_response, Waiting for response on [#{description}] in workflow #{w_id}")
99+
wait_for_response
100+
@response_signal
101+
end
102+
103+
def receive_request(description="unknown")
104+
@request_signal = nil
105+
w_id = workflow.metadata.id
106+
Temporal.logger.info("#{self.class.name}#receive_request, Waiting for request on [#{description}] in workflow #{w_id}")
107+
wait_for_request
108+
@request_signal
109+
end
110+
end
111+
end
112+
end

examples/synchronous-proxy/ui/main.rb

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
require_relative "../configuration"
2+
require_relative "../workflows"
3+
4+
module SynchronousProxy
5+
module UI
6+
class Main
7+
def run
8+
random_id = rand(999_999_999)
9+
sequence_no = 0
10+
status = create_order(random_id, sequence_no)
11+
12+
sequence_no += 1
13+
email = prompt_and_read_input("Please enter you email address:")
14+
status = update_order(random_id: random_id, sequence_no: sequence_no, order_id: status.order_id, stage: SynchronousProxy::RegisterStage, value: email)
15+
puts "status #{status.inspect}"
16+
17+
sequence_no += 1
18+
begin
19+
size = prompt_and_read_input("Please enter your requested size:")
20+
status = update_order(random_id: random_id, sequence_no: sequence_no, order_id: status.order_id, stage: SynchronousProxy::SizeStage, value: size)
21+
puts "status #{status.inspect}"
22+
rescue SynchronousProxy::ValidateSizeActivity::InvalidSize => e
23+
STDERR.puts e.message
24+
retry
25+
end
26+
27+
sequence_no += 1
28+
begin
29+
color = prompt_and_read_input("Please enter your required tshirt color:")
30+
status = update_order(random_id: random_id, sequence_no: sequence_no, order_id: status.order_id, stage: SynchronousProxy::ColorStage, value: color)
31+
puts "status #{status.inspect}"
32+
rescue SynchronousProxy::ValidateColorActivity::InvalidColor => e
33+
STDERR.puts e.message
34+
retry
35+
end
36+
37+
puts "Thanks for your order!"
38+
puts "You will receive an email with shipping details shortly"
39+
puts "Exiting at #{Time.now}"
40+
end
41+
42+
def create_order(random_id, sequence_no)
43+
w_id = "new-tshirt-order-#{random_id}-#{sequence_no}"
44+
workflow_options = {workflow_id: w_id}
45+
Temporal.start_workflow(SynchronousProxy::OrderWorkflow, options: workflow_options)
46+
status = SynchronousProxy::OrderStatus.new
47+
status.order_id = w_id
48+
status
49+
end
50+
51+
def update_order(random_id:, sequence_no:, order_id:, stage:, value:)
52+
w_id = "update_#{stage}_#{random_id}-#{sequence_no}"
53+
workflow_options = {workflow_id: w_id}
54+
run_id = Temporal.start_workflow(SynchronousProxy::UpdateOrderWorkflow, order_id, stage, value, options: workflow_options)
55+
Temporal.await_workflow_result(SynchronousProxy::UpdateOrderWorkflow, workflow_id: w_id, run_id: run_id)
56+
end
57+
58+
def prompt_and_read_input(prompt)
59+
print(prompt + " ")
60+
gets.chomp
61+
end
62+
end
63+
end
64+
end
65+
66+
if $0 == __FILE__
67+
SynchronousProxy::UI::Main.new.run
68+
end
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
require_relative "../configuration"
2+
require_relative "../workflows"
3+
require_relative "../activities"
4+
require 'temporal/worker'
5+
6+
worker = Temporal::Worker.new
7+
worker.register_workflow(SynchronousProxy::OrderWorkflow)
8+
worker.register_workflow(SynchronousProxy::UpdateOrderWorkflow)
9+
worker.register_workflow(SynchronousProxy::ShippingWorkflow)
10+
worker.register_activity(SynchronousProxy::RegisterEmailActivity)
11+
worker.register_activity(SynchronousProxy::ValidateSizeActivity)
12+
worker.register_activity(SynchronousProxy::ValidateColorActivity)
13+
worker.register_activity(SynchronousProxy::ScheduleDeliveryActivity)
14+
worker.register_activity(SynchronousProxy::SendDeliveryEmailActivity)
15+
worker.start

0 commit comments

Comments
 (0)