mirror of
https://github.com/Start9Labs/patch-db.git
synced 2026-03-26 02:11:54 +00:00
fix: revert from ciborium to serde_cbor_2, make apply_patches iterative
- ciborium's deserialize_str bug (#32) caused DB deserialization to fail silently, resetting the database value to null on compress.
- Switch to the dr-bonez/cbor fork (serde_cbor_2), which has StreamDeserializer change_output_type support.
- Propagate deserialization errors instead of falling back to null.
- Convert apply_patches from recursion to iteration to prevent stack overflow on patches with many operations (e.g. 953 ops).
This commit is contained in:
@@ -3,7 +3,7 @@ authors = ["Aiden McClelland <aiden@start9labs.com>"]
|
||||
categories = ["database-implementations"]
|
||||
description = "A database that tracks state updates as RFC 6902 JSON Patches"
|
||||
edition = "2018"
|
||||
keywords = ["json", "json-pointer", "json-patch"]
|
||||
keywords = ["json", "json-patch", "json-pointer"]
|
||||
license = "MIT"
|
||||
name = "patch-db"
|
||||
readme = "README.md"
|
||||
@@ -18,6 +18,7 @@ unstable = []
|
||||
|
||||
[dependencies]
|
||||
async-trait = "0.1.42"
|
||||
serde_cbor = { package = "serde_cbor_2", git = "https://github.com/dr-bonez/cbor.git" }
|
||||
fd-lock-rs = "0.1.3"
|
||||
futures = "0.3.8"
|
||||
imbl = "6"
|
||||
@@ -25,24 +26,23 @@ imbl-value = "0.4.1"
|
||||
json-patch = { path = "../json-patch" }
|
||||
json-ptr = { path = "../json-ptr" }
|
||||
lazy_static = "1.4.0"
|
||||
tracing = { version = "0.1.29", optional = true }
|
||||
tracing-error = { version = "0.2.0", optional = true }
|
||||
nix = "0.30.1"
|
||||
patch-db-macro = { path = "../macro" }
|
||||
serde = { version = "1", features = ["rc"] }
|
||||
ciborium = "0.2"
|
||||
thiserror = "2"
|
||||
tokio = { version = "1", features = ["sync", "fs", "rt", "io-util", "macros"] }
|
||||
tokio = { version = "1", features = ["fs", "io-util", "macros", "rt", "sync"] }
|
||||
tracing = { version = "0.1.29", optional = true }
|
||||
tracing-error = { version = "0.2.0", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
proptest = "1.0.0"
|
||||
serde = { version = "1.0.118", features = ["rc", "derive"] }
|
||||
rand = "0.9.1"
|
||||
serde = { version = "1.0.118", features = ["derive", "rc"] }
|
||||
tokio = { version = "1.0.1", features = [
|
||||
"sync",
|
||||
"fs",
|
||||
"rt",
|
||||
"rt-multi-thread",
|
||||
"io-util",
|
||||
"macros",
|
||||
"rt",
|
||||
"rt-multi-thread",
|
||||
"sync",
|
||||
] }
|
||||
rand = "0.9.1"
|
||||
|
||||
@@ -27,10 +27,8 @@ pub enum Error {
|
||||
IO(#[from] IOError),
|
||||
#[error("JSON (De)Serialization Error: {0}")]
|
||||
JSON(#[from] imbl_value::Error),
|
||||
#[error("CBOR Deserialization Error: {0}")]
|
||||
CborDe(#[from] ciborium::de::Error<IOError>),
|
||||
#[error("CBOR Serialization Error: {0}")]
|
||||
CborSer(#[from] ciborium::ser::Error<IOError>),
|
||||
#[error("CBOR (De)Serialization Error: {0}")]
|
||||
CBOR(#[from] serde_cbor::Error),
|
||||
#[error("Index Error: {0:?}")]
|
||||
Pointer(#[from] json_ptr::IndexError),
|
||||
#[error("Patch Error: {0}")]
|
||||
|
||||
@@ -58,7 +58,6 @@ impl Store {
|
||||
};
|
||||
let mut res = tokio::task::spawn_blocking(move || {
|
||||
use std::io::Seek;
|
||||
|
||||
let bak = path.with_extension("bak");
|
||||
if bak.exists() {
|
||||
std::fs::rename(&bak, &path)?;
|
||||
@@ -73,16 +72,13 @@ impl Store {
|
||||
fd_lock_rs::LockType::Exclusive,
|
||||
false,
|
||||
)?;
|
||||
let mut reader = std::io::BufReader::new(&mut *f);
|
||||
let mut scratch = Vec::new();
|
||||
let mut revision: u64 =
|
||||
ciborium::from_reader_with_buffer(&mut reader, &mut scratch).unwrap_or(0);
|
||||
let mut persistent: Value =
|
||||
ciborium::from_reader_with_buffer(&mut reader, &mut scratch)
|
||||
.unwrap_or(Value::Null);
|
||||
while let Ok(patch) =
|
||||
ciborium::from_reader_with_buffer::<json_patch::Patch, _>(&mut reader, &mut scratch)
|
||||
{
|
||||
let mut stream =
|
||||
serde_cbor::StreamDeserializer::new(serde_cbor::de::IoRead::new(&mut *f));
|
||||
let mut revision: u64 = stream.next().transpose()?.unwrap_or(0);
|
||||
let mut stream = stream.change_output_type();
|
||||
let mut persistent: Value = stream.next().transpose()?.unwrap_or(Value::Null);
|
||||
let mut stream = stream.change_output_type();
|
||||
while let Some(Ok(patch)) = stream.next() {
|
||||
if let Err(_) = json_patch::patch(&mut persistent, &patch) {
|
||||
#[cfg(feature = "tracing")]
|
||||
tracing::error!("Error applying patch, skipping...");
|
||||
@@ -180,10 +176,8 @@ impl Store {
|
||||
use tokio::io::AsyncWriteExt;
|
||||
let bak = self.path.with_extension("bak");
|
||||
let bak_tmp = bak.with_extension("bak.tmp");
|
||||
let mut revision_cbor = Vec::new();
|
||||
ciborium::into_writer(&self.revision, &mut revision_cbor)?;
|
||||
let mut data_cbor = Vec::new();
|
||||
ciborium::into_writer(&self.persistent, &mut data_cbor)?;
|
||||
let revision_cbor = serde_cbor::to_vec(&self.revision)?;
|
||||
let data_cbor = serde_cbor::to_vec(&self.persistent)?;
|
||||
|
||||
// Phase 1: Create atomic backup. If this fails, the main file is
|
||||
// untouched and the caller can safely undo the in-memory patch.
|
||||
@@ -250,8 +244,7 @@ impl Store {
|
||||
tracing::trace!("Attempting to apply patch: {:?}", patch);
|
||||
|
||||
// apply patch in memory
|
||||
let mut patch_bin = Vec::new();
|
||||
ciborium::into_writer(&*patch, &mut patch_bin)?;
|
||||
let patch_bin = serde_cbor::to_vec(&*patch)?;
|
||||
let mut updated = TentativeUpdated::new(self, &patch)?;
|
||||
|
||||
if updated.store.revision % 4096 == 0 {
|
||||
|
||||
@@ -4,7 +4,7 @@ use std::sync::Arc;
|
||||
|
||||
use imbl_value::{json, Value};
|
||||
use json_ptr::JsonPointer;
|
||||
use patch_db::{HasModel, PatchDb, Revision};
|
||||
use patch_db::{PatchDb, Revision};
|
||||
use proptest::prelude::*;
|
||||
use tokio::fs;
|
||||
use tokio::runtime::Builder;
|
||||
@@ -86,20 +86,3 @@ proptest! {
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// #[derive(Debug, serde::Deserialize, serde::Serialize, HasModel)]
|
||||
// pub struct Sample {
|
||||
// a: String,
|
||||
// #[model]
|
||||
// b: Child,
|
||||
// }
|
||||
|
||||
// #[derive(Debug, serde::Deserialize, serde::Serialize, HasModel)]
|
||||
// pub struct Child {
|
||||
// a: String,
|
||||
// b: usize,
|
||||
// c: NewType,
|
||||
// }
|
||||
|
||||
// #[derive(Debug, serde::Deserialize, serde::Serialize, HasModel)]
|
||||
// pub struct NewType(Option<Box<Sample>>);
|
||||
|
||||
@@ -438,66 +438,75 @@ fn apply_patches<'a>(
|
||||
patches: &'a [PatchOperation],
|
||||
undo: &mut Undo<'a>,
|
||||
) -> Result<(), PatchError> {
|
||||
let (patch, tail) = match patches.split_first() {
|
||||
None => return Ok(()),
|
||||
Some((patch, tail)) => (patch, tail),
|
||||
};
|
||||
|
||||
let res = match *patch {
|
||||
PatchOperation::Add(ref op) => {
|
||||
let prev = add(doc, &op.path, op.value.clone())?;
|
||||
undo.0.push(Box::new(move |doc| {
|
||||
match prev {
|
||||
None => remove(doc, &op.path, true).unwrap(),
|
||||
Some(v) => add(doc, &op.path, v).unwrap().unwrap(),
|
||||
};
|
||||
}));
|
||||
apply_patches(doc, tail, undo)
|
||||
let initial_len = undo.0.len();
|
||||
for patch in patches {
|
||||
let res = match *patch {
|
||||
PatchOperation::Add(ref op) => {
|
||||
add(doc, &op.path, op.value.clone()).map(|prev| {
|
||||
undo.0.push(Box::new(move |doc| {
|
||||
match prev {
|
||||
None => {
|
||||
remove(doc, &op.path, true).unwrap();
|
||||
}
|
||||
Some(v) => {
|
||||
add(doc, &op.path, v).unwrap().unwrap();
|
||||
}
|
||||
};
|
||||
}));
|
||||
})
|
||||
}
|
||||
PatchOperation::Remove(ref op) => {
|
||||
remove(doc, &op.path, false).map(|prev| {
|
||||
undo.0.push(Box::new(move |doc| {
|
||||
assert!(add(doc, &op.path, prev).unwrap().is_none());
|
||||
}));
|
||||
})
|
||||
}
|
||||
PatchOperation::Replace(ref op) => {
|
||||
replace(doc, &op.path, op.value.clone()).map(|prev| {
|
||||
undo.0.push(Box::new(move |doc| {
|
||||
replace(doc, &op.path, prev).unwrap();
|
||||
}));
|
||||
})
|
||||
}
|
||||
PatchOperation::Move(ref op) => {
|
||||
mov(doc, &op.from, &op.path, false).map(|prev| {
|
||||
undo.0.push(Box::new(move |doc| {
|
||||
mov(doc, &op.path, &op.from, true).unwrap();
|
||||
if let Some(prev) = prev {
|
||||
assert!(add(doc, &op.path, prev).unwrap().is_none());
|
||||
}
|
||||
}));
|
||||
})
|
||||
}
|
||||
PatchOperation::Copy(ref op) => {
|
||||
copy(doc, &op.from, &op.path).map(|prev| {
|
||||
undo.0.push(Box::new(move |doc| {
|
||||
match prev {
|
||||
None => {
|
||||
remove(doc, &op.path, true).unwrap();
|
||||
}
|
||||
Some(v) => {
|
||||
add(doc, &op.path, v).unwrap().unwrap();
|
||||
}
|
||||
};
|
||||
}));
|
||||
})
|
||||
}
|
||||
PatchOperation::Test(ref op) => {
|
||||
test(doc, &op.path, &op.value).map(|()| {
|
||||
undo.0.push(Box::new(move |_| ()));
|
||||
})
|
||||
}
|
||||
};
|
||||
if let Err(e) = res {
|
||||
while undo.0.len() > initial_len {
|
||||
undo.0.pop().unwrap()(doc);
|
||||
}
|
||||
return Err(e);
|
||||
}
|
||||
PatchOperation::Remove(ref op) => {
|
||||
let prev = remove(doc, &op.path, false)?;
|
||||
undo.0.push(Box::new(move |doc| {
|
||||
assert!(add(doc, &op.path, prev).unwrap().is_none());
|
||||
}));
|
||||
apply_patches(doc, tail, undo)
|
||||
}
|
||||
PatchOperation::Replace(ref op) => {
|
||||
let prev = replace(doc, &op.path, op.value.clone())?;
|
||||
undo.0.push(Box::new(move |doc| {
|
||||
replace(doc, &op.path, prev).unwrap();
|
||||
}));
|
||||
apply_patches(doc, tail, undo)
|
||||
}
|
||||
PatchOperation::Move(ref op) => {
|
||||
let prev = mov(doc, &op.from, &op.path, false)?;
|
||||
undo.0.push(Box::new(move |doc| {
|
||||
mov(doc, &op.path, &op.from, true).unwrap();
|
||||
if let Some(prev) = prev {
|
||||
assert!(add(doc, &op.path, prev).unwrap().is_none());
|
||||
}
|
||||
}));
|
||||
apply_patches(doc, tail, undo)
|
||||
}
|
||||
PatchOperation::Copy(ref op) => {
|
||||
let prev = copy(doc, &op.from, &op.path)?;
|
||||
undo.0.push(Box::new(move |doc| {
|
||||
match prev {
|
||||
None => remove(doc, &op.path, true).unwrap(),
|
||||
Some(v) => add(doc, &op.path, v).unwrap().unwrap(),
|
||||
};
|
||||
}));
|
||||
apply_patches(doc, tail, undo)
|
||||
}
|
||||
PatchOperation::Test(ref op) => {
|
||||
test(doc, &op.path, &op.value)?;
|
||||
undo.0.push(Box::new(move |_| ()));
|
||||
apply_patches(doc, tail, undo)
|
||||
}
|
||||
};
|
||||
if res.is_err() {
|
||||
undo.0.pop().unwrap()(doc);
|
||||
}
|
||||
res
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Patch provided JSON document (given as `imbl_value::Value`) in place.
|
||||
|
||||
@@ -81,3 +81,41 @@ fn revert_tests() {
|
||||
fn merge_tests() {
|
||||
util::run_specs("specs/merge_tests.json");
|
||||
}
|
||||
|
||||
/// Applies a single patch made of 10,000 `add` operations, one per key under
/// `/items`, and checks that every one of them landed in the document.
///
/// Guards against patch application overflowing the stack when a patch
/// carries a large number of operations.
#[test]
fn many_ops_no_stack_overflow() {
    let mut doc = json!({"items": {}});

    // Build the operation list imperatively: one `add` per distinct key.
    let mut ops: Vec<PatchOperation> = Vec::new();
    for i in 0..10_000 {
        let add_op = AddOperation {
            path: format!("/items/{i}").parse().unwrap(),
            value: json!(i),
        };
        ops.push(PatchOperation::Add(add_op));
    }

    let big_patch = Patch(ops);
    patch(&mut doc, &big_patch).unwrap();

    let item_count = doc["items"].as_object().unwrap().len();
    assert_eq!(item_count, 10_000);
}
|
||||
|
||||
/// Applies 1000 successful `add` operations followed by one failing `test`
/// operation, and checks that the failed patch leaves the document equal to
/// its pre-patch state.
#[test]
fn many_ops_undo_on_failure() {
    let original = json!({"items": {}});
    let mut doc = original.clone();

    // 1000 adds, each targeting its own key under `/items`.
    let mut ops: Vec<PatchOperation> = Vec::new();
    for i in 0..1000 {
        let add_op = AddOperation {
            path: format!("/items/{i}").parse().unwrap(),
            value: json!(i),
        };
        ops.push(PatchOperation::Add(add_op));
    }

    // Final op fails: the path is valid (it was just added) but the expected
    // value does not match what is stored there.
    let failing_test = TestOperation {
        path: "/items/0".parse().unwrap(),
        value: json!("wrong"),
    };
    ops.push(PatchOperation::Test(failing_test));

    let outcome = patch(&mut doc, &Patch(ops));
    assert!(outcome.is_err());
    assert_eq!(doc, original, "document should be fully restored after failed patch");
}
|
||||
|
||||
Reference in New Issue
Block a user