Skip to content

Commit 852028f

Browse files
committed
add replication streaming on user connect
1 parent 25a965b commit 852028f

File tree

22 files changed

+725
-35
lines changed

22 files changed

+725
-35
lines changed
File renamed without changes.

lib/realtime/api/message.ex

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,16 @@ defmodule Realtime.Api.Message do
1111
schema "messages" do
1212
field :topic, :string
1313
field :extension, Ecto.Enum, values: [:broadcast, :presence]
14+
field :payload, :map
15+
field :event, :string
16+
field :private, :boolean
17+
1418
timestamps()
1519
end
1620

1721
def changeset(message, attrs) do
1822
message
19-
|> cast(attrs, [:topic, :extension, :inserted_at, :updated_at])
23+
|> cast(attrs, [:topic, :extension, :payload, :event, :private, :inserted_at, :updated_at])
2024
|> validate_required([:topic, :extension])
2125
|> put_timestamp(:updated_at)
2226
|> maybe_put_timestamp(:inserted_at)

lib/realtime/application.ex

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,6 @@ defmodule Realtime.Application do
4141

4242
Realtime.PromEx.set_metrics_tags()
4343

44-
Registry.start_link(
45-
keys: :duplicate,
46-
name: Realtime.Registry
47-
)
48-
49-
Registry.start_link(
50-
keys: :unique,
51-
name: Realtime.Registry.Unique
52-
)
53-
5444
:syn.set_event_handler(Realtime.SynHandler)
5545

