From b205639575bfeb54b72481cb45713a0735e91778 Mon Sep 17 00:00:00 2001 From: Aiden McClelland Date: Thu, 1 Sep 2022 17:13:54 -0600 Subject: [PATCH] add revision cache and clean up todos --- json-patch | 2 +- patch-db/Cargo.toml | 1 + patch-db/src/bulk_locks.rs | 2 +- patch-db/src/handle.rs | 43 ++++--- patch-db/src/lib.rs | 15 ++- patch-db/src/model.rs | 6 +- patch-db/src/patch.rs | 1 - patch-db/src/store.rs | 241 +++++++++++++++++++++--------------- patch-db/src/transaction.rs | 53 +++----- 9 files changed, 198 insertions(+), 166 deletions(-) diff --git a/json-patch b/json-patch index c73bbd1..6e482d3 160000 --- a/json-patch +++ b/json-patch @@ -1 +1 @@ -Subproject commit c73bbd19b8d2f85d53af1e31f558f90bee966e60 +Subproject commit 6e482d3af4ab573e0f24c31994713d93dff5b719 diff --git a/patch-db/Cargo.toml b/patch-db/Cargo.toml index 273619c..5f31523 100644 --- a/patch-db/Cargo.toml +++ b/patch-db/Cargo.toml @@ -18,6 +18,7 @@ unstable = [] [dependencies] async-trait = "0.1.42" +barrage = "0.2.3" fd-lock-rs = "0.1.3" futures = "0.3.8" imbl = "1.0.1" diff --git a/patch-db/src/bulk_locks.rs b/patch-db/src/bulk_locks.rs index c11a912..fad342b 100644 --- a/patch-db/src/bulk_locks.rs +++ b/patch-db/src/bulk_locks.rs @@ -107,7 +107,7 @@ where binds: &[&str], ) -> Result, Error> { let path = self.lock.glob.as_pointer(binds); - if !db_handle.exists(&path, None).await? { + if !db_handle.exists(&path, None).await { return Ok(None); } Ok(Some(db_handle.get(&path).await?)) diff --git a/patch-db/src/handle.rs b/patch-db/src/handle.rs index 92718cd..5967f9d 100644 --- a/patch-db/src/handle.rs +++ b/patch-db/src/handle.rs @@ -2,10 +2,10 @@ use std::collections::BTreeSet; use std::sync::Arc; use async_trait::async_trait; +use barrage::Receiver; use json_ptr::{JsonPointer, SegList}; use serde::{Deserialize, Serialize}; use serde_json::Value; -use tokio::sync::broadcast::Receiver; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use crate::{ @@ -19,6 +19,7 @@ use crate::{Error, Locker, PatchDb, Revision, Store, Transaction}; pub struct HandleId { pub(crate) id: u64, #[cfg(feature = "trace")] + #[allow(dead_code)] pub(crate) trace: Option>, } impl PartialEq for HandleId { @@ -45,7 +46,7 @@ pub trait DbHandle: Send + Sync + Sized { locks: impl IntoIterator + Send + Sync + Clone + 'a, ) -> Result; fn id(&self) -> HandleId; - fn rebase(&mut self) -> Result<(), Error>; + fn rebase(&mut self); fn store(&self) -> Arc>; fn subscribe(&self) -> Receiver>; fn locker(&self) -> &Locker; @@ -53,12 +54,12 @@ pub trait DbHandle: Send + Sync + Sized { &mut self, ptr: &JsonPointer, store_read_lock: Option>, - ) -> Result; + ) -> bool; async fn keys + Send + Sync, V: SegList + Send + Sync>( &mut self, ptr: &JsonPointer, store_read_lock: Option>, - ) -> Result, Error>; + ) -> BTreeSet; async fn get_value + Send + Sync, V: SegList + Send + Sync>( &mut self, ptr: &JsonPointer, @@ -113,7 +114,7 @@ impl DbHandle for &mut Handle { fn id(&self) -> HandleId { (**self).id() } - fn rebase(&mut self) -> Result<(), Error> { + fn rebase(&mut self) { (**self).rebase() } fn store(&self) -> Arc> { @@ -129,14 +130,14 @@ impl DbHandle for &mut Handle { &mut self, ptr: &JsonPointer, store_read_lock: Option>, - ) -> Result { + ) -> bool { (*self).exists(ptr, store_read_lock).await } async fn keys + Send + Sync, V: SegList + Send + Sync>( &mut self, ptr: &JsonPointer, store_read_lock: Option>, - ) -> Result, Error> { + ) -> BTreeSet { (*self).keys(ptr, store_read_lock).await } async fn get_value + Send + Sync, V: SegList + Send + Sync>( @@ -220,9 +221,7 @@ impl DbHandle for PatchDbHandle { fn id(&self) -> HandleId { self.id.clone() } - fn rebase(&mut self) -> Result<(), Error> { - Ok(()) - } + fn rebase(&mut self) {} fn store(&self) -> Arc> { self.db.store.clone() } @@ -236,7 +235,7 @@ impl DbHandle for PatchDbHandle { &mut self, ptr: &JsonPointer, store_read_lock: Option>, - ) -> Result { + ) -> bool { if let Some(lock) = store_read_lock { lock.exists(ptr) } else { @@ -247,7 +246,7 @@ impl DbHandle for PatchDbHandle { &mut self, ptr: &JsonPointer, store_read_lock: Option>, - ) -> Result, Error> { + ) -> BTreeSet { if let Some(lock) = store_read_lock { lock.keys(ptr) } else { @@ -259,25 +258,25 @@ impl DbHandle for PatchDbHandle { ptr: &JsonPointer, store_read_lock: Option>, ) -> Result { - if let Some(lock) = store_read_lock { - lock.get(ptr) + Ok(if let Some(lock) = store_read_lock { + lock.get_value(ptr) } else { - self.db.get(ptr).await - } + self.db.get_value(ptr).await + }) } async fn put_value + Send + Sync, V: SegList + Send + Sync>( &mut self, ptr: &JsonPointer, value: &Value, ) -> Result>, Error> { - self.db.put(ptr, value, None).await + self.db.put(ptr, value).await } async fn apply( &mut self, patch: DiffPatch, store_write_lock: Option>, ) -> Result>, Error> { - self.db.apply(patch, None, store_write_lock).await + self.db.apply(patch, store_write_lock).await } async fn lock(&mut self, ptr: JsonGlob, lock_type: LockType) -> Result<(), Error> { self.locks @@ -303,7 +302,7 @@ impl DbHandle for PatchDbHandle { ptr: &JsonPointer, value: &T, ) -> Result>, Error> { - self.db.put(ptr, value, None).await + self.db.put(ptr, value).await } async fn lock_all<'a>( @@ -337,7 +336,7 @@ pub mod test_utils { fn id(&self) -> HandleId { unimplemented!() } - fn rebase(&mut self) -> Result<(), Error> { + fn rebase(&mut self) { unimplemented!() } fn store(&self) -> Arc> { @@ -353,14 +352,14 @@ pub mod test_utils { &mut self, _ptr: &JsonPointer, _store_read_lock: Option>, - ) -> Result { + ) -> bool { unimplemented!() } async fn keys + Send + Sync, V: SegList + Send + Sync>( &mut self, _ptr: &JsonPointer, _store_read_lock: Option>, - ) -> Result, Error> { + ) -> BTreeSet { unimplemented!() } async fn get_value + Send + Sync, V: SegList + Send + Sync>( diff --git a/patch-db/src/lib.rs b/patch-db/src/lib.rs index a71e6e8..509a609 100644 --- a/patch-db/src/lib.rs +++ b/patch-db/src/lib.rs @@ -1,10 +1,10 @@ use std::io::Error as IOError; use std::sync::Arc; +use barrage::Disconnected; use json_ptr::JsonPointer; use locker::LockError; use thiserror::Error; -use tokio::sync::broadcast::error::TryRecvError; // note: inserting into an array (before another element) without proper locking can result in unexpected behaviour @@ -34,6 +34,8 @@ pub use {json_patch, json_ptr}; pub use bulk_locks::{LockReceipt, LockTarget, LockTargetId, Verifier}; +pub type Subscriber = barrage::Receiver>; + pub mod test_utils { use super::*; pub use handle::test_utils::*; @@ -56,9 +58,9 @@ pub enum Error { #[error("FD Lock Error: {0}")] FDLock(#[from] fd_lock_rs::Error), #[error("Database Cache Corrupted: {0}")] - CacheCorrupted(Arc), - #[error("Subscriber Error: {0}")] - Subscriber(#[from] TryRecvError), + CacheCorrupted(Arc), + #[error("Subscriber Error: {0:?}")] + Subscriber(Disconnected), #[error("Node Does Not Exist: {0}")] NodeDoesNotExist(JsonPointer), #[error("Invalid Lock Request: {0}")] @@ -66,3 +68,8 @@ pub enum Error { #[error("Invalid Lock Request: {0}")] Locker(String), } +impl From for Error { + fn from(e: Disconnected) -> Self { + Error::Subscriber(e) + } +} diff --git a/patch-db/src/model.rs b/patch-db/src/model.rs index 08b1b84..6824804 100644 --- a/patch-db/src/model.rs +++ b/patch-db/src/model.rs @@ -353,7 +353,7 @@ impl Deserialize<'de>> OptionModel { db.lock(self.0.as_ref().clone().into(), LockType::Exist) .await?; } - Ok(db.exists(self.as_ref(), None).await?) + Ok(db.exists(self.as_ref(), None).await) } pub fn map< @@ -626,7 +626,7 @@ where db.lock(self.json_ptr().clone().into(), LockType::Exist) .await?; } - let set = db.keys(self.json_ptr(), None).await?; + let set = db.keys(self.json_ptr(), None).await; Ok(set .into_iter() .map(|s| serde_json::from_value(Value::String(s))) @@ -635,7 +635,7 @@ where pub async fn remove(&self, db: &mut Db, key: &T::Key) -> Result<(), Error> { db.lock(self.as_ref().clone().into(), LockType::Write) .await?; - if db.exists(self.clone().idx(key).as_ref(), None).await? { + if db.exists(self.clone().idx(key).as_ref(), None).await { db.apply( DiffPatch(Patch(vec![PatchOperation::Remove(RemoveOperation { path: self.as_ref().clone().join_end(key.as_ref()), diff --git a/patch-db/src/patch.rs b/patch-db/src/patch.rs index 796118e..2626337 100644 --- a/patch-db/src/patch.rs +++ b/patch-db/src/patch.rs @@ -11,7 +11,6 @@ use std::collections::BTreeSet; pub struct Revision { pub id: u64, pub patch: DiffPatch, - pub expire_id: Option, } #[derive(Debug, Clone, Deserialize, Serialize)] diff --git a/patch-db/src/store.rs b/patch-db/src/store.rs index f89d0aa..4ecfc6e 100644 --- a/patch-db/src/store.rs +++ b/patch-db/src/store.rs @@ -1,10 +1,11 @@ -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::fs::OpenOptions; -use std::io::Error as IOError; +use std::io::SeekFrom; use std::path::{Path, PathBuf}; use std::sync::atomic::AtomicU64; use std::sync::Arc; +use barrage::{Receiver, Sender}; use fd_lock_rs::FdLock; use json_ptr::{JsonPointer, SegList}; use lazy_static::lazy_static; @@ -12,7 +13,7 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::BTreeSet; use tokio::fs::File; -use tokio::sync::broadcast::{Receiver, Sender}; +use tokio::io::AsyncSeekExt; use tokio::sync::{Mutex, OwnedMutexGuard, RwLock, RwLockWriteGuard}; use crate::handle::HandleId; @@ -24,12 +25,46 @@ lazy_static! { static ref OPEN_STORES: Mutex>>> = Mutex::new(HashMap::new()); } +pub struct RevisionCache { + cache: VecDeque>, + capacity: usize, +} +impl RevisionCache { + pub fn with_capacity(capacity: usize) -> Self { + RevisionCache { + cache: VecDeque::with_capacity(capacity), + capacity, + } + } + pub fn push(&mut self, revision: Arc) { + while self.capacity > 0 && self.cache.len() >= self.capacity { + self.cache.pop_front(); + } + self.cache.push_back(revision); + } + pub fn since(&self, id: u64) -> Option>> { + let start = self.cache.get(0).map(|rev| rev.id)?; + if id < start - 1 { + return None; + } + Some( + self.cache + .iter() + .skip((id - start + 1) as usize) + .cloned() + .collect(), + ) + } +} + pub struct Store { + path: PathBuf, file: FdLock, + file_cursor: u64, _lock: OwnedMutexGuard<()>, - cache_corrupted: Option>, - data: Value, + persistent: Value, revision: u64, + revision_cache: RevisionCache, } impl Store { pub(crate) async fn open>(path: P) -> Result { @@ -50,8 +85,8 @@ impl Store { path, ) }; - Ok(tokio::task::spawn_blocking(move || { - use std::io::Write; + let mut res = tokio::task::spawn_blocking(move || { + use std::io::Seek; let bak = path.with_extension("bak"); if bak.exists() { @@ -70,50 +105,33 @@ impl Store { serde_cbor::StreamDeserializer::new(serde_cbor::de::IoRead::new(&mut *f)); let mut revision: u64 = stream.next().transpose()?.unwrap_or(0); let mut stream = stream.change_output_type(); - let mut data = stream.next().transpose()?.unwrap_or_else(|| Value::Null); + let mut persistent = stream.next().transpose()?.unwrap_or_else(|| Value::Null); let mut stream = stream.change_output_type(); while let Some(Ok(patch)) = stream.next() { - json_patch::patch(&mut data, &patch)?; + json_patch::patch(&mut persistent, &patch)?; revision += 1; } - let bak_tmp = bak.with_extension("bak.tmp"); - let mut backup_file = std::fs::File::create(&bak_tmp)?; - serde_cbor::to_writer(&mut backup_file, &revision)?; - serde_cbor::to_writer(&mut backup_file, &data)?; - backup_file.flush()?; - backup_file.sync_all()?; - std::fs::rename(&bak_tmp, &bak)?; - f.set_len(0)?; - serde_cbor::to_writer(&mut *f, &revision)?; - serde_cbor::to_writer(&mut *f, &data)?; - f.flush()?; - f.sync_all()?; - std::fs::remove_file(&bak)?; + let file_cursor = f.stream_position()?; Ok::<_, Error>(Store { + path, file: f.map(File::from_std), + file_cursor, _lock, - cache_corrupted: None, - data, + persistent, revision, + revision_cache: RevisionCache::with_capacity(64), }) }) - .await??) + .await??; + res.compress().await?; + Ok(res) } - fn check_cache_corrupted(&self) -> Result<(), Error> { - if let Some(ref err) = self.cache_corrupted { - Err(Error::CacheCorrupted(err.clone())) - } else { - Ok(()) + pub(crate) fn get_revisions_since(&self, id: u64) -> Option>> { + if id >= self.revision { + return Some(Vec::new()); } - } - pub(crate) fn get_data(&self) -> Result<&Value, Error> { - self.check_cache_corrupted()?; - Ok(&self.data) - } - fn get_data_mut(&mut self) -> Result<&mut Value, Error> { - self.check_cache_corrupted()?; - Ok(&mut self.data) + self.revision_cache.since(id) } pub async fn close(mut self) -> Result<(), Error> { use tokio::io::AsyncWriteExt; @@ -123,86 +141,101 @@ impl Store { self.file.unlock(true).map_err(|e| e.1)?; Ok(()) } - pub(crate) fn exists, V: SegList>( - &self, - ptr: &JsonPointer, - ) -> Result { - Ok(ptr.get(self.get_data()?).unwrap_or(&Value::Null) != &Value::Null) + pub(crate) fn exists, V: SegList>(&self, ptr: &JsonPointer) -> bool { + ptr.get(&self.persistent).unwrap_or(&Value::Null) != &Value::Null } pub(crate) fn keys, V: SegList>( &self, ptr: &JsonPointer, - ) -> Result, Error> { - Ok(match ptr.get(self.get_data()?).unwrap_or(&Value::Null) { + ) -> BTreeSet { + match ptr.get(&self.persistent).unwrap_or(&Value::Null) { Value::Object(o) => o.keys().cloned().collect(), _ => BTreeSet::new(), - }) + } + } + pub(crate) fn get_value, V: SegList>(&self, ptr: &JsonPointer) -> Value { + ptr.get(&self.persistent).unwrap_or(&Value::Null).clone() } pub(crate) fn get Deserialize<'de>, S: AsRef, V: SegList>( &self, ptr: &JsonPointer, ) -> Result { - Ok(serde_json::from_value( - ptr.get(self.get_data()?).unwrap_or(&Value::Null).clone(), - )?) + Ok(serde_json::from_value(self.get_value(ptr))?) } - pub(crate) fn dump(&self) -> Dump { - Dump { + pub(crate) fn dump(&self) -> Result { + Ok(Dump { id: self.revision, - value: self.get_data().unwrap().clone(), - } + value: self.persistent.clone(), + }) } pub(crate) async fn put, V: SegList>( &mut self, ptr: &JsonPointer, value: &T, - expire_id: Option, ) -> Result>, Error> { let mut patch = diff( - ptr.get(self.get_data()?).unwrap_or(&Value::Null), + ptr.get(&self.persistent).unwrap_or(&Value::Null), &serde_json::to_value(value)?, ); patch.prepend(ptr); - self.apply(patch, expire_id).await + self.apply(patch).await } - pub(crate) async fn apply( - &mut self, - patch: DiffPatch, - expire_id: Option, - ) -> Result>, Error> { + pub(crate) async fn compress(&mut self) -> Result<(), Error> { + use tokio::io::AsyncWriteExt; + let bak = self.path.with_extension("bak"); + let bak_tmp = bak.with_extension("bak.tmp"); + let mut backup_file = File::create(&bak_tmp).await?; + let revision_cbor = serde_cbor::to_vec(&self.revision)?; + let data_cbor = serde_cbor::to_vec(&self.persistent)?; + backup_file.write_all(&revision_cbor).await?; + backup_file.write_all(&data_cbor).await?; + backup_file.flush().await?; + backup_file.sync_all().await?; + tokio::fs::rename(&bak_tmp, &bak).await?; + self.file.set_len(0).await?; + self.file.seek(SeekFrom::Start(0)).await?; + self.file.write_all(&revision_cbor).await?; + self.file.write_all(&data_cbor).await?; + self.file.flush().await?; + self.file.sync_all().await?; + tokio::fs::remove_file(&bak).await?; + Ok(()) + } + pub(crate) async fn apply(&mut self, patch: DiffPatch) -> Result>, Error> { use tokio::io::AsyncWriteExt; - if (patch.0).0.is_empty() && expire_id.is_none() { + if (patch.0).0.is_empty() { return Ok(None); } #[cfg(feature = "tracing")] tracing::trace!("Attempting to apply patch: {:?}", patch); - self.check_cache_corrupted()?; let patch_bin = serde_cbor::to_vec(&*patch)?; - json_patch::patch(self.get_data_mut()?, &*patch)?; - - async fn sync_to_disk(file: &mut File, patch_bin: &[u8]) -> Result<(), IOError> { - file.write_all(patch_bin).await?; - file.flush().await?; - file.sync_all().await?; - Ok(()) - } - if let Err(e) = sync_to_disk(&mut *self.file, &patch_bin).await { - let e = Arc::new(e); - self.cache_corrupted = Some(e.clone()); - return Err(Error::CacheCorrupted(e)); - // TODO: try to recover. - } - + let persistent_undo = json_patch::patch(&mut self.persistent, &*patch)?; self.revision += 1; + async { + let file_len = self.file.metadata().await?.len(); + if file_len != self.file_cursor { + self.file.set_len(self.file_cursor).await?; + self.file.seek(SeekFrom::Start(self.file_cursor)).await?; + } + self.file.write_all(&patch_bin).await?; + self.file.flush().await?; + self.file.sync_all().await?; + self.file_cursor = self.file.stream_position().await?; + Ok::<_, Error>(()) + } + .await + .map_err(|e| { + persistent_undo.apply(&mut self.persistent); + self.revision -= 1; + e + })?; + let id = self.revision; - let res = Arc::new(Revision { - id, - patch, - expire_id, - }); + let res = Arc::new(Revision { id, patch }); + self.revision_cache.push(res.clone()); Ok(Some(res)) } @@ -211,13 +244,13 @@ impl Store { #[derive(Clone)] pub struct PatchDb { pub(crate) store: Arc>, - subscriber: Arc>>, + subscriber: Arc<(Sender>, Receiver>)>, pub(crate) locker: Arc, handle_id: Arc, } impl PatchDb { pub async fn open>(path: P) -> Result { - let (subscriber, _) = tokio::sync::broadcast::channel(4096); // TODO: make this unbounded + let subscriber = barrage::unbounded(); Ok(PatchDb { store: Arc::new(RwLock::new(Store::open(path).await?)), @@ -226,26 +259,34 @@ impl PatchDb { handle_id: Arc::new(AtomicU64::new(0)), }) } - pub async fn dump(&self) -> Dump { + pub async fn sync(&self, sequence: u64) -> Result>, Dump>, Error> { + let store = self.store.read().await; + if let Some(revs) = store.get_revisions_since(sequence) { + Ok(Ok(revs)) + } else { + Ok(Err(store.dump()?)) + } + } + pub async fn dump(&self) -> Result { self.store.read().await.dump() } - pub async fn dump_and_sub(&self) -> (Dump, Receiver>) { + pub async fn dump_and_sub(&self) -> Result<(Dump, Receiver>), Error> { let store = self.store.read().await; - let sub = self.subscriber.subscribe(); - (store.dump(), sub) + let sub = self.subscriber.1.clone(); + Ok((store.dump()?, sub)) } - pub async fn exists, V: SegList>( - &self, - ptr: &JsonPointer, - ) -> Result { + pub async fn exists, V: SegList>(&self, ptr: &JsonPointer) -> bool { self.store.read().await.exists(ptr) } pub async fn keys, V: SegList>( &self, ptr: &JsonPointer, - ) -> Result, Error> { + ) -> BTreeSet { self.store.read().await.keys(ptr) } + pub async fn get_value, V: SegList>(&self, ptr: &JsonPointer) -> Value { + self.store.read().await.get_value(ptr) + } pub async fn get Deserialize<'de>, S: AsRef, V: SegList>( &self, ptr: &JsonPointer, @@ -256,19 +297,17 @@ impl PatchDb { &self, ptr: &JsonPointer, value: &T, - expire_id: Option, ) -> Result>, Error> { let mut store = self.store.write().await; - let rev = store.put(ptr, value, expire_id).await?; + let rev = store.put(ptr, value).await?; if let Some(rev) = rev.as_ref() { - self.subscriber.send(rev.clone()).unwrap_or_default(); + self.subscriber.0.send(rev.clone()).unwrap_or_default(); } Ok(rev) } pub async fn apply( &self, patch: DiffPatch, - expire_id: Option, store_write_lock: Option>, ) -> Result>, Error> { let mut store = if let Some(store_write_lock) = store_write_lock { @@ -276,14 +315,14 @@ impl PatchDb { } else { self.store.write().await }; - let rev = store.apply(patch, expire_id).await?; + let rev = store.apply(patch).await?; if let Some(rev) = rev.as_ref() { - self.subscriber.send(rev.clone()).unwrap_or_default(); // ignore errors + self.subscriber.0.send(rev.clone()).unwrap_or_default(); // ignore errors } Ok(rev) } pub fn subscribe(&self) -> Receiver> { - self.subscriber.subscribe() + self.subscriber.1.clone() } pub fn handle(&self) -> PatchDbHandle { PatchDbHandle { diff --git a/patch-db/src/transaction.rs b/patch-db/src/transaction.rs index bf3dd9f..a4c54c4 100644 --- a/patch-db/src/transaction.rs +++ b/patch-db/src/transaction.rs @@ -2,11 +2,10 @@ use std::collections::BTreeSet; use std::sync::Arc; use async_trait::async_trait; +use barrage::Receiver; use json_ptr::{JsonPointer, SegList}; use serde::{Deserialize, Serialize}; use serde_json::Value; -use tokio::sync::broadcast::error::TryRecvError; -use tokio::sync::broadcast::Receiver; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use crate::patch::{DiffPatch, Revision}; @@ -26,28 +25,21 @@ pub struct Transaction { pub(crate) sub: Receiver>, } impl Transaction<&mut PatchDbHandle> { - pub async fn commit( - mut self, - expire_id: Option, - ) -> Result>, Error> { - if (self.updates.0).0.is_empty() && expire_id.is_none() { + pub async fn commit(mut self) -> Result>, Error> { + if (self.updates.0).0.is_empty() { Ok(None) } else { let store_lock = self.parent.store(); let store = store_lock.write().await; - self.rebase()?; - let rev = self - .parent - .db - .apply(self.updates, expire_id, Some(store)) - .await?; + self.rebase(); + let rev = self.parent.db.apply(self.updates, Some(store)).await?; Ok(rev) } } pub async fn abort(mut self) -> Result { let store_lock = self.parent.store(); let _store = store_lock.read().await; - self.rebase()?; + self.rebase(); Ok(self.updates) } } @@ -55,7 +47,7 @@ impl Transaction { pub async fn save(mut self) -> Result<(), Error> { let store_lock = self.parent.store(); let store = store_lock.write().await; - self.rebase()?; + self.rebase(); self.parent.apply(self.updates, Some(store)).await?; Ok(()) } @@ -65,7 +57,7 @@ impl DbHandle for Transaction { async fn begin<'a>(&'a mut self) -> Result, Error> { let store_lock = self.parent.store(); let store = store_lock.read().await; - self.rebase()?; + self.rebase(); let sub = self.parent.subscribe(); drop(store); Ok(Transaction { @@ -79,16 +71,11 @@ impl DbHandle for Transaction { fn id(&self) -> HandleId { self.id.clone() } - fn rebase(&mut self) -> Result<(), Error> { - self.parent.rebase()?; - while let Some(rev) = match self.sub.try_recv() { - Ok(a) => Some(a), - Err(TryRecvError::Empty) => None, - Err(e) => return Err(e.into()), - } { + fn rebase(&mut self) { + self.parent.rebase(); + while let Some(rev) = self.sub.try_recv().unwrap() { self.updates.rebase(&rev.patch); } - Ok(()) } fn store(&self) -> Arc> { self.parent.store() @@ -103,7 +90,7 @@ impl DbHandle for Transaction { &mut self, ptr: &JsonPointer, store_read_lock: Option>, - ) -> Result { + ) -> bool { let exists = { let store_lock = self.parent.store(); let store = if let Some(store_read_lock) = store_read_lock { @@ -111,16 +98,16 @@ impl DbHandle for Transaction { } else { store_lock.read().await }; - self.rebase()?; - self.parent.exists(ptr, Some(store)).await? + self.rebase(); + self.parent.exists(ptr, Some(store)).await }; - Ok(self.updates.for_path(ptr).exists().unwrap_or(exists)) + self.updates.for_path(ptr).exists().unwrap_or(exists) } async fn keys + Send + Sync, V: SegList + Send + Sync>( &mut self, ptr: &JsonPointer, store_read_lock: Option>, - ) -> Result, Error> { + ) -> BTreeSet { let keys = { let store_lock = self.parent.store(); let store = if let Some(store_read_lock) = store_read_lock { @@ -128,10 +115,10 @@ impl DbHandle for Transaction { } else { store_lock.read().await }; - self.rebase()?; - self.parent.keys(ptr, Some(store)).await? + self.rebase(); + self.parent.keys(ptr, Some(store)).await }; - Ok(self.updates.for_path(ptr).keys(keys)) + self.updates.for_path(ptr).keys(keys) } async fn get_value + Send + Sync, V: SegList + Send + Sync>( &mut self, @@ -145,7 +132,7 @@ impl DbHandle for Transaction { } else { store_lock.read().await }; - self.rebase()?; + self.rebase(); self.parent.get_value(ptr, Some(store)).await? }; let path_updates = self.updates.for_path(ptr);