ws subscription

This commit is contained in:
Aiden McClelland
2021-08-03 15:49:40 -06:00
committed by Aiden McClelland
parent e1c123c4e3
commit 0d9101841c
5 changed files with 286 additions and 5 deletions

79
appmgr/Cargo.lock generated
View File

@@ -791,6 +791,7 @@ dependencies = [
"futures",
"git-version",
"http",
"hyper-ws-listener",
"indexmap",
"itertools 0.10.1",
"jsonpath_lib",
@@ -823,6 +824,7 @@ dependencies = [
"tokio-compat-02",
"tokio-stream",
"tokio-tar",
"tokio-tungstenite",
"tokio-util",
"toml",
"torut",
@@ -1278,6 +1280,22 @@ dependencies = [
"tokio-native-tls",
]
[[package]]
name = "hyper-ws-listener"
version = "0.1.0"
source = "git+https://github.com/Start9Labs/hyper-ws-listener.git?branch=main#d5db3698d61293375384a5d8aa980c834d45028a"
dependencies = [
"anyhow",
"base64 0.13.0",
"env_logger",
"futures",
"hyper",
"log",
"sha-1",
"tokio 1.8.1",
"tokio-tungstenite",
]
[[package]]
name = "hyperlocal"
version = "0.8.0"
@@ -1319,6 +1337,15 @@ dependencies = [
"serde",
]
[[package]]
name = "input_buffer"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f97967975f448f1a7ddb12b0bc41069d09ed6a1c161a92687e057325db35d413"
dependencies = [
"bytes 1.0.1",
]
[[package]]
name = "instant"
version = "0.1.10"
@@ -2501,6 +2528,19 @@ dependencies = [
"yaml-rust",
]
[[package]]
name = "sha-1"
version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c4cfa741c5832d0ef7fab46cabed29c2aae926db0b11bb2069edd8db5e64e16"
dependencies = [
"block-buffer 0.9.0",
"cfg-if 1.0.0",
"cpufeatures",
"digest 0.9.0",
"opaque-debug 0.3.0",
]
[[package]]
name = "sha1"
version = "0.6.0"
@@ -3137,6 +3177,19 @@ dependencies = [
"xattr",
]
[[package]]
name = "tokio-tungstenite"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e96bb520beab540ab664bd5a9cfeaa1fcd846fa68c830b42e2c8963071251d2"
dependencies = [
"futures-util",
"log",
"pin-project",
"tokio 1.8.1",
"tungstenite",
]
[[package]]
name = "tokio-util"
version = "0.6.7"
@@ -3223,6 +3276,26 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
[[package]]
name = "tungstenite"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fe8dada8c1a3aeca77d6b51a4f1314e0f4b8e438b7b1b71e3ddaca8080e4093"
dependencies = [
"base64 0.13.0",
"byteorder",
"bytes 1.0.1",
"http",
"httparse",
"input_buffer",
"log",
"rand 0.8.3",
"sha-1",
"thiserror",
"url",
"utf-8",
]
[[package]]
name = "typed-builder"
version = "0.9.0"
@@ -3307,6 +3380,12 @@ dependencies = [
"serde",
]
[[package]]
name = "utf-8"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]]
name = "vcpkg"
version = "0.2.15"

View File

@@ -60,6 +60,7 @@ emver = { version = "0.1.2", features = ["serde"] }
futures = "0.3.8"
git-version = "0.3.4"
http = "0.2.3"
hyper-ws-listener = { git = "https://github.com/Start9Labs/hyper-ws-listener.git", branch = "main" }
indexmap = { version = "1.6.2", features = ["serde"] }
itertools = "0.10.0"
jsonpath_lib = "0.3.0"
@@ -98,6 +99,7 @@ tokio = { version = "1.8.1", features = ["full"] }
tokio-compat-02 = "0.2.0"
tokio-stream = { version = "0.1.5", features = ["io-util", "sync"] }
tokio-tar = "0.3.0"
tokio-tungstenite = "0.14.0"
tokio-util = { version = "0.6.6", features = ["io"] }
torut = "0.1.9"
typed-builder = "0.9.0"

View File

