begin setup flow

This commit is contained in:
Aiden McClelland
2021-09-08 16:18:03 -06:00
committed by Aiden McClelland
parent 6c68c7fed9
commit 8f362e7d1e
6 changed files with 312 additions and 9 deletions

View File

@@ -1,7 +1,10 @@
use std::path::Path;
use std::sync::Arc;
use embassy::context::rpc::RpcContextConfig;
use embassy::context::{RecoveryContext, SetupContext};
use embassy::hostname::get_product_key;
use embassy::middleware::encrypt::encrypt;
use embassy::util::Invoke;
use embassy::{Error, ResultExt};
use http::StatusCode;
@@ -14,6 +17,7 @@ fn status_fn(_: i32) -> StatusCode {
async fn init(cfg_path: Option<&str>) -> Result<(), Error> {
let cfg = RpcContextConfig::load(cfg_path).await?;
embassy::disk::util::mount("LABEL=EMBASSY", "/embassy-os").await?;
if tokio::fs::metadata("/embassy-os/disk.guid").await.is_ok() {
embassy::disk::main::load(
&cfg,
@@ -26,11 +30,14 @@ async fn init(cfg_path: Option<&str>) -> Result<(), Error> {
log::info!("Loaded Disk");
} else {
let ctx = SetupContext::init(cfg_path).await?;
let encrypt = encrypt(Arc::new(get_product_key().await?));
rpc_server!({
command: embassy::setup_api,
context: ctx.clone(),
status: status_fn,
middleware: [ ]
middleware: [
encrypt,
]
})
.with_graceful_shutdown({
let mut shutdown = ctx.shutdown.subscribe();
@@ -65,6 +72,11 @@ async fn init(cfg_path: Option<&str>) -> Result<(), Error> {
.invoke(embassy::ErrorKind::Filesystem)
.await?;
embassy::disk::util::bind(&tmp_docker, "/var/lib/docker", false).await?;
Command::new("systemctl")
.arg("restart")
.arg("docker")
.invoke(embassy::ErrorKind::Journald)
.await?;
log::info!("Mounted Docker Data");
embassy::ssh::sync_keys_from_db(&secret_store, "/root/.ssh/authorized_keys").await?;
log::info!("Synced SSH Keys");

View File

@@ -0,0 +1,215 @@
use std::sync::Arc;
use aes::cipher::{CipherKey, NewCipher, Nonce, StreamCipher};
use aes::Aes256Ctr;
use anyhow::anyhow;
use futures::future::BoxFuture;
use futures::{FutureExt, Stream};
use hmac::Hmac;
use http::{HeaderMap, HeaderValue};
use pbkdf2::pbkdf2;
use rpc_toolkit::hyper::http::Error as HttpError;
use rpc_toolkit::hyper::{self, Body, Request, Response, StatusCode};
use rpc_toolkit::rpc_server_helpers::{
to_response, DynMiddleware, DynMiddlewareStage2, DynMiddlewareStage3, DynMiddlewareStage4,
};
use rpc_toolkit::yajrc::RpcMethod;
use rpc_toolkit::Metadata;
use sha2::Sha256;
use crate::util::Apply;
use crate::Error;
#[pin_project::pin_project]
pub struct DecryptStream {
key: Arc<String>,
#[pin]
body: Body,
ctr: Vec<u8>,
salt: Vec<u8>,
aes: Option<Aes256Ctr>,
}
impl DecryptStream {
pub fn new(key: Arc<String>, body: Body) -> Self {
DecryptStream {
key,
body,
ctr: Vec::new(),
salt: Vec::new(),
aes: None,
}
}
}
impl Stream for DecryptStream {
type Item = hyper::Result<hyper::body::Bytes>;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.project();
match this.body.poll_next(cx) {
std::task::Poll::Pending => std::task::Poll::Pending,
std::task::Poll::Ready(Some(Ok(bytes))) => std::task::Poll::Ready(Some(Ok({
let mut buf = &*bytes;
if let Some(aes) = this.aes.as_mut() {
let mut res = buf.to_vec();
aes.apply_keystream(&mut res);
res.into()
} else {
if this.ctr.len() < 16 && buf.len() > 0 {
let to_read = std::cmp::min(16 - this.ctr.len(), buf.len());
this.ctr.extend_from_slice(&buf[0..to_read]);
buf = &buf[to_read..];
}
if this.salt.len() < 16 && buf.len() > 0 {
let to_read = std::cmp::min(16 - this.salt.len(), buf.len());
this.salt.extend_from_slice(&buf[0..to_read]);
buf = &buf[to_read..];
}
if this.ctr.len() == 16 && this.salt.len() == 16 {
let mut aeskey = CipherKey::<Aes256Ctr>::default();
pbkdf2::<Hmac<Sha256>>(
this.key.as_bytes(),
&this.salt,
100_000,
aeskey.as_mut_slice(),
);
let ctr = Nonce::<Aes256Ctr>::from_slice(&this.ctr);
*this.aes = Some(Aes256Ctr::new(&aeskey, &ctr));
buf.to_vec().into()
} else {
hyper::body::Bytes::new()
}
}
}))),
std::task::Poll::Ready(a) => std::task::Poll::Ready(a),
}
}
}
#[pin_project::pin_project]
pub struct EncryptStream {
#[pin]
body: Body,
aes: Aes256Ctr,
prefix: Option<[u8; 32]>,
}
impl EncryptStream {
pub fn new(key: &str, body: Body) -> Self {
let prefix: [u8; 32] = rand::random();
let mut aeskey = CipherKey::<Aes256Ctr>::default();
pbkdf2::<Hmac<Sha256>>(
key.as_bytes(),
&prefix[16..],
100_000,
aeskey.as_mut_slice(),
);
let ctr = Nonce::<Aes256Ctr>::from_slice(&prefix[..16]);
let aes = Aes256Ctr::new(&aeskey, &ctr);
EncryptStream {
body,
aes,
prefix: Some(prefix),
}
}
}
impl Stream for EncryptStream {
type Item = hyper::Result<hyper::body::Bytes>;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.project();
if let Some(prefix) = this.prefix.take() {
std::task::Poll::Ready(Some(Ok(prefix.to_vec().into())))
} else {
match this.body.poll_next(cx) {
std::task::Poll::Pending => std::task::Poll::Pending,
std::task::Poll::Ready(Some(Ok(bytes))) => std::task::Poll::Ready(Some(Ok({
let mut res = bytes.to_vec();
this.aes.apply_keystream(&mut res);
res.into()
}))),
std::task::Poll::Ready(a) => std::task::Poll::Ready(a),
}
}
}
}
fn encrypted(headers: &HeaderMap) -> bool {
headers
.get("Content-Encoding")
.and_then(|h| {
h.to_str()
.ok()?
.split(",")
.any(|s| s == "aesctr256")
.apply(Some)
})
.unwrap_or_default()
}
pub fn encrypt<M: Metadata>(key: Arc<String>) -> DynMiddleware<M> {
Box::new(
move |req: &mut Request<Body>,
metadata: M|
-> BoxFuture<Result<Result<DynMiddlewareStage2, Response<Body>>, HttpError>> {
let key = key.clone();
async move {
let encrypted = encrypted(req.headers());
if encrypted {
let body = std::mem::take(req.body_mut());
*req.body_mut() = Body::wrap_stream(DecryptStream::new(key.clone(), body));
};
let res: DynMiddlewareStage2 = Box::new(move |req, rpc_req| {
async move {
if !encrypted
&& metadata
.get(&rpc_req.method.as_str(), "encrypted")
.unwrap_or_default()
{
let (res_parts, _) = Response::new(()).into_parts();
Ok(Err(to_response(
&req.headers,
res_parts,
Err(Error::new(
anyhow!("Must be encrypted"),
crate::ErrorKind::Authorization,
)
.into()),
|_| StatusCode::OK,
)?))
} else {
let res: DynMiddlewareStage3 = Box::new(move |_, _| {
async move {
let res: DynMiddlewareStage4 = Box::new(move |res| {
async move {
if encrypted {
res.headers_mut().insert(
"Content-Encoding",
HeaderValue::from_static("aesctr256"),
);
let body = std::mem::take(res.body_mut());
*res.body_mut() = Body::wrap_stream(
EncryptStream::new(&*key, body),
);
}
Ok(())
}
.boxed()
});
Ok(Ok(res))
}
.boxed()
});
Ok(Ok(res))
}
}
.boxed()
});
Ok(Ok(res))
}
.boxed()
},
)
}

View File

@@ -1,2 +1,3 @@
pub mod auth;
pub mod cors;
pub mod encrypt;

View File

@@ -347,14 +347,14 @@ pub async fn daemon<F: FnMut() -> Fut, Fut: Future<Output = ()> + Send + 'static
mut shutdown: tokio::sync::broadcast::Receiver<Option<Shutdown>>,
) -> Result<(), anyhow::Error> {
loop {
match tokio::spawn(f()).await {
Err(e) if e.is_panic() => return Err(anyhow!("daemon panicked!")),
_ => (),
}
tokio::select! {
_ = shutdown.recv() => return Ok(()),
_ = tokio::time::sleep(cooldown) => (),
}
match tokio::spawn(f()).await {
Err(e) if e.is_panic() => return Err(anyhow!("daemon panicked!")),
_ => (),
}
}
}