fix merge

This commit is contained in:
Aiden McClelland
2023-11-13 15:41:35 -07:00
parent 3b3e1e37b9
commit 5f7ff460fb
21 changed files with 0 additions and 5 deletions

View File

@@ -0,0 +1,199 @@
use std::collections::BTreeMap;
use std::path::Path;
use futures::future::BoxFuture;
use futures::FutureExt;
use imbl_value::InternedString;
use tokio::io::AsyncRead;
use crate::prelude::*;
use crate::s9pk::merkle_archive::hash::{Hash, HashWriter};
use crate::s9pk::merkle_archive::sink::{Sink, TrackingWriter};
use crate::s9pk::merkle_archive::source::{ArchiveSource, FileSource, Section};
use crate::s9pk::merkle_archive::write_queue::WriteQueue;
use crate::s9pk::merkle_archive::{varint, Entry, EntryContents};
/// A directory node in a merkle archive: a sorted map from entry name to
/// [`Entry`], so serialization order (and therefore hashing) is deterministic.
#[derive(Debug)]
pub struct DirectoryContents<S>(BTreeMap<InternedString, Entry<S>>);
impl<S> DirectoryContents<S> {
    /// Creates an empty directory.
    pub fn new() -> Self {
        Self(BTreeMap::new())
    }

    /// Walks `path` segment by segment, returning the entry it names, or
    /// `None` if any ancestor is missing, is not a directory, or a segment
    /// is not valid UTF-8.
    #[instrument(skip_all)]
    pub fn get_path(&self, path: impl AsRef<Path>) -> Option<&Entry<S>> {
        let mut dir = Some(self);
        let mut res = None;
        for segment in path.as_ref().into_iter() {
            let segment = segment.to_str()?;
            // A leading root component ("/") does not name an entry.
            if segment == "/" {
                continue;
            }
            res = dir?.get(segment);
            // Descend only if the entry we just resolved is a directory;
            // otherwise any further segment lookup must fail.
            if let Some(EntryContents::Directory(d)) = res.as_ref().map(|e| e.as_contents()) {
                dir = Some(d);
            } else {
                dir = None
            }
        }
        res
    }

    /// Inserts `entry` at `path`, creating intermediate directories as
    /// needed. Fails if `path` has no file name (i.e. is the root) or if an
    /// ancestor already exists and is not a directory.
    pub fn insert_path(&mut self, path: impl AsRef<Path>, entry: Entry<S>) -> Result<(), Error> {
        let path = path.as_ref();
        let (parent, Some(file)) = (path.parent(), path.file_name().and_then(|f| f.to_str()))
        else {
            return Err(Error::new(
                eyre!("cannot create file at root"),
                ErrorKind::Pack,
            ));
        };
        let mut dir = self;
        // `parent` is an Option<&Path>; `into_iter().flatten()` iterates its
        // segments, or nothing at all when there is no parent.
        for segment in parent.into_iter().flatten() {
            let segment = segment
                .to_str()
                .ok_or_else(|| Error::new(eyre!("non-utf8 path segment"), ErrorKind::Utf8))?;
            if segment == "/" {
                continue;
            }
            // Create the intermediate directory if it doesn't exist yet.
            if !dir.contains_key(segment) {
                dir.insert(
                    segment.into(),
                    Entry::new(EntryContents::Directory(DirectoryContents::new())),
                );
            }
            if let Some(EntryContents::Directory(d)) =
                dir.get_mut(segment).map(|e| e.as_contents_mut())
            {
                dir = d;
            } else {
                return Err(Error::new(eyre!("failed to insert entry at path {path:?}: ancestor exists and is not a directory"), ErrorKind::Pack));
            }
        }
        dir.insert(file.into(), entry);
        Ok(())
    }

    /// Size in bytes of a serialized directory header.
    pub const fn header_size() -> u64 {
        8 // position: u64 BE
        + 8 // size: u64 BE
    }

    /// Writes this directory's header -- position and size of its table of
    /// contents -- to `w`, returning `position` unchanged.
    #[instrument(skip_all)]
    pub async fn serialize_header<W: Sink>(&self, position: u64, w: &mut W) -> Result<u64, Error> {
        use tokio::io::AsyncWriteExt;
        let size = self.toc_size();
        w.write_all(&position.to_be_bytes()).await?;
        w.write_all(&size.to_be_bytes()).await?;
        Ok(position)
    }

