Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replay optimized postgres persistor fixes #403

Merged
merged 20 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
938b3a1
Ensure every record class has an index on aggregate_id as documented
erikrozendaal Jan 24, 2024
8a7873a
Add simple performance test to ensure index is used correctly
erikrozendaal Jan 24, 2024
5ca8bcd
Indexes should not use hashed keys
erikrozendaal Jan 24, 2024
de4b0f5
Optimize index lookup
erikrozendaal Jan 24, 2024
6528da5
Simplify index key generation
erikrozendaal Jan 24, 2024
c5d52f7
Rubocop Hash#compare_by_identity instead of using object_id
erikrozendaal Jan 24, 2024
a47d02a
More (ugly) rubocop fixes
erikrozendaal Jan 24, 2024
5745993
Do not replace the reverse index compare by identity hash
erikrozendaal Jan 25, 2024
754685c
Remove some unnecessary string to symbol conversion
erikrozendaal Jan 25, 2024
26c3168
Use meta-programming instead of eval to generate structs
erikrozendaal Jan 25, 2024
0d97408
Make the struct cache a normal instance variable
erikrozendaal Jan 25, 2024
614e62f
Clarified overriding of equality/hash for in-memory structs
erikrozendaal Jan 25, 2024
8334836
Store indexed records using sets as well
erikrozendaal Jan 25, 2024
3af3926
Fix context scope
erikrozendaal Jan 25, 2024
d891a29
Compare values indifferently w.r.t. strings and symbols
erikrozendaal Jan 25, 2024
56633ee
Use one index per indexed attribute
erikrozendaal Jan 29, 2024
cf0decb
Use symbols instead of strings as index field names
erikrozendaal Jan 29, 2024
90dd671
Store index fields as symbols instead of strings
erikrozendaal Jan 29, 2024
03f4a3f
Split where-clause into indexed and non-indexed columns
erikrozendaal Jan 29, 2024
67f8ff0
Optimize record set union and intersection
erikrozendaal Jan 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 42 additions & 46 deletions lib/sequent/core/persistors/replay_optimized_postgres_persistor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,39 +72,34 @@ def set_values(values)
class Index
def initialize(indexed_columns)
@indexed_columns = Hash.new do |hash, record_class|
hash[record_class] = if record_class.column_names.include? 'aggregate_id'
['aggregate_id']
else
[]
end
hash[record_class] = default_indexes(record_class)
end

@indexed_columns = @indexed_columns.merge(
indexed_columns.reduce({}) do |memo, (key, ics)|
memo.merge({key => ics.map { |c| c.map(&:to_s) }})
end,
)
indexed_columns.each do |record_class, indexes|
normalized = indexes.map { |index| index.map(&:to_s).sort }
@indexed_columns[record_class] = (normalized + default_indexes(record_class)).uniq
end

@index = {}
@reverse_index = {}
@reverse_index = {}.compare_by_identity
end

def add(record_class, record)
return unless indexed?(record_class)

get_keys(record_class, record).each do |key|
@index[key.hash] = [] unless @index.key? key.hash
@index[key.hash] << record
@index[key] = [] unless @index.key? key
@index[key] << record

@reverse_index[record.object_id.hash] = [] unless @reverse_index.key? record.object_id.hash
@reverse_index[record.object_id.hash] << key.hash
@reverse_index[record] = [] unless @reverse_index.key? record
@reverse_index[record] << key
end
end

def remove(record_class, record)
return unless indexed?(record_class)

keys = @reverse_index.delete(record.object_id.hash) { [] }
keys = @reverse_index.delete(record) { [] }

return unless keys.any?

Expand All @@ -120,12 +115,12 @@ def update(record_class, record)
end

def find(record_class, where_clause)
key = [record_class.name]
get_index(record_class, where_clause).each do |field|
key << field
key << where_clause.stringify_keys[field]
end
@index[key.hash] || []
index = get_index(record_class, where_clause)
return nil unless index

normalized_where_clause = where_clause.stringify_keys
key = [record_class.name, index] + index.map { |field| normalized_where_clause[field] }
@index[key] || []
end

def clear
Expand All @@ -134,29 +129,34 @@ def clear
end

