Skip to content

Commit 5c8e8fa

Browse files
committed
fix: include payload id if not defined on realtime.send
1 parent 8fa1fac commit 5c8e8fa

File tree

3 files changed

+128
-7
lines changed

3 files changed

+128
-7
lines changed

lib/realtime/tenants/migrations.ex

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ defmodule Realtime.Tenants.Migrations do
7575
SubscriptionIndexBridgingDisabled,
7676
RunSubscriptionIndexBridgingDisabled,
7777
BroadcastSendErrorLogging,
78-
CreateMessagesReplayIndex
78+
CreateMessagesReplayIndex,
79+
BroadcastSendIncludePayloadId
7980
}
8081

8182
@migrations [
@@ -142,7 +143,8 @@ defmodule Realtime.Tenants.Migrations do
142143
{20_250_506_224_012, SubscriptionIndexBridgingDisabled},
143144
{20_250_523_164_012, RunSubscriptionIndexBridgingDisabled},
144145
{20_250_714_121_412, BroadcastSendErrorLogging},
145-
{20_250_905_041_441, CreateMessagesReplayIndex}
146+
{20_250_905_041_441, CreateMessagesReplayIndex},
147+
{20_251_103_001_201, BroadcastSendIncludePayloadId}
146148
]
147149

148150
defstruct [:tenant_external_id, :settings]
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
defmodule Realtime.Tenants.Migrations.BroadcastSendIncludePayloadId do
2+
@moduledoc false
3+
use Ecto.Migration
4+
5+
# Include ID in the payload if not defined
6+
def change do
7+
execute("""
8+
CREATE OR REPLACE FUNCTION realtime.send(payload jsonb, event text, topic text, private boolean DEFAULT true ) RETURNS void
9+
AS $$
10+
DECLARE
11+
generated_id uuid;
12+
final_payload jsonb;
13+
BEGIN
14+
BEGIN
15+
-- Generate a new UUID for the id
16+
generated_id := gen_random_uuid();
17+
18+
-- Check if payload has an 'id' key, if not, add the generated UUID
19+
IF payload ? 'id' THEN
20+
final_payload := payload;
21+
ELSE
22+
final_payload := jsonb_set(payload, '{id}', to_jsonb(generated_id));
23+
END IF;
24+
25+
-- Set the topic configuration
26+
EXECUTE format('SET LOCAL realtime.topic TO %L', topic);
27+
28+
-- Attempt to insert the message
29+
INSERT INTO realtime.messages (id, payload, event, topic, private, extension)
30+
VALUES (generated_id, final_payload, event, topic, private, 'broadcast');
31+
EXCEPTION
32+
WHEN OTHERS THEN
33+
-- Capture and notify the error
34+
RAISE WARNING 'ErrorSendingBroadcastMessage: %', SQLERRM;
35+
END;
36+
END;
37+
$$
38+
LANGUAGE plpgsql;
39+
""")
40+
end
41+
end

test/realtime/tenants/replication_connection_test.exs

Lines changed: 83 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,8 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
2222
{:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop)
2323
name = "supabase_realtime_messages_replication_slot_test"
2424
Postgrex.query(db_conn, "SELECT pg_drop_replication_slot($1)", [name])
25-
Process.exit(db_conn, :normal)
2625

27-
%{tenant: tenant}
26+
%{tenant: tenant, db_conn: db_conn}
2827
end
2928

3029
describe "temporary process" do
@@ -70,7 +69,7 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
7069
assert {:error, _} = ReplicationConnection.start(tenant, self())
7170
end
7271

73-
test "starts a handler for the tenant and broadcasts", %{tenant: tenant} do
72+
test "starts a handler for the tenant and broadcasts", %{tenant: tenant, db_conn: db_conn} do
7473
start_link_supervised!(
7574
{ReplicationConnection, %ReplicationConnection{tenant_id: tenant.external_id, monitored_pid: self()}},
7675
restart: :transient
@@ -121,7 +120,6 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
121120
})
122121
end
123122

