From 003cb1dcf2a59330f7363cba2a8e81109acef85d Mon Sep 17 00:00:00 2001 From: Aiden McClelland Date: Fri, 6 Mar 2026 16:32:33 -0700 Subject: [PATCH] fix: address PR review comments - patch.rs: convert Replace to Add when idx == onto_idx in Remove rebase - store.rs: use ciborium::from_reader_with_buffer to reuse scratch buffer - store.rs: compress takes &mut bool instead of returning bool - store.rs: move OPEN_STORES cleanup to Drop impl (std::sync::Mutex) - store.rs: PatchDb::close no longer errors on other references - store.rs: revert exists() to treat null as non-existent - remove @claude changelog comments from backend --- core/src/patch.rs | 16 ++++- core/src/store.rs | 129 +++++++++++++++++++++-------------------- core/src/subscriber.rs | 4 -- core/src/test.rs | 4 +- json-ptr/src/lib.rs | 5 -- 5 files changed, 82 insertions(+), 76 deletions(-) diff --git a/core/src/patch.rs b/core/src/patch.rs index b6286c4..6c2b0c9 100644 --- a/core/src/patch.rs +++ b/core/src/patch.rs @@ -151,9 +151,6 @@ impl DiffPatch { .get_segment(arr_path_idx) .and_then(|seg| seg.parse::().ok()) { - // @claude fix #4: Was `idx >= onto_idx`, which caused - // `idx - 1` to underflow when both were 0 (panic in - // debug, wraps to usize::MAX in release). if idx > onto_idx { let mut new_path = prefix.clone().to_owned(); new_path.push_end_idx(idx - 1); @@ -161,6 +158,19 @@ impl DiffPatch { new_path.append(&tail); } *path = new_path; + } else if idx == onto_idx { + let new_op = match &*op { + PatchOperation::Replace(r) => { + Some(PatchOperation::Add(AddOperation { + path: r.path.clone(), + value: r.value.clone(), + })) + } + _ => None, + }; + if let Some(new_op) = new_op { + *op = new_op; + } } } } diff --git a/core/src/store.rs b/core/src/store.rs index 58ece9e..db0c947 100644 --- a/core/src/store.rs +++ b/core/src/store.rs @@ -8,7 +8,8 @@ use std::sync::Arc; use fd_lock_rs::FdLock; use futures::{Future, FutureExt}; -use imbl_value::{InternedString, Value}; +use imbl::Vector; +use imbl_value::{InOMap, InternedString, Value}; use json_patch::PatchError; use json_ptr::{JsonPointer, SegList, ROOT}; use lazy_static::lazy_static; @@ -23,7 +24,8 @@ use crate::subscriber::Broadcast; use crate::{DbWatch, Error, HasModel, Subscriber}; lazy_static! { - static ref OPEN_STORES: Mutex>>> = Mutex::new(HashMap::new()); + static ref OPEN_STORES: std::sync::Mutex>>> = + std::sync::Mutex::new(HashMap::new()); } pub struct Store { @@ -42,7 +44,7 @@ impl Store { tokio::fs::File::create(path.as_ref()).await?; } let path = tokio::fs::canonicalize(path).await?; - let mut lock = OPEN_STORES.lock().await; + let mut lock = OPEN_STORES.lock().unwrap(); ( if let Some(open) = lock.get(&path) { open.clone().try_lock_owned()? @@ -72,12 +74,14 @@ impl Store { false, )?; let mut reader = std::io::BufReader::new(&mut *f); + let mut scratch = Vec::new(); let mut revision: u64 = - ciborium::from_reader(&mut reader).unwrap_or(0); + ciborium::from_reader_with_buffer(&mut reader, &mut scratch).unwrap_or(0); let mut persistent: Value = - ciborium::from_reader(&mut reader).unwrap_or(Value::Null); + ciborium::from_reader_with_buffer(&mut reader, &mut scratch) + .unwrap_or(Value::Null); while let Ok(patch) = - ciborium::from_reader::(&mut reader) + ciborium::from_reader_with_buffer::(&mut reader, &mut scratch) { if let Err(_) = json_patch::patch(&mut persistent, &patch) { #[cfg(feature = "tracing")] @@ -106,7 +110,8 @@ impl Store { }) }) .await??; - res.compress().await.map(|_| ())?; + let mut _committed = false; + res.compress(&mut _committed).await?; Ok(res) } pub async fn close(mut self) -> Result<(), Error> { @@ -114,20 +119,11 @@ impl Store { self.file.flush().await?; self.file.shutdown().await?; - self.file.unlock(true).map_err(|e| e.1)?; - - // @claude fix #15: OPEN_STORES never removed entries, causing unbounded - // growth over the lifetime of a process. Now cleaned up on close(). - let mut lock = OPEN_STORES.lock().await; - lock.remove(&self.path); Ok(()) } - // @claude fix #18: Previously compared against Value::Null, which conflated - // an explicit JSON null with a missing key. Now uses .is_some() so that a - // key with null value is correctly reported as existing. pub(crate) fn exists, V: SegList>(&self, ptr: &JsonPointer) -> bool { - ptr.get(&self.persistent).is_some() + ptr.get(&self.persistent).unwrap_or(&Value::Null) != &Value::Null } pub(crate) fn keys, V: SegList>( &self, @@ -177,19 +173,10 @@ impl Store { } /// Compresses the database file by writing a fresh snapshot. /// - /// Returns `true` if the backup was committed (point of no return — the new - /// state will be recovered on restart regardless of main file state). - /// Returns `false` if the backup was never committed (safe to undo in memory). - /// - // @claude fix #2 + #10: Rewrote compress with three explicit phases: - // 1. Atomic backup via tmp+rename (safe to undo before this point) - // 2. Main file rewrite (backup ensures crash recovery; undo is unsafe) - // 3. Backup removal is non-fatal (#10) — a leftover backup is harmlessly - // replayed on restart. Previously, remove_file failure propagated an error - // that caused Store::open to rename the stale backup over the good file. - // Return type changed from Result<(), Error> to Result so the - // caller (TentativeUpdated in apply()) knows whether undo is safe (#2). - pub(crate) async fn compress(&mut self) -> Result { + /// Sets `*committed = true` once the atomic backup rename succeeds (point + /// of no return — the new state will be recovered on restart regardless of + /// main file state). Before that point, the caller can safely undo. + pub(crate) async fn compress(&mut self, committed: &mut bool) -> Result<(), Error> { use tokio::io::AsyncWriteExt; let bak = self.path.with_extension("bak"); let bak_tmp = bak.with_extension("bak.tmp"); @@ -206,6 +193,7 @@ impl Store { backup_file.flush().await?; backup_file.sync_all().await?; tokio::fs::rename(&bak_tmp, &bak).await?; + *committed = true; // Point of no return: the backup exists with the new state. On restart, // Store::open will rename it over the main file. From here, errors @@ -225,7 +213,7 @@ impl Store { // matches the main file) will be harmlessly applied. let _ = tokio::fs::remove_file(&bak).await; - Ok(true) + Ok(()) } pub(crate) async fn apply(&mut self, patch: DiffPatch) -> Result>, Error> { use tokio::io::AsyncWriteExt; @@ -267,23 +255,12 @@ impl Store { let mut updated = TentativeUpdated::new(self, &patch)?; if updated.store.revision % 4096 == 0 { - match updated.store.compress().await { - Ok(_) => { - // Compress succeeded; disarm undo (done below). - } - Err(e) => { - // @claude fix #2: If compress() succeeded past the atomic - // backup rename, the new state will be recovered on restart. - // Rolling back in-memory would permanently desync memory vs - // disk. Check for backup existence to decide whether undo - // is safe. - let bak = updated.store.path.with_extension("bak"); - if bak.exists() { - updated.undo.take(); // disarm: can't undo past the backup - } - return Err(e); - } + let mut committed = false; + let res = updated.store.compress(&mut committed).await; + if committed { + updated.undo.take(); } + res?; } else { if updated.store.file.stream_position().await? != updated.store.file_cursor { updated @@ -313,6 +290,45 @@ impl Store { } } +// Iterative drop to prevent stack overflow when dropping deeply nested Value trees. +// Without this, the compiler-generated recursive Drop can exhaust the call stack. +impl Drop for Store { + fn drop(&mut self) { + if let Ok(mut lock) = OPEN_STORES.lock() { + lock.remove(&self.path); + } + + // Only Array and Object can cause deep recursion + match &self.persistent { + Value::Array(vec) if !vec.is_empty() => {} + Value::Object(map) if !map.is_empty() => {} + _ => return, + } + + fn take_children(value: &mut Value, stack: &mut Vec) { + match value { + Value::Array(vec) => { + stack.extend(std::mem::take(vec)); + } + Value::Object(map) => { + let vec: Vector<(InternedString, Value)> = + std::mem::replace(map, InOMap::new()).into(); + stack.extend(vec.into_iter().map(|(_, v)| v)); + } + _ => {} + } + } + + let mut stack = Vec::new(); + take_children(&mut self.persistent, &mut stack); + while let Some(mut value) = stack.pop() { + take_children(&mut value, &mut stack); + // `value` drops here with an empty container, + // so the recursive Drop::drop returns immediately + } + } +} + #[must_use] pub struct MutateResult { pub result: Result, @@ -372,16 +388,10 @@ impl PatchDb { store: Arc::new(RwLock::new(Store::open(path).await?)), }) } - pub async fn close(self) -> Result<(), Error> { - let store = Arc::try_unwrap(self.store) - .map_err(|_| { - Error::IO(std::io::Error::new( - std::io::ErrorKind::WouldBlock, - "other PatchDb references still exist", - )) - })? - .into_inner(); - store.close().await + pub async fn close(self) { + if let Ok(store) = Arc::try_unwrap(self.store) { + let _ = store.into_inner().close().await; + } } pub async fn dump, V: SegList>(&self, ptr: &JsonPointer) -> Dump { self.store.read().await.dump(ptr) @@ -454,11 +464,6 @@ impl PatchDb { .await .into() } - // @claude fix #1: Previously, `old` was read once before the loop and never - // refreshed. If another writer modified store.persistent between the initial - // read and the write-lock acquisition, the `old == store.persistent` check - // failed forever — spinning the loop infinitely. Now `old` is re-read from - // the store at the start of each iteration. pub async fn run_idempotent(&self, f: F) -> Result<(Value, T), E> where F: Fn(Value) -> Fut + Send + Sync + UnwindSafe, diff --git a/core/src/subscriber.rs b/core/src/subscriber.rs index 6e61427..df89d0e 100644 --- a/core/src/subscriber.rs +++ b/core/src/subscriber.rs @@ -93,10 +93,6 @@ impl DbWatch { self.seen = true; Ok(self.state.clone()) } - // @claude fix #9: Previously applied only one revision per poll, emitting - // intermediate states that may never have been a consistent committed state. - // Now drains all queued revisions after the first wake, matching sync() - // behavior so the caller always sees a fully caught-up snapshot. pub fn poll_changed(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { if !self.seen { self.seen = true; diff --git a/core/src/test.rs b/core/src/test.rs index 99db984..a909c55 100644 --- a/core/src/test.rs +++ b/core/src/test.rs @@ -62,7 +62,7 @@ async fn basic() { db.put(&ptr, "hello").await.unwrap(); get_res = db.get(&ptr).await.unwrap(); assert_eq!(get_res.as_str(), Some("hello")); - db.close().await.unwrap(); + db.close().await; cleanup_db(&path).await; } @@ -81,7 +81,7 @@ proptest! { let path = unique_db_path("proptest"); let db = init_db(path.clone()).await; put_string_into_root(&db, s).await; - db.close().await.unwrap(); + db.close().await; cleanup_db(&path).await; }); } diff --git a/json-ptr/src/lib.rs b/json-ptr/src/lib.rs index c50513e..e8c0d55 100644 --- a/json-ptr/src/lib.rs +++ b/json-ptr/src/lib.rs @@ -634,11 +634,6 @@ impl JsonPointer { PtrSegment::Unescaped(1..prefix_len) }); } - // @claude fix #3: Previously just inserted the new segment without adjusting - // existing segment ranges (unlike push_start which did shift them). All - // existing segments' byte ranges became wrong, causing corrupted pointer - // lookups or panics. Now shifts ranges forward by prefix_len, matching - // push_start behavior. pub fn push_start_idx(&mut self, segment: usize) { let mut src = format!("/{}", segment); let prefix_len = src.len();