Skip to content

Commit ef3ae47

Browse files
authored
chore: force RateCounter tick during tests (#1613)
We avoid waiting during tests and we ensure that RateCounter has processed the last gen counter changes
1 parent 30abb49 commit ef3ae47

File tree

12 files changed

+97
-129
lines changed

12 files changed

+97
-129
lines changed

test/integration/distributed_realtime_channel_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ defmodule Realtime.Integration.DistributedRealtimeChannelTest do
1212
setup do
1313
tenant = Realtime.Api.get_tenant_by_external_id("dev_tenant")
1414

15-
Realtime.RateCounter.stop(tenant.external_id)
15+
RateCounterHelper.stop(tenant.external_id)
1616

1717
Connect.shutdown(tenant.external_id)
1818
# Sleeping so that syn can forget about this Connect process

test/integration/rt_channel_test.exs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ defmodule Realtime.Integration.RtChannelTest do
1414
alias Realtime.Api.Tenant
1515
alias Realtime.Database
1616
alias Realtime.Integration.WebsocketClient
17-
alias Realtime.RateCounter
1817
alias Realtime.Tenants
1918
alias Realtime.Tenants.Connect
2019
alias Realtime.Tenants.ReplicationConnection
@@ -1768,7 +1767,7 @@ defmodule Realtime.Integration.RtChannelTest do
17681767
end
17691768

17701769
test "max_events_per_second limit respected", %{tenant: tenant, serializer: serializer} do
1771-
RateCounter.stop(tenant.external_id)
1770+
RateCounterHelper.stop(tenant.external_id)
17721771

17731772
log =
17741773
capture_log(fn ->
@@ -1842,6 +1841,7 @@ defmodule Realtime.Integration.RtChannelTest do
18421841

18431842
# Wait for RateCounter tick
18441843
Process.sleep(1000)
1844+
18451845
# These ones will be blocked
18461846
for _ <- 1..300 do
18471847
WebsocketClient.join(socket, realtime_topic, %{config: config})
@@ -1997,7 +1997,7 @@ defmodule Realtime.Integration.RtChannelTest do
19971997
start: {Agent, :start_link, [fn -> %{} end, [name: name]]}
19981998
})
19991999

2000-
RateCounter.stop(tenant.external_id)
2000+
RateCounterHelper.stop(tenant.external_id)
20012001
on_exit(fn -> :telemetry.detach({__MODULE__, tenant.external_id}) end)
20022002
:telemetry.attach_many({__MODULE__, tenant.external_id}, events, &__MODULE__.handle_telemetry/4, name)
20032003

@@ -2018,7 +2018,7 @@ defmodule Realtime.Integration.RtChannelTest do
20182018
assert_receive %Message{topic: ^topic, event: "system"}, 5000
20192019

20202020
# Wait for RateCounter to run
2021-
Process.sleep(2000)
2021+
RateCounterHelper.tick_tenant_rate_counters!(tenant.external_id)
20222022

20232023
# Expected billed
20242024
# 1 joins due to two sockets
@@ -2060,7 +2060,7 @@ defmodule Realtime.Integration.RtChannelTest do
20602060
end
20612061

20622062
# Wait for RateCounter to run
2063-
Process.sleep(2000)
2063+
RateCounterHelper.tick_tenant_rate_counters!(tenant.external_id)
20642064

20652065
# Expected billed
20662066
# 2 joins due to two sockets
@@ -2112,7 +2112,7 @@ defmodule Realtime.Integration.RtChannelTest do
21122112
assert_receive %Message{event: "presence_diff", payload: %{"joins" => _, "leaves" => %{}}, topic: ^topic}
21132113

21142114
# Wait for RateCounter to run
2115-
Process.sleep(2000)
2115+
RateCounterHelper.tick_tenant_rate_counters!(tenant.external_id)
21162116

21172117
# Expected billed
21182118
# 2 joins due to two sockets
@@ -2161,7 +2161,7 @@ defmodule Realtime.Integration.RtChannelTest do
21612161
end
21622162

21632163
# Wait for RateCounter to run
2164-
Process.sleep(2000)
2164+
RateCounterHelper.tick_tenant_rate_counters!(tenant.external_id)
21652165

21662166
# Expected billed
21672167
# 2 joins due to two sockets
@@ -2189,7 +2189,7 @@ defmodule Realtime.Integration.RtChannelTest do
21892189
assert_receive %Message{topic: ^topic, event: "system"}, 5000
21902190

