fix: stack overflows when fetching auth events

This commit is contained in:
Timo Kösters 2021-12-16 14:52:19 +01:00
parent a30b588ede
commit c9c9974641
No known key found for this signature in database
GPG Key ID: 356E705610F626D5
2 changed files with 94 additions and 81 deletions

View File

@ -22,14 +22,20 @@ impl DatabaseEngine for Engine {
fn open(config: &Config) -> Result<Arc<Self>> { fn open(config: &Config) -> Result<Arc<Self>> {
let mut db_opts = rocksdb::Options::default(); let mut db_opts = rocksdb::Options::default();
db_opts.create_if_missing(true); db_opts.create_if_missing(true);
db_opts.set_max_open_files(16); db_opts.set_max_open_files(512);
db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level); db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
db_opts.set_compression_type(rocksdb::DBCompressionType::Snappy); db_opts.set_compression_type(rocksdb::DBCompressionType::Zstd);
db_opts.set_target_file_size_base(256 << 20); db_opts.set_target_file_size_base(2 << 22);
db_opts.set_write_buffer_size(256 << 20); db_opts.set_max_bytes_for_level_base(2 << 24);
db_opts.set_max_bytes_for_level_multiplier(2.0);
db_opts.set_num_levels(8);
db_opts.set_write_buffer_size(2 << 27);
let rocksdb_cache = rocksdb::Cache::new_lru_cache((config.db_cache_capacity_mb * 1024.0 * 1024.0) as usize).unwrap();
let mut block_based_options = rocksdb::BlockBasedOptions::default(); let mut block_based_options = rocksdb::BlockBasedOptions::default();
block_based_options.set_block_size(512 << 10); block_based_options.set_block_size(2 << 19);
block_based_options.set_block_cache(&rocksdb_cache);
db_opts.set_block_based_table_factory(&block_based_options); db_opts.set_block_based_table_factory(&block_based_options);
let cfs = rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf( let cfs = rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf(
@ -45,7 +51,6 @@ impl DatabaseEngine for Engine {
let mut options = rocksdb::Options::default(); let mut options = rocksdb::Options::default();
let prefix_extractor = rocksdb::SliceTransform::create_fixed_prefix(1); let prefix_extractor = rocksdb::SliceTransform::create_fixed_prefix(1);
options.set_prefix_extractor(prefix_extractor); options.set_prefix_extractor(prefix_extractor);
options.set_merge_operator_associative("increment", utils::increment_rocksdb);
rocksdb::ColumnFamilyDescriptor::new(name, options) rocksdb::ColumnFamilyDescriptor::new(name, options)
}), }),
@ -63,7 +68,6 @@ impl DatabaseEngine for Engine {
let mut options = rocksdb::Options::default(); let mut options = rocksdb::Options::default();
let prefix_extractor = rocksdb::SliceTransform::create_fixed_prefix(1); let prefix_extractor = rocksdb::SliceTransform::create_fixed_prefix(1);
options.set_prefix_extractor(prefix_extractor); options.set_prefix_extractor(prefix_extractor);
options.set_merge_operator_associative("increment", utils::increment_rocksdb);
let _ = self.rocks.create_cf(name, &options); let _ = self.rocks.create_cf(name, &options);
println!("created cf"); println!("created cf");

View File

@ -1392,12 +1392,11 @@ async fn upgrade_outlier_to_timeline_pdu(
let mut starting_events = Vec::with_capacity(leaf_state.len()); let mut starting_events = Vec::with_capacity(leaf_state.len());
for (k, id) in leaf_state { for (k, id) in leaf_state {
let k = db if let Ok(k) = db.rooms.get_statekey_from_short(k) {
.rooms
.get_statekey_from_short(k)
.map_err(|_| "Failed to get_statekey_from_short.".to_owned())?;
state.insert(k, id.clone()); state.insert(k, id.clone());
} else {
warn!("Failed to get_statekey_from_short.");
}
starting_events.push(id); starting_events.push(id);
} }
@ -1755,11 +1754,16 @@ async fn upgrade_outlier_to_timeline_pdu(
.into_iter() .into_iter()
.map(|map| { .map(|map| {
map.into_iter() map.into_iter()
.map(|(k, id)| db.rooms.get_statekey_from_short(k).map(|k| (k, id))) .filter_map(|(k, id)| {
.collect::<Result<StateMap<_>>>() db.rooms
.get_statekey_from_short(k)
.map(|k| (k, id))
.map_err(|e| warn!("Failed to get_statekey_from_short: {}", e))
.ok()
}) })
.collect::<Result<_>>() .collect::<StateMap<_>>()
.map_err(|_| "Failed to get_statekey_from_short.".to_owned())?; })
.collect();
let state = match state_res::resolve( let state = match state_res::resolve(
room_version_id, room_version_id,
@ -1871,45 +1875,62 @@ pub(crate) fn fetch_and_handle_outliers<'a>(
// a. Look in the main timeline (pduid_pdu tree) // a. Look in the main timeline (pduid_pdu tree)
// b. Look at outlier pdu tree // b. Look at outlier pdu tree
// (get_pdu_json checks both) // (get_pdu_json checks both)
let local_pdu = db.rooms.get_pdu(id); if let Ok(Some(local_pdu)) = db.rooms.get_pdu(id) {
let pdu = match local_pdu {
Ok(Some(pdu)) => {
trace!("Found {} in db", id); trace!("Found {} in db", id);
(pdu, None) pdus.push((local_pdu, None));
} }
Ok(None) => {
// c. Ask origin server over federation // c. Ask origin server over federation
warn!("Fetching {} over federation.", id); // We also handle its auth chain here so we don't get a stack overflow in
// handle_outlier_pdu.
let mut todo_auth_events = vec![id];
let mut events_in_reverse_order = Vec::new();
while let Some(next_id) = todo_auth_events.pop() {
if let Ok(Some(_)) = db.rooms.get_pdu(next_id) {
trace!("Found {} in db", id);
continue;
}
warn!("Fetching {} over federation.", next_id);
match db match db
.sending .sending
.send_federation_request( .send_federation_request(
&db.globals, &db.globals,
origin, origin,
get_event::v1::Request { event_id: id }, get_event::v1::Request { event_id: next_id },
) )
.await .await
{ {
Ok(res) => { Ok(res) => {
warn!("Got {} over federation", id); warn!("Got {} over federation", next_id);
let (calculated_event_id, value) = let (calculated_event_id, value) =
match crate::pdu::gen_event_id_canonical_json(&res.pdu) { match crate::pdu::gen_event_id_canonical_json(&res.pdu) {
Ok(t) => t, Ok(t) => t,
Err(_) => { Err(_) => {
back_off((**id).to_owned()); back_off((**next_id).to_owned());
continue; continue;
} }
}; };
if calculated_event_id != **id { if calculated_event_id != **next_id {
warn!("Server didn't return event id we requested: requested: {}, we got {}. Event: {:?}", warn!("Server didn't return event id we requested: requested: {}, we got {}. Event: {:?}",
id, calculated_event_id, &res.pdu); next_id, calculated_event_id, &res.pdu);
} }
// This will also fetch the auth chain events_in_reverse_order.push((next_id, value));
}
Err(_) => {
warn!("Failed to fetch event: {}", next_id);
back_off((**next_id).to_owned());
}
}
}
while let Some((next_id, value)) = events_in_reverse_order.pop() {
match handle_outlier_pdu( match handle_outlier_pdu(
origin, origin,
create_event, create_event,
id, next_id,
room_id, room_id,
value.clone(), value.clone(),
db, db,
@ -1917,27 +1938,15 @@ pub(crate) fn fetch_and_handle_outliers<'a>(
) )
.await .await
{ {
Ok((pdu, json)) => (pdu, Some(json)), Ok((pdu, json)) => {
Err(e) => { pdus.push((pdu, Some(json)));
warn!("Authentication of event {} failed: {:?}", id, e);
back_off((**id).to_owned());
continue;
}
}
}
Err(_) => {
warn!("Failed to fetch event: {}", id);
back_off((**id).to_owned());
continue;
}
}
} }
Err(e) => { Err(e) => {
warn!("Error loading {}: {}", id, e); warn!("Authentication of event {} failed: {:?}", next_id, e);
continue; back_off((**next_id).to_owned());
}
}
} }
};
pdus.push(pdu);
} }
pdus pdus
}) })