diff --git a/patch-db/src/handle.rs b/patch-db/src/handle.rs index dac0853..4405bf5 100644 --- a/patch-db/src/handle.rs +++ b/patch-db/src/handle.rs @@ -7,16 +7,17 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use tokio::sync::{broadcast::Receiver, RwLock, RwLockReadGuard}; -use crate::{locker::LockerGuard, Locker, PatchDb, Revision, Store, Transaction}; +use crate::{locker::Guard, Locker, PatchDb, Revision, Store, Transaction}; use crate::{patch::DiffPatch, Error}; #[async_trait] pub trait DbHandle: Send + Sync { async fn begin<'a>(&'a mut self) -> Result, Error>; + fn id(&self) -> usize; fn rebase(&mut self) -> Result<(), Error>; fn store(&self) -> Arc>; fn subscribe(&self) -> Receiver>; - fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, Option)]>); + fn locker(&self) -> &Locker; async fn exists + Send + Sync, V: SegList + Send + Sync>( &mut self, ptr: &JsonPointer, @@ -38,7 +39,7 @@ pub trait DbHandle: Send + Sync { value: &Value, ) -> Result>, Error>; async fn apply(&mut self, patch: DiffPatch) -> Result>, Error>; - async fn lock(&mut self, ptr: &JsonPointer) -> (); + async fn lock(&mut self, ptr: JsonPointer, write: bool) -> (); async fn get< T: for<'de> Deserialize<'de>, S: AsRef + Send + Sync, @@ -67,14 +68,18 @@ impl DbHandle for &mut Handle { .. } = (*self).begin().await?; Ok(Transaction { + id: self.id(), parent: self, locks, updates, sub, }) } + fn id(&self) -> usize { + (**self).id() + } fn rebase(&mut self) -> Result<(), Error> { - (*self).rebase() + (**self).rebase() } fn store(&self) -> Arc> { (**self).store() @@ -82,8 +87,8 @@ impl DbHandle for &mut Handle { fn subscribe(&self) -> Receiver> { (**self).subscribe() } - fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, Option)]>) { - (*self).locker_and_locks() + fn locker(&self) -> &Locker { + (**self).locker() } async fn exists + Send + Sync, V: SegList + Send + Sync>( &mut self, @@ -116,8 +121,8 @@ impl DbHandle for &mut Handle { async fn apply(&mut self, patch: DiffPatch) -> Result>, Error> { (*self).apply(patch).await } - async fn lock(&mut self, ptr: &JsonPointer) { - (*self).lock(ptr).await + async fn lock(&mut self, ptr: JsonPointer, write: bool) { + (*self).lock(ptr, write).await } async fn get< T: for<'de> Deserialize<'de>, @@ -143,8 +148,9 @@ impl DbHandle for &mut Handle { } pub struct PatchDbHandle { + pub(crate) id: usize, pub(crate) db: PatchDb, - pub(crate) locks: Vec<(JsonPointer, Option)>, + pub(crate) locks: Vec, } #[async_trait] @@ -152,11 +158,15 @@ impl DbHandle for PatchDbHandle { async fn begin<'a>(&'a mut self) -> Result, Error> { Ok(Transaction { sub: self.subscribe(), + id: self.id(), parent: self, locks: Vec::new(), updates: DiffPatch::default(), }) } + fn id(&self) -> usize { + self.id + } fn rebase(&mut self) -> Result<(), Error> { Ok(()) } @@ -166,8 +176,8 @@ impl DbHandle for PatchDbHandle { fn subscribe(&self) -> Receiver> { self.db.subscribe() } - fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, Option)]>) { - (&self.db.locker, vec![self.locks.as_mut_slice()]) + fn locker(&self) -> &Locker { + &self.db.locker } async fn exists + Send + Sync, V: SegList + Send + Sync>( &mut self, @@ -212,8 +222,9 @@ impl DbHandle for PatchDbHandle { async fn apply(&mut self, patch: DiffPatch) -> Result>, Error> { self.db.apply(patch, None, None).await } - async fn lock(&mut self, ptr: &JsonPointer) { - self.db.locker.add_lock(ptr, &mut self.locks, &mut []).await; + async fn lock(&mut self, ptr: JsonPointer, write: bool) { + self.locks + .push(self.db.locker.lock(self.id, ptr, write).await); } async fn get< T: for<'de> Deserialize<'de>, diff --git a/patch-db/src/locker.rs b/patch-db/src/locker.rs index b156656..86db8b8 100644 --- a/patch-db/src/locker.rs +++ b/patch-db/src/locker.rs @@ -1,162 +1,266 @@ -use std::borrow::Cow; -use std::collections::HashMap; -use std::sync::{Arc, Weak}; -use std::task::Poll; +use std::collections::{HashMap, HashSet, VecDeque}; -use futures::future::BoxFuture; -use futures::FutureExt; -use json_ptr::{JsonPointer, SegList}; -use qutex::{Guard, Qutex}; -use tokio::runtime::Handle; -use tokio::sync::Mutex; -use tokio::sync::Notify; +use json_ptr::JsonPointer; +use tokio::sync::{mpsc, oneshot}; -#[derive(Debug)] -pub struct LockerGuard(Arc, Guard<()>, Arc); - -#[derive(Debug)] -struct NotifyGuard(Arc); -impl Drop for NotifyGuard { - fn drop(&mut self) { - self.0.notify_one(); +#[derive(Debug, Default)] +struct LockInfo { + ptr: JsonPointer, + segments_handled: usize, + write: bool, + handle_id: usize, +} +impl LockInfo { + fn write(&self) -> bool { + self.write && self.segments_handled == self.ptr.len() + } + fn next_seg(&self) -> Option<&str> { + self.ptr.get_segment(self.segments_handled) } } #[derive(Debug)] -pub struct Locker(Arc); -#[derive(Debug)] -struct LockerInner { - map: Mutex>>, - self_lock: Qutex<()>, - children_lock: Arc, - guard: Mutex>, - parent: Option<(Arc, String, Arc)>, +struct Request { + lock_info: LockInfo, + completion: oneshot::Sender, } -impl Drop for LockerInner { - fn drop(&mut self) { - if let Some((_, idx, parent)) = self.parent.take() { - Handle::current().block_on(parent.map.lock()).remove(&idx); - } - } -} -impl Locker { - pub fn new() -> Self { - Locker(Arc::new(LockerInner { - map: Mutex::new(HashMap::new()), - self_lock: Qutex::new(()), - children_lock: { - let notify = Arc::new(Notify::new()); - notify.notify_one(); - notify - }, - guard: Mutex::new(Weak::new()), - parent: None, - })) - } - async fn notify_guard(&self) -> Arc { - let mut lock = self.0.guard.lock().await; - if let Some(n) = lock.upgrade() { - Arc::new(NotifyGuard(n.0.clone())) - } else { - let res = Arc::new(NotifyGuard(self.0.children_lock.clone())); - *lock = Arc::downgrade(&res); - res - } - } - async fn child<'a, S: Into>>(&self, name: S) -> Locker { - let name: Cow<'a, str> = name.into(); - let mut lock = self.0.map.lock().await; - if let Some(child) = lock.get(name.as_ref()).and_then(|w| w.upgrade()) { - Locker(child) - } else { - let name = name.into_owned(); - let res = Arc::new(LockerInner { - map: Mutex::new(HashMap::new()), - self_lock: Qutex::new(()), - children_lock: { - let notify = Arc::new(Notify::new()); - notify.notify_one(); - notify - }, - guard: Mutex::new(Weak::new()), - parent: Some((self.notify_guard().await, name.clone(), self.0.clone())), +impl Request { + fn process(mut self, returned_locks: &mut Vec>) -> Option { + if self.lock_info.ptr.len() == self.lock_info.segments_handled { + let (sender, receiver) = oneshot::channel(); + returned_locks.push(receiver); + self.lock_info.segments_handled = 0; + let _ = self.completion.send(Guard { + lock_info: self.lock_info, + sender: Some(sender), }); - lock.insert(name, Arc::downgrade(&res)); - Locker(res) - } - } - /// await once: in the queue, await twice: all children dropped - async fn wait_for_children<'a>(&'a self) -> BoxFuture<'a, ()> { - let children = self.0.guard.lock().await; - let mut fut = if children.strong_count() == 0 { - futures::future::ready(()).boxed() + None } else { - self.0.children_lock.notified().boxed() - }; - if matches!(futures::poll!(&mut fut), Poll::Ready(_)) { - return futures::future::ready(()).boxed(); + self.lock_info.segments_handled += 1; + Some(self) } - drop(children); - fut } - async fn acquire_and_trade(self, guards: Vec) -> LockerGuard { - let children_dropped = self.wait_for_children().await; - guards.into_iter().for_each(drop); - children_dropped.await; - let guard = self.0.self_lock.clone().lock().await.unwrap(); - LockerGuard(self.notify_guard().await, guard, self.0.clone()) +} + +#[derive(Debug)] +pub struct Guard { + lock_info: LockInfo, + sender: Option>, +} +impl Drop for Guard { + fn drop(&mut self) { + let _ = self + .sender + .take() + .unwrap() + .send(std::mem::take(&mut self.lock_info)); } - pub async fn lock, V: SegList>( - &self, - ptr: &JsonPointer, - guards: Vec, - ) -> LockerGuard { - let mut locker = Locker(self.0.clone()); - for seg in ptr.iter() { - locker = locker.child(seg).await; +} + +#[derive(Default)] +struct Node { + readers: Vec, + writers: HashSet, + reqs: VecDeque>, +} +impl Node { + fn write_free(&self, id: usize) -> bool { + self.writers.is_empty() || (self.writers.len() == 1 && self.writers.contains(&id)) + } + fn read_free(&self, id: usize) -> bool { + self.readers.is_empty() || (self.readers.iter().filter(|a| a != &&id).count() == 0) + } + fn write_available(&self, id: usize) -> bool { + self.write_free(id) && self.read_free(id) && self.reqs.is_empty() + } + fn read_available(&self, id: usize) -> bool { + self.write_free(id) && self.reqs.is_empty() + } + fn handle_request( + &mut self, + req: Request, + returned_locks: &mut Vec>, + ) -> Option { + if req.lock_info.write() && self.write_available(req.lock_info.handle_id) { + self.writers.insert(req.lock_info.handle_id); + req.process(returned_locks) + } else if !req.lock_info.write() && self.read_available(req.lock_info.handle_id) { + self.readers.push(req.lock_info.handle_id); + req.process(returned_locks) + } else { + self.reqs.push_back(Some(req)); + None } - let res = locker.acquire_and_trade(guards).await; - let mut guards = Vec::with_capacity(ptr.len()); - let mut cur = res.2.parent.as_ref().map(|(_, _, p)| p); - while let Some(parent) = cur { - guards.push(parent.self_lock.clone().lock().await.unwrap()); - cur = parent.parent.as_ref().map(|(_, _, p)| p); - } - res } - /// TODO: DRAGONS!!! - /// Acquiring a lock to a node above something you already held will do so at the transaction level of the lock you already held! - /// This means even though you dropped the sub tx in which you acquired the higher level lock, - /// the higher lock could still be held by the parent tx which originally held the lower lock. - pub async fn add_lock( - &self, - ptr: &JsonPointer, - locks: &mut Vec<(JsonPointer, Option)>, // tx locks - extra_locks: &mut [&mut [(JsonPointer, Option)]], // tx parent locks - ) { - let mut lock_dest = None; - let mut guards = Vec::new(); - for lock in extra_locks - .iter_mut() - .flat_map(|a| a.iter_mut()) - .chain(locks.iter_mut()) + fn handle_release( + &mut self, + mut lock_info: LockInfo, + returned_locks: &mut Vec>, + ) -> (Option, Vec) { + if lock_info.write() { + self.writers.remove(&lock_info.handle_id); + } else if let Some(idx) = self + .readers + .iter() + .enumerate() + .find(|(_, id)| id == &&lock_info.handle_id) + .map(|(idx, _)| idx) { - if ptr.starts_with(&lock.0) { - return; - } - if lock.0.starts_with(&ptr) { - if let Some(guard) = lock.1.take() { - guards.push(guard); - lock_dest = Some(lock); + self.readers.swap_remove(idx); + } + let new_reqs = self.process_queue(returned_locks); + if lock_info.ptr.len() == lock_info.segments_handled { + (None, new_reqs) + } else { + lock_info.segments_handled += 1; + (Some(lock_info), new_reqs) + } + } + fn process_queue( + &mut self, + returned_locks: &mut Vec>, + ) -> Vec { + let mut ids_processed = HashSet::new(); + let mut only_matching = false; + let mut res = Vec::new(); + let mut tmp_reqs = std::mem::take(&mut self.reqs); + for req_opt in &mut tmp_reqs { + if let Some(req) = req_opt { + if !only_matching || ids_processed.contains(&req.lock_info.handle_id) { + if (req.lock_info.write() && self.write_available(req.lock_info.handle_id)) + || self.read_available(req.lock_info.handle_id) + { + ids_processed.insert(req.lock_info.handle_id); + if let Some(req) = req_opt.take() { + if let Some(req) = self.handle_request(req, returned_locks) { + res.push(req); + } + } + } else { + only_matching = true; + } } } } - let guard = self.lock(ptr, guards).await; - if let Some(lock) = lock_dest { - lock.0 = ptr.clone(); - lock.1 = Some(guard); - } else { - locks.push((ptr.clone(), Some(guard))); + self.reqs = tmp_reqs; + while matches!(self.reqs.get(0), Some(&None)) { + self.reqs.pop_front(); + } + res + } +} + +#[derive(Default)] +struct Trie { + node: Node, + children: HashMap, +} +impl Trie { + fn child_mut(&mut self, name: &str) -> &mut Self { + if !self.children.contains_key(name) { + self.children.insert(name.to_owned(), Trie::default()); + } + self.children.get_mut(name).unwrap() + } + fn handle_request( + &mut self, + req: Request, + returned_locks: &mut Vec>, + ) { + if let Some(req) = self.node.handle_request(req, returned_locks) { + if let Some(seg) = req.lock_info.next_seg() { + self.child_mut(seg).handle_request(req, returned_locks) + } + } + } + fn handle_release( + &mut self, + lock_info: LockInfo, + returned_locks: &mut Vec>, + ) { + let (release, reqs) = self.node.handle_release(lock_info, returned_locks); + for req in reqs { + self.handle_request(req, returned_locks); + } + if let Some(release) = release { + if let Some(seg) = release.next_seg() { + self.child_mut(seg).handle_release(release, returned_locks) + } + } + } +} + +pub struct Locker { + sender: mpsc::UnboundedSender, +} +impl Locker { + pub fn new() -> Self { + let (sender, receiver) = mpsc::unbounded_channel(); + tokio::spawn(async move { + let mut trie = Trie::default(); + let mut new_requests = RequestQueue { + closed: false, + recv: receiver, + }; + let mut returned_locks = Vec::new(); + while let Some(action) = get_action(&mut new_requests, &mut returned_locks).await { + match action { + Action::HandleRequest(req) => trie.handle_request(req, &mut returned_locks), + Action::HandleRelease(lock_info) => { + trie.handle_release(lock_info, &mut returned_locks) + } + } + } + }); + Locker { sender } + } + pub async fn lock(&self, handle_id: usize, ptr: JsonPointer, write: bool) -> Guard { + let (send, recv) = oneshot::channel(); + self.sender + .send(Request { + lock_info: LockInfo { + handle_id, + ptr, + write, + segments_handled: 0, + }, + completion: send, + }) + .unwrap(); + recv.await.unwrap() + } +} + +struct RequestQueue { + closed: bool, + recv: mpsc::UnboundedReceiver, +} + +enum Action { + HandleRequest(Request), + HandleRelease(LockInfo), +} + +async fn get_action( + new_requests: &mut RequestQueue, + returned_locks: &mut Vec>, +) -> Option { + loop { + if new_requests.closed && returned_locks.is_empty() { + return None; + } + tokio::select! { + a = new_requests.recv.recv() => { + if let Some(a) = a { + return Some(Action::HandleRequest(a)); + } else { + new_requests.closed = true; + } + } + (a, idx, _) = futures::future::select_all(returned_locks.iter_mut()) => { + returned_locks.swap_remove(idx); + return Some(Action::HandleRelease(a.unwrap())) + } } } } diff --git a/patch-db/src/model.rs b/patch-db/src/model.rs index 0fc76aa..22d7c62 100644 --- a/patch-db/src/model.rs +++ b/patch-db/src/model.rs @@ -10,6 +10,7 @@ use json_ptr::JsonPointer; use serde::{Deserialize, Serialize}; use serde_json::Value; +use crate::locker::Guard; use crate::{DbHandle, DiffPatch, Error, Revision}; #[derive(Debug)] @@ -59,24 +60,25 @@ impl Deserialize<'de>> DerefMut for ModelDataMut { pub struct Model Deserialize<'de>> { ptr: JsonPointer, phantom: PhantomData, + lock: Option>, } impl Model where T: Serialize + for<'de> Deserialize<'de>, { - pub async fn lock(&self, db: &mut Db) { - db.lock(&self.ptr).await + pub async fn lock(&self, db: &mut Db, write: bool) { + db.lock(self.ptr.clone(), write).await } pub async fn get(&self, db: &mut Db, lock: bool) -> Result, Error> { if lock { - self.lock(db).await; + self.lock(db, false).await; } Ok(ModelData(db.get(&self.ptr).await?)) } pub async fn get_mut(&self, db: &mut Db) -> Result, Error> { - self.lock(db).await; + self.lock(db, true).await; let original = db.get_value(&self.ptr, None).await?; let current = serde_json::from_value(original.clone())?; Ok(ModelDataMut { @@ -92,6 +94,7 @@ where Model { ptr, phantom: PhantomData, + lock: self.lock, } } } @@ -104,7 +107,7 @@ where db: &mut Db, value: &T, ) -> Result>, Error> { - self.lock(db).await; + self.lock(db, true).await; db.put(&self.ptr, value).await } } @@ -116,6 +119,7 @@ where Self { ptr, phantom: PhantomData, + lock: None, } } } @@ -143,6 +147,7 @@ where Model { ptr: self.ptr.clone(), phantom: PhantomData, + lock: self.lock.clone(), } } } @@ -208,8 +213,8 @@ impl Deserialize<'de>> HasModel for Box { #[derive(Debug)] pub struct OptionModel Deserialize<'de>>(T::Model); impl Deserialize<'de>> OptionModel { - pub async fn lock(&self, db: &mut Db) { - db.lock(self.0.as_ref()).await + pub async fn lock(&self, db: &mut Db, write: bool) { + db.lock(self.0.as_ref().clone(), write).await } pub async fn get( @@ -218,7 +223,7 @@ impl Deserialize<'de>> OptionModel { lock: bool, ) -> Result>, Error> { if lock { - self.lock(db).await; + self.lock(db, false).await; } Ok(ModelData(db.get(self.0.as_ref()).await?)) } @@ -227,7 +232,7 @@ impl Deserialize<'de>> OptionModel { &self, db: &mut Db, ) -> Result>, Error> { - self.lock(db).await; + self.lock(db, true).await; let original = db.get_value(self.0.as_ref(), None).await?; let current = serde_json::from_value(original.clone())?; Ok(ModelDataMut { @@ -239,35 +244,48 @@ impl Deserialize<'de>> OptionModel { pub async fn exists(&self, db: &mut Db, lock: bool) -> Result { if lock { - db.lock(self.0.as_ref()).await; + db.lock(self.0.as_ref().clone(), false).await; } Ok(db.exists(&self.as_ref(), None).await?) } pub fn map< - F: FnOnce(T::Model) -> V, + F: FnOnce(T::Model) -> U::Model, U: Serialize + for<'de> Deserialize<'de> + HasModel, - V: ModelFor, >( self, f: F, ) -> OptionModel { - Into::::into(f(self.0)).into() + OptionModel(f(self.0)) } pub fn and_then< - F: FnOnce(T::Model) -> V, - U: Serialize + for<'de> Deserialize<'de>, - V: ModelFor>, + F: FnOnce(T::Model) -> U::Model, + U: Serialize + for<'de> Deserialize<'de> + HasModel, >( self, f: F, - ) -> V { - Into::::into(f(self.0)).into() + ) -> OptionModel { + OptionModel(f(self.0)) } - pub async fn check(self, db: &mut Db) -> Result, Error> { - Ok(if self.exists(db, true).await? { + pub async fn delete(&self, db: &mut Db) -> Result>, Error> { + db.lock(self.as_ref().clone(), true).await; + db.put(self.as_ref(), &Value::Null).await + } +} +impl OptionModel +where + T: HasModel + Serialize + for<'de> Deserialize<'de>, + T::Model: AsMut>, +{ + pub async fn check(mut self, db: &mut Db) -> Result, Error> { + let lock = db + .locker() + .lock(db.id(), self.0.as_ref().clone(), false) + .await; + self.0.as_mut().lock = Some(Arc::new(lock)); + Ok(if self.exists(db, false).await? { Some(self.0) } else { None @@ -281,11 +299,6 @@ impl Deserialize<'de>> OptionModel { Err(Error::NodeDoesNotExist(self.0.into())) } } - - pub async fn delete(&self, db: &mut Db) -> Result>, Error> { - db.lock(self.as_ref()).await; - db.put(self.as_ref(), &Value::Null).await - } } impl OptionModel where @@ -296,7 +309,7 @@ where db: &mut Db, value: &T, ) -> Result>, Error> { - db.lock(self.as_ref()).await; + db.lock(self.as_ref().clone(), true).await; db.put(self.as_ref(), value).await } } @@ -460,7 +473,7 @@ where lock: bool, ) -> Result, Error> { if lock { - db.lock(self.as_ref()).await; + db.lock(self.as_ref().clone(), false).await; } let set = db.keys(self.as_ref(), None).await?; Ok(set @@ -469,7 +482,7 @@ where .collect::>()?) } pub async fn remove(&self, db: &mut Db, key: &T::Key) -> Result<(), Error> { - db.lock(self.as_ref()).await; + db.lock(self.as_ref().clone(), true).await; db.apply(DiffPatch(Patch(vec![PatchOperation::Remove( RemoveOperation { path: self.as_ref().clone().join_end(key.as_ref()), diff --git a/patch-db/src/store.rs b/patch-db/src/store.rs index b62f8e7..0d1bbf7 100644 --- a/patch-db/src/store.rs +++ b/patch-db/src/store.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::fs::OpenOptions; use std::io::Error as IOError; use std::path::{Path, PathBuf}; +use std::sync::atomic::AtomicUsize; use std::sync::Arc; use fd_lock_rs::FdLock; @@ -210,6 +211,7 @@ pub struct PatchDb { pub(crate) store: Arc>, subscriber: Arc>>, pub(crate) locker: Arc, + handle_id: Arc, } impl PatchDb { pub async fn open>(path: P) -> Result { @@ -219,6 +221,7 @@ impl PatchDb { store: Arc::new(RwLock::new(Store::open(path).await?)), locker: Arc::new(Locker::new()), subscriber: Arc::new(subscriber), + handle_id: Arc::new(AtomicUsize::new(0)), }) } pub async fn dump(&self) -> Dump { @@ -282,6 +285,9 @@ impl PatchDb { } pub fn handle(&self) -> PatchDbHandle { PatchDbHandle { + id: self + .handle_id + .fetch_add(1, std::sync::atomic::Ordering::SeqCst), db: self.clone(), locks: Vec::new(), } diff --git a/patch-db/src/transaction.rs b/patch-db/src/transaction.rs index a448691..80685af 100644 --- a/patch-db/src/transaction.rs +++ b/patch-db/src/transaction.rs @@ -12,7 +12,7 @@ use tokio::sync::{RwLock, RwLockReadGuard}; use crate::store::Store; use crate::Error; use crate::{ - locker::{Locker, LockerGuard}, + locker::{Guard, Locker}, DbHandle, }; use crate::{ @@ -21,8 +21,9 @@ use crate::{ }; pub struct Transaction { + pub(crate) id: usize, pub(crate) parent: Parent, - pub(crate) locks: Vec<(JsonPointer, Option)>, + pub(crate) locks: Vec, pub(crate) updates: DiffPatch, pub(crate) sub: Receiver>, } @@ -71,12 +72,16 @@ impl DbHandle for Transaction { let sub = self.parent.subscribe(); drop(store); Ok(Transaction { + id: self.id(), parent: self, locks: Vec::new(), updates: DiffPatch::default(), sub, }) } + fn id(&self) -> usize { + self.id + } fn rebase(&mut self) -> Result<(), Error> { self.parent.rebase()?; while let Some(rev) = match self.sub.try_recv() { @@ -94,10 +99,8 @@ impl DbHandle for Transaction { fn subscribe(&self) -> Receiver> { self.parent.subscribe() } - fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, Option)]>) { - let (locker, mut locks) = self.parent.locker_and_locks(); - locks.push(&mut self.locks); - (locker, locks) + fn locker(&self) -> &Locker { + self.parent.locker() } async fn exists + Send + Sync, V: SegList + Send + Sync>( &mut self, @@ -162,9 +165,9 @@ impl DbHandle for Transaction { self.updates.append(patch); Ok(None) } - async fn lock(&mut self, ptr: &JsonPointer) { - let (locker, mut locks) = self.parent.locker_and_locks(); - locker.add_lock(ptr, &mut self.locks, &mut locks).await + async fn lock(&mut self, ptr: JsonPointer, write: bool) { + self.locks + .push(self.parent.locker().lock(self.id, ptr, write).await) } async fn get< T: for<'de> Deserialize<'de>,