expire_id

This commit is contained in:
Aiden McClelland
2021-04-29 10:59:10 -06:00
parent a3040ddcf6
commit f8dd662d84
5 changed files with 26 additions and 13 deletions

View File

@@ -194,11 +194,11 @@ impl DbHandle for PatchDbHandle {
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
value: &Value, value: &Value,
) -> Result<(), Error> { ) -> Result<(), Error> {
self.db.put(ptr, value).await?; self.db.put(ptr, value, None).await?;
Ok(()) Ok(())
} }
async fn apply(&mut self, patch: DiffPatch) -> Result<(), Error> { async fn apply(&mut self, patch: DiffPatch) -> Result<(), Error> {
self.db.apply(patch, None).await?; self.db.apply(patch, None, None).await?;
Ok(()) Ok(())
} }
async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>( async fn lock<S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
@@ -241,7 +241,7 @@ impl DbHandle for PatchDbHandle {
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
value: &T, value: &T,
) -> Result<(), Error> { ) -> Result<(), Error> {
self.db.put(ptr, value).await?; self.db.put(ptr, value, None).await?;
Ok(()) Ok(())
} }
} }

View File

@@ -9,6 +9,7 @@ use serde_json::Value;
pub struct Revision { pub struct Revision {
pub id: u64, pub id: u64,
pub patch: DiffPatch, pub patch: DiffPatch,
pub expire_id: Option<String>,
} }
#[derive(Debug, Clone, Deserialize, Serialize)] #[derive(Debug, Clone, Deserialize, Serialize)]

View File

@@ -143,15 +143,20 @@ impl Store {
&mut self, &mut self,
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
value: &T, value: &T,
expire_id: Option<String>,
) -> Result<Arc<Revision>, Error> { ) -> Result<Arc<Revision>, Error> {
let mut patch = diff( let mut patch = diff(
ptr.get(self.get_data()?).unwrap_or(&Value::Null), ptr.get(self.get_data()?).unwrap_or(&Value::Null),
&serde_json::to_value(value)?, &serde_json::to_value(value)?,
); );
patch.prepend(ptr); patch.prepend(ptr);
self.apply(patch).await self.apply(patch, expire_id).await
} }
pub(crate) async fn apply(&mut self, patch: DiffPatch) -> Result<Arc<Revision>, Error> { pub(crate) async fn apply(
&mut self,
patch: DiffPatch,
expire_id: Option<String>,
) -> Result<Arc<Revision>, Error> {
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
self.check_cache_corrupted()?; self.check_cache_corrupted()?;
@@ -173,7 +178,11 @@ impl Store {
let id = self.revision; let id = self.revision;
self.revision += 1; self.revision += 1;
let res = Arc::new(Revision { id, patch }); let res = Arc::new(Revision {
id,
patch,
expire_id,
});
Ok(res) Ok(res)
} }
@@ -214,15 +223,17 @@ impl PatchDb {
&self, &self,
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
value: &T, value: &T,
expire_id: Option<String>,
) -> Result<Arc<Revision>, Error> { ) -> Result<Arc<Revision>, Error> {
let mut store = self.store.write().await; let mut store = self.store.write().await;
let rev = store.put(ptr, value).await?; let rev = store.put(ptr, value, expire_id).await?;
self.subscriber.send(rev.clone()).unwrap_or_default(); self.subscriber.send(rev.clone()).unwrap_or_default();
Ok(rev) Ok(rev)
} }
pub async fn apply( pub async fn apply(
&self, &self,
patch: DiffPatch, patch: DiffPatch,
expire_id: Option<String>,
store_write_lock: Option<RwLockWriteGuard<'_, Store>>, store_write_lock: Option<RwLockWriteGuard<'_, Store>>,
) -> Result<Arc<Revision>, Error> { ) -> Result<Arc<Revision>, Error> {
let mut store = if let Some(store_write_lock) = store_write_lock { let mut store = if let Some(store_write_lock) = store_write_lock {
@@ -230,7 +241,7 @@ impl PatchDb {
} else { } else {
self.store.write().await self.store.write().await
}; };
let rev = store.apply(patch).await?; let rev = store.apply(patch, expire_id).await?;
self.subscriber.send(rev.clone()).unwrap_or_default(); // ignore errors self.subscriber.send(rev.clone()).unwrap_or_default(); // ignore errors
Ok(rev) Ok(rev)
} }

View File

@@ -23,6 +23,7 @@ async fn init_db(db_name: String) -> PatchDb {
c: NewType(None), c: NewType(None),
}, },
}, },
None,
) )
.await .await
.unwrap(); .unwrap();
@@ -34,7 +35,7 @@ async fn cleanup_db(db_name: &str) {
} }
async fn put_string_into_root(db: PatchDb, s: String) -> Arc<Revision> { async fn put_string_into_root(db: PatchDb, s: String) -> Arc<Revision> {
db.put(&JsonPointer::<&'static str>::default(), &s) db.put(&JsonPointer::<&'static str>::default(), &s, None)
.await .await
.unwrap() .unwrap()
} }
@@ -45,7 +46,7 @@ async fn basic() {
let ptr: JsonPointer = "/b/b".parse().unwrap(); let ptr: JsonPointer = "/b/b".parse().unwrap();
let mut get_res: Value = db.get(&ptr).await.unwrap(); let mut get_res: Value = db.get(&ptr).await.unwrap();
assert_eq!(get_res, 1); assert_eq!(get_res, 1);
db.put(&ptr, "hello").await.unwrap(); db.put(&ptr, "hello", None).await.unwrap();
get_res = db.get(&ptr).await.unwrap(); get_res = db.get(&ptr).await.unwrap();
assert_eq!(get_res, "hello"); assert_eq!(get_res, "hello");
cleanup_db("test.db").await; cleanup_db("test.db").await;
@@ -59,7 +60,7 @@ async fn transaction() {
let ptr: JsonPointer = "/b/b".parse().unwrap(); let ptr: JsonPointer = "/b/b".parse().unwrap();
tx.put(&ptr, &(2 as usize)).await.unwrap(); tx.put(&ptr, &(2 as usize)).await.unwrap();
tx.put(&ptr, &(1 as usize)).await.unwrap(); tx.put(&ptr, &(1 as usize)).await.unwrap();
let _res = tx.commit().await.unwrap(); let _res = tx.commit(None).await.unwrap();
println!("res = {:?}", _res); println!("res = {:?}", _res);
cleanup_db("test.db").await; cleanup_db("test.db").await;
} }

View File

@@ -26,11 +26,11 @@ pub struct Transaction<Parent: DbHandle> {
pub(crate) sub: Receiver<Arc<Revision>>, pub(crate) sub: Receiver<Arc<Revision>>,
} }
impl Transaction<&mut PatchDbHandle> { impl Transaction<&mut PatchDbHandle> {
pub async fn commit(mut self) -> Result<Arc<Revision>, Error> { pub async fn commit(mut self, expire_id: Option<String>) -> Result<Arc<Revision>, Error> {
let store_lock = self.parent.store(); let store_lock = self.parent.store();
let store = store_lock.read().await; let store = store_lock.read().await;
self.rebase()?; self.rebase()?;
let rev = self.parent.db.apply(self.updates, None).await?; let rev = self.parent.db.apply(self.updates, expire_id, None).await?;
drop(store); drop(store);
Ok(rev) Ok(rev)
} }