audit fixes, repo restructure, and documentation

Soundness and performance audit (17 fixes):
- See AUDIT.md for full details and @claude comments in code

Repo restructure:
- Inline json-ptr and json-patch submodules as regular directories
- Remove cbor submodule, replace serde_cbor with ciborium
- Rename patch-db/ -> core/, patch-db-macro/ -> macro/,
  patch-db-macro-internals/ -> macro-internals/, patch-db-util/ -> util/
- Purge upstream CI/CD, bench, and release cruft from json-patch
- Remove .gitmodules

Test fixes:
- Fix proptest doesnt_crash (unique file paths, proper close/cleanup)
- Add PatchDb::close() for clean teardown

Documentation:
- Add README.md, ARCHITECTURE.md, CONTRIBUTING.md, CLAUDE.md, AUDIT.md
- Add TSDocs to TypeScript client exports

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Matt Hill
2026-02-23 19:06:42 -07:00
parent 05c93290c7
commit 86b0768bbb
46 changed files with 5744 additions and 95 deletions

3
core/.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
/target
Cargo.lock
test.db

48
core/Cargo.toml Normal file
View File

@@ -0,0 +1,48 @@
[package]
authors = ["Aiden McClelland <aiden@start9labs.com>"]
categories = ["database-implementations"]
description = "A database that tracks state updates as RFC 6902 JSON Patches"
edition = "2018"
keywords = ["json", "json-pointer", "json-patch"]
license = "MIT"
name = "patch-db"
readme = "README.md"
repository = "https://github.com/Start9Labs/patch-db"
version = "0.1.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
# "debug" pulls in the optional tracing dependency below.
debug = ["tracing"]
# "trace" builds on "debug" and additionally enables tracing-error.
trace = ["debug", "tracing-error"]
unstable = []

[dependencies]
async-trait = "0.1.42"
fd-lock-rs = "0.1.3"
futures = "0.3.8"
imbl = "6"
imbl-value = "0.4.1"
# Inlined workspace crates (formerly git submodules).
json-patch = { path = "../json-patch" }
json-ptr = { path = "../json-ptr" }
lazy_static = "1.4.0"
# Optional: only compiled in when the "debug"/"trace" features are enabled.
tracing = { version = "0.1.29", optional = true }
tracing-error = { version = "0.2.0", optional = true }
nix = "0.30.1"
patch-db-macro = { path = "../macro" }
serde = { version = "1", features = ["rc"] }
ciborium = "0.2"
thiserror = "2"
tokio = { version = "1", features = ["sync", "fs", "rt", "io-util", "macros"] }

[dev-dependencies]
proptest = "1.0.0"
serde = { version = "1.0.118", features = ["rc", "derive"] }
# Tests need the multi-threaded runtime in addition to the main-crate features.
tokio = { version = "1.0.1", features = [
    "sync",
    "fs",
    "rt",
    "rt-multi-thread",
    "io-util",
    "macros",
] }
rand = "0.9.1"

View File

@@ -0,0 +1,7 @@
# Seeds for failure cases proptest has generated in the past. It is
# automatically read and these particular cases re-run before any
# novel cases are generated.
#
# It is recommended to check this file in to source control so that
# everyone who runs the test benefits from these saved cases.
cc b68e3527ee511046c2e96c9b75163422af755c5204f4a3cfaa973b8293bbf961 # shrinks to lock_order = [LockInfo { handle_id: HandleId { id: 0 }, ptr: PossibleStarPath { path: [Star], count: 1 }, ty: Exist }]

View File

@@ -0,0 +1,7 @@
# Seeds for failure cases proptest has generated in the past. It is
# automatically read and these particular cases re-run before any
# novel cases are generated.
#
# It is recommended to check this file in to source control so that
# everyone who runs the test benefits from these saved cases.
cc 32f96b89f02e5b9dfcc4de5a76c05a2a5e9ef1b232ea9cb5953a29169bec3b16 # shrinks to left = "", right = ""

View File

@@ -0,0 +1,7 @@
# Seeds for failure cases proptest has generated in the past. It is
# automatically read and these particular cases re-run before any
# novel cases are generated.
#
# It is recommended to check this file in to source control so that
# everyone who runs the test benefits from these saved cases.
cc a239369714309ab23267c160f9243dca5b57da78a5abe0455a26df99f8a3300b # shrinks to s = "$💡;"

2
core/rustfmt.toml Normal file
View File

@@ -0,0 +1,2 @@
# Order imports std -> external crates -> crate-local, and merge imports from
# the same module into one `use`.
# NOTE(review): both options are unstable rustfmt config — confirm the project
# formats with nightly rustfmt.
group_imports = "StdExternalCrate"
imports_granularity = "Module"

52
core/src/lib.rs Normal file
View File

