Shallow locking option

This commit is contained in:
Aiden McClelland
2021-10-12 17:52:00 -06:00
committed by Aiden McClelland
parent be608c5f15
commit d8340a58f1
4 changed files with 170 additions and 76 deletions

View File

@@ -7,6 +7,7 @@ use serde_json::Value;
use std::collections::BTreeSet; use std::collections::BTreeSet;
use tokio::sync::{broadcast::Receiver, RwLock, RwLockReadGuard}; use tokio::sync::{broadcast::Receiver, RwLock, RwLockReadGuard};
use crate::locker::LockType;
use crate::{locker::Guard, Locker, PatchDb, Revision, Store, Transaction}; use crate::{locker::Guard, Locker, PatchDb, Revision, Store, Transaction};
use crate::{patch::DiffPatch, Error}; use crate::{patch::DiffPatch, Error};
@@ -42,7 +43,7 @@ pub trait DbHandle: Send + Sync {
value: &Value, value: &Value,
) -> Result<Option<Arc<Revision>>, Error>; ) -> Result<Option<Arc<Revision>>, Error>;
async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error>; async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error>;
async fn lock(&mut self, ptr: JsonPointer, write: bool) -> (); async fn lock(&mut self, ptr: JsonPointer, lock_type: LockType) -> ();
async fn get< async fn get<
T: for<'de> Deserialize<'de>, T: for<'de> Deserialize<'de>,
S: AsRef<str> + Send + Sync, S: AsRef<str> + Send + Sync,
@@ -124,8 +125,8 @@ impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error> { async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error> {
(*self).apply(patch).await (*self).apply(patch).await
} }
async fn lock(&mut self, ptr: JsonPointer, write: bool) { async fn lock(&mut self, ptr: JsonPointer, lock_type: LockType) {
(*self).lock(ptr, write).await (*self).lock(ptr, lock_type).await
} }
async fn get< async fn get<
T: for<'de> Deserialize<'de>, T: for<'de> Deserialize<'de>,
@@ -232,9 +233,9 @@ impl DbHandle for PatchDbHandle {
async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error> { async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error> {
self.db.apply(patch, None, None).await self.db.apply(patch, None, None).await
} }
async fn lock(&mut self, ptr: JsonPointer, write: bool) { async fn lock(&mut self, ptr: JsonPointer, lock_type: LockType) {
self.locks self.locks
.push(self.db.locker.lock(self.id.clone(), ptr, write).await); .push(self.db.locker.lock(self.id.clone(), ptr, lock_type).await);
} }
async fn get< async fn get<
T: for<'de> Deserialize<'de>, T: for<'de> Deserialize<'de>,

View File

@@ -36,14 +36,14 @@ impl Locker {
}); });
Locker { sender } Locker { sender }
} }
pub async fn lock(&self, handle_id: HandleId, ptr: JsonPointer, write: bool) -> Guard { pub async fn lock(&self, handle_id: HandleId, ptr: JsonPointer, lock_type: LockType) -> Guard {
let (send, recv) = oneshot::channel(); let (send, recv) = oneshot::channel();
self.sender self.sender
.send(Request { .send(Request {
lock_info: LockInfo { lock_info: LockInfo {
handle_id, handle_id,
ptr, ptr,
write, ty: lock_type,
segments_handled: 0, segments_handled: 0,
}, },
completion: send, completion: send,
@@ -127,68 +127,141 @@ impl Trie {
#[derive(Debug, Default)] #[derive(Debug, Default)]
struct Node { struct Node {
reader_parents: Vec<HandleId>,
readers: Vec<HandleId>, readers: Vec<HandleId>,
writer_parents: Vec<HandleId>,
writers: Vec<HandleId>, writers: Vec<HandleId>,
reqs: Vec<Request>, reqs: Vec<Request>,
} }
impl Node { impl Node {
// true: If there are any writers, it is `id`. // true: If there are any writer_parents, they are `id`.
fn write_free(&self, id: &HandleId) -> bool { fn write_parent_free(&self, id: &HandleId) -> bool {
self.writers.is_empty() || (self.writers.iter().filter(|a| a != &id).count() == 0) self.writer_parents.is_empty() || (self.writer_parents.iter().find(|a| a != &id).is_none())
} }
// true: If there are any readers, it is `id`. // true: If there are any writers, they are `id`.
fn write_free(&self, id: &HandleId) -> bool {
self.writers.is_empty() || (self.writers.iter().find(|a| a != &id).is_none())
}
// true: If there are any reader_parents, they are `id`.
fn read_parent_free(&self, id: &HandleId) -> bool {
self.reader_parents.is_empty() || (self.reader_parents.iter().find(|a| a != &id).is_none())
}
// true: If there are any readers, they are `id`.
fn read_free(&self, id: &HandleId) -> bool { fn read_free(&self, id: &HandleId) -> bool {
self.readers.is_empty() || (self.readers.iter().filter(|a| a != &id).count() == 0) self.readers.is_empty() || (self.readers.iter().find(|a| a != &id).is_none())
} }
// allow a lock to skip the queue if a lock is already held by the same handle // allow a lock to skip the queue if a lock is already held by the same handle
fn can_jump_queue(&self, id: &HandleId) -> bool { fn can_jump_queue(&self, id: &HandleId) -> bool {
self.writers.contains(&id) || self.readers.contains(&id) self.writers.contains(&id)
|| self.writer_parents.contains(&id)
|| self.readers.contains(&id)
|| self.reader_parents.contains(&id)
} }
// `id` is capable of acquiring this node for writing // `id` is capable of acquiring this node for the purpose of writing to a child
fn write_available(&self, id: &HandleId) -> bool { fn write_parent_available(&self, id: &HandleId) -> bool {
self.write_free(id) self.write_free(id)
&& self.read_free(id) && self.read_free(id)
&& (self.reqs.is_empty() || self.can_jump_queue(id)) && (self.reqs.is_empty() || self.can_jump_queue(id))
} }
// `id` is capable of acquiring this node for writing
fn write_available(&self, id: &HandleId) -> bool {
self.write_free(id)
&& self.write_parent_free(id)
&& self.read_free(id)
&& self.read_parent_free(id)
&& (self.reqs.is_empty() || self.can_jump_queue(id))
}
fn read_parent_available(&self, id: &HandleId) -> bool {
self.write_free(id) && (self.reqs.is_empty() || self.can_jump_queue(id))
}
// `id` is capable of acquiring this node for reading // `id` is capable of acquiring this node for reading
fn read_available(&self, id: &HandleId) -> bool { fn read_available(&self, id: &HandleId) -> bool {
self.write_free(id) && (self.reqs.is_empty() || self.can_jump_queue(id)) self.write_free(id)
&& self.write_parent_free(id)
&& (self.reqs.is_empty() || self.can_jump_queue(id))
} }
fn handle_request( fn handle_request(
&mut self, &mut self,
req: Request, req: Request,
locks_on_lease: &mut Vec<oneshot::Receiver<LockInfo>>, locks_on_lease: &mut Vec<oneshot::Receiver<LockInfo>>,
) -> Option<Request> { ) -> Option<Request> {
if req.lock_info.write() && self.write_available(&req.lock_info.handle_id) { match (
self.writers.push(req.lock_info.handle_id.clone()); req.lock_info.ty,
req.process(locks_on_lease) req.lock_info.segments_handled == req.lock_info.ptr.len(),
} else if !req.lock_info.write() && self.read_available(&req.lock_info.handle_id) { ) {
self.readers.push(req.lock_info.handle_id.clone()); (LockType::Write, true) if self.write_available(&req.lock_info.handle_id) => {
req.process(locks_on_lease) self.writers.push(req.lock_info.handle_id.clone());
} else { req.process(locks_on_lease)
self.reqs.push(req); }
None (LockType::DeepRead, true) if self.read_available(&req.lock_info.handle_id) => {
self.readers.push(req.lock_info.handle_id.clone());
req.process(locks_on_lease)
}
(LockType::Write, false) if self.write_parent_available(&req.lock_info.handle_id) => {
self.writer_parents.push(req.lock_info.handle_id.clone());
req.process(locks_on_lease)
}
(LockType::DeepRead, false) | (LockType::ShallowRead, _)
if self.read_parent_available(&req.lock_info.handle_id) =>
{
self.reader_parents.push(req.lock_info.handle_id.clone());
req.process(locks_on_lease)
}
_ => {
self.reqs.push(req);
None
}
} }
} }
fn release(&mut self, mut lock_info: LockInfo) -> Option<LockInfo> { fn release(&mut self, mut lock_info: LockInfo) -> Option<LockInfo> {
if lock_info.write() { match (
if let Some(idx) = self lock_info.ty,
.writers lock_info.segments_handled == lock_info.ptr.len(),
.iter() ) {
.enumerate() (LockType::Write, true) => {
.find(|(_, id)| id == &&lock_info.handle_id) if let Some(idx) = self
.map(|(idx, _)| idx) .writers
{ .iter()
self.writers.swap_remove(idx); .enumerate()
.find(|(_, id)| id == &&lock_info.handle_id)
.map(|(idx, _)| idx)
{
self.writers.swap_remove(idx);
}
}
(LockType::DeepRead, true) => {
if let Some(idx) = self
.writers
.iter()
.enumerate()
.find(|(_, id)| id == &&lock_info.handle_id)
.map(|(idx, _)| idx)
{
self.readers.swap_remove(idx);
}
}
(LockType::Write, false) => {
if let Some(idx) = self
.writer_parents
.iter()
.enumerate()
.find(|(_, id)| id == &&lock_info.handle_id)
.map(|(idx, _)| idx)
{
self.writer_parents.swap_remove(idx);
}
}
(LockType::DeepRead, false) | (LockType::ShallowRead, _) => {
if let Some(idx) = self
.reader_parents
.iter()
.enumerate()
.find(|(_, id)| id == &&lock_info.handle_id)
.map(|(idx, _)| idx)
{
self.reader_parents.swap_remove(idx);
}
} }
} else if let Some(idx) = self
.readers
.iter()
.enumerate()
.find(|(_, id)| id == &&lock_info.handle_id)
.map(|(idx, _)| idx)
{
assert!(lock_info.handle_id == self.readers.swap_remove(idx));
} }
if lock_info.ptr.len() == lock_info.segments_handled { if lock_info.ptr.len() == lock_info.segments_handled {
None None
@@ -203,13 +276,10 @@ impl Node {
struct LockInfo { struct LockInfo {
ptr: JsonPointer, ptr: JsonPointer,
segments_handled: usize, segments_handled: usize,
write: bool, ty: LockType,
handle_id: HandleId, handle_id: HandleId,
} }
impl LockInfo { impl LockInfo {
fn write(&self) -> bool {
self.write && self.segments_handled == self.ptr.len()
}
fn current_seg(&self) -> &str { fn current_seg(&self) -> &str {
if self.segments_handled == 0 { if self.segments_handled == 0 {
"" // root "" // root
@@ -225,6 +295,18 @@ impl LockInfo {
} }
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LockType {
ShallowRead,
DeepRead,
Write,
}
impl Default for LockType {
fn default() -> Self {
LockType::ShallowRead
}
}
#[derive(Debug)] #[derive(Debug)]
struct Request { struct Request {
lock_info: LockInfo, lock_info: LockInfo,
@@ -233,18 +315,21 @@ struct Request {
impl Request { impl Request {
fn process(mut self, locks_on_lease: &mut Vec<oneshot::Receiver<LockInfo>>) -> Option<Self> { fn process(mut self, locks_on_lease: &mut Vec<oneshot::Receiver<LockInfo>>) -> Option<Self> {
if self.lock_info.ptr.len() == self.lock_info.segments_handled { if self.lock_info.ptr.len() == self.lock_info.segments_handled {
let (sender, receiver) = oneshot::channel(); self.complete(locks_on_lease);
locks_on_lease.push(receiver);
let _ = self.completion.send(Guard {
lock_info: self.lock_info.reset(),
sender: Some(sender),
});
None None
} else { } else {
self.lock_info.segments_handled += 1; self.lock_info.segments_handled += 1;
Some(self) Some(self)
} }
} }
fn complete(self, locks_on_lease: &mut Vec<oneshot::Receiver<LockInfo>>) {
let (sender, receiver) = oneshot::channel();
locks_on_lease.push(receiver);
let _ = self.completion.send(Guard {
lock_info: self.lock_info.reset(),
sender: Some(sender),
});
}
} }
#[derive(Debug)] #[derive(Debug)]

View File

@@ -9,6 +9,7 @@ use json_ptr::JsonPointer;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
use crate::locker::LockType;
use crate::{DbHandle, DiffPatch, Error, Revision}; use crate::{DbHandle, DiffPatch, Error, Revision};
#[derive(Debug)] #[derive(Debug)]
@@ -63,19 +64,19 @@ impl<T> Model<T>
where where
T: Serialize + for<'de> Deserialize<'de>, T: Serialize + for<'de> Deserialize<'de>,
{ {
pub async fn lock<Db: DbHandle>(&self, db: &mut Db, write: bool) { pub async fn lock<Db: DbHandle>(&self, db: &mut Db, lock_type: LockType) {
db.lock(self.ptr.clone(), write).await db.lock(self.ptr.clone(), lock_type).await
} }
pub async fn get<Db: DbHandle>(&self, db: &mut Db, lock: bool) -> Result<ModelData<T>, Error> { pub async fn get<Db: DbHandle>(&self, db: &mut Db, lock: bool) -> Result<ModelData<T>, Error> {
if lock { if lock {
self.lock(db, false).await; self.lock(db, LockType::DeepRead).await;
} }
Ok(ModelData(db.get(&self.ptr).await?)) Ok(ModelData(db.get(&self.ptr).await?))
} }
pub async fn get_mut<Db: DbHandle>(&self, db: &mut Db) -> Result<ModelDataMut<T>, Error> { pub async fn get_mut<Db: DbHandle>(&self, db: &mut Db) -> Result<ModelDataMut<T>, Error> {
self.lock(db, true).await; self.lock(db, LockType::Write).await;
let original = db.get_value(&self.ptr, None).await?; let original = db.get_value(&self.ptr, None).await?;
let current = serde_json::from_value(original.clone())?; let current = serde_json::from_value(original.clone())?;
Ok(ModelDataMut { Ok(ModelDataMut {
@@ -103,7 +104,7 @@ where
db: &mut Db, db: &mut Db,
value: &T, value: &T,
) -> Result<Option<Arc<Revision>>, Error> { ) -> Result<Option<Arc<Revision>>, Error> {
self.lock(db, true).await; self.lock(db, LockType::Write).await;
db.put(&self.ptr, value).await db.put(&self.ptr, value).await
} }
} }
@@ -226,8 +227,8 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> HasModel for Box<T> {
#[derive(Debug)] #[derive(Debug)]
pub struct OptionModel<T: HasModel + Serialize + for<'de> Deserialize<'de>>(T::Model); pub struct OptionModel<T: HasModel + Serialize + for<'de> Deserialize<'de>>(T::Model);
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> { impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
pub async fn lock<Db: DbHandle>(&self, db: &mut Db, write: bool) { pub async fn lock<Db: DbHandle>(&self, db: &mut Db, lock_type: LockType) {
db.lock(self.0.as_ref().clone(), write).await db.lock(self.0.as_ref().clone(), lock_type).await
} }
pub async fn get<Db: DbHandle>( pub async fn get<Db: DbHandle>(
@@ -236,7 +237,7 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
lock: bool, lock: bool,
) -> Result<ModelData<Option<T>>, Error> { ) -> Result<ModelData<Option<T>>, Error> {
if lock { if lock {
self.lock(db, false).await; self.lock(db, LockType::DeepRead).await;
} }
Ok(ModelData(db.get(self.0.as_ref()).await?)) Ok(ModelData(db.get(self.0.as_ref()).await?))
} }
@@ -245,7 +246,7 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
&self, &self,
db: &mut Db, db: &mut Db,
) -> Result<ModelDataMut<Option<T>>, Error> { ) -> Result<ModelDataMut<Option<T>>, Error> {
self.lock(db, true).await; self.lock(db, LockType::Write).await;
let original = db.get_value(self.0.as_ref(), None).await?; let original = db.get_value(self.0.as_ref(), None).await?;
let current = serde_json::from_value(original.clone())?; let current = serde_json::from_value(original.clone())?;
Ok(ModelDataMut { Ok(ModelDataMut {
@@ -257,7 +258,8 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
pub async fn exists<Db: DbHandle>(&self, db: &mut Db, lock: bool) -> Result<bool, Error> { pub async fn exists<Db: DbHandle>(&self, db: &mut Db, lock: bool) -> Result<bool, Error> {
if lock { if lock {
db.lock(self.0.as_ref().clone(), false).await; db.lock(self.0.as_ref().clone(), LockType::ShallowRead)
.await;
} }
Ok(db.exists(&self.as_ref(), None).await?) Ok(db.exists(&self.as_ref(), None).await?)
} }
@@ -283,7 +285,7 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
} }
pub async fn delete<Db: DbHandle>(&self, db: &mut Db) -> Result<Option<Arc<Revision>>, Error> { pub async fn delete<Db: DbHandle>(&self, db: &mut Db) -> Result<Option<Arc<Revision>>, Error> {
db.lock(self.as_ref().clone(), true).await; db.lock(self.as_ref().clone(), LockType::Write).await;
db.put(self.as_ref(), &Value::Null).await db.put(self.as_ref(), &Value::Null).await
} }
} }
@@ -317,7 +319,7 @@ where
db: &mut Db, db: &mut Db,
value: &T, value: &T,
) -> Result<Option<Arc<Revision>>, Error> { ) -> Result<Option<Arc<Revision>>, Error> {
db.lock(self.as_ref().clone(), true).await; db.lock(self.as_ref().clone(), LockType::Write).await;
db.put(self.as_ref(), value).await db.put(self.as_ref(), value).await
} }
} }
@@ -488,7 +490,7 @@ where
lock: bool, lock: bool,
) -> Result<BTreeSet<T::Key>, Error> { ) -> Result<BTreeSet<T::Key>, Error> {
if lock { if lock {
db.lock(self.as_ref().clone(), false).await; db.lock(self.as_ref().clone(), LockType::ShallowRead).await;
} }
let set = db.keys(self.as_ref(), None).await?; let set = db.keys(self.as_ref(), None).await?;
Ok(set Ok(set
@@ -497,13 +499,15 @@ where
.collect::<Result<_, _>>()?) .collect::<Result<_, _>>()?)
} }
pub async fn remove<Db: DbHandle>(&self, db: &mut Db, key: &T::Key) -> Result<(), Error> { pub async fn remove<Db: DbHandle>(&self, db: &mut Db, key: &T::Key) -> Result<(), Error> {
db.lock(self.as_ref().clone(), true).await; db.lock(self.as_ref().clone(), LockType::Write).await;
db.apply(DiffPatch(Patch(vec![PatchOperation::Remove( if db.exists(self.clone().idx(key).as_ref(), None).await? {
RemoveOperation { db.apply(DiffPatch(Patch(vec![PatchOperation::Remove(
path: self.as_ref().clone().join_end(key.as_ref()), RemoveOperation {
}, path: self.as_ref().clone().join_end(key.as_ref()),
)]))) },
.await?; )])))
.await?;
}
Ok(()) Ok(())
} }
} }

View File

@@ -10,7 +10,7 @@ use tokio::sync::broadcast::Receiver;
use tokio::sync::{RwLock, RwLockReadGuard}; use tokio::sync::{RwLock, RwLockReadGuard};
use crate::handle::HandleId; use crate::handle::HandleId;
use crate::locker::{Guard, Locker}; use crate::locker::{Guard, LockType, Locker};
use crate::patch::{DiffPatch, Revision}; use crate::patch::{DiffPatch, Revision};
use crate::store::Store; use crate::store::Store;
use crate::{DbHandle, Error, PatchDbHandle}; use crate::{DbHandle, Error, PatchDbHandle};
@@ -166,9 +166,13 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
self.updates.append(patch); self.updates.append(patch);
Ok(None) Ok(None)
} }
async fn lock(&mut self, ptr: JsonPointer, write: bool) { async fn lock(&mut self, ptr: JsonPointer, lock_type: LockType) {
self.locks self.locks.push(
.push(self.parent.locker().lock(self.id.clone(), ptr, write).await) self.parent
.locker()
.lock(self.id.clone(), ptr, lock_type)
.await,
)
} }
async fn get< async fn get<
T: for<'de> Deserialize<'de>, T: for<'de> Deserialize<'de>,