implements notification debouncing

This commit is contained in:
Keagan McClelland
2021-10-05 17:40:31 -06:00
committed by Aiden McClelland
parent ac9b97ee28
commit d58a950762
3 changed files with 90 additions and 30 deletions

View File

@@ -27,6 +27,7 @@ use crate::hostname::{get_hostname, get_id};
use crate::manager::ManagerMap; use crate::manager::ManagerMap;
use crate::net::tor::os_key; use crate::net::tor::os_key;
use crate::net::NetController; use crate::net::NetController;
use crate::notifications::NotificationManager;
use crate::shutdown::Shutdown; use crate::shutdown::Shutdown;
use crate::system::launch_metrics_task; use crate::system::launch_metrics_task;
use crate::util::io::from_toml_async_reader; use crate::util::io::from_toml_async_reader;
@@ -126,6 +127,7 @@ pub struct RpcContextSeed {
pub logger: EmbassyLogger, pub logger: EmbassyLogger,
pub log_epoch: Arc<AtomicU64>, pub log_epoch: Arc<AtomicU64>,
pub tor_socks: SocketAddr, pub tor_socks: SocketAddr,
pub notification_manager: NotificationManager,
} }
#[derive(Clone)] #[derive(Clone)]
@@ -165,6 +167,7 @@ impl RpcContext {
.await?; .await?;
let managers = ManagerMap::default(); let managers = ManagerMap::default();
let metrics_cache = RwLock::new(None); let metrics_cache = RwLock::new(None);
let notification_manager = NotificationManager::new(secret_store.clone(), db.clone(), 3600);
let seed = Arc::new(RpcContextSeed { let seed = Arc::new(RpcContextSeed {
bind_rpc: base.bind_rpc.unwrap_or(([127, 0, 0, 1], 5959).into()), bind_rpc: base.bind_rpc.unwrap_or(([127, 0, 0, 1], 5959).into()),
bind_ws: base.bind_ws.unwrap_or(([127, 0, 0, 1], 5960).into()), bind_ws: base.bind_ws.unwrap_or(([127, 0, 0, 1], 5960).into()),
@@ -186,6 +189,7 @@ impl RpcContext {
Ipv4Addr::new(127, 0, 0, 1), Ipv4Addr::new(127, 0, 0, 1),
9050, 9050,
))), ))),
notification_manager,
}); });
let metrics_seed = seed.clone(); let metrics_seed = seed.clone();
tokio::spawn(async move { tokio::spawn(async move {

View File

@@ -1,9 +1,10 @@
use std::collections::BTreeMap; use std::collections::{BTreeMap, HashMap};
use std::fmt; use std::fmt;
use std::str::FromStr; use std::str::FromStr;
use anyhow::anyhow; use anyhow::anyhow;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use futures::lock::Mutex;
use patch_db::{PatchDb, Revision}; use patch_db::{PatchDb, Revision};
use rpc_toolkit::command; use rpc_toolkit::command;
use sqlx::SqlitePool; use sqlx::SqlitePool;
@@ -132,7 +133,7 @@ pub async fn delete_before(#[context] ctx: RpcContext, #[arg] before: u32) -> Re
Ok(()) Ok(())
} }
#[derive(serde::Serialize, serde::Deserialize)] #[derive(Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum NotificationLevel { pub enum NotificationLevel {
Success, Success,
Info, Info,
@@ -231,26 +232,43 @@ impl NotificationSubtype {
} }
} }
pub async fn notify( pub struct NotificationManager {
sqlite: &SqlitePool, sqlite: SqlitePool,
patchdb: &PatchDb, patchdb: PatchDb,
package_id: Option<PackageId>, cache: Mutex<HashMap<(Option<PackageId>, NotificationLevel, String), i64>>,
level: NotificationLevel, debounce_interval: u32,
title: String, }
message: String, impl NotificationManager {
subtype: NotificationSubtype, pub fn new(sqlite: SqlitePool, patchdb: PatchDb, debounce_interval: u32) -> Self {
) -> Result<(), Error> { NotificationManager {
let mut handle = patchdb.handle(); sqlite,
let mut count = crate::db::DatabaseModel::new() patchdb,
.server_info() cache: Mutex::new(HashMap::new()),
.unread_notification_count() debounce_interval,
.get_mut(&mut handle) }
.await?; }
let sql_package_id = package_id.map::<String, _>(|p| p.into()); pub async fn notify(
let sql_code = subtype.code(); &self,
let sql_level = format!("{}", level); package_id: Option<PackageId>,
let sql_data = format!("{}", subtype.to_json()); level: NotificationLevel,
sqlx::query!( title: String,
message: String,
subtype: NotificationSubtype,
) -> Result<(), Error> {
if !self.should_notify(&package_id, &level, &title).await {
return Ok(());
}
let mut handle = self.patchdb.handle();
let mut count = crate::db::DatabaseModel::new()
.server_info()
.unread_notification_count()
.get_mut(&mut handle)
.await?;
let sql_package_id = package_id.map::<String, _>(|p| p.into());
let sql_code = subtype.code();
let sql_level = format!("{}", level);
let sql_data = format!("{}", subtype.to_json());
sqlx::query!(
"INSERT INTO notifications (package_id, code, level, title, message, data) VALUES (?, ?, ?, ?, ?, ?)", "INSERT INTO notifications (package_id, code, level, title, message, data) VALUES (?, ?, ?, ?, ?, ?)",
sql_package_id, sql_package_id,
sql_code, sql_code,
@@ -258,10 +276,50 @@ pub async fn notify(
title, title,
message, message,
sql_data sql_data
).execute(sqlite).await?; ).execute(&self.sqlite).await?;
*count += 1; *count += 1;
count.save(&mut handle).await?; count.save(&mut handle).await?;
Ok(()) Ok(())
}
async fn gc(&self) {
let mut guard = self.cache.lock().await;
let mut temp = HashMap::new();
for (k, v) in (*guard).drain() {
if v + self.debounce_interval as i64 > Utc::now().timestamp() {
temp.insert(k, v);
}
}
*guard = temp
}
async fn should_notify(
&self,
package_id: &Option<PackageId>,
level: &NotificationLevel,
title: &String,
) -> bool {
if level == &NotificationLevel::Error {
return true;
}
self.gc();
let mut guard = self.cache.lock().await;
let k = (package_id.clone(), level.clone(), title.clone());
let v = (*guard).get(&k);
match v {
None => {
(*guard).insert(k, Utc::now().timestamp());
true
}
Some(t) => {
if t + self.debounce_interval as i64 > Utc::now().timestamp() {
false
} else {
// this path should be very rare due to gc above
(*guard).remove(&k);
true
}
}
}
}
} }
#[test] #[test]

View File

@@ -15,7 +15,7 @@ use crate::dependencies::{
break_transitive, DependencyError, DependencyErrors, TaggedDependencyError, break_transitive, DependencyError, DependencyErrors, TaggedDependencyError,
}; };
use crate::manager::{Manager, Status as ManagerStatus}; use crate::manager::{Manager, Status as ManagerStatus};
use crate::notifications::{notify, NotificationLevel, NotificationSubtype}; use crate::notifications::{NotificationLevel, NotificationSubtype};
use crate::s9pk::manifest::{Manifest, PackageId}; use crate::s9pk::manifest::{Manifest, PackageId};
use crate::status::health_check::HealthCheckResult; use crate::status::health_check::HealthCheckResult;
use crate::Error; use crate::Error;
@@ -335,9 +335,7 @@ impl MainStatus {
.map(|hc| hc.critical) .map(|hc| hc.critical)
.unwrap_or_default() => .unwrap_or_default() =>
{ {
notify( ctx.notification_manager.notify(
&ctx.secret_store,
&ctx.db,
Some(manifest.id.clone()), Some(manifest.id.clone()),
NotificationLevel::Error, NotificationLevel::Error,
String::from("Critical Health Check Failed"), String::from("Critical Health Check Failed"),