@@ -1,23 +1,32 @@
use std::path::Path;
use std::time::Duration;
use anyhow::anyhow;
use embassy::context::{EitherContext, RpcContext};
use embassy::db::model::Database;
use embassy::db::subscribe;
use embassy::middleware::auth::auth;
use embassy::middleware::cors::cors;
use embassy::status::{check_all, synchronize_all};
use embassy::util::daemon;
use embassy::{Error, ErrorKind};
use embassy::{Error, ErrorKind, ResultExt};
use futures::TryFutureExt;
use patch_db::json_ptr::JsonPointer;
use rpc_toolkit::hyper::StatusCode;
use rpc_toolkit::hyper::{Body, Response, Server, StatusCode};
use rpc_toolkit::rpc_server;
use rpc_toolkit::rpc_server_helpers::DynMiddleware;
fn status_fn(_: i32) -> StatusCode {
StatusCode::OK
}
fn err_to_500(e: Error) -> Response<Body> {
log::error!("{}", e);
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::empty())
.unwrap()
}
async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> {
let rpc_ctx = RpcContext::init(cfg_path).await?;
if !rpc_ctx.db.exists(&<JsonPointer>::default()).await? {
@@ -37,6 +46,51 @@ async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> {
auth,
]
});
let rev_cache_ctx = rpc_ctx.clone();
let revision_cache_task = tokio::spawn(async move {
let mut sub = rev_cache_ctx.db.subscribe();
loop {
let rev = match sub.recv().await {
Ok(a) => a,
Err(_) => {
rev_cache_ctx.revision_cache.write().await.truncate(0);
continue;
}
}; // TODO: handle falling behind
let mut cache = rev_cache_ctx.revision_cache.write().await;
cache.push_back(rev);
if cache.len() > rev_cache_ctx.revision_cache_size {
cache.pop_front();
}
}
});
let ws_ctx = rpc_ctx.clone();
let ws_server = {
let builder = Server::bind(&ws_ctx.bind_ws);
let make_svc = ::rpc_toolkit::hyper::service::make_service_fn(move |_| {
let ctx = ws_ctx.clone();
async move {
Ok::<_, ::rpc_toolkit::hyper::Error>(::rpc_toolkit::hyper::service::service_fn(
move |req| {
let ctx = ctx.clone();
async move {
match req.uri().path() {
"/db" => Ok(subscribe(ctx, req).await.unwrap_or_else(err_to_500)),
_ => Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::empty()),
}
}
},
))
}
});
builder.serve(make_svc)
};
let status_ctx = rpc_ctx.clone();
let status_daemon = daemon(
move || {
@@ -69,6 +123,11 @@ async fn inner_main(cfg_path: Option<&str>) -> Result<(), Error> {
);
futures::try_join!(
server.map_err(|e| Error::new(e, ErrorKind::Network)),
revision_cache_task.map_err(|e| Error::new(
anyhow!("{}", e).context("Revision Cache daemon panicked!"),
ErrorKind::Unknown
)),
ws_server.map_err(|e| Error::new(e, ErrorKind::Network)),
status_daemon.map_err(|e| Error::new(
e.context("Status Sync daemon panicked!"),
ErrorKind::Unknown

View File

@@ -1,16 +1,18 @@
use std::collections::VecDeque;
use std::net::{IpAddr, SocketAddr};
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use bollard::Docker;
use patch_db::PatchDb;
use patch_db::{PatchDb, Revision};
use reqwest::Url;
use rpc_toolkit::url::Host;
use rpc_toolkit::Context;
use serde::Deserialize;
use sqlx::SqlitePool;
use tokio::fs::File;
use tokio::sync::RwLock;
use crate::manager::ManagerMap;
use crate::net::NetController;
@@ -25,6 +27,7 @@ pub struct RpcContextConfig {
pub tor_control: Option<SocketAddr>,
pub db: Option<PathBuf>,
pub secret_store: Option<PathBuf>,
pub revision_cache_size: Option<usize>,
}
pub struct RpcContextSeed {
@@ -35,6 +38,8 @@ pub struct RpcContextSeed {
pub docker: Docker,
pub net_controller: Arc<NetController>,
pub managers: ManagerMap,
pub revision_cache_size: usize,
pub revision_cache: RwLock<VecDeque<Arc<Revision>>>,
}
#[derive(Clone)]
@@ -88,6 +93,8 @@ impl RpcContext {
docker,
net_controller,
managers,
revision_cache_size: base.revision_cache_size.unwrap_or(512),
revision_cache: RwLock::new(VecDeque::new()),
});
Ok(Self(seed))
}

View File

@@ -1,4 +1,138 @@
pub mod model;
pub mod util;
pub use model::DatabaseModel;
use std::future::Future;
use std::sync::Arc;
use futures::{SinkExt, StreamExt};
use patch_db::json_ptr::JsonPointer;
use patch_db::{DiffPatch, Dump, Revision};
use rpc_toolkit::command;
use rpc_toolkit::hyper::upgrade::Upgraded;
use rpc_toolkit::hyper::{Body, Error as HyperError, Request, Response};
use rpc_toolkit::yajrc::RpcError;
use serde::Serialize;
use serde_json::Value;
use tokio::task::JoinError;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::WebSocketStream;
pub use self::model::DatabaseModel;
use self::util::WithRevision;
use crate::context::RpcContext;
use crate::{Error, ResultExt};
async fn ws_handler<
WSFut: Future<Output = Result<Result<WebSocketStream<Upgraded>, HyperError>, JoinError>>,
>(
ctx: RpcContext,
ws_fut: WSFut,
) -> Result<(), Error> {
let (dump, mut sub) = ctx.db.dump_and_sub().await;
let mut stream = ws_fut
.await
.with_kind(crate::ErrorKind::Network)?
.with_kind(crate::ErrorKind::Unknown)?;
stream.next().await;
stream
.send(Message::Text(
rpc_toolkit::serde_json::to_string(&dump).with_kind(crate::ErrorKind::Serialization)?,
))
.await
.with_kind(crate::ErrorKind::Network)?;
loop {
let rev = sub.recv().await.with_kind(crate::ErrorKind::Database)?;
stream
.send(Message::Text(
rpc_toolkit::serde_json::to_string(&rev)
.with_kind(crate::ErrorKind::Serialization)?,
))
.await
.with_kind(crate::ErrorKind::Network)?;
}
}
pub async fn subscribe(ctx: RpcContext, req: Request<Body>) -> Result<Response<Body>, Error> {
let (res, ws_fut) = hyper_ws_listener::create_ws(req).with_kind(crate::ErrorKind::Network)?;
if let Some(ws_fut) = ws_fut {
tokio::task::spawn(async move {
match ws_handler(ctx, ws_fut).await {
Ok(()) => (),
Err(e) => log::error!("WebSocket Closed: {}", e),
}
});
}
Ok(res)
}
#[command(subcommands(revisions, dump, put, patch))]
pub fn db(#[context] ctx: RpcContext) -> Result<RpcContext, RpcError> {
Ok(ctx)
}
#[derive(Serialize)]
#[serde(untagged)]
pub enum RevisionsRes {
Revisions(Vec<Arc<Revision>>),
Dump(Dump),
}
#[command(rpc_only)]
pub async fn revisions(
#[context] ctx: RpcContext,
#[arg] since: u64,
) -> Result<RevisionsRes, RpcError> {
let cache = ctx.revision_cache.read().await;
if cache
.front()
.map(|rev| rev.id <= since + 1)
.unwrap_or(false)
{
Ok(RevisionsRes::Revisions(
cache
.iter()
.skip_while(|rev| rev.id < since + 1)
.cloned()
.collect(),
))
} else {
drop(cache);
Ok(RevisionsRes::Dump(ctx.db.dump().await))
}
}
#[command(rpc_only)]
pub async fn dump(#[context] ctx: RpcContext) -> Result<Dump, RpcError> {
Ok(ctx.db.dump().await)
}
#[command(subcommands(ui))]
pub fn put(#[context] ctx: RpcContext) -> Result<RpcContext, RpcError> {
Ok(ctx)
}
#[command(rpc_only)]
pub async fn ui(
#[context] ctx: RpcContext,
#[arg] pointer: JsonPointer,
#[arg] value: Value,
) -> Result<WithRevision<()>, RpcError> {
let ptr = "/ui".parse::<JsonPointer>()? + &pointer;
Ok(WithRevision {
response: (),
revision: ctx.db.put(&ptr, &value, None).await?,
})
}
#[command(rpc_only)]
pub async fn patch(
#[context] ctx: RpcContext,
#[arg] patch: DiffPatch,
) -> Result<WithRevision<()>, RpcError> {
Ok(WithRevision {
response: (),
revision: ctx.db.apply(patch, None, None).await?,
})
}