@@ -0,0 +1,52 @@
use std::io::Error as IOError;
use std::sync::Arc;
use json_ptr::JsonPointer;
use thiserror::Error;
mod model;
mod patch;
mod store;
mod subscriber;
#[cfg(test)]
mod test;
pub use imbl_value::Value;
pub use model::{DestructureMut, HasModel, Model, ModelExt, Pointer};
pub use patch::{DiffPatch, Dump, Revision};
pub use patch_db_macro::HasModel;
pub use store::{MutateResult, PatchDb, Store, TypedPatchDb};
pub use subscriber::{DbWatch, Subscriber, TypedDbWatch};
use tokio::sync::TryLockError;
pub use {imbl_value as value, json_patch, json_ptr};
/// Unified error type for all fallible patch-db operations.
#[derive(Error, Debug)]
pub enum Error {
    /// Underlying filesystem / file-handle failure.
    #[error("IO Error: {0}")]
    IO(#[from] IOError),
    /// JSON (de)serialization failure from `imbl_value`.
    #[error("JSON (De)Serialization Error: {0}")]
    JSON(#[from] imbl_value::Error),
    /// Failed to decode a CBOR record (snapshot or patch) from the on-disk log.
    #[error("CBOR Deserialization Error: {0}")]
    CborDe(#[from] ciborium::de::Error<IOError>),
    /// Failed to encode a CBOR record for the on-disk log.
    #[error("CBOR Serialization Error: {0}")]
    CborSer(#[from] ciborium::ser::Error<IOError>),
    /// A JSON pointer failed to resolve/index into the document.
    #[error("Index Error: {0:?}")]
    Pointer(#[from] json_ptr::IndexError),
    /// A JSON patch could not be applied to the document.
    #[error("Patch Error: {0}")]
    Patch(#[from] json_patch::PatchError),
    /// A spawned tokio task failed or was cancelled.
    #[error("Join Error: {0}")]
    Join(#[from] tokio::task::JoinError),
    /// Could not acquire the exclusive fd lock on the database file.
    #[error("FD Lock Error: {0}")]
    FDLock(#[from] fd_lock_rs::Error),
    /// The in-memory cache is corrupted; wraps the underlying error (shared
    /// via `Arc`).
    #[error("Database Cache Corrupted: {0}")]
    CacheCorrupted(Arc<Error>),
    /// A subscription channel was empty or disconnected on a non-blocking read.
    #[error("Subscriber Error: {0:?}")]
    Subscriber(#[from] tokio::sync::mpsc::error::TryRecvError),
    /// The node addressed by the given pointer is absent from the document.
    #[error("Node Does Not Exist: {0}")]
    NodeDoesNotExist(JsonPointer),
    /// A user-supplied callback panicked; the payload (when a `String`) is
    /// captured here.
    #[error("Provided Function Panicked! {0}")]
    Panic(String),
    /// A try-lock would have had to wait.
    #[error("Would Block")]
    WouldBlock(#[from] TryLockError),
}

236
core/src/model.rs Normal file
View File

@@ -0,0 +1,236 @@
use std::marker::PhantomData;
use imbl_value::{InternedString, Value};
use json_ptr::JsonPointer;
/// A `JsonPointer` tagged at compile time with the type `T` it is expected to
/// point at. The tag is purely phantom — no `T` is stored.
pub struct Pointer<T> {
    ptr: JsonPointer,
    // Zero-sized marker carrying the target type.
    phantom: PhantomData<T>,
}
// Manual impl: `#[derive(Default)]` would wrongly require `T: Default`.
impl<T> Default for Pointer<T> {
    fn default() -> Self {
        Self {
            ptr: JsonPointer::default(),
            phantom: PhantomData,
        }
    }
}
impl<T> Pointer<T> {
    /// Discards the type tag, returning the raw pointer.
    pub fn unwrap(self) -> JsonPointer {
        self.ptr
    }
}
// Read-only access to the raw pointer without consuming the tag.
impl<T> std::ops::Deref for Pointer<T> {
    type Target = JsonPointer;
    fn deref(&self) -> &Self::Target {
        &self.ptr
    }
}
/// Types that have an associated strongly-typed model over their JSON
/// representation; normally implemented via `#[derive(HasModel)]`.
pub trait HasModel: Sized {
    type Model: Model<Self>;
}
/// Sealed-trait pattern: `ModelMarker` lives in a private module so it cannot
/// be implemented (or even named) outside this module, while still being a
/// usable supertrait bound on the public `Model` trait.
mod sealed {
    use super::*;
    /// Lossless conversions between a model type and the raw `Value` it
    /// wraps, by value, shared reference, and mutable reference.
    pub trait ModelMarker {
        fn into_value(self) -> Value;
        fn from_value(value: Value) -> Self;
        fn as_value<'a>(&'a self) -> &'a Value;
        fn value_as<'a>(value: &'a Value) -> &'a Self;
        fn as_value_mut<'a>(&'a mut self) -> &'a mut Value;
        fn value_as_mut<'a>(value: &'a mut Value) -> &'a mut Self;
    }
    // Blanket impl: any type bidirectionally convertible with `Value` in all
    // three borrow modes automatically satisfies the marker.
    impl<T> ModelMarker for T
    where
        T: From<Value> + Into<Value> + Sized,
        for<'a> &'a T: From<&'a Value> + Into<&'a Value>,
        for<'a> &'a mut T: From<&'a mut Value> + Into<&'a mut Value>,
    {
        fn into_value(self) -> Value {
            self.into()
        }
        fn from_value(value: Value) -> Self {
            value.into()
        }
        fn as_value<'a>(&'a self) -> &'a Value {
            self.into()
        }
        fn value_as<'a>(value: &'a Value) -> &'a Self {
            value.into()
        }
        fn as_value_mut<'a>(&'a mut self) -> &'a mut Value {
            self.into()
        }
        fn value_as_mut<'a>(value: &'a mut Value) -> &'a mut Self {
            value.into()
        }
    }
}
/// The strongly-typed view of a `T` stored as JSON. The sealed supertrait
/// supplies the raw `Value` conversions.
pub trait Model<T>: sealed::ModelMarker + Sized {
    /// The model type used when this model is re-pointed at a `U` subtree.
    type Model<U>: Model<U>;
}
/// Public, blanket-implemented surface over the sealed conversions, plus the
/// `transmute*` re-typing helpers.
pub trait ModelExt<T>: Model<T> {
    /// Consumes the model, returning the raw JSON value.
    fn into_value(self) -> Value {
        <Self as sealed::ModelMarker>::into_value(self)
    }
    /// Wraps a raw JSON value as this model type (no validation performed).
    fn from_value(value: Value) -> Self {
        <Self as sealed::ModelMarker>::from_value(value)
    }
    /// Borrows the underlying JSON value.
    fn as_value<'a>(&'a self) -> &'a Value {
        <Self as sealed::ModelMarker>::as_value(self)
    }
    /// Reinterprets a borrowed JSON value as a borrowed model.
    fn value_as<'a>(value: &'a Value) -> &'a Self {
        <Self as sealed::ModelMarker>::value_as(value)
    }
    /// Mutably borrows the underlying JSON value.
    fn as_value_mut<'a>(&'a mut self) -> &'a mut Value {
        <Self as sealed::ModelMarker>::as_value_mut(self)
    }
    /// Reinterprets a mutably borrowed JSON value as a mutably borrowed model.
    fn value_as_mut<'a>(value: &'a mut Value) -> &'a mut Self {
        <Self as sealed::ModelMarker>::value_as_mut(value)
    }
    /// Re-types the model to point at `U`, transforming the value with `f`.
    fn transmute<U>(self, f: impl FnOnce(Value) -> Value) -> Self::Model<U> {
        Self::Model::<U>::from_value(f(<Self as sealed::ModelMarker>::into_value(self)))
    }
    /// Borrowing version of [`ModelExt::transmute`]; `f` typically selects a
    /// sub-value of the document.
    fn transmute_ref<'a, U>(
        &'a self,
        f: impl for<'b> FnOnce(&'b Value) -> &'b Value,
    ) -> &'a Self::Model<U> {
        Self::Model::<U>::value_as(f(<Self as sealed::ModelMarker>::as_value(self)))
    }
    /// Mutably borrowing version of [`ModelExt::transmute`].
    fn transmute_mut<'a, U>(
        &'a mut self,
        f: impl for<'b> FnOnce(&'b mut Value) -> &'b mut Value,
    ) -> &'a mut Self::Model<U> {
        Self::Model::<U>::value_as_mut(f(<Self as sealed::ModelMarker>::as_value_mut(self)))
    }
    /// Iterates the top-level key/value pairs when the underlying value is an
    /// object; yields nothing for any other value kind.
    fn children_mut<'a>(
        &'a mut self,
    ) -> impl IntoIterator<Item = (&'a InternedString, &'a mut Value)> + Send + Sync {
        ModelExt::<T>::as_value_mut(self)
            .as_object_mut()
            .into_iter()
            .flat_map(|o| o.iter_mut().map(|(k, v)| (&*k, v)))
    }
}
// Every Model automatically gets the ModelExt helpers.
impl<T, M: Model<T>> ModelExt<T> for M {}
/// Splits a model into simultaneously-usable mutable views of its parts.
/// The GAT ties each view's lifetime to the borrow of `self`.
pub trait DestructureMut {
    type Destructured<'a>
    where
        Self: 'a;
    fn destructure_mut<'a>(&'a mut self) -> Self::Destructured<'a>;
}
#[cfg(test)]
mod test {
    use std::marker::PhantomData;

    use imbl_value::{from_value, json, to_value, Value};
    use serde::de::DeserializeOwned;
    use serde::Serialize;

    use crate as patch_db;

    /// &mut Model<T> <=> &mut Value
    ///
    /// `#[repr(transparent)]` guarantees `Model<T>` has exactly the layout of
    /// `Value`, which is what makes the reference transmutes below sound.
    #[repr(transparent)]
    #[derive(Debug)]
    pub struct Model<T> {
        value: Value,
        phantom: PhantomData<T>,
    }
    impl<T: DeserializeOwned> Model<T> {
        /// Deserializes the underlying JSON into `T`.
        pub fn de(self) -> Result<T, imbl_value::Error> {
            from_value(self.value)
        }
    }
    impl<T: Serialize> Model<T> {
        /// Overwrites the underlying JSON with the serialization of `value`.
        pub fn ser(&mut self, value: &T) -> Result<(), imbl_value::Error> {
            self.value = to_value(value)?;
            Ok(())
        }
    }
    // Manual impl: deriving Clone would wrongly require `T: Clone`.
    impl<T> Clone for Model<T> {
        fn clone(&self) -> Self {
            Self {
                value: self.value.clone(),
                phantom: PhantomData,
            }
        }
    }
    impl<T> From<Value> for Model<T> {
        fn from(value: Value) -> Self {
            Self {
                value,
                phantom: PhantomData,
            }
        }
    }
    impl<T> From<Model<T>> for Value {
        fn from(value: Model<T>) -> Self {
            value.value
        }
    }
    impl<'a, T> From<&'a Value> for &'a Model<T> {
        fn from(value: &'a Value) -> Self {
            // SAFETY: Model<T> is #[repr(transparent)] over Value, and the
            // output lifetime equals the input lifetime.
            unsafe { std::mem::transmute(value) }
        }
    }
    impl<'a, T> From<&'a Model<T>> for &'a Value {
        fn from(value: &'a Model<T>) -> Self {
            // SAFETY: Model<T> is #[repr(transparent)] over Value, and the
            // output lifetime equals the input lifetime.
            unsafe { std::mem::transmute(value) }
        }
    }
    // Fix: the two mutable conversions below previously left the output
    // lifetime anonymous (`for &mut Model<T>` / `for &mut Value`), giving each
    // impl a fresh, unconstrained lifetime parameter. Since the body is a
    // `mem::transmute`, callers could extend a short-lived `&mut` borrow to an
    // arbitrary lifetime — unsound. The output lifetime is now tied to `'a`,
    // matching the shared-reference impls above.
    impl<'a, T> From<&'a mut Value> for &'a mut Model<T> {
        fn from(value: &'a mut Value) -> Self {
            // SAFETY: Model<T> is #[repr(transparent)] over Value, and the
            // output lifetime equals the input lifetime.
            unsafe { std::mem::transmute(value) }
        }
    }
    impl<'a, T> From<&'a mut Model<T>> for &'a mut Value {
        fn from(value: &'a mut Model<T>) -> Self {
            // SAFETY: Model<T> is #[repr(transparent)] over Value, and the
            // output lifetime equals the input lifetime.
            unsafe { std::mem::transmute(value) }
        }
    }
    impl<T> patch_db::Model<T> for Model<T> {
        type Model<U> = Model<U>;
    }
    #[derive(crate::HasModel)]
    #[model = "Model<Self>"]
    // #[macro_debug]
    struct Foo {
        a: Bar,
    }
    #[derive(crate::HasModel)]
    #[model = "Model<Self>"]
    struct Bar {
        b: String,
    }
    /// Exercises the derived accessors: writes twice through the same mutable
    /// view to prove it remains usable after the first write.
    fn mutate_fn(v: &mut Model<Foo>) {
        let mut a = v.as_a_mut();
        a.as_b_mut().ser(&"NotThis".into()).unwrap();
        a.as_b_mut().ser(&"Replaced".into()).unwrap();
    }
    #[test]
    fn test() {
        let mut model = Model::<Foo>::from(imbl_value::json!({
            "a": {
                "b": "ReplaceMe"
            }
        }));
        mutate_fn(&mut model);
        mutate_fn(&mut model);
        assert_eq!(
            crate::model::sealed::ModelMarker::as_value(&model),
            &json!({
                "a": {
                    "b": "Replaced"
                }
            })
        )
    }
}

