diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 9cd9c90c7..09f76e872 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -119,6 +119,17 @@ dependencies = [ "syn 1.0.103", ] +[[package]] +name = "async-channel" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf46fee83e5ccffc220104713af3292ff9bc7c64c7de289f66dae8e38d826833" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + [[package]] name = "async-stream" version = "0.3.3" @@ -596,6 +607,15 @@ dependencies = [ "tracing-error 0.2.0", ] +[[package]] +name = "concurrent-queue" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd7bef69dc86e3c610e4e7aed41035e2a7ed12e72dd7530f61327a6579a4390b" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "const-oid" version = "0.9.0" @@ -1770,6 +1790,7 @@ version = "0.1.0" dependencies = [ "color-eyre", "futures", + "lazy_async_pool", "models", "pin-project", "serde", @@ -2260,6 +2281,16 @@ dependencies = [ "regex", ] +[[package]] +name = "lazy_async_pool" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06cf485d4867e0714e35c1652e736bcf892d28fceecca01036764575db64ba84" +dependencies = [ + "async-channel", + "futures", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -4801,9 +4832,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.21.2" +version = "1.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9e03c497dc955702ba729190dc4aac6f2a0ce97f913e5b1b5912fc5039d9099" +checksum = "eab6d665857cc6ca78d6e80303a02cea7a7851e85dfbd77cbdc09bd129f1ef46" dependencies = [ "autocfg", "bytes", @@ -4816,7 +4847,7 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", - "winapi", + "windows-sys 0.42.0", ] [[package]] diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 8035b075f..359ddb0a0 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -136,8 +136,8 @@ sqlx = { version = "0.6.0", features = [ stderrlog = "0.5.3" tar = "0.4.38" thiserror = "1.0.31" -tokio = { version = "1.21.2", features = ["full"] } -tokio-stream = { version = "0.1.9", features = ["io-util", "sync"] } +tokio = { version = "1.23", features = ["full"] } +tokio-stream = { version = "0.1.11", features = ["io-util", "sync", "net"] } tokio-tar = { git = "https://github.com/dr-bonez/tokio-tar.git" } tokio-tungstenite = { version = "0.17.1", features = ["native-tls"] } tokio-rustls = "0.23.4" diff --git a/backend/src/backup/restore.rs b/backend/src/backup/restore.rs index 6fe2ebbfc..df03a4dd3 100644 --- a/backend/src/backup/restore.rs +++ b/backend/src/backup/restore.rs @@ -1,18 +1,17 @@ +use std::collections::BTreeMap; use std::path::Path; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; -use std::{collections::BTreeMap, pin::Pin}; use clap::ArgMatches; use color_eyre::eyre::eyre; -use futures::{future::BoxFuture, stream, Future}; +use futures::{future::BoxFuture, stream}; use futures::{FutureExt, StreamExt}; use openssl::x509::X509; use patch_db::{DbHandle, PatchDbHandle}; use rpc_toolkit::command; use tokio::fs::File; -use tokio::task::JoinHandle; use torut::onion::OnionAddressV3; use tracing::instrument; diff --git a/backend/src/manager/mod.rs b/backend/src/manager/mod.rs index 70fcf818f..ca749f9c1 100644 --- a/backend/src/manager/mod.rs +++ b/backend/src/manager/mod.rs @@ -9,7 +9,7 @@ use std::time::Duration; use bollard::container::{KillContainerOptions, StopContainerOptions}; use color_eyre::eyre::eyre; use embassy_container_init::{ProcessGroupId, SignalGroupParams}; -use helpers::RpcClient; +use helpers::UnixRpcClient; use nix::sys::signal::Signal; use patch_db::DbHandle; use sqlx::{Executor, Postgres}; @@ -359,7 +359,7 @@ impl Manager { gid } - pub fn rpc_client(&self) -> Option> { + pub fn rpc_client(&self) -> Option> { self.shared .persistent_container .as_ref() @@ -449,7 +449,7 @@ async fn manager_thread_loop(mut recv: Receiver, thread_shared: &Arc, - rpc_client: Receiver>, + rpc_client: Receiver>, } impl PersistentContainer { @@ -471,12 +471,12 @@ impl PersistentContainer { async fn spawn_persistent_container( seed: Arc, container: DockerContainer, -) -> Result<(NonDetachingJoinHandle<()>, Receiver>), Error> { +) -> Result<(NonDetachingJoinHandle<()>, Receiver>), Error> { let (send_inserter, inserter) = oneshot::channel(); Ok(( tokio::task::spawn(async move { - let mut inserter_send: Option>> = None; - let mut send_inserter: Option>>> = Some(send_inserter); + let mut inserter_send: Option>> = None; + let mut send_inserter: Option>>> = Some(send_inserter); loop { if let Err(e) = async { let interfaces = main_interfaces(&*seed)?; @@ -518,6 +518,7 @@ async fn spawn_persistent_container( } else { break; } + tokio::time::sleep(Duration::from_millis(200)).await; } }) .into(), @@ -528,7 +529,7 @@ async fn spawn_persistent_container( async fn long_running_docker( seed: &ManagerSeed, container: &DockerContainer, -) -> Result<(LongRunning, RpcClient), Error> { +) -> Result<(LongRunning, UnixRpcClient), Error> { container .long_running_execute( &seed.ctx, diff --git a/backend/src/procedure/docker.rs b/backend/src/procedure/docker.rs index a7118185a..01fc61518 100644 --- a/backend/src/procedure/docker.rs +++ b/backend/src/procedure/docker.rs @@ -2,7 +2,7 @@ use std::borrow::Cow; use std::collections::{BTreeMap, BTreeSet, VecDeque}; use std::ffi::{OsStr, OsString}; use std::net::Ipv4Addr; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::time::Duration; use async_stream::stream; @@ -11,13 +11,16 @@ use color_eyre::eyre::eyre; use color_eyre::Report; use futures::future::Either as EitherFuture; use futures::TryStreamExt; -use helpers::{NonDetachingJoinHandle, RpcClient}; +use helpers::{NonDetachingJoinHandle, UnixRpcClient}; use nix::sys::signal; use nix::unistd::Pid; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use serde_json::Value; -use tokio::io::{AsyncBufRead, AsyncBufReadExt, BufReader}; +use tokio::{ + io::{AsyncBufRead, AsyncBufReadExt, BufReader}, + time::timeout, +}; use tracing::instrument; use super::ProcedureName; @@ -66,6 +69,7 @@ pub struct DockerContainer { #[serde(default)] pub system: bool, } + impl DockerContainer { /// We created a new exec runner, where we are going to be passing the commands for it to run. /// Idea is that we are going to send it command and get the inputs be filtered back from the manager. @@ -78,9 +82,16 @@ impl DockerContainer { pkg_id: &PackageId, pkg_version: &Version, volumes: &Volumes, - ) -> Result<(LongRunning, RpcClient), Error> { + ) -> Result<(LongRunning, UnixRpcClient), Error> { let container_name = DockerProcedure::container_name(pkg_id, None); + let socket_path = + Path::new("/tmp/embassy/containers").join(format!("{pkg_id}_{pkg_version}")); + if tokio::fs::metadata(&socket_path).await.is_ok() { + tokio::fs::remove_dir_all(&socket_path).await?; + } + tokio::fs::create_dir_all(&socket_path).await?; + let mut cmd = LongRunning::setup_long_running_docker_cmd( self, ctx, @@ -88,20 +99,13 @@ impl DockerContainer { volumes, pkg_id, pkg_version, + &socket_path, ) .await?; let mut handle = cmd.spawn().with_kind(crate::ErrorKind::Docker)?; - let client = - if let (Some(stdin), Some(stdout)) = (handle.stdin.take(), handle.stdout.take()) { - RpcClient::new(stdin, stdout) - } else { - return Err(Error::new( - eyre!("No stdin/stdout handle for container init"), - crate::ErrorKind::Incoherent, - )); - }; + let client = UnixRpcClient::new(socket_path.join("rpc.sock")); let running_output = NonDetachingJoinHandle::from(tokio::spawn(async move { if let Err(err) = handle @@ -114,6 +118,19 @@ impl DockerContainer { } })); + { + let socket = socket_path.join("rpc.sock"); + if let Err(_err) = timeout(Duration::from_secs(1), async move { + while tokio::fs::metadata(&socket).await.is_err() { + tokio::time::sleep(Duration::from_millis(10)).await; + } + }) + .await + { + tracing::error!("Timed out waiting for init to create socket"); + } + } + Ok((LongRunning { running_output }, client)) } } @@ -771,9 +788,10 @@ impl LongRunning { volumes: &Volumes, pkg_id: &PackageId, pkg_version: &Version, + socket_path: &Path, ) -> Result { - const INIT_EXEC: &str = "/start9/embassy_container_init"; - const BIND_LOCATION: &str = "/usr/lib/embassy/container"; + const INIT_EXEC: &str = "/start9/bin/embassy_container_init"; + const BIND_LOCATION: &str = "/usr/lib/embassy/container/"; tracing::trace!("setup_long_running_docker_cmd"); LongRunning::cleanup_previous_container(ctx, container_name).await?; @@ -799,7 +817,14 @@ impl LongRunning { .arg("--network=start9") .arg(format!("--add-host=embassy:{}", Ipv4Addr::from(HOST_IP))) .arg("--mount") - .arg(format!("type=bind,src={BIND_LOCATION},dst=/start9")) + .arg(format!( + "type=bind,src={BIND_LOCATION},dst=/start9/bin/,readonly" + )) + .arg("--mount") + .arg(format!( + "type=bind,src={input},dst=/start9/sockets/", + input = socket_path.display() + )) .arg("--name") .arg(&container_name) .arg(format!("--hostname={}", &container_name)) diff --git a/backend/src/procedure/js_scripts.rs b/backend/src/procedure/js_scripts.rs index 8d7091a67..96629fbe1 100644 --- a/backend/src/procedure/js_scripts.rs +++ b/backend/src/procedure/js_scripts.rs @@ -4,7 +4,7 @@ use std::time::Duration; use color_eyre::eyre::eyre; use embassy_container_init::{ProcessGroupId, SignalGroup, SignalGroupParams}; -use helpers::RpcClient; +use helpers::UnixRpcClient; pub use js_engine::JsError; use js_engine::{JsExecutionEnvironment, PathForVolumeId}; use models::{ErrorKind, VolumeId}; @@ -68,7 +68,7 @@ impl JsProcedure { input: Option, timeout: Option, gid: ProcessGroupId, - rpc_client: Option>, + rpc_client: Option>, ) -> Result, Error> { let cleaner_client = rpc_client.clone(); let cleaner = GeneralGuard::new(move || { @@ -96,7 +96,7 @@ impl JsProcedure { ) .await? .run_action(name, input, self.args.clone()); - let output: ErrorValue = match timeout { + let output: Option = match timeout { Some(timeout_duration) => tokio::time::timeout(timeout_duration, running_action) .await .map_err(|_| (JsError::Timeout, "Timed out. Retrying soon...".to_owned()))??, @@ -134,7 +134,7 @@ impl JsProcedure { .await? .read_only_effects() .run_action(name, input, self.args.clone()); - let output: ErrorValue = match timeout { + let output: Option = match timeout { Some(timeout_duration) => tokio::time::timeout(timeout_duration, running_action) .await .map_err(|_| (JsError::Timeout, "Timed out. Retrying soon...".to_owned()))??, @@ -149,8 +149,9 @@ impl JsProcedure { } fn unwrap_known_error( - error_value: ErrorValue, + error_value: Option, ) -> Result { + let error_value = error_value.unwrap_or_else(|| ErrorValue::Result(serde_json::Value::Null)); match error_value { ErrorValue::Error(error) => Err((JsError::Javascript, error)), ErrorValue::ErrorCode((code, message)) => Err((JsError::Code(code), message)), diff --git a/backend/src/s9pk/reader.rs b/backend/src/s9pk/reader.rs index fe6bb7b42..eefc4ae63 100644 --- a/backend/src/s9pk/reader.rs +++ b/backend/src/s9pk/reader.rs @@ -231,16 +231,6 @@ impl S9pkReader { &validated_image_ids, )?; - #[cfg(feature = "js_engine")] - if man.containers.is_some() - || matches!(man.main, crate::procedure::PackageProcedure::Script(_)) - { - return Err(Error::new( - eyre!("Right now we don't support the containers and the long running main"), - crate::ErrorKind::ValidateS9pk, - )); - } - if man.containers.is_some() && matches!(man.main, crate::procedure::PackageProcedure::Docker(_)) { diff --git a/libs/Cargo.lock b/libs/Cargo.lock index e09c508a8..393b19c55 100644 --- a/libs/Cargo.lock +++ b/libs/Cargo.lock @@ -76,6 +76,17 @@ dependencies = [ "syn", ] +[[package]] +name = "async-channel" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf46fee83e5ccffc220104713af3292ff9bc7c64c7de289f66dae8e38d826833" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + [[package]] name = "async-stream" version = "0.3.3" @@ -387,6 +398,15 @@ dependencies = [ "tracing-error 0.2.0", ] +[[package]] +name = "concurrent-queue" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd7bef69dc86e3c610e4e7aed41035e2a7ed12e72dd7530f61327a6579a4390b" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "convert_case" version = "0.4.0" @@ -1189,6 +1209,7 @@ version = "0.1.0" dependencies = [ "color-eyre", "futures", + "lazy_async_pool", "models", "pin-project", "serde", @@ -1558,6 +1579,16 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f9b7d56ba4a8344d6be9729995e6b06f928af29998cdf79fe390cbf6b1fee838" +[[package]] +name = "lazy_async_pool" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06cf485d4867e0714e35c1652e736bcf892d28fceecca01036764575db64ba84" +dependencies = [ + "async-channel", + "futures", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -3568,9 +3599,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.21.2" +version = "1.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9e03c497dc955702ba729190dc4aac6f2a0ce97f913e5b1b5912fc5039d9099" +checksum = "eab6d665857cc6ca78d6e80303a02cea7a7851e85dfbd77cbdc09bd129f1ef46" dependencies = [ "autocfg", "bytes", @@ -3583,7 +3614,7 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", - "winapi", + "windows-sys 0.42.0", ] [[package]] diff --git a/libs/embassy_container_init/Cargo.toml b/libs/embassy_container_init/Cargo.toml index 3bc6b5410..27fb2a834 100644 --- a/libs/embassy_container_init/Cargo.toml +++ b/libs/embassy_container_init/Cargo.toml @@ -2,6 +2,7 @@ name = "embassy_container_init" version = "0.1.0" edition = "2021" +rust = "1.66" [features] dev = [] @@ -12,6 +13,7 @@ unstable = [] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] async-stream = "0.3" +# cgroups-rs = "0.2" color-eyre = "0.6" futures = "0.3" serde = { version = "1", features = ["derive", "rc"] } @@ -20,7 +22,7 @@ helpers = { path = "../helpers" } imbl = "2" nix = "0.25" tokio = { version = "1", features = ["full"] } -tokio-stream = { version = "0.1.11" } +tokio-stream = { version = "0.1", features = ["io-util", "sync", "net"] } tracing = "0.1" tracing-error = "0.2" tracing-futures = "0.2" diff --git a/libs/embassy_container_init/src/main.rs b/libs/embassy_container_init/src/main.rs index f19efe7bf..6612b4ec7 100644 --- a/libs/embassy_container_init/src/main.rs +++ b/libs/embassy_container_init/src/main.rs @@ -14,7 +14,7 @@ use nix::errno::Errno; use nix::sys::signal::Signal; use serde::{Deserialize, Serialize}; use serde_json::json; -use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::process::{Child, ChildStderr, ChildStdout, Command}; use tokio::select; use tokio::sync::{watch, Mutex}; @@ -68,14 +68,22 @@ struct InheritOutput { stderr: watch::Receiver, } +struct HandlerMut { + processes: BTreeMap, + // groups: BTreeMap, +} + #[derive(Clone)] struct Handler { - children: Arc>>, + children: Arc>, } impl Handler { fn new() -> Self { Handler { - children: Arc::new(Mutex::new(BTreeMap::new())), + children: Arc::new(Mutex::new(HandlerMut { + processes: BTreeMap::new(), + // groups: BTreeMap::new(), + })), } } async fn handle(&self, req: Input) -> Result { @@ -184,7 +192,7 @@ impl Handler { } OutputStrategy::Collect => None, }; - self.children.lock().await.insert( + self.children.lock().await.processes.insert( pid, ChildInfo { gid, @@ -204,7 +212,7 @@ impl Handler { let mut child = { self.children .lock() - .await + .await.processes .get(&pid) .ok_or_else(not_found)? .child @@ -249,7 +257,7 @@ impl Handler { if signal == 9 { self.children .lock() - .await + .await.processes .remove(&pid) .ok_or_else(not_found)?; } @@ -260,12 +268,12 @@ impl Handler { let mut to_kill = Vec::new(); { let mut children_ref = self.children.lock().await; - let children = std::mem::take(children_ref.deref_mut()); + let children = std::mem::take(&mut children_ref.deref_mut().processes); for (pid, child_info) in children { if child_info.gid == Some(gid) { to_kill.push(pid); } else { - children_ref.insert(pid, child_info); + children_ref.processes.insert(pid, child_info); } } } @@ -294,7 +302,7 @@ impl Handler { async fn graceful_exit(self) { let kill_all = futures::stream::iter( - std::mem::take(self.children.lock().await.deref_mut()).into_iter(), + std::mem::take(&mut self.children.lock().await.deref_mut().processes).into_iter(), ) .for_each_concurrent(None, |(pid, child)| async move { let _ = Self::killall(pid, Signal::SIGTERM); @@ -329,34 +337,58 @@ async fn main() { color_eyre::install().unwrap(); let handler = Handler::new(); - let mut lines = BufReader::new(tokio::io::stdin()).lines(); let handler_thread = async { - while let Some(line) = lines.next_line().await? { - let local_hdlr = handler.clone(); + let listener = tokio::net::UnixListener::bind("/start9/sockets/rpc.sock")?; + loop { + let (stream, _) = listener.accept().await?; + let (r, w) = stream.into_split(); + let mut lines = BufReader::new(r).lines(); + let handler = handler.clone(); tokio::spawn(async move { - if let Err(e) = async { - eprintln!("{}", line); - let req = serde_json::from_str::(&line)?; - match local_hdlr.handle(req.input).await { - Ok(output) => { - println!( - "{}", - json!({ "id": req.id, "jsonrpc": "2.0", "result": output }) - ) + let w = Arc::new(Mutex::new(w)); + while let Some(line) = lines.next_line().await.transpose() { + + let handler = handler.clone(); + let w = w.clone(); + tokio::spawn(async move { + if let Err(e) = async { + let req = serde_json::from_str::(&line?)?; + match handler.handle(req.input).await { + Ok(output) => { + if let Err(err) = w.lock().await.write_all( + format!("{}\n", json!({ "id": req.id, "jsonrpc": "2.0", "result": output })) + .as_bytes(), + ) + .await { + tracing::error!("Error sending to {id:?}", id = req.id); + } + } + Err(e) => + if let Err(err) = w + .lock() + .await + .write_all( + format!("{}\n", json!({ "id": req.id, "jsonrpc": "2.0", "error": e })) + .as_bytes(), + ) + .await { + + tracing::error!("Handle + Error sending to {id:?}", id = req.id); + }, + } + Ok::<_, color_eyre::Report>(()) } - Err(e) => { - println!("{}", json!({ "id": req.id, "jsonrpc": "2.0", "error": e })) + .await + { + tracing::error!("Error parsing RPC request: {}", e); + tracing::debug!("{:?}", e); } - } - Ok::<_, serde_json::Error>(()) - } - .await - { - tracing::error!("Error parsing RPC request: {}", e); - tracing::debug!("{:?}", e); + }); } + Ok::<_, std::io::Error>(()) }); } + #[allow(unreachable_code)] Ok::<_, std::io::Error>(()) }; diff --git a/libs/helpers/Cargo.toml b/libs/helpers/Cargo.toml index 0aba69884..02258e8fb 100644 --- a/libs/helpers/Cargo.toml +++ b/libs/helpers/Cargo.toml @@ -8,11 +8,12 @@ edition = "2021" [dependencies] color-eyre = "0.6.2" futures = "0.3.21" +lazy_async_pool = "0.3.3" models = { path = "../models" } pin-project = "1.0.11" serde = { version = "1.0", features = ["derive", "rc"] } serde_json = "1.0" -tokio = { version = "1.19.2", features = ["full"] } +tokio = { version = "1.23", features = ["full"] } tokio-stream = { version = "0.1.9", features = ["io-util", "sync"] } tracing = "0.1.35" yajrc = { version = "*", git = "https://github.com/dr-bonez/yajrc.git", branch = "develop" } diff --git a/libs/helpers/src/lib.rs b/libs/helpers/src/lib.rs index 4d93c2c96..f20cd400f 100644 --- a/libs/helpers/src/lib.rs +++ b/libs/helpers/src/lib.rs @@ -14,7 +14,7 @@ mod rpc_client; mod rsync; mod script_dir; pub use byte_replacement_reader::*; -pub use rpc_client::RpcClient; +pub use rpc_client::{RpcClient, UnixRpcClient}; pub use rsync::*; pub use script_dir::*; diff --git a/libs/helpers/src/rpc_client.rs b/libs/helpers/src/rpc_client.rs index ce8a94cb5..bdb505b40 100644 --- a/libs/helpers/src/rpc_client.rs +++ b/libs/helpers/src/rpc_client.rs @@ -1,11 +1,17 @@ use std::collections::BTreeMap; +use std::path::PathBuf; use std::sync::atomic::AtomicUsize; use std::sync::{Arc, Weak}; +use futures::future::BoxFuture; +use futures::{FutureExt, TryFutureExt}; +use lazy_async_pool::Pool; use models::{Error, ErrorKind, ResultExt}; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader}; +use tokio::net::UnixStream; +use tokio::runtime::Handle; use tokio::sync::{oneshot, Mutex}; use yajrc::{Id, RpcError, RpcMethod, RpcRequest, RpcResponse}; @@ -14,10 +20,13 @@ use crate::NonDetachingJoinHandle; type DynWrite = Box; type ResponseMap = BTreeMap>>; +const MAX_TRIES: u64 = 3; + pub struct RpcClient { - id: AtomicUsize, + id: Arc, _handler: NonDetachingJoinHandle<()>, - writable: Weak>, + writer: DynWrite, + responses: Weak>, } impl RpcClient { pub fn new< @@ -26,23 +35,23 @@ impl RpcClient { >( writer: W, reader: R, + id: Arc, ) -> Self { let writer: DynWrite = Box::new(writer); - let writable = Arc::new(Mutex::new((writer, ResponseMap::new()))); - let weak_writable = Arc::downgrade(&writable); + let responses = Arc::new(Mutex::new(ResponseMap::new())); + let weak_responses = Arc::downgrade(&responses); RpcClient { - id: AtomicUsize::new(0), + id, _handler: tokio::spawn(async move { let mut lines = BufReader::new(reader).lines(); while let Some(line) = lines.next_line().await.transpose() { - let mut w = writable.lock().await; match line.map_err(Error::from).and_then(|l| { serde_json::from_str::(&l) .with_kind(ErrorKind::Deserialization) }) { Ok(l) => { if let Some(id) = l.id { - if let Some(res) = w.1.remove(&id) { + if let Some(res) = responses.lock().await.remove(&id) { if let Err(e) = res.send(l.result) { tracing::warn!( "RpcClient Response for Unknown ID: {:?}", @@ -67,7 +76,88 @@ impl RpcClient { } }) .into(), - writable: weak_writable, + writer, + responses: weak_responses, + } + } + + pub async fn request( + &mut self, + method: T, + params: T::Params, + ) -> Result + where + T: Serialize, + T::Params: Serialize, + T::Response: for<'de> Deserialize<'de>, + { + let id = Id::Number( + self.id + .fetch_add(1, std::sync::atomic::Ordering::SeqCst) + .into(), + ); + let request = RpcRequest { + id: Some(id.clone()), + method, + params, + }; + if let Some(w) = self.responses.upgrade() { + let (send, recv) = oneshot::channel(); + w.lock().await.insert(id.clone(), send); + self.writer + .write_all((serde_json::to_string(&request)? + "\n").as_bytes()) + .await + .map_err(|e| { + let mut err = yajrc::INTERNAL_ERROR.clone(); + err.data = Some(json!(e.to_string())); + err + })?; + match recv.await { + Ok(val) => { + return Ok(serde_json::from_value(val?)?); + } + Err(_err) => { + tokio::task::yield_now().await; + } + } + } + tracing::debug!( + "Client has finished {:?}", + futures::poll!(&mut self._handler) + ); + let mut err = yajrc::INTERNAL_ERROR.clone(); + err.data = Some(json!("RpcClient thread has terminated")); + Err(err) + } +} + +pub struct UnixRpcClient { + pool: Pool< + RpcClient, + Box BoxFuture<'static, Result> + Send + Sync>, + BoxFuture<'static, Result>, + std::io::Error, + >, +} +impl UnixRpcClient { + pub fn new(path: PathBuf) -> Self { + let rt = Handle::current(); + let id = Arc::new(AtomicUsize::new(0)); + Self { + pool: Pool::new( + 0, + Box::new(move || { + let path = path.clone(); + let id = id.clone(); + rt.spawn(async move { + let (r, w) = UnixStream::connect(&path).await?.into_split(); + Ok(RpcClient::new(w, r, id)) + }) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) + .and_then(|x| async move { x }) + .boxed() + }), + ), } } @@ -77,40 +167,26 @@ impl RpcClient { params: T::Params, ) -> Result where - T: Serialize, - T::Params: Serialize, + T: Serialize + Clone, + T::Params: Serialize + Clone, T::Response: for<'de> Deserialize<'de>, { - if let Some(w) = self.writable.upgrade() { - let mut w = w.lock().await; - let id = Id::Number( - self.id - .fetch_add(1, std::sync::atomic::Ordering::SeqCst) - .into(), - ); - w.0.write_all( - (serde_json::to_string(&RpcRequest { - id: Some(id.clone()), - method, - params, - })? + "\n") - .as_bytes(), - ) - .await - .map_err(|e| { - let mut err = yajrc::INTERNAL_ERROR.clone(); - err.data = Some(json!(e.to_string())); - err - })?; - let (send, recv) = oneshot::channel(); - w.1.insert(id, send); - drop(w); - if let Ok(val) = recv.await { - return Ok(serde_json::from_value(val?)?); + let mut tries = 0; + let res = loop { + tries += 1; + let mut client = self.pool.clone().get().await?; + let res = client.request(method.clone(), params.clone()).await; + match &res { + Err(e) if e.code == yajrc::INTERNAL_ERROR.code => { + client.destroy(); + } + _ => break res, } - } - let mut err = yajrc::INTERNAL_ERROR.clone(); - err.data = Some(json!("RpcClient thread has terminated")); - Err(err) + if tries > MAX_TRIES { + tracing::warn!("Max Tries exceeded"); + break res; + } + }; + res } } diff --git a/libs/js_engine/src/lib.rs b/libs/js_engine/src/lib.rs index 0639e1598..6fc414a64 100644 --- a/libs/js_engine/src/lib.rs +++ b/libs/js_engine/src/lib.rs @@ -11,7 +11,7 @@ use deno_core::{ ModuleSpecifier, ModuleType, OpDecl, RuntimeOptions, Snapshot, }; use embassy_container_init::ProcessGroupId; -use helpers::{script_dir, spawn_local, RpcClient, Rsync}; +use helpers::{script_dir, spawn_local, Rsync, UnixRpcClient}; use models::{PackageId, ProcedureName, Version, VolumeId}; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -95,7 +95,7 @@ struct JsContext { input: Value, variable_args: Vec, container_process_gid: ProcessGroupId, - container_rpc_client: Option>, + container_rpc_client: Option>, rsyncs: Arc)>>, } #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] @@ -184,7 +184,7 @@ pub struct JsExecutionEnvironment { version: Version, volumes: Arc, container_process_gid: ProcessGroupId, - container_rpc_client: Option>, + container_rpc_client: Option>, } impl JsExecutionEnvironment { @@ -194,7 +194,7 @@ impl JsExecutionEnvironment { version: &Version, volumes: Box, container_process_gid: ProcessGroupId, - container_rpc_client: Option>, + container_rpc_client: Option>, ) -> Result { let data_dir = data_directory.as_ref(); let base_directory = data_dir; diff --git a/system-images/compat/Cargo.lock b/system-images/compat/Cargo.lock index 9f426efa8..c0ec35015 100644 --- a/system-images/compat/Cargo.lock +++ b/system-images/compat/Cargo.lock @@ -98,6 +98,17 @@ dependencies = [ "term", ] +[[package]] +name = "async-channel" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf46fee83e5ccffc220104713af3292ff9bc7c64c7de289f66dae8e38d826833" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + [[package]] name = "async-stream" version = "0.3.3" @@ -528,6 +539,15 @@ dependencies = [ "serde_yaml 0.8.26", ] +[[package]] +name = "concurrent-queue" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd7bef69dc86e3c610e4e7aed41035e2a7ed12e72dd7530f61327a6579a4390b" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "const-oid" version = "0.9.0" @@ -1510,6 +1530,7 @@ version = "0.1.0" dependencies = [ "color-eyre", "futures", + "lazy_async_pool", "models", "pin-project", "serde", @@ -1928,6 +1949,16 @@ dependencies = [ "regex", ] +[[package]] +name = "lazy_async_pool" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06cf485d4867e0714e35c1652e736bcf892d28fceecca01036764575db64ba84" +dependencies = [ + "async-channel", + "futures", +] + [[package]] name = "lazy_static" version = "1.4.0"