Feature/lockless (#60)

* lockless

* new and improved api

* fix tests

Co-authored-by: J H <Blu-J@users.noreply.github.com>

* fix models again

* fix imports

* improved api

* fix internally tagged enums

---------

Co-authored-by: J H <Blu-J@users.noreply.github.com>
This commit is contained in:
Aiden McClelland
2023-09-28 13:26:17 -06:00
committed by GitHub
parent 8aeded3321
commit 3e5cc22bb6
23 changed files with 1008 additions and 4784 deletions

View File

@@ -13,4 +13,4 @@ repository = "https://github.com/Start9Labs/patch-db"
syn = { version = "1.0.5", features = ["full", "extra-traits"] }
quote = "1.0.1"
proc-macro2 = "1.0.1"
heck = "0.3.2"
heck = "0.4.1"

File diff suppressed because it is too large Load Diff

View File

@@ -20,17 +20,17 @@ unstable = []
async-trait = "0.1.42"
fd-lock-rs = "0.1.3"
futures = "0.3.8"
imbl = "1.0.1"
imbl = "2"
imbl-value = { git = "https://github.com/Start9Labs/imbl-value.git" }
json-patch = { path = "../json-patch" }
json-ptr = { path = "../json-ptr" }
lazy_static = "1.4.0"
tracing = { version = "0.1.29", optional = true }
tracing-error = { version = "0.1.2", optional = true }
nix = "0.23.0"
tracing-error = { version = "0.2.0", optional = true }
nix = "0.26.2"
patch-db-macro = { path = "../patch-db-macro" }
serde = { version = "1.0.118", features = ["rc"] }
serde_cbor = { path = "../cbor" }
serde_json = "1.0.61"
thiserror = "1.0.23"
tokio = { version = "1.0.1", features = [
"sync",

View File

@@ -1,171 +0,0 @@
use std::marker::PhantomData;
use imbl::OrdSet;
use serde::{Deserialize, Serialize};
use crate::{model_paths::JsonGlob, DbHandle, Error, LockType};
use self::unsaturated_args::UnsaturatedArgs;
pub mod unsaturated_args;
/// Used at the beginning of a set of code that may acquire locks into a db.
/// This will be used to represent a potential lock that would be used, and this will then be
/// sent to a bulk locker, that will take multiple of these targets and lock them all at once instead
/// of one at a time. Then once the locks have been acquired, this target can then be turned into a receipt
/// which can then access into the db.
#[derive(Clone)]
pub struct LockTarget<T, StarBinds>
where
    T: Serialize + for<'de> Deserialize<'de>,
{
    /// Path pattern (may contain stars) that this target wants locked.
    pub glob: JsonGlob,
    /// Read or Write lock requested over `glob`.
    pub lock_type: LockType,
    /// What the target will eventually need to return in a get, or value to be put in a set
    pub(crate) db_type: PhantomData<T>,
    /// How many stars (potential keys in maps, ...) that need to be bound to actual paths.
    pub(crate) _star_binds: UnsaturatedArgs<StarBinds>,
}
/// This is acting as a newtype for the copyable section
/// (glob + lock type) — used as the comparable/orderable key when indexing
/// lock targets in bulk operations.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub struct LockTargetId {
    pub(crate) glob: JsonGlob,
    pub(crate) lock_type: LockType,
}
impl<T, SB> LockTarget<T, SB>
where
    T: Serialize + for<'de> Deserialize<'de>,
{
    /// Build the copyable identity (glob + lock type) used to index this target.
    pub fn key_for_indexing(&self) -> LockTargetId {
        // TODO: Remove this clone
        let glob = self.glob.clone();
        LockTargetId {
            glob,
            lock_type: self.lock_type,
        }
    }
    /// Append this target's key to `locks`, returning `self` for chaining.
    pub fn add_to_keys(self, locks: &mut Vec<LockTargetId>) -> Self {
        let key = self.key_for_indexing();
        locks.push(key);
        self
    }
}
/// Proof that a set of lock-target ids has been acquired by the locker.
/// Consumed by `LockTarget::verify` to turn targets into receipts.
#[derive(Debug, Clone)]
pub struct Verifier {
    pub(crate) target_locks: OrdSet<LockTargetId>,
}
impl<T, SB> LockTarget<T, SB>
where
T: Serialize + for<'de> Deserialize<'de>,
{
/// Use this to verify the target, and if valid return a verified lock
pub fn verify(self, lock_set: &Verifier) -> Result<LockReceipt<T, SB>, Error> {
if !lock_set.target_locks.contains(&self.key_for_indexing()) {
return Err(Error::Locker(
"Cannot unlock a lock that is not in the unlock set".to_string(),
));
}
Ok(LockReceipt { lock: self })
}
}
/// A lock receipt is the final goal, where we can now get/ set into the db
#[derive(Clone)]
pub struct LockReceipt<T, SB>
where
    T: Serialize + for<'de> Deserialize<'de>,
{
    /// The verified target whose glob + binds determine the db path accessed.
    pub lock: LockTarget<T, SB>,
}
impl<T, SB> LockReceipt<T, SB>
where
    T: Serialize + for<'de> Deserialize<'de> + Send + Sync,
{
    /// Shared setter: saturate the glob with `binds` and write `new_value`.
    /// Only a Write receipt may mutate the db.
    async fn set_<DH: DbHandle>(
        &self,
        db_handle: &mut DH,
        new_value: T,
        binds: &[&str],
    ) -> Result<(), Error> {
        let pointer = self.lock.glob.as_pointer(binds);
        if self.lock.lock_type != LockType::Write {
            return Err(Error::Locker("Cannot set a read lock".to_string()));
        }
        db_handle.put(&pointer, &new_value).await?;
        Ok(())
    }
    /// Shared getter: returns `None` when the saturated path does not exist.
    async fn get_<DH: DbHandle>(
        &self,
        db_handle: &mut DH,
        binds: &[&str],
    ) -> Result<Option<T>, Error> {
        let path = self.lock.glob.as_pointer(binds);
        if db_handle.exists(&path, None).await {
            Ok(Some(db_handle.get(&path).await?))
        } else {
            Ok(None)
        }
    }
}
impl<T> LockReceipt<T, ()>
where
T: Serialize + for<'de> Deserialize<'de> + Send + Sync,
{
pub async fn set<DH: DbHandle>(&self, db_handle: &mut DH, new_value: T) -> Result<(), Error> {
self.set_(db_handle, new_value, &[]).await
}
pub async fn get<DH: DbHandle>(&self, db_handle: &mut DH) -> Result<T, Error> {
self.get_(db_handle, &[]).await.and_then(|x| {
x.map(Ok).unwrap_or_else(|| {
serde_json::from_value(serde_json::Value::Null).map_err(Error::JSON)
})
})
}
}
impl<T> LockReceipt<T, String>
where
    T: Serialize + for<'de> Deserialize<'de> + Send + Sync,
{
    /// Write `new_value`, substituting `binds` for the glob's single star.
    pub async fn set<DH: DbHandle>(
        &self,
        db_handle: &mut DH,
        new_value: T,
        binds: &str,
    ) -> Result<(), Error> {
        let bound = [binds];
        self.set_(db_handle, new_value, &bound).await
    }
    /// Read the value, substituting `binds` for the glob's single star.
    pub async fn get<DH: DbHandle>(
        &self,
        db_handle: &mut DH,
        binds: &str,
    ) -> Result<Option<T>, Error> {
        let bound = [binds];
        self.get_(db_handle, &bound).await
    }
}
impl<T> LockReceipt<T, (String, String)>
where
    T: Serialize + for<'de> Deserialize<'de> + Send + Sync,
{
    /// Write `new_value`, substituting the two binds for the glob's two stars in order.
    pub async fn set<DH: DbHandle>(
        &self,
        db_handle: &mut DH,
        new_value: T,
        binds: (&str, &str),
    ) -> Result<(), Error> {
        let (first, second) = binds;
        self.set_(db_handle, new_value, &[first, second]).await
    }
    /// Read the value, substituting the two binds for the glob's two stars in order.
    pub async fn get<DH: DbHandle>(
        &self,
        db_handle: &mut DH,
        binds: (&str, &str),
    ) -> Result<Option<T>, Error> {
        let (first, second) = binds;
        self.get_(db_handle, &[first, second]).await
    }
}

View File

@@ -1,70 +0,0 @@
use std::marker::PhantomData;
use crate::JsonGlob;
/// Used to create a proof that will be consumed later to verify the amount of arguments needed to get a path.
/// One of the places that it is used is when creating a lock target
#[derive(Clone, Debug, Copy)]
pub struct UnsaturatedArgs<A>(PhantomData<A>);
/// Conversion from a path glob to the phantom-typed star-count proof `SB`:
/// `()` = zero stars, `String` = one star, `(String, String)` = two stars.
pub trait AsUnsaturatedArgs<SB> {
    fn as_unsaturated_args(&self) -> UnsaturatedArgs<SB>;
}
impl AsUnsaturatedArgs<()> for JsonGlob {
    /// `()` binds zero stars, so the glob must contain none; otherwise log
    /// (under `tracing`) and panic in test builds.
    fn as_unsaturated_args(&self) -> UnsaturatedArgs<()> {
        let count = if let JsonGlob::PathWithStar(path_with_star) = self {
            path_with_star.count()
        } else {
            0
        };
        if count != 0 {
            #[cfg(feature = "tracing")]
            tracing::error!("By counts={}, this phantom type = () is not valid", count);
            #[cfg(test)]
            panic!("By counts={}, this phantom type = () is not valid", count);
        }
        UnsaturatedArgs(PhantomData)
    }
}
impl AsUnsaturatedArgs<String> for JsonGlob {
    /// `String` binds exactly one star; otherwise log (under `tracing`) and
    /// panic in test builds.
    fn as_unsaturated_args(&self) -> UnsaturatedArgs<String> {
        let count = if let JsonGlob::PathWithStar(path_with_star) = self {
            path_with_star.count()
        } else {
            0
        };
        if count != 1 {
            #[cfg(feature = "tracing")]
            tracing::error!("By counts={}, this phantom type = String is not valid", count);
            #[cfg(test)]
            panic!("By counts={}, this phantom type = String is not valid", count);
        }
        UnsaturatedArgs(PhantomData)
    }
}
impl AsUnsaturatedArgs<(String, String)> for JsonGlob {
    /// `(String, String)` binds exactly two stars; otherwise log (under
    /// `tracing`) and panic in test builds.
    fn as_unsaturated_args(&self) -> UnsaturatedArgs<(String, String)> {
        let count = if let JsonGlob::PathWithStar(path_with_star) = self {
            path_with_star.count()
        } else {
            0
        };
        if count != 2 {
            #[cfg(feature = "tracing")]
            tracing::error!(
                "By counts={}, this phantom type = (String, String) is not valid",
                count
            );
            #[cfg(test)]
            panic!(
                "By counts={}, this phantom type = (String, String) is not valid",
                count
            );
        }
        UnsaturatedArgs(PhantomData)
    }
}

View File

@@ -1,410 +0,0 @@
use std::collections::BTreeSet;
use std::sync::Arc;
use async_trait::async_trait;
use json_ptr::{JsonPointer, SegList};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use crate::{
bulk_locks::{self, Verifier},
locker::{Guard, LockType},
};
use crate::{model_paths::JsonGlob, patch::DiffPatch};
use crate::{Error, Locker, PatchDb, Revision, Store, Transaction};
/// Identifier for one db handle (session). Only `id` participates in
/// equality/ordering; the optional span trace is diagnostic metadata.
#[derive(Debug, Clone, Default)]
pub struct HandleId {
    pub(crate) id: u64,
    // Captured creation span for debugging, only under the `trace` feature.
    #[cfg(feature = "trace")]
    #[allow(dead_code)]
    pub(crate) trace: Option<Arc<tracing_error::SpanTrace>>,
}
// Manual comparison impls: only `id` is compared (the optional `trace`
// metadata must not affect equality or ordering).
impl PartialEq for HandleId {
    fn eq(&self, other: &Self) -> bool {
        self.id == other.id
    }
}
impl Eq for HandleId {}
impl PartialOrd for HandleId {
    /// Canonical form: defer to `Ord` so the two impls can never disagree
    /// (clippy::non_canonical_partial_ord_impl).
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for HandleId {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.id.cmp(&other.id)
    }
}
/// The uniform async interface over the db: implemented by the root handle,
/// by transactions, and (via blanket impl) by `&mut` of any handle.
#[async_trait]
pub trait DbHandle: Send + Sync + Sized {
    /// Open a nested transaction whose parent is this handle.
    async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error>;
    /// Acquire every lock in `locks` at once, returning a `Verifier` that can
    /// turn matching `LockTarget`s into receipts.
    async fn lock_all<'a>(
        &'a mut self,
        locks: impl IntoIterator<Item = bulk_locks::LockTargetId> + Send + Sync + Clone + 'a,
    ) -> Result<bulk_locks::Verifier, Error>;
    /// Session id used by the locker for bookkeeping.
    fn id(&self) -> HandleId;
    fn rebase(&mut self);
    fn store(&self) -> Arc<RwLock<Store>>;
    fn locker(&self) -> &Locker;
    /// `store_read_lock` lets the caller reuse an already-held read guard
    /// instead of taking a fresh one.
    async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
        &mut self,
        ptr: &JsonPointer<S, V>,
        store_read_lock: Option<RwLockReadGuard<'_, Store>>,
    ) -> bool;
    /// Keys of the object at `ptr`.
    async fn keys<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
        &mut self,
        ptr: &JsonPointer<S, V>,
        store_read_lock: Option<RwLockReadGuard<'_, Store>>,
    ) -> BTreeSet<String>;
    /// Raw (untyped) read of the value at `ptr`.
    async fn get_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
        &mut self,
        ptr: &JsonPointer<S, V>,
        store_read_lock: Option<RwLockReadGuard<'_, Store>>,
    ) -> Result<Value, Error>;
    /// Raw (untyped) write; returns the new revision, if one was produced.
    async fn put_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
        &mut self,
        ptr: &JsonPointer<S, V>,
        value: &Value,
    ) -> Result<Option<Arc<Revision>>, Error>;
    /// Apply a diff patch, optionally under an already-held write guard.
    async fn apply(
        &mut self,
        patch: DiffPatch,
        store_write_lock: Option<RwLockWriteGuard<'_, Store>>,
    ) -> Result<Option<Arc<Revision>>, Error>;
    /// Acquire a single lock of `lock_type` over `ptr` for this session.
    async fn lock(&mut self, ptr: JsonGlob, lock_type: LockType) -> Result<(), Error>;
    /// Typed read: deserialize the value at `ptr` into `T`.
    async fn get<
        T: for<'de> Deserialize<'de>,
        S: AsRef<str> + Send + Sync,
        V: SegList + Send + Sync,
    >(
        &mut self,
        ptr: &JsonPointer<S, V>,
    ) -> Result<T, Error>;
    /// Typed write: serialize `value` into `ptr`.
    async fn put<
        T: Serialize + Send + Sync,
        S: AsRef<str> + Send + Sync,
        V: SegList + Send + Sync,
    >(
        &mut self,
        ptr: &JsonPointer<S, V>,
        value: &T,
    ) -> Result<Option<Arc<Revision>>, Error>;
}
/// Blanket impl: `&mut H` delegates every operation to `H`, so any handle can
/// be used behind a mutable reference wherever a `DbHandle` is expected.
#[async_trait]
impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
    async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error> {
        // Begin on the inner handle, then re-parent the transaction onto `self`
        // so the returned transaction borrows this reference, not the inner handle.
        let Transaction {
            locks,
            updates,
            sub,
            ..
        } = (*self).begin().await?;
        Ok(Transaction {
            id: self.id(),
            parent: self,
            locks,
            updates,
            sub,
        })
    }
    fn id(&self) -> HandleId {
        (**self).id()
    }
    fn rebase(&mut self) {
        (**self).rebase()
    }
    fn store(&self) -> Arc<RwLock<Store>> {
        (**self).store()
    }
    fn locker(&self) -> &Locker {
        (**self).locker()
    }
    async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
        &mut self,
        ptr: &JsonPointer<S, V>,
        store_read_lock: Option<RwLockReadGuard<'_, Store>>,
    ) -> bool {
        (*self).exists(ptr, store_read_lock).await
    }
    async fn keys<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
        &mut self,
        ptr: &JsonPointer<S, V>,
        store_read_lock: Option<RwLockReadGuard<'_, Store>>,
    ) -> BTreeSet<String> {
        (*self).keys(ptr, store_read_lock).await
    }
    async fn get_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
        &mut self,
        ptr: &JsonPointer<S, V>,
        store_read_lock: Option<RwLockReadGuard<'_, Store>>,
    ) -> Result<Value, Error> {
        (*self).get_value(ptr, store_read_lock).await
    }
    async fn put_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
        &mut self,
        ptr: &JsonPointer<S, V>,
        value: &Value,
    ) -> Result<Option<Arc<Revision>>, Error> {
        (*self).put_value(ptr, value).await
    }
    async fn apply(
        &mut self,
        patch: DiffPatch,
        store_write_lock: Option<RwLockWriteGuard<'_, Store>>,
    ) -> Result<Option<Arc<Revision>>, Error> {
        (*self).apply(patch, store_write_lock).await
    }
    async fn lock(&mut self, ptr: JsonGlob, lock_type: LockType) -> Result<(), Error> {
        (*self).lock(ptr, lock_type).await
    }
    async fn get<
        T: for<'de> Deserialize<'de>,
        S: AsRef<str> + Send + Sync,
        V: SegList + Send + Sync,
    >(
        &mut self,
        ptr: &JsonPointer<S, V>,
    ) -> Result<T, Error> {
        (*self).get(ptr).await
    }
    async fn put<
        T: Serialize + Send + Sync,
        S: AsRef<str> + Send + Sync,
        V: SegList + Send + Sync,
    >(
        &mut self,
        ptr: &JsonPointer<S, V>,
        value: &T,
    ) -> Result<Option<Arc<Revision>>, Error> {
        (*self).put(ptr, value).await
    }
    async fn lock_all<'a>(
        &'a mut self,
        locks: impl IntoIterator<Item = bulk_locks::LockTargetId> + Send + Sync + Clone + 'a,
    ) -> Result<bulk_locks::Verifier, Error> {
        (*self).lock_all(locks).await
    }
}
/// Root handle onto a `PatchDb`. Holds the guards for every lock taken
/// through it, so those locks are released when the handle is dropped.
pub struct PatchDbHandle {
    pub(crate) id: HandleId,
    pub(crate) db: PatchDb,
    pub(crate) locks: Vec<Guard>,
}
impl std::fmt::Debug for PatchDbHandle {
    // Manual impl: `db` is deliberately omitted from the debug output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PatchDbHandle")
            .field("id", &self.id)
            .field("locks", &self.locks)
            .finish()
    }
}
#[async_trait]
impl DbHandle for PatchDbHandle {
    async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error> {
        Ok(Transaction {
            // Subscribe up front so revisions published while the transaction
            // is open are delivered to it.
            sub: self.db.subscribe().await,
            id: self.id(),
            parent: self,
            locks: Vec::new(),
            updates: DiffPatch::default(),
        })
    }
    fn id(&self) -> HandleId {
        self.id.clone()
    }
    // The root handle has no parent to rebase onto.
    fn rebase(&mut self) {}
    fn store(&self) -> Arc<RwLock<Store>> {
        self.db.store.clone()
    }
    fn locker(&self) -> &Locker {
        &self.db.locker
    }
    async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
        &mut self,
        ptr: &JsonPointer<S, V>,
        store_read_lock: Option<RwLockReadGuard<'_, Store>>,
    ) -> bool {
        // Reuse the caller's read guard when supplied; otherwise go through the db.
        if let Some(lock) = store_read_lock {
            lock.exists(ptr)
        } else {
            self.db.exists(ptr).await
        }
    }
    async fn keys<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
        &mut self,
        ptr: &JsonPointer<S, V>,
        store_read_lock: Option<RwLockReadGuard<'_, Store>>,
    ) -> BTreeSet<String> {
        if let Some(lock) = store_read_lock {
            lock.keys(ptr)
        } else {
            self.db.keys(ptr).await
        }
    }
    async fn get_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
        &mut self,
        ptr: &JsonPointer<S, V>,
        store_read_lock: Option<RwLockReadGuard<'_, Store>>,
    ) -> Result<Value, Error> {
        Ok(if let Some(lock) = store_read_lock {
            lock.get_value(ptr)
        } else {
            self.db.get_value(ptr).await
        })
    }
    async fn put_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
        &mut self,
        ptr: &JsonPointer<S, V>,
        value: &Value,
    ) -> Result<Option<Arc<Revision>>, Error> {
        self.db.put(ptr, value).await
    }
    async fn apply(
        &mut self,
        patch: DiffPatch,
        store_write_lock: Option<RwLockWriteGuard<'_, Store>>,
    ) -> Result<Option<Arc<Revision>>, Error> {
        self.db.apply(patch, store_write_lock).await
    }
    async fn lock(&mut self, ptr: JsonGlob, lock_type: LockType) -> Result<(), Error> {
        // Keep the guard on the handle so the lock stays held until the
        // handle is dropped.
        self.locks
            .push(self.db.locker.lock(self.id.clone(), ptr, lock_type).await?);
        Ok(())
    }
    async fn get<
        T: for<'de> Deserialize<'de>,
        S: AsRef<str> + Send + Sync,
        V: SegList + Send + Sync,
    >(
        &mut self,
        ptr: &JsonPointer<S, V>,
    ) -> Result<T, Error> {
        self.db.get(ptr).await
    }
    async fn put<
        T: Serialize + Send + Sync,
        S: AsRef<str> + Send + Sync,
        V: SegList + Send + Sync,
    >(
        &mut self,
        ptr: &JsonPointer<S, V>,
        value: &T,
    ) -> Result<Option<Arc<Revision>>, Error> {
        self.db.put(ptr, value).await
    }
    async fn lock_all<'a>(
        &'a mut self,
        locks: impl IntoIterator<Item = bulk_locks::LockTargetId> + Send + Sync + Clone + 'a,
    ) -> Result<bulk_locks::Verifier, Error> {
        // The verifier records exactly the requested ids; the single bulk
        // guard keeps them all held for the lifetime of the handle.
        let verifier = Verifier {
            target_locks: locks.clone().into_iter().collect(),
        };
        let guard = self.db.locker.lock_all(&self.id, locks).await?;
        self.locks.push(guard);
        Ok(verifier)
    }
}
pub mod test_utils {
    use async_trait::async_trait;
    use crate::{Error, Locker, Revision, Store, Transaction};
    use super::*;
    /// Test double that panics on every db operation except `lock_all`,
    /// which returns a `Verifier` without doing any actual locking — useful
    /// for code paths that need a handle but never touch the db.
    pub struct NoOpDb();
    #[async_trait]
    impl DbHandle for NoOpDb {
        async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error> {
            unimplemented!()
        }
        fn id(&self) -> HandleId {
            unimplemented!()
        }
        fn rebase(&mut self) {
            unimplemented!()
        }
        fn store(&self) -> Arc<RwLock<Store>> {
            unimplemented!()
        }
        fn locker(&self) -> &Locker {
            unimplemented!()
        }
        async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
            &mut self,
            _ptr: &JsonPointer<S, V>,
            _store_read_lock: Option<RwLockReadGuard<'_, Store>>,
        ) -> bool {
            unimplemented!()
        }
        async fn keys<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
            &mut self,
            _ptr: &JsonPointer<S, V>,
            _store_read_lock: Option<RwLockReadGuard<'_, Store>>,
        ) -> BTreeSet<String> {
            unimplemented!()
        }
        async fn get_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
            &mut self,
            _ptr: &JsonPointer<S, V>,
            _store_read_lock: Option<RwLockReadGuard<'_, Store>>,
        ) -> Result<Value, Error> {
            unimplemented!()
        }
        async fn put_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
            &mut self,
            _ptr: &JsonPointer<S, V>,
            _value: &Value,
        ) -> Result<Option<Arc<Revision>>, Error> {
            unimplemented!()
        }
        async fn apply(
            &mut self,
            _patch: DiffPatch,
            _store_write_lock: Option<RwLockWriteGuard<'_, Store>>,
        ) -> Result<Option<Arc<Revision>>, Error> {
            unimplemented!()
        }
        async fn lock(&mut self, _ptr: JsonGlob, _lock_type: LockType) -> Result<(), Error> {
            unimplemented!()
        }
        async fn get<
            T: for<'de> Deserialize<'de>,
            S: AsRef<str> + Send + Sync,
            V: SegList + Send + Sync,
        >(
            &mut self,
            _ptr: &JsonPointer<S, V>,
        ) -> Result<T, Error> {
            unimplemented!()
        }
        async fn put<
            T: Serialize + Send + Sync,
            S: AsRef<str> + Send + Sync,
            V: SegList + Send + Sync,
        >(
            &mut self,
            _ptr: &JsonPointer<S, V>,
            _value: &T,
        ) -> Result<Option<Arc<Revision>>, Error> {
            unimplemented!()
        }
        async fn lock_all<'a>(
            &'a mut self,
            locks: impl IntoIterator<Item = bulk_locks::LockTargetId> + Send + Sync + Clone + 'a,
        ) -> Result<bulk_locks::Verifier, Error> {
            // Pretend every requested lock was acquired.
            let skeleton_key = Verifier {
                target_locks: locks.into_iter().collect(),
            };
            Ok(skeleton_key)
        }
    }
}

