Skip to content

Commit af60cc3

Browse files
committed
feat: add in use verification on handle ping
1 parent 65d448b commit af60cc3

File tree

2 files changed

+106
-14
lines changed

2 files changed

+106
-14
lines changed

lib/finch/http1/pool.ex

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -176,33 +176,33 @@ defmodule Finch.HTTP1.Pool do
176176
# to determine the correct pool module to use to make the request
177177
{:ok, _} = Registry.register(registry, shp, __MODULE__)
178178

179-
{:ok, {registry, shp, pool_idx, metric_ref, opts, System.monotonic_time(:millisecond)}}
179+
{:ok, {registry, shp, pool_idx, metric_ref, opts, init_activity_info()}}
180180
end
181181

182182
@impl NimblePool
183183
def init_worker(
184-
{_name, {scheme, host, port}, _pool_idx, _metric_ref, opts, _last_checkout_at} =
184+
{_name, {scheme, host, port}, _pool_idx, _metric_ref, opts, _actt_info} =
185185
pool_state
186186
) do
187187
{:ok, Conn.new(scheme, host, port, opts, self()), pool_state}
188188
end
189189

190190
@impl NimblePool
191191
def handle_checkout(:checkout, _, %{mint: nil} = conn, pool_state) do
192-
{_name, _shp, _pool_idx, metric_ref, _opts, _last_checkout_ts} = pool_state
192+
{_name, _shp, _pool_idx, metric_ref, _opts, _actt_info} = pool_state
193193
idle_time = System.monotonic_time() - conn.last_checkin
194194
PoolMetrics.maybe_add(metric_ref, in_use_connections: 1)
195195
{:ok, {:fresh, conn, idle_time}, conn, pool_state}
196196
end
197197

198198
def handle_checkout(:checkout, _from, conn, pool_state) do
199199
idle_time = System.monotonic_time() - conn.last_checkin
200-
{_name, {scheme, host, port}, _pool_idx, metric_ref, _opts, _last_checkout_ts} = pool_state
200+
{_name, {scheme, host, port}, _pool_idx, metric_ref, _opts, _actt_info} = pool_state
201201