21912191
# Wait for RateCounter to run
2192-
Process.sleep(2000)
2192+
RateCounterHelper.tick_tenant_rate_counters!(tenant.external_id)
21932193

21942194
# Expected billed
21952195
# 1 joins due to one socket

test/realtime/extensions/cdc_rls/cdc_rls_test.exs

Lines changed: 12 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -147,12 +147,10 @@ defmodule Realtime.Extensions.CdcRlsTest do
147147
PostgresCdcRls.handle_after_connect({:manager_pid, self()}, postgres_extension, %{}, external_id)
148148
end
149149

150-
Process.sleep(1200)
151-
152150
rate = Realtime.Tenants.subscription_errors_per_second_rate(external_id, 4)
153151

154152
assert {:ok, %RateCounter{id: {:channel, :subscription_errors, ^external_id}, sum: 6, limit: %{triggered: true}}} =
155-
RateCounter.get(rate)
153+
RateCounterHelper.tick!(rate)
156154

157155
# It won't even be called now
158156
reject(&Subscriptions.create/5)
@@ -238,11 +236,6 @@ defmodule Realtime.Extensions.CdcRlsTest do
238236

239237
assert_receive {:socket_push, :text, data}, 5000
240238

241-
message =
242-
data
243-
|> IO.iodata_to_binary()
244-
|> Jason.decode!()
245-
246239
assert %{
247240
"event" => "postgres_changes",
248241
"payload" => %{
@@ -259,14 +252,12 @@ defmodule Realtime.Extensions.CdcRlsTest do
259252
},
260253
"ref" => nil,
261254
"topic" => "realtime:test"
262-
} = message
263-
264-
# Wait for RateCounter to update
265-
Process.sleep(2000)
255+
} = Jason.decode!(data)
266256

267257
rate = Realtime.Tenants.db_events_per_second_rate(tenant)
268258

269-
assert {:ok, %RateCounter{id: {:channel, :db_events, "dev_tenant"}, bucket: bucket}} = RateCounter.get(rate)
259+
assert {:ok, %RateCounter{id: {:channel, :db_events, "dev_tenant"}, bucket: bucket}} =
260+
RateCounterHelper.tick!(rate)
270261

271262
assert Enum.sum(bucket) == 1
272263

@@ -297,15 +288,16 @@ defmodule Realtime.Extensions.CdcRlsTest do
297288
{:ok, _} = PostgresCdcRls.handle_after_connect(response, postgres_extension, pg_change_params, external_id)
298289
assert %Postgrex.Result{rows: [[1]]} = Postgrex.query!(conn, "select count(*) from realtime.subscription", [])
299290

291+
rate = Realtime.Tenants.db_events_per_second_rate(tenant)
292+
300293
log =
301294
capture_log(fn ->
302295
# increment artifically the counter to reach the limit
303296
tenant.external_id
304297
|> Realtime.Tenants.db_events_per_second_key()
305298
|> Realtime.GenCounter.add(100_000_000)
306299

307-
# Wait for RateCounter to update
308-
Process.sleep(1500)
300+
RateCounterHelper.tick!(rate)
309301
end)
310302

311303
assert log =~ "MessagePerSecondRateLimitReached: Too many postgres changes messages per second"
@@ -315,13 +307,8 @@ defmodule Realtime.Extensions.CdcRlsTest do
315307

316308
refute_receive {:socket_push, :text, _}, 5000
317309

318-
# Wait for RateCounter to update
319-
Process.sleep(2000)
320-
321-
rate = Realtime.Tenants.db_events_per_second_rate(tenant)
322-
323310
assert {:ok, %RateCounter{id: {:channel, :db_events, "dev_tenant"}, bucket: bucket, limit: %{triggered: true}}} =
324-
RateCounter.get(rate)
311+
RateCounterHelper.tick!(rate)
325312

326313
# Nothing has changed
327314
assert Enum.sum(bucket) == 100_000_000
@@ -371,11 +358,6 @@ defmodule Realtime.Extensions.CdcRlsTest do
371358

372359
assert_receive {:socket_push, :text, data}, 5000
373360

