diff --git a/src/api/server_server.rs b/src/api/server_server.rs index ca402f82..a2051251 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -2,7 +2,10 @@ use crate::{ api::client_server::{self, claim_keys_helper, get_keys_helper}, - service::pdu::{gen_event_id_canonical_json, PduBuilder}, + service::{ + globals::SigningKeys, + pdu::{gen_event_id_canonical_json, PduBuilder}, + }, services, utils, Error, PduEvent, Result, Ruma, }; use axum::{response::IntoResponse, Json}; @@ -800,17 +803,78 @@ pub fn parse_incoming_pdu( let (event_id, value) = match gen_event_id_canonical_json(pdu, &room_version_id) { Ok(t) => t, - Err(_) => { + Err(e) => { // Event could not be converted to canonical json - return Err(Error::BadRequest( - ErrorKind::InvalidParam, - "Could not convert event to canonical json.", - )); + return Err(e); } }; Ok((event_id, value, room_id)) } +/// Attempts to parse and append PDU to timeline. +/// If no event ID is returned, then the PDU was failed to be parsed. +/// If the Ok(()) is returned, then the PDU was successfully appended to the timeline. +async fn handle_pdu_in_transaction( + origin: &ServerName, + pub_key_map: &RwLock>, + pdu: &RawJsonValue, +) -> (Option, Result<()>) { + let (event_id, value, room_id) = match parse_incoming_pdu(pdu) { + Ok(t) => t, + Err(e) => { + warn!("Could not parse PDU: {e}"); + warn!("Full PDU: {:?}", &pdu); + return (None, Err(Error::BadServerResponse("Could not parse PDU"))); + } + }; + + // Makes use of the m.room.create event. If we cannot fetch this event, + // we must have never been in that room. + if services().rooms.state.get_room_version(&room_id).is_err() { + debug!("Room {room_id} is not known to this server"); + return ( + Some(event_id), + Err(Error::BadServerResponse("Room is not known to this server")), + ); + } + + // We do not add the event_id field to the pdu here because of signature and hashes checks + + let mutex = Arc::clone( + services() + .globals + .roomid_mutex_federation + .write() + .await + .entry(room_id.to_owned()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + let start_time = Instant::now(); + + if let Err(e) = services() + .rooms + .event_handler + .handle_incoming_pdu(origin, &event_id, &room_id, value, true, pub_key_map) + .await + { + warn!("Error appending PDU to timeline: {}: {:?}", e, pdu); + return (Some(event_id), Err(e)); + } + + drop(mutex_lock); + + let elapsed = start_time.elapsed(); + debug!( + "Handling transaction of event {} took {}m{}s", + event_id, + elapsed.as_secs() / 60, + elapsed.as_secs() % 60 + ); + + (Some(event_id), Ok(())) +} + /// # `PUT /_matrix/federation/v1/send/{txnId}` /// /// Push EDUs and PDUs to this server. @@ -835,77 +899,11 @@ pub async fn send_transaction_message_route( // let mut auth_cache = EventMap::new(); for pdu in &body.pdus { - let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| { - warn!("Error parsing incoming event {:?}: {:?}", pdu, e); - Error::BadServerResponse("Invalid PDU in server response") - })?; - let room_id: OwnedRoomId = value - .get("room_id") - .and_then(|id| RoomId::parse(id.as_str()?).ok()) - .ok_or(Error::BadRequest( - ErrorKind::InvalidParam, - "Invalid room id in pdu", - ))?; + let (event_id, result) = + handle_pdu_in_transaction(sender_servername, &pub_key_map, pdu).await; - if services().rooms.state.get_room_version(&room_id).is_err() { - debug!("Server is not in room {room_id}"); - continue; - } - - let r = parse_incoming_pdu(pdu); - let (event_id, value, room_id) = match r { - Ok(t) => t, - Err(e) => { - warn!("Could not parse PDU: {e}"); - warn!("Full PDU: {:?}", &pdu); - continue; - } - }; - // We do not add the event_id field to the pdu here because of signature and hashes checks - - let mutex = Arc::clone( - services() - .globals - .roomid_mutex_federation - .write() - .await - .entry(room_id.to_owned()) - .or_default(), - ); - let mutex_lock = mutex.lock().await; - let start_time = Instant::now(); - resolved_map.insert( - event_id.clone(), - services() - .rooms - .event_handler - .handle_incoming_pdu( - sender_servername, - &event_id, - &room_id, - value, - true, - &pub_key_map, - ) - .await - .map(|_| ()), - ); - drop(mutex_lock); - - let elapsed = start_time.elapsed(); - debug!( - "Handling transaction of event {} took {}m{}s", - event_id, - elapsed.as_secs() / 60, - elapsed.as_secs() % 60 - ); - } - - for pdu in &resolved_map { - if let Err(e) = pdu.1 { - if matches!(e, Error::BadRequest(ErrorKind::NotFound, _)) { - warn!("Incoming PDU failed {:?}", pdu); - } + if let Some(event_id) = event_id { + resolved_map.insert(event_id.clone(), result.map_err(|e| e.sanitized_error())); } } @@ -1074,12 +1072,7 @@ pub async fn send_transaction_message_route( } } - Ok(send_transaction_message::v1::Response { - pdus: resolved_map - .into_iter() - .map(|(e, r)| (e, r.map_err(|e| e.sanitized_error()))) - .collect(), - }) + Ok(send_transaction_message::v1::Response { pdus: resolved_map }) } /// # `GET /_matrix/federation/v1/event/{eventId}`