setup flow fixes

This commit is contained in:
Aiden McClelland
2021-09-14 22:16:28 -06:00
committed by Aiden McClelland
parent 81b83782e3
commit 55b003cb72
13 changed files with 445 additions and 221 deletions

View File

@@ -47,4 +47,4 @@ CREATE TABLE IF NOT EXISTS notifications
title TEXT NOT NULL,
message TEXT NOT NULL,
data TEXT
)
);

View File

@@ -20,6 +20,16 @@
"nullable": []
}
},
"177c4b9cc7901a3b906e5969b86b1c11e6acbfb8e86e98f197d7333030b17964": {
"query": "DELETE FROM notifications WHERE id = ?",
"describe": {
"columns": [],
"parameters": {
"Right": 1
},
"nullable": []
}
},
"3502e58f2ab48fb4566d21c920c096f81acfa3ff0d02f970626a4dcd67bac71d": {
"query": "SELECT tor_key FROM account",
"describe": {
@@ -162,6 +172,76 @@
"nullable": []
}
},
"65e6c3fbb138da5cf385af096fdd3c062b6e826e12a8a4b23e16fcc773004c29": {
"query": "SELECT id, package_id, created_at, code, level, title, message, data FROM notifications WHERE id < ? ORDER BY id DESC LIMIT ?",
"describe": {
"columns": [
{
"name": "id",
"ordinal": 0,
"type_info": "Int64"
},
{
"name": "package_id",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "created_at",
"ordinal": 2,
"type_info": "Datetime"
},
{
"name": "code",
"ordinal": 3,
"type_info": "Int64"
},
{
"name": "level",
"ordinal": 4,
"type_info": "Text"
},
{
"name": "title",
"ordinal": 5,
"type_info": "Text"
},
{
"name": "message",
"ordinal": 6,
"type_info": "Text"
},
{
"name": "data",
"ordinal": 7,
"type_info": "Text"
}
],
"parameters": {
"Right": 2
},
"nullable": [
false,
true,
false,
false,
false,
false,
false,
true
]
}
},
"6b9abc9e079cff975f8a7f07ff70548c7877ecae3be0d0f2d3f439a6713326c0": {
"query": "DELETE FROM notifications WHERE id < ?",
"describe": {
"columns": [],
"parameters": {
"Right": 1
},
"nullable": []
}
},
"6c96d76bffcc5f03290d8d8544a58521345ed2a843a509b17bbcd6257bb81821": {
"query": "SELECT priv_key_pem, certificate_pem FROM certificates WHERE id = 1;",
"describe": {
@@ -230,16 +310,6 @@
]
}
},
"8c87b09f997838b7e5be48ebb202d97eb9d512a51c6e67b1fb7df33abd79a0af": {
"query": "-- Add migration script here\nCREATE TABLE IF NOT EXISTS tor\n(\n package TEXT NOT NULL,\n interface TEXT NOT NULL,\n key BLOB NOT NULL CHECK (length(key) = 64),\n PRIMARY KEY (package, interface)\n);\nCREATE TABLE IF NOT EXISTS session\n(\n id TEXT NOT NULL PRIMARY KEY,\n logged_in TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n logged_out TIMESTAMP,\n last_active TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n user_agent TEXT,\n metadata TEXT NOT NULL DEFAULT 'null'\n);\nCREATE TABLE IF NOT EXISTS account\n(\n id INTEGER PRIMARY KEY CHECK (id = 0),\n password TEXT NOT NULL,\n tor_key BLOB NOT NULL CHECK (length(tor_key) = 64)\n);\nCREATE TABLE IF NOT EXISTS ssh_keys\n(\n fingerprint TEXT NOT NULL,\n openssh_pubkey TEXT NOT NULL,\n created_at TEXT NOT NULL,\n PRIMARY KEY (fingerprint)\n);\nCREATE TABLE IF NOT EXISTS certificates\n(\n id INTEGER PRIMARY KEY, -- Root = 0, Int = 1, Other = 2..\n priv_key_pem TEXT NOT NULL,\n certificate_pem TEXT NOT NULL,\n lookup_string TEXT UNIQUE,\n created_at TEXT,\n updated_at TEXT\n);",
"describe": {
"columns": [],
"parameters": {
"Right": 0
},
"nullable": []
}
},
"9496e17a73672ac3675e02efa7c4bf8bd479b866c0d31fa1e3a85ef159310a57": {
"query": "SELECT priv_key_pem, certificate_pem FROM certificates WHERE lookup_string = ?",
"describe": {
@@ -274,6 +344,16 @@
"nullable": []
}
},
"a596bdc5014ba9e7b362398abf09ec6a100923e001247a79503d1e820ffe71c3": {
"query": "-- Add migration script here\nCREATE TABLE IF NOT EXISTS tor\n(\n package TEXT NOT NULL,\n interface TEXT NOT NULL,\n key BLOB NOT NULL CHECK (length(key) = 64),\n PRIMARY KEY (package, interface)\n);\nCREATE TABLE IF NOT EXISTS session\n(\n id TEXT NOT NULL PRIMARY KEY,\n logged_in TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n logged_out TIMESTAMP,\n last_active TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n user_agent TEXT,\n metadata TEXT NOT NULL DEFAULT 'null'\n);\nCREATE TABLE IF NOT EXISTS account\n(\n id INTEGER PRIMARY KEY CHECK (id = 0),\n password TEXT NOT NULL,\n tor_key BLOB NOT NULL CHECK (length(tor_key) = 64)\n);\nCREATE TABLE IF NOT EXISTS ssh_keys\n(\n fingerprint TEXT NOT NULL,\n openssh_pubkey TEXT NOT NULL,\n created_at TEXT NOT NULL,\n PRIMARY KEY (fingerprint)\n);\nCREATE TABLE IF NOT EXISTS certificates\n(\n id INTEGER PRIMARY KEY, -- Root = 0, Int = 1, Other = 2..\n priv_key_pem TEXT NOT NULL,\n certificate_pem TEXT NOT NULL,\n lookup_string TEXT UNIQUE,\n created_at TEXT,\n updated_at TEXT\n);\nCREATE TABLE IF NOT EXISTS notifications\n(\n id INTEGER PRIMARY KEY,\n package_id TEXT,\n created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n code INTEGER NOT NULL,\n level TEXT NOT NULL,\n title TEXT NOT NULL,\n message TEXT NOT NULL,\n data TEXT\n);",
"describe": {
"columns": [],
"parameters": {
"Right": 0
},
"nullable": []
}
},
"a6b0c8909a3a5d6d9156aebfb359424e6b5a1d1402e028219e21726f1ebd282e": {
"query": "SELECT fingerprint, openssh_pubkey, created_at FROM ssh_keys",
"describe": {
@@ -304,6 +384,66 @@
]
}
},
"abfdeea8cd10343b85f647d7abc5dc3bd0b5891101b143485938192ee3b8c907": {
"query": "SELECT id, package_id, created_at, code, level, title, message, data FROM notifications ORDER BY id DESC LIMIT ?",
"describe": {
"columns": [
{
"name": "id",
"ordinal": 0,
"type_info": "Int64"
},
{
"name": "package_id",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "created_at",
"ordinal": 2,
"type_info": "Datetime"
},
{
"name": "code",
"ordinal": 3,
"type_info": "Int64"
},
{
"name": "level",
"ordinal": 4,
"type_info": "Text"
},
{
"name": "title",
"ordinal": 5,
"type_info": "Text"
},
{
"name": "message",
"ordinal": 6,
"type_info": "Text"
},
{
"name": "data",
"ordinal": 7,
"type_info": "Text"
}
],
"parameters": {
"Right": 1
},
"nullable": [
false,
true,
false,
false,
false,
false,
false,
true
]
}
},
"d5117054072476377f3c4f040ea429d4c9b2cf534e76f35c80a2bf60e8599cca": {
"query": "SELECT openssh_pubkey FROM ssh_keys",
"describe": {
@@ -322,6 +462,16 @@
]
}
},
"d54bd5b53f8c760e1f8cde604aa8b1bdc66e4e025a636bc44ffbcd788b5168fd": {
"query": "INSERT INTO notifications (package_id, code, level, title, message, data) VALUES (?, ?, ?, ?, ?, ?)",
"describe": {
"columns": [],
"parameters": {
"Right": 6
},
"nullable": []
}
},
"d79d608ceb862c15b741a6040044c6dd54a837a3a0c5594d15a6041c7bc68ea8": {
"query": "INSERT OR IGNORE INTO tor (package, interface, key) VALUES (?, ?, ?)",
"describe": {

View File

@@ -10,7 +10,8 @@ use tokio::fs::File;
use tokio::sync::broadcast::Sender;
use url::Host;
use crate::util::{from_toml_async_reader, AsyncFileExt};
use crate::util::io::from_toml_async_reader;
use crate::util::AsyncFileExt;
use crate::{Error, ResultExt};
#[derive(Debug, Default, Deserialize)]

View File

@@ -26,8 +26,9 @@ use crate::manager::ManagerMap;
use crate::net::tor::os_key;
use crate::net::NetController;
use crate::shutdown::Shutdown;
use crate::util::io::from_toml_async_reader;
use crate::util::logger::EmbassyLogger;
use crate::util::{from_toml_async_reader, AsyncFileExt};
use crate::util::AsyncFileExt;
use crate::{Error, ResultExt};
#[derive(Debug, Default, Deserialize)]

View File

@@ -17,7 +17,8 @@ use url::Host;
use crate::db::model::Database;
use crate::hostname::{get_hostname, get_id};
use crate::net::tor::os_key;
use crate::util::{from_toml_async_reader, AsyncFileExt};
use crate::util::io::from_toml_async_reader;
use crate::util::AsyncFileExt;
use crate::{Error, ResultExt};
#[derive(Debug, Default, Deserialize)]

View File

@@ -9,7 +9,8 @@ use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::process::Command;
use crate::util::{from_yaml_async_reader, GeneralGuard, Invoke, Version};
use crate::util::io::from_yaml_async_reader;
use crate::util::{GeneralGuard, Invoke, Version};
use crate::{Error, ResultExt as _};
pub const TMP_MOUNTPOINT: &'static str = "/media/embassy-os";
@@ -150,12 +151,24 @@ pub async fn list() -> Result<Vec<DiskInfo>, Error> {
} else {
(disk_path, None)
};
let disk = tokio::fs::canonicalize(Path::new(DISK_PATH).join(disk_path)).await?;
let disk_path = Path::new(DISK_PATH).join(disk_path);
let disk = tokio::fs::canonicalize(&disk_path).await.with_ctx(|_| {
(
crate::ErrorKind::Filesystem,
disk_path.display().to_string(),
)
})?;
if !disks.contains_key(&disk) {
disks.insert(disk.clone(), IndexSet::new());
}
if let Some(part_path) = part_path {
let part = tokio::fs::canonicalize(part_path).await?;
let part_path = Path::new(DISK_PATH).join(disk_path);
let part = tokio::fs::canonicalize(&part_path).await.with_ctx(|_| {
(
crate::ErrorKind::Filesystem,
part_path.display().to_string(),
)
})?;
disks[&disk].insert(part);
}
}

View File

@@ -14,7 +14,8 @@ use patch_db::DbHandle;
use reqwest::Response;
use rpc_toolkit::command;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt, AsyncWriteExt};
use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt};
use tokio::process::Command;
use self::cleanup::cleanup_failed;
use crate::context::RpcContext;
@@ -29,6 +30,7 @@ use crate::install::progress::{InstallProgress, InstallProgressTracker};
use crate::s9pk::manifest::{Manifest, PackageId};
use crate::s9pk::reader::S9pkReader;
use crate::status::{DependencyErrors, MainStatus, Status};
use crate::util::io::copy_and_shutdown;
use crate::util::{display_none, AsyncFileExt, Version};
use crate::volume::asset_dir;
use crate::{Error, ResultExt};
@@ -38,6 +40,8 @@ pub mod progress;
pub const PKG_CACHE: &'static str = "package-data/cache";
pub const PKG_PUBLIC_DIR: &'static str = "package-data/public";
pub const PKG_DOCKER_DIR: &'static str = "package-data/docker";
pub const PKG_WASM_DIR: &'static str = "package-data/wasm";
#[command(display(display_none))]
pub async fn install(
@@ -418,21 +422,53 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin>(
log::info!("Install {}@{}: Unpacking Docker Images", pkg_id, version);
progress
.track_read_during(progress_model.clone(), &ctx.db, || async {
let mut load = tokio::process::Command::new("docker")
let image_tar_dir = Path::new(PKG_DOCKER_DIR)
.join(pkg_id)
.join(version.as_str());
if tokio::fs::metadata(&image_tar_dir).await.is_err() {
tokio::fs::create_dir_all(&image_tar_dir)
.await
.with_ctx(|_| {
(
crate::ErrorKind::Filesystem,
image_tar_dir.display().to_string(),
)
})?;
}
let image_tar_path = image_tar_dir.join("image.tar");
let mut tee = Command::new("tee")
.arg(&image_tar_path)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()?;
let mut load = Command::new("docker")
.arg("load")
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
let mut dst = load.stdin.take().ok_or_else(|| {
let tee_in = tee.stdin.take().ok_or_else(|| {
Error::new(
anyhow!("Could not write to stdin of tee"),
crate::ErrorKind::Docker,
)
})?;
let mut tee_out = tee.stdout.take().ok_or_else(|| {
Error::new(
anyhow!("Could not read from stdout of tee"),
crate::ErrorKind::Docker,
)
})?;
let load_in = load.stdin.take().ok_or_else(|| {
Error::new(
anyhow!("Could not write to stdin of docker load"),
crate::ErrorKind::Docker,
)
})?;
tokio::io::copy(&mut rdr.docker_images().await?, &mut dst).await?;
dst.flush().await?;
dst.shutdown().await?;
drop(dst);
let mut docker_rdr = rdr.docker_images().await?;
tokio::try_join!(
copy_and_shutdown(&mut docker_rdr, tee_in),
copy_and_shutdown(&mut tee_out, load_in),
)?;
let res = load.wait_with_output().await?;
if !res.status.success() {
Err(Error::new(

View File

@@ -71,12 +71,15 @@ impl Stream for DecryptStream {
pbkdf2::<Hmac<Sha256>>(
this.key.as_bytes(),
&this.salt,
100_000,
1000,
aeskey.as_mut_slice(),
);
let ctr = Nonce::<Aes256Ctr>::from_slice(&this.ctr);
*this.aes = Some(Aes256Ctr::new(&aeskey, &ctr));
buf.to_vec().into()
let mut aes = Aes256Ctr::new(dbg!(&aeskey), dbg!(&ctr));
let mut res = buf.to_vec();
aes.apply_keystream(&mut res);
*this.aes = Some(aes);
res.into()
} else {
hyper::body::Bytes::new()
}
@@ -98,12 +101,7 @@ impl EncryptStream {
pub fn new(key: &str, body: Body) -> Self {
let prefix: [u8; 32] = rand::random();
let mut aeskey = CipherKey::<Aes256Ctr>::default();
pbkdf2::<Hmac<Sha256>>(
key.as_bytes(),
&prefix[16..],
100_000,
aeskey.as_mut_slice(),
);
pbkdf2::<Hmac<Sha256>>(key.as_bytes(), &prefix[16..], 1000, aeskey.as_mut_slice());
let ctr = Nonce::<Aes256Ctr>::from_slice(&prefix[..16]);
let aes = Aes256Ctr::new(&aeskey, &ctr);
EncryptStream {
@@ -189,6 +187,17 @@ pub fn encrypt<M: Metadata>(key: Arc<String>) -> DynMiddleware<M> {
"Content-Encoding",
HeaderValue::from_static("aesctr256"),
);
if let Some(len_header) =
res.headers_mut().get_mut("Content-Length")
{
if let Some(len) = len_header
.to_str()
.ok()
.and_then(|l| l.parse::<u64>().ok())
{
*len_header = HeaderValue::from(len + 32);
}
}
let body = std::mem::take(res.body_mut());
*res.body_mut() = Body::wrap_stream(
EncryptStream::new(&*key, body),

View File

@@ -14,17 +14,17 @@ server {
proxy_http_version 1.1;
location /rpc/ {
proxy_pass http://localhost:5959/;
proxy_pass http://127.0.0.1:5959/;
}
location /ws/ {
proxy_pass http://localhost:5960/;
proxy_pass http://127.0.0.1:5960/;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
}
location /marketplace/ {
proxy_pass https://beta-registry-0-3.start9labs.com/;
proxy_pass https://beta-registry-0-3.start9labs.com/; # TODO
}
location / {

View File

@@ -14,7 +14,7 @@ server {
proxy_http_version 1.1;
location /rpc/ {
proxy_pass http://localhost:5959/;
proxy_pass http://127.0.0.1:5959/;
}
location / {

View File

@@ -14,7 +14,7 @@ server {
proxy_http_version 1.1;
location /rpc/ {
proxy_pass http://localhost:5959/;
proxy_pass http://127.0.0.1:5959/;
}
location / {

195
appmgr/src/util/io.rs Normal file
View File

@@ -0,0 +1,195 @@
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
use crate::ResultExt;
#[derive(Clone, Debug)]
pub struct AsyncCompat<T>(pub T);
impl<T> futures::io::AsyncRead for AsyncCompat<T>
where
T: tokio::io::AsyncRead,
{
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut [u8],
) -> std::task::Poll<std::io::Result<usize>> {
let mut read_buf = ReadBuf::new(buf);
tokio::io::AsyncRead::poll_read(
unsafe { self.map_unchecked_mut(|a| &mut a.0) },
cx,
&mut read_buf,
)
.map(|res| res.map(|_| read_buf.filled().len()))
}
}
impl<T> tokio::io::AsyncRead for AsyncCompat<T>
where
T: futures::io::AsyncRead,
{
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut ReadBuf,
) -> std::task::Poll<std::io::Result<()>> {
futures::io::AsyncRead::poll_read(
unsafe { self.map_unchecked_mut(|a| &mut a.0) },
cx,
buf.initialize_unfilled(),
)
.map(|res| res.map(|len| buf.set_filled(len)))
}
}
impl<T> futures::io::AsyncWrite for AsyncCompat<T>
where
T: tokio::io::AsyncWrite,
{
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<std::io::Result<usize>> {
tokio::io::AsyncWrite::poll_write(unsafe { self.map_unchecked_mut(|a| &mut a.0) }, cx, buf)
}
fn poll_flush(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
tokio::io::AsyncWrite::poll_flush(unsafe { self.map_unchecked_mut(|a| &mut a.0) }, cx)
}
fn poll_close(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
tokio::io::AsyncWrite::poll_shutdown(unsafe { self.map_unchecked_mut(|a| &mut a.0) }, cx)
}
}
impl<T> tokio::io::AsyncWrite for AsyncCompat<T>
where
T: futures::io::AsyncWrite,
{
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<std::io::Result<usize>> {
futures::io::AsyncWrite::poll_write(
unsafe { self.map_unchecked_mut(|a| &mut a.0) },
cx,
buf,
)
}
fn poll_flush(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
futures::io::AsyncWrite::poll_flush(unsafe { self.map_unchecked_mut(|a| &mut a.0) }, cx)
}
fn poll_shutdown(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
futures::io::AsyncWrite::poll_close(unsafe { self.map_unchecked_mut(|a| &mut a.0) }, cx)
}
}
pub async fn from_yaml_async_reader<T, R>(mut reader: R) -> Result<T, crate::Error>
where
T: for<'de> serde::Deserialize<'de>,
R: AsyncRead + Unpin,
{
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer).await?;
serde_yaml::from_slice(&buffer)
.map_err(anyhow::Error::from)
.with_kind(crate::ErrorKind::Deserialization)
}
pub async fn to_yaml_async_writer<T, W>(mut writer: W, value: &T) -> Result<(), crate::Error>
where
T: serde::Serialize,
W: AsyncWrite + Unpin,
{
let mut buffer = serde_yaml::to_vec(value).with_kind(crate::ErrorKind::Serialization)?;
buffer.extend_from_slice(b"\n");
writer.write_all(&buffer).await?;
Ok(())
}
pub async fn from_toml_async_reader<T, R>(mut reader: R) -> Result<T, crate::Error>
where
T: for<'de> serde::Deserialize<'de>,
R: AsyncRead + Unpin,
{
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer).await?;
serde_toml::from_slice(&buffer)
.map_err(anyhow::Error::from)
.with_kind(crate::ErrorKind::Deserialization)
}
pub async fn to_toml_async_writer<T, W>(mut writer: W, value: &T) -> Result<(), crate::Error>
where
T: serde::Serialize,
W: AsyncWrite + Unpin,
{
let mut buffer = serde_toml::to_vec(value).with_kind(crate::ErrorKind::Serialization)?;
buffer.extend_from_slice(b"\n");
writer.write_all(&buffer).await?;
Ok(())
}
pub async fn from_cbor_async_reader<T, R>(mut reader: R) -> Result<T, crate::Error>
where
T: for<'de> serde::Deserialize<'de>,
R: AsyncRead + Unpin,
{
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer).await?;
serde_cbor::de::from_reader(buffer.as_slice())
.map_err(anyhow::Error::from)
.with_kind(crate::ErrorKind::Deserialization)
}
pub async fn from_json_async_reader<T, R>(mut reader: R) -> Result<T, crate::Error>
where
T: for<'de> serde::Deserialize<'de>,
R: AsyncRead + Unpin,
{
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer).await?;
serde_json::from_slice(&buffer)
.map_err(anyhow::Error::from)
.with_kind(crate::ErrorKind::Deserialization)
}
pub async fn to_json_async_writer<T, W>(mut writer: W, value: &T) -> Result<(), crate::Error>
where
T: serde::Serialize,
W: AsyncWrite + Unpin,
{
let buffer = serde_json::to_string(value).with_kind(crate::ErrorKind::Serialization)?;
writer.write_all(&buffer.as_bytes()).await?;
Ok(())
}
pub async fn to_json_pretty_async_writer<T, W>(mut writer: W, value: &T) -> Result<(), crate::Error>
where
T: serde::Serialize,
W: AsyncWrite + Unpin,
{
let mut buffer =
serde_json::to_string_pretty(value).with_kind(crate::ErrorKind::Serialization)?;
buffer.push_str("\n");
writer.write_all(&buffer.as_bytes()).await?;
Ok(())
}
pub async fn copy_and_shutdown<R: AsyncRead + Unpin, W: AsyncWrite + Unpin>(
r: &mut R,
mut w: W,
) -> Result<(), std::io::Error> {
tokio::io::copy(r, &mut w).await?;
w.flush().await?;
w.shutdown().await?;
Ok(())
}

View File

@@ -14,13 +14,13 @@ use digest::Digest;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_json::Value;
use tokio::fs::File;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
use tokio::sync::RwLock;
use tokio::task::{JoinError, JoinHandle};
use crate::shutdown::Shutdown;
use crate::{Error, ResultExt as _};
pub mod io;
pub mod logger;
#[derive(Clone, Copy, Debug)]
@@ -38,188 +38,6 @@ impl std::fmt::Display for Never {
}
impl std::error::Error for Never {}
#[derive(Clone, Debug)]
pub struct AsyncCompat<T>(pub T);
impl<T> futures::io::AsyncRead for AsyncCompat<T>
where
T: tokio::io::AsyncRead,
{
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut [u8],
) -> std::task::Poll<std::io::Result<usize>> {
let mut read_buf = ReadBuf::new(buf);
tokio::io::AsyncRead::poll_read(
unsafe { self.map_unchecked_mut(|a| &mut a.0) },
cx,
&mut read_buf,
)
.map(|res| res.map(|_| read_buf.filled().len()))
}
}
impl<T> tokio::io::AsyncRead for AsyncCompat<T>
where
T: futures::io::AsyncRead,
{
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut ReadBuf,
) -> std::task::Poll<std::io::Result<()>> {
futures::io::AsyncRead::poll_read(
unsafe { self.map_unchecked_mut(|a| &mut a.0) },
cx,
buf.initialize_unfilled(),
)
.map(|res| res.map(|len| buf.set_filled(len)))
}
}
impl<T> futures::io::AsyncWrite for AsyncCompat<T>
where
T: tokio::io::AsyncWrite,
{
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<std::io::Result<usize>> {
tokio::io::AsyncWrite::poll_write(unsafe { self.map_unchecked_mut(|a| &mut a.0) }, cx, buf)
}
fn poll_flush(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
tokio::io::AsyncWrite::poll_flush(unsafe { self.map_unchecked_mut(|a| &mut a.0) }, cx)
}
fn poll_close(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
tokio::io::AsyncWrite::poll_shutdown(unsafe { self.map_unchecked_mut(|a| &mut a.0) }, cx)
}
}
impl<T> tokio::io::AsyncWrite for AsyncCompat<T>
where
T: futures::io::AsyncWrite,
{
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<std::io::Result<usize>> {
futures::io::AsyncWrite::poll_write(
unsafe { self.map_unchecked_mut(|a| &mut a.0) },
cx,
buf,
)
}
fn poll_flush(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
futures::io::AsyncWrite::poll_flush(unsafe { self.map_unchecked_mut(|a| &mut a.0) }, cx)
}
fn poll_shutdown(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
futures::io::AsyncWrite::poll_close(unsafe { self.map_unchecked_mut(|a| &mut a.0) }, cx)
}
}
pub async fn from_yaml_async_reader<T, R>(mut reader: R) -> Result<T, crate::Error>
where
T: for<'de> serde::Deserialize<'de>,
R: AsyncRead + Unpin,
{
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer).await?;
serde_yaml::from_slice(&buffer)
.map_err(anyhow::Error::from)
.with_kind(crate::ErrorKind::Deserialization)
}
pub async fn to_yaml_async_writer<T, W>(mut writer: W, value: &T) -> Result<(), crate::Error>
where
T: serde::Serialize,
W: AsyncWrite + Unpin,
{
let mut buffer = serde_yaml::to_vec(value).with_kind(crate::ErrorKind::Serialization)?;
buffer.extend_from_slice(b"\n");
writer.write_all(&buffer).await?;
Ok(())
}
pub async fn from_toml_async_reader<T, R>(mut reader: R) -> Result<T, crate::Error>
where
T: for<'de> serde::Deserialize<'de>,
R: AsyncRead + Unpin,
{
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer).await?;
serde_toml::from_slice(&buffer)
.map_err(anyhow::Error::from)
.with_kind(crate::ErrorKind::Deserialization)
}
pub async fn to_toml_async_writer<T, W>(mut writer: W, value: &T) -> Result<(), crate::Error>
where
T: serde::Serialize,
W: AsyncWrite + Unpin,
{
let mut buffer = serde_toml::to_vec(value).with_kind(crate::ErrorKind::Serialization)?;
buffer.extend_from_slice(b"\n");
writer.write_all(&buffer).await?;
Ok(())
}
pub async fn from_cbor_async_reader<T, R>(mut reader: R) -> Result<T, crate::Error>
where
T: for<'de> serde::Deserialize<'de>,
R: AsyncRead + Unpin,
{
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer).await?;
serde_cbor::de::from_reader(buffer.as_slice())
.map_err(anyhow::Error::from)
.with_kind(crate::ErrorKind::Deserialization)
}
pub async fn from_json_async_reader<T, R>(mut reader: R) -> Result<T, crate::Error>
where
T: for<'de> serde::Deserialize<'de>,
R: AsyncRead + Unpin,
{
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer).await?;
serde_json::from_slice(&buffer)
.map_err(anyhow::Error::from)
.with_kind(crate::ErrorKind::Deserialization)
}
pub async fn to_json_async_writer<T, W>(mut writer: W, value: &T) -> Result<(), crate::Error>
where
T: serde::Serialize,
W: AsyncWrite + Unpin,
{
let buffer = serde_json::to_string(value).with_kind(crate::ErrorKind::Serialization)?;
writer.write_all(&buffer.as_bytes()).await?;
Ok(())
}
pub async fn to_json_pretty_async_writer<T, W>(mut writer: W, value: &T) -> Result<(), crate::Error>
where
T: serde::Serialize,
W: AsyncWrite + Unpin,
{
let mut buffer =
serde_json::to_string_pretty(value).with_kind(crate::ErrorKind::Serialization)?;
buffer.push_str("\n");
writer.write_all(&buffer.as_bytes()).await?;
Ok(())
}
#[async_trait::async_trait]
pub trait Invoke {
async fn invoke(&mut self, error_kind: crate::ErrorKind) -> Result<Vec<u8>, Error>;