mirror of
https://github.com/Start9Labs/patch-db.git
synced 2026-03-26 02:11:54 +00:00
redo locking api
This commit is contained in:
committed by
Aiden McClelland
parent
5b57eb8fe1
commit
e40ce2e14c
@@ -7,10 +7,7 @@ use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use tokio::sync::{broadcast::Receiver, RwLock, RwLockReadGuard};
|
||||
|
||||
use crate::{
|
||||
locker::{LockType, LockerGuard},
|
||||
Locker, PatchDb, Revision, Store, Transaction,
|
||||
};
|
||||
use crate::{locker::LockerGuard, Locker, PatchDb, Revision, Store, Transaction};
|
||||
use crate::{patch::DiffPatch, Error};
|
||||
|
||||
#[async_trait]
|
||||
@@ -19,7 +16,7 @@ pub trait DbHandle: Send + Sync {
|
||||
fn rebase(&mut self) -> Result<(), Error>;
|
||||
fn store(&self) -> Arc<RwLock<Store>>;
|
||||
fn subscribe(&self) -> Receiver<Arc<Revision>>;
|
||||
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>);
|
||||
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, Option<LockerGuard>)]>);
|
||||
async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
@@ -41,12 +38,7 @@ pub trait DbHandle: Send + Sync {
|
||||
value: &Value,
|
||||
) -> Result<Option<Arc<Revision>>, Error>;
|
||||
async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error>;
|
||||
async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
lock: LockType,
|
||||
deep: bool,
|
||||
) -> ();
|
||||
async fn lock(&mut self, ptr: &JsonPointer) -> ();
|
||||
async fn get<
|
||||
T: for<'de> Deserialize<'de>,
|
||||
S: AsRef<str> + Send + Sync,
|
||||
@@ -90,7 +82,7 @@ impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
|
||||
fn subscribe(&self) -> Receiver<Arc<Revision>> {
|
||||
(**self).subscribe()
|
||||
}
|
||||
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) {
|
||||
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, Option<LockerGuard>)]>) {
|
||||
(*self).locker_and_locks()
|
||||
}
|
||||
async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||
@@ -124,13 +116,8 @@ impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
|
||||
async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error> {
|
||||
(*self).apply(patch).await
|
||||
}
|
||||
async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
lock: LockType,
|
||||
deep: bool,
|
||||
) {
|
||||
(*self).lock(ptr, lock, deep).await
|
||||
async fn lock(&mut self, ptr: &JsonPointer) {
|
||||
(*self).lock(ptr).await
|
||||
}
|
||||
async fn get<
|
||||
T: for<'de> Deserialize<'de>,
|
||||
@@ -157,7 +144,7 @@ impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
|
||||
|
||||
pub struct PatchDbHandle {
|
||||
pub(crate) db: PatchDb,
|
||||
pub(crate) locks: Vec<(JsonPointer, LockerGuard)>,
|
||||
pub(crate) locks: Vec<(JsonPointer, Option<LockerGuard>)>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -179,7 +166,7 @@ impl DbHandle for PatchDbHandle {
|
||||
fn subscribe(&self) -> Receiver<Arc<Revision>> {
|
||||
self.db.subscribe()
|
||||
}
|
||||
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) {
|
||||
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, Option<LockerGuard>)]>) {
|
||||
(&self.db.locker, vec![self.locks.as_mut_slice()])
|
||||
}
|
||||
async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||
@@ -225,27 +212,8 @@ impl DbHandle for PatchDbHandle {
|
||||
async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error> {
|
||||
self.db.apply(patch, None, None).await
|
||||
}
|
||||
async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
lock: LockType,
|
||||
deep: bool,
|
||||
) {
|
||||
match lock {
|
||||
LockType::Read => {
|
||||
self.db
|
||||
.locker
|
||||
.add_read_lock(ptr, &mut self.locks, &mut [], deep)
|
||||
.await;
|
||||
}
|
||||
LockType::Write => {
|
||||
self.db
|
||||
.locker
|
||||
.add_write_lock(ptr, &mut self.locks, &mut [], deep)
|
||||
.await;
|
||||
}
|
||||
LockType::None => (),
|
||||
}
|
||||
async fn lock(&mut self, ptr: &JsonPointer) {
|
||||
self.db.locker.add_lock(ptr, &mut self.locks, &mut []).await;
|
||||
}
|
||||
async fn get<
|
||||
T: for<'de> Deserialize<'de>,
|
||||
|
||||
@@ -20,7 +20,7 @@ mod test;
|
||||
pub use handle::{DbHandle, PatchDbHandle};
|
||||
pub use json_patch;
|
||||
pub use json_ptr;
|
||||
pub use locker::{LockType, Locker};
|
||||
pub use locker::Locker;
|
||||
pub use model::{
|
||||
BoxModel, HasModel, Map, MapModel, Model, ModelData, ModelDataMut, OptionModel, VecModel,
|
||||
};
|
||||
|
||||
@@ -1,250 +1,162 @@
|
||||
use std::borrow::Cow;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::task::Poll;
|
||||
|
||||
use futures::{future::BoxFuture, FutureExt};
|
||||
use futures::future::BoxFuture;
|
||||
use futures::FutureExt;
|
||||
use json_ptr::{JsonPointer, SegList};
|
||||
use qutex::{QrwLock, ReadGuard, WriteGuard};
|
||||
use qutex::{Guard, Qutex};
|
||||
use tokio::runtime::Handle;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::Notify;
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum LockType {
|
||||
None,
|
||||
Read,
|
||||
Write,
|
||||
}
|
||||
#[derive(Debug)]
|
||||
pub struct LockerGuard(Arc<NotifyGuard>, Guard<()>, Arc<LockerInner>);
|
||||
|
||||
pub enum LockerGuard {
|
||||
Empty,
|
||||
Read(LockerReadGuard),
|
||||
Write(LockerWriteGuard),
|
||||
}
|
||||
impl LockerGuard {
|
||||
pub fn take(&mut self) -> Self {
|
||||
std::mem::replace(self, LockerGuard::Empty)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct LockerReadGuard(Arc<Mutex<Option<ReadGuard<HashMap<String, Locker>>>>>);
|
||||
impl LockerReadGuard {
|
||||
async fn upgrade(&self) -> Option<LockerWriteGuard> {
|
||||
let guard = self.0.try_lock().unwrap().take();
|
||||
if let Some(g) = guard {
|
||||
Some(LockerWriteGuard(
|
||||
Some(ReadGuard::upgrade(g).await.unwrap()),
|
||||
Some(self.clone()),
|
||||
))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
impl From<ReadGuard<HashMap<String, Locker>>> for LockerReadGuard {
|
||||
fn from(guard: ReadGuard<HashMap<String, Locker>>) -> Self {
|
||||
LockerReadGuard(Arc::new(Mutex::new(Some(guard))))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct LockerWriteGuard(
|
||||
Option<WriteGuard<HashMap<String, Locker>>>,
|
||||
Option<LockerReadGuard>,
|
||||
);
|
||||
impl From<WriteGuard<HashMap<String, Locker>>> for LockerWriteGuard {
|
||||
fn from(guard: WriteGuard<HashMap<String, Locker>>) -> Self {
|
||||
LockerWriteGuard(Some(guard), None)
|
||||
}
|
||||
}
|
||||
impl Drop for LockerWriteGuard {
|
||||
#[derive(Debug)]
|
||||
struct NotifyGuard(Arc<Notify>);
|
||||
impl Drop for NotifyGuard {
|
||||
fn drop(&mut self) {
|
||||
if let (Some(write), Some(read)) = (self.0.take(), self.1.take()) {
|
||||
*read.0.try_lock().unwrap() = Some(WriteGuard::downgrade(write));
|
||||
}
|
||||
self.0.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Locker(QrwLock<HashMap<String, Locker>>);
|
||||
#[derive(Debug)]
|
||||
pub struct Locker(Arc<LockerInner>);
|
||||
#[derive(Debug)]
|
||||
struct LockerInner {
|
||||
map: Mutex<HashMap<String, Weak<LockerInner>>>,
|
||||
self_lock: Qutex<()>,
|
||||
children_lock: Arc<Notify>,
|
||||
guard: Mutex<Weak<NotifyGuard>>,
|
||||
parent: Option<(Arc<NotifyGuard>, String, Arc<LockerInner>)>,
|
||||
}
|
||||
impl Drop for LockerInner {
|
||||
fn drop(&mut self) {
|
||||
if let Some((_, idx, parent)) = self.parent.take() {
|
||||
Handle::current().block_on(parent.map.lock()).remove(&idx);
|
||||
}
|
||||
}
|
||||
}
|
||||
impl Locker {
|
||||
pub fn new() -> Self {
|
||||
Locker(QrwLock::new(HashMap::new()))
|
||||
Locker(Arc::new(LockerInner {
|
||||
map: Mutex::new(HashMap::new()),
|
||||
self_lock: Qutex::new(()),
|
||||
children_lock: {
|
||||
let notify = Arc::new(Notify::new());
|
||||
notify.notify_one();
|
||||
notify
|
||||
},
|
||||
guard: Mutex::new(Weak::new()),
|
||||
parent: None,
|
||||
}))
|
||||
}
|
||||
fn lock_root_read<'a>(guard: &'a ReadGuard<HashMap<String, Locker>>) -> BoxFuture<'a, ()> {
|
||||
async move {
|
||||
for (_, v) in &**guard {
|
||||
let g = v.0.clone().read().await.unwrap();
|
||||
Self::lock_root_read(&g).await;
|
||||
}
|
||||
async fn notify_guard(&self) -> Arc<NotifyGuard> {
|
||||
let mut lock = self.0.guard.lock().await;
|
||||
if let Some(n) = lock.upgrade() {
|
||||
Arc::new(NotifyGuard(n.0.clone()))
|
||||
} else {
|
||||
let res = Arc::new(NotifyGuard(self.0.children_lock.clone()));
|
||||
*lock = Arc::downgrade(&res);
|
||||
res
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
pub async fn lock_read<S: AsRef<str>, V: SegList>(
|
||||
async fn child<'a, S: Into<Cow<'a, str>>>(&self, name: S) -> Locker {
|
||||
let name: Cow<'a, str> = name.into();
|
||||
let mut lock = self.0.map.lock().await;
|
||||
if let Some(child) = lock.get(name.as_ref()).and_then(|w| w.upgrade()) {
|
||||
Locker(child)
|
||||
} else {
|
||||
let name = name.into_owned();
|
||||
let res = Arc::new(LockerInner {
|
||||
map: Mutex::new(HashMap::new()),
|
||||
self_lock: Qutex::new(()),
|
||||
children_lock: {
|
||||
let notify = Arc::new(Notify::new());
|
||||
notify.notify_one();
|
||||
notify
|
||||
},
|
||||
guard: Mutex::new(Weak::new()),
|
||||
parent: Some((self.notify_guard().await, name.clone(), self.0.clone())),
|
||||
});
|
||||
lock.insert(name, Arc::downgrade(&res));
|
||||
Locker(res)
|
||||
}
|
||||
}
|
||||
/// await once: in the queue, await twice: all children dropped
|
||||
async fn wait_for_children<'a>(&'a self) -> BoxFuture<'a, ()> {
|
||||
let children = self.0.guard.lock().await;
|
||||
let mut fut = if children.strong_count() == 0 {
|
||||
futures::future::ready(()).boxed()
|
||||
} else {
|
||||
self.0.children_lock.notified().boxed()
|
||||
};
|
||||
if matches!(futures::poll!(&mut fut), Poll::Ready(_)) {
|
||||
return futures::future::ready(()).boxed();
|
||||
}
|
||||
drop(children);
|
||||
fut
|
||||
}
|
||||
async fn acquire_and_trade(self, guards: Vec<LockerGuard>) -> LockerGuard {
|
||||
let children_dropped = self.wait_for_children().await;
|
||||
guards.into_iter().for_each(drop);
|
||||
children_dropped.await;
|
||||
let guard = self.0.self_lock.clone().lock().await.unwrap();
|
||||
LockerGuard(self.notify_guard().await, guard, self.0.clone())
|
||||
}
|
||||
pub async fn lock<S: AsRef<str>, V: SegList>(
|
||||
&self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
deep: bool,
|
||||
) -> ReadGuard<HashMap<String, Locker>> {
|
||||
#[cfg(feature = "log")]
|
||||
log::debug!("Locking {} for READ: {{ deep: {} }}", ptr, deep);
|
||||
let mut lock = Some(self.0.clone().read().await.unwrap());
|
||||
guards: Vec<LockerGuard>,
|
||||
) -> LockerGuard {
|
||||
let mut locker = Locker(self.0.clone());
|
||||
for seg in ptr.iter() {
|
||||
let new_lock = if let Some(locker) = lock.as_ref().unwrap().get(seg) {
|
||||
locker.0.clone().read().await.unwrap()
|
||||
} else {
|
||||
let mut writer = ReadGuard::upgrade(lock.take().unwrap()).await.unwrap();
|
||||
writer.insert(seg.to_owned(), Locker::new());
|
||||
let reader = WriteGuard::downgrade(writer);
|
||||
reader.get(seg).unwrap().0.clone().read().await.unwrap()
|
||||
};
|
||||
lock = Some(new_lock);
|
||||
locker = locker.child(seg).await;
|
||||
}
|
||||
let res = lock.unwrap();
|
||||
if deep {
|
||||
Self::lock_root_read(&res);
|
||||
let res = locker.acquire_and_trade(guards).await;
|
||||
let mut guards = Vec::with_capacity(ptr.len());
|
||||
let mut cur = res.2.parent.as_ref().map(|(_, _, p)| p);
|
||||
while let Some(parent) = cur {
|
||||
guards.push(parent.self_lock.clone().lock().await.unwrap());
|
||||
cur = parent.parent.as_ref().map(|(_, _, p)| p);
|
||||
}
|
||||
#[cfg(feature = "log")]
|
||||
log::debug!("Locked {} for READ: {{ deep: {} }}", ptr, deep);
|
||||
res
|
||||
}
|
||||
pub(crate) async fn add_read_lock<S: AsRef<str> + Clone, V: SegList + Clone>(
|
||||
/// TODO: DRAGONS!!!
|
||||
/// Acquiring a lock to a node above something you already held will do so at the transaction level of the lock you already held!
|
||||
/// This means even though you dropped the sub tx in which you acquired the higher level lock,
|
||||
/// the higher lock could still be held by the parent tx which originally held the lower lock.
|
||||
pub async fn add_lock(
|
||||
&self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
locks: &mut Vec<(JsonPointer, LockerGuard)>,
|
||||
extra_locks: &mut [&mut [(JsonPointer, LockerGuard)]],
|
||||
deep: bool,
|
||||
ptr: &JsonPointer,
|
||||
locks: &mut Vec<(JsonPointer, Option<LockerGuard>)>, // tx locks
|
||||
extra_locks: &mut [&mut [(JsonPointer, Option<LockerGuard>)]], // tx parent locks
|
||||
) {
|
||||
for lock in extra_locks
|
||||
.iter()
|
||||
.flat_map(|a| a.iter())
|
||||
.chain(locks.iter())
|
||||
{
|
||||
if ptr.starts_with(&lock.0) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
locks.push((
|
||||
JsonPointer::to_owned(ptr.clone()),
|
||||
LockerGuard::Read(self.lock_read(ptr, deep).await.into()),
|
||||
));
|
||||
}
|
||||
fn lock_root_write<'a>(guard: &'a WriteGuard<HashMap<String, Locker>>) -> BoxFuture<'a, ()> {
|
||||
async move {
|
||||
for (_, v) in &**guard {
|
||||
let g = v.0.clone().write().await.unwrap();
|
||||
Self::lock_root_write(&g).await;
|
||||
}
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
pub async fn lock_write<S: AsRef<str>, V: SegList>(
|
||||
&self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
deep: bool,
|
||||
) -> WriteGuard<HashMap<String, Locker>> {
|
||||
#[cfg(feature = "log")]
|
||||
log::debug!("Locking {} for WRITE: {{ deep: {} }}", ptr, deep);
|
||||
let mut lock = self.0.clone().write().await.unwrap();
|
||||
for seg in ptr.iter() {
|
||||
let new_lock = if let Some(locker) = lock.get(seg) {
|
||||
locker.0.clone().write().await.unwrap()
|
||||
} else {
|
||||
lock.insert(seg.to_owned(), Locker::new());
|
||||
lock.get(seg).unwrap().0.clone().write().await.unwrap()
|
||||
};
|
||||
lock = new_lock;
|
||||
}
|
||||
let res = lock;
|
||||
if deep {
|
||||
Self::lock_root_write(&res);
|
||||
}
|
||||
#[cfg(feature = "log")]
|
||||
log::debug!("Locked {} for WRITE: {{ deep: {} }}", ptr, deep);
|
||||
res
|
||||
}
|
||||
pub(crate) async fn add_write_lock<S: AsRef<str> + Clone, V: SegList + Clone>(
|
||||
&self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
locks: &mut Vec<(JsonPointer, LockerGuard)>, // tx locks
|
||||
extra_locks: &mut [&mut [(JsonPointer, LockerGuard)]], // tx parent locks
|
||||
deep: bool,
|
||||
) {
|
||||
let mut final_lock = None;
|
||||
let mut lock_dest = None;
|
||||
let mut guards = Vec::new();
|
||||
for lock in extra_locks
|
||||
.iter_mut()
|
||||
.flat_map(|a| a.iter_mut())
|
||||
.chain(locks.iter_mut())
|
||||
{
|
||||
enum Choice {
|
||||
Return,
|
||||
Continue,
|
||||
Break,
|
||||
if ptr.starts_with(&lock.0) {
|
||||
return;
|
||||
}
|
||||
let choice: Choice;
|
||||
if let Some(remainder) = ptr.strip_prefix(&lock.0) {
|
||||
let guard = lock.1.take();
|
||||
lock.1 = match guard {
|
||||
LockerGuard::Read(LockerReadGuard(guard)) if !remainder.is_empty() => {
|
||||
// read guard already exists at higher level
|
||||
let mut lock = guard.lock().await;
|
||||
if let Some(l) = lock.take() {
|
||||
let mut orig_lock = None;
|
||||
let mut lock = ReadGuard::upgrade(l).await.unwrap();
|
||||
for seg in remainder.iter() {
|
||||
let new_lock = if let Some(locker) = lock.get(seg) {
|
||||
locker.0.clone().write().await.unwrap()
|
||||
} else {
|
||||
lock.insert(seg.to_owned(), Locker::new());
|
||||
lock.get(seg).unwrap().0.clone().write().await.unwrap()
|
||||
};
|
||||
if orig_lock.is_none() {
|
||||
orig_lock = Some(lock);
|
||||
}
|
||||
lock = new_lock;
|
||||
}
|
||||
final_lock = Some(LockerGuard::Write(lock.into()));
|
||||
choice = Choice::Break;
|
||||
LockerGuard::Read(WriteGuard::downgrade(orig_lock.unwrap()).into())
|
||||
} else {
|
||||
drop(lock);
|
||||
choice = Choice::Return;
|
||||
LockerGuard::Read(LockerReadGuard(guard))
|
||||
}
|
||||
}
|
||||
LockerGuard::Read(l) => {
|
||||
// read exists, convert to write
|
||||
if let Some(upgraded) = l.upgrade().await {
|
||||
final_lock = Some(LockerGuard::Write(upgraded));
|
||||
choice = Choice::Break;
|
||||
} else {
|
||||
choice = Choice::Continue;
|
||||
}
|
||||
LockerGuard::Read(l)
|
||||
}
|
||||
LockerGuard::Write(l) => {
|
||||
choice = Choice::Return;
|
||||
LockerGuard::Write(l)
|
||||
} // leave it alone, already sufficiently locked
|
||||
LockerGuard::Empty => {
|
||||
unreachable!("LockerGuard found empty");
|
||||
}
|
||||
};
|
||||
match choice {
|
||||
Choice::Return => return,
|
||||
Choice::Break => break,
|
||||
Choice::Continue => continue,
|
||||
if lock.0.starts_with(&ptr) {
|
||||
if let Some(guard) = lock.1.take() {
|
||||
guards.push(guard);
|
||||
lock_dest = Some(lock);
|
||||
}
|
||||
}
|
||||
}
|
||||
locks.push((
|
||||
JsonPointer::to_owned(ptr.clone()),
|
||||
if let Some(lock) = final_lock {
|
||||
lock
|
||||
} else {
|
||||
LockerGuard::Write(self.lock_write(ptr, deep).await.into())
|
||||
},
|
||||
));
|
||||
}
|
||||
}
|
||||
impl Default for Locker {
|
||||
fn default() -> Self {
|
||||
Locker::new()
|
||||
let guard = self.lock(ptr, guards).await;
|
||||
if let Some(lock) = lock_dest {
|
||||
lock.0 = ptr.clone();
|
||||
lock.1 = Some(guard);
|
||||
} else {
|
||||
locks.push((ptr.clone(), Some(guard)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,7 +10,6 @@ use json_ptr::JsonPointer;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
|
||||
use crate::locker::LockType;
|
||||
use crate::{DbHandle, DiffPatch, Error, Revision};
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -65,19 +64,19 @@ impl<T> Model<T>
|
||||
where
|
||||
T: Serialize + for<'de> Deserialize<'de>,
|
||||
{
|
||||
pub async fn lock<Db: DbHandle>(&self, db: &mut Db, lock: LockType) {
|
||||
db.lock(&self.ptr, lock, true).await
|
||||
pub async fn lock<Db: DbHandle>(&self, db: &mut Db) {
|
||||
db.lock(&self.ptr).await
|
||||
}
|
||||
|
||||
pub async fn get<Db: DbHandle>(&self, db: &mut Db, lock: bool) -> Result<ModelData<T>, Error> {
|
||||
if lock {
|
||||
self.lock(db, LockType::Read).await;
|
||||
self.lock(db).await;
|
||||
}
|
||||
Ok(ModelData(db.get(&self.ptr).await?))
|
||||
}
|
||||
|
||||
pub async fn get_mut<Db: DbHandle>(&self, db: &mut Db) -> Result<ModelDataMut<T>, Error> {
|
||||
self.lock(db, LockType::Write).await;
|
||||
self.lock(db).await;
|
||||
let original = db.get_value(&self.ptr, None).await?;
|
||||
let current = serde_json::from_value(original.clone())?;
|
||||
Ok(ModelDataMut {
|
||||
@@ -105,7 +104,7 @@ where
|
||||
db: &mut Db,
|
||||
value: &T,
|
||||
) -> Result<Option<Arc<Revision>>, Error> {
|
||||
self.lock(db, LockType::Write).await;
|
||||
self.lock(db).await;
|
||||
db.put(&self.ptr, value).await
|
||||
}
|
||||
}
|
||||
@@ -209,8 +208,8 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> HasModel for Box<T> {
|
||||
#[derive(Debug)]
|
||||
pub struct OptionModel<T: HasModel + Serialize + for<'de> Deserialize<'de>>(T::Model);
|
||||
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
|
||||
pub async fn lock<Db: DbHandle>(&self, db: &mut Db, lock: LockType) {
|
||||
db.lock(self.0.as_ref(), lock, true).await
|
||||
pub async fn lock<Db: DbHandle>(&self, db: &mut Db) {
|
||||
db.lock(self.0.as_ref()).await
|
||||
}
|
||||
|
||||
pub async fn get<Db: DbHandle>(
|
||||
@@ -219,7 +218,7 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
|
||||
lock: bool,
|
||||
) -> Result<ModelData<Option<T>>, Error> {
|
||||
if lock {
|
||||
self.lock(db, LockType::Read).await;
|
||||
self.lock(db).await;
|
||||
}
|
||||
Ok(ModelData(db.get(self.0.as_ref()).await?))
|
||||
}
|
||||
@@ -228,7 +227,7 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
|
||||
&self,
|
||||
db: &mut Db,
|
||||
) -> Result<ModelDataMut<Option<T>>, Error> {
|
||||
self.lock(db, LockType::Write).await;
|
||||
self.lock(db).await;
|
||||
let original = db.get_value(self.0.as_ref(), None).await?;
|
||||
let current = serde_json::from_value(original.clone())?;
|
||||
Ok(ModelDataMut {
|
||||
@@ -240,7 +239,7 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
|
||||
|
||||
pub async fn exists<Db: DbHandle>(&self, db: &mut Db, lock: bool) -> Result<bool, Error> {
|
||||
if lock {
|
||||
db.lock(self.0.as_ref(), LockType::Read, false).await;
|
||||
db.lock(self.0.as_ref()).await;
|
||||
}
|
||||
Ok(db.exists(&self.as_ref(), None).await?)
|
||||
}
|
||||
@@ -284,7 +283,7 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
|
||||
}
|
||||
|
||||
pub async fn delete<Db: DbHandle>(&self, db: &mut Db) -> Result<Option<Arc<Revision>>, Error> {
|
||||
db.lock(self.as_ref(), LockType::Write, true).await;
|
||||
db.lock(self.as_ref()).await;
|
||||
db.put(self.as_ref(), &Value::Null).await
|
||||
}
|
||||
}
|
||||
@@ -297,7 +296,7 @@ where
|
||||
db: &mut Db,
|
||||
value: &T,
|
||||
) -> Result<Option<Arc<Revision>>, Error> {
|
||||
db.lock(self.as_ref(), LockType::Write, true).await;
|
||||
db.lock(self.as_ref()).await;
|
||||
db.put(self.as_ref(), value).await
|
||||
}
|
||||
}
|
||||
@@ -461,7 +460,7 @@ where
|
||||
lock: bool,
|
||||
) -> Result<IndexSet<T::Key>, Error> {
|
||||
if lock {
|
||||
db.lock(self.as_ref(), LockType::Read, false).await;
|
||||
db.lock(self.as_ref()).await;
|
||||
}
|
||||
let set = db.keys(self.as_ref(), None).await?;
|
||||
Ok(set
|
||||
@@ -470,7 +469,7 @@ where
|
||||
.collect::<Result<_, _>>()?)
|
||||
}
|
||||
pub async fn remove<Db: DbHandle>(&self, db: &mut Db, key: &T::Key) -> Result<(), Error> {
|
||||
db.lock(self.as_ref(), LockType::Write, false).await;
|
||||
db.lock(self.as_ref()).await;
|
||||
db.apply(DiffPatch(Patch(vec![PatchOperation::Remove(
|
||||
RemoveOperation {
|
||||
path: self.as_ref().clone().join_end(key.as_ref()),
|
||||
|
||||
@@ -209,7 +209,7 @@ impl Store {
|
||||
pub struct PatchDb {
|
||||
pub(crate) store: Arc<RwLock<Store>>,
|
||||
subscriber: Arc<Sender<Arc<Revision>>>,
|
||||
pub(crate) locker: Locker,
|
||||
pub(crate) locker: Arc<Locker>,
|
||||
}
|
||||
impl PatchDb {
|
||||
pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
|
||||
@@ -217,7 +217,7 @@ impl PatchDb {
|
||||
|
||||
Ok(PatchDb {
|
||||
store: Arc::new(RwLock::new(Store::open(path).await?)),
|
||||
locker: Locker::new(),
|
||||
locker: Arc::new(Locker::new()),
|
||||
subscriber: Arc::new(subscriber),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ use tokio::sync::{RwLock, RwLockReadGuard};
|
||||
use crate::store::Store;
|
||||
use crate::Error;
|
||||
use crate::{
|
||||
locker::{LockType, Locker, LockerGuard},
|
||||
locker::{Locker, LockerGuard},
|
||||
DbHandle,
|
||||
};
|
||||
use crate::{
|
||||
@@ -22,7 +22,7 @@ use crate::{
|
||||
|
||||
pub struct Transaction<Parent: DbHandle> {
|
||||
pub(crate) parent: Parent,
|
||||
pub(crate) locks: Vec<(JsonPointer, LockerGuard)>,
|
||||
pub(crate) locks: Vec<(JsonPointer, Option<LockerGuard>)>,
|
||||
pub(crate) updates: DiffPatch,
|
||||
pub(crate) sub: Receiver<Arc<Revision>>,
|
||||
}
|
||||
@@ -94,7 +94,7 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
|
||||
fn subscribe(&self) -> Receiver<Arc<Revision>> {
|
||||
self.parent.subscribe()
|
||||
}
|
||||
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) {
|
||||
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, Option<LockerGuard>)]>) {
|
||||
let (locker, mut locks) = self.parent.locker_and_locks();
|
||||
locks.push(&mut self.locks);
|
||||
(locker, locks)
|
||||
@@ -162,27 +162,9 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
|
||||
self.updates.append(patch);
|
||||
Ok(None)
|
||||
}
|
||||
async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
lock: LockType,
|
||||
deep: bool,
|
||||
) {
|
||||
match lock {
|
||||
LockType::None => (),
|
||||
LockType::Read => {
|
||||
let (locker, mut locks) = self.parent.locker_and_locks();
|
||||
locker
|
||||
.add_read_lock(ptr, &mut self.locks, &mut locks, deep)
|
||||
.await
|
||||
}
|
||||
LockType::Write => {
|
||||
let (locker, mut locks) = self.parent.locker_and_locks();
|
||||
locker
|
||||
.add_write_lock(ptr, &mut self.locks, &mut locks, deep)
|
||||
.await
|
||||
}
|
||||
}
|
||||
async fn lock(&mut self, ptr: &JsonPointer) {
|
||||
let (locker, mut locks) = self.parent.locker_and_locks();
|
||||
locker.add_lock(ptr, &mut self.locks, &mut locks).await
|
||||
}
|
||||
async fn get<
|
||||
T: for<'de> Deserialize<'de>,
|
||||
|
||||
Reference in New Issue
Block a user