View File

@@ -2,51 +2,35 @@ use std::io::Error as IOError;
use std::sync::Arc;
use json_ptr::JsonPointer;
use locker::LockError;
use thiserror::Error;
// note: inserting into an array (before another element) without proper locking can result in unexpected behaviour
mod bulk_locks;
mod handle;
mod locker;
mod model;
mod model_paths;
mod patch;
mod store;
mod subscriber;
mod transaction;
#[cfg(test)]
mod test;
pub use handle::{DbHandle, PatchDbHandle};
pub use locker::{LockType, Locker};
pub use model::{
BoxModel, HasModel, Map, MapModel, Model, ModelData, ModelDataMut, OptionModel, VecModel,
};
pub use model_paths::{JsonGlob, JsonGlobSegment};
pub use imbl_value as value;
pub use imbl_value::Value;
pub use model::{HasModel, Model, ModelExt};
pub use patch::{DiffPatch, Dump, Revision};
pub use patch_db_macro::HasModel;
pub use store::{PatchDb, Store};
pub use transaction::Transaction;
use tokio::sync::TryLockError;
pub use {json_patch, json_ptr};
pub use bulk_locks::{LockReceipt, LockTarget, LockTargetId, Verifier};
pub type Subscriber = tokio::sync::mpsc::UnboundedReceiver<Arc<Revision>>;
pub mod test_utils {
use super::*;
pub use handle::test_utils::*;
}
#[derive(Error, Debug)]
pub enum Error {
#[error("IO Error: {0}")]
IO(#[from] IOError),
#[error("JSON (De)Serialization Error: {0}")]
JSON(#[from] serde_json::Error),
JSON(#[from] imbl_value::Error),
#[error("CBOR (De)Serialization Error: {0}")]
CBOR(#[from] serde_cbor::Error),
#[error("Index Error: {0:?}")]
@@ -63,8 +47,8 @@ pub enum Error {
Subscriber(#[from] tokio::sync::mpsc::error::TryRecvError),
#[error("Node Does Not Exist: {0}")]
NodeDoesNotExist(JsonPointer),
#[error("Invalid Lock Request: {0}")]
LockError(#[from] LockError),
#[error("Invalid Lock Request: {0}")]
Locker(String),
#[error("Provided Function Panicked! {0}")]
Panic(String),
#[error("Would Block")]
WouldBlock(#[from] TryLockError),
}

View File

@@ -1,119 +0,0 @@
use tokio::sync::mpsc::{self, UnboundedReceiver};
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
use super::{LockInfos, Request};
/// One unit of work for the locker's main loop.
#[derive(Debug)]
pub(super) enum Action {
    /// A new lock request arrived on the inbound channel.
    HandleRequest(Request),
    /// A previously granted lock was released.
    HandleRelease(LockInfos),
    /// A still-waiting request was cancelled by its requester.
    HandleCancel(LockInfos),
}
/// Inbound request channel plus a flag remembering that it has closed.
struct InboundRequestQueue {
    closed: bool,
    recv: mpsc::UnboundedReceiver<Request>,
}
/// Multiplexes the three event sources (requests, releases, cancellations)
/// into a single stream of `Action`s.
pub(super) struct ActionMux {
    inbound_request_queue: InboundRequestQueue,
    unlock_receivers: Vec<oneshot::Receiver<LockInfos>>,
    cancellation_receivers: Vec<oneshot::Receiver<LockInfos>>,
    // Senders paired with the dummy receivers seeded into the two vectors
    // above; kept alive so those dummies never resolve.
    _dummy_senders: Vec<oneshot::Sender<LockInfos>>,
}
impl ActionMux {
    /// Build a mux over the locker's inbound request channel.
    pub fn new(inbound_receiver: UnboundedReceiver<Request>) -> Self {
        // futures::future::select_all will panic if the list is empty
        // instead we want it to block forever by adding a channel that will never recv
        let (unlock_dummy_send, unlock_dummy_recv) = oneshot::channel();
        let unlock_receivers = vec![unlock_dummy_recv];
        let (cancel_dummy_send, cancel_dummy_recv) = oneshot::channel();
        let cancellation_receivers = vec![cancel_dummy_recv];
        ActionMux {
            inbound_request_queue: InboundRequestQueue {
                recv: inbound_receiver,
                closed: false,
            },
            unlock_receivers,
            cancellation_receivers,
            _dummy_senders: vec![unlock_dummy_send, cancel_dummy_send],
        }
    }
    /// Wait for the next event from any source. Returns `None` once the
    /// inbound channel is closed and only the dummy receivers remain —
    /// the shutdown condition for the locker loop.
    async fn get_action(&mut self) -> Option<Action> {
        loop {
            if self.inbound_request_queue.closed
                && self.unlock_receivers.len() == 1
                && self.cancellation_receivers.len() == 1
            {
                return None;
            }
            tokio::select! {
                a = self.inbound_request_queue.recv.recv() => {
                    if let Some(a) = a {
                        return Some(Action::HandleRequest(a));
                    } else {
                        // channel closed: remember it and keep serving releases/cancels
                        self.inbound_request_queue.closed = true;
                    }
                }
                (a, idx, _) = futures::future::select_all(self.unlock_receivers.iter_mut()) => {
                    self.unlock_receivers.swap_remove(idx);
                    // NOTE(review): unwrap assumes an unlock sender is never dropped
                    // without sending (the dummy's sender is kept alive) — confirm.
                    return Some(Action::HandleRelease(a.unwrap()))
                }
                (a, idx, _) = futures::future::select_all(self.cancellation_receivers.iter_mut()) => {
                    self.cancellation_receivers.swap_remove(idx);
                    // a dropped cancellation sender just means the request completed
                    if let Ok(a) = a {
                        return Some(Action::HandleCancel(a))
                    }
                }
            }
        }
    }
    /// Block for one action, then drain every immediately-serviceable release
    /// and cancellation ahead of it, so frees are processed before new work.
    pub async fn get_prioritized_action_queue(&mut self) -> Vec<Action> {
        if let Some(action) = self.get_action().await {
            let mut actions = Vec::new();
            // find all serviceable lock releases
            for mut r in std::mem::take(&mut self.unlock_receivers) {
                match r.try_recv() {
                    Ok(lock_info) => {
                        actions.push(Action::HandleRelease(lock_info));
                    }
                    Err(TryRecvError::Empty) => {
                        self.unlock_receivers.push(r);
                    }
                    Err(TryRecvError::Closed) => (),
                }
            }
            // find all serviceable lock cancellations
            for mut r in std::mem::take(&mut self.cancellation_receivers) {
                match r.try_recv() {
                    Ok(lock_info) => {
                        actions.push(Action::HandleCancel(lock_info));
                    }
                    Err(TryRecvError::Empty) => {
                        self.cancellation_receivers.push(r);
                    }
                    Err(TryRecvError::Closed) => (),
                }
            }
            // finally add the action that started it all
            actions.push(action);
            actions
        } else {
            Vec::new()
        }
    }
    /// Register receivers that resolve when granted locks are released.
    pub fn push_unlock_receivers<T: IntoIterator<Item = oneshot::Receiver<LockInfos>>>(
        &mut self,
        recv: T,
    ) {
        self.unlock_receivers.extend(recv)
    }
    /// Register a receiver that resolves if a deferred request is cancelled.
    pub fn push_cancellation_receiver(&mut self, recv: oneshot::Receiver<LockInfos>) {
        self.cancellation_receivers.push(recv)
    }
}

View File

@@ -1,340 +0,0 @@
use std::collections::VecDeque;
use imbl::{ordmap, ordset, OrdMap, OrdSet};
use tokio::sync::oneshot;
#[cfg(feature = "tracing")]
use tracing::{debug, error, info, warn};
#[cfg(feature = "unstable")]
use super::order_enforcer::LockOrderEnforcer;
use super::trie::LockTrie;
use super::{LockError, LockInfos, Request};
use crate::handle::HandleId;
#[cfg(feature = "tracing")]
use crate::locker::log_utils::{
display_session_set, fmt_acquired, fmt_cancelled, fmt_deferred, fmt_released,
};
use crate::locker::LockSet;
// solely responsible for managing the bookkeeping requirements of requests
/// Tracks granted locks (in the trie) and requests that could not yet be
/// granted (in the deferred queue, each paired with the set of sessions
/// currently blocking it).
pub(super) struct LockBookkeeper {
    trie: LockTrie,
    deferred_request_queue: VecDeque<(Request, OrdSet<HandleId>)>,
    #[cfg(feature = "unstable")]
    order_enforcer: LockOrderEnforcer,
}
impl LockBookkeeper {
    pub fn new() -> Self {
        LockBookkeeper {
            trie: LockTrie::default(),
            deferred_request_queue: VecDeque::new(),
            #[cfg(feature = "unstable")]
            order_enforcer: LockOrderEnforcer::new(),
        }
    }
    /// Try to grant `req`. `Ok(Some(recv))` means granted (`recv` fires on
    /// release); `Ok(None)` means deferred; `Err` means the (unstable) order
    /// enforcer rejected it.
    pub fn lease(
        &mut self,
        req: Request,
    ) -> Result<Option<oneshot::Receiver<LockInfos>>, LockError> {
        #[cfg(feature = "unstable")]
        if let Err(e) = self.order_enforcer.try_insert(&req.lock_info) {
            req.reject(e.clone());
            return Err(e);
        }
        // In normal operation we start here
        let hot_seat = self.deferred_request_queue.pop_front();
        let res = process_new_req(
            req,
            hot_seat.as_ref(),
            &mut self.trie,
            &mut self.deferred_request_queue,
        );
        if let Some(hot_seat) = hot_seat {
            // restore the privileged front request, then check for deadlock
            self.deferred_request_queue.push_front(hot_seat);
            kill_deadlocked(&mut self.deferred_request_queue, &self.trie);
        }
        Ok(res)
    }
    /// Remove a still-waiting request from the deferred queue; a cancellation
    /// for a request that is not waiting is logged and ignored.
    pub fn cancel(&mut self, info: &LockInfos) {
        #[cfg(feature = "unstable")]
        for info in info.as_vec() {
            self.order_enforcer.remove(&info);
        }
        let entry = self
            .deferred_request_queue
            .iter()
            .enumerate()
            .find(|(_, (r, _))| &r.lock_info == info);
        let index = match entry {
            None => {
                #[cfg(feature = "tracing")]
                {
                    let infos = &info.0;
                    warn!(
                        "Received cancellation for some locks not currently waiting: [{}]",
                        infos
                            .iter()
                            .enumerate()
                            .fold(String::new(), |acc, (i, new)| {
                                if i > 0 {
                                    format!("{}/{}", acc, new.ptr)
                                } else {
                                    format!("/{}", new.ptr)
                                }
                            })
                    );
                }
                return;
            }
            Some(value) => {
                #[cfg(feature = "tracing")]
                for lock_info in value.1 .0.lock_info.as_vec() {
                    info!("{}", fmt_cancelled(lock_info));
                }
                value.0
            }
        };
        self.deferred_request_queue.remove(index);
    }
    /// Release the locks in `info`, then re-service the deferred queue.
    /// Returns an unlock receiver for every request newly granted as a result.
    pub fn ret(&mut self, info: &LockInfos) -> Vec<oneshot::Receiver<LockInfos>> {
        #[cfg(feature = "unstable")]
        for info in info.as_vec() {
            self.order_enforcer.remove(&info);
        }
        for info in info.as_vec() {
            self.trie.unlock(info);
        }
        #[cfg(feature = "tracing")]
        {
            for info in info.as_vec() {
                info!("{}", fmt_released(&info));
            }
            debug!("Reexamining request queue backlog...");
        }
        // try to pop off as many requests off the front of the queue as we can
        let mut new_unlock_receivers = vec![];
        let mut hot_seat = None;
        while let Some((r, _)) = self.deferred_request_queue.pop_front() {
            match self.trie.try_lock(&r.lock_info) {
                Ok(()) => {
                    let recv = r.complete();
                    new_unlock_receivers.push(recv);
                }
                Err(new_blocking_sessions) => {
                    // set the hot seat and proceed to step two
                    hot_seat = Some((r, new_blocking_sessions));
                    break;
                }
            }
        }
        // when we can no longer do so, try and service the rest of the queue with the new hot seat
        let old_request_queue = std::mem::take(&mut self.deferred_request_queue);
        for (r, _) in old_request_queue {
            // we now want to process each request in the queue as if it was new
            let res = process_new_req(
                r,
                hot_seat.as_ref(),
                &mut self.trie,
                &mut self.deferred_request_queue,
            );
            if let Some(recv) = res {
                new_unlock_receivers.push(recv);
            }
        }
        if let Some(hot_seat) = hot_seat {
            self.deferred_request_queue.push_front(hot_seat);
            kill_deadlocked(&mut self.deferred_request_queue, &self.trie);
        }
        new_unlock_receivers
    }
}
// to prevent starvation we privilege the front of the queue and only allow requests that
// conflict with the request at the front to go through if they are requested by sessions that
// are *currently blocking* the front of the queue
/// Returns `Some(recv)` when the request was granted immediately (recv fires
/// on release); `None` when it was pushed onto `request_queue`.
fn process_new_req(
    req: Request,
    hot_seat: Option<&(Request, OrdSet<HandleId>)>,
    trie: &mut LockTrie,
    request_queue: &mut VecDeque<(Request, OrdSet<HandleId>)>,
) -> Option<oneshot::Receiver<LockInfos>> {
    #[cfg(feature = "tracing")]
    let lock_infos = req.lock_info.as_vec();
    match hot_seat {
        // hot seat conflicts and request session isn't in current blocking sessions
        // so we push it to the queue
        Some((hot_req, hot_blockers))
            if hot_req.lock_info.conflicts_with(&req.lock_info)
                && !req
                    .lock_info
                    .as_vec()
                    .iter()
                    .any(|lock_info| hot_blockers.contains(&lock_info.handle_id)) =>
        {
            #[cfg(feature = "tracing")]
            {
                for lock_info in lock_infos.iter() {
                    info!("{}", fmt_deferred(&lock_info));
                }
                if let Some(hot_req_lock_info) = hot_req.lock_info.as_vec().first() {
                    debug!(
                        "Must wait on hot seat request from session {}",
                        &hot_req_lock_info.handle_id.id
                    );
                }
            }
            request_queue.push_back((req, ordset![]));
            None
        }
        // otherwise we try and service it immediately, only pushing to the queue if it fails
        _ => match trie.try_lock(&req.lock_info) {
            Ok(()) => {
                #[cfg(feature = "tracing")]
                for lock_info in lock_infos.iter() {
                    info!("{}", fmt_acquired(&lock_info));
                }
                Some(req.complete())
            }
            Err(blocking_sessions) => {
                #[cfg(feature = "tracing")]
                {
                    for lock_info in lock_infos.iter() {
                        info!("{}", fmt_deferred(&lock_info));
                    }
                    debug!(
                        "Must wait on sessions {}",
                        display_session_set(&blocking_sessions)
                    )
                }
                request_queue.push_back((req, blocking_sessions));
                None
            }
        },
    }
}
/// Scan the deferred queue for wait cycles; every request participating in a
/// cycle is removed from the queue and rejected with `DeadlockDetected`.
fn kill_deadlocked(request_queue: &mut VecDeque<(Request, OrdSet<HandleId>)>, trie: &LockTrie) {
    // TODO optimize this, it is unlikely that we are anywhere close to as efficient as we can be here.
    let deadlocked_reqs = deadlock_scan(request_queue);
    if !deadlocked_reqs.is_empty() {
        let locks_waiting = LockSet(
            deadlocked_reqs
                .iter()
                .map(|r| r.lock_info.clone())
                .collect(),
        );
        #[cfg(feature = "tracing")]
        error!("Deadlock Detected: {:?}", locks_waiting);
        let err = LockError::DeadlockDetected {
            locks_waiting,
            locks_held: LockSet(
                trie.subtree_lock_info()
                    .into_iter()
                    .map(|x| vec![x])
                    .map(LockInfos)
                    .collect(),
            ),
        };
        // identify the deadlocked entries by pointer, since Request isn't comparable
        let mut indices_to_remove = Vec::with_capacity(deadlocked_reqs.len());
        for (i, (req, _)) in request_queue.iter().enumerate() {
            if deadlocked_reqs.iter().any(|r| std::ptr::eq(*r, req)) {
                indices_to_remove.push(i)
            }
        }
        // rebuild the queue, rejecting the deadlocked requests as we go
        let old = std::mem::take(request_queue);
        for (i, (r, s)) in old.into_iter().enumerate() {
            if indices_to_remove.contains(&i) {
                r.reject(err.clone())
            } else {
                request_queue.push_back((r, s))
            }
        }
    }
}
/// Find one wait-for cycle among the deferred requests, returning the
/// requests on that cycle (empty when no cycle exists).
pub(super) fn deadlock_scan(queue: &VecDeque<(Request, OrdSet<HandleId>)>) -> Vec<&'_ Request> {
    // Index, per session id: the set of sessions it is waiting on, and the
    // request it belongs to.
    let mut wait_map: OrdMap<&HandleId, &OrdSet<HandleId>> = ordmap! {};
    let mut req_map: OrdMap<&HandleId, &Request> = ordmap! {};
    for (req, wait_set) in queue.iter() {
        for lock_info in req.lock_info.as_vec() {
            wait_map.insert(&lock_info.handle_id, wait_set);
            req_map.insert(&lock_info.handle_id, req);
        }
    }
    // For each session, try to walk from one of its blockers back to itself.
    for (root, wait_set) in wait_map.iter() {
        let cycle = wait_set.iter().find_map(|start| {
            let path = path_to(&wait_map, ordset![], root, start);
            if path.is_empty() {
                None
            } else {
                Some(path)
            }
        });
        if let Some(cycle) = cycle {
            return cycle
                .into_iter()
                .map(|id| req_map.remove(id).unwrap())
                .collect();
        }
    }
    vec![]
}
/// Depth-first search through the wait-for graph: the set of nodes on a path
/// from `node` back to `root`, or the empty set when no such path exists.
pub(super) fn path_to<'a>(
    graph: &OrdMap<&'a HandleId, &'a OrdSet<HandleId>>,
    visited: OrdSet<&'a HandleId>,
    root: &'a HandleId,
    node: &'a HandleId,
) -> OrdSet<&'a HandleId> {
    // Reached the root again: a cycle exists; seed the path with it.
    if node == root {
        return ordset![root];
    }
    // Already explored on this path: dead end.
    if visited.contains(node) {
        return ordset![];
    }
    let successors = match graph.get(node) {
        Some(s) => s,
        None => return ordset![],
    };
    let hit = successors.iter().find_map(|next| {
        let sub = path_to(graph, visited.update(node), root, next);
        if sub.is_empty() {
            None
        } else {
            Some(sub)
        }
    });
    match hit {
        Some(mut path) => {
            path.insert(node);
            path
        }
        None => ordset![],
    }
}

View File

@@ -1,48 +0,0 @@
use imbl::OrdSet;
use super::LockInfo;
use crate::handle::HandleId;
/// Render a session set as `{1,2,3}` for log messages.
///
/// Fix: an empty set now renders as `{}` — the previous trailing-comma trim
/// (`replace_range(len - 1.., "}")`) consumed the opening brace when the set
/// was empty, producing `}`.
#[cfg(feature = "tracing")]
pub fn display_session_set(set: &OrdSet<HandleId>) -> String {
    let ids: Vec<String> = set.iter().map(|session| session.id.to_string()).collect();
    format!("{{{}}}", ids.join(","))
}
/// Log line for a lock that was just granted.
#[cfg(feature = "tracing")]
pub(super) fn fmt_acquired(lock_info: &LockInfo) -> String {
    format!(
        "Acquired: session {session} - {ty} lock on {ptr}",
        session = lock_info.handle_id.id,
        ty = lock_info.ty,
        ptr = lock_info.ptr,
    )
}
/// Log line for a lock request that had to wait.
#[cfg(feature = "tracing")]
pub(super) fn fmt_deferred(deferred_lock_info: &LockInfo) -> String {
    format!(
        "Deferred: session {session} - {ty} lock on {ptr}",
        session = deferred_lock_info.handle_id.id,
        ty = deferred_lock_info.ty,
        ptr = deferred_lock_info.ptr,
    )
}
/// Log line for a lock that was returned.
#[cfg(feature = "tracing")]
pub(super) fn fmt_released(released_lock_info: &LockInfo) -> String {
    format!(
        "Released: session {session} - {ty} lock on {ptr}",
        session = released_lock_info.handle_id.id,
        ty = released_lock_info.ty,
        ptr = released_lock_info.ptr,
    )
}
/// Log line for a waiting request that was cancelled.
#[cfg(feature = "tracing")]
pub(super) fn fmt_cancelled(cancelled_lock_info: &LockInfo) -> String {
    format!(
        "Canceled: session {session} - {ty} lock on {ptr}",
        session = cancelled_lock_info.handle_id.id,
        ty = cancelled_lock_info.ty,
        ptr = cancelled_lock_info.ptr,
    )
}

View File

@@ -1,470 +0,0 @@
mod action_mux;
mod bookkeeper;
#[cfg(feature = "tracing")]
mod log_utils;
mod natural;
mod order_enforcer;
#[cfg(test)]
pub(crate) mod proptest;
mod trie;
use imbl::{ordmap, ordset, OrdMap, OrdSet};
use tokio::sync::{mpsc, oneshot};
#[cfg(feature = "tracing")]
use tracing::{debug, trace, warn};
use self::action_mux::ActionMux;
use self::bookkeeper::LockBookkeeper;
use crate::{bulk_locks::LockTargetId, locker::action_mux::Action, Verifier};
use crate::{handle::HandleId, JsonGlob};
/// Front end of the lock service: lock requests are sent over `sender` to
/// the background bookkeeping task spawned by `Locker::new`.
pub struct Locker {
    sender: mpsc::UnboundedSender<Request>,
}
impl Locker {
    /// Spawn the locker's background task and return a handle to it.
    ///
    /// The task multiplexes three event kinds (new requests, releases of
    /// granted locks, cancellations of pending requests) through an
    /// `ActionMux` and applies them to a `LockBookkeeper`. It exits when the
    /// mux yields an empty action batch.
    pub fn new() -> Self {
        let (sender, receiver) = mpsc::unbounded_channel();
        tokio::spawn(async move {
            let mut action_mux = ActionMux::new(receiver);
            let mut lock_server = LockBookkeeper::new();
            loop {
                let actions = action_mux.get_prioritized_action_queue().await;
                if actions.is_empty() {
                    break;
                }
                for action in actions {
                    #[cfg(feature = "tracing")]
                    trace!("Locker Action: {:#?}", action);
                    match action {
                        Action::HandleRequest(mut req) => {
                            #[cfg(feature = "tracing")]
                            debug!("New lock request: {}", &req.lock_info);
                            // Pertinent Logic
                            let req_cancel =
                                req.cancel.take().expect("Request Cancellation Stolen");
                            match lock_server.lease(req) {
                                // Granted: watch for the guard being dropped
                                // so the lock can be returned.
                                Ok(Some(recv)) => {
                                    action_mux.push_unlock_receivers(std::iter::once(recv))
                                }
                                // Deferred: watch for the requester giving up.
                                Ok(None) => action_mux.push_cancellation_receiver(req_cancel),
                                // Rejected: the requester was notified by `lease`.
                                Err(_) => {}
                            }
                        }
                        Action::HandleRelease(lock_info) => {
                            #[cfg(feature = "tracing")]
                            debug!("New lock release: {}", &lock_info);
                            // Returning a lock may unblock deferred requests;
                            // start watching their guards too.
                            let new_unlock_receivers = lock_server.ret(&lock_info);
                            action_mux.push_unlock_receivers(new_unlock_receivers);
                        }
                        Action::HandleCancel(lock_info) => {
                            #[cfg(feature = "tracing")]
                            debug!("New request canceled: {}", &lock_info);
                            lock_server.cancel(&lock_info)
                        }
                    }
                }
            }
        });
        Locker { sender }
    }
    /// Acquire a single lock of `lock_type` at `ptr` on behalf of `handle_id`.
    pub async fn lock(
        &self,
        handle_id: HandleId,
        ptr: JsonGlob,
        lock_type: LockType,
    ) -> Result<Guard, LockError> {
        // Pertinent Logic
        let lock_info: LockInfos = LockInfo {
            handle_id,
            ptr,
            ty: lock_type,
        }
        .into();
        self._lock(lock_info).await
    }
    /// Acquire a batch of locks as one request for a single session.
    pub async fn lock_all(
        &self,
        handle_id: &HandleId,
        locks: impl IntoIterator<Item = LockTargetId> + Send,
    ) -> Result<Guard, LockError> {
        let lock_infos = LockInfos(
            locks
                .into_iter()
                .map(
                    |LockTargetId {
                         glob: ptr,
                         lock_type: ty,
                     }| {
                        LockInfo {
                            handle_id: handle_id.clone(),
                            ptr,
                            ty,
                        }
                    },
                )
                .collect(),
        );
        self._lock(lock_infos).await
    }
    /// Send the request to the background task and await its decision.
    ///
    /// A `CancelGuard` notifies the locker if this future is dropped before a
    /// response arrives, so the pending request can be cleaned up.
    async fn _lock(&self, lock_info: LockInfos) -> Result<Guard, LockError> {
        let (send, recv) = oneshot::channel();
        let (cancel_send, cancel_recv) = oneshot::channel();
        let mut cancel_guard = CancelGuard {
            lock_info: Some(lock_info.clone()),
            channel: Some(cancel_send),
            recv,
        };
        self.sender
            .send(Request {
                lock_info,
                cancel: Some(cancel_recv),
                completion: send,
            })
            .unwrap();
        let res = (&mut cancel_guard.recv).await.unwrap();
        // Response received: disarm the cancellation path.
        cancel_guard.channel.take();
        res
    }
}
/// Guards a pending lock request: if dropped before a completion was
/// observed, its `Drop` impl sends the request's `LockInfos` back to the
/// locker so the queued request can be canceled.
#[derive(Debug)]
struct CancelGuard {
    // Taken (set to None) once either the cancellation fired or a response
    // arrived; both `Some` means the request is still outstanding.
    lock_info: Option<LockInfos>,
    channel: Option<oneshot::Sender<LockInfos>>,
    recv: oneshot::Receiver<Result<Guard, LockError>>,
}
impl Drop for CancelGuard {
    /// If the request is still outstanding when the guard is dropped, close
    /// the completion channel and notify the locker of the cancellation.
    fn drop(&mut self) {
        let info = self.lock_info.take();
        let cancel = self.channel.take();
        if let (Some(info), Some(cancel)) = (info, cancel) {
            // Refuse any late completion...
            self.recv.close();
            // ...and tell the locker which request was abandoned. The locker
            // may already be gone; that is fine.
            let _ = cancel.send(info);
        }
    }
}
/// A batch of individual lock requests that are requested (and granted)
/// together as one unit.
#[derive(Debug, Default, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct LockInfos(pub Vec<LockInfo>);
impl LockInfos {
    /// True iff any lock in `self` conflicts with any lock in `other`.
    fn conflicts_with(&self, other: &LockInfos) -> bool {
        self.0
            .iter()
            .any(|mine| other.0.iter().any(|theirs| mine.conflicts_with(theirs)))
    }
    /// Borrow the underlying list of individual lock requests.
    fn as_vec(&self) -> &Vec<LockInfo> {
        &self.0
    }
}
impl std::fmt::Display for LockInfos {
    /// Each contained lock followed by a comma (including after the last,
    /// matching the original output).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.0
            .iter()
            .try_for_each(|lock_info| write!(f, "{},", lock_info))
    }
}
/// A single lock request: which session wants it, where in the JSON tree
/// (`ptr`), and the access type.
#[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord)]
struct LockInfo {
    handle_id: HandleId,
    ptr: JsonGlob,
    ty: LockType,
}
impl LockInfo {
    /// Whether two lock requests from *different* sessions are mutually
    /// exclusive. Requests from the same session never conflict.
    fn conflicts_with(&self, other: &LockInfo) -> bool {
        if self.handle_id == other.handle_id {
            return false;
        }
        match (self.ty, other.ty) {
            // Exist and Read are shared amongst each other.
            (LockType::Exist, LockType::Exist)
            | (LockType::Exist, LockType::Read)
            | (LockType::Read, LockType::Exist)
            | (LockType::Read, LockType::Read) => false,
            // Exist only fights a Write held on itself or an ancestor.
            (LockType::Exist, LockType::Write) => self.ptr.starts_with(&other.ptr),
            (LockType::Write, LockType::Exist) => other.ptr.starts_with(&self.ptr),
            // Read/Write and Write/Write conflict along the whole traversal,
            // in either direction.
            (LockType::Read, LockType::Write)
            | (LockType::Write, LockType::Read)
            | (LockType::Write, LockType::Write) => {
                self.ptr.starts_with(&other.ptr) || other.ptr.starts_with(&self.ptr)
            }
        }
    }
    /// Whether holding `self` already implies the access `other` asks for,
    /// for the *same* session.
    #[cfg(any(feature = "unstable", test))]
    fn implicitly_grants(&self, other: &LockInfo) -> bool {
        if self.handle_id != other.handle_id {
            return false;
        }
        match self.ty {
            LockType::Exist => {
                other.ty == LockType::Exist && self.ptr.starts_with(&other.ptr)
            }
            LockType::Read => {
                // E's in the ancestry
                other.ty == LockType::Exist && self.ptr.starts_with(&other.ptr)
                    // nonexclusive locks in the subtree
                    || other.ty != LockType::Write && other.ptr.starts_with(&self.ptr)
            }
            LockType::Write => {
                // E's in the ancestry
                other.ty == LockType::Exist && self.ptr.starts_with(&other.ptr)
                    // anything in the subtree
                    || other.ptr.starts_with(&self.ptr)
            }
        }
    }
}
impl From<LockInfo> for LockInfos {
fn from(lock_info: LockInfo) -> Self {
LockInfos(vec![lock_info])
}
}
impl std::fmt::Display for LockInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}{}{}", self.handle_id.id, self.ty, self.ptr)
}
}
/// Access level of a lock on a node of the JSON tree. Ordered weakest to
/// strongest (the derived `Ord` relies on the variant order).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LockType {
    /// Existence-only lock (weakest): conflicts only with a Write on itself
    /// or an ancestor.
    Exist,
    /// Shared read access.
    Read,
    /// Exclusive write access.
    Write,
}
impl Default for LockType {
fn default() -> Self {
LockType::Exist
}
}
impl std::fmt::Display for LockType {
    /// One-letter codes used throughout the locker's log output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            LockType::Exist => "E",
            LockType::Read => "R",
            LockType::Write => "W",
        })
    }
}
/// A set of lock batches; used for human-readable error reporting (see
/// `LockError::DeadlockDetected`).
#[derive(Debug, Clone)]
pub struct LockSet(OrdSet<LockInfos>);
impl std::fmt::Display for LockSet {
    /// Renders one line per session: `<id>: { <ty><ptr>, ... }`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Group every (ptr, type) pair under its owning session.
        let by_session = self
            .0
            .iter()
            .flat_map(|x| x.as_vec())
            .map(|i| (&i.handle_id, ordset![(&i.ptr, &i.ty)]))
            .fold(
                ordmap! {},
                |m: OrdMap<&HandleId, OrdSet<(&JsonGlob, &LockType)>>, (id, s)| {
                    m.update_with(&id, s, OrdSet::union)
                },
            );
        let num_sessions = by_session.len();
        for (i, (session, set)) in by_session.into_iter().enumerate() {
            write!(f, "{}: {{ ", session.id)?;
            let num_entries = set.len();
            for (j, (ptr, ty)) in set.into_iter().enumerate() {
                write!(f, "{}{}", ty, ptr)?;
                // Close the brace after the last entry, otherwise separate.
                if j == num_entries - 1 {
                    write!(f, " }}")?;
                } else {
                    write!(f, ", ")?;
                }
            }
            // Newline between sessions but not after the final one.
            if i != num_sessions - 1 {
                write!(f, "\n")?;
            }
        }
        Ok(())
    }
}
/// Reasons the locker refuses (or revokes) a lock request. The `#[error]`
/// strings are the user-visible messages and are part of the interface.
#[derive(Debug, Clone, thiserror::Error)]
pub enum LockError {
    /// A session tried to lock an ancestor of a pointer it already holds.
    #[error("Lock Taxonomy Escalation: Session = {session:?}, First = {first}, Second = {second}")]
    LockTaxonomyEscalation {
        session: HandleId,
        first: JsonGlob,
        second: JsonGlob,
    },
    /// A session re-locked the same pointer with a stronger type.
    #[error("Lock Type Escalation: Session = {session:?}, Pointer = {ptr}, First = {first}, Second = {second}")]
    LockTypeEscalation {
        session: HandleId,
        ptr: JsonGlob,
        first: LockType,
        second: LockType,
    },
    /// A session holding a Read requested a Write on a descendant.
    #[error("Lock Type Escalation Implicit: Session = {session:?}, First = {first_ptr}:{first_type}, Second = {second_ptr}:{second_type}")]
    LockTypeEscalationImplicit {
        session: HandleId,
        first_ptr: JsonGlob,
        first_type: LockType,
        second_ptr: JsonGlob,
        second_type: LockType,
    },
    /// Locks were not requested in lexicographic pointer order.
    #[error(
        "Non-Canonical Lock Ordering: Session = {session:?}, First = {first}, Second = {second}"
    )]
    NonCanonicalOrdering {
        session: HandleId,
        first: JsonGlob,
        second: JsonGlob,
    },
    /// A cycle was found in the wait graph; the listed locks are involved.
    #[error("Deadlock Detected:\nLocks Held =\n{locks_held},\nLocks Waiting =\n{locks_waiting}")]
    DeadlockDetected {
        locks_held: LockSet,
        locks_waiting: LockSet,
    },
}
/// An in-flight lock request as seen by the locker task.
#[derive(Debug)]
struct Request {
    // The locks being asked for.
    lock_info: LockInfos,
    // Fires if the requester gives up while the request is still queued;
    // taken (None) once the locker starts watching it.
    cancel: Option<oneshot::Receiver<LockInfos>>,
    // Channel on which the requester awaits the grant or rejection.
    completion: oneshot::Sender<Result<Guard, LockError>>,
}
impl Request {
    /// Grant the request: hand the caller a `Guard` over the requested locks
    /// and return the receiver on which the locker is notified when that
    /// guard is dropped.
    fn complete(self) -> oneshot::Receiver<LockInfos> {
        let (sender, receiver) = oneshot::channel();
        let guard = Guard {
            lock_info: self.lock_info,
            sender: Some(sender),
        };
        // `.is_err()` replaces the original `if let Err(_) =` pattern
        // (clippy: redundant_pattern_matching); behavior is unchanged — a
        // closed channel just means the requester stopped waiting.
        if self.completion.send(Ok(guard)).is_err() {
            #[cfg(feature = "tracing")]
            warn!("Completion sent to closed channel.")
        }
        receiver
    }
    /// Deny the request with `err`.
    fn reject(self, err: LockError) {
        if self.completion.send(Err(err)).is_err() {
            #[cfg(feature = "tracing")]
            warn!("Rejection sent to closed channel.")
        }
    }
}
/// Proof of a granted set of locks; they are returned to the locker
/// automatically when the guard is dropped (see the `Drop` impl).
#[derive(Debug)]
pub struct Guard {
    lock_info: LockInfos,
    // Always `Some` until drop, where it is taken to notify the locker.
    sender: Option<oneshot::Sender<LockInfos>>,
}
impl Drop for Guard {
    /// Return the held locks to the locker. The sender is always present
    /// until this point, so `unwrap` cannot fail in correct usage.
    fn drop(&mut self) {
        let sender = self.sender.take().unwrap();
        let info = std::mem::take(&mut self.lock_info);
        if let Err(_e) = sender.send(info) {
            #[cfg(feature = "tracing")]
            warn!("Failed to release lock: {:?}", _e)
        }
    }
}
#[test]
fn conflicts_with_locker_infos_cases() {
    // Build a Write lock on `ptr` owned by session `id`. This replaces five
    // verbatim copies of the same struct-literal boilerplate; session ids
    // 1..=5 match the original incrementing counter.
    fn write_lock(id: u64, ptr: &str) -> LockInfo {
        LockInfo {
            handle_id: HandleId {
                id,
                #[cfg(feature = "trace")]
                trace: None,
            },
            ty: LockType::Write,
            ptr: ptr.parse().unwrap(),
        }
    }
    let lock_info_a = write_lock(1, "/a");
    let lock_infos_a = LockInfos(vec![lock_info_a.clone()]);
    let lock_info_b = write_lock(2, "/b");
    let lock_infos_b = LockInfos(vec![lock_info_b.clone()]);
    let lock_info_a_s = write_lock(3, "/a/*");
    let lock_infos_a_s = LockInfos(vec![lock_info_a_s.clone()]);
    let lock_info_a_s_c = write_lock(4, "/a/*/c");
    let lock_infos_a_s_c = LockInfos(vec![lock_info_a_s_c.clone()]);
    let lock_info_a_b_c = write_lock(5, "/a/b/c");
    let lock_infos_a_b_c = LockInfos(vec![lock_info_a_b_c.clone()]);
    let lock_infos_set = LockInfos(vec![lock_info_a.clone()]);
    let lock_infos_set_b = LockInfos(vec![lock_info_b]);
    let lock_infos_set_deep = LockInfos(vec![
        lock_info_a_s.clone(),
        lock_info_a_s_c.clone(),
        lock_info_a_b_c.clone(),
    ]);
    let lock_infos_set_all = LockInfos(vec![
        lock_info_a,
        lock_info_a_s,
        lock_info_a_s_c,
        lock_info_a_b_c,
    ]);
    assert!(!lock_infos_b.conflicts_with(&lock_infos_a));
    assert!(!lock_infos_a.conflicts_with(&lock_infos_a)); // same lock won't
    assert!(lock_infos_a_s.conflicts_with(&lock_infos_a)); // Since the parent is locked, it won't be able to
    assert!(lock_infos_a_s.conflicts_with(&lock_infos_a_s_c));
    assert!(lock_infos_a_s_c.conflicts_with(&lock_infos_a_b_c));
    assert!(!lock_infos_set.conflicts_with(&lock_infos_a)); // Same lock again
    assert!(lock_infos_set.conflicts_with(&lock_infos_set_deep)); // Since this is a parent
    assert!(!lock_infos_set_b.conflicts_with(&lock_infos_set_deep)); // Sets are exclusive
    assert!(!lock_infos_set.conflicts_with(&lock_infos_set_b)); // Sets are exclusive
    assert!(lock_infos_set_deep.conflicts_with(&lock_infos_set)); // Shared parent a
    assert!(lock_infos_set_deep.conflicts_with(&lock_infos_set_all)); // Shared parent a
}

