From a12a93b03524fb01b5d1a32f0a25188a0790ce64 Mon Sep 17 00:00:00 2001 From: Aiden McClelland Date: Mon, 15 Mar 2021 19:33:23 -0600 Subject: [PATCH] first pass at model generics --- patch-db/src/lib.rs | 343 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 329 insertions(+), 14 deletions(-) diff --git a/patch-db/src/lib.rs b/patch-db/src/lib.rs index af70c22..502ae4d 100644 --- a/patch-db/src/lib.rs +++ b/patch-db/src/lib.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; use std::fs::OpenOptions; +use std::future::Future; use std::io::Error as IOError; use std::path::Path; use std::sync::Arc; @@ -41,6 +42,8 @@ pub enum Error { FDLock(#[from] fd_lock_rs::Error), #[error("Database Cache Corrupted: {0}")] CacheCorrupted(Arc), + #[error("Mutex Error (should be unreachable): {0}")] + MutexError(#[from] tokio::sync::TryLockError), } #[derive(Debug, Clone, Deserialize, Serialize)] @@ -225,11 +228,21 @@ impl PatchDb { pub trait Checkpoint { type SubTx: Checkpoint; fn get_value<'a, S: AsRef + Send + Sync + 'a, V: SegList + Send + Sync + 'a>( - &'a self, + &'a mut self, ptr: &'a JsonPointer, - ) -> BoxFuture>; + ) -> BoxFuture<'a, Result>; + fn put_value<'a, S: AsRef + Send + Sync + 'a, V: SegList + Send + Sync + 'a>( + &'a mut self, + ptr: &'a JsonPointer, + value: &'a Value, + ) -> BoxFuture<'a, Result<(), Error>>; fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>); fn apply(&mut self, patch: DiffPatch); + fn lock<'a, S: AsRef + Clone + Send + Sync, V: SegList + Clone + Send + Sync>( + &'a mut self, + ptr: &'a JsonPointer, + lock: LockType, + ) -> BoxFuture<'a, ()>; fn get< 'a, T: for<'de> Deserialize<'de> + 'a, @@ -259,7 +272,7 @@ pub struct Transaction { } impl Transaction { async fn get_value, V: SegList>( - &self, + &mut self, ptr: &JsonPointer, ) -> Result { let mut data: Value = ptr @@ -288,6 +301,17 @@ impl Transaction { } Ok(data) } + async fn put_value, V: SegList>( + &mut self, + ptr: &JsonPointer, + value: &Value, + ) -> Result<(), Error> { + let old = Transaction::get_value(self, ptr).await?; + let mut patch = json_patch::diff(&old, value); + patch.prepend(ptr); + (self.updates.0).0.extend(patch.0); + Ok(()) + } pub async fn lock + Clone, V: SegList + Clone>( &mut self, ptr: &JsonPointer, @@ -324,12 +348,7 @@ impl Transaction { ptr: &JsonPointer, value: &T, ) -> Result<(), Error> { - let old = Transaction::get_value(self, ptr).await?; - let new = serde_json::to_value(value)?; - let mut patch = json_patch::diff(&old, &new); - patch.prepend(ptr); - (self.updates.0).0.extend(patch.0); - Ok(()) + Transaction::put_value(self, ptr, &serde_json::to_value(value)?).await } pub async fn commit(self) -> Result, Error> { self.db.apply(self.updates).await @@ -338,17 +357,31 @@ impl Transaction { impl<'a> Checkpoint for &'a mut Transaction { type SubTx = &'a mut SubTransaction; fn get_value<'b, S: AsRef + Send + Sync + 'b, V: SegList + Send + Sync + 'b>( - &'b self, + &'b mut self, ptr: &'b JsonPointer, ) -> BoxFuture<'b, Result> { Transaction::get_value(self, ptr).boxed() } + fn put_value<'b, S: AsRef + Send + Sync + 'b, V: SegList + Send + Sync + 'b>( + &'b mut self, + ptr: &'b JsonPointer, + value: &'b Value, + ) -> BoxFuture<'b, Result<(), Error>> { + Transaction::put_value(self, ptr, value).boxed() + } fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) { (&self.db.locker, vec![&mut self.locks]) } fn apply(&mut self, patch: DiffPatch) { (self.updates.0).0.extend((patch.0).0) } + fn lock<'b, S: AsRef + Clone + Send + Sync, V: SegList + Clone + Send + Sync>( + &'b mut self, + ptr: &'b JsonPointer, + lock: LockType, + ) -> BoxFuture<'b, ()> { + Transaction::lock(self, ptr, lock).boxed() + } fn get< 'b, T: for<'de> Deserialize<'de> + 'b, @@ -382,7 +415,7 @@ pub struct SubTransaction { } impl SubTransaction { async fn get_value + Send + Sync, V: SegList + Send + Sync>( - &self, + &mut self, ptr: &JsonPointer, ) -> Result { let mut data: Value = self.parent.get_value(ptr).await?; @@ -408,6 +441,17 @@ impl SubTransaction { } Ok(data) } + async fn put_value + Send + Sync, V: SegList + Send + Sync>( + &mut self, + ptr: &JsonPointer, + value: &Value, + ) -> Result<(), Error> { + let old = SubTransaction::get_value(self, ptr).await?; + let mut patch = json_patch::diff(&old, value); + patch.prepend(ptr); + (self.updates.0).0.extend(patch.0); + Ok(()) + } pub async fn lock + Clone, V: SegList + Clone>( &mut self, ptr: &JsonPointer, @@ -460,11 +504,18 @@ impl SubTransaction { impl<'a, Tx: Checkpoint + Send + Sync> Checkpoint for &'a mut SubTransaction { type SubTx = &'a mut SubTransaction; fn get_value<'b, S: AsRef + Send + Sync + 'b, V: SegList + Send + Sync + 'b>( - &'b self, + &'b mut self, ptr: &'b JsonPointer, ) -> BoxFuture<'b, Result> { SubTransaction::get_value(self, ptr).boxed() } + fn put_value<'b, S: AsRef + Send + Sync + 'b, V: SegList + Send + Sync + 'b>( + &'b mut self, + ptr: &'b JsonPointer, + value: &'b Value, + ) -> BoxFuture<'b, Result<(), Error>> { + SubTransaction::put_value(self, ptr, value).boxed() + } fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) { let (locker, mut locks) = self.parent.locker_and_locks(); locks.push(&mut self.locks); @@ -473,6 +524,13 @@ impl<'a, Tx: Checkpoint + Send + Sync> Checkpoint for &'a mut SubTransaction fn apply(&mut self, patch: DiffPatch) { (self.updates.0).0.extend((patch.0).0) } + fn lock<'b, S: AsRef + Clone + Send + Sync, V: SegList + Clone + Send + Sync>( + &'b mut self, + ptr: &'b JsonPointer, + lock: LockType, + ) -> BoxFuture<'b, ()> { + SubTransaction::lock(self, ptr, lock).boxed() + } fn get< 'b, T: for<'de> Deserialize<'de> + 'b, @@ -499,7 +557,7 @@ impl<'a, Tx: Checkpoint + Send + Sync> Checkpoint for &'a mut SubTransaction } } -#[derive(Debug)] +#[derive(Debug, Clone, Copy)] pub enum LockType { None, Read, @@ -638,7 +696,7 @@ impl Locker { lock.1 = match guard { LockerGuard::Read(LockerReadGuard(guard)) if !remainder.is_empty() => { // read guard already exists at higher level - let mut lock = guard.lock().await; + let mut lock = guard.try_lock().unwrap(); if let Some(l) = lock.take() { let mut orig_lock = None; let mut lock = ReadGuard::upgrade(l).await.unwrap(); @@ -703,3 +761,260 @@ impl Default for Locker { Locker::new() } } + +pub trait ModelData: Send + Sync { + type Inner; + fn apply< + 'a, + F: FnOnce(&mut Self::Inner) -> ResFut + Send + Sync + 'a, + ResFut: Future + Send + Sync + 'a, + Res: Send + Sync + 'a, + >( + &'a mut self, + f: F, + ) -> BoxFuture<'a, Res>; +} + +pub enum Never {} +impl ModelData for Never { + type Inner = Never; + fn apply< + 'a, + F: FnOnce(&mut Self::Inner) -> ResFut + 'a, + ResFut: Future + 'a, + Res: 'a, + >( + &'a mut self, + _: F, + ) -> BoxFuture<'a, Res> { + match *self {} + } +} + +pub enum GenericModelData Deserialize<'de>, Parent: ModelData> { + Uninitialized(Vec>), + Ref( + Arc &mut T + Send + Sync>, + Arc>, + ), + Owned(Box), +} +impl Deserialize<'de>, Parent: ModelData> GenericModelData { + pub fn is_initialized(&self) -> bool { + match self { + GenericModelData::Uninitialized(_) => false, + _ => true, + } + } + async fn apply< + F: FnOnce(&mut T) -> ResFut + Send + Sync, + ResFut: Future + Send + Sync, + Res: Send + Sync, + >( + &mut self, + f: F, + ) -> Res { + match self { + GenericModelData::Owned(data) => f(data).await, + GenericModelData::Ref(g, parent) => parent.lock().await.apply(|t| f(g(t))).await, + _ => panic!("uninitialized"), + } + } +} +impl Deserialize<'de> + Send + Sync, Parent: ModelData> ModelData + for GenericModelData +{ + type Inner = T; + fn apply< + 'a, + F: FnOnce(&mut Self::Inner) -> ResFut + Send + Sync + 'a, + ResFut: Future + Send + Sync + 'a, + Res: Send + Sync + 'a, + >( + &'a mut self, + f: F, + ) -> BoxFuture<'a, Res> { + self.apply(f).boxed() + } +} + +pub struct ChildHooks { + init_parent: Box FnOnce(&'a mut T) -> BoxFuture<'a, ()> + Send + Sync>, + save_data: Box< + dyn Fn() -> BoxFuture<'static, Result, serde_json::Error>> + + Send + + Sync, + >, +} +impl ChildHooks { + async fn init_parent(self, parent: &mut T) { + (self.init_parent)(parent).await + } + async fn save_data(&self) -> Result, serde_json::Error> { + (self.save_data)().await + } +} + +pub struct GenericModel Deserialize<'de>, Parent: ModelData> { + data: Arc>>, + ptr: JsonPointer, +} +impl< + 'a, + T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static, + Parent: ModelData + 'static, + > GenericModel +{ + pub fn new(ptr: JsonPointer) -> Self { + Self { + data: Arc::new(Mutex::new(GenericModelData::Uninitialized(Vec::new()))), + ptr, + } + } + + /// locks + async fn fetch(&mut self, tx: &mut Tx) -> Result<(), Error> { + let mut data = self.data.lock().await; + if let GenericModelData::Uninitialized(children) = &mut *data { + let mut a: Box = tx.get(&self.ptr, LockType::None).await?; + for child in std::mem::replace(children, Vec::new()) { + child.init_parent(&mut a).await; + } + *data = GenericModelData::Owned(a); + } + + Ok(()) + } + + pub async fn lock(&self, tx: &mut Tx, lock: LockType) { + tx.lock(&self.ptr, lock).await + } + + /// locks + pub async fn peek< + Tx: Checkpoint, + F: FnOnce(&T) -> ResFut + Send + Sync, + ResFut: Future + Send + Sync, + Res: Send + Sync, + >( + &mut self, + tx: &mut Tx, + f: F, + ) -> Result { + self.fetch(tx).await?; + Ok(self.data.lock().await.apply(|t| f(t)).await) + } + + /// locks + pub async fn apply< + Tx: Checkpoint, + F: FnOnce(&mut T) -> ResFut + Send + Sync, + ResFut: Future + Send + Sync, + Res: Send + Sync, + >( + &mut self, + tx: &mut Tx, + f: F, + ) -> Result { + self.fetch(tx).await?; + Ok(self.data.lock().await.apply(f).await) + } + + /// locks + pub async fn child( + &mut self, + path: &JsonPointer, + f: F, + ) -> GenericModel> + where + C: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static, + F: Fn(&mut T) -> &mut C + Send + Sync + 'static, + S: AsRef, + V: SegList, + for<'v> &'v V: IntoIterator, + { + let arc_f = Arc::new(f); + let ptr = self.ptr.clone() + path; + let mut data_ref = self.data.lock().await; + let data = if let GenericModelData::Uninitialized(children) = &mut *data_ref { + let data = Arc::new(Mutex::new(GenericModelData::Uninitialized(Vec::new()))); + let init_parent_data = data.clone(); + let parent_data = self.data.clone(); + let save_data_data = data.clone(); + let save_data_ptr = ptr.clone(); + children.push(ChildHooks { + init_parent: Box::new(move |parent| { + async move { + let mut data_ref = init_parent_data.lock().await; + let self_data = std::mem::replace( + &mut *data_ref, + GenericModelData::Uninitialized(Vec::new()), + ); + match self_data { + GenericModelData::Owned(t) => *arc_f(parent) = *t, + GenericModelData::Uninitialized(children) => { + for child in children { + child.init_parent(arc_f(parent)).await; // TODO: can probably parallelize + } + } + _ => (), + } + *data_ref = GenericModelData::Ref(arc_f, parent_data); + } + .boxed() + }), + save_data: Box::new(move || { + let ptr = save_data_ptr.clone(); + let data = save_data_data.clone(); + async move { + let mut data_ref = data.lock().await; + if let GenericModelData::Uninitialized(children) = &mut *data_ref { + let mut res = Vec::new(); + for child in children { + res.append(&mut child.save_data().await?) + } + Ok(res) + } else { + Ok(vec![( + ptr, + data_ref + .apply(|t| futures::future::ready(serde_json::to_value(t))) + .await?, + )]) + } + } + .boxed() + }), + }); + data + } else { + Arc::new(Mutex::new(GenericModelData::Ref( + arc_f.clone(), + self.data.clone(), + ))) + }; + GenericModel { data, ptr } + } + + /// locks + pub async fn save(&mut self, tx: &mut Tx) -> Result<(), Error> { + let mut data_ref = self.data.lock().await; + for (ptr, value) in if let GenericModelData::Uninitialized(children) = &mut *data_ref { + let mut res = Vec::new(); + for child in children { + res.append(&mut child.save_data().await?) + } + res + } else { + vec![( + self.ptr.clone(), + data_ref + .apply(|t| futures::future::ready(serde_json::to_value(t))) + .await?, + )] + } { + tx.put_value(&ptr, &value).await?; + } + Ok(()) + } +}