utility fns

This commit is contained in:
Aiden McClelland
2021-05-10 15:40:46 -06:00
committed by Aiden McClelland
parent b74a54390e
commit 9ff57e33d3
8 changed files with 181 additions and 25 deletions

View File

@@ -247,7 +247,7 @@ fn build_model_struct(
}
impl #model_name {
#(
pub fn #child_fn_name(&self) -> #child_model {
pub fn #child_fn_name(self) -> #child_model {
self.0.child(#child_path).into()
}
)*

View File

@@ -1,14 +1,14 @@
[package]
name = "patch-db"
version = "0.1.0"
authors = ["Aiden McClelland <aiden@start9labs.com>"]
edition = "2018"
categories = ["database-implementations"]
keywords = ["json", "json-pointer", "json-patch"]
description = "A database that tracks state updates as RFC 6902 JSON Patches"
repository = "https://github.com/Start9Labs/patch-db"
edition = "2018"
keywords = ["json", "json-pointer", "json-patch"]
license = "MIT"
name = "patch-db"
readme = "README.md"
repository = "https://github.com/Start9Labs/patch-db"
version = "0.1.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -16,17 +16,18 @@ readme = "README.md"
async-trait = "0.1.42"
fd-lock-rs = "0.1.3"
futures = "0.3.8"
hashlink = "0.6.0"
json-patch = { path = "../json-patch" }
json-ptr = { path = "../json-ptr" }
lazy_static = "1.4.0"
nix = "0.20.0"
patch-db-macro = { path = "../patch-db-macro" }
qutex-2 = { path = "../qutex" }
serde = { version = "1.0.118", features = ["rc"] }
serde_json = "1.0.61"
serde_cbor = { path = "../cbor" }
serde_json = "1.0.61"
thiserror = "1.0.23"
tokio = { version = "1.0.1", features = ["sync", "fs", "rt", "io-util", "macros"] }
patch-db-macro = { path = "../patch-db-macro" }
lazy_static = "1.4.0"
[dev-dependencies]
proptest = "1.0.0"

View File

@@ -1,6 +1,7 @@
use std::sync::Arc;
use async_trait::async_trait;
use hashlink::LinkedHashSet;
use json_ptr::{JsonPointer, SegList};
use serde::{Deserialize, Serialize};
use serde_json::Value;
@@ -24,6 +25,11 @@ pub trait DbHandle: Sized + Send + Sync {
ptr: &JsonPointer<S, V>,
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
) -> Result<bool, Error>;
async fn keys<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
&mut self,
ptr: &JsonPointer<S, V>,
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
) -> Result<LinkedHashSet<String>, Error>;
async fn get_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
&mut self,
ptr: &JsonPointer<S, V>,
@@ -93,6 +99,13 @@ impl<Handle: DbHandle + Send + Sync> DbHandle for &mut Handle {
) -> Result<bool, Error> {
(*self).exists(ptr, store_read_lock).await
}
async fn keys<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
&mut self,
ptr: &JsonPointer<S, V>,
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
) -> Result<LinkedHashSet<String>, Error> {
(*self).keys(ptr, store_read_lock).await
}
async fn get_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
&mut self,
ptr: &JsonPointer<S, V>,
@@ -178,6 +191,17 @@ impl DbHandle for PatchDbHandle {
self.db.exists(ptr).await
}
}
async fn keys<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
&mut self,
ptr: &JsonPointer<S, V>,
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
) -> Result<LinkedHashSet<String>, Error> {
if let Some(lock) = store_read_lock {
lock.keys(ptr)
} else {
self.db.keys(ptr).await
}
}
async fn get_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
&mut self,
ptr: &JsonPointer<S, V>,

View File

