Skip to content

Commit

Permalink
feat: Add broadcast functionality from triggers
Browse files Browse the repository at this point in the history
Adds a new functionality to broadcast db changes. It adds:
* New function to broadcast from postgres using triggers
  • Loading branch information
filipecabaco committed Sep 17, 2024
1 parent bcfc92a commit af3caf2
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 2 deletions.
6 changes: 4 additions & 2 deletions lib/realtime/tenants/migrations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ defmodule Realtime.Tenants.Migrations do
FixWalrusRoleHandling,
UnloggedMessagesTable,
LoggedMessagesTable,
FilterDeletePostgresChanges
FilterDeletePostgresChanges,
AddPayloadToMessages
}

@migrations [
Expand Down Expand Up @@ -109,7 +110,8 @@ defmodule Realtime.Tenants.Migrations do
{20_240_618_124_746, FixWalrusRoleHandling},
{20_240_801_235_015, UnloggedMessagesTable},
{20_240_805_133_720, LoggedMessagesTable},
{20_240_827_160_934, FilterDeletePostgresChanges}
{20_240_827_160_934, FilterDeletePostgresChanges},
{20_240_917_170_412, AddPayloadToMessages}
]
defstruct [:tenant_external_id, :settings]
@spec run_migrations(map()) :: :ok | {:error, any()}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
defmodule Realtime.Tenants.Migrations.AddPayloadToMessages do
@moduledoc false
use Ecto.Migration

def change do
alter table(:messages) do
add_if_not_exists :payload, :map
add_if_not_exists :event, :text
add_if_not_exists :topic, :text
modify :inserted_at, :utc_datetime, default: fragment("now()")
modify :updated_at, :utc_datetime, default: fragment("now()")
end

execute """
CREATE OR REPLACE FUNCTION realtime.broadcast ()
RETURNS TRIGGER
AS $$
DECLARE
-- Declare a variable to hold the JSONB representation of the row
row_data jsonb := '{}'::jsonb;
-- Declare entry that will be written to the realtime.messages table
topic_name text := TG_ARGV[0]::text;
event_name text := COALESCE(TG_ARGV[1]::text, TG_TABLE_SCHEMA || '.' || TG_TABLE_NAME);
BEGIN
-- Ensure topic_name is provided
IF topic_name IS NULL THEN
RAISE EXCEPTION 'Topic name must be provided';
END IF;
-- Check the operation type and handle accordingly
IF TG_OP = 'INSERT' THEN
row_data := jsonb_set(row_data, '{old_data}', '{}'::jsonb);
row_data := jsonb_set(row_data, '{new_data}', to_jsonb (NEW));
ELSIF TG_OP = 'DELETE' THEN
row_data := jsonb_set(row_data, '{old_data}', to_jsonb (OLD));
row_data := jsonb_set(row_data, '{new_data}', '{}'::jsonb);
ELSIF TG_OP = 'UPDATE' THEN
row_data := jsonb_set(row_data, '{old_data}', to_jsonb (OLD));
row_data := jsonb_set(row_data, '{new_data}', to_jsonb (NEW));
ELSE
RAISE EXCEPTION 'Unexpected operation type: %', TG_OP;
END IF;
-- Add the operation type, table and schema to the JSONB data
row_data := jsonb_set(row_data, '{operation}', to_jsonb (TG_OP));
row_data := jsonb_set(row_data, '{table}', to_jsonb (TG_TABLE_NAME));
row_data := jsonb_set(row_data, '{schema}', to_jsonb (TG_TABLE_SCHEMA));
-- Insert the JSONB data into the realtime.messages table
INSERT INTO realtime.messages (payload, event, topic, extension)
VALUES (row_data, event_name, topic_name, 'outbox');
RETURN NULL;
-- Triggers should return NULL unless they are `BEFORE` triggers
EXCEPTION
WHEN OTHERS THEN
RAISE EXCEPTION 'Failed to process the row: %', SQLERRM;
END;
$$
LANGUAGE plpgsql;
"""
end
end

0 comments on commit af3caf2

Please sign in to comment.