From c3d17bf8476bf312c08f7a7808526a0be93b3cc4 Mon Sep 17 00:00:00 2001 From: Aiden McClelland Date: Thu, 25 Jul 2024 12:26:49 -0600 Subject: [PATCH] fix sync_db middleware --- core/startos/src/context/rpc.rs | 4 ++- core/startos/src/db/mod.rs | 48 +++++++++++++++++++++++++++++-- core/startos/src/middleware/db.rs | 9 +++--- 3 files changed, 54 insertions(+), 7 deletions(-) diff --git a/core/startos/src/context/rpc.rs b/core/startos/src/context/rpc.rs index ba1ab3fbb..cea2059d3 100644 --- a/core/startos/src/context/rpc.rs +++ b/core/startos/src/context/rpc.rs @@ -11,7 +11,7 @@ use josekit::jwk::Jwk; use reqwest::{Client, Proxy}; use rpc_toolkit::yajrc::RpcError; use rpc_toolkit::{CallRemote, Context, Empty}; -use tokio::sync::{broadcast, Mutex, RwLock}; +use tokio::sync::{broadcast, watch, Mutex, RwLock}; use tokio::time::Instant; use tracing::instrument; @@ -43,6 +43,7 @@ pub struct RpcContextSeed { pub datadir: PathBuf, pub disk_guid: Arc, pub db: TypedPatchDb, + pub sync_db: watch::Sender, pub account: RwLock, pub net_controller: Arc, pub s9pk_arch: Option<&'static str>, @@ -212,6 +213,7 @@ impl RpcContext { find_eth_iface().await? }, disk_guid, + sync_db: watch::Sender::new(db.sequence().await), db, account: RwLock::new(account), net_controller, diff --git a/core/startos/src/db/mod.rs b/core/startos/src/db/mod.rs index e59161e9b..ef35bd30d 100644 --- a/core/startos/src/db/mod.rs +++ b/core/startos/src/db/mod.rs @@ -10,10 +10,12 @@ use clap::Parser; use imbl_value::InternedString; use itertools::Itertools; use patch_db::json_ptr::{JsonPointer, ROOT}; -use patch_db::{Dump, Revision}; +use patch_db::{DiffPatch, Dump, Revision}; use rpc_toolkit::yajrc::RpcError; use rpc_toolkit::{from_fn_async, Context, HandlerArgs, HandlerExt, ParentHandler}; use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc::{self, UnboundedReceiver}; +use tokio::sync::watch; use tracing::instrument; use ts_rs::TS; @@ -124,14 +126,56 @@ pub struct SubscribeRes { pub guid: Guid, } +struct DbSubscriber { + rev: u64, + sub: UnboundedReceiver, + sync_db: watch::Receiver, +} +impl DbSubscriber { + async fn recv(&mut self) -> Option { + loop { + tokio::select! { + rev = self.sub.recv() => { + if let Some(rev) = rev.as_ref() { + self.rev = rev.id; + } + return rev + } + _ = self.sync_db.changed() => { + let id = *self.sync_db.borrow(); + if id > self.rev { + match self.sub.try_recv() { + Ok(rev) => { + self.rev = rev.id; + return Some(rev) + } + Err(mpsc::error::TryRecvError::Disconnected) => { + return None + } + Err(mpsc::error::TryRecvError::Empty) => { + return Some(Revision { id, patch: DiffPatch::default() }) + } + } + } + } + } + } + } +} + pub async fn subscribe( ctx: RpcContext, SubscribeParams { pointer, session }: SubscribeParams, ) -> Result { - let (dump, mut sub) = ctx + let (dump, sub) = ctx .db .dump_and_sub(pointer.unwrap_or_else(|| PUBLIC.clone())) .await; + let mut sub = DbSubscriber { + rev: dump.id, + sub, + sync_db: ctx.sync_db.subscribe(), + }; let guid = Guid::new(); ctx.rpc_continuations .add( diff --git a/core/startos/src/middleware/db.rs b/core/startos/src/middleware/db.rs index e8b4f8887..4e5f0e037 100644 --- a/core/startos/src/middleware/db.rs +++ b/core/startos/src/middleware/db.rs @@ -36,10 +36,11 @@ impl Middleware for SyncDb { async fn process_http_response(&mut self, context: &RpcContext, response: &mut Response) { if let Err(e) = async { if self.sync_db { - response.headers_mut().append( - "X-Patch-Sequence", - HeaderValue::from_str(&context.db.sequence().await.to_string())?, - ); + let id = context.db.sequence().await; + response + .headers_mut() + .append("X-Patch-Sequence", HeaderValue::from_str(&id.to_string())?); + context.sync_db.send_replace(id); } Ok::<_, InvalidHeaderValue>(()) }