124-
{:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop)
125123
{:ok, _} = Realtime.Repo.insert_all_entries(db_conn, messages, Message)
126124

127125
messages_received =
@@ -139,7 +137,7 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
139137
"event" => "broadcast",
140138
"payload" => %{
141139
"event" => "INSERT",
142-
"meta" => %{"id" => id},
140+
"meta" => %{"id" => _id},
143141
"payload" => %{
144142
"value" => ^value
145143
}
@@ -229,6 +227,86 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
229227
assert logs =~ "UnableToBroadcastChanges: %{messages: [%{payload: [\"Payload size exceeds tenant limit\"]}]}"
230228
end
231229

230+
test "payload without id", %{tenant: tenant, db_conn: db_conn} do
231+
start_link_supervised!(
232+
{ReplicationConnection, %ReplicationConnection{tenant_id: tenant.external_id, monitored_pid: self()}},
233+
restart: :transient
234+
)
235+
236+
topic = random_string()
237+
tenant_topic = Tenants.tenant_topic(tenant.external_id, topic, false)
238+
subscribe(tenant_topic, topic)
239+
240+
value = "something"
241+
event = "INSERT"
242+
243+
Postgrex.query!(
244+
db_conn,
245+
"SELECT realtime.send (json_build_object ('value', $1 :: text)::jsonb, $2 :: text, $3 :: text, TRUE::bool);",
246+
[value, event, topic]
247+
)
248+
249+
{:ok, [%{id: id}]} = Repo.all(db_conn, from(m in Message), Message)
250+
251+
assert_receive {:socket_push, :text, data}, 500
252+
message = data |> IO.iodata_to_binary() |> Jason.decode!()
253+
254+
assert %{
255+
"event" => "broadcast",
256+
"payload" => %{
257+
"event" => "INSERT",
258+
"meta" => %{"id" => ^id},
259+
"payload" => payload,
260+
"type" => "broadcast"
261+
},
262+
"ref" => nil,
263+
"topic" => ^topic
264+
} = message
265+
266+
assert payload == %{
267+
"value" => "something",
268+
"id" => id
269+
}
270+
end
271+
272+
test "payload including id", %{tenant: tenant, db_conn: db_conn} do
273+
start_link_supervised!(
274+
{ReplicationConnection, %ReplicationConnection{tenant_id: tenant.external_id, monitored_pid: self()}},
275+
restart: :transient
276+
)
277+
278+
topic = random_string()
279+
tenant_topic = Tenants.tenant_topic(tenant.external_id, topic, false)
280+
subscribe(tenant_topic, topic)
281+
282+
id = "123456"
283+
value = "something"
284+
event = "INSERT"
285+
286+
Postgrex.query!(
287+
db_conn,
288+
"SELECT realtime.send (json_build_object ('value', $1 :: text, 'id', $2 :: text)::jsonb, $3 :: text, $4 :: text, TRUE::bool);",
289+
[value, id, event, topic]
290+
)
291+
292+
{:ok, [%{id: message_id}]} = Repo.all(db_conn, from(m in Message), Message)
293+
294+
assert_receive {:socket_push, :text, data}, 500
295+
message = data |> IO.iodata_to_binary() |> Jason.decode!()
296+
297+
assert %{
298+
"event" => "broadcast",
299+
"payload" => %{
300+
"meta" => %{"id" => ^message_id},
301+
"event" => "INSERT",
302+
"payload" => %{"value" => "something", "id" => ^id},
303+
"type" => "broadcast"
304+
},
305+
"ref" => nil,
306+
"topic" => ^topic
307+
} = message
308+
end
309+
232310
test "fails on existing replication slot", %{tenant: tenant} do
233311
{:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop)
234312
name = "supabase_realtime_messages_replication_slot_test"

0 commit comments

Comments
 (0)