mirror of
https://github.com/Start9Labs/patch-db.git
synced 2026-03-26 02:11:54 +00:00
prefer compression as a failsafe
This commit is contained in:
@@ -1,4 +1,4 @@
|
|||||||
use std::collections::{HashMap, VecDeque};
|
use std::collections::{BTreeSet, HashMap, VecDeque};
|
||||||
use std::fs::OpenOptions;
|
use std::fs::OpenOptions;
|
||||||
use std::io::SeekFrom;
|
use std::io::SeekFrom;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
@@ -11,15 +11,14 @@ use json_ptr::{JsonPointer, SegList};
|
|||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
use std::collections::BTreeSet;
|
|
||||||
use tokio::fs::File;
|
use tokio::fs::File;
|
||||||
use tokio::io::AsyncSeekExt;
|
use tokio::io::AsyncSeekExt;
|
||||||
use tokio::sync::{Mutex, OwnedMutexGuard, RwLock, RwLockWriteGuard};
|
use tokio::sync::{Mutex, OwnedMutexGuard, RwLock, RwLockWriteGuard};
|
||||||
|
|
||||||
use crate::handle::HandleId;
|
use crate::handle::HandleId;
|
||||||
|
use crate::locker::Locker;
|
||||||
use crate::patch::{diff, DiffPatch, Dump, Revision};
|
use crate::patch::{diff, DiffPatch, Dump, Revision};
|
||||||
use crate::Error;
|
use crate::{Error, PatchDbHandle};
|
||||||
use crate::{locker::Locker, PatchDbHandle};
|
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
static ref OPEN_STORES: Mutex<HashMap<PathBuf, Arc<Mutex<()>>>> = Mutex::new(HashMap::new());
|
static ref OPEN_STORES: Mutex<HashMap<PathBuf, Arc<Mutex<()>>>> = Mutex::new(HashMap::new());
|
||||||
@@ -216,24 +215,34 @@ impl Store {
|
|||||||
let patch_bin = serde_cbor::to_vec(&*patch)?;
|
let patch_bin = serde_cbor::to_vec(&*patch)?;
|
||||||
let persistent_undo = json_patch::patch(&mut self.persistent, &*patch)?;
|
let persistent_undo = json_patch::patch(&mut self.persistent, &*patch)?;
|
||||||
self.revision += 1;
|
self.revision += 1;
|
||||||
async {
|
if let Err(_e) = if self.revision % 4096 == 0 {
|
||||||
let file_len = self.file.metadata().await?.len();
|
self.compress().await
|
||||||
if file_len != self.file_cursor {
|
} else {
|
||||||
self.file.set_len(self.file_cursor).await?;
|
async {
|
||||||
self.file.seek(SeekFrom::Start(self.file_cursor)).await?;
|
let file_len = self.file.metadata().await?.len();
|
||||||
|
if file_len != self.file_cursor {
|
||||||
|
self.file.set_len(self.file_cursor).await?;
|
||||||
|
self.file.seek(SeekFrom::Start(self.file_cursor)).await?;
|
||||||
|
}
|
||||||
|
self.file.write_all(&patch_bin).await?;
|
||||||
|
self.file.flush().await?;
|
||||||
|
self.file.sync_all().await?;
|
||||||
|
self.file_cursor = self.file.stream_position().await?;
|
||||||
|
Ok::<_, Error>(())
|
||||||
}
|
}
|
||||||
self.file.write_all(&patch_bin).await?;
|
.await
|
||||||
self.file.flush().await?;
|
} {
|
||||||
self.file.sync_all().await?;
|
#[cfg(feature = "tracing")]
|
||||||
self.file_cursor = self.file.stream_position().await?;
|
tracing::error!("Error saving patch to disk: {}, attempting to compress", _e);
|
||||||
Ok::<_, Error>(())
|
if let Err(e) = self.compress().await {
|
||||||
}
|
#[cfg(feature = "tracing")]
|
||||||
.await
|
tracing::error!("Compression failed: {}", e);
|
||||||
.map_err(|e| {
|
persistent_undo.apply(&mut self.persistent);
|
||||||
persistent_undo.apply(&mut self.persistent);
|
self.revision -= 1;
|
||||||
self.revision -= 1;
|
return Err(e);
|
||||||
e
|
}
|
||||||
})?;
|
};
|
||||||
|
drop(persistent_undo);
|
||||||
|
|
||||||
let id = self.revision;
|
let id = self.revision;
|
||||||
let res = Arc::new(Revision { id, patch });
|
let res = Arc::new(Revision { id, patch });
|
||||||
|
|||||||
Reference in New Issue
Block a user