mirror of
https://github.com/Start9Labs/patch-db.git
synced 2026-03-30 12:01:57 +00:00
minor refactor of transactions
This commit is contained in:
@@ -162,6 +162,11 @@ fn build_model_struct(
|
|||||||
#model_name(#inner_model::from(ptr))
|
#model_name(#inner_model::from(ptr))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
impl From<#model_name> for patch_db::json_ptr::JsonPointer {
|
||||||
|
fn from(model: #model_name) -> Self {
|
||||||
|
model.0.into()
|
||||||
|
}
|
||||||
|
}
|
||||||
impl AsRef<patch_db::json_ptr::JsonPointer> for #model_name {
|
impl AsRef<patch_db::json_ptr::JsonPointer> for #model_name {
|
||||||
fn as_ref(&self) -> &patch_db::json_ptr::JsonPointer {
|
fn as_ref(&self) -> &patch_db::json_ptr::JsonPointer {
|
||||||
self.0.as_ref()
|
self.0.as_ref()
|
||||||
@@ -242,6 +247,11 @@ fn build_model_struct(
|
|||||||
#model_name(From::from(ptr))
|
#model_name(From::from(ptr))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
impl From<#model_name> for patch_db::json_ptr::JsonPointer {
|
||||||
|
fn from(model: #model_name) -> Self {
|
||||||
|
model.0.into()
|
||||||
|
}
|
||||||
|
}
|
||||||
impl AsRef<patch_db::json_ptr::JsonPointer> for #model_name {
|
impl AsRef<patch_db::json_ptr::JsonPointer> for #model_name {
|
||||||
fn as_ref(&self) -> &patch_db::json_ptr::JsonPointer {
|
fn as_ref(&self) -> &patch_db::json_ptr::JsonPointer {
|
||||||
self.0.as_ref()
|
self.0.as_ref()
|
||||||
@@ -281,6 +291,11 @@ fn build_model_enum(base: &DeriveInput, _: &DataEnum, model_name: Option<Ident>)
|
|||||||
#model_name(From::from(ptr))
|
#model_name(From::from(ptr))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
impl From<#model_name> for patch_db::json_ptr::JsonPointer {
|
||||||
|
fn from(model: #model_name) -> Self {
|
||||||
|
model.0.into()
|
||||||
|
}
|
||||||
|
}
|
||||||
impl AsRef<patch_db::json_ptr::JsonPointer> for #model_name {
|
impl AsRef<patch_db::json_ptr::JsonPointer> for #model_name {
|
||||||
fn as_ref(&self) -> &patch_db::json_ptr::JsonPointer {
|
fn as_ref(&self) -> &patch_db::json_ptr::JsonPointer {
|
||||||
self.0.as_ref()
|
self.0.as_ref()
|
||||||
|
|||||||
247
patch-db/src/handle.rs
Normal file
247
patch-db/src/handle.rs
Normal file
@@ -0,0 +1,247 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use json_ptr::{JsonPointer, SegList};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use serde_json::Value;
|
||||||
|
use tokio::sync::{broadcast::Receiver, RwLock, RwLockReadGuard};
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
locker::{LockType, LockerGuard},
|
||||||
|
Locker, PatchDb, Revision, Store, Transaction,
|
||||||
|
};
|
||||||
|
use crate::{patch::DiffPatch, Error};
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
pub trait DbHandle: Sized + Send + Sync {
|
||||||
|
async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error>;
|
||||||
|
fn rebase(&mut self) -> Result<(), Error>;
|
||||||
|
fn store(&self) -> Arc<RwLock<Store>>;
|
||||||
|
fn subscribe(&self) -> Receiver<Arc<Revision>>;
|
||||||
|
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>);
|
||||||
|
async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||||
|
&mut self,
|
||||||
|
ptr: &JsonPointer<S, V>,
|
||||||
|
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
|
||||||
|
) -> Result<bool, Error>;
|
||||||
|
async fn get_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||||
|
&mut self,
|
||||||
|
ptr: &JsonPointer<S, V>,
|
||||||
|
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
|
||||||
|
) -> Result<Value, Error>;
|
||||||
|
async fn put_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||||
|
&mut self,
|
||||||
|
ptr: &JsonPointer<S, V>,
|
||||||
|
value: &Value,
|
||||||
|
) -> Result<(), Error>;
|
||||||
|
async fn apply(&mut self, patch: DiffPatch) -> Result<(), Error>;
|
||||||
|
async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
|
||||||
|
&mut self,
|
||||||
|
ptr: &JsonPointer<S, V>,
|
||||||
|
lock: LockType,
|
||||||
|
) -> ();
|
||||||
|
async fn get<
|
||||||
|
T: for<'de> Deserialize<'de>,
|
||||||
|
S: AsRef<str> + Send + Sync,
|
||||||
|
V: SegList + Send + Sync,
|
||||||
|
>(
|
||||||
|
&mut self,
|
||||||
|
ptr: &JsonPointer<S, V>,
|
||||||
|
) -> Result<T, Error>;
|
||||||
|
async fn put<
|
||||||
|
T: Serialize + Send + Sync,
|
||||||
|
S: AsRef<str> + Send + Sync,
|
||||||
|
V: SegList + Send + Sync,
|
||||||
|
>(
|
||||||
|
&mut self,
|
||||||
|
ptr: &JsonPointer<S, V>,
|
||||||
|
value: &T,
|
||||||
|
) -> Result<(), Error>;
|
||||||
|
}
|
||||||
|
#[async_trait]
|
||||||
|
impl<Handle: DbHandle + Send + Sync> DbHandle for &mut Handle {
|
||||||
|
async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error> {
|
||||||
|
let Transaction {
|
||||||
|
locks,
|
||||||
|
updates,
|
||||||
|
sub,
|
||||||
|
..
|
||||||
|
} = (*self).begin().await?;
|
||||||
|
Ok(Transaction {
|
||||||
|
parent: self,
|
||||||
|
locks,
|
||||||
|
updates,
|
||||||
|
sub,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
fn rebase(&mut self) -> Result<(), Error> {
|
||||||
|
(*self).rebase()
|
||||||
|
}
|
||||||
|
fn store(&self) -> Arc<RwLock<Store>> {
|
||||||
|
(**self).store()
|
||||||
|
}
|
||||||
|
fn subscribe(&self) -> Receiver<Arc<Revision>> {
|
||||||
|
(**self).subscribe()
|
||||||
|
}
|
||||||
|
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) {
|
||||||
|
(*self).locker_and_locks()
|
||||||
|
}
|
||||||
|
async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||||
|
&mut self,
|
||||||
|
ptr: &JsonPointer<S, V>,
|
||||||
|
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
|
||||||
|
) -> Result<bool, Error> {
|
||||||
|
(*self).exists(ptr, store_read_lock).await
|
||||||
|
}
|
||||||
|
async fn get_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||||
|
&mut self,
|
||||||
|
ptr: &JsonPointer<S, V>,
|
||||||
|
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
|
||||||
|
) -> Result<Value, Error> {
|
||||||
|
(*self).get_value(ptr, store_read_lock).await
|
||||||
|
}
|
||||||
|
async fn put_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||||
|
&mut self,
|
||||||
|
ptr: &JsonPointer<S, V>,
|
||||||
|
value: &Value,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
(*self).put_value(ptr, value).await
|
||||||
|
}
|
||||||
|
async fn apply(&mut self, patch: DiffPatch) -> Result<(), Error> {
|
||||||
|
(*self).apply(patch).await
|
||||||
|
}
|
||||||
|
async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
|
||||||
|
&mut self,
|
||||||
|
ptr: &JsonPointer<S, V>,
|
||||||
|
lock: LockType,
|
||||||
|
) {
|
||||||
|
(*self).lock(ptr, lock).await
|
||||||
|
}
|
||||||
|
async fn get<
|
||||||
|
T: for<'de> Deserialize<'de>,
|
||||||
|
S: AsRef<str> + Send + Sync,
|
||||||
|
V: SegList + Send + Sync,
|
||||||
|
>(
|
||||||
|
&mut self,
|
||||||
|
ptr: &JsonPointer<S, V>,
|
||||||
|
) -> Result<T, Error> {
|
||||||
|
(*self).get(ptr).await
|
||||||
|
}
|
||||||
|
async fn put<
|
||||||
|
T: Serialize + Send + Sync,
|
||||||
|
S: AsRef<str> + Send + Sync,
|
||||||
|
V: SegList + Send + Sync,
|
||||||
|
>(
|
||||||
|
&mut self,
|
||||||
|
ptr: &JsonPointer<S, V>,
|
||||||
|
value: &T,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
(*self).put(ptr, value).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct PatchDbHandle {
|
||||||
|
pub(crate) db: PatchDb,
|
||||||
|
pub(crate) locks: Vec<(JsonPointer, LockerGuard)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl DbHandle for PatchDbHandle {
|
||||||
|
async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error> {
|
||||||
|
Ok(Transaction {
|
||||||
|
sub: self.subscribe(),
|
||||||
|
parent: self,
|
||||||
|
locks: Vec::new(),
|
||||||
|
updates: DiffPatch::default(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
fn rebase(&mut self) -> Result<(), Error> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
fn store(&self) -> Arc<RwLock<Store>> {
|
||||||
|
self.db.store.clone()
|
||||||
|
}
|
||||||
|
fn subscribe(&self) -> Receiver<Arc<Revision>> {
|
||||||
|
self.db.subscribe()
|
||||||
|
}
|
||||||
|
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) {
|
||||||
|
(&self.db.locker, vec![self.locks.as_mut_slice()])
|
||||||
|
}
|
||||||
|
async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||||
|
&mut self,
|
||||||
|
ptr: &JsonPointer<S, V>,
|
||||||
|
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
|
||||||
|
) -> Result<bool, Error> {
|
||||||
|
if let Some(lock) = store_read_lock {
|
||||||
|
lock.exists(ptr)
|
||||||
|
} else {
|
||||||
|
self.db.exists(ptr).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
async fn get_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||||
|
&mut self,
|
||||||
|
ptr: &JsonPointer<S, V>,
|
||||||
|
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
|
||||||
|
) -> Result<Value, Error> {
|
||||||
|
if let Some(lock) = store_read_lock {
|
||||||
|
lock.get(ptr)
|
||||||
|
} else {
|
||||||
|
self.db.get(ptr).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
async fn put_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||||
|
&mut self,
|
||||||
|
ptr: &JsonPointer<S, V>,
|
||||||
|
value: &Value,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
self.db.put(ptr, value).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
async fn apply(&mut self, patch: DiffPatch) -> Result<(), Error> {
|
||||||
|
self.db.apply(patch, None).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
|
||||||
|
&mut self,
|
||||||
|
ptr: &JsonPointer<S, V>,
|
||||||
|
lock: LockType,
|
||||||
|
) {
|
||||||
|
match lock {
|
||||||
|
LockType::Read => {
|
||||||
|
self.db
|
||||||
|
.locker
|
||||||
|
.add_read_lock(ptr, &mut self.locks, &mut [])
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
LockType::Write => {
|
||||||
|
self.db
|
||||||
|
.locker
|
||||||
|
.add_write_lock(ptr, &mut self.locks, &mut [])
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
LockType::None => (),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
async fn get<
|
||||||
|
T: for<'de> Deserialize<'de>,
|
||||||
|
S: AsRef<str> + Send + Sync,
|
||||||
|
V: SegList + Send + Sync,
|
||||||
|
>(
|
||||||
|
&mut self,
|
||||||
|
ptr: &JsonPointer<S, V>,
|
||||||
|
) -> Result<T, Error> {
|
||||||
|
self.db.get(ptr).await
|
||||||
|
}
|
||||||
|
async fn put<
|
||||||
|
T: Serialize + Send + Sync,
|
||||||
|
S: AsRef<str> + Send + Sync,
|
||||||
|
V: SegList + Send + Sync,
|
||||||
|
>(
|
||||||
|
&mut self,
|
||||||
|
ptr: &JsonPointer<S, V>,
|
||||||
|
value: &T,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
self.db.put(ptr, value).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -6,6 +6,7 @@ use tokio::sync::broadcast::error::TryRecvError;
|
|||||||
|
|
||||||
// note: inserting into an array (before another element) without proper locking can result in unexpected behaviour
|
// note: inserting into an array (before another element) without proper locking can result in unexpected behaviour
|
||||||
|
|
||||||
|
mod handle;
|
||||||
mod locker;
|
mod locker;
|
||||||
mod model;
|
mod model;
|
||||||
mod patch;
|
mod patch;
|
||||||
@@ -15,6 +16,7 @@ mod transaction;
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test;
|
mod test;
|
||||||
|
|
||||||
|
pub use handle::{DbHandle, PatchDbHandle};
|
||||||
pub use json_ptr;
|
pub use json_ptr;
|
||||||
pub use locker::{LockType, Locker};
|
pub use locker::{LockType, Locker};
|
||||||
pub use model::{
|
pub use model::{
|
||||||
@@ -23,7 +25,7 @@ pub use model::{
|
|||||||
pub use patch::Revision;
|
pub use patch::Revision;
|
||||||
pub use patch_db_macro::HasModel;
|
pub use patch_db_macro::HasModel;
|
||||||
pub use store::{PatchDb, Store};
|
pub use store::{PatchDb, Store};
|
||||||
pub use transaction::{Checkpoint, SubTransaction, Transaction};
|
pub use transaction::Transaction;
|
||||||
|
|
||||||
#[derive(Error, Debug)]
|
#[derive(Error, Debug)]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
|
|||||||
@@ -7,9 +7,8 @@ use json_ptr::JsonPointer;
|
|||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
|
||||||
use crate::locker::LockType;
|
|
||||||
use crate::transaction::Checkpoint;
|
|
||||||
use crate::Error;
|
use crate::Error;
|
||||||
|
use crate::{locker::LockType, DbHandle};
|
||||||
|
|
||||||
pub struct ModelData<T: Serialize + for<'de> Deserialize<'de>>(T);
|
pub struct ModelData<T: Serialize + for<'de> Deserialize<'de>>(T);
|
||||||
impl<T: Serialize + for<'de> Deserialize<'de>> Deref for ModelData<T> {
|
impl<T: Serialize + for<'de> Deserialize<'de>> Deref for ModelData<T> {
|
||||||
@@ -25,13 +24,13 @@ pub struct ModelDataMut<T: Serialize + for<'de> Deserialize<'de>> {
|
|||||||
ptr: JsonPointer,
|
ptr: JsonPointer,
|
||||||
}
|
}
|
||||||
impl<T: Serialize + for<'de> Deserialize<'de>> ModelDataMut<T> {
|
impl<T: Serialize + for<'de> Deserialize<'de>> ModelDataMut<T> {
|
||||||
pub async fn save<Tx: Checkpoint>(self, tx: &mut Tx) -> Result<(), Error> {
|
pub async fn save<Db: DbHandle>(self, db: &mut Db) -> Result<(), Error> {
|
||||||
let current = serde_json::to_value(&self.current)?;
|
let current = serde_json::to_value(&self.current)?;
|
||||||
let mut diff = crate::patch::diff(&self.original, ¤t);
|
let mut diff = crate::patch::diff(&self.original, ¤t);
|
||||||
let target = tx.get_value(&self.ptr, None).await?;
|
let target = db.get_value(&self.ptr, None).await?;
|
||||||
diff.rebase(&crate::patch::diff(&self.original, &target));
|
diff.rebase(&crate::patch::diff(&self.original, &target));
|
||||||
diff.prepend(&self.ptr);
|
diff.prepend(&self.ptr);
|
||||||
tx.apply(diff);
|
db.apply(diff).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -56,17 +55,18 @@ impl<T> Model<T>
|
|||||||
where
|
where
|
||||||
T: Serialize + for<'de> Deserialize<'de>,
|
T: Serialize + for<'de> Deserialize<'de>,
|
||||||
{
|
{
|
||||||
pub async fn lock<Tx: Checkpoint>(&self, tx: &mut Tx, lock: LockType) {
|
pub async fn lock<Db: DbHandle>(&self, db: &mut Db, lock: LockType) {
|
||||||
tx.lock(&self.ptr, lock).await
|
db.lock(&self.ptr, lock).await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get<Tx: Checkpoint>(&self, tx: &mut Tx) -> Result<ModelData<T>, Error> {
|
pub async fn get<Db: DbHandle>(&self, db: &mut Db) -> Result<ModelData<T>, Error> {
|
||||||
Ok(ModelData(tx.get(&self.ptr, LockType::Read).await?))
|
db.lock(&self.ptr, LockType::Read).await;
|
||||||
|
Ok(ModelData(db.get(&self.ptr).await?))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_mut<Tx: Checkpoint>(&self, tx: &mut Tx) -> Result<ModelDataMut<T>, Error> {
|
pub async fn get_mut<Db: DbHandle>(&self, db: &mut Db) -> Result<ModelDataMut<T>, Error> {
|
||||||
self.lock(tx, LockType::Write).await;
|
self.lock(db, LockType::Write).await;
|
||||||
let original = tx.get_value(&self.ptr, None).await?;
|
let original = db.get_value(&self.ptr, None).await?;
|
||||||
let current = serde_json::from_value(original.clone())?;
|
let current = serde_json::from_value(original.clone())?;
|
||||||
Ok(ModelDataMut {
|
Ok(ModelDataMut {
|
||||||
original,
|
original,
|
||||||
@@ -88,9 +88,9 @@ impl<T> Model<T>
|
|||||||
where
|
where
|
||||||
T: Serialize + for<'de> Deserialize<'de> + Send + Sync,
|
T: Serialize + for<'de> Deserialize<'de> + Send + Sync,
|
||||||
{
|
{
|
||||||
pub async fn put<Tx: Checkpoint>(&self, tx: &mut Tx, value: &T) -> Result<(), Error> {
|
pub async fn put<Db: DbHandle>(&self, db: &mut Db, value: &T) -> Result<(), Error> {
|
||||||
self.lock(tx, LockType::Write).await;
|
self.lock(db, LockType::Write).await;
|
||||||
tx.put(&self.ptr, value).await
|
db.put(&self.ptr, value).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
impl<T> From<JsonPointer> for Model<T>
|
impl<T> From<JsonPointer> for Model<T>
|
||||||
@@ -132,8 +132,8 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait HasModel {
|
pub trait HasModel: Serialize + for<'de> Deserialize<'de> {
|
||||||
type Model: From<JsonPointer> + AsRef<JsonPointer>;
|
type Model: From<JsonPointer> + AsRef<JsonPointer> + Into<JsonPointer> + From<Model<Self>>;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
@@ -162,6 +162,11 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> From<JsonPointer> for
|
|||||||
BoxModel(T::Model::from(ptr))
|
BoxModel(T::Model::from(ptr))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> From<BoxModel<T>> for JsonPointer {
|
||||||
|
fn from(model: BoxModel<T>) -> Self {
|
||||||
|
model.0.into()
|
||||||
|
}
|
||||||
|
}
|
||||||
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> HasModel for Box<T> {
|
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> HasModel for Box<T> {
|
||||||
type Model = BoxModel<T>;
|
type Model = BoxModel<T>;
|
||||||
}
|
}
|
||||||
@@ -169,13 +174,32 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> HasModel for Box<T> {
|
|||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct OptionModel<T: HasModel + Serialize + for<'de> Deserialize<'de>>(T::Model);
|
pub struct OptionModel<T: HasModel + Serialize + for<'de> Deserialize<'de>>(T::Model);
|
||||||
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
|
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
|
||||||
pub async fn check<Tx: Checkpoint>(self, tx: &mut Tx) -> Result<Option<T::Model>, Error> {
|
pub async fn exists<Db: DbHandle>(&self, db: &mut Db) -> Result<bool, Error> {
|
||||||
Ok(if tx.exists(self.0.as_ref(), None).await? {
|
db.lock(self.as_ref(), LockType::Read).await;
|
||||||
|
Ok(db.exists(&self.as_ref(), None).await?)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn check<Db: DbHandle>(self, db: &mut Db) -> Result<Option<T::Model>, Error> {
|
||||||
|
Ok(if self.exists(db).await? {
|
||||||
Some(self.0)
|
Some(self.0)
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn delete<Db: DbHandle>(&self, db: &mut Db) -> Result<(), Error> {
|
||||||
|
db.lock(self.as_ref(), LockType::Write).await;
|
||||||
|
db.put(self.as_ref(), &Value::Null).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<T> OptionModel<T>
|
||||||
|
where
|
||||||
|
T: Serialize + for<'de> Deserialize<'de> + Send + Sync + HasModel,
|
||||||
|
{
|
||||||
|
pub async fn put<Db: DbHandle>(&self, db: &mut Db, value: &T) -> Result<(), Error> {
|
||||||
|
db.lock(self.as_ref(), LockType::Write).await;
|
||||||
|
db.put(self.as_ref(), value).await
|
||||||
|
}
|
||||||
}
|
}
|
||||||
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> From<Model<Option<T>>>
|
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> From<Model<Option<T>>>
|
||||||
for OptionModel<T>
|
for OptionModel<T>
|
||||||
@@ -189,8 +213,18 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> From<JsonPointer> for
|
|||||||
OptionModel(T::Model::from(ptr))
|
OptionModel(T::Model::from(ptr))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> From<OptionModel<T>> for JsonPointer {
|
||||||
|
fn from(model: OptionModel<T>) -> Self {
|
||||||
|
model.0.into()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> AsRef<JsonPointer> for OptionModel<T> {
|
||||||
|
fn as_ref(&self) -> &JsonPointer {
|
||||||
|
self.0.as_ref()
|
||||||
|
}
|
||||||
|
}
|
||||||
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> HasModel for Option<T> {
|
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> HasModel for Option<T> {
|
||||||
type Model = BoxModel<T>;
|
type Model = OptionModel<T>;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
@@ -216,6 +250,11 @@ impl<T: Serialize + for<'de> Deserialize<'de>> From<JsonPointer> for VecModel<T>
|
|||||||
VecModel(From::from(ptr))
|
VecModel(From::from(ptr))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
impl<T: Serialize + for<'de> Deserialize<'de>> From<VecModel<T>> for JsonPointer {
|
||||||
|
fn from(model: VecModel<T>) -> Self {
|
||||||
|
model.0.into()
|
||||||
|
}
|
||||||
|
}
|
||||||
impl<T> AsRef<JsonPointer> for VecModel<T>
|
impl<T> AsRef<JsonPointer> for VecModel<T>
|
||||||
where
|
where
|
||||||
T: Serialize + for<'de> Deserialize<'de>,
|
T: Serialize + for<'de> Deserialize<'de>,
|
||||||
@@ -282,6 +321,15 @@ where
|
|||||||
self.child(idx.as_ref()).into()
|
self.child(idx.as_ref()).into()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
impl<T> From<Model<T>> for MapModel<T>
|
||||||
|
where
|
||||||
|
T: Serialize + for<'de> Deserialize<'de> + Map,
|
||||||
|
T::Value: Serialize + for<'de> Deserialize<'de>,
|
||||||
|
{
|
||||||
|
fn from(model: Model<T>) -> Self {
|
||||||
|
MapModel(model)
|
||||||
|
}
|
||||||
|
}
|
||||||
impl<T> From<JsonPointer> for MapModel<T>
|
impl<T> From<JsonPointer> for MapModel<T>
|
||||||
where
|
where
|
||||||
T: Serialize + for<'de> Deserialize<'de> + Map,
|
T: Serialize + for<'de> Deserialize<'de> + Map,
|
||||||
@@ -291,6 +339,15 @@ where
|
|||||||
MapModel(From::from(ptr))
|
MapModel(From::from(ptr))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
impl<T> From<MapModel<T>> for JsonPointer
|
||||||
|
where
|
||||||
|
T: Serialize + for<'de> Deserialize<'de> + Map,
|
||||||
|
T::Value: Serialize + for<'de> Deserialize<'de>,
|
||||||
|
{
|
||||||
|
fn from(model: MapModel<T>) -> Self {
|
||||||
|
model.0.into()
|
||||||
|
}
|
||||||
|
}
|
||||||
impl<T> AsRef<JsonPointer> for MapModel<T>
|
impl<T> AsRef<JsonPointer> for MapModel<T>
|
||||||
where
|
where
|
||||||
T: Serialize + for<'de> Deserialize<'de> + Map,
|
T: Serialize + for<'de> Deserialize<'de> + Map,
|
||||||
@@ -309,8 +366,8 @@ where
|
|||||||
}
|
}
|
||||||
impl<K, V> HasModel for BTreeMap<K, V>
|
impl<K, V> HasModel for BTreeMap<K, V>
|
||||||
where
|
where
|
||||||
K: Serialize + for<'de> Deserialize<'de> + Hash + Eq + AsRef<str>,
|
K: Serialize + for<'de> Deserialize<'de> + Ord + AsRef<str>,
|
||||||
V: Serialize + for<'de> Deserialize<'de>,
|
V: Serialize + for<'de> Deserialize<'de>,
|
||||||
{
|
{
|
||||||
type Model = MapModel<HashMap<K, V>>;
|
type Model = MapModel<BTreeMap<K, V>>;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,10 +14,9 @@ use tokio::fs::File;
|
|||||||
use tokio::sync::broadcast::{Receiver, Sender};
|
use tokio::sync::broadcast::{Receiver, Sender};
|
||||||
use tokio::sync::{Mutex, RwLock, RwLockWriteGuard};
|
use tokio::sync::{Mutex, RwLock, RwLockWriteGuard};
|
||||||
|
|
||||||
use crate::locker::Locker;
|
|
||||||
use crate::patch::{diff, DiffPatch, Revision};
|
use crate::patch::{diff, DiffPatch, Revision};
|
||||||
use crate::transaction::Transaction;
|
|
||||||
use crate::Error;
|
use crate::Error;
|
||||||
|
use crate::{locker::Locker, PatchDbHandle};
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
static ref OPEN_STORES: Mutex<HashMap<PathBuf, Qutex<()>>> = Mutex::new(HashMap::new());
|
static ref OPEN_STORES: Mutex<HashMap<PathBuf, Qutex<()>>> = Mutex::new(HashMap::new());
|
||||||
@@ -31,7 +30,7 @@ pub struct Store {
|
|||||||
revision: u64,
|
revision: u64,
|
||||||
}
|
}
|
||||||
impl Store {
|
impl Store {
|
||||||
pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
|
pub(crate) async fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
|
||||||
let (_lock, path) = {
|
let (_lock, path) = {
|
||||||
if !path.as_ref().exists() {
|
if !path.as_ref().exists() {
|
||||||
tokio::fs::File::create(path.as_ref()).await?;
|
tokio::fs::File::create(path.as_ref()).await?;
|
||||||
@@ -123,13 +122,13 @@ impl Store {
|
|||||||
self.file.unlock(true).map_err(|e| e.1)?;
|
self.file.unlock(true).map_err(|e| e.1)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
pub fn exists<S: AsRef<str>, V: SegList>(
|
pub(crate) fn exists<S: AsRef<str>, V: SegList>(
|
||||||
&self,
|
&self,
|
||||||
ptr: &JsonPointer<S, V>,
|
ptr: &JsonPointer<S, V>,
|
||||||
) -> Result<bool, Error> {
|
) -> Result<bool, Error> {
|
||||||
Ok(ptr.get(self.get_data()?).unwrap_or(&Value::Null) != &Value::Null)
|
Ok(ptr.get(self.get_data()?).unwrap_or(&Value::Null) != &Value::Null)
|
||||||
}
|
}
|
||||||
pub fn get<T: for<'de> Deserialize<'de>, S: AsRef<str>, V: SegList>(
|
pub(crate) fn get<T: for<'de> Deserialize<'de>, S: AsRef<str>, V: SegList>(
|
||||||
&self,
|
&self,
|
||||||
ptr: &JsonPointer<S, V>,
|
ptr: &JsonPointer<S, V>,
|
||||||
) -> Result<T, Error> {
|
) -> Result<T, Error> {
|
||||||
@@ -137,10 +136,10 @@ impl Store {
|
|||||||
ptr.get(self.get_data()?).unwrap_or(&Value::Null).clone(),
|
ptr.get(self.get_data()?).unwrap_or(&Value::Null).clone(),
|
||||||
)?)
|
)?)
|
||||||
}
|
}
|
||||||
pub fn dump(&self) -> Value {
|
pub(crate) fn dump(&self) -> Value {
|
||||||
self.get_data().unwrap().clone()
|
self.get_data().unwrap().clone()
|
||||||
}
|
}
|
||||||
pub async fn put<T: Serialize + ?Sized, S: AsRef<str>, V: SegList>(
|
pub(crate) async fn put<T: Serialize + ?Sized, S: AsRef<str>, V: SegList>(
|
||||||
&mut self,
|
&mut self,
|
||||||
ptr: &JsonPointer<S, V>,
|
ptr: &JsonPointer<S, V>,
|
||||||
value: &T,
|
value: &T,
|
||||||
@@ -152,7 +151,7 @@ impl Store {
|
|||||||
patch.prepend(ptr);
|
patch.prepend(ptr);
|
||||||
self.apply(patch).await
|
self.apply(patch).await
|
||||||
}
|
}
|
||||||
pub async fn apply(&mut self, patch: DiffPatch) -> Result<Arc<Revision>, Error> {
|
pub(crate) async fn apply(&mut self, patch: DiffPatch) -> Result<Arc<Revision>, Error> {
|
||||||
use tokio::io::AsyncWriteExt;
|
use tokio::io::AsyncWriteExt;
|
||||||
|
|
||||||
self.check_cache_corrupted()?;
|
self.check_cache_corrupted()?;
|
||||||
@@ -196,6 +195,9 @@ impl PatchDb {
|
|||||||
subscriber: Arc::new(subscriber),
|
subscriber: Arc::new(subscriber),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
pub async fn dump(&self) -> Value {
|
||||||
|
self.store.read().await.dump()
|
||||||
|
}
|
||||||
pub async fn exists<S: AsRef<str>, V: SegList>(
|
pub async fn exists<S: AsRef<str>, V: SegList>(
|
||||||
&self,
|
&self,
|
||||||
ptr: &JsonPointer<S, V>,
|
ptr: &JsonPointer<S, V>,
|
||||||
@@ -235,12 +237,10 @@ impl PatchDb {
|
|||||||
pub fn subscribe(&self) -> Receiver<Arc<Revision>> {
|
pub fn subscribe(&self) -> Receiver<Arc<Revision>> {
|
||||||
self.subscriber.subscribe()
|
self.subscriber.subscribe()
|
||||||
}
|
}
|
||||||
pub fn begin(&self) -> Transaction {
|
pub fn handle(&self) -> PatchDbHandle {
|
||||||
Transaction {
|
PatchDbHandle {
|
||||||
db: self.clone(),
|
db: self.clone(),
|
||||||
locks: Vec::new(),
|
locks: Vec::new(),
|
||||||
updates: DiffPatch::default(),
|
|
||||||
sub: self.subscribe(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ use serde_json::Value;
|
|||||||
use tokio::fs;
|
use tokio::fs;
|
||||||
use tokio::runtime::Builder;
|
use tokio::runtime::Builder;
|
||||||
|
|
||||||
use crate as patch_db;
|
use crate::{self as patch_db, DbHandle};
|
||||||
|
|
||||||
async fn init_db(db_name: String) -> PatchDb {
|
async fn init_db(db_name: String) -> PatchDb {
|
||||||
cleanup_db(&db_name).await;
|
cleanup_db(&db_name).await;
|
||||||
@@ -54,7 +54,8 @@ async fn basic() {
|
|||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn transaction() {
|
async fn transaction() {
|
||||||
let db = init_db("test.db".to_string()).await;
|
let db = init_db("test.db".to_string()).await;
|
||||||
let mut tx = db.begin();
|
let mut handle = db.handle();
|
||||||
|
let mut tx = handle.begin().await.unwrap();
|
||||||
let ptr: JsonPointer = "/b/b".parse().unwrap();
|
let ptr: JsonPointer = "/b/b".parse().unwrap();
|
||||||
tx.put(&ptr, &(2 as usize)).await.unwrap();
|
tx.put(&ptr, &(2 as usize)).await.unwrap();
|
||||||
tx.put(&ptr, &(1 as usize)).await.unwrap();
|
tx.put(&ptr, &(1 as usize)).await.unwrap();
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use futures::future::{BoxFuture, FutureExt};
|
use async_trait::async_trait;
|
||||||
use json_ptr::{JsonPointer, SegList};
|
use json_ptr::{JsonPointer, SegList};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
@@ -8,258 +8,59 @@ use tokio::sync::broadcast::error::TryRecvError;
|
|||||||
use tokio::sync::broadcast::Receiver;
|
use tokio::sync::broadcast::Receiver;
|
||||||
use tokio::sync::{RwLock, RwLockReadGuard};
|
use tokio::sync::{RwLock, RwLockReadGuard};
|
||||||
|
|
||||||
use crate::locker::{LockType, Locker, LockerGuard};
|
use crate::store::Store;
|
||||||
use crate::patch::{DiffPatch, Revision};
|
|
||||||
use crate::store::{PatchDb, Store};
|
|
||||||
use crate::Error;
|
use crate::Error;
|
||||||
|
use crate::{
|
||||||
|
locker::{LockType, Locker, LockerGuard},
|
||||||
|
DbHandle,
|
||||||
|
};
|
||||||
|
use crate::{
|
||||||
|
patch::{DiffPatch, Revision},
|
||||||
|
PatchDbHandle,
|
||||||
|
};
|
||||||
|
|
||||||
pub trait Checkpoint: Sized {
|
pub struct Transaction<Parent: DbHandle> {
|
||||||
fn rebase(&mut self) -> Result<(), Error>;
|
pub(crate) parent: Parent,
|
||||||
fn exists<'a, S: AsRef<str> + Send + Sync + 'a, V: SegList + Send + Sync + 'a>(
|
|
||||||
&'a mut self,
|
|
||||||
ptr: &'a JsonPointer<S, V>,
|
|
||||||
store_read_lock: Option<RwLockReadGuard<'a, Store>>,
|
|
||||||
) -> BoxFuture<'a, Result<bool, Error>>;
|
|
||||||
fn get_value<'a, S: AsRef<str> + Send + Sync + 'a, V: SegList + Send + Sync + 'a>(
|
|
||||||
&'a mut self,
|
|
||||||
ptr: &'a JsonPointer<S, V>,
|
|
||||||
store_read_lock: Option<RwLockReadGuard<'a, Store>>,
|
|
||||||
) -> BoxFuture<'a, Result<Value, Error>>;
|
|
||||||
fn put_value<'a, S: AsRef<str> + Send + Sync + 'a, V: SegList + Send + Sync + 'a>(
|
|
||||||
&'a mut self,
|
|
||||||
ptr: &'a JsonPointer<S, V>,
|
|
||||||
value: &'a Value,
|
|
||||||
) -> BoxFuture<'a, Result<(), Error>>;
|
|
||||||
fn store(&self) -> Arc<RwLock<Store>>;
|
|
||||||
fn subscribe(&self) -> Receiver<Arc<Revision>>;
|
|
||||||
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>);
|
|
||||||
fn apply(&mut self, patch: DiffPatch);
|
|
||||||
fn lock<'a, S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
|
|
||||||
&'a mut self,
|
|
||||||
ptr: &'a JsonPointer<S, V>,
|
|
||||||
lock: LockType,
|
|
||||||
) -> BoxFuture<'a, ()>;
|
|
||||||
fn get<
|
|
||||||
'a,
|
|
||||||
T: for<'de> Deserialize<'de> + 'a,
|
|
||||||
S: AsRef<str> + Clone + Send + Sync + 'a,
|
|
||||||
V: SegList + Clone + Send + Sync + 'a,
|
|
||||||
>(
|
|
||||||
&'a mut self,
|
|
||||||
ptr: &'a JsonPointer<S, V>,
|
|
||||||
lock: LockType,
|
|
||||||
) -> BoxFuture<'a, Result<T, Error>>;
|
|
||||||
fn put<
|
|
||||||
'a,
|
|
||||||
T: Serialize + Send + Sync + 'a,
|
|
||||||
S: AsRef<str> + Send + Sync + 'a,
|
|
||||||
V: SegList + Send + Sync + 'a,
|
|
||||||
>(
|
|
||||||
&'a mut self,
|
|
||||||
ptr: &'a JsonPointer<S, V>,
|
|
||||||
value: &'a T,
|
|
||||||
) -> BoxFuture<'a, Result<(), Error>>;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct Transaction {
|
|
||||||
pub(crate) db: PatchDb,
|
|
||||||
pub(crate) locks: Vec<(JsonPointer, LockerGuard)>,
|
pub(crate) locks: Vec<(JsonPointer, LockerGuard)>,
|
||||||
pub(crate) updates: DiffPatch,
|
pub(crate) updates: DiffPatch,
|
||||||
pub(crate) sub: Receiver<Arc<Revision>>,
|
pub(crate) sub: Receiver<Arc<Revision>>,
|
||||||
}
|
}
|
||||||
impl Transaction {
|
impl Transaction<&mut PatchDbHandle> {
|
||||||
pub fn rebase(&mut self) -> Result<(), Error> {
|
|
||||||
while let Some(rev) = match self.sub.try_recv() {
|
|
||||||
Ok(a) => Some(a),
|
|
||||||
Err(TryRecvError::Empty) => None,
|
|
||||||
Err(e) => return Err(e.into()),
|
|
||||||
} {
|
|
||||||
self.updates.rebase(&rev.patch);
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
async fn exists<S: AsRef<str>, V: SegList>(
|
|
||||||
&mut self,
|
|
||||||
ptr: &JsonPointer<S, V>,
|
|
||||||
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
|
|
||||||
) -> Result<bool, Error> {
|
|
||||||
let exists = {
|
|
||||||
let store_lock = self.db.store.clone();
|
|
||||||
let store = if let Some(store_read_lock) = store_read_lock {
|
|
||||||
store_read_lock
|
|
||||||
} else {
|
|
||||||
store_lock.read().await
|
|
||||||
};
|
|
||||||
self.rebase()?;
|
|
||||||
ptr.get(store.get_data()?).unwrap_or(&Value::Null) != &Value::Null
|
|
||||||
};
|
|
||||||
Ok(self.updates.for_path(ptr).exists().unwrap_or(exists))
|
|
||||||
}
|
|
||||||
async fn get_value<S: AsRef<str>, V: SegList>(
|
|
||||||
&mut self,
|
|
||||||
ptr: &JsonPointer<S, V>,
|
|
||||||
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
|
|
||||||
) -> Result<Value, Error> {
|
|
||||||
let mut data = {
|
|
||||||
let store_lock = self.db.store.clone();
|
|
||||||
let store = if let Some(store_read_lock) = store_read_lock {
|
|
||||||
store_read_lock
|
|
||||||
} else {
|
|
||||||
store_lock.read().await
|
|
||||||
};
|
|
||||||
self.rebase()?;
|
|
||||||
ptr.get(store.get_data()?).cloned().unwrap_or_default()
|
|
||||||
};
|
|
||||||
json_patch::patch(&mut data, &*self.updates.for_path(ptr))?;
|
|
||||||
Ok(data)
|
|
||||||
}
|
|
||||||
async fn put_value<S: AsRef<str>, V: SegList>(
|
|
||||||
&mut self,
|
|
||||||
ptr: &JsonPointer<S, V>,
|
|
||||||
value: &Value,
|
|
||||||
) -> Result<(), Error> {
|
|
||||||
let old = Transaction::get_value(self, ptr, None).await?;
|
|
||||||
let mut patch = crate::patch::diff(&old, &value);
|
|
||||||
patch.prepend(ptr);
|
|
||||||
self.updates.append(patch);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
pub async fn lock<S: AsRef<str> + Clone, V: SegList + Clone>(
|
|
||||||
&mut self,
|
|
||||||
ptr: &JsonPointer<S, V>,
|
|
||||||
lock: LockType,
|
|
||||||
) {
|
|
||||||
match lock {
|
|
||||||
LockType::None => (),
|
|
||||||
LockType::Read => {
|
|
||||||
self.db
|
|
||||||
.locker
|
|
||||||
.add_read_lock(ptr, &mut self.locks, &mut [])
|
|
||||||
.await
|
|
||||||
}
|
|
||||||
LockType::Write => {
|
|
||||||
self.db
|
|
||||||
.locker
|
|
||||||
.add_write_lock(ptr, &mut self.locks, &mut [])
|
|
||||||
.await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pub async fn get<T: for<'de> Deserialize<'de>, S: AsRef<str> + Clone, V: SegList + Clone>(
|
|
||||||
&mut self,
|
|
||||||
ptr: &JsonPointer<S, V>,
|
|
||||||
lock: LockType,
|
|
||||||
) -> Result<T, Error> {
|
|
||||||
self.lock(ptr, lock).await;
|
|
||||||
Ok(serde_json::from_value(
|
|
||||||
Transaction::get_value(self, ptr, None).await?,
|
|
||||||
)?)
|
|
||||||
}
|
|
||||||
pub async fn put<T: Serialize, S: AsRef<str>, V: SegList>(
|
|
||||||
&mut self,
|
|
||||||
ptr: &JsonPointer<S, V>,
|
|
||||||
value: &T,
|
|
||||||
) -> Result<(), Error> {
|
|
||||||
Transaction::put_value(self, ptr, &serde_json::to_value(value)?).await
|
|
||||||
}
|
|
||||||
pub async fn commit(mut self) -> Result<Arc<Revision>, Error> {
|
pub async fn commit(mut self) -> Result<Arc<Revision>, Error> {
|
||||||
let store_lock = self.db.store.clone();
|
let store_lock = self.parent.store();
|
||||||
let store = store_lock.write().await;
|
|
||||||
self.rebase()?;
|
|
||||||
self.db.apply(self.updates, Some(store)).await
|
|
||||||
}
|
|
||||||
pub async fn begin(&mut self) -> Result<SubTransaction<&mut Self>, Error> {
|
|
||||||
let store_lock = self.db.store.clone();
|
|
||||||
let store = store_lock.read().await;
|
let store = store_lock.read().await;
|
||||||
self.rebase()?;
|
self.rebase()?;
|
||||||
let sub = self.db.subscribe();
|
let rev = self.parent.db.apply(self.updates, None).await?;
|
||||||
drop(store);
|
drop(store);
|
||||||
Ok(SubTransaction {
|
Ok(rev)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<Parent: DbHandle + Send + Sync> Transaction<Parent> {
|
||||||
|
pub async fn save(mut self) -> Result<(), Error> {
|
||||||
|
let store_lock = self.parent.store();
|
||||||
|
let store = store_lock.read().await;
|
||||||
|
self.rebase()?;
|
||||||
|
self.parent.apply(self.updates).await?;
|
||||||
|
drop(store);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#[async_trait]
|
||||||
|
impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
|
||||||
|
async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error> {
|
||||||
|
let store_lock = self.parent.store();
|
||||||
|
let store = store_lock.read().await;
|
||||||
|
self.rebase()?;
|
||||||
|
let sub = self.parent.subscribe();
|
||||||
|
drop(store);
|
||||||
|
Ok(Transaction {
|
||||||
parent: self,
|
parent: self,
|
||||||
locks: Vec::new(),
|
locks: Vec::new(),
|
||||||
updates: DiffPatch::default(),
|
updates: DiffPatch::default(),
|
||||||
sub,
|
sub,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
|
||||||
impl<'a> Checkpoint for &'a mut Transaction {
|
|
||||||
fn rebase(&mut self) -> Result<(), Error> {
|
fn rebase(&mut self) -> Result<(), Error> {
|
||||||
Transaction::rebase(self)
|
|
||||||
}
|
|
||||||
fn exists<'b, S: AsRef<str> + Send + Sync + 'b, V: SegList + Send + Sync + 'b>(
|
|
||||||
&'b mut self,
|
|
||||||
ptr: &'b JsonPointer<S, V>,
|
|
||||||
store_read_lock: Option<RwLockReadGuard<'b, Store>>,
|
|
||||||
) -> BoxFuture<'b, Result<bool, Error>> {
|
|
||||||
Transaction::exists(self, ptr, store_read_lock).boxed()
|
|
||||||
}
|
|
||||||
fn get_value<'b, S: AsRef<str> + Send + Sync + 'b, V: SegList + Send + Sync + 'b>(
|
|
||||||
&'b mut self,
|
|
||||||
ptr: &'b JsonPointer<S, V>,
|
|
||||||
store_read_lock: Option<RwLockReadGuard<'b, Store>>,
|
|
||||||
) -> BoxFuture<'b, Result<Value, Error>> {
|
|
||||||
Transaction::get_value(self, ptr, store_read_lock).boxed()
|
|
||||||
}
|
|
||||||
fn put_value<'b, S: AsRef<str> + Send + Sync + 'b, V: SegList + Send + Sync + 'b>(
|
|
||||||
&'b mut self,
|
|
||||||
ptr: &'b JsonPointer<S, V>,
|
|
||||||
value: &'b Value,
|
|
||||||
) -> BoxFuture<'b, Result<(), Error>> {
|
|
||||||
Transaction::put_value(self, ptr, value).boxed()
|
|
||||||
}
|
|
||||||
fn store(&self) -> Arc<RwLock<Store>> {
|
|
||||||
self.db.store.clone()
|
|
||||||
}
|
|
||||||
fn subscribe(&self) -> Receiver<Arc<Revision>> {
|
|
||||||
self.db.subscribe()
|
|
||||||
}
|
|
||||||
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) {
|
|
||||||
(&self.db.locker, vec![&mut self.locks])
|
|
||||||
}
|
|
||||||
fn apply(&mut self, patch: DiffPatch) {
|
|
||||||
self.updates.append(patch)
|
|
||||||
}
|
|
||||||
fn lock<'b, S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
|
|
||||||
&'b mut self,
|
|
||||||
ptr: &'b JsonPointer<S, V>,
|
|
||||||
lock: LockType,
|
|
||||||
) -> BoxFuture<'b, ()> {
|
|
||||||
Transaction::lock(self, ptr, lock).boxed()
|
|
||||||
}
|
|
||||||
fn get<
|
|
||||||
'b,
|
|
||||||
T: for<'de> Deserialize<'de> + 'b,
|
|
||||||
S: AsRef<str> + Clone + Send + Sync + 'b,
|
|
||||||
V: SegList + Clone + Send + Sync + 'b,
|
|
||||||
>(
|
|
||||||
&'b mut self,
|
|
||||||
ptr: &'b JsonPointer<S, V>,
|
|
||||||
lock: LockType,
|
|
||||||
) -> BoxFuture<'b, Result<T, Error>> {
|
|
||||||
Transaction::get(self, ptr, lock).boxed()
|
|
||||||
}
|
|
||||||
fn put<
|
|
||||||
'b,
|
|
||||||
T: Serialize + Send + Sync + 'b,
|
|
||||||
S: AsRef<str> + Send + Sync + 'b,
|
|
||||||
V: SegList + Send + Sync + 'b,
|
|
||||||
>(
|
|
||||||
&'b mut self,
|
|
||||||
ptr: &'b JsonPointer<S, V>,
|
|
||||||
value: &'b T,
|
|
||||||
) -> BoxFuture<'b, Result<(), Error>> {
|
|
||||||
Transaction::put(self, ptr, value).boxed()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct SubTransaction<Tx: Checkpoint> {
|
|
||||||
parent: Tx,
|
|
||||||
locks: Vec<(JsonPointer, LockerGuard)>,
|
|
||||||
updates: DiffPatch,
|
|
||||||
sub: Receiver<Arc<Revision>>,
|
|
||||||
}
|
|
||||||
impl<Tx: Checkpoint + Send + Sync> SubTransaction<Tx> {
|
|
||||||
pub fn rebase(&mut self) -> Result<(), Error> {
|
|
||||||
self.parent.rebase()?;
|
self.parent.rebase()?;
|
||||||
while let Some(rev) = match self.sub.try_recv() {
|
while let Some(rev) = match self.sub.try_recv() {
|
||||||
Ok(a) => Some(a),
|
Ok(a) => Some(a),
|
||||||
@@ -270,6 +71,17 @@ impl<Tx: Checkpoint + Send + Sync> SubTransaction<Tx> {
|
|||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
fn store(&self) -> Arc<RwLock<Store>> {
|
||||||
|
self.parent.store()
|
||||||
|
}
|
||||||
|
fn subscribe(&self) -> Receiver<Arc<Revision>> {
|
||||||
|
self.parent.subscribe()
|
||||||
|
}
|
||||||
|
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) {
|
||||||
|
let (locker, mut locks) = self.parent.locker_and_locks();
|
||||||
|
locks.push(&mut self.locks);
|
||||||
|
(locker, locks)
|
||||||
|
}
|
||||||
async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||||
&mut self,
|
&mut self,
|
||||||
ptr: &JsonPointer<S, V>,
|
ptr: &JsonPointer<S, V>,
|
||||||
@@ -310,13 +122,13 @@ impl<Tx: Checkpoint + Send + Sync> SubTransaction<Tx> {
|
|||||||
ptr: &JsonPointer<S, V>,
|
ptr: &JsonPointer<S, V>,
|
||||||
value: &Value,
|
value: &Value,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let old = SubTransaction::get_value(self, ptr, None).await?;
|
let old = self.get_value(ptr, None).await?;
|
||||||
let mut patch = crate::patch::diff(&old, &value);
|
let mut patch = crate::patch::diff(&old, &value);
|
||||||
patch.prepend(ptr);
|
patch.prepend(ptr);
|
||||||
self.updates.append(patch);
|
self.updates.append(patch);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
pub async fn lock<S: AsRef<str> + Clone, V: SegList + Clone>(
|
async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
|
||||||
&mut self,
|
&mut self,
|
||||||
ptr: &JsonPointer<S, V>,
|
ptr: &JsonPointer<S, V>,
|
||||||
lock: LockType,
|
lock: LockType,
|
||||||
@@ -335,117 +147,29 @@ impl<Tx: Checkpoint + Send + Sync> SubTransaction<Tx> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub async fn get<
|
async fn get<
|
||||||
T: for<'de> Deserialize<'de>,
|
T: for<'de> Deserialize<'de>,
|
||||||
S: AsRef<str> + Clone + Send + Sync,
|
S: AsRef<str> + Send + Sync,
|
||||||
V: SegList + Clone + Send + Sync,
|
V: SegList + Send + Sync,
|
||||||
>(
|
>(
|
||||||
&mut self,
|
&mut self,
|
||||||
ptr: &JsonPointer<S, V>,
|
ptr: &JsonPointer<S, V>,
|
||||||
lock: LockType,
|
|
||||||
) -> Result<T, Error> {
|
) -> Result<T, Error> {
|
||||||
self.lock(ptr, lock).await;
|
Ok(serde_json::from_value(self.get_value(ptr, None).await?)?)
|
||||||
Ok(serde_json::from_value(
|
|
||||||
SubTransaction::get_value(self, ptr, None).await?,
|
|
||||||
)?)
|
|
||||||
}
|
}
|
||||||
pub async fn put<T: Serialize, S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
async fn put<
|
||||||
|
T: Serialize + Send + Sync,
|
||||||
|
S: AsRef<str> + Send + Sync,
|
||||||
|
V: SegList + Send + Sync,
|
||||||
|
>(
|
||||||
&mut self,
|
&mut self,
|
||||||
ptr: &JsonPointer<S, V>,
|
ptr: &JsonPointer<S, V>,
|
||||||
value: &T,
|
value: &T,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
SubTransaction::put_value(self, ptr, &serde_json::to_value(value)?).await
|
self.put_value(ptr, &serde_json::to_value(value)?).await
|
||||||
}
|
}
|
||||||
pub async fn commit(mut self) -> Result<(), Error> {
|
async fn apply(&mut self, patch: DiffPatch) -> Result<(), Error> {
|
||||||
let store_lock = self.parent.store();
|
self.updates.append(patch);
|
||||||
let store = store_lock.read().await;
|
|
||||||
self.rebase()?;
|
|
||||||
self.parent.apply(self.updates);
|
|
||||||
drop(store);
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
pub async fn begin(&mut self) -> Result<SubTransaction<&mut Self>, Error> {
|
|
||||||
let store_lock = self.parent.store();
|
|
||||||
let store = store_lock.read().await;
|
|
||||||
self.rebase()?;
|
|
||||||
let sub = self.parent.subscribe();
|
|
||||||
drop(store);
|
|
||||||
Ok(SubTransaction {
|
|
||||||
parent: self,
|
|
||||||
locks: Vec::new(),
|
|
||||||
updates: DiffPatch::default(),
|
|
||||||
sub,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
impl<'a, Tx: Checkpoint + Send + Sync> Checkpoint for &'a mut SubTransaction<Tx> {
|
|
||||||
fn rebase(&mut self) -> Result<(), Error> {
|
|
||||||
SubTransaction::rebase(self)
|
|
||||||
}
|
|
||||||
fn exists<'b, S: AsRef<str> + Send + Sync + 'b, V: SegList + Send + Sync + 'b>(
|
|
||||||
&'b mut self,
|
|
||||||
ptr: &'b JsonPointer<S, V>,
|
|
||||||
store_read_lock: Option<RwLockReadGuard<'b, Store>>,
|
|
||||||
) -> BoxFuture<'b, Result<bool, Error>> {
|
|
||||||
SubTransaction::exists(self, ptr, store_read_lock).boxed()
|
|
||||||
}
|
|
||||||
fn get_value<'b, S: AsRef<str> + Send + Sync + 'b, V: SegList + Send + Sync + 'b>(
|
|
||||||
&'b mut self,
|
|
||||||
ptr: &'b JsonPointer<S, V>,
|
|
||||||
store_read_lock: Option<RwLockReadGuard<'b, Store>>,
|
|
||||||
) -> BoxFuture<'b, Result<Value, Error>> {
|
|
||||||
SubTransaction::get_value(self, ptr, store_read_lock).boxed()
|
|
||||||
}
|
|
||||||
fn put_value<'b, S: AsRef<str> + Send + Sync + 'b, V: SegList + Send + Sync + 'b>(
|
|
||||||
&'b mut self,
|
|
||||||
ptr: &'b JsonPointer<S, V>,
|
|
||||||
value: &'b Value,
|
|
||||||
) -> BoxFuture<'b, Result<(), Error>> {
|
|
||||||
SubTransaction::put_value(self, ptr, value).boxed()
|
|
||||||
}
|
|
||||||
fn store(&self) -> Arc<RwLock<Store>> {
|
|
||||||
self.parent.store()
|
|
||||||
}
|
|
||||||
fn subscribe(&self) -> Receiver<Arc<Revision>> {
|
|
||||||
self.parent.subscribe()
|
|
||||||
}
|
|
||||||
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) {
|
|
||||||
let (locker, mut locks) = self.parent.locker_and_locks();
|
|
||||||
locks.push(&mut self.locks);
|
|
||||||
(locker, locks)
|
|
||||||
}
|
|
||||||
fn apply(&mut self, patch: DiffPatch) {
|
|
||||||
self.updates.append(patch)
|
|
||||||
}
|
|
||||||
fn lock<'b, S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
|
|
||||||
&'b mut self,
|
|
||||||
ptr: &'b JsonPointer<S, V>,
|
|
||||||
lock: LockType,
|
|
||||||
) -> BoxFuture<'b, ()> {
|
|
||||||
SubTransaction::lock(self, ptr, lock).boxed()
|
|
||||||
}
|
|
||||||
fn get<
|
|
||||||
'b,
|
|
||||||
T: for<'de> Deserialize<'de> + 'b,
|
|
||||||
S: AsRef<str> + Clone + Send + Sync + 'b,
|
|
||||||
V: SegList + Clone + Send + Sync + 'b,
|
|
||||||
>(
|
|
||||||
&'b mut self,
|
|
||||||
ptr: &'b JsonPointer<S, V>,
|
|
||||||
lock: LockType,
|
|
||||||
) -> BoxFuture<'b, Result<T, Error>> {
|
|
||||||
SubTransaction::get(self, ptr, lock).boxed()
|
|
||||||
}
|
|
||||||
fn put<
|
|
||||||
'b,
|
|
||||||
T: Serialize + Send + Sync + 'b,
|
|
||||||
S: AsRef<str> + Send + Sync + 'b,
|
|
||||||
V: SegList + Send + Sync + 'b,
|
|
||||||
>(
|
|
||||||
&'b mut self,
|
|
||||||
ptr: &'b JsonPointer<S, V>,
|
|
||||||
value: &'b T,
|
|
||||||
) -> BoxFuture<'b, Result<(), Error>> {
|
|
||||||
SubTransaction::put(self, ptr, value).boxed()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user