add DbWatch

This commit is contained in:
Aiden McClelland
2025-12-02 11:32:30 -07:00
parent 90b336d6a9
commit bdb5a10114
4 changed files with 164 additions and 8 deletions

View File

@@ -4,8 +4,6 @@ use std::sync::Arc;
use json_ptr::JsonPointer;
use thiserror::Error;
// note: inserting into an array (before another element) without proper locking can result in unexpected behaviour
mod model;
mod patch;
mod store;
@@ -15,15 +13,14 @@ mod subscriber;
mod test;
pub use imbl_value::Value;
pub use model::{DestructureMut, HasModel, Model, ModelExt};
pub use model::{DestructureMut, HasModel, Model, ModelExt, Pointer};
pub use patch::{DiffPatch, Dump, Revision};
pub use patch_db_macro::HasModel;
pub use store::{MutateResult, PatchDb, Store, TypedPatchDb};
pub use subscriber::{DbWatch, Subscriber, TypedDbWatch};
use tokio::sync::TryLockError;
pub use {imbl_value as value, json_patch, json_ptr};
pub type Subscriber = tokio::sync::mpsc::UnboundedReceiver<Revision>;
#[derive(Error, Debug)]
pub enum Error {
#[error("IO Error: {0}")]

View File

@@ -1,4 +1,31 @@
use std::marker::PhantomData;
use imbl_value::{InternedString, Value};
use json_ptr::JsonPointer;
pub struct Pointer<T> {
ptr: JsonPointer,
phantom: PhantomData<T>,
}
impl<T> Default for Pointer<T> {
fn default() -> Self {
Self {
ptr: JsonPointer::default(),
phantom: PhantomData,
}
}
}
impl<T> Pointer<T> {
pub fn unwrap(self) -> JsonPointer {
self.ptr
}
}
impl<T> std::ops::Deref for Pointer<T> {
type Target = JsonPointer;
fn deref(&self) -> &Self::Target {
&self.ptr
}
}
pub trait HasModel: Sized {
type Model: Model<Self>;

View File

@@ -20,7 +20,7 @@ use tokio::sync::{Mutex, OwnedMutexGuard, RwLock};
use crate::patch::{diff, DiffPatch, Dump, Revision};
use crate::subscriber::Broadcast;
use crate::{Error, HasModel, Subscriber};
use crate::{DbWatch, Error, HasModel, Subscriber};
lazy_static! {
static ref OPEN_STORES: Mutex<HashMap<PathBuf, Arc<Mutex<()>>>> = Mutex::new(HashMap::new());
@@ -325,6 +325,10 @@ impl PatchDb {
let mut store = self.store.write().await;
(store.dump(&ptr), store.broadcast.subscribe(ptr))
}
pub async fn watch(&self, ptr: JsonPointer) -> DbWatch {
let (dump, sub) = self.dump_and_sub(ptr).await;
DbWatch::new(dump, sub)
}
pub async fn subscribe(&self, ptr: JsonPointer) -> Subscriber {
self.store.write().await.subscribe(ptr)
}

View File

@@ -1,7 +1,15 @@
use std::marker::PhantomData;
use std::task::{ready, Poll};
use futures::Stream;
use imbl_value::Value;
use json_patch::patch;
use json_ptr::JsonPointer;
use tokio::sync::mpsc;
use crate::Revision;
use crate::{Dump, Error, HasModel, ModelExt, Revision};
pub type Subscriber = mpsc::UnboundedReceiver<Revision>;
#[derive(Debug)]
struct ScopedSender(JsonPointer, mpsc::UnboundedSender<Revision>);
@@ -42,9 +50,129 @@ impl Broadcast {
}
}
pub fn subscribe(&mut self, ptr: JsonPointer) -> mpsc::UnboundedReceiver<Revision> {
pub fn subscribe(&mut self, ptr: JsonPointer) -> Subscriber {
let (send, recv) = mpsc::unbounded_channel();
self.listeners.push(ScopedSender(ptr, send));
recv
}
}
#[derive(Debug)]
pub struct DbWatch {
state: Value,
subscriber: Subscriber,
seen: bool,
}
impl DbWatch {
pub fn new(dump: Dump, sub: Subscriber) -> Self {
Self {
state: dump.value,
subscriber: sub,
seen: false,
}
}
pub fn typed<T>(self) -> TypedDbWatch<T> {
TypedDbWatch {
watch: self,
_phantom: PhantomData,
}
}
pub fn sync(&mut self) -> Result<(), Error> {
while let Ok(rev) = self.subscriber.try_recv() {
patch(&mut self.state, &rev.patch.0)?;
self.seen = false;
}
Ok(())
}
pub fn peek(&mut self) -> Result<Value, Error> {
self.sync()?;
Ok(self.state.clone())
}
pub fn peek_and_mark_seen(&mut self) -> Result<Value, Error> {
self.sync()?;
self.seen = true;
Ok(self.state.clone())
}
pub fn poll_changed(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Error>> {
if !self.seen {
self.seen = true;
return Poll::Ready(Ok(()));
}
let rev =
ready!(self.subscriber.poll_recv(cx)).ok_or(mpsc::error::TryRecvError::Disconnected)?;
patch(&mut self.state, &rev.patch.0)?;
Poll::Ready(Ok(()))
}
pub async fn changed(&mut self) -> Result<(), Error> {
futures::future::poll_fn(|cx| self.poll_changed(cx)).await
}
}
impl Unpin for DbWatch {}
impl Stream for DbWatch {
type Item = Result<Value, Error>;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.get_mut();
if let Err(e) = ready!(this.poll_changed(cx)) {
return Poll::Ready(Some(Err(e)));
}
Poll::Ready(Some(Ok(this.state.clone())))
}
}
pub struct TypedDbWatch<T> {
watch: DbWatch,
_phantom: PhantomData<T>,
}
impl<T> AsRef<DbWatch> for TypedDbWatch<T> {
fn as_ref(&self) -> &DbWatch {
&self.watch
}
}
impl<T> AsMut<DbWatch> for TypedDbWatch<T> {
fn as_mut(&mut self) -> &mut DbWatch {
&mut self.watch
}
}
impl<T> TypedDbWatch<T> {
pub fn untyped(self) -> DbWatch {
self.watch
}
pub fn sync(&mut self) -> Result<(), Error> {
self.as_mut().sync()
}
pub fn poll_changed(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Error>> {
self.as_mut().poll_changed(cx)
}
pub async fn changed(&mut self) -> Result<(), Error> {
self.as_mut().changed().await
}
}
impl<T: HasModel> TypedDbWatch<T> {
pub fn peek(&mut self) -> Result<T::Model, Error> {
let peek = self.as_mut().peek()?;
Ok(<T::Model as ModelExt<T>>::from_value(peek))
}
pub fn peek_and_mark_seen(&mut self) -> Result<T::Model, Error> {
let peek = self.as_mut().peek_and_mark_seen()?;
Ok(<T::Model as ModelExt<T>>::from_value(peek))
}
}
impl<T> Unpin for TypedDbWatch<T> {}
impl<T: HasModel> Stream for TypedDbWatch<T> {
type Item = Result<T::Model, Error>;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.get_mut();
if let Err(e) = ready!(this.poll_changed(cx)) {
return Poll::Ready(Some(Err(e)));
}
Poll::Ready(Some(Ok(<T::Model as ModelExt<T>>::from_value(
this.watch.state.clone(),
))))
}
}