use std::fmt; use chrono::Utc; use color_eyre::eyre::eyre; use futures::FutureExt; use rpc_toolkit::command; use rpc_toolkit::yajrc::RpcError; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use tokio::process::Command; use tokio::sync::broadcast::Receiver; use tokio::sync::RwLock; use tracing::instrument; use crate::context::{CliContext, RpcContext}; use crate::disk::util::{get_available, get_used}; use crate::logs::{ cli_logs_generic_follow, cli_logs_generic_nofollow, fetch_logs, follow_logs, LogFollowResponse, LogResponse, LogSource, }; use crate::shutdown::Shutdown; use crate::util::serde::{display_serializable, IoFormat}; use crate::util::{display_none, Invoke}; use crate::{Error, ErrorKind, ResultExt}; pub const SYSTEMD_UNIT: &'static str = "embassyd"; #[command(subcommands(zram))] pub async fn experimental() -> Result<(), Error> { Ok(()) } pub async fn enable_zram() -> Result<(), Error> { let mem_info = get_mem_info().await?; Command::new("modprobe") .arg("zram") .invoke(ErrorKind::Zram) .await?; tokio::fs::write("/sys/block/zram0/comp_algorithm", "lz4") .await .with_kind(ErrorKind::Zram)?; tokio::fs::write( "/sys/block/zram0/disksize", format!("{}M", mem_info.total.0 as u64 / 4), ) .await .with_kind(ErrorKind::Zram)?; Command::new("mkswap") .arg("/dev/zram0") .invoke(ErrorKind::Zram) .await?; Command::new("swapon") .arg("-p") .arg("5") .arg("/dev/zram0") .invoke(ErrorKind::Zram) .await?; Ok(()) } #[command(display(display_none))] pub async fn zram(#[context] ctx: RpcContext, #[arg] enable: bool) -> Result<(), Error> { let mut db = ctx.db.handle(); let mut zram = crate::db::DatabaseModel::new() .server_info() .zram() .get_mut(&mut db) .await?; if enable == *zram { return Ok(()); } *zram = enable; if enable { enable_zram().await?; } else { Command::new("swapoff") .arg("/dev/zram0") .invoke(ErrorKind::Zram) .await?; tokio::fs::write("/sys/block/zram0/reset", "1") .await .with_kind(ErrorKind::Zram)?; } zram.save(&mut db).await?; Ok(()) } #[command] pub async fn time() -> Result { Ok(Utc::now().to_rfc3339()) } #[command( custom_cli(cli_logs(async, context(CliContext))), subcommands(self(logs_nofollow(async)), logs_follow), display(display_none) )] pub async fn logs( #[arg(short = 'l', long = "limit")] limit: Option, #[arg(short = 'c', long = "cursor")] cursor: Option, #[arg(short = 'B', long = "before", default)] before: bool, #[arg(short = 'f', long = "follow", default)] follow: bool, ) -> Result<(Option, Option, bool, bool), Error> { Ok((limit, cursor, before, follow)) } pub async fn cli_logs( ctx: CliContext, (limit, cursor, before, follow): (Option, Option, bool, bool), ) -> Result<(), RpcError> { if follow { if cursor.is_some() { return Err(RpcError::from(Error::new( eyre!("The argument '--cursor ' cannot be used with '--follow'"), crate::ErrorKind::InvalidRequest, ))); } if before { return Err(RpcError::from(Error::new( eyre!("The argument '--before' cannot be used with '--follow'"), crate::ErrorKind::InvalidRequest, ))); } cli_logs_generic_follow(ctx, "server.logs.follow", None, limit).await } else { cli_logs_generic_nofollow(ctx, "server.logs", None, limit, cursor, before).await } } pub async fn logs_nofollow( _ctx: (), (limit, cursor, before, _): (Option, Option, bool, bool), ) -> Result { fetch_logs(LogSource::Service(SYSTEMD_UNIT), limit, cursor, before).await } #[command(rpc_only, rename = "follow", display(display_none))] pub async fn logs_follow( #[context] ctx: RpcContext, #[parent_data] (limit, _, _, _): (Option, Option, bool, bool), ) -> Result { follow_logs(ctx, LogSource::Service(SYSTEMD_UNIT), limit).await } #[command( rename = "kernel-logs", custom_cli(cli_kernel_logs(async, context(CliContext))), subcommands(self(kernel_logs_nofollow(async)), kernel_logs_follow), display(display_none) )] pub async fn kernel_logs( #[arg(short = 'l', long = "limit")] limit: Option, #[arg(short = 'c', long = "cursor")] cursor: Option, #[arg(short = 'B', long = "before", default)] before: bool, #[arg(short = 'f', long = "follow", default)] follow: bool, ) -> Result<(Option, Option, bool, bool), Error> { Ok((limit, cursor, before, follow)) } pub async fn cli_kernel_logs( ctx: CliContext, (limit, cursor, before, follow): (Option, Option, bool, bool), ) -> Result<(), RpcError> { if follow { if cursor.is_some() { return Err(RpcError::from(Error::new( eyre!("The argument '--cursor ' cannot be used with '--follow'"), crate::ErrorKind::InvalidRequest, ))); } if before { return Err(RpcError::from(Error::new( eyre!("The argument '--before' cannot be used with '--follow'"), crate::ErrorKind::InvalidRequest, ))); } cli_logs_generic_follow(ctx, "server.kernel-logs.follow", None, limit).await } else { cli_logs_generic_nofollow(ctx, "server.kernel-logs", None, limit, cursor, before).await } } pub async fn kernel_logs_nofollow( _ctx: (), (limit, cursor, before, _): (Option, Option, bool, bool), ) -> Result { fetch_logs(LogSource::Kernel, limit, cursor, before).await } #[command(rpc_only, rename = "follow", display(display_none))] pub async fn kernel_logs_follow( #[context] ctx: RpcContext, #[parent_data] (limit, _, _, _): (Option, Option, bool, bool), ) -> Result { follow_logs(ctx, LogSource::Kernel, limit).await } #[derive(Serialize, Deserialize)] pub struct MetricLeaf { value: T, unit: Option, } #[derive(Clone, Debug)] pub struct Celsius(f64); impl fmt::Display for Celsius { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "{:.1}°C", self.0) } } impl Serialize for Celsius { fn serialize(&self, serializer: S) -> Result where S: Serializer, { MetricLeaf { value: format!("{:.1}", self.0), unit: Some(String::from("°C")), } .serialize(serializer) } } impl<'de> Deserialize<'de> for Celsius { fn deserialize(deserializer: D) -> Result where D: Deserializer<'de>, { let s = MetricLeaf::::deserialize(deserializer)?; Ok(Celsius(s.value.parse().map_err(serde::de::Error::custom)?)) } } #[derive(Clone, Debug)] pub struct Percentage(f64); impl Serialize for Percentage { fn serialize(&self, serializer: S) -> Result where S: Serializer, { MetricLeaf { value: format!("{:.1}", self.0), unit: Some(String::from("%")), } .serialize(serializer) } } impl<'de> Deserialize<'de> for Percentage { fn deserialize(deserializer: D) -> Result where D: Deserializer<'de>, { let s = MetricLeaf::::deserialize(deserializer)?; Ok(Percentage( s.value.parse().map_err(serde::de::Error::custom)?, )) } } #[derive(Clone, Debug)] pub struct MebiBytes(f64); impl Serialize for MebiBytes { fn serialize(&self, serializer: S) -> Result where S: Serializer, { MetricLeaf { value: format!("{:.2}", self.0), unit: Some(String::from("MiB")), } .serialize(serializer) } } impl<'de> Deserialize<'de> for MebiBytes { fn deserialize(deserializer: D) -> Result where D: Deserializer<'de>, { let s = MetricLeaf::::deserialize(deserializer)?; Ok(MebiBytes( s.value.parse().map_err(serde::de::Error::custom)?, )) } } #[derive(Clone, Debug)] pub struct GigaBytes(f64); impl Serialize for GigaBytes { fn serialize(&self, serializer: S) -> Result where S: Serializer, { MetricLeaf { value: format!("{:.2}", self.0), unit: Some(String::from("GB")), } .serialize(serializer) } } impl<'de> Deserialize<'de> for GigaBytes { fn deserialize(deserializer: D) -> Result where D: Deserializer<'de>, { let s = MetricLeaf::::deserialize(deserializer)?; Ok(GigaBytes( s.value.parse().map_err(serde::de::Error::custom)?, )) } } #[derive(Deserialize, Serialize, Clone, Debug)] pub struct MetricsGeneral { #[serde(rename = "Temperature")] temperature: Option, } #[derive(Deserialize, Serialize, Clone, Debug)] pub struct MetricsMemory { #[serde(rename = "Percentage Used")] percentage_used: Percentage, #[serde(rename = "Total")] total: MebiBytes, #[serde(rename = "Available")] available: MebiBytes, #[serde(rename = "Used")] used: MebiBytes, #[serde(rename = "Swap Total")] swap_total: MebiBytes, #[serde(rename = "Swap Free")] swap_free: MebiBytes, #[serde(rename = "Swap Used")] swap_used: MebiBytes, } #[derive(Deserialize, Serialize, Clone, Debug)] pub struct MetricsCpu { #[serde(rename = "User Space")] user_space: Percentage, #[serde(rename = "Kernel Space")] kernel_space: Percentage, #[serde(rename = "I/O Wait")] wait: Percentage, #[serde(rename = "Idle")] idle: Percentage, #[serde(rename = "Usage")] usage: Percentage, } #[derive(Deserialize, Serialize, Clone, Debug)] pub struct MetricsDisk { #[serde(rename = "Size")] size: GigaBytes, #[serde(rename = "Used")] used: GigaBytes, #[serde(rename = "Available")] available: GigaBytes, #[serde(rename = "Percentage Used")] used_percentage: Percentage, } #[derive(Deserialize, Serialize, Clone, Debug)] pub struct Metrics { #[serde(rename = "General")] general: MetricsGeneral, #[serde(rename = "Memory")] memory: MetricsMemory, #[serde(rename = "CPU")] cpu: MetricsCpu, #[serde(rename = "Disk")] disk: MetricsDisk, } #[command(display(display_serializable))] pub async fn metrics( #[context] ctx: RpcContext, #[allow(unused_variables)] #[arg(long = "format")] format: Option, ) -> Result { match ctx.metrics_cache.read().await.clone() { None => Err(Error { source: color_eyre::eyre::eyre!("No Metrics Found"), kind: ErrorKind::NotFound, revision: None, }), Some(metrics_val) => Ok(metrics_val), } } pub async fn launch_metrics_task Receiver>>( cache: &RwLock>, mut mk_shutdown: F, ) { // fetch init temp let init_temp = match get_temp().await { Ok(a) => Some(a), Err(e) => { tracing::error!("Could not get initial temperature: {}", e); tracing::debug!("{:?}", e); None } }; // fetch init cpu let init_cpu; let proc_stat; loop { match get_proc_stat().await { Ok(mut ps) => match get_cpu_info(&mut ps).await { Ok(mc) => { proc_stat = ps; init_cpu = mc; break; } Err(e) => { tracing::error!("Could not get initial cpu info: {}", e); tracing::debug!("{:?}", e); } }, Err(e) => { tracing::error!("Could not get initial proc stat: {}", e); tracing::debug!("{:?}", e); } } tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; } // fetch init memory let init_mem; loop { match get_mem_info().await { Ok(a) => { init_mem = a; break; } Err(e) => { tracing::error!("Could not get initial mem info: {}", e); tracing::debug!("{:?}", e); } } tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; } // fetch init disk usage let init_disk; loop { match get_disk_info().await { Ok(a) => { init_disk = a; break; } Err(e) => { tracing::error!("Could not get initial disk info: {}", e); tracing::debug!("{:?}", e); } } tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; } { // lock for writing let mut guard = cache.write().await; // write *guard = Some(Metrics { general: MetricsGeneral { temperature: init_temp, }, memory: init_mem, cpu: init_cpu, disk: init_disk, }) } let mut task_vec = Vec::new(); // launch persistent temp task if cache .read() .await .as_ref() .unwrap() .general .temperature .is_some() { task_vec.push(launch_temp_task(cache, mk_shutdown()).boxed()); } // launch persistent cpu task task_vec.push(launch_cpu_task(cache, proc_stat, mk_shutdown()).boxed()); // launch persistent mem task task_vec.push(launch_mem_task(cache, mk_shutdown()).boxed()); // launch persistent disk task task_vec.push(launch_disk_task(cache, mk_shutdown()).boxed()); futures::future::join_all(task_vec).await; } async fn launch_temp_task( cache: &RwLock>, mut shutdown: Receiver>, ) { loop { match get_temp().await { Ok(a) => { let mut lock = cache.write().await; (*lock).as_mut().unwrap().general.temperature = Some(a) } Err(e) => { tracing::error!("Could not get new temperature: {}", e); tracing::debug!("{:?}", e); } } tokio::select! { _ = shutdown.recv() => return, _ = tokio::time::sleep(tokio::time::Duration::from_secs(4)) => (), } } } async fn launch_cpu_task( cache: &RwLock>, mut init: ProcStat, mut shutdown: Receiver>, ) { loop { // read /proc/stat, diff against previous metrics, compute cpu load match get_cpu_info(&mut init).await { Ok(info) => { let mut lock = cache.write().await; (*lock).as_mut().unwrap().cpu = info; } Err(e) => { tracing::error!("Could not get new CPU Metrics: {}", e); tracing::debug!("{:?}", e); } } tokio::select! { _ = shutdown.recv() => return, _ = tokio::time::sleep(tokio::time::Duration::from_secs(4)) => (), } } } async fn launch_mem_task( cache: &RwLock>, mut shutdown: Receiver>, ) { loop { // read /proc/meminfo match get_mem_info().await { Ok(a) => { let mut lock = cache.write().await; (*lock).as_mut().unwrap().memory = a; } Err(e) => { tracing::error!("Could not get new Memory Metrics: {}", e); tracing::debug!("{:?}", e); } } tokio::select! { _ = shutdown.recv() => return, _ = tokio::time::sleep(tokio::time::Duration::from_secs(4)) => (), } } } async fn launch_disk_task( cache: &RwLock>, mut shutdown: Receiver>, ) { loop { // run df and capture output match get_disk_info().await { Ok(a) => { let mut lock = cache.write().await; (*lock).as_mut().unwrap().disk = a; } Err(e) => { tracing::error!("Could not get new Disk Metrics: {}", e); tracing::debug!("{:?}", e); } } tokio::select! { _ = shutdown.recv() => return, _ = tokio::time::sleep(tokio::time::Duration::from_secs(60)) => (), } } } #[instrument(skip_all)] async fn get_temp() -> Result { let temp = serde_json::from_slice::( &Command::new("sensors") .arg("-j") .invoke(ErrorKind::Filesystem) .await?, ) .with_kind(ErrorKind::Deserialization)? .as_object() .into_iter() .flatten() .flat_map(|(_, v)| v.as_object()) .flatten() .flat_map(|(_, v)| v.as_object()) .flatten() .filter_map(|(k, v)| { if k.ends_with("_input") { v.as_f64() } else { None } }) .reduce(f64::max) .ok_or_else(|| Error::new(eyre!("No temperatures available"), ErrorKind::Filesystem))?; Ok(Celsius(temp)) } #[derive(Debug, Clone)] pub struct ProcStat { user: u64, nice: u64, system: u64, idle: u64, iowait: u64, irq: u64, softirq: u64, // below are only applicable to virtualized environments // steal: u64, // guest: u64, // guest_nice: u64, } impl ProcStat { fn total(&self) -> u64 { self.user + self.nice + self.system + self.idle + self.iowait + self.irq + self.softirq } fn user(&self) -> u64 { self.user + self.nice } fn system(&self) -> u64 { self.system + self.irq + self.softirq } fn used(&self) -> u64 { self.user() + self.system() } } #[instrument(skip_all)] async fn get_proc_stat() -> Result { use tokio::io::AsyncBufReadExt; let mut cpu_line = String::new(); let _n = tokio::io::BufReader::new(tokio::fs::File::open("/proc/stat").await?) .read_line(&mut cpu_line) .await?; let stats: Vec = cpu_line .split_whitespace() .skip(1) .map(|s| { s.parse::().map_err(|e| { Error::new( color_eyre::eyre::eyre!("Invalid /proc/stat column value: {}", e), ErrorKind::ParseSysInfo, ) }) }) .collect::, Error>>()?; if stats.len() < 10 { Err(Error { source: color_eyre::eyre::eyre!( "Columns missing from /proc/stat. Need 10, found {}", stats.len() ), kind: ErrorKind::ParseSysInfo, revision: None, }) } else { Ok(ProcStat { user: stats[0], nice: stats[1], system: stats[2], idle: stats[3], iowait: stats[4], irq: stats[5], softirq: stats[6], }) } } #[instrument(skip_all)] async fn get_cpu_info(last: &mut ProcStat) -> Result { let new = get_proc_stat().await?; let total_old = last.total(); let total_new = new.total(); let total_diff = total_new - total_old; let res = MetricsCpu { user_space: Percentage((new.user() - last.user()) as f64 * 100.0 / total_diff as f64), kernel_space: Percentage((new.system() - last.system()) as f64 * 100.0 / total_diff as f64), idle: Percentage((new.idle - last.idle) as f64 * 100.0 / total_diff as f64), wait: Percentage((new.iowait - last.iowait) as f64 * 100.0 / total_diff as f64), usage: Percentage((new.used() - last.used()) as f64 * 100.0 / total_diff as f64), }; *last = new; Ok(res) } pub struct MemInfo { mem_total: Option, mem_free: Option, mem_available: Option, buffers: Option, cached: Option, slab: Option, swap_total: Option, swap_free: Option, } #[instrument(skip_all)] async fn get_mem_info() -> Result { let contents = tokio::fs::read_to_string("/proc/meminfo").await?; let mut mem_info = MemInfo { mem_total: None, mem_free: None, mem_available: None, buffers: None, cached: None, slab: None, swap_total: None, swap_free: None, }; fn get_num_kb(l: &str) -> Result { let e = Error::new( color_eyre::eyre::eyre!("Invalid meminfo line: {}", l), ErrorKind::ParseSysInfo, ); match l.split_whitespace().skip(1).next() { Some(x) => match x.parse() { Ok(y) => Ok(y), Err(_) => Err(e), }, None => Err(e), } } for entry in contents.lines() { match entry { _ if entry.starts_with("MemTotal") => mem_info.mem_total = Some(get_num_kb(entry)?), _ if entry.starts_with("MemFree") => mem_info.mem_free = Some(get_num_kb(entry)?), _ if entry.starts_with("MemAvailable") => { mem_info.mem_available = Some(get_num_kb(entry)?) } _ if entry.starts_with("Buffers") => mem_info.buffers = Some(get_num_kb(entry)?), _ if entry.starts_with("Cached") => mem_info.cached = Some(get_num_kb(entry)?), _ if entry.starts_with("Slab") => mem_info.slab = Some(get_num_kb(entry)?), _ if entry.starts_with("SwapTotal") => mem_info.swap_total = Some(get_num_kb(entry)?), _ if entry.starts_with("SwapFree") => mem_info.swap_free = Some(get_num_kb(entry)?), _ => (), } } fn ensure_present(a: Option, field: &str) -> Result { a.ok_or(Error::new( color_eyre::eyre::eyre!("{} missing from /proc/meminfo", field), ErrorKind::ParseSysInfo, )) } let mem_total = ensure_present(mem_info.mem_total, "MemTotal")?; let mem_free = ensure_present(mem_info.mem_free, "MemFree")?; let mem_available = ensure_present(mem_info.mem_available, "MemAvailable")?; let buffers = ensure_present(mem_info.buffers, "Buffers")?; let cached = ensure_present(mem_info.cached, "Cached")?; let slab = ensure_present(mem_info.slab, "Slab")?; let swap_total_k = ensure_present(mem_info.swap_total, "SwapTotal")?; let swap_free_k = ensure_present(mem_info.swap_free, "SwapFree")?; let total = MebiBytes(mem_total as f64 / 1024.0); let available = MebiBytes(mem_available as f64 / 1024.0); let used = MebiBytes((mem_total - mem_free - buffers - cached - slab) as f64 / 1024.0); let swap_total = MebiBytes(swap_total_k as f64 / 1024.0); let swap_free = MebiBytes(swap_free_k as f64 / 1024.0); let swap_used = MebiBytes((swap_total_k - swap_free_k) as f64 / 1024.0); let percentage_used = Percentage((total.0 - available.0) / total.0 * 100.0); Ok(MetricsMemory { percentage_used, total, available, used, swap_total, swap_free, swap_used, }) } #[instrument(skip_all)] async fn get_disk_info() -> Result { let package_used_task = get_used("/embassy-data/package-data"); let package_available_task = get_available("/embassy-data/package-data"); let os_used_task = get_used("/embassy-data/main"); let os_available_task = get_available("/embassy-data/main"); let (package_used, package_available, os_used, os_available) = futures::try_join!( package_used_task, package_available_task, os_used_task, os_available_task, )?; let total_used = package_used + os_used; let total_available = package_available + os_available; let total_size = total_used + total_available; let total_percentage = total_used as f64 / total_size as f64 * 100.0f64; Ok(MetricsDisk { size: GigaBytes(total_size as f64 / 1_000_000_000.0), used: GigaBytes(total_used as f64 / 1_000_000_000.0), available: GigaBytes(total_available as f64 / 1_000_000_000.0), used_percentage: Percentage(total_percentage as f64), }) } #[tokio::test] pub async fn test_get_temp() { println!("{}", get_temp().await.unwrap()) } #[tokio::test] pub async fn test_get_proc_stat() { println!("{:?}", get_proc_stat().await.unwrap()) } #[tokio::test] pub async fn test_get_mem_info() { println!("{:?}", get_mem_info().await.unwrap()) } #[tokio::test] pub async fn test_get_disk_usage() { println!("{:?}", get_disk_info().await.unwrap()) }