Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions src/auth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1020,7 +1020,7 @@ where
// again while TTL expiry or a concurrent refetch is changing it.
let fetched_overlay = Arc::clone(cache_entry.startup_overlay.map());
let fetched_overlay_hash = cache_entry.startup_overlay.hash();
let mut pool = create_dynamic_pool(
let (mut pool, init_guard) = create_dynamic_pool(
pool_name,
username,
backend_auth,
Expand Down Expand Up @@ -1049,17 +1049,13 @@ where
} = &err
{
error!("[{username}@{pool_name}] auth_query passthrough: PG rejected operator-supplied startup parameter: {pg_message}");
// Invalidate before dropping the pool. A
// concurrent reconnect that races us between
// these two calls would otherwise read the
// still-cached bad overlay, rebuild the same
// dynamic pool against it, and trigger the
// same rejection. Invalidating first guarantees
// any racing get_or_fetch refetches before
// reconstructing the pool.
// Invalidate the cache so concurrent reconnects
// see the new (or, if user fixes the row, fixed)
// entry instead of the still-cached bad overlay.
// The pool entry itself is removed by
// `init_guard` falling out of scope without a
// `commit`.
cache.invalidate(username);
let identifier = crate::pool::PoolIdentifier::new(pool_name, username);
crate::pool::drop_dynamic_pool(&identifier);
error_response(write, pg_message, sqlstate).await?;
return Err(err);
}
Expand All @@ -1074,6 +1070,10 @@ where
}
};

// First connection established — release the guard so GC
// resumes normal behavior for this pool.
init_guard.commit();

aq_state.stats.auth_success.fetch_add(1, Ordering::Relaxed);
info!("[{username}@{pool_name}] auth_query: authenticated (passthrough mode)");

Expand Down
14 changes: 8 additions & 6 deletions src/pool/dynamic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! and garbage-collected when idle. On RELOAD, dynamic pools are dropped and recreated
//! on the next client connection with fresh settings.

use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;

use log::{debug, info, warn};
Expand Down Expand Up @@ -37,7 +37,7 @@ pub fn create_dynamic_pool(
backend_auth: Option<BackendAuthMethod>,
fetched_overlay: Arc<std::collections::HashMap<String, String>>,
fetched_overlay_hash: u64,
) -> Result<ConnectionPool, Error> {
) -> Result<(ConnectionPool, super::PoolInitGuard), Error> {
// Fast path: pool already exists. The cache-side refetch path
// already drops the live pool when an auth_query refetch changes
// the overlay (see `drop_dynamic_pool_if_overlay_drifted`), but a
Expand Down Expand Up @@ -68,7 +68,7 @@ pub fn create_dynamic_pool(
*ba_lock.write() = new_ba.clone();
}
}
return Ok(existing);
return Ok((existing, super::PoolInitGuard::already_committed()));
}
if super::drop_dynamic_pool(&identifier) {
info!(
Expand Down Expand Up @@ -268,7 +268,7 @@ pub fn create_dynamic_pool(
check_query_cache: Arc::new(CheckQueryCache::new()),
coordinator: get_coordinator(pool_name),
replenish_failures: Arc::new(AtomicU32::new(0)),
created_at: std::time::Instant::now(),
init_complete: Arc::new(AtomicBool::new(false)),
};

// Atomic insert into POOLS
Expand Down Expand Up @@ -300,7 +300,7 @@ pub fn create_dynamic_pool(
*ba_lock.write() = new_ba.read().clone();
}
}
return Ok(existing.clone());
return Ok((existing.clone(), super::PoolInitGuard::already_committed()));
}
info!(
"[{username}@{pool_name}] auth_query: per-user startup_parameters overlay drift on slow-path race — replacing concurrently-built pool"
Expand Down Expand Up @@ -348,7 +348,9 @@ pub fn create_dynamic_pool(
.fetch_add(1, Ordering::Relaxed);
}

Ok(conn_pool)
let guard =
super::PoolInitGuard::for_new_pool(identifier, Arc::clone(&conn_pool.init_complete));
Ok((conn_pool, guard))
}

/// Decide whether `create_dynamic_pool` should replace an existing
Expand Down
154 changes: 137 additions & 17 deletions src/pool/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::time::Duration;

use log::{debug, info};

use super::{PoolIdentifier, AUTH_QUERY_STATE, DYNAMIC_POOLS, POOLS};
use super::{ConnectionPool, PoolIdentifier, AUTH_QUERY_STATE, DYNAMIC_POOLS, POOLS};

