Skip to content

Commit b33501a

Browse files
committed
fix: invalid PG changes subscriptions should not retry
* Table that does not exist * Realtime publication does not exist * Column does not exist
1 parent 0423825 commit b33501a

File tree

7 files changed

+127
-16
lines changed

7 files changed

+127
-16
lines changed

lib/extensions/postgres_cdc_rls/cdc_rls.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ defmodule Extensions.PostgresCdcRls do
4949
{:cont, {:ok, [%{id: params.id, claims: params.claims, subscription_params: subscription_params} | acc]}}
5050

5151
{:error, reason} ->
52-
{:halt, {:error, :malformed_subscription_params, reason}}
52+
{:halt, {:error, {:malformed_subscription_params, reason}}}
5353
end
5454
end)
5555
end

lib/extensions/postgres_cdc_rls/subscriptions.ex

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
1515

1616
@spec create(conn(), String.t(), subscription_list, pid(), pid()) ::
1717
{:ok, Postgrex.Result.t()}
18-
| {:error, Exception.t() | :malformed_subscription_params | {:subscription_insert_failed, map()}}
18+
| {:error, Exception.t() | {:exit, term} | {:subscription_insert_failed, String.t()}}
19+
1920
def create(conn, publication, subscription_list, manager, caller) do
2021
sql = "with sub_tables as (
2122
select
@@ -63,16 +64,20 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
6364
msg =
6465
"Unable to subscribe to changes with given parameters. Please check Realtime is enabled for the given connect parameters: [#{params_to_log(params)}]"
6566

66-
rollback(conn, msg)
67+
rollback(conn, {:subscription_insert_failed, msg})
6768

6869
{:error, exception} ->
6970
msg =
7071
"Unable to subscribe to changes with given parameters. An exception happened so please check your connect parameters: [#{params_to_log(params)}]. Exception: #{Exception.message(exception)}"
7172

72-
rollback(conn, msg)
73+
rollback(conn, {:subscription_insert_failed, msg})
7374
end
7475
end)
7576
end)
77+
rescue
78+
e -> {:error, e}
79+
catch
80+
:exit, reason -> {:error, {:exit, reason}}
7681
end
7782

7883
defp params_to_log({schema, table, filters}) do

lib/realtime_web/channels/realtime_channel.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ defmodule RealtimeWeb.RealtimeChannel do
297297
push_system_message("postgres_changes", socket, "ok", message, channel_name)
298298
{:noreply, assign(socket, :pg_sub_ref, nil)}
299299

300-
{:error, :malformed_subscription_params, error} ->
300+
{:error, {reason, error}} when reason in [:malformed_subscription_params, :subscription_insert_failed] ->
301301
maybe_log_warning(socket, "RealtimeDisabledForConfiguration", error)
302302
push_system_message("postgres_changes", socket, "error", error, channel_name)
303303
# No point in retrying if the params are invalid

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.57.1",
7+
version: "2.57.2",
88
elixir: "~> 1.18",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

priv/repo/dev_seeds.exs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
import Ecto.Adapters.SQL, only: [query!: 3]
2-
31
alias Realtime.Api.Tenant
42
alias Realtime.Database
53
alias Realtime.Repo

test/realtime/extensions/cdc_rls/subscriptions_test.exs

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,35 @@ defmodule Realtime.Extensions.PostgresCdcRls.Subscriptions do
4444
%Postgrex.Result{rows: [[1]]} = Postgrex.query!(conn, "select count(*) from realtime.subscription", [])
4545
end
4646

