Skip to content

Commit 0a1020a

Browse files
authored
feat: support commit_options (#364)
* feat: Adding support for max_commit_delay commit_optins This change introduces the ability to configure transaction commit behavior using the commit_options parameter. Specifically, it enables setting: max_commit_delay: To fine-tune commit behavior for throughput-optimized writes. return_commit_stats: To request commit statistics. Fixes #365
1 parent fa44725 commit 0a1020a

File tree

6 files changed

+63
-8
lines changed

6 files changed

+63
-8
lines changed

lib/active_record/connection_adapters/spanner/database_statements.rb

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -229,17 +229,27 @@ def execute_ddl statements
229229

230230
# Transaction
231231

232-
def transaction requires_new: nil, isolation: nil, joinable: true
232+
def transaction requires_new: nil, isolation: nil, joinable: true, **kwargs
233+
commit_options = kwargs.delete :commit_options
234+
233235
if !requires_new && current_transaction.joinable?
234236
return super
235237
end
236238

237239
backoff = 0.2
238240
begin
239-
super
241+
super do
242+
# Once the transaction has been started by `super`, apply your custom options
243+
# to the Spanner transaction object.
244+
if commit_options && @connection.current_transaction
245+
@connection.current_transaction.set_commit_options commit_options
246+
end
247+
248+
yield
249+
end
240250
rescue ActiveRecord::StatementInvalid => err
241251
if err.cause.is_a? Google::Cloud::AbortedError
242-
sleep(delay_from_aborted(err) || backoff *= 1.3)
252+
sleep(delay_from_aborted(err) || (backoff *= 1.3))
243253
retry
244254
end
245255
raise

lib/activerecord_spanner_adapter/connection.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ def self.spanners config
4444
# Call this method if you drop and recreate a database with the same name
4545
# to prevent the cached information to be used for the new database.
4646
def self.reset_information_schemas!
47+
return unless @database
48+
4749
@information_schemas.each_value do |info_schema|
4850
info_schema.connection.disconnect!
4951
end

lib/activerecord_spanner_adapter/transaction.rb

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,18 @@
77
module ActiveRecordSpannerAdapter
88
class Transaction
99
attr_reader :state
10+
attr_reader :commit_options
1011

11-
def initialize connection, isolation
12+
13+
14+
def initialize connection, isolation, commit_options = nil
1215
@connection = connection
1316
@isolation = isolation
1417
@committable = ![:read_only, :pdml].include?(isolation) && !isolation.is_a?(Hash)
1518
@state = :INITIALIZED
1619
@sequence_number = 0
1720
@mutations = []
21+
@commit_options = commit_options
1822
end
1923

2024
def active?
@@ -95,14 +99,23 @@ def next_sequence_number
9599
@sequence_number += 1 if @committable
96100
end
97101

102+
# Sets the commit options for this transaction.
103+
# This is used to set the options for the commit RPC, such as return_commit_stats and max_commit_delay.
104+
def set_commit_options options # rubocop:disable Naming/AccessorMethodName
105+
@commit_options = options&.dup
106+
end
107+
98108
def commit
99109
raise "This transaction is not active" unless active?
100110

101111
begin
102112
# Start a transaction with an explicit BeginTransaction RPC if the transaction only contains mutations.
103113
force_begin_read_write if @committable && !@mutations.empty? && !@grpc_transaction
104-
105-
@connection.session.commit_transaction @grpc_transaction, @mutations if @committable && @grpc_transaction
114+
if @committable && @grpc_transaction
115+
@connection.session.commit_transaction @grpc_transaction,
116+
@mutations,
117+
commit_options: commit_options
118+
end
106119
@state = :COMMITTED
107120
rescue Google::Cloud::NotFoundError => e
108121
if @connection.session_not_found? e

lib/spanner_client_ext.rb

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,14 @@ def create_session instance_id, database_id, labels: nil
2323
end
2424

2525
class Session
26-
def commit_transaction transaction, mutations = []
26+
def commit_transaction transaction, mutations = [], commit_options: nil
2727
ensure_service!
2828

2929
resp = service.commit(
3030
path,
3131
mutations,
32-
transaction_id: transaction.transaction_id
32+
transaction_id: transaction.transaction_id,
33+
commit_options: commit_options
3334
)
3435
@last_updated_at = Time.now
3536
Convert.timestamp_to_time resp.commit_timestamp

test/activerecord_spanner_adapter/transaction_test.rb

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,16 @@ def test_rollback
3232
assert_equal :ROLLED_BACK, transaction.state
3333
end
3434

35+
def test_commit_options
36+
transaction.begin
37+
transaction.set_commit_options return_commit_stats: true, max_commit_delay: 1000
38+
transaction.commit
39+
assert_equal :COMMITTED, transaction.state
40+
commit_options = transaction.commit_options
41+
assert commit_options[:return_commit_stats]
42+
assert_equal 1000, commit_options[:max_commit_delay]
43+
end
44+
3545
def test_no_nested_transactions
3646
transaction.begin
3747

test/activerecord_spanner_mock_server/inline_begin_transaction_test.rb

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,25 @@ def test_read_write_transaction_uses_inlined_begin
3535
}
3636
end
3737

38+
def test_read_write_transaction_with_commit_options
39+
insert_sql = register_insert_singer_result
40+
options_to_test = { return_commit_stats: true, max_commit_delay: 1000 }
41+
# Start a transaction, passing the commit_options.
42+
ActiveRecord::Base.transaction commit_options: options_to_test do
43+
Singer.create(first_name: "Test", last_name: "User")
44+
end
45+
# Find the CommitRequest sent to the mock server.
46+
commit_requests = @mock.requests.select { |req| req.is_a?(Google::Cloud::Spanner::V1::CommitRequest) }
47+
assert_equal 1, commit_requests.length
48+
commit_request = commit_requests.first
49+
refute_nil commit_request
50+
51+
# Assert that the commit_options are present and have the correct values.
52+
assert_equal true, commit_request.return_commit_stats
53+
refute_nil commit_request.max_commit_delay
54+
assert_equal 1, commit_request.max_commit_delay.seconds
55+
end
56+
3857
def test_read_write_transaction_aborted_dml_is_automatically_retried_with_inline_begin
3958
insert_sql = register_insert_singer_result
4059

0 commit comments

Comments
 (0)