diff --git a/Makefile b/Makefile index 305519239..3b4aca221 100644 --- a/Makefile +++ b/Makefile @@ -85,6 +85,7 @@ install: $(ALL_TARGETS) $(call ln,/usr/bin/startbox,$(DESTDIR)/usr/bin/startd) $(call ln,/usr/bin/startbox,$(DESTDIR)/usr/bin/start-cli) $(call ln,/usr/bin/startbox,$(DESTDIR)/usr/bin/start-sdk) + $(call ln,/usr/bin/startbox,$(DESTDIR)/usr/bin/start-deno) $(call ln,/usr/bin/startbox,$(DESTDIR)/usr/bin/avahi-alias) $(call ln,/usr/bin/startbox,$(DESTDIR)/usr/bin/embassy-cli) if [ "$(OS_ARCH)" = "raspberrypi" ]; then $(call cp,cargo-deps/aarch64-unknown-linux-gnu/release/pi-beep,$(DESTDIR)/usr/bin/pi-beep); fi diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 8b7eeebac..f312cdb74 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -4886,6 +4886,7 @@ dependencies = [ "tracing", "tracing-error", "tracing-futures", + "tracing-journald", "tracing-subscriber", "trust-dns-server", "typed-builder", @@ -5848,6 +5849,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "tracing-journald" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba316a74e8fc3c3896a850dba2375928a9fa171b085ecddfc7c054d39970f3fd" +dependencies = [ + "libc", + "tracing-core", + "tracing-subscriber", +] + [[package]] name = "tracing-log" version = "0.1.3" diff --git a/backend/Cargo.toml b/backend/Cargo.toml index a1fc3914d..665e0c651 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -153,6 +153,7 @@ torut = "0.2.1" tracing = "0.1.35" tracing-error = "0.2.0" tracing-futures = "0.2.5" +tracing-journald = "0.3.0" tracing-subscriber = { version = "0.3.14", features = ["env-filter"] } trust-dns-server = "0.22.0" typed-builder = "0.10.0" diff --git a/backend/src/bins/mod.rs b/backend/src/bins/mod.rs index 76329e094..e131d22c4 100644 --- a/backend/src/bins/mod.rs +++ b/backend/src/bins/mod.rs @@ -5,6 +5,8 @@ pub mod avahi_alias; pub mod deprecated; #[cfg(feature = "cli")] pub mod start_cli; +#[cfg(feature = "js_engine")] +pub mod start_deno; #[cfg(feature = "daemon")] pub mod start_init; #[cfg(feature = "sdk")] @@ -16,6 +18,8 @@ fn select_executable(name: &str) -> Option { match name { #[cfg(feature = "avahi-alias")] "avahi-alias" => Some(avahi_alias::main), + #[cfg(feature = "js_engine")] + "start-deno" => Some(start_deno::main), #[cfg(feature = "cli")] "start-cli" => Some(start_cli::main), #[cfg(feature = "sdk")] diff --git a/backend/src/bins/start_deno.rs b/backend/src/bins/start_deno.rs new file mode 100644 index 000000000..1a68decfc --- /dev/null +++ b/backend/src/bins/start_deno.rs @@ -0,0 +1,145 @@ +use clap::Arg; +use rpc_toolkit::command; +use rpc_toolkit::run_cli; +use rpc_toolkit::yajrc::RpcError; +use serde_json::Value; + +use crate::context::CliContext; +use crate::procedure::js_scripts::ExecuteArgs; +use crate::s9pk::manifest::PackageId; +use crate::util::logger::EmbassyLogger; +use crate::util::serde::{display_serializable, parse_stdin_deserializable}; +use crate::version::{Current, VersionT}; +use crate::Error; + +lazy_static::lazy_static! { + static ref VERSION_STRING: String = Current::new().semver().to_string(); +} + +#[command(subcommands(execute, sandbox))] +fn deno_api() -> Result<(), Error> { + Ok(()) +} + +#[command(cli_only, display(display_serializable))] +async fn execute( + #[arg(stdin, parse(parse_stdin_deserializable))] arg: ExecuteArgs, +) -> Result, Error> { + let ExecuteArgs { + procedure, + directory, + pkg_id, + pkg_version, + name, + volumes, + input, + } = arg; + PackageLogger::init(&pkg_id); + procedure + .execute_impl(&directory, &pkg_id, &pkg_version, name, &volumes, input) + .await +} +#[command(cli_only, display(display_serializable))] +async fn sandbox( + #[arg(stdin, parse(parse_stdin_deserializable))] arg: ExecuteArgs, +) -> Result, Error> { + let ExecuteArgs { + procedure, + directory, + pkg_id, + pkg_version, + name, + volumes, + input, + } = arg; + PackageLogger::init(&pkg_id); + procedure + .sandboxed_impl(&directory, &pkg_id, &pkg_version, &volumes, input, name) + .await +} + +use tracing::Subscriber; +use tracing_subscriber::util::SubscriberInitExt; + +#[derive(Clone)] +struct PackageLogger {} + +impl PackageLogger { + fn base_subscriber(id: &PackageId) -> impl Subscriber { + use tracing_error::ErrorLayer; + use tracing_subscriber::prelude::*; + use tracing_subscriber::{fmt, EnvFilter}; + + let filter_layer = EnvFilter::builder() + .with_default_directive( + format!("{}=info", std::module_path!().split("::").next().unwrap()) + .parse() + .unwrap(), + ) + .from_env_lossy(); + let fmt_layer = fmt::layer().with_writer(std::io::stderr).with_target(true); + let journald_layer = tracing_journald::layer() + .unwrap() + .with_syslog_identifier(format!("{id}.embassy")); + + let sub = tracing_subscriber::registry() + .with(filter_layer) + .with(fmt_layer) + .with(journald_layer) + .with(ErrorLayer::default()); + + sub + } + pub fn init(id: &PackageId) -> Self { + Self::base_subscriber(id).init(); + color_eyre::install().unwrap_or_else(|_| tracing::warn!("tracing too many times")); + + Self {} + } +} + +fn inner_main() -> Result<(), Error> { + run_cli!({ + command: deno_api, + app: app => app + .name("StartOS Deno Executor") + .version(&**VERSION_STRING) + .arg( + clap::Arg::with_name("config") + .short('c') + .long("config") + .takes_value(true), + ), + context: matches => { + CliContext::init(matches)? + }, + exit: |e: RpcError| { + match e.data { + Some(Value::String(s)) => eprintln!("{}: {}", e.message, s), + Some(Value::Object(o)) => if let Some(Value::String(s)) = o.get("details") { + eprintln!("{}: {}", e.message, s); + if let Some(Value::String(s)) = o.get("debug") { + tracing::debug!("{}", s) + } + } + Some(a) => eprintln!("{}: {}", e.message, a), + None => eprintln!("{}", e.message), + } + + std::process::exit(e.code); + } + }); + Ok(()) +} + +pub fn main() { + match inner_main() { + Ok(_) => (), + Err(e) => { + eprintln!("{}", e.source); + tracing::debug!("{:?}", e.source); + drop(e.source); + std::process::exit(e.kind as i32) + } + } +} diff --git a/backend/src/config/mod.rs b/backend/src/config/mod.rs index 3674142ab..8224860a5 100644 --- a/backend/src/config/mod.rs +++ b/backend/src/config/mod.rs @@ -14,9 +14,9 @@ use rpc_toolkit::command; use tracing::instrument; use crate::context::RpcContext; -use crate::db::model::CurrentDependencies; + use crate::prelude::*; -use crate::s9pk::manifest::{Manifest, PackageId}; +use crate::s9pk::manifest::{PackageId}; use crate::util::display_none; use crate::util::serde::{display_serializable, parse_stdin_deserializable, IoFormat}; use crate::Error; diff --git a/backend/src/logs.rs b/backend/src/logs.rs index 5725a3b72..baacb2845 100644 --- a/backend/src/logs.rs +++ b/backend/src/logs.rs @@ -374,6 +374,12 @@ pub async fn journalctl( cmd.arg(format!("_COMM={}", SYSTEM_UNIT)); } LogSource::Container(id) => { + #[cfg(feature = "podman")] + cmd.arg(format!( + "SYSLOG_IDENTIFIER={}", + DockerProcedure::container_name(&id, None) + )); + #[cfg(not(feature = "podman"))] cmd.arg(format!( "CONTAINER_NAME={}", DockerProcedure::container_name(&id, None) diff --git a/backend/src/middleware/db.rs b/backend/src/middleware/db.rs index 5eb0b482c..c3ceadda6 100644 --- a/backend/src/middleware/db.rs +++ b/backend/src/middleware/db.rs @@ -18,7 +18,7 @@ pub fn db(ctx: RpcContext) -> DynMiddleware { -> BoxFuture>, HttpError>> { let ctx = ctx.clone(); async move { - let m2: DynMiddlewareStage2 = Box::new(move |req, rpc_req| { + let m2: DynMiddlewareStage2 = Box::new(move |_req, rpc_req| { async move { let sync_db = metadata .get(rpc_req.method.as_str(), "sync_db") diff --git a/backend/src/net/ssl.rs b/backend/src/net/ssl.rs index bbce1debd..c2cab3355 100644 --- a/backend/src/net/ssl.rs +++ b/backend/src/net/ssl.rs @@ -4,7 +4,7 @@ use std::net::IpAddr; use std::path::Path; use std::time::{SystemTime, UNIX_EPOCH}; -use chrono::format; + use futures::FutureExt; use openssl::asn1::{Asn1Integer, Asn1Time}; use openssl::bn::{BigNum, MsbOption}; @@ -19,11 +19,11 @@ use tokio::sync::{Mutex, RwLock}; use tracing::instrument; use crate::account::AccountInfo; -use crate::context::{self, RpcContext}; +use crate::context::{RpcContext}; use crate::hostname::Hostname; use crate::net::dhcp::ips; use crate::net::keys::{Key, KeyInfo}; -use crate::prelude::*; + use crate::{Error, ErrorKind, ResultExt}; static CERTIFICATE_VERSION: i32 = 2; // X509 version 3 is actually encoded as '2' in the cert because fuck you. diff --git a/backend/src/procedure/js_scripts.rs b/backend/src/procedure/js_scripts.rs index 3494806b3..35d85c32e 100644 --- a/backend/src/procedure/js_scripts.rs +++ b/backend/src/procedure/js_scripts.rs @@ -1,23 +1,27 @@ -use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; +use std::{ + path::{Path, PathBuf}, + process::Stdio, +}; use color_eyre::eyre::eyre; -use embassy_container_init::{ProcessGroupId, SignalGroup, SignalGroupParams}; +use embassy_container_init::ProcessGroupId; use helpers::UnixRpcClient; pub use js_engine::JsError; use js_engine::{JsExecutionEnvironment, PathForVolumeId}; -use models::{ErrorKind, VolumeId}; +use models::VolumeId; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; +use tokio::process::Command; use tracing::instrument; use super::ProcedureName; -use crate::context::RpcContext; +use crate::prelude::*; use crate::s9pk::manifest::PackageId; -use crate::util::{GeneralGuard, Version}; +use crate::util::io::to_json_async_writer; +use crate::util::Version; use crate::volume::Volumes; -use crate::Error; #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(rename_all = "kebab-case")] @@ -45,6 +49,17 @@ impl PathForVolumeId for Volumes { } } +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct ExecuteArgs { + pub procedure: JsProcedure, + pub directory: PathBuf, + pub pkg_id: PackageId, + pub pkg_version: Version, + pub name: ProcedureName, + pub volumes: Volumes, + pub input: Option, +} + #[derive(Clone, Debug, Default, Deserialize, Serialize)] #[serde(rename_all = "kebab-case")] pub struct JsProcedure { @@ -67,54 +82,54 @@ impl JsProcedure { volumes: &Volumes, input: Option, timeout: Option, - gid: ProcessGroupId, - rpc_client: Option>, + _gid: ProcessGroupId, + _rpc_client: Option>, ) -> Result, Error> { - let cleaner_client = rpc_client.clone(); - let cleaner = GeneralGuard::new(move || { - tokio::spawn(async move { - if let Some(client) = cleaner_client { - client - .request(SignalGroup, SignalGroupParams { gid, signal: 9 }) - .await - .map_err(|e| { - Error::new(eyre!("{}: {:?}", e.message, e.data), ErrorKind::Docker) - }) - } else { - Ok(()) - } - }) - }); - let res = async move { - let running_action = JsExecutionEnvironment::load_from_package( - directory, - pkg_id, - pkg_version, - Box::new(volumes.clone()), - gid, - rpc_client, - ) - .await? - .run_action(name, input, self.args.clone()); - let output: Option = match timeout { - Some(timeout_duration) => tokio::time::timeout(timeout_duration, running_action) - .await - .map_err(|_| (JsError::Timeout, "Timed out. Retrying soon...".to_owned()))??, - None => running_action.await?, - }; - let output: O = unwrap_known_error(output)?; - Ok(output) + let runner_argument = ExecuteArgs { + procedure: self.clone(), + directory: directory.clone(), + pkg_id: pkg_id.clone(), + pkg_version: pkg_version.clone(), + name, + volumes: volumes.clone(), + input: input.and_then(|x| serde_json::to_value(x).ok()), + }; + let mut runner = Command::new("start-deno") + .arg("execute") + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .kill_on_drop(true) + .spawn()?; + to_json_async_writer( + &mut runner.stdin.take().or_not_found("stdin")?, + &runner_argument, + ) + .await?; + + let res = if let Some(timeout) = timeout { + tokio::time::timeout(timeout, runner.wait_with_output()) + .await + .with_kind(ErrorKind::Timeout)?? + } else { + runner.wait_with_output().await? + }; + + if res.status.success() { + serde_json::from_str::>(std::str::from_utf8(&res.stdout)?) + .with_kind(ErrorKind::Deserialization) + } else { + Err(Error::new( + eyre!("{}", String::from_utf8(res.stderr)?), + ErrorKind::Javascript, + )) } - .await - .map_err(|(error, message)| (error.as_code_num(), message)); - cleaner.drop().await.unwrap()?; - Ok(res) } #[instrument(skip_all)] pub async fn sandboxed( &self, - ctx: &RpcContext, + directory: &PathBuf, pkg_id: &PackageId, pkg_version: &Version, volumes: &Volumes, @@ -122,24 +137,97 @@ impl JsProcedure { timeout: Option, name: ProcedureName, ) -> Result, Error> { - Ok(async move { + let runner_argument = ExecuteArgs { + procedure: self.clone(), + directory: directory.clone(), + pkg_id: pkg_id.clone(), + pkg_version: pkg_version.clone(), + name, + volumes: volumes.clone(), + input: input.and_then(|x| serde_json::to_value(x).ok()), + }; + let mut runner = Command::new("start-deno") + .arg("sandbox") + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .kill_on_drop(true) + .spawn()?; + to_json_async_writer( + &mut runner.stdin.take().or_not_found("stdin")?, + &runner_argument, + ) + .await?; + + let res = if let Some(timeout) = timeout { + tokio::time::timeout(timeout, runner.wait_with_output()) + .await + .with_kind(ErrorKind::Timeout)?? + } else { + runner.wait_with_output().await? + }; + + if res.status.success() { + serde_json::from_str::>(std::str::from_utf8(&res.stdout)?) + .with_kind(ErrorKind::Deserialization) + } else { + Err(Error::new( + eyre!("{}", String::from_utf8(res.stderr)?), + ErrorKind::Javascript, + )) + } + } + + #[instrument(skip_all)] + pub async fn execute_impl( + &self, + directory: &PathBuf, + pkg_id: &PackageId, + pkg_version: &Version, + name: ProcedureName, + volumes: &Volumes, + input: Option, + ) -> Result, Error> { + let res = async move { let running_action = JsExecutionEnvironment::load_from_package( - &ctx.datadir, + directory, + pkg_id, + pkg_version, + Box::new(volumes.clone()), + ) + .await? + .run_action(name, input, self.args.clone()); + let output: Option = running_action.await?; + let output: O = unwrap_known_error(output)?; + Ok(output) + } + .await + .map_err(|(error, message)| (error.as_code_num(), message)); + + Ok(res) + } + + #[instrument(skip_all)] + pub async fn sandboxed_impl( + &self, + directory: &PathBuf, + pkg_id: &PackageId, + pkg_version: &Version, + volumes: &Volumes, + input: Option, + name: ProcedureName, + ) -> Result, Error> { + Ok(async move { + let running_action = JsExecutionEnvironment::load_from_package( + directory, pkg_id, pkg_version, Box::new(volumes.clone()), - ProcessGroupId(0), - None, ) .await? .read_only_effects() .run_action(name, input, self.args.clone()); - let output: Option = match timeout { - Some(timeout_duration) => tokio::time::timeout(timeout_duration, running_action) - .await - .map_err(|_| (JsError::Timeout, "Timed out. Retrying soon...".to_owned()))??, - None => running_action.await?, - }; + let output: Option = running_action.await?; let output: O = unwrap_known_error(output)?; Ok(output) } @@ -720,7 +808,7 @@ async fn js_disk_usage() { .unwrap(); let input: Option = None; let timeout = Some(Duration::from_secs(10)); - dbg!(js_action + js_action .execute::( &path, &package_id, @@ -734,5 +822,5 @@ async fn js_disk_usage() { ) .await .unwrap() - .unwrap()); + .unwrap(); } diff --git a/backend/src/procedure/mod.rs b/backend/src/procedure/mod.rs index 4cc259303..d6d01bf00 100644 --- a/backend/src/procedure/mod.rs +++ b/backend/src/procedure/mod.rs @@ -21,8 +21,6 @@ pub mod docker; pub mod js_scripts; pub use models::ProcedureName; -// TODO: create RPC endpoint that looks up the appropriate action and calls `execute` - #[derive(Clone, Debug, Deserialize, Serialize, HasModel)] #[serde(rename_all = "kebab-case")] #[serde(tag = "type")] @@ -139,7 +137,15 @@ impl PackageProcedure { #[cfg(feature = "js_engine")] PackageProcedure::Script(procedure) => { procedure - .sandboxed(ctx, pkg_id, pkg_version, volumes, input, timeout, name) + .sandboxed( + &ctx.datadir, + pkg_id, + pkg_version, + volumes, + input, + timeout, + name, + ) .await } } @@ -157,13 +163,15 @@ impl std::fmt::Display for PackageProcedure { } } +// TODO: make this not allocate #[derive(Debug)] pub struct NoOutput; impl<'de> Deserialize<'de> for NoOutput { - fn deserialize(_: D) -> Result + fn deserialize(deserializer: D) -> Result where D: serde::Deserializer<'de>, { + let _ = Value::deserialize(deserializer)?; Ok(NoOutput) } } diff --git a/backend/src/registry/admin.rs b/backend/src/registry/admin.rs index f6eecc1f2..44b83d161 100644 --- a/backend/src/registry/admin.rs +++ b/backend/src/registry/admin.rs @@ -52,7 +52,7 @@ async fn do_index( pkg: &Package, ) -> Result<(), Error> { url.set_path("/admin/v0/index"); - let mut req = httpc + let req = httpc .post(url) .header(header::ACCEPT, "text/plain") .basic_auth(user, Some(pass)) @@ -74,7 +74,7 @@ async fn do_upload( body: Body, ) -> Result<(), Error> { url.set_path("/admin/v0/upload"); - let mut req = httpc + let req = httpc .post(url) .header(header::ACCEPT, "text/plain") .basic_auth(user, Some(pass)) diff --git a/libs/js_engine/src/lib.rs b/libs/js_engine/src/lib.rs index 023726d7f..b09e1fbb0 100644 --- a/libs/js_engine/src/lib.rs +++ b/libs/js_engine/src/lib.rs @@ -11,8 +11,7 @@ use deno_core::{ ModuleSourceFuture, ModuleSpecifier, ModuleType, OpDecl, ResolutionKind, RuntimeOptions, Snapshot, }; -use embassy_container_init::ProcessGroupId; -use helpers::{script_dir, spawn_local, Rsync, UnixRpcClient}; +use helpers::{script_dir, spawn_local, Rsync}; use models::{PackageId, ProcedureName, Version, VolumeId}; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -101,8 +100,6 @@ struct JsContext { volumes: Arc, input: Value, variable_args: Vec, - container_process_gid: ProcessGroupId, - container_rpc_client: Option>, rsyncs: Arc)>>, } #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] @@ -183,8 +180,6 @@ pub struct JsExecutionEnvironment { package_id: PackageId, version: Version, volumes: Arc, - container_process_gid: ProcessGroupId, - container_rpc_client: Option>, } impl JsExecutionEnvironment { @@ -193,8 +188,6 @@ impl JsExecutionEnvironment { package_id: &PackageId, version: &Version, volumes: Box, - container_process_gid: ProcessGroupId, - container_rpc_client: Option>, ) -> Result { let data_dir = data_directory.as_ref(); let base_directory = data_dir; @@ -228,8 +221,6 @@ impl JsExecutionEnvironment { version: version.clone(), volumes: volumes.into(), sandboxed: false, - container_process_gid, - container_rpc_client, }) } pub fn read_only_effects(mut self) -> Self { @@ -295,11 +286,7 @@ impl JsExecutionEnvironment { fns::get_variable_args::decl(), fns::set_value::decl(), fns::is_sandboxed::decl(), - fns::start_command::decl(), - fns::wait_command::decl(), fns::sleep::decl(), - fns::send_signal::decl(), - fns::signal_group::decl(), fns::rsync::decl(), fns::rsync_wait::decl(), fns::rsync_progress::decl(), @@ -332,8 +319,6 @@ impl JsExecutionEnvironment { sandboxed: self.sandboxed, input, variable_args, - container_process_gid: self.container_process_gid, - container_rpc_client: self.container_rpc_client.clone(), rsyncs: Default::default(), }; let ext = Extension::builder("embassy") @@ -389,20 +374,17 @@ mod fns { use deno_core::anyhow::{anyhow, bail}; use deno_core::error::AnyError; use deno_core::*; - use embassy_container_init::{ - OutputParams, OutputStrategy, ProcessGroupId, ProcessId, RunCommand, RunCommandParams, - SendSignal, SendSignalParams, SignalGroup, SignalGroupParams, - }; + use embassy_container_init::ProcessId; use helpers::{to_tmp_path, AtomicFile, Rsync, RsyncOptions}; use itertools::Itertools; use models::VolumeId; use serde::{Deserialize, Serialize}; - use serde_json::{json, Value}; + use serde_json::Value; use tokio::io::AsyncWriteExt; use tokio::process::Command; use super::{AnswerState, JsContext}; - use crate::{system_time_as_unix_ms, MetadataJs, ResultType}; + use crate::{system_time_as_unix_ms, MetadataJs}; #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Default)] struct FetchOptions { @@ -992,18 +974,6 @@ mod fns { let state = state.borrow(); state.borrow::().clone() }; - if let Some(rpc_client) = ctx.container_rpc_client { - return rpc_client - .request( - embassy_container_init::Log, - embassy_container_init::LogParams { - gid: Some(ctx.container_process_gid), - level: embassy_container_init::LogLevel::Trace(input), - }, - ) - .await - .map_err(|e| anyhow!("{}: {:?}", e.message, e.data)); - } tracing::trace!( package_id = tracing::field::display(&ctx.package_id), run_function = tracing::field::display(&ctx.run_function), @@ -1018,18 +988,6 @@ mod fns { let state = state.borrow(); state.borrow::().clone() }; - if let Some(rpc_client) = ctx.container_rpc_client { - return rpc_client - .request( - embassy_container_init::Log, - embassy_container_init::LogParams { - gid: Some(ctx.container_process_gid), - level: embassy_container_init::LogLevel::Warn(input), - }, - ) - .await - .map_err(|e| anyhow!("{}: {:?}", e.message, e.data)); - } tracing::warn!( package_id = tracing::field::display(&ctx.package_id), run_function = tracing::field::display(&ctx.run_function), @@ -1044,18 +1002,6 @@ mod fns { let state = state.borrow(); state.borrow::().clone() }; - if let Some(rpc_client) = ctx.container_rpc_client { - return rpc_client - .request( - embassy_container_init::Log, - embassy_container_init::LogParams { - gid: Some(ctx.container_process_gid), - level: embassy_container_init::LogLevel::Error(input), - }, - ) - .await - .map_err(|e| anyhow!("{}: {:?}", e.message, e.data)); - } tracing::error!( package_id = tracing::field::display(&ctx.package_id), run_function = tracing::field::display(&ctx.run_function), @@ -1070,18 +1016,6 @@ mod fns { let state = state.borrow(); state.borrow::().clone() }; - if let Some(rpc_client) = ctx.container_rpc_client { - return rpc_client - .request( - embassy_container_init::Log, - embassy_container_init::LogParams { - gid: Some(ctx.container_process_gid), - level: embassy_container_init::LogLevel::Debug(input), - }, - ) - .await - .map_err(|e| anyhow!("{}: {:?}", e.message, e.data)); - } tracing::debug!( package_id = tracing::field::display(&ctx.package_id), run_function = tracing::field::display(&ctx.run_function), @@ -1092,28 +1026,11 @@ mod fns { } #[op] async fn log_info(state: Rc>, input: String) -> Result<(), AnyError> { - let (container_rpc_client, container_process_gid, package_id, run_function) = { + let (package_id, run_function) = { let state = state.borrow(); let ctx: JsContext = state.borrow::().clone(); - ( - ctx.container_rpc_client, - ctx.container_process_gid, - ctx.package_id, - ctx.run_function, - ) + (ctx.package_id, ctx.run_function) }; - if let Some(rpc_client) = container_rpc_client { - return rpc_client - .request( - embassy_container_init::Log, - embassy_container_init::LogParams { - gid: Some(container_process_gid), - level: embassy_container_init::LogLevel::Info(input), - }, - ) - .await - .map_err(|e| anyhow!("{}: {:?}", e.message, e.data)); - } tracing::info!( package_id = tracing::field::display(&package_id), run_function = tracing::field::display(&run_function), @@ -1145,144 +1062,12 @@ mod fns { Ok(ctx.sandboxed) } - #[op] - async fn send_signal( - state: Rc>, - pid: u32, - signal: u32, - ) -> Result<(), AnyError> { - if let Some(rpc_client) = { - let state = state.borrow(); - let ctx = state.borrow::(); - ctx.container_rpc_client.clone() - } { - rpc_client - .request( - SendSignal, - SendSignalParams { - pid: ProcessId(pid), - signal, - }, - ) - .await - .map_err(|e| anyhow!("{}: {:?}", e.message, e.data))?; - - Ok(()) - } else { - Err(anyhow!("No RpcClient for command operations")) - } - } - - #[op] - async fn signal_group( - state: Rc>, - gid: u32, - signal: u32, - ) -> Result<(), AnyError> { - if let Some(rpc_client) = { - let state = state.borrow(); - let ctx = state.borrow::(); - ctx.container_rpc_client.clone() - } { - rpc_client - .request( - SignalGroup, - SignalGroupParams { - gid: ProcessGroupId(gid), - signal, - }, - ) - .await - .map_err(|e| anyhow!("{}: {:?}", e.message, e.data))?; - - Ok(()) - } else { - Err(anyhow!("No RpcClient for command operations")) - } - } - #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct StartCommand { process_id: ProcessId, } - #[op] - async fn start_command( - state: Rc>, - command: String, - args: Vec, - output: OutputStrategy, - timeout: Option, - ) -> Result { - if let (gid, Some(rpc_client)) = { - let state = state.borrow(); - let ctx = state.borrow::(); - (ctx.container_process_gid, ctx.container_rpc_client.clone()) - } { - let pid = rpc_client - .request( - RunCommand, - RunCommandParams { - gid: Some(gid), - command, - args, - output, - }, - ) - .await - .map_err(|e| anyhow!("{}: {:?}", e.message, e.data))?; - - if let Some(timeout) = timeout { - tokio::spawn(async move { - tokio::time::sleep(Duration::from_millis(timeout)).await; - if let Err(err) = rpc_client - .request(SendSignal, SendSignalParams { pid, signal: 9 }) - .await - .map_err(|e| anyhow!("{}: {:?}", e.message, e.data)) - { - tracing::warn!("Could not kill process {pid:?}"); - tracing::debug!("{err:?}"); - } - }); - } - - Ok(StartCommand { process_id: pid }) - } else { - Err(anyhow!("No RpcClient for command operations")) - } - } - - #[op] - async fn wait_command( - state: Rc>, - pid: ProcessId, - ) -> Result { - if let Some(rpc_client) = { - let state = state.borrow(); - let ctx = state.borrow::(); - ctx.container_rpc_client.clone() - } { - Ok( - match rpc_client - .request(embassy_container_init::Output, OutputParams { pid }) - .await - { - Ok(a) => ResultType::Result(json!(a)), - Err(e) => ResultType::ErrorCode( - e.code, - match e.data { - Some(Value::String(s)) => s, - e => format!("{:?}", e), - }, - ), - }, - ) - } else { - Err(anyhow!("No RpcClient for command operations")) - } - } - #[op] async fn sleep(time_ms: u64) -> Result<(), AnyError> { tokio::time::sleep(Duration::from_millis(time_ms)).await; diff --git a/libs/models/src/procedure_name.rs b/libs/models/src/procedure_name.rs index ae71e3ad5..6a092955a 100644 --- a/libs/models/src/procedure_name.rs +++ b/libs/models/src/procedure_name.rs @@ -1,6 +1,8 @@ +use serde::{Deserialize, Serialize}; + use crate::{ActionId, HealthCheckId, PackageId}; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub enum ProcedureName { Main, // Usually just run container CreateBackup,