use std::sync::Arc; use std::time::Duration; use chrono::{DateTime, Utc}; use clap::Parser; use futures::future::BoxFuture; use imbl::OrdMap; use models::{ActionId, HealthCheckId, PackageId, ProcedureName}; use persistent_container::PersistentContainer; use rpc_toolkit::{from_fn_async, CallRemoteHandler, Handler, HandlerArgs}; use serde::{Deserialize, Serialize}; use start_stop::StartStop; use tokio::sync::{watch, Notify}; use crate::action::ActionResult; use crate::config::action::ConfigRes; use crate::context::{CliContext, RpcContext}; use crate::core::rpc_continuations::RequestGuid; use crate::db::model::{ CurrentDependencies, CurrentDependents, InstalledPackageInfo, PackageDataEntry, PackageDataEntryInstalled, PackageDataEntryMatchModel, StaticFiles, }; use crate::disk::mount::guard::GenericMountGuard; use crate::install::PKG_ARCHIVE_DIR; use crate::prelude::*; use crate::progress::{self, NamedProgress, Progress}; use crate::s9pk::S9pk; use crate::service::service_map::InstallProgressHandles; use crate::service::transition::{TempDesiredState, TransitionKind, TransitionState}; use crate::status::health_check::HealthCheckResult; use crate::status::{DependencyConfigErrors, MainStatus, Status}; use crate::util::actor::{Actor, BackgroundJobs, SimpleActor}; use crate::volume::data_dir; pub mod cli; mod config; mod control; pub mod persistent_container; mod rpc; pub mod service_effect_handler; pub mod service_map; mod start_stop; mod transition; mod util; pub use service_map::ServiceMap; pub const HEALTH_CHECK_COOLDOWN_SECONDS: u64 = 15; pub const HEALTH_CHECK_GRACE_PERIOD_SECONDS: u64 = 5; pub const SYNC_RETRY_COOLDOWN_SECONDS: u64 = 10; pub type Task<'a> = BoxFuture<'a, Result<(), Error>>; /// TODO pub enum BackupReturn { TODO, } #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] pub enum LoadDisposition { Retry, Undo, } pub struct Service { actor: SimpleActor, seed: Arc, } impl Service { #[instrument(skip_all)] async fn new(ctx: RpcContext, s9pk: S9pk, start: StartStop) -> Result { let id = s9pk.as_manifest().id.clone(); let desired_state = watch::channel(start).0; let temp_desired_state = TempDesiredState(Arc::new(watch::channel(None).0)); let persistent_container = PersistentContainer::new( &ctx, s9pk, // desired_state.subscribe(), // temp_desired_state.subscribe(), ) .await?; let seed = Arc::new(ServiceActorSeed { id, running_status: persistent_container.running_status.subscribe(), persistent_container, ctx, desired_state, temp_desired_state, transition_state: Arc::new(watch::channel(None).0), synchronized: Arc::new(Notify::new()), }); seed.persistent_container .init(Arc::downgrade(&seed)) .await?; Ok(Self { actor: SimpleActor::new(ServiceActor(seed.clone())), seed, }) } #[instrument(skip_all)] pub async fn load( ctx: &RpcContext, id: &PackageId, disposition: LoadDisposition, ) -> Result, Error> { let handle_installed = { let ctx = ctx.clone(); move |s9pk: S9pk, i: Model| async move { for volume_id in &s9pk.as_manifest().volumes { let tmp_path = data_dir(&ctx.datadir, &s9pk.as_manifest().id.clone(), volume_id); if tokio::fs::metadata(&tmp_path).await.is_err() { tokio::fs::create_dir_all(&tmp_path).await?; } } let start_stop = if i.as_status().as_main().de()?.running() { StartStop::Start } else { StartStop::Stop }; Self::new(ctx, s9pk, start_stop).await.map(Some) } }; let s9pk_dir = ctx.datadir.join(PKG_ARCHIVE_DIR).join("installed"); // TODO: make this based on hash let s9pk_path = s9pk_dir.join(id).with_extension("s9pk"); match ctx .db .peek() .await .into_package_data() .into_idx(id) .map(|pde| pde.into_match()) { Some(PackageDataEntryMatchModel::Installing(_)) => { if disposition == LoadDisposition::Retry { if let Ok(s9pk) = S9pk::open(s9pk_path, Some(id)).await.map_err(|e| { tracing::error!("Error opening s9pk for install: {e}"); tracing::debug!("{e:?}") }) { if let Ok(service) = Self::install(ctx.clone(), s9pk, None, None) .await .map_err(|e| { tracing::error!("Error installing service: {e}"); tracing::debug!("{e:?}") }) { return Ok(Some(service)); } } } // TODO: delete s9pk? ctx.db .mutate(|v| v.as_package_data_mut().remove(id)) .await?; Ok(None) } Some(PackageDataEntryMatchModel::Updating(e)) => { if disposition == LoadDisposition::Retry && e.as_install_progress().de()?.phases.iter().any( |NamedProgress { name, progress }| { name.eq_ignore_ascii_case("download") && progress == &Progress::Complete(true) }, ) { if let Ok(s9pk) = S9pk::open(&s9pk_path, Some(id)).await.map_err(|e| { tracing::error!("Error opening s9pk for update: {e}"); tracing::debug!("{e:?}") }) { if let Ok(service) = Self::install( ctx.clone(), s9pk, Some(e.as_installed().as_manifest().as_version().de()?), None, ) .await .map_err(|e| { tracing::error!("Error installing service: {e}"); tracing::debug!("{e:?}") }) { return Ok(Some(service)); } } } let s9pk = S9pk::open(s9pk_path, Some(id)).await?; ctx.db .mutate({ let manifest = s9pk.as_manifest().clone(); |db| { db.as_package_data_mut() .as_idx_mut(&manifest.id) .or_not_found(&manifest.id)? .ser(&PackageDataEntry::Installed(PackageDataEntryInstalled { static_files: e.as_static_files().de()?, manifest, installed: e.as_installed().de()?, })) } }) .await?; handle_installed(s9pk, e.as_installed().clone()).await } Some(PackageDataEntryMatchModel::Removing(_)) | Some(PackageDataEntryMatchModel::Restoring(_)) => { if let Ok(s9pk) = S9pk::open(s9pk_path, Some(id)).await.map_err(|e| { tracing::error!("Error opening s9pk for removal: {e}"); tracing::debug!("{e:?}") }) { if let Ok(service) = Self::new(ctx.clone(), s9pk, StartStop::Stop) .await .map_err(|e| { tracing::error!("Error loading service for removal: {e}"); tracing::debug!("{e:?}") }) { if service .uninstall(None) .await .map_err(|e| { tracing::error!("Error uninstalling service: {e}"); tracing::debug!("{e:?}") }) .is_ok() { return Ok(None); } } } ctx.db .mutate(|v| v.as_package_data_mut().remove(id)) .await?; Ok(None) } Some(PackageDataEntryMatchModel::Installed(i)) => { handle_installed( S9pk::open(s9pk_path, Some(id)).await?, i.as_installed().clone(), ) .await } Some(PackageDataEntryMatchModel::Error(e)) => Err(Error::new( eyre!("Failed to parse PackageDataEntry, found {e:?}"), ErrorKind::Deserialization, )), None => Ok(None), } } #[instrument(skip_all)] pub async fn install( ctx: RpcContext, s9pk: S9pk, src_version: Option, progress: Option, ) -> Result { let manifest = s9pk.as_manifest().clone(); let developer_key = s9pk.as_archive().signer(); let icon = s9pk.icon_data_url().await?; let static_files = StaticFiles::local(&manifest.id, &manifest.version, icon); let service = Self::new(ctx.clone(), s9pk, StartStop::Stop).await?; service .seed .persistent_container .execute(ProcedureName::Init, to_value(&src_version)?, None) // TODO timeout .await .with_kind(ErrorKind::MigrationFailed)?; // TODO: handle cancellation if let Some(mut progress) = progress { progress.finalization_progress.complete(); progress.progress_handle.complete(); tokio::task::yield_now().await; } ctx.db .mutate(|d| { d.as_package_data_mut() .as_idx_mut(&manifest.id) .or_not_found(&manifest.id)? .ser(&PackageDataEntry::Installed(PackageDataEntryInstalled { installed: InstalledPackageInfo { current_dependencies: Default::default(), // TODO current_dependents: Default::default(), // TODO dependency_info: Default::default(), // TODO developer_key, status: Status { configured: false, // TODO main: MainStatus::Stopped, // TODO dependency_config_errors: Default::default(), // TODO }, interface_addresses: Default::default(), // TODO marketplace_url: None, // TODO manifest: manifest.clone(), last_backup: None, // TODO store: Value::Null, // TODO store_exposed_dependents: Default::default(), // TODO store_exposed_ui: Default::default(), // TODO }, manifest, static_files, })) }) .await?; Ok(service) } pub async fn restore( ctx: RpcContext, s9pk: S9pk, guard: impl GenericMountGuard, progress: Option, ) -> Result { // TODO Err(Error::new(eyre!("not yet implemented"), ErrorKind::Unknown)) } pub async fn get_config(&self) -> Result { let container = &self.seed.persistent_container; container .execute::( ProcedureName::GetConfig, Value::Null, Some(Duration::from_secs(30)), // TODO timeout ) .await .with_kind(ErrorKind::ConfigGen) } // TODO DO the Action Get pub async fn action(&self, id: ActionId, input: Value) -> Result { let container = &self.seed.persistent_container; container .execute::( ProcedureName::RunAction(id), input, Some(Duration::from_secs(30)), ) .await .with_kind(ErrorKind::Action) } pub async fn shutdown(self) -> Result<(), Error> { self.actor .shutdown(crate::util::actor::PendingMessageStrategy::FinishAll { timeout: None }) // TODO timeout .await; if let Some((hdl, shutdown)) = self.seed.persistent_container.rpc_server.send_replace(None) { shutdown.shutdown(); hdl.await.with_kind(ErrorKind::Cancelled)?; } Arc::try_unwrap(self.seed) .map_err(|_| { Error::new( eyre!("ServiceActorSeed held somewhere after actor shutdown"), ErrorKind::Unknown, ) })? .persistent_container .exit() .await?; Ok(()) } pub async fn uninstall(self, target_version: Option) -> Result<(), Error> { self.seed .persistent_container .execute(ProcedureName::Uninit, to_value(&target_version)?, None) // TODO timeout .await?; self.shutdown().await } pub async fn backup(&self, guard: impl GenericMountGuard) -> Result { // TODO Err(Error::new(eyre!("not yet implemented"), ErrorKind::Unknown)) } } #[derive(Clone)] struct RunningStatus { health: OrdMap, started: DateTime, } pub(self) struct ServiceActorSeed { ctx: RpcContext, id: PackageId, persistent_container: PersistentContainer, desired_state: watch::Sender, temp_desired_state: TempDesiredState, transition_state: Arc>>, running_status: watch::Receiver>, synchronized: Arc, } struct ServiceActor(Arc); impl Actor for ServiceActor { fn init(&mut self, jobs: &mut BackgroundJobs) { let seed = self.0.clone(); jobs.add_job(async move { let id = seed.id.clone(); let mut current = seed.persistent_container.current_state.subscribe(); let mut desired = seed.desired_state.subscribe(); let mut temp_desired = seed.temp_desired_state.subscribe(); let mut transition = seed.transition_state.subscribe(); let mut running = seed.running_status.clone(); loop { let (desired_state, current_state, transition_kind, running_status) = ( temp_desired.borrow().unwrap_or(*desired.borrow()), *current.borrow(), transition.borrow().as_ref().map(|t| t.kind()), running.borrow().clone(), ); if let Err(e) = async { seed.ctx .db .mutate(|d| { if let Some(i) = d .as_package_data_mut() .as_idx_mut(&id) .and_then(|p| p.as_installed_mut()) { i.as_status_mut().as_main_mut().ser(&match ( transition_kind, desired_state, current_state, running_status, ) { (Some(TransitionKind::Restarting), _, _, _) => { MainStatus::Restarting } (Some(TransitionKind::BackingUp), _, _, Some(status)) => { MainStatus::BackingUp { started: Some(status.started), health: status.health.clone(), } } (Some(TransitionKind::BackingUp), _, _, None) => { MainStatus::BackingUp { started: None, health: OrdMap::new(), } } (None, StartStop::Stop, StartStop::Stop, _) => { MainStatus::Stopped } (None, StartStop::Stop, StartStop::Start, _) => { MainStatus::Stopping { timeout: todo!("sigterm timeout"), } } (None, StartStop::Start, StartStop::Stop, _) => { MainStatus::Starting } (None, StartStop::Start, StartStop::Start, None) => { MainStatus::Starting } (None, StartStop::Start, StartStop::Start, Some(status)) => { MainStatus::Running { started: status.started, health: status.health.clone(), } } })?; } Ok(()) }) .await?; match (desired_state, current_state) { (StartStop::Start, StartStop::Stop) => { seed.persistent_container.start().await } (StartStop::Stop, StartStop::Start) => { seed.persistent_container .stop(todo!("s9pk sigterm timeout")) .await } _ => Ok(()), } } .await { tracing::error!("error synchronizing state of service: {e}"); tracing::debug!("{e:?}"); seed.synchronized.notify_waiters(); tracing::error!("Retrying in {}s...", SYNC_RETRY_COOLDOWN_SECONDS); tokio::time::sleep(Duration::from_secs(SYNC_RETRY_COOLDOWN_SECONDS)).await; continue; } seed.synchronized.notify_waiters(); tokio::select! { _ = current.changed() => (), _ = desired.changed() => (), _ = temp_desired.changed() => (), _ = transition.changed() => (), _ = running.changed() => (), } } }) } } #[derive(Deserialize, Serialize, Parser)] pub struct ConnectParams { pub id: PackageId, } pub async fn connect_rpc( ctx: RpcContext, ConnectParams { id }: ConnectParams, ) -> Result { let id_ref = &id; crate::lxc::connect( &ctx, ctx.services .get(&id) .await .as_ref() .or_not_found(lazy_format!("service for {id_ref}"))? .seed .persistent_container .lxc_container .get() .or_not_found(lazy_format!("container for {id_ref}"))?, ) .await } pub async fn connect_rpc_cli( handle_args: HandlerArgs, ) -> Result<(), Error> { let ctx = handle_args.context.clone(); let guid = CallRemoteHandler::::new(from_fn_async(connect_rpc)) .handle_async(handle_args) .await?; crate::lxc::connect_cli(&ctx, guid).await }