// Mirror of https://github.com/Start9Labs/start-os.git (last synced 2026-03-26 02:11:53 +00:00).
* fix live-build resolv.conf * improved debuggability * wip: start-tunnel * fixes for trixie and tor * non-free-firmware on trixie * wip * web server WIP * wip: tls refactor * FE patchdb, mocks, and most endpoints * fix editing records and patch mocks * refactor complete * finish api * build and formatter update * minor change toi viewing addresses and fix build * fixes * more providers * endpoint for getting config * fix tests * api fixes * wip: separate port forward controller into parts * simplify iptables rules * bump sdk * misc fixes * predict next subnet and ip, use wan ips, and form validation * refactor: break big components apart and address todos (#3043) * refactor: break big components apart and address todos * starttunnel readme, fix pf mocks, fix adding tor domain in startos --------- Co-authored-by: Matt Hill <mattnine@protonmail.com> * better tui * tui tweaks * fix: address comments * better regex for subnet * fixes * better validation * handle rpc errors * build fixes * fix: address comments (#3044) * fix: address comments * fix unread notification mocks * fix row click for notification --------- Co-authored-by: Matt Hill <mattnine@protonmail.com> * fix raspi build * fix build * fix build * fix build * fix build * try to fix build * fix tests * fix tests * fix rsync tests * delete useless effectful test --------- Co-authored-by: Matt Hill <mattnine@protonmail.com> Co-authored-by: Alex Inkin <alexander@inkin.ru>
// File stats: 533 lines, 19 KiB, Rust.
use std::collections::{BTreeMap, BTreeSet};
|
|
use std::ffi::OsStr;
|
|
use std::future::Future;
|
|
use std::ops::Deref;
|
|
use std::path::{Path, PathBuf};
|
|
use std::sync::Arc;
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
|
use std::time::Duration;
|
|
|
|
use chrono::{TimeDelta, Utc};
|
|
use helpers::NonDetachingJoinHandle;
|
|
use imbl::OrdMap;
|
|
use imbl_value::InternedString;
|
|
use itertools::Itertools;
|
|
use josekit::jwk::Jwk;
|
|
use models::{ActionId, PackageId};
|
|
use reqwest::{Client, Proxy};
|
|
use rpc_toolkit::yajrc::RpcError;
|
|
use rpc_toolkit::{CallRemote, Context, Empty};
|
|
use tokio::sync::{RwLock, broadcast, oneshot, watch};
|
|
use tokio::time::Instant;
|
|
use tracing::instrument;
|
|
|
|
use super::setup::CURRENT_SECRET;
|
|
use crate::DATA_DIR;
|
|
use crate::account::AccountInfo;
|
|
use crate::auth::Sessions;
|
|
use crate::context::config::ServerConfig;
|
|
use crate::db::model::Database;
|
|
use crate::db::model::package::TaskSeverity;
|
|
use crate::disk::OsPartitionInfo;
|
|
use crate::init::{InitResult, check_time_is_synchronized};
|
|
use crate::install::PKG_ARCHIVE_DIR;
|
|
use crate::lxc::LxcManager;
|
|
use crate::net::gateway::UpgradableListener;
|
|
use crate::net::net_controller::{NetController, NetService};
|
|
use crate::net::socks::DEFAULT_SOCKS_LISTEN;
|
|
use crate::net::utils::{find_eth_iface, find_wifi_iface};
|
|
use crate::net::web_server::WebServerAcceptorSetter;
|
|
use crate::net::wifi::WpaCli;
|
|
use crate::prelude::*;
|
|
use crate::progress::{FullProgressTracker, PhaseProgressTrackerHandle};
|
|
use crate::rpc_continuations::{Guid, OpenAuthedContinuations, RpcContinuations};
|
|
use crate::service::ServiceMap;
|
|
use crate::service::action::update_tasks;
|
|
use crate::service::effects::callbacks::ServiceCallbacks;
|
|
use crate::shutdown::Shutdown;
|
|
use crate::util::io::delete_file;
|
|
use crate::util::lshw::LshwDevice;
|
|
use crate::util::sync::{SyncMutex, SyncRwLock, Watch};
|
|
|
|
/// Shared state backing every [`RpcContext`] handle.
///
/// Built once in `RpcContext::init` and held behind an `Arc`; `RpcContext`
/// exposes these fields through its `Deref` impl.
pub struct RpcContextSeed {
    // Set by `RpcContext::shutdown`; checked under the "unstable" feature to
    // catch use-after-shutdown (see the `Deref` and `Drop` impls).
    is_closed: AtomicBool,
    pub os_partitions: OsPartitionInfo,
    // Wifi interface name, if one was detected at startup; `None` otherwise.
    pub wifi_interface: Option<String>,
    pub ethernet_interface: String,
    pub disk_guid: Arc<String>,
    // Sessions that live only for this process (kept out of the persistent db).
    pub ephemeral_sessions: SyncMutex<Sessions>,
    pub db: TypedPatchDb<Database>,
    // Broadcasts the db sequence number so listeners can track revisions.
    pub sync_db: watch::Sender<u64>,
    pub account: SyncRwLock<AccountInfo>,
    pub net_controller: Arc<NetController>,
    pub os_net_service: NetService,
    // `Some(arch)` restricts s9pks to this architecture; `None` when the
    // `multi_arch_s9pks` config flag allows any architecture.
    pub s9pk_arch: Option<&'static str>,
    pub services: ServiceMap,
    // Per-package channels used to cancel in-flight installs.
    pub cancellable_installs: SyncMutex<BTreeMap<PackageId, oneshot::Sender<()>>>,
    pub metrics_cache: Watch<Option<crate::system::Metrics>>,
    pub shutdown: broadcast::Sender<Option<Shutdown>>,
    pub lxc_manager: Arc<LxcManager>,
    pub open_authed_continuations: OpenAuthedContinuations<Option<InternedString>>,
    pub rpc_continuations: RpcContinuations,
    pub callbacks: Arc<ServiceCallbacks>,
    // Inner Option is `None` on devices without a wifi interface.
    pub wifi_manager: Arc<RwLock<Option<WpaCli>>>,
    // Ephemeral P-256 EC key generated in `init`. NOTE(review): the
    // `AsRef<Jwk>` impl returns the setup-time `CURRENT_SECRET` instead of
    // this field — confirm which one callers are meant to use.
    pub current_secret: Arc<Jwk>,
    // HTTP client routed through the SOCKS proxy configured at init.
    pub client: Client,
    pub start_time: Instant,
    // Background jobs owned by this context; presumably cancelled when the
    // handles are dropped in `shutdown` (NonDetachingJoinHandle) — TODO confirm.
    pub crons: SyncMutex<BTreeMap<Guid, NonDetachingJoinHandle<()>>>,
}
|
|
impl Drop for RpcContextSeed {
    fn drop(&mut self) {
        // The seed lives behind an Arc, so this runs only when the last
        // RpcContext clone is dropped.
        tracing::info!("RpcContext is dropped");
    }
}
|
|
|
|
/// Snapshot of the host's hardware.
pub struct Hardware {
    // Devices as reported by lshw.
    pub devices: Vec<LshwDevice>,
    // Total RAM — presumably in bytes; not derivable from this file, confirm.
    pub ram: u64,
}
|
|
|
|
/// Progress-tracker handles for each phase of `RpcContext::init`.
pub struct InitRpcContextPhases {
    load_db: PhaseProgressTrackerHandle,
    init_net_ctrl: PhaseProgressTrackerHandle,
    // Sub-phases consumed by `RpcContext::cleanup_and_initialize`.
    cleanup_init: CleanupInitPhases,
    run_migrations: PhaseProgressTrackerHandle,
}
|
|
impl InitRpcContextPhases {
    /// Registers every init phase on `handle`.
    ///
    /// The `Some(n)` argument to `add_phase` is presumably a relative weight
    /// for progress reporting — TODO confirm against FullProgressTracker.
    pub fn new(handle: &FullProgressTracker) -> Self {
        Self {
            load_db: handle.add_phase("Loading database".into(), Some(5)),
            init_net_ctrl: handle.add_phase("Initializing network".into(), Some(1)),
            cleanup_init: CleanupInitPhases::new(handle),
            run_migrations: handle.add_phase("Running migrations".into(), Some(10)),
        }
    }
}
|
|
|
|
/// Progress-tracker handles for each phase of
/// `RpcContext::cleanup_and_initialize`.
pub struct CleanupInitPhases {
    cleanup_sessions: PhaseProgressTrackerHandle,
    init_services: PhaseProgressTrackerHandle,
    prune_s9pks: PhaseProgressTrackerHandle,
    check_tasks: PhaseProgressTrackerHandle,
}
|
|
impl CleanupInitPhases {
    /// Registers the cleanup/init phases on `handle`.
    ///
    /// The `Some(n)` argument to `add_phase` is presumably a relative weight
    /// for progress reporting — TODO confirm against FullProgressTracker.
    pub fn new(handle: &FullProgressTracker) -> Self {
        Self {
            cleanup_sessions: handle.add_phase("Cleaning up sessions".into(), Some(1)),
            init_services: handle.add_phase("Initializing services".into(), Some(10)),
            prune_s9pks: handle.add_phase("Pruning S9PKs".into(), Some(1)),
            check_tasks: handle.add_phase("Checking action requests".into(), Some(1)),
        }
    }
}
|
|
|
|
/// Cheaply cloneable handle to the server's shared state ([`RpcContextSeed`]).
///
/// Cloning bumps an `Arc` refcount; all clones share the same seed.
#[derive(Clone)]
pub struct RpcContext(Arc<RpcContextSeed>);
|
|
impl RpcContext {
|
|
#[instrument(skip_all)]
|
|
pub async fn init(
|
|
webserver: &WebServerAcceptorSetter<UpgradableListener>,
|
|
config: &ServerConfig,
|
|
disk_guid: Arc<String>,
|
|
init_result: Option<InitResult>,
|
|
InitRpcContextPhases {
|
|
mut load_db,
|
|
mut init_net_ctrl,
|
|
cleanup_init,
|
|
run_migrations,
|
|
}: InitRpcContextPhases,
|
|
) -> Result<Self, Error> {
|
|
let socks_proxy = config.socks_listen.unwrap_or(DEFAULT_SOCKS_LISTEN);
|
|
let (shutdown, _) = tokio::sync::broadcast::channel(1);
|
|
|
|
load_db.start();
|
|
let db = if let Some(InitResult { net_ctrl, .. }) = &init_result {
|
|
net_ctrl.db.clone()
|
|
} else {
|
|
TypedPatchDb::<Database>::load(config.db().await?).await?
|
|
};
|
|
let peek = db.peek().await;
|
|
let account = AccountInfo::load(&peek)?;
|
|
load_db.complete();
|
|
tracing::info!("Opened PatchDB");
|
|
|
|
init_net_ctrl.start();
|
|
let (net_controller, os_net_service) = if let Some(InitResult {
|
|
net_ctrl,
|
|
os_net_service,
|
|
}) = init_result
|
|
{
|
|
(net_ctrl, os_net_service)
|
|
} else {
|
|
let net_ctrl =
|
|
Arc::new(NetController::init(db.clone(), &account.hostname, socks_proxy).await?);
|
|
webserver.try_upgrade(|a| net_ctrl.net_iface.watcher.upgrade_listener(a))?;
|
|
let os_net_service = net_ctrl.os_bindings().await?;
|
|
(net_ctrl, os_net_service)
|
|
};
|
|
init_net_ctrl.complete();
|
|
tracing::info!("Initialized Net Controller");
|
|
|
|
let services = ServiceMap::default();
|
|
let metrics_cache = Watch::<Option<crate::system::Metrics>>::new(None);
|
|
let socks_proxy_url = format!("socks5h://{socks_proxy}");
|
|
|
|
let crons = SyncMutex::new(BTreeMap::new());
|
|
|
|
if !db
|
|
.peek()
|
|
.await
|
|
.as_public()
|
|
.as_server_info()
|
|
.as_ntp_synced()
|
|
.de()?
|
|
{
|
|
let db = db.clone();
|
|
crons.mutate(|c| {
|
|
c.insert(
|
|
Guid::new(),
|
|
tokio::spawn(async move {
|
|
while !check_time_is_synchronized().await.unwrap() {
|
|
tokio::time::sleep(Duration::from_secs(30)).await;
|
|
}
|
|
db.mutate(|v| {
|
|
v.as_public_mut()
|
|
.as_server_info_mut()
|
|
.as_ntp_synced_mut()
|
|
.ser(&true)
|
|
})
|
|
.await
|
|
.result
|
|
.log_err();
|
|
})
|
|
.into(),
|
|
)
|
|
});
|
|
}
|
|
|
|
let wifi_interface = find_wifi_iface().await?;
|
|
|
|
let seed = Arc::new(RpcContextSeed {
|
|
is_closed: AtomicBool::new(false),
|
|
os_partitions: config.os_partitions.clone().ok_or_else(|| {
|
|
Error::new(
|
|
eyre!("OS Partition Information Missing"),
|
|
ErrorKind::Filesystem,
|
|
)
|
|
})?,
|
|
wifi_interface: wifi_interface.clone(),
|
|
ethernet_interface: if let Some(eth) = config.ethernet_interface.clone() {
|
|
eth
|
|
} else {
|
|
find_eth_iface().await?
|
|
},
|
|
disk_guid,
|
|
ephemeral_sessions: SyncMutex::new(Sessions::new()),
|
|
sync_db: watch::Sender::new(db.sequence().await),
|
|
db,
|
|
account: SyncRwLock::new(account),
|
|
callbacks: net_controller.callbacks.clone(),
|
|
net_controller,
|
|
os_net_service,
|
|
s9pk_arch: if config.multi_arch_s9pks.unwrap_or(false) {
|
|
None
|
|
} else {
|
|
Some(crate::ARCH)
|
|
},
|
|
services,
|
|
cancellable_installs: SyncMutex::new(BTreeMap::new()),
|
|
metrics_cache,
|
|
shutdown,
|
|
lxc_manager: Arc::new(LxcManager::new()),
|
|
open_authed_continuations: OpenAuthedContinuations::new(),
|
|
rpc_continuations: RpcContinuations::new(),
|
|
wifi_manager: Arc::new(RwLock::new(wifi_interface.clone().map(|i| WpaCli::init(i)))),
|
|
current_secret: Arc::new(
|
|
Jwk::generate_ec_key(josekit::jwk::alg::ec::EcCurve::P256).map_err(|e| {
|
|
tracing::debug!("{:?}", e);
|
|
tracing::error!("Couldn't generate ec key");
|
|
Error::new(
|
|
color_eyre::eyre::eyre!("Couldn't generate ec key"),
|
|
crate::ErrorKind::Unknown,
|
|
)
|
|
})?,
|
|
),
|
|
client: Client::builder()
|
|
.proxy(Proxy::all(socks_proxy_url)?)
|
|
.build()
|
|
.with_kind(crate::ErrorKind::ParseUrl)?,
|
|
start_time: Instant::now(),
|
|
crons,
|
|
});
|
|
|
|
let res = Self(seed.clone());
|
|
res.cleanup_and_initialize(cleanup_init).await?;
|
|
tracing::info!("Cleaned up transient states");
|
|
|
|
crate::version::post_init(&res, run_migrations).await?;
|
|
tracing::info!("Completed migrations");
|
|
Ok(res)
|
|
}
|
|
|
|
#[instrument(skip_all)]
/// Shuts the context down: drops all cron jobs, stops all services, and
/// marks the seed closed.
///
/// After this, any further `Deref` of a surviving clone panics when the
/// "unstable" feature is enabled.
pub async fn shutdown(self) -> Result<(), Error> {
    // Taking the map drops every NonDetachingJoinHandle, which presumably
    // cancels the background tasks — TODO confirm its drop semantics.
    self.crons.mutate(|c| std::mem::take(c));
    self.services.shutdown_all().await?;
    self.is_closed.store(true, Ordering::SeqCst);
    tracing::info!("RpcContext is shutdown");
    Ok(())
}
|
|
|
|
pub fn add_cron<F: Future<Output = ()> + Send + 'static>(&self, fut: F) -> Guid {
|
|
let guid = Guid::new();
|
|
self.crons
|
|
.mutate(|c| c.insert(guid.clone(), tokio::spawn(fut).into()));
|
|
guid
|
|
}
|
|
|
|
#[instrument(skip_all)]
/// Cleans up transient state and finishes bringing services online.
///
/// Runs four phases, reporting progress on the provided handles:
/// 1. prune persisted sessions idle > 30 days (and install a daily cron
///    that repeats the pruning),
/// 2. initialize all installed services,
/// 3. delete orphaned .s9pk files not referenced by the db,
/// 4. refresh task/action-request state and stop any package with an
///    active critical task.
pub async fn cleanup_and_initialize(
    &self,
    CleanupInitPhases {
        mut cleanup_sessions,
        mut init_services,
        mut prune_s9pks,
        mut check_tasks,
    }: CleanupInitPhases,
) -> Result<(), Error> {
    cleanup_sessions.start();
    // Only prune when NTP is synced: otherwise the wall clock (and hence
    // the `last_active` age comparison) can't be trusted.
    self.db
        .mutate(|db| {
            if db.as_public().as_server_info().as_ntp_synced().de()? {
                for id in db.as_private().as_sessions().keys()? {
                    if Utc::now()
                        - db.as_private()
                            .as_sessions()
                            .as_idx(&id)
                            .unwrap()
                            .de()?
                            .last_active
                        > TimeDelta::days(30)
                    {
                        db.as_private_mut().as_sessions_mut().remove(&id)?;
                    }
                }
            }
            Ok(())
        })
        .await
        .result?;
    let db = self.db.clone();
    // Repeat the same pruning once a day for the life of the process.
    // NOTE(review): the closure below duplicates the mutate block above
    // verbatim — consider extracting a shared helper.
    self.add_cron(async move {
        loop {
            // 86400 s = 24 h.
            tokio::time::sleep(Duration::from_secs(86400)).await;
            if let Err(e) = db
                .mutate(|db| {
                    if db.as_public().as_server_info().as_ntp_synced().de()? {
                        for id in db.as_private().as_sessions().keys()? {
                            if Utc::now()
                                - db.as_private()
                                    .as_sessions()
                                    .as_idx(&id)
                                    .unwrap()
                                    .de()?
                                    .last_active
                                > TimeDelta::days(30)
                            {
                                db.as_private_mut().as_sessions_mut().remove(&id)?;
                            }
                        }
                    }
                    Ok(())
                })
                .await
                .result
            {
                tracing::error!("Error in session cleanup cron: {e}");
                tracing::debug!("{e:?}");
            }
        }
    });
    cleanup_sessions.complete();

    init_services.start();
    self.services.init(&self).await?;
    init_services.complete();

    prune_s9pks.start();
    let peek = self.db.peek().await;
    // Every s9pk path still referenced by an installed package.
    let keep = peek
        .as_public()
        .as_package_data()
        .as_entries()?
        .into_iter()
        .map(|(_, pde)| pde.as_s9pk().de())
        .collect::<Result<BTreeSet<PathBuf>, Error>>()?;
    let installed_dir = &Path::new(DATA_DIR).join(PKG_ARCHIVE_DIR).join("installed");
    // The directory may not exist yet (e.g. nothing installed); skip quietly.
    if tokio::fs::metadata(&installed_dir).await.is_ok() {
        let mut dir = tokio::fs::read_dir(&installed_dir)
            .await
            .with_ctx(|_| (ErrorKind::Filesystem, lazy_format!("dir {installed_dir:?}")))?;
        while let Some(file) = dir
            .next_entry()
            .await
            .with_ctx(|_| (ErrorKind::Filesystem, lazy_format!("dir {installed_dir:?}")))?
        {
            let path = file.path();
            // Delete .s9pk files no longer referenced by any package.
            if path.extension() == Some(OsStr::new("s9pk")) && !keep.contains(&path) {
                delete_file(path).await?;
            }
        }
    }
    prune_s9pks.complete();

    check_tasks.start();
    // Fresh action input, gathered per (package, action) pair below.
    let mut action_input: OrdMap<PackageId, BTreeMap<ActionId, Value>> = OrdMap::new();
    // Collect the unique (package_id, action_id) pairs across all packages'
    // task lists.
    let tasks: BTreeSet<_> = peek
        .as_public()
        .as_package_data()
        .as_entries()?
        .into_iter()
        .map(|(_, pde)| {
            Ok(pde.as_tasks().as_entries()?.into_iter().map(|(_, r)| {
                Ok::<_, Error>((
                    r.as_task().as_package_id().de()?,
                    r.as_task().as_action_id().de()?,
                ))
            }))
        })
        .flatten_ok()
        // Flatten the nested Result<Result<..>> produced above.
        .map(|a| a.and_then(|a| a))
        .try_collect()?;
    let procedure_id = Guid::new();
    // Ask each (running) service for the current input of each action.
    for (package_id, action_id) in tasks {
        if let Some(service) = self.services.get(&package_id).await.as_ref() {
            if let Some(input) = service
                .get_action_input(procedure_id.clone(), action_id.clone())
                .await
                .log_err()
                .flatten()
                .and_then(|i| i.value)
            {
                action_input
                    .entry(package_id)
                    .or_default()
                    .insert(action_id, input);
            }
        }
    }
    // Push the refreshed inputs into every package's task list, then collect
    // the ids of packages that still have an active critical task.
    for id in
        self.db
            .mutate::<Vec<PackageId>>(|db| {
                for (package_id, action_input) in &action_input {
                    for (action_id, input) in action_input {
                        for (_, pde) in
                            db.as_public_mut().as_package_data_mut().as_entries_mut()?
                        {
                            pde.as_tasks_mut().mutate(|tasks| {
                                Ok(update_tasks(tasks, package_id, action_id, input, false))
                            })?;
                        }
                    }
                }
                db.as_public()
                    .as_package_data()
                    .as_entries()?
                    .into_iter()
                    .filter_map(|(id, pkg)| {
                        (|| {
                            if pkg.as_tasks().de()?.into_iter().any(|(_, t)| {
                                t.active && t.task.severity == TaskSeverity::Critical
                            }) {
                                Ok(Some(id))
                            } else {
                                Ok(None)
                            }
                        })()
                        .transpose()
                    })
                    .collect()
            })
            .await
            .result?
    {
        // Stop (without force) any package blocked on a critical task.
        let svc = self.services.get(&id).await;
        if let Some(svc) = &*svc {
            svc.stop(procedure_id.clone(), false).await?;
        }
    }
    check_tasks.complete();

    Ok(())
}
|
|
/// Forwards an RPC call to a remote context with no extra payload.
///
/// Convenience wrapper over `CallRemote::call_remote` that passes `Empty {}`
/// as the extra argument; see [`Self::call_remote_with`] for the general form.
pub async fn call_remote<RemoteContext>(
    &self,
    method: &str,
    params: Value,
) -> Result<Value, RpcError>
where
    Self: CallRemote<RemoteContext>,
{
    <Self as CallRemote<RemoteContext, Empty>>::call_remote(&self, method, params, Empty {})
        .await
}
|
|
/// Forwards an RPC call to a remote context, passing `extra` alongside the
/// method and params.
pub async fn call_remote_with<RemoteContext, T>(
    &self,
    method: &str,
    params: Value,
    extra: T,
) -> Result<Value, RpcError>
where
    Self: CallRemote<RemoteContext, T>,
{
    <Self as CallRemote<RemoteContext, T>>::call_remote(&self, method, params, extra).await
}
|
|
}
|
|
// Lets generic code borrow the shared (SOCKS-proxied) HTTP client.
impl AsRef<Client> for RpcContext {
    fn as_ref(&self) -> &Client {
        &self.client
    }
}
|
|
impl AsRef<Jwk> for RpcContext {
    fn as_ref(&self) -> &Jwk {
        // NOTE(review): this returns the setup module's CURRENT_SECRET, not
        // the `self.current_secret` generated in `init` — confirm the
        // asymmetry is intentional and `current_secret` isn't dead here.
        &CURRENT_SECRET
    }
}
|
|
// Lets generic code borrow the context's RPC continuation registry.
impl AsRef<RpcContinuations> for RpcContext {
    fn as_ref(&self) -> &RpcContinuations {
        &self.rpc_continuations
    }
}
|
|
// Lets generic code borrow the authed-continuation registry.
impl AsRef<OpenAuthedContinuations<Option<InternedString>>> for RpcContext {
    fn as_ref(&self) -> &OpenAuthedContinuations<Option<InternedString>> {
        &self.open_authed_continuations
    }
}
|
|
// Marker impl: lets RpcContext serve as an rpc_toolkit Context.
impl Context for RpcContext {}
|
|
impl Deref for RpcContext {
    type Target = RpcContextSeed;
    fn deref(&self) -> &Self::Target {
        // Under the "unstable" feature, catch use-after-shutdown: once
        // `shutdown` has flipped `is_closed`, any further deref panics with
        // a span trace pointing at the offending call site.
        #[cfg(feature = "unstable")]
        if self.0.is_closed.load(Ordering::SeqCst) {
            panic!(
                "RpcContext used after shutdown! {}",
                tracing_error::SpanTrace::capture()
            );
        }
        &self.0
    }
}
|
|
impl Drop for RpcContext {
    fn drop(&mut self) {
        // Under the "unstable" feature, report handles that outlive
        // `shutdown()` to help track down reference leaks.
        #[cfg(feature = "unstable")]
        if self.0.is_closed.load(Ordering::SeqCst) {
            // Subtract 1 for the reference held by `self`, which is going away.
            let count = Arc::strong_count(&self.0) - 1;
            tracing::info!("RpcContext dropped. {} left.", count);
            if count > 0 {
                // Capture where the lingering drop happened.
                tracing::debug!("{}", std::backtrace::Backtrace::force_capture());
                tracing::debug!("{:?}", eyre!(""))
            }
        }
    }
}
|