View File

@@ -1,28 +0,0 @@
/// A strictly positive count (never zero); used as a lease refcount where
/// "zero" is represented by the absence of the entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) struct Natural(usize);
impl Natural {
    /// The smallest natural: 1.
    pub fn one() -> Self {
        Natural(1)
    }
    /// `None` when `n` is zero, since zero is not representable.
    pub fn of(n: usize) -> Option<Self> {
        match n {
            0 => None,
            nonzero => Some(Natural(nonzero)),
        }
    }
    /// Increment in place.
    pub fn inc(&mut self) {
        self.0 += 1;
    }
    /// Decrement, consuming `self`; `None` once the count reaches zero.
    pub fn dec(mut self) -> Option<Natural> {
        self.0 -= 1;
        match self.0 {
            0 => None,
            _ => Some(self),
        }
    }
    /// Unwrap to the underlying count.
    pub fn into_usize(self) -> usize {
        self.0
    }
}

View File

@@ -1,157 +0,0 @@
use imbl::OrdMap;
#[cfg(feature = "tracing")]
use tracing::warn;
use super::LockInfo;
use crate::LockType;
use crate::{handle::HandleId, model_paths::JsonGlob};
#[cfg(any(feature = "unstable", test))]
use super::LockError;
/// Tracks, per session, how many times each (pointer, type) lock is held,
/// in order to validate that locks are taken in the canonical order that
/// makes deadlock impossible.
#[derive(Debug, PartialEq, Eq)]
pub(super) struct LockOrderEnforcer {
    locks_held: OrdMap<HandleId, OrdMap<(JsonGlob, LockType), usize>>,
}
impl LockOrderEnforcer {
    /// An enforcer with no locks on record.
    #[cfg(any(feature = "unstable", test))]
    pub fn new() -> Self {
        LockOrderEnforcer {
            locks_held: imbl::ordmap! {},
        }
    }
    #[cfg_attr(feature = "trace", tracing::instrument)]
    #[cfg(any(feature = "unstable", test))]
    // locks must be acquired in lexicographic order for the pointer, and reverse order for type
    fn validate(&self, req: &LockInfo) -> Result<(), LockError> {
        // the following notation is used to denote an example sequence that can cause deadlocks
        //
        // Individual Lock Requests
        // 1W/A/B
        // |||> Node whose lock is being acquired: /A/B (strings prefixed by slashes, indicating descent path)
        // ||> Type of Lock: W (E/R/W)
        // |> Session Number: 1 (any natural number)
        //
        // Sequences
        // LockRequest >> LockRequest
        match self.locks_held.get(&req.handle_id) {
            // Session holds nothing yet: any request is in order.
            None => Ok(()),
            Some(m) => {
                // quick accept
                // If a lock already held implies the requested one, granting
                // it cannot violate the ordering.
                for (ptr, ty) in m.keys() {
                    let tmp = LockInfo {
                        ptr: ptr.clone(),
                        ty: *ty,
                        handle_id: req.handle_id.clone(),
                    };
                    if tmp.implicitly_grants(req) {
                        return Ok(());
                    }
                }
                // Compare the request against every held (ptr, ty); the first
                // violation found is reported.
                let err = m.keys().find_map(|(ptr, ty)| match ptr.cmp(&req.ptr) {
                    std::cmp::Ordering::Less => {
                        if req.ptr.starts_with(ptr)
                            && req.ty == LockType::Write
                            && *ty == LockType::Read
                        {
                            // 1R/A >> 2R/A >> 1W/A/A >> 2W/A/B
                            Some(LockError::LockTypeEscalationImplicit {
                                session: req.handle_id.clone(),
                                first_ptr: ptr.clone(),
                                first_type: *ty,
                                second_ptr: req.ptr.clone(),
                                second_type: req.ty,
                            })
                        } else {
                            None
                        }
                    }
                    std::cmp::Ordering::Equal => {
                        if req.ty > *ty {
                            // 1R/A >> 2R/A >> 1W/A >> 1W/A
                            Some(LockError::LockTypeEscalation {
                                session: req.handle_id.clone(),
                                ptr: ptr.clone(),
                                first: *ty,
                                second: req.ty,
                            })
                        } else {
                            None
                        }
                    }
                    std::cmp::Ordering::Greater => Some(if ptr.starts_with(&req.ptr) {
                        // 1W/A/A >> 2W/A/B >> 1R/A >> 2R/A
                        LockError::LockTaxonomyEscalation {
                            session: req.handle_id.clone(),
                            first: ptr.clone(),
                            second: req.ptr.clone(),
                        }
                    } else {
                        // 1W/A >> 2W/B >> 1W/B >> 2W/A
                        LockError::NonCanonicalOrdering {
                            session: req.handle_id.clone(),
                            first: ptr.clone(),
                            second: req.ptr.clone(),
                        }
                    }),
                });
                err.map_or(Ok(()), Err)
            }
        }
    }
    /// Validate all of `reqs`, then record them (as refcounts) for the
    /// owning session. Nothing is recorded when any request is invalid.
    #[cfg(any(feature = "unstable", test))]
    pub(super) fn try_insert(&mut self, reqs: &super::LockInfos) -> Result<(), LockError> {
        // These are seperate since we want to check all first before we insert
        for req in reqs.as_vec() {
            self.validate(req)?;
        }
        for req in reqs.as_vec() {
            match self.locks_held.get_mut(&req.handle_id) {
                None => {
                    self.locks_held.insert(
                        req.handle_id.clone(),
                        imbl::ordmap![(req.ptr.clone(), req.ty) => 1],
                    );
                }
                Some(locks) => {
                    let k = (req.ptr.clone(), req.ty);
                    match locks.get_mut(&k) {
                        None => {
                            locks.insert(k, 1);
                        }
                        Some(n) => {
                            *n += 1;
                        }
                    }
                }
            }
        }
        Ok(())
    }
    /// Decrement the refcount for one held lock; the session's whole record
    /// is dropped when its last lock is released.
    #[cfg(any(feature = "unstable", test))]
    pub(super) fn remove(&mut self, req: &LockInfo) {
        match self.locks_held.remove_with_key(&req.handle_id) {
            None => {
                #[cfg(feature = "tracing")]
                warn!("Invalid removal from session manager: {:?}", req);
            }
            Some((hdl, mut locks)) => {
                let k = (req.ptr.clone(), req.ty);
                match locks.remove_with_key(&k) {
                    None => {
                        #[cfg(feature = "tracing")]
                        warn!("Invalid removal from session manager: {:?}", req);
                    }
                    Some((k, n)) => {
                        // Reinsert with a decremented refcount unless this
                        // was the last reference.
                        if n - 1 > 0 {
                            locks.insert(k, n - 1);
                        }
                    }
                }
                if !locks.is_empty() {
                    self.locks_held.insert(hdl, locks);
                }
            }
        }
    }
}