def use_index?(record_class, where_clause)
@indexed_columns.key?(record_class) && get_index(record_class, where_clause).present?
indexed?(record_class) && get_index(record_class, where_clause).present?
end

private

def indexed?(record_class)
@indexed_columns.key?(record_class)
# Do not use `key?` here or similar, since the
# `@indexed_columns#default_proc` automatically adds new
# indexes as required.
@indexed_columns[record_class].present?
end

def get_keys(record_class, record)
@indexed_columns[record_class].map do |index|
arr = [record_class.name]
index.each do |key|
arr << key
arr << record[key]
end
arr
[record_class.name, index] + index.map { |field| record[field] }
end
end

def get_index(record_class, where_clause)
@indexed_columns[record_class].find do |indexed_where|
where_clause.keys.size == indexed_where.size && (where_clause.keys.map(&:to_s) - indexed_where).empty?
where_clause_keys = where_clause.keys.map(&:to_s).sort
@indexed_columns[record_class].find { |index| index == where_clause_keys }
end

# Default indexes for +record_class+: a single index on `aggregate_id`
# when the record class has that column, otherwise no indexes at all.
#
# @return [Array<Array<String>>] list of indexes (each a list of column names)
def default_indexes(record_class)
  return [] unless record_class.column_names.include? 'aggregate_id'

  [['aggregate_id']]
end
end
Expand Down Expand Up @@ -287,22 +287,18 @@ def do_with_record(record_class, where_clause)
end

def find_records(record_class, where_clause)
if @record_index.use_index?(record_class, where_clause)
@record_index.find(record_class, where_clause)
else
@record_store[record_class].select do |record|
where_clause.all? do |k, v|
expected_value = v.is_a?(Symbol) ? v.to_s : v
actual_value = record[k.to_sym]
actual_value = actual_value.to_s if actual_value.is_a? Symbol
if expected_value.is_a?(Array)
expected_value.include?(actual_value)
else
actual_value == expected_value
end
(@record_index.find(record_class, where_clause) || @record_store[record_class].select do |record|
lvonk marked this conversation as resolved.
Show resolved Hide resolved
where_clause.all? do |k, v|
expected_value = v.is_a?(Symbol) ? v.to_s : v
actual_value = record[k.to_sym]
actual_value = actual_value.to_s if actual_value.is_a? Symbol
if expected_value.is_a?(Array)
expected_value.include?(actual_value)
else
actual_value == expected_value
end
end
end.dup
end).dup
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this dup only needs to happen when the records are found in an index. So I propose moving this to Index#find.

Also, can it just be freeze and let the caller dup if required? Or is this too much of a trap?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this dup a bit weird indeed. I can't really recall why this is needed, and can't find anything in old commit messages unfortunately. It has been in there since the beginning. Since it is only the array that is duped, not the objects in it, I am not sure what it is supposed to protect against....

end

def last_record(record_class, where_clause)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ def initialize
end
end

# Measures the wall-clock duration of the given block in seconds.
#
# Uses the monotonic clock so the measurement is immune to system
# clock adjustments (NTP slews, DST, manual changes).
#
# @return [Float] elapsed time in seconds
def measure_elapsed_time(&block)
  starting = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  # `yield` runs the block; the original `yield block` needlessly passed
  # the block proc to itself as an (ignored) argument.
  yield
  ending = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  ending - starting
end

describe Sequent::Core::Persistors::ReplayOptimizedPostgresPersistor do
let(:persistor) { Sequent::Core::Persistors::ReplayOptimizedPostgresPersistor.new }
let(:record_class) { Sequent::Core::EventRecord }
Expand Down Expand Up @@ -378,17 +385,64 @@ class ReplayOptimizedPostgresTest < Sequent::ApplicationRecord; end
end
end

