mirror of
https://github.com/Start9Labs/patch-db.git
synced 2026-03-26 02:11:54 +00:00
add revision cache and clean up todos
This commit is contained in:
Submodule json-patch updated: c73bbd19b8...6e482d3af4
@@ -18,6 +18,7 @@ unstable = []
|
||||
|
||||
[dependencies]
|
||||
async-trait = "0.1.42"
|
||||
barrage = "0.2.3"
|
||||
fd-lock-rs = "0.1.3"
|
||||
futures = "0.3.8"
|
||||
imbl = "1.0.1"
|
||||
|
||||
@@ -107,7 +107,7 @@ where
|
||||
binds: &[&str],
|
||||
) -> Result<Option<T>, Error> {
|
||||
let path = self.lock.glob.as_pointer(binds);
|
||||
if !db_handle.exists(&path, None).await? {
|
||||
if !db_handle.exists(&path, None).await {
|
||||
return Ok(None);
|
||||
}
|
||||
Ok(Some(db_handle.get(&path).await?))
|
||||
|
||||
@@ -2,10 +2,10 @@ use std::collections::BTreeSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use barrage::Receiver;
|
||||
use json_ptr::{JsonPointer, SegList};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use tokio::sync::broadcast::Receiver;
|
||||
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
|
||||
use crate::{
|
||||
@@ -19,6 +19,7 @@ use crate::{Error, Locker, PatchDb, Revision, Store, Transaction};
|
||||
pub struct HandleId {
|
||||
pub(crate) id: u64,
|
||||
#[cfg(feature = "trace")]
|
||||
#[allow(dead_code)]
|
||||
pub(crate) trace: Option<Arc<tracing_error::SpanTrace>>,
|
||||
}
|
||||
impl PartialEq for HandleId {
|
||||
@@ -45,7 +46,7 @@ pub trait DbHandle: Send + Sync + Sized {
|
||||
locks: impl IntoIterator<Item = bulk_locks::LockTargetId> + Send + Sync + Clone + 'a,
|
||||
) -> Result<bulk_locks::Verifier, Error>;
|
||||
fn id(&self) -> HandleId;
|
||||
fn rebase(&mut self) -> Result<(), Error>;
|
||||
fn rebase(&mut self);
|
||||
fn store(&self) -> Arc<RwLock<Store>>;
|
||||
fn subscribe(&self) -> Receiver<Arc<Revision>>;
|
||||
fn locker(&self) -> &Locker;
|
||||
@@ -53,12 +54,12 @@ pub trait DbHandle: Send + Sync + Sized {
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
|
||||
) -> Result<bool, Error>;
|
||||
) -> bool;
|
||||
async fn keys<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
|
||||
) -> Result<BTreeSet<String>, Error>;
|
||||
) -> BTreeSet<String>;
|
||||
async fn get_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
@@ -113,7 +114,7 @@ impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
|
||||
fn id(&self) -> HandleId {
|
||||
(**self).id()
|
||||
}
|
||||
fn rebase(&mut self) -> Result<(), Error> {
|
||||
fn rebase(&mut self) {
|
||||
(**self).rebase()
|
||||
}
|
||||
fn store(&self) -> Arc<RwLock<Store>> {
|
||||
@@ -129,14 +130,14 @@ impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
|
||||
) -> Result<bool, Error> {
|
||||
) -> bool {
|
||||
(*self).exists(ptr, store_read_lock).await
|
||||
}
|
||||
async fn keys<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
|
||||
) -> Result<BTreeSet<String>, Error> {
|
||||
) -> BTreeSet<String> {
|
||||
(*self).keys(ptr, store_read_lock).await
|
||||
}
|
||||
async fn get_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||
@@ -220,9 +221,7 @@ impl DbHandle for PatchDbHandle {
|
||||
fn id(&self) -> HandleId {
|
||||
self.id.clone()
|
||||
}
|
||||
fn rebase(&mut self) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
fn rebase(&mut self) {}
|
||||
fn store(&self) -> Arc<RwLock<Store>> {
|
||||
self.db.store.clone()
|
||||
}
|
||||
@@ -236,7 +235,7 @@ impl DbHandle for PatchDbHandle {
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
|
||||
) -> Result<bool, Error> {
|
||||
) -> bool {
|
||||
if let Some(lock) = store_read_lock {
|
||||
lock.exists(ptr)
|
||||
} else {
|
||||
@@ -247,7 +246,7 @@ impl DbHandle for PatchDbHandle {
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
|
||||
) -> Result<BTreeSet<String>, Error> {
|
||||
) -> BTreeSet<String> {
|
||||
if let Some(lock) = store_read_lock {
|
||||
lock.keys(ptr)
|
||||
} else {
|
||||
@@ -259,25 +258,25 @@ impl DbHandle for PatchDbHandle {
|
||||
ptr: &JsonPointer<S, V>,
|
||||
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
|
||||
) -> Result<Value, Error> {
|
||||
if let Some(lock) = store_read_lock {
|
||||
lock.get(ptr)
|
||||
Ok(if let Some(lock) = store_read_lock {
|
||||
lock.get_value(ptr)
|
||||
} else {
|
||||
self.db.get(ptr).await
|
||||
}
|
||||
self.db.get_value(ptr).await
|
||||
})
|
||||
}
|
||||
async fn put_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
value: &Value,
|
||||
) -> Result<Option<Arc<Revision>>, Error> {
|
||||
self.db.put(ptr, value, None).await
|
||||
self.db.put(ptr, value).await
|
||||
}
|
||||
async fn apply(
|
||||
&mut self,
|
||||
patch: DiffPatch,
|
||||
store_write_lock: Option<RwLockWriteGuard<'_, Store>>,
|
||||
) -> Result<Option<Arc<Revision>>, Error> {
|
||||
self.db.apply(patch, None, store_write_lock).await
|
||||
self.db.apply(patch, store_write_lock).await
|
||||
}
|
||||
async fn lock(&mut self, ptr: JsonGlob, lock_type: LockType) -> Result<(), Error> {
|
||||
self.locks
|
||||
@@ -303,7 +302,7 @@ impl DbHandle for PatchDbHandle {
|
||||
ptr: &JsonPointer<S, V>,
|
||||
value: &T,
|
||||
) -> Result<Option<Arc<Revision>>, Error> {
|
||||
self.db.put(ptr, value, None).await
|
||||
self.db.put(ptr, value).await
|
||||
}
|
||||
|
||||
async fn lock_all<'a>(
|
||||
@@ -337,7 +336,7 @@ pub mod test_utils {
|
||||
fn id(&self) -> HandleId {
|
||||
unimplemented!()
|
||||
}
|
||||
fn rebase(&mut self) -> Result<(), Error> {
|
||||
fn rebase(&mut self) {
|
||||
unimplemented!()
|
||||
}
|
||||
fn store(&self) -> Arc<RwLock<Store>> {
|
||||
@@ -353,14 +352,14 @@ pub mod test_utils {
|
||||
&mut self,
|
||||
_ptr: &JsonPointer<S, V>,
|
||||
_store_read_lock: Option<RwLockReadGuard<'_, Store>>,
|
||||
) -> Result<bool, Error> {
|
||||
) -> bool {
|
||||
unimplemented!()
|
||||
}
|
||||
async fn keys<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||
&mut self,
|
||||
_ptr: &JsonPointer<S, V>,
|
||||
_store_read_lock: Option<RwLockReadGuard<'_, Store>>,
|
||||
) -> Result<BTreeSet<String>, Error> {
|
||||
) -> BTreeSet<String> {
|
||||
unimplemented!()
|
||||
}
|
||||
async fn get_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
use std::io::Error as IOError;
|
||||
use std::sync::Arc;
|
||||
|
||||
use barrage::Disconnected;
|
||||
use json_ptr::JsonPointer;
|
||||
use locker::LockError;
|
||||
use thiserror::Error;
|
||||
use tokio::sync::broadcast::error::TryRecvError;
|
||||
|
||||
// note: inserting into an array (before another element) without proper locking can result in unexpected behaviour
|
||||
|
||||
@@ -34,6 +34,8 @@ pub use {json_patch, json_ptr};
|
||||
|
||||
pub use bulk_locks::{LockReceipt, LockTarget, LockTargetId, Verifier};
|
||||
|
||||
pub type Subscriber = barrage::Receiver<Arc<Revision>>;
|
||||
|
||||
pub mod test_utils {
|
||||
use super::*;
|
||||
pub use handle::test_utils::*;
|
||||
@@ -56,9 +58,9 @@ pub enum Error {
|
||||
#[error("FD Lock Error: {0}")]
|
||||
FDLock(#[from] fd_lock_rs::Error),
|
||||
#[error("Database Cache Corrupted: {0}")]
|
||||
CacheCorrupted(Arc<IOError>),
|
||||
#[error("Subscriber Error: {0}")]
|
||||
Subscriber(#[from] TryRecvError),
|
||||
CacheCorrupted(Arc<Error>),
|
||||
#[error("Subscriber Error: {0:?}")]
|
||||
Subscriber(Disconnected),
|
||||
#[error("Node Does Not Exist: {0}")]
|
||||
NodeDoesNotExist(JsonPointer),
|
||||
#[error("Invalid Lock Request: {0}")]
|
||||
@@ -66,3 +68,8 @@ pub enum Error {
|
||||
#[error("Invalid Lock Request: {0}")]
|
||||
Locker(String),
|
||||
}
|
||||
impl From<Disconnected> for Error {
|
||||
fn from(e: Disconnected) -> Self {
|
||||
Error::Subscriber(e)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -353,7 +353,7 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
|
||||
db.lock(self.0.as_ref().clone().into(), LockType::Exist)
|
||||
.await?;
|
||||
}
|
||||
Ok(db.exists(self.as_ref(), None).await?)
|
||||
Ok(db.exists(self.as_ref(), None).await)
|
||||
}
|
||||
|
||||
pub fn map<
|
||||
@@ -626,7 +626,7 @@ where
|
||||
db.lock(self.json_ptr().clone().into(), LockType::Exist)
|
||||
.await?;
|
||||
}
|
||||
let set = db.keys(self.json_ptr(), None).await?;
|
||||
let set = db.keys(self.json_ptr(), None).await;
|
||||
Ok(set
|
||||
.into_iter()
|
||||
.map(|s| serde_json::from_value(Value::String(s)))
|
||||
@@ -635,7 +635,7 @@ where
|
||||
pub async fn remove<Db: DbHandle>(&self, db: &mut Db, key: &T::Key) -> Result<(), Error> {
|
||||
db.lock(self.as_ref().clone().into(), LockType::Write)
|
||||
.await?;
|
||||
if db.exists(self.clone().idx(key).as_ref(), None).await? {
|
||||
if db.exists(self.clone().idx(key).as_ref(), None).await {
|
||||
db.apply(
|
||||
DiffPatch(Patch(vec![PatchOperation::Remove(RemoveOperation {
|
||||
path: self.as_ref().clone().join_end(key.as_ref()),
|
||||
|
||||
@@ -11,7 +11,6 @@ use std::collections::BTreeSet;
|
||||
pub struct Revision {
|
||||
pub id: u64,
|
||||
pub patch: DiffPatch,
|
||||
pub expire_id: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::fs::OpenOptions;
|
||||
use std::io::Error as IOError;
|
||||
use std::io::SeekFrom;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::Arc;
|
||||
|
||||
use barrage::{Receiver, Sender};
|
||||
use fd_lock_rs::FdLock;
|
||||
use json_ptr::{JsonPointer, SegList};
|
||||
use lazy_static::lazy_static;
|
||||
@@ -12,7 +13,7 @@ use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use std::collections::BTreeSet;
|
||||
use tokio::fs::File;
|
||||
use tokio::sync::broadcast::{Receiver, Sender};
|
||||
use tokio::io::AsyncSeekExt;
|
||||
use tokio::sync::{Mutex, OwnedMutexGuard, RwLock, RwLockWriteGuard};
|
||||
|
||||
use crate::handle::HandleId;
|
||||
@@ -24,12 +25,46 @@ lazy_static! {
|
||||
static ref OPEN_STORES: Mutex<HashMap<PathBuf, Arc<Mutex<()>>>> = Mutex::new(HashMap::new());
|
||||
}
|
||||
|
||||
pub struct RevisionCache {
|
||||
cache: VecDeque<Arc<Revision>>,
|
||||
capacity: usize,
|
||||
}
|
||||
impl RevisionCache {
|
||||
pub fn with_capacity(capacity: usize) -> Self {
|
||||
RevisionCache {
|
||||
cache: VecDeque::with_capacity(capacity),
|
||||
capacity,
|
||||
}
|
||||
}
|
||||
pub fn push(&mut self, revision: Arc<Revision>) {
|
||||
while self.capacity > 0 && self.cache.len() >= self.capacity {
|
||||
self.cache.pop_front();
|
||||
}
|
||||
self.cache.push_back(revision);
|
||||
}
|
||||
pub fn since(&self, id: u64) -> Option<Vec<Arc<Revision>>> {
|
||||
let start = self.cache.get(0).map(|rev| rev.id)?;
|
||||
if id < start - 1 {
|
||||
return None;
|
||||
}
|
||||
Some(
|
||||
self.cache
|
||||
.iter()
|
||||
.skip((id - start + 1) as usize)
|
||||
.cloned()
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Store {
|
||||
path: PathBuf,
|
||||
file: FdLock<File>,
|
||||
file_cursor: u64,
|
||||
_lock: OwnedMutexGuard<()>,
|
||||
cache_corrupted: Option<Arc<IOError>>,
|
||||
data: Value,
|
||||
persistent: Value,
|
||||
revision: u64,
|
||||
revision_cache: RevisionCache,
|
||||
}
|
||||
impl Store {
|
||||
pub(crate) async fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
|
||||
@@ -50,8 +85,8 @@ impl Store {
|
||||
path,
|
||||
)
|
||||
};
|
||||
Ok(tokio::task::spawn_blocking(move || {
|
||||
use std::io::Write;
|
||||
let mut res = tokio::task::spawn_blocking(move || {
|
||||
use std::io::Seek;
|
||||
|
||||
let bak = path.with_extension("bak");
|
||||
if bak.exists() {
|
||||
@@ -70,50 +105,33 @@ impl Store {
|
||||
serde_cbor::StreamDeserializer::new(serde_cbor::de::IoRead::new(&mut *f));
|
||||
let mut revision: u64 = stream.next().transpose()?.unwrap_or(0);
|
||||
let mut stream = stream.change_output_type();
|
||||
let mut data = stream.next().transpose()?.unwrap_or_else(|| Value::Null);
|
||||
let mut persistent = stream.next().transpose()?.unwrap_or_else(|| Value::Null);
|
||||
let mut stream = stream.change_output_type();
|
||||
while let Some(Ok(patch)) = stream.next() {
|
||||
json_patch::patch(&mut data, &patch)?;
|
||||
json_patch::patch(&mut persistent, &patch)?;
|
||||
revision += 1;
|
||||
}
|
||||
let bak_tmp = bak.with_extension("bak.tmp");
|
||||
let mut backup_file = std::fs::File::create(&bak_tmp)?;
|
||||
serde_cbor::to_writer(&mut backup_file, &revision)?;
|
||||
serde_cbor::to_writer(&mut backup_file, &data)?;
|
||||
backup_file.flush()?;
|
||||
backup_file.sync_all()?;
|
||||
std::fs::rename(&bak_tmp, &bak)?;
|
||||
f.set_len(0)?;
|
||||
serde_cbor::to_writer(&mut *f, &revision)?;
|
||||
serde_cbor::to_writer(&mut *f, &data)?;
|
||||
f.flush()?;
|
||||
f.sync_all()?;
|
||||
std::fs::remove_file(&bak)?;
|
||||
let file_cursor = f.stream_position()?;
|
||||
|
||||
Ok::<_, Error>(Store {
|
||||
path,
|
||||
file: f.map(File::from_std),
|
||||
file_cursor,
|
||||
_lock,
|
||||
cache_corrupted: None,
|
||||
data,
|
||||
persistent,
|
||||
revision,
|
||||
revision_cache: RevisionCache::with_capacity(64),
|
||||
})
|
||||
})
|
||||
.await??)
|
||||
.await??;
|
||||
res.compress().await?;
|
||||
Ok(res)
|
||||
}
|
||||
fn check_cache_corrupted(&self) -> Result<(), Error> {
|
||||
if let Some(ref err) = self.cache_corrupted {
|
||||
Err(Error::CacheCorrupted(err.clone()))
|
||||
} else {
|
||||
Ok(())
|
||||
pub(crate) fn get_revisions_since(&self, id: u64) -> Option<Vec<Arc<Revision>>> {
|
||||
if id >= self.revision {
|
||||
return Some(Vec::new());
|
||||
}
|
||||
}
|
||||
pub(crate) fn get_data(&self) -> Result<&Value, Error> {
|
||||
self.check_cache_corrupted()?;
|
||||
Ok(&self.data)
|
||||
}
|
||||
fn get_data_mut(&mut self) -> Result<&mut Value, Error> {
|
||||
self.check_cache_corrupted()?;
|
||||
Ok(&mut self.data)
|
||||
self.revision_cache.since(id)
|
||||
}
|
||||
pub async fn close(mut self) -> Result<(), Error> {
|
||||
use tokio::io::AsyncWriteExt;
|
||||
@@ -123,86 +141,101 @@ impl Store {
|
||||
self.file.unlock(true).map_err(|e| e.1)?;
|
||||
Ok(())
|
||||
}
|
||||
pub(crate) fn exists<S: AsRef<str>, V: SegList>(
|
||||
&self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
) -> Result<bool, Error> {
|
||||
Ok(ptr.get(self.get_data()?).unwrap_or(&Value::Null) != &Value::Null)
|
||||
pub(crate) fn exists<S: AsRef<str>, V: SegList>(&self, ptr: &JsonPointer<S, V>) -> bool {
|
||||
ptr.get(&self.persistent).unwrap_or(&Value::Null) != &Value::Null
|
||||
}
|
||||
pub(crate) fn keys<S: AsRef<str>, V: SegList>(
|
||||
&self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
) -> Result<BTreeSet<String>, Error> {
|
||||
Ok(match ptr.get(self.get_data()?).unwrap_or(&Value::Null) {
|
||||
) -> BTreeSet<String> {
|
||||
match ptr.get(&self.persistent).unwrap_or(&Value::Null) {
|
||||
Value::Object(o) => o.keys().cloned().collect(),
|
||||
_ => BTreeSet::new(),
|
||||
})
|
||||
}
|
||||
}
|
||||
pub(crate) fn get_value<S: AsRef<str>, V: SegList>(&self, ptr: &JsonPointer<S, V>) -> Value {
|
||||
ptr.get(&self.persistent).unwrap_or(&Value::Null).clone()
|
||||
}
|
||||
pub(crate) fn get<T: for<'de> Deserialize<'de>, S: AsRef<str>, V: SegList>(
|
||||
&self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
) -> Result<T, Error> {
|
||||
Ok(serde_json::from_value(
|
||||
ptr.get(self.get_data()?).unwrap_or(&Value::Null).clone(),
|
||||
)?)
|
||||
Ok(serde_json::from_value(self.get_value(ptr))?)
|
||||
}
|
||||
pub(crate) fn dump(&self) -> Dump {
|
||||
Dump {
|
||||
pub(crate) fn dump(&self) -> Result<Dump, Error> {
|
||||
Ok(Dump {
|
||||
id: self.revision,
|
||||
value: self.get_data().unwrap().clone(),
|
||||
}
|
||||
value: self.persistent.clone(),
|
||||
})
|
||||
}
|
||||
pub(crate) async fn put<T: Serialize + ?Sized, S: AsRef<str>, V: SegList>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
value: &T,
|
||||
expire_id: Option<String>,
|
||||
) -> Result<Option<Arc<Revision>>, Error> {
|
||||
let mut patch = diff(
|
||||
ptr.get(self.get_data()?).unwrap_or(&Value::Null),
|
||||
ptr.get(&self.persistent).unwrap_or(&Value::Null),
|
||||
&serde_json::to_value(value)?,
|
||||
);
|
||||
patch.prepend(ptr);
|
||||
self.apply(patch, expire_id).await
|
||||
self.apply(patch).await
|
||||
}
|
||||
pub(crate) async fn apply(
|
||||
&mut self,
|
||||
patch: DiffPatch,
|
||||
expire_id: Option<String>,
|
||||
) -> Result<Option<Arc<Revision>>, Error> {
|
||||
pub(crate) async fn compress(&mut self) -> Result<(), Error> {
|
||||
use tokio::io::AsyncWriteExt;
|
||||
let bak = self.path.with_extension("bak");
|
||||
let bak_tmp = bak.with_extension("bak.tmp");
|
||||
let mut backup_file = File::create(&bak_tmp).await?;
|
||||
let revision_cbor = serde_cbor::to_vec(&self.revision)?;
|
||||
let data_cbor = serde_cbor::to_vec(&self.persistent)?;
|
||||
backup_file.write_all(&revision_cbor).await?;
|
||||
backup_file.write_all(&data_cbor).await?;
|
||||
backup_file.flush().await?;
|
||||
backup_file.sync_all().await?;
|
||||
tokio::fs::rename(&bak_tmp, &bak).await?;
|
||||
self.file.set_len(0).await?;
|
||||
self.file.seek(SeekFrom::Start(0)).await?;
|
||||
self.file.write_all(&revision_cbor).await?;
|
||||
self.file.write_all(&data_cbor).await?;
|
||||
self.file.flush().await?;
|
||||
self.file.sync_all().await?;
|
||||
tokio::fs::remove_file(&bak).await?;
|
||||
Ok(())
|
||||
}
|
||||
pub(crate) async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error> {
|
||||
use tokio::io::AsyncWriteExt;
|
||||
|
||||
if (patch.0).0.is_empty() && expire_id.is_none() {
|
||||
if (patch.0).0.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
#[cfg(feature = "tracing")]
|
||||
tracing::trace!("Attempting to apply patch: {:?}", patch);
|
||||
|
||||
self.check_cache_corrupted()?;
|
||||
let patch_bin = serde_cbor::to_vec(&*patch)?;
|
||||
json_patch::patch(self.get_data_mut()?, &*patch)?;
|
||||
|
||||
async fn sync_to_disk(file: &mut File, patch_bin: &[u8]) -> Result<(), IOError> {
|
||||
file.write_all(patch_bin).await?;
|
||||
file.flush().await?;
|
||||
file.sync_all().await?;
|
||||
Ok(())
|
||||
}
|
||||
if let Err(e) = sync_to_disk(&mut *self.file, &patch_bin).await {
|
||||
let e = Arc::new(e);
|
||||
self.cache_corrupted = Some(e.clone());
|
||||
return Err(Error::CacheCorrupted(e));
|
||||
// TODO: try to recover.
|
||||
}
|
||||
|
||||
let persistent_undo = json_patch::patch(&mut self.persistent, &*patch)?;
|
||||
self.revision += 1;
|
||||
async {
|
||||
let file_len = self.file.metadata().await?.len();
|
||||
if file_len != self.file_cursor {
|
||||
self.file.set_len(self.file_cursor).await?;
|
||||
self.file.seek(SeekFrom::Start(self.file_cursor)).await?;
|
||||
}
|
||||
self.file.write_all(&patch_bin).await?;
|
||||
self.file.flush().await?;
|
||||
self.file.sync_all().await?;
|
||||
self.file_cursor = self.file.stream_position().await?;
|
||||
Ok::<_, Error>(())
|
||||
}
|
||||
.await
|
||||
.map_err(|e| {
|
||||
persistent_undo.apply(&mut self.persistent);
|
||||
self.revision -= 1;
|
||||
e
|
||||
})?;
|
||||
|
||||
let id = self.revision;
|
||||
let res = Arc::new(Revision {
|
||||
id,
|
||||
patch,
|
||||
expire_id,
|
||||
});
|
||||
let res = Arc::new(Revision { id, patch });
|
||||
self.revision_cache.push(res.clone());
|
||||
|
||||
Ok(Some(res))
|
||||
}
|
||||
@@ -211,13 +244,13 @@ impl Store {
|
||||
#[derive(Clone)]
|
||||
pub struct PatchDb {
|
||||
pub(crate) store: Arc<RwLock<Store>>,
|
||||
subscriber: Arc<Sender<Arc<Revision>>>,
|
||||
subscriber: Arc<(Sender<Arc<Revision>>, Receiver<Arc<Revision>>)>,
|
||||
pub(crate) locker: Arc<Locker>,
|
||||
handle_id: Arc<AtomicU64>,
|
||||
}
|
||||
impl PatchDb {
|
||||
pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
|
||||
let (subscriber, _) = tokio::sync::broadcast::channel(4096); // TODO: make this unbounded
|
||||
let subscriber = barrage::unbounded();
|
||||
|
||||
Ok(PatchDb {
|
||||
store: Arc::new(RwLock::new(Store::open(path).await?)),
|
||||
@@ -226,26 +259,34 @@ impl PatchDb {
|
||||
handle_id: Arc::new(AtomicU64::new(0)),
|
||||
})
|
||||
}
|
||||
pub async fn dump(&self) -> Dump {
|
||||
pub async fn sync(&self, sequence: u64) -> Result<Result<Vec<Arc<Revision>>, Dump>, Error> {
|
||||
let store = self.store.read().await;
|
||||
if let Some(revs) = store.get_revisions_since(sequence) {
|
||||
Ok(Ok(revs))
|
||||
} else {
|
||||
Ok(Err(store.dump()?))
|
||||
}
|
||||
}
|
||||
pub async fn dump(&self) -> Result<Dump, Error> {
|
||||
self.store.read().await.dump()
|
||||
}
|
||||
pub async fn dump_and_sub(&self) -> (Dump, Receiver<Arc<Revision>>) {
|
||||
pub async fn dump_and_sub(&self) -> Result<(Dump, Receiver<Arc<Revision>>), Error> {
|
||||
let store = self.store.read().await;
|
||||
let sub = self.subscriber.subscribe();
|
||||
(store.dump(), sub)
|
||||
let sub = self.subscriber.1.clone();
|
||||
Ok((store.dump()?, sub))
|
||||
}
|
||||
pub async fn exists<S: AsRef<str>, V: SegList>(
|
||||
&self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
) -> Result<bool, Error> {
|
||||
pub async fn exists<S: AsRef<str>, V: SegList>(&self, ptr: &JsonPointer<S, V>) -> bool {
|
||||
self.store.read().await.exists(ptr)
|
||||
}
|
||||
pub async fn keys<S: AsRef<str>, V: SegList>(
|
||||
&self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
) -> Result<BTreeSet<String>, Error> {
|
||||
) -> BTreeSet<String> {
|
||||
self.store.read().await.keys(ptr)
|
||||
}
|
||||
pub async fn get_value<S: AsRef<str>, V: SegList>(&self, ptr: &JsonPointer<S, V>) -> Value {
|
||||
self.store.read().await.get_value(ptr)
|
||||
}
|
||||
pub async fn get<T: for<'de> Deserialize<'de>, S: AsRef<str>, V: SegList>(
|
||||
&self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
@@ -256,19 +297,17 @@ impl PatchDb {
|
||||
&self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
value: &T,
|
||||
expire_id: Option<String>,
|
||||
) -> Result<Option<Arc<Revision>>, Error> {
|
||||
let mut store = self.store.write().await;
|
||||
let rev = store.put(ptr, value, expire_id).await?;
|
||||
let rev = store.put(ptr, value).await?;
|
||||
if let Some(rev) = rev.as_ref() {
|
||||
self.subscriber.send(rev.clone()).unwrap_or_default();
|
||||
self.subscriber.0.send(rev.clone()).unwrap_or_default();
|
||||
}
|
||||
Ok(rev)
|
||||
}
|
||||
pub async fn apply(
|
||||
&self,
|
||||
patch: DiffPatch,
|
||||
expire_id: Option<String>,
|
||||
store_write_lock: Option<RwLockWriteGuard<'_, Store>>,
|
||||
) -> Result<Option<Arc<Revision>>, Error> {
|
||||
let mut store = if let Some(store_write_lock) = store_write_lock {
|
||||
@@ -276,14 +315,14 @@ impl PatchDb {
|
||||
} else {
|
||||
self.store.write().await
|
||||
};
|
||||
let rev = store.apply(patch, expire_id).await?;
|
||||
let rev = store.apply(patch).await?;
|
||||
if let Some(rev) = rev.as_ref() {
|
||||
self.subscriber.send(rev.clone()).unwrap_or_default(); // ignore errors
|
||||
self.subscriber.0.send(rev.clone()).unwrap_or_default(); // ignore errors
|
||||
}
|
||||
Ok(rev)
|
||||
}
|
||||
pub fn subscribe(&self) -> Receiver<Arc<Revision>> {
|
||||
self.subscriber.subscribe()
|
||||
self.subscriber.1.clone()
|
||||
}
|
||||
pub fn handle(&self) -> PatchDbHandle {
|
||||
PatchDbHandle {
|
||||
|
||||
@@ -2,11 +2,10 @@ use std::collections::BTreeSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use barrage::Receiver;
|
||||
use json_ptr::{JsonPointer, SegList};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use tokio::sync::broadcast::error::TryRecvError;
|
||||
use tokio::sync::broadcast::Receiver;
|
||||
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
|
||||
use crate::patch::{DiffPatch, Revision};
|
||||
@@ -26,28 +25,21 @@ pub struct Transaction<Parent: DbHandle> {
|
||||
pub(crate) sub: Receiver<Arc<Revision>>,
|
||||
}
|
||||
impl Transaction<&mut PatchDbHandle> {
|
||||
pub async fn commit(
|
||||
mut self,
|
||||
expire_id: Option<String>,
|
||||
) -> Result<Option<Arc<Revision>>, Error> {
|
||||
if (self.updates.0).0.is_empty() && expire_id.is_none() {
|
||||
pub async fn commit(mut self) -> Result<Option<Arc<Revision>>, Error> {
|
||||
if (self.updates.0).0.is_empty() {
|
||||
Ok(None)
|
||||
} else {
|
||||
let store_lock = self.parent.store();
|
||||
let store = store_lock.write().await;
|
||||
self.rebase()?;
|
||||
let rev = self
|
||||
.parent
|
||||
.db
|
||||
.apply(self.updates, expire_id, Some(store))
|
||||
.await?;
|
||||
self.rebase();
|
||||
let rev = self.parent.db.apply(self.updates, Some(store)).await?;
|
||||
Ok(rev)
|
||||
}
|
||||
}
|
||||
pub async fn abort(mut self) -> Result<DiffPatch, Error> {
|
||||
let store_lock = self.parent.store();
|
||||
let _store = store_lock.read().await;
|
||||
self.rebase()?;
|
||||
self.rebase();
|
||||
Ok(self.updates)
|
||||
}
|
||||
}
|
||||
@@ -55,7 +47,7 @@ impl<Parent: DbHandle + Send + Sync> Transaction<Parent> {
|
||||
pub async fn save(mut self) -> Result<(), Error> {
|
||||
let store_lock = self.parent.store();
|
||||
let store = store_lock.write().await;
|
||||
self.rebase()?;
|
||||
self.rebase();
|
||||
self.parent.apply(self.updates, Some(store)).await?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -65,7 +57,7 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
|
||||
async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error> {
|
||||
let store_lock = self.parent.store();
|
||||
let store = store_lock.read().await;
|
||||
self.rebase()?;
|
||||
self.rebase();
|
||||
let sub = self.parent.subscribe();
|
||||
drop(store);
|
||||
Ok(Transaction {
|
||||
@@ -79,16 +71,11 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
|
||||
fn id(&self) -> HandleId {
|
||||
self.id.clone()
|
||||
}
|
||||
fn rebase(&mut self) -> Result<(), Error> {
|
||||
self.parent.rebase()?;
|
||||
while let Some(rev) = match self.sub.try_recv() {
|
||||
Ok(a) => Some(a),
|
||||
Err(TryRecvError::Empty) => None,
|
||||
Err(e) => return Err(e.into()),
|
||||
} {
|
||||
fn rebase(&mut self) {
|
||||
self.parent.rebase();
|
||||
while let Some(rev) = self.sub.try_recv().unwrap() {
|
||||
self.updates.rebase(&rev.patch);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
fn store(&self) -> Arc<RwLock<Store>> {
|
||||
self.parent.store()
|
||||
@@ -103,7 +90,7 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
|
||||
) -> Result<bool, Error> {
|
||||
) -> bool {
|
||||
let exists = {
|
||||
let store_lock = self.parent.store();
|
||||
let store = if let Some(store_read_lock) = store_read_lock {
|
||||
@@ -111,16 +98,16 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
|
||||
} else {
|
||||
store_lock.read().await
|
||||
};
|
||||
self.rebase()?;
|
||||
self.parent.exists(ptr, Some(store)).await?
|
||||
self.rebase();
|
||||
self.parent.exists(ptr, Some(store)).await
|
||||
};
|
||||
Ok(self.updates.for_path(ptr).exists().unwrap_or(exists))
|
||||
self.updates.for_path(ptr).exists().unwrap_or(exists)
|
||||
}
|
||||
async fn keys<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
|
||||
) -> Result<BTreeSet<String>, Error> {
|
||||
) -> BTreeSet<String> {
|
||||
let keys = {
|
||||
let store_lock = self.parent.store();
|
||||
let store = if let Some(store_read_lock) = store_read_lock {
|
||||
@@ -128,10 +115,10 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
|
||||
} else {
|
||||
store_lock.read().await
|
||||
};
|
||||
self.rebase()?;
|
||||
self.parent.keys(ptr, Some(store)).await?
|
||||
self.rebase();
|
||||
self.parent.keys(ptr, Some(store)).await
|
||||
};
|
||||
Ok(self.updates.for_path(ptr).keys(keys))
|
||||
self.updates.for_path(ptr).keys(keys)
|
||||
}
|
||||
async fn get_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||
&mut self,
|
||||
@@ -145,7 +132,7 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
|
||||
} else {
|
||||
store_lock.read().await
|
||||
};
|
||||
self.rebase()?;
|
||||
self.rebase();
|
||||
self.parent.get_value(ptr, Some(store)).await?
|
||||
};
|
||||
let path_updates = self.updates.for_path(ptr);
|
||||
|
||||
Reference in New Issue
Block a user