minor refactor of transactions

This commit is contained in:
Aiden McClelland
2021-04-27 14:59:53 -06:00
parent 235362bdab
commit f1febb6bb4
7 changed files with 421 additions and 375 deletions

View File

@@ -162,6 +162,11 @@ fn build_model_struct(
#model_name(#inner_model::from(ptr))
}
}
impl From<#model_name> for patch_db::json_ptr::JsonPointer {
fn from(model: #model_name) -> Self {
model.0.into()
}
}
impl AsRef<patch_db::json_ptr::JsonPointer> for #model_name {
fn as_ref(&self) -> &patch_db::json_ptr::JsonPointer {
self.0.as_ref()
@@ -242,6 +247,11 @@ fn build_model_struct(
#model_name(From::from(ptr))
}
}
impl From<#model_name> for patch_db::json_ptr::JsonPointer {
fn from(model: #model_name) -> Self {
model.0.into()
}
}
impl AsRef<patch_db::json_ptr::JsonPointer> for #model_name {
fn as_ref(&self) -> &patch_db::json_ptr::JsonPointer {
self.0.as_ref()
@@ -281,6 +291,11 @@ fn build_model_enum(base: &DeriveInput, _: &DataEnum, model_name: Option<Ident>)
#model_name(From::from(ptr))
}
}
impl From<#model_name> for patch_db::json_ptr::JsonPointer {
fn from(model: #model_name) -> Self {
model.0.into()
}
}
impl AsRef<patch_db::json_ptr::JsonPointer> for #model_name {
fn as_ref(&self) -> &patch_db::json_ptr::JsonPointer {
self.0.as_ref()

247
patch-db/src/handle.rs Normal file
View File

@@ -0,0 +1,247 @@
use std::sync::Arc;
use async_trait::async_trait;
use json_ptr::{JsonPointer, SegList};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::sync::{broadcast::Receiver, RwLock, RwLockReadGuard};
use crate::{
locker::{LockType, LockerGuard},
Locker, PatchDb, Revision, Store, Transaction,
};
use crate::{patch::DiffPatch, Error};
#[async_trait]
pub trait DbHandle: Sized + Send + Sync {
async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error>;
fn rebase(&mut self) -> Result<(), Error>;
fn store(&self) -> Arc<RwLock<Store>>;
fn subscribe(&self) -> Receiver<Arc<Revision>>;
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>);
async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
&mut self,
ptr: &JsonPointer<S, V>,
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
) -> Result<bool, Error>;
async fn get_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
&mut self,
ptr: &JsonPointer<S, V>,
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
) -> Result<Value, Error>;
async fn put_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
&mut self,
ptr: &JsonPointer<S, V>,
value: &Value,
) -> Result<(), Error>;
async fn apply(&mut self, patch: DiffPatch) -> Result<(), Error>;
async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
&mut self,
ptr: &JsonPointer<S, V>,
lock: LockType,
) -> ();
async fn get<
T: for<'de> Deserialize<'de>,
S: AsRef<str> + Send + Sync,
V: SegList + Send + Sync,
>(
&mut self,
ptr: &JsonPointer<S, V>,
) -> Result<T, Error>;
async fn put<
T: Serialize + Send + Sync,
S: AsRef<str> + Send + Sync,
V: SegList + Send + Sync,
>(
&mut self,
ptr: &JsonPointer<S, V>,
value: &T,
) -> Result<(), Error>;
}
#[async_trait]
impl<Handle: DbHandle + Send + Sync> DbHandle for &mut Handle {
async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error> {
let Transaction {
locks,
updates,
sub,
..
} = (*self).begin().await?;
Ok(Transaction {
parent: self,
locks,
updates,
sub,
})
}
fn rebase(&mut self) -> Result<(), Error> {
(*self).rebase()
}
fn store(&self) -> Arc<RwLock<Store>> {
(**self).store()
}
fn subscribe(&self) -> Receiver<Arc<Revision>> {
(**self).subscribe()
}
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) {
(*self).locker_and_locks()
}
async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
&mut self,
ptr: &JsonPointer<S, V>,
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
) -> Result<bool, Error> {
(*self).exists(ptr, store_read_lock).await
}
async fn get_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
&mut self,
ptr: &JsonPointer<S, V>,
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
) -> Result<Value, Error> {
(*self).get_value(ptr, store_read_lock).await
}
async fn put_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
&mut self,
ptr: &JsonPointer<S, V>,
value: &Value,
) -> Result<(), Error> {
(*self).put_value(ptr, value).await
}
async fn apply(&mut self, patch: DiffPatch) -> Result<(), Error> {
(*self).apply(patch).await
}
async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
&mut self,
ptr: &JsonPointer<S, V>,
lock: LockType,
) {
(*self).lock(ptr, lock).await
}
async fn get<
T: for<'de> Deserialize<'de>,
S: AsRef<str> + Send + Sync,
V: SegList + Send + Sync,
>(
&mut self,
ptr: &JsonPointer<S, V>,
) -> Result<T, Error> {
(*self).get(ptr).await
}
async fn put<
T: Serialize + Send + Sync,
S: AsRef<str> + Send + Sync,
V: SegList + Send + Sync,
>(
&mut self,
ptr: &JsonPointer<S, V>,
value: &T,
) -> Result<(), Error> {
(*self).put(ptr, value).await
}
}
pub struct PatchDbHandle {
pub(crate) db: PatchDb,
pub(crate) locks: Vec<(JsonPointer, LockerGuard)>,
}
#[async_trait]
impl DbHandle for PatchDbHandle {
async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error> {
Ok(Transaction {
sub: self.subscribe(),
parent: self,
locks: Vec::new(),
updates: DiffPatch::default(),
})
}
fn rebase(&mut self) -> Result<(), Error> {
Ok(())
}
fn store(&self) -> Arc<RwLock<Store>> {
self.db.store.clone()
}
fn subscribe(&self) -> Receiver<Arc<Revision>> {
self.db.subscribe()
}
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) {
(&self.db.locker, vec![self.locks.as_mut_slice()])
}
async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
&mut self,
ptr: &JsonPointer<S, V>,
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
) -> Result<bool, Error> {
if let Some(lock) = store_read_lock {
lock.exists(ptr)
} else {
self.db.exists(ptr).await
}
}
async fn get_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
&mut self,
ptr: &JsonPointer<S, V>,
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
) -> Result<Value, Error> {
if let Some(lock) = store_read_lock {
lock.get(ptr)
} else {
self.db.get(ptr).await
}
}
async fn put_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
&mut self,
ptr: &JsonPointer<S, V>,
value: &Value,
) -> Result<(), Error> {
self.db.put(ptr, value).await?;
Ok(())
}
async fn apply(&mut self, patch: DiffPatch) -> Result<(), Error> {
self.db.apply(patch, None).await?;
Ok(())
}
async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
&mut self,
ptr: &JsonPointer<S, V>,
lock: LockType,
) {
match lock {
LockType::Read => {
self.db
.locker
.add_read_lock(ptr, &mut self.locks, &mut [])
.await;
}
LockType::Write => {
self.db
.locker
.add_write_lock(ptr, &mut self.locks, &mut [])
.await;
}
LockType::None => (),
}
}
async fn get<
T: for<'de> Deserialize<'de>,
S: AsRef<str> + Send + Sync,
V: SegList + Send + Sync,
>(
&mut self,
ptr: &JsonPointer<S, V>,
) -> Result<T, Error> {
self.db.get(ptr).await
}
async fn put<
T: Serialize + Send + Sync,
S: AsRef<str> + Send + Sync,
V: SegList + Send + Sync,
>(
&mut self,
ptr: &JsonPointer<S, V>,
value: &T,
) -> Result<(), Error> {
self.db.put(ptr, value).await?;
Ok(())
}
}

