diff --git a/patch-db-util/Cargo.toml b/patch-db-util/Cargo.toml index ab3f286..3e67361 100644 --- a/patch-db-util/Cargo.toml +++ b/patch-db-util/Cargo.toml @@ -7,6 +7,6 @@ edition = "2021" [dependencies] clap = "3.2.16" -patch-db = { path = "../patch-db" } +patch-db = { path = "../patch-db", features = ["debug"] } serde_json = "1.0.85" tokio = { version = "1.20.1", features = ["full"] } diff --git a/patch-db/src/store.rs b/patch-db/src/store.rs index 2e5c487..db0d341 100644 --- a/patch-db/src/store.rs +++ b/patch-db/src/store.rs @@ -1,6 +1,6 @@ use std::collections::{BTreeSet, HashMap, VecDeque}; use std::fs::OpenOptions; -use std::io::SeekFrom; +use std::io::{SeekFrom, Write}; use std::path::{Path, PathBuf}; use std::sync::atomic::AtomicU64; use std::sync::Arc; @@ -109,7 +109,18 @@ impl Store { let mut persistent = stream.next().transpose()?.unwrap_or_else(|| Value::Null); let mut stream = stream.change_output_type(); while let Some(Ok(patch)) = stream.next() { - json_patch::patch(&mut persistent, &patch)?; + if let Err(_) = json_patch::patch(&mut persistent, &patch) { + #[cfg(feature = "tracing")] + tracing::error!("Error applying patch, skipping..."); + writeln!( + OpenOptions::new() + .create(true) + .append(true) + .open(path.with_extension("failed"))?, + "{}", + serde_json::to_string(&patch)?, + )?; + } revision += 1; } let file_cursor = f.stream_position()?; @@ -224,15 +235,14 @@ impl Store { self.compress().await } else { async { - let file_len = self.file.metadata().await?.len(); - if file_len != self.file_cursor { + if self.file.stream_position().await? != self.file_cursor { self.file.set_len(self.file_cursor).await?; self.file.seek(SeekFrom::Start(self.file_cursor)).await?; } self.file.write_all(&patch_bin).await?; self.file.flush().await?; self.file.sync_all().await?; - self.file_cursor = self.file.stream_position().await?; + self.file_cursor += patch_bin.len() as u64; Ok::<_, Error>(()) } .await