chore: Remove the other procedures since all are now via the js

This commit is contained in:
J H
2023-11-10 09:26:00 -07:00
parent b5da076e2c
commit 94d22ed1aa
6 changed files with 77 additions and 1225 deletions

View File

@@ -34,9 +34,10 @@ async fn execute(
input,
} = arg;
PackageLogger::init(&pkg_id);
procedure
.execute_impl(&directory, &pkg_id, &pkg_version, name, &volumes, input)
.await
// procedure
// .execute_impl(&directory, &pkg_id, &pkg_version, name, &volumes, input)
// .await
todo!("@DRB Remove")
}
#[command(cli_only, display(display_serializable))]
async fn sandbox(
@@ -52,9 +53,10 @@ async fn sandbox(
input,
} = arg;
PackageLogger::init(&pkg_id);
procedure
.sandboxed_impl(&directory, &pkg_id, &pkg_version, &volumes, input, name)
.await
// procedure
// .sandboxed_impl(&directory, &pkg_id, &pkg_version, &volumes, input, name)
// .await
todo!("@DRB Remove")
}
use tracing::Subscriber;

View File

@@ -13,6 +13,7 @@ use models::{ErrorKind, OptionExt, PackageId};
use nix::sys::signal::Signal;
use persistent_container::PersistentContainer;
use rand::SeedableRng;
use serde::de::DeserializeOwned;
use sqlx::Connection;
use start_stop::StartStop;
use tokio::sync::watch::{self, Sender};
@@ -328,6 +329,38 @@ impl Manager {
let transition = self.transition.borrow();
matches!(*transition, TransitionState::BackingUp(_))
}
pub async fn execute<O>(
&self,
name: ProcedureName,
input: Value,
timeout: Option<Duration>,
) -> Result<Result<O, (i32, String)>, Error>
where
O: DeserializeOwned,
{
self.persistent_container
.execute(name, input, timeout)
.await
}
pub async fn sanboxed<O>(
&self,
name: ProcedureName,
input: Value,
timeout: Option<Duration>,
) -> Result<Result<O, (i32, String)>, Error>
where
O: DeserializeOwned,
{
self.persistent_container
.sanboxed(name, input, timeout)
.await
}
pub async fn send_signal(&self, gid: Arc<Gid>, signal: Signal) -> Result<(), Error> {
self.persistent_container.send_signal(gid, signal).await
}
}
#[instrument(skip_all)]
@@ -817,36 +850,5 @@ async fn get_running_ip(seed: &ManagerSeed, mut runtime: &mut RuntimeOfCommand)
}
async fn send_signal(manager: &Manager, gid: Arc<Gid>, signal: Signal) -> Result<(), Error> {
// stop health checks from committing their results
// shared
// .commit_health_check_results
// .store(false, Ordering::SeqCst);
let rpc_client = manager.rpc_client();
let main_gid = *gid.main_gid.0.borrow();
let next_gid = gid.new_gid();
#[cfg(feature = "js_engine")]
if let Err(e) = crate::procedure::js_scripts::JsProcedure::default()
.execute::<_, NoOutput>(
&manager.seed.ctx.datadir,
&manager.seed.manifest.id,
&manager.seed.manifest.version,
ProcedureName::Signal,
&manager.seed.manifest.volumes,
Some(embassy_container_init::SignalGroupParams {
gid: main_gid,
signal: signal as u32,
}),
None, // TODO
next_gid,
Some(rpc_client),
)
.await?
{
tracing::error!("Failed to send js signal: {}", e.1);
tracing::debug!("{:?}", e);
}
Ok(())
manager.send_signal(gid, signal).await
}

View File

