improvement: optimize state storage

This commit is contained in:
Timo Kösters 2021-03-17 22:30:25 +01:00
parent 44425a903a
commit 100307c936
No known key found for this signature in database
GPG Key ID: 24DA7517711A2BA4
9 changed files with 341 additions and 254 deletions

View File

@ -106,7 +106,6 @@ pub async fn leave_room_route(
ErrorKind::BadState, ErrorKind::BadState,
"Cannot leave a room you are not a member of.", "Cannot leave a room you are not a member of.",
))? ))?
.1
.content, .content,
) )
.expect("from_value::<Raw<..>> can never fail") .expect("from_value::<Raw<..>> can never fail")
@ -195,7 +194,6 @@ pub async fn kick_user_route(
ErrorKind::BadState, ErrorKind::BadState,
"Cannot kick member that's not in the room.", "Cannot kick member that's not in the room.",
))? ))?
.1
.content, .content,
) )
.expect("Raw::from_value always works") .expect("Raw::from_value always works")
@ -251,7 +249,7 @@ pub async fn ban_user_route(
is_direct: None, is_direct: None,
third_party_invite: None, third_party_invite: None,
}), }),
|(_, event)| { |event| {
let mut event = let mut event =
serde_json::from_value::<Raw<member::MemberEventContent>>(event.content) serde_json::from_value::<Raw<member::MemberEventContent>>(event.content)
.expect("Raw::from_value always works") .expect("Raw::from_value always works")
@ -302,7 +300,6 @@ pub async fn unban_user_route(
ErrorKind::BadState, ErrorKind::BadState,
"Cannot unban a user who is not banned.", "Cannot unban a user who is not banned.",
))? ))?
.1
.content, .content,
) )
.expect("from_value::<Raw<..>> can never fail") .expect("from_value::<Raw<..>> can never fail")
@ -617,10 +614,7 @@ async fn join_room_by_id_helper(
&db.globals, &db.globals,
)?; )?;
} }
let mut long_id = room_id.as_bytes().to_vec(); state.insert((pdu.kind.clone(), state_key.clone()), pdu.event_id.clone());
long_id.push(0xff);
long_id.extend_from_slice(id.as_bytes());
state.insert((pdu.kind.clone(), state_key.clone()), long_id);
} }
} }
@ -629,7 +623,7 @@ async fn join_room_by_id_helper(
pdu.kind.clone(), pdu.kind.clone(),
pdu.state_key.clone().expect("join event has state key"), pdu.state_key.clone().expect("join event has state key"),
), ),
pdu_id.clone(), pdu.event_id.clone(),
); );
db.rooms.force_state(room_id, state, &db.globals)?; db.rooms.force_state(room_id, state, &db.globals)?;

View File

@ -49,7 +49,6 @@ pub async fn set_displayname_route(
"Tried to send displayname update for user not in the room.", "Tried to send displayname update for user not in the room.",
) )
})? })?
.1
.content .content
.clone(), .clone(),
) )
@ -144,7 +143,6 @@ pub async fn set_avatar_url_route(
"Tried to send avatar url update for user not in the room.", "Tried to send avatar url update for user not in the room.",
) )
})? })?
.1
.content .content
.clone(), .clone(),
) )

View File

