From 44425a903a27a0ca0e1f9ff7bc65ea1b13ded54a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Tue, 16 Mar 2021 18:00:26 +0100 Subject: [PATCH] fix: multiple federation/pusher fixes --- src/client_server/push.rs | 8 +-- src/client_server/sync.rs | 9 +--- src/database/pusher.rs | 22 +++----- src/database/rooms.rs | 22 ++++---- src/database/sending.rs | 24 ++++----- src/error.rs | 60 --------------------- src/server_server.rs | 107 +++++++++++++++++--------------------- 7 files changed, 85 insertions(+), 167 deletions(-) diff --git a/src/client_server/push.rs b/src/client_server/push.rs index 4dc97695..a7ddbb60 100644 --- a/src/client_server/push.rs +++ b/src/client_server/push.rs @@ -686,10 +686,10 @@ pub async fn get_pushers_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { - let sender = body.sender_user.as_ref().expect("authenticated endpoint"); + let sender_user = body.sender_user.as_ref().expect("user is authenticated"); Ok(get_pushers::Response { - pushers: db.pusher.get_pusher(sender)?, + pushers: db.pusher.get_pusher(sender_user)?, } .into()) } @@ -703,10 +703,10 @@ pub async fn set_pushers_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { - let sender = body.sender_user.as_ref().expect("authenticated endpoint"); + let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let pusher = body.pusher.clone(); - db.pusher.set_pusher(sender, pusher)?; + db.pusher.set_pusher(sender_user, pusher)?; db.flush().await?; diff --git a/src/client_server/sync.rs b/src/client_server/sync.rs index 0fc98ec5..6551b2a2 100644 --- a/src/client_server/sync.rs +++ b/src/client_server/sync.rs @@ -315,7 +315,7 @@ pub async fn sync_events_route( (None, None, Vec::new()) }; - let state_events = if dbg!(joined_since_last_sync) { + let state_events = if joined_since_last_sync { current_state .into_iter() .map(|(_, pdu)| pdu.to_sync_state_event()) @@ -703,12 +703,7 @@ pub async fn sync_events_route( if duration.as_secs() > 30 { duration = Duration::from_secs(30); } - let delay = tokio::time::sleep(duration); - tokio::pin!(delay); - tokio::select! { - _ = &mut delay, if delay.is_elapsed() => {} - _ = watcher => {} - } + let _ = tokio::time::timeout(duration, watcher).await; } Ok(response.into()) diff --git a/src/database/pusher.rs b/src/database/pusher.rs index 2bf6bf75..59ccbef5 100644 --- a/src/database/pusher.rs +++ b/src/database/pusher.rs @@ -35,8 +35,6 @@ impl PushData { } pub fn set_pusher(&self, sender: &UserId, pusher: Pusher) -> Result<()> { - println!("CCCCCCCCCCCCCCCCCCCCCc"); - dbg!(&pusher); let mut key = sender.as_bytes().to_vec(); key.push(0xff); key.extend_from_slice(pusher.pushkey.as_bytes()); @@ -51,7 +49,7 @@ impl PushData { } self.senderkey_pusher.insert( - dbg!(key), + key, &*serde_json::to_string(&pusher).expect("Pusher is valid JSON string"), )?; @@ -63,12 +61,10 @@ impl PushData { prefix.push(0xff); self.senderkey_pusher - .scan_prefix(dbg!(prefix)) + .scan_prefix(prefix) .values() .map(|push| { - println!("DDDDDDDDDDDDDDDDDDDDDDDDDD"); - let push = - dbg!(push).map_err(|_| Error::bad_database("Invalid push bytes in db."))?; + let push = push.map_err(|_| Error::bad_database("Invalid push bytes in db."))?; Ok(serde_json::from_slice(&*push) .map_err(|_| Error::bad_database("Invalid Pusher in db."))?) }) @@ -100,10 +96,7 @@ where //*reqwest_request.timeout_mut() = Some(Duration::from_secs(5)); let url = reqwest_request.url().clone(); - let reqwest_response = globals - .reqwest_client() - .execute(dbg!(reqwest_request)) - .await; + let reqwest_response = globals.reqwest_client().execute(reqwest_request).await; // Because reqwest::Response -> http::Response is complicated: match reqwest_response { @@ -182,7 +175,7 @@ pub async fn send_push_notice( continue; } - match dbg!(rule.rule_id.as_str()) { + match rule.rule_id.as_str() { ".m.rule.master" => {} ".m.rule.suppress_notices" => { if pdu.kind == EventType::RoomMessage @@ -454,8 +447,7 @@ async fn send_notice( db: &Database, name: &str, ) -> Result<()> { - println!("BBBBBBBBBBBBBBBr"); - let (http, _emails): (Vec<&Pusher>, _) = dbg!(pushers) + let (http, _emails): (Vec<&Pusher>, _) = pushers .iter() .partition(|pusher| pusher.kind == Some(PusherKind::Http)); @@ -463,7 +455,7 @@ async fn send_notice( // Two problems with this // 1. if "event_id_only" is the only format kind it seems we should never add more info // 2. can pusher/devices have conflicting formats - for pusher in dbg!(http) { + for pusher in http { let event_id_only = pusher.data.format == Some(PushFormat::EventIdOnly); let url = if let Some(url) = pusher.data.url.as_ref() { url diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 648f0803..c908d517 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -3,7 +3,7 @@ mod edus; pub use edus::RoomEdus; use crate::{pdu::PduBuilder, utils, Database, Error, PduEvent, Result}; -use log::{error, info, warn}; +use log::{debug, error, info, warn}; use regex::Regex; use ring::digest; use ruma::{ @@ -67,7 +67,7 @@ pub struct Rooms { /// StateKey = EventType + StateKey, Short = Count pub(super) statekey_short: sled::Tree, /// StateId = StateHash + Short, PduId = Count (without roomid) - pub(super) stateid_pduid: sled::Tree, + pub(super) stateid_eventid: sled::Tree, /// RoomId + EventId -> outlier PDU. /// Any pdu that has passed the steps 1-8 in the incoming event /federation/send/txn. @@ -138,7 +138,7 @@ impl Rooms { key.push(0xff); key.extend_from_slice(&state_key.as_bytes()); - info!("Looking for {} {:?}", event_type, state_key); + debug!("Looking for {} {:?}", event_type, state_key); let short = self.statekey_short.get(&key)?; @@ -147,11 +147,11 @@ impl Rooms { stateid.push(0xff); stateid.extend_from_slice(&short); - info!("trying to find pduid/eventid. short: {:?}", stateid); + debug!("trying to find pduid/eventid. short: {:?}", stateid); self.stateid_pduid .get(&stateid)? .map_or(Ok(None), |short_id| { - info!("found in stateid_pduid"); + debug!("found in stateid_pduid"); let mut long_id = room_id.as_bytes().to_vec(); long_id.push(0xff); long_id.extend_from_slice(&short_id); @@ -163,7 +163,7 @@ impl Rooms { .map_err(|_| Error::bad_database("Invalid PDU in db."))?, ), None => { - info!("looking in outliers"); + debug!("looking in outliers"); ( short_id.clone().into(), self.eventid_outlierpdu @@ -180,7 +180,7 @@ impl Rooms { })) }) } else { - info!("short id not found"); + warn!("short id not found"); Ok(None) } } @@ -288,7 +288,7 @@ impl Rooms { let mut state_id = prefix.clone(); state_id.extend_from_slice(&short.to_be_bytes()); - info!("inserting {:?} into {:?}", short_id, state_id); + debug!("inserting {:?} into {:?}", short_id, state_id); self.stateid_pduid.insert(state_id, short_id)?; } @@ -574,7 +574,7 @@ impl Rooms { self.pduid_pdu.insert( &pdu_id, - &*serde_json::to_string(&pdu_json) + &*serde_json::to_string(dbg!(&pdu_json)) .expect("CanonicalJsonObject is always a valid String"), )?; @@ -889,12 +889,12 @@ impl Rooms { content.clone(), prev_event, None, // TODO: third party invite - dbg!(&auth_events + &auth_events .iter() .map(|((ty, key), pdu)| { Ok(((ty.clone(), key.clone()), Arc::new(pdu.clone()))) }) - .collect::>>()?), + .collect::>>()?, ) .map_err(|e| { log::error!("{}", e); diff --git a/src/database/sending.rs b/src/database/sending.rs index fc1d27dd..b35f7c57 100644 --- a/src/database/sending.rs +++ b/src/database/sending.rs @@ -10,7 +10,7 @@ use crate::{ appservice_server, database::pusher, server_server, utils, Database, Error, PduEvent, Result, }; use federation::transactions::send_transaction_message; -use log::{info, warn}; +use log::{debug, error, info, warn}; use ring::digest; use rocket::futures::stream::{FuturesUnordered, StreamExt}; use ruma::{ @@ -308,8 +308,6 @@ impl Sending { key.extend_from_slice(pdu_id); self.servernamepduids.insert(key, b"")?; - println!("AAAA"); - Ok(()) } @@ -348,7 +346,7 @@ impl Sending { pdu_ids: Vec, db: &Database, ) -> std::result::Result { - match dbg!(&kind) { + match &kind { OutgoingKind::Appservice(server) => { let pdu_jsons = pdu_ids .iter() @@ -414,21 +412,23 @@ impl Sending { .filter_map(|r| r.ok()) .collect::>(); - for pdu in dbg!(&pdus) { + for pdu in pdus { // Redacted events are not notification targets (we don't send push for them) if pdu.unsigned.get("redacted_because").is_some() { continue; } - for user in db.rooms.room_members(&pdu.room_id) { - let user = user.map_err(|e| (OutgoingKind::Push(id.clone()), e))?; - + for user in db.users.iter().filter_map(|r| r.ok()).filter(|user_id| { + db.rooms.is_joined(&user_id, &pdu.room_id).unwrap_or(false) + }) { // Don't notify the user of their own events if user == pdu.sender { continue; } - let pushers = dbg!(db.pusher.get_pusher(&user)) + let pushers = db + .pusher + .get_pusher(&user) .map_err(|e| (OutgoingKind::Push(id.clone()), e))?; let rules_for_user = db @@ -467,7 +467,7 @@ impl Sending { unread, &pushers, rules_for_user, - pdu, + &pdu, db, ) .await @@ -510,7 +510,7 @@ impl Sending { let permit = db.sending.maximum_requests.acquire().await; - info!("sending pdus to {}: {:#?}", server, pdu_jsons); + error!("sending pdus to {}: {:#?}", server, pdu_jsons); let response = server_server::send_request( &db.globals, &*server, @@ -527,7 +527,7 @@ impl Sending { ) .await .map(|response| { - info!("server response: {:?}", response); + error!("server response: {:?}", response); kind.clone() }) .map_err(|e| (kind, e)); diff --git a/src/error.rs b/src/error.rs index d8f10f4e..8a64e632 100644 --- a/src/error.rs +++ b/src/error.rs @@ -111,63 +111,3 @@ where .respond_to(r) } } - -pub struct ConduitLogger { - pub db: Database, - pub last_logs: RwLock>, -} - -impl log::Log for ConduitLogger { - fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool { - true - } - - fn log(&self, record: &log::Record<'_>) { - let output = format!("{} - {}", record.level(), record.args()); - - let match_mod_path = - |path: &str| path.starts_with("conduit::") || path.starts_with("state"); - - if self.enabled(record.metadata()) - && (record.module_path().map_or(false, match_mod_path) - || record - .module_path() - .map_or(true, |path| !path.starts_with("rocket::")) // Rockets logs are annoying - && record.metadata().level() <= log::Level::Warn) - { - let first_line = output - .lines() - .next() - .expect("lines always returns one item"); - - eprintln!("{}", output); - - let mute_duration = match record.metadata().level() { - log::Level::Error => Duration::from_secs(60 * 5), // 5 minutes - log::Level::Warn => Duration::from_secs(60 * 60 * 24), // A day - _ => Duration::from_secs(60 * 60 * 24 * 7), // A week - }; - - if self - .last_logs - .read() - .unwrap() - .get(first_line) - .map_or(false, |i| i.elapsed() < mute_duration) - // Don't post this log again for some time - { - return; - } - - if let Ok(mut_last_logs) = &mut self.last_logs.try_write() { - mut_last_logs.insert(first_line.to_owned(), Instant::now()); - } - - self.db.admin.send(AdminCommand::SendMessage( - message::MessageEventContent::notice_plain(output), - )); - } - } - - fn flush(&self) {} -} diff --git a/src/server_server.rs b/src/server_server.rs index 02610e8a..919d12f8 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -1,7 +1,7 @@ use crate::{client_server, utils, ConduitResult, Database, Error, PduEvent, Result, Ruma}; use get_profile_information::v1::ProfileField; use http::header::{HeaderValue, AUTHORIZATION, HOST}; -use log::{error, info, warn}; +use log::{debug, error, info, warn}; use regex::Regex; use rocket::{get, post, put, response::content::Json, State}; use ruma::{ @@ -27,7 +27,7 @@ use ruma::{ use state_res::{Event, EventMap, StateMap}; use std::{ collections::{BTreeMap, BTreeSet, HashMap}, - convert::TryFrom, + convert::{TryFrom, TryInto}, fmt::Debug, future::Future, net::{IpAddr, SocketAddr}, @@ -601,7 +601,7 @@ pub async fn send_transaction_message_route<'a>( // discard the event whereas the Client Server API's /send/{eventType} endpoint // would return a M_BAD_JSON error. 'main_pdu_loop: for (event_id, _room_id, value) in pdus_to_resolve { - info!("Working on incoming pdu: {:?}", value); + debug!("Working on incoming pdu: {:?}", value); let server_name = &body.body.origin; let mut pub_key_map = BTreeMap::new(); @@ -636,11 +636,11 @@ pub async fn send_transaction_message_route<'a>( continue; } }; - info!("Validated event."); + debug!("Validated event."); // 6. persist the event as an outlier. db.rooms.add_pdu_outlier(&pdu)?; - info!("Added pdu as outlier."); + debug!("Added pdu as outlier."); // Step 9. fetch missing state by calling /state_ids at backwards extremities doing all // the checks in this list starting at 1. These are not timeline events. @@ -649,7 +649,7 @@ pub async fn send_transaction_message_route<'a>( // // TODO: if we know the prev_events of the incoming event we can avoid the request and build // the state from a known point and resolve if > 1 prev_event - info!("Requesting state at event."); + debug!("Requesting state at event."); let (state_at_event, incoming_auth_events): (StateMap>, Vec>) = match db .sending @@ -664,7 +664,7 @@ pub async fn send_transaction_message_route<'a>( .await { Ok(res) => { - info!("Fetching state events at event."); + debug!("Fetching state events at event."); let state = match fetch_events( &db, server_name, @@ -706,7 +706,7 @@ pub async fn send_transaction_message_route<'a>( Err(_) => continue, }; - info!("Fetching auth events of state events at event."); + debug!("Fetching auth events of state events at event."); (state, incoming_auth_events) } Err(_) => { @@ -735,7 +735,7 @@ pub async fn send_transaction_message_route<'a>( ); continue; } - info!("Auth check succeeded."); + debug!("Auth check succeeded."); // End of step 10. // 12. check if the event passes auth based on the "current state" of the room, if not "soft fail" it @@ -746,8 +746,6 @@ pub async fn send_transaction_message_route<'a>( .map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v))) .collect(); - info!("current state: {:#?}", current_state); - if !state_res::event_auth::auth_check( &RoomVersionId::Version6, &pdu, @@ -764,7 +762,7 @@ pub async fn send_transaction_message_route<'a>( ); continue; }; - info!("Auth check with current state succeeded."); + debug!("Auth check with current state succeeded."); // Step 11. Ensure that the state is derived from the previous current state (i.e. we calculated by doing state res // where one of the inputs was a previously trusted set of state, don't just trust a set of state we got from a remote) @@ -773,7 +771,7 @@ pub async fn send_transaction_message_route<'a>( // it also calculates the new pdu leaves for the `roomid_pduleaves` DB Tree. let extremities = match calculate_forward_extremities(&db, &pdu).await { Ok(fork_ids) => { - info!("Calculated new forward extremities: {:?}", fork_ids); + debug!("Calculated new forward extremities: {:?}", fork_ids); fork_ids } Err(_) => { @@ -828,20 +826,21 @@ pub async fn send_transaction_message_route<'a>( for map in &fork_states { let mut state_auth = vec![]; for auth_id in map.values().flat_map(|pdu| &pdu.auth_events) { - let event = match auth_cache.get(auth_id) { - Some(aev) => aev.clone(), - // The only events that haven't been added to the auth cache are - // events we have knowledge of previously - None => { - error!("Event was not present in auth_cache {}", auth_id); - resolved_map.insert( - event_id.clone(), - Err("Event was not present in auth cache".into()), - ); - continue 'main_pdu_loop; + match fetch_events( + &db, + server_name, + &mut pub_key_map, + &[auth_id.clone()], + &mut auth_cache, + ) + .await + { + // This should always contain exactly one element when Ok + Ok(events) => state_auth.push(events[0].clone()), + Err(e) => { + debug!("Event was not present: {}", e); } - }; - state_auth.push(event); + } } auth_events.push(state_auth); } @@ -864,7 +863,7 @@ pub async fn send_transaction_message_route<'a>( .map(|(_, pdu)| (pdu.event_id().clone(), pdu)), ); - info!("auth events: {:?}", auth_cache); + debug!("auth events: {:?}", auth_cache); let res = match state_res::StateResolution::resolve( pdu.room_id(), @@ -916,7 +915,7 @@ pub async fn send_transaction_message_route<'a>( // We use the `state_at_event` instead of `state_after` so we accurately // represent the state for this event. append_incoming_pdu(&db, &pdu, &extremities, &state_at_event)?; - info!("Appended incoming pdu."); + debug!("Appended incoming pdu."); // Set the new room state to the resolved state update_resolved_state( @@ -928,7 +927,7 @@ pub async fn send_transaction_message_route<'a>( None }, )?; - info!("Updated resolved state"); + debug!("Updated resolved state"); // Event has passed all auth/stateres checks } @@ -972,7 +971,7 @@ fn validate_event<'a>( } .keys() { - info!("Fetching signing keys for {}", signature_server); + debug!("Fetching signing keys for {}", signature_server); let keys = match fetch_signing_keys( &db, &Box::::try_from(&**signature_server).map_err(|_| { @@ -981,10 +980,7 @@ fn validate_event<'a>( ) .await { - Ok(keys) => { - info!("Keys: {:?}", keys); - keys - } + Ok(keys) => keys, Err(_) => { return Err( "Signature verification failed: Could not fetch signing key.".to_string(), @@ -993,8 +989,6 @@ fn validate_event<'a>( }; pub_key_map.insert(signature_server.clone(), keys); - - info!("Fetched signing keys"); } let mut val = @@ -1026,7 +1020,7 @@ fn validate_event<'a>( ) .map_err(|_| "Event is not a valid PDU".to_string())?; - info!("Fetching auth events."); + debug!("Fetching auth events."); fetch_check_auth_events(db, origin, pub_key_map, &pdu.auth_events, auth_cache) .await .map_err(|e| e.to_string())?; @@ -1035,7 +1029,7 @@ fn validate_event<'a>( /* // 8. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events - info!("Fetching prev events."); + debug!("Fetching prev events."); let previous = fetch_events(&db, origin, pub_key_map, &pdu.prev_events, auth_cache) .await .map_err(|e| e.to_string())?; @@ -1049,7 +1043,7 @@ fn validate_event<'a>( }; // Check that the event passes auth based on the auth_events - info!("Checking auth."); + debug!("Checking auth."); let is_authed = state_res::event_auth::auth_check( &RoomVersionId::Version6, &pdu, @@ -1073,7 +1067,7 @@ fn validate_event<'a>( return Err("Event has failed auth check with auth events".to_string()); } - info!("Validation successful."); + debug!("Validation successful."); Ok((pdu, previous_create)) }) } @@ -1111,20 +1105,19 @@ pub(crate) async fn fetch_events( ) -> Result>> { let mut pdus = vec![]; for id in events { - info!("Fetching event: {}", id); let pdu = match auth_cache.get(id) { Some(pdu) => { - info!("Event found in cache"); + debug!("Event found in cache"); pdu.clone() } // `get_pdu` checks the outliers tree for us None => match db.rooms.get_pdu(&id)? { Some(pdu) => { - info!("Event found in outliers"); + debug!("Event found in outliers"); Arc::new(pdu) } None => { - info!("Fetching event over federation"); + debug!("Fetching event over federation"); match db .sending .send_federation_request( @@ -1135,7 +1128,7 @@ pub(crate) async fn fetch_events( .await { Ok(res) => { - info!("Got event over federation: {:?}", res); + debug!("Got event over federation: {:?}", res); let (event_id, value) = crate::pdu::gen_event_id_canonical_json(&res.pdu); let (pdu, _) = @@ -1146,7 +1139,7 @@ pub(crate) async fn fetch_events( Error::Conflict("Authentication of event failed") })?; - info!("Added fetched pdu as outlier."); + debug!("Added fetched pdu as outlier."); db.rooms.add_pdu_outlier(&pdu)?; pdu } @@ -1171,15 +1164,11 @@ pub(crate) async fn fetch_signing_keys( let mut result = BTreeMap::new(); match db.globals.signing_keys_for(origin)? { - keys if !keys.is_empty() => { - info!("we knew the signing keys already: {:?}", keys); - Ok(keys - .into_iter() - .map(|(k, v)| (k.to_string(), v.key)) - .collect()) - } + keys if !keys.is_empty() => Ok(keys + .into_iter() + .map(|(k, v)| (k.to_string(), v.key)) + .collect()), _ => { - info!("Asking {} for it's signing key", origin); match db .sending .send_federation_request(&db.globals, origin, get_server_keys::v2::Request::new()) @@ -1204,7 +1193,7 @@ pub(crate) async fn fetch_signing_keys( } _ => { for server in db.globals.trusted_servers() { - info!("Asking {} for {}'s signing key", server, origin); + debug!("Asking {} for {}'s signing key", server, origin); if let Ok(keys) = db .sending .send_federation_request( @@ -1219,7 +1208,7 @@ pub(crate) async fn fetch_signing_keys( ) .await { - info!("Got signing keys: {:?}", keys); + debug!("Got signing keys: {:?}", keys); for k in keys.server_keys.into_iter() { db.globals.add_signing_key(origin, &k)?; result.extend( @@ -1364,7 +1353,6 @@ pub(crate) async fn build_forward_extremity_snapshots( fork_states.insert(current_state); } - info!("Fork states: {:?}", fork_states); Ok(fork_states) } @@ -1548,7 +1536,10 @@ pub fn get_missing_events_route<'a>( ) .map_err(|_| Error::bad_database("Invalid prev_events content in pdu in db."))?, ); - events.push(serde_json::from_value(pdu).expect("Raw<..> is always valid")); + events.push(PduEvent::convert_to_outgoing_federation_event( + serde_json::from_value(pdu) + .map_err(|_| Error::bad_database("Invalid pdu in database."))?, + )); } i += 1; }