47+
test "publication does not exist", %{conn: conn} do
48+
{:ok, subscription_params} = Subscriptions.parse_subscription_params(%{"schema" => "public", "table" => "test"})
49+
50+
subscription_list = [%{claims: %{"role" => "anon"}, id: UUID.uuid1(), subscription_params: subscription_params}]
51+
52+
Postgrex.query!(conn, "drop publication if exists supabase_realtime_test", [])
53+
54+
assert {:error,
55+
{:subscription_insert_failed,
56+
"Unable to subscribe to changes with given parameters. Please check Realtime is enabled for the given connect parameters: [schema: public, table: test, filters: []]"}} =
57+
Subscriptions.create(conn, "supabase_realtime_test", subscription_list, self(), self())
58+
59+
%Postgrex.Result{rows: [[0]]} = Postgrex.query!(conn, "select count(*) from realtime.subscription", [])
60+
end
61+
62+
test "table does not exist", %{conn: conn} do
63+
{:ok, subscription_params} =
64+
Subscriptions.parse_subscription_params(%{"schema" => "public", "table" => "doesnotexist"})
65+
66+
subscription_list = [%{claims: %{"role" => "anon"}, id: UUID.uuid1(), subscription_params: subscription_params}]
67+
68+
assert {:error,
69+
{:subscription_insert_failed,
70+
"Unable to subscribe to changes with given parameters. Please check Realtime is enabled for the given connect parameters: [schema: public, table: doesnotexist, filters: []]"}} =
71+
Subscriptions.create(conn, "supabase_realtime_test", subscription_list, self(), self())
72+
73+
%Postgrex.Result{rows: [[0]]} = Postgrex.query!(conn, "select count(*) from realtime.subscription", [])
74+
end
75+
4776
test "column does not exist", %{conn: conn} do
4877
{:ok, subscription_params} =
4978
Subscriptions.parse_subscription_params(%{
@@ -55,7 +84,8 @@ defmodule Realtime.Extensions.PostgresCdcRls.Subscriptions do
5584
subscription_list = [%{claims: %{"role" => "anon"}, id: UUID.uuid1(), subscription_params: subscription_params}]
5685

5786
assert {:error,
58-
"Unable to subscribe to changes with given parameters. An exception happened so please check your connect parameters: [schema: public, table: test, filters: [{\"subject\", \"eq\", \"hey\"}]]. Exception: ERROR P0001 (raise_exception) invalid column for filter subject"} =
87+
{:subscription_insert_failed,
88+
"Unable to subscribe to changes with given parameters. An exception happened so please check your connect parameters: [schema: public, table: test, filters: [{\"subject\", \"eq\", \"hey\"}]]. Exception: ERROR P0001 (raise_exception) invalid column for filter subject"}} =
5989
Subscriptions.create(conn, "supabase_realtime_test", subscription_list, self(), self())
6090

6191
%Postgrex.Result{rows: [[0]]} = Postgrex.query!(conn, "select count(*) from realtime.subscription", [])
@@ -72,11 +102,34 @@ defmodule Realtime.Extensions.PostgresCdcRls.Subscriptions do
72102
subscription_list = [%{claims: %{"role" => "anon"}, id: UUID.uuid1(), subscription_params: subscription_params}]
73103

74104
assert {:error,
75-
"Unable to subscribe to changes with given parameters. An exception happened so please check your connect parameters: [schema: public, table: test, filters: [{\"id\", \"eq\", \"hey\"}]]. Exception: ERROR 22P02 (invalid_text_representation) invalid input syntax for type integer: \"hey\""} =
105+
{:subscription_insert_failed,
106+
"Unable to subscribe to changes with given parameters. An exception happened so please check your connect parameters: [schema: public, table: test, filters: [{\"id\", \"eq\", \"hey\"}]]. Exception: ERROR 22P02 (invalid_text_representation) invalid input syntax for type integer: \"hey\""}} =
76107
Subscriptions.create(conn, "supabase_realtime_test", subscription_list, self(), self())
77108

78109
%Postgrex.Result{rows: [[0]]} = Postgrex.query!(conn, "select count(*) from realtime.subscription", [])
79110
end
111+
112+
test "connection error" do
113+
{:ok, subscription_params} =
114+
Subscriptions.parse_subscription_params(%{"schema" => "public", "table" => "test"})
115+
116+
subscription_list = [%{claims: %{"role" => "anon"}, id: UUID.uuid1(), subscription_params: subscription_params}]
117+
conn = spawn(fn -> :ok end)
118+
119+
assert {:error, {:exit, _}} =
120+
Subscriptions.create(conn, "supabase_realtime_test", subscription_list, self(), self())
121+
end
122+
123+
test "timeout", %{conn: conn} do
124+
{:ok, subscription_params} = Subscriptions.parse_subscription_params(%{"schema" => "public", "table" => "test"})
125+
126+
Task.start(fn -> Postgrex.query!(conn, "SELECT pg_sleep(20)", []) end)
127+
128+
subscription_list = [%{claims: %{"role" => "anon"}, id: UUID.uuid1(), subscription_params: subscription_params}]
129+
130+
assert {:error, %DBConnection.ConnectionError{reason: :queue_timeout}} =
131+
Subscriptions.create(conn, "supabase_realtime_test", subscription_list, self(), self())
132+
end
80133
end
81134

