mirror of
https://gitlab.com/famedly/conduit.git
synced 2025-01-19 04:30:10 +00:00
Don't crash when a room errors
This commit is contained in:
parent
63f787f635
commit
42b12934e3
@ -2,7 +2,14 @@ use crate::{service::rooms::timeline::PduCount, services, Error, Result, Ruma, R
|
|||||||
use ruma::{
|
use ruma::{
|
||||||
api::client::{
|
api::client::{
|
||||||
filter::{FilterDefinition, LazyLoadOptions},
|
filter::{FilterDefinition, LazyLoadOptions},
|
||||||
sync::sync_events::{self, DeviceLists, UnreadNotificationsCount},
|
sync::sync_events::{
|
||||||
|
self,
|
||||||
|
v3::{
|
||||||
|
Ephemeral, Filter, GlobalAccountData, InviteState, InvitedRoom, JoinedRoom,
|
||||||
|
LeftRoom, Presence, RoomAccountData, RoomSummary, Rooms, State, Timeline, ToDevice,
|
||||||
|
},
|
||||||
|
DeviceLists, UnreadNotificationsCount,
|
||||||
|
},
|
||||||
uiaa::UiaaResponse,
|
uiaa::UiaaResponse,
|
||||||
},
|
},
|
||||||
events::{
|
events::{
|
||||||
@ -10,7 +17,7 @@ use ruma::{
|
|||||||
RoomEventType, StateEventType,
|
RoomEventType, StateEventType,
|
||||||
},
|
},
|
||||||
serde::Raw,
|
serde::Raw,
|
||||||
OwnedDeviceId, OwnedUserId, RoomId, UserId,
|
DeviceId, OwnedDeviceId, OwnedUserId, RoomId, UserId,
|
||||||
};
|
};
|
||||||
use std::{
|
use std::{
|
||||||
collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
|
collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
|
||||||
@ -160,11 +167,6 @@ async fn sync_helper(
|
|||||||
body: sync_events::v3::Request,
|
body: sync_events::v3::Request,
|
||||||
// bool = caching allowed
|
// bool = caching allowed
|
||||||
) -> Result<(sync_events::v3::Response, bool), Error> {
|
) -> Result<(sync_events::v3::Response, bool), Error> {
|
||||||
use sync_events::v3::{
|
|
||||||
Ephemeral, Filter, GlobalAccountData, InviteState, InvitedRoom, JoinedRoom, LeftRoom,
|
|
||||||
Presence, RoomAccountData, RoomSummary, Rooms, State, Timeline, ToDevice,
|
|
||||||
};
|
|
||||||
|
|
||||||
// TODO: match body.set_presence {
|
// TODO: match body.set_presence {
|
||||||
services().rooms.edus.presence.ping_presence(&sender_user)?;
|
services().rooms.edus.presence.ping_presence(&sender_user)?;
|
||||||
|
|
||||||
@ -192,6 +194,8 @@ async fn sync_helper(
|
|||||||
_ => (false, false),
|
_ => (false, false),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let full_state = body.full_state;
|
||||||
|
|
||||||
let mut joined_rooms = BTreeMap::new();
|
let mut joined_rooms = BTreeMap::new();
|
||||||
let since = body
|
let since = body
|
||||||
.since
|
.since
|
||||||
@ -220,7 +224,347 @@ async fn sync_helper(
|
|||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
for room_id in all_joined_rooms {
|
for room_id in all_joined_rooms {
|
||||||
let room_id = room_id?;
|
let room_id = room_id?;
|
||||||
|
if let Ok(joined_room) = load_joined_room(
|
||||||
|
&sender_user,
|
||||||
|
&sender_device,
|
||||||
|
&room_id,
|
||||||
|
since,
|
||||||
|
sincecount,
|
||||||
|
next_batch,
|
||||||
|
next_batchcount,
|
||||||
|
lazy_load_enabled,
|
||||||
|
lazy_load_send_redundant,
|
||||||
|
full_state,
|
||||||
|
&mut device_list_updates,
|
||||||
|
&mut left_encrypted_users,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
if !joined_room.is_empty() {
|
||||||
|
joined_rooms.insert(room_id.clone(), joined_room);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Take presence updates from this room
|
||||||
|
for (user_id, presence) in services()
|
||||||
|
.rooms
|
||||||
|
.edus
|
||||||
|
.presence
|
||||||
|
.presence_since(&room_id, since)?
|
||||||
|
{
|
||||||
|
match presence_updates.entry(user_id) {
|
||||||
|
Entry::Vacant(v) => {
|
||||||
|
v.insert(presence);
|
||||||
|
}
|
||||||
|
Entry::Occupied(mut o) => {
|
||||||
|
let p = o.get_mut();
|
||||||
|
|
||||||
|
// Update existing presence event with more info
|
||||||
|
p.content.presence = presence.content.presence;
|
||||||
|
if let Some(status_msg) = presence.content.status_msg {
|
||||||
|
p.content.status_msg = Some(status_msg);
|
||||||
|
}
|
||||||
|
if let Some(last_active_ago) = presence.content.last_active_ago {
|
||||||
|
p.content.last_active_ago = Some(last_active_ago);
|
||||||
|
}
|
||||||
|
if let Some(displayname) = presence.content.displayname {
|
||||||
|
p.content.displayname = Some(displayname);
|
||||||
|
}
|
||||||
|
if let Some(avatar_url) = presence.content.avatar_url {
|
||||||
|
p.content.avatar_url = Some(avatar_url);
|
||||||
|
}
|
||||||
|
if let Some(currently_active) = presence.content.currently_active {
|
||||||
|
p.content.currently_active = Some(currently_active);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut left_rooms = BTreeMap::new();
|
||||||
|
let all_left_rooms: Vec<_> = services()
|
||||||
|
.rooms
|
||||||
|
.state_cache
|
||||||
|
.rooms_left(&sender_user)
|
||||||
|
.collect();
|
||||||
|
for result in all_left_rooms {
|
||||||
|
let (room_id, _) = result?;
|
||||||
|
|
||||||
|
let mut left_state_events = Vec::new();
|
||||||
|
|
||||||
|
{
|
||||||
|
// Get and drop the lock to wait for remaining operations to finish
|
||||||
|
let mutex_insert = Arc::clone(
|
||||||
|
services()
|
||||||
|
.globals
|
||||||
|
.roomid_mutex_insert
|
||||||
|
.write()
|
||||||
|
.unwrap()
|
||||||
|
.entry(room_id.clone())
|
||||||
|
.or_default(),
|
||||||
|
);
|
||||||
|
let insert_lock = mutex_insert.lock().unwrap();
|
||||||
|
drop(insert_lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
let left_count = services()
|
||||||
|
.rooms
|
||||||
|
.state_cache
|
||||||
|
.get_left_count(&room_id, &sender_user)?;
|
||||||
|
|
||||||
|
// Left before last sync
|
||||||
|
if Some(since) >= left_count {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if !services().rooms.metadata.exists(&room_id)? {
|
||||||
|
// This is just a rejected invite, not a room we know
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let since_shortstatehash = services()
|
||||||
|
.rooms
|
||||||
|
.user
|
||||||
|
.get_token_shortstatehash(&room_id, since)?;
|
||||||
|
|
||||||
|
let since_state_ids = match since_shortstatehash {
|
||||||
|
Some(s) => services().rooms.state_accessor.state_full_ids(s).await?,
|
||||||
|
None => HashMap::new(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let left_event_id = match services().rooms.state_accessor.room_state_get_id(
|
||||||
|
&room_id,
|
||||||
|
&StateEventType::RoomMember,
|
||||||
|
sender_user.as_str(),
|
||||||
|
)? {
|
||||||
|
Some(e) => e,
|
||||||
|
None => {
|
||||||
|
error!("Left room but no left state event");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let left_shortstatehash = match services()
|
||||||
|
.rooms
|
||||||
|
.state_accessor
|
||||||
|
.pdu_shortstatehash(&left_event_id)?
|
||||||
|
{
|
||||||
|
Some(s) => s,
|
||||||
|
None => {
|
||||||
|
error!("Leave event has no state");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut left_state_ids = services()
|
||||||
|
.rooms
|
||||||
|
.state_accessor
|
||||||
|
.state_full_ids(left_shortstatehash)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let leave_shortstatekey = services()
|
||||||
|
.rooms
|
||||||
|
.short
|
||||||
|
.get_or_create_shortstatekey(&StateEventType::RoomMember, sender_user.as_str())?;
|
||||||
|
|
||||||
|
left_state_ids.insert(leave_shortstatekey, left_event_id);
|
||||||
|
|
||||||
|
let mut i = 0;
|
||||||
|
for (key, id) in left_state_ids {
|
||||||
|
if full_state || since_state_ids.get(&key) != Some(&id) {
|
||||||
|
let (event_type, state_key) =
|
||||||
|
services().rooms.short.get_statekey_from_short(key)?;
|
||||||
|
|
||||||
|
if !lazy_load_enabled
|
||||||
|
|| event_type != StateEventType::RoomMember
|
||||||
|
|| full_state
|
||||||
|
// TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565
|
||||||
|
|| *sender_user == state_key
|
||||||
|
{
|
||||||
|
let pdu = match services().rooms.timeline.get_pdu(&id)? {
|
||||||
|
Some(pdu) => pdu,
|
||||||
|
None => {
|
||||||
|
error!("Pdu in state not found: {}", id);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
left_state_events.push(pdu.to_sync_state_event());
|
||||||
|
|
||||||
|
i += 1;
|
||||||
|
if i % 100 == 0 {
|
||||||
|
tokio::task::yield_now().await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
left_rooms.insert(
|
||||||
|
room_id.clone(),
|
||||||
|
LeftRoom {
|
||||||
|
account_data: RoomAccountData { events: Vec::new() },
|
||||||
|
timeline: Timeline {
|
||||||
|
limited: false,
|
||||||
|
prev_batch: Some(next_batch_string.clone()),
|
||||||
|
events: Vec::new(),
|
||||||
|
},
|
||||||
|
state: State {
|
||||||
|
events: left_state_events,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut invited_rooms = BTreeMap::new();
|
||||||
|
let all_invited_rooms: Vec<_> = services()
|
||||||
|
.rooms
|
||||||
|
.state_cache
|
||||||
|
.rooms_invited(&sender_user)
|
||||||
|
.collect();
|
||||||
|
for result in all_invited_rooms {
|
||||||
|
let (room_id, invite_state_events) = result?;
|
||||||
|
|
||||||
|
{
|
||||||
|
// Get and drop the lock to wait for remaining operations to finish
|
||||||
|
let mutex_insert = Arc::clone(
|
||||||
|
services()
|
||||||
|
.globals
|
||||||
|
.roomid_mutex_insert
|
||||||
|
.write()
|
||||||
|
.unwrap()
|
||||||
|
.entry(room_id.clone())
|
||||||
|
.or_default(),
|
||||||
|
);
|
||||||
|
let insert_lock = mutex_insert.lock().unwrap();
|
||||||
|
drop(insert_lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
let invite_count = services()
|
||||||
|
.rooms
|
||||||
|
.state_cache
|
||||||
|
.get_invite_count(&room_id, &sender_user)?;
|
||||||
|
|
||||||
|
// Invited before last sync
|
||||||
|
if Some(since) >= invite_count {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
invited_rooms.insert(
|
||||||
|
room_id.clone(),
|
||||||
|
InvitedRoom {
|
||||||
|
invite_state: InviteState {
|
||||||
|
events: invite_state_events,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
for user_id in left_encrypted_users {
|
||||||
|
let still_share_encrypted_room = services()
|
||||||
|
.rooms
|
||||||
|
.user
|
||||||
|
.get_shared_rooms(vec![sender_user.clone(), user_id.clone()])?
|
||||||
|
.filter_map(|r| r.ok())
|
||||||
|
.filter_map(|other_room_id| {
|
||||||
|
Some(
|
||||||
|
services()
|
||||||
|
.rooms
|
||||||
|
.state_accessor
|
||||||
|
.room_state_get(&other_room_id, &StateEventType::RoomEncryption, "")
|
||||||
|
.ok()?
|
||||||
|
.is_some(),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.all(|encrypted| !encrypted);
|
||||||
|
// If the user doesn't share an encrypted room with the target anymore, we need to tell
|
||||||
|
// them
|
||||||
|
if still_share_encrypted_room {
|
||||||
|
device_list_left.insert(user_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove all to-device events the device received *last time*
|
||||||
|
services()
|
||||||
|
.users
|
||||||
|
.remove_to_device_events(&sender_user, &sender_device, since)?;
|
||||||
|
|
||||||
|
let response = sync_events::v3::Response {
|
||||||
|
next_batch: next_batch_string,
|
||||||
|
rooms: Rooms {
|
||||||
|
leave: left_rooms,
|
||||||
|
join: joined_rooms,
|
||||||
|
invite: invited_rooms,
|
||||||
|
knock: BTreeMap::new(), // TODO
|
||||||
|
},
|
||||||
|
presence: Presence {
|
||||||
|
events: presence_updates
|
||||||
|
.into_values()
|
||||||
|
.map(|v| Raw::new(&v).expect("PresenceEvent always serializes successfully"))
|
||||||
|
.collect(),
|
||||||
|
},
|
||||||
|
account_data: GlobalAccountData {
|
||||||
|
events: services()
|
||||||
|
.account_data
|
||||||
|
.changes_since(None, &sender_user, since)?
|
||||||
|
.into_iter()
|
||||||
|
.filter_map(|(_, v)| {
|
||||||
|
serde_json::from_str(v.json().get())
|
||||||
|
.map_err(|_| Error::bad_database("Invalid account event in database."))
|
||||||
|
.ok()
|
||||||
|
})
|
||||||
|
.collect(),
|
||||||
|
},
|
||||||
|
device_lists: DeviceLists {
|
||||||
|
changed: device_list_updates.into_iter().collect(),
|
||||||
|
left: device_list_left.into_iter().collect(),
|
||||||
|
},
|
||||||
|
device_one_time_keys_count: services()
|
||||||
|
.users
|
||||||
|
.count_one_time_keys(&sender_user, &sender_device)?,
|
||||||
|
to_device: ToDevice {
|
||||||
|
events: services()
|
||||||
|
.users
|
||||||
|
.get_to_device_events(&sender_user, &sender_device)?,
|
||||||
|
},
|
||||||
|
// Fallback keys are not yet supported
|
||||||
|
device_unused_fallback_key_types: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
// TODO: Retry the endpoint instead of returning (waiting for #118)
|
||||||
|
if !full_state
|
||||||
|
&& response.rooms.is_empty()
|
||||||
|
&& response.presence.is_empty()
|
||||||
|
&& response.account_data.is_empty()
|
||||||
|
&& response.device_lists.is_empty()
|
||||||
|
&& response.to_device.is_empty()
|
||||||
|
{
|
||||||
|
// Hang a few seconds so requests are not spammed
|
||||||
|
// Stop hanging if new info arrives
|
||||||
|
let mut duration = body.timeout.unwrap_or_default();
|
||||||
|
if duration.as_secs() > 30 {
|
||||||
|
duration = Duration::from_secs(30);
|
||||||
|
}
|
||||||
|
let _ = tokio::time::timeout(duration, watcher).await;
|
||||||
|
Ok((response, false))
|
||||||
|
} else {
|
||||||
|
Ok((response, since != next_batch)) // Only cache if we made progress
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn load_joined_room(
|
||||||
|
sender_user: &UserId,
|
||||||
|
sender_device: &DeviceId,
|
||||||
|
room_id: &RoomId,
|
||||||
|
since: u64,
|
||||||
|
sincecount: PduCount,
|
||||||
|
next_batch: u64,
|
||||||
|
next_batchcount: PduCount,
|
||||||
|
lazy_load_enabled: bool,
|
||||||
|
lazy_load_send_redundant: bool,
|
||||||
|
full_state: bool,
|
||||||
|
device_list_updates: &mut HashSet<OwnedUserId>,
|
||||||
|
left_encrypted_users: &mut HashSet<OwnedUserId>,
|
||||||
|
) -> Result<JoinedRoom> {
|
||||||
{
|
{
|
||||||
// Get and drop the lock to wait for remaining operations to finish
|
// Get and drop the lock to wait for remaining operations to finish
|
||||||
// This will make sure the we have all events until next_batch
|
// This will make sure the we have all events until next_batch
|
||||||
@ -230,7 +574,7 @@ async fn sync_helper(
|
|||||||
.roomid_mutex_insert
|
.roomid_mutex_insert
|
||||||
.write()
|
.write()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.entry(room_id.clone())
|
.entry(room_id.to_owned())
|
||||||
.or_default(),
|
.or_default(),
|
||||||
);
|
);
|
||||||
let insert_lock = mutex_insert.lock().unwrap();
|
let insert_lock = mutex_insert.lock().unwrap();
|
||||||
@ -301,7 +645,7 @@ async fn sync_helper(
|
|||||||
s
|
s
|
||||||
} else {
|
} else {
|
||||||
error!("Room {} has no state", room_id);
|
error!("Room {} has no state", room_id);
|
||||||
continue;
|
return Err(Error::BadDatabase("Room has no state"));
|
||||||
};
|
};
|
||||||
|
|
||||||
let since_shortstatehash = services()
|
let since_shortstatehash = services()
|
||||||
@ -336,15 +680,14 @@ async fn sync_helper(
|
|||||||
.filter_map(|pdu| pdu.ok()) // Ignore all broken pdus
|
.filter_map(|pdu| pdu.ok()) // Ignore all broken pdus
|
||||||
.filter(|(_, pdu)| pdu.kind == RoomEventType::RoomMember)
|
.filter(|(_, pdu)| pdu.kind == RoomEventType::RoomMember)
|
||||||
.map(|(_, pdu)| {
|
.map(|(_, pdu)| {
|
||||||
let content: RoomMemberEventContent =
|
let content: RoomMemberEventContent = serde_json::from_str(pdu.content.get())
|
||||||
serde_json::from_str(pdu.content.get()).map_err(|_| {
|
.map_err(|_| {
|
||||||
Error::bad_database("Invalid member event in database.")
|
Error::bad_database("Invalid member event in database.")
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
if let Some(state_key) = &pdu.state_key {
|
if let Some(state_key) = &pdu.state_key {
|
||||||
let user_id = UserId::parse(state_key.clone()).map_err(|_| {
|
let user_id = UserId::parse(state_key.clone())
|
||||||
Error::bad_database("Invalid UserId in member PDU.")
|
.map_err(|_| Error::bad_database("Invalid UserId in member PDU."))?;
|
||||||
})?;
|
|
||||||
|
|
||||||
// The membership was and still is invite or join
|
// The membership was and still is invite or join
|
||||||
if matches!(
|
if matches!(
|
||||||
@ -406,13 +749,8 @@ async fn sync_helper(
|
|||||||
let joined_since_last_sync =
|
let joined_since_last_sync =
|
||||||
since_sender_member.map_or(true, |member| member.membership != MembershipState::Join);
|
since_sender_member.map_or(true, |member| member.membership != MembershipState::Join);
|
||||||
|
|
||||||
let (
|
let (heroes, joined_member_count, invited_member_count, joined_since_last_sync, state_events) =
|
||||||
heroes,
|
if since_shortstatehash.is_none() || joined_since_last_sync {
|
||||||
joined_member_count,
|
|
||||||
invited_member_count,
|
|
||||||
joined_since_last_sync,
|
|
||||||
state_events,
|
|
||||||
) = if since_shortstatehash.is_none() || joined_since_last_sync {
|
|
||||||
// Probably since = 0, we will do an initial sync
|
// Probably since = 0, we will do an initial sync
|
||||||
|
|
||||||
let (joined_member_count, invited_member_count, heroes) = calculate_counts()?;
|
let (joined_member_count, invited_member_count, heroes) = calculate_counts()?;
|
||||||
@ -448,7 +786,7 @@ async fn sync_helper(
|
|||||||
tokio::task::yield_now().await;
|
tokio::task::yield_now().await;
|
||||||
}
|
}
|
||||||
} else if !lazy_load_enabled
|
} else if !lazy_load_enabled
|
||||||
|| body.full_state
|
|| full_state
|
||||||
|| timeline_users.contains(&state_key)
|
|| timeline_users.contains(&state_key)
|
||||||
// TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565
|
// TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565
|
||||||
|| *sender_user == state_key
|
|| *sender_user == state_key
|
||||||
@ -521,7 +859,7 @@ async fn sync_helper(
|
|||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
for (key, id) in current_state_ids {
|
for (key, id) in current_state_ids {
|
||||||
if body.full_state || since_state_ids.get(&key) != Some(&id) {
|
if full_state || since_state_ids.get(&key) != Some(&id) {
|
||||||
let pdu = match services().rooms.timeline.get_pdu(&id)? {
|
let pdu = match services().rooms.timeline.get_pdu(&id)? {
|
||||||
Some(pdu) => pdu,
|
Some(pdu) => pdu,
|
||||||
None => {
|
None => {
|
||||||
@ -748,7 +1086,7 @@ async fn sync_helper(
|
|||||||
current_shortstatehash,
|
current_shortstatehash,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
let joined_room = JoinedRoom {
|
Ok(JoinedRoom {
|
||||||
account_data: RoomAccountData {
|
account_data: RoomAccountData {
|
||||||
events: services()
|
events: services()
|
||||||
.account_data
|
.account_data
|
||||||
@ -783,316 +1121,7 @@ async fn sync_helper(
|
|||||||
},
|
},
|
||||||
ephemeral: Ephemeral { events: edus },
|
ephemeral: Ephemeral { events: edus },
|
||||||
unread_thread_notifications: BTreeMap::new(),
|
unread_thread_notifications: BTreeMap::new(),
|
||||||
};
|
|
||||||
|
|
||||||
if !joined_room.is_empty() {
|
|
||||||
joined_rooms.insert(room_id.clone(), joined_room);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Take presence updates from this room
|
|
||||||
for (user_id, presence) in services()
|
|
||||||
.rooms
|
|
||||||
.edus
|
|
||||||
.presence
|
|
||||||
.presence_since(&room_id, since)?
|
|
||||||
{
|
|
||||||
match presence_updates.entry(user_id) {
|
|
||||||
Entry::Vacant(v) => {
|
|
||||||
v.insert(presence);
|
|
||||||
}
|
|
||||||
Entry::Occupied(mut o) => {
|
|
||||||
let p = o.get_mut();
|
|
||||||
|
|
||||||
// Update existing presence event with more info
|
|
||||||
p.content.presence = presence.content.presence;
|
|
||||||
if let Some(status_msg) = presence.content.status_msg {
|
|
||||||
p.content.status_msg = Some(status_msg);
|
|
||||||
}
|
|
||||||
if let Some(last_active_ago) = presence.content.last_active_ago {
|
|
||||||
p.content.last_active_ago = Some(last_active_ago);
|
|
||||||
}
|
|
||||||
if let Some(displayname) = presence.content.displayname {
|
|
||||||
p.content.displayname = Some(displayname);
|
|
||||||
}
|
|
||||||
if let Some(avatar_url) = presence.content.avatar_url {
|
|
||||||
p.content.avatar_url = Some(avatar_url);
|
|
||||||
}
|
|
||||||
if let Some(currently_active) = presence.content.currently_active {
|
|
||||||
p.content.currently_active = Some(currently_active);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut left_rooms = BTreeMap::new();
|
|
||||||
let all_left_rooms: Vec<_> = services()
|
|
||||||
.rooms
|
|
||||||
.state_cache
|
|
||||||
.rooms_left(&sender_user)
|
|
||||||
.collect();
|
|
||||||
for result in all_left_rooms {
|
|
||||||
let (room_id, _) = result?;
|
|
||||||
|
|
||||||
let mut left_state_events = Vec::new();
|
|
||||||
|
|
||||||
{
|
|
||||||
// Get and drop the lock to wait for remaining operations to finish
|
|
||||||
let mutex_insert = Arc::clone(
|
|
||||||
services()
|
|
||||||
.globals
|
|
||||||
.roomid_mutex_insert
|
|
||||||
.write()
|
|
||||||
.unwrap()
|
|
||||||
.entry(room_id.clone())
|
|
||||||
.or_default(),
|
|
||||||
);
|
|
||||||
let insert_lock = mutex_insert.lock().unwrap();
|
|
||||||
drop(insert_lock);
|
|
||||||
}
|
|
||||||
|
|
||||||
let left_count = services()
|
|
||||||
.rooms
|
|
||||||
.state_cache
|
|
||||||
.get_left_count(&room_id, &sender_user)?;
|
|
||||||
|
|
||||||
// Left before last sync
|
|
||||||
if Some(since) >= left_count {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if !services().rooms.metadata.exists(&room_id)? {
|
|
||||||
// This is just a rejected invite, not a room we know
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let since_shortstatehash = services()
|
|
||||||
.rooms
|
|
||||||
.user
|
|
||||||
.get_token_shortstatehash(&room_id, since)?;
|
|
||||||
|
|
||||||
let since_state_ids = match since_shortstatehash {
|
|
||||||
Some(s) => services().rooms.state_accessor.state_full_ids(s).await?,
|
|
||||||
None => HashMap::new(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let left_event_id = match services().rooms.state_accessor.room_state_get_id(
|
|
||||||
&room_id,
|
|
||||||
&StateEventType::RoomMember,
|
|
||||||
sender_user.as_str(),
|
|
||||||
)? {
|
|
||||||
Some(e) => e,
|
|
||||||
None => {
|
|
||||||
error!("Left room but no left state event");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let left_shortstatehash = match services()
|
|
||||||
.rooms
|
|
||||||
.state_accessor
|
|
||||||
.pdu_shortstatehash(&left_event_id)?
|
|
||||||
{
|
|
||||||
Some(s) => s,
|
|
||||||
None => {
|
|
||||||
error!("Leave event has no state");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut left_state_ids = services()
|
|
||||||
.rooms
|
|
||||||
.state_accessor
|
|
||||||
.state_full_ids(left_shortstatehash)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
let leave_shortstatekey = services()
|
|
||||||
.rooms
|
|
||||||
.short
|
|
||||||
.get_or_create_shortstatekey(&StateEventType::RoomMember, sender_user.as_str())?;
|
|
||||||
|
|
||||||
left_state_ids.insert(leave_shortstatekey, left_event_id);
|
|
||||||
|
|
||||||
let mut i = 0;
|
|
||||||
for (key, id) in left_state_ids {
|
|
||||||
if body.full_state || since_state_ids.get(&key) != Some(&id) {
|
|
||||||
let (event_type, state_key) =
|
|
||||||
services().rooms.short.get_statekey_from_short(key)?;
|
|
||||||
|
|
||||||
if !lazy_load_enabled
|
|
||||||
|| event_type != StateEventType::RoomMember
|
|
||||||
|| body.full_state
|
|
||||||
// TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565
|
|
||||||
|| *sender_user == state_key
|
|
||||||
{
|
|
||||||
let pdu = match services().rooms.timeline.get_pdu(&id)? {
|
|
||||||
Some(pdu) => pdu,
|
|
||||||
None => {
|
|
||||||
error!("Pdu in state not found: {}", id);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
left_state_events.push(pdu.to_sync_state_event());
|
|
||||||
|
|
||||||
i += 1;
|
|
||||||
if i % 100 == 0 {
|
|
||||||
tokio::task::yield_now().await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
left_rooms.insert(
|
|
||||||
room_id.clone(),
|
|
||||||
LeftRoom {
|
|
||||||
account_data: RoomAccountData { events: Vec::new() },
|
|
||||||
timeline: Timeline {
|
|
||||||
limited: false,
|
|
||||||
prev_batch: Some(next_batch_string.clone()),
|
|
||||||
events: Vec::new(),
|
|
||||||
},
|
|
||||||
state: State {
|
|
||||||
events: left_state_events,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut invited_rooms = BTreeMap::new();
|
|
||||||
let all_invited_rooms: Vec<_> = services()
|
|
||||||
.rooms
|
|
||||||
.state_cache
|
|
||||||
.rooms_invited(&sender_user)
|
|
||||||
.collect();
|
|
||||||
for result in all_invited_rooms {
|
|
||||||
let (room_id, invite_state_events) = result?;
|
|
||||||
|
|
||||||
{
|
|
||||||
// Get and drop the lock to wait for remaining operations to finish
|
|
||||||
let mutex_insert = Arc::clone(
|
|
||||||
services()
|
|
||||||
.globals
|
|
||||||
.roomid_mutex_insert
|
|
||||||
.write()
|
|
||||||
.unwrap()
|
|
||||||
.entry(room_id.clone())
|
|
||||||
.or_default(),
|
|
||||||
);
|
|
||||||
let insert_lock = mutex_insert.lock().unwrap();
|
|
||||||
drop(insert_lock);
|
|
||||||
}
|
|
||||||
|
|
||||||
let invite_count = services()
|
|
||||||
.rooms
|
|
||||||
.state_cache
|
|
||||||
.get_invite_count(&room_id, &sender_user)?;
|
|
||||||
|
|
||||||
// Invited before last sync
|
|
||||||
if Some(since) >= invite_count {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
invited_rooms.insert(
|
|
||||||
room_id.clone(),
|
|
||||||
InvitedRoom {
|
|
||||||
invite_state: InviteState {
|
|
||||||
events: invite_state_events,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
for user_id in left_encrypted_users {
|
|
||||||
let still_share_encrypted_room = services()
|
|
||||||
.rooms
|
|
||||||
.user
|
|
||||||
.get_shared_rooms(vec![sender_user.clone(), user_id.clone()])?
|
|
||||||
.filter_map(|r| r.ok())
|
|
||||||
.filter_map(|other_room_id| {
|
|
||||||
Some(
|
|
||||||
services()
|
|
||||||
.rooms
|
|
||||||
.state_accessor
|
|
||||||
.room_state_get(&other_room_id, &StateEventType::RoomEncryption, "")
|
|
||||||
.ok()?
|
|
||||||
.is_some(),
|
|
||||||
)
|
|
||||||
})
|
})
|
||||||
.all(|encrypted| !encrypted);
|
|
||||||
// If the user doesn't share an encrypted room with the target anymore, we need to tell
|
|
||||||
// them
|
|
||||||
if still_share_encrypted_room {
|
|
||||||
device_list_left.insert(user_id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove all to-device events the device received *last time*
|
|
||||||
services()
|
|
||||||
.users
|
|
||||||
.remove_to_device_events(&sender_user, &sender_device, since)?;
|
|
||||||
|
|
||||||
let response = sync_events::v3::Response {
|
|
||||||
next_batch: next_batch_string,
|
|
||||||
rooms: Rooms {
|
|
||||||
leave: left_rooms,
|
|
||||||
join: joined_rooms,
|
|
||||||
invite: invited_rooms,
|
|
||||||
knock: BTreeMap::new(), // TODO
|
|
||||||
},
|
|
||||||
presence: Presence {
|
|
||||||
events: presence_updates
|
|
||||||
.into_values()
|
|
||||||
.map(|v| Raw::new(&v).expect("PresenceEvent always serializes successfully"))
|
|
||||||
.collect(),
|
|
||||||
},
|
|
||||||
account_data: GlobalAccountData {
|
|
||||||
events: services()
|
|
||||||
.account_data
|
|
||||||
.changes_since(None, &sender_user, since)?
|
|
||||||
.into_iter()
|
|
||||||
.filter_map(|(_, v)| {
|
|
||||||
serde_json::from_str(v.json().get())
|
|
||||||
.map_err(|_| Error::bad_database("Invalid account event in database."))
|
|
||||||
.ok()
|
|
||||||
})
|
|
||||||
.collect(),
|
|
||||||
},
|
|
||||||
device_lists: DeviceLists {
|
|
||||||
changed: device_list_updates.into_iter().collect(),
|
|
||||||
left: device_list_left.into_iter().collect(),
|
|
||||||
},
|
|
||||||
device_one_time_keys_count: services()
|
|
||||||
.users
|
|
||||||
.count_one_time_keys(&sender_user, &sender_device)?,
|
|
||||||
to_device: ToDevice {
|
|
||||||
events: services()
|
|
||||||
.users
|
|
||||||
.get_to_device_events(&sender_user, &sender_device)?,
|
|
||||||
},
|
|
||||||
// Fallback keys are not yet supported
|
|
||||||
device_unused_fallback_key_types: None,
|
|
||||||
};
|
|
||||||
|
|
||||||
// TODO: Retry the endpoint instead of returning (waiting for #118)
|
|
||||||
if !body.full_state
|
|
||||||
&& response.rooms.is_empty()
|
|
||||||
&& response.presence.is_empty()
|
|
||||||
&& response.account_data.is_empty()
|
|
||||||
&& response.device_lists.is_empty()
|
|
||||||
&& response.to_device.is_empty()
|
|
||||||
{
|
|
||||||
// Hang a few seconds so requests are not spammed
|
|
||||||
// Stop hanging if new info arrives
|
|
||||||
let mut duration = body.timeout.unwrap_or_default();
|
|
||||||
if duration.as_secs() > 30 {
|
|
||||||
duration = Duration::from_secs(30);
|
|
||||||
}
|
|
||||||
let _ = tokio::time::timeout(duration, watcher).await;
|
|
||||||
Ok((response, false))
|
|
||||||
} else {
|
|
||||||
Ok((response, since != next_batch)) // Only cache if we made progress
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn share_encrypted_room(
|
fn share_encrypted_room(
|
||||||
|
Loading…
Reference in New Issue
Block a user