Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion test/integration/distributed_realtime_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ defmodule Realtime.Integration.DistributedRealtimeChannelTest do
setup do
tenant = Realtime.Api.get_tenant_by_external_id("dev_tenant")

Realtime.RateCounter.stop(tenant.external_id)
RateCounterHelper.stop(tenant.external_id)

Connect.shutdown(tenant.external_id)
# Sleeping so that syn can forget about this Connect process
Expand Down
16 changes: 8 additions & 8 deletions test/integration/rt_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ defmodule Realtime.Integration.RtChannelTest do
alias Realtime.Api.Tenant
alias Realtime.Database
alias Realtime.Integration.WebsocketClient
alias Realtime.RateCounter
alias Realtime.Tenants
alias Realtime.Tenants.Connect
alias Realtime.Tenants.ReplicationConnection
Expand Down Expand Up @@ -1768,7 +1767,7 @@ defmodule Realtime.Integration.RtChannelTest do
end

test "max_events_per_second limit respected", %{tenant: tenant, serializer: serializer} do
RateCounter.stop(tenant.external_id)
RateCounterHelper.stop(tenant.external_id)

log =
capture_log(fn ->
Expand Down Expand Up @@ -1842,6 +1841,7 @@ defmodule Realtime.Integration.RtChannelTest do

# Wait for RateCounter tick
Process.sleep(1000)

# These ones will be blocked
for _ <- 1..300 do
WebsocketClient.join(socket, realtime_topic, %{config: config})
Expand Down Expand Up @@ -1997,7 +1997,7 @@ defmodule Realtime.Integration.RtChannelTest do
start: {Agent, :start_link, [fn -> %{} end, [name: name]]}
})

RateCounter.stop(tenant.external_id)
RateCounterHelper.stop(tenant.external_id)
on_exit(fn -> :telemetry.detach({__MODULE__, tenant.external_id}) end)
:telemetry.attach_many({__MODULE__, tenant.external_id}, events, &__MODULE__.handle_telemetry/4, name)

Expand All @@ -2018,7 +2018,7 @@ defmodule Realtime.Integration.RtChannelTest do
assert_receive %Message{topic: ^topic, event: "system"}, 5000

# Wait for RateCounter to run
Process.sleep(2000)
RateCounterHelper.tick_tenant_rate_counters!(tenant.external_id)

# Expected billed
# 1 joins due to two sockets
Expand Down Expand Up @@ -2060,7 +2060,7 @@ defmodule Realtime.Integration.RtChannelTest do
end

# Wait for RateCounter to run
Process.sleep(2000)
RateCounterHelper.tick_tenant_rate_counters!(tenant.external_id)

# Expected billed
# 2 joins due to two sockets
Expand Down Expand Up @@ -2112,7 +2112,7 @@ defmodule Realtime.Integration.RtChannelTest do
assert_receive %Message{event: "presence_diff", payload: %{"joins" => _, "leaves" => %{}}, topic: ^topic}

# Wait for RateCounter to run
Process.sleep(2000)
RateCounterHelper.tick_tenant_rate_counters!(tenant.external_id)

# Expected billed
# 2 joins due to two sockets
Expand Down Expand Up @@ -2161,7 +2161,7 @@ defmodule Realtime.Integration.RtChannelTest do
end

# Wait for RateCounter to run
Process.sleep(2000)
RateCounterHelper.tick_tenant_rate_counters!(tenant.external_id)

# Expected billed
# 2 joins due to two sockets
Expand Down Expand Up @@ -2189,7 +2189,7 @@ defmodule Realtime.Integration.RtChannelTest do
assert_receive %Message{topic: ^topic, event: "system"}, 5000

# Wait for RateCounter to run
Process.sleep(2000)
RateCounterHelper.tick_tenant_rate_counters!(tenant.external_id)

# Expected billed
# 1 joins due to one socket
Expand Down
44 changes: 12 additions & 32 deletions test/realtime/extensions/cdc_rls/cdc_rls_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,10 @@ defmodule Realtime.Extensions.CdcRlsTest do
PostgresCdcRls.handle_after_connect({:manager_pid, self()}, postgres_extension, %{}, external_id)
end

Process.sleep(1200)

rate = Realtime.Tenants.subscription_errors_per_second_rate(external_id, 4)

assert {:ok, %RateCounter{id: {:channel, :subscription_errors, ^external_id}, sum: 6, limit: %{triggered: true}}} =
RateCounter.get(rate)
RateCounterHelper.tick!(rate)

# It won't even be called now
reject(&Subscriptions.create/5)
Expand Down Expand Up @@ -238,11 +236,6 @@ defmodule Realtime.Extensions.CdcRlsTest do

assert_receive {:socket_push, :text, data}, 5000

message =
data
|> IO.iodata_to_binary()
|> Jason.decode!()

assert %{
"event" => "postgres_changes",
"payload" => %{
Expand All @@ -259,14 +252,12 @@ defmodule Realtime.Extensions.CdcRlsTest do
},
"ref" => nil,
"topic" => "realtime:test"
} = message

