diff --git a/src/client_server/account.rs b/src/client_server/account.rs index ca8b7b1a..87e3731f 100644 --- a/src/client_server/account.rs +++ b/src/client_server/account.rs @@ -243,15 +243,15 @@ pub async fn register_route( let room_id = RoomId::new(db.globals.server_name()); - let mutex = Arc::clone( + let mutex_state = Arc::clone( db.globals - .roomid_mutex + .roomid_mutex_state .write() .unwrap() .entry(room_id.clone()) .or_default(), ); - let mutex_lock = mutex.lock().await; + let state_lock = mutex_state.lock().await; let mut content = ruma::events::room::create::CreateEventContent::new(conduit_user.clone()); content.federate = true; @@ -270,7 +270,7 @@ pub async fn register_route( &conduit_user, &room_id, &db, - &mutex_lock, + &state_lock, )?; // 2. Make conduit bot join @@ -293,7 +293,7 @@ pub async fn register_route( &conduit_user, &room_id, &db, - &mutex_lock, + &state_lock, )?; // 3. Power levels @@ -318,7 +318,7 @@ pub async fn register_route( &conduit_user, &room_id, &db, - &mutex_lock, + &state_lock, )?; // 4.1 Join Rules @@ -336,7 +336,7 @@ pub async fn register_route( &conduit_user, &room_id, &db, - &mutex_lock, + &state_lock, )?; // 4.2 History Visibility @@ -356,7 +356,7 @@ pub async fn register_route( &conduit_user, &room_id, &db, - &mutex_lock, + &state_lock, )?; // 4.3 Guest Access @@ -374,7 +374,7 @@ pub async fn register_route( &conduit_user, &room_id, &db, - &mutex_lock, + &state_lock, )?; // 6. Events implied by name and topic @@ -393,7 +393,7 @@ pub async fn register_route( &conduit_user, &room_id, &db, - &mutex_lock, + &state_lock, )?; db.rooms.build_and_append_pdu( @@ -410,7 +410,7 @@ pub async fn register_route( &conduit_user, &room_id, &db, - &mutex_lock, + &state_lock, )?; // Room alias @@ -433,7 +433,7 @@ pub async fn register_route( &conduit_user, &room_id, &db, - &mutex_lock, + &state_lock, )?; db.rooms.set_alias(&alias, Some(&room_id), &db.globals)?; @@ -458,7 +458,7 @@ pub async fn register_route( &conduit_user, &room_id, &db, - &mutex_lock, + &state_lock, )?; db.rooms.build_and_append_pdu( PduBuilder { @@ -479,7 +479,7 @@ pub async fn register_route( &user_id, &room_id, &db, - &mutex_lock, + &state_lock, )?; // Send welcome message @@ -498,7 +498,7 @@ pub async fn register_route( &conduit_user, &room_id, &db, - &mutex_lock, + &state_lock, )?; } @@ -677,15 +677,15 @@ pub async fn deactivate_route( blurhash: None, }; - let mutex = Arc::clone( + let mutex_state = Arc::clone( db.globals - .roomid_mutex + .roomid_mutex_state .write() .unwrap() .entry(room_id.clone()) .or_default(), ); - let mutex_lock = mutex.lock().await; + let state_lock = mutex_state.lock().await; db.rooms.build_and_append_pdu( PduBuilder { @@ -698,7 +698,7 @@ pub async fn deactivate_route( &sender_user, &room_id, &db, - &mutex_lock, + &state_lock, )?; } diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs index 895ad279..716a615f 100644 --- a/src/client_server/membership.rs +++ b/src/client_server/membership.rs @@ -203,15 +203,15 @@ pub async fn kick_user_route( event.membership = ruma::events::room::member::MembershipState::Leave; // TODO: reason - let mutex = Arc::clone( + let mutex_state = Arc::clone( db.globals - .roomid_mutex + .roomid_mutex_state .write() .unwrap() .entry(body.room_id.clone()) .or_default(), ); - let mutex_lock = mutex.lock().await; + let state_lock = mutex_state.lock().await; db.rooms.build_and_append_pdu( PduBuilder { @@ -224,10 +224,10 @@ pub async fn kick_user_route( &sender_user, &body.room_id, &db, - &mutex_lock, + &state_lock, )?; - drop(mutex_lock); + drop(state_lock); db.flush()?; @@ -275,15 +275,15 @@ pub async fn ban_user_route( }, )?; - let mutex = Arc::clone( + let mutex_state = Arc::clone( db.globals - .roomid_mutex + .roomid_mutex_state .write() .unwrap() .entry(body.room_id.clone()) .or_default(), ); - let mutex_lock = mutex.lock().await; + let state_lock = mutex_state.lock().await; db.rooms.build_and_append_pdu( PduBuilder { @@ -296,10 +296,10 @@ pub async fn ban_user_route( &sender_user, &body.room_id, &db, - &mutex_lock, + &state_lock, )?; - drop(mutex_lock); + drop(state_lock); db.flush()?; @@ -337,15 +337,15 @@ pub async fn unban_user_route( event.membership = ruma::events::room::member::MembershipState::Leave; - let mutex = Arc::clone( + let mutex_state = Arc::clone( db.globals - .roomid_mutex + .roomid_mutex_state .write() .unwrap() .entry(body.room_id.clone()) .or_default(), ); - let mutex_lock = mutex.lock().await; + let state_lock = mutex_state.lock().await; db.rooms.build_and_append_pdu( PduBuilder { @@ -358,10 +358,10 @@ pub async fn unban_user_route( &sender_user, &body.room_id, &db, - &mutex_lock, + &state_lock, )?; - drop(mutex_lock); + drop(state_lock); db.flush()?; @@ -486,15 +486,15 @@ async fn join_room_by_id_helper( ) -> ConduitResult { let sender_user = sender_user.expect("user is authenticated"); - let mutex = Arc::clone( + let mutex_state = Arc::clone( db.globals - .roomid_mutex + .roomid_mutex_state .write() .unwrap() .entry(room_id.clone()) .or_default(), ); - let mutex_lock = mutex.lock().await; + let state_lock = mutex_state.lock().await; // Ask a remote server if we don't have this room if !db.rooms.exists(&room_id)? && room_id.server_name() != db.globals.server_name() { @@ -706,11 +706,11 @@ async fn join_room_by_id_helper( &sender_user, &room_id, &db, - &mutex_lock, + &state_lock, )?; } - drop(mutex_lock); + drop(state_lock); db.flush()?; @@ -790,15 +790,15 @@ pub async fn invite_helper<'a>( ) -> Result<()> { if user_id.server_name() != db.globals.server_name() { let (room_version_id, pdu_json, invite_room_state) = { - let mutex = Arc::clone( + let mutex_state = Arc::clone( db.globals - .roomid_mutex + .roomid_mutex_state .write() .unwrap() .entry(room_id.clone()) .or_default(), ); - let mutex_lock = mutex.lock().await; + let state_lock = mutex_state.lock().await; let prev_events = db .rooms @@ -942,7 +942,7 @@ pub async fn invite_helper<'a>( let invite_room_state = db.rooms.calculate_invite_state(&pdu)?; - drop(mutex_lock); + drop(state_lock); (room_version_id, pdu_json, invite_room_state) }; @@ -1018,16 +1018,15 @@ pub async fn invite_helper<'a>( return Ok(()); } - let mutex = Arc::clone( + let mutex_state = Arc::clone( db.globals - .roomid_mutex + .roomid_mutex_state .write() .unwrap() .entry(room_id.clone()) .or_default(), ); - - let mutex_lock = mutex.lock().await; + let state_lock = mutex_state.lock().await; db.rooms.build_and_append_pdu( PduBuilder { @@ -1048,10 +1047,10 @@ pub async fn invite_helper<'a>( &sender_user, room_id, &db, - &mutex_lock, + &state_lock, )?; - drop(mutex_lock); + drop(state_lock); Ok(()) } diff --git a/src/client_server/message.rs b/src/client_server/message.rs index f77ca891..9cb6faab 100644 --- a/src/client_server/message.rs +++ b/src/client_server/message.rs @@ -28,15 +28,15 @@ pub async fn send_message_event_route( let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let sender_device = body.sender_device.as_deref(); - let mutex = Arc::clone( + let mutex_state = Arc::clone( db.globals - .roomid_mutex + .roomid_mutex_state .write() .unwrap() .entry(body.room_id.clone()) .or_default(), ); - let mutex_lock = mutex.lock().await; + let state_lock = mutex_state.lock().await; // Check if this is a new transaction id if let Some(response) = @@ -75,7 +75,7 @@ pub async fn send_message_event_route( &sender_user, &body.room_id, &db, - &mutex_lock, + &state_lock, )?; db.transaction_ids.add_txnid( @@ -85,7 +85,7 @@ pub async fn send_message_event_route( event_id.as_bytes(), )?; - drop(mutex_lock); + drop(state_lock); db.flush()?; diff --git a/src/client_server/profile.rs b/src/client_server/profile.rs index 648afeaa..de1babab 100644 --- a/src/client_server/profile.rs +++ b/src/client_server/profile.rs @@ -73,19 +73,19 @@ pub async fn set_displayname_route( }) .filter_map(|r| r.ok()) { - let mutex = Arc::clone( + let mutex_state = Arc::clone( db.globals - .roomid_mutex + .roomid_mutex_state .write() .unwrap() .entry(room_id.clone()) .or_default(), ); - let mutex_lock = mutex.lock().await; + let state_lock = mutex_state.lock().await; let _ = db.rooms - .build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db, &mutex_lock); + .build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db, &state_lock); // Presence update db.rooms.edus.update_presence( @@ -207,19 +207,19 @@ pub async fn set_avatar_url_route( }) .filter_map(|r| r.ok()) { - let mutex = Arc::clone( + let mutex_state = Arc::clone( db.globals - .roomid_mutex + .roomid_mutex_state .write() .unwrap() .entry(room_id.clone()) .or_default(), ); - let mutex_lock = mutex.lock().await; + let state_lock = mutex_state.lock().await; let _ = db.rooms - .build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db, &mutex_lock); + .build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db, &state_lock); // Presence update db.rooms.edus.update_presence( diff --git a/src/client_server/redact.rs b/src/client_server/redact.rs index 63d3d4a7..63bf103b 100644 --- a/src/client_server/redact.rs +++ b/src/client_server/redact.rs @@ -20,15 +20,15 @@ pub async fn redact_event_route( ) -> ConduitResult { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - let mutex = Arc::clone( + let mutex_state = Arc::clone( db.globals - .roomid_mutex + .roomid_mutex_state .write() .unwrap() .entry(body.room_id.clone()) .or_default(), ); - let mutex_lock = mutex.lock().await; + let state_lock = mutex_state.lock().await; let event_id = db.rooms.build_and_append_pdu( PduBuilder { @@ -44,10 +44,10 @@ pub async fn redact_event_route( &sender_user, &body.room_id, &db, - &mutex_lock, + &state_lock, )?; - drop(mutex_lock); + drop(state_lock); db.flush()?; diff --git a/src/client_server/room.rs b/src/client_server/room.rs index 1b14a930..f73d5445 100644 --- a/src/client_server/room.rs +++ b/src/client_server/room.rs @@ -33,15 +33,15 @@ pub async fn create_room_route( let room_id = RoomId::new(db.globals.server_name()); - let mutex = Arc::clone( + let mutex_state = Arc::clone( db.globals - .roomid_mutex + .roomid_mutex_state .write() .unwrap() .entry(room_id.clone()) .or_default(), ); - let mutex_lock = mutex.lock().await; + let state_lock = mutex_state.lock().await; let alias = body .room_alias_name @@ -79,7 +79,7 @@ pub async fn create_room_route( &sender_user, &room_id, &db, - &mutex_lock, + &state_lock, )?; // 2. Let the room creator join @@ -102,7 +102,7 @@ pub async fn create_room_route( &sender_user, &room_id, &db, - &mutex_lock, + &state_lock, )?; // 3. Power levels @@ -157,7 +157,7 @@ pub async fn create_room_route( &sender_user, &room_id, &db, - &mutex_lock, + &state_lock, )?; // 4. Events set by preset @@ -184,7 +184,7 @@ pub async fn create_room_route( &sender_user, &room_id, &db, - &mutex_lock, + &state_lock, )?; // 4.2 History Visibility @@ -202,7 +202,7 @@ pub async fn create_room_route( &sender_user, &room_id, &db, - &mutex_lock, + &state_lock, )?; // 4.3 Guest Access @@ -228,7 +228,7 @@ pub async fn create_room_route( &sender_user, &room_id, &db, - &mutex_lock, + &state_lock, )?; // 5. Events listed in initial_state @@ -244,7 +244,7 @@ pub async fn create_room_route( } db.rooms - .build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db, &mutex_lock)?; + .build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db, &state_lock)?; } // 6. Events implied by name and topic @@ -261,7 +261,7 @@ pub async fn create_room_route( &sender_user, &room_id, &db, - &mutex_lock, + &state_lock, )?; } @@ -280,12 +280,12 @@ pub async fn create_room_route( &sender_user, &room_id, &db, - &mutex_lock, + &state_lock, )?; } // 7. Events implied by invite (and TODO: invite_3pid) - drop(mutex_lock); + drop(state_lock); for user_id in &body.invite { let _ = invite_helper(sender_user, user_id, &room_id, &db, body.is_direct).await; } @@ -364,13 +364,12 @@ pub async fn get_room_aliases_route( #[cfg_attr( feature = "conduit_bin", - post("/_matrix/client/r0/rooms/<_room_id>/upgrade", data = "") + post("/_matrix/client/r0/rooms/<_>/upgrade", data = "") )] #[tracing::instrument(skip(db, body))] pub async fn upgrade_room_route( db: DatabaseGuard, body: Ruma>, - _room_id: String, ) -> ConduitResult { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); @@ -387,15 +386,15 @@ pub async fn upgrade_room_route( // Create a replacement room let replacement_room = RoomId::new(db.globals.server_name()); - let mutex = Arc::clone( + let mutex_state = Arc::clone( db.globals - .roomid_mutex + .roomid_mutex_state .write() .unwrap() .entry(body.room_id.clone()) .or_default(), ); - let mutex_lock = mutex.lock().await; + let state_lock = mutex_state.lock().await; // Send a m.room.tombstone event to the old room to indicate that it is not intended to be used any further // Fail if the sender does not have the required permissions @@ -414,9 +413,21 @@ pub async fn upgrade_room_route( sender_user, &body.room_id, &db, - &mutex_lock, + &state_lock, )?; + // Change lock to replacement room + drop(state_lock); + let mutex_state = Arc::clone( + db.globals + .roomid_mutex_state + .write() + .unwrap() + .entry(replacement_room.clone()) + .or_default(), + ); + let state_lock = mutex_state.lock().await; + // Get the old room federations status let federate = serde_json::from_value::>( db.rooms @@ -455,7 +466,7 @@ pub async fn upgrade_room_route( sender_user, &replacement_room, &db, - &mutex_lock, + &state_lock, )?; // Join the new room @@ -478,7 +489,7 @@ pub async fn upgrade_room_route( sender_user, &replacement_room, &db, - &mutex_lock, + &state_lock, )?; // Recommended transferable state events list from the specs @@ -512,7 +523,7 @@ pub async fn upgrade_room_route( sender_user, &replacement_room, &db, - &mutex_lock, + &state_lock, )?; } @@ -556,10 +567,10 @@ pub async fn upgrade_room_route( sender_user, &body.room_id, &db, - &mutex_lock, + &state_lock, )?; - drop(mutex_lock); + drop(state_lock); db.flush()?; diff --git a/src/client_server/state.rs b/src/client_server/state.rs index 5afac039..aa020b50 100644 --- a/src/client_server/state.rs +++ b/src/client_server/state.rs @@ -259,15 +259,15 @@ pub async fn send_state_event_for_key_helper( } } - let mutex = Arc::clone( + let mutex_state = Arc::clone( db.globals - .roomid_mutex + .roomid_mutex_state .write() .unwrap() .entry(room_id.clone()) .or_default(), ); - let mutex_lock = mutex.lock().await; + let state_lock = mutex_state.lock().await; let event_id = db.rooms.build_and_append_pdu( PduBuilder { @@ -280,7 +280,7 @@ pub async fn send_state_event_for_key_helper( &sender_user, &room_id, &db, - &mutex_lock, + &state_lock, )?; Ok(event_id) diff --git a/src/client_server/sync.rs b/src/client_server/sync.rs index b09a2122..937a252e 100644 --- a/src/client_server/sync.rs +++ b/src/client_server/sync.rs @@ -191,17 +191,17 @@ async fn sync_helper( let room_id = room_id?; // Get and drop the lock to wait for remaining operations to finish - let mutex = Arc::clone( + // This will make sure the we have all events until next_batch + let mutex_insert = Arc::clone( db.globals - .roomid_mutex + .roomid_mutex_insert .write() .unwrap() .entry(room_id.clone()) .or_default(), ); - - let mutex_lock = mutex.lock().await; - drop(mutex_lock); + let insert_lock = mutex_insert.lock().unwrap(); + drop(insert_lock); let mut non_timeline_pdus = db .rooms @@ -665,16 +665,16 @@ async fn sync_helper( let (room_id, left_state_events) = result?; // Get and drop the lock to wait for remaining operations to finish - let mutex = Arc::clone( + let mutex_insert = Arc::clone( db.globals - .roomid_mutex + .roomid_mutex_insert .write() .unwrap() .entry(room_id.clone()) .or_default(), ); - let mutex_lock = mutex.lock().await; - drop(mutex_lock); + let insert_lock = mutex_insert.lock().unwrap(); + drop(insert_lock); let left_count = db.rooms.get_left_count(&room_id, &sender_user)?; @@ -705,16 +705,16 @@ async fn sync_helper( let (room_id, invite_state_events) = result?; // Get and drop the lock to wait for remaining operations to finish - let mutex = Arc::clone( + let mutex_insert = Arc::clone( db.globals - .roomid_mutex + .roomid_mutex_insert .write() .unwrap() .entry(room_id.clone()) .or_default(), ); - let mutex_lock = mutex.lock().await; - drop(mutex_lock); + let insert_lock = mutex_insert.lock().unwrap(); + drop(insert_lock); let invite_count = db.rooms.get_invite_count(&room_id, &sender_user)?; diff --git a/src/database/admin.rs b/src/database/admin.rs index e1b24d08..424e6746 100644 --- a/src/database/admin.rs +++ b/src/database/admin.rs @@ -84,15 +84,15 @@ impl Admin { tokio::select! { Some(event) = receiver.next() => { let guard = db.read().await; - let mutex = Arc::clone( + let mutex_state = Arc::clone( guard.globals - .roomid_mutex + .roomid_mutex_state .write() .unwrap() .entry(conduit_room.clone()) .or_default(), ); - let mutex_lock = mutex.lock().await; + let state_lock = mutex_state.lock().await; match event { AdminCommand::RegisterAppservice(yaml) => { @@ -106,17 +106,17 @@ impl Admin { count, appservices.into_iter().filter_map(|r| r.ok()).collect::>().join(", ") ); - send_message(message::MessageEventContent::text_plain(output), guard, &mutex_lock); + send_message(message::MessageEventContent::text_plain(output), guard, &state_lock); } else { - send_message(message::MessageEventContent::text_plain("Failed to get appservices."), guard, &mutex_lock); + send_message(message::MessageEventContent::text_plain("Failed to get appservices."), guard, &state_lock); } } AdminCommand::SendMessage(message) => { - send_message(message, guard, &mutex_lock); + send_message(message, guard, &state_lock); } } - drop(mutex_lock); + drop(state_lock); } } } diff --git a/src/database/globals.rs b/src/database/globals.rs index 2ca8de98..823ce349 100644 --- a/src/database/globals.rs +++ b/src/database/globals.rs @@ -12,7 +12,7 @@ use std::{ fs, future::Future, path::PathBuf, - sync::{Arc, RwLock}, + sync::{Arc, Mutex, RwLock}, time::{Duration, Instant}, }; use tokio::sync::{broadcast, watch::Receiver, Mutex as TokioMutex, Semaphore}; @@ -45,7 +45,8 @@ pub struct Globals { pub bad_signature_ratelimiter: Arc, RateLimitState>>>, pub servername_ratelimiter: Arc, Arc>>>, pub sync_receivers: RwLock), SyncHandle>>, - pub roomid_mutex: RwLock>>>, + pub roomid_mutex_insert: RwLock>>>, + pub roomid_mutex_state: RwLock>>>, pub roomid_mutex_federation: RwLock>>>, // this lock will be held longer pub rotate: RotationHandler, } @@ -200,7 +201,8 @@ impl Globals { bad_event_ratelimiter: Arc::new(RwLock::new(HashMap::new())), bad_signature_ratelimiter: Arc::new(RwLock::new(HashMap::new())), servername_ratelimiter: Arc::new(RwLock::new(HashMap::new())), - roomid_mutex: RwLock::new(HashMap::new()), + roomid_mutex_state: RwLock::new(HashMap::new()), + roomid_mutex_insert: RwLock::new(HashMap::new()), roomid_mutex_federation: RwLock::new(HashMap::new()), sync_receivers: RwLock::new(HashMap::new()), rotate: RotationHandler::new(), diff --git a/src/database/rooms.rs b/src/database/rooms.rs index eb98152a..e5eb2eea 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -736,6 +736,16 @@ impl Rooms { self.replace_pdu_leaves(&pdu.room_id, leaves)?; + let mutex_insert = Arc::clone( + db.globals + .roomid_mutex_insert + .write() + .unwrap() + .entry(pdu.room_id.clone()) + .or_default(), + ); + let insert_lock = mutex_insert.lock().unwrap(); + let count1 = db.globals.next_count()?; // Mark as read first so the sending client doesn't get a notification even if appending // fails @@ -750,6 +760,8 @@ impl Rooms { // There's a brief moment of time here where the count is updated but the pdu does not // exist. This could theoretically lead to dropped pdus, but it's extremely rare + // + // Update: We fixed this using insert_lock self.pduid_pdu.insert( &pdu_id, @@ -761,6 +773,8 @@ impl Rooms { self.eventid_pduid .insert(pdu.event_id.as_bytes(), &pdu_id)?; + drop(insert_lock); + // See if the event matches any known pushers let power_levels: PowerLevelsEventContent = db .rooms @@ -1464,13 +1478,6 @@ impl Rooms { self.shorteventid_eventid .insert(&shorteventid.to_be_bytes(), pdu.event_id.as_bytes())?; - // Increment the last index and use that - // This is also the next_batch/since value - let count = db.globals.next_count()?; - let mut pdu_id = room_id.as_bytes().to_vec(); - pdu_id.push(0xff); - pdu_id.extend_from_slice(&count.to_be_bytes()); - // We append to state before appending the pdu, so we don't have a moment in time with the // pdu without it's state. This is okay because append_pdu can't fail. let statehashid = self.append_to_state(&pdu, &db.globals)?; @@ -1904,15 +1911,15 @@ impl Rooms { db, )?; } else { - let mutex = Arc::clone( + let mutex_state = Arc::clone( db.globals - .roomid_mutex + .roomid_mutex_state .write() .unwrap() .entry(room_id.clone()) .or_default(), ); - let mutex_lock = mutex.lock().await; + let state_lock = mutex_state.lock().await; let mut event = serde_json::from_value::>( self.room_state_get(room_id, &EventType::RoomMember, &user_id.to_string())? @@ -1941,7 +1948,7 @@ impl Rooms { user_id, room_id, db, - &mutex_lock, + &state_lock, )?; } diff --git a/src/server_server.rs b/src/server_server.rs index 09b6bfc9..a20c9abe 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -1159,15 +1159,15 @@ pub fn handle_incoming_pdu<'a>( // We start looking at current room state now, so lets lock the room - let mutex = Arc::clone( + let mutex_state = Arc::clone( db.globals - .roomid_mutex + .roomid_mutex_state .write() .unwrap() .entry(room_id.clone()) .or_default(), ); - let mutex_lock = mutex.lock().await; + let state_lock = mutex_state.lock().await; // Now we calculate the set of extremities this room has after the incoming event has been // applied. We start with the previous extremities (aka leaves) @@ -1341,7 +1341,7 @@ pub fn handle_incoming_pdu<'a>( val, extremities, &state_at_incoming_event, - &mutex_lock, + &state_lock, ) .map_err(|_| "Failed to add pdu to db.".to_owned())?, ); @@ -1364,7 +1364,7 @@ pub fn handle_incoming_pdu<'a>( } // Event has passed all auth/stateres checks - drop(mutex_lock); + drop(state_lock); Ok(pdu_id) }) }