View File

@@ -1,419 +0,0 @@
#[cfg(test)]
mod tests {
    // Property-based and example tests for the locker: wait-graph path
    // search, deadlock scanning, the lock trie, and the order enforcer.
    use std::collections::VecDeque;
    use imbl::{ordmap, ordset, OrdMap, OrdSet};
    use json_ptr::JsonPointer;
    use proptest::prelude::*;
    use proptest::strategy::ValueTree;
    use proptest::test_runner::{Config, TestRunner};
    use tokio::sync::oneshot;
    use crate::handle::HandleId;
    use crate::locker::{CancelGuard, LockError, LockInfo, LockInfos, LockType, Request};
    use crate::Locker;
    use crate::{
        locker::bookkeeper::{deadlock_scan, path_to},
        JsonGlob,
    };
    // enum Action {
    //     Acquire {
    //         lock_type: LockType,
    //         ptr: JsonPointer,
    //     },
    //     Release(JsonPointer),
    // }
    // struct Session {
    //     // session id
    //     id: HandleId,
    //     // list of actions and whether or not they have been completed (await returns before test freezes state)
    //     actions: Vec<(Action, bool)>,
    //     // lookup table for (json pointers, action indices) -> release action
    //     guard: HashMap<(JsonPointer, usize), Guard>,
    // }
    // type Traversal = Vec<usize>;
    // randomly select the type of lock we are requesting
    fn arb_lock_type() -> BoxedStrategy<LockType> {
        prop_oneof![
            Just(LockType::Exist),
            Just(LockType::Read),
            Just(LockType::Write),
        ]
        .boxed()
    }
    // Strategy: a session handle with id drawn from 0..n.
    prop_compose! {
        fn arb_handle_id(n: u64)(x in 0..n) -> HandleId {
            HandleId {
                id: x,
                #[cfg(feature = "trace")]
                trace: None,
            }
        }
    }
    // Strategy: a JSON pointer of up to max_size segments over {"a", "b"}.
    fn arb_json_ptr(max_size: usize) -> BoxedStrategy<JsonPointer> {
        (1..max_size)
            .prop_flat_map(|n| {
                let s = proptest::bool::ANY.prop_map(|b| if b { "b" } else { "a" });
                proptest::collection::vec_deque(s, n).prop_flat_map(|v| {
                    let mut ptr = JsonPointer::default();
                    for seg in v {
                        ptr.push_end(seg);
                    }
                    Just(ptr)
                })
            })
            .boxed()
    }
    // Strategy: a glob path built from lowercase letters and `*` wildcards.
    fn arb_model_paths(max_size: usize) -> impl Strategy<Value = JsonGlob> {
        proptest::collection::vec("[a-z*]", 1..max_size).prop_map(|a_s| {
            a_s.into_iter()
                .fold(String::new(), |mut s, x| {
                    s.push_str(&x);
                    s
                })
                .parse::<JsonGlob>()
                .unwrap()
        })
    }
    // Strategy: an arbitrary single lock request.
    fn arb_lock_info(session_bound: u64, ptr_max_size: usize) -> BoxedStrategy<LockInfo> {
        arb_handle_id(session_bound)
            .prop_flat_map(move |handle_id| {
                arb_model_paths(ptr_max_size).prop_flat_map(move |ptr| {
                    let handle_id = handle_id.clone();
                    arb_lock_type().prop_map(move |ty| LockInfo {
                        handle_id: handle_id.clone(),
                        ty,
                        ptr: ptr.clone(),
                    })
                })
            })
            .boxed()
    }
    // Strategy: a batch of lock requests, all rewritten to one session so the
    // batch is internally consistent.
    fn arb_lock_infos(
        session_bound: u64,
        ptr_max_size: usize,
        max_size: usize,
    ) -> BoxedStrategy<LockInfos> {
        arb_handle_id(session_bound)
            .prop_flat_map(move |handle_id| {
                proptest::collection::vec(arb_lock_info(session_bound, ptr_max_size), 1..max_size)
                    .prop_map(move |xs| {
                        xs.into_iter()
                            .map(|mut x| {
                                x.handle_id = handle_id.clone();
                                x
                            })
                            .collect::<Vec<_>>()
                    })
            })
            .prop_map(LockInfos)
            .boxed()
    }
    // Strategy: a wired-up Request plus its CancelGuard counterpart.
    prop_compose! {
        fn arb_request(session_bound: u64, ptr_max_size: usize)(lis in arb_lock_infos(session_bound, ptr_max_size, 10)) -> (Request, CancelGuard) {
            let (cancel_send, cancel_recv) = oneshot::channel();
            let (guard_send, guard_recv) = oneshot::channel();
            let r = Request {
                lock_info: lis.clone(),
                cancel: Some(cancel_recv),
                completion: guard_send,
            };
            let c = CancelGuard {
                lock_info: Some(lis),
                channel: Some(cancel_send),
                recv: guard_recv,
            };
            (r, c)
        }
    }
    proptest! {
        // A single edge a -> b yields the two-node path {b, a}.
        #[test]
        fn path_to_base_case(a in arb_handle_id(20), b in arb_handle_id(20)) {
            let b_set = ordset![b.clone()];
            let root = &b;
            let node = &a;
            let graph = ordmap!{&a => &b_set};
            prop_assert_eq!(path_to(&graph, ordset![], root, node), ordset![root, node]);
        }
    }
    proptest! {
        // Reachability is transitive: x0 ~> x1 and x1 ~> x2 imply x0 ~> x2.
        #[test]
        fn path_to_transitive_existence(v in proptest::collection::vec((arb_handle_id(5), arb_handle_id(5)).prop_filter("Self Dependency", |(a, b)| a != b), 1..20), x0 in arb_handle_id(5), x1 in arb_handle_id(5), x2 in arb_handle_id(5)) {
            let graph_owned = v.into_iter().fold(ordmap!{}, |m, (a, b)| m.update_with(a, ordset![b], OrdSet::union));
            let graph: OrdMap<&HandleId, &OrdSet<HandleId>> = graph_owned.iter().map(|(k, v)| (k, v)).collect();
            let avg_set_size = graph.values().fold(0, |a, b| a + b.len()) / graph.len();
            prop_assume!(avg_set_size >= 2);
            let k0 = path_to(&graph, ordset![], &x0, &x1);
            let k1 = path_to(&graph, ordset![], &x1, &x2);
            prop_assume!(!k0.is_empty());
            prop_assume!(!k1.is_empty());
            prop_assert!(!path_to(&graph, ordset![], &x0, &x2).is_empty());
        }
    }
    proptest! {
        // A non-empty path always contains both of its endpoints.
        #[test]
        fn path_to_bounds_inclusion(v in proptest::collection::vec((arb_handle_id(5), arb_handle_id(5)).prop_filter("Self Dependency", |(a, b)| a != b), 1..20), x0 in arb_handle_id(5), x1 in arb_handle_id(5)) {
            let graph_owned = v.into_iter().fold(ordmap!{}, |m, (a, b)| m.update_with(a, ordset![b], OrdSet::union));
            let graph: OrdMap<&HandleId, &OrdSet<HandleId>> = graph_owned.iter().map(|(k, v)| (k, v)).collect();
            let avg_set_size = graph.values().fold(0, |a, b| a + b.len()) / graph.len();
            prop_assume!(avg_set_size >= 2);
            let k0 = path_to(&graph, ordset![], &x0, &x1);
            prop_assume!(!k0.is_empty());
            prop_assert!(k0.contains(&x0));
            prop_assert!(k0.contains(&x1));
        }
    }
    // A ring of sessions, each waiting on the next, must be detected.
    #[test]
    fn deadlock_scan_base_case() {
        let mut harness = TestRunner::new(Config::default());
        let _ = harness.run(&proptest::bool::ANY, |_| {
            let mut runner = TestRunner::new(Config::default());
            let n = (2..10u64).new_tree(&mut runner).unwrap().current();
            println!("Begin");
            let mut c = VecDeque::default();
            let mut queue = VecDeque::default();
            for i in 0..n {
                let mut req = arb_request(1, 5).new_tree(&mut runner).unwrap().current();
                match req.0.lock_info {
                    LockInfos(ref mut li) => {
                        li[0].handle_id.id = i;
                    }
                    _ => unreachable!(),
                }
                // Each session waits on the next; the last wraps to 0.
                let dep = if i == n - 1 { 0 } else { i + 1 };
                queue.push_back((
                    req.0,
                    ordset![HandleId {
                        id: dep,
                        #[cfg(feature = "trace")]
                        trace: None
                    }],
                ));
                c.push_back(req.1);
            }
            for i in &queue {
                for info in i.0.lock_info.as_vec() {
                    println!("{} => {:?}", info.handle_id.id, i.1)
                }
            }
            let set = deadlock_scan(&queue);
            println!("{:?}", set);
            assert!(!set.is_empty());
            Ok(())
        });
    }
    // Randomly grow the wait graph; once a cycle appears, every cycle member
    // must itself wait on another cycle member.
    #[test]
    fn deadlock_scan_inductive() {
        let mut harness = TestRunner::new(Config::default());
        let _ = harness.run(&proptest::bool::ANY, |_| {
            let mut runner = TestRunner::new(Config::default());
            let mut cancels = VecDeque::default();
            let mut queue = VecDeque::default();
            let (r, c) = arb_request(5, 5).new_tree(&mut runner).unwrap().current();
            queue.push_back((r, ordset![]));
            cancels.push_back(c);
            loop {
                if proptest::bool::ANY.new_tree(&mut runner).unwrap().current() {
                    // add new edge
                    let h = arb_handle_id(5).new_tree(&mut runner).unwrap().current();
                    let i = (0..queue.len()).new_tree(&mut runner).unwrap().current();
                    if let Some((r, s)) = queue.get_mut(i) {
                        if r.lock_info.as_vec().iter().all(|x| x.handle_id != h) {
                            s.insert(h);
                        } else {
                            continue;
                        }
                    }
                } else {
                    // add new node
                    let (r, c) = arb_request(5, 5).new_tree(&mut runner).unwrap().current();
                    let request_infos = r.lock_info.as_vec();
                    // but only if the session hasn't yet been used
                    if queue.iter().all(|(qr, _)| {
                        for qr_info in qr.lock_info.as_vec() {
                            for request_info in request_infos.iter() {
                                if qr_info.handle_id == request_info.handle_id {
                                    return false;
                                }
                            }
                        }
                        true
                    }) {
                        queue.push_back((r, ordset![]));
                        cancels.push_back(c);
                    }
                }
                let cycle = deadlock_scan(&queue)
                    .into_iter()
                    .flat_map(|x| x.lock_info.as_vec())
                    .map(|r| &r.handle_id)
                    .collect::<OrdSet<&HandleId>>();
                if !cycle.is_empty() {
                    println!("Cycle: {:?}", cycle);
                    for (r, s) in &queue {
                        for info in r.lock_info.as_vec() {
                            if cycle.contains(&info.handle_id) {
                                assert!(s.iter().any(|h| cycle.contains(h)))
                            }
                        }
                    }
                    break;
                }
            }
            Ok(())
        });
    }
    // Two readers upgrading to writers on the same path must be resolved by
    // killing one with DeadlockDetected rather than hanging forever.
    #[tokio::test]
    async fn deadlock_kill_live() {
        let locker = Locker::new();
        let s0 = HandleId {
            id: 0,
            #[cfg(feature = "trace")]
            trace: None,
        };
        let s1 = HandleId {
            id: 1,
            #[cfg(feature = "trace")]
            trace: None,
        };
        let x = locker
            .lock(s0.clone(), "/a/b".parse().unwrap(), LockType::Read)
            .await;
        assert!(x.is_ok());
        let y = locker
            .lock(s1.clone(), "/a/b".parse().unwrap(), LockType::Read)
            .await;
        assert!(y.is_ok());
        let x = tokio::select! {
            r0 = locker.lock(s0, "/a/b".parse().unwrap(), LockType::Write) => r0,
            r1 = locker.lock(s1, "/a/b".parse().unwrap(), LockType::Write) => r1,
        };
        match x {
            Ok(g) => {
                println!("wat");
                drop(g);
                assert!(false);
            }
            Err(e) => match e {
                LockError::DeadlockDetected { .. } => {
                    println!("{}", e);
                }
                _ => {
                    println!("{}", e);
                    #[cfg(not(feature = "unstable"))]
                    assert!(false);
                }
            },
        }
    }
    proptest! {
        // Locking then unlocking (in any order) restores an empty trie.
        #[test]
        fn trie_lock_inverse_identity(lock_order in proptest::collection::vec(arb_lock_infos(1, 5, 10), 1..30)) {
            use crate::locker::trie::LockTrie;
            use rand::seq::SliceRandom;
            let mut trie = LockTrie::default();
            for i in &lock_order {
                trie.try_lock(i).expect(&format!("try_lock failed: {}", i));
            }
            let mut release_order = lock_order.clone();
            let slice: &mut [LockInfos] = &mut release_order[..];
            slice.shuffle(&mut rand::thread_rng());
            for is in &release_order {
                for i in is.as_vec() {
                    trie.unlock(i);
                }
            }
            prop_assert_eq!(trie, LockTrie::default())
        }
    }
    proptest! {
        // Insert then remove (in any order) restores an empty enforcer.
        #[test]
        fn enforcer_lock_inverse_identity(lock_order in proptest::collection::vec(arb_lock_infos(1,3,10), 1..30)) {
            use crate::locker::order_enforcer::LockOrderEnforcer;
            use rand::seq::SliceRandom;
            let mut enforcer = LockOrderEnforcer::new();
            for i in &lock_order {
                enforcer.try_insert(i);
            }
            let mut release_order = lock_order.clone();
            let slice: &mut [LockInfos] = &mut release_order[..];
            slice.shuffle(&mut rand::thread_rng());
            prop_assert!(enforcer != LockOrderEnforcer::new());
            for is in &release_order {
                for i in is.as_vec() {
                    enforcer.remove(i);
                }
            }
            prop_assert_eq!(enforcer, LockOrderEnforcer::new());
        }
    }
    proptest! {
        // An Exist lock on an ancestor must not block a Write on a descendant.
        #[test]
        fn existence_ancestors_dont_block_descendent_writes(s0 in arb_handle_id(10), s1 in arb_handle_id(10), mut ptr0 in arb_json_ptr(3), ptr1 in arb_json_ptr(3)) {
            use crate::locker::trie::LockTrie;
            prop_assume!(s0 != s1);
            let mut trie = LockTrie::default();
            let li0 = LockInfo {
                handle_id: s0,
                ty: LockType::Exist,
                ptr: ptr0.clone().into()
            };
            println!("{}", ptr0);
            ptr0.append(&ptr1);
            println!("{}", ptr0);
            let li1 = LockInfo {
                handle_id: s1,
                ty: LockType::Write,
                ptr: ptr0.clone().into()
            };
            trie.try_lock(&LockInfos(vec![li0])).unwrap();
            println!("{:?}", trie);
            trie.try_lock(&LockInfos(vec![li1])).expect("E locks don't prevent child locks");
        }
    }
    // NOTE(review): the remaining properties are unimplemented placeholders.
    proptest! {
        #[test]
        fn zero_or_one_write_lock_per_traversal(x in 0..10) {
            // if there is a write lock in the traversal, then the cardinality of the set of all lock holders on that traversal must be exactly 1
            let x = 1..100i32;
            assert!(true)
        }
    }
    proptest! {
        #[test]
        fn existence_locks_must_not_have_write_ancestors(x in 0..10) {
            // existence locks cannot be granted to nodes that have write locks on lease to any ancestor
        }
    }
    proptest! {
        #[test]
        fn single_session_is_unrestricted(x in 0..10) {
            // if there is only one active session, all lock requests will be granted
        }
    }
    proptest! {
        #[test]
        fn read_locks_never_conflict(x in 0..10) {
            // if all that is requested is read locks, an unlimited number of sessions will be able to acquire all locks asked for
        }
    }
}
// any given database handle must go out of scope in finite time

View File

