add revision cache and clean up todos

This commit is contained in:
Aiden McClelland
2022-09-01 17:13:54 -06:00
committed by Aiden McClelland
parent 3e0a0204bc
commit b205639575
9 changed files with 198 additions and 166 deletions

View File

@@ -18,6 +18,7 @@ unstable = []
[dependencies] [dependencies]
async-trait = "0.1.42" async-trait = "0.1.42"
barrage = "0.2.3"
fd-lock-rs = "0.1.3" fd-lock-rs = "0.1.3"
futures = "0.3.8" futures = "0.3.8"
imbl = "1.0.1" imbl = "1.0.1"

View File

@@ -107,7 +107,7 @@ where
binds: &[&str], binds: &[&str],
) -> Result<Option<T>, Error> { ) -> Result<Option<T>, Error> {
let path = self.lock.glob.as_pointer(binds); let path = self.lock.glob.as_pointer(binds);
if !db_handle.exists(&path, None).await? { if !db_handle.exists(&path, None).await {
return Ok(None); return Ok(None);
} }
Ok(Some(db_handle.get(&path).await?)) Ok(Some(db_handle.get(&path).await?))

View File

@@ -2,10 +2,10 @@ use std::collections::BTreeSet;
use std::sync::Arc; use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use barrage::Receiver;
use json_ptr::{JsonPointer, SegList}; use json_ptr::{JsonPointer, SegList};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
use tokio::sync::broadcast::Receiver;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use crate::{ use crate::{
@@ -19,6 +19,7 @@ use crate::{Error, Locker, PatchDb, Revision, Store, Transaction};
pub struct HandleId { pub struct HandleId {
pub(crate) id: u64, pub(crate) id: u64,
#[cfg(feature = "trace")] #[cfg(feature = "trace")]
#[allow(dead_code)]
pub(crate) trace: Option<Arc<tracing_error::SpanTrace>>, pub(crate) trace: Option<Arc<tracing_error::SpanTrace>>,
} }
impl PartialEq for HandleId { impl PartialEq for HandleId {
@@ -45,7 +46,7 @@ pub trait DbHandle: Send + Sync + Sized {
locks: impl IntoIterator<Item = bulk_locks::LockTargetId> + Send + Sync + Clone + 'a, locks: impl IntoIterator<Item = bulk_locks::LockTargetId> + Send + Sync + Clone + 'a,
) -> Result<bulk_locks::Verifier, Error>; ) -> Result<bulk_locks::Verifier, Error>;
fn id(&self) -> HandleId; fn id(&self) -> HandleId;
fn rebase(&mut self) -> Result<(), Error>; fn rebase(&mut self);
fn store(&self) -> Arc<RwLock<Store>>; fn store(&self) -> Arc<RwLock<Store>>;
fn subscribe(&self) -> Receiver<Arc<Revision>>; fn subscribe(&self) -> Receiver<Arc<Revision>>;
fn locker(&self) -> &Locker; fn locker(&self) -> &Locker;
@@ -53,12 +54,12 @@ pub trait DbHandle: Send + Sync + Sized {
&mut self, &mut self,
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
store_read_lock: Option<RwLockReadGuard<'_, Store>>, store_read_lock: Option<RwLockReadGuard<'_, Store>>,
) -> Result<bool, Error>; ) -> bool;
async fn keys<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>( async fn keys<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
&mut self, &mut self,
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
store_read_lock: Option<RwLockReadGuard<'_, Store>>, store_read_lock: Option<RwLockReadGuard<'_, Store>>,
) -> Result<BTreeSet<String>, Error>; ) -> BTreeSet<String>;
async fn get_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>( async fn get_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
&mut self, &mut self,
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
@@ -113,7 +114,7 @@ impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
fn id(&self) -> HandleId { fn id(&self) -> HandleId {
(**self).id() (**self).id()
} }
fn rebase(&mut self) -> Result<(), Error> { fn rebase(&mut self) {
(**self).rebase() (**self).rebase()
} }
fn store(&self) -> Arc<RwLock<Store>> { fn store(&self) -> Arc<RwLock<Store>> {
@@ -129,14 +130,14 @@ impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
&mut self, &mut self,
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
store_read_lock: Option<RwLockReadGuard<'_, Store>>, store_read_lock: Option<RwLockReadGuard<'_, Store>>,
) -> Result<bool, Error> { ) -> bool {
(*self).exists(ptr, store_read_lock).await (*self).exists(ptr, store_read_lock).await
} }
async fn keys<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>( async fn keys<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
&mut self, &mut self,
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
store_read_lock: Option<RwLockReadGuard<'_, Store>>, store_read_lock: Option<RwLockReadGuard<'_, Store>>,
) -> Result<BTreeSet<String>, Error> { ) -> BTreeSet<String> {
(*self).keys(ptr, store_read_lock).await (*self).keys(ptr, store_read_lock).await
} }
async fn get_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>( async fn get_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
@@ -220,9 +221,7 @@ impl DbHandle for PatchDbHandle {
fn id(&self) -> HandleId { fn id(&self) -> HandleId {
self.id.clone() self.id.clone()
} }
fn rebase(&mut self) -> Result<(), Error> { fn rebase(&mut self) {}
Ok(())
}
fn store(&self) -> Arc<RwLock<Store>> { fn store(&self) -> Arc<RwLock<Store>> {
self.db.store.clone() self.db.store.clone()
} }
@@ -236,7 +235,7 @@ impl DbHandle for PatchDbHandle {
&mut self, &mut self,
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
store_read_lock: Option<RwLockReadGuard<'_, Store>>, store_read_lock: Option<RwLockReadGuard<'_, Store>>,
) -> Result<bool, Error> { ) -> bool {
if let Some(lock) = store_read_lock { if let Some(lock) = store_read_lock {
lock.exists(ptr) lock.exists(ptr)
} else { } else {
@@ -247,7 +246,7 @@ impl DbHandle for PatchDbHandle {
&mut self, &mut self,
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
store_read_lock: Option<RwLockReadGuard<'_, Store>>, store_read_lock: Option<RwLockReadGuard<'_, Store>>,
) -> Result<BTreeSet<String>, Error> { ) -> BTreeSet<String> {
if let Some(lock) = store_read_lock { if let Some(lock) = store_read_lock {
lock.keys(ptr) lock.keys(ptr)
} else { } else {
@@ -259,25 +258,25 @@ impl DbHandle for PatchDbHandle {
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
store_read_lock: Option<RwLockReadGuard<'_, Store>>, store_read_lock: Option<RwLockReadGuard<'_, Store>>,
) -> Result<Value, Error> { ) -> Result<Value, Error> {
if let Some(lock) = store_read_lock { Ok(if let Some(lock) = store_read_lock {
lock.get(ptr) lock.get_value(ptr)
} else { } else {
self.db.get(ptr).await self.db.get_value(ptr).await
} })
} }
async fn put_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>( async fn put_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
&mut self, &mut self,
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
value: &Value, value: &Value,
) -> Result<Option<Arc<Revision>>, Error> { ) -> Result<Option<Arc<Revision>>, Error> {
self.db.put(ptr, value, None).await self.db.put(ptr, value).await
} }
async fn apply( async fn apply(
&mut self, &mut self,
patch: DiffPatch, patch: DiffPatch,
store_write_lock: Option<RwLockWriteGuard<'_, Store>>, store_write_lock: Option<RwLockWriteGuard<'_, Store>>,
) -> Result<Option<Arc<Revision>>, Error> { ) -> Result<Option<Arc<Revision>>, Error> {
self.db.apply(patch, None, store_write_lock).await self.db.apply(patch, store_write_lock).await
} }
async fn lock(&mut self, ptr: JsonGlob, lock_type: LockType) -> Result<(), Error> { async fn lock(&mut self, ptr: JsonGlob, lock_type: LockType) -> Result<(), Error> {
self.locks self.locks
@@ -303,7 +302,7 @@ impl DbHandle for PatchDbHandle {
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
value: &T, value: &T,
) -> Result<Option<Arc<Revision>>, Error> { ) -> Result<Option<Arc<Revision>>, Error> {
self.db.put(ptr, value, None).await self.db.put(ptr, value).await
} }
async fn lock_all<'a>( async fn lock_all<'a>(
@@ -337,7 +336,7 @@ pub mod test_utils {
fn id(&self) -> HandleId { fn id(&self) -> HandleId {
unimplemented!() unimplemented!()
} }
fn rebase(&mut self) -> Result<(), Error> { fn rebase(&mut self) {
unimplemented!() unimplemented!()
} }
fn store(&self) -> Arc<RwLock<Store>> { fn store(&self) -> Arc<RwLock<Store>> {
@@ -353,14 +352,14 @@ pub mod test_utils {
&mut self, &mut self,
_ptr: &JsonPointer<S, V>, _ptr: &JsonPointer<S, V>,
_store_read_lock: Option<RwLockReadGuard<'_, Store>>, _store_read_lock: Option<RwLockReadGuard<'_, Store>>,
) -> Result<bool, Error> { ) -> bool {
unimplemented!() unimplemented!()
} }
async fn keys<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>( async fn keys<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
&mut self, &mut self,
_ptr: &JsonPointer<S, V>, _ptr: &JsonPointer<S, V>,
_store_read_lock: Option<RwLockReadGuard<'_, Store>>, _store_read_lock: Option<RwLockReadGuard<'_, Store>>,
) -> Result<BTreeSet<String>, Error> { ) -> BTreeSet<String> {
unimplemented!() unimplemented!()
} }
async fn get_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>( async fn get_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(

View File

@@ -1,10 +1,10 @@
use std::io::Error as IOError; use std::io::Error as IOError;
use std::sync::Arc; use std::sync::Arc;
use barrage::Disconnected;
use json_ptr::JsonPointer; use json_ptr::JsonPointer;
use locker::LockError; use locker::LockError;
use thiserror::Error; use thiserror::Error;
use tokio::sync::broadcast::error::TryRecvError;
// note: inserting into an array (before another element) without proper locking can result in unexpected behaviour // note: inserting into an array (before another element) without proper locking can result in unexpected behaviour
@@ -34,6 +34,8 @@ pub use {json_patch, json_ptr};
pub use bulk_locks::{LockReceipt, LockTarget, LockTargetId, Verifier}; pub use bulk_locks::{LockReceipt, LockTarget, LockTargetId, Verifier};
pub type Subscriber = barrage::Receiver<Arc<Revision>>;
pub mod test_utils { pub mod test_utils {
use super::*; use super::*;
pub use handle::test_utils::*; pub use handle::test_utils::*;
@@ -56,9 +58,9 @@ pub enum Error {
#[error("FD Lock Error: {0}")] #[error("FD Lock Error: {0}")]
FDLock(#[from] fd_lock_rs::Error), FDLock(#[from] fd_lock_rs::Error),
#[error("Database Cache Corrupted: {0}")] #[error("Database Cache Corrupted: {0}")]
CacheCorrupted(Arc<IOError>), CacheCorrupted(Arc<Error>),
#[error("Subscriber Error: {0}")] #[error("Subscriber Error: {0:?}")]
Subscriber(#[from] TryRecvError), Subscriber(Disconnected),
#[error("Node Does Not Exist: {0}")] #[error("Node Does Not Exist: {0}")]
NodeDoesNotExist(JsonPointer), NodeDoesNotExist(JsonPointer),
#[error("Invalid Lock Request: {0}")] #[error("Invalid Lock Request: {0}")]
@@ -66,3 +68,8 @@ pub enum Error {
#[error("Invalid Lock Request: {0}")] #[error("Invalid Lock Request: {0}")]
Locker(String), Locker(String),
} }
impl From<Disconnected> for Error {
fn from(e: Disconnected) -> Self {
Error::Subscriber(e)
}
}

View File

@@ -353,7 +353,7 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
db.lock(self.0.as_ref().clone().into(), LockType::Exist) db.lock(self.0.as_ref().clone().into(), LockType::Exist)
.await?; .await?;
} }
Ok(db.exists(self.as_ref(), None).await?) Ok(db.exists(self.as_ref(), None).await)
} }
pub fn map< pub fn map<
@@ -626,7 +626,7 @@ where
db.lock(self.json_ptr().clone().into(), LockType::Exist) db.lock(self.json_ptr().clone().into(), LockType::Exist)
.await?; .await?;
} }
let set = db.keys(self.json_ptr(), None).await?; let set = db.keys(self.json_ptr(), None).await;
Ok(set Ok(set
.into_iter() .into_iter()
.map(|s| serde_json::from_value(Value::String(s))) .map(|s| serde_json::from_value(Value::String(s)))
@@ -635,7 +635,7 @@ where
pub async fn remove<Db: DbHandle>(&self, db: &mut Db, key: &T::Key) -> Result<(), Error> { pub async fn remove<Db: DbHandle>(&self, db: &mut Db, key: &T::Key) -> Result<(), Error> {
db.lock(self.as_ref().clone().into(), LockType::Write) db.lock(self.as_ref().clone().into(), LockType::Write)
.await?; .await?;
if db.exists(self.clone().idx(key).as_ref(), None).await? { if db.exists(self.clone().idx(key).as_ref(), None).await {
db.apply( db.apply(
DiffPatch(Patch(vec![PatchOperation::Remove(RemoveOperation { DiffPatch(Patch(vec![PatchOperation::Remove(RemoveOperation {
path: self.as_ref().clone().join_end(key.as_ref()), path: self.as_ref().clone().join_end(key.as_ref()),

View File

@@ -11,7 +11,6 @@ use std::collections::BTreeSet;
pub struct Revision { pub struct Revision {
pub id: u64, pub id: u64,
pub patch: DiffPatch, pub patch: DiffPatch,
pub expire_id: Option<String>,
} }
#[derive(Debug, Clone, Deserialize, Serialize)] #[derive(Debug, Clone, Deserialize, Serialize)]

View File

@@ -1,10 +1,11 @@
use std::collections::HashMap; use std::collections::{HashMap, VecDeque};
use std::fs::OpenOptions; use std::fs::OpenOptions;
use std::io::Error as IOError; use std::io::SeekFrom;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicU64; use std::sync::atomic::AtomicU64;
use std::sync::Arc; use std::sync::Arc;
use barrage::{Receiver, Sender};
use fd_lock_rs::FdLock; use fd_lock_rs::FdLock;
use json_ptr::{JsonPointer, SegList}; use json_ptr::{JsonPointer, SegList};
use lazy_static::lazy_static; use lazy_static::lazy_static;
@@ -12,7 +13,7 @@ use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
use std::collections::BTreeSet; use std::collections::BTreeSet;
use tokio::fs::File; use tokio::fs::File;
use tokio::sync::broadcast::{Receiver, Sender}; use tokio::io::AsyncSeekExt;
use tokio::sync::{Mutex, OwnedMutexGuard, RwLock, RwLockWriteGuard}; use tokio::sync::{Mutex, OwnedMutexGuard, RwLock, RwLockWriteGuard};
use crate::handle::HandleId; use crate::handle::HandleId;
@@ -24,12 +25,46 @@ lazy_static! {
static ref OPEN_STORES: Mutex<HashMap<PathBuf, Arc<Mutex<()>>>> = Mutex::new(HashMap::new()); static ref OPEN_STORES: Mutex<HashMap<PathBuf, Arc<Mutex<()>>>> = Mutex::new(HashMap::new());
} }
pub struct RevisionCache {
cache: VecDeque<Arc<Revision>>,
capacity: usize,
}
impl RevisionCache {
pub fn with_capacity(capacity: usize) -> Self {
RevisionCache {
cache: VecDeque::with_capacity(capacity),
capacity,
}
}
pub fn push(&mut self, revision: Arc<Revision>) {
while self.capacity > 0 && self.cache.len() >= self.capacity {
self.cache.pop_front();
}
self.cache.push_back(revision);
}
pub fn since(&self, id: u64) -> Option<Vec<Arc<Revision>>> {
let start = self.cache.get(0).map(|rev| rev.id)?;
if id < start - 1 {
return None;
}
Some(
self.cache
.iter()
.skip((id - start + 1) as usize)
.cloned()
.collect(),
)
}
}
pub struct Store { pub struct Store {
path: PathBuf,
file: FdLock<File>, file: FdLock<File>,
file_cursor: u64,
_lock: OwnedMutexGuard<()>, _lock: OwnedMutexGuard<()>,
cache_corrupted: Option<Arc<IOError>>, persistent: Value,
data: Value,
revision: u64, revision: u64,
revision_cache: RevisionCache,
} }
impl Store { impl Store {
pub(crate) async fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> { pub(crate) async fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
@@ -50,8 +85,8 @@ impl Store {
path, path,
) )
}; };
Ok(tokio::task::spawn_blocking(move || { let mut res = tokio::task::spawn_blocking(move || {
use std::io::Write; use std::io::Seek;
let bak = path.with_extension("bak"); let bak = path.with_extension("bak");
if bak.exists() { if bak.exists() {
@@ -70,50 +105,33 @@ impl Store {
serde_cbor::StreamDeserializer::new(serde_cbor::de::IoRead::new(&mut *f)); serde_cbor::StreamDeserializer::new(serde_cbor::de::IoRead::new(&mut *f));
let mut revision: u64 = stream.next().transpose()?.unwrap_or(0); let mut revision: u64 = stream.next().transpose()?.unwrap_or(0);
let mut stream = stream.change_output_type(); let mut stream = stream.change_output_type();
let mut data = stream.next().transpose()?.unwrap_or_else(|| Value::Null); let mut persistent = stream.next().transpose()?.unwrap_or_else(|| Value::Null);
let mut stream = stream.change_output_type(); let mut stream = stream.change_output_type();
while let Some(Ok(patch)) = stream.next() { while let Some(Ok(patch)) = stream.next() {
json_patch::patch(&mut data, &patch)?; json_patch::patch(&mut persistent, &patch)?;
revision += 1; revision += 1;
} }
let bak_tmp = bak.with_extension("bak.tmp"); let file_cursor = f.stream_position()?;
let mut backup_file = std::fs::File::create(&bak_tmp)?;
serde_cbor::to_writer(&mut backup_file, &revision)?;
serde_cbor::to_writer(&mut backup_file, &data)?;
backup_file.flush()?;
backup_file.sync_all()?;
std::fs::rename(&bak_tmp, &bak)?;
f.set_len(0)?;
serde_cbor::to_writer(&mut *f, &revision)?;
serde_cbor::to_writer(&mut *f, &data)?;
f.flush()?;
f.sync_all()?;
std::fs::remove_file(&bak)?;
Ok::<_, Error>(Store { Ok::<_, Error>(Store {
path,
file: f.map(File::from_std), file: f.map(File::from_std),
file_cursor,
_lock, _lock,
cache_corrupted: None, persistent,
data,
revision, revision,
revision_cache: RevisionCache::with_capacity(64),
}) })
}) })
.await??) .await??;
res.compress().await?;
Ok(res)
} }
fn check_cache_corrupted(&self) -> Result<(), Error> { pub(crate) fn get_revisions_since(&self, id: u64) -> Option<Vec<Arc<Revision>>> {
if let Some(ref err) = self.cache_corrupted { if id >= self.revision {
Err(Error::CacheCorrupted(err.clone())) return Some(Vec::new());
} else {
Ok(())
} }
} self.revision_cache.since(id)
pub(crate) fn get_data(&self) -> Result<&Value, Error> {
self.check_cache_corrupted()?;
Ok(&self.data)
}
fn get_data_mut(&mut self) -> Result<&mut Value, Error> {
self.check_cache_corrupted()?;
Ok(&mut self.data)
} }
pub async fn close(mut self) -> Result<(), Error> { pub async fn close(mut self) -> Result<(), Error> {
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
@@ -123,86 +141,101 @@ impl Store {
self.file.unlock(true).map_err(|e| e.1)?; self.file.unlock(true).map_err(|e| e.1)?;
Ok(()) Ok(())
} }
pub(crate) fn exists<S: AsRef<str>, V: SegList>( pub(crate) fn exists<S: AsRef<str>, V: SegList>(&self, ptr: &JsonPointer<S, V>) -> bool {
&self, ptr.get(&self.persistent).unwrap_or(&Value::Null) != &Value::Null
ptr: &JsonPointer<S, V>,
) -> Result<bool, Error> {
Ok(ptr.get(self.get_data()?).unwrap_or(&Value::Null) != &Value::Null)
} }
pub(crate) fn keys<S: AsRef<str>, V: SegList>( pub(crate) fn keys<S: AsRef<str>, V: SegList>(
&self, &self,
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
) -> Result<BTreeSet<String>, Error> { ) -> BTreeSet<String> {
Ok(match ptr.get(self.get_data()?).unwrap_or(&Value::Null) { match ptr.get(&self.persistent).unwrap_or(&Value::Null) {
Value::Object(o) => o.keys().cloned().collect(), Value::Object(o) => o.keys().cloned().collect(),
_ => BTreeSet::new(), _ => BTreeSet::new(),
}) }
}
pub(crate) fn get_value<S: AsRef<str>, V: SegList>(&self, ptr: &JsonPointer<S, V>) -> Value {
ptr.get(&self.persistent).unwrap_or(&Value::Null).clone()
} }
pub(crate) fn get<T: for<'de> Deserialize<'de>, S: AsRef<str>, V: SegList>( pub(crate) fn get<T: for<'de> Deserialize<'de>, S: AsRef<str>, V: SegList>(
&self, &self,
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
) -> Result<T, Error> { ) -> Result<T, Error> {
Ok(serde_json::from_value( Ok(serde_json::from_value(self.get_value(ptr))?)
ptr.get(self.get_data()?).unwrap_or(&Value::Null).clone(),
)?)
} }
pub(crate) fn dump(&self) -> Dump { pub(crate) fn dump(&self) -> Result<Dump, Error> {
Dump { Ok(Dump {
id: self.revision, id: self.revision,
value: self.get_data().unwrap().clone(), value: self.persistent.clone(),
} })
} }
pub(crate) async fn put<T: Serialize + ?Sized, S: AsRef<str>, V: SegList>( pub(crate) async fn put<T: Serialize + ?Sized, S: AsRef<str>, V: SegList>(
&mut self, &mut self,
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
value: &T, value: &T,
expire_id: Option<String>,
) -> Result<Option<Arc<Revision>>, Error> { ) -> Result<Option<Arc<Revision>>, Error> {
let mut patch = diff( let mut patch = diff(
ptr.get(self.get_data()?).unwrap_or(&Value::Null), ptr.get(&self.persistent).unwrap_or(&Value::Null),
&serde_json::to_value(value)?, &serde_json::to_value(value)?,
); );
patch.prepend(ptr); patch.prepend(ptr);
self.apply(patch, expire_id).await self.apply(patch).await
} }
pub(crate) async fn apply( pub(crate) async fn compress(&mut self) -> Result<(), Error> {
&mut self, use tokio::io::AsyncWriteExt;
patch: DiffPatch, let bak = self.path.with_extension("bak");
expire_id: Option<String>, let bak_tmp = bak.with_extension("bak.tmp");
) -> Result<Option<Arc<Revision>>, Error> { let mut backup_file = File::create(&bak_tmp).await?;
let revision_cbor = serde_cbor::to_vec(&self.revision)?;
let data_cbor = serde_cbor::to_vec(&self.persistent)?;
backup_file.write_all(&revision_cbor).await?;
backup_file.write_all(&data_cbor).await?;
backup_file.flush().await?;
backup_file.sync_all().await?;
tokio::fs::rename(&bak_tmp, &bak).await?;
self.file.set_len(0).await?;
self.file.seek(SeekFrom::Start(0)).await?;
self.file.write_all(&revision_cbor).await?;
self.file.write_all(&data_cbor).await?;
self.file.flush().await?;
self.file.sync_all().await?;
tokio::fs::remove_file(&bak).await?;
Ok(())
}
pub(crate) async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error> {
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
if (patch.0).0.is_empty() && expire_id.is_none() { if (patch.0).0.is_empty() {
return Ok(None); return Ok(None);
} }
#[cfg(feature = "tracing")] #[cfg(feature = "tracing")]
tracing::trace!("Attempting to apply patch: {:?}", patch); tracing::trace!("Attempting to apply patch: {:?}", patch);
self.check_cache_corrupted()?;
let patch_bin = serde_cbor::to_vec(&*patch)?; let patch_bin = serde_cbor::to_vec(&*patch)?;
json_patch::patch(self.get_data_mut()?, &*patch)?; let persistent_undo = json_patch::patch(&mut self.persistent, &*patch)?;
async fn sync_to_disk(file: &mut File, patch_bin: &[u8]) -> Result<(), IOError> {
file.write_all(patch_bin).await?;
file.flush().await?;
file.sync_all().await?;
Ok(())
}
if let Err(e) = sync_to_disk(&mut *self.file, &patch_bin).await {
let e = Arc::new(e);
self.cache_corrupted = Some(e.clone());
return Err(Error::CacheCorrupted(e));
// TODO: try to recover.
}
self.revision += 1; self.revision += 1;
async {
let file_len = self.file.metadata().await?.len();
if file_len != self.file_cursor {
self.file.set_len(self.file_cursor).await?;
self.file.seek(SeekFrom::Start(self.file_cursor)).await?;
}
self.file.write_all(&patch_bin).await?;
self.file.flush().await?;
self.file.sync_all().await?;
self.file_cursor = self.file.stream_position().await?;
Ok::<_, Error>(())
}
.await
.map_err(|e| {
persistent_undo.apply(&mut self.persistent);
self.revision -= 1;
e
})?;
let id = self.revision; let id = self.revision;
let res = Arc::new(Revision { let res = Arc::new(Revision { id, patch });
id, self.revision_cache.push(res.clone());
patch,
expire_id,
});
Ok(Some(res)) Ok(Some(res))
} }
@@ -211,13 +244,13 @@ impl Store {
#[derive(Clone)] #[derive(Clone)]
pub struct PatchDb { pub struct PatchDb {
pub(crate) store: Arc<RwLock<Store>>, pub(crate) store: Arc<RwLock<Store>>,
subscriber: Arc<Sender<Arc<Revision>>>, subscriber: Arc<(Sender<Arc<Revision>>, Receiver<Arc<Revision>>)>,
pub(crate) locker: Arc<Locker>, pub(crate) locker: Arc<Locker>,
handle_id: Arc<AtomicU64>, handle_id: Arc<AtomicU64>,
} }
impl PatchDb { impl PatchDb {
pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> { pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
let (subscriber, _) = tokio::sync::broadcast::channel(4096); // TODO: make this unbounded let subscriber = barrage::unbounded();
Ok(PatchDb { Ok(PatchDb {
store: Arc::new(RwLock::new(Store::open(path).await?)), store: Arc::new(RwLock::new(Store::open(path).await?)),
@@ -226,26 +259,34 @@ impl PatchDb {
handle_id: Arc::new(AtomicU64::new(0)), handle_id: Arc::new(AtomicU64::new(0)),
}) })
} }
pub async fn dump(&self) -> Dump { pub async fn sync(&self, sequence: u64) -> Result<Result<Vec<Arc<Revision>>, Dump>, Error> {
let store = self.store.read().await;
if let Some(revs) = store.get_revisions_since(sequence) {
Ok(Ok(revs))
} else {
Ok(Err(store.dump()?))
}
}
pub async fn dump(&self) -> Result<Dump, Error> {
self.store.read().await.dump() self.store.read().await.dump()
} }
pub async fn dump_and_sub(&self) -> (Dump, Receiver<Arc<Revision>>) { pub async fn dump_and_sub(&self) -> Result<(Dump, Receiver<Arc<Revision>>), Error> {
let store = self.store.read().await; let store = self.store.read().await;
let sub = self.subscriber.subscribe(); let sub = self.subscriber.1.clone();
(store.dump(), sub) Ok((store.dump()?, sub))
} }
pub async fn exists<S: AsRef<str>, V: SegList>( pub async fn exists<S: AsRef<str>, V: SegList>(&self, ptr: &JsonPointer<S, V>) -> bool {
&self,
ptr: &JsonPointer<S, V>,
) -> Result<bool, Error> {
self.store.read().await.exists(ptr) self.store.read().await.exists(ptr)
} }
pub async fn keys<S: AsRef<str>, V: SegList>( pub async fn keys<S: AsRef<str>, V: SegList>(
&self, &self,
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
) -> Result<BTreeSet<String>, Error> { ) -> BTreeSet<String> {
self.store.read().await.keys(ptr) self.store.read().await.keys(ptr)
} }
pub async fn get_value<S: AsRef<str>, V: SegList>(&self, ptr: &JsonPointer<S, V>) -> Value {
self.store.read().await.get_value(ptr)
}
pub async fn get<T: for<'de> Deserialize<'de>, S: AsRef<str>, V: SegList>( pub async fn get<T: for<'de> Deserialize<'de>, S: AsRef<str>, V: SegList>(
&self, &self,
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
@@ -256,19 +297,17 @@ impl PatchDb {
&self, &self,
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
value: &T, value: &T,
expire_id: Option<String>,
) -> Result<Option<Arc<Revision>>, Error> { ) -> Result<Option<Arc<Revision>>, Error> {
let mut store = self.store.write().await; let mut store = self.store.write().await;
let rev = store.put(ptr, value, expire_id).await?; let rev = store.put(ptr, value).await?;
if let Some(rev) = rev.as_ref() { if let Some(rev) = rev.as_ref() {
self.subscriber.send(rev.clone()).unwrap_or_default(); self.subscriber.0.send(rev.clone()).unwrap_or_default();
} }
Ok(rev) Ok(rev)
} }
pub async fn apply( pub async fn apply(
&self, &self,
patch: DiffPatch, patch: DiffPatch,
expire_id: Option<String>,
store_write_lock: Option<RwLockWriteGuard<'_, Store>>, store_write_lock: Option<RwLockWriteGuard<'_, Store>>,
) -> Result<Option<Arc<Revision>>, Error> { ) -> Result<Option<Arc<Revision>>, Error> {
let mut store = if let Some(store_write_lock) = store_write_lock { let mut store = if let Some(store_write_lock) = store_write_lock {
@@ -276,14 +315,14 @@ impl PatchDb {
} else { } else {
self.store.write().await self.store.write().await
}; };
let rev = store.apply(patch, expire_id).await?; let rev = store.apply(patch).await?;
if let Some(rev) = rev.as_ref() { if let Some(rev) = rev.as_ref() {
self.subscriber.send(rev.clone()).unwrap_or_default(); // ignore errors self.subscriber.0.send(rev.clone()).unwrap_or_default(); // ignore errors
} }
Ok(rev) Ok(rev)
} }
pub fn subscribe(&self) -> Receiver<Arc<Revision>> { pub fn subscribe(&self) -> Receiver<Arc<Revision>> {
self.subscriber.subscribe() self.subscriber.1.clone()
} }
pub fn handle(&self) -> PatchDbHandle { pub fn handle(&self) -> PatchDbHandle {
PatchDbHandle { PatchDbHandle {

View File

@@ -2,11 +2,10 @@ use std::collections::BTreeSet;
use std::sync::Arc; use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use barrage::Receiver;
use json_ptr::{JsonPointer, SegList}; use json_ptr::{JsonPointer, SegList};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::broadcast::Receiver;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use crate::patch::{DiffPatch, Revision}; use crate::patch::{DiffPatch, Revision};
@@ -26,28 +25,21 @@ pub struct Transaction<Parent: DbHandle> {
pub(crate) sub: Receiver<Arc<Revision>>, pub(crate) sub: Receiver<Arc<Revision>>,
} }
impl Transaction<&mut PatchDbHandle> { impl Transaction<&mut PatchDbHandle> {
pub async fn commit( pub async fn commit(mut self) -> Result<Option<Arc<Revision>>, Error> {
mut self, if (self.updates.0).0.is_empty() {
expire_id: Option<String>,
) -> Result<Option<Arc<Revision>>, Error> {
if (self.updates.0).0.is_empty() && expire_id.is_none() {
Ok(None) Ok(None)
} else { } else {
let store_lock = self.parent.store(); let store_lock = self.parent.store();
let store = store_lock.write().await; let store = store_lock.write().await;
self.rebase()?; self.rebase();
let rev = self let rev = self.parent.db.apply(self.updates, Some(store)).await?;
.parent
.db
.apply(self.updates, expire_id, Some(store))
.await?;
Ok(rev) Ok(rev)
} }
} }
pub async fn abort(mut self) -> Result<DiffPatch, Error> { pub async fn abort(mut self) -> Result<DiffPatch, Error> {
let store_lock = self.parent.store(); let store_lock = self.parent.store();
let _store = store_lock.read().await; let _store = store_lock.read().await;
self.rebase()?; self.rebase();
Ok(self.updates) Ok(self.updates)
} }
} }
@@ -55,7 +47,7 @@ impl<Parent: DbHandle + Send + Sync> Transaction<Parent> {
pub async fn save(mut self) -> Result<(), Error> { pub async fn save(mut self) -> Result<(), Error> {
let store_lock = self.parent.store(); let store_lock = self.parent.store();
let store = store_lock.write().await; let store = store_lock.write().await;
self.rebase()?; self.rebase();
self.parent.apply(self.updates, Some(store)).await?; self.parent.apply(self.updates, Some(store)).await?;
Ok(()) Ok(())
} }
@@ -65,7 +57,7 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error> { async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error> {
let store_lock = self.parent.store(); let store_lock = self.parent.store();
let store = store_lock.read().await; let store = store_lock.read().await;
self.rebase()?; self.rebase();
let sub = self.parent.subscribe(); let sub = self.parent.subscribe();
drop(store); drop(store);
Ok(Transaction { Ok(Transaction {
@@ -79,16 +71,11 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
fn id(&self) -> HandleId { fn id(&self) -> HandleId {
self.id.clone() self.id.clone()
} }
fn rebase(&mut self) -> Result<(), Error> { fn rebase(&mut self) {
self.parent.rebase()?; self.parent.rebase();
while let Some(rev) = match self.sub.try_recv() { while let Some(rev) = self.sub.try_recv().unwrap() {
Ok(a) => Some(a),
Err(TryRecvError::Empty) => None,
Err(e) => return Err(e.into()),
} {
self.updates.rebase(&rev.patch); self.updates.rebase(&rev.patch);
} }
Ok(())
} }
fn store(&self) -> Arc<RwLock<Store>> { fn store(&self) -> Arc<RwLock<Store>> {
self.parent.store() self.parent.store()
@@ -103,7 +90,7 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
&mut self, &mut self,
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
store_read_lock: Option<RwLockReadGuard<'_, Store>>, store_read_lock: Option<RwLockReadGuard<'_, Store>>,
) -> Result<bool, Error> { ) -> bool {
let exists = { let exists = {
let store_lock = self.parent.store(); let store_lock = self.parent.store();
let store = if let Some(store_read_lock) = store_read_lock { let store = if let Some(store_read_lock) = store_read_lock {
@@ -111,16 +98,16 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
} else { } else {
store_lock.read().await store_lock.read().await
}; };
self.rebase()?; self.rebase();
self.parent.exists(ptr, Some(store)).await? self.parent.exists(ptr, Some(store)).await
}; };
Ok(self.updates.for_path(ptr).exists().unwrap_or(exists)) self.updates.for_path(ptr).exists().unwrap_or(exists)
} }
async fn keys<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>( async fn keys<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
&mut self, &mut self,
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
store_read_lock: Option<RwLockReadGuard<'_, Store>>, store_read_lock: Option<RwLockReadGuard<'_, Store>>,
) -> Result<BTreeSet<String>, Error> { ) -> BTreeSet<String> {
let keys = { let keys = {
let store_lock = self.parent.store(); let store_lock = self.parent.store();
let store = if let Some(store_read_lock) = store_read_lock { let store = if let Some(store_read_lock) = store_read_lock {
@@ -128,10 +115,10 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
} else { } else {
store_lock.read().await store_lock.read().await
}; };
self.rebase()?; self.rebase();
self.parent.keys(ptr, Some(store)).await? self.parent.keys(ptr, Some(store)).await
}; };
Ok(self.updates.for_path(ptr).keys(keys)) self.updates.for_path(ptr).keys(keys)
} }
async fn get_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>( async fn get_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
&mut self, &mut self,
@@ -145,7 +132,7 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
} else { } else {
store_lock.read().await store_lock.read().await
}; };
self.rebase()?; self.rebase();
self.parent.get_value(ptr, Some(store)).await? self.parent.get_value(ptr, Some(store)).await?
}; };
let path_updates = self.updates.for_path(ptr); let path_updates = self.updates.for_path(ptr);