From d58a950762a373611c670e2c4e8c9ab4a006708d Mon Sep 17 00:00:00 2001 From: Keagan McClelland Date: Tue, 5 Oct 2021 17:40:31 -0600 Subject: [PATCH] implements notification debouncing --- appmgr/src/context/rpc.rs | 4 ++ appmgr/src/notifications.rs | 110 +++++++++++++++++++++++++++--------- appmgr/src/status/mod.rs | 6 +- 3 files changed, 90 insertions(+), 30 deletions(-) diff --git a/appmgr/src/context/rpc.rs b/appmgr/src/context/rpc.rs index 9b7c39126..48d650c5a 100644 --- a/appmgr/src/context/rpc.rs +++ b/appmgr/src/context/rpc.rs @@ -27,6 +27,7 @@ use crate::hostname::{get_hostname, get_id}; use crate::manager::ManagerMap; use crate::net::tor::os_key; use crate::net::NetController; +use crate::notifications::NotificationManager; use crate::shutdown::Shutdown; use crate::system::launch_metrics_task; use crate::util::io::from_toml_async_reader; @@ -126,6 +127,7 @@ pub struct RpcContextSeed { pub logger: EmbassyLogger, pub log_epoch: Arc, pub tor_socks: SocketAddr, + pub notification_manager: NotificationManager, } #[derive(Clone)] @@ -165,6 +167,7 @@ impl RpcContext { .await?; let managers = ManagerMap::default(); let metrics_cache = RwLock::new(None); + let notification_manager = NotificationManager::new(secret_store.clone(), db.clone(), 3600); let seed = Arc::new(RpcContextSeed { bind_rpc: base.bind_rpc.unwrap_or(([127, 0, 0, 1], 5959).into()), bind_ws: base.bind_ws.unwrap_or(([127, 0, 0, 1], 5960).into()), @@ -186,6 +189,7 @@ impl RpcContext { Ipv4Addr::new(127, 0, 0, 1), 9050, ))), + notification_manager, }); let metrics_seed = seed.clone(); tokio::spawn(async move { diff --git a/appmgr/src/notifications.rs b/appmgr/src/notifications.rs index 261bd069d..0d5422041 100644 --- a/appmgr/src/notifications.rs +++ b/appmgr/src/notifications.rs @@ -1,9 +1,10 @@ -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::fmt; use std::str::FromStr; use anyhow::anyhow; use chrono::{DateTime, Utc}; +use futures::lock::Mutex; use patch_db::{PatchDb, Revision}; use rpc_toolkit::command; use sqlx::SqlitePool; @@ -132,7 +133,7 @@ pub async fn delete_before(#[context] ctx: RpcContext, #[arg] before: u32) -> Re Ok(()) } -#[derive(serde::Serialize, serde::Deserialize)] +#[derive(Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] pub enum NotificationLevel { Success, Info, @@ -231,26 +232,43 @@ impl NotificationSubtype { } } -pub async fn notify( - sqlite: &SqlitePool, - patchdb: &PatchDb, - package_id: Option, - level: NotificationLevel, - title: String, - message: String, - subtype: NotificationSubtype, -) -> Result<(), Error> { - let mut handle = patchdb.handle(); - let mut count = crate::db::DatabaseModel::new() - .server_info() - .unread_notification_count() - .get_mut(&mut handle) - .await?; - let sql_package_id = package_id.map::(|p| p.into()); - let sql_code = subtype.code(); - let sql_level = format!("{}", level); - let sql_data = format!("{}", subtype.to_json()); - sqlx::query!( +pub struct NotificationManager { + sqlite: SqlitePool, + patchdb: PatchDb, + cache: Mutex, NotificationLevel, String), i64>>, + debounce_interval: u32, +} +impl NotificationManager { + pub fn new(sqlite: SqlitePool, patchdb: PatchDb, debounce_interval: u32) -> Self { + NotificationManager { + sqlite, + patchdb, + cache: Mutex::new(HashMap::new()), + debounce_interval, + } + } + pub async fn notify( + &self, + package_id: Option, + level: NotificationLevel, + title: String, + message: String, + subtype: NotificationSubtype, + ) -> Result<(), Error> { + if !self.should_notify(&package_id, &level, &title).await { + return Ok(()); + } + let mut handle = self.patchdb.handle(); + let mut count = crate::db::DatabaseModel::new() + .server_info() + .unread_notification_count() + .get_mut(&mut handle) + .await?; + let sql_package_id = package_id.map::(|p| p.into()); + let sql_code = subtype.code(); + let sql_level = format!("{}", level); + let sql_data = format!("{}", subtype.to_json()); + sqlx::query!( "INSERT INTO notifications (package_id, code, level, title, message, data) VALUES (?, ?, ?, ?, ?, ?)", sql_package_id, sql_code, @@ -258,10 +276,50 @@ pub async fn notify( title, message, sql_data - ).execute(sqlite).await?; - *count += 1; - count.save(&mut handle).await?; - Ok(()) + ).execute(&self.sqlite).await?; + *count += 1; + count.save(&mut handle).await?; + Ok(()) + } + async fn gc(&self) { + let mut guard = self.cache.lock().await; + let mut temp = HashMap::new(); + for (k, v) in (*guard).drain() { + if v + self.debounce_interval as i64 > Utc::now().timestamp() { + temp.insert(k, v); + } + } + *guard = temp + } + async fn should_notify( + &self, + package_id: &Option, + level: &NotificationLevel, + title: &String, + ) -> bool { + if level == &NotificationLevel::Error { + return true; + } + self.gc(); + let mut guard = self.cache.lock().await; + let k = (package_id.clone(), level.clone(), title.clone()); + let v = (*guard).get(&k); + match v { + None => { + (*guard).insert(k, Utc::now().timestamp()); + true + } + Some(t) => { + if t + self.debounce_interval as i64 > Utc::now().timestamp() { + false + } else { + // this path should be very rare due to gc above + (*guard).remove(&k); + true + } + } + } + } } #[test] diff --git a/appmgr/src/status/mod.rs b/appmgr/src/status/mod.rs index 1aa9d4ec6..0b7c31a52 100644 --- a/appmgr/src/status/mod.rs +++ b/appmgr/src/status/mod.rs @@ -15,7 +15,7 @@ use crate::dependencies::{ break_transitive, DependencyError, DependencyErrors, TaggedDependencyError, }; use crate::manager::{Manager, Status as ManagerStatus}; -use crate::notifications::{notify, NotificationLevel, NotificationSubtype}; +use crate::notifications::{NotificationLevel, NotificationSubtype}; use crate::s9pk::manifest::{Manifest, PackageId}; use crate::status::health_check::HealthCheckResult; use crate::Error; @@ -335,9 +335,7 @@ impl MainStatus { .map(|hc| hc.critical) .unwrap_or_default() => { - notify( - &ctx.secret_store, - &ctx.db, + ctx.notification_manager.notify( Some(manifest.id.clone()), NotificationLevel::Error, String::from("Critical Health Check Failed"),