Skip to content

Commit

Permalink
add replication streaming on user connect
Browse files Browse the repository at this point in the history
  • Loading branch information
filipecabaco committed Sep 27, 2024
1 parent 25a965b commit 69ce712
Show file tree
Hide file tree
Showing 22 changed files with 740 additions and 36 deletions.
File renamed without changes.
File renamed without changes.
File renamed without changes.
6 changes: 5 additions & 1 deletion lib/realtime/api/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@ defmodule Realtime.Api.Message do
schema "messages" do
field :topic, :string
field :extension, Ecto.Enum, values: [:broadcast, :presence]
field :payload, :map
field :event, :string
field :private, :boolean

timestamps()
end

def changeset(message, attrs) do
message
|> cast(attrs, [:topic, :extension, :inserted_at, :updated_at])
|> cast(attrs, [:topic, :extension, :payload, :event, :private, :inserted_at, :updated_at])
|> validate_required([:topic, :extension])
|> put_timestamp(:updated_at)
|> maybe_put_timestamp(:inserted_at)
Expand Down
22 changes: 12 additions & 10 deletions lib/realtime/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,6 @@ defmodule Realtime.Application do

Realtime.PromEx.set_metrics_tags()

Registry.start_link(
keys: :duplicate,
name: Realtime.Registry
)

Registry.start_link(
keys: :unique,
name: Realtime.Registry.Unique
)

:syn.set_event_handler(Realtime.SynHandler)

:ok = :syn.add_node_to_scopes([Realtime.Tenants.Connect])
Expand All @@ -73,6 +63,10 @@ defmodule Realtime.Application do
Realtime.GenCounter.DynamicSupervisor,
Realtime.RateCounter.DynamicSupervisor,
Realtime.Latency,
{Registry, keys: :duplicate, name: Realtime.Registry},
{Registry, keys: :unique, name: Realtime.Registry.Unique},
{Registry, keys: :unique, name: Realtime.BroadcastChanges.Handler.Registry},
{Registry, keys: :unique, name: Realtime.BroadcastChanges.Listener.Registry},
{Task.Supervisor, name: Realtime.TaskSupervisor},
{PartitionSupervisor,
child_spec: DynamicSupervisor,
Expand All @@ -85,6 +79,14 @@ defmodule Realtime.Application do
max_restarts: 5},
{DynamicSupervisor,
name: Realtime.Tenants.Migrations.DynamicSupervisor, strategy: :one_for_one},
{PartitionSupervisor,
child_spec: DynamicSupervisor,
strategy: :one_for_one,
name: Realtime.BroadcastChanges.Listener.DynamicSupervisor},
{PartitionSupervisor,
child_spec: DynamicSupervisor,
strategy: :one_for_one,
name: Realtime.BroadcastChanges.Handler.DynamicSupervisor},
RealtimeWeb.Endpoint,
RealtimeWeb.Presence
] ++ extensions_supervisors()
Expand Down
199 changes: 199 additions & 0 deletions lib/realtime/broadcast_changes/handler.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
defmodule Realtime.BroadcastChanges.Handler do
use GenServer
require Logger

import Realtime.Adapters.Postgres.Protocol
import Realtime.Adapters.Postgres.Decoder
import Realtime.Helpers, only: [log_error: 2]

alias Realtime.Adapters.Postgres.Decoder
alias Realtime.Adapters.Postgres.Protocol.KeepAlive
alias Realtime.Adapters.Postgres.Protocol.Write
alias Realtime.Api.Tenant
alias Realtime.BroadcastChanges.PostgresReplication
alias Realtime.Database
alias Realtime.Tenants.BatchBroadcast

defstruct [:tenant_id, relations: %{}, buffer: []]

@behaviour PostgresReplication.Handler

def start(%Tenant{external_id: tenant_id} = tenant, opts \\ []) do
supervisor_spec =
{:via, PartitionSupervisor,
{Realtime.BroadcastChanges.Handler.DynamicSupervisor, tenant_id}}

connection_opts = Database.from_tenant(tenant, "realtime_broadcast_changes", :stop, true)
name = {:via, Registry, {Realtime.BroadcastChanges.Handler.Registry, tenant_id}}
opts = [tenant_id: tenant_id, connection_opts: connection_opts, name: name] ++ opts
chidlren_spec = {__MODULE__, opts}
Logger.info("Initializing handler for #{tenant_id}")

case DynamicSupervisor.start_child(supervisor_spec, chidlren_spec) do
{:ok, pid} ->
{:ok, pid}

{:error, {:already_started, pid}} ->
{:ok, pid}

error ->
log_error("UnableToStartHandler", error)
{:error, :handler_failed_to_start}
end
end

