implements session counter (#463)

* implements session counter

* implements session ids
This commit is contained in:
Keagan McClelland
2021-09-11 13:15:05 -06:00
committed by Aiden McClelland
parent ac9b1a7744
commit c6a0cf90bf
3 changed files with 36 additions and 1 deletions

View File

@@ -3,6 +3,7 @@ use std::collections::VecDeque;
use std::net::{IpAddr, SocketAddr};
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, AtomicUsize};
use std::sync::Arc;
use bollard::Docker;
@@ -97,6 +98,8 @@ pub struct RpcContextSeed {
pub revision_cache: RwLock<VecDeque<Arc<Revision>>>,
pub metrics_cache: RwLock<Option<crate::system::Metrics>>,
pub shutdown: Sender<Option<Shutdown>>,
pub websocket_count: AtomicUsize,
pub session_id: AtomicU64,
}
#[derive(Clone)]
@@ -130,6 +133,8 @@ impl RpcContext {
revision_cache: RwLock::new(VecDeque::new()),
metrics_cache: RwLock::new(None),
shutdown,
websocket_count: AtomicUsize::new(0),
session_id: AtomicU64::new(rand::random()),
});
let res = Self(seed);
res.managers

View File

@@ -21,7 +21,7 @@ use tokio_tungstenite::WebSocketStream;
pub use self::model::DatabaseModel;
use self::util::WithRevision;
use crate::context::RpcContext;
use crate::util::{display_serializable, IoFormat};
use crate::util::{display_serializable, GeneralGuard, IoFormat};
use crate::{Error, ResultExt};
async fn ws_handler<
@@ -35,6 +35,21 @@ async fn ws_handler<
.await
.with_kind(crate::ErrorKind::Network)?
.with_kind(crate::ErrorKind::Unknown)?;
// add 1 to the session counter and issue an RAII guard to subtract 1 on drop
ctx.websocket_count
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let decrementer = GeneralGuard::new(|| {
let new_count = ctx
.websocket_count
.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
if new_count == 0 {
ctx.session_id
.store(rand::random(), std::sync::atomic::Ordering::SeqCst)
}
()
});
loop {
if let Some(Message::Text(_)) = stream
.next()

View File

@@ -1030,3 +1030,18 @@ impl<T> Future for NonDetachingJoinHandle<T> {
this.0.poll(cx)
}
}
pub struct GeneralGuard<F: FnOnce()>(Option<F>);
impl<F: FnOnce()> GeneralGuard<F> {
pub fn new(f: F) -> Self {
GeneralGuard(Some(f))
}
}
impl<F: FnOnce()> Drop for GeneralGuard<F> {
fn drop(&mut self) {
if let Some(destroy) = self.0.take() {
destroy()
}
}
}