add shutdown handler to kick sync

This commit is contained in:
Jonathan de Jong 2021-07-14 14:50:07 +02:00
parent f3e806096b
commit 952fb75795
2 changed files with 21 additions and 4 deletions

View File

@ -23,7 +23,7 @@ use rocket::{
futures::{channel::mpsc, stream::FuturesUnordered, StreamExt}, futures::{channel::mpsc, stream::FuturesUnordered, StreamExt},
outcome::{try_outcome, IntoOutcome}, outcome::{try_outcome, IntoOutcome},
request::{FromRequest, Request}, request::{FromRequest, Request},
State, Shutdown, State,
}; };
use ruma::{DeviceId, ServerName, UserId}; use ruma::{DeviceId, ServerName, UserId};
use serde::{de::IgnoredAny, Deserialize}; use serde::{de::IgnoredAny, Deserialize};
@ -199,7 +199,7 @@ impl Database {
} }
/// Load an existing database or create a new one. /// Load an existing database or create a new one.
pub async fn load_or_create(config: Config) -> Result<Arc<TokioRwLock<Self>>> { pub async fn load_or_create(config: &Config) -> Result<Arc<TokioRwLock<Self>>> {
Self::check_sled_or_sqlite_db(&config)?; Self::check_sled_or_sqlite_db(&config)?;
let builder = Engine::open(&config)?; let builder = Engine::open(&config)?;
@ -425,6 +425,17 @@ impl Database {
Ok(db) Ok(db)
} }
#[cfg(feature = "conduit_bin")]
pub async fn start_on_shutdown_tasks(db: Arc<TokioRwLock<Self>>, shutdown: Shutdown) {
tokio::spawn(async move {
shutdown.await;
log::info!(target: "shutdown-sync", "Received shutdown notification, notifying sync helpers...");
db.read().await.globals.rotate.fire();
});
}
pub async fn watch(&self, user_id: &UserId, device_id: &DeviceId) { pub async fn watch(&self, user_id: &UserId, device_id: &DeviceId) {
let userid_bytes = user_id.as_bytes().to_vec(); let userid_bytes = user_id.as_bytes().to_vec();
let mut userid_prefix = userid_bytes.clone(); let mut userid_prefix = userid_bytes.clone();

View File

@ -220,11 +220,17 @@ async fn main() {
config.warn_deprecated(); config.warn_deprecated();
let db = Database::load_or_create(config) let db = Database::load_or_create(&config)
.await .await
.expect("config is valid"); .expect("config is valid");
let rocket = setup_rocket(raw_config, db); let rocket = setup_rocket(raw_config, Arc::clone(&db))
.ignite()
.await
.unwrap();
Database::start_on_shutdown_tasks(db, rocket.shutdown()).await;
rocket.launch().await.unwrap(); rocket.launch().await.unwrap();
} }