Skip to content

Commit 5290aa1

Browse files
authored
fix: change CdcRls.WorkerSupervisor to be temporary instead of transient (#1587)
RealtimeChannel already reacts to WorkerSupervisor going down to reconnect Having it transient can bring down other unrelated WorkerSupervisor in some cases
1 parent 53c98c6 commit 5290aa1

File tree

4 files changed

+23
-23
lines changed

4 files changed

+23
-23
lines changed

lib/extensions/postgres_cdc_rls/cdc_rls.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ defmodule Extensions.PostgresCdcRls do
9999
%{
100100
id: tenant,
101101
start: {Rls.WorkerSupervisor, :start_link, [args]},
102-
restart: :transient
102+
restart: :temporary
103103
}
104104
)
105105
end

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.56.1",
7+
version: "2.56.2",
88
elixir: "~> 1.18",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

priv/repo/dev_seeds.exs

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -41,30 +41,29 @@ default_db_host = "127.0.0.1"
4141
})
4242
|> Repo.insert!()
4343

44-
publication = "supabase_realtime"
45-
46-
[
47-
"drop publication if exists #{publication}",
48-
"drop table if exists public.test_tenant;",
49-
"create table public.test_tenant ( id SERIAL PRIMARY KEY, details text );",
50-
"grant all on table public.test_tenant to anon;",
51-
"grant all on table public.test_tenant to postgres;",
52-
"grant all on table public.test_tenant to authenticated;",
53-
"create publication #{publication} for table public.test_tenant"
54-
]
55-
|> Enum.each(&query!(Repo, &1, []))
56-
5744
tenant
5845
end)
5946

6047
# Reset Tenant DB
6148
settings = Database.from_tenant(tenant, "realtime_migrations", :stop)
6249
settings = %{settings | max_restarts: 0, ssl: false}
6350
{:ok, tenant_conn} = Database.connect_db(settings)
51+
publication = "supabase_realtime"
6452

6553
Postgrex.transaction(tenant_conn, fn db_conn ->
6654
Postgrex.query!(db_conn, "DROP SCHEMA IF EXISTS realtime CASCADE", [])
6755
Postgrex.query!(db_conn, "CREATE SCHEMA IF NOT EXISTS realtime", [])
56+
57+
[
58+
"drop publication if exists #{publication}",
59+
"drop table if exists public.test_tenant;",
60+
"create table public.test_tenant ( id SERIAL PRIMARY KEY, details text );",
61+
"grant all on table public.test_tenant to anon;",
62+
"grant all on table public.test_tenant to postgres;",
63+
"grant all on table public.test_tenant to authenticated;",
64+
"create publication #{publication} for table public.test_tenant"
65+
]
66+
|> Enum.each(&Postgrex.query!(db_conn, &1))
6867
end)
6968

7069
case Tenants.Migrations.run_migrations(tenant) do

test/realtime/extensions/cdc_rls/cdc_rls_test.exs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -76,21 +76,23 @@ defmodule Realtime.Extensions.CdcRlsTest do
7676
metadata = [metadata: subscription_metadata]
7777
:ok = PostgresCdc.subscribe(PostgresCdcRls, pg_change_params, external_id, metadata)
7878

79+
RealtimeWeb.Endpoint.subscribe(PostgresCdcRls.syn_topic(tenant.external_id))
7980
# First time it will return nil
8081
PostgresCdcRls.handle_connect(args)
8182
# Wait for it to start
82-
Process.sleep(3000)
83+
assert_receive %{event: "ready"}, 1000
84+
85+
on_exit(fn -> PostgresCdcRls.handle_stop(external_id, 10_000) end)
8386
{:ok, response} = PostgresCdcRls.handle_connect(args)
8487

8588
# Now subscribe to the Postgres Changes
8689
{:ok, _} = PostgresCdcRls.handle_after_connect(response, postgres_extension, pg_change_params)
8790

88-
on_exit(fn -> PostgresCdcRls.handle_stop(external_id, 10_000) end)
91+
RealtimeWeb.Endpoint.unsubscribe(PostgresCdcRls.syn_topic(tenant.external_id))
8992
%{tenant: tenant}
9093
end
9194

92-
@tag skip: "Flaky test. When logger handle_sasl_reports is enabled this test doesn't break"
93-
test "Check supervisor crash and respawn", %{tenant: tenant} do
95+
test "supervisor crash must not respawn", %{tenant: tenant} do
9496
sup =
9597
Enum.reduce_while(1..30, nil, fn _, acc ->
9698
:syn.lookup(Extensions.PostgresCdcRls, tenant.external_id)
@@ -112,12 +114,11 @@ defmodule Realtime.Extensions.CdcRlsTest do
112114
Process.exit(sup, :kill)
113115
assert_receive {:DOWN, _, :process, ^sup, _reason}, 5000
114116

115-
assert_receive %{event: "ready"}, 5000
117+
assert_receive %{event: "postgres_cdc_rls_down"}
116118

117-
{sup2, _} = :syn.lookup(Extensions.PostgresCdcRls, tenant.external_id)
119+
refute_receive %{event: "ready"}, 1000
118120

119-
assert(sup != sup2)
120-
assert Process.alive?(sup2)
121+
:undefined = :syn.lookup(Extensions.PostgresCdcRls, tenant.external_id)
121122
end
122123

123124
test "Subscription manager updates oids", %{tenant: tenant} do

0 commit comments

Comments
 (0)