-
-
Notifications
You must be signed in to change notification settings - Fork 315
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: Add broadcast functionality from triggers
Adds a new functionality to broadcast db changes. It adds: * New function to broadcast from postgres using triggers
- Loading branch information
1 parent
bcfc92a
commit ef0d4ca
Showing
3 changed files
with
78 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
73 changes: 73 additions & 0 deletions
73
lib/realtime/tenants/repo/migrations/20240917170412_add_payload_to_messages.ex
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
defmodule Realtime.Tenants.Migrations.AddPayloadToMessages do | ||
@moduledoc false | ||
use Ecto.Migration | ||
|
||
def change do | ||
alter table(:messages) do | ||
add_if_not_exists :payload, :map | ||
add_if_not_exists :event, :text | ||
add_if_not_exists :topic, :text | ||
modify :inserted_at, :utc_datetime, default: fragment("now()") | ||
modify :updated_at, :utc_datetime, default: fragment("now()") | ||
end | ||
|
||
execute """ | ||
CREATE OR REPLACE FUNCTION realtime.send(payload jsonb, event text, topic text, extension text DEFAULT 'broadcast') | ||
RETURNS void | ||
AS $$ | ||
BEGIN | ||
INSERT INTO realtime.messages (payload, event, topic, extension) | ||
VALUES (payload, event, topic, extension); | ||
END; | ||
$$ | ||
LANGUAGE plpgsql; | ||
""" | ||
|
||
execute """ | ||
CREATE OR REPLACE FUNCTION realtime.broadcast_change () | ||
RETURNS TRIGGER | ||
AS $$ | ||
DECLARE | ||
-- Declare a variable to hold the JSONB representation of the row | ||
row_data jsonb := '{}'::jsonb; | ||
-- Declare entry that will be written to the realtime.messages table | ||
topic_name text := TG_ARGV[0]::text; | ||
event_name text := COALESCE(TG_ARGV[1]::text, TG_TABLE_SCHEMA || '.' || TG_TABLE_NAME); | ||
BEGIN | ||
-- Ensure topic_name is provided | ||
IF topic_name IS NULL THEN | ||
RAISE EXCEPTION 'Topic name must be provided'; | ||
END IF; | ||
-- Check the operation type and handle accordingly | ||
IF TG_OP = 'INSERT' THEN | ||
row_data := jsonb_set(row_data, '{old_data}', '{}'::jsonb); | ||
row_data := jsonb_set(row_data, '{new_data}', to_jsonb (NEW)); | ||
ELSIF TG_OP = 'DELETE' THEN | ||
row_data := jsonb_set(row_data, '{old_data}', to_jsonb (OLD)); | ||
row_data := jsonb_set(row_data, '{new_data}', '{}'::jsonb); | ||
ELSIF TG_OP = 'UPDATE' THEN | ||
row_data := jsonb_set(row_data, '{old_data}', to_jsonb (OLD)); | ||
row_data := jsonb_set(row_data, '{new_data}', to_jsonb (NEW)); | ||
ELSE | ||
RAISE EXCEPTION 'Unexpected operation type: %', TG_OP; | ||
END IF; | ||
-- Add the operation type, table and schema to the JSONB data | ||
row_data := jsonb_set(row_data, '{operation}', to_jsonb (TG_OP)); | ||
row_data := jsonb_set(row_data, '{table}', to_jsonb (TG_TABLE_NAME)); | ||
row_data := jsonb_set(row_data, '{schema}', to_jsonb (TG_TABLE_SCHEMA)); | ||
-- Insert the JSONB data into the realtime.messages table | ||
PERFORM realtime.send(row_data, event_name, topic_name); | ||
RETURN NULL; | ||
-- Triggers should return NULL unless they are `BEFORE` triggers | ||
EXCEPTION | ||
WHEN OTHERS THEN | ||
RAISE EXCEPTION 'Failed to process the row: %', SQLERRM; | ||
END; | ||
$$ | ||
LANGUAGE plpgsql; | ||
""" | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters