From b70f1ce3af2cb23fdae7b5c275d958833fe2642c Mon Sep 17 00:00:00 2001 From: Aiden McClelland Date: Tue, 27 Apr 2021 14:59:53 -0600 Subject: [PATCH] minor refactor of transactions --- patch-db-macro-internals/src/lib.rs | 15 ++ patch-db/src/handle.rs | 247 +++++++++++++++++ patch-db/src/lib.rs | 4 +- patch-db/src/model.rs | 101 +++++-- patch-db/src/store.rs | 24 +- patch-db/src/test.rs | 5 +- patch-db/src/transaction.rs | 400 +++++----------------------- 7 files changed, 421 insertions(+), 375 deletions(-) create mode 100644 patch-db/src/handle.rs diff --git a/patch-db-macro-internals/src/lib.rs b/patch-db-macro-internals/src/lib.rs index 7465f10..b94a807 100644 --- a/patch-db-macro-internals/src/lib.rs +++ b/patch-db-macro-internals/src/lib.rs @@ -162,6 +162,11 @@ fn build_model_struct( #model_name(#inner_model::from(ptr)) } } + impl From<#model_name> for patch_db::json_ptr::JsonPointer { + fn from(model: #model_name) -> Self { + model.0.into() + } + } impl AsRef for #model_name { fn as_ref(&self) -> &patch_db::json_ptr::JsonPointer { self.0.as_ref() @@ -242,6 +247,11 @@ fn build_model_struct( #model_name(From::from(ptr)) } } + impl From<#model_name> for patch_db::json_ptr::JsonPointer { + fn from(model: #model_name) -> Self { + model.0.into() + } + } impl AsRef for #model_name { fn as_ref(&self) -> &patch_db::json_ptr::JsonPointer { self.0.as_ref() @@ -281,6 +291,11 @@ fn build_model_enum(base: &DeriveInput, _: &DataEnum, model_name: Option) #model_name(From::from(ptr)) } } + impl From<#model_name> for patch_db::json_ptr::JsonPointer { + fn from(model: #model_name) -> Self { + model.0.into() + } + } impl AsRef for #model_name { fn as_ref(&self) -> &patch_db::json_ptr::JsonPointer { self.0.as_ref() diff --git a/patch-db/src/handle.rs b/patch-db/src/handle.rs new file mode 100644 index 0000000..7ba74d0 --- /dev/null +++ b/patch-db/src/handle.rs @@ -0,0 +1,247 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use json_ptr::{JsonPointer, SegList}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use tokio::sync::{broadcast::Receiver, RwLock, RwLockReadGuard}; + +use crate::{ + locker::{LockType, LockerGuard}, + Locker, PatchDb, Revision, Store, Transaction, +}; +use crate::{patch::DiffPatch, Error}; + +#[async_trait] +pub trait DbHandle: Sized + Send + Sync { + async fn begin<'a>(&'a mut self) -> Result, Error>; + fn rebase(&mut self) -> Result<(), Error>; + fn store(&self) -> Arc>; + fn subscribe(&self) -> Receiver>; + fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>); + async fn exists + Send + Sync, V: SegList + Send + Sync>( + &mut self, + ptr: &JsonPointer, + store_read_lock: Option>, + ) -> Result; + async fn get_value + Send + Sync, V: SegList + Send + Sync>( + &mut self, + ptr: &JsonPointer, + store_read_lock: Option>, + ) -> Result; + async fn put_value + Send + Sync, V: SegList + Send + Sync>( + &mut self, + ptr: &JsonPointer, + value: &Value, + ) -> Result<(), Error>; + async fn apply(&mut self, patch: DiffPatch) -> Result<(), Error>; + async fn lock + Clone + Send + Sync, V: SegList + Clone + Send + Sync>( + &mut self, + ptr: &JsonPointer, + lock: LockType, + ) -> (); + async fn get< + T: for<'de> Deserialize<'de>, + S: AsRef + Send + Sync, + V: SegList + Send + Sync, + >( + &mut self, + ptr: &JsonPointer, + ) -> Result; + async fn put< + T: Serialize + Send + Sync, + S: AsRef + Send + Sync, + V: SegList + Send + Sync, + >( + &mut self, + ptr: &JsonPointer, + value: &T, + ) -> Result<(), Error>; +} +#[async_trait] +impl DbHandle for &mut Handle { + async fn begin<'a>(&'a mut self) -> Result, Error> { + let Transaction { + locks, + updates, + sub, + .. + } = (*self).begin().await?; + Ok(Transaction { + parent: self, + locks, + updates, + sub, + }) + } + fn rebase(&mut self) -> Result<(), Error> { + (*self).rebase() + } + fn store(&self) -> Arc> { + (**self).store() + } + fn subscribe(&self) -> Receiver> { + (**self).subscribe() + } + fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) { + (*self).locker_and_locks() + } + async fn exists + Send + Sync, V: SegList + Send + Sync>( + &mut self, + ptr: &JsonPointer, + store_read_lock: Option>, + ) -> Result { + (*self).exists(ptr, store_read_lock).await + } + async fn get_value + Send + Sync, V: SegList + Send + Sync>( + &mut self, + ptr: &JsonPointer, + store_read_lock: Option>, + ) -> Result { + (*self).get_value(ptr, store_read_lock).await + } + async fn put_value + Send + Sync, V: SegList + Send + Sync>( + &mut self, + ptr: &JsonPointer, + value: &Value, + ) -> Result<(), Error> { + (*self).put_value(ptr, value).await + } + async fn apply(&mut self, patch: DiffPatch) -> Result<(), Error> { + (*self).apply(patch).await + } + async fn lock + Clone + Send + Sync, V: SegList + Clone + Send + Sync>( + &mut self, + ptr: &JsonPointer, + lock: LockType, + ) { + (*self).lock(ptr, lock).await + } + async fn get< + T: for<'de> Deserialize<'de>, + S: AsRef + Send + Sync, + V: SegList + Send + Sync, + >( + &mut self, + ptr: &JsonPointer, + ) -> Result { + (*self).get(ptr).await + } + async fn put< + T: Serialize + Send + Sync, + S: AsRef + Send + Sync, + V: SegList + Send + Sync, + >( + &mut self, + ptr: &JsonPointer, + value: &T, + ) -> Result<(), Error> { + (*self).put(ptr, value).await + } +} + +pub struct PatchDbHandle { + pub(crate) db: PatchDb, + pub(crate) locks: Vec<(JsonPointer, LockerGuard)>, +} + +#[async_trait] +impl DbHandle for PatchDbHandle { + async fn begin<'a>(&'a mut self) -> Result, Error> { + Ok(Transaction { + sub: self.subscribe(), + parent: self, + locks: Vec::new(), + updates: DiffPatch::default(), + }) + } + fn rebase(&mut self) -> Result<(), Error> { + Ok(()) + } + fn store(&self) -> Arc> { + self.db.store.clone() + } + fn subscribe(&self) -> Receiver> { + self.db.subscribe() + } + fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) { + (&self.db.locker, vec![self.locks.as_mut_slice()]) + } + async fn exists + Send + Sync, V: SegList + Send + Sync>( + &mut self, + ptr: &JsonPointer, + store_read_lock: Option>, + ) -> Result { + if let Some(lock) = store_read_lock { + lock.exists(ptr) + } else { + self.db.exists(ptr).await + } + } + async fn get_value + Send + Sync, V: SegList + Send + Sync>( + &mut self, + ptr: &JsonPointer, + store_read_lock: Option>, + ) -> Result { + if let Some(lock) = store_read_lock { + lock.get(ptr) + } else { + self.db.get(ptr).await + } + } + async fn put_value + Send + Sync, V: SegList + Send + Sync>( + &mut self, + ptr: &JsonPointer, + value: &Value, + ) -> Result<(), Error> { + self.db.put(ptr, value).await?; + Ok(()) + } + async fn apply(&mut self, patch: DiffPatch) -> Result<(), Error> { + self.db.apply(patch, None).await?; + Ok(()) + } + async fn lock + Clone + Send + Sync, V: SegList + Clone + Send + Sync>( + &mut self, + ptr: &JsonPointer, + lock: LockType, + ) { + match lock { + LockType::Read => { + self.db + .locker + .add_read_lock(ptr, &mut self.locks, &mut []) + .await; + } + LockType::Write => { + self.db + .locker + .add_write_lock(ptr, &mut self.locks, &mut []) + .await; + } + LockType::None => (), + } + } + async fn get< + T: for<'de> Deserialize<'de>, + S: AsRef + Send + Sync, + V: SegList + Send + Sync, + >( + &mut self, + ptr: &JsonPointer, + ) -> Result { + self.db.get(ptr).await + } + async fn put< + T: Serialize + Send + Sync, + S: AsRef + Send + Sync, + V: SegList + Send + Sync, + >( + &mut self, + ptr: &JsonPointer, + value: &T, + ) -> Result<(), Error> { + self.db.put(ptr, value).await?; + Ok(()) + } +} diff --git a/patch-db/src/lib.rs b/patch-db/src/lib.rs index 872fbc8..0d1501f 100644 --- a/patch-db/src/lib.rs +++ b/patch-db/src/lib.rs @@ -6,6 +6,7 @@ use tokio::sync::broadcast::error::TryRecvError; // note: inserting into an array (before another element) without proper locking can result in unexpected behaviour +mod handle; mod locker; mod model; mod patch; @@ -15,6 +16,7 @@ mod transaction; #[cfg(test)] mod test; +pub use handle::{DbHandle, PatchDbHandle}; pub use json_ptr; pub use locker::{LockType, Locker}; pub use model::{ @@ -23,7 +25,7 @@ pub use model::{ pub use patch::Revision; pub use patch_db_macro::HasModel; pub use store::{PatchDb, Store}; -pub use transaction::{Checkpoint, SubTransaction, Transaction}; +pub use transaction::Transaction; #[derive(Error, Debug)] pub enum Error { diff --git a/patch-db/src/model.rs b/patch-db/src/model.rs index bb304ce..310e415 100644 --- a/patch-db/src/model.rs +++ b/patch-db/src/model.rs @@ -7,9 +7,8 @@ use json_ptr::JsonPointer; use serde::{Deserialize, Serialize}; use serde_json::Value; -use crate::locker::LockType; -use crate::transaction::Checkpoint; use crate::Error; +use crate::{locker::LockType, DbHandle}; pub struct ModelData Deserialize<'de>>(T); impl Deserialize<'de>> Deref for ModelData { @@ -25,13 +24,13 @@ pub struct ModelDataMut Deserialize<'de>> { ptr: JsonPointer, } impl Deserialize<'de>> ModelDataMut { - pub async fn save(self, tx: &mut Tx) -> Result<(), Error> { + pub async fn save(self, db: &mut Db) -> Result<(), Error> { let current = serde_json::to_value(&self.current)?; let mut diff = crate::patch::diff(&self.original, ¤t); - let target = tx.get_value(&self.ptr, None).await?; + let target = db.get_value(&self.ptr, None).await?; diff.rebase(&crate::patch::diff(&self.original, &target)); diff.prepend(&self.ptr); - tx.apply(diff); + db.apply(diff).await?; Ok(()) } } @@ -56,17 +55,18 @@ impl Model where T: Serialize + for<'de> Deserialize<'de>, { - pub async fn lock(&self, tx: &mut Tx, lock: LockType) { - tx.lock(&self.ptr, lock).await + pub async fn lock(&self, db: &mut Db, lock: LockType) { + db.lock(&self.ptr, lock).await } - pub async fn get(&self, tx: &mut Tx) -> Result, Error> { - Ok(ModelData(tx.get(&self.ptr, LockType::Read).await?)) + pub async fn get(&self, db: &mut Db) -> Result, Error> { + db.lock(&self.ptr, LockType::Read).await; + Ok(ModelData(db.get(&self.ptr).await?)) } - pub async fn get_mut(&self, tx: &mut Tx) -> Result, Error> { - self.lock(tx, LockType::Write).await; - let original = tx.get_value(&self.ptr, None).await?; + pub async fn get_mut(&self, db: &mut Db) -> Result, Error> { + self.lock(db, LockType::Write).await; + let original = db.get_value(&self.ptr, None).await?; let current = serde_json::from_value(original.clone())?; Ok(ModelDataMut { original, @@ -88,9 +88,9 @@ impl Model where T: Serialize + for<'de> Deserialize<'de> + Send + Sync, { - pub async fn put(&self, tx: &mut Tx, value: &T) -> Result<(), Error> { - self.lock(tx, LockType::Write).await; - tx.put(&self.ptr, value).await + pub async fn put(&self, db: &mut Db, value: &T) -> Result<(), Error> { + self.lock(db, LockType::Write).await; + db.put(&self.ptr, value).await } } impl From for Model @@ -132,8 +132,8 @@ where } } -pub trait HasModel { - type Model: From + AsRef; +pub trait HasModel: Serialize + for<'de> Deserialize<'de> { + type Model: From + AsRef + Into + From>; } #[derive(Debug, Clone)] @@ -162,6 +162,11 @@ impl Deserialize<'de>> From for BoxModel(T::Model::from(ptr)) } } +impl Deserialize<'de>> From> for JsonPointer { + fn from(model: BoxModel) -> Self { + model.0.into() + } +} impl Deserialize<'de>> HasModel for Box { type Model = BoxModel; } @@ -169,13 +174,32 @@ impl Deserialize<'de>> HasModel for Box { #[derive(Debug, Clone)] pub struct OptionModel Deserialize<'de>>(T::Model); impl Deserialize<'de>> OptionModel { - pub async fn check(self, tx: &mut Tx) -> Result, Error> { - Ok(if tx.exists(self.0.as_ref(), None).await? { + pub async fn exists(&self, db: &mut Db) -> Result { + db.lock(self.as_ref(), LockType::Read).await; + Ok(db.exists(&self.as_ref(), None).await?) + } + + pub async fn check(self, db: &mut Db) -> Result, Error> { + Ok(if self.exists(db).await? { Some(self.0) } else { None }) } + + pub async fn delete(&self, db: &mut Db) -> Result<(), Error> { + db.lock(self.as_ref(), LockType::Write).await; + db.put(self.as_ref(), &Value::Null).await + } +} +impl OptionModel +where + T: Serialize + for<'de> Deserialize<'de> + Send + Sync + HasModel, +{ + pub async fn put(&self, db: &mut Db, value: &T) -> Result<(), Error> { + db.lock(self.as_ref(), LockType::Write).await; + db.put(self.as_ref(), value).await + } } impl Deserialize<'de>> From>> for OptionModel @@ -189,8 +213,18 @@ impl Deserialize<'de>> From for OptionModel(T::Model::from(ptr)) } } +impl Deserialize<'de>> From> for JsonPointer { + fn from(model: OptionModel) -> Self { + model.0.into() + } +} +impl Deserialize<'de>> AsRef for OptionModel { + fn as_ref(&self) -> &JsonPointer { + self.0.as_ref() + } +} impl Deserialize<'de>> HasModel for Option { - type Model = BoxModel; + type Model = OptionModel; } #[derive(Debug, Clone)] @@ -216,6 +250,11 @@ impl Deserialize<'de>> From for VecModel VecModel(From::from(ptr)) } } +impl Deserialize<'de>> From> for JsonPointer { + fn from(model: VecModel) -> Self { + model.0.into() + } +} impl AsRef for VecModel where T: Serialize + for<'de> Deserialize<'de>, @@ -282,6 +321,15 @@ where self.child(idx.as_ref()).into() } } +impl From> for MapModel +where + T: Serialize + for<'de> Deserialize<'de> + Map, + T::Value: Serialize + for<'de> Deserialize<'de>, +{ + fn from(model: Model) -> Self { + MapModel(model) + } +} impl From for MapModel where T: Serialize + for<'de> Deserialize<'de> + Map, @@ -291,6 +339,15 @@ where MapModel(From::from(ptr)) } } +impl From> for JsonPointer +where + T: Serialize + for<'de> Deserialize<'de> + Map, + T::Value: Serialize + for<'de> Deserialize<'de>, +{ + fn from(model: MapModel) -> Self { + model.0.into() + } +} impl AsRef for MapModel where T: Serialize + for<'de> Deserialize<'de> + Map, @@ -309,8 +366,8 @@ where } impl HasModel for BTreeMap where - K: Serialize + for<'de> Deserialize<'de> + Hash + Eq + AsRef, + K: Serialize + for<'de> Deserialize<'de> + Ord + AsRef, V: Serialize + for<'de> Deserialize<'de>, { - type Model = MapModel>; + type Model = MapModel>; } diff --git a/patch-db/src/store.rs b/patch-db/src/store.rs index 9c0ae34..37e05c2 100644 --- a/patch-db/src/store.rs +++ b/patch-db/src/store.rs @@ -14,10 +14,9 @@ use tokio::fs::File; use tokio::sync::broadcast::{Receiver, Sender}; use tokio::sync::{Mutex, RwLock, RwLockWriteGuard}; -use crate::locker::Locker; use crate::patch::{diff, DiffPatch, Revision}; -use crate::transaction::Transaction; use crate::Error; +use crate::{locker::Locker, PatchDbHandle}; lazy_static! { static ref OPEN_STORES: Mutex>> = Mutex::new(HashMap::new()); @@ -31,7 +30,7 @@ pub struct Store { revision: u64, } impl Store { - pub async fn open>(path: P) -> Result { + pub(crate) async fn open>(path: P) -> Result { let (_lock, path) = { if !path.as_ref().exists() { tokio::fs::File::create(path.as_ref()).await?; @@ -123,13 +122,13 @@ impl Store { self.file.unlock(true).map_err(|e| e.1)?; Ok(()) } - pub fn exists, V: SegList>( + pub(crate) fn exists, V: SegList>( &self, ptr: &JsonPointer, ) -> Result { Ok(ptr.get(self.get_data()?).unwrap_or(&Value::Null) != &Value::Null) } - pub fn get Deserialize<'de>, S: AsRef, V: SegList>( + pub(crate) fn get Deserialize<'de>, S: AsRef, V: SegList>( &self, ptr: &JsonPointer, ) -> Result { @@ -137,10 +136,10 @@ impl Store { ptr.get(self.get_data()?).unwrap_or(&Value::Null).clone(), )?) } - pub fn dump(&self) -> Value { + pub(crate) fn dump(&self) -> Value { self.get_data().unwrap().clone() } - pub async fn put, V: SegList>( + pub(crate) async fn put, V: SegList>( &mut self, ptr: &JsonPointer, value: &T, @@ -152,7 +151,7 @@ impl Store { patch.prepend(ptr); self.apply(patch).await } - pub async fn apply(&mut self, patch: DiffPatch) -> Result, Error> { + pub(crate) async fn apply(&mut self, patch: DiffPatch) -> Result, Error> { use tokio::io::AsyncWriteExt; self.check_cache_corrupted()?; @@ -196,6 +195,9 @@ impl PatchDb { subscriber: Arc::new(subscriber), }) } + pub async fn dump(&self) -> Value { + self.store.read().await.dump() + } pub async fn exists, V: SegList>( &self, ptr: &JsonPointer, @@ -235,12 +237,10 @@ impl PatchDb { pub fn subscribe(&self) -> Receiver> { self.subscriber.subscribe() } - pub fn begin(&self) -> Transaction { - Transaction { + pub fn handle(&self) -> PatchDbHandle { + PatchDbHandle { db: self.clone(), locks: Vec::new(), - updates: DiffPatch::default(), - sub: self.subscribe(), } } } diff --git a/patch-db/src/test.rs b/patch-db/src/test.rs index a467ca4..19abb41 100644 --- a/patch-db/src/test.rs +++ b/patch-db/src/test.rs @@ -8,7 +8,7 @@ use serde_json::Value; use tokio::fs; use tokio::runtime::Builder; -use crate as patch_db; +use crate::{self as patch_db, DbHandle}; async fn init_db(db_name: String) -> PatchDb { cleanup_db(&db_name).await; @@ -54,7 +54,8 @@ async fn basic() { #[tokio::test] async fn transaction() { let db = init_db("test.db".to_string()).await; - let mut tx = db.begin(); + let mut handle = db.handle(); + let mut tx = handle.begin().await.unwrap(); let ptr: JsonPointer = "/b/b".parse().unwrap(); tx.put(&ptr, &(2 as usize)).await.unwrap(); tx.put(&ptr, &(1 as usize)).await.unwrap(); diff --git a/patch-db/src/transaction.rs b/patch-db/src/transaction.rs index 3c20aeb..3e13f5c 100644 --- a/patch-db/src/transaction.rs +++ b/patch-db/src/transaction.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use futures::future::{BoxFuture, FutureExt}; +use async_trait::async_trait; use json_ptr::{JsonPointer, SegList}; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -8,258 +8,59 @@ use tokio::sync::broadcast::error::TryRecvError; use tokio::sync::broadcast::Receiver; use tokio::sync::{RwLock, RwLockReadGuard}; -use crate::locker::{LockType, Locker, LockerGuard}; -use crate::patch::{DiffPatch, Revision}; -use crate::store::{PatchDb, Store}; +use crate::store::Store; use crate::Error; +use crate::{ + locker::{LockType, Locker, LockerGuard}, + DbHandle, +}; +use crate::{ + patch::{DiffPatch, Revision}, + PatchDbHandle, +}; -pub trait Checkpoint: Sized { - fn rebase(&mut self) -> Result<(), Error>; - fn exists<'a, S: AsRef + Send + Sync + 'a, V: SegList + Send + Sync + 'a>( - &'a mut self, - ptr: &'a JsonPointer, - store_read_lock: Option>, - ) -> BoxFuture<'a, Result>; - fn get_value<'a, S: AsRef + Send + Sync + 'a, V: SegList + Send + Sync + 'a>( - &'a mut self, - ptr: &'a JsonPointer, - store_read_lock: Option>, - ) -> BoxFuture<'a, Result>; - fn put_value<'a, S: AsRef + Send + Sync + 'a, V: SegList + Send + Sync + 'a>( - &'a mut self, - ptr: &'a JsonPointer, - value: &'a Value, - ) -> BoxFuture<'a, Result<(), Error>>; - fn store(&self) -> Arc>; - fn subscribe(&self) -> Receiver>; - fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>); - fn apply(&mut self, patch: DiffPatch); - fn lock<'a, S: AsRef + Clone + Send + Sync, V: SegList + Clone + Send + Sync>( - &'a mut self, - ptr: &'a JsonPointer, - lock: LockType, - ) -> BoxFuture<'a, ()>; - fn get< - 'a, - T: for<'de> Deserialize<'de> + 'a, - S: AsRef + Clone + Send + Sync + 'a, - V: SegList + Clone + Send + Sync + 'a, - >( - &'a mut self, - ptr: &'a JsonPointer, - lock: LockType, - ) -> BoxFuture<'a, Result>; - fn put< - 'a, - T: Serialize + Send + Sync + 'a, - S: AsRef + Send + Sync + 'a, - V: SegList + Send + Sync + 'a, - >( - &'a mut self, - ptr: &'a JsonPointer, - value: &'a T, - ) -> BoxFuture<'a, Result<(), Error>>; -} - -pub struct Transaction { - pub(crate) db: PatchDb, +pub struct Transaction { + pub(crate) parent: Parent, pub(crate) locks: Vec<(JsonPointer, LockerGuard)>, pub(crate) updates: DiffPatch, pub(crate) sub: Receiver>, } -impl Transaction { - pub fn rebase(&mut self) -> Result<(), Error> { - while let Some(rev) = match self.sub.try_recv() { - Ok(a) => Some(a), - Err(TryRecvError::Empty) => None, - Err(e) => return Err(e.into()), - } { - self.updates.rebase(&rev.patch); - } - Ok(()) - } - async fn exists, V: SegList>( - &mut self, - ptr: &JsonPointer, - store_read_lock: Option>, - ) -> Result { - let exists = { - let store_lock = self.db.store.clone(); - let store = if let Some(store_read_lock) = store_read_lock { - store_read_lock - } else { - store_lock.read().await - }; - self.rebase()?; - ptr.get(store.get_data()?).unwrap_or(&Value::Null) != &Value::Null - }; - Ok(self.updates.for_path(ptr).exists().unwrap_or(exists)) - } - async fn get_value, V: SegList>( - &mut self, - ptr: &JsonPointer, - store_read_lock: Option>, - ) -> Result { - let mut data = { - let store_lock = self.db.store.clone(); - let store = if let Some(store_read_lock) = store_read_lock { - store_read_lock - } else { - store_lock.read().await - }; - self.rebase()?; - ptr.get(store.get_data()?).cloned().unwrap_or_default() - }; - json_patch::patch(&mut data, &*self.updates.for_path(ptr))?; - Ok(data) - } - async fn put_value, V: SegList>( - &mut self, - ptr: &JsonPointer, - value: &Value, - ) -> Result<(), Error> { - let old = Transaction::get_value(self, ptr, None).await?; - let mut patch = crate::patch::diff(&old, &value); - patch.prepend(ptr); - self.updates.append(patch); - Ok(()) - } - pub async fn lock + Clone, V: SegList + Clone>( - &mut self, - ptr: &JsonPointer, - lock: LockType, - ) { - match lock { - LockType::None => (), - LockType::Read => { - self.db - .locker - .add_read_lock(ptr, &mut self.locks, &mut []) - .await - } - LockType::Write => { - self.db - .locker - .add_write_lock(ptr, &mut self.locks, &mut []) - .await - } - } - } - pub async fn get Deserialize<'de>, S: AsRef + Clone, V: SegList + Clone>( - &mut self, - ptr: &JsonPointer, - lock: LockType, - ) -> Result { - self.lock(ptr, lock).await; - Ok(serde_json::from_value( - Transaction::get_value(self, ptr, None).await?, - )?) - } - pub async fn put, V: SegList>( - &mut self, - ptr: &JsonPointer, - value: &T, - ) -> Result<(), Error> { - Transaction::put_value(self, ptr, &serde_json::to_value(value)?).await - } +impl Transaction<&mut PatchDbHandle> { pub async fn commit(mut self) -> Result, Error> { - let store_lock = self.db.store.clone(); - let store = store_lock.write().await; - self.rebase()?; - self.db.apply(self.updates, Some(store)).await - } - pub async fn begin(&mut self) -> Result, Error> { - let store_lock = self.db.store.clone(); + let store_lock = self.parent.store(); let store = store_lock.read().await; self.rebase()?; - let sub = self.db.subscribe(); + let rev = self.parent.db.apply(self.updates, None).await?; drop(store); - Ok(SubTransaction { + Ok(rev) + } +} +impl Transaction { + pub async fn save(mut self) -> Result<(), Error> { + let store_lock = self.parent.store(); + let store = store_lock.read().await; + self.rebase()?; + self.parent.apply(self.updates).await?; + drop(store); + Ok(()) + } +} +#[async_trait] +impl DbHandle for Transaction { + async fn begin<'a>(&'a mut self) -> Result, Error> { + let store_lock = self.parent.store(); + let store = store_lock.read().await; + self.rebase()?; + let sub = self.parent.subscribe(); + drop(store); + Ok(Transaction { parent: self, locks: Vec::new(), updates: DiffPatch::default(), sub, }) } -} -impl<'a> Checkpoint for &'a mut Transaction { fn rebase(&mut self) -> Result<(), Error> { - Transaction::rebase(self) - } - fn exists<'b, S: AsRef + Send + Sync + 'b, V: SegList + Send + Sync + 'b>( - &'b mut self, - ptr: &'b JsonPointer, - store_read_lock: Option>, - ) -> BoxFuture<'b, Result> { - Transaction::exists(self, ptr, store_read_lock).boxed() - } - fn get_value<'b, S: AsRef + Send + Sync + 'b, V: SegList + Send + Sync + 'b>( - &'b mut self, - ptr: &'b JsonPointer, - store_read_lock: Option>, - ) -> BoxFuture<'b, Result> { - Transaction::get_value(self, ptr, store_read_lock).boxed() - } - fn put_value<'b, S: AsRef + Send + Sync + 'b, V: SegList + Send + Sync + 'b>( - &'b mut self, - ptr: &'b JsonPointer, - value: &'b Value, - ) -> BoxFuture<'b, Result<(), Error>> { - Transaction::put_value(self, ptr, value).boxed() - } - fn store(&self) -> Arc> { - self.db.store.clone() - } - fn subscribe(&self) -> Receiver> { - self.db.subscribe() - } - fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) { - (&self.db.locker, vec![&mut self.locks]) - } - fn apply(&mut self, patch: DiffPatch) { - self.updates.append(patch) - } - fn lock<'b, S: AsRef + Clone + Send + Sync, V: SegList + Clone + Send + Sync>( - &'b mut self, - ptr: &'b JsonPointer, - lock: LockType, - ) -> BoxFuture<'b, ()> { - Transaction::lock(self, ptr, lock).boxed() - } - fn get< - 'b, - T: for<'de> Deserialize<'de> + 'b, - S: AsRef + Clone + Send + Sync + 'b, - V: SegList + Clone + Send + Sync + 'b, - >( - &'b mut self, - ptr: &'b JsonPointer, - lock: LockType, - ) -> BoxFuture<'b, Result> { - Transaction::get(self, ptr, lock).boxed() - } - fn put< - 'b, - T: Serialize + Send + Sync + 'b, - S: AsRef + Send + Sync + 'b, - V: SegList + Send + Sync + 'b, - >( - &'b mut self, - ptr: &'b JsonPointer, - value: &'b T, - ) -> BoxFuture<'b, Result<(), Error>> { - Transaction::put(self, ptr, value).boxed() - } -} - -pub struct SubTransaction { - parent: Tx, - locks: Vec<(JsonPointer, LockerGuard)>, - updates: DiffPatch, - sub: Receiver>, -} -impl SubTransaction { - pub fn rebase(&mut self) -> Result<(), Error> { self.parent.rebase()?; while let Some(rev) = match self.sub.try_recv() { Ok(a) => Some(a), @@ -270,6 +71,17 @@ impl SubTransaction { } Ok(()) } + fn store(&self) -> Arc> { + self.parent.store() + } + fn subscribe(&self) -> Receiver> { + self.parent.subscribe() + } + fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) { + let (locker, mut locks) = self.parent.locker_and_locks(); + locks.push(&mut self.locks); + (locker, locks) + } async fn exists + Send + Sync, V: SegList + Send + Sync>( &mut self, ptr: &JsonPointer, @@ -310,13 +122,13 @@ impl SubTransaction { ptr: &JsonPointer, value: &Value, ) -> Result<(), Error> { - let old = SubTransaction::get_value(self, ptr, None).await?; + let old = self.get_value(ptr, None).await?; let mut patch = crate::patch::diff(&old, &value); patch.prepend(ptr); self.updates.append(patch); Ok(()) } - pub async fn lock + Clone, V: SegList + Clone>( + async fn lock + Clone + Send + Sync, V: SegList + Clone + Send + Sync>( &mut self, ptr: &JsonPointer, lock: LockType, @@ -335,117 +147,29 @@ impl SubTransaction { } } } - pub async fn get< + async fn get< T: for<'de> Deserialize<'de>, - S: AsRef + Clone + Send + Sync, - V: SegList + Clone + Send + Sync, + S: AsRef + Send + Sync, + V: SegList + Send + Sync, >( &mut self, ptr: &JsonPointer, - lock: LockType, ) -> Result { - self.lock(ptr, lock).await; - Ok(serde_json::from_value( - SubTransaction::get_value(self, ptr, None).await?, - )?) + Ok(serde_json::from_value(self.get_value(ptr, None).await?)?) } - pub async fn put + Send + Sync, V: SegList + Send + Sync>( + async fn put< + T: Serialize + Send + Sync, + S: AsRef + Send + Sync, + V: SegList + Send + Sync, + >( &mut self, ptr: &JsonPointer, value: &T, ) -> Result<(), Error> { - SubTransaction::put_value(self, ptr, &serde_json::to_value(value)?).await + self.put_value(ptr, &serde_json::to_value(value)?).await } - pub async fn commit(mut self) -> Result<(), Error> { - let store_lock = self.parent.store(); - let store = store_lock.read().await; - self.rebase()?; - self.parent.apply(self.updates); - drop(store); + async fn apply(&mut self, patch: DiffPatch) -> Result<(), Error> { + self.updates.append(patch); Ok(()) } - pub async fn begin(&mut self) -> Result, Error> { - let store_lock = self.parent.store(); - let store = store_lock.read().await; - self.rebase()?; - let sub = self.parent.subscribe(); - drop(store); - Ok(SubTransaction { - parent: self, - locks: Vec::new(), - updates: DiffPatch::default(), - sub, - }) - } -} -impl<'a, Tx: Checkpoint + Send + Sync> Checkpoint for &'a mut SubTransaction { - fn rebase(&mut self) -> Result<(), Error> { - SubTransaction::rebase(self) - } - fn exists<'b, S: AsRef + Send + Sync + 'b, V: SegList + Send + Sync + 'b>( - &'b mut self, - ptr: &'b JsonPointer, - store_read_lock: Option>, - ) -> BoxFuture<'b, Result> { - SubTransaction::exists(self, ptr, store_read_lock).boxed() - } - fn get_value<'b, S: AsRef + Send + Sync + 'b, V: SegList + Send + Sync + 'b>( - &'b mut self, - ptr: &'b JsonPointer, - store_read_lock: Option>, - ) -> BoxFuture<'b, Result> { - SubTransaction::get_value(self, ptr, store_read_lock).boxed() - } - fn put_value<'b, S: AsRef + Send + Sync + 'b, V: SegList + Send + Sync + 'b>( - &'b mut self, - ptr: &'b JsonPointer, - value: &'b Value, - ) -> BoxFuture<'b, Result<(), Error>> { - SubTransaction::put_value(self, ptr, value).boxed() - } - fn store(&self) -> Arc> { - self.parent.store() - } - fn subscribe(&self) -> Receiver> { - self.parent.subscribe() - } - fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) { - let (locker, mut locks) = self.parent.locker_and_locks(); - locks.push(&mut self.locks); - (locker, locks) - } - fn apply(&mut self, patch: DiffPatch) { - self.updates.append(patch) - } - fn lock<'b, S: AsRef + Clone + Send + Sync, V: SegList + Clone + Send + Sync>( - &'b mut self, - ptr: &'b JsonPointer, - lock: LockType, - ) -> BoxFuture<'b, ()> { - SubTransaction::lock(self, ptr, lock).boxed() - } - fn get< - 'b, - T: for<'de> Deserialize<'de> + 'b, - S: AsRef + Clone + Send + Sync + 'b, - V: SegList + Clone + Send + Sync + 'b, - >( - &'b mut self, - ptr: &'b JsonPointer, - lock: LockType, - ) -> BoxFuture<'b, Result> { - SubTransaction::get(self, ptr, lock).boxed() - } - fn put< - 'b, - T: Serialize + Send + Sync + 'b, - S: AsRef + Send + Sync + 'b, - V: SegList + Send + Sync + 'b, - >( - &'b mut self, - ptr: &'b JsonPointer, - value: &'b T, - ) -> BoxFuture<'b, Result<(), Error>> { - SubTransaction::put(self, ptr, value).boxed() - } }