    /// Size in bytes of this directory's serialized table of contents:
    /// an entry-count varint plus, per entry, a name varstring and the
    /// entry's header.
    pub fn toc_size(&self) -> u64 {
        self.0.iter().fold(
            varint::serialized_varint_size(self.0.len() as u64),
            |acc, (name, entry)| {
                acc + varint::serialized_varstring_size(&**name) + entry.header_size()
            },
        )
    }
}
impl<S: ArchiveSource> DirectoryContents<Section<S>> {
    /// Reads a directory header (TOC position + size) from `header`, fetches
    /// and parses the table of contents from `source`, and verifies that the
    /// parsed directory's signature hash matches `sighash`.
    ///
    /// Returns a boxed future because directory and entry deserialization are
    /// mutually recursive.
    #[instrument(skip_all)]
    pub fn deserialize<'a>(
        source: &'a S,
        header: &'a mut (impl AsyncRead + Unpin + Send),
        sighash: Hash,
    ) -> BoxFuture<'a, Result<Self, Error>> {
        async move {
            use tokio::io::AsyncReadExt;
            let mut position = [0u8; 8];
            header.read_exact(&mut position).await?;
            let position = u64::from_be_bytes(position);
            let mut size = [0u8; 8];
            header.read_exact(&mut size).await?;
            let size = u64::from_be_bytes(size);
            let mut toc_reader = source.fetch(position, size).await?;
            let len = varint::deserialize_varint(&mut toc_reader).await?;
            let mut entries = BTreeMap::new();
            for _ in 0..len {
                entries.insert(
                    varint::deserialize_varstring(&mut toc_reader).await?.into(),
                    Entry::deserialize(source, &mut toc_reader).await?,
                );
            }
            let res = Self(entries);
            // Integrity check: recompute the signature hash over the parsed
            // entries and compare it with the expected value.
            if res.sighash().await? == sighash {
                Ok(res)
            } else {
                Err(Error::new(
                    eyre!("hash sum does not match"),
                    ErrorKind::InvalidSignature,
                ))
            }
        }
        .boxed()
    }
}
impl<S: FileSource> DirectoryContents<S> {
    /// Recursively (re)computes the cached hash of every entry in this tree.
    /// `only_missing` is forwarded to [`Entry::update_hash`].
    #[instrument(skip_all)]
    pub fn update_hashes<'a>(&'a mut self, only_missing: bool) -> BoxFuture<'a, Result<(), Error>> {
        async move {
            for (_, entry) in &mut self.0 {
                entry.update_hash(only_missing).await?;
            }
            Ok(())
        }
        .boxed()
    }

    /// Computes this directory's signature hash: the hash of its table of
    /// contents as it would serialize with every entry's contents replaced by
    /// `Missing` -- i.e. the hash covers entry names and per-entry hashes,
    /// not file bodies.
    #[instrument(skip_all)]
    pub fn sighash<'a>(&'a self) -> BoxFuture<'a, Result<Hash, Error>> {
        async move {
            let mut hasher = TrackingWriter::new(0, HashWriter::new());
            let mut sig_contents = BTreeMap::new();
            for (name, entry) in &self.0 {
                sig_contents.insert(name.clone(), entry.to_missing().await?);
            }
            // Missing entries queue no bodies, so the throwaway WriteQueue
            // stays empty and the hash is position-independent.
            Self(sig_contents)
                .serialize_toc(&mut WriteQueue::new(0), &mut hasher)
                .await?;
            Ok(hasher.into_inner().finalize())
        }
        .boxed()
    }

    /// Serializes this directory's table of contents to `w`, reserving space
    /// for each entry's body on `queue` for later serialization.
    #[instrument(skip_all)]
    pub async fn serialize_toc<'a, W: Sink>(
        &'a self,
        queue: &mut WriteQueue<'a, S>,
        w: &mut W,
    ) -> Result<(), Error> {
        varint::serialize_varint(self.0.len() as u64, w).await?;
        for (name, entry) in self.0.iter() {
            varint::serialize_varstring(&**name, w).await?;
            entry.serialize_header(queue.add(entry).await?, w).await?;
        }
        Ok(())
    }
}
// Deref to the underlying map so callers can use BTreeMap's API directly.
impl<S> std::ops::Deref for DirectoryContents<S> {
    type Target = BTreeMap<InternedString, Entry<S>>;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
impl<S> std::ops::DerefMut for DirectoryContents<S> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

View File

@@ -0,0 +1,82 @@
use tokio::io::AsyncRead;
use crate::prelude::*;
use crate::s9pk::merkle_archive::hash::{Hash, HashWriter};
use crate::s9pk::merkle_archive::sink::{Sink, TrackingWriter};
use crate::s9pk::merkle_archive::source::{ArchiveSource, FileSource, Section};
/// A file's contents in a merkle archive, backed by an arbitrary source
/// (in-memory buffer, file on disk, section of a remote archive, ...).
#[derive(Debug)]
pub struct FileContents<S>(S);
impl<S> FileContents<S> {
    /// Wraps `source` as file contents.
    pub fn new(source: S) -> Self {
        Self(source)
    }
    /// Size in bytes of a serialized file header.
    pub const fn header_size() -> u64 {
        8 // position: u64 BE
        + 8 // size: u64 BE
    }
}
impl<S: ArchiveSource> FileContents<Section<S>> {
    /// Reads a file header (position + size, both big-endian u64) from
    /// `header` and returns a lazy section handle into `source` for the
    /// file's bytes.
    #[instrument(skip_all)]
    pub async fn deserialize(
        source: &S,
        header: &mut (impl AsyncRead + Unpin + Send),
    ) -> Result<Self, Error> {
        use tokio::io::AsyncReadExt;
        // One 8-byte scratch buffer serves both fields.
        let mut word = [0u8; 8];
        header.read_exact(&mut word).await?;
        let position = u64::from_be_bytes(word);
        header.read_exact(&mut word).await?;
        let size = u64::from_be_bytes(word);
        Ok(Self(source.section(position, size)))
    }
}
impl<S: FileSource> FileContents<S> {
    /// Computes the BLAKE3 hash of the file's bytes by streaming them
    /// through a hashing writer.
    pub async fn hash(&self) -> Result<Hash, Error> {
        let mut hasher = TrackingWriter::new(0, HashWriter::new());
        self.serialize_body(&mut hasher, None).await?;
        Ok(hasher.into_inner().finalize())
    }
    /// Writes this file's header -- body position and size, both big-endian
    /// u64 -- to `w`, returning `position` unchanged.
    #[instrument(skip_all)]
    pub async fn serialize_header<W: Sink>(&self, position: u64, w: &mut W) -> Result<u64, Error> {
        use tokio::io::AsyncWriteExt;
        let size = self.0.size().await?;
        w.write_all(&position.to_be_bytes()).await?;
        w.write_all(&size.to_be_bytes()).await?;
        Ok(position)
    }
    /// Copies the file's bytes to `w`, optionally verifying them against
    /// `verify` (an expected BLAKE3 hash) as they stream through.
    #[instrument(skip_all)]
    pub async fn serialize_body<W: Sink>(
        &self,
        w: &mut W,
        verify: Option<Hash>,
    ) -> Result<(), Error> {
        // Only track the start position when verification was requested,
        // since the length cross-check below is only done in that case.
        let start = if verify.is_some() {
            Some(w.current_position().await?)
        } else {
            None
        };
        self.0.copy_verify(w, verify).await?;
        if let Some(start) = start {
            ensure_code!(
                w.current_position().await? - start == self.0.size().await?,
                ErrorKind::Pack,
                "FileSource::copy wrote a number of bytes that does not match FileSource::size"
            );
        }
        Ok(())
    }
}
impl<S> std::ops::Deref for FileContents<S> {
    type Target = S;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

View File

@@ -0,0 +1,97 @@
pub use blake3::Hash;
use blake3::Hasher;
use tokio::io::AsyncWrite;
use crate::prelude::*;
/// An [`AsyncWrite`] sink that discards its input, feeding every byte into a
/// BLAKE3 hasher instead; writes never fail.
#[pin_project::pin_project]
pub struct HashWriter {
    hasher: Hasher,
}
impl HashWriter {
    /// Creates a writer with an empty BLAKE3 hash state.
    pub fn new() -> Self {
        Self {
            hasher: Hasher::new(),
        }
    }
    /// Consumes the writer, returning the hash of all bytes written to it.
    pub fn finalize(self) -> Hash {
        self.hasher.finalize()
    }
}

// A type with a no-argument `new` should also implement `Default`
// (clippy::new_without_default); callers can then use it generically.
impl Default for HashWriter {
    fn default() -> Self {
        Self::new()
    }
}
impl AsyncWrite for HashWriter {
    fn poll_write(
        self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> std::task::Poll<Result<usize, std::io::Error>> {
        // Hashing is synchronous, so every write completes immediately and
        // consumes the full buffer.
        self.project().hasher.update(buf);
        std::task::Poll::Ready(Ok(buf.len()))
    }
    fn poll_flush(
        self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), std::io::Error>> {
        // Nothing is buffered; flush is a no-op.
        std::task::Poll::Ready(Ok(()))
    }
    fn poll_shutdown(
        self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), std::io::Error>> {
        std::task::Poll::Ready(Ok(()))
    }
}
/// An [`AsyncWrite`] adaptor that forwards to an inner writer while
/// optionally hashing everything written, so the stream can be checked
/// against an expected hash once writing completes.
#[pin_project::pin_project]
pub struct VerifyingWriter<W> {
    verify: Option<(Hasher, Hash)>,
    #[pin]
    writer: W,
}
impl<W: AsyncWrite> VerifyingWriter<W> {
    /// Wraps `w`; if `verify` is `Some`, all bytes written are hashed and
    /// later compared against it by [`Self::verify`].
    pub fn new(w: W, verify: Option<Hash>) -> Self {
        Self {
            verify: verify.map(|v| (Hasher::new(), v)),
            writer: w,
        }
    }
    /// Consumes the adaptor, returning the inner writer if the accumulated
    /// hash matches the expected one (or no verification was requested).
    pub fn verify(self) -> Result<W, Error> {
        if let Some((actual, expected)) = self.verify {
            ensure_code!(
                actual.finalize() == expected,
                ErrorKind::InvalidSignature,
                "hash sum does not match"
            );
        }
        Ok(self.writer)
    }
}
impl<W: AsyncWrite> AsyncWrite for VerifyingWriter<W> {
    fn poll_write(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> std::task::Poll<Result<usize, std::io::Error>> {
        let this = self.project();
        match this.writer.poll_write(cx, buf) {
            std::task::Poll::Ready(Ok(written)) => {
                // Only hash the bytes the inner writer actually accepted.
                if let Some((h, _)) = this.verify {
                    h.update(&buf[..written]);
                }
                std::task::Poll::Ready(Ok(written))
            }
            a => a,
        }
    }
    fn poll_flush(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), std::io::Error>> {
        self.project().writer.poll_flush(cx)
    }
    fn poll_shutdown(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), std::io::Error>> {
        self.project().writer.poll_shutdown(cx)
    }
}

View File

@@ -0,0 +1,268 @@
use ed25519_dalek::{Signature, SigningKey, VerifyingKey};
use tokio::io::AsyncRead;
use crate::prelude::*;
use crate::s9pk::merkle_archive::directory_contents::DirectoryContents;
use crate::s9pk::merkle_archive::file_contents::FileContents;
use crate::s9pk::merkle_archive::hash::Hash;
use crate::s9pk::merkle_archive::sink::Sink;
use crate::s9pk::merkle_archive::source::{ArchiveSource, FileSource, Section};
use crate::s9pk::merkle_archive::write_queue::WriteQueue;
pub mod directory_contents;
pub mod file_contents;
pub mod hash;
pub mod sink;
pub mod source;
#[cfg(test)]
mod test;
pub mod varint;
pub mod write_queue;
/// How an archive will be (or was) signed: either a signature that was
/// verified during deserialization, or a key that will sign on serialization.
#[derive(Debug)]
enum Signer {
    Signed(VerifyingKey, Signature),
    Signer(SigningKey),
}
/// A signed merkle tree of files and directories.
#[derive(Debug)]
pub struct MerkleArchive<S> {
    signer: Signer,
    contents: DirectoryContents<S>,
}
impl<S> MerkleArchive<S> {
    /// Creates a new, not-yet-signed archive; a signature over the root
    /// sighash is produced by `signer` when the archive is serialized.
    pub fn new(contents: DirectoryContents<S>, signer: SigningKey) -> Self {
        Self {
            signer: Signer::Signer(signer),
            contents,
        }
    }
    /// Size in bytes of the fixed-length archive header, matching what
    /// `serialize` writes and `deserialize` reads: pubkey + signature +
    /// sighash + root directory header.
    ///
    /// FIX: the sighash was previously missing from this sum, even though
    /// `serialize` writes a 32-byte sighash between the signature and the
    /// directory header and `deserialize` reads one back.
    pub const fn header_size() -> u64 {
        32 // pubkey
        + 64 // signature
        + 32 // sighash
        + DirectoryContents::<Section<S>>::header_size()
    }
    /// The root directory of the archive.
    pub fn contents(&self) -> &DirectoryContents<S> {
        &self.contents
    }
}
impl<S: ArchiveSource> MerkleArchive<Section<S>> {
    /// Reads an archive header (pubkey, signature, sighash, root directory
    /// header) from `header`, lazily parses the directory tree out of
    /// `source`, and verifies the signature.
    #[instrument(skip_all)]
    pub async fn deserialize(
        source: &S,
        header: &mut (impl AsyncRead + Unpin + Send),
    ) -> Result<Self, Error> {
        use tokio::io::AsyncReadExt;
        let mut pubkey = [0u8; 32];
        header.read_exact(&mut pubkey).await?;
        let pubkey = VerifyingKey::from_bytes(&pubkey)?;
        let mut signature = [0u8; 64];
        header.read_exact(&mut signature).await?;
        let signature = Signature::from_bytes(&signature);
        let mut sighash = [0u8; 32];
        header.read_exact(&mut sighash).await?;
        let sighash = Hash::from_bytes(sighash);
        let contents = DirectoryContents::deserialize(source, header, sighash).await?;
        // Verify the signature over the *recomputed* sighash, not the stored
        // one, so a forged header cannot bypass verification.
        pubkey.verify_strict(contents.sighash().await?.as_bytes(), &signature)?;
        Ok(Self {
            signer: Signer::Signed(pubkey, signature),
            contents,
        })
    }
}
impl<S: FileSource> MerkleArchive<S> {
    /// See [`DirectoryContents::update_hashes`].
    pub async fn update_hashes(&mut self, only_missing: bool) -> Result<(), Error> {
        self.contents.update_hashes(only_missing).await
    }
    /// Serializes the archive to `w`: pubkey, signature, sighash, the root
    /// directory header + table of contents, then all queued bodies. If
    /// `verify` is true, file bodies are checked against their recorded
    /// hashes while being written.
    #[instrument(skip_all)]
    pub async fn serialize<W: Sink>(&self, w: &mut W, verify: bool) -> Result<(), Error> {
        use tokio::io::AsyncWriteExt;
        let sighash = self.contents.sighash().await?;
        let (pubkey, signature) = match &self.signer {
            Signer::Signed(pubkey, signature) => (*pubkey, *signature),
            Signer::Signer(s) => (s.into(), ed25519_dalek::Signer::sign(s, sighash.as_bytes())),
        };
        w.write_all(pubkey.as_bytes()).await?;
        w.write_all(&signature.to_bytes()).await?;
        w.write_all(sighash.as_bytes()).await?;
        // The root TOC lands immediately after its own header; bodies are
        // laid out after the TOC by the WriteQueue.
        let mut next_pos = w.current_position().await?;
        next_pos += DirectoryContents::<S>::header_size();
        self.contents.serialize_header(next_pos, w).await?;
        next_pos += self.contents.toc_size();
        let mut queue = WriteQueue::new(next_pos);
        self.contents.serialize_toc(&mut queue, w).await?;
        queue.serialize(w, verify).await?;
        Ok(())
    }
}
/// A node in the archive tree plus its (optionally cached) hash.
#[derive(Debug)]
pub struct Entry<S> {
    hash: Option<Hash>,
    contents: EntryContents<S>,
}
impl<S> Entry<S> {
    /// Creates an entry with no cached hash.
    pub fn new(contents: EntryContents<S>) -> Self {
        Self {
            hash: None,
            contents,
        }
    }
    /// The cached hash, if one has been computed or deserialized.
    pub fn hash(&self) -> Option<Hash> {
        self.hash
    }
    pub fn as_contents(&self) -> &EntryContents<S> {
        &self.contents
    }
    /// Mutable access to the contents; invalidates the cached hash, since
    /// the caller may change what the hash covers.
    pub fn as_contents_mut(&mut self) -> &mut EntryContents<S> {
        self.hash = None;
        &mut self.contents
    }
    pub fn into_contents(self) -> EntryContents<S> {
        self.contents
    }
    /// Size in bytes of this entry's serialized header.
    pub fn header_size(&self) -> u64 {
        32 // hash
        + self.contents.header_size()
    }
}
impl<S: ArchiveSource> Entry<Section<S>> {
    /// Reads an entry header (32-byte hash + contents header) from `header`,
    /// deserializing the contents lazily out of `source`.
    #[instrument(skip_all)]
    pub async fn deserialize(
        source: &S,
        header: &mut (impl AsyncRead + Unpin + Send),
    ) -> Result<Self, Error> {
        use tokio::io::AsyncReadExt;
        let mut hash = [0u8; 32];
        header.read_exact(&mut hash).await?;
        let hash = Hash::from_bytes(hash);
        let contents = EntryContents::deserialize(source, header, hash).await?;
        Ok(Self {
            hash: Some(hash),
            contents,
        })
    }
}
impl<S: FileSource> Entry<S> {
    /// Returns a copy of this entry with its contents replaced by `Missing`
    /// but its hash preserved (computed on the fly if not cached) -- used
    /// when hashing a directory's table of contents.
    pub async fn to_missing(&self) -> Result<Self, Error> {
        let hash = if let Some(hash) = self.hash {
            hash
        } else {
            self.contents.hash().await?
        };
        Ok(Self {
            hash: Some(hash),
            contents: EntryContents::Missing,
        })
    }
    /// Recomputes this entry's hash, recursing into directories first.
    ///
    /// FIX: `only_missing` was previously accepted but ignored -- the hash
    /// was unconditionally recomputed for every entry. Now, when
    /// `only_missing` is true, an entry with a cached hash is left alone.
    /// The cache is safe to trust because `as_contents_mut` clears it on any
    /// mutable access to the contents.
    pub async fn update_hash(&mut self, only_missing: bool) -> Result<(), Error> {
        if only_missing && self.hash.is_some() {
            return Ok(());
        }
        if let EntryContents::Directory(d) = &mut self.contents {
            d.update_hashes(only_missing).await?;
        }
        self.hash = Some(self.contents.hash().await?);
        Ok(())
    }
    /// Writes this entry's header (hash + contents header) to `w`. Returns
    /// the position the entry's body will occupy, if it has one.
    #[instrument(skip_all)]
    pub async fn serialize_header<W: Sink>(
        &self,
        position: u64,
        w: &mut W,
    ) -> Result<Option<u64>, Error> {
        use tokio::io::AsyncWriteExt;
        let hash = if let Some(hash) = self.hash {
            hash
        } else {
            self.contents.hash().await?
        };
        w.write_all(hash.as_bytes()).await?;
        self.contents.serialize_header(position, w).await
    }
}
/// The payload of an [`Entry`]: a file, a directory, or `Missing` (hash
/// known but body not present -- used when computing directory sighashes).
#[derive(Debug)]
pub enum EntryContents<S> {
    Missing,
    File(FileContents<S>),
    Directory(DirectoryContents<S>),
}
impl<S> EntryContents<S> {
    /// Discriminant byte written in the serialized entry header.
    fn type_id(&self) -> u8 {
        match self {
            Self::Missing => 0,
            Self::File(_) => 1,
            Self::Directory(_) => 2,
        }
    }
    /// Size in bytes of the serialized contents header: the type byte plus
    /// the variant's own header, if any.
    pub fn header_size(&self) -> u64 {
        1 // type
        + match self {
            Self::Missing => 0,
            Self::File(_) => FileContents::<S>::header_size(),
            Self::Directory(_) => DirectoryContents::<S>::header_size(),
        }
    }
}
impl<S: ArchiveSource> EntryContents<Section<S>> {
    /// Reads a contents header from `header`: a type byte followed by the
    /// variant-specific header. `hash` is forwarded to directory
    /// deserialization for integrity checking.
    #[instrument(skip_all)]
    pub async fn deserialize(
        source: &S,
        header: &mut (impl AsyncRead + Unpin + Send),
        hash: Hash,
    ) -> Result<Self, Error> {
        use tokio::io::AsyncReadExt;
        let mut type_id = [0u8];
        header.read_exact(&mut type_id).await?;
        match type_id[0] {
            0 => Ok(Self::Missing),
            1 => Ok(Self::File(FileContents::deserialize(source, header).await?)),
            2 => Ok(Self::Directory(
                DirectoryContents::deserialize(source, header, hash).await?,
            )),
            id => Err(Error::new(
                eyre!("Unknown type id {id} found in MerkleArchive"),
                ErrorKind::ParseS9pk,
            )),
        }
    }
}
impl<S: FileSource> EntryContents<S> {
    /// Computes this node's hash: the body hash for files, the sighash for
    /// directories. `Missing` has no body to hash and errors.
    pub async fn hash(&self) -> Result<Hash, Error> {
        match self {
            Self::Missing => Err(Error::new(
                eyre!("Cannot compute hash of missing file"),
                ErrorKind::Pack,
            )),
            Self::File(f) => f.hash().await,
            Self::Directory(d) => d.sighash().await,
        }
    }
    /// Writes the contents header (type byte + variant header) to `w`.
    /// Returns the body position for files/directories, `None` for `Missing`.
    #[instrument(skip_all)]
    pub async fn serialize_header<W: Sink>(
        &self,
        position: u64,
        w: &mut W,
    ) -> Result<Option<u64>, Error> {
        use tokio::io::AsyncWriteExt;
        w.write_all(&[self.type_id()]).await?;
        Ok(match self {
            Self::Missing => None,
            Self::File(f) => Some(f.serialize_header(position, w).await?),
            Self::Directory(d) => Some(d.serialize_header(position, w).await?),
        })
    }
}

View File

@@ -0,0 +1,70 @@
use tokio::io::{AsyncSeek, AsyncWrite};
use crate::prelude::*;
/// A write target that knows its current byte offset, so serializers can
/// record where data will land without requiring seek support.
#[async_trait::async_trait]
pub trait Sink: AsyncWrite + Unpin + Send {
    async fn current_position(&mut self) -> Result<u64, Error>;
}
// Any seekable writer can report its position directly.
#[async_trait::async_trait]
impl<S: AsyncWrite + AsyncSeek + Unpin + Send> Sink for S {
    async fn current_position(&mut self) -> Result<u64, Error> {
        use tokio::io::AsyncSeekExt;
        Ok(self.stream_position().await?)
    }
}
// Non-seekable writers can be wrapped in TrackingWriter, which counts bytes
// as they pass through.
#[async_trait::async_trait]
impl<W: AsyncWrite + Unpin + Send> Sink for TrackingWriter<W> {
    async fn current_position(&mut self) -> Result<u64, Error> {
        Ok(self.position)
    }
}
/// An [`AsyncWrite`] adaptor that counts how many bytes have been written,
/// providing a position for writers that cannot seek.
#[pin_project::pin_project]
pub struct TrackingWriter<W> {
    position: u64,
    #[pin]
    writer: W,
}
impl<W> TrackingWriter<W> {
    /// Wraps `w`, with the position counter starting at `start`.
    pub fn new(start: u64, w: W) -> Self {
        Self {
            position: start,
            writer: w,
        }
    }
    /// Unwraps the inner writer, discarding the position counter.
    pub fn into_inner(self) -> W {
        self.writer
    }
}
impl<W: AsyncWrite + Unpin + Send> AsyncWrite for TrackingWriter<W> {
    fn poll_write(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> std::task::Poll<Result<usize, std::io::Error>> {
        let this = self.project();
        match this.writer.poll_write(cx, buf) {
            std::task::Poll::Ready(Ok(written)) => {
                // Count only the bytes the inner writer actually accepted.
                *this.position += written as u64;
                std::task::Poll::Ready(Ok(written))
            }
            a => a,
        }
    }
    fn poll_flush(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), std::io::Error>> {
        self.project().writer.poll_flush(cx)
    }
    fn poll_shutdown(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), std::io::Error>> {
        self.project().writer.poll_shutdown(cx)
    }
}

View File

@@ -0,0 +1,91 @@
use std::sync::Arc;
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use http::header::{ACCEPT_RANGES, RANGE};
use reqwest::{Client, Url};
use tokio::io::AsyncRead;
use tokio::sync::Mutex;
use tokio_util::io::StreamReader;
use crate::prelude::*;
use crate::s9pk::merkle_archive::source::ArchiveSource;
/// An [`ArchiveSource`] backed by an HTTP(S) URL, using Range requests to
/// fetch sections of the archive on demand.
#[derive(Clone)]
pub struct HttpSource {
    url: Url,
    client: Client,
    // Ok(()) when the server advertises byte-range support; the Err side is
    // a placeholder for a planned rangeless fallback (see the todo!s below).
    range_support: Result<
        (),
        (), // Arc<Mutex<Option<RangelessReader>>>
    >,
}
impl HttpSource {
    /// Probes `url` with a HEAD request to learn whether the server supports
    /// byte ranges. Currently panics (todo!) when it does not.
    pub async fn new(client: Client, url: Url) -> Result<Self, Error> {
        let range_support = client
            .head(url.clone())
            .send()
            .await
            .with_kind(ErrorKind::Network)?
            .error_for_status()
            .with_kind(ErrorKind::Network)?
            .headers()
            .get(ACCEPT_RANGES)
            .and_then(|s| s.to_str().ok())
            == Some("bytes");
        Ok(Self {
            url,
            client,
            range_support: if range_support {
                Ok(())
            } else {
                todo!() // Err(Arc::new(Mutex::new(None)))
            },
        })
    }
}
#[async_trait::async_trait]
impl ArchiveSource for HttpSource {
    type Reader = HttpReader;
    /// Fetches `size` bytes at `position` via an HTTP Range request. A
    /// zero-length section short-circuits to an empty stream, since
    /// `bytes=p-(p-1)` would not be a valid range.
    async fn fetch(&self, position: u64, size: u64) -> Result<Self::Reader, Error> {
        match self.range_support {
            Ok(_) => Ok(HttpReader::Range(StreamReader::new(if size > 0 {
                self.client
                    .get(self.url.clone())
                    // Range header bounds are inclusive.
                    .header(RANGE, format!("bytes={}-{}", position, position + size - 1))
                    .send()
                    .await
                    .with_kind(ErrorKind::Network)?
                    .error_for_status()
                    .with_kind(ErrorKind::Network)?
                    .bytes_stream()
                    .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
                    .boxed()
            } else {
                futures::stream::empty().boxed()
            }))),
            _ => todo!(), // rangeless fallback not yet implemented
        }
    }
}
/// Reader returned by [`HttpSource::fetch`]; an enum so a rangeless variant
/// can be added later without changing the public type.
#[pin_project::pin_project(project = HttpReaderProj)]
pub enum HttpReader {
    Range(#[pin] StreamReader<BoxStream<'static, Result<Bytes, std::io::Error>>, Bytes>),
    // Rangeless(#[pin] RangelessReader),
}
impl AsyncRead for HttpReader {
    fn poll_read(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        match self.project() {
            HttpReaderProj::Range(r) => r.poll_read(cx, buf),
            // HttpReaderProj::Rangeless(r) => r.poll_read(cx, buf),
        }
    }
}
// type RangelessReader = StreamReader<BoxStream<'static, Bytes>, Bytes>;

View File

@@ -0,0 +1,120 @@
use std::path::PathBuf;
use std::sync::Arc;
use blake3::Hash;
use tokio::fs::File;
use tokio::io::{AsyncRead, AsyncWrite};
use crate::prelude::*;
use crate::s9pk::merkle_archive::hash::VerifyingWriter;
pub mod http;
pub mod multi_cursor_file;
/// A source for a single file's bytes. Implementors provide a size and a
/// fresh reader; the copy/verify helpers are derived from those.
#[async_trait::async_trait]
pub trait FileSource: Send + Sync + Sized + 'static {
    type Reader: AsyncRead + Unpin + Send;
    async fn size(&self) -> Result<u64, Error>;
    async fn reader(&self) -> Result<Self::Reader, Error>;
    /// Streams the whole file into `w`.
    async fn copy<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> Result<(), Error> {
        tokio::io::copy(&mut self.reader().await?, w).await?;
        Ok(())
    }
    /// Streams the whole file into `w`, checking it against `verify`
    /// (an expected BLAKE3 hash) if provided.
    async fn copy_verify<W: AsyncWrite + Unpin + Send>(
        &self,
        w: &mut W,
        verify: Option<Hash>,
    ) -> Result<(), Error> {
        let mut w = VerifyingWriter::new(w, verify);
        tokio::io::copy(&mut self.reader().await?, &mut w).await?;
        w.verify()?;
        Ok(())
    }
    /// Reads the whole (optionally verified) file into memory.
    async fn to_vec(&self, verify: Option<Hash>) -> Result<Vec<u8>, Error> {
        let mut vec = Vec::with_capacity(self.size().await? as usize);
        self.copy_verify(&mut vec, verify).await?;
        Ok(vec)
    }
}
// A path on disk is a file source; each reader() opens the file anew.
#[async_trait::async_trait]
impl FileSource for PathBuf {
    type Reader = File;
    async fn size(&self) -> Result<u64, Error> {
        Ok(tokio::fs::metadata(self).await?.len())
    }
    async fn reader(&self) -> Result<Self::Reader, Error> {
        Ok(File::open(self).await?)
    }
}
// An in-memory buffer is a file source; cloning the Arc is cheap.
#[async_trait::async_trait]
impl FileSource for Arc<[u8]> {
    type Reader = std::io::Cursor<Self>;
    async fn size(&self) -> Result<u64, Error> {
        Ok(self.len() as u64)
    }
    async fn reader(&self) -> Result<Self::Reader, Error> {
        Ok(std::io::Cursor::new(self.clone()))
    }
    // Specialized copy: write the slice directly rather than going through
    // a Cursor and tokio::io::copy.
    async fn copy<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> Result<(), Error> {
        use tokio::io::AsyncWriteExt;
        w.write_all(&*self).await?;
        Ok(())
    }
}
/// A random-access source of archive bytes: given an offset and length,
/// produce a reader over exactly that range.
#[async_trait::async_trait]
pub trait ArchiveSource: Clone + Send + Sync + Sized + 'static {
    type Reader: AsyncRead + Unpin + Send;
    async fn fetch(&self, position: u64, size: u64) -> Result<Self::Reader, Error>;
    /// Streams the given range into `w`.
    async fn copy_to<W: AsyncWrite + Unpin + Send>(
        &self,
        position: u64,
        size: u64,
        w: &mut W,
    ) -> Result<(), Error> {
        tokio::io::copy(&mut self.fetch(position, size).await?, w).await?;
        Ok(())
    }
    /// A lazy handle to the given range of this source.
    fn section(&self, position: u64, size: u64) -> Section<Self> {
        Section {
            source: self.clone(),
            position,
            size,
        }
    }
}
#[async_trait::async_trait]
impl ArchiveSource for Arc<[u8]> {
    type Reader = tokio::io::Take<std::io::Cursor<Self>>;
    async fn fetch(&self, position: u64, size: u64) -> Result<Self::Reader, Error> {
        use tokio::io::AsyncReadExt;
        let mut cur = std::io::Cursor::new(self.clone());
        cur.set_position(position);
        Ok(cur.take(size))
    }
}
/// A range of an [`ArchiveSource`], usable as a [`FileSource`].
#[derive(Debug)]
pub struct Section<S> {
    source: S,
    position: u64,
    size: u64,
}
#[async_trait::async_trait]
impl<S: ArchiveSource> FileSource for Section<S> {
    type Reader = S::Reader;
    async fn size(&self) -> Result<u64, Error> {
        Ok(self.size)
    }
    async fn reader(&self) -> Result<Self::Reader, Error> {
        self.source.fetch(self.position, self.size).await
    }
    // Delegate to the source's (possibly specialized) ranged copy.
    async fn copy<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> Result<(), Error> {
        self.source.copy_to(self.position, self.size, w).await
    }
}

View File

@@ -0,0 +1,84 @@
use std::io::SeekFrom;
use std::os::fd::{AsRawFd, RawFd};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::fs::File;
use tokio::io::AsyncRead;
use tokio::sync::{Mutex, OwnedMutexGuard};
use crate::disk::mount::filesystem::loop_dev::LoopDev;
use crate::prelude::*;
use crate::s9pk::merkle_archive::source::{ArchiveSource, Section};
/// A file that can hand out multiple independent read cursors: the original
/// handle, plus fresh handles reopened via /proc/self/fd when it is busy.
#[derive(Clone)]
pub struct MultiCursorFile {
    fd: RawFd,
    file: Arc<Mutex<File>>,
}
impl MultiCursorFile {
    // NOTE(review): assumes the original `File` is still open so the raw fd
    // remains valid, and that we are on Linux (/proc) -- confirm both
    // invariants hold at the call sites.
    fn path(&self) -> PathBuf {
        Path::new("/proc/self/fd").join(self.fd.to_string())
    }
}
impl From<File> for MultiCursorFile {
    fn from(value: File) -> Self {
        Self {
            fd: value.as_raw_fd(),
            file: Arc::new(Mutex::new(value)),
        }
    }
}
/// Reads up to `remaining` bytes from an exclusively-held file handle whose
/// cursor has already been positioned at the start of the section.
#[pin_project::pin_project]
pub struct FileSectionReader {
    #[pin]
    file: OwnedMutexGuard<File>,
    remaining: u64,
}
impl AsyncRead for FileSectionReader {
    /// Reads at most `remaining` bytes from the underlying file, decrementing
    /// `remaining` by the number of bytes actually produced; reports EOF once
    /// the section is exhausted.
    fn poll_read(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        let this = self.project();
        if *this.remaining == 0 {
            // Section exhausted: Ready with nothing filled signals EOF.
            return std::task::Poll::Ready(Ok(()));
        }
        // FIX: `ReadBuf::take` returns an *independent* ReadBuf over the
        // parent's unfilled memory; bytes read into it never advance the
        // parent's `filled` cursor. Polling into `buf.take(..)` directly
        // therefore made the parent appear to have read 0 bytes (spurious
        // EOF) and `remaining` never decreased. Read into an initialized
        // sub-buffer instead and advance the parent explicitly.
        let limit = std::cmp::min(*this.remaining, buf.remaining() as u64) as usize;
        let mut sub = tokio::io::ReadBuf::new(buf.initialize_unfilled_to(limit));
        let res = std::pin::Pin::new(&mut **this.file.get_mut()).poll_read(cx, &mut sub);
        let filled = sub.filled().len();
        // The sub-buffer aliases buf's (now initialized) unfilled region, so
        // advancing by `filled` exposes exactly the bytes just read.
        buf.advance(filled);
        *this.remaining = this.remaining.saturating_sub(filled as u64);
        res
    }
}
#[async_trait::async_trait]
impl ArchiveSource for MultiCursorFile {
    type Reader = FileSectionReader;
    async fn fetch(&self, position: u64, size: u64) -> Result<Self::Reader, Error> {
        use tokio::io::AsyncSeekExt;
        // Prefer the existing handle when it's free; otherwise open a fresh
        // cursor onto the same file via /proc/self/fd.
        let mut file = if let Ok(file) = self.file.clone().try_lock_owned() {
            file
        } else {
            Arc::new(Mutex::new(File::open(self.path()).await?))
                .try_lock_owned()
                .expect("freshly created")
        };
        file.seek(SeekFrom::Start(position)).await?;
        Ok(Self::Reader {
            file,
            remaining: size,
        })
    }
}
// A section of a file can back a loop device directly (path + offset + size).
impl From<Section<MultiCursorFile>> for LoopDev<PathBuf> {
    fn from(value: Section<MultiCursorFile>) -> Self {
        LoopDev::new(value.source.path(), value.position, value.size)
    }
}

View File

@@ -0,0 +1,138 @@
use std::collections::BTreeMap;
use std::io::Cursor;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use ed25519_dalek::SigningKey;
use crate::prelude::*;
use crate::s9pk::merkle_archive::directory_contents::DirectoryContents;
use crate::s9pk::merkle_archive::file_contents::FileContents;
use crate::s9pk::merkle_archive::sink::TrackingWriter;
use crate::s9pk::merkle_archive::source::FileSource;
use crate::s9pk::merkle_archive::{Entry, EntryContents, MerkleArchive};
/// Creates a MerkleArchive (a1) with the provided files at the provided paths. NOTE: later files can overwrite previous files/directories at the same path
/// Tests:
/// - a1.update_hashes(): returns Ok(_)
/// - a1.serialize(verify: true): returns Ok(s1)
/// - MerkleArchive::deserialize(s1): returns Ok(a2)
/// - a2: contains all expected files with expected content
/// - a2.serialize(verify: true): returns Ok(s2)
/// - s1 == s2
#[instrument]
fn test(files: Vec<(PathBuf, String)>) -> Result<(), Error> {
    let mut root = DirectoryContents::<Arc<[u8]>>::new();
    // check_set mirrors what the archive should contain after all inserts.
    let mut check_set = BTreeMap::<PathBuf, String>::new();
    for (path, content) in files {
        if let Err(e) = root.insert_path(
            &path,
            Entry::new(EntryContents::File(FileContents::new(
                content.clone().into_bytes().into(),
            ))),
        ) {
            eprintln!("failed to insert file at {path:?}: {e}");
        } else {
            let path = path.strip_prefix("/").unwrap_or(&path);
            // Inserting a file clobbers anything previously inserted beneath
            // that path, so drop those entries from the expectation set:
            // split off everything ordered >= path, pop descendants of path,
            // then put back the first non-descendant and everything after it.
            let mut remaining = check_set.split_off(path);
            while {
                if let Some((p, s)) = remaining.pop_first() {
                    if !p.starts_with(path) {
                        remaining.insert(p, s);
                        false
                    } else {
                        true
                    }
                } else {
                    false
                }
            } {}
            check_set.append(&mut remaining);
            check_set.insert(path.to_owned(), content);
        }
    }
    let key = SigningKey::generate(&mut rand::thread_rng());
    let mut a1 = MerkleArchive::new(root, key);
    tokio::runtime::Builder::new_current_thread()
        .enable_io()
        .build()
        .unwrap()
        .block_on(async move {
            a1.update_hashes(true).await?;
            let mut s1 = Vec::new();
            a1.serialize(&mut TrackingWriter::new(0, &mut s1), true)
                .await?;
            let s1: Arc<[u8]> = s1.into();
            let a2 = MerkleArchive::deserialize(&s1, &mut Cursor::new(s1.clone())).await?;
            // Every expected path must round-trip with matching content.
            for (path, content) in check_set {
                match a2
                    .contents
                    .get_path(&path)
                    .map(|e| (e.as_contents(), e.hash()))
                {
                    Some((EntryContents::File(f), hash)) => {
                        ensure_code!(
                            &f.to_vec(hash).await? == content.as_bytes(),
                            ErrorKind::ParseS9pk,
                            "File at {path:?} does not match input"
                        )
                    }
                    _ => {
                        return Err(Error::new(
                            eyre!("expected file at {path:?}"),
                            ErrorKind::ParseS9pk,
                        ))
                    }
                }
            }
            // Serialization must be deterministic: re-serializing the
            // round-tripped archive yields identical bytes.
            let mut s2 = Vec::new();
            a2.serialize(&mut TrackingWriter::new(0, &mut s2), true)
                .await?;
            let s2: Arc<[u8]> = s2.into();
            ensure_code!(s1 == s2, ErrorKind::Pack, "s1 does not match s2");
            Ok(())
        })
}
proptest::proptest! {
    // Restrict generated paths to those with a file name and valid-UTF-8
    // segments, since `test` requires both to insert an entry.
    #[test]
    fn property_test(files: Vec<(PathBuf, String)>) {
        let files: Vec<(PathBuf, String)> = files.into_iter().filter(|(p, _)| p.file_name().is_some() && p.iter().all(|s| s.to_str().is_some())).collect();
        if let Err(e) = test(files.clone()) {
            panic!("{e}\nInput: {files:#?}\n{e:?}");
        }
    }
}
#[test]
fn test_example_1() {
    // Smallest case: a single file at the archive root.
    test(vec![(Path::new("foo").into(), "bar".into())])
        .unwrap_or_else(|e| panic!("{e}\n{e:?}"));
}
#[test]
fn test_example_2() {
    // Nested directories with repeated file names at several depths.
    let files = vec![
        (Path::new("a/a.txt").into(), "a.txt".into()),
        (Path::new("a/b/a.txt").into(), "a.txt".into()),
        (Path::new("a/b/b/a.txt").into(), "a.txt".into()),
        (Path::new("a/b/c.txt").into(), "c.txt".into()),
        (Path::new("a/c.txt").into(), "c.txt".into()),
    ];
    test(files).unwrap_or_else(|e| panic!("{e}\n{e:?}"));
}
#[test]
fn test_example_3() {
    // Non-ASCII (multi-byte UTF-8) file contents.
    let files = vec![
        (Path::new("b/a").into(), "𑦪".into()),
        (Path::new("a/c/a").into(), "·".into()),
    ];
    test(files).unwrap_or_else(|e| panic!("{e}\n{e:?}"));
}

View File

@@ -0,0 +1,159 @@
use integer_encoding::VarInt;
use tokio::io::{AsyncRead, AsyncWrite};
use crate::prelude::*;
/// Most-significant byte, == 0x80
/// Set on every varint byte except the last (continuation bit).
pub const MSB: u8 = 0b1000_0000;
// Cap on deserialized string lengths, guarding against malicious headers.
const MAX_STR_LEN: u64 = 1024 * 1024; // 1 MiB
/// Number of bytes `n` occupies when varint-encoded.
pub fn serialized_varint_size(n: u64) -> u64 {
    VarInt::required_space(n) as u64
}
/// Serializes `n` as a LEB128-style varint to `w`.
pub async fn serialize_varint<W: AsyncWrite + Unpin + Send>(
    n: u64,
    w: &mut W,
) -> Result<(), Error> {
    use tokio::io::AsyncWriteExt;
    // 10 bytes is the maximum encoded length of a u64 varint.
    // (`[0u8; 10]` rather than the non-idiomatic `[0 as u8; 10]`.)
    let mut buf = [0u8; 10];
    let b = n.encode_var(&mut buf);
    w.write_all(&buf[0..b]).await?;
    Ok(())
}
/// Number of bytes `s` occupies as a varstring (length varint + UTF-8 bytes).
pub fn serialized_varstring_size(s: &str) -> u64 {
    serialized_varint_size(s.len() as u64) + s.len() as u64
}
/// Serializes `s` as a length-prefixed (varint) UTF-8 string.
pub async fn serialize_varstring<W: AsyncWrite + Unpin + Send>(
    s: &str,
    w: &mut W,
) -> Result<(), Error> {
    use tokio::io::AsyncWriteExt;
    serialize_varint(s.len() as u64, w).await?;
    w.write_all(s.as_bytes()).await?;
    Ok(())
}
/// Incremental varint decoder: bytes are pushed one at a time until a byte
/// without the continuation bit arrives.
#[derive(Default)]
struct VarIntProcessor {
    buf: [u8; 10],
    maxsize: usize,
    i: usize,
}
impl VarIntProcessor {
    fn new() -> VarIntProcessor {
        VarIntProcessor {
            // Max encoded length of a u64: ceil(64 / 7) == 10 bytes.
            maxsize: (std::mem::size_of::<u64>() * 8 + 7) / 7,
            ..VarIntProcessor::default()
        }
    }
    /// Appends one byte, erroring if the varint exceeds the longest valid
    /// u64 encoding.
    fn push(&mut self, b: u8) -> Result<(), Error> {
        if self.i >= self.maxsize {
            return Err(Error::new(
                eyre!("Unterminated varint"),
                ErrorKind::ParseS9pk,
            ));
        }
        self.buf[self.i] = b;
        self.i += 1;
        Ok(())
    }
    /// True once a terminating byte (MSB clear) has been pushed.
    fn finished(&self) -> bool {
        self.i > 0 && (self.buf[self.i - 1] & MSB == 0)
    }
    /// Decodes the accumulated bytes; `None` if nothing was accumulated.
    fn decode(&self) -> Option<u64> {
        Some(u64::decode_var(&self.buf[0..self.i])?.0)
    }
}
/// Deserializes a single varint from `r`, reading one byte at a time until a
/// byte without the continuation bit (MSB) is seen; errors if the encoding
/// runs longer than any valid u64 encoding.
pub async fn deserialize_varint<R: AsyncRead + Unpin>(r: &mut R) -> Result<u64, Error> {
    use tokio::io::AsyncReadExt;
    let mut p = VarIntProcessor::new();
    while !p.finished() {
        // `read_u8` replaces the non-idiomatic `[0 as u8; 1]` + read_exact.
        p.push(r.read_u8().await?)?;
    }
    p.decode()
        .ok_or_else(|| Error::new(eyre!("Reached EOF"), ErrorKind::ParseS9pk))
}
pub async fn deserialize_varstring<R: AsyncRead + Unpin>(r: &mut R) -> Result<String, Error> {
use tokio::io::AsyncReadExt;
let len = std::cmp::min(deserialize_varint(r).await?, MAX_STR_LEN);
let mut res = String::with_capacity(len as usize);
r.take(len).read_to_string(&mut res).await?;
Ok(res)
}
#[cfg(test)]
mod test {
    use std::io::Cursor;
    use crate::prelude::*;
    /// Round-trips `n` through serialize_varint / deserialize_varint.
    fn test_int(n: u64) -> Result<(), Error> {
        let n1 = n;
        tokio::runtime::Builder::new_current_thread()
            .enable_io()
            .build()
            .unwrap()
            .block_on(async move {
                let mut v = Vec::new();
                super::serialize_varint(n1, &mut v).await?;
                let n2 = super::deserialize_varint(&mut Cursor::new(v)).await?;
                ensure_code!(n1 == n2, ErrorKind::Deserialization, "n1 does not match n2");
                Ok(())
            })
    }
    /// Round-trips `s` through serialize_varstring / deserialize_varstring.
    fn test_string(s: &str) -> Result<(), Error> {
        let s1 = s;
        tokio::runtime::Builder::new_current_thread()
            .enable_io()
            .build()
            .unwrap()
            .block_on(async move {
                let mut v: Vec<u8> = Vec::new();
                super::serialize_varstring(&s1, &mut v).await?;
                let s2 = super::deserialize_varstring(&mut Cursor::new(v)).await?;
                ensure_code!(
                    s1 == &s2,
                    ErrorKind::Deserialization,
                    "s1 does not match s2"
                );
                Ok(())
            })
    }
    proptest::proptest! {
        #[test]
        fn proptest_int(n: u64) {
            if let Err(e) = test_int(n) {
                panic!("{e}\nInput: {n}\n{e:?}");
            }
        }
        #[test]
        fn proptest_string(s: String) {
            if let Err(e) = test_string(&s) {
                panic!("{e}\nInput: {s:?}\n{e:?}");
            }
        }
    }
}

View File

@@ -0,0 +1,47 @@
use std::collections::VecDeque;
use crate::prelude::*;
use crate::s9pk::merkle_archive::sink::Sink;
use crate::s9pk::merkle_archive::source::FileSource;
use crate::s9pk::merkle_archive::{Entry, EntryContents};
use crate::util::MaybeOwned;
/// FIFO of entries whose headers have been written but whose bodies are
/// still pending, plus the next free byte offset in the output.
pub struct WriteQueue<'a, S> {
    next_available_position: u64,
    queue: VecDeque<&'a Entry<S>>,
}
impl<'a, S> WriteQueue<'a, S> {
    /// Creates an empty queue whose first body will land at
    /// `next_available_position`.
    pub fn new(next_available_position: u64) -> Self {
        Self {
            next_available_position,
            queue: VecDeque::new(),
        }
    }
}
impl<'a, S: FileSource> WriteQueue<'a, S> {
    /// Reserves space for `entry`'s body and queues it for serialization.
    /// Returns the position the body will be written at; `Missing` entries
    /// have no body, reserve nothing, and get position 0.
    pub async fn add(&mut self, entry: &'a Entry<S>) -> Result<u64, Error> {
        let res = self.next_available_position;
        let size = match entry.as_contents() {
            EntryContents::Missing => return Ok(0),
            EntryContents::File(f) => f.size().await?,
            EntryContents::Directory(d) => d.toc_size(),
        };
        self.next_available_position += size;
        self.queue.push_back(entry);
        Ok(res)
    }
    /// Drains the queue, writing each entry's body to `w`. Directory TOCs
    /// enqueue their children onto `self`, so the loop continues until the
    /// whole tree has been flushed.
    pub async fn serialize<W: Sink>(&mut self, w: &mut W, verify: bool) -> Result<(), Error> {
        loop {
            let Some(next) = self.queue.pop_front() else {
                break;
            };
            match next.as_contents() {
                EntryContents::Missing => (),
                // Verify file bodies against their cached hash only when
                // requested and a hash is actually present.
                EntryContents::File(f) => f.serialize_body(w, next.hash.filter(|_| verify)).await?,
                EntryContents::Directory(d) => d.serialize_toc(self, w).await?,
            }
        }
        Ok(())
    }
}

View File

@@ -0,0 +1,145 @@
use sha2::{Digest, Sha512};
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom};
use tracing::instrument;
use typed_builder::TypedBuilder;
use super::header::{FileSection, Header};
use super::manifest::Manifest;
use super::SIG_CONTEXT;
use crate::util::io::to_cbor_async_writer;
use crate::util::HashWriter;
use crate::{Error, ResultExt};
/// Builder for writing a complete `.s9pk` package: one async reader per
/// section. Construct via `S9pkPacker::builder()` (from `TypedBuilder`),
/// then call `pack`.
#[derive(TypedBuilder)]
pub struct S9pkPacker<
    'a,
    W: AsyncWriteExt + AsyncSeekExt,
    RLicense: AsyncReadExt + Unpin,
    RInstructions: AsyncReadExt + Unpin,
    RIcon: AsyncReadExt + Unpin,
    RDockerImages: AsyncReadExt + Unpin,
    RAssets: AsyncReadExt + Unpin,
    RScripts: AsyncReadExt + Unpin,
> {
    writer: W,
    manifest: &'a Manifest,
    license: RLicense,
    instructions: RInstructions,
    icon: RIcon,
    docker_images: RDockerImages,
    assets: RAssets,
    // Optional: only packages that declare script procedures ship one.
    scripts: Option<RScripts>,
}
impl<
        'a,
        W: AsyncWriteExt + AsyncSeekExt + Unpin,
        RLicense: AsyncReadExt + Unpin,
        RInstructions: AsyncReadExt + Unpin,
        RIcon: AsyncReadExt + Unpin,
        RDockerImages: AsyncReadExt + Unpin,
        RAssets: AsyncReadExt + Unpin,
        RScripts: AsyncReadExt + Unpin,
    > S9pkPacker<'a, W, RLicense, RInstructions, RIcon, RDockerImages, RAssets, RScripts>
{
    /// Writes the full s9pk stream: a fixed-size placeholder header, then
    /// each section in order (hashing everything after the header), then
    /// seeks back and rewrites the header with the real table of contents
    /// and an ed25519 signature over the hash.
    ///
    /// NOTE(review): this was previously documented as "BLOCKING"; it is
    /// an async fn — confirm whether any blocking I/O remains before
    /// relying on that label.
    #[instrument(skip_all)]
    pub async fn pack(mut self, key: &ed25519_dalek::SigningKey) -> Result<(), Error> {
        let header_pos = self.writer.stream_position().await?;
        if header_pos != 0 {
            tracing::warn!("Appending to non-empty file.");
        }
        // Reserve header space; Header::serialize writes the same number
        // of bytes regardless of contents, so it can be overwritten later.
        let mut header = Header::placeholder();
        header.serialize(&mut self.writer).await.with_ctx(|_| {
            (
                crate::ErrorKind::Serialization,
                "Writing Placeholder Header",
            )
        })?;
        let mut position = self.writer.stream_position().await?;
        // Everything after the header is hashed for the signature.
        let mut writer = HashWriter::new(Sha512::new(), &mut self.writer);
        // manifest
        to_cbor_async_writer(&mut writer, self.manifest).await?;
        let new_pos = writer.inner_mut().stream_position().await?;
        header.table_of_contents.manifest = FileSection {
            position,
            length: new_pos - position,
        };
        position = new_pos;
        // license
        tokio::io::copy(&mut self.license, &mut writer)
            .await
            .with_ctx(|_| (crate::ErrorKind::Filesystem, "Copying License"))?;
        let new_pos = writer.inner_mut().stream_position().await?;
        header.table_of_contents.license = FileSection {
            position,
            length: new_pos - position,
        };
        position = new_pos;
        // instructions
        tokio::io::copy(&mut self.instructions, &mut writer)
            .await
            .with_ctx(|_| (crate::ErrorKind::Filesystem, "Copying Instructions"))?;
        let new_pos = writer.inner_mut().stream_position().await?;
        header.table_of_contents.instructions = FileSection {
            position,
            length: new_pos - position,
        };
        position = new_pos;
        // icon
        tokio::io::copy(&mut self.icon, &mut writer)
            .await
            .with_ctx(|_| (crate::ErrorKind::Filesystem, "Copying Icon"))?;
        let new_pos = writer.inner_mut().stream_position().await?;
        header.table_of_contents.icon = FileSection {
            position,
            length: new_pos - position,
        };
        position = new_pos;
        // docker_images
        tokio::io::copy(&mut self.docker_images, &mut writer)
            .await
            .with_ctx(|_| (crate::ErrorKind::Filesystem, "Copying Docker Images"))?;
        let new_pos = writer.inner_mut().stream_position().await?;
        header.table_of_contents.docker_images = FileSection {
            position,
            length: new_pos - position,
        };
        position = new_pos;
        // assets
        tokio::io::copy(&mut self.assets, &mut writer)
            .await
            .with_ctx(|_| (crate::ErrorKind::Filesystem, "Copying Assets"))?;
        let new_pos = writer.inner_mut().stream_position().await?;
        header.table_of_contents.assets = FileSection {
            position,
            length: new_pos - position,
        };
        position = new_pos;
        // scripts (optional section)
        if let Some(mut scripts) = self.scripts {
            tokio::io::copy(&mut scripts, &mut writer)
                .await
                .with_ctx(|_| (crate::ErrorKind::Filesystem, "Copying Scripts"))?;
            let new_pos = writer.inner_mut().stream_position().await?;
            header.table_of_contents.scripts = Some(FileSection {
                position,
                length: new_pos - position,
            });
            position = new_pos;
        }
        // header: sign the accumulated hash and overwrite the placeholder.
        let (hash, _) = writer.finish();
        self.writer.seek(SeekFrom::Start(header_pos)).await?;
        header.pubkey = key.into();
        header.signature = key.sign_prehashed(hash, Some(SIG_CONTEXT))?;
        header
            .serialize(&mut self.writer)
            .await
            .with_ctx(|_| (crate::ErrorKind::Serialization, "Writing Header"))?;
        // Leave the stream positioned at the end of the written payload.
        self.writer.seek(SeekFrom::Start(position)).await?;
        Ok(())
    }
}

View File

@@ -0,0 +1,95 @@
use std::borrow::Cow;
use std::collections::BTreeSet;
use std::io::SeekFrom;
use std::path::Path;
use color_eyre::eyre::eyre;
use futures::{FutureExt, TryStreamExt};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt};
use tokio_tar::{Archive, Entry};
use crate::util::io::from_cbor_async_reader;
use crate::{Error, ErrorKind, ARCH};
/// Index stored as `multiarch.cbor` at the head of a multi-arch
/// docker-images tar: the default architecture plus all available ones.
#[derive(Default, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub struct DockerMultiArch {
    pub default: String,
    pub available: BTreeSet<String>,
}
/// Reader over a docker-images section: either the raw stream (a
/// single-arch image tar) or one architecture's tarball entry inside a
/// multi-arch tar.
#[pin_project::pin_project(project = DockerReaderProject)]
#[derive(Debug)]
pub enum DockerReader<R: AsyncRead + Unpin> {
    SingleArch(#[pin] R),
    MultiArch(#[pin] Entry<Archive<R>>),
}
impl<R: AsyncRead + AsyncSeek + Unpin + Send + Sync> DockerReader<R> {
    /// Opens a docker-images section.
    ///
    /// If the tar contains a `multiarch.cbor` index, picks the tarball for
    /// the current `ARCH` (falling back to the index's default arch) and
    /// yields that inner entry; otherwise treats the whole stream as a
    /// single-arch image tar.
    pub async fn new(mut rdr: R) -> Result<Self, Error> {
        // First pass: look for the multiarch index and decide which arch
        // to extract.
        let arch = if let Some(multiarch) = tokio_tar::Archive::new(&mut rdr)
            .entries()?
            .try_filter_map(|e| {
                async move {
                    Ok(if &*e.path()? == Path::new("multiarch.cbor") {
                        Some(e)
                    } else {
                        None
                    })
                }
                .boxed()
            })
            .try_next()
            .await?
        {
            let multiarch: DockerMultiArch = from_cbor_async_reader(multiarch).await?;
            Some(if multiarch.available.contains(&**ARCH) {
                Cow::Borrowed(&**ARCH)
            } else {
                Cow::Owned(multiarch.default)
            })
        } else {
            None
        };
        // Rewind for the second pass over the archive.
        rdr.seek(SeekFrom::Start(0)).await?;
        if let Some(arch) = arch {
            // Second pass: find `<arch>.tar` inside the multi-arch tar.
            if let Some(image) = tokio_tar::Archive::new(rdr)
                .entries()?
                .try_filter_map(|e| {
                    let arch = arch.clone();
                    async move {
                        Ok(if &*e.path()? == Path::new(&format!("{}.tar", arch)) {
                            Some(e)
                        } else {
                            None
                        })
                    }
                    .boxed()
                })
                .try_next()
                .await?
            {
                Ok(Self::MultiArch(image))
            } else {
                Err(Error::new(
                    eyre!("Docker image section does not contain tarball for architecture"),
                    ErrorKind::ParseS9pk,
                ))
            }
        } else {
            Ok(Self::SingleArch(rdr))
        }
    }
}
impl<R: AsyncRead + Unpin + Send + Sync> AsyncRead for DockerReader<R> {
    // Delegates to whichever variant is active.
    fn poll_read(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        match self.project() {
            DockerReaderProject::SingleArch(r) => r.poll_read(cx, buf),
            DockerReaderProject::MultiArch(r) => r.poll_read(cx, buf),
        }
    }
}

View File

@@ -0,0 +1,41 @@
use std::path::Path;
use crate::Error;
/// Output of `git describe` for a package's source tree (see
/// [`GitHash::from_path`]); stored verbatim, including any `-modified`
/// dirty-tree suffix.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct GitHash(String);
impl GitHash {
    /// Runs `git describe --always --abbrev=40 --dirty=-modified` in
    /// `path` and captures the resulting revision string.
    ///
    /// # Errors
    /// Fails if spawning `git` fails, if the command exits non-zero, or if
    /// its stdout is not valid UTF-8.
    pub async fn from_path(path: impl AsRef<Path>) -> Result<GitHash, Error> {
        let hash = tokio::process::Command::new("git")
            .args(["describe", "--always", "--abbrev=40", "--dirty=-modified"])
            .current_dir(path)
            .output()
            .await?;
        if !hash.status.success() {
            // Use a lossy conversion for stderr: a non-UTF-8 diagnostic
            // from git must not turn the "git failed" error into a UTF-8
            // decoding error (the previous `String::from_utf8(...)?` did).
            return Err(Error::new(
                color_eyre::eyre::eyre!(
                    "Could not get hash: {}",
                    String::from_utf8_lossy(&hash.stderr)
                ),
                crate::ErrorKind::Filesystem,
            ));
        }
        Ok(GitHash(String::from_utf8(hash.stdout)?))
    }
}
impl AsRef<str> for GitHash {
    // Exposes the raw revision string.
    fn as_ref(&self) -> &str {
        &self.0
    }
}
// #[tokio::test]
// async fn test_githash_for_current() {
// let answer: GitHash = GitHash::from_path(std::env::current_dir().unwrap())
// .await
// .unwrap();
// let answer_str: &str = answer.as_ref();
// assert!(
// !answer_str.is_empty(),
// "Should have a hash for this current working"
// );
// }

View File

@@ -0,0 +1,187 @@
use std::collections::BTreeMap;
use color_eyre::eyre::eyre;
use ed25519_dalek::{Signature, VerifyingKey};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
use crate::Error;
/// Two-byte file magic identifying an s9pk (both bytes 59, i.e. `;;`).
pub const MAGIC: [u8; 2] = [59, 59];
/// Current s9pk header format version.
pub const VERSION: u8 = 1;
/// Fixed-size s9pk header: the developer's signing key, a signature over
/// the payload hash, and the section offset table.
#[derive(Debug)]
pub struct Header {
    pub pubkey: VerifyingKey,
    pub signature: Signature,
    pub table_of_contents: TableOfContents,
}
impl Header {
    /// A zeroed header used to reserve space before the real contents are
    /// known; overwritten in place after the payload has been hashed.
    pub fn placeholder() -> Self {
        Header {
            pubkey: VerifyingKey::default(),
            signature: Signature::from_bytes(&[0; 64]),
            table_of_contents: Default::default(),
        }
    }
    // MUST BE SAME SIZE REGARDLESS OF DATA
    /// Writes magic, version byte, 32-byte pubkey, 64-byte signature, then
    /// the table of contents.
    pub async fn serialize<W: AsyncWriteExt + Unpin>(&self, mut writer: W) -> std::io::Result<()> {
        writer.write_all(&MAGIC).await?;
        writer.write_all(&[VERSION]).await?;
        writer.write_all(self.pubkey.as_bytes()).await?;
        writer.write_all(&self.signature.to_bytes()).await?;
        self.table_of_contents.serialize(writer).await?;
        Ok(())
    }
    /// Parses a header written by [`Self::serialize`].
    ///
    /// # Errors
    /// Returns a `ParseS9pk` error on bad magic, unknown version, or an
    /// invalid public key.
    pub async fn deserialize<R: AsyncRead + Unpin>(mut reader: R) -> Result<Self, Error> {
        let mut magic = [0; 2];
        reader.read_exact(&mut magic).await?;
        if magic != MAGIC {
            return Err(Error::new(
                eyre!("Incorrect Magic: {:?}", magic),
                crate::ErrorKind::ParseS9pk,
            ));
        }
        let mut version = [0];
        reader.read_exact(&mut version).await?;
        if version[0] != VERSION {
            return Err(Error::new(
                eyre!("Unknown Version: {}", version[0]),
                crate::ErrorKind::ParseS9pk,
            ));
        }
        let mut pubkey_bytes = [0; 32];
        reader.read_exact(&mut pubkey_bytes).await?;
        let pubkey = VerifyingKey::from_bytes(&pubkey_bytes)
            .map_err(|e| Error::new(e, crate::ErrorKind::ParseS9pk))?;
        let mut sig_bytes = [0; 64];
        reader.read_exact(&mut sig_bytes).await?;
        let signature = Signature::from_bytes(&sig_bytes);
        let table_of_contents = TableOfContents::deserialize(reader).await?;
        Ok(Header {
            pubkey,
            signature,
            table_of_contents,
        })
    }
}
/// Offsets and lengths of every section of the s9pk; only `scripts` is
/// optional.
#[derive(Debug, Default)]
pub struct TableOfContents {
    pub manifest: FileSection,
    pub license: FileSection,
    pub instructions: FileSection,
    pub icon: FileSection,
    pub docker_images: FileSection,
    pub assets: FileSection,
    pub scripts: Option<FileSection>,
}
impl TableOfContents {
    /// Serializes the table of contents: a big-endian `u32` byte length
    /// followed by one labeled entry per section.
    ///
    /// Every section is always written — an absent `scripts` becomes the
    /// 0/0 placeholder — so the serialized size is constant regardless of
    /// contents (required by `Header::serialize`).
    pub async fn serialize<W: AsyncWriteExt + Unpin>(&self, mut writer: W) -> std::io::Result<()> {
        // Each entry is: 1 label-length byte + label bytes + 8-byte
        // position + 8-byte length (the "+ 16").
        let len: u32 = ((1 + "manifest".len() + 16)
            + (1 + "license".len() + 16)
            + (1 + "instructions".len() + 16)
            + (1 + "icon".len() + 16)
            + (1 + "docker_images".len() + 16)
            + (1 + "assets".len() + 16)
            + (1 + "scripts".len() + 16)) as u32;
        writer.write_all(&u32::to_be_bytes(len)).await?;
        self.manifest
            .serialize_entry("manifest", &mut writer)
            .await?;
        self.license.serialize_entry("license", &mut writer).await?;
        self.instructions
            .serialize_entry("instructions", &mut writer)
            .await?;
        self.icon.serialize_entry("icon", &mut writer).await?;
        self.docker_images
            .serialize_entry("docker_images", &mut writer)
            .await?;
        self.assets.serialize_entry("assets", &mut writer).await?;
        self.scripts
            .unwrap_or_default()
            .serialize_entry("scripts", &mut writer)
            .await?;
        Ok(())
    }
    /// Deserializes a table of contents written by [`Self::serialize`].
    ///
    /// # Errors
    /// Returns `UnexpectedEof` if a required label is missing.
    pub async fn deserialize<R: AsyncRead + Unpin>(mut reader: R) -> std::io::Result<Self> {
        let mut toc_len = [0; 4];
        reader.read_exact(&mut toc_len).await?;
        let toc_len = u32::from_be_bytes(toc_len);
        let mut reader = reader.take(toc_len as u64);
        let mut table = BTreeMap::new();
        while let Some((label, section)) = FileSection::deserialize_entry(&mut reader).await? {
            table.insert(label, section);
        }
        // Looks up a section that must be present.
        fn from_table(
            table: &BTreeMap<Vec<u8>, FileSection>,
            label: &str,
        ) -> std::io::Result<FileSection> {
            table.get(label.as_bytes()).copied().ok_or_else(|| {
                std::io::Error::new(
                    std::io::ErrorKind::UnexpectedEof,
                    format!("Missing Required Label: {}", label),
                )
            })
        }
        // Maps the 0/0 placeholder written for an absent optional section
        // back to `None`.
        fn as_opt(fs: FileSection) -> Option<FileSection> {
            if fs.position | fs.length == 0 {
                // 0/0 is not a valid file section
                None
            } else {
                Some(fs)
            }
        }
        Ok(TableOfContents {
            manifest: from_table(&table, "manifest")?,
            license: from_table(&table, "license")?,
            instructions: from_table(&table, "instructions")?,
            icon: from_table(&table, "icon")?,
            docker_images: from_table(&table, "docker_images")?,
            assets: from_table(&table, "assets")?,
            // BUGFIX: `as_opt` was dead code while `scripts` kept the 0/0
            // placeholder as `Some(..)`; filtering it makes
            // serialize -> deserialize round-trip `scripts: None`.
            scripts: table.get("scripts".as_bytes()).copied().and_then(as_opt),
        })
    }
}
/// A byte range within the s9pk file. The `Default` value (0/0) serves as
/// the on-disk placeholder for an absent optional section.
#[derive(Clone, Copy, Debug, Default)]
pub struct FileSection {
    pub position: u64,
    pub length: u64,
}
impl FileSection {
    /// Writes one labeled entry: a 1-byte label length, the label bytes,
    /// then big-endian `u64` position and length.
    pub async fn serialize_entry<W: AsyncWriteExt + Unpin>(
        self,
        label: &str,
        mut writer: W,
    ) -> std::io::Result<()> {
        writer.write_all(&[label.len() as u8]).await?;
        writer.write_all(label.as_bytes()).await?;
        writer.write_all(&u64::to_be_bytes(self.position)).await?;
        writer.write_all(&u64::to_be_bytes(self.length)).await?;
        Ok(())
    }
    /// Reads one labeled entry, or `None` on clean EOF (zero bytes
    /// available for the label-length byte — how the end of the
    /// length-limited table is detected).
    pub async fn deserialize_entry<R: AsyncRead + Unpin>(
        mut reader: R,
    ) -> std::io::Result<Option<(Vec<u8>, Self)>> {
        let mut label_len = [0];
        let read = reader.read(&mut label_len).await?;
        if read == 0 {
            return Ok(None);
        }
        let mut label = vec![0; label_len[0] as usize];
        reader.read_exact(&mut label).await?;
        let mut pos = [0; 8];
        reader.read_exact(&mut pos).await?;
        let mut len = [0; 8];
        reader.read_exact(&mut len).await?;
        Ok(Some((
            label,
            FileSection {
                position: u64::from_be_bytes(pos),
                length: u64::from_be_bytes(len),
            },
        )))
    }
}

View File

@@ -0,0 +1,211 @@
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use color_eyre::eyre::eyre;
pub use models::PackageId;
use serde::{Deserialize, Serialize};
use url::Url;
use super::git_hash::GitHash;
use crate::action::Actions;
use crate::backup::BackupActions;
use crate::config::action::ConfigActions;
use crate::dependencies::Dependencies;
use crate::migration::Migrations;
use crate::net::interface::Interfaces;
use crate::prelude::*;
use crate::procedure::docker::DockerContainers;
use crate::procedure::PackageProcedure;
use crate::status::health_check::HealthChecks;
use crate::util::serde::Regex;
use crate::util::Version;
use crate::version::{Current, VersionT};
use crate::volume::Volumes;
use crate::Error;
/// The version of the running OS/SDK build; used as the default for
/// `eos-version` when a manifest omits it.
fn current_version() -> Version {
    Current::new().semver().into()
}
/// The package manifest: all metadata, asset locations, and procedure
/// declarations for a single service package.
#[derive(Clone, Debug, Deserialize, Serialize, HasModel)]
#[serde(rename_all = "kebab-case")]
#[model = "Model<Self>"]
pub struct Manifest {
    // SDK/OS version the manifest targets; defaults to this build's.
    #[serde(default = "current_version")]
    pub eos_version: Version,
    pub id: PackageId,
    // Set at pack time from `git describe` — see `with_git_hash`.
    #[serde(default)]
    pub git_hash: Option<GitHash>,
    pub title: String,
    pub version: Version,
    pub description: Description,
    // Relative paths of packed files; defaults per `Assets` accessors.
    #[serde(default)]
    pub assets: Assets,
    #[serde(default)]
    pub build: Option<Vec<String>>,
    pub release_notes: String,
    pub license: String, // type of license
    pub wrapper_repo: Url,
    pub upstream_repo: Url,
    pub support_site: Option<Url>,
    pub marketing_site: Option<Url>,
    pub donation_url: Option<Url>,
    #[serde(default)]
    pub alerts: Alerts,
    // Entry-point procedure for the long-running service.
    pub main: PackageProcedure,
    pub health_checks: HealthChecks,
    pub config: Option<ConfigActions>,
    pub properties: Option<PackageProcedure>,
    pub volumes: Volumes,
    // #[serde(default)]
    pub interfaces: Interfaces,
    // #[serde(default)]
    pub backup: BackupActions,
    #[serde(default)]
    pub migrations: Migrations,
    #[serde(default)]
    pub actions: Actions,
    // #[serde(default)]
    // pub permissions: Permissions,
    #[serde(default)]
    pub dependencies: Dependencies,
    pub containers: Option<DockerContainers>,
    // Package titles this one supersedes; limits enforced at validation.
    #[serde(default)]
    pub replaces: Vec<String>,
    #[serde(default)]
    pub hardware_requirements: HardwareRequirements,
}
impl Manifest {
    /// Iterates over every [`PackageProcedure`] declared anywhere in the
    /// manifest: main, config get/set, properties, backup create/restore,
    /// migrations (both directions), and action implementations.
    pub fn package_procedures(&self) -> impl Iterator<Item = &PackageProcedure> {
        use std::iter::once;
        let main = once(&self.main);
        let cfg_get = self.config.as_ref().map(|a| &a.get).into_iter();
        let cfg_set = self.config.as_ref().map(|a| &a.set).into_iter();
        let props = self.properties.iter();
        // A fixed-size array avoids the heap allocation `vec![..]` made on
        // every call (clippy: useless_vec).
        let backups = [&self.backup.create, &self.backup.restore].into_iter();
        let migrations = self
            .migrations
            .to
            .values()
            .chain(self.migrations.from.values());
        let actions = self.actions.0.values().map(|a| &a.implementation);
        main.chain(cfg_get)
            .chain(cfg_set)
            .chain(props)
            .chain(backups)
            .chain(migrations)
            .chain(actions)
    }
    /// Builder-style setter: returns the manifest with `git_hash` set.
    pub fn with_git_hash(mut self, git_hash: GitHash) -> Self {
        self.git_hash = Some(git_hash);
        self
    }
}
/// Hardware constraints a device must satisfy to install the package.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub struct HardwareRequirements {
    // Device class -> regex the device identifier must match.
    #[serde(default)]
    device: BTreeMap<String, Regex>,
    // Minimum RAM — assumed bytes; TODO confirm unit against consumers.
    ram: Option<u64>,
    pub arch: Option<Vec<String>>,
}
/// Source-tree-relative paths of each packed asset; `None` falls back to
/// the conventional default resolved by the corresponding `*_path`
/// accessor.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub struct Assets {
    #[serde(default)]
    pub license: Option<PathBuf>,
    #[serde(default)]
    pub instructions: Option<PathBuf>,
    #[serde(default)]
    pub icon: Option<PathBuf>,
    #[serde(default)]
    pub docker_images: Option<PathBuf>,
    #[serde(default)]
    pub assets: Option<PathBuf>,
    #[serde(default)]
    pub scripts: Option<PathBuf>,
}
impl Assets {
    // All accessors use `Option::as_deref` rather than the previous
    // `as_ref().map(|a| a.as_path())` — identical behavior, less noise.
    /// License file path, defaulting to `LICENSE.md`.
    pub fn license_path(&self) -> &Path {
        self.license.as_deref().unwrap_or(Path::new("LICENSE.md"))
    }
    /// Instructions file path, defaulting to `INSTRUCTIONS.md`.
    pub fn instructions_path(&self) -> &Path {
        self.instructions
            .as_deref()
            .unwrap_or(Path::new("INSTRUCTIONS.md"))
    }
    /// Icon file path, defaulting to `icon.png`.
    pub fn icon_path(&self) -> &Path {
        self.icon.as_deref().unwrap_or(Path::new("icon.png"))
    }
    /// File extension of the icon, defaulting to `png`.
    pub fn icon_type(&self) -> &str {
        self.icon
            .as_deref()
            .and_then(|icon| icon.extension())
            .and_then(|ext| ext.to_str())
            .unwrap_or("png")
    }
    /// Docker images path (file or directory), defaulting to
    /// `docker-images`.
    pub fn docker_images_path(&self) -> &Path {
        self.docker_images
            .as_deref()
            .unwrap_or(Path::new("docker-images"))
    }
    /// Assets directory path, defaulting to `assets`.
    pub fn assets_path(&self) -> &Path {
        self.assets.as_deref().unwrap_or(Path::new("assets"))
    }
    /// Scripts directory path, defaulting to `scripts`.
    pub fn scripts_path(&self) -> &Path {
        self.scripts.as_deref().unwrap_or(Path::new("scripts"))
    }
}
/// Short and long human-readable package descriptions; length limits are
/// enforced by [`Description::validate`].
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Description {
    pub short: String,
    pub long: String,
}
impl Description {
    /// Enforces length limits on the descriptions, measured in `char`s:
    /// at most 160 for `short` and 5000 for `long`.
    ///
    /// # Errors
    /// Returns a `ValidateS9pk` error naming the offending field.
    pub fn validate(&self) -> Result<(), Error> {
        // `.nth(k).is_some()` means "has more than k chars" without
        // scanning the whole string (clippy: iter_skip_next).
        if self.short.chars().nth(160).is_some() {
            return Err(Error::new(
                eyre!("Short description must be 160 characters or less."),
                crate::ErrorKind::ValidateS9pk,
            ));
        }
        if self.long.chars().nth(5000).is_some() {
            return Err(Error::new(
                eyre!("Long description must be 5000 characters or less."),
                crate::ErrorKind::ValidateS9pk,
            ));
        }
        Ok(())
    }
}
/// Optional messages surfaced to the user at each package lifecycle event.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub struct Alerts {
    pub install: Option<String>,
    pub uninstall: Option<String>,
    pub restore: Option<String>,
    pub start: Option<String>,
    pub stop: Option<String>,
}