@ -380,7 +380,6 @@ pub async fn upgrade_room_route(
db.rooms db.rooms
.room_state_get(&body.room_id, &EventType::RoomCreate, "")? .room_state_get(&body.room_id, &EventType::RoomCreate, "")?
.ok_or_else(|| Error::bad_database("Found room without m.room.create event."))? .ok_or_else(|| Error::bad_database("Found room without m.room.create event."))?
.1
.content, .content,
) )
.expect("Raw::from_value always works") .expect("Raw::from_value always works")
@ -452,7 +451,7 @@ pub async fn upgrade_room_route(
// Replicate transferable state events to the new room // Replicate transferable state events to the new room
for event_type in transferable_state_events { for event_type in transferable_state_events {
let event_content = match db.rooms.room_state_get(&body.room_id, &event_type, "")? { let event_content = match db.rooms.room_state_get(&body.room_id, &event_type, "")? {
Some((_, v)) => v.content.clone(), Some(v) => v.content.clone(),
None => continue, // Skipping missing events. None => continue, // Skipping missing events.
}; };
@ -482,7 +481,6 @@ pub async fn upgrade_room_route(
db.rooms db.rooms
.room_state_get(&body.room_id, &EventType::RoomPowerLevels, "")? .room_state_get(&body.room_id, &EventType::RoomPowerLevels, "")?
.ok_or_else(|| Error::bad_database("Found room without m.room.create event."))? .ok_or_else(|| Error::bad_database("Found room without m.room.create event."))?
.1
.content, .content,
) )
.expect("database contains invalid PDU") .expect("database contains invalid PDU")

View File

@ -112,7 +112,7 @@ pub async fn get_state_events_route(
&& !matches!( && !matches!(
db.rooms db.rooms
.room_state_get(&body.room_id, &EventType::RoomHistoryVisibility, "")? .room_state_get(&body.room_id, &EventType::RoomHistoryVisibility, "")?
.map(|(_, event)| { .map(|event| {
serde_json::from_value::<HistoryVisibilityEventContent>(event.content) serde_json::from_value::<HistoryVisibilityEventContent>(event.content)
.map_err(|_| { .map_err(|_| {
Error::bad_database( Error::bad_database(
@ -159,7 +159,7 @@ pub async fn get_state_events_for_key_route(
&& !matches!( && !matches!(
db.rooms db.rooms
.room_state_get(&body.room_id, &EventType::RoomHistoryVisibility, "")? .room_state_get(&body.room_id, &EventType::RoomHistoryVisibility, "")?
.map(|(_, event)| { .map(|event| {
serde_json::from_value::<HistoryVisibilityEventContent>(event.content) serde_json::from_value::<HistoryVisibilityEventContent>(event.content)
.map_err(|_| { .map_err(|_| {
Error::bad_database( Error::bad_database(
@ -183,8 +183,7 @@ pub async fn get_state_events_for_key_route(
.ok_or(Error::BadRequest( .ok_or(Error::BadRequest(
ErrorKind::NotFound, ErrorKind::NotFound,
"State event not found.", "State event not found.",
))? ))?;
.1;
Ok(get_state_events_for_key::Response { Ok(get_state_events_for_key::Response {
content: serde_json::value::to_raw_value(&event.content) content: serde_json::value::to_raw_value(&event.content)
@ -211,7 +210,7 @@ pub async fn get_state_events_for_empty_key_route(
&& !matches!( && !matches!(
db.rooms db.rooms
.room_state_get(&body.room_id, &EventType::RoomHistoryVisibility, "")? .room_state_get(&body.room_id, &EventType::RoomHistoryVisibility, "")?
.map(|(_, event)| { .map(|event| {
serde_json::from_value::<HistoryVisibilityEventContent>(event.content) serde_json::from_value::<HistoryVisibilityEventContent>(event.content)
.map_err(|_| { .map_err(|_| {
Error::bad_database( Error::bad_database(
@ -235,8 +234,7 @@ pub async fn get_state_events_for_empty_key_route(
.ok_or(Error::BadRequest( .ok_or(Error::BadRequest(
ErrorKind::NotFound, ErrorKind::NotFound,
"State event not found.", "State event not found.",
))? ))?;
.1;
Ok(get_state_events_for_empty_key::Response { Ok(get_state_events_for_empty_key::Response {
content: serde_json::value::to_raw_value(&event.content) content: serde_json::value::to_raw_value(&event.content)

View File

@ -96,7 +96,7 @@ pub async fn sync_events_route(
// Database queries: // Database queries:
let current_state_hash = db.rooms.current_state_hash(&room_id)?; let current_shortstatehash = db.rooms.current_shortstatehash(&room_id)?;
// These type is Option<Option<_>>. The outer Option is None when there is no event between // These type is Option<Option<_>>. The outer Option is None when there is no event between
// since and the current room state, meaning there should be no updates. // since and the current room state, meaning there should be no updates.
@ -109,9 +109,11 @@ pub async fn sync_events_route(
.next() .next()
.is_some(); .is_some();
let since_state_hash = first_pdu_before_since let since_shortstatehash = first_pdu_before_since.as_ref().map(|pdu| {
.as_ref() db.rooms
.map(|pdu| db.rooms.pdu_state_hash(&pdu.as_ref().ok()?.0).ok()?); .pdu_shortstatehash(&pdu.as_ref().ok()?.1.event_id)
.ok()?
});
let ( let (
heroes, heroes,
@ -119,7 +121,7 @@ pub async fn sync_events_route(
invited_member_count, invited_member_count,
joined_since_last_sync, joined_since_last_sync,
state_events, state_events,
) = if pdus_after_since && Some(&current_state_hash) != since_state_hash.as_ref() { ) = if pdus_after_since && Some(current_shortstatehash) != since_shortstatehash {
let current_state = db.rooms.room_state_full(&room_id)?; let current_state = db.rooms.room_state_full(&room_id)?;
let current_members = current_state let current_members = current_state
.iter() .iter()
@ -129,11 +131,18 @@ pub async fn sync_events_route(
let encrypted_room = current_state let encrypted_room = current_state
.get(&(EventType::RoomEncryption, "".to_owned())) .get(&(EventType::RoomEncryption, "".to_owned()))
.is_some(); .is_some();
let since_state = since_state_hash.as_ref().map(|state_hash| { let since_state = since_shortstatehash
state_hash .as_ref()
.as_ref() .map(|since_shortstatehash| {
.and_then(|state_hash| db.rooms.state_full(&room_id, &state_hash).ok()) Ok::<_, Error>(
}); since_shortstatehash
.map(|since_shortstatehash| {
db.rooms.state_full(&room_id, since_shortstatehash)
})
.transpose()?,
)
})
.transpose()?;
let since_encryption = since_state.as_ref().map(|state| { let since_encryption = since_state.as_ref().map(|state| {
state state
@ -496,16 +505,16 @@ pub async fn sync_events_route(
.and_then(|pdu| pdu.ok()) .and_then(|pdu| pdu.ok())
.and_then(|pdu| { .and_then(|pdu| {
db.rooms db.rooms
.pdu_state_hash(&pdu.0) .pdu_shortstatehash(&pdu.1.event_id)
.ok()? .ok()?
.ok_or_else(|| Error::bad_database("Pdu in db doesn't have a state hash.")) .ok_or_else(|| Error::bad_database("Pdu in db doesn't have a state hash."))
.ok() .ok()
}) })
.and_then(|state_hash| { .and_then(|shortstatehash| {
db.rooms db.rooms
.state_get( .state_get(
&room_id, &room_id,
&state_hash, shortstatehash,
&EventType::RoomMember, &EventType::RoomMember,
sender_user.as_str(), sender_user.as_str(),
) )
@ -513,14 +522,14 @@ pub async fn sync_events_route(
.ok_or_else(|| Error::bad_database("State hash in db doesn't have a state.")) .ok_or_else(|| Error::bad_database("State hash in db doesn't have a state."))
.ok() .ok()
}) })
.and_then(|(pdu_id, pdu)| { .and_then(|pdu| {
serde_json::from_value::<Raw<ruma::events::room::member::MemberEventContent>>( serde_json::from_value::<Raw<ruma::events::room::member::MemberEventContent>>(
pdu.content.clone(), pdu.content.clone(),
) )
.expect("Raw::from_value always works") .expect("Raw::from_value always works")
.deserialize() .deserialize()
.map_err(|_| Error::bad_database("Invalid PDU in database.")) .map_err(|_| Error::bad_database("Invalid PDU in database."))
.map(|content| (pdu_id, pdu, content)) .map(|content| (pdu, content))
.ok() .ok()
}) { }) {
since_member since_member
@ -529,7 +538,7 @@ pub async fn sync_events_route(
continue; continue;
}; };
let left_since_last_sync = since_member.2.membership == MembershipState::Join; let left_since_last_sync = since_member.1.membership == MembershipState::Join;
let left_room = if left_since_last_sync { let left_room = if left_since_last_sync {
device_list_left.extend( device_list_left.extend(
@ -550,10 +559,10 @@ pub async fn sync_events_route(
let pdus = db.rooms.pdus_since(&sender_user, &room_id, since)?; let pdus = db.rooms.pdus_since(&sender_user, &room_id, since)?;
let mut room_events = pdus let mut room_events = pdus
.filter_map(|pdu| pdu.ok()) // Filter out buggy events .filter_map(|pdu| pdu.ok()) // Filter out buggy events
.take_while(|(pdu_id, _)| since_member.0 != pdu_id) .take_while(|(pdu_id, pdu)| &since_member.0 != pdu)
.map(|(_, pdu)| pdu.to_sync_room_event()) .map(|(_, pdu)| pdu.to_sync_room_event())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
room_events.push(since_member.1.to_sync_room_event()); room_events.push(since_member.0.to_sync_room_event());
sync_events::LeftRoom { sync_events::LeftRoom {
account_data: sync_events::AccountData { events: Vec::new() }, account_data: sync_events::AccountData { events: Vec::new() },

View File

@ -163,10 +163,14 @@ impl Database {
roomuserid_invited: db.open_tree("roomuserid_invited")?, roomuserid_invited: db.open_tree("roomuserid_invited")?,
userroomid_left: db.open_tree("userroomid_left")?, userroomid_left: db.open_tree("userroomid_left")?,
statekey_short: db.open_tree("statekey_short")?, statekey_shortstatekey: db.open_tree("statekey_shortstatekey")?,
stateid_pduid: db.open_tree("stateid_pduid")?, stateid_shorteventid: db.open_tree("stateid_shorteventid")?,
pduid_statehash: db.open_tree("pduid_statehash")?, eventid_shorteventid: db.open_tree("eventid_shorteventid")?,
roomid_statehash: db.open_tree("roomid_statehash")?, shorteventid_eventid: db.open_tree("shorteventid_eventid")?,
shorteventid_shortstatehash: db.open_tree("eventid_shortstatehash")?,
roomid_shortstatehash: db.open_tree("roomid_shortstatehash")?,
statehash_shortstatehash: db.open_tree("statehash_shortstatehash")?,
eventid_outlierpdu: db.open_tree("roomeventid_outlierpdu")?, eventid_outlierpdu: db.open_tree("roomeventid_outlierpdu")?,
prevevent_parent: db.open_tree("prevevent_parent")?, prevevent_parent: db.open_tree("prevevent_parent")?,
}, },

View File

@ -312,7 +312,6 @@ pub async fn send_push_notice(
&& db && db
.rooms .rooms
.room_state_get(&pdu.room_id, &EventType::RoomPowerLevels, "")? .room_state_get(&pdu.room_id, &EventType::RoomPowerLevels, "")?
.map(|(_, pl)| pl)
.map(deserialize) .map(deserialize)
.flatten() .flatten()
.map_or(false, power_level_cmp) .map_or(false, power_level_cmp)
@ -514,7 +513,7 @@ async fn send_notice(
let room_name = db let room_name = db
.rooms .rooms
.room_state_get(&event.room_id, &EventType::RoomName, "")? .room_state_get(&event.room_id, &EventType::RoomName, "")?
.map(|(_, pdu)| match pdu.content.get("name") { .map(|pdu| match pdu.content.get("name") {
Some(serde_json::Value::String(s)) => Some(s.to_string()), Some(serde_json::Value::String(s)) => Some(s.to_string()),
_ => None, _ => None,
}) })

View File

@ -59,15 +59,19 @@ pub struct Rooms {
pub(super) userroomid_left: sled::Tree, pub(super) userroomid_left: sled::Tree,
/// Remember the current state hash of a room. /// Remember the current state hash of a room.
pub(super) roomid_statehash: sled::Tree, pub(super) roomid_shortstatehash: sled::Tree,
/// Remember the state hash at events in the past. /// Remember the state hash at events in the past.
pub(super) pduid_statehash: sled::Tree, pub(super) shorteventid_shortstatehash: sled::Tree,
/// The state for a given state hash. /// StateKey = EventType + StateKey, ShortStateKey = Count
/// pub(super) statekey_shortstatekey: sled::Tree,
/// StateKey = EventType + StateKey, Short = Count pub(super) shorteventid_eventid: sled::Tree,
pub(super) statekey_short: sled::Tree, /// ShortEventId = Count
/// StateId = StateHash + Short, PduId = Count (without roomid) pub(super) eventid_shorteventid: sled::Tree,
pub(super) stateid_eventid: sled::Tree, /// ShortEventId = Count
pub(super) statehash_shortstatehash: sled::Tree,
/// ShortStateHash = Count
/// StateId = ShortStateHash + ShortStateKey
pub(super) stateid_shorteventid: sled::Tree,
/// RoomId + EventId -> outlier PDU. /// RoomId + EventId -> outlier PDU.
/// Any pdu that has passed the steps 1-8 in the incoming event /federation/send/txn. /// Any pdu that has passed the steps 1-8 in the incoming event /federation/send/txn.
@ -81,37 +85,65 @@ impl Rooms {
/// Builds a StateMap by iterating over all keys that start /// Builds a StateMap by iterating over all keys that start
/// with state_hash, this gives the full state for the given state_hash. /// with state_hash, this gives the full state for the given state_hash.
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
pub fn state_full( pub fn state_full_ids(
&self, &self,
room_id: &RoomId, room_id: &RoomId,
state_hash: &StateHashId, state_hash: &StateHashId,
) -> Result<BTreeMap<(EventType, String), PduEvent>> { ) -> Result<Vec<EventId>> {
self.stateid_pduid let shortstatehash = self
.scan_prefix(&state_hash) .statehash_shortstatehash
.get(state_hash)?
.ok_or_else(|| Error::bad_database("Asked for statehash that does not exist."))?;
Ok(self
.stateid_shorteventid
.scan_prefix(&shortstatehash)
.values() .values()
.map(|short_id| { .filter_map(|r| r.ok())
let short_id = short_id?; .map(|bytes| self.shorteventid_eventid.get(&bytes).ok().flatten())
let mut long_id = room_id.as_bytes().to_vec(); .flatten()
long_id.push(0xff); .map(|bytes| {
long_id.extend_from_slice(&short_id); Ok::<_, Error>(
match self.pduid_pdu.get(&long_id)? { EventId::try_from(utils::string_from_bytes(&bytes).map_err(|_| {
Some(b) => serde_json::from_slice::<PduEvent>(&b) Error::bad_database("EventID in stateid_shorteventid is invalid unicode.")
.map_err(|_| Error::bad_database("Invalid PDU in db.")), })?)
None => self .map_err(|_| {
.eventid_outlierpdu Error::bad_database("EventId in stateid_shorteventid is invalid.")
.get(short_id)? })?,
.map(|b| { )
serde_json::from_slice::<PduEvent>(&b)
.map_err(|_| Error::bad_database("Invalid PDU in db."))
})
.ok_or_else(|| {
Error::bad_database("Event is not in pdu tree or outliers.")
})?,
}
}) })
.filter_map(|r| r.ok()) .filter_map(|r| r.ok())
.collect())
}
#[tracing::instrument(skip(self))]
pub fn state_full(
&self,
room_id: &RoomId,
shortstatehash: u64,
) -> Result<BTreeMap<(EventType, String), PduEvent>> {
Ok(self
.stateid_shorteventid
.scan_prefix(shortstatehash.to_be_bytes())
.values()
.filter_map(|r| r.ok())
.map(|bytes| self.shorteventid_eventid.get(&bytes).ok().flatten())
.flatten()
.map(|bytes| {
Ok::<_, Error>(
EventId::try_from(utils::string_from_bytes(&bytes).map_err(|_| {
Error::bad_database("EventID in stateid_shorteventid is invalid unicode.")
})?)
.map_err(|_| {
Error::bad_database("EventId in stateid_shorteventid is invalid.")
})?,
)
})
.filter_map(|r| r.ok())
.map(|eventid| self.get_pdu(&eventid))
.filter_map(|r| r.ok().flatten())
.map(|pdu| { .map(|pdu| {
Ok(( Ok::<_, Error>((
( (
pdu.kind.clone(), pdu.kind.clone(),
pdu.state_key pdu.state_key
@ -122,7 +154,8 @@ impl Rooms {
pdu, pdu,
)) ))
}) })
.collect() .filter_map(|r| r.ok())
.collect())
} }
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`). /// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
@ -130,71 +163,73 @@ impl Rooms {
pub fn state_get( pub fn state_get(
&self, &self,
room_id: &RoomId, room_id: &RoomId,
state_hash: &StateHashId, shortstatehash: u64,
event_type: &EventType, event_type: &EventType,
state_key: &str, state_key: &str,
) -> Result<Option<(IVec, PduEvent)>> { ) -> Result<Option<PduEvent>> {
let mut key = event_type.to_string().as_bytes().to_vec(); let mut key = event_type.to_string().as_bytes().to_vec();
key.push(0xff); key.push(0xff);
key.extend_from_slice(&state_key.as_bytes()); key.extend_from_slice(&state_key.as_bytes());
debug!("Looking for {} {:?}", event_type, state_key); let shortstatekey = self.statekey_shortstatekey.get(&key)?;
let short = self.statekey_short.get(&key)?; if let Some(shortstatekey) = shortstatekey {
let mut stateid = shortstatehash.to_be_bytes().to_vec();
stateid.extend_from_slice(&shortstatekey);
if let Some(short) = short { self.stateid_shorteventid
let mut stateid = state_hash.to_vec();
stateid.push(0xff);
stateid.extend_from_slice(&short);
debug!("trying to find pduid/eventid. short: {:?}", stateid);
self.stateid_pduid
.get(&stateid)? .get(&stateid)?
.map_or(Ok(None), |short_id| { .map(|bytes| self.shorteventid_eventid.get(&bytes).ok().flatten())
debug!("found in stateid_pduid"); .flatten()
let mut long_id = room_id.as_bytes().to_vec(); .map(|bytes| {
long_id.push(0xff); Ok::<_, Error>(
long_id.extend_from_slice(&short_id); EventId::try_from(utils::string_from_bytes(&bytes).map_err(|_| {
Error::bad_database(
Ok::<_, Error>(Some(match self.pduid_pdu.get(&long_id)? { "EventID in stateid_shorteventid is invalid unicode.",
Some(b) => (
long_id.clone().into(),
serde_json::from_slice::<PduEvent>(&b)
.map_err(|_| Error::bad_database("Invalid PDU in db."))?,
),
None => {
debug!("looking in outliers");
(
short_id.clone().into(),
self.eventid_outlierpdu
.get(&short_id)?
.map(|b| {
serde_json::from_slice::<PduEvent>(&b)
.map_err(|_| Error::bad_database("Invalid PDU in db."))
})
.ok_or_else(|| {
Error::bad_database("Event is not in pdu tree or outliers.")
})??,
) )
} })?)
})) .map_err(|_| {
Error::bad_database("EventId in stateid_shorteventid is invalid.")
})?,
)
}) })
.map(|r| r.ok())
.flatten()
.map_or(Ok(None), |event_id| self.get_pdu(&event_id))
} else { } else {
warn!("short id not found");
Ok(None) Ok(None)
} }
} }
/// Returns the state hash for this pdu. /// Returns the state hash for this pdu.
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
pub fn pdu_state_hash(&self, pdu_id: &[u8]) -> Result<Option<StateHashId>> { pub fn pdu_shortstatehash(&self, event_id: &EventId) -> Result<Option<u64>> {
Ok(self.pduid_statehash.get(pdu_id)?) self.eventid_shorteventid
.get(event_id.as_bytes())?
.map_or(Ok(None), |shorteventid| {
Ok(self.shorteventid_shortstatehash.get(shorteventid)?.map_or(
Ok::<_, Error>(None),
|bytes| {
Ok(Some(utils::u64_from_bytes(&bytes).map_err(|_| {
Error::bad_database(
"Invalid shortstatehash bytes in shorteventid_shortstatehash",
)
})?))
},
)?)
})
} }
/// Returns the last state hash key added to the db for the given room. /// Returns the last state hash key added to the db for the given room.
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
pub fn current_state_hash(&self, room_id: &RoomId) -> Result<Option<StateHashId>> { pub fn current_shortstatehash(&self, room_id: &RoomId) -> Result<Option<u64>> {
Ok(self.roomid_statehash.get(room_id.as_bytes())?) self.roomid_shortstatehash
.get(room_id.as_bytes())?
.map_or(Ok(None), |bytes| {
Ok(Some(utils::u64_from_bytes(&bytes).map_err(|_| {
Error::bad_database("Invalid shortstatehash in roomid_shortstatehash")
})?))
})
} }
/// This fetches auth events from the current state. /// This fetches auth events from the current state.
@ -215,7 +250,7 @@ impl Rooms {
let mut events = StateMap::new(); let mut events = StateMap::new();
for (event_type, state_key) in auth_events { for (event_type, state_key) in auth_events {
if let Some((_, pdu)) = self.room_state_get( if let Some(pdu) = self.room_state_get(
room_id, room_id,
&event_type, &event_type,
&state_key &state_key
@ -233,9 +268,9 @@ impl Rooms {
/// Generate a new StateHash. /// Generate a new StateHash.
/// ///
/// A unique hash made from hashing all PDU ids of the state joined with 0xff. /// A unique hash made from hashing all PDU ids of the state joined with 0xff.
fn calculate_hash(&self, pdu_id_bytes: &[&[u8]]) -> Result<StateHashId> { fn calculate_hash(&self, bytes_list: &[&[u8]]) -> Result<StateHashId> {
// We only hash the pdu's event ids, not the whole pdu // We only hash the pdu's event ids, not the whole pdu
let bytes = pdu_id_bytes.join(&0xff); let bytes = bytes_list.join(&0xff);
let hash = digest::digest(&digest::SHA256, &bytes); let hash = digest::digest(&digest::SHA256, &bytes);
Ok(hash.as_ref().into()) Ok(hash.as_ref().into())
} }
@ -259,41 +294,65 @@ impl Rooms {
pub fn force_state( pub fn force_state(
&self, &self,
room_id: &RoomId, room_id: &RoomId,
state: HashMap<(EventType, String), Vec<u8>>, state: HashMap<(EventType, String), EventId>,
globals: &super::globals::Globals, globals: &super::globals::Globals,
) -> Result<()> { ) -> Result<()> {
let state_hash = let state_hash = self.calculate_hash(
self.calculate_hash(&state.values().map(|long_id| &**long_id).collect::<Vec<_>>())?; &state
let mut prefix = state_hash.to_vec(); .values()
prefix.push(0xff); .map(|event_id| event_id.as_bytes())
.collect::<Vec<_>>(),
)?;
for ((event_type, state_key), long_id) in state { let shortstatehash = match self.statehash_shortstatehash.get(&state_hash)? {
Some(shortstatehash) => {
warn!("state hash already existed?!");
shortstatehash.to_vec()
}
None => {
let shortstatehash = globals.next_count()?;
self.statehash_shortstatehash
.insert(&state_hash, &shortstatehash.to_be_bytes())?;
shortstatehash.to_be_bytes().to_vec()
}
};
for ((event_type, state_key), eventid) in state {
let mut statekey = event_type.as_ref().as_bytes().to_vec(); let mut statekey = event_type.as_ref().as_bytes().to_vec();
statekey.push(0xff); statekey.push(0xff);
statekey.extend_from_slice(&state_key.as_bytes()); statekey.extend_from_slice(&state_key.as_bytes());
let short = match self.statekey_short.get(&statekey)? { let shortstatekey = match self.statekey_shortstatekey.get(&statekey)? {
Some(short) => utils::u64_from_bytes(&short) Some(shortstatekey) => shortstatekey.to_vec(),
.map_err(|_| Error::bad_database("Invalid short bytes in statekey_short."))?,
None => { None => {
let short = globals.next_count()?; let shortstatekey = globals.next_count()?;
self.statekey_short self.statekey_shortstatekey
.insert(&statekey, &short.to_be_bytes())?; .insert(&statekey, &shortstatekey.to_be_bytes())?;
short shortstatekey.to_be_bytes().to_vec()
} }
}; };
// If it's a pdu id we remove the room id, if it's an event id we leave it the same let shorteventid = match self.eventid_shorteventid.get(eventid.as_bytes())? {
let short_id = long_id.splitn(2, |&b| b == 0xff).nth(1).unwrap_or(&long_id); Some(shorteventid) => shorteventid.to_vec(),
None => {
let shorteventid = globals.next_count()?;
self.eventid_shorteventid
.insert(eventid.as_bytes(), &shorteventid.to_be_bytes())?;
self.shorteventid_eventid
.insert(&shorteventid.to_be_bytes(), eventid.as_bytes())?;
shorteventid.to_be_bytes().to_vec()
}
};
let mut state_id = prefix.clone(); let mut state_id = shortstatehash.clone();
state_id.extend_from_slice(&short.to_be_bytes()); state_id.extend_from_slice(&shortstatekey);
debug!("inserting {:?} into {:?}", short_id, state_id);
self.stateid_pduid.insert(state_id, short_id)?; self.stateid_shorteventid
.insert(&*state_id, &*shorteventid)?;
} }
self.roomid_statehash self.roomid_shortstatehash
.insert(room_id.as_bytes(), &*state_hash)?; .insert(room_id.as_bytes(), &*shortstatehash)?;
Ok(()) Ok(())
} }
@ -304,8 +363,8 @@ impl Rooms {
&self, &self,
room_id: &RoomId, room_id: &RoomId,
) -> Result<BTreeMap<(EventType, String), PduEvent>> { ) -> Result<BTreeMap<(EventType, String), PduEvent>> {
if let Some(current_state_hash) = self.current_state_hash(room_id)? { if let Some(current_shortstatehash) = self.current_shortstatehash(room_id)? {
self.state_full(&room_id, &current_state_hash) self.state_full(&room_id, current_shortstatehash)
} else { } else {
Ok(BTreeMap::new()) Ok(BTreeMap::new())
} }
@ -318,9 +377,9 @@ impl Rooms {
room_id: &RoomId, room_id: &RoomId,
event_type: &EventType, event_type: &EventType,
state_key: &str, state_key: &str,
) -> Result<Option<(IVec, PduEvent)>> { ) -> Result<Option<PduEvent>> {
if let Some(current_state_hash) = self.current_state_hash(room_id)? { if let Some(current_shortstatehash) = self.current_shortstatehash(room_id)? {
self.state_get(&room_id, &current_state_hash, event_type, state_key) self.state_get(&room_id, current_shortstatehash, event_type, state_key)
} else { } else {
Ok(None) Ok(None)
} }
@ -377,12 +436,6 @@ impl Rooms {
.map_or(Ok(None), |pdu_id| Ok(Some(pdu_id))) .map_or(Ok(None), |pdu_id| Ok(Some(pdu_id)))
} }
pub fn get_long_id(&self, event_id: &EventId) -> Result<Vec<u8>> {
Ok(self
.get_pdu_id(event_id)?
.map_or_else(|| event_id.as_bytes().to_vec(), |pduid| pduid.to_vec()))
}
/// Returns the pdu. /// Returns the pdu.
/// ///
/// Checks the `eventid_outlierpdu` Tree if not found in the timeline. /// Checks the `eventid_outlierpdu` Tree if not found in the timeline.
@ -538,15 +591,15 @@ impl Rooms {
.entry("unsigned".to_owned()) .entry("unsigned".to_owned())
.or_insert_with(|| CanonicalJsonValue::Object(Default::default())) .or_insert_with(|| CanonicalJsonValue::Object(Default::default()))
{ {
if let Some(prev_state_hash) = self.pdu_state_hash(&pdu_id).unwrap() { if let Some(shortstatehash) = self.pdu_shortstatehash(&pdu.event_id).unwrap() {
if let Some(prev_state) = self if let Some(prev_state) = self
.state_get(&pdu.room_id, &prev_state_hash, &pdu.kind, &state_key) .state_get(&pdu.room_id, shortstatehash, &pdu.kind, &state_key)
.unwrap() .unwrap()
{ {
unsigned.insert( unsigned.insert(
"prev_content".to_owned(), "prev_content".to_owned(),
CanonicalJsonValue::Object( CanonicalJsonValue::Object(
utils::to_canonical_object(prev_state.1.content) utils::to_canonical_object(prev_state.content)
.expect("event is valid, we just created it"), .expect("event is valid, we just created it"),
), ),
); );
@ -574,7 +627,7 @@ impl Rooms {
self.pduid_pdu.insert( self.pduid_pdu.insert(
&pdu_id, &pdu_id,
&*serde_json::to_string(dbg!(&pdu_json)) &*serde_json::to_string(&pdu_json)
.expect("CanonicalJsonObject is always a valid String"), .expect("CanonicalJsonObject is always a valid String"),
)?; )?;
@ -706,71 +759,112 @@ impl Rooms {
/// Generates a new StateHash and associates it with the incoming event. /// Generates a new StateHash and associates it with the incoming event.
/// ///
/// This adds all current state events (not including the incoming event) /// This adds all current state events (not including the incoming event)
/// to `stateid_pduid` and adds the incoming event to `pduid_statehash`. /// to `stateid_pduid` and adds the incoming event to `eventid_statehash`.
/// The incoming event is the `pdu_id` passed to this method.
pub fn append_to_state( pub fn append_to_state(
&self, &self,
new_pdu_id: &[u8],
new_pdu: &PduEvent, new_pdu: &PduEvent,
globals: &super::globals::Globals, globals: &super::globals::Globals,
) -> Result<StateHashId> { ) -> Result<u64> {
let old_state = let old_state = if let Some(old_shortstatehash) =
if let Some(old_state_hash) = self.roomid_statehash.get(new_pdu.room_id.as_bytes())? { self.roomid_shortstatehash.get(new_pdu.room_id.as_bytes())?
// Store state for event. The state does not include the event itself. {
// Instead it's the state before the pdu, so the room's old state. // Store state for event. The state does not include the event itself.
self.pduid_statehash.insert(new_pdu_id, &old_state_hash)?; // Instead it's the state before the pdu, so the room's old state.
if new_pdu.state_key.is_none() {
return Ok(old_state_hash);
}
let mut prefix = old_state_hash.to_vec(); let shorteventid = match self.eventid_shorteventid.get(new_pdu.event_id.as_bytes())? {
prefix.push(0xff); Some(shorteventid) => shorteventid.to_vec(),
self.stateid_pduid
.scan_prefix(&prefix)
.filter_map(|pdu| pdu.map_err(|e| error!("{}", e)).ok())
// Chop the old state_hash out leaving behind the short key (u64)
.map(|(k, v)| (k.subslice(prefix.len(), k.len() - prefix.len()), v))
.collect::<HashMap<IVec, IVec>>()
} else {
HashMap::new()
};
if let Some(state_key) = &new_pdu.state_key {
let mut new_state = old_state;
let mut pdu_key = new_pdu.kind.as_ref().as_bytes().to_vec();
pdu_key.push(0xff);
pdu_key.extend_from_slice(state_key.as_bytes());
let short = match self.statekey_short.get(&pdu_key)? {
Some(short) => utils::u64_from_bytes(&short)
.map_err(|_| Error::bad_database("Invalid short bytes in statekey_short."))?,
None => { None => {
let short = globals.next_count()?; let shorteventid = globals.next_count()?;
self.statekey_short.insert(&pdu_key, &short.to_be_bytes())?; self.eventid_shorteventid
short .insert(new_pdu.event_id.as_bytes(), &shorteventid.to_be_bytes())?;
self.shorteventid_eventid
.insert(&shorteventid.to_be_bytes(), new_pdu.event_id.as_bytes())?;
shorteventid.to_be_bytes().to_vec()
} }
}; };
let new_pdu_id_short = new_pdu_id self.shorteventid_shortstatehash
.splitn(2, |&b| b == 0xff) .insert(shorteventid, &old_shortstatehash)?;
.nth(1) if new_pdu.state_key.is_none() {
.ok_or_else(|| Error::bad_database("Invalid pduid in state."))?; return utils::u64_from_bytes(&old_shortstatehash).map_err(|_| {
Error::bad_database("Invalid shortstatehash in roomid_shortstatehash.")
new_state.insert((&short.to_be_bytes()).into(), new_pdu_id_short.into()); });
let new_state_hash =
self.calculate_hash(&new_state.values().map(|b| &**b).collect::<Vec<_>>())?;
let mut key = new_state_hash.to_vec();
key.push(0xff);
for (short, short_pdu_id) in new_state {
let mut state_id = key.clone();
state_id.extend_from_slice(&short);
self.stateid_pduid.insert(&state_id, &short_pdu_id)?;
} }
Ok(new_state_hash) self.stateid_shorteventid
.scan_prefix(&old_shortstatehash)
.filter_map(|pdu| pdu.map_err(|e| error!("{}", e)).ok())
// Chop the old_shortstatehash out leaving behind the short state key
.map(|(k, v)| {
(
k.subslice(old_shortstatehash.len(), k.len() - old_shortstatehash.len()),
v,
)
})
.collect::<HashMap<IVec, IVec>>()
} else {
HashMap::new()
};
if let Some(state_key) = &new_pdu.state_key {
let mut new_state: HashMap<IVec, IVec> = old_state;
let mut new_state_key = new_pdu.kind.as_ref().as_bytes().to_vec();
new_state_key.push(0xff);
new_state_key.extend_from_slice(state_key.as_bytes());
let shortstatekey = match self.statekey_shortstatekey.get(&new_state_key)? {
Some(shortstatekey) => shortstatekey.to_vec(),
None => {
let shortstatekey = globals.next_count()?;
self.statekey_shortstatekey
.insert(&new_state_key, &shortstatekey.to_be_bytes())?;
shortstatekey.to_be_bytes().to_vec()
}
};
let shorteventid = match self.eventid_shorteventid.get(new_pdu.event_id.as_bytes())? {
Some(shorteventid) => shorteventid.to_vec(),
None => {
let shorteventid = globals.next_count()?;
self.eventid_shorteventid
.insert(new_pdu.event_id.as_bytes(), &shorteventid.to_be_bytes())?;
self.shorteventid_eventid
.insert(&shorteventid.to_be_bytes(), new_pdu.event_id.as_bytes())?;
shorteventid.to_be_bytes().to_vec()
}
};
new_state.insert(shortstatekey.into(), shorteventid.into());
let new_state_hash = self.calculate_hash(
&new_state
.values()
.map(|event_id| &**event_id)
.collect::<Vec<_>>(),
)?;
let shortstatehash = match self.statehash_shortstatehash.get(&new_state_hash)? {
Some(shortstatehash) => {
warn!("state hash already existed?!");
utils::u64_from_bytes(&shortstatehash)
.map_err(|_| Error::bad_database("PDU has invalid count bytes."))?
}
None => {
let shortstatehash = globals.next_count()?;
self.statehash_shortstatehash
.insert(&new_state_hash, &shortstatehash.to_be_bytes())?;
shortstatehash
}
};
for (shortstatekey, shorteventid) in new_state {
let mut state_id = shortstatehash.to_be_bytes().to_vec();
state_id.extend_from_slice(&shortstatekey);
self.stateid_shorteventid.insert(&state_id, &shorteventid)?;
}
Ok(shortstatehash)
} else { } else {
Err(Error::bad_database( Err(Error::bad_database(
"Tried to insert non-state event into room without a state.", "Tried to insert non-state event into room without a state.",
@ -778,9 +872,9 @@ impl Rooms {
} }
} }
pub fn set_room_state(&self, room_id: &RoomId, state_hash: &StateHashId) -> Result<()> { pub fn set_room_state(&self, room_id: &RoomId, shortstatehash: u64) -> Result<()> {
self.roomid_statehash self.roomid_shortstatehash
.insert(room_id.as_bytes(), state_hash)?; .insert(room_id.as_bytes(), &shortstatehash.to_be_bytes())?;
Ok(()) Ok(())
} }
@ -833,7 +927,7 @@ impl Rooms {
}, },
}) })
}, },
|(_, power_levels)| { |power_levels| {
Ok(serde_json::from_value::<Raw<PowerLevelsEventContent>>( Ok(serde_json::from_value::<Raw<PowerLevelsEventContent>>(
power_levels.content, power_levels.content,
) )
@ -844,18 +938,15 @@ impl Rooms {
)?; )?;
let sender_membership = self let sender_membership = self
.room_state_get(&room_id, &EventType::RoomMember, &sender.to_string())? .room_state_get(&room_id, &EventType::RoomMember, &sender.to_string())?
.map_or( .map_or(Ok::<_, Error>(member::MembershipState::Leave), |pdu| {
Ok::<_, Error>(member::MembershipState::Leave), Ok(
|(_, pdu)| { serde_json::from_value::<Raw<member::MemberEventContent>>(pdu.content)
Ok( .expect("Raw::from_value always works.")
serde_json::from_value::<Raw<member::MemberEventContent>>(pdu.content) .deserialize()
.expect("Raw::from_value always works.") .map_err(|_| Error::bad_database("Invalid Member event in db."))?
.deserialize() .membership,
.map_err(|_| Error::bad_database("Invalid Member event in db."))? )
.membership, })?;
)
},
)?;
let sender_power = power_levels.users.get(&sender).map_or_else( let sender_power = power_levels.users.get(&sender).map_or_else(
|| { || {
@ -936,7 +1027,7 @@ impl Rooms {
let mut unsigned = unsigned.unwrap_or_default(); let mut unsigned = unsigned.unwrap_or_default();
if let Some(state_key) = &state_key { if let Some(state_key) = &state_key {
if let Some((_, prev_pdu)) = self.room_state_get(&room_id, &event_type, &state_key)? { if let Some(prev_pdu) = self.room_state_get(&room_id, &event_type, &state_key)? {
unsigned.insert("prev_content".to_owned(), prev_pdu.content); unsigned.insert("prev_content".to_owned(), prev_pdu.content);
unsigned.insert( unsigned.insert(
"prev_sender".to_owned(), "prev_sender".to_owned(),
@ -1014,7 +1105,7 @@ impl Rooms {
// We append to state before appending the pdu, so we don't have a moment in time with the // We append to state before appending the pdu, so we don't have a moment in time with the
// pdu without it's state. This is okay because append_pdu can't fail. // pdu without it's state. This is okay because append_pdu can't fail.
let statehashid = self.append_to_state(&pdu_id, &pdu, &db.globals)?; let statehashid = self.append_to_state(&pdu, &db.globals)?;
// remove the // remove the
self.append_pdu( self.append_pdu(
@ -1030,7 +1121,7 @@ impl Rooms {
// We set the room state after inserting the pdu, so that we never have a moment in time // We set the room state after inserting the pdu, so that we never have a moment in time
// where events in the current room state do not exist // where events in the current room state do not exist
self.set_room_state(&room_id, &statehashid)?; self.set_room_state(&room_id, statehashid)?;
for server in self for server in self
.room_servers(room_id) .room_servers(room_id)
@ -1267,7 +1358,7 @@ impl Rooms {
// Check if the room has a predecessor // Check if the room has a predecessor
if let Some(predecessor) = self if let Some(predecessor) = self
.room_state_get(&room_id, &EventType::RoomCreate, "")? .room_state_get(&room_id, &EventType::RoomCreate, "")?
.and_then(|(_, create)| { .and_then(|create| {
serde_json::from_value::< serde_json::from_value::<
Raw<ruma::events::room::create::CreateEventContent>, Raw<ruma::events::room::create::CreateEventContent>,
>(create.content) >(create.content)

View File

@ -1301,7 +1301,7 @@ pub(crate) async fn build_forward_extremity_snapshots(
pub_key_map: &PublicKeyMap, pub_key_map: &PublicKeyMap,
auth_cache: &mut EventMap<Arc<PduEvent>>, auth_cache: &mut EventMap<Arc<PduEvent>>,
) -> Result<BTreeSet<StateMap<Arc<PduEvent>>>> { ) -> Result<BTreeSet<StateMap<Arc<PduEvent>>>> {
let current_hash = db.rooms.current_state_hash(pdu.room_id())?; let current_shortstatehash = db.rooms.current_shortstatehash(pdu.room_id())?;
let mut includes_current_state = false; let mut includes_current_state = false;
let mut fork_states = BTreeSet::new(); let mut fork_states = BTreeSet::new();
@ -1309,39 +1309,37 @@ pub(crate) async fn build_forward_extremity_snapshots(
if id == &pdu.event_id { if id == &pdu.event_id {
continue; continue;
} }
match db.rooms.get_pdu_id(id)? { match db.rooms.get_pdu(id)? {
// We can skip this because it is handled outside of this function // We can skip this because it is handled outside of this function
// The current server state and incoming event state are built to be // The current server state and incoming event state are built to be
// the state after. // the state after.
// This would be the incoming state from the server. // This would be the incoming state from the server.
Some(pduid) if db.rooms.get_pdu_from_id(&pduid)?.is_some() => { Some(leave_pdu) => {
let state_hash = db let pdu_shortstatehash = db
.rooms .rooms
.pdu_state_hash(&pduid)? .pdu_shortstatehash(&leave_pdu.event_id)?
.expect("found pdu with no statehash"); .ok_or_else(|| Error::bad_database("Found pdu with no statehash in db."))?;
if current_hash.as_ref() == Some(&state_hash) { if current_shortstatehash.as_ref() == Some(&pdu_shortstatehash) {
includes_current_state = true; includes_current_state = true;
} }
let mut state_before = db let mut state_before = db
.rooms .rooms
.state_full(pdu.room_id(), &state_hash)? .state_full(pdu.room_id(), pdu_shortstatehash)?
.into_iter() .into_iter()
.map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v))) .map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v)))
.collect::<StateMap<_>>(); .collect::<StateMap<_>>();
// Now it's the state after // Now it's the state after
if let Some(pdu) = db.rooms.get_pdu_from_id(&pduid)? { let key = (leave_pdu.kind.clone(), leave_pdu.state_key.clone());
let key = (pdu.kind.clone(), pdu.state_key()); state_before.insert(key, Arc::new(leave_pdu));
state_before.insert(key, Arc::new(pdu));
}
fork_states.insert(state_before); fork_states.insert(state_before);
} }
_ => { _ => {
error!("Missing state snapshot for {:?} - {:?}", id, pdu.kind()); error!("Missing state snapshot for {:?}", id);
return Err(Error::BadDatabase("Missing state snapshot.")); return Err(Error::bad_database("Missing state snapshot."));
} }
} }
} }
@ -1367,13 +1365,12 @@ pub(crate) fn update_resolved_state(
if let Some(state) = state { if let Some(state) = state {
let mut new_state = HashMap::new(); let mut new_state = HashMap::new();
for ((ev_type, state_k), pdu) in state { for ((ev_type, state_k), pdu) in state {
let long_id = db.rooms.get_long_id(&pdu.event_id)?;
new_state.insert( new_state.insert(
( (
ev_type, ev_type,
state_k.ok_or_else(|| Error::Conflict("State contained non state event"))?, state_k.ok_or_else(|| Error::Conflict("State contained non state event"))?,
), ),
long_id, pdu.event_id.clone(),
); );
} }
@ -1396,7 +1393,6 @@ pub(crate) fn append_incoming_pdu(
// We can tell if we need to do this based on wether state resolution took place or not // We can tell if we need to do this based on wether state resolution took place or not
let mut new_state = HashMap::new(); let mut new_state = HashMap::new();
for ((ev_type, state_k), state_pdu) in state { for ((ev_type, state_k), state_pdu) in state {
let long_id = db.rooms.get_long_id(state_pdu.event_id())?;
new_state.insert( new_state.insert(
( (
ev_type.clone(), ev_type.clone(),
@ -1404,7 +1400,7 @@ pub(crate) fn append_incoming_pdu(
.clone() .clone()
.ok_or_else(|| Error::Conflict("State contained non state event"))?, .ok_or_else(|| Error::Conflict("State contained non state event"))?,
), ),
long_id.to_vec(), state_pdu.event_id.clone(),
); );
} }
@ -1418,7 +1414,7 @@ pub(crate) fn append_incoming_pdu(
// We append to state before appending the pdu, so we don't have a moment in time with the // We append to state before appending the pdu, so we don't have a moment in time with the
// pdu without it's state. This is okay because append_pdu can't fail. // pdu without it's state. This is okay because append_pdu can't fail.
let state_hash = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?; let state_hash = db.rooms.append_to_state(&pdu, &db.globals)?;
db.rooms.append_pdu( db.rooms.append_pdu(
pdu, pdu,
@ -1429,7 +1425,7 @@ pub(crate) fn append_incoming_pdu(
&db, &db,
)?; )?;
db.rooms.set_room_state(pdu.room_id(), &state_hash)?; db.rooms.set_room_state(pdu.room_id(), state_hash)?;
for appservice in db.appservice.iter_all().filter_map(|r| r.ok()) { for appservice in db.appservice.iter_all().filter_map(|r| r.ok()) {
if let Some(namespaces) = appservice.1.get("namespaces") { if let Some(namespaces) = appservice.1.get("namespaces") {