From 100307c9366383d8c612a464dfcee542e97f9d44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Wed, 17 Mar 2021 22:30:25 +0100 Subject: [PATCH] improvement: optimize state storage --- src/client_server/membership.rs | 12 +- src/client_server/profile.rs | 2 - src/client_server/room.rs | 4 +- src/client_server/state.rs | 12 +- src/client_server/sync.rs | 45 +-- src/database.rs | 12 +- src/database/pusher.rs | 3 +- src/database/rooms.rs | 469 +++++++++++++++++++------------- src/server_server.rs | 36 ++- 9 files changed, 341 insertions(+), 254 deletions(-) diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs index d63fa029..d571eaae 100644 --- a/src/client_server/membership.rs +++ b/src/client_server/membership.rs @@ -106,7 +106,6 @@ pub async fn leave_room_route( ErrorKind::BadState, "Cannot leave a room you are not a member of.", ))? - .1 .content, ) .expect("from_value::> can never fail") @@ -195,7 +194,6 @@ pub async fn kick_user_route( ErrorKind::BadState, "Cannot kick member that's not in the room.", ))? - .1 .content, ) .expect("Raw::from_value always works") @@ -251,7 +249,7 @@ pub async fn ban_user_route( is_direct: None, third_party_invite: None, }), - |(_, event)| { + |event| { let mut event = serde_json::from_value::>(event.content) .expect("Raw::from_value always works") @@ -302,7 +300,6 @@ pub async fn unban_user_route( ErrorKind::BadState, "Cannot unban a user who is not banned.", ))? - .1 .content, ) .expect("from_value::> can never fail") @@ -617,10 +614,7 @@ async fn join_room_by_id_helper( &db.globals, )?; } - let mut long_id = room_id.as_bytes().to_vec(); - long_id.push(0xff); - long_id.extend_from_slice(id.as_bytes()); - state.insert((pdu.kind.clone(), state_key.clone()), long_id); + state.insert((pdu.kind.clone(), state_key.clone()), pdu.event_id.clone()); } } @@ -629,7 +623,7 @@ async fn join_room_by_id_helper( pdu.kind.clone(), pdu.state_key.clone().expect("join event has state key"), ), - pdu_id.clone(), + pdu.event_id.clone(), ); db.rooms.force_state(room_id, state, &db.globals)?; diff --git a/src/client_server/profile.rs b/src/client_server/profile.rs index 7e57c1ef..9bcb2892 100644 --- a/src/client_server/profile.rs +++ b/src/client_server/profile.rs @@ -49,7 +49,6 @@ pub async fn set_displayname_route( "Tried to send displayname update for user not in the room.", ) })? - .1 .content .clone(), ) @@ -144,7 +143,6 @@ pub async fn set_avatar_url_route( "Tried to send avatar url update for user not in the room.", ) })? - .1 .content .clone(), ) diff --git a/src/client_server/room.rs b/src/client_server/room.rs index 409028c2..399677f6 100644 --- a/src/client_server/room.rs +++ b/src/client_server/room.rs @@ -380,7 +380,6 @@ pub async fn upgrade_room_route( db.rooms .room_state_get(&body.room_id, &EventType::RoomCreate, "")? .ok_or_else(|| Error::bad_database("Found room without m.room.create event."))? - .1 .content, ) .expect("Raw::from_value always works") @@ -452,7 +451,7 @@ pub async fn upgrade_room_route( // Replicate transferable state events to the new room for event_type in transferable_state_events { let event_content = match db.rooms.room_state_get(&body.room_id, &event_type, "")? { - Some((_, v)) => v.content.clone(), + Some(v) => v.content.clone(), None => continue, // Skipping missing events. }; @@ -482,7 +481,6 @@ pub async fn upgrade_room_route( db.rooms .room_state_get(&body.room_id, &EventType::RoomPowerLevels, "")? .ok_or_else(|| Error::bad_database("Found room without m.room.create event."))? - .1 .content, ) .expect("database contains invalid PDU") diff --git a/src/client_server/state.rs b/src/client_server/state.rs index 57bf7e56..54c5fa5c 100644 --- a/src/client_server/state.rs +++ b/src/client_server/state.rs @@ -112,7 +112,7 @@ pub async fn get_state_events_route( && !matches!( db.rooms .room_state_get(&body.room_id, &EventType::RoomHistoryVisibility, "")? - .map(|(_, event)| { + .map(|event| { serde_json::from_value::(event.content) .map_err(|_| { Error::bad_database( @@ -159,7 +159,7 @@ pub async fn get_state_events_for_key_route( && !matches!( db.rooms .room_state_get(&body.room_id, &EventType::RoomHistoryVisibility, "")? - .map(|(_, event)| { + .map(|event| { serde_json::from_value::(event.content) .map_err(|_| { Error::bad_database( @@ -183,8 +183,7 @@ pub async fn get_state_events_for_key_route( .ok_or(Error::BadRequest( ErrorKind::NotFound, "State event not found.", - ))? - .1; + ))?; Ok(get_state_events_for_key::Response { content: serde_json::value::to_raw_value(&event.content) @@ -211,7 +210,7 @@ pub async fn get_state_events_for_empty_key_route( && !matches!( db.rooms .room_state_get(&body.room_id, &EventType::RoomHistoryVisibility, "")? - .map(|(_, event)| { + .map(|event| { serde_json::from_value::(event.content) .map_err(|_| { Error::bad_database( @@ -235,8 +234,7 @@ pub async fn get_state_events_for_empty_key_route( .ok_or(Error::BadRequest( ErrorKind::NotFound, "State event not found.", - ))? - .1; + ))?; Ok(get_state_events_for_empty_key::Response { content: serde_json::value::to_raw_value(&event.content) diff --git a/src/client_server/sync.rs b/src/client_server/sync.rs index 6551b2a2..280632b5 100644 --- a/src/client_server/sync.rs +++ b/src/client_server/sync.rs @@ -96,7 +96,7 @@ pub async fn sync_events_route( // Database queries: - let current_state_hash = db.rooms.current_state_hash(&room_id)?; + let current_shortstatehash = db.rooms.current_shortstatehash(&room_id)?; // These type is Option>. The outer Option is None when there is no event between // since and the current room state, meaning there should be no updates. @@ -109,9 +109,11 @@ pub async fn sync_events_route( .next() .is_some(); - let since_state_hash = first_pdu_before_since - .as_ref() - .map(|pdu| db.rooms.pdu_state_hash(&pdu.as_ref().ok()?.0).ok()?); + let since_shortstatehash = first_pdu_before_since.as_ref().map(|pdu| { + db.rooms + .pdu_shortstatehash(&pdu.as_ref().ok()?.1.event_id) + .ok()? + }); let ( heroes, @@ -119,7 +121,7 @@ pub async fn sync_events_route( invited_member_count, joined_since_last_sync, state_events, - ) = if pdus_after_since && Some(¤t_state_hash) != since_state_hash.as_ref() { + ) = if pdus_after_since && Some(current_shortstatehash) != since_shortstatehash { let current_state = db.rooms.room_state_full(&room_id)?; let current_members = current_state .iter() @@ -129,11 +131,18 @@ pub async fn sync_events_route( let encrypted_room = current_state .get(&(EventType::RoomEncryption, "".to_owned())) .is_some(); - let since_state = since_state_hash.as_ref().map(|state_hash| { - state_hash - .as_ref() - .and_then(|state_hash| db.rooms.state_full(&room_id, &state_hash).ok()) - }); + let since_state = since_shortstatehash + .as_ref() + .map(|since_shortstatehash| { + Ok::<_, Error>( + since_shortstatehash + .map(|since_shortstatehash| { + db.rooms.state_full(&room_id, since_shortstatehash) + }) + .transpose()?, + ) + }) + .transpose()?; let since_encryption = since_state.as_ref().map(|state| { state @@ -496,16 +505,16 @@ pub async fn sync_events_route( .and_then(|pdu| pdu.ok()) .and_then(|pdu| { db.rooms - .pdu_state_hash(&pdu.0) + .pdu_shortstatehash(&pdu.1.event_id) .ok()? .ok_or_else(|| Error::bad_database("Pdu in db doesn't have a state hash.")) .ok() }) - .and_then(|state_hash| { + .and_then(|shortstatehash| { db.rooms .state_get( &room_id, - &state_hash, + shortstatehash, &EventType::RoomMember, sender_user.as_str(), ) @@ -513,14 +522,14 @@ pub async fn sync_events_route( .ok_or_else(|| Error::bad_database("State hash in db doesn't have a state.")) .ok() }) - .and_then(|(pdu_id, pdu)| { + .and_then(|pdu| { serde_json::from_value::>( pdu.content.clone(), ) .expect("Raw::from_value always works") .deserialize() .map_err(|_| Error::bad_database("Invalid PDU in database.")) - .map(|content| (pdu_id, pdu, content)) + .map(|content| (pdu, content)) .ok() }) { since_member @@ -529,7 +538,7 @@ pub async fn sync_events_route( continue; }; - let left_since_last_sync = since_member.2.membership == MembershipState::Join; + let left_since_last_sync = since_member.1.membership == MembershipState::Join; let left_room = if left_since_last_sync { device_list_left.extend( @@ -550,10 +559,10 @@ pub async fn sync_events_route( let pdus = db.rooms.pdus_since(&sender_user, &room_id, since)?; let mut room_events = pdus .filter_map(|pdu| pdu.ok()) // Filter out buggy events - .take_while(|(pdu_id, _)| since_member.0 != pdu_id) + .take_while(|(pdu_id, pdu)| &since_member.0 != pdu) .map(|(_, pdu)| pdu.to_sync_room_event()) .collect::>(); - room_events.push(since_member.1.to_sync_room_event()); + room_events.push(since_member.0.to_sync_room_event()); sync_events::LeftRoom { account_data: sync_events::AccountData { events: Vec::new() }, diff --git a/src/database.rs b/src/database.rs index 17177e8f..f65d5e01 100644 --- a/src/database.rs +++ b/src/database.rs @@ -163,10 +163,14 @@ impl Database { roomuserid_invited: db.open_tree("roomuserid_invited")?, userroomid_left: db.open_tree("userroomid_left")?, - statekey_short: db.open_tree("statekey_short")?, - stateid_pduid: db.open_tree("stateid_pduid")?, - pduid_statehash: db.open_tree("pduid_statehash")?, - roomid_statehash: db.open_tree("roomid_statehash")?, + statekey_shortstatekey: db.open_tree("statekey_shortstatekey")?, + stateid_shorteventid: db.open_tree("stateid_shorteventid")?, + eventid_shorteventid: db.open_tree("eventid_shorteventid")?, + shorteventid_eventid: db.open_tree("shorteventid_eventid")?, + shorteventid_shortstatehash: db.open_tree("eventid_shortstatehash")?, + roomid_shortstatehash: db.open_tree("roomid_shortstatehash")?, + statehash_shortstatehash: db.open_tree("statehash_shortstatehash")?, + eventid_outlierpdu: db.open_tree("roomeventid_outlierpdu")?, prevevent_parent: db.open_tree("prevevent_parent")?, }, diff --git a/src/database/pusher.rs b/src/database/pusher.rs index 59ccbef5..b6c6cf49 100644 --- a/src/database/pusher.rs +++ b/src/database/pusher.rs @@ -312,7 +312,6 @@ pub async fn send_push_notice( && db .rooms .room_state_get(&pdu.room_id, &EventType::RoomPowerLevels, "")? - .map(|(_, pl)| pl) .map(deserialize) .flatten() .map_or(false, power_level_cmp) @@ -514,7 +513,7 @@ async fn send_notice( let room_name = db .rooms .room_state_get(&event.room_id, &EventType::RoomName, "")? - .map(|(_, pdu)| match pdu.content.get("name") { + .map(|pdu| match pdu.content.get("name") { Some(serde_json::Value::String(s)) => Some(s.to_string()), _ => None, }) diff --git a/src/database/rooms.rs b/src/database/rooms.rs index c908d517..a3425660 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -59,15 +59,19 @@ pub struct Rooms { pub(super) userroomid_left: sled::Tree, /// Remember the current state hash of a room. - pub(super) roomid_statehash: sled::Tree, + pub(super) roomid_shortstatehash: sled::Tree, /// Remember the state hash at events in the past. - pub(super) pduid_statehash: sled::Tree, - /// The state for a given state hash. - /// - /// StateKey = EventType + StateKey, Short = Count - pub(super) statekey_short: sled::Tree, - /// StateId = StateHash + Short, PduId = Count (without roomid) - pub(super) stateid_eventid: sled::Tree, + pub(super) shorteventid_shortstatehash: sled::Tree, + /// StateKey = EventType + StateKey, ShortStateKey = Count + pub(super) statekey_shortstatekey: sled::Tree, + pub(super) shorteventid_eventid: sled::Tree, + /// ShortEventId = Count + pub(super) eventid_shorteventid: sled::Tree, + /// ShortEventId = Count + pub(super) statehash_shortstatehash: sled::Tree, + /// ShortStateHash = Count + /// StateId = ShortStateHash + ShortStateKey + pub(super) stateid_shorteventid: sled::Tree, /// RoomId + EventId -> outlier PDU. /// Any pdu that has passed the steps 1-8 in the incoming event /federation/send/txn. @@ -81,37 +85,65 @@ impl Rooms { /// Builds a StateMap by iterating over all keys that start /// with state_hash, this gives the full state for the given state_hash. #[tracing::instrument(skip(self))] - pub fn state_full( + pub fn state_full_ids( &self, room_id: &RoomId, state_hash: &StateHashId, - ) -> Result> { - self.stateid_pduid - .scan_prefix(&state_hash) + ) -> Result> { + let shortstatehash = self + .statehash_shortstatehash + .get(state_hash)? + .ok_or_else(|| Error::bad_database("Asked for statehash that does not exist."))?; + + Ok(self + .stateid_shorteventid + .scan_prefix(&shortstatehash) .values() - .map(|short_id| { - let short_id = short_id?; - let mut long_id = room_id.as_bytes().to_vec(); - long_id.push(0xff); - long_id.extend_from_slice(&short_id); - match self.pduid_pdu.get(&long_id)? { - Some(b) => serde_json::from_slice::(&b) - .map_err(|_| Error::bad_database("Invalid PDU in db.")), - None => self - .eventid_outlierpdu - .get(short_id)? - .map(|b| { - serde_json::from_slice::(&b) - .map_err(|_| Error::bad_database("Invalid PDU in db.")) - }) - .ok_or_else(|| { - Error::bad_database("Event is not in pdu tree or outliers.") - })?, - } + .filter_map(|r| r.ok()) + .map(|bytes| self.shorteventid_eventid.get(&bytes).ok().flatten()) + .flatten() + .map(|bytes| { + Ok::<_, Error>( + EventId::try_from(utils::string_from_bytes(&bytes).map_err(|_| { + Error::bad_database("EventID in stateid_shorteventid is invalid unicode.") + })?) + .map_err(|_| { + Error::bad_database("EventId in stateid_shorteventid is invalid.") + })?, + ) }) .filter_map(|r| r.ok()) + .collect()) + } + + #[tracing::instrument(skip(self))] + pub fn state_full( + &self, + room_id: &RoomId, + shortstatehash: u64, + ) -> Result> { + Ok(self + .stateid_shorteventid + .scan_prefix(shortstatehash.to_be_bytes()) + .values() + .filter_map(|r| r.ok()) + .map(|bytes| self.shorteventid_eventid.get(&bytes).ok().flatten()) + .flatten() + .map(|bytes| { + Ok::<_, Error>( + EventId::try_from(utils::string_from_bytes(&bytes).map_err(|_| { + Error::bad_database("EventID in stateid_shorteventid is invalid unicode.") + })?) + .map_err(|_| { + Error::bad_database("EventId in stateid_shorteventid is invalid.") + })?, + ) + }) + .filter_map(|r| r.ok()) + .map(|eventid| self.get_pdu(&eventid)) + .filter_map(|r| r.ok().flatten()) .map(|pdu| { - Ok(( + Ok::<_, Error>(( ( pdu.kind.clone(), pdu.state_key @@ -122,7 +154,8 @@ impl Rooms { pdu, )) }) - .collect() + .filter_map(|r| r.ok()) + .collect()) } /// Returns a single PDU from `room_id` with key (`event_type`, `state_key`). @@ -130,71 +163,73 @@ impl Rooms { pub fn state_get( &self, room_id: &RoomId, - state_hash: &StateHashId, + shortstatehash: u64, event_type: &EventType, state_key: &str, - ) -> Result> { + ) -> Result> { let mut key = event_type.to_string().as_bytes().to_vec(); key.push(0xff); key.extend_from_slice(&state_key.as_bytes()); - debug!("Looking for {} {:?}", event_type, state_key); + let shortstatekey = self.statekey_shortstatekey.get(&key)?; - let short = self.statekey_short.get(&key)?; + if let Some(shortstatekey) = shortstatekey { + let mut stateid = shortstatehash.to_be_bytes().to_vec(); + stateid.extend_from_slice(&shortstatekey); - if let Some(short) = short { - let mut stateid = state_hash.to_vec(); - stateid.push(0xff); - stateid.extend_from_slice(&short); - - debug!("trying to find pduid/eventid. short: {:?}", stateid); - self.stateid_pduid + self.stateid_shorteventid .get(&stateid)? - .map_or(Ok(None), |short_id| { - debug!("found in stateid_pduid"); - let mut long_id = room_id.as_bytes().to_vec(); - long_id.push(0xff); - long_id.extend_from_slice(&short_id); - - Ok::<_, Error>(Some(match self.pduid_pdu.get(&long_id)? { - Some(b) => ( - long_id.clone().into(), - serde_json::from_slice::(&b) - .map_err(|_| Error::bad_database("Invalid PDU in db."))?, - ), - None => { - debug!("looking in outliers"); - ( - short_id.clone().into(), - self.eventid_outlierpdu - .get(&short_id)? - .map(|b| { - serde_json::from_slice::(&b) - .map_err(|_| Error::bad_database("Invalid PDU in db.")) - }) - .ok_or_else(|| { - Error::bad_database("Event is not in pdu tree or outliers.") - })??, + .map(|bytes| self.shorteventid_eventid.get(&bytes).ok().flatten()) + .flatten() + .map(|bytes| { + Ok::<_, Error>( + EventId::try_from(utils::string_from_bytes(&bytes).map_err(|_| { + Error::bad_database( + "EventID in stateid_shorteventid is invalid unicode.", ) - } - })) + })?) + .map_err(|_| { + Error::bad_database("EventId in stateid_shorteventid is invalid.") + })?, + ) }) + .map(|r| r.ok()) + .flatten() + .map_or(Ok(None), |event_id| self.get_pdu(&event_id)) } else { - warn!("short id not found"); Ok(None) } } /// Returns the state hash for this pdu. #[tracing::instrument(skip(self))] - pub fn pdu_state_hash(&self, pdu_id: &[u8]) -> Result> { - Ok(self.pduid_statehash.get(pdu_id)?) + pub fn pdu_shortstatehash(&self, event_id: &EventId) -> Result> { + self.eventid_shorteventid + .get(event_id.as_bytes())? + .map_or(Ok(None), |shorteventid| { + Ok(self.shorteventid_shortstatehash.get(shorteventid)?.map_or( + Ok::<_, Error>(None), + |bytes| { + Ok(Some(utils::u64_from_bytes(&bytes).map_err(|_| { + Error::bad_database( + "Invalid shortstatehash bytes in shorteventid_shortstatehash", + ) + })?)) + }, + )?) + }) } /// Returns the last state hash key added to the db for the given room. #[tracing::instrument(skip(self))] - pub fn current_state_hash(&self, room_id: &RoomId) -> Result> { - Ok(self.roomid_statehash.get(room_id.as_bytes())?) + pub fn current_shortstatehash(&self, room_id: &RoomId) -> Result> { + self.roomid_shortstatehash + .get(room_id.as_bytes())? + .map_or(Ok(None), |bytes| { + Ok(Some(utils::u64_from_bytes(&bytes).map_err(|_| { + Error::bad_database("Invalid shortstatehash in roomid_shortstatehash") + })?)) + }) } /// This fetches auth events from the current state. @@ -215,7 +250,7 @@ impl Rooms { let mut events = StateMap::new(); for (event_type, state_key) in auth_events { - if let Some((_, pdu)) = self.room_state_get( + if let Some(pdu) = self.room_state_get( room_id, &event_type, &state_key @@ -233,9 +268,9 @@ impl Rooms { /// Generate a new StateHash. /// /// A unique hash made from hashing all PDU ids of the state joined with 0xff. - fn calculate_hash(&self, pdu_id_bytes: &[&[u8]]) -> Result { + fn calculate_hash(&self, bytes_list: &[&[u8]]) -> Result { // We only hash the pdu's event ids, not the whole pdu - let bytes = pdu_id_bytes.join(&0xff); + let bytes = bytes_list.join(&0xff); let hash = digest::digest(&digest::SHA256, &bytes); Ok(hash.as_ref().into()) } @@ -259,41 +294,65 @@ impl Rooms { pub fn force_state( &self, room_id: &RoomId, - state: HashMap<(EventType, String), Vec>, + state: HashMap<(EventType, String), EventId>, globals: &super::globals::Globals, ) -> Result<()> { - let state_hash = - self.calculate_hash(&state.values().map(|long_id| &**long_id).collect::>())?; - let mut prefix = state_hash.to_vec(); - prefix.push(0xff); + let state_hash = self.calculate_hash( + &state + .values() + .map(|event_id| event_id.as_bytes()) + .collect::>(), + )?; - for ((event_type, state_key), long_id) in state { + let shortstatehash = match self.statehash_shortstatehash.get(&state_hash)? { + Some(shortstatehash) => { + warn!("state hash already existed?!"); + shortstatehash.to_vec() + } + None => { + let shortstatehash = globals.next_count()?; + self.statehash_shortstatehash + .insert(&state_hash, &shortstatehash.to_be_bytes())?; + shortstatehash.to_be_bytes().to_vec() + } + }; + + for ((event_type, state_key), eventid) in state { let mut statekey = event_type.as_ref().as_bytes().to_vec(); statekey.push(0xff); statekey.extend_from_slice(&state_key.as_bytes()); - let short = match self.statekey_short.get(&statekey)? { - Some(short) => utils::u64_from_bytes(&short) - .map_err(|_| Error::bad_database("Invalid short bytes in statekey_short."))?, + let shortstatekey = match self.statekey_shortstatekey.get(&statekey)? { + Some(shortstatekey) => shortstatekey.to_vec(), None => { - let short = globals.next_count()?; - self.statekey_short - .insert(&statekey, &short.to_be_bytes())?; - short + let shortstatekey = globals.next_count()?; + self.statekey_shortstatekey + .insert(&statekey, &shortstatekey.to_be_bytes())?; + shortstatekey.to_be_bytes().to_vec() } }; - // If it's a pdu id we remove the room id, if it's an event id we leave it the same - let short_id = long_id.splitn(2, |&b| b == 0xff).nth(1).unwrap_or(&long_id); + let shorteventid = match self.eventid_shorteventid.get(eventid.as_bytes())? { + Some(shorteventid) => shorteventid.to_vec(), + None => { + let shorteventid = globals.next_count()?; + self.eventid_shorteventid + .insert(eventid.as_bytes(), &shorteventid.to_be_bytes())?; + self.shorteventid_eventid + .insert(&shorteventid.to_be_bytes(), eventid.as_bytes())?; + shorteventid.to_be_bytes().to_vec() + } + }; - let mut state_id = prefix.clone(); - state_id.extend_from_slice(&short.to_be_bytes()); - debug!("inserting {:?} into {:?}", short_id, state_id); - self.stateid_pduid.insert(state_id, short_id)?; + let mut state_id = shortstatehash.clone(); + state_id.extend_from_slice(&shortstatekey); + + self.stateid_shorteventid + .insert(&*state_id, &*shorteventid)?; } - self.roomid_statehash - .insert(room_id.as_bytes(), &*state_hash)?; + self.roomid_shortstatehash + .insert(room_id.as_bytes(), &*shortstatehash)?; Ok(()) } @@ -304,8 +363,8 @@ impl Rooms { &self, room_id: &RoomId, ) -> Result> { - if let Some(current_state_hash) = self.current_state_hash(room_id)? { - self.state_full(&room_id, ¤t_state_hash) + if let Some(current_shortstatehash) = self.current_shortstatehash(room_id)? { + self.state_full(&room_id, current_shortstatehash) } else { Ok(BTreeMap::new()) } @@ -318,9 +377,9 @@ impl Rooms { room_id: &RoomId, event_type: &EventType, state_key: &str, - ) -> Result> { - if let Some(current_state_hash) = self.current_state_hash(room_id)? { - self.state_get(&room_id, ¤t_state_hash, event_type, state_key) + ) -> Result> { + if let Some(current_shortstatehash) = self.current_shortstatehash(room_id)? { + self.state_get(&room_id, current_shortstatehash, event_type, state_key) } else { Ok(None) } @@ -377,12 +436,6 @@ impl Rooms { .map_or(Ok(None), |pdu_id| Ok(Some(pdu_id))) } - pub fn get_long_id(&self, event_id: &EventId) -> Result> { - Ok(self - .get_pdu_id(event_id)? - .map_or_else(|| event_id.as_bytes().to_vec(), |pduid| pduid.to_vec())) - } - /// Returns the pdu. /// /// Checks the `eventid_outlierpdu` Tree if not found in the timeline. @@ -538,15 +591,15 @@ impl Rooms { .entry("unsigned".to_owned()) .or_insert_with(|| CanonicalJsonValue::Object(Default::default())) { - if let Some(prev_state_hash) = self.pdu_state_hash(&pdu_id).unwrap() { + if let Some(shortstatehash) = self.pdu_shortstatehash(&pdu.event_id).unwrap() { if let Some(prev_state) = self - .state_get(&pdu.room_id, &prev_state_hash, &pdu.kind, &state_key) + .state_get(&pdu.room_id, shortstatehash, &pdu.kind, &state_key) .unwrap() { unsigned.insert( "prev_content".to_owned(), CanonicalJsonValue::Object( - utils::to_canonical_object(prev_state.1.content) + utils::to_canonical_object(prev_state.content) .expect("event is valid, we just created it"), ), ); @@ -574,7 +627,7 @@ impl Rooms { self.pduid_pdu.insert( &pdu_id, - &*serde_json::to_string(dbg!(&pdu_json)) + &*serde_json::to_string(&pdu_json) .expect("CanonicalJsonObject is always a valid String"), )?; @@ -706,71 +759,112 @@ impl Rooms { /// Generates a new StateHash and associates it with the incoming event. /// /// This adds all current state events (not including the incoming event) - /// to `stateid_pduid` and adds the incoming event to `pduid_statehash`. - /// The incoming event is the `pdu_id` passed to this method. + /// to `stateid_pduid` and adds the incoming event to `eventid_statehash`. pub fn append_to_state( &self, - new_pdu_id: &[u8], new_pdu: &PduEvent, globals: &super::globals::Globals, - ) -> Result { - let old_state = - if let Some(old_state_hash) = self.roomid_statehash.get(new_pdu.room_id.as_bytes())? { - // Store state for event. The state does not include the event itself. - // Instead it's the state before the pdu, so the room's old state. - self.pduid_statehash.insert(new_pdu_id, &old_state_hash)?; - if new_pdu.state_key.is_none() { - return Ok(old_state_hash); - } + ) -> Result { + let old_state = if let Some(old_shortstatehash) = + self.roomid_shortstatehash.get(new_pdu.room_id.as_bytes())? + { + // Store state for event. The state does not include the event itself. + // Instead it's the state before the pdu, so the room's old state. - let mut prefix = old_state_hash.to_vec(); - prefix.push(0xff); - self.stateid_pduid - .scan_prefix(&prefix) - .filter_map(|pdu| pdu.map_err(|e| error!("{}", e)).ok()) - // Chop the old state_hash out leaving behind the short key (u64) - .map(|(k, v)| (k.subslice(prefix.len(), k.len() - prefix.len()), v)) - .collect::>() - } else { - HashMap::new() - }; - - if let Some(state_key) = &new_pdu.state_key { - let mut new_state = old_state; - let mut pdu_key = new_pdu.kind.as_ref().as_bytes().to_vec(); - pdu_key.push(0xff); - pdu_key.extend_from_slice(state_key.as_bytes()); - - let short = match self.statekey_short.get(&pdu_key)? { - Some(short) => utils::u64_from_bytes(&short) - .map_err(|_| Error::bad_database("Invalid short bytes in statekey_short."))?, + let shorteventid = match self.eventid_shorteventid.get(new_pdu.event_id.as_bytes())? { + Some(shorteventid) => shorteventid.to_vec(), None => { - let short = globals.next_count()?; - self.statekey_short.insert(&pdu_key, &short.to_be_bytes())?; - short + let shorteventid = globals.next_count()?; + self.eventid_shorteventid + .insert(new_pdu.event_id.as_bytes(), &shorteventid.to_be_bytes())?; + self.shorteventid_eventid + .insert(&shorteventid.to_be_bytes(), new_pdu.event_id.as_bytes())?; + shorteventid.to_be_bytes().to_vec() } }; - let new_pdu_id_short = new_pdu_id - .splitn(2, |&b| b == 0xff) - .nth(1) - .ok_or_else(|| Error::bad_database("Invalid pduid in state."))?; - - new_state.insert((&short.to_be_bytes()).into(), new_pdu_id_short.into()); - - let new_state_hash = - self.calculate_hash(&new_state.values().map(|b| &**b).collect::>())?; - - let mut key = new_state_hash.to_vec(); - key.push(0xff); - - for (short, short_pdu_id) in new_state { - let mut state_id = key.clone(); - state_id.extend_from_slice(&short); - self.stateid_pduid.insert(&state_id, &short_pdu_id)?; + self.shorteventid_shortstatehash + .insert(shorteventid, &old_shortstatehash)?; + if new_pdu.state_key.is_none() { + return utils::u64_from_bytes(&old_shortstatehash).map_err(|_| { + Error::bad_database("Invalid shortstatehash in roomid_shortstatehash.") + }); } - Ok(new_state_hash) + self.stateid_shorteventid + .scan_prefix(&old_shortstatehash) + .filter_map(|pdu| pdu.map_err(|e| error!("{}", e)).ok()) + // Chop the old_shortstatehash out leaving behind the short state key + .map(|(k, v)| { + ( + k.subslice(old_shortstatehash.len(), k.len() - old_shortstatehash.len()), + v, + ) + }) + .collect::>() + } else { + HashMap::new() + }; + + if let Some(state_key) = &new_pdu.state_key { + let mut new_state: HashMap = old_state; + + let mut new_state_key = new_pdu.kind.as_ref().as_bytes().to_vec(); + new_state_key.push(0xff); + new_state_key.extend_from_slice(state_key.as_bytes()); + + let shortstatekey = match self.statekey_shortstatekey.get(&new_state_key)? { + Some(shortstatekey) => shortstatekey.to_vec(), + None => { + let shortstatekey = globals.next_count()?; + self.statekey_shortstatekey + .insert(&new_state_key, &shortstatekey.to_be_bytes())?; + shortstatekey.to_be_bytes().to_vec() + } + }; + + let shorteventid = match self.eventid_shorteventid.get(new_pdu.event_id.as_bytes())? { + Some(shorteventid) => shorteventid.to_vec(), + None => { + let shorteventid = globals.next_count()?; + self.eventid_shorteventid + .insert(new_pdu.event_id.as_bytes(), &shorteventid.to_be_bytes())?; + self.shorteventid_eventid + .insert(&shorteventid.to_be_bytes(), new_pdu.event_id.as_bytes())?; + shorteventid.to_be_bytes().to_vec() + } + }; + + new_state.insert(shortstatekey.into(), shorteventid.into()); + + let new_state_hash = self.calculate_hash( + &new_state + .values() + .map(|event_id| &**event_id) + .collect::>(), + )?; + + let shortstatehash = match self.statehash_shortstatehash.get(&new_state_hash)? { + Some(shortstatehash) => { + warn!("state hash already existed?!"); + utils::u64_from_bytes(&shortstatehash) + .map_err(|_| Error::bad_database("PDU has invalid count bytes."))? + } + None => { + let shortstatehash = globals.next_count()?; + self.statehash_shortstatehash + .insert(&new_state_hash, &shortstatehash.to_be_bytes())?; + shortstatehash + } + }; + + for (shortstatekey, shorteventid) in new_state { + let mut state_id = shortstatehash.to_be_bytes().to_vec(); + state_id.extend_from_slice(&shortstatekey); + self.stateid_shorteventid.insert(&state_id, &shorteventid)?; + } + + Ok(shortstatehash) } else { Err(Error::bad_database( "Tried to insert non-state event into room without a state.", @@ -778,9 +872,9 @@ impl Rooms { } } - pub fn set_room_state(&self, room_id: &RoomId, state_hash: &StateHashId) -> Result<()> { - self.roomid_statehash - .insert(room_id.as_bytes(), state_hash)?; + pub fn set_room_state(&self, room_id: &RoomId, shortstatehash: u64) -> Result<()> { + self.roomid_shortstatehash + .insert(room_id.as_bytes(), &shortstatehash.to_be_bytes())?; Ok(()) } @@ -833,7 +927,7 @@ impl Rooms { }, }) }, - |(_, power_levels)| { + |power_levels| { Ok(serde_json::from_value::>( power_levels.content, ) @@ -844,18 +938,15 @@ impl Rooms { )?; let sender_membership = self .room_state_get(&room_id, &EventType::RoomMember, &sender.to_string())? - .map_or( - Ok::<_, Error>(member::MembershipState::Leave), - |(_, pdu)| { - Ok( - serde_json::from_value::>(pdu.content) - .expect("Raw::from_value always works.") - .deserialize() - .map_err(|_| Error::bad_database("Invalid Member event in db."))? - .membership, - ) - }, - )?; + .map_or(Ok::<_, Error>(member::MembershipState::Leave), |pdu| { + Ok( + serde_json::from_value::>(pdu.content) + .expect("Raw::from_value always works.") + .deserialize() + .map_err(|_| Error::bad_database("Invalid Member event in db."))? + .membership, + ) + })?; let sender_power = power_levels.users.get(&sender).map_or_else( || { @@ -936,7 +1027,7 @@ impl Rooms { let mut unsigned = unsigned.unwrap_or_default(); if let Some(state_key) = &state_key { - if let Some((_, prev_pdu)) = self.room_state_get(&room_id, &event_type, &state_key)? { + if let Some(prev_pdu) = self.room_state_get(&room_id, &event_type, &state_key)? { unsigned.insert("prev_content".to_owned(), prev_pdu.content); unsigned.insert( "prev_sender".to_owned(), @@ -1014,7 +1105,7 @@ impl Rooms { // We append to state before appending the pdu, so we don't have a moment in time with the // pdu without it's state. This is okay because append_pdu can't fail. - let statehashid = self.append_to_state(&pdu_id, &pdu, &db.globals)?; + let statehashid = self.append_to_state(&pdu, &db.globals)?; // remove the self.append_pdu( @@ -1030,7 +1121,7 @@ impl Rooms { // We set the room state after inserting the pdu, so that we never have a moment in time // where events in the current room state do not exist - self.set_room_state(&room_id, &statehashid)?; + self.set_room_state(&room_id, statehashid)?; for server in self .room_servers(room_id) @@ -1267,7 +1358,7 @@ impl Rooms { // Check if the room has a predecessor if let Some(predecessor) = self .room_state_get(&room_id, &EventType::RoomCreate, "")? - .and_then(|(_, create)| { + .and_then(|create| { serde_json::from_value::< Raw, >(create.content) diff --git a/src/server_server.rs b/src/server_server.rs index 919d12f8..2f32b633 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -1301,7 +1301,7 @@ pub(crate) async fn build_forward_extremity_snapshots( pub_key_map: &PublicKeyMap, auth_cache: &mut EventMap>, ) -> Result>>> { - let current_hash = db.rooms.current_state_hash(pdu.room_id())?; + let current_shortstatehash = db.rooms.current_shortstatehash(pdu.room_id())?; let mut includes_current_state = false; let mut fork_states = BTreeSet::new(); @@ -1309,39 +1309,37 @@ pub(crate) async fn build_forward_extremity_snapshots( if id == &pdu.event_id { continue; } - match db.rooms.get_pdu_id(id)? { + match db.rooms.get_pdu(id)? { // We can skip this because it is handled outside of this function // The current server state and incoming event state are built to be // the state after. // This would be the incoming state from the server. - Some(pduid) if db.rooms.get_pdu_from_id(&pduid)?.is_some() => { - let state_hash = db + Some(leave_pdu) => { + let pdu_shortstatehash = db .rooms - .pdu_state_hash(&pduid)? - .expect("found pdu with no statehash"); + .pdu_shortstatehash(&leave_pdu.event_id)? + .ok_or_else(|| Error::bad_database("Found pdu with no statehash in db."))?; - if current_hash.as_ref() == Some(&state_hash) { + if current_shortstatehash.as_ref() == Some(&pdu_shortstatehash) { includes_current_state = true; } let mut state_before = db .rooms - .state_full(pdu.room_id(), &state_hash)? + .state_full(pdu.room_id(), pdu_shortstatehash)? .into_iter() .map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v))) .collect::>(); // Now it's the state after - if let Some(pdu) = db.rooms.get_pdu_from_id(&pduid)? { - let key = (pdu.kind.clone(), pdu.state_key()); - state_before.insert(key, Arc::new(pdu)); - } + let key = (leave_pdu.kind.clone(), leave_pdu.state_key.clone()); + state_before.insert(key, Arc::new(leave_pdu)); fork_states.insert(state_before); } _ => { - error!("Missing state snapshot for {:?} - {:?}", id, pdu.kind()); - return Err(Error::BadDatabase("Missing state snapshot.")); + error!("Missing state snapshot for {:?}", id); + return Err(Error::bad_database("Missing state snapshot.")); } } } @@ -1367,13 +1365,12 @@ pub(crate) fn update_resolved_state( if let Some(state) = state { let mut new_state = HashMap::new(); for ((ev_type, state_k), pdu) in state { - let long_id = db.rooms.get_long_id(&pdu.event_id)?; new_state.insert( ( ev_type, state_k.ok_or_else(|| Error::Conflict("State contained non state event"))?, ), - long_id, + pdu.event_id.clone(), ); } @@ -1396,7 +1393,6 @@ pub(crate) fn append_incoming_pdu( // We can tell if we need to do this based on wether state resolution took place or not let mut new_state = HashMap::new(); for ((ev_type, state_k), state_pdu) in state { - let long_id = db.rooms.get_long_id(state_pdu.event_id())?; new_state.insert( ( ev_type.clone(), @@ -1404,7 +1400,7 @@ pub(crate) fn append_incoming_pdu( .clone() .ok_or_else(|| Error::Conflict("State contained non state event"))?, ), - long_id.to_vec(), + state_pdu.event_id.clone(), ); } @@ -1418,7 +1414,7 @@ pub(crate) fn append_incoming_pdu( // We append to state before appending the pdu, so we don't have a moment in time with the // pdu without it's state. This is okay because append_pdu can't fail. - let state_hash = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?; + let state_hash = db.rooms.append_to_state(&pdu, &db.globals)?; db.rooms.append_pdu( pdu, @@ -1429,7 +1425,7 @@ pub(crate) fn append_incoming_pdu( &db, )?; - db.rooms.set_room_state(pdu.room_id(), &state_hash)?; + db.rooms.set_room_state(pdu.room_id(), state_hash)?; for appservice in db.appservice.iter_all().filter_map(|r| r.ok()) { if let Some(namespaces) = appservice.1.get("namespaces") {