View File

@@ -0,0 +1,246 @@
use std::ffi::OsStr;
use std::path::PathBuf;
use color_eyre::eyre::eyre;
use futures::TryStreamExt;
use imbl::OrdMap;
use rpc_toolkit::command;
use serde_json::Value;
use tokio::io::AsyncRead;
use tracing::instrument;
use crate::context::SdkContext;
use crate::s9pk::builder::S9pkPacker;
use crate::s9pk::docker::DockerMultiArch;
use crate::s9pk::git_hash::GitHash;
use crate::s9pk::manifest::Manifest;
use crate::s9pk::reader::S9pkReader;
use crate::util::display_none;
use crate::util::io::BufferedWriteReader;
use crate::util::serde::IoFormat;
use crate::volume::Volume;
use crate::{Error, ErrorKind, ResultExt};
pub mod builder;
pub mod docker;
pub mod git_hash;
pub mod header;
pub mod manifest;
pub mod reader;
/// Domain-separation context for the ed25519ph signature over the s9pk
/// payload hash.
pub const SIG_CONTEXT: &[u8] = b"s9pk";
/// CLI command: builds an s9pk from the package sources in `path`
/// (defaulting to the current directory) and writes `<id>.s9pk` there.
#[command(cli_only, display(display_none))]
#[instrument(skip_all)]
pub async fn pack(#[context] ctx: SdkContext, #[arg] path: Option<PathBuf>) -> Result<(), Error> {
    use tokio::fs::File;
    let path = if let Some(path) = path {
        path
    } else {
        std::env::current_dir()?
    };
    // The manifest may be TOML, YAML, or JSON; first one found wins.
    let manifest_value: Value = if path.join("manifest.toml").exists() {
        IoFormat::Toml
            .from_async_reader(File::open(path.join("manifest.toml")).await?)
            .await?
    } else if path.join("manifest.yaml").exists() {
        IoFormat::Yaml
            .from_async_reader(File::open(path.join("manifest.yaml")).await?)
            .await?
    } else if path.join("manifest.json").exists() {
        IoFormat::Json
            .from_async_reader(File::open(path.join("manifest.json")).await?)
            .await?
    } else {
        return Err(Error::new(
            eyre!("manifest not found"),
            crate::ErrorKind::Pack,
        ));
    };
    let manifest: Manifest = serde_json::from_value::<Manifest>(manifest_value.clone())
        .with_kind(crate::ErrorKind::Deserialization)?
        .with_git_hash(GitHash::from_path(&path).await?);
    // Warn (don't fail) about manifest keys the Manifest type ignored.
    let extra_keys =
        enumerate_extra_keys(&serde_json::to_value(&manifest).unwrap(), &manifest_value);
    for k in extra_keys {
        tracing::warn!("Unrecognized Manifest Key: {}", k);
    }
    let outfile_path = path.join(format!("{}.s9pk", manifest.id));
    let mut outfile = File::create(outfile_path).await?;
    S9pkPacker::builder()
        .manifest(&manifest)
        .writer(&mut outfile)
        .license(
            File::open(path.join(manifest.assets.license_path()))
                .await
                .with_ctx(|_| {
                    (
                        crate::ErrorKind::Filesystem,
                        manifest.assets.license_path().display().to_string(),
                    )
                })?,
        )
        .icon(
            File::open(path.join(manifest.assets.icon_path()))
                .await
                .with_ctx(|_| {
                    (
                        crate::ErrorKind::Filesystem,
                        manifest.assets.icon_path().display().to_string(),
                    )
                })?,
        )
        .instructions(
            File::open(path.join(manifest.assets.instructions_path()))
                .await
                .with_ctx(|_| {
                    (
                        crate::ErrorKind::Filesystem,
                        manifest.assets.instructions_path().display().to_string(),
                    )
                })?,
        )
        .docker_images({
            // A directory of per-arch tars becomes a multi-arch tar with a
            // `multiarch.cbor` index; a plain file is passed through.
            let docker_images_path = path.join(manifest.assets.docker_images_path());
            let res: Box<dyn AsyncRead + Unpin + Send + Sync> =
                if tokio::fs::metadata(&docker_images_path).await?.is_dir() {
                    let tars: Vec<_> = tokio_stream::wrappers::ReadDirStream::new(
                        tokio::fs::read_dir(&docker_images_path).await?,
                    )
                    .try_collect()
                    .await?;
                    let mut arch_info = DockerMultiArch::default();
                    for tar in &tars {
                        if tar.path().extension() == Some(OsStr::new("tar")) {
                            arch_info.available.insert(
                                tar.path()
                                    .file_stem()
                                    .unwrap_or_default()
                                    .to_str()
                                    .unwrap_or_default()
                                    .to_owned(),
                            );
                        }
                    }
                    // Prefer aarch64 as the default when present.
                    if arch_info.available.contains("aarch64") {
                        arch_info.default = "aarch64".to_owned();
                    } else {
                        arch_info.default =
                            arch_info.available.iter().next().cloned().unwrap_or_default();
                    }
                    let arch_info_cbor = IoFormat::Cbor.to_vec(&arch_info)?;
                    Box::new(BufferedWriteReader::new(
                        |w| async move {
                            let mut docker_images = tokio_tar::Builder::new(w);
                            let mut multiarch_header = tokio_tar::Header::new_gnu();
                            multiarch_header.set_path("multiarch.cbor")?;
                            multiarch_header.set_size(arch_info_cbor.len() as u64);
                            multiarch_header.set_cksum();
                            docker_images
                                .append(&multiarch_header, std::io::Cursor::new(arch_info_cbor))
                                .await?;
                            for tar in tars {
                                docker_images
                                    .append_path_with_name(tar.path(), tar.file_name())
                                    .await?;
                            }
                            Ok::<_, std::io::Error>(())
                        },
                        1024 * 1024,
                    ))
                } else {
                    Box::new(File::open(docker_images_path).await.with_ctx(|_| {
                        (
                            crate::ErrorKind::Filesystem,
                            manifest.assets.docker_images_path().display().to_string(),
                        )
                    })?)
                };
            res
        })
        .assets({
            // Only volumes declared as `Assets` are packed.
            let asset_volumes = manifest
                .volumes
                .iter()
                .filter(|(_, v)| matches!(v, &&Volume::Assets {}))
                .map(|(id, _)| id.clone())
                .collect::<Vec<_>>();
            let assets_path = manifest.assets.assets_path().to_owned();
            let path = path.clone();
            BufferedWriteReader::new(
                |w| async move {
                    let mut assets = tokio_tar::Builder::new(w);
                    for asset_volume in asset_volumes {
                        assets
                            .append_dir_all(
                                &asset_volume,
                                path.join(&assets_path).join(&asset_volume),
                            )
                            .await?;
                    }
                    Ok::<_, std::io::Error>(())
                },
                1024 * 1024,
            )
        })
        .scripts({
            // A script is required iff some procedure uses one; a stray
            // script without users only warrants a warning.
            let script_path = path.join(manifest.assets.scripts_path()).join("embassy.js");
            let needs_script = manifest.package_procedures().any(|a| a.is_script());
            let has_script = script_path.exists();
            match (needs_script, has_script) {
                (true, true) => Some(File::open(script_path).await?),
                (true, false) => {
                    return Err(Error::new(eyre!("Script is declared in manifest, but no such script exists at ./scripts/embassy.js"), ErrorKind::Pack).into())
                }
                (false, true) => {
                    tracing::warn!("Manifest does not declare any actions that use scripts, but a script exists at ./scripts/embassy.js");
                    None
                }
                (false, false) => None
            }
        })
        .build()
        .pack(&ctx.developer_key()?)
        .await?;
    outfile.sync_all().await?;
    Ok(())
}
/// CLI command: opens the s9pk at `path` with signature checking enabled
/// and runs full validation.
#[command(rename = "s9pk", cli_only, display(display_none))]
pub async fn verify(#[arg] path: PathBuf) -> Result<(), Error> {
    let mut s9pk = S9pkReader::open(path, true).await?;
    s9pk.validate().await?;
    Ok(())
}
/// Recursively lists the keys present in `candidate` but absent from
/// `reference`, as `.`-separated paths (e.g. `.config.extra-key`). Used to
/// warn about manifest keys that deserialization silently ignored.
fn enumerate_extra_keys(reference: &Value, candidate: &Value) -> Vec<String> {
    match (reference, candidate) {
        (Value::Object(m_r), Value::Object(m_c)) => {
            let om_r: OrdMap<String, Value> = m_r.clone().into_iter().collect();
            let om_c: OrdMap<String, Value> = m_c.clone().into_iter().collect();
            // Keys in both maps are recursed into; keys only in the
            // candidate are extra. (OrdMap clones are cheap — persistent
            // structure sharing.)
            let common = om_r.clone().intersection(om_c.clone());
            let top_extra = common.clone().symmetric_difference(om_c.clone());
            let mut all_extra = top_extra
                .keys()
                .map(|s| format!(".{}", s))
                .collect::<Vec<String>>();
            for (k, v) in common {
                all_extra.extend(
                    enumerate_extra_keys(&v, om_c.get(&k).unwrap())
                        .into_iter()
                        .map(|s| format!(".{}{}", k, s)),
                )
            }
            all_extra
        }
        // Reference is not an object but candidate is: every key is extra.
        // (The redundant `m1.clone()` before `.keys()` was removed.)
        (_, Value::Object(m1)) => m1.keys().map(|s| format!(".{}", s)).collect(),
        _ => Vec::new(),
    }
}
// NOTE(review): this test only prints the result instead of asserting it —
// consider pinning the expected extra-key list so regressions fail.
#[test]
fn test_enumerate_extra_keys() {
    use serde_json::json;
    let extras = enumerate_extra_keys(
        &json!({
            "test": 1,
            "test2": null,
        }),
        &json!({
            "test": 1,
            "test2": { "test3": null },
            "test4": null
        }),
    );
    println!("{:?}", extras)
}

