From 3372da9f8010d4bd095a5e3ecb1f2e764d0bc202 Mon Sep 17 00:00:00 2001 From: Aiden McClelland Date: Thu, 20 Jan 2022 17:43:03 -0700 Subject: [PATCH] include download errors in error handler --- appmgr/src/install/mod.rs | 104 ++++++++++++++++++-------------------- 1 file changed, 50 insertions(+), 54 deletions(-) diff --git a/appmgr/src/install/mod.rs b/appmgr/src/install/mod.rs index ad68a7d8c..291661981 100644 --- a/appmgr/src/install/mod.rs +++ b/appmgr/src/install/mod.rs @@ -114,6 +114,13 @@ pub async fn install( .with_kind(crate::ErrorKind::Registry)?; let man: Manifest = man_res.json().await.with_kind(crate::ErrorKind::Registry)?; + if man.id.as_str() != id || !man.version.satisfies(&version) { + return Err(Error::new( + eyre!("Fetched package does not match requested id and version"), + ErrorKind::Registry, + )); + } + let progress = InstallProgress::new(s9pk.content_length()); let static_files = StaticFiles::remote(&man.id, &man.version); let mut db_handle = ctx.db.handle(); @@ -305,11 +312,11 @@ pub async fn sideload( }; // gc the map let mut guard = ctx.rpc_stream_continuations.lock().await; - let gced = std::mem::take(&mut *guard) + let garbage_collected = std::mem::take(&mut *guard) .into_iter() .filter(|(_, v)| v.created_at.elapsed() < Duration::from_secs(30)) .collect::>(); - *guard = gced; + *guard = garbage_collected; drop(guard); // insert the new continuation ctx.rpc_stream_continuations @@ -488,70 +495,59 @@ pub async fn download_install_s9pk( let pkg_id = &temp_manifest.id; let version = &temp_manifest.version; - let pkg_archive_dir = ctx - .datadir - .join(PKG_ARCHIVE_DIR) - .join(pkg_id) - .join(version.as_str()); - tokio::fs::create_dir_all(&pkg_archive_dir).await?; - let pkg_archive = pkg_archive_dir.join(AsRef::::as_ref(pkg_id).with_extension("s9pk")); + if let Err(e) = async { + let pkg_archive_dir = ctx + .datadir + .join(PKG_ARCHIVE_DIR) + .join(pkg_id) + .join(version.as_str()); + tokio::fs::create_dir_all(&pkg_archive_dir).await?; + let pkg_archive = + pkg_archive_dir.join(AsRef::::as_ref(pkg_id).with_extension("s9pk")); - let pkg_data_entry = crate::db::DatabaseModel::new() - .package_data() - .idx_model(pkg_id); + let pkg_data_entry = crate::db::DatabaseModel::new() + .package_data() + .idx_model(pkg_id); - let progress_model = pkg_data_entry.and_then(|pde| pde.install_progress()); + let progress_model = pkg_data_entry.and_then(|pde| pde.install_progress()); - File::delete(&pkg_archive).await?; - let mut dst = OpenOptions::new() - .create(true) - .write(true) - .read(true) - .open(&pkg_archive) - .await?; + File::delete(&pkg_archive).await?; + let mut dst = OpenOptions::new() + .create(true) + .write(true) + .read(true) + .open(&pkg_archive) + .await?; - progress - .track_download_during(progress_model.clone(), &ctx.db, || async { - let mut progress_writer = InstallProgressTracker::new(&mut dst, progress.clone()); - tokio::io::copy(&mut s9pk, &mut progress_writer).await?; - progress.download_complete(); - Ok(()) - }) - .await?; + progress + .track_download_during(progress_model.clone(), &ctx.db, || async { + let mut progress_writer = InstallProgressTracker::new(&mut dst, progress.clone()); + tokio::io::copy(&mut s9pk, &mut progress_writer).await?; + progress.download_complete(); + Ok(()) + }) + .await?; - dst.seek(SeekFrom::Start(0)).await?; + dst.seek(SeekFrom::Start(0)).await?; - let progress_reader = InstallProgressTracker::new(dst, progress.clone()); - let mut s9pk_reader = progress - .track_read_during(progress_model.clone(), &ctx.db, || { - S9pkReader::from_reader(progress_reader, true) - }) - .await?; + let progress_reader = InstallProgressTracker::new(dst, progress.clone()); + let mut s9pk_reader = progress + .track_read_during(progress_model.clone(), &ctx.db, || { + S9pkReader::from_reader(progress_reader, true) + }) + .await?; - install_s9pk_or_cleanup(&ctx, pkg_id, version, &mut s9pk_reader, progress).await?; + install_s9pk(&ctx, pkg_id, version, &mut s9pk_reader, progress).await?; - Ok(()) -} - -#[instrument(skip(ctx, rdr))] -pub async fn install_s9pk_or_cleanup( - ctx: &RpcContext, - pkg_id: &PackageId, - version: &Version, - rdr: &mut S9pkReader>, - progress: Arc, -) -> Result<(), Error> { - if let Err(e) = install_s9pk(ctx, pkg_id, version, rdr, progress).await { + Ok(()) + } + .await + { let mut handle = ctx.db.handle(); let mut tx = handle.begin().await?; if let Err(e) = cleanup_failed(&ctx, &mut tx, pkg_id).await { - tracing::error!( - "Failed to clean up {}@{}: {}: Adding to broken packages", - pkg_id, - version, - e - ); + tracing::error!("Failed to clean up {}@{}: {}", pkg_id, version, e); tracing::debug!("{:?}", e); } else { tx.commit(None).await?;