mirror of
https://github.com/Start9Labs/patch-db.git
synced 2026-03-26 02:11:54 +00:00
return revisions
This commit is contained in:
committed by
Aiden McClelland
parent
a0d9392f77
commit
5b57eb8fe1
@@ -39,8 +39,8 @@ pub trait DbHandle: Send + Sync {
|
|||||||
&mut self,
|
&mut self,
|
||||||
ptr: &JsonPointer<S, V>,
|
ptr: &JsonPointer<S, V>,
|
||||||
value: &Value,
|
value: &Value,
|
||||||
) -> Result<(), Error>;
|
) -> Result<Option<Arc<Revision>>, Error>;
|
||||||
async fn apply(&mut self, patch: DiffPatch) -> Result<(), Error>;
|
async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error>;
|
||||||
async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
|
async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
|
||||||
&mut self,
|
&mut self,
|
||||||
ptr: &JsonPointer<S, V>,
|
ptr: &JsonPointer<S, V>,
|
||||||
@@ -63,7 +63,7 @@ pub trait DbHandle: Send + Sync {
|
|||||||
&mut self,
|
&mut self,
|
||||||
ptr: &JsonPointer<S, V>,
|
ptr: &JsonPointer<S, V>,
|
||||||
value: &T,
|
value: &T,
|
||||||
) -> Result<(), Error>;
|
) -> Result<Option<Arc<Revision>>, Error>;
|
||||||
}
|
}
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
|
impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
|
||||||
@@ -118,10 +118,10 @@ impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
|
|||||||
&mut self,
|
&mut self,
|
||||||
ptr: &JsonPointer<S, V>,
|
ptr: &JsonPointer<S, V>,
|
||||||
value: &Value,
|
value: &Value,
|
||||||
) -> Result<(), Error> {
|
) -> Result<Option<Arc<Revision>>, Error> {
|
||||||
(*self).put_value(ptr, value).await
|
(*self).put_value(ptr, value).await
|
||||||
}
|
}
|
||||||
async fn apply(&mut self, patch: DiffPatch) -> Result<(), Error> {
|
async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error> {
|
||||||
(*self).apply(patch).await
|
(*self).apply(patch).await
|
||||||
}
|
}
|
||||||
async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
|
async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
|
||||||
@@ -150,7 +150,7 @@ impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
|
|||||||
&mut self,
|
&mut self,
|
||||||
ptr: &JsonPointer<S, V>,
|
ptr: &JsonPointer<S, V>,
|
||||||
value: &T,
|
value: &T,
|
||||||
) -> Result<(), Error> {
|
) -> Result<Option<Arc<Revision>>, Error> {
|
||||||
(*self).put(ptr, value).await
|
(*self).put(ptr, value).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -219,13 +219,11 @@ impl DbHandle for PatchDbHandle {
|
|||||||
&mut self,
|
&mut self,
|
||||||
ptr: &JsonPointer<S, V>,
|
ptr: &JsonPointer<S, V>,
|
||||||
value: &Value,
|
value: &Value,
|
||||||
) -> Result<(), Error> {
|
) -> Result<Option<Arc<Revision>>, Error> {
|
||||||
self.db.put(ptr, value, None).await?;
|
self.db.put(ptr, value, None).await
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
async fn apply(&mut self, patch: DiffPatch) -> Result<(), Error> {
|
async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error> {
|
||||||
self.db.apply(patch, None, None).await?;
|
self.db.apply(patch, None, None).await
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
|
async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
|
||||||
&mut self,
|
&mut self,
|
||||||
@@ -267,8 +265,7 @@ impl DbHandle for PatchDbHandle {
|
|||||||
&mut self,
|
&mut self,
|
||||||
ptr: &JsonPointer<S, V>,
|
ptr: &JsonPointer<S, V>,
|
||||||
value: &T,
|
value: &T,
|
||||||
) -> Result<(), Error> {
|
) -> Result<Option<Arc<Revision>>, Error> {
|
||||||
self.db.put(ptr, value, None).await?;
|
self.db.put(ptr, value, None).await
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ use std::collections::{BTreeMap, HashMap};
|
|||||||
use std::hash::Hash;
|
use std::hash::Hash;
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
use std::ops::{Deref, DerefMut};
|
use std::ops::{Deref, DerefMut};
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use indexmap::{IndexMap, IndexSet};
|
use indexmap::{IndexMap, IndexSet};
|
||||||
use json_patch::{Patch, PatchOperation, RemoveOperation};
|
use json_patch::{Patch, PatchOperation, RemoveOperation};
|
||||||
@@ -9,8 +10,8 @@ use json_ptr::JsonPointer;
|
|||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
|
||||||
use crate::{locker::LockType, DbHandle};
|
use crate::locker::LockType;
|
||||||
use crate::{DiffPatch, Error};
|
use crate::{DbHandle, DiffPatch, Error, Revision};
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct ModelData<T: Serialize + for<'de> Deserialize<'de>>(T);
|
pub struct ModelData<T: Serialize + for<'de> Deserialize<'de>>(T);
|
||||||
@@ -99,7 +100,11 @@ impl<T> Model<T>
|
|||||||
where
|
where
|
||||||
T: Serialize + for<'de> Deserialize<'de> + Send + Sync,
|
T: Serialize + for<'de> Deserialize<'de> + Send + Sync,
|
||||||
{
|
{
|
||||||
pub async fn put<Db: DbHandle>(&self, db: &mut Db, value: &T) -> Result<(), Error> {
|
pub async fn put<Db: DbHandle>(
|
||||||
|
&self,
|
||||||
|
db: &mut Db,
|
||||||
|
value: &T,
|
||||||
|
) -> Result<Option<Arc<Revision>>, Error> {
|
||||||
self.lock(db, LockType::Write).await;
|
self.lock(db, LockType::Write).await;
|
||||||
db.put(&self.ptr, value).await
|
db.put(&self.ptr, value).await
|
||||||
}
|
}
|
||||||
@@ -278,7 +283,7 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn delete<Db: DbHandle>(&self, db: &mut Db) -> Result<(), Error> {
|
pub async fn delete<Db: DbHandle>(&self, db: &mut Db) -> Result<Option<Arc<Revision>>, Error> {
|
||||||
db.lock(self.as_ref(), LockType::Write, true).await;
|
db.lock(self.as_ref(), LockType::Write, true).await;
|
||||||
db.put(self.as_ref(), &Value::Null).await
|
db.put(self.as_ref(), &Value::Null).await
|
||||||
}
|
}
|
||||||
@@ -287,7 +292,11 @@ impl<T> OptionModel<T>
|
|||||||
where
|
where
|
||||||
T: Serialize + for<'de> Deserialize<'de> + Send + Sync + HasModel,
|
T: Serialize + for<'de> Deserialize<'de> + Send + Sync + HasModel,
|
||||||
{
|
{
|
||||||
pub async fn put<Db: DbHandle>(&self, db: &mut Db, value: &T) -> Result<(), Error> {
|
pub async fn put<Db: DbHandle>(
|
||||||
|
&self,
|
||||||
|
db: &mut Db,
|
||||||
|
value: &T,
|
||||||
|
) -> Result<Option<Arc<Revision>>, Error> {
|
||||||
db.lock(self.as_ref(), LockType::Write, true).await;
|
db.lock(self.as_ref(), LockType::Write, true).await;
|
||||||
db.put(self.as_ref(), value).await
|
db.put(self.as_ref(), value).await
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -157,7 +157,7 @@ impl Store {
|
|||||||
ptr: &JsonPointer<S, V>,
|
ptr: &JsonPointer<S, V>,
|
||||||
value: &T,
|
value: &T,
|
||||||
expire_id: Option<String>,
|
expire_id: Option<String>,
|
||||||
) -> Result<Arc<Revision>, Error> {
|
) -> Result<Option<Arc<Revision>>, Error> {
|
||||||
let mut patch = diff(
|
let mut patch = diff(
|
||||||
ptr.get(self.get_data()?).unwrap_or(&Value::Null),
|
ptr.get(self.get_data()?).unwrap_or(&Value::Null),
|
||||||
&serde_json::to_value(value)?,
|
&serde_json::to_value(value)?,
|
||||||
@@ -169,9 +169,13 @@ impl Store {
|
|||||||
&mut self,
|
&mut self,
|
||||||
patch: DiffPatch,
|
patch: DiffPatch,
|
||||||
expire_id: Option<String>,
|
expire_id: Option<String>,
|
||||||
) -> Result<Arc<Revision>, Error> {
|
) -> Result<Option<Arc<Revision>>, Error> {
|
||||||
use tokio::io::AsyncWriteExt;
|
use tokio::io::AsyncWriteExt;
|
||||||
|
|
||||||
|
if (patch.0).0.is_empty() && expire_id.is_none() {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
self.check_cache_corrupted()?;
|
self.check_cache_corrupted()?;
|
||||||
let patch_bin = serde_cbor::to_vec(&*patch)?;
|
let patch_bin = serde_cbor::to_vec(&*patch)?;
|
||||||
json_patch::patch(self.get_data_mut()?, &*patch)?;
|
json_patch::patch(self.get_data_mut()?, &*patch)?;
|
||||||
@@ -197,7 +201,7 @@ impl Store {
|
|||||||
expire_id,
|
expire_id,
|
||||||
});
|
});
|
||||||
|
|
||||||
Ok(res)
|
Ok(Some(res))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -248,10 +252,12 @@ impl PatchDb {
|
|||||||
ptr: &JsonPointer<S, V>,
|
ptr: &JsonPointer<S, V>,
|
||||||
value: &T,
|
value: &T,
|
||||||
expire_id: Option<String>,
|
expire_id: Option<String>,
|
||||||
) -> Result<Arc<Revision>, Error> {
|
) -> Result<Option<Arc<Revision>>, Error> {
|
||||||
let mut store = self.store.write().await;
|
let mut store = self.store.write().await;
|
||||||
let rev = store.put(ptr, value, expire_id).await?;
|
let rev = store.put(ptr, value, expire_id).await?;
|
||||||
self.subscriber.send(rev.clone()).unwrap_or_default();
|
if let Some(rev) = rev.as_ref() {
|
||||||
|
self.subscriber.send(rev.clone()).unwrap_or_default();
|
||||||
|
}
|
||||||
Ok(rev)
|
Ok(rev)
|
||||||
}
|
}
|
||||||
pub async fn apply(
|
pub async fn apply(
|
||||||
@@ -259,14 +265,16 @@ impl PatchDb {
|
|||||||
patch: DiffPatch,
|
patch: DiffPatch,
|
||||||
expire_id: Option<String>,
|
expire_id: Option<String>,
|
||||||
store_write_lock: Option<RwLockWriteGuard<'_, Store>>,
|
store_write_lock: Option<RwLockWriteGuard<'_, Store>>,
|
||||||
) -> Result<Arc<Revision>, Error> {
|
) -> Result<Option<Arc<Revision>>, Error> {
|
||||||
let mut store = if let Some(store_write_lock) = store_write_lock {
|
let mut store = if let Some(store_write_lock) = store_write_lock {
|
||||||
store_write_lock
|
store_write_lock
|
||||||
} else {
|
} else {
|
||||||
self.store.write().await
|
self.store.write().await
|
||||||
};
|
};
|
||||||
let rev = store.apply(patch, expire_id).await?;
|
let rev = store.apply(patch, expire_id).await?;
|
||||||
self.subscriber.send(rev.clone()).unwrap_or_default(); // ignore errors
|
if let Some(rev) = rev.as_ref() {
|
||||||
|
self.subscriber.send(rev.clone()).unwrap_or_default(); // ignore errors
|
||||||
|
}
|
||||||
Ok(rev)
|
Ok(rev)
|
||||||
}
|
}
|
||||||
pub fn subscribe(&self) -> Receiver<Arc<Revision>> {
|
pub fn subscribe(&self) -> Receiver<Arc<Revision>> {
|
||||||
|
|||||||
@@ -38,6 +38,7 @@ async fn put_string_into_root(db: PatchDb, s: String) -> Arc<Revision> {
|
|||||||
db.put(&JsonPointer::<&'static str>::default(), &s, None)
|
db.put(&JsonPointer::<&'static str>::default(), &s, None)
|
||||||
.await
|
.await
|
||||||
.unwrap()
|
.unwrap()
|
||||||
|
.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
|||||||
@@ -42,7 +42,7 @@ impl Transaction<&mut PatchDbHandle> {
|
|||||||
.db
|
.db
|
||||||
.apply(self.updates, expire_id, Some(store))
|
.apply(self.updates, expire_id, Some(store))
|
||||||
.await?;
|
.await?;
|
||||||
Ok(Some(rev))
|
Ok(rev)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub async fn abort(mut self) -> Result<DiffPatch, Error> {
|
pub async fn abort(mut self) -> Result<DiffPatch, Error> {
|
||||||
@@ -155,12 +155,12 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
|
|||||||
&mut self,
|
&mut self,
|
||||||
ptr: &JsonPointer<S, V>,
|
ptr: &JsonPointer<S, V>,
|
||||||
value: &Value,
|
value: &Value,
|
||||||
) -> Result<(), Error> {
|
) -> Result<Option<Arc<Revision>>, Error> {
|
||||||
let old = self.get_value(ptr, None).await?;
|
let old = self.get_value(ptr, None).await?;
|
||||||
let mut patch = crate::patch::diff(&old, &value);
|
let mut patch = crate::patch::diff(&old, &value);
|
||||||
patch.prepend(ptr);
|
patch.prepend(ptr);
|
||||||
self.updates.append(patch);
|
self.updates.append(patch);
|
||||||
Ok(())
|
Ok(None)
|
||||||
}
|
}
|
||||||
async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
|
async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
|
||||||
&mut self,
|
&mut self,
|
||||||
@@ -202,11 +202,11 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
|
|||||||
&mut self,
|
&mut self,
|
||||||
ptr: &JsonPointer<S, V>,
|
ptr: &JsonPointer<S, V>,
|
||||||
value: &T,
|
value: &T,
|
||||||
) -> Result<(), Error> {
|
) -> Result<Option<Arc<Revision>>, Error> {
|
||||||
self.put_value(ptr, &serde_json::to_value(value)?).await
|
self.put_value(ptr, &serde_json::to_value(value)?).await
|
||||||
}
|
}
|
||||||
async fn apply(&mut self, patch: DiffPatch) -> Result<(), Error> {
|
async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error> {
|
||||||
self.updates.append(patch);
|
self.updates.append(patch);
|
||||||
Ok(())
|
Ok(None)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user