Skip to content

Commit c786504

Browse files
committed
add replication streaming on user connect
1 parent 25a965b commit c786504

File tree

23 files changed

+953
-111
lines changed

23 files changed

+953
-111
lines changed
File renamed without changes.

lib/realtime/api/message.ex

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,16 @@ defmodule Realtime.Api.Message do
1111
schema "messages" do
1212
field :topic, :string
1313
field :extension, Ecto.Enum, values: [:broadcast, :presence]
14+
field :payload, :map
15+
field :event, :string
16+
field :private, :boolean
17+
1418
timestamps()
1519
end
1620

1721
def changeset(message, attrs) do
1822
message
19-
|> cast(attrs, [:topic, :extension, :inserted_at, :updated_at])
23+
|> cast(attrs, [:topic, :extension, :payload, :event, :private, :inserted_at, :updated_at])
2024
|> validate_required([:topic, :extension])
2125
|> put_timestamp(:updated_at)
2226
|> maybe_put_timestamp(:inserted_at)

lib/realtime/application.ex

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,6 @@ defmodule Realtime.Application do
4141

4242
Realtime.PromEx.set_metrics_tags()
4343

44-
Registry.start_link(
45-
keys: :duplicate,
46-
name: Realtime.Registry
47-
)
48-
49-
Registry.start_link(
50-
keys: :unique,
51-
name: Realtime.Registry.Unique
52-
)
53-
5444
:syn.set_event_handler(Realtime.SynHandler)
5545

