From 33215d6099e7aa4728c240c26d243b3862004197 Mon Sep 17 00:00:00 2001 From: Timo Date: Sun, 23 Aug 2020 17:29:39 +0200 Subject: [PATCH 1/2] fix: send notification count updates when private read receipts change --- src/client_server/read_marker.rs | 5 +- src/client_server/sync.rs | 20 ++--- src/client_server/typing.rs | 4 +- src/database.rs | 13 +-- src/database/rooms.rs | 2 +- src/database/rooms/edus.rs | 148 ++++++++++++++++++------------- 6 files changed, 107 insertions(+), 85 deletions(-) diff --git a/src/client_server/read_marker.rs b/src/client_server/read_marker.rs index ff72765f..1b8bd8e4 100644 --- a/src/client_server/read_marker.rs +++ b/src/client_server/read_marker.rs @@ -34,13 +34,14 @@ pub fn set_read_marker_route( )?; if let Some(event) = &body.read_receipt { - db.rooms.edus.room_read_set( + db.rooms.edus.private_read_set( &body.room_id, &sender_id, db.rooms.get_pdu_count(event)?.ok_or(Error::BadRequest( ErrorKind::InvalidParam, "Event does not exist.", ))?, + &db.globals, )?; let mut user_receipts = BTreeMap::new(); @@ -58,7 +59,7 @@ pub fn set_read_marker_route( }, ); - db.rooms.edus.roomlatest_update( + db.rooms.edus.readreceipt_update( &sender_id, &body.room_id, AnyEvent::Ephemeral(AnyEphemeralRoomEvent::Receipt( diff --git a/src/client_server/sync.rs b/src/client_server/sync.rs index 2307f028..8f373544 100644 --- a/src/client_server/sync.rs +++ b/src/client_server/sync.rs @@ -81,7 +81,12 @@ pub async fn sync_events_route( .rev() .collect::>(); - let send_notification_counts = !timeline_pdus.is_empty(); + let send_notification_counts = !timeline_pdus.is_empty() + || db + .rooms + .edus + .last_privateread_update(&sender_id, &room_id)? + > since; // They /sync response doesn't always return all messages, so we say the output is // limited unless there are events in non_timeline_pdus @@ -242,7 +247,7 @@ pub async fn sync_events_route( }; let notification_count = if send_notification_counts { - if let Some(last_read) = db.rooms.edus.room_read_get(&room_id, &sender_id)? { + if let Some(last_read) = db.rooms.edus.private_read_get(&room_id, &sender_id)? { Some( (db.rooms .pdus_since(&sender_id, &room_id, last_read)? @@ -280,20 +285,15 @@ pub async fn sync_events_route( let mut edus = db .rooms .edus - .roomlatests_since(&room_id, since)? + .readreceipts_since(&room_id, since)? .filter_map(|r| r.ok()) // Filter out buggy events .collect::>(); - if db - .rooms - .edus - .last_roomactive_update(&room_id, &db.globals)? - > since - { + if db.rooms.edus.last_typing_update(&room_id, &db.globals)? > since { edus.push( serde_json::from_str( &serde_json::to_string(&AnySyncEphemeralRoomEvent::Typing( - db.rooms.edus.roomactives_all(&room_id)?, + db.rooms.edus.typings_all(&room_id)?, )) .expect("event is valid, we just created it"), ) diff --git a/src/client_server/typing.rs b/src/client_server/typing.rs index 7eba13e1..89e1e4ab 100644 --- a/src/client_server/typing.rs +++ b/src/client_server/typing.rs @@ -16,7 +16,7 @@ pub fn create_typing_event_route( let sender_id = body.sender_id.as_ref().expect("user is authenticated"); if body.typing { - db.rooms.edus.roomactive_add( + db.rooms.edus.typing_add( &sender_id, &body.room_id, body.timeout.map(|d| d.as_millis() as u64).unwrap_or(30000) @@ -26,7 +26,7 @@ pub fn create_typing_event_route( } else { db.rooms .edus - .roomactive_remove(&sender_id, &body.room_id, &db.globals)?; + .typing_remove(&sender_id, &body.room_id, &db.globals)?; } Ok(create_typing_event::Response.into()) diff --git a/src/database.rs b/src/database.rs index 7bbb6dd7..41781b95 100644 --- a/src/database.rs +++ b/src/database.rs @@ -88,10 +88,11 @@ impl Database { }, rooms: rooms::Rooms { edus: rooms::RoomEdus { - roomuserid_lastread: db.open_tree("roomuserid_lastread")?, // "Private" read receipt - roomlatestid_roomlatest: db.open_tree("roomlatestid_roomlatest")?, // Read receipts - roomactiveid_userid: db.open_tree("roomactiveid_userid")?, // Typing notifs - roomid_lastroomactiveupdate: db.open_tree("roomid_lastroomactiveupdate")?, + readreceiptid_readreceipt: db.open_tree("readreceiptid_readreceipt")?, + roomuserid_privateread: db.open_tree("roomuserid_privateread")?, // "Private" read receipt + roomuserid_lastprivatereadupdate: db.open_tree("roomid_lastprivatereadupdate")?, + typingid_userid: db.open_tree("typingid_userid")?, + roomid_lasttypingupdate: db.open_tree("roomid_lasttypingupdate")?, presenceid_presence: db.open_tree("presenceid_presence")?, userid_lastpresenceupdate: db.open_tree("userid_lastpresenceupdate")?, }, @@ -163,14 +164,14 @@ impl Database { futures.push( self.rooms .edus - .roomid_lastroomactiveupdate + .roomid_lasttypingupdate .watch_prefix(&roomid_bytes), ); futures.push( self.rooms .edus - .roomlatestid_roomlatest + .readreceiptid_readreceipt .watch_prefix(&roomid_prefix), ); diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 3c1febd4..bb14c8a5 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -621,7 +621,7 @@ impl Rooms { } _ => {} } - self.edus.room_read_set(&room_id, &sender, index)?; + self.edus.private_read_set(&room_id, &sender, index, &globals)?; Ok(pdu.event_id) } diff --git a/src/database/rooms/edus.rs b/src/database/rooms/edus.rs index fff30c22..fbd3edb6 100644 --- a/src/database/rooms/edus.rs +++ b/src/database/rooms/edus.rs @@ -14,17 +14,18 @@ use std::{ }; pub struct RoomEdus { - pub(in super::super) roomuserid_lastread: sled::Tree, // RoomUserId = Room + User - pub(in super::super) roomlatestid_roomlatest: sled::Tree, // Read Receipts, RoomLatestId = RoomId + Count + UserId - pub(in super::super) roomactiveid_userid: sled::Tree, // Typing, RoomActiveId = RoomId + TimeoutTime + Count - pub(in super::super) roomid_lastroomactiveupdate: sled::Tree, // LastRoomActiveUpdate = Count + pub(in super::super) readreceiptid_readreceipt: sled::Tree, // ReadReceiptId = RoomId + Count + UserId + pub(in super::super) roomuserid_privateread: sled::Tree, // RoomUserId = Room + User, PrivateRead = Count + pub(in super::super) roomuserid_lastprivatereadupdate: sled::Tree, // LastPrivateReadUpdate = Count + pub(in super::super) typingid_userid: sled::Tree, // TypingId = RoomId + TimeoutTime + Count + pub(in super::super) roomid_lasttypingupdate: sled::Tree, // LastRoomTypingUpdate = Count pub(in super::super) presenceid_presence: sled::Tree, // PresenceId = RoomId + Count + UserId pub(in super::super) userid_lastpresenceupdate: sled::Tree, // LastPresenceUpdate = Count } impl RoomEdus { /// Adds an event which will be saved until a new event replaces it (e.g. read receipt). - pub fn roomlatest_update( + pub fn readreceipt_update( &self, user_id: &UserId, room_id: &RoomId, @@ -36,7 +37,7 @@ impl RoomEdus { // Remove old entry if let Some(old) = self - .roomlatestid_roomlatest + .readreceiptid_readreceipt .scan_prefix(&prefix) .keys() .rev() @@ -50,7 +51,7 @@ impl RoomEdus { }) { // This is the old room_latest - self.roomlatestid_roomlatest.remove(old)?; + self.readreceiptid_readreceipt.remove(old)?; } let mut room_latest_id = prefix; @@ -58,7 +59,7 @@ impl RoomEdus { room_latest_id.push(0xff); room_latest_id.extend_from_slice(&user_id.to_string().as_bytes()); - self.roomlatestid_roomlatest.insert( + self.readreceiptid_readreceipt.insert( room_latest_id, &*serde_json::to_string(&event).expect("EduEvent::to_string always works"), )?; @@ -67,7 +68,7 @@ impl RoomEdus { } /// Returns an iterator over the most recent read_receipts in a room that happened after the event with id `since`. - pub fn roomlatests_since( + pub fn readreceipts_since( &self, room_id: &RoomId, since: u64, @@ -79,7 +80,7 @@ impl RoomEdus { first_possible_edu.extend_from_slice(&(since + 1).to_be_bytes()); // +1 so we don't send the event at since Ok(self - .roomlatestid_roomlatest + .readreceiptid_readreceipt .range(&*first_possible_edu..) .filter_map(|r| r.ok()) .take_while(move |(k, _)| k.starts_with(&prefix)) @@ -90,9 +91,54 @@ impl RoomEdus { })) } - /// Sets a user as typing until the timeout timestamp is reached or roomactive_remove is + /// Sets a private read marker at `count`. + pub fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64, globals: &super::super::globals::Globals) -> Result<()> { + let mut key = room_id.to_string().as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(&user_id.to_string().as_bytes()); + + self.roomuserid_privateread + .insert(&key, &count.to_be_bytes())?; + + self.roomuserid_lastprivatereadupdate + .insert(&key, &globals.next_count()?.to_be_bytes())?; + + Ok(()) + } + + /// Returns the private read marker. + pub fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result> { + let mut key = room_id.to_string().as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(&user_id.to_string().as_bytes()); + + self.roomuserid_privateread.get(key)?.map_or(Ok(None), |v| { + Ok(Some(utils::u64_from_bytes(&v).map_err(|_| { + Error::bad_database("Invalid private read marker bytes") + })?)) + }) + } + + /// Returns the count of the last typing update in this room. + pub fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result { + let mut key = room_id.to_string().as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(&user_id.to_string().as_bytes()); + + Ok(self + .roomuserid_lastprivatereadupdate + .get(&key)? + .map_or(Ok::<_, Error>(None), |bytes| { + Ok(Some(utils::u64_from_bytes(&bytes).map_err(|_| { + Error::bad_database("Count in roomuserid_lastprivatereadupdate is invalid.") + })?)) + })? + .unwrap_or(0)) + } + + /// Sets a user as typing until the timeout timestamp is reached or roomtyping_remove is /// called. - pub fn roomactive_add( + pub fn typing_add( &self, user_id: &UserId, room_id: &RoomId, @@ -104,22 +150,22 @@ impl RoomEdus { let count = globals.next_count()?.to_be_bytes(); - let mut room_active_id = prefix; - room_active_id.extend_from_slice(&timeout.to_be_bytes()); - room_active_id.push(0xff); - room_active_id.extend_from_slice(&count); + let mut room_typing_id = prefix; + room_typing_id.extend_from_slice(&timeout.to_be_bytes()); + room_typing_id.push(0xff); + room_typing_id.extend_from_slice(&count); - self.roomactiveid_userid - .insert(&room_active_id, &*user_id.to_string().as_bytes())?; + self.typingid_userid + .insert(&room_typing_id, &*user_id.to_string().as_bytes())?; - self.roomid_lastroomactiveupdate + self.roomid_lasttypingupdate .insert(&room_id.to_string().as_bytes(), &count)?; Ok(()) } /// Removes a user from typing before the timeout is reached. - pub fn roomactive_remove( + pub fn typing_remove( &self, user_id: &UserId, room_id: &RoomId, @@ -132,19 +178,19 @@ impl RoomEdus { let mut found_outdated = false; - // Maybe there are multiple ones from calling roomactive_add multiple times + // Maybe there are multiple ones from calling roomtyping_add multiple times for outdated_edu in self - .roomactiveid_userid + .typingid_userid .scan_prefix(&prefix) .filter_map(|r| r.ok()) .filter(|(_, v)| v == user_id.as_bytes()) { - self.roomactiveid_userid.remove(outdated_edu.0)?; + self.typingid_userid.remove(outdated_edu.0)?; found_outdated = true; } if found_outdated { - self.roomid_lastroomactiveupdate.insert( + self.roomid_lasttypingupdate.insert( &room_id.to_string().as_bytes(), &globals.next_count()?.to_be_bytes(), )?; @@ -154,7 +200,7 @@ impl RoomEdus { } /// Makes sure that typing events with old timestamps get removed. - fn roomactives_maintain( + fn typings_maintain( &self, room_id: &RoomId, globals: &super::super::globals::Globals, @@ -168,7 +214,7 @@ impl RoomEdus { // Find all outdated edus before inserting a new one for outdated_edu in self - .roomactiveid_userid + .typingid_userid .scan_prefix(&prefix) .keys() .map(|key| { @@ -176,21 +222,21 @@ impl RoomEdus { Ok::<_, Error>(( key.clone(), utils::u64_from_bytes(key.split(|&b| b == 0xff).nth(1).ok_or_else(|| { - Error::bad_database("RoomActive has invalid timestamp or delimiters.") + Error::bad_database("RoomTyping has invalid timestamp or delimiters.") })?) - .map_err(|_| Error::bad_database("RoomActive has invalid timestamp bytes."))?, + .map_err(|_| Error::bad_database("RoomTyping has invalid timestamp bytes."))?, )) }) .filter_map(|r| r.ok()) .take_while(|&(_, timestamp)| timestamp < current_timestamp) { // This is an outdated edu (time > timestamp) - self.roomactiveid_userid.remove(outdated_edu.0)?; + self.typingid_userid.remove(outdated_edu.0)?; found_outdated = true; } if found_outdated { - self.roomid_lastroomactiveupdate.insert( + self.roomid_lasttypingupdate.insert( &room_id.to_string().as_bytes(), &globals.next_count()?.to_be_bytes(), )?; @@ -199,16 +245,16 @@ impl RoomEdus { Ok(()) } - /// Returns an iterator over all active events (e.g. typing notifications). - pub fn last_roomactive_update( + /// Returns the count of the last typing update in this room. + pub fn last_typing_update( &self, room_id: &RoomId, globals: &super::super::globals::Globals, ) -> Result { - self.roomactives_maintain(room_id, globals)?; + self.typings_maintain(room_id, globals)?; Ok(self - .roomid_lastroomactiveupdate + .roomid_lasttypingupdate .get(&room_id.to_string().as_bytes())? .map_or(Ok::<_, Error>(None), |bytes| { Ok(Some(utils::u64_from_bytes(&bytes).map_err(|_| { @@ -218,7 +264,7 @@ impl RoomEdus { .unwrap_or(0)) } - pub fn roomactives_all( + pub fn typings_all( &self, room_id: &RoomId, ) -> Result> { @@ -228,17 +274,15 @@ impl RoomEdus { let mut user_ids = Vec::new(); for user_id in self - .roomactiveid_userid + .typingid_userid .scan_prefix(prefix) .values() .map(|user_id| { Ok::<_, Error>( UserId::try_from(utils::string_from_bytes(&user_id?).map_err(|_| { - Error::bad_database("User ID in roomactiveid_userid is invalid unicode.") + Error::bad_database("User ID in typingid_userid is invalid unicode.") })?) - .map_err(|_| { - Error::bad_database("User ID in roomactiveid_userid is invalid.") - })?, + .map_err(|_| Error::bad_database("User ID in typingid_userid is invalid."))?, ) }) { @@ -250,30 +294,6 @@ impl RoomEdus { }) } - /// Sets a private read marker at `count`. - pub fn room_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()> { - let mut key = room_id.to_string().as_bytes().to_vec(); - key.push(0xff); - key.extend_from_slice(&user_id.to_string().as_bytes()); - - self.roomuserid_lastread.insert(key, &count.to_be_bytes())?; - - Ok(()) - } - - /// Returns the private read marker. - pub fn room_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result> { - let mut key = room_id.to_string().as_bytes().to_vec(); - key.push(0xff); - key.extend_from_slice(&user_id.to_string().as_bytes()); - - self.roomuserid_lastread.get(key)?.map_or(Ok(None), |v| { - Ok(Some(utils::u64_from_bytes(&v).map_err(|_| { - Error::bad_database("Invalid private read marker bytes") - })?)) - }) - } - /// Adds a presence event which will be saved until a new event replaces it. /// /// Note: This method takes a RoomId because presence updates are always bound to rooms to From 0c1cc8d82bb1474f6585a516a01fbabe8b040a66 Mon Sep 17 00:00:00 2001 From: Timo Date: Mon, 24 Aug 2020 10:45:57 +0200 Subject: [PATCH 2/2] Fix CI --- sytest/sytest-whitelist | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sytest/sytest-whitelist b/sytest/sytest-whitelist index 3c1095b3..b0b20975 100644 --- a/sytest/sytest-whitelist +++ b/sytest/sytest-whitelist @@ -17,6 +17,7 @@ Can invite users to invite-only rooms Can list tags for a room Can logout all devices Can logout current device +Can re-join room if re-invited Can read configuration endpoint Can recv a device message using /sync Can recv device messages until they are acknowledged @@ -113,7 +114,6 @@ Typing events appear in incremental sync Typing events appear in initial sync Uninvited users cannot join the room User appears in user directory -User directory correctly update on display name change User in dir while user still shares private rooms User in shared private room does appear in user directory User is offline if they set_presence=offline in their sync