From 1effb50b796de2fec793ebe2188d817b837b4223 Mon Sep 17 00:00:00 2001 From: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com> Date: Mon, 22 Nov 2021 17:59:28 -0700 Subject: [PATCH] clean up transient state on boot (#816) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Should Work™ * fix remove * do not sleep on exit 0 --- appmgr/src/context/rpc.rs | 91 +++++++++++++++++++++++++++++----- appmgr/src/db/model.rs | 4 ++ appmgr/src/error.rs | 51 +++++++++++++++++++ appmgr/src/install/cleanup.rs | 92 +++++++++++++++++++++++------------ appmgr/src/install/mod.rs | 7 +-- appmgr/src/manager/mod.rs | 8 +-- 6 files changed, 199 insertions(+), 54 deletions(-) diff --git a/appmgr/src/context/rpc.rs b/appmgr/src/context/rpc.rs index f003e0f73..848dd432d 100644 --- a/appmgr/src/context/rpc.rs +++ b/appmgr/src/context/rpc.rs @@ -2,7 +2,7 @@ use std::collections::{BTreeMap, VecDeque}; use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4}; use std::ops::Deref; use std::path::{Path, PathBuf}; -use std::sync::atomic::{AtomicU64, AtomicUsize}; +use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -20,14 +20,16 @@ use tokio::sync::{broadcast, oneshot, Mutex, RwLock}; use tracing::instrument; use crate::core::rpc_continuations::{RequestGuid, RpcContinuation}; -use crate::db::model::Database; +use crate::db::model::{Database, InstalledPackageDataEntry, PackageDataEntry}; use crate::hostname::{get_hostname, get_id}; +use crate::install::cleanup::{cleanup_failed, uninstall}; use crate::manager::ManagerMap; use crate::middleware::auth::HashSessionToken; use crate::net::tor::os_key; use crate::net::NetController; use crate::notifications::NotificationManager; use crate::shutdown::Shutdown; +use crate::status::{MainStatus, Status}; use crate::system::launch_metrics_task; use crate::util::io::from_toml_async_reader; use crate::util::logger::EmbassyLogger; @@ -103,6 +105,7 @@ impl RpcContextConfig { } pub struct RpcContextSeed { + is_closed: AtomicBool, pub bind_rpc: SocketAddr, pub bind_ws: SocketAddr, pub bind_static: SocketAddr, @@ -166,6 +169,7 @@ impl RpcContext { let notification_manager = NotificationManager::new(secret_store.clone()); tracing::info!("Initialized Notification Manager"); let seed = Arc::new(RpcContextSeed { + is_closed: AtomicBool::new(false), bind_rpc: base.bind_rpc.unwrap_or(([127, 0, 0, 1], 5959).into()), bind_ws: base.bind_ws.unwrap_or(([127, 0, 0, 1], 5960).into()), bind_static: base.bind_static.unwrap_or(([127, 0, 0, 1], 5961).into()), @@ -199,7 +203,8 @@ impl RpcContext { .await }); let res = Self(seed); - tracing::info!("Initialized Package Managers"); + res.cleanup().await?; + tracing::info!("Cleaned up transient states"); res.managers .init( &res, @@ -207,7 +212,7 @@ impl RpcContext { &mut res.secret_store.acquire().await?, ) .await?; - // TODO: handle apps in bad / transient state + tracing::info!("Initialized Package Managers"); Ok(res) } #[instrument(skip(self))] @@ -238,16 +243,73 @@ impl RpcContext { #[instrument(skip(self))] pub async fn shutdown(self) -> Result<(), Error> { self.managers.empty().await?; - match Arc::try_unwrap(self.0) { - Ok(seed) => { - let RpcContextSeed { secret_store, .. } = seed; - secret_store.close().await; + self.secret_store.close().await; + self.is_closed.store(true, Ordering::SeqCst); + if let Err(ctx) = Arc::try_unwrap(self.0) { + tracing::warn!( + "{} RPC Context(s) are still being held somewhere. This is likely a mistake.", + Arc::strong_count(&ctx) - 1 + ); + } + Ok(()) + } + #[instrument(skip(self))] + pub async fn cleanup(&self) -> Result<(), Error> { + let mut db = self.db.handle(); + for package_id in crate::db::DatabaseModel::new() + .package_data() + .keys(&mut db, true) + .await? + { + if let Err(e) = async { + let mut pde = crate::db::DatabaseModel::new() + .package_data() + .idx_model(&package_id) + .expect(&mut db) + .await? + .get_mut(&mut db) + .await?; + match &mut *pde { + PackageDataEntry::Installing { .. } + | PackageDataEntry::Restoring { .. } + | PackageDataEntry::Updating { .. } => { + cleanup_failed(self, &mut db, &package_id).await?; + } + PackageDataEntry::Removing { .. } => { + uninstall(self, &mut db, &package_id).await?; + } + PackageDataEntry::Installed { + installed: + InstalledPackageDataEntry { + status: Status { main, .. }, + .. + }, + .. + } => { + let new_main = match std::mem::replace( + main, + MainStatus::Stopped, /* placeholder */ + ) { + MainStatus::BackingUp { started, health } => { + if let Some(started) = started { + MainStatus::Running { started, health } + } else { + MainStatus::Stopped + } + } + a => a, + }; + *main = new_main; + + pde.save(&mut db).await?; + } + } + Ok::<_, Error>(()) } - Err(ctx) => { - tracing::warn!( - "{} RPC Context(s) are still being held somewhere. This is likely a mistake.", - Arc::strong_count(&ctx) - 1 - ); + .await + { + tracing::error!("Failed to clean up package {}: {}", package_id, e); + tracing::debug!("{:?}", e); } } Ok(()) @@ -267,6 +329,9 @@ impl Context for RpcContext { impl Deref for RpcContext { type Target = RpcContextSeed; fn deref(&self) -> &Self::Target { + if self.0.is_closed.load(Ordering::SeqCst) { + panic!("RpcContext used after shutdown!"); + } &*self.0 } } diff --git a/appmgr/src/db/model.rs b/appmgr/src/db/model.rs index 51f5b1a72..22c330026 100644 --- a/appmgr/src/db/model.rs +++ b/appmgr/src/db/model.rs @@ -196,6 +196,7 @@ pub enum PackageDataEntry { Removing { static_files: StaticFiles, manifest: Manifest, + removing: InstalledPackageDataEntry, }, #[serde(rename_all = "kebab-case")] Installed { @@ -228,6 +229,9 @@ impl PackageDataEntryModel { pub fn installed(self) -> OptionModel { self.0.child("installed").into() } + pub fn removing(self) -> OptionModel { + self.0.child("removing").into() + } pub fn install_progress(self) -> OptionModel { self.0.child("install-progress").into() } diff --git a/appmgr/src/error.rs b/appmgr/src/error.rs index 30b55b8dc..f2f270907 100644 --- a/appmgr/src/error.rs +++ b/appmgr/src/error.rs @@ -59,6 +59,7 @@ pub enum ErrorKind { DiagnosticMode = 51, ParseDbField = 52, Duplicate = 53, + MultipleErrors = 54, } impl ErrorKind { pub fn as_str(&self) -> &'static str { @@ -117,6 +118,7 @@ impl ErrorKind { DiagnosticMode => "Embassy is in Diagnostic Mode", ParseDbField => "Database Field Parse Error", Duplicate => "Duplication Error", + MultipleErrors => "Multiple Errors", } } } @@ -239,6 +241,55 @@ impl From for RpcError { } } +#[derive(Debug, Default)] +pub struct ErrorCollection(Vec); +impl ErrorCollection { + pub fn new() -> Self { + Self::default() + } + + pub fn handle>(&mut self, result: Result) -> Option { + match result { + Ok(a) => Some(a), + Err(e) => { + self.0.push(e.into()); + None + } + } + } + + pub fn into_result(self) -> Result<(), Error> { + if self.0.is_empty() { + Ok(()) + } else { + Err(Error::new(eyre!("{}", self), ErrorKind::MultipleErrors)) + } + } +} +impl From for Result<(), Error> { + fn from(e: ErrorCollection) -> Self { + e.into_result() + } +} +impl> Extend> for ErrorCollection { + fn extend>>(&mut self, iter: I) { + for item in iter { + self.handle(item); + } + } +} +impl std::fmt::Display for ErrorCollection { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + for (idx, e) in self.0.iter().enumerate() { + if idx > 0 { + write!(f, "; ")?; + } + write!(f, "{}", e)?; + } + Ok(()) + } +} + pub trait ResultExt where Self: Sized, diff --git a/appmgr/src/install/cleanup.rs b/appmgr/src/install/cleanup.rs index 073dad840..99ec32e52 100644 --- a/appmgr/src/install/cleanup.rs +++ b/appmgr/src/install/cleanup.rs @@ -1,14 +1,16 @@ use std::collections::{BTreeMap, HashMap}; use bollard::image::ListImagesOptions; +use color_eyre::eyre::eyre; use patch_db::{DbHandle, PatchDbHandle}; use tracing::instrument; use super::{PKG_ARCHIVE_DIR, PKG_DOCKER_DIR}; use crate::context::RpcContext; use crate::db::model::{CurrentDependencyInfo, InstalledPackageDataEntry, PackageDataEntry}; +use crate::error::ErrorCollection; use crate::s9pk::manifest::PackageId; -use crate::util::Version; +use crate::util::{Apply, Version}; use crate::Error; #[instrument(skip(ctx, db, deps))] @@ -71,6 +73,7 @@ pub async fn update_dependents<'a, Db: DbHandle, I: IntoIterator Result<(), Error> { + let mut errors = ErrorCollection::new(); ctx.managers.remove(&(id.clone(), version.clone())).await; // docker images start9/$APP_ID/*:$VERSION -q | xargs docker rmi let images = ctx @@ -87,19 +90,24 @@ pub async fn cleanup(ctx: &RpcContext, id: &PackageId, version: &Version) -> Res }, digests: false, })) - .await?; - futures::future::try_join_all(images.into_iter().map(|image| async { - let image = image; // move into future - ctx.docker.remove_image(&image.id, None, None).await - })) - .await?; + .await + .apply(|res| errors.handle(res)); + errors.extend( + futures::future::join_all(images.into_iter().flatten().map(|image| async { + let image = image; // move into future + ctx.docker.remove_image(&image.id, None, None).await + })) + .await, + ); let pkg_archive_dir = ctx .datadir .join(PKG_ARCHIVE_DIR) .join(id) .join(version.as_str()); if tokio::fs::metadata(&pkg_archive_dir).await.is_ok() { - tokio::fs::remove_dir_all(&pkg_archive_dir).await?; + tokio::fs::remove_dir_all(&pkg_archive_dir) + .await + .apply(|res| errors.handle(res)); } let docker_path = ctx .datadir @@ -107,11 +115,12 @@ pub async fn cleanup(ctx: &RpcContext, id: &PackageId, version: &Version) -> Res .join(id) .join(version.as_str()); if tokio::fs::metadata(&docker_path).await.is_ok() { - tokio::fs::remove_dir_all(&docker_path).await?; + tokio::fs::remove_dir_all(&docker_path) + .await + .apply(|res| errors.handle(res)); } - // TODO: delete public dir if not a dependency - Ok(()) + errors.into_result() } #[instrument(skip(ctx, db))] @@ -119,7 +128,6 @@ pub async fn cleanup_failed( ctx: &RpcContext, db: &mut Db, id: &PackageId, - version: &Version, ) -> Result<(), Error> { let pde = crate::db::DatabaseModel::new() .package_data() @@ -129,21 +137,30 @@ pub async fn cleanup_failed( .get(db, true) .await? .into_owned(); - if match &pde { - PackageDataEntry::Installing { .. } | PackageDataEntry::Restoring { .. } => true, - PackageDataEntry::Updating { manifest, .. } => { - if &manifest.version != version { - true + if let Some(manifest) = match &pde { + PackageDataEntry::Installing { manifest, .. } + | PackageDataEntry::Restoring { manifest, .. } => Some(manifest), + PackageDataEntry::Updating { + manifest, + installed: + InstalledPackageDataEntry { + manifest: installed_manifest, + .. + }, + .. + } => { + if &manifest.version != &installed_manifest.version { + Some(manifest) } else { - false + None } } _ => { tracing::warn!("{}: Nothing to clean up!", id); - false + None } } { - cleanup(ctx, id, version).await?; + cleanup(ctx, id, &manifest.version).await?; } match pde { @@ -155,7 +172,6 @@ pub async fn cleanup_failed( } PackageDataEntry::Updating { installed, - manifest, static_files, .. } => { @@ -165,8 +181,8 @@ pub async fn cleanup_failed( .put( db, &PackageDataEntry::Installed { + manifest: installed.manifest.clone(), installed, - manifest, static_files, }, ) @@ -210,13 +226,26 @@ pub async fn remove_current_dependents<'a, Db: DbHandle, I: IntoIterator Result<(), Error> { - cleanup(ctx, &entry.manifest.id, &entry.manifest.version).await?; let mut tx = db.begin().await?; + let entry = crate::db::DatabaseModel::new() + .package_data() + .idx_model(id) + .and_then(|pde| pde.removing()) + .get(&mut tx, true) + .await? + .into_owned() + .ok_or_else(|| { + Error::new( + eyre!("Package not in removing state: {}", id), + crate::ErrorKind::NotFound, + ) + })?; + cleanup(ctx, &entry.manifest.id, &entry.manifest.version).await?; crate::db::DatabaseModel::new() .package_data() - .remove(&mut tx, &entry.manifest.id) + .remove(&mut tx, id) .await?; remove_current_dependents( &mut tx, @@ -231,12 +260,13 @@ pub async fn uninstall( entry.current_dependents.keys(), ) .await?; - tokio::fs::remove_dir_all( - ctx.datadir - .join(crate::volume::PKG_VOLUME_DIR) - .join(&entry.manifest.id), - ) - .await?; + let volumes = ctx + .datadir + .join(crate::volume::PKG_VOLUME_DIR) + .join(&entry.manifest.id); + if tokio::fs::metadata(&volumes).await.is_ok() { + tokio::fs::remove_dir_all(&volumes).await?; + } tx.commit(None).await?; Ok(()) } diff --git a/appmgr/src/install/mod.rs b/appmgr/src/install/mod.rs index 3cc5cd2a1..4b4c99ce0 100644 --- a/appmgr/src/install/mod.rs +++ b/appmgr/src/install/mod.rs @@ -131,7 +131,7 @@ pub async fn install( install_progress: progress.clone(), static_files, installed, - manifest, + manifest: man.clone(), }) } None => { @@ -422,13 +422,14 @@ pub async fn uninstall_impl(ctx: RpcContext, id: PackageId) -> Result( let mut handle = ctx.db.handle(); let mut tx = handle.begin().await?; - if let Err(e) = cleanup_failed(&ctx, &mut tx, pkg_id, version).await { + if let Err(e) = cleanup_failed(&ctx, &mut tx, pkg_id).await { let mut tx = handle.begin().await?; tracing::error!( "Failed to clean up {}@{}: {}: Adding to broken packages", diff --git a/appmgr/src/manager/mod.rs b/appmgr/src/manager/mod.rs index 8ddbcad78..8ad083974 100644 --- a/appmgr/src/manager/mod.rs +++ b/appmgr/src/manager/mod.rs @@ -394,13 +394,7 @@ async fn manager_thread_loop(mut recv: Receiver, thread_shared: &Arc { - thread_shared - .on_stop - .send(OnStop::Sleep) - .map_err(|_| ()) - .unwrap(); // recv is still in scope, cannot fail - } + Ok(Ok(NoOutput)) => (), // restart Ok(Err(e)) => { let res = thread_shared.ctx.notification_manager .notify(