setup flow fixes

This commit is contained in:
Aiden McClelland
2021-09-14 22:16:28 -06:00
parent 7a166d4490
commit 93f37cd9d5
13 changed files with 445 additions and 221 deletions

View File

@@ -10,7 +10,8 @@ use tokio::fs::File;
use tokio::sync::broadcast::Sender;
use url::Host;
use crate::util::{from_toml_async_reader, AsyncFileExt};
use crate::util::io::from_toml_async_reader;
use crate::util::AsyncFileExt;
use crate::{Error, ResultExt};
#[derive(Debug, Default, Deserialize)]

View File

@@ -26,8 +26,9 @@ use crate::manager::ManagerMap;
use crate::net::tor::os_key;
use crate::net::NetController;
use crate::shutdown::Shutdown;
use crate::util::io::from_toml_async_reader;
use crate::util::logger::EmbassyLogger;
use crate::util::{from_toml_async_reader, AsyncFileExt};
use crate::util::AsyncFileExt;
use crate::{Error, ResultExt};
#[derive(Debug, Default, Deserialize)]

View File

@@ -17,7 +17,8 @@ use url::Host;
use crate::db::model::Database;
use crate::hostname::{get_hostname, get_id};
use crate::net::tor::os_key;
use crate::util::{from_toml_async_reader, AsyncFileExt};
use crate::util::io::from_toml_async_reader;
use crate::util::AsyncFileExt;
use crate::{Error, ResultExt};
#[derive(Debug, Default, Deserialize)]

View File

@@ -9,7 +9,8 @@ use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::process::Command;
use crate::util::{from_yaml_async_reader, GeneralGuard, Invoke, Version};
use crate::util::io::from_yaml_async_reader;
use crate::util::{GeneralGuard, Invoke, Version};
use crate::{Error, ResultExt as _};
pub const TMP_MOUNTPOINT: &'static str = "/media/embassy-os";
@@ -150,12 +151,24 @@ pub async fn list() -> Result<Vec<DiskInfo>, Error> {
} else {
(disk_path, None)
};
let disk = tokio::fs::canonicalize(Path::new(DISK_PATH).join(disk_path)).await?;
let disk_path = Path::new(DISK_PATH).join(disk_path);
let disk = tokio::fs::canonicalize(&disk_path).await.with_ctx(|_| {
(
crate::ErrorKind::Filesystem,
disk_path.display().to_string(),
)
})?;
if !disks.contains_key(&disk) {
disks.insert(disk.clone(), IndexSet::new());
}
if let Some(part_path) = part_path {
let part = tokio::fs::canonicalize(part_path).await?;
let part_path = Path::new(DISK_PATH).join(disk_path);
let part = tokio::fs::canonicalize(&part_path).await.with_ctx(|_| {
(
crate::ErrorKind::Filesystem,
part_path.display().to_string(),
)
})?;
disks[&disk].insert(part);
}
}

View File

