don't wait for install to complete on sideload

This commit is contained in:
Aiden McClelland
2023-06-14 10:44:34 -06:00
committed by Aiden McClelland
parent 88869e9710
commit f29c7ba4f2
2 changed files with 52 additions and 39 deletions

View File

@@ -443,7 +443,7 @@ async fn restore_package<'a>(
Ok(( Ok((
progress.clone(), progress.clone(),
async move { async move {
download_install_s9pk(&ctx, &manifest, None, progress, file).await?; download_install_s9pk(&ctx, &manifest, None, progress, file, None).await?;
guard.unmount().await?; guard.unmount().await?;

View File

@@ -21,6 +21,7 @@ use rpc_toolkit::yajrc::RpcError;
use tokio::fs::{File, OpenOptions}; use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt}; use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt};
use tokio::process::Command; use tokio::process::Command;
use tokio::sync::oneshot;
use tokio_stream::wrappers::ReadDirStream; use tokio_stream::wrappers::ReadDirStream;
use tracing::instrument; use tracing::instrument;
@@ -297,6 +298,7 @@ pub async fn install(
Some(marketplace_url), Some(marketplace_url),
InstallProgress::new(s9pk.content_length()), InstallProgress::new(s9pk.content_length()),
response_to_reader(s9pk), response_to_reader(s9pk),
None,
) )
.await .await
{ {
@@ -425,47 +427,54 @@ pub async fn sideload(
pde.save(&mut tx).await?; pde.save(&mut tx).await?;
tx.commit().await?; tx.commit().await?;
if let Err(e) = download_install_s9pk( let (send, recv) = oneshot::channel();
&new_ctx,
&manifest, tokio::spawn(async move {
None, if let Err(e) = download_install_s9pk(
progress, &new_ctx,
tokio_util::io::StreamReader::new(req.into_body().map_err(|e| { &manifest,
std::io::Error::new( None,
match &e { progress,
e if e.is_connect() => std::io::ErrorKind::ConnectionRefused, tokio_util::io::StreamReader::new(req.into_body().map_err(|e| {
e if e.is_timeout() => std::io::ErrorKind::TimedOut, std::io::Error::new(
_ => std::io::ErrorKind::Other, match &e {
}, e if e.is_connect() => std::io::ErrorKind::ConnectionRefused,
e, e if e.is_timeout() => std::io::ErrorKind::TimedOut,
) _ => std::io::ErrorKind::Other,
})), },
) e,
.await )
{ })),
let err_str = format!( Some(send),
"Install of {}@{} Failed: {}", )
manifest.id, manifest.version, e .await
);
tracing::error!("{}", err_str);
tracing::debug!("{:?}", e);
if let Err(e) = new_ctx
.notification_manager
.notify(
&mut hdl,
Some(manifest.id),
NotificationLevel::Error,
String::from("Install Failed"),
err_str,
(),
None,
)
.await
{ {
tracing::error!("Failed to issue Notification: {}", e); let err_str = format!(
"Install of {}@{} Failed: {}",
manifest.id, manifest.version, e
);
tracing::error!("{}", err_str);
tracing::debug!("{:?}", e); tracing::debug!("{:?}", e);
if let Err(e) = new_ctx
.notification_manager
.notify(
&mut hdl,
Some(manifest.id),
NotificationLevel::Error,
String::from("Install Failed"),
err_str,
(),
None,
)
.await
{
tracing::error!("Failed to issue Notification: {}", e);
tracing::debug!("{:?}", e);
}
} }
} });
recv.await;
Response::builder() Response::builder()
.status(StatusCode::OK) .status(StatusCode::OK)
@@ -707,6 +716,7 @@ pub async fn download_install_s9pk(
marketplace_url: Option<Url>, marketplace_url: Option<Url>,
progress: Arc<InstallProgress>, progress: Arc<InstallProgress>,
mut s9pk: impl AsyncRead + Unpin, mut s9pk: impl AsyncRead + Unpin,
download_complete: Option<oneshot::Sender<()>>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let pkg_id = &temp_manifest.id; let pkg_id = &temp_manifest.id;
let version = &temp_manifest.version; let version = &temp_manifest.version;
@@ -799,6 +809,9 @@ pub async fn download_install_s9pk(
let mut progress_writer = InstallProgressTracker::new(&mut dst, progress.clone()); let mut progress_writer = InstallProgressTracker::new(&mut dst, progress.clone());
tokio::io::copy(&mut s9pk, &mut progress_writer).await?; tokio::io::copy(&mut s9pk, &mut progress_writer).await?;
progress.download_complete(); progress.download_complete();
if let Some(complete) = download_complete {
complete.send(()).unwrap_or_default();
}
Ok(()) Ok(())
}) })
.await?; .await?;