clean up transient state on boot (#816)

* Should Work™

* fix remove

* do not sleep on exit 0
This commit is contained in:
Aiden McClelland
2021-11-22 17:59:28 -07:00
parent 3a9bfd08a9
commit 1effb50b79
6 changed files with 199 additions and 54 deletions

View File

@@ -2,7 +2,7 @@ use std::collections::{BTreeMap, VecDeque};
use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, AtomicUsize};
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
@@ -20,14 +20,16 @@ use tokio::sync::{broadcast, oneshot, Mutex, RwLock};
use tracing::instrument;
use crate::core::rpc_continuations::{RequestGuid, RpcContinuation};
use crate::db::model::Database;
use crate::db::model::{Database, InstalledPackageDataEntry, PackageDataEntry};
use crate::hostname::{get_hostname, get_id};
use crate::install::cleanup::{cleanup_failed, uninstall};
use crate::manager::ManagerMap;
use crate::middleware::auth::HashSessionToken;
use crate::net::tor::os_key;
use crate::net::NetController;
use crate::notifications::NotificationManager;
use crate::shutdown::Shutdown;
use crate::status::{MainStatus, Status};
use crate::system::launch_metrics_task;
use crate::util::io::from_toml_async_reader;
use crate::util::logger::EmbassyLogger;
@@ -103,6 +105,7 @@ impl RpcContextConfig {
}
pub struct RpcContextSeed {
is_closed: AtomicBool,
pub bind_rpc: SocketAddr,
pub bind_ws: SocketAddr,
pub bind_static: SocketAddr,
@@ -166,6 +169,7 @@ impl RpcContext {
let notification_manager = NotificationManager::new(secret_store.clone());
tracing::info!("Initialized Notification Manager");
let seed = Arc::new(RpcContextSeed {
is_closed: AtomicBool::new(false),
bind_rpc: base.bind_rpc.unwrap_or(([127, 0, 0, 1], 5959).into()),
bind_ws: base.bind_ws.unwrap_or(([127, 0, 0, 1], 5960).into()),
bind_static: base.bind_static.unwrap_or(([127, 0, 0, 1], 5961).into()),
@@ -199,7 +203,8 @@ impl RpcContext {
.await
});
let res = Self(seed);
tracing::info!("Initialized Package Managers");
res.cleanup().await?;
tracing::info!("Cleaned up transient states");
res.managers
.init(
&res,
@@ -207,7 +212,7 @@ impl RpcContext {
&mut res.secret_store.acquire().await?,
)
.await?;
// TODO: handle apps in bad / transient state
tracing::info!("Initialized Package Managers");
Ok(res)
}
#[instrument(skip(self))]
@@ -238,16 +243,73 @@ impl RpcContext {
#[instrument(skip(self))]
pub async fn shutdown(self) -> Result<(), Error> {
self.managers.empty().await?;
match Arc::try_unwrap(self.0) {
Ok(seed) => {
let RpcContextSeed { secret_store, .. } = seed;
secret_store.close().await;
self.secret_store.close().await;
self.is_closed.store(true, Ordering::SeqCst);
if let Err(ctx) = Arc::try_unwrap(self.0) {
tracing::warn!(
"{} RPC Context(s) are still being held somewhere. This is likely a mistake.",
Arc::strong_count(&ctx) - 1
);
}
Ok(())
}
#[instrument(skip(self))]
pub async fn cleanup(&self) -> Result<(), Error> {
let mut db = self.db.handle();
for package_id in crate::db::DatabaseModel::new()
.package_data()
.keys(&mut db, true)
.await?
{
if let Err(e) = async {
let mut pde = crate::db::DatabaseModel::new()
.package_data()
.idx_model(&package_id)
.expect(&mut db)
.await?
.get_mut(&mut db)
.await?;
match &mut *pde {
PackageDataEntry::Installing { .. }
| PackageDataEntry::Restoring { .. }
| PackageDataEntry::Updating { .. } => {
cleanup_failed(self, &mut db, &package_id).await?;
}
PackageDataEntry::Removing { .. } => {
uninstall(self, &mut db, &package_id).await?;
}
PackageDataEntry::Installed {
installed:
InstalledPackageDataEntry {
status: Status { main, .. },
..
},
..
} => {
let new_main = match std::mem::replace(
main,
MainStatus::Stopped, /* placeholder */
) {
MainStatus::BackingUp { started, health } => {
if let Some(started) = started {
MainStatus::Running { started, health }
} else {
MainStatus::Stopped
}
}
a => a,
};
*main = new_main;
pde.save(&mut db).await?;
}
}
Ok::<_, Error>(())
}
Err(ctx) => {
tracing::warn!(
"{} RPC Context(s) are still being held somewhere. This is likely a mistake.",
Arc::strong_count(&ctx) - 1
);
.await
{
tracing::error!("Failed to clean up package {}: {}", package_id, e);
tracing::debug!("{:?}", e);
}
}
Ok(())
@@ -267,6 +329,9 @@ impl Context for RpcContext {
impl Deref for RpcContext {
type Target = RpcContextSeed;
fn deref(&self) -> &Self::Target {
if self.0.is_closed.load(Ordering::SeqCst) {
panic!("RpcContext used after shutdown!");
}
&*self.0
}
}

View File

@@ -196,6 +196,7 @@ pub enum PackageDataEntry {
Removing {
static_files: StaticFiles,
manifest: Manifest,
removing: InstalledPackageDataEntry,
},
#[serde(rename_all = "kebab-case")]
Installed {
@@ -228,6 +229,9 @@ impl PackageDataEntryModel {
pub fn installed(self) -> OptionModel<InstalledPackageDataEntry> {
self.0.child("installed").into()
}
pub fn removing(self) -> OptionModel<InstalledPackageDataEntry> {
self.0.child("removing").into()
}
pub fn install_progress(self) -> OptionModel<InstallProgress> {
self.0.child("install-progress").into()
}

View File

@@ -59,6 +59,7 @@ pub enum ErrorKind {
DiagnosticMode = 51,
ParseDbField = 52,
Duplicate = 53,
MultipleErrors = 54,
}
impl ErrorKind {
pub fn as_str(&self) -> &'static str {
@@ -117,6 +118,7 @@ impl ErrorKind {
DiagnosticMode => "Embassy is in Diagnostic Mode",
ParseDbField => "Database Field Parse Error",
Duplicate => "Duplication Error",
MultipleErrors => "Multiple Errors",
}
}
}
@@ -239,6 +241,55 @@ impl From<Error> for RpcError {
}
}
#[derive(Debug, Default)]
pub struct ErrorCollection(Vec<Error>);
impl ErrorCollection {
pub fn new() -> Self {
Self::default()
}
pub fn handle<T, E: Into<Error>>(&mut self, result: Result<T, E>) -> Option<T> {
match result {
Ok(a) => Some(a),
Err(e) => {
self.0.push(e.into());
None
}
}
}
pub fn into_result(self) -> Result<(), Error> {
if self.0.is_empty() {
Ok(())
} else {
Err(Error::new(eyre!("{}", self), ErrorKind::MultipleErrors))
}
}
}
impl From<ErrorCollection> for Result<(), Error> {
fn from(e: ErrorCollection) -> Self {
e.into_result()
}
}
impl<T, E: Into<Error>> Extend<Result<T, E>> for ErrorCollection {
fn extend<I: IntoIterator<Item = Result<T, E>>>(&mut self, iter: I) {
for item in iter {
self.handle(item);
}
}
}
impl std::fmt::Display for ErrorCollection {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
for (idx, e) in self.0.iter().enumerate() {
if idx > 0 {
write!(f, "; ")?;
}
write!(f, "{}", e)?;
}
Ok(())
}
}
pub trait ResultExt<T, E>
where
Self: Sized,

View File

@@ -1,14 +1,16 @@
use std::collections::{BTreeMap, HashMap};
use bollard::image::ListImagesOptions;
use color_eyre::eyre::eyre;
use patch_db::{DbHandle, PatchDbHandle};
use tracing::instrument;
use super::{PKG_ARCHIVE_DIR, PKG_DOCKER_DIR};
use crate::context::RpcContext;
use crate::db::model::{CurrentDependencyInfo, InstalledPackageDataEntry, PackageDataEntry};
use crate::error::ErrorCollection;
use crate::s9pk::manifest::PackageId;
use crate::util::Version;
use crate::util::{Apply, Version};
use crate::Error;
#[instrument(skip(ctx, db, deps))]
@@ -71,6 +73,7 @@ pub async fn update_dependents<'a, Db: DbHandle, I: IntoIterator<Item = &'a Pack
#[instrument(skip(ctx))]
pub async fn cleanup(ctx: &RpcContext, id: &PackageId, version: &Version) -> Result<(), Error> {
let mut errors = ErrorCollection::new();
ctx.managers.remove(&(id.clone(), version.clone())).await;
// docker images start9/$APP_ID/*:$VERSION -q | xargs docker rmi
let images = ctx
@@ -87,19 +90,24 @@ pub async fn cleanup(ctx: &RpcContext, id: &PackageId, version: &Version) -> Res
},
digests: false,
}))
.await?;
futures::future::try_join_all(images.into_iter().map(|image| async {
let image = image; // move into future
ctx.docker.remove_image(&image.id, None, None).await
}))
.await?;
.await
.apply(|res| errors.handle(res));
errors.extend(
futures::future::join_all(images.into_iter().flatten().map(|image| async {
let image = image; // move into future
ctx.docker.remove_image(&image.id, None, None).await
}))
.await,
);
let pkg_archive_dir = ctx
.datadir
.join(PKG_ARCHIVE_DIR)
.join(id)
.join(version.as_str());
if tokio::fs::metadata(&pkg_archive_dir).await.is_ok() {
tokio::fs::remove_dir_all(&pkg_archive_dir).await?;
tokio::fs::remove_dir_all(&pkg_archive_dir)
.await
.apply(|res| errors.handle(res));
}
let docker_path = ctx
.datadir
@@ -107,11 +115,12 @@ pub async fn cleanup(ctx: &RpcContext, id: &PackageId, version: &Version) -> Res
.join(id)
.join(version.as_str());
if tokio::fs::metadata(&docker_path).await.is_ok() {
tokio::fs::remove_dir_all(&docker_path).await?;
tokio::fs::remove_dir_all(&docker_path)
.await
.apply(|res| errors.handle(res));
}
// TODO: delete public dir if not a dependency
Ok(())
errors.into_result()
}
#[instrument(skip(ctx, db))]
@@ -119,7 +128,6 @@ pub async fn cleanup_failed<Db: DbHandle>(
ctx: &RpcContext,
db: &mut Db,
id: &PackageId,
version: &Version,
) -> Result<(), Error> {
let pde = crate::db::DatabaseModel::new()
.package_data()
@@ -129,21 +137,30 @@ pub async fn cleanup_failed<Db: DbHandle>(
.get(db, true)
.await?
.into_owned();
if match &pde {
PackageDataEntry::Installing { .. } | PackageDataEntry::Restoring { .. } => true,
PackageDataEntry::Updating { manifest, .. } => {
if &manifest.version != version {
true
if let Some(manifest) = match &pde {
PackageDataEntry::Installing { manifest, .. }
| PackageDataEntry::Restoring { manifest, .. } => Some(manifest),
PackageDataEntry::Updating {
manifest,
installed:
InstalledPackageDataEntry {
manifest: installed_manifest,
..
},
..
} => {
if &manifest.version != &installed_manifest.version {
Some(manifest)
} else {
false
None
}
}
_ => {
tracing::warn!("{}: Nothing to clean up!", id);
false
None
}
} {
cleanup(ctx, id, version).await?;
cleanup(ctx, id, &manifest.version).await?;
}
match pde {
@@ -155,7 +172,6 @@ pub async fn cleanup_failed<Db: DbHandle>(
}
PackageDataEntry::Updating {
installed,
manifest,
static_files,
..
} => {
@@ -165,8 +181,8 @@ pub async fn cleanup_failed<Db: DbHandle>(
.put(
db,
&PackageDataEntry::Installed {
manifest: installed.manifest.clone(),
installed,
manifest,
static_files,
},
)
@@ -210,13 +226,26 @@ pub async fn remove_current_dependents<'a, Db: DbHandle, I: IntoIterator<Item =
pub async fn uninstall(
ctx: &RpcContext,
db: &mut PatchDbHandle,
entry: &InstalledPackageDataEntry,
id: &PackageId,
) -> Result<(), Error> {
cleanup(ctx, &entry.manifest.id, &entry.manifest.version).await?;
let mut tx = db.begin().await?;
let entry = crate::db::DatabaseModel::new()
.package_data()
.idx_model(id)
.and_then(|pde| pde.removing())
.get(&mut tx, true)
.await?
.into_owned()
.ok_or_else(|| {
Error::new(
eyre!("Package not in removing state: {}", id),
crate::ErrorKind::NotFound,
)
})?;
cleanup(ctx, &entry.manifest.id, &entry.manifest.version).await?;
crate::db::DatabaseModel::new()
.package_data()
.remove(&mut tx, &entry.manifest.id)
.remove(&mut tx, id)
.await?;
remove_current_dependents(
&mut tx,
@@ -231,12 +260,13 @@ pub async fn uninstall(
entry.current_dependents.keys(),
)
.await?;
tokio::fs::remove_dir_all(
ctx.datadir
.join(crate::volume::PKG_VOLUME_DIR)
.join(&entry.manifest.id),
)
.await?;
let volumes = ctx
.datadir
.join(crate::volume::PKG_VOLUME_DIR)
.join(&entry.manifest.id);
if tokio::fs::metadata(&volumes).await.is_ok() {
tokio::fs::remove_dir_all(&volumes).await?;
}
tx.commit(None).await?;
Ok(())
}

View File

@@ -131,7 +131,7 @@ pub async fn install(
install_progress: progress.clone(),
static_files,
installed,
manifest,
manifest: man.clone(),
})
}
None => {
@@ -422,13 +422,14 @@ pub async fn uninstall_impl(ctx: RpcContext, id: PackageId) -> Result<WithRevisi
*pde = Some(PackageDataEntry::Removing {
manifest,
static_files,
removing: installed,
});
pde.save(&mut tx).await?;
let res = tx.commit(None).await?;
drop(handle);
tokio::spawn(async move {
if let Err(e) = cleanup::uninstall(&ctx, &mut ctx.db.handle(), &installed).await {
if let Err(e) = cleanup::uninstall(&ctx, &mut ctx.db.handle(), &id).await {
let err_str = format!("Uninstall of {} Failed: {}", id, e);
tracing::error!("{}", err_str);
tracing::debug!("{:?}", e);
@@ -524,7 +525,7 @@ pub async fn install_s9pk_or_cleanup<R: AsyncRead + AsyncSeek + Unpin>(
let mut handle = ctx.db.handle();
let mut tx = handle.begin().await?;
if let Err(e) = cleanup_failed(&ctx, &mut tx, pkg_id, version).await {
if let Err(e) = cleanup_failed(&ctx, &mut tx, pkg_id).await {
let mut tx = handle.begin().await?;
tracing::error!(
"Failed to clean up {}@{}: {}: Adding to broken packages",

View File

@@ -394,13 +394,7 @@ async fn manager_thread_loop(mut recv: Receiver<OnStop>, thread_shared: &Arc<Man
}
}
match run_main(&thread_shared).await {
Ok(Ok(NoOutput)) => {
thread_shared
.on_stop
.send(OnStop::Sleep)
.map_err(|_| ())
.unwrap(); // recv is still in scope, cannot fail
}
Ok(Ok(NoOutput)) => (), // restart
Ok(Err(e)) => {
let res = thread_shared.ctx.notification_manager
.notify(