Update/040 types (#2845)

* small type changes and clear todos

* handle notifications and metrics

* wip

* fixes

* migration

* dedup all urls

* better handling of clearnet ips

* add rfkill dependency

---------

Co-authored-by: Matt Hill <mattnine@protonmail.com>
This commit is contained in:
Aiden McClelland
2025-03-06 20:36:19 -07:00
committed by GitHub
parent ac392dcb96
commit e830fade06
63 changed files with 800 additions and 480 deletions

View File

@@ -1,19 +1,20 @@
use std::collections::BTreeSet;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use chrono::Utc;
use clap::Parser;
use color_eyre::eyre::eyre;
use futures::FutureExt;
use futures::{FutureExt, TryStreamExt};
use imbl::vector;
use imbl_value::InternedString;
use rpc_toolkit::{from_fn_async, Context, Empty, HandlerExt, ParentHandler};
use rustls::RootCertStore;
use rustls_pki_types::CertificateDer;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use tokio::process::Command;
use tokio::sync::broadcast::Receiver;
use tokio::sync::RwLock;
use tracing::instrument;
use ts_rs::TS;
@@ -21,11 +22,13 @@ use crate::context::{CliContext, RpcContext};
use crate::disk::util::{get_available, get_used};
use crate::logs::{LogSource, LogsParams, SYSTEM_UNIT};
use crate::prelude::*;
use crate::rpc_continuations::RpcContinuations;
use crate::rpc_continuations::{Guid, RpcContinuation, RpcContinuations};
use crate::shutdown::Shutdown;
use crate::util::cpupower::{get_available_governors, set_governor, Governor};
use crate::util::io::open_file;
use crate::util::net::WebSocketExt;
use crate::util::serde::{display_serializable, HandlerExtSerde, WithIoFormat};
use crate::util::sync::Watch;
use crate::util::Invoke;
use crate::{MAIN_DATA, PACKAGE_DATA};
@@ -249,7 +252,7 @@ pub struct MetricLeaf<T> {
unit: Option<String>,
}
#[derive(Clone, Debug)]
#[derive(Clone, Copy, Debug, PartialEq, PartialOrd, TS)]
pub struct Celsius(f64);
impl fmt::Display for Celsius {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
@@ -277,7 +280,7 @@ impl<'de> Deserialize<'de> for Celsius {
Ok(Celsius(s.value.parse().map_err(serde::de::Error::custom)?))
}
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq, PartialOrd, TS)]
pub struct Percentage(f64);
impl Serialize for Percentage {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
@@ -303,7 +306,7 @@ impl<'de> Deserialize<'de> for Percentage {
}
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, TS)]
pub struct MebiBytes(pub f64);
impl Serialize for MebiBytes {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
@@ -329,7 +332,7 @@ impl<'de> Deserialize<'de> for MebiBytes {
}
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq, PartialOrd, TS)]
pub struct GigaBytes(f64);
impl Serialize for GigaBytes {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
@@ -355,12 +358,13 @@ impl<'de> Deserialize<'de> for GigaBytes {
}
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[derive(Deserialize, Serialize, Clone, Debug, TS)]
#[serde(rename_all = "camelCase")]
pub struct MetricsGeneral {
pub temperature: Option<Celsius>,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[derive(Deserialize, Serialize, Clone, Debug, TS)]
#[serde(rename_all = "camelCase")]
pub struct MetricsMemory {
pub percentage_used: Percentage,
@@ -371,7 +375,8 @@ pub struct MetricsMemory {
pub zram_available: MebiBytes,
pub zram_used: MebiBytes,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[derive(Deserialize, Serialize, Clone, Debug, TS)]
#[serde(rename_all = "camelCase")]
pub struct MetricsCpu {
percentage_used: Percentage,
@@ -380,7 +385,8 @@ pub struct MetricsCpu {
kernel_space: Percentage,
wait: Percentage,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, TS)]
#[serde(rename_all = "camelCase")]
pub struct MetricsDisk {
percentage_used: Percentage,
@@ -388,8 +394,10 @@ pub struct MetricsDisk {
available: GigaBytes,
capacity: GigaBytes,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[derive(Deserialize, Serialize, Clone, Debug, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export)]
pub struct Metrics {
general: MetricsGeneral,
memory: MetricsMemory,
@@ -398,19 +406,74 @@ pub struct Metrics {
}
// #[command(display(display_serializable))]
pub async fn metrics(ctx: RpcContext, _: Empty) -> Result<Metrics, Error> {
match ctx.metrics_cache.read().await.clone() {
None => Err(Error {
source: color_eyre::eyre::eyre!("No Metrics Found"),
kind: ErrorKind::NotFound,
revision: None,
}),
Some(metrics_val) => Ok(metrics_val),
}
pub async fn metrics(ctx: RpcContext) -> Result<Metrics, Error> {
ctx.metrics_cache.read().or_not_found("No Metrics Found")
}
#[derive(Deserialize, Serialize, Clone, Debug, TS)]
#[serde(rename_all = "camelCase")]
pub struct MetricsFollowResponse {
pub guid: Guid,
pub metrics: Metrics,
}
#[derive(Deserialize, Serialize, Parser, TS)]
#[serde(rename_all = "camelCase")]
#[command(rename_all = "kebab-case")]
pub struct MetricsFollowParams {
#[ts(skip)]
#[serde(rename = "__auth_session")] // from Auth middleware
session: Option<InternedString>,
}
pub async fn metrics_follow(
ctx: RpcContext,
MetricsFollowParams { session }: MetricsFollowParams,
) -> Result<MetricsFollowResponse, Error> {
let mut local_cache = ctx.metrics_cache.clone();
let metrics = local_cache
.peek_and_mark_seen(|m| m.clone())
.or_not_found("No Metrics Found")?;
let guid = Guid::new();
ctx.rpc_continuations
.add(
guid.clone(),
RpcContinuation::ws_authed(
ctx.clone(),
session,
|mut ws| async move {
let res = async {
loop {
use axum::extract::ws::Message;
tokio::select! {
_ = local_cache.changed() => {
ws.send(Message::Text(
local_cache
.peek(|m| serde_json::to_string(&m))
.with_kind(ErrorKind::Serialization)?
)).await.with_kind(ErrorKind::Network)?;
}
msg = ws.try_next() => {
if msg.with_kind(crate::ErrorKind::Network)?.is_none() {
break;
}
}
}
}
Ok::<_, Error>("complete")
}
.await;
ws.close_result(res).await.log_err();
},
Duration::from_secs(30),
),
)
.await;
Ok(MetricsFollowResponse { guid, metrics })
}
pub async fn launch_metrics_task<F: FnMut() -> Receiver<Option<Shutdown>>>(
cache: &RwLock<Option<Metrics>>,
cache: &Watch<Option<Metrics>>,
mut mk_shutdown: F,
) {
// fetch init temp
@@ -475,31 +538,21 @@ pub async fn launch_metrics_task<F: FnMut() -> Receiver<Option<Shutdown>>>(
}
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
{
// lock for writing
let mut guard = cache.write().await;
// write
*guard = Some(Metrics {
general: MetricsGeneral {
temperature: init_temp,
},
memory: init_mem,
cpu: init_cpu,
disk: init_disk,
})
}
let should_launch_temp_task = init_temp.is_some();
cache.send(Some(Metrics {
general: MetricsGeneral {
temperature: init_temp,
},
memory: init_mem,
cpu: init_cpu,
disk: init_disk,
}));
let mut task_vec = Vec::new();
// launch persistent temp task
if cache
.read()
.await
.as_ref()
.unwrap()
.general
.temperature
.is_some()
{
if should_launch_temp_task {
task_vec.push(launch_temp_task(cache, mk_shutdown()).boxed());
}
// launch persistent cpu task
@@ -513,14 +566,15 @@ pub async fn launch_metrics_task<F: FnMut() -> Receiver<Option<Shutdown>>>(
}
async fn launch_temp_task(
cache: &RwLock<Option<Metrics>>,
cache: &Watch<Option<Metrics>>,
mut shutdown: Receiver<Option<Shutdown>>,
) {
loop {
match get_temp().await {
Ok(a) => {
let mut lock = cache.write().await;
(*lock).as_mut().unwrap().general.temperature = Some(a)
cache.send_if_modified(|c| {
c.as_mut().unwrap().general.temperature.replace(a) != Some(a)
});
}
Err(e) => {
tracing::error!("Could not get new temperature: {}", e);
@@ -535,7 +589,7 @@ async fn launch_temp_task(
}
async fn launch_cpu_task(
cache: &RwLock<Option<Metrics>>,
cache: &Watch<Option<Metrics>>,
mut init: ProcStat,
mut shutdown: Receiver<Option<Shutdown>>,
) {
@@ -543,8 +597,7 @@ async fn launch_cpu_task(
// read /proc/stat, diff against previous metrics, compute cpu load
match get_cpu_info(&mut init).await {
Ok(info) => {
let mut lock = cache.write().await;
(*lock).as_mut().unwrap().cpu = info;
cache.send_modify(|c| c.as_mut().unwrap().cpu = info);
}
Err(e) => {
tracing::error!("Could not get new CPU Metrics: {}", e);
@@ -558,16 +611,12 @@ async fn launch_cpu_task(
}
}
async fn launch_mem_task(
cache: &RwLock<Option<Metrics>>,
mut shutdown: Receiver<Option<Shutdown>>,
) {
async fn launch_mem_task(cache: &Watch<Option<Metrics>>, mut shutdown: Receiver<Option<Shutdown>>) {
loop {
// read /proc/meminfo
match get_mem_info().await {
Ok(a) => {
let mut lock = cache.write().await;
(*lock).as_mut().unwrap().memory = a;
cache.send_modify(|c| c.as_mut().unwrap().memory = a);
}
Err(e) => {
tracing::error!("Could not get new Memory Metrics: {}", e);
@@ -581,15 +630,22 @@ async fn launch_mem_task(
}
}
async fn launch_disk_task(
cache: &RwLock<Option<Metrics>>,
cache: &Watch<Option<Metrics>>,
mut shutdown: Receiver<Option<Shutdown>>,
) {
loop {
// run df and capture output
match get_disk_info().await {
Ok(a) => {
let mut lock = cache.write().await;
(*lock).as_mut().unwrap().disk = a;
cache.send_if_modified(|c| {
let c = c.as_mut().unwrap();
if c.disk != a {
c.disk = a;
true
} else {
false
}
});
}
Err(e) => {
tracing::error!("Could not get new Disk Metrics: {}", e);