Skip to content

Commit 3b2d7f4

Browse files
committed
bump to latest version; fix tests
1 parent 231a49c commit 3b2d7f4

File tree

2 files changed

+176
-47
lines changed

2 files changed

+176
-47
lines changed

lib/realtime/tenants/replication_connection.ex

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,6 @@ defmodule Realtime.Tenants.ReplicationConnection do
1515
* `handler_module` - The module that will handle the data received from the replication stream.
1616
* `metadata` - The metadata to pass to the handler module.
1717
18-
## How did we understood the data tuple format when pgoutput uses binary mode?
19-
We checked the documentation with the help of AI and understood that pgoutput does the following logic:
20-
* When `pgoutput` uses binary mode goes to https://doxygen.postgresql.org/proto_8c_source.html#l00767
21-
* We end up in `OidSendFunctionCall` which gets the write function to call and transform into a binary payload based on OID - https://doxygen.postgresql.org/fmgr_8c.html#ad028dec5dc28e883308be264689f4559
22-
* The functions are referenced in https://doxygen.postgresql.org/dir_19ba549c088f18020d0eadfcd07e87be.html
23-
* It does `<data_tupe>.c` and calls `<data_type>_send` as the function to use
24-
* Jsonb - https://doxygen.postgresql.org/jsonb_8c.html#afa4708e251f3078084e9986a46f228e9
25-
* First byte is JSONB version (1), remainder is the text value
26-
* Timestamp - https://doxygen.postgresql.org/backend_2utils_2adt_2timestamp_8c.html#ab0f80b3ae44bdcc50d0c0abb0c1d7939
27-
* 64 bytes integer with microseconds since `2000-01-01`
28-
* Boolean - https://doxygen.postgresql.org/bool_8c.html#a71cfb4f6c28259924b231dae4781b3fc
29-
* 1 is true, 0 is false
30-
* UUID - https://doxygen.postgresql.org/uuid_8c.html#a3cd205c61c108aaf08d6746c1d321e81
31-
* 16 byte binary ( https://doxygen.postgresql.org/uuid_8h.html#a9692a0205a857ed2cc29558470c2ed77 )
32-
* Text - https://doxygen.postgresql.org/varlena_8c.html#a0ea0ddd71c614e3fbd3571e70c53ed77
33-
* Sends binary data
3418
"""
3519
use Postgrex.ReplicationConnection
3620
use Realtime.Logs
@@ -73,7 +57,7 @@ defmodule Realtime.Tenants.ReplicationConnection do
7357
publication_name: nil,
7458
replication_slot_name: nil,
7559
output_plugin: "pgoutput",
76-
proto_version: 1,
60+
proto_version: 3,
7761
relations: %{},
7862
buffer: [],
7963
monitored_pid: nil,
@@ -332,7 +316,7 @@ defmodule Realtime.Tenants.ReplicationConnection do
332316
topic: topic,
333317
event: event,
334318
private: private,
335-
payload: payload
319+
payload: Jason.Fragment.new(payload)
336320
},
337321
:ok <- BatchBroadcast.broadcast(nil, tenant, %{messages: [broadcast_message]}, true) do
338322
latency_inserted_at = NaiveDateTime.utc_now(:microsecond) |> NaiveDateTime.diff(inserted_at, :microsecond)
@@ -397,8 +381,7 @@ defmodule Realtime.Tenants.ReplicationConnection do
397381
|> Enum.zip(columns)
398382
|> Map.new(fn
399383
{nil, %{name: name}} -> {name, nil}
400-
{value, %{name: name, type: "bool"}} -> {name, value == "t"}
401-
{value, %{name: name, type: "jsonb"}} -> {name, Jason.decode!(value)}
384+
{value, %{name: name, type: "bool"}} -> {name, value}
402385
{value, %{name: name}} -> {name, value}
403386
end)
404387
end

test/realtime/tenants/replication_connection_test.exs

Lines changed: 173 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,88 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
151151
end
152152
end
153153

154+
test "starts a handler for the tenant and broadcasts to public channel", %{tenant: tenant, db_conn: db_conn} do
155+
start_link_supervised!(
156+
{ReplicationConnection, %ReplicationConnection{tenant_id: tenant.external_id, monitored_pid: self()}},
157+
restart: :transient
158+
)
159+
160+
topic = random_string()
161+
tenant_topic = Tenants.tenant_topic(tenant.external_id, topic, true)
162+
subscribe(tenant_topic, topic)
163+
164+
total_messages = 5
165+
# Works with one insert per transaction
166+
for _ <- 1..total_messages do
167+
value = random_string()
168+
169+
row =
170+
message_fixture(tenant, %{
171+
"topic" => topic,
172+
"private" => false,
173+
"event" => "INSERT",
174+
"payload" => %{"value" => value}
175+
})
176+
177+
assert_receive {:socket_push, :text, data}
178+
message = data |> IO.iodata_to_binary() |> Jason.decode!()
179+
180+
payload = %{
181+
"event" => "INSERT",
182+
"meta" => %{"id" => row.id},
183+
"payload" => %{
184+
"value" => value
185+
},
186+
"type" => "broadcast"
187+
}
188+
189+
assert message == %{"event" => "broadcast", "payload" => payload, "ref" => nil, "topic" => topic}
190+
end
191+
192+
Process.sleep(500)
193+
# Works with batch inserts
194+
messages =
195+
for _ <- 1..total_messages do
196+
Message.changeset(%Message{}, %{
197+
"topic" => topic,
198+
"private" => false,
199+
"event" => "INSERT",
200+
"extension" => "broadcast",
201+
"payload" => %{"value" => random_string()}
202+
})
203+
end
204+
205+
{:ok, _} = Realtime.Repo.insert_all_entries(db_conn, messages, Message)
206+
207+
messages_received =
208+
for _ <- 1..total_messages, into: [] do
209+
assert_receive {:socket_push, :text, data}
210+
data |> IO.iodata_to_binary() |> Jason.decode!()
211+
end
212+
213+
for row <- messages do
214+
assert Enum.count(messages_received, fn message_received ->
215+
value = row |> Map.from_struct() |> get_in([:changes, :payload, "value"])
216+
217+
match?(
218+
%{
219+
"event" => "broadcast",
220+
"payload" => %{
221+
"event" => "INSERT",
222+
"meta" => %{"id" => _id},
223+
"payload" => %{
224+
"value" => ^value
225+
}
226+
},
227+
"ref" => nil,
228+
"topic" => ^topic
229+
},
230+
message_received
231+
)
232+
end) == 1
233+
end
234+
end
235+
154236
test "monitored pid stopping brings down ReplicationConnection ", %{tenant: tenant} do
155237
monitored_pid =
156238
spawn(fn ->
@@ -227,6 +309,86 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
227309
assert logs =~ "UnableToBroadcastChanges: %{messages: [%{payload: [\"Payload size exceeds tenant limit\"]}]}"
228310
end
229311

312+
test "payload without id", %{tenant: tenant, db_conn: db_conn} do
313+
start_link_supervised!(
314+
{ReplicationConnection, %ReplicationConnection{tenant_id: tenant.external_id, monitored_pid: self()}},
315+
restart: :transient
316+
)
317+
318+
topic = random_string()
319+
tenant_topic = Tenants.tenant_topic(tenant.external_id, topic, false)
320+
subscribe(tenant_topic, topic)
321+
322+
value = "something"
323+
event = "INSERT"
324+
325+
Postgrex.query!(
326+
db_conn,
327+
"SELECT realtime.send (json_build_object ('value', $1 :: text)::jsonb, $2 :: text, $3 :: text, TRUE::bool);",
328+
[value, event, topic]
329+
)
330+
331+
{:ok, [%{id: id}]} = Repo.all(db_conn, from(m in Message), Message)
332+
333+
assert_receive {:socket_push, :text, data}, 500
334+
message = data |> IO.iodata_to_binary() |> Jason.decode!()
335+
336+
assert %{
337+
"event" => "broadcast",
338+
"payload" => %{
339+
"event" => "INSERT",
340+
"meta" => %{"id" => ^id},
341+
"payload" => payload,
342+
"type" => "broadcast"
343+
},
344+
"ref" => nil,
345+
"topic" => ^topic
346+
} = message
347+
348+
assert payload == %{
349+
"value" => "something",
350+
"id" => id
351+
}
352+
end
353+
354+
test "payload including id", %{tenant: tenant, db_conn: db_conn} do
355+
start_link_supervised!(
356+
{ReplicationConnection, %ReplicationConnection{tenant_id: tenant.external_id, monitored_pid: self()}},
357+
restart: :transient
358+
)
359+
360+
topic = random_string()
361+
tenant_topic = Tenants.tenant_topic(tenant.external_id, topic, false)
362+
subscribe(tenant_topic, topic)
363+
364+
id = "123456"
365+
value = "something"
366+
event = "INSERT"
367+
368+
Postgrex.query!(
369+
db_conn,
370+
"SELECT realtime.send (json_build_object ('value', $1 :: text, 'id', $2 :: text)::jsonb, $3 :: text, $4 :: text, TRUE::bool);",
371+
[value, id, event, topic]
372+
)
373+
374+
{:ok, [%{id: message_id}]} = Repo.all(db_conn, from(m in Message), Message)
375+
376+
assert_receive {:socket_push, :text, data}, 500
377+
message = data |> IO.iodata_to_binary() |> Jason.decode!()
378+
379+
assert %{
380+
"event" => "broadcast",
381+
"payload" => %{
382+
"meta" => %{"id" => ^message_id},
383+
"event" => "INSERT",
384+
"payload" => %{"value" => "something", "id" => ^id},
385+
"type" => "broadcast"
386+
},
387+
"ref" => nil,
388+
"topic" => ^topic
389+
} = message
390+
end
391+
230392
test "fails on existing replication slot", %{tenant: tenant} do
231393
{:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop)
232394
name = "supabase_realtime_messages_replication_slot_test"
@@ -355,33 +517,17 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
355517
"payload" => %{"value" => random_string()}
356518
})
357519

358-
# Telemetry event may arrive before socket_push, so we need to handle both
359-
receive do
360-
{:socket_push, :text, data} ->
361-
message = data |> IO.iodata_to_binary() |> Jason.decode!()
362-
assert %{"event" => "broadcast", "payload" => _, "ref" => nil, "topic" => ^topic} = message
363-
364-
assert_receive {[:realtime, :tenants, :broadcast_from_database],
365-
%{latency_committed_at: latency_committed_at, latency_inserted_at: latency_inserted_at},
366-
%{tenant: ^external_id}},
367-
1000
368-
369-
assert latency_committed_at
370-
assert latency_inserted_at != 0
371-
372-
{[:realtime, :tenants, :broadcast_from_database],
373-
%{latency_committed_at: latency_committed_at, latency_inserted_at: latency_inserted_at},
374-
%{tenant: ^external_id}} ->
375-
assert latency_committed_at
376-
assert latency_inserted_at != 0
377-
378-
assert_receive {:socket_push, :text, data}, 2000
379-
message = data |> IO.iodata_to_binary() |> Jason.decode!()
380-
assert %{"event" => "broadcast", "payload" => _, "ref" => nil, "topic" => ^topic} = message
381-
after
382-
3000 ->
383-
flunk("Expected to receive either socket_push or telemetry event")
384-
end
520+
assert_receive {:socket_push, :text, data}, 500
521+
message = data |> IO.iodata_to_binary() |> Jason.decode!()
522+
523+
assert %{"event" => "broadcast", "payload" => _, "ref" => nil, "topic" => ^topic} = message
524+
525+
assert_receive {[:realtime, :tenants, :broadcast_from_database],
526+
%{latency_committed_at: latency_committed_at, latency_inserted_at: latency_inserted_at},
527+
%{tenant: ^external_id}}
528+
529+
assert latency_committed_at > 0
530+
assert latency_inserted_at > 0
385531
end
386532
end
387533

0 commit comments

Comments
 (0)