return revisions

This commit is contained in:
Aiden McClelland
2021-09-23 11:49:36 -06:00
parent 67f2ec82cf
commit 71109f1f90
5 changed files with 48 additions and 33 deletions

View File

@@ -39,8 +39,8 @@ pub trait DbHandle: Send + Sync {
&mut self,
ptr: &JsonPointer<S, V>,
value: &Value,
) -> Result<(), Error>;
async fn apply(&mut self, patch: DiffPatch) -> Result<(), Error>;
) -> Result<Option<Arc<Revision>>, Error>;
async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error>;
async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
&mut self,
ptr: &JsonPointer<S, V>,
@@ -63,7 +63,7 @@ pub trait DbHandle: Send + Sync {
&mut self,
ptr: &JsonPointer<S, V>,
value: &T,
) -> Result<(), Error>;
) -> Result<Option<Arc<Revision>>, Error>;
}
#[async_trait]
impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
@@ -118,10 +118,10 @@ impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
&mut self,
ptr: &JsonPointer<S, V>,
value: &Value,
) -> Result<(), Error> {
) -> Result<Option<Arc<Revision>>, Error> {
(*self).put_value(ptr, value).await
}
async fn apply(&mut self, patch: DiffPatch) -> Result<(), Error> {
async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error> {
(*self).apply(patch).await
}
async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
@@ -150,7 +150,7 @@ impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
&mut self,
ptr: &JsonPointer<S, V>,
value: &T,
) -> Result<(), Error> {
) -> Result<Option<Arc<Revision>>, Error> {
(*self).put(ptr, value).await
}
}
@@ -219,13 +219,11 @@ impl DbHandle for PatchDbHandle {
&mut self,
ptr: &JsonPointer<S, V>,
value: &Value,
) -> Result<(), Error> {
self.db.put(ptr, value, None).await?;
Ok(())
) -> Result<Option<Arc<Revision>>, Error> {
self.db.put(ptr, value, None).await
}
async fn apply(&mut self, patch: DiffPatch) -> Result<(), Error> {
self.db.apply(patch, None, None).await?;
Ok(())
async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error> {
self.db.apply(patch, None, None).await
}
async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
&mut self,
@@ -267,8 +265,7 @@ impl DbHandle for PatchDbHandle {
&mut self,
ptr: &JsonPointer<S, V>,
value: &T,
) -> Result<(), Error> {
self.db.put(ptr, value, None).await?;
Ok(())
) -> Result<Option<Arc<Revision>>, Error> {
self.db.put(ptr, value, None).await
}
}

View File

