Skip to content

Commit

Permalink
add replication streaming on user connect
Browse files Browse the repository at this point in the history
  • Loading branch information
filipecabaco committed Oct 2, 2024
1 parent 25a965b commit 7dc2e04
Show file tree
Hide file tree
Showing 27 changed files with 1,198 additions and 160 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ This is the list of operational codes that can help you understand your deployme
| UnableToProcessListenPayload | Payload sent in NOTIFY operation was JSON parsable |
| UnableToListenToTenantDatabase | Unable to LISTEN for notifications against the Tenant Database |
| UnprocessableEntity | Received a HTTP request with a body that was not able to be processed by the endpoint |
| InitializingProjectConnection | Connection against Tenant database is still starting |
| ErrorOnRpcCall | Error when calling another realtime node |
| ErrorExecutingTransaction | Error executing a database transaction in tenant database |
| SynInitializationError | Our framework to syncronize processes has failed to properly startup a connection to the database |
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
6 changes: 5 additions & 1 deletion lib/realtime/api/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@ defmodule Realtime.Api.Message do
schema "messages" do
field :topic, :string
field :extension, Ecto.Enum, values: [:broadcast, :presence]
field :payload, :map
field :event, :string
field :private, :boolean

timestamps()
end

def changeset(message, attrs) do
message
|> cast(attrs, [:topic, :extension, :inserted_at, :updated_at])
|> cast(attrs, [:topic, :extension, :payload, :event, :private, :inserted_at, :updated_at])
|> validate_required([:topic, :extension])
|> put_timestamp(:updated_at)
|> maybe_put_timestamp(:inserted_at)
Expand Down
22 changes: 12 additions & 10 deletions lib/realtime/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,6 @@ defmodule Realtime.Application do

Realtime.PromEx.set_metrics_tags()

Registry.start_link(
keys: :duplicate,
name: Realtime.Registry
)

Registry.start_link(
keys: :unique,
name: Realtime.Registry.Unique
)

:syn.set_event_handler(Realtime.SynHandler)

:ok = :syn.add_node_to_scopes([Realtime.Tenants.Connect])
Expand All @@ -73,6 +63,10 @@ defmodule Realtime.Application do
Realtime.GenCounter.DynamicSupervisor,
Realtime.RateCounter.DynamicSupervisor,
Realtime.Latency,
{Registry, keys: :duplicate, name: Realtime.Registry},
{Registry, keys: :unique, name: Realtime.Registry.Unique},
{Registry, keys: :unique, name: Realtime.BroadcastChanges.Handler.Registry},
{Registry, keys: :unique, name: Realtime.BroadcastChanges.Listener.Registry},
{Task.Supervisor, name: Realtime.TaskSupervisor},
{PartitionSupervisor,
child_spec: DynamicSupervisor,
Expand All @@ -85,6 +79,14 @@ defmodule Realtime.Application do
max_restarts: 5},
{DynamicSupervisor,
name: Realtime.Tenants.Migrations.DynamicSupervisor, strategy: :one_for_one},
{PartitionSupervisor,
child_spec: DynamicSupervisor,
strategy: :one_for_one,
name: Realtime.BroadcastChanges.Listener.DynamicSupervisor},
{PartitionSupervisor,
child_spec: DynamicSupervisor,
strategy: :one_for_one,
name: Realtime.BroadcastChanges.Handler.DynamicSupervisor},
RealtimeWeb.Endpoint,
RealtimeWeb.Presence
] ++ extensions_supervisors()
Expand Down
204 changes: 204 additions & 0 deletions lib/realtime/broadcast_changes/handler.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
defmodule Realtime.BroadcastChanges.Handler do
use GenServer
require Logger

import Realtime.Adapters.Postgres.Protocol
import Realtime.Adapters.Postgres.Decoder
import Realtime.Helpers, only: [log_error: 2]

alias Realtime.Adapters.Postgres.Decoder
alias Realtime.Adapters.Postgres.Protocol.KeepAlive
alias Realtime.Adapters.Postgres.Protocol.Write
alias Realtime.Api.Tenant
alias Realtime.BroadcastChanges.PostgresReplication
alias Realtime.Database
alias Realtime.Tenants.BatchBroadcast
alias Realtime.Tenants.Cache

defstruct [:tenant_id, relations: %{}, buffer: [], postgres_replication_pid: nil]

@behaviour PostgresReplication.Handler
@registry Realtime.BroadcastChanges.Handler.Registry

@spec name(Tenant.t()) :: term()
def name(%Tenant{external_id: tenant_id}) do
{:via, Registry, {@registry, tenant_id}}
end

@spec supervisor_spec(Tenant.t()) :: term()
def supervisor_spec(%Tenant{external_id: tenant_id}) do
{:via, PartitionSupervisor, {Realtime.BroadcastChanges.Handler.DynamicSupervisor, tenant_id}}
end

