diff --git a/src/client_server/context.rs b/src/client_server/context.rs index 9bfec9e1..94a44e39 100644 --- a/src/client_server/context.rs +++ b/src/client_server/context.rs @@ -1,5 +1,9 @@ use crate::{database::DatabaseGuard, ConduitResult, Error, Ruma}; -use ruma::api::client::{error::ErrorKind, r0::context::get_context}; +use ruma::{ + api::client::{error::ErrorKind, r0::context::get_context}, + events::EventType, +}; +use std::collections::HashSet; use std::convert::TryFrom; #[cfg(feature = "conduit_bin")] @@ -21,6 +25,7 @@ pub async fn get_context_route( body: Ruma>, ) -> ConduitResult { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); + let sender_device = body.sender_device.as_ref().expect("user is authenticated"); if !db.rooms.is_joined(sender_user, &body.room_id)? { return Err(Error::BadRequest( @@ -29,6 +34,8 @@ pub async fn get_context_route( )); } + let mut lazy_loaded = HashSet::new(); + let base_pdu_id = db .rooms .get_pdu_id(&body.event_id)? @@ -45,8 +52,18 @@ pub async fn get_context_route( .ok_or(Error::BadRequest( ErrorKind::NotFound, "Base event not found.", - ))? - .to_room_event(); + ))?; + + if !db.rooms.lazy_load_was_sent_before( + &sender_user, + &sender_device, + &body.room_id, + &base_event.sender, + )? { + lazy_loaded.insert(base_event.sender.clone()); + } + + let base_event = base_event.to_room_event(); let events_before: Vec<_> = db .rooms @@ -60,6 +77,17 @@ pub async fn get_context_route( .filter_map(|r| r.ok()) // Remove buggy events .collect(); + for (_, event) in &events_before { + if !db.rooms.lazy_load_was_sent_before( + &sender_user, + &sender_device, + &body.room_id, + &event.sender, + )? { + lazy_loaded.insert(event.sender.clone()); + } + } + let start_token = events_before .last() .and_then(|(pdu_id, _)| db.rooms.pdu_count(pdu_id).ok()) @@ -82,6 +110,17 @@ pub async fn get_context_route( .filter_map(|r| r.ok()) // Remove buggy events .collect(); + for (_, event) in &events_after { + if !db.rooms.lazy_load_was_sent_before( + &sender_user, + &sender_device, + &body.room_id, + &event.sender, + )? { + lazy_loaded.insert(event.sender.clone()); + } + } + let end_token = events_after .last() .and_then(|(pdu_id, _)| db.rooms.pdu_count(pdu_id).ok()) @@ -92,18 +131,23 @@ pub async fn get_context_route( .map(|(_, pdu)| pdu.to_room_event()) .collect(); + let mut state = Vec::new(); + for ll_id in &lazy_loaded { + if let Some(member_event) = + db.rooms + .room_state_get(&body.room_id, &EventType::RoomMember, ll_id.as_str())? + { + state.push(member_event.to_state_event()); + } + } + let resp = get_context::Response { start: start_token, end: end_token, events_before, event: Some(base_event), events_after, - state: db // TODO: State at event - .rooms - .room_state_full(&body.room_id)? - .values() - .map(|pdu| pdu.to_state_event()) - .collect(), + state, }; Ok(resp.into()) diff --git a/src/client_server/message.rs b/src/client_server/message.rs index cbce019e..48ca4ae8 100644 --- a/src/client_server/message.rs +++ b/src/client_server/message.rs @@ -6,7 +6,11 @@ use ruma::{ }, events::EventType, }; -use std::{collections::BTreeMap, convert::TryInto, sync::Arc}; +use std::{ + collections::{BTreeMap, HashSet}, + convert::TryInto, + sync::Arc, +}; #[cfg(feature = "conduit_bin")] use rocket::{get, put}; @@ -117,6 +121,7 @@ pub async fn get_message_events_route( body: Ruma>, ) -> ConduitResult { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); + let sender_device = body.sender_device.as_ref().expect("user is authenticated"); if !db.rooms.is_joined(sender_user, &body.room_id)? { return Err(Error::BadRequest( @@ -136,6 +141,12 @@ pub async fn get_message_events_route( // Use limit or else 10 let limit = body.limit.try_into().map_or(10_usize, |l: u32| l as usize); + let next_token; + + let mut resp = get_message_events::Response::new(); + + let mut lazy_loaded = HashSet::new(); + match body.dir { get_message_events::Direction::Forward => { let events_after: Vec<_> = db @@ -152,21 +163,27 @@ pub async fn get_message_events_route( .take_while(|&(k, _)| Some(Ok(k)) != to) // Stop at `to` .collect(); - let end_token = events_after.last().map(|(count, _)| count.to_string()); + for (_, event) in &events_after { + if !db.rooms.lazy_load_was_sent_before( + &sender_user, + &sender_device, + &body.room_id, + &event.sender, + )? { + lazy_loaded.insert(event.sender.clone()); + } + } + + next_token = events_after.last().map(|(count, _)| count).copied(); let events_after: Vec<_> = events_after .into_iter() .map(|(_, pdu)| pdu.to_room_event()) .collect(); - let resp = get_message_events::Response { - start: body.from.to_owned(), - end: end_token, - chunk: events_after, - state: Vec::new(), - }; - - Ok(resp.into()) + resp.start = body.from.to_owned(); + resp.end = next_token.map(|count| count.to_string()); + resp.chunk = events_after; } get_message_events::Direction::Backward => { let events_before: Vec<_> = db @@ -183,21 +200,51 @@ pub async fn get_message_events_route( .take_while(|&(k, _)| Some(Ok(k)) != to) // Stop at `to` .collect(); - let start_token = events_before.last().map(|(count, _)| count.to_string()); + for (_, event) in &events_before { + if !db.rooms.lazy_load_was_sent_before( + &sender_user, + &sender_device, + &body.room_id, + &event.sender, + )? { + lazy_loaded.insert(event.sender.clone()); + } + } + + next_token = events_before.last().map(|(count, _)| count).copied(); let events_before: Vec<_> = events_before .into_iter() .map(|(_, pdu)| pdu.to_room_event()) .collect(); - let resp = get_message_events::Response { - start: body.from.to_owned(), - end: start_token, - chunk: events_before, - state: Vec::new(), - }; - - Ok(resp.into()) + resp.start = body.from.to_owned(); + resp.end = next_token.map(|count| count.to_string()); + resp.chunk = events_before; } } + + db.rooms + .lazy_load_confirm_delivery(&sender_user, &sender_device, &body.room_id, from)?; + resp.state = Vec::new(); + for ll_id in &lazy_loaded { + if let Some(member_event) = + db.rooms + .room_state_get(&body.room_id, &EventType::RoomMember, ll_id.as_str())? + { + resp.state.push(member_event.to_state_event()); + } + } + + if let Some(next_token) = next_token { + db.rooms.lazy_load_mark_sent( + &sender_user, + &sender_device, + &body.room_id, + lazy_loaded.into_iter().collect(), + next_token, + ); + } + + Ok(resp.into()) } diff --git a/src/client_server/sync.rs b/src/client_server/sync.rs index 64588a2c..88bf8614 100644 --- a/src/client_server/sync.rs +++ b/src/client_server/sync.rs @@ -264,6 +264,14 @@ async fn sync_helper( // limited unless there are events in non_timeline_pdus let limited = non_timeline_pdus.next().is_some(); + let mut timeline_users = HashSet::new(); + for (_, event) in &timeline_pdus { + timeline_users.insert(event.sender.as_str().to_owned()); + } + + db.rooms + .lazy_load_confirm_delivery(&sender_user, &sender_device, &room_id, since)?; + // Database queries: let current_shortstatehash = db @@ -344,14 +352,51 @@ async fn sync_helper( state_events, ) = if since_shortstatehash.is_none() { // Probably since = 0, we will do an initial sync + let (joined_member_count, invited_member_count, heroes) = calculate_counts()?; let current_state_ids = db.rooms.state_full_ids(current_shortstatehash)?; - let state_events: Vec<_> = current_state_ids - .iter() - .map(|(_, id)| db.rooms.get_pdu(id)) - .filter_map(|r| r.ok().flatten()) - .collect(); + + let mut state_events = Vec::new(); + let mut lazy_loaded = Vec::new(); + + for (_, id) in current_state_ids { + let pdu = match db.rooms.get_pdu(&id)? { + Some(pdu) => pdu, + None => { + error!("Pdu in state not found: {}", id); + continue; + } + }; + let state_key = pdu + .state_key + .as_ref() + .expect("state events have state keys"); + if pdu.kind != EventType::RoomMember { + state_events.push(pdu); + } else if full_state || timeline_users.contains(state_key) { + // TODO: check filter: is ll enabled? + lazy_loaded.push( + UserId::parse(state_key.as_ref()) + .expect("they are in timeline_users, so they should be correct"), + ); + state_events.push(pdu); + } + } + + // Reset lazy loading because this is an initial sync + db.rooms + .lazy_load_reset(&sender_user, &sender_device, &room_id)?; + + // The state_events above should contain all timeline_users, let's mark them as lazy + // loaded. + db.rooms.lazy_load_mark_sent( + &sender_user, + &sender_device, + &room_id, + lazy_loaded, + next_batch, + ); ( heroes, @@ -387,20 +432,66 @@ async fn sync_helper( let since_state_ids = db.rooms.state_full_ids(since_shortstatehash)?; - let state_events = if joined_since_last_sync { + /* + let state_events = if joined_since_last_sync || full_state { current_state_ids .iter() .map(|(_, id)| db.rooms.get_pdu(id)) .filter_map(|r| r.ok().flatten()) .collect::>() } else { - current_state_ids - .iter() - .filter(|(key, id)| since_state_ids.get(key) != Some(id)) - .map(|(_, id)| db.rooms.get_pdu(id)) - .filter_map(|r| r.ok().flatten()) - .collect() - }; + */ + let mut state_events = Vec::new(); + let mut lazy_loaded = Vec::new(); + + for (key, id) in current_state_ids { + let pdu = match db.rooms.get_pdu(&id)? { + Some(pdu) => pdu, + None => { + error!("Pdu in state not found: {}", id); + continue; + } + }; + + let state_key = pdu + .state_key + .as_ref() + .expect("state events have state keys"); + + if pdu.kind != EventType::RoomMember { + if full_state || since_state_ids.get(&key) != Some(&id) { + state_events.push(pdu); + } + continue; + } + + // Pdu has to be a member event + let state_key_userid = UserId::parse(state_key.as_ref()) + .expect("they are in timeline_users, so they should be correct"); + + if full_state || since_state_ids.get(&key) != Some(&id) { + lazy_loaded.push(state_key_userid); + state_events.push(pdu); + } else if timeline_users.contains(state_key) + && !db.rooms.lazy_load_was_sent_before( + &sender_user, + &sender_device, + &room_id, + &state_key_userid, + )? + { + lazy_loaded.push(state_key_userid); + state_events.push(pdu); + } + } + + db.rooms.lazy_load_mark_sent( + &sender_user, + &sender_device, + &room_id, + lazy_loaded, + next_batch, + ); let encrypted_room = db .rooms diff --git a/src/database.rs b/src/database.rs index af6136b3..9e020198 100644 --- a/src/database.rs +++ b/src/database.rs @@ -288,6 +288,8 @@ impl Database { userroomid_leftstate: builder.open_tree("userroomid_leftstate")?, roomuserid_leftcount: builder.open_tree("roomuserid_leftcount")?, + lazyloadedids: builder.open_tree("lazyloadedids")?, + userroomid_notificationcount: builder.open_tree("userroomid_notificationcount")?, userroomid_highlightcount: builder.open_tree("userroomid_highlightcount")?, @@ -323,6 +325,7 @@ impl Database { statekeyshort_cache: Mutex::new(LruCache::new(1_000_000)), our_real_users_cache: RwLock::new(HashMap::new()), appservice_in_room_cache: RwLock::new(HashMap::new()), + lazy_load_waiting: Mutex::new(HashMap::new()), stateinfo_cache: Mutex::new(LruCache::new(1000)), }, account_data: account_data::AccountData { diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 775e2f8d..b957b55d 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -28,7 +28,7 @@ use ruma::{ push::{Action, Ruleset, Tweak}, serde::{CanonicalJsonObject, CanonicalJsonValue, Raw}, state_res::{self, RoomVersion, StateMap}, - uint, EventId, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId, + uint, DeviceId, EventId, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId, }; use serde::Deserialize; use serde_json::value::to_raw_value; @@ -79,6 +79,8 @@ pub struct Rooms { pub(super) userroomid_leftstate: Arc, pub(super) roomuserid_leftcount: Arc, + pub(super) lazyloadedids: Arc, // LazyLoadedIds = UserId + DeviceId + RoomId + LazyLoadedUserId + pub(super) userroomid_notificationcount: Arc, // NotifyCount = u64 pub(super) userroomid_highlightcount: Arc, // HightlightCount = u64 @@ -117,6 +119,8 @@ pub struct Rooms { pub(super) shortstatekey_cache: Mutex>, pub(super) our_real_users_cache: RwLock, Arc>>>>, pub(super) appservice_in_room_cache: RwLock, HashMap>>, + pub(super) lazy_load_waiting: + Mutex, Box, Box, u64), Vec>>>, pub(super) stateinfo_cache: Mutex< LruCache< u64, @@ -3466,4 +3470,94 @@ impl Rooms { Ok(()) } + + #[tracing::instrument(skip(self))] + pub fn lazy_load_was_sent_before( + &self, + user_id: &UserId, + device_id: &DeviceId, + room_id: &RoomId, + ll_user: &UserId, + ) -> Result { + let mut key = user_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(&device_id.as_bytes()); + key.push(0xff); + key.extend_from_slice(&room_id.as_bytes()); + key.push(0xff); + key.extend_from_slice(&ll_user.as_bytes()); + Ok(self.lazyloadedids.get(&key)?.is_some()) + } + + #[tracing::instrument(skip(self))] + pub fn lazy_load_mark_sent( + &self, + user_id: &UserId, + device_id: &DeviceId, + room_id: &RoomId, + lazy_load: Vec>, + count: u64, + ) { + self.lazy_load_waiting.lock().unwrap().insert( + ( + user_id.to_owned(), + device_id.to_owned(), + room_id.to_owned(), + count, + ), + lazy_load, + ); + } + + #[tracing::instrument(skip(self))] + pub fn lazy_load_confirm_delivery( + &self, + user_id: &UserId, + device_id: &DeviceId, + room_id: &RoomId, + since: u64, + ) -> Result<()> { + if let Some(user_ids) = self.lazy_load_waiting.lock().unwrap().remove(&( + user_id.to_owned(), + device_id.to_owned(), + room_id.to_owned(), + since, + )) { + let mut prefix = user_id.as_bytes().to_vec(); + prefix.push(0xff); + prefix.extend_from_slice(&device_id.as_bytes()); + prefix.push(0xff); + prefix.extend_from_slice(&room_id.as_bytes()); + prefix.push(0xff); + + for ll_id in user_ids { + let mut key = prefix.clone(); + key.extend_from_slice(&ll_id.as_bytes()); + self.lazyloadedids.insert(&key, &[])?; + } + } + + Ok(()) + } + + #[tracing::instrument(skip(self))] + pub fn lazy_load_reset( + &self, + user_id: &Box, + device_id: &Box, + room_id: &Box, + ) -> Result<()> { + let mut prefix = user_id.as_bytes().to_vec(); + prefix.push(0xff); + prefix.extend_from_slice(&device_id.as_bytes()); + prefix.push(0xff); + prefix.extend_from_slice(&room_id.as_bytes()); + prefix.push(0xff); + + for (key, _) in self.lazyloadedids.scan_prefix(prefix) { + self.lazyloadedids.remove(&key)?; + } + + Ok(()) + } }