236
core/src/patch.rs Normal file
View File

@@ -0,0 +1,236 @@
use std::collections::BTreeSet;
use std::ops::Deref;
use imbl_value::Value;
use json_patch::{AddOperation, Patch, PatchOperation, RemoveOperation, ReplaceOperation};
use json_ptr::{JsonPointer, SegList};
use serde::{Deserialize, Serialize};
/// A committed database change: a monotonically increasing id plus the patch
/// that was applied.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub struct Revision {
    pub id: u64,
    pub patch: DiffPatch,
}
impl Revision {
    /// Projects this revision onto the subtree at `ptr`; the id is preserved,
    /// the patch is re-rooted relative to `ptr`.
    pub fn for_path<S: AsRef<str>, V: SegList>(&self, ptr: &JsonPointer<S, V>) -> Revision {
        Self {
            id: self.id,
            patch: self.patch.for_path(ptr),
        }
    }
}
/// A full snapshot of (part of) the document, together with the revision id
/// it was taken at.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub struct Dump {
    pub id: u64,
    pub value: Value,
}
/// An RFC 6902 patch as produced by [`diff`]: only `add`, `replace`, and
/// `remove` operations ever occur, which is why the other operation kinds are
/// `unreachable!` throughout.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct DiffPatch(pub(crate) Patch);
impl DiffPatch {
    /// Re-roots every operation in this patch under `ptr`.
    pub fn prepend<S: AsRef<str>, V: SegList>(&mut self, ptr: &JsonPointer<S, V>) {
        self.0.prepend(ptr)
    }
    /// Appends all of `other`'s operations after this patch's operations.
    pub fn append(&mut self, other: DiffPatch) {
        (self.0).0.extend((other.0).0)
    }
    // safe to assume dictionary style semantics for arrays since patches will
    // always be rebased before being applied
    /// Projects this patch onto the subtree at `ptr`, rewriting operation
    /// paths to be relative to it; operations strictly above `ptr` collapse
    /// into root-level operations on the subtree.
    pub fn for_path<S: AsRef<str>, V: SegList>(&self, ptr: &JsonPointer<S, V>) -> DiffPatch {
        let DiffPatch(Patch(ops)) = self;
        DiffPatch(Patch(
            ops.iter()
                .filter_map(|op| match op {
                    PatchOperation::Add(op) => {
                        if let Some(tail) = op.path.strip_prefix(ptr) {
                            // Add inside the subtree: keep, with relative path.
                            Some(PatchOperation::Add(AddOperation {
                                path: tail.to_owned(),
                                value: op.value.clone(),
                            }))
                        } else if let Some(tail) = ptr.strip_prefix(&op.path) {
                            // Add at/above the subtree: extract the fragment
                            // of the added value that lands at `ptr`.
                            Some(PatchOperation::Add(AddOperation {
                                path: Default::default(),
                                value: tail.get(&op.value).cloned().unwrap_or_default(),
                            }))
                        } else {
                            None
                        }
                    }
                    PatchOperation::Replace(op) => {
                        if let Some(tail) = op.path.strip_prefix(ptr) {
                            Some(PatchOperation::Replace(ReplaceOperation {
                                path: tail.to_owned(),
                                value: op.value.clone(),
                            }))
                        } else if let Some(tail) = ptr.strip_prefix(&op.path) {
                            Some(PatchOperation::Replace(ReplaceOperation {
                                path: Default::default(),
                                value: tail.get(&op.value).cloned().unwrap_or_default(),
                            }))
                        } else {
                            None
                        }
                    }
                    PatchOperation::Remove(op) => {
                        if ptr.starts_with(&op.path) {
                            // Removal at/above the subtree root: the subtree
                            // is gone; represent as replace-with-null.
                            Some(PatchOperation::Replace(ReplaceOperation {
                                path: Default::default(),
                                value: Default::default(),
                            }))
                        } else if let Some(tail) = op.path.strip_prefix(ptr) {
                            Some(PatchOperation::Remove(RemoveOperation {
                                path: tail.to_owned(),
                            }))
                        } else {
                            None
                        }
                    }
                    // `diff` only ever emits add/replace/remove.
                    _ => unreachable!(),
                })
                .collect(),
        ))
    }
    /// Rewrites this patch's array indices so it can be applied after `onto`:
    /// an insertion in `onto` shifts later indices up, a removal shifts them
    /// down.
    pub fn rebase(&mut self, onto: &DiffPatch) {
        let DiffPatch(Patch(ops)) = self;
        let DiffPatch(Patch(onto_ops)) = onto;
        for onto_op in onto_ops {
            if let PatchOperation::Add(onto_op) = onto_op {
                // Fix: `onto_op.path.len() - 1` underflowed for a root-level
                // op (empty path, len == 0) — panic in debug, usize::MAX in
                // release. A root op is not an array insertion, so there is
                // nothing to shift: skip it.
                let arr_path_idx = match onto_op.path.len().checked_sub(1) {
                    Some(idx) => idx,
                    None => continue,
                };
                // Only array insertions (last segment parses as an index)
                // require rewriting.
                if let Some(onto_idx) = onto_op
                    .path
                    .get_segment(arr_path_idx)
                    .and_then(|seg| seg.parse::<usize>().ok())
                {
                    let prefix = onto_op.path.slice(..arr_path_idx).unwrap_or_default();
                    for op in ops.iter_mut() {
                        let path = match op {
                            PatchOperation::Add(op) => &mut op.path,
                            PatchOperation::Replace(op) => &mut op.path,
                            PatchOperation::Remove(op) => &mut op.path,
                            _ => unreachable!(),
                        };
                        if path.starts_with(&prefix) {
                            if let Some(idx) = path
                                .get_segment(arr_path_idx)
                                .and_then(|seg| seg.parse::<usize>().ok())
                            {
                                if idx >= onto_idx {
                                    // Shift this op one slot to the right.
                                    let mut new_path = prefix.clone().to_owned();
                                    new_path.push_end_idx(idx + 1);
                                    if let Some(tail) = path.slice(arr_path_idx + 1..) {
                                        new_path.append(&tail);
                                    }
                                    *path = new_path;
                                }
                            }
                        }
                    }
                }
            } else if let PatchOperation::Remove(onto_op) = onto_op {
                // Fix: same `len() - 1` underflow guard as the Add branch
                // above — a root-level removal shifts no array indices.
                let arr_path_idx = match onto_op.path.len().checked_sub(1) {
                    Some(idx) => idx,
                    None => continue,
                };
                if let Some(onto_idx) = onto_op
                    .path
                    .get_segment(arr_path_idx)
                    .and_then(|seg| seg.parse::<usize>().ok())
                {
                    let prefix = onto_op.path.slice(..arr_path_idx).unwrap_or_default();
                    for op in ops.iter_mut() {
                        let path = match op {
                            PatchOperation::Add(op) => &mut op.path,
                            PatchOperation::Replace(op) => &mut op.path,
                            PatchOperation::Remove(op) => &mut op.path,
                            _ => unreachable!(),
                        };
                        if path.starts_with(&prefix) {
                            if let Some(idx) = path
                                .get_segment(arr_path_idx)
                                .and_then(|seg| seg.parse::<usize>().ok())
                            {
                                // @claude fix #4: Was `idx >= onto_idx`, which caused
                                // `idx - 1` to underflow when both were 0 (panic in
                                // debug, wraps to usize::MAX in release).
                                if idx > onto_idx {
                                    // Shift this op one slot to the left.
                                    let mut new_path = prefix.clone().to_owned();
                                    new_path.push_end_idx(idx - 1);
                                    if let Some(tail) = path.slice(arr_path_idx + 1..) {
                                        new_path.append(&tail);
                                    }
                                    *path = new_path;
                                }
                            }
                        }
                    }
                }
            }
        }
    }
    /// Returns what this patch implies about the existence of the (root)
    /// node: `Some(true)`/`Some(false)` if a root-level op settles it (last
    /// one wins), `None` if no root-level op is present.
    pub fn exists(&self) -> Option<bool> {
        let mut res = None;
        for op in &(self.0).0 {
            match op {
                PatchOperation::Add(a) => {
                    if a.path.is_empty() {
                        res = Some(!a.value.is_null());
                    }
                }
                PatchOperation::Replace(a) => {
                    if a.path.is_empty() {
                        res = Some(!a.value.is_null())
                    }
                }
                PatchOperation::Remove(a) => {
                    if a.path.is_empty() {
                        res = Some(false)
                    }
                }
                _ => unreachable!(),
            }
        }
        res
    }
    /// Folds this patch's top-level key additions/removals into `keys`,
    /// returning the updated set.
    pub fn keys(&self, mut keys: BTreeSet<String>) -> BTreeSet<String> {
        for op in &(self.0).0 {
            match op {
                PatchOperation::Add(a) => {
                    if a.path.len() == 1 {
                        keys.insert(a.path.get_segment(0).unwrap().to_owned());
                    }
                }
                // Replacing a key neither adds nor removes it.
                PatchOperation::Replace(_) => (),
                PatchOperation::Remove(a) => {
                    if a.path.len() == 1 {
                        keys.remove(a.path.get_segment(0).unwrap());
                    }
                }
                _ => unreachable!(),
            }
        }
        keys
    }
    /// `true` if the patch contains no operations.
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }
}
// Manual impl because the inner `Patch` newtype is not `Default`.
impl Default for DiffPatch {
    fn default() -> Self {
        DiffPatch(Patch(Vec::default()))
    }
}
// Read-only access to the underlying RFC 6902 patch.
impl Deref for DiffPatch {
    type Target = Patch;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
/// Computes the patch that transforms `left` into `right`.
pub fn diff(left: &Value, right: &Value) -> DiffPatch {
    DiffPatch(json_patch::diff(left, right))
}

575
core/src/store.rs Normal file
View File

@@ -0,0 +1,575 @@
use std::collections::{BTreeSet, HashMap};
use std::fs::OpenOptions;
use std::io::{SeekFrom, Write};
use std::marker::PhantomData;
use std::panic::UnwindSafe;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use fd_lock_rs::FdLock;
use futures::{Future, FutureExt};
use imbl_value::{InternedString, Value};
use json_patch::PatchError;
use json_ptr::{JsonPointer, SegList, ROOT};
use lazy_static::lazy_static;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tokio::fs::File;
use tokio::io::AsyncSeekExt;
use tokio::sync::{Mutex, OwnedMutexGuard, RwLock};
use crate::patch::{diff, DiffPatch, Dump, Revision};
use crate::subscriber::Broadcast;
use crate::{DbWatch, Error, HasModel, Subscriber};
lazy_static! {
    /// Process-wide registry mapping canonical DB paths to per-path mutexes,
    /// guaranteeing at most one live `Store` per file within this process
    /// (the fd lock guards against other processes).
    static ref OPEN_STORES: Mutex<HashMap<PathBuf, Arc<Mutex<()>>>> = Mutex::new(HashMap::new());
}
pub struct Store {
    // Canonicalized path of the database file.
    path: PathBuf,
    // The database file, held under an exclusive fd lock.
    file: FdLock<File>,
    // Byte offset one past the last durably written record.
    file_cursor: u64,
    // Owned guard on this path's OPEN_STORES mutex; held for the Store's
    // lifetime so a second open of the same path fails fast.
    _lock: OwnedMutexGuard<()>,
    // Current in-memory state of the whole document.
    persistent: Value,
    // Monotonic revision counter; incremented once per applied patch.
    revision: u64,
    // Fan-out of committed revisions to subscribers.
    broadcast: Broadcast,
}
impl Store {
    /// Opens (or creates) the database file at `path`: restores a leftover
    /// backup if present, replays the persisted patch log to rebuild the
    /// in-memory state, then compresses the log into a fresh snapshot.
    pub(crate) async fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
        let (_lock, path) = {
            if !path.as_ref().exists() {
                tokio::fs::File::create(path.as_ref()).await?;
            }
            // Canonicalize so two spellings of the same path share one
            // OPEN_STORES entry.
            let path = tokio::fs::canonicalize(path).await?;
            let mut lock = OPEN_STORES.lock().await;
            (
                if let Some(open) = lock.get(&path) {
                    // try_lock: fail with WouldBlock instead of deadlocking
                    // if this process already holds the store open.
                    open.clone().try_lock_owned()?
                } else {
                    let tex = Arc::new(Mutex::new(()));
                    lock.insert(path.clone(), tex.clone());
                    tex.try_lock_owned()?
                },
                path,
            )
        };
        // Blocking file work happens off the async runtime.
        let mut res = tokio::task::spawn_blocking(move || {
            use std::io::Seek;
            // A leftover .bak means a previous compress() committed its
            // snapshot but did not finish rewriting the main file; the
            // backup is authoritative, so restore it first.
            let bak = path.with_extension("bak");
            if bak.exists() {
                std::fs::rename(&bak, &path)?;
            }
            let mut f = FdLock::lock(
                OpenOptions::new()
                    .create(true)
                    .read(true)
                    .write(true)
                    .truncate(false)
                    .open(&path)?,
                fd_lock_rs::LockType::Exclusive,
                false,
            )?;
            let mut reader = std::io::BufReader::new(&mut *f);
            // On-disk layout: revision counter, base snapshot, then a log of
            // CBOR-encoded patches. A missing/corrupt header falls back to an
            // empty database.
            let mut revision: u64 =
                ciborium::from_reader(&mut reader).unwrap_or(0);
            let mut persistent: Value =
                ciborium::from_reader(&mut reader).unwrap_or(Value::Null);
            while let Ok(patch) =
                ciborium::from_reader::<json_patch::Patch, _>(&mut reader)
            {
                if let Err(_) = json_patch::patch(&mut persistent, &patch) {
                    #[cfg(feature = "tracing")]
                    tracing::error!("Error applying patch, skipping...");
                    // Quarantine unapplyable patches in a sidecar file
                    // rather than aborting the whole open.
                    writeln!(
                        OpenOptions::new()
                            .create(true)
                            .append(true)
                            .open(path.with_extension("failed"))?,
                        "{}",
                        imbl_value::to_value(&patch).map_err(Error::JSON)?,
                    )?;
                }
                revision += 1;
            }
            let file_cursor = f.stream_position()?;
            Ok::<_, Error>(Store {
                path,
                file: f.map(File::from_std),
                file_cursor,
                _lock,
                persistent,
                revision,
                broadcast: Broadcast::new(),
            })
        })
        .await??;
        // Collapse the replayed log into a single snapshot.
        res.compress().await.map(|_| ())?;
        Ok(res)
    }
    /// Cleanly tears the store down: flush, release the fd lock, and drop
    /// this path's OPEN_STORES entry.
    pub async fn close(mut self) -> Result<(), Error> {
        use tokio::io::AsyncWriteExt;
        self.file.flush().await?;
        self.file.shutdown().await?;
        self.file.unlock(true).map_err(|e| e.1)?;
        // @claude fix #15: OPEN_STORES never removed entries, causing unbounded
        // growth over the lifetime of a process. Now cleaned up on close().
        let mut lock = OPEN_STORES.lock().await;
        lock.remove(&self.path);
        Ok(())
    }
    // @claude fix #18: Previously compared against Value::Null, which conflated
    // an explicit JSON null with a missing key. Now uses .is_some() so that a
    // key with null value is correctly reported as existing.
    pub(crate) fn exists<S: AsRef<str>, V: SegList>(&self, ptr: &JsonPointer<S, V>) -> bool {
        ptr.get(&self.persistent).is_some()
    }
    /// Returns the keys of the object at `ptr`; empty for any non-object.
    pub(crate) fn keys<S: AsRef<str>, V: SegList>(
        &self,
        ptr: &JsonPointer<S, V>,
    ) -> BTreeSet<InternedString> {
        match ptr.get(&self.persistent).unwrap_or(&Value::Null) {
            Value::Object(o) => o.keys().cloned().collect(),
            _ => BTreeSet::new(),
        }
    }
    /// Returns a clone of the value at `ptr`, or null if absent.
    pub(crate) fn get_value<S: AsRef<str>, V: SegList>(&self, ptr: &JsonPointer<S, V>) -> Value {
        ptr.get(&self.persistent).unwrap_or(&Value::Null).clone()
    }
    /// Deserializes the value at `ptr` into `T`.
    pub(crate) fn get<T: for<'de> Deserialize<'de>, S: AsRef<str>, V: SegList>(
        &self,
        ptr: &JsonPointer<S, V>,
    ) -> Result<T, Error> {
        Ok(imbl_value::from_value(self.get_value(ptr))?)
    }
    /// Snapshots the subtree at `ptr` along with the current revision id.
    pub(crate) fn dump<S: AsRef<str>, V: SegList>(&self, ptr: &JsonPointer<S, V>) -> Dump {
        Dump {
            id: self.revision,
            value: ptr.get(&self.persistent).cloned().unwrap_or(Value::Null),
        }
    }
    /// Current revision counter.
    pub(crate) fn sequence(&self) -> u64 {
        self.revision
    }
    /// Registers a subscriber scoped to `ptr`.
    pub(crate) fn subscribe(&mut self, ptr: JsonPointer) -> Subscriber {
        self.broadcast.subscribe(ptr)
    }
    /// Replaces the value at `ptr` by diffing against the current state and
    /// applying the resulting patch.
    pub(crate) async fn put_value<S: AsRef<str>, V: SegList>(
        &mut self,
        ptr: &JsonPointer<S, V>,
        value: &Value,
    ) -> Result<Option<Arc<Revision>>, Error> {
        let mut patch = diff(ptr.get(&self.persistent).unwrap_or(&Value::Null), value);
        patch.prepend(ptr);
        self.apply(patch).await
    }
    /// Serializing wrapper around [`Store::put_value`].
    pub(crate) async fn put<T: Serialize + ?Sized, S: AsRef<str>, V: SegList>(
        &mut self,
        ptr: &JsonPointer<S, V>,
        value: &T,
    ) -> Result<Option<Arc<Revision>>, Error> {
        self.put_value(ptr, &imbl_value::to_value(&value)?).await
    }
    /// Compresses the database file by writing a fresh snapshot.
    ///
    /// Returns `true` if the backup was committed (point of no return — the new
    /// state will be recovered on restart regardless of main file state).
    /// Returns `false` if the backup was never committed (safe to undo in memory).
    ///
    // @claude fix #2 + #10: Rewrote compress with three explicit phases:
    // 1. Atomic backup via tmp+rename (safe to undo before this point)
    // 2. Main file rewrite (backup ensures crash recovery; undo is unsafe)
    // 3. Backup removal is non-fatal (#10) — a leftover backup is harmlessly
    // replayed on restart. Previously, remove_file failure propagated an error
    // that caused Store::open to rename the stale backup over the good file.
    // Return type changed from Result<(), Error> to Result<bool, Error> so the
    // caller (TentativeUpdated in apply()) knows whether undo is safe (#2).
    pub(crate) async fn compress(&mut self) -> Result<bool, Error> {
        use tokio::io::AsyncWriteExt;
        let bak = self.path.with_extension("bak");
        let bak_tmp = bak.with_extension("bak.tmp");
        // Encode both records up front so serialization failure touches no files.
        let mut revision_cbor = Vec::new();
        ciborium::into_writer(&self.revision, &mut revision_cbor)?;
        let mut data_cbor = Vec::new();
        ciborium::into_writer(&self.persistent, &mut data_cbor)?;
        // Phase 1: Create atomic backup. If this fails, the main file is
        // untouched and the caller can safely undo the in-memory patch.
        let mut backup_file = File::create(&bak_tmp).await?;
        backup_file.write_all(&revision_cbor).await?;
        backup_file.write_all(&data_cbor).await?;
        backup_file.flush().await?;
        backup_file.sync_all().await?;
        tokio::fs::rename(&bak_tmp, &bak).await?;
        // Point of no return: the backup exists with the new state. On restart,
        // Store::open will rename it over the main file. From here, errors
        // must NOT cause an in-memory undo.
        // Phase 2: Rewrite main file. If this fails, the backup ensures crash
        // recovery. We propagate the error but signal that undo is unsafe.
        self.file.set_len(0).await?;
        self.file.seek(SeekFrom::Start(0)).await?;
        self.file.write_all(&revision_cbor).await?;
        self.file.write_all(&data_cbor).await?;
        self.file.flush().await?;
        self.file.sync_all().await?;
        self.file_cursor = self.file.stream_position().await?;
        // Phase 3: Remove backup. Non-fatal — on restart, the backup (which
        // matches the main file) will be harmlessly applied.
        let _ = tokio::fs::remove_file(&bak).await;
        Ok(true)
    }
    /// Applies `patch` to the in-memory state and persists it (either by
    /// appending to the log or, every 4096th revision, by snapshotting),
    /// then broadcasts the committed revision to subscribers.
    pub(crate) async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error> {
        use tokio::io::AsyncWriteExt;
        // eject if noop
        if (patch.0).0.is_empty() {
            return Ok(None);
        }
        // RAII guard: mutates the in-memory state immediately and rolls it
        // back on drop unless `undo` has been disarmed (taken).
        struct TentativeUpdated<'a> {
            store: &'a mut Store,
            undo: Option<json_patch::Undo<'a>>,
        }
        impl<'a> TentativeUpdated<'a> {
            fn new(store: &'a mut Store, patch: &'a DiffPatch) -> Result<Self, PatchError> {
                let undo = json_patch::patch(&mut store.persistent, &*patch)?;
                store.revision += 1;
                Ok(Self {
                    store,
                    undo: Some(undo),
                })
            }
        }
        impl<'a> Drop for TentativeUpdated<'a> {
            fn drop(&mut self) {
                if let Some(undo) = self.undo.take() {
                    undo.apply(&mut self.store.persistent);
                    self.store.revision -= 1;
                }
            }
        }
        #[cfg(feature = "tracing")]
        tracing::trace!("Attempting to apply patch: {:?}", patch);
        // apply patch in memory
        // Serialize the patch record before mutating anything, so an encoding
        // failure cannot leave a half-applied state.
        let mut patch_bin = Vec::new();
        ciborium::into_writer(&*patch, &mut patch_bin)?;
        let mut updated = TentativeUpdated::new(self, &patch)?;
        if updated.store.revision % 4096 == 0 {
            match updated.store.compress().await {
                Ok(_) => {
                    // Compress succeeded; disarm undo (done below).
                }
                Err(e) => {
                    // @claude fix #2: If compress() succeeded past the atomic
                    // backup rename, the new state will be recovered on restart.
                    // Rolling back in-memory would permanently desync memory vs
                    // disk. Check for backup existence to decide whether undo
                    // is safe.
                    let bak = updated.store.path.with_extension("bak");
                    if bak.exists() {
                        updated.undo.take(); // disarm: can't undo past the backup
                    }
                    return Err(e);
                }
            }
        } else {
            // Re-position at the durable cursor if a previous failed write
            // left the handle elsewhere, truncating any partial record.
            if updated.store.file.stream_position().await? != updated.store.file_cursor {
                updated
                    .store
                    .file
                    .set_len(updated.store.file_cursor)
                    .await?;
                updated
                    .store
                    .file
                    .seek(SeekFrom::Start(updated.store.file_cursor))
                    .await?;
            }
            updated.store.file.write_all(&patch_bin).await?;
            updated.store.file.flush().await?;
            updated.store.file.sync_all().await?;
            updated.store.file_cursor += patch_bin.len() as u64;
        }
        // Durable on disk: disarm the rollback guard.
        drop(updated.undo.take());
        drop(updated);
        let id = self.revision;
        let res = Arc::new(Revision { id, patch });
        self.broadcast.send(&res);
        Ok(Some(res))
    }
}
}
/// Outcome of a mutating operation: the caller's result paired with the
/// revision that was committed (if any change was actually made).
/// `#[must_use]` because dropping it silently discards the inner `Result`.
#[must_use]
pub struct MutateResult<T, E> {
    pub result: Result<T, E>,
    pub revision: Option<Arc<Revision>>,
}
impl<T, E> MutateResult<T, E> {
    /// Transforms the wrapped result with `f`, leaving the revision untouched.
    /// All other combinators are expressed through this one.
    pub fn map_result<T0, E0, F: FnOnce(Result<T, E>) -> Result<T0, E0>>(
        self,
        f: F,
    ) -> MutateResult<T0, E0> {
        let Self { result, revision } = self;
        MutateResult {
            result: f(result),
            revision,
        }
    }
    /// Maps the success value, preserving errors and the revision.
    pub fn map_ok<T0, F: FnOnce(T) -> T0>(self, f: F) -> MutateResult<T0, E> {
        self.map_result(|result| result.map(f))
    }
    /// Maps the error value, preserving successes and the revision.
    pub fn map_err<E0, F: FnOnce(E) -> E0>(self, f: F) -> MutateResult<T, E0> {
        self.map_result(|result| result.map_err(f))
    }
    /// Chains a fallible computation on the success value.
    pub fn and_then<T0, F: FnOnce(T) -> Result<T0, E>>(self, f: F) -> MutateResult<T0, E> {
        self.map_result(|result| result.and_then(f))
    }
}
impl<T, E> From<Result<(T, Option<Arc<Revision>>), E>> for MutateResult<T, E> {
    /// Splits a `(value, revision)` result into the two fields; an `Err`
    /// carries no revision (no change was committed).
    fn from(value: Result<(T, Option<Arc<Revision>>), E>) -> Self {
        let (result, revision) = match value {
            Ok((t, rev)) => (Ok(t), rev),
            Err(e) => (Err(e), None),
        };
        Self { result, revision }
    }
}
/// Cheaply-cloneable handle to a shared [`Store`]; reads take the `RwLock`
/// shared, mutations take it exclusively.
#[derive(Clone)]
pub struct PatchDb {
    pub(crate) store: Arc<RwLock<Store>>,
}
impl PatchDb {
    /// Opens (or creates) the database at `path`.
    pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
        Ok(PatchDb {
            store: Arc::new(RwLock::new(Store::open(path).await?)),
        })
    }
    /// Cleanly shuts the database down. Fails (with a `WouldBlock`-kind IO
    /// error) if any other clone of this handle is still alive, since the
    /// underlying `Store` must be exclusively owned to be closed.
    pub async fn close(self) -> Result<(), Error> {
        let store = Arc::try_unwrap(self.store)
            .map_err(|_| {
                Error::IO(std::io::Error::new(
                    std::io::ErrorKind::WouldBlock,
                    "other PatchDb references still exist",
                ))
            })?
            .into_inner();
        store.close().await
    }
    /// Snapshots the subtree at `ptr` with its revision id.
    pub async fn dump<S: AsRef<str>, V: SegList>(&self, ptr: &JsonPointer<S, V>) -> Dump {
        self.store.read().await.dump(ptr)
    }
    /// Current revision counter.
    pub async fn sequence(&self) -> u64 {
        self.store.read().await.sequence()
    }
    /// Atomically snapshots `ptr` AND subscribes to it under one write lock,
    /// so no revision can slip between the dump and the subscription.
    pub async fn dump_and_sub(&self, ptr: JsonPointer) -> (Dump, Subscriber) {
        let mut store = self.store.write().await;
        (store.dump(&ptr), store.broadcast.subscribe(ptr))
    }
    /// Creates a self-updating watch over the subtree at `ptr`.
    pub async fn watch(&self, ptr: JsonPointer) -> DbWatch {
        let (dump, sub) = self.dump_and_sub(ptr).await;
        DbWatch::new(dump, sub)
    }
    /// Subscribes to revisions touching `ptr` (without an initial snapshot).
    pub async fn subscribe(&self, ptr: JsonPointer) -> Subscriber {
        self.store.write().await.subscribe(ptr)
    }
    /// `true` if a node exists at `ptr` (an explicit null counts as existing).
    pub async fn exists<S: AsRef<str>, V: SegList>(&self, ptr: &JsonPointer<S, V>) -> bool {
        self.store.read().await.exists(ptr)
    }
    /// Keys of the object at `ptr`; empty for any non-object.
    pub async fn keys<S: AsRef<str>, V: SegList>(
        &self,
        ptr: &JsonPointer<S, V>,
    ) -> BTreeSet<InternedString> {
        self.store.read().await.keys(ptr)
    }
    /// Clone of the value at `ptr`, or null if absent.
    pub async fn get_value<S: AsRef<str>, V: SegList>(&self, ptr: &JsonPointer<S, V>) -> Value {
        self.store.read().await.get_value(ptr)
    }
    /// Deserializes the value at `ptr` into `T`.
    pub async fn get<T: for<'de> Deserialize<'de>, S: AsRef<str>, V: SegList>(
        &self,
        ptr: &JsonPointer<S, V>,
    ) -> Result<T, Error> {
        self.store.read().await.get(ptr)
    }
    /// Serializes and stores `value` at `ptr`, returning the committed
    /// revision (or `None` if nothing changed).
    pub async fn put<T: Serialize + ?Sized, S: AsRef<str>, V: SegList>(
        &self,
        ptr: &JsonPointer<S, V>,
        value: &T,
    ) -> Result<Option<Arc<Revision>>, Error> {
        let mut store = self.store.write().await;
        let rev = store.put(ptr, value).await?;
        Ok(rev)
    }
    /// Applies a pre-built patch under the write lock.
    pub async fn apply(&self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error> {
        let mut store = self.store.write().await;
        let rev = store.apply(patch).await?;
        Ok(rev)
    }
    /// Runs `f` over a clone of the whole document under the write lock, then
    /// diffs and commits the result. Panics in `f` are caught and surfaced as
    /// `Error::Panic` instead of poisoning the store.
    pub async fn apply_function<F, T, E>(&self, f: F) -> MutateResult<(Value, T), E>
    where
        F: FnOnce(Value) -> Result<(Value, T), E> + UnwindSafe,
        E: From<Error>,
    {
        async {
            let mut store = self.store.write().await;
            let old = store.persistent.clone();
            let (new, res) = std::panic::catch_unwind(move || f(old)).map_err(|e| {
                Error::Panic(
                    // Panic payload is a String only when panicked with one.
                    e.downcast()
                        .map(|a| *a)
                        .unwrap_or_else(|_| "UNKNOWN".to_owned()),
                )
            })??;
            let diff = diff(&store.persistent, &new);
            let rev = store.apply(diff).await?;
            Ok(((new, res), rev))
        }
        .await
        .into()
    }
    // @claude fix #1: Previously, `old` was read once before the loop and never
    // refreshed. If another writer modified store.persistent between the initial
    // read and the write-lock acquisition, the `old == store.persistent` check
    // failed forever — spinning the loop infinitely. Now `old` is re-read from
    // the store at the start of each iteration.
    /// Optimistic-concurrency loop: runs async `f` against a snapshot taken
    /// under a read lock, then commits under the write lock only if the state
    /// is unchanged; otherwise retries with a fresh snapshot. `f` must
    /// therefore be safe to run multiple times.
    pub async fn run_idempotent<F, Fut, T, E>(&self, f: F) -> Result<(Value, T), E>
    where
        F: Fn(Value) -> Fut + Send + Sync + UnwindSafe,
        for<'a> &'a F: UnwindSafe,
        Fut: std::future::Future<Output = Result<(Value, T), E>> + UnwindSafe,
        E: From<Error>,
    {
        loop {
            let store = self.store.read().await;
            let old = store.persistent.clone();
            // Release the read lock before running user code.
            drop(store);
            let (new, res) = async { f(old.clone()).await }
                .catch_unwind()
                .await
                .map_err(|e| {
                    Error::Panic(
                        e.downcast()
                            .map(|a| *a)
                            .unwrap_or_else(|_| "UNKNOWN".to_owned()),
                    )
                })??;
            let mut store = self.store.write().await;
            if old == store.persistent {
                let diff = diff(&store.persistent, &new);
                store.apply(diff).await?;
                return Ok((new, res));
            }
            // State changed since we read it; retry with the fresh value
        }
    }
}
/// A [`PatchDb`] whose root document is treated as a `T`; `E` lets callers
/// thread their own error type through the mutation APIs.
pub struct TypedPatchDb<T: HasModel, E: From<Error> = Error> {
    db: PatchDb,
    // Marker only; no T or E value is ever stored.
    _phantom: PhantomData<(T, E)>,
}
// Manual impl: `#[derive(Clone)]` would wrongly require `T: Clone` and
// `E: Clone`; only the inner PatchDb handle is actually cloned.
impl<T: HasModel, E: From<Error>> Clone for TypedPatchDb<T, E> {
    fn clone(&self) -> Self {
        Self {
            db: self.db.clone(),
            _phantom: PhantomData,
        }
    }
}
// Exposes the untyped PatchDb API directly on the typed handle.
impl<T: HasModel, E: From<Error>> std::ops::Deref for TypedPatchDb<T, E> {
    type Target = PatchDb;
    fn deref(&self) -> &Self::Target {
        &self.db
    }
}
impl<T: HasModel, E: From<Error>> TypedPatchDb<T, E> {
    /// Wraps `db` without verifying that its contents deserialize as `T`;
    /// see [`TypedPatchDb::load`] for the checked constructor.
    pub fn load_unchecked(db: PatchDb) -> Self {
        Self {
            db,
            _phantom: PhantomData,
        }
    }
    /// Snapshots the whole document as a typed model.
    pub async fn peek(&self) -> T::Model {
        use crate::ModelExt;
        T::Model::from_value(self.db.dump(&ROOT).await.value)
    }
    /// Runs `f` against a mutable typed model of the whole document,
    /// committing (and broadcasting) the resulting diff.
    pub async fn mutate<U: UnwindSafe + Send>(
        &self,
        f: impl FnOnce(&mut T::Model) -> Result<U, E> + UnwindSafe + Send,
    ) -> MutateResult<U, E> {
        use crate::ModelExt;
        self.apply_function(|mut v| {
            let model = T::Model::value_as_mut(&mut v);
            let res = f(model)?;
            Ok::<_, E>((v, res))
        })
        .await
        .map_ok(|(_, v)| v)
    }
    /// Like [`TypedPatchDb::mutate`], but `f` consumes and returns the model
    /// by value; the resulting model is handed back to the caller.
    pub async fn map_mutate(
        &self,
        f: impl FnOnce(T::Model) -> Result<T::Model, E> + UnwindSafe + Send,
    ) -> MutateResult<T::Model, E> {
        use crate::ModelExt;
        self.apply_function(|v| f(T::Model::from_value(v)).map(|a| (T::Model::into_value(a), ())))
            .await
            .map_ok(|(v, _)| T::Model::from_value(v))
    }
}
impl<T: HasModel + DeserializeOwned + Serialize, E: From<Error>> TypedPatchDb<T, E> {
    /// Checked constructor: round-trips the current document through `T`
    /// (deserialize then re-serialize), failing if the stored data does not
    /// match `T`'s schema and committing any normalization as a mutation.
    pub async fn load(db: PatchDb) -> Result<Self, E> {
        use crate::ModelExt;
        let res = Self::load_unchecked(db);
        res.map_mutate(|db| {
            Ok(T::Model::from_value(
                imbl_value::to_value(
                    &imbl_value::from_value::<T>(db.into_value()).map_err(Error::from)?,
                )
                .map_err(Error::from)?,
            ))
        })
        .await
        .result?;
        Ok(res)
    }
    /// Like [`TypedPatchDb::load`], but seeds an empty (null) database from
    /// `init` before loading.
    pub async fn load_or_init<F: FnOnce() -> Fut, Fut: Future<Output = Result<T, E>>>(
        db: PatchDb,
        init: F,
    ) -> Result<Self, E> {
        if db.dump(&ROOT).await.value.is_null() {
            db.put(&ROOT, &init().await?).await?;
            Ok(Self::load_unchecked(db))
        } else {
            Self::load(db).await
        }
    }
}

