Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds pgt_outbox_setup and pgt_outbox_events for Transactional Outbox Pattern #19

Closed
wants to merge 7 commits into from
49 changes: 49 additions & 0 deletions README.rdoc
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,55 @@ function :: The name of the trigger function to call to log changes
Note that it is probably a bad idea to use the same table argument
to both +pgt_json_audit_log_setup+ and +pgt_json_audit_log+.

=== Transactional Outbox Events - pgt_outbox_setup and pgt_outbox_events

These methods setup an outbox table and write events to it when
writes happen to the watched table.

==== pgt_outbox_setup

This creates an outbox table and a trigger function that will write
event data to the outbox table. This returns the name of the
trigger function created, which should be passed to
+pgt_outbox_events+.

Arguments:
table :: The name of the table storing the audit logs.

Options:
function_name :: The name of the trigger function
outbox_table :: The name for the outbox table. Defaults to table_outbox
event_prefix :: The prefix to use for event_type, defaults to table_ (table_updated, table_created, table_deleted)
boolean_completed_column :: If this is true, the :completed column will be boolean, otherwise it will be Time
uuid_primary_key :: Use a uuid type for the primary key of the outbox table
uuid_function :: The pl/pgsql function name to use for generating a uuid pkey. defaults to :uuid_generate_v4
function_opts :: Options to pass to +create_function+ when creating the trigger function.
Column Name Options: (column type in parenthesis)
created_column :: defaults to :created (Time)
updated_column :: defaults to :updated (Time)
attempts_column :: defaults to :attempts (Integer)
attempted_column :: defaults to :attempted (Time)
completed_column :: defaults to :completed (Boolean or Time, depending on :boolean_completed_column)
event_type_column :: defaults to :event_type (String)
last_error_column :: defaults to :last_error (String)
data_before_column :: defaults to :data_before (jsonb)
data_after_column :: defaults to :data_after (jsonb)
metadata_column :: defaults to :metadata (jsonb)

==== pgt_outbox_events

This adds a trigger to the table that will store events in the outbox table
when updates occur on the table (and match the filter).

Arguments:
table :: The name of the table to audit
function :: The name of the trigger function to call to log changes (usually returned from pgt_outbox_setup)

Options:
events :: The events to care about. Defaults to [:updated, :deleted, :created] (all writes)
trigger_name :: The name for the trigger
when :: A filter for the trigger, where clause if you will

== Caveats

If you have defined counter or sum cache triggers using this library
Expand Down
72 changes: 72 additions & 0 deletions lib/sequel/extensions/pg_triggers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,78 @@ def pgt_foreign_key_array(opts={})
SQL
end

def pgt_outbox_setup(table, opts={})
function_name = opts.fetch(:function_name, "pgt_outbox_#{pgt_mangled_table_name(table)}")
outbox_table = opts.fetch(:outbox_table, "#{table}_outbox")
quoted_outbox = quote_schema_table(outbox_table)
event_prefix = opts.fetch(:event_prefix, table)
created_column = opts.fetch(:created_column, :created)
updated_column = opts.fetch(:updated_column, :updated)
completed_column = opts.fetch(:completed_column, :completed)
attempts_column = opts.fetch(:attempts_column, :attempts)
attempted_column = opts.fetch(:attempted_column, :attempted)
event_type_column = opts.fetch(:event_type_column, :event_type)
last_error_column = opts.fetch(:last_error_column, :last_error)
data_after_column = opts.fetch(:data_after_column, :data_after)
data_before_column = opts.fetch(:data_before_column, :data_before)
metadata_column = opts.fetch(:metadata_column, :metadata)
boolean_completed_column = opts.fetch(:boolean_completed_column, false)
uuid_primary_key = opts.fetch(:uuid_primary_key, false)
run 'CREATE EXTENSION IF NOT EXISTS "uuid-ossp"' if uuid_primary_key
create_table(outbox_table) do
if uuid_primary_key
uuid_function = opts.fetch(:uuid_function, :uuid_generate_v4)
uuid :id, default: Sequel.function(uuid_function), primary_key: true
else
primary_key :id
end
Integer attempts_column, null: false, default: 0
Time created_column
Time updated_column
Time attempted_column
if boolean_completed_column
FalseClass completed_column, null: false, default: false
else
Time completed_column
end
String event_type_column, null: false
String last_error_column
jsonb data_before_column
jsonb data_after_column
jsonb metadata_column
index Sequel.asc(created_column)
index Sequel.desc(attempted_column)
end
pgt_created_at outbox_table, created_column
pgt_updated_at outbox_table, updated_column
function_opts = { language: :plpgsql, returns: :trigger, replace: true }.merge(opts.fetch(:function_opts, {}))
create_function(function_name, <<-SQL, function_opts)
BEGIN
#{pgt_pg_trigger_depth_guard_clause(opts)}
IF (TG_OP = 'INSERT') THEN
INSERT INTO #{quoted_outbox} ("#{event_type_column}", "#{data_after_column}") VALUES
('#{event_prefix}_created', to_jsonb(NEW));
RETURN NEW;
ELSIF (TG_OP = 'UPDATE') THEN
INSERT INTO #{quoted_outbox} ("#{event_type_column}", "#{data_before_column}", "#{data_after_column}") VALUES
('#{event_prefix}_updated', to_jsonb(OLD), to_jsonb(NEW));
RETURN NEW;
ELSIF (TG_OP = 'DELETE') THEN
INSERT INTO #{quoted_outbox} ("#{event_type_column}", "#{data_before_column}") VALUES
('#{event_prefix}_deleted', to_jsonb(OLD));
RETURN OLD;
END IF;
END;
SQL
function_name
end

