From bf40a9ef6d6402773030982961b1ada1a3672795 Mon Sep 17 00:00:00 2001 From: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com> Date: Mon, 6 Nov 2023 17:26:45 -0700 Subject: [PATCH] improve Invoke api (#2499) * improve Invoke api * fix formatting --- backend/src/disk/mount/filesystem/ecryptfs.rs | 24 +--- backend/src/firmware.rs | 44 +++--- backend/src/install/mod.rs | 111 +++++----------- backend/src/procedure/docker.rs | 2 - backend/src/procedure/js_scripts.rs | 114 +++++----------- backend/src/util/mod.rs | 125 +++++++++++++++--- 6 files changed, 193 insertions(+), 227 deletions(-) diff --git a/backend/src/disk/mount/filesystem/ecryptfs.rs b/backend/src/disk/mount/filesystem/ecryptfs.rs index 3c828f4c7..78570f49b 100644 --- a/backend/src/disk/mount/filesystem/ecryptfs.rs +++ b/backend/src/disk/mount/filesystem/ecryptfs.rs @@ -2,13 +2,12 @@ use std::os::unix::ffi::OsStrExt; use std::path::Path; use async_trait::async_trait; -use color_eyre::eyre::eyre; use digest::generic_array::GenericArray; use digest::{Digest, OutputSizeUser}; use sha2::Sha256; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; use super::{FileSystem, MountType}; +use crate::util::Invoke; use crate::{Error, ResultExt}; pub async fn mount_ecryptfs, P1: AsRef>( @@ -17,7 +16,7 @@ pub async fn mount_ecryptfs, P1: AsRef>( key: &str, ) -> Result<(), Error> { tokio::fs::create_dir_all(dst.as_ref()).await?; - let mut ecryptfs = tokio::process::Command::new("mount") + tokio::process::Command::new("mount") .arg("-t") .arg("ecryptfs") .arg(src.as_ref()) @@ -25,22 +24,9 @@ pub async fn mount_ecryptfs, P1: AsRef>( .arg("-o") // for more information `man ecryptfs` .arg(format!("key=passphrase:passphrase_passwd={},ecryptfs_cipher=aes,ecryptfs_key_bytes=32,ecryptfs_passthrough=n,ecryptfs_enable_filename_crypto=y,no_sig_cache", key)) - .stdin(std::process::Stdio::piped()) - .stderr(std::process::Stdio::piped()) - .spawn()?; - let mut stdin = ecryptfs.stdin.take().unwrap(); - let mut stderr = ecryptfs.stderr.take().unwrap(); - stdin.write_all(b"\n").await?; - stdin.flush().await?; - stdin.shutdown().await?; - drop(stdin); - let mut err = String::new(); - stderr.read_to_string(&mut err).await?; - if !ecryptfs.wait().await?.success() { - Err(Error::new(eyre!("{}", err), crate::ErrorKind::Filesystem)) - } else { - Ok(()) - } + .input(Some(&mut std::io::Cursor::new(b"\n"))) + .invoke(crate::ErrorKind::Filesystem).await?; + Ok(()) } pub struct EcryptFS, Key: AsRef> { diff --git a/backend/src/firmware.rs b/backend/src/firmware.rs index 708ae2169..9f3e3c52c 100644 --- a/backend/src/firmware.rs +++ b/backend/src/firmware.rs @@ -1,9 +1,8 @@ use std::path::Path; -use std::process::Stdio; use async_compression::tokio::bufread::GzipDecoder; use tokio::fs::File; -use tokio::io::{AsyncRead, AsyncWriteExt, BufReader}; +use tokio::io::{AsyncRead, BufReader}; use tokio::process::Command; use crate::disk::fsck::RequiresReboot; @@ -44,36 +43,25 @@ pub async fn update_firmware() -> Result { let mut firmware_read_dir = tokio::fs::read_dir(&firmware_dir).await?; while let Some(entry) = firmware_read_dir.next_entry().await? { let filename = entry.file_name().to_string_lossy().into_owned(); - let rdr: Option> = if filename.ends_with(".rom.gz") { - Some(Box::new(GzipDecoder::new(BufReader::new( - File::open(entry.path()).await?, - )))) - } else if filename.ends_with(".rom") { - Some(Box::new(File::open(entry.path()).await?)) - } else { - None - }; + let rdr: Option> = + if filename.ends_with(".rom.gz") { + Some(Box::new(GzipDecoder::new(BufReader::new( + File::open(entry.path()).await?, + )))) + } else if filename.ends_with(".rom") { + Some(Box::new(File::open(entry.path()).await?)) + } else { + None + }; if let Some(mut rdr) = rdr { - let mut flashrom = Command::new("flashrom") + Command::new("flashrom") .arg("-p") .arg("internal") .arg("-w-") - .stdin(Stdio::piped()) - .spawn()?; - let mut rom_dest = flashrom.stdin.take().or_not_found("stdin")?; - tokio::io::copy(&mut rdr, &mut rom_dest).await?; - rom_dest.flush().await?; - rom_dest.shutdown().await?; - drop(rom_dest); - let o = flashrom.wait_with_output().await?; - if !o.status.success() { - return Err(Error::new( - eyre!("{}", std::str::from_utf8(&o.stderr)?), - ErrorKind::Firmware, - )); - } else { - return Ok(RequiresReboot(true)); - } + .input(Some(&mut rdr)) + .invoke(ErrorKind::Firmware) + .await?; + return Ok(RequiresReboot(true)); } } } diff --git a/backend/src/install/mod.rs b/backend/src/install/mod.rs index b46be0c4c..e2089608e 100644 --- a/backend/src/install/mod.rs +++ b/backend/src/install/mod.rs @@ -2,7 +2,6 @@ use std::collections::BTreeMap; use std::io::SeekFrom; use std::marker::PhantomData; use std::path::{Path, PathBuf}; -use std::process::Stdio; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; @@ -49,9 +48,9 @@ use crate::s9pk::manifest::{Manifest, PackageId}; use crate::s9pk::reader::S9pkReader; use crate::status::{MainStatus, Status}; use crate::util::docker::CONTAINER_TOOL; -use crate::util::io::{copy_and_shutdown, response_to_reader}; +use crate::util::io::response_to_reader; use crate::util::serde::{display_serializable, Port}; -use crate::util::{display_none, AsyncFileExt, Version}; +use crate::util::{display_none, AsyncFileExt, Invoke, Version}; use crate::volume::{asset_dir, script_dir}; use crate::{Error, ErrorKind, ResultExt}; @@ -953,32 +952,11 @@ pub async fn install_s9pk( tracing::info!("Install {}@{}: Unpacking Docker Images", pkg_id, version); progress .track_read_during(ctx.db.clone(), pkg_id, || async { - let mut load = Command::new(CONTAINER_TOOL) + Command::new(CONTAINER_TOOL) .arg("load") - .stdin(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn()?; - let load_in = load.stdin.take().ok_or_else(|| { - Error::new( - eyre!("Could not write to stdin of docker load"), - crate::ErrorKind::Docker, - ) - })?; - let mut docker_rdr = rdr.docker_images().await?; - copy_and_shutdown(&mut docker_rdr, load_in).await?; - let res = load.wait_with_output().await?; - if !res.status.success() { - Err(Error::new( - eyre!( - "{}", - String::from_utf8(res.stderr) - .unwrap_or_else(|e| format!("Could not parse stderr: {}", e)) - ), - crate::ErrorKind::Docker, - )) - } else { - Ok(()) - } + .input(Some(&mut rdr.docker_images().await?)) + .invoke(ErrorKind::Docker) + .await }) .await?; tracing::info!("Install {}@{}: Unpacked Docker Images", pkg_id, version,); @@ -1275,57 +1253,36 @@ pub fn load_images<'a, P: AsRef + 'a + Send + Sync>( let path = entry.path(); let ext = path.extension().and_then(|ext| ext.to_str()); if ext == Some("tar") || ext == Some("s9pk") { - let mut load = Command::new(CONTAINER_TOOL) - .arg("load") - .stdin(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn()?; - let load_in = load.stdin.take().ok_or_else(|| { - Error::new( - eyre!("Could not write to stdin of docker load"), - crate::ErrorKind::Docker, - ) - })?; - match ext { - Some("tar") => { - copy_and_shutdown(&mut File::open(&path).await?, load_in) - .await? - } - Some("s9pk") => match async { - let mut reader = S9pkReader::open(&path, true).await?; - copy_and_shutdown(&mut reader.docker_images().await?, load_in) - .await?; - Ok::<_, Error>(()) - } - .await - { - Ok(()) => (), - Err(e) => { - tracing::error!( - "Error loading docker images from s9pk: {e}" - ); - tracing::debug!("{e:?}"); - return Ok(()); + if let Err(e) = async { + match ext { + Some("tar") => { + Command::new(CONTAINER_TOOL) + .arg("load") + .input(Some(&mut File::open(&path).await?)) + .invoke(ErrorKind::Docker) + .await } - }, - _ => unreachable!(), - }; - - let res = load.wait_with_output().await?; - if !res.status.success() { - Err(Error::new( - eyre!( - "{}", - String::from_utf8(res.stderr).unwrap_or_else(|e| format!( - "Could not parse stderr: {}", - e - )) - ), - crate::ErrorKind::Docker, - )) - } else { - Ok(()) + Some("s9pk") => { + Command::new(CONTAINER_TOOL) + .arg("load") + .input(Some( + &mut S9pkReader::open(&path, true) + .await? + .docker_images() + .await?, + )) + .invoke(ErrorKind::Docker) + .await + } + _ => unreachable!(), + } } + .await + { + tracing::error!("Error loading docker images from s9pk: {e}"); + tracing::debug!("{e:?}"); + } + Ok(()) } else { Ok(()) } diff --git a/backend/src/procedure/docker.rs b/backend/src/procedure/docker.rs index 3f72993a0..57207d5c9 100644 --- a/backend/src/procedure/docker.rs +++ b/backend/src/procedure/docker.rs @@ -6,9 +6,7 @@ use std::os::unix::prelude::FileTypeExt; use std::path::{Path, PathBuf}; use std::time::Duration; -use async_stream::stream; use color_eyre::eyre::eyre; -use color_eyre::Report; use futures::future::{BoxFuture, Either as EitherFuture}; use futures::{FutureExt, TryStreamExt}; use helpers::{NonDetachingJoinHandle, UnixRpcClient}; diff --git a/backend/src/procedure/js_scripts.rs b/backend/src/procedure/js_scripts.rs index 3e5d6dfa2..27756b4a3 100644 --- a/backend/src/procedure/js_scripts.rs +++ b/backend/src/procedure/js_scripts.rs @@ -1,9 +1,7 @@ use std::path::{Path, PathBuf}; -use std::process::Stdio; use std::sync::Arc; use std::time::Duration; -use color_eyre::eyre::eyre; use embassy_container_init::ProcessGroupId; use helpers::UnixRpcClient; pub use js_engine::JsError; @@ -17,8 +15,8 @@ use tracing::instrument; use super::ProcedureName; use crate::prelude::*; use crate::s9pk::manifest::PackageId; -use crate::util::io::to_json_async_writer; -use crate::util::Version; +use crate::util::serde::IoFormat; +use crate::util::{Invoke, Version}; use crate::volume::Volumes; #[derive(Debug, Serialize, Deserialize, Clone)] @@ -83,45 +81,23 @@ impl JsProcedure { _gid: ProcessGroupId, _rpc_client: Option>, ) -> Result, Error> { - let runner_argument = ExecuteArgs { - procedure: self.clone(), - directory: directory.clone(), - pkg_id: pkg_id.clone(), - pkg_version: pkg_version.clone(), - name, - volumes: volumes.clone(), - input: input.and_then(|x| serde_json::to_value(x).ok()), - }; - let mut runner = Command::new("start-deno") + Command::new("start-deno") .arg("execute") - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .kill_on_drop(true) - .spawn()?; - to_json_async_writer( - &mut runner.stdin.take().or_not_found("stdin")?, - &runner_argument, - ) - .await?; - - let res = if let Some(timeout) = timeout { - tokio::time::timeout(timeout, runner.wait_with_output()) - .await - .with_kind(ErrorKind::Timeout)?? - } else { - runner.wait_with_output().await? - }; - - if res.status.success() { - serde_json::from_str::>(std::str::from_utf8(&res.stdout)?) - .with_kind(ErrorKind::Deserialization) - } else { - Err(Error::new( - eyre!("{}", String::from_utf8(res.stderr)?), - ErrorKind::Javascript, - )) - } + .input(Some(&mut std::io::Cursor::new(IoFormat::Json.to_vec( + &ExecuteArgs { + procedure: self.clone(), + directory: directory.clone(), + pkg_id: pkg_id.clone(), + pkg_version: pkg_version.clone(), + name, + volumes: volumes.clone(), + input: input.and_then(|x| serde_json::to_value(x).ok()), + }, + )?))) + .timeout(timeout) + .invoke(ErrorKind::Javascript) + .await + .and_then(|res| IoFormat::Json.from_slice(&res)) } #[instrument(skip_all)] @@ -135,45 +111,23 @@ impl JsProcedure { timeout: Option, name: ProcedureName, ) -> Result, Error> { - let runner_argument = ExecuteArgs { - procedure: self.clone(), - directory: directory.clone(), - pkg_id: pkg_id.clone(), - pkg_version: pkg_version.clone(), - name, - volumes: volumes.clone(), - input: input.and_then(|x| serde_json::to_value(x).ok()), - }; - let mut runner = Command::new("start-deno") + Command::new("start-deno") .arg("sandbox") - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .kill_on_drop(true) - .spawn()?; - to_json_async_writer( - &mut runner.stdin.take().or_not_found("stdin")?, - &runner_argument, - ) - .await?; - - let res = if let Some(timeout) = timeout { - tokio::time::timeout(timeout, runner.wait_with_output()) - .await - .with_kind(ErrorKind::Timeout)?? - } else { - runner.wait_with_output().await? - }; - - if res.status.success() { - serde_json::from_str::>(std::str::from_utf8(&res.stdout)?) - .with_kind(ErrorKind::Deserialization) - } else { - Err(Error::new( - eyre!("{}", String::from_utf8(res.stderr)?), - ErrorKind::Javascript, - )) - } + .input(Some(&mut std::io::Cursor::new(IoFormat::Json.to_vec( + &ExecuteArgs { + procedure: self.clone(), + directory: directory.clone(), + pkg_id: pkg_id.clone(), + pkg_version: pkg_version.clone(), + name, + volumes: volumes.clone(), + input: input.and_then(|x| serde_json::to_value(x).ok()), + }, + )?))) + .timeout(timeout) + .invoke(ErrorKind::Javascript) + .await + .and_then(|res| IoFormat::Json.from_slice(&res)) } #[instrument(skip_all)] diff --git a/backend/src/util/mod.rs b/backend/src/util/mod.rs index 5ee9cbe5f..2683f23c8 100644 --- a/backend/src/util/mod.rs +++ b/backend/src/util/mod.rs @@ -50,30 +50,113 @@ impl std::fmt::Display for Never { impl std::error::Error for Never {} #[async_trait::async_trait] -pub trait Invoke { +pub trait Invoke<'a> { + type Extended<'ext> + where + Self: 'ext, + 'ext: 'a; + fn timeout<'ext: 'a>(&'ext mut self, timeout: Option) -> Self::Extended<'ext>; + fn input<'ext: 'a, Input: tokio::io::AsyncRead + Unpin + Send>( + &'ext mut self, + input: Option<&'ext mut Input>, + ) -> Self::Extended<'ext>; async fn invoke(&mut self, error_kind: crate::ErrorKind) -> Result, Error>; - async fn invoke_timeout( - &mut self, - error_kind: crate::ErrorKind, - timeout: Option, - ) -> Result, Error>; } -#[async_trait::async_trait] -impl Invoke for tokio::process::Command { - async fn invoke(&mut self, error_kind: crate::ErrorKind) -> Result, Error> { - self.invoke_timeout(error_kind, None).await + +pub struct ExtendedCommand<'a> { + cmd: &'a mut tokio::process::Command, + timeout: Option, + input: Option<&'a mut (dyn tokio::io::AsyncRead + Unpin + Send)>, +} +impl<'a> std::ops::Deref for ExtendedCommand<'a> { + type Target = tokio::process::Command; + fn deref(&self) -> &Self::Target { + &*self.cmd } - async fn invoke_timeout( - &mut self, - error_kind: crate::ErrorKind, - timeout: Option, - ) -> Result, Error> { - self.kill_on_drop(true); - self.stdout(Stdio::piped()); - self.stderr(Stdio::piped()); - let res = match timeout { - None => self.output().await?, - Some(t) => tokio::time::timeout(t, self.output()) +} +impl<'a> std::ops::DerefMut for ExtendedCommand<'a> { + fn deref_mut(&mut self) -> &mut Self::Target { + self.cmd + } +} + +#[async_trait::async_trait] +impl<'a> Invoke<'a> for tokio::process::Command { + type Extended<'ext> = ExtendedCommand<'ext> + where + Self: 'ext, + 'ext: 'a; + fn timeout<'ext: 'a>(&'ext mut self, timeout: Option) -> Self::Extended<'ext> { + ExtendedCommand { + cmd: self, + timeout, + input: None, + } + } + fn input<'ext: 'a, Input: tokio::io::AsyncRead + Unpin + Send>( + &'ext mut self, + input: Option<&'ext mut Input>, + ) -> Self::Extended<'ext> { + ExtendedCommand { + cmd: self, + timeout: None, + input: if let Some(input) = input { + Some(&mut *input) + } else { + None + }, + } + } + async fn invoke(&mut self, error_kind: crate::ErrorKind) -> Result, Error> { + ExtendedCommand { + cmd: self, + timeout: None, + input: None, + } + .invoke(error_kind) + .await + } +} + +#[async_trait::async_trait] +impl<'a> Invoke<'a> for ExtendedCommand<'a> { + type Extended<'ext> = &'ext mut ExtendedCommand<'ext> + where + Self: 'ext, + 'ext: 'a; + fn timeout<'ext: 'a>(&'ext mut self, timeout: Option) -> Self::Extended<'ext> { + self.timeout = timeout; + self + } + fn input<'ext: 'a, Input: tokio::io::AsyncRead + Unpin + Send>( + &'ext mut self, + input: Option<&'ext mut Input>, + ) -> Self::Extended<'ext> { + self.input = if let Some(input) = input { + Some(&mut *input) + } else { + None + }; + self + } + async fn invoke(&mut self, error_kind: crate::ErrorKind) -> Result, Error> { + self.cmd.kill_on_drop(true); + if self.input.is_some() { + self.cmd.stdin(Stdio::piped()); + } + self.cmd.stdout(Stdio::piped()); + self.cmd.stderr(Stdio::piped()); + let mut child = self.cmd.spawn()?; + if let (Some(mut stdin), Some(input)) = (child.stdin.take(), self.input.take()) { + use tokio::io::AsyncWriteExt; + tokio::io::copy(input, &mut stdin).await?; + stdin.flush().await?; + stdin.shutdown().await?; + drop(stdin); + } + let res = match self.timeout { + None => child.wait_with_output().await?, + Some(t) => tokio::time::timeout(t, child.wait_with_output()) .await .with_kind(ErrorKind::Timeout)??, };