mirror of
https://github.com/Start9Labs/patch-db.git
synced 2026-03-26 10:21:53 +00:00
fix transactions
This commit is contained in:
@@ -19,3 +19,6 @@ serde_json = "1.0.61"
|
||||
serde_cbor = { git = "https://github.com/dr-bonez/cbor.git" }
|
||||
thiserror = "1.0.23"
|
||||
tokio = { version = "1.0.1", features = ["sync", "fs", "rt", "io-util", "macros"] }
|
||||
|
||||
[dev-dependencies]
|
||||
serde = { version = "1.0.118", features = ["rc", "derive"] }
|
||||
|
||||
@@ -7,7 +7,7 @@ use std::sync::Arc;
|
||||
|
||||
use fd_lock_rs::FdLock;
|
||||
use futures::future::{BoxFuture, FutureExt};
|
||||
use json_patch::{Patch, PatchOperation};
|
||||
use json_patch::{AddOperation, Patch, PatchOperation, RemoveOperation, ReplaceOperation};
|
||||
use json_ptr::{JsonPointer, SegList};
|
||||
use qutex_2::{QrwLock, ReadGuard, WriteGuard};
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -16,11 +16,13 @@ use thiserror::Error;
|
||||
use tokio::{
|
||||
fs::File,
|
||||
sync::{
|
||||
broadcast::{Receiver, Sender},
|
||||
Mutex, RwLock,
|
||||
broadcast::{error::TryRecvError, Receiver, Sender},
|
||||
Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard,
|
||||
},
|
||||
};
|
||||
|
||||
// note: inserting into an array (before another element) without proper locking can result in unexpected behaviour
|
||||
|
||||
#[cfg(test)]
|
||||
mod test;
|
||||
|
||||
@@ -42,18 +44,146 @@ pub enum Error {
|
||||
FDLock(#[from] fd_lock_rs::Error),
|
||||
#[error("Database Cache Corrupted: {0}")]
|
||||
CacheCorrupted(Arc<IOError>),
|
||||
#[error("Mutex Error (should be unreachable): {0}")]
|
||||
MutexError(#[from] tokio::sync::TryLockError),
|
||||
#[error("Subscriber Error: {0}")]
|
||||
Subscriber(#[from] TryRecvError),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct Revision {
|
||||
pub id: u64,
|
||||
pub patch: Patch,
|
||||
pub patch: DiffPatch,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct DiffPatch(Patch);
|
||||
impl DiffPatch {
|
||||
// safe to assume dictionary style symantics for arrays since patches will always be rebased before being applied
|
||||
pub fn for_path<S: AsRef<str>, V: SegList>(&self, ptr: &JsonPointer<S, V>) -> DiffPatch {
|
||||
let DiffPatch(Patch(ops)) = self;
|
||||
DiffPatch(Patch(
|
||||
ops.iter()
|
||||
.filter_map(|op| match op {
|
||||
PatchOperation::Add(op) => {
|
||||
if let Some(tail) = op.path.strip_prefix(ptr) {
|
||||
Some(PatchOperation::Add(AddOperation {
|
||||
path: tail.to_owned(),
|
||||
value: op.value.clone(),
|
||||
}))
|
||||
} else if let Some(tail) = ptr.strip_prefix(&op.path) {
|
||||
Some(PatchOperation::Add(AddOperation {
|
||||
path: Default::default(),
|
||||
value: tail.get(&op.value).cloned().unwrap_or_default(),
|
||||
}))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
PatchOperation::Replace(op) => {
|
||||
if let Some(tail) = op.path.strip_prefix(ptr) {
|
||||
Some(PatchOperation::Replace(ReplaceOperation {
|
||||
path: tail.to_owned(),
|
||||
value: op.value.clone(),
|
||||
}))
|
||||
} else if let Some(tail) = ptr.strip_prefix(&op.path) {
|
||||
Some(PatchOperation::Replace(ReplaceOperation {
|
||||
path: Default::default(),
|
||||
value: tail.get(&op.value).cloned().unwrap_or_default(),
|
||||
}))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
PatchOperation::Remove(op) => {
|
||||
if ptr.starts_with(&op.path) {
|
||||
Some(PatchOperation::Replace(ReplaceOperation {
|
||||
path: Default::default(),
|
||||
value: Default::default(),
|
||||
}))
|
||||
} else if let Some(tail) = op.path.strip_prefix(ptr) {
|
||||
Some(PatchOperation::Remove(RemoveOperation {
|
||||
path: tail.to_owned(),
|
||||
}))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
_ => unreachable!(),
|
||||
})
|
||||
.collect(),
|
||||
))
|
||||
}
|
||||
pub fn rebase(&mut self, onto: &DiffPatch) {
|
||||
let DiffPatch(Patch(ops)) = self;
|
||||
let DiffPatch(Patch(onto_ops)) = onto;
|
||||
for onto_op in onto_ops {
|
||||
if let PatchOperation::Add(onto_op) = onto_op {
|
||||
let arr_path_idx = onto_op.path.len() - 1;
|
||||
if let Some(onto_idx) = onto_op
|
||||
.path
|
||||
.get_segment(arr_path_idx)
|
||||
.and_then(|seg| seg.parse::<usize>().ok())
|
||||
{
|
||||
let prefix = onto_op.path.slice(..arr_path_idx).unwrap_or_default();
|
||||
for op in ops.iter_mut() {
|
||||
let path = match op {
|
||||
PatchOperation::Add(op) => &mut op.path,
|
||||
PatchOperation::Replace(op) => &mut op.path,
|
||||
PatchOperation::Remove(op) => &mut op.path,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
if path.starts_with(&prefix) {
|
||||
if let Some(idx) = path
|
||||
.get_segment(arr_path_idx)
|
||||
.and_then(|seg| seg.parse::<usize>().ok())
|
||||
{
|
||||
if idx >= onto_idx {
|
||||
let mut new_path = prefix.clone().to_owned();
|
||||
new_path.push_end_idx(idx + 1);
|
||||
if let Some(tail) = path.slice(arr_path_idx + 1..) {
|
||||
new_path.append(&tail);
|
||||
}
|
||||
*path = new_path;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if let PatchOperation::Remove(onto_op) = onto_op {
|
||||
let arr_path_idx = onto_op.path.len() - 1;
|
||||
if let Some(onto_idx) = onto_op
|
||||
.path
|
||||
.get_segment(arr_path_idx)
|
||||
.and_then(|seg| seg.parse::<usize>().ok())
|
||||
{
|
||||
let prefix = onto_op.path.slice(..arr_path_idx).unwrap_or_default();
|
||||
for op in ops.iter_mut() {
|
||||
let path = match op {
|
||||
PatchOperation::Add(op) => &mut op.path,
|
||||
PatchOperation::Replace(op) => &mut op.path,
|
||||
PatchOperation::Remove(op) => &mut op.path,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
if path.starts_with(&prefix) {
|
||||
if let Some(idx) = path
|
||||
.get_segment(arr_path_idx)
|
||||
.and_then(|seg| seg.parse::<usize>().ok())
|
||||
{
|
||||
if idx >= onto_idx {
|
||||
let mut new_path = prefix.clone().to_owned();
|
||||
new_path.push_end_idx(idx - 1);
|
||||
if let Some(tail) = path.slice(arr_path_idx + 1..) {
|
||||
new_path.append(&tail);
|
||||
}
|
||||
*path = new_path;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Store {
|
||||
file: FdLock<File>,
|
||||
@@ -176,7 +306,7 @@ impl Store {
|
||||
|
||||
let id = self.revision;
|
||||
self.revision += 1;
|
||||
let res = Arc::new(Revision { id, patch: patch.0 });
|
||||
let res = Arc::new(Revision { id, patch });
|
||||
|
||||
Ok(res)
|
||||
}
|
||||
@@ -211,8 +341,19 @@ impl PatchDb {
|
||||
) -> Result<Arc<Revision>, Error> {
|
||||
self.store.write().await.put(ptr, value).await
|
||||
}
|
||||
pub async fn apply(&self, patch: DiffPatch) -> Result<Arc<Revision>, Error> {
|
||||
self.store.write().await.apply(patch).await
|
||||
pub async fn apply(
|
||||
&self,
|
||||
patch: DiffPatch,
|
||||
store_write_lock: Option<RwLockWriteGuard<'_, Store>>,
|
||||
) -> Result<Arc<Revision>, Error> {
|
||||
let mut store = if let Some(store_write_lock) = store_write_lock {
|
||||
store_write_lock
|
||||
} else {
|
||||
self.store.write().await
|
||||
};
|
||||
let rev = store.apply(patch).await?;
|
||||
self.subscriber.send(rev.clone()).unwrap_or_default(); // ignore errors
|
||||
Ok(rev)
|
||||
}
|
||||
pub fn subscribe(&self) -> Receiver<Arc<Revision>> {
|
||||
self.subscriber.subscribe()
|
||||
@@ -222,20 +363,24 @@ impl PatchDb {
|
||||
db: self.clone(),
|
||||
locks: Vec::new(),
|
||||
updates: DiffPatch(Patch(Vec::new())),
|
||||
sub: self.subscribe(),
|
||||
}
|
||||
}
|
||||
}
|
||||
pub trait Checkpoint {
|
||||
type SubTx: Checkpoint;
|
||||
pub trait Checkpoint: Sized {
|
||||
fn rebase(&mut self) -> Result<(), Error>;
|
||||
fn get_value<'a, S: AsRef<str> + Send + Sync + 'a, V: SegList + Send + Sync + 'a>(
|
||||
&'a mut self,
|
||||
ptr: &'a JsonPointer<S, V>,
|
||||
store_read_lock: Option<RwLockReadGuard<'a, Store>>,
|
||||
) -> BoxFuture<'a, Result<Value, Error>>;
|
||||
fn put_value<'a, S: AsRef<str> + Send + Sync + 'a, V: SegList + Send + Sync + 'a>(
|
||||
&'a mut self,
|
||||
ptr: &'a JsonPointer<S, V>,
|
||||
value: &'a Value,
|
||||
) -> BoxFuture<'a, Result<(), Error>>;
|
||||
fn store(&self) -> Arc<RwLock<Store>>;
|
||||
fn subscribe(&self) -> Receiver<Arc<Revision>>;
|
||||
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>);
|
||||
fn apply(&mut self, patch: DiffPatch);
|
||||
fn lock<'a, S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
|
||||
@@ -269,36 +414,35 @@ pub struct Transaction {
|
||||
db: PatchDb,
|
||||
locks: Vec<(JsonPointer, LockerGuard)>,
|
||||
updates: DiffPatch,
|
||||
sub: Receiver<Arc<Revision>>,
|
||||
}
|
||||
impl Transaction {
|
||||
pub fn rebase(&mut self) -> Result<(), Error> {
|
||||
while let Some(rev) = match self.sub.try_recv() {
|
||||
Ok(a) => Some(a),
|
||||
Err(TryRecvError::Empty) => None,
|
||||
Err(e) => return Err(e.into()),
|
||||
} {
|
||||
self.updates.rebase(&rev.patch);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
async fn get_value<S: AsRef<str>, V: SegList>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
|
||||
) -> Result<Value, Error> {
|
||||
let mut data: Value = ptr
|
||||
.get(self.db.store.read().await.get_data()?)
|
||||
.unwrap_or(&Value::Null)
|
||||
.clone();
|
||||
for op in (self.updates.0).0.iter() {
|
||||
match op {
|
||||
PatchOperation::Add(ref op) => {
|
||||
if let Some(path) = op.path.strip_prefix(ptr) {
|
||||
path.insert(&mut data, op.value.clone(), false)?;
|
||||
}
|
||||
}
|
||||
PatchOperation::Remove(ref op) => {
|
||||
if let Some(path) = op.path.strip_prefix(ptr) {
|
||||
path.remove(&mut data, false);
|
||||
}
|
||||
}
|
||||
PatchOperation::Replace(ref op) => {
|
||||
if let Some(path) = op.path.strip_prefix(ptr) {
|
||||
path.set(&mut data, op.value.clone(), false)?;
|
||||
}
|
||||
}
|
||||
_ => unreachable!("Diff patches cannot contain other operations."),
|
||||
}
|
||||
}
|
||||
let mut data = {
|
||||
let store_lock = self.db.store.clone();
|
||||
let store = if let Some(store_read_lock) = store_read_lock {
|
||||
store_read_lock
|
||||
} else {
|
||||
store_lock.read().await
|
||||
};
|
||||
self.rebase()?;
|
||||
ptr.get(store.get_data()?).cloned().unwrap_or_default()
|
||||
};
|
||||
json_patch::patch(&mut data, &self.updates.for_path(ptr).0)?;
|
||||
Ok(data)
|
||||
}
|
||||
async fn put_value<S: AsRef<str>, V: SegList>(
|
||||
@@ -306,8 +450,8 @@ impl Transaction {
|
||||
ptr: &JsonPointer<S, V>,
|
||||
value: &Value,
|
||||
) -> Result<(), Error> {
|
||||
let old = Transaction::get_value(self, ptr).await?;
|
||||
let mut patch = json_patch::diff(&old, value);
|
||||
let old = Transaction::get_value(self, ptr, None).await?;
|
||||
let mut patch = json_patch::diff(&old, &value);
|
||||
patch.prepend(ptr);
|
||||
(self.updates.0).0.extend(patch.0);
|
||||
Ok(())
|
||||
@@ -340,7 +484,7 @@ impl Transaction {
|
||||
) -> Result<T, Error> {
|
||||
self.lock(ptr, lock).await;
|
||||
Ok(serde_json::from_value(
|
||||
Transaction::get_value(self, ptr).await?,
|
||||
Transaction::get_value(self, ptr, None).await?,
|
||||
)?)
|
||||
}
|
||||
pub async fn put<T: Serialize, S: AsRef<str>, V: SegList>(
|
||||
@@ -350,17 +494,36 @@ impl Transaction {
|
||||
) -> Result<(), Error> {
|
||||
Transaction::put_value(self, ptr, &serde_json::to_value(value)?).await
|
||||
}
|
||||
pub async fn commit(self) -> Result<Arc<Revision>, Error> {
|
||||
self.db.apply(self.updates).await
|
||||
pub async fn commit(mut self) -> Result<Arc<Revision>, Error> {
|
||||
let store_lock = self.db.store.clone();
|
||||
let store = store_lock.write().await;
|
||||
self.rebase()?;
|
||||
self.db.apply(self.updates, Some(store)).await
|
||||
}
|
||||
pub async fn begin(&mut self) -> Result<SubTransaction<&mut Self>, Error> {
|
||||
let store_lock = self.db.store.clone();
|
||||
let store = store_lock.read().await;
|
||||
self.rebase()?;
|
||||
let sub = self.db.subscribe();
|
||||
drop(store);
|
||||
Ok(SubTransaction {
|
||||
parent: self,
|
||||
locks: Vec::new(),
|
||||
updates: DiffPatch(Patch(Vec::new())),
|
||||
sub,
|
||||
})
|
||||
}
|
||||
}
|
||||
impl<'a> Checkpoint for &'a mut Transaction {
|
||||
type SubTx = &'a mut SubTransaction<Self>;
|
||||
fn rebase(&mut self) -> Result<(), Error> {
|
||||
Transaction::rebase(self)
|
||||
}
|
||||
fn get_value<'b, S: AsRef<str> + Send + Sync + 'b, V: SegList + Send + Sync + 'b>(
|
||||
&'b mut self,
|
||||
ptr: &'b JsonPointer<S, V>,
|
||||
store_read_lock: Option<RwLockReadGuard<'b, Store>>,
|
||||
) -> BoxFuture<'b, Result<Value, Error>> {
|
||||
Transaction::get_value(self, ptr).boxed()
|
||||
Transaction::get_value(self, ptr, store_read_lock).boxed()
|
||||
}
|
||||
fn put_value<'b, S: AsRef<str> + Send + Sync + 'b, V: SegList + Send + Sync + 'b>(
|
||||
&'b mut self,
|
||||
@@ -369,6 +532,12 @@ impl<'a> Checkpoint for &'a mut Transaction {
|
||||
) -> BoxFuture<'b, Result<(), Error>> {
|
||||
Transaction::put_value(self, ptr, value).boxed()
|
||||
}
|
||||
fn store(&self) -> Arc<RwLock<Store>> {
|
||||
self.db.store.clone()
|
||||
}
|
||||
fn subscribe(&self) -> Receiver<Arc<Revision>> {
|
||||
self.db.subscribe()
|
||||
}
|
||||
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) {
|
||||
(&self.db.locker, vec![&mut self.locks])
|
||||
}
|
||||
@@ -412,33 +581,36 @@ pub struct SubTransaction<Tx: Checkpoint> {
|
||||
parent: Tx,
|
||||
locks: Vec<(JsonPointer, LockerGuard)>,
|
||||
updates: DiffPatch,
|
||||
sub: Receiver<Arc<Revision>>,
|
||||
}
|
||||
impl<Tx: Checkpoint> SubTransaction<Tx> {
|
||||
impl<Tx: Checkpoint + Send + Sync> SubTransaction<Tx> {
|
||||
pub fn rebase(&mut self) -> Result<(), Error> {
|
||||
self.parent.rebase()?;
|
||||
while let Some(rev) = match self.sub.try_recv() {
|
||||
Ok(a) => Some(a),
|
||||
Err(TryRecvError::Empty) => None,
|
||||
Err(e) => return Err(e.into()),
|
||||
} {
|
||||
self.updates.rebase(&rev.patch);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
async fn get_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
store_read_lock: Option<RwLockReadGuard<'_, Store>>,
|
||||
) -> Result<Value, Error> {
|
||||
let mut data: Value = self.parent.get_value(ptr).await?;
|
||||
for op in (self.updates.0).0.iter() {
|
||||
match op {
|
||||
PatchOperation::Add(ref op) => {
|
||||
if let Some(path) = op.path.strip_prefix(ptr) {
|
||||
path.insert(&mut data, op.value.clone(), false)?;
|
||||
}
|
||||
}
|
||||
PatchOperation::Remove(ref op) => {
|
||||
if let Some(path) = op.path.strip_prefix(ptr) {
|
||||
path.remove(&mut data, false);
|
||||
}
|
||||
}
|
||||
PatchOperation::Replace(ref op) => {
|
||||
if let Some(path) = op.path.strip_prefix(ptr) {
|
||||
path.set(&mut data, op.value.clone(), false)?;
|
||||
}
|
||||
}
|
||||
_ => unreachable!("Diff patches cannot contain other operations."),
|
||||
}
|
||||
}
|
||||
let mut data = {
|
||||
let store_lock = self.parent.store();
|
||||
let store = if let Some(store_read_lock) = store_read_lock {
|
||||
store_read_lock
|
||||
} else {
|
||||
store_lock.read().await
|
||||
};
|
||||
self.rebase()?;
|
||||
self.parent.get_value(ptr, Some(store)).await?
|
||||
};
|
||||
json_patch::patch(&mut data, &self.updates.for_path(ptr).0)?;
|
||||
Ok(data)
|
||||
}
|
||||
async fn put_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||
@@ -446,8 +618,8 @@ impl<Tx: Checkpoint> SubTransaction<Tx> {
|
||||
ptr: &JsonPointer<S, V>,
|
||||
value: &Value,
|
||||
) -> Result<(), Error> {
|
||||
let old = SubTransaction::get_value(self, ptr).await?;
|
||||
let mut patch = json_patch::diff(&old, value);
|
||||
let old = SubTransaction::get_value(self, ptr, None).await?;
|
||||
let mut patch = json_patch::diff(&old, &value);
|
||||
patch.prepend(ptr);
|
||||
(self.updates.0).0.extend(patch.0);
|
||||
Ok(())
|
||||
@@ -482,7 +654,7 @@ impl<Tx: Checkpoint> SubTransaction<Tx> {
|
||||
) -> Result<T, Error> {
|
||||
self.lock(ptr, lock).await;
|
||||
Ok(serde_json::from_value(
|
||||
SubTransaction::get_value(self, ptr).await?,
|
||||
SubTransaction::get_value(self, ptr, None).await?,
|
||||
)?)
|
||||
}
|
||||
pub async fn put<T: Serialize, S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||
@@ -490,24 +662,40 @@ impl<Tx: Checkpoint> SubTransaction<Tx> {
|
||||
ptr: &JsonPointer<S, V>,
|
||||
value: &T,
|
||||
) -> Result<(), Error> {
|
||||
let old = SubTransaction::get_value(self, ptr).await?;
|
||||
let new = serde_json::to_value(value)?;
|
||||
let mut patch = json_patch::diff(&old, &new);
|
||||
patch.prepend(ptr);
|
||||
(self.updates.0).0.extend(patch.0);
|
||||
SubTransaction::put_value(self, ptr, &serde_json::to_value(value)?).await
|
||||
}
|
||||
pub async fn commit(mut self) -> Result<(), Error> {
|
||||
let store_lock = self.parent.store();
|
||||
let store = store_lock.read().await;
|
||||
self.rebase()?;
|
||||
self.parent.apply(self.updates);
|
||||
drop(store);
|
||||
Ok(())
|
||||
}
|
||||
pub fn commit(mut self) {
|
||||
self.parent.apply(self.updates)
|
||||
pub async fn begin(&mut self) -> Result<SubTransaction<&mut Self>, Error> {
|
||||
let store_lock = self.parent.store();
|
||||
let store = store_lock.read().await;
|
||||
self.rebase()?;
|
||||
let sub = self.parent.subscribe();
|
||||
drop(store);
|
||||
Ok(SubTransaction {
|
||||
parent: self,
|
||||
locks: Vec::new(),
|
||||
updates: DiffPatch(Patch(Vec::new())),
|
||||
sub,
|
||||
})
|
||||
}
|
||||
}
|
||||
impl<'a, Tx: Checkpoint + Send + Sync> Checkpoint for &'a mut SubTransaction<Tx> {
|
||||
type SubTx = &'a mut SubTransaction<Self>;
|
||||
fn rebase(&mut self) -> Result<(), Error> {
|
||||
SubTransaction::rebase(self)
|
||||
}
|
||||
fn get_value<'b, S: AsRef<str> + Send + Sync + 'b, V: SegList + Send + Sync + 'b>(
|
||||
&'b mut self,
|
||||
ptr: &'b JsonPointer<S, V>,
|
||||
store_read_lock: Option<RwLockReadGuard<'b, Store>>,
|
||||
) -> BoxFuture<'b, Result<Value, Error>> {
|
||||
SubTransaction::get_value(self, ptr).boxed()
|
||||
SubTransaction::get_value(self, ptr, store_read_lock).boxed()
|
||||
}
|
||||
fn put_value<'b, S: AsRef<str> + Send + Sync + 'b, V: SegList + Send + Sync + 'b>(
|
||||
&'b mut self,
|
||||
@@ -516,6 +704,12 @@ impl<'a, Tx: Checkpoint + Send + Sync> Checkpoint for &'a mut SubTransaction<Tx>
|
||||
) -> BoxFuture<'b, Result<(), Error>> {
|
||||
SubTransaction::put_value(self, ptr, value).boxed()
|
||||
}
|
||||
fn store(&self) -> Arc<RwLock<Store>> {
|
||||
self.parent.store()
|
||||
}
|
||||
fn subscribe(&self) -> Receiver<Arc<Revision>> {
|
||||
self.parent.subscribe()
|
||||
}
|
||||
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) {
|
||||
let (locker, mut locks) = self.parent.locker_and_locks();
|
||||
locks.push(&mut self.locks);
|
||||
@@ -855,15 +1049,22 @@ impl<T> ChildHooks<T> {
|
||||
}
|
||||
}
|
||||
|
||||
pub trait Model {
|
||||
type Data: ModelData;
|
||||
}
|
||||
|
||||
impl Model for Never {
|
||||
type Data = Never;
|
||||
}
|
||||
|
||||
pub struct GenericModel<T: Serialize + for<'de> Deserialize<'de>, Parent: ModelData> {
|
||||
data: Arc<Mutex<GenericModelData<T, Parent>>>,
|
||||
ptr: JsonPointer,
|
||||
}
|
||||
impl<
|
||||
'a,
|
||||
T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static,
|
||||
Parent: ModelData + 'static,
|
||||
> GenericModel<T, Parent>
|
||||
impl<T, Parent> GenericModel<T, Parent>
|
||||
where
|
||||
T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static,
|
||||
Parent: ModelData + 'static,
|
||||
{
|
||||
pub fn new(ptr: JsonPointer) -> Self {
|
||||
Self {
|
||||
@@ -872,7 +1073,7 @@ impl<
|
||||
}
|
||||
}
|
||||
|
||||
/// locks
|
||||
// locks
|
||||
async fn fetch<Tx: Checkpoint>(&mut self, tx: &mut Tx) -> Result<(), Error> {
|
||||
let mut data = self.data.lock().await;
|
||||
if let GenericModelData::Uninitialized(children) = &mut *data {
|
||||
@@ -890,7 +1091,7 @@ impl<
|
||||
tx.lock(&self.ptr, lock).await
|
||||
}
|
||||
|
||||
/// locks
|
||||
// locks
|
||||
pub async fn peek<
|
||||
Tx: Checkpoint,
|
||||
F: FnOnce(&T) -> ResFut + Send + Sync,
|
||||
@@ -901,11 +1102,12 @@ impl<
|
||||
tx: &mut Tx,
|
||||
f: F,
|
||||
) -> Result<Res, Error> {
|
||||
self.lock(tx, LockType::Read).await;
|
||||
self.fetch(tx).await?;
|
||||
Ok(self.data.lock().await.apply(|t| f(t)).await)
|
||||
}
|
||||
|
||||
/// locks
|
||||
// locks
|
||||
pub async fn apply<
|
||||
Tx: Checkpoint,
|
||||
F: FnOnce(&mut T) -> ResFut + Send + Sync,
|
||||
@@ -916,11 +1118,12 @@ impl<
|
||||
tx: &mut Tx,
|
||||
f: F,
|
||||
) -> Result<Res, Error> {
|
||||
self.lock(tx, LockType::Write).await;
|
||||
self.fetch(tx).await?;
|
||||
Ok(self.data.lock().await.apply(f).await)
|
||||
}
|
||||
|
||||
/// locks
|
||||
// locks
|
||||
pub async fn child<C, F, S, V>(
|
||||
&mut self,
|
||||
path: &JsonPointer<S, V>,
|
||||
@@ -1018,3 +1221,11 @@ impl<
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, Parent> Model for GenericModel<T, Parent>
|
||||
where
|
||||
T: Serialize + for<'de> Deserialize<'de> + Send + Sync,
|
||||
Parent: ModelData,
|
||||
{
|
||||
type Data = GenericModelData<T, Parent>;
|
||||
}
|
||||
|
||||
@@ -8,3 +8,25 @@ async fn basic() {
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||
pub struct Sample {
|
||||
a: String,
|
||||
b: Child,
|
||||
}
|
||||
|
||||
pub struct SampleModel<Parent: Model = Never>(GenericModel<Sample, Parent::Data>);
|
||||
impl<Parent: Model> Model for SampleModel<Parent> {
|
||||
type Data = GenericModelData<Sample, Parent::Data>;
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||
pub struct Child {
|
||||
a: String,
|
||||
b: usize,
|
||||
}
|
||||
|
||||
pub struct ChildModel<Parent: Model = Never>(GenericModel<Child, Parent::Data>);
|
||||
impl<Parent: Model> Model for ChildModel<Parent> {
|
||||
type Data = GenericModelData<Sample, Parent::Data>;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user