Co-authored-by: Drew Ansbacher <drew.ansbacher@gmail.com>

This commit is contained in:
Matt Hill
2021-07-19 16:08:51 -06:00
4 changed files with 55 additions and 25 deletions

View File

@@ -14,7 +14,7 @@ use crate::{
use crate::{patch::DiffPatch, Error}; use crate::{patch::DiffPatch, Error};
#[async_trait] #[async_trait]
pub trait DbHandle: Sized + Send + Sync { pub trait DbHandle: Send + Sync {
async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error>; async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error>;
fn rebase(&mut self) -> Result<(), Error>; fn rebase(&mut self) -> Result<(), Error>;
fn store(&self) -> Arc<RwLock<Store>>; fn store(&self) -> Arc<RwLock<Store>>;
@@ -45,6 +45,7 @@ pub trait DbHandle: Sized + Send + Sync {
&mut self, &mut self,
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
lock: LockType, lock: LockType,
deep: bool,
) -> (); ) -> ();
async fn get< async fn get<
T: for<'de> Deserialize<'de>, T: for<'de> Deserialize<'de>,
@@ -65,7 +66,7 @@ pub trait DbHandle: Sized + Send + Sync {
) -> Result<(), Error>; ) -> Result<(), Error>;
} }
#[async_trait] #[async_trait]
impl<Handle: DbHandle + Send + Sync> DbHandle for &mut Handle { impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error> { async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error> {
let Transaction { let Transaction {
locks, locks,
@@ -127,8 +128,9 @@ impl<Handle: DbHandle + Send + Sync> DbHandle for &mut Handle {
&mut self, &mut self,
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
lock: LockType, lock: LockType,
deep: bool,
) { ) {
(*self).lock(ptr, lock).await (*self).lock(ptr, lock, deep).await
} }
async fn get< async fn get<
T: for<'de> Deserialize<'de>, T: for<'de> Deserialize<'de>,
@@ -229,18 +231,19 @@ impl DbHandle for PatchDbHandle {
&mut self, &mut self,
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
lock: LockType, lock: LockType,
deep: bool,
) { ) {
match lock { match lock {
LockType::Read => { LockType::Read => {
self.db self.db
.locker .locker
.add_read_lock(ptr, &mut self.locks, &mut []) .add_read_lock(ptr, &mut self.locks, &mut [], deep)
.await; .await;
} }
LockType::Write => { LockType::Write => {
self.db self.db
.locker .locker
.add_write_lock(ptr, &mut self.locks, &mut []) .add_write_lock(ptr, &mut self.locks, &mut [], deep)
.await; .await;
} }
LockType::None => (), LockType::None => (),

View File

@@ -80,6 +80,7 @@ impl Locker {
pub async fn lock_read<S: AsRef<str>, V: SegList>( pub async fn lock_read<S: AsRef<str>, V: SegList>(
&self, &self,
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
deep: bool,
) -> ReadGuard<HashMap<String, Locker>> { ) -> ReadGuard<HashMap<String, Locker>> {
let mut lock = Some(self.0.clone().read().await.unwrap()); let mut lock = Some(self.0.clone().read().await.unwrap());
for seg in ptr.iter() { for seg in ptr.iter() {
@@ -94,7 +95,9 @@ impl Locker {
lock = Some(new_lock); lock = Some(new_lock);
} }
let res = lock.unwrap(); let res = lock.unwrap();
Self::lock_root_read(&res); if deep {
Self::lock_root_read(&res);
}
res res
} }
pub(crate) async fn add_read_lock<S: AsRef<str> + Clone, V: SegList + Clone>( pub(crate) async fn add_read_lock<S: AsRef<str> + Clone, V: SegList + Clone>(
@@ -102,6 +105,7 @@ impl Locker {
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
locks: &mut Vec<(JsonPointer, LockerGuard)>, locks: &mut Vec<(JsonPointer, LockerGuard)>,
extra_locks: &mut [&mut [(JsonPointer, LockerGuard)]], extra_locks: &mut [&mut [(JsonPointer, LockerGuard)]],
deep: bool,
) { ) {
for lock in extra_locks for lock in extra_locks
.iter() .iter()
@@ -114,7 +118,7 @@ impl Locker {
} }
locks.push(( locks.push((
JsonPointer::to_owned(ptr.clone()), JsonPointer::to_owned(ptr.clone()),
LockerGuard::Read(self.lock_read(ptr).await.into()), LockerGuard::Read(self.lock_read(ptr, deep).await.into()),
)); ));
} }
fn lock_root_write<'a>(guard: &'a WriteGuard<HashMap<String, Locker>>) -> BoxFuture<'a, ()> { fn lock_root_write<'a>(guard: &'a WriteGuard<HashMap<String, Locker>>) -> BoxFuture<'a, ()> {
@@ -129,6 +133,7 @@ impl Locker {
pub async fn lock_write<S: AsRef<str>, V: SegList>( pub async fn lock_write<S: AsRef<str>, V: SegList>(
&self, &self,
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
deep: bool,
) -> WriteGuard<HashMap<String, Locker>> { ) -> WriteGuard<HashMap<String, Locker>> {
let mut lock = self.0.clone().write().await.unwrap(); let mut lock = self.0.clone().write().await.unwrap();
for seg in ptr.iter() { for seg in ptr.iter() {
@@ -141,7 +146,9 @@ impl Locker {
lock = new_lock; lock = new_lock;
} }
let res = lock; let res = lock;
Self::lock_root_write(&res); if deep {
Self::lock_root_write(&res);
}
res res
} }
pub(crate) async fn add_write_lock<S: AsRef<str> + Clone, V: SegList + Clone>( pub(crate) async fn add_write_lock<S: AsRef<str> + Clone, V: SegList + Clone>(
@@ -149,6 +156,7 @@ impl Locker {
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
locks: &mut Vec<(JsonPointer, LockerGuard)>, // tx locks locks: &mut Vec<(JsonPointer, LockerGuard)>, // tx locks
extra_locks: &mut [&mut [(JsonPointer, LockerGuard)]], // tx parent locks extra_locks: &mut [&mut [(JsonPointer, LockerGuard)]], // tx parent locks
deep: bool,
) { ) {
let mut final_lock = None; let mut final_lock = None;
for lock in extra_locks for lock in extra_locks
@@ -222,7 +230,7 @@ impl Locker {
if let Some(lock) = final_lock { if let Some(lock) = final_lock {
lock lock
} else { } else {
LockerGuard::Write(self.lock_write(ptr).await.into()) LockerGuard::Write(self.lock_write(ptr, deep).await.into())
}, },
)); ));
} }

View File

@@ -64,11 +64,13 @@ where
T: Serialize + for<'de> Deserialize<'de>, T: Serialize + for<'de> Deserialize<'de>,
{ {
pub async fn lock<Db: DbHandle>(&self, db: &mut Db, lock: LockType) { pub async fn lock<Db: DbHandle>(&self, db: &mut Db, lock: LockType) {
db.lock(&self.ptr, lock).await db.lock(&self.ptr, lock, true).await
} }
pub async fn get<Db: DbHandle>(&self, db: &mut Db) -> Result<ModelData<T>, Error> { pub async fn get<Db: DbHandle>(&self, db: &mut Db, lock: bool) -> Result<ModelData<T>, Error> {
self.lock(db, LockType::Read).await; if lock {
self.lock(db, LockType::Read).await;
}
Ok(ModelData(db.get(&self.ptr).await?)) Ok(ModelData(db.get(&self.ptr).await?))
} }
@@ -202,11 +204,17 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> HasModel for Box<T> {
pub struct OptionModel<T: HasModel + Serialize + for<'de> Deserialize<'de>>(T::Model); pub struct OptionModel<T: HasModel + Serialize + for<'de> Deserialize<'de>>(T::Model);
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> { impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
pub async fn lock<Db: DbHandle>(&self, db: &mut Db, lock: LockType) { pub async fn lock<Db: DbHandle>(&self, db: &mut Db, lock: LockType) {
db.lock(self.0.as_ref(), lock).await db.lock(self.0.as_ref(), lock, true).await
} }
pub async fn get<Db: DbHandle>(&self, db: &mut Db) -> Result<ModelData<Option<T>>, Error> { pub async fn get<Db: DbHandle>(
self.lock(db, LockType::Read).await; &self,
db: &mut Db,
lock: bool,
) -> Result<ModelData<Option<T>>, Error> {
if lock {
self.lock(db, LockType::Read).await;
}
Ok(ModelData(db.get(self.0.as_ref()).await?)) Ok(ModelData(db.get(self.0.as_ref()).await?))
} }
@@ -224,8 +232,10 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
}) })
} }
pub async fn exists<Db: DbHandle>(&self, db: &mut Db) -> Result<bool, Error> { pub async fn exists<Db: DbHandle>(&self, db: &mut Db, lock: bool) -> Result<bool, Error> {
self.lock(db, LockType::Read).await; if lock {
db.lock(self.0.as_ref(), LockType::Read, false).await;
}
Ok(db.exists(&self.as_ref(), None).await?) Ok(db.exists(&self.as_ref(), None).await?)
} }
@@ -252,7 +262,7 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
} }
pub async fn check<Db: DbHandle>(self, db: &mut Db) -> Result<Option<T::Model>, Error> { pub async fn check<Db: DbHandle>(self, db: &mut Db) -> Result<Option<T::Model>, Error> {
Ok(if self.exists(db).await? { Ok(if self.exists(db, true).await? {
Some(self.0) Some(self.0)
} else { } else {
None None
@@ -260,7 +270,7 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
} }
pub async fn expect<Db: DbHandle>(self, db: &mut Db) -> Result<T::Model, Error> { pub async fn expect<Db: DbHandle>(self, db: &mut Db) -> Result<T::Model, Error> {
if self.exists(db).await? { if self.exists(db, true).await? {
Ok(self.0) Ok(self.0)
} else { } else {
Err(Error::NodeDoesNotExist(self.0.into())) Err(Error::NodeDoesNotExist(self.0.into()))
@@ -268,7 +278,7 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
} }
pub async fn delete<Db: DbHandle>(&self, db: &mut Db) -> Result<(), Error> { pub async fn delete<Db: DbHandle>(&self, db: &mut Db) -> Result<(), Error> {
db.lock(self.as_ref(), LockType::Write).await; db.lock(self.as_ref(), LockType::Write, true).await;
db.put(self.as_ref(), &Value::Null).await db.put(self.as_ref(), &Value::Null).await
} }
} }
@@ -277,7 +287,7 @@ where
T: Serialize + for<'de> Deserialize<'de> + Send + Sync + HasModel, T: Serialize + for<'de> Deserialize<'de> + Send + Sync + HasModel,
{ {
pub async fn put<Db: DbHandle>(&self, db: &mut Db, value: &T) -> Result<(), Error> { pub async fn put<Db: DbHandle>(&self, db: &mut Db, value: &T) -> Result<(), Error> {
db.lock(self.as_ref(), LockType::Write).await; db.lock(self.as_ref(), LockType::Write, true).await;
db.put(self.as_ref(), value).await db.put(self.as_ref(), value).await
} }
} }
@@ -435,8 +445,14 @@ where
T::Key: Hash + Eq + for<'de> Deserialize<'de>, T::Key: Hash + Eq + for<'de> Deserialize<'de>,
T::Value: Serialize + for<'de> Deserialize<'de>, T::Value: Serialize + for<'de> Deserialize<'de>,
{ {
pub async fn keys<Db: DbHandle>(&self, db: &mut Db) -> Result<IndexSet<T::Key>, Error> { pub async fn keys<Db: DbHandle>(
db.lock(self.as_ref(), LockType::Read).await; &self,
db: &mut Db,
lock: bool,
) -> Result<IndexSet<T::Key>, Error> {
if lock {
db.lock(self.as_ref(), LockType::Read, false).await;
}
let set = db.keys(self.as_ref(), None).await?; let set = db.keys(self.as_ref(), None).await?;
Ok(set Ok(set
.into_iter() .into_iter()

View File

@@ -159,17 +159,20 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
&mut self, &mut self,
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
lock: LockType, lock: LockType,
deep: bool,
) { ) {
match lock { match lock {
LockType::None => (), LockType::None => (),
LockType::Read => { LockType::Read => {
let (locker, mut locks) = self.parent.locker_and_locks(); let (locker, mut locks) = self.parent.locker_and_locks();
locker.add_read_lock(ptr, &mut self.locks, &mut locks).await locker
.add_read_lock(ptr, &mut self.locks, &mut locks, deep)
.await
} }
LockType::Write => { LockType::Write => {
let (locker, mut locks) = self.parent.locker_and_locks(); let (locker, mut locks) = self.parent.locker_and_locks();
locker locker
.add_write_lock(ptr, &mut self.locks, &mut locks) .add_write_lock(ptr, &mut self.locks, &mut locks, deep)
.await .await
} }
} }