@@ -1,6 +1,7 @@
use std::io::Error as IOError;
use std::sync::Arc;
use json_ptr::JsonPointer;
use thiserror::Error;
use tokio::sync::broadcast::error::TryRecvError;
@@ -47,4 +48,6 @@ pub enum Error {
CacheCorrupted(Arc<IOError>),
#[error("Subscriber Error: {0}")]
Subscriber(#[from] TryRecvError),
#[error("Node Does Not Exist: {0}")]
NodeDoesNotExist(JsonPointer),
}

View File

@@ -3,6 +3,7 @@ use std::hash::Hash;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use hashlink::LinkedHashSet;
use json_ptr::JsonPointer;
use serde::{Deserialize, Serialize};
use serde_json::Value;
@@ -75,8 +76,8 @@ where
})
}
pub fn child<C: Serialize + for<'de> Deserialize<'de>>(&self, index: &str) -> Model<C> {
let mut ptr = self.ptr.clone();
pub fn child<C: Serialize + for<'de> Deserialize<'de>>(self, index: &str) -> Model<C> {
let mut ptr = self.ptr;
ptr.push_end(index);
Model {
ptr,
@@ -133,11 +134,18 @@ where
}
pub trait HasModel: Serialize + for<'de> Deserialize<'de> {
type Model: From<JsonPointer>
+ AsRef<JsonPointer>
+ Into<JsonPointer>
+ From<Model<Self>>
+ Clone;
type Model: ModelFor<Self>;
}
pub trait ModelFor<T: Serialize + for<'de> Deserialize<'de>>:
From<JsonPointer> + AsRef<JsonPointer> + Into<JsonPointer> + From<Model<T>> + Clone
{
}
impl<
T: Serialize + for<'de> Deserialize<'de>,
U: From<JsonPointer> + AsRef<JsonPointer> + Into<JsonPointer> + From<Model<T>> + Clone,
> ModelFor<T> for U
{
}
#[derive(Debug)]
@@ -186,11 +194,53 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> HasModel for Box<T> {
#[derive(Debug)]
pub struct OptionModel<T: HasModel + Serialize + for<'de> Deserialize<'de>>(T::Model);
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
pub async fn lock<Db: DbHandle>(&self, db: &mut Db, lock: LockType) {
db.lock(self.0.as_ref(), lock).await
}
pub async fn get<Db: DbHandle>(&self, db: &mut Db) -> Result<ModelData<Option<T>>, Error> {
self.lock(db, LockType::Read).await;
Ok(ModelData(db.get(self.0.as_ref()).await?))
}
pub async fn get_mut<Db: DbHandle>(&self, db: &mut Db) -> Result<ModelDataMut<T>, Error> {
self.lock(db, LockType::Write).await;
let original = db.get_value(self.0.as_ref(), None).await?;
let current = serde_json::from_value(original.clone())?;
Ok(ModelDataMut {
original,
current,
ptr: self.0.clone().into(),
})
}
pub async fn exists<Db: DbHandle>(&self, db: &mut Db) -> Result<bool, Error> {
db.lock(self.as_ref(), LockType::Read).await;
self.lock(db, LockType::Read).await;
Ok(db.exists(&self.as_ref(), None).await?)
}
pub fn map<
F: FnOnce(T::Model) -> V,
U: Serialize + for<'de> Deserialize<'de>,
V: ModelFor<U>,
>(
self,
f: F,
) -> Model<Option<U>> {
Into::<JsonPointer>::into(f(self.0)).into()
}
pub fn and_then<
F: FnOnce(T::Model) -> V,
U: Serialize + for<'de> Deserialize<'de>,
V: ModelFor<Option<U>>,
>(
self,
f: F,
) -> V {
Into::<JsonPointer>::into(f(self.0)).into()
}
pub async fn check<Db: DbHandle>(self, db: &mut Db) -> Result<Option<T::Model>, Error> {
Ok(if self.exists(db).await? {
Some(self.0)
@@ -199,6 +249,14 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
})
}
pub async fn expect<Db: DbHandle>(self, db: &mut Db) -> Result<T::Model, Error> {
if self.exists(db).await? {
Ok(self.0)
} else {
Err(Error::NodeDoesNotExist(self.0.into()))
}
}
pub async fn delete<Db: DbHandle>(&self, db: &mut Db) -> Result<(), Error> {
db.lock(self.as_ref(), LockType::Write).await;
db.put(self.as_ref(), &Value::Null).await
@@ -256,13 +314,13 @@ impl<T: Serialize + for<'de> Deserialize<'de>> Deref for VecModel<T> {
}
}
impl<T: Serialize + for<'de> Deserialize<'de>> VecModel<T> {
pub fn idx(&self, idx: usize) -> Model<Option<T>> {
self.child(&format!("{}", idx))
pub fn idx(self, idx: usize) -> Model<Option<T>> {
self.0.child(&format!("{}", idx))
}
}
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> VecModel<T> {
pub fn idx_model(&self, idx: usize) -> OptionModel<T> {
self.child(&format!("{}", idx)).into()
pub fn idx_model(self, idx: usize) -> OptionModel<T> {
self.0.child(&format!("{}", idx)).into()
}
}
impl<T: Serialize + for<'de> Deserialize<'de>> From<Model<Vec<T>>> for VecModel<T> {
@@ -350,8 +408,23 @@ where
T: Serialize + for<'de> Deserialize<'de> + Map,
T::Value: Serialize + for<'de> Deserialize<'de>,
{
pub fn idx(&self, idx: &<T as Map>::Key) -> Model<Option<<T as Map>::Value>> {
self.child(idx.as_ref())
pub fn idx(self, idx: &<T as Map>::Key) -> Model<Option<<T as Map>::Value>> {
self.0.child(idx.as_ref())
}
}
impl<T> MapModel<T>
where
T: Serialize + for<'de> Deserialize<'de> + Map,
T::Key: Hash + Eq + for<'de> Deserialize<'de>,
T::Value: Serialize + for<'de> Deserialize<'de>,
{
pub async fn keys<Db: DbHandle>(&self, db: &mut Db) -> Result<LinkedHashSet<T::Key>, Error> {
db.lock(self.as_ref(), LockType::Read).await;
let set = db.keys(self.as_ref(), None).await?;
Ok(set
.into_iter()
.map(|s| serde_json::from_value(Value::String(s)))
.collect::<Result<_, _>>()?)
}
}
impl<T> MapModel<T>
@@ -359,8 +432,8 @@ where
T: Serialize + for<'de> Deserialize<'de> + Map,
T::Value: Serialize + for<'de> Deserialize<'de> + HasModel,
{
pub fn idx_model(&self, idx: &<T as Map>::Key) -> OptionModel<<T as Map>::Value> {
self.child(idx.as_ref()).into()
pub fn idx_model(self, idx: &<T as Map>::Key) -> OptionModel<<T as Map>::Value> {
self.0.child(idx.as_ref()).into()
}
}
impl<T> From<Model<T>> for MapModel<T>

View File

@@ -1,5 +1,6 @@
use std::ops::Deref;
use hashlink::LinkedHashSet;
use json_patch::{AddOperation, Patch, PatchOperation, RemoveOperation, ReplaceOperation};
use json_ptr::{JsonPointer, SegList};
use serde::{Deserialize, Serialize};
@@ -176,6 +177,26 @@ impl DiffPatch {
}
res
}
pub fn keys(&self, mut keys: LinkedHashSet<String>) -> LinkedHashSet<String> {
for op in &(self.0).0 {
match op {
PatchOperation::Add(a) => {
if a.path.len() == 1 {
keys.insert(a.path.get_segment(0).unwrap().to_owned());
}
}
PatchOperation::Replace(_) => (),
PatchOperation::Remove(a) => {
if a.path.len() == 1 {
keys.remove(a.path.get_segment(0).unwrap());
}
}
_ => unreachable!(),
}
}
keys
}
}
impl Default for DiffPatch {
fn default() -> Self {

View File

@@ -5,6 +5,7 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use fd_lock_rs::FdLock;
use hashlink::LinkedHashSet;
use json_ptr::{JsonPointer, SegList};
use lazy_static::lazy_static;
use qutex_2::{Guard, Qutex};
@@ -128,6 +129,15 @@ impl Store {
) -> Result<bool, Error> {
Ok(ptr.get(self.get_data()?).unwrap_or(&Value::Null) != &Value::Null)
}
pub(crate) fn keys<S: AsRef<str>, V: SegList>(
&self,
ptr: &JsonPointer<S, V>,
) -> Result<LinkedHashSet<String>, Error> {
Ok(match ptr.get(self.get_data()?).unwrap_or(&Value::Null) {
Value::Object(o) => o.keys().cloned().collect(),
_ => LinkedHashSet::new(),
})
}
pub(crate) fn get<T: for<'de> Deserialize<'de>, S: AsRef<str>, V: SegList>(
&self,
ptr: &JsonPointer<S, V>,
@@ -213,6 +223,12 @@ impl PatchDb {
) -> Result<bool, Error> {
self.store.read().await.exists(ptr)
}
pub async fn keys<S: AsRef<str>, V: SegList>(
&self,
ptr: &JsonPointer<S, V>,
) -> Result<LinkedHashSet<String>, Error> {
self.store.read().await.keys(ptr)
}
pub async fn get<T: for<'de> Deserialize<'de>, S: AsRef<str>, V: SegList>(
&self,
ptr: &JsonPointer<S, V>,

View File

@@ -1,6 +1,7 @@
use std::sync::Arc;
use async_trait::async_trait;
use hashlink::LinkedHashSet;
use json_ptr::{JsonPointer, SegList};
use serde::{Deserialize, Serialize};
use serde_json::Value;
@@ -99,6 +100,23 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
};
Ok(self.updates.for_path(ptr).exists().unwrap_or(exists))
}
async fn keys<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
&mut self,
ptr: &JsonPointer<S, V>,
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
) -> Result<LinkedHashSet<String>, Error> {
let keys = {
let store_lock = self.parent.store();
let store = if let Some(store_read_lock) = store_read_lock {
store_read_lock
} else {
store_lock.read().await
};
self.rebase()?;
self.parent.keys(ptr, Some(store)).await?
};
Ok(self.updates.for_path(ptr).keys(keys))
}
async fn get_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
&mut self,
ptr: &JsonPointer<S, V>,