mirror of
https://github.com/Start9Labs/start-os.git
synced 2026-03-27 02:41:53 +00:00
implements session counter (#463)
* implements session counter * implements session ids
This commit is contained in:
committed by
GitHub
parent
3027d79319
commit
1e0583264e
@@ -3,6 +3,7 @@ use std::collections::VecDeque;
|
||||
use std::net::{IpAddr, SocketAddr};
|
||||
use std::ops::Deref;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::{AtomicU64, AtomicUsize};
|
||||
use std::sync::Arc;
|
||||
|
||||
use bollard::Docker;
|
||||
@@ -97,6 +98,8 @@ pub struct RpcContextSeed {
|
||||
pub revision_cache: RwLock<VecDeque<Arc<Revision>>>,
|
||||
pub metrics_cache: RwLock<Option<crate::system::Metrics>>,
|
||||
pub shutdown: Sender<Option<Shutdown>>,
|
||||
pub websocket_count: AtomicUsize,
|
||||
pub session_id: AtomicU64,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -130,6 +133,8 @@ impl RpcContext {
|
||||
revision_cache: RwLock::new(VecDeque::new()),
|
||||
metrics_cache: RwLock::new(None),
|
||||
shutdown,
|
||||
websocket_count: AtomicUsize::new(0),
|
||||
session_id: AtomicU64::new(rand::random()),
|
||||
});
|
||||
let res = Self(seed);
|
||||
res.managers
|
||||
|
||||
@@ -21,7 +21,7 @@ use tokio_tungstenite::WebSocketStream;
|
||||
pub use self::model::DatabaseModel;
|
||||
use self::util::WithRevision;
|
||||
use crate::context::RpcContext;
|
||||
use crate::util::{display_serializable, IoFormat};
|
||||
use crate::util::{display_serializable, GeneralGuard, IoFormat};
|
||||
use crate::{Error, ResultExt};
|
||||
|
||||
async fn ws_handler<
|
||||
@@ -35,6 +35,21 @@ async fn ws_handler<
|
||||
.await
|
||||
.with_kind(crate::ErrorKind::Network)?
|
||||
.with_kind(crate::ErrorKind::Unknown)?;
|
||||
|
||||
// add 1 to the session counter and issue an RAII guard to subtract 1 on drop
|
||||
ctx.websocket_count
|
||||
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
|
||||
let decrementer = GeneralGuard::new(|| {
|
||||
let new_count = ctx
|
||||
.websocket_count
|
||||
.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
|
||||
if new_count == 0 {
|
||||
ctx.session_id
|
||||
.store(rand::random(), std::sync::atomic::Ordering::SeqCst)
|
||||
}
|
||||
()
|
||||
});
|
||||
|
||||
loop {
|
||||
if let Some(Message::Text(_)) = stream
|
||||
.next()
|
||||
|
||||
@@ -1030,3 +1030,18 @@ impl<T> Future for NonDetachingJoinHandle<T> {
|
||||
this.0.poll(cx)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct GeneralGuard<F: FnOnce()>(Option<F>);
|
||||
impl<F: FnOnce()> GeneralGuard<F> {
|
||||
pub fn new(f: F) -> Self {
|
||||
GeneralGuard(Some(f))
|
||||
}
|
||||
}
|
||||
|
||||
impl<F: FnOnce()> Drop for GeneralGuard<F> {
|
||||
fn drop(&mut self) {
|
||||
if let Some(destroy) = self.0.take() {
|
||||
destroy()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user