View File

@@ -0,0 +1,406 @@
use std::collections::BTreeSet;
use std::io::SeekFrom;
use std::ops::Range;
use std::path::Path;
use std::pin::Pin;
use std::str::FromStr;
use std::task::{Context, Poll};
use color_eyre::eyre::eyre;
use digest::Output;
use ed25519_dalek::VerifyingKey;
use futures::TryStreamExt;
use models::ImageId;
use sha2::{Digest, Sha512};
use tokio::fs::File;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, ReadBuf};
use tracing::instrument;
use super::header::{FileSection, Header, TableOfContents};
use super::manifest::{Manifest, PackageId};
use super::SIG_CONTEXT;
use crate::install::progress::InstallProgressTracker;
use crate::s9pk::docker::DockerReader;
use crate::util::Version;
use crate::{Error, ResultExt};
/// Maximum number of entries allowed in `manifest.replaces`.
const MAX_REPLACES: usize = 10;
/// Maximum length (bytes) of the package title and of each `replaces`
/// entry.
const MAX_TITLE_LEN: usize = 30;
/// A windowed view over a shared reader, restricted to `range`; reads and
/// seeks update the owner's shared position counter.
#[pin_project::pin_project]
#[derive(Debug)]
pub struct ReadHandle<'a, R = File> {
    // Shared cursor owned by the S9pkReader; kept in sync with `rdr`.
    pos: &'a mut u64,
    // Absolute byte range of the section this handle exposes.
    range: Range<u64>,
    #[pin]
    rdr: &'a mut R,
}
impl<'a, R: AsyncRead + Unpin> ReadHandle<'a, R> {
    /// Reads the handle's full range into a freshly-allocated buffer.
    ///
    /// NOTE(review): sizes the buffer as `end - start`, i.e. assumes the
    /// handle has not been read from yet — confirm callers never read
    /// first.
    pub async fn to_vec(mut self) -> std::io::Result<Vec<u8>> {
        let mut buf = vec![0; (self.range.end - self.range.start) as usize];
        self.read_exact(&mut buf).await?;
        Ok(buf)
    }
}
impl<'a, R: AsyncRead + Unpin> AsyncRead for ReadHandle<'a, R> {
    // Reads from the underlying reader, capped so the handle never reads
    // past `range.end`; advances the shared position counter.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let this = self.project();
        let start = buf.filled().len();
        // Expose only the bytes remaining in our range to the inner read.
        let mut take_buf = buf.take(this.range.end.saturating_sub(**this.pos) as usize);
        let res = AsyncRead::poll_read(this.rdr, cx, &mut take_buf);
        let n = take_buf.filled().len();
        // SAFETY: `take_buf` aliases `buf`'s unfilled region and the inner
        // reader initialized `n` bytes of it. NOTE(review): confirm the
        // `start + n` count matches `ReadBuf::assume_init`'s contract
        // (bytes beyond the filled region).
        unsafe { buf.assume_init(start + n) };
        buf.advance(n);
        **this.pos += n as u64;
        res
    }
}
impl<'a, R: AsyncSeek + Unpin> AsyncSeek for ReadHandle<'a, R> {
    // Translates seeks to be relative to the section: `Start`/`End` are
    // offset by the range bounds before delegating.
    fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> std::io::Result<()> {
        let this = self.project();
        AsyncSeek::start_seek(
            this.rdr,
            match position {
                SeekFrom::Current(n) => SeekFrom::Current(n),
                SeekFrom::End(n) => SeekFrom::Start((this.range.end as i64 + n) as u64),
                SeekFrom::Start(n) => SeekFrom::Start(this.range.start + n),
            },
        )
    }
    fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
        let this = self.project();
        match AsyncSeek::poll_complete(this.rdr, cx) {
            Poll::Ready(Ok(n)) => {
                // Report positions relative to the section start and keep
                // the shared cursor in sync with the absolute position.
                let res = n.saturating_sub(this.range.start);
                **this.pos = this.range.start + res;
                Poll::Ready(Ok(res))
            }
            a => a,
        }
    }
}
/// A parsed docker image tag of the form
/// `start9/<package>/<image>:<version>`.
#[derive(Debug)]
pub struct ImageTag {
    pub package_id: PackageId,
    pub image_id: ImageId,
    pub version: Version,
}
impl ImageTag {
    /// Checks that this tag belongs to package `id` at `version`.
    ///
    /// # Errors
    /// Returns a `ValidateS9pk` error naming the mismatched component.
    #[instrument(skip_all)]
    pub fn validate(&self, id: &PackageId, version: &Version) -> Result<(), Error> {
        if id != &self.package_id {
            return Err(Error::new(
                eyre!(
                    "Contains image for incorrect package: id {}",
                    self.package_id,
                ),
                crate::ErrorKind::ValidateS9pk,
            ));
        }
        if version != &self.version {
            return Err(Error::new(
                eyre!(
                    "Contains image with incorrect version: expected {} received {}",
                    version,
                    self.version,
                ),
                crate::ErrorKind::ValidateS9pk,
            ));
        }
        Ok(())
    }
}
impl FromStr for ImageTag {
    type Err = Error;
    /// Parses a tag of the form `start9/<package>/<image>:<version>`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let rest = s.strip_prefix("start9/").ok_or_else(|| {
            Error::new(
                eyre!("Invalid image tag prefix: expected start9/"),
                crate::ErrorKind::ValidateS9pk,
            )
        })?;
        let (package, rest) = rest.split_once("/").ok_or_else(|| {
            Error::new(
                eyre!("Image tag missing image id"),
                crate::ErrorKind::ValidateS9pk,
            )
        })?;
        let (image, version) = rest.split_once(":").ok_or_else(|| {
            Error::new(
                eyre!("Image tag missing version"),
                crate::ErrorKind::ValidateS9pk,
            )
        })?;
        Ok(ImageTag {
            package_id: package.parse()?,
            image_id: image.parse()?,
            version: version.parse()?,
        })
    }
}
/// Reader for a signed `.s9pk` file: the parsed header plus a seekable
/// stream addressed by section.
pub struct S9pkReader<R: AsyncRead + AsyncSeek + Unpin + Send + Sync = File> {
    // SHA-512 of the signed payload; `None` unless opened with check_sig.
    hash: Option<Output<Sha512>>,
    // Unpadded base32 rendering of `hash`.
    hash_string: Option<String>,
    developer_key: VerifyingKey,
    toc: TableOfContents,
    // Cached current byte offset of `rdr`, used to skip redundant seeks.
    pos: u64,
    rdr: R,
}
impl S9pkReader {
    /// Opens the s9pk file at `path`; see [`Self::from_reader`] for the
    /// meaning of `check_sig`.
    pub async fn open<P: AsRef<Path>>(path: P, check_sig: bool) -> Result<Self, Error> {
        let p = path.as_ref();
        let rdr = File::open(p)
            .await
            .with_ctx(|_| (crate::error::ErrorKind::Filesystem, p.display().to_string()))?;
        Self::from_reader(rdr, check_sig).await
    }
}
impl<R: AsyncRead + AsyncSeek + Unpin + Send + Sync> S9pkReader<InstallProgressTracker<R>> {
    // Forwards to the wrapped install-progress tracker.
    pub fn validated(&mut self) {
        self.rdr.validated()
    }
}
impl<R: AsyncRead + AsyncSeek + Unpin + Send + Sync> S9pkReader<R> {
#[instrument(skip_all)]
pub async fn validate(&mut self) -> Result<(), Error> {
if self.toc.icon.length > 102_400 {
// 100 KiB
return Err(Error::new(
eyre!("icon must be less than 100KiB"),
crate::ErrorKind::ValidateS9pk,
));
}
let image_tags = self.image_tags().await?;
let man = self.manifest().await?;
let containers = &man.containers;
let validated_image_ids = image_tags
.into_iter()
.map(|i| i.validate(&man.id, &man.version).map(|_| i.image_id))
.collect::<Result<BTreeSet<ImageId>, _>>()?;
man.description.validate()?;
man.actions.0.iter().try_for_each(|(_, action)| {
action.validate(
containers,
&man.eos_version,
&man.volumes,
&validated_image_ids,
)
})?;
man.backup.validate(
containers,
&man.eos_version,
&man.volumes,
&validated_image_ids,
)?;
if let Some(cfg) = &man.config {
cfg.validate(
containers,
&man.eos_version,
&man.volumes,
&validated_image_ids,
)?;
}
man.health_checks
.validate(&man.eos_version, &man.volumes, &validated_image_ids)?;
man.interfaces.validate()?;
man.main
.validate(&man.eos_version, &man.volumes, &validated_image_ids, false)
.with_ctx(|_| (crate::ErrorKind::ValidateS9pk, "Main"))?;
man.migrations.validate(
containers,
&man.eos_version,
&man.volumes,
&validated_image_ids,
)?;
#[cfg(feature = "js-engine")]
if man.containers.is_some()
|| matches!(man.main, crate::procedure::PackageProcedure::Script(_))
{
return Err(Error::new(
eyre!("Right now we don't support the containers and the long running main"),
crate::ErrorKind::ValidateS9pk,
));
}
if man.replaces.len() >= MAX_REPLACES {
return Err(Error::new(
eyre!("Cannot have more than {MAX_REPLACES} replaces"),
crate::ErrorKind::ValidateS9pk,
));
}
if let Some(too_big) = man.replaces.iter().find(|x| x.len() >= MAX_REPLACES) {
return Err(Error::new(
eyre!("We have found a replaces of ({too_big}) that exceeds the max length of {MAX_TITLE_LEN} "),
crate::ErrorKind::ValidateS9pk,
));
}
if man.title.len() >= MAX_TITLE_LEN {
return Err(Error::new(
eyre!("Cannot have more than a length of {MAX_TITLE_LEN} for title"),
crate::ErrorKind::ValidateS9pk,
));
}
if man.containers.is_some()
&& matches!(man.main, crate::procedure::PackageProcedure::Docker(_))
{
return Err(Error::new(
eyre!("Cannot have a main docker and a main in containers"),
crate::ErrorKind::ValidateS9pk,
));
}
if let Some(props) = &man.properties {
props
.validate(&man.eos_version, &man.volumes, &validated_image_ids, true)
.with_ctx(|_| (crate::ErrorKind::ValidateS9pk, "Properties"))?;
}
man.volumes.validate(&man.interfaces)?;
Ok(())
}
    /// Reads `manifest.json` out of the docker-images tar and parses every
    /// `RepoTags` entry into an [`ImageTag`].
    ///
    /// # Errors
    /// Fails with `ParseS9pk` if no `manifest.json` is found.
    #[instrument(skip_all)]
    pub async fn image_tags(&mut self) -> Result<Vec<ImageTag>, Error> {
        let mut tar = tokio_tar::Archive::new(self.docker_images().await?);
        let mut entries = tar.entries()?;
        while let Some(mut entry) = entries.try_next().await? {
            if &*entry.path()? != Path::new("manifest.json") {
                continue;
            }
            let mut buf = Vec::with_capacity(entry.header().size()? as usize);
            entry.read_to_end(&mut buf).await?;
            // Minimal projection of docker's image manifest format.
            #[derive(serde::Deserialize)]
            struct ManEntry {
                #[serde(rename = "RepoTags")]
                tags: Vec<String>,
            }
            let man_entries = serde_json::from_slice::<Vec<ManEntry>>(&buf)
                .with_ctx(|_| (crate::ErrorKind::Deserialization, "manifest.json"))?;
            return man_entries
                .iter()
                .flat_map(|e| &e.tags)
                .map(|t| t.parse())
                .collect();
        }
        Err(Error::new(
            eyre!("image.tar missing manifest.json"),
            crate::ErrorKind::ParseS9pk,
        ))
    }
/// Constructs a reader from `rdr`, optionally verifying the developer signature.
///
/// When `check_sig` is true, everything after the header is hashed with
/// SHA-512 and the header's signature is verified against that prehash
/// (ed25519ph); the digest and its unpadded-base32 rendering are retained
/// on the returned reader. NOTE(review): hashing consumes the stream, so
/// the recorded `pos` is end-of-stream afterwards — sections are reached
/// later by seeking in `read_handle`.
#[instrument(skip_all)]
pub async fn from_reader(mut rdr: R, check_sig: bool) -> Result<Self, Error> {
    let header = Header::deserialize(&mut rdr).await?;
    let (hash, hash_string) = if check_sig {
        let mut hasher = Sha512::new();
        let mut buf = [0; 1024];
        let mut read;
        // do-while: read chunks until EOF (read == 0), feeding the hasher.
        while {
            read = rdr.read(&mut buf).await?;
            read != 0
        } {
            hasher.update(&buf[0..read]);
        }
        // Clone so the digest can be reported while the original hasher
        // state is handed to the prehashed signature verification below.
        let hash = hasher.clone().finalize();
        header
            .pubkey
            .verify_prehashed(hasher, Some(SIG_CONTEXT), &header.signature)?;
        (
            Some(hash),
            Some(base32::encode(
                base32::Alphabet::RFC4648 { padding: false },
                hash.as_slice(),
            )),
        )
    } else {
        (None, None)
    };
    // Record the current stream position so read_handle can track seeks.
    let pos = rdr.stream_position().await?;
    Ok(S9pkReader {
        hash_string,
        hash,
        developer_key: header.pubkey,
        toc: header.table_of_contents,
        pos,
        rdr,
    })
}
/// SHA-512 digest of the package body, if this reader was constructed with
/// signature checking enabled (`check_sig`); `None` otherwise.
pub fn hash(&self) -> Option<&Output<Sha512>> {
    self.hash.as_ref()
}
/// Unpadded base32 rendering of [`Self::hash`], if signature checking was
/// enabled when this reader was constructed; `None` otherwise.
pub fn hash_str(&self) -> Option<&str> {
    // as_deref is the idiomatic Option<String> -> Option<&str> conversion,
    // replacing the manual `.as_ref().map(|s| s.as_str())` chain.
    self.hash_string.as_deref()
}
/// The developer's ed25519 public key, taken from the package header.
pub fn developer_key(&self) -> &VerifyingKey {
    &self.developer_key
}
/// Rewinds the underlying reader to the start of the file.
///
/// Also resets the cached stream position: `read_handle` skips seeking when
/// `self.pos` already equals the requested section position, so leaving a
/// stale `pos` after the rewind could make a later `read_handle` read from
/// offset 0 instead of the section start.
pub async fn reset(&mut self) -> Result<(), Error> {
    self.rdr.seek(SeekFrom::Start(0)).await?;
    // Keep the cached position in sync with the reader's true position.
    self.pos = 0;
    Ok(())
}
/// Returns a read handle bounded to `section`, seeking only when the reader
/// is not already positioned at the section start.
///
/// Invariant: `self.pos` mirrors the underlying reader's true position; the
/// returned `ReadHandle` borrows it mutably so reads advance the shared
/// cursor and keep the invariant intact.
async fn read_handle<'a>(
    &'a mut self,
    section: FileSection,
) -> Result<ReadHandle<'a, R>, Error> {
    // Skip the seek when already at the right offset.
    if self.pos != section.position {
        self.rdr.seek(SeekFrom::Start(section.position)).await?;
        self.pos = section.position;
    }
    Ok(ReadHandle {
        range: self.pos..(self.pos + section.length),
        pos: &mut self.pos,
        rdr: &mut self.rdr,
    })
}
/// Read handle over the raw (CBOR) bytes of the manifest section.
pub async fn manifest_raw(&mut self) -> Result<ReadHandle<'_, R>, Error> {
    self.read_handle(self.toc.manifest).await
}
/// Reads the manifest section and deserializes it from CBOR.
pub async fn manifest(&mut self) -> Result<Manifest, Error> {
    // Buffer the whole section, then decode it.
    let raw = self.manifest_raw().await?;
    let bytes = raw.to_vec().await?;
    serde_cbor::de::from_reader(&bytes[..])
        .with_ctx(|_| (crate::ErrorKind::ParseS9pk, "Deserializing Manifest (CBOR)"))
}
/// Read handle over the license section.
pub async fn license(&mut self) -> Result<ReadHandle<'_, R>, Error> {
    self.read_handle(self.toc.license).await
}
/// Read handle over the instructions section.
pub async fn instructions(&mut self) -> Result<ReadHandle<'_, R>, Error> {
    self.read_handle(self.toc.instructions).await
}
/// Read handle over the icon section.
pub async fn icon(&mut self) -> Result<ReadHandle<'_, R>, Error> {
    self.read_handle(self.toc.icon).await
}
/// The docker images section, wrapped in a `DockerReader` for decoding.
pub async fn docker_images(&mut self) -> Result<DockerReader<ReadHandle<'_, R>>, Error> {
    DockerReader::new(self.read_handle(self.toc.docker_images).await?).await
}
/// Read handle over the assets section.
pub async fn assets(&mut self) -> Result<ReadHandle<'_, R>, Error> {
    self.read_handle(self.toc.assets).await
}
/// Read handle over the scripts section, or `None` if the table of contents
/// records no scripts for this package.
pub async fn scripts(&mut self) -> Result<Option<ReadHandle<'_, R>>, Error> {
    match self.toc.scripts {
        Some(section) => Ok(Some(self.read_handle(section).await?)),
        None => Ok(None),
    }
}
}

