diff --git a/backend/src/action.rs b/backend/src/action.rs index bf9844ecf..e397e8f2f 100644 --- a/backend/src/action.rs +++ b/backend/src/action.rs @@ -65,7 +65,7 @@ impl Action { image_ids: &BTreeSet, ) -> Result<(), Error> { self.implementation - .validate(container, eos_version, volumes, image_ids, true) + .validate(eos_version, volumes, image_ids, true) .with_ctx(|_| { ( crate::ErrorKind::ValidateS9pk, diff --git a/backend/src/backup/backup_bulk.rs b/backend/src/backup/backup_bulk.rs index 6e4fb88dd..c97eeb0d6 100644 --- a/backend/src/backup/backup_bulk.rs +++ b/backend/src/backup/backup_bulk.rs @@ -1,5 +1,6 @@ use std::collections::{BTreeMap, BTreeSet}; use std::path::{Path, PathBuf}; +use std::sync::Arc; use chrono::Utc; use clap::ArgMatches; @@ -7,8 +8,7 @@ use color_eyre::eyre::eyre; use helpers::AtomicFile; use patch_db::{DbHandle, LockType, PatchDbHandle}; use rpc_toolkit::command; -use tokio::io::AsyncWriteExt; -use tokio::process::Command; +use tokio::{io::AsyncWriteExt, sync::Mutex}; use tracing::instrument; use super::target::BackupTargetId; @@ -21,12 +21,12 @@ use crate::db::model::BackupProgress; use crate::disk::mount::backup::BackupMountGuard; use crate::disk::mount::filesystem::ReadWrite; use crate::disk::mount::guard::TmpMountGuard; +use crate::manager::BackupReturn; use crate::notifications::NotificationLevel; use crate::s9pk::manifest::PackageId; -use crate::status::MainStatus; +use crate::util::display_none; use crate::util::io::dir_copy; use crate::util::serde::IoFormat; -use crate::util::{display_none, Invoke}; use crate::version::VersionT; use crate::{Error, ErrorKind, ResultExt}; @@ -206,10 +206,12 @@ async fn assure_backing_up( async fn perform_backup( ctx: &RpcContext, mut db: Db, - mut backup_guard: BackupMountGuard, + backup_guard: BackupMountGuard, package_ids: &BTreeSet, ) -> Result, Error> { let mut backup_report = BTreeMap::new(); + let backup_guard = Arc::new(Mutex::new(backup_guard)); + for package_id in crate::db::DatabaseModel::new() .package_data() .keys(&mut db) @@ -231,93 +233,55 @@ async fn perform_backup( }; let main_status_model = installed_model.clone().status().main(); - main_status_model.lock(&mut tx, LockType::Write).await?; - let (started, health) = match main_status_model.get(&mut tx).await?.into_owned() { - MainStatus::Starting { .. } => (Some(Utc::now()), Default::default()), - MainStatus::Running { started, health } => (Some(started), health.clone()), - MainStatus::Stopped | MainStatus::Stopping | MainStatus::Restarting => { - (None, Default::default()) - } - MainStatus::BackingUp { .. } => { - backup_report.insert( - package_id, - PackageBackupReport { - error: Some( - "Can't do backup because service is in a backing up state".to_owned(), - ), - }, - ); - continue; - } - }; - main_status_model - .put( - &mut tx, - &MainStatus::BackingUp { - started, - health: health.clone(), - }, - ) - .await?; - tx.save().await?; // drop locks + let manifest = installed_model.clone().manifest().get(&mut tx).await?; - let manifest = installed_model.clone().manifest().get(&mut db).await?; - - ctx.managers + let (response, report) = match ctx + .managers .get(&(manifest.id.clone(), manifest.version.clone())) .await .ok_or_else(|| { Error::new(eyre!("Manager not found"), crate::ErrorKind::InvalidRequest) })? 
-            .synchronize()
-            .await;
-
-        let mut tx = db.begin().await?;
-
-        installed_model.lock(&mut tx, LockType::Write).await?;
-
-        let guard = backup_guard.mount_package_backup(&package_id).await?;
-        let res = manifest
-            .backup
-            .create(
-                ctx,
-                &mut tx,
-                &package_id,
-                &manifest.title,
-                &manifest.version,
-                &manifest.interfaces,
-                &manifest.volumes,
-            )
-            .await;
-        guard.unmount().await?;
+            .backup(backup_guard.clone())
+            .await
+        {
+            BackupReturn::Ran { report, res } => (res, report),
+            BackupReturn::AlreadyRunning(report) => {
+                backup_report.insert(package_id, report);
+                continue;
+            }
+            BackupReturn::Error(error) => {
+                tracing::warn!("Backup thread error");
+                tracing::debug!("{error:?}");
+                backup_report.insert(
+                    package_id,
+                    PackageBackupReport {
+                        error: Some("Backup thread error".to_owned()),
+                    },
+                );
+                continue;
+            }
+        };
         backup_report.insert(
             package_id.clone(),
             PackageBackupReport {
-                error: res.as_ref().err().map(|e| e.to_string()),
+                error: response.as_ref().err().map(|e| e.to_string()),
             },
         );
-        if let Ok(pkg_meta) = res {
+        if let Ok(pkg_meta) = response {
             installed_model
                 .last_backup()
                 .put(&mut tx, &Some(pkg_meta.timestamp))
                 .await?;
             backup_guard
+                .lock()
+                .await
                 .metadata
                 .package_backups
                 .insert(package_id.clone(), pkg_meta);
         }
-        main_status_model
-            .put(
-                &mut tx,
-                &match started {
-                    Some(started) => MainStatus::Running { started, health },
-                    None => MainStatus::Stopped,
-                },
-            )
-            .await?;
-
         let mut backup_progress = crate::db::DatabaseModel::new()
             .server_info()
             .status_info()
@@ -344,7 +308,7 @@ async fn perform_backup(
         .into_owned();

     let mut os_backup_file = AtomicFile::new(
-        backup_guard.as_ref().join("os-backup.cbor"),
+        backup_guard.lock().await.as_ref().join("os-backup.cbor"),
         None::<PathBuf>,
     )
     .await
@@ -360,11 +324,11 @@ async fn perform_backup(
         .await
         .with_kind(ErrorKind::Filesystem)?;

-    let luks_folder_old = backup_guard.as_ref().join("luks.old");
+    let luks_folder_old = backup_guard.lock().await.as_ref().join("luks.old");
     if tokio::fs::metadata(&luks_folder_old).await.is_ok() {
         tokio::fs::remove_dir_all(&luks_folder_old).await?;
     }
-    let luks_folder_bak = backup_guard.as_ref().join("luks");
+    let luks_folder_bak = backup_guard.lock().await.as_ref().join("luks");
     if tokio::fs::metadata(&luks_folder_bak).await.is_ok() {
         tokio::fs::rename(&luks_folder_bak, &luks_folder_old).await?;
     }
@@ -374,6 +338,14 @@ async fn perform_backup(
     }

     let timestamp = Some(Utc::now());
+    let mut backup_guard = Arc::try_unwrap(backup_guard)
+        .map_err(|_err| {
+            Error::new(
+                eyre!("Backup guard could not ensure that the others were dropped"),
+                ErrorKind::Unknown,
+            )
+        })?
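+        // every package backup has completed above, so this should now be the
+        // only remaining strong reference to the mount guard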
+ .into_inner(); backup_guard.unencrypted_metadata.version = crate::version::Current::new().semver().into(); backup_guard.unencrypted_metadata.full = true; diff --git a/backend/src/backup/mod.rs b/backend/src/backup/mod.rs index 0125e75b1..ba890645b 100644 --- a/backend/src/backup/mod.rs +++ b/backend/src/backup/mod.rs @@ -47,7 +47,7 @@ pub struct ServerBackupReport { #[derive(Debug, Deserialize, Serialize)] pub struct PackageBackupReport { - error: Option, + pub error: Option, } #[command(subcommands(backup_bulk::backup_all, target::target))] @@ -84,10 +84,10 @@ impl BackupActions { image_ids: &BTreeSet, ) -> Result<(), Error> { self.create - .validate(container, eos_version, volumes, image_ids, false) + .validate(eos_version, volumes, image_ids, false) .with_ctx(|_| (crate::ErrorKind::ValidateS9pk, "Backup Create"))?; self.restore - .validate(container, eos_version, volumes, image_ids, false) + .validate(eos_version, volumes, image_ids, false) .with_ctx(|_| (crate::ErrorKind::ValidateS9pk, "Backup Restore"))?; Ok(()) } diff --git a/backend/src/bins/start_init.rs b/backend/src/bins/start_init.rs index 485d8e323..5fdf10c3b 100644 --- a/backend/src/bins/start_init.rs +++ b/backend/src/bins/start_init.rs @@ -15,7 +15,6 @@ use crate::init::STANDBY_MODE_PATH; use crate::net::web_server::WebServer; use crate::shutdown::Shutdown; use crate::sound::CHIME; -use crate::util::logger::EmbassyLogger; use crate::util::Invoke; use crate::{Error, ErrorKind, ResultExt, OS_ARCH}; diff --git a/backend/src/config/action.rs b/backend/src/config/action.rs index 0684d689e..3d349c846 100644 --- a/backend/src/config/action.rs +++ b/backend/src/config/action.rs @@ -40,10 +40,10 @@ impl ConfigActions { image_ids: &BTreeSet, ) -> Result<(), Error> { self.get - .validate(container, eos_version, volumes, image_ids, true) + .validate(eos_version, volumes, image_ids, true) .with_ctx(|_| (crate::ErrorKind::ValidateS9pk, "Config Get"))?; self.set - .validate(container, eos_version, volumes, image_ids, true) + .validate(eos_version, volumes, image_ids, true) .with_ctx(|_| (crate::ErrorKind::ValidateS9pk, "Config Set"))?; Ok(()) } @@ -99,7 +99,6 @@ impl ConfigActions { }) })?; Ok(SetResult { - signal: res.signal, depends_on: res .depends_on .into_iter() @@ -112,9 +111,5 @@ impl ConfigActions { #[derive(Debug, Deserialize, Serialize)] #[serde(rename_all = "kebab-case")] pub struct SetResult { - #[serde(default)] - #[serde(deserialize_with = "crate::util::serde::deserialize_from_str_opt")] - #[serde(serialize_with = "crate::util::serde::serialize_display_opt")] - pub signal: Option, pub depends_on: BTreeMap>, } diff --git a/backend/src/config/mod.rs b/backend/src/config/mod.rs index e13db5c57..ece30deeb 100644 --- a/backend/src/config/mod.rs +++ b/backend/src/config/mod.rs @@ -1,26 +1,24 @@ -use std::collections::{BTreeMap, BTreeSet}; +use std::collections::BTreeMap; use std::path::PathBuf; use std::time::Duration; use color_eyre::eyre::eyre; -use futures::future::{BoxFuture, FutureExt}; use indexmap::IndexSet; use itertools::Itertools; +use models::ErrorKind; use patch_db::{DbHandle, LockReceipt, LockTarget, LockTargetId, LockType, Verifier}; -use rand::SeedableRng; use regex::Regex; use rpc_toolkit::command; use serde_json::Value; use tracing::instrument; use crate::context::RpcContext; -use crate::db::model::{CurrentDependencies, CurrentDependencyInfo, CurrentDependents}; +use crate::db::model::{CurrentDependencies, CurrentDependents}; use crate::dependencies::{ - add_dependent_to_current_dependents_lists, 
break_transitive, heal_all_dependents_transitive, - BreakTransitiveReceipts, BreakageRes, Dependencies, DependencyConfig, DependencyError, - DependencyErrors, DependencyReceipt, TaggedDependencyError, TryHealReceipts, + BreakTransitiveReceipts, BreakageRes, Dependencies, DependencyConfig, DependencyErrors, + DependencyReceipt, TaggedDependencyError, TryHealReceipts, }; -use crate::install::cleanup::{remove_from_current_dependents_lists, UpdateDependencyReceipts}; +use crate::install::cleanup::UpdateDependencyReceipts; use crate::procedure::docker::DockerContainers; use crate::s9pk::manifest::{Manifest, PackageId}; use crate::util::display_none; @@ -35,7 +33,7 @@ pub use spec::{ConfigSpec, Defaultable}; use util::NumRange; use self::action::{ConfigActions, ConfigRes}; -use self::spec::{ConfigPointerReceipts, PackagePointerSpec, ValueSpecPointer}; +use self::spec::{ConfigPointerReceipts, ValueSpecPointer}; pub type Config = serde_json::Map; pub trait TypeOf { @@ -264,18 +262,18 @@ pub struct ConfigReceipts { pub update_dependency_receipts: UpdateDependencyReceipts, pub try_heal_receipts: TryHealReceipts, pub break_transitive_receipts: BreakTransitiveReceipts, - configured: LockReceipt, - config_actions: LockReceipt, - dependencies: LockReceipt, - volumes: LockReceipt, - version: LockReceipt, - manifest: LockReceipt, - system_pointers: LockReceipt, String>, + pub configured: LockReceipt, + pub config_actions: LockReceipt, + pub dependencies: LockReceipt, + pub volumes: LockReceipt, + pub version: LockReceipt, + pub manifest: LockReceipt, + pub system_pointers: LockReceipt, String>, pub current_dependents: LockReceipt, pub current_dependencies: LockReceipt, - dependency_errors: LockReceipt, - manifest_dependencies_config: LockReceipt, - docker_containers: LockReceipt, + pub dependency_errors: LockReceipt, + pub manifest_dependencies_config: LockReceipt, + pub docker_containers: LockReceipt, } impl ConfigReceipts { @@ -418,370 +416,78 @@ pub async fn set_dry( #[context] ctx: RpcContext, #[parent_data] (id, config, timeout): (PackageId, Option, Option), ) -> Result { - let mut db = ctx.db.handle(); - let mut tx = db.begin().await?; - let mut breakages = BTreeMap::new(); - let locks = ConfigReceipts::new(&mut tx).await?; - configure( - &ctx, - &mut tx, - &id, - config, - &timeout, - true, - &mut BTreeMap::new(), - &mut breakages, - &locks, - ) - .await?; + let breakages = BTreeMap::new(); + let overrides = Default::default(); + + let configure_context = ConfigureContext { + breakages, + timeout, + config, + dry_run: true, + overrides, + }; + let breakages = configure(&ctx, &id, configure_context).await?; - locks.configured.set(&mut tx, true, &id).await?; - tx.abort().await?; Ok(BreakageRes(breakages)) } +pub struct ConfigureContext { + pub breakages: BTreeMap, + pub timeout: Option, + pub config: Option, + pub overrides: BTreeMap, + pub dry_run: bool, +} + #[instrument(skip_all)] pub async fn set_impl( ctx: RpcContext, (id, config, timeout): (PackageId, Option, Option), ) -> Result<(), Error> { - let mut db = ctx.db.handle(); - let mut tx = db.begin().await?; - let mut breakages = BTreeMap::new(); - let locks = ConfigReceipts::new(&mut tx).await?; - configure( - &ctx, - &mut tx, - &id, + let breakages = BTreeMap::new(); + let overrides = Default::default(); + + let configure_context = ConfigureContext { + breakages, + timeout, config, - &timeout, - false, - &mut BTreeMap::new(), - &mut breakages, - &locks, - ) - .await?; - tx.commit().await?; + dry_run: false, + overrides, + }; + configure(&ctx, 
&id, configure_context).await?; Ok(()) } #[instrument(skip_all)] -pub async fn configure<'a, Db: DbHandle>( +pub async fn configure( ctx: &RpcContext, - db: &'a mut Db, id: &PackageId, - config: Option, - timeout: &Option, - dry_run: bool, - overrides: &mut BTreeMap, - breakages: &mut BTreeMap, - receipts: &ConfigReceipts, -) -> Result<(), Error> { - configure_rec( - ctx, db, id, config, timeout, dry_run, overrides, breakages, receipts, - ) - .await?; - receipts.configured.set(db, true, &id).await?; - Ok(()) -} - -#[instrument(skip_all)] -pub fn configure_rec<'a, Db: DbHandle>( - ctx: &'a RpcContext, - db: &'a mut Db, - id: &'a PackageId, - config: Option, - timeout: &'a Option, - dry_run: bool, - overrides: &'a mut BTreeMap, - breakages: &'a mut BTreeMap, - receipts: &'a ConfigReceipts, -) -> BoxFuture<'a, Result<(), Error>> { - async move { - // fetch data from db - let action = receipts - .config_actions - .get(db, id) - .await? - .ok_or_else(|| not_found!(id))?; - let dependencies = receipts - .dependencies - .get(db, id) - .await? - .ok_or_else(|| not_found!(id))?; - let volumes = receipts - .volumes - .get(db, id) - .await? - .ok_or_else(|| not_found!(id))?; - let is_needs_config = !receipts - .configured - .get(db, id) - .await? - .ok_or_else(|| not_found!(id))?; - let version = receipts - .version - .get(db, id) - .await? - .ok_or_else(|| not_found!(id))?; - - // get current config and current spec - let ConfigRes { - config: old_config, - spec, - } = action.get(ctx, id, &version, &volumes).await?; - - // determine new config to use - let mut config = if let Some(config) = config.or_else(|| old_config.clone()) { - config - } else { - spec.gen(&mut rand::rngs::StdRng::from_entropy(), timeout)? - }; - - let manifest = receipts - .manifest - .get(db, id) - .await? - .ok_or_else(|| not_found!(id))?; - - spec.validate(&manifest)?; - spec.matches(&config)?; // check that new config matches spec - spec.update( - ctx, - db, - &manifest, - &*overrides, - &mut config, - &receipts.config_receipts, - ) - .await?; // dereference pointers in the new config - - // create backreferences to pointers - let mut sys = receipts - .system_pointers - .get(db, &id) - .await? - .ok_or_else(|| not_found!(id))?; - sys.truncate(0); - let mut current_dependencies: CurrentDependencies = CurrentDependencies( - dependencies - .0 - .iter() - .filter_map(|(id, info)| { - if info.requirement.required() { - Some((id.clone(), CurrentDependencyInfo::default())) - } else { - None - } - }) - .collect(), - ); - for ptr in spec.pointers(&config)? 
{ - match ptr { - ValueSpecPointer::Package(pkg_ptr) => { - if let Some(current_dependency) = - current_dependencies.0.get_mut(pkg_ptr.package_id()) - { - current_dependency.pointers.push(pkg_ptr); - } else { - current_dependencies.0.insert( - pkg_ptr.package_id().to_owned(), - CurrentDependencyInfo { - pointers: vec![pkg_ptr], - health_checks: BTreeSet::new(), - }, - ); - } - } - ValueSpecPointer::System(s) => sys.push(s), - } - } - receipts.system_pointers.set(db, sys, &id).await?; - - let signal = if !dry_run { - // run config action - let res = action - .set(ctx, id, &version, &dependencies, &volumes, &config) - .await?; - - // track dependencies with no pointers - for (package_id, health_checks) in res.depends_on.into_iter() { - if let Some(current_dependency) = current_dependencies.0.get_mut(&package_id) { - current_dependency.health_checks.extend(health_checks); - } else { - current_dependencies.0.insert( - package_id, - CurrentDependencyInfo { - pointers: Vec::new(), - health_checks, - }, - ); - } - } - - // track dependency health checks - current_dependencies = current_dependencies.map(|x| { - x.into_iter() - .filter(|(dep_id, _)| { - if dep_id != id && !manifest.dependencies.0.contains_key(dep_id) { - tracing::warn!("Illegal dependency specified: {}", dep_id); - false - } else { - true - } - }) - .collect() - }); - res.signal - } else { - None - }; - - // update dependencies - let prev_current_dependencies = receipts - .current_dependencies - .get(db, &id) - .await? - .unwrap_or_default(); - remove_from_current_dependents_lists( - db, - id, - &prev_current_dependencies, - &receipts.current_dependents, - ) - .await?; // remove previous - add_dependent_to_current_dependents_lists( - db, - id, - ¤t_dependencies, - &receipts.current_dependents, - ) - .await?; // add new - current_dependencies.0.remove(id); - receipts - .current_dependencies - .set(db, current_dependencies.clone(), &id) - .await?; - - let errs = receipts - .dependency_errors - .get(db, &id) - .await? - .ok_or_else(|| not_found!(id))?; - tracing::warn!("Dependency Errors: {:?}", errs); - let errs = DependencyErrors::init( - ctx, - db, - &manifest, - ¤t_dependencies, - &receipts.dependency_receipt.try_heal, - ) + configure_context: ConfigureContext, +) -> Result, Error> { + let mut db = ctx.db.handle(); + let version = crate::db::DatabaseModel::new() + .package_data() + .idx_model(id) + .expect(&mut db) + .await? + .installed() + .expect(&mut db) + .await? + .manifest() + .version() + .get(&mut ctx.db.handle()) .await?; - receipts.dependency_errors.set(db, errs, &id).await?; - - // cache current config for dependents - overrides.insert(id.clone(), config.clone()); - - // handle dependents - let dependents = receipts - .current_dependents - .get(db, id) - .await? - .ok_or_else(|| not_found!(id))?; - let prev = if is_needs_config { None } else { old_config } - .map(Value::Object) - .unwrap_or_default(); - let next = Value::Object(config.clone()); - for (dependent, dep_info) in dependents.0.iter().filter(|(dep_id, _)| dep_id != &id) { - let dependent_container = receipts.docker_containers.get(db, &dependent).await?; - let dependent_container = &dependent_container; - // check if config passes dependent check - if let Some(cfg) = receipts - .manifest_dependencies_config - .get(db, (&dependent, &id)) - .await? - { - let manifest = receipts - .manifest - .get(db, &dependent) - .await? 
- .ok_or_else(|| not_found!(id))?; - if let Err(error) = cfg - .check( - ctx, - dependent_container, - dependent, - &manifest.version, - &manifest.volumes, - id, - &config, - ) - .await? - { - let dep_err = DependencyError::ConfigUnsatisfied { error }; - break_transitive( - db, - dependent, - id, - dep_err, - breakages, - &receipts.break_transitive_receipts, - ) - .await?; - } - - // handle backreferences - for ptr in &dep_info.pointers { - if let PackagePointerSpec::Config(cfg_ptr) = ptr { - if cfg_ptr.select(&next) != cfg_ptr.select(&prev) { - if let Err(e) = configure_rec( - ctx, db, dependent, None, timeout, dry_run, overrides, breakages, - receipts, - ) - .await - { - if e.kind == crate::ErrorKind::ConfigRulesViolation { - break_transitive( - db, - dependent, - id, - DependencyError::ConfigUnsatisfied { - error: format!("{}", e), - }, - breakages, - &receipts.break_transitive_receipts, - ) - .await?; - } else { - return Err(e); - } - } - } - } - } - heal_all_dependents_transitive(ctx, db, id, &receipts.dependency_receipt).await?; - } - } - - if let Some(signal) = signal { - match ctx.managers.get(&(id.clone(), version.clone())).await { - None => { - // in theory this should never happen, which indicates this function should be moved behind the - // Manager interface - return Err(Error::new( - eyre!("Manager Not Found for package being configured"), - crate::ErrorKind::Incoherent, - )); - } - Some(m) => { - m.signal(&signal).await?; - } - } - } - - Ok(()) - } - .boxed() + ctx.managers + .get(&(id.clone(), version.clone())) + .await + .ok_or_else(|| { + Error::new( + eyre!("There is no manager running for {id:?} and {version:?}"), + ErrorKind::Unknown, + ) + })? + .configure(configure_context) + .await } macro_rules! not_found { diff --git a/backend/src/context/rpc.rs b/backend/src/context/rpc.rs index 8894af0af..6b9dd364f 100644 --- a/backend/src/context/rpc.rs +++ b/backend/src/context/rpc.rs @@ -366,14 +366,12 @@ impl RpcContext { let main = match status.main { MainStatus::BackingUp { started, .. } => { if let Some(_) = started { - MainStatus::Starting { restarting: false } + MainStatus::Starting } else { MainStatus::Stopped } } - MainStatus::Running { .. } => { - MainStatus::Starting { restarting: false } - } + MainStatus::Running { .. } => MainStatus::Starting, a => a.clone(), }; let new_package = PackageDataEntry::Installed { diff --git a/backend/src/control.rs b/backend/src/control.rs index c0cd6e822..794afc64a 100644 --- a/backend/src/control.rs +++ b/backend/src/control.rs @@ -28,7 +28,7 @@ impl StartReceipts { let mut locks = Vec::new(); let setup = Self::setup(&mut locks, id); - Ok(setup(&db.lock_all(locks).await?)?) + setup(&db.lock_all(locks).await?) } pub fn setup( @@ -67,10 +67,7 @@ pub async fn start(#[context] ctx: RpcContext, #[arg] id: PackageId) -> Result<( let mut tx = db.begin().await?; let receipts = StartReceipts::new(&mut tx, &id).await?; let version = receipts.version.get(&mut tx).await?; - receipts - .status - .set(&mut tx, MainStatus::Starting { restarting: false }) - .await?; + receipts.status.set(&mut tx, MainStatus::Starting).await?; heal_all_dependents_transitive(&ctx, &mut tx, &id, &receipts.dependency_receipt).await?; tx.commit().await?; @@ -80,8 +77,7 @@ pub async fn start(#[context] ctx: RpcContext, #[arg] id: PackageId) -> Result<( .get(&(id, version)) .await .ok_or_else(|| Error::new(eyre!("Manager not found"), crate::ErrorKind::InvalidRequest))? 
- .synchronize() - .await; + .start(); Ok(()) } @@ -96,7 +92,7 @@ impl StopReceipts { let mut locks = Vec::new(); let setup = Self::setup(&mut locks, id); - Ok(setup(&db.lock_all(locks).await?)?) + setup(&db.lock_all(locks).await?) } pub fn setup( @@ -174,10 +170,28 @@ pub async fn stop_dry( pub async fn stop_impl(ctx: RpcContext, id: PackageId) -> Result { let mut db = ctx.db.handle(); let mut tx = db.begin().await?; + let version = crate::db::DatabaseModel::new() + .package_data() + .idx_model(&id) + .expect(&mut tx) + .await? + .installed() + .expect(&mut tx) + .await? + .manifest() + .version() + .get(&mut tx) + .await? + .clone(); let last_statuts = stop_common(&mut tx, &id, &mut BTreeMap::new()).await?; tx.commit().await?; + ctx.managers + .get(&(id, version)) + .await + .ok_or_else(|| Error::new(eyre!("Manager not found"), crate::ErrorKind::InvalidRequest))? + .stop(); Ok(last_statuts) } @@ -186,23 +200,28 @@ pub async fn stop_impl(ctx: RpcContext, id: PackageId) -> Result Result<(), Error> { let mut db = ctx.db.handle(); let mut tx = db.begin().await?; - - let mut status = crate::db::DatabaseModel::new() + let version = crate::db::DatabaseModel::new() .package_data() .idx_model(&id) - .and_then(|pde| pde.installed()) - .map(|i| i.status().main()) - .get_mut(&mut tx) - .await?; - if !matches!(&*status, Some(MainStatus::Running { .. })) { - return Err(Error::new( - eyre!("{} is not running", id), - crate::ErrorKind::InvalidRequest, - )); - } - *status = Some(MainStatus::Restarting); - status.save(&mut tx).await?; + .expect(&mut tx) + .await? + .installed() + .expect(&mut tx) + .await? + .manifest() + .version() + .get(&mut tx) + .await? + .clone(); + tx.commit().await?; + ctx.managers + .get(&(id, version)) + .await + .ok_or_else(|| Error::new(eyre!("Manager not found"), crate::ErrorKind::InvalidRequest))? 
+ .restart() + .await; + Ok(()) } diff --git a/backend/src/db/mod.rs b/backend/src/db/mod.rs index 54b849610..6823c398d 100644 --- a/backend/src/db/mod.rs +++ b/backend/src/db/mod.rs @@ -14,7 +14,6 @@ use rpc_toolkit::hyper::{Body, Error as HyperError, Request, Response}; use rpc_toolkit::yajrc::RpcError; use serde::{Deserialize, Serialize}; use serde_json::Value; -use tokio::io::AsyncWrite; use tokio::sync::oneshot; use tokio::task::JoinError; use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode; diff --git a/backend/src/dependencies.rs b/backend/src/dependencies.rs index e86c5eee9..99e146944 100644 --- a/backend/src/dependencies.rs +++ b/backend/src/dependencies.rs @@ -16,7 +16,7 @@ use tracing::instrument; use crate::config::action::{ConfigActions, ConfigRes}; use crate::config::spec::PackagePointerSpec; -use crate::config::{not_found, Config, ConfigReceipts, ConfigSpec}; +use crate::config::{not_found, Config, ConfigReceipts, ConfigSpec, ConfigureContext}; use crate::context::RpcContext; use crate::db::model::{CurrentDependencies, CurrentDependents, InstalledPackageDataEntry}; use crate::procedure::docker::DockerContainers; @@ -519,7 +519,6 @@ impl DependencyConfig { Ok(self .check .sandboxed( - container, ctx, dependent_id, dependent_version, @@ -542,7 +541,6 @@ impl DependencyConfig { ) -> Result { self.auto_configure .sandboxed( - container, ctx, dependent_id, dependent_version, @@ -557,7 +555,6 @@ impl DependencyConfig { } pub struct DependencyConfigReceipts { - config: ConfigReceipts, dependencies: LockReceipt, dependency_volumes: LockReceipt, dependency_version: LockReceipt, @@ -584,7 +581,6 @@ impl DependencyConfigReceipts { package_id: &PackageId, dependency_id: &PackageId, ) -> impl FnOnce(&Verifier) -> Result { - let config = ConfigReceipts::setup(locks); let dependencies = crate::db::DatabaseModel::new() .package_data() .idx_model(package_id) @@ -636,7 +632,6 @@ impl DependencyConfigReceipts { .add_to_keys(locks); move |skeleton_key| { Ok(Self { - config: config(skeleton_key)?, dependencies: dependencies.verify(&skeleton_key)?, dependency_volumes: dependency_volumes.verify(&skeleton_key)?, dependency_version: dependency_version.verify(&skeleton_key)?, @@ -665,6 +660,8 @@ pub async fn configure_impl( (pkg_id, dep_id): (PackageId, PackageId), ) -> Result<(), Error> { let mut db = ctx.db.handle(); + let breakages = BTreeMap::new(); + let overrides = Default::default(); let receipts = DependencyConfigReceipts::new(&mut db, &pkg_id, &dep_id).await?; let ConfigDryRes { old_config: _, @@ -672,19 +669,15 @@ pub async fn configure_impl( spec: _, } = configure_logic(ctx.clone(), &mut db, (pkg_id, dep_id.clone()), &receipts).await?; - let locks = &receipts.config; - Ok(crate::config::configure( - &ctx, - &mut db, - &dep_id, - Some(new_config), - &Some(Duration::from_secs(3).into()), - false, - &mut BTreeMap::new(), - &mut BTreeMap::new(), - locks, - ) - .await?) 
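+    // The positional arguments that `configure` used to take are now carried in a
+    // `ConfigureContext`; the actual reconfiguration is performed by the service's manager.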
+ let configure_context = ConfigureContext { + breakages, + timeout: Some(Duration::from_secs(3).into()), + config: Some(new_config), + dry_run: false, + overrides, + }; + crate::config::configure(&ctx, &dep_id, configure_context).await?; + Ok(()) } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -769,7 +762,6 @@ pub async fn configure_logic( let new_config = dependency .auto_configure .sandboxed( - &pkg_docker_container, &ctx, &pkg_id, &pkg_version, @@ -1049,7 +1041,7 @@ pub fn heal_transitive<'a, Db: DbHandle>( pub async fn reconfigure_dependents_with_live_pointers( ctx: &RpcContext, - mut tx: impl DbHandle, + tx: impl DbHandle, receipts: &ConfigReceipts, pde: &InstalledPackageDataEntry, ) -> Result<(), Error> { @@ -1064,18 +1056,17 @@ pub async fn reconfigure_dependents_with_live_pointers( PackagePointerSpec::TorKey(_) => false, PackagePointerSpec::Config(_) => false, }) { - crate::config::configure( - ctx, - &mut tx, - dependent_id, - None, - &None, - false, - &mut BTreeMap::new(), - &mut BTreeMap::new(), - receipts, - ) - .await?; + let breakages = BTreeMap::new(); + let overrides = Default::default(); + + let configure_context = ConfigureContext { + breakages, + timeout: None, + config: None, + dry_run: false, + overrides, + }; + crate::config::configure(&ctx, dependent_id, configure_context).await?; } } Ok(()) diff --git a/backend/src/disk/mount/filesystem/efivarfs.rs b/backend/src/disk/mount/filesystem/efivarfs.rs index 3b2bae3ba..ad9d79941 100644 --- a/backend/src/disk/mount/filesystem/efivarfs.rs +++ b/backend/src/disk/mount/filesystem/efivarfs.rs @@ -7,7 +7,7 @@ use sha2::Sha256; use super::{FileSystem, MountType, ReadOnly}; use crate::util::Invoke; -use crate::{Error, ResultExt}; +use crate::Error; pub struct EfiVarFs; #[async_trait] diff --git a/backend/src/disk/mount/util.rs b/backend/src/disk/mount/util.rs index 18fbcbe05..392e5d67a 100644 --- a/backend/src/disk/mount/util.rs +++ b/backend/src/disk/mount/util.rs @@ -3,7 +3,7 @@ use std::path::Path; use tracing::instrument; use crate::util::Invoke; -use crate::{Error, ResultExt}; +use crate::Error; #[instrument(skip_all)] pub async fn bind, P1: AsRef>( diff --git a/backend/src/disk/util.rs b/backend/src/disk/util.rs index 27b2bb5f0..7051026cd 100644 --- a/backend/src/disk/util.rs +++ b/backend/src/disk/util.rs @@ -3,7 +3,6 @@ use std::path::{Path, PathBuf}; use color_eyre::eyre::{self, eyre}; use futures::TryStreamExt; -use indexmap::IndexSet; use nom::bytes::complete::{tag, take_till1}; use nom::character::complete::multispace1; use nom::character::is_space; @@ -62,8 +61,8 @@ pub struct EmbassyOsRecoveryInfo { pub wrapped_key: Option, } -const DISK_PATH: &'static str = "/dev/disk/by-path"; -const SYS_BLOCK_PATH: &'static str = "/sys/block"; +const DISK_PATH: &str = "/dev/disk/by-path"; +const SYS_BLOCK_PATH: &str = "/sys/block"; lazy_static::lazy_static! 
{ static ref PARTITION_REGEX: Regex = Regex::new("-part[0-9]+$").unwrap(); diff --git a/backend/src/hostname.rs b/backend/src/hostname.rs index 91e4ea71f..25342c4a1 100644 --- a/backend/src/hostname.rs +++ b/backend/src/hostname.rs @@ -1,9 +1,7 @@ -use patch_db::DbHandle; use rand::{thread_rng, Rng}; use tokio::process::Command; use tracing::instrument; -use crate::account::AccountInfo; use crate::util::Invoke; use crate::{Error, ErrorKind}; #[derive(Clone, serde::Deserialize, serde::Serialize, Debug)] diff --git a/backend/src/install/mod.rs b/backend/src/install/mod.rs index 565c0c570..beeb83a82 100644 --- a/backend/src/install/mod.rs +++ b/backend/src/install/mod.rs @@ -18,6 +18,7 @@ use patch_db::{DbHandle, LockType}; use reqwest::Url; use rpc_toolkit::command; use rpc_toolkit::yajrc::RpcError; +use serde_json::{json, Value}; use tokio::fs::{File, OpenOptions}; use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt}; use tokio::process::Command; @@ -26,7 +27,7 @@ use tokio_stream::wrappers::ReadDirStream; use tracing::instrument; use self::cleanup::{cleanup_failed, remove_from_current_dependents_lists}; -use crate::config::ConfigReceipts; +use crate::config::{ConfigReceipts, ConfigureContext}; use crate::context::{CliContext, RpcContext}; use crate::core::rpc_continuations::{RequestGuid, RpcContinuation}; use crate::db::model::{ @@ -48,7 +49,6 @@ use crate::status::{MainStatus, Status}; use crate::util::io::{copy_and_shutdown, response_to_reader}; use crate::util::serde::{display_serializable, Port}; use crate::util::{display_none, AsyncFileExt, Version}; -use crate::version::{Current, VersionT}; use crate::volume::{asset_dir, script_dir}; use crate::{Error, ErrorKind, ResultExt}; @@ -61,7 +61,7 @@ pub const PKG_PUBLIC_DIR: &str = "package-data/public"; pub const PKG_WASM_DIR: &str = "package-data/wasm"; #[command(display(display_serializable))] -pub async fn list(#[context] ctx: RpcContext) -> Result, Error> { +pub async fn list(#[context] ctx: RpcContext) -> Result { let mut hdl = ctx.db.handle(); let package_data = crate::db::DatabaseModel::new() .package_data() @@ -71,11 +71,25 @@ pub async fn list(#[context] ctx: RpcContext) -> Result { - Some((id.clone(), installed.manifest.version.clone())) - } - _ => None, + .filter_map(|(id, pde)| { + serde_json::to_value(match pde { + PackageDataEntry::Installed { installed, .. } => { + json!({ "status":"installed","id": id.clone(), "version": installed.manifest.version.clone()}) + } + PackageDataEntry::Installing { manifest, install_progress, .. } => { + json!({ "status":"installing","id": id.clone(), "version": manifest.version.clone(), "progress": install_progress.clone()}) + } + PackageDataEntry::Updating { manifest, installed, install_progress, .. } => { + json!({ "status":"updating","id": id.clone(), "version": installed.manifest.version.clone(), "progress": install_progress.clone()}) + } + PackageDataEntry::Restoring { manifest, install_progress, .. } => { + json!({ "status":"restoring","id": id.clone(), "version": manifest.version.clone(), "progress": install_progress.clone()}) + } + PackageDataEntry::Removing { manifest, .. 
} => {
+                    json!({ "status":"removing", "id": id.clone(), "version": manifest.version.clone()})
+                }
+            })
+            .ok()
         })
         .collect())
 }
@@ -1248,7 +1262,6 @@ pub async fn install_s9pk(
         current_dependencies: current_dependencies.clone(),
         interface_addresses,
     };
-
     let prev = std::mem::replace(
         &mut *pde,
         PackageDataEntry::Installed {
@@ -1328,18 +1341,17 @@ pub async fn install_s9pk(
         false
     };
     if configured && manifest.config.is_some() {
-        crate::config::configure(
-            ctx,
-            &mut tx,
-            pkg_id,
-            None,
-            &None,
-            false,
-            &mut BTreeMap::new(),
-            &mut BTreeMap::new(),
-            &receipts.config,
-        )
-        .await?;
+        let breakages = BTreeMap::new();
+        let overrides = Default::default();
+
+        let configure_context = ConfigureContext {
+            breakages,
+            timeout: None,
+            config: None,
+            dry_run: false,
+            overrides,
+        };
+        crate::config::configure(&ctx, pkg_id, configure_context).await?;
     } else {
         add_dependent_to_current_dependents_lists(
             &mut tx,
diff --git a/backend/src/manager/health.rs b/backend/src/manager/health.rs
index 1e801a440..a70a19f37 100644
--- a/backend/src/manager/health.rs
+++ b/backend/src/manager/health.rs
@@ -1,7 +1,5 @@
 use std::collections::BTreeMap;
-use std::sync::atomic::{AtomicBool, Ordering};
-
-use itertools::Itertools;
 use patch_db::{DbHandle, LockReceipt, LockType};
 use tracing::instrument;
@@ -91,12 +89,12 @@ impl HealthCheckStatusReceipt {
     }
 }

+/// Runs a health check cycle for a service: executes all of its health checks and stores the results in the db
 #[instrument(skip_all)]
 pub async fn check(
     ctx: &RpcContext,
     db: &mut Db,
     id: &PackageId,
-    should_commit: &AtomicBool,
 ) -> Result<(), Error> {
     let mut tx = db.begin().await?;
     let (manifest, started) = {
@@ -115,40 +113,12 @@ pub async fn check(
         tracing::debug!("Checking health of {}", id);
         manifest
             .health_checks
-            .check_all(
-                ctx,
-                &manifest.containers,
-                started,
-                id,
-                &manifest.version,
-                &manifest.volumes,
-            )
+            .check_all(ctx, started, id, &manifest.version, &manifest.volumes)
             .await?
     } else {
         return Ok(());
     };

-    if !should_commit.load(Ordering::SeqCst) {
-        return Ok(());
-    }
-
-    if !health_results
-        .iter()
-        .any(|(_, res)| matches!(res, HealthCheckResult::Failure { .. }))
-    {
-        tracing::debug!("All health checks succeeded for {}", id);
-    } else {
-        tracing::debug!(
-            "Some health checks failed for {}: {}",
-            id,
-            health_results
-                .iter()
-                .filter(|(_, res)| matches!(res, HealthCheckResult::Failure { .. }))
-                .map(|(id, _)| &*id)
-                .join(", ")
-        );
-    }
-
     let current_dependents = {
         let mut checkpoint = tx.begin().await?;
         let receipts = HealthCheckStatusReceipt::new(&mut checkpoint, id).await?;
diff --git a/backend/src/manager/manager_container.rs b/backend/src/manager/manager_container.rs
new file mode 100644
index 000000000..198cf70af
--- /dev/null
+++ b/backend/src/manager/manager_container.rs
@@ -0,0 +1,343 @@
+use std::sync::Arc;
+use std::time::Duration;
+
+use futures::FutureExt;
+use patch_db::PatchDbHandle;
+use tokio::sync::watch;
+use tokio::sync::watch::Sender;
+use tracing::instrument;
+
+use super::start_stop::StartStop;
+use super::{manager_seed, run_main, ManagerPersistentContainer, RunMainResult};
+use crate::procedure::NoOutput;
+use crate::s9pk::manifest::Manifest;
+use crate::status::MainStatus;
+use crate::util::{GeneralBoxedGuard, NonDetachingJoinHandle};
+use crate::Error;
+
+pub type ManageContainerOverride = Arc<Sender<Option<MainStatus>>>;
+
+/// State machine actor for a service: tracks the service's current running
+/// state alongside its desired state.
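+///
+/// The `_service` task reconciles the current state toward the desired state,
+/// while the `_save_state` task mirrors the resulting status into the db.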
+pub struct ManageContainer {
+    pub(super) current_state: Arc<Sender<StartStop>>,
+    pub(super) desired_state: Arc<Sender<StartStop>>,
+    _service: NonDetachingJoinHandle<()>,
+    _save_state: NonDetachingJoinHandle<()>,
+    override_main_status: ManageContainerOverride,
+}
+
+impl ManageContainer {
+    pub async fn new(
+        seed: Arc<manager_seed::ManagerSeed>,
+        persistent_container: ManagerPersistentContainer,
+    ) -> Result<Self, Error> {
+        let mut db = seed.ctx.db.handle();
+        let current_state = Arc::new(watch::channel(StartStop::Stop).0);
+        let desired_state = Arc::new(
+            watch::channel::<StartStop>(get_status(&mut db, &seed.manifest).await.into()).0,
+        );
+        let override_main_status: ManageContainerOverride = Arc::new(watch::channel(None).0);
+        let service = tokio::spawn(create_service_manager(
+            desired_state.clone(),
+            seed.clone(),
+            current_state.clone(),
+            persistent_container,
+        ))
+        .into();
+        let save_state = tokio::spawn(save_state(
+            desired_state.clone(),
+            current_state.clone(),
+            override_main_status.clone(),
+            seed.clone(),
+        ))
+        .into();
+        Ok(ManageContainer {
+            current_state,
+            desired_state,
+            _service: service,
+            override_main_status,
+            _save_state: save_state,
+        })
+    }
+
+    /// Sets a status override, used during transitions like a restart, when the
+    /// displayed status should differ from the actual status of the service.
+    pub fn set_override(&self, override_status: Option<MainStatus>) -> GeneralBoxedGuard {
+        self.override_main_status
+            .send_modify(|x| *x = override_status);
+        let override_main_status = self.override_main_status.clone();
+        GeneralBoxedGuard::new(move || {
+            override_main_status.send_modify(|x| *x = None);
+        })
+    }
+
+    /// Set the override without a guard to revert it. Used only by the manager during shutdown.
+    pub(super) async fn lock_state_forever(&self, seed: &manager_seed::ManagerSeed) {
+        let mut db = seed.ctx.db.handle();
+        let current_state = get_status(&mut db, &seed.manifest).await;
+        self.override_main_status
+            .send_modify(|x| *x = Some(current_state));
+    }
+
+    /// Set the desired state of the service (e.g. start or stop).
+    pub fn to_desired(&self, new_state: StartStop) {
+        self.desired_state.send_modify(|x| *x = new_state);
+    }
+
+    /// Set the desired state and wait until the service actually reaches it.
+    pub async fn wait_for_desired(&self, new_state: StartStop) {
+        let mut current_state = self.current_state();
+        self.to_desired(new_state);
+        while *current_state.borrow() != new_state {
+            current_state.changed().await.unwrap_or_default();
+        }
+    }
+
+    /// Getter
+    pub fn current_state(&self) -> watch::Receiver<StartStop> {
+        self.current_state.subscribe()
+    }
+
+    /// Getter
+    pub fn desired_state(&self) -> watch::Receiver<StartStop> {
+        self.desired_state.subscribe()
+    }
+}
+
+async fn create_service_manager(
+    desired_state: Arc<Sender<StartStop>>,
+    seed: Arc<manager_seed::ManagerSeed>,
+    current_state: Arc<Sender<StartStop>>,
+    persistent_container: Arc<Option<PersistentContainer>>,
+) {
+    let mut desired_state_receiver = desired_state.subscribe();
+    let mut running_service: Option<NonDetachingJoinHandle<()>> = None;
+    let seed = seed.clone();
+    loop {
+        let current: StartStop = *current_state.borrow();
+        let desired: StartStop = *desired_state_receiver.borrow();
+        match (current, desired) {
+            (StartStop::Start, StartStop::Start) => (),
+            (StartStop::Start, StartStop::Stop) => {
+                if persistent_container.is_none() {
+                    if let Err(err) = seed.stop_container().await {
+                        tracing::error!("Could not stop container");
+                        tracing::debug!("{:?}", err)
+                    }
+                    running_service = None;
+                } else if let Some(current_service) = running_service.take() {
+                    tokio::select!
{ + _ = current_service => (), + _ = tokio::time::sleep(Duration::from_secs_f64(seed.manifest + .containers + .as_ref() + .and_then(|c| c.main.sigterm_timeout).map(|x| x.as_secs_f64()).unwrap_or_default())) => { + tracing::error!("Could not stop service"); + } + } + } + current_state.send_modify(|x| *x = StartStop::Stop); + } + (StartStop::Stop, StartStop::Start) => starting_service( + current_state.clone(), + desired_state.clone(), + seed.clone(), + persistent_container.clone(), + &mut running_service, + ), + (StartStop::Stop, StartStop::Stop) => (), + } + + if desired_state_receiver.changed().await.is_err() { + tracing::error!("Desired state error"); + break; + } + } +} + +async fn save_state( + desired_state: Arc>, + current_state: Arc>, + override_main_status: Arc>>, + seed: Arc, +) { + let mut desired_state_receiver = desired_state.subscribe(); + let mut current_state_receiver = current_state.subscribe(); + let mut override_main_status_receiver = override_main_status.subscribe(); + loop { + let current: StartStop = *current_state_receiver.borrow(); + let desired: StartStop = *desired_state_receiver.borrow(); + let override_status = override_main_status_receiver.borrow().clone(); + let mut db = seed.ctx.db.handle(); + let res = match (override_status, current, desired) { + (Some(status), _, _) => set_status(&mut db, &seed.manifest, &status).await, + (None, StartStop::Start, StartStop::Start) => { + set_status( + &mut db, + &seed.manifest, + &MainStatus::Running { + started: chrono::Utc::now(), + health: Default::default(), + }, + ) + .await + } + (None, StartStop::Start, StartStop::Stop) => { + set_status(&mut db, &seed.manifest, &MainStatus::Stopping).await + } + (None, StartStop::Stop, StartStop::Start) => { + set_status(&mut db, &seed.manifest, &MainStatus::Starting).await + } + (None, StartStop::Stop, StartStop::Stop) => { + set_status(&mut db, &seed.manifest, &MainStatus::Stopped).await + } + }; + if let Err(err) = res { + tracing::error!("Did not set status for {}", seed.container_name); + tracing::debug!("{:?}", err); + } + tokio::select! { + _ = desired_state_receiver.changed() =>{}, + _ = current_state_receiver.changed() => {}, + _ = override_main_status_receiver.changed() => {} + } + } +} + +fn starting_service( + current_state: Arc>, + desired_state: Arc>, + seed: Arc, + persistent_container: ManagerPersistentContainer, + running_service: &mut Option>, +) { + let set_running = { + let current_state = current_state.clone(); + Arc::new(move || { + current_state.send_modify(|x| *x = StartStop::Start); + }) + }; + let set_stopped = { move || current_state.send_modify(|x| *x = StartStop::Stop) }; + let running_main_loop = async move { + while desired_state.borrow().is_start() { + let result = run_main( + seed.clone(), + persistent_container.clone(), + set_running.clone(), + ) + .await; + set_stopped(); + run_main_log_result(result, seed.clone()).await; + } + }; + *running_service = Some(tokio::spawn(running_main_loop).into()); +} + +async fn run_main_log_result(result: RunMainResult, seed: Arc) { + match result { + Ok(Ok(NoOutput)) => (), // restart + Ok(Err(e)) => { + #[cfg(feature = "unstable")] + { + use crate::notifications::NotificationLevel; + let mut db = seed.ctx.db.handle(); + let started = crate::db::DatabaseModel::new() + .package_data() + .idx_model(&seed.manifest.id) + .and_then(|pde| pde.installed()) + .map::<_, MainStatus>(|i| i.status().main()) + .get(&mut db) + .await; + match started.as_deref() { + Ok(Some(MainStatus::Running { .. 
})) => { + let res = seed.ctx.notification_manager + .notify( + &mut db, + Some(seed.manifest.id.clone()), + NotificationLevel::Warning, + String::from("Service Crashed"), + format!("The service {} has crashed with the following exit code: {}\nDetails: {}", seed.manifest.id.clone(), e.0, e.1), + (), + Some(3600) // 1 hour + ) + .await; + if let Err(e) = res { + tracing::error!("Failed to issue notification: {}", e); + tracing::debug!("{:?}", e); + } + } + _ => { + tracing::error!("service just started. not issuing crash notification") + } + } + } + tracing::error!( + "The service {} has crashed with the following exit code: {}", + seed.manifest.id.clone(), + e.0 + ); + + tokio::time::sleep(Duration::from_secs(15)).await; + } + Err(e) => { + tracing::error!("failed to start service: {}", e); + tracing::debug!("{:?}", e); + } + } +} + +/// Used only in the mod where we are doing a backup +#[instrument(skip(db, manifest))] +pub(super) async fn get_status(db: &mut PatchDbHandle, manifest: &Manifest) -> MainStatus { + async move { + Ok::<_, Error>( + crate::db::DatabaseModel::new() + .package_data() + .idx_model(&manifest.id) + .expect(db) + .await? + .installed() + .expect(db) + .await? + .status() + .main() + .get(db) + .await? + .clone(), + ) + } + .map(|x| x.unwrap_or_else(|_| MainStatus::Stopped)) + .await +} + +#[instrument(skip(db, manifest))] +async fn set_status( + db: &mut PatchDbHandle, + manifest: &Manifest, + main_status: &MainStatus, +) -> Result<(), Error> { + if crate::db::DatabaseModel::new() + .package_data() + .idx_model(&manifest.id) + .expect(db) + .await? + .installed() + .exists(db) + .await? + { + crate::db::DatabaseModel::new() + .package_data() + .idx_model(&manifest.id) + .expect(db) + .await? + .installed() + .expect(db) + .await? + .status() + .main() + .put(db, main_status) + .await?; + } + Ok(()) +} diff --git a/backend/src/manager/manager_map.rs b/backend/src/manager/manager_map.rs new file mode 100644 index 000000000..0e4f39db7 --- /dev/null +++ b/backend/src/manager/manager_map.rs @@ -0,0 +1,111 @@ +use std::collections::BTreeMap; +use std::sync::Arc; + +use color_eyre::eyre::eyre; +use patch_db::DbHandle; +use sqlx::{Executor, Postgres}; +use tokio::sync::RwLock; +use tracing::instrument; + +use super::Manager; +use crate::context::RpcContext; +use crate::s9pk::manifest::{Manifest, PackageId}; +use crate::util::Version; +use crate::Error; + +/// This is the structure to contain all the service managers +#[derive(Default)] +pub struct ManagerMap(RwLock>>); +impl ManagerMap { + #[instrument(skip_all)] + pub async fn init( + &self, + ctx: &RpcContext, + db: &mut Db, + secrets: &mut Ex, + ) -> Result<(), Error> + where + for<'a> &'a mut Ex: Executor<'a, Database = Postgres>, + { + let mut res = BTreeMap::new(); + for package in crate::db::DatabaseModel::new() + .package_data() + .keys(db) + .await? + { + let man: Manifest = if let Some(manifest) = crate::db::DatabaseModel::new() + .package_data() + .idx_model(&package) + .and_then(|pkg| pkg.installed()) + .map(|m| m.manifest()) + .get(db) + .await? 
+                .to_owned()
+            {
+                manifest
+            } else {
+                continue;
+            };
+
+            res.insert(
+                (package, man.version.clone()),
+                Arc::new(Manager::new(ctx.clone(), man).await?),
+            );
+        }
+        *self.0.write().await = res;
+        Ok(())
+    }
+
+    /// Used during the install process
+    #[instrument(skip_all)]
+    pub async fn add(&self, ctx: RpcContext, manifest: Manifest) -> Result<(), Error> {
+        let mut lock = self.0.write().await;
+        let id = (manifest.id.clone(), manifest.version.clone());
+        if let Some(man) = lock.remove(&id) {
+            man.exit().await;
+        }
+        lock.insert(id, Arc::new(Manager::new(ctx, manifest).await?));
+        Ok(())
+    }
+
+    /// This is run during cleanup, i.e. when we are uninstalling the service
+    #[instrument(skip_all)]
+    pub async fn remove(&self, id: &(PackageId, Version)) {
+        if let Some(man) = self.0.write().await.remove(id) {
+            man.exit().await;
+        }
+    }
+
+    /// Used during a shutdown
+    #[instrument(skip_all)]
+    pub async fn empty(&self) -> Result<(), Error> {
+        let res =
+            futures::future::join_all(std::mem::take(&mut *self.0.write().await).into_iter().map(
+                |((id, version), man)| async move {
+                    tracing::debug!("Manager for {}@{} shutting down", id, version);
+                    man.shutdown().await;
+                    tracing::debug!("Manager for {}@{} is shutdown", id, version);
+                    if let Err(e) = Arc::try_unwrap(man) {
+                        tracing::trace!(
+                            "Manager for {}@{} still has {} other open references",
+                            id,
+                            version,
+                            Arc::strong_count(&e) - 1
+                        );
+                    }
+                    Ok::<_, Error>(())
+                },
+            ))
+            .await;
+        res.into_iter().fold(Ok(()), |res, x| match (res, x) {
+            (Ok(()), x) => x,
+            (Err(e), Ok(())) => Err(e),
+            (Err(e1), Err(e2)) => Err(Error::new(eyre!("{}, {}", e1.source, e2.source), e1.kind)),
+        })
+    }
+
+    #[instrument(skip_all)]
+    pub async fn get(&self, id: &(PackageId, Version)) -> Option<Arc<Manager>> {
+        self.0.read().await.get(id).cloned()
+    }
+}
diff --git a/backend/src/manager/manager_seed.rs b/backend/src/manager/manager_seed.rs
new file mode 100644
index 000000000..69ba94f40
--- /dev/null
+++ b/backend/src/manager/manager_seed.rs
@@ -0,0 +1,63 @@
+use bollard::container::{StopContainerOptions, WaitContainerOptions};
+use tokio_stream::StreamExt;
+
+use crate::context::RpcContext;
+use crate::s9pk::manifest::Manifest;
+use crate::Error;
+
+/// This is a helper structure for a service: the seed of the data that is needed for the manager_container
+pub struct ManagerSeed {
+    pub ctx: RpcContext,
+    pub manifest: Manifest,
+    pub container_name: String,
+}
+
+impl ManagerSeed {
+    pub async fn stop_container(&self) -> Result<(), Error> {
+        match self
+            .ctx
+            .docker
+            .stop_container(
+                &self.container_name,
+                Some(StopContainerOptions {
+                    t: self
+                        .manifest
+                        .containers
+                        .as_ref()
+                        .and_then(|c| c.main.sigterm_timeout)
+                        .map(|d| d.as_secs())
+                        .unwrap_or(30) as i64,
+                }),
+            )
+            .await
+        {
+            Err(bollard::errors::Error::DockerResponseServerError {
+                status_code: 404, // NOT FOUND
+                ..
+            })
+            | Err(bollard::errors::Error::DockerResponseServerError {
+                status_code: 409, // CONFLICT
+                ..
+            })
+            | Err(bollard::errors::Error::DockerResponseServerError {
+                status_code: 304, // NOT MODIFIED
+                ..
+ }) => (), // Already stopped + a => a?, + } + + // Wait for the container to stop + { + let mut waiting = self.ctx.docker.wait_container( + &self.container_name, + Some(WaitContainerOptions { + condition: "not-running", + }), + ); + while let Some(_) = waiting.next().await { + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + } + Ok(()) + } +} diff --git a/backend/src/manager/mod.rs b/backend/src/manager/mod.rs index 7a68bdf80..66bfafe1e 100644 --- a/backend/src/manager/mod.rs +++ b/backend/src/manager/mod.rs @@ -1,143 +1,652 @@ -use std::collections::BTreeMap; -use std::future::Future; +use std::collections::{BTreeMap, BTreeSet}; use std::net::Ipv4Addr; -use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::sync::Arc; use std::task::Poll; use std::time::Duration; -use bollard::container::{KillContainerOptions, StopContainerOptions}; use color_eyre::eyre::eyre; -use embassy_container_init::{ProcessGroupId, SignalGroupParams}; +use embassy_container_init::ProcessGroupId; +use futures::future::BoxFuture; +use futures::{Future, FutureExt, TryFutureExt}; use helpers::UnixRpcClient; +use models::{ErrorKind, PackageId}; use nix::sys::signal::Signal; use patch_db::DbHandle; -use sqlx::{Connection, Executor, Postgres}; -use tokio::sync::watch::error::RecvError; -use tokio::sync::watch::{channel, Receiver, Sender}; -use tokio::sync::{oneshot, Notify, RwLock}; +use persistent_container::PersistentContainer; +use rand::SeedableRng; +use sqlx::Connection; +use start_stop::StartStop; +use tokio::sync::oneshot; +use tokio::sync::{ + watch::{self, Sender}, + Mutex, +}; use tracing::instrument; +use transition_state::TransitionState; +use crate::backup::target::PackageBackupInfo; +use crate::backup::PackageBackupReport; +use crate::config::action::ConfigRes; +use crate::config::spec::ValueSpecPointer; +use crate::config::{not_found, ConfigReceipts, ConfigureContext}; use crate::context::RpcContext; -use crate::manager::sync::synchronizer; +use crate::db::model::{CurrentDependencies, CurrentDependencyInfo}; +use crate::dependencies::{ + add_dependent_to_current_dependents_lists, break_transitive, heal_all_dependents_transitive, + DependencyError, DependencyErrors, TaggedDependencyError, +}; +use crate::disk::mount::backup::BackupMountGuard; +use crate::disk::mount::guard::TmpMountGuard; +use crate::install::cleanup::remove_from_current_dependents_lists; use crate::net::net_controller::NetService; use crate::net::vhost::AlpnInfo; use crate::procedure::docker::{DockerContainer, DockerProcedure, LongRunning}; -#[cfg(feature = "js_engine")] -use crate::procedure::js_scripts::JsProcedure; -use crate::procedure::{NoOutput, PackageProcedure, ProcedureName}; -use crate::s9pk::manifest::{Manifest, PackageId}; -use crate::util::{ApplyRef, Container, NonDetachingJoinHandle, Version}; +use crate::procedure::{NoOutput, ProcedureName}; +use crate::s9pk::manifest::Manifest; +use crate::status::MainStatus; +use crate::util::NonDetachingJoinHandle; use crate::volume::Volume; use crate::Error; pub mod health; -mod sync; +mod manager_container; +mod manager_map; +pub mod manager_seed; +mod persistent_container; +mod start_stop; +mod transition_state; + +pub use manager_map::ManagerMap; + +use self::manager_container::{get_status, ManageContainer}; +use self::manager_seed::ManagerSeed; pub const HEALTH_CHECK_COOLDOWN_SECONDS: u64 = 15; pub const HEALTH_CHECK_GRACE_PERIOD_SECONDS: u64 = 5; -#[derive(Default)] -pub struct ManagerMap(RwLock>>); -impl ManagerMap { - #[instrument(skip_all)] - 
pub async fn init( - &self, - ctx: &RpcContext, - db: &mut Db, - secrets: &mut Ex, - ) -> Result<(), Error> - where - for<'a> &'a mut Ex: Executor<'a, Database = Postgres>, - { - let mut res = BTreeMap::new(); - for package in crate::db::DatabaseModel::new() - .package_data() - .keys(db) - .await? - { - let man: Manifest = if let Some(manifest) = crate::db::DatabaseModel::new() - .package_data() - .idx_model(&package) - .and_then(|pkg| pkg.installed()) - .map(|m| m.manifest()) - .get(db) - .await? - .to_owned() - { - manifest - } else { - continue; - }; +type ManagerPersistentContainer = Arc>; +type BackupGuard = Arc>>; +pub enum BackupReturn { + Error(Error), + AlreadyRunning(PackageBackupReport), + Ran { + report: PackageBackupReport, + res: Result, + }, +} - res.insert( - (package, man.version.clone()), - Arc::new(Manager::create(ctx.clone(), man).await?), - ); - } - *self.0.write().await = res; - Ok(()) - } +pub struct Gid { + next_gid: (watch::Sender, watch::Receiver), + main_gid: ( + watch::Sender, + watch::Receiver, + ), +} - #[instrument(skip_all)] - pub async fn add(&self, ctx: RpcContext, manifest: Manifest) -> Result<(), Error> { - let mut lock = self.0.write().await; - let id = (manifest.id.clone(), manifest.version.clone()); - if let Some(man) = lock.remove(&id) { - if !man.thread.is_empty().await { - man.exit().await?; - } - } - lock.insert(id, Arc::new(Manager::create(ctx, manifest).await?)); - Ok(()) - } - - #[instrument(skip_all)] - pub async fn remove(&self, id: &(PackageId, Version)) { - if let Some(man) = self.0.write().await.remove(id) { - if let Err(e) = man.exit().await { - tracing::error!("Error shutting down manager: {}", e); - tracing::debug!("{:?}", e); - } +impl Default for Gid { + fn default() -> Self { + Self { + next_gid: watch::channel(1), + main_gid: watch::channel(ProcessGroupId(1)), } } - - #[instrument(skip_all)] - pub async fn empty(&self) -> Result<(), Error> { - let res = - futures::future::join_all(std::mem::take(&mut *self.0.write().await).into_iter().map( - |((id, version), man)| async move { - tracing::debug!("Manager for {}@{} shutting down", id, version); - man.exit().await?; - tracing::debug!("Manager for {}@{} is shutdown", id, version); - if let Err(e) = Arc::try_unwrap(man) { - tracing::trace!( - "Manager for {}@{} still has {} other open references", - id, - version, - Arc::strong_count(&e) - 1 - ); - } - Ok::<_, Error>(()) - }, - )) - .await; - res.into_iter().fold(Ok(()), |res, x| match (res, x) { - (Ok(()), x) => x, - (Err(e), Ok(())) => Err(e), - (Err(e1), Err(e2)) => Err(Error::new(eyre!("{}, {}", e1.source, e2.source), e1.kind)), - }) +} +impl Gid { + pub fn new_gid(&self) -> ProcessGroupId { + let mut previous = 0; + self.next_gid.0.send_modify(|x| { + previous = *x; + *x = previous + 1; + }); + ProcessGroupId(previous) } - #[instrument(skip_all)] - pub async fn get(&self, id: &(PackageId, Version)) -> Option> { - self.0.read().await.get(id).cloned() + pub fn new_main_gid(&self) -> ProcessGroupId { + let gid = self.new_gid(); + self.main_gid.0.send(gid).unwrap_or_default(); + gid } } +/// This is the controller of the services. Here is where we can control a service with a start, stop, restart, etc. 
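+///
+/// Restarts, configuration changes, and backups are modeled as `TransitionState`s,
+/// so at most one such long-running transition is in flight at a time.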
+#[derive(Clone)]
 pub struct Manager {
-    shared: Arc<ManagerSharedState>,
-    thread: Container<Option<NonDetachingJoinHandle<()>>>,
+    seed: Arc<ManagerSeed>,
+
+    manage_container: Arc<ManageContainer>,
+    transition: Arc<Sender<Arc<TransitionState>>>,
+    persistent_container: ManagerPersistentContainer,
+
+    pub gid: Arc<Gid>,
+}
+impl Manager {
+    pub async fn new(ctx: RpcContext, manifest: Manifest) -> Result<Self, Error> {
+        let seed = Arc::new(ManagerSeed {
+            ctx,
+            container_name: DockerProcedure::container_name(&manifest.id, None),
+            manifest,
+        });
+
+        let persistent_container = Arc::new(PersistentContainer::init(&seed).await?);
+        let manage_container = Arc::new(
+            manager_container::ManageContainer::new(seed.clone(), persistent_container.clone())
+                .await?,
+        );
+        let (transition, _) = watch::channel(Default::default());
+        let transition = Arc::new(transition);
+        Ok(Self {
+            seed,
+            manage_container,
+            transition,
+            persistent_container,
+            gid: Default::default(),
+        })
+    }
+
+    pub fn start(&self) {
+        self._transition_abort();
+        self.manage_container.to_desired(StartStop::Start);
+    }
+    pub fn stop(&self) {
+        self._transition_abort();
+        self.manage_container.to_desired(StartStop::Stop);
+    }
+    pub async fn restart(&self) {
+        if self._is_transition_restart() {
+            return;
+        }
+        self._transition_replace(self._transition_restart());
+    }
+    pub async fn configure(
+        &self,
+        configure_context: ConfigureContext,
+    ) -> Result<BTreeMap<PackageId, TaggedDependencyError>, Error> {
+        if self._is_transition_configure() {
+            return Ok(configure_context.breakages);
+        }
+        let context = self.seed.ctx.clone();
+        let id = self.seed.manifest.id.clone();
+
+        let breakages = configure(context, id, configure_context).await?;
+        self._transition_replace({
+            let manage_container = self.manage_container.clone();
+            let state_reverter = DesiredStateReverter::new(manage_container.clone());
+
+            let transition = self.transition.clone();
+            TransitionState::Configuring(
+                tokio::spawn(async move {
+                    manage_container.wait_for_desired(StartStop::Stop).await;
+
+                    state_reverter.revert().await;
+                    transition.send_replace(Default::default());
+                })
+                .into(),
+            )
+        });
+        Ok(breakages)
+    }
+    pub async fn backup(&self, backup_guard: BackupGuard) -> BackupReturn {
+        if self._is_transition_backup() {
+            return BackupReturn::AlreadyRunning(PackageBackupReport {
+                error: Some("Can't do backup because service is in a backing up state".to_owned()),
+            });
+        }
+        let (transition_state, done) = self._transition_backup(backup_guard);
+        self._transition_replace(transition_state);
+        done.await
+    }
+    pub async fn exit(&self) {
+        self._transition_abort();
+        self.manage_container
+            .wait_for_desired(StartStop::Stop)
+            .await;
+    }
+
+    /// A special exit that locks the status override in place; it should only be
+    /// called during shutdown, when we are removing the containers
+    async fn shutdown(&self) {
+        self.manage_container.lock_state_forever(&self.seed).await;
+
+        self.exit().await;
+    }
+
+    /// Send a signal to the service's main process group
+    pub async fn signal(&self, signal: Signal) -> Result<(), Error> {
+        let gid = self.gid.clone();
+        send_signal(self, gid, signal).await
+    }
+
+    /// Getter for the persistent container's RPC client; also used by procedures
+    pub fn rpc_client(&self) -> Option<Arc<UnixRpcClient>> {
+        (*self.persistent_container)
+            .as_ref()
+            .map(|x| x.rpc_client())
+    }
+
+    fn _transition_abort(&self) {
+        if let Some(transition) = self
+            .transition
+            .send_replace(Default::default())
+            .join_handle()
+        {
+            transition.abort();
+        }
+    }
+    fn _transition_replace(&self, transition_state: TransitionState) {
+        self.transition
+            .send_replace(Arc::new(transition_state))
+            .abort();
+    }
+
+    pub(super) fn perform_restart(&self) -> impl Future<Output = ()> + 'static {
+        let
manage_container = self.manage_container.clone(); + async move { + let restart_override = manage_container.set_override(Some(MainStatus::Restarting)); + manage_container.wait_for_desired(StartStop::Stop).await; + manage_container.wait_for_desired(StartStop::Start).await; + drop(restart_override); + } + } + fn _transition_restart(&self) -> TransitionState { + let transition = self.transition.clone(); + let restart = self.perform_restart(); + TransitionState::Restarting( + tokio::spawn(async move { + restart.await; + transition.send_replace(Default::default()); + }) + .into(), + ) + } + fn perform_backup( + &self, + backup_guard: BackupGuard, + ) -> impl Future, Error>> + 'static { + let manage_container = self.manage_container.clone(); + let seed = self.seed.clone(); + async move { + let state_reverter = DesiredStateReverter::new(manage_container.clone()); + let mut tx = seed.ctx.db.handle(); + let override_guard = manage_container + .set_override(Some(get_status(&mut tx, &seed.manifest).await.backing_up())); + manage_container.wait_for_desired(StartStop::Stop).await; + let backup_guard = backup_guard.lock().await; + let guard = backup_guard.mount_package_backup(&seed.manifest.id).await?; + + let res = seed + .manifest + .backup + .create( + &seed.ctx, + &mut tx, + &seed.manifest.id, + &seed.manifest.title, + &seed.manifest.version, + &seed.manifest.interfaces, + &seed.manifest.volumes, + ) + .await; + guard.unmount().await?; + drop(backup_guard); + + let return_value = res; + state_reverter.revert().await; + drop(override_guard); + Ok::<_, Error>(return_value) + } + } + fn _transition_backup( + &self, + backup_guard: BackupGuard, + ) -> (TransitionState, BoxFuture) { + let (send, done) = oneshot::channel(); + ( + TransitionState::BackingUp( + tokio::spawn( + self.perform_backup(backup_guard) + .then(finish_up_backup_task(self.transition.clone(), send)), + ) + .into(), + ), + done.map_err(|err| Error::new(eyre!("Oneshot error: {err:?}"), ErrorKind::Unknown)) + .map(flatten_backup_error) + .boxed(), + ) + } + fn _is_transition_restart(&self) -> bool { + let transition = self.transition.borrow(); + matches!(**transition, TransitionState::Restarting(_)) + } + fn _is_transition_backup(&self) -> bool { + let transition = self.transition.borrow(); + matches!(**transition, TransitionState::BackingUp(_)) + } + fn _is_transition_configure(&self) -> bool { + let transition = self.transition.borrow(); + matches!(**transition, TransitionState::Configuring(_)) + } +} + +#[instrument(skip_all)] +async fn configure( + ctx: RpcContext, + id: PackageId, + mut configure_context: ConfigureContext, +) -> Result, Error> { + let mut db = ctx.db.handle(); + let mut tx = db.begin().await?; + let db = &mut tx; + + let receipts = ConfigReceipts::new(db).await?; + let id = &id; + let ctx = &ctx; + let overrides = &mut configure_context.overrides; + // fetch data from db + let action = receipts + .config_actions + .get(db, id) + .await? + .ok_or_else(|| not_found!(id))?; + let dependencies = receipts + .dependencies + .get(db, id) + .await? + .ok_or_else(|| not_found!(id))?; + let volumes = receipts + .volumes + .get(db, id) + .await? + .ok_or_else(|| not_found!(id))?; + let version = receipts + .version + .get(db, id) + .await? 
+ .ok_or_else(|| not_found!(id))?; + + // get current config and current spec + let ConfigRes { + config: old_config, + spec, + } = action.get(ctx, id, &version, &volumes).await?; + + // determine new config to use + let mut config = if let Some(config) = configure_context.config.or_else(|| old_config.clone()) { + config + } else { + spec.gen( + &mut rand::rngs::StdRng::from_entropy(), + &configure_context.timeout, + )? + }; + + let manifest = receipts + .manifest + .get(db, id) + .await? + .ok_or_else(|| not_found!(id))?; + + spec.validate(&manifest)?; + spec.matches(&config)?; // check that new config matches spec + spec.update( + ctx, + db, + &manifest, + overrides, + &mut config, + &receipts.config_receipts, + ) + .await?; // dereference pointers in the new config + + // create backreferences to pointers + let mut sys = receipts + .system_pointers + .get(db, id) + .await? + .ok_or_else(|| not_found!(id))?; + sys.truncate(0); + let mut current_dependencies: CurrentDependencies = CurrentDependencies( + dependencies + .0 + .iter() + .filter_map(|(id, info)| { + if info.requirement.required() { + Some((id.clone(), CurrentDependencyInfo::default())) + } else { + None + } + }) + .collect(), + ); + for ptr in spec.pointers(&config)? { + match ptr { + ValueSpecPointer::Package(pkg_ptr) => { + if let Some(current_dependency) = + current_dependencies.0.get_mut(pkg_ptr.package_id()) + { + current_dependency.pointers.push(pkg_ptr); + } else { + current_dependencies.0.insert( + pkg_ptr.package_id().to_owned(), + CurrentDependencyInfo { + pointers: vec![pkg_ptr], + health_checks: BTreeSet::new(), + }, + ); + } + } + ValueSpecPointer::System(s) => sys.push(s), + } + } + receipts.system_pointers.set(db, sys, id).await?; + + if !configure_context.dry_run { + // run config action + let res = action + .set(ctx, id, &version, &dependencies, &volumes, &config) + .await?; + + // track dependencies with no pointers + for (package_id, health_checks) in res.depends_on.into_iter() { + if let Some(current_dependency) = current_dependencies.0.get_mut(&package_id) { + current_dependency.health_checks.extend(health_checks); + } else { + current_dependencies.0.insert( + package_id, + CurrentDependencyInfo { + pointers: Vec::new(), + health_checks, + }, + ); + } + } + + // track dependency health checks + current_dependencies = current_dependencies.map(|x| { + x.into_iter() + .filter(|(dep_id, _)| { + if dep_id != id && !manifest.dependencies.0.contains_key(dep_id) { + tracing::warn!("Illegal dependency specified: {}", dep_id); + false + } else { + true + } + }) + .collect() + }); + } + + // update dependencies + let prev_current_dependencies = receipts + .current_dependencies + .get(db, id) + .await? + .unwrap_or_default(); + remove_from_current_dependents_lists( + db, + id, + &prev_current_dependencies, + &receipts.current_dependents, + ) + .await?; // remove previous + add_dependent_to_current_dependents_lists( + db, + id, + ¤t_dependencies, + &receipts.current_dependents, + ) + .await?; // add new + current_dependencies.0.remove(id); + receipts + .current_dependencies + .set(db, current_dependencies.clone(), id) + .await?; + + let errs = receipts + .dependency_errors + .get(db, id) + .await? 
+ .ok_or_else(|| not_found!(id))?; + tracing::warn!("Dependency Errors: {:?}", errs); + let errs = DependencyErrors::init( + ctx, + db, + &manifest, + ¤t_dependencies, + &receipts.dependency_receipt.try_heal, + ) + .await?; + receipts.dependency_errors.set(db, errs, id).await?; + + // cache current config for dependents + configure_context + .overrides + .insert(id.clone(), config.clone()); + + // handle dependents + let dependents = receipts + .current_dependents + .get(db, id) + .await? + .ok_or_else(|| not_found!(id))?; + for (dependent, _dep_info) in dependents.0.iter().filter(|(dep_id, _)| dep_id != &id) { + let dependent_container = receipts.docker_containers.get(db, dependent).await?; + let dependent_container = &dependent_container; + // check if config passes dependent check + if let Some(cfg) = receipts + .manifest_dependencies_config + .get(db, (dependent, id)) + .await? + { + let manifest = receipts + .manifest + .get(db, dependent) + .await? + .ok_or_else(|| not_found!(id))?; + if let Err(error) = cfg + .check( + ctx, + dependent_container, + dependent, + &manifest.version, + &manifest.volumes, + id, + &config, + ) + .await? + { + let dep_err = DependencyError::ConfigUnsatisfied { error }; + break_transitive( + db, + dependent, + id, + dep_err, + &mut configure_context.breakages, + &receipts.break_transitive_receipts, + ) + .await?; + } + + heal_all_dependents_transitive(ctx, db, id, &receipts.dependency_receipt).await?; + } + } + + receipts.configured.set(db, true, id).await?; + + if configure_context.dry_run { + tx.abort().await?; + } else { + tx.commit().await?; + } + + Ok(configure_context.breakages) +} + +struct DesiredStateReverter { + manage_container: Option>, + starting_state: StartStop, +} +impl DesiredStateReverter { + fn new(manage_container: Arc) -> Self { + let starting_state = *manage_container.desired_state().borrow(); + let manage_container = Some(manage_container); + Self { + starting_state, + manage_container, + } + } + async fn revert(mut self) { + if let Some(mut current_state) = self._revert() { + while *current_state.borrow() != self.starting_state { + current_state.changed().await.unwrap(); + } + } + } + fn _revert(&mut self) -> Option> { + if let Some(manage_container) = self.manage_container.take() { + manage_container.to_desired(self.starting_state); + + return Some(manage_container.desired_state()); + } + None + } +} +impl Drop for DesiredStateReverter { + fn drop(&mut self) { + self._revert(); + } +} + +type BackupDoneSender = oneshot::Sender>; + +fn finish_up_backup_task( + transition: Arc>>, + send: BackupDoneSender, +) -> impl FnOnce(Result, Error>) -> BoxFuture<'static, ()> { + move |result| { + async move { + transition.send_replace(Default::default()); + send.send(match result { + Ok(a) => a, + Err(e) => Err(e), + }) + .unwrap_or_default(); + } + .boxed() + } +} + +fn response_to_report(response: &Result) -> PackageBackupReport { + PackageBackupReport { + error: response.as_ref().err().map(|e| e.to_string()), + } +} +fn flatten_backup_error(input: Result, Error>) -> BackupReturn { + match input { + Ok(a) => BackupReturn::Ran { + report: response_to_report(&a), + res: a, + }, + Err(err) => BackupReturn::Error(err), + } } #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] @@ -149,25 +658,6 @@ pub enum Status { Shutdown, } -struct ManagerSeed { - ctx: RpcContext, - manifest: Manifest, - container_name: String, -} - -pub struct ManagerSharedState { - seed: Arc, - persistent_container: Option, - status: (Sender, Receiver), - 
killer: Notify, - on_stop: Sender, - synchronized: Notify, - synchronize_now: Notify, - commit_health_check_results: AtomicBool, - next_gid: AtomicU32, - main_gid: (Sender, Receiver), -} - #[derive(Debug, Clone, Copy)] pub enum OnStop { Restart, @@ -175,15 +665,17 @@ pub enum OnStop { Exit, } +type RunMainResult = Result, Error>; + #[instrument(skip_all)] async fn run_main( - state: &Arc, -) -> Result, Error> { - let rt_state = state.clone(); - - let mut runtime = NonDetachingJoinHandle::from(tokio::spawn(start_up_image(rt_state))); - let ip = match state.persistent_container.is_some() { - false => Some(match get_running_ip(state, &mut runtime).await { + seed: Arc, + persistent_container: ManagerPersistentContainer, + started: Arc, +) -> RunMainResult { + let mut runtime = NonDetachingJoinHandle::from(tokio::spawn(start_up_image(seed.clone()))); + let ip = match persistent_container.is_some() { + false => Some(match get_running_ip(&seed, &mut runtime).await { GetRunningIp::Ip(x) => x, GetRunningIp::Error(e) => return Err(e), GetRunningIp::EarlyExit(x) => return Ok(x), @@ -192,18 +684,17 @@ async fn run_main( }; let svc = if let Some(ip) = ip { - Some(add_network_for_main(&*state.seed, ip).await?) + let net = add_network_for_main(&seed, ip).await?; + started(); + Some(net) } else { None }; - set_commit_health_true(state); - let health = main_health_check_daemon(state.clone()); - fetch_starting_to_running(state); + let health = main_health_check_daemon(seed.clone()); let res = tokio::select! { a = runtime => a.map_err(|_| Error::new(eyre!("Manager runtime panicked!"), crate::ErrorKind::Docker)).and_then(|a| a), - _ = health => Err(Error::new(eyre!("Health check daemon exited!"), crate::ErrorKind::Unknown)), - _ = state.killer.notified() => Ok(Err((137, "Killed".to_string()))) + _ = health => Err(Error::new(eyre!("Health check daemon exited!"), crate::ErrorKind::Unknown)) }; if let Some(svc) = svc { remove_network_for_main(svc).await?; @@ -213,298 +704,21 @@ async fn run_main( /// We want to start up the manifest, but in this case we want to know that we have generated the certificates. 
/// Note for _generated_certificate: Needed to know that before we start the state we have generated the certificate -async fn start_up_image( - rt_state: Arc, -) -> Result, Error> { - rt_state - .seed - .manifest +async fn start_up_image(seed: Arc) -> Result, Error> { + seed.manifest .main .execute::<(), NoOutput>( - &rt_state.seed.ctx, - &rt_state.seed.manifest.id, - &rt_state.seed.manifest.version, + &seed.ctx, + &seed.manifest.id, + &seed.manifest.version, ProcedureName::Main, - &rt_state.seed.manifest.volumes, + &seed.manifest.volumes, None, None, ) .await } -impl Manager { - #[instrument(skip_all)] - async fn create(ctx: RpcContext, manifest: Manifest) -> Result { - let (on_stop, recv) = channel(OnStop::Sleep); - let seed = Arc::new(ManagerSeed { - ctx, - container_name: DockerProcedure::container_name(&manifest.id, None), - manifest, - }); - let persistent_container = PersistentContainer::init(&seed).await?; - let shared = Arc::new(ManagerSharedState { - seed, - persistent_container, - status: channel(Status::Stopped), - killer: Notify::new(), - on_stop, - synchronized: Notify::new(), - synchronize_now: Notify::new(), - commit_health_check_results: AtomicBool::new(true), - next_gid: AtomicU32::new(1), - main_gid: channel(ProcessGroupId(0)), - }); - shared.synchronize_now.notify_one(); - let thread_shared = shared.clone(); - let thread = NonDetachingJoinHandle::from(tokio::spawn(async move { - tokio::select! { - _ = manager_thread_loop(recv, &thread_shared) => (), - _ = synchronizer(&*thread_shared) => (), - } - })); - Ok(Manager { - shared, - thread: Container::new(Some(thread)), - }) - } - - pub async fn signal(&self, signal: &Signal) -> Result<(), Error> { - send_signal(&self.shared, signal).await - } - - #[instrument(skip_all)] - async fn exit(&self) -> Result<(), Error> { - self.shared - .commit_health_check_results - .store(false, Ordering::SeqCst); - let _ = self.shared.on_stop.send(OnStop::Exit); - - match self - .shared - .seed - .ctx - .docker - .stop_container( - &self.shared.seed.container_name, - Some(StopContainerOptions { - t: sigterm_timeout(&self.shared.seed.manifest) - .map(|d| d.as_secs()) - .unwrap_or(30) as i64, - }), - ) - .await - { - Err(bollard::errors::Error::DockerResponseServerError { - status_code: 404, // NOT FOUND - .. - }) - | Err(bollard::errors::Error::DockerResponseServerError { - status_code: 409, // CONFLICT - .. - }) - | Err(bollard::errors::Error::DockerResponseServerError { - status_code: 304, // NOT MODIFIED - .. - }) => (), // Already stopped - a => a?, - }; - self.shared.killer.notify_waiters(); - - if let Some(thread) = self.thread.take().await { - thread.await.map_err(|e| { - Error::new( - eyre!("Manager thread panicked: {}", e), - crate::ErrorKind::Docker, - ) - })?; - } - Ok(()) - } - /// this will depend on locks to main status. 
if you hold any locks when calling this function that conflict, this will deadlock - pub async fn synchronize(&self) { - self.shared.synchronize_now.notify_waiters(); - self.shared.synchronized.notified().await - } - - pub fn new_gid(&self) -> ProcessGroupId { - ProcessGroupId( - self.shared - .next_gid - .fetch_add(1, std::sync::atomic::Ordering::SeqCst), - ) - } - - pub fn new_main_gid(&self) -> ProcessGroupId { - let gid = self.new_gid(); - self.shared.main_gid.0.send_modify(|x| *x = gid); - gid - } - - pub fn rpc_client(&self) -> Option> { - self.shared - .persistent_container - .as_ref() - .map(|c| c.rpc_client.borrow().clone()) - } -} - -async fn manager_thread_loop(mut recv: Receiver, thread_shared: &Arc) { - loop { - fn handle_stop_action<'a>( - recv: &'a mut Receiver, - ) -> ( - OnStop, - Option> + 'a>, - ) { - let val = *recv.borrow_and_update(); - match val { - OnStop::Sleep => (OnStop::Sleep, Some(recv.changed())), - a => (a, None), - } - } - let (stop_action, fut) = handle_stop_action(&mut recv); - match stop_action { - OnStop::Sleep => { - if let Some(fut) = fut { - let _ = thread_shared.status.0.send(Status::Stopped); - fut.await.unwrap(); - continue; - } - } - OnStop::Exit => { - let _ = thread_shared.status.0.send(Status::Shutdown); - break; - } - OnStop::Restart => { - let _ = thread_shared.status.0.send(Status::Running); - } - } - match run_main(thread_shared).await { - Ok(Ok(NoOutput)) => (), // restart - Ok(Err(e)) => { - #[cfg(feature = "unstable")] - { - use crate::notifications::NotificationLevel; - use crate::status::MainStatus; - let mut db = thread_shared.seed.ctx.db.handle(); - let started = crate::db::DatabaseModel::new() - .package_data() - .idx_model(&thread_shared.seed.manifest.id) - .and_then(|pde| pde.installed()) - .map::<_, MainStatus>(|i| i.status().main()) - .get(&mut db, false) - .await; - match started.as_deref() { - Ok(Some(MainStatus::Running { .. })) => { - let res = thread_shared.seed.ctx.notification_manager - .notify( - &mut db, - Some(thread_shared.seed.manifest.id.clone()), - NotificationLevel::Warning, - String::from("Service Crashed"), - format!("The service {} has crashed with the following exit code: {}\nDetails: {}", thread_shared.seed.manifest.id.clone(), e.0, e.1), - (), - Some(3600) // 1 hour - ) - .await; - if let Err(e) = res { - tracing::error!("Failed to issue notification: {}", e); - tracing::debug!("{:?}", e); - } - } - _ => { - tracing::error!("service just started. 
not issuing crash notification") - } - } - } - tracing::error!("service crashed: {}: {}", e.0, e.1); - tokio::time::sleep(Duration::from_secs(15)).await; - } - Err(e) => { - tracing::error!("failed to start service: {}", e); - tracing::debug!("{:?}", e); - } - } - } -} - -pub struct PersistentContainer { - _running_docker: NonDetachingJoinHandle<()>, - rpc_client: Receiver>, -} - -impl PersistentContainer { - #[instrument(skip_all)] - async fn init(seed: &Arc) -> Result, Error> { - Ok(if let Some(containers) = &seed.manifest.containers { - let (running_docker, rpc_client) = - spawn_persistent_container(seed.clone(), containers.main.clone()).await?; - Some(Self { - _running_docker: running_docker, - rpc_client, - }) - } else { - None - }) - } -} - -async fn spawn_persistent_container( - seed: Arc, - container: DockerContainer, -) -> Result<(NonDetachingJoinHandle<()>, Receiver>), Error> { - let (send_inserter, inserter) = oneshot::channel(); - Ok(( - tokio::task::spawn(async move { - let mut inserter_send: Option>> = None; - let mut send_inserter: Option>>> = Some(send_inserter); - loop { - if let Err(e) = async { - let (mut runtime, inserter) = - long_running_docker(&seed, &container).await?; - - let ip = match get_long_running_ip(&*seed, &mut runtime).await { - GetRunningIp::Ip(x) => x, - GetRunningIp::Error(e) => return Err(e), - GetRunningIp::EarlyExit(e) => { - tracing::error!("Early Exit"); - tracing::debug!("{:?}", e); - return Ok(()); - } - }; - let svc = add_network_for_main(&*seed, ip).await?; - - if let Some(inserter_send) = inserter_send.as_mut() { - let _ = inserter_send.send(Arc::new(inserter)); - } else { - let (s, r) = channel(Arc::new(inserter)); - inserter_send = Some(s); - if let Some(send_inserter) = send_inserter.take() { - let _ = send_inserter.send(r); - } - } - - let res = tokio::select! 
{ - a = runtime.running_output => a.map_err(|_| Error::new(eyre!("Manager runtime panicked!"), crate::ErrorKind::Docker)).map(|_| ()), - }; - - remove_network_for_main(svc).await?; - - res - }.await { - tracing::error!("Error in persistent container: {}", e); - tracing::debug!("{:?}", e); - } else { - break; - } - tokio::time::sleep(Duration::from_millis(200)).await; - } - }) - .into(), - inserter.await.map_err(|_| Error::new(eyre!("Container handle dropped before inserter sent"), crate::ErrorKind::Unknown))?, - )) -} - async fn long_running_docker( seed: &ManagerSeed, container: &DockerContainer, @@ -519,47 +733,61 @@ async fn long_running_docker( .await } -async fn remove_network_for_main(svc: NetService) -> Result<(), Error> { - svc.remove_all().await +enum GetRunningIp { + Ip(Ipv4Addr), + Error(Error), + EarlyExit(Result), } -fn fetch_starting_to_running(state: &Arc) { - let _ = state.status.0.send_modify(|x| { - if *x == Status::Starting { - *x = Status::Running; - } - }); -} - -async fn main_health_check_daemon(state: Arc) { - tokio::time::sleep(Duration::from_secs(HEALTH_CHECK_GRACE_PERIOD_SECONDS)).await; +async fn get_long_running_ip(seed: &ManagerSeed, runtime: &mut LongRunning) -> GetRunningIp { loop { - let mut db = state.seed.ctx.db.handle(); - if let Err(e) = health::check( - &state.seed.ctx, - &mut db, - &state.seed.manifest.id, - &state.commit_health_check_results, - ) - .await - { - tracing::error!( - "Failed to run health check for {}: {}", - &state.seed.manifest.id, - e - ); - tracing::debug!("{:?}", e); + match container_inspect(seed).await { + Ok(res) => { + match res + .network_settings + .and_then(|ns| ns.networks) + .and_then(|mut n| n.remove("start9")) + .and_then(|es| es.ip_address) + .filter(|ip| !ip.is_empty()) + .map(|ip| ip.parse()) + .transpose() + { + Ok(Some(ip_addr)) => return GetRunningIp::Ip(ip_addr), + Ok(None) => (), + Err(e) => return GetRunningIp::Error(e.into()), + } + } + Err(bollard::errors::Error::DockerResponseServerError { + status_code: 404, // NOT FOUND + .. 
+ }) => (), + Err(e) => return GetRunningIp::Error(e.into()), + } + if let Poll::Ready(res) = futures::poll!(&mut runtime.running_output) { + match res { + Ok(_) => return GetRunningIp::EarlyExit(Ok(NoOutput)), + Err(_e) => { + return GetRunningIp::Error(Error::new( + eyre!("Manager runtime panicked!"), + crate::ErrorKind::Docker, + )) + } + } } - tokio::time::sleep(Duration::from_secs(HEALTH_CHECK_COOLDOWN_SECONDS)).await; } } -fn set_commit_health_true(state: &Arc) { - state - .commit_health_check_results - .store(true, Ordering::SeqCst); +#[instrument(skip(seed))] +async fn container_inspect( + seed: &ManagerSeed, +) -> Result { + seed.ctx + .docker + .inspect_container(&seed.container_name, None) + .await } +#[instrument(skip(seed))] async fn add_network_for_main( seed: &ManagerSeed, ip: std::net::Ipv4Addr, @@ -597,20 +825,33 @@ async fn add_network_for_main( Ok(svc) } -enum GetRunningIp { - Ip(Ipv4Addr), - Error(Error), - EarlyExit(Result), +#[instrument(skip(svc))] +async fn remove_network_for_main(svc: NetService) -> Result<(), Error> { + svc.remove_all().await +} + +async fn main_health_check_daemon(seed: Arc) { + tokio::time::sleep(Duration::from_secs(HEALTH_CHECK_GRACE_PERIOD_SECONDS)).await; + loop { + let mut db = seed.ctx.db.handle(); + if let Err(e) = health::check(&seed.ctx, &mut db, &seed.manifest.id).await { + tracing::error!( + "Failed to run health check for {}: {}", + &seed.manifest.id, + e + ); + tracing::debug!("{:?}", e); + } + tokio::time::sleep(Duration::from_secs(HEALTH_CHECK_COOLDOWN_SECONDS)).await; + } } type RuntimeOfCommand = NonDetachingJoinHandle, Error>>; -async fn get_running_ip( - state: &Arc, - mut runtime: &mut RuntimeOfCommand, -) -> GetRunningIp { +#[instrument(skip(seed, runtime))] +async fn get_running_ip(seed: &ManagerSeed, mut runtime: &mut RuntimeOfCommand) -> GetRunningIp { loop { - match container_inspect(&*state.seed).await { + match container_inspect(seed).await { Ok(res) => { match res .network_settings @@ -660,172 +901,29 @@ async fn get_running_ip( } } -async fn get_long_running_ip(seed: &ManagerSeed, runtime: &mut LongRunning) -> GetRunningIp { - loop { - match container_inspect(seed).await { - Ok(res) => { - match res - .network_settings - .and_then(|ns| ns.networks) - .and_then(|mut n| n.remove("start9")) - .and_then(|es| es.ip_address) - .filter(|ip| !ip.is_empty()) - .map(|ip| ip.parse()) - .transpose() - { - Ok(Some(ip_addr)) => return GetRunningIp::Ip(ip_addr), - Ok(None) => (), - Err(e) => return GetRunningIp::Error(e.into()), - } - } - Err(bollard::errors::Error::DockerResponseServerError { - status_code: 404, // NOT FOUND - .. 
- }) => (), - Err(e) => return GetRunningIp::Error(e.into()), - } - if let Poll::Ready(res) = futures::poll!(&mut runtime.running_output) { - match res { - Ok(_) => return GetRunningIp::EarlyExit(Ok(NoOutput)), - Err(_e) => { - return GetRunningIp::Error(Error::new( - eyre!("Manager runtime panicked!"), - crate::ErrorKind::Docker, - )) - } - } - } - } -} - -async fn container_inspect( - seed: &ManagerSeed, -) -> Result { - seed.ctx - .docker - .inspect_container(&seed.container_name, None) - .await -} - -async fn wait_for_status(shared: &ManagerSharedState, status: Status) { - let mut recv = shared.status.0.subscribe(); - while { - let s = *recv.borrow(); - s != status - } { - if recv.changed().await.is_ok() { - break; - } - } -} - -fn sigterm_timeout(manifest: &Manifest) -> Option { - if let PackageProcedure::Docker(d) = &manifest.main { - d.sigterm_timeout.map(|d| *d) - } else if let Some(c) = &manifest.containers { - c.main.sigterm_timeout.map(|d| *d) - } else { - None - } -} - -#[instrument(skip_all)] -async fn stop(shared: &ManagerSharedState) -> Result<(), Error> { - shared - .commit_health_check_results - .store(false, Ordering::SeqCst); - shared.on_stop.send_modify(|status| { - if matches!(*status, OnStop::Restart) { - *status = OnStop::Sleep; - } - }); - if *shared.status.1.borrow() == Status::Paused { - resume(shared).await?; - } - send_signal(shared, &Signal::SIGTERM).await?; - let _ = tokio::time::timeout( - sigterm_timeout(&shared.seed.manifest).unwrap_or(Duration::from_secs(30)), - wait_for_status(shared, Status::Stopped), - ) - .await; - shared.killer.notify_waiters(); - - Ok(()) -} - -#[instrument(skip_all)] -async fn start(shared: &ManagerSharedState) -> Result<(), Error> { - shared.on_stop.send_modify(|status| { - if matches!(*status, OnStop::Sleep) { - *status = OnStop::Restart; - } - }); - let _ = shared.status.0.send_modify(|x| { - if *x != Status::Running { - *x = Status::Starting - } - }); - Ok(()) -} - -#[instrument(skip_all)] -async fn pause(shared: &ManagerSharedState) -> Result<(), Error> { - if let Err(e) = shared - .seed - .ctx - .docker - .pause_container(&shared.seed.container_name) - .await - { - tracing::error!("failed to pause container. stopping instead. 
{}", e); - tracing::debug!("{:?}", e); - return stop(shared).await; - } - let _ = shared.status.0.send(Status::Paused); - Ok(()) -} - -#[instrument(skip_all)] -async fn resume(shared: &ManagerSharedState) -> Result<(), Error> { - shared - .seed - .ctx - .docker - .unpause_container(&shared.seed.container_name) - .await?; - let _ = shared.status.0.send(Status::Running); - Ok(()) -} - -async fn send_signal(shared: &ManagerSharedState, signal: &Signal) -> Result<(), Error> { +async fn send_signal(manager: &Manager, gid: Arc, signal: Signal) -> Result<(), Error> { // stop health checks from committing their results - shared - .commit_health_check_results - .store(false, Ordering::SeqCst); + // shared + // .commit_health_check_results + // .store(false, Ordering::SeqCst); - if let Some(rpc_client) = shared - .persistent_container - .as_ref() - .map(|c| c.rpc_client.borrow().clone()) - { + if let Some(rpc_client) = manager.rpc_client() { + let main_gid = *gid.main_gid.0.borrow(); + let next_gid = gid.new_gid(); #[cfg(feature = "js_engine")] - if let Err(e) = JsProcedure::default() + if let Err(e) = crate::procedure::js_scripts::JsProcedure::default() .execute::<_, NoOutput>( - &shared.seed.ctx.datadir, - &shared.seed.manifest.id, - &shared.seed.manifest.version, + &manager.seed.ctx.datadir, + &manager.seed.manifest.id, + &manager.seed.manifest.version, ProcedureName::Signal, - &shared.seed.manifest.volumes, - Some(SignalGroupParams { - gid: shared.main_gid.1.apply_ref(|g| *g.borrow()), - signal: *signal as u32, + &manager.seed.manifest.volumes, + Some(embassy_container_init::SignalGroupParams { + gid: main_gid, + signal: signal as u32, }), None, // TODO - ProcessGroupId( - shared - .next_gid - .fetch_add(1, std::sync::atomic::Ordering::SeqCst), - ), + next_gid, Some(rpc_client), ) .await? @@ -835,13 +933,13 @@ async fn send_signal(shared: &ManagerSharedState, signal: &Signal) -> Result<(), } } else { // send signal to container - shared + manager .seed .ctx .docker .kill_container( - &shared.seed.container_name, - Some(KillContainerOptions { + &manager.seed.container_name, + Some(bollard::container::KillContainerOptions { signal: signal.to_string(), }), ) diff --git a/backend/src/manager/persistent_container.rs b/backend/src/manager/persistent_container.rs new file mode 100644 index 000000000..d9868a622 --- /dev/null +++ b/backend/src/manager/persistent_container.rs @@ -0,0 +1,101 @@ +use std::sync::Arc; +use std::time::Duration; + +use color_eyre::eyre::eyre; +use helpers::UnixRpcClient; +use tokio::sync::oneshot; +use tokio::sync::watch::{self, Receiver}; +use tracing::instrument; + +use super::manager_seed::ManagerSeed; +use super::{ + add_network_for_main, get_long_running_ip, long_running_docker, remove_network_for_main, + GetRunningIp, +}; +use crate::procedure::docker::DockerContainer; +use crate::util::NonDetachingJoinHandle; +use crate::Error; + +/// Persistant container are the old containers that need to run all the time +/// The goal is that all services will be persistent containers, waiting to run the main system. 
+pub struct PersistentContainer { + _running_docker: NonDetachingJoinHandle<()>, + pub rpc_client: Receiver>, +} + +impl PersistentContainer { + #[instrument(skip_all)] + pub async fn init(seed: &Arc) -> Result, Error> { + Ok(if let Some(containers) = &seed.manifest.containers { + let (running_docker, rpc_client) = + spawn_persistent_container(seed.clone(), containers.main.clone()).await?; + Some(Self { + _running_docker: running_docker, + rpc_client, + }) + } else { + None + }) + } + + pub fn rpc_client(&self) -> Arc { + self.rpc_client.borrow().clone() + } +} + +pub async fn spawn_persistent_container( + seed: Arc, + container: DockerContainer, +) -> Result<(NonDetachingJoinHandle<()>, Receiver>), Error> { + let (send_inserter, inserter) = oneshot::channel(); + Ok(( + tokio::task::spawn(async move { + let mut inserter_send: Option>> = None; + let mut send_inserter: Option>>> = Some(send_inserter); + loop { + if let Err(e) = async { + let (mut runtime, inserter) = + long_running_docker(&seed, &container).await?; + + + let ip = match get_long_running_ip(&seed, &mut runtime).await { + GetRunningIp::Ip(x) => x, + GetRunningIp::Error(e) => return Err(e), + GetRunningIp::EarlyExit(e) => { + tracing::error!("Early Exit"); + tracing::debug!("{:?}", e); + return Ok(()); + } + }; + let svc = add_network_for_main(&seed, ip).await?; + + if let Some(inserter_send) = inserter_send.as_mut() { + let _ = inserter_send.send(Arc::new(inserter)); + } else { + let (s, r) = watch::channel(Arc::new(inserter)); + inserter_send = Some(s); + if let Some(send_inserter) = send_inserter.take() { + let _ = send_inserter.send(r); + } + } + + let res = tokio::select! { + a = runtime.running_output => a.map_err(|_| Error::new(eyre!("Manager runtime panicked!"), crate::ErrorKind::Docker)).map(|_| ()), + }; + + remove_network_for_main(svc).await?; + + res + }.await { + tracing::error!("Error in persistent container: {}", e); + tracing::debug!("{:?}", e); + } else { + break; + } + tokio::time::sleep(Duration::from_millis(200)).await; + } + }) + .into(), + inserter.await.map_err(|_| Error::new(eyre!("Container handle dropped before inserter sent"), crate::ErrorKind::Unknown))?, + )) +} diff --git a/backend/src/manager/start_stop.rs b/backend/src/manager/start_stop.rs new file mode 100644 index 000000000..66d0b112b --- /dev/null +++ b/backend/src/manager/start_stop.rs @@ -0,0 +1,26 @@ +use crate::status::MainStatus; + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum StartStop { + Start, + Stop, +} + +impl StartStop { + pub(crate) fn is_start(&self) -> bool { + matches!(self, StartStop::Start) + } +} +impl From for StartStop { + fn from(value: MainStatus) -> Self { + match value { + MainStatus::Stopped => StartStop::Stop, + MainStatus::Restarting => StartStop::Start, + MainStatus::Stopping => StartStop::Stop, + MainStatus::Starting => StartStop::Start, + MainStatus::Running { started, health } => StartStop::Start, + MainStatus::BackingUp { started, health } if started.is_some() => StartStop::Start, + MainStatus::BackingUp { started, health } => StartStop::Stop, + } + } +} diff --git a/backend/src/manager/sync.rs b/backend/src/manager/sync.rs deleted file mode 100644 index 41a6445c5..000000000 --- a/backend/src/manager/sync.rs +++ /dev/null @@ -1,113 +0,0 @@ -use std::collections::BTreeMap; -use std::time::Duration; - -use chrono::Utc; - -use super::{pause, resume, start, stop, ManagerSharedState, Status}; -use crate::status::MainStatus; -use crate::Error; - -/// Allocates a db handle. 
DO NOT CALL with a db handle already in scope -async fn synchronize_once(shared: &ManagerSharedState) -> Result { - let mut db = shared.seed.ctx.db.handle(); - let mut status = crate::db::DatabaseModel::new() - .package_data() - .idx_model(&shared.seed.manifest.id) - .expect(&mut db) - .await? - .installed() - .expect(&mut db) - .await? - .status() - .main() - .get_mut(&mut db) - .await?; - let manager_status = *shared.status.1.borrow(); - match manager_status { - Status::Stopped => match &mut *status { - MainStatus::Stopped => (), - MainStatus::Stopping => { - *status = MainStatus::Stopped; - } - MainStatus::Restarting => { - *status = MainStatus::Starting { restarting: true }; - } - MainStatus::Starting { .. } => { - start(shared).await?; - } - MainStatus::Running { started, .. } => { - *started = Utc::now(); - start(shared).await?; - } - MainStatus::BackingUp { .. } => (), - }, - Status::Starting => match *status { - MainStatus::Stopped | MainStatus::Stopping | MainStatus::Restarting => { - stop(shared).await?; - } - MainStatus::Starting { .. } | MainStatus::Running { .. } => (), - MainStatus::BackingUp { .. } => { - pause(shared).await?; - } - }, - Status::Running => match *status { - MainStatus::Stopped | MainStatus::Stopping | MainStatus::Restarting => { - stop(shared).await?; - } - MainStatus::Starting { .. } => { - *status = MainStatus::Running { - started: Utc::now(), - health: BTreeMap::new(), - }; - } - MainStatus::Running { .. } => (), - MainStatus::BackingUp { .. } => { - pause(shared).await?; - } - }, - Status::Paused => match *status { - MainStatus::Stopped | MainStatus::Stopping | MainStatus::Restarting => { - stop(shared).await?; - } - MainStatus::Starting { .. } | MainStatus::Running { .. } => { - resume(shared).await?; - } - MainStatus::BackingUp { .. } => (), - }, - Status::Shutdown => (), - } - status.save(&mut db).await?; - Ok(manager_status) -} - -pub async fn synchronizer(shared: &ManagerSharedState) { - let mut status_recv = shared.status.0.subscribe(); - loop { - tokio::select! 
{ - _ = tokio::time::sleep(Duration::from_secs(5)) => (), - _ = shared.synchronize_now.notified() => (), - _ = status_recv.changed() => (), - } - let status = match synchronize_once(shared).await { - Err(e) => { - tracing::error!( - "Synchronizer for {}@{} failed: {}", - shared.seed.manifest.id, - shared.seed.manifest.version, - e - ); - tracing::debug!("{:?}", e); - continue; - } - Ok(status) => status, - }; - tracing::trace!("{} status synchronized", shared.seed.manifest.id); - shared.synchronized.notify_waiters(); - match status { - Status::Shutdown => { - break; - } - _ => (), - } - } -} diff --git a/backend/src/manager/transition_state.rs b/backend/src/manager/transition_state.rs new file mode 100644 index 000000000..6826aef09 --- /dev/null +++ b/backend/src/manager/transition_state.rs @@ -0,0 +1,30 @@ +use helpers::NonDetachingJoinHandle; + +/// Used only in the manager/mod and is used to keep track of the state of the manager during the +/// transitional states +pub(super) enum TransitionState { + BackingUp(NonDetachingJoinHandle<()>), + Restarting(NonDetachingJoinHandle<()>), + Configuring(NonDetachingJoinHandle<()>), + None, +} + +impl TransitionState { + pub(super) fn join_handle(&self) -> Option<&NonDetachingJoinHandle<()>> { + Some(match self { + TransitionState::BackingUp(a) => a, + TransitionState::Restarting(a) => a, + TransitionState::Configuring(a) => a, + TransitionState::None => return None, + }) + } + pub(super) fn abort(&self) { + self.join_handle().map(|transition| transition.abort()); + } +} + +impl Default for TransitionState { + fn default() -> Self { + TransitionState::None + } +} diff --git a/backend/src/migration.rs b/backend/src/migration.rs index 56d3e50e0..3961907e7 100644 --- a/backend/src/migration.rs +++ b/backend/src/migration.rs @@ -34,7 +34,7 @@ impl Migrations { ) -> Result<(), Error> { for (version, migration) in &self.from { migration - .validate(container, eos_version, volumes, image_ids, true) + .validate(eos_version, volumes, image_ids, true) .with_ctx(|_| { ( crate::ErrorKind::ValidateS9pk, @@ -44,7 +44,7 @@ impl Migrations { } for (version, migration) in &self.to { migration - .validate(container, eos_version, volumes, image_ids, true) + .validate(eos_version, volumes, image_ids, true) .with_ctx(|_| { ( crate::ErrorKind::ValidateS9pk, diff --git a/backend/src/net/static_server.rs b/backend/src/net/static_server.rs index d191d7724..8b6869b2c 100644 --- a/backend/src/net/static_server.rs +++ b/backend/src/net/static_server.rs @@ -1,4 +1,3 @@ -use std::borrow::Cow; use std::fs::Metadata; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -10,7 +9,6 @@ use digest::Digest; use futures::FutureExt; use http::header::ACCEPT_ENCODING; use http::request::Parts as RequestParts; -use http::response::Builder; use hyper::{Body, Method, Request, Response, StatusCode}; use include_dir::{include_dir, Dir}; use new_mime_guess::MimeGuess; diff --git a/backend/src/net/utils.rs b/backend/src/net/utils.rs index 80bfa7ca0..e496bd1f7 100644 --- a/backend/src/net/utils.rs +++ b/backend/src/net/utils.rs @@ -152,7 +152,7 @@ impl hyper::server::accept::Accept for TcpListeners { type Error = std::io::Error; fn poll_accept( - mut self: std::pin::Pin<&mut Self>, + self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll>> { for listener in self.listeners.iter() { diff --git a/backend/src/procedure/docker.rs b/backend/src/procedure/docker.rs index 59fe84399..0f8a71dda 100644 --- a/backend/src/procedure/docker.rs +++ 
b/backend/src/procedure/docker.rs @@ -8,7 +8,6 @@ use std::time::Duration; use async_stream::stream; use bollard::container::RemoveContainerOptions; -use chrono::format::Item; use color_eyre::eyre::eyre; use color_eyre::Report; use futures::future::{BoxFuture, Either as EitherFuture}; @@ -199,7 +198,7 @@ impl DockerProcedure { image_ids: &BTreeSet, expected_io: bool, ) -> Result<(), color_eyre::eyre::Report> { - for (volume, _) in &self.mounts { + for volume in self.mounts.keys() { if !volumes.contains_key(volume) && !matches!(&volume, &VolumeId::Backup) { color_eyre::eyre::bail!("unknown volume: {}", volume); } @@ -229,7 +228,7 @@ impl DockerProcedure { timeout: Option, ) -> Result, Error> { let name = name.docker_name(); - let name: Option<&str> = name.as_ref().map(|x| &**x); + let name: Option<&str> = name.as_deref(); let mut cmd = tokio::process::Command::new("docker"); let container_name = Self::container_name(pkg_id, name); cmd.arg("run") @@ -408,8 +407,6 @@ impl DockerProcedure { input: Option, timeout: Option, ) -> Result, Error> { - let name = name.docker_name(); - let name: Option<&str> = name.as_deref(); let mut cmd = tokio::process::Command::new("docker"); cmd.arg("exec"); @@ -559,7 +556,7 @@ impl DockerProcedure { pkg_version: &Version, volumes: &Volumes, input: Option, - timeout: Option, + _timeout: Option, ) -> Result, Error> { let mut cmd = tokio::process::Command::new("docker"); cmd.arg("run").arg("--rm").arg("--network=none"); @@ -726,7 +723,7 @@ impl DockerProcedure { if fty.is_block_device() || fty.is_char_device() { res.push(entry.path()); } else if fty.is_dir() { - get_devices(&*entry.path(), res).await?; + get_devices(&entry.path(), res).await?; } } Ok(()) @@ -745,7 +742,7 @@ impl DockerProcedure { res.push(OsStr::new("--entrypoint").into()); res.push(OsStr::new(&self.entrypoint).into()); if self.system { - res.push(OsString::from(self.image.for_package(&*SYSTEM_PACKAGE_ID, None)).into()); + res.push(OsString::from(self.image.for_package(&SYSTEM_PACKAGE_ID, None)).into()); } else { res.push(OsString::from(self.image.for_package(pkg_id, Some(pkg_version))).into()); } @@ -833,7 +830,7 @@ impl LongRunning { .arg("'{{.Architecture}}'"); if docker.system { - cmd.arg(docker.image.for_package(&*SYSTEM_PACKAGE_ID, None)); + cmd.arg(docker.image.for_package(&SYSTEM_PACKAGE_ID, None)); } else { cmd.arg(docker.image.for_package(pkg_id, Some(pkg_version))); } @@ -855,7 +852,7 @@ impl LongRunning { input = socket_path.display() )) .arg("--name") - .arg(&container_name) + .arg(container_name) .arg(format!("--hostname={}", &container_name)) .arg("--entrypoint") .arg(format!("{INIT_EXEC}.{image_architecture}")) @@ -885,7 +882,7 @@ impl LongRunning { } cmd.arg("--log-driver=journald"); if docker.system { - cmd.arg(docker.image.for_package(&*SYSTEM_PACKAGE_ID, None)); + cmd.arg(docker.image.for_package(&SYSTEM_PACKAGE_ID, None)); } else { cmd.arg(docker.image.for_package(pkg_id, Some(pkg_version))); } diff --git a/backend/src/procedure/mod.rs b/backend/src/procedure/mod.rs index 1f7af9173..e75445fec 100644 --- a/backend/src/procedure/mod.rs +++ b/backend/src/procedure/mod.rs @@ -8,7 +8,7 @@ use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use tracing::instrument; -use self::docker::{DockerContainers, DockerProcedure}; +use self::docker::DockerProcedure; use crate::context::RpcContext; use crate::s9pk::manifest::PackageId; use crate::util::Version; @@ -43,7 +43,6 @@ impl PackageProcedure { #[instrument(skip_all)] pub fn validate( &self, - container: &Option, 
eos_version: &Version, volumes: &Volumes, image_ids: &BTreeSet, @@ -83,25 +82,21 @@ impl PackageProcedure { } #[cfg(feature = "js_engine")] PackageProcedure::Script(procedure) => { - let (gid, rpc_client) = match ctx + let man = ctx .managers .get(&(pkg_id.clone(), pkg_version.clone())) .await - { - None => { - return Err(Error::new( + .ok_or_else(|| { + Error::new( eyre!("No manager found for {}", pkg_id), ErrorKind::NotFound, - )) - } - Some(man) => ( - if matches!(name, ProcedureName::Main) { - man.new_main_gid() - } else { - man.new_gid() - }, - man.rpc_client(), - ), + ) + })?; + let rpc_client = man.rpc_client(); + let gid = if matches!(name, ProcedureName::Main) { + man.gid.new_main_gid() + } else { + man.gid.new_gid() }; procedure @@ -124,7 +119,6 @@ impl PackageProcedure { #[instrument(skip_all)] pub async fn sandboxed( &self, - container: &Option, ctx: &RpcContext, pkg_id: &PackageId, pkg_version: &Version, diff --git a/backend/src/s9pk/reader.rs b/backend/src/s9pk/reader.rs index 39e6bdf2d..98ccb8fc1 100644 --- a/backend/src/s9pk/reader.rs +++ b/backend/src/s9pk/reader.rs @@ -185,18 +185,14 @@ impl S9pkReader { .map(|i| i.validate(&man.id, &man.version).map(|_| i.image_id)) .collect::, _>>()?; man.description.validate()?; - man.actions - .0 - .iter() - .map(|(_, action)| { - action.validate( - containers, - &man.eos_version, - &man.volumes, - &validated_image_ids, - ) - }) - .collect::>()?; + man.actions.0.iter().try_for_each(|(_, action)| { + action.validate( + containers, + &man.eos_version, + &man.volumes, + &validated_image_ids, + ) + })?; man.backup.validate( containers, &man.eos_version, @@ -211,21 +207,11 @@ impl S9pkReader { &validated_image_ids, )?; } - man.health_checks.validate( - containers, - &man.eos_version, - &man.volumes, - &validated_image_ids, - )?; + man.health_checks + .validate(&man.eos_version, &man.volumes, &validated_image_ids)?; man.interfaces.validate()?; man.main - .validate( - containers, - &man.eos_version, - &man.volumes, - &validated_image_ids, - false, - ) + .validate(&man.eos_version, &man.volumes, &validated_image_ids, false) .with_ctx(|_| (crate::ErrorKind::ValidateS9pk, "Main"))?; man.migrations.validate( containers, @@ -273,13 +259,7 @@ impl S9pkReader { } if let Some(props) = &man.properties { props - .validate( - containers, - &man.eos_version, - &man.volumes, - &validated_image_ids, - true, - ) + .validate(&man.eos_version, &man.volumes, &validated_image_ids, true) .with_ctx(|_| (crate::ErrorKind::ValidateS9pk, "Properties"))?; } man.volumes.validate(&man.interfaces)?; @@ -387,7 +367,7 @@ impl S9pkReader { }) } - pub async fn manifest_raw<'a>(&'a mut self) -> Result, Error> { + pub async fn manifest_raw(&mut self) -> Result, Error> { self.read_handle(self.toc.manifest).await } @@ -397,27 +377,27 @@ impl S9pkReader { .with_ctx(|_| (crate::ErrorKind::ParseS9pk, "Deserializing Manifest (CBOR)")) } - pub async fn license<'a>(&'a mut self) -> Result, Error> { - Ok(self.read_handle(self.toc.license).await?) + pub async fn license(&mut self) -> Result, Error> { + self.read_handle(self.toc.license).await } - pub async fn instructions<'a>(&'a mut self) -> Result, Error> { - Ok(self.read_handle(self.toc.instructions).await?) + pub async fn instructions(&mut self) -> Result, Error> { + self.read_handle(self.toc.instructions).await } - pub async fn icon<'a>(&'a mut self) -> Result, Error> { - Ok(self.read_handle(self.toc.icon).await?) 
+ pub async fn icon(&mut self) -> Result, Error> { + self.read_handle(self.toc.icon).await } - pub async fn docker_images<'a>(&'a mut self) -> Result>, Error> { + pub async fn docker_images(&mut self) -> Result>, Error> { DockerReader::new(self.read_handle(self.toc.docker_images).await?).await } - pub async fn assets<'a>(&'a mut self) -> Result, Error> { - Ok(self.read_handle(self.toc.assets).await?) + pub async fn assets(&mut self) -> Result, Error> { + self.read_handle(self.toc.assets).await } - pub async fn scripts<'a>(&'a mut self) -> Result>, Error> { + pub async fn scripts(&mut self) -> Result>, Error> { Ok(match self.toc.scripts { None => None, Some(a) => Some(self.read_handle(a).await?), diff --git a/backend/src/setup.rs b/backend/src/setup.rs index ea825cf63..a404d9f1d 100644 --- a/backend/src/setup.rs +++ b/backend/src/setup.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use std::time::Duration; use color_eyre::eyre::eyre; -use futures::StreamExt; use josekit::jwk::Jwk; use openssl::x509::X509; use patch_db::DbHandle; diff --git a/backend/src/shutdown.rs b/backend/src/shutdown.rs index 47dc3c432..8c5c515cb 100644 --- a/backend/src/shutdown.rs +++ b/backend/src/shutdown.rs @@ -8,7 +8,7 @@ use crate::disk::main::export; use crate::init::{STANDBY_MODE_PATH, SYSTEM_REBUILD_PATH}; use crate::sound::SHUTDOWN; use crate::util::{display_none, Invoke}; -use crate::{Error, ErrorKind, OS_ARCH}; +use crate::{Error, OS_ARCH}; #[derive(Debug, Clone)] pub struct Shutdown { @@ -72,18 +72,16 @@ impl Shutdown { Command::new("sync").spawn().unwrap().wait().unwrap(); } Command::new("reboot").spawn().unwrap().wait().unwrap(); + } else if self.restart { + Command::new("reboot").spawn().unwrap().wait().unwrap(); } else { - if self.restart { - Command::new("reboot").spawn().unwrap().wait().unwrap(); - } else { - Command::new("shutdown") - .arg("-h") - .arg("now") - .spawn() - .unwrap() - .wait() - .unwrap(); - } + Command::new("shutdown") + .arg("-h") + .arg("now") + .spawn() + .unwrap() + .wait() + .unwrap(); } } } diff --git a/backend/src/ssh.rs b/backend/src/ssh.rs index 486a75097..697e05727 100644 --- a/backend/src/ssh.rs +++ b/backend/src/ssh.rs @@ -4,8 +4,7 @@ use chrono::Utc; use clap::ArgMatches; use color_eyre::eyre::eyre; use rpc_toolkit::command; -use sqlx::{Executor, Pool, Postgres}; -use ssh_key::private::Ed25519PrivateKey; +use sqlx::{Pool, Postgres}; use tracing::instrument; use crate::context::RpcContext; diff --git a/backend/src/status/health_check.rs b/backend/src/status/health_check.rs index e7b412e8a..64155b0ce 100644 --- a/backend/src/status/health_check.rs +++ b/backend/src/status/health_check.rs @@ -7,7 +7,6 @@ use serde::{Deserialize, Serialize}; use tracing::instrument; use crate::context::RpcContext; -use crate::procedure::docker::DockerContainers; use crate::procedure::{NoOutput, PackageProcedure, ProcedureName}; use crate::s9pk::manifest::PackageId; use crate::util::serde::Duration; @@ -21,15 +20,14 @@ impl HealthChecks { #[instrument(skip_all)] pub fn validate( &self, - container: &Option, eos_version: &Version, volumes: &Volumes, image_ids: &BTreeSet, ) -> Result<(), Error> { - for (_, check) in &self.0 { + for check in self.0.values() { check .implementation - .validate(container, eos_version, &volumes, image_ids, false) + .validate(eos_version, volumes, image_ids, false) .with_ctx(|_| { ( crate::ErrorKind::ValidateS9pk, @@ -42,7 +40,6 @@ impl HealthChecks { pub async fn check_all( &self, ctx: &RpcContext, - container: &Option, started: DateTime, pkg_id: &PackageId, pkg_version: 
&Version, @@ -52,7 +49,7 @@ impl HealthChecks { Ok::<_, Error>(( id.clone(), check - .check(ctx, container, id, started, pkg_id, pkg_version, volumes) + .check(ctx, id, started, pkg_id, pkg_version, volumes) .await?, )) })) @@ -75,7 +72,6 @@ impl HealthCheck { pub async fn check( &self, ctx: &RpcContext, - container: &Option, id: &HealthCheckId, started: DateTime, pkg_id: &PackageId, diff --git a/backend/src/status/mod.rs b/backend/src/status/mod.rs index d39737fb3..eba367580 100644 --- a/backend/src/status/mod.rs +++ b/backend/src/status/mod.rs @@ -26,9 +26,7 @@ pub enum MainStatus { Stopped, Restarting, Stopping, - Starting { - restarting: bool, - }, + Starting, Running { started: DateTime, health: BTreeMap, @@ -73,6 +71,17 @@ impl MainStatus { MainStatus::Starting { .. } => None, } } + pub fn backing_up(&self) -> Self { + let (started, health) = match self { + MainStatus::Starting { .. } => (Some(Utc::now()), Default::default()), + MainStatus::Running { started, health } => (Some(started.clone()), health.clone()), + MainStatus::Stopped | MainStatus::Stopping | MainStatus::Restarting => { + (None, Default::default()) + } + MainStatus::BackingUp { .. } => return self.clone(), + }; + MainStatus::BackingUp { started, health } + } } impl MainStatusModel { pub fn started(self) -> Model>> { diff --git a/backend/src/update/mod.rs b/backend/src/update/mod.rs index 90ac2870c..3322c403e 100644 --- a/backend/src/update/mod.rs +++ b/backend/src/update/mod.rs @@ -26,7 +26,6 @@ use crate::sound::{ }; use crate::update::latest_information::LatestInformation; use crate::util::Invoke; -use crate::version::{Current, VersionT}; use crate::{Error, ErrorKind, ResultExt, OS_ARCH}; mod latest_information; diff --git a/backend/src/util/io.rs b/backend/src/util/io.rs index ad831d14f..a2fa84342 100644 --- a/backend/src/util/io.rs +++ b/backend/src/util/io.rs @@ -522,7 +522,6 @@ pub fn dir_copy<'a, P0: AsRef + 'a + Send + Sync, P1: AsRef + 'a + S let src_path = e.path(); let dst_path = dst_path.join(e.file_name()); if m.is_file() { - let len = m.len(); let mut dst_file = tokio::fs::File::create(&dst_path).await.with_ctx(|_| { ( crate::ErrorKind::Filesystem, @@ -638,7 +637,7 @@ impl AsyncWrite for TimeoutStream { cx: &mut std::task::Context<'_>, buf: &[u8], ) -> std::task::Poll> { - let mut this = self.project(); + let this = self.project(); let res = this.stream.poll_write(cx, buf); if res.is_ready() { this.sleep.reset(Instant::now() + *this.timeout); @@ -649,7 +648,7 @@ impl AsyncWrite for TimeoutStream { self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - let mut this = self.project(); + let this = self.project(); let res = this.stream.poll_flush(cx); if res.is_ready() { this.sleep.reset(Instant::now() + *this.timeout); @@ -660,7 +659,7 @@ impl AsyncWrite for TimeoutStream { self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - let mut this = self.project(); + let this = self.project(); let res = this.stream.poll_shutdown(cx); if res.is_ready() { this.sleep.reset(Instant::now() + *this.timeout); diff --git a/backend/src/util/mod.rs b/backend/src/util/mod.rs index dfc1deea2..0c4d66c20 100644 --- a/backend/src/util/mod.rs +++ b/backend/src/util/mod.rs @@ -170,9 +170,7 @@ impl std::io::Write for FmtWriter { } } -pub fn display_none(_: T, _: &ArgMatches) { - () -} +pub fn display_none(_: T, _: &ArgMatches) {} pub struct Container(RwLock>); impl Container { @@ -256,6 +254,29 @@ where } } +pub struct GeneralBoxedGuard(Option>); +impl 
GeneralBoxedGuard { + pub fn new(f: impl FnOnce() + 'static + Send + Sync) -> Self { + GeneralBoxedGuard(Some(Box::new(f))) + } + + pub fn drop(mut self) { + self.0.take().unwrap()() + } + + pub fn drop_without_action(mut self) { + self.0 = None; + } +} + +impl Drop for GeneralBoxedGuard { + fn drop(&mut self) { + if let Some(destroy) = self.0.take() { + destroy(); + } + } +} + pub struct GeneralGuard T, T = ()>(Option); impl T, T> GeneralGuard { pub fn new(f: F) -> Self { diff --git a/libs/embassy_container_init/src/lib.rs b/libs/embassy_container_init/src/lib.rs index d714b8e82..63d3380a7 100644 --- a/libs/embassy_container_init/src/lib.rs +++ b/libs/embassy_container_init/src/lib.rs @@ -212,24 +212,3 @@ impl RpcMethod for SignalGroup { "signal-group" } } - -#[test] -fn example_echo_line() { - let input = r#"{"id":0,"jsonrpc":"2.0","method":"command","params":{"command":"echo","args":["world I am here"]}}"#; - let new_input = JsonRpc::::maybe_parse(input); - assert!(new_input.is_some()); - assert_eq!(input, &serde_json::to_string(&new_input.unwrap()).unwrap()); -} - -#[test] -fn example_input_line() { - let output = JsonRpc::new(RpcId::UInt(0), Output::Line("world I am here".to_string())); - let output_str = output.maybe_serialize(); - assert!(output_str.is_some()); - let output_str = output_str.unwrap(); - assert_eq!( - &output_str, - r#"{"id":0,"jsonrpc":"2.0","method":"line","params":"world I am here"}"# - ); - assert_eq!(output, serde_json::from_str(&output_str).unwrap()); -} diff --git a/libs/embassy_container_init/src/main.rs b/libs/embassy_container_init/src/main.rs index 17db5d74c..705fa085f 100644 --- a/libs/embassy_container_init/src/main.rs +++ b/libs/embassy_container_init/src/main.rs @@ -5,8 +5,7 @@ use std::process::Stdio; use std::sync::Arc; use embassy_container_init::{ - LogParams, OutputParams, OutputStrategy, ProcessGroupId, ProcessId, ReadLineStderrParams, - ReadLineStdoutParams, RunCommandParams, SendSignalParams, SignalGroupParams, + LogParams, OutputParams, OutputStrategy, ProcessGroupId, ProcessId, RunCommandParams, SendSignalParams, SignalGroupParams, }; use futures::StreamExt; use helpers::NonDetachingJoinHandle; @@ -15,7 +14,7 @@ use nix::sys::signal::Signal; use serde::{Deserialize, Serialize}; use serde_json::json; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; -use tokio::process::{Child, ChildStderr, ChildStdout, Command}; +use tokio::process::{Child, Command}; use tokio::select; use tokio::sync::{watch, Mutex}; use yajrc::{Id, RpcError}; diff --git a/libs/helpers/src/lib.rs b/libs/helpers/src/lib.rs index f20cd400f..d5893b0a1 100644 --- a/libs/helpers/src/lib.rs +++ b/libs/helpers/src/lib.rs @@ -1,6 +1,9 @@ -use std::future::Future; use std::path::{Path, PathBuf}; use std::time::Duration; +use std::{ + future::Future, + ops::{Deref, DerefMut}, +}; use color_eyre::eyre::{eyre, Context, Error}; use futures::future::BoxFuture; @@ -74,6 +77,18 @@ impl From> for NonDetachingJoinHandle { NonDetachingJoinHandle(t) } } + +impl Deref for NonDetachingJoinHandle { + type Target = JoinHandle; + fn deref(&self) -> &Self::Target { + &self.0 + } +} +impl DerefMut for NonDetachingJoinHandle { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} #[pin_project::pinned_drop] impl PinnedDrop for NonDetachingJoinHandle { fn drop(self: std::pin::Pin<&mut Self>) { diff --git a/libs/js_engine/src/lib.rs b/libs/js_engine/src/lib.rs index 2758c0633..d3af82651 100644 --- a/libs/js_engine/src/lib.rs +++ b/libs/js_engine/src/lib.rs @@ -145,17 +145,17 @@ 
impl ModuleLoader for ModsLoader { "file:///deno_global.js" => Ok(ModuleSource::new( ModuleType::JavaScript, FastString::Static("const old_deno = Deno; Deno = null; export default old_deno"), - &*DENO_GLOBAL_JS, + &DENO_GLOBAL_JS, )), "file:///loadModule.js" => Ok(ModuleSource::new( ModuleType::JavaScript, FastString::Static(include_str!("./artifacts/loadModule.js")), - &*LOAD_MODULE_JS, + &LOAD_MODULE_JS, )), "file:///embassy.js" => Ok(ModuleSource::new( ModuleType::JavaScript, self.code.0.clone().into(), - &*EMBASSY_JS, + &EMBASSY_JS, )), x => Err(anyhow!("Not allowed to import: {}", x)), @@ -340,7 +340,7 @@ impl JsExecutionEnvironment { .ops(Self::declarations()) .state(move |state| { state.put(ext_answer_state.clone()); - state.put(js_ctx.clone()); + state.put(js_ctx); }) .build(); @@ -397,7 +397,7 @@ mod fns { }; use helpers::{to_tmp_path, AtomicFile, Rsync, RsyncOptions}; use itertools::Itertools; - use models::{ErrorKind, VolumeId}; + use models::VolumeId; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use tokio::io::AsyncWriteExt; @@ -578,7 +578,7 @@ mod fns { } let path_in = path_in.strip_prefix("/").unwrap_or(&path_in); - let new_file = volume_path.join(&path_in); + let new_file = volume_path.join(path_in); let parent_new_file = new_file .parent() .ok_or_else(|| anyhow!("Expecting that file is not root"))?; @@ -706,7 +706,7 @@ mod fns { volume_path.to_string_lossy(), ); } - if let Err(_) = tokio::fs::metadata(&src).await { + if tokio::fs::metadata(&src).await.is_err() { bail!("Source at {} does not exists", src.to_string_lossy()); } @@ -870,11 +870,9 @@ mod fns { let volume_path = { let state = state.borrow(); let ctx: &JsContext = state.borrow(); - let volume_path = ctx - .volumes + ctx.volumes .path_for(&ctx.datadir, &ctx.package_id, &ctx.version, &volume_id) - .ok_or_else(|| anyhow!("There is no {} in volumes", volume_id))?; - volume_path + .ok_or_else(|| anyhow!("There is no {} in volumes", volume_id))? }; let path_in = path_in.strip_prefix("/").unwrap_or(&path_in); let new_file = volume_path.join(path_in); @@ -945,8 +943,7 @@ mod fns { .stdout, )? .lines() - .skip(1) - .next() + .nth(1) .unwrap_or_default() .parse()?; let used = String::from_utf8( @@ -976,8 +973,7 @@ mod fns { .stdout, )? 
.lines() - .skip(1) - .next() + .nth(1) .unwrap_or_default() .split_ascii_whitespace() .next_tuple() @@ -994,8 +990,10 @@ mod fns { #[op] async fn log_trace(state: Rc>, input: String) -> Result<(), AnyError> { - let state = state.borrow(); - let ctx = state.borrow::().clone(); + let ctx = { + let state = state.borrow(); + state.borrow::().clone() + }; if let Some(rpc_client) = ctx.container_rpc_client { return rpc_client .request( @@ -1018,8 +1016,10 @@ mod fns { } #[op] async fn log_warn(state: Rc>, input: String) -> Result<(), AnyError> { - let state = state.borrow(); - let ctx = state.borrow::().clone(); + let ctx = { + let state = state.borrow(); + state.borrow::().clone() + }; if let Some(rpc_client) = ctx.container_rpc_client { return rpc_client .request( @@ -1042,8 +1042,10 @@ mod fns { } #[op] async fn log_error(state: Rc>, input: String) -> Result<(), AnyError> { - let state = state.borrow(); - let ctx = state.borrow::().clone(); + let ctx = { + let state = state.borrow(); + state.borrow::().clone() + }; if let Some(rpc_client) = ctx.container_rpc_client { return rpc_client .request( @@ -1066,8 +1068,10 @@ mod fns { } #[op] async fn log_debug(state: Rc>, input: String) -> Result<(), AnyError> { - let state = state.borrow(); - let ctx = state.borrow::().clone(); + let ctx = { + let state = state.borrow(); + state.borrow::().clone() + }; if let Some(rpc_client) = ctx.container_rpc_client { return rpc_client .request( @@ -1090,14 +1094,22 @@ mod fns { } #[op] async fn log_info(state: Rc>, input: String) -> Result<(), AnyError> { - let state = state.borrow(); - let ctx = state.borrow::().clone(); - if let Some(rpc_client) = ctx.container_rpc_client { + let (container_rpc_client, container_process_gid, package_id, run_function) = { + let state = state.borrow(); + let ctx: JsContext = state.borrow::().clone(); + ( + ctx.container_rpc_client, + ctx.container_process_gid, + ctx.package_id, + ctx.run_function, + ) + }; + if let Some(rpc_client) = container_rpc_client { return rpc_client .request( embassy_container_init::Log, embassy_container_init::LogParams { - gid: Some(ctx.container_process_gid), + gid: Some(container_process_gid), level: embassy_container_init::LogLevel::Info(input), }, ) @@ -1105,8 +1117,8 @@ mod fns { .map_err(|e| anyhow!("{}: {:?}", e.message, e.data)); } tracing::info!( - package_id = tracing::field::display(&ctx.package_id), - run_function = tracing::field::display(&ctx.run_function), + package_id = tracing::field::display(&package_id), + run_function = tracing::field::display(&run_function), "{}", input ); diff --git a/system-images/compat/src/config/mod.rs b/system-images/compat/src/config/mod.rs index b80af0cf1..89e4b9282 100644 --- a/system-images/compat/src/config/mod.rs +++ b/system-images/compat/src/config/mod.rs @@ -60,11 +60,7 @@ pub fn validate_configuration( )?; std::fs::rename(config_path.with_extension("tmp"), config_path)?; // return set result - Ok(SetResult { - depends_on, - // sending sigterm so service is restarted - in 0.3.x services, this is whatever signal is needed to send to the process to pick up the configuration - signal: Some(nix::sys::signal::SIGTERM), - }) + Ok(SetResult { depends_on }) } Err(e) => Err(anyhow!("{}", e)), }
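For reference, the recurring patterns this patch introduces condense into a few standalone sketches below. Each is a simplified stand-in written against plain tokio/futures, not code from the tree. First, the watch-channel `Gid` allocator that replaces the old `next_gid: AtomicU32` / `main_gid` pair on `ManagerSharedState`; this mirrors the `Gid` type in manager/mod.rs above, with `ProcessGroupId` assumed to be the plain `u32` newtype from embassy_container_init:

```rust
use tokio::sync::watch;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ProcessGroupId(pub u32);

pub struct Gid {
    next_gid: (watch::Sender<u32>, watch::Receiver<u32>),
    main_gid: (watch::Sender<ProcessGroupId>, watch::Receiver<ProcessGroupId>),
}

impl Default for Gid {
    fn default() -> Self {
        Self {
            next_gid: watch::channel(1),
            main_gid: watch::channel(ProcessGroupId(1)),
        }
    }
}

impl Gid {
    pub fn new_gid(&self) -> ProcessGroupId {
        let mut previous = 0;
        // send_modify mutates the watched value in place and notifies subscribers,
        // so allocation needs only &self, like the old fetch_add
        self.next_gid.0.send_modify(|x| {
            previous = *x;
            *x = previous + 1;
        });
        ProcessGroupId(previous)
    }

    pub fn new_main_gid(&self) -> ProcessGroupId {
        let gid = self.new_gid();
        // send only fails when every receiver is gone; we hold one, so ignore it
        self.main_gid.0.send(gid).unwrap_or_default();
        gid
    }
}

#[tokio::main]
async fn main() {
    let gid = Gid::default();
    assert_eq!(gid.new_gid(), ProcessGroupId(1));
    assert_eq!(gid.new_main_gid(), ProcessGroupId(2));
    // the signal path reads the current main gid from the receiver half
    assert_eq!(*gid.main_gid.1.borrow(), ProcessGroupId(2));
}
```

Compared to the atomic counter, the watch pair lets the signal path observe the current main gid, and any later changes, through cheap receiver clones.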
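The transition machinery in transition_state.rs plus `Manager::_transition_replace` reduces to "store the in-flight transition in a watch channel; installing a new one aborts the old task". A minimal sketch, assuming a plain tokio `JoinHandle` where the real code wraps it in `NonDetachingJoinHandle`:

```rust
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::watch;
use tokio::task::JoinHandle;

enum TransitionState {
    Restarting(JoinHandle<()>),
    None,
}

impl TransitionState {
    fn abort(&self) {
        if let TransitionState::Restarting(handle) = self {
            handle.abort();
        }
    }
}

/// Mirrors Manager::_transition_replace: the previously stored transition
/// is swapped out and aborted in one step.
fn transition_replace(tx: &watch::Sender<Arc<TransitionState>>, next: TransitionState) {
    tx.send_replace(Arc::new(next)).abort();
}

#[tokio::main]
async fn main() {
    let (tx, _rx) = watch::channel(Arc::new(TransitionState::None));
    let restart = tokio::spawn(tokio::time::sleep(Duration::from_secs(60)));
    transition_replace(&tx, TransitionState::Restarting(restart));
    // starting the next transition (here: none at all) cancels the pending restart
    transition_replace(&tx, TransitionState::None);
}
```

Because a completed transition task ends by calling `send_replace(Default::default())` on itself, aborting an already-finished handle is a harmless no-op.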
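`DesiredStateReverter` is a drop guard: it snapshots the desired run state when a transition begins and restores it when the transition completes or is dropped on an error path. A sketch with `ManageContainer` reduced to a bare watch channel of the new `StartStop` enum:

```rust
use std::sync::Arc;

use tokio::sync::watch;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum StartStop {
    Start,
    Stop,
}

struct DesiredStateReverter {
    manage: Option<Arc<watch::Sender<StartStop>>>,
    starting_state: StartStop,
}

impl DesiredStateReverter {
    fn new(manage: Arc<watch::Sender<StartStop>>) -> Self {
        // snapshot the desired state before the transition mutates it
        let starting_state = *manage.borrow();
        Self {
            manage: Some(manage),
            starting_state,
        }
    }
}

impl Drop for DesiredStateReverter {
    fn drop(&mut self) {
        if let Some(manage) = self.manage.take() {
            manage.send_replace(self.starting_state);
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(StartStop::Start);
    let tx = Arc::new(tx);
    {
        let _reverter = DesiredStateReverter::new(tx.clone());
        tx.send_replace(StartStop::Stop); // e.g. stopped while a backup runs
        assert_eq!(*rx.borrow(), StartStop::Stop);
    } // guard dropped: desired state snaps back to Start
    assert_eq!(*rx.borrow(), StartStop::Start);
}
```

The real type also exposes an async `revert()` that additionally waits for the container to reach the restored state; the `Drop` impl is the safety net for error paths.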
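`_transition_backup` returns both a `TransitionState` (holding the spawned task) and a `BoxFuture` the caller awaits for the `BackupReturn`, bridged by a oneshot channel. The shape of that handshake, with the backup body stubbed out and `Result<u32, String>` standing in for `Result<PackageBackupInfo, Error>`:

```rust
use futures::future::BoxFuture;
use futures::FutureExt;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;

type BackupResult = Result<u32, String>;

/// Spawn the backup work, keep the JoinHandle for the transition state,
/// and hand the caller a future for the eventual result.
fn transition_backup() -> (JoinHandle<()>, BoxFuture<'static, BackupResult>) {
    let (send, done) = oneshot::channel();
    let handle = tokio::spawn(async move {
        let res: BackupResult = Ok(42); // perform_backup() would run here
        let _ = send.send(res);
    });
    (
        handle,
        // a dropped sender means the task died before reporting; surface that
        // as an error, the way flatten_backup_error maps it to BackupReturn::Error
        done.map(|r| r.unwrap_or_else(|recv_err| Err(format!("oneshot error: {recv_err:?}"))))
            .boxed(),
    )
}

#[tokio::main]
async fn main() {
    let (_transition, done) = transition_backup();
    println!("backup result: {:?}", done.await);
}
```

This is what lets `perform_backup` in backup_bulk.rs treat each service's backup as a simple awaited call while the manager keeps ownership of the running task.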
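`spawn_persistent_container` uses a two-stage handoff: the first time the long-running container comes up, the task builds a watch channel and sends its `Receiver` to the caller through a oneshot; on every later restart it just pushes the fresh `UnixRpcClient` into the existing watch channel. Stripped to its essentials, with a `u32` standing in for the rpc client and each loop iteration standing in for one container lifecycle:

```rust
use std::sync::Arc;

use tokio::sync::{oneshot, watch};

#[tokio::main]
async fn main() {
    let (send_inserter, inserter) = oneshot::channel();
    tokio::spawn(async move {
        let mut inserter_send: Option<watch::Sender<Arc<u32>>> = None;
        let mut send_inserter: Option<oneshot::Sender<watch::Receiver<Arc<u32>>>> =
            Some(send_inserter);
        for restart in 0u32..3 {
            let client = Arc::new(restart); // a fresh rpc client per (re)start
            if let Some(tx) = inserter_send.as_mut() {
                let _ = tx.send(client); // later restarts: replace the client
            } else {
                let (s, r) = watch::channel(client);
                inserter_send = Some(s);
                if let Some(send) = send_inserter.take() {
                    let _ = send.send(r); // first start: hand the receiver out once
                }
            }
        }
    });
    let rx = inserter.await.expect("container task died before first start");
    // Manager::rpc_client() in the patch is essentially rx.borrow().clone()
    println!("current client: {}", **rx.borrow());
}
```

Callers therefore always see the rpc client of the current container incarnation without re-plumbing anything after a crash-and-restart.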
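The helpers change adds `Deref`/`DerefMut` from `NonDetachingJoinHandle` to the inner `JoinHandle`, which is what lets `TransitionState::abort` call `.abort()` directly on the stored handle. The same newtype pattern, with the real type's pin-project drop machinery simplified to a plain `Drop` impl:

```rust
use std::ops::{Deref, DerefMut};

use tokio::task::JoinHandle;

/// Reduced stand-in for helpers::NonDetachingJoinHandle: aborts rather
/// than detaches its task when dropped.
struct NonDetaching<T>(JoinHandle<T>);

impl<T> Deref for NonDetaching<T> {
    type Target = JoinHandle<T>;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl<T> DerefMut for NonDetaching<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

impl<T> Drop for NonDetaching<T> {
    fn drop(&mut self) {
        self.0.abort();
    }
}

#[tokio::main]
async fn main() {
    let handle = NonDetaching(tokio::spawn(async { 1 + 1 }));
    // JoinHandle methods are available through Deref without unwrapping:
    println!("finished: {}", handle.is_finished());
    handle.abort();
}
```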
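The js_engine `log_*` ops were restructured so the `RefCell` borrow of the op state is released before the rpc request is awaited; holding a `RefCell` borrow across an `.await` risks a reentrant-borrow panic when another op touches the same state. The fix reduces to scoping the borrow and cloning out what the await needs; here the deno `OpState` is simplified to a `RefCell` holding the context directly:

```rust
use std::cell::RefCell;
use std::rc::Rc;

#[derive(Clone)]
struct JsContext {
    package_id: String,
}

async fn log_info(state: Rc<RefCell<JsContext>>, input: String) {
    // borrow inside a short block so the guard drops before the .await point
    let ctx = {
        let state = state.borrow();
        state.clone()
    };
    tokio::task::yield_now().await; // stand-in for rpc_client.request(...)
    println!("{}: {}", ctx.package_id, input);
}

#[tokio::main]
async fn main() {
    let state = Rc::new(RefCell::new(JsContext {
        package_id: "demo".into(),
    }));
    log_info(state, "hello".into()).await;
}
```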
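Finally, util/mod.rs gains `GeneralBoxedGuard` alongside the existing `GeneralGuard`: a type-erased cleanup guard whose action can be defused once the happy path succeeds. The type below is taken from the patch; the `main` is an illustrative usage sketch:

```rust
struct GeneralBoxedGuard(Option<Box<dyn FnOnce() + Send + Sync>>);

impl GeneralBoxedGuard {
    fn new(f: impl FnOnce() + 'static + Send + Sync) -> Self {
        GeneralBoxedGuard(Some(Box::new(f)))
    }

    /// Defuse the guard: the cleanup action will not run.
    fn drop_without_action(mut self) {
        self.0 = None;
    }
}

impl Drop for GeneralBoxedGuard {
    fn drop(&mut self) {
        if let Some(destroy) = self.0.take() {
            destroy();
        }
    }
}

fn main() {
    {
        let guard = GeneralBoxedGuard::new(|| println!("cleanup: rollback"));
        // ...fallible work succeeded, so skip the rollback:
        guard.drop_without_action(); // prints nothing
    }
    {
        let _guard = GeneralBoxedGuard::new(|| println!("cleanup: rollback"));
        // ...early return or error path: the guard drops and the rollback runs
    }
}
```

Unlike the generic `GeneralGuard<F, T>`, the boxed variant erases the closure type, so it can be stored in structs without threading a type parameter through.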