adds recovery (#731)

* should work

* works for real

* fix build
This commit is contained in:
Aiden McClelland
2021-10-27 15:50:53 -06:00
parent d0e8211b24
commit 1e7492e915
27 changed files with 694 additions and 598 deletions

View File

@@ -4,8 +4,7 @@ use bollard::image::ListImagesOptions;
use patch_db::{DbHandle, PatchDbHandle};
use tracing::instrument;
use super::PKG_ARCHIVE_DIR;
use super::PKG_DOCKER_DIR;
use super::{PKG_ARCHIVE_DIR, PKG_DOCKER_DIR};
use crate::context::RpcContext;
use crate::db::model::{CurrentDependencyInfo, InstalledPackageDataEntry, PackageDataEntry};
use crate::s9pk::manifest::PackageId;
@@ -131,7 +130,7 @@ pub async fn cleanup_failed<Db: DbHandle>(
.await?
.into_owned();
if match &pde {
PackageDataEntry::Installing { .. } => true,
PackageDataEntry::Installing { .. } | PackageDataEntry::Restoring { .. } => true,
PackageDataEntry::Updating { manifest, .. } => {
if &manifest.version != version {
true
@@ -148,7 +147,7 @@ pub async fn cleanup_failed<Db: DbHandle>(
}
match pde {
PackageDataEntry::Installing { .. } => {
PackageDataEntry::Installing { .. } | PackageDataEntry::Restoring { .. } => {
crate::db::DatabaseModel::new()
.package_data()
.remove(db, id)

View File

@@ -22,8 +22,8 @@ use tracing::instrument;
use self::cleanup::cleanup_failed;
use crate::context::RpcContext;
use crate::db::model::{
CurrentDependencyInfo, InstalledPackageDataEntry, PackageDataEntry, StaticDependencyInfo,
StaticFiles,
CurrentDependencyInfo, InstalledPackageDataEntry, PackageDataEntry, RecoveredPackageInfo,
StaticDependencyInfo, StaticFiles,
};
use crate::db::util::WithRevision;
use crate::dependencies::{
@@ -214,7 +214,7 @@ pub async fn uninstall_impl(ctx: RpcContext, id: PackageId) -> Result<WithRevisi
})
}
#[instrument(skip(ctx))]
#[instrument(skip(ctx, temp_manifest))]
pub async fn download_install_s9pk(
ctx: &RpcContext,
temp_manifest: &Manifest,
@@ -235,58 +235,64 @@ pub async fn download_install_s9pk(
.package_data()
.idx_model(pkg_id);
let res = (|| async {
let progress = InstallProgress::new(s9pk.content_length());
let progress_model = pkg_data_entry.and_then(|pde| pde.install_progress());
let progress = InstallProgress::new(s9pk.content_length());
let progress_model = pkg_data_entry.and_then(|pde| pde.install_progress());
File::delete(&pkg_cache).await?;
let mut dst = OpenOptions::new()
.create(true)
.write(true)
.read(true)
.open(&pkg_cache)
File::delete(&pkg_cache).await?;
let mut dst = OpenOptions::new()
.create(true)
.write(true)
.read(true)
.open(&pkg_cache)
.await?;
progress
.track_download_during(progress_model.clone(), &ctx.db, || async {
let mut progress_writer = InstallProgressTracker::new(&mut dst, progress.clone());
tokio::io::copy(
&mut tokio_util::io::StreamReader::new(s9pk.bytes_stream().map_err(|e| {
std::io::Error::new(
if e.is_connect() {
std::io::ErrorKind::ConnectionRefused
} else if e.is_timeout() {
std::io::ErrorKind::TimedOut
} else {
std::io::ErrorKind::Other
},
e,
)
})),
&mut progress_writer,
)
.await?;
progress.download_complete();
Ok(())
})
.await?;
progress
.track_download_during(progress_model.clone(), &ctx.db, || async {
let mut progress_writer = InstallProgressTracker::new(&mut dst, progress.clone());
tokio::io::copy(
&mut tokio_util::io::StreamReader::new(s9pk.bytes_stream().map_err(|e| {
std::io::Error::new(
if e.is_connect() {
std::io::ErrorKind::ConnectionRefused
} else if e.is_timeout() {
std::io::ErrorKind::TimedOut
} else {
std::io::ErrorKind::Other
},
e,
)
})),
&mut progress_writer,
)
.await?;
progress.download_complete();
Ok(())
})
.await?;
dst.seek(SeekFrom::Start(0)).await?;
dst.seek(SeekFrom::Start(0)).await?;
let progress_reader = InstallProgressTracker::new(dst, progress.clone());
let mut s9pk_reader = progress
.track_read_during(progress_model.clone(), &ctx.db, || {
S9pkReader::from_reader(progress_reader, true)
})
.await?;
let progress_reader = InstallProgressTracker::new(dst, progress.clone());
let mut s9pk_reader = progress
.track_read_during(progress_model.clone(), &ctx.db, || {
S9pkReader::from_reader(progress_reader)
})
.await?;
install_s9pk_or_cleanup(&ctx, pkg_id, version, &mut s9pk_reader, progress).await?;
install_s9pk(&ctx, pkg_id, version, &mut s9pk_reader, progress).await?;
Ok(())
}
Ok(())
})()
.await;
if let Err(e) = res {
#[instrument(skip(ctx, rdr))]
pub async fn install_s9pk_or_cleanup<R: AsyncRead + AsyncSeek + Unpin>(
ctx: &RpcContext,
pkg_id: &PackageId,
version: &Version,
rdr: &mut S9pkReader<InstallProgressTracker<R>>,
progress: Arc<InstallProgress>,
) -> Result<(), Error> {
if let Err(e) = install_s9pk(ctx, pkg_id, version, rdr, progress).await {
let mut handle = ctx.db.handle();
let mut tx = handle.begin().await?;
@@ -325,6 +331,7 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin>(
) -> Result<(), Error> {
rdr.validate().await?;
rdr.validated();
rdr.reset().await?;
let model = crate::db::DatabaseModel::new()
.package_data()
.idx_model(pkg_id);
@@ -672,16 +679,6 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin>(
..
} = prev
{
update_dependents(
ctx,
&mut tx,
pkg_id,
current_dependents
.keys()
.chain(prev.current_dependents.keys())
.collect::<BTreeSet<_>>(),
)
.await?;
let mut configured = prev.status.configured;
if let Some(res) = prev_manifest
.migrations
@@ -739,17 +736,48 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin>(
*main_status = prev.status.main;
main_status.save(&mut tx).await?;
}
update_dependents(
ctx,
&mut tx,
pkg_id,
current_dependents
.keys()
.chain(prev.current_dependents.keys())
.collect::<BTreeSet<_>>(),
)
.await?;
} else if let PackageDataEntry::Restoring { .. } = prev {
manifest
.backup
.restore(
ctx,
&mut tx,
&mut sql_tx,
pkg_id,
version,
&manifest.interfaces,
&manifest.volumes,
)
.await?;
update_dependents(ctx, &mut tx, pkg_id, current_dependents.keys()).await?;
} else if let Some(recovered) = crate::db::DatabaseModel::new()
.recovered_packages()
.idx_model(pkg_id)
.get(&mut tx, true)
.await?
.into_owned()
{
handle_recovered_package(recovered, manifest, ctx, pkg_id, version, &mut tx).await?;
update_dependents(ctx, &mut tx, pkg_id, current_dependents.keys()).await?;
} else {
update_dependents(ctx, &mut tx, pkg_id, current_dependents.keys()).await?;
let recovered = crate::db::DatabaseModel::new()
.recovered_packages()
.idx_model(pkg_id)
.get(&mut tx, true)
.await?
.into_owned();
handle_recovered_package(recovered, manifest, ctx, pkg_id, version, &mut tx).await?;
}
crate::db::DatabaseModel::new()
.recovered_packages()
.remove(&mut tx, pkg_id)
.await?;
sql_tx.commit().await?;
tx.commit(None).await?;
@@ -760,41 +788,37 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin>(
#[instrument(skip(ctx, tx))]
async fn handle_recovered_package(
recovered: Option<crate::db::model::RecoveredPackageInfo>,
recovered: RecoveredPackageInfo,
manifest: Manifest,
ctx: &RpcContext,
pkg_id: &PackageId,
version: &Version,
tx: &mut patch_db::Transaction<&mut patch_db::PatchDbHandle>,
) -> Result<(), Error> {
Ok(if let Some(recovered) = recovered {
let configured = if let Some(res) = manifest
.migrations
.from(ctx, &recovered.version, pkg_id, version, &manifest.volumes)
.await?
{
res.configured
} else {
false
};
if configured {
crate::config::configure(
ctx,
tx,
pkg_id,
None,
&None,
false,
&mut BTreeMap::new(),
&mut BTreeMap::new(),
)
.await?;
}
crate::db::DatabaseModel::new()
.recovered_packages()
.remove(tx, pkg_id)
.await?
})
let configured = if let Some(res) = manifest
.migrations
.from(ctx, &recovered.version, pkg_id, version, &manifest.volumes)
.await?
{
res.configured
} else {
false
};
if configured {
crate::config::configure(
ctx,
tx,
pkg_id,
None,
&None,
false,
&mut BTreeMap::new(),
&mut BTreeMap::new(),
)
.await?;
}
Ok(())
}
#[instrument(skip(datadir))]