185
core/src/subscriber.rs Normal file
View File

@@ -0,0 +1,185 @@
use std::marker::PhantomData;
use std::task::{ready, Poll};
use futures::Stream;
use imbl_value::Value;
use json_patch::patch;
use json_ptr::JsonPointer;
use tokio::sync::mpsc;
use crate::{Dump, Error, HasModel, ModelExt, Revision};
/// Receiving half handed to subscribers: a stream of scoped revisions.
pub type Subscriber = mpsc::UnboundedReceiver<Revision>;

/// A subscriber's sending half, scoped to a subtree of the document.
#[derive(Debug)]
struct ScopedSender(JsonPointer, mpsc::UnboundedSender<Revision>);

impl ScopedSender {
    /// Rebase `revision` onto this sender's scope and forward it. Revisions
    /// whose rebased patch is empty (i.e. they do not touch the scoped
    /// subtree) are silently dropped.
    fn send(&self, revision: &Revision) -> Result<(), mpsc::error::SendError<Revision>> {
        let ScopedSender(scope, tx) = self;
        let scoped = revision.for_path(scope);
        if scoped.patch.is_empty() {
            Ok(())
        } else {
            tx.send(scoped)
        }
    }
}
/// Fan-out of revision events to any number of scoped listeners.
///
/// `Default` is derived rather than hand-written: the manual impl was
/// exactly what the derive produces (an empty `Vec`), so the derive is the
/// idiomatic form (clippy `derivable_impls`).
#[derive(Debug, Default)]
pub struct Broadcast {
    // One scoped sender per live subscriber; pruned lazily on failed sends.
    listeners: Vec<ScopedSender>,
}
impl Broadcast {
    /// Create an empty broadcast; equivalent to `Broadcast::default()`.
    pub fn new() -> Self {
        Default::default()
    }
    /// Deliver `value` to every live listener, pruning any whose receiver
    /// has been dropped (i.e. whose send fails).
    ///
    /// Uses `Vec::retain` instead of the previous manual index/`swap_remove`
    /// loop: each listener is still sent to exactly once, removal stays
    /// O(n) overall, and listener registration order is now preserved
    /// (clippy `manual_retain`).
    pub fn send(&mut self, value: &Revision) {
        self.listeners.retain(|listener| listener.send(value).is_ok());
    }
    /// Register a new subscriber scoped to `ptr`. Only revisions that touch
    /// that subtree (after rebasing via `ScopedSender::send`) are forwarded
    /// to the returned receiver.
    pub fn subscribe(&mut self, ptr: JsonPointer) -> Subscriber {
        let (send, recv) = mpsc::unbounded_channel();
        self.listeners.push(ScopedSender(ptr, send));
        recv
    }
}
/// A locally-cached copy of the database state, kept current by applying
/// revisions received from a [`Subscriber`].
#[derive(Debug)]
pub struct DbWatch {
    // Last known value of the watched (sub)tree.
    state: Value,
    // Incoming revisions from the database's broadcast.
    subscriber: Subscriber,
    // Whether the current `state` has already been reported to the caller
    // (via `peek_and_mark_seen` / `poll_changed`).
    seen: bool,
}
impl DbWatch {
    /// Start watching from the state captured in `dump`; `sub` supplies the
    /// revisions that will be applied on top of it.
    pub fn new(dump: Dump, sub: Subscriber) -> Self {
        Self {
            state: dump.value,
            subscriber: sub,
            seen: false,
        }
    }
    /// Wrap this watch so snapshots are returned as `T::Model`.
    pub fn typed<T>(self) -> TypedDbWatch<T> {
        TypedDbWatch {
            watch: self,
            _phantom: PhantomData,
        }
    }
    /// Drain all currently-queued revisions, applying each patch to the
    /// cached state. Applying any revision clears `seen`, so the next
    /// `changed()` call resolves immediately.
    pub fn sync(&mut self) -> Result<(), Error> {
        while let Ok(rev) = self.subscriber.try_recv() {
            patch(&mut self.state, &rev.patch.0)?;
            self.seen = false;
        }
        Ok(())
    }
    /// Sync, then return a clone of the current state (does NOT mark seen).
    pub fn peek(&mut self) -> Result<Value, Error> {
        self.sync()?;
        Ok(self.state.clone())
    }
    /// Sync, mark the state as seen, and return a clone of it; a subsequent
    /// `changed()` will then wait for a genuinely new revision.
    pub fn peek_and_mark_seen(&mut self) -> Result<Value, Error> {
        self.sync()?;
        self.seen = true;
        Ok(self.state.clone())
    }
    // @claude fix #9: Previously applied only one revision per poll, emitting
    // intermediate states that may never have been a consistent committed state.
    // Now drains all queued revisions after the first wake, matching sync()
    // behavior so the caller always sees a fully caught-up snapshot.
    /// Poll until the cached state differs from what the caller last saw.
    /// Resolves immediately on the first call (or after `sync` observed new
    /// revisions); errors if the subscriber channel closed or a patch fails.
    // NOTE(review): if a patch fails partway through the drain loop, `state`
    // may be left partially updated — confirm callers treat Err as fatal
    // for this watch.
    pub fn poll_changed(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Error>> {
        if !self.seen {
            self.seen = true;
            return Poll::Ready(Ok(()));
        }
        let rev =
            ready!(self.subscriber.poll_recv(cx)).ok_or(mpsc::error::TryRecvError::Disconnected)?;
        patch(&mut self.state, &rev.patch.0)?;
        while let Ok(rev) = self.subscriber.try_recv() {
            patch(&mut self.state, &rev.patch.0)?;
        }
        Poll::Ready(Ok(()))
    }
    /// Async wrapper around [`Self::poll_changed`].
    pub async fn changed(&mut self) -> Result<(), Error> {
        futures::future::poll_fn(|cx| self.poll_changed(cx)).await
    }
}
impl Unpin for DbWatch {}

/// Stream adapter: each item is a fully caught-up snapshot of the state,
/// emitted whenever `poll_changed` reports a change.
impl Stream for DbWatch {
    type Item = Result<Value, Error>;
    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let watch = self.get_mut();
        match ready!(watch.poll_changed(cx)) {
            Ok(()) => Poll::Ready(Some(Ok(watch.state.clone()))),
            Err(e) => Poll::Ready(Some(Err(e))),
        }
    }
}
/// A [`DbWatch`] whose snapshots are wrapped in `T::Model` instead of raw
/// JSON values.
pub struct TypedDbWatch<T> {
    watch: DbWatch,
    // Marker tying the wrapper to `T` without storing a `T`.
    _phantom: PhantomData<T>,
}
// Expose the underlying untyped watch by reference for callers that want
// raw-`Value` access without consuming the wrapper.
impl<T> AsRef<DbWatch> for TypedDbWatch<T> {
    fn as_ref(&self) -> &DbWatch {
        &self.watch
    }
}
impl<T> AsMut<DbWatch> for TypedDbWatch<T> {
    fn as_mut(&mut self) -> &mut DbWatch {
        &mut self.watch
    }
}
impl<T> TypedDbWatch<T> {
    /// Discard the type parameter, returning the underlying [`DbWatch`].
    pub fn untyped(self) -> DbWatch {
        self.watch
    }
    /// Apply all queued revisions to the cached state.
    pub fn sync(&mut self) -> Result<(), Error> {
        self.watch.sync()
    }
    /// Poll for a state change; see [`DbWatch::poll_changed`].
    pub fn poll_changed(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Error>> {
        self.watch.poll_changed(cx)
    }
    /// Wait until the state changes (or is first observed).
    pub async fn changed(&mut self) -> Result<(), Error> {
        self.watch.changed().await
    }
}
impl<T: HasModel> TypedDbWatch<T> {
    /// Sync, then return the cached state wrapped in `T::Model`.
    pub fn peek(&mut self) -> Result<T::Model, Error> {
        let value = self.watch.peek()?;
        Ok(<T::Model as ModelExt<T>>::from_value(value))
    }
    /// Like [`Self::peek`], but also marks the state as seen so the next
    /// `changed()` waits for a genuinely new revision.
    pub fn peek_and_mark_seen(&mut self) -> Result<T::Model, Error> {
        let value = self.watch.peek_and_mark_seen()?;
        Ok(<T::Model as ModelExt<T>>::from_value(value))
    }
}
impl<T> Unpin for TypedDbWatch<T> {}

/// Stream adapter: each item is a fully caught-up snapshot wrapped in
/// `T::Model`, emitted whenever the underlying watch reports a change.
impl<T: HasModel> Stream for TypedDbWatch<T> {
    type Item = Result<T::Model, Error>;
    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let watch = &mut self.get_mut().watch;
        match ready!(watch.poll_changed(cx)) {
            Ok(()) => Poll::Ready(Some(Ok(<T::Model as ModelExt<T>>::from_value(
                watch.state.clone(),
            )))),
            Err(e) => Poll::Ready(Some(Err(e))),
        }
    }
}

