diff --git a/appmgr/src/context/rpc.rs b/appmgr/src/context/rpc.rs index 73bbeaa10..f4c426f17 100644 --- a/appmgr/src/context/rpc.rs +++ b/appmgr/src/context/rpc.rs @@ -26,6 +26,7 @@ use crate::manager::ManagerMap; use crate::net::tor::os_key; use crate::net::NetController; use crate::shutdown::Shutdown; +use crate::system::launch_metrics_task; use crate::util::io::from_toml_async_reader; use crate::util::logger::EmbassyLogger; use crate::util::AsyncFileExt; @@ -162,6 +163,7 @@ impl RpcContext { ) .await?; let managers = ManagerMap::default(); + let metrics_cache = RwLock::new(None); let seed = Arc::new(RpcContextSeed { bind_rpc: base.bind_rpc.unwrap_or(([127, 0, 0, 1], 5959).into()), bind_ws: base.bind_ws.unwrap_or(([127, 0, 0, 1], 5960).into()), @@ -174,7 +176,7 @@ impl RpcContext { managers, revision_cache_size: base.revision_cache_size.unwrap_or(512), revision_cache: RwLock::new(VecDeque::new()), - metrics_cache: RwLock::new(None), + metrics_cache, shutdown, websocket_count: AtomicUsize::new(0), logger, @@ -184,6 +186,13 @@ impl RpcContext { 9050, ))), }); + let metrics_seed = seed.clone(); + tokio::spawn(async move { + launch_metrics_task(&metrics_seed.metrics_cache, || { + metrics_seed.shutdown.subscribe() + }) + .await + }); let res = Self(seed); res.managers .init( diff --git a/appmgr/src/system.rs b/appmgr/src/system.rs index 7c30133ae..9da24f8de 100644 --- a/appmgr/src/system.rs +++ b/appmgr/src/system.rs @@ -2,10 +2,12 @@ use std::fmt; use rpc_toolkit::command; use serde::{Deserialize, Serialize}; +use tokio::sync::broadcast::Receiver; use tokio::sync::RwLock; use crate::context::RpcContext; use crate::logs::{display_logs, fetch_logs, LogResponse, LogSource}; +use crate::shutdown::Shutdown; use crate::util::{display_none, display_serializable, IoFormat}; use crate::{Error, ErrorKind}; @@ -94,7 +96,10 @@ pub async fn metrics( } } -pub async fn launch_metrics_task(cache: &RwLock>) { +pub async fn launch_metrics_task Receiver>>( + cache: &RwLock>, + mut mk_shutdown: F, +) { // fetch init temp let init_temp; loop { @@ -172,17 +177,20 @@ pub async fn launch_metrics_task(cache: &RwLock>) { }) } // launch persistent temp task - let temp_task = launch_temp_task(cache); + let temp_task = launch_temp_task(cache, mk_shutdown()); // launch persistent cpu task - let cpu_task = launch_cpu_task(cache, proc_stat); + let cpu_task = launch_cpu_task(cache, proc_stat, mk_shutdown()); // launch persistent mem task - let mem_task = launch_mem_task(cache); + let mem_task = launch_mem_task(cache, mk_shutdown()); // launch persistent disk task - let disk_task = launch_disk_task(cache); + let disk_task = launch_disk_task(cache, mk_shutdown()); tokio::join!(temp_task, cpu_task, mem_task, disk_task,); } -async fn launch_temp_task(cache: &RwLock>) { +async fn launch_temp_task( + cache: &RwLock>, + mut shutdown: Receiver>, +) { loop { match get_temp().await { Ok(a) => { @@ -193,11 +201,18 @@ async fn launch_temp_task(cache: &RwLock>) { log::error!("Could not get new temperature: {}", e); } } - tokio::time::sleep(tokio::time::Duration::from_secs(4)).await; + tokio::select! { + _ = shutdown.recv() => return, + _ = tokio::time::sleep(tokio::time::Duration::from_secs(4)) => (), + } } } -async fn launch_cpu_task(cache: &RwLock>, mut init: ProcStat) { +async fn launch_cpu_task( + cache: &RwLock>, + mut init: ProcStat, + mut shutdown: Receiver>, +) { loop { // read /proc/stat, diff against previous metrics, compute cpu load match get_cpu_info(&mut init).await { @@ -209,11 +224,17 @@ async fn launch_cpu_task(cache: &RwLock>, mut init: ProcStat) { log::error!("Could not get new CPU Metrics: {}", e); } } - tokio::time::sleep(tokio::time::Duration::from_secs(4)).await; + tokio::select! { + _ = shutdown.recv() => return, + _ = tokio::time::sleep(tokio::time::Duration::from_secs(4)) => (), + } } } -async fn launch_mem_task(cache: &RwLock>) { +async fn launch_mem_task( + cache: &RwLock>, + mut shutdown: Receiver>, +) { loop { // read /proc/meminfo match get_mem_info().await { @@ -225,11 +246,16 @@ async fn launch_mem_task(cache: &RwLock>) { log::error!("Could not get new Memory Metrics: {}", e); } } - - tokio::time::sleep(tokio::time::Duration::from_secs(4)).await; + tokio::select! { + _ = shutdown.recv() => return, + _ = tokio::time::sleep(tokio::time::Duration::from_secs(4)) => (), + } } } -async fn launch_disk_task(cache: &RwLock>) { +async fn launch_disk_task( + cache: &RwLock>, + mut shutdown: Receiver>, +) { loop { // run df and capture output match get_disk_info().await { @@ -241,7 +267,10 @@ async fn launch_disk_task(cache: &RwLock>) { log::error!("Could not get new Disk Metrics: {}", e); } } - tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; + tokio::select! { + _ = shutdown.recv() => return, + _ = tokio::time::sleep(tokio::time::Duration::from_secs(60)) => (), + } } }