mirror of
https://github.com/Start9Labs/start-os.git
synced 2026-03-30 12:11:56 +00:00
Feature/registry package index (#2623)
* include system images in compat s9pk * wip * wip * update types * wip * fix signature serialization * Add SignatureHeader conversions * finish display impl for get --------- Co-authored-by: Shadowy Super Coder <musashidisciple@proton.me>
This commit is contained in:
@@ -9,18 +9,18 @@ use futures::{FutureExt, StreamExt};
|
||||
use http::header::CONTENT_LENGTH;
|
||||
use http::StatusCode;
|
||||
use tokio::fs::File;
|
||||
use tokio::io::{AsyncWrite, AsyncWriteExt};
|
||||
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
|
||||
use tokio::sync::watch;
|
||||
|
||||
use crate::context::RpcContext;
|
||||
use crate::prelude::*;
|
||||
use crate::rpc_continuations::{RequestGuid, RpcContinuation};
|
||||
use crate::rpc_continuations::{Guid, RpcContinuation};
|
||||
use crate::s9pk::merkle_archive::source::multi_cursor_file::MultiCursorFile;
|
||||
use crate::s9pk::merkle_archive::source::ArchiveSource;
|
||||
use crate::util::io::TmpDir;
|
||||
|
||||
pub async fn upload(ctx: &RpcContext) -> Result<(RequestGuid, UploadingFile), Error> {
|
||||
let guid = RequestGuid::new();
|
||||
pub async fn upload(ctx: &RpcContext) -> Result<(Guid, UploadingFile), Error> {
|
||||
let guid = Guid::new();
|
||||
let (mut handle, file) = UploadingFile::new().await?;
|
||||
ctx.rpc_continuations
|
||||
.add(
|
||||
@@ -120,22 +120,44 @@ impl Progress {
|
||||
.and_then(|a| a.expected_size)
|
||||
}
|
||||
async fn ready_for(watch: &mut watch::Receiver<Self>, size: u64) -> Result<(), Error> {
|
||||
if let Some(e) = watch
|
||||
.wait_for(|progress| progress.error.is_some() || progress.written >= size)
|
||||
match &*watch
|
||||
.wait_for(|progress| {
|
||||
progress.error.is_some()
|
||||
|| progress.written >= size
|
||||
|| progress.expected_size.map_or(false, |e| e < size)
|
||||
})
|
||||
.await
|
||||
.map_err(|_| {
|
||||
Error::new(
|
||||
eyre!("failed to determine upload progress"),
|
||||
ErrorKind::Network,
|
||||
)
|
||||
})?
|
||||
.error
|
||||
.as_ref()
|
||||
.map(|e| e.clone_output())
|
||||
{
|
||||
Err(e)
|
||||
} else {
|
||||
Ok(())
|
||||
})? {
|
||||
Progress { error: Some(e), .. } => Err(e.clone_output()),
|
||||
Progress {
|
||||
expected_size: Some(e),
|
||||
..
|
||||
} if *e < size => Err(Error::new(
|
||||
eyre!("file size is less than requested"),
|
||||
ErrorKind::Network,
|
||||
)),
|
||||
_ => Ok(()),
|
||||
}
|
||||
}
|
||||
async fn ready(watch: &mut watch::Receiver<Self>) -> Result<(), Error> {
|
||||
match &*watch
|
||||
.wait_for(|progress| {
|
||||
progress.error.is_some() || Some(progress.written) == progress.expected_size
|
||||
})
|
||||
.await
|
||||
.map_err(|_| {
|
||||
Error::new(
|
||||
eyre!("failed to determine upload progress"),
|
||||
ErrorKind::Network,
|
||||
)
|
||||
})? {
|
||||
Progress { error: Some(e), .. } => Err(e.clone_output()),
|
||||
_ => Ok(()),
|
||||
}
|
||||
}
|
||||
fn complete(&mut self) -> bool {
|
||||
@@ -156,13 +178,25 @@ impl Progress {
|
||||
));
|
||||
true
|
||||
}
|
||||
Self { error, .. } if error.is_none() => {
|
||||
Self {
|
||||
error,
|
||||
expected_size: Some(_),
|
||||
..
|
||||
} if error.is_none() => {
|
||||
*error = Some(Error::new(
|
||||
eyre!("Connection closed or timed out before full file received"),
|
||||
ErrorKind::Network,
|
||||
));
|
||||
true
|
||||
}
|
||||
Self {
|
||||
expected_size,
|
||||
written,
|
||||
..
|
||||
} if expected_size.is_none() => {
|
||||
*expected_size = Some(*written);
|
||||
true
|
||||
}
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
@@ -204,6 +238,10 @@ impl ArchiveSource for UploadingFile {
|
||||
async fn size(&self) -> Option<u64> {
|
||||
Progress::expected_size(&mut self.progress.clone()).await
|
||||
}
|
||||
async fn fetch_all(&self) -> Result<impl AsyncRead + Unpin + Send, Error> {
|
||||
Progress::ready(&mut self.progress.clone()).await?;
|
||||
self.file.fetch_all().await
|
||||
}
|
||||
async fn fetch(&self, position: u64, size: u64) -> Result<Self::Reader, Error> {
|
||||
Progress::ready_for(&mut self.progress.clone(), position + size).await?;
|
||||
self.file.fetch(position, size).await
|
||||
|
||||
Reference in New Issue
Block a user