diff --git a/appmgr/Cargo.lock b/appmgr/Cargo.lock
index 4c5eef9a8..75e492e27 100644
--- a/appmgr/Cargo.lock
+++ b/appmgr/Cargo.lock
@@ -791,6 +791,7 @@ dependencies = [
"futures",
"git-version",
"http",
+ "hyper-ws-listener",
"indexmap",
"itertools 0.10.1",
"jsonpath_lib",
@@ -823,6 +824,7 @@ dependencies = [
"tokio-compat-02",
"tokio-stream",
"tokio-tar",
+ "tokio-tungstenite",
"tokio-util",
"toml",
"torut",
@@ -1278,6 +1280,22 @@ dependencies = [
"tokio-native-tls",
]
+[[package]]
+name = "hyper-ws-listener"
+version = "0.1.0"
+source = "git+https://github.com/Start9Labs/hyper-ws-listener.git?branch=main#d5db3698d61293375384a5d8aa980c834d45028a"
+dependencies = [
+ "anyhow",
+ "base64 0.13.0",
+ "env_logger",
+ "futures",
+ "hyper",
+ "log",
+ "sha-1",
+ "tokio 1.8.1",
+ "tokio-tungstenite",
+]
+
[[package]]
name = "hyperlocal"
version = "0.8.0"
@@ -1319,6 +1337,15 @@ dependencies = [
"serde",
]
+[[package]]
+name = "input_buffer"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f97967975f448f1a7ddb12b0bc41069d09ed6a1c161a92687e057325db35d413"
+dependencies = [
+ "bytes 1.0.1",
+]
+
[[package]]
name = "instant"
version = "0.1.10"
@@ -2501,6 +2528,19 @@ dependencies = [
"yaml-rust",
]
+[[package]]
+name = "sha-1"
+version = "0.9.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8c4cfa741c5832d0ef7fab46cabed29c2aae926db0b11bb2069edd8db5e64e16"
+dependencies = [
+ "block-buffer 0.9.0",
+ "cfg-if 1.0.0",
+ "cpufeatures",
+ "digest 0.9.0",
+ "opaque-debug 0.3.0",
+]
+
[[package]]
name = "sha1"
version = "0.6.0"
@@ -3137,6 +3177,19 @@ dependencies = [
"xattr",
]
+[[package]]
+name = "tokio-tungstenite"
+version = "0.14.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1e96bb520beab540ab664bd5a9cfeaa1fcd846fa68c830b42e2c8963071251d2"
+dependencies = [
+ "futures-util",
+ "log",
+ "pin-project",
+ "tokio 1.8.1",
+ "tungstenite",
+]
+
[[package]]
name = "tokio-util"
version = "0.6.7"
@@ -3223,6 +3276,26 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
+[[package]]
+name = "tungstenite"
+version = "0.13.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5fe8dada8c1a3aeca77d6b51a4f1314e0f4b8e438b7b1b71e3ddaca8080e4093"
+dependencies = [
+ "base64 0.13.0",
+ "byteorder",
+ "bytes 1.0.1",
+ "http",
+ "httparse",
+ "input_buffer",
+ "log",
+ "rand 0.8.3",
+ "sha-1",
+ "thiserror",
+ "url",
+ "utf-8",
+]
+
[[package]]
name = "typed-builder"
version = "0.9.0"
@@ -3307,6 +3380,12 @@ dependencies = [
"serde",
]
+[[package]]
+name = "utf-8"
+version = "0.7.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
+
[[package]]
name = "vcpkg"
version = "0.2.15"
diff --git a/appmgr/Cargo.toml b/appmgr/Cargo.toml
index a895b12a9..a2ff0f370 100644
--- a/appmgr/Cargo.toml
+++ b/appmgr/Cargo.toml
@@ -60,6 +60,7 @@ emver = { version = "0.1.2", features = ["serde"] }
futures = "0.3.8"
git-version = "0.3.4"
http = "0.2.3"
+hyper-ws-listener = { git = "https://github.com/Start9Labs/hyper-ws-listener.git", branch = "main" }
indexmap = { version = "1.6.2", features = ["serde"] }
itertools = "0.10.0"
jsonpath_lib = "0.3.0"
@@ -98,6 +99,7 @@ tokio = { version = "1.8.1", features = ["full"] }
tokio-compat-02 = "0.2.0"
tokio-stream = { version = "0.1.5", features = ["io-util", "sync"] }
tokio-tar = "0.3.0"
+tokio-tungstenite = "0.14.0"
tokio-util = { version = "0.6.6", features = ["io"] }
torut = "0.1.9"
typed-builder = "0.9.0"
diff --git a/appmgr/src/bin/embassyd.rs b/appmgr/src/bin/embassyd.rs
index f11e15f27..72b609ac6 100644
--- a/appmgr/src/bin/embassyd.rs
+++ b/appmgr/src/bin/embassyd.rs
@@ -1,23 +1,32 @@
use std::path::Path;
use std::time::Duration;
+use anyhow::anyhow;
use embassy::context::{EitherContext, RpcContext};
use embassy::db::model::Database;
+use embassy::db::subscribe;
use embassy::middleware::auth::auth;
use embassy::middleware::cors::cors;
use embassy::status::{check_all, synchronize_all};
use embassy::util::daemon;
-use embassy::{Error, ErrorKind};
+use embassy::{Error, ErrorKind, ResultExt};
use futures::TryFutureExt;
use patch_db::json_ptr::JsonPointer;
-use rpc_toolkit::hyper::StatusCode;
+use rpc_toolkit::hyper::{Body, Response, Server, StatusCode};
use rpc_toolkit::rpc_server;
-use rpc_toolkit::rpc_server_helpers::DynMiddleware;
fn status_fn(_: i32) -> StatusCode {
StatusCode::OK
}
+fn err_to_500(e: Error) -> Response
{
+ log::error!("{}", e);
+ Response::builder()
+ .status(StatusCode::INTERNAL_SERVER_ERROR)
+ .body(Body::empty())
+ .unwrap()
+}
+
async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> {
let rpc_ctx = RpcContext::init(cfg_path).await?;
if !rpc_ctx.db.exists(&::default()).await? {
@@ -37,6 +46,51 @@ async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> {
auth,
]
});
+
+ let rev_cache_ctx = rpc_ctx.clone();
+ let revision_cache_task = tokio::spawn(async move {
+ let mut sub = rev_cache_ctx.db.subscribe();
+ loop {
+ let rev = match sub.recv().await {
+ Ok(a) => a,
+ Err(_) => {
+ rev_cache_ctx.revision_cache.write().await.truncate(0);
+ continue;
+ }
+ }; // TODO: handle falling behind
+ let mut cache = rev_cache_ctx.revision_cache.write().await;
+ cache.push_back(rev);
+ if cache.len() > rev_cache_ctx.revision_cache_size {
+ cache.pop_front();
+ }
+ }
+ });
+
+ let ws_ctx = rpc_ctx.clone();
+ let ws_server = {
+ let builder = Server::bind(&ws_ctx.bind_ws);
+
+ let make_svc = ::rpc_toolkit::hyper::service::make_service_fn(move |_| {
+ let ctx = ws_ctx.clone();
+ async move {
+ Ok::<_, ::rpc_toolkit::hyper::Error>(::rpc_toolkit::hyper::service::service_fn(
+ move |req| {
+ let ctx = ctx.clone();
+ async move {
+ match req.uri().path() {
+ "/db" => Ok(subscribe(ctx, req).await.unwrap_or_else(err_to_500)),
+ _ => Response::builder()
+ .status(StatusCode::NOT_FOUND)
+ .body(Body::empty()),
+ }
+ }
+ },
+ ))
+ }
+ });
+ builder.serve(make_svc)
+ };
+
let status_ctx = rpc_ctx.clone();
let status_daemon = daemon(
move || {
@@ -69,6 +123,11 @@ async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> {
);
futures::try_join!(
server.map_err(|e| Error::new(e, ErrorKind::Network)),
+ revision_cache_task.map_err(|e| Error::new(
+ anyhow!("{}", e).context("Revision Cache daemon panicked!"),
+ ErrorKind::Unknown
+ )),
+ ws_server.map_err(|e| Error::new(e, ErrorKind::Network)),
status_daemon.map_err(|e| Error::new(
e.context("Status Sync daemon panicked!"),
ErrorKind::Unknown
diff --git a/appmgr/src/context/rpc.rs b/appmgr/src/context/rpc.rs
index 5e1713476..da88595e5 100644
--- a/appmgr/src/context/rpc.rs
+++ b/appmgr/src/context/rpc.rs
@@ -1,16 +1,18 @@
+use std::collections::VecDeque;
use std::net::{IpAddr, SocketAddr};
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use bollard::Docker;
-use patch_db::PatchDb;
+use patch_db::{PatchDb, Revision};
use reqwest::Url;
use rpc_toolkit::url::Host;
use rpc_toolkit::Context;
use serde::Deserialize;
use sqlx::SqlitePool;
use tokio::fs::File;
+use tokio::sync::RwLock;
use crate::manager::ManagerMap;
use crate::net::NetController;
@@ -25,6 +27,7 @@ pub struct RpcContextConfig {
pub tor_control: Option,
pub db: Option,
pub secret_store: Option,
+ pub revision_cache_size: Option,
}
pub struct RpcContextSeed {
@@ -35,6 +38,8 @@ pub struct RpcContextSeed {
pub docker: Docker,
pub net_controller: Arc,
pub managers: ManagerMap,
+ pub revision_cache_size: usize,
+ pub revision_cache: RwLock>>,
}
#[derive(Clone)]
@@ -88,6 +93,8 @@ impl RpcContext {
docker,
net_controller,
managers,
+ revision_cache_size: base.revision_cache_size.unwrap_or(512),
+ revision_cache: RwLock::new(VecDeque::new()),
});
Ok(Self(seed))
}
diff --git a/appmgr/src/db/mod.rs b/appmgr/src/db/mod.rs
index 88f72829d..8d089d94a 100644
--- a/appmgr/src/db/mod.rs
+++ b/appmgr/src/db/mod.rs
@@ -1,4 +1,138 @@
pub mod model;
pub mod util;
-pub use model::DatabaseModel;
+use std::future::Future;
+use std::sync::Arc;
+
+use futures::{SinkExt, StreamExt};
+use patch_db::json_ptr::JsonPointer;
+use patch_db::{DiffPatch, Dump, Revision};
+use rpc_toolkit::command;
+use rpc_toolkit::hyper::upgrade::Upgraded;
+use rpc_toolkit::hyper::{Body, Error as HyperError, Request, Response};
+use rpc_toolkit::yajrc::RpcError;
+use serde::Serialize;
+use serde_json::Value;
+use tokio::task::JoinError;
+use tokio_tungstenite::tungstenite::Message;
+use tokio_tungstenite::WebSocketStream;
+
+pub use self::model::DatabaseModel;
+use self::util::WithRevision;
+use crate::context::RpcContext;
+use crate::{Error, ResultExt};
+
+async fn ws_handler<
+ WSFut: Future