5646
:ok = :syn.add_node_to_scopes([Realtime.Tenants.Connect])
@@ -73,6 +63,10 @@ defmodule Realtime.Application do
7363
Realtime.GenCounter.DynamicSupervisor,
7464
Realtime.RateCounter.DynamicSupervisor,
7565
Realtime.Latency,
66+
{Registry, keys: :duplicate, name: Realtime.Registry},
67+
{Registry, keys: :unique, name: Realtime.Registry.Unique},
68+
{Registry, keys: :unique, name: Realtime.BroadcastChanges.Handler.Registry},
69+
{Registry, keys: :unique, name: Realtime.BroadcastChanges.Listener.Registry},
7670
{Task.Supervisor, name: Realtime.TaskSupervisor},
7771
{PartitionSupervisor,
7872
child_spec: DynamicSupervisor,
@@ -85,6 +79,14 @@ defmodule Realtime.Application do
8579
max_restarts: 5},
8680
{DynamicSupervisor,
8781
name: Realtime.Tenants.Migrations.DynamicSupervisor, strategy: :one_for_one},
82+
{PartitionSupervisor,
83+
child_spec: DynamicSupervisor,
84+
strategy: :one_for_one,
85+
name: Realtime.BroadcastChanges.Listener.DynamicSupervisor},
86+
{PartitionSupervisor,
87+
child_spec: DynamicSupervisor,
88+
strategy: :one_for_one,
89+
name: Realtime.BroadcastChanges.Handler.DynamicSupervisor},
8890
RealtimeWeb.Endpoint,
8991
RealtimeWeb.Presence
9092
] ++ extensions_supervisors()
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
defmodule Realtime.BroadcastChanges.Handler do
2+
use GenServer
3+
require Logger
4+
5+
import Realtime.Adapters.Postgres.Protocol
6+
import Realtime.Adapters.Postgres.Decoder
7+
import Realtime.Helpers, only: [log_error: 2]
8+
9+
alias Realtime.Adapters.Postgres.Decoder
10+
alias Realtime.Adapters.Postgres.Protocol.KeepAlive
11+
alias Realtime.Adapters.Postgres.Protocol.Write
12+
alias Realtime.Api.Tenant
13+
alias Realtime.BroadcastChanges.PostgresReplication
14+
alias Realtime.Database
15+
alias Realtime.Tenants.BatchBroadcast
16+
17+
defstruct [:tenant_id, relations: %{}, buffer: []]
18+
19+
@behaviour PostgresReplication.Handler
20+
21+
def start(%Tenant{external_id: tenant_id} = tenant, opts \\ []) do
22+
supervisor_spec =
23+
{:via, PartitionSupervisor,
24+
{Realtime.BroadcastChanges.Handler.DynamicSupervisor, tenant_id}}
25+
26+
connection_opts = Database.from_tenant(tenant, "realtime_broadcast_changes", :stop, true)
27+
name = {:via, Registry, {Realtime.BroadcastChanges.Handler.Registry, tenant_id}}
28+
opts = [tenant_id: tenant_id, connection_opts: connection_opts, name: name] ++ opts
29+
chidlren_spec = {__MODULE__, opts}
30+
Logger.info("Initializing handler for #{tenant_id}")
31+
32+
case DynamicSupervisor.start_child(supervisor_spec, chidlren_spec) do
33+
{:ok, pid} ->
34+
{:ok, pid}
35+
36+
{:error, {:already_started, pid}} ->
37+
{:ok, pid}
38+
39+
error ->
40+
log_error("UnableToStartHandler", error)
41+
{:error, :handler_failed_to_start}
42+
end
43+
end
44+
45+
@impl true
46+
def call(message, metadata) when is_write(message) do
47+
%{tenant_id: tenant_id} = metadata
48+
%Write{message: message} = parse(message)
49+
50+
case Registry.lookup(Realtime.BroadcastChanges.Handler.Registry, tenant_id) do
51+
[{pid, _}] ->
52+
message
53+
|> decode_message()
54+
|> then(&send(pid, &1))
55+
56+
_ ->
57+
Logger.warning("Unable to find BroadcastChanges for tenant: #{tenant_id}")
58+
:ok
59+
end
60+
61+
:noreply
62+
end
63+
64+
def call(message, _metadata) when is_keep_alive(message) do
65+
%KeepAlive{reply: reply, wal_end: wal_end} = parse(message)
66+
wal_end = wal_end + 1
67+
68+
message =
69+
case reply do
70+
:now -> standby_status(wal_end, wal_end, wal_end, reply)
71+
:later -> hold()
72+
end
73+
74+
{:reply, message}
75+
end
76+
77+
def call(msg, state) do
78+
Logger.info("Unknown message received: #{inspect(%{msg: parse(msg), state: state})}")
79+
:noreply
80+
end
81+
82+
@impl true
83+
84+
def handle_info(%Decoder.Messages.Relation{} = msg, state) do
85+
%Decoder.Messages.Relation{id: id, namespace: namespace, name: name, columns: columns} = msg
86+
%{relations: relations} = state
87+
relation = %{name: name, columns: columns, namespace: namespace}
88+
relations = Map.put(relations, id, relation)
89+
{:noreply, %{state | relations: relations}}
90+
end
91+
92+
def handle_info(%Decoder.Messages.Insert{} = msg, state) do
93+
%Decoder.Messages.Insert{relation_id: relation_id, tuple_data: tuple_data} = msg
94+
%{buffer: buffer, relations: relations} = state
95+
96+
case Map.get(relations, relation_id) do
97+
%{columns: columns} ->
98+
to_broadcast =
99+
tuple_data
100+
|> Tuple.to_list()
101+
|> Enum.zip(columns)
102+
|> Enum.map(fn
103+
{nil, %{name: name}} -> {name, nil}
104+
{value, %{name: name, type: "jsonb"}} -> {name, Jason.decode!(value)}
105+
{value, %{name: name, type: "bool"}} -> {name, value == "t"}
106+
{value, %{name: name}} -> {name, value}
107+
end)
108+
|> Map.new()
109+
110+
topic = Map.get(to_broadcast, "topic")
111+
private = Map.get(to_broadcast, "private")
112+
event = Map.get(to_broadcast, "event")
113+
payload = Map.get(to_broadcast, "payload")
114+
id = Map.get(to_broadcast, "id")
115+
116+
payload = Map.put(payload, "id", id)
117+
118+
to_broadcast = %{
119+
topic: topic,
120+
event: event,
121+
private: private,
122+
payload: payload
123+
}
124+
125+
buffer = buffer ++ [to_broadcast]
126+
{:noreply, %{state | buffer: buffer}}
127+
128+
_ ->
129+
log_error("UnknownBroadcastChangesRelation", "Relation ID not found: #{relation_id}")
130+
{:noreply, state}
131+
end
132+
end
133+
134+
def handle_info(%Decoder.Messages.Commit{}, %{buffer: []} = state) do
135+
{:noreply, state}
136+
end
137+
138+
def handle_info(%Decoder.Messages.Commit{}, state) do
139+
%{buffer: buffer, tenant_id: tenant_id} = state
140+
tenant = Realtime.Tenants.Cache.get_tenant_by_external_id(tenant_id)
141+
142+
case BatchBroadcast.broadcast(nil, tenant, %{messages: buffer}, true) do
143+
:ok -> :ok
144+
error -> log_error("UnableToBatchBroadcastChanges", error)
145+
end
146+
147+
{:noreply, %{state | buffer: []}}
148+
end
149+
150+
def handle_info(_, state), do: {:noreply, state}
151+
def start_link(opts), do: GenServer.start_link(__MODULE__, opts, opts)
152+
153+
@impl true
154+
def init(opts) do
155+
Logger.info("Initializing connection with the status: #{inspect(opts)}")
156+
157+
tenant_id = Keyword.fetch!(opts, :tenant_id)
158+
connection_opts = Keyword.fetch!(opts, :connection_opts)
159+
160+
supervisor =
161+
{:via, PartitionSupervisor,
162+
{Realtime.BroadcastChanges.Listener.DynamicSupervisor, tenant_id}}
163+
164+
name = {:via, Registry, {Realtime.BroadcastChanges.Listener.Registry, tenant_id}}
165+
166+
configuration = %PostgresReplication{
167+
connection_opts: [
168+
hostname: connection_opts.host,
169+
username: connection_opts.user,
170+
password: connection_opts.pass,
171+
database: connection_opts.name,
172+
port: connection_opts.port,
173+
parameters: [
174+
application_name: connection_opts.application_name
175+
]
176+
],
177+
table: "messages",
178+
schema: "realtime",
179+
handler_module: __MODULE__,
180+
opts: [name: name],
181+
metadata: %{tenant_id: tenant_id}
182+
}
183+
184+
children_spec = {PostgresReplication, configuration}
185+
state = %__MODULE__{tenant_id: tenant_id, buffer: [], relations: %{}}
186+
187+
case DynamicSupervisor.start_child(supervisor, children_spec) do
188+
{:ok, _pid} ->
189+
{:ok, state}
190+
191+
{:error, {:already_started, _pid}} ->
192+
{:ok, state}
193+
194+
error ->
195+
log_error("UnableToStartPostgresReplication", error)
196+
{:stop, :shutdown}
197+
end
198+
end
199+
end

0 commit comments

Comments
 (0)