def pgt_outbox_events(table, function, opts={})
events = opts.fetch(:events, [:insert, :update, :delete])
trigger_name = opts.fetch(:trigger_name, "pgt_outbox_#{pgt_mangled_table_name(table)}")
create_trigger(table, trigger_name, function, events: events, replace: true, each_row: true, after: true, when: opts[:when])
end

private

# Add or replace a function that returns trigger to handle the action,
Expand Down
135 changes: 135 additions & 0 deletions spec/sequel_postgresql_triggers_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -788,3 +788,138 @@
h.must_equal(:schema=>"public", :table=>"accounts", :action=>"DELETE", :prior=>{"a"=>3, "id"=>2})
end
end if DB.server_version >= 90400

describe "Basic PostgreSQL Transactional Outbox" do
before do
DB.extension :pg_json
DB.create_table(:accounts){integer :id; String :s}
function_name = DB.pgt_outbox_setup(:accounts, :function_name=>:spgt_outbox_events)
DB.pgt_outbox_events(:accounts, function_name)
@logs = DB[:accounts_outbox].reverse(:created)
end

after do
DB.drop_table(:accounts, :accounts_outbox)
DB.drop_function(:spgt_outbox_events)
end

it "should store outbox events for writes on main table" do
@logs.first.must_be_nil

ds = DB[:accounts]
ds.insert(id: 1, s: 'string')
ds.all.must_equal [{id: 1, s: 'string'}]
h = @logs.first
h.delete(:created).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
h.delete(:updated).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
h.must_equal(id: 1, attempts: 0, attempted: nil, completed: nil, event_type: "accounts_created", last_error: nil, data_before: nil, data_after: {"s" => "string", "id" => 1}, metadata: nil)

ds.where(id: 1).update(s: 'string2')
ds.all.must_equal [{id: 1, s: 'string2'}]
h = @logs.first
h.delete(:created).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
h.delete(:updated).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
h.must_equal(id: 2, attempts: 0, attempted: nil, completed: nil, event_type: "accounts_updated", last_error: nil, data_before: {"s" => "string", "id" => 1}, data_after: {"s" => "string2", "id" => 1}, metadata: nil)

ds.delete
ds.all.must_equal []
h = @logs.first
h.delete(:created).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
h.delete(:updated).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
h.must_equal(id: 3, attempts: 0, attempted: nil, completed: nil, event_type: "accounts_deleted", last_error: nil, data_before: {"s" => "string2", "id" => 1}, data_after: nil, metadata: nil)
end
end if DB.server_version >= 90400

describe "PostgreSQL Transactional Outbox With UUID Pkey" do
before do
DB.extension :pg_json
DB.create_table(:accounts){integer :id; String :s}
function_name = DB.pgt_outbox_setup(:accounts, uuid_primary_key: true, function_name: :spgt_outbox_events)
DB.pgt_outbox_events(:accounts, function_name)
@logs = DB[:accounts_outbox].reverse(:created)
end

