diff --git a/CHANGELOG.md b/CHANGELOG.md index ae14d96d..59f63929 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,15 @@ is used by `CommandHandlerHelpers` but in the future might have other uses (such as tailing the event store as events are being committed). +- Use `max(id) + 1` query to assign type ids to avoid exhausing the + SMALLINT range. Sequences are not rolled back when a transaction is + aborted so type ids may become exhausted. This is mainly a problem + in development when running tests, but could occur in production if + a transaction that adds a new type is buggy and gets rolled back + multiple times. + + Re-apply the `sequent_pgsql.sql` file to your Sequent schema to + apply this fix. # Changelog 8.0.2 (changes since 8.0.1) diff --git a/db/sequent_pgsql.sql b/db/sequent_pgsql.sql index 93eff3f6..27d422d9 100644 --- a/db/sequent_pgsql.sql +++ b/db/sequent_pgsql.sql @@ -1,3 +1,7 @@ +ALTER TABLE aggregate_types ALTER COLUMN id DROP IDENTITY IF EXISTS; +ALTER TABLE command_types ALTER COLUMN id DROP IDENTITY IF EXISTS; +ALTER TABLE event_types ALTER COLUMN id DROP IDENTITY IF EXISTS; + DROP TYPE IF EXISTS aggregate_event_type CASCADE; CREATE TYPE aggregate_event_type AS ( aggregate_type text, @@ -95,19 +99,60 @@ BEGIN END; $$; -CREATE OR REPLACE FUNCTION store_command(_command jsonb) RETURNS bigint +CREATE OR REPLACE PROCEDURE update_types(_command jsonb, _aggregates_with_events jsonb) LANGUAGE plpgsql AS $$ DECLARE - _id commands.id%TYPE; - _command_json jsonb = _command->'command_json'; + _type TEXT; BEGIN IF NOT EXISTS (SELECT 1 FROM command_types t WHERE t.type = _command->>'command_type') THEN - -- Only try inserting if it doesn't exist to avoid exhausting the id sequence - INSERT INTO command_types (type) - VALUES (_command->>'command_type') - ON CONFLICT DO NOTHING; + -- Only when new types are added is this path executed, which should be rare. We do not use a sequence here to avoid + -- wasting ids (which are limited for this table) on rollback, we lock the table and select the minimum next id. + LOCK TABLE command_types IN ACCESS EXCLUSIVE MODE; + INSERT INTO command_types (id, type) + VALUES ((SELECT COALESCE(MAX(id) + 1, 1) FROM command_types), _command->>'command_type') + ON CONFLICT (type) DO NOTHING; END IF; + FOR _type IN ( + SELECT DISTINCT row->0->>'aggregate_type' AS type + FROM jsonb_array_elements(_aggregates_with_events) AS row + EXCEPT + SELECT type FROM aggregate_types + ORDER BY 1 + ) LOOP + -- Only when new types are added is this path executed, which should be rare. We do not use a sequence here to avoid + -- wasting ids (which are limited for this table) on rollback, we lock the table and select the minimum next id. + LOCK TABLE command_types IN ACCESS EXCLUSIVE MODE; + LOCK TABLE aggregate_types IN ACCESS EXCLUSIVE MODE; + INSERT INTO aggregate_types (id, type) VALUES ((SELECT COALESCE(MAX(id) + 1, 1) FROM aggregate_types), _type) + ON CONFLICT (type) DO NOTHING; + END LOOP; + + FOR _type IN ( + SELECT DISTINCT events->>'event_type' AS type + FROM jsonb_array_elements(_aggregates_with_events) AS row + CROSS JOIN LATERAL jsonb_array_elements(row->1) AS events + EXCEPT + SELECT type FROM event_types + ORDER BY 1 + ) LOOP + -- Only when new types are added is this path executed, which should be rare. We do not use a sequence here to avoid + -- wasting ids (which are limited for this table) on rollback, we lock the table and select the minimum next id. + LOCK TABLE command_types IN ACCESS EXCLUSIVE MODE; + LOCK TABLE aggregate_types IN ACCESS EXCLUSIVE MODE; + LOCK TABLE event_types IN ACCESS EXCLUSIVE MODE; + INSERT INTO event_types (id, type) VALUES ((SELECT COALESCE(MAX(id) + 1, 1) FROM event_types), _type) + ON CONFLICT (type) DO NOTHING; + END LOOP; +END; +$$; + +CREATE OR REPLACE FUNCTION store_command(_command jsonb) RETURNS bigint +LANGUAGE plpgsql AS $$ +DECLARE + _id commands.id%TYPE; + _command_json jsonb = _command->'command_json'; +BEGIN INSERT INTO commands ( created_at, user_id, aggregate_id, command_type_id, command_json, event_aggregate_id, event_sequence_number @@ -137,28 +182,9 @@ DECLARE _snapshot_outdated_at aggregates_that_need_snapshots.snapshot_outdated_at%TYPE; _unique_keys jsonb; BEGIN - _command_id = store_command(_command); + CALL update_types(_command, _aggregates_with_events); - WITH types AS ( - SELECT DISTINCT row->0->>'aggregate_type' AS type - FROM jsonb_array_elements(_aggregates_with_events) AS row - ) - INSERT INTO aggregate_types (type) - SELECT type FROM types - WHERE type NOT IN (SELECT type FROM aggregate_types) - ORDER BY 1 - ON CONFLICT DO NOTHING; - - WITH types AS ( - SELECT DISTINCT events->>'event_type' AS type - FROM jsonb_array_elements(_aggregates_with_events) AS row - CROSS JOIN LATERAL jsonb_array_elements(row->1) AS events - ) - INSERT INTO event_types (type) - SELECT type FROM types - WHERE type NOT IN (SELECT type FROM event_types) - ORDER BY 1 - ON CONFLICT DO NOTHING; + _command_id = store_command(_command); FOR _aggregate IN SELECT row->0 FROM jsonb_array_elements(_aggregates_with_events) AS row LOOP _aggregate_id = _aggregate->>'aggregate_id'; diff --git a/db/sequent_schema_tables.sql b/db/sequent_schema_tables.sql index 78241f3f..bffaf432 100644 --- a/db/sequent_schema_tables.sql +++ b/db/sequent_schema_tables.sql @@ -1,6 +1,6 @@ -CREATE TABLE command_types (id SMALLINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, type text UNIQUE NOT NULL); -CREATE TABLE aggregate_types (id SMALLINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, type text UNIQUE NOT NULL); -CREATE TABLE event_types (id SMALLINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, type text UNIQUE NOT NULL); +CREATE TABLE command_types (id SMALLINT PRIMARY KEY, type text UNIQUE NOT NULL); +CREATE TABLE aggregate_types (id SMALLINT PRIMARY KEY, type text UNIQUE NOT NULL); +CREATE TABLE event_types (id SMALLINT PRIMARY KEY, type text UNIQUE NOT NULL); CREATE SEQUENCE IF NOT EXISTS commands_id_seq;