From 899c63afb5367c66bca6cafb98164d8cbd78f7e3 Mon Sep 17 00:00:00 2001 From: Aiden McClelland Date: Mon, 19 Feb 2024 13:46:09 -0700 Subject: [PATCH] subscribe to subtrees --- json-ptr | 2 +- patch-db/src/lib.rs | 5 ++--- patch-db/src/model.rs | 12 +++++++----- patch-db/src/patch.rs | 10 +++++++++- patch-db/src/store.rs | 23 +++++++++++------------ patch-db/src/subscriber.rs | 27 +++++++++++++++++++-------- 6 files changed, 49 insertions(+), 30 deletions(-) diff --git a/json-ptr b/json-ptr index 9634051..17ea820 160000 --- a/json-ptr +++ b/json-ptr @@ -1 +1 @@ -Subproject commit 963405175ac72b94c93043b8da326527824887dd +Subproject commit 17ea820abb3d6078ea9ecc853674bf4f7abcbdea diff --git a/patch-db/src/lib.rs b/patch-db/src/lib.rs index c85d675..1bf595a 100644 --- a/patch-db/src/lib.rs +++ b/patch-db/src/lib.rs @@ -14,16 +14,15 @@ mod subscriber; #[cfg(test)] mod test; -pub use imbl_value as value; pub use imbl_value::Value; pub use model::{HasModel, Model, ModelExt}; pub use patch::{DiffPatch, Dump, Revision}; pub use patch_db_macro::HasModel; pub use store::{PatchDb, Store}; use tokio::sync::TryLockError; -pub use {json_patch, json_ptr}; +pub use {imbl_value as value, json_patch, json_ptr}; -pub type Subscriber = tokio::sync::mpsc::UnboundedReceiver>; +pub type Subscriber = tokio::sync::mpsc::UnboundedReceiver; #[derive(Error, Debug)] pub enum Error { diff --git a/patch-db/src/model.rs b/patch-db/src/model.rs index 4fa7d33..0a68cdd 100644 --- a/patch-db/src/model.rs +++ b/patch-db/src/model.rs @@ -83,12 +83,14 @@ impl> ModelExt for M {} #[cfg(test)] mod test { - use crate as patch_db; - use crate::model::sealed::ModelMarker; - use imbl_value::{from_value, json, to_value, Value}; - use serde::{de::DeserializeOwned, Serialize}; use std::marker::PhantomData; + use imbl_value::{from_value, json, to_value, Value}; + use serde::de::DeserializeOwned; + use serde::Serialize; + + use crate as patch_db; + /// &mut Model <=> &mut Value #[repr(transparent)] #[derive(Debug)] @@ -181,7 +183,7 @@ mod test { mutate_fn(&mut model); mutate_fn(&mut model); assert_eq!( - model.as_value(), + crate::model::sealed::ModelMarker::as_value(&model), &json!({ "a": { "b": "Replaced" diff --git a/patch-db/src/patch.rs b/patch-db/src/patch.rs index 19111cb..bfaef46 100644 --- a/patch-db/src/patch.rs +++ b/patch-db/src/patch.rs @@ -1,10 +1,10 @@ +use std::collections::BTreeSet; use std::ops::Deref; use imbl_value::Value; use json_patch::{AddOperation, Patch, PatchOperation, RemoveOperation, ReplaceOperation}; use json_ptr::{JsonPointer, SegList}; use serde::{Deserialize, Serialize}; -use std::collections::BTreeSet; #[derive(Debug, Clone, Deserialize, Serialize)] #[serde(rename_all = "kebab-case")] @@ -12,6 +12,14 @@ pub struct Revision { pub id: u64, pub patch: DiffPatch, } +impl Revision { + pub fn for_path, V: SegList>(&self, ptr: &JsonPointer) -> Revision { + Self { + id: self.id, + patch: self.patch.for_path(ptr), + } + } +} #[derive(Debug, Clone, Deserialize, Serialize)] #[serde(rename_all = "kebab-case")] diff --git a/patch-db/src/store.rs b/patch-db/src/store.rs index 61b5283..9a12c91 100644 --- a/patch-db/src/store.rs +++ b/patch-db/src/store.rs @@ -31,7 +31,7 @@ pub struct Store { _lock: OwnedMutexGuard<()>, persistent: Value, revision: u64, - broadcast: Broadcast>, + broadcast: Broadcast, } impl Store { pub(crate) async fn open>(path: P) -> Result { @@ -135,17 +135,17 @@ impl Store { ) -> Result { Ok(imbl_value::from_value(self.get_value(ptr))?) } - pub(crate) fn dump(&self) -> Dump { + pub(crate) fn dump, V: SegList>(&self, ptr: &JsonPointer) -> Dump { Dump { id: self.revision, - value: self.persistent.clone(), + value: ptr.get(&self.persistent).cloned().unwrap_or(Value::Null), } } pub(crate) fn sequence(&self) -> u64 { self.revision } - pub(crate) fn subscribe(&mut self) -> Subscriber { - self.broadcast.subscribe() + pub(crate) fn subscribe(&mut self, ptr: JsonPointer) -> Subscriber { + self.broadcast.subscribe(ptr) } pub(crate) async fn put_value, V: SegList>( &mut self, @@ -264,19 +264,18 @@ impl PatchDb { store: Arc::new(RwLock::new(Store::open(path).await?)), }) } - pub async fn dump(&self) -> Dump { - self.store.read().await.dump() + pub async fn dump, V: SegList>(&self, ptr: &JsonPointer) -> Dump { + self.store.read().await.dump(ptr) } pub async fn sequence(&self) -> u64 { self.store.read().await.sequence() } - pub async fn dump_and_sub(&self) -> (Dump, Subscriber) { + pub async fn dump_and_sub(&self, ptr: JsonPointer) -> (Dump, Subscriber) { let mut store = self.store.write().await; - let sub = store.broadcast.subscribe(); - (store.dump(), sub) + (store.dump(&ptr), store.broadcast.subscribe(ptr)) } - pub async fn subscribe(&self) -> Subscriber { - self.store.write().await.subscribe() + pub async fn subscribe(&self, ptr: JsonPointer) -> Subscriber { + self.store.write().await.subscribe(ptr) } pub async fn exists, V: SegList>(&self, ptr: &JsonPointer) -> bool { self.store.read().await.exists(ptr) diff --git a/patch-db/src/subscriber.rs b/patch-db/src/subscriber.rs index bc04aa5..c30d802 100644 --- a/patch-db/src/subscriber.rs +++ b/patch-db/src/subscriber.rs @@ -1,25 +1,36 @@ +use json_ptr::JsonPointer; use tokio::sync::mpsc; +use crate::Revision; + #[derive(Debug)] -pub struct Broadcast { - listeners: Vec>, +struct ScopedSender(JsonPointer, mpsc::UnboundedSender); +impl ScopedSender { + fn send(&self, revision: &Revision) -> Result<(), mpsc::error::SendError> { + self.1.send(revision.for_path(&self.0)) + } } -impl Default for Broadcast { + +#[derive(Debug)] +pub struct Broadcast { + listeners: Vec, +} +impl Default for Broadcast { fn default() -> Self { Self { listeners: Vec::new(), } } } -impl Broadcast { +impl Broadcast { pub fn new() -> Self { Default::default() } - pub fn send(&mut self, value: &T) { + pub fn send(&mut self, value: &Revision) { let mut i = 0; while i < self.listeners.len() { - if self.listeners[i].send(value.clone()).is_err() { + if self.listeners[i].send(value).is_err() { self.listeners.swap_remove(i); } else { i += 1; @@ -27,9 +38,9 @@ impl Broadcast { } } - pub fn subscribe(&mut self) -> mpsc::UnboundedReceiver { + pub fn subscribe(&mut self, ptr: JsonPointer) -> mpsc::UnboundedReceiver { let (send, recv) = mpsc::unbounded_channel(); - self.listeners.push(send); + self.listeners.push(ScopedSender(ptr, send)); recv } }