mirror of
https://github.com/Start9Labs/patch-db.git
synced 2026-03-26 02:11:54 +00:00
subscribe to subtrees
This commit is contained in:
2
json-ptr
2
json-ptr
Submodule json-ptr updated: 963405175a...17ea820abb
@@ -14,16 +14,15 @@ mod subscriber;
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test;
|
mod test;
|
||||||
|
|
||||||
pub use imbl_value as value;
|
|
||||||
pub use imbl_value::Value;
|
pub use imbl_value::Value;
|
||||||
pub use model::{HasModel, Model, ModelExt};
|
pub use model::{HasModel, Model, ModelExt};
|
||||||
pub use patch::{DiffPatch, Dump, Revision};
|
pub use patch::{DiffPatch, Dump, Revision};
|
||||||
pub use patch_db_macro::HasModel;
|
pub use patch_db_macro::HasModel;
|
||||||
pub use store::{PatchDb, Store};
|
pub use store::{PatchDb, Store};
|
||||||
use tokio::sync::TryLockError;
|
use tokio::sync::TryLockError;
|
||||||
pub use {json_patch, json_ptr};
|
pub use {imbl_value as value, json_patch, json_ptr};
|
||||||
|
|
||||||
pub type Subscriber = tokio::sync::mpsc::UnboundedReceiver<Arc<Revision>>;
|
pub type Subscriber = tokio::sync::mpsc::UnboundedReceiver<Revision>;
|
||||||
|
|
||||||
#[derive(Error, Debug)]
|
#[derive(Error, Debug)]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
|
|||||||
@@ -83,12 +83,14 @@ impl<T, M: Model<T>> ModelExt<T> for M {}
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use crate as patch_db;
|
|
||||||
use crate::model::sealed::ModelMarker;
|
|
||||||
use imbl_value::{from_value, json, to_value, Value};
|
|
||||||
use serde::{de::DeserializeOwned, Serialize};
|
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
|
|
||||||
|
use imbl_value::{from_value, json, to_value, Value};
|
||||||
|
use serde::de::DeserializeOwned;
|
||||||
|
use serde::Serialize;
|
||||||
|
|
||||||
|
use crate as patch_db;
|
||||||
|
|
||||||
/// &mut Model<T> <=> &mut Value
|
/// &mut Model<T> <=> &mut Value
|
||||||
#[repr(transparent)]
|
#[repr(transparent)]
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@@ -181,7 +183,7 @@ mod test {
|
|||||||
mutate_fn(&mut model);
|
mutate_fn(&mut model);
|
||||||
mutate_fn(&mut model);
|
mutate_fn(&mut model);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
model.as_value(),
|
crate::model::sealed::ModelMarker::as_value(&model),
|
||||||
&json!({
|
&json!({
|
||||||
"a": {
|
"a": {
|
||||||
"b": "Replaced"
|
"b": "Replaced"
|
||||||
|
|||||||
@@ -1,10 +1,10 @@
|
|||||||
|
use std::collections::BTreeSet;
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
|
|
||||||
use imbl_value::Value;
|
use imbl_value::Value;
|
||||||
use json_patch::{AddOperation, Patch, PatchOperation, RemoveOperation, ReplaceOperation};
|
use json_patch::{AddOperation, Patch, PatchOperation, RemoveOperation, ReplaceOperation};
|
||||||
use json_ptr::{JsonPointer, SegList};
|
use json_ptr::{JsonPointer, SegList};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::collections::BTreeSet;
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||||
#[serde(rename_all = "kebab-case")]
|
#[serde(rename_all = "kebab-case")]
|
||||||
@@ -12,6 +12,14 @@ pub struct Revision {
|
|||||||
pub id: u64,
|
pub id: u64,
|
||||||
pub patch: DiffPatch,
|
pub patch: DiffPatch,
|
||||||
}
|
}
|
||||||
|
impl Revision {
|
||||||
|
pub fn for_path<S: AsRef<str>, V: SegList>(&self, ptr: &JsonPointer<S, V>) -> Revision {
|
||||||
|
Self {
|
||||||
|
id: self.id,
|
||||||
|
patch: self.patch.for_path(ptr),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||||
#[serde(rename_all = "kebab-case")]
|
#[serde(rename_all = "kebab-case")]
|
||||||
|
|||||||
@@ -31,7 +31,7 @@ pub struct Store {
|
|||||||
_lock: OwnedMutexGuard<()>,
|
_lock: OwnedMutexGuard<()>,
|
||||||
persistent: Value,
|
persistent: Value,
|
||||||
revision: u64,
|
revision: u64,
|
||||||
broadcast: Broadcast<Arc<Revision>>,
|
broadcast: Broadcast,
|
||||||
}
|
}
|
||||||
impl Store {
|
impl Store {
|
||||||
pub(crate) async fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
|
pub(crate) async fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
|
||||||
@@ -135,17 +135,17 @@ impl Store {
|
|||||||
) -> Result<T, Error> {
|
) -> Result<T, Error> {
|
||||||
Ok(imbl_value::from_value(self.get_value(ptr))?)
|
Ok(imbl_value::from_value(self.get_value(ptr))?)
|
||||||
}
|
}
|
||||||
pub(crate) fn dump(&self) -> Dump {
|
pub(crate) fn dump<S: AsRef<str>, V: SegList>(&self, ptr: &JsonPointer<S, V>) -> Dump {
|
||||||
Dump {
|
Dump {
|
||||||
id: self.revision,
|
id: self.revision,
|
||||||
value: self.persistent.clone(),
|
value: ptr.get(&self.persistent).cloned().unwrap_or(Value::Null),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub(crate) fn sequence(&self) -> u64 {
|
pub(crate) fn sequence(&self) -> u64 {
|
||||||
self.revision
|
self.revision
|
||||||
}
|
}
|
||||||
pub(crate) fn subscribe(&mut self) -> Subscriber {
|
pub(crate) fn subscribe(&mut self, ptr: JsonPointer) -> Subscriber {
|
||||||
self.broadcast.subscribe()
|
self.broadcast.subscribe(ptr)
|
||||||
}
|
}
|
||||||
pub(crate) async fn put_value<S: AsRef<str>, V: SegList>(
|
pub(crate) async fn put_value<S: AsRef<str>, V: SegList>(
|
||||||
&mut self,
|
&mut self,
|
||||||
@@ -264,19 +264,18 @@ impl PatchDb {
|
|||||||
store: Arc::new(RwLock::new(Store::open(path).await?)),
|
store: Arc::new(RwLock::new(Store::open(path).await?)),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
pub async fn dump(&self) -> Dump {
|
pub async fn dump<S: AsRef<str>, V: SegList>(&self, ptr: &JsonPointer<S, V>) -> Dump {
|
||||||
self.store.read().await.dump()
|
self.store.read().await.dump(ptr)
|
||||||
}
|
}
|
||||||
pub async fn sequence(&self) -> u64 {
|
pub async fn sequence(&self) -> u64 {
|
||||||
self.store.read().await.sequence()
|
self.store.read().await.sequence()
|
||||||
}
|
}
|
||||||
pub async fn dump_and_sub(&self) -> (Dump, Subscriber) {
|
pub async fn dump_and_sub(&self, ptr: JsonPointer) -> (Dump, Subscriber) {
|
||||||
let mut store = self.store.write().await;
|
let mut store = self.store.write().await;
|
||||||
let sub = store.broadcast.subscribe();
|
(store.dump(&ptr), store.broadcast.subscribe(ptr))
|
||||||
(store.dump(), sub)
|
|
||||||
}
|
}
|
||||||
pub async fn subscribe(&self) -> Subscriber {
|
pub async fn subscribe(&self, ptr: JsonPointer) -> Subscriber {
|
||||||
self.store.write().await.subscribe()
|
self.store.write().await.subscribe(ptr)
|
||||||
}
|
}
|
||||||
pub async fn exists<S: AsRef<str>, V: SegList>(&self, ptr: &JsonPointer<S, V>) -> bool {
|
pub async fn exists<S: AsRef<str>, V: SegList>(&self, ptr: &JsonPointer<S, V>) -> bool {
|
||||||
self.store.read().await.exists(ptr)
|
self.store.read().await.exists(ptr)
|
||||||
|
|||||||
@@ -1,25 +1,36 @@
|
|||||||
|
use json_ptr::JsonPointer;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
|
use crate::Revision;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Broadcast<T: Clone> {
|
struct ScopedSender(JsonPointer, mpsc::UnboundedSender<Revision>);
|
||||||
listeners: Vec<mpsc::UnboundedSender<T>>,
|
impl ScopedSender {
|
||||||
|
fn send(&self, revision: &Revision) -> Result<(), mpsc::error::SendError<Revision>> {
|
||||||
|
self.1.send(revision.for_path(&self.0))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
impl<T: Clone> Default for Broadcast<T> {
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Broadcast {
|
||||||
|
listeners: Vec<ScopedSender>,
|
||||||
|
}
|
||||||
|
impl Default for Broadcast {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
listeners: Vec::new(),
|
listeners: Vec::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
impl<T: Clone> Broadcast<T> {
|
impl Broadcast {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
Default::default()
|
Default::default()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn send(&mut self, value: &T) {
|
pub fn send(&mut self, value: &Revision) {
|
||||||
let mut i = 0;
|
let mut i = 0;
|
||||||
while i < self.listeners.len() {
|
while i < self.listeners.len() {
|
||||||
if self.listeners[i].send(value.clone()).is_err() {
|
if self.listeners[i].send(value).is_err() {
|
||||||
self.listeners.swap_remove(i);
|
self.listeners.swap_remove(i);
|
||||||
} else {
|
} else {
|
||||||
i += 1;
|
i += 1;
|
||||||
@@ -27,9 +38,9 @@ impl<T: Clone> Broadcast<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn subscribe(&mut self) -> mpsc::UnboundedReceiver<T> {
|
pub fn subscribe(&mut self, ptr: JsonPointer) -> mpsc::UnboundedReceiver<Revision> {
|
||||||
let (send, recv) = mpsc::unbounded_channel();
|
let (send, recv) = mpsc::unbounded_channel();
|
||||||
self.listeners.push(send);
|
self.listeners.push(ScopedSender(ptr, send));
|
||||||
recv
|
recv
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user