Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ jobs:
run: mix ecto.migrate --log-migrator-sql
- name: Run database tenant migrations
run: mix ecto.migrate --migrations-path lib/realtime/tenants/repo/migrations
- name: Check for warnings
run: mix compile --force --warnings-as-errors
- name: Run format check
run: mix format --check-formatted
- name: Credo checks
Expand Down
6 changes: 1 addition & 5 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,7 @@ websocket_max_heap_size = div(Env.get_integer("WEBSOCKET_MAX_HEAP_SIZE", 50_000_
users_scope_shards = Env.get_integer("USERS_SCOPE_SHARDS", 5)
postgres_cdc_scope_shards = Env.get_integer("POSTGRES_CDC_SCOPE_SHARDS", 5)
regional_broadcasting = Env.get_boolean("REGIONAL_BROADCASTING", false)

no_channel_timeout_in_ms =
if config_env() == :test,
do: :timer.seconds(3),
else: Env.get_integer("NO_CHANNEL_TIMEOUT_IN_MS", :timer.minutes(10))
no_channel_timeout_in_ms = Env.get_integer("NO_CHANNEL_TIMEOUT_IN_MS", :timer.minutes(10))

if !(db_version in [nil, "ipv6", "ipv4"]),
do: raise("Invalid IP version, please set either ipv6 or ipv4")
Expand Down
60 changes: 52 additions & 8 deletions lib/extensions/postgres_cdc_rls/cdc_rls.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ defmodule Extensions.PostgresCdcRls do
@behaviour Realtime.PostgresCdc
use Realtime.Logs

alias RealtimeWeb.Endpoint
alias Extensions.PostgresCdcRls, as: Rls
alias Rls.Subscriptions
alias Realtime.GenCounter
alias Realtime.GenRpc
alias RealtimeWeb.Endpoint
alias Rls.Subscriptions

@impl true
@spec handle_connect(map()) :: {:ok, {pid(), pid()}} | nil
Expand All @@ -30,18 +31,61 @@ defmodule Extensions.PostgresCdcRls do
@impl true
def handle_after_connect({manager_pid, conn}, settings, params_list, tenant) do
with {:ok, subscription_list} <- subscription_list(params_list) do
pool_size = Map.get(settings, "subcriber_pool_size", 4)
publication = settings["publication"]
opts = [conn, publication, subscription_list, manager_pid, self()]
conn_node = node(conn)
create_subscription(conn, tenant, publication, pool_size, subscription_list, manager_pid, self())
end
end

if conn_node !== node() do
GenRpc.call(conn_node, Subscriptions, :create, opts, timeout: 15_000, tenant_id: tenant)
else
apply(Subscriptions, :create, opts)
@database_timeout_reason "Too many database timeouts"

@doc """
Creates the Postgres CDC subscriptions for `tenant`, guarded by the tenant's
subscription-error rate counter.

When `conn` lives on the local node, `Subscriptions.create/5` is called directly.
Otherwise the request is handed to `GenRpc.call/5`, targeting this same function
on the node that owns `conn`.

If the rate counter limit is not exactly `false` (i.e. already triggered), nothing is
attempted and `{:error, "Too many database timeouts"}` is returned right away.
Locally, a `DBConnection.ConnectionError` or an `{:exit, _}` error bumps the counter
and is reported as that same error; on the calling side of a remote request, the
counter is bumped when the remote node reports that error. Any other result is
returned untouched.
"""
def create_subscription(conn, tenant, publication, pool_size, subscription_list, manager_pid, caller)
    when node(conn) == node() do
  counter = rate_counter(tenant, pool_size)

  case counter.limit.triggered do
    false ->
      result = Subscriptions.create(conn, publication, subscription_list, manager_pid, caller)

      case result do
        {:error, %DBConnection.ConnectionError{}} ->
          GenCounter.add(counter.id)
          {:error, @database_timeout_reason}

        {:error, {:exit, _}} ->
          GenCounter.add(counter.id)
          {:error, @database_timeout_reason}

        other ->
          other
      end

    _triggered ->
      {:error, @database_timeout_reason}
  end
end

def create_subscription(conn, tenant, publication, pool_size, subscription_list, manager_pid, caller) do
  counter = rate_counter(tenant, pool_size)

  case counter.limit.triggered do
    false ->
      remote_args = [conn, tenant, publication, pool_size, subscription_list, manager_pid, caller]
      rpc_opts = [timeout: 15_000, tenant_id: tenant]

      # The remote node runs the local clause above; a timeout error it reports
      # is also counted against this node's counter.
      case GenRpc.call(node(conn), __MODULE__, :create_subscription, remote_args, rpc_opts) do
        {:error, @database_timeout_reason} ->
          GenCounter.add(counter.id)
          {:error, @database_timeout_reason}

        other ->
          other
      end

    _triggered ->
      {:error, @database_timeout_reason}
  end
end

# Looks up the subscription-error rate counter for the tenant.
# Crashes (match error) if `Realtime.RateCounter.get/1` does not return `{:ok, _}`.
defp rate_counter(tenant_id, pool_size) do
  {:ok, counter} =
    tenant_id
    |> Realtime.Tenants.subscription_errors_per_second_rate(pool_size)
    |> Realtime.RateCounter.get()

  counter
end

defp subscription_list(params_list) do
Enum.reduce_while(params_list, {:ok, []}, fn params, {:ok, acc} ->
case Subscriptions.parse_subscription_params(params[:params]) do
Expand Down
2 changes: 1 addition & 1 deletion lib/realtime/rate_counter/rate_counter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ defmodule Realtime.RateCounter do
defstruct id: nil, opts: []
end

@idle_shutdown :timer.minutes(15)
@idle_shutdown :timer.minutes(10)
@tick :timer.seconds(1)
@max_bucket_len 60
@cache __MODULE__
Expand Down
5 changes: 3 additions & 2 deletions lib/realtime/syn_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ defmodule Realtime.SynHandler do

@behaviour :syn_event_handler

@postgres_cdc_scope_prefix PostgresCdc.syn_topic_prefix()

@impl true
def on_registry_process_updated(Connect, tenant_id, pid, %{conn: conn}, :normal) when is_pid(conn) do
# Update that a database connection is ready
Expand All @@ -19,7 +21,7 @@ defmodule Realtime.SynHandler do
scope = Atom.to_string(scope)

case scope do
"realtime_postgres_cdc_" <> _ ->
@postgres_cdc_scope_prefix <> _ ->
Endpoint.local_broadcast(PostgresCdc.syn_topic(tenant_id), "ready", meta)

_ ->
Expand All @@ -36,7 +38,6 @@ defmodule Realtime.SynHandler do
We want to log conflict resolutions to know when more than one process on the cluster
was started, and subsequently stopped because :syn handled the conflict.
"""
@postgres_cdc_scope_prefix PostgresCdc.syn_topic_prefix()
@impl true
def on_process_unregistered(scope, name, pid, _meta, reason) do
case Atom.to_string(scope) do
Expand Down
25 changes: 22 additions & 3 deletions lib/realtime/tenants.ex
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ defmodule Realtime.Tenants do
opts = [
max_bucket_len: 30,
limit: [
value: pool_size(tenant),
value: authorization_pool_size(tenant),
measurement: :sum,
log_fn: fn ->
Logger.critical("IncreaseConnectionPool: Too many database timeouts",
Expand All @@ -354,6 +354,25 @@ defmodule Realtime.Tenants do
%RateCounter.Args{id: {:channel, :authorization_errors, external_id}, opts: opts}
end

@doc """
RateCounter arguments for counting subscription errors of a tenant.

The limit value is the given `pool_size`, measured as a `:sum` over a bucket of
at most 30 entries. The limit's `log_fn` logs an error tagged with the tenant id.
"""
@spec subscription_errors_per_second_rate(String.t(), non_neg_integer) :: RateCounter.Args.t()
def subscription_errors_per_second_rate(tenant_id, pool_size) do
  log_limit_reached = fn ->
    Logger.error("IncreaseSubscriptionConnectionPool: Too many database timeouts",
      external_id: tenant_id,
      project: tenant_id
    )
  end

  limit = [value: pool_size, measurement: :sum, log_fn: log_limit_reached]

  %RateCounter.Args{
    id: {:channel, :subscription_errors, tenant_id},
    opts: [max_bucket_len: 30, limit: limit]
  }
end

@connect_errors_per_second_default 10
@doc "RateCounter arguments for counting connect per second."
@spec connect_errors_per_second_rate(Tenant.t() | String.t()) :: RateCounter.Args.t()
Expand All @@ -380,11 +399,11 @@ defmodule Realtime.Tenants do
%RateCounter.Args{id: {:database, :connect, tenant_id}, opts: opts}
end

defp pool_size(%{extensions: [%{settings: settings} | _]}) do
defp authorization_pool_size(%{extensions: [%{settings: settings} | _]}) do
Database.pool_size_by_application_name("realtime_connect", settings)
end

defp pool_size(_), do: 1
defp authorization_pool_size(_), do: 1

@spec get_tenant_limits(Realtime.Api.Tenant.t(), maybe_improper_list) :: list
def get_tenant_limits(%Tenant{} = tenant, keys) when is_list(keys) do
Expand Down
14 changes: 7 additions & 7 deletions lib/realtime_web/controllers/fallback_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,6 @@ defmodule RealtimeWeb.FallbackController do
|> render("error.json", changeset: changeset)
end

def call(conn, {:error, _}) do
conn
|> put_status(:unauthorized)
|> put_view(RealtimeWeb.ErrorView)
|> render("error.json", message: "Unauthorized")
end

def call(conn, {:error, status, message}) when is_atom(status) and is_binary(message) do
log_error("UnprocessableEntity", message)

Expand All @@ -57,6 +50,13 @@ defmodule RealtimeWeb.FallbackController do
|> render("error.json", changeset: changeset)
end

# Catch-all for any two-element `{:error, _}` tuple: sets the :unauthorized status and
# renders the generic "Unauthorized" message through RealtimeWeb.ErrorView.
# NOTE(review): function clauses are tried top to bottom, so this clause shadows any
# `{:error, _}` two-tuple clause placed after it — keep more specific ones above.
def call(conn, {:error, _}) do
conn
|> put_status(:unauthorized)
|> put_view(RealtimeWeb.ErrorView)
|> render("error.json", message: "Unauthorized")
end

def call(conn, %Ecto.Changeset{valid?: false} = changeset) do
log_error(
"UnprocessableEntity",
Expand Down
3 changes: 2 additions & 1 deletion test/integration/rt_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2295,8 +2295,9 @@ defmodule Realtime.Integration.RtChannelTest do
assert_receive %Message{topic: ^topic, event: "phx_close"}, 500
end

start_supervised!({Tracker, check_interval_in_ms: 100})
# wait to trigger tracker
assert_process_down(socket, 5000)
assert_process_down(socket, 1000)
assert [] = Tracker.list_pids()
end

Expand Down
Loading
Loading