From c552fdfc0f9d98cc47b39b9d7d22d9a585a88980 Mon Sep 17 00:00:00 2001 From: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com> Date: Tue, 27 Aug 2024 17:11:37 -0600 Subject: [PATCH] fixes #2651 (#2729) --- core/startos/src/context/rpc.rs | 103 ++++++++++++++++++++++++++++---- 1 file changed, 90 insertions(+), 13 deletions(-) diff --git a/core/startos/src/context/rpc.rs b/core/startos/src/context/rpc.rs index 0db681d3b..5330c58bc 100644 --- a/core/startos/src/context/rpc.rs +++ b/core/startos/src/context/rpc.rs @@ -1,4 +1,5 @@ use std::collections::BTreeMap; +use std::future::Future; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::ops::Deref; use std::path::PathBuf; @@ -6,6 +7,8 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; +use chrono::{TimeDelta, Utc}; +use helpers::NonDetachingJoinHandle; use imbl_value::InternedString; use josekit::jwk::Jwk; use reqwest::{Client, Proxy}; @@ -29,7 +32,7 @@ use crate::net::utils::{find_eth_iface, find_wifi_iface}; use crate::net::wifi::WpaCli; use crate::prelude::*; use crate::progress::{FullProgressTracker, PhaseProgressTrackerHandle}; -use crate::rpc_continuations::{OpenAuthedContinuations, RpcContinuations}; +use crate::rpc_continuations::{Guid, OpenAuthedContinuations, RpcContinuations}; use crate::service::effects::callbacks::ServiceCallbacks; use crate::service::ServiceMap; use crate::shutdown::Shutdown; @@ -63,6 +66,7 @@ pub struct RpcContextSeed { pub client: Client, pub hardware: Hardware, pub start_time: Instant, + pub crons: SyncMutex>>, #[cfg(feature = "dev")] pub dev: Dev, } @@ -94,12 +98,14 @@ impl InitRpcContextPhases { } pub struct CleanupInitPhases { + cleanup_sessions: PhaseProgressTrackerHandle, init_services: PhaseProgressTrackerHandle, check_dependencies: PhaseProgressTrackerHandle, } impl CleanupInitPhases { pub fn new(handle: &FullProgressTracker) -> Self { Self { + cleanup_sessions: handle.add_phase("Cleaning up sessions".into(), Some(1)), init_services: handle.add_phase("Initializing services".into(), Some(10)), check_dependencies: handle.add_phase("Checking dependencies".into(), Some(1)), } @@ -174,6 +180,8 @@ impl RpcContext { let ram = get_mem_info().await?.total.0 as u64 * 1024 * 1024; read_device_info.complete(); + let crons = SyncMutex::new(BTreeMap::new()); + if !db .peek() .await @@ -183,18 +191,24 @@ impl RpcContext { .de()? { let db = db.clone(); - tokio::spawn(async move { - while !check_time_is_synchronized().await.unwrap() { - tokio::time::sleep(Duration::from_secs(30)).await; - } - db.mutate(|v| { - v.as_public_mut() - .as_server_info_mut() - .as_ntp_synced_mut() - .ser(&true) - }) - .await - .unwrap() + crons.mutate(|c| { + c.insert( + Guid::new(), + tokio::spawn(async move { + while !check_time_is_synchronized().await.unwrap() { + tokio::time::sleep(Duration::from_secs(30)).await; + } + db.mutate(|v| { + v.as_public_mut() + .as_server_info_mut() + .as_ntp_synced_mut() + .ser(&true) + }) + .await + .unwrap() + }) + .into(), + ) }); } @@ -259,6 +273,7 @@ impl RpcContext { .with_kind(crate::ErrorKind::ParseUrl)?, hardware: Hardware { devices, ram }, start_time: Instant::now(), + crons, #[cfg(feature = "dev")] dev: Dev { lxc: Mutex::new(BTreeMap::new()), @@ -273,6 +288,7 @@ impl RpcContext { #[instrument(skip_all)] pub async fn shutdown(self) -> Result<(), Error> { + self.crons.mutate(|c| std::mem::take(c)); self.services.shutdown_all().await?; self.is_closed.store(true, Ordering::SeqCst); tracing::info!("RPC Context is shutdown"); @@ -280,14 +296,75 @@ impl RpcContext { Ok(()) } + pub fn add_cron + Send + 'static>(&self, fut: F) -> Guid { + let guid = Guid::new(); + self.crons + .mutate(|c| c.insert(guid.clone(), tokio::spawn(fut).into())); + guid + } + #[instrument(skip_all)] pub async fn cleanup_and_initialize( &self, CleanupInitPhases { + mut cleanup_sessions, init_services, mut check_dependencies, }: CleanupInitPhases, ) -> Result<(), Error> { + cleanup_sessions.start(); + self.db + .mutate(|db| { + if db.as_public().as_server_info().as_ntp_synced().de()? { + for id in db.as_private().as_sessions().keys()? { + if Utc::now() + - db.as_private() + .as_sessions() + .as_idx(&id) + .unwrap() + .de()? + .last_active + > TimeDelta::days(30) + { + db.as_private_mut().as_sessions_mut().remove(&id)?; + } + } + } + Ok(()) + }) + .await?; + let db = self.db.clone(); + self.add_cron(async move { + loop { + tokio::time::sleep(Duration::from_secs(86400)).await; + if let Err(e) = db + .mutate(|db| { + if db.as_public().as_server_info().as_ntp_synced().de()? { + for id in db.as_private().as_sessions().keys()? { + if Utc::now() + - db.as_private() + .as_sessions() + .as_idx(&id) + .unwrap() + .de()? + .last_active + > TimeDelta::days(30) + { + db.as_private_mut().as_sessions_mut().remove(&id)?; + } + } + } + Ok(()) + }) + .await + { + tracing::error!("Error in session cleanup cron: {e}"); + tracing::debug!("{e:?}"); + } + } + }); + cleanup_sessions.complete(); + self.services.init(&self, init_services).await?; tracing::info!("Initialized Package Managers");