improve Invoke api (#2499)

* improve Invoke api

* fix formatting
This commit is contained in:
Aiden McClelland
2023-11-06 17:26:45 -07:00
committed by GitHub
parent 733000eaa2
commit bf40a9ef6d
6 changed files with 193 additions and 227 deletions

View File

@@ -2,13 +2,12 @@ use std::os::unix::ffi::OsStrExt;
use std::path::Path; use std::path::Path;
use async_trait::async_trait; use async_trait::async_trait;
use color_eyre::eyre::eyre;
use digest::generic_array::GenericArray; use digest::generic_array::GenericArray;
use digest::{Digest, OutputSizeUser}; use digest::{Digest, OutputSizeUser};
use sha2::Sha256; use sha2::Sha256;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use super::{FileSystem, MountType}; use super::{FileSystem, MountType};
use crate::util::Invoke;
use crate::{Error, ResultExt}; use crate::{Error, ResultExt};
pub async fn mount_ecryptfs<P0: AsRef<Path>, P1: AsRef<Path>>( pub async fn mount_ecryptfs<P0: AsRef<Path>, P1: AsRef<Path>>(
@@ -17,7 +16,7 @@ pub async fn mount_ecryptfs<P0: AsRef<Path>, P1: AsRef<Path>>(
key: &str, key: &str,
) -> Result<(), Error> { ) -> Result<(), Error> {
tokio::fs::create_dir_all(dst.as_ref()).await?; tokio::fs::create_dir_all(dst.as_ref()).await?;
let mut ecryptfs = tokio::process::Command::new("mount") tokio::process::Command::new("mount")
.arg("-t") .arg("-t")
.arg("ecryptfs") .arg("ecryptfs")
.arg(src.as_ref()) .arg(src.as_ref())
@@ -25,22 +24,9 @@ pub async fn mount_ecryptfs<P0: AsRef<Path>, P1: AsRef<Path>>(
.arg("-o") .arg("-o")
// for more information `man ecryptfs` // for more information `man ecryptfs`
.arg(format!("key=passphrase:passphrase_passwd={},ecryptfs_cipher=aes,ecryptfs_key_bytes=32,ecryptfs_passthrough=n,ecryptfs_enable_filename_crypto=y,no_sig_cache", key)) .arg(format!("key=passphrase:passphrase_passwd={},ecryptfs_cipher=aes,ecryptfs_key_bytes=32,ecryptfs_passthrough=n,ecryptfs_enable_filename_crypto=y,no_sig_cache", key))
.stdin(std::process::Stdio::piped()) .input(Some(&mut std::io::Cursor::new(b"\n")))
.stderr(std::process::Stdio::piped()) .invoke(crate::ErrorKind::Filesystem).await?;
.spawn()?;
let mut stdin = ecryptfs.stdin.take().unwrap();
let mut stderr = ecryptfs.stderr.take().unwrap();
stdin.write_all(b"\n").await?;
stdin.flush().await?;
stdin.shutdown().await?;
drop(stdin);
let mut err = String::new();
stderr.read_to_string(&mut err).await?;
if !ecryptfs.wait().await?.success() {
Err(Error::new(eyre!("{}", err), crate::ErrorKind::Filesystem))
} else {
Ok(()) Ok(())
}
} }
pub struct EcryptFS<EncryptedDir: AsRef<Path>, Key: AsRef<str>> { pub struct EcryptFS<EncryptedDir: AsRef<Path>, Key: AsRef<str>> {

View File

@@ -1,9 +1,8 @@
use std::path::Path; use std::path::Path;
use std::process::Stdio;
use async_compression::tokio::bufread::GzipDecoder; use async_compression::tokio::bufread::GzipDecoder;
use tokio::fs::File; use tokio::fs::File;
use tokio::io::{AsyncRead, AsyncWriteExt, BufReader}; use tokio::io::{AsyncRead, BufReader};
use tokio::process::Command; use tokio::process::Command;
use crate::disk::fsck::RequiresReboot; use crate::disk::fsck::RequiresReboot;
@@ -44,7 +43,8 @@ pub async fn update_firmware() -> Result<RequiresReboot, Error> {
let mut firmware_read_dir = tokio::fs::read_dir(&firmware_dir).await?; let mut firmware_read_dir = tokio::fs::read_dir(&firmware_dir).await?;
while let Some(entry) = firmware_read_dir.next_entry().await? { while let Some(entry) = firmware_read_dir.next_entry().await? {
let filename = entry.file_name().to_string_lossy().into_owned(); let filename = entry.file_name().to_string_lossy().into_owned();
let rdr: Option<Box<dyn AsyncRead + Unpin>> = if filename.ends_with(".rom.gz") { let rdr: Option<Box<dyn AsyncRead + Unpin + Send>> =
if filename.ends_with(".rom.gz") {
Some(Box::new(GzipDecoder::new(BufReader::new( Some(Box::new(GzipDecoder::new(BufReader::new(
File::open(entry.path()).await?, File::open(entry.path()).await?,
)))) ))))
@@ -54,29 +54,17 @@ pub async fn update_firmware() -> Result<RequiresReboot, Error> {
None None
}; };
if let Some(mut rdr) = rdr { if let Some(mut rdr) = rdr {
let mut flashrom = Command::new("flashrom") Command::new("flashrom")
.arg("-p") .arg("-p")
.arg("internal") .arg("internal")
.arg("-w-") .arg("-w-")
.stdin(Stdio::piped()) .input(Some(&mut rdr))
.spawn()?; .invoke(ErrorKind::Firmware)
let mut rom_dest = flashrom.stdin.take().or_not_found("stdin")?; .await?;
tokio::io::copy(&mut rdr, &mut rom_dest).await?;
rom_dest.flush().await?;
rom_dest.shutdown().await?;
drop(rom_dest);
let o = flashrom.wait_with_output().await?;
if !o.status.success() {
return Err(Error::new(
eyre!("{}", std::str::from_utf8(&o.stderr)?),
ErrorKind::Firmware,
));
} else {
return Ok(RequiresReboot(true)); return Ok(RequiresReboot(true));
} }
} }
} }
} }
}
Ok(RequiresReboot(false)) Ok(RequiresReboot(false))
} }

View File

@@ -2,7 +2,6 @@ use std::collections::BTreeMap;
use std::io::SeekFrom; use std::io::SeekFrom;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@@ -49,9 +48,9 @@ use crate::s9pk::manifest::{Manifest, PackageId};
use crate::s9pk::reader::S9pkReader; use crate::s9pk::reader::S9pkReader;
use crate::status::{MainStatus, Status}; use crate::status::{MainStatus, Status};
use crate::util::docker::CONTAINER_TOOL; use crate::util::docker::CONTAINER_TOOL;
use crate::util::io::{copy_and_shutdown, response_to_reader}; use crate::util::io::response_to_reader;
use crate::util::serde::{display_serializable, Port}; use crate::util::serde::{display_serializable, Port};
use crate::util::{display_none, AsyncFileExt, Version}; use crate::util::{display_none, AsyncFileExt, Invoke, Version};
use crate::volume::{asset_dir, script_dir}; use crate::volume::{asset_dir, script_dir};
use crate::{Error, ErrorKind, ResultExt}; use crate::{Error, ErrorKind, ResultExt};
@@ -953,32 +952,11 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin + Send + Sync>(
tracing::info!("Install {}@{}: Unpacking Docker Images", pkg_id, version); tracing::info!("Install {}@{}: Unpacking Docker Images", pkg_id, version);
progress progress
.track_read_during(ctx.db.clone(), pkg_id, || async { .track_read_during(ctx.db.clone(), pkg_id, || async {
let mut load = Command::new(CONTAINER_TOOL) Command::new(CONTAINER_TOOL)
.arg("load") .arg("load")
.stdin(Stdio::piped()) .input(Some(&mut rdr.docker_images().await?))
.stderr(Stdio::piped()) .invoke(ErrorKind::Docker)
.spawn()?; .await
let load_in = load.stdin.take().ok_or_else(|| {
Error::new(
eyre!("Could not write to stdin of docker load"),
crate::ErrorKind::Docker,
)
})?;
let mut docker_rdr = rdr.docker_images().await?;
copy_and_shutdown(&mut docker_rdr, load_in).await?;
let res = load.wait_with_output().await?;
if !res.status.success() {
Err(Error::new(
eyre!(
"{}",
String::from_utf8(res.stderr)
.unwrap_or_else(|e| format!("Could not parse stderr: {}", e))
),
crate::ErrorKind::Docker,
))
} else {
Ok(())
}
}) })
.await?; .await?;
tracing::info!("Install {}@{}: Unpacked Docker Images", pkg_id, version,); tracing::info!("Install {}@{}: Unpacked Docker Images", pkg_id, version,);
@@ -1275,57 +1253,36 @@ pub fn load_images<'a, P: AsRef<Path> + 'a + Send + Sync>(
let path = entry.path(); let path = entry.path();
let ext = path.extension().and_then(|ext| ext.to_str()); let ext = path.extension().and_then(|ext| ext.to_str());
if ext == Some("tar") || ext == Some("s9pk") { if ext == Some("tar") || ext == Some("s9pk") {
let mut load = Command::new(CONTAINER_TOOL) if let Err(e) = async {
.arg("load")
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
let load_in = load.stdin.take().ok_or_else(|| {
Error::new(
eyre!("Could not write to stdin of docker load"),
crate::ErrorKind::Docker,
)
})?;
match ext { match ext {
Some("tar") => { Some("tar") => {
copy_and_shutdown(&mut File::open(&path).await?, load_in) Command::new(CONTAINER_TOOL)
.await? .arg("load")
.input(Some(&mut File::open(&path).await?))
.invoke(ErrorKind::Docker)
.await
}
Some("s9pk") => {
Command::new(CONTAINER_TOOL)
.arg("load")
.input(Some(
&mut S9pkReader::open(&path, true)
.await?
.docker_images()
.await?,
))
.invoke(ErrorKind::Docker)
.await
}
_ => unreachable!(),
} }
Some("s9pk") => match async {
let mut reader = S9pkReader::open(&path, true).await?;
copy_and_shutdown(&mut reader.docker_images().await?, load_in)
.await?;
Ok::<_, Error>(())
} }
.await .await
{ {
Ok(()) => (), tracing::error!("Error loading docker images from s9pk: {e}");
Err(e) => {
tracing::error!(
"Error loading docker images from s9pk: {e}"
);
tracing::debug!("{e:?}"); tracing::debug!("{e:?}");
return Ok(());
} }
},
_ => unreachable!(),
};
let res = load.wait_with_output().await?;
if !res.status.success() {
Err(Error::new(
eyre!(
"{}",
String::from_utf8(res.stderr).unwrap_or_else(|e| format!(
"Could not parse stderr: {}",
e
))
),
crate::ErrorKind::Docker,
))
} else {
Ok(()) Ok(())
}
} else { } else {
Ok(()) Ok(())
} }

View File

@@ -6,9 +6,7 @@ use std::os::unix::prelude::FileTypeExt;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::time::Duration; use std::time::Duration;
use async_stream::stream;
use color_eyre::eyre::eyre; use color_eyre::eyre::eyre;
use color_eyre::Report;
use futures::future::{BoxFuture, Either as EitherFuture}; use futures::future::{BoxFuture, Either as EitherFuture};
use futures::{FutureExt, TryStreamExt}; use futures::{FutureExt, TryStreamExt};
use helpers::{NonDetachingJoinHandle, UnixRpcClient}; use helpers::{NonDetachingJoinHandle, UnixRpcClient};

View File

@@ -1,9 +1,7 @@
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use color_eyre::eyre::eyre;
use embassy_container_init::ProcessGroupId; use embassy_container_init::ProcessGroupId;
use helpers::UnixRpcClient; use helpers::UnixRpcClient;
pub use js_engine::JsError; pub use js_engine::JsError;
@@ -17,8 +15,8 @@ use tracing::instrument;
use super::ProcedureName; use super::ProcedureName;
use crate::prelude::*; use crate::prelude::*;
use crate::s9pk::manifest::PackageId; use crate::s9pk::manifest::PackageId;
use crate::util::io::to_json_async_writer; use crate::util::serde::IoFormat;
use crate::util::Version; use crate::util::{Invoke, Version};
use crate::volume::Volumes; use crate::volume::Volumes;
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
@@ -83,7 +81,10 @@ impl JsProcedure {
_gid: ProcessGroupId, _gid: ProcessGroupId,
_rpc_client: Option<Arc<UnixRpcClient>>, _rpc_client: Option<Arc<UnixRpcClient>>,
) -> Result<Result<O, (i32, String)>, Error> { ) -> Result<Result<O, (i32, String)>, Error> {
let runner_argument = ExecuteArgs { Command::new("start-deno")
.arg("execute")
.input(Some(&mut std::io::Cursor::new(IoFormat::Json.to_vec(
&ExecuteArgs {
procedure: self.clone(), procedure: self.clone(),
directory: directory.clone(), directory: directory.clone(),
pkg_id: pkg_id.clone(), pkg_id: pkg_id.clone(),
@@ -91,37 +92,12 @@ impl JsProcedure {
name, name,
volumes: volumes.clone(), volumes: volumes.clone(),
input: input.and_then(|x| serde_json::to_value(x).ok()), input: input.and_then(|x| serde_json::to_value(x).ok()),
}; },
let mut runner = Command::new("start-deno") )?)))
.arg("execute") .timeout(timeout)
.stdin(Stdio::piped()) .invoke(ErrorKind::Javascript)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.kill_on_drop(true)
.spawn()?;
to_json_async_writer(
&mut runner.stdin.take().or_not_found("stdin")?,
&runner_argument,
)
.await?;
let res = if let Some(timeout) = timeout {
tokio::time::timeout(timeout, runner.wait_with_output())
.await .await
.with_kind(ErrorKind::Timeout)?? .and_then(|res| IoFormat::Json.from_slice(&res))
} else {
runner.wait_with_output().await?
};
if res.status.success() {
serde_json::from_str::<Result<O, (i32, String)>>(std::str::from_utf8(&res.stdout)?)
.with_kind(ErrorKind::Deserialization)
} else {
Err(Error::new(
eyre!("{}", String::from_utf8(res.stderr)?),
ErrorKind::Javascript,
))
}
} }
#[instrument(skip_all)] #[instrument(skip_all)]
@@ -135,7 +111,10 @@ impl JsProcedure {
timeout: Option<Duration>, timeout: Option<Duration>,
name: ProcedureName, name: ProcedureName,
) -> Result<Result<O, (i32, String)>, Error> { ) -> Result<Result<O, (i32, String)>, Error> {
let runner_argument = ExecuteArgs { Command::new("start-deno")
.arg("sandbox")
.input(Some(&mut std::io::Cursor::new(IoFormat::Json.to_vec(
&ExecuteArgs {
procedure: self.clone(), procedure: self.clone(),
directory: directory.clone(), directory: directory.clone(),
pkg_id: pkg_id.clone(), pkg_id: pkg_id.clone(),
@@ -143,37 +122,12 @@ impl JsProcedure {
name, name,
volumes: volumes.clone(), volumes: volumes.clone(),
input: input.and_then(|x| serde_json::to_value(x).ok()), input: input.and_then(|x| serde_json::to_value(x).ok()),
}; },
let mut runner = Command::new("start-deno") )?)))
.arg("sandbox") .timeout(timeout)
.stdin(Stdio::piped()) .invoke(ErrorKind::Javascript)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.kill_on_drop(true)
.spawn()?;
to_json_async_writer(
&mut runner.stdin.take().or_not_found("stdin")?,
&runner_argument,
)
.await?;
let res = if let Some(timeout) = timeout {
tokio::time::timeout(timeout, runner.wait_with_output())
.await .await
.with_kind(ErrorKind::Timeout)?? .and_then(|res| IoFormat::Json.from_slice(&res))
} else {
runner.wait_with_output().await?
};
if res.status.success() {
serde_json::from_str::<Result<O, (i32, String)>>(std::str::from_utf8(&res.stdout)?)
.with_kind(ErrorKind::Deserialization)
} else {
Err(Error::new(
eyre!("{}", String::from_utf8(res.stderr)?),
ErrorKind::Javascript,
))
}
} }
#[instrument(skip_all)] #[instrument(skip_all)]

View File

@@ -50,30 +50,113 @@ impl std::fmt::Display for Never {
impl std::error::Error for Never {} impl std::error::Error for Never {}
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait Invoke { pub trait Invoke<'a> {
type Extended<'ext>
where
Self: 'ext,
'ext: 'a;
fn timeout<'ext: 'a>(&'ext mut self, timeout: Option<Duration>) -> Self::Extended<'ext>;
fn input<'ext: 'a, Input: tokio::io::AsyncRead + Unpin + Send>(
&'ext mut self,
input: Option<&'ext mut Input>,
) -> Self::Extended<'ext>;
async fn invoke(&mut self, error_kind: crate::ErrorKind) -> Result<Vec<u8>, Error>; async fn invoke(&mut self, error_kind: crate::ErrorKind) -> Result<Vec<u8>, Error>;
async fn invoke_timeout(
&mut self,
error_kind: crate::ErrorKind,
timeout: Option<Duration>,
) -> Result<Vec<u8>, Error>;
} }
#[async_trait::async_trait]
impl Invoke for tokio::process::Command { pub struct ExtendedCommand<'a> {
async fn invoke(&mut self, error_kind: crate::ErrorKind) -> Result<Vec<u8>, Error> { cmd: &'a mut tokio::process::Command,
self.invoke_timeout(error_kind, None).await
}
async fn invoke_timeout(
&mut self,
error_kind: crate::ErrorKind,
timeout: Option<Duration>, timeout: Option<Duration>,
) -> Result<Vec<u8>, Error> { input: Option<&'a mut (dyn tokio::io::AsyncRead + Unpin + Send)>,
self.kill_on_drop(true); }
self.stdout(Stdio::piped()); impl<'a> std::ops::Deref for ExtendedCommand<'a> {
self.stderr(Stdio::piped()); type Target = tokio::process::Command;
let res = match timeout { fn deref(&self) -> &Self::Target {
None => self.output().await?, &*self.cmd
Some(t) => tokio::time::timeout(t, self.output()) }
}
impl<'a> std::ops::DerefMut for ExtendedCommand<'a> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.cmd
}
}
#[async_trait::async_trait]
impl<'a> Invoke<'a> for tokio::process::Command {
type Extended<'ext> = ExtendedCommand<'ext>
where
Self: 'ext,
'ext: 'a;
fn timeout<'ext: 'a>(&'ext mut self, timeout: Option<Duration>) -> Self::Extended<'ext> {
ExtendedCommand {
cmd: self,
timeout,
input: None,
}
}
fn input<'ext: 'a, Input: tokio::io::AsyncRead + Unpin + Send>(
&'ext mut self,
input: Option<&'ext mut Input>,
) -> Self::Extended<'ext> {
ExtendedCommand {
cmd: self,
timeout: None,
input: if let Some(input) = input {
Some(&mut *input)
} else {
None
},
}
}
async fn invoke(&mut self, error_kind: crate::ErrorKind) -> Result<Vec<u8>, Error> {
ExtendedCommand {
cmd: self,
timeout: None,
input: None,
}
.invoke(error_kind)
.await
}
}
#[async_trait::async_trait]
impl<'a> Invoke<'a> for ExtendedCommand<'a> {
type Extended<'ext> = &'ext mut ExtendedCommand<'ext>
where
Self: 'ext,
'ext: 'a;
fn timeout<'ext: 'a>(&'ext mut self, timeout: Option<Duration>) -> Self::Extended<'ext> {
self.timeout = timeout;
self
}
fn input<'ext: 'a, Input: tokio::io::AsyncRead + Unpin + Send>(
&'ext mut self,
input: Option<&'ext mut Input>,
) -> Self::Extended<'ext> {
self.input = if let Some(input) = input {
Some(&mut *input)
} else {
None
};
self
}
async fn invoke(&mut self, error_kind: crate::ErrorKind) -> Result<Vec<u8>, Error> {
self.cmd.kill_on_drop(true);
if self.input.is_some() {
self.cmd.stdin(Stdio::piped());
}
self.cmd.stdout(Stdio::piped());
self.cmd.stderr(Stdio::piped());
let mut child = self.cmd.spawn()?;
if let (Some(mut stdin), Some(input)) = (child.stdin.take(), self.input.take()) {
use tokio::io::AsyncWriteExt;
tokio::io::copy(input, &mut stdin).await?;
stdin.flush().await?;
stdin.shutdown().await?;
drop(stdin);
}
let res = match self.timeout {
None => child.wait_with_output().await?,
Some(t) => tokio::time::timeout(t, child.wait_with_output())
.await .await
.with_kind(ErrorKind::Timeout)??, .with_kind(ErrorKind::Timeout)??,
}; };