mirror of
https://github.com/Start9Labs/patch-db.git
synced 2026-03-26 02:11:54 +00:00
Merge branch 'audit-fixes' of github.com:Start9Labs/patch-db into audit-fixes
This commit is contained in:
@@ -151,9 +151,6 @@ impl DiffPatch {
|
||||
.get_segment(arr_path_idx)
|
||||
.and_then(|seg| seg.parse::<usize>().ok())
|
||||
{
|
||||
// @claude fix #4: Was `idx >= onto_idx`, which caused
|
||||
// `idx - 1` to underflow when both were 0 (panic in
|
||||
// debug, wraps to usize::MAX in release).
|
||||
if idx > onto_idx {
|
||||
let mut new_path = prefix.clone().to_owned();
|
||||
new_path.push_end_idx(idx - 1);
|
||||
@@ -161,6 +158,19 @@ impl DiffPatch {
|
||||
new_path.append(&tail);
|
||||
}
|
||||
*path = new_path;
|
||||
} else if idx == onto_idx {
|
||||
let new_op = match &*op {
|
||||
PatchOperation::Replace(r) => {
|
||||
Some(PatchOperation::Add(AddOperation {
|
||||
path: r.path.clone(),
|
||||
value: r.value.clone(),
|
||||
}))
|
||||
}
|
||||
_ => None,
|
||||
};
|
||||
if let Some(new_op) = new_op {
|
||||
*op = new_op;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,7 +8,8 @@ use std::sync::Arc;
|
||||
|
||||
use fd_lock_rs::FdLock;
|
||||
use futures::{Future, FutureExt};
|
||||
use imbl_value::{InternedString, Value};
|
||||
use imbl::Vector;
|
||||
use imbl_value::{InOMap, InternedString, Value};
|
||||
use json_patch::PatchError;
|
||||
use json_ptr::{JsonPointer, SegList, ROOT};
|
||||
use lazy_static::lazy_static;
|
||||
@@ -23,7 +24,8 @@ use crate::subscriber::Broadcast;
|
||||
use crate::{DbWatch, Error, HasModel, Subscriber};
|
||||
|
||||
lazy_static! {
|
||||
static ref OPEN_STORES: Mutex<HashMap<PathBuf, Arc<Mutex<()>>>> = Mutex::new(HashMap::new());
|
||||
static ref OPEN_STORES: std::sync::Mutex<HashMap<PathBuf, Arc<Mutex<()>>>> =
|
||||
std::sync::Mutex::new(HashMap::new());
|
||||
}
|
||||
|
||||
pub struct Store {
|
||||
@@ -42,7 +44,7 @@ impl Store {
|
||||
tokio::fs::File::create(path.as_ref()).await?;
|
||||
}
|
||||
let path = tokio::fs::canonicalize(path).await?;
|
||||
let mut lock = OPEN_STORES.lock().await;
|
||||
let mut lock = OPEN_STORES.lock().unwrap();
|
||||
(
|
||||
if let Some(open) = lock.get(&path) {
|
||||
open.clone().try_lock_owned()?
|
||||
@@ -72,12 +74,14 @@ impl Store {
|
||||
false,
|
||||
)?;
|
||||
let mut reader = std::io::BufReader::new(&mut *f);
|
||||
let mut scratch = Vec::new();
|
||||
let mut revision: u64 =
|
||||
ciborium::from_reader(&mut reader).unwrap_or(0);
|
||||
ciborium::from_reader_with_buffer(&mut reader, &mut scratch).unwrap_or(0);
|
||||
let mut persistent: Value =
|
||||
ciborium::from_reader(&mut reader).unwrap_or(Value::Null);
|
||||
ciborium::from_reader_with_buffer(&mut reader, &mut scratch)
|
||||
.unwrap_or(Value::Null);
|
||||
while let Ok(patch) =
|
||||
ciborium::from_reader::<json_patch::Patch, _>(&mut reader)
|
||||
ciborium::from_reader_with_buffer::<json_patch::Patch, _>(&mut reader, &mut scratch)
|
||||
{
|
||||
if let Err(_) = json_patch::patch(&mut persistent, &patch) {
|
||||
#[cfg(feature = "tracing")]
|
||||
@@ -106,7 +110,8 @@ impl Store {
|
||||
})
|
||||
})
|
||||
.await??;
|
||||
res.compress().await.map(|_| ())?;
|
||||
let mut _committed = false;
|
||||
res.compress(&mut _committed).await?;
|
||||
Ok(res)
|
||||
}
|
||||
pub async fn close(mut self) -> Result<(), Error> {
|
||||
@@ -114,20 +119,11 @@ impl Store {
|
||||
|
||||
self.file.flush().await?;
|
||||
self.file.shutdown().await?;
|
||||
self.file.unlock(true).map_err(|e| e.1)?;
|
||||
|
||||
// @claude fix #15: OPEN_STORES never removed entries, causing unbounded
|
||||
// growth over the lifetime of a process. Now cleaned up on close().
|
||||
let mut lock = OPEN_STORES.lock().await;
|
||||
lock.remove(&self.path);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
// @claude fix #18: Previously compared against Value::Null, which conflated
|
||||
// an explicit JSON null with a missing key. Now uses .is_some() so that a
|
||||
// key with null value is correctly reported as existing.
|
||||
pub(crate) fn exists<S: AsRef<str>, V: SegList>(&self, ptr: &JsonPointer<S, V>) -> bool {
|
||||
ptr.get(&self.persistent).is_some()
|
||||
ptr.get(&self.persistent).unwrap_or(&Value::Null) != &Value::Null
|
||||
}
|
||||
pub(crate) fn keys<S: AsRef<str>, V: SegList>(
|
||||
&self,
|
||||
@@ -177,19 +173,10 @@ impl Store {
|
||||
}
|
||||
/// Compresses the database file by writing a fresh snapshot.
|
||||
///
|
||||
/// Returns `true` if the backup was committed (point of no return — the new
|
||||
/// state will be recovered on restart regardless of main file state).
|
||||
/// Returns `false` if the backup was never committed (safe to undo in memory).
|
||||
///
|
||||
// @claude fix #2 + #10: Rewrote compress with three explicit phases:
|
||||
// 1. Atomic backup via tmp+rename (safe to undo before this point)
|
||||
// 2. Main file rewrite (backup ensures crash recovery; undo is unsafe)
|
||||
// 3. Backup removal is non-fatal (#10) — a leftover backup is harmlessly
|
||||
// replayed on restart. Previously, remove_file failure propagated an error
|
||||
// that caused Store::open to rename the stale backup over the good file.
|
||||
// Return type changed from Result<(), Error> to Result<bool, Error> so the
|
||||
// caller (TentativeUpdated in apply()) knows whether undo is safe (#2).
|
||||
pub(crate) async fn compress(&mut self) -> Result<bool, Error> {
|
||||
/// Sets `*committed = true` once the atomic backup rename succeeds (point
|
||||
/// of no return — the new state will be recovered on restart regardless of
|
||||
/// main file state). Before that point, the caller can safely undo.
|
||||
pub(crate) async fn compress(&mut self, committed: &mut bool) -> Result<(), Error> {
|
||||
use tokio::io::AsyncWriteExt;
|
||||
let bak = self.path.with_extension("bak");
|
||||
let bak_tmp = bak.with_extension("bak.tmp");
|
||||
@@ -206,6 +193,7 @@ impl Store {
|
||||
backup_file.flush().await?;
|
||||
backup_file.sync_all().await?;
|
||||
tokio::fs::rename(&bak_tmp, &bak).await?;
|
||||
*committed = true;
|
||||
|
||||
// Point of no return: the backup exists with the new state. On restart,
|
||||
// Store::open will rename it over the main file. From here, errors
|
||||
@@ -225,7 +213,7 @@ impl Store {
|
||||
// matches the main file) will be harmlessly applied.
|
||||
let _ = tokio::fs::remove_file(&bak).await;
|
||||
|
||||
Ok(true)
|
||||
Ok(())
|
||||
}
|
||||
pub(crate) async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error> {
|
||||
use tokio::io::AsyncWriteExt;
|
||||
@@ -267,23 +255,12 @@ impl Store {
|
||||
let mut updated = TentativeUpdated::new(self, &patch)?;
|
||||
|
||||
if updated.store.revision % 4096 == 0 {
|
||||
match updated.store.compress().await {
|
||||
Ok(_) => {
|
||||
// Compress succeeded; disarm undo (done below).
|
||||
}
|
||||
Err(e) => {
|
||||
// @claude fix #2: If compress() succeeded past the atomic
|
||||
// backup rename, the new state will be recovered on restart.
|
||||
// Rolling back in-memory would permanently desync memory vs
|
||||
// disk. Check for backup existence to decide whether undo
|
||||
// is safe.
|
||||
let bak = updated.store.path.with_extension("bak");
|
||||
if bak.exists() {
|
||||
updated.undo.take(); // disarm: can't undo past the backup
|
||||
}
|
||||
return Err(e);
|
||||
}
|
||||
let mut committed = false;
|
||||
let res = updated.store.compress(&mut committed).await;
|
||||
if committed {
|
||||
updated.undo.take();
|
||||
}
|
||||
res?;
|
||||
} else {
|
||||
if updated.store.file.stream_position().await? != updated.store.file_cursor {
|
||||
updated
|
||||
@@ -313,6 +290,45 @@ impl Store {
|
||||
}
|
||||
}
|
||||
|
||||
// Iterative drop to prevent stack overflow when dropping deeply nested Value trees.
|
||||
// Without this, the compiler-generated recursive Drop can exhaust the call stack.
|
||||
impl Drop for Store {
|
||||
fn drop(&mut self) {
|
||||
if let Ok(mut lock) = OPEN_STORES.lock() {
|
||||
lock.remove(&self.path);
|
||||
}
|
||||
|
||||
// Only Array and Object can cause deep recursion
|
||||
match &self.persistent {
|
||||
Value::Array(vec) if !vec.is_empty() => {}
|
||||
Value::Object(map) if !map.is_empty() => {}
|
||||
_ => return,
|
||||
}
|
||||
|
||||
fn take_children(value: &mut Value, stack: &mut Vec<Value>) {
|
||||
match value {
|
||||
Value::Array(vec) => {
|
||||
stack.extend(std::mem::take(vec));
|
||||
}
|
||||
Value::Object(map) => {
|
||||
let vec: Vector<(InternedString, Value)> =
|
||||
std::mem::replace(map, InOMap::new()).into();
|
||||
stack.extend(vec.into_iter().map(|(_, v)| v));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
let mut stack = Vec::new();
|
||||
take_children(&mut self.persistent, &mut stack);
|
||||
while let Some(mut value) = stack.pop() {
|
||||
take_children(&mut value, &mut stack);
|
||||
// `value` drops here with an empty container,
|
||||
// so the recursive Drop::drop returns immediately
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub struct MutateResult<T, E> {
|
||||
pub result: Result<T, E>,
|
||||
@@ -372,16 +388,10 @@ impl PatchDb {
|
||||
store: Arc::new(RwLock::new(Store::open(path).await?)),
|
||||
})
|
||||
}
|
||||
pub async fn close(self) -> Result<(), Error> {
|
||||
let store = Arc::try_unwrap(self.store)
|
||||
.map_err(|_| {
|
||||
Error::IO(std::io::Error::new(
|
||||
std::io::ErrorKind::WouldBlock,
|
||||
"other PatchDb references still exist",
|
||||
))
|
||||
})?
|
||||
.into_inner();
|
||||
store.close().await
|
||||
pub async fn close(self) {
|
||||
if let Ok(store) = Arc::try_unwrap(self.store) {
|
||||
let _ = store.into_inner().close().await;
|
||||
}
|
||||
}
|
||||
pub async fn dump<S: AsRef<str>, V: SegList>(&self, ptr: &JsonPointer<S, V>) -> Dump {
|
||||
self.store.read().await.dump(ptr)
|
||||
@@ -454,11 +464,6 @@ impl PatchDb {
|
||||
.await
|
||||
.into()
|
||||
}
|
||||
// @claude fix #1: Previously, `old` was read once before the loop and never
|
||||
// refreshed. If another writer modified store.persistent between the initial
|
||||
// read and the write-lock acquisition, the `old == store.persistent` check
|
||||
// failed forever — spinning the loop infinitely. Now `old` is re-read from
|
||||
// the store at the start of each iteration.
|
||||
pub async fn run_idempotent<F, Fut, T, E>(&self, f: F) -> Result<(Value, T), E>
|
||||
where
|
||||
F: Fn(Value) -> Fut + Send + Sync + UnwindSafe,
|
||||
|
||||
@@ -93,10 +93,6 @@ impl DbWatch {
|
||||
self.seen = true;
|
||||
Ok(self.state.clone())
|
||||
}
|
||||
// @claude fix #9: Previously applied only one revision per poll, emitting
|
||||
// intermediate states that may never have been a consistent committed state.
|
||||
// Now drains all queued revisions after the first wake, matching sync()
|
||||
// behavior so the caller always sees a fully caught-up snapshot.
|
||||
pub fn poll_changed(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Error>> {
|
||||
if !self.seen {
|
||||
self.seen = true;
|
||||
|
||||
@@ -62,7 +62,7 @@ async fn basic() {
|
||||
db.put(&ptr, "hello").await.unwrap();
|
||||
get_res = db.get(&ptr).await.unwrap();
|
||||
assert_eq!(get_res.as_str(), Some("hello"));
|
||||
db.close().await.unwrap();
|
||||
db.close().await;
|
||||
cleanup_db(&path).await;
|
||||
}
|
||||
|
||||
@@ -81,7 +81,7 @@ proptest! {
|
||||
let path = unique_db_path("proptest");
|
||||
let db = init_db(path.clone()).await;
|
||||
put_string_into_root(&db, s).await;
|
||||
db.close().await.unwrap();
|
||||
db.close().await;
|
||||
cleanup_db(&path).await;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -634,11 +634,6 @@ impl JsonPointer<String> {
|
||||
PtrSegment::Unescaped(1..prefix_len)
|
||||
});
|
||||
}
|
||||
// @claude fix #3: Previously just inserted the new segment without adjusting
|
||||
// existing segment ranges (unlike push_start which did shift them). All
|
||||
// existing segments' byte ranges became wrong, causing corrupted pointer
|
||||
// lookups or panics. Now shifts ranges forward by prefix_len, matching
|
||||
// push_start behavior.
|
||||
pub fn push_start_idx(&mut self, segment: usize) {
|
||||
let mut src = format!("/{}", segment);
|
||||
let prefix_len = src.len();
|
||||
|
||||
Reference in New Issue
Block a user