# Wait for RateCounter to update
Process.sleep(2000)
} = Jason.decode!(data)

rate = Realtime.Tenants.db_events_per_second_rate(tenant)

assert {:ok, %RateCounter{id: {:channel, :db_events, "dev_tenant"}, bucket: bucket}} = RateCounter.get(rate)
assert {:ok, %RateCounter{id: {:channel, :db_events, "dev_tenant"}, bucket: bucket}} =
RateCounterHelper.tick!(rate)

assert Enum.sum(bucket) == 1

Expand Down Expand Up @@ -297,15 +288,16 @@ defmodule Realtime.Extensions.CdcRlsTest do
{:ok, _} = PostgresCdcRls.handle_after_connect(response, postgres_extension, pg_change_params, external_id)
assert %Postgrex.Result{rows: [[1]]} = Postgrex.query!(conn, "select count(*) from realtime.subscription", [])

rate = Realtime.Tenants.db_events_per_second_rate(tenant)

log =
capture_log(fn ->
# increment artifically the counter to reach the limit
tenant.external_id
|> Realtime.Tenants.db_events_per_second_key()
|> Realtime.GenCounter.add(100_000_000)

# Wait for RateCounter to update
Process.sleep(1500)
RateCounterHelper.tick!(rate)
end)

assert log =~ "MessagePerSecondRateLimitReached: Too many postgres changes messages per second"
Expand All @@ -315,13 +307,8 @@ defmodule Realtime.Extensions.CdcRlsTest do

refute_receive {:socket_push, :text, _}, 5000

# Wait for RateCounter to update
Process.sleep(2000)

rate = Realtime.Tenants.db_events_per_second_rate(tenant)

assert {:ok, %RateCounter{id: {:channel, :db_events, "dev_tenant"}, bucket: bucket, limit: %{triggered: true}}} =
RateCounter.get(rate)
RateCounterHelper.tick!(rate)

# Nothing has changed
assert Enum.sum(bucket) == 100_000_000
Expand Down Expand Up @@ -371,11 +358,6 @@ defmodule Realtime.Extensions.CdcRlsTest do

assert_receive {:socket_push, :text, data}, 5000

message =
data
|> IO.iodata_to_binary()
|> Jason.decode!()

assert %{
"event" => "postgres_changes",
"payload" => %{
Expand All @@ -392,7 +374,7 @@ defmodule Realtime.Extensions.CdcRlsTest do
},
"ref" => nil,
"topic" => "realtime:test"
} = message
} = Jason.decode!(data)

assert_receive {
:telemetry,
Expand Down Expand Up @@ -423,12 +405,10 @@ defmodule Realtime.Extensions.CdcRlsTest do
PostgresCdcRls.handle_after_connect({pid, pid}, postgres_extension, pg_change_params, external_id)
end

Process.sleep(1200)

rate = Realtime.Tenants.subscription_errors_per_second_rate(external_id, 4)

assert {:ok, %RateCounter{id: {:channel, :subscription_errors, ^external_id}, sum: 6, limit: %{triggered: true}}} =
RateCounter.get(rate)
RateCounterHelper.tick!(rate)

# It won't even be called now
reject(&Realtime.GenRpc.call/5)
Expand Down Expand Up @@ -464,8 +444,8 @@ defmodule Realtime.Extensions.CdcRlsTest do
Enum.each(queries, &Postgrex.query!(db_conn, &1, []))
end)

RateCounter.stop(tenant.external_id)
on_exit(fn -> RateCounter.stop(tenant.external_id) end)
RateCounterHelper.stop(tenant.external_id)
on_exit(fn -> RateCounterHelper.stop(tenant.external_id) end)

on_exit(fn -> :telemetry.detach(__MODULE__) end)

Expand Down
25 changes: 5 additions & 20 deletions test/realtime/extensions/cdc_rls/replication_poller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,6 @@ defmodule Realtime.Extensions.PostgresCdcRls.ReplicationPollerTest do
},
500

# Wait for RateCounter to update
Process.sleep(1100)

rate = Realtime.Tenants.db_events_per_second_rate(tenant)

assert {:ok,
Expand All @@ -84,7 +81,7 @@ defmodule Realtime.Extensions.PostgresCdcRls.ReplicationPollerTest do
measurement: :avg,
triggered: false
}
}} = RateCounter.get(rate)
}} = RateCounterHelper.tick!(rate)

assert sum == 0
end
Expand Down Expand Up @@ -133,11 +130,8 @@ defmodule Realtime.Extensions.PostgresCdcRls.ReplicationPollerTest do
},
500

# Wait for RateCounter to update
Process.sleep(1100)

rate = Realtime.Tenants.db_events_per_second_rate(tenant)
assert {:ok, %RateCounter{sum: sum}} = RateCounter.get(rate)
assert {:ok, %RateCounter{sum: sum}} = RateCounterHelper.tick!(rate)
assert sum == 2
end

Expand Down Expand Up @@ -183,11 +177,8 @@ defmodule Realtime.Extensions.PostgresCdcRls.ReplicationPollerTest do
},
500

