diff --git a/lib/realtime/tenants/migrations.ex b/lib/realtime/tenants/migrations.ex index 489431c11..ef726bc0c 100644 --- a/lib/realtime/tenants/migrations.ex +++ b/lib/realtime/tenants/migrations.ex @@ -59,7 +59,8 @@ defmodule Realtime.Tenants.Migrations do FixWalrusRoleHandling, UnloggedMessagesTable, LoggedMessagesTable, - FilterDeletePostgresChanges + FilterDeletePostgresChanges, + AddPayloadToMessages } @migrations [ @@ -109,7 +110,8 @@ defmodule Realtime.Tenants.Migrations do {20_240_618_124_746, FixWalrusRoleHandling}, {20_240_801_235_015, UnloggedMessagesTable}, {20_240_805_133_720, LoggedMessagesTable}, - {20_240_827_160_934, FilterDeletePostgresChanges} + {20_240_827_160_934, FilterDeletePostgresChanges}, + {20_240_917_170_412, AddPayloadToMessages} ] defstruct [:tenant_external_id, :settings] @spec run_migrations(map()) :: :ok | {:error, any()} diff --git a/lib/realtime/tenants/repo/migrations/20240917170412_add_payload_to_messages.ex b/lib/realtime/tenants/repo/migrations/20240917170412_add_payload_to_messages.ex new file mode 100644 index 000000000..9d3731181 --- /dev/null +++ b/lib/realtime/tenants/repo/migrations/20240917170412_add_payload_to_messages.ex @@ -0,0 +1,62 @@ +defmodule Realtime.Tenants.Migrations.AddPayloadToMessages do + @moduledoc false + use Ecto.Migration + + def change do + alter table(:messages) do + add_if_not_exists :payload, :map + add_if_not_exists :event, :text + add_if_not_exists :topic, :text + modify :inserted_at, :utc_datetime, default: fragment("now()") + modify :updated_at, :utc_datetime, default: fragment("now()") + end + + execute """ + CREATE OR REPLACE FUNCTION realtime.broadcast () + RETURNS TRIGGER + AS $$ + DECLARE + -- Declare a variable to hold the JSONB representation of the row + row_data jsonb := '{}'::jsonb; + -- Declare entry that will be written to the realtime.messages table + topic_name text := TG_ARGV[0]::text; + event_name text := COALESCE(TG_ARGV[1]::text, TG_TABLE_SCHEMA || '.' || TG_TABLE_NAME); + BEGIN + -- Ensure topic_name is provided + IF topic_name IS NULL THEN + RAISE EXCEPTION 'Topic name must be provided'; + END IF; + -- Check the operation type and handle accordingly + IF TG_OP = 'INSERT' THEN + row_data := jsonb_set(row_data, '{old_data}', '{}'::jsonb); + row_data := jsonb_set(row_data, '{new_data}', to_jsonb (NEW)); + ELSIF TG_OP = 'DELETE' THEN + row_data := jsonb_set(row_data, '{old_data}', to_jsonb (OLD)); + row_data := jsonb_set(row_data, '{new_data}', '{}'::jsonb); + ELSIF TG_OP = 'UPDATE' THEN + row_data := jsonb_set(row_data, '{old_data}', to_jsonb (OLD)); + row_data := jsonb_set(row_data, '{new_data}', to_jsonb (NEW)); + ELSE + RAISE EXCEPTION 'Unexpected operation type: %', TG_OP; + END IF; + + -- Add the operation type, table and schema to the JSONB data + row_data := jsonb_set(row_data, '{operation}', to_jsonb (TG_OP)); + row_data := jsonb_set(row_data, '{table}', to_jsonb (TG_TABLE_NAME)); + row_data := jsonb_set(row_data, '{schema}', to_jsonb (TG_TABLE_SCHEMA)); + + -- Insert the JSONB data into the realtime.messages table + INSERT INTO realtime.messages (payload, event, topic, extension) + VALUES (row_data, event_name, topic_name, 'outbox'); + RETURN NULL; + -- Triggers should return NULL unless they are `BEFORE` triggers + EXCEPTION + WHEN OTHERS THEN + RAISE EXCEPTION 'Failed to process the row: %', SQLERRM; + END; + + $$ + LANGUAGE plpgsql; + """ + end +end