105
core/src/test.rs Normal file
View File

@@ -0,0 +1,105 @@
use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use imbl_value::{json, Value};
use json_ptr::JsonPointer;
use patch_db::{HasModel, PatchDb, Revision};
use proptest::prelude::*;
use tokio::fs;
use tokio::runtime::Builder;
use crate::{self as patch_db};
/// Atomic counter to generate unique file paths across concurrent tests.
static TEST_COUNTER: AtomicUsize = AtomicUsize::new(0);

/// Build a fresh `test-<prefix>-<n>.db` path that no other test in this
/// process has used (each call takes the next counter value).
fn unique_db_path(prefix: &str) -> String {
    let next = TEST_COUNTER.fetch_add(1, Ordering::Relaxed);
    ["test-", prefix, "-", &next.to_string(), ".db"].concat()
}
/// Open a fresh database at `db_name` (removing any leftovers first) and
/// seed it with a small nested fixture used by the tests below.
async fn init_db(db_name: String) -> PatchDb {
    cleanup_db(&db_name).await;
    let seed = json!({
        "a": "test1",
        "b": {
            "a": "test2",
            "b": 1,
            "c": null,
        },
    });
    let db = PatchDb::open(db_name).await.unwrap();
    db.put(&JsonPointer::<&'static str>::default(), &seed)
        .await
        .unwrap();
    db
}
async fn cleanup_db(db_name: &str) {
fs::remove_file(db_name).await.ok();
fs::remove_file(format!("{}.bak", db_name)).await.ok();
fs::remove_file(format!("{}.bak.tmp", db_name)).await.ok();
fs::remove_file(format!("{}.failed", db_name)).await.ok();
}
/// Replace the entire document root with the string `s`, returning the
/// resulting revision. Panics if the put fails or produces no revision.
async fn put_string_into_root(db: &PatchDb, s: String) -> Arc<Revision> {
    db.put(&JsonPointer::<&'static str>::default(), &s)
        .await
        .unwrap()
        .unwrap()
}
/// Smoke test: read the seeded value at `/b/b`, overwrite it, read it back,
/// then close and clean up the unique database file.
#[tokio::test]
async fn basic() {
    let path = unique_db_path("basic");
    let db = init_db(path.clone()).await;
    let ptr: JsonPointer = "/b/b".parse().unwrap();

    let initial: Value = db.get(&ptr).await.unwrap();
    assert_eq!(initial.as_u64(), Some(1));

    db.put(&ptr, "hello").await.unwrap();
    let updated: Value = db.get(&ptr).await.unwrap();
    assert_eq!(updated.as_str(), Some("hello"));

    db.close().await.unwrap();
    cleanup_db(&path).await;
}
/// Run `fut` to completion on a fresh multi-threaded tokio runtime whose
/// worker threads are labeled `name` (helps identify hung tests).
fn run_future<S: Into<String>, Fut: Future<Output = ()>>(name: S, fut: Fut) {
    Builder::new_multi_thread()
        .thread_name(name)
        // Enable the time and I/O drivers. Without this, any tokio timer or
        // network use inside `fut` panics at runtime (tokio::fs alone works
        // via the blocking pool, but enabling the drivers is cheap and makes
        // the harness robust to such additions).
        .enable_all()
        .build()
        .unwrap()
        .block_on(fut)
}
proptest! {
    // Property: writing any string of non-control characters (`\PC*`) to the
    // document root must not crash or corrupt the database. Each generated
    // case uses its own unique file path and closes the db before cleanup,
    // so concurrently-running proptest cases cannot collide on disk.
    #[test]
    fn doesnt_crash(s in "\\PC*") {
        run_future("test-doesnt-crash", async {
            let path = unique_db_path("proptest");
            let db = init_db(path.clone()).await;
            put_string_into_root(&db, s).await;
            db.close().await.unwrap();
            cleanup_db(&path).await;
        });
    }
}
// #[derive(Debug, serde::Deserialize, serde::Serialize, HasModel)]
// pub struct Sample {
// a: String,
// #[model]
// b: Child,
// }
// #[derive(Debug, serde::Deserialize, serde::Serialize, HasModel)]
// pub struct Child {
// a: String,
// b: usize,
// c: NewType,
// }
// #[derive(Debug, serde::Deserialize, serde::Serialize, HasModel)]
// pub struct NewType(Option<Box<Sample>>);