after do
DB.drop_table(:accounts, :accounts_outbox)
DB.drop_function(:spgt_outbox_events)
end

it "should store outbox events for writes on main table" do
@logs.first.must_be_nil

ds = DB[:accounts]
ds.insert(id: 1, s: 'string')
ds.all.must_equal [{id: 1, s: 'string'}]
h = @logs.first
h.delete(:created).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
h.delete(:updated).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
id = h.delete(:id)
id.must_match(/\A\h{8}-\h{4}-\h{4}-\h{4}-\h{12}\z/)
h.must_equal(attempts: 0, attempted: nil, completed: nil, event_type: "accounts_created", last_error: nil, data_before: nil, data_after: {"s" => "string", "id" => 1}, metadata: nil)

ds.where(id: 1).update(s: 'string2')
ds.all.must_equal [{id: 1, s: 'string2'}]
h = @logs.first
h.delete(:created).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
h.delete(:updated).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
id = h.delete(:id)
id.must_match(/\A\h{8}-\h{4}-\h{4}-\h{4}-\h{12}\z/)
h.must_equal(attempts: 0, attempted: nil, completed: nil, event_type: "accounts_updated", last_error: nil, data_before: {"s" => "string", "id" => 1}, data_after: {"s" => "string2", "id" => 1}, metadata: nil)

ds.delete
ds.all.must_equal []
h = @logs.first
h.delete(:created).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
h.delete(:updated).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
id = h.delete(:id)
id.must_match(/\A\h{8}-\h{4}-\h{4}-\h{4}-\h{12}\z/)
h.must_equal(attempts: 0, attempted: nil, completed: nil, event_type: "accounts_deleted", last_error: nil, data_before: {"s" => "string2", "id" => 1}, data_after: nil, metadata: nil)
end
end if DB.server_version >= 90400

describe "PostgreSQL Transactional Outbox With Boolean :completed field" do
before do
DB.extension :pg_json
DB.create_table(:accounts){integer :id; String :s}
function_name = DB.pgt_outbox_setup(:accounts, uuid_primary_key: true, boolean_completed_column: true, function_name: :spgt_outbox_events)
DB.pgt_outbox_events(:accounts, function_name)
@logs = DB[:accounts_outbox].reverse(:created)
end

after do
DB.drop_table(:accounts, :accounts_outbox)
DB.drop_function(:spgt_outbox_events)
end

it "should store outbox events for writes on main table" do
@logs.first.must_be_nil

ds = DB[:accounts]
ds.insert(id: 1, s: 'string')
ds.all.must_equal [{id: 1, s: 'string'}]
h = @logs.first
h.delete(:created).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
h.delete(:updated).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
id = h.delete(:id)
id.must_match(/\A\h{8}-\h{4}-\h{4}-\h{4}-\h{12}\z/)
h.must_equal(attempts: 0, attempted: nil, completed: false, event_type: "accounts_created", last_error: nil, data_before: nil, data_after: {"s" => "string", "id" => 1}, metadata: nil)

ds.where(id: 1).update(s: 'string2')
ds.all.must_equal [{id: 1, s: 'string2'}]
h = @logs.first
h.delete(:created).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
h.delete(:updated).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
id = h.delete(:id)
id.must_match(/\A\h{8}-\h{4}-\h{4}-\h{4}-\h{12}\z/)
h.must_equal(attempts: 0, attempted: nil, completed: false, event_type: "accounts_updated", last_error: nil, data_before: {"s" => "string", "id" => 1}, data_after: {"s" => "string2", "id" => 1}, metadata: nil)

ds.delete
ds.all.must_equal []
h = @logs.first
h.delete(:created).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
h.delete(:updated).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
id = h.delete(:id)
id.must_match(/\A\h{8}-\h{4}-\h{4}-\h{4}-\h{12}\z/)
h.must_equal(attempts: 0, attempted: nil, completed: false, event_type: "accounts_deleted", last_error: nil, data_before: {"s" => "string2", "id" => 1}, data_after: nil, metadata: nil)
end
end if DB.server_version >= 90400