5646
:ok = :syn.add_node_to_scopes([Realtime.Tenants.Connect])
@@ -73,6 +63,10 @@ defmodule Realtime.Application do
7363
Realtime.GenCounter.DynamicSupervisor,
7464
Realtime.RateCounter.DynamicSupervisor,
7565
Realtime.Latency,
66+
{Registry, keys: :duplicate, name: Realtime.Registry},
67+
{Registry, keys: :unique, name: Realtime.Registry.Unique},
68+
{Registry, keys: :unique, name: Realtime.BroadcastChanges.Handler.Registry},
69+
{Registry, keys: :unique, name: Realtime.BroadcastChanges.Listener.Registry},
7670
{Task.Supervisor, name: Realtime.TaskSupervisor},
7771
{PartitionSupervisor,
7872
child_spec: DynamicSupervisor,
@@ -85,6 +79,14 @@ defmodule Realtime.Application do
8579
max_restarts: 5},
8680
{DynamicSupervisor,
8781
name: Realtime.Tenants.Migrations.DynamicSupervisor, strategy: :one_for_one},
82+
{PartitionSupervisor,
83+
child_spec: DynamicSupervisor,
84+
strategy: :one_for_one,
85+
name: Realtime.BroadcastChanges.Listener.DynamicSupervisor},
86+
{PartitionSupervisor,
87+
child_spec: DynamicSupervisor,
88+
strategy: :one_for_one,
89+
name: Realtime.BroadcastChanges.Handler.DynamicSupervisor},
8890
RealtimeWeb.Endpoint,
8991
RealtimeWeb.Presence
9092
] ++ extensions_supervisors()
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
defmodule Realtime.BroadcastChanges.Handler do
2+
use GenServer
3+
require Logger
4+
5+
import Realtime.Adapters.Postgres.Protocol
6+
import Realtime.Adapters.Postgres.Decoder
7+
import Realtime.Helpers, only: [log_error: 2]
8+
9+
alias Realtime.Adapters.Postgres.Decoder
10+
alias Realtime.Adapters.Postgres.Protocol.KeepAlive
11+
alias Realtime.Adapters.Postgres.Protocol.Write
12+
alias Realtime.Api.Tenant
13+
alias Realtime.BroadcastChanges.PostgresReplication
14+
alias Realtime.Database
15+
alias Realtime.Tenants.BatchBroadcast
16+
17+
defstruct [:tenant_id, relations: %{}, buffer: []]
18+
19+
@behaviour PostgresReplication.Handler
20+
21+
def start(%Tenant{external_id: tenant_id} = tenant, opts \\ []) do
22+
supervisor_spec =
23+
{:via, PartitionSupervisor,
24+
{Realtime.BroadcastChanges.Handler.DynamicSupervisor, tenant_id}}
25+
26+
connection_opts = Database.from_tenant(tenant, "realtime_broadcast_changes", :stop, true)
27+
name = {:via, Registry, {Realtime.BroadcastChanges.Handler.Registry, tenant_id}}
28+
opts = [tenant_id: tenant_id, connection_opts: connection_opts, name: name] ++ opts
29+
chidlren_spec = {__MODULE__, opts}
30+
31+
Logger.info("Initializing handler for #{tenant_id}")
32+
33+
case DynamicSupervisor.start_child(supervisor_spec, chidlren_spec) |> IO.inspect() do
34+
{:ok, pid} ->
35+
{:ok, pid}
36+
37+
{:error, {:already_started, pid}} ->
38+
{:ok, pid}
39+
40+
error ->
41+
log_error("UnableToStartHandler", error)
42+
{:error, :handler_failed_to_start}
43+
end
44+
end
45+
46+
@impl true
47+
def call(message, metadata) when is_write(message) do
48+
%{tenant_id: tenant_id} = metadata
49+
%Write{message: message} = parse(message)
50+
51+
case Registry.lookup(Realtime.BroadcastChanges.Handler.Registry, tenant_id) do
52+
[{pid, _}] ->
53+
message
54+
|> decode_message()
55+
|> then(&send(pid, &1))
56+
57+
_ ->
58+
Logger.warning("Unable to find BroadcastChanges for tenant: #{tenant_id}")
59+
:ok
60+
end
61+
62+
:noreply
63+
end
64+
65+
def call(message, _metadata) when is_keep_alive(message) do
66+
%KeepAlive{reply: reply, wal_end: wal_end} = parse(message)
67+
wal_end = wal_end + 1
68+
69+
message =
70+
case reply do
71+
:now -> standby_status(wal_end, wal_end, wal_end, reply)
72+
:later -> hold()
73+
end
74+
75+
{:reply, message}
76+
end
77+
78+
def call(msg, state) do
79+
Logger.info("Unknown message received: #{inspect(%{msg: parse(msg), state: state})}")
80+
:noreply
81+
end
82+
83+
@impl true
84+
85+
def handle_info(%Decoder.Messages.Relation{} = msg, state) do
86+
%Decoder.Messages.Relation{id: id, namespace: namespace, name: name, columns: columns} = msg
87+
%{relations: relations} = state
88+
relation = %{name: name, columns: columns, namespace: namespace}
89+
relations = Map.put(relations, id, relation)
90+
{:noreply, %{state | relations: relations}}
91+
end
92+
93+
def handle_info(%Decoder.Messages.Insert{} = msg, state) do
94+
%Decoder.Messages.Insert{relation_id: relation_id, tuple_data: tuple_data} = msg
95+
%{buffer: buffer, relations: relations} = state
96+
97+
case Map.get(relations, relation_id) do
98+
%{columns: columns} ->
99+
to_broadcast =
100+
tuple_data
101+
|> Tuple.to_list()
102+
|> Enum.zip(columns)
103+
|> Enum.map(fn
104+
{nil, %{name: name}} -> {name, nil}
105+
{value, %{name: name, type: "jsonb"}} -> {name, Jason.decode!(value)}
106+
{value, %{name: name, type: "bool"}} -> {name, value == "t"}
107+
{value, %{name: name}} -> {name, value}
108+
end)
109+
|> Map.new()
110+
111+
topic = Map.get(to_broadcast, "topic")
112+
private = Map.get(to_broadcast, "private")
113+
event = Map.get(to_broadcast, "event")
114+
payload = Map.get(to_broadcast, "payload")
115+
id = Map.get(to_broadcast, "id")
116+
117+
payload = Map.put(payload, "id", id)
118+
119+
to_broadcast = %{
120+
topic: topic,
121+
event: event,
122+
private: private,
123+
payload: payload
124+
}
125+
126+
buffer = buffer ++ [to_broadcast]
127+
{:noreply, %{state | buffer: buffer}}
128+
129+
_ ->
130+
log_error("UnknownBroadcastChangesRelation", "Relation ID not found: #{relation_id}")
131+
{:noreply, state}
132+
end
133+
end
134+
135+
def handle_info(%Decoder.Messages.Commit{}, %{buffer: []} = state) do
136+
{:noreply, state}
137+
end
138+
139+
def handle_info(%Decoder.Messages.Commit{}, state) do
140+
%{buffer: buffer, tenant_id: tenant_id} = state
141+
tenant = Realtime.Tenants.Cache.get_tenant_by_external_id(tenant_id)
142+
143+
case BatchBroadcast.broadcast(nil, tenant, %{messages: buffer}, true) do
144+
:ok -> :ok
145+
error -> log_error("UnableToBatchBroadcastChanges", error)
146+
end
147+
148+
{:noreply, %{state | buffer: []}}
149+
end
150+
151+
def handle_info(_, state), do: {:noreply, state}
152+
def start_link(opts), do: GenServer.start_link(__MODULE__, opts, opts)
153+
154+
@impl true
155+
def init(opts) do
156+
Logger.info("Initializing connection with the status: #{inspect(opts)}")
157+
158+
tenant_id = Keyword.fetch!(opts, :tenant_id)
159+
connection_opts = Keyword.fetch!(opts, :connection_opts)
160+
161+
supervisor =
162+
{:via, PartitionSupervisor,
163+
{Realtime.BroadcastChanges.Listener.DynamicSupervisor, tenant_id}}
164+
165+
name = {:via, Registry, {Realtime.BroadcastChanges.Listener.Registry, tenant_id}}
166+
167+
configuration = %PostgresReplication{
168+
connection_opts: [
169+
hostname: connection_opts.host,
170+
username: connection_opts.user,
171+
password: connection_opts.pass,
172+
database: connection_opts.name,
173+
port: connection_opts.port,
174+
parameters: [
175+
application_name: connection_opts.application_name
176+
]
177+
],
178+
table: "messages",
179+
schema: "realtime",
180+
handler_module: __MODULE__,
181+
opts: [name: name],
182+
metadata: %{tenant_id: tenant_id}
183+
}
184+
185+
children_spec = {PostgresReplication, configuration}
186+
state = %__MODULE__{tenant_id: tenant_id, buffer: [], relations: %{}}
187+
188+
case DynamicSupervisor.start_child(supervisor, children_spec) do
189+
{:ok, _pid} ->
190+
{:ok, state}
191+
192+
{:error, {:already_started, _pid}} ->
193+
{:ok, state}
194+
195+
error ->
196+
log_error("UnableToStartPostgresReplication", error)
197+
{:stop, :shutdown}
198+
end
199+
end
200+
end

0 commit comments

Comments
 (0)