From 93f37cd9d507fd9e828779240eb1d0cc37ef9e00 Mon Sep 17 00:00:00 2001 From: Aiden McClelland Date: Tue, 14 Sep 2021 22:16:28 -0600 Subject: [PATCH] setup flow fixes --- appmgr/migrations/20210629193146_Init.sql | 2 +- appmgr/sqlx-data.json | 170 +++++++++++++++++-- appmgr/src/context/recovery.rs | 3 +- appmgr/src/context/rpc.rs | 3 +- appmgr/src/context/setup.rs | 3 +- appmgr/src/disk/util.rs | 19 ++- appmgr/src/install/mod.rs | 50 +++++- appmgr/src/middleware/encrypt.rs | 27 ++- appmgr/src/nginx/main-ui.conf | 6 +- appmgr/src/nginx/recovery-ui.conf | 2 +- appmgr/src/nginx/setup-wizard.conf | 2 +- appmgr/src/util/io.rs | 195 ++++++++++++++++++++++ appmgr/src/util/mod.rs | 184 +------------------- 13 files changed, 445 insertions(+), 221 deletions(-) create mode 100644 appmgr/src/util/io.rs diff --git a/appmgr/migrations/20210629193146_Init.sql b/appmgr/migrations/20210629193146_Init.sql index c0d2b2abc..418610eef 100644 --- a/appmgr/migrations/20210629193146_Init.sql +++ b/appmgr/migrations/20210629193146_Init.sql @@ -47,4 +47,4 @@ CREATE TABLE IF NOT EXISTS notifications title TEXT NOT NULL, message TEXT NOT NULL, data TEXT -) \ No newline at end of file +); \ No newline at end of file diff --git a/appmgr/sqlx-data.json b/appmgr/sqlx-data.json index 774a0aea7..e3812b646 100644 --- a/appmgr/sqlx-data.json +++ b/appmgr/sqlx-data.json @@ -20,6 +20,16 @@ "nullable": [] } }, + "177c4b9cc7901a3b906e5969b86b1c11e6acbfb8e86e98f197d7333030b17964": { + "query": "DELETE FROM notifications WHERE id = ?", + "describe": { + "columns": [], + "parameters": { + "Right": 1 + }, + "nullable": [] + } + }, "3502e58f2ab48fb4566d21c920c096f81acfa3ff0d02f970626a4dcd67bac71d": { "query": "SELECT tor_key FROM account", "describe": { @@ -162,6 +172,76 @@ "nullable": [] } }, + "65e6c3fbb138da5cf385af096fdd3c062b6e826e12a8a4b23e16fcc773004c29": { + "query": "SELECT id, package_id, created_at, code, level, title, message, data FROM notifications WHERE id < ? ORDER BY id DESC LIMIT ?", + "describe": { + "columns": [ + { + "name": "id", + "ordinal": 0, + "type_info": "Int64" + }, + { + "name": "package_id", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "created_at", + "ordinal": 2, + "type_info": "Datetime" + }, + { + "name": "code", + "ordinal": 3, + "type_info": "Int64" + }, + { + "name": "level", + "ordinal": 4, + "type_info": "Text" + }, + { + "name": "title", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "message", + "ordinal": 6, + "type_info": "Text" + }, + { + "name": "data", + "ordinal": 7, + "type_info": "Text" + } + ], + "parameters": { + "Right": 2 + }, + "nullable": [ + false, + true, + false, + false, + false, + false, + false, + true + ] + } + }, + "6b9abc9e079cff975f8a7f07ff70548c7877ecae3be0d0f2d3f439a6713326c0": { + "query": "DELETE FROM notifications WHERE id < ?", + "describe": { + "columns": [], + "parameters": { + "Right": 1 + }, + "nullable": [] + } + }, "6c96d76bffcc5f03290d8d8544a58521345ed2a843a509b17bbcd6257bb81821": { "query": "SELECT priv_key_pem, certificate_pem FROM certificates WHERE id = 1;", "describe": { @@ -230,16 +310,6 @@ ] } }, - "8c87b09f997838b7e5be48ebb202d97eb9d512a51c6e67b1fb7df33abd79a0af": { - "query": "-- Add migration script here\nCREATE TABLE IF NOT EXISTS tor\n(\n package TEXT NOT NULL,\n interface TEXT NOT NULL,\n key BLOB NOT NULL CHECK (length(key) = 64),\n PRIMARY KEY (package, interface)\n);\nCREATE TABLE IF NOT EXISTS session\n(\n id TEXT NOT NULL PRIMARY KEY,\n logged_in TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n logged_out TIMESTAMP,\n last_active TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n user_agent TEXT,\n metadata TEXT NOT NULL DEFAULT 'null'\n);\nCREATE TABLE IF NOT EXISTS account\n(\n id INTEGER PRIMARY KEY CHECK (id = 0),\n password TEXT NOT NULL,\n tor_key BLOB NOT NULL CHECK (length(tor_key) = 64)\n);\nCREATE TABLE IF NOT EXISTS ssh_keys\n(\n fingerprint TEXT NOT NULL,\n openssh_pubkey TEXT NOT NULL,\n created_at TEXT NOT NULL,\n PRIMARY KEY (fingerprint)\n);\nCREATE TABLE IF NOT EXISTS certificates\n(\n id INTEGER PRIMARY KEY, -- Root = 0, Int = 1, Other = 2..\n priv_key_pem TEXT NOT NULL,\n certificate_pem TEXT NOT NULL,\n lookup_string TEXT UNIQUE,\n created_at TEXT,\n updated_at TEXT\n);", - "describe": { - "columns": [], - "parameters": { - "Right": 0 - }, - "nullable": [] - } - }, "9496e17a73672ac3675e02efa7c4bf8bd479b866c0d31fa1e3a85ef159310a57": { "query": "SELECT priv_key_pem, certificate_pem FROM certificates WHERE lookup_string = ?", "describe": { @@ -274,6 +344,16 @@ "nullable": [] } }, + "a596bdc5014ba9e7b362398abf09ec6a100923e001247a79503d1e820ffe71c3": { + "query": "-- Add migration script here\nCREATE TABLE IF NOT EXISTS tor\n(\n package TEXT NOT NULL,\n interface TEXT NOT NULL,\n key BLOB NOT NULL CHECK (length(key) = 64),\n PRIMARY KEY (package, interface)\n);\nCREATE TABLE IF NOT EXISTS session\n(\n id TEXT NOT NULL PRIMARY KEY,\n logged_in TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n logged_out TIMESTAMP,\n last_active TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n user_agent TEXT,\n metadata TEXT NOT NULL DEFAULT 'null'\n);\nCREATE TABLE IF NOT EXISTS account\n(\n id INTEGER PRIMARY KEY CHECK (id = 0),\n password TEXT NOT NULL,\n tor_key BLOB NOT NULL CHECK (length(tor_key) = 64)\n);\nCREATE TABLE IF NOT EXISTS ssh_keys\n(\n fingerprint TEXT NOT NULL,\n openssh_pubkey TEXT NOT NULL,\n created_at TEXT NOT NULL,\n PRIMARY KEY (fingerprint)\n);\nCREATE TABLE IF NOT EXISTS certificates\n(\n id INTEGER PRIMARY KEY, -- Root = 0, Int = 1, Other = 2..\n priv_key_pem TEXT NOT NULL,\n certificate_pem TEXT NOT NULL,\n lookup_string TEXT UNIQUE,\n created_at TEXT,\n updated_at TEXT\n);\nCREATE TABLE IF NOT EXISTS notifications\n(\n id INTEGER PRIMARY KEY,\n package_id TEXT,\n created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n code INTEGER NOT NULL,\n level TEXT NOT NULL,\n title TEXT NOT NULL,\n message TEXT NOT NULL,\n data TEXT\n);", + "describe": { + "columns": [], + "parameters": { + "Right": 0 + }, + "nullable": [] + } + }, "a6b0c8909a3a5d6d9156aebfb359424e6b5a1d1402e028219e21726f1ebd282e": { "query": "SELECT fingerprint, openssh_pubkey, created_at FROM ssh_keys", "describe": { @@ -304,6 +384,66 @@ ] } }, + "abfdeea8cd10343b85f647d7abc5dc3bd0b5891101b143485938192ee3b8c907": { + "query": "SELECT id, package_id, created_at, code, level, title, message, data FROM notifications ORDER BY id DESC LIMIT ?", + "describe": { + "columns": [ + { + "name": "id", + "ordinal": 0, + "type_info": "Int64" + }, + { + "name": "package_id", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "created_at", + "ordinal": 2, + "type_info": "Datetime" + }, + { + "name": "code", + "ordinal": 3, + "type_info": "Int64" + }, + { + "name": "level", + "ordinal": 4, + "type_info": "Text" + }, + { + "name": "title", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "message", + "ordinal": 6, + "type_info": "Text" + }, + { + "name": "data", + "ordinal": 7, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + true, + false, + false, + false, + false, + false, + true + ] + } + }, "d5117054072476377f3c4f040ea429d4c9b2cf534e76f35c80a2bf60e8599cca": { "query": "SELECT openssh_pubkey FROM ssh_keys", "describe": { @@ -322,6 +462,16 @@ ] } }, + "d54bd5b53f8c760e1f8cde604aa8b1bdc66e4e025a636bc44ffbcd788b5168fd": { + "query": "INSERT INTO notifications (package_id, code, level, title, message, data) VALUES (?, ?, ?, ?, ?, ?)", + "describe": { + "columns": [], + "parameters": { + "Right": 6 + }, + "nullable": [] + } + }, "d79d608ceb862c15b741a6040044c6dd54a837a3a0c5594d15a6041c7bc68ea8": { "query": "INSERT OR IGNORE INTO tor (package, interface, key) VALUES (?, ?, ?)", "describe": { diff --git a/appmgr/src/context/recovery.rs b/appmgr/src/context/recovery.rs index ee6c228d0..0c53b11f6 100644 --- a/appmgr/src/context/recovery.rs +++ b/appmgr/src/context/recovery.rs @@ -10,7 +10,8 @@ use tokio::fs::File; use tokio::sync::broadcast::Sender; use url::Host; -use crate::util::{from_toml_async_reader, AsyncFileExt}; +use crate::util::io::from_toml_async_reader; +use crate::util::AsyncFileExt; use crate::{Error, ResultExt}; #[derive(Debug, Default, Deserialize)] diff --git a/appmgr/src/context/rpc.rs b/appmgr/src/context/rpc.rs index 883f082e4..546bc7c88 100644 --- a/appmgr/src/context/rpc.rs +++ b/appmgr/src/context/rpc.rs @@ -26,8 +26,9 @@ use crate::manager::ManagerMap; use crate::net::tor::os_key; use crate::net::NetController; use crate::shutdown::Shutdown; +use crate::util::io::from_toml_async_reader; use crate::util::logger::EmbassyLogger; -use crate::util::{from_toml_async_reader, AsyncFileExt}; +use crate::util::AsyncFileExt; use crate::{Error, ResultExt}; #[derive(Debug, Default, Deserialize)] diff --git a/appmgr/src/context/setup.rs b/appmgr/src/context/setup.rs index 205212433..dfb07af1f 100644 --- a/appmgr/src/context/setup.rs +++ b/appmgr/src/context/setup.rs @@ -17,7 +17,8 @@ use url::Host; use crate::db::model::Database; use crate::hostname::{get_hostname, get_id}; use crate::net::tor::os_key; -use crate::util::{from_toml_async_reader, AsyncFileExt}; +use crate::util::io::from_toml_async_reader; +use crate::util::AsyncFileExt; use crate::{Error, ResultExt}; #[derive(Debug, Default, Deserialize)] diff --git a/appmgr/src/disk/util.rs b/appmgr/src/disk/util.rs index 29a684056..0770bad3e 100644 --- a/appmgr/src/disk/util.rs +++ b/appmgr/src/disk/util.rs @@ -9,7 +9,8 @@ use tokio::fs::File; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::process::Command; -use crate::util::{from_yaml_async_reader, GeneralGuard, Invoke, Version}; +use crate::util::io::from_yaml_async_reader; +use crate::util::{GeneralGuard, Invoke, Version}; use crate::{Error, ResultExt as _}; pub const TMP_MOUNTPOINT: &'static str = "/media/embassy-os"; @@ -150,12 +151,24 @@ pub async fn list() -> Result, Error> { } else { (disk_path, None) }; - let disk = tokio::fs::canonicalize(Path::new(DISK_PATH).join(disk_path)).await?; + let disk_path = Path::new(DISK_PATH).join(disk_path); + let disk = tokio::fs::canonicalize(&disk_path).await.with_ctx(|_| { + ( + crate::ErrorKind::Filesystem, + disk_path.display().to_string(), + ) + })?; if !disks.contains_key(&disk) { disks.insert(disk.clone(), IndexSet::new()); } if let Some(part_path) = part_path { - let part = tokio::fs::canonicalize(part_path).await?; + let part_path = Path::new(DISK_PATH).join(disk_path); + let part = tokio::fs::canonicalize(&part_path).await.with_ctx(|_| { + ( + crate::ErrorKind::Filesystem, + part_path.display().to_string(), + ) + })?; disks[&disk].insert(part); } } diff --git a/appmgr/src/install/mod.rs b/appmgr/src/install/mod.rs index fdaf14fad..bbf175702 100644 --- a/appmgr/src/install/mod.rs +++ b/appmgr/src/install/mod.rs @@ -14,7 +14,8 @@ use patch_db::DbHandle; use reqwest::Response; use rpc_toolkit::command; use tokio::fs::{File, OpenOptions}; -use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt, AsyncWriteExt}; +use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt}; +use tokio::process::Command; use self::cleanup::cleanup_failed; use crate::context::RpcContext; @@ -29,6 +30,7 @@ use crate::install::progress::{InstallProgress, InstallProgressTracker}; use crate::s9pk::manifest::{Manifest, PackageId}; use crate::s9pk::reader::S9pkReader; use crate::status::{DependencyErrors, MainStatus, Status}; +use crate::util::io::copy_and_shutdown; use crate::util::{display_none, AsyncFileExt, Version}; use crate::volume::asset_dir; use crate::{Error, ResultExt}; @@ -38,6 +40,8 @@ pub mod progress; pub const PKG_CACHE: &'static str = "package-data/cache"; pub const PKG_PUBLIC_DIR: &'static str = "package-data/public"; +pub const PKG_DOCKER_DIR: &'static str = "package-data/docker"; +pub const PKG_WASM_DIR: &'static str = "package-data/wasm"; #[command(display(display_none))] pub async fn install( @@ -418,21 +422,53 @@ pub async fn install_s9pk( log::info!("Install {}@{}: Unpacking Docker Images", pkg_id, version); progress .track_read_during(progress_model.clone(), &ctx.db, || async { - let mut load = tokio::process::Command::new("docker") + let image_tar_dir = Path::new(PKG_DOCKER_DIR) + .join(pkg_id) + .join(version.as_str()); + if tokio::fs::metadata(&image_tar_dir).await.is_err() { + tokio::fs::create_dir_all(&image_tar_dir) + .await + .with_ctx(|_| { + ( + crate::ErrorKind::Filesystem, + image_tar_dir.display().to_string(), + ) + })?; + } + let image_tar_path = image_tar_dir.join("image.tar"); + let mut tee = Command::new("tee") + .arg(&image_tar_path) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .spawn()?; + let mut load = Command::new("docker") .arg("load") .stdin(Stdio::piped()) .stderr(Stdio::piped()) .spawn()?; - let mut dst = load.stdin.take().ok_or_else(|| { + let tee_in = tee.stdin.take().ok_or_else(|| { + Error::new( + anyhow!("Could not write to stdin of tee"), + crate::ErrorKind::Docker, + ) + })?; + let mut tee_out = tee.stdout.take().ok_or_else(|| { + Error::new( + anyhow!("Could not read from stdout of tee"), + crate::ErrorKind::Docker, + ) + })?; + let load_in = load.stdin.take().ok_or_else(|| { Error::new( anyhow!("Could not write to stdin of docker load"), crate::ErrorKind::Docker, ) })?; - tokio::io::copy(&mut rdr.docker_images().await?, &mut dst).await?; - dst.flush().await?; - dst.shutdown().await?; - drop(dst); + let mut docker_rdr = rdr.docker_images().await?; + tokio::try_join!( + copy_and_shutdown(&mut docker_rdr, tee_in), + copy_and_shutdown(&mut tee_out, load_in), + )?; let res = load.wait_with_output().await?; if !res.status.success() { Err(Error::new( diff --git a/appmgr/src/middleware/encrypt.rs b/appmgr/src/middleware/encrypt.rs index b18f57e30..2153d2b7a 100644 --- a/appmgr/src/middleware/encrypt.rs +++ b/appmgr/src/middleware/encrypt.rs @@ -71,12 +71,15 @@ impl Stream for DecryptStream { pbkdf2::>( this.key.as_bytes(), &this.salt, - 100_000, + 1000, aeskey.as_mut_slice(), ); let ctr = Nonce::::from_slice(&this.ctr); - *this.aes = Some(Aes256Ctr::new(&aeskey, &ctr)); - buf.to_vec().into() + let mut aes = Aes256Ctr::new(dbg!(&aeskey), dbg!(&ctr)); + let mut res = buf.to_vec(); + aes.apply_keystream(&mut res); + *this.aes = Some(aes); + res.into() } else { hyper::body::Bytes::new() } @@ -98,12 +101,7 @@ impl EncryptStream { pub fn new(key: &str, body: Body) -> Self { let prefix: [u8; 32] = rand::random(); let mut aeskey = CipherKey::::default(); - pbkdf2::>( - key.as_bytes(), - &prefix[16..], - 100_000, - aeskey.as_mut_slice(), - ); + pbkdf2::>(key.as_bytes(), &prefix[16..], 1000, aeskey.as_mut_slice()); let ctr = Nonce::::from_slice(&prefix[..16]); let aes = Aes256Ctr::new(&aeskey, &ctr); EncryptStream { @@ -189,6 +187,17 @@ pub fn encrypt(key: Arc) -> DynMiddleware { "Content-Encoding", HeaderValue::from_static("aesctr256"), ); + if let Some(len_header) = + res.headers_mut().get_mut("Content-Length") + { + if let Some(len) = len_header + .to_str() + .ok() + .and_then(|l| l.parse::().ok()) + { + *len_header = HeaderValue::from(len + 32); + } + } let body = std::mem::take(res.body_mut()); *res.body_mut() = Body::wrap_stream( EncryptStream::new(&*key, body), diff --git a/appmgr/src/nginx/main-ui.conf b/appmgr/src/nginx/main-ui.conf index 68a2046cd..1b39a1c57 100644 --- a/appmgr/src/nginx/main-ui.conf +++ b/appmgr/src/nginx/main-ui.conf @@ -14,17 +14,17 @@ server { proxy_http_version 1.1; location /rpc/ { - proxy_pass http://localhost:5959/; + proxy_pass http://127.0.0.1:5959/; } location /ws/ { - proxy_pass http://localhost:5960/; + proxy_pass http://127.0.0.1:5960/; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "Upgrade"; } location /marketplace/ { - proxy_pass https://beta-registry-0-3.start9labs.com/; + proxy_pass https://beta-registry-0-3.start9labs.com/; # TODO } location / { diff --git a/appmgr/src/nginx/recovery-ui.conf b/appmgr/src/nginx/recovery-ui.conf index fe65303db..10acc657b 100644 --- a/appmgr/src/nginx/recovery-ui.conf +++ b/appmgr/src/nginx/recovery-ui.conf @@ -14,7 +14,7 @@ server { proxy_http_version 1.1; location /rpc/ { - proxy_pass http://localhost:5959/; + proxy_pass http://127.0.0.1:5959/; } location / { diff --git a/appmgr/src/nginx/setup-wizard.conf b/appmgr/src/nginx/setup-wizard.conf index f42264f99..4521698e6 100644 --- a/appmgr/src/nginx/setup-wizard.conf +++ b/appmgr/src/nginx/setup-wizard.conf @@ -14,7 +14,7 @@ server { proxy_http_version 1.1; location /rpc/ { - proxy_pass http://localhost:5959/; + proxy_pass http://127.0.0.1:5959/; } location / { diff --git a/appmgr/src/util/io.rs b/appmgr/src/util/io.rs new file mode 100644 index 000000000..900d2128d --- /dev/null +++ b/appmgr/src/util/io.rs @@ -0,0 +1,195 @@ +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}; + +use crate::ResultExt; + +#[derive(Clone, Debug)] +pub struct AsyncCompat(pub T); +impl futures::io::AsyncRead for AsyncCompat +where + T: tokio::io::AsyncRead, +{ + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut [u8], + ) -> std::task::Poll> { + let mut read_buf = ReadBuf::new(buf); + tokio::io::AsyncRead::poll_read( + unsafe { self.map_unchecked_mut(|a| &mut a.0) }, + cx, + &mut read_buf, + ) + .map(|res| res.map(|_| read_buf.filled().len())) + } +} +impl tokio::io::AsyncRead for AsyncCompat +where + T: futures::io::AsyncRead, +{ + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut ReadBuf, + ) -> std::task::Poll> { + futures::io::AsyncRead::poll_read( + unsafe { self.map_unchecked_mut(|a| &mut a.0) }, + cx, + buf.initialize_unfilled(), + ) + .map(|res| res.map(|len| buf.set_filled(len))) + } +} +impl futures::io::AsyncWrite for AsyncCompat +where + T: tokio::io::AsyncWrite, +{ + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + tokio::io::AsyncWrite::poll_write(unsafe { self.map_unchecked_mut(|a| &mut a.0) }, cx, buf) + } + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + tokio::io::AsyncWrite::poll_flush(unsafe { self.map_unchecked_mut(|a| &mut a.0) }, cx) + } + fn poll_close( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + tokio::io::AsyncWrite::poll_shutdown(unsafe { self.map_unchecked_mut(|a| &mut a.0) }, cx) + } +} +impl tokio::io::AsyncWrite for AsyncCompat +where + T: futures::io::AsyncWrite, +{ + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + futures::io::AsyncWrite::poll_write( + unsafe { self.map_unchecked_mut(|a| &mut a.0) }, + cx, + buf, + ) + } + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + futures::io::AsyncWrite::poll_flush(unsafe { self.map_unchecked_mut(|a| &mut a.0) }, cx) + } + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + futures::io::AsyncWrite::poll_close(unsafe { self.map_unchecked_mut(|a| &mut a.0) }, cx) + } +} + +pub async fn from_yaml_async_reader(mut reader: R) -> Result +where + T: for<'de> serde::Deserialize<'de>, + R: AsyncRead + Unpin, +{ + let mut buffer = Vec::new(); + reader.read_to_end(&mut buffer).await?; + serde_yaml::from_slice(&buffer) + .map_err(anyhow::Error::from) + .with_kind(crate::ErrorKind::Deserialization) +} + +pub async fn to_yaml_async_writer(mut writer: W, value: &T) -> Result<(), crate::Error> +where + T: serde::Serialize, + W: AsyncWrite + Unpin, +{ + let mut buffer = serde_yaml::to_vec(value).with_kind(crate::ErrorKind::Serialization)?; + buffer.extend_from_slice(b"\n"); + writer.write_all(&buffer).await?; + Ok(()) +} + +pub async fn from_toml_async_reader(mut reader: R) -> Result +where + T: for<'de> serde::Deserialize<'de>, + R: AsyncRead + Unpin, +{ + let mut buffer = Vec::new(); + reader.read_to_end(&mut buffer).await?; + serde_toml::from_slice(&buffer) + .map_err(anyhow::Error::from) + .with_kind(crate::ErrorKind::Deserialization) +} + +pub async fn to_toml_async_writer(mut writer: W, value: &T) -> Result<(), crate::Error> +where + T: serde::Serialize, + W: AsyncWrite + Unpin, +{ + let mut buffer = serde_toml::to_vec(value).with_kind(crate::ErrorKind::Serialization)?; + buffer.extend_from_slice(b"\n"); + writer.write_all(&buffer).await?; + Ok(()) +} + +pub async fn from_cbor_async_reader(mut reader: R) -> Result +where + T: for<'de> serde::Deserialize<'de>, + R: AsyncRead + Unpin, +{ + let mut buffer = Vec::new(); + reader.read_to_end(&mut buffer).await?; + serde_cbor::de::from_reader(buffer.as_slice()) + .map_err(anyhow::Error::from) + .with_kind(crate::ErrorKind::Deserialization) +} + +pub async fn from_json_async_reader(mut reader: R) -> Result +where + T: for<'de> serde::Deserialize<'de>, + R: AsyncRead + Unpin, +{ + let mut buffer = Vec::new(); + reader.read_to_end(&mut buffer).await?; + serde_json::from_slice(&buffer) + .map_err(anyhow::Error::from) + .with_kind(crate::ErrorKind::Deserialization) +} + +pub async fn to_json_async_writer(mut writer: W, value: &T) -> Result<(), crate::Error> +where + T: serde::Serialize, + W: AsyncWrite + Unpin, +{ + let buffer = serde_json::to_string(value).with_kind(crate::ErrorKind::Serialization)?; + writer.write_all(&buffer.as_bytes()).await?; + Ok(()) +} + +pub async fn to_json_pretty_async_writer(mut writer: W, value: &T) -> Result<(), crate::Error> +where + T: serde::Serialize, + W: AsyncWrite + Unpin, +{ + let mut buffer = + serde_json::to_string_pretty(value).with_kind(crate::ErrorKind::Serialization)?; + buffer.push_str("\n"); + writer.write_all(&buffer.as_bytes()).await?; + Ok(()) +} + +pub async fn copy_and_shutdown( + r: &mut R, + mut w: W, +) -> Result<(), std::io::Error> { + tokio::io::copy(r, &mut w).await?; + w.flush().await?; + w.shutdown().await?; + Ok(()) +} diff --git a/appmgr/src/util/mod.rs b/appmgr/src/util/mod.rs index 1f5c300fa..9255ca64c 100644 --- a/appmgr/src/util/mod.rs +++ b/appmgr/src/util/mod.rs @@ -14,13 +14,13 @@ use digest::Digest; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use serde_json::Value; use tokio::fs::File; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}; use tokio::sync::RwLock; use tokio::task::{JoinError, JoinHandle}; use crate::shutdown::Shutdown; use crate::{Error, ResultExt as _}; +pub mod io; pub mod logger; #[derive(Clone, Copy, Debug)] @@ -38,188 +38,6 @@ impl std::fmt::Display for Never { } impl std::error::Error for Never {} -#[derive(Clone, Debug)] -pub struct AsyncCompat(pub T); -impl futures::io::AsyncRead for AsyncCompat -where - T: tokio::io::AsyncRead, -{ - fn poll_read( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut [u8], - ) -> std::task::Poll> { - let mut read_buf = ReadBuf::new(buf); - tokio::io::AsyncRead::poll_read( - unsafe { self.map_unchecked_mut(|a| &mut a.0) }, - cx, - &mut read_buf, - ) - .map(|res| res.map(|_| read_buf.filled().len())) - } -} -impl tokio::io::AsyncRead for AsyncCompat -where - T: futures::io::AsyncRead, -{ - fn poll_read( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut ReadBuf, - ) -> std::task::Poll> { - futures::io::AsyncRead::poll_read( - unsafe { self.map_unchecked_mut(|a| &mut a.0) }, - cx, - buf.initialize_unfilled(), - ) - .map(|res| res.map(|len| buf.set_filled(len))) - } -} -impl futures::io::AsyncWrite for AsyncCompat -where - T: tokio::io::AsyncWrite, -{ - fn poll_write( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> std::task::Poll> { - tokio::io::AsyncWrite::poll_write(unsafe { self.map_unchecked_mut(|a| &mut a.0) }, cx, buf) - } - fn poll_flush( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - tokio::io::AsyncWrite::poll_flush(unsafe { self.map_unchecked_mut(|a| &mut a.0) }, cx) - } - fn poll_close( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - tokio::io::AsyncWrite::poll_shutdown(unsafe { self.map_unchecked_mut(|a| &mut a.0) }, cx) - } -} -impl tokio::io::AsyncWrite for AsyncCompat -where - T: futures::io::AsyncWrite, -{ - fn poll_write( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> std::task::Poll> { - futures::io::AsyncWrite::poll_write( - unsafe { self.map_unchecked_mut(|a| &mut a.0) }, - cx, - buf, - ) - } - fn poll_flush( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - futures::io::AsyncWrite::poll_flush(unsafe { self.map_unchecked_mut(|a| &mut a.0) }, cx) - } - fn poll_shutdown( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - futures::io::AsyncWrite::poll_close(unsafe { self.map_unchecked_mut(|a| &mut a.0) }, cx) - } -} - -pub async fn from_yaml_async_reader(mut reader: R) -> Result -where - T: for<'de> serde::Deserialize<'de>, - R: AsyncRead + Unpin, -{ - let mut buffer = Vec::new(); - reader.read_to_end(&mut buffer).await?; - serde_yaml::from_slice(&buffer) - .map_err(anyhow::Error::from) - .with_kind(crate::ErrorKind::Deserialization) -} - -pub async fn to_yaml_async_writer(mut writer: W, value: &T) -> Result<(), crate::Error> -where - T: serde::Serialize, - W: AsyncWrite + Unpin, -{ - let mut buffer = serde_yaml::to_vec(value).with_kind(crate::ErrorKind::Serialization)?; - buffer.extend_from_slice(b"\n"); - writer.write_all(&buffer).await?; - Ok(()) -} - -pub async fn from_toml_async_reader(mut reader: R) -> Result -where - T: for<'de> serde::Deserialize<'de>, - R: AsyncRead + Unpin, -{ - let mut buffer = Vec::new(); - reader.read_to_end(&mut buffer).await?; - serde_toml::from_slice(&buffer) - .map_err(anyhow::Error::from) - .with_kind(crate::ErrorKind::Deserialization) -} - -pub async fn to_toml_async_writer(mut writer: W, value: &T) -> Result<(), crate::Error> -where - T: serde::Serialize, - W: AsyncWrite + Unpin, -{ - let mut buffer = serde_toml::to_vec(value).with_kind(crate::ErrorKind::Serialization)?; - buffer.extend_from_slice(b"\n"); - writer.write_all(&buffer).await?; - Ok(()) -} - -pub async fn from_cbor_async_reader(mut reader: R) -> Result -where - T: for<'de> serde::Deserialize<'de>, - R: AsyncRead + Unpin, -{ - let mut buffer = Vec::new(); - reader.read_to_end(&mut buffer).await?; - serde_cbor::de::from_reader(buffer.as_slice()) - .map_err(anyhow::Error::from) - .with_kind(crate::ErrorKind::Deserialization) -} - -pub async fn from_json_async_reader(mut reader: R) -> Result -where - T: for<'de> serde::Deserialize<'de>, - R: AsyncRead + Unpin, -{ - let mut buffer = Vec::new(); - reader.read_to_end(&mut buffer).await?; - serde_json::from_slice(&buffer) - .map_err(anyhow::Error::from) - .with_kind(crate::ErrorKind::Deserialization) -} - -pub async fn to_json_async_writer(mut writer: W, value: &T) -> Result<(), crate::Error> -where - T: serde::Serialize, - W: AsyncWrite + Unpin, -{ - let buffer = serde_json::to_string(value).with_kind(crate::ErrorKind::Serialization)?; - writer.write_all(&buffer.as_bytes()).await?; - Ok(()) -} - -pub async fn to_json_pretty_async_writer(mut writer: W, value: &T) -> Result<(), crate::Error> -where - T: serde::Serialize, - W: AsyncWrite + Unpin, -{ - let mut buffer = - serde_json::to_string_pretty(value).with_kind(crate::ErrorKind::Serialization)?; - buffer.push_str("\n"); - writer.write_all(&buffer.as_bytes()).await?; - Ok(()) -} - #[async_trait::async_trait] pub trait Invoke { async fn invoke(&mut self, error_kind: crate::ErrorKind) -> Result, Error>;