mirror of
https://github.com/Start9Labs/patch-db.git
synced 2026-03-26 02:11:54 +00:00
make Store::apply cancel safe (#53)
This commit is contained in:
@@ -6,6 +6,7 @@ use std::sync::atomic::AtomicU64;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use fd_lock_rs::FdLock;
|
use fd_lock_rs::FdLock;
|
||||||
|
use json_patch::PatchError;
|
||||||
use json_ptr::{JsonPointer, SegList};
|
use json_ptr::{JsonPointer, SegList};
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
@@ -221,43 +222,63 @@ impl Store {
|
|||||||
pub(crate) async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error> {
|
pub(crate) async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error> {
|
||||||
use tokio::io::AsyncWriteExt;
|
use tokio::io::AsyncWriteExt;
|
||||||
|
|
||||||
|
// eject if noop
|
||||||
if (patch.0).0.is_empty() {
|
if (patch.0).0.is_empty() {
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct TentativeUpdated<'a> {
|
||||||
|
store: &'a mut Store,
|
||||||
|
undo: Option<json_patch::Undo<'a>>,
|
||||||
|
}
|
||||||
|
impl<'a> TentativeUpdated<'a> {
|
||||||
|
fn new(store: &'a mut Store, patch: &'a DiffPatch) -> Result<Self, PatchError> {
|
||||||
|
let undo = json_patch::patch(&mut store.persistent, &*patch)?;
|
||||||
|
store.revision += 1;
|
||||||
|
Ok(Self {
|
||||||
|
store,
|
||||||
|
undo: Some(undo),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<'a> Drop for TentativeUpdated<'a> {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if let Some(undo) = self.undo.take() {
|
||||||
|
undo.apply(&mut self.store.persistent);
|
||||||
|
self.store.revision -= 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(feature = "tracing")]
|
#[cfg(feature = "tracing")]
|
||||||
tracing::trace!("Attempting to apply patch: {:?}", patch);
|
tracing::trace!("Attempting to apply patch: {:?}", patch);
|
||||||
|
|
||||||
|
// apply patch in memory
|
||||||
let patch_bin = serde_cbor::to_vec(&*patch)?;
|
let patch_bin = serde_cbor::to_vec(&*patch)?;
|
||||||
let persistent_undo = json_patch::patch(&mut self.persistent, &*patch)?;
|
let mut updated = TentativeUpdated::new(self, &patch)?;
|
||||||
self.revision += 1;
|
|
||||||
if let Err(_e) = if self.revision % 4096 == 0 {
|
if updated.store.revision % 4096 == 0 {
|
||||||
self.compress().await
|
updated.store.compress().await?
|
||||||
} else {
|
} else {
|
||||||
async {
|
if updated.store.file.stream_position().await? != updated.store.file_cursor {
|
||||||
if self.file.stream_position().await? != self.file_cursor {
|
updated
|
||||||
self.file.set_len(self.file_cursor).await?;
|
.store
|
||||||
self.file.seek(SeekFrom::Start(self.file_cursor)).await?;
|
.file
|
||||||
}
|
.set_len(updated.store.file_cursor)
|
||||||
self.file.write_all(&patch_bin).await?;
|
.await?;
|
||||||
self.file.flush().await?;
|
updated
|
||||||
self.file.sync_all().await?;
|
.store
|
||||||
self.file_cursor += patch_bin.len() as u64;
|
.file
|
||||||
Ok::<_, Error>(())
|
.seek(SeekFrom::Start(updated.store.file_cursor))
|
||||||
|
.await?;
|
||||||
}
|
}
|
||||||
.await
|
updated.store.file.write_all(&patch_bin).await?;
|
||||||
} {
|
updated.store.file.flush().await?;
|
||||||
#[cfg(feature = "tracing")]
|
updated.store.file.sync_all().await?;
|
||||||
tracing::error!("Error saving patch to disk: {}, attempting to compress", _e);
|
updated.store.file_cursor += patch_bin.len() as u64;
|
||||||
if let Err(e) = self.compress().await {
|
}
|
||||||
#[cfg(feature = "tracing")]
|
drop(updated.undo.take());
|
||||||
tracing::error!("Compression failed: {}", e);
|
drop(updated);
|
||||||
persistent_undo.apply(&mut self.persistent);
|
|
||||||
self.revision -= 1;
|
|
||||||
return Err(e);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
drop(persistent_undo);
|
|
||||||
|
|
||||||
let id = self.revision;
|
let id = self.revision;
|
||||||
let res = Arc::new(Revision { id, patch });
|
let res = Arc::new(Revision { id, patch });
|
||||||
|
|||||||
Reference in New Issue
Block a user