/// Spawn a background task that periodically removes idle dynamic pools.
/// Dynamic pools are created by auth_query passthrough mode — one per user.
Expand Down Expand Up @@ -34,22 +34,7 @@ fn gc_idle_dynamic_pools() {
for id in &dynamic_ids {
match pools.get(id) {
Some(pool) if pool.pool_state().size == 0 => {
// Don't GC paused pools — they're under admin control
if pool.database.is_paused() {
debug!("[{id}] GC: paused, skipping");
continue;
}
// Don't GC pools with min_pool_size — retain cycle manages them
if pool.settings.user.min_pool_size.unwrap_or(0) > 0 {
debug!("[{id}] GC: min_pool_size > 0, skipping despite size=0");
continue;
}
// Grace period: allow-mode retry does two sequential TCP connects
// (plain + TLS), each needing 1-2 RTT for handshake. Over WAN
// this totals ~1s. 2s = 2x worst case.
let age = pool.created_at.elapsed();
if age < std::time::Duration::from_secs(2) {
debug!("[{id}] GC: pool age {age:?} < 2s, skipping");
if !should_gc_idle_pool(pool, id) {
continue;
}
debug!("[{id}] GC: 0 connections, marking for removal");
Expand Down Expand Up @@ -102,3 +87,138 @@ fn gc_idle_dynamic_pools() {
.join(", ")
);
}

/// Decide whether an idle dynamic pool (`pool_state().size == 0`) is
/// eligible for removal during this GC sweep. A pool is kept when it is
/// paused (admin control), when it has a `min_pool_size` (retain cycle
/// is responsible), or when its first server connection is still being
/// established (`init_complete == false`). The last case is the race
/// fix for issue #209 — `PoolInitGuard::commit` is what flips the flag
/// to `true` after `get_server_parameters` succeeds, and any guard
/// dropped without `commit` has already removed the pool entry by the
/// time the next sweep observes the map.
fn should_gc_idle_pool(pool: &ConnectionPool, id: &PoolIdentifier) -> bool {
if pool.database.is_paused() {
debug!("[{id}] GC: paused, skipping");
return false;
}
if pool.settings.user.min_pool_size.unwrap_or(0) > 0 {
debug!("[{id}] GC: min_pool_size > 0, skipping despite size=0");
return false;
}
if !pool.init_complete.load(Ordering::Acquire) {
debug!("[{id}] GC: init not complete, skipping");
return false;
}
true
}

#[cfg(test)]
mod tests {
use super::*;
use crate::config::{Address, PoolMode, User};
use crate::pool::{CheckQueryCache, Pool, PoolSettings, ServerParameters, ServerPool};
use dashmap::DashMap;
use std::sync::atomic::{AtomicBool, AtomicU32};

fn pool(init_complete: bool, min_pool_size: u32) -> ConnectionPool {
let server_pool = ServerPool::new(
Address::default(),
User {
min_pool_size: if min_pool_size == 0 {
None
} else {
Some(min_pool_size)
},
..User::default()
},
"test_db",
Arc::new(DashMap::new()),
false,
false,
0,
"test_app".to_string(),
1,
60_000,
60_000,
60_000,
Duration::from_secs(5),
Duration::from_secs(5),
false,
None,
Arc::new(std::collections::BTreeMap::new()),
Arc::new(std::collections::BTreeMap::new()),
);
let database = Pool::builder(server_pool)
.pool_name("test_db".to_string())
.username("test_user".to_string())
.build();
ConnectionPool {
database,
address: Address::default(),
original_server_parameters: Arc::new(tokio::sync::Mutex::new(ServerParameters::new())),
settings: PoolSettings {
pool_mode: PoolMode::Transaction,
user: User {
min_pool_size: if min_pool_size == 0 {
None
} else {
Some(min_pool_size)
},
..User::default()
},
db: "test_db".to_string(),
idle_timeout_ms: 60_000,
life_time_ms: 60_000,
sync_server_parameters: false,
min_guaranteed_pool_size: 0,
},
config_hash: 0,
per_user_startup_overlay_hash: crate::pool::empty_overlay_hash(),
prepared_statement_cache: None,
check_query_cache: Arc::new(CheckQueryCache::new()),
coordinator: None,
replenish_failures: Arc::new(AtomicU32::new(0)),
init_complete: Arc::new(AtomicBool::new(init_complete)),
}
}

#[test]
fn idle_pool_with_completed_init_is_collected() {
// Regular case: pool finished initializing, drained to zero, GC should reap it.
let id = PoolIdentifier::new("test_db", "test_user");
let p = pool(true, 0);
assert!(should_gc_idle_pool(&p, &id));
}

#[test]
fn idle_pool_still_initializing_is_skipped() {
// Issue #209: GC must not reap a pool whose first server connection
// is still being established. Without this check the next login
// observes "No pool configured" and the connection is dropped.
let id = PoolIdentifier::new("test_db", "test_user");
let p = pool(false, 0);
assert!(!should_gc_idle_pool(&p, &id));
}

#[test]
fn pool_with_min_pool_size_is_skipped() {
// The retain cycle keeps `min_pool_size` connections warm; GC must
// never compete with it on the same pool.
let id = PoolIdentifier::new("test_db", "test_user");
let p = pool(true, 5);
assert!(!should_gc_idle_pool(&p, &id));
}

#[test]
fn flipping_init_complete_makes_pool_eligible() {
// Same pool object, different observable behavior before and after
// `PoolInitGuard::commit` runs. Concretizes the contract between the
// guard and the GC sweep.
let id = PoolIdentifier::new("test_db", "test_user");
let p = pool(false, 0);
assert!(!should_gc_idle_pool(&p, &id));
p.init_complete.store(true, Ordering::Release);
assert!(should_gc_idle_pool(&p, &id));
}
}
94 changes: 94 additions & 0 deletions src/pool/init_guard.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
//! RAII handle that closes the GC race on freshly created dynamic pools.
//!
//! A dynamic pool is inserted into `POOLS` before its first server
//! connection exists, so `pool_state().size == 0`. Without this guard
//! the next GC sweep could remove the pool while the caller is still
//! inside `get_server_parameters()` for the very first connection.
//!
//! The guard owns the pool's `init_complete` flag. `commit` flips the
//! flag after the first connection is established and disables the
//! `Drop` cleanup. If the guard is dropped without `commit` — typically
//! because the auth flow failed or panicked between insertion and the
//! first checkout — `Drop` removes the pool entry from `POOLS` so the
//! next login rebuilds the pool from scratch.
//!
//! `PoolInitGuard::already_committed()` is a no-op variant returned from
//! `create_dynamic_pool` when an existing pool is reused. Drop on it is
//! a no-op; `commit` is harmless.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use super::PoolIdentifier;

pub struct PoolInitGuard {
identifier: Option<PoolIdentifier>,
init_complete: Arc<AtomicBool>,
committed: bool,
}

impl PoolInitGuard {
/// Build a guard for a freshly inserted dynamic pool. The caller is
/// responsible for calling `commit` after the first connection has
/// been successfully established. Until then GC will skip the pool
/// because of its `init_complete == false` flag.
pub fn for_new_pool(identifier: PoolIdentifier, init_complete: Arc<AtomicBool>) -> Self {
Self {
identifier: Some(identifier),
init_complete,
committed: false,
}
}

/// Build a no-op guard for the early-return path in
/// `create_dynamic_pool` (existing pool reused). `Drop` does
/// nothing; `commit` is harmless.
pub fn already_committed() -> Self {
Self {
identifier: None,
init_complete: Arc::new(AtomicBool::new(true)),
committed: true,
}
}

/// Mark the pool as fully initialized. After this call the pool is
/// subject to normal GC rules. Consumes the guard so it cannot be
/// committed twice or dropped after commit.
pub fn commit(mut self) {
self.init_complete.store(true, Ordering::Release);
self.committed = true;
}
}

impl Drop for PoolInitGuard {
fn drop(&mut self) {
if self.committed {
return;
}
if let Some(id) = self.identifier.take() {
crate::pool::drop_dynamic_pool(&id);
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn already_committed_leaves_flag_true_and_does_nothing_on_drop() {
let guard = PoolInitGuard::already_committed();
assert!(guard.init_complete.load(Ordering::Acquire));
drop(guard);
}

#[test]
fn commit_flips_flag_and_blocks_drop_cleanup() {
let id = PoolIdentifier::new("db", "user");
let flag = Arc::new(AtomicBool::new(false));
let guard = PoolInitGuard::for_new_pool(id, Arc::clone(&flag));
assert!(!flag.load(Ordering::Acquire));
guard.commit();
assert!(flag.load(Ordering::Acquire));
}
}
Loading
Loading