374-
message =
375-
data
376-
|> IO.iodata_to_binary()
377-
|> Jason.decode!()
378-
379361
assert %{
380362
"event" => "postgres_changes",
381363
"payload" => %{
@@ -392,7 +374,7 @@ defmodule Realtime.Extensions.CdcRlsTest do
392374
},
393375
"ref" => nil,
394376
"topic" => "realtime:test"
395-
} = message
377+
} = Jason.decode!(data)
396378

397379
assert_receive {
398380
:telemetry,
@@ -423,12 +405,10 @@ defmodule Realtime.Extensions.CdcRlsTest do
423405
PostgresCdcRls.handle_after_connect({pid, pid}, postgres_extension, pg_change_params, external_id)
424406
end
425407

426-
Process.sleep(1200)
427-
428408
rate = Realtime.Tenants.subscription_errors_per_second_rate(external_id, 4)
429409

430410
assert {:ok, %RateCounter{id: {:channel, :subscription_errors, ^external_id}, sum: 6, limit: %{triggered: true}}} =
431-
RateCounter.get(rate)
411+
RateCounterHelper.tick!(rate)
432412

433413
# It won't even be called now
434414
reject(&Realtime.GenRpc.call/5)
@@ -464,8 +444,8 @@ defmodule Realtime.Extensions.CdcRlsTest do
464444
Enum.each(queries, &Postgrex.query!(db_conn, &1, []))
465445
end)
466446

467-
RateCounter.stop(tenant.external_id)
468-
on_exit(fn -> RateCounter.stop(tenant.external_id) end)
447+
RateCounterHelper.stop(tenant.external_id)
448+
on_exit(fn -> RateCounterHelper.stop(tenant.external_id) end)
469449

470450
on_exit(fn -> :telemetry.detach(__MODULE__) end)
471451

test/realtime/extensions/cdc_rls/replication_poller_test.exs

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,6 @@ defmodule Realtime.Extensions.PostgresCdcRls.ReplicationPollerTest do
7171
},
7272
500
7373

74-
# Wait for RateCounter to update
75-
Process.sleep(1100)
76-
7774
rate = Realtime.Tenants.db_events_per_second_rate(tenant)
7875

7976
assert {:ok,
@@ -84,7 +81,7 @@ defmodule Realtime.Extensions.PostgresCdcRls.ReplicationPollerTest do
8481
measurement: :avg,
8582
triggered: false
8683
}
87-
}} = RateCounter.get(rate)
84+
}} = RateCounterHelper.tick!(rate)
8885

8986
assert sum == 0
9087
end
@@ -133,11 +130,8 @@ defmodule Realtime.Extensions.PostgresCdcRls.ReplicationPollerTest do
133130
},
134131
500
135132

136-
# Wait for RateCounter to update
137-
Process.sleep(1100)
138-
139133
rate = Realtime.Tenants.db_events_per_second_rate(tenant)
140-
assert {:ok, %RateCounter{sum: sum}} = RateCounter.get(rate)
134+
assert {:ok, %RateCounter{sum: sum}} = RateCounterHelper.tick!(rate)
141135
assert sum == 2
142136
end
143137

@@ -183,11 +177,8 @@ defmodule Realtime.Extensions.PostgresCdcRls.ReplicationPollerTest do
183177
},
184178
500
185179

186-
# Wait for RateCounter to update
187-
Process.sleep(1100)
188-
189180
rate = Realtime.Tenants.db_events_per_second_rate(tenant)
190-
assert {:ok, %RateCounter{sum: sum}} = RateCounter.get(rate)
181+
assert {:ok, %RateCounter{sum: sum}} = RateCounterHelper.tick!(rate)
191182
assert sum == 2
192183
end
193184

@@ -236,11 +227,8 @@ defmodule Realtime.Extensions.PostgresCdcRls.ReplicationPollerTest do
236227
},
237228
500
238229

239-
# Wait for RateCounter to update
240-
Process.sleep(1100)
241-
242230
rate = Realtime.Tenants.db_events_per_second_rate(tenant)
243-
assert {:ok, %RateCounter{sum: sum}} = RateCounter.get(rate)
231+
assert {:ok, %RateCounter{sum: sum}} = RateCounterHelper.tick!(rate)
244232
assert sum == 2
245233
end
246234

@@ -300,11 +288,8 @@ defmodule Realtime.Extensions.PostgresCdcRls.ReplicationPollerTest do
300288
assert {node(), MapSet.new([sub1, sub3])} in node_subs
301289
assert {:"[email protected]", MapSet.new([sub2])} in node_subs
302290

