Skip to content

Commit

Permalink
Merge pull request #435 from zilverline/avoid-exhausting-type-ids
Browse files Browse the repository at this point in the history
Avoid exhausting type ids due to transaction rollbacks
erikrozendaal authored Jan 17, 2025

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
2 parents a7abf4d + 217e61d commit 6db3b3a
Showing 3 changed files with 66 additions and 31 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -21,6 +21,15 @@
is used by `CommandHandlerHelpers` but in the future might have
other uses (such as tailing the event store as events are being
committed).
- Use `max(id) + 1` query to assign type ids to avoid exhausing the
SMALLINT range. Sequences are not rolled back when a transaction is
aborted so type ids may become exhausted. This is mainly a problem
in development when running tests, but could occur in production if
a transaction that adds a new type is buggy and gets rolled back
multiple times.

Re-apply the `sequent_pgsql.sql` file to your Sequent schema to
apply this fix.

# Changelog 8.0.2 (changes since 8.0.1)

82 changes: 54 additions & 28 deletions db/sequent_pgsql.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
ALTER TABLE aggregate_types ALTER COLUMN id DROP IDENTITY IF EXISTS;
ALTER TABLE command_types ALTER COLUMN id DROP IDENTITY IF EXISTS;
ALTER TABLE event_types ALTER COLUMN id DROP IDENTITY IF EXISTS;

DROP TYPE IF EXISTS aggregate_event_type CASCADE;
CREATE TYPE aggregate_event_type AS (
aggregate_type text,
@@ -95,19 +99,60 @@ BEGIN
END;
$$;

CREATE OR REPLACE FUNCTION store_command(_command jsonb) RETURNS bigint
CREATE OR REPLACE PROCEDURE update_types(_command jsonb, _aggregates_with_events jsonb)
LANGUAGE plpgsql AS $$
DECLARE
_id commands.id%TYPE;
_command_json jsonb = _command->'command_json';
_type TEXT;
BEGIN
IF NOT EXISTS (SELECT 1 FROM command_types t WHERE t.type = _command->>'command_type') THEN
-- Only try inserting if it doesn't exist to avoid exhausting the id sequence
INSERT INTO command_types (type)
VALUES (_command->>'command_type')
ON CONFLICT DO NOTHING;
-- Only when new types are added is this path executed, which should be rare. We do not use a sequence here to avoid
-- wasting ids (which are limited for this table) on rollback, we lock the table and select the minimum next id.
LOCK TABLE command_types IN ACCESS EXCLUSIVE MODE;
INSERT INTO command_types (id, type)
VALUES ((SELECT COALESCE(MAX(id) + 1, 1) FROM command_types), _command->>'command_type')
ON CONFLICT (type) DO NOTHING;
END IF;

FOR _type IN (
SELECT DISTINCT row->0->>'aggregate_type' AS type
FROM jsonb_array_elements(_aggregates_with_events) AS row
EXCEPT
SELECT type FROM aggregate_types
ORDER BY 1
) LOOP
-- Only when new types are added is this path executed, which should be rare. We do not use a sequence here to avoid
-- wasting ids (which are limited for this table) on rollback, we lock the table and select the minimum next id.
LOCK TABLE command_types IN ACCESS EXCLUSIVE MODE;
LOCK TABLE aggregate_types IN ACCESS EXCLUSIVE MODE;
INSERT INTO aggregate_types (id, type) VALUES ((SELECT COALESCE(MAX(id) + 1, 1) FROM aggregate_types), _type)
ON CONFLICT (type) DO NOTHING;
END LOOP;

FOR _type IN (
SELECT DISTINCT events->>'event_type' AS type
FROM jsonb_array_elements(_aggregates_with_events) AS row
CROSS JOIN LATERAL jsonb_array_elements(row->1) AS events
EXCEPT
SELECT type FROM event_types
ORDER BY 1
) LOOP
-- Only when new types are added is this path executed, which should be rare. We do not use a sequence here to avoid
-- wasting ids (which are limited for this table) on rollback, we lock the table and select the minimum next id.
LOCK TABLE command_types IN ACCESS EXCLUSIVE MODE;
LOCK TABLE aggregate_types IN ACCESS EXCLUSIVE MODE;
LOCK TABLE event_types IN ACCESS EXCLUSIVE MODE;
INSERT INTO event_types (id, type) VALUES ((SELECT COALESCE(MAX(id) + 1, 1) FROM event_types), _type)
ON CONFLICT (type) DO NOTHING;
END LOOP;
END;
$$;

CREATE OR REPLACE FUNCTION store_command(_command jsonb) RETURNS bigint
LANGUAGE plpgsql AS $$
DECLARE
_id commands.id%TYPE;
_command_json jsonb = _command->'command_json';
BEGIN
INSERT INTO commands (
created_at, user_id, aggregate_id, command_type_id, command_json,
event_aggregate_id, event_sequence_number
@@ -137,28 +182,9 @@ DECLARE
_snapshot_outdated_at aggregates_that_need_snapshots.snapshot_outdated_at%TYPE;
_unique_keys jsonb;
BEGIN
_command_id = store_command(_command);
CALL update_types(_command, _aggregates_with_events);

WITH types AS (
SELECT DISTINCT row->0->>'aggregate_type' AS type
FROM jsonb_array_elements(_aggregates_with_events) AS row
)
INSERT INTO aggregate_types (type)
SELECT type FROM types
WHERE type NOT IN (SELECT type FROM aggregate_types)
ORDER BY 1
ON CONFLICT DO NOTHING;

WITH types AS (
SELECT DISTINCT events->>'event_type' AS type
FROM jsonb_array_elements(_aggregates_with_events) AS row
CROSS JOIN LATERAL jsonb_array_elements(row->1) AS events
)
INSERT INTO event_types (type)
SELECT type FROM types
WHERE type NOT IN (SELECT type FROM event_types)
ORDER BY 1
ON CONFLICT DO NOTHING;
_command_id = store_command(_command);

FOR _aggregate IN SELECT row->0 FROM jsonb_array_elements(_aggregates_with_events) AS row LOOP
_aggregate_id = _aggregate->>'aggregate_id';
6 changes: 3 additions & 3 deletions db/sequent_schema_tables.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
CREATE TABLE command_types (id SMALLINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, type text UNIQUE NOT NULL);
CREATE TABLE aggregate_types (id SMALLINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, type text UNIQUE NOT NULL);
CREATE TABLE event_types (id SMALLINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, type text UNIQUE NOT NULL);
CREATE TABLE command_types (id SMALLINT PRIMARY KEY, type text UNIQUE NOT NULL);
CREATE TABLE aggregate_types (id SMALLINT PRIMARY KEY, type text UNIQUE NOT NULL);
CREATE TABLE event_types (id SMALLINT PRIMARY KEY, type text UNIQUE NOT NULL);

CREATE SEQUENCE IF NOT EXISTS commands_id_seq;

0 comments on commit 6db3b3a

Please sign in to comment.