From 3ac3bdbac026d641c707fb2cc112a809c394f949 Mon Sep 17 00:00:00 2001 From: Nyaaori <+@nyaaori.cat> Date: Wed, 21 Dec 2022 17:28:21 +0100 Subject: [PATCH] feat: Keep track of avatar urls, displaynames, and blurhashes of remote users for the room directory --- src/api/client_server/membership.rs | 245 +++++++------ src/api/client_server/message.rs | 30 +- src/api/client_server/profile.rs | 80 +++- src/api/client_server/redact.rs | 34 +- src/api/client_server/room.rs | 487 ++++++++++++++----------- src/api/client_server/state.rs | 28 +- src/api/client_server/sync.rs | 6 +- src/api/server_server.rs | 20 +- src/service/admin/mod.rs | 477 +++++++++++++----------- src/service/globals/mod.rs | 2 +- src/service/rooms/event_handler/mod.rs | 40 +- src/service/rooms/state/mod.rs | 37 +- src/service/rooms/state_cache/mod.rs | 36 +- src/service/rooms/timeline/mod.rs | 66 ++-- 14 files changed, 908 insertions(+), 680 deletions(-) diff --git a/src/api/client_server/membership.rs b/src/api/client_server/membership.rs index 87954ed3..6615547a 100644 --- a/src/api/client_server/membership.rs +++ b/src/api/client_server/membership.rs @@ -198,18 +198,22 @@ pub async fn kick_user_route( ); let state_lock = mutex_state.lock().await; - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomMember, - content: to_raw_value(&event).expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(body.user_id.to_string()), - redacts: None, - }, - sender_user, - &body.room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomMember, + content: to_raw_value(&event).expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(body.user_id.to_string()), + redacts: None, + }, + sender_user, + &body.room_id, + &state_lock, + ) + .await?; drop(state_lock); @@ -264,18 +268,22 @@ pub async fn ban_user_route(body: Ruma) -> Result return Ok(join_room_by_id::v3::Response::new(room_id.to_owned())), Err(e) => e, }; @@ -1259,28 +1280,32 @@ pub(crate) async fn invite_helper<'a>( ); let state_lock = mutex_state.lock().await; - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomMember, - content: to_raw_value(&RoomMemberEventContent { - membership: MembershipState::Invite, - displayname: services().users.displayname(user_id)?, - avatar_url: services().users.avatar_url(user_id)?, - is_direct: Some(is_direct), - third_party_invite: None, - blurhash: services().users.blurhash(user_id)?, - reason: None, - join_authorized_via_users_server: None, - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(user_id.to_string()), - redacts: None, - }, - sender_user, - room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomMember, + content: to_raw_value(&RoomMemberEventContent { + membership: MembershipState::Invite, + displayname: services().users.displayname(user_id)?, + avatar_url: services().users.avatar_url(user_id)?, + is_direct: Some(is_direct), + third_party_invite: None, + blurhash: services().users.blurhash(user_id)?, + reason: None, + join_authorized_via_users_server: None, + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(user_id.to_string()), + redacts: None, + }, + sender_user, + room_id, + &state_lock, + ) + .await?; drop(state_lock); @@ -1334,14 +1359,18 @@ pub async fn leave_room(user_id: &UserId, room_id: &RoomId) -> Result<()> { )?; // We always drop the invite, we can't rely on other servers - services().rooms.state_cache.update_membership( - room_id, - user_id, - MembershipState::Leave, - user_id, - last_state, - true, - )?; + services() + .rooms + .state_cache + .update_membership( + room_id, + user_id, + RoomMemberEventContent::new(MembershipState::Leave), + user_id, + last_state, + true, + ) + .await?; } else { let mutex_state = Arc::clone( services() @@ -1365,14 +1394,18 @@ pub async fn leave_room(user_id: &UserId, room_id: &RoomId) -> Result<()> { None => { error!("Trying to leave a room you are not a member of."); - services().rooms.state_cache.update_membership( - room_id, - user_id, - MembershipState::Leave, - user_id, - None, - true, - )?; + services() + .rooms + .state_cache + .update_membership( + room_id, + user_id, + RoomMemberEventContent::new(MembershipState::Leave), + user_id, + None, + true, + ) + .await?; return Ok(()); } Some(e) => e, @@ -1383,18 +1416,22 @@ pub async fn leave_room(user_id: &UserId, room_id: &RoomId) -> Result<()> { event.membership = MembershipState::Leave; - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomMember, - content: to_raw_value(&event).expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(user_id.to_string()), - redacts: None, - }, - user_id, - room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomMember, + content: to_raw_value(&event).expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(user_id.to_string()), + redacts: None, + }, + user_id, + room_id, + &state_lock, + ) + .await?; } Ok(()) diff --git a/src/api/client_server/message.rs b/src/api/client_server/message.rs index 6ad07517..a7c62a99 100644 --- a/src/api/client_server/message.rs +++ b/src/api/client_server/message.rs @@ -70,19 +70,23 @@ pub async fn send_message_event_route( let mut unsigned = BTreeMap::new(); unsigned.insert("transaction_id".to_owned(), body.txn_id.to_string().into()); - let event_id = services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: body.event_type.to_string().into(), - content: serde_json::from_str(body.body.body.json().get()) - .map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Invalid JSON body."))?, - unsigned: Some(unsigned), - state_key: None, - redacts: None, - }, - sender_user, - &body.room_id, - &state_lock, - )?; + let event_id = services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: body.event_type.to_string().into(), + content: serde_json::from_str(body.body.body.json().get()) + .map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Invalid JSON body."))?, + unsigned: Some(unsigned), + state_key: None, + redacts: None, + }, + sender_user, + &body.room_id, + &state_lock, + ) + .await?; services().transaction_ids.add_txnid( sender_user, diff --git a/src/api/client_server/profile.rs b/src/api/client_server/profile.rs index 6400e891..f1d3ac5b 100644 --- a/src/api/client_server/profile.rs +++ b/src/api/client_server/profile.rs @@ -83,12 +83,11 @@ pub async fn set_displayname_route( ); let state_lock = mutex_state.lock().await; - let _ = services().rooms.timeline.build_and_append_pdu( - pdu_builder, - sender_user, - &room_id, - &state_lock, - ); + let _ = services() + .rooms + .timeline + .build_and_append_pdu(pdu_builder, sender_user, &room_id, &state_lock) + .await; // Presence update services().rooms.edus.presence.update_presence( @@ -115,15 +114,17 @@ pub async fn set_displayname_route( Ok(set_display_name::v3::Response {}) } -/// # `GET /_matrix/client/r0/profile/{userId}/displayname` +/// # `GET /_matrix/client/v3/profile/{userId}/displayname` /// /// Returns the displayname of the user. /// -/// - If user is on another server: Fetches displayname over federation +/// - If user is on another server and we do not have a copy, fetch over federation pub async fn get_displayname_route( body: Ruma, ) -> Result { - if body.user_id.server_name() != services().globals.server_name() { + if (services().users.exists(&body.user_id)?) + && (body.user_id.server_name() != services().globals.server_name()) + { let response = services() .sending .send_federation_request( @@ -135,6 +136,18 @@ pub async fn get_displayname_route( ) .await?; + // Create and update our local copy of the user + let _ = services().users.create(&body.user_id, None); + let _ = services() + .users + .set_displayname(&body.user_id, response.displayname.clone()); + let _ = services() + .users + .set_avatar_url(&body.user_id, response.avatar_url); + let _ = services() + .users + .set_blurhash(&body.user_id, response.blurhash); + return Ok(get_display_name::v3::Response { displayname: response.displayname, }); @@ -218,12 +231,11 @@ pub async fn set_avatar_url_route( ); let state_lock = mutex_state.lock().await; - let _ = services().rooms.timeline.build_and_append_pdu( - pdu_builder, - sender_user, - &room_id, - &state_lock, - ); + let _ = services() + .rooms + .timeline + .build_and_append_pdu(pdu_builder, sender_user, &room_id, &state_lock) + .await; // Presence update services().rooms.edus.presence.update_presence( @@ -250,15 +262,17 @@ pub async fn set_avatar_url_route( Ok(set_avatar_url::v3::Response {}) } -/// # `GET /_matrix/client/r0/profile/{userId}/avatar_url` +/// # `GET /_matrix/client/v3/profile/{userId}/avatar_url` /// /// Returns the avatar_url and blurhash of the user. /// -/// - If user is on another server: Fetches avatar_url and blurhash over federation +/// - If user is on another server and we do not have a copy, fetch over federation pub async fn get_avatar_url_route( body: Ruma, ) -> Result { - if body.user_id.server_name() != services().globals.server_name() { + if (services().users.exists(&body.user_id)?) + && (body.user_id.server_name() != services().globals.server_name()) + { let response = services() .sending .send_federation_request( @@ -270,6 +284,18 @@ pub async fn get_avatar_url_route( ) .await?; + // Create and update our local copy of the user + let _ = services().users.create(&body.user_id, None); + let _ = services() + .users + .set_displayname(&body.user_id, response.displayname); + let _ = services() + .users + .set_avatar_url(&body.user_id, response.avatar_url.clone()); + let _ = services() + .users + .set_blurhash(&body.user_id, response.blurhash.clone()); + return Ok(get_avatar_url::v3::Response { avatar_url: response.avatar_url, blurhash: response.blurhash, @@ -286,11 +312,13 @@ pub async fn get_avatar_url_route( /// /// Returns the displayname, avatar_url and blurhash of the user. /// -/// - If user is on another server: Fetches profile over federation +/// - If user is on another server and we do not have a copy, fetch over federation pub async fn get_profile_route( body: Ruma, ) -> Result { - if body.user_id.server_name() != services().globals.server_name() { + if (services().users.exists(&body.user_id)?) + && (body.user_id.server_name() != services().globals.server_name()) + { let response = services() .sending .send_federation_request( @@ -302,6 +330,18 @@ pub async fn get_profile_route( ) .await?; + // Create and update our local copy of the user + let _ = services().users.create(&body.user_id, None); + let _ = services() + .users + .set_displayname(&body.user_id, response.displayname.clone()); + let _ = services() + .users + .set_avatar_url(&body.user_id, response.avatar_url.clone()); + let _ = services() + .users + .set_blurhash(&body.user_id, response.blurhash.clone()); + return Ok(get_profile::v3::Response { displayname: response.displayname, avatar_url: response.avatar_url, diff --git a/src/api/client_server/redact.rs b/src/api/client_server/redact.rs index a29a5610..88059d6b 100644 --- a/src/api/client_server/redact.rs +++ b/src/api/client_server/redact.rs @@ -30,21 +30,25 @@ pub async fn redact_event_route( ); let state_lock = mutex_state.lock().await; - let event_id = services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomRedaction, - content: to_raw_value(&RoomRedactionEventContent { - reason: body.reason.clone(), - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: None, - redacts: Some(body.event_id.into()), - }, - sender_user, - &body.room_id, - &state_lock, - )?; + let event_id = services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomRedaction, + content: to_raw_value(&RoomRedactionEventContent { + reason: body.reason.clone(), + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: None, + redacts: Some(body.event_id.into()), + }, + sender_user, + &body.room_id, + &state_lock, + ) + .await?; drop(state_lock); diff --git a/src/api/client_server/room.rs b/src/api/client_server/room.rs index c77cfa9b..d2238b44 100644 --- a/src/api/client_server/room.rs +++ b/src/api/client_server/room.rs @@ -173,42 +173,50 @@ pub async fn create_room_route( } // 1. The room create event - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomCreate, - content: to_raw_value(&content).expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomCreate, + content: to_raw_value(&content).expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_user, + &room_id, + &state_lock, + ) + .await?; // 2. Let the room creator join - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomMember, - content: to_raw_value(&RoomMemberEventContent { - membership: MembershipState::Join, - displayname: services().users.displayname(sender_user)?, - avatar_url: services().users.avatar_url(sender_user)?, - is_direct: Some(body.is_direct), - third_party_invite: None, - blurhash: services().users.blurhash(sender_user)?, - reason: None, - join_authorized_via_users_server: None, - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(sender_user.to_string()), - redacts: None, - }, - sender_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomMember, + content: to_raw_value(&RoomMemberEventContent { + membership: MembershipState::Join, + displayname: services().users.displayname(sender_user)?, + avatar_url: services().users.avatar_url(sender_user)?, + is_direct: Some(body.is_direct), + third_party_invite: None, + blurhash: services().users.blurhash(sender_user)?, + reason: None, + join_authorized_via_users_server: None, + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(sender_user.to_string()), + redacts: None, + }, + sender_user, + &room_id, + &state_lock, + ) + .await?; // 3. Power levels @@ -245,30 +253,14 @@ pub async fn create_room_route( } } - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomPowerLevels, - content: to_raw_value(&power_levels_content) - .expect("to_raw_value always works on serde_json::Value"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_user, - &room_id, - &state_lock, - )?; - - // 4. Canonical room alias - if let Some(room_alias_id) = &alias { - services().rooms.timeline.build_and_append_pdu( + services() + .rooms + .timeline + .build_and_append_pdu( PduBuilder { - event_type: RoomEventType::RoomCanonicalAlias, - content: to_raw_value(&RoomCanonicalAliasEventContent { - alias: Some(room_alias_id.to_owned()), - alt_aliases: vec![], - }) - .expect("We checked that alias earlier, it must be fine"), + event_type: RoomEventType::RoomPowerLevels, + content: to_raw_value(&power_levels_content) + .expect("to_raw_value always works on serde_json::Value"), unsigned: None, state_key: Some("".to_owned()), redacts: None, @@ -276,64 +268,100 @@ pub async fn create_room_route( sender_user, &room_id, &state_lock, - )?; + ) + .await?; + + // 4. Canonical room alias + if let Some(room_alias_id) = &alias { + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomCanonicalAlias, + content: to_raw_value(&RoomCanonicalAliasEventContent { + alias: Some(room_alias_id.to_owned()), + alt_aliases: vec![], + }) + .expect("We checked that alias earlier, it must be fine"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_user, + &room_id, + &state_lock, + ) + .await?; } // 5. Events set by preset // 5.1 Join Rules - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomJoinRules, - content: to_raw_value(&RoomJoinRulesEventContent::new(match preset { - RoomPreset::PublicChat => JoinRule::Public, - // according to spec "invite" is the default - _ => JoinRule::Invite, - })) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomJoinRules, + content: to_raw_value(&RoomJoinRulesEventContent::new(match preset { + RoomPreset::PublicChat => JoinRule::Public, + // according to spec "invite" is the default + _ => JoinRule::Invite, + })) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_user, + &room_id, + &state_lock, + ) + .await?; // 5.2 History Visibility - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomHistoryVisibility, - content: to_raw_value(&RoomHistoryVisibilityEventContent::new( - HistoryVisibility::Shared, - )) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomHistoryVisibility, + content: to_raw_value(&RoomHistoryVisibilityEventContent::new( + HistoryVisibility::Shared, + )) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_user, + &room_id, + &state_lock, + ) + .await?; // 5.3 Guest Access - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomGuestAccess, - content: to_raw_value(&RoomGuestAccessEventContent::new(match preset { - RoomPreset::PublicChat => GuestAccess::Forbidden, - _ => GuestAccess::CanJoin, - })) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomGuestAccess, + content: to_raw_value(&RoomGuestAccessEventContent::new(match preset { + RoomPreset::PublicChat => GuestAccess::Forbidden, + _ => GuestAccess::CanJoin, + })) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_user, + &room_id, + &state_lock, + ) + .await?; // 6. Events listed in initial_state for event in &body.initial_state { @@ -352,47 +380,54 @@ pub async fn create_room_route( continue; } - services().rooms.timeline.build_and_append_pdu( - pdu_builder, - sender_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu(pdu_builder, sender_user, &room_id, &state_lock) + .await?; } // 7. Events implied by name and topic if let Some(name) = &body.name { - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomName, - content: to_raw_value(&RoomNameEventContent::new(Some(name.clone()))) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomName, + content: to_raw_value(&RoomNameEventContent::new(Some(name.clone()))) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_user, + &room_id, + &state_lock, + ) + .await?; } if let Some(topic) = &body.topic { - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomTopic, - content: to_raw_value(&RoomTopicEventContent { - topic: topic.clone(), - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomTopic, + content: to_raw_value(&RoomTopicEventContent { + topic: topic.clone(), + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_user, + &room_id, + &state_lock, + ) + .await?; } // 8. Events implied by invite (and TODO: invite_3pid) @@ -523,22 +558,26 @@ pub async fn upgrade_room_route( // Send a m.room.tombstone event to the old room to indicate that it is not intended to be used any further // Fail if the sender does not have the required permissions - let tombstone_event_id = services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomTombstone, - content: to_raw_value(&RoomTombstoneEventContent { - body: "This room has been replaced".to_owned(), - replacement_room: replacement_room.clone(), - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_user, - &body.room_id, - &state_lock, - )?; + let tombstone_event_id = services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomTombstone, + content: to_raw_value(&RoomTombstoneEventContent { + body: "This room has been replaced".to_owned(), + replacement_room: replacement_room.clone(), + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_user, + &body.room_id, + &state_lock, + ) + .await?; // Change lock to replacement room drop(state_lock); @@ -605,43 +644,51 @@ pub async fn upgrade_room_route( )); } - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomCreate, - content: to_raw_value(&create_event_content) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_user, - &replacement_room, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomCreate, + content: to_raw_value(&create_event_content) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_user, + &replacement_room, + &state_lock, + ) + .await?; // Join the new room - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomMember, - content: to_raw_value(&RoomMemberEventContent { - membership: MembershipState::Join, - displayname: services().users.displayname(sender_user)?, - avatar_url: services().users.avatar_url(sender_user)?, - is_direct: None, - third_party_invite: None, - blurhash: services().users.blurhash(sender_user)?, - reason: None, - join_authorized_via_users_server: None, - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(sender_user.to_string()), - redacts: None, - }, - sender_user, - &replacement_room, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomMember, + content: to_raw_value(&RoomMemberEventContent { + membership: MembershipState::Join, + displayname: services().users.displayname(sender_user)?, + avatar_url: services().users.avatar_url(sender_user)?, + is_direct: None, + third_party_invite: None, + blurhash: services().users.blurhash(sender_user)?, + reason: None, + join_authorized_via_users_server: None, + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(sender_user.to_string()), + redacts: None, + }, + sender_user, + &replacement_room, + &state_lock, + ) + .await?; // Recommended transferable state events list from the specs let transferable_state_events = vec![ @@ -668,18 +715,22 @@ pub async fn upgrade_room_route( None => continue, // Skipping missing events. }; - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: event_type.to_string().into(), - content: event_content, - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_user, - &replacement_room, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: event_type.to_string().into(), + content: event_content, + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_user, + &replacement_room, + &state_lock, + ) + .await?; } // Moves any local aliases to the new room @@ -713,19 +764,23 @@ pub async fn upgrade_room_route( power_levels_event_content.invite = new_level; // Modify the power levels in the old room to prevent sending of events and inviting new users - let _ = services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomPowerLevels, - content: to_raw_value(&power_levels_event_content) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_user, - &body.room_id, - &state_lock, - )?; + let _ = services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomPowerLevels, + content: to_raw_value(&power_levels_event_content) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_user, + &body.room_id, + &state_lock, + ) + .await?; drop(state_lock); diff --git a/src/api/client_server/state.rs b/src/api/client_server/state.rs index d9c14648..12af5199 100644 --- a/src/api/client_server/state.rs +++ b/src/api/client_server/state.rs @@ -287,18 +287,22 @@ async fn send_state_event_for_key_helper( ); let state_lock = mutex_state.lock().await; - let event_id = services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: event_type.to_string().into(), - content: serde_json::from_str(json.json().get()).expect("content is valid json"), - unsigned: None, - state_key: Some(state_key), - redacts: None, - }, - sender_user, - room_id, - &state_lock, - )?; + let event_id = services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: event_type.to_string().into(), + content: serde_json::from_str(json.json().get()).expect("content is valid json"), + unsigned: None, + state_key: Some(state_key), + redacts: None, + }, + sender_user, + room_id, + &state_lock, + ) + .await?; Ok(event_id) } diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs index 568a23ce..07f8400c 100644 --- a/src/api/client_server/sync.rs +++ b/src/api/client_server/sync.rs @@ -231,7 +231,7 @@ async fn sync_helper( .entry(room_id.clone()) .or_default(), ); - let insert_lock = mutex_insert.lock().unwrap(); + let insert_lock = mutex_insert.lock().await; drop(insert_lock); } @@ -847,7 +847,7 @@ async fn sync_helper( .entry(room_id.clone()) .or_default(), ); - let insert_lock = mutex_insert.lock().unwrap(); + let insert_lock = mutex_insert.lock().await; drop(insert_lock); } @@ -979,7 +979,7 @@ async fn sync_helper( .entry(room_id.clone()) .or_default(), ); - let insert_lock = mutex_insert.lock().unwrap(); + let insert_lock = mutex_insert.lock().await; drop(insert_lock); } diff --git a/src/api/server_server.rs b/src/api/server_server.rs index fc3e2c0f..72f6cae7 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -1615,14 +1615,18 @@ pub async fn create_invite_route( .state_cache .server_in_room(services().globals.server_name(), &body.room_id)? { - services().rooms.state_cache.update_membership( - &body.room_id, - &invited_user, - MembershipState::Invite, - &sender, - Some(invite_state), - true, - )?; + services() + .rooms + .state_cache + .update_membership( + &body.room_id, + &invited_user, + RoomMemberEventContent::new(MembershipState::Invite), + &sender, + Some(invite_state), + true, + ) + .await?; } Ok(create_invite::v2::Response { diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index 77f351a9..2df3180f 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -26,7 +26,7 @@ use ruma::{ EventId, OwnedRoomAliasId, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId, }; use serde_json::value::to_raw_value; -use tokio::sync::{mpsc, Mutex, MutexGuard}; +use tokio::sync::{mpsc, Mutex}; use crate::{ api::client_server::{leave_all_rooms, AUTO_GEN_PASSWORD_LENGTH}, @@ -206,26 +206,6 @@ impl Service { .expect("Database data for admin room alias must be valid") .expect("Admin room must exist"); - let send_message = |message: RoomMessageEventContent, mutex_lock: &MutexGuard<'_, ()>| { - services() - .rooms - .timeline - .build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomMessage, - content: to_raw_value(&message) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: None, - redacts: None, - }, - &conduit_user, - &conduit_room, - mutex_lock, - ) - .unwrap(); - }; - loop { tokio::select! { Some(event) = receiver.recv() => { @@ -245,7 +225,20 @@ impl Service { let state_lock = mutex_state.lock().await; - send_message(message_content, &state_lock); + services().rooms.timeline.build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomMessage, + content: to_raw_value(&message_content) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: None, + redacts: None, + }, + &conduit_user, + &conduit_room, + &state_lock) + .await + .unwrap(); drop(state_lock); } @@ -853,164 +846,202 @@ impl Service { content.room_version = services().globals.default_room_version(); // 1. The room create event - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomCreate, - content: to_raw_value(&content).expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &conduit_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomCreate, + content: to_raw_value(&content).expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + &conduit_user, + &room_id, + &state_lock, + ) + .await?; // 2. Make conduit bot join - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomMember, - content: to_raw_value(&RoomMemberEventContent { - membership: MembershipState::Join, - displayname: None, - avatar_url: None, - is_direct: None, - third_party_invite: None, - blurhash: None, - reason: None, - join_authorized_via_users_server: None, - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(conduit_user.to_string()), - redacts: None, - }, - &conduit_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomMember, + content: to_raw_value(&RoomMemberEventContent { + membership: MembershipState::Join, + displayname: None, + avatar_url: None, + is_direct: None, + third_party_invite: None, + blurhash: None, + reason: None, + join_authorized_via_users_server: None, + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(conduit_user.to_string()), + redacts: None, + }, + &conduit_user, + &room_id, + &state_lock, + ) + .await?; // 3. Power levels let mut users = BTreeMap::new(); users.insert(conduit_user.clone(), 100.into()); - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomPowerLevels, - content: to_raw_value(&RoomPowerLevelsEventContent { - users, - ..Default::default() - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &conduit_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomPowerLevels, + content: to_raw_value(&RoomPowerLevelsEventContent { + users, + ..Default::default() + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + &conduit_user, + &room_id, + &state_lock, + ) + .await?; // 4.1 Join Rules - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomJoinRules, - content: to_raw_value(&RoomJoinRulesEventContent::new(JoinRule::Invite)) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &conduit_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomJoinRules, + content: to_raw_value(&RoomJoinRulesEventContent::new(JoinRule::Invite)) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + &conduit_user, + &room_id, + &state_lock, + ) + .await?; // 4.2 History Visibility - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomHistoryVisibility, - content: to_raw_value(&RoomHistoryVisibilityEventContent::new( - HistoryVisibility::Shared, - )) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &conduit_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomHistoryVisibility, + content: to_raw_value(&RoomHistoryVisibilityEventContent::new( + HistoryVisibility::Shared, + )) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + &conduit_user, + &room_id, + &state_lock, + ) + .await?; // 4.3 Guest Access - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomGuestAccess, - content: to_raw_value(&RoomGuestAccessEventContent::new(GuestAccess::Forbidden)) + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomGuestAccess, + content: to_raw_value(&RoomGuestAccessEventContent::new( + GuestAccess::Forbidden, + )) .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &conduit_user, - &room_id, - &state_lock, - )?; + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + &conduit_user, + &room_id, + &state_lock, + ) + .await?; // 5. Events implied by name and topic let room_name = format!("{} Admin Room", services().globals.server_name()); - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomName, - content: to_raw_value(&RoomNameEventContent::new(Some(room_name))) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &conduit_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomName, + content: to_raw_value(&RoomNameEventContent::new(Some(room_name))) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + &conduit_user, + &room_id, + &state_lock, + ) + .await?; - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomTopic, - content: to_raw_value(&RoomTopicEventContent { - topic: format!("Manage {}", services().globals.server_name()), - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &conduit_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomTopic, + content: to_raw_value(&RoomTopicEventContent { + topic: format!("Manage {}", services().globals.server_name()), + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + &conduit_user, + &room_id, + &state_lock, + ) + .await?; // 6. Room alias let alias: OwnedRoomAliasId = format!("#admins:{}", services().globals.server_name()) .try_into() .expect("#admins:server_name is a valid alias name"); - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomCanonicalAlias, - content: to_raw_value(&RoomCanonicalAliasEventContent { - alias: Some(alias.clone()), - alt_aliases: Vec::new(), - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &conduit_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomCanonicalAlias, + content: to_raw_value(&RoomCanonicalAliasEventContent { + alias: Some(alias.clone()), + alt_aliases: Vec::new(), + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + &conduit_user, + &room_id, + &state_lock, + ) + .await?; services().rooms.alias.set_alias(&alias, &room_id)?; @@ -1052,72 +1083,84 @@ impl Service { .expect("@conduit:server_name is valid"); // Invite and join the real user - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomMember, - content: to_raw_value(&RoomMemberEventContent { - membership: MembershipState::Invite, - displayname: None, - avatar_url: None, - is_direct: None, - third_party_invite: None, - blurhash: None, - reason: None, - join_authorized_via_users_server: None, - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(user_id.to_string()), - redacts: None, - }, - &conduit_user, - &room_id, - &state_lock, - )?; - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomMember, - content: to_raw_value(&RoomMemberEventContent { - membership: MembershipState::Join, - displayname: Some(displayname), - avatar_url: None, - is_direct: None, - third_party_invite: None, - blurhash: None, - reason: None, - join_authorized_via_users_server: None, - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(user_id.to_string()), - redacts: None, - }, - user_id, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomMember, + content: to_raw_value(&RoomMemberEventContent { + membership: MembershipState::Invite, + displayname: None, + avatar_url: None, + is_direct: None, + third_party_invite: None, + blurhash: None, + reason: None, + join_authorized_via_users_server: None, + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(user_id.to_string()), + redacts: None, + }, + &conduit_user, + &room_id, + &state_lock, + ) + .await?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomMember, + content: to_raw_value(&RoomMemberEventContent { + membership: MembershipState::Join, + displayname: Some(displayname), + avatar_url: None, + is_direct: None, + third_party_invite: None, + blurhash: None, + reason: None, + join_authorized_via_users_server: None, + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(user_id.to_string()), + redacts: None, + }, + user_id, + &room_id, + &state_lock, + ) + .await?; // Set power level let mut users = BTreeMap::new(); users.insert(conduit_user.to_owned(), 100.into()); users.insert(user_id.to_owned(), 100.into()); - services().rooms.timeline.build_and_append_pdu( - PduBuilder { - event_type: RoomEventType::RoomPowerLevels, - content: to_raw_value(&RoomPowerLevelsEventContent { - users, - ..Default::default() - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &conduit_user, - &room_id, - &state_lock, - )?; + services() + .rooms + .timeline + .build_and_append_pdu( + PduBuilder { + event_type: RoomEventType::RoomPowerLevels, + content: to_raw_value(&RoomPowerLevelsEventContent { + users, + ..Default::default() + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + &conduit_user, + &room_id, + &state_lock, + ) + .await?; // Send welcome message services().rooms.timeline.build_and_append_pdu( @@ -1135,7 +1178,7 @@ impl Service { &conduit_user, &room_id, &state_lock, - )?; + ).await?; Ok(()) } diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs index c0fcb4bd..fc5268da 100644 --- a/src/service/globals/mod.rs +++ b/src/service/globals/mod.rs @@ -52,7 +52,7 @@ pub struct Service { pub bad_signature_ratelimiter: Arc, RateLimitState>>>, pub servername_ratelimiter: Arc>>>, pub sync_receivers: RwLock>, - pub roomid_mutex_insert: RwLock>>>, + pub roomid_mutex_insert: RwLock>>>, pub roomid_mutex_state: RwLock>>>, pub roomid_mutex_federation: RwLock>>>, // this lock will be held longer pub roomid_federationhandletime: RwLock>, diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index 7531674b..7180008d 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -801,14 +801,18 @@ impl Service { .map_err(|_e| Error::BadRequest(ErrorKind::InvalidParam, "Auth check failed."))?; if soft_fail { - services().rooms.timeline.append_incoming_pdu( - &incoming_pdu, - val, - extremities.iter().map(|e| (**e).to_owned()).collect(), - state_ids_compressed, - soft_fail, - &state_lock, - )?; + services() + .rooms + .timeline + .append_incoming_pdu( + &incoming_pdu, + val, + extremities.iter().map(|e| (**e).to_owned()).collect(), + state_ids_compressed, + soft_fail, + &state_lock, + ) + .await?; // Soft fail, we keep the event as an outlier but don't add it to the timeline warn!("Event was soft failed: {:?}", incoming_pdu); @@ -1004,14 +1008,18 @@ impl Service { // We use the `state_at_event` instead of `state_after` so we accurately // represent the state for this event. - let pdu_id = services().rooms.timeline.append_incoming_pdu( - &incoming_pdu, - val, - extremities.iter().map(|e| (**e).to_owned()).collect(), - state_ids_compressed, - soft_fail, - &state_lock, - )?; + let pdu_id = services() + .rooms + .timeline + .append_incoming_pdu( + &incoming_pdu, + val, + extremities.iter().map(|e| (**e).to_owned()).collect(), + state_ids_compressed, + soft_fail, + &state_lock, + ) + .await?; info!("Appended incoming pdu"); diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs index 3072b80f..b7a2cb79 100644 --- a/src/service/rooms/state/mod.rs +++ b/src/service/rooms/state/mod.rs @@ -7,14 +7,13 @@ use std::{ pub use data::Data; use ruma::{ events::{ - room::{create::RoomCreateEventContent, member::MembershipState}, + room::{create::RoomCreateEventContent, member::RoomMemberEventContent}, AnyStrippedStateEvent, RoomEventType, StateEventType, }, serde::Raw, state_res::{self, StateMap}, EventId, OwnedEventId, RoomId, RoomVersionId, UserId, }; -use serde::Deserialize; use tokio::sync::MutexGuard; use tracing::warn; @@ -60,15 +59,11 @@ impl Service { Err(_) => continue, }; - #[derive(Deserialize)] - struct ExtractMembership { - membership: MembershipState, - } - - let membership = match serde_json::from_str::(pdu.content.get()) { - Ok(e) => e.membership, - Err(_) => continue, - }; + let membership_event = + match serde_json::from_str::(pdu.content.get()) { + Ok(e) => e, + Err(_) => continue, + }; let state_key = match pdu.state_key { Some(k) => k, @@ -80,14 +75,18 @@ impl Service { Err(_) => continue, }; - services().rooms.state_cache.update_membership( - room_id, - &user_id, - membership, - &pdu.sender, - None, - false, - )?; + services() + .rooms + .state_cache + .update_membership( + room_id, + &user_id, + membership_event, + &pdu.sender, + None, + false, + ) + .await?; } services().rooms.state_cache.update_joined_count(room_id)?; diff --git a/src/service/rooms/state_cache/mod.rs b/src/service/rooms/state_cache/mod.rs index 32afdd4e..dfbe5db7 100644 --- a/src/service/rooms/state_cache/mod.rs +++ b/src/service/rooms/state_cache/mod.rs @@ -4,10 +4,14 @@ use std::{collections::HashSet, sync::Arc}; pub use data::Data; use ruma::{ + api::federation::{self, query::get_profile_information::v1::ProfileField}, events::{ direct::DirectEvent, ignored_user_list::IgnoredUserListEvent, - room::{create::RoomCreateEventContent, member::MembershipState}, + room::{ + create::RoomCreateEventContent, + member::{MembershipState, RoomMemberEventContent}, + }, AnyStrippedStateEvent, AnySyncStateEvent, GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType, }, @@ -24,19 +28,43 @@ pub struct Service { impl Service { /// Update current membership data. #[tracing::instrument(skip(self, last_state))] - pub fn update_membership( + pub async fn update_membership( &self, room_id: &RoomId, user_id: &UserId, - membership: MembershipState, + membership_event: RoomMemberEventContent, sender: &UserId, last_state: Option>>, update_joined_count: bool, ) -> Result<()> { + let membership = membership_event.membership; // Keep track what remote users exist by adding them as "deactivated" users if user_id.server_name() != services().globals.server_name() { services().users.create(user_id, None)?; - // TODO: displayname, avatar url + // Try to update our local copy of the user if ours does not match + if ((services().users.displayname(user_id)? != membership_event.displayname) + || (services().users.avatar_url(user_id)? != membership_event.avatar_url) + || (services().users.blurhash(user_id)? != membership_event.blurhash)) + && (membership != MembershipState::Leave) + { + let response = services() + .sending + .send_federation_request( + user_id.server_name(), + federation::query::get_profile_information::v1::Request { + user_id: user_id.into(), + field: Some(ProfileField::AvatarUrl), + }, + ) + .await?; + let _ = services() + .users + .set_displayname(user_id, response.displayname.clone()); + let _ = services() + .users + .set_avatar_url(user_id, response.avatar_url); + let _ = services().users.set_blurhash(user_id, response.blurhash); + }; } match &membership { diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index 34399d46..d34e80f9 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -15,7 +15,8 @@ use ruma::{ events::{ push_rules::PushRulesEvent, room::{ - create::RoomCreateEventContent, member::MembershipState, + create::RoomCreateEventContent, + member::{MembershipState, RoomMemberEventContent}, power_levels::RoomPowerLevelsEventContent, }, GlobalAccountDataEventType, RoomEventType, StateEventType, @@ -145,7 +146,7 @@ impl Service { /// /// Returns pdu id #[tracing::instrument(skip(self, pdu, pdu_json, leaves))] - pub fn append_pdu<'a>( + pub async fn append_pdu<'a>( &self, pdu: &PduEvent, mut pdu_json: CanonicalJsonObject, @@ -211,7 +212,7 @@ impl Service { .entry(pdu.room_id.clone()) .or_default(), ); - let insert_lock = mutex_insert.lock().unwrap(); + let insert_lock = mutex_insert.lock().await; let count1 = services().globals.next_count()?; // Mark as read first so the sending client doesn't get a notification even if appending @@ -323,16 +324,11 @@ impl Service { } RoomEventType::RoomMember => { if let Some(state_key) = &pdu.state_key { - #[derive(Deserialize)] - struct ExtractMembership { - membership: MembershipState, - } - // if the state_key fails let target_user_id = UserId::parse(state_key.clone()) .expect("This state_key was previously validated"); - let content = serde_json::from_str::(pdu.content.get()) + let content = serde_json::from_str::(pdu.content.get()) .map_err(|_| Error::bad_database("Invalid content in pdu."))?; let invite_state = match content.membership { @@ -345,14 +341,18 @@ impl Service { // Update our membership info, we do this here incase a user is invited // and immediately leaves we need the DB to record the invite event for auth - services().rooms.state_cache.update_membership( - &pdu.room_id, - &target_user_id, - content.membership, - &pdu.sender, - invite_state, - true, - )?; + services() + .rooms + .state_cache + .update_membership( + &pdu.room_id, + &target_user_id, + content, + &pdu.sender, + invite_state, + true, + ) + .await?; } } RoomEventType::RoomMessage => { @@ -673,7 +673,7 @@ impl Service { /// Creates a new persisted data unit and adds it to a room. This function takes a /// roomid_mutex_state, meaning that only this function is able to mutate the room state. #[tracing::instrument(skip(self, state_lock))] - pub fn build_and_append_pdu( + pub async fn build_and_append_pdu( &self, pdu_builder: PduBuilder, sender: &UserId, @@ -687,14 +687,16 @@ impl Service { // pdu without it's state. This is okay because append_pdu can't fail. let statehashid = services().rooms.state.append_to_state(&pdu)?; - let pdu_id = self.append_pdu( - &pdu, - pdu_json, - // Since this PDU references all pdu_leaves we can update the leaves - // of the room - vec![(*pdu.event_id).to_owned()], - state_lock, - )?; + let pdu_id = self + .append_pdu( + &pdu, + pdu_json, + // Since this PDU references all pdu_leaves we can update the leaves + // of the room + vec![(*pdu.event_id).to_owned()], + state_lock, + ) + .await?; // We set the room state after inserting the pdu, so that we never have a moment in time // where events in the current room state do not exist @@ -732,7 +734,7 @@ impl Service { /// Append the incoming event setting the state snapshot to the state from the /// server that sent the event. #[tracing::instrument(skip_all)] - pub fn append_incoming_pdu<'a>( + pub async fn append_incoming_pdu<'a>( &self, pdu: &PduEvent, pdu_json: CanonicalJsonObject, @@ -762,11 +764,11 @@ impl Service { return Ok(None); } - let pdu_id = - services() - .rooms - .timeline - .append_pdu(pdu, pdu_json, new_room_leaves, state_lock)?; + let pdu_id = services() + .rooms + .timeline + .append_pdu(pdu, pdu_json, new_room_leaves, state_lock) + .await?; Ok(Some(pdu_id)) }