diff --git a/src/api/client_server/read_marker.rs b/src/api/client_server/read_marker.rs index b12468a7..dc6dfe9a 100644 --- a/src/api/client_server/read_marker.rs +++ b/src/api/client_server/read_marker.rs @@ -34,29 +34,33 @@ pub async fn set_read_marker_route( )?; } - if body.private_read_receipt.is_some() || body.read_receipt.is_some() { - services() - .rooms - .user - .reset_notification_counts(sender_user, &body.room_id)?; - } - if let Some(event) = &body.private_read_receipt { + let _pdu = services() + .rooms + .timeline + .get_pdu(event)? + .ok_or(Error::BadRequest( + ErrorKind::InvalidParam, + "Event does not exist.", + ))?; + services().rooms.edus.read_receipt.private_read_set( &body.room_id, sender_user, - services() - .rooms - .timeline - .get_pdu_count(event)? - .ok_or(Error::BadRequest( - ErrorKind::InvalidParam, - "Event does not exist.", - ))?, + services().rooms.short.get_or_create_shorteventid(event)?, )?; } if let Some(event) = &body.read_receipt { + let _pdu = services() + .rooms + .timeline + .get_pdu(event)? + .ok_or(Error::BadRequest( + ErrorKind::InvalidParam, + "Event does not exist.", + ))?; + let mut user_receipts = BTreeMap::new(); user_receipts.insert( sender_user.clone(), @@ -80,6 +84,12 @@ pub async fn set_read_marker_route( room_id: body.room_id.clone(), }, )?; + + services().rooms.edus.read_receipt.private_read_set( + &body.room_id, + sender_user, + services().rooms.short.get_or_create_shorteventid(event)?, + )?; } Ok(set_read_marker::v3::Response {}) @@ -93,16 +103,6 @@ pub async fn create_receipt_route( ) -> Result { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - if matches!( - &body.receipt_type, - create_receipt::v3::ReceiptType::Read | create_receipt::v3::ReceiptType::ReadPrivate - ) { - services() - .rooms - .user - .reset_notification_counts(sender_user, &body.room_id)?; - } - match body.receipt_type { create_receipt::v3::ReceiptType::FullyRead => { let fully_read_event = ruma::events::fully_read::FullyReadEvent { @@ -118,6 +118,16 @@ pub async fn create_receipt_route( )?; } create_receipt::v3::ReceiptType::Read => { + let _pdu = + services() + .rooms + .timeline + .get_pdu(&body.event_id)? + .ok_or(Error::BadRequest( + ErrorKind::InvalidParam, + "Event does not exist.", + ))?; + let mut user_receipts = BTreeMap::new(); user_receipts.insert( sender_user.clone(), @@ -140,19 +150,34 @@ pub async fn create_receipt_route( room_id: body.room_id.clone(), }, )?; - } - create_receipt::v3::ReceiptType::ReadPrivate => { + services().rooms.edus.read_receipt.private_read_set( &body.room_id, sender_user, + services() + .rooms + .short + .get_or_create_shorteventid(&body.event_id)?, + )?; + } + create_receipt::v3::ReceiptType::ReadPrivate => { + let _pdu = services() .rooms .timeline - .get_pdu_count(&body.event_id)? + .get_pdu(&body.event_id)? .ok_or(Error::BadRequest( ErrorKind::InvalidParam, "Event does not exist.", - ))?, + ))?; + + services().rooms.edus.read_receipt.private_read_set( + &body.room_id, + sender_user, + services() + .rooms + .short + .get_or_create_shorteventid(&body.event_id)?, )?; } _ => return Err(Error::bad_database("Unsupported receipt type")), diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs index 568a23ce..60e388d9 100644 --- a/src/api/client_server/sync.rs +++ b/src/api/client_server/sync.rs @@ -6,6 +6,7 @@ use ruma::{ uiaa::UiaaResponse, }, events::{ + receipt::{ReceiptThread, ReceiptType}, room::member::{MembershipState, RoomMemberEventContent}, RoomEventType, StateEventType, }, @@ -731,6 +732,50 @@ async fn sync_helper( .map(|(_, _, v)| v) .collect(); + if services() + .rooms + .edus + .read_receipt + .last_privateread_update(&sender_user, &room_id) + .unwrap_or(0) + > since + { + if let Ok(event_id) = services().rooms.short.get_eventid_from_short( + services() + .rooms + .edus + .read_receipt + .private_read_get(&room_id, &sender_user) + .expect("User did not have a valid private read receipt?") + .expect("User had a last read private receipt update but no receipt?"), + ) { + let mut user_receipts = BTreeMap::new(); + user_receipts.insert( + sender_user.clone(), + ruma::events::receipt::Receipt { + ts: None, + thread: ReceiptThread::Unthreaded, + }, + ); + + let mut receipts = BTreeMap::new(); + receipts.insert(ReceiptType::ReadPrivate, user_receipts); + + let mut receipt_content = BTreeMap::new(); + receipt_content.insert((*event_id).to_owned(), receipts); + + edus.push( + serde_json::from_str( + &serde_json::to_string(&ruma::events::SyncEphemeralRoomEvent { + content: ruma::events::receipt::ReceiptEventContent(receipt_content), + }) + .expect("Did not get valid JSON?"), + ) + .expect("JSON was somehow invalid despite just being created"), + ); + } + }; + if services().rooms.edus.typing.last_typing_update(&room_id)? > since { edus.push( serde_json::from_str( diff --git a/src/api/client_server/unversioned.rs b/src/api/client_server/unversioned.rs index 526598b9..cabee023 100644 --- a/src/api/client_server/unversioned.rs +++ b/src/api/client_server/unversioned.rs @@ -24,7 +24,10 @@ pub async fn get_supported_versions_route( "v1.1".to_owned(), "v1.2".to_owned(), ], - unstable_features: BTreeMap::from_iter([("org.matrix.e2e_cross_signing".to_owned(), true)]), + unstable_features: BTreeMap::from_iter([ + ("org.matrix.e2e_cross_signing".to_owned(), true), + ("org.matrix.msc2285.stable".to_owned(), true), + ]), }; Ok(resp) diff --git a/src/database/key_value/rooms/edus/read_receipt.rs b/src/database/key_value/rooms/edus/read_receipt.rs index fa97ea34..4722cdc0 100644 --- a/src/database/key_value/rooms/edus/read_receipt.rs +++ b/src/database/key_value/rooms/edus/read_receipt.rs @@ -105,16 +105,25 @@ impl service::rooms::edus::read_receipt::Data for KeyValueDatabase { ) } - fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()> { + fn private_read_set( + &self, + room_id: &RoomId, + user_id: &UserId, + shorteventid: u64, + ) -> Result<()> { let mut key = room_id.as_bytes().to_vec(); key.push(0xff); key.extend_from_slice(user_id.as_bytes()); - self.roomuserid_privateread - .insert(&key, &count.to_be_bytes())?; + if self.private_read_get(room_id, user_id)?.unwrap_or(0) < shorteventid { + self.roomuserid_privateread + .insert(&key, &shorteventid.to_be_bytes())?; - self.roomuserid_lastprivatereadupdate - .insert(&key, &services().globals.next_count()?.to_be_bytes()) + self.roomuserid_lastprivatereadupdate + .insert(&key, &services().globals.next_count()?.to_be_bytes()) + } else { + Ok(()) + } } fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result> { diff --git a/src/database/key_value/rooms/user.rs b/src/database/key_value/rooms/user.rs index 4c435720..63a13d36 100644 --- a/src/database/key_value/rooms/user.rs +++ b/src/database/key_value/rooms/user.rs @@ -3,7 +3,13 @@ use ruma::{OwnedRoomId, OwnedUserId, RoomId, UserId}; use crate::{database::KeyValueDatabase, service, services, utils, Error, Result}; impl service::rooms::user::Data for KeyValueDatabase { - fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> { + fn update_notification_counts( + &self, + user_id: &UserId, + room_id: &RoomId, + notification_count: u64, + highlight_count: u64, + ) -> Result<()> { let mut userroom_id = user_id.as_bytes().to_vec(); userroom_id.push(0xff); userroom_id.extend_from_slice(room_id.as_bytes()); @@ -12,9 +18,9 @@ impl service::rooms::user::Data for KeyValueDatabase { roomuser_id.extend_from_slice(user_id.as_bytes()); self.userroomid_notificationcount - .insert(&userroom_id, &0_u64.to_be_bytes())?; + .insert(&userroom_id, ¬ification_count.to_be_bytes())?; self.userroomid_highlightcount - .insert(&userroom_id, &0_u64.to_be_bytes())?; + .insert(&userroom_id, &highlight_count.to_be_bytes())?; self.roomuserid_lastnotificationread.insert( &roomuser_id, diff --git a/src/service/rooms/edus/read_receipt/data.rs b/src/service/rooms/edus/read_receipt/data.rs index a183d196..7ebd3589 100644 --- a/src/service/rooms/edus/read_receipt/data.rs +++ b/src/service/rooms/edus/read_receipt/data.rs @@ -25,8 +25,9 @@ pub trait Data: Send + Sync { > + 'a, >; - /// Sets a private read marker at `count`. - fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()>; + /// Sets a private read marker at `shorteventid`. + fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, shorteventid: u64) + -> Result<()>; /// Returns the private read marker. fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result>; diff --git a/src/service/rooms/edus/read_receipt/mod.rs b/src/service/rooms/edus/read_receipt/mod.rs index c6035280..a18a0dae 100644 --- a/src/service/rooms/edus/read_receipt/mod.rs +++ b/src/service/rooms/edus/read_receipt/mod.rs @@ -2,7 +2,7 @@ mod data; pub use data::Data; -use crate::Result; +use crate::{services, Result}; use ruma::{events::receipt::ReceiptEvent, serde::Raw, OwnedUserId, RoomId, UserId}; pub struct Service { @@ -36,10 +36,19 @@ impl Service { self.db.readreceipts_since(room_id, since) } - /// Sets a private read marker at `count`. + /// Sets a private read marker at `shorteventid`. #[tracing::instrument(skip(self))] - pub fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()> { - self.db.private_read_set(room_id, user_id, count) + pub fn private_read_set( + &self, + room_id: &RoomId, + user_id: &UserId, + shorteventid: u64, + ) -> Result<()> { + self.db.private_read_set(room_id, user_id, shorteventid)?; + services() + .rooms + .user + .update_notification_counts(user_id, room_id) } /// Returns the private read marker. diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index 34399d46..f955b7a0 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -213,18 +213,17 @@ impl Service { ); let insert_lock = mutex_insert.lock().unwrap(); - let count1 = services().globals.next_count()?; + let _count1 = services().globals.next_count()?; // Mark as read first so the sending client doesn't get a notification even if appending // fails - services() - .rooms - .edus - .read_receipt - .private_read_set(&pdu.room_id, &pdu.sender, count1)?; - services() - .rooms - .user - .reset_notification_counts(&pdu.sender, &pdu.room_id)?; + services().rooms.edus.read_receipt.private_read_set( + &pdu.room_id, + &pdu.sender, + services() + .rooms + .short + .get_or_create_shorteventid(&pdu.event_id)?, + )?; let count2 = services().globals.next_count()?; let mut pdu_id = shortroomid.to_be_bytes().to_vec(); diff --git a/src/service/rooms/user/data.rs b/src/service/rooms/user/data.rs index 4b8a4eca..90fc18bf 100644 --- a/src/service/rooms/user/data.rs +++ b/src/service/rooms/user/data.rs @@ -2,7 +2,13 @@ use crate::Result; use ruma::{OwnedRoomId, OwnedUserId, RoomId, UserId}; pub trait Data: Send + Sync { - fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) -> Result<()>; + fn update_notification_counts( + &self, + user_id: &UserId, + room_id: &RoomId, + notification_count: u64, + highlight_count: u64, + ) -> Result<()>; fn notification_count(&self, user_id: &UserId, room_id: &RoomId) -> Result; diff --git a/src/service/rooms/user/mod.rs b/src/service/rooms/user/mod.rs index 672e502d..c6c7867b 100644 --- a/src/service/rooms/user/mod.rs +++ b/src/service/rooms/user/mod.rs @@ -1,17 +1,117 @@ mod data; pub use data::Data; -use ruma::{OwnedRoomId, OwnedUserId, RoomId, UserId}; +use ruma::{ + events::{ + push_rules::PushRulesEvent, room::power_levels::RoomPowerLevelsEventContent, + GlobalAccountDataEventType, StateEventType, + }, + push::{Action, Ruleset, Tweak}, + OwnedRoomId, OwnedUserId, RoomId, UserId, +}; -use crate::Result; +use crate::{services, Error, Result}; pub struct Service { pub db: &'static dyn Data, } impl Service { - pub fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> { - self.db.reset_notification_counts(user_id, room_id) + pub fn update_notification_counts(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> { + let power_levels: RoomPowerLevelsEventContent = services() + .rooms + .state_accessor + .room_state_get(room_id, &StateEventType::RoomPowerLevels, "")? + .map(|ev| { + serde_json::from_str(ev.content.get()) + .map_err(|_| Error::bad_database("invalid m.room.power_levels event")) + }) + .transpose()? + .unwrap_or_default(); + + let read_event = services() + .rooms + .edus + .read_receipt + .private_read_get(room_id, user_id) + .unwrap_or(None) + .unwrap_or(0u64); + let mut notification_count = 0u64; + let mut highlight_count = 0u64; + + services() + .rooms + .timeline + .pdus_since(user_id, room_id, read_event)? + .filter_map(|pdu| pdu.ok()) + .map(|(_, pdu)| pdu) + .filter(|pdu| { + // Don't include user's own messages in notification counts + user_id != &pdu.sender + && services() + .rooms + .short + .get_or_create_shorteventid(&pdu.event_id) + .unwrap_or(0) + != read_event + }) + .filter_map(|pdu| { + let rules_for_user = services() + .account_data + .get( + None, + user_id, + GlobalAccountDataEventType::PushRules.to_string().into(), + ) + .ok()? + .map(|event| { + serde_json::from_str::(event.get()) + .map_err(|_| Error::bad_database("Invalid push rules event in db.")) + }) + .transpose() + .ok()? + .map(|ev: PushRulesEvent| ev.content.global) + .unwrap_or_else(|| Ruleset::server_default(user_id)); + + let mut highlight = false; + let mut notify = false; + + for action in services() + .pusher + .get_actions( + user_id, + &rules_for_user, + &power_levels, + &pdu.to_sync_room_event(), + &pdu.room_id, + ) + .ok()? + { + match action { + Action::DontNotify => notify = false, + // TODO: Implement proper support for coalesce + Action::Notify | Action::Coalesce => notify = true, + Action::SetTweak(Tweak::Highlight(true)) => { + highlight = true; + } + _ => {} + }; + } + + if notify { + notification_count += 1; + }; + + if highlight { + highlight_count += 1; + }; + + Some(()) + }) + .for_each(|_| {}); + + self.db + .update_notification_counts(user_id, room_id, notification_count, highlight_count) } pub fn notification_count(&self, user_id: &UserId, room_id: &RoomId) -> Result {