# Wait for RateCounter to update
Process.sleep(1100)

rate = Realtime.Tenants.db_events_per_second_rate(tenant)
assert {:ok, %RateCounter{sum: sum}} = RateCounter.get(rate)
assert {:ok, %RateCounter{sum: sum}} = RateCounterHelper.tick!(rate)
assert sum == 2
end

Expand Down Expand Up @@ -236,11 +227,8 @@ defmodule Realtime.Extensions.PostgresCdcRls.ReplicationPollerTest do
},
500

# Wait for RateCounter to update
Process.sleep(1100)

rate = Realtime.Tenants.db_events_per_second_rate(tenant)
assert {:ok, %RateCounter{sum: sum}} = RateCounter.get(rate)
assert {:ok, %RateCounter{sum: sum}} = RateCounterHelper.tick!(rate)
assert sum == 2
end

Expand Down Expand Up @@ -300,11 +288,8 @@ defmodule Realtime.Extensions.PostgresCdcRls.ReplicationPollerTest do
assert {node(), MapSet.new([sub1, sub3])} in node_subs
assert {:"[email protected]", MapSet.new([sub2])} in node_subs

# Wait for RateCounter to update
Process.sleep(1100)

rate = Realtime.Tenants.db_events_per_second_rate(tenant)
assert {:ok, %RateCounter{sum: sum}} = RateCounter.get(rate)
assert {:ok, %RateCounter{sum: sum}} = RateCounterHelper.tick!(rate)
assert sum == 3
end
end
Expand Down
32 changes: 0 additions & 32 deletions test/realtime/rate_counter/rate_counter_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -316,37 +316,5 @@ defmodule Realtime.RateCounterTest do
end
end

describe "stop/1" do
test "stops rate counters for a given entity" do
entity_id = Ecto.UUID.generate()
fake_terms = Enum.map(1..10, fn _ -> {:domain, :"metric_#{random_string()}", Ecto.UUID.generate()} end)
terms = Enum.map(1..10, fn _ -> {:domain, :"metric_#{random_string()}", entity_id} end)

for term <- terms do
args = %Args{id: term}
{:ok, _} = RateCounter.new(args)
assert {:ok, %RateCounter{}} = RateCounter.get(args)
end

for term <- fake_terms do
args = %Args{id: term}
{:ok, _} = RateCounter.new(args)
assert {:ok, %RateCounter{}} = RateCounter.get(args)
end

assert :ok = RateCounter.stop(entity_id)
# Wait for processes to shut down and Registry to update
Process.sleep(100)

for term <- terms do
assert [] = Registry.lookup(Realtime.Registry.Unique, {RateCounter, :rate_counter, term})
end

for term <- fake_terms do
assert [{_pid, _value}] = Registry.lookup(Realtime.Registry.Unique, {RateCounter, :rate_counter, term})
end
end
end

def handle_telemetry(event, measures, metadata, pid: pid), do: send(pid, {event, measures, metadata})
end
17 changes: 10 additions & 7 deletions test/realtime/tenants/authorization_remote_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,9 @@ defmodule Realtime.Tenants.AuthorizationRemoteTest do
Authorization.get_read_authorizations(%Policies{}, pid, context.authorization_context)
end

# Waiting for RateCounter to limit
Process.sleep(1100)
# Force RateCounter to tick
rate_counter = Realtime.Tenants.authorization_errors_per_second_rate(context.tenant)
RateCounterHelper.tick!(rate_counter)

for _ <- 1..10 do
{:error, :increase_connection_pool} =
Expand All @@ -127,8 +128,9 @@ defmodule Realtime.Tenants.AuthorizationRemoteTest do
Authorization.get_write_authorizations(%Policies{}, pid, context.authorization_context)
end

# Waiting for RateCounter to limit
Process.sleep(1100)
# Force RateCounter to tick
rate_counter = Realtime.Tenants.authorization_errors_per_second_rate(context.tenant)
RateCounterHelper.tick!(rate_counter)

for _ <- 1..10 do
{:error, :increase_connection_pool} =
Expand Down Expand Up @@ -184,8 +186,9 @@ defmodule Realtime.Tenants.AuthorizationRemoteTest do
end)

Task.await_many([t1, t2], 20_000)
# Wait for RateCounter to log
Process.sleep(1000)
# Force RateCounter to tick and log error
rate_counter = Realtime.Tenants.authorization_errors_per_second_rate(context.tenant)
RateCounterHelper.tick!(rate_counter)
end)

external_id = context.tenant.external_id
Expand Down Expand Up @@ -241,7 +244,7 @@ defmodule Realtime.Tenants.AuthorizationRemoteTest do
Connect.shutdown("dev_tenant")
# Waiting for :syn to unregister
Process.sleep(100)
Realtime.RateCounter.stop("dev_tenant")
RateCounterHelper.stop("dev_tenant")

{:ok, local_db_conn} = Database.connect(tenant, "realtime_test", :stop)
topic = random_string()
Expand Down
Loading