From 94d22ed1aa1ccb29b3c921d53b64a9eb244215aa Mon Sep 17 00:00:00 2001 From: J H <2364004+Blu-J@users.noreply.github.com> Date: Fri, 10 Nov 2023 09:26:00 -0700 Subject: [PATCH] chore: Remove the other procedures since all are now via the js --- backend/src/bins/start_deno.rs | 14 +- backend/src/manager/mod.rs | 66 +- backend/src/manager/persistent_container.rs | 9 + backend/src/procedure/docker.rs | 437 ------------- backend/src/procedure/js_scripts.rs | 684 -------------------- backend/src/procedure/mod.rs | 92 +-- 6 files changed, 77 insertions(+), 1225 deletions(-) diff --git a/backend/src/bins/start_deno.rs b/backend/src/bins/start_deno.rs index 0be507082..870821734 100644 --- a/backend/src/bins/start_deno.rs +++ b/backend/src/bins/start_deno.rs @@ -34,9 +34,10 @@ async fn execute( input, } = arg; PackageLogger::init(&pkg_id); - procedure - .execute_impl(&directory, &pkg_id, &pkg_version, name, &volumes, input) - .await + // procedure + // .execute_impl(&directory, &pkg_id, &pkg_version, name, &volumes, input) + // .await + todo!("@DRB Remove") } #[command(cli_only, display(display_serializable))] async fn sandbox( @@ -52,9 +53,10 @@ async fn sandbox( input, } = arg; PackageLogger::init(&pkg_id); - procedure - .sandboxed_impl(&directory, &pkg_id, &pkg_version, &volumes, input, name) - .await + // procedure + // .sandboxed_impl(&directory, &pkg_id, &pkg_version, &volumes, input, name) + // .await + todo!("@DRB Remove") } use tracing::Subscriber; diff --git a/backend/src/manager/mod.rs b/backend/src/manager/mod.rs index b34e29efb..56fa408fd 100644 --- a/backend/src/manager/mod.rs +++ b/backend/src/manager/mod.rs @@ -13,6 +13,7 @@ use models::{ErrorKind, OptionExt, PackageId}; use nix::sys::signal::Signal; use persistent_container::PersistentContainer; use rand::SeedableRng; +use serde::de::DeserializeOwned; use sqlx::Connection; use start_stop::StartStop; use tokio::sync::watch::{self, Sender}; @@ -328,6 +329,38 @@ impl Manager { let transition = self.transition.borrow(); matches!(*transition, TransitionState::BackingUp(_)) } + + pub async fn execute( + &self, + name: ProcedureName, + input: Value, + timeout: Option, + ) -> Result, Error> + where + O: DeserializeOwned, + { + self.persistent_container + .execute(name, input, timeout) + .await + } + + pub async fn sanboxed( + &self, + name: ProcedureName, + input: Value, + timeout: Option, + ) -> Result, Error> + where + O: DeserializeOwned, + { + self.persistent_container + .sanboxed(name, input, timeout) + .await + } + + pub async fn send_signal(&self, gid: Arc, signal: Signal) -> Result<(), Error> { + self.persistent_container.send_signal(gid, signal).await + } } #[instrument(skip_all)] @@ -817,36 +850,5 @@ async fn get_running_ip(seed: &ManagerSeed, mut runtime: &mut RuntimeOfCommand) } async fn send_signal(manager: &Manager, gid: Arc, signal: Signal) -> Result<(), Error> { - // stop health checks from committing their results - // shared - // .commit_health_check_results - // .store(false, Ordering::SeqCst); - - let rpc_client = manager.rpc_client(); - - let main_gid = *gid.main_gid.0.borrow(); - let next_gid = gid.new_gid(); - #[cfg(feature = "js_engine")] - if let Err(e) = crate::procedure::js_scripts::JsProcedure::default() - .execute::<_, NoOutput>( - &manager.seed.ctx.datadir, - &manager.seed.manifest.id, - &manager.seed.manifest.version, - ProcedureName::Signal, - &manager.seed.manifest.volumes, - Some(embassy_container_init::SignalGroupParams { - gid: main_gid, - signal: signal as u32, - }), - None, // TODO - next_gid, - Some(rpc_client), - ) - .await? - { - tracing::error!("Failed to send js signal: {}", e.1); - tracing::debug!("{:?}", e); - } - - Ok(()) + manager.send_signal(gid, signal).await } diff --git a/backend/src/manager/persistent_container.rs b/backend/src/manager/persistent_container.rs index 0753a9bb4..e6380bf49 100644 --- a/backend/src/manager/persistent_container.rs +++ b/backend/src/manager/persistent_container.rs @@ -4,6 +4,7 @@ use std::time::Duration; use color_eyre::eyre::eyre; use helpers::UnixRpcClient; use models::ProcedureName; +use nix::sys::signal::Signal; use serde::de::DeserializeOwned; use tokio::sync::watch::{self, Receiver}; use tokio::sync::{oneshot, Mutex}; @@ -30,6 +31,9 @@ pub struct PersistentContainer { procedures: Mutex>, } +// BLUJ TODO Need to get the only action is this and not procedure/ +// BLUJ Modify the rpc client to match the new type + impl PersistentContainer { #[instrument(skip_all)] pub async fn init(seed: &Arc) -> Result { @@ -71,6 +75,7 @@ impl PersistentContainer { Err(e) => Err(e), } } + pub async fn sanboxed( &self, name: ProcedureName, @@ -119,6 +124,10 @@ impl PersistentContainer { ) -> Result, Error> { todo!("DRB") } + + pub async fn send_signal(&self, gid: Arc, signal: Signal) -> Result<(), Error> { + todo!("DRB") + } } pub async fn spawn_persistent_container( diff --git a/backend/src/procedure/docker.rs b/backend/src/procedure/docker.rs index 57207d5c9..c6912c97f 100644 --- a/backend/src/procedure/docker.rs +++ b/backend/src/procedure/docker.rs @@ -217,443 +217,6 @@ impl DockerProcedure { Ok(()) } - #[instrument(skip_all)] - pub async fn execute( - &self, - ctx: &RpcContext, - pkg_id: &PackageId, - pkg_version: &Version, - name: ProcedureName, - volumes: &Volumes, - input: Option, - timeout: Option, - ) -> Result, Error> { - let name = name.docker_name(); - let name: Option<&str> = name.as_deref(); - let mut cmd = tokio::process::Command::new(CONTAINER_TOOL); - let container_name = Self::container_name(pkg_id, name); - cmd.arg("run") - .arg("--rm") - .arg("--network=start9") - .arg(format!("--add-host=embassy:{}", Ipv4Addr::from(HOST_IP))) - .arg("--name") - .arg(&container_name) - .arg(format!("--hostname={}", &container_name)) - .arg("--no-healthcheck") - .kill_on_drop(true); - remove_container(&container_name, true).await?; - cmd.args(self.docker_args(ctx, pkg_id, pkg_version, volumes).await?); - let input_buf = if let (Some(input), Some(format)) = (&input, &self.io_format) { - cmd.stdin(std::process::Stdio::piped()); - Some(format.to_vec(input)?) - } else { - None - }; - cmd.stdout(std::process::Stdio::piped()); - cmd.stderr(std::process::Stdio::piped()); - tracing::trace!( - "{}", - format!("{:?}", cmd) - .split(r#"" ""#) - .collect::>() - .join(" ") - ); - let mut handle = cmd.spawn().with_kind(crate::ErrorKind::Docker)?; - let id = handle.id(); - let timeout_fut = if let Some(timeout) = timeout { - EitherFuture::Right(async move { - tokio::time::sleep(timeout).await; - - Ok(()) - }) - } else { - EitherFuture::Left(futures::future::pending::>()) - }; - if let (Some(input), Some(mut stdin)) = (&input_buf, handle.stdin.take()) { - use tokio::io::AsyncWriteExt; - stdin - .write_all(input) - .await - .with_kind(crate::ErrorKind::Docker)?; - stdin.flush().await?; - stdin.shutdown().await?; - drop(stdin); - } - enum Race { - Done(T), - TimedOut, - } - - let io_format = self.io_format; - let mut output = BufReader::new( - handle - .stdout - .take() - .ok_or_else(|| eyre!("Can't takeout stdout in execute")) - .with_kind(crate::ErrorKind::Docker)?, - ); - let output = NonDetachingJoinHandle::from(tokio::spawn(async move { - match async { - if let Some(format) = io_format { - return match max_by_lines(&mut output, None).await { - MaxByLines::Done(buffer) => { - Ok::( - match format.from_slice(buffer.as_bytes()) { - Ok(a) => a, - Err(e) => { - tracing::trace!( - "Failed to deserialize stdout from {}: {}, falling back to UTF-8 string.", - format, - e - ); - Value::String(buffer) - } - }, - ) - }, - MaxByLines::Error(e) => Err(e), - MaxByLines::Overflow(buffer) => Ok(Value::String(buffer)) - } - } - - let lines = buf_reader_to_lines(&mut output, 1000).await?; - if lines.is_empty() { - return Ok(Value::Null); - } - - let joined_output = lines.join("\n"); - Ok(Value::String(joined_output)) - }.await { - Ok(a) => Ok((a, output)), - Err(e) => Err((e, output)) - } - })); - let err_output = BufReader::new( - handle - .stderr - .take() - .ok_or_else(|| eyre!("Can't takeout std err")) - .with_kind(crate::ErrorKind::Docker)?, - ); - - let err_output = NonDetachingJoinHandle::from(tokio::spawn(async move { - let lines = buf_reader_to_lines(err_output, 1000).await?; - let joined_output = lines.join("\n"); - Ok::<_, Error>(joined_output) - })); - - let res = tokio::select! { - res = handle.wait() => Race::Done(res.with_kind(crate::ErrorKind::Docker)?), - res = timeout_fut => { - res?; - Race::TimedOut - }, - }; - let exit_status = match res { - Race::Done(x) => x, - Race::TimedOut => { - if let Some(id) = id { - signal::kill(Pid::from_raw(id as i32), signal::SIGKILL) - .with_kind(crate::ErrorKind::Docker)?; - } - return Ok(Err((143, "Timed out. Retrying soon...".to_owned()))); - } - }; - Ok( - if exit_status.success() || exit_status.code() == Some(143) { - Ok(serde_json::from_value( - output - .await - .with_kind(crate::ErrorKind::Unknown)? - .map(|(v, _)| v) - .map_err(|(e, _)| tracing::warn!("{}", e)) - .unwrap_or_default(), - ) - .with_kind(crate::ErrorKind::Deserialization)?) - } else { - Err(( - exit_status.code().unwrap_or_default(), - err_output.await.with_kind(crate::ErrorKind::Unknown)??, - )) - }, - ) - } - - #[instrument(skip_all)] - pub async fn inject( - &self, - _ctx: &RpcContext, - pkg_id: &PackageId, - _pkg_version: &Version, - _name: ProcedureName, - _volumes: &Volumes, - input: Option, - timeout: Option, - ) -> Result, Error> { - let mut cmd = tokio::process::Command::new(CONTAINER_TOOL); - - cmd.arg("exec"); - - cmd.args(self.docker_args_inject(pkg_id)); - let input_buf = if let (Some(input), Some(format)) = (&input, &self.io_format) { - cmd.stdin(std::process::Stdio::piped()); - Some(format.to_vec(input)?) - } else { - None - }; - cmd.stdout(std::process::Stdio::piped()); - cmd.stderr(std::process::Stdio::piped()); - tracing::trace!( - "{}", - format!("{:?}", cmd) - .split(r#"" ""#) - .collect::>() - .join(" ") - ); - let mut handle = cmd.spawn().with_kind(crate::ErrorKind::Docker)?; - let id = handle.id(); - let timeout_fut = if let Some(timeout) = timeout { - EitherFuture::Right(async move { - tokio::time::sleep(timeout).await; - - Ok(()) - }) - } else { - EitherFuture::Left(futures::future::pending::>()) - }; - if let (Some(input), Some(mut stdin)) = (&input_buf, handle.stdin.take()) { - use tokio::io::AsyncWriteExt; - stdin - .write_all(input) - .await - .with_kind(crate::ErrorKind::Docker)?; - stdin.flush().await?; - stdin.shutdown().await?; - drop(stdin); - } - enum Race { - Done(T), - TimedOut, - } - - let io_format = self.io_format; - let mut output = BufReader::new( - handle - .stdout - .take() - .ok_or_else(|| eyre!("Can't takeout stdout in inject")) - .with_kind(crate::ErrorKind::Docker)?, - ); - let output = NonDetachingJoinHandle::from(tokio::spawn(async move { - match async { - if let Some(format) = io_format { - return match max_by_lines(&mut output, None).await { - MaxByLines::Done(buffer) => { - Ok::( - match format.from_slice(buffer.as_bytes()) { - Ok(a) => a, - Err(e) => { - tracing::trace!( - "Failed to deserialize stdout from {}: {}, falling back to UTF-8 string.", - format, - e - ); - Value::String(buffer) - } - }, - ) - }, - MaxByLines::Error(e) => Err(e), - MaxByLines::Overflow(buffer) => Ok(Value::String(buffer)) - } - } - - let lines = buf_reader_to_lines(&mut output, 1000).await?; - if lines.is_empty() { - return Ok(Value::Null); - } - - let joined_output = lines.join("\n"); - Ok(Value::String(joined_output)) - }.await { - Ok(a) => Ok((a, output)), - Err(e) => Err((e, output)) - } - })); - let err_output = BufReader::new( - handle - .stderr - .take() - .ok_or_else(|| eyre!("Can't takeout std err")) - .with_kind(crate::ErrorKind::Docker)?, - ); - - let err_output = NonDetachingJoinHandle::from(tokio::spawn(async move { - let lines = buf_reader_to_lines(err_output, 1000).await?; - let joined_output = lines.join("\n"); - Ok::<_, Error>(joined_output) - })); - - let res = tokio::select! { - res = handle.wait() => Race::Done(res.with_kind(crate::ErrorKind::Docker)?), - res = timeout_fut => { - res?; - Race::TimedOut - }, - }; - let exit_status = match res { - Race::Done(x) => x, - Race::TimedOut => { - if let Some(id) = id { - signal::kill(Pid::from_raw(id as i32), signal::SIGKILL) - .with_kind(crate::ErrorKind::Docker)?; - } - return Ok(Err((143, "Timed out. Retrying soon...".to_owned()))); - } - }; - Ok( - if exit_status.success() || exit_status.code() == Some(143) { - Ok(serde_json::from_value( - output - .await - .with_kind(crate::ErrorKind::Unknown)? - .map(|(v, _)| v) - .map_err(|(e, _)| tracing::warn!("{}", e)) - .unwrap_or_default(), - ) - .with_kind(crate::ErrorKind::Deserialization)?) - } else { - Err(( - exit_status.code().unwrap_or_default(), - err_output.await.with_kind(crate::ErrorKind::Unknown)??, - )) - }, - ) - } - - #[instrument(skip_all)] - pub async fn sandboxed( - &self, - ctx: &RpcContext, - pkg_id: &PackageId, - pkg_version: &Version, - volumes: &Volumes, - input: Option, - timeout: Option, - ) -> Result, Error> { - let mut cmd = tokio::process::Command::new(CONTAINER_TOOL); - cmd.arg("run").arg("--rm").arg("--network=none"); - cmd.args( - self.docker_args(ctx, pkg_id, pkg_version, &volumes.to_readonly()) - .await?, - ); - let input_buf = if let (Some(input), Some(format)) = (&input, &self.io_format) { - cmd.stdin(std::process::Stdio::piped()); - Some(format.to_vec(input)?) - } else { - None - }; - cmd.stdout(std::process::Stdio::piped()); - cmd.stderr(std::process::Stdio::piped()); - let mut handle = cmd.spawn().with_kind(crate::ErrorKind::Docker)?; - if let (Some(input), Some(stdin)) = (&input_buf, &mut handle.stdin) { - use tokio::io::AsyncWriteExt; - stdin - .write_all(input) - .await - .with_kind(crate::ErrorKind::Docker)?; - } - - let err_output = BufReader::new( - handle - .stderr - .take() - .ok_or_else(|| eyre!("Can't takeout std err")) - .with_kind(crate::ErrorKind::Docker)?, - ); - let err_output = NonDetachingJoinHandle::from(tokio::spawn(async move { - let lines = buf_reader_to_lines(err_output, 1000).await?; - let joined_output = lines.join("\n"); - Ok::<_, Error>(joined_output) - })); - - let io_format = self.io_format; - let mut output = BufReader::new( - handle - .stdout - .take() - .ok_or_else(|| eyre!("Can't takeout stdout in sandboxed")) - .with_kind(crate::ErrorKind::Docker)?, - ); - let output = NonDetachingJoinHandle::from(tokio::spawn(async move { - match async { - if let Some(format) = io_format { - return match max_by_lines(&mut output, None).await { - MaxByLines::Done(buffer) => { - Ok::( - match format.from_slice(buffer.as_bytes()) { - Ok(a) => a, - Err(e) => { - tracing::trace!( - "Failed to deserialize stdout from {}: {}, falling back to UTF-8 string.", - format, - e - ); - Value::String(buffer) - } - }, - ) - }, - MaxByLines::Error(e) => Err(e), - MaxByLines::Overflow(buffer) => Ok(Value::String(buffer)) - } - } - - let lines = buf_reader_to_lines(&mut output, 1000).await?; - if lines.is_empty() { - return Ok(Value::Null); - } - - let joined_output = lines.join("\n"); - Ok(Value::String(joined_output)) - }.await { - Ok(a) => Ok((a, output)), - Err(e) => Err((e, output)) - } - })); - - let handle = if let Some(dur) = timeout { - async move { - tokio::time::timeout(dur, handle.wait()) - .await - .with_kind(crate::ErrorKind::Docker)? - .with_kind(crate::ErrorKind::Docker) - } - .boxed() - } else { - async { handle.wait().await.with_kind(crate::ErrorKind::Docker) }.boxed() - }; - let exit_status = handle.await?; - Ok( - if exit_status.success() || exit_status.code() == Some(143) { - Ok(serde_json::from_value( - output - .await - .with_kind(crate::ErrorKind::Unknown)? - .map(|(v, _)| v) - .map_err(|(e, _)| tracing::warn!("{}", e)) - .unwrap_or_default(), - ) - .with_kind(crate::ErrorKind::Deserialization)?) - } else { - Err(( - exit_status.code().unwrap_or_default(), - err_output.await.with_kind(crate::ErrorKind::Unknown)??, - )) - }, - ) - } - pub fn container_name(pkg_id: &PackageId, name: Option<&str>) -> String { if let Some(name) = name { format!("{}_{}.{}", pkg_id, name, NET_TLD) diff --git a/backend/src/procedure/js_scripts.rs b/backend/src/procedure/js_scripts.rs index 27756b4a3..c27fae29b 100644 --- a/backend/src/procedure/js_scripts.rs +++ b/backend/src/procedure/js_scripts.rs @@ -67,125 +67,6 @@ impl JsProcedure { pub fn validate(&self, _volumes: &Volumes) -> Result<(), color_eyre::eyre::Report> { Ok(()) } - - #[instrument(skip_all)] - pub async fn execute( - &self, - directory: &PathBuf, - pkg_id: &PackageId, - pkg_version: &Version, - name: ProcedureName, - volumes: &Volumes, - input: Option, - timeout: Option, - _gid: ProcessGroupId, - _rpc_client: Option>, - ) -> Result, Error> { - Command::new("start-deno") - .arg("execute") - .input(Some(&mut std::io::Cursor::new(IoFormat::Json.to_vec( - &ExecuteArgs { - procedure: self.clone(), - directory: directory.clone(), - pkg_id: pkg_id.clone(), - pkg_version: pkg_version.clone(), - name, - volumes: volumes.clone(), - input: input.and_then(|x| serde_json::to_value(x).ok()), - }, - )?))) - .timeout(timeout) - .invoke(ErrorKind::Javascript) - .await - .and_then(|res| IoFormat::Json.from_slice(&res)) - } - - #[instrument(skip_all)] - pub async fn sandboxed( - &self, - directory: &PathBuf, - pkg_id: &PackageId, - pkg_version: &Version, - volumes: &Volumes, - input: Option, - timeout: Option, - name: ProcedureName, - ) -> Result, Error> { - Command::new("start-deno") - .arg("sandbox") - .input(Some(&mut std::io::Cursor::new(IoFormat::Json.to_vec( - &ExecuteArgs { - procedure: self.clone(), - directory: directory.clone(), - pkg_id: pkg_id.clone(), - pkg_version: pkg_version.clone(), - name, - volumes: volumes.clone(), - input: input.and_then(|x| serde_json::to_value(x).ok()), - }, - )?))) - .timeout(timeout) - .invoke(ErrorKind::Javascript) - .await - .and_then(|res| IoFormat::Json.from_slice(&res)) - } - - #[instrument(skip_all)] - pub async fn execute_impl( - &self, - directory: &PathBuf, - pkg_id: &PackageId, - pkg_version: &Version, - name: ProcedureName, - volumes: &Volumes, - input: Option, - ) -> Result, Error> { - let res = async move { - let running_action = JsExecutionEnvironment::load_from_package( - directory, - pkg_id, - pkg_version, - Box::new(volumes.clone()), - ) - .await? - .run_action(name, input, self.args.clone()); - let output: Option = running_action.await?; - let output: O = unwrap_known_error(output)?; - Ok(output) - } - .await - .map_err(|(error, message)| (error.as_code_num(), message)); - - Ok(res) - } - - #[instrument(skip_all)] - pub async fn sandboxed_impl( - &self, - directory: &PathBuf, - pkg_id: &PackageId, - pkg_version: &Version, - volumes: &Volumes, - input: Option, - name: ProcedureName, - ) -> Result, Error> { - Ok(async move { - let running_action = JsExecutionEnvironment::load_from_package( - directory, - pkg_id, - pkg_version, - Box::new(volumes.clone()), - ) - .await? - .read_only_effects() - .run_action(name, input, self.args.clone()); - let output: Option = running_action.await?; - let output: O = unwrap_known_error(output)?; - Ok(output) - } - .await - .map_err(|(error, message)| (error.as_code_num(), message))) - } } fn unwrap_known_error( @@ -211,568 +92,3 @@ fn unwrap_known_error( }, } } - -#[tokio::test] -async fn js_action_execute() { - let js_action = JsProcedure { args: vec![] }; - let path: PathBuf = "test/js_action_execute/" - .parse::() - .unwrap() - .canonicalize() - .unwrap(); - let package_id = "test-package".parse().unwrap(); - let package_version: Version = "0.3.0.3".parse().unwrap(); - let name = ProcedureName::GetConfig; - let volumes: Volumes = serde_json::from_value(serde_json::json!({ - "main": { - "type": "data" - }, - "compat": { - "type": "assets" - }, - "filebrowser" :{ - "package-id": "filebrowser", - "path": "data", - "readonly": true, - "type": "pointer", - "volume-id": "main", - } - })) - .unwrap(); - let input: Option = Some(serde_json::json!({"test":123})); - let timeout = Some(Duration::from_secs(10)); - let _output: crate::config::action::ConfigRes = js_action - .execute( - &path, - &package_id, - &package_version, - name, - &volumes, - input, - timeout, - ProcessGroupId(0), - None, - ) - .await - .unwrap() - .unwrap(); - assert_eq!( - &std::fs::read_to_string( - "test/js_action_execute/package-data/volumes/test-package/data/main/test.log" - ) - .unwrap(), - "This is a test" - ); - std::fs::remove_file( - "test/js_action_execute/package-data/volumes/test-package/data/main/test.log", - ) - .unwrap(); -} - -#[tokio::test] -async fn js_action_execute_error() { - let js_action = JsProcedure { args: vec![] }; - let path: PathBuf = "test/js_action_execute/" - .parse::() - .unwrap() - .canonicalize() - .unwrap(); - let package_id = "test-package".parse().unwrap(); - let package_version: Version = "0.3.0.3".parse().unwrap(); - let name = ProcedureName::SetConfig; - let volumes: Volumes = serde_json::from_value(serde_json::json!({ - "main": { - "type": "data" - }, - "compat": { - "type": "assets" - }, - "filebrowser" :{ - "package-id": "filebrowser", - "path": "data", - "readonly": true, - "type": "pointer", - "volume-id": "main", - } - })) - .unwrap(); - let input: Option = None; - let timeout = Some(Duration::from_secs(10)); - let output: Result = js_action - .execute( - &path, - &package_id, - &package_version, - name, - &volumes, - input, - timeout, - ProcessGroupId(0), - None, - ) - .await - .unwrap(); - assert_eq!("Err((2, \"Not setup\"))", &format!("{:?}", output)); -} - -#[tokio::test] -async fn js_action_fetch() { - let js_action = JsProcedure { args: vec![] }; - let path: PathBuf = "test/js_action_execute/" - .parse::() - .unwrap() - .canonicalize() - .unwrap(); - let package_id = "test-package".parse().unwrap(); - let package_version: Version = "0.3.0.3".parse().unwrap(); - let name = ProcedureName::Action("fetch".parse().unwrap()); - let volumes: Volumes = serde_json::from_value(serde_json::json!({ - "main": { - "type": "data" - }, - "compat": { - "type": "assets" - }, - "filebrowser" :{ - "package-id": "filebrowser", - "path": "data", - "readonly": true, - "type": "pointer", - "volume-id": "main", - } - })) - .unwrap(); - let input: Option = None; - let timeout = Some(Duration::from_secs(10)); - js_action - .execute::( - &path, - &package_id, - &package_version, - name, - &volumes, - input, - timeout, - ProcessGroupId(0), - None, - ) - .await - .unwrap() - .unwrap(); -} - -#[tokio::test] -async fn js_test_slow() { - let js_action = JsProcedure { args: vec![] }; - let path: PathBuf = "test/js_action_execute/" - .parse::() - .unwrap() - .canonicalize() - .unwrap(); - let package_id = "test-package".parse().unwrap(); - let package_version: Version = "0.3.0.3".parse().unwrap(); - let name = ProcedureName::Action("slow".parse().unwrap()); - let volumes: Volumes = serde_json::from_value(serde_json::json!({ - "main": { - "type": "data" - }, - "compat": { - "type": "assets" - }, - "filebrowser" :{ - "package-id": "filebrowser", - "path": "data", - "readonly": true, - "type": "pointer", - "volume-id": "main", - } - })) - .unwrap(); - let input: Option = None; - let timeout = Some(Duration::from_secs(10)); - tracing::debug!("testing start"); - tokio::select! { - a = js_action - .execute::( - &path, - &package_id, - &package_version, - name, - &volumes, - input, - timeout, - ProcessGroupId(0), - None, - ) => { a.unwrap().unwrap(); }, - _ = tokio::time::sleep(Duration::from_secs(1)) => () - } - tracing::debug!("testing end should"); - tokio::time::sleep(Duration::from_secs(2)).await; - tracing::debug!("Done"); -} -#[tokio::test] -async fn js_action_var_arg() { - let js_action = JsProcedure { - args: vec![42.into()], - }; - let path: PathBuf = "test/js_action_execute/" - .parse::() - .unwrap() - .canonicalize() - .unwrap(); - let package_id = "test-package".parse().unwrap(); - let package_version: Version = "0.3.0.3".parse().unwrap(); - let name = ProcedureName::Action("js-action-var-arg".parse().unwrap()); - let volumes: Volumes = serde_json::from_value(serde_json::json!({ - "main": { - "type": "data" - }, - "compat": { - "type": "assets" - }, - "filebrowser" :{ - "package-id": "filebrowser", - "path": "data", - "readonly": true, - "type": "pointer", - "volume-id": "main", - } - })) - .unwrap(); - let input: Option = None; - let timeout = Some(Duration::from_secs(10)); - js_action - .execute::( - &path, - &package_id, - &package_version, - name, - &volumes, - input, - timeout, - ProcessGroupId(0), - None, - ) - .await - .unwrap() - .unwrap(); -} - -#[tokio::test] -async fn js_action_test_rename() { - let js_action = JsProcedure { args: vec![] }; - let path: PathBuf = "test/js_action_execute/" - .parse::() - .unwrap() - .canonicalize() - .unwrap(); - let package_id = "test-package".parse().unwrap(); - let package_version: Version = "0.3.0.3".parse().unwrap(); - let name = ProcedureName::Action("test-rename".parse().unwrap()); - let volumes: Volumes = serde_json::from_value(serde_json::json!({ - "main": { - "type": "data" - }, - "compat": { - "type": "assets" - }, - "filebrowser" :{ - "package-id": "filebrowser", - "path": "data", - "readonly": true, - "type": "pointer", - "volume-id": "main", - } - })) - .unwrap(); - let input: Option = None; - let timeout = Some(Duration::from_secs(10)); - js_action - .execute::( - &path, - &package_id, - &package_version, - name, - &volumes, - input, - timeout, - ProcessGroupId(0), - None, - ) - .await - .unwrap() - .unwrap(); -} - -#[tokio::test] -async fn js_action_test_deep_dir() { - let js_action = JsProcedure { args: vec![] }; - let path: PathBuf = "test/js_action_execute/" - .parse::() - .unwrap() - .canonicalize() - .unwrap(); - let package_id = "test-package".parse().unwrap(); - let package_version: Version = "0.3.0.3".parse().unwrap(); - let name = ProcedureName::Action("test-deep-dir".parse().unwrap()); - let volumes: Volumes = serde_json::from_value(serde_json::json!({ - "main": { - "type": "data" - }, - "compat": { - "type": "assets" - }, - "filebrowser" :{ - "package-id": "filebrowser", - "path": "data", - "readonly": true, - "type": "pointer", - "volume-id": "main", - } - })) - .unwrap(); - let input: Option = None; - let timeout = Some(Duration::from_secs(10)); - js_action - .execute::( - &path, - &package_id, - &package_version, - name, - &volumes, - input, - timeout, - ProcessGroupId(0), - None, - ) - .await - .unwrap() - .unwrap(); -} -#[tokio::test] -async fn js_action_test_deep_dir_escape() { - let js_action = JsProcedure { args: vec![] }; - let path: PathBuf = "test/js_action_execute/" - .parse::() - .unwrap() - .canonicalize() - .unwrap(); - let package_id = "test-package".parse().unwrap(); - let package_version: Version = "0.3.0.3".parse().unwrap(); - let name = ProcedureName::Action("test-deep-dir-escape".parse().unwrap()); - let volumes: Volumes = serde_json::from_value(serde_json::json!({ - "main": { - "type": "data" - }, - "compat": { - "type": "assets" - }, - "filebrowser" :{ - "package-id": "filebrowser", - "path": "data", - "readonly": true, - "type": "pointer", - "volume-id": "main", - } - })) - .unwrap(); - let input: Option = None; - let timeout = Some(Duration::from_secs(10)); - js_action - .execute::( - &path, - &package_id, - &package_version, - name, - &volumes, - input, - timeout, - ProcessGroupId(0), - None, - ) - .await - .unwrap() - .unwrap(); -} -#[tokio::test] -async fn js_action_test_zero_dir() { - let js_action = JsProcedure { args: vec![] }; - let path: PathBuf = "test/js_action_execute/" - .parse::() - .unwrap() - .canonicalize() - .unwrap(); - let package_id = "test-package".parse().unwrap(); - let package_version: Version = "0.3.0.3".parse().unwrap(); - let name = ProcedureName::Action("test-zero-dir".parse().unwrap()); - let volumes: Volumes = serde_json::from_value(serde_json::json!({ - "main": { - "type": "data" - }, - "compat": { - "type": "assets" - }, - "filebrowser" :{ - "package-id": "filebrowser", - "path": "data", - "readonly": true, - "type": "pointer", - "volume-id": "main", - } - })) - .unwrap(); - let input: Option = None; - let timeout = Some(Duration::from_secs(10)); - js_action - .execute::( - &path, - &package_id, - &package_version, - name, - &volumes, - input, - timeout, - ProcessGroupId(0), - None, - ) - .await - .unwrap() - .unwrap(); -} -#[tokio::test] -async fn js_action_test_read_dir() { - let js_action = JsProcedure { args: vec![] }; - let path: PathBuf = "test/js_action_execute/" - .parse::() - .unwrap() - .canonicalize() - .unwrap(); - let package_id = "test-package".parse().unwrap(); - let package_version: Version = "0.3.0.3".parse().unwrap(); - let name = ProcedureName::Action("test-read-dir".parse().unwrap()); - let volumes: Volumes = serde_json::from_value(serde_json::json!({ - "main": { - "type": "data" - }, - "compat": { - "type": "assets" - }, - "filebrowser" :{ - "package-id": "filebrowser", - "path": "data", - "readonly": true, - "type": "pointer", - "volume-id": "main", - } - })) - .unwrap(); - let input: Option = None; - let timeout = Some(Duration::from_secs(10)); - js_action - .execute::( - &path, - &package_id, - &package_version, - name, - &volumes, - input, - timeout, - ProcessGroupId(0), - None, - ) - .await - .unwrap() - .unwrap(); -} - -#[tokio::test] -async fn js_rsync() { - let js_action = JsProcedure { args: vec![] }; - let path: PathBuf = "test/js_action_execute/" - .parse::() - .unwrap() - .canonicalize() - .unwrap(); - let package_id = "test-package".parse().unwrap(); - let package_version: Version = "0.3.0.3".parse().unwrap(); - let name = ProcedureName::Action("test-rsync".parse().unwrap()); - let volumes: Volumes = serde_json::from_value(serde_json::json!({ - "main": { - "type": "data" - }, - "compat": { - "type": "assets" - }, - "filebrowser" :{ - "package-id": "filebrowser", - "path": "data", - "readonly": true, - "type": "pointer", - "volume-id": "main", - } - })) - .unwrap(); - let input: Option = None; - let timeout = Some(Duration::from_secs(10)); - js_action - .execute::( - &path, - &package_id, - &package_version, - name, - &volumes, - input, - timeout, - ProcessGroupId(0), - None, - ) - .await - .unwrap() - .unwrap(); -} - -#[tokio::test] -async fn js_disk_usage() { - let js_action = JsProcedure { args: vec![] }; - let path: PathBuf = "test/js_action_execute/" - .parse::() - .unwrap() - .canonicalize() - .unwrap(); - let package_id = "test-package".parse().unwrap(); - let package_version: Version = "0.3.0.3".parse().unwrap(); - let name = ProcedureName::Action("test-disk-usage".parse().unwrap()); - let volumes: Volumes = serde_json::from_value(serde_json::json!({ - "main": { - "type": "data" - }, - "compat": { - "type": "assets" - }, - "filebrowser" :{ - "package-id": "filebrowser", - "path": "data", - "readonly": true, - "type": "pointer", - "volume-id": "main", - } - })) - .unwrap(); - let input: Option = None; - let timeout = Some(Duration::from_secs(10)); - js_action - .execute::( - &path, - &package_id, - &package_version, - name, - &volumes, - input, - timeout, - ProcessGroupId(0), - None, - ) - .await - .unwrap() - .unwrap(); -} diff --git a/backend/src/procedure/mod.rs b/backend/src/procedure/mod.rs index 7c02a03d5..68b295af9 100644 --- a/backend/src/procedure/mod.rs +++ b/backend/src/procedure/mod.rs @@ -69,51 +69,19 @@ impl PackageProcedure { timeout: Option, ) -> Result, Error> { tracing::trace!("Procedure execute {} {} - {:?}", self, pkg_id, name); - match self { - PackageProcedure::Docker(procedure) if procedure.inject == true => { - procedure - .inject(ctx, pkg_id, pkg_version, name, volumes, input, timeout) - .await - } - PackageProcedure::Docker(procedure) => { - procedure - .execute(ctx, pkg_id, pkg_version, name, volumes, input, timeout) - .await - } - #[cfg(feature = "js_engine")] - PackageProcedure::Script(procedure) => { - let man = ctx - .managers - .get(&(pkg_id.clone(), pkg_version.clone())) - .await - .ok_or_else(|| { - Error::new( - eyre!("No manager found for {}", pkg_id), - ErrorKind::NotFound, - ) - })?; - let rpc_client = man.rpc_client(); - let gid = if matches!(name, ProcedureName::Main) { - man.gid.new_main_gid() - } else { - man.gid.new_gid() - }; - - procedure - .execute( - &ctx.datadir, - pkg_id, - pkg_version, - name, - volumes, - input, - timeout, - gid, - Some(rpc_client), - ) - .await - } - } + let manager = ctx + .managers + .get(&(pkg_id.clone(), pkg_version.clone())) + .await + .ok_or_else(|| { + Error::new( + eyre!("No manager found for {}", pkg_id), + ErrorKind::NotFound, + ) + })?; + manager + .execute(name, imbl_value::to_value(&input)?, timeout) + .await } #[instrument(skip_all)] @@ -128,27 +96,19 @@ impl PackageProcedure { name: ProcedureName, ) -> Result, Error> { tracing::trace!("Procedure sandboxed {} {} - {:?}", self, pkg_id, name); - match self { - PackageProcedure::Docker(procedure) => { - procedure - .sandboxed(ctx, pkg_id, pkg_version, volumes, input, timeout) - .await - } - #[cfg(feature = "js_engine")] - PackageProcedure::Script(procedure) => { - procedure - .sandboxed( - &ctx.datadir, - pkg_id, - pkg_version, - volumes, - input, - timeout, - name, - ) - .await - } - } + let manager = ctx + .managers + .get(&(pkg_id.clone(), pkg_version.clone())) + .await + .ok_or_else(|| { + Error::new( + eyre!("No manager found for {}", pkg_id), + ErrorKind::NotFound, + ) + })?; + manager + .sanboxed(name, imbl_value::to_value(&input)?, timeout) + .await } }