diff --git a/patch-db/src/lib.rs b/patch-db/src/lib.rs index 95dbfee..006973d 100644 --- a/patch-db/src/lib.rs +++ b/patch-db/src/lib.rs @@ -4,8 +4,6 @@ use std::sync::Arc; use json_ptr::JsonPointer; use thiserror::Error; -// note: inserting into an array (before another element) without proper locking can result in unexpected behaviour - mod model; mod patch; mod store; @@ -15,15 +13,14 @@ mod subscriber; mod test; pub use imbl_value::Value; -pub use model::{DestructureMut, HasModel, Model, ModelExt}; +pub use model::{DestructureMut, HasModel, Model, ModelExt, Pointer}; pub use patch::{DiffPatch, Dump, Revision}; pub use patch_db_macro::HasModel; pub use store::{MutateResult, PatchDb, Store, TypedPatchDb}; +pub use subscriber::{DbWatch, Subscriber, TypedDbWatch}; use tokio::sync::TryLockError; pub use {imbl_value as value, json_patch, json_ptr}; -pub type Subscriber = tokio::sync::mpsc::UnboundedReceiver; - #[derive(Error, Debug)] pub enum Error { #[error("IO Error: {0}")] diff --git a/patch-db/src/model.rs b/patch-db/src/model.rs index 3444aee..2a9dd30 100644 --- a/patch-db/src/model.rs +++ b/patch-db/src/model.rs @@ -1,4 +1,31 @@ +use std::marker::PhantomData; + use imbl_value::{InternedString, Value}; +use json_ptr::JsonPointer; + +pub struct Pointer { + ptr: JsonPointer, + phantom: PhantomData, +} +impl Default for Pointer { + fn default() -> Self { + Self { + ptr: JsonPointer::default(), + phantom: PhantomData, + } + } +} +impl Pointer { + pub fn unwrap(self) -> JsonPointer { + self.ptr + } +} +impl std::ops::Deref for Pointer { + type Target = JsonPointer; + fn deref(&self) -> &Self::Target { + &self.ptr + } +} pub trait HasModel: Sized { type Model: Model; diff --git a/patch-db/src/store.rs b/patch-db/src/store.rs index 201bf56..cb569da 100644 --- a/patch-db/src/store.rs +++ b/patch-db/src/store.rs @@ -20,7 +20,7 @@ use tokio::sync::{Mutex, OwnedMutexGuard, RwLock}; use crate::patch::{diff, DiffPatch, Dump, Revision}; use crate::subscriber::Broadcast; -use crate::{Error, HasModel, Subscriber}; +use crate::{DbWatch, Error, HasModel, Subscriber}; lazy_static! { static ref OPEN_STORES: Mutex>>> = Mutex::new(HashMap::new()); @@ -325,6 +325,10 @@ impl PatchDb { let mut store = self.store.write().await; (store.dump(&ptr), store.broadcast.subscribe(ptr)) } + pub async fn watch(&self, ptr: JsonPointer) -> DbWatch { + let (dump, sub) = self.dump_and_sub(ptr).await; + DbWatch::new(dump, sub) + } pub async fn subscribe(&self, ptr: JsonPointer) -> Subscriber { self.store.write().await.subscribe(ptr) } diff --git a/patch-db/src/subscriber.rs b/patch-db/src/subscriber.rs index f9aa9c2..c26e1f8 100644 --- a/patch-db/src/subscriber.rs +++ b/patch-db/src/subscriber.rs @@ -1,7 +1,15 @@ +use std::marker::PhantomData; +use std::task::{ready, Poll}; + +use futures::Stream; +use imbl_value::Value; +use json_patch::patch; use json_ptr::JsonPointer; use tokio::sync::mpsc; -use crate::Revision; +use crate::{Dump, Error, HasModel, ModelExt, Revision}; + +pub type Subscriber = mpsc::UnboundedReceiver; #[derive(Debug)] struct ScopedSender(JsonPointer, mpsc::UnboundedSender); @@ -42,9 +50,129 @@ impl Broadcast { } } - pub fn subscribe(&mut self, ptr: JsonPointer) -> mpsc::UnboundedReceiver { + pub fn subscribe(&mut self, ptr: JsonPointer) -> Subscriber { let (send, recv) = mpsc::unbounded_channel(); self.listeners.push(ScopedSender(ptr, send)); recv } } + +#[derive(Debug)] +pub struct DbWatch { + state: Value, + subscriber: Subscriber, + seen: bool, +} +impl DbWatch { + pub fn new(dump: Dump, sub: Subscriber) -> Self { + Self { + state: dump.value, + subscriber: sub, + seen: false, + } + } + pub fn typed(self) -> TypedDbWatch { + TypedDbWatch { + watch: self, + _phantom: PhantomData, + } + } + pub fn sync(&mut self) -> Result<(), Error> { + while let Ok(rev) = self.subscriber.try_recv() { + patch(&mut self.state, &rev.patch.0)?; + self.seen = false; + } + Ok(()) + } + pub fn peek(&mut self) -> Result { + self.sync()?; + Ok(self.state.clone()) + } + pub fn peek_and_mark_seen(&mut self) -> Result { + self.sync()?; + self.seen = true; + Ok(self.state.clone()) + } + pub fn poll_changed(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { + if !self.seen { + self.seen = true; + return Poll::Ready(Ok(())); + } + let rev = + ready!(self.subscriber.poll_recv(cx)).ok_or(mpsc::error::TryRecvError::Disconnected)?; + patch(&mut self.state, &rev.patch.0)?; + Poll::Ready(Ok(())) + } + pub async fn changed(&mut self) -> Result<(), Error> { + futures::future::poll_fn(|cx| self.poll_changed(cx)).await + } +} +impl Unpin for DbWatch {} +impl Stream for DbWatch { + type Item = Result; + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let this = self.get_mut(); + if let Err(e) = ready!(this.poll_changed(cx)) { + return Poll::Ready(Some(Err(e))); + } + Poll::Ready(Some(Ok(this.state.clone()))) + } +} + +pub struct TypedDbWatch { + watch: DbWatch, + _phantom: PhantomData, +} +impl AsRef for TypedDbWatch { + fn as_ref(&self) -> &DbWatch { + &self.watch + } +} +impl AsMut for TypedDbWatch { + fn as_mut(&mut self) -> &mut DbWatch { + &mut self.watch + } +} +impl TypedDbWatch { + pub fn untyped(self) -> DbWatch { + self.watch + } + pub fn sync(&mut self) -> Result<(), Error> { + self.as_mut().sync() + } + pub fn poll_changed(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { + self.as_mut().poll_changed(cx) + } + pub async fn changed(&mut self) -> Result<(), Error> { + self.as_mut().changed().await + } +} +impl TypedDbWatch { + pub fn peek(&mut self) -> Result { + let peek = self.as_mut().peek()?; + Ok(>::from_value(peek)) + } + pub fn peek_and_mark_seen(&mut self) -> Result { + let peek = self.as_mut().peek_and_mark_seen()?; + Ok(>::from_value(peek)) + } +} +impl Unpin for TypedDbWatch {} +impl Stream for TypedDbWatch { + type Item = Result; + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let this = self.get_mut(); + if let Err(e) = ready!(this.poll_changed(cx)) { + return Poll::Ready(Some(Err(e))); + } + Poll::Ready(Some(Ok(>::from_value( + this.watch.state.clone(), + )))) + } +}