303-
# Wait for RateCounter to update
304-
Process.sleep(1100)
305-
306291
rate = Realtime.Tenants.db_events_per_second_rate(tenant)
307-
assert {:ok, %RateCounter{sum: sum}} = RateCounter.get(rate)
292+
assert {:ok, %RateCounter{sum: sum}} = RateCounterHelper.tick!(rate)
308293
assert sum == 3
309294
end
310295
end

test/realtime/rate_counter/rate_counter_test.exs

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -316,37 +316,5 @@ defmodule Realtime.RateCounterTest do
316316
end
317317
end
318318

319-
describe "stop/1" do
320-
test "stops rate counters for a given entity" do
321-
entity_id = Ecto.UUID.generate()
322-
fake_terms = Enum.map(1..10, fn _ -> {:domain, :"metric_#{random_string()}", Ecto.UUID.generate()} end)
323-
terms = Enum.map(1..10, fn _ -> {:domain, :"metric_#{random_string()}", entity_id} end)
324-
325-
for term <- terms do
326-
args = %Args{id: term}
327-
{:ok, _} = RateCounter.new(args)
328-
assert {:ok, %RateCounter{}} = RateCounter.get(args)
329-
end
330-
331-
for term <- fake_terms do
332-
args = %Args{id: term}
333-
{:ok, _} = RateCounter.new(args)
334-
assert {:ok, %RateCounter{}} = RateCounter.get(args)
335-
end
336-
337-
assert :ok = RateCounter.stop(entity_id)
338-
# Wait for processes to shut down and Registry to update
339-
Process.sleep(100)
340-
341-
for term <- terms do
342-
assert [] = Registry.lookup(Realtime.Registry.Unique, {RateCounter, :rate_counter, term})
343-
end
344-
345-
for term <- fake_terms do
346-
assert [{_pid, _value}] = Registry.lookup(Realtime.Registry.Unique, {RateCounter, :rate_counter, term})
347-
end
348-
end
349-
end
350-
351319
def handle_telemetry(event, measures, metadata, pid: pid), do: send(pid, {event, measures, metadata})
352320
end

test/realtime/tenants/authorization_remote_test.exs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,9 @@ defmodule Realtime.Tenants.AuthorizationRemoteTest do
100100
Authorization.get_read_authorizations(%Policies{}, pid, context.authorization_context)
101101
end
102102

103-
# Waiting for RateCounter to limit
104-
Process.sleep(1100)
103+
# Force RateCounter to tick
104+
rate_counter = Realtime.Tenants.authorization_errors_per_second_rate(context.tenant)
105+
RateCounterHelper.tick!(rate_counter)
105106

106107
for _ <- 1..10 do
107108
{:error, :increase_connection_pool} =
@@ -127,8 +128,9 @@ defmodule Realtime.Tenants.AuthorizationRemoteTest do
127128
Authorization.get_write_authorizations(%Policies{}, pid, context.authorization_context)
128129
end
129130

130-
# Waiting for RateCounter to limit
131-
Process.sleep(1100)
131+
# Force RateCounter to tick
132+
rate_counter = Realtime.Tenants.authorization_errors_per_second_rate(context.tenant)
133+
RateCounterHelper.tick!(rate_counter)
132134

133135
for _ <- 1..10 do
134136
{:error, :increase_connection_pool} =
@@ -184,8 +186,9 @@ defmodule Realtime.Tenants.AuthorizationRemoteTest do
184186
end)
185187

186188
Task.await_many([t1, t2], 20_000)
187-
# Wait for RateCounter to log
188-
Process.sleep(1000)
189+
# Force RateCounter to tick and log error
190+
rate_counter = Realtime.Tenants.authorization_errors_per_second_rate(context.tenant)
191+
RateCounterHelper.tick!(rate_counter)
189192
end)
190193

191194
external_id = context.tenant.external_id
@@ -241,7 +244,7 @@ defmodule Realtime.Tenants.AuthorizationRemoteTest do
241244
Connect.shutdown("dev_tenant")
242245
# Waiting for :syn to unregister
243246
Process.sleep(100)
244-
Realtime.RateCounter.stop("dev_tenant")
247+
RateCounterHelper.stop("dev_tenant")
245248

246249
{:ok, local_db_conn} = Database.connect(tenant, "realtime_test", :stop)
247250
topic = random_string()

0 commit comments

Comments
 (0)