rework installing page and add cancel install button (#2915)

* rework installing page and add cancel install button

* actually call cancel endpoint

* fix two bugs

* include translations in progress component

* cancellable installs

* fix: comments (#2916)

* fix: comments

* delete comments

* ensure trailing slash and no query parameters for new registry url

---------

Co-authored-by: Matt Hill <mattnine@protonmail.com>

* fix raspi

* bump sdk

---------

Co-authored-by: Aiden McClelland <me@drbonez.dev>
Co-authored-by: Alex Inkin <alexander@inkin.ru>
This commit is contained in:
Matt Hill
2025-04-30 13:50:08 -06:00
committed by GitHub
parent 5c473eb9cc
commit e6f0067728
37 changed files with 431 additions and 269 deletions

View File

@@ -16,7 +16,7 @@ use models::{ActionId, PackageId};
use reqwest::{Client, Proxy};
use rpc_toolkit::yajrc::RpcError;
use rpc_toolkit::{CallRemote, Context, Empty};
use tokio::sync::{broadcast, watch, Mutex, RwLock};
use tokio::sync::{broadcast, oneshot, watch, Mutex, RwLock};
use tokio::time::Instant;
use tracing::instrument;
@@ -56,6 +56,7 @@ pub struct RpcContextSeed {
pub os_net_service: NetService,
pub s9pk_arch: Option<&'static str>,
pub services: ServiceMap,
pub cancellable_installs: SyncMutex<BTreeMap<PackageId, oneshot::Sender<()>>>,
pub metrics_cache: Watch<Option<crate::system::Metrics>>,
pub shutdown: broadcast::Sender<Option<Shutdown>>,
pub tor_socks: SocketAddr,
@@ -239,6 +240,7 @@ impl RpcContext {
Some(crate::ARCH)
},
services,
cancellable_installs: SyncMutex::new(BTreeMap::new()),
metrics_cache,
shutdown,
tor_socks: tor_proxy,

View File

@@ -154,13 +154,15 @@ pub async fn install(
})?
.s9pk;
let progress_tracker = FullProgressTracker::new();
let download_progress = progress_tracker.add_phase("Downloading".into(), Some(100));
let download = ctx
.services
.install(
ctx.clone(),
|| asset.deserialize_s9pk_buffered(ctx.client.clone()),
|| asset.deserialize_s9pk_buffered(ctx.client.clone(), download_progress),
None::<Never>,
None,
Some(progress_tracker),
)
.await?;
tokio::spawn(async move { download.await?.await });
@@ -188,10 +190,15 @@ pub async fn sideload(
ctx: RpcContext,
SideloadParams { session }: SideloadParams,
) -> Result<SideloadResponse, Error> {
let (upload, file) = upload(&ctx, session.clone()).await?;
let (err_send, mut err_recv) = oneshot::channel::<Error>();
let progress = Guid::new();
let progress_tracker = FullProgressTracker::new();
let (upload, file) = upload(
&ctx,
session.clone(),
progress_tracker.add_phase("Uploading".into(), Some(100)),
)
.await?;
let mut progress_listener = progress_tracker.stream(Some(Duration::from_millis(200)));
ctx.rpc_continuations
.add(
@@ -268,6 +275,24 @@ pub async fn sideload(
Ok(SideloadResponse { upload, progress })
}
/// Parameters for the `package.cancel-install` RPC / CLI subcommand:
/// identifies the package whose in-flight install should be aborted.
#[derive(Debug, Clone, Deserialize, Serialize, Parser, TS)]
#[serde(rename_all = "camelCase")]
#[command(rename_all = "kebab-case")]
pub struct CancelInstallParams {
// Id of the package with a cancellable (still-running) install.
pub id: PackageId,
}
/// Cancel an in-flight install of the package identified by `id`.
///
/// Looks up the package's cancellation sender in the context's
/// `cancellable_installs` map and fires it. If no install for `id` is
/// currently pending, this is a no-op and still returns `Ok(())`.
#[instrument(skip_all)]
pub fn cancel_install(
    ctx: RpcContext,
    CancelInstallParams { id }: CancelInstallParams,
) -> Result<(), Error> {
    // Remove the entry first so a given install can be cancelled at most
    // once; a send error only means the receiver (the install task) is
    // already gone, which is safe to ignore.
    let pending = ctx.cancellable_installs.mutate(|installs| installs.remove(&id));
    if let Some(cancel) = pending {
        let _ = cancel.send(());
    }
    Ok(())
}
#[derive(Deserialize, Serialize, Parser)]
pub struct QueryPackageParams {
id: PackageId,

View File

@@ -349,6 +349,13 @@ pub fn package<C: Context>() -> ParentHandler<C> {
.no_display()
.with_about("Install a package from a marketplace or via sideloading"),
)
.subcommand(
"cancel-install",
from_fn(install::cancel_install)
.no_display()
.with_about("Cancel an install of a package")
.with_call_remote::<CliContext>(),
)
.subcommand(
"uninstall",
from_fn_async(install::uninstall)

View File

@@ -10,6 +10,7 @@ use ts_rs::TS;
use url::Url;
use crate::prelude::*;
use crate::progress::PhaseProgressTrackerHandle;
use crate::registry::signer::commitment::merkle_archive::MerkleArchiveCommitment;
use crate::registry::signer::commitment::{Commitment, Digestable};
use crate::registry::signer::sign::{AnySignature, AnyVerifyingKey};
@@ -75,9 +76,10 @@ impl RegistryAsset<MerkleArchiveCommitment> {
pub async fn deserialize_s9pk_buffered(
&self,
client: Client,
progress: PhaseProgressTrackerHandle,
) -> Result<S9pk<Section<Arc<BufferedHttpSource>>>, Error> {
S9pk::deserialize(
&Arc::new(BufferedHttpSource::new(client, self.url.clone()).await?),
&Arc::new(BufferedHttpSource::new(client, self.url.clone(), progress).await?),
Some(&self.commitment),
)
.await
@@ -89,8 +91,12 @@ pub struct BufferedHttpSource {
file: UploadingFile,
}
impl BufferedHttpSource {
pub async fn new(client: Client, url: Url) -> Result<Self, Error> {
let (mut handle, file) = UploadingFile::new().await?;
pub async fn new(
client: Client,
url: Url,
progress: PhaseProgressTrackerHandle,
) -> Result<Self, Error> {
let (mut handle, file) = UploadingFile::new(progress).await?;
let response = client.get(url).send().await?;
Ok(Self {
_download: tokio::spawn(async move { handle.download(response).await }).into(),

View File

@@ -3,14 +3,14 @@ use std::sync::Arc;
use std::time::Duration;
use color_eyre::eyre::eyre;
use futures::future::BoxFuture;
use futures::future::{BoxFuture, Fuse};
use futures::stream::FuturesUnordered;
use futures::{Future, FutureExt, StreamExt};
use helpers::NonDetachingJoinHandle;
use imbl::OrdMap;
use imbl_value::InternedString;
use models::ErrorData;
use tokio::sync::{Mutex, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
use tokio::sync::{oneshot, Mutex, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
use tracing::instrument;
use crate::context::RpcContext;
@@ -138,41 +138,41 @@ impl ServiceMap {
Fut: Future<Output = Result<S9pk<S>, Error>>,
S: FileSource + Clone,
{
let progress = progress.unwrap_or_else(|| FullProgressTracker::new());
let mut validate_progress = progress.add_phase("Validating Headers".into(), Some(1));
let mut unpack_progress = progress.add_phase("Unpacking".into(), Some(100));
let mut s9pk = s9pk().await?;
validate_progress.start();
s9pk.validate_and_filter(ctx.s9pk_arch)?;
validate_progress.complete();
let manifest = s9pk.as_manifest().clone();
let id = manifest.id.clone();
let icon = s9pk.icon_data_url().await?;
let developer_key = s9pk.as_archive().signer();
let mut service = self.get_mut(&id).await;
let size = s9pk.size();
if let Some(size) = size {
unpack_progress.set_total(size);
}
let op_name = if recovery_source.is_none() {
if service.is_none() {
"Install"
"Installing"
} else {
"Update"
"Updating"
}
} else {
"Restore"
"Restoring"
};
let size = s9pk.size();
let progress = progress.unwrap_or_else(|| FullProgressTracker::new());
let download_progress_contribution = size.unwrap_or(60);
let mut download_progress = progress.add_phase(
InternedString::intern("Download"),
Some(download_progress_contribution),
);
if let Some(size) = size {
download_progress.set_total(size);
}
let mut finalization_progress = progress.add_phase(
InternedString::intern(op_name),
Some(download_progress_contribution / 2),
);
let mut finalization_progress = progress.add_phase(op_name.into(), Some(50));
let restoring = recovery_source.is_some();
let mut reload_guard = ServiceRefReloadGuard::new(ctx.clone(), id.clone(), op_name);
let (cancel_send, cancel_recv) = oneshot::channel();
ctx.cancellable_installs
.mutate(|c| c.insert(id.clone(), cancel_send));
let mut reload_guard =
ServiceRefReloadCancelGuard::new(ctx.clone(), id.clone(), op_name, Some(cancel_recv));
reload_guard
.handle(async {
@@ -256,15 +256,15 @@ impl ServiceMap {
Some(Duration::from_millis(100)),
)));
download_progress.start();
unpack_progress.start();
let mut progress_writer = ProgressTrackerWriter::new(
crate::util::io::create_file(&download_path).await?,
download_progress,
unpack_progress,
);
s9pk.serialize(&mut progress_writer, true).await?;
let (file, mut download_progress) = progress_writer.into_inner();
let (file, mut unpack_progress) = progress_writer.into_inner();
file.sync_all().await?;
download_progress.complete();
unpack_progress.complete();
let installed_path = Path::new(DATA_DIR)
.join(PKG_ARCHIVE_DIR)
@@ -339,7 +339,7 @@ impl ServiceMap {
) -> Result<(), Error> {
let mut guard = self.get_mut(id).await;
if let Some(service) = guard.take() {
ServiceRefReloadGuard::new(ctx.clone(), id.clone(), "Uninstall")
ServiceRefReloadCancelGuard::new(ctx.clone(), id.clone(), "Uninstall", None)
.handle_last(async move {
let res = service.uninstall(None, soft, force).await;
drop(guard);
@@ -370,32 +370,51 @@ impl ServiceMap {
}
}
pub struct ServiceRefReloadGuard(Option<ServiceRefReloadInfo>);
impl Drop for ServiceRefReloadGuard {
pub struct ServiceRefReloadCancelGuard(
Option<ServiceRefReloadInfo>,
Option<Fuse<oneshot::Receiver<()>>>,
);
impl Drop for ServiceRefReloadCancelGuard {
fn drop(&mut self) {
if let Some(info) = self.0.take() {
tokio::spawn(info.reload(None));
}
}
}
impl ServiceRefReloadGuard {
pub fn new(ctx: RpcContext, id: PackageId, operation: &'static str) -> Self {
Self(Some(ServiceRefReloadInfo { ctx, id, operation }))
impl ServiceRefReloadCancelGuard {
pub fn new(
ctx: RpcContext,
id: PackageId,
operation: &'static str,
cancel: Option<oneshot::Receiver<()>>,
) -> Self {
Self(
Some(ServiceRefReloadInfo { ctx, id, operation }),
cancel.map(|c| c.fuse()),
)
}
pub async fn handle<T>(
&mut self,
operation: impl Future<Output = Result<T, Error>>,
) -> Result<T, Error> {
let mut errors = ErrorCollection::new();
match operation.await {
let res = async {
if let Some(cancel) = self.1.as_mut() {
tokio::select! {
res = operation => res,
_ = cancel => Err(Error::new(eyre!("Operation Cancelled"), ErrorKind::Cancelled)),
}
} else {
operation.await
}
}.await;
match res {
Ok(a) => Ok(a),
Err(e) => {
if let Some(info) = self.0.take() {
errors.handle(info.reload(Some(e.clone_output())).await);
tokio::spawn(info.reload(Some(e.clone_output())));
}
errors.handle::<(), _>(Err(e));
errors.into_result().map(|_| unreachable!()) // TODO: there's gotta be a more elegant way?
Err(e)
}
}
}

View File

@@ -18,6 +18,7 @@ use tokio::sync::watch;
use crate::context::RpcContext;
use crate::prelude::*;
use crate::progress::PhaseProgressTrackerHandle;
use crate::rpc_continuations::{Guid, RpcContinuation};
use crate::s9pk::merkle_archive::source::multi_cursor_file::{FileCursor, MultiCursorFile};
use crate::s9pk::merkle_archive::source::ArchiveSource;
@@ -26,9 +27,10 @@ use crate::util::io::{create_file, TmpDir};
pub async fn upload(
ctx: &RpcContext,
session: Option<InternedString>,
progress: PhaseProgressTrackerHandle,
) -> Result<(Guid, UploadingFile), Error> {
let guid = Guid::new();
let (mut handle, file) = UploadingFile::new().await?;
let (mut handle, file) = UploadingFile::new(progress).await?;
ctx.rpc_continuations
.add(
guid.clone(),
@@ -50,8 +52,8 @@ pub async fn upload(
Ok((guid, file))
}
#[derive(Default)]
struct Progress {
tracker: PhaseProgressTrackerHandle,
expected_size: Option<u64>,
written: u64,
error: Option<Error>,
@@ -69,6 +71,7 @@ impl Progress {
match res {
Ok(a) => {
self.written += *a as u64;
self.tracker += *a as u64;
true
}
Err(e) => self.handle_error(e),
@@ -123,6 +126,7 @@ impl Progress {
}
}
fn complete(&mut self) -> bool {
self.tracker.complete();
match self {
Self {
expected_size: Some(size),
@@ -133,6 +137,7 @@ impl Progress {
expected_size: Some(size),
written,
error,
..
} if *written > *size && error.is_none() => {
*error = Some(Error::new(
eyre!("Too many bytes received"),
@@ -171,8 +176,13 @@ pub struct UploadingFile {
progress: watch::Receiver<Progress>,
}
impl UploadingFile {
pub async fn new() -> Result<(UploadHandle, Self), Error> {
let progress = watch::channel(Progress::default());
pub async fn new(progress: PhaseProgressTrackerHandle) -> Result<(UploadHandle, Self), Error> {
let progress = watch::channel(Progress {
tracker: progress,
expected_size: None,
written: 0,
error: None,
});
let tmp_dir = Arc::new(TmpDir::new().await?);
let file = create_file(tmp_dir.join("upload.tmp")).await?;
let uploading = Self {
@@ -327,10 +337,12 @@ impl UploadHandle {
self.process_headers(request.headers());
self.process_body(request.into_body().into_data_stream())
.await;
self.progress.send_if_modified(|p| p.complete());
}
/// Stream the body of an HTTP `response` into the backing upload file.
/// Registry-download counterpart of the request-based upload path:
/// first reads the response headers (Content-Length, when present, seeds
/// the expected size / progress total), then writes the body stream, and
/// finally marks the progress state complete — which also appears to
/// validate the written byte count against the expected size.
pub async fn download(&mut self, response: reqwest::Response) {
self.process_headers(response.headers());
self.process_body(response.bytes_stream()).await;
self.progress.send_if_modified(|p| p.complete());
}
fn process_headers(&mut self, headers: &HeaderMap) {
if let Some(content_length) = headers
@@ -338,8 +350,10 @@ impl UploadHandle {
.and_then(|a| a.to_str().log_err())
.and_then(|a| a.parse::<u64>().log_err())
{
self.progress
.send_modify(|p| p.expected_size = Some(content_length));
self.progress.send_modify(|p| {
p.expected_size = Some(content_length);
p.tracker.set_total(content_length);
});
}
}
async fn process_body<E: Into<Box<(dyn std::error::Error + Send + Sync + 'static)>>>(