diff --git a/Cargo.lock b/Cargo.lock index 4cce6668..5f6b21c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1547,6 +1547,7 @@ dependencies = [ [[package]] name = "ruma" version = "0.0.1" +source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" dependencies = [ "ruma-api", "ruma-appservice-api", @@ -1562,6 +1563,7 @@ dependencies = [ [[package]] name = "ruma-api" version = "0.17.0-alpha.1" +source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" dependencies = [ "http", "percent-encoding", @@ -1576,6 +1578,7 @@ dependencies = [ [[package]] name = "ruma-api-macros" version = "0.17.0-alpha.1" +source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -1586,6 +1589,7 @@ dependencies = [ [[package]] name = "ruma-appservice-api" version = "0.2.0-alpha.1" +source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" dependencies = [ "ruma-api", "ruma-common", @@ -1598,6 +1602,7 @@ dependencies = [ [[package]] name = "ruma-client-api" version = "0.10.0-alpha.1" +source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" dependencies = [ "assign", "http", @@ -1616,6 +1621,7 @@ dependencies = [ [[package]] name = "ruma-common" version = "0.2.0" +source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" dependencies = [ "js_int", "ruma-api", @@ -1629,6 +1635,7 @@ dependencies = [ [[package]] name = "ruma-events" version = "0.22.0-alpha.1" +source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" dependencies = [ "js_int", "ruma-common", @@ -1643,6 +1650,7 @@ dependencies = [ [[package]] name = "ruma-events-macros" version = "0.22.0-alpha.1" +source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -1653,6 +1661,7 @@ dependencies = [ [[package]] name = "ruma-federation-api" version = "0.0.3" +source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" dependencies = [ "js_int", "ruma-api", @@ -1667,6 +1676,7 @@ dependencies = [ [[package]] name = "ruma-identifiers" version = "0.17.4" +source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" dependencies = [ "rand", "ruma-identifiers-macros", @@ -1678,6 +1688,7 @@ dependencies = [ [[package]] name = "ruma-identifiers-macros" version = "0.17.4" +source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" dependencies = [ "proc-macro2", "quote", @@ -1688,6 +1699,7 @@ dependencies = [ [[package]] name = "ruma-identifiers-validation" version = "0.1.1" +source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" dependencies = [ "serde", "strum", @@ -1696,6 +1708,7 @@ dependencies = [ [[package]] name = "ruma-serde" version = "0.2.3" +source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" dependencies = [ "form_urlencoded", "itoa", @@ -1707,6 +1720,7 @@ dependencies = [ [[package]] name = "ruma-signatures" version = "0.6.0-dev.1" +source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" dependencies = [ "base64", "ring", @@ -1956,6 +1970,7 @@ checksum = "7345c971d1ef21ffdbd103a75990a15eb03604fc8b8852ca8cb418ee1a099028" [[package]] name = "state-res" version = "0.1.0" +source = "git+https://github.com/timokoesters/state-res?branch=spec-comp#d11a3feb5307715ab5d86af8f25d4bccfee6264b" dependencies = [ "itertools", "js_int", diff --git a/src/database/sending.rs b/src/database/sending.rs index a3f15742..d3c7fc6a 100644 --- a/src/database/sending.rs +++ b/src/database/sending.rs @@ -1,8 +1,11 @@ -use std::{convert::TryFrom, time::SystemTime}; +use std::{collections::HashSet, convert::TryFrom, time::SystemTime}; use crate::{server_server, utils, Error, Result}; +use federation::transactions::send_transaction_message; +use log::warn; use rocket::futures::stream::{FuturesUnordered, StreamExt}; use ruma::{api::federation, Raw, ServerName}; +use sled::IVec; use tokio::select; pub struct Sending { @@ -18,66 +21,49 @@ impl Sending { tokio::spawn(async move { let mut futures = FuturesUnordered::new(); + let mut waiting_servers = HashSet::new(); + let mut subscriber = serverpduids.watch_prefix(b""); loop { select! { - Some(_) = futures.next() => {}, - Some(event) = &mut subscriber => { - let serverpduid = if let sled::Event::Insert {key, ..} = event { - key - } else - { return Err::<(), Error>(Error::bad_database("")); }; - let mut parts = serverpduid.splitn(2, |&b| b == 0xff); - let server = Box::::try_from( - utils::string_from_bytes(parts.next().expect("splitn will always return 1 or more elements")) - .map_err(|_| Error::bad_database("ServerName in serverpduid bytes are invalid."))? - ).map_err(|_| Error::bad_database("ServerName in serverpduid is invalid."))?; - - let pdu_id = parts.next().ok_or_else(|| Error::bad_database("Invalid serverpduid in db."))?; - let mut pdu_json = rooms.get_pdu_json_from_id(&pdu_id.into())?.ok_or_else(|| Error::bad_database("Event in serverpduids not found in db."))?; - - if let Some(unsigned) = pdu_json - .as_object_mut() - .expect("json is object") - .get_mut("unsigned") { - unsigned.as_object_mut().expect("unsigned is object").remove("transaction_id"); - } - - pdu_json - .as_object_mut() - .expect("json is object") - .remove("event_id"); - - let raw_json = - serde_json::from_value::>(pdu_json).expect("Raw::from_value always works"); - - let globals = &globals; - - futures.push( - async move { - let pdus = vec![raw_json]; - let transaction_id = utils::random_string(16); - - server_server::send_request( - &globals, - server, - federation::transactions::send_transaction_message::v1::Request { - origin: globals.server_name(), - pdus: &pdus, - edus: &[], - origin_server_ts: SystemTime::now(), - transaction_id: &transaction_id, - }, - ).await + Some(server) = futures.next() => { + warn!("response: {:?}", &server); + match server { + Ok((server, _response)) => { + waiting_servers.remove(&server) } - ); + Err((server, _e)) => { + waiting_servers.remove(&server) + } + }; }, + Some(event) = &mut subscriber => { + if let sled::Event::Insert { key, .. } = event { + let serverpduid = key.clone(); + let mut parts = serverpduid.splitn(2, |&b| b == 0xff); + + if let Some((server, pdu_id)) = utils::string_from_bytes( + parts + .next() + .expect("splitn will always return 1 or more elements"), + ) + .map_err(|_| Error::bad_database("ServerName in serverpduid bytes are invalid.")) + .and_then(|server_str|Box::::try_from(server_str) + .map_err(|_| Error::bad_database("ServerName in serverpduid is invalid."))) + .ok() + .filter(|server| waiting_servers.insert(server.clone())) + .and_then(|server| parts + .next() + .ok_or_else(|| Error::bad_database("Invalid serverpduid in db.")).ok().map(|pdu_id| (server, pdu_id))) + { + futures.push(Self::handle_event(server, pdu_id.into(), &globals, &rooms)); + } + } + } } } }); } - /* - */ pub fn send_pdu(&self, server: Box, pdu_id: &[u8]) -> Result<()> { let mut key = server.as_bytes().to_vec(); @@ -87,4 +73,63 @@ impl Sending { Ok(()) } + + async fn handle_event( + server: Box, + pdu_id: IVec, + globals: &super::globals::Globals, + rooms: &super::rooms::Rooms, + ) -> std::result::Result< + (Box, send_transaction_message::v1::Response), + (Box, Error), + > { + let mut pdu_json = rooms + .get_pdu_json_from_id(&pdu_id) + .map_err(|e| (server.clone(), e))? + .ok_or_else(|| { + ( + server.clone(), + Error::bad_database("Event in serverpduids not found in db."), + ) + })?; + + if let Some(unsigned) = pdu_json + .as_object_mut() + .expect("json is object") + .get_mut("unsigned") + { + unsigned + .as_object_mut() + .expect("unsigned is object") + .remove("transaction_id"); + } + + pdu_json + .as_object_mut() + .expect("json is object") + .remove("event_id"); + + let raw_json = + serde_json::from_value::>(pdu_json).expect("Raw::from_value always works"); + + let globals = &globals; + + let pdus = vec![raw_json]; + let transaction_id = utils::random_string(16); + + server_server::send_request( + &globals, + server.clone(), + send_transaction_message::v1::Request { + origin: globals.server_name(), + pdus: &pdus, + edus: &[], + origin_server_ts: SystemTime::now(), + transaction_id: &transaction_id, + }, + ) + .await + .map(|response| (server.clone(), response)) + .map_err(|e| (server, e)) + } } diff --git a/src/server_server.rs b/src/server_server.rs index 3ebbeac4..d67b0b60 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -161,6 +161,7 @@ where *reqwest_request.timeout_mut() = Some(Duration::from_secs(30)); + let url = reqwest_request.url().clone(); let reqwest_response = globals.reqwest_client().execute(reqwest_request).await; // Because reqwest::Response -> http::Response is complicated: @@ -189,7 +190,10 @@ where .expect("reqwest body is valid http body"), ); response.map_err(|e| { - warn!("Server returned bad response: {:?}", e); + warn!( + "Server returned bad response {} ({}): {:?}", + destination, url, e + ); Error::BadServerResponse("Server returned bad response.") }) }