describe Sequent::Core::Persistors::ReplayOptimizedPostgresPersistor::Index do
describe '#use_index?' do
let(:indices) { [] }
let(:index) do
Sequent::Core::Persistors::ReplayOptimizedPostgresPersistor::Index.new(
{
Sequent::Core::EventRecord => indices,
},
context 'with thousands of records' do
COUNT = 1000
ITERATIONS = 10
MAX_TIME_S = 1

let(:persistor) do
Sequent::Core::Persistors::ReplayOptimizedPostgresPersistor.new(
50,
{
Sequent::Core::EventRecord => [%i[id command_record_id], %i[id sequence_number]],
},
)
end
let(:aggregate_ids) { (0...COUNT).map { Sequent.new_uuid } }

before do
aggregate_ids.each_with_index do |aggregate_id, i|
persistor.create_record(
Sequent::Core::EventRecord,
{id: i, aggregate_id: aggregate_id, command_record_id: i * 7},
)
end
end

it 'performs well using an aggregate_id lookup' do
elapsed = measure_elapsed_time do
ITERATIONS.times do
aggregate_ids.each do |aggregate_id|
expect(persistor.get_record(Sequent::Core::EventRecord, {aggregate_id: aggregate_id})).to be_present
end
end
end
expect(elapsed).to be <= MAX_TIME_S
end

it 'performs well using a multi-index lookup' do
elapsed = measure_elapsed_time do
ITERATIONS.times do
(0...COUNT).each do |i|
expect(persistor.get_record(Sequent::Core::EventRecord, {id: i, command_record_id: i * 7})).to be_present
end
end
end
expect(elapsed).to be <= MAX_TIME_S
end
end

describe Sequent::Core::Persistors::ReplayOptimizedPostgresPersistor::Index do
let(:indices) { [] }
let(:index) do
Sequent::Core::Persistors::ReplayOptimizedPostgresPersistor::Index.new(
{
Sequent::Core::EventRecord => indices,
},
)
end

describe '#use_index?' do
context 'symbolized single indices' do
let(:indices) { [[:id]] }
it 'uses the index for strings and symbols where clause' do
Expand Down Expand Up @@ -427,6 +481,27 @@ class ReplayOptimizedPostgresTest < Sequent::ApplicationRecord; end
expect(index.use_index?(Sequent::Core::EventRecord, {sequence_number: 1})).to be_falsey
expect(index.use_index?(Sequent::Core::EventRecord, {id: 1, sequence_number: 1})).to be_falsey
end

context 'duplicate indexes' do
let(:indices) { [%i[aggregate_id], %i[command_record_id id], %i[id command_record_id]] }
it 'are removed' do
expect(index.instance_variable_get(:@indexed_columns)[Sequent::Core::EventRecord])
.to match_array [['aggregate_id'], %w[command_record_id id]]
end
end
end

context 'default index when record class is specified' do
it 'adds a default index for aggregate_id' do
expect(index.use_index?(Sequent::Core::EventRecord, {aggregate_id: 1})).to be_truthy
end
end

context 'default index when record class is not specified' do
let(:index) { Sequent::Core::Persistors::ReplayOptimizedPostgresPersistor::Index.new({}) }
it 'adds a default index for aggregate_id' do
expect(index.use_index?(Sequent::Core::EventRecord, {aggregate_id: 1})).to be_truthy
end
end

context 'where clause order' do
Expand All @@ -437,5 +512,24 @@ class ReplayOptimizedPostgresTest < Sequent::ApplicationRecord; end
end
end
end

context 'duplicate hash values' do
# Test fixture: a value object whose #hash always collides, forcing the
# index to distinguish records via #eql? rather than hash codes alone.
# Assigning Struct.new to a constant avoids the superfluous anonymous
# superclass created by `class BadHash < Struct.new(:value)`.
BadHash = Struct.new(:value) do
  def hash
    0
  end
end

it 'should not match records even when hash collision occurs' do
one = persistor.create_record(Sequent::Core::EventRecord, aggregate_id: BadHash.new(1), sequence_number: 1)
two = persistor.create_record(Sequent::Core::EventRecord, aggregate_id: BadHash.new(2), sequence_number: 1)

index.add(Sequent::Core::EventRecord, one)
index.add(Sequent::Core::EventRecord, two)

expect(index.find(Sequent::Core::EventRecord, {aggregate_id: one.aggregate_id})).to match_array [one]
expect(index.find(Sequent::Core::EventRecord, {aggregate_id: two.aggregate_id})).to match_array [two]
end
end
end
end
Loading