allow recovery from broken db (#50)

This commit is contained in:
Aiden McClelland
2022-09-27 14:56:26 -06:00
committed by GitHub
parent 50657d63e2
commit 00564ca1ca
2 changed files with 16 additions and 6 deletions

View File

@@ -7,6 +7,6 @@ edition = "2021"
[dependencies] [dependencies]
clap = "3.2.16" clap = "3.2.16"
patch-db = { path = "../patch-db" } patch-db = { path = "../patch-db", features = ["debug"] }
serde_json = "1.0.85" serde_json = "1.0.85"
tokio = { version = "1.20.1", features = ["full"] } tokio = { version = "1.20.1", features = ["full"] }

View File

@@ -1,6 +1,6 @@
use std::collections::{BTreeSet, HashMap, VecDeque}; use std::collections::{BTreeSet, HashMap, VecDeque};
use std::fs::OpenOptions; use std::fs::OpenOptions;
use std::io::SeekFrom; use std::io::{SeekFrom, Write};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicU64; use std::sync::atomic::AtomicU64;
use std::sync::Arc; use std::sync::Arc;
@@ -109,7 +109,18 @@ impl Store {
let mut persistent = stream.next().transpose()?.unwrap_or_else(|| Value::Null); let mut persistent = stream.next().transpose()?.unwrap_or_else(|| Value::Null);
let mut stream = stream.change_output_type(); let mut stream = stream.change_output_type();
while let Some(Ok(patch)) = stream.next() { while let Some(Ok(patch)) = stream.next() {
json_patch::patch(&mut persistent, &patch)?; if let Err(_) = json_patch::patch(&mut persistent, &patch) {
#[cfg(feature = "tracing")]
tracing::error!("Error applying patch, skipping...");
writeln!(
OpenOptions::new()
.create(true)
.append(true)
.open(path.with_extension("failed"))?,
"{}",
serde_json::to_string(&patch)?,
)?;
}
revision += 1; revision += 1;
} }
let file_cursor = f.stream_position()?; let file_cursor = f.stream_position()?;
@@ -224,15 +235,14 @@ impl Store {
self.compress().await self.compress().await
} else { } else {
async { async {
let file_len = self.file.metadata().await?.len(); if self.file.stream_position().await? != self.file_cursor {
if file_len != self.file_cursor {
self.file.set_len(self.file_cursor).await?; self.file.set_len(self.file_cursor).await?;
self.file.seek(SeekFrom::Start(self.file_cursor)).await?; self.file.seek(SeekFrom::Start(self.file_cursor)).await?;
} }
self.file.write_all(&patch_bin).await?; self.file.write_all(&patch_bin).await?;
self.file.flush().await?; self.file.flush().await?;
self.file.sync_all().await?; self.file.sync_all().await?;
self.file_cursor = self.file.stream_position().await?; self.file_cursor += patch_bin.len() as u64;
Ok::<_, Error>(()) Ok::<_, Error>(())
} }
.await .await