make Store::apply cancel safe

This commit is contained in:
Aiden McClelland
2022-10-24 16:01:56 -06:00
parent 6c3079786f
commit 079aade381

View File

@@ -6,6 +6,7 @@ use std::sync::atomic::AtomicU64;
use std::sync::Arc; use std::sync::Arc;
use fd_lock_rs::FdLock; use fd_lock_rs::FdLock;
use json_patch::PatchError;
use json_ptr::{JsonPointer, SegList}; use json_ptr::{JsonPointer, SegList};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@@ -221,43 +222,63 @@ impl Store {
pub(crate) async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error> { pub(crate) async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error> {
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
// eject if noop
if (patch.0).0.is_empty() { if (patch.0).0.is_empty() {
return Ok(None); return Ok(None);
} }
struct TentativeUpdated<'a> {
store: &'a mut Store,
undo: Option<json_patch::Undo<'a>>,
}
impl<'a> TentativeUpdated<'a> {
fn new(store: &'a mut Store, patch: &'a DiffPatch) -> Result<Self, PatchError> {
let undo = json_patch::patch(&mut store.persistent, &*patch)?;
store.revision += 1;
Ok(Self {
store,
undo: Some(undo),
})
}
}
impl<'a> Drop for TentativeUpdated<'a> {
fn drop(&mut self) {
if let Some(undo) = self.undo.take() {
undo.apply(&mut self.store.persistent);
self.store.revision -= 1;
}
}
}
#[cfg(feature = "tracing")] #[cfg(feature = "tracing")]
tracing::trace!("Attempting to apply patch: {:?}", patch); tracing::trace!("Attempting to apply patch: {:?}", patch);
// apply patch in memory
let patch_bin = serde_cbor::to_vec(&*patch)?; let patch_bin = serde_cbor::to_vec(&*patch)?;
let persistent_undo = json_patch::patch(&mut self.persistent, &*patch)?; let mut updated = TentativeUpdated::new(self, &patch)?;
self.revision += 1;
if let Err(_e) = if self.revision % 4096 == 0 { if updated.store.revision % 4096 == 0 {
self.compress().await updated.store.compress().await?
} else { } else {
async { if updated.store.file.stream_position().await? != updated.store.file_cursor {
if self.file.stream_position().await? != self.file_cursor { updated
self.file.set_len(self.file_cursor).await?; .store
self.file.seek(SeekFrom::Start(self.file_cursor)).await?; .file
} .set_len(updated.store.file_cursor)
self.file.write_all(&patch_bin).await?; .await?;
self.file.flush().await?; updated
self.file.sync_all().await?; .store
self.file_cursor += patch_bin.len() as u64; .file
Ok::<_, Error>(()) .seek(SeekFrom::Start(updated.store.file_cursor))
.await?;
} }
.await updated.store.file.write_all(&patch_bin).await?;
} { updated.store.file.flush().await?;
#[cfg(feature = "tracing")] updated.store.file.sync_all().await?;
tracing::error!("Error saving patch to disk: {}, attempting to compress", _e); updated.store.file_cursor += patch_bin.len() as u64;
if let Err(e) = self.compress().await { }
#[cfg(feature = "tracing")] drop(updated.undo.take());
tracing::error!("Compression failed: {}", e); drop(updated);
persistent_undo.apply(&mut self.persistent);
self.revision -= 1;
return Err(e);
}
};
drop(persistent_undo);
let id = self.revision; let id = self.revision;
let res = Arc::new(Revision { id, patch }); let res = Arc::new(Revision { id, patch });