conduit/src/database/pusher.rs

328 lines
9.9 KiB
Rust
Raw Normal View History

use crate::{Database, Error, PduEvent, Result};
use log::{error, info, warn};
use ruma::{
api::{
client::r0::push::{Pusher, PusherKind},
push_gateway::send_event_notification::{
self,
v1::{Device, Notification, NotificationCounts, NotificationPriority},
},
2021-04-13 14:00:45 +01:00
IncomingResponse, OutgoingRequest,
},
events::{room::power_levels::PowerLevelsEventContent, EventType},
push::{Action, PushConditionRoomCtx, PushFormat, Ruleset, Tweak},
uint, UInt, UserId,
};
use sled::IVec;
2021-03-18 17:33:43 +00:00
use std::{convert::TryFrom, fmt::Debug};
#[derive(Debug, Clone)]
pub struct PushData {
/// UserId + pushkey -> Pusher
pub(super) senderkey_pusher: sled::Tree,
}
impl PushData {
pub fn new(db: &sled::Db) -> Result<Self> {
Ok(Self {
senderkey_pusher: db.open_tree("senderkey_pusher")?,
})
}
pub fn set_pusher(&self, sender: &UserId, pusher: Pusher) -> Result<()> {
let mut key = sender.as_bytes().to_vec();
2021-02-11 12:16:14 +00:00
key.push(0xff);
key.extend_from_slice(pusher.pushkey.as_bytes());
// There are 2 kinds of pushers but the spec says: null deletes the pusher.
if pusher.kind.is_none() {
return self
.senderkey_pusher
.remove(key)
.map(|_| ())
.map_err(Into::into);
}
self.senderkey_pusher.insert(
2021-03-16 17:00:26 +00:00
key,
&*serde_json::to_string(&pusher).expect("Pusher is valid JSON string"),
)?;
Ok(())
}
pub fn get_pusher(&self, senderkey: &[u8]) -> Result<Option<Pusher>> {
self.senderkey_pusher
.get(senderkey)?
.map(|push| {
Ok(serde_json::from_slice(&*push)
.map_err(|_| Error::bad_database("Invalid Pusher in db."))?)
})
.transpose()
}
pub fn get_pushers(&self, sender: &UserId) -> Result<Vec<Pusher>> {
2021-02-11 12:16:14 +00:00
let mut prefix = sender.as_bytes().to_vec();
prefix.push(0xff);
self.senderkey_pusher
2021-03-16 17:00:26 +00:00
.scan_prefix(prefix)
.values()
2021-02-11 12:16:14 +00:00
.map(|push| {
2021-03-16 17:00:26 +00:00
let push = push.map_err(|_| Error::bad_database("Invalid push bytes in db."))?;
Ok(serde_json::from_slice(&*push)
.map_err(|_| Error::bad_database("Invalid Pusher in db."))?)
})
.collect()
}
pub fn get_pusher_senderkeys(&self, sender: &UserId) -> impl Iterator<Item = Result<IVec>> {
let mut prefix = sender.as_bytes().to_vec();
prefix.push(0xff);
self.senderkey_pusher
.scan_prefix(prefix)
.keys()
.map(|r| Ok(r?))
}
}
pub async fn send_request<T: OutgoingRequest>(
globals: &crate::database::globals::Globals,
destination: &str,
request: T,
) -> Result<T::IncomingResponse>
where
T: Debug,
{
let destination = destination.replace("/_matrix/push/v1/notify", "");
let http_request = request
.try_into_http_request(&destination, Some(""))
.map_err(|e| {
warn!("Failed to find destination {}: {}", destination, e);
Error::BadServerResponse("Invalid destination")
})?;
2021-02-11 12:16:14 +00:00
let reqwest_request = reqwest::Request::try_from(http_request)
.expect("all http requests are valid reqwest requests");
// TODO: we could keep this very short and let expo backoff do it's thing...
2021-02-11 12:16:14 +00:00
//*reqwest_request.timeout_mut() = Some(Duration::from_secs(5));
let url = reqwest_request.url().clone();
2021-03-16 17:00:26 +00:00
let reqwest_response = globals.reqwest_client().execute(reqwest_request).await;
// Because reqwest::Response -> http::Response is complicated:
match reqwest_response {
Ok(mut reqwest_response) => {
let status = reqwest_response.status();
let mut http_response = http::Response::builder().status(status);
let headers = http_response.headers_mut().unwrap();
for (k, v) in reqwest_response.headers_mut().drain() {
if let Some(key) = k {
headers.insert(key, v);
}
}
let status = reqwest_response.status();
2021-04-13 14:00:45 +01:00
let body = reqwest_response.bytes().await.unwrap_or_else(|e| {
warn!("server error {}", e);
Vec::new().into()
}); // TODO: handle timeout
if status != 200 {
info!(
"Push gateway returned bad response {} {}\n{}\n{:?}",
destination,
status,
url,
crate::utils::string_from_bytes(&body)
);
}
2021-04-13 14:00:45 +01:00
let response = T::IncomingResponse::try_from_http_response(
http_response
.body(body)
.expect("reqwest body is valid http body"),
);
response.map_err(|_| {
info!(
"Push gateway returned invalid response bytes {}\n{}",
destination, url
);
Error::BadServerResponse("Push gateway returned bad response.")
})
}
Err(e) => Err(e.into()),
}
}
pub async fn send_push_notice(
user: &UserId,
unread: UInt,
pusher: &Pusher,
ruleset: Ruleset,
pdu: &PduEvent,
db: &Database,
) -> Result<()> {
let mut notify = None;
let mut tweaks = Vec::new();
for action in get_actions(user, &ruleset, pdu, db)? {
let n = match action {
Action::DontNotify => false,
// TODO: Implement proper support for coalesce
Action::Notify | Action::Coalesce => true,
Action::SetTweak(tweak) => {
tweaks.push(tweak.clone());
continue;
}
};
if notify.is_some() {
return Err(Error::bad_database(
r#"Malformed pushrule contains more than one of these actions: ["dont_notify", "notify", "coalesce"]"#,
));
}
notify = Some(n);
}
2021-04-11 20:01:27 +01:00
if notify == Some(true) {
send_notice(unread, pusher, tweaks, pdu, db).await?;
}
// Else the event triggered no actions
2021-04-11 20:01:27 +01:00
Ok(())
}
pub fn get_actions<'a>(
user: &UserId,
ruleset: &'a Ruleset,
pdu: &PduEvent,
db: &Database,
) -> Result<impl 'a + Iterator<Item = Action>> {
let power_levels: PowerLevelsEventContent = db
.rooms
.room_state_get(&pdu.room_id, &EventType::RoomPowerLevels, "")?
.map(|ev| {
serde_json::from_value(ev.content)
.map_err(|_| Error::bad_database("invalid m.room.power_levels event"))
})
.transpose()?
.unwrap_or_default();
let ctx = PushConditionRoomCtx {
room_id: pdu.room_id.clone(),
member_count: (db.rooms.room_members(&pdu.room_id).count() as u32).into(),
user_display_name: db
.users
.displayname(&user)?
2021-04-14 09:43:31 +01:00
.unwrap_or_else(|| user.localpart().to_owned()),
users_power_levels: power_levels.users,
default_power_level: power_levels.users_default,
notification_power_levels: power_levels.notifications,
};
Ok(ruleset
.get_actions(&pdu.to_sync_room_event(), &ctx)
.map(Clone::clone))
}
async fn send_notice(
unread: UInt,
pusher: &Pusher,
tweaks: Vec<Tweak>,
event: &PduEvent,
db: &Database,
) -> Result<()> {
// TODO: email
2021-03-23 11:59:27 +00:00
if pusher.kind == Some(PusherKind::Email) {
return Ok(());
}
// TODO:
// Two problems with this
// 1. if "event_id_only" is the only format kind it seems we should never add more info
// 2. can pusher/devices have conflicting formats
let event_id_only = pusher.data.format == Some(PushFormat::EventIdOnly);
let url = if let Some(url) = pusher.data.url.as_ref() {
url
} else {
error!("Http Pusher must have URL specified.");
return Ok(());
};
let mut device = Device::new(pusher.app_id.clone(), pusher.pushkey.clone());
let mut data_minus_url = pusher.data.clone();
// The url must be stripped off according to spec
data_minus_url.url = None;
device.data = Some(data_minus_url);
// Tweaks are only added if the format is NOT event_id_only
if !event_id_only {
device.tweaks = tweaks.clone();
}
let d = &[device];
let mut notifi = Notification::new(d);
notifi.prio = NotificationPriority::Low;
notifi.event_id = Some(&event.event_id);
notifi.room_id = Some(&event.room_id);
// TODO: missed calls
notifi.counts = NotificationCounts::new(unread, uint!(0));
if event.kind == EventType::RoomEncrypted
|| tweaks
.iter()
.any(|t| matches!(t, Tweak::Highlight(true) | Tweak::Sound(_)))
{
notifi.prio = NotificationPriority::High
}
if event_id_only {
send_request(
&db.globals,
&url,
send_event_notification::v1::Request::new(notifi),
)
.await?;
} else {
notifi.sender = Some(&event.sender);
notifi.event_type = Some(&event.kind);
notifi.content = serde_json::value::to_raw_value(&event.content).ok();
if event.kind == EventType::RoomMember {
notifi.user_is_target = event.state_key.as_deref() == Some(event.sender.as_str());
}
let user_name = db.users.displayname(&event.sender)?;
notifi.sender_display_name = user_name.as_deref();
let room_name = db
.rooms
.room_state_get(&event.room_id, &EventType::RoomName, "")?
.map(|pdu| match pdu.content.get("name") {
Some(serde_json::Value::String(s)) => Some(s.to_string()),
_ => None,
})
.flatten();
notifi.room_name = room_name.as_deref();
send_request(
&db.globals,
&url,
send_event_notification::v1::Request::new(notifi),
)
.await?;
}
// TODO: email
Ok(())
}