@@ -14,7 +14,8 @@ use patch_db::DbHandle;
use reqwest::Response;
use rpc_toolkit::command;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt, AsyncWriteExt};
use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt};
use tokio::process::Command;
use self::cleanup::cleanup_failed;
use crate::context::RpcContext;
@@ -29,6 +30,7 @@ use crate::install::progress::{InstallProgress, InstallProgressTracker};
use crate::s9pk::manifest::{Manifest, PackageId};
use crate::s9pk::reader::S9pkReader;
use crate::status::{DependencyErrors, MainStatus, Status};
use crate::util::io::copy_and_shutdown;
use crate::util::{display_none, AsyncFileExt, Version};
use crate::volume::asset_dir;
use crate::{Error, ResultExt};
@@ -38,6 +40,8 @@ pub mod progress;
pub const PKG_CACHE: &'static str = "package-data/cache";
pub const PKG_PUBLIC_DIR: &'static str = "package-data/public";
pub const PKG_DOCKER_DIR: &'static str = "package-data/docker";
pub const PKG_WASM_DIR: &'static str = "package-data/wasm";
#[command(display(display_none))]
pub async fn install(
@@ -418,21 +422,53 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin>(
log::info!("Install {}@{}: Unpacking Docker Images", pkg_id, version);
progress
.track_read_during(progress_model.clone(), &ctx.db, || async {
let mut load = tokio::process::Command::new("docker")
let image_tar_dir = Path::new(PKG_DOCKER_DIR)
.join(pkg_id)
.join(version.as_str());
if tokio::fs::metadata(&image_tar_dir).await.is_err() {
tokio::fs::create_dir_all(&image_tar_dir)
.await
.with_ctx(|_| {
(
crate::ErrorKind::Filesystem,
image_tar_dir.display().to_string(),
)
})?;
}
let image_tar_path = image_tar_dir.join("image.tar");
let mut tee = Command::new("tee")
.arg(&image_tar_path)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()?;
let mut load = Command::new("docker")
.arg("load")
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
let mut dst = load.stdin.take().ok_or_else(|| {
let tee_in = tee.stdin.take().ok_or_else(|| {
Error::new(
anyhow!("Could not write to stdin of tee"),
crate::ErrorKind::Docker,
)
})?;
let mut tee_out = tee.stdout.take().ok_or_else(|| {
Error::new(
anyhow!("Could not read from stdout of tee"),
crate::ErrorKind::Docker,
)
})?;
let load_in = load.stdin.take().ok_or_else(|| {
Error::new(
anyhow!("Could not write to stdin of docker load"),
crate::ErrorKind::Docker,
)
})?;
tokio::io::copy(&mut rdr.docker_images().await?, &mut dst).await?;
dst.flush().await?;
dst.shutdown().await?;
drop(dst);
let mut docker_rdr = rdr.docker_images().await?;
tokio::try_join!(
copy_and_shutdown(&mut docker_rdr, tee_in),
copy_and_shutdown(&mut tee_out, load_in),
)?;
let res = load.wait_with_output().await?;
if !res.status.success() {
Err(Error::new(

View File

@@ -71,12 +71,15 @@ impl Stream for DecryptStream {
pbkdf2::<Hmac<Sha256>>(
this.key.as_bytes(),
&this.salt,
100_000,
1000,
aeskey.as_mut_slice(),
);
let ctr = Nonce::<Aes256Ctr>::from_slice(&this.ctr);
*this.aes = Some(Aes256Ctr::new(&aeskey, &ctr));
buf.to_vec().into()
let mut aes = Aes256Ctr::new(dbg!(&aeskey), dbg!(&ctr));
let mut res = buf.to_vec();
aes.apply_keystream(&mut res);
*this.aes = Some(aes);
res.into()
} else {
hyper::body::Bytes::new()
}
@@ -98,12 +101,7 @@ impl EncryptStream {
pub fn new(key: &str, body: Body) -> Self {
let prefix: [u8; 32] = rand::random();
let mut aeskey = CipherKey::<Aes256Ctr>::default();
pbkdf2::<Hmac<Sha256>>(
key.as_bytes(),
&prefix[16..],
100_000,
aeskey.as_mut_slice(),
);
pbkdf2::<Hmac<Sha256>>(key.as_bytes(), &prefix[16..], 1000, aeskey.as_mut_slice());
let ctr = Nonce::<Aes256Ctr>::from_slice(&prefix[..16]);
let aes = Aes256Ctr::new(&aeskey, &ctr);
EncryptStream {
@@ -189,6 +187,17 @@ pub fn encrypt<M: Metadata>(key: Arc<String>) -> DynMiddleware<M> {
"Content-Encoding",
HeaderValue::from_static("aesctr256"),
);
if let Some(len_header) =
res.headers_mut().get_mut("Content-Length")
{
if let Some(len) = len_header
.to_str()
.ok()
.and_then(|l| l.parse::<u64>().ok())
{
*len_header = HeaderValue::from(len + 32);
}
}
let body = std::mem::take(res.body_mut());
*res.body_mut() = Body::wrap_stream(
EncryptStream::new(&*key, body),

View File

@@ -14,17 +14,17 @@ server {
proxy_http_version 1.1;
location /rpc/ {
proxy_pass http://localhost:5959/;
proxy_pass http://127.0.0.1:5959/;
}
location /ws/ {
proxy_pass http://localhost:5960/;
proxy_pass http://127.0.0.1:5960/;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
}
location /marketplace/ {
proxy_pass https://beta-registry-0-3.start9labs.com/;
proxy_pass https://beta-registry-0-3.start9labs.com/; # TODO
}
location / {

View File

@@ -14,7 +14,7 @@ server {
proxy_http_version 1.1;
location /rpc/ {
proxy_pass http://localhost:5959/;
proxy_pass http://127.0.0.1:5959/;
}
location / {

View File

@@ -14,7 +14,7 @@ server {
proxy_http_version 1.1;
location /rpc/ {
proxy_pass http://localhost:5959/;
proxy_pass http://127.0.0.1:5959/;
}
location / {

195
appmgr/src/util/io.rs Normal file
View File

@@ -0,0 +1,195 @@
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
use crate::ResultExt;
#[derive(Clone, Debug)]
pub struct AsyncCompat<T>(pub T);
impl<T> futures::io::AsyncRead for AsyncCompat<T>
where
T: tokio::io::AsyncRead,
{
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut [u8],
) -> std::task::Poll<std::io::Result<usize>> {
let mut read_buf = ReadBuf::new(buf);
tokio::io::AsyncRead::poll_read(
unsafe { self.map_unchecked_mut(|a| &mut a.0) },
cx,
&mut read_buf,
)
.map(|res| res.map(|_| read_buf.filled().len()))
}
}
impl<T> tokio::io::AsyncRead for AsyncCompat<T>
where
T: futures::io::AsyncRead,
{
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut ReadBuf,
) -> std::task::Poll<std::io::Result<()>> {
futures::io::AsyncRead::poll_read(
unsafe { self.map_unchecked_mut(|a| &mut a.0) },
cx,
buf.initialize_unfilled(),
)
.map(|res| res.map(|len| buf.set_filled(len)))
}
}
impl<T> futures::io::AsyncWrite for AsyncCompat<T>
where
T: tokio::io::AsyncWrite,
{
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<std::io::Result<usize>> {
tokio::io::AsyncWrite::poll_write(unsafe { self.map_unchecked_mut(|a| &mut a.0) }, cx, buf)
}
fn poll_flush(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
tokio::io::AsyncWrite::poll_flush(unsafe { self.map_unchecked_mut(|a| &mut a.0) }, cx)
}
fn poll_close(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
tokio::io::AsyncWrite::poll_shutdown(unsafe { self.map_unchecked_mut(|a| &mut a.0) }, cx)
}
}
impl<T> tokio::io::AsyncWrite for AsyncCompat<T>
where
T: futures::io::AsyncWrite,
{
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<std::io::Result<usize>> {
futures::io::AsyncWrite::poll_write(
unsafe { self.map_unchecked_mut(|a| &mut a.0) },
cx,
buf,
)
}
fn poll_flush(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
futures::io::AsyncWrite::poll_flush(unsafe { self.map_unchecked_mut(|a| &mut a.0) }, cx)
}
fn poll_shutdown(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
futures::io::AsyncWrite::poll_close(unsafe { self.map_unchecked_mut(|a| &mut a.0) }, cx)
}
}
pub async fn from_yaml_async_reader<T, R>(mut reader: R) -> Result<T, crate::Error>
where
T: for<'de> serde::Deserialize<'de>,
R: AsyncRead + Unpin,
{
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer).await?;
serde_yaml::from_slice(&buffer)
.map_err(anyhow::Error::from)
.with_kind(crate::ErrorKind::Deserialization)
}
pub async fn to_yaml_async_writer<T, W>(mut writer: W, value: &T) -> Result<(), crate::Error>
where
T: serde::Serialize,
W: AsyncWrite + Unpin,
{
let mut buffer = serde_yaml::to_vec(value).with_kind(crate::ErrorKind::Serialization)?;
buffer.extend_from_slice(b"\n");
writer.write_all(&buffer).await?;
Ok(())
}
pub async fn from_toml_async_reader<T, R>(mut reader: R) -> Result<T, crate::Error>
where
T: for<'de> serde::Deserialize<'de>,
R: AsyncRead + Unpin,
{
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer).await?;
serde_toml::from_slice(&buffer)
.map_err(anyhow::Error::from)
.with_kind(crate::ErrorKind::Deserialization)
}
pub async fn to_toml_async_writer<T, W>(mut writer: W, value: &T) -> Result<(), crate::Error>
where
T: serde::Serialize,
W: AsyncWrite + Unpin,
{
let mut buffer = serde_toml::to_vec(value).with_kind(crate::ErrorKind::Serialization)?;
buffer.extend_from_slice(b"\n");
writer.write_all(&buffer).await?;
Ok(())
}
pub async fn from_cbor_async_reader<T, R>(mut reader: R) -> Result<T, crate::Error>
where
T: for<'de> serde::Deserialize<'de>,
R: AsyncRead + Unpin,
{
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer).await?;
serde_cbor::de::from_reader(buffer.as_slice())
.map_err(anyhow::Error::from)
.with_kind(crate::ErrorKind::Deserialization)
}
pub async fn from_json_async_reader<T, R>(mut reader: R) -> Result<T, crate::Error>
where
T: for<'de> serde::Deserialize<'de>,
R: AsyncRead + Unpin,
{
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer).await?;
serde_json::from_slice(&buffer)
.map_err(anyhow::Error::from)
.with_kind(crate::ErrorKind::Deserialization)
}
pub async fn to_json_async_writer<T, W>(mut writer: W, value: &T) -> Result<(), crate::Error>
where
T: serde::Serialize,
W: AsyncWrite + Unpin,
{
let buffer = serde_json::to_string(value).with_kind(crate::ErrorKind::Serialization)?;
writer.write_all(&buffer.as_bytes()).await?;
Ok(())
}
pub async fn to_json_pretty_async_writer<T, W>(mut writer: W, value: &T) -> Result<(), crate::Error>
where
T: serde::Serialize,
W: AsyncWrite + Unpin,
{
let mut buffer =
serde_json::to_string_pretty(value).with_kind(crate::ErrorKind::Serialization)?;
buffer.push_str("\n");
writer.write_all(&buffer.as_bytes()).await?;
Ok(())
}
pub async fn copy_and_shutdown<R: AsyncRead + Unpin, W: AsyncWrite + Unpin>(
r: &mut R,
mut w: W,
) -> Result<(), std::io::Error> {
tokio::io::copy(r, &mut w).await?;
w.flush().await?;
w.shutdown().await?;
Ok(())
}

View File

@@ -14,13 +14,13 @@ use digest::Digest;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_json::Value;
use tokio::fs::File;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
use tokio::sync::RwLock;
use tokio::task::{JoinError, JoinHandle};
use crate::shutdown::Shutdown;
use crate::{Error, ResultExt as _};
pub mod io;
pub mod logger;
#[derive(Clone, Copy, Debug)]
@@ -38,188 +38,6 @@ impl std::fmt::Display for Never {
}
impl std::error::Error for Never {}
#[derive(Clone, Debug)]
pub struct AsyncCompat<T>(pub T);
impl<T> futures::io::AsyncRead for AsyncCompat<T>
where
T: tokio::io::AsyncRead,
{
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut [u8],
) -> std::task::Poll<std::io::Result<usize>> {
let mut read_buf = ReadBuf::new(buf);
tokio::io::AsyncRead::poll_read(
unsafe { self.map_unchecked_mut(|a| &mut a.0) },
cx,
&mut read_buf,
)
.map(|res| res.map(|_| read_buf.filled().len()))
}
}
impl<T> tokio::io::AsyncRead for AsyncCompat<T>
where
T: futures::io::AsyncRead,
{
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut ReadBuf,
) -> std::task::Poll<std::io::Result<()>> {
futures::io::AsyncRead::poll_read(
unsafe { self.map_unchecked_mut(|a| &mut a.0) },
cx,
buf.initialize_unfilled(),
)
.map(|res| res.map(|len| buf.set_filled(len)))
}
}
impl<T> futures::io::AsyncWrite for AsyncCompat<T>
where
T: tokio::io::AsyncWrite,
{
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<std::io::Result<usize>> {
tokio::io::AsyncWrite::poll_write(unsafe { self.map_unchecked_mut(|a| &mut a.0) }, cx, buf)
}
fn poll_flush(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
tokio::io::AsyncWrite::poll_flush(unsafe { self.map_unchecked_mut(|a| &mut a.0) }, cx)
}
fn poll_close(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
tokio::io::AsyncWrite::poll_shutdown(unsafe { self.map_unchecked_mut(|a| &mut a.0) }, cx)
}
}
impl<T> tokio::io::AsyncWrite for AsyncCompat<T>
where
T: futures::io::AsyncWrite,
{
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<std::io::Result<usize>> {
futures::io::AsyncWrite::poll_write(
unsafe { self.map_unchecked_mut(|a| &mut a.0) },
cx,
buf,
)
}
fn poll_flush(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
futures::io::AsyncWrite::poll_flush(unsafe { self.map_unchecked_mut(|a| &mut a.0) }, cx)
}
fn poll_shutdown(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
futures::io::AsyncWrite::poll_close(unsafe { self.map_unchecked_mut(|a| &mut a.0) }, cx)
}
}
pub async fn from_yaml_async_reader<T, R>(mut reader: R) -> Result<T, crate::Error>
where
T: for<'de> serde::Deserialize<'de>,
R: AsyncRead + Unpin,
{
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer).await?;
serde_yaml::from_slice(&buffer)
.map_err(anyhow::Error::from)
.with_kind(crate::ErrorKind::Deserialization)
}
pub async fn to_yaml_async_writer<T, W>(mut writer: W, value: &T) -> Result<(), crate::Error>
where
T: serde::Serialize,
W: AsyncWrite + Unpin,
{
let mut buffer = serde_yaml::to_vec(value).with_kind(crate::ErrorKind::Serialization)?;
buffer.extend_from_slice(b"\n");
writer.write_all(&buffer).await?;
Ok(())
}
pub async fn from_toml_async_reader<T, R>(mut reader: R) -> Result<T, crate::Error>
where
T: for<'de> serde::Deserialize<'de>,
R: AsyncRead + Unpin,
{
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer).await?;
serde_toml::from_slice(&buffer)
.map_err(anyhow::Error::from)
.with_kind(crate::ErrorKind::Deserialization)
}
pub async fn to_toml_async_writer<T, W>(mut writer: W, value: &T) -> Result<(), crate::Error>
where
T: serde::Serialize,
W: AsyncWrite + Unpin,
{
let mut buffer = serde_toml::to_vec(value).with_kind(crate::ErrorKind::Serialization)?;
buffer.extend_from_slice(b"\n");
writer.write_all(&buffer).await?;
Ok(())
}
pub async fn from_cbor_async_reader<T, R>(mut reader: R) -> Result<T, crate::Error>
where
T: for<'de> serde::Deserialize<'de>,
R: AsyncRead + Unpin,
{
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer).await?;
serde_cbor::de::from_reader(buffer.as_slice())
.map_err(anyhow::Error::from)
.with_kind(crate::ErrorKind::Deserialization)
}
pub async fn from_json_async_reader<T, R>(mut reader: R) -> Result<T, crate::Error>
where
T: for<'de> serde::Deserialize<'de>,
R: AsyncRead + Unpin,
{
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer).await?;
serde_json::from_slice(&buffer)
.map_err(anyhow::Error::from)
.with_kind(crate::ErrorKind::Deserialization)
}
pub async fn to_json_async_writer<T, W>(mut writer: W, value: &T) -> Result<(), crate::Error>
where
T: serde::Serialize,
W: AsyncWrite + Unpin,
{
let buffer = serde_json::to_string(value).with_kind(crate::ErrorKind::Serialization)?;
writer.write_all(&buffer.as_bytes()).await?;
Ok(())
}
pub async fn to_json_pretty_async_writer<T, W>(mut writer: W, value: &T) -> Result<(), crate::Error>
where
T: serde::Serialize,
W: AsyncWrite + Unpin,
{
let mut buffer =
serde_json::to_string_pretty(value).with_kind(crate::ErrorKind::Serialization)?;
buffer.push_str("\n");
writer.write_all(&buffer.as_bytes()).await?;
Ok(())
}
#[async_trait::async_trait]
pub trait Invoke {
async fn invoke(&mut self, error_kind: crate::ErrorKind) -> Result<Vec<u8>, Error>;