Skip to content

Commit b18184b

Browse files
authored
feat: rate counter for failing auth due to PG connection errors (#1508)
* Change RateCounter to allow for not only average but sum limits * Change Authorization to rate limit based on the amount of Connection errors we get when calling verifying channel authorisation
1 parent d8fba86 commit b18184b

File tree

12 files changed

+491
-143
lines changed

12 files changed

+491
-143
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,8 +239,8 @@ This is the list of operational codes that can help you understand your deployme
239239
| ReconnectSubscribeToPostgres | Postgres changes still waiting to be subscribed |
240240
| ChannelRateLimitReached | The number of channels you can create has reached its limit |
241241
| ConnectionRateLimitReached | The number of connected clients as reached its limit |
242-
| ClientJoinRateLimitReached | The rate of joins per second from your clients as reached the channel limits |
243-
| MessagePerSecondRateLimitReached | The rate of messages per second from your clients as reached the channel limits |
242+
| ClientJoinRateLimitReached | The rate of joins per second from your clients has reached the channel limits |
243+
| MessagePerSecondRateLimitReached | The rate of messages per second from your clients has reached the channel limits |
244244
| RealtimeDisabledForTenant | Realtime has been disabled for the tenant |
245245
| UnableToConnectToTenantDatabase | Realtime was not able to connect to the tenant's database |
246246
| DatabaseLackOfConnections | Realtime was not able to connect to the tenant's database due to not having enough available connections |

lib/realtime/database.ex

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,10 @@ defmodule Realtime.Database do
186186
e ->
187187
log_error("ErrorExecutingTransaction", e, metadata)
188188
{:error, e}
189+
catch
190+
:exit, reason ->
191+
log_error("ErrorExecutingTransaction", reason, metadata)
192+
{:error, {:exit, reason}}
189193
end
190194

191195
@spec connect_db(__MODULE__.t()) :: {:ok, pid()} | {:error, any()}

lib/realtime/rate_counter/rate_counter.ex

Lines changed: 32 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -28,40 +28,43 @@ defmodule Realtime.RateCounter do
2828

2929
defstruct id: nil,
3030
avg: 0.0,
31+
sum: 0,
3132
bucket: [],
3233
max_bucket_len: @max_bucket_len,
3334
tick: @tick,
3435
tick_ref: nil,
3536
idle_shutdown: @idle_shutdown,
3637
idle_shutdown_ref: nil,
3738
limit: %{log: false},
38-
telemetry: %{
39-
event_name: [@app_name] ++ [:rate_counter],
40-
measurements: %{sum: 0},
41-
metadata: %{}
42-
}
39+
telemetry: %{emit: false}
4340

4441
@type t :: %__MODULE__{
4542
id: term(),
4643
avg: float(),
44+
sum: non_neg_integer(),
4745
bucket: list(),
4846
max_bucket_len: integer(),
4947
tick: integer(),
50-
tick_ref: reference(),
48+
tick_ref: reference() | nil,
5149
idle_shutdown: integer() | :infinity,
52-
idle_shutdown_ref: reference(),
53-
limit: %{
54-
log: boolean(),
55-
value: integer(),
56-
triggered: boolean(),
57-
log_fn: (-> term())
58-
},
59-
telemetry: %{
60-
emit: false,
61-
event_name: :telemetry.event_name(),
62-
measurements: :telemetry.event_measurements(),
63-
metadata: :telemetry.event_metadata()
64-
}
50+
idle_shutdown_ref: reference() | nil,
51+
limit:
52+
%{log: false}
53+
| %{
54+
log: true,
55+
value: integer(),
56+
measurement: :sum | :avg,
57+
triggered: boolean(),
58+
log_fn: (-> term())
59+
},
60+
telemetry:
61+
%{emit: false}
62+
| %{
63+
emit: true,
64+
event_name: :telemetry.event_name(),
65+
measurements: :telemetry.event_measurements(),
66+
metadata: :telemetry.event_metadata()
67+
}
6568
}
6669

6770
@spec start_link([keyword()]) :: {:ok, pid()} | {:error, {:already_started, pid()}}
@@ -166,8 +169,9 @@ defmodule Realtime.RateCounter do
166169
if limit_opts do
167170
%{
168171
log: true,
169-
value: limit_opts[:value],
170-
log_fn: limit_opts[:log_fn],
172+
value: Keyword.fetch!(limit_opts, :value),
173+
measurement: Keyword.fetch!(limit_opts, :measurement),
174+
log_fn: Keyword.fetch!(limit_opts, :log_fn),
171175
triggered: false
172176
}
173177
else
@@ -211,12 +215,10 @@ defmodule Realtime.RateCounter do
211215
bucket = [count | state.bucket] |> Enum.take(state.max_bucket_len)
212216
bucket_len = Enum.count(bucket)
213217

214-
avg =
215-
bucket
216-
|> Enum.sum()
217-
|> Kernel./(bucket_len)
218+
sum = Enum.sum(bucket)
219+
avg = sum / bucket_len
218220

219-
state = %{state | bucket: bucket, avg: avg}
221+
state = %{state | bucket: bucket, sum: sum, avg: avg}
220222

221223
state = maybe_trigger_limit(state)
222224
tick(state.tick)
@@ -252,18 +254,18 @@ defmodule Realtime.RateCounter do
252254

253255
defp maybe_trigger_limit(%{limit: %{log: false}} = state), do: state
254256

255-
defp maybe_trigger_limit(%{limit: %{triggered: true}} = state) do
257+
defp maybe_trigger_limit(%{limit: %{triggered: true, measurement: measurement}} = state) do
256258
# Limit has been triggered, but we need to check if it is still above the limit
257-
if state.avg < state.limit.value do
259+
if Map.fetch!(state, measurement) < state.limit.value do
258260
%{state | limit: %{state.limit | triggered: false}}
259261
else
260262
# Limit is still above the threshold, so we keep the state as is
261263
state
262264
end
263265
end
264266

265-
defp maybe_trigger_limit(state) do
266-
if state.avg >= state.limit.value do
267+
defp maybe_trigger_limit(%{limit: %{measurement: measurement}} = state) do
268+
if Map.fetch!(state, measurement) >= state.limit.value do
267269
state.limit.log_fn.()
268270

269271
%{state | limit: %{state.limit | triggered: true}}

lib/realtime/tenants.ex

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ defmodule Realtime.Tenants do
154154
},
155155
limit: [
156156
value: max_joins_per_second,
157+
measurement: :avg,
157158
log_fn: fn ->
158159
Logger.critical("ClientJoinRateLimitReached: Too many joins per second",
159160
external_id: tenant_id,
@@ -199,6 +200,7 @@ defmodule Realtime.Tenants do
199200
},
200201
limit: [
201202
value: max_events_per_second,
203+
measurement: :avg,
202204
log: true,
203205
log_fn: fn ->
204206
Logger.error("MessagePerSecondRateLimitReached: Too many messages per second",
@@ -277,6 +279,7 @@ defmodule Realtime.Tenants do
277279
},
278280
limit: [
279281
value: max_presence_events_per_second,
282+
measurement: :avg,
280283
log_fn: fn ->
281284
Logger.error("PresenceRateLimitReached: Too many presence events per second",
282285
external_id: tenant_id,
@@ -306,6 +309,31 @@ defmodule Realtime.Tenants do
306309
{:channel, :presence_events, tenant.external_id}
307310
end
308311

312+
@spec authorization_errors_per_second_rate(Tenant.t()) :: RateCounter.Args.t()
313+
def authorization_errors_per_second_rate(%Tenant{external_id: external_id} = tenant) do
314+
opts = [
315+
max_bucket_len: 30,
316+
limit: [
317+
value: pool_size(tenant),
318+
measurement: :sum,
319+
log_fn: fn ->
320+
Logger.critical("IncreaseConnectionPool: Too many database timeouts",
321+
external_id: external_id,
322+
project: external_id
323+
)
324+
end
325+
]
326+
]
327+
328+
%RateCounter.Args{id: {:channel, :authorization_errors, external_id}, opts: opts}
329+
end
330+
331+
defp pool_size(%{extensions: [%{settings: settings} | _]}) do
332+
Database.pool_size_by_application_name("realtime_connect", settings)
333+
end
334+
335+
defp pool_size(_), do: 1
336+
309337
@spec get_tenant_limits(Realtime.Api.Tenant.t(), maybe_improper_list) :: list
310338
def get_tenant_limits(%Tenant{} = tenant, keys) when is_list(keys) do
311339
nodes = [Node.self() | Node.list()]

lib/realtime/tenants/authorization.ex

Lines changed: 93 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@ defmodule Realtime.Tenants.Authorization do
1313

1414
alias DBConnection.ConnectionError
1515
alias Realtime.Api.Message
16+
alias Realtime.Api.Tenant
1617
alias Realtime.Database
17-
alias Realtime.Repo
18+
alias Realtime.GenCounter
1819
alias Realtime.GenRpc
20+
alias Realtime.Repo
1921
alias Realtime.Tenants.Authorization.Policies
2022

2123
defstruct [:tenant_id, :topic, :headers, :jwt, :claims, :role, :sub]
@@ -60,26 +62,42 @@ defmodule Realtime.Tenants.Authorization do
6062
@spec get_read_authorizations(Policies.t(), pid(), t()) ::
6163
{:ok, Policies.t()} | {:error, any()} | {:error, :rls_policy_error, any()}
6264
def get_read_authorizations(policies, db_conn, authorization_context) when node() == node(db_conn) do
63-
case get_read_policies_for_connection(db_conn, authorization_context, policies) do
64-
{:ok, %Policies{} = policies} -> {:ok, policies}
65-
{:ok, {:error, %Postgrex.Error{} = error}} -> {:error, :rls_policy_error, error}
66-
{:error, %ConnectionError{reason: :queue_timeout}} -> {:error, :increase_connection_pool}
67-
{:error, error} -> {:error, error}
65+
rate_counter = rate_counter(authorization_context.tenant_id)
66+
67+
if rate_counter.limit.triggered == false do
68+
db_conn
69+
|> get_read_policies_for_connection(authorization_context, policies)
70+
|> handle_policies_result(rate_counter)
71+
else
72+
{:error, :increase_connection_pool}
6873
end
6974
end
7075

7176
# Remote call
7277
def get_read_authorizations(policies, db_conn, authorization_context) do
73-
case GenRpc.call(
74-
node(db_conn),
75-
__MODULE__,
76-
:get_read_authorizations,
77-
[policies, db_conn, authorization_context],
78-
tenant_id: authorization_context.tenant_id,
79-
key: authorization_context.tenant_id
80-
) do
81-
{:error, :rpc_error, reason} -> {:error, reason}
82-
response -> response
78+
rate_counter = rate_counter(authorization_context.tenant_id)
79+
80+
if rate_counter.limit.triggered == false do
81+
case GenRpc.call(
82+
node(db_conn),
83+
__MODULE__,
84+
:get_read_authorizations,
85+
[policies, db_conn, authorization_context],
86+
tenant_id: authorization_context.tenant_id,
87+
key: authorization_context.tenant_id
88+
) do
89+
{:error, :increase_connection_pool} = error ->
90+
GenCounter.add(rate_counter.id)
91+
error
92+
93+
{:error, :rpc_error, reason} ->
94+
{:error, reason}
95+
96+
response ->
97+
response
98+
end
99+
else
100+
{:error, :increase_connection_pool}
83101
end
84102
end
85103

@@ -91,33 +109,70 @@ defmodule Realtime.Tenants.Authorization do
91109
@spec get_write_authorizations(Policies.t(), pid(), __MODULE__.t()) ::
92110
{:ok, Policies.t()} | {:error, any()} | {:error, :rls_policy_error, any()}
93111
def get_write_authorizations(policies, db_conn, authorization_context) when node() == node(db_conn) do
94-
case get_write_policies_for_connection(db_conn, authorization_context, policies) do
95-
{:ok, %Policies{} = policies} -> {:ok, policies}
96-
{:ok, {:error, %Postgrex.Error{} = error}} -> {:error, :rls_policy_error, error}
97-
{:error, %ConnectionError{reason: :queue_timeout}} -> {:error, :increase_connection_pool}
98-
{:error, error} -> {:error, error}
112+
rate_counter = rate_counter(authorization_context.tenant_id)
113+
114+
if rate_counter.limit.triggered == false do
115+
db_conn
116+
|> get_write_policies_for_connection(authorization_context, policies)
117+
|> handle_policies_result(rate_counter)
118+
else
119+
{:error, :increase_connection_pool}
99120
end
100121
end
101122

102123
# Remote call
103124
def get_write_authorizations(policies, db_conn, authorization_context) do
104-
case GenRpc.call(
105-
node(db_conn),
106-
__MODULE__,
107-
:get_write_authorizations,
108-
[policies, db_conn, authorization_context],
109-
tenant_id: authorization_context.tenant_id,
110-
key: authorization_context.tenant_id
111-
) do
112-
{:error, :rpc_error, reason} -> {:error, reason}
113-
response -> response
125+
rate_counter = rate_counter(authorization_context.tenant_id)
126+
127+
if rate_counter.limit.triggered == false do
128+
case GenRpc.call(
129+
node(db_conn),
130+
__MODULE__,
131+
:get_write_authorizations,
132+
[policies, db_conn, authorization_context],
133+
tenant_id: authorization_context.tenant_id,
134+
key: authorization_context.tenant_id
135+
) do
136+
{:error, :increase_connection_pool} = error ->
137+
GenCounter.add(rate_counter.id)
138+
error
139+
140+
{:error, :rpc_error, reason} ->
141+
{:error, reason}
142+
143+
response ->
144+
response
145+
end
146+
else
147+
{:error, :increase_connection_pool}
114148
end
115149
end
116150

117151
def get_write_authorizations(db_conn, authorization_context) do
118152
get_write_authorizations(%Policies{}, db_conn, authorization_context)
119153
end
120154

155+
defp handle_policies_result(result, rate_counter) do
156+
case result do
157+
{:ok, %Policies{} = policies} ->
158+
{:ok, policies}
159+
160+
{:ok, {:error, %Postgrex.Error{} = error}} ->
161+
{:error, :rls_policy_error, error}
162+
163+
{:error, %ConnectionError{reason: :queue_timeout}} ->
164+
GenCounter.add(rate_counter.id)
165+
{:error, :increase_connection_pool}
166+
167+
{:error, {:exit, _}} ->
168+
GenCounter.add(rate_counter.id)
169+
{:error, :increase_connection_pool}
170+
171+
{:error, error} ->
172+
{:error, error}
173+
end
174+
end
175+
121176
@doc """
122177
Sets the current connection configuration with the following config values:
123178
* role: The role of the user
@@ -282,4 +337,11 @@ defmodule Realtime.Tenants.Authorization do
282337
e
283338
end
284339
end
340+
341+
defp rate_counter(tenant_id) do
342+
%Tenant{} = tenant = Realtime.Tenants.Cache.get_tenant_by_external_id(tenant_id)
343+
rate_counter = Realtime.Tenants.authorization_errors_per_second_rate(tenant)
344+
{:ok, rate_counter} = Realtime.RateCounter.get(rate_counter)
345+
rate_counter
346+
end
285347
end

lib/realtime_web/channels/realtime_channel.ex

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -205,12 +205,12 @@ defmodule RealtimeWeb.RealtimeChannel do
205205
end
206206

207207
@impl true
208-
def handle_info(:update_rate_counter, %{assigns: %{limits: %{max_events_per_second: max}}} = socket) do
208+
def handle_info(:update_rate_counter, socket) do
209209
count(socket)
210210

211211
{:ok, rate_counter} = RateCounter.get(socket.assigns.rate_counter)
212212

213-
if rate_counter.avg > max do
213+
if rate_counter.limit.triggered do
214214
message = "Too many messages per second"
215215
shutdown_response(socket, message)
216216
else
@@ -490,11 +490,11 @@ defmodule RealtimeWeb.RealtimeChannel do
490490
RateCounter.new(rate_args)
491491

492492
case RateCounter.get(rate_args) do
493-
{:ok, %{avg: avg}} when avg < limits.max_joins_per_second ->
493+
{:ok, %{limit: %{triggered: false}}} ->
494494
GenCounter.add(rate_args.id)
495495
:ok
496496

497-
{:ok, %{avg: _}} ->
497+
{:ok, %{limit: %{triggered: true}}} ->
498498
{:error, :too_many_joins}
499499

500500
error ->

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.44.1",
7+
version: "2.45.0",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

0 commit comments

Comments
 (0)