mirror of
https://github.com/Start9Labs/start-os.git
synced 2026-03-30 12:11:56 +00:00
Feature/multi platform (#1866)
* wip * wip * wip * wip * wip * wip * remove debian dir * lazy env and git hash * remove env and git hash on clean * don't leave project dir * use docker for native builds * start9 rust * correctly mount registry * remove systemd config * switch to /usr/bin * disable sound for now * wip * change disk list * multi-arch images * multi-arch system images * default aarch64 * edition 2021 * dynamic wifi interface name * use wifi interface from config * bugfixes * add beep based sound * wip * wip * wip * separate out raspberry pi specific files * fixes * use new initramfs always * switch journald conf to sed script * fixes * fix permissions * talking about kernel modules not scripts * fix * fix * switch to MBR * install to /usr/lib * fixes * fixes * fixes * fixes * add media config to cfg path * fixes * fixes * fixes * raspi image fixes * fix test * fix workflow * sync boot partition * gahhhhh
This commit is contained in:
@@ -1,14 +1,13 @@
|
||||
use std::cmp::min;
|
||||
use std::convert::TryFrom;
|
||||
use std::fmt::Display;
|
||||
use std::future::Future;
|
||||
use std::io::Error as StdIOError;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use color_eyre::eyre::eyre;
|
||||
use futures::future::BoxFuture;
|
||||
use futures::stream::BoxStream;
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use futures::{FutureExt, Stream};
|
||||
use http::header::{ACCEPT_RANGES, CONTENT_LENGTH, RANGE};
|
||||
use hyper::body::Bytes;
|
||||
use pin_project::pin_project;
|
||||
@@ -30,9 +29,27 @@ pub struct HttpReader {
|
||||
enum ReadInProgress {
|
||||
None,
|
||||
InProgress(
|
||||
BoxFuture<'static, Result<BoxStream<'static, Result<Bytes, reqwest::Error>>, Error>>,
|
||||
Pin<
|
||||
Box<
|
||||
dyn Future<
|
||||
Output = Result<
|
||||
Pin<
|
||||
Box<
|
||||
dyn Stream<Item = Result<Bytes, reqwest::Error>>
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
>,
|
||||
>,
|
||||
Error,
|
||||
>,
|
||||
> + Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
>,
|
||||
>,
|
||||
),
|
||||
Complete(BoxStream<'static, Result<Bytes, reqwest::Error>>),
|
||||
Complete(Pin<Box<dyn Stream<Item = Result<Bytes, reqwest::Error>> + Send + Sync + 'static>>),
|
||||
}
|
||||
impl ReadInProgress {
|
||||
fn take(&mut self) -> Self {
|
||||
@@ -62,6 +79,7 @@ impl Display for RangeUnit {
|
||||
impl HttpReader {
|
||||
pub async fn new(http_url: Url) -> Result<Self, Error> {
|
||||
let http_client = Client::builder()
|
||||
// .proxy(reqwest::Proxy::all("socks5h://127.0.0.1:9050").unwrap())
|
||||
.build()
|
||||
.with_kind(crate::ErrorKind::TLSInit)?;
|
||||
|
||||
@@ -141,11 +159,14 @@ impl HttpReader {
|
||||
start: usize,
|
||||
len: usize,
|
||||
total_bytes: usize,
|
||||
) -> Result<BoxStream<'static, Result<Bytes, reqwest::Error>>, Error> {
|
||||
) -> Result<
|
||||
Pin<Box<dyn Stream<Item = Result<Bytes, reqwest::Error>> + Send + Sync + 'static>>,
|
||||
Error,
|
||||
> {
|
||||
let end = min(start + len, total_bytes) - 1;
|
||||
|
||||
if start > end {
|
||||
return Ok(futures::stream::empty().boxed());
|
||||
return Ok(Box::pin(futures::stream::empty()));
|
||||
}
|
||||
|
||||
let data_range = format!("{}={}-{} ", range_unit.unwrap_or_default(), start, end);
|
||||
@@ -159,7 +180,7 @@ impl HttpReader {
|
||||
.error_for_status()
|
||||
.with_kind(crate::ErrorKind::Network)?;
|
||||
|
||||
Ok(data_resp.bytes_stream().boxed())
|
||||
Ok(Box::pin(data_resp.bytes_stream()))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -170,7 +191,9 @@ impl AsyncRead for HttpReader {
|
||||
buf: &mut tokio::io::ReadBuf<'_>,
|
||||
) -> Poll<std::io::Result<()>> {
|
||||
fn poll_complete(
|
||||
body: &mut BoxStream<'static, Result<Bytes, reqwest::Error>>,
|
||||
body: &mut Pin<
|
||||
Box<dyn Stream<Item = Result<Bytes, reqwest::Error>> + Send + Sync + 'static>,
|
||||
>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut tokio::io::ReadBuf<'_>,
|
||||
) -> Poll<Option<std::io::Result<usize>>> {
|
||||
@@ -220,15 +243,14 @@ impl AsyncRead for HttpReader {
|
||||
continue;
|
||||
}
|
||||
},
|
||||
ReadInProgress::None => HttpReader::get_range(
|
||||
ReadInProgress::None => Box::pin(HttpReader::get_range(
|
||||
*this.range_unit,
|
||||
this.http_client.clone(),
|
||||
this.http_url.clone(),
|
||||
*this.cursor_pos,
|
||||
buf.remaining(),
|
||||
*this.total_bytes,
|
||||
)
|
||||
.boxed(),
|
||||
)),
|
||||
ReadInProgress::InProgress(fut) => fut,
|
||||
};
|
||||
|
||||
@@ -339,19 +361,20 @@ async fn main_test() {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[ignore]
|
||||
async fn s9pk_test() {
|
||||
use tokio::io::BufReader;
|
||||
|
||||
let http_url = Url::parse("https://github.com/Start9Labs/hello-world-wrapper/releases/download/v0.3.0/hello-world.s9pk").unwrap();
|
||||
let http_url = Url::parse("http://qhc6ac47cytstejcepk2ia3ipadzjhlkc5qsktsbl4e7u2krfmfuaqqd.onion/content/files/2022/09/ghost.s9pk").unwrap();
|
||||
|
||||
println!("Getting this resource: {}", http_url);
|
||||
let test_reader =
|
||||
BufReader::with_capacity(1024 * 1024, HttpReader::new(http_url).await.unwrap());
|
||||
|
||||
let mut s9pk = crate::s9pk::reader::S9pkReader::from_reader(test_reader, true)
|
||||
let mut s9pk = crate::s9pk::reader::S9pkReader::from_reader(test_reader, false)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let manifest = s9pk.manifest().await.unwrap();
|
||||
assert_eq!(&**manifest.id, "hello-world");
|
||||
assert_eq!(&**manifest.id, "ghost");
|
||||
}
|
||||
|
||||
@@ -1,11 +1,19 @@
|
||||
use std::future::Future;
|
||||
use std::path::Path;
|
||||
use std::task::Poll;
|
||||
|
||||
use futures::future::BoxFuture;
|
||||
use futures::{FutureExt, TryStreamExt};
|
||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
|
||||
use futures::future::{BoxFuture, Fuse};
|
||||
use futures::{AsyncSeek, FutureExt, TryStreamExt};
|
||||
use helpers::NonDetachingJoinHandle;
|
||||
use tokio::io::{
|
||||
duplex, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, DuplexStream, ReadBuf, WriteHalf,
|
||||
};
|
||||
|
||||
use crate::ResultExt;
|
||||
|
||||
pub trait AsyncReadSeek: AsyncRead + AsyncSeek {}
|
||||
impl<T: AsyncRead + AsyncSeek> AsyncReadSeek for T {}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct AsyncCompat<T>(pub T);
|
||||
impl<T> futures::io::AsyncRead for AsyncCompat<T>
|
||||
@@ -246,3 +254,72 @@ pub fn response_to_reader(response: reqwest::Response) -> impl AsyncRead + Unpin
|
||||
)
|
||||
}))
|
||||
}
|
||||
|
||||
#[pin_project::pin_project]
|
||||
pub struct BufferedWriteReader {
|
||||
#[pin]
|
||||
hdl: Fuse<NonDetachingJoinHandle<Result<(), std::io::Error>>>,
|
||||
#[pin]
|
||||
rdr: DuplexStream,
|
||||
}
|
||||
impl BufferedWriteReader {
|
||||
pub fn new<
|
||||
W: FnOnce(WriteHalf<DuplexStream>) -> Fut,
|
||||
Fut: Future<Output = Result<(), std::io::Error>> + Send + Sync + 'static,
|
||||
>(
|
||||
write_fn: W,
|
||||
max_buf_size: usize,
|
||||
) -> Self {
|
||||
let (w, rdr) = duplex(max_buf_size);
|
||||
let (_, w) = tokio::io::split(w);
|
||||
BufferedWriteReader {
|
||||
hdl: NonDetachingJoinHandle::from(tokio::spawn(write_fn(w))).fuse(),
|
||||
rdr,
|
||||
}
|
||||
}
|
||||
}
|
||||
impl AsyncRead for BufferedWriteReader {
|
||||
fn poll_read(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
buf: &mut ReadBuf<'_>,
|
||||
) -> std::task::Poll<std::io::Result<()>> {
|
||||
let this = self.project();
|
||||
let res = this.rdr.poll_read(cx, buf);
|
||||
match this.hdl.poll(cx) {
|
||||
Poll::Ready(Ok(Err(e))) => return Poll::Ready(Err(e)),
|
||||
Poll::Ready(Err(e)) => {
|
||||
return Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))
|
||||
}
|
||||
_ => res,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[pin_project::pin_project]
|
||||
pub struct ByteReplacementReader<R> {
|
||||
pub replace: u8,
|
||||
pub with: u8,
|
||||
#[pin]
|
||||
pub inner: R,
|
||||
}
|
||||
impl<R: AsyncRead> AsyncRead for ByteReplacementReader<R> {
|
||||
fn poll_read(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
buf: &mut ReadBuf<'_>,
|
||||
) -> std::task::Poll<std::io::Result<()>> {
|
||||
let this = self.project();
|
||||
match this.inner.poll_read(cx, buf) {
|
||||
Poll::Ready(Ok(())) => {
|
||||
for idx in 0..buf.filled().len() {
|
||||
if buf.filled()[idx] == *this.replace {
|
||||
buf.filled_mut()[idx] = *this.with;
|
||||
}
|
||||
}
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
a => a,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,9 +24,10 @@ use tracing::instrument;
|
||||
use crate::shutdown::Shutdown;
|
||||
use crate::{Error, ErrorKind, ResultExt as _};
|
||||
pub mod config;
|
||||
pub mod io;
|
||||
pub mod http_reader;
|
||||
pub mod io;
|
||||
pub mod logger;
|
||||
pub mod rsync;
|
||||
pub mod serde;
|
||||
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
|
||||
105
backend/src/util/rsync.rs
Normal file
105
backend/src/util/rsync.rs
Normal file
@@ -0,0 +1,105 @@
|
||||
use color_eyre::eyre::eyre;
|
||||
use std::path::Path;
|
||||
|
||||
use helpers::NonDetachingJoinHandle;
|
||||
use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
|
||||
use tokio::process::{Child, Command};
|
||||
use tokio::sync::watch;
|
||||
use tokio_stream::wrappers::WatchStream;
|
||||
|
||||
use crate::util::io::ByteReplacementReader;
|
||||
use crate::{Error, ErrorKind};
|
||||
|
||||
pub struct RsyncOptions {
|
||||
pub delete: bool,
|
||||
pub force: bool,
|
||||
pub ignore_existing: bool,
|
||||
}
|
||||
impl Default for RsyncOptions {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
delete: true,
|
||||
force: true,
|
||||
ignore_existing: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Rsync {
|
||||
pub command: Child,
|
||||
_progress_task: NonDetachingJoinHandle<Result<(), Error>>,
|
||||
stderr: NonDetachingJoinHandle<Result<String, Error>>,
|
||||
pub progress: WatchStream<f64>,
|
||||
}
|
||||
impl Rsync {
|
||||
pub fn new(
|
||||
src: impl AsRef<Path>,
|
||||
dst: impl AsRef<Path>,
|
||||
options: RsyncOptions,
|
||||
) -> Result<Self, Error> {
|
||||
let mut cmd = Command::new("rsync");
|
||||
if options.delete {
|
||||
cmd.arg("--delete");
|
||||
}
|
||||
if options.force {
|
||||
cmd.arg("--force");
|
||||
}
|
||||
if options.ignore_existing {
|
||||
cmd.arg("--ignore-existing");
|
||||
}
|
||||
let mut command = cmd
|
||||
.arg("-a")
|
||||
.arg("--info=progress2")
|
||||
.arg(src.as_ref())
|
||||
.arg(dst.as_ref())
|
||||
.kill_on_drop(true)
|
||||
.stdout(std::process::Stdio::piped())
|
||||
.spawn()?;
|
||||
let cmd_stdout = command.stdout.take().unwrap();
|
||||
let mut cmd_stderr = command.stderr.take().unwrap();
|
||||
let (send, recv) = watch::channel(0.0);
|
||||
let stderr = tokio::spawn(async move {
|
||||
let mut res = String::new();
|
||||
cmd_stderr.read_to_string(&mut res).await?;
|
||||
Ok(res)
|
||||
})
|
||||
.into();
|
||||
let progress_task = tokio::spawn(async move {
|
||||
let mut lines = BufReader::new(ByteReplacementReader {
|
||||
replace: b'\r',
|
||||
with: b'\n',
|
||||
inner: cmd_stdout,
|
||||
})
|
||||
.lines();
|
||||
while let Some(line) = lines.next_line().await? {
|
||||
if let Some(percentage) = line
|
||||
.split_ascii_whitespace()
|
||||
.find_map(|col| col.strip_suffix("%"))
|
||||
{
|
||||
send.send(percentage.parse::<f64>()? / 100.0).unwrap();
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.into();
|
||||
Ok(Rsync {
|
||||
command,
|
||||
_progress_task: progress_task,
|
||||
stderr,
|
||||
progress: WatchStream::new(recv),
|
||||
})
|
||||
}
|
||||
pub async fn wait(mut self) -> Result<(), Error> {
|
||||
let status = self.command.wait().await?;
|
||||
let stderr = self.stderr.await.unwrap()?;
|
||||
if status.success() {
|
||||
tracing::info!("rsync: {}", stderr);
|
||||
} else {
|
||||
return Err(Error::new(
|
||||
eyre!("rsync error: {}", stderr),
|
||||
ErrorKind::Filesystem,
|
||||
));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user