Merge branch 'connection-cushion' into 'master'

Spillover connection cushion

See merge request famedly/conduit!133
This commit is contained in:
Timo Kösters 2021-07-20 10:39:04 +00:00
commit d07762f596
2 changed files with 147 additions and 33 deletions

View File

@ -53,6 +53,10 @@ pub struct Config {
sqlite_wal_clean_second_interval: u32, sqlite_wal_clean_second_interval: u32,
#[serde(default = "default_sqlite_wal_clean_second_timeout")] #[serde(default = "default_sqlite_wal_clean_second_timeout")]
sqlite_wal_clean_second_timeout: u32, sqlite_wal_clean_second_timeout: u32,
#[serde(default = "default_sqlite_spillover_reap_fraction")]
sqlite_spillover_reap_fraction: f64,
#[serde(default = "default_sqlite_spillover_reap_interval_secs")]
sqlite_spillover_reap_interval_secs: u32,
#[serde(default = "default_max_request_size")] #[serde(default = "default_max_request_size")]
max_request_size: u32, max_request_size: u32,
#[serde(default = "default_max_concurrent_requests")] #[serde(default = "default_max_concurrent_requests")]
@ -121,6 +125,14 @@ fn default_sqlite_wal_clean_second_timeout() -> u32 {
2 2
} }
fn default_sqlite_spillover_reap_fraction() -> f64 {
0.5
}
fn default_sqlite_spillover_reap_interval_secs() -> u32 {
60
}
fn default_max_request_size() -> u32 { fn default_max_request_size() -> u32 {
20 * 1024 * 1024 // Default to 20 MB 20 * 1024 * 1024 // Default to 20 MB
} }
@ -420,7 +432,10 @@ impl Database {
drop(guard); drop(guard);
#[cfg(feature = "sqlite")] #[cfg(feature = "sqlite")]
{
Self::start_wal_clean_task(&db, &config).await; Self::start_wal_clean_task(&db, &config).await;
Self::start_spillover_reap_task(builder, &config).await;
}
Ok(db) Ok(db)
} }
@ -541,6 +556,32 @@ impl Database {
self._db.flush_wal() self._db.flush_wal()
} }
#[cfg(feature = "sqlite")]
pub async fn start_spillover_reap_task(engine: Arc<Engine>, config: &Config) {
let fraction = config.sqlite_spillover_reap_fraction.clamp(0.01, 1.0);
let interval_secs = config.sqlite_spillover_reap_interval_secs as u64;
let weak = Arc::downgrade(&engine);
tokio::spawn(async move {
use tokio::time::interval;
use std::{sync::Weak, time::Duration};
let mut i = interval(Duration::from_secs(interval_secs));
loop {
i.tick().await;
if let Some(arc) = Weak::upgrade(&weak) {
arc.reap_spillover_by_fraction(fraction);
} else {
break;
}
}
});
}
#[cfg(feature = "sqlite")] #[cfg(feature = "sqlite")]
pub async fn start_wal_clean_task(lock: &Arc<TokioRwLock<Self>>, config: &Config) { pub async fn start_wal_clean_task(lock: &Arc<TokioRwLock<Self>>, config: &Config) {
use tokio::time::{interval, timeout}; use tokio::time::{interval, timeout};

View File

@ -1,3 +1,11 @@
use super::{DatabaseEngine, Tree};
use crate::{database::Config, Result};
use crossbeam::channel::{
bounded, unbounded, Receiver as ChannelReceiver, Sender as ChannelSender, TryRecvError,
};
use log::debug;
use parking_lot::{Mutex, MutexGuard, RwLock};
use rusqlite::{params, Connection, DatabaseName::Main, OptionalExtension};
use std::{ use std::{
collections::BTreeMap, collections::BTreeMap,
future::Future, future::Future,
@ -8,33 +16,12 @@ use std::{
thread, thread,
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use crate::{database::Config, Result};
use super::{DatabaseEngine, Tree};
use log::debug;
use crossbeam::channel::{bounded, Sender as ChannelSender};
use parking_lot::{Mutex, MutexGuard, RwLock};
use rusqlite::{params, Connection, DatabaseName::Main, OptionalExtension};
use tokio::sync::oneshot::Sender; use tokio::sync::oneshot::Sender;
// const SQL_CREATE_TABLE: &str =
// "CREATE TABLE IF NOT EXISTS {} {{ \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL }}";
// const SQL_SELECT: &str = "SELECT value FROM {} WHERE key = ?";
// const SQL_INSERT: &str = "INSERT OR REPLACE INTO {} (key, value) VALUES (?, ?)";
// const SQL_DELETE: &str = "DELETE FROM {} WHERE key = ?";
// const SQL_SELECT_ITER: &str = "SELECT key, value FROM {}";
// const SQL_SELECT_PREFIX: &str = "SELECT key, value FROM {} WHERE key LIKE ?||'%' ORDER BY key ASC";
// const SQL_SELECT_ITER_FROM_FORWARDS: &str = "SELECT key, value FROM {} WHERE key >= ? ORDER BY ASC";
// const SQL_SELECT_ITER_FROM_BACKWARDS: &str =
// "SELECT key, value FROM {} WHERE key <= ? ORDER BY DESC";
struct Pool { struct Pool {
writer: Mutex<Connection>, writer: Mutex<Connection>,
readers: Vec<Mutex<Connection>>, readers: Vec<Mutex<Connection>>,
spills: ConnectionRecycler,
spill_tracker: Arc<()>, spill_tracker: Arc<()>,
path: PathBuf, path: PathBuf,
} }
@ -43,7 +30,7 @@ pub const MILLI: Duration = Duration::from_millis(1);
enum HoldingConn<'a> { enum HoldingConn<'a> {
FromGuard(MutexGuard<'a, Connection>), FromGuard(MutexGuard<'a, Connection>),
FromOwned(Connection, Arc<()>), FromRecycled(RecycledConn, Arc<()>),
} }
impl<'a> Deref for HoldingConn<'a> { impl<'a> Deref for HoldingConn<'a> {
@ -52,7 +39,57 @@ impl<'a> Deref for HoldingConn<'a> {
fn deref(&self) -> &Self::Target { fn deref(&self) -> &Self::Target {
match self { match self {
HoldingConn::FromGuard(guard) => guard.deref(), HoldingConn::FromGuard(guard) => guard.deref(),
HoldingConn::FromOwned(conn, _) => conn, HoldingConn::FromRecycled(conn, _) => conn.deref(),
}
}
}
struct ConnectionRecycler(ChannelSender<Connection>, ChannelReceiver<Connection>);
impl ConnectionRecycler {
fn new() -> Self {
let (s, r) = unbounded();
Self(s, r)
}
fn recycle(&self, conn: Connection) -> RecycledConn {
let sender = self.0.clone();
RecycledConn(Some(conn), sender)
}
fn try_take(&self) -> Option<Connection> {
match self.1.try_recv() {
Ok(conn) => Some(conn),
Err(TryRecvError::Empty) => None,
// as this is pretty impossible, a panic is warranted if it ever occurs
Err(TryRecvError::Disconnected) => panic!("Receiving channel was disconnected. A a sender is owned by the current struct, this should never happen(!!!)")
}
}
}
struct RecycledConn(
Option<Connection>, // To allow moving out of the struct when `Drop` is called.
ChannelSender<Connection>,
);
impl Deref for RecycledConn {
type Target = Connection;
fn deref(&self) -> &Self::Target {
self.0
.as_ref()
.expect("RecycledConn does not have a connection in Option<>")
}
}
impl Drop for RecycledConn {
fn drop(&mut self) {
if let Some(conn) = self.0.take() {
log::debug!("Recycled connection");
if let Err(e) = self.1.send(conn) {
log::warn!("Recycling a connection led to the following error: {:?}", e)
}
} }
} }
} }
@ -76,6 +113,7 @@ impl Pool {
Ok(Self { Ok(Self {
writer, writer,
readers, readers,
spills: ConnectionRecycler::new(),
spill_tracker: Arc::new(()), spill_tracker: Arc::new(()),
path: path.as_ref().to_path_buf(), path: path.as_ref().to_path_buf(),
}) })
@ -104,24 +142,41 @@ impl Pool {
} }
fn read_lock(&self) -> HoldingConn<'_> { fn read_lock(&self) -> HoldingConn<'_> {
// First try to get a connection from the permanent pool
for r in &self.readers { for r in &self.readers {
if let Some(reader) = r.try_lock() { if let Some(reader) = r.try_lock() {
return HoldingConn::FromGuard(reader); return HoldingConn::FromGuard(reader);
} }
} }
let spill_arc = self.spill_tracker.clone(); log::debug!("read_lock: All permanent readers locked, obtaining spillover reader...");
// We didn't get a connection from the permanent pool, so we'll dumpster-dive for recycled connections.
// Either we have a connection or we dont, if we don't, we make a new one.
let conn = match self.spills.try_take() {
Some(conn) => conn,
None => {
log::debug!("read_lock: No recycled connections left, creating new one...");
Self::prepare_conn(&self.path, None).unwrap()
}
};
// Clone the spill Arc to mark how many spilled connections actually exist.
let spill_arc = Arc::clone(&self.spill_tracker);
// Get a sense of how many connections exist now.
let now_count = Arc::strong_count(&spill_arc) - 1 /* because one is held by the pool */; let now_count = Arc::strong_count(&spill_arc) - 1 /* because one is held by the pool */;
log::warn!("read_lock: all readers locked, creating spillover reader..."); // If the spillover readers are more than the number of total readers, there might be a problem.
if now_count > self.readers.len() {
if now_count > 1 { log::warn!(
log::warn!("read_lock: now {} spillover readers exist", now_count); "Database is under high load. Consider increasing sqlite_read_pool_size ({} spillover readers exist)",
now_count
);
} }
let spilled = Self::prepare_conn(&self.path, None).unwrap(); // Return the recyclable connection.
HoldingConn::FromRecycled(self.spills.recycle(conn), spill_arc)
HoldingConn::FromOwned(spilled, spill_arc)
} }
} }
@ -188,6 +243,24 @@ impl Engine {
) )
.map_err(Into::into) .map_err(Into::into)
} }
// Reaps (at most) (.len() * `fraction`) (rounded down, min 1) connections.
pub fn reap_spillover_by_fraction(&self, fraction: f64) {
let mut reaped = 0;
let spill_amount = self.pool.spills.1.len() as f64;
let fraction = fraction.clamp(0.01, 1.0);
let amount = (spill_amount * fraction).max(1.0) as u32;
for _ in 0..amount {
if self.pool.spills.try_take().is_some() {
reaped += 1;
}
}
log::debug!("Reaped {} connections", reaped);
}
} }
pub struct SqliteTable { pub struct SqliteTable {