@@ -4,6 +4,7 @@ use std::time::Duration;
use color_eyre::eyre::eyre;
use helpers::UnixRpcClient;
use models::ProcedureName;
use nix::sys::signal::Signal;
use serde::de::DeserializeOwned;
use tokio::sync::watch::{self, Receiver};
use tokio::sync::{oneshot, Mutex};
@@ -30,6 +31,9 @@ pub struct PersistentContainer {
procedures: Mutex<Vec<(ProcedureName, ProcedureId)>>,
}
// BLUJ TODO Need to get the only action is this and not procedure/<docker,js_scripts>
// BLUJ Modify the rpc client to match the new type
impl PersistentContainer {
#[instrument(skip_all)]
pub async fn init(seed: &Arc<ManagerSeed>) -> Result<Self, Error> {
@@ -71,6 +75,7 @@ impl PersistentContainer {
Err(e) => Err(e),
}
}
pub async fn sanboxed<O>(
&self,
name: ProcedureName,
@@ -119,6 +124,10 @@ impl PersistentContainer {
) -> Result<Result<Value, (i32, String)>, Error> {
todo!("DRB")
}
pub async fn send_signal(&self, gid: Arc<super::Gid>, signal: Signal) -> Result<(), Error> {
todo!("DRB")
}
}
pub async fn spawn_persistent_container(

View File

@@ -217,443 +217,6 @@ impl DockerProcedure {
Ok(())
}
#[instrument(skip_all)]
pub async fn execute<I: Serialize, O: DeserializeOwned>(
&self,
ctx: &RpcContext,
pkg_id: &PackageId,
pkg_version: &Version,
name: ProcedureName,
volumes: &Volumes,
input: Option<I>,
timeout: Option<Duration>,
) -> Result<Result<O, (i32, String)>, Error> {
let name = name.docker_name();
let name: Option<&str> = name.as_deref();
let mut cmd = tokio::process::Command::new(CONTAINER_TOOL);
let container_name = Self::container_name(pkg_id, name);
cmd.arg("run")
.arg("--rm")
.arg("--network=start9")
.arg(format!("--add-host=embassy:{}", Ipv4Addr::from(HOST_IP)))
.arg("--name")
.arg(&container_name)
.arg(format!("--hostname={}", &container_name))
.arg("--no-healthcheck")
.kill_on_drop(true);
remove_container(&container_name, true).await?;
cmd.args(self.docker_args(ctx, pkg_id, pkg_version, volumes).await?);
let input_buf = if let (Some(input), Some(format)) = (&input, &self.io_format) {
cmd.stdin(std::process::Stdio::piped());
Some(format.to_vec(input)?)
} else {
None
};
cmd.stdout(std::process::Stdio::piped());
cmd.stderr(std::process::Stdio::piped());
tracing::trace!(
"{}",
format!("{:?}", cmd)
.split(r#"" ""#)
.collect::<Vec<&str>>()
.join(" ")
);
let mut handle = cmd.spawn().with_kind(crate::ErrorKind::Docker)?;
let id = handle.id();
let timeout_fut = if let Some(timeout) = timeout {
EitherFuture::Right(async move {
tokio::time::sleep(timeout).await;
Ok(())
})
} else {
EitherFuture::Left(futures::future::pending::<Result<_, Error>>())
};
if let (Some(input), Some(mut stdin)) = (&input_buf, handle.stdin.take()) {
use tokio::io::AsyncWriteExt;
stdin
.write_all(input)
.await
.with_kind(crate::ErrorKind::Docker)?;
stdin.flush().await?;
stdin.shutdown().await?;
drop(stdin);
}
enum Race<T> {
Done(T),
TimedOut,
}
let io_format = self.io_format;
let mut output = BufReader::new(
handle
.stdout
.take()
.ok_or_else(|| eyre!("Can't takeout stdout in execute"))
.with_kind(crate::ErrorKind::Docker)?,
);
let output = NonDetachingJoinHandle::from(tokio::spawn(async move {
match async {
if let Some(format) = io_format {
return match max_by_lines(&mut output, None).await {
MaxByLines::Done(buffer) => {
Ok::<Value, Error>(
match format.from_slice(buffer.as_bytes()) {
Ok(a) => a,
Err(e) => {
tracing::trace!(
"Failed to deserialize stdout from {}: {}, falling back to UTF-8 string.",
format,
e
);
Value::String(buffer)
}
},
)
},
MaxByLines::Error(e) => Err(e),
MaxByLines::Overflow(buffer) => Ok(Value::String(buffer))
}
}
let lines = buf_reader_to_lines(&mut output, 1000).await?;
if lines.is_empty() {
return Ok(Value::Null);
}
let joined_output = lines.join("\n");
Ok(Value::String(joined_output))
}.await {
Ok(a) => Ok((a, output)),
Err(e) => Err((e, output))
}
}));
let err_output = BufReader::new(
handle
.stderr
.take()
.ok_or_else(|| eyre!("Can't takeout std err"))
.with_kind(crate::ErrorKind::Docker)?,
);
let err_output = NonDetachingJoinHandle::from(tokio::spawn(async move {
let lines = buf_reader_to_lines(err_output, 1000).await?;
let joined_output = lines.join("\n");
Ok::<_, Error>(joined_output)
}));
let res = tokio::select! {
res = handle.wait() => Race::Done(res.with_kind(crate::ErrorKind::Docker)?),
res = timeout_fut => {
res?;
Race::TimedOut
},
};
let exit_status = match res {
Race::Done(x) => x,
Race::TimedOut => {
if let Some(id) = id {
signal::kill(Pid::from_raw(id as i32), signal::SIGKILL)
.with_kind(crate::ErrorKind::Docker)?;
}
return Ok(Err((143, "Timed out. Retrying soon...".to_owned())));
}
};
Ok(
if exit_status.success() || exit_status.code() == Some(143) {
Ok(serde_json::from_value(
output
.await
.with_kind(crate::ErrorKind::Unknown)?
.map(|(v, _)| v)
.map_err(|(e, _)| tracing::warn!("{}", e))
.unwrap_or_default(),
)
.with_kind(crate::ErrorKind::Deserialization)?)
} else {
Err((
exit_status.code().unwrap_or_default(),
err_output.await.with_kind(crate::ErrorKind::Unknown)??,
))
},
)
}
#[instrument(skip_all)]
pub async fn inject<I: Serialize, O: DeserializeOwned>(
&self,
_ctx: &RpcContext,
pkg_id: &PackageId,
_pkg_version: &Version,
_name: ProcedureName,
_volumes: &Volumes,
input: Option<I>,
timeout: Option<Duration>,
) -> Result<Result<O, (i32, String)>, Error> {
let mut cmd = tokio::process::Command::new(CONTAINER_TOOL);
cmd.arg("exec");
cmd.args(self.docker_args_inject(pkg_id));
let input_buf = if let (Some(input), Some(format)) = (&input, &self.io_format) {
cmd.stdin(std::process::Stdio::piped());
Some(format.to_vec(input)?)
} else {
None
};
cmd.stdout(std::process::Stdio::piped());
cmd.stderr(std::process::Stdio::piped());
tracing::trace!(
"{}",
format!("{:?}", cmd)
.split(r#"" ""#)
.collect::<Vec<&str>>()
.join(" ")
);
let mut handle = cmd.spawn().with_kind(crate::ErrorKind::Docker)?;
let id = handle.id();
let timeout_fut = if let Some(timeout) = timeout {
EitherFuture::Right(async move {
tokio::time::sleep(timeout).await;
Ok(())
})
} else {
EitherFuture::Left(futures::future::pending::<Result<_, Error>>())
};
if let (Some(input), Some(mut stdin)) = (&input_buf, handle.stdin.take()) {
use tokio::io::AsyncWriteExt;
stdin
.write_all(input)
.await
.with_kind(crate::ErrorKind::Docker)?;
stdin.flush().await?;
stdin.shutdown().await?;
drop(stdin);
}
enum Race<T> {
Done(T),
TimedOut,
}
let io_format = self.io_format;
let mut output = BufReader::new(
handle
.stdout
.take()
.ok_or_else(|| eyre!("Can't takeout stdout in inject"))
.with_kind(crate::ErrorKind::Docker)?,
);
let output = NonDetachingJoinHandle::from(tokio::spawn(async move {
match async {
if let Some(format) = io_format {
return match max_by_lines(&mut output, None).await {
MaxByLines::Done(buffer) => {
Ok::<Value, Error>(
match format.from_slice(buffer.as_bytes()) {
Ok(a) => a,
Err(e) => {
tracing::trace!(
"Failed to deserialize stdout from {}: {}, falling back to UTF-8 string.",
format,
e
);
Value::String(buffer)
}
},
)
},
MaxByLines::Error(e) => Err(e),
MaxByLines::Overflow(buffer) => Ok(Value::String(buffer))
}
}
let lines = buf_reader_to_lines(&mut output, 1000).await?;
if lines.is_empty() {
return Ok(Value::Null);
}
let joined_output = lines.join("\n");
Ok(Value::String(joined_output))
}.await {
Ok(a) => Ok((a, output)),
Err(e) => Err((e, output))
}
}));
let err_output = BufReader::new(
handle
.stderr
.take()
.ok_or_else(|| eyre!("Can't takeout std err"))
.with_kind(crate::ErrorKind::Docker)?,
);
let err_output = NonDetachingJoinHandle::from(tokio::spawn(async move {
let lines = buf_reader_to_lines(err_output, 1000).await?;
let joined_output = lines.join("\n");
Ok::<_, Error>(joined_output)
}));
let res = tokio::select! {
res = handle.wait() => Race::Done(res.with_kind(crate::ErrorKind::Docker)?),
res = timeout_fut => {
res?;
Race::TimedOut
},
};
let exit_status = match res {
Race::Done(x) => x,
Race::TimedOut => {
if let Some(id) = id {
signal::kill(Pid::from_raw(id as i32), signal::SIGKILL)
.with_kind(crate::ErrorKind::Docker)?;
}
return Ok(Err((143, "Timed out. Retrying soon...".to_owned())));
}
};
Ok(
if exit_status.success() || exit_status.code() == Some(143) {
Ok(serde_json::from_value(
output
.await
.with_kind(crate::ErrorKind::Unknown)?
.map(|(v, _)| v)
.map_err(|(e, _)| tracing::warn!("{}", e))
.unwrap_or_default(),
)
.with_kind(crate::ErrorKind::Deserialization)?)
} else {
Err((
exit_status.code().unwrap_or_default(),
err_output.await.with_kind(crate::ErrorKind::Unknown)??,
))
},
)
}
#[instrument(skip_all)]
pub async fn sandboxed<I: Serialize, O: DeserializeOwned>(
&self,
ctx: &RpcContext,
pkg_id: &PackageId,
pkg_version: &Version,
volumes: &Volumes,
input: Option<I>,
timeout: Option<Duration>,
) -> Result<Result<O, (i32, String)>, Error> {
let mut cmd = tokio::process::Command::new(CONTAINER_TOOL);
cmd.arg("run").arg("--rm").arg("--network=none");
cmd.args(
self.docker_args(ctx, pkg_id, pkg_version, &volumes.to_readonly())
.await?,
);
let input_buf = if let (Some(input), Some(format)) = (&input, &self.io_format) {
cmd.stdin(std::process::Stdio::piped());
Some(format.to_vec(input)?)
} else {
None
};
cmd.stdout(std::process::Stdio::piped());
cmd.stderr(std::process::Stdio::piped());
let mut handle = cmd.spawn().with_kind(crate::ErrorKind::Docker)?;
if let (Some(input), Some(stdin)) = (&input_buf, &mut handle.stdin) {
use tokio::io::AsyncWriteExt;
stdin
.write_all(input)
.await
.with_kind(crate::ErrorKind::Docker)?;
}
let err_output = BufReader::new(
handle
.stderr
.take()
.ok_or_else(|| eyre!("Can't takeout std err"))
.with_kind(crate::ErrorKind::Docker)?,
);
let err_output = NonDetachingJoinHandle::from(tokio::spawn(async move {
let lines = buf_reader_to_lines(err_output, 1000).await?;
let joined_output = lines.join("\n");
Ok::<_, Error>(joined_output)
}));
let io_format = self.io_format;
let mut output = BufReader::new(
handle
.stdout
.take()
.ok_or_else(|| eyre!("Can't takeout stdout in sandboxed"))
.with_kind(crate::ErrorKind::Docker)?,
);
let output = NonDetachingJoinHandle::from(tokio::spawn(async move {
match async {
if let Some(format) = io_format {
return match max_by_lines(&mut output, None).await {
MaxByLines::Done(buffer) => {
Ok::<Value, Error>(
match format.from_slice(buffer.as_bytes()) {
Ok(a) => a,
Err(e) => {
tracing::trace!(
"Failed to deserialize stdout from {}: {}, falling back to UTF-8 string.",
format,
e
);
Value::String(buffer)
}
},
)
},
MaxByLines::Error(e) => Err(e),
MaxByLines::Overflow(buffer) => Ok(Value::String(buffer))
}
}
let lines = buf_reader_to_lines(&mut output, 1000).await?;
if lines.is_empty() {
return Ok(Value::Null);
}
let joined_output = lines.join("\n");
Ok(Value::String(joined_output))
}.await {
Ok(a) => Ok((a, output)),
Err(e) => Err((e, output))
}
}));
let handle = if let Some(dur) = timeout {
async move {
tokio::time::timeout(dur, handle.wait())
.await
.with_kind(crate::ErrorKind::Docker)?
.with_kind(crate::ErrorKind::Docker)
}
.boxed()
} else {
async { handle.wait().await.with_kind(crate::ErrorKind::Docker) }.boxed()
};
let exit_status = handle.await?;
Ok(
if exit_status.success() || exit_status.code() == Some(143) {
Ok(serde_json::from_value(
output
.await
.with_kind(crate::ErrorKind::Unknown)?
.map(|(v, _)| v)
.map_err(|(e, _)| tracing::warn!("{}", e))
.unwrap_or_default(),
)
.with_kind(crate::ErrorKind::Deserialization)?)
} else {
Err((
exit_status.code().unwrap_or_default(),
err_output.await.with_kind(crate::ErrorKind::Unknown)??,
))
},
)
}
pub fn container_name(pkg_id: &PackageId, name: Option<&str>) -> String {
if let Some(name) = name {
format!("{}_{}.{}", pkg_id, name, NET_TLD)

View File

@@ -67,125 +67,6 @@ impl JsProcedure {
pub fn validate(&self, _volumes: &Volumes) -> Result<(), color_eyre::eyre::Report> {
Ok(())
}
#[instrument(skip_all)]
pub async fn execute<I: Serialize, O: DeserializeOwned>(
&self,
directory: &PathBuf,
pkg_id: &PackageId,
pkg_version: &Version,
name: ProcedureName,
volumes: &Volumes,
input: Option<I>,
timeout: Option<Duration>,
_gid: ProcessGroupId,
_rpc_client: Option<Arc<UnixRpcClient>>,
) -> Result<Result<O, (i32, String)>, Error> {
Command::new("start-deno")
.arg("execute")
.input(Some(&mut std::io::Cursor::new(IoFormat::Json.to_vec(
&ExecuteArgs {
procedure: self.clone(),
directory: directory.clone(),
pkg_id: pkg_id.clone(),
pkg_version: pkg_version.clone(),
name,
volumes: volumes.clone(),
input: input.and_then(|x| serde_json::to_value(x).ok()),
},
)?)))
.timeout(timeout)
.invoke(ErrorKind::Javascript)
.await
.and_then(|res| IoFormat::Json.from_slice(&res))
}
#[instrument(skip_all)]
pub async fn sandboxed<I: Serialize, O: DeserializeOwned>(
&self,
directory: &PathBuf,
pkg_id: &PackageId,
pkg_version: &Version,
volumes: &Volumes,
input: Option<I>,
timeout: Option<Duration>,
name: ProcedureName,
) -> Result<Result<O, (i32, String)>, Error> {
Command::new("start-deno")
.arg("sandbox")
.input(Some(&mut std::io::Cursor::new(IoFormat::Json.to_vec(
&ExecuteArgs {
procedure: self.clone(),
directory: directory.clone(),
pkg_id: pkg_id.clone(),
pkg_version: pkg_version.clone(),
name,
volumes: volumes.clone(),
input: input.and_then(|x| serde_json::to_value(x).ok()),
},
)?)))
.timeout(timeout)
.invoke(ErrorKind::Javascript)
.await
.and_then(|res| IoFormat::Json.from_slice(&res))
}
#[instrument(skip_all)]
pub async fn execute_impl<I: Serialize, O: DeserializeOwned>(
&self,
directory: &PathBuf,
pkg_id: &PackageId,
pkg_version: &Version,
name: ProcedureName,
volumes: &Volumes,
input: Option<I>,
) -> Result<Result<O, (i32, String)>, Error> {
let res = async move {
let running_action = JsExecutionEnvironment::load_from_package(
directory,
pkg_id,
pkg_version,
Box::new(volumes.clone()),
)
.await?
.run_action(name, input, self.args.clone());
let output: Option<ErrorValue> = running_action.await?;
let output: O = unwrap_known_error(output)?;
Ok(output)
}
.await
.map_err(|(error, message)| (error.as_code_num(), message));
Ok(res)
}
#[instrument(skip_all)]
pub async fn sandboxed_impl<I: Serialize, O: DeserializeOwned>(
&self,
directory: &PathBuf,
pkg_id: &PackageId,
pkg_version: &Version,
volumes: &Volumes,
input: Option<I>,
name: ProcedureName,
) -> Result<Result<O, (i32, String)>, Error> {
Ok(async move {
let running_action = JsExecutionEnvironment::load_from_package(
directory,
pkg_id,
pkg_version,
Box::new(volumes.clone()),
)
.await?
.read_only_effects()
.run_action(name, input, self.args.clone());
let output: Option<ErrorValue> = running_action.await?;
let output: O = unwrap_known_error(output)?;
Ok(output)
}
.await
.map_err(|(error, message)| (error.as_code_num(), message)))
}
}
fn unwrap_known_error<O: DeserializeOwned>(
@@ -211,568 +92,3 @@ fn unwrap_known_error<O: DeserializeOwned>(
},
}
}
#[tokio::test]
async fn js_action_execute() {
let js_action = JsProcedure { args: vec![] };
let path: PathBuf = "test/js_action_execute/"
.parse::<PathBuf>()
.unwrap()
.canonicalize()
.unwrap();
let package_id = "test-package".parse().unwrap();
let package_version: Version = "0.3.0.3".parse().unwrap();
let name = ProcedureName::GetConfig;
let volumes: Volumes = serde_json::from_value(serde_json::json!({
"main": {
"type": "data"
},
"compat": {
"type": "assets"
},
"filebrowser" :{
"package-id": "filebrowser",
"path": "data",
"readonly": true,
"type": "pointer",
"volume-id": "main",
}
}))
.unwrap();
let input: Option<serde_json::Value> = Some(serde_json::json!({"test":123}));
let timeout = Some(Duration::from_secs(10));
let _output: crate::config::action::ConfigRes = js_action
.execute(
&path,
&package_id,
&package_version,
name,
&volumes,
input,
timeout,
ProcessGroupId(0),
None,
)
.await
.unwrap()
.unwrap();
assert_eq!(
&std::fs::read_to_string(
"test/js_action_execute/package-data/volumes/test-package/data/main/test.log"
)
.unwrap(),
"This is a test"
);
std::fs::remove_file(
"test/js_action_execute/package-data/volumes/test-package/data/main/test.log",
)
.unwrap();
}
#[tokio::test]
async fn js_action_execute_error() {
let js_action = JsProcedure { args: vec![] };
let path: PathBuf = "test/js_action_execute/"
.parse::<PathBuf>()
.unwrap()
.canonicalize()
.unwrap();
let package_id = "test-package".parse().unwrap();
let package_version: Version = "0.3.0.3".parse().unwrap();
let name = ProcedureName::SetConfig;
let volumes: Volumes = serde_json::from_value(serde_json::json!({
"main": {
"type": "data"
},
"compat": {
"type": "assets"
},
"filebrowser" :{
"package-id": "filebrowser",
"path": "data",
"readonly": true,
"type": "pointer",
"volume-id": "main",
}
}))
.unwrap();
let input: Option<serde_json::Value> = None;
let timeout = Some(Duration::from_secs(10));
let output: Result<serde_json::Value, _> = js_action
.execute(
&path,
&package_id,
&package_version,
name,
&volumes,
input,
timeout,
ProcessGroupId(0),
None,
)
.await
.unwrap();
assert_eq!("Err((2, \"Not setup\"))", &format!("{:?}", output));
}
#[tokio::test]
async fn js_action_fetch() {
let js_action = JsProcedure { args: vec![] };
let path: PathBuf = "test/js_action_execute/"
.parse::<PathBuf>()
.unwrap()
.canonicalize()
.unwrap();
let package_id = "test-package".parse().unwrap();
let package_version: Version = "0.3.0.3".parse().unwrap();
let name = ProcedureName::Action("fetch".parse().unwrap());
let volumes: Volumes = serde_json::from_value(serde_json::json!({
"main": {
"type": "data"
},
"compat": {
"type": "assets"
},
"filebrowser" :{
"package-id": "filebrowser",
"path": "data",
"readonly": true,
"type": "pointer",
"volume-id": "main",
}
}))
.unwrap();
let input: Option<serde_json::Value> = None;
let timeout = Some(Duration::from_secs(10));
js_action
.execute::<serde_json::Value, serde_json::Value>(
&path,
&package_id,
&package_version,
name,
&volumes,
input,
timeout,
ProcessGroupId(0),
None,
)
.await
.unwrap()
.unwrap();
}
#[tokio::test]
async fn js_test_slow() {
let js_action = JsProcedure { args: vec![] };
let path: PathBuf = "test/js_action_execute/"
.parse::<PathBuf>()
.unwrap()
.canonicalize()
.unwrap();
let package_id = "test-package".parse().unwrap();
let package_version: Version = "0.3.0.3".parse().unwrap();
let name = ProcedureName::Action("slow".parse().unwrap());
let volumes: Volumes = serde_json::from_value(serde_json::json!({
"main": {
"type": "data"
},
"compat": {
"type": "assets"
},
"filebrowser" :{
"package-id": "filebrowser",
"path": "data",
"readonly": true,
"type": "pointer",
"volume-id": "main",
}
}))
.unwrap();
let input: Option<serde_json::Value> = None;
let timeout = Some(Duration::from_secs(10));
tracing::debug!("testing start");
tokio::select! {
a = js_action
.execute::<serde_json::Value, serde_json::Value>(
&path,
&package_id,
&package_version,
name,
&volumes,
input,
timeout,
ProcessGroupId(0),
None,
) => { a.unwrap().unwrap(); },
_ = tokio::time::sleep(Duration::from_secs(1)) => ()
}
tracing::debug!("testing end should");
tokio::time::sleep(Duration::from_secs(2)).await;
tracing::debug!("Done");
}
#[tokio::test]
async fn js_action_var_arg() {
let js_action = JsProcedure {
args: vec![42.into()],
};
let path: PathBuf = "test/js_action_execute/"
.parse::<PathBuf>()
.unwrap()
.canonicalize()
.unwrap();
let package_id = "test-package".parse().unwrap();
let package_version: Version = "0.3.0.3".parse().unwrap();
let name = ProcedureName::Action("js-action-var-arg".parse().unwrap());
let volumes: Volumes = serde_json::from_value(serde_json::json!({
"main": {
"type": "data"
},
"compat": {
"type": "assets"
},
"filebrowser" :{
"package-id": "filebrowser",
"path": "data",
"readonly": true,
"type": "pointer",
"volume-id": "main",
}
}))
.unwrap();
let input: Option<serde_json::Value> = None;
let timeout = Some(Duration::from_secs(10));
js_action
.execute::<serde_json::Value, serde_json::Value>(
&path,
&package_id,
&package_version,
name,
&volumes,
input,
timeout,
ProcessGroupId(0),
None,
)
.await
.unwrap()
.unwrap();
}
#[tokio::test]
async fn js_action_test_rename() {
let js_action = JsProcedure { args: vec![] };
let path: PathBuf = "test/js_action_execute/"
.parse::<PathBuf>()
.unwrap()
.canonicalize()
.unwrap();
let package_id = "test-package".parse().unwrap();
let package_version: Version = "0.3.0.3".parse().unwrap();
let name = ProcedureName::Action("test-rename".parse().unwrap());
let volumes: Volumes = serde_json::from_value(serde_json::json!({
"main": {
"type": "data"
},
"compat": {
"type": "assets"
},
"filebrowser" :{
"package-id": "filebrowser",
"path": "data",
"readonly": true,
"type": "pointer",
"volume-id": "main",
}
}))
.unwrap();
let input: Option<serde_json::Value> = None;
let timeout = Some(Duration::from_secs(10));
js_action
.execute::<serde_json::Value, serde_json::Value>(
&path,
&package_id,
&package_version,
name,
&volumes,
input,
timeout,
ProcessGroupId(0),
None,
)
.await
.unwrap()
.unwrap();
}
#[tokio::test]
async fn js_action_test_deep_dir() {
let js_action = JsProcedure { args: vec![] };
let path: PathBuf = "test/js_action_execute/"
.parse::<PathBuf>()
.unwrap()
.canonicalize()
.unwrap();
let package_id = "test-package".parse().unwrap();
let package_version: Version = "0.3.0.3".parse().unwrap();
let name = ProcedureName::Action("test-deep-dir".parse().unwrap());
let volumes: Volumes = serde_json::from_value(serde_json::json!({
"main": {
"type": "data"
},
"compat": {
"type": "assets"
},
"filebrowser" :{
"package-id": "filebrowser",
"path": "data",
"readonly": true,
"type": "pointer",
"volume-id": "main",
}
}))
.unwrap();
let input: Option<serde_json::Value> = None;
let timeout = Some(Duration::from_secs(10));
js_action
.execute::<serde_json::Value, serde_json::Value>(
&path,
&package_id,
&package_version,
name,
&volumes,
input,
timeout,
ProcessGroupId(0),
None,
)
.await
.unwrap()
.unwrap();
}
#[tokio::test]
async fn js_action_test_deep_dir_escape() {
let js_action = JsProcedure { args: vec![] };
let path: PathBuf = "test/js_action_execute/"
.parse::<PathBuf>()
.unwrap()
.canonicalize()
.unwrap();
let package_id = "test-package".parse().unwrap();
let package_version: Version = "0.3.0.3".parse().unwrap();
let name = ProcedureName::Action("test-deep-dir-escape".parse().unwrap());
let volumes: Volumes = serde_json::from_value(serde_json::json!({
"main": {
"type": "data"
},
"compat": {
"type": "assets"
},
"filebrowser" :{
"package-id": "filebrowser",
"path": "data",
"readonly": true,
"type": "pointer",
"volume-id": "main",
}
}))
.unwrap();
let input: Option<serde_json::Value> = None;
let timeout = Some(Duration::from_secs(10));
js_action
.execute::<serde_json::Value, serde_json::Value>(
&path,
&package_id,
&package_version,
name,
&volumes,
input,
timeout,
ProcessGroupId(0),
None,
)
.await
.unwrap()
.unwrap();
}
#[tokio::test]
async fn js_action_test_zero_dir() {
let js_action = JsProcedure { args: vec![] };
let path: PathBuf = "test/js_action_execute/"
.parse::<PathBuf>()
.unwrap()
.canonicalize()
.unwrap();
let package_id = "test-package".parse().unwrap();
let package_version: Version = "0.3.0.3".parse().unwrap();
let name = ProcedureName::Action("test-zero-dir".parse().unwrap());
let volumes: Volumes = serde_json::from_value(serde_json::json!({
"main": {
"type": "data"
},
"compat": {
"type": "assets"
},
"filebrowser" :{
"package-id": "filebrowser",
"path": "data",
"readonly": true,
"type": "pointer",
"volume-id": "main",
}
}))
.unwrap();
let input: Option<serde_json::Value> = None;
let timeout = Some(Duration::from_secs(10));
js_action
.execute::<serde_json::Value, serde_json::Value>(
&path,
&package_id,
&package_version,
name,
&volumes,
input,
timeout,
ProcessGroupId(0),
None,
)
.await
.unwrap()
.unwrap();
}
#[tokio::test]
async fn js_action_test_read_dir() {
let js_action = JsProcedure { args: vec![] };
let path: PathBuf = "test/js_action_execute/"
.parse::<PathBuf>()
.unwrap()
.canonicalize()
.unwrap();
let package_id = "test-package".parse().unwrap();
let package_version: Version = "0.3.0.3".parse().unwrap();
let name = ProcedureName::Action("test-read-dir".parse().unwrap());
let volumes: Volumes = serde_json::from_value(serde_json::json!({
"main": {
"type": "data"
},
"compat": {
"type": "assets"
},
"filebrowser" :{
"package-id": "filebrowser",
"path": "data",
"readonly": true,
"type": "pointer",
"volume-id": "main",
}
}))
.unwrap();
let input: Option<serde_json::Value> = None;
let timeout = Some(Duration::from_secs(10));
js_action
.execute::<serde_json::Value, serde_json::Value>(
&path,
&package_id,
&package_version,
name,
&volumes,
input,
timeout,
ProcessGroupId(0),
None,
)
.await
.unwrap()
.unwrap();
}
#[tokio::test]
async fn js_rsync() {
let js_action = JsProcedure { args: vec![] };
let path: PathBuf = "test/js_action_execute/"
.parse::<PathBuf>()
.unwrap()
.canonicalize()
.unwrap();
let package_id = "test-package".parse().unwrap();
let package_version: Version = "0.3.0.3".parse().unwrap();
let name = ProcedureName::Action("test-rsync".parse().unwrap());
let volumes: Volumes = serde_json::from_value(serde_json::json!({
"main": {
"type": "data"
},
"compat": {
"type": "assets"
},
"filebrowser" :{
"package-id": "filebrowser",
"path": "data",
"readonly": true,
"type": "pointer",
"volume-id": "main",
}
}))
.unwrap();
let input: Option<serde_json::Value> = None;
let timeout = Some(Duration::from_secs(10));
js_action
.execute::<serde_json::Value, serde_json::Value>(
&path,
&package_id,
&package_version,
name,
&volumes,
input,
timeout,
ProcessGroupId(0),
None,
)
.await
.unwrap()
.unwrap();
}
#[tokio::test]
async fn js_disk_usage() {
let js_action = JsProcedure { args: vec![] };
let path: PathBuf = "test/js_action_execute/"
.parse::<PathBuf>()
.unwrap()
.canonicalize()
.unwrap();
let package_id = "test-package".parse().unwrap();
let package_version: Version = "0.3.0.3".parse().unwrap();
let name = ProcedureName::Action("test-disk-usage".parse().unwrap());
let volumes: Volumes = serde_json::from_value(serde_json::json!({
"main": {
"type": "data"
},
"compat": {
"type": "assets"
},
"filebrowser" :{
"package-id": "filebrowser",
"path": "data",
"readonly": true,
"type": "pointer",
"volume-id": "main",
}
}))
.unwrap();
let input: Option<serde_json::Value> = None;
let timeout = Some(Duration::from_secs(10));
js_action
.execute::<serde_json::Value, serde_json::Value>(
&path,
&package_id,
&package_version,
name,
&volumes,
input,
timeout,
ProcessGroupId(0),
None,
)
.await
.unwrap()
.unwrap();
}

View File

@@ -69,51 +69,19 @@ impl PackageProcedure {
timeout: Option<Duration>,
) -> Result<Result<O, (i32, String)>, Error> {
tracing::trace!("Procedure execute {} {} - {:?}", self, pkg_id, name);
match self {
PackageProcedure::Docker(procedure) if procedure.inject == true => {
procedure
.inject(ctx, pkg_id, pkg_version, name, volumes, input, timeout)
.await
}
PackageProcedure::Docker(procedure) => {
procedure
.execute(ctx, pkg_id, pkg_version, name, volumes, input, timeout)
.await
}
#[cfg(feature = "js_engine")]
PackageProcedure::Script(procedure) => {
let man = ctx
.managers
.get(&(pkg_id.clone(), pkg_version.clone()))
.await
.ok_or_else(|| {
Error::new(
eyre!("No manager found for {}", pkg_id),
ErrorKind::NotFound,
)
})?;
let rpc_client = man.rpc_client();
let gid = if matches!(name, ProcedureName::Main) {
man.gid.new_main_gid()
} else {
man.gid.new_gid()
};
procedure
.execute(
&ctx.datadir,
pkg_id,
pkg_version,
name,
volumes,
input,
timeout,
gid,
Some(rpc_client),
)
.await
}
}
let manager = ctx
.managers
.get(&(pkg_id.clone(), pkg_version.clone()))
.await
.ok_or_else(|| {
Error::new(
eyre!("No manager found for {}", pkg_id),
ErrorKind::NotFound,
)
})?;
manager
.execute(name, imbl_value::to_value(&input)?, timeout)
.await
}
#[instrument(skip_all)]
@@ -128,27 +96,19 @@ impl PackageProcedure {
name: ProcedureName,
) -> Result<Result<O, (i32, String)>, Error> {
tracing::trace!("Procedure sandboxed {} {} - {:?}", self, pkg_id, name);
match self {
PackageProcedure::Docker(procedure) => {
procedure
.sandboxed(ctx, pkg_id, pkg_version, volumes, input, timeout)
.await
}
#[cfg(feature = "js_engine")]
PackageProcedure::Script(procedure) => {
procedure
.sandboxed(
&ctx.datadir,
pkg_id,
pkg_version,
volumes,
input,
timeout,
name,
)
.await
}
}
let manager = ctx
.managers
.get(&(pkg_id.clone(), pkg_version.clone()))
.await
.ok_or_else(|| {
Error::new(
eyre!("No manager found for {}", pkg_id),
ErrorKind::NotFound,
)
})?;
manager
.sanboxed(name, imbl_value::to_value(&input)?, timeout)
.await
}
}