View File

@@ -0,0 +1,41 @@
use crate::prelude::*;
use crate::s9pk::merkle_archive::sink::Sink;
use crate::s9pk::merkle_archive::source::{ArchiveSource, FileSource, Section};
use crate::s9pk::merkle_archive::MerkleArchive;
/// File prefix: magic `0x3b3b` followed by format version `0x02`.
const MAGIC_AND_VERSION: &[u8] = &[0x3b, 0x3b, 0x02];

/// An s9pk package: a magic/version-prefixed merkle archive.
pub struct S9pk<S>(MerkleArchive<S>);
impl<S: FileSource> S9pk<S> {
    /// Writes the magic/version prefix followed by the serialized merkle
    /// archive to `w`, forwarding `verify` to the archive serializer.
    pub async fn serialize<W: Sink>(&mut self, w: &mut W, verify: bool) -> Result<(), Error> {
        use tokio::io::AsyncWriteExt;
        // Prefix first, then the archive body.
        w.write_all(MAGIC_AND_VERSION).await?;
        let Self(archive) = self;
        archive.serialize(w, verify).await
    }
}
impl<S: ArchiveSource> S9pk<Section<S>> {
    /// Parses an s9pk from `source`: validates the magic/version prefix,
    /// then deserializes the merkle archive header that follows it.
    pub async fn deserialize(source: &S) -> Result<Self, Error> {
        use tokio::io::AsyncReadExt;
        // Fetch exactly the prefix plus the fixed-size archive header.
        let prefix_len = MAGIC_AND_VERSION.len() as u64;
        let mut header = source
            .fetch(0, prefix_len + MerkleArchive::<Section<S>>::header_size())
            .await?;
        // The first three bytes must match the expected magic + version.
        let mut magic_version = [0u8; 3];
        header.read_exact(&mut magic_version).await?;
        ensure_code!(
            &magic_version == MAGIC_AND_VERSION,
            ErrorKind::ParseS9pk,
            "Invalid Magic or Unexpected Version"
        );
        let archive = MerkleArchive::deserialize(source, &mut header).await?;
        Ok(Self(archive))
    }
}

