Feature/new registry (#2612)

* wip

* overhaul boot process

* wip: new registry

* wip

* wip

* wip

* wip

* wip

* wip

* os registry complete

* ui fixes

* fixes

* fixes

* more fixes

* fix merkle archive
This commit is contained in:
Aiden McClelland
2024-05-06 10:20:44 -06:00
committed by GitHub
parent 8a38666105
commit 9b14d714ca
167 changed files with 6297 additions and 3190 deletions

View File

@@ -1,11 +1,15 @@
use std::collections::VecDeque;
use std::future::Future;
use std::io::Cursor;
use std::os::unix::prelude::MetadataExt;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::atomic::AtomicU64;
use std::task::Poll;
use std::sync::Arc;
use std::task::{Poll, Waker};
use std::time::Duration;
use bytes::{Buf, BytesMut};
use futures::future::{BoxFuture, Fuse};
use futures::{AsyncSeek, FutureExt, TryStreamExt};
use helpers::NonDetachingJoinHandle;
@@ -15,6 +19,7 @@ use tokio::io::{
duplex, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, DuplexStream, ReadBuf, WriteHalf,
};
use tokio::net::TcpStream;
use tokio::sync::Notify;
use tokio::time::{Instant, Sleep};
use crate::prelude::*;
@@ -312,6 +317,7 @@ impl AsyncRead for BufferedWriteReader {
/// Extension methods for [`std::io::Cursor`] used by the buffered-write
/// helpers in this module.
pub trait CursorExt {
    /// Copies bytes from the cursor's current position into `buf` and
    /// advances the cursor past what was copied.
    fn pure_read(&mut self, buf: &mut ReadBuf<'_>);
    /// Returns the not-yet-consumed tail of the underlying buffer
    /// (current position to end).
    fn remaining_slice(&self) -> &[u8];
}
impl<T: AsRef<[u8]>> CursorExt for Cursor<T> {
@@ -324,6 +330,10 @@ impl<T: AsRef<[u8]>> CursorExt for Cursor<T> {
buf.put_slice(&self.get_ref().as_ref()[self.position() as usize..end]);
self.set_position(end as u64);
}
/// Returns the bytes between the cursor's position and the end of the
/// underlying buffer, clamping the position to the buffer length so an
/// out-of-range position yields an empty slice instead of panicking.
fn remaining_slice(&self) -> &[u8] {
    let bytes = self.get_ref().as_ref();
    let start = self.position().min(bytes.len() as u64) as usize;
    &bytes[start..]
}
}
#[pin_project::pin_project]
@@ -744,3 +754,256 @@ pub async fn rename(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> Result<(),
.await
.with_ctx(|_| (ErrorKind::Filesystem, lazy_format!("mv {src:?} -> {dst:?}")))
}
/// Drives any queued prefix data in `prefix` into `writer`, then (if
/// `flush_writer` is set) flushes the writer itself.
///
/// Returns `Pending` if the writer cannot currently accept the backlog;
/// cursors with partial progress are pushed back onto the front of the
/// queue so no bytes are lost or reordered.
fn poll_flush_prefix<W: AsyncWrite>(
    mut writer: Pin<&mut W>,
    cx: &mut std::task::Context<'_>,
    prefix: &mut VecDeque<Cursor<Vec<u8>>>,
    flush_writer: bool,
) -> Poll<Result<(), std::io::Error>> {
    while let Some(mut cur) = prefix.pop_front() {
        let buf = cur.remaining_slice();
        if !buf.is_empty() {
            match writer.as_mut().poll_write(cx, buf)? {
                Poll::Ready(0) => {
                    // A zero-byte write against a non-empty buffer means the
                    // writer can make no further progress; retrying would
                    // spin forever, so report it as WriteZero instead.
                    prefix.push_front(cur);
                    return Poll::Ready(Err(std::io::Error::new(
                        std::io::ErrorKind::WriteZero,
                        "writer accepted 0 bytes while flushing buffered prefix",
                    )));
                }
                // Whole cursor written: drop it and move to the next.
                Poll::Ready(n) if n == buf.len() => (),
                // Partial write: record progress and retry from the front.
                Poll::Ready(n) => {
                    cur.advance(n);
                    prefix.push_front(cur);
                }
                Poll::Pending => {
                    prefix.push_front(cur);
                    return Poll::Pending;
                }
            }
        }
    }
    if flush_writer {
        writer.poll_flush(cx)
    } else {
        Poll::Ready(Ok(()))
    }
}
fn poll_write_prefix_buf<W: AsyncWrite>(
mut writer: Pin<&mut W>,
cx: &mut std::task::Context<'_>,
prefix: &mut VecDeque<Cursor<Vec<u8>>>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
futures::ready!(poll_flush_prefix(writer.as_mut(), cx, prefix, false)?);
writer.poll_write(cx, buf)
}
/// An `AsyncWrite` adapter that duplicates every byte written to it into two
/// underlying writers.
///
/// When one writer accepts bytes faster than the other, the surplus is queued
/// in memory for the slower side (see `poll_write`), bounded by `capacity`.
#[pin_project::pin_project]
pub struct TeeWriter<W1, W2> {
    // Soft cap on the combined number of bytes queued across both buffers.
    capacity: usize,
    // Backlog destined for writer1 (bytes writer2 already accepted).
    buffer1: VecDeque<Cursor<Vec<u8>>>,
    // Backlog destined for writer2 (bytes writer1 already accepted).
    buffer2: VecDeque<Cursor<Vec<u8>>>,
    #[pin]
    writer1: W1,
    #[pin]
    writer2: W2,
}
impl<W1: AsyncWrite, W2: AsyncWrite> TeeWriter<W1, W2> {
    /// Creates a tee over `writer1` and `writer2`; `capacity` bounds how many
    /// bytes may be buffered for whichever writer falls behind.
    pub fn new(writer1: W1, writer2: W2, capacity: usize) -> Self {
        Self {
            capacity,
            buffer1: VecDeque::new(),
            buffer2: VecDeque::new(),
            writer1,
            writer2,
        }
    }
}
impl<W1: AsyncWrite + Unpin, W2: AsyncWrite + Unpin> TeeWriter<W1, W2> {
    /// Flushes both backlogs and writers, then returns the inner writers.
    pub async fn into_inner(mut self) -> Result<(W1, W2), Error> {
        self.flush().await?;
        Ok((self.writer1, self.writer2))
    }
}
impl<W1: AsyncWrite, W2: AsyncWrite> AsyncWrite for TeeWriter<W1, W2> {
    /// Writes `buf` to both writers; whichever side accepts fewer bytes has
    /// the difference queued in its backlog so the caller may treat
    /// `max(w1, w2)` bytes as written.
    fn poll_write(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        mut buf: &[u8],
    ) -> Poll<Result<usize, std::io::Error>> {
        let mut this = self.project();
        // Combined backlog currently queued for both writers.
        let buffer_size = this
            .buffer1
            .iter()
            .chain(this.buffer2.iter())
            .map(|b| b.get_ref().len())
            .sum::<usize>();
        if buffer_size < *this.capacity {
            // Headroom available: only accept as much of `buf` as keeps the
            // combined backlog within `capacity` after this write.
            let to_write = std::cmp::min(*this.capacity - buffer_size, buf.len());
            buf = &buf[0..to_write];
        } else {
            // At capacity: accept nothing until BOTH backlogs have fully
            // drained into their writers.
            match (
                poll_flush_prefix(this.writer1.as_mut(), cx, &mut this.buffer1, false)?,
                poll_flush_prefix(this.writer2.as_mut(), cx, &mut this.buffer2, false)?,
            ) {
                (Poll::Ready(()), Poll::Ready(())) => (),
                _ => return Poll::Pending,
            }
        }
        // Attempt the write on both sides; each drains its backlog first so
        // ordering is preserved. Pending counts as 0 bytes accepted.
        let (w1, w2) = match (
            poll_write_prefix_buf(this.writer1.as_mut(), cx, &mut this.buffer1, buf)?,
            poll_write_prefix_buf(this.writer2.as_mut(), cx, &mut this.buffer2, buf)?,
        ) {
            (Poll::Pending, Poll::Pending) => return Poll::Pending,
            (Poll::Ready(n), Poll::Pending) => (n, 0),
            (Poll::Pending, Poll::Ready(n)) => (0, n),
            (Poll::Ready(n1), Poll::Ready(n2)) => (n1, n2),
        };
        // Queue the surplus for whichever writer fell behind.
        if w1 > w2 {
            this.buffer2.push_back(Cursor::new(buf[w2..w1].to_vec()));
        } else if w1 < w2 {
            this.buffer1.push_back(Cursor::new(buf[w1..w2].to_vec()));
        }
        Poll::Ready(Ok(std::cmp::max(w1, w2)))
    }
    /// Ready only once both backlogs are drained AND both writers flushed.
    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Result<(), std::io::Error>> {
        let mut this = self.project();
        match (
            poll_flush_prefix(this.writer1, cx, &mut this.buffer1, true)?,
            poll_flush_prefix(this.writer2, cx, &mut this.buffer2, true)?,
        ) {
            (Poll::Ready(()), Poll::Ready(())) => Poll::Ready(Ok(())),
            _ => Poll::Pending,
        }
    }
    // NOTE(review): shutdown only flushes; it does not propagate
    // `poll_shutdown` to the inner writers — presumably so `into_inner` can
    // return still-usable writers. Confirm no caller relies on EOF reaching
    // the inner writers here.
    fn poll_shutdown(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Result<(), std::io::Error>> {
        self.poll_flush(cx)
    }
}
/// An `AsyncWrite` sink that computes the BLAKE3 hash of everything written
/// to it on a background tokio task (using rayon for large chunks).
#[pin_project::pin_project]
pub struct ParallelBlake3Writer {
    // Background task that drains `buffer` into the hasher; resolves with
    // the final hash once the shutdown flag is observed.
    #[pin]
    hasher: NonDetachingJoinHandle<blake3::Hash>,
    // Shared state: (staged bytes, wakers of parked writers/flushers,
    // shutdown flag) behind a mutex, plus a Notify to wake the hasher task.
    buffer: Arc<(std::sync::Mutex<(BytesMut, Vec<Waker>, bool)>, Notify)>,
    // Max bytes staged in `buffer` before `poll_write` returns Pending.
    capacity: usize,
}
impl ParallelBlake3Writer {
    /// Creates a writer whose bytes are hashed on a background tokio task.
    ///
    /// `capacity` bounds the staging buffer between writer and hasher;
    /// memory usage can be as much as 2x capacity (the staging buffer plus
    /// the chunk currently being hashed).
    pub fn new(capacity: usize) -> Self {
        let buffer = Arc::new((
            std::sync::Mutex::new((BytesMut::new(), Vec::<Waker>::new(), false)),
            Notify::new(),
        ));
        let hasher_buffer = buffer.clone();
        Self {
            hasher: tokio::spawn(async move {
                let mut hasher = blake3::Hasher::new();
                let mut to_hash = BytesMut::new();
                loop {
                    let mut guard = hasher_buffer.0.lock().unwrap();
                    let (buffer, wakers, shutdown) = &mut *guard;
                    std::mem::swap(buffer, &mut to_hash);
                    let wakers = std::mem::take(wakers);
                    let shutdown = *shutdown;
                    // Register with the Notify *before* releasing the lock:
                    // `notify_waiters` only wakes already-registered waiters,
                    // so a `notified()` future that is first polled after the
                    // hashing below would miss any notification sent while we
                    // hash, leaving writer and hasher both parked (deadlock).
                    let mut notified = Box::pin(hasher_buffer.1.notified());
                    notified.as_mut().enable();
                    drop(guard);
                    if to_hash.len() > 128 * 1024
                    /* 128 KiB */
                    {
                        // Large chunks are worth rayon's scheduling overhead.
                        hasher.update_rayon(&to_hash);
                    } else {
                        hasher.update(&to_hash);
                    }
                    to_hash.truncate(0);
                    // Wake writers parked waiting for buffer space.
                    for waker in wakers {
                        waker.wake();
                    }
                    if shutdown {
                        // poll_shutdown sets the flag only after the staging
                        // buffer has been flushed, so nothing is left to hash.
                        break;
                    }
                    notified.await;
                }
                hasher.finalize()
            })
            .into(),
            buffer,
            capacity,
        }
    }
    /// Flushes and shuts down the writer, then returns the final BLAKE3
    /// hash from the background task.
    pub async fn finalize(mut self) -> Result<blake3::Hash, Error> {
        self.shutdown().await?;
        self.hasher.await.with_kind(ErrorKind::Unknown)
    }
}
impl AsyncWrite for ParallelBlake3Writer {
    /// Stages up to `capacity` bytes for the hasher task; parks the caller
    /// (registering its waker) when the staging buffer is full.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, std::io::Error>> {
        let this = self.project();
        // A poisoned lock means the hasher task panicked mid-update.
        let mut guard = this.buffer.0.lock().map_err(|_| {
            std::io::Error::new(std::io::ErrorKind::Other, eyre!("hashing thread panicked"))
        })?;
        let (buffer, wakers, shutdown) = &mut *guard;
        if !*shutdown {
            if buffer.len() < *this.capacity {
                // Accept only what fits under `capacity`.
                let to_write = std::cmp::min(*this.capacity - buffer.len(), buf.len());
                buffer.extend_from_slice(&buf[0..to_write]);
                // Batch small writes: only poke the hasher once the staging
                // buffer is at least half full.
                if buffer.len() >= *this.capacity / 2 {
                    this.buffer.1.notify_waiters();
                }
                Poll::Ready(Ok(to_write))
            } else {
                // Full: park until the hasher drains the buffer and wakes us.
                wakers.push(cx.waker().clone());
                Poll::Pending
            }
        } else {
            Poll::Ready(Err(std::io::Error::new(
                std::io::ErrorKind::BrokenPipe,
                eyre!("write after shutdown"),
            )))
        }
    }
    /// Ready once the staging buffer is empty (all bytes handed to the
    /// hasher task); otherwise parks the caller and nudges the hasher.
    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Result<(), std::io::Error>> {
        let this = self.project();
        let mut guard = this.buffer.0.lock().map_err(|_| {
            std::io::Error::new(std::io::ErrorKind::Other, eyre!("hashing thread panicked"))
        })?;
        let (buffer, wakers, _) = &mut *guard;
        if buffer.is_empty() {
            Poll::Ready(Ok(()))
        } else {
            wakers.push(cx.waker().clone());
            this.buffer.1.notify_waiters();
            Poll::Pending
        }
    }
    /// Flushes, then raises the shutdown flag so the hasher task finishes;
    /// Ready once the hasher has observed shutdown with an empty buffer.
    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Result<(), std::io::Error>> {
        futures::ready!(self.as_mut().poll_flush(cx)?);
        let this = self.project();
        let mut guard = this.buffer.0.lock().map_err(|_| {
            std::io::Error::new(std::io::ErrorKind::Other, eyre!("hashing thread panicked"))
        })?;
        let (buffer, wakers, shutdown) = &mut *guard;
        if *shutdown && buffer.len() == 0 {
            return Poll::Ready(Ok(()));
        }
        // Park ourselves to be woken by the hasher's final drain pass, then
        // signal shutdown and wake the hasher.
        wakers.push(cx.waker().clone());
        *shutdown = true;
        this.buffer.1.notify_waiters();
        Poll::Pending
    }
}

View File

@@ -33,6 +33,7 @@ pub mod http_reader;
pub mod io;
pub mod logger;
pub mod lshw;
pub mod rpc;
pub mod rpc_client;
pub mod serde;

View File

@@ -0,0 +1,72 @@
use clap::Parser;
use rpc_toolkit::{from_fn_async, Context, ParentHandler};
use serde::{Deserialize, Serialize};
use tokio::fs::File;
use url::Url;
use crate::context::CliContext;
use crate::prelude::*;
use crate::s9pk::merkle_archive::source::http::HttpSource;
use crate::s9pk::merkle_archive::source::multi_cursor_file::MultiCursorFile;
use crate::s9pk::merkle_archive::source::{ArchiveSource, DynFileSource, FileSource};
use crate::util::io::ParallelBlake3Writer;
use crate::util::serde::Base16;
/// Builds the `util` RPC subcommand tree; currently exposes only `b3sum`.
pub fn util<C: Context>() -> ParentHandler<C> {
    ParentHandler::new().subcommand("b3sum", from_fn_async(b3sum))
}
// Parameters for the `b3sum` subcommand. Plain `//` comments are used here
// deliberately: `///` doc comments on a clap derive would become CLI help
// text and change runtime output.
#[derive(Debug, Deserialize, Serialize, Parser)]
pub struct B3sumParams {
    // Defaults to true; `--no-mmap` clears it (ArgAction::SetFalse).
    #[arg(long = "no-mmap", action = clap::ArgAction::SetFalse)]
    allow_mmap: bool,
    // A bare path, a `file://` URL, or an `http(s)://` URL to hash.
    file: String,
}
pub async fn b3sum(
ctx: CliContext,
B3sumParams { file, allow_mmap }: B3sumParams,
) -> Result<Base16<[u8; 32]>, Error> {
let source = if let Ok(url) = file.parse::<Url>() {
if url.scheme() == "file" {
let file = MultiCursorFile::from(File::open(url.path()).await?);
if allow_mmap {
return file.blake3_mmap().await.map(|h| *h.as_bytes()).map(Base16);
}
DynFileSource::new(file.section(
0,
file.size().await.ok_or_else(|| {
Error::new(eyre!("failed to get file size"), ErrorKind::Filesystem)
})?,
))
} else if url.scheme() == "http" || url.scheme() == "https" {
let file = HttpSource::new(ctx.client.clone(), url).await?;
DynFileSource::new(file.section(
0,
file.size().await.ok_or_else(|| {
Error::new(eyre!("failed to get file size"), ErrorKind::Filesystem)
})?,
))
} else {
return Err(Error::new(
eyre!("unknown scheme: {}", url.scheme()),
ErrorKind::InvalidRequest,
));
}
} else {
let file = MultiCursorFile::from(File::open(file).await?);
if allow_mmap {
return file.blake3_mmap().await.map(|h| *h.as_bytes()).map(Base16);
}
DynFileSource::new(file.section(
0,
file.size().await.ok_or_else(|| {
Error::new(eyre!("failed to get file size"), ErrorKind::Filesystem)
})?,
))
};
let mut hasher = ParallelBlake3Writer::new(crate::s9pk::merkle_archive::hash::BUFFER_CAPACITY);
source.copy(&mut hasher).await?;
hasher.finalize().await.map(|h| *h.as_bytes()).map(Base16)
}

View File

@@ -10,7 +10,10 @@ use color_eyre::eyre::eyre;
use imbl::OrdMap;
use openssl::pkey::{PKey, Private};
use openssl::x509::{X509Ref, X509};
use rpc_toolkit::{AnyContext, Handler, HandlerArgs, HandlerArgsFor, HandlerTypes, PrintCliResult};
use rpc_toolkit::{
CliBindings, Context, Handler, HandlerArgs, HandlerArgsFor, HandlerFor, HandlerTypes,
PrintCliResult,
};
use serde::de::DeserializeOwned;
use serde::ser::{SerializeMap, SerializeSeq};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
@@ -18,8 +21,8 @@ use serde_json::Value;
use ts_rs::TS;
use super::IntoDoubleEndedIterator;
use crate::prelude::*;
use crate::util::clap::FromStrParser;
use crate::{Error, ResultExt};
pub fn deserialize_from_str<
'de,
@@ -489,10 +492,10 @@ impl<T: CommandFactory> CommandFactory for WithIoFormat<T> {
}
}
pub trait HandlerExtSerde: Handler {
pub trait HandlerExtSerde<C: Context>: HandlerFor<C> {
fn with_display_serializable(self) -> DisplaySerializable<Self>;
}
impl<T: Handler> HandlerExtSerde for T {
impl<T: HandlerFor<C>, C: Context> HandlerExtSerde<C> for T {
fn with_display_serializable(self) -> DisplaySerializable<Self> {
DisplaySerializable(self)
}
@@ -506,9 +509,7 @@ impl<T: HandlerTypes> HandlerTypes for DisplaySerializable<T> {
type Ok = T::Ok;
type Err = T::Err;
}
#[async_trait::async_trait]
impl<T: Handler> Handler for DisplaySerializable<T> {
type Context = T::Context;
impl<T: HandlerFor<C>, C: Context> HandlerFor<C> for DisplaySerializable<T> {
fn handle_sync(
&self,
HandlerArgs {
@@ -518,7 +519,7 @@ impl<T: Handler> Handler for DisplaySerializable<T> {
params,
inherited_params,
raw_params,
}: HandlerArgsFor<Self::Context, Self>,
}: HandlerArgsFor<C, Self>,
) -> Result<Self::Ok, Self::Err> {
self.0.handle_sync(HandlerArgs {
context,
@@ -538,7 +539,7 @@ impl<T: Handler> Handler for DisplaySerializable<T> {
params,
inherited_params,
raw_params,
}: HandlerArgsFor<Self::Context, Self>,
}: HandlerArgsFor<C, Self>,
) -> Result<Self::Ok, Self::Err> {
self.0
.handle_async(HandlerArgs {
@@ -551,34 +552,56 @@ impl<T: Handler> Handler for DisplaySerializable<T> {
})
.await
}
fn contexts(&self) -> Option<imbl::OrdSet<std::any::TypeId>> {
self.0.contexts()
fn metadata(&self, method: VecDeque<&'static str>) -> OrdMap<&'static str, imbl_value::Value> {
self.0.metadata(method)
}
fn metadata(
&self,
method: VecDeque<&'static str>,
ctx_ty: TypeId,
) -> OrdMap<&'static str, imbl_value::Value> {
self.0.metadata(method, ctx_ty)
}
fn method_from_dots(&self, method: &str, ctx_ty: TypeId) -> Option<VecDeque<&'static str>> {
self.0.method_from_dots(method, ctx_ty)
fn method_from_dots(&self, method: &str) -> Option<VecDeque<&'static str>> {
self.0.method_from_dots(method)
}
}
impl<T: HandlerTypes> PrintCliResult for DisplaySerializable<T>
impl<T: HandlerTypes, C: Context> PrintCliResult<C> for DisplaySerializable<T>
where
T::Ok: Serialize,
{
type Context = AnyContext;
fn print(
&self,
HandlerArgs { params, .. }: HandlerArgsFor<Self::Context, Self>,
HandlerArgs { params, .. }: HandlerArgsFor<C, Self>,
result: Self::Ok,
) -> Result<(), Self::Err> {
display_serializable(params.format.unwrap_or_default(), result);
Ok(())
}
}
impl<Context, T> CliBindings<Context> for DisplaySerializable<T>
where
    Context: crate::Context,
    Self: HandlerTypes,
    Self::Params: CommandFactory + FromArgMatches + Serialize,
    Self: PrintCliResult<Context>,
{
    /// Builds the clap command from the params type's derive.
    fn cli_command(&self) -> clap::Command {
        Self::Params::command()
    }
    /// Parses clap matches back into params, serialized to a value; returns
    /// an empty method path alongside the serialized params.
    fn cli_parse(
        &self,
        matches: &clap::ArgMatches,
    ) -> Result<(VecDeque<&'static str>, patch_db::Value), clap::Error> {
        Self::Params::from_arg_matches(matches).and_then(|a| {
            Ok((
                VecDeque::new(),
                imbl_value::to_value(&a)
                    // Surface serialization failures as clap validation errors.
                    .map_err(|e| clap::Error::raw(clap::error::ErrorKind::ValueValidation, e))?,
            ))
        })
    }
    /// Delegates display to this type's `PrintCliResult` implementation.
    fn cli_display(
        &self,
        handle_args: HandlerArgsFor<Context, Self>,
        result: Self::Ok,
    ) -> Result<(), Self::Err> {
        self.print(handle_args, result)
    }
}
#[derive(Deserialize, Serialize, TS)]
pub struct StdinDeserializable<T>(pub T);
@@ -938,6 +961,43 @@ impl<'de, K: Deserialize<'de>, V: Deserialize<'de>> Deserialize<'de> for KeyVal<
}
}
/// Wrapper that (de)serializes its contents as a hex (base-16) string.
#[derive(TS)]
#[ts(type = "string", concrete(T = Vec<u8>))]
pub struct Base16<T>(pub T);
impl<'de, T: TryFrom<Vec<u8>>> Deserialize<'de> for Base16<T> {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
hex::decode(&s)
.map_err(|_| {
serde::de::Error::invalid_value(
serde::de::Unexpected::Str(&s),
&"a valid hex string",
)
})?
.try_into()
.map_err(|_| serde::de::Error::custom("invalid length"))
.map(Self)
}
}
impl<T: AsRef<[u8]>> Serialize for Base16<T> {
    /// Serializes the inner bytes as a lowercase hex string.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let encoded = hex::encode(self.0.as_ref());
        serializer.serialize_str(&encoded)
    }
}
impl<T: AsRef<[u8]>> std::fmt::Display for Base16<T> {
    /// Formats the inner bytes as lowercase hex.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let encoded = hex::encode(self.0.as_ref());
        encoded.fmt(f)
    }
}
/// Wrapper that (de)serializes its contents as an RFC 4648 base32 string.
#[derive(TS)]
#[ts(type = "string", concrete(T = Vec<u8>))]
pub struct Base32<T>(pub T);
impl<'de, T: TryFrom<Vec<u8>>> Deserialize<'de> for Base32<T> {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
@@ -968,7 +1028,14 @@ impl<T: AsRef<[u8]>> Serialize for Base32<T> {
))
}
}
impl<T: AsRef<[u8]>> std::fmt::Display for Base32<T> {
    /// Formats the inner bytes as padded RFC 4648 base32.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let encoded = base32::encode(base32::Alphabet::RFC4648 { padding: true }, self.0.as_ref());
        encoded.fmt(f)
    }
}
/// Wrapper that (de)serializes its contents as a base64 string.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, TS)]
#[ts(type = "string", concrete(T = Vec<u8>))]
pub struct Base64<T>(pub T);
impl<'de, T: TryFrom<Vec<u8>>> Deserialize<'de> for Base64<T> {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
@@ -991,6 +1058,12 @@ impl<T: AsRef<[u8]>> Serialize for Base64<T> {
serializer.serialize_str(&base64::encode(self.0.as_ref()))
}
}
// Transparent access to the wrapped value.
impl<T> Deref for Base64<T> {
    type Target = T;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
/// Newtype around [`regex::Regex`] (wrapped to allow local trait impls).
#[derive(Clone, Debug)]
pub struct Regex(regex::Regex);
@@ -1163,7 +1236,8 @@ pub mod pem {
}
#[repr(transparent)]
#[derive(Debug, Deserialize, Serialize)]
#[derive(Clone, Copy, Debug, Deserialize, Serialize, PartialEq, Eq, PartialOrd, Ord, Hash, TS)]
#[ts(type = "string", concrete(T = ed25519_dalek::VerifyingKey))]
pub struct Pem<T: PemEncoding>(#[serde(with = "pem")] pub T);
impl<T: PemEncoding> Pem<T> {
pub fn new(value: T) -> Self {
@@ -1176,6 +1250,33 @@ impl<T: PemEncoding> Pem<T> {
unsafe { std::mem::transmute(value) }
}
}
// Transparent access to the wrapped key/certificate value.
impl<T: PemEncoding> Deref for Pem<T> {
    type Target = T;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
impl<T: PemEncoding> std::fmt::Display for Pem<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.to_pem::<serde_json::Error>()
.map_err(|_| std::fmt::Error::default())?
.fmt(f)
}
}
impl<T: PemEncoding> FromStr for Pem<T> {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(Self(
T::from_pem::<serde_json::Error>(s).with_kind(ErrorKind::Pem)?,
))
}
}
// Lets clap parse `Pem<T>` CLI arguments via the `FromStr` impl above.
impl<T: PemEncoding> ValueParserFactory for Pem<T> {
    type Parser = FromStrParser<Self>;
    fn value_parser() -> Self::Parser {
        Self::Parser::new()
    }
}
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, TS)]
#[ts(export, type = "string | number[]")]