Skip to content

Commit 21252c4

Browse files
calum-stripenagl-stripejeffschoner-stripedrewhoskins-stripechristopherb-stripe
authored
Adding payload codec pipeline (#224)
* Revert signal_with_start (before taking a different approach) * Add signal arguments to start_workflow (to support signal_with_start) * Merge memo changes * Address PR feedback * Update method signature in temporal test fixture * Add detail to a few error messages * Update our FailWorkflowTask logic's call to ErrorHandler.handle * Workflow await * Make dispatch more generic * Fix race condition * Merge await into wait_for * Update sample workflow to use wait_for, rename to WaitForWorkflow * Reorganize and extend wait_for tests * Check for completed futures before setting dispatcher and yielding the fiber * Extend wait_for to take multiple futures and a condition block * Differentiate TARGET_WILDCARD and WILDCARD, allow comparison with EventTarget objects * Use Ruby 2.7 to be consistent with pay-server * Turn off schedule_to_start activity timeout by default * Refactor metadata generation * Make task queue available on workflow metadata, add example test * Expose workflow start time metadata * Add memos * Make error deserialization more resilient * Make temporal-ruby able to deserialize larger histories * Remove temporary test * Expose workflow name in activity metadata in temporal-ruby's unit tester * Add a workflow-level test * add namespace to emitted metrics * emit the workflow name tag during activity processing * fix typo * added failworkflowtaskerror * updated register namespace to accept new params * examples * fixed namespace test * empty * updated unit tests * removed unncessary code * updated seconds * Add replay flag to workflow context * fixed nits * Rename to replay? to history_replaying? * updated sleep to 0.5 * updated unit tests and nits * fixed unit tests * added link to comment * Merge fix * Fix upsert_search_attributes * added fix for nil search attributes * added unit test * updated unit test * added expect to be nil * Expose scheduled_time and current_attempt_scheduled_time on activity metadata * Implement ParentClosePolicy for child workflows * Add e2e test for child workflow execution * move serialization logic farther down the stack * Refactor serialize_parent_close_policy; add unit tests * Expose wait_for_start for child workflow execution * Remove future `workflow_id,run_id` annotations; simplify wait_for logic * Add parent_run_id, parent_id to workflow metadata * Allow opting out of child workflow futures * Remove duplicate describe block * Factor out workflow_id_reuse_policy serialization * Respect workflow_id_reuse_policy for child workflows * Add integration tests * Use a nicer exception type * Refactor wait_for into distinct wait_for_any and wait_for_condition methods * Order wildcard dispatch handlers * Remove finished handlers * Check finished? on wait_for_any, add more unit specs * More dispatch unit specs * Use hash instead of list for callbacks per target * Remove dead code, improve error messages in local workflow context * Correct swapped arguments * Eliminate unnecessary IDs for dispatcher handlers * Raise on duplicate ID * added paginated workflows * client spec * reset * Downstream the rest of [activity_metadata.workflow_name](#130) * Downstream the rest of [activity_metadata.scheduled_time](#164) * Fix an activity_metadata related test that doesn't exist upstream * Downstream the rest of [child workflow workflow_id_reuse_policy fixes](#172) * Downstream [merge error fix](#180) * added json protobuf * Update json_protobuf.rb * added unit test * Remove duplicate tests in context_spec * allow connection options to be set * add missing comma * add test for interceptors * remove unused variable * use new Temporal client in interceptor test to avoid test pollution * add option to specify search attributes when starting workflows * move empty check out of Temporal::Workflow::Context::Helpers.process_search_attributes * move process_search_attributes out of ExecutionOptions.initialize * allow default search attributes to be configured globally * fix tests, unit test global default search attributes * add unit test for gRPC serialization * clean up the integration test * Include remaining changes from upstream #188 (#98) ### Summary This PR is a follow-up to #97 that includes the changes made to the corresponding upstream PR ([github.com/#188](#188)) after that downstream PR was eagerly merged (with a subset of the changes from upstream). The changes that were not included in the downstream PR when it was merged are: [compare](https://github.com/coinbase/temporal-ruby/pull/188/files/05b6eafeb43ac717c15ae683e6249c4de876ef3d..c6f614325695cc666e066aa218a329c9bf7504f3) (that should be identical to the changes in this PR). With the merging of the upstream PR, I also updated the [non-upstreamed changed doc](https://paper.dropbox.com/doc/Non-upstreamed-temporal-ruby-changes--Bm39qa99fshz6nbw9RG37pgFAg-AYkwiCfkcoM66adjZMwAZ) to remove the entry on Initial workflow search attributes. ### Motivation The intention of this PR is to synchronize upstream and downstream, with relation to the changeset applied in the upstream PR. ### Test plan N/A ### Rollout/monitoring/revert plan Safe to revert. * Remove dead code from previous messy merge * Separate wait_until handlers, execute at end * Modify signal_with_start_workflow to cover dwillett's repro * Disable running rubyfmt on save * Consolidate metrics constants * Add workflow task failure counter * Use metric_keys.rb for filename * Allow client identity to be configurable * Use PID instead of thread ID in default identity * Add poller completed metrics * Gauge metrics for queue size and available threads per thread pool * Add task queue and namespace to thread pool metrics * Fix tag * reverted proto_json changes * fixed import * remmoved test file * Revert "Merge pull request #108 from stripe-private-oss-forks/calum/fixing-import" This reverts commit 48cbe20, reversing changes made to e805a38. * Revert "Merge pull request #107 from stripe-private-oss-forks/calum/reverting-json-proto-changes" This reverts commit e805a38, reversing changes made to 3deda64. * Exempt the necessary system workflows from normal JSON proto deserialization * Emit task queue for workflow task failures * Added task queue tags to workflow and activity task queue time metrics * DynamicActivity * Use const_get * Cleanup error message * Jeff feedback * do not fail workflow task if completing it errors It's possible for a workflow task to fail to complete even during normal operation. For example, if a signal is added to the workflow history concurrently with a workflow task attempting to complete the workflow then the `RespondWorkflowTaskCompleted` call will recieve an `InvalidArgument` error with an `UnhandledCommand` cause. If this happens then the Ruby SDK would get this error and then try and fail the workflow task due to how the `rescue` block encompassed the completion function. This would then lead to the `RespondWorkflowTaskFailed` call failing because the server doesn't recognize the task anymore. This is because the task was actually finalized when the original `RespondWorkflowTaskCompleted` call was made and so it should not be interacted with anymore. * add a comment * use more realistic error type in tests * Remove orphaned code post-merge * Allow empty pages when paginating through history * Remove unused error * Allow ActivityException to accept args * Improve deserialization code flow * Use the default converter to serialize errors when configured to do so * Get tests working * Cleanup * fix nit * Fix extraneous debug spew * added max page size param * Show bad data on Activity error serialization failure * Upgrade Temporal proto API to version 1.16 * Rename Temporal -> Temporalio * Remove deprecated namespace field for activity task scheduling * Methods for operating on custom search attributes * Example test for custom search attribute APIs * Unit tests * Treat missing history submessage on GetWorkflowExecutionHistoryResponse as timeout * Remove unnecessary &. * Dynamic Workflows * Add terminate-if-running workflow id reuse policy mappings * Add integration test for terminate-if-running * Move misplaced test * Feedback * Use terminate-if-running as policy for both invocations * added payload codecs * added search attribute payload methods * fixed tests * added codec tests * added chain tests * updated nits * Revert "Merge branch 'master' into calum/adding-payload-codecs" This reverts commit 145290c, reversing changes made to d8a42d8. * fixed tests * fixed * examples/lib/crypt_payload_codec.rb --------- Co-authored-by: Nathan Glass <[email protected]> Co-authored-by: Jeff Schoner <[email protected]> Co-authored-by: Andrew Hoskins <[email protected]> Co-authored-by: Christopher Brown <[email protected]> Co-authored-by: Arya Kumar <[email protected]> Co-authored-by: Joseph Azevedo <[email protected]> Co-authored-by: Jeff Schoner <[email protected]> Co-authored-by: Bryton Shoffner <[email protected]>
1 parent 6e5fc34 commit 21252c4

File tree

16 files changed

+272
-38
lines changed

16 files changed

+272
-38
lines changed

examples/bin/worker

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/usr/bin/env ruby
22
require_relative '../init'
3-
require_relative '../lib/cryptconverter'
3+
require_relative '../lib/crypt_payload_codec'
44

55
require 'temporal/worker'
66

@@ -11,8 +11,10 @@ Dir[File.expand_path('../middleware/*.rb', __dir__)].each { |f| require f }
1111
if !ENV['USE_ENCRYPTION'].nil?
1212
Temporal.configure do |config|
1313
config.task_queue = 'crypt'
14-
config.converter = Temporal::CryptConverter.new(
15-
payload_converter: Temporal::Configuration::DEFAULT_CONVERTER
14+
config.payload_codec = Temporal::Connection::Converter::Codec::Chain.new(
15+
payload_codecs: [
16+
Temporal::CryptPayloadCodec.new
17+
]
1618
)
1719
end
1820
end

examples/lib/cryptconverter.rb renamed to examples/lib/crypt_payload_codec.rb

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
require 'openssl'
2+
require 'temporal/connection/converter/codec/base'
23

34
module Temporal
4-
class CryptConverter < Temporal::Connection::Converter::Base
5+
class CryptPayloadCodec < Temporal::Connection::Converter::Codec::Base
56
CIPHER = 'aes-256-gcm'.freeze
67
GCM_NONCE_SIZE = 12
78
GCM_TAG_SIZE = 16
@@ -10,26 +11,23 @@ class CryptConverter < Temporal::Connection::Converter::Base
1011
METADATA_ENCODING_KEY = 'encoding'.freeze
1112
METADATA_ENCODING = 'binary/encrypted'.freeze
1213

13-
def to_payloads(data)
14+
def encode(payload)
15+
return nil if payload.nil?
16+
1417
key_id = get_key_id
1518
key = get_key(key_id)
1619

17-
payloads = super(data)
18-
19-
Temporalio::Api::Common::V1::Payloads.new(
20-
payloads: payloads.payloads.map { |payload| encrypt_payload(payload, key_id, key) }
21-
)
20+
encrypt_payload(payload, key_id, key)
2221
end
22+
23+
def decode(payload)
24+
return nil if payload.nil?
2325

24-
def from_payloads(payloads)
25-
return nil if payloads.nil?
26-
27-
payloads.payloads.map do |payload|
28-
if payload.metadata[METADATA_ENCODING_KEY] == METADATA_ENCODING
29-
payload = decrypt_payload(payload)
30-
end
31-
from_payload(payload)
26+
if payload.metadata[METADATA_ENCODING_KEY] == METADATA_ENCODING
27+
payload = decrypt_payload(payload)
3228
end
29+
30+
payload
3331
end
3432

3533
private

examples/spec/integration/converter_spec.rb

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
require 'workflows/hello_world_workflow'
2-
require 'lib/cryptconverter'
2+
require 'lib/crypt_payload_codec'
33
require 'grpc/errors'
44

55
describe 'Converter', :integration do
@@ -8,8 +8,10 @@
88

99
Temporal.configure do |config|
1010
config.task_queue = 'crypt'
11-
config.converter = Temporal::CryptConverter.new(
12-
payload_converter: Temporal::Configuration::DEFAULT_CONVERTER
11+
config.payload_codec = Temporal::Connection::Converter::Codec::Chain.new(
12+
payload_codecs: [
13+
Temporal::CryptPayloadCodec.new
14+
]
1315
)
1416
end
1517

@@ -65,8 +67,8 @@
6567
completion_event = events[:EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED].first
6668
result = completion_event.workflow_execution_completed_event_attributes.result
6769

68-
converter = Temporal.configuration.converter
70+
payload_codec = Temporal.configuration.payload_codec
6971

70-
expect(converter.from_payloads(result)&.first).to eq('Hello World, Tom')
72+
expect(payload_codec.decodes(result).payloads.first.data).to eq('"Hello World, Tom"')
7173
end
7274
end

lib/temporal/concerns/payloads.rb

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,19 @@ module Temporal
22
module Concerns
33
module Payloads
44
def from_payloads(payloads)
5+
payloads = payload_codec.decodes(payloads)
56
payload_converter.from_payloads(payloads)
67
end
78

89
def from_payload(payload)
10+
payload = payload_codec.decode(payload)
911
payload_converter.from_payload(payload)
1012
end
1113

14+
def from_payload_map_without_codec(payload_map)
15+
payload_map.map { |key, value| [key, payload_converter.from_payload(value)] }.to_h
16+
end
17+
1218
def from_result_payloads(payloads)
1319
from_payloads(payloads)&.first
1420
end
@@ -30,11 +36,20 @@ def from_payload_map(payload_map)
3036
end
3137

3238
def to_payloads(data)
33-
payload_converter.to_payloads(data)
39+
payloads = payload_converter.to_payloads(data)
40+
payload_codec.encodes(payloads)
3441
end
3542

3643
def to_payload(data)
37-
payload_converter.to_payload(data)
44+
payload = payload_converter.to_payload(data)
45+
payload_codec.encode(payload)
46+
end
47+
48+
def to_payload_map_without_codec(data)
49+
# skips the payload_codec step because search attributes don't use this pipeline
50+
data.transform_values do |value|
51+
payload_converter.to_payload(value)
52+
end
3853
end
3954

4055
def to_result_payloads(data)
@@ -62,6 +77,10 @@ def to_payload_map(data)
6277
def payload_converter
6378
Temporal.configuration.converter
6479
end
80+
81+
def payload_codec
82+
Temporal.configuration.payload_codec
83+
end
6584
end
6685
end
6786
end

lib/temporal/configuration.rb

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,15 @@
77
require 'temporal/connection/converter/payload/json'
88
require 'temporal/connection/converter/payload/proto_json'
99
require 'temporal/connection/converter/composite'
10+
require 'temporal/connection/converter/codec/chain'
1011

1112
module Temporal
1213
class Configuration
1314
Connection = Struct.new(:type, :host, :port, :credentials, :identity, keyword_init: true)
1415
Execution = Struct.new(:namespace, :task_queue, :timeouts, :headers, :search_attributes, keyword_init: true)
1516

1617
attr_reader :timeouts, :error_handlers
17-
attr_accessor :connection_type, :converter, :use_error_serialization_v2, :host, :port, :credentials, :identity, :logger, :metrics_adapter, :namespace, :task_queue, :headers, :search_attributes, :header_propagators
18+
attr_accessor :connection_type, :converter, :use_error_serialization_v2, :host, :port, :credentials, :identity, :logger, :metrics_adapter, :namespace, :task_queue, :headers, :search_attributes, :header_propagators, :payload_codec
1819

1920
# See https://docs.temporal.io/blog/activity-timeouts/ for general docs.
2021
# We want an infinite execution timeout for cron schedules and other perpetual workflows.
@@ -45,6 +46,14 @@ class Configuration
4546
Temporal::Connection::Converter::Payload::JSON.new
4647
]
4748
).freeze
49+
50+
# The Payload Codec is an optional step that happens between the wire and the Payload Converter:
51+
# Temporal Server <--> Wire <--> Payload Codec <--> Payload Converter <--> User code
52+
# which can be useful for transformations such as compression and encryption
53+
# more info at https://docs.temporal.io/security#payload-codec
54+
DEFAULT_PAYLOAD_CODEC = Temporal::Connection::Converter::Codec::Chain.new(
55+
payload_codecs: []
56+
).freeze
4857

4958
def initialize
5059
@connection_type = :grpc
@@ -55,6 +64,7 @@ def initialize
5564
@task_queue = DEFAULT_TASK_QUEUE
5665
@headers = DEFAULT_HEADERS
5766
@converter = DEFAULT_CONVERTER
67+
@payload_codec = DEFAULT_PAYLOAD_CODEC
5868
@use_error_serialization_v2 = false
5969
@error_handlers = []
6070
@credentials = :this_channel_is_insecure
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
module Temporal
2+
module Connection
3+
module Converter
4+
module Codec
5+
class Base
6+
def encodes(payloads)
7+
return nil if payloads.nil?
8+
9+
Temporalio::Api::Common::V1::Payloads.new(
10+
payloads: payloads.payloads.map(&method(:encode))
11+
)
12+
end
13+
14+
def decodes(payloads)
15+
return nil if payloads.nil?
16+
17+
Temporalio::Api::Common::V1::Payloads.new(
18+
payloads: payloads.payloads.map(&method(:decode))
19+
)
20+
end
21+
22+
def encode(payload)
23+
# should return Temporalio::Api::Common::V1::Payload
24+
raise NotImplementedError, 'codec converter needs to implement encode'
25+
end
26+
27+
def decode(payload)
28+
# should return Temporalio::Api::Common::V1::Payload
29+
raise NotImplementedError, 'codec converter needs to implement decode'
30+
end
31+
end
32+
end
33+
end
34+
end
35+
end
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
require 'temporal/connection/converter/codec/base'
2+
3+
module Temporal
4+
module Connection
5+
module Converter
6+
module Codec
7+
# Performs encoding/decoding on the payloads via the given payload codecs. When encoding
8+
# the codecs are applied last to first meaning the earlier encodings wrap the later ones.
9+
# When decoding, the codecs are applied first to last to reverse the effect.
10+
class Chain < Base
11+
def initialize(payload_codecs:)
12+
@payload_codecs = payload_codecs
13+
end
14+
15+
def encode(payload)
16+
payload_codecs.reverse_each do |payload_codec|
17+
payload = payload_codec.encode(payload)
18+
end
19+
payload
20+
end
21+
22+
def decode(payload)
23+
payload_codecs.each do |payload_codec|
24+
payload = payload_codec.decode(payload)
25+
end
26+
payload
27+
end
28+
29+
private
30+
31+
attr_reader :payload_codecs
32+
end
33+
end
34+
end
35+
end
36+
end

lib/temporal/connection/grpc.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ def start_workflow_execution(
138138
fields: to_payload_map(memo || {})
139139
),
140140
search_attributes: Temporalio::Api::Common::V1::SearchAttributes.new(
141-
indexed_fields: to_payload_map(search_attributes || {})
141+
indexed_fields: to_payload_map_without_codec(search_attributes || {})
142142
),
143143
)
144144

@@ -401,7 +401,7 @@ def signal_with_start_workflow_execution(
401401
fields: to_payload_map(memo || {})
402402
),
403403
search_attributes: Temporalio::Api::Common::V1::SearchAttributes.new(
404-
indexed_fields: to_payload_map(search_attributes || {})
404+
indexed_fields: to_payload_map_without_codec(search_attributes || {})
405405
),
406406
)
407407

lib/temporal/connection/serializer/continue_as_new.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def serialize_memo(memo)
4343
def serialize_search_attributes(search_attributes)
4444
return unless search_attributes
4545

46-
Temporalio::Api::Common::V1::SearchAttributes.new(indexed_fields: to_payload_map(search_attributes))
46+
Temporalio::Api::Common::V1::SearchAttributes.new(indexed_fields: to_payload_map_without_codec(search_attributes))
4747
end
4848
end
4949
end

lib/temporal/connection/serializer/start_child_workflow.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def serialize_parent_close_policy(parent_close_policy)
6666
def serialize_search_attributes(search_attributes)
6767
return unless search_attributes
6868

69-
Temporalio::Api::Common::V1::SearchAttributes.new(indexed_fields: to_payload_map(search_attributes))
69+
Temporalio::Api::Common::V1::SearchAttributes.new(indexed_fields: to_payload_map_without_codec(search_attributes))
7070
end
7171
end
7272
end

0 commit comments

Comments
 (0)