diff --git a/.github/workflows/debian.yaml b/.github/workflows/debian.yaml index d1f696789..748cd3f73 100644 --- a/.github/workflows/debian.yaml +++ b/.github/workflows/debian.yaml @@ -15,6 +15,16 @@ jobs: - uses: actions/checkout@v3 with: repository: Start9Labs/embassy-os-deb + + - uses: actions/checkout@v3 + with: + submodules: recursive + path: embassyos-0.3.x + - run: | + cp -r debian embassyos-0.3.x/ + VERSION=0.3.x ./control.sh + cp embassyos-0.3.x/backend/embassyd.service embassyos-0.3.x/debian/embassyos.embassyd.service + cp embassyos-0.3.x/backend/embassy-init.service embassyos-0.3.x/debian/embassyos.embassy-init.service - uses: actions/setup-node@v3 with: diff --git a/.github/workflows/pureos-iso.yaml b/.github/workflows/pureos-iso.yaml index 4c51259bd..9d0ed850a 100644 --- a/.github/workflows/pureos-iso.yaml +++ b/.github/workflows/pureos-iso.yaml @@ -7,7 +7,10 @@ on: branches: - master - next - - feature/iso-ci + pull_request: + branches: + - master + - next jobs: dpkg: diff --git a/backend/Cargo.lock b/backend/Cargo.lock index cf0630b07..9c1a9b4d1 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -1281,6 +1281,10 @@ dependencies = [ "async-stream", "color-eyre", "futures", + "helpers", + "imbl 2.0.0", + "nix 0.25.0", + "procfs", "serde", "serde_json", "tokio", @@ -1289,6 +1293,7 @@ dependencies = [ "tracing-error 0.2.0", "tracing-futures", "tracing-subscriber 0.3.16", + "yajrc 0.1.0 (git+https://github.com/dr-bonez/yajrc.git?branch=develop)", ] [[package]] @@ -1382,6 +1387,27 @@ dependencies = [ "termcolor", ] +[[package]] +name = "errno" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f639046355ee4f37944e44f60642c6f3a7efa3cf6b78c78a0d989a8ce6c396a1" +dependencies = [ + "errno-dragonfly", + "libc", + "winapi", +] + +[[package]] +name = "errno-dragonfly" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "event-listener" version = "2.5.3" @@ -1747,9 +1773,11 @@ dependencies = [ "models", "pin-project", "serde", + "serde_json", "tokio", "tokio-stream", "tracing", + "yajrc 0.1.0 (git+https://github.com/dr-bonez/yajrc.git?branch=develop)", ] [[package]] @@ -2030,6 +2058,12 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "io-lifetimes" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ce5ef949d49ee85593fc4d3f3f95ad61657076395cbbce23e2121fc5542074" + [[package]] name = "ipnet" version = "2.5.1" @@ -2336,6 +2370,12 @@ dependencies = [ "cc", ] +[[package]] +name = "linux-raw-sys" +version = "0.0.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4d2456c373231a208ad294c33dc5bff30051eafd954cd4caae83a712b12854d" + [[package]] name = "lock_api" version = "0.4.9" @@ -2458,7 +2498,6 @@ dependencies = [ "bollard", "color-eyre", "ed25519-dalek", - "embassy_container_init", "emver", "mbrman", "openssl", @@ -3110,6 +3149,21 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "procfs" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dfb6451c91904606a1abe93e83a8ec851f45827fa84273f256ade45dc095818" +dependencies = [ + "bitflags", + "byteorder", + "chrono", + "flate2", + "hex", + "lazy_static", + "rustix", +] + [[package]] name = "proptest" version = "1.0.0" @@ -3452,7 +3506,7 @@ dependencies = [ "thiserror", "tokio", "url", - "yajrc", + "yajrc 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -3519,6 +3573,20 @@ dependencies = [ "semver 1.0.14", ] +[[package]] +name = "rustix" +version = "0.35.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "727a1a6d65f786ec22df8a81ca3121107f235970dc1705ed681d3e6e8b9cd5f9" +dependencies = [ + "bitflags", + 
"errno", + "io-lifetimes", + "libc", + "linux-raw-sys", + "windows-sys 0.42.0", +] + [[package]] name = "rustls" version = "0.20.7" @@ -5593,6 +5661,17 @@ dependencies = [ "thiserror", ] +[[package]] +name = "yajrc" +version = "0.1.0" +source = "git+https://github.com/dr-bonez/yajrc.git?branch=develop#72a22f7ac2197d7a5cdce4be601cf20e5280eec5" +dependencies = [ + "anyhow", + "serde", + "serde_json", + "thiserror", +] + [[package]] name = "zeroize" version = "1.5.7" diff --git a/backend/src/bin/embassyd.rs b/backend/src/bin/embassyd.rs index 5487c62d3..d4b1e11c0 100644 --- a/backend/src/bin/embassyd.rs +++ b/backend/src/bin/embassyd.rs @@ -4,8 +4,6 @@ use std::time::Duration; use color_eyre::eyre::eyre; use embassy::context::{DiagnosticContext, RpcContext}; - -use embassy::hostname::get_current_ip; use embassy::net::embassy_service_http_server::EmbassyServiceHTTPServer; #[cfg(feature = "avahi")] use embassy::net::mdns::MdnsController; diff --git a/backend/src/context/cli.rs b/backend/src/context/cli.rs index 15e29fd22..7ca6a6b5b 100644 --- a/backend/src/context/cli.rs +++ b/backend/src/context/cli.rs @@ -1,6 +1,6 @@ use std::fs::File; use std::io::BufReader; -use std::net::{Ipv4Addr, SocketAddr}; +use std::net::Ipv4Addr; use std::path::{Path, PathBuf}; use std::sync::Arc; diff --git a/backend/src/install/cleanup.rs b/backend/src/install/cleanup.rs index c65187361..06b5e50bb 100644 --- a/backend/src/install/cleanup.rs +++ b/backend/src/install/cleanup.rs @@ -1,4 +1,6 @@ -use std::{collections::HashMap, path::PathBuf, sync::Arc}; +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::Arc; use bollard::image::ListImagesOptions; use patch_db::{DbHandle, LockReceipt, LockTargetId, LockType, PatchDbHandle, Verifier}; diff --git a/backend/src/lib.rs b/backend/src/lib.rs index 00c8b7457..e4e8cbfbe 100644 --- a/backend/src/lib.rs +++ b/backend/src/lib.rs @@ -1,3 +1,5 @@ +#![recursion_limit = "256"] + pub const DEFAULT_MARKETPLACE: &str = 
"https://registry.start9.com"; pub const COMMUNITY_MARKETPLACE: &str = "https://community-registry.start9.com"; pub const BUFFER_SIZE: usize = 1024; diff --git a/backend/src/manager/mod.rs b/backend/src/manager/mod.rs index baa58c78d..d356fe08d 100644 --- a/backend/src/manager/mod.rs +++ b/backend/src/manager/mod.rs @@ -1,25 +1,21 @@ use std::collections::BTreeMap; -use std::convert::TryInto; use std::future::Future; use std::net::Ipv4Addr; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::sync::Arc; use std::task::Poll; use std::time::Duration; use bollard::container::{KillContainerOptions, StopContainerOptions}; use color_eyre::eyre::eyre; -use embassy_container_init::{InputJsonRpc, RpcId}; -use models::{ExecCommand, TermCommand}; +use embassy_container_init::{ProcessGroupId, SignalGroupParams}; +use helpers::RpcClient; use nix::sys::signal::Signal; -use num_enum::TryFromPrimitive; use patch_db::DbHandle; use sqlx::{Executor, Postgres}; -use tokio::sync::mpsc::UnboundedSender; use tokio::sync::watch::error::RecvError; use tokio::sync::watch::{channel, Receiver, Sender}; -use tokio::sync::{Mutex, Notify, RwLock}; -use tokio_stream::wrappers::UnboundedReceiverStream; +use tokio::sync::{oneshot, Notify, RwLock}; use torut::onion::TorSecretKeyV3; use tracing::instrument; @@ -27,14 +23,12 @@ use crate::context::RpcContext; use crate::manager::sync::synchronizer; use crate::net::interface::InterfaceId; use crate::net::GeneratedCertificateMountPoint; -use crate::notifications::NotificationLevel; use crate::procedure::docker::{DockerContainer, DockerProcedure, LongRunning}; #[cfg(feature = "js_engine")] use crate::procedure::js_scripts::JsProcedure; use crate::procedure::{NoOutput, PackageProcedure, ProcedureName}; use crate::s9pk::manifest::{Manifest, PackageId}; -use crate::status::MainStatus; -use crate::util::{Container, NonDetachingJoinHandle, Version}; +use crate::util::{ApplyRef, 
Container, NonDetachingJoinHandle, Version}; use crate::Error; pub mod health; @@ -153,29 +147,35 @@ impl ManagerMap { pub struct Manager { shared: Arc, thread: Container>, - persistant_container: Arc, } -#[derive(TryFromPrimitive)] -#[repr(usize)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub enum Status { - Starting = 0, - Running = 1, - Stopped = 2, - Paused = 3, - Shutdown = 4, + Starting, + Running, + Stopped, + Paused, + Shutdown, } -pub struct ManagerSharedState { +struct ManagerSeed { ctx: RpcContext, - status: AtomicUsize, - on_stop: Sender, manifest: Manifest, container_name: String, tor_keys: BTreeMap, +} + +pub struct ManagerSharedState { + seed: Arc, + persistent_container: Option, + status: (Sender, Receiver), + killer: Notify, + on_stop: Sender, synchronized: Notify, synchronize_now: Notify, commit_health_check_results: AtomicBool, + next_gid: AtomicU32, + main_gid: (Sender, Receiver), } #[derive(Debug, Clone, Copy)] @@ -185,32 +185,29 @@ pub enum OnStop { Exit, } -#[instrument(skip(state, persistant))] +#[instrument(skip(state))] async fn run_main( state: &Arc, - persistant: Arc, ) -> Result, Error> { let rt_state = state.clone(); - let interfaces = states_main_interfaces(state)?; - let generated_certificate = generate_certificate(state, &interfaces).await?; + let interfaces = main_interfaces(&*state.seed)?; + let generated_certificate = generate_certificate(&*state.seed, &interfaces).await?; - persistant.wait_for_persistant().await; - let is_injectable_main = check_is_injectable_main(state); let mut runtime = NonDetachingJoinHandle::from(tokio::spawn(start_up_image( rt_state, generated_certificate, ))); - let ip = match is_injectable_main { + let ip = match state.persistent_container.is_some() { false => Some(match get_running_ip(state, &mut runtime).await { - GetRunninIp::Ip(x) => x, - GetRunninIp::Error(e) => return Err(e), - GetRunninIp::EarlyExit(x) => return Ok(x), + GetRunningIp::Ip(x) => x, + GetRunningIp::Error(e) 
=> return Err(e), + GetRunningIp::EarlyExit(x) => return Ok(x), }), true => None, }; if let Some(ip) = ip { - add_network_for_main(state, ip, interfaces, generated_certificate).await?; + add_network_for_main(&*state.seed, ip, interfaces, generated_certificate).await?; } set_commit_health_true(state); @@ -219,10 +216,10 @@ async fn run_main( let res = tokio::select! { a = runtime => a.map_err(|_| Error::new(eyre!("Manager runtime panicked!"), crate::ErrorKind::Docker)).and_then(|a| a), _ = health => Err(Error::new(eyre!("Health check daemon exited!"), crate::ErrorKind::Unknown)), - + _ = state.killer.notified() => Ok(Err((137, "Killed".to_string()))) }; if let Some(ip) = ip { - remove_network_for_main(state, ip).await?; + remove_network_for_main(&*state.seed, ip).await?; } res } @@ -234,14 +231,15 @@ async fn start_up_image( _generated_certificate: GeneratedCertificateMountPoint, ) -> Result, Error> { rt_state + .seed .manifest .main .execute::<(), NoOutput>( - &rt_state.ctx, - &rt_state.manifest.id, - &rt_state.manifest.version, + &rt_state.seed.ctx, + &rt_state.seed.manifest.id, + &rt_state.seed.manifest.version, ProcedureName::Main, - &rt_state.manifest.volumes, + &rt_state.seed.manifest.volumes, None, None, ) @@ -256,46 +254,633 @@ impl Manager { tor_keys: BTreeMap, ) -> Result { let (on_stop, recv) = channel(OnStop::Sleep); - let shared = Arc::new(ManagerSharedState { + let seed = Arc::new(ManagerSeed { ctx, - status: AtomicUsize::new(Status::Stopped as usize), - on_stop, container_name: DockerProcedure::container_name(&manifest.id, None), manifest, tor_keys, + }); + let persistent_container = PersistentContainer::init(&seed).await?; + let shared = Arc::new(ManagerSharedState { + seed, + persistent_container, + status: channel(Status::Stopped), + killer: Notify::new(), + on_stop, synchronized: Notify::new(), synchronize_now: Notify::new(), commit_health_check_results: AtomicBool::new(true), + next_gid: AtomicU32::new(1), + main_gid: channel(ProcessGroupId(0)), 
}); shared.synchronize_now.notify_one(); let thread_shared = shared.clone(); - let persistant_container = PersistantContainer::new(&thread_shared); - let managers_persistant = persistant_container.clone(); let thread = NonDetachingJoinHandle::from(tokio::spawn(async move { tokio::select! { - _ = manager_thread_loop(recv, &thread_shared, managers_persistant.clone()) => (), - _ = synchronizer(&*thread_shared, managers_persistant) => (), + _ = manager_thread_loop(recv, &thread_shared) => (), + _ = synchronizer(&*thread_shared) => (), } })); Ok(Manager { shared, - persistant_container, thread: Container::new(Some(thread)), }) } pub async fn signal(&self, signal: &Signal) -> Result<(), Error> { - // stop health checks from committing their results + send_signal(&self.shared, signal).await + } + + #[instrument(skip(self))] + async fn exit(&self) -> Result<(), Error> { self.shared .commit_health_check_results .store(false, Ordering::SeqCst); + let _ = self.shared.on_stop.send(OnStop::Exit); - // send signal to container + match self + .shared + .seed + .ctx + .docker + .stop_container( + &self.shared.seed.container_name, + Some(StopContainerOptions { + t: sigterm_timeout(&self.shared.seed.manifest) + .map(|d| d.as_secs()) + .unwrap_or(30) as i64, + }), + ) + .await + { + Err(bollard::errors::Error::DockerResponseServerError { + status_code: 404, // NOT FOUND + .. + }) + | Err(bollard::errors::Error::DockerResponseServerError { + status_code: 409, // CONFLICT + .. + }) + | Err(bollard::errors::Error::DockerResponseServerError { + status_code: 304, // NOT MODIFIED + .. + }) => (), // Already stopped + a => a?, + }; + + self.shared.killer.notify_waiters(); + + if let Some(thread) = self.thread.take().await { + thread.await.map_err(|e| { + Error::new( + eyre!("Manager thread panicked: {}", e), + crate::ErrorKind::Docker, + ) + })?; + } + Ok(()) + } + /// this will depend on locks to main status. 
if you hold any locks when calling this function that conflict, this will deadlock + pub async fn synchronize(&self) { + self.shared.synchronize_now.notify_waiters(); + self.shared.synchronized.notified().await + } + + pub fn new_gid(&self) -> ProcessGroupId { + ProcessGroupId( + self.shared + .next_gid + .fetch_add(1, std::sync::atomic::Ordering::SeqCst), + ) + } + + pub fn new_main_gid(&self) -> ProcessGroupId { + let gid = self.new_gid(); + self.shared.main_gid.0.send_modify(|x| *x = gid); + gid + } + + pub fn rpc_client(&self) -> Option> { self.shared + .persistent_container + .as_ref() + .map(|c| c.rpc_client.borrow().clone()) + } +} + +async fn manager_thread_loop(mut recv: Receiver, thread_shared: &Arc) { + loop { + fn handle_stop_action<'a>( + recv: &'a mut Receiver, + ) -> ( + OnStop, + Option> + 'a>, + ) { + let val = *recv.borrow_and_update(); + match val { + OnStop::Sleep => (OnStop::Sleep, Some(recv.changed())), + a => (a, None), + } + } + let (stop_action, fut) = handle_stop_action(&mut recv); + match stop_action { + OnStop::Sleep => { + if let Some(fut) = fut { + let _ = thread_shared.status.0.send(Status::Stopped); + fut.await.unwrap(); + continue; + } + } + OnStop::Exit => { + let _ = thread_shared.status.0.send(Status::Shutdown); + break; + } + OnStop::Restart => { + let _ = thread_shared.status.0.send(Status::Running); + } + } + match run_main(thread_shared).await { + Ok(Ok(NoOutput)) => (), // restart + Ok(Err(e)) => { + #[cfg(feature = "unstable")] + { + use crate::notifications::NotificationLevel; + use crate::status::MainStatus; + let mut db = thread_shared.seed.ctx.db.handle(); + let started = crate::db::DatabaseModel::new() + .package_data() + .idx_model(&thread_shared.seed.manifest.id) + .and_then(|pde| pde.installed()) + .map::<_, MainStatus>(|i| i.status().main()) + .get(&mut db, false) + .await; + match started.as_deref() { + Ok(Some(MainStatus::Running { .. 
})) => { + let res = thread_shared.seed.ctx.notification_manager + .notify( + &mut db, + Some(thread_shared.seed.manifest.id.clone()), + NotificationLevel::Warning, + String::from("Service Crashed"), + format!("The service {} has crashed with the following exit code: {}\nDetails: {}", thread_shared.seed.manifest.id.clone(), e.0, e.1), + (), + Some(3600) // 1 hour + ) + .await; + if let Err(e) = res { + tracing::error!("Failed to issue notification: {}", e); + tracing::debug!("{:?}", e); + } + } + _ => { + tracing::error!("service just started. not issuing crash notification") + } + } + } + tracing::error!("service crashed: {}: {}", e.0, e.1); + tokio::time::sleep(Duration::from_secs(15)).await; + } + Err(e) => { + tracing::error!("failed to start service: {}", e); + tracing::debug!("{:?}", e); + } + } + } +} + +pub struct PersistentContainer { + _running_docker: NonDetachingJoinHandle<()>, + rpc_client: Receiver>, +} + +impl PersistentContainer { + #[instrument(skip(seed))] + async fn init(seed: &Arc) -> Result, Error> { + Ok(if let Some(containers) = &seed.manifest.containers { + let (running_docker, rpc_client) = + spawn_persistent_container(seed.clone(), containers.main.clone()).await?; + Some(Self { + _running_docker: running_docker, + rpc_client, + }) + } else { + None + }) + } +} + +async fn spawn_persistent_container( + seed: Arc, + container: DockerContainer, +) -> Result<(NonDetachingJoinHandle<()>, Receiver>), Error> { + let (send_inserter, inserter) = oneshot::channel(); + Ok(( + tokio::task::spawn(async move { + let mut inserter_send: Option>> = None; + let mut send_inserter: Option>>> = Some(send_inserter); + loop { + if let Err(e) = async { + let interfaces = main_interfaces(&*seed)?; + let generated_certificate = generate_certificate(&*seed, &interfaces).await?; + let (mut runtime, inserter) = + long_running_docker(&seed, &container).await?; + + let ip = match get_long_running_ip(&*seed, &mut runtime).await { + GetRunningIp::Ip(x) => x, + 
GetRunningIp::Error(e) => return Err(e), + GetRunningIp::EarlyExit(e) => { + tracing::error!("Early Exit"); + tracing::debug!("{:?}", e); + return Ok(()); + } + }; + add_network_for_main(&*seed, ip, interfaces, generated_certificate).await?; + + if let Some(inserter_send) = inserter_send.as_mut() { + let _ = inserter_send.send(Arc::new(inserter)); + } else { + let (s, r) = channel(Arc::new(inserter)); + inserter_send = Some(s); + if let Some(send_inserter) = send_inserter.take() { + let _ = send_inserter.send(r); + } + } + + let res = tokio::select! { + a = runtime.running_output => a.map_err(|_| Error::new(eyre!("Manager runtime panicked!"), crate::ErrorKind::Docker)).map(|_| ()), + }; + + remove_network_for_main(&*seed, ip).await?; + + res + }.await { + tracing::error!("Error in persistent container: {}", e); + tracing::debug!("{:?}", e); + } else { + break; + } + } + }) + .into(), + inserter.await.map_err(|_| Error::new(eyre!("Container handle dropped before inserter sent"), crate::ErrorKind::Unknown))?, + )) +} + +async fn long_running_docker( + seed: &ManagerSeed, + container: &DockerContainer, +) -> Result<(LongRunning, RpcClient), Error> { + container + .long_running_execute( + &seed.ctx, + &seed.manifest.id, + &seed.manifest.version, + &seed.manifest.volumes, + ) + .await +} + +async fn remove_network_for_main(seed: &ManagerSeed, ip: std::net::Ipv4Addr) -> Result<(), Error> { + seed.ctx + .net_controller + .remove( + &seed.manifest.id, + ip, + seed.manifest.interfaces.0.keys().cloned(), + ) + .await?; + Ok(()) +} + +fn fetch_starting_to_running(state: &Arc) { + let _ = state.status.0.send_modify(|x| { + if *x == Status::Starting { + *x = Status::Running; + } + }); +} + +async fn main_health_check_daemon(state: Arc) { + tokio::time::sleep(Duration::from_secs(HEALTH_CHECK_GRACE_PERIOD_SECONDS)).await; + loop { + let mut db = state.seed.ctx.db.handle(); + if let Err(e) = health::check( + &state.seed.ctx, + &mut db, + &state.seed.manifest.id, + 
&state.commit_health_check_results, + ) + .await + { + tracing::error!( + "Failed to run health check for {}: {}", + &state.seed.manifest.id, + e + ); + tracing::debug!("{:?}", e); + } + tokio::time::sleep(Duration::from_secs(HEALTH_CHECK_COOLDOWN_SECONDS)).await; + } +} + +fn set_commit_health_true(state: &Arc) { + state + .commit_health_check_results + .store(true, Ordering::SeqCst); +} + +async fn add_network_for_main( + seed: &ManagerSeed, + ip: std::net::Ipv4Addr, + interfaces: Vec<( + InterfaceId, + &crate::net::interface::Interface, + TorSecretKeyV3, + )>, + generated_certificate: GeneratedCertificateMountPoint, +) -> Result<(), Error> { + seed.ctx + .net_controller + .add(&seed.manifest.id, ip, interfaces, generated_certificate) + .await?; + Ok(()) +} + +enum GetRunningIp { + Ip(Ipv4Addr), + Error(Error), + EarlyExit(Result), +} + +type RuntimeOfCommand = NonDetachingJoinHandle, Error>>; + +async fn get_running_ip( + state: &Arc, + mut runtime: &mut RuntimeOfCommand, +) -> GetRunningIp { + loop { + match container_inspect(&*state.seed).await { + Ok(res) => { + match res + .network_settings + .and_then(|ns| ns.networks) + .and_then(|mut n| n.remove("start9")) + .and_then(|es| es.ip_address) + .filter(|ip| !ip.is_empty()) + .map(|ip| ip.parse()) + .transpose() + { + Ok(Some(ip_addr)) => return GetRunningIp::Ip(ip_addr), + Ok(None) => (), + Err(e) => return GetRunningIp::Error(e.into()), + } + } + Err(bollard::errors::Error::DockerResponseServerError { + status_code: 404, // NOT FOUND + .. 
+ }) => (), + Err(e) => return GetRunningIp::Error(e.into()), + } + if let Poll::Ready(res) = futures::poll!(&mut runtime) { + match res { + Ok(Ok(response)) => return GetRunningIp::EarlyExit(response), + Err(_) | Ok(Err(_)) => { + return GetRunningIp::Error(Error::new( + eyre!("Manager runtime panicked!"), + crate::ErrorKind::Docker, + )) + } + } + } + } +} + +async fn get_long_running_ip(seed: &ManagerSeed, runtime: &mut LongRunning) -> GetRunningIp { + loop { + match container_inspect(seed).await { + Ok(res) => { + match res + .network_settings + .and_then(|ns| ns.networks) + .and_then(|mut n| n.remove("start9")) + .and_then(|es| es.ip_address) + .filter(|ip| !ip.is_empty()) + .map(|ip| ip.parse()) + .transpose() + { + Ok(Some(ip_addr)) => return GetRunningIp::Ip(ip_addr), + Ok(None) => (), + Err(e) => return GetRunningIp::Error(e.into()), + } + } + Err(bollard::errors::Error::DockerResponseServerError { + status_code: 404, // NOT FOUND + .. + }) => (), + Err(e) => return GetRunningIp::Error(e.into()), + } + if let Poll::Ready(res) = futures::poll!(&mut runtime.running_output) { + match res { + Ok(_) => return GetRunningIp::EarlyExit(Ok(NoOutput)), + Err(_e) => { + return GetRunningIp::Error(Error::new( + eyre!("Manager runtime panicked!"), + crate::ErrorKind::Docker, + )) + } + } + } + } +} + +async fn container_inspect( + seed: &ManagerSeed, +) -> Result { + seed.ctx + .docker + .inspect_container(&seed.container_name, None) + .await +} + +async fn generate_certificate( + seed: &ManagerSeed, + interfaces: &Vec<( + InterfaceId, + &crate::net::interface::Interface, + TorSecretKeyV3, + )>, +) -> Result { + seed.ctx + .net_controller + .generate_certificate_mountpoint(&seed.manifest.id, interfaces) + .await +} + +fn main_interfaces( + seed: &ManagerSeed, +) -> Result< + Vec<( + InterfaceId, + &crate::net::interface::Interface, + TorSecretKeyV3, + )>, + Error, +> { + seed.manifest + .interfaces + .0 + .iter() + .map(|(id, info)| { + Ok(( + id.clone(), + info, + 
seed.tor_keys + .get(id) + .ok_or_else(|| { + Error::new(eyre!("interface {} missing key", id), crate::ErrorKind::Tor) + })? + .clone(), + )) + }) + .collect::, Error>>() +} + +async fn wait_for_status(shared: &ManagerSharedState, status: Status) { + let mut recv = shared.status.0.subscribe(); + while { + let s = *recv.borrow(); + s != status + } { + if recv.changed().await.is_ok() { + break; + } + } +} + +fn sigterm_timeout(manifest: &Manifest) -> Option { + if let PackageProcedure::Docker(d) = &manifest.main { + d.sigterm_timeout.map(|d| *d) + } else if let Some(c) = &manifest.containers { + c.main.sigterm_timeout.map(|d| *d) + } else { + None + } +} + +#[instrument(skip(shared))] +async fn stop(shared: &ManagerSharedState) -> Result<(), Error> { + shared + .commit_health_check_results + .store(false, Ordering::SeqCst); + shared.on_stop.send(OnStop::Sleep).map_err(|_| { + Error::new( + eyre!("Manager has already been shutdown"), + crate::ErrorKind::Docker, + ) + })?; + if *shared.status.1.borrow() == Status::Paused { + resume(shared).await?; + } + send_signal(shared, &Signal::SIGTERM).await?; + let _ = tokio::time::timeout( + sigterm_timeout(&shared.seed.manifest).unwrap_or(Duration::from_secs(30)), + wait_for_status(shared, Status::Stopped), + ) + .await; + shared.killer.notify_waiters(); + + Ok(()) +} + +#[instrument(skip(shared))] +async fn start(shared: &ManagerSharedState) -> Result<(), Error> { + shared.on_stop.send(OnStop::Restart).map_err(|_| { + Error::new( + eyre!("Manager has already been shutdown"), + crate::ErrorKind::Docker, + ) + })?; + let _ = shared.status.0.send_modify(|x| { + if *x != Status::Running { + *x = Status::Starting + } + }); + Ok(()) +} + +#[instrument(skip(shared))] +async fn pause(shared: &ManagerSharedState) -> Result<(), Error> { + if let Err(e) = shared + .seed + .ctx + .docker + .pause_container(&shared.seed.container_name) + .await + { + tracing::error!("failed to pause container. stopping instead. 
{}", e); + tracing::debug!("{:?}", e); + return stop(shared).await; + } + let _ = shared.status.0.send(Status::Paused); + Ok(()) +} + +#[instrument(skip(shared))] +async fn resume(shared: &ManagerSharedState) -> Result<(), Error> { + shared + .seed + .ctx + .docker + .unpause_container(&shared.seed.container_name) + .await?; + let _ = shared.status.0.send(Status::Running); + Ok(()) +} + +async fn send_signal(shared: &ManagerSharedState, signal: &Signal) -> Result<(), Error> { + // stop health checks from committing their results + shared + .commit_health_check_results + .store(false, Ordering::SeqCst); + + if let Some(rpc_client) = shared + .persistent_container + .as_ref() + .map(|c| c.rpc_client.borrow().clone()) + { + #[cfg(feature = "js_engine")] + if let Err(e) = JsProcedure::default() + .execute::<_, NoOutput>( + &shared.seed.ctx.datadir, + &shared.seed.manifest.id, + &shared.seed.manifest.version, + ProcedureName::Signal, + &shared.seed.manifest.volumes, + Some(SignalGroupParams { + gid: shared.main_gid.1.apply_ref(|g| *g.borrow()), + signal: *signal as u32, + }), + None, // TODO + ProcessGroupId( + shared + .next_gid + .fetch_add(1, std::sync::atomic::Ordering::SeqCst), + ), + Some(rpc_client), + ) + .await? 
+ { + tracing::error!("Failed to send js signal: {}", e.1); + tracing::debug!("{:?}", e); + } + } else { + // send signal to container + shared + .seed .ctx .docker .kill_container( - &self.shared.container_name, + &shared.seed.container_name, Some(KillContainerOptions { signal: signal.to_string(), }), @@ -317,909 +902,7 @@ impl Manager { Err(e) } })?; - Ok(()) } - #[instrument(skip(self))] - async fn exit(&self) -> Result<(), Error> { - self.shared - .commit_health_check_results - .store(false, Ordering::SeqCst); - let _ = self.shared.on_stop.send(OnStop::Exit); - let sigterm_timeout: Option = match self - .shared - .manifest - .containers - .as_ref() - .map(|x| x.main.sigterm_timeout) - { - Some(a) => a, - None => match &self.shared.manifest.main { - PackageProcedure::Docker(DockerProcedure { - sigterm_timeout, .. - }) => *sigterm_timeout, - #[cfg(feature = "js_engine")] - PackageProcedure::Script(_) => return Ok(()), - }, - }; - self.persistant_container.stop().await; - - if !check_is_injectable_main(&self.shared) { - match self - .shared - .ctx - .docker - .stop_container( - &self.shared.container_name, - Some(StopContainerOptions { - t: sigterm_timeout - .map(|a| *a) - .unwrap_or(Duration::from_secs(30)) - .as_secs_f64() as i64, - }), - ) - .await - { - Err(bollard::errors::Error::DockerResponseServerError { - status_code: 404, // NOT FOUND - .. - }) - | Err(bollard::errors::Error::DockerResponseServerError { - status_code: 409, // CONFLICT - .. - }) - | Err(bollard::errors::Error::DockerResponseServerError { - status_code: 304, // NOT MODIFIED - .. 
- }) => (), // Already stopped - a => a?, - }; - } else { - stop_long_running_processes( - &*self.shared.container_name, - self.persistant_container.command_inserter.clone(), - ) - .await; - } - - self.shared.status.store( - Status::Shutdown as usize, - std::sync::atomic::Ordering::SeqCst, - ); - if let Some(thread) = self.thread.take().await { - thread.await.map_err(|e| { - Error::new( - eyre!("Manager thread panicked: {}", e), - crate::ErrorKind::Docker, - ) - })?; - } - Ok(()) - } - /// this will depend on locks to main status. if you hold any locks when calling this function that conflict, this will deadlock - pub async fn synchronize(&self) { - self.shared.synchronize_now.notify_waiters(); - self.shared.synchronized.notified().await - } - - pub fn exec_command(&self) -> ExecCommand { - self.persistant_container.exec_command() - } - pub fn term_command(&self) -> TermCommand { - self.persistant_container.term_command() - } -} - -async fn manager_thread_loop( - mut recv: Receiver, - thread_shared: &Arc, - persistant_container: Arc, -) { - loop { - fn handle_stop_action<'a>( - recv: &'a mut Receiver, - ) -> ( - OnStop, - Option> + 'a>, - ) { - let val = *recv.borrow_and_update(); - match val { - OnStop::Sleep => (OnStop::Sleep, Some(recv.changed())), - a => (a, None), - } - } - let (stop_action, fut) = handle_stop_action(&mut recv); - match stop_action { - OnStop::Sleep => { - if let Some(fut) = fut { - thread_shared.status.store( - Status::Stopped as usize, - std::sync::atomic::Ordering::SeqCst, - ); - fut.await.unwrap(); - continue; - } - } - OnStop::Exit => { - thread_shared.status.store( - Status::Stopped as usize, - std::sync::atomic::Ordering::SeqCst, - ); - break; - } - OnStop::Restart => { - thread_shared.status.store( - Status::Running as usize, - std::sync::atomic::Ordering::SeqCst, - ); - } - } - match run_main(thread_shared, persistant_container.clone()).await { - Ok(Ok(NoOutput)) => (), // restart - Ok(Err(e)) => { - let mut db = 
thread_shared.ctx.db.handle(); - let started = crate::db::DatabaseModel::new() - .package_data() - .idx_model(&thread_shared.manifest.id) - .and_then(|pde| pde.installed()) - .map::<_, MainStatus>(|i| i.status().main()) - .get(&mut db, false) - .await; - match started.as_deref() { - Ok(Some(MainStatus::Running { .. })) if cfg!(feature = "unstable") => { - let res = thread_shared.ctx.notification_manager - .notify( - &mut db, - Some(thread_shared.manifest.id.clone()), - NotificationLevel::Warning, - String::from("Service Crashed"), - format!("The service {} has crashed with the following exit code: {}\nDetails: {}", thread_shared.manifest.id.clone(), e.0, e.1), - (), - Some(3600) // 1 hour - ) - .await; - if let Err(e) = res { - tracing::error!("Failed to issue notification: {}", e); - tracing::debug!("{:?}", e); - } - } - _ => tracing::error!("service just started. not issuing crash notification"), - } - tracing::error!("service crashed: {}: {}", e.0, e.1); - tokio::time::sleep(Duration::from_secs(15)).await; - } - Err(e) => { - tracing::error!("failed to start service: {}", e); - tracing::debug!("{:?}", e); - } - } - } -} - -struct LongRunningHandle(NonDetachingJoinHandle<()>); -pub struct CommandInserter { - command_counter: AtomicUsize, - input: UnboundedSender, - outputs: Arc>>>, -} -impl Drop for CommandInserter { - fn drop(&mut self) { - use embassy_container_init::{Input, JsonRpc}; - let CommandInserter { - command_counter, - input, - outputs: _, - } = self; - let upper: usize = command_counter.load(Ordering::Relaxed); - for i in 0..upper { - let _ignored_result = input.send(JsonRpc::new(RpcId::UInt(i as u32), Input::Term())); - } - } -} -impl CommandInserter { - fn new( - long_running: LongRunning, - input: UnboundedSender, - ) -> (Self, LongRunningHandle) { - let LongRunning { - mut output, - running_output, - } = long_running; - let command_counter = AtomicUsize::new(0); - let outputs: Arc>>> = - Default::default(); - let handle = 
LongRunningHandle(running_output); - tokio::spawn({ - let outputs = outputs.clone(); - async move { - while let Some(output) = output.recv().await { - let (id, output) = output.into_pair(); - let mut outputs = outputs.lock().await; - let output_sender = outputs.get_mut(&id); - if let Some(output_sender) = output_sender { - if let Err(err) = output_sender.send(output) { - tracing::warn!("Could no longer send an output"); - tracing::debug!("{err:?}"); - outputs.remove(&id); - } - } - } - } - }); - - ( - Self { - command_counter, - input, - outputs, - }, - handle, - ) - } - - pub async fn exec_command( - &self, - command: String, - args: Vec, - sender: UnboundedSender, - timeout: Option, - ) -> Option { - use embassy_container_init::{Input, JsonRpc}; - let mut outputs = self.outputs.lock().await; - let command_counter = self.command_counter.fetch_add(1, Ordering::SeqCst) as u32; - let command_id = RpcId::UInt(command_counter); - outputs.insert(command_id.clone(), sender); - if let Some(timeout) = timeout { - tokio::spawn({ - let input = self.input.clone(); - let command_id = command_id.clone(); - async move { - tokio::time::sleep(timeout).await; - let _ignored_output = input.send(JsonRpc::new(command_id, Input::Kill())); - } - }); - } - if let Err(err) = self.input.send(JsonRpc::new( - command_id.clone(), - Input::Command { command, args }, - )) { - tracing::warn!("Trying to send to input but can't"); - tracing::debug!("{err:?}"); - return None; - } - - Some(command_id) - } - pub async fn term(&self, id: RpcId) { - use embassy_container_init::{Input, JsonRpc}; - self.outputs.lock().await.remove(&id); - let _ignored_term = self.input.send(JsonRpc::new(id, Input::Term())); - } - - pub async fn term_all(&self) { - for i in 0..self.command_counter.load(Ordering::Relaxed) { - self.term(RpcId::UInt(i as u32)).await; - } - } -} - -type RunningDocker = - Arc, Error>>>>>; -pub struct PersistantContainer { - container_name: String, - running_docker: RunningDocker, - 
should_stop_running: Arc, - wait_for_start: (Sender, Receiver), - command_inserter: Arc>>, -} - -impl PersistantContainer { - #[instrument(skip(thread_shared))] - fn new(thread_shared: &Arc) -> Arc { - let wait_for_start = channel(false); - let container = Arc::new(Self { - container_name: thread_shared.container_name.clone(), - running_docker: Arc::new(Mutex::new(None)), - should_stop_running: Arc::new(AtomicBool::new(false)), - wait_for_start, - command_inserter: Default::default(), - }); - tokio::spawn(persistant_container( - thread_shared.clone(), - container.clone(), - )); - container - } - #[instrument(skip(self))] - async fn stop(&self) { - let container_name = &self.container_name; - self.should_stop_running.store(true, Ordering::SeqCst); - let mut running_docker = self.running_docker.lock().await; - *running_docker = None; - use tokio::process::Command; - if let Err(_err) = Command::new("docker") - .args(["stop", "-t", "30", container_name]) - .output() - .await - {} - } - - async fn wait_for_persistant(&self) { - let mut changed_rx = self.wait_for_start.1.clone(); - loop { - if !*changed_rx.borrow() { - return; - } - changed_rx.changed().await.unwrap(); - } - } - - async fn start_wait(&self) { - self.wait_for_start.0.send(true).unwrap(); - } - async fn done_waiting(&self) { - self.wait_for_start.0.send(false).unwrap(); - } - fn term_command(&self) -> TermCommand { - let cloned = self.command_inserter.clone(); - Arc::new(move |id| { - let cloned = cloned.clone(); - Box::pin(async move { - let lock = cloned.lock().await; - let _id = match &*lock { - Some(command_inserter) => command_inserter.term(id).await, - None => { - return Err("Couldn't get a command inserter in current service".to_string()) - } - }; - Ok::<(), String>(()) - }) - }) - } - - fn exec_command(&self) -> ExecCommand { - let cloned = self.command_inserter.clone(); - - /// A handle that on drop will clean all the ids that are inserter in the fn. 
- struct Cleaner { - command_inserter: Arc>>, - ids: ::std::collections::BTreeSet, - } - impl Drop for Cleaner { - fn drop(&mut self) { - let command_inserter = self.command_inserter.clone(); - let ids = ::std::mem::take(&mut self.ids); - tokio::spawn(async move { - let command_inserter_lock = command_inserter.lock().await; - let command_inserter = match &*command_inserter_lock { - Some(a) => a, - None => { - return; - } - }; - for id in ids { - command_inserter.term(id).await; - } - }); - } - } - let cleaner = Arc::new(Mutex::new(Cleaner { - command_inserter: cloned.clone(), - ids: Default::default(), - })); - Arc::new(move |command, args, sender, timeout| { - let cloned = cloned.clone(); - let cleaner = cleaner.clone(); - Box::pin(async move { - let lock = cloned.lock().await; - let id = match &*lock { - Some(command_inserter) => { - if let Some(id) = command_inserter - .exec_command(command.clone(), args.clone(), sender, timeout) - .await - { - let mut cleaner = cleaner.lock().await; - cleaner.ids.insert(id.clone()); - id - } else { - return Err("Couldn't get command started ".to_string()); - } - } - None => { - return Err("Expecting containers.main in the package manifest".to_string()) - } - }; - Ok::(id) - }) - }) - } -} -impl Drop for PersistantContainer { - fn drop(&mut self) { - self.should_stop_running.store(true, Ordering::SeqCst); - } -} - -async fn persistant_container( - thread_shared: Arc, - container: Arc, -) { - let main_docker_procedure_for_long = injectable_main(&thread_shared); - match main_docker_procedure_for_long { - InjectableMain::None => futures::future::pending().await, - #[cfg(feature = "js_engine")] - InjectableMain::Script((container_inject, procedure)) => loop { - let main = DockerProcedure::main_docker_procedure_js(container_inject, procedure); - if container.should_stop_running.load(Ordering::SeqCst) { - return; - } - container.start_wait().await; - match run_persistant_container(&thread_shared, container.clone(), main).await { - 
Ok(_) => (), - Err(e) => { - tracing::error!("failed to start persistant container: {}", e); - tracing::debug!("{:?}", e); - } - } - }, - } -} - -#[cfg(not(feature = "js_engine"))] -enum InjectableMain { - None, -} - -#[cfg(feature = "js_engine")] -enum InjectableMain<'a> { - None, - Script((&'a DockerContainer, &'a JsProcedure)), -} - -fn injectable_main(thread_shared: &Arc) -> InjectableMain { - match ( - &thread_shared.manifest.main, - &thread_shared.manifest.containers.as_ref().map(|x| &x.main), - ) { - #[cfg(feature = "js_engine")] - (PackageProcedure::Script(inject), Some(container)) => { - InjectableMain::Script((container, inject)) - } - _ => InjectableMain::None, - } -} -fn check_is_injectable_main(thread_shared: &ManagerSharedState) -> bool { - match &thread_shared.manifest.main { - PackageProcedure::Docker(_a) => false, - #[cfg(feature = "js_engine")] - PackageProcedure::Script(_) => true, - } -} -async fn run_persistant_container( - state: &Arc, - persistant: Arc, - docker_procedure: DockerProcedure, -) -> Result<(), Error> { - let interfaces = states_main_interfaces(state)?; - let generated_certificate = generate_certificate(state, &interfaces).await?; - let mut runtime = - long_running_docker(state.clone(), docker_procedure, persistant.clone()).await?; - - let ip = match get_long_running_ip(state, &mut runtime).await { - GetRunninIp::Ip(x) => x, - GetRunninIp::Error(e) => return Err(e), - GetRunninIp::EarlyExit(e) => { - tracing::error!("Early Exit"); - tracing::debug!("{:?}", e); - return Ok(()); - } - }; - persistant.done_waiting().await; - add_network_for_main(state, ip, interfaces, generated_certificate).await?; - - fetch_starting_to_running(state); - let res = tokio::select! 
{ - a = runtime.0 => a.map_err(|_| Error::new(eyre!("Manager runtime panicked!"), crate::ErrorKind::Docker)).map(|_| ()), - }; - remove_network_for_main(state, ip).await?; - res -} - -async fn long_running_docker( - rt_state: Arc, - main_status: DockerProcedure, - container: Arc, -) -> Result { - let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); - let long_running = main_status - .long_running_execute( - &rt_state.ctx, - &rt_state.manifest.id, - &rt_state.manifest.version, - ProcedureName::LongRunning, - &rt_state.manifest.volumes, - UnboundedReceiverStream::new(receiver), - ) - .await?; - let (command_inserter, long_running_handle) = CommandInserter::new(long_running, sender); - *container.command_inserter.lock().await = Some(command_inserter); - Ok(long_running_handle) -} - -async fn remove_network_for_main( - state: &Arc, - ip: std::net::Ipv4Addr, -) -> Result<(), Error> { - state - .ctx - .net_controller - .remove( - &state.manifest.id, - ip, - state.manifest.interfaces.0.keys().cloned(), - ) - .await?; - Ok(()) -} - -fn fetch_starting_to_running(state: &Arc) { - let _ = state - .status - .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| { - if x == Status::Starting as usize { - Some(Status::Running as usize) - } else { - None - } - }); -} - -async fn main_health_check_daemon(state: Arc) { - tokio::time::sleep(Duration::from_secs(HEALTH_CHECK_GRACE_PERIOD_SECONDS)).await; - loop { - let mut db = state.ctx.db.handle(); - if let Err(e) = health::check( - &state.ctx, - &mut db, - &state.manifest.id, - &state.commit_health_check_results, - ) - .await - { - tracing::error!( - "Failed to run health check for {}: {}", - &state.manifest.id, - e - ); - tracing::debug!("{:?}", e); - } - tokio::time::sleep(Duration::from_secs(HEALTH_CHECK_COOLDOWN_SECONDS)).await; - } -} - -fn set_commit_health_true(state: &Arc) { - state - .commit_health_check_results - .store(true, Ordering::SeqCst); -} - -async fn add_network_for_main( - state: &Arc, - ip: 
std::net::Ipv4Addr, - interfaces: Vec<( - InterfaceId, - &crate::net::interface::Interface, - TorSecretKeyV3, - )>, - generated_certificate: GeneratedCertificateMountPoint, -) -> Result<(), Error> { - state - .ctx - .net_controller - .add(&state.manifest.id, ip, interfaces, generated_certificate) - .await?; - Ok(()) -} - -enum GetRunninIp { - Ip(Ipv4Addr), - Error(Error), - EarlyExit(Result), -} - -type RuntimeOfCommand = NonDetachingJoinHandle, Error>>; - -async fn get_running_ip( - state: &Arc, - mut runtime: &mut RuntimeOfCommand, -) -> GetRunninIp { - loop { - match container_inspect(state).await { - Ok(res) => { - match res - .network_settings - .and_then(|ns| ns.networks) - .and_then(|mut n| n.remove("start9")) - .and_then(|es| es.ip_address) - .filter(|ip| !ip.is_empty()) - .map(|ip| ip.parse()) - .transpose() - { - Ok(Some(ip_addr)) => return GetRunninIp::Ip(ip_addr), - Ok(None) => (), - Err(e) => return GetRunninIp::Error(e.into()), - } - } - Err(bollard::errors::Error::DockerResponseServerError { - status_code: 404, // NOT FOUND - .. 
- }) => (), - Err(e) => return GetRunninIp::Error(e.into()), - } - if let Poll::Ready(res) = futures::poll!(&mut runtime) { - match res { - Ok(Ok(response)) => return GetRunninIp::EarlyExit(response), - Err(_) | Ok(Err(_)) => { - return GetRunninIp::Error(Error::new( - eyre!("Manager runtime panicked!"), - crate::ErrorKind::Docker, - )) - } - } - } - } -} - -async fn get_long_running_ip( - state: &Arc, - runtime: &mut LongRunningHandle, -) -> GetRunninIp { - loop { - match container_inspect(state).await { - Ok(res) => { - match res - .network_settings - .and_then(|ns| ns.networks) - .and_then(|mut n| n.remove("start9")) - .and_then(|es| es.ip_address) - .filter(|ip| !ip.is_empty()) - .map(|ip| ip.parse()) - .transpose() - { - Ok(Some(ip_addr)) => return GetRunninIp::Ip(ip_addr), - Ok(None) => (), - Err(e) => return GetRunninIp::Error(e.into()), - } - } - Err(bollard::errors::Error::DockerResponseServerError { - status_code: 404, // NOT FOUND - .. - }) => (), - Err(e) => return GetRunninIp::Error(e.into()), - } - if let Poll::Ready(res) = futures::poll!(&mut runtime.0) { - match res { - Ok(_) => return GetRunninIp::EarlyExit(Ok(NoOutput)), - Err(_e) => { - return GetRunninIp::Error(Error::new( - eyre!("Manager runtime panicked!"), - crate::ErrorKind::Docker, - )) - } - } - } - } -} - -async fn container_inspect( - state: &Arc, -) -> Result { - state - .ctx - .docker - .inspect_container(&state.container_name, None) - .await -} - -async fn generate_certificate( - state: &Arc, - interfaces: &Vec<( - InterfaceId, - &crate::net::interface::Interface, - TorSecretKeyV3, - )>, -) -> Result { - state - .ctx - .net_controller - .generate_certificate_mountpoint(&state.manifest.id, interfaces) - .await -} - -fn states_main_interfaces( - state: &Arc, -) -> Result< - Vec<( - InterfaceId, - &crate::net::interface::Interface, - TorSecretKeyV3, - )>, - Error, -> { - state - .manifest - .interfaces - .0 - .iter() - .map(|(id, info)| { - Ok(( - id.clone(), - info, - state - .tor_keys 
- .get(id) - .ok_or_else(|| { - Error::new(eyre!("interface {} missing key", id), crate::ErrorKind::Tor) - })? - .clone(), - )) - }) - .collect::, Error>>() -} - -#[instrument(skip(shared, persistant_container))] -async fn stop( - shared: &ManagerSharedState, - persistant_container: Arc, -) -> Result<(), Error> { - shared - .commit_health_check_results - .store(false, Ordering::SeqCst); - shared.on_stop.send(OnStop::Sleep).map_err(|_| { - Error::new( - eyre!("Manager has already been shutdown"), - crate::ErrorKind::Docker, - ) - })?; - if matches!( - shared.status.load(Ordering::SeqCst).try_into().unwrap(), - Status::Paused - ) { - resume(shared).await?; - } - match &shared.manifest.main { - PackageProcedure::Docker(DockerProcedure { - sigterm_timeout, .. - }) => { - if !check_is_injectable_main(shared) { - match shared - .ctx - .docker - .stop_container( - &shared.container_name, - Some(StopContainerOptions { - t: sigterm_timeout - .map(|a| *a) - .unwrap_or(Duration::from_secs(30)) - .as_secs_f64() as i64, - }), - ) - .await - { - Err(bollard::errors::Error::DockerResponseServerError { - status_code: 404, // NOT FOUND - .. - }) - | Err(bollard::errors::Error::DockerResponseServerError { - status_code: 409, // CONFLICT - .. - }) - | Err(bollard::errors::Error::DockerResponseServerError { - status_code: 304, // NOT MODIFIED - .. - }) => (), // Already stopped - a => a?, - }; - } else { - stop_long_running_processes( - &shared.container_name, - persistant_container.command_inserter.clone(), - ) - .await; - } - } - #[cfg(feature = "js_engine")] - PackageProcedure::Script(_) => { - if check_is_injectable_main(shared) { - stop_long_running_processes( - &shared.container_name, - persistant_container.command_inserter.clone(), - ) - .await; - } - } - }; - tracing::debug!("Stopping a docker"); - shared.status.store( - Status::Stopped as usize, - std::sync::atomic::Ordering::SeqCst, - ); - Ok(()) -} - -/// So the sleep infinity, which is the long running, is pid 1. 
So we kill the others -async fn stop_long_running_processes( - container_name: &str, - command_inserter: Arc>>, -) { - if let Some(command_inserter) = &*command_inserter.lock().await { - command_inserter.term_all().await; - } - - let _ = tokio::process::Command::new("docker") - .args([ - "container", - "exec", - container_name, - "sh", - "-c", - "ps ax | awk '$1 ~ /^[:0-9:]/ && $1 > 1 {print $1}' | xargs kill", - ]) - .output() - .await; -} - -#[instrument(skip(shared))] -async fn start(shared: &ManagerSharedState) -> Result<(), Error> { - shared.on_stop.send(OnStop::Restart).map_err(|_| { - Error::new( - eyre!("Manager has already been shutdown"), - crate::ErrorKind::Docker, - ) - })?; - let _ = shared - .status - .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| { - if x != Status::Running as usize { - Some(Status::Starting as usize) - } else { - None - } - }); - Ok(()) -} - -#[instrument(skip(shared, persistant_container))] -async fn pause( - shared: &ManagerSharedState, - persistant_container: Arc, -) -> Result<(), Error> { - if let Err(e) = shared - .ctx - .docker - .pause_container(&shared.container_name) - .await - { - tracing::error!("failed to pause container. stopping instead. 
{}", e); - tracing::debug!("{:?}", e); - return stop(shared, persistant_container).await; - } - shared - .status - .store(Status::Paused as usize, std::sync::atomic::Ordering::SeqCst); - Ok(()) -} - -#[instrument(skip(shared))] -async fn resume(shared: &ManagerSharedState) -> Result<(), Error> { - shared - .ctx - .docker - .unpause_container(&shared.container_name) - .await?; - shared.status.store( - Status::Running as usize, - std::sync::atomic::Ordering::SeqCst, - ); Ok(()) } diff --git a/backend/src/manager/sync.rs b/backend/src/manager/sync.rs index 1e6927eee..41a6445c5 100644 --- a/backend/src/manager/sync.rs +++ b/backend/src/manager/sync.rs @@ -1,23 +1,18 @@ -use std::convert::TryInto; -use std::sync::atomic::Ordering; +use std::collections::BTreeMap; use std::time::Duration; -use std::{collections::BTreeMap, sync::Arc}; use chrono::Utc; -use super::{pause, resume, start, stop, ManagerSharedState, PersistantContainer, Status}; +use super::{pause, resume, start, stop, ManagerSharedState, Status}; use crate::status::MainStatus; use crate::Error; /// Allocates a db handle. DO NOT CALL with a db handle already in scope -async fn synchronize_once( - shared: &ManagerSharedState, - persistant_container: Arc, -) -> Result { - let mut db = shared.ctx.db.handle(); +async fn synchronize_once(shared: &ManagerSharedState) -> Result { + let mut db = shared.seed.ctx.db.handle(); let mut status = crate::db::DatabaseModel::new() .package_data() - .idx_model(&shared.manifest.id) + .idx_model(&shared.seed.manifest.id) .expect(&mut db) .await? 
.installed() @@ -27,7 +22,7 @@ async fn synchronize_once( .main() .get_mut(&mut db) .await?; - let manager_status = shared.status.load(Ordering::SeqCst).try_into().unwrap(); + let manager_status = *shared.status.1.borrow(); match manager_status { Status::Stopped => match &mut *status { MainStatus::Stopped => (), @@ -48,16 +43,16 @@ async fn synchronize_once( }, Status::Starting => match *status { MainStatus::Stopped | MainStatus::Stopping | MainStatus::Restarting => { - stop(shared, persistant_container).await?; + stop(shared).await?; } MainStatus::Starting { .. } | MainStatus::Running { .. } => (), MainStatus::BackingUp { .. } => { - pause(shared, persistant_container).await?; + pause(shared).await?; } }, Status::Running => match *status { MainStatus::Stopped | MainStatus::Stopping | MainStatus::Restarting => { - stop(shared, persistant_container).await?; + stop(shared).await?; } MainStatus::Starting { .. } => { *status = MainStatus::Running { @@ -67,12 +62,12 @@ async fn synchronize_once( } MainStatus::Running { .. } => (), MainStatus::BackingUp { .. } => { - pause(shared, persistant_container).await?; + pause(shared).await?; } }, Status::Paused => match *status { MainStatus::Stopped | MainStatus::Stopping | MainStatus::Restarting => { - stop(shared, persistant_container).await?; + stop(shared).await?; } MainStatus::Starting { .. } | MainStatus::Running { .. } => { resume(shared).await?; @@ -85,21 +80,20 @@ async fn synchronize_once( Ok(manager_status) } -pub async fn synchronizer( - shared: &ManagerSharedState, - persistant_container: Arc, -) { +pub async fn synchronizer(shared: &ManagerSharedState) { + let mut status_recv = shared.status.0.subscribe(); loop { tokio::select! 
{ _ = tokio::time::sleep(Duration::from_secs(5)) => (), _ = shared.synchronize_now.notified() => (), + _ = status_recv.changed() => (), } - let status = match synchronize_once(shared, persistant_container.clone()).await { + let status = match synchronize_once(shared).await { Err(e) => { tracing::error!( "Synchronizer for {}@{} failed: {}", - shared.manifest.id, - shared.manifest.version, + shared.seed.manifest.id, + shared.seed.manifest.version, e ); tracing::debug!("{:?}", e); @@ -107,7 +101,7 @@ pub async fn synchronizer( } Ok(status) => status, }; - tracing::trace!("{} status synchronized", shared.manifest.id); + tracing::trace!("{} status synchronized", shared.seed.manifest.id); shared.synchronized.notify_waiters(); match status { Status::Shutdown => { diff --git a/backend/src/net/cert_resolver.rs b/backend/src/net/cert_resolver.rs index 9e2e9a3d2..9a24144aa 100644 --- a/backend/src/net/cert_resolver.rs +++ b/backend/src/net/cert_resolver.rs @@ -105,7 +105,6 @@ impl ResolvesServerCert for EmbassyCertResolver { match hostname_raw { Some(hostname_str) => { - let full_fqdn = match ResourceFqdn::from_str(hostname_str) { Ok(fqdn) => fqdn, Err(_) => { diff --git a/backend/src/net/mod.rs b/backend/src/net/mod.rs index 73d59f9b0..a4e762391 100644 --- a/backend/src/net/mod.rs +++ b/backend/src/net/mod.rs @@ -7,22 +7,20 @@ use indexmap::IndexSet; use rpc_toolkit::command; use self::interface::InterfaceId; - use crate::net::interface::LanPortConfig; - use crate::util::serde::Port; use crate::Error; +pub mod cert_resolver; pub mod dns; +pub mod embassy_service_http_server; pub mod interface; #[cfg(feature = "avahi")] pub mod mdns; -pub mod embassy_service_http_server; pub mod net_controller; pub mod net_utils; pub mod proxy_controller; pub mod ssl; -pub mod cert_resolver; pub mod static_server; pub mod tor; pub mod vhost_controller; diff --git a/backend/src/net/net_controller.rs b/backend/src/net/net_controller.rs index 7e4b8cb4a..c02dc5dac 100644 --- 
a/backend/src/net/net_controller.rs +++ b/backend/src/net/net_controller.rs @@ -11,7 +11,7 @@ use torut::onion::{OnionAddressV3, TorSecretKeyV3}; use tracing::instrument; use crate::context::RpcContext; -use crate::hostname::{get_current_ip, get_embassyd_tor_addr, get_hostname, HostNameReceipt}; +use crate::hostname::{get_embassyd_tor_addr, get_hostname, HostNameReceipt}; use crate::net::dns::DnsController; use crate::net::interface::{Interface, TorConfig}; #[cfg(feature = "avahi")] @@ -115,7 +115,7 @@ impl NetController { async fn setup_embassy_http_ui_handle(rpc_ctx: RpcContext) -> Result<(), Error> { let host_name = rpc_ctx.net_controller.proxy.get_hostname().await; - + let embassy_tor_addr = get_embassyd_tor_addr(rpc_ctx.clone()).await?; let embassy_tor_fqdn: ResourceFqdn = embassy_tor_addr.parse()?; let host_name_fqdn: ResourceFqdn = host_name.parse()?; diff --git a/backend/src/net/proxy_controller.rs b/backend/src/net/proxy_controller.rs index 1da295dcc..a6855ff8f 100644 --- a/backend/src/net/proxy_controller.rs +++ b/backend/src/net/proxy_controller.rs @@ -15,7 +15,7 @@ use tokio::net::TcpStream; use tokio::sync::Mutex; use tracing::{error, info, instrument}; -use crate::net::net_utils::{host_addr_fqdn, ResourceFqdn}; +use crate::net::net_utils::ResourceFqdn; use crate::net::ssl::SslManager; use crate::net::vhost_controller::VHOSTController; use crate::net::{HttpClient, HttpHandler, InterfaceMetadata, PackageNetInfo}; diff --git a/backend/src/net/static_server.rs b/backend/src/net/static_server.rs index 06c5de980..1e0675e11 100644 --- a/backend/src/net/static_server.rs +++ b/backend/src/net/static_server.rs @@ -8,7 +8,6 @@ use digest::Digest; use futures::FutureExt; use http::response::Builder; use hyper::{Body, Method, Request, Response, StatusCode}; - use rpc_toolkit::rpc_handler; use tokio::fs::File; use tokio_util::codec::{BytesCodec, FramedRead}; diff --git a/backend/src/net/tor.rs b/backend/src/net/tor.rs index 5f5fe89ff..8c7f99e19 100644 --- 
a/backend/src/net/tor.rs +++ b/backend/src/net/tor.rs @@ -377,8 +377,7 @@ pub async fn tor_health_check(client: &Client, tor_controller: &TorController) { .await; if let Err(e) = result { let mut num_attempt = 1; - tracing::error!( - "Unable to reach self over tor, we will retry now..."); + tracing::error!("Unable to reach self over tor, we will retry now..."); tracing::error!("The first TOR error: {}", e); loop { diff --git a/backend/src/net/vhost_controller.rs b/backend/src/net/vhost_controller.rs index 5d0a0494f..56fdc61d2 100644 --- a/backend/src/net/vhost_controller.rs +++ b/backend/src/net/vhost_controller.rs @@ -3,12 +3,12 @@ use std::net::SocketAddr; use std::sync::Arc; use tokio_rustls::rustls::ServerConfig; -use crate::net::cert_resolver::EmbassyCertResolver; -use crate::net::embassy_service_http_server::{EmbassyServiceHTTPServer}; +use crate::net::cert_resolver::EmbassyCertResolver; +use crate::net::embassy_service_http_server::EmbassyServiceHTTPServer; +use crate::net::net_utils::ResourceFqdn; use crate::net::HttpHandler; use crate::Error; -use crate::net::net_utils::ResourceFqdn; pub struct VHOSTController { pub service_servers: BTreeMap, @@ -67,7 +67,6 @@ impl VHOSTController { None }; - let mut new_service_server = EmbassyServiceHTTPServer::new(self.embassyd_addr.ip(), external_svc_port, ssl_cfg) .await?; @@ -76,7 +75,7 @@ impl VHOSTController { .await?; self.service_servers .insert(external_svc_port, new_service_server); - + Ok(()) } } diff --git a/backend/src/procedure/docker.rs b/backend/src/procedure/docker.rs index 746247bfc..a7118185a 100644 --- a/backend/src/procedure/docker.rs +++ b/backend/src/procedure/docker.rs @@ -9,19 +9,15 @@ use async_stream::stream; use bollard::container::RemoveContainerOptions; use color_eyre::eyre::eyre; use color_eyre::Report; -use embassy_container_init::{InputJsonRpc, OutputJsonRpc}; use futures::future::Either as EitherFuture; -use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt}; -use 
helpers::NonDetachingJoinHandle; +use futures::TryStreamExt; +use helpers::{NonDetachingJoinHandle, RpcClient}; use nix::sys::signal; use nix::unistd::Pid; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use serde::de::DeserializeOwned; +use serde::{Deserialize, Serialize}; use serde_json::Value; -use tokio::{ - io::{AsyncBufRead, AsyncBufReadExt, BufReader}, - process::Child, - sync::mpsc::UnboundedReceiver, -}; +use tokio::io::{AsyncBufRead, AsyncBufReadExt, BufReader}; use tracing::instrument; use super::ProcedureName; @@ -70,6 +66,57 @@ pub struct DockerContainer { #[serde(default)] pub system: bool, } +impl DockerContainer { + /// We created a new exec runner, where we are going to be passing the commands for it to run. + /// Idea is that we are going to send it command and get the inputs be filtered back from the manager. + /// Then we could in theory run commands without the cost of running the docker exec which is known to have + /// a delay of > 200ms which is not acceptable. 
+ #[instrument(skip(ctx))] + pub async fn long_running_execute( + &self, + ctx: &RpcContext, + pkg_id: &PackageId, + pkg_version: &Version, + volumes: &Volumes, + ) -> Result<(LongRunning, RpcClient), Error> { + let container_name = DockerProcedure::container_name(pkg_id, None); + + let mut cmd = LongRunning::setup_long_running_docker_cmd( + self, + ctx, + &container_name, + volumes, + pkg_id, + pkg_version, + ) + .await?; + + let mut handle = cmd.spawn().with_kind(crate::ErrorKind::Docker)?; + + let client = + if let (Some(stdin), Some(stdout)) = (handle.stdin.take(), handle.stdout.take()) { + RpcClient::new(stdin, stdout) + } else { + return Err(Error::new( + eyre!("No stdin/stdout handle for container init"), + crate::ErrorKind::Incoherent, + )); + }; + + let running_output = NonDetachingJoinHandle::from(tokio::spawn(async move { + if let Err(err) = handle + .wait() + .await + .map_err(|e| eyre!("Runtime error: {e:?}")) + { + tracing::error!("{}", err); + tracing::debug!("{:?}", err); + } + })); + + Ok((LongRunning { running_output }, client)) + } +} #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(rename_all = "kebab-case")] @@ -122,23 +169,6 @@ impl DockerProcedure { shm_size_mb: container.shm_size_mb, } } - #[cfg(feature = "js_engine")] - pub fn main_docker_procedure_js( - container: &DockerContainer, - _procedure: &super::js_scripts::JsProcedure, - ) -> DockerProcedure { - DockerProcedure { - image: container.image.clone(), - system: container.system, - entrypoint: "sleep".to_string(), - args: Vec::new(), - inject: false, - mounts: container.mounts.clone(), - io_format: None, - sigterm_timeout: container.sigterm_timeout, - shm_size_mb: container.shm_size_mb, - } - } pub fn validate( &self, @@ -346,64 +376,6 @@ impl DockerProcedure { ) } - /// We created a new exec runner, where we are going to be passing the commands for it to run. - /// Idea is that we are going to send it command and get the inputs be filtered back from the manager. 
- /// Then we could in theory run commands without the cost of running the docker exec which is known to have - /// a dely of > 200ms which is not acceptable. - #[instrument(skip(ctx, input))] - pub async fn long_running_execute( - &self, - ctx: &RpcContext, - pkg_id: &PackageId, - pkg_version: &Version, - name: ProcedureName, - volumes: &Volumes, - input: S, - ) -> Result - where - S: Stream + Send + 'static, - { - let name = name.docker_name(); - let name: Option<&str> = name.as_deref(); - let container_name = Self::container_name(pkg_id, name); - - let mut cmd = LongRunning::setup_long_running_docker_cmd( - self, - ctx, - &container_name, - volumes, - pkg_id, - pkg_version, - ) - .await?; - - let mut handle = cmd.spawn().with_kind(crate::ErrorKind::Docker)?; - let input_handle = LongRunning::spawn_input_handle(&mut handle, input)? - .map_err(|e| eyre!("Input Handle Error: {e:?}")); - - let (output, output_handle) = LongRunning::spawn_output_handle(&mut handle)?; - let output_handle = output_handle.map_err(|e| eyre!("Output Handle Error: {e:?}")); - let err_handle = LongRunning::spawn_error_handle(&mut handle)? - .map_err(|e| eyre!("Err Handle Error: {e:?}")); - - let running_output = NonDetachingJoinHandle::from(tokio::spawn(async move { - if let Err(err) = tokio::select!( - x = handle.wait().map_err(|e| eyre!("Runtime error: {e:?}")) => x.map(|_| ()), - x = err_handle => x.map(|_| ()), - x = output_handle => x.map(|_| ()), - x = input_handle => x.map(|_| ()) - ) { - tracing::debug!("{:?}", err); - tracing::error!("Join error"); - } - })); - - Ok(LongRunning { - output, - running_output, - }) - } - #[instrument(skip(_ctx, input))] pub async fn inject( &self, @@ -788,13 +760,12 @@ impl RingVec { /// We wanted a long running since we want to be able to have the equivelent to the docker execute without the heavy costs of 400 + ms time lag. /// Also the long running let's us have the ability to start/ end the services quicker. 
pub struct LongRunning { - pub output: UnboundedReceiver, pub running_output: NonDetachingJoinHandle<()>, } impl LongRunning { async fn setup_long_running_docker_cmd( - docker: &DockerProcedure, + docker: &DockerContainer, ctx: &RpcContext, container_name: &str, volumes: &Volumes, @@ -865,7 +836,7 @@ impl LongRunning { cmd.arg(docker.image.for_package(pkg_id, Some(pkg_version))); } cmd.stdout(std::process::Stdio::piped()); - cmd.stderr(std::process::Stdio::piped()); + cmd.stderr(std::process::Stdio::inherit()); cmd.stdin(std::process::Stdio::piped()); Ok(cmd) } @@ -894,104 +865,6 @@ impl LongRunning { Err(e) => Err(e)?, } } - fn spawn_input_handle( - handle: &mut Child, - input: S, - ) -> Result, Error> - where - S: Stream + Send + 'static, - { - use tokio::io::AsyncWriteExt; - let mut stdin = handle - .stdin - .take() - .ok_or_else(|| eyre!("Can't takeout stdin")) - .with_kind(crate::ErrorKind::Docker)?; - let handle = NonDetachingJoinHandle::from(tokio::spawn(async move { - let input = input; - tokio::pin!(input); - while let Some(input) = input.next().await { - let input = match serde_json::to_string(&input) { - Ok(a) => a, - Err(e) => { - tracing::debug!("{:?}", e); - tracing::error!("Docker Input Serialization issue"); - continue; - } - }; - if let Err(e) = stdin.write_all(format!("{input}\n").as_bytes()).await { - tracing::debug!("{:?}", e); - tracing::error!("Docker Input issue"); - return; - } - } - })); - Ok(handle) - } - fn spawn_error_handle(handle: &mut Child) -> Result, Error> { - let id = handle.id(); - let mut output = tokio::io::BufReader::new( - handle - .stderr - .take() - .ok_or_else(|| eyre!("Can't takeout stderr")) - .with_kind(crate::ErrorKind::Docker)?, - ) - .lines(); - Ok(NonDetachingJoinHandle::from(tokio::spawn(async move { - while let Ok(Some(line)) = output.next_line().await { - tracing::debug!("{:?}", id); - tracing::error!("Error from long running container"); - tracing::error!("{}", line); - } - }))) - } - - fn spawn_output_handle( - 
handle: &mut Child, - ) -> Result<(UnboundedReceiver, NonDetachingJoinHandle<()>), Error> { - let mut output = tokio::io::BufReader::new( - handle - .stdout - .take() - .ok_or_else(|| eyre!("Can't takeout stdout for long running")) - .with_kind(crate::ErrorKind::Docker)?, - ) - .lines(); - let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::(); - Ok(( - receiver, - NonDetachingJoinHandle::from(tokio::spawn(async move { - loop { - let next = output.next_line().await; - let next = match next { - Ok(Some(a)) => a, - Ok(None) => { - tracing::error!("The docker pipe is closed?"); - break; - } - Err(e) => { - tracing::debug!("{:?}", e); - tracing::error!("Output from docker, killing"); - break; - } - }; - let next = match serde_json::from_str(&next) { - Ok(a) => a, - Err(_e) => { - tracing::trace!("Could not decode output from long running binary"); - continue; - } - }; - if let Err(e) = sender.send(next) { - tracing::debug!("{:?}", e); - tracing::error!("Could no longer send output"); - break; - } - } - })), - )) - } } async fn buf_reader_to_lines( reader: impl AsyncBufRead + Unpin, diff --git a/backend/src/procedure/js_scripts.rs b/backend/src/procedure/js_scripts.rs index b76e70d01..8d7091a67 100644 --- a/backend/src/procedure/js_scripts.rs +++ b/backend/src/procedure/js_scripts.rs @@ -2,17 +2,20 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; +use color_eyre::eyre::eyre; +use embassy_container_init::{ProcessGroupId, SignalGroup, SignalGroupParams}; +use helpers::RpcClient; pub use js_engine::JsError; use js_engine::{JsExecutionEnvironment, PathForVolumeId}; -use models::VolumeId; -use models::{ExecCommand, TermCommand}; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use models::{ErrorKind, VolumeId}; +use serde::de::DeserializeOwned; +use serde::{Deserialize, Serialize}; use tracing::instrument; use super::ProcedureName; use crate::context::RpcContext; use crate::s9pk::manifest::PackageId; -use 
crate::util::Version; +use crate::util::{GeneralGuard, Version}; use crate::volume::Volumes; use crate::Error; @@ -42,7 +45,7 @@ impl PathForVolumeId for Volumes { } } -#[derive(Clone, Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Default, Deserialize, Serialize)] #[serde(rename_all = "kebab-case")] pub struct JsProcedure { #[serde(default)] @@ -54,7 +57,7 @@ impl JsProcedure { Ok(()) } - #[instrument(skip(directory, input, exec_command, term_command))] + #[instrument(skip(directory, input, rpc_client))] pub async fn execute( &self, directory: &PathBuf, @@ -64,17 +67,32 @@ impl JsProcedure { volumes: &Volumes, input: Option, timeout: Option, - exec_command: ExecCommand, - term_command: TermCommand, + gid: ProcessGroupId, + rpc_client: Option>, ) -> Result, Error> { - Ok(async move { + let cleaner_client = rpc_client.clone(); + let cleaner = GeneralGuard::new(move || { + tokio::spawn(async move { + if let Some(client) = cleaner_client { + client + .request(SignalGroup, SignalGroupParams { gid, signal: 9 }) + .await + .map_err(|e| { + Error::new(eyre!("{}: {:?}", e.message, e.data), ErrorKind::Docker) + }) + } else { + Ok(()) + } + }) + }); + let res = async move { let running_action = JsExecutionEnvironment::load_from_package( directory, pkg_id, pkg_version, Box::new(volumes.clone()), - exec_command, - term_command, + gid, + rpc_client, ) .await? 
.run_action(name, input, self.args.clone()); @@ -88,7 +106,9 @@ impl JsProcedure { Ok(output) } .await - .map_err(|(error, message)| (error.as_code_num(), message))) + .map_err(|(error, message)| (error.as_code_num(), message)); + cleaner.drop().await.unwrap()?; + Ok(res) } #[instrument(skip(ctx, input))] @@ -108,12 +128,8 @@ impl JsProcedure { pkg_id, pkg_version, Box::new(volumes.clone()), - Arc::new(|_, _, _, _| { - Box::pin(async { Err("Can't run commands in sandox mode".to_string()) }) - }), - Arc::new(|_| { - Box::pin(async move { Err("Can't run commands in test".to_string()) }) - }), + ProcessGroupId(0), + None, ) .await? .read_only_effects() @@ -193,10 +209,8 @@ async fn js_action_execute() { &volumes, input, timeout, - Arc::new(|_, _, _, _| { - Box::pin(async move { Err("Can't run commands in test".to_string()) }) - }), - Arc::new(|_| Box::pin(async move { Err("Can't run commands in test".to_string()) })), + ProcessGroupId(0), + None, ) .await .unwrap() @@ -252,10 +266,8 @@ async fn js_action_execute_error() { &volumes, input, timeout, - Arc::new(|_, _, _, _| { - Box::pin(async move { Err("Can't run commands in test".to_string()) }) - }), - Arc::new(|_| Box::pin(async move { Err("Can't run commands in test".to_string()) })), + ProcessGroupId(0), + None, ) .await .unwrap(); @@ -300,10 +312,8 @@ async fn js_action_fetch() { &volumes, input, timeout, - Arc::new(|_, _, _, _| { - Box::pin(async move { Err("Can't run commands in test".to_string()) }) - }), - Arc::new(|_| Box::pin(async move { Err("Can't run commands in test".to_string()) })), + ProcessGroupId(0), + None, ) .await .unwrap() @@ -341,23 +351,18 @@ async fn js_test_slow() { let timeout = Some(Duration::from_secs(10)); tracing::debug!("testing start"); tokio::select! 
{ - a = js_action - .execute::( - &path, - &package_id, - &package_version, - name, - &volumes, - input, - timeout, - Arc::new(|_, _, _, _| { - Box::pin(async move { Err("Can't run commands in test".to_string()) }) - }), - Arc::new(|_| Box::pin(async move { Err("Can't run commands in test".to_string()) })), - ) - => {a - .unwrap() - .unwrap();}, + a = js_action + .execute::( + &path, + &package_id, + &package_version, + name, + &volumes, + input, + timeout, + ProcessGroupId(0), + None, + ) => { a.unwrap().unwrap(); }, _ = tokio::time::sleep(Duration::from_secs(1)) => () } tracing::debug!("testing end should"); @@ -404,10 +409,8 @@ async fn js_action_var_arg() { &volumes, input, timeout, - Arc::new(|_, _, _, _| { - Box::pin(async move { Err("Can't run commands in test".to_string()) }) - }), - Arc::new(|_| Box::pin(async move { Err("Can't run commands in test".to_string()) })), + ProcessGroupId(0), + None, ) .await .unwrap() @@ -452,10 +455,8 @@ async fn js_action_test_rename() { &volumes, input, timeout, - Arc::new(|_, _, _, _| { - Box::pin(async move { Err("Can't run commands in test".to_string()) }) - }), - Arc::new(|_| Box::pin(async move { Err("Can't run commands in test".to_string()) })), + ProcessGroupId(0), + None, ) .await .unwrap() @@ -500,10 +501,8 @@ async fn js_action_test_deep_dir() { &volumes, input, timeout, - Arc::new(|_, _, _, _| { - Box::pin(async move { Err("Can't run commands in test".to_string()) }) - }), - Arc::new(|_| Box::pin(async move { Err("Can't run commands in test".to_string()) })), + ProcessGroupId(0), + None, ) .await .unwrap() @@ -547,10 +546,8 @@ async fn js_action_test_deep_dir_escape() { &volumes, input, timeout, - Arc::new(|_, _, _, _| { - Box::pin(async move { Err("Can't run commands in test".to_string()) }) - }), - Arc::new(|_| Box::pin(async move { Err("Can't run commands in test".to_string()) })), + ProcessGroupId(0), + None, ) .await .unwrap() @@ -595,10 +592,8 @@ async fn js_rsync() { &volumes, input, timeout, - 
Arc::new(|_, _, _, _| { - Box::pin(async move { Err("Can't run commands in test".to_string()) }) - }), - Arc::new(|_| Box::pin(async move { Err("Can't run commands in test".to_string()) })), + ProcessGroupId(0), + None, ) .await .unwrap() diff --git a/backend/src/procedure/mod.rs b/backend/src/procedure/mod.rs index b8a73830c..2924c0c54 100644 --- a/backend/src/procedure/mod.rs +++ b/backend/src/procedure/mod.rs @@ -3,7 +3,8 @@ use std::time::Duration; use color_eyre::eyre::eyre; use patch_db::HasModel; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use serde::de::DeserializeOwned; +use serde::{Deserialize, Serialize}; use tracing::instrument; use self::docker::{DockerContainers, DockerProcedure}; @@ -82,7 +83,7 @@ impl PackageProcedure { } #[cfg(feature = "js_engine")] PackageProcedure::Script(procedure) => { - let exec_command = match ctx + let (gid, rpc_client) = match ctx .managers .get(&(pkg_id.clone(), pkg_version.clone())) .await @@ -93,23 +94,16 @@ impl PackageProcedure { ErrorKind::NotFound, )) } - Some(x) => x, - } - .exec_command(); - let term_command = match ctx - .managers - .get(&(pkg_id.clone(), pkg_version.clone())) - .await - { - None => { - return Err(Error::new( - eyre!("No manager found for {}", pkg_id), - ErrorKind::NotFound, - )) - } - Some(x) => x, - } - .term_command(); + Some(man) => ( + if matches!(name, ProcedureName::Main) { + man.new_main_gid() + } else { + man.new_gid() + }, + man.rpc_client(), + ), + }; + procedure .execute( &ctx.datadir, @@ -119,77 +113,14 @@ impl PackageProcedure { volumes, input, timeout, - exec_command, - term_command, + gid, + rpc_client, ) .await } } } - #[instrument(skip(ctx, input))] - pub async fn inject( - &self, - ctx: &RpcContext, - pkg_id: &PackageId, - pkg_version: &Version, - name: ProcedureName, - volumes: &Volumes, - input: Option, - timeout: Option, - ) -> Result, Error> { - match self { - PackageProcedure::Docker(procedure) => { - procedure - .inject(ctx, pkg_id, pkg_version, name, 
volumes, input, timeout) - .await - } - #[cfg(feature = "js_engine")] - PackageProcedure::Script(procedure) => { - let exec_command = match ctx - .managers - .get(&(pkg_id.clone(), pkg_version.clone())) - .await - { - None => { - return Err(Error::new( - eyre!("No manager found for {}", pkg_id), - ErrorKind::NotFound, - )) - } - Some(x) => x, - } - .exec_command(); - let term_command = match ctx - .managers - .get(&(pkg_id.clone(), pkg_version.clone())) - .await - { - None => { - return Err(Error::new( - eyre!("No manager found for {}", pkg_id), - ErrorKind::NotFound, - )) - } - Some(x) => x, - } - .term_command(); - procedure - .execute( - &ctx.datadir, - pkg_id, - pkg_version, - name, - volumes, - input, - timeout, - exec_command, - term_command, - ) - .await - } - } - } #[instrument(skip(ctx, input))] pub async fn sandboxed( &self, diff --git a/backend/src/s9pk/builder.rs b/backend/src/s9pk/builder.rs index 3a30d8efc..28052a086 100644 --- a/backend/src/s9pk/builder.rs +++ b/backend/src/s9pk/builder.rs @@ -1,4 +1,3 @@ -use nom::combinator::success; use sha2_old::{Digest, Sha512}; use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom}; use tracing::instrument; diff --git a/backend/src/s9pk/manifest.rs b/backend/src/s9pk/manifest.rs index d54700488..5f787554d 100644 --- a/backend/src/s9pk/manifest.rs +++ b/backend/src/s9pk/manifest.rs @@ -6,6 +6,7 @@ use patch_db::HasModel; use serde::{Deserialize, Serialize}; use url::Url; +use super::git_hash::GitHash; use crate::action::Actions; use crate::backup::BackupActions; use crate::config::action::ConfigActions; @@ -20,8 +21,6 @@ use crate::version::{Current, VersionT}; use crate::volume::Volumes; use crate::Error; -use super::git_hash::GitHash; - fn current_version() -> Version { Current::new().semver().into() } diff --git a/backend/src/update/mod.rs b/backend/src/update/mod.rs index 3caebb8a6..2634cedb6 100644 --- a/backend/src/update/mod.rs +++ b/backend/src/update/mod.rs @@ -24,7 +24,6 @@ use 
crate::sound::{ CIRCLE_OF_5THS_SHORT, UPDATE_FAILED_1, UPDATE_FAILED_2, UPDATE_FAILED_3, UPDATE_FAILED_4, }; use crate::update::latest_information::LatestInformation; - use crate::util::Invoke; use crate::version::{Current, VersionT}; use crate::{Error, ErrorKind, ResultExt, IS_RASPBERRY_PI}; @@ -250,7 +249,7 @@ impl EosUrl { }; Ok(format!("{host}::{version}/{arch}/") .parse() - .map_err(|e| Error::new(eyre!("Could not parse path"), ErrorKind::ParseUrl))?) + .map_err(|_| Error::new(eyre!("Could not parse path"), ErrorKind::ParseUrl))?) } } diff --git a/backend/src/version/v0_3_2.rs b/backend/src/version/v0_3_2.rs index e7aeb43db..8079d9057 100644 --- a/backend/src/version/v0_3_2.rs +++ b/backend/src/version/v0_3_2.rs @@ -2,10 +2,8 @@ use emver::VersionRange; use super::v0_3_0::V0_3_0_COMPAT; use super::*; -use crate::{ - config::util::MergeWith, - hostname::{generate_id, sync_hostname}, -}; +use crate::config::util::MergeWith; +use crate::hostname::{generate_id, sync_hostname}; const V0_3_2: emver::Version = emver::Version::new(0, 3, 2, 0); diff --git a/libs/Cargo.lock b/libs/Cargo.lock index 103b00415..e09c508a8 100644 --- a/libs/Cargo.lock +++ b/libs/Cargo.lock @@ -200,6 +200,12 @@ dependencies = [ "typenum", ] +[[package]] +name = "bitmaps" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "703642b98a00b3b90513279a8ede3fcfa479c126c5fb46e78f3051522f021403" + [[package]] name = "bitvec" version = "0.22.3" @@ -312,8 +318,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bfd4d1b31faaa3a89d7934dbded3111da0d2ef28e3ebccdb4f0179f5929d1ef1" dependencies = [ "iana-time-zone", + "js-sys", "num-integer", "num-traits", + "time", + "wasm-bindgen", "winapi", ] @@ -424,6 +433,15 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2d0165d2900ae6778e36e80bbc4da3b5eefccee9ba939761f9c2882a5d9af3ff" +[[package]] +name = "crc32fast" +version = "1.3.2" +source 
= "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" +dependencies = [ + "cfg-if", +] + [[package]] name = "crossbeam-queue" version = "0.3.6" @@ -787,6 +805,10 @@ dependencies = [ "async-stream", "color-eyre", "futures", + "helpers", + "imbl 2.0.0", + "nix 0.25.0", + "procfs", "serde", "serde_json", "tokio", @@ -795,6 +817,7 @@ dependencies = [ "tracing-error 0.2.0", "tracing-futures", "tracing-subscriber 0.3.16", + "yajrc 0.1.0 (git+https://github.com/dr-bonez/yajrc.git?branch=develop)", ] [[package]] @@ -829,6 +852,27 @@ dependencies = [ "syn", ] +[[package]] +name = "errno" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f639046355ee4f37944e44f60642c6f3a7efa3cf6b78c78a0d989a8ce6c396a1" +dependencies = [ + "errno-dragonfly", + "libc", + "winapi", +] + +[[package]] +name = "errno-dragonfly" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "event-listener" version = "2.5.3" @@ -863,6 +907,16 @@ dependencies = [ "nix 0.24.2", ] +[[package]] +name = "flate2" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f82b0f4c27ad9f8bfd1f3208d882da2b09c301bc1c828fd3a00d0216d2fbbff6" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1138,9 +1192,11 @@ dependencies = [ "models", "pin-project", "serde", + "serde_json", "tokio", "tokio-stream", "tracing", + "yajrc 0.1.0 (git+https://github.com/dr-bonez/yajrc.git?branch=develop)", ] [[package]] @@ -1322,7 +1378,7 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "543682c9082b25e63d03b5acbd65ad111fd49dd93e70843e5175db4ff81d606b" dependencies = [ - "bitmaps", + "bitmaps 2.1.0", 
"rand_core 0.6.4", "rand_xoshiro", "sized-chunks", @@ -1330,6 +1386,28 @@ dependencies = [ "version_check", ] +[[package]] +name = "imbl" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2806b69cd9f4664844027b64465eacb444c67c1db9c778e341adff0c25cdb0d" +dependencies = [ + "bitmaps 3.2.0", + "imbl-sized-chunks", + "rand_core 0.6.4", + "rand_xoshiro", + "version_check", +] + +[[package]] +name = "imbl-sized-chunks" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6957ea0b2541c5ca561d3ef4538044af79f8a05a1eb3a3b148936aaceaa1076" +dependencies = [ + "bitmaps 3.2.0", +] + [[package]] name = "indenter" version = "0.3.3" @@ -1355,6 +1433,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "io-lifetimes" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ce5ef949d49ee85593fc4d3f3f95ad61657076395cbbce23e2121fc5542074" + [[package]] name = "ipnet" version = "2.5.1" @@ -1568,6 +1652,12 @@ dependencies = [ "cc", ] +[[package]] +name = "linux-raw-sys" +version = "0.0.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4d2456c373231a208ad294c33dc5bff30051eafd954cd4caae83a712b12854d" + [[package]] name = "lock_api" version = "0.4.9" @@ -1679,7 +1769,6 @@ dependencies = [ "bollard", "color-eyre", "ed25519-dalek", - "embassy_container_init", "emver", "mbrman", "openssl", @@ -1744,6 +1833,20 @@ dependencies = [ "memoffset", ] +[[package]] +name = "nix" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e322c04a9e3440c327fca7b6c8a63e6890a32fa2ad689db972425f07e0d22abb" +dependencies = [ + "autocfg", + "bitflags", + "cfg-if", + "libc", + "memoffset", + "pin-utils", +] + [[package]] name = "nom" version = "7.1.1" @@ -1960,7 +2063,7 @@ dependencies = [ "async-trait", "fd-lock-rs", "futures", - "imbl", + "imbl 1.0.1", "json-patch", "json-ptr", "lazy_static", @@ 
-2131,6 +2234,21 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "procfs" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dfb6451c91904606a1abe93e83a8ec851f45827fa84273f256ade45dc095818" +dependencies = [ + "bitflags", + "byteorder", + "chrono", + "flate2", + "hex", + "lazy_static", + "rustix", +] + [[package]] name = "quote" version = "1.0.21" @@ -2352,7 +2470,7 @@ dependencies = [ "thiserror", "tokio", "url", - "yajrc", + "yajrc 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2407,6 +2525,20 @@ dependencies = [ "semver 1.0.14", ] +[[package]] +name = "rustix" +version = "0.35.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "727a1a6d65f786ec22df8a81ca3121107f235970dc1705ed681d3e6e8b9cd5f9" +dependencies = [ + "bitflags", + "errno", + "io-lifetimes", + "libc", + "linux-raw-sys", + "windows-sys 0.42.0", +] + [[package]] name = "rustls" version = "0.20.7" @@ -2724,7 +2856,7 @@ version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "16d69225bde7a69b235da73377861095455d298f2b970996eec25ddbb42b3d1e" dependencies = [ - "bitmaps", + "bitmaps 2.1.0", "typenum", ] @@ -3408,6 +3540,17 @@ dependencies = [ "once_cell", ] +[[package]] +name = "time" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255" +dependencies = [ + "libc", + "wasi 0.10.0+wasi-snapshot-preview1", + "winapi", +] + [[package]] name = "tinyvec" version = "1.6.0" @@ -3797,6 +3940,12 @@ version = "0.9.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" +[[package]] +name = "wasi" +version = "0.10.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -4081,6 +4230,17 @@ dependencies = [ "thiserror", ] +[[package]] +name = "yajrc" +version = "0.1.0" +source = "git+https://github.com/dr-bonez/yajrc.git?branch=develop#72a22f7ac2197d7a5cdce4be601cf20e5280eec5" +dependencies = [ + "anyhow", + "serde", + "serde_json", + "thiserror", +] + [[package]] name = "zeroize" version = "1.3.0" diff --git a/libs/embassy_container_init/Cargo.toml b/libs/embassy_container_init/Cargo.toml index 6c9d59227..084a65d93 100644 --- a/libs/embassy_container_init/Cargo.toml +++ b/libs/embassy_container_init/Cargo.toml @@ -11,17 +11,22 @@ unstable = [] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -async-stream = "0.3.*" -color-eyre = "0.6.*" -futures = "0.3.*" -serde = { version = "1.*", features = ["derive", "rc"] } -serde_json = "1.*" -tokio = { version = "1.*", features = ["full"] } +async-stream = "0.3" +color-eyre = "0.6" +futures = "0.3" +serde = { version = "1", features = ["derive", "rc"] } +serde_json = "1" +helpers = { path = "../helpers" } +imbl = "2" +nix = "0.25" +procfs = "0.14" +tokio = { version = "1", features = ["full"] } tokio-stream = { version = "0.1.11" } -tracing = "0.1.*" -tracing-error = "0.2.*" -tracing-futures = "0.2.*" -tracing-subscriber = { version = "0.3.*", features = ["env-filter"] } +tracing = "0.1" +tracing-error = "0.2" +tracing-futures = "0.2" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +yajrc = { version = "*", git = "https://github.com/dr-bonez/yajrc.git", branch = "develop" } [profile.test] opt-level = 3 diff --git a/libs/embassy_container_init/src/lib.rs b/libs/embassy_container_init/src/lib.rs index 21723f1bf..c1684620a 100644 --- a/libs/embassy_container_init/src/lib.rs +++ b/libs/embassy_container_init/src/lib.rs @@ -1,104 +1,173 @@ -use serde::{Deserialize, 
Serialize}; -use tracing::instrument; +use nix::unistd::Pid; +use serde::{Deserialize, Serialize, Serializer}; +use yajrc::RpcMethod; -/// The inputs that the executable is expecting -pub type InputJsonRpc = JsonRpc; -/// The outputs that the executable is expected to output -pub type OutputJsonRpc = JsonRpc; - -/// Based on the jsonrpc spec, but we are limiting the rpc to a subset -#[derive(Debug, Serialize, Copy, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord)] -#[serde(untagged)] -pub enum RpcId { - UInt(u32), -} /// Know what the process is called -#[derive(Debug, Serialize, Deserialize, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Debug, Serialize, Deserialize, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct ProcessId(pub u32); - -/// We use the JSON rpc as the format to share between the stdin and stdout for the executable. -/// Note: We are not allowing the id to not exist, used to ensure all pairs of messages are tracked -#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] -pub struct JsonRpc { - id: RpcId, - #[serde(flatten)] - pub version_rpc: VersionRpc, +impl From for Pid { + fn from(pid: ProcessId) -> Self { + Pid::from_raw(pid.0 as i32) + } } - -#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] -#[serde(tag = "jsonrpc", rename_all = "camelCase")] -pub enum VersionRpc { - #[serde(rename = "2.0")] - Two(T), +impl From for ProcessId { + fn from(pid: Pid) -> Self { + ProcessId(pid.as_raw() as u32) + } } - -impl JsonRpc -where - T: Serialize + for<'de> serde::Deserialize<'de> + std::fmt::Debug, -{ - /// Using this to simplify creating this nested struct. 
Used for creating input mostly for executable stdin - pub fn new(id: RpcId, body: T) -> Self { - JsonRpc { - id, - version_rpc: VersionRpc::Two(body), - } - } - /// Use this to get the data out of the probably destructed output - pub fn into_pair(self) -> (RpcId, T) { - let Self { id, version_rpc } = self; - let VersionRpc::Two(body) = version_rpc; - (id, body) - } - /// Used during the execution. - #[instrument] - pub fn maybe_serialize(&self) -> Option { - match serde_json::to_string(self) { - Ok(x) => Some(x), - Err(e) => { - tracing::warn!("Could not stringify and skipping"); - tracing::debug!("{:?}", e); - None - } - } - } - /// Used during the execution - #[instrument] - pub fn maybe_parse(s: &str) -> Option { - match serde_json::from_str::(s) { - Ok(a) => Some(a), - Err(e) => { - tracing::warn!("Could not parse and skipping: {}", s); - tracing::debug!("{:?}", e); - None - } - } +impl From for ProcessId { + fn from(pid: i32) -> Self { + ProcessId(pid as u32) } } -/// Outputs embedded in the JSONRpc output of the executable. -#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] -#[serde(tag = "method", content = "params", rename_all = "camelCase")] -pub enum Output { - /// Will be called almost right away and only once per RpcId. Indicates what - /// was spawned in the container. 
- ProcessId(ProcessId), - /// This is the line buffered output of the command - Line(String), - /// This is some kind of error with the program - Error(String), - /// Indication that the command is done - Done(Option), +#[derive(Debug, Serialize, Deserialize, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct ProcessGroupId(pub u32); + +#[derive(Debug, Serialize, Deserialize, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[serde(rename_all = "kebab-case")] +pub enum OutputStrategy { + Inherit, + Collect, } -#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] -#[serde(tag = "method", content = "params", rename_all = "camelCase")] -pub enum Input { - /// Create a new command, with the args - Command { command: String, args: Vec }, - /// Send the sigkill to the process - Kill(), - /// Send the sigterm to the process - Term(), +#[derive(Debug, Clone, Copy)] +pub struct RunCommand; +impl Serialize for RunCommand { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + Serialize::serialize(Self.as_str(), serializer) + } +} +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RunCommandParams { + pub gid: Option, + pub command: String, + pub args: Vec, + pub output: OutputStrategy, +} +impl RpcMethod for RunCommand { + type Params = RunCommandParams; + type Response = ProcessId; + fn as_str<'a>(&'a self) -> &'a str { + "command" + } +} + +#[derive(Debug, Clone, Copy)] +pub struct ReadLineStdout; +impl Serialize for ReadLineStdout { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + Serialize::serialize(Self.as_str(), serializer) + } +} +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ReadLineStdoutParams { + pub pid: ProcessId, +} +impl RpcMethod for ReadLineStdout { + type Params = ReadLineStdoutParams; + type Response = String; + fn as_str<'a>(&'a self) -> &'a str { + "read-line-stdout" + } +} + +#[derive(Debug, Clone, Copy)] +pub struct ReadLineStderr; +impl Serialize 
for ReadLineStderr { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + Serialize::serialize(Self.as_str(), serializer) + } +} +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ReadLineStderrParams { + pub pid: ProcessId, +} +impl RpcMethod for ReadLineStderr { + type Params = ReadLineStderrParams; + type Response = String; + fn as_str<'a>(&'a self) -> &'a str { + "read-line-stderr" + } +} + +#[derive(Debug, Clone, Copy)] +pub struct Output; +impl Serialize for Output { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + Serialize::serialize(Self.as_str(), serializer) + } +} +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OutputParams { + pub pid: ProcessId, +} +impl RpcMethod for Output { + type Params = OutputParams; + type Response = String; + fn as_str<'a>(&'a self) -> &'a str { + "output" + } +} + +#[derive(Debug, Clone, Copy)] +pub struct SendSignal; +impl Serialize for SendSignal { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + Serialize::serialize(Self.as_str(), serializer) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SendSignalParams { + pub pid: ProcessId, + pub signal: u32, +} +impl RpcMethod for SendSignal { + type Params = SendSignalParams; + type Response = (); + fn as_str<'a>(&'a self) -> &'a str { + "signal" + } +} + +#[derive(Debug, Clone, Copy)] +pub struct SignalGroup; +impl Serialize for SignalGroup { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + Serialize::serialize(Self.as_str(), serializer) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SignalGroupParams { + pub gid: ProcessGroupId, + pub signal: u32, +} +impl RpcMethod for SignalGroup { + type Params = SignalGroupParams; + type Response = (); + fn as_str<'a>(&'a self) -> &'a str { + "signal-group" + } } #[test] diff --git a/libs/embassy_container_init/src/main.rs b/libs/embassy_container_init/src/main.rs 
index c659a8c99..f19efe7bf 100644 --- a/libs/embassy_container_init/src/main.rs +++ b/libs/embassy_container_init/src/main.rs @@ -1,299 +1,388 @@ use std::collections::BTreeMap; +use std::ops::DerefMut; +use std::os::unix::process::ExitStatusExt; use std::process::Stdio; use std::sync::Arc; -use async_stream::stream; use embassy_container_init::{ - Input, InputJsonRpc, JsonRpc, Output, OutputJsonRpc, ProcessId, RpcId, + OutputParams, OutputStrategy, ProcessGroupId, ProcessId, ReadLineStderrParams, + ReadLineStdoutParams, RunCommandParams, SendSignalParams, SignalGroupParams, }; -use futures::{pin_mut, Stream, StreamExt}; +use futures::StreamExt; +use helpers::NonDetachingJoinHandle; +use nix::errno::Errno; +use nix::sys::signal::Signal; +use serde::{Deserialize, Serialize}; +use serde_json::json; use tokio::io::{AsyncBufReadExt, BufReader}; -use tokio::process::{Child, Command}; +use tokio::process::{Child, ChildStderr, ChildStdout, Command}; use tokio::select; -use tokio::sync::{oneshot, Mutex}; -use tracing::instrument; +use tokio::sync::{watch, Mutex}; +use yajrc::{Id, RpcError}; -const MAX_COMMANDS: usize = 10; - -enum DoneProgramStatus { - Wait(Result), - Killed, -} -/// Created from the child and rpc, to prove that the cmd was the one who died -struct DoneProgram { - id: RpcId, - status: DoneProgramStatus, +/// Outputs embedded in the JSONRpc output of the executable. 
+#[derive(Debug, Clone, Serialize)] +#[serde(untagged)] +enum Output { + Command(ProcessId), + ReadLineStdout(String), + ReadLineStderr(String), + Output(String), + Signal, + SignalGroup, } -/// Used to attach the running command with the rpc -struct ChildAndRpc { - id: RpcId, - child: Child, +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "method", content = "params", rename_all = "kebab-case")] +enum Input { + /// Run a new command, with the args + Command(RunCommandParams), + // /// Get a line of stdout from the command + // ReadLineStdout(ReadLineStdoutParams), + // /// Get a line of stderr from the command + // ReadLineStderr(ReadLineStderrParams), + /// Get output of command + Output(OutputParams), + /// Send the sigterm to the process + Signal(SendSignalParams), + /// Signal a group of processes + SignalGroup(SignalGroupParams), } -impl ChildAndRpc { - fn new(id: RpcId, mut command: tokio::process::Command) -> ::std::io::Result { - Ok(Self { - id, - child: command.spawn()?, +#[derive(Deserialize)] +struct IncomingRpc { + id: Id, + #[serde(flatten)] + input: Input, +} + +struct ChildInfo { + gid: Option, + child: Arc>>, + output: Option, +} + +struct InheritOutput { + _thread: NonDetachingJoinHandle<()>, + stdout: watch::Receiver, + stderr: watch::Receiver, +} + +#[derive(Clone)] +struct Handler { + children: Arc>>, +} +impl Handler { + fn new() -> Self { + Handler { + children: Arc::new(Mutex::new(BTreeMap::new())), + } + } + async fn handle(&self, req: Input) -> Result { + Ok(match req { + Input::Command(RunCommandParams { + gid, + command, + args, + output, + }) => Output::Command(self.command(gid, command, args, output).await?), + // Input::ReadLineStdout(ReadLineStdoutParams { pid }) => { + // Output::ReadLineStdout(self.read_line_stdout(pid).await?) + // } + // Input::ReadLineStderr(ReadLineStderrParams { pid }) => { + // Output::ReadLineStderr(self.read_line_stderr(pid).await?) 
+ // } + Input::Output(OutputParams { pid }) => Output::Output(self.output(pid).await?), + Input::Signal(SendSignalParams { pid, signal }) => { + self.signal(pid, signal).await?; + Output::Signal + } + Input::SignalGroup(SignalGroupParams { gid, signal }) => { + self.signal_group(gid, signal).await?; + Output::SignalGroup + } }) } - fn id(&self) -> Option { - self.child.id().map(ProcessId) - } - async fn wait(&mut self) -> DoneProgram { - let status = DoneProgramStatus::Wait(self.child.wait().await); - DoneProgram { - id: self.id.clone(), - status, - } - } - async fn kill(mut self) -> DoneProgram { - if let Err(err) = self.child.kill().await { - let id = &self.id; - tracing::error!("Error while trying to kill a process {id:?}"); - tracing::debug!("{err:?}"); - } - DoneProgram { - id: self.id.clone(), - status: DoneProgramStatus::Killed, - } - } -} -/// Controlls the tracing + other io events -/// Can get the inputs from stdin -/// Can start a command from an intputrpc returning stream of outputs -/// Can output to stdout -#[derive(Debug, Clone)] -struct Io { - commands: Arc>>>, - ids: Arc>>, -} - -impl Io { - fn start() -> Self { - use tracing_error::ErrorLayer; - use tracing_subscriber::prelude::*; - use tracing_subscriber::{fmt, EnvFilter}; - - let filter_layer = EnvFilter::new("embassy_container_init=debug"); - let fmt_layer = fmt::layer().with_target(true); - - tracing_subscriber::registry() - .with(filter_layer) - .with(fmt_layer) - .with(ErrorLayer::default()) - .init(); - color_eyre::install().unwrap(); - Self { - commands: Default::default(), - ids: Default::default(), - } - } - - #[instrument] - fn command(&self, input: InputJsonRpc) -> impl Stream { - let io = self.clone(); - stream! 
{ - let (id, command) = input.into_pair(); - match command { - Input::Command { - ref command, - ref args, - } => { - let mut cmd = Command::new(command); - cmd.args(args); - - cmd.stdout(Stdio::piped()); - cmd.stderr(Stdio::piped()); - let mut child_and_rpc = match ChildAndRpc::new(id.clone(), cmd) { - Err(_e) => return, - Ok(a) => a, - }; - - if let Some(child_id) = child_and_rpc.id() { - io.ids.lock().await.insert(id.clone(), child_id.clone()); - yield JsonRpc::new(id.clone(), Output::ProcessId(child_id)); - } - - let stdout = child_and_rpc.child - .stdout - .take() - .expect("child did not have a handle to stdout"); - let stderr = child_and_rpc.child - .stderr - .take() - .expect("child did not have a handle to stderr"); - - let mut buff_out = BufReader::new(stdout).lines(); - let mut buff_err = BufReader::new(stderr).lines(); - - let spawned = tokio::spawn({ - let id = id.clone(); - async move { - let end_command_receiver = io.create_end_command(id.clone()).await; - tokio::select!{ - waited = child_and_rpc - .wait() => { - io.clean_id(&waited).await; - match &waited.status { - DoneProgramStatus::Wait(Ok(st)) => return st.code(), - DoneProgramStatus::Wait(Err(err)) => tracing::debug!("Child {id:?} got error: {err:?}"), - DoneProgramStatus::Killed => tracing::debug!("Child {id:?} already killed?"), + async fn command( + &self, + gid: Option, + command: String, + args: Vec, + output: OutputStrategy, + ) -> Result { + let mut cmd = Command::new(command); + cmd.args(args); + cmd.kill_on_drop(true); + cmd.stdout(Stdio::piped()); + cmd.stderr(Stdio::piped()); + let mut child = cmd.spawn().map_err(|e| { + let mut err = yajrc::INTERNAL_ERROR.clone(); + err.data = Some(json!(e.to_string())); + err + })?; + let pid = ProcessId(child.id().ok_or_else(|| { + let mut err = yajrc::INTERNAL_ERROR.clone(); + err.data = Some(json!("Child has no pid")); + err + })?); + let output = match output { + OutputStrategy::Inherit => { + let (stdout_send, stdout) = 
watch::channel(String::new()); + let (stderr_send, stderr) = watch::channel(String::new()); + if let (Some(child_stdout), Some(child_stderr)) = + (child.stdout.take(), child.stderr.take()) + { + Some(InheritOutput { + _thread: tokio::spawn(async move { + tokio::join!( + async { + if let Err(e) = async { + let mut lines = BufReader::new(child_stdout).lines(); + while let Some(line) = lines.next_line().await? { + tracing::info!("({}): {}", pid.0, line); + let _ = stdout_send.send(line); } - - }, - _ = end_command_receiver => { - let status = child_and_rpc.kill().await; - io.clean_id(&status).await; + Ok::<_, std::io::Error>(()) + } + .await + { + tracing::error!( + "Error reading stdout of pid {}: {}", + pid.0, + e + ); + } }, - } - None - } - - }); - while let Ok(Some(line)) = buff_out.next_line().await { - let output = Output::Line(line); - let output = JsonRpc::new(id.clone(), output); - tracing::trace!("OutputJsonRpc {{ id, output_rpc }} = {:?}", output); - yield output; - } - while let Ok(Some(line)) = buff_err.next_line().await { - yield JsonRpc::new(id.clone(), Output::Error(line)); - } - let code = spawned.await.ok().flatten(); - yield JsonRpc::new(id, Output::Done(code)); - }, - Input::Kill() => { - io.trigger_end_command(id).await; + async { + if let Err(e) = async { + let mut lines = BufReader::new(child_stderr).lines(); + while let Some(line) = lines.next_line().await? 
{ + tracing::warn!("({}): {}", pid.0, line); + let _ = stderr_send.send(line); + } + Ok::<_, std::io::Error>(()) + } + .await + { + tracing::error!( + "Error reading stdout of pid {}: {}", + pid.0, + e + ); + } + } + ); + }) + .into(), + stdout, + stderr, + }) + } else { + None } - Input::Term() => { - io.term_by_rpc(&id).await; + } + OutputStrategy::Collect => None, + }; + self.children.lock().await.insert( + pid, + ChildInfo { + gid, + child: Arc::new(Mutex::new(Some(child))), + output, + }, + ); + Ok(pid) + } + + async fn output(&self, pid: ProcessId) -> Result { + let not_found = || { + let mut err = yajrc::INTERNAL_ERROR.clone(); + err.data = Some(json!(format!("Child with pid {} not found", pid.0))); + err + }; + let mut child = { + self.children + .lock() + .await + .get(&pid) + .ok_or_else(not_found)? + .child + .clone() + } + .lock_owned() + .await; + if let Some(child) = child.take() { + let output = child.wait_with_output().await?; + if output.status.success() { + Ok(String::from_utf8(output.stdout).map_err(|_| yajrc::PARSE_ERROR)?) 
+ } else { + Err(RpcError { + code: output + .status + .code() + .or_else(|| output.status.signal().map(|s| 128 + s)) + .unwrap_or(0), + message: "Command failed".into(), + data: Some(json!(String::from_utf8(if output.stderr.is_empty() { + output.stdout + } else { + output.stderr + }) + .map_err(|_| yajrc::PARSE_ERROR)?)), + }) + } + } else { + Err(not_found()) + } + } + + async fn signal(&self, pid: ProcessId, signal: u32) -> Result<(), RpcError> { + let not_found = || { + let mut err = yajrc::INTERNAL_ERROR.clone(); + err.data = Some(json!(format!("Child with pid {} not found", pid.0))); + err + }; + + Self::killall(pid, Signal::try_from(signal as i32)?)?; + + if signal == 9 { + self.children + .lock() + .await + .remove(&pid) + .ok_or_else(not_found)?; + } + Ok(()) + } + + async fn signal_group(&self, gid: ProcessGroupId, signal: u32) -> Result<(), RpcError> { + let mut to_kill = Vec::new(); + { + let mut children_ref = self.children.lock().await; + let children = std::mem::take(children_ref.deref_mut()); + for (pid, child_info) in children { + if child_info.gid == Some(gid) { + to_kill.push(pid); + } else { + children_ref.insert(pid, child_info); } } } + for pid in to_kill { + tracing::info!("Killing pid {}", pid.0); + Self::killall(pid, Signal::try_from(signal as i32)?)?; + } + + Ok(()) } - /// Used to get the string lines from the stdin - fn inputs(&self) -> impl Stream { - use std::io::BufRead; - let (sender, receiver) = tokio::sync::mpsc::channel(100); - tokio::task::spawn_blocking(move || { - let stdin = std::io::stdin(); - for line in stdin.lock().lines().flatten() { - tracing::trace!("Line = {}", line); - sender.blocking_send(line).unwrap(); + + fn killall(pid: ProcessId, signal: Signal) -> Result<(), RpcError> { + for proc in procfs::process::all_processes()? 
{ + let stat = proc?.stat()?; + if ProcessId::from(stat.ppid) == pid { + Self::killall(stat.pid.into(), signal)?; + } + } + if let Err(e) = nix::sys::signal::kill(pid.into(), Some(signal)) { + if e != Errno::ESRCH { + tracing::error!("Failed to kill pid {}: {}", pid.0, e); + } + } + Ok(()) + } + + async fn graceful_exit(self) { + let kill_all = futures::stream::iter( + std::mem::take(self.children.lock().await.deref_mut()).into_iter(), + ) + .for_each_concurrent(None, |(pid, child)| async move { + let _ = Self::killall(pid, Signal::SIGTERM); + if let Some(child) = child.child.lock().await.take() { + let _ = child.wait_with_output().await; } }); - tokio_stream::wrappers::ReceiverStream::new(receiver) - } - - ///Convert a stream of string to stdout - async fn output(&self, outputs: impl Stream) { - pin_mut!(outputs); - while let Some(output) = outputs.next().await { - println!("{}", output); - } - } - - /// Helper for the command fn - /// Part of a pair for the signal map, that indicates that we should kill the command - async fn trigger_end_command(&self, id: RpcId) { - if let Some(command) = self.commands.lock().await.remove(&id) { - if command.send(()).is_err() { - tracing::trace!("Command {id:?} could not be ended, possible error or was done"); - } - } - } - - /// Helper for the command fn - /// Part of a pair for the signal map, that indicates that we should kill the command - async fn create_end_command(&self, id: RpcId) -> oneshot::Receiver<()> { - let (send, receiver) = oneshot::channel(); - if let Some(other_command) = self.commands.lock().await.insert(id.clone(), send) { - if other_command.send(()).is_err() { - tracing::trace!( - "Found other command {id:?} could not be ended, possible error or was done" - ); - } - } - receiver - } - - /// Used during cleaning up a procress - async fn clean_id( - &self, - done_program: &DoneProgram, - ) -> (Option, Option>) { - ( - self.ids.lock().await.remove(&done_program.id), - 
self.commands.lock().await.remove(&done_program.id), - ) - } - - /// Given the rpcid, will try and term the running command - async fn term_by_rpc(&self, rpc: &RpcId) { - let output = match self.remove_cmd_id(rpc).await { - Some(id) => { - let mut cmd = tokio::process::Command::new("kill"); - cmd.arg(format!("{}", id.0)); - cmd.output().await - } - None => return, - }; - match output { - Ok(_) => (), - Err(err) => { - tracing::error!("Could not kill rpc {rpc:?}"); - tracing::debug!("{err}"); - } - } - } - - /// Used as a cleanup - async fn term_all(self) { - let ids: Vec<_> = self.ids.lock().await.keys().cloned().collect(); - for id in ids { - self.term_by_rpc(&id).await; - } - } - - async fn remove_cmd_id(&self, rpc: &RpcId) -> Option { - self.ids.lock().await.remove(rpc) + kill_all.await } } + #[tokio::main] async fn main() { - use futures::StreamExt; use tokio::signal::unix::{signal, SignalKind}; let mut sigint = signal(SignalKind::interrupt()).unwrap(); let mut sigterm = signal(SignalKind::terminate()).unwrap(); let mut sigquit = signal(SignalKind::quit()).unwrap(); let mut sighangup = signal(SignalKind::hangup()).unwrap(); - let io = Io::start(); - let outputs = io - .inputs() - .filter_map(|x| async move { InputJsonRpc::maybe_parse(&x) }) - .flat_map_unordered(MAX_COMMANDS, |x| io.command(x).boxed()) - .filter_map(|x| async move { x.maybe_serialize() }); + + use tracing_error::ErrorLayer; + use tracing_subscriber::prelude::*; + use tracing_subscriber::{fmt, EnvFilter}; + + let filter_layer = EnvFilter::new("embassy_container_init=debug"); + let fmt_layer = fmt::layer().with_target(true); + + tracing_subscriber::registry() + .with(filter_layer) + .with(fmt_layer) + .with(ErrorLayer::default()) + .init(); + color_eyre::install().unwrap(); + + let handler = Handler::new(); + let mut lines = BufReader::new(tokio::io::stdin()).lines(); + let handler_thread = async { + while let Some(line) = lines.next_line().await? 
{ + let local_hdlr = handler.clone(); + tokio::spawn(async move { + if let Err(e) = async { + eprintln!("{}", line); + let req = serde_json::from_str::(&line)?; + match local_hdlr.handle(req.input).await { + Ok(output) => { + println!( + "{}", + json!({ "id": req.id, "jsonrpc": "2.0", "result": output }) + ) + } + Err(e) => { + println!("{}", json!({ "id": req.id, "jsonrpc": "2.0", "error": e })) + } + } + Ok::<_, serde_json::Error>(()) + } + .await + { + tracing::error!("Error parsing RPC request: {}", e); + tracing::debug!("{:?}", e); + } + }); + } + Ok::<_, std::io::Error>(()) + }; select! { - _ = io.output(outputs) => { - tracing::debug!("Done with inputs/outputs") + res = handler_thread => { + match res { + Ok(()) => tracing::debug!("Done with inputs/outputs"), + Err(e) => { + tracing::error!("Error reading RPC input: {}", e); + tracing::debug!("{:?}", e); + } + } }, _ = sigint.recv() => { - tracing::debug!("Sigint") + tracing::debug!("SIGINT"); }, _ = sigterm.recv() => { - tracing::debug!("Sig Term") + tracing::debug!("SIGTERM"); }, _ = sigquit.recv() => { - tracing::debug!("Sigquit") + tracing::debug!("SIGQUIT"); }, _ = sighangup.recv() => { - tracing::debug!("Sighangup") + tracing::debug!("SIGHUP"); } } - io.term_all().await; - ::std::process::exit(0); + handler.graceful_exit().await; + ::std::process::exit(0) } diff --git a/libs/helpers/Cargo.toml b/libs/helpers/Cargo.toml index 08549c670..0aba69884 100644 --- a/libs/helpers/Cargo.toml +++ b/libs/helpers/Cargo.toml @@ -11,6 +11,8 @@ futures = "0.3.21" models = { path = "../models" } pin-project = "1.0.11" serde = { version = "1.0", features = ["derive", "rc"] } +serde_json = "1.0" tokio = { version = "1.19.2", features = ["full"] } tokio-stream = { version = "0.1.9", features = ["io-util", "sync"] } -tracing = "0.1.35" \ No newline at end of file +tracing = "0.1.35" +yajrc = { version = "*", git = "https://github.com/dr-bonez/yajrc.git", branch = "develop" } diff --git a/libs/helpers/src/lib.rs 
b/libs/helpers/src/lib.rs index 9dcd370a2..4d93c2c96 100644 --- a/libs/helpers/src/lib.rs +++ b/libs/helpers/src/lib.rs @@ -10,9 +10,11 @@ use tokio::sync::oneshot; use tokio::task::{JoinError, JoinHandle, LocalSet}; mod byte_replacement_reader; +mod rpc_client; mod rsync; mod script_dir; pub use byte_replacement_reader::*; +pub use rpc_client::RpcClient; pub use rsync::*; pub use script_dir::*; diff --git a/libs/helpers/src/rpc_client.rs b/libs/helpers/src/rpc_client.rs new file mode 100644 index 000000000..ce8a94cb5 --- /dev/null +++ b/libs/helpers/src/rpc_client.rs @@ -0,0 +1,116 @@ +use std::collections::BTreeMap; +use std::sync::atomic::AtomicUsize; +use std::sync::{Arc, Weak}; + +use models::{Error, ErrorKind, ResultExt}; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader}; +use tokio::sync::{oneshot, Mutex}; +use yajrc::{Id, RpcError, RpcMethod, RpcRequest, RpcResponse}; + +use crate::NonDetachingJoinHandle; + +type DynWrite = Box; +type ResponseMap = BTreeMap>>; + +pub struct RpcClient { + id: AtomicUsize, + _handler: NonDetachingJoinHandle<()>, + writable: Weak>, +} +impl RpcClient { + pub fn new< + W: AsyncWrite + Unpin + Send + Sync + 'static, + R: AsyncRead + Unpin + Send + Sync + 'static, + >( + writer: W, + reader: R, + ) -> Self { + let writer: DynWrite = Box::new(writer); + let writable = Arc::new(Mutex::new((writer, ResponseMap::new()))); + let weak_writable = Arc::downgrade(&writable); + RpcClient { + id: AtomicUsize::new(0), + _handler: tokio::spawn(async move { + let mut lines = BufReader::new(reader).lines(); + while let Some(line) = lines.next_line().await.transpose() { + let mut w = writable.lock().await; + match line.map_err(Error::from).and_then(|l| { + serde_json::from_str::(&l) + .with_kind(ErrorKind::Deserialization) + }) { + Ok(l) => { + if let Some(id) = l.id { + if let Some(res) = w.1.remove(&id) { + if let Err(e) = res.send(l.result) 
{ + tracing::warn!( + "RpcClient Response for Unknown ID: {:?}", + e + ); + } + } else { + tracing::warn!( + "RpcClient Response for Unknown ID: {:?}", + l.result + ); + } + } else { + tracing::info!("RpcClient Notification: {:?}", l); + } + } + Err(e) => { + tracing::error!("RpcClient Error: {}", e); + tracing::debug!("{:?}", e); + } + } + } + }) + .into(), + writable: weak_writable, + } + } + + pub async fn request( + &self, + method: T, + params: T::Params, + ) -> Result + where + T: Serialize, + T::Params: Serialize, + T::Response: for<'de> Deserialize<'de>, + { + if let Some(w) = self.writable.upgrade() { + let mut w = w.lock().await; + let id = Id::Number( + self.id + .fetch_add(1, std::sync::atomic::Ordering::SeqCst) + .into(), + ); + w.0.write_all( + (serde_json::to_string(&RpcRequest { + id: Some(id.clone()), + method, + params, + })? + "\n") + .as_bytes(), + ) + .await + .map_err(|e| { + let mut err = yajrc::INTERNAL_ERROR.clone(); + err.data = Some(json!(e.to_string())); + err + })?; + let (send, recv) = oneshot::channel(); + w.1.insert(id, send); + drop(w); + if let Ok(val) = recv.await { + return Ok(serde_json::from_value(val?)?); + } + } + let mut err = yajrc::INTERNAL_ERROR.clone(); + err.data = Some(json!("RpcClient thread has terminated")); + Err(err) + } +} diff --git a/libs/helpers/src/rsync.rs b/libs/helpers/src/rsync.rs index e291c8754..70e2b7daf 100644 --- a/libs/helpers/src/rsync.rs +++ b/libs/helpers/src/rsync.rs @@ -1,13 +1,14 @@ -use color_eyre::eyre::eyre; use std::path::Path; -use crate::{const_true, ByteReplacementReader, NonDetachingJoinHandle}; +use color_eyre::eyre::eyre; use models::{Error, ErrorKind}; use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader}; use tokio::process::{Child, Command}; use tokio::sync::watch; use tokio_stream::wrappers::WatchStream; +use crate::{const_true, ByteReplacementReader, NonDetachingJoinHandle}; + #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "camelCase")] 
pub struct RsyncOptions { diff --git a/libs/js_engine/src/artifacts/loadModule.js b/libs/js_engine/src/artifacts/loadModule.js index 67359761d..f77371fe9 100644 --- a/libs/js_engine/src/artifacts/loadModule.js +++ b/libs/js_engine/src/artifacts/loadModule.js @@ -43,27 +43,31 @@ const readFile = ( const runDaemon = ( { command = requireParam("command"), args = [] } = requireParam("options"), ) => { - let id = Deno.core.opAsync("start_command", command, args); - let rpcId = id.then(x => x.rpcId) + let id = Deno.core.opAsync("start_command", command, args, "inherit", null); let processId = id.then(x => x.processId) let waitPromise = null; return { processId, - rpcId, async wait() { - waitPromise = waitPromise || Deno.core.opAsync("wait_command", await rpcId) + waitPromise = waitPromise || Deno.core.opAsync("wait_command", await processId) return waitPromise }, - async term() { - return Deno.core.opAsync("term_command", await rpcId) + async term(signal = 15) { + return Deno.core.opAsync("send_signal", await processId, 15) } } }; const runCommand = async ( { command = requireParam("command"), args = [], timeoutMillis = 30000 } = requireParam("options"), ) => { - let id = Deno.core.opAsync("start_command", command, args, timeoutMillis); - return Deno.core.opAsync("wait_command", await id) + let id = Deno.core.opAsync("start_command", command, args, "collect", timeoutMillis); + let pid = id.then(x => x.processId) + return Deno.core.opAsync("wait_command", await pid) +}; +const signalGroup = async ( + { gid = requireParam("gid"), signal = requireParam("signal") } = requireParam("gid and signal") +) => { + return Deno.core.opAsync("signal_group", gid, signal); }; const sleep = (timeMs = requireParam("timeMs"), ) => Deno.core.opAsync("sleep", timeMs); @@ -181,10 +185,17 @@ const effects = { runCommand, sleep, runDaemon, + signalGroup, runRsync }; -const runFunction = jsonPointerValue(mainModule, currentFunction); +const defaults = { + "handleSignal": (effects, { gid, signal 
}) => { + return effects.signalGroup({ gid, signal }) + } +} + +const runFunction = jsonPointerValue(mainModule, currentFunction) || jsonPointerValue(defaults, currentFunction); (async () => { if (typeof runFunction !== "function") { error(`Expecting ${currentFunction} to be a function`); diff --git a/libs/js_engine/src/lib.rs b/libs/js_engine/src/lib.rs index fa906da5f..0bdcc4be7 100644 --- a/libs/js_engine/src/lib.rs +++ b/libs/js_engine/src/lib.rs @@ -1,5 +1,4 @@ use std::collections::BTreeMap; -use std::future::Future; use std::path::{Path, PathBuf}; use std::pin::Pin; use std::sync::Arc; @@ -11,9 +10,9 @@ use deno_core::{ resolve_import, Extension, JsRuntime, ModuleLoader, ModuleSource, ModuleSourceFuture, ModuleSpecifier, ModuleType, OpDecl, RuntimeOptions, Snapshot, }; -use embassy_container_init::RpcId; -use helpers::{script_dir, spawn_local, Rsync}; -use models::{ExecCommand, PackageId, ProcedureName, TermCommand, Version, VolumeId}; +use embassy_container_init::ProcessGroupId; +use helpers::{script_dir, spawn_local, RpcClient, Rsync}; +use models::{PackageId, ProcedureName, Version, VolumeId}; use serde::{Deserialize, Serialize}; use serde_json::Value; use tokio::io::AsyncReadExt; @@ -84,7 +83,6 @@ const SNAPSHOT_BYTES: &[u8] = include_bytes!("./artifacts/JS_SNAPSHOT.bin"); #[cfg(target_arch = "aarch64")] const SNAPSHOT_BYTES: &[u8] = include_bytes!("./artifacts/ARM_JS_SNAPSHOT.bin"); -type WaitFns = Arc>>>>>; #[derive(Clone)] struct JsContext { @@ -96,9 +94,8 @@ struct JsContext { volumes: Arc, input: Value, variable_args: Vec, - command_inserter: ExecCommand, - term_command: TermCommand, - wait_fns: WaitFns, + container_process_gid: ProcessGroupId, + container_rpc_client: Option>, rsyncs: Arc)>>, } #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] @@ -186,8 +183,8 @@ pub struct JsExecutionEnvironment { package_id: PackageId, version: Version, volumes: Arc, - command_inserter: ExecCommand, - term_command: TermCommand, + container_process_gid: 
ProcessGroupId, + container_rpc_client: Option>, } impl JsExecutionEnvironment { @@ -196,8 +193,8 @@ impl JsExecutionEnvironment { package_id: &PackageId, version: &Version, volumes: Box, - command_inserter: ExecCommand, - term_command: TermCommand, + container_process_gid: ProcessGroupId, + container_rpc_client: Option>, ) -> Result { let data_dir = data_directory.as_ref(); let base_directory = data_dir; @@ -231,8 +228,8 @@ impl JsExecutionEnvironment { version: version.clone(), volumes: volumes.into(), sandboxed: false, - command_inserter, - term_command, + container_process_gid, + container_rpc_client, }) } pub fn read_only_effects(mut self) -> Self { @@ -297,7 +294,8 @@ impl JsExecutionEnvironment { fns::start_command::decl(), fns::wait_command::decl(), fns::sleep::decl(), - fns::term_command::decl(), + fns::send_signal::decl(), + fns::signal_group::decl(), fns::rsync::decl(), fns::rsync_wait::decl(), fns::rsync_progress::decl(), @@ -330,9 +328,8 @@ impl JsExecutionEnvironment { sandboxed: self.sandboxed, input, variable_args, - command_inserter: self.command_inserter.clone(), - term_command: self.term_command.clone(), - wait_fns: Default::default(), + container_process_gid: self.container_process_gid, + container_rpc_client: self.container_rpc_client.clone(), rsyncs: Default::default(), }; let ext = Extension::builder() @@ -378,21 +375,25 @@ impl JsExecutionEnvironment { /// Note: Make sure that we have the assumption that all these methods are callable at any time, and all call restrictions should be in rust mod fns { + use std::cell::RefCell; use std::collections::BTreeMap; use std::convert::TryFrom; use std::os::unix::fs::MetadataExt; use std::path::{Path, PathBuf}; use std::rc::Rc; - use std::{cell::RefCell, time::Duration}; + use std::time::Duration; use deno_core::anyhow::{anyhow, bail}; use deno_core::error::AnyError; use deno_core::*; - use embassy_container_init::{ProcessId, RpcId}; + use embassy_container_init::{ + OutputParams, OutputStrategy, 
ProcessGroupId, ProcessId, RunCommand, RunCommandParams, + SendSignal, SendSignalParams, SignalGroup, SignalGroupParams, + }; use helpers::{to_tmp_path, AtomicFile, Rsync, RsyncOptions}; - use models::{TermCommand, VolumeId}; + use models::VolumeId; use serde::{Deserialize, Serialize}; - use serde_json::Value; + use serde_json::{json, Value}; use tokio::io::AsyncWriteExt; use super::{AnswerState, JsContext}; @@ -930,22 +931,64 @@ mod fns { } #[op] - async fn term_command(state: Rc>, id: u32) -> Result<(), AnyError> { - let term_command_impl: TermCommand = { + async fn send_signal( + state: Rc>, + pid: u32, + signal: u32, + ) -> Result<(), AnyError> { + if let Some(rpc_client) = { let state = state.borrow(); let ctx = state.borrow::(); - ctx.term_command.clone() - }; - if let Err(err) = term_command_impl(embassy_container_init::RpcId::UInt(id)).await { - bail!("{}", err); + ctx.container_rpc_client.clone() + } { + rpc_client + .request( + SendSignal, + SendSignalParams { + pid: ProcessId(pid), + signal, + }, + ) + .await + .map_err(|e| anyhow!("{}: {:?}", e.message, e.data))?; + + Ok(()) + } else { + Err(anyhow!("No RpcClient for command operations")) + } + } + + #[op] + async fn signal_group( + state: Rc>, + gid: u32, + signal: u32, + ) -> Result<(), AnyError> { + if let Some(rpc_client) = { + let state = state.borrow(); + let ctx = state.borrow::(); + ctx.container_rpc_client.clone() + } { + rpc_client + .request( + SignalGroup, + SignalGroupParams { + gid: ProcessGroupId(gid), + signal, + }, + ) + .await + .map_err(|e| anyhow!("{}: {:?}", e.message, e.data))?; + + Ok(()) + } else { + Err(anyhow!("No RpcClient for command operations")) } - Ok(()) } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct StartCommand { - rpc_id: RpcId, process_id: ProcessId, } @@ -954,91 +997,77 @@ mod fns { state: Rc>, command: String, args: Vec, + output: OutputStrategy, timeout: Option, ) -> Result { - use embassy_container_init::Output; - 
let (command_inserter, wait_fns) = { + if let (gid, Some(rpc_client)) = { let state = state.borrow(); let ctx = state.borrow::(); - (ctx.command_inserter.clone(), ctx.wait_fns.clone()) - }; + (ctx.container_process_gid, ctx.container_rpc_client.clone()) + } { + let pid = rpc_client + .request( + RunCommand, + RunCommandParams { + gid: Some(gid), + command, + args, + output, + }, + ) + .await + .map_err(|e| anyhow!("{}: {:?}", e.message, e.data))?; - let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::(); - let rpc_id = match command_inserter( - command, - args.into_iter().collect(), - sender, - timeout.map(std::time::Duration::from_millis), - ) - .await - { - Err(err) => bail!(err), - Ok(rpc_id) => rpc_id, - }; - - let (process_id_send, process_id_recv) = tokio::sync::oneshot::channel::(); - - let wait = async move { - let mut answer = String::new(); - let mut command_error = String::new(); - let mut status: Option = None; - let mut process_id_send = Some(process_id_send); - while let Some(output) = receiver.recv().await { - match output { - Output::ProcessId(process_id) => { - if let Some(process_id_send) = process_id_send.take() { - if let Err(err) = process_id_send.send(process_id) { - tracing::error!( - "Could not get a process id {process_id:?} sent for {rpc_id:?}" - ); - tracing::debug!("{err:?}"); - } - } + if let Some(timeout) = timeout { + tokio::spawn(async move { + tokio::time::sleep(Duration::from_micros(timeout)).await; + if let Err(err) = rpc_client + .request(SendSignal, SendSignalParams { pid, signal: 9 }) + .await + .map_err(|e| anyhow!("{}: {:?}", e.message, e.data)) + { + tracing::warn!("Could not kill process {pid:?}"); + tracing::debug!("{err:?}"); } - Output::Line(value) => { - answer.push_str(&value); - answer.push('\n'); - } - Output::Error(error) => { - command_error.push_str(&error); - command_error.push('\n'); - } - Output::Done(error_code) => { - status = error_code; - break; - } - } - } - if !command_error.is_empty() { - 
if let Some(status) = status { - return ResultType::ErrorCode(status, command_error); - } - - return ResultType::Error(command_error); + }); } - ResultType::Result(serde_json::Value::String(answer)) - }; - wait_fns.lock().await.insert(rpc_id, Box::pin(wait)); - let process_id = process_id_recv.await?; - Ok(StartCommand { rpc_id, process_id }) + Ok(StartCommand { process_id: pid }) + } else { + Err(anyhow!("No RpcClient for command operations")) + } } #[op] - async fn wait_command(state: Rc>, id: RpcId) -> Result { - let wait_fns = { + async fn wait_command( + state: Rc>, + pid: ProcessId, + ) -> Result { + if let Some(rpc_client) = { let state = state.borrow(); let ctx = state.borrow::(); - ctx.wait_fns.clone() - }; - - let found_future = match wait_fns.lock().await.remove(&id) { - Some(a) => a, - None => bail!("No future for id {id:?}, could have been removed already"), - }; - - Ok(found_future.await) + ctx.container_rpc_client.clone() + } { + Ok( + match rpc_client + .request(embassy_container_init::Output, OutputParams { pid }) + .await + { + Ok(a) => ResultType::Result(json!(a)), + Err(e) => ResultType::ErrorCode( + e.code, + match e.data { + Some(Value::String(s)) => s, + e => format!("{:?}", e), + }, + ), + }, + ) + } else { + Err(anyhow!("No RpcClient for command operations")) + } } + #[op] async fn sleep(time_ms: u64) -> Result<(), AnyError> { tokio::time::sleep(Duration::from_millis(time_ms)).await; diff --git a/libs/models/Cargo.toml b/libs/models/Cargo.toml index 1f93b7c13..25b05247b 100644 --- a/libs/models/Cargo.toml +++ b/libs/models/Cargo.toml @@ -10,7 +10,6 @@ bollard = "0.13.0" color-eyre = "0.6.1" ed25519-dalek = { version = "1.0.1", features = ["serde"] } mbrman = "0.5.0" -embassy_container_init = { path = "../embassy_container_init" } emver = { version = "0.1", git = "https://github.com/Start9Labs/emver-rs.git", features = [ "serde", ] } diff --git a/libs/models/src/errors.rs b/libs/models/src/errors.rs index 31ad54156..a6fedd547 100644 --- 
a/libs/models/src/errors.rs +++ b/libs/models/src/errors.rs @@ -1,9 +1,11 @@ use std::fmt::Display; -use crate::InvalidId; use color_eyre::eyre::eyre; use patch_db::Revision; -use rpc_toolkit::{hyper::http::uri::InvalidUri, yajrc::RpcError}; +use rpc_toolkit::hyper::http::uri::InvalidUri; +use rpc_toolkit::yajrc::RpcError; + +use crate::InvalidId; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum ErrorKind { diff --git a/libs/models/src/lib.rs b/libs/models/src/lib.rs index 28bf659ae..1cb9dca9c 100644 --- a/libs/models/src/lib.rs +++ b/libs/models/src/lib.rs @@ -7,7 +7,6 @@ mod interface_id; mod invalid_id; mod package_id; mod procedure_name; -mod type_aliases; mod version; mod volume_id; @@ -20,6 +19,5 @@ pub use interface_id::*; pub use invalid_id::*; pub use package_id::*; pub use procedure_name::*; -pub use type_aliases::*; pub use version::*; pub use volume_id::*; diff --git a/libs/models/src/procedure_name.rs b/libs/models/src/procedure_name.rs index 1e1d242b1..ae71e3ad5 100644 --- a/libs/models/src/procedure_name.rs +++ b/libs/models/src/procedure_name.rs @@ -14,6 +14,7 @@ pub enum ProcedureName { AutoConfig(PackageId), Health(HealthCheckId), Action(ActionId), + Signal, } impl ProcedureName { @@ -31,6 +32,7 @@ impl ProcedureName { ProcedureName::Action(id) => Some(format!("{}Action", id)), ProcedureName::Check(_) => None, ProcedureName::AutoConfig(_) => None, + ProcedureName::Signal => None, } } pub fn js_function_name(&self) -> Option { @@ -47,6 +49,7 @@ impl ProcedureName { ProcedureName::Action(id) => Some(format!("/action/{}", id)), ProcedureName::Check(id) => Some(format!("/dependencies/{}/check", id)), ProcedureName::AutoConfig(id) => Some(format!("/dependencies/{}/autoConfigure", id)), + ProcedureName::Signal => Some("/handleSignal".to_string()), } } } diff --git a/libs/models/src/type_aliases.rs b/libs/models/src/type_aliases.rs deleted file mode 100644 index 0fdb79093..000000000 --- a/libs/models/src/type_aliases.rs +++ /dev/null @@ -1,25 +0,0 
@@ -use std::{future::Future, pin::Pin, sync::Arc, time::Duration}; - -use embassy_container_init::RpcId; -use tokio::sync::mpsc::UnboundedSender; - -/// Used by the js-executor, it is the ability to just create a command in an already running exec -pub type ExecCommand = Arc< - dyn Fn( - String, - Vec, - UnboundedSender, - Option, - ) -> Pin> + 'static>> - + Send - + Sync - + 'static, ->; - -/// Used by the js-executor, it is the ability to just create a command in an already running exec -pub type TermCommand = Arc< - dyn Fn(RpcId) -> Pin> + 'static>> - + Send - + Sync - + 'static, ->; diff --git a/system-images/compat/Cargo.lock b/system-images/compat/Cargo.lock index 57d52d764..9f589b0d4 100644 --- a/system-images/compat/Cargo.lock +++ b/system-images/compat/Cargo.lock @@ -1052,6 +1052,7 @@ dependencies = [ "async-stream", "color-eyre", "futures", + "imbl 2.0.0", "serde", "serde_json", "tokio",