allow recovery from broken db

This commit is contained in:
Aiden McClelland
2022-09-27 14:51:31 -06:00
parent 50657d63e2
commit 391e86276d
2 changed files with 16 additions and 6 deletions

View File

@@ -7,6 +7,6 @@ edition = "2021"
[dependencies]
clap = "3.2.16"
patch-db = { path = "../patch-db" }
patch-db = { path = "../patch-db", features = ["debug"] }
serde_json = "1.0.85"
tokio = { version = "1.20.1", features = ["full"] }

View File

@@ -1,6 +1,6 @@
use std::collections::{BTreeSet, HashMap, VecDeque};
use std::fs::OpenOptions;
use std::io::SeekFrom;
use std::io::{SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
@@ -109,7 +109,18 @@ impl Store {
let mut persistent = stream.next().transpose()?.unwrap_or_else(|| Value::Null);
let mut stream = stream.change_output_type();
while let Some(Ok(patch)) = stream.next() {
json_patch::patch(&mut persistent, &patch)?;
if let Err(_) = json_patch::patch(&mut persistent, &patch) {
#[cfg(feature = "tracing")]
tracing::error!("Error applying patch, skipping...");
writeln!(
OpenOptions::new()
.create(true)
.append(true)
.open(path.with_extension("failed"))?,
"{}",
serde_json::to_string(&patch)?,
)?;
}
revision += 1;
}
let file_cursor = f.stream_position()?;
@@ -224,15 +235,14 @@ impl Store {
self.compress().await
} else {
async {
let file_len = self.file.metadata().await?.len();
if file_len != self.file_cursor {
if self.file.stream_position().await? != self.file_cursor {
self.file.set_len(self.file_cursor).await?;
self.file.seek(SeekFrom::Start(self.file_cursor)).await?;
}
self.file.write_all(&patch_bin).await?;
self.file.flush().await?;
self.file.sync_all().await?;
self.file_cursor = self.file.stream_position().await?;
self.file_cursor += patch_bin.len() as u64;
Ok::<_, Error>(())
}
.await