This commit is contained in:
Aiden McClelland
2024-08-27 17:11:37 -06:00
committed by GitHub
parent 4006dba9f1
commit c552fdfc0f

View File

@@ -1,4 +1,5 @@
use std::collections::BTreeMap;
use std::future::Future;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::ops::Deref;
use std::path::PathBuf;
@@ -6,6 +7,8 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use chrono::{TimeDelta, Utc};
use helpers::NonDetachingJoinHandle;
use imbl_value::InternedString;
use josekit::jwk::Jwk;
use reqwest::{Client, Proxy};
@@ -29,7 +32,7 @@ use crate::net::utils::{find_eth_iface, find_wifi_iface};
use crate::net::wifi::WpaCli;
use crate::prelude::*;
use crate::progress::{FullProgressTracker, PhaseProgressTrackerHandle};
use crate::rpc_continuations::{OpenAuthedContinuations, RpcContinuations};
use crate::rpc_continuations::{Guid, OpenAuthedContinuations, RpcContinuations};
use crate::service::effects::callbacks::ServiceCallbacks;
use crate::service::ServiceMap;
use crate::shutdown::Shutdown;
@@ -63,6 +66,7 @@ pub struct RpcContextSeed {
pub client: Client,
pub hardware: Hardware,
pub start_time: Instant,
pub crons: SyncMutex<BTreeMap<Guid, NonDetachingJoinHandle<()>>>,
#[cfg(feature = "dev")]
pub dev: Dev,
}
@@ -94,12 +98,14 @@ impl InitRpcContextPhases {
}
pub struct CleanupInitPhases {
cleanup_sessions: PhaseProgressTrackerHandle,
init_services: PhaseProgressTrackerHandle,
check_dependencies: PhaseProgressTrackerHandle,
}
impl CleanupInitPhases {
pub fn new(handle: &FullProgressTracker) -> Self {
Self {
cleanup_sessions: handle.add_phase("Cleaning up sessions".into(), Some(1)),
init_services: handle.add_phase("Initializing services".into(), Some(10)),
check_dependencies: handle.add_phase("Checking dependencies".into(), Some(1)),
}
@@ -174,6 +180,8 @@ impl RpcContext {
let ram = get_mem_info().await?.total.0 as u64 * 1024 * 1024;
read_device_info.complete();
let crons = SyncMutex::new(BTreeMap::new());
if !db
.peek()
.await
@@ -183,18 +191,24 @@ impl RpcContext {
.de()?
{
let db = db.clone();
tokio::spawn(async move {
while !check_time_is_synchronized().await.unwrap() {
tokio::time::sleep(Duration::from_secs(30)).await;
}
db.mutate(|v| {
v.as_public_mut()
.as_server_info_mut()
.as_ntp_synced_mut()
.ser(&true)
})
.await
.unwrap()
crons.mutate(|c| {
c.insert(
Guid::new(),
tokio::spawn(async move {
while !check_time_is_synchronized().await.unwrap() {
tokio::time::sleep(Duration::from_secs(30)).await;
}
db.mutate(|v| {
v.as_public_mut()
.as_server_info_mut()
.as_ntp_synced_mut()
.ser(&true)
})
.await
.unwrap()
})
.into(),
)
});
}
@@ -259,6 +273,7 @@ impl RpcContext {
.with_kind(crate::ErrorKind::ParseUrl)?,
hardware: Hardware { devices, ram },
start_time: Instant::now(),
crons,
#[cfg(feature = "dev")]
dev: Dev {
lxc: Mutex::new(BTreeMap::new()),
@@ -273,6 +288,7 @@ impl RpcContext {
#[instrument(skip_all)]
pub async fn shutdown(self) -> Result<(), Error> {
self.crons.mutate(|c| std::mem::take(c));
self.services.shutdown_all().await?;
self.is_closed.store(true, Ordering::SeqCst);
tracing::info!("RPC Context is shutdown");
@@ -280,14 +296,75 @@ impl RpcContext {
Ok(())
}
pub fn add_cron<F: Future<Output = ()> + Send + 'static>(&self, fut: F) -> Guid {
let guid = Guid::new();
self.crons
.mutate(|c| c.insert(guid.clone(), tokio::spawn(fut).into()));
guid
}
#[instrument(skip_all)]
pub async fn cleanup_and_initialize(
&self,
CleanupInitPhases {
mut cleanup_sessions,
init_services,
mut check_dependencies,
}: CleanupInitPhases,
) -> Result<(), Error> {
cleanup_sessions.start();
self.db
.mutate(|db| {
if db.as_public().as_server_info().as_ntp_synced().de()? {
for id in db.as_private().as_sessions().keys()? {
if Utc::now()
- db.as_private()
.as_sessions()
.as_idx(&id)
.unwrap()
.de()?
.last_active
> TimeDelta::days(30)
{
db.as_private_mut().as_sessions_mut().remove(&id)?;
}
}
}
Ok(())
})
.await?;
let db = self.db.clone();
self.add_cron(async move {
loop {
tokio::time::sleep(Duration::from_secs(86400)).await;
if let Err(e) = db
.mutate(|db| {
if db.as_public().as_server_info().as_ntp_synced().de()? {
for id in db.as_private().as_sessions().keys()? {
if Utc::now()
- db.as_private()
.as_sessions()
.as_idx(&id)
.unwrap()
.de()?
.last_active
> TimeDelta::days(30)
{
db.as_private_mut().as_sessions_mut().remove(&id)?;
}
}
}
Ok(())
})
.await
{
tracing::error!("Error in session cleanup cron: {e}");
tracing::debug!("{e:?}");
}
}
});
cleanup_sessions.complete();
self.services.init(&self, init_services).await?;
tracing::info!("Initialized Package Managers");