conduit/src/data.rs

282 lines
8.8 KiB
Rust
Raw Normal View History

2020-04-04 10:53:37 +01:00
use crate::{utils, Database, PduEvent};
use log::debug;
2020-04-05 22:06:43 +01:00
use ruma_events::{
room::message::{MessageEvent, MessageEventContent},
EventType,
};
use ruma_federation_api::RoomV3Pdu;
2020-03-29 23:10:15 +01:00
use ruma_identifiers::{EventId, RoomId, UserId};
2020-04-04 10:53:37 +01:00
use std::{
collections::HashMap,
convert::{TryFrom, TryInto},
};
2020-03-29 20:05:20 +01:00
/// Bundles the server's hostname with the handle to its `Database`.
pub struct Data {
/// Hostname this instance was created with; handed out by `hostname()` and
/// used as the `origin` of events built in `pdu_append`.
hostname: String,
/// Handle to the underlying `Database` that the `Data` methods read from and write to.
db: Database,
}
2020-03-28 17:50:02 +00:00
impl Data {
2020-03-29 20:05:20 +01:00
/// Load an existing database or create a new one.
pub fn load_or_create(hostname: &str) -> Self {
Self {
hostname: hostname.to_owned(),
db: Database::load_or_create(hostname),
}
2020-03-29 20:05:20 +01:00
}
/// Get the hostname of the server.
pub fn hostname(&self) -> &str {
&self.hostname
2020-03-29 20:05:20 +01:00
}
/// Check if a user has an account by looking for an assigned password.
2020-03-28 17:50:02 +00:00
pub fn user_exists(&self, user_id: &UserId) -> bool {
self.db
.userid_password
2020-03-28 17:50:02 +00:00
.contains_key(user_id.to_string())
.unwrap()
}
2020-03-29 20:05:20 +01:00
/// Registers an account for `user_id` by storing its password.
///
/// When `password` is `None`, the default (empty) string is stored instead.
///
/// # Panics
///
/// Panics if the database write returns an error.
pub fn user_add(&self, user_id: &UserId, password: Option<String>) {
    let key = user_id.to_string();
    let stored_password = password.unwrap_or_default();
    self.db
        .userid_password
        .insert(key, stored_password.as_str())
        .unwrap();
}
2020-03-29 12:48:44 +01:00
2020-03-29 20:05:20 +01:00
/// Find out which user an access token belongs to.
pub fn user_from_token(&self, token: &str) -> Option<UserId> {
self.db
.token_userid
2020-03-29 20:05:20 +01:00
.get(token)
.unwrap()
.and_then(|bytes| (*utils::string_from_bytes(&bytes)).try_into().ok())
2020-03-29 20:05:20 +01:00
}
/// Checks if the given password is equal to the one in the database.
pub fn password_get(&self, user_id: &UserId) -> Option<String> {
self.db
.userid_password
2020-03-29 20:05:20 +01:00
.get(user_id.to_string())
.unwrap()
.map(|bytes| utils::string_from_bytes(&bytes))
2020-03-29 20:05:20 +01:00
}
/// Registers `device_id` for `user_id` unless the user already has that device.
///
/// Entries that fail to read while scanning the user's devices are skipped
/// rather than reported.
pub fn device_add(&self, user_id: &UserId, device_id: &str) {
    let user_key = user_id.to_string();
    let devices = &self.db.userid_deviceids;

    let already_registered = devices
        .get_iter(user_key.as_bytes())
        .filter_map(Result::ok)
        .any(|(_key, known_device)| known_device == device_id);

    if !already_registered {
        devices.add(user_key.as_bytes(), device_id.into());
    }
}
/// Replace the access token of one device.
///
/// The device's previous token (if any) is removed from the token -> user
/// table, then `token` is stored for the device and mapped to `user_id`.
///
/// `device_id` is taken as `&str`; existing callers that pass a `&String`
/// keep working through deref coercion.
///
/// # Panics
///
/// * In builds with debug assertions, panics if `device_id` is not registered
///   for `user_id`.
/// * Panics if any database operation returns an error.
pub fn token_replace(&self, user_id: &UserId, device_id: &str, token: String) {
    // Rendered once; needed for the ownership check and for the final insert.
    let user_key = user_id.to_string();

    // Make sure the device id belongs to the user
    debug_assert!(self
        .db
        .userid_deviceids
        .get_iter(user_key.as_bytes())
        .filter_map(Result::ok)
        .any(|(_key, device)| device == device_id.as_bytes())); // Does the user have that device?

    // Remove old token
    if let Some(old_token) = self.db.deviceid_token.get(device_id).unwrap() {
        self.db.token_userid.remove(old_token).unwrap();
        // It will be removed from deviceid_token by the insert later
    }

    // Assign token to device_id
    self.db
        .deviceid_token
        .insert(device_id, token.as_str())
        .unwrap();

    // Assign token to user
    self.db
        .token_userid
        .insert(token, user_key.as_str())
        .unwrap();
}
/// Looks up the PDU stored for `event_id`.
///
/// The event id is first resolved to a pdu id through `eventid_pduid`; the
/// JSON stored under that pdu id in `pduid_pdus` is then deserialized.
/// Returns `None` when the event id is unknown.
///
/// # Panics
///
/// Panics if a database lookup returns an error, if the pdu id has no entry in
/// `pduid_pdus`, or if the stored bytes do not deserialize.
pub fn pdu_get(&self, event_id: &EventId) -> Option<RoomV3Pdu> {
    let event_key = event_id.to_string();
    let pdu_id = self
        .db
        .eventid_pduid
        .get(event_key.as_bytes())
        .unwrap()?;

    let raw_pdu = self
        .db
        .pduid_pdus
        .get(pdu_id)
        .unwrap()
        .expect("eventid_pduid in db is valid");

    Some(serde_json::from_slice(&raw_pdu).expect("pdu is valid"))
}
2020-04-05 22:06:43 +01:00
/// Returns the event ids currently recorded as leaves for `room_id`.
///
/// # Panics
///
/// Panics if an entry cannot be read from the database or if a stored value is
/// not a valid event id.
pub fn pdu_leaves_get(&self, room_id: &RoomId) -> Vec<EventId> {
    let room_key = room_id.to_string();
    self.db
        .roomid_pduleaves
        .get_iter(room_key.as_bytes())
        .values()
        .map(|leaf| {
            let leaf_bytes = leaf.unwrap();
            let leaf_id = utils::string_from_bytes(&leaf_bytes);
            EventId::try_from(leaf_id.as_str()).expect("pdu leaves are valid event ids")
        })
        .collect()
}
/// Makes `event_id` the only leaf recorded for `room_id`.
///
/// All leaves currently stored for the room are cleared first, then the new
/// event id is added.
pub fn pdu_leaves_replace(&self, room_id: &RoomId, event_id: &EventId) {
    let room_key = room_id.to_string();
    let leaves = &self.db.roomid_pduleaves;

    leaves.clear(room_key.as_bytes());
    leaves.add(room_key.as_bytes(), event_id.to_string().as_str().into());
}
/// Add a persisted data unit from this homeserver
2020-04-05 22:06:43 +01:00
pub fn pdu_append(
&self,
room_id: RoomId,
sender: UserId,
event_type: EventType,
content: MessageEventContent,
) -> EventId {
// prev_events are the leaves of the current graph. This method removes all leaves from the
// room and replaces them with our event
2020-04-05 22:06:43 +01:00
// TODO: Make sure this isn't called twice in parallel
let prev_events = self.pdu_leaves_get(&room_id);
// Our depth is the maximum depth of prev_events + 1
let depth = prev_events
.iter()
.map(|event_id| {
self.pdu_get(event_id)
.expect("pdu in prev_events is valid")
.depth
.into()
})
.max()
.unwrap_or(0_u64)
+ 1;
2020-04-05 22:06:43 +01:00
let mut pdu = PduEvent {
event_id: EventId::try_from("$thiswillbefilledinlater").unwrap(),
2020-04-04 10:53:37 +01:00
room_id: room_id.clone(),
2020-04-05 22:06:43 +01:00
sender: sender.clone(),
2020-04-04 10:53:37 +01:00
origin: self.hostname.clone(),
2020-04-05 22:06:43 +01:00
origin_server_ts: utils::millis_since_unix_epoch(),
kind: event_type,
content: serde_json::to_value(content).expect("message content is valid json"),
2020-04-04 10:53:37 +01:00
state_key: None,
prev_events,
depth: depth.try_into().unwrap(),
auth_events: Vec::new(),
redacts: None,
2020-04-05 22:06:43 +01:00
unsigned: Default::default(), // TODO
2020-04-04 10:53:37 +01:00
hashes: ruma_federation_api::EventHash {
sha256: "aaa".to_owned(),
},
signatures: HashMap::new(),
};
2020-04-05 22:06:43 +01:00
// Generate event id
pdu.event_id = EventId::try_from(&*format!(
"${}",
ruma_signatures::reference_hash(&serde_json::to_value(&pdu).unwrap())
.expect("ruma can calculate reference hashes")
))
.expect("ruma's reference hashes are correct");
self.pdu_leaves_replace(&room_id, &pdu.event_id);
// The new value will need a new index. We store the last used index in 'n' + id
let mut count_key: Vec<u8> = vec![b'n'];
count_key.extend_from_slice(&room_id.to_string().as_bytes());
// Increment the last index and use that
let index = utils::u64_from_bytes(
&self
.db
.pduid_pdus
.update_and_fetch(&count_key, utils::increment)
.unwrap()
.unwrap(),
);
let mut pdu_id = vec![b'd'];
pdu_id.extend_from_slice(room_id.to_string().as_bytes());
pdu_id.push(b'#'); // Add delimiter so we don't find rooms starting with the same id
pdu_id.extend_from_slice(index.to_string().as_bytes());
self.db
.pduid_pdus
2020-04-04 10:53:37 +01:00
.insert(&pdu_id, &*serde_json::to_string(&pdu).unwrap())
.unwrap();
self.db
.eventid_pduid
2020-04-05 22:06:43 +01:00
.insert(pdu.event_id.to_string(), pdu_id.clone())
.unwrap();
2020-04-05 22:06:43 +01:00
pdu.event_id
}
/// Returns a vector of all PDUs.
2020-04-04 10:53:37 +01:00
pub fn pdus_all(&self) -> Vec<PduEvent> {
self.pdus_since(
self.db
.eventid_pduid
.iter()
.values()
.next()
.unwrap()
.map(|key| utils::string_from_bytes(&key))
.expect("there should be at least one pdu"),
)
}
/// Returns a vector of all events that happened after the event with id `since`.
2020-04-04 10:53:37 +01:00
pub fn pdus_since(&self, since: String) -> Vec<PduEvent> {
let mut pdus = Vec::new();
if let Some(room_id) = since.rsplitn(2, '#').nth(1) {
let mut current = since.clone();
while let Some((key, value)) = self.db.pduid_pdus.get_gt(current).unwrap() {
if key.starts_with(&room_id.to_string().as_bytes()) {
current = utils::string_from_bytes(&key);
} else {
break;
}
pdus.push(serde_json::from_slice(&value).expect("pdu is valid"));
}
} else {
debug!("event at `since` not found");
}
pdus
}
pub fn debug(&self) {
self.db.debug();
2020-03-29 20:05:20 +01:00
}
2020-03-28 17:50:02 +00:00
}