@@ -1,725 +0,0 @@
use std::collections::BTreeMap;
use imbl::{ordset, OrdSet};
use json_ptr::{JsonPointer, SegList};
use super::LockInfo;
use super::{natural::Natural, LockInfos};
use crate::{handle::HandleId, model_paths::JsonGlob};
use crate::{model_paths::JsonGlobSegment, LockType};
/// Lock state of a single trie node.
#[derive(Debug, Clone, PartialEq, Eq)]
enum LockState {
    /// No lease on this node.
    Free,
    /// Only non-exclusive (Exist/Read) leases, refcounted per session.
    Shared {
        e_lessees: BTreeMap<HandleId, Natural>,
        r_lessees: BTreeMap<HandleId, Natural>,
    },
    /// Exactly one session holds Write; its own E/R leases are folded into
    /// plain counters since the holder is known.
    Exclusive {
        w_lessee: HandleId,
        w_session_count: Natural, // should never be 0
        r_session_count: usize,
        e_session_count: usize,
    },
}
impl LockState {
    /// Drop every lease `session` holds here, normalizing to `Free` where
    /// nothing remains.
    fn erase(self, session: &HandleId) -> LockState {
        match self {
            LockState::Free => LockState::Free,
            LockState::Shared {
                mut e_lessees,
                mut r_lessees,
            } => {
                e_lessees.remove(session);
                r_lessees.remove(session);
                if e_lessees.is_empty() && r_lessees.is_empty() {
                    LockState::Free
                } else {
                    LockState::Shared {
                        e_lessees,
                        r_lessees,
                    }
                }
            }
            LockState::Exclusive { ref w_lessee, .. } => {
                if w_lessee == session {
                    LockState::Free
                } else {
                    self
                }
            }
        }
    }
    /// True when no Write lease is held.
    fn write_free(&self) -> bool {
        !matches!(self, LockState::Exclusive { .. })
    }
    /// True when no Read lease is held (by anyone, including a writer).
    fn read_free(&self) -> bool {
        match self {
            LockState::Exclusive {
                r_session_count, ..
            } => *r_session_count == 0,
            LockState::Shared { r_lessees, .. } => r_lessees.is_empty(),
            _ => true,
        }
    }
    /// All sessions holding any lease on this node.
    fn sessions(&self) -> OrdSet<&'_ HandleId> {
        match self {
            LockState::Free => OrdSet::new(),
            LockState::Shared {
                e_lessees,
                r_lessees,
            } => e_lessees.keys().chain(r_lessees.keys()).collect(),
            LockState::Exclusive { w_lessee, .. } => ordset![w_lessee],
        }
    }
    /// Sessions holding an Exist lease.
    #[allow(dead_code)]
    fn exist_sessions(&self) -> OrdSet<&'_ HandleId> {
        match self {
            LockState::Free => OrdSet::new(),
            LockState::Shared { e_lessees, .. } => e_lessees.keys().collect(),
            LockState::Exclusive {
                w_lessee,
                e_session_count,
                ..
            } => {
                if *e_session_count > 0 {
                    ordset![w_lessee]
                } else {
                    OrdSet::new()
                }
            }
        }
    }
    /// Sessions holding a Read lease.
    fn read_sessions(&self) -> OrdSet<&'_ HandleId> {
        match self {
            LockState::Free => OrdSet::new(),
            LockState::Shared { r_lessees, .. } => r_lessees.keys().collect(),
            LockState::Exclusive {
                w_lessee,
                r_session_count,
                ..
            } => {
                if *r_session_count > 0 {
                    ordset![w_lessee]
                } else {
                    OrdSet::new()
                }
            }
        }
    }
    /// The session holding the Write lease, if any.
    fn write_session(&self) -> Option<&'_ HandleId> {
        match self {
            LockState::Exclusive { w_lessee, .. } => Some(w_lessee),
            _ => None,
        }
    }
    /// Collapse an empty `Shared` state back to `Free`.
    fn normalize(&mut self) {
        match &*self {
            LockState::Shared {
                e_lessees,
                r_lessees,
            } if e_lessees.is_empty() && r_lessees.is_empty() => {
                *self = LockState::Free;
            }
            _ => {}
        }
    }
    // note this is not necessarily safe in the overall trie locking model
    // this function will return true if the state changed as a result of the call
    // if it returns false it technically means that the call was invalid and did not
    // change the lock state at all
    fn try_lock(&mut self, session: HandleId, typ: &LockType) -> bool {
        match (&mut *self, typ) {
            (LockState::Free, LockType::Exist) => {
                *self = LockState::Shared {
                    e_lessees: [(session, Natural::one())].into(),
                    r_lessees: BTreeMap::new(),
                };
                true
            }
            (LockState::Free, LockType::Read) => {
                *self = LockState::Shared {
                    e_lessees: BTreeMap::new(),
                    r_lessees: [(session, Natural::one())].into(),
                };
                true
            }
            (LockState::Free, LockType::Write) => {
                *self = LockState::Exclusive {
                    w_lessee: session,
                    w_session_count: Natural::one(),
                    r_session_count: 0,
                    e_session_count: 0,
                };
                true
            }
            // Shared states accept additional E/R leases from anyone.
            (LockState::Shared { e_lessees, .. }, LockType::Exist) => {
                match e_lessees.get_mut(&session) {
                    None => {
                        e_lessees.insert(session, Natural::one());
                    }
                    Some(v) => v.inc(),
                };
                true
            }
            (LockState::Shared { r_lessees, .. }, LockType::Read) => {
                match r_lessees.get_mut(&session) {
                    None => {
                        r_lessees.insert(session, Natural::one());
                    }
                    Some(v) => v.inc(),
                }
                true
            }
            // Shared -> Exclusive upgrade is only legal when every existing
            // lease belongs to the requesting session.
            (
                LockState::Shared {
                    e_lessees,
                    r_lessees,
                },
                LockType::Write,
            ) => {
                for hdl in e_lessees.keys() {
                    if hdl != &session {
                        return false;
                    }
                }
                for hdl in r_lessees.keys() {
                    if hdl != &session {
                        return false;
                    }
                }
                *self = LockState::Exclusive {
                    r_session_count: r_lessees.remove(&session).map_or(0, Natural::into_usize),
                    e_session_count: e_lessees.remove(&session).map_or(0, Natural::into_usize),
                    w_lessee: session,
                    w_session_count: Natural::one(),
                };
                true
            }
            // An exclusive node only accepts further leases from its writer.
            (
                LockState::Exclusive {
                    w_lessee,
                    e_session_count,
                    ..
                },
                LockType::Exist,
            ) => {
                if w_lessee != &session {
                    return false;
                }
                *e_session_count += 1;
                true
            }
            (
                LockState::Exclusive {
                    w_lessee,
                    r_session_count,
                    ..
                },
                LockType::Read,
            ) => {
                if w_lessee != &session {
                    return false;
                }
                *r_session_count += 1;
                true
            }
            (
                LockState::Exclusive {
                    w_lessee,
                    w_session_count,
                    ..
                },
                LockType::Write,
            ) => {
                if w_lessee != &session {
                    return false;
                }
                w_session_count.inc();
                true
            }
        }
    }
    // there are many ways for this function to be called in an invalid way: Notably releasing locks that you never
    // had to begin with.
    fn try_unlock(&mut self, session: &HandleId, typ: &LockType) -> bool {
        match (&mut *self, typ) {
            (LockState::Free, _) => false,
            (LockState::Shared { e_lessees, .. }, LockType::Exist) => {
                match e_lessees.remove_entry(session) {
                    None => false,
                    Some((k, v)) => {
                        match v.dec() {
                            None => {
                                // Last E lease for this session; the node may
                                // now be fully free.
                                self.normalize();
                            }
                            Some(n) => {
                                e_lessees.insert(k, n);
                            }
                        }
                        true
                    }
                }
            }
            (LockState::Shared { r_lessees, .. }, LockType::Read) => {
                match r_lessees.remove_entry(session) {
                    None => false,
                    Some((k, v)) => {
                        match v.dec() {
                            None => {
                                self.normalize();
                            }
                            Some(n) => {
                                r_lessees.insert(k, n);
                            }
                        }
                        true
                    }
                }
            }
            (LockState::Shared { .. }, LockType::Write) => false,
            (
                LockState::Exclusive {
                    w_lessee,
                    e_session_count,
                    ..
                },
                LockType::Exist,
            ) => {
                if w_lessee != session || *e_session_count == 0 {
                    return false;
                }
                *e_session_count -= 1;
                true
            }
            (
                LockState::Exclusive {
                    w_lessee,
                    r_session_count,
                    ..
                },
                LockType::Read,
            ) => {
                if w_lessee != session || *r_session_count == 0 {
                    return false;
                }
                *r_session_count -= 1;
                true
            }
            (
                LockState::Exclusive {
                    w_lessee,
                    w_session_count,
                    r_session_count,
                    e_session_count,
                },
                LockType::Write,
            ) => {
                if w_lessee != session {
                    return false;
                }
                match w_session_count.dec() {
                    None => {
                        // Last Write lease released: demote to Shared,
                        // carrying over the writer's remaining E/R leases.
                        let mut e_lessees = BTreeMap::new();
                        if let Some(n) = Natural::of(*e_session_count) {
                            e_lessees.insert(session.clone(), n);
                        }
                        let mut r_lessees = BTreeMap::new();
                        if let Some(n) = Natural::of(*r_session_count) {
                            r_lessees.insert(session.clone(), n);
                        }
                        *self = LockState::Shared {
                            e_lessees,
                            r_lessees,
                        };
                        self.normalize();
                    }
                    Some(n) => *w_session_count = n,
                }
                true
            }
        }
    }
}
impl Default for LockState {
fn default() -> Self {
LockState::Free
}
}
/// A trie keyed by JSON path segments; each node tracks the lock state held
/// at that exact point in the document tree, so locks can be scoped to
/// subtrees of the database.
#[derive(Debug, Default, PartialEq, Eq)]
pub(super) struct LockTrie {
    // lock state for the node at this exact path
    state: LockState,
    // child nodes, keyed by the next path segment
    children: BTreeMap<String, LockTrie>,
}
impl LockTrie {
/// `true` iff `f` holds for this node's state and, recursively, for the
/// state of every descendant.
#[allow(dead_code)]
fn all<F: Fn(&LockState) -> bool>(&self, f: F) -> bool {
    if !f(&self.state) {
        return false;
    }
    self.children.values().all(|child| child.all(&f))
}
/// `true` if `f` holds for this node's state or for any descendant's state.
#[allow(dead_code)]
fn any<F: Fn(&LockState) -> bool>(&self, f: F) -> bool {
    if f(&self.state) {
        return true;
    }
    self.children.values().any(|child| child.any(&f))
}
/// `true` if no session other than `session` holds any lock anywhere in this
/// subtree (removing `session` from every node's lessee set leaves nothing).
#[allow(dead_code)]
fn subtree_is_lock_free_for(&self, session: &HandleId) -> bool {
    self.all(|s| s.sessions().difference(ordset![session]).is_empty())
}
/// `true` if no session other than `session` holds a write (exclusive) lock
/// anywhere in this subtree; `erase` discards `session`'s own leases before
/// the check.
#[allow(dead_code)]
fn subtree_is_exclusive_free_for(&self, session: &HandleId) -> bool {
    self.all(|s| !matches!(s.clone().erase(session), LockState::Exclusive { .. }))
}
/// Collects the sessions holding write locks in this subtree.
///
/// When this node itself is `Exclusive`, only its lessee is reported and the
/// children are not descended into.
fn subtree_write_sessions(&self) -> OrdSet<&'_ HandleId> {
    match &self.state {
        LockState::Exclusive { w_lessee, .. } => ordset![w_lessee],
        _ => self
            .children
            .values()
            .map(|t| t.subtree_write_sessions())
            .fold(OrdSet::new(), OrdSet::union),
    }
}
fn subtree_sessions(&self) -> OrdSet<&'_ HandleId> {
let children = self
.children
.values()
.map(LockTrie::subtree_sessions)
.fold(OrdSet::new(), OrdSet::union);
self.state.sessions().union(children)
}
/// Flattens this subtree into a set of [`LockInfo`] records, one per held
/// lease, with each record's pointer expressed relative to this node.
pub fn subtree_lock_info(&self) -> OrdSet<LockInfo> {
    // descendants' locks, with the edge segment to each child folded into the
    // pointer via `append`
    let mut acc = self
        .children
        .iter()
        .map(|(s, t)| {
            t.subtree_lock_info()
                .into_iter()
                .map(|i| LockInfo {
                    ty: i.ty,
                    handle_id: i.handle_id,
                    ptr: {
                        i.ptr.append(s.parse().unwrap_or_else(|_| {
                            // a single map key should always parse as a path
                            #[cfg(feature = "tracing")]
                            tracing::error!(
                                "Should never not be able to parse a string as a path"
                            );
                            Default::default()
                        }))
                    },
                })
                .collect()
        })
        .fold(ordset![], OrdSet::union);
    // locks held at this node itself get the empty (relative) pointer
    let self_writes = self.state.write_session().map(|session| LockInfo {
        handle_id: session.clone(),
        ptr: Default::default(),
        ty: LockType::Write,
    });
    let self_reads = self
        .state
        .read_sessions()
        .into_iter()
        .map(|session| LockInfo {
            handle_id: session.clone(),
            ptr: Default::default(),
            ty: LockType::Read,
        });
    let self_exists = self
        .state
        .exist_sessions()
        .into_iter()
        .map(|session| LockInfo {
            handle_id: session.clone(),
            ptr: Default::default(),
            ty: LockType::Exist,
        });
    acc.extend(self_writes.into_iter().chain(self_reads).chain(self_exists));
    acc
}
/// Walks the trie along `ptr`, returning the ancestor `LockState`s from the
/// deepest reached node up to `self` (deepest first), plus the node at `ptr`
/// itself when the full path exists in the trie. The target node's own state
/// is not included in the ancestor vector when the target is found.
fn ancestors_and_trie_json_path<'a, S: AsRef<str>, V: SegList>(
    &'a self,
    ptr: &JsonPointer<S, V>,
) -> (Vec<&'a LockState>, Option<&'a LockTrie>) {
    match ptr.uncons() {
        // path exhausted: `self` is the target node
        None => (Vec::new(), Some(self)),
        Some((first, rest)) => match self.children.get(first) {
            // the trie ends before the path does: report our state as the
            // deepest ancestor, with no target node
            None => (vec![&self.state], None),
            Some(t) => {
                let (mut v, t) = t.ancestors_and_trie_json_path(&rest);
                v.push(&self.state);
                (v, t)
            }
        },
    }
}
/// Like [`Self::ancestors_and_trie_json_path`], but for glob paths that may
/// contain `*` segments: a `*` fans out across every existing child, so the
/// result holds one `(ancestor states, target node)` pair per concrete path
/// matched.
fn ancestors_and_trie_model_paths<'a>(
    &'a self,
    path: &[JsonGlobSegment],
) -> Vec<(Vec<&'a LockState>, Option<&'a LockTrie>)> {
    // split_first replaces the original `path.get(0)` + `&path[1..]` pair
    match path.split_first() {
        // path exhausted: this node is a target
        None => vec![(Vec::new(), Some(self))],
        // fan out over every child currently present in the trie
        // NOTE(review): unlike the `Path` arm below, this arm does not push
        // `self.state` onto the ancestor vectors — confirm this asymmetry is
        // intentional before relying on it
        Some((JsonGlobSegment::Star, rest)) => self
            .children
            .values()
            .flat_map(|child| child.ancestors_and_trie_model_paths(rest))
            .collect(),
        Some((JsonGlobSegment::Path(segment), rest)) => match self.children.get(segment) {
            // lookup diverges from the trie here: report our state as the
            // deepest ancestor, with no target node
            None => vec![(vec![&self.state], None)],
            Some(child) => child
                .ancestors_and_trie_model_paths(rest)
                .into_iter()
                .map(|(mut ancestors, node)| {
                    ancestors.push(&self.state);
                    (ancestors, node)
                })
                .collect(),
        },
    }
}
/// Resolves `ptr` against the trie, dispatching on whether the glob contains
/// any `*` segments.
fn ancestors_and_trie<'a>(
    &'a self,
    ptr: &JsonGlob,
) -> Vec<(Vec<&'a LockState>, Option<&'a LockTrie>)> {
    match ptr {
        JsonGlob::Path(pointer) => vec![self.ancestors_and_trie_json_path(pointer)],
        JsonGlob::PathWithStar(glob) => self.ancestors_and_trie_model_paths(glob.segments()),
    }
}
// no writes in ancestor set, no writes at node
/// Whether `session` could take an Exist lock at `ptr`: no *other* session
/// may hold a write lock on any ancestor or on the target node itself
/// (`erase` discards `session`'s own leases before checking).
#[allow(dead_code)]
fn can_acquire_exist(&self, ptr: &JsonGlob, session: &HandleId) -> bool {
    let (vectors, tries): (Vec<_>, Vec<_>) = self.ancestors_and_trie(ptr).into_iter().unzip();
    let ancestor_write_free = vectors.into_iter().all(|v| {
        v.into_iter()
            .cloned()
            .map(|s| s.erase(session))
            .all(|s| s.write_free())
    });
    // only the target node itself needs checking — not its whole subtree
    let checking_end_tries_are_write_free = tries
        .into_iter()
        .all(|t| t.map_or(true, |t| t.state.clone().erase(session).write_free()));
    ancestor_write_free && checking_end_tries_are_write_free
}
// no writes in ancestor set, no writes in subtree
/// Whether `session` could take a Read lock at `ptr`: no *other* session may
/// hold a write lock on any ancestor or anywhere in the target subtree.
#[allow(dead_code)]
fn can_acquire_read(&self, ptr: &JsonGlob, session: &HandleId) -> bool {
    let (vectors, tries): (Vec<_>, Vec<_>) = self.ancestors_and_trie(ptr).into_iter().unzip();
    let ancestor_write_free = vectors.into_iter().all(|v| {
        v.into_iter()
            .cloned()
            .map(|s| s.erase(session))
            .all(|s| s.write_free())
    });
    let end_nodes_are_correct = tries
        .into_iter()
        .all(|t| t.map_or(true, |t| t.subtree_is_exclusive_free_for(session)));
    ancestor_write_free && end_nodes_are_correct
}
// no reads or writes in ancestor set, no locks in subtree
/// Whether `session` could take a Write lock at `ptr`: no *other* session may
/// hold a read or write lock on any ancestor, and no *other* session may hold
/// any lock at all within the target subtree.
#[allow(dead_code)]
fn can_acquire_write(&self, ptr: &JsonGlob, session: &HandleId) -> bool {
    let (vectors, tries): (Vec<_>, Vec<_>) = self.ancestors_and_trie(ptr).into_iter().unzip();
    let ancestor_rw_free = vectors.into_iter().all(|v| {
        v.into_iter()
            .cloned()
            .map(|s| s.erase(session))
            .all(|s| s.write_free() && s.read_free())
    });
    let end_nodes_are_correct = tries
        .into_iter()
        .all(|t| t.map_or(true, |t| t.subtree_is_lock_free_for(session)));
    ancestor_rw_free && end_nodes_are_correct
}
// ancestors with writes and writes on the node
/// The session (if any, other than `session` itself) whose write lock blocks
/// an Exist acquisition at `ptr`.
fn session_blocking_exist<'a>(
    &'a self,
    ptr: &JsonGlob,
    session: &HandleId,
) -> Option<&'a HandleId> {
    let vectors_and_tries = self.ancestors_and_trie(ptr);
    vectors_and_tries.into_iter().find_map(|(v, t)| {
        // there can only be one write session per traversal
        let ancestor_write = v.into_iter().find_map(|s| s.write_session());
        let node_write = t.and_then(|t| t.state.write_session());
        // the requester never blocks itself
        ancestor_write
            .or(node_write)
            .and_then(|s| if s == session { None } else { Some(s) })
    })
}
// ancestors with writes, subtrees with writes
/// All sessions (other than `session`) whose write locks block a Read
/// acquisition at `ptr`: writers on any ancestor, plus writers anywhere in
/// the target subtree.
fn sessions_blocking_read<'a>(
    &'a self,
    ptr: &JsonGlob,
    session: &HandleId,
) -> OrdSet<&'a HandleId> {
    let vectors_and_tries = self.ancestors_and_trie(ptr);
    vectors_and_tries
        .into_iter()
        .flat_map(|(v, t)| {
            let ancestor_writes = v
                .into_iter()
                .map(|s| s.write_session().into_iter().collect::<OrdSet<_>>())
                .fold(OrdSet::new(), OrdSet::union);
            // when the target node is absent from the trie, no subtree locks
            // can exist below it
            let relevant_write_sessions = match t {
                None => ancestor_writes,
                Some(t) => ancestor_writes.union(t.subtree_write_sessions()),
            };
            // the requester never blocks itself
            relevant_write_sessions.without(session)
        })
        .collect()
}
// ancestors with reads or writes, subtrees with anything
/// All sessions (other than `session`) blocking a Write acquisition at
/// `ptr`: readers or writers on any ancestor, plus holders of *any* lock
/// (including Exist) within the target subtree.
fn sessions_blocking_write<'a>(
    &'a self,
    ptr: &JsonGlob,
    session: &HandleId,
) -> OrdSet<&'a HandleId> {
    let vectors_and_tries = self.ancestors_and_trie(ptr);
    vectors_and_tries
        .into_iter()
        .flat_map(|(v, t)| {
            let ancestors = v
                .into_iter()
                .map(|s| {
                    s.read_sessions()
                        .union(s.write_session().into_iter().collect())
                })
                .fold(OrdSet::new(), OrdSet::union);
            let subtree = t.map_or(OrdSet::new(), |t| t.subtree_sessions());
            // the requester never blocks itself
            ancestors.union(subtree).without(session)
        })
        .collect()
}
/// Returns the trie node at `ptr` relative to `self`, creating any missing
/// intermediate nodes along the way.
fn child_mut_pointer<S: AsRef<str>, V: SegList>(
    &mut self,
    ptr: &JsonPointer<S, V>,
) -> &mut Self {
    match ptr.uncons() {
        None => self,
        Some((first, rest)) => self
            .children
            // entry() replaces the original contains_key + insert + get_mut +
            // unwrap sequence with a single map lookup
            .entry(first.to_owned())
            .or_default()
            .child_mut_pointer(&rest),
    }
}
/// Returns (creating if necessary) the node addressed by `ptr`.
fn child_mut(&mut self, ptr: &JsonGlob) -> &mut Self {
    match ptr {
        JsonGlob::Path(pointer) => self.child_mut_pointer(pointer),
        JsonGlob::PathWithStar(glob) => self.child_mut_paths(glob.segments()),
    }
}
/// Walks (creating as needed) the trie along a glob path; `*` segments are
/// stored under the literal key `"*"`.
fn child_mut_paths(&mut self, path: &[JsonGlobSegment]) -> &mut LockTrie {
    let mut node = self;
    for segment in path {
        let key = match segment {
            JsonGlobSegment::Path(p) => p.clone(),
            JsonGlobSegment::Star => "*".to_string(),
        };
        // entry() replaces the original contains_key + insert + get_mut +
        // unwrap sequence (which also cloned the key a second time via
        // `key.to_owned()`) with a single map lookup
        node = node.children.entry(key).or_default();
    }
    node
}
fn sessions_blocking_lock<'a>(&'a self, lock_info: &LockInfo) -> OrdSet<&'a HandleId> {
match &lock_info.ty {
LockType::Exist => self
.session_blocking_exist(&lock_info.ptr, &lock_info.handle_id)
.into_iter()
.collect(),
LockType::Read => self.sessions_blocking_read(&lock_info.ptr, &lock_info.handle_id),
LockType::Write => self.sessions_blocking_write(&lock_info.ptr, &lock_info.handle_id),
}
}
/// Attempts to acquire every lock in `lock_infos` atomically.
///
/// Either all locks are granted (`Ok`), or none are and the set of sessions
/// currently blocking at least one of them is returned so the caller can
/// wait for them to release.
pub fn try_lock(&mut self, lock_infos: &LockInfos) -> Result<(), OrdSet<HandleId>> {
    let lock_info_vec = lock_infos.as_vec();
    // check every requested lock before mutating anything
    let blocking_sessions: OrdSet<_> = lock_info_vec
        .iter()
        .flat_map(|lock_info| self.sessions_blocking_lock(lock_info))
        .collect();
    if !blocking_sessions.is_empty() {
        Err(blocking_sessions.into_iter().cloned().collect())
    } else {
        // release the borrow of `self` held by the collected references
        // before mutating the trie
        drop(blocking_sessions);
        for lock_info in lock_info_vec {
            let success = self
                .child_mut(&lock_info.ptr)
                .state
                .try_lock(lock_info.handle_id.clone(), &lock_info.ty);
            // nothing was blocking, so each individual acquisition must succeed
            assert!(success);
        }
        Ok(())
    }
}
/// Releases a previously granted lock and prunes any now-empty trie nodes.
///
/// Panics (via `assert!`) if `lock_info` was not actually held — see the
/// caveat on `try_unlock` about invalid releases.
pub fn unlock(&mut self, lock_info: &LockInfo) {
    let t = self.child_mut(&lock_info.ptr);
    let success = t.state.try_unlock(&lock_info.handle_id, &lock_info.ty);
    assert!(success);
    self.prune();
}
/// A node can be dropped from the trie once it holds no lock state and has
/// no children left.
fn prunable(&self) -> bool {
    matches!(self.state, LockState::Free) && self.children.is_empty()
}
/// Recursively removes descendant nodes that hold no lock state and have no
/// children of their own, keeping the trie minimal.
fn prune(&mut self) {
    self.children.retain(|_, t| {
        // prune bottom-up so an interior node whose children all disappear
        // becomes prunable itself
        t.prune();
        !t.prunable()
    })
}
}
#[cfg(test)]
mod proptest {
    use ::proptest::prelude::*;

