mirror of
https://github.com/Start9Labs/start-os.git
synced 2026-04-04 14:29:45 +00:00
[feature]: s9pk v2 (#2507)
* feature: s9pk v2 wip wip wip wip refactor * use WriteQueue * fix proptest * LoopDev eager directory hash verification
This commit is contained in:
25
backend/Cargo.lock
generated
25
backend/Cargo.lock
generated
@@ -452,6 +452,19 @@ dependencies = [
|
|||||||
"constant_time_eq",
|
"constant_time_eq",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "blake3"
|
||||||
|
version = "1.5.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "0231f06152bf547e9c2b5194f247cd97aacf6dcd8b15d8e5ec0663f64580da87"
|
||||||
|
dependencies = [
|
||||||
|
"arrayref",
|
||||||
|
"arrayvec",
|
||||||
|
"cc",
|
||||||
|
"cfg-if 1.0.0",
|
||||||
|
"constant_time_eq",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "block-buffer"
|
name = "block-buffer"
|
||||||
version = "0.9.0"
|
version = "0.9.0"
|
||||||
@@ -2385,6 +2398,16 @@ dependencies = [
|
|||||||
"cfg-if 1.0.0",
|
"cfg-if 1.0.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "integer-encoding"
|
||||||
|
version = "4.0.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "924df4f0e24e2e7f9cdd90babb0b96f93b20f3ecfa949ea9e6613756b8c8e1bf"
|
||||||
|
dependencies = [
|
||||||
|
"async-trait",
|
||||||
|
"tokio",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "io-lifetimes"
|
name = "io-lifetimes"
|
||||||
version = "1.0.11"
|
version = "1.0.11"
|
||||||
@@ -4898,6 +4921,7 @@ dependencies = [
|
|||||||
"base64 0.21.4",
|
"base64 0.21.4",
|
||||||
"base64ct",
|
"base64ct",
|
||||||
"basic-cookies",
|
"basic-cookies",
|
||||||
|
"blake3",
|
||||||
"bytes",
|
"bytes",
|
||||||
"chrono",
|
"chrono",
|
||||||
"ciborium",
|
"ciborium",
|
||||||
@@ -4929,6 +4953,7 @@ dependencies = [
|
|||||||
"include_dir",
|
"include_dir",
|
||||||
"indexmap 2.0.2",
|
"indexmap 2.0.2",
|
||||||
"indicatif",
|
"indicatif",
|
||||||
|
"integer-encoding",
|
||||||
"ipnet",
|
"ipnet",
|
||||||
"iprange",
|
"iprange",
|
||||||
"isocountry",
|
"isocountry",
|
||||||
|
|||||||
@@ -52,6 +52,7 @@ base32 = "0.4.0"
|
|||||||
base64 = "0.21.4"
|
base64 = "0.21.4"
|
||||||
base64ct = "1.6.0"
|
base64ct = "1.6.0"
|
||||||
basic-cookies = "0.1.4"
|
basic-cookies = "0.1.4"
|
||||||
|
blake3 = "1.5.0"
|
||||||
bytes = "1"
|
bytes = "1"
|
||||||
chrono = { version = "0.4.31", features = ["serde"] }
|
chrono = { version = "0.4.31", features = ["serde"] }
|
||||||
clap = "3.2.25"
|
clap = "3.2.25"
|
||||||
@@ -89,6 +90,7 @@ imbl-value = { git = "https://github.com/Start9Labs/imbl-value.git" }
|
|||||||
include_dir = "0.7.3"
|
include_dir = "0.7.3"
|
||||||
indexmap = { version = "2.0.2", features = ["serde"] }
|
indexmap = { version = "2.0.2", features = ["serde"] }
|
||||||
indicatif = { version = "0.17.7", features = ["tokio"] }
|
indicatif = { version = "0.17.7", features = ["tokio"] }
|
||||||
|
integer-encoding = { version = "4.0.0", features = ["tokio_async"] }
|
||||||
ipnet = { version = "2.8.0", features = ["serde"] }
|
ipnet = { version = "2.8.0", features = ["serde"] }
|
||||||
iprange = { version = "0.6.7", features = ["serde"] }
|
iprange = { version = "0.6.7", features = ["serde"] }
|
||||||
isocountry = "0.3.2"
|
isocountry = "0.3.2"
|
||||||
|
|||||||
89
backend/src/disk/mount/filesystem/loop_dev.rs
Normal file
89
backend/src/disk/mount/filesystem/loop_dev.rs
Normal file
@@ -0,0 +1,89 @@
|
|||||||
|
use std::os::unix::ffi::OsStrExt;
|
||||||
|
use std::path::Path;
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use digest::generic_array::GenericArray;
|
||||||
|
use digest::{Digest, OutputSizeUser};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use sha2::Sha256;
|
||||||
|
|
||||||
|
use super::{FileSystem, MountType, ReadOnly};
|
||||||
|
use crate::util::Invoke;
|
||||||
|
use crate::{Error, ResultExt};
|
||||||
|
|
||||||
|
pub async fn mount(
|
||||||
|
logicalname: impl AsRef<Path>,
|
||||||
|
offset: u64,
|
||||||
|
size: u64,
|
||||||
|
mountpoint: impl AsRef<Path>,
|
||||||
|
mount_type: MountType,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
tokio::fs::create_dir_all(mountpoint.as_ref()).await?;
|
||||||
|
let mut opts = format!("loop,offset={offset},sizelimit={size}");
|
||||||
|
if mount_type == ReadOnly {
|
||||||
|
opts += ",ro";
|
||||||
|
}
|
||||||
|
|
||||||
|
tokio::process::Command::new("mount")
|
||||||
|
.arg(logicalname.as_ref())
|
||||||
|
.arg(mountpoint.as_ref())
|
||||||
|
.arg("-o")
|
||||||
|
.arg(opts)
|
||||||
|
.invoke(crate::ErrorKind::Filesystem)
|
||||||
|
.await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize, Serialize)]
|
||||||
|
#[serde(rename_all = "kebab-case")]
|
||||||
|
pub struct LoopDev<LogicalName: AsRef<Path>> {
|
||||||
|
logicalname: LogicalName,
|
||||||
|
offset: u64,
|
||||||
|
size: u64,
|
||||||
|
}
|
||||||
|
impl<LogicalName: AsRef<Path>> LoopDev<LogicalName> {
|
||||||
|
pub fn new(logicalname: LogicalName, offset: u64, size: u64) -> Self {
|
||||||
|
Self {
|
||||||
|
logicalname,
|
||||||
|
offset,
|
||||||
|
size,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#[async_trait]
|
||||||
|
impl<LogicalName: AsRef<Path> + Send + Sync> FileSystem for LoopDev<LogicalName> {
|
||||||
|
async fn mount<P: AsRef<Path> + Send + Sync>(
|
||||||
|
&self,
|
||||||
|
mountpoint: P,
|
||||||
|
mount_type: MountType,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
mount(
|
||||||
|
self.logicalname.as_ref(),
|
||||||
|
self.offset,
|
||||||
|
self.size,
|
||||||
|
mountpoint,
|
||||||
|
mount_type,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
async fn source_hash(
|
||||||
|
&self,
|
||||||
|
) -> Result<GenericArray<u8, <Sha256 as OutputSizeUser>::OutputSize>, Error> {
|
||||||
|
let mut sha = Sha256::new();
|
||||||
|
sha.update("LoopDev");
|
||||||
|
sha.update(
|
||||||
|
tokio::fs::canonicalize(self.logicalname.as_ref())
|
||||||
|
.await
|
||||||
|
.with_ctx(|_| {
|
||||||
|
(
|
||||||
|
crate::ErrorKind::Filesystem,
|
||||||
|
self.logicalname.as_ref().display().to_string(),
|
||||||
|
)
|
||||||
|
})?
|
||||||
|
.as_os_str()
|
||||||
|
.as_bytes(),
|
||||||
|
);
|
||||||
|
sha.update(&u64::to_be_bytes(self.offset)[..]);
|
||||||
|
Ok(sha.finalize())
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -14,6 +14,7 @@ pub mod ecryptfs;
|
|||||||
pub mod efivarfs;
|
pub mod efivarfs;
|
||||||
pub mod httpdirfs;
|
pub mod httpdirfs;
|
||||||
pub mod label;
|
pub mod label;
|
||||||
|
pub mod loop_dev;
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||||
pub enum MountType {
|
pub enum MountType {
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
pub use color_eyre::eyre::eyre;
|
pub use color_eyre::eyre::eyre;
|
||||||
pub use models::OptionExt;
|
pub use models::OptionExt;
|
||||||
|
pub use tracing::instrument;
|
||||||
|
|
||||||
pub use crate::db::prelude::*;
|
pub use crate::db::prelude::*;
|
||||||
pub use crate::ensure_code;
|
pub use crate::ensure_code;
|
||||||
|
|||||||
199
backend/src/s9pk/merkle_archive/directory_contents.rs
Normal file
199
backend/src/s9pk/merkle_archive/directory_contents.rs
Normal file
@@ -0,0 +1,199 @@
|
|||||||
|
use std::collections::BTreeMap;
|
||||||
|
use std::path::Path;
|
||||||
|
|
||||||
|
use futures::future::BoxFuture;
|
||||||
|
use futures::FutureExt;
|
||||||
|
use imbl_value::InternedString;
|
||||||
|
use tokio::io::AsyncRead;
|
||||||
|
|
||||||
|
use crate::prelude::*;
|
||||||
|
use crate::s9pk::merkle_archive::hash::{Hash, HashWriter};
|
||||||
|
use crate::s9pk::merkle_archive::sink::{Sink, TrackingWriter};
|
||||||
|
use crate::s9pk::merkle_archive::source::{ArchiveSource, FileSource, Section};
|
||||||
|
use crate::s9pk::merkle_archive::write_queue::WriteQueue;
|
||||||
|
use crate::s9pk::merkle_archive::{varint, Entry, EntryContents};
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct DirectoryContents<S>(BTreeMap<InternedString, Entry<S>>);
|
||||||
|
impl<S> DirectoryContents<S> {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self(BTreeMap::new())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[instrument(skip_all)]
|
||||||
|
pub fn get_path(&self, path: impl AsRef<Path>) -> Option<&Entry<S>> {
|
||||||
|
let mut dir = Some(self);
|
||||||
|
let mut res = None;
|
||||||
|
for segment in path.as_ref().into_iter() {
|
||||||
|
let segment = segment.to_str()?;
|
||||||
|
if segment == "/" {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
res = dir?.get(segment);
|
||||||
|
if let Some(EntryContents::Directory(d)) = res.as_ref().map(|e| e.as_contents()) {
|
||||||
|
dir = Some(d);
|
||||||
|
} else {
|
||||||
|
dir = None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
res
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn insert_path(&mut self, path: impl AsRef<Path>, entry: Entry<S>) -> Result<(), Error> {
|
||||||
|
let path = path.as_ref();
|
||||||
|
let (parent, Some(file)) = (path.parent(), path.file_name().and_then(|f| f.to_str()))
|
||||||
|
else {
|
||||||
|
return Err(Error::new(
|
||||||
|
eyre!("cannot create file at root"),
|
||||||
|
ErrorKind::Pack,
|
||||||
|
));
|
||||||
|
};
|
||||||
|
let mut dir = self;
|
||||||
|
for segment in parent.into_iter().flatten() {
|
||||||
|
let segment = segment
|
||||||
|
.to_str()
|
||||||
|
.ok_or_else(|| Error::new(eyre!("non-utf8 path segment"), ErrorKind::Utf8))?;
|
||||||
|
if segment == "/" {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if !dir.contains_key(segment) {
|
||||||
|
dir.insert(
|
||||||
|
segment.into(),
|
||||||
|
Entry::new(EntryContents::Directory(DirectoryContents::new())),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if let Some(EntryContents::Directory(d)) =
|
||||||
|
dir.get_mut(segment).map(|e| e.as_contents_mut())
|
||||||
|
{
|
||||||
|
dir = d;
|
||||||
|
} else {
|
||||||
|
return Err(Error::new(eyre!("failed to insert entry at path {path:?}: ancestor exists and is not a directory"), ErrorKind::Pack));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
dir.insert(file.into(), entry);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub const fn header_size() -> u64 {
|
||||||
|
8 // position: u64 BE
|
||||||
|
+ 8 // size: u64 BE
|
||||||
|
}
|
||||||
|
|
||||||
|
#[instrument(skip_all)]
|
||||||
|
pub async fn serialize_header<W: Sink>(&self, position: u64, w: &mut W) -> Result<u64, Error> {
|
||||||
|
use tokio::io::AsyncWriteExt;
|
||||||
|
|
||||||
|
let size = self.toc_size();
|
||||||
|
|
||||||
|
w.write_all(&position.to_be_bytes()).await?;
|
||||||
|
w.write_all(&size.to_be_bytes()).await?;
|
||||||
|
|
||||||
|
Ok(position)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn toc_size(&self) -> u64 {
|
||||||
|
self.0.iter().fold(
|
||||||
|
varint::serialized_varint_size(self.0.len() as u64),
|
||||||
|
|acc, (name, entry)| {
|
||||||
|
acc + varint::serialized_varstring_size(&**name) + entry.header_size()
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<S: ArchiveSource> DirectoryContents<Section<S>> {
|
||||||
|
#[instrument(skip_all)]
|
||||||
|
pub fn deserialize<'a>(
|
||||||
|
source: &'a S,
|
||||||
|
header: &'a mut (impl AsyncRead + Unpin + Send),
|
||||||
|
sighash: Hash,
|
||||||
|
) -> BoxFuture<'a, Result<Self, Error>> {
|
||||||
|
async move {
|
||||||
|
use tokio::io::AsyncReadExt;
|
||||||
|
|
||||||
|
let mut position = [0u8; 8];
|
||||||
|
header.read_exact(&mut position).await?;
|
||||||
|
let position = u64::from_be_bytes(position);
|
||||||
|
|
||||||
|
let mut size = [0u8; 8];
|
||||||
|
header.read_exact(&mut size).await?;
|
||||||
|
let size = u64::from_be_bytes(size);
|
||||||
|
|
||||||
|
let mut toc_reader = source.fetch(position, size).await?;
|
||||||
|
|
||||||
|
let len = varint::deserialize_varint(&mut toc_reader).await?;
|
||||||
|
let mut entries = BTreeMap::new();
|
||||||
|
for _ in 0..len {
|
||||||
|
entries.insert(
|
||||||
|
varint::deserialize_varstring(&mut toc_reader).await?.into(),
|
||||||
|
Entry::deserialize(source, &mut toc_reader).await?,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
let res = Self(entries);
|
||||||
|
|
||||||
|
if res.sighash().await? == sighash {
|
||||||
|
Ok(res)
|
||||||
|
} else {
|
||||||
|
Err(Error::new(
|
||||||
|
eyre!("hash sum does not match"),
|
||||||
|
ErrorKind::InvalidSignature,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
.boxed()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<S: FileSource> DirectoryContents<S> {
|
||||||
|
#[instrument(skip_all)]
|
||||||
|
pub fn update_hashes<'a>(&'a mut self, only_missing: bool) -> BoxFuture<'a, Result<(), Error>> {
|
||||||
|
async move {
|
||||||
|
for (_, entry) in &mut self.0 {
|
||||||
|
entry.update_hash(only_missing).await?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
.boxed()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[instrument(skip_all)]
|
||||||
|
pub fn sighash<'a>(&'a self) -> BoxFuture<'a, Result<Hash, Error>> {
|
||||||
|
async move {
|
||||||
|
let mut hasher = TrackingWriter::new(0, HashWriter::new());
|
||||||
|
let mut sig_contents = BTreeMap::new();
|
||||||
|
for (name, entry) in &self.0 {
|
||||||
|
sig_contents.insert(name.clone(), entry.to_missing().await?);
|
||||||
|
}
|
||||||
|
Self(sig_contents)
|
||||||
|
.serialize_toc(&mut WriteQueue::new(0), &mut hasher)
|
||||||
|
.await?;
|
||||||
|
Ok(hasher.into_inner().finalize())
|
||||||
|
}
|
||||||
|
.boxed()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[instrument(skip_all)]
|
||||||
|
pub async fn serialize_toc<'a, W: Sink>(
|
||||||
|
&'a self,
|
||||||
|
queue: &mut WriteQueue<'a, S>,
|
||||||
|
w: &mut W,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
varint::serialize_varint(self.0.len() as u64, w).await?;
|
||||||
|
for (name, entry) in self.0.iter() {
|
||||||
|
varint::serialize_varstring(&**name, w).await?;
|
||||||
|
entry.serialize_header(queue.add(entry).await?, w).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<S> std::ops::Deref for DirectoryContents<S> {
|
||||||
|
type Target = BTreeMap<InternedString, Entry<S>>;
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<S> std::ops::DerefMut for DirectoryContents<S> {
|
||||||
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||||
|
&mut self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
82
backend/src/s9pk/merkle_archive/file_contents.rs
Normal file
82
backend/src/s9pk/merkle_archive/file_contents.rs
Normal file
@@ -0,0 +1,82 @@
|
|||||||
|
use tokio::io::AsyncRead;
|
||||||
|
|
||||||
|
use crate::prelude::*;
|
||||||
|
use crate::s9pk::merkle_archive::hash::{Hash, HashWriter};
|
||||||
|
use crate::s9pk::merkle_archive::sink::{Sink, TrackingWriter};
|
||||||
|
use crate::s9pk::merkle_archive::source::{ArchiveSource, FileSource, Section};
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct FileContents<S>(S);
|
||||||
|
impl<S> FileContents<S> {
|
||||||
|
pub fn new(source: S) -> Self {
|
||||||
|
Self(source)
|
||||||
|
}
|
||||||
|
pub const fn header_size() -> u64 {
|
||||||
|
8 // position: u64 BE
|
||||||
|
+ 8 // size: u64 BE
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<S: ArchiveSource> FileContents<Section<S>> {
|
||||||
|
#[instrument(skip_all)]
|
||||||
|
pub async fn deserialize(
|
||||||
|
source: &S,
|
||||||
|
header: &mut (impl AsyncRead + Unpin + Send),
|
||||||
|
) -> Result<Self, Error> {
|
||||||
|
use tokio::io::AsyncReadExt;
|
||||||
|
|
||||||
|
let mut position = [0u8; 8];
|
||||||
|
header.read_exact(&mut position).await?;
|
||||||
|
let position = u64::from_be_bytes(position);
|
||||||
|
|
||||||
|
let mut size = [0u8; 8];
|
||||||
|
header.read_exact(&mut size).await?;
|
||||||
|
let size = u64::from_be_bytes(size);
|
||||||
|
|
||||||
|
Ok(Self(source.section(position, size)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<S: FileSource> FileContents<S> {
|
||||||
|
pub async fn hash(&self) -> Result<Hash, Error> {
|
||||||
|
let mut hasher = TrackingWriter::new(0, HashWriter::new());
|
||||||
|
self.serialize_body(&mut hasher, None).await?;
|
||||||
|
Ok(hasher.into_inner().finalize())
|
||||||
|
}
|
||||||
|
#[instrument(skip_all)]
|
||||||
|
pub async fn serialize_header<W: Sink>(&self, position: u64, w: &mut W) -> Result<u64, Error> {
|
||||||
|
use tokio::io::AsyncWriteExt;
|
||||||
|
|
||||||
|
let size = self.0.size().await?;
|
||||||
|
|
||||||
|
w.write_all(&position.to_be_bytes()).await?;
|
||||||
|
w.write_all(&size.to_be_bytes()).await?;
|
||||||
|
|
||||||
|
Ok(position)
|
||||||
|
}
|
||||||
|
#[instrument(skip_all)]
|
||||||
|
pub async fn serialize_body<W: Sink>(
|
||||||
|
&self,
|
||||||
|
w: &mut W,
|
||||||
|
verify: Option<Hash>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let start = if verify.is_some() {
|
||||||
|
Some(w.current_position().await?)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
self.0.copy_verify(w, verify).await?;
|
||||||
|
if let Some(start) = start {
|
||||||
|
ensure_code!(
|
||||||
|
w.current_position().await? - start == self.0.size().await?,
|
||||||
|
ErrorKind::Pack,
|
||||||
|
"FileSource::copy wrote a number of bytes that does not match FileSource::size"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<S> std::ops::Deref for FileContents<S> {
|
||||||
|
type Target = S;
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
97
backend/src/s9pk/merkle_archive/hash.rs
Normal file
97
backend/src/s9pk/merkle_archive/hash.rs
Normal file
@@ -0,0 +1,97 @@
|
|||||||
|
pub use blake3::Hash;
|
||||||
|
use blake3::Hasher;
|
||||||
|
use tokio::io::AsyncWrite;
|
||||||
|
|
||||||
|
use crate::prelude::*;
|
||||||
|
|
||||||
|
#[pin_project::pin_project]
|
||||||
|
pub struct HashWriter {
|
||||||
|
hasher: Hasher,
|
||||||
|
}
|
||||||
|
impl HashWriter {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
hasher: Hasher::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn finalize(self) -> Hash {
|
||||||
|
self.hasher.finalize()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl AsyncWrite for HashWriter {
|
||||||
|
fn poll_write(
|
||||||
|
self: std::pin::Pin<&mut Self>,
|
||||||
|
_cx: &mut std::task::Context<'_>,
|
||||||
|
buf: &[u8],
|
||||||
|
) -> std::task::Poll<Result<usize, std::io::Error>> {
|
||||||
|
self.project().hasher.update(buf);
|
||||||
|
std::task::Poll::Ready(Ok(buf.len()))
|
||||||
|
}
|
||||||
|
fn poll_flush(
|
||||||
|
self: std::pin::Pin<&mut Self>,
|
||||||
|
_cx: &mut std::task::Context<'_>,
|
||||||
|
) -> std::task::Poll<Result<(), std::io::Error>> {
|
||||||
|
std::task::Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
fn poll_shutdown(
|
||||||
|
self: std::pin::Pin<&mut Self>,
|
||||||
|
_cx: &mut std::task::Context<'_>,
|
||||||
|
) -> std::task::Poll<Result<(), std::io::Error>> {
|
||||||
|
std::task::Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[pin_project::pin_project]
|
||||||
|
pub struct VerifyingWriter<W> {
|
||||||
|
verify: Option<(Hasher, Hash)>,
|
||||||
|
#[pin]
|
||||||
|
writer: W,
|
||||||
|
}
|
||||||
|
impl<W: AsyncWrite> VerifyingWriter<W> {
|
||||||
|
pub fn new(w: W, verify: Option<Hash>) -> Self {
|
||||||
|
Self {
|
||||||
|
verify: verify.map(|v| (Hasher::new(), v)),
|
||||||
|
writer: w,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn verify(self) -> Result<W, Error> {
|
||||||
|
if let Some((actual, expected)) = self.verify {
|
||||||
|
ensure_code!(
|
||||||
|
actual.finalize() == expected,
|
||||||
|
ErrorKind::InvalidSignature,
|
||||||
|
"hash sum does not match"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Ok(self.writer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<W: AsyncWrite> AsyncWrite for VerifyingWriter<W> {
|
||||||
|
fn poll_write(
|
||||||
|
self: std::pin::Pin<&mut Self>,
|
||||||
|
cx: &mut std::task::Context<'_>,
|
||||||
|
buf: &[u8],
|
||||||
|
) -> std::task::Poll<Result<usize, std::io::Error>> {
|
||||||
|
let this = self.project();
|
||||||
|
match this.writer.poll_write(cx, buf) {
|
||||||
|
std::task::Poll::Ready(Ok(written)) => {
|
||||||
|
if let Some((h, _)) = this.verify {
|
||||||
|
h.update(&buf[..written]);
|
||||||
|
}
|
||||||
|
std::task::Poll::Ready(Ok(written))
|
||||||
|
}
|
||||||
|
a => a,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fn poll_flush(
|
||||||
|
self: std::pin::Pin<&mut Self>,
|
||||||
|
cx: &mut std::task::Context<'_>,
|
||||||
|
) -> std::task::Poll<Result<(), std::io::Error>> {
|
||||||
|
self.project().writer.poll_flush(cx)
|
||||||
|
}
|
||||||
|
fn poll_shutdown(
|
||||||
|
self: std::pin::Pin<&mut Self>,
|
||||||
|
cx: &mut std::task::Context<'_>,
|
||||||
|
) -> std::task::Poll<Result<(), std::io::Error>> {
|
||||||
|
self.project().writer.poll_shutdown(cx)
|
||||||
|
}
|
||||||
|
}
|
||||||
268
backend/src/s9pk/merkle_archive/mod.rs
Normal file
268
backend/src/s9pk/merkle_archive/mod.rs
Normal file
@@ -0,0 +1,268 @@
|
|||||||
|
use ed25519_dalek::{Signature, SigningKey, VerifyingKey};
|
||||||
|
use tokio::io::AsyncRead;
|
||||||
|
|
||||||
|
use crate::prelude::*;
|
||||||
|
use crate::s9pk::merkle_archive::directory_contents::DirectoryContents;
|
||||||
|
use crate::s9pk::merkle_archive::file_contents::FileContents;
|
||||||
|
use crate::s9pk::merkle_archive::hash::Hash;
|
||||||
|
use crate::s9pk::merkle_archive::sink::Sink;
|
||||||
|
use crate::s9pk::merkle_archive::source::{ArchiveSource, FileSource, Section};
|
||||||
|
use crate::s9pk::merkle_archive::write_queue::WriteQueue;
|
||||||
|
|
||||||
|
pub mod directory_contents;
|
||||||
|
pub mod file_contents;
|
||||||
|
pub mod hash;
|
||||||
|
pub mod sink;
|
||||||
|
pub mod source;
|
||||||
|
#[cfg(test)]
|
||||||
|
mod test;
|
||||||
|
pub mod varint;
|
||||||
|
pub mod write_queue;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
enum Signer {
|
||||||
|
Signed(VerifyingKey, Signature),
|
||||||
|
Signer(SigningKey),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct MerkleArchive<S> {
|
||||||
|
signer: Signer,
|
||||||
|
contents: DirectoryContents<S>,
|
||||||
|
}
|
||||||
|
impl<S> MerkleArchive<S> {
|
||||||
|
pub fn new(contents: DirectoryContents<S>, signer: SigningKey) -> Self {
|
||||||
|
Self {
|
||||||
|
signer: Signer::Signer(signer),
|
||||||
|
contents,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub const fn header_size() -> u64 {
|
||||||
|
32 // pubkey
|
||||||
|
+ 64 // signature
|
||||||
|
+ DirectoryContents::<Section<S>>::header_size()
|
||||||
|
}
|
||||||
|
pub fn contents(&self) -> &DirectoryContents<S> {
|
||||||
|
&self.contents
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<S: ArchiveSource> MerkleArchive<Section<S>> {
|
||||||
|
#[instrument(skip_all)]
|
||||||
|
pub async fn deserialize(
|
||||||
|
source: &S,
|
||||||
|
header: &mut (impl AsyncRead + Unpin + Send),
|
||||||
|
) -> Result<Self, Error> {
|
||||||
|
use tokio::io::AsyncReadExt;
|
||||||
|
|
||||||
|
let mut pubkey = [0u8; 32];
|
||||||
|
header.read_exact(&mut pubkey).await?;
|
||||||
|
let pubkey = VerifyingKey::from_bytes(&pubkey)?;
|
||||||
|
|
||||||
|
let mut signature = [0u8; 64];
|
||||||
|
header.read_exact(&mut signature).await?;
|
||||||
|
let signature = Signature::from_bytes(&signature);
|
||||||
|
|
||||||
|
let mut sighash = [0u8; 32];
|
||||||
|
header.read_exact(&mut sighash).await?;
|
||||||
|
let sighash = Hash::from_bytes(sighash);
|
||||||
|
|
||||||
|
let contents = DirectoryContents::deserialize(source, header, sighash).await?;
|
||||||
|
|
||||||
|
pubkey.verify_strict(contents.sighash().await?.as_bytes(), &signature)?;
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
signer: Signer::Signed(pubkey, signature),
|
||||||
|
contents,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<S: FileSource> MerkleArchive<S> {
|
||||||
|
pub async fn update_hashes(&mut self, only_missing: bool) -> Result<(), Error> {
|
||||||
|
self.contents.update_hashes(only_missing).await
|
||||||
|
}
|
||||||
|
#[instrument(skip_all)]
|
||||||
|
pub async fn serialize<W: Sink>(&self, w: &mut W, verify: bool) -> Result<(), Error> {
|
||||||
|
use tokio::io::AsyncWriteExt;
|
||||||
|
|
||||||
|
let sighash = self.contents.sighash().await?;
|
||||||
|
|
||||||
|
let (pubkey, signature) = match &self.signer {
|
||||||
|
Signer::Signed(pubkey, signature) => (*pubkey, *signature),
|
||||||
|
Signer::Signer(s) => (s.into(), ed25519_dalek::Signer::sign(s, sighash.as_bytes())),
|
||||||
|
};
|
||||||
|
|
||||||
|
w.write_all(pubkey.as_bytes()).await?;
|
||||||
|
w.write_all(&signature.to_bytes()).await?;
|
||||||
|
w.write_all(sighash.as_bytes()).await?;
|
||||||
|
let mut next_pos = w.current_position().await?;
|
||||||
|
next_pos += DirectoryContents::<S>::header_size();
|
||||||
|
self.contents.serialize_header(next_pos, w).await?;
|
||||||
|
next_pos += self.contents.toc_size();
|
||||||
|
let mut queue = WriteQueue::new(next_pos);
|
||||||
|
self.contents.serialize_toc(&mut queue, w).await?;
|
||||||
|
queue.serialize(w, verify).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Entry<S> {
|
||||||
|
hash: Option<Hash>,
|
||||||
|
contents: EntryContents<S>,
|
||||||
|
}
|
||||||
|
impl<S> Entry<S> {
|
||||||
|
pub fn new(contents: EntryContents<S>) -> Self {
|
||||||
|
Self {
|
||||||
|
hash: None,
|
||||||
|
contents,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn hash(&self) -> Option<Hash> {
|
||||||
|
self.hash
|
||||||
|
}
|
||||||
|
pub fn as_contents(&self) -> &EntryContents<S> {
|
||||||
|
&self.contents
|
||||||
|
}
|
||||||
|
pub fn as_contents_mut(&mut self) -> &mut EntryContents<S> {
|
||||||
|
self.hash = None;
|
||||||
|
&mut self.contents
|
||||||
|
}
|
||||||
|
pub fn into_contents(self) -> EntryContents<S> {
|
||||||
|
self.contents
|
||||||
|
}
|
||||||
|
pub fn header_size(&self) -> u64 {
|
||||||
|
32 // hash
|
||||||
|
+ self.contents.header_size()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<S: ArchiveSource> Entry<Section<S>> {
|
||||||
|
#[instrument(skip_all)]
|
||||||
|
pub async fn deserialize(
|
||||||
|
source: &S,
|
||||||
|
header: &mut (impl AsyncRead + Unpin + Send),
|
||||||
|
) -> Result<Self, Error> {
|
||||||
|
use tokio::io::AsyncReadExt;
|
||||||
|
|
||||||
|
let mut hash = [0u8; 32];
|
||||||
|
header.read_exact(&mut hash).await?;
|
||||||
|
let hash = Hash::from_bytes(hash);
|
||||||
|
|
||||||
|
let contents = EntryContents::deserialize(source, header, hash).await?;
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
hash: Some(hash),
|
||||||
|
contents,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<S: FileSource> Entry<S> {
|
||||||
|
pub async fn to_missing(&self) -> Result<Self, Error> {
|
||||||
|
let hash = if let Some(hash) = self.hash {
|
||||||
|
hash
|
||||||
|
} else {
|
||||||
|
self.contents.hash().await?
|
||||||
|
};
|
||||||
|
Ok(Self {
|
||||||
|
hash: Some(hash),
|
||||||
|
contents: EntryContents::Missing,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
pub async fn update_hash(&mut self, only_missing: bool) -> Result<(), Error> {
|
||||||
|
if let EntryContents::Directory(d) = &mut self.contents {
|
||||||
|
d.update_hashes(only_missing).await?;
|
||||||
|
}
|
||||||
|
self.hash = Some(self.contents.hash().await?);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
#[instrument(skip_all)]
|
||||||
|
pub async fn serialize_header<W: Sink>(
|
||||||
|
&self,
|
||||||
|
position: u64,
|
||||||
|
w: &mut W,
|
||||||
|
) -> Result<Option<u64>, Error> {
|
||||||
|
use tokio::io::AsyncWriteExt;
|
||||||
|
|
||||||
|
let hash = if let Some(hash) = self.hash {
|
||||||
|
hash
|
||||||
|
} else {
|
||||||
|
self.contents.hash().await?
|
||||||
|
};
|
||||||
|
w.write_all(hash.as_bytes()).await?;
|
||||||
|
self.contents.serialize_header(position, w).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum EntryContents<S> {
|
||||||
|
Missing,
|
||||||
|
File(FileContents<S>),
|
||||||
|
Directory(DirectoryContents<S>),
|
||||||
|
}
|
||||||
|
impl<S> EntryContents<S> {
|
||||||
|
fn type_id(&self) -> u8 {
|
||||||
|
match self {
|
||||||
|
Self::Missing => 0,
|
||||||
|
Self::File(_) => 1,
|
||||||
|
Self::Directory(_) => 2,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn header_size(&self) -> u64 {
|
||||||
|
1 // type
|
||||||
|
+ match self {
|
||||||
|
Self::Missing => 0,
|
||||||
|
Self::File(_) => FileContents::<S>::header_size(),
|
||||||
|
Self::Directory(_) => DirectoryContents::<S>::header_size(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<S: ArchiveSource> EntryContents<Section<S>> {
|
||||||
|
#[instrument(skip_all)]
|
||||||
|
pub async fn deserialize(
|
||||||
|
source: &S,
|
||||||
|
header: &mut (impl AsyncRead + Unpin + Send),
|
||||||
|
hash: Hash,
|
||||||
|
) -> Result<Self, Error> {
|
||||||
|
use tokio::io::AsyncReadExt;
|
||||||
|
|
||||||
|
let mut type_id = [0u8];
|
||||||
|
header.read_exact(&mut type_id).await?;
|
||||||
|
match type_id[0] {
|
||||||
|
0 => Ok(Self::Missing),
|
||||||
|
1 => Ok(Self::File(FileContents::deserialize(source, header).await?)),
|
||||||
|
2 => Ok(Self::Directory(
|
||||||
|
DirectoryContents::deserialize(source, header, hash).await?,
|
||||||
|
)),
|
||||||
|
id => Err(Error::new(
|
||||||
|
eyre!("Unknown type id {id} found in MerkleArchive"),
|
||||||
|
ErrorKind::ParseS9pk,
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<S: FileSource> EntryContents<S> {
|
||||||
|
pub async fn hash(&self) -> Result<Hash, Error> {
|
||||||
|
match self {
|
||||||
|
Self::Missing => Err(Error::new(
|
||||||
|
eyre!("Cannot compute hash of missing file"),
|
||||||
|
ErrorKind::Pack,
|
||||||
|
)),
|
||||||
|
Self::File(f) => f.hash().await,
|
||||||
|
Self::Directory(d) => d.sighash().await,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#[instrument(skip_all)]
|
||||||
|
pub async fn serialize_header<W: Sink>(
|
||||||
|
&self,
|
||||||
|
position: u64,
|
||||||
|
w: &mut W,
|
||||||
|
) -> Result<Option<u64>, Error> {
|
||||||
|
use tokio::io::AsyncWriteExt;
|
||||||
|
|
||||||
|
w.write_all(&[self.type_id()]).await?;
|
||||||
|
Ok(match self {
|
||||||
|
Self::Missing => None,
|
||||||
|
Self::File(f) => Some(f.serialize_header(position, w).await?),
|
||||||
|
Self::Directory(d) => Some(d.serialize_header(position, w).await?),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
70
backend/src/s9pk/merkle_archive/sink.rs
Normal file
70
backend/src/s9pk/merkle_archive/sink.rs
Normal file
@@ -0,0 +1,70 @@
|
|||||||
|
use tokio::io::{AsyncSeek, AsyncWrite};
|
||||||
|
|
||||||
|
use crate::prelude::*;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
pub trait Sink: AsyncWrite + Unpin + Send {
|
||||||
|
async fn current_position(&mut self) -> Result<u64, Error>;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl<S: AsyncWrite + AsyncSeek + Unpin + Send> Sink for S {
|
||||||
|
async fn current_position(&mut self) -> Result<u64, Error> {
|
||||||
|
use tokio::io::AsyncSeekExt;
|
||||||
|
|
||||||
|
Ok(self.stream_position().await?)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl<W: AsyncWrite + Unpin + Send> Sink for TrackingWriter<W> {
|
||||||
|
async fn current_position(&mut self) -> Result<u64, Error> {
|
||||||
|
Ok(self.position)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[pin_project::pin_project]
|
||||||
|
pub struct TrackingWriter<W> {
|
||||||
|
position: u64,
|
||||||
|
#[pin]
|
||||||
|
writer: W,
|
||||||
|
}
|
||||||
|
impl<W> TrackingWriter<W> {
|
||||||
|
pub fn new(start: u64, w: W) -> Self {
|
||||||
|
Self {
|
||||||
|
position: start,
|
||||||
|
writer: w,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn into_inner(self) -> W {
|
||||||
|
self.writer
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<W: AsyncWrite + Unpin + Send> AsyncWrite for TrackingWriter<W> {
    /// Forwards the write to the inner writer, adding however many bytes it
    /// accepted to the running position.
    fn poll_write(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> std::task::Poll<Result<usize, std::io::Error>> {
        let this = self.project();
        let res = this.writer.poll_write(cx, buf);
        if let std::task::Poll::Ready(Ok(written)) = &res {
            *this.position += *written as u64;
        }
        res
    }

    /// Flush passes straight through; it moves no bytes.
    fn poll_flush(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), std::io::Error>> {
        self.project().writer.poll_flush(cx)
    }

    /// Shutdown passes straight through; it moves no bytes.
    fn poll_shutdown(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), std::io::Error>> {
        self.project().writer.poll_shutdown(cx)
    }
}
|
||||||
91
backend/src/s9pk/merkle_archive/source/http.rs
Normal file
91
backend/src/s9pk/merkle_archive/source/http.rs
Normal file
@@ -0,0 +1,91 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use bytes::Bytes;
|
||||||
|
use futures::stream::BoxStream;
|
||||||
|
use futures::{StreamExt, TryStreamExt};
|
||||||
|
use http::header::{ACCEPT_RANGES, RANGE};
|
||||||
|
use reqwest::{Client, Url};
|
||||||
|
use tokio::io::AsyncRead;
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
use tokio_util::io::StreamReader;
|
||||||
|
|
||||||
|
use crate::prelude::*;
|
||||||
|
use crate::s9pk::merkle_archive::source::ArchiveSource;
|
||||||
|
|
||||||
|
/// An [`ArchiveSource`] backed by an HTTP(S) URL.
#[derive(Clone)]
pub struct HttpSource {
    url: Url,
    client: Client,
    // Ok(()) when the server advertised `Accept-Ranges: bytes`; the Err side
    // is a placeholder for a planned fallback reader for rangeless servers
    // (see the commented-out type below) — not yet implemented.
    range_support: Result<
        (),
        (), // Arc<Mutex<Option<RangelessReader>>>
    >,
}
|
||||||
|
impl HttpSource {
    /// Probes `url` with a HEAD request to learn whether the server supports
    /// HTTP byte-range requests (`Accept-Ranges: bytes`).
    ///
    /// NOTE(review): a server WITHOUT range support currently hits the
    /// `todo!()` below and panics — the rangeless fallback is unimplemented.
    pub async fn new(client: Client, url: Url) -> Result<Self, Error> {
        let range_support = client
            .head(url.clone())
            .send()
            .await
            .with_kind(ErrorKind::Network)?
            .error_for_status()
            .with_kind(ErrorKind::Network)?
            .headers()
            .get(ACCEPT_RANGES)
            .and_then(|s| s.to_str().ok())
            == Some("bytes");
        Ok(Self {
            url,
            client,
            range_support: if range_support {
                Ok(())
            } else {
                todo!() // Err(Arc::new(Mutex::new(None)))
            },
        })
    }
}
|
||||||
|
#[async_trait::async_trait]
impl ArchiveSource for HttpSource {
    type Reader = HttpReader;
    /// Fetches `size` bytes starting at `position` with an HTTP `Range`
    /// request. A zero-length section short-circuits to an empty stream
    /// without issuing a request.
    async fn fetch(&self, position: u64, size: u64) -> Result<Self::Reader, Error> {
        match self.range_support {
            Ok(_) => Ok(HttpReader::Range(StreamReader::new(if size > 0 {
                self.client
                    .get(self.url.clone())
                    // HTTP byte ranges are inclusive on both ends, hence -1.
                    .header(RANGE, format!("bytes={}-{}", position, position + size - 1))
                    .send()
                    .await
                    .with_kind(ErrorKind::Network)?
                    .error_for_status()
                    .with_kind(ErrorKind::Network)?
                    .bytes_stream()
                    .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
                    .boxed()
            } else {
                futures::stream::empty().boxed()
            }))),
            // Rangeless fallback not yet implemented (see HttpSource::new).
            _ => todo!(),
        }
    }
}
|
||||||
|
|
||||||
|
/// Reader over the body of one HTTP response section.
#[pin_project::pin_project(project = HttpReaderProj)]
pub enum HttpReader {
    /// Byte stream of a single `Range` response.
    Range(#[pin] StreamReader<BoxStream<'static, Result<Bytes, std::io::Error>>, Bytes>),
    // Rangeless(#[pin] RangelessReader),
}
|
||||||
|
impl AsyncRead for HttpReader {
    // Delegates to whichever variant backs this reader.
    fn poll_read(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        match self.project() {
            HttpReaderProj::Range(r) => r.poll_read(cx, buf),
            // HttpReaderProj::Rangeless(r) => r.poll_read(cx, buf),
        }
    }
}
|
||||||
|
|
||||||
|
// type RangelessReader = StreamReader<BoxStream<'static, Bytes>, Bytes>;
|
||||||
120
backend/src/s9pk/merkle_archive/source/mod.rs
Normal file
120
backend/src/s9pk/merkle_archive/source/mod.rs
Normal file
@@ -0,0 +1,120 @@
|
|||||||
|
use std::path::PathBuf;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use blake3::Hash;
|
||||||
|
use tokio::fs::File;
|
||||||
|
use tokio::io::{AsyncRead, AsyncWrite};
|
||||||
|
|
||||||
|
use crate::prelude::*;
|
||||||
|
use crate::s9pk::merkle_archive::hash::VerifyingWriter;
|
||||||
|
|
||||||
|
pub mod http;
|
||||||
|
pub mod multi_cursor_file;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
pub trait FileSource: Send + Sync + Sized + 'static {
|
||||||
|
type Reader: AsyncRead + Unpin + Send;
|
||||||
|
async fn size(&self) -> Result<u64, Error>;
|
||||||
|
async fn reader(&self) -> Result<Self::Reader, Error>;
|
||||||
|
async fn copy<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> Result<(), Error> {
|
||||||
|
tokio::io::copy(&mut self.reader().await?, w).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
async fn copy_verify<W: AsyncWrite + Unpin + Send>(
|
||||||
|
&self,
|
||||||
|
w: &mut W,
|
||||||
|
verify: Option<Hash>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let mut w = VerifyingWriter::new(w, verify);
|
||||||
|
tokio::io::copy(&mut self.reader().await?, &mut w).await?;
|
||||||
|
w.verify()?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
async fn to_vec(&self, verify: Option<Hash>) -> Result<Vec<u8>, Error> {
|
||||||
|
let mut vec = Vec::with_capacity(self.size().await? as usize);
|
||||||
|
self.copy_verify(&mut vec, verify).await?;
|
||||||
|
Ok(vec)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl FileSource for PathBuf {
|
||||||
|
type Reader = File;
|
||||||
|
async fn size(&self) -> Result<u64, Error> {
|
||||||
|
Ok(tokio::fs::metadata(self).await?.len())
|
||||||
|
}
|
||||||
|
async fn reader(&self) -> Result<Self::Reader, Error> {
|
||||||
|
Ok(File::open(self).await?)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl FileSource for Arc<[u8]> {
|
||||||
|
type Reader = std::io::Cursor<Self>;
|
||||||
|
async fn size(&self) -> Result<u64, Error> {
|
||||||
|
Ok(self.len() as u64)
|
||||||
|
}
|
||||||
|
async fn reader(&self) -> Result<Self::Reader, Error> {
|
||||||
|
Ok(std::io::Cursor::new(self.clone()))
|
||||||
|
}
|
||||||
|
async fn copy<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> Result<(), Error> {
|
||||||
|
use tokio::io::AsyncWriteExt;
|
||||||
|
|
||||||
|
w.write_all(&*self).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
pub trait ArchiveSource: Clone + Send + Sync + Sized + 'static {
|
||||||
|
type Reader: AsyncRead + Unpin + Send;
|
||||||
|
async fn fetch(&self, position: u64, size: u64) -> Result<Self::Reader, Error>;
|
||||||
|
async fn copy_to<W: AsyncWrite + Unpin + Send>(
|
||||||
|
&self,
|
||||||
|
position: u64,
|
||||||
|
size: u64,
|
||||||
|
w: &mut W,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
tokio::io::copy(&mut self.fetch(position, size).await?, w).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
fn section(&self, position: u64, size: u64) -> Section<Self> {
|
||||||
|
Section {
|
||||||
|
source: self.clone(),
|
||||||
|
position,
|
||||||
|
size,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl ArchiveSource for Arc<[u8]> {
|
||||||
|
type Reader = tokio::io::Take<std::io::Cursor<Self>>;
|
||||||
|
async fn fetch(&self, position: u64, size: u64) -> Result<Self::Reader, Error> {
|
||||||
|
use tokio::io::AsyncReadExt;
|
||||||
|
|
||||||
|
let mut cur = std::io::Cursor::new(self.clone());
|
||||||
|
cur.set_position(position);
|
||||||
|
Ok(cur.take(size))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A contiguous byte range of an [`ArchiveSource`], usable as a [`FileSource`].
#[derive(Debug)]
pub struct Section<S> {
    source: S,
    // Absolute byte offset of this section within the source.
    position: u64,
    // Length of this section in bytes.
    size: u64,
}
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl<S: ArchiveSource> FileSource for Section<S> {
|
||||||
|
type Reader = S::Reader;
|
||||||
|
async fn size(&self) -> Result<u64, Error> {
|
||||||
|
Ok(self.size)
|
||||||
|
}
|
||||||
|
async fn reader(&self) -> Result<Self::Reader, Error> {
|
||||||
|
self.source.fetch(self.position, self.size).await
|
||||||
|
}
|
||||||
|
async fn copy<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> Result<(), Error> {
|
||||||
|
self.source.copy_to(self.position, self.size, w).await
|
||||||
|
}
|
||||||
|
}
|
||||||
84
backend/src/s9pk/merkle_archive/source/multi_cursor_file.rs
Normal file
84
backend/src/s9pk/merkle_archive/source/multi_cursor_file.rs
Normal file
@@ -0,0 +1,84 @@
|
|||||||
|
use std::io::SeekFrom;
|
||||||
|
use std::os::fd::{AsRawFd, RawFd};
|
||||||
|
use std::path::{Path, PathBuf};
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use tokio::fs::File;
|
||||||
|
use tokio::io::AsyncRead;
|
||||||
|
use tokio::sync::{Mutex, OwnedMutexGuard};
|
||||||
|
|
||||||
|
use crate::disk::mount::filesystem::loop_dev::LoopDev;
|
||||||
|
use crate::prelude::*;
|
||||||
|
use crate::s9pk::merkle_archive::source::{ArchiveSource, Section};
|
||||||
|
|
||||||
|
/// A file that can serve multiple independent readers at once: the original
/// handle is reused when free, and extra handles are opened on demand via the
/// raw fd (see [`MultiCursorFile::path`]).
#[derive(Clone)]
pub struct MultiCursorFile {
    // Raw fd of the original handle; stays valid because the File itself is
    // kept alive inside `file`.
    fd: RawFd,
    file: Arc<Mutex<File>>,
}
|
||||||
|
impl MultiCursorFile {
    // Path to this file's fd via procfs, so additional handles can be opened
    // without knowing the original path. NOTE(review): Linux-specific.
    fn path(&self) -> PathBuf {
        Path::new("/proc/self/fd").join(self.fd.to_string())
    }
}
|
||||||
|
impl From<File> for MultiCursorFile {
    fn from(value: File) -> Self {
        Self {
            // Capture the fd before moving the File; it remains valid for the
            // lifetime of the Arc<Mutex<File>> created below.
            fd: value.as_raw_fd(),
            file: Arc::new(Mutex::new(value)),
        }
    }
}
|
||||||
|
|
||||||
|
/// Reader over a fixed-length window of a locked file, starting wherever the
/// file was seeked to when the guard was taken.
#[pin_project::pin_project]
pub struct FileSectionReader {
    #[pin]
    file: OwnedMutexGuard<File>,
    // Bytes left to serve before this reader reports EOF.
    remaining: u64,
}
|
||||||
|
impl AsyncRead for FileSectionReader {
|
||||||
|
fn poll_read(
|
||||||
|
self: std::pin::Pin<&mut Self>,
|
||||||
|
cx: &mut std::task::Context<'_>,
|
||||||
|
buf: &mut tokio::io::ReadBuf<'_>,
|
||||||
|
) -> std::task::Poll<std::io::Result<()>> {
|
||||||
|
let this = self.project();
|
||||||
|
if *this.remaining == 0 {
|
||||||
|
return std::task::Poll::Ready(Ok(()));
|
||||||
|
}
|
||||||
|
let before = buf.filled().len() as u64;
|
||||||
|
let res = std::pin::Pin::new(&mut **this.file.get_mut())
|
||||||
|
.poll_read(cx, &mut buf.take(*this.remaining as usize));
|
||||||
|
*this.remaining = this
|
||||||
|
.remaining
|
||||||
|
.saturating_sub(buf.filled().len() as u64 - before);
|
||||||
|
res
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl ArchiveSource for MultiCursorFile {
|
||||||
|
type Reader = FileSectionReader;
|
||||||
|
async fn fetch(&self, position: u64, size: u64) -> Result<Self::Reader, Error> {
|
||||||
|
use tokio::io::AsyncSeekExt;
|
||||||
|
|
||||||
|
let mut file = if let Ok(file) = self.file.clone().try_lock_owned() {
|
||||||
|
file
|
||||||
|
} else {
|
||||||
|
Arc::new(Mutex::new(File::open(self.path()).await?))
|
||||||
|
.try_lock_owned()
|
||||||
|
.expect("freshly created")
|
||||||
|
};
|
||||||
|
file.seek(SeekFrom::Start(position)).await?;
|
||||||
|
Ok(Self::Reader {
|
||||||
|
file,
|
||||||
|
remaining: size,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A section of an on-disk file can be mounted directly as a loop device with
/// the matching offset and size limit.
impl From<Section<MultiCursorFile>> for LoopDev<PathBuf> {
    fn from(value: Section<MultiCursorFile>) -> Self {
        LoopDev::new(value.source.path(), value.position, value.size)
    }
}
|
||||||
138
backend/src/s9pk/merkle_archive/test.rs
Normal file
138
backend/src/s9pk/merkle_archive/test.rs
Normal file
@@ -0,0 +1,138 @@
|
|||||||
|
use std::collections::BTreeMap;
|
||||||
|
use std::io::Cursor;
|
||||||
|
use std::path::{Path, PathBuf};
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use ed25519_dalek::SigningKey;
|
||||||
|
|
||||||
|
use crate::prelude::*;
|
||||||
|
use crate::s9pk::merkle_archive::directory_contents::DirectoryContents;
|
||||||
|
use crate::s9pk::merkle_archive::file_contents::FileContents;
|
||||||
|
use crate::s9pk::merkle_archive::sink::TrackingWriter;
|
||||||
|
use crate::s9pk::merkle_archive::source::FileSource;
|
||||||
|
use crate::s9pk::merkle_archive::{Entry, EntryContents, MerkleArchive};
|
||||||
|
|
||||||
|
/// Creates a MerkleArchive (a1) with the provided files at the provided paths. NOTE: later files can overwrite previous files/directories at the same path
/// Tests:
/// - a1.update_hashes(): returns Ok(_)
/// - a1.serialize(verify: true): returns Ok(s1)
/// - MerkleArchive::deserialize(s1): returns Ok(a2)
/// - a2: contains all expected files with expected content
/// - a2.serialize(verify: true): returns Ok(s2)
/// - s1 == s2
#[instrument]
fn test(files: Vec<(PathBuf, String)>) -> Result<(), Error> {
    let mut root = DirectoryContents::<Arc<[u8]>>::new();
    // check_set mirrors what the archive should contain after all inserts,
    // accounting for later entries overwriting earlier ones.
    let mut check_set = BTreeMap::<PathBuf, String>::new();
    for (path, content) in files {
        if let Err(e) = root.insert_path(
            &path,
            Entry::new(EntryContents::File(FileContents::new(
                content.clone().into_bytes().into(),
            ))),
        ) {
            // Insertion failures (e.g. a file in the way of a directory) are
            // tolerated: such entries simply aren't expected in check_set.
            eprintln!("failed to insert file at {path:?}: {e}");
        } else {
            let path = path.strip_prefix("/").unwrap_or(&path);
            // Evict every previously-expected entry at or under `path`: the
            // new insert replaced them. split_off yields all keys >= path;
            // the loop pops descendants of `path` (dropping them) until it
            // hits the first non-descendant, which it puts back.
            let mut remaining = check_set.split_off(path);
            while {
                if let Some((p, s)) = remaining.pop_first() {
                    if !p.starts_with(path) {
                        remaining.insert(p, s);
                        false
                    } else {
                        true
                    }
                } else {
                    false
                }
            } {}
            // Restore the untouched tail, then record the new expectation.
            check_set.append(&mut remaining);
            check_set.insert(path.to_owned(), content);
        }
    }
    let key = SigningKey::generate(&mut rand::thread_rng());
    let mut a1 = MerkleArchive::new(root, key);
    // Single-threaded runtime: this helper is called from sync #[test] fns.
    tokio::runtime::Builder::new_current_thread()
        .enable_io()
        .build()
        .unwrap()
        .block_on(async move {
            a1.update_hashes(true).await?;
            let mut s1 = Vec::new();
            a1.serialize(&mut TrackingWriter::new(0, &mut s1), true)
                .await?;
            let s1: Arc<[u8]> = s1.into();
            let a2 = MerkleArchive::deserialize(&s1, &mut Cursor::new(s1.clone())).await?;

            // Every expected file must round-trip with identical contents
            // (verified against its recorded hash).
            for (path, content) in check_set {
                match a2
                    .contents
                    .get_path(&path)
                    .map(|e| (e.as_contents(), e.hash()))
                {
                    Some((EntryContents::File(f), hash)) => {
                        ensure_code!(
                            &f.to_vec(hash).await? == content.as_bytes(),
                            ErrorKind::ParseS9pk,
                            "File at {path:?} does not match input"
                        )
                    }
                    _ => {
                        return Err(Error::new(
                            eyre!("expected file at {path:?}"),
                            ErrorKind::ParseS9pk,
                        ))
                    }
                }
            }

            // Serialization must be deterministic: re-serializing the
            // deserialized archive reproduces the original bytes.
            let mut s2 = Vec::new();
            a2.serialize(&mut TrackingWriter::new(0, &mut s2), true)
                .await?;
            let s2: Arc<[u8]> = s2.into();
            ensure_code!(s1 == s2, ErrorKind::Pack, "s1 does not match s2");

            Ok(())
        })
}
|
||||||
|
|
||||||
|
proptest::proptest! {
    // Round-trips arbitrary file sets, filtered down to paths that can exist
    // in an archive (non-empty file name, all components valid UTF-8).
    #[test]
    fn property_test(files: Vec<(PathBuf, String)>) {
        let files: Vec<(PathBuf, String)> = files.into_iter().filter(|(p, _)| p.file_name().is_some() && p.iter().all(|s| s.to_str().is_some())).collect();
        if let Err(e) = test(files.clone()) {
            // Include the input so proptest failures are reproducible.
            panic!("{e}\nInput: {files:#?}\n{e:?}");
        }
    }
}
|
||||||
|
|
||||||
|
// Smoke test: a single file at the archive root.
#[test]
fn test_example_1() {
    if let Err(e) = test(vec![(Path::new("foo").into(), "bar".into())]) {
        panic!("{e}\n{e:?}");
    }
}
|
||||||
|
|
||||||
|
// Nested directories with repeated file names at several depths.
#[test]
fn test_example_2() {
    if let Err(e) = test(vec![
        (Path::new("a/a.txt").into(), "a.txt".into()),
        (Path::new("a/b/a.txt").into(), "a.txt".into()),
        (Path::new("a/b/b/a.txt").into(), "a.txt".into()),
        (Path::new("a/b/c.txt").into(), "c.txt".into()),
        (Path::new("a/c.txt").into(), "c.txt".into()),
    ]) {
        panic!("{e}\n{e:?}");
    }
}
|
||||||
|
|
||||||
|
// Regression case with multi-byte (non-ASCII) file contents.
#[test]
fn test_example_3() {
    if let Err(e) = test(vec![
        (Path::new("b/a").into(), "𑦪".into()),
        (Path::new("a/c/a").into(), "·".into()),
    ]) {
        panic!("{e}\n{e:?}");
    }
}
|
||||||
159
backend/src/s9pk/merkle_archive/varint.rs
Normal file
159
backend/src/s9pk/merkle_archive/varint.rs
Normal file
@@ -0,0 +1,159 @@
|
|||||||
|
use integer_encoding::VarInt;
|
||||||
|
use tokio::io::{AsyncRead, AsyncWrite};
|
||||||
|
|
||||||
|
use crate::prelude::*;
|
||||||
|
|
||||||
|
/// Most-significant byte, == 0x80
pub const MSB: u8 = 0b1000_0000;

// Upper bound on deserialized string length, guarding against malicious or
// corrupt length prefixes.
const MAX_STR_LEN: u64 = 1024 * 1024; // 1 MiB
|
||||||
|
|
||||||
|
pub fn serialized_varint_size(n: u64) -> u64 {
|
||||||
|
VarInt::required_space(n) as u64
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn serialize_varint<W: AsyncWrite + Unpin + Send>(
|
||||||
|
n: u64,
|
||||||
|
w: &mut W,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
use tokio::io::AsyncWriteExt;
|
||||||
|
|
||||||
|
let mut buf = [0 as u8; 10];
|
||||||
|
let b = n.encode_var(&mut buf);
|
||||||
|
w.write_all(&buf[0..b]).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn serialized_varstring_size(s: &str) -> u64 {
|
||||||
|
serialized_varint_size(s.len() as u64) + s.len() as u64
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn serialize_varstring<W: AsyncWrite + Unpin + Send>(
|
||||||
|
s: &str,
|
||||||
|
w: &mut W,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
use tokio::io::AsyncWriteExt;
|
||||||
|
serialize_varint(s.len() as u64, w).await?;
|
||||||
|
w.write_all(s.as_bytes()).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Incremental varint decoder: accepts one byte at a time until a byte
/// without the continuation (MSB) bit arrives.
#[derive(Default)]
struct VarIntProcessor {
    buf: [u8; 10],
    // Maximum number of encoded bytes a u64 varint may occupy (set in new()).
    maxsize: usize,
    // Number of bytes accepted so far.
    i: usize,
}
|
||||||
|
|
||||||
|
impl VarIntProcessor {
|
||||||
|
fn new() -> VarIntProcessor {
|
||||||
|
VarIntProcessor {
|
||||||
|
maxsize: (std::mem::size_of::<u64>() * 8 + 7) / 7,
|
||||||
|
..VarIntProcessor::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fn push(&mut self, b: u8) -> Result<(), Error> {
|
||||||
|
if self.i >= self.maxsize {
|
||||||
|
return Err(Error::new(
|
||||||
|
eyre!("Unterminated varint"),
|
||||||
|
ErrorKind::ParseS9pk,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
self.buf[self.i] = b;
|
||||||
|
self.i += 1;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
fn finished(&self) -> bool {
|
||||||
|
self.i > 0 && (self.buf[self.i - 1] & MSB == 0)
|
||||||
|
}
|
||||||
|
fn decode(&self) -> Option<u64> {
|
||||||
|
Some(u64::decode_var(&self.buf[0..self.i])?.0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn deserialize_varint<R: AsyncRead + Unpin>(r: &mut R) -> Result<u64, Error> {
|
||||||
|
use tokio::io::AsyncReadExt;
|
||||||
|
|
||||||
|
let mut buf = [0 as u8; 1];
|
||||||
|
let mut p = VarIntProcessor::new();
|
||||||
|
|
||||||
|
while !p.finished() {
|
||||||
|
r.read_exact(&mut buf).await?;
|
||||||
|
|
||||||
|
p.push(buf[0])?;
|
||||||
|
}
|
||||||
|
|
||||||
|
p.decode()
|
||||||
|
.ok_or_else(|| Error::new(eyre!("Reached EOF"), ErrorKind::ParseS9pk))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn deserialize_varstring<R: AsyncRead + Unpin>(r: &mut R) -> Result<String, Error> {
|
||||||
|
use tokio::io::AsyncReadExt;
|
||||||
|
|
||||||
|
let len = std::cmp::min(deserialize_varint(r).await?, MAX_STR_LEN);
|
||||||
|
let mut res = String::with_capacity(len as usize);
|
||||||
|
r.take(len).read_to_string(&mut res).await?;
|
||||||
|
Ok(res)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
mod test {
    use std::io::Cursor;

    use crate::prelude::*;

    // Round-trips `n` through serialize_varint / deserialize_varint on a
    // single-threaded runtime (these helpers are called from sync tests).
    fn test_int(n: u64) -> Result<(), Error> {
        let n1 = n;
        tokio::runtime::Builder::new_current_thread()
            .enable_io()
            .build()
            .unwrap()
            .block_on(async move {
                let mut v = Vec::new();
                super::serialize_varint(n1, &mut v).await?;
                let n2 = super::deserialize_varint(&mut Cursor::new(v)).await?;

                ensure_code!(n1 == n2, ErrorKind::Deserialization, "n1 does not match n2");

                Ok(())
            })
    }

    // Round-trips `s` through serialize_varstring / deserialize_varstring.
    fn test_string(s: &str) -> Result<(), Error> {
        let s1 = s;
        tokio::runtime::Builder::new_current_thread()
            .enable_io()
            .build()
            .unwrap()
            .block_on(async move {
                let mut v: Vec<u8> = Vec::new();
                super::serialize_varstring(&s1, &mut v).await?;
                let s2 = super::deserialize_varstring(&mut Cursor::new(v)).await?;

                ensure_code!(
                    s1 == &s2,
                    ErrorKind::Deserialization,
                    "s1 does not match s2"
                );

                Ok(())
            })
    }

    proptest::proptest! {
        #[test]
        fn proptest_int(n: u64) {
            if let Err(e) = test_int(n) {
                // Include the input so failures are reproducible.
                panic!("{e}\nInput: {n}\n{e:?}");
            }
        }

        #[test]
        fn proptest_string(s: String) {
            if let Err(e) = test_string(&s) {
                panic!("{e}\nInput: {s:?}\n{e:?}");
            }
        }
    }
}
|
||||||
47
backend/src/s9pk/merkle_archive/write_queue.rs
Normal file
47
backend/src/s9pk/merkle_archive/write_queue.rs
Normal file
@@ -0,0 +1,47 @@
|
|||||||
|
use std::collections::VecDeque;
|
||||||
|
|
||||||
|
use crate::prelude::*;
|
||||||
|
use crate::s9pk::merkle_archive::sink::Sink;
|
||||||
|
use crate::s9pk::merkle_archive::source::FileSource;
|
||||||
|
use crate::s9pk::merkle_archive::{Entry, EntryContents};
|
||||||
|
use crate::util::MaybeOwned;
|
||||||
|
|
||||||
|
/// Orders deferred body/TOC writes during archive serialization: entries are
/// assigned their final byte positions up front via `add`, then their
/// contents are written out later in FIFO order via `serialize`.
pub struct WriteQueue<'a, S> {
    // Byte offset the next queued entry will be written at.
    next_available_position: u64,
    queue: VecDeque<&'a Entry<S>>,
}
|
||||||
|
|
||||||
|
impl<'a, S> WriteQueue<'a, S> {
    /// Creates an empty queue whose first queued entry will be placed at byte
    /// offset `next_available_position`.
    pub fn new(next_available_position: u64) -> Self {
        Self {
            next_available_position,
            queue: VecDeque::new(),
        }
    }
}
|
||||||
|
impl<'a, S: FileSource> WriteQueue<'a, S> {
|
||||||
|
pub async fn add(&mut self, entry: &'a Entry<S>) -> Result<u64, Error> {
|
||||||
|
let res = self.next_available_position;
|
||||||
|
let size = match entry.as_contents() {
|
||||||
|
EntryContents::Missing => return Ok(0),
|
||||||
|
EntryContents::File(f) => f.size().await?,
|
||||||
|
EntryContents::Directory(d) => d.toc_size(),
|
||||||
|
};
|
||||||
|
self.next_available_position += size;
|
||||||
|
self.queue.push_back(entry);
|
||||||
|
Ok(res)
|
||||||
|
}
|
||||||
|
pub async fn serialize<W: Sink>(&mut self, w: &mut W, verify: bool) -> Result<(), Error> {
|
||||||
|
loop {
|
||||||
|
let Some(next) = self.queue.pop_front() else {
|
||||||
|
break;
|
||||||
|
};
|
||||||
|
match next.as_contents() {
|
||||||
|
EntryContents::Missing => (),
|
||||||
|
EntryContents::File(f) => f.serialize_body(w, next.hash.filter(|_| verify)).await?,
|
||||||
|
EntryContents::Directory(d) => d.serialize_toc(self, w).await?,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,246 +1,5 @@
|
|||||||
use std::ffi::OsStr;
|
pub mod merkle_archive;
|
||||||
use std::path::PathBuf;
|
pub mod v1;
|
||||||
|
pub mod v2;
|
||||||
|
|
||||||
use color_eyre::eyre::eyre;
|
pub use v1::*;
|
||||||
use futures::TryStreamExt;
|
|
||||||
use imbl::OrdMap;
|
|
||||||
use rpc_toolkit::command;
|
|
||||||
use serde_json::Value;
|
|
||||||
use tokio::io::AsyncRead;
|
|
||||||
use tracing::instrument;
|
|
||||||
|
|
||||||
use crate::context::SdkContext;
|
|
||||||
use crate::s9pk::builder::S9pkPacker;
|
|
||||||
use crate::s9pk::docker::DockerMultiArch;
|
|
||||||
use crate::s9pk::git_hash::GitHash;
|
|
||||||
use crate::s9pk::manifest::Manifest;
|
|
||||||
use crate::s9pk::reader::S9pkReader;
|
|
||||||
use crate::util::display_none;
|
|
||||||
use crate::util::io::BufferedWriteReader;
|
|
||||||
use crate::util::serde::IoFormat;
|
|
||||||
use crate::volume::Volume;
|
|
||||||
use crate::{Error, ErrorKind, ResultExt};
|
|
||||||
|
|
||||||
pub mod builder;
|
|
||||||
pub mod docker;
|
|
||||||
pub mod git_hash;
|
|
||||||
pub mod header;
|
|
||||||
pub mod manifest;
|
|
||||||
pub mod reader;
|
|
||||||
|
|
||||||
pub const SIG_CONTEXT: &[u8] = b"s9pk";
|
|
||||||
|
|
||||||
#[command(cli_only, display(display_none))]
|
|
||||||
#[instrument(skip_all)]
|
|
||||||
pub async fn pack(#[context] ctx: SdkContext, #[arg] path: Option<PathBuf>) -> Result<(), Error> {
|
|
||||||
use tokio::fs::File;
|
|
||||||
|
|
||||||
let path = if let Some(path) = path {
|
|
||||||
path
|
|
||||||
} else {
|
|
||||||
std::env::current_dir()?
|
|
||||||
};
|
|
||||||
let manifest_value: Value = if path.join("manifest.toml").exists() {
|
|
||||||
IoFormat::Toml
|
|
||||||
.from_async_reader(File::open(path.join("manifest.toml")).await?)
|
|
||||||
.await?
|
|
||||||
} else if path.join("manifest.yaml").exists() {
|
|
||||||
IoFormat::Yaml
|
|
||||||
.from_async_reader(File::open(path.join("manifest.yaml")).await?)
|
|
||||||
.await?
|
|
||||||
} else if path.join("manifest.json").exists() {
|
|
||||||
IoFormat::Json
|
|
||||||
.from_async_reader(File::open(path.join("manifest.json")).await?)
|
|
||||||
.await?
|
|
||||||
} else {
|
|
||||||
return Err(Error::new(
|
|
||||||
eyre!("manifest not found"),
|
|
||||||
crate::ErrorKind::Pack,
|
|
||||||
));
|
|
||||||
};
|
|
||||||
|
|
||||||
let manifest: Manifest = serde_json::from_value::<Manifest>(manifest_value.clone())
|
|
||||||
.with_kind(crate::ErrorKind::Deserialization)?
|
|
||||||
.with_git_hash(GitHash::from_path(&path).await?);
|
|
||||||
let extra_keys =
|
|
||||||
enumerate_extra_keys(&serde_json::to_value(&manifest).unwrap(), &manifest_value);
|
|
||||||
for k in extra_keys {
|
|
||||||
tracing::warn!("Unrecognized Manifest Key: {}", k);
|
|
||||||
}
|
|
||||||
|
|
||||||
let outfile_path = path.join(format!("{}.s9pk", manifest.id));
|
|
||||||
let mut outfile = File::create(outfile_path).await?;
|
|
||||||
S9pkPacker::builder()
|
|
||||||
.manifest(&manifest)
|
|
||||||
.writer(&mut outfile)
|
|
||||||
.license(
|
|
||||||
File::open(path.join(manifest.assets.license_path()))
|
|
||||||
.await
|
|
||||||
.with_ctx(|_| {
|
|
||||||
(
|
|
||||||
crate::ErrorKind::Filesystem,
|
|
||||||
manifest.assets.license_path().display().to_string(),
|
|
||||||
)
|
|
||||||
})?,
|
|
||||||
)
|
|
||||||
.icon(
|
|
||||||
File::open(path.join(manifest.assets.icon_path()))
|
|
||||||
.await
|
|
||||||
.with_ctx(|_| {
|
|
||||||
(
|
|
||||||
crate::ErrorKind::Filesystem,
|
|
||||||
manifest.assets.icon_path().display().to_string(),
|
|
||||||
)
|
|
||||||
})?,
|
|
||||||
)
|
|
||||||
.instructions(
|
|
||||||
File::open(path.join(manifest.assets.instructions_path()))
|
|
||||||
.await
|
|
||||||
.with_ctx(|_| {
|
|
||||||
(
|
|
||||||
crate::ErrorKind::Filesystem,
|
|
||||||
manifest.assets.instructions_path().display().to_string(),
|
|
||||||
)
|
|
||||||
})?,
|
|
||||||
)
|
|
||||||
.docker_images({
|
|
||||||
let docker_images_path = path.join(manifest.assets.docker_images_path());
|
|
||||||
let res: Box<dyn AsyncRead + Unpin + Send + Sync> = if tokio::fs::metadata(&docker_images_path).await?.is_dir() {
|
|
||||||
let tars: Vec<_> = tokio_stream::wrappers::ReadDirStream::new(tokio::fs::read_dir(&docker_images_path).await?).try_collect().await?;
|
|
||||||
let mut arch_info = DockerMultiArch::default();
|
|
||||||
for tar in &tars {
|
|
||||||
if tar.path().extension() == Some(OsStr::new("tar")) {
|
|
||||||
arch_info.available.insert(tar.path().file_stem().unwrap_or_default().to_str().unwrap_or_default().to_owned());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if arch_info.available.contains("aarch64") {
|
|
||||||
arch_info.default = "aarch64".to_owned();
|
|
||||||
} else {
|
|
||||||
arch_info.default = arch_info.available.iter().next().cloned().unwrap_or_default();
|
|
||||||
}
|
|
||||||
let arch_info_cbor = IoFormat::Cbor.to_vec(&arch_info)?;
|
|
||||||
Box::new(BufferedWriteReader::new(|w| async move {
|
|
||||||
let mut docker_images = tokio_tar::Builder::new(w);
|
|
||||||
let mut multiarch_header = tokio_tar::Header::new_gnu();
|
|
||||||
multiarch_header.set_path("multiarch.cbor")?;
|
|
||||||
multiarch_header.set_size(arch_info_cbor.len() as u64);
|
|
||||||
multiarch_header.set_cksum();
|
|
||||||
docker_images.append(&multiarch_header, std::io::Cursor::new(arch_info_cbor)).await?;
|
|
||||||
for tar in tars
|
|
||||||
{
|
|
||||||
docker_images
|
|
||||||
.append_path_with_name(
|
|
||||||
tar.path(),
|
|
||||||
tar.file_name(),
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
Ok::<_, std::io::Error>(())
|
|
||||||
}, 1024 * 1024))
|
|
||||||
} else {
|
|
||||||
Box::new(File::open(docker_images_path)
|
|
||||||
.await
|
|
||||||
.with_ctx(|_| {
|
|
||||||
(
|
|
||||||
crate::ErrorKind::Filesystem,
|
|
||||||
manifest.assets.docker_images_path().display().to_string(),
|
|
||||||
)
|
|
||||||
})?)
|
|
||||||
};
|
|
||||||
res
|
|
||||||
})
|
|
||||||
.assets({
|
|
||||||
let asset_volumes = manifest
|
|
||||||
.volumes
|
|
||||||
.iter()
|
|
||||||
.filter(|(_, v)| matches!(v, &&Volume::Assets {})).map(|(id, _)| id.clone()).collect::<Vec<_>>();
|
|
||||||
let assets_path = manifest.assets.assets_path().to_owned();
|
|
||||||
let path = path.clone();
|
|
||||||
|
|
||||||
BufferedWriteReader::new(|w| async move {
|
|
||||||
let mut assets = tokio_tar::Builder::new(w);
|
|
||||||
for asset_volume in asset_volumes
|
|
||||||
{
|
|
||||||
assets
|
|
||||||
.append_dir_all(
|
|
||||||
&asset_volume,
|
|
||||||
path.join(&assets_path).join(&asset_volume),
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
Ok::<_, std::io::Error>(())
|
|
||||||
}, 1024 * 1024)
|
|
||||||
})
|
|
||||||
.scripts({
|
|
||||||
let script_path = path.join(manifest.assets.scripts_path()).join("embassy.js");
|
|
||||||
let needs_script = manifest.package_procedures().any(|a| a.is_script());
|
|
||||||
let has_script = script_path.exists();
|
|
||||||
match (needs_script, has_script) {
|
|
||||||
(true, true) => Some(File::open(script_path).await?),
|
|
||||||
(true, false) => {
|
|
||||||
return Err(Error::new(eyre!("Script is declared in manifest, but no such script exists at ./scripts/embassy.js"), ErrorKind::Pack).into())
|
|
||||||
}
|
|
||||||
(false, true) => {
|
|
||||||
tracing::warn!("Manifest does not declare any actions that use scripts, but a script exists at ./scripts/embassy.js");
|
|
||||||
None
|
|
||||||
}
|
|
||||||
(false, false) => None
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.build()
|
|
||||||
.pack(&ctx.developer_key()?)
|
|
||||||
.await?;
|
|
||||||
outfile.sync_all().await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[command(rename = "s9pk", cli_only, display(display_none))]
|
|
||||||
pub async fn verify(#[arg] path: PathBuf) -> Result<(), Error> {
|
|
||||||
let mut s9pk = S9pkReader::open(path, true).await?;
|
|
||||||
s9pk.validate().await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn enumerate_extra_keys(reference: &Value, candidate: &Value) -> Vec<String> {
|
|
||||||
match (reference, candidate) {
|
|
||||||
(Value::Object(m_r), Value::Object(m_c)) => {
|
|
||||||
let om_r: OrdMap<String, Value> = m_r.clone().into_iter().collect();
|
|
||||||
let om_c: OrdMap<String, Value> = m_c.clone().into_iter().collect();
|
|
||||||
let common = om_r.clone().intersection(om_c.clone());
|
|
||||||
let top_extra = common.clone().symmetric_difference(om_c.clone());
|
|
||||||
let mut all_extra = top_extra
|
|
||||||
.keys()
|
|
||||||
.map(|s| format!(".{}", s))
|
|
||||||
.collect::<Vec<String>>();
|
|
||||||
for (k, v) in common {
|
|
||||||
all_extra.extend(
|
|
||||||
enumerate_extra_keys(&v, om_c.get(&k).unwrap())
|
|
||||||
.into_iter()
|
|
||||||
.map(|s| format!(".{}{}", k, s)),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
all_extra
|
|
||||||
}
|
|
||||||
(_, Value::Object(m1)) => m1.clone().keys().map(|s| format!(".{}", s)).collect(),
|
|
||||||
_ => Vec::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_enumerate_extra_keys() {
|
|
||||||
use serde_json::json;
|
|
||||||
let extras = enumerate_extra_keys(
|
|
||||||
&json!({
|
|
||||||
"test": 1,
|
|
||||||
"test2": null,
|
|
||||||
}),
|
|
||||||
&json!({
|
|
||||||
"test": 1,
|
|
||||||
"test2": { "test3": null },
|
|
||||||
"test4": null
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
println!("{:?}", extras)
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -1,28 +0,0 @@
|
|||||||
## Header
|
|
||||||
|
|
||||||
### Magic
|
|
||||||
|
|
||||||
2B: `0x3b3b`
|
|
||||||
|
|
||||||
### Version
|
|
||||||
|
|
||||||
varint: `0x02`
|
|
||||||
|
|
||||||
### Pubkey
|
|
||||||
|
|
||||||
32B: ed25519 pubkey
|
|
||||||
|
|
||||||
### TOC
|
|
||||||
|
|
||||||
- number of sections (varint)
|
|
||||||
- FOREACH section
|
|
||||||
- sig (32B: ed25519 signature of BLAKE-3 of rest of section)
|
|
||||||
- name (varstring)
|
|
||||||
- TYPE (varint)
|
|
||||||
- TYPE=FILE (`0x01`)
|
|
||||||
- mime (varstring)
|
|
||||||
- pos (32B: u64 BE)
|
|
||||||
- len (32B: u64 BE)
|
|
||||||
- hash (32B: BLAKE-3 of file contents)
|
|
||||||
- TYPE=TOC (`0x02`)
|
|
||||||
- recursively defined
|
|
||||||
246
backend/src/s9pk/v1/mod.rs
Normal file
246
backend/src/s9pk/v1/mod.rs
Normal file
@@ -0,0 +1,246 @@
|
|||||||
|
use std::ffi::OsStr;
|
||||||
|
use std::path::PathBuf;
|
||||||
|
|
||||||
|
use color_eyre::eyre::eyre;
|
||||||
|
use futures::TryStreamExt;
|
||||||
|
use imbl::OrdMap;
|
||||||
|
use rpc_toolkit::command;
|
||||||
|
use serde_json::Value;
|
||||||
|
use tokio::io::AsyncRead;
|
||||||
|
use tracing::instrument;
|
||||||
|
|
||||||
|
use crate::context::SdkContext;
|
||||||
|
use crate::s9pk::builder::S9pkPacker;
|
||||||
|
use crate::s9pk::docker::DockerMultiArch;
|
||||||
|
use crate::s9pk::git_hash::GitHash;
|
||||||
|
use crate::s9pk::manifest::Manifest;
|
||||||
|
use crate::s9pk::reader::S9pkReader;
|
||||||
|
use crate::util::display_none;
|
||||||
|
use crate::util::io::BufferedWriteReader;
|
||||||
|
use crate::util::serde::IoFormat;
|
||||||
|
use crate::volume::Volume;
|
||||||
|
use crate::{Error, ErrorKind, ResultExt};
|
||||||
|
|
||||||
|
pub mod builder;
|
||||||
|
pub mod docker;
|
||||||
|
pub mod git_hash;
|
||||||
|
pub mod header;
|
||||||
|
pub mod manifest;
|
||||||
|
pub mod reader;
|
||||||
|
|
||||||
|
pub const SIG_CONTEXT: &[u8] = b"s9pk";
|
||||||
|
|
||||||
|
#[command(cli_only, display(display_none))]
|
||||||
|
#[instrument(skip_all)]
|
||||||
|
pub async fn pack(#[context] ctx: SdkContext, #[arg] path: Option<PathBuf>) -> Result<(), Error> {
|
||||||
|
use tokio::fs::File;
|
||||||
|
|
||||||
|
let path = if let Some(path) = path {
|
||||||
|
path
|
||||||
|
} else {
|
||||||
|
std::env::current_dir()?
|
||||||
|
};
|
||||||
|
let manifest_value: Value = if path.join("manifest.toml").exists() {
|
||||||
|
IoFormat::Toml
|
||||||
|
.from_async_reader(File::open(path.join("manifest.toml")).await?)
|
||||||
|
.await?
|
||||||
|
} else if path.join("manifest.yaml").exists() {
|
||||||
|
IoFormat::Yaml
|
||||||
|
.from_async_reader(File::open(path.join("manifest.yaml")).await?)
|
||||||
|
.await?
|
||||||
|
} else if path.join("manifest.json").exists() {
|
||||||
|
IoFormat::Json
|
||||||
|
.from_async_reader(File::open(path.join("manifest.json")).await?)
|
||||||
|
.await?
|
||||||
|
} else {
|
||||||
|
return Err(Error::new(
|
||||||
|
eyre!("manifest not found"),
|
||||||
|
crate::ErrorKind::Pack,
|
||||||
|
));
|
||||||
|
};
|
||||||
|
|
||||||
|
let manifest: Manifest = serde_json::from_value::<Manifest>(manifest_value.clone())
|
||||||
|
.with_kind(crate::ErrorKind::Deserialization)?
|
||||||
|
.with_git_hash(GitHash::from_path(&path).await?);
|
||||||
|
let extra_keys =
|
||||||
|
enumerate_extra_keys(&serde_json::to_value(&manifest).unwrap(), &manifest_value);
|
||||||
|
for k in extra_keys {
|
||||||
|
tracing::warn!("Unrecognized Manifest Key: {}", k);
|
||||||
|
}
|
||||||
|
|
||||||
|
let outfile_path = path.join(format!("{}.s9pk", manifest.id));
|
||||||
|
let mut outfile = File::create(outfile_path).await?;
|
||||||
|
S9pkPacker::builder()
|
||||||
|
.manifest(&manifest)
|
||||||
|
.writer(&mut outfile)
|
||||||
|
.license(
|
||||||
|
File::open(path.join(manifest.assets.license_path()))
|
||||||
|
.await
|
||||||
|
.with_ctx(|_| {
|
||||||
|
(
|
||||||
|
crate::ErrorKind::Filesystem,
|
||||||
|
manifest.assets.license_path().display().to_string(),
|
||||||
|
)
|
||||||
|
})?,
|
||||||
|
)
|
||||||
|
.icon(
|
||||||
|
File::open(path.join(manifest.assets.icon_path()))
|
||||||
|
.await
|
||||||
|
.with_ctx(|_| {
|
||||||
|
(
|
||||||
|
crate::ErrorKind::Filesystem,
|
||||||
|
manifest.assets.icon_path().display().to_string(),
|
||||||
|
)
|
||||||
|
})?,
|
||||||
|
)
|
||||||
|
.instructions(
|
||||||
|
File::open(path.join(manifest.assets.instructions_path()))
|
||||||
|
.await
|
||||||
|
.with_ctx(|_| {
|
||||||
|
(
|
||||||
|
crate::ErrorKind::Filesystem,
|
||||||
|
manifest.assets.instructions_path().display().to_string(),
|
||||||
|
)
|
||||||
|
})?,
|
||||||
|
)
|
||||||
|
.docker_images({
|
||||||
|
let docker_images_path = path.join(manifest.assets.docker_images_path());
|
||||||
|
let res: Box<dyn AsyncRead + Unpin + Send + Sync> = if tokio::fs::metadata(&docker_images_path).await?.is_dir() {
|
||||||
|
let tars: Vec<_> = tokio_stream::wrappers::ReadDirStream::new(tokio::fs::read_dir(&docker_images_path).await?).try_collect().await?;
|
||||||
|
let mut arch_info = DockerMultiArch::default();
|
||||||
|
for tar in &tars {
|
||||||
|
if tar.path().extension() == Some(OsStr::new("tar")) {
|
||||||
|
arch_info.available.insert(tar.path().file_stem().unwrap_or_default().to_str().unwrap_or_default().to_owned());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if arch_info.available.contains("aarch64") {
|
||||||
|
arch_info.default = "aarch64".to_owned();
|
||||||
|
} else {
|
||||||
|
arch_info.default = arch_info.available.iter().next().cloned().unwrap_or_default();
|
||||||
|
}
|
||||||
|
let arch_info_cbor = IoFormat::Cbor.to_vec(&arch_info)?;
|
||||||
|
Box::new(BufferedWriteReader::new(|w| async move {
|
||||||
|
let mut docker_images = tokio_tar::Builder::new(w);
|
||||||
|
let mut multiarch_header = tokio_tar::Header::new_gnu();
|
||||||
|
multiarch_header.set_path("multiarch.cbor")?;
|
||||||
|
multiarch_header.set_size(arch_info_cbor.len() as u64);
|
||||||
|
multiarch_header.set_cksum();
|
||||||
|
docker_images.append(&multiarch_header, std::io::Cursor::new(arch_info_cbor)).await?;
|
||||||
|
for tar in tars
|
||||||
|
{
|
||||||
|
docker_images
|
||||||
|
.append_path_with_name(
|
||||||
|
tar.path(),
|
||||||
|
tar.file_name(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
Ok::<_, std::io::Error>(())
|
||||||
|
}, 1024 * 1024))
|
||||||
|
} else {
|
||||||
|
Box::new(File::open(docker_images_path)
|
||||||
|
.await
|
||||||
|
.with_ctx(|_| {
|
||||||
|
(
|
||||||
|
crate::ErrorKind::Filesystem,
|
||||||
|
manifest.assets.docker_images_path().display().to_string(),
|
||||||
|
)
|
||||||
|
})?)
|
||||||
|
};
|
||||||
|
res
|
||||||
|
})
|
||||||
|
.assets({
|
||||||
|
let asset_volumes = manifest
|
||||||
|
.volumes
|
||||||
|
.iter()
|
||||||
|
.filter(|(_, v)| matches!(v, &&Volume::Assets {})).map(|(id, _)| id.clone()).collect::<Vec<_>>();
|
||||||
|
let assets_path = manifest.assets.assets_path().to_owned();
|
||||||
|
let path = path.clone();
|
||||||
|
|
||||||
|
BufferedWriteReader::new(|w| async move {
|
||||||
|
let mut assets = tokio_tar::Builder::new(w);
|
||||||
|
for asset_volume in asset_volumes
|
||||||
|
{
|
||||||
|
assets
|
||||||
|
.append_dir_all(
|
||||||
|
&asset_volume,
|
||||||
|
path.join(&assets_path).join(&asset_volume),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
Ok::<_, std::io::Error>(())
|
||||||
|
}, 1024 * 1024)
|
||||||
|
})
|
||||||
|
.scripts({
|
||||||
|
let script_path = path.join(manifest.assets.scripts_path()).join("embassy.js");
|
||||||
|
let needs_script = manifest.package_procedures().any(|a| a.is_script());
|
||||||
|
let has_script = script_path.exists();
|
||||||
|
match (needs_script, has_script) {
|
||||||
|
(true, true) => Some(File::open(script_path).await?),
|
||||||
|
(true, false) => {
|
||||||
|
return Err(Error::new(eyre!("Script is declared in manifest, but no such script exists at ./scripts/embassy.js"), ErrorKind::Pack).into())
|
||||||
|
}
|
||||||
|
(false, true) => {
|
||||||
|
tracing::warn!("Manifest does not declare any actions that use scripts, but a script exists at ./scripts/embassy.js");
|
||||||
|
None
|
||||||
|
}
|
||||||
|
(false, false) => None
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.build()
|
||||||
|
.pack(&ctx.developer_key()?)
|
||||||
|
.await?;
|
||||||
|
outfile.sync_all().await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[command(rename = "s9pk", cli_only, display(display_none))]
|
||||||
|
pub async fn verify(#[arg] path: PathBuf) -> Result<(), Error> {
|
||||||
|
let mut s9pk = S9pkReader::open(path, true).await?;
|
||||||
|
s9pk.validate().await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn enumerate_extra_keys(reference: &Value, candidate: &Value) -> Vec<String> {
|
||||||
|
match (reference, candidate) {
|
||||||
|
(Value::Object(m_r), Value::Object(m_c)) => {
|
||||||
|
let om_r: OrdMap<String, Value> = m_r.clone().into_iter().collect();
|
||||||
|
let om_c: OrdMap<String, Value> = m_c.clone().into_iter().collect();
|
||||||
|
let common = om_r.clone().intersection(om_c.clone());
|
||||||
|
let top_extra = common.clone().symmetric_difference(om_c.clone());
|
||||||
|
let mut all_extra = top_extra
|
||||||
|
.keys()
|
||||||
|
.map(|s| format!(".{}", s))
|
||||||
|
.collect::<Vec<String>>();
|
||||||
|
for (k, v) in common {
|
||||||
|
all_extra.extend(
|
||||||
|
enumerate_extra_keys(&v, om_c.get(&k).unwrap())
|
||||||
|
.into_iter()
|
||||||
|
.map(|s| format!(".{}{}", k, s)),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
all_extra
|
||||||
|
}
|
||||||
|
(_, Value::Object(m1)) => m1.clone().keys().map(|s| format!(".{}", s)).collect(),
|
||||||
|
_ => Vec::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_enumerate_extra_keys() {
|
||||||
|
use serde_json::json;
|
||||||
|
let extras = enumerate_extra_keys(
|
||||||
|
&json!({
|
||||||
|
"test": 1,
|
||||||
|
"test2": null,
|
||||||
|
}),
|
||||||
|
&json!({
|
||||||
|
"test": 1,
|
||||||
|
"test2": { "test3": null },
|
||||||
|
"test4": null
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
println!("{:?}", extras)
|
||||||
|
}
|
||||||
41
backend/src/s9pk/v2/mod.rs
Normal file
41
backend/src/s9pk/v2/mod.rs
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
use crate::prelude::*;
|
||||||
|
use crate::s9pk::merkle_archive::sink::Sink;
|
||||||
|
use crate::s9pk::merkle_archive::source::{ArchiveSource, FileSource, Section};
|
||||||
|
use crate::s9pk::merkle_archive::MerkleArchive;
|
||||||
|
|
||||||
|
const MAGIC_AND_VERSION: &[u8] = &[0x3b, 0x3b, 0x02];
|
||||||
|
|
||||||
|
pub struct S9pk<S>(MerkleArchive<S>);
|
||||||
|
impl<S: FileSource> S9pk<S> {
|
||||||
|
pub async fn serialize<W: Sink>(&mut self, w: &mut W, verify: bool) -> Result<(), Error> {
|
||||||
|
use tokio::io::AsyncWriteExt;
|
||||||
|
|
||||||
|
w.write_all(MAGIC_AND_VERSION).await?;
|
||||||
|
self.0.serialize(w, verify).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S: ArchiveSource> S9pk<Section<S>> {
|
||||||
|
pub async fn deserialize(source: &S) -> Result<Self, Error> {
|
||||||
|
use tokio::io::AsyncReadExt;
|
||||||
|
|
||||||
|
let mut header = source
|
||||||
|
.fetch(
|
||||||
|
0,
|
||||||
|
MAGIC_AND_VERSION.len() as u64 + MerkleArchive::<Section<S>>::header_size(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let mut magic_version = [0u8; 3];
|
||||||
|
header.read_exact(&mut magic_version).await?;
|
||||||
|
ensure_code!(
|
||||||
|
&magic_version == MAGIC_AND_VERSION,
|
||||||
|
ErrorKind::ParseS9pk,
|
||||||
|
"Invalid Magic or Unexpected Version"
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(Self(MerkleArchive::deserialize(source, &mut header).await?))
|
||||||
|
}
|
||||||
|
}
|
||||||
89
backend/src/s9pk/v2/specv2.md
Normal file
89
backend/src/s9pk/v2/specv2.md
Normal file
@@ -0,0 +1,89 @@
|
|||||||
|
## Magic
|
||||||
|
|
||||||
|
`0x3b3b`
|
||||||
|
|
||||||
|
## Version
|
||||||
|
|
||||||
|
`0x02` (varint)
|
||||||
|
|
||||||
|
## Merkle Archive
|
||||||
|
|
||||||
|
### Header
|
||||||
|
|
||||||
|
- ed25519 pubkey (32B)
|
||||||
|
- ed25519 signature of TOC sighash (64B)
|
||||||
|
- TOC sighash: (32B)
|
||||||
|
- TOC position: (8B: u64 BE)
|
||||||
|
- TOC size: (8B: u64 BE)
|
||||||
|
|
||||||
|
### TOC
|
||||||
|
|
||||||
|
- number of entries (varint)
|
||||||
|
- FOREACH section
|
||||||
|
- name (varstring)
|
||||||
|
- hash (32B: BLAKE-3 of file contents / TOC sighash)
|
||||||
|
- TYPE (1B)
|
||||||
|
- TYPE=MISSING (`0x00`)
|
||||||
|
- TYPE=FILE (`0x01`)
|
||||||
|
- position (8B: u64 BE)
|
||||||
|
- size (8B: u64 BE)
|
||||||
|
- TYPE=TOC (`0x02`)
|
||||||
|
- position (8B: u64 BE)
|
||||||
|
- size (8B: u64 BE)
|
||||||
|
|
||||||
|
#### SigHash
|
||||||
|
Hash of TOC with all contents MISSING
|
||||||
|
|
||||||
|
### FILE
|
||||||
|
|
||||||
|
`<File contents>`
|
||||||
|
|
||||||
|
# Example
|
||||||
|
|
||||||
|
`foo/bar/baz.txt`
|
||||||
|
|
||||||
|
ROOT TOC:
|
||||||
|
- 1 section
|
||||||
|
- name: foo
|
||||||
|
hash: sighash('a)
|
||||||
|
type: TOC
|
||||||
|
position: 'a
|
||||||
|
length: _
|
||||||
|
|
||||||
|
'a:
|
||||||
|
- 1 section
|
||||||
|
- name: bar
|
||||||
|
hash: sighash('b)
|
||||||
|
type: TOC
|
||||||
|
position: 'b
|
||||||
|
size: _
|
||||||
|
|
||||||
|
'b:
|
||||||
|
- 2 sections
|
||||||
|
- name: baz.txt
|
||||||
|
hash: hash('c)
|
||||||
|
type: FILE
|
||||||
|
position: 'c
|
||||||
|
length: _
|
||||||
|
- name: qux
|
||||||
|
hash: `<unverifiable>`
|
||||||
|
type: MISSING
|
||||||
|
|
||||||
|
'c: `<CONTENTS OF baz.txt>`
|
||||||
|
|
||||||
|
"foo/"
|
||||||
|
hash: _
|
||||||
|
size: 15b
|
||||||
|
|
||||||
|
"bar.txt"
|
||||||
|
hash: _
|
||||||
|
size: 5b
|
||||||
|
|
||||||
|
`<CONTENTS OF foo/>` (
|
||||||
|
"baz.txt"
|
||||||
|
hash: _
|
||||||
|
size: 2b
|
||||||
|
)
|
||||||
|
`<CONTENTS OF bar.txt>` ("hello")
|
||||||
|
`<CONTENTS OF baz.txt>` ("hi")
|
||||||
|
|
||||||
@@ -466,3 +466,27 @@ impl FileLock {
|
|||||||
pub fn assure_send<T: Send>(x: T) -> T {
|
pub fn assure_send<T: Send>(x: T) -> T {
|
||||||
x
|
x
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub enum MaybeOwned<'a, T> {
|
||||||
|
Borrowed(&'a T),
|
||||||
|
Owned(T),
|
||||||
|
}
|
||||||
|
impl<'a, T> std::ops::Deref for MaybeOwned<'a, T> {
|
||||||
|
type Target = T;
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
match self {
|
||||||
|
Self::Borrowed(a) => *a,
|
||||||
|
Self::Owned(a) => a,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<'a, T> From<T> for MaybeOwned<'a, T> {
|
||||||
|
fn from(value: T) -> Self {
|
||||||
|
MaybeOwned::Owned(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<'a, T> From<&'a T> for MaybeOwned<'a, T> {
|
||||||
|
fn from(value: &'a T) -> Self {
|
||||||
|
MaybeOwned::Borrowed(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user