include download errors in error handler

This commit is contained in:
Aiden McClelland
2022-01-20 17:43:03 -07:00
committed by Aiden McClelland
parent 0b1f544fd7
commit 3372da9f80

View File

@@ -114,6 +114,13 @@ pub async fn install(
.with_kind(crate::ErrorKind::Registry)?; .with_kind(crate::ErrorKind::Registry)?;
let man: Manifest = man_res.json().await.with_kind(crate::ErrorKind::Registry)?; let man: Manifest = man_res.json().await.with_kind(crate::ErrorKind::Registry)?;
if man.id.as_str() != id || !man.version.satisfies(&version) {
return Err(Error::new(
eyre!("Fetched package does not match requested id and version"),
ErrorKind::Registry,
));
}
let progress = InstallProgress::new(s9pk.content_length()); let progress = InstallProgress::new(s9pk.content_length());
let static_files = StaticFiles::remote(&man.id, &man.version); let static_files = StaticFiles::remote(&man.id, &man.version);
let mut db_handle = ctx.db.handle(); let mut db_handle = ctx.db.handle();
@@ -305,11 +312,11 @@ pub async fn sideload(
}; };
// gc the map // gc the map
let mut guard = ctx.rpc_stream_continuations.lock().await; let mut guard = ctx.rpc_stream_continuations.lock().await;
let gced = std::mem::take(&mut *guard) let garbage_collected = std::mem::take(&mut *guard)
.into_iter() .into_iter()
.filter(|(_, v)| v.created_at.elapsed() < Duration::from_secs(30)) .filter(|(_, v)| v.created_at.elapsed() < Duration::from_secs(30))
.collect::<BTreeMap<RequestGuid, RpcContinuation>>(); .collect::<BTreeMap<RequestGuid, RpcContinuation>>();
*guard = gced; *guard = garbage_collected;
drop(guard); drop(guard);
// insert the new continuation // insert the new continuation
ctx.rpc_stream_continuations ctx.rpc_stream_continuations
@@ -488,70 +495,59 @@ pub async fn download_install_s9pk(
let pkg_id = &temp_manifest.id; let pkg_id = &temp_manifest.id;
let version = &temp_manifest.version; let version = &temp_manifest.version;
let pkg_archive_dir = ctx if let Err(e) = async {
.datadir let pkg_archive_dir = ctx
.join(PKG_ARCHIVE_DIR) .datadir
.join(pkg_id) .join(PKG_ARCHIVE_DIR)
.join(version.as_str()); .join(pkg_id)
tokio::fs::create_dir_all(&pkg_archive_dir).await?; .join(version.as_str());
let pkg_archive = pkg_archive_dir.join(AsRef::<Path>::as_ref(pkg_id).with_extension("s9pk")); tokio::fs::create_dir_all(&pkg_archive_dir).await?;
let pkg_archive =
pkg_archive_dir.join(AsRef::<Path>::as_ref(pkg_id).with_extension("s9pk"));
let pkg_data_entry = crate::db::DatabaseModel::new() let pkg_data_entry = crate::db::DatabaseModel::new()
.package_data() .package_data()
.idx_model(pkg_id); .idx_model(pkg_id);
let progress_model = pkg_data_entry.and_then(|pde| pde.install_progress()); let progress_model = pkg_data_entry.and_then(|pde| pde.install_progress());
File::delete(&pkg_archive).await?; File::delete(&pkg_archive).await?;
let mut dst = OpenOptions::new() let mut dst = OpenOptions::new()
.create(true) .create(true)
.write(true) .write(true)
.read(true) .read(true)
.open(&pkg_archive) .open(&pkg_archive)
.await?; .await?;
progress progress
.track_download_during(progress_model.clone(), &ctx.db, || async { .track_download_during(progress_model.clone(), &ctx.db, || async {
let mut progress_writer = InstallProgressTracker::new(&mut dst, progress.clone()); let mut progress_writer = InstallProgressTracker::new(&mut dst, progress.clone());
tokio::io::copy(&mut s9pk, &mut progress_writer).await?; tokio::io::copy(&mut s9pk, &mut progress_writer).await?;
progress.download_complete(); progress.download_complete();
Ok(()) Ok(())
}) })
.await?; .await?;
dst.seek(SeekFrom::Start(0)).await?; dst.seek(SeekFrom::Start(0)).await?;
let progress_reader = InstallProgressTracker::new(dst, progress.clone()); let progress_reader = InstallProgressTracker::new(dst, progress.clone());
let mut s9pk_reader = progress let mut s9pk_reader = progress
.track_read_during(progress_model.clone(), &ctx.db, || { .track_read_during(progress_model.clone(), &ctx.db, || {
S9pkReader::from_reader(progress_reader, true) S9pkReader::from_reader(progress_reader, true)
}) })
.await?; .await?;
install_s9pk_or_cleanup(&ctx, pkg_id, version, &mut s9pk_reader, progress).await?; install_s9pk(&ctx, pkg_id, version, &mut s9pk_reader, progress).await?;
Ok(()) Ok(())
} }
.await
#[instrument(skip(ctx, rdr))] {
pub async fn install_s9pk_or_cleanup<R: AsyncRead + AsyncSeek + Unpin>(
ctx: &RpcContext,
pkg_id: &PackageId,
version: &Version,
rdr: &mut S9pkReader<InstallProgressTracker<R>>,
progress: Arc<InstallProgress>,
) -> Result<(), Error> {
if let Err(e) = install_s9pk(ctx, pkg_id, version, rdr, progress).await {
let mut handle = ctx.db.handle(); let mut handle = ctx.db.handle();
let mut tx = handle.begin().await?; let mut tx = handle.begin().await?;
if let Err(e) = cleanup_failed(&ctx, &mut tx, pkg_id).await { if let Err(e) = cleanup_failed(&ctx, &mut tx, pkg_id).await {
tracing::error!( tracing::error!("Failed to clean up {}@{}: {}", pkg_id, version, e);
"Failed to clean up {}@{}: {}: Adding to broken packages",
pkg_id,
version,
e
);
tracing::debug!("{:?}", e); tracing::debug!("{:?}", e);
} else { } else {
tx.commit(None).await?; tx.commit(None).await?;