From 4956fb9fbac289df09c92197ff5119913c896788 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Sat, 21 Aug 2021 14:22:21 +0200 Subject: [PATCH] improvement: limit prev event fetching --- src/database.rs | 25 ++++++++++++++++++++----- src/server_server.rs | 13 ++++++++++++- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/src/database.rs b/src/database.rs index bfc33f26..23d3bdf9 100644 --- a/src/database.rs +++ b/src/database.rs @@ -636,7 +636,7 @@ impl Database { if db.globals.database_version()? < 9 { // Update tokenids db layout - let mut batch = db.rooms.tokenids.iter().filter_map(|(key, _)| { + let batch = db.rooms.tokenids.iter().filter_map(|(key, _)| { if !key.starts_with(b"!") { return None; } @@ -659,14 +659,29 @@ impl Database { println!("old {:?}", key); println!("new {:?}", new_key); Some((new_key, Vec::new())) - }); + }).collect::>(); - db.rooms.tokenids.insert_batch(&mut batch)?; + let mut iter = batch.into_iter().peekable(); - for (key, _) in db.rooms.tokenids.iter() { + while iter.peek().is_some() { + db.rooms.tokenids.insert_batch(&mut iter.by_ref().take(1000))?; + println!("smaller batch done"); + } + + println!("Deleting starts"); + + let batch2 = db.rooms.tokenids.iter().filter_map(|(key, _)| { if key.starts_with(b"!") { - db.rooms.tokenids.remove(&key)?; + println!("del {:?}", key); + Some(key) + } else { + None } + }).collect::>(); + + for key in batch2 { + println!("del"); + db.rooms.tokenids.remove(&key)?; } db.globals.bump_database_version(9)?; diff --git a/src/server_server.rs b/src/server_server.rs index 56b28f27..5b09872d 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -254,7 +254,7 @@ where }); // TODO: handle timeout if status != 200 { - info!( + warn!( "{} {}: {}", url, status, @@ -893,6 +893,9 @@ pub async fn handle_incoming_pdu<'a>( let mut graph = HashMap::new(); let mut eventid_info = HashMap::new(); let mut todo_outlier_stack = incoming_pdu.prev_events.clone(); + + let mut amount = 0; + while let Some(prev_event_id) = todo_outlier_stack.pop() { if let Some((pdu, json_opt)) = fetch_and_handle_outliers( db, @@ -905,6 +908,13 @@ pub async fn handle_incoming_pdu<'a>( .await .pop() { + if amount > 100 { + // Max limit reached + warn!("Max prev event limit reached!"); + graph.insert(prev_event_id.clone(), HashSet::new()); + continue + } + if let Some(json) = json_opt.or_else(|| db.rooms.get_outlier_pdu_json(&prev_event_id).ok().flatten()) { @@ -915,6 +925,7 @@ pub async fn handle_incoming_pdu<'a>( .expect("Room exists") .origin_server_ts { + amount += 1; for prev_prev in &pdu.prev_events { if !graph.contains_key(prev_prev) { todo_outlier_stack.push(dbg!(prev_prev.clone()));