redo locking api

This commit is contained in:
Aiden McClelland
2021-09-23 13:01:08 -06:00
parent 71109f1f90
commit 8a4afb8d10
6 changed files with 160 additions and 299 deletions

View File

@@ -7,10 +7,7 @@ use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
use tokio::sync::{broadcast::Receiver, RwLock, RwLockReadGuard}; use tokio::sync::{broadcast::Receiver, RwLock, RwLockReadGuard};
use crate::{ use crate::{locker::LockerGuard, Locker, PatchDb, Revision, Store, Transaction};
locker::{LockType, LockerGuard},
Locker, PatchDb, Revision, Store, Transaction,
};
use crate::{patch::DiffPatch, Error}; use crate::{patch::DiffPatch, Error};
#[async_trait] #[async_trait]
@@ -19,7 +16,7 @@ pub trait DbHandle: Send + Sync {
fn rebase(&mut self) -> Result<(), Error>; fn rebase(&mut self) -> Result<(), Error>;
fn store(&self) -> Arc<RwLock<Store>>; fn store(&self) -> Arc<RwLock<Store>>;
fn subscribe(&self) -> Receiver<Arc<Revision>>; fn subscribe(&self) -> Receiver<Arc<Revision>>;
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>); fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, Option<LockerGuard>)]>);
async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>( async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
&mut self, &mut self,
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
@@ -41,12 +38,7 @@ pub trait DbHandle: Send + Sync {
value: &Value, value: &Value,
) -> Result<Option<Arc<Revision>>, Error>; ) -> Result<Option<Arc<Revision>>, Error>;
async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error>; async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error>;
async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>( async fn lock(&mut self, ptr: &JsonPointer) -> ();
&mut self,
ptr: &JsonPointer<S, V>,
lock: LockType,
deep: bool,
) -> ();
async fn get< async fn get<
T: for<'de> Deserialize<'de>, T: for<'de> Deserialize<'de>,
S: AsRef<str> + Send + Sync, S: AsRef<str> + Send + Sync,
@@ -90,7 +82,7 @@ impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
fn subscribe(&self) -> Receiver<Arc<Revision>> { fn subscribe(&self) -> Receiver<Arc<Revision>> {
(**self).subscribe() (**self).subscribe()
} }
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) { fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, Option<LockerGuard>)]>) {
(*self).locker_and_locks() (*self).locker_and_locks()
} }
async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>( async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
@@ -124,13 +116,8 @@ impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error> { async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error> {
(*self).apply(patch).await (*self).apply(patch).await
} }
async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>( async fn lock(&mut self, ptr: &JsonPointer) {
&mut self, (*self).lock(ptr).await
ptr: &JsonPointer<S, V>,
lock: LockType,
deep: bool,
) {
(*self).lock(ptr, lock, deep).await
} }
async fn get< async fn get<
T: for<'de> Deserialize<'de>, T: for<'de> Deserialize<'de>,
@@ -157,7 +144,7 @@ impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
pub struct PatchDbHandle { pub struct PatchDbHandle {
pub(crate) db: PatchDb, pub(crate) db: PatchDb,
pub(crate) locks: Vec<(JsonPointer, LockerGuard)>, pub(crate) locks: Vec<(JsonPointer, Option<LockerGuard>)>,
} }
#[async_trait] #[async_trait]
@@ -179,7 +166,7 @@ impl DbHandle for PatchDbHandle {
fn subscribe(&self) -> Receiver<Arc<Revision>> { fn subscribe(&self) -> Receiver<Arc<Revision>> {
self.db.subscribe() self.db.subscribe()
} }
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) { fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, Option<LockerGuard>)]>) {
(&self.db.locker, vec![self.locks.as_mut_slice()]) (&self.db.locker, vec![self.locks.as_mut_slice()])
} }
async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>( async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
@@ -225,27 +212,8 @@ impl DbHandle for PatchDbHandle {
async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error> { async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error> {
self.db.apply(patch, None, None).await self.db.apply(patch, None, None).await
} }
async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>( async fn lock(&mut self, ptr: &JsonPointer) {
&mut self, self.db.locker.add_lock(ptr, &mut self.locks, &mut []).await;
ptr: &JsonPointer<S, V>,
lock: LockType,
deep: bool,
) {
match lock {
LockType::Read => {
self.db
.locker
.add_read_lock(ptr, &mut self.locks, &mut [], deep)
.await;
}
LockType::Write => {
self.db
.locker
.add_write_lock(ptr, &mut self.locks, &mut [], deep)
.await;
}
LockType::None => (),
}
} }
async fn get< async fn get<
T: for<'de> Deserialize<'de>, T: for<'de> Deserialize<'de>,

View File

@@ -20,7 +20,7 @@ mod test;
pub use handle::{DbHandle, PatchDbHandle}; pub use handle::{DbHandle, PatchDbHandle};
pub use json_patch; pub use json_patch;
pub use json_ptr; pub use json_ptr;
pub use locker::{LockType, Locker}; pub use locker::Locker;
pub use model::{ pub use model::{
BoxModel, HasModel, Map, MapModel, Model, ModelData, ModelDataMut, OptionModel, VecModel, BoxModel, HasModel, Map, MapModel, Model, ModelData, ModelDataMut, OptionModel, VecModel,
}; };

View File

@@ -1,250 +1,162 @@
use std::borrow::Cow;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::{Arc, Weak};
use std::task::Poll;
use futures::{future::BoxFuture, FutureExt}; use futures::future::BoxFuture;
use futures::FutureExt;
use json_ptr::{JsonPointer, SegList}; use json_ptr::{JsonPointer, SegList};
use qutex::{QrwLock, ReadGuard, WriteGuard}; use qutex::{Guard, Qutex};
use tokio::runtime::Handle;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tokio::sync::Notify;
#[derive(Debug, Clone, Copy)] #[derive(Debug)]
pub enum LockType { pub struct LockerGuard(Arc<NotifyGuard>, Guard<()>, Arc<LockerInner>);
None,
Read,
Write,
}
pub enum LockerGuard { #[derive(Debug)]
Empty, struct NotifyGuard(Arc<Notify>);
Read(LockerReadGuard), impl Drop for NotifyGuard {
Write(LockerWriteGuard),
}
impl LockerGuard {
pub fn take(&mut self) -> Self {
std::mem::replace(self, LockerGuard::Empty)
}
}
#[derive(Debug, Clone)]
pub struct LockerReadGuard(Arc<Mutex<Option<ReadGuard<HashMap<String, Locker>>>>>);
impl LockerReadGuard {
async fn upgrade(&self) -> Option<LockerWriteGuard> {
let guard = self.0.try_lock().unwrap().take();
if let Some(g) = guard {
Some(LockerWriteGuard(
Some(ReadGuard::upgrade(g).await.unwrap()),
Some(self.clone()),
))
} else {
None
}
}
}
impl From<ReadGuard<HashMap<String, Locker>>> for LockerReadGuard {
fn from(guard: ReadGuard<HashMap<String, Locker>>) -> Self {
LockerReadGuard(Arc::new(Mutex::new(Some(guard))))
}
}
pub struct LockerWriteGuard(
Option<WriteGuard<HashMap<String, Locker>>>,
Option<LockerReadGuard>,
);
impl From<WriteGuard<HashMap<String, Locker>>> for LockerWriteGuard {
fn from(guard: WriteGuard<HashMap<String, Locker>>) -> Self {
LockerWriteGuard(Some(guard), None)
}
}
impl Drop for LockerWriteGuard {
fn drop(&mut self) { fn drop(&mut self) {
if let (Some(write), Some(read)) = (self.0.take(), self.1.take()) { self.0.notify_one();
*read.0.try_lock().unwrap() = Some(WriteGuard::downgrade(write));
}
} }
} }
#[derive(Clone, Debug)] #[derive(Debug)]
pub struct Locker(QrwLock<HashMap<String, Locker>>); pub struct Locker(Arc<LockerInner>);
#[derive(Debug)]
struct LockerInner {
map: Mutex<HashMap<String, Weak<LockerInner>>>,
self_lock: Qutex<()>,
children_lock: Arc<Notify>,
guard: Mutex<Weak<NotifyGuard>>,
parent: Option<(Arc<NotifyGuard>, String, Arc<LockerInner>)>,
}
impl Drop for LockerInner {
fn drop(&mut self) {
if let Some((_, idx, parent)) = self.parent.take() {
Handle::current().block_on(parent.map.lock()).remove(&idx);
}
}
}
impl Locker { impl Locker {
pub fn new() -> Self { pub fn new() -> Self {
Locker(QrwLock::new(HashMap::new())) Locker(Arc::new(LockerInner {
map: Mutex::new(HashMap::new()),
self_lock: Qutex::new(()),
children_lock: {
let notify = Arc::new(Notify::new());
notify.notify_one();
notify
},
guard: Mutex::new(Weak::new()),
parent: None,
}))
} }
fn lock_root_read<'a>(guard: &'a ReadGuard<HashMap<String, Locker>>) -> BoxFuture<'a, ()> { async fn notify_guard(&self) -> Arc<NotifyGuard> {
async move { let mut lock = self.0.guard.lock().await;
for (_, v) in &**guard { if let Some(n) = lock.upgrade() {
let g = v.0.clone().read().await.unwrap(); Arc::new(NotifyGuard(n.0.clone()))
Self::lock_root_read(&g).await; } else {
} let res = Arc::new(NotifyGuard(self.0.children_lock.clone()));
*lock = Arc::downgrade(&res);
res
} }
.boxed()
} }
pub async fn lock_read<S: AsRef<str>, V: SegList>( async fn child<'a, S: Into<Cow<'a, str>>>(&self, name: S) -> Locker {
let name: Cow<'a, str> = name.into();
let mut lock = self.0.map.lock().await;
if let Some(child) = lock.get(name.as_ref()).and_then(|w| w.upgrade()) {
Locker(child)
} else {
let name = name.into_owned();
let res = Arc::new(LockerInner {
map: Mutex::new(HashMap::new()),
self_lock: Qutex::new(()),
children_lock: {
let notify = Arc::new(Notify::new());
notify.notify_one();
notify
},
guard: Mutex::new(Weak::new()),
parent: Some((self.notify_guard().await, name.clone(), self.0.clone())),
});
lock.insert(name, Arc::downgrade(&res));
Locker(res)
}
}
/// await once: in the queue, await twice: all children dropped
async fn wait_for_children<'a>(&'a self) -> BoxFuture<'a, ()> {
let children = self.0.guard.lock().await;
let mut fut = if children.strong_count() == 0 {
futures::future::ready(()).boxed()
} else {
self.0.children_lock.notified().boxed()
};
if matches!(futures::poll!(&mut fut), Poll::Ready(_)) {
return futures::future::ready(()).boxed();
}
drop(children);
fut
}
async fn acquire_and_trade(self, guards: Vec<LockerGuard>) -> LockerGuard {
let children_dropped = self.wait_for_children().await;
guards.into_iter().for_each(drop);
children_dropped.await;
let guard = self.0.self_lock.clone().lock().await.unwrap();
LockerGuard(self.notify_guard().await, guard, self.0.clone())
}
pub async fn lock<S: AsRef<str>, V: SegList>(
&self, &self,
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
deep: bool, guards: Vec<LockerGuard>,
) -> ReadGuard<HashMap<String, Locker>> { ) -> LockerGuard {
#[cfg(feature = "log")] let mut locker = Locker(self.0.clone());
log::debug!("Locking {} for READ: {{ deep: {} }}", ptr, deep);
let mut lock = Some(self.0.clone().read().await.unwrap());
for seg in ptr.iter() { for seg in ptr.iter() {
let new_lock = if let Some(locker) = lock.as_ref().unwrap().get(seg) { locker = locker.child(seg).await;
locker.0.clone().read().await.unwrap()
} else {
let mut writer = ReadGuard::upgrade(lock.take().unwrap()).await.unwrap();
writer.insert(seg.to_owned(), Locker::new());
let reader = WriteGuard::downgrade(writer);
reader.get(seg).unwrap().0.clone().read().await.unwrap()
};
lock = Some(new_lock);
} }
let res = lock.unwrap(); let res = locker.acquire_and_trade(guards).await;
if deep { let mut guards = Vec::with_capacity(ptr.len());
Self::lock_root_read(&res); let mut cur = res.2.parent.as_ref().map(|(_, _, p)| p);
while let Some(parent) = cur {
guards.push(parent.self_lock.clone().lock().await.unwrap());
cur = parent.parent.as_ref().map(|(_, _, p)| p);
} }
#[cfg(feature = "log")]
log::debug!("Locked {} for READ: {{ deep: {} }}", ptr, deep);
res res
} }
pub(crate) async fn add_read_lock<S: AsRef<str> + Clone, V: SegList + Clone>( /// TODO: DRAGONS!!!
/// Acquiring a lock to a node above something you already held will do so at the transaction level of the lock you already held!
/// This means even though you dropped the sub tx in which you acquired the higher level lock,
/// the higher lock could still be held by the parent tx which originally held the lower lock.
pub async fn add_lock(
&self, &self,
ptr: &JsonPointer<S, V>, ptr: &JsonPointer,
locks: &mut Vec<(JsonPointer, LockerGuard)>, locks: &mut Vec<(JsonPointer, Option<LockerGuard>)>, // tx locks
extra_locks: &mut [&mut [(JsonPointer, LockerGuard)]], extra_locks: &mut [&mut [(JsonPointer, Option<LockerGuard>)]], // tx parent locks
deep: bool,
) { ) {
for lock in extra_locks let mut lock_dest = None;
.iter() let mut guards = Vec::new();
.flat_map(|a| a.iter())
.chain(locks.iter())
{
if ptr.starts_with(&lock.0) {
return;
}
}
locks.push((
JsonPointer::to_owned(ptr.clone()),
LockerGuard::Read(self.lock_read(ptr, deep).await.into()),
));
}
fn lock_root_write<'a>(guard: &'a WriteGuard<HashMap<String, Locker>>) -> BoxFuture<'a, ()> {
async move {
for (_, v) in &**guard {
let g = v.0.clone().write().await.unwrap();
Self::lock_root_write(&g).await;
}
}
.boxed()
}
pub async fn lock_write<S: AsRef<str>, V: SegList>(
&self,
ptr: &JsonPointer<S, V>,
deep: bool,
) -> WriteGuard<HashMap<String, Locker>> {
#[cfg(feature = "log")]
log::debug!("Locking {} for WRITE: {{ deep: {} }}", ptr, deep);
let mut lock = self.0.clone().write().await.unwrap();
for seg in ptr.iter() {
let new_lock = if let Some(locker) = lock.get(seg) {
locker.0.clone().write().await.unwrap()
} else {
lock.insert(seg.to_owned(), Locker::new());
lock.get(seg).unwrap().0.clone().write().await.unwrap()
};
lock = new_lock;
}
let res = lock;
if deep {
Self::lock_root_write(&res);
}
#[cfg(feature = "log")]
log::debug!("Locked {} for WRITE: {{ deep: {} }}", ptr, deep);
res
}
pub(crate) async fn add_write_lock<S: AsRef<str> + Clone, V: SegList + Clone>(
&self,
ptr: &JsonPointer<S, V>,
locks: &mut Vec<(JsonPointer, LockerGuard)>, // tx locks
extra_locks: &mut [&mut [(JsonPointer, LockerGuard)]], // tx parent locks
deep: bool,
) {
let mut final_lock = None;
for lock in extra_locks for lock in extra_locks
.iter_mut() .iter_mut()
.flat_map(|a| a.iter_mut()) .flat_map(|a| a.iter_mut())
.chain(locks.iter_mut()) .chain(locks.iter_mut())
{ {
enum Choice { if ptr.starts_with(&lock.0) {
Return, return;
Continue,
Break,
} }
let choice: Choice; if lock.0.starts_with(&ptr) {
if let Some(remainder) = ptr.strip_prefix(&lock.0) { if let Some(guard) = lock.1.take() {
let guard = lock.1.take(); guards.push(guard);
lock.1 = match guard { lock_dest = Some(lock);
LockerGuard::Read(LockerReadGuard(guard)) if !remainder.is_empty() => {
// read guard already exists at higher level
let mut lock = guard.lock().await;
if let Some(l) = lock.take() {
let mut orig_lock = None;
let mut lock = ReadGuard::upgrade(l).await.unwrap();
for seg in remainder.iter() {
let new_lock = if let Some(locker) = lock.get(seg) {
locker.0.clone().write().await.unwrap()
} else {
lock.insert(seg.to_owned(), Locker::new());
lock.get(seg).unwrap().0.clone().write().await.unwrap()
};
if orig_lock.is_none() {
orig_lock = Some(lock);
}
lock = new_lock;
}
final_lock = Some(LockerGuard::Write(lock.into()));
choice = Choice::Break;
LockerGuard::Read(WriteGuard::downgrade(orig_lock.unwrap()).into())
} else {
drop(lock);
choice = Choice::Return;
LockerGuard::Read(LockerReadGuard(guard))
}
}
LockerGuard::Read(l) => {
// read exists, convert to write
if let Some(upgraded) = l.upgrade().await {
final_lock = Some(LockerGuard::Write(upgraded));
choice = Choice::Break;
} else {
choice = Choice::Continue;
}
LockerGuard::Read(l)
}
LockerGuard::Write(l) => {
choice = Choice::Return;
LockerGuard::Write(l)
} // leave it alone, already sufficiently locked
LockerGuard::Empty => {
unreachable!("LockerGuard found empty");
}
};
match choice {
Choice::Return => return,
Choice::Break => break,
Choice::Continue => continue,
} }
} }
} }
locks.push(( let guard = self.lock(ptr, guards).await;
JsonPointer::to_owned(ptr.clone()), if let Some(lock) = lock_dest {
if let Some(lock) = final_lock { lock.0 = ptr.clone();
lock lock.1 = Some(guard);
} else { } else {
LockerGuard::Write(self.lock_write(ptr, deep).await.into()) locks.push((ptr.clone(), Some(guard)));
}, }
));
}
}
impl Default for Locker {
fn default() -> Self {
Locker::new()
} }
} }

