feat: Admin command to ask other servers for state

Timo Kösters 2023-06-30 17:49:52 +02:00
parent 6a6f8e80f1
commit b4065a3e28
3 changed files with 183 additions and 77 deletions
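Usage sketch (an assumption, based on the kebab-case names clap derives for the existing admin commands): the new command would presumably be sent as a message in the admin room along the lines of the following, where the room ID, event ID, and server name are placeholders:
ask-for-state !somewhere:example.org $someeventid other.example.org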

View File

@ -1,7 +1,7 @@
use std::{
collections::BTreeMap,
convert::{TryFrom, TryInto},
sync::Arc,
sync::{Arc, RwLock},
time::Instant,
};
@ -21,12 +21,13 @@ use ruma::{
power_levels::RoomPowerLevelsEventContent,
topic::RoomTopicEventContent,
},
TimelineEventType,
StateEventType, TimelineEventType,
},
EventId, OwnedRoomAliasId, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId,
};
use serde_json::value::to_raw_value;
use tokio::sync::{mpsc, Mutex, MutexGuard};
use tracing::{error, info};
use crate::{
api::client_server::{leave_all_rooms, AUTO_GEN_PASSWORD_LENGTH},
@ -153,6 +154,12 @@ enum AdminCommand {
password: Option<String>,
},
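/// Ask a remote server for the state of the room at a specific event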
AskForState {
room_id: Box<RoomId>,
event_id: Box<EventId>,
server: Box<ServerName>,
},
/// Disables incoming federation handling for a room.
DisableRoom { room_id: Box<RoomId> },
/// Enables incoming federation handling for a room again.
@ -736,6 +743,71 @@ impl Service {
)
}
}
AdminCommand::AskForState {
room_id,
event_id,
server,
} => {
let create_event = services()
.rooms
.state_accessor
.room_state_get(&room_id, &StateEventType::RoomCreate, "")?
.ok_or_else(|| Error::bad_database("Failed to find create event in db."))?;
let create_event_content: RoomCreateEventContent =
serde_json::from_str(create_event.content.get()).map_err(|e| {
error!("Invalid create event: {}", e);
Error::BadDatabase("Invalid create event in db")
})?;
let room_version_id = &create_event_content.room_version;
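// Ask the remote server for the state at this event via /state_ids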
let state_at_event = services()
.rooms
.event_handler
.ask_for_state(
&room_id,
&event_id,
&server,
&create_event,
room_version_id,
&mut RwLock::new(BTreeMap::new()),
)
.await?;
// We start looking at the current room state now, so let's lock the room
let mutex_state = Arc::clone(
services()
.globals
.roomid_mutex_state
.write()
.unwrap()
.entry((*room_id).to_owned())
.or_default(),
);
let state_lock = mutex_state.lock().await;
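// Run state resolution on the fetched state to produce the new room state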
let new_room_state = services()
.rooms
.event_handler
.resolve_state(&room_id, room_version_id, state_at_event)
.await?;
// Set the new room state to the resolved state
info!("Forcing new room state");
let (sstatehash, new, removed) = services()
.rooms
.state_compressor
.save_state(&room_id, new_room_state)?;
services()
.rooms
.state
.force_state(&room_id, sstatehash, new, removed, &state_lock)
.await?;
drop(state_lock);
RoomMessageEventContent::text_plain("Updated state.")
}
};
Ok(reply_message_content)

View File

@ -632,81 +632,18 @@ impl Service {
}
if state_at_incoming_event.is_none() {
info!("Calling /state_ids");
// Call /state_ids to find out what the state at this pdu is. We trust the server's
// response to some extent, but we still do a lot of checks on the events
match services()
.sending
.send_federation_request(
state_at_incoming_event = Some(
self.ask_for_state(
room_id,
&incoming_pdu.event_id,
origin,
get_room_state_ids::v1::Request {
room_id: room_id.to_owned(),
event_id: (*incoming_pdu.event_id).to_owned(),
},
create_event,
room_version_id,
pub_key_map,
)
.await
{
Ok(res) => {
info!("Fetching state events at event.");
let state_vec = self
.fetch_and_handle_outliers(
origin,
&res.pdu_ids
.iter()
.map(|x| Arc::from(&**x))
.collect::<Vec<_>>(),
create_event,
room_id,
room_version_id,
pub_key_map,
)
.await;
let mut state: HashMap<_, Arc<EventId>> = HashMap::new();
for (pdu, _) in state_vec {
let state_key = pdu.state_key.clone().ok_or_else(|| {
Error::bad_database("Found non-state pdu in state events.")
})?;
let shortstatekey = services().rooms.short.get_or_create_shortstatekey(
&pdu.kind.to_string().into(),
&state_key,
)?;
match state.entry(shortstatekey) {
hash_map::Entry::Vacant(v) => {
v.insert(Arc::from(&*pdu.event_id));
}
hash_map::Entry::Occupied(_) => return Err(
Error::bad_database("State event's type and state_key combination exists multiple times."),
),
}
}
// The original create event must still be in the state
let create_shortstatekey = services()
.rooms
.short
.get_shortstatekey(&StateEventType::RoomCreate, "")?
.expect("Room exists");
if state.get(&create_shortstatekey).map(|id| id.as_ref())
!= Some(&create_event.event_id)
{
return Err(Error::bad_database(
"Incoming event refers to wrong create event.",
));
}
state_at_incoming_event = Some(state);
}
Err(e) => {
warn!("Fetching state for event failed: {}", e);
return Err(e);
}
};
.await?,
);
}
let state_at_incoming_event =
state_at_incoming_event.expect("we always set this to some above");
@ -884,7 +821,91 @@ impl Service {
Ok(pdu_id)
}
async fn resolve_state(
pub async fn ask_for_state(
&self,
room_id: &RoomId,
event_id: &EventId,
server_name: &ServerName,
create_event: &PduEvent,
room_version_id: &RoomVersionId,
pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, Base64>>>,
) -> Result<HashMap<u64, Arc<EventId>>> {
info!("Calling /state_ids");
// Call /state_ids to find out what the state at this pdu is. We trust the server's
// response to some extent, but we still do a lot of checks on the events
match services()
.sending
.send_federation_request(
server_name,
get_room_state_ids::v1::Request {
room_id: room_id.to_owned(),
event_id: event_id.to_owned(),
},
)
.await
{
Ok(res) => {
info!("Fetching state events at event.");
let state_vec = self
.fetch_and_handle_outliers(
server_name,
&res.pdu_ids
.iter()
.map(|x| Arc::from(&**x))
.collect::<Vec<_>>(),
create_event,
room_id,
room_version_id,
pub_key_map,
)
.await;
let mut state: HashMap<_, Arc<EventId>> = HashMap::new();
for (pdu, _) in state_vec {
let state_key = pdu.state_key.clone().ok_or_else(|| {
Error::bad_database("Found non-state pdu in state events.")
})?;
let shortstatekey = services()
.rooms
.short
.get_or_create_shortstatekey(&pdu.kind.to_string().into(), &state_key)?;
match state.entry(shortstatekey) {
hash_map::Entry::Vacant(v) => {
v.insert(Arc::from(&*pdu.event_id));
}
hash_map::Entry::Occupied(_) => return Err(Error::bad_database(
"State event's type and state_key combination exists multiple times.",
)),
}
}
// The original create event must still be in the state
let create_shortstatekey = services()
.rooms
.short
.get_shortstatekey(&StateEventType::RoomCreate, "")?
.expect("Room exists");
if state.get(&create_shortstatekey).map(|id| id.as_ref())
!= Some(&create_event.event_id)
{
return Err(Error::bad_database(
"Incoming event refers to wrong create event.",
));
}
Ok(state)
}
Err(e) => {
warn!("Fetching state for event failed: {}", e);
Err(e)
}
}
}
pub async fn resolve_state(
&self,
room_id: &RoomId,
room_version_id: &RoomVersionId,

View File

@ -16,7 +16,7 @@ use ruma::{
};
use serde::Deserialize;
use tokio::sync::MutexGuard;
use tracing::warn;
use tracing::{info, warn};
use crate::{services, utils::calculate_hash, Error, PduEvent, Result};
@ -33,7 +33,7 @@ impl Service {
room_id: &RoomId,
shortstatehash: u64,
statediffnew: Arc<HashSet<CompressedStateEvent>>,
_statediffremoved: Arc<HashSet<CompressedStateEvent>>,
statediffremoved: Arc<HashSet<CompressedStateEvent>>,
state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure callers hold the room state mutex
) -> Result<()> {
for event_id in statediffnew.iter().filter_map(|new| {
@ -49,6 +49,8 @@ impl Service {
None => continue,
};
info!("New in state: {event_id}");
if pdu.get("type").and_then(|val| val.as_str()) != Some("m.room.member") {
continue;
}
@ -90,6 +92,17 @@ impl Service {
)?;
}
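// Log events that were removed from the room state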
for event_id in statediffremoved.iter().filter_map(|removed| {
services()
.rooms
.state_compressor
.parse_compressed_state_event(&removed)
.ok()
.map(|(_, id)| id)
}) {
info!("Removed from state: {event_id}");
}
services().rooms.state_cache.update_joined_count(room_id)?;
self.db