View File

@@ -6,6 +6,7 @@ use tokio::sync::broadcast::error::TryRecvError;
// note: inserting into an array (before another element) without proper locking can result in unexpected behaviour
mod handle;
mod locker;
mod model;
mod patch;
@@ -15,6 +16,7 @@ mod transaction;
#[cfg(test)]
mod test;
pub use handle::{DbHandle, PatchDbHandle};
pub use json_ptr;
pub use locker::{LockType, Locker};
pub use model::{
@@ -23,7 +25,7 @@ pub use model::{
pub use patch::Revision;
pub use patch_db_macro::HasModel;
pub use store::{PatchDb, Store};
pub use transaction::{Checkpoint, SubTransaction, Transaction};
pub use transaction::Transaction;
#[derive(Error, Debug)]
pub enum Error {

View File

@@ -7,9 +7,8 @@ use json_ptr::JsonPointer;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use crate::locker::LockType;
use crate::transaction::Checkpoint;
use crate::Error;
use crate::{locker::LockType, DbHandle};
pub struct ModelData<T: Serialize + for<'de> Deserialize<'de>>(T);
impl<T: Serialize + for<'de> Deserialize<'de>> Deref for ModelData<T> {
@@ -25,13 +24,13 @@ pub struct ModelDataMut<T: Serialize + for<'de> Deserialize<'de>> {
ptr: JsonPointer,
}
impl<T: Serialize + for<'de> Deserialize<'de>> ModelDataMut<T> {
pub async fn save<Tx: Checkpoint>(self, tx: &mut Tx) -> Result<(), Error> {
pub async fn save<Db: DbHandle>(self, db: &mut Db) -> Result<(), Error> {
let current = serde_json::to_value(&self.current)?;
let mut diff = crate::patch::diff(&self.original, &current);
let target = tx.get_value(&self.ptr, None).await?;
let target = db.get_value(&self.ptr, None).await?;
diff.rebase(&crate::patch::diff(&self.original, &target));
diff.prepend(&self.ptr);
tx.apply(diff);
db.apply(diff).await?;
Ok(())
}
}
@@ -56,17 +55,18 @@ impl<T> Model<T>
where
T: Serialize + for<'de> Deserialize<'de>,
{
pub async fn lock<Tx: Checkpoint>(&self, tx: &mut Tx, lock: LockType) {
tx.lock(&self.ptr, lock).await
pub async fn lock<Db: DbHandle>(&self, db: &mut Db, lock: LockType) {
db.lock(&self.ptr, lock).await
}
pub async fn get<Tx: Checkpoint>(&self, tx: &mut Tx) -> Result<ModelData<T>, Error> {
Ok(ModelData(tx.get(&self.ptr, LockType::Read).await?))
pub async fn get<Db: DbHandle>(&self, db: &mut Db) -> Result<ModelData<T>, Error> {
db.lock(&self.ptr, LockType::Read).await;
Ok(ModelData(db.get(&self.ptr).await?))
}
pub async fn get_mut<Tx: Checkpoint>(&self, tx: &mut Tx) -> Result<ModelDataMut<T>, Error> {
self.lock(tx, LockType::Write).await;
let original = tx.get_value(&self.ptr, None).await?;
pub async fn get_mut<Db: DbHandle>(&self, db: &mut Db) -> Result<ModelDataMut<T>, Error> {
self.lock(db, LockType::Write).await;
let original = db.get_value(&self.ptr, None).await?;
let current = serde_json::from_value(original.clone())?;
Ok(ModelDataMut {
original,
@@ -88,9 +88,9 @@ impl<T> Model<T>
where
T: Serialize + for<'de> Deserialize<'de> + Send + Sync,
{
pub async fn put<Tx: Checkpoint>(&self, tx: &mut Tx, value: &T) -> Result<(), Error> {
self.lock(tx, LockType::Write).await;
tx.put(&self.ptr, value).await
pub async fn put<Db: DbHandle>(&self, db: &mut Db, value: &T) -> Result<(), Error> {
self.lock(db, LockType::Write).await;
db.put(&self.ptr, value).await
}
}
impl<T> From<JsonPointer> for Model<T>
@@ -132,8 +132,8 @@ where
}
}
pub trait HasModel {
type Model: From<JsonPointer> + AsRef<JsonPointer>;
pub trait HasModel: Serialize + for<'de> Deserialize<'de> {
type Model: From<JsonPointer> + AsRef<JsonPointer> + Into<JsonPointer> + From<Model<Self>>;
}
#[derive(Debug, Clone)]
@@ -162,6 +162,11 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> From<JsonPointer> for
BoxModel(T::Model::from(ptr))
}
}
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> From<BoxModel<T>> for JsonPointer {
fn from(model: BoxModel<T>) -> Self {
model.0.into()
}
}
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> HasModel for Box<T> {
type Model = BoxModel<T>;
}
@@ -169,13 +174,32 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> HasModel for Box<T> {
#[derive(Debug, Clone)]
pub struct OptionModel<T: HasModel + Serialize + for<'de> Deserialize<'de>>(T::Model);
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
pub async fn check<Tx: Checkpoint>(self, tx: &mut Tx) -> Result<Option<T::Model>, Error> {
Ok(if tx.exists(self.0.as_ref(), None).await? {
pub async fn exists<Db: DbHandle>(&self, db: &mut Db) -> Result<bool, Error> {
db.lock(self.as_ref(), LockType::Read).await;
Ok(db.exists(&self.as_ref(), None).await?)
}
pub async fn check<Db: DbHandle>(self, db: &mut Db) -> Result<Option<T::Model>, Error> {
Ok(if self.exists(db).await? {
Some(self.0)
} else {
None
})
}
pub async fn delete<Db: DbHandle>(&self, db: &mut Db) -> Result<(), Error> {
db.lock(self.as_ref(), LockType::Write).await;
db.put(self.as_ref(), &Value::Null).await
}
}
impl<T> OptionModel<T>
where
T: Serialize + for<'de> Deserialize<'de> + Send + Sync + HasModel,
{
pub async fn put<Db: DbHandle>(&self, db: &mut Db, value: &T) -> Result<(), Error> {
db.lock(self.as_ref(), LockType::Write).await;
db.put(self.as_ref(), value).await
}
}
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> From<Model<Option<T>>>
for OptionModel<T>
@@ -189,8 +213,18 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> From<JsonPointer> for
OptionModel(T::Model::from(ptr))
}
}
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> From<OptionModel<T>> for JsonPointer {
fn from(model: OptionModel<T>) -> Self {
model.0.into()
}
}
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> AsRef<JsonPointer> for OptionModel<T> {
fn as_ref(&self) -> &JsonPointer {
self.0.as_ref()
}
}
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> HasModel for Option<T> {
type Model = BoxModel<T>;
type Model = OptionModel<T>;
}
#[derive(Debug, Clone)]
@@ -216,6 +250,11 @@ impl<T: Serialize + for<'de> Deserialize<'de>> From<JsonPointer> for VecModel<T>
VecModel(From::from(ptr))
}
}
impl<T: Serialize + for<'de> Deserialize<'de>> From<VecModel<T>> for JsonPointer {
fn from(model: VecModel<T>) -> Self {
model.0.into()
}
}
impl<T> AsRef<JsonPointer> for VecModel<T>
where
T: Serialize + for<'de> Deserialize<'de>,
@@ -282,6 +321,15 @@ where
self.child(idx.as_ref()).into()
}
}
impl<T> From<Model<T>> for MapModel<T>
where
T: Serialize + for<'de> Deserialize<'de> + Map,
T::Value: Serialize + for<'de> Deserialize<'de>,
{
fn from(model: Model<T>) -> Self {
MapModel(model)
}
}
impl<T> From<JsonPointer> for MapModel<T>
where
T: Serialize + for<'de> Deserialize<'de> + Map,
@@ -291,6 +339,15 @@ where
MapModel(From::from(ptr))
}
}
impl<T> From<MapModel<T>> for JsonPointer
where
T: Serialize + for<'de> Deserialize<'de> + Map,
T::Value: Serialize + for<'de> Deserialize<'de>,
{
fn from(model: MapModel<T>) -> Self {
model.0.into()
}
}
impl<T> AsRef<JsonPointer> for MapModel<T>
where
T: Serialize + for<'de> Deserialize<'de> + Map,
@@ -309,8 +366,8 @@ where
}
impl<K, V> HasModel for BTreeMap<K, V>
where
K: Serialize + for<'de> Deserialize<'de> + Hash + Eq + AsRef<str>,
K: Serialize + for<'de> Deserialize<'de> + Ord + AsRef<str>,
V: Serialize + for<'de> Deserialize<'de>,
{
type Model = MapModel<HashMap<K, V>>;
type Model = MapModel<BTreeMap<K, V>>;
}

View File

@@ -14,10 +14,9 @@ use tokio::fs::File;
use tokio::sync::broadcast::{Receiver, Sender};
use tokio::sync::{Mutex, RwLock, RwLockWriteGuard};
use crate::locker::Locker;
use crate::patch::{diff, DiffPatch, Revision};
use crate::transaction::Transaction;
use crate::Error;
use crate::{locker::Locker, PatchDbHandle};
lazy_static! {
static ref OPEN_STORES: Mutex<HashMap<PathBuf, Qutex<()>>> = Mutex::new(HashMap::new());
@@ -31,7 +30,7 @@ pub struct Store {
revision: u64,
}
impl Store {
pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
pub(crate) async fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
let (_lock, path) = {
if !path.as_ref().exists() {
tokio::fs::File::create(path.as_ref()).await?;
@@ -123,13 +122,13 @@ impl Store {
self.file.unlock(true).map_err(|e| e.1)?;
Ok(())
}
pub fn exists<S: AsRef<str>, V: SegList>(
pub(crate) fn exists<S: AsRef<str>, V: SegList>(
&self,
ptr: &JsonPointer<S, V>,
) -> Result<bool, Error> {
Ok(ptr.get(self.get_data()?).unwrap_or(&Value::Null) != &Value::Null)
}
pub fn get<T: for<'de> Deserialize<'de>, S: AsRef<str>, V: SegList>(
pub(crate) fn get<T: for<'de> Deserialize<'de>, S: AsRef<str>, V: SegList>(
&self,
ptr: &JsonPointer<S, V>,
) -> Result<T, Error> {
@@ -137,10 +136,10 @@ impl Store {
ptr.get(self.get_data()?).unwrap_or(&Value::Null).clone(),
)?)
}
pub fn dump(&self) -> Value {
pub(crate) fn dump(&self) -> Value {
self.get_data().unwrap().clone()
}
pub async fn put<T: Serialize + ?Sized, S: AsRef<str>, V: SegList>(
pub(crate) async fn put<T: Serialize + ?Sized, S: AsRef<str>, V: SegList>(
&mut self,
ptr: &JsonPointer<S, V>,
value: &T,
@@ -152,7 +151,7 @@ impl Store {
patch.prepend(ptr);
self.apply(patch).await
}
pub async fn apply(&mut self, patch: DiffPatch) -> Result<Arc<Revision>, Error> {
pub(crate) async fn apply(&mut self, patch: DiffPatch) -> Result<Arc<Revision>, Error> {
use tokio::io::AsyncWriteExt;
self.check_cache_corrupted()?;
@@ -196,6 +195,9 @@ impl PatchDb {
subscriber: Arc::new(subscriber),
})
}
pub async fn dump(&self) -> Value {
self.store.read().await.dump()
}
pub async fn exists<S: AsRef<str>, V: SegList>(
&self,
ptr: &JsonPointer<S, V>,
@@ -235,12 +237,10 @@ impl PatchDb {
pub fn subscribe(&self) -> Receiver<Arc<Revision>> {
self.subscriber.subscribe()
}
pub fn begin(&self) -> Transaction {
Transaction {
pub fn handle(&self) -> PatchDbHandle {
PatchDbHandle {
db: self.clone(),
locks: Vec::new(),
updates: DiffPatch::default(),
sub: self.subscribe(),
}
}
}

View File

@@ -8,7 +8,7 @@ use serde_json::Value;
use tokio::fs;
use tokio::runtime::Builder;
use crate as patch_db;
use crate::{self as patch_db, DbHandle};
async fn init_db(db_name: String) -> PatchDb {
cleanup_db(&db_name).await;
@@ -54,7 +54,8 @@ async fn basic() {
#[tokio::test]
async fn transaction() {
let db = init_db("test.db".to_string()).await;
let mut tx = db.begin();
let mut handle = db.handle();
let mut tx = handle.begin().await.unwrap();
let ptr: JsonPointer = "/b/b".parse().unwrap();
tx.put(&ptr, &(2 as usize)).await.unwrap();
tx.put(&ptr, &(1 as usize)).await.unwrap();

View File

@@ -1,6 +1,6 @@
use std::sync::Arc;
use futures::future::{BoxFuture, FutureExt};
use async_trait::async_trait;
use json_ptr::{JsonPointer, SegList};
use serde::{Deserialize, Serialize};
use serde_json::Value;
@@ -8,258 +8,59 @@ use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::broadcast::Receiver;
use tokio::sync::{RwLock, RwLockReadGuard};
use crate::locker::{LockType, Locker, LockerGuard};
use crate::patch::{DiffPatch, Revision};
use crate::store::{PatchDb, Store};
use crate::store::Store;
use crate::Error;
use crate::{
locker::{LockType, Locker, LockerGuard},
DbHandle,
};
use crate::{
patch::{DiffPatch, Revision},
PatchDbHandle,
};
pub trait Checkpoint: Sized {
fn rebase(&mut self) -> Result<(), Error>;
fn exists<'a, S: AsRef<str> + Send + Sync + 'a, V: SegList + Send + Sync + 'a>(
&'a mut self,
ptr: &'a JsonPointer<S, V>,
store_read_lock: Option<RwLockReadGuard<'a, Store>>,
) -> BoxFuture<'a, Result<bool, Error>>;
fn get_value<'a, S: AsRef<str> + Send + Sync + 'a, V: SegList + Send + Sync + 'a>(
&'a mut self,
ptr: &'a JsonPointer<S, V>,
store_read_lock: Option<RwLockReadGuard<'a, Store>>,
) -> BoxFuture<'a, Result<Value, Error>>;
fn put_value<'a, S: AsRef<str> + Send + Sync + 'a, V: SegList + Send + Sync + 'a>(
&'a mut self,
ptr: &'a JsonPointer<S, V>,
value: &'a Value,
) -> BoxFuture<'a, Result<(), Error>>;
fn store(&self) -> Arc<RwLock<Store>>;
fn subscribe(&self) -> Receiver<Arc<Revision>>;
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>);
fn apply(&mut self, patch: DiffPatch);
fn lock<'a, S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
&'a mut self,
ptr: &'a JsonPointer<S, V>,
lock: LockType,
) -> BoxFuture<'a, ()>;
fn get<
'a,
T: for<'de> Deserialize<'de> + 'a,
S: AsRef<str> + Clone + Send + Sync + 'a,
V: SegList + Clone + Send + Sync + 'a,
>(
&'a mut self,
ptr: &'a JsonPointer<S, V>,
lock: LockType,
) -> BoxFuture<'a, Result<T, Error>>;
fn put<
'a,
T: Serialize + Send + Sync + 'a,
S: AsRef<str> + Send + Sync + 'a,
V: SegList + Send + Sync + 'a,
>(
&'a mut self,
ptr: &'a JsonPointer<S, V>,
value: &'a T,
) -> BoxFuture<'a, Result<(), Error>>;
}
pub struct Transaction {
pub(crate) db: PatchDb,
pub struct Transaction<Parent: DbHandle> {
pub(crate) parent: Parent,
pub(crate) locks: Vec<(JsonPointer, LockerGuard)>,
pub(crate) updates: DiffPatch,
pub(crate) sub: Receiver<Arc<Revision>>,
}
impl Transaction {
pub fn rebase(&mut self) -> Result<(), Error> {
while let Some(rev) = match self.sub.try_recv() {
Ok(a) => Some(a),
Err(TryRecvError::Empty) => None,
Err(e) => return Err(e.into()),
} {
self.updates.rebase(&rev.patch);
}
Ok(())
}
async fn exists<S: AsRef<str>, V: SegList>(
&mut self,
ptr: &JsonPointer<S, V>,
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
) -> Result<bool, Error> {
let exists = {
let store_lock = self.db.store.clone();
let store = if let Some(store_read_lock) = store_read_lock {
store_read_lock
} else {
store_lock.read().await
};
self.rebase()?;
ptr.get(store.get_data()?).unwrap_or(&Value::Null) != &Value::Null
};
Ok(self.updates.for_path(ptr).exists().unwrap_or(exists))
}
async fn get_value<S: AsRef<str>, V: SegList>(
&mut self,
ptr: &JsonPointer<S, V>,
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
) -> Result<Value, Error> {
let mut data = {
let store_lock = self.db.store.clone();
let store = if let Some(store_read_lock) = store_read_lock {
store_read_lock
} else {
store_lock.read().await
};
self.rebase()?;
ptr.get(store.get_data()?).cloned().unwrap_or_default()
};
json_patch::patch(&mut data, &*self.updates.for_path(ptr))?;
Ok(data)
}
async fn put_value<S: AsRef<str>, V: SegList>(
&mut self,
ptr: &JsonPointer<S, V>,
value: &Value,
) -> Result<(), Error> {
let old = Transaction::get_value(self, ptr, None).await?;
let mut patch = crate::patch::diff(&old, &value);
patch.prepend(ptr);
self.updates.append(patch);
Ok(())
}
pub async fn lock<S: AsRef<str> + Clone, V: SegList + Clone>(
&mut self,
ptr: &JsonPointer<S, V>,
lock: LockType,
) {
match lock {
LockType::None => (),
LockType::Read => {
self.db
.locker
.add_read_lock(ptr, &mut self.locks, &mut [])
.await
}
LockType::Write => {
self.db
.locker
.add_write_lock(ptr, &mut self.locks, &mut [])
.await
}
}
}
pub async fn get<T: for<'de> Deserialize<'de>, S: AsRef<str> + Clone, V: SegList + Clone>(
&mut self,
ptr: &JsonPointer<S, V>,
lock: LockType,
) -> Result<T, Error> {
self.lock(ptr, lock).await;
Ok(serde_json::from_value(
Transaction::get_value(self, ptr, None).await?,
)?)
}
pub async fn put<T: Serialize, S: AsRef<str>, V: SegList>(
&mut self,
ptr: &JsonPointer<S, V>,
value: &T,
) -> Result<(), Error> {
Transaction::put_value(self, ptr, &serde_json::to_value(value)?).await
}
impl Transaction<&mut PatchDbHandle> {
pub async fn commit(mut self) -> Result<Arc<Revision>, Error> {
let store_lock = self.db.store.clone();
let store = store_lock.write().await;
self.rebase()?;
self.db.apply(self.updates, Some(store)).await
}
pub async fn begin(&mut self) -> Result<SubTransaction<&mut Self>, Error> {
let store_lock = self.db.store.clone();
let store_lock = self.parent.store();
let store = store_lock.read().await;
self.rebase()?;
let sub = self.db.subscribe();
let rev = self.parent.db.apply(self.updates, None).await?;
drop(store);
Ok(SubTransaction {
Ok(rev)
}
}
impl<Parent: DbHandle + Send + Sync> Transaction<Parent> {
pub async fn save(mut self) -> Result<(), Error> {
let store_lock = self.parent.store();
let store = store_lock.read().await;
self.rebase()?;
self.parent.apply(self.updates).await?;
drop(store);
Ok(())
}
}
#[async_trait]
impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error> {
let store_lock = self.parent.store();
let store = store_lock.read().await;
self.rebase()?;
let sub = self.parent.subscribe();
drop(store);
Ok(Transaction {
parent: self,
locks: Vec::new(),
updates: DiffPatch::default(),
sub,
})
}
}
impl<'a> Checkpoint for &'a mut Transaction {
fn rebase(&mut self) -> Result<(), Error> {
Transaction::rebase(self)
}
fn exists<'b, S: AsRef<str> + Send + Sync + 'b, V: SegList + Send + Sync + 'b>(
&'b mut self,
ptr: &'b JsonPointer<S, V>,
store_read_lock: Option<RwLockReadGuard<'b, Store>>,
) -> BoxFuture<'b, Result<bool, Error>> {
Transaction::exists(self, ptr, store_read_lock).boxed()
}
fn get_value<'b, S: AsRef<str> + Send + Sync + 'b, V: SegList + Send + Sync + 'b>(
&'b mut self,
ptr: &'b JsonPointer<S, V>,
store_read_lock: Option<RwLockReadGuard<'b, Store>>,
) -> BoxFuture<'b, Result<Value, Error>> {
Transaction::get_value(self, ptr, store_read_lock).boxed()
}
fn put_value<'b, S: AsRef<str> + Send + Sync + 'b, V: SegList + Send + Sync + 'b>(
&'b mut self,
ptr: &'b JsonPointer<S, V>,
value: &'b Value,
) -> BoxFuture<'b, Result<(), Error>> {
Transaction::put_value(self, ptr, value).boxed()
}
fn store(&self) -> Arc<RwLock<Store>> {
self.db.store.clone()
}
fn subscribe(&self) -> Receiver<Arc<Revision>> {
self.db.subscribe()
}
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) {
(&self.db.locker, vec![&mut self.locks])
}
fn apply(&mut self, patch: DiffPatch) {
self.updates.append(patch)
}
fn lock<'b, S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
&'b mut self,
ptr: &'b JsonPointer<S, V>,
lock: LockType,
) -> BoxFuture<'b, ()> {
Transaction::lock(self, ptr, lock).boxed()
}
fn get<
'b,
T: for<'de> Deserialize<'de> + 'b,
S: AsRef<str> + Clone + Send + Sync + 'b,
V: SegList + Clone + Send + Sync + 'b,
>(
&'b mut self,
ptr: &'b JsonPointer<S, V>,
lock: LockType,
) -> BoxFuture<'b, Result<T, Error>> {
Transaction::get(self, ptr, lock).boxed()
}
fn put<
'b,
T: Serialize + Send + Sync + 'b,
S: AsRef<str> + Send + Sync + 'b,
V: SegList + Send + Sync + 'b,
>(
&'b mut self,
ptr: &'b JsonPointer<S, V>,
value: &'b T,
) -> BoxFuture<'b, Result<(), Error>> {
Transaction::put(self, ptr, value).boxed()
}
}
pub struct SubTransaction<Tx: Checkpoint> {
parent: Tx,
locks: Vec<(JsonPointer, LockerGuard)>,
updates: DiffPatch,
sub: Receiver<Arc<Revision>>,
}
impl<Tx: Checkpoint + Send + Sync> SubTransaction<Tx> {
pub fn rebase(&mut self) -> Result<(), Error> {
self.parent.rebase()?;
while let Some(rev) = match self.sub.try_recv() {
Ok(a) => Some(a),
@@ -270,6 +71,17 @@ impl<Tx: Checkpoint + Send + Sync> SubTransaction<Tx> {
}
Ok(())
}
fn store(&self) -> Arc<RwLock<Store>> {
self.parent.store()
}
fn subscribe(&self) -> Receiver<Arc<Revision>> {
self.parent.subscribe()
}
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) {
let (locker, mut locks) = self.parent.locker_and_locks();
locks.push(&mut self.locks);
(locker, locks)
}
async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
&mut self,
ptr: &JsonPointer<S, V>,
@@ -310,13 +122,13 @@ impl<Tx: Checkpoint + Send + Sync> SubTransaction<Tx> {
ptr: &JsonPointer<S, V>,
value: &Value,
) -> Result<(), Error> {
let old = SubTransaction::get_value(self, ptr, None).await?;
let old = self.get_value(ptr, None).await?;
let mut patch = crate::patch::diff(&old, &value);
patch.prepend(ptr);
self.updates.append(patch);
Ok(())
}
pub async fn lock<S: AsRef<str> + Clone, V: SegList + Clone>(
async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
&mut self,
ptr: &JsonPointer<S, V>,
lock: LockType,
@@ -335,117 +147,29 @@ impl<Tx: Checkpoint + Send + Sync> SubTransaction<Tx> {
}
}
}
pub async fn get<
async fn get<
T: for<'de> Deserialize<'de>,
S: AsRef<str> + Clone + Send + Sync,
V: SegList + Clone + Send + Sync,
S: AsRef<str> + Send + Sync,
V: SegList + Send + Sync,
>(
&mut self,
ptr: &JsonPointer<S, V>,
lock: LockType,
) -> Result<T, Error> {
self.lock(ptr, lock).await;
Ok(serde_json::from_value(
SubTransaction::get_value(self, ptr, None).await?,
)?)
Ok(serde_json::from_value(self.get_value(ptr, None).await?)?)
}
pub async fn put<T: Serialize, S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
async fn put<
T: Serialize + Send + Sync,
S: AsRef<str> + Send + Sync,
V: SegList + Send + Sync,
>(
&mut self,
ptr: &JsonPointer<S, V>,
value: &T,
) -> Result<(), Error> {
SubTransaction::put_value(self, ptr, &serde_json::to_value(value)?).await
self.put_value(ptr, &serde_json::to_value(value)?).await
}
pub async fn commit(mut self) -> Result<(), Error> {
let store_lock = self.parent.store();
let store = store_lock.read().await;
self.rebase()?;
self.parent.apply(self.updates);
drop(store);
async fn apply(&mut self, patch: DiffPatch) -> Result<(), Error> {
self.updates.append(patch);
Ok(())
}
pub async fn begin(&mut self) -> Result<SubTransaction<&mut Self>, Error> {
let store_lock = self.parent.store();
let store = store_lock.read().await;
self.rebase()?;
let sub = self.parent.subscribe();
drop(store);
Ok(SubTransaction {
parent: self,
locks: Vec::new(),
updates: DiffPatch::default(),
sub,
})
}
}
impl<'a, Tx: Checkpoint + Send + Sync> Checkpoint for &'a mut SubTransaction<Tx> {
fn rebase(&mut self) -> Result<(), Error> {
SubTransaction::rebase(self)
}
fn exists<'b, S: AsRef<str> + Send + Sync + 'b, V: SegList + Send + Sync + 'b>(
&'b mut self,
ptr: &'b JsonPointer<S, V>,
store_read_lock: Option<RwLockReadGuard<'b, Store>>,
) -> BoxFuture<'b, Result<bool, Error>> {
SubTransaction::exists(self, ptr, store_read_lock).boxed()
}
fn get_value<'b, S: AsRef<str> + Send + Sync + 'b, V: SegList + Send + Sync + 'b>(
&'b mut self,
ptr: &'b JsonPointer<S, V>,
store_read_lock: Option<RwLockReadGuard<'b, Store>>,
) -> BoxFuture<'b, Result<Value, Error>> {
SubTransaction::get_value(self, ptr, store_read_lock).boxed()
}
fn put_value<'b, S: AsRef<str> + Send + Sync + 'b, V: SegList + Send + Sync + 'b>(
&'b mut self,
ptr: &'b JsonPointer<S, V>,
value: &'b Value,
) -> BoxFuture<'b, Result<(), Error>> {
SubTransaction::put_value(self, ptr, value).boxed()
}
fn store(&self) -> Arc<RwLock<Store>> {
self.parent.store()
}
fn subscribe(&self) -> Receiver<Arc<Revision>> {
self.parent.subscribe()
}
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) {
let (locker, mut locks) = self.parent.locker_and_locks();
locks.push(&mut self.locks);
(locker, locks)
}
fn apply(&mut self, patch: DiffPatch) {
self.updates.append(patch)
}
fn lock<'b, S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
&'b mut self,
ptr: &'b JsonPointer<S, V>,
lock: LockType,
) -> BoxFuture<'b, ()> {
SubTransaction::lock(self, ptr, lock).boxed()
}
fn get<
'b,
T: for<'de> Deserialize<'de> + 'b,
S: AsRef<str> + Clone + Send + Sync + 'b,
V: SegList + Clone + Send + Sync + 'b,
>(
&'b mut self,
ptr: &'b JsonPointer<S, V>,
lock: LockType,
) -> BoxFuture<'b, Result<T, Error>> {
SubTransaction::get(self, ptr, lock).boxed()
}
fn put<
'b,
T: Serialize + Send + Sync + 'b,
S: AsRef<str> + Send + Sync + 'b,
V: SegList + Send + Sync + 'b,
>(
&'b mut self,
ptr: &'b JsonPointer<S, V>,
value: &'b T,
) -> BoxFuture<'b, Result<(), Error>> {
SubTransaction::put(self, ptr, value).boxed()
}
}