prefer compression as a failsafe (#48)

This commit is contained in:
Aiden McClelland
2022-09-22 08:40:06 -06:00
committed by GitHub
parent 4d987b1921
commit e74f36f073

View File

@@ -1,4 +1,4 @@
use std::collections::{HashMap, VecDeque}; use std::collections::{BTreeSet, HashMap, VecDeque};
use std::fs::OpenOptions; use std::fs::OpenOptions;
use std::io::SeekFrom; use std::io::SeekFrom;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
@@ -11,15 +11,14 @@ use json_ptr::{JsonPointer, SegList};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
use std::collections::BTreeSet;
use tokio::fs::File; use tokio::fs::File;
use tokio::io::AsyncSeekExt; use tokio::io::AsyncSeekExt;
use tokio::sync::{Mutex, OwnedMutexGuard, RwLock, RwLockWriteGuard}; use tokio::sync::{Mutex, OwnedMutexGuard, RwLock, RwLockWriteGuard};
use crate::handle::HandleId; use crate::handle::HandleId;
use crate::locker::Locker;
use crate::patch::{diff, DiffPatch, Dump, Revision}; use crate::patch::{diff, DiffPatch, Dump, Revision};
use crate::Error; use crate::{Error, PatchDbHandle};
use crate::{locker::Locker, PatchDbHandle};
lazy_static! { lazy_static! {
static ref OPEN_STORES: Mutex<HashMap<PathBuf, Arc<Mutex<()>>>> = Mutex::new(HashMap::new()); static ref OPEN_STORES: Mutex<HashMap<PathBuf, Arc<Mutex<()>>>> = Mutex::new(HashMap::new());
@@ -216,24 +215,34 @@ impl Store {
let patch_bin = serde_cbor::to_vec(&*patch)?; let patch_bin = serde_cbor::to_vec(&*patch)?;
let persistent_undo = json_patch::patch(&mut self.persistent, &*patch)?; let persistent_undo = json_patch::patch(&mut self.persistent, &*patch)?;
self.revision += 1; self.revision += 1;
async { if let Err(_e) = if self.revision % 4096 == 0 {
let file_len = self.file.metadata().await?.len(); self.compress().await
if file_len != self.file_cursor { } else {
self.file.set_len(self.file_cursor).await?; async {
self.file.seek(SeekFrom::Start(self.file_cursor)).await?; let file_len = self.file.metadata().await?.len();
if file_len != self.file_cursor {
self.file.set_len(self.file_cursor).await?;
self.file.seek(SeekFrom::Start(self.file_cursor)).await?;
}
self.file.write_all(&patch_bin).await?;
self.file.flush().await?;
self.file.sync_all().await?;
self.file_cursor = self.file.stream_position().await?;
Ok::<_, Error>(())
} }
self.file.write_all(&patch_bin).await?; .await
self.file.flush().await?; } {
self.file.sync_all().await?; #[cfg(feature = "tracing")]
self.file_cursor = self.file.stream_position().await?; tracing::error!("Error saving patch to disk: {}, attempting to compress", _e);
Ok::<_, Error>(()) if let Err(e) = self.compress().await {
} #[cfg(feature = "tracing")]
.await tracing::error!("Compression failed: {}", e);
.map_err(|e| { persistent_undo.apply(&mut self.persistent);
persistent_undo.apply(&mut self.persistent); self.revision -= 1;
self.revision -= 1; return Err(e);
e }
})?; };
drop(persistent_undo);
let id = self.revision; let id = self.revision;
let res = Arc::new(Revision { id, patch }); let res = Arc::new(Revision { id, patch });