Fix and integrate outlier tree, build forks after adding event to DB

This commit is contained in:
Devin Ragotzy 2021-01-29 21:45:33 -05:00
parent cd0c5c0566
commit 56b816a2be
3 changed files with 263 additions and 168 deletions

View File

@ -159,7 +159,7 @@ impl Database {
stateid_pduid: db.open_tree("stateid_pduid")?, stateid_pduid: db.open_tree("stateid_pduid")?,
pduid_statehash: db.open_tree("pduid_statehash")?, pduid_statehash: db.open_tree("pduid_statehash")?,
roomid_statehash: db.open_tree("roomid_statehash")?, roomid_statehash: db.open_tree("roomid_statehash")?,
eventid_outlierpdu: db.open_tree("eventid_outlierpdu")?, pduid_outlierpdu: db.open_tree("pduid_outlierpdu")?,
}, },
account_data: account_data::AccountData { account_data: account_data::AccountData {
roomuserdataid_accountdata: db.open_tree("roomuserdataid_accountdata")?, roomuserdataid_accountdata: db.open_tree("roomuserdataid_accountdata")?,

View File

@ -27,9 +27,10 @@ use std::{
convert::{TryFrom, TryInto}, convert::{TryFrom, TryInto},
mem, mem,
sync::Arc, sync::Arc,
time::Duration,
}; };
use super::admin::AdminCommand; use super::{admin::AdminCommand, sending::Sending};
/// The unique identifier of each state group. /// The unique identifier of each state group.
/// ///
@ -67,7 +68,7 @@ pub struct Rooms {
pub(super) stateid_pduid: sled::Tree, // StateId = StateHash + Short, PduId = Count (without roomid) pub(super) stateid_pduid: sled::Tree, // StateId = StateHash + Short, PduId = Count (without roomid)
/// Any pdu that has passed the steps up to auth with auth_events. /// Any pdu that has passed the steps up to auth with auth_events.
pub(super) eventid_outlierpdu: sled::Tree, pub(super) pduid_outlierpdu: sled::Tree,
} }
impl Rooms { impl Rooms {
@ -85,13 +86,20 @@ impl Rooms {
let mut pduid = room_id.as_bytes().to_vec(); let mut pduid = room_id.as_bytes().to_vec();
pduid.push(0xff); pduid.push(0xff);
pduid.extend_from_slice(&pduid_short?); pduid.extend_from_slice(&pduid_short?);
self.pduid_pdu.get(&pduid)?.map_or_else( match self.pduid_pdu.get(&pduid)? {
|| Err(Error::bad_database("Failed to find PDU in state snapshot.")), Some(b) => serde_json::from_slice::<PduEvent>(&b)
|b| { .map_err(|_| Error::bad_database("Invalid PDU in db.")),
serde_json::from_slice::<PduEvent>(&b) None => self
.map_err(|_| Error::bad_database("Invalid PDU in db.")) .pduid_outlierpdu
}, .get(pduid)?
) .map(|b| {
serde_json::from_slice::<PduEvent>(&b)
.map_err(|_| Error::bad_database("Invalid PDU in db."))
})
.ok_or_else(|| {
Error::bad_database("Event is not in pdu tree or outliers.")
})?,
}
}) })
.filter_map(|r| r.ok()) .filter_map(|r| r.ok())
.map(|pdu| { .map(|pdu| {
@ -137,12 +145,20 @@ impl Rooms {
Ok::<_, Error>(Some(( Ok::<_, Error>(Some((
pdu_id.clone().into(), pdu_id.clone().into(),
serde_json::from_slice::<PduEvent>( match self.pduid_pdu.get(&pdu_id)? {
&self.pduid_pdu.get(&pdu_id)?.ok_or_else(|| { Some(b) => serde_json::from_slice::<PduEvent>(&b)
Error::bad_database("PDU in state not found in database.") .map_err(|_| Error::bad_database("Invalid PDU in db."))?,
})?, None => self
) .pduid_outlierpdu
.map_err(|_| Error::bad_database("Invalid PDU bytes in room state."))?, .get(pdu_id)?
.map(|b| {
serde_json::from_slice::<PduEvent>(&b)
.map_err(|_| Error::bad_database("Invalid PDU in db."))
})
.ok_or_else(|| {
Error::bad_database("Event is not in pdu tree or outliers.")
})??,
},
))) )))
}) })
} else { } else {
@ -307,9 +323,12 @@ impl Rooms {
.get(event_id.as_bytes())? .get(event_id.as_bytes())?
.map_or(Ok(None), |pdu_id| { .map_or(Ok(None), |pdu_id| {
Ok(Some( Ok(Some(
serde_json::from_slice(&self.pduid_pdu.get(pdu_id)?.ok_or_else(|| { serde_json::from_slice(&match self.pduid_pdu.get(&pdu_id)? {
Error::bad_database("eventid_pduid points to nonexistent pdu.") Some(b) => b,
})?) None => self.pduid_outlierpdu.get(pdu_id)?.ok_or_else(|| {
Error::bad_database("Event is not in pdu tree or outliers.")
})?,
})
.map_err(|_| Error::bad_database("Invalid PDU in db."))?, .map_err(|_| Error::bad_database("Invalid PDU in db."))?,
)) ))
}) })
@ -328,13 +347,17 @@ impl Rooms {
.get(event_id.as_bytes())? .get(event_id.as_bytes())?
.map_or(Ok(None), |pdu_id| { .map_or(Ok(None), |pdu_id| {
Ok(Some( Ok(Some(
serde_json::from_slice(&self.pduid_pdu.get(pdu_id)?.ok_or_else(|| { serde_json::from_slice(&match self.pduid_pdu.get(&pdu_id)? {
Error::bad_database("eventid_pduid points to nonexistent pdu.") Some(b) => b,
})?) None => self.pduid_outlierpdu.get(pdu_id)?.ok_or_else(|| {
Error::bad_database("Event is not in pdu tree or outliers.")
})?,
})
.map_err(|_| Error::bad_database("Invalid PDU in db."))?, .map_err(|_| Error::bad_database("Invalid PDU in db."))?,
)) ))
}) })
} }
/// Returns the pdu. /// Returns the pdu.
pub fn get_pdu_from_id(&self, pdu_id: &IVec) -> Result<Option<PduEvent>> { pub fn get_pdu_from_id(&self, pdu_id: &IVec) -> Result<Option<PduEvent>> {
self.pduid_pdu.get(pdu_id)?.map_or(Ok(None), |pdu| { self.pduid_pdu.get(pdu_id)?.map_or(Ok(None), |pdu| {
@ -420,23 +443,27 @@ impl Rooms {
/// Returns the pdu from the outlier tree. /// Returns the pdu from the outlier tree.
pub fn get_pdu_outlier(&self, event_id: &EventId) -> Result<Option<PduEvent>> { pub fn get_pdu_outlier(&self, event_id: &EventId) -> Result<Option<PduEvent>> {
self.eventid_outlierpdu if let Some(id) = self.eventid_pduid.get(event_id.as_bytes())? {
.get(event_id.as_bytes())? self.pduid_outlierpdu.get(id)?.map_or(Ok(None), |pdu| {
.map_or(Ok(None), |pdu| { serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db."))
Ok(Some(
serde_json::from_slice(&pdu)
.map_err(|_| Error::bad_database("Invalid PDU in db."))?,
))
}) })
} else {
Ok(None)
}
} }
/// Returns true if the event_id was previously inserted. /// Returns true if the event_id was previously inserted.
pub fn append_pdu_outlier(&self, event_id: &EventId, pdu: &PduEvent) -> Result<bool> { pub fn append_pdu_outlier(&self, pdu_id: &[u8], pdu: &PduEvent) -> Result<bool> {
log::info!("Number of outlier pdu's {}", self.eventid_outlierpdu.len()); log::info!("Number of outlier pdu's {}", self.pduid_outlierpdu.len());
// we need to be able to find it by event_id
self.eventid_pduid
.insert(pdu.event_id.as_bytes(), &*pdu_id)?;
let res = self let res = self
.eventid_outlierpdu .pduid_outlierpdu
.insert( .insert(
event_id.as_bytes(), pdu_id,
&*serde_json::to_string(&pdu).expect("PduEvent is always a valid String"), &*serde_json::to_string(&pdu).expect("PduEvent is always a valid String"),
) )
.map(|op| op.is_some())?; .map(|op| op.is_some())?;
@ -484,7 +511,9 @@ impl Rooms {
} }
// We no longer keep this pdu as an outlier // We no longer keep this pdu as an outlier
self.eventid_outlierpdu.remove(pdu.event_id().as_bytes())?; if let Some(id) = self.eventid_pduid.remove(pdu.event_id().as_bytes())? {
self.pduid_outlierpdu.remove(id)?;
}
self.replace_pdu_leaves(&pdu.room_id, leaves)?; self.replace_pdu_leaves(&pdu.room_id, leaves)?;

View File

@ -614,7 +614,7 @@ pub async fn send_transaction_message_route<'a>(
// 7. if not timeline event: stop // 7. if not timeline event: stop
// TODO; 8. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events // TODO; 8. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
// the events found in step 8 can be authed/resolved and appended to the DB // the events found in step 8 can be authed/resolved and appended to the DB
let (pdu, previous): (_, Vec<Arc<PduEvent>>) = match validate_event( let (pdu, previous): (Arc<PduEvent>, Vec<Arc<PduEvent>>) = match validate_event(
&db, &db,
value, value,
event_id.clone(), event_id.clone(),
@ -638,69 +638,75 @@ pub async fn send_transaction_message_route<'a>(
None None
}; };
let count = db.globals.next_count()?;
let mut pdu_id = pdu.room_id.as_bytes().to_vec();
pdu_id.push(0xff);
pdu_id.extend_from_slice(&count.to_be_bytes());
// 6. persist the event as an outlier. // 6. persist the event as an outlier.
db.rooms.append_pdu_outlier(pdu.event_id(), &pdu)?; db.rooms.append_pdu_outlier(&pdu_id, &pdu)?;
// Step 9. fetch missing state by calling /state_ids at backwards extremities doing all // Step 9. fetch missing state by calling /state_ids at backwards extremities doing all
// the checks in this list starting at 1. These are not timeline events. // the checks in this list starting at 1. These are not timeline events.
// //
// Step 10. check the auth of the event passes based on the calculated state of the event // Step 10. check the auth of the event passes based on the calculated state of the event
let (state_at_event, incoming_auth_events): (StateMap<Arc<PduEvent>>, Vec<Arc<PduEvent>>) = let (mut state_at_event, incoming_auth_events): (
match db StateMap<Arc<PduEvent>>,
.sending Vec<Arc<PduEvent>>,
.send_federation_request( ) = match db
&db.globals, .sending
.send_federation_request(
&db.globals,
server_name,
get_room_state_ids::v1::Request {
room_id: pdu.room_id(),
event_id: pdu.event_id(),
},
)
.await
{
Ok(res) => {
let state = fetch_events(
&db,
server_name, server_name,
get_room_state_ids::v1::Request { &pub_key_map,
room_id: pdu.room_id(), &res.pdu_ids,
event_id: pdu.event_id(), &mut auth_cache,
},
) )
.await .await?;
{ // Sanity check: there are no conflicting events in the state we received
Ok(res) => { let mut seen = BTreeSet::new();
let state = fetch_events( for ev in &state {
// If the key is already present
if !seen.insert((&ev.kind, &ev.state_key)) {
todo!("Server sent us an invalid state")
}
}
let state = state
.into_iter()
.map(|pdu| ((pdu.kind.clone(), pdu.state_key.clone()), pdu))
.collect();
(
state,
fetch_events(
&db, &db,
server_name, server_name,
&pub_key_map, &pub_key_map,
&res.pdu_ids, &res.auth_chain_ids,
&mut auth_cache, &mut auth_cache,
) )
.await?; .await?,
// Sanity check: there are no conflicting events in the state we received )
let mut seen = BTreeSet::new(); }
for ev in &state { Err(_) => {
// If the key is already present resolved_map.insert(
if !seen.insert((&ev.kind, &ev.state_key)) { pdu.event_id().clone(),
todo!("Server sent us an invalid state") Err("Fetching state for event failed".into()),
} );
} continue;
}
let state = state };
.into_iter()
.map(|pdu| ((pdu.kind.clone(), pdu.state_key.clone()), pdu))
.collect();
(
state,
fetch_events(
&db,
server_name,
&pub_key_map,
&res.auth_chain_ids,
&mut auth_cache,
)
.await?,
)
}
Err(_) => {
resolved_map.insert(
pdu.event_id().clone(),
Err("Fetching state for event failed".into()),
);
continue;
}
};
// 10. This is the actual auth check for state at the event // 10. This is the actual auth check for state at the event
if !state_res::event_auth::auth_check( if !state_res::event_auth::auth_check(
@ -750,12 +756,25 @@ pub async fn send_transaction_message_route<'a>(
// //
// calculate_forward_extremities takes care of adding the current state if not already in the state sets // calculate_forward_extremities takes care of adding the current state if not already in the state sets
// it also calculates the new pdu leaves for the `roomid_pduleaves` DB Tree. // it also calculates the new pdu leaves for the `roomid_pduleaves` DB Tree.
let (mut fork_states, extremities) = match calculate_forward_extremities( let extremities = match calculate_forward_extremities(&db, &pdu).await {
Ok(fork_ids) => fork_ids,
Err(_) => {
resolved_map.insert(event_id, Err("Failed to gather forward extremities".into()));
continue;
}
};
// Now that the event has passed all auth it is added into the timeline, we do have to
// find the leaves otherwise we would do this sooner
append_incoming_pdu(&db, &pdu, &extremities, &state_at_event)?;
let mut fork_states = match build_forward_extremity_snapshots(
&db, &db,
&pdu, pdu.room_id(),
server_name, server_name,
&pub_key_map,
current_state, current_state,
&extremities,
&pub_key_map,
&mut auth_cache, &mut auth_cache,
) )
.await .await
@ -767,6 +786,9 @@ pub async fn send_transaction_message_route<'a>(
} }
}; };
// Make this the state after (since we appended_incoming_pdu this should agree with our servers
// current state).
state_at_event.insert((pdu.kind(), pdu.state_key()), pdu.clone());
// add the incoming events to the mix of state snapshots // add the incoming events to the mix of state snapshots
// Since we are using a BTreeSet (yea this may be overkill) we guarantee unique state sets // Since we are using a BTreeSet (yea this may be overkill) we guarantee unique state sets
fork_states.insert(state_at_event.clone()); fork_states.insert(state_at_event.clone());
@ -840,7 +862,7 @@ pub async fn send_transaction_message_route<'a>(
); );
let res = match state_res::StateResolution::resolve( let res = match state_res::StateResolution::resolve(
&pdu.room_id, pdu.room_id(),
&RoomVersionId::Version6, &RoomVersionId::Version6,
&fork_states &fork_states
.into_iter() .into_iter()
@ -865,6 +887,7 @@ pub async fn send_transaction_message_route<'a>(
continue 'main_pdu_loop; continue 'main_pdu_loop;
} }
}; };
let mut resolved = BTreeMap::new(); let mut resolved = BTreeMap::new();
for (k, id) in res { for (k, id) in res {
// We should know of the event but just incase // We should know of the event but just incase
@ -890,10 +913,9 @@ pub async fn send_transaction_message_route<'a>(
}; };
// Add the event to the DB and update the forward extremities (via roomid_pduleaves). // Add the event to the DB and update the forward extremities (via roomid_pduleaves).
append_incoming_pdu( update_resolved_state(
&db, &db,
&pdu, pdu.room_id(),
&extremities,
if update_state { if update_state {
Some(state_at_forks) Some(state_at_forks)
} else { } else {
@ -905,7 +927,10 @@ pub async fn send_transaction_message_route<'a>(
resolved_map.insert(pdu.event_id().clone(), Ok(())); resolved_map.insert(pdu.event_id().clone(), Ok(()));
} }
Ok(send_transaction_message::v1::Response { pdus: resolved_map }.into()) Ok(send_transaction_message::v1::Response {
pdus: dbg!(resolved_map),
}
.into())
} }
/// An async function that can recursively calls itself. /// An async function that can recursively calls itself.
@ -1036,13 +1061,14 @@ async fn fetch_check_auth_events(
Ok(()) Ok(())
} }
/// Find the event and auth it. /// Find the event and auth it. Once the event is validated (steps 1 - 8)
/// it is appended to the outliers Tree.
/// ///
/// 1. Look in the main timeline (pduid_pdu tree) /// 1. Look in the main timeline (pduid_pdu tree)
/// 2. Look at outlier pdu tree /// 2. Look at outlier pdu tree
/// 3. Ask origin server over federation /// 3. Ask origin server over federation
/// 4. TODO: Ask other servers over federation? /// 4. TODO: Ask other servers over federation?
async fn fetch_events( pub(crate) async fn fetch_events(
db: &Database, db: &Database,
origin: &ServerName, origin: &ServerName,
key_map: &PublicKeyMap, key_map: &PublicKeyMap,
@ -1071,6 +1097,13 @@ async fn fetch_events(
.await .await
.map_err(|_| Error::Conflict("Authentication of event failed"))?; .map_err(|_| Error::Conflict("Authentication of event failed"))?;
// create the pduid for this event but stick it in the outliers DB
let count = db.globals.next_count()?;
let mut pdu_id = pdu.room_id.as_bytes().to_vec();
pdu_id.push(0xff);
pdu_id.extend_from_slice(&count.to_be_bytes());
db.rooms.append_pdu_outlier(&pdu_id, &pdu)?;
pdu pdu
} }
Err(_) => return Err(Error::BadServerResponse("Failed to fetch event")), Err(_) => return Err(Error::BadServerResponse("Failed to fetch event")),
@ -1084,7 +1117,7 @@ async fn fetch_events(
/// Search the DB for the signing keys of the given server, if we don't have them /// Search the DB for the signing keys of the given server, if we don't have them
/// fetch them from the server and save to our DB. /// fetch them from the server and save to our DB.
async fn fetch_signing_keys( pub(crate) async fn fetch_signing_keys(
db: &Database, db: &Database,
origin: &ServerName, origin: &ServerName,
) -> Result<BTreeMap<ServerSigningKeyId, VerifyKey>> { ) -> Result<BTreeMap<ServerSigningKeyId, VerifyKey>> {
@ -1108,26 +1141,28 @@ async fn fetch_signing_keys(
/// where one of the inputs was a previously trusted set of state, don't just trust a set of state we got from a remote). /// where one of the inputs was a previously trusted set of state, don't just trust a set of state we got from a remote).
/// ///
/// The state snapshot of the incoming event __needs__ to be added to the resulting list. /// The state snapshot of the incoming event __needs__ to be added to the resulting list.
async fn calculate_forward_extremities( pub(crate) async fn calculate_forward_extremities(
db: &Database, db: &Database,
pdu: &PduEvent, pdu: &PduEvent,
origin: &ServerName, ) -> Result<Vec<EventId>> {
pub_key_map: &PublicKeyMap,
current_state: BTreeMap<(EventType, Option<String>), Arc<PduEvent>>,
auth_cache: &mut EventMap<Arc<PduEvent>>,
) -> Result<(BTreeSet<StateMap<Arc<PduEvent>>>, Vec<EventId>)> {
let mut current_leaves = db.rooms.get_pdu_leaves(pdu.room_id())?; let mut current_leaves = db.rooms.get_pdu_leaves(pdu.room_id())?;
let mut is_incoming_leaf = true; let mut is_incoming_leaf = true;
// Make sure the incoming event is not already a forward extremity // Make sure the incoming event is not already a forward extremity
// FIXME: I think this could happen if different servers send us the same event?? // FIXME: I think this could happen if different servers send us the same event??
// if current_leaves.contains(pdu.event_id()) {
is_incoming_leaf = false;
// Not sure what to do here
}
// If the incoming event is already referenced by an existing event // If the incoming event is already referenced by an existing event
// then do nothing - it's not a candidate to be a new extremity if // then do nothing - it's not a candidate to be a new extremity if
// it has been referenced. // it has been referenced.
if current_leaves.contains(pdu.event_id()) || db.rooms.get_pdu_id(pdu.event_id())?.is_some() { //
is_incoming_leaf = false; // We first check if know of the event and then don't include it as a forward
// Not sure what to do here // extremity if it is a timeline event
if db.rooms.get_pdu_id(pdu.event_id())?.is_some() {
is_incoming_leaf = db.rooms.get_pdu_outlier(pdu.event_id())?.is_some();
} }
// TODO: // TODO:
@ -1144,11 +1179,34 @@ async fn calculate_forward_extremities(
} }
} }
let current_hash = db.rooms.current_state_hash(pdu.room_id())?; // Add the incoming event only if it is a leaf, we do this after fetching all the
// state since we know we have already fetched the state of the incoming event so lets
// not do it again!
if is_incoming_leaf {
current_leaves.push(pdu.event_id().clone());
}
Ok(current_leaves)
}
/// This should always be called after the incoming event has been appended to the DB.
///
/// This guarentees that the incoming event will be in the state sets (at least our servers
/// and the sending server).
pub(crate) async fn build_forward_extremity_snapshots(
db: &Database,
room_id: &RoomId,
origin: &ServerName,
current_state: StateMap<Arc<PduEvent>>,
current_leaves: &[EventId],
pub_key_map: &PublicKeyMap,
auth_cache: &mut EventMap<Arc<PduEvent>>,
) -> Result<BTreeSet<StateMap<Arc<PduEvent>>>> {
let current_hash = db.rooms.current_state_hash(room_id)?;
let mut includes_current_state = false; let mut includes_current_state = false;
let mut fork_states = BTreeSet::new(); let mut fork_states = BTreeSet::new();
for id in &current_leaves { for id in current_leaves {
if let Some(id) = db.rooms.get_pdu_id(id)? { if let Some(id) = db.rooms.get_pdu_id(id)? {
let state_hash = db let state_hash = db
.rooms .rooms
@ -1158,14 +1216,21 @@ async fn calculate_forward_extremities(
if current_hash.as_ref() == Some(&state_hash) { if current_hash.as_ref() == Some(&state_hash) {
includes_current_state = true; includes_current_state = true;
} }
let state = db
let mut state_before = db
.rooms .rooms
.state_full(&pdu.room_id, &state_hash)? .state_full(room_id, &state_hash)?
.into_iter() .into_iter()
.map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v))) .map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v)))
.collect(); .collect::<StateMap<_>>();
fork_states.insert(state); // Now it's the state after
if let Some(pdu) = db.rooms.get_pdu_from_id(&id)? {
let key = (pdu.kind.clone(), pdu.state_key());
state_before.insert(key, Arc::new(pdu));
}
fork_states.insert(state_before);
} else { } else {
let res = db let res = db
.sending .sending
@ -1173,7 +1238,7 @@ async fn calculate_forward_extremities(
&db.globals, &db.globals,
origin, origin,
get_room_state_ids::v1::Request { get_room_state_ids::v1::Request {
room_id: pdu.room_id(), room_id,
event_id: id, event_id: id,
}, },
) )
@ -1181,41 +1246,38 @@ async fn calculate_forward_extremities(
// TODO: This only adds events to the auth_cache, there is for sure a better way to // TODO: This only adds events to the auth_cache, there is for sure a better way to
// do this... // do this...
fetch_events(&db, origin, &pub_key_map, &res.auth_chain_ids, auth_cache).await?; fetch_events(&db, origin, pub_key_map, &res.auth_chain_ids, auth_cache).await?;
let state = fetch_events(&db, origin, &pub_key_map, &res.pdu_ids, auth_cache) let mut state_before = fetch_events(&db, origin, pub_key_map, &res.pdu_ids, auth_cache)
.await? .await?
.into_iter() .into_iter()
.map(|pdu| ((pdu.kind.clone(), pdu.state_key.clone()), pdu)) .map(|pdu| ((pdu.kind.clone(), pdu.state_key.clone()), pdu))
.collect(); .collect::<StateMap<_>>();
fork_states.insert(state); if let Some(pdu) = fetch_events(db, origin, pub_key_map, &[id.clone()], auth_cache)
.await?
.pop()
{
let key = (pdu.kind.clone(), pdu.state_key());
state_before.insert(key, pdu);
}
// Now it's the state after
fork_states.insert(state_before);
} }
} }
// Add the incoming event only if it is a leaf, we do this after fetching all the
// state since we know we have already fetched the state of the incoming event so lets
// not do it again!
if is_incoming_leaf {
current_leaves.push(pdu.event_id().clone());
}
// This guarantees that our current room state is included // This guarantees that our current room state is included
if !includes_current_state && current_hash.is_some() { if !includes_current_state && current_hash.is_some() {
fork_states.insert(current_state); fork_states.insert(current_state);
} }
Ok((fork_states, current_leaves)) Ok(fork_states)
} }
/// Update the room state to be the resolved state and add the fully auth'ed event pub(crate) fn update_resolved_state(
/// to the DB.
///
/// TODO: Since all these events passed state resolution can we trust them to add
fn append_incoming_pdu(
db: &Database, db: &Database,
pdu: &PduEvent, room_id: &RoomId,
new_room_leaves: &[EventId],
state: Option<StateMap<Arc<PduEvent>>>, state: Option<StateMap<Arc<PduEvent>>>,
) -> Result<()> { ) -> Result<()> {
// Update the state of the room if needed // Update the state of the room if needed
@ -1236,44 +1298,50 @@ fn append_incoming_pdu(
); );
} }
None => { None => {
let count = db.globals.next_count()?; error!("We didn't append an event as an outlier\n{:?}", pdu);
let mut pdu_id = pdu.room_id.as_bytes().to_vec();
pdu_id.push(0xff);
pdu_id.extend_from_slice(&count.to_be_bytes());
// TODO: can we use are current state if we just add this event to the end of our
// pduid_pdu tree??
let statehashid = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?;
db.rooms.append_pdu(
&*pdu,
utils::to_canonical_object(&*pdu).expect("Pdu is valid canonical object"),
count,
pdu_id.clone().into(),
&new_room_leaves,
&db,
)?;
// TODO: is this ok...
db.rooms.set_room_state(&pdu.room_id, &statehashid)?;
new_state.insert(
(
ev_type,
state_k.ok_or_else(|| {
Error::Conflict("State contained non state event")
})?,
),
pdu_id.to_vec(),
);
} }
} }
} }
info!("Force update of state for {:?}", pdu); db.rooms.force_state(room_id, new_state, &db.globals)?;
db.rooms
.force_state(pdu.room_id(), new_state, &db.globals)?;
} }
Ok(())
}
/// Append the incoming event setting the state snapshot to the state from the
/// server that sent the event.
pub(crate) fn append_incoming_pdu(
db: &Database,
pdu: &PduEvent,
new_room_leaves: &[EventId],
state: &StateMap<Arc<PduEvent>>,
) -> Result<()> {
// Update the state of the room if needed
// We can tell if we need to do this based on wether state resolution took place or not
let mut new_state = HashMap::new();
for ((ev_type, state_k), pdu) in state {
match db.rooms.get_pdu_id(pdu.event_id())? {
Some(pduid) => {
new_state.insert(
(
ev_type.clone(),
state_k
.clone()
.ok_or_else(|| Error::Conflict("State contained non state event"))?,
),
pduid.to_vec(),
);
}
None => {
error!("We didn't append an event as an outlier\n{:?}", pdu);
}
}
}
db.rooms
.force_state(pdu.room_id(), new_state, &db.globals)?;
let count = db.globals.next_count()?; let count = db.globals.next_count()?;
let mut pdu_id = pdu.room_id.as_bytes().to_vec(); let mut pdu_id = pdu.room_id.as_bytes().to_vec();
pdu_id.push(0xff); pdu_id.push(0xff);
@ -1281,7 +1349,7 @@ fn append_incoming_pdu(
// We append to state before appending the pdu, so we don't have a moment in time with the // We append to state before appending the pdu, so we don't have a moment in time with the
// pdu without it's state. This is okay because append_pdu can't fail. // pdu without it's state. This is okay because append_pdu can't fail.
let statehashid = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?; let state_hash = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?;
db.rooms.append_pdu( db.rooms.append_pdu(
pdu, pdu,
@ -1292,9 +1360,7 @@ fn append_incoming_pdu(
&db, &db,
)?; )?;
// We set the room state after inserting the pdu, so that we never have a moment in time db.rooms.set_room_state(pdu.room_id(), &state_hash)?;
// where events in the current room state do not exist
db.rooms.set_room_state(&pdu.room_id, &statehashid)?;
for appservice in db.appservice.iter_all().filter_map(|r| r.ok()) { for appservice in db.appservice.iter_all().filter_map(|r| r.ok()) {
db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?; db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?;