    use super::*;

    /// Strategy yielding each of the three lock types with equal probability.
    fn lock_type_gen() -> BoxedStrategy<crate::LockType> {
        prop_oneof![
            Just(crate::LockType::Exist),
            Just(crate::LockType::Read),
            Just(crate::LockType::Write),
        ]
        .boxed()
    }

    proptest! {
        // locking then unlocking the same (session, type) pair must return a
        // LockState to Free, whatever the lock type
        #[test]
        fn unlock_after_lock_is_identity(session in 0..10u64, typ in lock_type_gen()) {
            let mut orig = LockState::Free;
            orig.try_lock(HandleId{
                id: session,
                #[cfg(feature = "tracing")]
                trace: None
            }, &typ);
            orig.try_unlock(&HandleId{
                id: session,
                #[cfg(feature="tracing")]
                trace:None
            }, &typ);
            prop_assert_eq!(orig, LockState::Free);
        }
    }
}

View File

@@ -1,711 +1,192 @@
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::hash::Hash;
use imbl_value::Value;
/// Implemented by types that have a dedicated typed view (`Model`) over
/// their JSON representation.
pub trait HasModel: Sized {
    type Model: Model<Self>;
}
mod sealed {
    use super::*;

    /// Private supertrait pinning down how a model wraps a [`Value`]: a model
    /// must be freely convertible to/from `Value` by value, by shared
    /// reference, and by mutable reference. Living in a private module, it
    /// seals [`super::Model`] against outside implementations.
    pub trait ModelMarker {
        fn into_value(self) -> Value;
        fn from_value(value: Value) -> Self;
        fn as_value<'a>(&'a self) -> &'a Value;
        fn value_as<'a>(value: &'a Value) -> &'a Self;
        fn as_value_mut<'a>(&'a mut self) -> &'a mut Value;
        fn value_as_mut<'a>(value: &'a mut Value) -> &'a mut Self;
    }
    // Blanket impl: any type interconvertible with `Value` in all three
    // ownership modes is automatically a marker; each method just defers to
    // the corresponding From/Into conversion.
    impl<T> ModelMarker for T
    where
        T: From<Value> + Into<Value> + Sized,
        for<'a> &'a T: From<&'a Value> + Into<&'a Value>,
        for<'a> &'a mut T: From<&'a mut Value> + Into<&'a mut Value>,
    {
        fn into_value(self) -> Value {
            self.into()
        }
        fn from_value(value: Value) -> Self {
            value.into()
        }
        fn as_value<'a>(&'a self) -> &'a Value {
            self.into()
        }
        fn value_as<'a>(value: &'a Value) -> &'a Self {
            value.into()
        }
        fn as_value_mut<'a>(&'a mut self) -> &'a mut Value {
            self.into()
        }
        fn value_as_mut<'a>(value: &'a mut Value) -> &'a mut Self {
            value.into()
        }
    }
}
/// A typed view over the JSON value stored for `T`. The `Model<U>` GAT names
/// the corresponding view type for any other `U`, enabling the `transmute*`
/// helpers in [`ModelExt`]. Sealed via [`sealed::ModelMarker`].
pub trait Model<T>: sealed::ModelMarker + Sized {
    type Model<U>: Model<U>;
}
/// Public re-exports of the sealed conversion methods, plus `transmute*`
/// helpers for reinterpreting a model as a model of a different type after
/// transforming the underlying JSON.
pub trait ModelExt<T>: Model<T> {
    /// Consumes the model, yielding its raw JSON value.
    fn into_value(self) -> Value {
        <Self as sealed::ModelMarker>::into_value(self)
    }
    /// Wraps a raw JSON value as a model.
    fn from_value(value: Value) -> Self {
        <Self as sealed::ModelMarker>::from_value(value)
    }
    /// Borrows the underlying JSON value.
    fn as_value<'a>(&'a self) -> &'a Value {
        <Self as sealed::ModelMarker>::as_value(self)
    }
    /// Reinterprets a borrowed JSON value as a borrowed model.
    fn value_as<'a>(value: &'a Value) -> &'a Self {
        <Self as sealed::ModelMarker>::value_as(value)
    }
    /// Mutably borrows the underlying JSON value.
    fn as_value_mut<'a>(&'a mut self) -> &'a mut Value {
        <Self as sealed::ModelMarker>::as_value_mut(self)
    }
    /// Reinterprets a mutably borrowed JSON value as a mutably borrowed model.
    fn value_as_mut<'a>(value: &'a mut Value) -> &'a mut Self {
        <Self as sealed::ModelMarker>::value_as_mut(value)
    }
    /// Applies `f` to the underlying JSON by value and reinterprets the
    /// result as a model of `U`.
    fn transmute<U>(self, f: impl FnOnce(Value) -> Value) -> Self::Model<U> {
        Self::Model::<U>::from_value(f(<Self as sealed::ModelMarker>::into_value(self)))
    }
    /// Borrowing variant of [`Self::transmute`]: `f` selects a sub-value and
    /// the reference is reinterpreted as a model of `U`.
    fn transmute_ref<'a, U>(
        &'a self,
        f: impl for<'b> FnOnce(&'b Value) -> &'b Value,
    ) -> &'a Self::Model<U> {
        Self::Model::<U>::value_as(f(<Self as sealed::ModelMarker>::as_value(self)))
    }
    /// Mutably borrowing variant of [`Self::transmute`].
    fn transmute_mut<'a, U>(
        &'a mut self,
        f: impl for<'b> FnOnce(&'b mut Value) -> &'b mut Value,
    ) -> &'a mut Self::Model<U> {
        Self::Model::<U>::value_as_mut(f(<Self as sealed::ModelMarker>::as_value_mut(self)))
    }
}
impl<T, M: Model<T>> ModelExt<T> for M {}
#[cfg(test)]
mod test {
use crate as patch_db;
use crate::model::sealed::ModelMarker;
use imbl_value::{from_value, json, to_value, Value};
use serde::{de::DeserializeOwned, Serialize};
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use json_patch::{Patch, PatchOperation, RemoveOperation};
use json_ptr::JsonPointer;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use crate::{
bulk_locks::{self, unsaturated_args::AsUnsaturatedArgs, LockTarget},
locker::LockType,
model_paths::JsonGlob,
};
use crate::{DbHandle, DiffPatch, Error, Revision};
/// &mut Model<T> <=> &mut Value
#[repr(transparent)]
#[derive(Debug)]
pub struct ModelData<T: Serialize + for<'de> Deserialize<'de>>(T);
impl<T: Serialize + for<'de> Deserialize<'de>> Deref for ModelData<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T: Serialize + for<'de> Deserialize<'de>> ModelData<T> {
pub fn into_owned(self) -> T {
self.0
}
}
#[derive(Debug)]
pub struct ModelDataMut<T: Serialize + for<'de> Deserialize<'de>> {
original: Value,
current: T,
ptr: JsonPointer,
}
impl<T: Serialize + for<'de> Deserialize<'de>> ModelDataMut<T> {
pub async fn save<Db: DbHandle>(&mut self, db: &mut Db) -> Result<(), Error> {
let current = serde_json::to_value(&self.current)?;
let mut diff = crate::patch::diff(&self.original, &current);
let target = db.get_value(&self.ptr, None).await?;
diff.rebase(&crate::patch::diff(&self.original, &target));
diff.prepend(&self.ptr);
db.apply(diff, None).await?;
self.original = current;
Ok(())
}
}
impl<T: Serialize + for<'de> Deserialize<'de>> Deref for ModelDataMut<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.current
}
}
impl<T: Serialize + for<'de> Deserialize<'de>> DerefMut for ModelDataMut<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.current
}
}
#[derive(Debug)]
pub struct Model<T: Serialize + for<'de> Deserialize<'de>> {
pub(crate) path: JsonGlob,
pub struct Model<T> {
value: Value,
phantom: PhantomData<T>,
}
lazy_static::lazy_static!(
static ref EMPTY_JSON: JsonPointer = JsonPointer::default();
);
impl<T> Model<T>
where
T: Serialize + for<'de> Deserialize<'de>,
{
pub async fn lock<Db: DbHandle>(&self, db: &mut Db, lock_type: LockType) -> Result<(), Error> {
Ok(db.lock(self.json_ptr().clone().into(), lock_type).await?)
impl<T: DeserializeOwned> Model<T> {
pub fn de(self) -> Result<T, imbl_value::Error> {
from_value(self.value)
}
pub async fn get<Db: DbHandle>(&self, db: &mut Db) -> Result<ModelData<T>, Error> {
Ok(ModelData(db.get(self.json_ptr()).await?))
}
pub async fn get_mut<Db: DbHandle>(&self, db: &mut Db) -> Result<ModelDataMut<T>, Error> {
let original = db.get_value(self.json_ptr(), None).await?;
let current = serde_json::from_value(original.clone())?;
Ok(ModelDataMut {
original,
current,
ptr: self.json_ptr().clone(),
})
}
/// Used for times of Serialization, or when going into the db
fn json_ptr(&self) -> &JsonPointer {
match self.path {
JsonGlob::Path(ref ptr) => ptr,
JsonGlob::PathWithStar { .. } => {
#[cfg(feature = "tracing")]
tracing::error!("Should be unreachable, since the type of () means that the paths is always Paths");
&*EMPTY_JSON
}
}
}
}
impl<T> Model<T>
where
T: Serialize + for<'de> Deserialize<'de>,
{
pub fn child<C: Serialize + for<'de> Deserialize<'de>>(self, index: &str) -> Model<C> {
let path = self.path.append(index.parse().unwrap_or_else(|_e| {
#[cfg(feature = "trace")]
tracing::error!("Shouldn't ever not be able to parse a path");
Default::default()
}));
Model {
path,
phantom: PhantomData,
}
}
/// One use is gettign the modelPaths for the bulk locks
pub fn model_paths(&self) -> &JsonGlob {
&self.path
}
}
impl<T> Model<T>
where
T: Serialize + for<'de> Deserialize<'de>,
{
/// Used to create a lock for the db
pub fn make_locker<SB>(&self, lock_type: LockType) -> LockTarget<T, SB>
where
JsonGlob: AsUnsaturatedArgs<SB>,
{
bulk_locks::LockTarget {
lock_type,
db_type: self.phantom,
_star_binds: self.path.as_unsaturated_args(),
glob: self.path.clone(),
}
}
}
impl<T> Model<T>
where
T: Serialize + for<'de> Deserialize<'de> + Send + Sync,
{
pub async fn put<Db: DbHandle>(
&self,
db: &mut Db,
value: &T,
) -> Result<Option<Arc<Revision>>, Error> {
db.put(self.json_ptr(), value).await
}
}
impl<T> From<JsonPointer> for Model<T>
where
T: Serialize + for<'de> Deserialize<'de>,
{
fn from(ptr: JsonPointer) -> Self {
Self {
path: JsonGlob::Path(ptr),
phantom: PhantomData,
}
}
}
impl<T> From<JsonGlob> for Model<T>
where
T: Serialize + for<'de> Deserialize<'de>,
{
fn from(ptr: JsonGlob) -> Self {
Self {
path: ptr,
phantom: PhantomData,
}
}
}
impl<T> AsRef<JsonPointer> for Model<T>
where
T: Serialize + for<'de> Deserialize<'de>,
{
fn as_ref(&self) -> &JsonPointer {
self.json_ptr()
}
}
impl<T> From<Model<T>> for JsonPointer
where
T: Serialize + for<'de> Deserialize<'de>,
{
fn from(model: Model<T>) -> Self {
model.json_ptr().clone()
}
}
impl<T> From<Model<T>> for JsonGlob
where
T: Serialize + for<'de> Deserialize<'de>,
{
fn from(model: Model<T>) -> Self {
model.path
}
}
impl<T> std::clone::Clone for Model<T>
where
T: Serialize + for<'de> Deserialize<'de>,
{
fn clone(&self) -> Self {
Model {
path: self.path.clone(),
phantom: PhantomData,
}
}
}
pub trait HasModel: Serialize + for<'de> Deserialize<'de> {
type Model: ModelFor<Self>;
}
pub trait ModelFor<T: Serialize + for<'de> Deserialize<'de>>:
From<JsonPointer>
+ From<JsonGlob>
+ AsRef<JsonPointer>
+ Into<JsonPointer>
+ From<Model<T>>
+ Clone
+ Into<JsonGlob>
{
}
impl<
T: Serialize + for<'de> Deserialize<'de>,
U: From<JsonPointer>
+ From<JsonGlob>
+ AsRef<JsonPointer>
+ Into<JsonPointer>
+ From<Model<T>>
+ Clone
+ Into<JsonGlob>,
> ModelFor<T> for U
{
}
macro_rules! impl_simple_has_model {
($($ty:ty),*) => {
$(
impl HasModel for $ty {
type Model = Model<$ty>;
}
)*
};
}
impl_simple_has_model!(
bool, char, f32, f64, i128, i16, i32, i64, i8, isize, u128, u16, u32, u64, u8, usize, String
);
#[derive(Debug)]
pub struct BoxModel<T: HasModel + Serialize + for<'de> Deserialize<'de>>(T::Model);
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> Deref for BoxModel<T> {
type Target = T::Model;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> DerefMut for BoxModel<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> From<Model<Box<T>>> for BoxModel<T> {
fn from(model: Model<Box<T>>) -> Self {
BoxModel(T::Model::from(JsonPointer::from(model)))
}
}
impl<T> AsRef<JsonPointer> for BoxModel<T>
where
T: HasModel + Serialize + for<'de> Deserialize<'de>,
{
fn as_ref(&self) -> &JsonPointer {
self.0.as_ref()
}
}
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> From<JsonPointer> for BoxModel<T> {
fn from(ptr: JsonPointer) -> Self {
BoxModel(T::Model::from(ptr))
}
}
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> From<JsonGlob> for BoxModel<T> {
fn from(ptr: JsonGlob) -> Self {
BoxModel(T::Model::from(ptr))
}
}
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> From<BoxModel<T>> for JsonPointer {
fn from(model: BoxModel<T>) -> Self {
model.0.into()
}
}
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> From<BoxModel<T>> for JsonGlob {
fn from(model: BoxModel<T>) -> Self {
model.0.into()
}
}
impl<T> std::clone::Clone for BoxModel<T>
where
T: HasModel + Serialize + for<'de> Deserialize<'de>,
{
fn clone(&self) -> Self {
BoxModel(self.0.clone())
}
}
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> HasModel for Box<T> {
type Model = BoxModel<T>;
}
#[derive(Debug)]
pub struct OptionModel<T: HasModel + Serialize + for<'de> Deserialize<'de>>(T::Model);
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
pub async fn lock<Db: DbHandle>(&self, db: &mut Db, lock_type: LockType) -> Result<(), Error> {
Ok(db.lock(self.0.as_ref().clone().into(), lock_type).await?)
}
pub async fn get<Db: DbHandle>(&self, db: &mut Db) -> Result<ModelData<Option<T>>, Error> {
Ok(ModelData(db.get(self.0.as_ref()).await?))
}
pub async fn get_mut<Db: DbHandle>(
&self,
db: &mut Db,
) -> Result<ModelDataMut<Option<T>>, Error> {
let original = db.get_value(self.0.as_ref(), None).await?;
let current = serde_json::from_value(original.clone())?;
Ok(ModelDataMut {
original,
current,
ptr: self.0.clone().into(),
})
}
pub async fn exists<Db: DbHandle>(&self, db: &mut Db) -> Result<bool, Error> {
Ok(db.exists(self.as_ref(), None).await)
}
pub fn map<
F: FnOnce(T::Model) -> U::Model,
U: Serialize + for<'de> Deserialize<'de> + HasModel,
>(
self,
f: F,
) -> OptionModel<U> {
OptionModel(f(self.0))
}
pub fn and_then<
F: FnOnce(T::Model) -> OptionModel<U>,
U: Serialize + for<'de> Deserialize<'de> + HasModel,
>(
self,
f: F,
) -> OptionModel<U> {
f(self.0)
}
pub async fn delete<Db: DbHandle>(&self, db: &mut Db) -> Result<Option<Arc<Revision>>, Error> {
db.put(self.as_ref(), &Value::Null).await
}
}
impl<T> OptionModel<T>
where
T: HasModel,
{
/// Used to create a lock for the db
pub fn make_locker<SB>(self, lock_type: LockType) -> LockTarget<T, SB>
where
JsonGlob: AsUnsaturatedArgs<SB>,
{
let paths: JsonGlob = self.into();
bulk_locks::LockTarget {
_star_binds: paths.as_unsaturated_args(),
glob: paths,
lock_type,
db_type: PhantomData,
}
}
}
impl<T> OptionModel<T>
where
T: HasModel + Serialize + for<'de> Deserialize<'de>,
T::Model: DerefMut<Target = Model<T>>,
{
pub async fn check<Db: DbHandle>(self, db: &mut Db) -> Result<Option<T::Model>, Error> {
Ok(if self.exists(db).await? {
Some(self.0)
} else {
None
})
}
pub async fn expect<Db: DbHandle>(self, db: &mut Db) -> Result<T::Model, Error> {
if self.exists(db).await? {
Ok(self.0)
} else {
Err(Error::NodeDoesNotExist(self.0.into()))
}
}
}
impl<T> OptionModel<T>
where
T: Serialize + for<'de> Deserialize<'de> + Send + Sync + HasModel,
{
pub async fn put<Db: DbHandle>(
&self,
db: &mut Db,
value: &T,
) -> Result<Option<Arc<Revision>>, Error> {
db.put(self.as_ref(), value).await
}
}
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> From<Model<Option<T>>>
for OptionModel<T>
{
fn from(model: Model<Option<T>>) -> Self {
OptionModel(T::Model::from(JsonGlob::from(model)))
}
}
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> From<JsonPointer> for OptionModel<T> {
fn from(ptr: JsonPointer) -> Self {
OptionModel(T::Model::from(ptr))
}
}
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> From<JsonGlob> for OptionModel<T> {
fn from(ptr: JsonGlob) -> Self {
OptionModel(T::Model::from(ptr))
}
}
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> From<OptionModel<T>> for JsonPointer {
fn from(model: OptionModel<T>) -> Self {
model.0.into()
}
}
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> From<OptionModel<T>> for JsonGlob {
fn from(model: OptionModel<T>) -> Self {
model.0.into()
}
}
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> AsRef<JsonPointer> for OptionModel<T> {
fn as_ref(&self) -> &JsonPointer {
self.0.as_ref()
}
}
impl<T> std::clone::Clone for OptionModel<T>
where
T: HasModel + Serialize + for<'de> Deserialize<'de>,
{
fn clone(&self) -> Self {
OptionModel(self.0.clone())
}
}
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> HasModel for Option<T> {
type Model = OptionModel<T>;
}
#[derive(Debug)]
pub struct VecModel<T: Serialize + for<'de> Deserialize<'de>>(Model<Vec<T>>);
impl<T: Serialize + for<'de> Deserialize<'de>> Deref for VecModel<T> {
type Target = Model<Vec<T>>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T: Serialize + for<'de> Deserialize<'de>> DerefMut for VecModel<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<T: Serialize + for<'de> Deserialize<'de>> VecModel<T> {
pub fn idx(self, idx: usize) -> Model<Option<T>> {
self.0.child(&format!("{}", idx))
}
}
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> VecModel<T> {
pub fn idx_model(self, idx: usize) -> OptionModel<T> {
self.0.child(&format!("{}", idx)).into()
}
}
impl<T: Serialize + for<'de> Deserialize<'de>> From<Model<Vec<T>>> for VecModel<T> {
fn from(model: Model<Vec<T>>) -> Self {
VecModel(From::from(JsonGlob::from(model)))
}
}
impl<T: Serialize + for<'de> Deserialize<'de>> From<JsonPointer> for VecModel<T> {
fn from(ptr: JsonPointer) -> Self {
VecModel(From::from(ptr))
}
}
impl<T: Serialize + for<'de> Deserialize<'de>> From<JsonGlob> for VecModel<T> {
fn from(ptr: JsonGlob) -> Self {
VecModel(From::from(ptr))
}
}
impl<T: Serialize + for<'de> Deserialize<'de>> From<VecModel<T>> for JsonPointer {
fn from(model: VecModel<T>) -> Self {
model.0.into()
}
}
impl<T: Serialize + for<'de> Deserialize<'de>> From<VecModel<T>> for JsonGlob {
fn from(model: VecModel<T>) -> Self {
model.0.into()
}
}
impl<T> AsRef<JsonPointer> for VecModel<T>
where
T: Serialize + for<'de> Deserialize<'de>,
{
fn as_ref(&self) -> &JsonPointer {
self.0.as_ref()
}
}
impl<T> std::clone::Clone for VecModel<T>
where
T: Serialize + for<'de> Deserialize<'de>,
{
fn clone(&self) -> Self {
VecModel(self.0.clone())
}
}
impl<T: Serialize + for<'de> Deserialize<'de>> HasModel for Vec<T> {
type Model = VecModel<T>;
}
pub trait Map {
type Key: AsRef<str>;
type Value;
fn get(&self, key: &Self::Key) -> Option<&Self::Value>;
}
impl<K: AsRef<str> + Eq + Hash, V> Map for HashMap<K, V> {
type Key = K;
type Value = V;
fn get(&self, key: &Self::Key) -> Option<&Self::Value> {
HashMap::get(self, key)
}
}
impl<K: AsRef<str> + Ord, V> Map for BTreeMap<K, V> {
type Key = K;
type Value = V;
fn get(&self, key: &Self::Key) -> Option<&Self::Value> {
self.get(key)
}
}
#[derive(Debug)]
pub struct MapModel<T>(Model<T>)
where
T: Serialize + for<'de> Deserialize<'de> + Map,
T::Value: Serialize + for<'de> Deserialize<'de>;
impl<T> Deref for MapModel<T>
where
T: Serialize + for<'de> Deserialize<'de> + Map,
T::Value: Serialize + for<'de> Deserialize<'de>,
{
type Target = Model<T>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T> DerefMut for MapModel<T>
where
T: Serialize + for<'de> Deserialize<'de> + Map,
T::Value: Serialize + for<'de> Deserialize<'de>,
{
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<T> std::clone::Clone for MapModel<T>
where
T: Serialize + for<'de> Deserialize<'de> + Map,
T::Value: Serialize + for<'de> Deserialize<'de>,
{
fn clone(&self) -> Self {
MapModel(self.0.clone())
}
}
impl<T> MapModel<T>
where
T: Serialize + for<'de> Deserialize<'de> + Map,
T::Value: Serialize + for<'de> Deserialize<'de>,
{
pub fn idx(self, idx: &<T as Map>::Key) -> Model<Option<<T as Map>::Value>> {
self.0.child(idx.as_ref())
}
}
impl<T> MapModel<T>
where
T: Serialize + for<'de> Deserialize<'de> + Map,
T::Key: Ord + Eq + for<'de> Deserialize<'de>,
T::Value: Serialize + for<'de> Deserialize<'de>,
{
pub async fn keys<Db: DbHandle>(&self, db: &mut Db) -> Result<BTreeSet<T::Key>, Error> {
let set = db.keys(self.json_ptr(), None).await;
Ok(set
.into_iter()
.map(|s| serde_json::from_value(Value::String(s)))
.collect::<Result<_, _>>()?)
}
pub async fn remove<Db: DbHandle>(&self, db: &mut Db, key: &T::Key) -> Result<(), Error> {
if db.exists(self.clone().idx(key).as_ref(), None).await {
db.apply(
DiffPatch(Patch(vec![PatchOperation::Remove(RemoveOperation {
path: self.as_ref().clone().join_end(key.as_ref()),
})])),
None,
)
.await?;
}
impl<T: Serialize> Model<T> {
pub fn ser(&mut self, value: &T) -> Result<(), imbl_value::Error> {
self.value = to_value(value)?;
Ok(())
}
}
impl<T> MapModel<T>
where
T: Serialize + for<'de> Deserialize<'de> + Map,
T::Value: Serialize + for<'de> Deserialize<'de> + HasModel,
{
pub fn idx_model(self, idx: &<T as Map>::Key) -> OptionModel<<T as Map>::Value> {
self.0.child(idx.as_ref()).into()
}
}
impl<T> MapModel<T>
where
T: Serialize + for<'de> Deserialize<'de> + Map,
T::Value: Serialize + for<'de> Deserialize<'de> + HasModel,
{
/// Used when mapping across all possible paths of a map or such, to later be filled
pub fn star(self) -> <<T as Map>::Value as HasModel>::Model {
let path = self.0.path.append(JsonGlob::star());
Model {
path,
impl<T> Clone for Model<T> {
fn clone(&self) -> Self {
Self {
value: self.value.clone(),
phantom: PhantomData,
}
.into()
}
}
impl<T> From<Model<T>> for MapModel<T>
where
T: Serialize + for<'de> Deserialize<'de> + Map,
T::Value: Serialize + for<'de> Deserialize<'de>,
{
fn from(model: Model<T>) -> Self {
MapModel(model)
impl<T> From<Value> for Model<T> {
fn from(value: Value) -> Self {
Self {
value,
phantom: PhantomData,
}
}
impl<T> From<JsonPointer> for MapModel<T>
where
T: Serialize + for<'de> Deserialize<'de> + Map,
T::Value: Serialize + for<'de> Deserialize<'de>,
{
fn from(ptr: JsonPointer) -> Self {
MapModel(From::from(ptr))
}
impl<T> From<Model<T>> for Value {
fn from(value: Model<T>) -> Self {
value.value
}
}
impl<T> From<JsonGlob> for MapModel<T>
where
T: Serialize + for<'de> Deserialize<'de> + Map,
T::Value: Serialize + for<'de> Deserialize<'de>,
{
fn from(ptr: JsonGlob) -> Self {
MapModel(From::from(ptr))
impl<'a, T> From<&'a Value> for &'a Model<T> {
fn from(value: &'a Value) -> Self {
unsafe { std::mem::transmute(value) }
}
}
impl<T> From<MapModel<T>> for JsonPointer
where
T: Serialize + for<'de> Deserialize<'de> + Map,
T::Value: Serialize + for<'de> Deserialize<'de>,
{
fn from(model: MapModel<T>) -> Self {
model.0.into()
impl<'a, T> From<&'a Model<T>> for &'a Value {
fn from(value: &'a Model<T>) -> Self {
unsafe { std::mem::transmute(value) }
}
}
impl<T> From<MapModel<T>> for JsonGlob
where
T: Serialize + for<'de> Deserialize<'de> + Map,
T::Value: Serialize + for<'de> Deserialize<'de>,
{
fn from(model: MapModel<T>) -> Self {
model.0.into()
impl<'a, T> From<&'a mut Value> for &mut Model<T> {
fn from(value: &'a mut Value) -> Self {
unsafe { std::mem::transmute(value) }
}
}
impl<T> AsRef<JsonPointer> for MapModel<T>
where
T: Serialize + for<'de> Deserialize<'de> + Map,
T::Value: Serialize + for<'de> Deserialize<'de>,
{
fn as_ref(&self) -> &JsonPointer {
self.0.as_ref()
impl<'a, T> From<&'a mut Model<T>> for &mut Value {
fn from(value: &'a mut Model<T>) -> Self {
unsafe { std::mem::transmute(value) }
}
}
impl<K, V> HasModel for HashMap<K, V>
where
K: Serialize + for<'de> Deserialize<'de> + Hash + Eq + AsRef<str>,
V: Serialize + for<'de> Deserialize<'de>,
{
type Model = MapModel<HashMap<K, V>>;
impl<T> patch_db::Model<T> for Model<T> {
type Model<U> = Model<U>;
}
#[derive(crate::HasModel)]
#[model = "Model<Self>"]
// #[macro_debug]
struct Foo {
a: Bar,
}
#[derive(crate::HasModel)]
#[model = "Model<Self>"]
struct Bar {
b: String,
}
fn mutate_fn(v: &mut Model<Foo>) {
let mut a = v.as_a_mut();
a.as_b_mut().ser(&"NotThis".into()).unwrap();
a.as_b_mut().ser(&"Replaced".into()).unwrap();
}
#[test]
fn test() {
let mut model = Model::<Foo>::from(imbl_value::json!({
"a": {
"b": "ReplaceMe"
}
}));
mutate_fn(&mut model);
mutate_fn(&mut model);
assert_eq!(
model.as_value(),
&json!({
"a": {
"b": "Replaced"
}
})
)
}
impl<K, V> HasModel for BTreeMap<K, V>
where
K: Serialize + for<'de> Deserialize<'de> + Ord + AsRef<str>,
V: Serialize + for<'de> Deserialize<'de>,
{
type Model = MapModel<BTreeMap<K, V>>;
}

View File

@@ -1,419 +0,0 @@
use std::str::FromStr;
use json_ptr::JsonPointer;
/// One segment of a glob path used when locking models: either a literal
/// JSON key, or a `*` wildcard standing in for every child at that level
/// (so that e.g. the `name` of every service can be locked without locking
/// each service's whole record).
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub enum JsonGlobSegment {
    /// A literal path segment (a plain JSON key).
    Path(String),
    /// A wildcard matching every child of a Vec, Map, etc.
    Star,
}
/// Renders a segment the way it appears in a glob string: the literal
/// component for `Path`, or `*` for `Star`.
impl std::fmt::Display for JsonGlobSegment {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let rendered: &str = match self {
            JsonGlobSegment::Path(component) => component,
            JsonGlobSegment::Star => "*",
        };
        write!(f, "{}", rendered)
    }
}
/// A path from the root of the database, possibly containing `*` wildcards.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub enum JsonGlob {
    /// A concrete pointer with no wildcards — the common/default case.
    Path(JsonPointer),
    /// A path containing at least one `*`; the concrete keys the stars stand
    /// for are supplied later as "binds" (see `as_pointer`).
    PathWithStar(PathWithStar),
}
/// A parsed glob containing at least one `*`, with the star count cached.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub struct PathWithStar {
    // Ordered segments, literals and stars interleaved.
    segments: Vec<JsonGlobSegment>,
    // Cached number of `Star` segments (computed when the glob is built).
    count: usize,
}
impl PathWithStar {
    /// The ordered glob segments.
    pub fn segments(&self) -> &[JsonGlobSegment] {
        &self.segments
    }
    /// Number of `*` segments (precomputed, O(1)).
    pub fn count(&self) -> usize {
        self.count
    }
}
impl Default for JsonGlob {
fn default() -> Self {
Self::Path(Default::default())
}
}
impl From<JsonPointer> for JsonGlob {
fn from(pointer: JsonPointer) -> Self {
Self::Path(pointer)
}
}
/// Renders a glob in the same `/seg/seg` form that `FromStr` parses:
/// concrete paths defer to `JsonPointer`'s own `Display`, starred paths
/// emit each segment prefixed with `/`.
impl std::fmt::Display for JsonGlob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            JsonGlob::Path(pointer) => write!(f, "{}", pointer),
            JsonGlob::PathWithStar(PathWithStar { segments, count: _ }) => {
                for segment in segments {
                    write!(f, "/{}", segment)?;
                }
                Ok(())
            }
        }
    }
}
/// Parses a glob string such as `/a/*/c`.
///
/// Empty segments (leading or doubled `/`) are dropped. A string with no `*`
/// parses to a concrete [`JsonGlob::Path`]; otherwise each component becomes
/// a [`JsonGlobSegment`], where a component that is exactly `*` means "any
/// key at this level".
///
/// # Errors
/// Infallible in practice; the `String` error type only satisfies `FromStr`.
impl FromStr for JsonGlob {
    type Err = String;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let split = s.split('/').filter(|x| !x.is_empty());
        if !s.contains('*') {
            // Fast path: a plain JSON pointer, no glob machinery needed.
            return Ok(JsonGlob::Path(split.fold(
                JsonPointer::default(),
                |mut pointer, seg| {
                    pointer.push_end(seg);
                    pointer
                },
            )));
        }
        let segments: Vec<JsonGlobSegment> = split
            .map(|x| match x {
                "*" => JsonGlobSegment::Star,
                x => JsonGlobSegment::Path(x.to_string()),
            })
            .collect();
        // Cache the star count up front so `star_count()` stays O(1).
        let count = segments
            .iter()
            .filter(|x| matches!(x, JsonGlobSegment::Star))
            .count();
        Ok(JsonGlob::PathWithStar(PathWithStar { segments, count }))
    }
}
impl JsonGlob {
    /// Concatenates two globs, upgrading to the starred representation as
    /// soon as either side contains a `*` (summing the cached star counts).
    pub fn append(self, path: JsonGlob) -> Self {
        // Joins two starred paths; the cached counts are additive.
        fn append_stars(
            PathWithStar {
                segments: mut left_segments,
                count: left_count,
            }: PathWithStar,
            PathWithStar {
                segments: mut right_segments,
                count: right_count,
            }: PathWithStar,
        ) -> PathWithStar {
            left_segments.append(&mut right_segments);
            PathWithStar {
                segments: left_segments,
                count: left_count + right_count,
            }
        }
        // Lifts a concrete pointer into the starred representation (0 stars).
        fn point_as_path_with_star(pointer: JsonPointer) -> PathWithStar {
            PathWithStar {
                segments: pointer.into_iter().map(JsonGlobSegment::Path).collect(),
                count: 0,
            }
        }
        match (self, path) {
            (JsonGlob::Path(mut paths), JsonGlob::Path(right_paths)) => {
                paths.append(&right_paths);
                JsonGlob::Path(paths)
            }
            (JsonGlob::Path(left), JsonGlob::PathWithStar(right)) => {
                JsonGlob::PathWithStar(append_stars(point_as_path_with_star(left), right))
            }
            (JsonGlob::PathWithStar(left), JsonGlob::Path(right)) => {
                JsonGlob::PathWithStar(append_stars(left, point_as_path_with_star(right)))
            }
            (JsonGlob::PathWithStar(left), JsonGlob::PathWithStar(right)) => {
                JsonGlob::PathWithStar(append_stars(left, right))
            }
        }
    }
    /// A glob consisting of a single `*`; used while building star paths.
    pub fn star() -> Self {
        JsonGlob::PathWithStar(PathWithStar {
            segments: vec![JsonGlobSegment::Star],
            count: 1,
        })
    }
    /// Glob-aware prefix test, mirroring `JsonPointer::starts_with` for the
    /// concrete case: a `*` on either side matches any single segment.
    pub fn starts_with(&self, other: &JsonGlob) -> bool {
        // Prefix test over segment slices; `*` wildcards one segment.
        // (Takes slices rather than &Vec — clippy::ptr_arg; the previously
        // declared lifetime parameter was unused and is dropped.)
        fn starts_with_(left: &[JsonGlobSegment], right: &[JsonGlobSegment]) -> bool {
            let mut left_paths = left.iter();
            let mut right_paths = right.iter();
            loop {
                match (left_paths.next(), right_paths.next()) {
                    (Some(JsonGlobSegment::Path(x)), Some(JsonGlobSegment::Path(y))) => {
                        if x != y {
                            return false;
                        }
                    }
                    // A star matches a star or any concrete segment.
                    (Some(JsonGlobSegment::Star), Some(JsonGlobSegment::Star)) => {}
                    (Some(JsonGlobSegment::Star), Some(JsonGlobSegment::Path(_))) => {}
                    (Some(JsonGlobSegment::Path(_)), Some(JsonGlobSegment::Star)) => {}
                    // Both exhausted: equal-length match counts as a prefix.
                    (None, None) => return true,
                    // `self` ran out first: `other` is longer, not a prefix.
                    (None, _) => return false,
                    // `other` ran out first: it is a proper prefix of `self`.
                    (_, None) => return true,
                }
            }
        }
        match (self, other) {
            (JsonGlob::Path(x), JsonGlob::Path(y)) => x.starts_with(y),
            (
                JsonGlob::Path(x),
                JsonGlob::PathWithStar(PathWithStar {
                    segments: path,
                    count: _,
                }),
            ) => starts_with_(
                &x.iter()
                    .map(|x| JsonGlobSegment::Path(x.to_string()))
                    .collect::<Vec<_>>(),
                path,
            ),
            (
                JsonGlob::PathWithStar(PathWithStar {
                    segments: path,
                    count: _,
                }),
                JsonGlob::Path(y),
            ) => starts_with_(
                path,
                &y.iter()
                    .map(|x| JsonGlobSegment::Path(x.to_string()))
                    .collect::<Vec<_>>(),
            ),
            (
                JsonGlob::PathWithStar(PathWithStar {
                    segments: path,
                    count: _,
                }),
                JsonGlob::PathWithStar(PathWithStar {
                    segments: path_other,
                    count: _,
                }),
            ) => starts_with_(path, path_other),
        }
    }
    /// Converts back into a usable pointer for db get/set. Each `*` consumes
    /// the next entry of `binds`, in order.
    ///
    /// NOTE(review): if fewer binds are supplied than stars, surplus star
    /// segments are silently dropped, yielding a shorter pointer — confirm
    /// callers always pass exactly `star_count()` binds.
    pub fn as_pointer(&self, binds: &[&str]) -> JsonPointer {
        match self {
            JsonGlob::Path(json_pointer) => json_pointer.clone(),
            JsonGlob::PathWithStar(PathWithStar {
                segments: path,
                count: _,
            }) => {
                let mut json_pointer: JsonPointer = Default::default();
                let mut binds = binds.iter();
                for segment in path.iter() {
                    match segment {
                        JsonGlobSegment::Path(path) => json_pointer.push_end(path),
                        JsonGlobSegment::Star => {
                            if let Some(bind) = binds.next() {
                                json_pointer.push_end(bind)
                            }
                        }
                    }
                }
                json_pointer
            }
        }
    }
    /// Number of `*` segments (0 for a concrete path); O(1), cached at parse.
    pub fn star_count(&self) -> usize {
        match self {
            JsonGlob::Path(_) => 0,
            JsonGlob::PathWithStar(PathWithStar { count, .. }) => *count,
        }
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use proptest::prelude::*;
    /// A concrete path round-trips through the glob parser unchanged.
    #[test]
    fn model_paths_parse_simple() {
        let path = "/a/b/c";
        let model_paths = JsonGlob::from_str(path).unwrap();
        assert_eq!(
            model_paths.as_pointer(&[]),
            JsonPointer::from_str(path).unwrap()
        );
    }
    /// A `*` segment is substituted with the corresponding bind.
    #[test]
    fn model_paths_parse_star() {
        let path = "/a/b/c/*/e";
        let model_paths = JsonGlob::from_str(path).unwrap();
        assert_eq!(
            model_paths.as_pointer(&["d"]),
            JsonPointer::from_str("/a/b/c/d/e").unwrap()
        );
    }
    /// Appending a concrete path to a concrete path.
    #[test]
    fn append() {
        let path = "/a/b/";
        let model_paths = JsonGlob::from_str(path)
            .unwrap()
            .append("c".parse().unwrap());
        assert_eq!(
            model_paths.as_pointer(&[]),
            JsonPointer::from_str("/a/b/c").unwrap()
        );
    }
    /// Appending a star to a concrete path upgrades it to a starred glob.
    #[test]
    fn append_star() {
        let path = "/a/b/";
        let model_paths = JsonGlob::from_str(path)
            .unwrap()
            .append("*".parse().unwrap());
        assert_eq!(
            model_paths.as_pointer(&["c"]),
            JsonPointer::from_str("/a/b/c").unwrap()
        );
    }
    /// Appending a concrete segment to a starred glob keeps the star.
    #[test]
    fn star_append() {
        let path = "/a/*/";
        let model_paths = JsonGlob::from_str(path)
            .unwrap()
            .append("c".parse().unwrap());
        assert_eq!(
            model_paths.as_pointer(&["b"]),
            JsonPointer::from_str("/a/b/c").unwrap()
        );
    }
    /// Two stars consume two binds, in order.
    #[test]
    fn star_append_star() {
        let path = "/a/*/";
        let model_paths = JsonGlob::from_str(path)
            .unwrap()
            .append("*".parse().unwrap());
        assert_eq!(
            model_paths.as_pointer(&["b", "c"]),
            JsonPointer::from_str("/a/b/c").unwrap()
        );
    }
    /// Prefix semantics for star-free globs match `JsonPointer::starts_with`.
    #[test]
    fn starts_with_paths() {
        let path: JsonGlob = "/a/b".parse().unwrap();
        let path_b: JsonGlob = "/a".parse().unwrap();
        let path_c: JsonGlob = "/a/b/c".parse().unwrap();
        assert!(path.starts_with(&path_b));
        assert!(!path.starts_with(&path_c));
        assert!(path_c.starts_with(&path));
        assert!(!path_b.starts_with(&path));
    }
    /// `*` matches any single segment on either side of the comparison.
    #[test]
    fn starts_with_star_left() {
        let path: JsonGlob = "/a/*/c".parse().unwrap();
        let path_a: JsonGlob = "/a".parse().unwrap();
        let path_b: JsonGlob = "/b".parse().unwrap();
        let path_full_c: JsonGlob = "/a/b/c".parse().unwrap();
        let path_full_c_d: JsonGlob = "/a/b/c/d".parse().unwrap();
        let path_full_d_other: JsonGlob = "/a/b/d".parse().unwrap();
        assert!(path.starts_with(&path_a));
        assert!(path.starts_with(&path));
        assert!(!path.starts_with(&path_b));
        assert!(path.starts_with(&path_full_c));
        assert!(!path.starts_with(&path_full_c_d));
        assert!(!path.starts_with(&path_full_d_other));
        // Others start with
        assert!(!path_a.starts_with(&path));
        assert!(!path_b.starts_with(&path));
        assert!(path_full_c.starts_with(&path));
        assert!(path_full_c_d.starts_with(&path));
        assert!(!path_full_d_other.starts_with(&path));
    }
    /// A valid star path is something like `/a/*/c`
    /// A path may start with a letter, then any letter/ dash/ number
    /// A star path may only be a star
    pub fn arb_path_str() -> impl Strategy<Value = String> {
        // Funny enough we can't test the max size, running out of memory, funny that
        proptest::collection::vec("([a-z][a-z\\-0-9]*|\\*)", 0..100).prop_map(|a_s| {
            a_s.into_iter().fold(String::new(), |mut s, x| {
                s.push('/');
                s.push_str(&x);
                s
            })
        })
    }
    mod star_counts {
        use super::*;
        /// The cached count equals the number of `*`s in the source string.
        #[test]
        fn base_have_valid_star_count() {
            let path = "/a/*/c";
            let glob = JsonGlob::from_str(&path).unwrap();
            assert_eq!(
                glob.star_count(),
                1,
                "Star count should be the total number of star paths for path {}",
                path
            );
        }
        proptest! {
            #[test]
            fn all_valid_paths_have_valid_star_count(path in arb_path_str()) {
                let glob = JsonGlob::from_str(&path).unwrap();
                prop_assert_eq!(glob.star_count(), path.matches('*').count(), "Star count should be the total number of star paths for path {}", path);
            }
        }
    }
    proptest! {
        // `append` should behave like string concatenation followed by parse.
        #[test]
        fn inductive_append_as_monoid(left in arb_path_str(), right in arb_path_str()) {
            let left_glob = JsonGlob::from_str(&left).unwrap();
            let right_glob = JsonGlob::from_str(&right).unwrap();
            let expected_join = format!("{}{}", left, right);
            let expected = JsonGlob::from_str(&expected_join).unwrap();
            let answer = left_glob.append(right_glob);
            prop_assert_eq!(answer, expected, "Appending another path should be the same as joining them as a string first for path {}", expected_join);
        }
        // Display and FromStr are inverse for the generated path grammar.
        #[test]
        fn all_globs_parse_display_isomorphism(path in arb_path_str()) {
            let glob = JsonGlob::from_str(&path).unwrap();
            let other_glob = JsonGlob::from_str(&glob.to_string()).unwrap();
            prop_assert_eq!(other_glob, glob);
        }
    }
}

View File

@@ -1,9 +1,9 @@
use std::ops::Deref;
use imbl_value::Value;
use json_patch::{AddOperation, Patch, PatchOperation, RemoveOperation, ReplaceOperation};
use json_ptr::{JsonPointer, SegList};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::BTreeSet;
#[derive(Debug, Clone, Deserialize, Serialize)]

View File

@@ -1,25 +1,24 @@
use std::collections::{BTreeSet, HashMap, VecDeque};
use std::fs::OpenOptions;
use std::io::{SeekFrom, Write};
use std::panic::UnwindSafe;
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use fd_lock_rs::FdLock;
use futures::FutureExt;
use imbl_value::{InternedString, Value};
use json_patch::PatchError;
use json_ptr::{JsonPointer, SegList};
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::fs::File;
use tokio::io::AsyncSeekExt;
use tokio::sync::{Mutex, OwnedMutexGuard, RwLock, RwLockWriteGuard};
use tokio::sync::{Mutex, OwnedMutexGuard, RwLock};
use crate::handle::HandleId;
use crate::locker::Locker;
use crate::patch::{diff, DiffPatch, Dump, Revision};
use crate::subscriber::Broadcast;
use crate::{Error, PatchDbHandle, Subscriber};
use crate::{Error, Subscriber};
lazy_static! {
static ref OPEN_STORES: Mutex<HashMap<PathBuf, Arc<Mutex<()>>>> = Mutex::new(HashMap::new());
@@ -119,7 +118,7 @@ impl Store {
.append(true)
.open(path.with_extension("failed"))?,
"{}",
serde_json::to_string(&patch)?,
imbl_value::to_value(&patch).map_err(Error::JSON)?,
)?;
}
revision += 1;
@@ -161,7 +160,7 @@ impl Store {
pub(crate) fn keys<S: AsRef<str>, V: SegList>(
&self,
ptr: &JsonPointer<S, V>,
) -> BTreeSet<String> {
) -> BTreeSet<InternedString> {
match ptr.get(&self.persistent).unwrap_or(&Value::Null) {
Value::Object(o) => o.keys().cloned().collect(),
_ => BTreeSet::new(),
@@ -174,7 +173,7 @@ impl Store {
&self,
ptr: &JsonPointer<S, V>,
) -> Result<T, Error> {
Ok(serde_json::from_value(self.get_value(ptr))?)
Ok(imbl_value::from_value(self.get_value(ptr))?)
}
pub(crate) fn dump(&self) -> Result<Dump, Error> {
Ok(Dump {
@@ -185,17 +184,21 @@ impl Store {
pub(crate) fn subscribe(&mut self) -> Subscriber {
self.broadcast.subscribe()
}
pub(crate) async fn put_value<S: AsRef<str>, V: SegList>(
&mut self,
ptr: &JsonPointer<S, V>,
value: &Value,
) -> Result<Option<Arc<Revision>>, Error> {
let mut patch = diff(ptr.get(&self.persistent).unwrap_or(&Value::Null), value);
patch.prepend(ptr);
self.apply(patch).await
}
pub(crate) async fn put<T: Serialize + ?Sized, S: AsRef<str>, V: SegList>(
&mut self,
ptr: &JsonPointer<S, V>,
value: &T,
) -> Result<Option<Arc<Revision>>, Error> {
let mut patch = diff(
ptr.get(&self.persistent).unwrap_or(&Value::Null),
&serde_json::to_value(value)?,
);
patch.prepend(ptr);
self.apply(patch).await
self.put_value(ptr, &imbl_value::to_value(&value)?).await
}
pub(crate) async fn compress(&mut self) -> Result<(), Error> {
use tokio::io::AsyncWriteExt;
@@ -292,15 +295,11 @@ impl Store {
#[derive(Clone)]
pub struct PatchDb {
pub(crate) store: Arc<RwLock<Store>>,
pub(crate) locker: Arc<Locker>,
handle_id: Arc<AtomicU64>,
}
impl PatchDb {
pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
Ok(PatchDb {
store: Arc::new(RwLock::new(Store::open(path).await?)),
locker: Arc::new(Locker::new()),
handle_id: Arc::new(AtomicU64::new(0)),
})
}
pub async fn sync(&self, sequence: u64) -> Result<Result<Vec<Arc<Revision>>, Dump>, Error> {
@@ -328,7 +327,7 @@ impl PatchDb {
pub async fn keys<S: AsRef<str>, V: SegList>(
&self,
ptr: &JsonPointer<S, V>,
) -> BTreeSet<String> {
) -> BTreeSet<InternedString> {
self.store.read().await.keys(ptr)
}
pub async fn get_value<S: AsRef<str>, V: SegList>(&self, ptr: &JsonPointer<S, V>) -> Value {
@@ -349,30 +348,56 @@ impl PatchDb {
let rev = store.put(ptr, value).await?;
Ok(rev)
}
pub async fn apply(
&self,
patch: DiffPatch,
store_write_lock: Option<RwLockWriteGuard<'_, Store>>,
) -> Result<Option<Arc<Revision>>, Error> {
let mut store = if let Some(store_write_lock) = store_write_lock {
store_write_lock
} else {
self.store.write().await
};
pub async fn apply(&self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error> {
let mut store = self.store.write().await;
let rev = store.apply(patch).await?;
Ok(rev)
}
pub fn handle(&self) -> PatchDbHandle {
PatchDbHandle {
id: HandleId {
id: self
.handle_id
.fetch_add(1, std::sync::atomic::Ordering::SeqCst),
#[cfg(feature = "trace")]
trace: Some(Arc::new(tracing_error::SpanTrace::capture())),
},
db: self.clone(),
locks: Vec::new(),
pub async fn apply_function<F, T, E>(&self, f: F) -> Result<(Value, T), E>
where
F: FnOnce(Value) -> Result<(Value, T), E> + UnwindSafe,
E: From<Error>,
{
let mut store = self.store.write().await;
let old = store.persistent.clone();
let (new, res) = std::panic::catch_unwind(move || f(old)).map_err(|e| {
Error::Panic(
e.downcast()
.map(|a| *a)
.unwrap_or_else(|_| "UNKNOWN".to_owned()),
)
})??;
let diff = diff(&store.persistent, &new);
store.apply(diff).await?;
Ok((new, res))
}
pub async fn run_idempotent<F, Fut, T, E>(&self, f: F) -> Result<(Value, T), E>
where
F: Fn(Value) -> Fut + Send + Sync + UnwindSafe,
for<'a> &'a F: UnwindSafe,
Fut: std::future::Future<Output = Result<(Value, T), E>> + UnwindSafe,
E: From<Error>,
{
let store = self.store.read().await;
let old = store.persistent.clone();
drop(store);
loop {
let (new, res) = async { f(old.clone()).await }
.catch_unwind()
.await
.map_err(|e| {
Error::Panic(
e.downcast()
.map(|a| *a)
.unwrap_or_else(|_| "UNKNOWN".to_owned()),
)
})??;
let mut store = self.store.write().await;
if &old == &store.persistent {
let diff = diff(&store.persistent, &new);
store.apply(diff).await?;
return Ok((new, res));
}
}
}
}

View File

@@ -1,28 +1,28 @@
use std::future::Future;
use std::sync::Arc;
use imbl_value::{json, Value};
use json_ptr::JsonPointer;
use patch_db::{HasModel, PatchDb, Revision};
use proptest::prelude::*;
use serde_json::Value;
use tokio::fs;
use tokio::runtime::Builder;
use crate::{self as patch_db, DbHandle, LockType};
use crate::{self as patch_db};
async fn init_db(db_name: String) -> PatchDb {
cleanup_db(&db_name).await;
let db = PatchDb::open(db_name).await.unwrap();
db.put(
&JsonPointer::<&'static str>::default(),
&Sample {
a: "test1".to_string(),
b: Child {
a: "test2".to_string(),
b: 1,
c: NewType(None),
},
&json!({
"a": "test1",
"b": {
"a": "test2",
"b": 1,
"c": null,
},
}),
)
.await
.unwrap();
@@ -45,23 +45,10 @@ async fn basic() {
let db = init_db("test.db".to_string()).await;
let ptr: JsonPointer = "/b/b".parse().unwrap();
let mut get_res: Value = db.get(&ptr).await.unwrap();
assert_eq!(get_res, 1);
assert_eq!(get_res.as_u64(), Some(1));
db.put(&ptr, "hello").await.unwrap();
get_res = db.get(&ptr).await.unwrap();
assert_eq!(get_res, "hello");
cleanup_db("test.db").await;
}
#[tokio::test]
async fn transaction() {
let db = init_db("test.db".to_string()).await;
let mut handle = db.handle();
let mut tx = handle.begin().await.unwrap();
let ptr: JsonPointer = "/b/b".parse().unwrap();
tx.put(&ptr, &(2 as usize)).await.unwrap();
tx.put(&ptr, &(1 as usize)).await.unwrap();
let _res = tx.commit().await.unwrap();
println!("res = {:?}", _res);
assert_eq!(get_res.as_str(), Some("hello"));
cleanup_db("test.db").await;
}
@@ -84,32 +71,19 @@ proptest! {
}
}
#[derive(Debug, serde::Deserialize, serde::Serialize, HasModel)]
pub struct Sample {
a: String,
#[model]
b: Child,
}
// #[derive(Debug, serde::Deserialize, serde::Serialize, HasModel)]
// pub struct Sample {
// a: String,
// #[model]
// b: Child,
// }
#[derive(Debug, serde::Deserialize, serde::Serialize, HasModel)]
pub struct Child {
a: String,
b: usize,
c: NewType,
}
// #[derive(Debug, serde::Deserialize, serde::Serialize, HasModel)]
// pub struct Child {
// a: String,
// b: usize,
// c: NewType,
// }
#[derive(Debug, serde::Deserialize, serde::Serialize, HasModel)]
pub struct NewType(Option<Box<Sample>>);
#[tokio::test]
async fn locks_dropped_from_enforcer_on_tx_save() {
let db = init_db("test.db".to_string()).await;
let mut handle = db.handle();
let mut tx = handle.begin().await.unwrap();
let ptr_a: JsonPointer = "/a".parse().unwrap();
let ptr_b: JsonPointer = "/b".parse().unwrap();
tx.lock(ptr_b.into(), LockType::Write).await.unwrap();
tx.save().await.unwrap();
handle.lock(ptr_a.into(), LockType::Write).await.unwrap();
cleanup_db("test.db").await;
}
// #[derive(Debug, serde::Deserialize, serde::Serialize, HasModel)]
// pub struct NewType(Option<Box<Sample>>);

View File

@@ -1,204 +0,0 @@
use std::collections::BTreeSet;
use std::sync::Arc;
use async_trait::async_trait;
use json_ptr::{JsonPointer, SegList};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use crate::patch::{DiffPatch, Revision};
use crate::store::Store;
use crate::{
bulk_locks::Verifier,
locker::{Guard, LockType, Locker},
};
use crate::{handle::HandleId, model_paths::JsonGlob};
use crate::{DbHandle, Error, PatchDbHandle, Subscriber};
/// A batched view over a parent `DbHandle`: reads see `updates` layered on
/// top of the parent store, and writes accumulate in `updates` until the
/// transaction is committed, saved, or aborted.
pub struct Transaction<Parent: DbHandle> {
    // Handle identity; locks taken by this transaction are attributed to it.
    pub(crate) id: HandleId,
    // The handle (or enclosing transaction) this transaction stacks on.
    pub(crate) parent: Parent,
    // Lock guards held for the lifetime of the transaction.
    pub(crate) locks: Vec<Guard>,
    // Local, not-yet-applied diff of all writes made through this transaction.
    pub(crate) updates: DiffPatch,
    // Revision feed used to rebase `updates` onto concurrent changes.
    pub(crate) sub: Subscriber,
}
impl Transaction<&mut PatchDbHandle> {
    /// Applies this transaction's accumulated diff to the database.
    /// Returns `Ok(None)` without touching the store when nothing was written.
    pub async fn commit(mut self) -> Result<Option<Arc<Revision>>, Error> {
        if (self.updates.0).0.is_empty() {
            Ok(None)
        } else {
            let store_lock = self.parent.store();
            // Hold the store write lock across rebase + apply so no
            // concurrent revision can land in between.
            let store = store_lock.write().await;
            self.rebase();
            let rev = self.parent.db.apply(self.updates, Some(store)).await?;
            Ok(rev)
        }
    }
    /// Discards the transaction, returning the (rebased) diff it would have
    /// applied so the caller can inspect or reuse it.
    pub async fn abort(mut self) -> Result<DiffPatch, Error> {
        let store_lock = self.parent.store();
        // A read lock suffices: we only need a consistent point to rebase at.
        let _store = store_lock.read().await;
        self.rebase();
        Ok(self.updates)
    }
}
impl<Parent: DbHandle + Send + Sync> Transaction<Parent> {
    /// Flushes this transaction's diff into its *parent* handle (which may
    /// itself be a transaction) rather than directly into the database.
    pub async fn save(mut self) -> Result<(), Error> {
        let store_lock = self.parent.store();
        // Write lock keeps rebase + apply atomic with respect to revisions.
        let store = store_lock.write().await;
        self.rebase();
        self.parent.apply(self.updates, Some(store)).await?;
        Ok(())
    }
}
#[async_trait]
impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
    /// Opens a nested transaction layered on this one. Rebases first, under
    /// the store write lock, so the child starts from a current view.
    async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error> {
        let store_lock = self.parent.store();
        let mut store = store_lock.write().await;
        self.rebase();
        let sub = store.subscribe();
        // Release the write lock before constructing the child.
        drop(store);
        Ok(Transaction {
            id: self.id(),
            parent: self,
            locks: Vec::new(),
            updates: DiffPatch::default(),
            sub,
        })
    }
    // Nested transactions share the id of the handle they stack on.
    fn id(&self) -> HandleId {
        self.id.clone()
    }
    /// Replays revisions received since the last rebase into `updates`, so
    /// the local diff stays applicable to the current store state.
    fn rebase(&mut self) {
        self.parent.rebase();
        while let Ok(rev) = self.sub.try_recv() {
            self.updates.rebase(&rev.patch);
        }
    }
    fn store(&self) -> Arc<RwLock<Store>> {
        self.parent.store()
    }
    fn locker(&self) -> &Locker {
        self.parent.locker()
    }
    /// Existence check: the parent's answer, overridden by any local write
    /// that created or removed the value at `ptr`.
    async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
        &mut self,
        ptr: &JsonPointer<S, V>,
        store_read_lock: Option<RwLockReadGuard<'_, Store>>,
    ) -> bool {
        let exists = {
            let store_lock = self.parent.store();
            // Reuse the caller's read guard if one was passed down.
            let store = if let Some(store_read_lock) = store_read_lock {
                store_read_lock
            } else {
                store_lock.read().await
            };
            self.rebase();
            self.parent.exists(ptr, Some(store)).await
        };
        self.updates.for_path(ptr).exists().unwrap_or(exists)
    }
    /// Keys at `ptr`: the parent's keys adjusted by local inserts/removes.
    async fn keys<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
        &mut self,
        ptr: &JsonPointer<S, V>,
        store_read_lock: Option<RwLockReadGuard<'_, Store>>,
    ) -> BTreeSet<String> {
        let keys = {
            let store_lock = self.parent.store();
            let store = if let Some(store_read_lock) = store_read_lock {
                store_read_lock
            } else {
                store_lock.read().await
            };
            self.rebase();
            self.parent.keys(ptr, Some(store)).await
        };
        self.updates.for_path(ptr).keys(keys)
    }
    /// Reads the value at `ptr` from the parent, then applies this
    /// transaction's pending patch for that subtree on top.
    async fn get_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
        &mut self,
        ptr: &JsonPointer<S, V>,
        store_read_lock: Option<RwLockReadGuard<'_, Store>>,
    ) -> Result<Value, Error> {
        let mut data = {
            let store_lock = self.parent.store();
            let store = if let Some(store_read_lock) = store_read_lock {
                store_read_lock
            } else {
                store_lock.read().await
            };
            self.rebase();
            self.parent.get_value(ptr, Some(store)).await?
        };
        let path_updates = self.updates.for_path(ptr);
        if !(path_updates.0).0.is_empty() {
            // NOTE(review): feature is named "tracing" here but "trace"
            // elsewhere in the crate — confirm which is intended.
            #[cfg(feature = "tracing")]
            tracing::trace!("Applying patch {:?} at path {}", path_updates, ptr);
            json_patch::patch(&mut data, &*path_updates)?;
        }
        Ok(data)
    }
    /// Buffers a write: diffs the new value against the current (transaction-
    /// local) view and appends the diff to `updates`. Never produces a
    /// revision — that happens on commit/save.
    async fn put_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
        &mut self,
        ptr: &JsonPointer<S, V>,
        value: &Value,
    ) -> Result<Option<Arc<Revision>>, Error> {
        let old = self.get_value(ptr, None).await?;
        let mut patch = crate::patch::diff(&old, &value);
        patch.prepend(ptr);
        self.updates.append(patch);
        Ok(None)
    }
    /// Acquires a lock via the parent's locker; the guard is held until this
    /// transaction is dropped/consumed.
    async fn lock(&mut self, ptr: JsonGlob, lock_type: LockType) -> Result<(), Error> {
        self.locks.push(
            self.parent
                .locker()
                .lock(self.id.clone(), ptr, lock_type)
                .await?,
        );
        Ok(())
    }
    /// Typed read: `get_value` deserialized via serde.
    async fn get<
        T: for<'de> Deserialize<'de>,
        S: AsRef<str> + Send + Sync,
        V: SegList + Send + Sync,
    >(
        &mut self,
        ptr: &JsonPointer<S, V>,
    ) -> Result<T, Error> {
        Ok(serde_json::from_value(self.get_value(ptr, None).await?)?)
    }
    /// Typed write: serialize then delegate to `put_value`.
    async fn put<
        T: Serialize + Send + Sync,
        S: AsRef<str> + Send + Sync,
        V: SegList + Send + Sync,
    >(
        &mut self,
        ptr: &JsonPointer<S, V>,
        value: &T,
    ) -> Result<Option<Arc<Revision>>, Error> {
        self.put_value(ptr, &serde_json::to_value(value)?).await
    }
    /// Applying a patch inside a transaction just buffers it; the store write
    /// lock argument is unused here since nothing hits the store yet.
    async fn apply(
        &mut self,
        patch: DiffPatch,
        _store_write_lock: Option<RwLockWriteGuard<'_, Store>>,
    ) -> Result<Option<Arc<Revision>>, Error> {
        self.updates.append(patch);
        Ok(None)
    }
    /// Bulk-locks all targets at once through the parent's locker and returns
    /// a `Verifier` recording which targets were requested.
    async fn lock_all<'a>(
        &'a mut self,
        locks: impl IntoIterator<Item = crate::LockTargetId> + Send + Clone + 'a,
    ) -> Result<crate::bulk_locks::Verifier, Error> {
        let verifier = Verifier {
            target_locks: locks.clone().into_iter().collect(),
        };
        self.locks
            .push(self.parent.locker().lock_all(&self.id, locks).await?);
        Ok(verifier)
    }
}