From bec307d0e9abb611ff306fa18c563f60e66db141 Mon Sep 17 00:00:00 2001 From: Aiden McClelland Date: Wed, 1 Feb 2023 16:53:25 -0700 Subject: [PATCH] js effect for subscribing to config fix errors chore: Fix some things in the manager for clippy --- backend/Cargo.lock | 6 +- backend/src/backup/mod.rs | 2 +- backend/src/control.rs | 4 +- backend/src/manager/manager_map.rs | 14 +- backend/src/manager/manager_seed.rs | 12 +- backend/src/manager/mod.rs | 147 +++++--------------- backend/src/manager/persistent_container.rs | 8 +- backend/src/manager/start_stop.rs | 2 +- backend/src/procedure/js_scripts.rs | 27 +++- backend/src/procedure/mod.rs | 30 ++-- backend/src/util/mod.rs | 4 +- libs/Cargo.lock | 6 +- libs/helpers/Cargo.toml | 1 + libs/helpers/src/lib.rs | 2 + libs/helpers/src/os_api.rs | 17 +++ libs/js_engine/Cargo.toml | 1 + libs/js_engine/src/artifacts/loadModule.js | 13 ++ libs/js_engine/src/lib.rs | 95 +++++++++++-- 18 files changed, 223 insertions(+), 168 deletions(-) create mode 100644 libs/helpers/src/os_api.rs diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 95f92bef2..92ff2179e 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -182,9 +182,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.58" +version = "0.1.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e805d94e6b5001b651426cf4cd446b1ab5f319d27bab5c644f61de0a804360c" +checksum = "1cd7fce9ba8c3c042128ce72d8b2ddbf3a05747efb67ea0313c635e10bda47a2" dependencies = [ "proc-macro2 1.0.51", "quote 1.0.23", @@ -1987,6 +1987,7 @@ dependencies = [ name = "helpers" version = "0.1.0" dependencies = [ + "async-trait", "color-eyre", "futures", "lazy_async_pool", @@ -2441,6 +2442,7 @@ dependencies = [ "helpers", "itertools 0.10.5", "models", + "pin-project", "reqwest", "serde", "serde_json", diff --git a/backend/src/backup/mod.rs b/backend/src/backup/mod.rs index 0125e75b1..c2bd9bcd0 100644 --- a/backend/src/backup/mod.rs +++ b/backend/src/backup/mod.rs @@ -47,7 +47,7 @@ pub struct ServerBackupReport { #[derive(Debug, Deserialize, Serialize)] pub struct PackageBackupReport { - error: Option, + pub error: Option, } #[command(subcommands(backup_bulk::backup_all, target::target))] diff --git a/backend/src/control.rs b/backend/src/control.rs index 044834a57..56d4e6788 100644 --- a/backend/src/control.rs +++ b/backend/src/control.rs @@ -28,7 +28,7 @@ impl StartReceipts { let mut locks = Vec::new(); let setup = Self::setup(&mut locks, id); - Ok(setup(&db.lock_all(locks).await?)?) + setup(&db.lock_all(locks).await?) } pub fn setup( @@ -95,7 +95,7 @@ impl StopReceipts { let mut locks = Vec::new(); let setup = Self::setup(&mut locks, id); - Ok(setup(&db.lock_all(locks).await?)?) + setup(&db.lock_all(locks).await?) } pub fn setup( diff --git a/backend/src/manager/manager_map.rs b/backend/src/manager/manager_map.rs index 86c9c2754..4e21004ac 100644 --- a/backend/src/manager/manager_map.rs +++ b/backend/src/manager/manager_map.rs @@ -5,12 +5,10 @@ use color_eyre::eyre::eyre; use patch_db::DbHandle; use sqlx::{Executor, Postgres}; use tokio::sync::RwLock; -use torut::onion::TorSecretKeyV3; use tracing::instrument; use super::Manager; use crate::context::RpcContext; -use crate::net::interface::InterfaceId; use crate::s9pk::manifest::{Manifest, PackageId}; use crate::util::Version; use crate::Error; @@ -48,10 +46,9 @@ impl ManagerMap { continue; }; - let tor_keys = man.interfaces.tor_keys(secrets, &package).await?; res.insert( (package, man.version.clone()), - Arc::new(Manager::new(ctx.clone(), man, tor_keys).await?), + Arc::new(Manager::new(ctx.clone(), man).await?), ); } *self.0.write().await = res; @@ -59,18 +56,13 @@ impl ManagerMap { } #[instrument(skip_all)] - pub async fn add( - &self, - ctx: RpcContext, - manifest: Manifest, - tor_keys: BTreeMap, - ) -> Result<(), Error> { + pub async fn add(&self, ctx: RpcContext, manifest: Manifest) -> Result<(), Error> { let mut lock = self.0.write().await; let id = (manifest.id.clone(), manifest.version.clone()); if let Some(man) = lock.remove(&id) { man.exit().await; } - lock.insert(id, Arc::new(Manager::new(ctx, manifest, tor_keys).await?)); + lock.insert(id, Arc::new(Manager::new(ctx, manifest).await?)); Ok(()) } diff --git a/backend/src/manager/manager_seed.rs b/backend/src/manager/manager_seed.rs index 7466cd7f0..b92f96491 100644 --- a/backend/src/manager/manager_seed.rs +++ b/backend/src/manager/manager_seed.rs @@ -1,11 +1,6 @@ -use std::collections::BTreeMap; - use bollard::container::StopContainerOptions; -use torut::onion::TorSecretKeyV3; -use super::sigterm_timeout; use crate::context::RpcContext; -use crate::net::interface::InterfaceId; use crate::s9pk::manifest::Manifest; use crate::Error; @@ -13,7 +8,6 @@ pub struct ManagerSeed { pub ctx: RpcContext, pub manifest: Manifest, pub container_name: String, - pub tor_keys: BTreeMap, } impl ManagerSeed { @@ -24,7 +18,11 @@ impl ManagerSeed { .stop_container( &self.container_name, Some(StopContainerOptions { - t: sigterm_timeout(&self.manifest) + t: self + .manifest + .containers + .as_ref() + .and_then(|c| c.main.sigterm_timeout) .map(|d| d.as_secs()) .unwrap_or(30) as i64, }), diff --git a/backend/src/manager/mod.rs b/backend/src/manager/mod.rs index 221c725d5..47234ab6c 100644 --- a/backend/src/manager/mod.rs +++ b/backend/src/manager/mod.rs @@ -1,6 +1,5 @@ use std::collections::{BTreeMap, BTreeSet}; use std::net::Ipv4Addr; -use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::sync::Arc; use std::task::Poll; use std::time::Duration; @@ -18,12 +17,11 @@ use rand::SeedableRng; use serde_json::Value; use sqlx::Connection; use start_stop::StartStop; -use tokio::sync::{oneshot, Notify}; +use tokio::sync::oneshot; use tokio::sync::{ - watch::{self, Receiver, Sender}, + watch::{self, Sender}, Mutex, }; -use torut::onion::TorSecretKeyV3; use tracing::instrument; use transition_state::TransitionState; @@ -41,10 +39,9 @@ use crate::dependencies::{ use crate::disk::mount::backup::BackupMountGuard; use crate::disk::mount::guard::TmpMountGuard; use crate::install::cleanup::remove_from_current_dependents_lists; -use crate::net::interface::InterfaceId; use crate::net::net_controller::NetService; use crate::procedure::docker::{DockerContainer, DockerProcedure, LongRunning}; -use crate::procedure::{NoOutput, PackageProcedure, ProcedureName}; +use crate::procedure::{NoOutput, ProcedureName}; use crate::s9pk::manifest::Manifest; use crate::status::MainStatus; use crate::util::NonDetachingJoinHandle; @@ -69,7 +66,6 @@ pub const HEALTH_CHECK_GRACE_PERIOD_SECONDS: u64 = 5; type ManagerPersistentContainer = Arc>; type BackupGuard = Arc>>; -type BackupTask = Result; pub enum BackupReturn { Error(Error), AlreadyRunning(PackageBackupReport), @@ -94,7 +90,7 @@ impl Default for Gid { } impl Gid { pub fn new_gid(&self) -> ProcessGroupId { - let previous; + let mut previous = 0; self.next_gid.send_modify(|x| { previous = *x; *x = previous + 1; @@ -104,13 +100,13 @@ impl Gid { pub fn new_main_gid(&self) -> ProcessGroupId { let gid = self.new_gid(); - self.main_gid.send(gid); + self.main_gid.send(gid).unwrap(); gid } } #[derive(Clone)] -struct Manager { +pub struct Manager { seed: Arc, manage_container: Arc, @@ -120,16 +116,11 @@ struct Manager { pub gid: Arc, } impl Manager { - pub async fn new( - ctx: RpcContext, - manifest: Manifest, - tor_keys: BTreeMap, - ) -> Result { + pub async fn new(ctx: RpcContext, manifest: Manifest) -> Result { let seed = Arc::new(ManagerSeed { ctx, container_name: DockerProcedure::container_name(&manifest.id, None), manifest, - tor_keys, }); let persistent_container = Arc::new(PersistentContainer::init(&seed).await?); @@ -148,15 +139,13 @@ impl Manager { }) } - pub fn start(&self) -> Result<(), Error> { + pub fn start(&self) { self._transition_abort(); self.manage_container.to_desired(StartStop::Start); - Ok(()) } - pub fn stop(&self) -> Result<(), Error> { + pub fn stop(&self) { self._transition_abort(); self.manage_container.to_desired(StartStop::Stop); - Ok(()) } pub async fn restart(&self) { if self._is_transition_restart() { @@ -176,17 +165,15 @@ impl Manager { let (transition_state, done) = configure(context, id, configure_context).remote_handle(); self._transition_replace({ - let seed = self.seed.clone(); let manage_container = self.manage_container.clone(); TransitionState::Configuring(tokio::spawn(async move { let desired_state = manage_container.desired_state(); let state_reverter = DesiredStateReverter::new(manage_container.clone()); - let starting_desired = desired_state.borrow().clone(); let mut current_state = manage_container.current_state(); manage_container.to_desired(StartStop::Stop); while current_state.borrow().is_start() { - current_state.changed().await; + current_state.changed().await.unwrap(); } transition_state.await; @@ -211,7 +198,7 @@ impl Manager { let mut current_status = self.manage_container.current_state(); while current_status.borrow().is_start() { - current_status.changed().await; + current_status.changed().await.unwrap(); } } @@ -223,7 +210,9 @@ impl Manager { } pub fn rpc_client(&self) -> Option> { - self.persistent_container.clone().map(|x| x.rpc_client()) + (*self.persistent_container) + .as_ref() + .map(|x| x.rpc_client()) } fn _transition_abort(&self) { @@ -244,34 +233,30 @@ impl Manager { let transition = self.transition.clone(); let manage_container = self.manage_container.clone(); TransitionState::Restarting(tokio::spawn(async move { - let mut desired_state = manage_container.desired_state(); let mut current_state = manage_container.current_state(); let _ = manage_container.set_override(Some(MainStatus::Restarting)); manage_container.to_desired(StartStop::Stop); while current_state.borrow().is_start() { - current_state.changed().await; + current_state.changed().await.unwrap(); } manage_container.to_desired(StartStop::Start); while current_state.borrow().is_stop() { - current_state.changed().await; + current_state.changed().await.unwrap(); } transition.send_replace(Default::default()); })) } fn _transition_backup( &self, - mut backup_guard: BackupGuard, + backup_guard: BackupGuard, ) -> (TransitionState, BoxFuture) { - let transition = self.transition.clone(); let manage_container = self.manage_container.clone(); let seed = self.seed.clone(); let (send, done) = oneshot::channel(); ( TransitionState::BackingUp(tokio::spawn( async move { - let mut desired_state = manage_container.desired_state(); let state_reverter = DesiredStateReverter::new(manage_container.clone()); - let starting_desired = desired_state.borrow().clone(); let mut current_state = manage_container.current_state(); let mut tx = seed.ctx.db.handle(); let _ = manage_container.set_override(Some( @@ -279,7 +264,7 @@ impl Manager { )); manage_container.to_desired(StartStop::Stop); while current_state.borrow().is_start() { - current_state.changed().await; + current_state.changed().await.unwrap(); } let backup_guard = backup_guard.lock().await; @@ -333,8 +318,8 @@ fn configure( mut configure_context: ConfigureContext, ) -> BoxFuture<'static, Result, Error>> { async move { - let db = ctx.db.handle(); - let tx = db.begin().await?; + let mut db = ctx.db.handle(); + let mut tx = db.begin().await?; let db = &mut tx; let receipts = ConfigReceipts::new(db).await?; @@ -353,11 +338,6 @@ fn configure( .await? .ok_or_else(not_found)?; let volumes = receipts.volumes.get(db, id).await?.ok_or_else(not_found)?; - let is_needs_config = !receipts - .configured - .get(db, id) - .await? - .ok_or_else(not_found)?; let version = receipts.version.get(db, id).await?.ok_or_else(not_found)?; // get current config and current spec @@ -394,7 +374,7 @@ fn configure( // create backreferences to pointers let mut sys = receipts .system_pointers - .get(db, &id) + .get(db, id) .await? .ok_or_else(not_found)?; sys.truncate(0); @@ -431,7 +411,7 @@ fn configure( ValueSpecPointer::System(s) => sys.push(s), } } - receipts.system_pointers.set(db, sys, &id).await?; + receipts.system_pointers.set(db, sys, id).await?; let signal = if !configure_context.dry_run { // run config action @@ -475,7 +455,7 @@ fn configure( // update dependencies let prev_current_dependencies = receipts .current_dependencies - .get(db, &id) + .get(db, id) .await? .unwrap_or_default(); remove_from_current_dependents_lists( @@ -495,7 +475,7 @@ fn configure( current_dependencies.0.remove(id); receipts .current_dependencies - .set(db, current_dependencies.clone(), &id) + .set(db, current_dependencies.clone(), id) .await?; let errs = receipts @@ -512,7 +492,7 @@ fn configure( &receipts.dependency_receipt.try_heal, ) .await?; - receipts.dependency_errors.set(db, errs, &id).await?; + receipts.dependency_errors.set(db, errs, id).await?; // cache current config for dependents configure_context @@ -525,22 +505,18 @@ fn configure( .get(db, id) .await? .ok_or_else(not_found)?; - let prev = if is_needs_config { None } else { old_config } - .map(Value::Object) - .unwrap_or_default(); - let next = Value::Object(config.clone()); - for (dependent, dep_info) in dependents.0.iter().filter(|(dep_id, _)| dep_id != &id) { - let dependent_container = receipts.docker_containers.get(db, &dependent).await?; + for (dependent, _dep_info) in dependents.0.iter().filter(|(dep_id, _)| dep_id != &id) { + let dependent_container = receipts.docker_containers.get(db, dependent).await?; let dependent_container = &dependent_container; // check if config passes dependent check if let Some(cfg) = receipts .manifest_dependencies_config - .get(db, (&dependent, &id)) + .get(db, (dependent, id)) .await? { let manifest = receipts .manifest - .get(db, &dependent) + .get(db, dependent) .await? .ok_or_else(not_found)?; if let Err(error) = cfg @@ -605,7 +581,7 @@ struct DesiredStateReverter { } impl DesiredStateReverter { fn new(manage_container: Arc) -> Self { - let starting_state = manage_container.desired_state().borrow().clone(); + let starting_state = *manage_container.desired_state().borrow(); let manage_container = Some(manage_container); Self { starting_state, @@ -614,8 +590,8 @@ impl DesiredStateReverter { } async fn revert(mut self) { if let Some(mut current_state) = self._revert() { - while &*current_state.borrow() != &self.starting_state { - current_state.changed().await; + while *current_state.borrow() != self.starting_state { + current_state.changed().await.unwrap(); } } } @@ -646,7 +622,8 @@ fn finnish_up_backup_task( send.send(match result { Ok(a) => a, Err(e) => Err(e), - }); + }) + .unwrap_or_default(); } .boxed() } @@ -659,7 +636,7 @@ fn response_to_report(response: &Result) -> PackageBac } fn flatten_backup_error(input: Result, Error>) -> BackupReturn { match input { - Ok(a) | Ok(a) => BackupReturn::Ran { + Ok(a) => BackupReturn::Ran { report: response_to_report(&a), res: a, }, @@ -675,18 +652,6 @@ pub enum Status { Paused, Shutdown, } -pub struct ManagerSharedState { - seed: Arc, - persistent_container: Option, - status: (Sender, Receiver), - killer: Notify, - on_stop: Sender, - synchronized: Notify, - synchronize_now: Notify, - commit_health_check_results: AtomicBool, - next_gid: AtomicU32, - main_gid: (Sender, Receiver), -} #[derive(Debug, Clone, Copy)] pub enum OnStop { @@ -703,8 +668,6 @@ async fn run_main( persistent_container: ManagerPersistentContainer, started: Arc, ) -> RunMainResult { - let interfaces = main_interfaces(&seed)?; - let mut runtime = NonDetachingJoinHandle::from(tokio::spawn(start_up_image(seed.clone()))); let ip = match persistent_container.is_some() { false => Some(match get_running_ip(&seed, &mut runtime).await { @@ -749,35 +712,6 @@ async fn start_up_image(seed: Arc) -> Result Result< - Vec<( - InterfaceId, - &crate::net::interface::Interface, - TorSecretKeyV3, - )>, - Error, -> { - seed.manifest - .interfaces - .0 - .iter() - .map(|(id, info)| { - Ok(( - id.clone(), - info, - seed.tor_keys - .get(id) - .ok_or_else(|| { - Error::new(eyre!("interface {} missing key", id), crate::ErrorKind::Tor) - })? - .clone(), - )) - }) - .collect::, Error>>() -} - async fn long_running_docker( seed: &ManagerSeed, container: &DockerContainer, @@ -896,17 +830,11 @@ async fn main_health_check_daemon(seed: Arc) { } } -fn set_commit_health_true(state: &Arc) { - state - .commit_health_check_results - .store(true, Ordering::SeqCst); -} - type RuntimeOfCommand = NonDetachingJoinHandle, Error>>; async fn get_running_ip(seed: &ManagerSeed, mut runtime: &mut RuntimeOfCommand) -> GetRunningIp { loop { - match container_inspect(&seed).await { + match container_inspect(seed).await { Ok(res) => { match res .network_settings @@ -968,7 +896,7 @@ async fn send_signal( // .store(false, Ordering::SeqCst); if let Some(rpc_client) = rpc_client { - let main_gid = gid.main_gid.borrow().clone(); + let main_gid = *gid.main_gid.borrow(); let next_gid = gid.new_gid(); #[cfg(feature = "js_engine")] if let Err(e) = crate::procedure::js_scripts::JsProcedure::default() @@ -985,6 +913,7 @@ async fn send_signal( None, // TODO next_gid, Some(rpc_client), + Arc::new(seed.ctx.clone()), ) .await? { diff --git a/backend/src/manager/persistent_container.rs b/backend/src/manager/persistent_container.rs index 45863c1dc..06c2e6cd3 100644 --- a/backend/src/manager/persistent_container.rs +++ b/backend/src/manager/persistent_container.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use std::time::Duration; use color_eyre::eyre::eyre; -use embassy_container_init::ProcessGroupId; use helpers::UnixRpcClient; use tokio::sync::oneshot; use tokio::sync::watch::{self, Receiver}; @@ -11,7 +10,7 @@ use tracing::instrument; use super::manager_seed::ManagerSeed; use super::{ add_network_for_main, generate_certificate, get_long_running_ip, long_running_docker, - main_interfaces, remove_network_for_main, GetRunningIp, + remove_network_for_main, GetRunningIp, }; use crate::procedure::docker::DockerContainer; use crate::util::NonDetachingJoinHandle; @@ -53,8 +52,7 @@ pub async fn spawn_persistent_container( let mut send_inserter: Option>>> = Some(send_inserter); loop { if let Err(e) = async { - let interfaces = main_interfaces(&*seed)?; - let generated_certificate = generate_certificate(&*seed, &interfaces).await?; + let generated_certificate = generate_certificate(&*seed).await?; let (mut runtime, inserter) = long_running_docker(&seed, &container).await?; @@ -67,7 +65,7 @@ pub async fn spawn_persistent_container( return Ok(()); } }; - add_network_for_main(&*seed, ip, interfaces, generated_certificate).await?; + add_network_for_main(&*seed, ip, generated_certificate).await?; if let Some(inserter_send) = inserter_send.as_mut() { let _ = inserter_send.send(Arc::new(inserter)); diff --git a/backend/src/manager/start_stop.rs b/backend/src/manager/start_stop.rs index f242d079e..c79b52403 100644 --- a/backend/src/manager/start_stop.rs +++ b/backend/src/manager/start_stop.rs @@ -1,7 +1,7 @@ use crate::status::MainStatus; #[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub(crate) enum StartStop { +pub enum StartStop { Start, Stop, } diff --git a/backend/src/procedure/js_scripts.rs b/backend/src/procedure/js_scripts.rs index 3494806b3..52787f4af 100644 --- a/backend/src/procedure/js_scripts.rs +++ b/backend/src/procedure/js_scripts.rs @@ -4,7 +4,7 @@ use std::time::Duration; use color_eyre::eyre::eyre; use embassy_container_init::{ProcessGroupId, SignalGroup, SignalGroupParams}; -use helpers::UnixRpcClient; +use helpers::{Callback, OsApi, UnixRpcClient}; pub use js_engine::JsError; use js_engine::{JsExecutionEnvironment, PathForVolumeId}; use models::{ErrorKind, VolumeId}; @@ -19,6 +19,18 @@ use crate::util::{GeneralGuard, Version}; use crate::volume::Volumes; use crate::Error; +#[async_trait::async_trait] +impl OsApi for RpcContext { + async fn get_service_config( + &self, + id: PackageId, + path: &str, + callback: Callback, + ) -> Result { + todo!() + } +} + #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(rename_all = "kebab-case")] @@ -69,6 +81,7 @@ impl JsProcedure { timeout: Option, gid: ProcessGroupId, rpc_client: Option>, + os: Arc, ) -> Result, Error> { let cleaner_client = rpc_client.clone(); let cleaner = GeneralGuard::new(move || { @@ -87,6 +100,7 @@ impl JsProcedure { }); let res = async move { let running_action = JsExecutionEnvironment::load_from_package( + os, directory, pkg_id, pkg_version, @@ -124,6 +138,7 @@ impl JsProcedure { ) -> Result, Error> { Ok(async move { let running_action = JsExecutionEnvironment::load_from_package( + Arc::new(ctx.clone()), &ctx.datadir, pkg_id, pkg_version, @@ -212,6 +227,7 @@ async fn js_action_execute() { timeout, ProcessGroupId(0), None, + None, ) .await .unwrap() @@ -269,6 +285,7 @@ async fn js_action_execute_error() { timeout, ProcessGroupId(0), None, + None, ) .await .unwrap(); @@ -315,6 +332,7 @@ async fn js_action_fetch() { timeout, ProcessGroupId(0), None, + None, ) .await .unwrap() @@ -363,6 +381,7 @@ async fn js_test_slow() { timeout, ProcessGroupId(0), None, + None, ) => { a.unwrap().unwrap(); }, _ = tokio::time::sleep(Duration::from_secs(1)) => () } @@ -412,6 +431,7 @@ async fn js_action_var_arg() { timeout, ProcessGroupId(0), None, + None, ) .await .unwrap() @@ -458,6 +478,7 @@ async fn js_action_test_rename() { timeout, ProcessGroupId(0), None, + None, ) .await .unwrap() @@ -504,6 +525,7 @@ async fn js_action_test_deep_dir() { timeout, ProcessGroupId(0), None, + None, ) .await .unwrap() @@ -549,6 +571,7 @@ async fn js_action_test_deep_dir_escape() { timeout, ProcessGroupId(0), None, + None, ) .await .unwrap() @@ -594,6 +617,7 @@ async fn js_action_test_zero_dir() { timeout, ProcessGroupId(0), None, + None, ) .await .unwrap() @@ -685,6 +709,7 @@ async fn js_rsync() { timeout, ProcessGroupId(0), None, + None, ) .await .unwrap() diff --git a/backend/src/procedure/mod.rs b/backend/src/procedure/mod.rs index 7eefcc662..a89ae7b9b 100644 --- a/backend/src/procedure/mod.rs +++ b/backend/src/procedure/mod.rs @@ -1,4 +1,5 @@ use std::collections::BTreeSet; +use std::sync::Arc; use std::time::Duration; use color_eyre::eyre::eyre; @@ -83,26 +84,24 @@ impl PackageProcedure { } #[cfg(feature = "js_engine")] PackageProcedure::Script(procedure) => { - let (gid, rpc_client) = match ctx + let man = ctx .managers .get(&(pkg_id.clone(), pkg_version.clone())) .await - { - None => { - return Err(Error::new( + .ok_or_else(|| { + Error::new( eyre!("No manager found for {}", pkg_id), ErrorKind::NotFound, - )) - } - Some(man) => ( - if matches!(name, ProcedureName::Main) { - man.gid.new_main_gid() - } else { - man.gid.new_gid() - }, - man.rpc_client(), - ), - }; + ) + })?; + let gid; + let rpc_client = man.rpc_client(); + let os = Arc::new(ctx.clone()); + if matches!(name, ProcedureName::Main) { + gid = man.gid.new_main_gid(); + } else { + gid = man.gid.new_gid(); + } procedure .execute( @@ -115,6 +114,7 @@ impl PackageProcedure { timeout, gid, rpc_client, + os, ) .await } diff --git a/backend/src/util/mod.rs b/backend/src/util/mod.rs index 03075735d..52686fd27 100644 --- a/backend/src/util/mod.rs +++ b/backend/src/util/mod.rs @@ -280,8 +280,8 @@ impl T, T> Drop for GeneralGuard { pub struct GeneralBoxedGuard(Option ()>>); impl GeneralBoxedGuard { - pub fn new(f: impl FnOnce() -> ()) -> Self { - GeneralBoxedGuard(Some(f.boxed())) + pub fn new(f: impl FnOnce() -> () + 'static) -> Self { + GeneralBoxedGuard(Some(Box::new(f))) } pub fn drop(mut self) -> () { diff --git a/libs/Cargo.lock b/libs/Cargo.lock index e70d1b5f8..7ca047356 100644 --- a/libs/Cargo.lock +++ b/libs/Cargo.lock @@ -110,9 +110,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.61" +version = "0.1.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "705339e0e4a9690e2908d2b3d049d85682cf19fbd5782494498fbf7003a6a282" +checksum = "1cd7fce9ba8c3c042128ce72d8b2ddbf3a05747efb67ea0313c635e10bda47a2" dependencies = [ "proc-macro2", "quote", @@ -1304,6 +1304,7 @@ dependencies = [ name = "helpers" version = "0.1.0" dependencies = [ + "async-trait", "color-eyre", "futures", "lazy_async_pool", @@ -1649,6 +1650,7 @@ dependencies = [ "helpers", "itertools 0.10.5", "models", + "pin-project", "reqwest", "serde", "serde_json", diff --git a/libs/helpers/Cargo.toml b/libs/helpers/Cargo.toml index 02258e8fb..bc82a968b 100644 --- a/libs/helpers/Cargo.toml +++ b/libs/helpers/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-trait = "0.1.64" color-eyre = "0.6.2" futures = "0.3.21" lazy_async_pool = "0.3.3" diff --git a/libs/helpers/src/lib.rs b/libs/helpers/src/lib.rs index f20cd400f..f4290b795 100644 --- a/libs/helpers/src/lib.rs +++ b/libs/helpers/src/lib.rs @@ -10,10 +10,12 @@ use tokio::sync::oneshot; use tokio::task::{JoinError, JoinHandle, LocalSet}; mod byte_replacement_reader; +mod os_api; mod rpc_client; mod rsync; mod script_dir; pub use byte_replacement_reader::*; +pub use os_api::*; pub use rpc_client::{RpcClient, UnixRpcClient}; pub use rsync::*; pub use script_dir::*; diff --git a/libs/helpers/src/os_api.rs b/libs/helpers/src/os_api.rs new file mode 100644 index 000000000..63e35f6b3 --- /dev/null +++ b/libs/helpers/src/os_api.rs @@ -0,0 +1,17 @@ +use models::Error; +use models::PackageId; +use serde_json::Value; + +pub struct RuntimeDropped; + +pub type Callback = Box Result<(), RuntimeDropped> + Send + Sync + 'static>; // bool indicating if + +#[async_trait::async_trait] +pub trait OsApi: Send + Sync + 'static { + async fn get_service_config( + &self, + id: PackageId, + path: &str, + callback: Callback, + ) -> Result; +} diff --git a/libs/js_engine/Cargo.toml b/libs/js_engine/Cargo.toml index d2f25bbc5..15154b6f7 100644 --- a/libs/js_engine/Cargo.toml +++ b/libs/js_engine/Cargo.toml @@ -43,3 +43,4 @@ serde = { version = "1.0", features = ["derive", "rc"] } serde_json = "1.0" tokio = { version = "1", features = ["full"] } tracing = "0.1" +pin-project = "1" diff --git a/libs/js_engine/src/artifacts/loadModule.js b/libs/js_engine/src/artifacts/loadModule.js index 7dfd4a03e..ba39e837f 100644 --- a/libs/js_engine/src/artifacts/loadModule.js +++ b/libs/js_engine/src/artifacts/loadModule.js @@ -193,6 +193,18 @@ const diskUsage = async ({ return { used, total } } +const callbackMapping = {} +const registerCallback = (fn) => { + const uuid = generateUuid(); // TODO + callbackMapping[uuid] = fn; + return uuid +} +const runCallback = (uuid, data) => callbackMapping[uuid](data) + +const getServiceConfig = async (serviceId, configPath, onChange) => { + await Deno.core.opAsync("get_service_config", serviceId, configPath, registerCallback(onChange)) +} + const currentFunction = Deno.core.opSync("current_function"); const input = Deno.core.opSync("get_input"); const variable_args = Deno.core.opSync("get_variable_args"); @@ -223,6 +235,7 @@ const effects = { runRsync, readDir, diskUsage, + getServiceConfig, }; const defaults = { diff --git a/libs/js_engine/src/lib.rs b/libs/js_engine/src/lib.rs index b00f4c5eb..c2d196c39 100644 --- a/libs/js_engine/src/lib.rs +++ b/libs/js_engine/src/lib.rs @@ -1,7 +1,9 @@ use std::collections::BTreeMap; +use std::future::Future; use std::path::{Path, PathBuf}; use std::pin::Pin; use std::sync::Arc; +use std::task::Poll; use std::time::SystemTime; use deno_core::anyhow::{anyhow, bail}; @@ -11,12 +13,12 @@ use deno_core::{ ModuleSpecifier, ModuleType, OpDecl, RuntimeOptions, Snapshot, }; use embassy_container_init::ProcessGroupId; -use helpers::{script_dir, spawn_local, Rsync, UnixRpcClient}; +use helpers::{script_dir, spawn_local, OsApi, Rsync, UnixRpcClient}; use models::{PackageId, ProcedureName, Version, VolumeId}; use serde::{Deserialize, Serialize}; use serde_json::Value; use tokio::io::AsyncReadExt; -use tokio::sync::Mutex; +use tokio::sync::{mpsc, Mutex}; pub trait PathForVolumeId: Send + Sync { fn path_for( @@ -87,6 +89,7 @@ const SNAPSHOT_BYTES: &[u8] = include_bytes!("./artifacts/ARM_JS_SNAPSHOT.bin"); #[derive(Clone)] struct JsContext { sandboxed: bool, + os: Arc, datadir: PathBuf, run_function: String, version: Version, @@ -97,6 +100,7 @@ struct JsContext { container_process_gid: ProcessGroupId, container_rpc_client: Option>, rsyncs: Arc)>>, + callback_sender: mpsc::UnboundedSender<(String, Value)>, } #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] #[serde(rename_all = "kebab-case")] @@ -178,6 +182,7 @@ impl ModuleLoader for ModsLoader { pub struct JsExecutionEnvironment { sandboxed: bool, + os: Arc, base_directory: PathBuf, module_loader: ModsLoader, package_id: PackageId, @@ -189,13 +194,14 @@ pub struct JsExecutionEnvironment { impl JsExecutionEnvironment { pub async fn load_from_package( + os: Arc, data_directory: impl AsRef, package_id: &PackageId, version: &Version, volumes: Box, container_process_gid: ProcessGroupId, container_rpc_client: Option>, - ) -> Result { + ) -> Result { let data_dir = data_directory.as_ref(); let base_directory = data_dir; let js_code = JsCode({ @@ -222,6 +228,7 @@ impl JsExecutionEnvironment { buffer }); Ok(JsExecutionEnvironment { + os, base_directory: base_directory.to_owned(), module_loader: ModsLoader { code: js_code }, package_id: package_id.clone(), @@ -303,6 +310,7 @@ impl JsExecutionEnvironment { fns::rsync::decl(), fns::rsync_wait::decl(), fns::rsync_progress::decl(), + fns::get_service_config::decl(), ] } @@ -315,7 +323,9 @@ impl JsExecutionEnvironment { let base_directory = self.base_directory.clone(); let answer_state = AnswerState::default(); let ext_answer_state = answer_state.clone(); + let (callback_sender, callback_receiver) = mpsc::unbounded_channel(); let js_ctx = JsContext { + os: self.os, datadir: base_directory, run_function: procedure_name .js_function_name() @@ -334,6 +344,7 @@ impl JsExecutionEnvironment { variable_args, container_process_gid: self.container_process_gid, container_rpc_client: self.container_rpc_client.clone(), + callback_sender, rsyncs: Default::default(), }; let ext = Extension::builder() @@ -352,16 +363,14 @@ impl JsExecutionEnvironment { startup_snapshot: Some(Snapshot::Static(SNAPSHOT_BYTES)), ..Default::default() }; - let runtime = Arc::new(Mutex::new(JsRuntime::new(runtime_options))); + let mut runtime = JsRuntime::new(runtime_options); let future = async move { let mod_id = runtime - .lock() - .await .load_main_module(&"file:///loadModule.js".parse().unwrap(), None) .await?; - let evaluated = runtime.lock().await.mod_evaluate(mod_id); - let res = runtime.lock().await.run_event_loop(false).await; + let evaluated = runtime.mod_evaluate(mod_id); + let res = run_event_loop(&mut runtime, callback_receiver).await; res?; evaluated.await??; Ok::<_, AnyError>(()) @@ -377,6 +386,41 @@ impl JsExecutionEnvironment { } } +#[pin_project::pin_project] +struct RuntimeEventLoop<'a> { + runtime: &'a mut JsRuntime, + callback: mpsc::UnboundedReceiver<(String, Value)>, +} +impl<'a> Future for RuntimeEventLoop<'a> { + type Output = Result<(), AnyError>; + fn poll( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let this = self.project(); + if let Poll::Ready(Some((uuid, value))) = this.callback.poll_recv(cx) { + match this + .runtime + .execute_script("callback", &format!("runCallback({uuid}, {value})")) + { + Ok(_) => (), + Err(e) => return Poll::Ready(Err(e)), + } + } + this.runtime.poll_event_loop(cx, false) + } +} +async fn run_event_loop( + runtime: &mut JsRuntime, + callback_receiver: mpsc::UnboundedReceiver<(String, Value)>, +) -> Result<(), AnyError> { + RuntimeEventLoop { + runtime, + callback: callback_receiver, + } + .await +} + /// Note: Make sure that we have the assumption that all these methods are callable at any time, and all call restrictions should be in rust mod fns { use std::cell::RefCell; @@ -396,9 +440,9 @@ mod fns { OutputParams, OutputStrategy, ProcessGroupId, ProcessId, RunCommand, RunCommandParams, SendSignal, SendSignalParams, SignalGroup, SignalGroupParams, }; - use helpers::{to_tmp_path, AtomicFile, Rsync, RsyncOptions}; + use helpers::{to_tmp_path, AtomicFile, Rsync, RsyncOptions, RuntimeDropped}; use itertools::Itertools; - use models::{ErrorKind, VolumeId}; + use models::{PackageId, VolumeId}; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use tokio::io::AsyncWriteExt; @@ -1373,6 +1417,37 @@ mod fns { tokio::fs::set_permissions(new_file, Permissions::from_mode(mode)).await?; Ok(()) } + + #[op] + async fn get_service_config( + state: Rc>, + service_id: PackageId, + path: String, + callback: String, + ) -> Result { + let state = state.borrow(); + let ctx = state.borrow::(); + let sender = ctx.callback_sender.clone(); + Ok( + match ctx + .os + .get_service_config( + service_id, + &path, + Box::new(move |value| { + sender + .send((callback.clone(), value)) + .map_err(|_| RuntimeDropped) + }), + ) + .await + { + Ok(a) => ResultType::Result(a), + Err(e) => ResultType::ErrorCode(e.kind as i32, e.source.to_string()), + }, + ) + } + /// We need to make sure that during the file accessing, we don't reach beyond our scope of control async fn is_subset( parent: impl AsRef,