From 9ff57e33d3789c004fe680326034e34f57ea3452 Mon Sep 17 00:00:00 2001 From: Aiden McClelland Date: Mon, 10 May 2021 15:40:46 -0600 Subject: [PATCH] utility fns --- patch-db-macro-internals/src/lib.rs | 2 +- patch-db/Cargo.toml | 17 ++--- patch-db/src/handle.rs | 24 +++++++ patch-db/src/lib.rs | 3 + patch-db/src/model.rs | 105 +++++++++++++++++++++++----- patch-db/src/patch.rs | 21 ++++++ patch-db/src/store.rs | 16 +++++ patch-db/src/transaction.rs | 18 +++++ 8 files changed, 181 insertions(+), 25 deletions(-) diff --git a/patch-db-macro-internals/src/lib.rs b/patch-db-macro-internals/src/lib.rs index 5dad4d2..fef8cf4 100644 --- a/patch-db-macro-internals/src/lib.rs +++ b/patch-db-macro-internals/src/lib.rs @@ -247,7 +247,7 @@ fn build_model_struct( } impl #model_name { #( - pub fn #child_fn_name(&self) -> #child_model { + pub fn #child_fn_name(self) -> #child_model { self.0.child(#child_path).into() } )* diff --git a/patch-db/Cargo.toml b/patch-db/Cargo.toml index ba92938..d637d55 100644 --- a/patch-db/Cargo.toml +++ b/patch-db/Cargo.toml @@ -1,14 +1,14 @@ [package] -name = "patch-db" -version = "0.1.0" authors = ["Aiden McClelland "] -edition = "2018" categories = ["database-implementations"] -keywords = ["json", "json-pointer", "json-patch"] description = "A database that tracks state updates as RFC 6902 JSON Patches" -repository = "https://github.com/Start9Labs/patch-db" +edition = "2018" +keywords = ["json", "json-pointer", "json-patch"] license = "MIT" +name = "patch-db" readme = "README.md" +repository = "https://github.com/Start9Labs/patch-db" +version = "0.1.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -16,17 +16,18 @@ readme = "README.md" async-trait = "0.1.42" fd-lock-rs = "0.1.3" futures = "0.3.8" +hashlink = "0.6.0" json-patch = { path = "../json-patch" } json-ptr = { path = "../json-ptr" } +lazy_static = "1.4.0" nix = "0.20.0" +patch-db-macro = { path = "../patch-db-macro" } qutex-2 = { path = "../qutex" } serde = { version = "1.0.118", features = ["rc"] } -serde_json = "1.0.61" serde_cbor = { path = "../cbor" } +serde_json = "1.0.61" thiserror = "1.0.23" tokio = { version = "1.0.1", features = ["sync", "fs", "rt", "io-util", "macros"] } -patch-db-macro = { path = "../patch-db-macro" } -lazy_static = "1.4.0" [dev-dependencies] proptest = "1.0.0" diff --git a/patch-db/src/handle.rs b/patch-db/src/handle.rs index 421abdd..d64484f 100644 --- a/patch-db/src/handle.rs +++ b/patch-db/src/handle.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use async_trait::async_trait; +use hashlink::LinkedHashSet; use json_ptr::{JsonPointer, SegList}; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -24,6 +25,11 @@ pub trait DbHandle: Sized + Send + Sync { ptr: &JsonPointer, store_read_lock: Option>, ) -> Result; + async fn keys + Send + Sync, V: SegList + Send + Sync>( + &mut self, + ptr: &JsonPointer, + store_read_lock: Option>, + ) -> Result, Error>; async fn get_value + Send + Sync, V: SegList + Send + Sync>( &mut self, ptr: &JsonPointer, @@ -93,6 +99,13 @@ impl DbHandle for &mut Handle { ) -> Result { (*self).exists(ptr, store_read_lock).await } + async fn keys + Send + Sync, V: SegList + Send + Sync>( + &mut self, + ptr: &JsonPointer, + store_read_lock: Option>, + ) -> Result, Error> { + (*self).keys(ptr, store_read_lock).await + } async fn get_value + Send + Sync, V: SegList + Send + Sync>( &mut self, ptr: &JsonPointer, @@ -178,6 +191,17 @@ impl DbHandle for PatchDbHandle { self.db.exists(ptr).await } } + async fn keys + Send + Sync, V: SegList + Send + Sync>( + &mut self, + ptr: &JsonPointer, + store_read_lock: Option>, + ) -> Result, Error> { + if let Some(lock) = store_read_lock { + lock.keys(ptr) + } else { + self.db.keys(ptr).await + } + } async fn get_value + Send + Sync, V: SegList + Send + Sync>( &mut self, ptr: &JsonPointer, diff --git a/patch-db/src/lib.rs b/patch-db/src/lib.rs index 0d1501f..3c61e4f 100644 --- a/patch-db/src/lib.rs +++ b/patch-db/src/lib.rs @@ -1,6 +1,7 @@ use std::io::Error as IOError; use std::sync::Arc; +use json_ptr::JsonPointer; use thiserror::Error; use tokio::sync::broadcast::error::TryRecvError; @@ -47,4 +48,6 @@ pub enum Error { CacheCorrupted(Arc), #[error("Subscriber Error: {0}")] Subscriber(#[from] TryRecvError), + #[error("Node Does Not Exist: {0}")] + NodeDoesNotExist(JsonPointer), } diff --git a/patch-db/src/model.rs b/patch-db/src/model.rs index 083b039..01467c9 100644 --- a/patch-db/src/model.rs +++ b/patch-db/src/model.rs @@ -3,6 +3,7 @@ use std::hash::Hash; use std::marker::PhantomData; use std::ops::{Deref, DerefMut}; +use hashlink::LinkedHashSet; use json_ptr::JsonPointer; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -75,8 +76,8 @@ where }) } - pub fn child Deserialize<'de>>(&self, index: &str) -> Model { - let mut ptr = self.ptr.clone(); + pub fn child Deserialize<'de>>(self, index: &str) -> Model { + let mut ptr = self.ptr; ptr.push_end(index); Model { ptr, @@ -133,11 +134,18 @@ where } pub trait HasModel: Serialize + for<'de> Deserialize<'de> { - type Model: From - + AsRef - + Into - + From> - + Clone; + type Model: ModelFor; +} + +pub trait ModelFor Deserialize<'de>>: + From + AsRef + Into + From> + Clone +{ +} +impl< + T: Serialize + for<'de> Deserialize<'de>, + U: From + AsRef + Into + From> + Clone, + > ModelFor for U +{ } #[derive(Debug)] @@ -186,11 +194,53 @@ impl Deserialize<'de>> HasModel for Box { #[derive(Debug)] pub struct OptionModel Deserialize<'de>>(T::Model); impl Deserialize<'de>> OptionModel { + pub async fn lock(&self, db: &mut Db, lock: LockType) { + db.lock(self.0.as_ref(), lock).await + } + + pub async fn get(&self, db: &mut Db) -> Result>, Error> { + self.lock(db, LockType::Read).await; + Ok(ModelData(db.get(self.0.as_ref()).await?)) + } + + pub async fn get_mut(&self, db: &mut Db) -> Result, Error> { + self.lock(db, LockType::Write).await; + let original = db.get_value(self.0.as_ref(), None).await?; + let current = serde_json::from_value(original.clone())?; + Ok(ModelDataMut { + original, + current, + ptr: self.0.clone().into(), + }) + } + pub async fn exists(&self, db: &mut Db) -> Result { - db.lock(self.as_ref(), LockType::Read).await; + self.lock(db, LockType::Read).await; Ok(db.exists(&self.as_ref(), None).await?) } + pub fn map< + F: FnOnce(T::Model) -> V, + U: Serialize + for<'de> Deserialize<'de>, + V: ModelFor, + >( + self, + f: F, + ) -> Model> { + Into::::into(f(self.0)).into() + } + + pub fn and_then< + F: FnOnce(T::Model) -> V, + U: Serialize + for<'de> Deserialize<'de>, + V: ModelFor>, + >( + self, + f: F, + ) -> V { + Into::::into(f(self.0)).into() + } + pub async fn check(self, db: &mut Db) -> Result, Error> { Ok(if self.exists(db).await? { Some(self.0) @@ -199,6 +249,14 @@ impl Deserialize<'de>> OptionModel { }) } + pub async fn expect(self, db: &mut Db) -> Result { + if self.exists(db).await? { + Ok(self.0) + } else { + Err(Error::NodeDoesNotExist(self.0.into())) + } + } + pub async fn delete(&self, db: &mut Db) -> Result<(), Error> { db.lock(self.as_ref(), LockType::Write).await; db.put(self.as_ref(), &Value::Null).await @@ -256,13 +314,13 @@ impl Deserialize<'de>> Deref for VecModel { } } impl Deserialize<'de>> VecModel { - pub fn idx(&self, idx: usize) -> Model> { - self.child(&format!("{}", idx)) + pub fn idx(self, idx: usize) -> Model> { + self.0.child(&format!("{}", idx)) } } impl Deserialize<'de>> VecModel { - pub fn idx_model(&self, idx: usize) -> OptionModel { - self.child(&format!("{}", idx)).into() + pub fn idx_model(self, idx: usize) -> OptionModel { + self.0.child(&format!("{}", idx)).into() } } impl Deserialize<'de>> From>> for VecModel { @@ -350,8 +408,23 @@ where T: Serialize + for<'de> Deserialize<'de> + Map, T::Value: Serialize + for<'de> Deserialize<'de>, { - pub fn idx(&self, idx: &::Key) -> Model::Value>> { - self.child(idx.as_ref()) + pub fn idx(self, idx: &::Key) -> Model::Value>> { + self.0.child(idx.as_ref()) + } +} +impl MapModel +where + T: Serialize + for<'de> Deserialize<'de> + Map, + T::Key: Hash + Eq + for<'de> Deserialize<'de>, + T::Value: Serialize + for<'de> Deserialize<'de>, +{ + pub async fn keys(&self, db: &mut Db) -> Result, Error> { + db.lock(self.as_ref(), LockType::Read).await; + let set = db.keys(self.as_ref(), None).await?; + Ok(set + .into_iter() + .map(|s| serde_json::from_value(Value::String(s))) + .collect::>()?) } } impl MapModel @@ -359,8 +432,8 @@ where T: Serialize + for<'de> Deserialize<'de> + Map, T::Value: Serialize + for<'de> Deserialize<'de> + HasModel, { - pub fn idx_model(&self, idx: &::Key) -> OptionModel<::Value> { - self.child(idx.as_ref()).into() + pub fn idx_model(self, idx: &::Key) -> OptionModel<::Value> { + self.0.child(idx.as_ref()).into() } } impl From> for MapModel diff --git a/patch-db/src/patch.rs b/patch-db/src/patch.rs index 7ecb19b..e5e16da 100644 --- a/patch-db/src/patch.rs +++ b/patch-db/src/patch.rs @@ -1,5 +1,6 @@ use std::ops::Deref; +use hashlink::LinkedHashSet; use json_patch::{AddOperation, Patch, PatchOperation, RemoveOperation, ReplaceOperation}; use json_ptr::{JsonPointer, SegList}; use serde::{Deserialize, Serialize}; @@ -176,6 +177,26 @@ impl DiffPatch { } res } + + pub fn keys(&self, mut keys: LinkedHashSet) -> LinkedHashSet { + for op in &(self.0).0 { + match op { + PatchOperation::Add(a) => { + if a.path.len() == 1 { + keys.insert(a.path.get_segment(0).unwrap().to_owned()); + } + } + PatchOperation::Replace(_) => (), + PatchOperation::Remove(a) => { + if a.path.len() == 1 { + keys.remove(a.path.get_segment(0).unwrap()); + } + } + _ => unreachable!(), + } + } + keys + } } impl Default for DiffPatch { fn default() -> Self { diff --git a/patch-db/src/store.rs b/patch-db/src/store.rs index 6211fbe..fd69604 100644 --- a/patch-db/src/store.rs +++ b/patch-db/src/store.rs @@ -5,6 +5,7 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use fd_lock_rs::FdLock; +use hashlink::LinkedHashSet; use json_ptr::{JsonPointer, SegList}; use lazy_static::lazy_static; use qutex_2::{Guard, Qutex}; @@ -128,6 +129,15 @@ impl Store { ) -> Result { Ok(ptr.get(self.get_data()?).unwrap_or(&Value::Null) != &Value::Null) } + pub(crate) fn keys, V: SegList>( + &self, + ptr: &JsonPointer, + ) -> Result, Error> { + Ok(match ptr.get(self.get_data()?).unwrap_or(&Value::Null) { + Value::Object(o) => o.keys().cloned().collect(), + _ => LinkedHashSet::new(), + }) + } pub(crate) fn get Deserialize<'de>, S: AsRef, V: SegList>( &self, ptr: &JsonPointer, @@ -213,6 +223,12 @@ impl PatchDb { ) -> Result { self.store.read().await.exists(ptr) } + pub async fn keys, V: SegList>( + &self, + ptr: &JsonPointer, + ) -> Result, Error> { + self.store.read().await.keys(ptr) + } pub async fn get Deserialize<'de>, S: AsRef, V: SegList>( &self, ptr: &JsonPointer, diff --git a/patch-db/src/transaction.rs b/patch-db/src/transaction.rs index 7be7df2..b68e2d8 100644 --- a/patch-db/src/transaction.rs +++ b/patch-db/src/transaction.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use async_trait::async_trait; +use hashlink::LinkedHashSet; use json_ptr::{JsonPointer, SegList}; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -99,6 +100,23 @@ impl DbHandle for Transaction { }; Ok(self.updates.for_path(ptr).exists().unwrap_or(exists)) } + async fn keys + Send + Sync, V: SegList + Send + Sync>( + &mut self, + ptr: &JsonPointer, + store_read_lock: Option>, + ) -> Result, Error> { + let keys = { + let store_lock = self.parent.store(); + let store = if let Some(store_read_lock) = store_read_lock { + store_read_lock + } else { + store_lock.read().await + }; + self.rebase()?; + self.parent.keys(ptr, Some(store)).await? + }; + Ok(self.updates.for_path(ptr).keys(keys)) + } async fn get_value + Send + Sync, V: SegList + Send + Sync>( &mut self, ptr: &JsonPointer,