diff --git a/src/api/client_server/membership.rs b/src/api/client_server/membership.rs index 5c0169d2..3f3d25d5 100644 --- a/src/api/client_server/membership.rs +++ b/src/api/client_server/membership.rs @@ -627,7 +627,7 @@ async fn join_room_by_id_helper( let event_id = format!( "${}", ruma::signatures::reference_hash(&join_event_stub, &room_version_id) - .expect("ruma can calculate reference hashes") + .expect("Event format validated when event was hashed") ); let event_id = <&EventId>::try_from(event_id.as_str()) .expect("ruma's reference hashes are valid event ids"); @@ -1145,7 +1145,7 @@ async fn validate_and_add_event_id( let event_id = EventId::parse(format!( "${}", ruma::signatures::reference_hash(&value, room_version) - .expect("ruma can calculate reference hashes") + .map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Invalid PDU format"))? )) .expect("ruma's reference hashes are valid event ids"); @@ -1614,7 +1614,7 @@ async fn remote_leave_room(user_id: &UserId, room_id: &RoomId) -> Result<()> { let event_id = EventId::parse(format!( "${}", ruma::signatures::reference_hash(&leave_event_stub, &room_version_id) - .expect("ruma can calculate reference hashes") + .expect("Event format validated when event was hashed") )) .expect("ruma's reference hashes are valid event ids"); diff --git a/src/api/server_server.rs b/src/api/server_server.rs index 60ded056..b0cdfc9a 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -2,7 +2,10 @@ use crate::{ api::client_server::{self, claim_keys_helper, get_keys_helper}, - service::pdu::{gen_event_id_canonical_json, PduBuilder}, + service::{ + globals::SigningKeys, + pdu::{gen_event_id_canonical_json, PduBuilder}, + }, services, utils, Error, PduEvent, Result, Ruma, }; use axum::{response::IntoResponse, Json}; @@ -807,17 +810,78 @@ pub fn parse_incoming_pdu( let (event_id, value) = match gen_event_id_canonical_json(pdu, &room_version_id) { Ok(t) => t, - Err(_) => { + Err(e) => { // Event could not be converted to canonical json - return Err(Error::BadRequest( - ErrorKind::InvalidParam, - "Could not convert event to canonical json.", - )); + return Err(e); } }; Ok((event_id, value, room_id)) } +/// Attempts to parse and append PDU to timeline. +/// If no event ID is returned, then the PDU was failed to be parsed. +/// If the Ok(()) is returned, then the PDU was successfully appended to the timeline. +async fn handle_pdu_in_transaction( + origin: &ServerName, + pub_key_map: &RwLock>, + pdu: &RawJsonValue, +) -> (Option, Result<()>) { + let (event_id, value, room_id) = match parse_incoming_pdu(pdu) { + Ok(t) => t, + Err(e) => { + warn!("Could not parse PDU: {e}"); + warn!("Full PDU: {:?}", &pdu); + return (None, Err(Error::BadServerResponse("Could not parse PDU"))); + } + }; + + // Makes use of the m.room.create event. If we cannot fetch this event, + // we must have never been in that room. + if services().rooms.state.get_room_version(&room_id).is_err() { + debug!("Room {room_id} is not known to this server"); + return ( + Some(event_id), + Err(Error::BadServerResponse("Room is not known to this server")), + ); + } + + // We do not add the event_id field to the pdu here because of signature and hashes checks + + let mutex = Arc::clone( + services() + .globals + .roomid_mutex_federation + .write() + .await + .entry(room_id.to_owned()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + let start_time = Instant::now(); + + if let Err(e) = services() + .rooms + .event_handler + .handle_incoming_pdu(origin, &event_id, &room_id, value, true, pub_key_map) + .await + { + warn!("Error appending PDU to timeline: {}: {:?}", e, pdu); + return (Some(event_id), Err(e)); + } + + drop(mutex_lock); + + let elapsed = start_time.elapsed(); + debug!( + "Handling transaction of event {} took {}m{}s", + event_id, + elapsed.as_secs() / 60, + elapsed.as_secs() % 60 + ); + + (Some(event_id), Ok(())) +} + /// # `PUT /_matrix/federation/v1/send/{txnId}` /// /// Push EDUs and PDUs to this server. @@ -842,77 +906,11 @@ pub async fn send_transaction_message_route( // let mut auth_cache = EventMap::new(); for pdu in &body.pdus { - let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| { - warn!("Error parsing incoming event {:?}: {:?}", pdu, e); - Error::BadServerResponse("Invalid PDU in server response") - })?; - let room_id: OwnedRoomId = value - .get("room_id") - .and_then(|id| RoomId::parse(id.as_str()?).ok()) - .ok_or(Error::BadRequest( - ErrorKind::InvalidParam, - "Invalid room id in pdu", - ))?; + let (event_id, result) = + handle_pdu_in_transaction(sender_servername, &pub_key_map, pdu).await; - if services().rooms.state.get_room_version(&room_id).is_err() { - debug!("Server is not in room {room_id}"); - continue; - } - - let r = parse_incoming_pdu(pdu); - let (event_id, value, room_id) = match r { - Ok(t) => t, - Err(e) => { - warn!("Could not parse PDU: {e}"); - warn!("Full PDU: {:?}", &pdu); - continue; - } - }; - // We do not add the event_id field to the pdu here because of signature and hashes checks - - let mutex = Arc::clone( - services() - .globals - .roomid_mutex_federation - .write() - .await - .entry(room_id.to_owned()) - .or_default(), - ); - let mutex_lock = mutex.lock().await; - let start_time = Instant::now(); - resolved_map.insert( - event_id.clone(), - services() - .rooms - .event_handler - .handle_incoming_pdu( - sender_servername, - &event_id, - &room_id, - value, - true, - &pub_key_map, - ) - .await - .map(|_| ()), - ); - drop(mutex_lock); - - let elapsed = start_time.elapsed(); - debug!( - "Handling transaction of event {} took {}m{}s", - event_id, - elapsed.as_secs() / 60, - elapsed.as_secs() % 60 - ); - } - - for pdu in &resolved_map { - if let Err(e) = pdu.1 { - if matches!(e, Error::BadRequest(ErrorKind::NotFound, _)) { - warn!("Incoming PDU failed {:?}", pdu); - } + if let Some(event_id) = event_id { + resolved_map.insert(event_id.clone(), result.map_err(|e| e.sanitized_error())); } } @@ -1081,12 +1079,7 @@ pub async fn send_transaction_message_route( } } - Ok(send_transaction_message::v1::Response { - pdus: resolved_map - .into_iter() - .map(|(e, r)| (e, r.map_err(|e| e.sanitized_error()))) - .collect(), - }) + Ok(send_transaction_message::v1::Response { pdus: resolved_map }) } /// # `GET /_matrix/federation/v1/event/{eventId}` @@ -1829,7 +1822,7 @@ pub async fn create_invite_route( let event_id = EventId::parse(format!( "${}", ruma::signatures::reference_hash(&signed_event, &body.room_version) - .expect("ruma can calculate reference hashes") + .expect("Event format validated when event was hashed") )) .expect("ruma's reference hashes are valid event ids"); diff --git a/src/service/pdu.rs b/src/service/pdu.rs index dab7b6e0..7934909b 100644 --- a/src/service/pdu.rs +++ b/src/service/pdu.rs @@ -1,5 +1,6 @@ use crate::Error; use ruma::{ + api::client::error::ErrorKind, canonical_json::redact_content_in_place, events::{ room::{member::RoomMemberEventContent, redaction::RoomRedactionEventContent}, @@ -443,7 +444,7 @@ pub(crate) fn gen_event_id_canonical_json( "${}", // Anything higher than version3 behaves the same ruma::signatures::reference_hash(&value, room_version_id) - .expect("ruma can calculate reference hashes") + .map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Invalid PDU format"))? ) .try_into() .expect("ruma's reference hashes are valid event ids"); diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index 002b8d77..0dd405c7 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -1477,7 +1477,7 @@ impl Service { let event_id = format!( "${}", ruma::signatures::reference_hash(&value, room_version) - .expect("ruma can calculate reference hashes") + .map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Invalid PDU format"))? ); let event_id = <&EventId>::try_from(event_id.as_str()) .expect("ruma's reference hashes are valid event ids"); diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index 6603ea65..9f0e2905 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -815,7 +815,7 @@ impl Service { pdu.event_id = EventId::parse_arc(format!( "${}", ruma::signatures::reference_hash(&pdu_json, &room_version_id) - .expect("ruma can calculate reference hashes") + .expect("Event format validated when event was hashed") )) .expect("ruma's reference hashes are valid event ids");