From b7ab57897bc96e468421cf82ecd7d49e75c3f7dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Tue, 15 Sep 2020 16:13:54 +0200 Subject: [PATCH] fix: sending slowness --- Cargo.lock | 2 +- src/client_server/account.rs | 30 +- src/client_server/membership.rs | 6 + src/client_server/message.rs | 42 +-- src/client_server/profile.rs | 138 +++++---- src/client_server/redact.rs | 36 ++- src/client_server/room.rs | 516 ++++++++++++++++++-------------- src/client_server/state.rs | 48 +-- src/database.rs | 5 + src/database/globals.rs | 21 +- src/database/rooms.rs | 58 ++-- src/database/rooms/edus.rs | 1 + src/database/sending.rs | 83 +++++ src/main.rs | 2 + src/pdu.rs | 3 +- 15 files changed, 574 insertions(+), 417 deletions(-) create mode 100644 src/database/sending.rs diff --git a/Cargo.lock b/Cargo.lock index e0de2a7b..30144ca5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1831,7 +1831,7 @@ checksum = "7345c971d1ef21ffdbd103a75990a15eb03604fc8b8852ca8cb418ee1a099028" [[package]] name = "state-res" version = "0.1.0" -source = "git+https://github.com/timokoesters/state-res?branch=spec-comp#a9186476b748c901fbf4356414247a0b3ac01b5f" +source = "git+https://github.com/timokoesters/state-res?branch=spec-comp#1d01b6e65b6afd50e65085fb40f1e7d2782f519e" dependencies = [ "itertools", "js_int", diff --git a/src/client_server/account.rs b/src/client_server/account.rs index 2ec9282f..7e0f942e 100644 --- a/src/client_server/account.rs +++ b/src/client_server/account.rs @@ -354,19 +354,23 @@ pub async fn deactivate_route( third_party_invite: None, }; - db.rooms.build_and_append_pdu( - PduBuilder { - event_type: EventType::RoomMember, - content: serde_json::to_value(event).expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(sender_id.to_string()), - redacts: None, - }, - &sender_id, - &room_id, - &db.globals, - &db.account_data, - ).await?; + db.rooms + .build_and_append_pdu( + PduBuilder { + event_type: EventType::RoomMember, + content: serde_json::to_value(event) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(sender_id.to_string()), + redacts: None, + }, + &sender_id, + &room_id, + &db.globals, + &db.sending, + &db.account_data, + ) + .await?; } // Remove devices and mark account as deactivated diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs index f60601fd..c4eed95c 100644 --- a/src/client_server/membership.rs +++ b/src/client_server/membership.rs @@ -120,6 +120,7 @@ pub async fn leave_room_route( &sender_id, &body.room_id, &db.globals, + &db.sending, &db.account_data, ) .await?; @@ -157,6 +158,7 @@ pub async fn invite_user_route( &sender_id, &body.room_id, &db.globals, + &db.sending, &db.account_data, ) .await?; @@ -209,6 +211,7 @@ pub async fn kick_user_route( &sender_id, &body.room_id, &db.globals, + &db.sending, &db.account_data, ) .await?; @@ -266,6 +269,7 @@ pub async fn ban_user_route( &sender_id, &body.room_id, &db.globals, + &db.sending, &db.account_data, ) .await?; @@ -314,6 +318,7 @@ pub async fn unban_user_route( &sender_id, &body.room_id, &db.globals, + &db.sending, &db.account_data, ) .await?; @@ -672,6 +677,7 @@ async fn join_room_by_id_helper( &sender_id, &room_id, &db.globals, + &db.sending, &db.account_data, ) .await?; diff --git a/src/client_server/message.rs b/src/client_server/message.rs index 4ba0d9fd..3944d5bd 100644 --- a/src/client_server/message.rs +++ b/src/client_server/message.rs @@ -49,25 +49,29 @@ pub async fn send_message_event_route( let mut unsigned = serde_json::Map::new(); unsigned.insert("transaction_id".to_owned(), body.txn_id.clone().into()); - let event_id = db.rooms.build_and_append_pdu( - PduBuilder { - event_type: body.content.event_type().into(), - content: serde_json::from_str( - body.json_body - .as_ref() - .ok_or(Error::BadRequest(ErrorKind::BadJson, "Invalid JSON body."))? - .get(), - ) - .map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Invalid JSON body."))?, - unsigned: Some(unsigned), - state_key: None, - redacts: None, - }, - &sender_id, - &body.room_id, - &db.globals, - &db.account_data, - ).await?; + let event_id = db + .rooms + .build_and_append_pdu( + PduBuilder { + event_type: body.content.event_type().into(), + content: serde_json::from_str( + body.json_body + .as_ref() + .ok_or(Error::BadRequest(ErrorKind::BadJson, "Invalid JSON body."))? + .get(), + ) + .map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Invalid JSON body."))?, + unsigned: Some(unsigned), + state_key: None, + redacts: None, + }, + &sender_id, + &body.room_id, + &db.globals, + &db.sending, + &db.account_data, + ) + .await?; db.transaction_ids .add_txnid(sender_id, device_id, &body.txn_id, event_id.as_bytes())?; diff --git a/src/client_server/profile.rs b/src/client_server/profile.rs index be893e1b..53893c0f 100644 --- a/src/client_server/profile.rs +++ b/src/client_server/profile.rs @@ -31,40 +31,43 @@ pub async fn set_displayname_route( // Send a new membership event and presence update into all joined rooms for room_id in db.rooms.rooms_joined(&sender_id) { let room_id = room_id?; - db.rooms.build_and_append_pdu( - PduBuilder { - event_type: EventType::RoomMember, - content: serde_json::to_value(ruma::events::room::member::MemberEventContent { - displayname: body.displayname.clone(), - ..serde_json::from_value::>( - db.rooms - .room_state_get( - &room_id, - &EventType::RoomMember, - &sender_id.to_string(), - )? - .ok_or_else(|| { - Error::bad_database( + db.rooms + .build_and_append_pdu( + PduBuilder { + event_type: EventType::RoomMember, + content: serde_json::to_value(ruma::events::room::member::MemberEventContent { + displayname: body.displayname.clone(), + ..serde_json::from_value::>( + db.rooms + .room_state_get( + &room_id, + &EventType::RoomMember, + &sender_id.to_string(), + )? + .ok_or_else(|| { + Error::bad_database( "Tried to send displayname update for user not in the room.", ) - })? - .content - .clone(), - ) - .expect("from_value::> can never fail") - .deserialize() - .map_err(|_| Error::bad_database("Database contains invalid PDU."))? - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(sender_id.to_string()), - redacts: None, - }, - &sender_id, - &room_id, - &db.globals, - &db.account_data, - ).await?; + })? + .content + .clone(), + ) + .expect("from_value::> can never fail") + .deserialize() + .map_err(|_| Error::bad_database("Database contains invalid PDU."))? + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(sender_id.to_string()), + redacts: None, + }, + &sender_id, + &room_id, + &db.globals, + &db.sending, + &db.account_data, + ) + .await?; // Presence update db.rooms.edus.update_presence( @@ -134,40 +137,43 @@ pub async fn set_avatar_url_route( // Send a new membership event and presence update into all joined rooms for room_id in db.rooms.rooms_joined(&sender_id) { let room_id = room_id?; - db.rooms.build_and_append_pdu( - PduBuilder { - event_type: EventType::RoomMember, - content: serde_json::to_value(ruma::events::room::member::MemberEventContent { - avatar_url: body.avatar_url.clone(), - ..serde_json::from_value::>( - db.rooms - .room_state_get( - &room_id, - &EventType::RoomMember, - &sender_id.to_string(), - )? - .ok_or_else(|| { - Error::bad_database( - "Tried to send avatar url update for user not in the room.", - ) - })? - .content - .clone(), - ) - .expect("from_value::> can never fail") - .deserialize() - .map_err(|_| Error::bad_database("Database contains invalid PDU."))? - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(sender_id.to_string()), - redacts: None, - }, - &sender_id, - &room_id, - &db.globals, - &db.account_data, - ).await?; + db.rooms + .build_and_append_pdu( + PduBuilder { + event_type: EventType::RoomMember, + content: serde_json::to_value(ruma::events::room::member::MemberEventContent { + avatar_url: body.avatar_url.clone(), + ..serde_json::from_value::>( + db.rooms + .room_state_get( + &room_id, + &EventType::RoomMember, + &sender_id.to_string(), + )? + .ok_or_else(|| { + Error::bad_database( + "Tried to send avatar url update for user not in the room.", + ) + })? + .content + .clone(), + ) + .expect("from_value::> can never fail") + .deserialize() + .map_err(|_| Error::bad_database("Database contains invalid PDU."))? + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(sender_id.to_string()), + redacts: None, + }, + &sender_id, + &room_id, + &db.globals, + &db.sending, + &db.account_data, + ) + .await?; // Presence update db.rooms.edus.update_presence( diff --git a/src/client_server/redact.rs b/src/client_server/redact.rs index 701fc00d..24df8dd7 100644 --- a/src/client_server/redact.rs +++ b/src/client_server/redact.rs @@ -18,22 +18,26 @@ pub async fn redact_event_route( ) -> ConduitResult { let sender_id = body.sender_id.as_ref().expect("user is authenticated"); - let event_id = db.rooms.build_and_append_pdu( - PduBuilder { - event_type: EventType::RoomRedaction, - content: serde_json::to_value(redaction::RedactionEventContent { - reason: body.reason.clone(), - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: None, - redacts: Some(body.event_id.clone()), - }, - &sender_id, - &body.room_id, - &db.globals, - &db.account_data, - ).await?; + let event_id = db + .rooms + .build_and_append_pdu( + PduBuilder { + event_type: EventType::RoomRedaction, + content: serde_json::to_value(redaction::RedactionEventContent { + reason: body.reason.clone(), + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: None, + redacts: Some(body.event_id.clone()), + }, + &sender_id, + &body.room_id, + &db.globals, + &db.sending, + &db.account_data, + ) + .await?; Ok(redact_event::Response { event_id }.into()) } diff --git a/src/client_server/room.rs b/src/client_server/room.rs index 0e5c5716..d21148bb 100644 --- a/src/client_server/room.rs +++ b/src/client_server/room.rs @@ -53,41 +53,47 @@ pub async fn create_room_route( content.room_version = RoomVersionId::Version6; // 1. The room create event - db.rooms.build_and_append_pdu( - PduBuilder { - event_type: EventType::RoomCreate, - content: serde_json::to_value(content).expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &sender_id, - &room_id, - &db.globals, - &db.account_data, - ).await?; + db.rooms + .build_and_append_pdu( + PduBuilder { + event_type: EventType::RoomCreate, + content: serde_json::to_value(content).expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + &sender_id, + &room_id, + &db.globals, + &db.sending, + &db.account_data, + ) + .await?; // 2. Let the room creator join - db.rooms.build_and_append_pdu( - PduBuilder { - event_type: EventType::RoomMember, - content: serde_json::to_value(member::MemberEventContent { - membership: member::MembershipState::Join, - displayname: db.users.displayname(&sender_id)?, - avatar_url: db.users.avatar_url(&sender_id)?, - is_direct: Some(body.is_direct), - third_party_invite: None, - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(sender_id.to_string()), - redacts: None, - }, - &sender_id, - &room_id, - &db.globals, - &db.account_data, - ).await?; + db.rooms + .build_and_append_pdu( + PduBuilder { + event_type: EventType::RoomMember, + content: serde_json::to_value(member::MemberEventContent { + membership: member::MembershipState::Join, + displayname: db.users.displayname(&sender_id)?, + avatar_url: db.users.avatar_url(&sender_id)?, + is_direct: Some(body.is_direct), + third_party_invite: None, + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(sender_id.to_string()), + redacts: None, + }, + &sender_id, + &room_id, + &db.globals, + &db.sending, + &db.account_data, + ) + .await?; // 3. Power levels let mut users = BTreeMap::new(); @@ -117,19 +123,22 @@ pub async fn create_room_route( }) .expect("event is valid, we just created it") }; - db.rooms.build_and_append_pdu( - PduBuilder { - event_type: EventType::RoomPowerLevels, - content: power_levels_content, - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &sender_id, - &room_id, - &db.globals, - &db.account_data, - ).await?; + db.rooms + .build_and_append_pdu( + PduBuilder { + event_type: EventType::RoomPowerLevels, + content: power_levels_content, + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + &sender_id, + &room_id, + &db.globals, + &db.sending, + &db.account_data, + ) + .await?; // 4. Events set by preset @@ -140,73 +149,84 @@ pub async fn create_room_route( }); // 4.1 Join Rules - db.rooms.build_and_append_pdu( - PduBuilder { - event_type: EventType::RoomJoinRules, - content: match preset { - create_room::RoomPreset::PublicChat => serde_json::to_value( - join_rules::JoinRulesEventContent::new(join_rules::JoinRule::Public), - ) - .expect("event is valid, we just created it"), - // according to spec "invite" is the default - _ => serde_json::to_value(join_rules::JoinRulesEventContent::new( - join_rules::JoinRule::Invite, - )) - .expect("event is valid, we just created it"), + db.rooms + .build_and_append_pdu( + PduBuilder { + event_type: EventType::RoomJoinRules, + content: match preset { + create_room::RoomPreset::PublicChat => serde_json::to_value( + join_rules::JoinRulesEventContent::new(join_rules::JoinRule::Public), + ) + .expect("event is valid, we just created it"), + // according to spec "invite" is the default + _ => serde_json::to_value(join_rules::JoinRulesEventContent::new( + join_rules::JoinRule::Invite, + )) + .expect("event is valid, we just created it"), + }, + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, }, - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &sender_id, - &room_id, - &db.globals, - &db.account_data, - ).await?; + &sender_id, + &room_id, + &db.globals, + &db.sending, + &db.account_data, + ) + .await?; // 4.2 History Visibility - db.rooms.build_and_append_pdu( - PduBuilder { - event_type: EventType::RoomHistoryVisibility, - content: serde_json::to_value(history_visibility::HistoryVisibilityEventContent::new( - history_visibility::HistoryVisibility::Shared, - )) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &sender_id, - &room_id, - &db.globals, - &db.account_data, - ).await?; + db.rooms + .build_and_append_pdu( + PduBuilder { + event_type: EventType::RoomHistoryVisibility, + content: serde_json::to_value( + history_visibility::HistoryVisibilityEventContent::new( + history_visibility::HistoryVisibility::Shared, + ), + ) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + &sender_id, + &room_id, + &db.globals, + &db.sending, + &db.account_data, + ) + .await?; // 4.3 Guest Access - db.rooms.build_and_append_pdu( - PduBuilder { - event_type: EventType::RoomGuestAccess, - content: match preset { - create_room::RoomPreset::PublicChat => { - serde_json::to_value(guest_access::GuestAccessEventContent::new( - guest_access::GuestAccess::Forbidden, + db.rooms + .build_and_append_pdu( + PduBuilder { + event_type: EventType::RoomGuestAccess, + content: match preset { + create_room::RoomPreset::PublicChat => { + serde_json::to_value(guest_access::GuestAccessEventContent::new( + guest_access::GuestAccess::Forbidden, + )) + .expect("event is valid, we just created it") + } + _ => serde_json::to_value(guest_access::GuestAccessEventContent::new( + guest_access::GuestAccess::CanJoin, )) - .expect("event is valid, we just created it") - } - _ => serde_json::to_value(guest_access::GuestAccessEventContent::new( - guest_access::GuestAccess::CanJoin, - )) - .expect("event is valid, we just created it"), + .expect("event is valid, we just created it"), + }, + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, }, - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &sender_id, - &room_id, - &db.globals, - &db.account_data, - ).await?; + &sender_id, + &room_id, + &db.globals, + &db.sending, + &db.account_data, + ) + .await?; // 5. Events listed in initial_state for event in &body.initial_state { @@ -220,78 +240,90 @@ pub async fn create_room_route( continue; } - db.rooms.build_and_append_pdu( - pdu_builder, - &sender_id, - &room_id, - &db.globals, - &db.account_data, - ).await?; + db.rooms + .build_and_append_pdu( + pdu_builder, + &sender_id, + &room_id, + &db.globals, + &db.sending, + &db.account_data, + ) + .await?; } // 6. Events implied by name and topic if let Some(name) = &body.name { - db.rooms.build_and_append_pdu( - PduBuilder { - event_type: EventType::RoomName, - content: serde_json::to_value( - name::NameEventContent::new(name.clone()).map_err(|_| { - Error::BadRequest(ErrorKind::InvalidParam, "Name is invalid.") - })?, - ) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &sender_id, - &room_id, - &db.globals, - &db.account_data, - ).await?; + db.rooms + .build_and_append_pdu( + PduBuilder { + event_type: EventType::RoomName, + content: serde_json::to_value( + name::NameEventContent::new(name.clone()).map_err(|_| { + Error::BadRequest(ErrorKind::InvalidParam, "Name is invalid.") + })?, + ) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + &sender_id, + &room_id, + &db.globals, + &db.sending, + &db.account_data, + ) + .await?; } if let Some(topic) = &body.topic { - db.rooms.build_and_append_pdu( - PduBuilder { - event_type: EventType::RoomTopic, - content: serde_json::to_value(topic::TopicEventContent { - topic: topic.clone(), - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - &sender_id, - &room_id, - &db.globals, - &db.account_data, - ).await?; + db.rooms + .build_and_append_pdu( + PduBuilder { + event_type: EventType::RoomTopic, + content: serde_json::to_value(topic::TopicEventContent { + topic: topic.clone(), + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + &sender_id, + &room_id, + &db.globals, + &db.sending, + &db.account_data, + ) + .await?; } // 7. Events implied by invite (and TODO: invite_3pid) for user in &body.invite { - db.rooms.build_and_append_pdu( - PduBuilder { - event_type: EventType::RoomMember, - content: serde_json::to_value(member::MemberEventContent { - membership: member::MembershipState::Invite, - displayname: db.users.displayname(&user)?, - avatar_url: db.users.avatar_url(&user)?, - is_direct: Some(body.is_direct), - third_party_invite: None, - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(user.to_string()), - redacts: None, - }, - &sender_id, - &room_id, - &db.globals, - &db.account_data, - ).await?; + db.rooms + .build_and_append_pdu( + PduBuilder { + event_type: EventType::RoomMember, + content: serde_json::to_value(member::MemberEventContent { + membership: member::MembershipState::Invite, + displayname: db.users.displayname(&user)?, + avatar_url: db.users.avatar_url(&user)?, + is_direct: Some(body.is_direct), + third_party_invite: None, + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(user.to_string()), + redacts: None, + }, + &sender_id, + &room_id, + &db.globals, + &db.sending, + &db.account_data, + ) + .await?; } // Homeserver specific stuff @@ -363,23 +395,29 @@ pub async fn upgrade_room_route( // Send a m.room.tombstone event to the old room to indicate that it is not intended to be used any further // Fail if the sender does not have the required permissions - let tombstone_event_id = db.rooms.build_and_append_pdu( - PduBuilder { - event_type: EventType::RoomTombstone, - content: serde_json::to_value(ruma::events::room::tombstone::TombstoneEventContent { - body: "This room has been replaced".to_string(), - replacement_room: replacement_room.clone(), - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_id, - &body.room_id, - &db.globals, - &db.account_data, - ).await?; + let tombstone_event_id = db + .rooms + .build_and_append_pdu( + PduBuilder { + event_type: EventType::RoomTombstone, + content: serde_json::to_value( + ruma::events::room::tombstone::TombstoneEventContent { + body: "This room has been replaced".to_string(), + replacement_room: replacement_room.clone(), + }, + ) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_id, + &body.room_id, + &db.globals, + &db.sending, + &db.account_data, + ) + .await?; // Get the old room federations status let federate = serde_json::from_value::>( @@ -406,42 +444,48 @@ pub async fn upgrade_room_route( create_event_content.room_version = new_version; create_event_content.predecessor = predecessor; - db.rooms.build_and_append_pdu( - PduBuilder { - event_type: EventType::RoomCreate, - content: serde_json::to_value(create_event_content) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_id, - &replacement_room, - &db.globals, - &db.account_data, - ).await?; + db.rooms + .build_and_append_pdu( + PduBuilder { + event_type: EventType::RoomCreate, + content: serde_json::to_value(create_event_content) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_id, + &replacement_room, + &db.globals, + &db.sending, + &db.account_data, + ) + .await?; // Join the new room - db.rooms.build_and_append_pdu( - PduBuilder { - event_type: EventType::RoomMember, - content: serde_json::to_value(member::MemberEventContent { - membership: member::MembershipState::Join, - displayname: db.users.displayname(&sender_id)?, - avatar_url: db.users.avatar_url(&sender_id)?, - is_direct: None, - third_party_invite: None, - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(sender_id.to_string()), - redacts: None, - }, - sender_id, - &replacement_room, - &db.globals, - &db.account_data, - ).await?; + db.rooms + .build_and_append_pdu( + PduBuilder { + event_type: EventType::RoomMember, + content: serde_json::to_value(member::MemberEventContent { + membership: member::MembershipState::Join, + displayname: db.users.displayname(&sender_id)?, + avatar_url: db.users.avatar_url(&sender_id)?, + is_direct: None, + third_party_invite: None, + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(sender_id.to_string()), + redacts: None, + }, + sender_id, + &replacement_room, + &db.globals, + &db.sending, + &db.account_data, + ) + .await?; // Recommended transferable state events list from the specs let transferable_state_events = vec![ @@ -463,19 +507,22 @@ pub async fn upgrade_room_route( None => continue, // Skipping missing events. }; - db.rooms.build_and_append_pdu( - PduBuilder { - event_type, - content: event_content, - unsigned: None, - state_key: Some("".to_owned()), - redacts: None, - }, - sender_id, - &replacement_room, - &db.globals, - &db.account_data, - ).await?; + db.rooms + .build_and_append_pdu( + PduBuilder { + event_type, + content: event_content, + unsigned: None, + state_key: Some("".to_owned()), + redacts: None, + }, + sender_id, + &replacement_room, + &db.globals, + &db.sending, + &db.account_data, + ) + .await?; } // Moves any local aliases to the new room @@ -505,7 +552,8 @@ pub async fn upgrade_room_route( power_levels_event_content.invite = new_level; // Modify the power levels in the old room to prevent sending of events and inviting new users - let _ = db.rooms + let _ = db + .rooms .build_and_append_pdu( PduBuilder { event_type: EventType::RoomPowerLevels, @@ -518,8 +566,10 @@ pub async fn upgrade_room_route( sender_id, &body.room_id, &db.globals, + &db.sending, &db.account_data, - ).await; + ) + .await; // Return the replacement room id Ok(upgrade_room::Response { replacement_room }.into()) diff --git a/src/client_server/state.rs b/src/client_server/state.rs index e9d20e2f..46182a12 100644 --- a/src/client_server/state.rs +++ b/src/client_server/state.rs @@ -33,17 +33,18 @@ pub async fn send_state_event_for_key_route( ) .map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Invalid JSON body."))?; - Ok( - send_state_event_for_key::Response::new(send_state_event_for_key_helper( + Ok(send_state_event_for_key::Response::new( + send_state_event_for_key_helper( &db, sender_id, &body.content, content, &body.room_id, Some(body.state_key.to_owned()), - ).await?) - .into(), + ) + .await?, ) + .into()) } #[cfg_attr( @@ -70,8 +71,8 @@ pub async fn send_state_event_for_empty_key_route( ) .map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Invalid JSON body."))?; - Ok( - send_state_event_for_empty_key::Response::new(send_state_event_for_key_helper( + Ok(send_state_event_for_empty_key::Response::new( + send_state_event_for_key_helper( &db, sender_id .as_ref() @@ -80,9 +81,10 @@ pub async fn send_state_event_for_empty_key_route( json, &body.room_id, Some("".into()), - ).await?) - .into(), + ) + .await?, ) + .into()) } #[cfg_attr( @@ -211,19 +213,23 @@ pub async fn send_state_event_for_key_helper( } } - let event_id = db.rooms.build_and_append_pdu( - PduBuilder { - event_type: content.event_type().into(), - content: json, - unsigned: None, - state_key, - redacts: None, - }, - &sender_id, - &room_id, - &db.globals, - &db.account_data, - ).await?; + let event_id = db + .rooms + .build_and_append_pdu( + PduBuilder { + event_type: content.event_type().into(), + content: json, + unsigned: None, + state_key, + redacts: None, + }, + &sender_id, + &room_id, + &db.globals, + &db.sending, + &db.account_data, + ) + .await?; Ok(event_id) } diff --git a/src/database.rs b/src/database.rs index e1a356c7..4b2cba10 100644 --- a/src/database.rs +++ b/src/database.rs @@ -3,6 +3,7 @@ pub mod globals; pub mod key_backups; pub mod media; pub mod rooms; +pub mod sending; pub mod transaction_ids; pub mod uiaa; pub mod users; @@ -25,6 +26,7 @@ pub struct Database { pub media: media::Media, pub key_backups: key_backups::KeyBackups, pub transaction_ids: transaction_ids::TransactionIds, + pub sending: sending::Sending, pub _db: sled::Db, } @@ -135,6 +137,9 @@ impl Database { transaction_ids: transaction_ids::TransactionIds { userdevicetxnid_response: db.open_tree("userdevicetxnid_response")?, }, + sending: sending::Sending { + serverpduids: db.open_tree("serverpduids")?, + }, _db: db, }) } diff --git a/src/database/globals.rs b/src/database/globals.rs index 5db28069..89514251 100644 --- a/src/database/globals.rs +++ b/src/database/globals.rs @@ -1,12 +1,13 @@ use crate::{utils, Error, Result}; use ruma::ServerName; -use std::convert::TryInto; +use std::{convert::TryInto, sync::Arc}; pub const COUNTER: &str = "c"; +#[derive(Clone)] pub struct Globals { pub(super) globals: sled::Tree, - keypair: ruma::signatures::Ed25519KeyPair, + keypair: Arc, reqwest_client: reqwest::Client, server_name: Box, max_request_size: u32, @@ -16,13 +17,15 @@ pub struct Globals { impl Globals { pub fn load(globals: sled::Tree, config: &rocket::Config) -> Result { - let keypair = ruma::signatures::Ed25519KeyPair::new( - &*globals - .update_and_fetch("keypair", utils::generate_keypair)? - .expect("utils::generate_keypair always returns Some"), - "key1".to_owned(), - ) - .map_err(|_| Error::bad_database("Private or public keys are invalid."))?; + let keypair = Arc::new( + ruma::signatures::Ed25519KeyPair::new( + &*globals + .update_and_fetch("keypair", utils::generate_keypair)? + .expect("utils::generate_keypair always returns Some"), + "key1".to_owned(), + ) + .map_err(|_| Error::bad_database("Private or public keys are invalid."))?, + ); Ok(Self { globals, diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 3c3a0b27..2246a61f 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -1,14 +1,12 @@ mod edus; pub use edus::RoomEdus; -use rocket::futures; -use crate::{pdu::PduBuilder, server_server, utils, Error, PduEvent, Result}; -use log::{error, warn}; +use crate::{pdu::PduBuilder, utils, Error, PduEvent, Result}; +use log::error; use ring::digest; use ruma::{ api::client::error::ErrorKind, - api::federation, events::{ ignored_user_list, room::{ @@ -27,7 +25,6 @@ use std::{ convert::{TryFrom, TryInto}, mem, sync::Arc, - time::SystemTime, }; /// The unique identifier of each state group. @@ -36,6 +33,7 @@ use std::{ /// hashing the entire state. pub type StateHashId = Vec; +#[derive(Clone)] pub struct Rooms { pub edus: edus::RoomEdus, pub(super) pduid_pdu: sled::Tree, // PduId = RoomId + Count @@ -415,6 +413,16 @@ impl Rooms { }) } + /// Returns the pdu. + pub fn get_pdu_json_from_id(&self, pdu_id: &IVec) -> Result> { + self.pduid_pdu.get(pdu_id)?.map_or(Ok(None), |pdu| { + Ok(Some( + serde_json::from_slice(&pdu) + .map_err(|_| Error::bad_database("Invalid PDU in db."))?, + )) + }) + } + /// Removes a pdu and creates a new one with the same id. fn replace_pdu(&self, pdu_id: &IVec, pdu: &PduEvent) -> Result<()> { if self.pduid_pdu.get(&pdu_id)?.is_some() { @@ -613,6 +621,7 @@ impl Rooms { sender: &UserId, room_id: &RoomId, globals: &super::globals::Globals, + sending: &super::sending::Sending, account_data: &super::account_data::AccountData, ) -> Result { let PduBuilder { @@ -829,39 +838,12 @@ impl Rooms { self.append_to_state(&pdu_id, &pdu)?; } - pdu_json - .as_object_mut() - .expect("json is object") - .remove("event_id"); - - let raw_json = - serde_json::from_value::>(pdu_json).expect("Raw::from_value always works"); - - let pdus = &[raw_json]; - let transaction_id = utils::random_string(16); - - for result in futures::future::join_all( - self.room_servers(room_id) - .filter_map(|r| r.ok()) - .filter(|server| &**server != globals.server_name()) - .map(|server| { - server_server::send_request( - &globals, - server, - federation::transactions::send_transaction_message::v1::Request { - origin: globals.server_name(), - pdus, - edus: &[], - origin_server_ts: SystemTime::now(), - transaction_id: &transaction_id, - }, - ) - }), - ) - .await { - if let Err(e) = result { - warn!("{}", e); - } + for server in self + .room_servers(room_id) + .filter_map(|r| r.ok()) + .filter(|server| &**server != globals.server_name()) + { + sending.send_pdu(server, &pdu_id)?; } Ok(pdu.event_id) diff --git a/src/database/rooms/edus.rs b/src/database/rooms/edus.rs index d60e1f16..a794c690 100644 --- a/src/database/rooms/edus.rs +++ b/src/database/rooms/edus.rs @@ -13,6 +13,7 @@ use std::{ convert::{TryFrom, TryInto}, }; +#[derive(Clone)] pub struct RoomEdus { pub(in super::super) readreceiptid_readreceipt: sled::Tree, // ReadReceiptId = RoomId + Count + UserId pub(in super::super) roomuserid_privateread: sled::Tree, // RoomUserId = Room + User, PrivateRead = Count diff --git a/src/database/sending.rs b/src/database/sending.rs new file mode 100644 index 00000000..187fd575 --- /dev/null +++ b/src/database/sending.rs @@ -0,0 +1,83 @@ +use std::{convert::TryFrom, time::SystemTime}; + +use crate::{server_server, utils, Error, Result}; +use rocket::futures::stream::{FuturesUnordered, StreamExt}; +use ruma::{api::federation, Raw, ServerName}; +use tokio::select; + +pub struct Sending { + /// The state for a given state hash. + pub(super) serverpduids: sled::Tree, // ServerPduId = ServerName + PduId +} + +impl Sending { + pub fn start_handler(&self, globals: &super::globals::Globals, rooms: &super::rooms::Rooms) { + let serverpduids = self.serverpduids.clone(); + let rooms = rooms.clone(); + let globals = globals.clone(); + + tokio::spawn(async move { + let mut futures = FuturesUnordered::new(); + let mut subscriber = serverpduids.watch_prefix(b""); + loop { + select! { + Some(_) = futures.next() => {}, + Some(event) = &mut subscriber => { + let serverpduid = if let sled::Event::Insert {key, ..} = event { + key + } else + { return Err::<(), Error>(Error::bad_database("")); }; + let mut parts = serverpduid.splitn(2, |&b| b == 0xff); + let server = Box::::try_from( + utils::string_from_bytes(parts.next().expect("splitn will always return 1 or more elements")) + .map_err(|_| Error::bad_database("ServerName in serverpduid bytes are invalid."))? + ).map_err(|_| Error::bad_database("ServerName in serverpduid is invalid."))?; + + let pdu_id = parts.next().ok_or_else(|| Error::bad_database("Invalid serverpduid in db."))?; + let mut pdu_json = rooms.get_pdu_json_from_id(&pdu_id.into())?.ok_or_else(|| Error::bad_database("Event in serverpduids not found in db."))?; + + pdu_json + .as_object_mut() + .expect("json is object") + .remove("event_id"); + + let raw_json = + serde_json::from_value::>(pdu_json).expect("Raw::from_value always works"); + + let globals = &globals; + + futures.push( + async move { + let pdus = vec![raw_json]; + let transaction_id = utils::random_string(16); + + server_server::send_request( + &globals, + server, + federation::transactions::send_transaction_message::v1::Request { + origin: globals.server_name(), + pdus: &pdus, + edus: &[], + origin_server_ts: SystemTime::now(), + transaction_id: &transaction_id, + }, + ).await + } + ); + }, + } + } + }); + } + /* + */ + + pub fn send_pdu(&self, server: Box, pdu_id: &[u8]) -> Result<()> { + let mut key = server.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(pdu_id); + self.serverpduids.insert(key, b"")?; + + Ok(()) + } +} diff --git a/src/main.rs b/src/main.rs index eb060e3e..2817ab97 100644 --- a/src/main.rs +++ b/src/main.rs @@ -130,6 +130,8 @@ fn setup_rocket() -> rocket::Rocket { .attach(AdHoc::on_attach("Config", |mut rocket| async { let data = Database::load_or_create(rocket.config().await).expect("valid config"); + data.sending.start_handler(&data.globals, &data.rooms); + Ok(rocket.manage(data)) })) } diff --git a/src/pdu.rs b/src/pdu.rs index c9042306..6d78092f 100644 --- a/src/pdu.rs +++ b/src/pdu.rs @@ -1,12 +1,13 @@ use crate::{Error, Result}; use js_int::UInt; use ruma::{ + events::pdu::PduStub, events::{ pdu::EventHash, room::member::MemberEventContent, AnyEvent, AnyRoomEvent, AnyStateEvent, AnyStrippedStateEvent, AnySyncRoomEvent, AnySyncStateEvent, EventType, StateEvent, }, EventId, Raw, RoomId, ServerKeyId, ServerName, UserId, -events::pdu::PduStub}; +}; use serde::{Deserialize, Serialize}; use serde_json::json; use std::{collections::BTreeMap, convert::TryInto, sync::Arc, time::UNIX_EPOCH};