Skip to content

Commit befdf0e

Browse files
committed
feat: speed up messages replication by avoiding re-encoding JSON
We can use the already JSON encoded payload and pass a Jason.Fragment instead of decoding to then re-encode when the message gets sent to a websocket
1 parent c8d6063 commit befdf0e

File tree

2 files changed

+2
-81
lines changed

2 files changed

+2
-81
lines changed

lib/realtime/tenants/replication_connection.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ defmodule Realtime.Tenants.ReplicationConnection do
159159

160160
@impl true
161161
def init(%__MODULE__{tenant_id: tenant_id, monitored_pid: monitored_pid} = state) do
162+
Process.flag(:fullsweep_after, 20)
162163
Logger.metadata(external_id: tenant_id, project: tenant_id)
163164
Process.monitor(monitored_pid)
164165

@@ -315,7 +316,7 @@ defmodule Realtime.Tenants.ReplicationConnection do
315316
topic: topic,
316317
event: event,
317318
private: private,
318-
payload: Map.put_new(payload, "id", id)
319+
payload: Jason.Fragment.new(payload)
319320
},
320321
:ok <- BatchBroadcast.broadcast(nil, tenant, %{messages: [broadcast_message]}, true) do
321322
inserted_at = NaiveDateTime.from_iso8601!(inserted_at)
@@ -381,7 +382,6 @@ defmodule Realtime.Tenants.ReplicationConnection do
381382
|> Enum.zip(columns)
382383
|> Map.new(fn
383384
{nil, %{name: name}} -> {name, nil}
384-
{value, %{name: name, type: "jsonb"}} -> {name, Jason.decode!(value)}
385385
{value, %{name: name, type: "bool"}} -> {name, value == "t"}
386386
{value, %{name: name}} -> {name, value}
387387
end)

test/realtime/tenants/replication_connection_test.exs

Lines changed: 0 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,6 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
100100
"event" => "INSERT",
101101
"meta" => %{"id" => row.id},
102102
"payload" => %{
103-
"id" => row.id,
104103
"value" => value
105104
},
106105
"type" => "broadcast"
@@ -142,7 +141,6 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
142141
"event" => "INSERT",
143142
"meta" => %{"id" => id},
144143
"payload" => %{
145-
"id" => id,
146144
"value" => ^value
147145
}
148146
},
@@ -231,83 +229,6 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
231229
assert logs =~ "UnableToBroadcastChanges: %{messages: [%{payload: [\"Payload size exceeds tenant limit\"]}]}"
232230
end
233231

234-
test "payload without id", %{tenant: tenant} do
235-
start_link_supervised!(
236-
{ReplicationConnection, %ReplicationConnection{tenant_id: tenant.external_id, monitored_pid: self()}},
237-
restart: :transient
238-
)
239-
240-
topic = random_string()
241-
tenant_topic = Tenants.tenant_topic(tenant.external_id, topic, false)
242-
subscribe(tenant_topic, topic)
243-
244-
fixture =
245-
message_fixture(tenant, %{
246-
"topic" => topic,
247-
"private" => true,
248-
"event" => "INSERT",
249-
"payload" => %{"value" => "something"}
250-
})
251-
252-
fixture_id = fixture.id
253-
254-
assert_receive {:socket_push, :text, data}, 500
255-
message = data |> IO.iodata_to_binary() |> Jason.decode!()
256-
257-
assert %{
258-
"event" => "broadcast",
259-
"payload" => %{
260-
"event" => "INSERT",
261-
"meta" => %{"id" => ^fixture_id},
262-
"payload" => payload,
263-
"type" => "broadcast"
264-
},
265-
"ref" => nil,
266-
"topic" => ^topic
267-
} = message
268-
269-
assert payload == %{
270-
"value" => "something",
271-
"id" => fixture_id
272-
}
273-
end
274-
275-
test "payload including id", %{tenant: tenant} do
276-
start_link_supervised!(
277-
{ReplicationConnection, %ReplicationConnection{tenant_id: tenant.external_id, monitored_pid: self()}},
278-
restart: :transient
279-
)
280-
281-
topic = random_string()
282-
tenant_topic = Tenants.tenant_topic(tenant.external_id, topic, false)
283-
subscribe(tenant_topic, topic)
284-
285-
payload = %{"value" => "something", "id" => "123456"}
286-
287-
%{id: fixture_id} =
288-
message_fixture(tenant, %{
289-
"topic" => topic,
290-
"private" => true,
291-
"event" => "INSERT",
292-
"payload" => payload
293-
})
294-
295-
assert_receive {:socket_push, :text, data}, 500
296-
message = data |> IO.iodata_to_binary() |> Jason.decode!()
297-
298-
assert %{
299-
"event" => "broadcast",
300-
"payload" => %{
301-
"meta" => %{"id" => ^fixture_id},
302-
"event" => "INSERT",
303-
"payload" => ^payload,
304-
"type" => "broadcast"
305-
},
306-
"ref" => nil,
307-
"topic" => ^topic
308-
} = message
309-
end
310-
311232
test "fails on existing replication slot", %{tenant: tenant} do
312233
{:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop)
313234
name = "supabase_realtime_messages_replication_slot_test"

0 commit comments

Comments
 (0)