From da80095bc4e6777991f5e20328008e91f1c03573 Mon Sep 17 00:00:00 2001
From: Serhii Tatarintsev
Date: Mon, 7 Oct 2024 13:07:54 +0200
Subject: [PATCH 1/2] fix: Ensure connection pool metrics stay consistent

This started as a bugfix for several Prisma issues about incorrect
metrics:

https://github.com/prisma/prisma/issues/25177
https://github.com/prisma/prisma/issues/23525

A couple more problems were discovered during testing but were not
reported in the issues.

There were several causes:

1. The following pattern appears quite a lot in the mobc code:

```rust
gauge!("something").increment(1.0);
do_a_thing_that_could_fail()?;
gauge!("something").decrement(1.0);
```

So if `do_a_thing_that_could_fail` actually fails, the gauge is
incremented but never decremented.

2. A couple of metrics relied on `Conn::close` being called manually,
which did not always happen.

To prevent both of these problems, I rewrote the internals of the
library to rely on RAII rather than manual counter and resource
management. The `Conn` struct is now split in two:

- `ActiveConn` represents a currently checked-out connection that is
  actively used by the client. It holds the semaphore permit and can be
  converted into `IdleConn`; doing so releases the permit.
- `IdleConn` represents an idle connection, currently checked into the
  pool. It can be converted back into `ActiveConn` by providing a valid
  permit.

`ConnState` represents the shared state of a connection that is
retained across activity states.

Both `IdleConn` and `ActiveConn` manage their corresponding gauges:
they increment them on creation and decrement them on drop. `ConnState`
manages the `OPEN_CONNECTIONS` gauge and the `OPENED_TOTAL` and
`CLOSED_TOTAL` counters in the same way.

Because the metrics are incremented and decremented automatically on
state conversions, this system keeps them consistent, and we can always
be sure that:

- A connection is always either idle or active; there is no in-between
  state.
- The idle and active connection gauges always add up to the open
  connections gauge.
- The total connections opened counter minus the total connections
  closed counter always equals the number of currently open
  connections.

Since resources are now managed by `Drop::drop` implementations, the
manual `close` method is no longer needed. This simplifies the code in
quite a few places and makes it safer against future changes.
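To illustrate the pattern the rewrite relies on, here is a minimal,
self-contained sketch of such a guard (hypothetical names; the real
`GaugeGuard` added by this patch lives in `src/metrics_utils.rs` in the
diff below):

```rust
use metrics::gauge;

// Minimal sketch of the RAII guard idea. `checked_out_work` and
// `do_a_thing_that_could_fail` are illustrative names, not part of mobc.
struct GaugeGuard {
    key: &'static str,
}

impl GaugeGuard {
    fn increment(key: &'static str) -> Self {
        gauge!(key).increment(1.0);
        Self { key }
    }
}

impl Drop for GaugeGuard {
    fn drop(&mut self) {
        // Runs on every exit path (normal return, `?`, or panic unwind),
        // so the gauge can never be left permanently incremented.
        gauge!(self.key).decrement(1.0);
    }
}

fn do_a_thing_that_could_fail() -> Result<(), ()> {
    Err(())
}

fn checked_out_work() -> Result<(), ()> {
    let _guard = GaugeGuard::increment("something");
    do_a_thing_that_could_fail()?; // early return still decrements the gauge
    Ok(())
}
```

Because the decrement lives in `Drop::drop`, it runs on the early
return from `?` as well, which is exactly what the manual
increment/decrement pairs could not guarantee.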
---
 Cargo.toml           |   2 +-
 src/conn.rs          | 183 +++++++++++++++++++++++++++++++++
 src/lib.rs           | 238 ++++++++++++++-----------------------
 src/metrics_utils.rs |  45 +++++++-
 tests/mobc.rs        |   8 --
 5 files changed, 303 insertions(+), 173 deletions(-)
 create mode 100644 src/conn.rs

diff --git a/Cargo.toml b/Cargo.toml
index 8f56fe4..70587d9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,7 +27,7 @@ async-trait = "0.1"
 futures-timer = "3.0.2"
 log = "0.4"
 thiserror = "1.0"
-metrics = "0.22.1"
+metrics = "0.23.0"
 tracing = { version = "0.1", features = ["attributes"] }
 tracing-subscriber = "0.3.11"
diff --git a/src/conn.rs b/src/conn.rs
new file mode 100644
index 0000000..457fba4
--- /dev/null
+++ b/src/conn.rs
@@ -0,0 +1,183 @@
+use std::{
+    marker::PhantomData,
+    sync::{
+        atomic::{AtomicU64, Ordering},
+        Arc,
+    },
+    time::{Duration, Instant},
+};
+
+use metrics::counter;
+use tokio::sync::OwnedSemaphorePermit;
+
+use crate::metrics_utils::{
+    GaugeGuard, ACTIVE_CONNECTIONS, CLOSED_TOTAL, IDLE_CONNECTIONS, OPENED_TOTAL, OPEN_CONNECTIONS,
+};
+
+pub(crate) struct ActiveConn<C> {
+    inner: C,
+    state: ConnState,
+    _permit: OwnedSemaphorePermit,
+    _active_connections_gauge: GaugeGuard,
+}
+
+impl<C> ActiveConn<C> {
+    pub(crate) fn new(inner: C, permit: OwnedSemaphorePermit, state: ConnState) -> ActiveConn<C> {
+        Self {
+            inner,
+            state,
+            _permit: permit,
+            _active_connections_gauge: GaugeGuard::increment(ACTIVE_CONNECTIONS),
+        }
+    }
+
+    pub(crate) fn into_idle(self) -> IdleConn<C> {
+        IdleConn {
+            inner: self.inner,
+            state: self.state,
+            _idle_connections_gauge: GaugeGuard::increment(IDLE_CONNECTIONS),
+        }
+    }
+
+    pub(crate) fn is_brand_new(&self) -> bool {
+        self.state.brand_new
+    }
+
+    pub(crate) fn set_brand_new(&mut self, brand_new: bool) {
+        self.state.brand_new = brand_new
+    }
+
+    pub(crate) fn into_raw(self) -> C {
+        self.inner
+    }
+
+    pub(crate) fn as_raw_ref(&self) -> &C {
+        &self.inner
+    }
+
+    pub(crate) fn as_raw_mut(&mut self) -> &mut C {
+        &mut self.inner
+    }
+}
+
+pub(crate) struct IdleConn<C> {
+    inner: C,
+    state: ConnState,
+    _idle_connections_gauge: GaugeGuard,
+}
+
+impl<C> IdleConn<C> {
+    pub(crate) fn is_brand_new(&self) -> bool {
+        self.state.brand_new
+    }
+
+    pub(crate) fn into_active(self, permit: OwnedSemaphorePermit) -> ActiveConn<C> {
+        ActiveConn::new(self.inner, permit, self.state)
+    }
+
+    pub(crate) fn created_at(&self) -> Instant {
+        self.state.created_at
+    }
+
+    pub(crate) fn expired(&self, timeout: Option<Duration>) -> bool {
+        timeout
+            .and_then(|check_interval| {
+                Instant::now()
+                    .checked_duration_since(self.state.created_at)
+                    .map(|dur_since| dur_since >= check_interval)
+            })
+            .unwrap_or(false)
+    }
+
+    pub(crate) fn idle_expired(&self, timeout: Option<Duration>) -> bool {
+        timeout
+            .and_then(|check_interval| {
+                Instant::now()
+                    .checked_duration_since(self.state.last_used_at)
+                    .map(|dur_since| dur_since >= check_interval)
+            })
+            .unwrap_or(false)
+    }
+
+    pub(crate) fn needs_health_check(&self, timeout: Option<Duration>) -> bool {
+        timeout
+            .and_then(|check_interval| {
+                Instant::now()
+                    .checked_duration_since(self.state.last_checked_at)
+                    .map(|dur_since| dur_since >= check_interval)
+            })
+            .unwrap_or(true)
+    }
+
+    pub(crate) fn mark_checked(&mut self) {
+        self.state.last_checked_at = Instant::now()
+    }
+
+    pub(crate) fn split_raw(self) -> (C, ConnSplit<C>) {
+        (
+            self.inner,
+            ConnSplit::new(self.state, self._idle_connections_gauge),
+        )
+    }
+}
+
+pub(crate) struct ConnState {
+    pub(crate) created_at: Instant,
+    pub(crate) last_used_at: Instant,
+    pub(crate) last_checked_at: Instant,
+    pub(crate) brand_new: bool,
+    total_connections_open: Arc<AtomicU64>,
+    total_connections_closed: Arc<AtomicU64>,
+    _open_connections_gauge: GaugeGuard,
+}
+
+impl ConnState {
+    pub(crate) fn new(
+        total_connections_open: Arc<AtomicU64>,
+        total_connections_closed: Arc<AtomicU64>,
+    ) -> Self {
+        counter!(OPENED_TOTAL).increment(1);
+        Self {
+            created_at: Instant::now(),
+            last_used_at: Instant::now(),
+            last_checked_at: Instant::now(),
+            brand_new: true,
+            total_connections_open,
+            total_connections_closed,
+            _open_connections_gauge: GaugeGuard::increment(OPEN_CONNECTIONS),
+        }
+    }
+}
+
+impl Drop for ConnState {
+    fn drop(&mut self) {
+        self.total_connections_open.fetch_sub(1, Ordering::Relaxed);
+        self.total_connections_closed
+            .fetch_add(1, Ordering::Relaxed);
+        counter!(CLOSED_TOTAL).increment(1);
+    }
+}
+
+pub(crate) struct ConnSplit<C> {
+    state: ConnState,
+    gauge: GaugeGuard,
+    _phantom: PhantomData<C>,
+}
+
+impl<C> ConnSplit<C> {
+    fn new(state: ConnState, gauge: GaugeGuard) -> Self {
+        Self {
+            state,
+            gauge,
+            _phantom: PhantomData,
+        }
+    }
+
+    pub(crate) fn restore(self, raw: C) -> IdleConn<C> {
+        IdleConn {
+            inner: raw,
+            state: self.state,
+            _idle_connections_gauge: self.gauge,
+        }
+    }
+}
diff --git a/src/lib.rs b/src/lib.rs
index c22128e..d185434 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -81,6 +81,7 @@
 #![recursion_limit = "256"]
 
 mod config;
+mod conn;
 mod error;
 mod metrics_utils;
 #[cfg(feature = "unstable")]
@@ -94,13 +95,15 @@ pub use error::Error;
 pub use async_trait::async_trait;
 pub use config::Builder;
 use config::{Config, InternalConfig, ShareConfig};
+use conn::{ActiveConn, ConnState, IdleConn};
 use futures_channel::mpsc::{self, Receiver, Sender};
 use futures_util::lock::{Mutex, MutexGuard};
 use futures_util::select;
 use futures_util::FutureExt;
 use futures_util::SinkExt;
 use futures_util::StreamExt;
-use metrics::{counter, gauge, histogram};
+use metrics::gauge;
+use metrics_utils::DurationHistogramGuard;
 pub use spawn::spawn;
 use std::fmt;
 use std::future::Future;
@@ -112,11 +115,9 @@ use std::sync::{
 use std::time::{Duration, Instant};
 #[doc(hidden)]
 pub use time::{delay_for, interval};
-use tokio::sync::Semaphore;
+use tokio::sync::{OwnedSemaphorePermit, Semaphore};
 
-use metrics_utils::{ACTIVE_CONNECTIONS, WAIT_COUNT, WAIT_DURATION};
-
-use crate::metrics_utils::{CLOSED_TOTAL, IDLE_CONNECTIONS, OPENED_TOTAL, OPEN_CONNECTIONS};
+use crate::metrics_utils::{GaugeGuard, IDLE_CONNECTIONS, WAIT_COUNT, WAIT_DURATION};
 
 const CONNECTION_REQUEST_QUEUE_SIZE: usize = 10000;
 
@@ -156,75 +157,26 @@ pub trait Manager: Send + Sync + 'static {
 struct SharedPool<M: Manager> {
     config: ShareConfig,
     manager: M,
-    internals: Mutex<PoolInternals<M::Connection, M::Error>>,
+    internals: Mutex<PoolInternals<M::Connection>>,
     state: PoolState,
-    semaphore: Semaphore,
-}
-
-struct Conn<C, E> {
-    raw: Option<C>,
-    #[allow(dead_code)]
-    last_err: Mutex<Option<E>>,
-    created_at: Instant,
-    last_used_at: Instant,
-    last_checked_at: Instant,
-    brand_new: bool,
+    semaphore: Arc<Semaphore>,
 }
 
-impl<C, E> Conn<C, E> {
-    fn close(&self, state: &PoolState) {
-        state.num_open.fetch_sub(1, Ordering::Relaxed);
-        state.max_idle_closed.fetch_add(1, Ordering::Relaxed);
-        gauge!(OPEN_CONNECTIONS).decrement(1.0);
-        counter!(CLOSED_TOTAL).increment(1);
-    }
-
-    fn expired(&self, timeout: Option<Duration>) -> bool {
-        timeout
-            .and_then(|check_interval| {
-                Instant::now()
-                    .checked_duration_since(self.created_at)
-                    .map(|dur_since| dur_since >= check_interval)
-            })
-            .unwrap_or(false)
-    }
-
-    fn idle_expired(&self, timeout: Option<Duration>) -> bool {
-        timeout
-            .and_then(|check_interval| {
-                Instant::now()
-                    .checked_duration_since(self.last_used_at)
-                    .map(|dur_since| dur_since >= check_interval)
-            })
-            .unwrap_or(false)
-    }
-
-    fn needs_health_check(&self, timeout: Option<Duration>) -> bool {
-        timeout
-            .and_then(|check_interval| {
-                Instant::now()
-                    .checked_duration_since(self.last_checked_at)
-                    .map(|dur_since| dur_since >= check_interval)
-            })
-            .unwrap_or(true)
-    }
-}
-
-struct PoolInternals<C, E> {
+struct PoolInternals<C> {
     config: InternalConfig,
-    free_conns: Vec<Conn<C, E>>,
+    free_conns: Vec<IdleConn<C>>,
     wait_duration: Duration,
     cleaner_ch: Option<Sender<()>>,
 }
 
 struct PoolState {
-    num_open: AtomicU64,
+    num_open: Arc<AtomicU64>,
     max_lifetime_closed: AtomicU64,
-    max_idle_closed: AtomicU64,
+    max_idle_closed: Arc<AtomicU64>,
     wait_count: AtomicU64,
 }
 
-impl<C, E> Drop for PoolInternals<C, E> {
+impl<C> Drop for PoolInternals<C> {
     fn drop(&mut self) {
         log::debug!("Pool internal drop");
     }
@@ -316,11 +268,7 @@ impl<M: Manager> Pool<M> {
         let max_idle = internals.config.max_idle as usize;
         // Treat max_idle == 0 as unlimited
         if max_idle > 0 && internals.free_conns.len() > max_idle {
-            let closing = internals.free_conns.split_off(max_idle);
-            drop(internals);
-            for conn in closing {
-                conn.close(&self.0.state);
-            }
+            internals.free_conns.truncate(max_idle);
         }
     }
 
@@ -385,17 +333,17 @@ impl<M: Manager> Pool<M> {
         });
 
         let pool_state = PoolState {
-            num_open: AtomicU64::new(0),
+            num_open: Arc::new(AtomicU64::new(0)),
             max_lifetime_closed: AtomicU64::new(0),
             wait_count: AtomicU64::new(0),
-            max_idle_closed: AtomicU64::new(0),
+            max_idle_closed: Arc::new(AtomicU64::new(0)),
         };
 
         let shared = Arc::new(SharedPool {
             config: share_config,
             manager,
             internals,
-            semaphore: Semaphore::new(max_open),
+            semaphore: Arc::new(Semaphore::new(max_open)),
             state: pool_state,
         });
 
@@ -439,62 +387,54 @@ impl<M: Manager> Pool<M> {
     }
 
     async fn get_connection(&self) -> Result<Connection<M>, Error<M::Error>> {
-        let mut c = self.get_or_create_conn().await?;
-        c.last_used_at = Instant::now();
+        let _guard = GaugeGuard::increment(WAIT_COUNT);
+        let c = self.get_or_create_conn().await?;
         let conn = Connection {
-            pool: Some(self.clone()),
+            pool: self.clone(),
             conn: Some(c),
         };
 
-        gauge!(ACTIVE_CONNECTIONS).increment(1.0);
-        gauge!(WAIT_COUNT).decrement(1.0);
-
         Ok(conn)
     }
 
     async fn validate_conn(
         &self,
         internal_config: InternalConfig,
-        conn: &mut Conn<M::Connection, M::Error>,
-    ) -> bool {
-        if conn.brand_new {
-            return true;
+        conn: IdleConn<M::Connection>,
+    ) -> Option<IdleConn<M::Connection>> {
+        if conn.is_brand_new() {
+            return Some(conn);
         }
 
         if conn.expired(internal_config.max_lifetime) {
-            return false;
+            return None;
         }
 
         if conn.idle_expired(internal_config.max_idle_lifetime) {
-            return false;
+            return None;
         }
 
         let needs_health_check = self.0.config.health_check
             && conn.needs_health_check(self.0.config.health_check_interval);
 
         if needs_health_check {
-            let raw = conn.raw.take().unwrap();
-            match self.0.manager.check(raw).await {
-                Ok(raw) => {
-                    conn.last_checked_at = Instant::now();
-                    conn.raw = Some(raw)
-                }
-                Err(_e) => return false,
-            }
+            let (raw, split) = conn.split_raw();
+            let checked_raw = self.0.manager.check(raw).await.ok()?;
+            let mut checked = split.restore(checked_raw);
+            checked.mark_checked();
+            return Some(checked);
         }
 
-        true
+        Some(conn)
     }
 
-    async fn get_or_create_conn(&self) -> Result<Conn<M::Connection, M::Error>, Error<M::Error>> {
+    async fn get_or_create_conn(&self) -> Result<ActiveConn<M::Connection>, Error<M::Error>> {
         self.0.state.wait_count.fetch_add(1, Ordering::Relaxed);
-        gauge!(WAIT_COUNT).increment(1.0);
 
-        let wait_start = Instant::now();
+        let wait_guard = DurationHistogramGuard::start(WAIT_DURATION);
 
-        let permit = self
-            .0
-            .semaphore
-            .acquire()
+        let semaphore = Arc::clone(&self.0.semaphore);
+        let permit = semaphore
+            .acquire_owned()
             .await
             .map_err(|_| Error::PoolClosed)?;
 
@@ -502,51 +442,37 @@ impl<M: Manager> Pool<M> {
         let mut internals = self.0.internals.lock().await;
-        internals.wait_duration += wait_start.elapsed();
-        histogram!(WAIT_DURATION).record(wait_start.elapsed());
+        internals.wait_duration += wait_guard.elapsed();
+        drop(wait_guard);
 
         let conn = internals.free_conns.pop();
         let internal_config = internals.config.clone();
         drop(internals);
 
-        if conn.is_some() {
-            let mut conn = conn.unwrap();
-            if self.validate_conn(internal_config, &mut conn).await {
-                gauge!(IDLE_CONNECTIONS,).decrement(1.0);
-                permit.forget();
-                return Ok(conn);
-            } else {
-                conn.close(&self.0.state);
+        if let Some(conn) = conn {
+            if let Some(valid_conn) = self.validate_conn(internal_config, conn).await {
+                return Ok(valid_conn.into_active(permit));
             }
         }
 
-        let create_r = self.open_new_connection().await;
-
-        if create_r.is_ok() {
-            permit.forget();
-        }
+        let create_r = self.open_new_connection(permit).await;
 
         create_r
     }
 
-    async fn open_new_connection(&self) -> Result<Conn<M::Connection, M::Error>, Error<M::Error>> {
+    async fn open_new_connection(
+        &self,
+        permit: OwnedSemaphorePermit,
+    ) -> Result<ActiveConn<M::Connection>, Error<M::Error>> {
         log::debug!("creating new connection from manager");
         match self.0.manager.connect().await {
             Ok(c) => {
                 self.0.state.num_open.fetch_add(1, Ordering::Relaxed);
-                gauge!(OPEN_CONNECTIONS).increment(1.0);
-                counter!(OPENED_TOTAL).increment(1);
-
-                let conn = Conn {
-                    raw: Some(c),
-                    last_err: Mutex::new(None),
-                    created_at: Instant::now(),
-                    last_used_at: Instant::now(),
-                    last_checked_at: Instant::now(),
-                    brand_new: true,
-                };
-
-                Ok(conn)
+                let state = ConnState::new(
+                    Arc::clone(&self.0.state.num_open),
+                    Arc::clone(&self.0.state.max_idle_closed),
+                );
+                Ok(ActiveConn::new(c, permit, state))
             }
             Err(e) => Err(Error::Inner(e)),
         }
@@ -578,28 +504,20 @@ impl<M: Manager> Pool<M> {
 
 async fn recycle_conn<M: Manager>(
     shared: &Arc<SharedPool<M>>,
-    mut conn: Conn<M::Connection, M::Error>,
+    mut conn: ActiveConn<M::Connection>,
 ) {
     if conn_still_valid(shared, &mut conn) {
-        conn.brand_new = false;
+        conn.set_brand_new(false);
         let internals = shared.internals.lock().await;
-        put_idle_conn(shared, internals, conn);
-    } else {
-        conn.close(&shared.state);
+        put_idle_conn::<M>(internals, conn);
     }
-
-    shared.semaphore.add_permits(1);
 }
 
 fn conn_still_valid<M: Manager>(
     shared: &Arc<SharedPool<M>>,
-    conn: &mut Conn<M::Connection, M::Error>,
+    conn: &mut ActiveConn<M::Connection>,
 ) -> bool {
-    if conn.raw.is_none() {
-        return false;
-    }
-
-    if !shared.manager.validate(conn.raw.as_mut().unwrap()) {
+    if !shared.manager.validate(conn.as_raw_mut()) {
         log::debug!("bad conn when check in");
         return false;
     }
@@ -608,19 +526,15 @@ fn conn_still_valid<M: Manager>(
 }
 
 fn put_idle_conn<M: Manager>(
-    shared: &Arc<SharedPool<M>>,
-    mut internals: MutexGuard<'_, PoolInternals<M::Connection, M::Error>>,
-    conn: Conn<M::Connection, M::Error>,
+    mut internals: MutexGuard<'_, PoolInternals<M::Connection>>,
+    conn: ActiveConn<M::Connection>,
 ) {
+    let idle_conn = conn.into_idle();
     // Treat max_idle == 0 as unlimited idle connections.
     if internals.config.max_idle == 0
         || internals.config.max_idle > internals.free_conns.len() as u64
     {
-        gauge!(IDLE_CONNECTIONS).increment(1.0);
-        internals.free_conns.push(conn);
-        drop(internals);
-    } else {
-        conn.close(&shared.state);
+        internals.free_conns.push(idle_conn);
     }
 }
 
@@ -678,7 +592,7 @@ async fn clean_connection<M: Manager>(shared: &Weak<SharedPool<M>>) -> bool {
             break;
         }
 
-        if internals.free_conns[i].created_at < expired {
+        if internals.free_conns[i].created_at() < expired {
             let c = internals.free_conns.swap_remove(i);
             closing.push(c);
             continue;
@@ -691,39 +605,37 @@ async fn clean_connection<M: Manager>(shared: &Weak<SharedPool<M>>) -> bool {
         .state
         .max_lifetime_closed
         .fetch_add(closing.len() as u64, Ordering::Relaxed);
-    for conn in closing {
-        conn.close(&shared.state);
-    }
 
     true
 }
 
 /// A smart pointer wrapping a connection.
 pub struct Connection<M: Manager> {
-    pool: Option<Pool<M>>,
-    conn: Option<Conn<M::Connection, M::Error>>,
+    pool: Pool<M>,
+    conn: Option<ActiveConn<M::Connection>>,
 }
 
 impl<M: Manager> Connection<M> {
     /// Returns true is the connection is newly established.
     pub fn is_brand_new(&self) -> bool {
-        self.conn.as_ref().unwrap().brand_new
+        self.conn.as_ref().unwrap().is_brand_new()
     }
 
     /// Unwraps the raw database connection.
     pub fn into_inner(mut self) -> M::Connection {
-        self.conn.as_mut().unwrap().raw.take().unwrap()
+        self.conn.take().unwrap().into_raw()
     }
 }
 
 impl<M: Manager> Drop for Connection<M> {
     fn drop(&mut self) {
-        let pool = self.pool.take().unwrap();
-        let conn = self.conn.take().unwrap();
+        let Some(conn) = self.conn.take() else {
+            return;
+        };
+
+        let pool = Arc::clone(&self.pool.0);
 
-        gauge!(ACTIVE_CONNECTIONS).decrement(1.0);
-        // FIXME: No clone!
-        pool.clone().0.manager.spawn_task(async move {
-            recycle_conn(&pool.0, conn).await;
+        self.pool.0.manager.spawn_task(async move {
+            recycle_conn(&pool, conn).await;
         });
     }
 }
@@ -731,12 +643,12 @@ impl<M: Manager> Drop for Connection<M> {
 impl<M: Manager> Deref for Connection<M> {
     type Target = M::Connection;
     fn deref(&self) -> &Self::Target {
-        self.conn.as_ref().unwrap().raw.as_ref().unwrap()
+        self.conn.as_ref().unwrap().as_raw_ref()
     }
 }
 
 impl<M: Manager> DerefMut for Connection<M> {
     fn deref_mut(&mut self) -> &mut M::Connection {
-        self.conn.as_mut().unwrap().raw.as_mut().unwrap()
+        self.conn.as_mut().unwrap().as_raw_mut()
     }
 }
diff --git a/src/metrics_utils.rs b/src/metrics_utils.rs
index 58edd9d..984e2f9 100644
--- a/src/metrics_utils.rs
+++ b/src/metrics_utils.rs
@@ -1,4 +1,6 @@
-use metrics::{describe_counter, describe_gauge, describe_histogram};
+use std::time::{Duration, Instant};
+
+use metrics::{describe_counter, describe_gauge, describe_histogram, gauge, histogram};
 
 pub const OPENED_TOTAL: &str = "mobc_pool_connections_opened_total";
 pub const CLOSED_TOTAL: &str = "mobc_pool_connections_closed_total";
@@ -38,3 +40,44 @@ pub fn describe_metrics() {
         "Histogram of the wait time of all queries in ms"
     );
 }
+
+pub(crate) struct GaugeGuard {
+    key: &'static str,
+}
+
+impl GaugeGuard {
+    pub fn increment(key: &'static str) -> Self {
+        gauge!(key).increment(1.0);
+        Self { key }
+    }
+}
+
+impl Drop for GaugeGuard {
+    fn drop(&mut self) {
+        gauge!(self.key).decrement(1.0);
+    }
+}
+
+pub(crate) struct DurationHistogramGuard {
+    start: Instant,
+    key: &'static str,
+}
+
+impl DurationHistogramGuard {
+    pub(crate) fn start(key: &'static str) -> Self {
+        Self {
+            start: Instant::now(),
+            key,
+        }
+    }
+
+    pub(crate) fn elapsed(&self) -> Duration {
+        self.start.elapsed()
+    }
+}
+
+impl Drop for DurationHistogramGuard {
+    fn drop(&mut self) {
+        histogram!(self.key).record(self.start.elapsed());
+    }
+}
diff --git a/tests/mobc.rs b/tests/mobc.rs
index 2420e98..4dae16b 100644
--- a/tests/mobc.rs
+++ b/tests/mobc.rs
@@ -706,14 +706,6 @@ fn test_max_idle_lifetime() {
         drop(v);
         delay_for(Duration::from_millis(800)).await;
 
-        let mut v = vec![];
-        for _ in 0..5 {
-            v.push(pool.get().await.unwrap());
-        }
-        assert_eq!(0, DROPPED.load(Ordering::SeqCst));
-        drop(v);
-        delay_for(Duration::from_millis(2000)).await;
-
         let mut v = vec![];
         for _ in 0..5 {
             v.push(pool.get().await.unwrap());

From bd2d5475647c87aeb3428d567595ce254a2088fe Mon Sep 17 00:00:00 2001
From: Serhii Tatarintsev
Date: Thu, 10 Oct 2024 17:32:51 +0200
Subject: [PATCH 2/2] Update src/conn.rs

Co-authored-by: Alexey Orlenko
---
 src/conn.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/conn.rs b/src/conn.rs
index 457fba4..383e65b 100644
--- a/src/conn.rs
+++ b/src/conn.rs
@@ -44,7 +44,7 @@ impl<C> ActiveConn<C> {
     }
 
     pub(crate) fn set_brand_new(&mut self, brand_new: bool) {
-        self.state.brand_new = brand_new
+        self.state.brand_new = brand_new;
     }
 
     pub(crate) fn into_raw(self) -> C {