202202
with true <- Conn.reusable?(conn, idle_time),
203203
{:ok, conn} <- Conn.set_mode(conn, :passive) do
204204
PoolMetrics.maybe_add(metric_ref, in_use_connections: 1)
205-
{:ok, {:reuse, conn, idle_time}, conn, update_last_checkout_ts(pool_state)}
205+
{:ok, {:reuse, conn, idle_time}, conn, update_activity_info(:checkout, pool_state)}
206206
else
207207
false ->
208208
meta = %{
@@ -225,15 +225,19 @@ defmodule Finch.HTTP1.Pool do
225225

226226
@impl NimblePool
227227
def handle_checkin(checkin, _from, _old_conn, pool_state) do
228-
{_name, _shp, _pool_idx, metric_ref, _opts, _last_checkout_ts} = pool_state
228+
{_name, _shp, _pool_idx, metric_ref, _opts, _actt_info} = pool_state
229229
PoolMetrics.maybe_add(metric_ref, in_use_connections: -1)
230230

231231
with {:ok, conn} <- checkin,
232232
{:ok, conn} <- Conn.set_mode(conn, :active) do
233-
{:ok, %{conn | last_checkin: System.monotonic_time()}, pool_state}
233+
{
234+
:ok,
235+
%{conn | last_checkin: System.monotonic_time()},
236+
update_activity_info(:checkin, pool_state)
237+
}
234238
else
235239
_ ->
236-
{:remove, :closed, pool_state}
240+
{:remove, :closed, update_activity_info(:checkin, pool_state)}
237241
end
238242
end
239243

@@ -253,14 +257,24 @@ defmodule Finch.HTTP1.Pool do
253257

254258
@impl NimblePool
255259
def handle_ping(conn, pool_state) do
256-
{_name, {scheme, host, port}, _pool_idx, _metric_ref, opts, last_checkout_ts} = pool_state
260+
{_name, {scheme, host, port}, _pool_idx, _metric_ref, opts, activity_info} = pool_state
257261

258262
max_idle_time = Map.get(opts, :pool_max_idle_time, :infinity)
259263
now = System.monotonic_time(:millisecond)
260-
diff_from_last_checkout = now - last_checkout_ts
264+
diff_from_last_checkout = now - activity_info.last_checkout_ts
265+
266+
is_idle? = diff_from_last_checkout > max_idle_time
267+
max_idle_time_configured? = is_number(max_idle_time)
268+
any_connection_in_use? = activity_info.in_use_count > 0
261269

262270
cond do
263-
is_number(max_idle_time) and diff_from_last_checkout > max_idle_time ->
271+
not max_idle_time_configured? ->
272+
{:ok, conn}
273+
274+
any_connection_in_use? ->
275+
{:ok, conn}
276+
277+
is_idle? ->
264278
meta = %{
265279
scheme: scheme,
266280
host: host,
@@ -285,7 +299,7 @@ defmodule Finch.HTTP1.Pool do
285299

286300
@impl NimblePool
287301
def handle_cancelled(:checked_out, pool_state) do
288-
{_name, _shp, _pool_idx, metric_ref, _opts, _last_checkout_ts} = pool_state
302+
{_name, _shp, _pool_idx, metric_ref, _opts, _actt_info} = pool_state
289303
PoolMetrics.maybe_add(metric_ref, in_use_connections: -1)
290304
:ok
291305
end
@@ -315,6 +329,22 @@ defmodule Finch.HTTP1.Pool do
315329
defp pool_idle_timeout(:infinity), do: nil
316330
defp pool_idle_timeout(pool_max_idle_time), do: pool_max_idle_time
317331

318-
defp update_last_checkout_ts(pool_state),
319-
do: put_elem(pool_state, 5, System.monotonic_time(:millisecond))
332+
defp init_activity_info(),
333+
do: %{in_use_count: 0, last_checkout_ts: System.monotonic_time(:millisecond)}
334+
335+
defp update_activity_info(:checkout, pool_state) do
336+
info = %{in_use_count: count} = elem(pool_state, 5)
337+
338+
put_elem(pool_state, 5, %{
339+
info
340+
| in_use_count: count + 1,
341+
last_checkout_ts: System.monotonic_time(:millisecond)
342+
})
343+
end
344+
345+
defp update_activity_info(:checkin, pool_state) do
346+
info = %{in_use_count: count} = elem(pool_state, 5)
347+
348+
put_elem(pool_state, 5, %{info | in_use_count: max(count - 1, 0)})
349+
end
320350
end

test/finch/http1/pool_test.exs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,68 @@ defmodule Finch.HTTP1.PoolTest do
125125
assert [] = DynamicSupervisor.which_children(:"#{finch_name}.PoolSupervisor")
126126
end
127127

128+
# @tag capture_log: true
129+
test "should not terminate if a connection is checked out", %{
130+
bypass: bypass,
131+
finch_name: finch_name
132+
} do
133+
parent = self()
134+
135+
start_supervised!(
136+
{Finch,
137+
name: finch_name,
138+
pools: %{
139+
default: [count: 1, size: 2, pool_max_idle_time: 100]
140+
}}
141+
)
142+
143+
Bypass.expect(bypass, fn conn ->
144+
{"delay", str_delay} =
145+
Enum.find(conn.req_headers, fn h -> match?({"delay", _}, h) end)
146+
147+
Process.sleep(String.to_integer(str_delay))
148+
Plug.Conn.send_resp(conn, 200, "OK")
149+
end)
150+
151+
delay_exec = fn ref, delay ->
152+
send(parent, {ref, :start})
153+
154+
resp =
155+
Finch.build(:get, endpoint(bypass), [{"delay", "#{delay}"}])
156+
|> Finch.request(finch_name)
157+
158+
send(parent, {ref, :done})
159+
160+
resp
161+
end
162+
163+
ref1 = make_ref()
164+
ref2 = make_ref()
165+
166+
Task.async(fn -> delay_exec.(ref1, 10) end)
167+
Task.async(fn -> delay_exec.(ref2, 10) end)
168+
169+
assert_receive {^ref1, :done}
170+
assert_receive {^ref2, :done}
171+
172+
assert [{pool, _pool_mod}] = Registry.lookup(finch_name, shp(bypass))
173+
174+
Process.monitor(pool)
175+
176+
ref2 = make_ref()
177+
Task.async(fn -> delay_exec.(ref2, 1000) end)
178+
179+
assert_receive {^ref2, :start}
180+
181+
refute_receive {:DOWN, _, :process, ^pool, {:shutdown, :idle_timeout}}, 1000
182+
183+
assert_receive {^ref2, :done}
184+
185+
assert_receive {:DOWN, _, :process, ^pool, {:shutdown, :idle_timeout}}, 200
186+
187+
assert [] = DynamicSupervisor.which_children(:"#{finch_name}.PoolSupervisor")
188+
end
189+
128190
describe "async_request" do
129191
@describetag bypass: false
130192

0 commit comments

Comments
 (0)