diff --git a/backend/src/config/hook.rs b/backend/src/config/hook.rs new file mode 100644 index 000000000..4738a54f0 --- /dev/null +++ b/backend/src/config/hook.rs @@ -0,0 +1,53 @@ +use helpers::Callback; +use itertools::Itertools; +use jsonpath_lib::Compiled; +use models::PackageId; +use serde_json::Value; + +use crate::context::RpcContext; + +pub struct ConfigHook { + pub path: Compiled, + pub prev: Vec, + pub callback: Callback, +} + +impl RpcContext { + pub async fn add_config_hook(&self, id: PackageId, hook: ConfigHook) { + let mut hooks = self.config_hooks.lock().await; + let prev = hooks.remove(&id).unwrap_or_default(); + hooks.insert( + id, + prev.into_iter() + .filter(|h| h.callback.is_listening()) + .chain(std::iter::once(hook)) + .collect(), + ); + } + + pub async fn call_config_hooks(&self, id: PackageId, config: &Value) { + let mut hooks = self.config_hooks.lock().await; + let mut prev = hooks.remove(&id).unwrap_or_default(); + for hook in &mut prev { + let new = hook + .path + .select(config) + .unwrap_or_default() + .into_iter() + .cloned() + .collect_vec(); + if new != hook.prev { + hook.callback + .call(vec![Value::Array(new.clone())]) + .unwrap_or_default(); + hook.prev = new; + } + } + hooks.insert( + id, + prev.into_iter() + .filter(|h| h.callback.is_listening()) + .collect(), + ); + } +} diff --git a/backend/src/config/mod.rs b/backend/src/config/mod.rs index 17a6df1c2..0cd23e88e 100644 --- a/backend/src/config/mod.rs +++ b/backend/src/config/mod.rs @@ -1,29 +1,24 @@ -use std::collections::{BTreeMap, BTreeSet}; +use std::collections::BTreeMap; use std::path::PathBuf; use std::time::Duration; use color_eyre::eyre::eyre; -use futures::future::{BoxFuture, FutureExt}; use indexmap::IndexSet; use itertools::Itertools; use models::ErrorKind; -use patch_db::{ - DbHandle, LockReceipt, LockTarget, LockTargetId, LockType, PatchDbHandle, Transaction, Verifier, -}; -use rand::SeedableRng; +use patch_db::{DbHandle, LockReceipt, LockTarget, LockTargetId, LockType, Verifier}; use regex::Regex; use rpc_toolkit::command; use serde_json::Value; use tracing::instrument; use crate::context::RpcContext; -use crate::db::model::{CurrentDependencies, CurrentDependencyInfo, CurrentDependents}; +use crate::db::model::{CurrentDependencies, CurrentDependents}; use crate::dependencies::{ - add_dependent_to_current_dependents_lists, break_transitive, heal_all_dependents_transitive, - BreakTransitiveReceipts, BreakageRes, Dependencies, DependencyConfig, DependencyError, - DependencyErrors, DependencyReceipt, TaggedDependencyError, TryHealReceipts, + BreakTransitiveReceipts, BreakageRes, Dependencies, DependencyConfig, DependencyErrors, + DependencyReceipt, TaggedDependencyError, TryHealReceipts, }; -use crate::install::cleanup::{remove_from_current_dependents_lists, UpdateDependencyReceipts}; +use crate::install::cleanup::UpdateDependencyReceipts; use crate::procedure::docker::DockerContainers; use crate::s9pk::manifest::{Manifest, PackageId}; use crate::util::display_none; @@ -31,6 +26,7 @@ use crate::util::serde::{display_serializable, parse_stdin_deserializable, IoFor use crate::Error; pub mod action; +pub mod hook; pub mod spec; pub mod util; @@ -38,7 +34,7 @@ pub use spec::{ConfigSpec, Defaultable}; use util::NumRange; use self::action::{ConfigActions, ConfigRes}; -use self::spec::{ConfigPointerReceipts, PackagePointerSpec, ValueSpecPointer}; +use self::spec::{ConfigPointerReceipts, ValueSpecPointer}; pub type Config = serde_json::Map; pub trait TypeOf { diff --git a/backend/src/context/cli.rs b/backend/src/context/cli.rs index 25adecdde..aa8ab59f4 100644 --- a/backend/src/context/cli.rs +++ b/backend/src/context/cli.rs @@ -81,7 +81,7 @@ impl CliContext { .chain(std::iter::once(Path::new(crate::util::config::CONFIG_PATH))), )?; let mut url = if let Some(host) = matches.value_of("host") { - host.parse()? + Url::parse host.parse()? } else if let Some(host) = base.host { host } else { diff --git a/backend/src/context/rpc.rs b/backend/src/context/rpc.rs index 16c5297dc..2a015cb52 100644 --- a/backend/src/context/rpc.rs +++ b/backend/src/context/rpc.rs @@ -9,6 +9,7 @@ use std::time::Duration; use bollard::Docker; use helpers::to_tmp_path; use josekit::jwk::Jwk; +use models::PackageId; use patch_db::json_ptr::JsonPointer; use patch_db::{DbHandle, LockReceipt, LockType, PatchDb}; use reqwest::Url; @@ -21,6 +22,7 @@ use tracing::instrument; use super::setup::CURRENT_SECRET; use crate::account::AccountInfo; +use crate::config::hook::ConfigHook; use crate::core::rpc_continuations::{RequestGuid, RestHandler, RpcContinuation}; use crate::db::model::{CurrentDependents, Database, InstalledPackageDataEntry, PackageDataEntry}; use crate::disk::OsPartitionInfo; @@ -120,6 +122,7 @@ pub struct RpcContextSeed { pub rpc_stream_continuations: Mutex>, pub wifi_manager: Option>>, pub current_secret: Arc, + pub config_hooks: Mutex>>, } pub struct RpcCleanReceipts { @@ -235,6 +238,7 @@ impl RpcContext { ) })?, ), + config_hooks: Mutex::new(BTreeMap::new()), }); let res = Self(seed); diff --git a/backend/src/manager/js_api.rs b/backend/src/manager/js_api.rs index c924e7ec2..65174c7c3 100644 --- a/backend/src/manager/js_api.rs +++ b/backend/src/manager/js_api.rs @@ -1,22 +1,81 @@ -use color_eyre::{eyre::eyre, Report}; +use color_eyre::{ + eyre::{bail, eyre}, + Report, +}; use helpers::{AddressSchemaLocal, AddressSchemaOnion, Callback, OsApi}; +use itertools::Itertools; +use jsonpath_lib::Compiled; use models::{InterfaceId, PackageId}; +use serde_json::Value; use sqlx::Acquire; -use crate::{manager::Manager, net::keys::Key}; +use crate::{ + config::hook::ConfigHook, + manager::{start_stop::StartStop, Manager}, + net::keys::Key, +}; use super::try_get_running_ip; +const NULL_VALUE: &Value = &Value::Null; + #[async_trait::async_trait] impl OsApi for Manager { async fn get_service_config( &self, id: PackageId, path: &str, - callback: Callback, - ) -> Result { - todo!("BLUJ") + callback: Option, + ) -> Result, Report> { + let found = match self + .seed + .manifest + .dependencies + .0 + .iter() + .find(|x| x.0 == &id) + { + None => bail!("Cannot get a service that is not part of the dependencies"), + Some(a) => a, + }; + + let config = match crate::config::get(self.seed.ctx.clone(), id.clone(), None) + .await + .map(|x| x.config) + { + Ok(Some(a)) => a, + Ok(None) => bail!("No current config for the service"), + Err(err) => bail!("Could not fetch the config. {err}"), + }; + + let path = Compiled::compile(path).map_err(|e| eyre!("{e}"))?; + + let filtered_values = path + .select(&Value::Object(config))? + .into_iter() + .cloned() + .collect_vec(); + + if let Some(callback) = callback { + self.seed + .ctx + .add_config_hook( + id, + ConfigHook { + path, + prev: filtered_values.clone(), + callback, + }, + ) + .await; + } + + Ok(filtered_values) } + // Get tor key - base 32 + + // Certificate + Certificate key for interface + async fn bind_local( &self, internal_port: u16, @@ -77,24 +136,6 @@ impl OsApi for Manager { tx.commit().await?; Ok(helpers::Address(key)) } - async fn unbind_onion(&self, id: InterfaceId, external: u16) -> Result<(), Report> { - let ip = try_get_running_ip(&self.seed) - .await? - .ok_or_else(|| eyre!("No ip available"))?; - let mut svc = self - .seed - .ctx - .net_controller - .create_service(self.seed.manifest.id.clone(), ip) - .await - .map_err(|e| eyre!("Could not get to net controller: {e:?}"))?; - let mut secrets = self.seed.ctx.secret_store.acquire().await?; - - svc.remove_tor(id, external) - .await - .map_err(|e| eyre!("Could not add to tor: {e:?}"))?; - Ok(()) - } async fn unbind_local(&self, id: InterfaceId, external: u16) -> Result<(), Report> { let ip = try_get_running_ip(&self.seed) .await? @@ -113,4 +154,49 @@ impl OsApi for Manager { .map_err(|e| eyre!("Could not add to local: {e:?}"))?; Ok(()) } + async fn unbind_onion(&self, id: InterfaceId, external: u16) -> Result<(), Report> { + let ip = try_get_running_ip(&self.seed) + .await? + .ok_or_else(|| eyre!("No ip available"))?; + let mut svc = self + .seed + .ctx + .net_controller + .create_service(self.seed.manifest.id.clone(), ip) + .await + .map_err(|e| eyre!("Could not get to net controller: {e:?}"))?; + let mut secrets = self.seed.ctx.secret_store.acquire().await?; + + svc.remove_tor(id, external) + .await + .map_err(|e| eyre!("Could not add to tor: {e:?}"))?; + Ok(()) + } + + fn set_started(&self) -> Result<(), Report> { + self.manage_container + .current_state + .send(StartStop::Start) + .unwrap_or_default(); + Ok(()) + } + + async fn restart(&self) -> Result<(), Report> { + self.perform_restart().await; + Ok(()) + } + + async fn start(&self) -> Result<(), Report> { + self.manage_container + .wait_for_desired(StartStop::Start) + .await; + Ok(()) + } + + async fn stop(&self) -> Result<(), Report> { + self.manage_container + .wait_for_desired(StartStop::Stop) + .await; + Ok(()) + } } diff --git a/backend/src/manager/manager_container.rs b/backend/src/manager/manager_container.rs index 74eb73ef3..e0eb153f0 100644 --- a/backend/src/manager/manager_container.rs +++ b/backend/src/manager/manager_container.rs @@ -66,7 +66,15 @@ impl ManageContainer { } pub fn to_desired(&self, new_state: StartStop) { - self.desired_state.send(new_state); + self.desired_state.send(new_state).unwrap_or_default(); + } + + pub async fn wait_for_desired(&self, new_state: StartStop) { + let mut current_state = self.current_state(); + self.to_desired(new_state); + while *current_state.borrow() != new_state { + current_state.changed().await.unwrap_or_default(); + } } pub fn current_state(&self) -> watch::Receiver { diff --git a/backend/src/manager/mod.rs b/backend/src/manager/mod.rs index 262dea5e0..357517a58 100644 --- a/backend/src/manager/mod.rs +++ b/backend/src/manager/mod.rs @@ -420,6 +420,9 @@ fn configure( .set(ctx, id, &version, &dependencies, &volumes, &config) .await?; + ctx.call_config_hooks(id.clone(), &serde_json::Value::Object(config.clone())) + .await; + // track dependencies with no pointers for (package_id, health_checks) in res.depends_on.into_iter() { if let Some(current_dependency) = current_dependencies.0.get_mut(&package_id) { diff --git a/backend/src/procedure/js_scripts.rs b/backend/src/procedure/js_scripts.rs index ca1d2eb75..05d1c9bf1 100644 --- a/backend/src/procedure/js_scripts.rs +++ b/backend/src/procedure/js_scripts.rs @@ -8,7 +8,7 @@ use embassy_container_init::{ProcessGroupId, SignalGroup, SignalGroupParams}; use helpers::{Address, AddressSchemaLocal, AddressSchemaOnion, Callback, OsApi, UnixRpcClient}; pub use js_engine::JsError; use js_engine::{JsExecutionEnvironment, PathForVolumeId}; -use models::{ErrorKind, VolumeId}; +use models::{ErrorKind, InterfaceId, VolumeId}; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use tracing::instrument; @@ -56,9 +56,9 @@ impl OsApi for SandboxOsApi { &self, id: PackageId, path: &str, - callback: Callback, - ) -> Result { - todo!() + callback: Option, + ) -> Result, Report> { + Err(eyre!("Operation not permitted")) } #[allow(unused_variables)] async fn bind_local( @@ -66,7 +66,7 @@ impl OsApi for SandboxOsApi { internal_port: u16, address_schema: AddressSchemaLocal, ) -> Result { - todo!() + Err(eyre!("Operation not permitted")) } #[allow(unused_variables)] async fn bind_onion( @@ -74,7 +74,27 @@ impl OsApi for SandboxOsApi { internal_port: u16, address_schema: AddressSchemaOnion, ) -> Result { - todo!() + Err(eyre!("Operation not permitted")) + } + #[allow(unused_variables)] + async fn unbind_local(&self, id: InterfaceId, external: u16) -> Result<(), Report> { + Err(eyre!("Operation not permitted")) + } + #[allow(unused_variables)] + async fn unbind_onion(&self, id: InterfaceId, external: u16) -> Result<(), Report> { + Err(eyre!("Operation not permitted")) + } + fn set_started(&self) -> Result<(), Report> { + Err(eyre!("Operation not permitted")) + } + async fn restart(&self) -> Result<(), Report> { + Err(eyre!("Operation not permitted")) + } + async fn start(&self) -> Result<(), Report> { + Err(eyre!("Operation not permitted")) + } + async fn stop(&self) -> Result<(), Report> { + Err(eyre!("Operation not permitted")) } } diff --git a/libs/helpers/src/os_api.rs b/libs/helpers/src/os_api.rs index 07f7ba1ce..47735332f 100644 --- a/libs/helpers/src/os_api.rs +++ b/libs/helpers/src/os_api.rs @@ -1,19 +1,35 @@ +use std::sync::Arc; + use color_eyre::eyre::eyre; use color_eyre::Report; +use models::InterfaceId; use models::PackageId; -use models::{Error, InterfaceId}; use serde_json::Value; +use tokio::sync::mpsc; pub struct RuntimeDropped; -pub type Callback = Box Result<(), RuntimeDropped> + Send + Sync + 'static>; // bool indicating if - -fn method_not_available() -> Error { - Error::new( - eyre!("method not available"), - models::ErrorKind::InvalidRequest, - ) +pub struct Callback { + id: Arc, + sender: mpsc::UnboundedSender<(Arc, Vec)>, } +impl Callback { + pub fn new(id: String, sender: mpsc::UnboundedSender<(Arc, Vec)>) -> Self { + Self { + id: Arc::new(id), + sender, + } + } + pub fn is_listening(&self) -> bool { + self.sender.is_closed() + } + pub fn call(&self, args: Vec) -> Result<(), RuntimeDropped> { + self.sender + .send((self.id.clone(), args)) + .map_err(|_| RuntimeDropped) + } +} + #[derive(serde::Deserialize, serde::Serialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct AddressSchemaOnion { @@ -44,8 +60,8 @@ pub trait OsApi: Send + Sync + 'static { &self, id: PackageId, path: &str, - callback: Callback, - ) -> Result; + callback: Option, + ) -> Result, Report>; async fn bind_local( &self, @@ -58,34 +74,10 @@ pub trait OsApi: Send + Sync + 'static { address_schema: AddressSchemaOnion, ) -> Result; - async fn unbind_local(&self, id: InterfaceId, external: u16) -> Result<(), Report> { - todo!() - } - async fn unbind_onion(&self, id: InterfaceId, external: u16) -> Result<(), Report> { - todo!() - } - async fn list_address(&self) -> Result, Report> { - todo!() - } - async fn list_domains(&self) -> Result, Report> { - todo!() - } - async fn alloc_onion(&self, id: String) -> Result { - todo!() - } - async fn dealloc_onion(&self, id: String) -> Result<(), Report> { - todo!() - } - async fn alloc_local(&self, id: String) -> Result { - todo!() - } - async fn dealloc_local(&self, id: String) -> Result<(), Report> { - todo!() - } - async fn alloc_forward(&self, id: String) -> Result { - todo!() - } - async fn dealloc_forward(&self, id: String) -> Result<(), Report> { - todo!() - } + async fn unbind_local(&self, id: InterfaceId, external: u16) -> Result<(), Report>; + async fn unbind_onion(&self, id: InterfaceId, external: u16) -> Result<(), Report>; + fn set_started(&self) -> Result<(), Report>; + async fn restart(&self) -> Result<(), Report>; + async fn start(&self) -> Result<(), Report>; + async fn stop(&self) -> Result<(), Report>; } diff --git a/libs/js_engine/src/artifacts/loadModule.js b/libs/js_engine/src/artifacts/loadModule.js index a285f7bf8..1998f7cc1 100644 --- a/libs/js_engine/src/artifacts/loadModule.js +++ b/libs/js_engine/src/artifacts/loadModule.js @@ -259,17 +259,7 @@ const runRsync = ( }; }; -const diskUsage = async ({ - volumeId = requireParam("volumeId"), - path = requireParam("path"), -} = { volumeId: null, path: null }) => { - const [used, total] = await Deno.core.opAsync("disk_usage", volumeId, path); - return { used, total } -} - -globalThis.runCallback = (uuid, data) => callbackMapping[uuid](data); -// window.runCallback = runCallback; -// Deno.runCallback = runCallback; +globalThis.runCallback = (uuid, args) => callbackMapping[uuid](...args); const getServiceConfig = async ( { diff --git a/libs/js_engine/src/lib.rs b/libs/js_engine/src/lib.rs index 41d407c34..ad639ef61 100644 --- a/libs/js_engine/src/lib.rs +++ b/libs/js_engine/src/lib.rs @@ -100,8 +100,9 @@ struct JsContext { container_process_gid: ProcessGroupId, container_rpc_client: Option>, rsyncs: Arc)>>, - callback_sender: mpsc::UnboundedSender<(String, Value)>, + callback_sender: mpsc::UnboundedSender<(Arc, Vec)>, } + #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] #[serde(rename_all = "kebab-case")] enum ResultType { @@ -381,7 +382,11 @@ impl JsExecutionEnvironment { .load_main_module(&"file:///loadModule.js".parse().unwrap(), None) .await?; let evaluated = runtime.mod_evaluate(mod_id); - let res = run_event_loop(&mut runtime, callback_receiver).await; + let res = RuntimeEventLoop { + runtime: &mut runtime, + callback_receiver, + } + .await; res?; evaluated.await??; Ok::<_, AnyError>(()) @@ -406,7 +411,7 @@ impl JsExecutionEnvironment { #[pin_project::pin_project] struct RuntimeEventLoop<'a> { runtime: &'a mut JsRuntime, - callback: mpsc::UnboundedReceiver<(String, Value)>, + callback_receiver: mpsc::UnboundedReceiver<(Arc, Vec)>, } impl<'a> Future for RuntimeEventLoop<'a> { type Output = Result<(), AnyError>; @@ -415,10 +420,10 @@ impl<'a> Future for RuntimeEventLoop<'a> { cx: &mut std::task::Context<'_>, ) -> std::task::Poll { let this = self.project(); - if let Poll::Ready(Some((uuid, value))) = this.callback.poll_recv(cx) { + if let Poll::Ready(Some((uuid, args))) = this.callback_receiver.poll_recv(cx) { match this.runtime.execute_script( "callback", - &format!("globalThis.runCallback(\"{uuid}\", {value})"), + &format!("globalThis.runCallback(\"{uuid}\", {})", Value::Array(args)), ) { Ok(_) => (), Err(e) => return Poll::Ready(Err(e)), @@ -427,16 +432,6 @@ impl<'a> Future for RuntimeEventLoop<'a> { this.runtime.poll_event_loop(cx, false) } } -async fn run_event_loop( - runtime: &mut JsRuntime, - callback_receiver: mpsc::UnboundedReceiver<(String, Value)>, -) -> Result<(), AnyError> { - RuntimeEventLoop { - runtime, - callback: callback_receiver, - } - .await -} /// Note: Make sure that we have the assumption that all these methods are callable at any time, and all call restrictions should be in rust mod fns { @@ -458,8 +453,8 @@ mod fns { SendSignal, SendSignalParams, SignalGroup, SignalGroupParams, }; use helpers::{ - to_tmp_path, AddressSchemaLocal, AddressSchemaOnion, AtomicFile, Rsync, RsyncOptions, - RuntimeDropped, + to_tmp_path, AddressSchemaLocal, AddressSchemaOnion, AtomicFile, Callback, Rsync, + RsyncOptions, }; use itertools::Itertools; use models::{PackageId, VolumeId}; @@ -1493,8 +1488,8 @@ mod fns { state: Rc>, service_id: PackageId, path: String, - callback: String, - ) -> Result { + callback: Option, + ) -> Result, AnyError> { let (sender, os) = { let state = state.borrow(); let ctx = state.borrow::(); @@ -1503,11 +1498,7 @@ mod fns { os.get_service_config( service_id, &path, - Box::new(move |value| { - sender - .send((callback.clone(), value)) - .map_err(|_| RuntimeDropped) - }), + callback.map(|id| Callback::new(id, sender)), ) .await .map_err(|e| anyhow!("Couldn't get service config: {e:?}")) @@ -1544,6 +1535,75 @@ mod fns { .map_err(|e| anyhow!("{e:?}")) } + #[op] + fn set_started(state: &mut OpState) -> Result<(), AnyError> { + let os = { + let ctx = state.borrow::(); + ctx.os.clone() + }; + os.set_started().map_err(|e| anyhow!("{e:?}")) + } + + #[op] + async fn restart(state: Rc>) -> Result<(), AnyError> { + let sandboxed = { + let state = state.borrow(); + let ctx: &JsContext = state.borrow(); + ctx.sandboxed + }; + + if sandboxed { + bail!("Will not run restart in sandboxed mode"); + } + + let os = { + let state = state.borrow(); + let ctx = state.borrow::(); + ctx.os.clone() + }; + os.restart().await.map_err(|e| anyhow!("{e:?}")) + } + + #[op] + async fn start(state: Rc>) -> Result<(), AnyError> { + let sandboxed = { + let state = state.borrow(); + let ctx: &JsContext = state.borrow(); + ctx.sandboxed + }; + + if sandboxed { + bail!("Will not run start in sandboxed mode"); + } + + let os = { + let state = state.borrow(); + let ctx = state.borrow::(); + ctx.os.clone() + }; + os.start().await.map_err(|e| anyhow!("{e:?}")) + } + + #[op] + async fn stop(state: Rc>) -> Result<(), AnyError> { + let sandboxed = { + let state = state.borrow(); + let ctx: &JsContext = state.borrow(); + ctx.sandboxed + }; + + if sandboxed { + bail!("Will not run stop in sandboxed mode"); + } + + let os = { + let state = state.borrow(); + let ctx = state.borrow::(); + ctx.os.clone() + }; + os.stop().await.map_err(|e| anyhow!("{e:?}")) + } + /// We need to make sure that during the file accessing, we don't reach beyond our scope of control async fn is_subset( parent: impl AsRef,