fix sync_db middleware

This commit is contained in:
Aiden McClelland
2024-07-25 12:26:49 -06:00
parent e04b93a51a
commit c3d17bf847
3 changed files with 54 additions and 7 deletions

View File

@@ -11,7 +11,7 @@ use josekit::jwk::Jwk;
use reqwest::{Client, Proxy}; use reqwest::{Client, Proxy};
use rpc_toolkit::yajrc::RpcError; use rpc_toolkit::yajrc::RpcError;
use rpc_toolkit::{CallRemote, Context, Empty}; use rpc_toolkit::{CallRemote, Context, Empty};
use tokio::sync::{broadcast, Mutex, RwLock}; use tokio::sync::{broadcast, watch, Mutex, RwLock};
use tokio::time::Instant; use tokio::time::Instant;
use tracing::instrument; use tracing::instrument;
@@ -43,6 +43,7 @@ pub struct RpcContextSeed {
pub datadir: PathBuf, pub datadir: PathBuf,
pub disk_guid: Arc<String>, pub disk_guid: Arc<String>,
pub db: TypedPatchDb<Database>, pub db: TypedPatchDb<Database>,
pub sync_db: watch::Sender<u64>,
pub account: RwLock<AccountInfo>, pub account: RwLock<AccountInfo>,
pub net_controller: Arc<NetController>, pub net_controller: Arc<NetController>,
pub s9pk_arch: Option<&'static str>, pub s9pk_arch: Option<&'static str>,
@@ -212,6 +213,7 @@ impl RpcContext {
find_eth_iface().await? find_eth_iface().await?
}, },
disk_guid, disk_guid,
sync_db: watch::Sender::new(db.sequence().await),
db, db,
account: RwLock::new(account), account: RwLock::new(account),
net_controller, net_controller,

View File

@@ -10,10 +10,12 @@ use clap::Parser;
use imbl_value::InternedString; use imbl_value::InternedString;
use itertools::Itertools; use itertools::Itertools;
use patch_db::json_ptr::{JsonPointer, ROOT}; use patch_db::json_ptr::{JsonPointer, ROOT};
use patch_db::{Dump, Revision}; use patch_db::{DiffPatch, Dump, Revision};
use rpc_toolkit::yajrc::RpcError; use rpc_toolkit::yajrc::RpcError;
use rpc_toolkit::{from_fn_async, Context, HandlerArgs, HandlerExt, ParentHandler}; use rpc_toolkit::{from_fn_async, Context, HandlerArgs, HandlerExt, ParentHandler};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::{self, UnboundedReceiver};
use tokio::sync::watch;
use tracing::instrument; use tracing::instrument;
use ts_rs::TS; use ts_rs::TS;
@@ -124,14 +126,56 @@ pub struct SubscribeRes {
pub guid: Guid, pub guid: Guid,
} }
struct DbSubscriber {
rev: u64,
sub: UnboundedReceiver<Revision>,
sync_db: watch::Receiver<u64>,
}
impl DbSubscriber {
async fn recv(&mut self) -> Option<Revision> {
loop {
tokio::select! {
rev = self.sub.recv() => {
if let Some(rev) = rev.as_ref() {
self.rev = rev.id;
}
return rev
}
_ = self.sync_db.changed() => {
let id = *self.sync_db.borrow();
if id > self.rev {
match self.sub.try_recv() {
Ok(rev) => {
self.rev = rev.id;
return Some(rev)
}
Err(mpsc::error::TryRecvError::Disconnected) => {
return None
}
Err(mpsc::error::TryRecvError::Empty) => {
return Some(Revision { id, patch: DiffPatch::default() })
}
}
}
}
}
}
}
}
pub async fn subscribe( pub async fn subscribe(
ctx: RpcContext, ctx: RpcContext,
SubscribeParams { pointer, session }: SubscribeParams, SubscribeParams { pointer, session }: SubscribeParams,
) -> Result<SubscribeRes, Error> { ) -> Result<SubscribeRes, Error> {
let (dump, mut sub) = ctx let (dump, sub) = ctx
.db .db
.dump_and_sub(pointer.unwrap_or_else(|| PUBLIC.clone())) .dump_and_sub(pointer.unwrap_or_else(|| PUBLIC.clone()))
.await; .await;
let mut sub = DbSubscriber {
rev: dump.id,
sub,
sync_db: ctx.sync_db.subscribe(),
};
let guid = Guid::new(); let guid = Guid::new();
ctx.rpc_continuations ctx.rpc_continuations
.add( .add(

View File

@@ -36,10 +36,11 @@ impl Middleware<RpcContext> for SyncDb {
async fn process_http_response(&mut self, context: &RpcContext, response: &mut Response) { async fn process_http_response(&mut self, context: &RpcContext, response: &mut Response) {
if let Err(e) = async { if let Err(e) = async {
if self.sync_db { if self.sync_db {
response.headers_mut().append( let id = context.db.sequence().await;
"X-Patch-Sequence", response
HeaderValue::from_str(&context.db.sequence().await.to_string())?, .headers_mut()
); .append("X-Patch-Sequence", HeaderValue::from_str(&id.to_string())?);
context.sync_db.send_replace(id);
} }
Ok::<_, InvalidHeaderValue>(()) Ok::<_, InvalidHeaderValue>(())
} }