From 96e9be16ec7936688b3b72f43c7935b6c13ea74c Mon Sep 17 00:00:00 2001 From: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com> Date: Thu, 8 Apr 2021 12:16:31 -0600 Subject: [PATCH] feature/macros (#3) * finish macros for structs * create before canonicalize * fix locking behaviour to use single db exclusively * clean up tests * punting on enums for now * reorganize codebase * metadata and formatting --- json-ptr | 2 +- patch-db-macro-internals/rustfmt.toml | 2 + patch-db-macro-internals/src/lib.rs | 128 ++- patch-db-macro/rustfmt.toml | 2 + patch-db/Cargo.toml | 9 +- patch-db/rustfmt.toml | 2 + patch-db/src/lib.rs | 1046 +------------------------ patch-db/src/locker.rs | 211 +++++ patch-db/src/model.rs | 214 +++++ patch-db/src/patch.rs | 166 ++++ patch-db/src/store.rs | 234 ++++++ patch-db/src/test.rs | 73 +- patch-db/src/transaction.rs | 398 ++++++++++ 13 files changed, 1414 insertions(+), 1073 deletions(-) create mode 100644 patch-db-macro-internals/rustfmt.toml create mode 100644 patch-db-macro/rustfmt.toml create mode 100644 patch-db/rustfmt.toml create mode 100644 patch-db/src/locker.rs create mode 100644 patch-db/src/model.rs create mode 100644 patch-db/src/patch.rs create mode 100644 patch-db/src/store.rs create mode 100644 patch-db/src/transaction.rs diff --git a/json-ptr b/json-ptr index b2da19f..1a13341 160000 --- a/json-ptr +++ b/json-ptr @@ -1 +1 @@ -Subproject commit b2da19fe6acd754228ebacc3ade51321240bc74a +Subproject commit 1a1334141b9093b90953c039fcef8a68eec8f1f5 diff --git a/patch-db-macro-internals/rustfmt.toml b/patch-db-macro-internals/rustfmt.toml new file mode 100644 index 0000000..64d94de --- /dev/null +++ b/patch-db-macro-internals/rustfmt.toml @@ -0,0 +1,2 @@ +group_imports = "StdExternalCrate" +imports_granularity = "Module" diff --git a/patch-db-macro-internals/src/lib.rs b/patch-db-macro-internals/src/lib.rs index 94f1e69..3f5042a 100644 --- a/patch-db-macro-internals/src/lib.rs +++ b/patch-db-macro-internals/src/lib.rs @@ -60,7 +60,27 @@ fn build_model_struct( let ident = field.ident.clone().unwrap(); child_fn_name.push(ident.clone()); let ty = &field.ty; - child_model.push(syn::parse2(quote! { patch_db::Model<#ty> }).unwrap()); // TODO: check attr + if let Some(child_model_name) = field + .attrs + .iter() + .filter(|attr| attr.path.is_ident("model")) + .filter_map(|attr| attr.parse_args::().ok()) + .filter(|nv| nv.path.is_ident("name")) + .find_map(|nv| match nv.lit { + Lit::Str(s) => Some(s), + _ => None, + }) + { + let child_model_ty = + Ident::new(&child_model_name.value(), child_model_name.span()); + child_model + .push(syn::parse2(quote! { #child_model_ty }).expect("invalid model name")); + } else if field.attrs.iter().any(|attr| attr.path.is_ident("model")) { + child_model + .push(syn::parse2(quote! { <#ty as patch_db::HasModel>::Model }).unwrap()); + } else { + child_model.push(syn::parse2(quote! { patch_db::Model<#ty> }).unwrap()); + } let serde_rename = field .attrs .iter() @@ -107,9 +127,92 @@ fn build_model_struct( } Fields::Unnamed(f) => { if f.unnamed.len() == 1 { + // newtype wrapper + let field = &f.unnamed[0]; + let ty = &field.ty; + let inner_model: Type = if let Some(child_model_name) = field + .attrs + .iter() + .filter(|attr| attr.path.is_ident("model")) + .filter_map(|attr| Some(attr.parse_args::().unwrap())) + .filter(|nv| nv.path.is_ident("name")) + .find_map(|nv| match nv.lit { + Lit::Str(s) => Some(s), + _ => None, + }) { + let child_model_ty = + Ident::new(&child_model_name.value(), child_model_name.span()); + syn::parse2(quote! { #child_model_ty }).unwrap() + } else if field.attrs.iter().any(|attr| attr.path.is_ident("model")) { + syn::parse2(quote! { <#ty as patch_db::HasModel>::Model }).unwrap() + } else { + syn::parse2(quote! { patch_db::Model::<#ty> }).unwrap() + }; + return quote! { + #[derive(Debug, Clone)] + #model_vis struct #model_name(#inner_model); + impl core::ops::Deref for #model_name { + type Target = #inner_model; + fn deref(&self) -> &Self::Target { + &self.0 + } + } + impl From for #model_name { + fn from(ptr: json_ptr::JsonPointer) -> Self { + #model_name(#inner_model::from(ptr)) + } + } + impl From> for #model_name { + fn from(model: patch_db::Model<#base_name>) -> Self { + #model_name(#inner_model::from(json_ptr::JsonPointer::from(model))) + } + } + impl From<#inner_model> for #model_name { + fn from(model: #inner_model) -> Self { + #model_name(model) + } + } + impl patch_db::HasModel for #base_name { + type Model = #model_name; + } + }; } else if f.unnamed.len() > 1 { + for (i, field) in f.unnamed.iter().enumerate() { + child_fn_name.push(Ident::new( + &format!("idx_{}", i), + proc_macro2::Span::call_site(), + )); + let ty = &field.ty; + if let Some(child_model_name) = field + .attrs + .iter() + .filter(|attr| attr.path.is_ident("model")) + .filter_map(|attr| Some(attr.parse_args::().unwrap())) + .filter(|nv| nv.path.is_ident("name")) + .find_map(|nv| match nv.lit { + Lit::Str(s) => Some(s), + _ => None, + }) + { + let child_model_ty = + Ident::new(&child_model_name.value(), child_model_name.span()); + child_model.push( + syn::parse2(quote! { #child_model_ty }).expect("invalid model name"), + ); + } else if field.attrs.iter().any(|attr| attr.path.is_ident("model")) { + child_model.push( + syn::parse2(quote! { <#ty as patch_db::HasModel>::Model }).unwrap(), + ); + } else { + child_model.push(syn::parse2(quote! { patch_db::Model<#ty> }).unwrap()); + } + // TODO: serde rename for tuple structs? + child_path.push(LitStr::new( + &format!("{}", i), + proc_macro2::Span::call_site(), + )); + } } - todo!() } Fields::Unit => (), } @@ -123,16 +226,22 @@ fn build_model_struct( } } impl #model_name { - pub fn new(ptr: json_ptr::JsonPointer) -> Self { - #model_name(patch_db::Model::new(ptr)) - } - // foreach element, create accessor fn #( pub fn #child_fn_name(&self) -> #child_model { self.0.child(#child_path).into() } )* } + impl From for #model_name { + fn from(ptr: json_ptr::JsonPointer) -> Self { + #model_name(From::from(ptr)) + } + } + impl From> for #model_name { + fn from(model: patch_db::Model<#base_name>) -> Self { + #model_name(model) + } + } impl patch_db::HasModel for #base_name { type Model = #model_name; } @@ -140,5 +249,10 @@ fn build_model_struct( } fn build_model_enum(base: &DeriveInput, ast: &DataEnum, model_name: Option) -> TokenStream { - todo!() + todo!( + "use {:?}, {:?} and {:?} to create a model that can become an enum of models", + base, + ast, + model_name + ) } diff --git a/patch-db-macro/rustfmt.toml b/patch-db-macro/rustfmt.toml new file mode 100644 index 0000000..64d94de --- /dev/null +++ b/patch-db-macro/rustfmt.toml @@ -0,0 +1,2 @@ +group_imports = "StdExternalCrate" +imports_granularity = "Module" diff --git a/patch-db/Cargo.toml b/patch-db/Cargo.toml index 0085fd3..ba92938 100644 --- a/patch-db/Cargo.toml +++ b/patch-db/Cargo.toml @@ -1,8 +1,14 @@ [package] name = "patch-db" version = "0.1.0" -authors = ["Aiden McClelland "] +authors = ["Aiden McClelland "] edition = "2018" +categories = ["database-implementations"] +keywords = ["json", "json-pointer", "json-patch"] +description = "A database that tracks state updates as RFC 6902 JSON Patches" +repository = "https://github.com/Start9Labs/patch-db" +license = "MIT" +readme = "README.md" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -20,6 +26,7 @@ serde_cbor = { path = "../cbor" } thiserror = "1.0.23" tokio = { version = "1.0.1", features = ["sync", "fs", "rt", "io-util", "macros"] } patch-db-macro = { path = "../patch-db-macro" } +lazy_static = "1.4.0" [dev-dependencies] proptest = "1.0.0" diff --git a/patch-db/rustfmt.toml b/patch-db/rustfmt.toml new file mode 100644 index 0000000..64d94de --- /dev/null +++ b/patch-db/rustfmt.toml @@ -0,0 +1,2 @@ +group_imports = "StdExternalCrate" +imports_granularity = "Module" diff --git a/patch-db/src/lib.rs b/patch-db/src/lib.rs index fc7aafa..1cd92ef 100644 --- a/patch-db/src/lib.rs +++ b/patch-db/src/lib.rs @@ -1,32 +1,27 @@ -use std::collections::HashMap; -use std::fs::OpenOptions; use std::io::Error as IOError; -use std::marker::PhantomData; -use std::ops::{Deref, DerefMut}; -use std::path::Path; use std::sync::Arc; -use fd_lock_rs::FdLock; -use futures::future::{BoxFuture, FutureExt}; -use json_patch::{AddOperation, Patch, PatchOperation, RemoveOperation, ReplaceOperation}; -use json_ptr::{JsonPointer, SegList}; -use qutex_2::{QrwLock, ReadGuard, WriteGuard}; -use serde::{Deserialize, Serialize}; -use serde_json::Value; use thiserror::Error; -use tokio::{ - fs::File, - sync::{ - broadcast::{error::TryRecvError, Receiver, Sender}, - Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard, - }, -}; +use tokio::sync::broadcast::error::TryRecvError; // note: inserting into an array (before another element) without proper locking can result in unexpected behaviour +mod locker; +mod model; +mod patch; +mod store; +mod transaction; + #[cfg(test)] mod test; +pub use locker::{LockType, Locker}; +pub use model::{BoxModel, HasModel, MapModel, Model, ModelData, ModelDataMut, VecModel}; +pub use patch::Revision; +pub use patch_db_macro::HasModel; +pub use store::{PatchDb, Store}; +pub use transaction::{Checkpoint, SubTransaction, Transaction}; + #[derive(Error, Debug)] pub enum Error { #[error("IO Error: {0}")] @@ -48,1016 +43,3 @@ pub enum Error { #[error("Subscriber Error: {0}")] Subscriber(#[from] TryRecvError), } - -#[derive(Debug, Clone, Deserialize, Serialize)] -pub struct Revision { - pub id: u64, - pub patch: DiffPatch, -} - -#[derive(Debug, Clone, Deserialize, Serialize)] -pub struct DiffPatch(Patch); -impl DiffPatch { - // safe to assume dictionary style symantics for arrays since patches will always be rebased before being applied - pub fn for_path, V: SegList>(&self, ptr: &JsonPointer) -> DiffPatch { - let DiffPatch(Patch(ops)) = self; - DiffPatch(Patch( - ops.iter() - .filter_map(|op| match op { - PatchOperation::Add(op) => { - if let Some(tail) = op.path.strip_prefix(ptr) { - Some(PatchOperation::Add(AddOperation { - path: tail.to_owned(), - value: op.value.clone(), - })) - } else if let Some(tail) = ptr.strip_prefix(&op.path) { - Some(PatchOperation::Add(AddOperation { - path: Default::default(), - value: tail.get(&op.value).cloned().unwrap_or_default(), - })) - } else { - None - } - } - PatchOperation::Replace(op) => { - if let Some(tail) = op.path.strip_prefix(ptr) { - Some(PatchOperation::Replace(ReplaceOperation { - path: tail.to_owned(), - value: op.value.clone(), - })) - } else if let Some(tail) = ptr.strip_prefix(&op.path) { - Some(PatchOperation::Replace(ReplaceOperation { - path: Default::default(), - value: tail.get(&op.value).cloned().unwrap_or_default(), - })) - } else { - None - } - } - PatchOperation::Remove(op) => { - if ptr.starts_with(&op.path) { - Some(PatchOperation::Replace(ReplaceOperation { - path: Default::default(), - value: Default::default(), - })) - } else if let Some(tail) = op.path.strip_prefix(ptr) { - Some(PatchOperation::Remove(RemoveOperation { - path: tail.to_owned(), - })) - } else { - None - } - } - _ => unreachable!(), - }) - .collect(), - )) - } - pub fn rebase(&mut self, onto: &DiffPatch) { - let DiffPatch(Patch(ops)) = self; - let DiffPatch(Patch(onto_ops)) = onto; - for onto_op in onto_ops { - if let PatchOperation::Add(onto_op) = onto_op { - let arr_path_idx = onto_op.path.len() - 1; - if let Some(onto_idx) = onto_op - .path - .get_segment(arr_path_idx) - .and_then(|seg| seg.parse::().ok()) - { - let prefix = onto_op.path.slice(..arr_path_idx).unwrap_or_default(); - for op in ops.iter_mut() { - let path = match op { - PatchOperation::Add(op) => &mut op.path, - PatchOperation::Replace(op) => &mut op.path, - PatchOperation::Remove(op) => &mut op.path, - _ => unreachable!(), - }; - if path.starts_with(&prefix) { - if let Some(idx) = path - .get_segment(arr_path_idx) - .and_then(|seg| seg.parse::().ok()) - { - if idx >= onto_idx { - let mut new_path = prefix.clone().to_owned(); - new_path.push_end_idx(idx + 1); - if let Some(tail) = path.slice(arr_path_idx + 1..) { - new_path.append(&tail); - } - *path = new_path; - } - } - } - } - } - } else if let PatchOperation::Remove(onto_op) = onto_op { - let arr_path_idx = onto_op.path.len() - 1; - if let Some(onto_idx) = onto_op - .path - .get_segment(arr_path_idx) - .and_then(|seg| seg.parse::().ok()) - { - let prefix = onto_op.path.slice(..arr_path_idx).unwrap_or_default(); - for op in ops.iter_mut() { - let path = match op { - PatchOperation::Add(op) => &mut op.path, - PatchOperation::Replace(op) => &mut op.path, - PatchOperation::Remove(op) => &mut op.path, - _ => unreachable!(), - }; - if path.starts_with(&prefix) { - if let Some(idx) = path - .get_segment(arr_path_idx) - .and_then(|seg| seg.parse::().ok()) - { - if idx >= onto_idx { - let mut new_path = prefix.clone().to_owned(); - new_path.push_end_idx(idx - 1); - if let Some(tail) = path.slice(arr_path_idx + 1..) { - new_path.append(&tail); - } - *path = new_path; - } - } - } - } - } - } - } - } -} - -pub struct Store { - file: FdLock, - cache_corrupted: Option>, - data: Value, - revision: u64, -} -impl Store { - pub async fn open + Send + 'static>(path: P) -> Result { - Ok(tokio::task::spawn_blocking(move || { - use std::io::Write; - - let p = path.as_ref(); - let bak = p.with_extension("bak"); - if bak.exists() { - std::fs::rename(&bak, p)?; - } - let mut f = FdLock::lock( - OpenOptions::new() - .create(true) - .read(true) - .append(true) - .open(p)?, - fd_lock_rs::LockType::Exclusive, - true, - )?; - let mut stream = - serde_cbor::StreamDeserializer::new(serde_cbor::de::IoRead::new(&mut *f)); - let mut revision: u64 = stream.next().transpose()?.unwrap_or(0); - let mut stream = stream.change_output_type(); - let mut data = stream.next().transpose()?.unwrap_or_else(|| Value::Null); - let mut stream = stream.change_output_type(); - while let Some(Ok(patch)) = stream.next() { - json_patch::patch(&mut data, &patch)?; - revision += 1; - } - let bak_tmp = bak.with_extension("bak.tmp"); - let mut backup_file = std::fs::File::create(&bak_tmp)?; - serde_cbor::to_writer(&mut backup_file, &revision)?; - serde_cbor::to_writer(&mut backup_file, &data)?; - backup_file.flush()?; - backup_file.sync_all()?; - std::fs::rename(&bak_tmp, &bak)?; - nix::unistd::ftruncate(std::os::unix::io::AsRawFd::as_raw_fd(&*f), 0) - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; - serde_cbor::to_writer(&mut *f, &revision)?; - serde_cbor::to_writer(&mut *f, &data)?; - f.flush()?; - f.sync_all()?; - std::fs::remove_file(&bak)?; - - Ok::<_, Error>(Store { - file: f.map(File::from_std), - cache_corrupted: None, - data, - revision, - }) - }) - .await??) - } - fn check_cache_corrupted(&self) -> Result<(), Error> { - if let Some(ref err) = self.cache_corrupted { - Err(Error::CacheCorrupted(err.clone())) - } else { - Ok(()) - } - } - fn get_data(&self) -> Result<&Value, Error> { - self.check_cache_corrupted()?; - Ok(&self.data) - } - fn get_data_mut(&mut self) -> Result<&mut Value, Error> { - self.check_cache_corrupted()?; - Ok(&mut self.data) - } - pub async fn close(mut self) -> Result<(), Error> { - use tokio::io::AsyncWriteExt; - - self.file.flush().await?; - self.file.shutdown().await?; - self.file.unlock(true).map_err(|e| e.1)?; - Ok(()) - } - pub fn get Deserialize<'de>, S: AsRef, V: SegList>( - &self, - ptr: &JsonPointer, - ) -> Result { - Ok(serde_json::from_value( - ptr.get(self.get_data()?).unwrap_or(&Value::Null).clone(), - )?) - } - pub fn dump(&self) -> Value { - self.get_data().unwrap().clone() - } - pub async fn put, V: SegList>( - &mut self, - ptr: &JsonPointer, - value: &T, - ) -> Result, Error> { - let mut patch = DiffPatch(json_patch::diff( - ptr.get(self.get_data()?).unwrap_or(&Value::Null), - &serde_json::to_value(value)?, - )); - patch.0.prepend(ptr); - self.apply(patch).await - } - pub async fn apply(&mut self, patch: DiffPatch) -> Result, Error> { - use tokio::io::AsyncWriteExt; - - self.check_cache_corrupted()?; - let patch_bin = serde_cbor::to_vec(&patch.0)?; - json_patch::patch(self.get_data_mut()?, &patch.0)?; - - async fn sync_to_disk(file: &mut File, patch_bin: &[u8]) -> Result<(), IOError> { - file.write_all(patch_bin).await?; - file.flush().await?; - file.sync_data().await?; - Ok(()) - } - if let Err(e) = sync_to_disk(&mut *self.file, &patch_bin).await { - let e = Arc::new(e); - self.cache_corrupted = Some(e.clone()); - return Err(Error::CacheCorrupted(e)); - // TODO: try to recover. - } - - let id = self.revision; - self.revision += 1; - let res = Arc::new(Revision { id, patch }); - - Ok(res) - } -} - -#[derive(Clone)] -pub struct PatchDb { - store: Arc>, - subscriber: Arc>>, - locker: Locker, -} -impl PatchDb { - pub async fn open + Send + 'static>(path: P) -> Result { - let (subscriber, _) = tokio::sync::broadcast::channel(16); // TODO: make this unbounded - - Ok(PatchDb { - store: Arc::new(RwLock::new(Store::open(path).await?)), - locker: Locker::new(), - subscriber: Arc::new(subscriber), - }) - } - pub async fn get Deserialize<'de>, S: AsRef, V: SegList>( - &self, - ptr: &JsonPointer, - ) -> Result { - self.store.read().await.get(ptr) - } - pub async fn put, V: SegList>( - &self, - ptr: &JsonPointer, - value: &T, - ) -> Result, Error> { - let mut store = self.store.write().await; - let rev = store.put(ptr, value).await?; - self.subscriber.send(rev.clone()).unwrap_or_default(); - Ok(rev) - } - pub async fn apply( - &self, - patch: DiffPatch, - store_write_lock: Option>, - ) -> Result, Error> { - let mut store = if let Some(store_write_lock) = store_write_lock { - store_write_lock - } else { - self.store.write().await - }; - let rev = store.apply(patch).await?; - self.subscriber.send(rev.clone()).unwrap_or_default(); // ignore errors - Ok(rev) - } - pub fn subscribe(&self) -> Receiver> { - self.subscriber.subscribe() - } - pub fn begin(&self) -> Transaction { - Transaction { - db: self.clone(), - locks: Vec::new(), - updates: DiffPatch(Patch(Vec::new())), - sub: self.subscribe(), - } - } -} -pub trait Checkpoint: Sized { - fn rebase(&mut self) -> Result<(), Error>; - fn get_value<'a, S: AsRef + Send + Sync + 'a, V: SegList + Send + Sync + 'a>( - &'a mut self, - ptr: &'a JsonPointer, - store_read_lock: Option>, - ) -> BoxFuture<'a, Result>; - fn put_value<'a, S: AsRef + Send + Sync + 'a, V: SegList + Send + Sync + 'a>( - &'a mut self, - ptr: &'a JsonPointer, - value: &'a Value, - ) -> BoxFuture<'a, Result<(), Error>>; - fn store(&self) -> Arc>; - fn subscribe(&self) -> Receiver>; - fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>); - fn apply(&mut self, patch: DiffPatch); - fn lock<'a, S: AsRef + Clone + Send + Sync, V: SegList + Clone + Send + Sync>( - &'a mut self, - ptr: &'a JsonPointer, - lock: LockType, - ) -> BoxFuture<'a, ()>; - fn get< - 'a, - T: for<'de> Deserialize<'de> + 'a, - S: AsRef + Clone + Send + Sync + 'a, - V: SegList + Clone + Send + Sync + 'a, - >( - &'a mut self, - ptr: &'a JsonPointer, - lock: LockType, - ) -> BoxFuture<'a, Result>; - fn put< - 'a, - T: Serialize + Send + Sync + 'a, - S: AsRef + Send + Sync + 'a, - V: SegList + Send + Sync + 'a, - >( - &'a mut self, - ptr: &'a JsonPointer, - value: &'a T, - ) -> BoxFuture<'a, Result<(), Error>>; -} - -pub struct Transaction { - db: PatchDb, - locks: Vec<(JsonPointer, LockerGuard)>, - updates: DiffPatch, - sub: Receiver>, -} -impl Transaction { - pub fn rebase(&mut self) -> Result<(), Error> { - while let Some(rev) = match self.sub.try_recv() { - Ok(a) => Some(a), - Err(TryRecvError::Empty) => None, - Err(e) => return Err(e.into()), - } { - self.updates.rebase(&rev.patch); - } - Ok(()) - } - async fn get_value, V: SegList>( - &mut self, - ptr: &JsonPointer, - store_read_lock: Option>, - ) -> Result { - let mut data = { - let store_lock = self.db.store.clone(); - let store = if let Some(store_read_lock) = store_read_lock { - store_read_lock - } else { - store_lock.read().await - }; - self.rebase()?; - ptr.get(store.get_data()?).cloned().unwrap_or_default() - }; - json_patch::patch(&mut data, &self.updates.for_path(ptr).0)?; - Ok(data) - } - async fn put_value, V: SegList>( - &mut self, - ptr: &JsonPointer, - value: &Value, - ) -> Result<(), Error> { - let old = Transaction::get_value(self, ptr, None).await?; - let mut patch = json_patch::diff(&old, &value); - patch.prepend(ptr); - (self.updates.0).0.extend(patch.0); - Ok(()) - } - pub async fn lock + Clone, V: SegList + Clone>( - &mut self, - ptr: &JsonPointer, - lock: LockType, - ) { - match lock { - LockType::None => (), - LockType::Read => { - self.db - .locker - .add_read_lock(ptr, &mut self.locks, &mut []) - .await - } - LockType::Write => { - self.db - .locker - .add_write_lock(ptr, &mut self.locks, &mut []) - .await - } - } - } - pub async fn get Deserialize<'de>, S: AsRef + Clone, V: SegList + Clone>( - &mut self, - ptr: &JsonPointer, - lock: LockType, - ) -> Result { - self.lock(ptr, lock).await; - Ok(serde_json::from_value( - Transaction::get_value(self, ptr, None).await?, - )?) - } - pub async fn put, V: SegList>( - &mut self, - ptr: &JsonPointer, - value: &T, - ) -> Result<(), Error> { - Transaction::put_value(self, ptr, &serde_json::to_value(value)?).await - } - pub async fn commit(mut self) -> Result, Error> { - let store_lock = self.db.store.clone(); - let store = store_lock.write().await; - self.rebase()?; - self.db.apply(self.updates, Some(store)).await - } - pub async fn begin(&mut self) -> Result, Error> { - let store_lock = self.db.store.clone(); - let store = store_lock.read().await; - self.rebase()?; - let sub = self.db.subscribe(); - drop(store); - Ok(SubTransaction { - parent: self, - locks: Vec::new(), - updates: DiffPatch(Patch(Vec::new())), - sub, - }) - } -} -impl<'a> Checkpoint for &'a mut Transaction { - fn rebase(&mut self) -> Result<(), Error> { - Transaction::rebase(self) - } - fn get_value<'b, S: AsRef + Send + Sync + 'b, V: SegList + Send + Sync + 'b>( - &'b mut self, - ptr: &'b JsonPointer, - store_read_lock: Option>, - ) -> BoxFuture<'b, Result> { - Transaction::get_value(self, ptr, store_read_lock).boxed() - } - fn put_value<'b, S: AsRef + Send + Sync + 'b, V: SegList + Send + Sync + 'b>( - &'b mut self, - ptr: &'b JsonPointer, - value: &'b Value, - ) -> BoxFuture<'b, Result<(), Error>> { - Transaction::put_value(self, ptr, value).boxed() - } - fn store(&self) -> Arc> { - self.db.store.clone() - } - fn subscribe(&self) -> Receiver> { - self.db.subscribe() - } - fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) { - (&self.db.locker, vec![&mut self.locks]) - } - fn apply(&mut self, patch: DiffPatch) { - (self.updates.0).0.extend((patch.0).0) - } - fn lock<'b, S: AsRef + Clone + Send + Sync, V: SegList + Clone + Send + Sync>( - &'b mut self, - ptr: &'b JsonPointer, - lock: LockType, - ) -> BoxFuture<'b, ()> { - Transaction::lock(self, ptr, lock).boxed() - } - fn get< - 'b, - T: for<'de> Deserialize<'de> + 'b, - S: AsRef + Clone + Send + Sync + 'b, - V: SegList + Clone + Send + Sync + 'b, - >( - &'b mut self, - ptr: &'b JsonPointer, - lock: LockType, - ) -> BoxFuture<'b, Result> { - Transaction::get(self, ptr, lock).boxed() - } - fn put< - 'b, - T: Serialize + Send + Sync + 'b, - S: AsRef + Send + Sync + 'b, - V: SegList + Send + Sync + 'b, - >( - &'b mut self, - ptr: &'b JsonPointer, - value: &'b T, - ) -> BoxFuture<'b, Result<(), Error>> { - Transaction::put(self, ptr, value).boxed() - } -} - -pub struct SubTransaction { - parent: Tx, - locks: Vec<(JsonPointer, LockerGuard)>, - updates: DiffPatch, - sub: Receiver>, -} -impl SubTransaction { - pub fn rebase(&mut self) -> Result<(), Error> { - self.parent.rebase()?; - while let Some(rev) = match self.sub.try_recv() { - Ok(a) => Some(a), - Err(TryRecvError::Empty) => None, - Err(e) => return Err(e.into()), - } { - self.updates.rebase(&rev.patch); - } - Ok(()) - } - async fn get_value + Send + Sync, V: SegList + Send + Sync>( - &mut self, - ptr: &JsonPointer, - store_read_lock: Option>, - ) -> Result { - let mut data = { - let store_lock = self.parent.store(); - let store = if let Some(store_read_lock) = store_read_lock { - store_read_lock - } else { - store_lock.read().await - }; - self.rebase()?; - self.parent.get_value(ptr, Some(store)).await? - }; - json_patch::patch(&mut data, &self.updates.for_path(ptr).0)?; - Ok(data) - } - async fn put_value + Send + Sync, V: SegList + Send + Sync>( - &mut self, - ptr: &JsonPointer, - value: &Value, - ) -> Result<(), Error> { - let old = SubTransaction::get_value(self, ptr, None).await?; - let mut patch = json_patch::diff(&old, &value); - patch.prepend(ptr); - (self.updates.0).0.extend(patch.0); - Ok(()) - } - pub async fn lock + Clone, V: SegList + Clone>( - &mut self, - ptr: &JsonPointer, - lock: LockType, - ) { - match lock { - LockType::None => (), - LockType::Read => { - let (locker, mut locks) = self.parent.locker_and_locks(); - locker.add_read_lock(ptr, &mut self.locks, &mut locks).await - } - LockType::Write => { - let (locker, mut locks) = self.parent.locker_and_locks(); - locker - .add_write_lock(ptr, &mut self.locks, &mut locks) - .await - } - } - } - pub async fn get< - T: for<'de> Deserialize<'de>, - S: AsRef + Clone + Send + Sync, - V: SegList + Clone + Send + Sync, - >( - &mut self, - ptr: &JsonPointer, - lock: LockType, - ) -> Result { - self.lock(ptr, lock).await; - Ok(serde_json::from_value( - SubTransaction::get_value(self, ptr, None).await?, - )?) - } - pub async fn put + Send + Sync, V: SegList + Send + Sync>( - &mut self, - ptr: &JsonPointer, - value: &T, - ) -> Result<(), Error> { - SubTransaction::put_value(self, ptr, &serde_json::to_value(value)?).await - } - pub async fn commit(mut self) -> Result<(), Error> { - let store_lock = self.parent.store(); - let store = store_lock.read().await; - self.rebase()?; - self.parent.apply(self.updates); - drop(store); - Ok(()) - } - pub async fn begin(&mut self) -> Result, Error> { - let store_lock = self.parent.store(); - let store = store_lock.read().await; - self.rebase()?; - let sub = self.parent.subscribe(); - drop(store); - Ok(SubTransaction { - parent: self, - locks: Vec::new(), - updates: DiffPatch(Patch(Vec::new())), - sub, - }) - } -} -impl<'a, Tx: Checkpoint + Send + Sync> Checkpoint for &'a mut SubTransaction { - fn rebase(&mut self) -> Result<(), Error> { - SubTransaction::rebase(self) - } - fn get_value<'b, S: AsRef + Send + Sync + 'b, V: SegList + Send + Sync + 'b>( - &'b mut self, - ptr: &'b JsonPointer, - store_read_lock: Option>, - ) -> BoxFuture<'b, Result> { - SubTransaction::get_value(self, ptr, store_read_lock).boxed() - } - fn put_value<'b, S: AsRef + Send + Sync + 'b, V: SegList + Send + Sync + 'b>( - &'b mut self, - ptr: &'b JsonPointer, - value: &'b Value, - ) -> BoxFuture<'b, Result<(), Error>> { - SubTransaction::put_value(self, ptr, value).boxed() - } - fn store(&self) -> Arc> { - self.parent.store() - } - fn subscribe(&self) -> Receiver> { - self.parent.subscribe() - } - fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) { - let (locker, mut locks) = self.parent.locker_and_locks(); - locks.push(&mut self.locks); - (locker, locks) - } - fn apply(&mut self, patch: DiffPatch) { - (self.updates.0).0.extend((patch.0).0) - } - fn lock<'b, S: AsRef + Clone + Send + Sync, V: SegList + Clone + Send + Sync>( - &'b mut self, - ptr: &'b JsonPointer, - lock: LockType, - ) -> BoxFuture<'b, ()> { - SubTransaction::lock(self, ptr, lock).boxed() - } - fn get< - 'b, - T: for<'de> Deserialize<'de> + 'b, - S: AsRef + Clone + Send + Sync + 'b, - V: SegList + Clone + Send + Sync + 'b, - >( - &'b mut self, - ptr: &'b JsonPointer, - lock: LockType, - ) -> BoxFuture<'b, Result> { - SubTransaction::get(self, ptr, lock).boxed() - } - fn put< - 'b, - T: Serialize + Send + Sync + 'b, - S: AsRef + Send + Sync + 'b, - V: SegList + Send + Sync + 'b, - >( - &'b mut self, - ptr: &'b JsonPointer, - value: &'b T, - ) -> BoxFuture<'b, Result<(), Error>> { - SubTransaction::put(self, ptr, value).boxed() - } -} - -#[derive(Debug, Clone, Copy)] -pub enum LockType { - None, - Read, - Write, -} - -pub enum LockerGuard { - Empty, - Read(LockerReadGuard), - Write(LockerWriteGuard), -} -impl LockerGuard { - pub fn take(&mut self) -> Self { - std::mem::replace(self, LockerGuard::Empty) - } -} - -#[derive(Debug, Clone)] -pub struct LockerReadGuard(Arc>>>>); -impl LockerReadGuard { - async fn upgrade(&self) -> Option { - let guard = self.0.try_lock().unwrap().take(); - if let Some(g) = guard { - Some(LockerWriteGuard( - Some(ReadGuard::upgrade(g).await.unwrap()), - Some(self.clone()), - )) - } else { - None - } - } -} -impl From>> for LockerReadGuard { - fn from(guard: ReadGuard>) -> Self { - LockerReadGuard(Arc::new(Mutex::new(Some(guard)))) - } -} - -pub struct LockerWriteGuard( - Option>>, - Option, -); -impl From>> for LockerWriteGuard { - fn from(guard: WriteGuard>) -> Self { - LockerWriteGuard(Some(guard), None) - } -} -impl Drop for LockerWriteGuard { - fn drop(&mut self) { - if let (Some(write), Some(read)) = (self.0.take(), self.1.take()) { - *read.0.try_lock().unwrap() = Some(WriteGuard::downgrade(write)); - } - } -} - -#[derive(Clone, Debug)] -pub struct Locker(QrwLock>); -impl Locker { - pub fn new() -> Self { - Locker(QrwLock::new(HashMap::new())) - } - pub async fn lock_read, V: SegList>( - &self, - ptr: &JsonPointer, - ) -> ReadGuard> { - let mut lock = Some(self.0.clone().read().await.unwrap()); - for seg in ptr.iter() { - let new_lock = if let Some(locker) = lock.as_ref().unwrap().get(seg) { - locker.0.clone().read().await.unwrap() - } else { - let mut writer = ReadGuard::upgrade(lock.take().unwrap()).await.unwrap(); - writer.insert(seg.to_owned(), Locker::new()); - let reader = WriteGuard::downgrade(writer); - reader.get(seg).unwrap().0.clone().read().await.unwrap() - }; - lock = Some(new_lock); - } - lock.unwrap() - } - async fn add_read_lock + Clone, V: SegList + Clone>( - &self, - ptr: &JsonPointer, - locks: &mut Vec<(JsonPointer, LockerGuard)>, - extra_locks: &mut [&mut [(JsonPointer, LockerGuard)]], - ) { - for lock in extra_locks - .iter() - .flat_map(|a| a.iter()) - .chain(locks.iter()) - { - if ptr.starts_with(&lock.0) { - return; - } - } - locks.push(( - JsonPointer::to_owned(ptr.clone()), - LockerGuard::Read(self.lock_read(ptr).await.into()), - )); - } - pub async fn lock_write, V: SegList>( - &self, - ptr: &JsonPointer, - ) -> WriteGuard> { - let mut lock = self.0.clone().write().await.unwrap(); - for seg in ptr.iter() { - let new_lock = if let Some(locker) = lock.get(seg) { - locker.0.clone().write().await.unwrap() - } else { - lock.insert(seg.to_owned(), Locker::new()); - lock.get(seg).unwrap().0.clone().write().await.unwrap() - }; - lock = new_lock; - } - lock - } - async fn add_write_lock + Clone, V: SegList + Clone>( - &self, - ptr: &JsonPointer, - locks: &mut Vec<(JsonPointer, LockerGuard)>, // tx locks - extra_locks: &mut [&mut [(JsonPointer, LockerGuard)]], // tx parent locks - ) { - let mut final_lock = None; - for lock in extra_locks - .iter_mut() - .flat_map(|a| a.iter_mut()) - .chain(locks.iter_mut()) - { - enum Choice { - Return, - Continue, - Break, - } - let choice: Choice; - if let Some(remainder) = ptr.strip_prefix(&lock.0) { - let guard = lock.1.take(); - lock.1 = match guard { - LockerGuard::Read(LockerReadGuard(guard)) if !remainder.is_empty() => { - // read guard already exists at higher level - let mut lock = guard.lock().await; - if let Some(l) = lock.take() { - let mut orig_lock = None; - let mut lock = ReadGuard::upgrade(l).await.unwrap(); - for seg in remainder.iter() { - let new_lock = if let Some(locker) = lock.get(seg) { - locker.0.clone().write().await.unwrap() - } else { - lock.insert(seg.to_owned(), Locker::new()); - lock.get(seg).unwrap().0.clone().write().await.unwrap() - }; - if orig_lock.is_none() { - orig_lock = Some(lock); - } - lock = new_lock; - } - final_lock = Some(LockerGuard::Write(lock.into())); - choice = Choice::Break; - LockerGuard::Read(WriteGuard::downgrade(orig_lock.unwrap()).into()) - } else { - drop(lock); - choice = Choice::Return; - LockerGuard::Read(LockerReadGuard(guard)) - } - } - LockerGuard::Read(l) => { - // read exists, convert to write - if let Some(upgraded) = l.upgrade().await { - final_lock = Some(LockerGuard::Write(upgraded)); - choice = Choice::Break; - } else { - choice = Choice::Continue; - } - LockerGuard::Read(l) - } - LockerGuard::Write(l) => { - choice = Choice::Return; - LockerGuard::Write(l) - } // leave it alone, already sufficiently locked - LockerGuard::Empty => { - unreachable!("LockerGuard found empty"); - } - }; - match choice { - Choice::Return => return, - Choice::Break => break, - Choice::Continue => continue, - } - } - } - locks.push(( - JsonPointer::to_owned(ptr.clone()), - if let Some(lock) = final_lock { - lock - } else { - LockerGuard::Write(self.lock_write(ptr).await.into()) - }, - )); - } -} -impl Default for Locker { - fn default() -> Self { - Locker::new() - } -} - -pub struct ModelData Deserialize<'de>>(T); -impl Deserialize<'de>> Deref for ModelData { - type Target = T; - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -pub struct ModelDataMut Deserialize<'de>> { - original: Value, - current: T, - ptr: JsonPointer, -} -impl Deserialize<'de>> ModelDataMut { - pub async fn save(self, tx: &mut Tx) -> Result<(), Error> { - let current = serde_json::to_value(&self.current)?; - let mut diff = DiffPatch(json_patch::diff(&self.original, ¤t)); - let target = tx.get_value(&self.ptr, None).await?; - diff.rebase(&DiffPatch(json_patch::diff(&self.original, &target))); - diff.0.prepend(&self.ptr); - tx.apply(diff); - Ok(()) - } -} -impl Deserialize<'de>> Deref for ModelDataMut { - type Target = T; - fn deref(&self) -> &Self::Target { - &self.current - } -} -impl Deserialize<'de>> DerefMut for ModelDataMut { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.current - } -} - -#[derive(Debug)] -pub struct Model Deserialize<'de>> { - ptr: JsonPointer, - phantom: PhantomData, -} -impl Model -where - T: Serialize + for<'de> Deserialize<'de>, -{ - pub fn new(ptr: JsonPointer) -> Self { - Self { - ptr, - phantom: PhantomData, - } - } - - pub async fn lock(&self, tx: &mut Tx, lock: LockType) { - tx.lock(&self.ptr, lock).await - } - - pub async fn get(&self, tx: &mut Tx) -> Result, Error> { - Ok(ModelData(tx.get(&self.ptr, LockType::Read).await?)) - } - - pub async fn get_mut(&self, tx: &mut Tx) -> Result, Error> { - self.lock(tx, LockType::Write).await; - let original = tx.get_value(&self.ptr, None).await?; - let current = serde_json::from_value(original.clone())?; - Ok(ModelDataMut { - original, - current, - ptr: self.ptr.clone(), - }) - } - - pub fn child Deserialize<'de>>(&self, index: &str) -> Model { - let mut ptr = self.ptr.clone(); - ptr.push_end(index); - Model { - ptr, - phantom: PhantomData, - } - } -} -impl std::clone::Clone for Model -where - T: Serialize + for<'de> Deserialize<'de>, -{ - fn clone(&self) -> Self { - Model { - ptr: self.ptr.clone(), - phantom: PhantomData, - } - } -} - -pub trait HasModel { - type Model; -} diff --git a/patch-db/src/locker.rs b/patch-db/src/locker.rs new file mode 100644 index 0000000..27e53f6 --- /dev/null +++ b/patch-db/src/locker.rs @@ -0,0 +1,211 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use json_ptr::{JsonPointer, SegList}; +use qutex_2::{QrwLock, ReadGuard, WriteGuard}; +use tokio::sync::Mutex; + +#[derive(Debug, Clone, Copy)] +pub enum LockType { + None, + Read, + Write, +} + +pub enum LockerGuard { + Empty, + Read(LockerReadGuard), + Write(LockerWriteGuard), +} +impl LockerGuard { + pub fn take(&mut self) -> Self { + std::mem::replace(self, LockerGuard::Empty) + } +} + +#[derive(Debug, Clone)] +pub struct LockerReadGuard(Arc>>>>); +impl LockerReadGuard { + async fn upgrade(&self) -> Option { + let guard = self.0.try_lock().unwrap().take(); + if let Some(g) = guard { + Some(LockerWriteGuard( + Some(ReadGuard::upgrade(g).await.unwrap()), + Some(self.clone()), + )) + } else { + None + } + } +} +impl From>> for LockerReadGuard { + fn from(guard: ReadGuard>) -> Self { + LockerReadGuard(Arc::new(Mutex::new(Some(guard)))) + } +} + +pub struct LockerWriteGuard( + Option>>, + Option, +); +impl From>> for LockerWriteGuard { + fn from(guard: WriteGuard>) -> Self { + LockerWriteGuard(Some(guard), None) + } +} +impl Drop for LockerWriteGuard { + fn drop(&mut self) { + if let (Some(write), Some(read)) = (self.0.take(), self.1.take()) { + *read.0.try_lock().unwrap() = Some(WriteGuard::downgrade(write)); + } + } +} + +#[derive(Clone, Debug)] +pub struct Locker(QrwLock>); +impl Locker { + pub fn new() -> Self { + Locker(QrwLock::new(HashMap::new())) + } + pub async fn lock_read, V: SegList>( + &self, + ptr: &JsonPointer, + ) -> ReadGuard> { + let mut lock = Some(self.0.clone().read().await.unwrap()); + for seg in ptr.iter() { + let new_lock = if let Some(locker) = lock.as_ref().unwrap().get(seg) { + locker.0.clone().read().await.unwrap() + } else { + let mut writer = ReadGuard::upgrade(lock.take().unwrap()).await.unwrap(); + writer.insert(seg.to_owned(), Locker::new()); + let reader = WriteGuard::downgrade(writer); + reader.get(seg).unwrap().0.clone().read().await.unwrap() + }; + lock = Some(new_lock); + } + lock.unwrap() + } + pub(crate) async fn add_read_lock + Clone, V: SegList + Clone>( + &self, + ptr: &JsonPointer, + locks: &mut Vec<(JsonPointer, LockerGuard)>, + extra_locks: &mut [&mut [(JsonPointer, LockerGuard)]], + ) { + for lock in extra_locks + .iter() + .flat_map(|a| a.iter()) + .chain(locks.iter()) + { + if ptr.starts_with(&lock.0) { + return; + } + } + locks.push(( + JsonPointer::to_owned(ptr.clone()), + LockerGuard::Read(self.lock_read(ptr).await.into()), + )); + } + pub async fn lock_write, V: SegList>( + &self, + ptr: &JsonPointer, + ) -> WriteGuard> { + let mut lock = self.0.clone().write().await.unwrap(); + for seg in ptr.iter() { + let new_lock = if let Some(locker) = lock.get(seg) { + locker.0.clone().write().await.unwrap() + } else { + lock.insert(seg.to_owned(), Locker::new()); + lock.get(seg).unwrap().0.clone().write().await.unwrap() + }; + lock = new_lock; + } + lock + } + pub(crate) async fn add_write_lock + Clone, V: SegList + Clone>( + &self, + ptr: &JsonPointer, + locks: &mut Vec<(JsonPointer, LockerGuard)>, // tx locks + extra_locks: &mut [&mut [(JsonPointer, LockerGuard)]], // tx parent locks + ) { + let mut final_lock = None; + for lock in extra_locks + .iter_mut() + .flat_map(|a| a.iter_mut()) + .chain(locks.iter_mut()) + { + enum Choice { + Return, + Continue, + Break, + } + let choice: Choice; + if let Some(remainder) = ptr.strip_prefix(&lock.0) { + let guard = lock.1.take(); + lock.1 = match guard { + LockerGuard::Read(LockerReadGuard(guard)) if !remainder.is_empty() => { + // read guard already exists at higher level + let mut lock = guard.lock().await; + if let Some(l) = lock.take() { + let mut orig_lock = None; + let mut lock = ReadGuard::upgrade(l).await.unwrap(); + for seg in remainder.iter() { + let new_lock = if let Some(locker) = lock.get(seg) { + locker.0.clone().write().await.unwrap() + } else { + lock.insert(seg.to_owned(), Locker::new()); + lock.get(seg).unwrap().0.clone().write().await.unwrap() + }; + if orig_lock.is_none() { + orig_lock = Some(lock); + } + lock = new_lock; + } + final_lock = Some(LockerGuard::Write(lock.into())); + choice = Choice::Break; + LockerGuard::Read(WriteGuard::downgrade(orig_lock.unwrap()).into()) + } else { + drop(lock); + choice = Choice::Return; + LockerGuard::Read(LockerReadGuard(guard)) + } + } + LockerGuard::Read(l) => { + // read exists, convert to write + if let Some(upgraded) = l.upgrade().await { + final_lock = Some(LockerGuard::Write(upgraded)); + choice = Choice::Break; + } else { + choice = Choice::Continue; + } + LockerGuard::Read(l) + } + LockerGuard::Write(l) => { + choice = Choice::Return; + LockerGuard::Write(l) + } // leave it alone, already sufficiently locked + LockerGuard::Empty => { + unreachable!("LockerGuard found empty"); + } + }; + match choice { + Choice::Return => return, + Choice::Break => break, + Choice::Continue => continue, + } + } + } + locks.push(( + JsonPointer::to_owned(ptr.clone()), + if let Some(lock) = final_lock { + lock + } else { + LockerGuard::Write(self.lock_write(ptr).await.into()) + }, + )); + } +} +impl Default for Locker { + fn default() -> Self { + Locker::new() + } +} diff --git a/patch-db/src/model.rs b/patch-db/src/model.rs new file mode 100644 index 0000000..a112a85 --- /dev/null +++ b/patch-db/src/model.rs @@ -0,0 +1,214 @@ +use std::collections::{BTreeMap, HashMap}; +use std::hash::Hash; +use std::marker::PhantomData; +use std::ops::{Deref, DerefMut, Index}; + +use json_ptr::JsonPointer; +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +use crate::locker::LockType; +use crate::transaction::Checkpoint; +use crate::Error; + +pub struct ModelData Deserialize<'de>>(T); +impl Deserialize<'de>> Deref for ModelData { + type Target = T; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +pub struct ModelDataMut Deserialize<'de>> { + original: Value, + current: T, + ptr: JsonPointer, +} +impl Deserialize<'de>> ModelDataMut { + pub async fn save(self, tx: &mut Tx) -> Result<(), Error> { + let current = serde_json::to_value(&self.current)?; + let mut diff = crate::patch::diff(&self.original, ¤t); + let target = tx.get_value(&self.ptr, None).await?; + diff.rebase(&crate::patch::diff(&self.original, &target)); + diff.prepend(&self.ptr); + tx.apply(diff); + Ok(()) + } +} +impl Deserialize<'de>> Deref for ModelDataMut { + type Target = T; + fn deref(&self) -> &Self::Target { + &self.current + } +} +impl Deserialize<'de>> DerefMut for ModelDataMut { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.current + } +} + +#[derive(Debug)] +pub struct Model Deserialize<'de>> { + ptr: JsonPointer, + phantom: PhantomData, +} +impl Model +where + T: Serialize + for<'de> Deserialize<'de>, +{ + pub async fn lock(&self, tx: &mut Tx, lock: LockType) { + tx.lock(&self.ptr, lock).await + } + + pub async fn get(&self, tx: &mut Tx) -> Result, Error> { + Ok(ModelData(tx.get(&self.ptr, LockType::Read).await?)) + } + + pub async fn get_mut(&self, tx: &mut Tx) -> Result, Error> { + self.lock(tx, LockType::Write).await; + let original = tx.get_value(&self.ptr, None).await?; + let current = serde_json::from_value(original.clone())?; + Ok(ModelDataMut { + original, + current, + ptr: self.ptr.clone(), + }) + } + + pub fn child Deserialize<'de>>(&self, index: &str) -> Model { + let mut ptr = self.ptr.clone(); + ptr.push_end(index); + Model { + ptr, + phantom: PhantomData, + } + } +} +impl From for Model +where + T: Serialize + for<'de> Deserialize<'de>, +{ + fn from(ptr: JsonPointer) -> Self { + Self { + ptr, + phantom: PhantomData, + } + } +} +impl From> for JsonPointer +where + T: Serialize + for<'de> Deserialize<'de>, +{ + fn from(model: Model) -> Self { + model.ptr + } +} +impl std::clone::Clone for Model +where + T: Serialize + for<'de> Deserialize<'de>, +{ + fn clone(&self) -> Self { + Model { + ptr: self.ptr.clone(), + phantom: PhantomData, + } + } +} + +pub trait HasModel { + type Model: From; +} + +pub struct BoxModel Deserialize<'de>>(T::Model); +impl Deserialize<'de>> Deref for BoxModel { + type Target = T::Model; + fn deref(&self) -> &Self::Target { + &self.0 + } +} +impl Deserialize<'de>> From>> for BoxModel { + fn from(model: Model>) -> Self { + BoxModel(T::Model::from(JsonPointer::from(model))) + } +} +impl Deserialize<'de>> From for BoxModel { + fn from(ptr: JsonPointer) -> Self { + BoxModel(T::Model::from(ptr)) + } +} +impl Deserialize<'de>> HasModel for Box { + type Model = BoxModel; +} + +pub struct VecModel Deserialize<'de>>(Model>); +impl Deserialize<'de>> Deref for VecModel { + type Target = Model>; + fn deref(&self) -> &Self::Target { + &self.0 + } +} +impl Deserialize<'de>> VecModel { + pub fn idx(&self, idx: usize) -> Model> { + self.child(&format!("{}", idx)) + } +} +impl Deserialize<'de>> From>> for VecModel { + fn from(model: Model>) -> Self { + VecModel(From::from(JsonPointer::from(model))) + } +} +impl Deserialize<'de>> From for VecModel { + fn from(ptr: JsonPointer) -> Self { + VecModel(From::from(ptr)) + } +} +impl Deserialize<'de>> HasModel for Vec { + type Model = VecModel; +} + +pub struct MapModel Deserialize<'de> + for<'a> Index<&'a str>>(Model); +impl Deserialize<'de> + for<'a> Index<&'a str>> Deref for MapModel { + type Target = Model; + fn deref(&self) -> &Self::Target { + &self.0 + } +} +impl MapModel +where + T: Serialize + for<'de> Deserialize<'de> + for<'a> Index<&'a str>, + for<'a, 'de> >::Output: Serialize + Deserialize<'de>, +{ + pub fn idx(&self, idx: &str) -> Model>::Output>> { + self.child(idx) + } +} +impl Deserialize<'de> + for<'a> Index<&'a str>> From>> + for MapModel +{ + fn from(model: Model>) -> Self { + MapModel(From::from(JsonPointer::from(model))) + } +} +impl Deserialize<'de> + for<'a> Index<&'a str>> From + for MapModel +{ + fn from(ptr: JsonPointer) -> Self { + MapModel(From::from(ptr)) + } +} +impl HasModel for HashMap +where + K: Serialize + for<'de> Deserialize<'de> + Hash + Eq, + V: Serialize + for<'de> Deserialize<'de>, + for<'a> HashMap: Index<&'a str>, +{ + type Model = MapModel>; +} +impl HasModel for BTreeMap +where + K: Serialize + for<'de> Deserialize<'de> + Hash + Eq, + V: Serialize + for<'de> Deserialize<'de>, + for<'a> HashMap: Index<&'a str>, +{ + type Model = MapModel>; +} diff --git a/patch-db/src/patch.rs b/patch-db/src/patch.rs new file mode 100644 index 0000000..7f488aa --- /dev/null +++ b/patch-db/src/patch.rs @@ -0,0 +1,166 @@ +use std::ops::Deref; + +use json_patch::{AddOperation, Patch, PatchOperation, RemoveOperation, ReplaceOperation}; +use json_ptr::{JsonPointer, SegList}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct Revision { + pub id: u64, + pub patch: DiffPatch, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct DiffPatch(Patch); +impl DiffPatch { + pub fn prepend, V: SegList>(&mut self, ptr: &JsonPointer) { + self.0.prepend(ptr) + } + + pub fn append(&mut self, other: DiffPatch) { + (self.0).0.extend((other.0).0) + } + + // safe to assume dictionary style symantics for arrays since patches will always be rebased before being applied + pub fn for_path, V: SegList>(&self, ptr: &JsonPointer) -> DiffPatch { + let DiffPatch(Patch(ops)) = self; + DiffPatch(Patch( + ops.iter() + .filter_map(|op| match op { + PatchOperation::Add(op) => { + if let Some(tail) = op.path.strip_prefix(ptr) { + Some(PatchOperation::Add(AddOperation { + path: tail.to_owned(), + value: op.value.clone(), + })) + } else if let Some(tail) = ptr.strip_prefix(&op.path) { + Some(PatchOperation::Add(AddOperation { + path: Default::default(), + value: tail.get(&op.value).cloned().unwrap_or_default(), + })) + } else { + None + } + } + PatchOperation::Replace(op) => { + if let Some(tail) = op.path.strip_prefix(ptr) { + Some(PatchOperation::Replace(ReplaceOperation { + path: tail.to_owned(), + value: op.value.clone(), + })) + } else if let Some(tail) = ptr.strip_prefix(&op.path) { + Some(PatchOperation::Replace(ReplaceOperation { + path: Default::default(), + value: tail.get(&op.value).cloned().unwrap_or_default(), + })) + } else { + None + } + } + PatchOperation::Remove(op) => { + if ptr.starts_with(&op.path) { + Some(PatchOperation::Replace(ReplaceOperation { + path: Default::default(), + value: Default::default(), + })) + } else if let Some(tail) = op.path.strip_prefix(ptr) { + Some(PatchOperation::Remove(RemoveOperation { + path: tail.to_owned(), + })) + } else { + None + } + } + _ => unreachable!(), + }) + .collect(), + )) + } + pub fn rebase(&mut self, onto: &DiffPatch) { + let DiffPatch(Patch(ops)) = self; + let DiffPatch(Patch(onto_ops)) = onto; + for onto_op in onto_ops { + if let PatchOperation::Add(onto_op) = onto_op { + let arr_path_idx = onto_op.path.len() - 1; + if let Some(onto_idx) = onto_op + .path + .get_segment(arr_path_idx) + .and_then(|seg| seg.parse::().ok()) + { + let prefix = onto_op.path.slice(..arr_path_idx).unwrap_or_default(); + for op in ops.iter_mut() { + let path = match op { + PatchOperation::Add(op) => &mut op.path, + PatchOperation::Replace(op) => &mut op.path, + PatchOperation::Remove(op) => &mut op.path, + _ => unreachable!(), + }; + if path.starts_with(&prefix) { + if let Some(idx) = path + .get_segment(arr_path_idx) + .and_then(|seg| seg.parse::().ok()) + { + if idx >= onto_idx { + let mut new_path = prefix.clone().to_owned(); + new_path.push_end_idx(idx + 1); + if let Some(tail) = path.slice(arr_path_idx + 1..) { + new_path.append(&tail); + } + *path = new_path; + } + } + } + } + } + } else if let PatchOperation::Remove(onto_op) = onto_op { + let arr_path_idx = onto_op.path.len() - 1; + if let Some(onto_idx) = onto_op + .path + .get_segment(arr_path_idx) + .and_then(|seg| seg.parse::().ok()) + { + let prefix = onto_op.path.slice(..arr_path_idx).unwrap_or_default(); + for op in ops.iter_mut() { + let path = match op { + PatchOperation::Add(op) => &mut op.path, + PatchOperation::Replace(op) => &mut op.path, + PatchOperation::Remove(op) => &mut op.path, + _ => unreachable!(), + }; + if path.starts_with(&prefix) { + if let Some(idx) = path + .get_segment(arr_path_idx) + .and_then(|seg| seg.parse::().ok()) + { + if idx >= onto_idx { + let mut new_path = prefix.clone().to_owned(); + new_path.push_end_idx(idx - 1); + if let Some(tail) = path.slice(arr_path_idx + 1..) { + new_path.append(&tail); + } + *path = new_path; + } + } + } + } + } + } + } + } +} +impl Default for DiffPatch { + fn default() -> Self { + DiffPatch(Patch(Vec::default())) + } +} +impl Deref for DiffPatch { + type Target = Patch; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +pub fn diff(left: &Value, right: &Value) -> DiffPatch { + DiffPatch(json_patch::diff(left, right)) +} diff --git a/patch-db/src/store.rs b/patch-db/src/store.rs new file mode 100644 index 0000000..cb6199e --- /dev/null +++ b/patch-db/src/store.rs @@ -0,0 +1,234 @@ +use std::collections::HashMap; +use std::fs::OpenOptions; +use std::io::Error as IOError; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use fd_lock_rs::FdLock; +use json_ptr::{JsonPointer, SegList}; +use lazy_static::lazy_static; +use qutex_2::{Guard, Qutex}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use tokio::fs::File; +use tokio::sync::broadcast::{Receiver, Sender}; +use tokio::sync::{Mutex, RwLock, RwLockWriteGuard}; + +use crate::locker::Locker; +use crate::patch::{diff, DiffPatch, Revision}; +use crate::transaction::Transaction; +use crate::Error; + +lazy_static! { + static ref OPEN_STORES: Mutex>> = Mutex::new(HashMap::new()); +} + +pub struct Store { + file: FdLock, + _lock: Guard<()>, + cache_corrupted: Option>, + data: Value, + revision: u64, +} +impl Store { + pub async fn open>(path: P) -> Result { + let (_lock, path) = { + if !path.as_ref().exists() { + tokio::fs::File::create(path.as_ref()).await?; + } + let path = tokio::fs::canonicalize(path).await?; + let mut lock = OPEN_STORES.lock().await; + ( + if let Some(open) = lock.get(&path) { + open.clone().lock().await.unwrap() + } else { + let tex = Qutex::new(()); + lock.insert(path.clone(), tex.clone()); + tex.lock().await.unwrap() + }, + path, + ) + }; + Ok(tokio::task::spawn_blocking(move || { + use std::io::Write; + + let bak = path.with_extension("bak"); + if bak.exists() { + std::fs::rename(&bak, &path)?; + } + let mut f = FdLock::lock( + OpenOptions::new() + .create(true) + .read(true) + .append(true) + .open(&path)?, + fd_lock_rs::LockType::Exclusive, + true, + )?; + let mut stream = + serde_cbor::StreamDeserializer::new(serde_cbor::de::IoRead::new(&mut *f)); + let mut revision: u64 = stream.next().transpose()?.unwrap_or(0); + let mut stream = stream.change_output_type(); + let mut data = stream.next().transpose()?.unwrap_or_else(|| Value::Null); + let mut stream = stream.change_output_type(); + while let Some(Ok(patch)) = stream.next() { + json_patch::patch(&mut data, &patch)?; + revision += 1; + } + let bak_tmp = bak.with_extension("bak.tmp"); + let mut backup_file = std::fs::File::create(&bak_tmp)?; + serde_cbor::to_writer(&mut backup_file, &revision)?; + serde_cbor::to_writer(&mut backup_file, &data)?; + backup_file.flush()?; + backup_file.sync_all()?; + std::fs::rename(&bak_tmp, &bak)?; + nix::unistd::ftruncate(std::os::unix::io::AsRawFd::as_raw_fd(&*f), 0) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + serde_cbor::to_writer(&mut *f, &revision)?; + serde_cbor::to_writer(&mut *f, &data)?; + f.flush()?; + f.sync_all()?; + std::fs::remove_file(&bak)?; + + Ok::<_, Error>(Store { + file: f.map(File::from_std), + _lock, + cache_corrupted: None, + data, + revision, + }) + }) + .await??) + } + fn check_cache_corrupted(&self) -> Result<(), Error> { + if let Some(ref err) = self.cache_corrupted { + Err(Error::CacheCorrupted(err.clone())) + } else { + Ok(()) + } + } + pub(crate) fn get_data(&self) -> Result<&Value, Error> { + self.check_cache_corrupted()?; + Ok(&self.data) + } + fn get_data_mut(&mut self) -> Result<&mut Value, Error> { + self.check_cache_corrupted()?; + Ok(&mut self.data) + } + pub async fn close(mut self) -> Result<(), Error> { + use tokio::io::AsyncWriteExt; + + self.file.flush().await?; + self.file.shutdown().await?; + self.file.unlock(true).map_err(|e| e.1)?; + Ok(()) + } + pub fn get Deserialize<'de>, S: AsRef, V: SegList>( + &self, + ptr: &JsonPointer, + ) -> Result { + Ok(serde_json::from_value( + ptr.get(self.get_data()?).unwrap_or(&Value::Null).clone(), + )?) + } + pub fn dump(&self) -> Value { + self.get_data().unwrap().clone() + } + pub async fn put, V: SegList>( + &mut self, + ptr: &JsonPointer, + value: &T, + ) -> Result, Error> { + let mut patch = diff( + ptr.get(self.get_data()?).unwrap_or(&Value::Null), + &serde_json::to_value(value)?, + ); + patch.prepend(ptr); + self.apply(patch).await + } + pub async fn apply(&mut self, patch: DiffPatch) -> Result, Error> { + use tokio::io::AsyncWriteExt; + + self.check_cache_corrupted()?; + let patch_bin = serde_cbor::to_vec(&*patch)?; + json_patch::patch(self.get_data_mut()?, &*patch)?; + + async fn sync_to_disk(file: &mut File, patch_bin: &[u8]) -> Result<(), IOError> { + file.write_all(patch_bin).await?; + file.flush().await?; + file.sync_data().await?; + Ok(()) + } + if let Err(e) = sync_to_disk(&mut *self.file, &patch_bin).await { + let e = Arc::new(e); + self.cache_corrupted = Some(e.clone()); + return Err(Error::CacheCorrupted(e)); + // TODO: try to recover. + } + + let id = self.revision; + self.revision += 1; + let res = Arc::new(Revision { id, patch }); + + Ok(res) + } +} + +#[derive(Clone)] +pub struct PatchDb { + pub(crate) store: Arc>, + subscriber: Arc>>, + pub(crate) locker: Locker, +} +impl PatchDb { + pub async fn open>(path: P) -> Result { + let (subscriber, _) = tokio::sync::broadcast::channel(16); // TODO: make this unbounded + + Ok(PatchDb { + store: Arc::new(RwLock::new(Store::open(path).await?)), + locker: Locker::new(), + subscriber: Arc::new(subscriber), + }) + } + pub async fn get Deserialize<'de>, S: AsRef, V: SegList>( + &self, + ptr: &JsonPointer, + ) -> Result { + self.store.read().await.get(ptr) + } + pub async fn put, V: SegList>( + &self, + ptr: &JsonPointer, + value: &T, + ) -> Result, Error> { + let mut store = self.store.write().await; + let rev = store.put(ptr, value).await?; + self.subscriber.send(rev.clone()).unwrap_or_default(); + Ok(rev) + } + pub async fn apply( + &self, + patch: DiffPatch, + store_write_lock: Option>, + ) -> Result, Error> { + let mut store = if let Some(store_write_lock) = store_write_lock { + store_write_lock + } else { + self.store.write().await + }; + let rev = store.apply(patch).await?; + self.subscriber.send(rev.clone()).unwrap_or_default(); // ignore errors + Ok(rev) + } + pub fn subscribe(&self) -> Receiver> { + self.subscriber.subscribe() + } + pub fn begin(&self) -> Transaction { + Transaction { + db: self.clone(), + locks: Vec::new(), + updates: DiffPatch::default(), + sub: self.subscribe(), + } + } +} diff --git a/patch-db/src/test.rs b/patch-db/src/test.rs index 6e3c147..a467ca4 100644 --- a/patch-db/src/test.rs +++ b/patch-db/src/test.rs @@ -1,8 +1,14 @@ -use super::*; -use crate as patch_db; +use std::future::Future; +use std::sync::Arc; + +use json_ptr::JsonPointer; +use patch_db::{HasModel, PatchDb, Revision}; use proptest::prelude::*; -use tokio::{fs, runtime::Builder}; -use patch_db_macro::HasModel; +use serde_json::Value; +use tokio::fs; +use tokio::runtime::Builder; + +use crate as patch_db; async fn init_db(db_name: String) -> PatchDb { cleanup_db(&db_name).await; @@ -14,78 +20,81 @@ async fn init_db(db_name: String) -> PatchDb { b: Child { a: "test2".to_string(), b: 1, + c: NewType(None), }, }, ) - .await.unwrap(); + .await + .unwrap(); db } -async fn cleanup_db(db_name: &String) { +async fn cleanup_db(db_name: &str) { fs::remove_file(db_name).await.ok(); } async fn put_string_into_root(db: PatchDb, s: String) -> Arc { - println!("trying string: {}", s); - db.put( - &JsonPointer::<&'static str>::default(), - &s - ) - .await.unwrap() + db.put(&JsonPointer::<&'static str>::default(), &s) + .await + .unwrap() } - #[tokio::test] async fn basic() { - let db = init_db("basic.test.db".to_string()).await; + let db = init_db("test.db".to_string()).await; let ptr: JsonPointer = "/b/b".parse().unwrap(); let mut get_res: Value = db.get(&ptr).await.unwrap(); assert_eq!(get_res, 1); - db.put(&ptr, &"hello").await.unwrap(); + db.put(&ptr, "hello").await.unwrap(); get_res = db.get(&ptr).await.unwrap(); assert_eq!(get_res, "hello"); - cleanup_db(&"basic.test.db".to_string()).await; + cleanup_db("test.db").await; } #[tokio::test] async fn transaction() { - let db = init_db("transaction.test.db".to_string()).await; + let db = init_db("test.db".to_string()).await; let mut tx = db.begin(); let ptr: JsonPointer = "/b/b".parse().unwrap(); tx.put(&ptr, &(2 as usize)).await.unwrap(); tx.put(&ptr, &(1 as usize)).await.unwrap(); let _res = tx.commit().await.unwrap(); println!("res = {:?}", _res); - cleanup_db(&"transaction.test.db".to_string()).await; + cleanup_db("test.db").await; +} + +fn run_future, Fut: Future>(name: S, fut: Fut) { + Builder::new_multi_thread() + .thread_name(name) + .build() + .unwrap() + .block_on(fut) } proptest! { #[test] fn doesnt_crash(s in "\\PC*") { - // build runtime - let runtime = Builder::new_multi_thread() - .worker_threads(4) - .thread_name("test-doesnt-crash") - .thread_stack_size(3 * 1024 * 1024) - .build() - .unwrap(); - let db = runtime.block_on(init_db("doesnt_crash.test.db".to_string())); - let put_future = put_string_into_root(db, s); - runtime.block_on(put_future); - runtime.block_on(cleanup_db(&"doesnt_crash.test.db".to_string())); - + run_future("test-doesnt-crash", async { + let db = init_db("test.db".to_string()).await; + put_string_into_root(db, s).await; + cleanup_db(&"test.db".to_string()).await; + }); } } #[derive(Debug, serde::Deserialize, serde::Serialize, HasModel)] pub struct Sample { a: String, - #[model(name = ChildModel)] + #[model] b: Child, } -#[derive(Debug, serde::Deserialize, serde::Serialize)] +#[derive(Debug, serde::Deserialize, serde::Serialize, HasModel)] pub struct Child { a: String, b: usize, + c: NewType, } + +#[derive(Debug, serde::Deserialize, serde::Serialize, HasModel)] +pub struct NewType(Option>); diff --git a/patch-db/src/transaction.rs b/patch-db/src/transaction.rs new file mode 100644 index 0000000..fef9f0b --- /dev/null +++ b/patch-db/src/transaction.rs @@ -0,0 +1,398 @@ +use std::sync::Arc; + +use futures::future::{BoxFuture, FutureExt}; +use json_ptr::{JsonPointer, SegList}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use tokio::sync::broadcast::error::TryRecvError; +use tokio::sync::broadcast::Receiver; +use tokio::sync::{RwLock, RwLockReadGuard}; + +use crate::locker::{LockType, Locker, LockerGuard}; +use crate::patch::{DiffPatch, Revision}; +use crate::store::{PatchDb, Store}; +use crate::Error; + +pub trait Checkpoint: Sized { + fn rebase(&mut self) -> Result<(), Error>; + fn get_value<'a, S: AsRef + Send + Sync + 'a, V: SegList + Send + Sync + 'a>( + &'a mut self, + ptr: &'a JsonPointer, + store_read_lock: Option>, + ) -> BoxFuture<'a, Result>; + fn put_value<'a, S: AsRef + Send + Sync + 'a, V: SegList + Send + Sync + 'a>( + &'a mut self, + ptr: &'a JsonPointer, + value: &'a Value, + ) -> BoxFuture<'a, Result<(), Error>>; + fn store(&self) -> Arc>; + fn subscribe(&self) -> Receiver>; + fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>); + fn apply(&mut self, patch: DiffPatch); + fn lock<'a, S: AsRef + Clone + Send + Sync, V: SegList + Clone + Send + Sync>( + &'a mut self, + ptr: &'a JsonPointer, + lock: LockType, + ) -> BoxFuture<'a, ()>; + fn get< + 'a, + T: for<'de> Deserialize<'de> + 'a, + S: AsRef + Clone + Send + Sync + 'a, + V: SegList + Clone + Send + Sync + 'a, + >( + &'a mut self, + ptr: &'a JsonPointer, + lock: LockType, + ) -> BoxFuture<'a, Result>; + fn put< + 'a, + T: Serialize + Send + Sync + 'a, + S: AsRef + Send + Sync + 'a, + V: SegList + Send + Sync + 'a, + >( + &'a mut self, + ptr: &'a JsonPointer, + value: &'a T, + ) -> BoxFuture<'a, Result<(), Error>>; +} + +pub struct Transaction { + pub(crate) db: PatchDb, + pub(crate) locks: Vec<(JsonPointer, LockerGuard)>, + pub(crate) updates: DiffPatch, + pub(crate) sub: Receiver>, +} +impl Transaction { + pub fn rebase(&mut self) -> Result<(), Error> { + while let Some(rev) = match self.sub.try_recv() { + Ok(a) => Some(a), + Err(TryRecvError::Empty) => None, + Err(e) => return Err(e.into()), + } { + self.updates.rebase(&rev.patch); + } + Ok(()) + } + async fn get_value, V: SegList>( + &mut self, + ptr: &JsonPointer, + store_read_lock: Option>, + ) -> Result { + let mut data = { + let store_lock = self.db.store.clone(); + let store = if let Some(store_read_lock) = store_read_lock { + store_read_lock + } else { + store_lock.read().await + }; + self.rebase()?; + ptr.get(store.get_data()?).cloned().unwrap_or_default() + }; + json_patch::patch(&mut data, &*self.updates.for_path(ptr))?; + Ok(data) + } + async fn put_value, V: SegList>( + &mut self, + ptr: &JsonPointer, + value: &Value, + ) -> Result<(), Error> { + let old = Transaction::get_value(self, ptr, None).await?; + let mut patch = crate::patch::diff(&old, &value); + patch.prepend(ptr); + self.updates.append(patch); + Ok(()) + } + pub async fn lock + Clone, V: SegList + Clone>( + &mut self, + ptr: &JsonPointer, + lock: LockType, + ) { + match lock { + LockType::None => (), + LockType::Read => { + self.db + .locker + .add_read_lock(ptr, &mut self.locks, &mut []) + .await + } + LockType::Write => { + self.db + .locker + .add_write_lock(ptr, &mut self.locks, &mut []) + .await + } + } + } + pub async fn get Deserialize<'de>, S: AsRef + Clone, V: SegList + Clone>( + &mut self, + ptr: &JsonPointer, + lock: LockType, + ) -> Result { + self.lock(ptr, lock).await; + Ok(serde_json::from_value( + Transaction::get_value(self, ptr, None).await?, + )?) + } + pub async fn put, V: SegList>( + &mut self, + ptr: &JsonPointer, + value: &T, + ) -> Result<(), Error> { + Transaction::put_value(self, ptr, &serde_json::to_value(value)?).await + } + pub async fn commit(mut self) -> Result, Error> { + let store_lock = self.db.store.clone(); + let store = store_lock.write().await; + self.rebase()?; + self.db.apply(self.updates, Some(store)).await + } + pub async fn begin(&mut self) -> Result, Error> { + let store_lock = self.db.store.clone(); + let store = store_lock.read().await; + self.rebase()?; + let sub = self.db.subscribe(); + drop(store); + Ok(SubTransaction { + parent: self, + locks: Vec::new(), + updates: DiffPatch::default(), + sub, + }) + } +} +impl<'a> Checkpoint for &'a mut Transaction { + fn rebase(&mut self) -> Result<(), Error> { + Transaction::rebase(self) + } + fn get_value<'b, S: AsRef + Send + Sync + 'b, V: SegList + Send + Sync + 'b>( + &'b mut self, + ptr: &'b JsonPointer, + store_read_lock: Option>, + ) -> BoxFuture<'b, Result> { + Transaction::get_value(self, ptr, store_read_lock).boxed() + } + fn put_value<'b, S: AsRef + Send + Sync + 'b, V: SegList + Send + Sync + 'b>( + &'b mut self, + ptr: &'b JsonPointer, + value: &'b Value, + ) -> BoxFuture<'b, Result<(), Error>> { + Transaction::put_value(self, ptr, value).boxed() + } + fn store(&self) -> Arc> { + self.db.store.clone() + } + fn subscribe(&self) -> Receiver> { + self.db.subscribe() + } + fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) { + (&self.db.locker, vec![&mut self.locks]) + } + fn apply(&mut self, patch: DiffPatch) { + self.updates.append(patch) + } + fn lock<'b, S: AsRef + Clone + Send + Sync, V: SegList + Clone + Send + Sync>( + &'b mut self, + ptr: &'b JsonPointer, + lock: LockType, + ) -> BoxFuture<'b, ()> { + Transaction::lock(self, ptr, lock).boxed() + } + fn get< + 'b, + T: for<'de> Deserialize<'de> + 'b, + S: AsRef + Clone + Send + Sync + 'b, + V: SegList + Clone + Send + Sync + 'b, + >( + &'b mut self, + ptr: &'b JsonPointer, + lock: LockType, + ) -> BoxFuture<'b, Result> { + Transaction::get(self, ptr, lock).boxed() + } + fn put< + 'b, + T: Serialize + Send + Sync + 'b, + S: AsRef + Send + Sync + 'b, + V: SegList + Send + Sync + 'b, + >( + &'b mut self, + ptr: &'b JsonPointer, + value: &'b T, + ) -> BoxFuture<'b, Result<(), Error>> { + Transaction::put(self, ptr, value).boxed() + } +} + +pub struct SubTransaction { + parent: Tx, + locks: Vec<(JsonPointer, LockerGuard)>, + updates: DiffPatch, + sub: Receiver>, +} +impl SubTransaction { + pub fn rebase(&mut self) -> Result<(), Error> { + self.parent.rebase()?; + while let Some(rev) = match self.sub.try_recv() { + Ok(a) => Some(a), + Err(TryRecvError::Empty) => None, + Err(e) => return Err(e.into()), + } { + self.updates.rebase(&rev.patch); + } + Ok(()) + } + async fn get_value + Send + Sync, V: SegList + Send + Sync>( + &mut self, + ptr: &JsonPointer, + store_read_lock: Option>, + ) -> Result { + let mut data = { + let store_lock = self.parent.store(); + let store = if let Some(store_read_lock) = store_read_lock { + store_read_lock + } else { + store_lock.read().await + }; + self.rebase()?; + self.parent.get_value(ptr, Some(store)).await? + }; + json_patch::patch(&mut data, &*self.updates.for_path(ptr))?; + Ok(data) + } + async fn put_value + Send + Sync, V: SegList + Send + Sync>( + &mut self, + ptr: &JsonPointer, + value: &Value, + ) -> Result<(), Error> { + let old = SubTransaction::get_value(self, ptr, None).await?; + let mut patch = crate::patch::diff(&old, &value); + patch.prepend(ptr); + self.updates.append(patch); + Ok(()) + } + pub async fn lock + Clone, V: SegList + Clone>( + &mut self, + ptr: &JsonPointer, + lock: LockType, + ) { + match lock { + LockType::None => (), + LockType::Read => { + let (locker, mut locks) = self.parent.locker_and_locks(); + locker.add_read_lock(ptr, &mut self.locks, &mut locks).await + } + LockType::Write => { + let (locker, mut locks) = self.parent.locker_and_locks(); + locker + .add_write_lock(ptr, &mut self.locks, &mut locks) + .await + } + } + } + pub async fn get< + T: for<'de> Deserialize<'de>, + S: AsRef + Clone + Send + Sync, + V: SegList + Clone + Send + Sync, + >( + &mut self, + ptr: &JsonPointer, + lock: LockType, + ) -> Result { + self.lock(ptr, lock).await; + Ok(serde_json::from_value( + SubTransaction::get_value(self, ptr, None).await?, + )?) + } + pub async fn put + Send + Sync, V: SegList + Send + Sync>( + &mut self, + ptr: &JsonPointer, + value: &T, + ) -> Result<(), Error> { + SubTransaction::put_value(self, ptr, &serde_json::to_value(value)?).await + } + pub async fn commit(mut self) -> Result<(), Error> { + let store_lock = self.parent.store(); + let store = store_lock.read().await; + self.rebase()?; + self.parent.apply(self.updates); + drop(store); + Ok(()) + } + pub async fn begin(&mut self) -> Result, Error> { + let store_lock = self.parent.store(); + let store = store_lock.read().await; + self.rebase()?; + let sub = self.parent.subscribe(); + drop(store); + Ok(SubTransaction { + parent: self, + locks: Vec::new(), + updates: DiffPatch::default(), + sub, + }) + } +} +impl<'a, Tx: Checkpoint + Send + Sync> Checkpoint for &'a mut SubTransaction { + fn rebase(&mut self) -> Result<(), Error> { + SubTransaction::rebase(self) + } + fn get_value<'b, S: AsRef + Send + Sync + 'b, V: SegList + Send + Sync + 'b>( + &'b mut self, + ptr: &'b JsonPointer, + store_read_lock: Option>, + ) -> BoxFuture<'b, Result> { + SubTransaction::get_value(self, ptr, store_read_lock).boxed() + } + fn put_value<'b, S: AsRef + Send + Sync + 'b, V: SegList + Send + Sync + 'b>( + &'b mut self, + ptr: &'b JsonPointer, + value: &'b Value, + ) -> BoxFuture<'b, Result<(), Error>> { + SubTransaction::put_value(self, ptr, value).boxed() + } + fn store(&self) -> Arc> { + self.parent.store() + } + fn subscribe(&self) -> Receiver> { + self.parent.subscribe() + } + fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) { + let (locker, mut locks) = self.parent.locker_and_locks(); + locks.push(&mut self.locks); + (locker, locks) + } + fn apply(&mut self, patch: DiffPatch) { + self.updates.append(patch) + } + fn lock<'b, S: AsRef + Clone + Send + Sync, V: SegList + Clone + Send + Sync>( + &'b mut self, + ptr: &'b JsonPointer, + lock: LockType, + ) -> BoxFuture<'b, ()> { + SubTransaction::lock(self, ptr, lock).boxed() + } + fn get< + 'b, + T: for<'de> Deserialize<'de> + 'b, + S: AsRef + Clone + Send + Sync + 'b, + V: SegList + Clone + Send + Sync + 'b, + >( + &'b mut self, + ptr: &'b JsonPointer, + lock: LockType, + ) -> BoxFuture<'b, Result> { + SubTransaction::get(self, ptr, lock).boxed() + } + fn put< + 'b, + T: Serialize + Send + Sync + 'b, + S: AsRef + Send + Sync + 'b, + V: SegList + Send + Sync + 'b, + >( + &'b mut self, + ptr: &'b JsonPointer, + value: &'b T, + ) -> BoxFuture<'b, Result<(), Error>> { + SubTransaction::put(self, ptr, value).boxed() + } +}