mirror of
https://github.com/Start9Labs/patch-db.git
synced 2026-03-26 02:11:54 +00:00
minor refactor of transactions
This commit is contained in:
committed by
Aiden McClelland
parent
ab4fc4b4e4
commit
b70f1ce3af
@@ -162,6 +162,11 @@ fn build_model_struct(
|
||||
#model_name(#inner_model::from(ptr))
|
||||
}
|
||||
}
|
||||
impl From<#model_name> for patch_db::json_ptr::JsonPointer {
|
||||
fn from(model: #model_name) -> Self {
|
||||
model.0.into()
|
||||
}
|
||||
}
|
||||
impl AsRef<patch_db::json_ptr::JsonPointer> for #model_name {
|
||||
fn as_ref(&self) -> &patch_db::json_ptr::JsonPointer {
|
||||
self.0.as_ref()
|
||||
@@ -242,6 +247,11 @@ fn build_model_struct(
|
||||
#model_name(From::from(ptr))
|
||||
}
|
||||
}
|
||||
impl From<#model_name> for patch_db::json_ptr::JsonPointer {
|
||||
fn from(model: #model_name) -> Self {
|
||||
model.0.into()
|
||||
}
|
||||
}
|
||||
impl AsRef<patch_db::json_ptr::JsonPointer> for #model_name {
|
||||
fn as_ref(&self) -> &patch_db::json_ptr::JsonPointer {
|
||||
self.0.as_ref()
|
||||
@@ -281,6 +291,11 @@ fn build_model_enum(base: &DeriveInput, _: &DataEnum, model_name: Option<Ident>)
|
||||
#model_name(From::from(ptr))
|
||||
}
|
||||
}
|
||||
impl From<#model_name> for patch_db::json_ptr::JsonPointer {
|
||||
fn from(model: #model_name) -> Self {
|
||||
model.0.into()
|
||||
}
|
||||
}
|
||||
impl AsRef<patch_db::json_ptr::JsonPointer> for #model_name {
|
||||
fn as_ref(&self) -> &patch_db::json_ptr::JsonPointer {
|
||||
self.0.as_ref()
|
||||
|
||||
247
patch-db/src/handle.rs
Normal file
247
patch-db/src/handle.rs
Normal file
@@ -0,0 +1,247 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use json_ptr::{JsonPointer, SegList};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use tokio::sync::{broadcast::Receiver, RwLock, RwLockReadGuard};
|
||||
|
||||
use crate::{
|
||||
locker::{LockType, LockerGuard},
|
||||
Locker, PatchDb, Revision, Store, Transaction,
|
||||
};
|
||||
use crate::{patch::DiffPatch, Error};
|
||||
|
||||
#[async_trait]
|
||||
pub trait DbHandle: Sized + Send + Sync {
|
||||
async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error>;
|
||||
fn rebase(&mut self) -> Result<(), Error>;
|
||||
fn store(&self) -> Arc<RwLock<Store>>;
|
||||
fn subscribe(&self) -> Receiver<Arc<Revision>>;
|
||||
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>);
|
||||
async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
|
||||
) -> Result<bool, Error>;
|
||||
async fn get_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
|
||||
) -> Result<Value, Error>;
|
||||
async fn put_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
value: &Value,
|
||||
) -> Result<(), Error>;
|
||||
async fn apply(&mut self, patch: DiffPatch) -> Result<(), Error>;
|
||||
async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
lock: LockType,
|
||||
) -> ();
|
||||
async fn get<
|
||||
T: for<'de> Deserialize<'de>,
|
||||
S: AsRef<str> + Send + Sync,
|
||||
V: SegList + Send + Sync,
|
||||
>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
) -> Result<T, Error>;
|
||||
async fn put<
|
||||
T: Serialize + Send + Sync,
|
||||
S: AsRef<str> + Send + Sync,
|
||||
V: SegList + Send + Sync,
|
||||
>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
value: &T,
|
||||
) -> Result<(), Error>;
|
||||
}
|
||||
#[async_trait]
|
||||
impl<Handle: DbHandle + Send + Sync> DbHandle for &mut Handle {
|
||||
async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error> {
|
||||
let Transaction {
|
||||
locks,
|
||||
updates,
|
||||
sub,
|
||||
..
|
||||
} = (*self).begin().await?;
|
||||
Ok(Transaction {
|
||||
parent: self,
|
||||
locks,
|
||||
updates,
|
||||
sub,
|
||||
})
|
||||
}
|
||||
fn rebase(&mut self) -> Result<(), Error> {
|
||||
(*self).rebase()
|
||||
}
|
||||
fn store(&self) -> Arc<RwLock<Store>> {
|
||||
(**self).store()
|
||||
}
|
||||
fn subscribe(&self) -> Receiver<Arc<Revision>> {
|
||||
(**self).subscribe()
|
||||
}
|
||||
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) {
|
||||
(*self).locker_and_locks()
|
||||
}
|
||||
async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
|
||||
) -> Result<bool, Error> {
|
||||
(*self).exists(ptr, store_read_lock).await
|
||||
}
|
||||
async fn get_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
|
||||
) -> Result<Value, Error> {
|
||||
(*self).get_value(ptr, store_read_lock).await
|
||||
}
|
||||
async fn put_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
value: &Value,
|
||||
) -> Result<(), Error> {
|
||||
(*self).put_value(ptr, value).await
|
||||
}
|
||||
async fn apply(&mut self, patch: DiffPatch) -> Result<(), Error> {
|
||||
(*self).apply(patch).await
|
||||
}
|
||||
async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
lock: LockType,
|
||||
) {
|
||||
(*self).lock(ptr, lock).await
|
||||
}
|
||||
async fn get<
|
||||
T: for<'de> Deserialize<'de>,
|
||||
S: AsRef<str> + Send + Sync,
|
||||
V: SegList + Send + Sync,
|
||||
>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
) -> Result<T, Error> {
|
||||
(*self).get(ptr).await
|
||||
}
|
||||
async fn put<
|
||||
T: Serialize + Send + Sync,
|
||||
S: AsRef<str> + Send + Sync,
|
||||
V: SegList + Send + Sync,
|
||||
>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
value: &T,
|
||||
) -> Result<(), Error> {
|
||||
(*self).put(ptr, value).await
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PatchDbHandle {
|
||||
pub(crate) db: PatchDb,
|
||||
pub(crate) locks: Vec<(JsonPointer, LockerGuard)>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl DbHandle for PatchDbHandle {
|
||||
async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error> {
|
||||
Ok(Transaction {
|
||||
sub: self.subscribe(),
|
||||
parent: self,
|
||||
locks: Vec::new(),
|
||||
updates: DiffPatch::default(),
|
||||
})
|
||||
}
|
||||
fn rebase(&mut self) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
fn store(&self) -> Arc<RwLock<Store>> {
|
||||
self.db.store.clone()
|
||||
}
|
||||
fn subscribe(&self) -> Receiver<Arc<Revision>> {
|
||||
self.db.subscribe()
|
||||
}
|
||||
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) {
|
||||
(&self.db.locker, vec![self.locks.as_mut_slice()])
|
||||
}
|
||||
async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
|
||||
) -> Result<bool, Error> {
|
||||
if let Some(lock) = store_read_lock {
|
||||
lock.exists(ptr)
|
||||
} else {
|
||||
self.db.exists(ptr).await
|
||||
}
|
||||
}
|
||||
async fn get_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
|
||||
) -> Result<Value, Error> {
|
||||
if let Some(lock) = store_read_lock {
|
||||
lock.get(ptr)
|
||||
} else {
|
||||
self.db.get(ptr).await
|
||||
}
|
||||
}
|
||||
async fn put_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
value: &Value,
|
||||
) -> Result<(), Error> {
|
||||
self.db.put(ptr, value).await?;
|
||||
Ok(())
|
||||
}
|
||||
async fn apply(&mut self, patch: DiffPatch) -> Result<(), Error> {
|
||||
self.db.apply(patch, None).await?;
|
||||
Ok(())
|
||||
}
|
||||
async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
lock: LockType,
|
||||
) {
|
||||
match lock {
|
||||
LockType::Read => {
|
||||
self.db
|
||||
.locker
|
||||
.add_read_lock(ptr, &mut self.locks, &mut [])
|
||||
.await;
|
||||
}
|
||||
LockType::Write => {
|
||||
self.db
|
||||
.locker
|
||||
.add_write_lock(ptr, &mut self.locks, &mut [])
|
||||
.await;
|
||||
}
|
||||
LockType::None => (),
|
||||
}
|
||||
}
|
||||
async fn get<
|
||||
T: for<'de> Deserialize<'de>,
|
||||
S: AsRef<str> + Send + Sync,
|
||||
V: SegList + Send + Sync,
|
||||
>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
) -> Result<T, Error> {
|
||||
self.db.get(ptr).await
|
||||
}
|
||||
async fn put<
|
||||
T: Serialize + Send + Sync,
|
||||
S: AsRef<str> + Send + Sync,
|
||||
V: SegList + Send + Sync,
|
||||
>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
value: &T,
|
||||
) -> Result<(), Error> {
|
||||
self.db.put(ptr, value).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -6,6 +6,7 @@ use tokio::sync::broadcast::error::TryRecvError;
|
||||
|
||||
// note: inserting into an array (before another element) without proper locking can result in unexpected behaviour
|
||||
|
||||
mod handle;
|
||||
mod locker;
|
||||
mod model;
|
||||
mod patch;
|
||||
@@ -15,6 +16,7 @@ mod transaction;
|
||||
#[cfg(test)]
|
||||
mod test;
|
||||
|
||||
pub use handle::{DbHandle, PatchDbHandle};
|
||||
pub use json_ptr;
|
||||
pub use locker::{LockType, Locker};
|
||||
pub use model::{
|
||||
@@ -23,7 +25,7 @@ pub use model::{
|
||||
pub use patch::Revision;
|
||||
pub use patch_db_macro::HasModel;
|
||||
pub use store::{PatchDb, Store};
|
||||
pub use transaction::{Checkpoint, SubTransaction, Transaction};
|
||||
pub use transaction::Transaction;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum Error {
|
||||
|
||||
@@ -7,9 +7,8 @@ use json_ptr::JsonPointer;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
|
||||
use crate::locker::LockType;
|
||||
use crate::transaction::Checkpoint;
|
||||
use crate::Error;
|
||||
use crate::{locker::LockType, DbHandle};
|
||||
|
||||
pub struct ModelData<T: Serialize + for<'de> Deserialize<'de>>(T);
|
||||
impl<T: Serialize + for<'de> Deserialize<'de>> Deref for ModelData<T> {
|
||||
@@ -25,13 +24,13 @@ pub struct ModelDataMut<T: Serialize + for<'de> Deserialize<'de>> {
|
||||
ptr: JsonPointer,
|
||||
}
|
||||
impl<T: Serialize + for<'de> Deserialize<'de>> ModelDataMut<T> {
|
||||
pub async fn save<Tx: Checkpoint>(self, tx: &mut Tx) -> Result<(), Error> {
|
||||
pub async fn save<Db: DbHandle>(self, db: &mut Db) -> Result<(), Error> {
|
||||
let current = serde_json::to_value(&self.current)?;
|
||||
let mut diff = crate::patch::diff(&self.original, ¤t);
|
||||
let target = tx.get_value(&self.ptr, None).await?;
|
||||
let target = db.get_value(&self.ptr, None).await?;
|
||||
diff.rebase(&crate::patch::diff(&self.original, &target));
|
||||
diff.prepend(&self.ptr);
|
||||
tx.apply(diff);
|
||||
db.apply(diff).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -56,17 +55,18 @@ impl<T> Model<T>
|
||||
where
|
||||
T: Serialize + for<'de> Deserialize<'de>,
|
||||
{
|
||||
pub async fn lock<Tx: Checkpoint>(&self, tx: &mut Tx, lock: LockType) {
|
||||
tx.lock(&self.ptr, lock).await
|
||||
pub async fn lock<Db: DbHandle>(&self, db: &mut Db, lock: LockType) {
|
||||
db.lock(&self.ptr, lock).await
|
||||
}
|
||||
|
||||
pub async fn get<Tx: Checkpoint>(&self, tx: &mut Tx) -> Result<ModelData<T>, Error> {
|
||||
Ok(ModelData(tx.get(&self.ptr, LockType::Read).await?))
|
||||
pub async fn get<Db: DbHandle>(&self, db: &mut Db) -> Result<ModelData<T>, Error> {
|
||||
db.lock(&self.ptr, LockType::Read).await;
|
||||
Ok(ModelData(db.get(&self.ptr).await?))
|
||||
}
|
||||
|
||||
pub async fn get_mut<Tx: Checkpoint>(&self, tx: &mut Tx) -> Result<ModelDataMut<T>, Error> {
|
||||
self.lock(tx, LockType::Write).await;
|
||||
let original = tx.get_value(&self.ptr, None).await?;
|
||||
pub async fn get_mut<Db: DbHandle>(&self, db: &mut Db) -> Result<ModelDataMut<T>, Error> {
|
||||
self.lock(db, LockType::Write).await;
|
||||
let original = db.get_value(&self.ptr, None).await?;
|
||||
let current = serde_json::from_value(original.clone())?;
|
||||
Ok(ModelDataMut {
|
||||
original,
|
||||
@@ -88,9 +88,9 @@ impl<T> Model<T>
|
||||
where
|
||||
T: Serialize + for<'de> Deserialize<'de> + Send + Sync,
|
||||
{
|
||||
pub async fn put<Tx: Checkpoint>(&self, tx: &mut Tx, value: &T) -> Result<(), Error> {
|
||||
self.lock(tx, LockType::Write).await;
|
||||
tx.put(&self.ptr, value).await
|
||||
pub async fn put<Db: DbHandle>(&self, db: &mut Db, value: &T) -> Result<(), Error> {
|
||||
self.lock(db, LockType::Write).await;
|
||||
db.put(&self.ptr, value).await
|
||||
}
|
||||
}
|
||||
impl<T> From<JsonPointer> for Model<T>
|
||||
@@ -132,8 +132,8 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
pub trait HasModel {
|
||||
type Model: From<JsonPointer> + AsRef<JsonPointer>;
|
||||
pub trait HasModel: Serialize + for<'de> Deserialize<'de> {
|
||||
type Model: From<JsonPointer> + AsRef<JsonPointer> + Into<JsonPointer> + From<Model<Self>>;
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -162,6 +162,11 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> From<JsonPointer> for
|
||||
BoxModel(T::Model::from(ptr))
|
||||
}
|
||||
}
|
||||
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> From<BoxModel<T>> for JsonPointer {
|
||||
fn from(model: BoxModel<T>) -> Self {
|
||||
model.0.into()
|
||||
}
|
||||
}
|
||||
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> HasModel for Box<T> {
|
||||
type Model = BoxModel<T>;
|
||||
}
|
||||
@@ -169,13 +174,32 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> HasModel for Box<T> {
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct OptionModel<T: HasModel + Serialize + for<'de> Deserialize<'de>>(T::Model);
|
||||
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
|
||||
pub async fn check<Tx: Checkpoint>(self, tx: &mut Tx) -> Result<Option<T::Model>, Error> {
|
||||
Ok(if tx.exists(self.0.as_ref(), None).await? {
|
||||
pub async fn exists<Db: DbHandle>(&self, db: &mut Db) -> Result<bool, Error> {
|
||||
db.lock(self.as_ref(), LockType::Read).await;
|
||||
Ok(db.exists(&self.as_ref(), None).await?)
|
||||
}
|
||||
|
||||
pub async fn check<Db: DbHandle>(self, db: &mut Db) -> Result<Option<T::Model>, Error> {
|
||||
Ok(if self.exists(db).await? {
|
||||
Some(self.0)
|
||||
} else {
|
||||
None
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn delete<Db: DbHandle>(&self, db: &mut Db) -> Result<(), Error> {
|
||||
db.lock(self.as_ref(), LockType::Write).await;
|
||||
db.put(self.as_ref(), &Value::Null).await
|
||||
}
|
||||
}
|
||||
impl<T> OptionModel<T>
|
||||
where
|
||||
T: Serialize + for<'de> Deserialize<'de> + Send + Sync + HasModel,
|
||||
{
|
||||
pub async fn put<Db: DbHandle>(&self, db: &mut Db, value: &T) -> Result<(), Error> {
|
||||
db.lock(self.as_ref(), LockType::Write).await;
|
||||
db.put(self.as_ref(), value).await
|
||||
}
|
||||
}
|
||||
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> From<Model<Option<T>>>
|
||||
for OptionModel<T>
|
||||
@@ -189,8 +213,18 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> From<JsonPointer> for
|
||||
OptionModel(T::Model::from(ptr))
|
||||
}
|
||||
}
|
||||
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> From<OptionModel<T>> for JsonPointer {
|
||||
fn from(model: OptionModel<T>) -> Self {
|
||||
model.0.into()
|
||||
}
|
||||
}
|
||||
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> AsRef<JsonPointer> for OptionModel<T> {
|
||||
fn as_ref(&self) -> &JsonPointer {
|
||||
self.0.as_ref()
|
||||
}
|
||||
}
|
||||
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> HasModel for Option<T> {
|
||||
type Model = BoxModel<T>;
|
||||
type Model = OptionModel<T>;
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -216,6 +250,11 @@ impl<T: Serialize + for<'de> Deserialize<'de>> From<JsonPointer> for VecModel<T>
|
||||
VecModel(From::from(ptr))
|
||||
}
|
||||
}
|
||||
impl<T: Serialize + for<'de> Deserialize<'de>> From<VecModel<T>> for JsonPointer {
|
||||
fn from(model: VecModel<T>) -> Self {
|
||||
model.0.into()
|
||||
}
|
||||
}
|
||||
impl<T> AsRef<JsonPointer> for VecModel<T>
|
||||
where
|
||||
T: Serialize + for<'de> Deserialize<'de>,
|
||||
@@ -282,6 +321,15 @@ where
|
||||
self.child(idx.as_ref()).into()
|
||||
}
|
||||
}
|
||||
impl<T> From<Model<T>> for MapModel<T>
|
||||
where
|
||||
T: Serialize + for<'de> Deserialize<'de> + Map,
|
||||
T::Value: Serialize + for<'de> Deserialize<'de>,
|
||||
{
|
||||
fn from(model: Model<T>) -> Self {
|
||||
MapModel(model)
|
||||
}
|
||||
}
|
||||
impl<T> From<JsonPointer> for MapModel<T>
|
||||
where
|
||||
T: Serialize + for<'de> Deserialize<'de> + Map,
|
||||
@@ -291,6 +339,15 @@ where
|
||||
MapModel(From::from(ptr))
|
||||
}
|
||||
}
|
||||
impl<T> From<MapModel<T>> for JsonPointer
|
||||
where
|
||||
T: Serialize + for<'de> Deserialize<'de> + Map,
|
||||
T::Value: Serialize + for<'de> Deserialize<'de>,
|
||||
{
|
||||
fn from(model: MapModel<T>) -> Self {
|
||||
model.0.into()
|
||||
}
|
||||
}
|
||||
impl<T> AsRef<JsonPointer> for MapModel<T>
|
||||
where
|
||||
T: Serialize + for<'de> Deserialize<'de> + Map,
|
||||
@@ -309,8 +366,8 @@ where
|
||||
}
|
||||
impl<K, V> HasModel for BTreeMap<K, V>
|
||||
where
|
||||
K: Serialize + for<'de> Deserialize<'de> + Hash + Eq + AsRef<str>,
|
||||
K: Serialize + for<'de> Deserialize<'de> + Ord + AsRef<str>,
|
||||
V: Serialize + for<'de> Deserialize<'de>,
|
||||
{
|
||||
type Model = MapModel<HashMap<K, V>>;
|
||||
type Model = MapModel<BTreeMap<K, V>>;
|
||||
}
|
||||
|
||||
@@ -14,10 +14,9 @@ use tokio::fs::File;
|
||||
use tokio::sync::broadcast::{Receiver, Sender};
|
||||
use tokio::sync::{Mutex, RwLock, RwLockWriteGuard};
|
||||
|
||||
use crate::locker::Locker;
|
||||
use crate::patch::{diff, DiffPatch, Revision};
|
||||
use crate::transaction::Transaction;
|
||||
use crate::Error;
|
||||
use crate::{locker::Locker, PatchDbHandle};
|
||||
|
||||
lazy_static! {
|
||||
static ref OPEN_STORES: Mutex<HashMap<PathBuf, Qutex<()>>> = Mutex::new(HashMap::new());
|
||||
@@ -31,7 +30,7 @@ pub struct Store {
|
||||
revision: u64,
|
||||
}
|
||||
impl Store {
|
||||
pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
|
||||
pub(crate) async fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
|
||||
let (_lock, path) = {
|
||||
if !path.as_ref().exists() {
|
||||
tokio::fs::File::create(path.as_ref()).await?;
|
||||
@@ -123,13 +122,13 @@ impl Store {
|
||||
self.file.unlock(true).map_err(|e| e.1)?;
|
||||
Ok(())
|
||||
}
|
||||
pub fn exists<S: AsRef<str>, V: SegList>(
|
||||
pub(crate) fn exists<S: AsRef<str>, V: SegList>(
|
||||
&self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
) -> Result<bool, Error> {
|
||||
Ok(ptr.get(self.get_data()?).unwrap_or(&Value::Null) != &Value::Null)
|
||||
}
|
||||
pub fn get<T: for<'de> Deserialize<'de>, S: AsRef<str>, V: SegList>(
|
||||
pub(crate) fn get<T: for<'de> Deserialize<'de>, S: AsRef<str>, V: SegList>(
|
||||
&self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
) -> Result<T, Error> {
|
||||
@@ -137,10 +136,10 @@ impl Store {
|
||||
ptr.get(self.get_data()?).unwrap_or(&Value::Null).clone(),
|
||||
)?)
|
||||
}
|
||||
pub fn dump(&self) -> Value {
|
||||
pub(crate) fn dump(&self) -> Value {
|
||||
self.get_data().unwrap().clone()
|
||||
}
|
||||
pub async fn put<T: Serialize + ?Sized, S: AsRef<str>, V: SegList>(
|
||||
pub(crate) async fn put<T: Serialize + ?Sized, S: AsRef<str>, V: SegList>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
value: &T,
|
||||
@@ -152,7 +151,7 @@ impl Store {
|
||||
patch.prepend(ptr);
|
||||
self.apply(patch).await
|
||||
}
|
||||
pub async fn apply(&mut self, patch: DiffPatch) -> Result<Arc<Revision>, Error> {
|
||||
pub(crate) async fn apply(&mut self, patch: DiffPatch) -> Result<Arc<Revision>, Error> {
|
||||
use tokio::io::AsyncWriteExt;
|
||||
|
||||
self.check_cache_corrupted()?;
|
||||
@@ -196,6 +195,9 @@ impl PatchDb {
|
||||
subscriber: Arc::new(subscriber),
|
||||
})
|
||||
}
|
||||
pub async fn dump(&self) -> Value {
|
||||
self.store.read().await.dump()
|
||||
}
|
||||
pub async fn exists<S: AsRef<str>, V: SegList>(
|
||||
&self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
@@ -235,12 +237,10 @@ impl PatchDb {
|
||||
pub fn subscribe(&self) -> Receiver<Arc<Revision>> {
|
||||
self.subscriber.subscribe()
|
||||
}
|
||||
pub fn begin(&self) -> Transaction {
|
||||
Transaction {
|
||||
pub fn handle(&self) -> PatchDbHandle {
|
||||
PatchDbHandle {
|
||||
db: self.clone(),
|
||||
locks: Vec::new(),
|
||||
updates: DiffPatch::default(),
|
||||
sub: self.subscribe(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,7 +8,7 @@ use serde_json::Value;
|
||||
use tokio::fs;
|
||||
use tokio::runtime::Builder;
|
||||
|
||||
use crate as patch_db;
|
||||
use crate::{self as patch_db, DbHandle};
|
||||
|
||||
async fn init_db(db_name: String) -> PatchDb {
|
||||
cleanup_db(&db_name).await;
|
||||
@@ -54,7 +54,8 @@ async fn basic() {
|
||||
#[tokio::test]
|
||||
async fn transaction() {
|
||||
let db = init_db("test.db".to_string()).await;
|
||||
let mut tx = db.begin();
|
||||
let mut handle = db.handle();
|
||||
let mut tx = handle.begin().await.unwrap();
|
||||
let ptr: JsonPointer = "/b/b".parse().unwrap();
|
||||
tx.put(&ptr, &(2 as usize)).await.unwrap();
|
||||
tx.put(&ptr, &(1 as usize)).await.unwrap();
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::future::{BoxFuture, FutureExt};
|
||||
use async_trait::async_trait;
|
||||
use json_ptr::{JsonPointer, SegList};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
@@ -8,258 +8,59 @@ use tokio::sync::broadcast::error::TryRecvError;
|
||||
use tokio::sync::broadcast::Receiver;
|
||||
use tokio::sync::{RwLock, RwLockReadGuard};
|
||||
|
||||
use crate::locker::{LockType, Locker, LockerGuard};
|
||||
use crate::patch::{DiffPatch, Revision};
|
||||
use crate::store::{PatchDb, Store};
|
||||
use crate::store::Store;
|
||||
use crate::Error;
|
||||
use crate::{
|
||||
locker::{LockType, Locker, LockerGuard},
|
||||
DbHandle,
|
||||
};
|
||||
use crate::{
|
||||
patch::{DiffPatch, Revision},
|
||||
PatchDbHandle,
|
||||
};
|
||||
|
||||
pub trait Checkpoint: Sized {
|
||||
fn rebase(&mut self) -> Result<(), Error>;
|
||||
fn exists<'a, S: AsRef<str> + Send + Sync + 'a, V: SegList + Send + Sync + 'a>(
|
||||
&'a mut self,
|
||||
ptr: &'a JsonPointer<S, V>,
|
||||
store_read_lock: Option<RwLockReadGuard<'a, Store>>,
|
||||
) -> BoxFuture<'a, Result<bool, Error>>;
|
||||
fn get_value<'a, S: AsRef<str> + Send + Sync + 'a, V: SegList + Send + Sync + 'a>(
|
||||
&'a mut self,
|
||||
ptr: &'a JsonPointer<S, V>,
|
||||
store_read_lock: Option<RwLockReadGuard<'a, Store>>,
|
||||
) -> BoxFuture<'a, Result<Value, Error>>;
|
||||
fn put_value<'a, S: AsRef<str> + Send + Sync + 'a, V: SegList + Send + Sync + 'a>(
|
||||
&'a mut self,
|
||||
ptr: &'a JsonPointer<S, V>,
|
||||
value: &'a Value,
|
||||
) -> BoxFuture<'a, Result<(), Error>>;
|
||||
fn store(&self) -> Arc<RwLock<Store>>;
|
||||
fn subscribe(&self) -> Receiver<Arc<Revision>>;
|
||||
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>);
|
||||
fn apply(&mut self, patch: DiffPatch);
|
||||
fn lock<'a, S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
|
||||
&'a mut self,
|
||||
ptr: &'a JsonPointer<S, V>,
|
||||
lock: LockType,
|
||||
) -> BoxFuture<'a, ()>;
|
||||
fn get<
|
||||
'a,
|
||||
T: for<'de> Deserialize<'de> + 'a,
|
||||
S: AsRef<str> + Clone + Send + Sync + 'a,
|
||||
V: SegList + Clone + Send + Sync + 'a,
|
||||
>(
|
||||
&'a mut self,
|
||||
ptr: &'a JsonPointer<S, V>,
|
||||
lock: LockType,
|
||||
) -> BoxFuture<'a, Result<T, Error>>;
|
||||
fn put<
|
||||
'a,
|
||||
T: Serialize + Send + Sync + 'a,
|
||||
S: AsRef<str> + Send + Sync + 'a,
|
||||
V: SegList + Send + Sync + 'a,
|
||||
>(
|
||||
&'a mut self,
|
||||
ptr: &'a JsonPointer<S, V>,
|
||||
value: &'a T,
|
||||
) -> BoxFuture<'a, Result<(), Error>>;
|
||||
}
|
||||
|
||||
pub struct Transaction {
|
||||
pub(crate) db: PatchDb,
|
||||
pub struct Transaction<Parent: DbHandle> {
|
||||
pub(crate) parent: Parent,
|
||||
pub(crate) locks: Vec<(JsonPointer, LockerGuard)>,
|
||||
pub(crate) updates: DiffPatch,
|
||||
pub(crate) sub: Receiver<Arc<Revision>>,
|
||||
}
|
||||
impl Transaction {
|
||||
pub fn rebase(&mut self) -> Result<(), Error> {
|
||||
while let Some(rev) = match self.sub.try_recv() {
|
||||
Ok(a) => Some(a),
|
||||
Err(TryRecvError::Empty) => None,
|
||||
Err(e) => return Err(e.into()),
|
||||
} {
|
||||
self.updates.rebase(&rev.patch);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
async fn exists<S: AsRef<str>, V: SegList>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
|
||||
) -> Result<bool, Error> {
|
||||
let exists = {
|
||||
let store_lock = self.db.store.clone();
|
||||
let store = if let Some(store_read_lock) = store_read_lock {
|
||||
store_read_lock
|
||||
} else {
|
||||
store_lock.read().await
|
||||
};
|
||||
self.rebase()?;
|
||||
ptr.get(store.get_data()?).unwrap_or(&Value::Null) != &Value::Null
|
||||
};
|
||||
Ok(self.updates.for_path(ptr).exists().unwrap_or(exists))
|
||||
}
|
||||
async fn get_value<S: AsRef<str>, V: SegList>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
|
||||
) -> Result<Value, Error> {
|
||||
let mut data = {
|
||||
let store_lock = self.db.store.clone();
|
||||
let store = if let Some(store_read_lock) = store_read_lock {
|
||||
store_read_lock
|
||||
} else {
|
||||
store_lock.read().await
|
||||
};
|
||||
self.rebase()?;
|
||||
ptr.get(store.get_data()?).cloned().unwrap_or_default()
|
||||
};
|
||||
json_patch::patch(&mut data, &*self.updates.for_path(ptr))?;
|
||||
Ok(data)
|
||||
}
|
||||
async fn put_value<S: AsRef<str>, V: SegList>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
value: &Value,
|
||||
) -> Result<(), Error> {
|
||||
let old = Transaction::get_value(self, ptr, None).await?;
|
||||
let mut patch = crate::patch::diff(&old, &value);
|
||||
patch.prepend(ptr);
|
||||
self.updates.append(patch);
|
||||
Ok(())
|
||||
}
|
||||
pub async fn lock<S: AsRef<str> + Clone, V: SegList + Clone>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
lock: LockType,
|
||||
) {
|
||||
match lock {
|
||||
LockType::None => (),
|
||||
LockType::Read => {
|
||||
self.db
|
||||
.locker
|
||||
.add_read_lock(ptr, &mut self.locks, &mut [])
|
||||
.await
|
||||
}
|
||||
LockType::Write => {
|
||||
self.db
|
||||
.locker
|
||||
.add_write_lock(ptr, &mut self.locks, &mut [])
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
pub async fn get<T: for<'de> Deserialize<'de>, S: AsRef<str> + Clone, V: SegList + Clone>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
lock: LockType,
|
||||
) -> Result<T, Error> {
|
||||
self.lock(ptr, lock).await;
|
||||
Ok(serde_json::from_value(
|
||||
Transaction::get_value(self, ptr, None).await?,
|
||||
)?)
|
||||
}
|
||||
pub async fn put<T: Serialize, S: AsRef<str>, V: SegList>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
value: &T,
|
||||
) -> Result<(), Error> {
|
||||
Transaction::put_value(self, ptr, &serde_json::to_value(value)?).await
|
||||
}
|
||||
impl Transaction<&mut PatchDbHandle> {
|
||||
pub async fn commit(mut self) -> Result<Arc<Revision>, Error> {
|
||||
let store_lock = self.db.store.clone();
|
||||
let store = store_lock.write().await;
|
||||
self.rebase()?;
|
||||
self.db.apply(self.updates, Some(store)).await
|
||||
}
|
||||
pub async fn begin(&mut self) -> Result<SubTransaction<&mut Self>, Error> {
|
||||
let store_lock = self.db.store.clone();
|
||||
let store_lock = self.parent.store();
|
||||
let store = store_lock.read().await;
|
||||
self.rebase()?;
|
||||
let sub = self.db.subscribe();
|
||||
let rev = self.parent.db.apply(self.updates, None).await?;
|
||||
drop(store);
|
||||
Ok(SubTransaction {
|
||||
Ok(rev)
|
||||
}
|
||||
}
|
||||
impl<Parent: DbHandle + Send + Sync> Transaction<Parent> {
|
||||
pub async fn save(mut self) -> Result<(), Error> {
|
||||
let store_lock = self.parent.store();
|
||||
let store = store_lock.read().await;
|
||||
self.rebase()?;
|
||||
self.parent.apply(self.updates).await?;
|
||||
drop(store);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
#[async_trait]
|
||||
impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
|
||||
async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error> {
|
||||
let store_lock = self.parent.store();
|
||||
let store = store_lock.read().await;
|
||||
self.rebase()?;
|
||||
let sub = self.parent.subscribe();
|
||||
drop(store);
|
||||
Ok(Transaction {
|
||||
parent: self,
|
||||
locks: Vec::new(),
|
||||
updates: DiffPatch::default(),
|
||||
sub,
|
||||
})
|
||||
}
|
||||
}
|
||||
impl<'a> Checkpoint for &'a mut Transaction {
|
||||
fn rebase(&mut self) -> Result<(), Error> {
|
||||
Transaction::rebase(self)
|
||||
}
|
||||
fn exists<'b, S: AsRef<str> + Send + Sync + 'b, V: SegList + Send + Sync + 'b>(
|
||||
&'b mut self,
|
||||
ptr: &'b JsonPointer<S, V>,
|
||||
store_read_lock: Option<RwLockReadGuard<'b, Store>>,
|
||||
) -> BoxFuture<'b, Result<bool, Error>> {
|
||||
Transaction::exists(self, ptr, store_read_lock).boxed()
|
||||
}
|
||||
fn get_value<'b, S: AsRef<str> + Send + Sync + 'b, V: SegList + Send + Sync + 'b>(
|
||||
&'b mut self,
|
||||
ptr: &'b JsonPointer<S, V>,
|
||||
store_read_lock: Option<RwLockReadGuard<'b, Store>>,
|
||||
) -> BoxFuture<'b, Result<Value, Error>> {
|
||||
Transaction::get_value(self, ptr, store_read_lock).boxed()
|
||||
}
|
||||
fn put_value<'b, S: AsRef<str> + Send + Sync + 'b, V: SegList + Send + Sync + 'b>(
|
||||
&'b mut self,
|
||||
ptr: &'b JsonPointer<S, V>,
|
||||
value: &'b Value,
|
||||
) -> BoxFuture<'b, Result<(), Error>> {
|
||||
Transaction::put_value(self, ptr, value).boxed()
|
||||
}
|
||||
fn store(&self) -> Arc<RwLock<Store>> {
|
||||
self.db.store.clone()
|
||||
}
|
||||
fn subscribe(&self) -> Receiver<Arc<Revision>> {
|
||||
self.db.subscribe()
|
||||
}
|
||||
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) {
|
||||
(&self.db.locker, vec![&mut self.locks])
|
||||
}
|
||||
fn apply(&mut self, patch: DiffPatch) {
|
||||
self.updates.append(patch)
|
||||
}
|
||||
fn lock<'b, S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
|
||||
&'b mut self,
|
||||
ptr: &'b JsonPointer<S, V>,
|
||||
lock: LockType,
|
||||
) -> BoxFuture<'b, ()> {
|
||||
Transaction::lock(self, ptr, lock).boxed()
|
||||
}
|
||||
fn get<
|
||||
'b,
|
||||
T: for<'de> Deserialize<'de> + 'b,
|
||||
S: AsRef<str> + Clone + Send + Sync + 'b,
|
||||
V: SegList + Clone + Send + Sync + 'b,
|
||||
>(
|
||||
&'b mut self,
|
||||
ptr: &'b JsonPointer<S, V>,
|
||||
lock: LockType,
|
||||
) -> BoxFuture<'b, Result<T, Error>> {
|
||||
Transaction::get(self, ptr, lock).boxed()
|
||||
}
|
||||
fn put<
|
||||
'b,
|
||||
T: Serialize + Send + Sync + 'b,
|
||||
S: AsRef<str> + Send + Sync + 'b,
|
||||
V: SegList + Send + Sync + 'b,
|
||||
>(
|
||||
&'b mut self,
|
||||
ptr: &'b JsonPointer<S, V>,
|
||||
value: &'b T,
|
||||
) -> BoxFuture<'b, Result<(), Error>> {
|
||||
Transaction::put(self, ptr, value).boxed()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SubTransaction<Tx: Checkpoint> {
|
||||
parent: Tx,
|
||||
locks: Vec<(JsonPointer, LockerGuard)>,
|
||||
updates: DiffPatch,
|
||||
sub: Receiver<Arc<Revision>>,
|
||||
}
|
||||
impl<Tx: Checkpoint + Send + Sync> SubTransaction<Tx> {
|
||||
pub fn rebase(&mut self) -> Result<(), Error> {
|
||||
self.parent.rebase()?;
|
||||
while let Some(rev) = match self.sub.try_recv() {
|
||||
Ok(a) => Some(a),
|
||||
@@ -270,6 +71,17 @@ impl<Tx: Checkpoint + Send + Sync> SubTransaction<Tx> {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
fn store(&self) -> Arc<RwLock<Store>> {
|
||||
self.parent.store()
|
||||
}
|
||||
fn subscribe(&self) -> Receiver<Arc<Revision>> {
|
||||
self.parent.subscribe()
|
||||
}
|
||||
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) {
|
||||
let (locker, mut locks) = self.parent.locker_and_locks();
|
||||
locks.push(&mut self.locks);
|
||||
(locker, locks)
|
||||
}
|
||||
async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
@@ -310,13 +122,13 @@ impl<Tx: Checkpoint + Send + Sync> SubTransaction<Tx> {
|
||||
ptr: &JsonPointer<S, V>,
|
||||
value: &Value,
|
||||
) -> Result<(), Error> {
|
||||
let old = SubTransaction::get_value(self, ptr, None).await?;
|
||||
let old = self.get_value(ptr, None).await?;
|
||||
let mut patch = crate::patch::diff(&old, &value);
|
||||
patch.prepend(ptr);
|
||||
self.updates.append(patch);
|
||||
Ok(())
|
||||
}
|
||||
pub async fn lock<S: AsRef<str> + Clone, V: SegList + Clone>(
|
||||
async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
lock: LockType,
|
||||
@@ -335,117 +147,29 @@ impl<Tx: Checkpoint + Send + Sync> SubTransaction<Tx> {
|
||||
}
|
||||
}
|
||||
}
|
||||
pub async fn get<
|
||||
async fn get<
|
||||
T: for<'de> Deserialize<'de>,
|
||||
S: AsRef<str> + Clone + Send + Sync,
|
||||
V: SegList + Clone + Send + Sync,
|
||||
S: AsRef<str> + Send + Sync,
|
||||
V: SegList + Send + Sync,
|
||||
>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
lock: LockType,
|
||||
) -> Result<T, Error> {
|
||||
self.lock(ptr, lock).await;
|
||||
Ok(serde_json::from_value(
|
||||
SubTransaction::get_value(self, ptr, None).await?,
|
||||
)?)
|
||||
Ok(serde_json::from_value(self.get_value(ptr, None).await?)?)
|
||||
}
|
||||
pub async fn put<T: Serialize, S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||
async fn put<
|
||||
T: Serialize + Send + Sync,
|
||||
S: AsRef<str> + Send + Sync,
|
||||
V: SegList + Send + Sync,
|
||||
>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
value: &T,
|
||||
) -> Result<(), Error> {
|
||||
SubTransaction::put_value(self, ptr, &serde_json::to_value(value)?).await
|
||||
self.put_value(ptr, &serde_json::to_value(value)?).await
|
||||
}
|
||||
pub async fn commit(mut self) -> Result<(), Error> {
|
||||
let store_lock = self.parent.store();
|
||||
let store = store_lock.read().await;
|
||||
self.rebase()?;
|
||||
self.parent.apply(self.updates);
|
||||
drop(store);
|
||||
async fn apply(&mut self, patch: DiffPatch) -> Result<(), Error> {
|
||||
self.updates.append(patch);
|
||||
Ok(())
|
||||
}
|
||||
pub async fn begin(&mut self) -> Result<SubTransaction<&mut Self>, Error> {
|
||||
let store_lock = self.parent.store();
|
||||
let store = store_lock.read().await;
|
||||
self.rebase()?;
|
||||
let sub = self.parent.subscribe();
|
||||
drop(store);
|
||||
Ok(SubTransaction {
|
||||
parent: self,
|
||||
locks: Vec::new(),
|
||||
updates: DiffPatch::default(),
|
||||
sub,
|
||||
})
|
||||
}
|
||||
}
|
||||
impl<'a, Tx: Checkpoint + Send + Sync> Checkpoint for &'a mut SubTransaction<Tx> {
|
||||
fn rebase(&mut self) -> Result<(), Error> {
|
||||
SubTransaction::rebase(self)
|
||||
}
|
||||
fn exists<'b, S: AsRef<str> + Send + Sync + 'b, V: SegList + Send + Sync + 'b>(
|
||||
&'b mut self,
|
||||
ptr: &'b JsonPointer<S, V>,
|
||||
store_read_lock: Option<RwLockReadGuard<'b, Store>>,
|
||||
) -> BoxFuture<'b, Result<bool, Error>> {
|
||||
SubTransaction::exists(self, ptr, store_read_lock).boxed()
|
||||
}
|
||||
fn get_value<'b, S: AsRef<str> + Send + Sync + 'b, V: SegList + Send + Sync + 'b>(
|
||||
&'b mut self,
|
||||
ptr: &'b JsonPointer<S, V>,
|
||||
store_read_lock: Option<RwLockReadGuard<'b, Store>>,
|
||||
) -> BoxFuture<'b, Result<Value, Error>> {
|
||||
SubTransaction::get_value(self, ptr, store_read_lock).boxed()
|
||||
}
|
||||
fn put_value<'b, S: AsRef<str> + Send + Sync + 'b, V: SegList + Send + Sync + 'b>(
|
||||
&'b mut self,
|
||||
ptr: &'b JsonPointer<S, V>,
|
||||
value: &'b Value,
|
||||
) -> BoxFuture<'b, Result<(), Error>> {
|
||||
SubTransaction::put_value(self, ptr, value).boxed()
|
||||
}
|
||||
fn store(&self) -> Arc<RwLock<Store>> {
|
||||
self.parent.store()
|
||||
}
|
||||
fn subscribe(&self) -> Receiver<Arc<Revision>> {
|
||||
self.parent.subscribe()
|
||||
}
|
||||
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) {
|
||||
let (locker, mut locks) = self.parent.locker_and_locks();
|
||||
locks.push(&mut self.locks);
|
||||
(locker, locks)
|
||||
}
|
||||
fn apply(&mut self, patch: DiffPatch) {
|
||||
self.updates.append(patch)
|
||||
}
|
||||
fn lock<'b, S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
|
||||
&'b mut self,
|
||||
ptr: &'b JsonPointer<S, V>,
|
||||
lock: LockType,
|
||||
) -> BoxFuture<'b, ()> {
|
||||
SubTransaction::lock(self, ptr, lock).boxed()
|
||||
}
|
||||
fn get<
|
||||
'b,
|
||||
T: for<'de> Deserialize<'de> + 'b,
|
||||
S: AsRef<str> + Clone + Send + Sync + 'b,
|
||||
V: SegList + Clone + Send + Sync + 'b,
|
||||
>(
|
||||
&'b mut self,
|
||||
ptr: &'b JsonPointer<S, V>,
|
||||
lock: LockType,
|
||||
) -> BoxFuture<'b, Result<T, Error>> {
|
||||
SubTransaction::get(self, ptr, lock).boxed()
|
||||
}
|
||||
fn put<
|
||||
'b,
|
||||
T: Serialize + Send + Sync + 'b,
|
||||
S: AsRef<str> + Send + Sync + 'b,
|
||||
V: SegList + Send + Sync + 'b,
|
||||
>(
|
||||
&'b mut self,
|
||||
ptr: &'b JsonPointer<S, V>,
|
||||
value: &'b T,
|
||||
) -> BoxFuture<'b, Result<(), Error>> {
|
||||
SubTransaction::put(self, ptr, value).boxed()
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user