adds metrics task to launch sequence

This commit is contained in:
Keagan McClelland
2021-10-04 17:38:28 -06:00
committed by Aiden McClelland
parent c931a922fb
commit b7e6729272
2 changed files with 53 additions and 15 deletions

View File

@@ -26,6 +26,7 @@ use crate::manager::ManagerMap;
use crate::net::tor::os_key;
use crate::net::NetController;
use crate::shutdown::Shutdown;
use crate::system::launch_metrics_task;
use crate::util::io::from_toml_async_reader;
use crate::util::logger::EmbassyLogger;
use crate::util::AsyncFileExt;
@@ -162,6 +163,7 @@ impl RpcContext {
)
.await?;
let managers = ManagerMap::default();
let metrics_cache = RwLock::new(None);
let seed = Arc::new(RpcContextSeed {
bind_rpc: base.bind_rpc.unwrap_or(([127, 0, 0, 1], 5959).into()),
bind_ws: base.bind_ws.unwrap_or(([127, 0, 0, 1], 5960).into()),
@@ -174,7 +176,7 @@ impl RpcContext {
managers,
revision_cache_size: base.revision_cache_size.unwrap_or(512),
revision_cache: RwLock::new(VecDeque::new()),
metrics_cache: RwLock::new(None),
metrics_cache,
shutdown,
websocket_count: AtomicUsize::new(0),
logger,
@@ -184,6 +186,13 @@ impl RpcContext {
9050,
))),
});
let metrics_seed = seed.clone();
tokio::spawn(async move {
launch_metrics_task(&metrics_seed.metrics_cache, || {
metrics_seed.shutdown.subscribe()
})
.await
});
let res = Self(seed);
res.managers
.init(

View File

@@ -2,10 +2,12 @@ use std::fmt;
use rpc_toolkit::command;
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast::Receiver;
use tokio::sync::RwLock;
use crate::context::RpcContext;
use crate::logs::{display_logs, fetch_logs, LogResponse, LogSource};
use crate::shutdown::Shutdown;
use crate::util::{display_none, display_serializable, IoFormat};
use crate::{Error, ErrorKind};
@@ -94,7 +96,10 @@ pub async fn metrics(
}
}
pub async fn launch_metrics_task(cache: &RwLock<Option<Metrics>>) {
pub async fn launch_metrics_task<F: FnMut() -> Receiver<Option<Shutdown>>>(
cache: &RwLock<Option<Metrics>>,
mut mk_shutdown: F,
) {
// fetch init temp
let init_temp;
loop {
@@ -172,17 +177,20 @@ pub async fn launch_metrics_task(cache: &RwLock<Option<Metrics>>) {
})
}
// launch persistent temp task
let temp_task = launch_temp_task(cache);
let temp_task = launch_temp_task(cache, mk_shutdown());
// launch persistent cpu task
let cpu_task = launch_cpu_task(cache, proc_stat);
let cpu_task = launch_cpu_task(cache, proc_stat, mk_shutdown());
// launch persistent mem task
let mem_task = launch_mem_task(cache);
let mem_task = launch_mem_task(cache, mk_shutdown());
// launch persistent disk task
let disk_task = launch_disk_task(cache);
let disk_task = launch_disk_task(cache, mk_shutdown());
tokio::join!(temp_task, cpu_task, mem_task, disk_task,);
}
async fn launch_temp_task(cache: &RwLock<Option<Metrics>>) {
async fn launch_temp_task(
cache: &RwLock<Option<Metrics>>,
mut shutdown: Receiver<Option<Shutdown>>,
) {
loop {
match get_temp().await {
Ok(a) => {
@@ -193,11 +201,18 @@ async fn launch_temp_task(cache: &RwLock<Option<Metrics>>) {
log::error!("Could not get new temperature: {}", e);
}
}
tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;
tokio::select! {
_ = shutdown.recv() => return,
_ = tokio::time::sleep(tokio::time::Duration::from_secs(4)) => (),
}
}
}
async fn launch_cpu_task(cache: &RwLock<Option<Metrics>>, mut init: ProcStat) {
async fn launch_cpu_task(
cache: &RwLock<Option<Metrics>>,
mut init: ProcStat,
mut shutdown: Receiver<Option<Shutdown>>,
) {
loop {
// read /proc/stat, diff against previous metrics, compute cpu load
match get_cpu_info(&mut init).await {
@@ -209,11 +224,17 @@ async fn launch_cpu_task(cache: &RwLock<Option<Metrics>>, mut init: ProcStat) {
log::error!("Could not get new CPU Metrics: {}", e);
}
}
tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;
tokio::select! {
_ = shutdown.recv() => return,
_ = tokio::time::sleep(tokio::time::Duration::from_secs(4)) => (),
}
}
}
async fn launch_mem_task(cache: &RwLock<Option<Metrics>>) {
async fn launch_mem_task(
cache: &RwLock<Option<Metrics>>,
mut shutdown: Receiver<Option<Shutdown>>,
) {
loop {
// read /proc/meminfo
match get_mem_info().await {
@@ -225,11 +246,16 @@ async fn launch_mem_task(cache: &RwLock<Option<Metrics>>) {
log::error!("Could not get new Memory Metrics: {}", e);
}
}
tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;
tokio::select! {
_ = shutdown.recv() => return,
_ = tokio::time::sleep(tokio::time::Duration::from_secs(4)) => (),
}
}
}
async fn launch_disk_task(cache: &RwLock<Option<Metrics>>) {
async fn launch_disk_task(
cache: &RwLock<Option<Metrics>>,
mut shutdown: Receiver<Option<Shutdown>>,
) {
loop {
// run df and capture output
match get_disk_info().await {
@@ -241,7 +267,10 @@ async fn launch_disk_task(cache: &RwLock<Option<Metrics>>) {
log::error!("Could not get new Disk Metrics: {}", e);
}
}
tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
tokio::select! {
_ = shutdown.recv() => return,
_ = tokio::time::sleep(tokio::time::Duration::from_secs(60)) => (),
}
}
}