@impl true
def call(message, metadata) when is_write(message) do
%{tenant_id: tenant_id} = metadata
%Write{message: message} = parse(message)

case Registry.lookup(@registry, tenant_id) do
[{pid, _}] ->
message |> decode_message() |> then(&send(pid, &1))
:noreply

_ ->
Logger.error("Unable to find BroadcastChanges for tenant: #{tenant_id}")
:shutdown
end
end

def call(message, _metadata) when is_keep_alive(message) do
%KeepAlive{reply: reply, wal_end: wal_end} = parse(message)
wal_end = wal_end + 1

message =
case reply do
:now -> standby_status(wal_end, wal_end, wal_end, reply)
:later -> hold()
end

{:reply, message}
end

def call(msg, state) do
Logger.info("Unknown message received: #{inspect(%{msg: parse(msg), state: state})}")
:noreply
end

@impl true
def handle_info(%Decoder.Messages.Relation{} = msg, state) do
%Decoder.Messages.Relation{id: id, namespace: namespace, name: name, columns: columns} = msg
%{relations: relations} = state
relation = %{name: name, columns: columns, namespace: namespace}
relations = Map.put(relations, id, relation)
{:noreply, %{state | relations: relations}}
end

def handle_info(%Decoder.Messages.Insert{} = msg, state) do
%Decoder.Messages.Insert{relation_id: relation_id, tuple_data: tuple_data} = msg
%{buffer: buffer, relations: relations} = state

case Map.get(relations, relation_id) do
%{columns: columns} ->
to_broadcast =
tuple_data
|> Tuple.to_list()
|> Enum.zip(columns)
|> Enum.map(fn
{nil, %{name: name}} -> {name, nil}
{value, %{name: name, type: "jsonb"}} -> {name, Jason.decode!(value)}
{value, %{name: name, type: "bool"}} -> {name, value == "t"}
{value, %{name: name}} -> {name, value}
end)
|> Map.new()

payload = Map.get(to_broadcast, "payload")

case payload do
nil ->
{:noreply, state}

payload ->
topic = Map.get(to_broadcast, "topic")
private = Map.get(to_broadcast, "private")
event = Map.get(to_broadcast, "event")

id = Map.get(to_broadcast, "id")

payload = Map.put(payload, "id", id)

to_broadcast =
%{
topic: topic,
event: event,
private: private,
payload: payload
}

buffer = buffer ++ [to_broadcast]
{:noreply, %{state | buffer: buffer}}
end

_ ->
log_error("UnknownBroadcastChangesRelation", "Relation ID not found: #{relation_id}")
{:noreply, state}
end
end

def handle_info(%Decoder.Messages.Commit{}, %{buffer: []} = state) do
{:noreply, state}
end

def handle_info(%Decoder.Messages.Commit{}, state) do
%{buffer: buffer, tenant_id: tenant_id} = state
tenant = Realtime.Tenants.Cache.get_tenant_by_external_id(tenant_id)

case BatchBroadcast.broadcast(nil, tenant, %{messages: buffer}, true) do
:ok -> :ok
error -> log_error("UnableToBatchBroadcastChanges", error)
end

{:noreply, %{state | buffer: []}}
end

def handle_info(_, state), do: {:noreply, state}

@impl true
def terminate(reason, _state) do
log_error("BroadcastChangesHandlerTerminated", reason)
:ok
end

def start_link(opts), do: GenServer.start_link(__MODULE__, opts, opts)

@impl true
def init(opts) do
tenant_id = Keyword.fetch!(opts, :tenant_id)

tenant = Cache.get_tenant_by_external_id(tenant_id)
connection_opts = Database.from_tenant(tenant, "realtime_broadcast_changes", :stop, true)

supervisor =
{:via, PartitionSupervisor,
{Realtime.BroadcastChanges.Listener.DynamicSupervisor, tenant_id}}

name = {:via, Registry, {Realtime.BroadcastChanges.Listener.Registry, tenant_id}}

configuration = %PostgresReplication{
connection_opts: [
hostname: connection_opts.host,
username: connection_opts.user,
password: connection_opts.pass,
database: connection_opts.name,
port: connection_opts.port,
parameters: [
application_name: connection_opts.application_name
]
],
table: "messages",
schema: "realtime",
handler_module: __MODULE__,
opts: [name: name],
metadata: %{tenant_id: tenant_id}
}

children_spec = %{
id: Handler,
start: {PostgresReplication, :start_link, [configuration]},
type: :worker
}

state = %__MODULE__{tenant_id: tenant_id, buffer: [], relations: %{}}

case DynamicSupervisor.start_child(supervisor, children_spec) do
{:ok, pid} ->
{:ok, %{state | postgres_replication_pid: pid}}

{:error, {:already_started, pid}} ->
{:ok, %{state | postgres_replication_pid: pid}}

error ->
log_error("UnableToStartPostgresReplication", error)
{:stop, error}
end
end
end
Loading

0 comments on commit 7dc2e04

Please sign in to comment.