From 0d9101841cb0cc5909dd9aa39bd6373b4e55279d Mon Sep 17 00:00:00 2001 From: Aiden McClelland Date: Tue, 3 Aug 2021 15:49:40 -0600 Subject: [PATCH] ws subscription --- appmgr/Cargo.lock | 79 +++++++++++++++++++++ appmgr/Cargo.toml | 2 + appmgr/src/bin/embassyd.rs | 65 +++++++++++++++++- appmgr/src/context/rpc.rs | 9 ++- appmgr/src/db/mod.rs | 136 ++++++++++++++++++++++++++++++++++++- 5 files changed, 286 insertions(+), 5 deletions(-) diff --git a/appmgr/Cargo.lock b/appmgr/Cargo.lock index 4c5eef9a8..75e492e27 100644 --- a/appmgr/Cargo.lock +++ b/appmgr/Cargo.lock @@ -791,6 +791,7 @@ dependencies = [ "futures", "git-version", "http", + "hyper-ws-listener", "indexmap", "itertools 0.10.1", "jsonpath_lib", @@ -823,6 +824,7 @@ dependencies = [ "tokio-compat-02", "tokio-stream", "tokio-tar", + "tokio-tungstenite", "tokio-util", "toml", "torut", @@ -1278,6 +1280,22 @@ dependencies = [ "tokio-native-tls", ] +[[package]] +name = "hyper-ws-listener" +version = "0.1.0" +source = "git+https://github.com/Start9Labs/hyper-ws-listener.git?branch=main#d5db3698d61293375384a5d8aa980c834d45028a" +dependencies = [ + "anyhow", + "base64 0.13.0", + "env_logger", + "futures", + "hyper", + "log", + "sha-1", + "tokio 1.8.1", + "tokio-tungstenite", +] + [[package]] name = "hyperlocal" version = "0.8.0" @@ -1319,6 +1337,15 @@ dependencies = [ "serde", ] +[[package]] +name = "input_buffer" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f97967975f448f1a7ddb12b0bc41069d09ed6a1c161a92687e057325db35d413" +dependencies = [ + "bytes 1.0.1", +] + [[package]] name = "instant" version = "0.1.10" @@ -2501,6 +2528,19 @@ dependencies = [ "yaml-rust", ] +[[package]] +name = "sha-1" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c4cfa741c5832d0ef7fab46cabed29c2aae926db0b11bb2069edd8db5e64e16" +dependencies = [ + "block-buffer 0.9.0", + "cfg-if 1.0.0", + "cpufeatures", + "digest 0.9.0", + "opaque-debug 0.3.0", +] + [[package]] name = "sha1" version = "0.6.0" @@ -3137,6 +3177,19 @@ dependencies = [ "xattr", ] +[[package]] +name = "tokio-tungstenite" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e96bb520beab540ab664bd5a9cfeaa1fcd846fa68c830b42e2c8963071251d2" +dependencies = [ + "futures-util", + "log", + "pin-project", + "tokio 1.8.1", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.6.7" @@ -3223,6 +3276,26 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" +[[package]] +name = "tungstenite" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fe8dada8c1a3aeca77d6b51a4f1314e0f4b8e438b7b1b71e3ddaca8080e4093" +dependencies = [ + "base64 0.13.0", + "byteorder", + "bytes 1.0.1", + "http", + "httparse", + "input_buffer", + "log", + "rand 0.8.3", + "sha-1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "typed-builder" version = "0.9.0" @@ -3307,6 +3380,12 @@ dependencies = [ "serde", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "vcpkg" version = "0.2.15" diff --git a/appmgr/Cargo.toml b/appmgr/Cargo.toml index a895b12a9..a2ff0f370 100644 --- a/appmgr/Cargo.toml +++ b/appmgr/Cargo.toml @@ -60,6 +60,7 @@ emver = { version = "0.1.2", features = ["serde"] } futures = "0.3.8" git-version = "0.3.4" http = "0.2.3" +hyper-ws-listener = { git = "https://github.com/Start9Labs/hyper-ws-listener.git", branch = "main" } indexmap = { version = "1.6.2", features = ["serde"] } itertools = "0.10.0" jsonpath_lib = "0.3.0" @@ -98,6 +99,7 @@ tokio = { version = "1.8.1", features = ["full"] } tokio-compat-02 = "0.2.0" tokio-stream = { version = "0.1.5", features = ["io-util", "sync"] } tokio-tar = "0.3.0" +tokio-tungstenite = "0.14.0" tokio-util = { version = "0.6.6", features = ["io"] } torut = "0.1.9" typed-builder = "0.9.0" diff --git a/appmgr/src/bin/embassyd.rs b/appmgr/src/bin/embassyd.rs index f11e15f27..72b609ac6 100644 --- a/appmgr/src/bin/embassyd.rs +++ b/appmgr/src/bin/embassyd.rs @@ -1,23 +1,32 @@ use std::path::Path; use std::time::Duration; +use anyhow::anyhow; use embassy::context::{EitherContext, RpcContext}; use embassy::db::model::Database; +use embassy::db::subscribe; use embassy::middleware::auth::auth; use embassy::middleware::cors::cors; use embassy::status::{check_all, synchronize_all}; use embassy::util::daemon; -use embassy::{Error, ErrorKind}; +use embassy::{Error, ErrorKind, ResultExt}; use futures::TryFutureExt; use patch_db::json_ptr::JsonPointer; -use rpc_toolkit::hyper::StatusCode; +use rpc_toolkit::hyper::{Body, Response, Server, StatusCode}; use rpc_toolkit::rpc_server; -use rpc_toolkit::rpc_server_helpers::DynMiddleware; fn status_fn(_: i32) -> StatusCode { StatusCode::OK } +fn err_to_500(e: Error) -> Response { + log::error!("{}", e); + Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::empty()) + .unwrap() +} + async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> { let rpc_ctx = RpcContext::init(cfg_path).await?; if !rpc_ctx.db.exists(&::default()).await? { @@ -37,6 +46,51 @@ async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> { auth, ] }); + + let rev_cache_ctx = rpc_ctx.clone(); + let revision_cache_task = tokio::spawn(async move { + let mut sub = rev_cache_ctx.db.subscribe(); + loop { + let rev = match sub.recv().await { + Ok(a) => a, + Err(_) => { + rev_cache_ctx.revision_cache.write().await.truncate(0); + continue; + } + }; // TODO: handle falling behind + let mut cache = rev_cache_ctx.revision_cache.write().await; + cache.push_back(rev); + if cache.len() > rev_cache_ctx.revision_cache_size { + cache.pop_front(); + } + } + }); + + let ws_ctx = rpc_ctx.clone(); + let ws_server = { + let builder = Server::bind(&ws_ctx.bind_ws); + + let make_svc = ::rpc_toolkit::hyper::service::make_service_fn(move |_| { + let ctx = ws_ctx.clone(); + async move { + Ok::<_, ::rpc_toolkit::hyper::Error>(::rpc_toolkit::hyper::service::service_fn( + move |req| { + let ctx = ctx.clone(); + async move { + match req.uri().path() { + "/db" => Ok(subscribe(ctx, req).await.unwrap_or_else(err_to_500)), + _ => Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::empty()), + } + } + }, + )) + } + }); + builder.serve(make_svc) + }; + let status_ctx = rpc_ctx.clone(); let status_daemon = daemon( move || { @@ -69,6 +123,11 @@ async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> { ); futures::try_join!( server.map_err(|e| Error::new(e, ErrorKind::Network)), + revision_cache_task.map_err(|e| Error::new( + anyhow!("{}", e).context("Revision Cache daemon panicked!"), + ErrorKind::Unknown + )), + ws_server.map_err(|e| Error::new(e, ErrorKind::Network)), status_daemon.map_err(|e| Error::new( e.context("Status Sync daemon panicked!"), ErrorKind::Unknown diff --git a/appmgr/src/context/rpc.rs b/appmgr/src/context/rpc.rs index 5e1713476..da88595e5 100644 --- a/appmgr/src/context/rpc.rs +++ b/appmgr/src/context/rpc.rs @@ -1,16 +1,18 @@ +use std::collections::VecDeque; use std::net::{IpAddr, SocketAddr}; use std::ops::Deref; use std::path::{Path, PathBuf}; use std::sync::Arc; use bollard::Docker; -use patch_db::PatchDb; +use patch_db::{PatchDb, Revision}; use reqwest::Url; use rpc_toolkit::url::Host; use rpc_toolkit::Context; use serde::Deserialize; use sqlx::SqlitePool; use tokio::fs::File; +use tokio::sync::RwLock; use crate::manager::ManagerMap; use crate::net::NetController; @@ -25,6 +27,7 @@ pub struct RpcContextConfig { pub tor_control: Option, pub db: Option, pub secret_store: Option, + pub revision_cache_size: Option, } pub struct RpcContextSeed { @@ -35,6 +38,8 @@ pub struct RpcContextSeed { pub docker: Docker, pub net_controller: Arc, pub managers: ManagerMap, + pub revision_cache_size: usize, + pub revision_cache: RwLock>>, } #[derive(Clone)] @@ -88,6 +93,8 @@ impl RpcContext { docker, net_controller, managers, + revision_cache_size: base.revision_cache_size.unwrap_or(512), + revision_cache: RwLock::new(VecDeque::new()), }); Ok(Self(seed)) } diff --git a/appmgr/src/db/mod.rs b/appmgr/src/db/mod.rs index 88f72829d..8d089d94a 100644 --- a/appmgr/src/db/mod.rs +++ b/appmgr/src/db/mod.rs @@ -1,4 +1,138 @@ pub mod model; pub mod util; -pub use model::DatabaseModel; +use std::future::Future; +use std::sync::Arc; + +use futures::{SinkExt, StreamExt}; +use patch_db::json_ptr::JsonPointer; +use patch_db::{DiffPatch, Dump, Revision}; +use rpc_toolkit::command; +use rpc_toolkit::hyper::upgrade::Upgraded; +use rpc_toolkit::hyper::{Body, Error as HyperError, Request, Response}; +use rpc_toolkit::yajrc::RpcError; +use serde::Serialize; +use serde_json::Value; +use tokio::task::JoinError; +use tokio_tungstenite::tungstenite::Message; +use tokio_tungstenite::WebSocketStream; + +pub use self::model::DatabaseModel; +use self::util::WithRevision; +use crate::context::RpcContext; +use crate::{Error, ResultExt}; + +async fn ws_handler< + WSFut: Future, HyperError>, JoinError>>, +>( + ctx: RpcContext, + ws_fut: WSFut, +) -> Result<(), Error> { + let (dump, mut sub) = ctx.db.dump_and_sub().await; + let mut stream = ws_fut + .await + .with_kind(crate::ErrorKind::Network)? + .with_kind(crate::ErrorKind::Unknown)?; + stream.next().await; + stream + .send(Message::Text( + rpc_toolkit::serde_json::to_string(&dump).with_kind(crate::ErrorKind::Serialization)?, + )) + .await + .with_kind(crate::ErrorKind::Network)?; + + loop { + let rev = sub.recv().await.with_kind(crate::ErrorKind::Database)?; + stream + .send(Message::Text( + rpc_toolkit::serde_json::to_string(&rev) + .with_kind(crate::ErrorKind::Serialization)?, + )) + .await + .with_kind(crate::ErrorKind::Network)?; + } +} + +pub async fn subscribe(ctx: RpcContext, req: Request) -> Result, Error> { + let (res, ws_fut) = hyper_ws_listener::create_ws(req).with_kind(crate::ErrorKind::Network)?; + if let Some(ws_fut) = ws_fut { + tokio::task::spawn(async move { + match ws_handler(ctx, ws_fut).await { + Ok(()) => (), + Err(e) => log::error!("WebSocket Closed: {}", e), + } + }); + } + + Ok(res) +} + +#[command(subcommands(revisions, dump, put, patch))] +pub fn db(#[context] ctx: RpcContext) -> Result { + Ok(ctx) +} + +#[derive(Serialize)] +#[serde(untagged)] +pub enum RevisionsRes { + Revisions(Vec>), + Dump(Dump), +} + +#[command(rpc_only)] +pub async fn revisions( + #[context] ctx: RpcContext, + #[arg] since: u64, +) -> Result { + let cache = ctx.revision_cache.read().await; + if cache + .front() + .map(|rev| rev.id <= since + 1) + .unwrap_or(false) + { + Ok(RevisionsRes::Revisions( + cache + .iter() + .skip_while(|rev| rev.id < since + 1) + .cloned() + .collect(), + )) + } else { + drop(cache); + Ok(RevisionsRes::Dump(ctx.db.dump().await)) + } +} + +#[command(rpc_only)] +pub async fn dump(#[context] ctx: RpcContext) -> Result { + Ok(ctx.db.dump().await) +} + +#[command(subcommands(ui))] +pub fn put(#[context] ctx: RpcContext) -> Result { + Ok(ctx) +} + +#[command(rpc_only)] +pub async fn ui( + #[context] ctx: RpcContext, + #[arg] pointer: JsonPointer, + #[arg] value: Value, +) -> Result, RpcError> { + let ptr = "/ui".parse::()? + &pointer; + Ok(WithRevision { + response: (), + revision: ctx.db.put(&ptr, &value, None).await?, + }) +} + +#[command(rpc_only)] +pub async fn patch( + #[context] ctx: RpcContext, + #[arg] patch: DiffPatch, +) -> Result, RpcError> { + Ok(WithRevision { + response: (), + revision: ctx.db.apply(patch, None, None).await?, + }) +}