From 9b898248c7cd5c060fd806db98068c6298f6aac5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Mon, 30 May 2022 12:58:43 +0200 Subject: [PATCH] feat: more admin commands, better logging --- Cargo.toml | 6 +- src/client_server/unversioned.rs | 7 +- src/database.rs | 2 + src/database/admin.rs | 56 +++++++++++++++ src/database/globals.rs | 4 ++ src/database/rooms.rs | 18 +++++ src/ruma_wrapper/axum.rs | 2 +- src/server_server.rs | 119 ++++++++++++++++++++++--------- 8 files changed, 177 insertions(+), 37 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 64b7a233..10be7501 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,7 @@ tokio = { version = "1.11.0", features = ["fs", "macros", "signal", "sync"] } # Used for storing data permanently sled = { version = "0.34.6", features = ["compression", "no_metrics"], optional = true } #sled = { git = "https://github.com/spacejam/sled.git", rev = "e4640e0773595229f398438886f19bca6f7326a2", features = ["compression"] } -persy = { version = "1.2" , optional = true, features=["background_ops"] } +persy = { version = "1.2" , optional = true, features = ["background_ops"] } # Used for the http request / response body type for Ruma endpoints used with reqwest bytes = "1.1.0" @@ -64,7 +64,7 @@ regex = "1.5.4" # jwt jsonwebtokens jsonwebtoken = "7.2.0" # Performance measurements -tracing = { version = "0.1.26", features = ["release_max_level_warn"] } +tracing = { version = "0.1.26", features = [] } tracing-subscriber = "0.2.20" tracing-flame = "0.1.0" opentelemetry = { version = "0.16.0", features = ["rt-tokio"] } @@ -76,7 +76,7 @@ crossbeam = { version = "0.8.1", optional = true } num_cpus = "1.13.0" threadpool = "1.8.1" heed = { git = "https://github.com/timokoesters/heed.git", rev = "f6f825da7fb2c758867e05ad973ef800a6fe1d5d", optional = true } -rocksdb = { version = "0.17.0", default-features = false, features = ["multi-threaded-cf", "zstd"], optional = true } +rocksdb = { version = "0.17.0", default-features = true, features = ["multi-threaded-cf", "zstd"], optional = true } thread_local = "1.1.3" # used for TURN server authentication diff --git a/src/client_server/unversioned.rs b/src/client_server/unversioned.rs index fd0277c6..8a5c3d25 100644 --- a/src/client_server/unversioned.rs +++ b/src/client_server/unversioned.rs @@ -18,7 +18,12 @@ pub async fn get_supported_versions_route( _body: Ruma, ) -> Result { let resp = get_supported_versions::Response { - versions: vec!["r0.5.0".to_owned(), "r0.6.0".to_owned(), "v1.1".to_owned(), "v1.2".to_owned()], + versions: vec![ + "r0.5.0".to_owned(), + "r0.6.0".to_owned(), + "v1.1".to_owned(), + "v1.2".to_owned(), + ], unstable_features: BTreeMap::from_iter([("org.matrix.e2e_cross_signing".to_owned(), true)]), }; diff --git a/src/database.rs b/src/database.rs index 4a03f18c..a0937c29 100644 --- a/src/database.rs +++ b/src/database.rs @@ -213,6 +213,8 @@ impl Database { userroomid_leftstate: builder.open_tree("userroomid_leftstate")?, roomuserid_leftcount: builder.open_tree("roomuserid_leftcount")?, + disabledroomids: builder.open_tree("disabledroomids")?, + lazyloadedids: builder.open_tree("lazyloadedids")?, userroomid_notificationcount: builder.open_tree("userroomid_notificationcount")?, diff --git a/src/database/admin.rs b/src/database/admin.rs index dcf09ebc..c6ef9a64 100644 --- a/src/database/admin.rs +++ b/src/database/admin.rs @@ -231,9 +231,15 @@ enum AdminCommand { /// List all the currently registered appservices ListAppservices, + /// List all rooms the server knows about + ListRooms, + /// List users in the database ListLocalUsers, + /// List all rooms we are currently handling an incoming pdu from + IncomingFederation, + /// Get the auth_chain of a PDU GetAuthChain { /// An event ID (the $ character followed by the base64 reference hash) @@ -269,6 +275,7 @@ enum AdminCommand { /// Username of the user for whom the password should be reset username: String, }, + /// Create a new user CreateUser { /// Username of the new user @@ -276,6 +283,11 @@ enum AdminCommand { /// Password of the new user, if unspecified one is generated password: Option, }, + + /// Disables incoming federation handling for a room. + DisableRoom { room_id: Box }, + /// Enables incoming federation handling for a room again. + EnableRoom { room_id: Box }, } fn process_admin_command( @@ -336,6 +348,26 @@ fn process_admin_command( RoomMessageEventContent::text_plain("Failed to get appservices.") } } + AdminCommand::ListRooms => { + let room_ids = db.rooms.iter_ids(); + let output = format!( + "Rooms:\n{}", + room_ids + .filter_map(|r| r.ok()) + .map(|id| id.to_string() + + "\tMembers: " + + &db + .rooms + .room_joined_count(&id) + .ok() + .flatten() + .unwrap_or(0) + .to_string()) + .collect::>() + .join("\n") + ); + RoomMessageEventContent::text_plain(output) + } AdminCommand::ListLocalUsers => match db.users.list_local_users() { Ok(users) => { let mut msg: String = format!("Found {} local user account(s):\n", users.len()); @@ -344,6 +376,22 @@ fn process_admin_command( } Err(e) => RoomMessageEventContent::text_plain(e.to_string()), }, + AdminCommand::IncomingFederation => { + let map = db.globals.roomid_federationhandletime.read().unwrap(); + let mut msg: String = format!("Handling {} incoming pdus:\n", map.len()); + + for (r, (e, i)) in map.iter() { + let elapsed = i.elapsed(); + msg += &format!( + "{} {}: {}m{}s\n", + r, + e, + elapsed.as_secs() / 60, + elapsed.as_secs() % 60 + ); + } + RoomMessageEventContent::text_plain(&msg) + } AdminCommand::GetAuthChain { event_id } => { let event_id = Arc::::from(event_id); if let Some(event) = db.rooms.get_pdu_json(&event_id)? { @@ -545,6 +593,14 @@ fn process_admin_command( "Created user with user_id: {user_id} and password: {password}" )) } + AdminCommand::DisableRoom { room_id } => { + db.rooms.disabledroomids.insert(room_id.as_bytes(), &[])?; + RoomMessageEventContent::text_plain("Room disabled.") + } + AdminCommand::EnableRoom { room_id } => { + db.rooms.disabledroomids.remove(room_id.as_bytes())?; + RoomMessageEventContent::text_plain("Room enabled.") + } }; Ok(reply_message_content) diff --git a/src/database/globals.rs b/src/database/globals.rs index d363e933..7e09128e 100644 --- a/src/database/globals.rs +++ b/src/database/globals.rs @@ -52,6 +52,8 @@ pub struct Globals { pub roomid_mutex_insert: RwLock, Arc>>>, pub roomid_mutex_state: RwLock, Arc>>>, pub roomid_mutex_federation: RwLock, Arc>>>, // this lock will be held longer + pub roomid_federationhandletime: RwLock, (Box, Instant)>>, + pub stateres_mutex: Arc>, pub rotate: RotationHandler, } @@ -183,6 +185,8 @@ impl Globals { roomid_mutex_state: RwLock::new(HashMap::new()), roomid_mutex_insert: RwLock::new(HashMap::new()), roomid_mutex_federation: RwLock::new(HashMap::new()), + roomid_federationhandletime: RwLock::new(HashMap::new()), + stateres_mutex: Arc::new(Mutex::new(())), sync_receivers: RwLock::new(HashMap::new()), rotate: RotationHandler::new(), }; diff --git a/src/database/rooms.rs b/src/database/rooms.rs index c885c960..2c1b8f44 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -76,6 +76,8 @@ pub struct Rooms { pub(super) userroomid_leftstate: Arc, pub(super) roomuserid_leftcount: Arc, + pub(super) disabledroomids: Arc, // Rooms where incoming federation handling is disabled + pub(super) lazyloadedids: Arc, // LazyLoadedIds = UserId + DeviceId + RoomId + LazyLoadedUserId pub(super) userroomid_notificationcount: Arc, // NotifyCount = u64 @@ -2858,6 +2860,18 @@ impl Rooms { Ok(self.publicroomids.get(room_id.as_bytes())?.is_some()) } + #[tracing::instrument(skip(self))] + pub fn iter_ids(&self) -> impl Iterator>> + '_ { + self.roomid_shortroomid.iter().map(|(bytes, _)| { + RoomId::parse( + utils::string_from_bytes(&bytes).map_err(|_| { + Error::bad_database("Room ID in publicroomids is invalid unicode.") + })?, + ) + .map_err(|_| Error::bad_database("Room ID in roomid_shortroomid is invalid.")) + }) + } + #[tracing::instrument(skip(self))] pub fn public_rooms(&self) -> impl Iterator>> + '_ { self.publicroomids.iter().map(|(bytes, _)| { @@ -3140,6 +3154,10 @@ impl Rooms { .transpose() } + pub fn is_disabled(&self, room_id: &RoomId) -> Result { + Ok(self.disabledroomids.get(room_id.as_bytes())?.is_some()) + } + /// Returns an iterator over all rooms this user joined. #[tracing::instrument(skip(self))] pub fn rooms_joined<'a>( diff --git a/src/ruma_wrapper/axum.rs b/src/ruma_wrapper/axum.rs index fdb140fe..45e9d9a8 100644 --- a/src/ruma_wrapper/axum.rs +++ b/src/ruma_wrapper/axum.rs @@ -338,7 +338,7 @@ impl Credentials for XMatrix { "origin" => origin = Some(value.try_into().ok()?), "key" => key = Some(value.to_owned()), "sig" => sig = Some(value.to_owned()), - _ => warn!( + _ => debug!( "Unexpected field `{}` in X-Matrix Authorization header", name ), diff --git a/src/server_server.rs b/src/server_server.rs index a227f57c..7b08cf9b 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -768,7 +768,7 @@ pub async fn send_transaction_message_route( )?; } else { // TODO fetch missing events - debug!("No known event ids in read receipt: {:?}", user_updates); + info!("No known event ids in read receipt: {:?}", user_updates); } } } @@ -926,6 +926,13 @@ pub(crate) async fn handle_incoming_pdu<'a>( } } + match db.rooms.is_disabled(room_id) { + Ok(false) => {} + _ => { + return Err("Federation of this room is currently disabled on this server.".to_owned()); + } + } + // 1. Skip the PDU if we already have it as a timeline event if let Ok(Some(pdu_id)) = db.rooms.get_pdu_id(event_id) { return Ok(Some(pdu_id.to_vec())); @@ -1038,6 +1045,15 @@ pub(crate) async fn handle_incoming_pdu<'a>( let mut errors = 0; for prev_id in dbg!(sorted) { + match db.rooms.is_disabled(room_id) { + Ok(false) => {} + _ => { + return Err( + "Federation of this room is currently disabled on this server.".to_owned(), + ); + } + } + if errors >= 5 { break; } @@ -1047,6 +1063,11 @@ pub(crate) async fn handle_incoming_pdu<'a>( } let start_time = Instant::now(); + db.globals + .roomid_federationhandletime + .write() + .unwrap() + .insert(room_id.to_owned(), ((*prev_id).to_owned(), start_time)); let event_id = pdu.event_id.clone(); if let Err(e) = upgrade_outlier_to_timeline_pdu( pdu, @@ -1063,6 +1084,11 @@ pub(crate) async fn handle_incoming_pdu<'a>( warn!("Prev event {} failed: {}", event_id, e); } let elapsed = start_time.elapsed(); + db.globals + .roomid_federationhandletime + .write() + .unwrap() + .remove(&room_id.to_owned()); warn!( "Handling prev event {} took {}m{}s", event_id, @@ -1072,7 +1098,13 @@ pub(crate) async fn handle_incoming_pdu<'a>( } } - upgrade_outlier_to_timeline_pdu( + let start_time = Instant::now(); + db.globals + .roomid_federationhandletime + .write() + .unwrap() + .insert(room_id.to_owned(), (event_id.to_owned(), start_time)); + let r = upgrade_outlier_to_timeline_pdu( incoming_pdu, val, &create_event, @@ -1081,10 +1113,17 @@ pub(crate) async fn handle_incoming_pdu<'a>( room_id, pub_key_map, ) - .await + .await; + db.globals + .roomid_federationhandletime + .write() + .unwrap() + .remove(&room_id.to_owned()); + + r } -#[tracing::instrument(skip_all)] +#[tracing::instrument(skip(create_event, value, db, pub_key_map))] fn handle_outlier_pdu<'a>( origin: &'a ServerName, create_event: &'a PduEvent, @@ -1166,7 +1205,7 @@ fn handle_outlier_pdu<'a>( .await; // 6. Reject "due to auth events" if the event doesn't pass auth based on the auth events - debug!( + info!( "Auth check for {} based on auth events", incoming_pdu.event_id ); @@ -1221,19 +1260,19 @@ fn handle_outlier_pdu<'a>( return Err("Event has failed auth check with auth events.".to_owned()); } - debug!("Validation successful."); + info!("Validation successful."); // 7. Persist the event as an outlier. db.rooms .add_pdu_outlier(&incoming_pdu.event_id, &val) .map_err(|_| "Failed to add pdu as outlier.".to_owned())?; - debug!("Added pdu as outlier."); + info!("Added pdu as outlier."); Ok((Arc::new(incoming_pdu), val)) }) } -#[tracing::instrument(skip_all)] +#[tracing::instrument(skip(incoming_pdu, val, create_event, db, pub_key_map))] async fn upgrade_outlier_to_timeline_pdu( incoming_pdu: Arc, val: BTreeMap, @@ -1255,6 +1294,8 @@ async fn upgrade_outlier_to_timeline_pdu( return Err("Event has been soft failed".into()); } + info!("Upgrading {} to timeline pdu", incoming_pdu.event_id); + let create_event_content: RoomCreateEventContent = serde_json::from_str(create_event.content.get()).map_err(|e| { warn!("Invalid create event: {}", e); @@ -1270,7 +1311,7 @@ async fn upgrade_outlier_to_timeline_pdu( // TODO: if we know the prev_events of the incoming event we can avoid the request and build // the state from a known point and resolve if > 1 prev_event - debug!("Requesting state at event."); + info!("Requesting state at event"); let mut state_at_incoming_event = None; if incoming_pdu.prev_events.len() == 1 { @@ -1284,7 +1325,7 @@ async fn upgrade_outlier_to_timeline_pdu( prev_event_sstatehash.map(|shortstatehash| db.rooms.state_full_ids(shortstatehash)); if let Some(Ok(mut state)) = state { - warn!("Using cached state"); + info!("Using cached state"); let prev_pdu = db.rooms.get_pdu(prev_event).ok().flatten().ok_or_else(|| { "Could not find prev event, but we know the state.".to_owned() @@ -1307,7 +1348,7 @@ async fn upgrade_outlier_to_timeline_pdu( state_at_incoming_event = Some(state); } } else { - warn!("Calculating state at event using state res"); + info!("Calculating state at event using state res"); let mut extremity_sstatehashes = HashMap::new(); let mut okay = true; @@ -1375,18 +1416,18 @@ async fn upgrade_outlier_to_timeline_pdu( fork_states.push(state); } - state_at_incoming_event = match state_res::resolve( - room_version_id, - &fork_states, - auth_chain_sets, - |id| { - let res = db.rooms.get_pdu(id); - if let Err(e) = &res { - error!("LOOK AT ME Failed to fetch event: {}", e); - } - res.ok().flatten() - }, - ) { + let lock = db.globals.stateres_mutex.lock(); + + let result = state_res::resolve(room_version_id, &fork_states, auth_chain_sets, |id| { + let res = db.rooms.get_pdu(id); + if let Err(e) = &res { + error!("LOOK AT ME Failed to fetch event: {}", e); + } + res.ok().flatten() + }); + drop(lock); + + state_at_incoming_event = match result { Ok(new_state) => Some( new_state .into_iter() @@ -1407,12 +1448,12 @@ async fn upgrade_outlier_to_timeline_pdu( warn!("State resolution on prev events failed, either an event could not be found or deserialization: {}", e); None } - }; + } } } if state_at_incoming_event.is_none() { - warn!("Calling /state_ids"); + info!("Calling /state_ids"); // Call /state_ids to find out what the state at this pdu is. We trust the server's // response to some extend, but we still do a lot of checks on the events match db @@ -1428,7 +1469,7 @@ async fn upgrade_outlier_to_timeline_pdu( .await { Ok(res) => { - warn!("Fetching state events at event."); + info!("Fetching state events at event."); let state_vec = fetch_and_handle_outliers( db, origin, @@ -1513,7 +1554,7 @@ async fn upgrade_outlier_to_timeline_pdu( if !check_result { return Err("Event has failed auth check with state at the event.".into()); } - debug!("Auth check succeeded."); + info!("Auth check succeeded."); // We start looking at current room state now, so lets lock the room @@ -1576,7 +1617,7 @@ async fn upgrade_outlier_to_timeline_pdu( .collect::>()?; // 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail" it - debug!("starting soft fail auth check"); + info!("Starting soft fail auth check"); let soft_fail = !state_res::event_auth::auth_check( &room_version, @@ -1610,8 +1651,10 @@ async fn upgrade_outlier_to_timeline_pdu( } if incoming_pdu.state_key.is_some() { + info!("Preparing for stateres to derive new room state"); let mut extremity_sstatehashes = HashMap::new(); + info!("Loading extremities"); for id in dbg!(&extremities) { match db .rooms @@ -1671,6 +1714,7 @@ async fn upgrade_outlier_to_timeline_pdu( let new_room_state = if fork_states.is_empty() { return Err("State is empty.".to_owned()); } else if fork_states.iter().skip(1).all(|f| &fork_states[0] == f) { + info!("State resolution trivial"); // There was only one state, so it has to be the room's current state (because that is // always included) fork_states[0] @@ -1682,6 +1726,7 @@ async fn upgrade_outlier_to_timeline_pdu( }) .collect::>()? } else { + info!("Loading auth chains"); // We do need to force an update to this room's state update_state = true; @@ -1698,6 +1743,8 @@ async fn upgrade_outlier_to_timeline_pdu( ); } + info!("Loading fork states"); + let fork_states: Vec<_> = fork_states .into_iter() .map(|map| { @@ -1715,6 +1762,9 @@ async fn upgrade_outlier_to_timeline_pdu( }) .collect(); + info!("Resolving state"); + + let lock = db.globals.stateres_mutex.lock(); let state = match state_res::resolve( room_version_id, &fork_states, @@ -1733,6 +1783,10 @@ async fn upgrade_outlier_to_timeline_pdu( } }; + drop(lock); + + info!("State resolution done. Compressing state"); + state .into_iter() .map(|((event_type, state_key), event_id)| { @@ -1753,13 +1807,14 @@ async fn upgrade_outlier_to_timeline_pdu( // Set the new room state to the resolved state if update_state { + info!("Forcing new room state"); db.rooms .force_state(room_id, new_room_state, db) .map_err(|_| "Failed to set new room state.".to_owned())?; } - debug!("Updated resolved state"); } + info!("Appending pdu to timeline"); extremities.insert(incoming_pdu.event_id.clone()); // Now that the event has passed all auth it is added into the timeline. @@ -1780,7 +1835,7 @@ async fn upgrade_outlier_to_timeline_pdu( "Failed to add pdu to db.".to_owned() })?; - debug!("Appended incoming pdu."); + info!("Appended incoming pdu"); // Event has passed all auth/stateres checks drop(state_lock); @@ -1854,7 +1909,7 @@ pub(crate) fn fetch_and_handle_outliers<'a>( continue; } - warn!("Fetching {} over federation.", next_id); + info!("Fetching {} over federation.", next_id); match db .sending .send_federation_request( @@ -1865,7 +1920,7 @@ pub(crate) fn fetch_and_handle_outliers<'a>( .await { Ok(res) => { - warn!("Got {} over federation", next_id); + info!("Got {} over federation", next_id); let (calculated_event_id, value) = match crate::pdu::gen_event_id_canonical_json(&res.pdu, &db) { Ok(t) => t,