View File

@@ -0,0 +1,89 @@
## Magic
`0x3b3b`
## Version
`0x02` (varint)
## Merkle Archive
### Header
- ed25519 pubkey (32B)
- ed25519 signature of TOC sighash (64B)
- TOC sighash: (32B)
- TOC position: (8B: u64 BE)
- TOC size: (8B: u64 BE)
### TOC
- number of entries (varint)
- FOREACH section
- name (varstring)
- hash (32B: BLAKE-3 of file contents / TOC sighash)
- TYPE (1B)
- TYPE=MISSING (`0x00`)
- TYPE=FILE (`0x01`)
- position (8B: u64 BE)
- size (8B: u64 BE)
- TYPE=TOC (`0x02`)
- position (8B: u64 BE)
- size (8B: u64 BE)
#### SigHash
Hash of the TOC serialized with every entry's TYPE set to MISSING (so it can be computed without the file contents)
### FILE
`<File contents>`
# Example
`foo/bar/baz.txt`
ROOT TOC:
- 1 section
- name: foo
hash: sighash('a)
type: TOC
position: 'a
size: _
'a:
- 1 section
- name: bar
hash: sighash('b)
type: TOC
position: 'b
size: _
'b:
- 2 sections
- name: baz.txt
hash: hash('c)
type: FILE
position: 'c
size: _
- name: qux
hash: `<unverifiable>`
type: MISSING
'c: `<CONTENTS OF baz.txt>`
"foo/"
hash: _
size: 15b
"bar.txt"
hash: _
size: 5b
`<CONTENTS OF foo/>` (
"baz.txt"
hash: _
size: 2b
)
`<CONTENTS OF bar.txt>` ("hello")
`<CONTENTS OF baz.txt>` ("hi")