82135
describe "delete_all/1" do

test/realtime_web/channels/realtime_channel_test.exs

Lines changed: 61 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,7 @@ defmodule RealtimeWeb.RealtimeChannelTest do
6969
reply
7070

7171
assert_push "system",
72-
%{
73-
message: "Subscribed to PostgreSQL",
74-
status: "ok",
75-
extension: "postgres_changes",
76-
channel: "test"
77-
},
72+
%{message: "Subscribed to PostgreSQL", status: "ok", extension: "postgres_changes", channel: "test"},
7873
3000
7974

8075
{:ok, conn} = Connect.lookup_or_start_connection(tenant.external_id)
@@ -125,6 +120,66 @@ defmodule RealtimeWeb.RealtimeChannelTest do
125120
# It won't re-subscribe
126121
assert socket.assigns.pg_sub_ref == nil
127122
end
123+
124+
test "invalid subscription table does not exist", %{tenant: tenant} do
125+
jwt = Generators.generate_jwt_token(tenant)
126+
{:ok, %Socket{} = socket} = connect(UserSocket, %{}, conn_opts(tenant, jwt))
127+
128+
config = %{
129+
"presence" => %{"enabled" => false},
130+
"postgres_changes" => [%{"event" => "*", "schema" => "public", "table" => "doesnotexist"}]
131+
}
132+
133+
assert {:ok, reply, socket} = subscribe_and_join(socket, "realtime:test", %{"config" => config})
134+
135+
assert %{postgres_changes: [%{"event" => "*", "schema" => "public", "table" => "doesnotexist"}]} = reply
136+
137+
assert_push "system",
138+
%{
139+
message:
140+
"Unable to subscribe to changes with given parameters. Please check Realtime is enabled for the given connect parameters: [schema: public, table: doesnotexist, filters: []]",
141+
status: "error",
142+
extension: "postgres_changes",
143+
channel: "test"
144+
},
145+
3000
146+
147+
socket = Server.socket(socket.channel_pid)
148+
149+
# It won't re-subscribe
150+
assert socket.assigns.pg_sub_ref == nil
151+
end
152+
153+
test "invalid subscription column does not exist", %{tenant: tenant} do
154+
jwt = Generators.generate_jwt_token(tenant)
155+
{:ok, %Socket{} = socket} = connect(UserSocket, %{}, conn_opts(tenant, jwt))
156+
157+
config = %{
158+
"presence" => %{"enabled" => false},
159+
"postgres_changes" => [
160+
%{"event" => "*", "schema" => "public", "table" => "test", "filter" => "notacolumn=eq.123"}
161+
]
162+
}
163+
164+
assert {:ok, reply, socket} = subscribe_and_join(socket, "realtime:test", %{"config" => config})
165+
166+
assert %{postgres_changes: [%{"event" => "*", "schema" => "public", "table" => "test"}]} = reply
167+
168+
assert_push "system",
169+
%{
170+
message:
171+
"Unable to subscribe to changes with given parameters. An exception happened so please check your connect parameters: [schema: public, table: test, filters: [{\"notacolumn\", \"eq\", \"123\"}]]. Exception: ERROR P0001 (raise_exception) invalid column for filter notacolumn",
172+
status: "error",
173+
extension: "postgres_changes",
174+
channel: "test"
175+
},
176+
3000
177+
178+
socket = Server.socket(socket.channel_pid)
179+
180+
# It won't re-subscribe
181+
assert socket.assigns.pg_sub_ref == nil
182+
end
128183
end
129184

130185
describe "broadcast" do

0 commit comments

Comments
 (0)