diff --git a/patch-db/src/handle.rs b/patch-db/src/handle.rs index 120f69a..dac0853 100644 --- a/patch-db/src/handle.rs +++ b/patch-db/src/handle.rs @@ -7,10 +7,7 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use tokio::sync::{broadcast::Receiver, RwLock, RwLockReadGuard}; -use crate::{ - locker::{LockType, LockerGuard}, - Locker, PatchDb, Revision, Store, Transaction, -}; +use crate::{locker::LockerGuard, Locker, PatchDb, Revision, Store, Transaction}; use crate::{patch::DiffPatch, Error}; #[async_trait] @@ -19,7 +16,7 @@ pub trait DbHandle: Send + Sync { fn rebase(&mut self) -> Result<(), Error>; fn store(&self) -> Arc>; fn subscribe(&self) -> Receiver>; - fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>); + fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, Option)]>); async fn exists + Send + Sync, V: SegList + Send + Sync>( &mut self, ptr: &JsonPointer, @@ -41,12 +38,7 @@ pub trait DbHandle: Send + Sync { value: &Value, ) -> Result>, Error>; async fn apply(&mut self, patch: DiffPatch) -> Result>, Error>; - async fn lock + Clone + Send + Sync, V: SegList + Clone + Send + Sync>( - &mut self, - ptr: &JsonPointer, - lock: LockType, - deep: bool, - ) -> (); + async fn lock(&mut self, ptr: &JsonPointer) -> (); async fn get< T: for<'de> Deserialize<'de>, S: AsRef + Send + Sync, @@ -90,7 +82,7 @@ impl DbHandle for &mut Handle { fn subscribe(&self) -> Receiver> { (**self).subscribe() } - fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) { + fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, Option)]>) { (*self).locker_and_locks() } async fn exists + Send + Sync, V: SegList + Send + Sync>( @@ -124,13 +116,8 @@ impl DbHandle for &mut Handle { async fn apply(&mut self, patch: DiffPatch) -> Result>, Error> { (*self).apply(patch).await } - async fn lock + Clone + Send + Sync, V: SegList + Clone + Send + Sync>( - &mut self, - ptr: &JsonPointer, - lock: LockType, - deep: bool, - ) { - (*self).lock(ptr, lock, deep).await + async fn lock(&mut self, ptr: &JsonPointer) { + (*self).lock(ptr).await } async fn get< T: for<'de> Deserialize<'de>, @@ -157,7 +144,7 @@ impl DbHandle for &mut Handle { pub struct PatchDbHandle { pub(crate) db: PatchDb, - pub(crate) locks: Vec<(JsonPointer, LockerGuard)>, + pub(crate) locks: Vec<(JsonPointer, Option)>, } #[async_trait] @@ -179,7 +166,7 @@ impl DbHandle for PatchDbHandle { fn subscribe(&self) -> Receiver> { self.db.subscribe() } - fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) { + fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, Option)]>) { (&self.db.locker, vec![self.locks.as_mut_slice()]) } async fn exists + Send + Sync, V: SegList + Send + Sync>( @@ -225,27 +212,8 @@ impl DbHandle for PatchDbHandle { async fn apply(&mut self, patch: DiffPatch) -> Result>, Error> { self.db.apply(patch, None, None).await } - async fn lock + Clone + Send + Sync, V: SegList + Clone + Send + Sync>( - &mut self, - ptr: &JsonPointer, - lock: LockType, - deep: bool, - ) { - match lock { - LockType::Read => { - self.db - .locker - .add_read_lock(ptr, &mut self.locks, &mut [], deep) - .await; - } - LockType::Write => { - self.db - .locker - .add_write_lock(ptr, &mut self.locks, &mut [], deep) - .await; - } - LockType::None => (), - } + async fn lock(&mut self, ptr: &JsonPointer) { + self.db.locker.add_lock(ptr, &mut self.locks, &mut []).await; } async fn get< T: for<'de> Deserialize<'de>, diff --git a/patch-db/src/lib.rs b/patch-db/src/lib.rs index 960a444..bb04ece 100644 --- a/patch-db/src/lib.rs +++ b/patch-db/src/lib.rs @@ -20,7 +20,7 @@ mod test; pub use handle::{DbHandle, PatchDbHandle}; pub use json_patch; pub use json_ptr; -pub use locker::{LockType, Locker}; +pub use locker::Locker; pub use model::{ BoxModel, HasModel, Map, MapModel, Model, ModelData, ModelDataMut, OptionModel, VecModel, }; diff --git a/patch-db/src/locker.rs b/patch-db/src/locker.rs index f3144e1..b156656 100644 --- a/patch-db/src/locker.rs +++ b/patch-db/src/locker.rs @@ -1,250 +1,162 @@ +use std::borrow::Cow; use std::collections::HashMap; -use std::sync::Arc; +use std::sync::{Arc, Weak}; +use std::task::Poll; -use futures::{future::BoxFuture, FutureExt}; +use futures::future::BoxFuture; +use futures::FutureExt; use json_ptr::{JsonPointer, SegList}; -use qutex::{QrwLock, ReadGuard, WriteGuard}; +use qutex::{Guard, Qutex}; +use tokio::runtime::Handle; use tokio::sync::Mutex; +use tokio::sync::Notify; -#[derive(Debug, Clone, Copy)] -pub enum LockType { - None, - Read, - Write, -} +#[derive(Debug)] +pub struct LockerGuard(Arc, Guard<()>, Arc); -pub enum LockerGuard { - Empty, - Read(LockerReadGuard), - Write(LockerWriteGuard), -} -impl LockerGuard { - pub fn take(&mut self) -> Self { - std::mem::replace(self, LockerGuard::Empty) - } -} - -#[derive(Debug, Clone)] -pub struct LockerReadGuard(Arc>>>>); -impl LockerReadGuard { - async fn upgrade(&self) -> Option { - let guard = self.0.try_lock().unwrap().take(); - if let Some(g) = guard { - Some(LockerWriteGuard( - Some(ReadGuard::upgrade(g).await.unwrap()), - Some(self.clone()), - )) - } else { - None - } - } -} -impl From>> for LockerReadGuard { - fn from(guard: ReadGuard>) -> Self { - LockerReadGuard(Arc::new(Mutex::new(Some(guard)))) - } -} - -pub struct LockerWriteGuard( - Option>>, - Option, -); -impl From>> for LockerWriteGuard { - fn from(guard: WriteGuard>) -> Self { - LockerWriteGuard(Some(guard), None) - } -} -impl Drop for LockerWriteGuard { +#[derive(Debug)] +struct NotifyGuard(Arc); +impl Drop for NotifyGuard { fn drop(&mut self) { - if let (Some(write), Some(read)) = (self.0.take(), self.1.take()) { - *read.0.try_lock().unwrap() = Some(WriteGuard::downgrade(write)); - } + self.0.notify_one(); } } -#[derive(Clone, Debug)] -pub struct Locker(QrwLock>); +#[derive(Debug)] +pub struct Locker(Arc); +#[derive(Debug)] +struct LockerInner { + map: Mutex>>, + self_lock: Qutex<()>, + children_lock: Arc, + guard: Mutex>, + parent: Option<(Arc, String, Arc)>, +} +impl Drop for LockerInner { + fn drop(&mut self) { + if let Some((_, idx, parent)) = self.parent.take() { + Handle::current().block_on(parent.map.lock()).remove(&idx); + } + } +} impl Locker { pub fn new() -> Self { - Locker(QrwLock::new(HashMap::new())) + Locker(Arc::new(LockerInner { + map: Mutex::new(HashMap::new()), + self_lock: Qutex::new(()), + children_lock: { + let notify = Arc::new(Notify::new()); + notify.notify_one(); + notify + }, + guard: Mutex::new(Weak::new()), + parent: None, + })) } - fn lock_root_read<'a>(guard: &'a ReadGuard>) -> BoxFuture<'a, ()> { - async move { - for (_, v) in &**guard { - let g = v.0.clone().read().await.unwrap(); - Self::lock_root_read(&g).await; - } + async fn notify_guard(&self) -> Arc { + let mut lock = self.0.guard.lock().await; + if let Some(n) = lock.upgrade() { + Arc::new(NotifyGuard(n.0.clone())) + } else { + let res = Arc::new(NotifyGuard(self.0.children_lock.clone())); + *lock = Arc::downgrade(&res); + res } - .boxed() } - pub async fn lock_read, V: SegList>( + async fn child<'a, S: Into>>(&self, name: S) -> Locker { + let name: Cow<'a, str> = name.into(); + let mut lock = self.0.map.lock().await; + if let Some(child) = lock.get(name.as_ref()).and_then(|w| w.upgrade()) { + Locker(child) + } else { + let name = name.into_owned(); + let res = Arc::new(LockerInner { + map: Mutex::new(HashMap::new()), + self_lock: Qutex::new(()), + children_lock: { + let notify = Arc::new(Notify::new()); + notify.notify_one(); + notify + }, + guard: Mutex::new(Weak::new()), + parent: Some((self.notify_guard().await, name.clone(), self.0.clone())), + }); + lock.insert(name, Arc::downgrade(&res)); + Locker(res) + } + } + /// await once: in the queue, await twice: all children dropped + async fn wait_for_children<'a>(&'a self) -> BoxFuture<'a, ()> { + let children = self.0.guard.lock().await; + let mut fut = if children.strong_count() == 0 { + futures::future::ready(()).boxed() + } else { + self.0.children_lock.notified().boxed() + }; + if matches!(futures::poll!(&mut fut), Poll::Ready(_)) { + return futures::future::ready(()).boxed(); + } + drop(children); + fut + } + async fn acquire_and_trade(self, guards: Vec) -> LockerGuard { + let children_dropped = self.wait_for_children().await; + guards.into_iter().for_each(drop); + children_dropped.await; + let guard = self.0.self_lock.clone().lock().await.unwrap(); + LockerGuard(self.notify_guard().await, guard, self.0.clone()) + } + pub async fn lock, V: SegList>( &self, ptr: &JsonPointer, - deep: bool, - ) -> ReadGuard> { - #[cfg(feature = "log")] - log::debug!("Locking {} for READ: {{ deep: {} }}", ptr, deep); - let mut lock = Some(self.0.clone().read().await.unwrap()); + guards: Vec, + ) -> LockerGuard { + let mut locker = Locker(self.0.clone()); for seg in ptr.iter() { - let new_lock = if let Some(locker) = lock.as_ref().unwrap().get(seg) { - locker.0.clone().read().await.unwrap() - } else { - let mut writer = ReadGuard::upgrade(lock.take().unwrap()).await.unwrap(); - writer.insert(seg.to_owned(), Locker::new()); - let reader = WriteGuard::downgrade(writer); - reader.get(seg).unwrap().0.clone().read().await.unwrap() - }; - lock = Some(new_lock); + locker = locker.child(seg).await; } - let res = lock.unwrap(); - if deep { - Self::lock_root_read(&res); + let res = locker.acquire_and_trade(guards).await; + let mut guards = Vec::with_capacity(ptr.len()); + let mut cur = res.2.parent.as_ref().map(|(_, _, p)| p); + while let Some(parent) = cur { + guards.push(parent.self_lock.clone().lock().await.unwrap()); + cur = parent.parent.as_ref().map(|(_, _, p)| p); } - #[cfg(feature = "log")] - log::debug!("Locked {} for READ: {{ deep: {} }}", ptr, deep); res } - pub(crate) async fn add_read_lock + Clone, V: SegList + Clone>( + /// TODO: DRAGONS!!! + /// Acquiring a lock to a node above something you already held will do so at the transaction level of the lock you already held! + /// This means even though you dropped the sub tx in which you acquired the higher level lock, + /// the higher lock could still be held by the parent tx which originally held the lower lock. + pub async fn add_lock( &self, - ptr: &JsonPointer, - locks: &mut Vec<(JsonPointer, LockerGuard)>, - extra_locks: &mut [&mut [(JsonPointer, LockerGuard)]], - deep: bool, + ptr: &JsonPointer, + locks: &mut Vec<(JsonPointer, Option)>, // tx locks + extra_locks: &mut [&mut [(JsonPointer, Option)]], // tx parent locks ) { - for lock in extra_locks - .iter() - .flat_map(|a| a.iter()) - .chain(locks.iter()) - { - if ptr.starts_with(&lock.0) { - return; - } - } - locks.push(( - JsonPointer::to_owned(ptr.clone()), - LockerGuard::Read(self.lock_read(ptr, deep).await.into()), - )); - } - fn lock_root_write<'a>(guard: &'a WriteGuard>) -> BoxFuture<'a, ()> { - async move { - for (_, v) in &**guard { - let g = v.0.clone().write().await.unwrap(); - Self::lock_root_write(&g).await; - } - } - .boxed() - } - pub async fn lock_write, V: SegList>( - &self, - ptr: &JsonPointer, - deep: bool, - ) -> WriteGuard> { - #[cfg(feature = "log")] - log::debug!("Locking {} for WRITE: {{ deep: {} }}", ptr, deep); - let mut lock = self.0.clone().write().await.unwrap(); - for seg in ptr.iter() { - let new_lock = if let Some(locker) = lock.get(seg) { - locker.0.clone().write().await.unwrap() - } else { - lock.insert(seg.to_owned(), Locker::new()); - lock.get(seg).unwrap().0.clone().write().await.unwrap() - }; - lock = new_lock; - } - let res = lock; - if deep { - Self::lock_root_write(&res); - } - #[cfg(feature = "log")] - log::debug!("Locked {} for WRITE: {{ deep: {} }}", ptr, deep); - res - } - pub(crate) async fn add_write_lock + Clone, V: SegList + Clone>( - &self, - ptr: &JsonPointer, - locks: &mut Vec<(JsonPointer, LockerGuard)>, // tx locks - extra_locks: &mut [&mut [(JsonPointer, LockerGuard)]], // tx parent locks - deep: bool, - ) { - let mut final_lock = None; + let mut lock_dest = None; + let mut guards = Vec::new(); for lock in extra_locks .iter_mut() .flat_map(|a| a.iter_mut()) .chain(locks.iter_mut()) { - enum Choice { - Return, - Continue, - Break, + if ptr.starts_with(&lock.0) { + return; } - let choice: Choice; - if let Some(remainder) = ptr.strip_prefix(&lock.0) { - let guard = lock.1.take(); - lock.1 = match guard { - LockerGuard::Read(LockerReadGuard(guard)) if !remainder.is_empty() => { - // read guard already exists at higher level - let mut lock = guard.lock().await; - if let Some(l) = lock.take() { - let mut orig_lock = None; - let mut lock = ReadGuard::upgrade(l).await.unwrap(); - for seg in remainder.iter() { - let new_lock = if let Some(locker) = lock.get(seg) { - locker.0.clone().write().await.unwrap() - } else { - lock.insert(seg.to_owned(), Locker::new()); - lock.get(seg).unwrap().0.clone().write().await.unwrap() - }; - if orig_lock.is_none() { - orig_lock = Some(lock); - } - lock = new_lock; - } - final_lock = Some(LockerGuard::Write(lock.into())); - choice = Choice::Break; - LockerGuard::Read(WriteGuard::downgrade(orig_lock.unwrap()).into()) - } else { - drop(lock); - choice = Choice::Return; - LockerGuard::Read(LockerReadGuard(guard)) - } - } - LockerGuard::Read(l) => { - // read exists, convert to write - if let Some(upgraded) = l.upgrade().await { - final_lock = Some(LockerGuard::Write(upgraded)); - choice = Choice::Break; - } else { - choice = Choice::Continue; - } - LockerGuard::Read(l) - } - LockerGuard::Write(l) => { - choice = Choice::Return; - LockerGuard::Write(l) - } // leave it alone, already sufficiently locked - LockerGuard::Empty => { - unreachable!("LockerGuard found empty"); - } - }; - match choice { - Choice::Return => return, - Choice::Break => break, - Choice::Continue => continue, + if lock.0.starts_with(&ptr) { + if let Some(guard) = lock.1.take() { + guards.push(guard); + lock_dest = Some(lock); } } } - locks.push(( - JsonPointer::to_owned(ptr.clone()), - if let Some(lock) = final_lock { - lock - } else { - LockerGuard::Write(self.lock_write(ptr, deep).await.into()) - }, - )); - } -} -impl Default for Locker { - fn default() -> Self { - Locker::new() + let guard = self.lock(ptr, guards).await; + if let Some(lock) = lock_dest { + lock.0 = ptr.clone(); + lock.1 = Some(guard); + } else { + locks.push((ptr.clone(), Some(guard))); + } } } diff --git a/patch-db/src/model.rs b/patch-db/src/model.rs index 0050ca5..0fc76aa 100644 --- a/patch-db/src/model.rs +++ b/patch-db/src/model.rs @@ -10,7 +10,6 @@ use json_ptr::JsonPointer; use serde::{Deserialize, Serialize}; use serde_json::Value; -use crate::locker::LockType; use crate::{DbHandle, DiffPatch, Error, Revision}; #[derive(Debug)] @@ -65,19 +64,19 @@ impl Model where T: Serialize + for<'de> Deserialize<'de>, { - pub async fn lock(&self, db: &mut Db, lock: LockType) { - db.lock(&self.ptr, lock, true).await + pub async fn lock(&self, db: &mut Db) { + db.lock(&self.ptr).await } pub async fn get(&self, db: &mut Db, lock: bool) -> Result, Error> { if lock { - self.lock(db, LockType::Read).await; + self.lock(db).await; } Ok(ModelData(db.get(&self.ptr).await?)) } pub async fn get_mut(&self, db: &mut Db) -> Result, Error> { - self.lock(db, LockType::Write).await; + self.lock(db).await; let original = db.get_value(&self.ptr, None).await?; let current = serde_json::from_value(original.clone())?; Ok(ModelDataMut { @@ -105,7 +104,7 @@ where db: &mut Db, value: &T, ) -> Result>, Error> { - self.lock(db, LockType::Write).await; + self.lock(db).await; db.put(&self.ptr, value).await } } @@ -209,8 +208,8 @@ impl Deserialize<'de>> HasModel for Box { #[derive(Debug)] pub struct OptionModel Deserialize<'de>>(T::Model); impl Deserialize<'de>> OptionModel { - pub async fn lock(&self, db: &mut Db, lock: LockType) { - db.lock(self.0.as_ref(), lock, true).await + pub async fn lock(&self, db: &mut Db) { + db.lock(self.0.as_ref()).await } pub async fn get( @@ -219,7 +218,7 @@ impl Deserialize<'de>> OptionModel { lock: bool, ) -> Result>, Error> { if lock { - self.lock(db, LockType::Read).await; + self.lock(db).await; } Ok(ModelData(db.get(self.0.as_ref()).await?)) } @@ -228,7 +227,7 @@ impl Deserialize<'de>> OptionModel { &self, db: &mut Db, ) -> Result>, Error> { - self.lock(db, LockType::Write).await; + self.lock(db).await; let original = db.get_value(self.0.as_ref(), None).await?; let current = serde_json::from_value(original.clone())?; Ok(ModelDataMut { @@ -240,7 +239,7 @@ impl Deserialize<'de>> OptionModel { pub async fn exists(&self, db: &mut Db, lock: bool) -> Result { if lock { - db.lock(self.0.as_ref(), LockType::Read, false).await; + db.lock(self.0.as_ref()).await; } Ok(db.exists(&self.as_ref(), None).await?) } @@ -284,7 +283,7 @@ impl Deserialize<'de>> OptionModel { } pub async fn delete(&self, db: &mut Db) -> Result>, Error> { - db.lock(self.as_ref(), LockType::Write, true).await; + db.lock(self.as_ref()).await; db.put(self.as_ref(), &Value::Null).await } } @@ -297,7 +296,7 @@ where db: &mut Db, value: &T, ) -> Result>, Error> { - db.lock(self.as_ref(), LockType::Write, true).await; + db.lock(self.as_ref()).await; db.put(self.as_ref(), value).await } } @@ -461,7 +460,7 @@ where lock: bool, ) -> Result, Error> { if lock { - db.lock(self.as_ref(), LockType::Read, false).await; + db.lock(self.as_ref()).await; } let set = db.keys(self.as_ref(), None).await?; Ok(set @@ -470,7 +469,7 @@ where .collect::>()?) } pub async fn remove(&self, db: &mut Db, key: &T::Key) -> Result<(), Error> { - db.lock(self.as_ref(), LockType::Write, false).await; + db.lock(self.as_ref()).await; db.apply(DiffPatch(Patch(vec![PatchOperation::Remove( RemoveOperation { path: self.as_ref().clone().join_end(key.as_ref()), diff --git a/patch-db/src/store.rs b/patch-db/src/store.rs index 594c7d8..b62f8e7 100644 --- a/patch-db/src/store.rs +++ b/patch-db/src/store.rs @@ -209,7 +209,7 @@ impl Store { pub struct PatchDb { pub(crate) store: Arc>, subscriber: Arc>>, - pub(crate) locker: Locker, + pub(crate) locker: Arc, } impl PatchDb { pub async fn open>(path: P) -> Result { @@ -217,7 +217,7 @@ impl PatchDb { Ok(PatchDb { store: Arc::new(RwLock::new(Store::open(path).await?)), - locker: Locker::new(), + locker: Arc::new(Locker::new()), subscriber: Arc::new(subscriber), }) } diff --git a/patch-db/src/transaction.rs b/patch-db/src/transaction.rs index c4362d3..a448691 100644 --- a/patch-db/src/transaction.rs +++ b/patch-db/src/transaction.rs @@ -12,7 +12,7 @@ use tokio::sync::{RwLock, RwLockReadGuard}; use crate::store::Store; use crate::Error; use crate::{ - locker::{LockType, Locker, LockerGuard}, + locker::{Locker, LockerGuard}, DbHandle, }; use crate::{ @@ -22,7 +22,7 @@ use crate::{ pub struct Transaction { pub(crate) parent: Parent, - pub(crate) locks: Vec<(JsonPointer, LockerGuard)>, + pub(crate) locks: Vec<(JsonPointer, Option)>, pub(crate) updates: DiffPatch, pub(crate) sub: Receiver>, } @@ -94,7 +94,7 @@ impl DbHandle for Transaction { fn subscribe(&self) -> Receiver> { self.parent.subscribe() } - fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) { + fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, Option)]>) { let (locker, mut locks) = self.parent.locker_and_locks(); locks.push(&mut self.locks); (locker, locks) @@ -162,27 +162,9 @@ impl DbHandle for Transaction { self.updates.append(patch); Ok(None) } - async fn lock + Clone + Send + Sync, V: SegList + Clone + Send + Sync>( - &mut self, - ptr: &JsonPointer, - lock: LockType, - deep: bool, - ) { - match lock { - LockType::None => (), - LockType::Read => { - let (locker, mut locks) = self.parent.locker_and_locks(); - locker - .add_read_lock(ptr, &mut self.locks, &mut locks, deep) - .await - } - LockType::Write => { - let (locker, mut locks) = self.parent.locker_and_locks(); - locker - .add_write_lock(ptr, &mut self.locks, &mut locks, deep) - .await - } - } + async fn lock(&mut self, ptr: &JsonPointer) { + let (locker, mut locks) = self.parent.locker_and_locks(); + locker.add_lock(ptr, &mut self.locks, &mut locks).await } async fn get< T: for<'de> Deserialize<'de>,