Skip to content

Commit 1b04c70

Browse files
authored
feat: Adding support for exclude_txn_from_change_streams option (#368)
* feat: Adding support for exclude_txn_from_change_streams option This allows developers to selectively prevent specific transactions from being recorded in a database's change stream. Usage Example: # This transaction will NOT be captured by any change stream. Singer.transaction(exclude_txn_from_change_streams: true) do # Perform high-volume or low-value writes here. Singer.upsert_all(...) end Fixes #367
1 parent 0a1020a commit 1b04c70

File tree

5 files changed

+52
-7
lines changed

5 files changed

+52
-7
lines changed

lib/active_record/connection_adapters/spanner/database_statements.rb

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,10 @@ def execute_ddl statements
231231

232232
def transaction requires_new: nil, isolation: nil, joinable: true, **kwargs
233233
commit_options = kwargs.delete :commit_options
234-
234+
exclude_from_streams = kwargs.delete :exclude_txn_from_change_streams
235+
@_spanner_begin_transaction_options = {
236+
exclude_txn_from_change_streams: exclude_from_streams
237+
}
235238
if !requires_new && current_transaction.joinable?
236239
return super
237240
end
@@ -253,6 +256,9 @@ def transaction requires_new: nil, isolation: nil, joinable: true, **kwargs
253256
retry
254257
end
255258
raise
259+
ensure
260+
# Clean up the instance variable to avoid leaking options.
261+
@_spanner_begin_transaction_options = nil
256262
end
257263
end
258264

@@ -272,7 +278,8 @@ def transaction_isolation_levels
272278

273279
def begin_db_transaction
274280
log "BEGIN" do
275-
@connection.begin_transaction
281+
opts = @_spanner_begin_transaction_options || {}
282+
@connection.begin_transaction nil, **opts
276283
end
277284
end
278285

@@ -306,7 +313,8 @@ def begin_isolated_db_transaction isolation
306313
end
307314

308315
log "BEGIN #{isolation}" do
309-
@connection.begin_transaction isolation
316+
opts = @_spanner_begin_transaction_options || {}
317+
@connection.begin_transaction isolation, **opts
310318
end
311319
end
312320

lib/activerecord_spanner_adapter/connection.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -276,9 +276,11 @@ def create_transaction_after_failed_first_statement original_error
276276

277277
# Transactions
278278

279-
def begin_transaction isolation = nil
279+
def begin_transaction isolation = nil, **options
280280
raise "Nested transactions are not allowed" if current_transaction&.active?
281-
self.current_transaction = Transaction.new self, isolation || @isolation_level
281+
exclude_from_streams = options.fetch :exclude_txn_from_change_streams, false
282+
self.current_transaction = Transaction.new self, isolation || @isolation_level,
283+
exclude_txn_from_change_streams: exclude_from_streams
282284
current_transaction.begin
283285
current_transaction
284286
end

lib/activerecord_spanner_adapter/transaction.rb

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,20 @@ module ActiveRecordSpannerAdapter
88
class Transaction
99
attr_reader :state
1010
attr_reader :commit_options
11+
attr_reader :begin_transaction_selector
12+
attr_accessor :exclude_txn_from_change_streams
1113

1214

1315

14-
def initialize connection, isolation, commit_options = nil
16+
def initialize connection, isolation, commit_options = nil, exclude_txn_from_change_streams: false
1517
@connection = connection
1618
@isolation = isolation
1719
@committable = ![:read_only, :pdml].include?(isolation) && !isolation.is_a?(Hash)
1820
@state = :INITIALIZED
1921
@sequence_number = 0
2022
@mutations = []
2123
@commit_options = commit_options
24+
@exclude_txn_from_change_streams = exclude_txn_from_change_streams
2225
end
2326

2427
def active?
@@ -63,7 +66,8 @@ def begin
6366
@begin_transaction_selector = Google::Cloud::Spanner::V1::TransactionSelector.new \
6467
begin: Google::Cloud::Spanner::V1::TransactionOptions.new(
6568
read_write: Google::Cloud::Spanner::V1::TransactionOptions::ReadWrite.new,
66-
isolation_level: grpc_isolation
69+
isolation_level: grpc_isolation,
70+
exclude_txn_from_change_streams: @exclude_txn_from_change_streams
6771
)
6872
end
6973
@state = :STARTED

test/activerecord_spanner_adapter/transaction_test.rb

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,15 @@ def test_commit_options
4242
assert_equal 1000, commit_options[:max_commit_delay]
4343
end
4444

45+
def test_exclude_txn_from_change_streams
46+
transaction.exclude_txn_from_change_streams = true
47+
assert transaction.exclude_txn_from_change_streams
48+
transaction.begin
49+
assert_equal true, transaction.begin_transaction_selector.begin.exclude_txn_from_change_streams
50+
transaction.commit
51+
assert_equal :COMMITTED, transaction.state
52+
end
53+
4554
def test_no_nested_transactions
4655
transaction.begin
4756

test/activerecord_spanner_mock_server/spanner_active_record_with_mock_server_test.rb

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1335,6 +1335,28 @@ def test_upsert_all_dml
13351335
assert_equal 1, execute_requests.length
13361336
end
13371337

1338+
def test_create_with_sequence_and_exclude_from_change_streams
1339+
sql = "INSERT INTO `table_with_sequence` (`name`) VALUES (@p1) THEN RETURN `id`"
1340+
@mock.put_statement_result sql, MockServerTests::create_id_returning_result_set(1, 1)
1341+
1342+
record = TableWithSequence.transaction(exclude_txn_from_change_streams: true) do
1343+
TableWithSequence.create(name: "Foo")
1344+
end
1345+
1346+
assert_equal 1, record.id
1347+
execute_requests = @mock.requests.select do |req|
1348+
req.is_a?(Google::Cloud::Spanner::V1::ExecuteSqlRequest) && req.sql == sql
1349+
end
1350+
1351+
assert_equal 1, execute_requests.length
1352+
exec_req = execute_requests.first
1353+
1354+
refute_nil exec_req.transaction
1355+
begin_opts = exec_req.transaction&.begin
1356+
refute_nil begin_opts
1357+
assert_equal true, begin_opts.exclude_txn_from_change_streams
1358+
end
1359+
13381360
def test_binary_id
13391361
user = User.create!(
13401362

0 commit comments

Comments
 (0)