diff --git a/appmgr/src/context/rpc.rs b/appmgr/src/context/rpc.rs index 739f9ad01..1bf72af0f 100644 --- a/appmgr/src/context/rpc.rs +++ b/appmgr/src/context/rpc.rs @@ -3,6 +3,7 @@ use std::collections::VecDeque; use std::net::{IpAddr, SocketAddr}; use std::ops::Deref; use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicU64, AtomicUsize}; use std::sync::Arc; use bollard::Docker; @@ -97,6 +98,8 @@ pub struct RpcContextSeed { pub revision_cache: RwLock>>, pub metrics_cache: RwLock>, pub shutdown: Sender>, + pub websocket_count: AtomicUsize, + pub session_id: AtomicU64, } #[derive(Clone)] @@ -130,6 +133,8 @@ impl RpcContext { revision_cache: RwLock::new(VecDeque::new()), metrics_cache: RwLock::new(None), shutdown, + websocket_count: AtomicUsize::new(0), + session_id: AtomicU64::new(rand::random()), }); let res = Self(seed); res.managers diff --git a/appmgr/src/db/mod.rs b/appmgr/src/db/mod.rs index ca9c0b37c..0bc9fb392 100644 --- a/appmgr/src/db/mod.rs +++ b/appmgr/src/db/mod.rs @@ -21,7 +21,7 @@ use tokio_tungstenite::WebSocketStream; pub use self::model::DatabaseModel; use self::util::WithRevision; use crate::context::RpcContext; -use crate::util::{display_serializable, IoFormat}; +use crate::util::{display_serializable, GeneralGuard, IoFormat}; use crate::{Error, ResultExt}; async fn ws_handler< @@ -35,6 +35,21 @@ async fn ws_handler< .await .with_kind(crate::ErrorKind::Network)? .with_kind(crate::ErrorKind::Unknown)?; + + // add 1 to the session counter and issue an RAII guard to subtract 1 on drop + ctx.websocket_count + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + let decrementer = GeneralGuard::new(|| { + let new_count = ctx + .websocket_count + .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + if new_count == 0 { + ctx.session_id + .store(rand::random(), std::sync::atomic::Ordering::SeqCst) + } + () + }); + loop { if let Some(Message::Text(_)) = stream .next() diff --git a/appmgr/src/util/mod.rs b/appmgr/src/util/mod.rs index 08306601d..ef178a9ce 100644 --- a/appmgr/src/util/mod.rs +++ b/appmgr/src/util/mod.rs @@ -1030,3 +1030,18 @@ impl Future for NonDetachingJoinHandle { this.0.poll(cx) } } + +pub struct GeneralGuard(Option); +impl GeneralGuard { + pub fn new(f: F) -> Self { + GeneralGuard(Some(f)) + } +} + +impl Drop for GeneralGuard { + fn drop(&mut self) { + if let Some(destroy) = self.0.take() { + destroy() + } + } +}