@@ -2,6 +2,7 @@ use std::collections::{BTreeMap, HashMap};
use std::hash::Hash;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use indexmap::{IndexMap, IndexSet};
use json_patch::{Patch, PatchOperation, RemoveOperation};
@@ -9,8 +10,8 @@ use json_ptr::JsonPointer;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use crate::{locker::LockType, DbHandle};
use crate::{DiffPatch, Error};
use crate::locker::LockType;
use crate::{DbHandle, DiffPatch, Error, Revision};
#[derive(Debug)]
pub struct ModelData<T: Serialize + for<'de> Deserialize<'de>>(T);
@@ -99,7 +100,11 @@ impl<T> Model<T>
where
T: Serialize + for<'de> Deserialize<'de> + Send + Sync,
{
pub async fn put<Db: DbHandle>(&self, db: &mut Db, value: &T) -> Result<(), Error> {
pub async fn put<Db: DbHandle>(
&self,
db: &mut Db,
value: &T,
) -> Result<Option<Arc<Revision>>, Error> {
self.lock(db, LockType::Write).await;
db.put(&self.ptr, value).await
}
@@ -278,7 +283,7 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
}
}
pub async fn delete<Db: DbHandle>(&self, db: &mut Db) -> Result<(), Error> {
pub async fn delete<Db: DbHandle>(&self, db: &mut Db) -> Result<Option<Arc<Revision>>, Error> {
db.lock(self.as_ref(), LockType::Write, true).await;
db.put(self.as_ref(), &Value::Null).await
}
@@ -287,7 +292,11 @@ impl<T> OptionModel<T>
where
T: Serialize + for<'de> Deserialize<'de> + Send + Sync + HasModel,
{
pub async fn put<Db: DbHandle>(&self, db: &mut Db, value: &T) -> Result<(), Error> {
pub async fn put<Db: DbHandle>(
&self,
db: &mut Db,
value: &T,
) -> Result<Option<Arc<Revision>>, Error> {
db.lock(self.as_ref(), LockType::Write, true).await;
db.put(self.as_ref(), value).await
}

View File

@@ -157,7 +157,7 @@ impl Store {
ptr: &JsonPointer<S, V>,
value: &T,
expire_id: Option<String>,
) -> Result<Arc<Revision>, Error> {
) -> Result<Option<Arc<Revision>>, Error> {
let mut patch = diff(
ptr.get(self.get_data()?).unwrap_or(&Value::Null),
&serde_json::to_value(value)?,
@@ -169,9 +169,13 @@ impl Store {
&mut self,
patch: DiffPatch,
expire_id: Option<String>,
) -> Result<Arc<Revision>, Error> {
) -> Result<Option<Arc<Revision>>, Error> {
use tokio::io::AsyncWriteExt;
if (patch.0).0.is_empty() && expire_id.is_none() {
return Ok(None);
}
self.check_cache_corrupted()?;
let patch_bin = serde_cbor::to_vec(&*patch)?;
json_patch::patch(self.get_data_mut()?, &*patch)?;
@@ -197,7 +201,7 @@ impl Store {
expire_id,
});
Ok(res)
Ok(Some(res))
}
}
@@ -248,10 +252,12 @@ impl PatchDb {
ptr: &JsonPointer<S, V>,
value: &T,
expire_id: Option<String>,
) -> Result<Arc<Revision>, Error> {
) -> Result<Option<Arc<Revision>>, Error> {
let mut store = self.store.write().await;
let rev = store.put(ptr, value, expire_id).await?;
self.subscriber.send(rev.clone()).unwrap_or_default();
if let Some(rev) = rev.as_ref() {
self.subscriber.send(rev.clone()).unwrap_or_default();
}
Ok(rev)
}
pub async fn apply(
@@ -259,14 +265,16 @@ impl PatchDb {
patch: DiffPatch,
expire_id: Option<String>,
store_write_lock: Option<RwLockWriteGuard<'_, Store>>,
) -> Result<Arc<Revision>, Error> {
) -> Result<Option<Arc<Revision>>, Error> {
let mut store = if let Some(store_write_lock) = store_write_lock {
store_write_lock
} else {
self.store.write().await
};
let rev = store.apply(patch, expire_id).await?;
self.subscriber.send(rev.clone()).unwrap_or_default(); // ignore errors
if let Some(rev) = rev.as_ref() {
self.subscriber.send(rev.clone()).unwrap_or_default(); // ignore errors
}
Ok(rev)
}
pub fn subscribe(&self) -> Receiver<Arc<Revision>> {

View File

@@ -38,6 +38,7 @@ async fn put_string_into_root(db: PatchDb, s: String) -> Arc<Revision> {
db.put(&JsonPointer::<&'static str>::default(), &s, None)
.await
.unwrap()
.unwrap()
}
#[tokio::test]

View File

@@ -42,7 +42,7 @@ impl Transaction<&mut PatchDbHandle> {
.db
.apply(self.updates, expire_id, Some(store))
.await?;
Ok(Some(rev))
Ok(rev)
}
}
pub async fn abort(mut self) -> Result<DiffPatch, Error> {
@@ -155,12 +155,12 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
&mut self,
ptr: &JsonPointer<S, V>,
value: &Value,
) -> Result<(), Error> {
) -> Result<Option<Arc<Revision>>, Error> {
let old = self.get_value(ptr, None).await?;
let mut patch = crate::patch::diff(&old, &value);
patch.prepend(ptr);
self.updates.append(patch);
Ok(())
Ok(None)
}
async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
&mut self,
@@ -202,11 +202,11 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
&mut self,
ptr: &JsonPointer<S, V>,
value: &T,
) -> Result<(), Error> {
) -> Result<Option<Arc<Revision>>, Error> {
self.put_value(ptr, &serde_json::to_value(value)?).await
}
async fn apply(&mut self, patch: DiffPatch) -> Result<(), Error> {
async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error> {
self.updates.append(patch);
Ok(())
Ok(None)
}
}