follow sideload progress (#2718)

* follow sideload progress

* small bugfix

* shareReplay with no refcount false

* don't wrap sideload progress in RPCResult

* dont present toast

---------

Co-authored-by: Aiden McClelland <me@drbonez.dev>
This commit is contained in:
Matt Hill
2024-09-03 09:23:47 -06:00
committed by GitHub
parent 66b018a355
commit 9981ee7601
13 changed files with 200 additions and 125 deletions

View File

@@ -17,6 +17,7 @@ use rpc_toolkit::HandlerArgs;
use rustyline_async::ReadlineEvent;
use serde::{Deserialize, Serialize};
use tokio::sync::oneshot;
use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
use tracing::instrument;
use ts_rs::TS;
@@ -188,7 +189,7 @@ pub async fn sideload(
SideloadParams { session }: SideloadParams,
) -> Result<SideloadResponse, Error> {
let (upload, file) = upload(&ctx, session.clone()).await?;
let (err_send, err_recv) = oneshot::channel();
let (err_send, err_recv) = oneshot::channel::<Error>();
let progress = Guid::new();
let progress_tracker = FullProgressTracker::new();
let mut progress_listener = progress_tracker.stream(Some(Duration::from_millis(200)));
@@ -202,12 +203,14 @@ pub async fn sideload(
use axum::extract::ws::Message;
async move {
if let Err(e) = async {
type RpcResponse = rpc_toolkit::yajrc::RpcResponse::<GenericRpcMethod<&'static str, (), FullProgress>>;
type RpcResponse = rpc_toolkit::yajrc::RpcResponse<
GenericRpcMethod<&'static str, (), FullProgress>,
>;
tokio::select! {
res = async {
while let Some(progress) = progress_listener.next().await {
ws.send(Message::Text(
serde_json::to_string(&RpcResponse::from_result::<RpcError>(Ok(progress)))
serde_json::to_string(&progress)
.with_kind(ErrorKind::Serialization)?,
))
.await
@@ -217,12 +220,8 @@ pub async fn sideload(
} => res?,
err = err_recv => {
if let Ok(e) = err {
ws.send(Message::Text(
serde_json::to_string(&RpcResponse::from_result::<RpcError>(Err(e)))
.with_kind(ErrorKind::Serialization)?,
))
.await
.with_kind(ErrorKind::Network)?;
ws.close_result(Err::<&str, _>(e.clone_output())).await?;
return Err(e)
}
}
}
@@ -260,7 +259,7 @@ pub async fn sideload(
}
.await
{
let _ = err_send.send(RpcError::from(e.clone_output()));
let _ = err_send.send(e.clone_output());
tracing::error!("Error sideloading package: {e}");
tracing::debug!("{e:?}");
}
@@ -407,19 +406,21 @@ pub async fn cli_install(
let mut progress = FullProgress::new();
type RpcResponse = rpc_toolkit::yajrc::RpcResponse<
GenericRpcMethod<&'static str, (), FullProgress>,
>;
loop {
tokio::select! {
msg = ws.next() => {
if let Some(msg) = msg {
if let Message::Text(t) = msg.with_kind(ErrorKind::Network)? {
progress =
serde_json::from_str::<RpcResponse>(&t)
.with_kind(ErrorKind::Deserialization)?.result?;
bar.update(&progress);
match msg.with_kind(ErrorKind::Network)? {
Message::Text(t) => {
progress =
serde_json::from_str::<FullProgress>(&t)
.with_kind(ErrorKind::Deserialization)?;
bar.update(&progress);
}
Message::Close(Some(c)) if c.code != CloseCode::Normal => {
return Err(Error::new(eyre!("{}", c.reason), ErrorKind::Network))
}
_ => (),
}
} else {
break;