From 10413414dc91828db5344e0245960be431a9cb52 Mon Sep 17 00:00:00 2001 From: Aiden McClelland Date: Mon, 9 Mar 2026 23:42:14 -0600 Subject: [PATCH] fix: revert from ciborium to serde_cbor_2, make apply_patches iterative - ciborium's deserialize_str bug (#32) caused DB deserialization to fail silently, nuking the database value to null on compress - Switch to dr-bonez/cbor fork (serde_cbor_2) which has StreamDeserializer change_output_type support - Propagate deserialization errors instead of falling back to null - Convert apply_patches from recursion to iteration to prevent stack overflow on patches with many operations (e.g. 953 ops) --- core/Cargo.toml | 20 +++--- core/src/lib.rs | 6 +- core/src/store.rs | 27 +++----- core/src/test.rs | 19 +----- json-patch/src/lib.rs | 125 +++++++++++++++++++----------------- json-patch/src/tests/mod.rs | 38 +++++++++++ 6 files changed, 128 insertions(+), 107 deletions(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index b3a8521..60c4ef2 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -3,7 +3,7 @@ authors = ["Aiden McClelland "] categories = ["database-implementations"] description = "A database that tracks state updates as RFC 6902 JSON Patches" edition = "2018" -keywords = ["json", "json-pointer", "json-patch"] +keywords = ["json", "json-patch", "json-pointer"] license = "MIT" name = "patch-db" readme = "README.md" @@ -18,6 +18,7 @@ unstable = [] [dependencies] async-trait = "0.1.42" +serde_cbor = { package = "serde_cbor_2", git = "https://github.com/dr-bonez/cbor.git" } fd-lock-rs = "0.1.3" futures = "0.3.8" imbl = "6" @@ -25,24 +26,23 @@ imbl-value = "0.4.1" json-patch = { path = "../json-patch" } json-ptr = { path = "../json-ptr" } lazy_static = "1.4.0" -tracing = { version = "0.1.29", optional = true } -tracing-error = { version = "0.2.0", optional = true } nix = "0.30.1" patch-db-macro = { path = "../macro" } serde = { version = "1", features = ["rc"] } -ciborium = "0.2" thiserror = "2" -tokio = { version = "1", features = ["sync", "fs", "rt", "io-util", "macros"] } +tokio = { version = "1", features = ["fs", "io-util", "macros", "rt", "sync"] } +tracing = { version = "0.1.29", optional = true } +tracing-error = { version = "0.2.0", optional = true } [dev-dependencies] proptest = "1.0.0" -serde = { version = "1.0.118", features = ["rc", "derive"] } +rand = "0.9.1" +serde = { version = "1.0.118", features = ["derive", "rc"] } tokio = { version = "1.0.1", features = [ - "sync", "fs", - "rt", - "rt-multi-thread", "io-util", "macros", + "rt", + "rt-multi-thread", + "sync", ] } -rand = "0.9.1" diff --git a/core/src/lib.rs b/core/src/lib.rs index 845831d..006973d 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -27,10 +27,8 @@ pub enum Error { IO(#[from] IOError), #[error("JSON (De)Serialization Error: {0}")] JSON(#[from] imbl_value::Error), - #[error("CBOR Deserialization Error: {0}")] - CborDe(#[from] ciborium::de::Error), - #[error("CBOR Serialization Error: {0}")] - CborSer(#[from] ciborium::ser::Error), + #[error("CBOR (De)Serialization Error: {0}")] + CBOR(#[from] serde_cbor::Error), #[error("Index Error: {0:?}")] Pointer(#[from] json_ptr::IndexError), #[error("Patch Error: {0}")] diff --git a/core/src/store.rs b/core/src/store.rs index db0c947..3eb4021 100644 --- a/core/src/store.rs +++ b/core/src/store.rs @@ -58,7 +58,6 @@ impl Store { }; let mut res = tokio::task::spawn_blocking(move || { use std::io::Seek; - let bak = path.with_extension("bak"); if bak.exists() { std::fs::rename(&bak, &path)?; @@ -73,16 +72,13 @@ impl Store { fd_lock_rs::LockType::Exclusive, false, )?; - let mut reader = std::io::BufReader::new(&mut *f); - let mut scratch = Vec::new(); - let mut revision: u64 = - ciborium::from_reader_with_buffer(&mut reader, &mut scratch).unwrap_or(0); - let mut persistent: Value = - ciborium::from_reader_with_buffer(&mut reader, &mut scratch) - .unwrap_or(Value::Null); - while let Ok(patch) = - ciborium::from_reader_with_buffer::(&mut reader, &mut scratch) - { + let mut stream = + serde_cbor::StreamDeserializer::new(serde_cbor::de::IoRead::new(&mut *f)); + let mut revision: u64 = stream.next().transpose()?.unwrap_or(0); + let mut stream = stream.change_output_type(); + let mut persistent: Value = stream.next().transpose()?.unwrap_or(Value::Null); + let mut stream = stream.change_output_type(); + while let Some(Ok(patch)) = stream.next() { if let Err(_) = json_patch::patch(&mut persistent, &patch) { #[cfg(feature = "tracing")] tracing::error!("Error applying patch, skipping..."); @@ -180,10 +176,8 @@ impl Store { use tokio::io::AsyncWriteExt; let bak = self.path.with_extension("bak"); let bak_tmp = bak.with_extension("bak.tmp"); - let mut revision_cbor = Vec::new(); - ciborium::into_writer(&self.revision, &mut revision_cbor)?; - let mut data_cbor = Vec::new(); - ciborium::into_writer(&self.persistent, &mut data_cbor)?; + let revision_cbor = serde_cbor::to_vec(&self.revision)?; + let data_cbor = serde_cbor::to_vec(&self.persistent)?; // Phase 1: Create atomic backup. If this fails, the main file is // untouched and the caller can safely undo the in-memory patch. @@ -250,8 +244,7 @@ impl Store { tracing::trace!("Attempting to apply patch: {:?}", patch); // apply patch in memory - let mut patch_bin = Vec::new(); - ciborium::into_writer(&*patch, &mut patch_bin)?; + let patch_bin = serde_cbor::to_vec(&*patch)?; let mut updated = TentativeUpdated::new(self, &patch)?; if updated.store.revision % 4096 == 0 { diff --git a/core/src/test.rs b/core/src/test.rs index a909c55..fcbccde 100644 --- a/core/src/test.rs +++ b/core/src/test.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use imbl_value::{json, Value}; use json_ptr::JsonPointer; -use patch_db::{HasModel, PatchDb, Revision}; +use patch_db::{PatchDb, Revision}; use proptest::prelude::*; use tokio::fs; use tokio::runtime::Builder; @@ -86,20 +86,3 @@ proptest! { }); } } - -// #[derive(Debug, serde::Deserialize, serde::Serialize, HasModel)] -// pub struct Sample { -// a: String, -// #[model] -// b: Child, -// } - -// #[derive(Debug, serde::Deserialize, serde::Serialize, HasModel)] -// pub struct Child { -// a: String, -// b: usize, -// c: NewType, -// } - -// #[derive(Debug, serde::Deserialize, serde::Serialize, HasModel)] -// pub struct NewType(Option>); diff --git a/json-patch/src/lib.rs b/json-patch/src/lib.rs index c6e15bd..f7949ba 100644 --- a/json-patch/src/lib.rs +++ b/json-patch/src/lib.rs @@ -438,66 +438,75 @@ fn apply_patches<'a>( patches: &'a [PatchOperation], undo: &mut Undo<'a>, ) -> Result<(), PatchError> { - let (patch, tail) = match patches.split_first() { - None => return Ok(()), - Some((patch, tail)) => (patch, tail), - }; - - let res = match *patch { - PatchOperation::Add(ref op) => { - let prev = add(doc, &op.path, op.value.clone())?; - undo.0.push(Box::new(move |doc| { - match prev { - None => remove(doc, &op.path, true).unwrap(), - Some(v) => add(doc, &op.path, v).unwrap().unwrap(), - }; - })); - apply_patches(doc, tail, undo) + let initial_len = undo.0.len(); + for patch in patches { + let res = match *patch { + PatchOperation::Add(ref op) => { + add(doc, &op.path, op.value.clone()).map(|prev| { + undo.0.push(Box::new(move |doc| { + match prev { + None => { + remove(doc, &op.path, true).unwrap(); + } + Some(v) => { + add(doc, &op.path, v).unwrap().unwrap(); + } + }; + })); + }) + } + PatchOperation::Remove(ref op) => { + remove(doc, &op.path, false).map(|prev| { + undo.0.push(Box::new(move |doc| { + assert!(add(doc, &op.path, prev).unwrap().is_none()); + })); + }) + } + PatchOperation::Replace(ref op) => { + replace(doc, &op.path, op.value.clone()).map(|prev| { + undo.0.push(Box::new(move |doc| { + replace(doc, &op.path, prev).unwrap(); + })); + }) + } + PatchOperation::Move(ref op) => { + mov(doc, &op.from, &op.path, false).map(|prev| { + undo.0.push(Box::new(move |doc| { + mov(doc, &op.path, &op.from, true).unwrap(); + if let Some(prev) = prev { + assert!(add(doc, &op.path, prev).unwrap().is_none()); + } + })); + }) + } + PatchOperation::Copy(ref op) => { + copy(doc, &op.from, &op.path).map(|prev| { + undo.0.push(Box::new(move |doc| { + match prev { + None => { + remove(doc, &op.path, true).unwrap(); + } + Some(v) => { + add(doc, &op.path, v).unwrap().unwrap(); + } + }; + })); + }) + } + PatchOperation::Test(ref op) => { + test(doc, &op.path, &op.value).map(|()| { + undo.0.push(Box::new(move |_| ())); + }) + } + }; + if let Err(e) = res { + while undo.0.len() > initial_len { + undo.0.pop().unwrap()(doc); + } + return Err(e); } - PatchOperation::Remove(ref op) => { - let prev = remove(doc, &op.path, false)?; - undo.0.push(Box::new(move |doc| { - assert!(add(doc, &op.path, prev).unwrap().is_none()); - })); - apply_patches(doc, tail, undo) - } - PatchOperation::Replace(ref op) => { - let prev = replace(doc, &op.path, op.value.clone())?; - undo.0.push(Box::new(move |doc| { - replace(doc, &op.path, prev).unwrap(); - })); - apply_patches(doc, tail, undo) - } - PatchOperation::Move(ref op) => { - let prev = mov(doc, &op.from, &op.path, false)?; - undo.0.push(Box::new(move |doc| { - mov(doc, &op.path, &op.from, true).unwrap(); - if let Some(prev) = prev { - assert!(add(doc, &op.path, prev).unwrap().is_none()); - } - })); - apply_patches(doc, tail, undo) - } - PatchOperation::Copy(ref op) => { - let prev = copy(doc, &op.from, &op.path)?; - undo.0.push(Box::new(move |doc| { - match prev { - None => remove(doc, &op.path, true).unwrap(), - Some(v) => add(doc, &op.path, v).unwrap().unwrap(), - }; - })); - apply_patches(doc, tail, undo) - } - PatchOperation::Test(ref op) => { - test(doc, &op.path, &op.value)?; - undo.0.push(Box::new(move |_| ())); - apply_patches(doc, tail, undo) - } - }; - if res.is_err() { - undo.0.pop().unwrap()(doc); } - res + Ok(()) } /// Patch provided JSON document (given as `imbl_value::Value`) in place. diff --git a/json-patch/src/tests/mod.rs b/json-patch/src/tests/mod.rs index 9489d3f..a2e9567 100644 --- a/json-patch/src/tests/mod.rs +++ b/json-patch/src/tests/mod.rs @@ -81,3 +81,41 @@ fn revert_tests() { fn merge_tests() { util::run_specs("specs/merge_tests.json"); } + +#[test] +fn many_ops_no_stack_overflow() { + let mut doc = json!({"items": {}}); + let ops: Vec = (0..10_000) + .map(|i| { + PatchOperation::Add(AddOperation { + path: format!("/items/{i}").parse().unwrap(), + value: json!(i), + }) + }) + .collect(); + let p = Patch(ops); + patch(&mut doc, &p).unwrap(); + assert_eq!(doc["items"].as_object().unwrap().len(), 10_000); +} + +#[test] +fn many_ops_undo_on_failure() { + let original = json!({"items": {}}); + let mut doc = original.clone(); + let mut ops: Vec = (0..1000) + .map(|i| { + PatchOperation::Add(AddOperation { + path: format!("/items/{i}").parse().unwrap(), + value: json!(i), + }) + }) + .collect(); + // Final op fails: test against a wrong value at a valid path + ops.push(PatchOperation::Test(TestOperation { + path: "/items/0".parse().unwrap(), + value: json!("wrong"), + })); + let p = Patch(ops); + assert!(patch(&mut doc, &p).is_err()); + assert_eq!(doc, original, "document should be fully restored after failed patch"); +}