From 5a83aa5926af59dfc840bd40321c7cbc831903f7 Mon Sep 17 00:00:00 2001 From: Aiden McClelland Date: Wed, 17 Mar 2021 19:49:06 -0600 Subject: [PATCH] fix transactions --- patch-db-derive-internals/src/lib.rs | 10 +- patch-db/Cargo.toml | 3 + patch-db/src/lib.rs | 383 +++++++++++++++++++++------ patch-db/src/test.rs | 22 ++ 4 files changed, 328 insertions(+), 90 deletions(-) diff --git a/patch-db-derive-internals/src/lib.rs b/patch-db-derive-internals/src/lib.rs index 638038f..061d401 100644 --- a/patch-db-derive-internals/src/lib.rs +++ b/patch-db-derive-internals/src/lib.rs @@ -17,20 +17,22 @@ fn build_model_struct(input: &syn::DeriveInput, ast: &syn::DataStruct) -> TokenS let base_name = &input.ident; let model_vis = &input.vis; quote! { - #model_vis struct #model_name { + #model_vis struct #model_name { data: Option>, + lock_type: patch_db::LockType, ptr: json_ptr::JsonPointer, tx: Tx, } impl #model_name { pub fn get(&mut self, lock: patch_db::LockType) -> Result<&#base_name, patch_db::Error> { if let Some(data) = self.data.as_ref() { - match lock { - patch_db::LockType::None => Ok(data), + match (self.lock_type, lock) { + (patch_db::LockType::None, patch_db::LockType::Read) => Ok(data), } } else { - self.tx.get(&self.ptr, lock) + self.data = Some(Box::new(self.tx.get(&self.ptr, lock).await?)); + Ok(self.data.as_ref().unwrap()) } } } diff --git a/patch-db/Cargo.toml b/patch-db/Cargo.toml index 36dda6b..dbcbd4f 100644 --- a/patch-db/Cargo.toml +++ b/patch-db/Cargo.toml @@ -19,3 +19,6 @@ serde_json = "1.0.61" serde_cbor = { git = "https://github.com/dr-bonez/cbor.git" } thiserror = "1.0.23" tokio = { version = "1.0.1", features = ["sync", "fs", "rt", "io-util", "macros"] } + +[dev-dependencies] +serde = { version = "1.0.118", features = ["rc", "derive"] } diff --git a/patch-db/src/lib.rs b/patch-db/src/lib.rs index 502ae4d..a09dced 100644 --- a/patch-db/src/lib.rs +++ b/patch-db/src/lib.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use fd_lock_rs::FdLock; use futures::future::{BoxFuture, FutureExt}; -use json_patch::{Patch, PatchOperation}; +use json_patch::{AddOperation, Patch, PatchOperation, RemoveOperation, ReplaceOperation}; use json_ptr::{JsonPointer, SegList}; use qutex_2::{QrwLock, ReadGuard, WriteGuard}; use serde::{Deserialize, Serialize}; @@ -16,11 +16,13 @@ use thiserror::Error; use tokio::{ fs::File, sync::{ - broadcast::{Receiver, Sender}, - Mutex, RwLock, + broadcast::{error::TryRecvError, Receiver, Sender}, + Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard, }, }; +// note: inserting into an array (before another element) without proper locking can result in unexpected behaviour + #[cfg(test)] mod test; @@ -42,18 +44,146 @@ pub enum Error { FDLock(#[from] fd_lock_rs::Error), #[error("Database Cache Corrupted: {0}")] CacheCorrupted(Arc), - #[error("Mutex Error (should be unreachable): {0}")] - MutexError(#[from] tokio::sync::TryLockError), + #[error("Subscriber Error: {0}")] + Subscriber(#[from] TryRecvError), } #[derive(Debug, Clone, Deserialize, Serialize)] pub struct Revision { pub id: u64, - pub patch: Patch, + pub patch: DiffPatch, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct DiffPatch(Patch); +impl DiffPatch { + // safe to assume dictionary style symantics for arrays since patches will always be rebased before being applied + pub fn for_path, V: SegList>(&self, ptr: &JsonPointer) -> DiffPatch { + let DiffPatch(Patch(ops)) = self; + DiffPatch(Patch( + ops.iter() + .filter_map(|op| match op { + PatchOperation::Add(op) => { + if let Some(tail) = op.path.strip_prefix(ptr) { + Some(PatchOperation::Add(AddOperation { + path: tail.to_owned(), + value: op.value.clone(), + })) + } else if let Some(tail) = ptr.strip_prefix(&op.path) { + Some(PatchOperation::Add(AddOperation { + path: Default::default(), + value: tail.get(&op.value).cloned().unwrap_or_default(), + })) + } else { + None + } + } + PatchOperation::Replace(op) => { + if let Some(tail) = op.path.strip_prefix(ptr) { + Some(PatchOperation::Replace(ReplaceOperation { + path: tail.to_owned(), + value: op.value.clone(), + })) + } else if let Some(tail) = ptr.strip_prefix(&op.path) { + Some(PatchOperation::Replace(ReplaceOperation { + path: Default::default(), + value: tail.get(&op.value).cloned().unwrap_or_default(), + })) + } else { + None + } + } + PatchOperation::Remove(op) => { + if ptr.starts_with(&op.path) { + Some(PatchOperation::Replace(ReplaceOperation { + path: Default::default(), + value: Default::default(), + })) + } else if let Some(tail) = op.path.strip_prefix(ptr) { + Some(PatchOperation::Remove(RemoveOperation { + path: tail.to_owned(), + })) + } else { + None + } + } + _ => unreachable!(), + }) + .collect(), + )) + } + pub fn rebase(&mut self, onto: &DiffPatch) { + let DiffPatch(Patch(ops)) = self; + let DiffPatch(Patch(onto_ops)) = onto; + for onto_op in onto_ops { + if let PatchOperation::Add(onto_op) = onto_op { + let arr_path_idx = onto_op.path.len() - 1; + if let Some(onto_idx) = onto_op + .path + .get_segment(arr_path_idx) + .and_then(|seg| seg.parse::().ok()) + { + let prefix = onto_op.path.slice(..arr_path_idx).unwrap_or_default(); + for op in ops.iter_mut() { + let path = match op { + PatchOperation::Add(op) => &mut op.path, + PatchOperation::Replace(op) => &mut op.path, + PatchOperation::Remove(op) => &mut op.path, + _ => unreachable!(), + }; + if path.starts_with(&prefix) { + if let Some(idx) = path + .get_segment(arr_path_idx) + .and_then(|seg| seg.parse::().ok()) + { + if idx >= onto_idx { + let mut new_path = prefix.clone().to_owned(); + new_path.push_end_idx(idx + 1); + if let Some(tail) = path.slice(arr_path_idx + 1..) { + new_path.append(&tail); + } + *path = new_path; + } + } + } + } + } + } else if let PatchOperation::Remove(onto_op) = onto_op { + let arr_path_idx = onto_op.path.len() - 1; + if let Some(onto_idx) = onto_op + .path + .get_segment(arr_path_idx) + .and_then(|seg| seg.parse::().ok()) + { + let prefix = onto_op.path.slice(..arr_path_idx).unwrap_or_default(); + for op in ops.iter_mut() { + let path = match op { + PatchOperation::Add(op) => &mut op.path, + PatchOperation::Replace(op) => &mut op.path, + PatchOperation::Remove(op) => &mut op.path, + _ => unreachable!(), + }; + if path.starts_with(&prefix) { + if let Some(idx) = path + .get_segment(arr_path_idx) + .and_then(|seg| seg.parse::().ok()) + { + if idx >= onto_idx { + let mut new_path = prefix.clone().to_owned(); + new_path.push_end_idx(idx - 1); + if let Some(tail) = path.slice(arr_path_idx + 1..) { + new_path.append(&tail); + } + *path = new_path; + } + } + } + } + } + } + } + } +} pub struct Store { file: FdLock, @@ -176,7 +306,7 @@ impl Store { let id = self.revision; self.revision += 1; - let res = Arc::new(Revision { id, patch: patch.0 }); + let res = Arc::new(Revision { id, patch }); Ok(res) } @@ -211,8 +341,19 @@ impl PatchDb { ) -> Result, Error> { self.store.write().await.put(ptr, value).await } - pub async fn apply(&self, patch: DiffPatch) -> Result, Error> { - self.store.write().await.apply(patch).await + pub async fn apply( + &self, + patch: DiffPatch, + store_write_lock: Option>, + ) -> Result, Error> { + let mut store = if let Some(store_write_lock) = store_write_lock { + store_write_lock + } else { + self.store.write().await + }; + let rev = store.apply(patch).await?; + self.subscriber.send(rev.clone()).unwrap_or_default(); // ignore errors + Ok(rev) } pub fn subscribe(&self) -> Receiver> { self.subscriber.subscribe() @@ -222,20 +363,24 @@ impl PatchDb { db: self.clone(), locks: Vec::new(), updates: DiffPatch(Patch(Vec::new())), + sub: self.subscribe(), } } } -pub trait Checkpoint { - type SubTx: Checkpoint; +pub trait Checkpoint: Sized { + fn rebase(&mut self) -> Result<(), Error>; fn get_value<'a, S: AsRef + Send + Sync + 'a, V: SegList + Send + Sync + 'a>( &'a mut self, ptr: &'a JsonPointer, + store_read_lock: Option>, ) -> BoxFuture<'a, Result>; fn put_value<'a, S: AsRef + Send + Sync + 'a, V: SegList + Send + Sync + 'a>( &'a mut self, ptr: &'a JsonPointer, value: &'a Value, ) -> BoxFuture<'a, Result<(), Error>>; + fn store(&self) -> Arc>; + fn subscribe(&self) -> Receiver>; fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>); fn apply(&mut self, patch: DiffPatch); fn lock<'a, S: AsRef + Clone + Send + Sync, V: SegList + Clone + Send + Sync>( @@ -269,36 +414,35 @@ pub struct Transaction { db: PatchDb, locks: Vec<(JsonPointer, LockerGuard)>, updates: DiffPatch, + sub: Receiver>, } impl Transaction { + pub fn rebase(&mut self) -> Result<(), Error> { + while let Some(rev) = match self.sub.try_recv() { + Ok(a) => Some(a), + Err(TryRecvError::Empty) => None, + Err(e) => return Err(e.into()), + } { + self.updates.rebase(&rev.patch); + } + Ok(()) + } async fn get_value, V: SegList>( &mut self, ptr: &JsonPointer, + store_read_lock: Option>, ) -> Result { - let mut data: Value = ptr - .get(self.db.store.read().await.get_data()?) - .unwrap_or(&Value::Null) - .clone(); - for op in (self.updates.0).0.iter() { - match op { - PatchOperation::Add(ref op) => { - if let Some(path) = op.path.strip_prefix(ptr) { - path.insert(&mut data, op.value.clone(), false)?; - } - } - PatchOperation::Remove(ref op) => { - if let Some(path) = op.path.strip_prefix(ptr) { - path.remove(&mut data, false); - } - } - PatchOperation::Replace(ref op) => { - if let Some(path) = op.path.strip_prefix(ptr) { - path.set(&mut data, op.value.clone(), false)?; - } - } - _ => unreachable!("Diff patches cannot contain other operations."), - } - } + let mut data = { + let store_lock = self.db.store.clone(); + let store = if let Some(store_read_lock) = store_read_lock { + store_read_lock + } else { + store_lock.read().await + }; + self.rebase()?; + ptr.get(store.get_data()?).cloned().unwrap_or_default() + }; + json_patch::patch(&mut data, &self.updates.for_path(ptr).0)?; Ok(data) } async fn put_value, V: SegList>( @@ -306,8 +450,8 @@ impl Transaction { ptr: &JsonPointer, value: &Value, ) -> Result<(), Error> { - let old = Transaction::get_value(self, ptr).await?; - let mut patch = json_patch::diff(&old, value); + let old = Transaction::get_value(self, ptr, None).await?; + let mut patch = json_patch::diff(&old, &value); patch.prepend(ptr); (self.updates.0).0.extend(patch.0); Ok(()) @@ -340,7 +484,7 @@ impl Transaction { ) -> Result { self.lock(ptr, lock).await; Ok(serde_json::from_value( - Transaction::get_value(self, ptr).await?, + Transaction::get_value(self, ptr, None).await?, )?) } pub async fn put, V: SegList>( @@ -350,17 +494,36 @@ impl Transaction { ) -> Result<(), Error> { Transaction::put_value(self, ptr, &serde_json::to_value(value)?).await } - pub async fn commit(self) -> Result, Error> { - self.db.apply(self.updates).await + pub async fn commit(mut self) -> Result, Error> { + let store_lock = self.db.store.clone(); + let store = store_lock.write().await; + self.rebase()?; + self.db.apply(self.updates, Some(store)).await + } + pub async fn begin(&mut self) -> Result, Error> { + let store_lock = self.db.store.clone(); + let store = store_lock.read().await; + self.rebase()?; + let sub = self.db.subscribe(); + drop(store); + Ok(SubTransaction { + parent: self, + locks: Vec::new(), + updates: DiffPatch(Patch(Vec::new())), + sub, + }) } } impl<'a> Checkpoint for &'a mut Transaction { - type SubTx = &'a mut SubTransaction; + fn rebase(&mut self) -> Result<(), Error> { + Transaction::rebase(self) + } fn get_value<'b, S: AsRef + Send + Sync + 'b, V: SegList + Send + Sync + 'b>( &'b mut self, ptr: &'b JsonPointer, + store_read_lock: Option>, ) -> BoxFuture<'b, Result> { - Transaction::get_value(self, ptr).boxed() + Transaction::get_value(self, ptr, store_read_lock).boxed() } fn put_value<'b, S: AsRef + Send + Sync + 'b, V: SegList + Send + Sync + 'b>( &'b mut self, @@ -369,6 +532,12 @@ impl<'a> Checkpoint for &'a mut Transaction { ) -> BoxFuture<'b, Result<(), Error>> { Transaction::put_value(self, ptr, value).boxed() } + fn store(&self) -> Arc> { + self.db.store.clone() + } + fn subscribe(&self) -> Receiver> { + self.db.subscribe() + } fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) { (&self.db.locker, vec![&mut self.locks]) } @@ -412,33 +581,36 @@ pub struct SubTransaction { parent: Tx, locks: Vec<(JsonPointer, LockerGuard)>, updates: DiffPatch, + sub: Receiver>, } -impl SubTransaction { +impl SubTransaction { + pub fn rebase(&mut self) -> Result<(), Error> { + self.parent.rebase()?; + while let Some(rev) = match self.sub.try_recv() { + Ok(a) => Some(a), + Err(TryRecvError::Empty) => None, + Err(e) => return Err(e.into()), + } { + self.updates.rebase(&rev.patch); + } + Ok(()) + } async fn get_value + Send + Sync, V: SegList + Send + Sync>( &mut self, ptr: &JsonPointer, + store_read_lock: Option>, ) -> Result { - let mut data: Value = self.parent.get_value(ptr).await?; - for op in (self.updates.0).0.iter() { - match op { - PatchOperation::Add(ref op) => { - if let Some(path) = op.path.strip_prefix(ptr) { - path.insert(&mut data, op.value.clone(), false)?; - } - } - PatchOperation::Remove(ref op) => { - if let Some(path) = op.path.strip_prefix(ptr) { - path.remove(&mut data, false); - } - } - PatchOperation::Replace(ref op) => { - if let Some(path) = op.path.strip_prefix(ptr) { - path.set(&mut data, op.value.clone(), false)?; - } - } - _ => unreachable!("Diff patches cannot contain other operations."), - } - } + let mut data = { + let store_lock = self.parent.store(); + let store = if let Some(store_read_lock) = store_read_lock { + store_read_lock + } else { + store_lock.read().await + }; + self.rebase()?; + self.parent.get_value(ptr, Some(store)).await? + }; + json_patch::patch(&mut data, &self.updates.for_path(ptr).0)?; Ok(data) } async fn put_value + Send + Sync, V: SegList + Send + Sync>( @@ -446,8 +618,8 @@ impl SubTransaction { ptr: &JsonPointer, value: &Value, ) -> Result<(), Error> { - let old = SubTransaction::get_value(self, ptr).await?; - let mut patch = json_patch::diff(&old, value); + let old = SubTransaction::get_value(self, ptr, None).await?; + let mut patch = json_patch::diff(&old, &value); patch.prepend(ptr); (self.updates.0).0.extend(patch.0); Ok(()) @@ -482,7 +654,7 @@ impl SubTransaction { ) -> Result { self.lock(ptr, lock).await; Ok(serde_json::from_value( - SubTransaction::get_value(self, ptr).await?, + SubTransaction::get_value(self, ptr, None).await?, )?) } pub async fn put + Send + Sync, V: SegList + Send + Sync>( @@ -490,24 +662,40 @@ impl SubTransaction { ptr: &JsonPointer, value: &T, ) -> Result<(), Error> { - let old = SubTransaction::get_value(self, ptr).await?; - let new = serde_json::to_value(value)?; - let mut patch = json_patch::diff(&old, &new); - patch.prepend(ptr); - (self.updates.0).0.extend(patch.0); + SubTransaction::put_value(self, ptr, &serde_json::to_value(value)?).await + } + pub async fn commit(mut self) -> Result<(), Error> { + let store_lock = self.parent.store(); + let store = store_lock.read().await; + self.rebase()?; + self.parent.apply(self.updates); + drop(store); Ok(()) } - pub fn commit(mut self) { - self.parent.apply(self.updates) + pub async fn begin(&mut self) -> Result, Error> { + let store_lock = self.parent.store(); + let store = store_lock.read().await; + self.rebase()?; + let sub = self.parent.subscribe(); + drop(store); + Ok(SubTransaction { + parent: self, + locks: Vec::new(), + updates: DiffPatch(Patch(Vec::new())), + sub, + }) } } impl<'a, Tx: Checkpoint + Send + Sync> Checkpoint for &'a mut SubTransaction { - type SubTx = &'a mut SubTransaction; + fn rebase(&mut self) -> Result<(), Error> { + SubTransaction::rebase(self) + } fn get_value<'b, S: AsRef + Send + Sync + 'b, V: SegList + Send + Sync + 'b>( &'b mut self, ptr: &'b JsonPointer, + store_read_lock: Option>, ) -> BoxFuture<'b, Result> { - SubTransaction::get_value(self, ptr).boxed() + SubTransaction::get_value(self, ptr, store_read_lock).boxed() } fn put_value<'b, S: AsRef + Send + Sync + 'b, V: SegList + Send + Sync + 'b>( &'b mut self, @@ -516,6 +704,12 @@ impl<'a, Tx: Checkpoint + Send + Sync> Checkpoint for &'a mut SubTransaction ) -> BoxFuture<'b, Result<(), Error>> { SubTransaction::put_value(self, ptr, value).boxed() } + fn store(&self) -> Arc> { + self.parent.store() + } + fn subscribe(&self) -> Receiver> { + self.parent.subscribe() + } fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) { let (locker, mut locks) = self.parent.locker_and_locks(); locks.push(&mut self.locks); @@ -855,15 +1049,22 @@ impl ChildHooks { } } +pub trait Model { + type Data: ModelData; +} + +impl Model for Never { + type Data = Never; +} + pub struct GenericModel Deserialize<'de>, Parent: ModelData> { data: Arc>>, ptr: JsonPointer, } -impl< - 'a, - T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static, - Parent: ModelData + 'static, - > GenericModel +impl GenericModel +where + T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static, + Parent: ModelData + 'static, { pub fn new(ptr: JsonPointer) -> Self { Self { @@ -872,7 +1073,7 @@ impl< } } - /// locks + // locks async fn fetch(&mut self, tx: &mut Tx) -> Result<(), Error> { let mut data = self.data.lock().await; if let GenericModelData::Uninitialized(children) = &mut *data { @@ -890,7 +1091,7 @@ impl< tx.lock(&self.ptr, lock).await } - /// locks + // locks pub async fn peek< Tx: Checkpoint, F: FnOnce(&T) -> ResFut + Send + Sync, @@ -901,11 +1102,12 @@ impl< tx: &mut Tx, f: F, ) -> Result { + self.lock(tx, LockType::Read).await; self.fetch(tx).await?; Ok(self.data.lock().await.apply(|t| f(t)).await) } - /// locks + // locks pub async fn apply< Tx: Checkpoint, F: FnOnce(&mut T) -> ResFut + Send + Sync, @@ -916,11 +1118,12 @@ impl< tx: &mut Tx, f: F, ) -> Result { + self.lock(tx, LockType::Write).await; self.fetch(tx).await?; Ok(self.data.lock().await.apply(f).await) } - /// locks + // locks pub async fn child( &mut self, path: &JsonPointer, @@ -1018,3 +1221,11 @@ impl< Ok(()) } } + +impl Model for GenericModel +where + T: Serialize + for<'de> Deserialize<'de> + Send + Sync, + Parent: ModelData, +{ + type Data = GenericModelData; +} diff --git a/patch-db/src/test.rs b/patch-db/src/test.rs index a3c9d80..84fbc76 100644 --- a/patch-db/src/test.rs +++ b/patch-db/src/test.rs @@ -8,3 +8,25 @@ async fn basic() { .await .unwrap(); } + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +pub struct Sample { + a: String, + b: Child, +} + +pub struct SampleModel(GenericModel); +impl Model for SampleModel { + type Data = GenericModelData; +} + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +pub struct Child { + a: String, + b: usize, +} + +pub struct ChildModel(GenericModel); +impl Model for ChildModel { + type Data = GenericModelData; +}