diff --git a/src/database.rs b/src/database.rs index b841ab97..a9cc3623 100644 --- a/src/database.rs +++ b/src/database.rs @@ -159,7 +159,7 @@ impl Database { stateid_pduid: db.open_tree("stateid_pduid")?, pduid_statehash: db.open_tree("pduid_statehash")?, roomid_statehash: db.open_tree("roomid_statehash")?, - eventid_outlierpdu: db.open_tree("eventid_outlierpdu")?, + pduid_outlierpdu: db.open_tree("pduid_outlierpdu")?, }, account_data: account_data::AccountData { roomuserdataid_accountdata: db.open_tree("roomuserdataid_accountdata")?, diff --git a/src/database/rooms.rs b/src/database/rooms.rs index a3f3aab8..d459aeec 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -27,9 +27,10 @@ use std::{ convert::{TryFrom, TryInto}, mem, sync::Arc, + time::Duration, }; -use super::admin::AdminCommand; +use super::{admin::AdminCommand, sending::Sending}; /// The unique identifier of each state group. /// @@ -67,7 +68,7 @@ pub struct Rooms { pub(super) stateid_pduid: sled::Tree, // StateId = StateHash + Short, PduId = Count (without roomid) /// Any pdu that has passed the steps up to auth with auth_events. - pub(super) eventid_outlierpdu: sled::Tree, + pub(super) pduid_outlierpdu: sled::Tree, } impl Rooms { @@ -85,13 +86,20 @@ impl Rooms { let mut pduid = room_id.as_bytes().to_vec(); pduid.push(0xff); pduid.extend_from_slice(&pduid_short?); - self.pduid_pdu.get(&pduid)?.map_or_else( - || Err(Error::bad_database("Failed to find PDU in state snapshot.")), - |b| { - serde_json::from_slice::(&b) - .map_err(|_| Error::bad_database("Invalid PDU in db.")) - }, - ) + match self.pduid_pdu.get(&pduid)? { + Some(b) => serde_json::from_slice::(&b) + .map_err(|_| Error::bad_database("Invalid PDU in db.")), + None => self + .pduid_outlierpdu + .get(pduid)? + .map(|b| { + serde_json::from_slice::(&b) + .map_err(|_| Error::bad_database("Invalid PDU in db.")) + }) + .ok_or_else(|| { + Error::bad_database("Event is not in pdu tree or outliers.") + })?, + } }) .filter_map(|r| r.ok()) .map(|pdu| { @@ -137,12 +145,20 @@ impl Rooms { Ok::<_, Error>(Some(( pdu_id.clone().into(), - serde_json::from_slice::( - &self.pduid_pdu.get(&pdu_id)?.ok_or_else(|| { - Error::bad_database("PDU in state not found in database.") - })?, - ) - .map_err(|_| Error::bad_database("Invalid PDU bytes in room state."))?, + match self.pduid_pdu.get(&pdu_id)? { + Some(b) => serde_json::from_slice::(&b) + .map_err(|_| Error::bad_database("Invalid PDU in db."))?, + None => self + .pduid_outlierpdu + .get(pdu_id)? + .map(|b| { + serde_json::from_slice::(&b) + .map_err(|_| Error::bad_database("Invalid PDU in db.")) + }) + .ok_or_else(|| { + Error::bad_database("Event is not in pdu tree or outliers.") + })??, + }, ))) }) } else { @@ -307,9 +323,12 @@ impl Rooms { .get(event_id.as_bytes())? .map_or(Ok(None), |pdu_id| { Ok(Some( - serde_json::from_slice(&self.pduid_pdu.get(pdu_id)?.ok_or_else(|| { - Error::bad_database("eventid_pduid points to nonexistent pdu.") - })?) + serde_json::from_slice(&match self.pduid_pdu.get(&pdu_id)? { + Some(b) => b, + None => self.pduid_outlierpdu.get(pdu_id)?.ok_or_else(|| { + Error::bad_database("Event is not in pdu tree or outliers.") + })?, + }) .map_err(|_| Error::bad_database("Invalid PDU in db."))?, )) }) @@ -328,13 +347,17 @@ impl Rooms { .get(event_id.as_bytes())? .map_or(Ok(None), |pdu_id| { Ok(Some( - serde_json::from_slice(&self.pduid_pdu.get(pdu_id)?.ok_or_else(|| { - Error::bad_database("eventid_pduid points to nonexistent pdu.") - })?) + serde_json::from_slice(&match self.pduid_pdu.get(&pdu_id)? { + Some(b) => b, + None => self.pduid_outlierpdu.get(pdu_id)?.ok_or_else(|| { + Error::bad_database("Event is not in pdu tree or outliers.") + })?, + }) .map_err(|_| Error::bad_database("Invalid PDU in db."))?, )) }) } + /// Returns the pdu. pub fn get_pdu_from_id(&self, pdu_id: &IVec) -> Result> { self.pduid_pdu.get(pdu_id)?.map_or(Ok(None), |pdu| { @@ -420,23 +443,27 @@ impl Rooms { /// Returns the pdu from the outlier tree. pub fn get_pdu_outlier(&self, event_id: &EventId) -> Result> { - self.eventid_outlierpdu - .get(event_id.as_bytes())? - .map_or(Ok(None), |pdu| { - Ok(Some( - serde_json::from_slice(&pdu) - .map_err(|_| Error::bad_database("Invalid PDU in db."))?, - )) + if let Some(id) = self.eventid_pduid.get(event_id.as_bytes())? { + self.pduid_outlierpdu.get(id)?.map_or(Ok(None), |pdu| { + serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db.")) }) + } else { + Ok(None) + } } /// Returns true if the event_id was previously inserted. - pub fn append_pdu_outlier(&self, event_id: &EventId, pdu: &PduEvent) -> Result { - log::info!("Number of outlier pdu's {}", self.eventid_outlierpdu.len()); + pub fn append_pdu_outlier(&self, pdu_id: &[u8], pdu: &PduEvent) -> Result { + log::info!("Number of outlier pdu's {}", self.pduid_outlierpdu.len()); + + // we need to be able to find it by event_id + self.eventid_pduid + .insert(pdu.event_id.as_bytes(), &*pdu_id)?; + let res = self - .eventid_outlierpdu + .pduid_outlierpdu .insert( - event_id.as_bytes(), + pdu_id, &*serde_json::to_string(&pdu).expect("PduEvent is always a valid String"), ) .map(|op| op.is_some())?; @@ -484,7 +511,9 @@ impl Rooms { } // We no longer keep this pdu as an outlier - self.eventid_outlierpdu.remove(pdu.event_id().as_bytes())?; + if let Some(id) = self.eventid_pduid.remove(pdu.event_id().as_bytes())? { + self.pduid_outlierpdu.remove(id)?; + } self.replace_pdu_leaves(&pdu.room_id, leaves)?; diff --git a/src/server_server.rs b/src/server_server.rs index adf3c587..ad0a1a44 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -614,7 +614,7 @@ pub async fn send_transaction_message_route<'a>( // 7. if not timeline event: stop // TODO; 8. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events // the events found in step 8 can be authed/resolved and appended to the DB - let (pdu, previous): (_, Vec>) = match validate_event( + let (pdu, previous): (Arc, Vec>) = match validate_event( &db, value, event_id.clone(), @@ -638,69 +638,75 @@ pub async fn send_transaction_message_route<'a>( None }; + let count = db.globals.next_count()?; + let mut pdu_id = pdu.room_id.as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count.to_be_bytes()); // 6. persist the event as an outlier. - db.rooms.append_pdu_outlier(pdu.event_id(), &pdu)?; + db.rooms.append_pdu_outlier(&pdu_id, &pdu)?; // Step 9. fetch missing state by calling /state_ids at backwards extremities doing all // the checks in this list starting at 1. These are not timeline events. // // Step 10. check the auth of the event passes based on the calculated state of the event - let (state_at_event, incoming_auth_events): (StateMap>, Vec>) = - match db - .sending - .send_federation_request( - &db.globals, + let (mut state_at_event, incoming_auth_events): ( + StateMap>, + Vec>, + ) = match db + .sending + .send_federation_request( + &db.globals, + server_name, + get_room_state_ids::v1::Request { + room_id: pdu.room_id(), + event_id: pdu.event_id(), + }, + ) + .await + { + Ok(res) => { + let state = fetch_events( + &db, server_name, - get_room_state_ids::v1::Request { - room_id: pdu.room_id(), - event_id: pdu.event_id(), - }, + &pub_key_map, + &res.pdu_ids, + &mut auth_cache, ) - .await - { - Ok(res) => { - let state = fetch_events( + .await?; + // Sanity check: there are no conflicting events in the state we received + let mut seen = BTreeSet::new(); + for ev in &state { + // If the key is already present + if !seen.insert((&ev.kind, &ev.state_key)) { + todo!("Server sent us an invalid state") + } + } + + let state = state + .into_iter() + .map(|pdu| ((pdu.kind.clone(), pdu.state_key.clone()), pdu)) + .collect(); + + ( + state, + fetch_events( &db, server_name, &pub_key_map, - &res.pdu_ids, + &res.auth_chain_ids, &mut auth_cache, ) - .await?; - // Sanity check: there are no conflicting events in the state we received - let mut seen = BTreeSet::new(); - for ev in &state { - // If the key is already present - if !seen.insert((&ev.kind, &ev.state_key)) { - todo!("Server sent us an invalid state") - } - } - - let state = state - .into_iter() - .map(|pdu| ((pdu.kind.clone(), pdu.state_key.clone()), pdu)) - .collect(); - - ( - state, - fetch_events( - &db, - server_name, - &pub_key_map, - &res.auth_chain_ids, - &mut auth_cache, - ) - .await?, - ) - } - Err(_) => { - resolved_map.insert( - pdu.event_id().clone(), - Err("Fetching state for event failed".into()), - ); - continue; - } - }; + .await?, + ) + } + Err(_) => { + resolved_map.insert( + pdu.event_id().clone(), + Err("Fetching state for event failed".into()), + ); + continue; + } + }; // 10. This is the actual auth check for state at the event if !state_res::event_auth::auth_check( @@ -750,12 +756,25 @@ pub async fn send_transaction_message_route<'a>( // // calculate_forward_extremities takes care of adding the current state if not already in the state sets // it also calculates the new pdu leaves for the `roomid_pduleaves` DB Tree. - let (mut fork_states, extremities) = match calculate_forward_extremities( + let extremities = match calculate_forward_extremities(&db, &pdu).await { + Ok(fork_ids) => fork_ids, + Err(_) => { + resolved_map.insert(event_id, Err("Failed to gather forward extremities".into())); + continue; + } + }; + + // Now that the event has passed all auth it is added into the timeline, we do have to + // find the leaves otherwise we would do this sooner + append_incoming_pdu(&db, &pdu, &extremities, &state_at_event)?; + + let mut fork_states = match build_forward_extremity_snapshots( &db, - &pdu, + pdu.room_id(), server_name, - &pub_key_map, current_state, + &extremities, + &pub_key_map, &mut auth_cache, ) .await @@ -767,6 +786,9 @@ pub async fn send_transaction_message_route<'a>( } }; + // Make this the state after (since we appended_incoming_pdu this should agree with our servers + // current state). + state_at_event.insert((pdu.kind(), pdu.state_key()), pdu.clone()); // add the incoming events to the mix of state snapshots // Since we are using a BTreeSet (yea this may be overkill) we guarantee unique state sets fork_states.insert(state_at_event.clone()); @@ -840,7 +862,7 @@ pub async fn send_transaction_message_route<'a>( ); let res = match state_res::StateResolution::resolve( - &pdu.room_id, + pdu.room_id(), &RoomVersionId::Version6, &fork_states .into_iter() @@ -865,6 +887,7 @@ pub async fn send_transaction_message_route<'a>( continue 'main_pdu_loop; } }; + let mut resolved = BTreeMap::new(); for (k, id) in res { // We should know of the event but just incase @@ -890,10 +913,9 @@ pub async fn send_transaction_message_route<'a>( }; // Add the event to the DB and update the forward extremities (via roomid_pduleaves). - append_incoming_pdu( + update_resolved_state( &db, - &pdu, - &extremities, + pdu.room_id(), if update_state { Some(state_at_forks) } else { @@ -905,7 +927,10 @@ pub async fn send_transaction_message_route<'a>( resolved_map.insert(pdu.event_id().clone(), Ok(())); } - Ok(send_transaction_message::v1::Response { pdus: resolved_map }.into()) + Ok(send_transaction_message::v1::Response { + pdus: dbg!(resolved_map), + } + .into()) } /// An async function that can recursively calls itself. @@ -1036,13 +1061,14 @@ async fn fetch_check_auth_events( Ok(()) } -/// Find the event and auth it. +/// Find the event and auth it. Once the event is validated (steps 1 - 8) +/// it is appended to the outliers Tree. /// /// 1. Look in the main timeline (pduid_pdu tree) /// 2. Look at outlier pdu tree /// 3. Ask origin server over federation /// 4. TODO: Ask other servers over federation? -async fn fetch_events( +pub(crate) async fn fetch_events( db: &Database, origin: &ServerName, key_map: &PublicKeyMap, @@ -1071,6 +1097,13 @@ async fn fetch_events( .await .map_err(|_| Error::Conflict("Authentication of event failed"))?; + // create the pduid for this event but stick it in the outliers DB + let count = db.globals.next_count()?; + let mut pdu_id = pdu.room_id.as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count.to_be_bytes()); + + db.rooms.append_pdu_outlier(&pdu_id, &pdu)?; pdu } Err(_) => return Err(Error::BadServerResponse("Failed to fetch event")), @@ -1084,7 +1117,7 @@ async fn fetch_events( /// Search the DB for the signing keys of the given server, if we don't have them /// fetch them from the server and save to our DB. -async fn fetch_signing_keys( +pub(crate) async fn fetch_signing_keys( db: &Database, origin: &ServerName, ) -> Result> { @@ -1108,26 +1141,28 @@ async fn fetch_signing_keys( /// where one of the inputs was a previously trusted set of state, don't just trust a set of state we got from a remote). /// /// The state snapshot of the incoming event __needs__ to be added to the resulting list. -async fn calculate_forward_extremities( +pub(crate) async fn calculate_forward_extremities( db: &Database, pdu: &PduEvent, - origin: &ServerName, - pub_key_map: &PublicKeyMap, - current_state: BTreeMap<(EventType, Option), Arc>, - auth_cache: &mut EventMap>, -) -> Result<(BTreeSet>>, Vec)> { +) -> Result> { let mut current_leaves = db.rooms.get_pdu_leaves(pdu.room_id())?; let mut is_incoming_leaf = true; // Make sure the incoming event is not already a forward extremity // FIXME: I think this could happen if different servers send us the same event?? - // + if current_leaves.contains(pdu.event_id()) { + is_incoming_leaf = false; + // Not sure what to do here + } + // If the incoming event is already referenced by an existing event // then do nothing - it's not a candidate to be a new extremity if // it has been referenced. - if current_leaves.contains(pdu.event_id()) || db.rooms.get_pdu_id(pdu.event_id())?.is_some() { - is_incoming_leaf = false; - // Not sure what to do here + // + // We first check if know of the event and then don't include it as a forward + // extremity if it is a timeline event + if db.rooms.get_pdu_id(pdu.event_id())?.is_some() { + is_incoming_leaf = db.rooms.get_pdu_outlier(pdu.event_id())?.is_some(); } // TODO: @@ -1144,11 +1179,34 @@ async fn calculate_forward_extremities( } } - let current_hash = db.rooms.current_state_hash(pdu.room_id())?; + // Add the incoming event only if it is a leaf, we do this after fetching all the + // state since we know we have already fetched the state of the incoming event so lets + // not do it again! + if is_incoming_leaf { + current_leaves.push(pdu.event_id().clone()); + } + + Ok(current_leaves) +} + +/// This should always be called after the incoming event has been appended to the DB. +/// +/// This guarentees that the incoming event will be in the state sets (at least our servers +/// and the sending server). +pub(crate) async fn build_forward_extremity_snapshots( + db: &Database, + room_id: &RoomId, + origin: &ServerName, + current_state: StateMap>, + current_leaves: &[EventId], + pub_key_map: &PublicKeyMap, + auth_cache: &mut EventMap>, +) -> Result>>> { + let current_hash = db.rooms.current_state_hash(room_id)?; let mut includes_current_state = false; let mut fork_states = BTreeSet::new(); - for id in ¤t_leaves { + for id in current_leaves { if let Some(id) = db.rooms.get_pdu_id(id)? { let state_hash = db .rooms @@ -1158,14 +1216,21 @@ async fn calculate_forward_extremities( if current_hash.as_ref() == Some(&state_hash) { includes_current_state = true; } - let state = db + + let mut state_before = db .rooms - .state_full(&pdu.room_id, &state_hash)? + .state_full(room_id, &state_hash)? .into_iter() .map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v))) - .collect(); + .collect::>(); - fork_states.insert(state); + // Now it's the state after + if let Some(pdu) = db.rooms.get_pdu_from_id(&id)? { + let key = (pdu.kind.clone(), pdu.state_key()); + state_before.insert(key, Arc::new(pdu)); + } + + fork_states.insert(state_before); } else { let res = db .sending @@ -1173,7 +1238,7 @@ async fn calculate_forward_extremities( &db.globals, origin, get_room_state_ids::v1::Request { - room_id: pdu.room_id(), + room_id, event_id: id, }, ) @@ -1181,41 +1246,38 @@ async fn calculate_forward_extremities( // TODO: This only adds events to the auth_cache, there is for sure a better way to // do this... - fetch_events(&db, origin, &pub_key_map, &res.auth_chain_ids, auth_cache).await?; + fetch_events(&db, origin, pub_key_map, &res.auth_chain_ids, auth_cache).await?; - let state = fetch_events(&db, origin, &pub_key_map, &res.pdu_ids, auth_cache) + let mut state_before = fetch_events(&db, origin, pub_key_map, &res.pdu_ids, auth_cache) .await? .into_iter() .map(|pdu| ((pdu.kind.clone(), pdu.state_key.clone()), pdu)) - .collect(); + .collect::>(); - fork_states.insert(state); + if let Some(pdu) = fetch_events(db, origin, pub_key_map, &[id.clone()], auth_cache) + .await? + .pop() + { + let key = (pdu.kind.clone(), pdu.state_key()); + state_before.insert(key, pdu); + } + + // Now it's the state after + fork_states.insert(state_before); } } - // Add the incoming event only if it is a leaf, we do this after fetching all the - // state since we know we have already fetched the state of the incoming event so lets - // not do it again! - if is_incoming_leaf { - current_leaves.push(pdu.event_id().clone()); - } - // This guarantees that our current room state is included if !includes_current_state && current_hash.is_some() { fork_states.insert(current_state); } - Ok((fork_states, current_leaves)) + Ok(fork_states) } -/// Update the room state to be the resolved state and add the fully auth'ed event -/// to the DB. -/// -/// TODO: Since all these events passed state resolution can we trust them to add -fn append_incoming_pdu( +pub(crate) fn update_resolved_state( db: &Database, - pdu: &PduEvent, - new_room_leaves: &[EventId], + room_id: &RoomId, state: Option>>, ) -> Result<()> { // Update the state of the room if needed @@ -1236,44 +1298,50 @@ fn append_incoming_pdu( ); } None => { - let count = db.globals.next_count()?; - let mut pdu_id = pdu.room_id.as_bytes().to_vec(); - pdu_id.push(0xff); - pdu_id.extend_from_slice(&count.to_be_bytes()); - - // TODO: can we use are current state if we just add this event to the end of our - // pduid_pdu tree?? - let statehashid = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?; - - db.rooms.append_pdu( - &*pdu, - utils::to_canonical_object(&*pdu).expect("Pdu is valid canonical object"), - count, - pdu_id.clone().into(), - &new_room_leaves, - &db, - )?; - // TODO: is this ok... - db.rooms.set_room_state(&pdu.room_id, &statehashid)?; - new_state.insert( - ( - ev_type, - state_k.ok_or_else(|| { - Error::Conflict("State contained non state event") - })?, - ), - pdu_id.to_vec(), - ); + error!("We didn't append an event as an outlier\n{:?}", pdu); } } } - info!("Force update of state for {:?}", pdu); - - db.rooms - .force_state(pdu.room_id(), new_state, &db.globals)?; + db.rooms.force_state(room_id, new_state, &db.globals)?; } + Ok(()) +} + +/// Append the incoming event setting the state snapshot to the state from the +/// server that sent the event. +pub(crate) fn append_incoming_pdu( + db: &Database, + pdu: &PduEvent, + new_room_leaves: &[EventId], + state: &StateMap>, +) -> Result<()> { + // Update the state of the room if needed + // We can tell if we need to do this based on wether state resolution took place or not + let mut new_state = HashMap::new(); + for ((ev_type, state_k), pdu) in state { + match db.rooms.get_pdu_id(pdu.event_id())? { + Some(pduid) => { + new_state.insert( + ( + ev_type.clone(), + state_k + .clone() + .ok_or_else(|| Error::Conflict("State contained non state event"))?, + ), + pduid.to_vec(), + ); + } + None => { + error!("We didn't append an event as an outlier\n{:?}", pdu); + } + } + } + + db.rooms + .force_state(pdu.room_id(), new_state, &db.globals)?; + let count = db.globals.next_count()?; let mut pdu_id = pdu.room_id.as_bytes().to_vec(); pdu_id.push(0xff); @@ -1281,7 +1349,7 @@ fn append_incoming_pdu( // We append to state before appending the pdu, so we don't have a moment in time with the // pdu without it's state. This is okay because append_pdu can't fail. - let statehashid = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?; + let state_hash = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?; db.rooms.append_pdu( pdu, @@ -1292,9 +1360,7 @@ fn append_incoming_pdu( &db, )?; - // We set the room state after inserting the pdu, so that we never have a moment in time - // where events in the current room state do not exist - db.rooms.set_room_state(&pdu.room_id, &statehashid)?; + db.rooms.set_room_state(pdu.room_id(), &state_hash)?; for appservice in db.appservice.iter_all().filter_map(|r| r.ok()) { db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?;