Feat/long running sockets (#2090)

* wip: Working on sockets, but can't connect?

* simplify unix socket connection

* wip: Get responses back from the server at least once.

* WIP: Get the sockets working'

* feat: Sockets can start/ stop/ config/ properites/ uninstall

* fix: Restart services

* Fix: Sockets work and can stop main and not kill client

Co-authored-by: Aiden McClelland <me@drbonez.dev>
This commit is contained in:
J M
2023-01-12 09:58:14 -07:00
committed by Aiden McClelland
parent 274db6f606
commit 928de47d1d
15 changed files with 346 additions and 126 deletions

37
libs/Cargo.lock generated
View File

@@ -76,6 +76,17 @@ dependencies = [
"syn",
]
[[package]]
name = "async-channel"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf46fee83e5ccffc220104713af3292ff9bc7c64c7de289f66dae8e38d826833"
dependencies = [
"concurrent-queue",
"event-listener",
"futures-core",
]
[[package]]
name = "async-stream"
version = "0.3.3"
@@ -387,6 +398,15 @@ dependencies = [
"tracing-error 0.2.0",
]
[[package]]
name = "concurrent-queue"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd7bef69dc86e3c610e4e7aed41035e2a7ed12e72dd7530f61327a6579a4390b"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "convert_case"
version = "0.4.0"
@@ -1189,6 +1209,7 @@ version = "0.1.0"
dependencies = [
"color-eyre",
"futures",
"lazy_async_pool",
"models",
"pin-project",
"serde",
@@ -1558,6 +1579,16 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9b7d56ba4a8344d6be9729995e6b06f928af29998cdf79fe390cbf6b1fee838"
[[package]]
name = "lazy_async_pool"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06cf485d4867e0714e35c1652e736bcf892d28fceecca01036764575db64ba84"
dependencies = [
"async-channel",
"futures",
]
[[package]]
name = "lazy_static"
version = "1.4.0"
@@ -3568,9 +3599,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "1.21.2"
version = "1.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9e03c497dc955702ba729190dc4aac6f2a0ce97f913e5b1b5912fc5039d9099"
checksum = "eab6d665857cc6ca78d6e80303a02cea7a7851e85dfbd77cbdc09bd129f1ef46"
dependencies = [
"autocfg",
"bytes",
@@ -3583,7 +3614,7 @@ dependencies = [
"signal-hook-registry",
"socket2",
"tokio-macros",
"winapi",
"windows-sys 0.42.0",
]
[[package]]

View File

@@ -2,6 +2,7 @@
name = "embassy_container_init"
version = "0.1.0"
edition = "2021"
rust = "1.66"
[features]
dev = []
@@ -12,6 +13,7 @@ unstable = []
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-stream = "0.3"
# cgroups-rs = "0.2"
color-eyre = "0.6"
futures = "0.3"
serde = { version = "1", features = ["derive", "rc"] }
@@ -20,7 +22,7 @@ helpers = { path = "../helpers" }
imbl = "2"
nix = "0.25"
tokio = { version = "1", features = ["full"] }
tokio-stream = { version = "0.1.11" }
tokio-stream = { version = "0.1", features = ["io-util", "sync", "net"] }
tracing = "0.1"
tracing-error = "0.2"
tracing-futures = "0.2"

View File

@@ -14,7 +14,7 @@ use nix::errno::Errno;
use nix::sys::signal::Signal;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{Child, ChildStderr, ChildStdout, Command};
use tokio::select;
use tokio::sync::{watch, Mutex};
@@ -68,14 +68,22 @@ struct InheritOutput {
stderr: watch::Receiver<String>,
}
struct HandlerMut {
processes: BTreeMap<ProcessId, ChildInfo>,
// groups: BTreeMap<ProcessGroupId, Cgroup>,
}
#[derive(Clone)]
struct Handler {
children: Arc<Mutex<BTreeMap<ProcessId, ChildInfo>>>,
children: Arc<Mutex<HandlerMut>>,
}
impl Handler {
fn new() -> Self {
Handler {
children: Arc::new(Mutex::new(BTreeMap::new())),
children: Arc::new(Mutex::new(HandlerMut {
processes: BTreeMap::new(),
// groups: BTreeMap::new(),
})),
}
}
async fn handle(&self, req: Input) -> Result<Output, RpcError> {
@@ -184,7 +192,7 @@ impl Handler {
}
OutputStrategy::Collect => None,
};
self.children.lock().await.insert(
self.children.lock().await.processes.insert(
pid,
ChildInfo {
gid,
@@ -204,7 +212,7 @@ impl Handler {
let mut child = {
self.children
.lock()
.await
.await.processes
.get(&pid)
.ok_or_else(not_found)?
.child
@@ -249,7 +257,7 @@ impl Handler {
if signal == 9 {
self.children
.lock()
.await
.await.processes
.remove(&pid)
.ok_or_else(not_found)?;
}
@@ -260,12 +268,12 @@ impl Handler {
let mut to_kill = Vec::new();
{
let mut children_ref = self.children.lock().await;
let children = std::mem::take(children_ref.deref_mut());
let children = std::mem::take(&mut children_ref.deref_mut().processes);
for (pid, child_info) in children {
if child_info.gid == Some(gid) {
to_kill.push(pid);
} else {
children_ref.insert(pid, child_info);
children_ref.processes.insert(pid, child_info);
}
}
}
@@ -294,7 +302,7 @@ impl Handler {
async fn graceful_exit(self) {
let kill_all = futures::stream::iter(
std::mem::take(self.children.lock().await.deref_mut()).into_iter(),
std::mem::take(&mut self.children.lock().await.deref_mut().processes).into_iter(),
)
.for_each_concurrent(None, |(pid, child)| async move {
let _ = Self::killall(pid, Signal::SIGTERM);
@@ -329,34 +337,58 @@ async fn main() {
color_eyre::install().unwrap();
let handler = Handler::new();
let mut lines = BufReader::new(tokio::io::stdin()).lines();
let handler_thread = async {
while let Some(line) = lines.next_line().await? {
let local_hdlr = handler.clone();
let listener = tokio::net::UnixListener::bind("/start9/sockets/rpc.sock")?;
loop {
let (stream, _) = listener.accept().await?;
let (r, w) = stream.into_split();
let mut lines = BufReader::new(r).lines();
let handler = handler.clone();
tokio::spawn(async move {
if let Err(e) = async {
eprintln!("{}", line);
let req = serde_json::from_str::<IncomingRpc>(&line)?;
match local_hdlr.handle(req.input).await {
Ok(output) => {
println!(
"{}",
json!({ "id": req.id, "jsonrpc": "2.0", "result": output })
)
let w = Arc::new(Mutex::new(w));
while let Some(line) = lines.next_line().await.transpose() {
let handler = handler.clone();
let w = w.clone();
tokio::spawn(async move {
if let Err(e) = async {
let req = serde_json::from_str::<IncomingRpc>(&line?)?;
match handler.handle(req.input).await {
Ok(output) => {
if let Err(err) = w.lock().await.write_all(
format!("{}\n", json!({ "id": req.id, "jsonrpc": "2.0", "result": output }))
.as_bytes(),
)
.await {
tracing::error!("Error sending to {id:?}", id = req.id);
}
}
Err(e) =>
if let Err(err) = w
.lock()
.await
.write_all(
format!("{}\n", json!({ "id": req.id, "jsonrpc": "2.0", "error": e }))
.as_bytes(),
)
.await {
tracing::error!("Handle + Error sending to {id:?}", id = req.id);
},
}
Ok::<_, color_eyre::Report>(())
}
Err(e) => {
println!("{}", json!({ "id": req.id, "jsonrpc": "2.0", "error": e }))
.await
{
tracing::error!("Error parsing RPC request: {}", e);
tracing::debug!("{:?}", e);
}
}
Ok::<_, serde_json::Error>(())
}
.await
{
tracing::error!("Error parsing RPC request: {}", e);
tracing::debug!("{:?}", e);
});
}
Ok::<_, std::io::Error>(())
});
}
#[allow(unreachable_code)]
Ok::<_, std::io::Error>(())
};

View File

@@ -8,11 +8,12 @@ edition = "2021"
[dependencies]
color-eyre = "0.6.2"
futures = "0.3.21"
lazy_async_pool = "0.3.3"
models = { path = "../models" }
pin-project = "1.0.11"
serde = { version = "1.0", features = ["derive", "rc"] }
serde_json = "1.0"
tokio = { version = "1.19.2", features = ["full"] }
tokio = { version = "1.23", features = ["full"] }
tokio-stream = { version = "0.1.9", features = ["io-util", "sync"] }
tracing = "0.1.35"
yajrc = { version = "*", git = "https://github.com/dr-bonez/yajrc.git", branch = "develop" }

View File

@@ -14,7 +14,7 @@ mod rpc_client;
mod rsync;
mod script_dir;
pub use byte_replacement_reader::*;
pub use rpc_client::RpcClient;
pub use rpc_client::{RpcClient, UnixRpcClient};
pub use rsync::*;
pub use script_dir::*;

View File

@@ -1,11 +1,17 @@
use std::collections::BTreeMap;
use std::path::PathBuf;
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, Weak};
use futures::future::BoxFuture;
use futures::{FutureExt, TryFutureExt};
use lazy_async_pool::Pool;
use models::{Error, ErrorKind, ResultExt};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
use tokio::net::UnixStream;
use tokio::runtime::Handle;
use tokio::sync::{oneshot, Mutex};
use yajrc::{Id, RpcError, RpcMethod, RpcRequest, RpcResponse};
@@ -14,10 +20,13 @@ use crate::NonDetachingJoinHandle;
type DynWrite = Box<dyn AsyncWrite + Unpin + Send + Sync + 'static>;
type ResponseMap = BTreeMap<Id, oneshot::Sender<Result<Value, RpcError>>>;
const MAX_TRIES: u64 = 3;
pub struct RpcClient {
id: AtomicUsize,
id: Arc<AtomicUsize>,
_handler: NonDetachingJoinHandle<()>,
writable: Weak<Mutex<(DynWrite, ResponseMap)>>,
writer: DynWrite,
responses: Weak<Mutex<ResponseMap>>,
}
impl RpcClient {
pub fn new<
@@ -26,23 +35,23 @@ impl RpcClient {
>(
writer: W,
reader: R,
id: Arc<AtomicUsize>,
) -> Self {
let writer: DynWrite = Box::new(writer);
let writable = Arc::new(Mutex::new((writer, ResponseMap::new())));
let weak_writable = Arc::downgrade(&writable);
let responses = Arc::new(Mutex::new(ResponseMap::new()));
let weak_responses = Arc::downgrade(&responses);
RpcClient {
id: AtomicUsize::new(0),
id,
_handler: tokio::spawn(async move {
let mut lines = BufReader::new(reader).lines();
while let Some(line) = lines.next_line().await.transpose() {
let mut w = writable.lock().await;
match line.map_err(Error::from).and_then(|l| {
serde_json::from_str::<RpcResponse>(&l)
.with_kind(ErrorKind::Deserialization)
}) {
Ok(l) => {
if let Some(id) = l.id {
if let Some(res) = w.1.remove(&id) {
if let Some(res) = responses.lock().await.remove(&id) {
if let Err(e) = res.send(l.result) {
tracing::warn!(
"RpcClient Response for Unknown ID: {:?}",
@@ -67,7 +76,88 @@ impl RpcClient {
}
})
.into(),
writable: weak_writable,
writer,
responses: weak_responses,
}
}
pub async fn request<T: RpcMethod>(
&mut self,
method: T,
params: T::Params,
) -> Result<T::Response, RpcError>
where
T: Serialize,
T::Params: Serialize,
T::Response: for<'de> Deserialize<'de>,
{
let id = Id::Number(
self.id
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
.into(),
);
let request = RpcRequest {
id: Some(id.clone()),
method,
params,
};
if let Some(w) = self.responses.upgrade() {
let (send, recv) = oneshot::channel();
w.lock().await.insert(id.clone(), send);
self.writer
.write_all((serde_json::to_string(&request)? + "\n").as_bytes())
.await
.map_err(|e| {
let mut err = yajrc::INTERNAL_ERROR.clone();
err.data = Some(json!(e.to_string()));
err
})?;
match recv.await {
Ok(val) => {
return Ok(serde_json::from_value(val?)?);
}
Err(_err) => {
tokio::task::yield_now().await;
}
}
}
tracing::debug!(
"Client has finished {:?}",
futures::poll!(&mut self._handler)
);
let mut err = yajrc::INTERNAL_ERROR.clone();
err.data = Some(json!("RpcClient thread has terminated"));
Err(err)
}
}
pub struct UnixRpcClient {
pool: Pool<
RpcClient,
Box<dyn Fn() -> BoxFuture<'static, Result<RpcClient, std::io::Error>> + Send + Sync>,
BoxFuture<'static, Result<RpcClient, std::io::Error>>,
std::io::Error,
>,
}
impl UnixRpcClient {
pub fn new(path: PathBuf) -> Self {
let rt = Handle::current();
let id = Arc::new(AtomicUsize::new(0));
Self {
pool: Pool::new(
0,
Box::new(move || {
let path = path.clone();
let id = id.clone();
rt.spawn(async move {
let (r, w) = UnixStream::connect(&path).await?.into_split();
Ok(RpcClient::new(w, r, id))
})
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
.and_then(|x| async move { x })
.boxed()
}),
),
}
}
@@ -77,40 +167,26 @@ impl RpcClient {
params: T::Params,
) -> Result<T::Response, RpcError>
where
T: Serialize,
T::Params: Serialize,
T: Serialize + Clone,
T::Params: Serialize + Clone,
T::Response: for<'de> Deserialize<'de>,
{
if let Some(w) = self.writable.upgrade() {
let mut w = w.lock().await;
let id = Id::Number(
self.id
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
.into(),
);
w.0.write_all(
(serde_json::to_string(&RpcRequest {
id: Some(id.clone()),
method,
params,
})? + "\n")
.as_bytes(),
)
.await
.map_err(|e| {
let mut err = yajrc::INTERNAL_ERROR.clone();
err.data = Some(json!(e.to_string()));
err
})?;
let (send, recv) = oneshot::channel();
w.1.insert(id, send);
drop(w);
if let Ok(val) = recv.await {
return Ok(serde_json::from_value(val?)?);
let mut tries = 0;
let res = loop {
tries += 1;
let mut client = self.pool.clone().get().await?;
let res = client.request(method.clone(), params.clone()).await;
match &res {
Err(e) if e.code == yajrc::INTERNAL_ERROR.code => {
client.destroy();
}
_ => break res,
}
}
let mut err = yajrc::INTERNAL_ERROR.clone();
err.data = Some(json!("RpcClient thread has terminated"));
Err(err)
if tries > MAX_TRIES {
tracing::warn!("Max Tries exceeded");
break res;
}
};
res
}
}

View File

@@ -11,7 +11,7 @@ use deno_core::{
ModuleSpecifier, ModuleType, OpDecl, RuntimeOptions, Snapshot,
};
use embassy_container_init::ProcessGroupId;
use helpers::{script_dir, spawn_local, RpcClient, Rsync};
use helpers::{script_dir, spawn_local, Rsync, UnixRpcClient};
use models::{PackageId, ProcedureName, Version, VolumeId};
use serde::{Deserialize, Serialize};
use serde_json::Value;
@@ -95,7 +95,7 @@ struct JsContext {
input: Value,
variable_args: Vec<serde_json::Value>,
container_process_gid: ProcessGroupId,
container_rpc_client: Option<Arc<RpcClient>>,
container_rpc_client: Option<Arc<UnixRpcClient>>,
rsyncs: Arc<Mutex<(usize, BTreeMap<usize, Rsync>)>>,
}
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
@@ -184,7 +184,7 @@ pub struct JsExecutionEnvironment {
version: Version,
volumes: Arc<dyn PathForVolumeId>,
container_process_gid: ProcessGroupId,
container_rpc_client: Option<Arc<RpcClient>>,
container_rpc_client: Option<Arc<UnixRpcClient>>,
}
impl JsExecutionEnvironment {
@@ -194,7 +194,7 @@ impl JsExecutionEnvironment {
version: &Version,
volumes: Box<dyn PathForVolumeId>,
container_process_gid: ProcessGroupId,
container_rpc_client: Option<Arc<RpcClient>>,
container_rpc_client: Option<Arc<UnixRpcClient>>,
) -> Result<JsExecutionEnvironment, (JsError, String)> {
let data_dir = data_directory.as_ref();
let base_directory = data_dir;