@impl true
def call(message, metadata) when is_write(message) do
%{tenant_id: tenant_id} = metadata
%Write{message: message} = parse(message)

case Registry.lookup(Realtime.BroadcastChanges.Handler.Registry, tenant_id) do
[{pid, _}] ->
message
|> decode_message()
|> then(&send(pid, &1))

_ ->
Logger.warning("Unable to find BroadcastChanges for tenant: #{tenant_id}")
:ok
end

:noreply
end

def call(message, _metadata) when is_keep_alive(message) do
%KeepAlive{reply: reply, wal_end: wal_end} = parse(message)
wal_end = wal_end + 1

message =
case reply do
:now -> standby_status(wal_end, wal_end, wal_end, reply)
:later -> hold()
end

{:reply, message}
end

def call(msg, state) do
Logger.info("Unknown message received: #{inspect(%{msg: parse(msg), state: state})}")
:noreply
end

@impl true

def handle_info(%Decoder.Messages.Relation{} = msg, state) do
%Decoder.Messages.Relation{id: id, namespace: namespace, name: name, columns: columns} = msg
%{relations: relations} = state
relation = %{name: name, columns: columns, namespace: namespace}
relations = Map.put(relations, id, relation)
{:noreply, %{state | relations: relations}}
end

def handle_info(%Decoder.Messages.Insert{} = msg, state) do
%Decoder.Messages.Insert{relation_id: relation_id, tuple_data: tuple_data} = msg
%{buffer: buffer, relations: relations} = state

case Map.get(relations, relation_id) do
%{columns: columns} ->
to_broadcast =
tuple_data
|> Tuple.to_list()
|> Enum.zip(columns)
|> Enum.map(fn
{nil, %{name: name}} -> {name, nil}
{value, %{name: name, type: "jsonb"}} -> {name, Jason.decode!(value)}
{value, %{name: name, type: "bool"}} -> {name, value == "t"}
{value, %{name: name}} -> {name, value}
end)
|> Map.new()

topic = Map.get(to_broadcast, "topic")
private = Map.get(to_broadcast, "private")
event = Map.get(to_broadcast, "event")
payload = Map.get(to_broadcast, "payload")
id = Map.get(to_broadcast, "id")

payload = Map.put(payload, "id", id)

to_broadcast = %{
topic: topic,
event: event,
private: private,
payload: payload
}

buffer = buffer ++ [to_broadcast]
{:noreply, %{state | buffer: buffer}}

_ ->
log_error("UnknownBroadcastChangesRelation", "Relation ID not found: #{relation_id}")
{:noreply, state}
end
end

def handle_info(%Decoder.Messages.Commit{}, %{buffer: []} = state) do
{:noreply, state}
end

def handle_info(%Decoder.Messages.Commit{}, state) do
%{buffer: buffer, tenant_id: tenant_id} = state
tenant = Realtime.Tenants.Cache.get_tenant_by_external_id(tenant_id)

case BatchBroadcast.broadcast(nil, tenant, %{messages: buffer}, true) do
:ok -> :ok
error -> log_error("UnableToBatchBroadcastChanges", error)
end

{:noreply, %{state | buffer: []}}
end

def handle_info(_, state), do: {:noreply, state}
def start_link(opts), do: GenServer.start_link(__MODULE__, opts, opts)

@impl true
def init(opts) do
Logger.info("Initializing connection with the status: #{inspect(opts)}")

tenant_id = Keyword.fetch!(opts, :tenant_id)
connection_opts = Keyword.fetch!(opts, :connection_opts)

supervisor =
{:via, PartitionSupervisor,
{Realtime.BroadcastChanges.Listener.DynamicSupervisor, tenant_id}}

name = {:via, Registry, {Realtime.BroadcastChanges.Listener.Registry, tenant_id}}

configuration = %PostgresReplication{
connection_opts: [
hostname: connection_opts.host,
username: connection_opts.user,
password: connection_opts.pass,
database: connection_opts.name,
port: connection_opts.port,
parameters: [
application_name: connection_opts.application_name
]
],
table: "messages",
schema: "realtime",
handler_module: __MODULE__,
opts: [name: name],
metadata: %{tenant_id: tenant_id}
}

children_spec = {PostgresReplication, configuration}
state = %__MODULE__{tenant_id: tenant_id, buffer: [], relations: %{}}

case DynamicSupervisor.start_child(supervisor, children_spec) do
{:ok, _pid} ->
{:ok, state}

{:error, {:already_started, _pid}} ->
{:ok, state}

error ->
log_error("UnableToStartPostgresReplication", error)
{:stop, :shutdown}
end
end
end
Loading

0 comments on commit 69ce712

Please sign in to comment.