diff --git a/patch-db/src/store.rs b/patch-db/src/store.rs index 35e98a7..d299e4f 100644 --- a/patch-db/src/store.rs +++ b/patch-db/src/store.rs @@ -1,4 +1,4 @@ -use std::collections::{HashMap, VecDeque}; +use std::collections::{BTreeSet, HashMap, VecDeque}; use std::fs::OpenOptions; use std::io::SeekFrom; use std::path::{Path, PathBuf}; @@ -11,15 +11,14 @@ use json_ptr::{JsonPointer, SegList}; use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; use serde_json::Value; -use std::collections::BTreeSet; use tokio::fs::File; use tokio::io::AsyncSeekExt; use tokio::sync::{Mutex, OwnedMutexGuard, RwLock, RwLockWriteGuard}; use crate::handle::HandleId; +use crate::locker::Locker; use crate::patch::{diff, DiffPatch, Dump, Revision}; -use crate::Error; -use crate::{locker::Locker, PatchDbHandle}; +use crate::{Error, PatchDbHandle}; lazy_static! { static ref OPEN_STORES: Mutex>>> = Mutex::new(HashMap::new()); @@ -216,24 +215,34 @@ impl Store { let patch_bin = serde_cbor::to_vec(&*patch)?; let persistent_undo = json_patch::patch(&mut self.persistent, &*patch)?; self.revision += 1; - async { - let file_len = self.file.metadata().await?.len(); - if file_len != self.file_cursor { - self.file.set_len(self.file_cursor).await?; - self.file.seek(SeekFrom::Start(self.file_cursor)).await?; + if let Err(_e) = if self.revision % 4096 == 0 { + self.compress().await + } else { + async { + let file_len = self.file.metadata().await?.len(); + if file_len != self.file_cursor { + self.file.set_len(self.file_cursor).await?; + self.file.seek(SeekFrom::Start(self.file_cursor)).await?; + } + self.file.write_all(&patch_bin).await?; + self.file.flush().await?; + self.file.sync_all().await?; + self.file_cursor = self.file.stream_position().await?; + Ok::<_, Error>(()) } - self.file.write_all(&patch_bin).await?; - self.file.flush().await?; - self.file.sync_all().await?; - self.file_cursor = self.file.stream_position().await?; - Ok::<_, Error>(()) - } - .await - .map_err(|e| { - persistent_undo.apply(&mut self.persistent); - self.revision -= 1; - e - })?; + .await + } { + #[cfg(feature = "tracing")] + tracing::error!("Error saving patch to disk: {}, attempting to compress", _e); + if let Err(e) = self.compress().await { + #[cfg(feature = "tracing")] + tracing::error!("Compression failed: {}", e); + persistent_undo.apply(&mut self.persistent); + self.revision -= 1; + return Err(e); + } + }; + drop(persistent_undo); let id = self.revision; let res = Arc::new(Revision { id, patch });