View File

@@ -10,7 +10,6 @@ use json_ptr::JsonPointer;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
use crate::locker::LockType;
use crate::{DbHandle, DiffPatch, Error, Revision}; use crate::{DbHandle, DiffPatch, Error, Revision};
#[derive(Debug)] #[derive(Debug)]
@@ -65,19 +64,19 @@ impl<T> Model<T>
where where
T: Serialize + for<'de> Deserialize<'de>, T: Serialize + for<'de> Deserialize<'de>,
{ {
pub async fn lock<Db: DbHandle>(&self, db: &mut Db, lock: LockType) { pub async fn lock<Db: DbHandle>(&self, db: &mut Db) {
db.lock(&self.ptr, lock, true).await db.lock(&self.ptr).await
} }
pub async fn get<Db: DbHandle>(&self, db: &mut Db, lock: bool) -> Result<ModelData<T>, Error> { pub async fn get<Db: DbHandle>(&self, db: &mut Db, lock: bool) -> Result<ModelData<T>, Error> {
if lock { if lock {
self.lock(db, LockType::Read).await; self.lock(db).await;
} }
Ok(ModelData(db.get(&self.ptr).await?)) Ok(ModelData(db.get(&self.ptr).await?))
} }
pub async fn get_mut<Db: DbHandle>(&self, db: &mut Db) -> Result<ModelDataMut<T>, Error> { pub async fn get_mut<Db: DbHandle>(&self, db: &mut Db) -> Result<ModelDataMut<T>, Error> {
self.lock(db, LockType::Write).await; self.lock(db).await;
let original = db.get_value(&self.ptr, None).await?; let original = db.get_value(&self.ptr, None).await?;
let current = serde_json::from_value(original.clone())?; let current = serde_json::from_value(original.clone())?;
Ok(ModelDataMut { Ok(ModelDataMut {
@@ -105,7 +104,7 @@ where
db: &mut Db, db: &mut Db,
value: &T, value: &T,
) -> Result<Option<Arc<Revision>>, Error> { ) -> Result<Option<Arc<Revision>>, Error> {
self.lock(db, LockType::Write).await; self.lock(db).await;
db.put(&self.ptr, value).await db.put(&self.ptr, value).await
} }
} }
@@ -209,8 +208,8 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> HasModel for Box<T> {
#[derive(Debug)] #[derive(Debug)]
pub struct OptionModel<T: HasModel + Serialize + for<'de> Deserialize<'de>>(T::Model); pub struct OptionModel<T: HasModel + Serialize + for<'de> Deserialize<'de>>(T::Model);
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> { impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
pub async fn lock<Db: DbHandle>(&self, db: &mut Db, lock: LockType) { pub async fn lock<Db: DbHandle>(&self, db: &mut Db) {
db.lock(self.0.as_ref(), lock, true).await db.lock(self.0.as_ref()).await
} }
pub async fn get<Db: DbHandle>( pub async fn get<Db: DbHandle>(
@@ -219,7 +218,7 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
lock: bool, lock: bool,
) -> Result<ModelData<Option<T>>, Error> { ) -> Result<ModelData<Option<T>>, Error> {
if lock { if lock {
self.lock(db, LockType::Read).await; self.lock(db).await;
} }
Ok(ModelData(db.get(self.0.as_ref()).await?)) Ok(ModelData(db.get(self.0.as_ref()).await?))
} }
@@ -228,7 +227,7 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
&self, &self,
db: &mut Db, db: &mut Db,
) -> Result<ModelDataMut<Option<T>>, Error> { ) -> Result<ModelDataMut<Option<T>>, Error> {
self.lock(db, LockType::Write).await; self.lock(db).await;
let original = db.get_value(self.0.as_ref(), None).await?; let original = db.get_value(self.0.as_ref(), None).await?;
let current = serde_json::from_value(original.clone())?; let current = serde_json::from_value(original.clone())?;
Ok(ModelDataMut { Ok(ModelDataMut {
@@ -240,7 +239,7 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
pub async fn exists<Db: DbHandle>(&self, db: &mut Db, lock: bool) -> Result<bool, Error> { pub async fn exists<Db: DbHandle>(&self, db: &mut Db, lock: bool) -> Result<bool, Error> {
if lock { if lock {
db.lock(self.0.as_ref(), LockType::Read, false).await; db.lock(self.0.as_ref()).await;
} }
Ok(db.exists(&self.as_ref(), None).await?) Ok(db.exists(&self.as_ref(), None).await?)
} }
@@ -284,7 +283,7 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
} }
pub async fn delete<Db: DbHandle>(&self, db: &mut Db) -> Result<Option<Arc<Revision>>, Error> { pub async fn delete<Db: DbHandle>(&self, db: &mut Db) -> Result<Option<Arc<Revision>>, Error> {
db.lock(self.as_ref(), LockType::Write, true).await; db.lock(self.as_ref()).await;
db.put(self.as_ref(), &Value::Null).await db.put(self.as_ref(), &Value::Null).await
} }
} }
@@ -297,7 +296,7 @@ where
db: &mut Db, db: &mut Db,
value: &T, value: &T,
) -> Result<Option<Arc<Revision>>, Error> { ) -> Result<Option<Arc<Revision>>, Error> {
db.lock(self.as_ref(), LockType::Write, true).await; db.lock(self.as_ref()).await;
db.put(self.as_ref(), value).await db.put(self.as_ref(), value).await
} }
} }
@@ -461,7 +460,7 @@ where
lock: bool, lock: bool,
) -> Result<IndexSet<T::Key>, Error> { ) -> Result<IndexSet<T::Key>, Error> {
if lock { if lock {
db.lock(self.as_ref(), LockType::Read, false).await; db.lock(self.as_ref()).await;
} }
let set = db.keys(self.as_ref(), None).await?; let set = db.keys(self.as_ref(), None).await?;
Ok(set Ok(set
@@ -470,7 +469,7 @@ where
.collect::<Result<_, _>>()?) .collect::<Result<_, _>>()?)
} }
pub async fn remove<Db: DbHandle>(&self, db: &mut Db, key: &T::Key) -> Result<(), Error> { pub async fn remove<Db: DbHandle>(&self, db: &mut Db, key: &T::Key) -> Result<(), Error> {
db.lock(self.as_ref(), LockType::Write, false).await; db.lock(self.as_ref()).await;
db.apply(DiffPatch(Patch(vec![PatchOperation::Remove( db.apply(DiffPatch(Patch(vec![PatchOperation::Remove(
RemoveOperation { RemoveOperation {
path: self.as_ref().clone().join_end(key.as_ref()), path: self.as_ref().clone().join_end(key.as_ref()),

View File

@@ -209,7 +209,7 @@ impl Store {
pub struct PatchDb { pub struct PatchDb {
pub(crate) store: Arc<RwLock<Store>>, pub(crate) store: Arc<RwLock<Store>>,
subscriber: Arc<Sender<Arc<Revision>>>, subscriber: Arc<Sender<Arc<Revision>>>,
pub(crate) locker: Locker, pub(crate) locker: Arc<Locker>,
} }
impl PatchDb { impl PatchDb {
pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> { pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
@@ -217,7 +217,7 @@ impl PatchDb {
Ok(PatchDb { Ok(PatchDb {
store: Arc::new(RwLock::new(Store::open(path).await?)), store: Arc::new(RwLock::new(Store::open(path).await?)),
locker: Locker::new(), locker: Arc::new(Locker::new()),
subscriber: Arc::new(subscriber), subscriber: Arc::new(subscriber),
}) })
} }

View File

@@ -12,7 +12,7 @@ use tokio::sync::{RwLock, RwLockReadGuard};
use crate::store::Store; use crate::store::Store;
use crate::Error; use crate::Error;
use crate::{ use crate::{
locker::{LockType, Locker, LockerGuard}, locker::{Locker, LockerGuard},
DbHandle, DbHandle,
}; };
use crate::{ use crate::{
@@ -22,7 +22,7 @@ use crate::{
pub struct Transaction<Parent: DbHandle> { pub struct Transaction<Parent: DbHandle> {
pub(crate) parent: Parent, pub(crate) parent: Parent,
pub(crate) locks: Vec<(JsonPointer, LockerGuard)>, pub(crate) locks: Vec<(JsonPointer, Option<LockerGuard>)>,
pub(crate) updates: DiffPatch, pub(crate) updates: DiffPatch,
pub(crate) sub: Receiver<Arc<Revision>>, pub(crate) sub: Receiver<Arc<Revision>>,
} }
@@ -94,7 +94,7 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
fn subscribe(&self) -> Receiver<Arc<Revision>> { fn subscribe(&self) -> Receiver<Arc<Revision>> {
self.parent.subscribe() self.parent.subscribe()
} }
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) { fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, Option<LockerGuard>)]>) {
let (locker, mut locks) = self.parent.locker_and_locks(); let (locker, mut locks) = self.parent.locker_and_locks();
locks.push(&mut self.locks); locks.push(&mut self.locks);
(locker, locks) (locker, locks)
@@ -162,27 +162,9 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
self.updates.append(patch); self.updates.append(patch);
Ok(None) Ok(None)
} }
async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>( async fn lock(&mut self, ptr: &JsonPointer) {
&mut self, let (locker, mut locks) = self.parent.locker_and_locks();
ptr: &JsonPointer<S, V>, locker.add_lock(ptr, &mut self.locks, &mut locks).await
lock: LockType,
deep: bool,
) {
match lock {
LockType::None => (),
LockType::Read => {
let (locker, mut locks) = self.parent.locker_and_locks();
locker
.add_read_lock(ptr, &mut self.locks, &mut locks, deep)
.await
}
LockType::Write => {
let (locker, mut locks) = self.parent.locker_and_locks();
locker
.add_write_lock(ptr, &mut self.locks, &mut locks, deep)
.await
}
}
} }
async fn get< async fn get<
T: for<'de> Deserialize<'de>, T: for<'de> Deserialize<'de>,