first pass at model generics

This commit is contained in:
Aiden McClelland
2021-03-15 19:33:23 -06:00
parent c2e50d0e88
commit a12a93b035

View File

@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::fs::OpenOptions;
use std::future::Future;
use std::io::Error as IOError;
use std::path::Path;
use std::sync::Arc;
@@ -41,6 +42,8 @@ pub enum Error {
FDLock(#[from] fd_lock_rs::Error),
#[error("Database Cache Corrupted: {0}")]
CacheCorrupted(Arc<IOError>),
#[error("Mutex Error (should be unreachable): {0}")]
MutexError(#[from] tokio::sync::TryLockError),
}
#[derive(Debug, Clone, Deserialize, Serialize)]
@@ -225,11 +228,21 @@ impl PatchDb {
pub trait Checkpoint {
type SubTx: Checkpoint;
fn get_value<'a, S: AsRef<str> + Send + Sync + 'a, V: SegList + Send + Sync + 'a>(
&'a self,
&'a mut self,
ptr: &'a JsonPointer<S, V>,
) -> BoxFuture<Result<Value, Error>>;
) -> BoxFuture<'a, Result<Value, Error>>;
fn put_value<'a, S: AsRef<str> + Send + Sync + 'a, V: SegList + Send + Sync + 'a>(
&'a mut self,
ptr: &'a JsonPointer<S, V>,
value: &'a Value,
) -> BoxFuture<'a, Result<(), Error>>;
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>);
fn apply(&mut self, patch: DiffPatch);
fn lock<'a, S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
&'a mut self,
ptr: &'a JsonPointer<S, V>,
lock: LockType,
) -> BoxFuture<'a, ()>;
fn get<
'a,
T: for<'de> Deserialize<'de> + 'a,
@@ -259,7 +272,7 @@ pub struct Transaction {
}
impl Transaction {
async fn get_value<S: AsRef<str>, V: SegList>(
&self,
&mut self,
ptr: &JsonPointer<S, V>,
) -> Result<Value, Error> {
let mut data: Value = ptr
@@ -288,6 +301,17 @@ impl Transaction {
}
Ok(data)
}
async fn put_value<S: AsRef<str>, V: SegList>(
&mut self,
ptr: &JsonPointer<S, V>,
value: &Value,
) -> Result<(), Error> {
let old = Transaction::get_value(self, ptr).await?;
let mut patch = json_patch::diff(&old, value);
patch.prepend(ptr);
(self.updates.0).0.extend(patch.0);
Ok(())
}
pub async fn lock<S: AsRef<str> + Clone, V: SegList + Clone>(
&mut self,
ptr: &JsonPointer<S, V>,
@@ -324,12 +348,7 @@ impl Transaction {
ptr: &JsonPointer<S, V>,
value: &T,
) -> Result<(), Error> {
let old = Transaction::get_value(self, ptr).await?;
let new = serde_json::to_value(value)?;
let mut patch = json_patch::diff(&old, &new);
patch.prepend(ptr);
(self.updates.0).0.extend(patch.0);
Ok(())
Transaction::put_value(self, ptr, &serde_json::to_value(value)?).await
}
pub async fn commit(self) -> Result<Arc<Revision>, Error> {
self.db.apply(self.updates).await
@@ -338,17 +357,31 @@ impl Transaction {
impl<'a> Checkpoint for &'a mut Transaction {
type SubTx = &'a mut SubTransaction<Self>;
fn get_value<'b, S: AsRef<str> + Send + Sync + 'b, V: SegList + Send + Sync + 'b>(
&'b self,
&'b mut self,
ptr: &'b JsonPointer<S, V>,
) -> BoxFuture<'b, Result<Value, Error>> {
Transaction::get_value(self, ptr).boxed()
}
fn put_value<'b, S: AsRef<str> + Send + Sync + 'b, V: SegList + Send + Sync + 'b>(
&'b mut self,
ptr: &'b JsonPointer<S, V>,
value: &'b Value,
) -> BoxFuture<'b, Result<(), Error>> {
Transaction::put_value(self, ptr, value).boxed()
}
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) {
(&self.db.locker, vec![&mut self.locks])
}
fn apply(&mut self, patch: DiffPatch) {
(self.updates.0).0.extend((patch.0).0)
}
fn lock<'b, S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
&'b mut self,
ptr: &'b JsonPointer<S, V>,
lock: LockType,
) -> BoxFuture<'b, ()> {
Transaction::lock(self, ptr, lock).boxed()
}
fn get<
'b,
T: for<'de> Deserialize<'de> + 'b,
@@ -382,7 +415,7 @@ pub struct SubTransaction<Tx: Checkpoint> {
}
impl<Tx: Checkpoint> SubTransaction<Tx> {
async fn get_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
&self,
&mut self,
ptr: &JsonPointer<S, V>,
) -> Result<Value, Error> {
let mut data: Value = self.parent.get_value(ptr).await?;
@@ -408,6 +441,17 @@ impl<Tx: Checkpoint> SubTransaction<Tx> {
}
Ok(data)
}
async fn put_value<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
&mut self,
ptr: &JsonPointer<S, V>,
value: &Value,
) -> Result<(), Error> {
let old = SubTransaction::get_value(self, ptr).await?;
let mut patch = json_patch::diff(&old, value);
patch.prepend(ptr);
(self.updates.0).0.extend(patch.0);
Ok(())
}
pub async fn lock<S: AsRef<str> + Clone, V: SegList + Clone>(
&mut self,
ptr: &JsonPointer<S, V>,
@@ -460,11 +504,18 @@ impl<Tx: Checkpoint> SubTransaction<Tx> {
impl<'a, Tx: Checkpoint + Send + Sync> Checkpoint for &'a mut SubTransaction<Tx> {
type SubTx = &'a mut SubTransaction<Self>;
fn get_value<'b, S: AsRef<str> + Send + Sync + 'b, V: SegList + Send + Sync + 'b>(
&'b self,
&'b mut self,
ptr: &'b JsonPointer<S, V>,
) -> BoxFuture<'b, Result<Value, Error>> {
SubTransaction::get_value(self, ptr).boxed()
}
fn put_value<'b, S: AsRef<str> + Send + Sync + 'b, V: SegList + Send + Sync + 'b>(
&'b mut self,
ptr: &'b JsonPointer<S, V>,
value: &'b Value,
) -> BoxFuture<'b, Result<(), Error>> {
SubTransaction::put_value(self, ptr, value).boxed()
}
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, LockerGuard)]>) {
let (locker, mut locks) = self.parent.locker_and_locks();
locks.push(&mut self.locks);
@@ -473,6 +524,13 @@ impl<'a, Tx: Checkpoint + Send + Sync> Checkpoint for &'a mut SubTransaction<Tx>
fn apply(&mut self, patch: DiffPatch) {
(self.updates.0).0.extend((patch.0).0)
}
fn lock<'b, S: AsRef<str> + Clone + Send + Sync, V: SegList + Clone + Send + Sync>(
&'b mut self,
ptr: &'b JsonPointer<S, V>,
lock: LockType,
) -> BoxFuture<'b, ()> {
SubTransaction::lock(self, ptr, lock).boxed()
}
fn get<
'b,
T: for<'de> Deserialize<'de> + 'b,
@@ -499,7 +557,7 @@ impl<'a, Tx: Checkpoint + Send + Sync> Checkpoint for &'a mut SubTransaction<Tx>
}
}
#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
pub enum LockType {
None,
Read,
@@ -638,7 +696,7 @@ impl Locker {
lock.1 = match guard {
LockerGuard::Read(LockerReadGuard(guard)) if !remainder.is_empty() => {
// read guard already exists at higher level
let mut lock = guard.lock().await;
let mut lock = guard.try_lock().unwrap();
if let Some(l) = lock.take() {
let mut orig_lock = None;
let mut lock = ReadGuard::upgrade(l).await.unwrap();
@@ -703,3 +761,260 @@ impl Default for Locker {
Locker::new()
}
}
pub trait ModelData: Send + Sync {
type Inner;
fn apply<
'a,
F: FnOnce(&mut Self::Inner) -> ResFut + Send + Sync + 'a,
ResFut: Future<Output = Res> + Send + Sync + 'a,
Res: Send + Sync + 'a,
>(
&'a mut self,
f: F,
) -> BoxFuture<'a, Res>;
}
pub enum Never {}
impl ModelData for Never {
type Inner = Never;
fn apply<
'a,
F: FnOnce(&mut Self::Inner) -> ResFut + 'a,
ResFut: Future<Output = Res> + 'a,
Res: 'a,
>(
&'a mut self,
_: F,
) -> BoxFuture<'a, Res> {
match *self {}
}
}
pub enum GenericModelData<T: Serialize + for<'de> Deserialize<'de>, Parent: ModelData> {
Uninitialized(Vec<ChildHooks<T>>),
Ref(
Arc<dyn Fn(&mut Parent::Inner) -> &mut T + Send + Sync>,
Arc<Mutex<Parent>>,
),
Owned(Box<T>),
}
impl<T: Serialize + for<'de> Deserialize<'de>, Parent: ModelData> GenericModelData<T, Parent> {
pub fn is_initialized(&self) -> bool {
match self {
GenericModelData::Uninitialized(_) => false,
_ => true,
}
}
async fn apply<
F: FnOnce(&mut T) -> ResFut + Send + Sync,
ResFut: Future<Output = Res> + Send + Sync,
Res: Send + Sync,
>(
&mut self,
f: F,
) -> Res {
match self {
GenericModelData::Owned(data) => f(data).await,
GenericModelData::Ref(g, parent) => parent.lock().await.apply(|t| f(g(t))).await,
_ => panic!("uninitialized"),
}
}
}
impl<T: Serialize + for<'de> Deserialize<'de> + Send + Sync, Parent: ModelData> ModelData
for GenericModelData<T, Parent>
{
type Inner = T;
fn apply<
'a,
F: FnOnce(&mut Self::Inner) -> ResFut + Send + Sync + 'a,
ResFut: Future<Output = Res> + Send + Sync + 'a,
Res: Send + Sync + 'a,
>(
&'a mut self,
f: F,
) -> BoxFuture<'a, Res> {
self.apply(f).boxed()
}
}
pub struct ChildHooks<T> {
init_parent: Box<dyn for<'a> FnOnce(&'a mut T) -> BoxFuture<'a, ()> + Send + Sync>,
save_data: Box<
dyn Fn() -> BoxFuture<'static, Result<Vec<(JsonPointer, Value)>, serde_json::Error>>
+ Send
+ Sync,
>,
}
impl<T> ChildHooks<T> {
async fn init_parent(self, parent: &mut T) {
(self.init_parent)(parent).await
}
async fn save_data(&self) -> Result<Vec<(JsonPointer, Value)>, serde_json::Error> {
(self.save_data)().await
}
}
pub struct GenericModel<T: Serialize + for<'de> Deserialize<'de>, Parent: ModelData> {
data: Arc<Mutex<GenericModelData<T, Parent>>>,
ptr: JsonPointer,
}
impl<
'a,
T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static,
Parent: ModelData + 'static,
> GenericModel<T, Parent>
{
pub fn new(ptr: JsonPointer) -> Self {
Self {
data: Arc::new(Mutex::new(GenericModelData::Uninitialized(Vec::new()))),
ptr,
}
}
/// locks
async fn fetch<Tx: Checkpoint>(&mut self, tx: &mut Tx) -> Result<(), Error> {
let mut data = self.data.lock().await;
if let GenericModelData::Uninitialized(children) = &mut *data {
let mut a: Box<T> = tx.get(&self.ptr, LockType::None).await?;
for child in std::mem::replace(children, Vec::new()) {
child.init_parent(&mut a).await;
}
*data = GenericModelData::Owned(a);
}
Ok(())
}
pub async fn lock<Tx: Checkpoint>(&self, tx: &mut Tx, lock: LockType) {
tx.lock(&self.ptr, lock).await
}
/// locks
pub async fn peek<
Tx: Checkpoint,
F: FnOnce(&T) -> ResFut + Send + Sync,
ResFut: Future<Output = Res> + Send + Sync,
Res: Send + Sync,
>(
&mut self,
tx: &mut Tx,
f: F,
) -> Result<Res, Error> {
self.fetch(tx).await?;
Ok(self.data.lock().await.apply(|t| f(t)).await)
}
/// locks
pub async fn apply<
Tx: Checkpoint,
F: FnOnce(&mut T) -> ResFut + Send + Sync,
ResFut: Future<Output = Res> + Send + Sync,
Res: Send + Sync,
>(
&mut self,
tx: &mut Tx,
f: F,
) -> Result<Res, Error> {
self.fetch(tx).await?;
Ok(self.data.lock().await.apply(f).await)
}
/// locks
pub async fn child<C, F, S, V>(
&mut self,
path: &JsonPointer<S, V>,
f: F,
) -> GenericModel<C, GenericModelData<T, Parent>>
where
C: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static,
F: Fn(&mut T) -> &mut C + Send + Sync + 'static,
S: AsRef<str>,
V: SegList,
for<'v> &'v V: IntoIterator<Item = &'v json_ptr::PtrSegment>,
{
let arc_f = Arc::new(f);
let ptr = self.ptr.clone() + path;
let mut data_ref = self.data.lock().await;
let data = if let GenericModelData::Uninitialized(children) = &mut *data_ref {
let data = Arc::new(Mutex::new(GenericModelData::Uninitialized(Vec::new())));
let init_parent_data = data.clone();
let parent_data = self.data.clone();
let save_data_data = data.clone();
let save_data_ptr = ptr.clone();
children.push(ChildHooks {
init_parent: Box::new(move |parent| {
async move {
let mut data_ref = init_parent_data.lock().await;
let self_data = std::mem::replace(
&mut *data_ref,
GenericModelData::Uninitialized(Vec::new()),
);
match self_data {
GenericModelData::Owned(t) => *arc_f(parent) = *t,
GenericModelData::Uninitialized(children) => {
for child in children {
child.init_parent(arc_f(parent)).await; // TODO: can probably parallelize
}
}
_ => (),
}
*data_ref = GenericModelData::Ref(arc_f, parent_data);
}
.boxed()
}),
save_data: Box::new(move || {
let ptr = save_data_ptr.clone();
let data = save_data_data.clone();
async move {
let mut data_ref = data.lock().await;
if let GenericModelData::Uninitialized(children) = &mut *data_ref {
let mut res = Vec::new();
for child in children {
res.append(&mut child.save_data().await?)
}
Ok(res)
} else {
Ok(vec![(
ptr,
data_ref
.apply(|t| futures::future::ready(serde_json::to_value(t)))
.await?,
)])
}
}
.boxed()
}),
});
data
} else {
Arc::new(Mutex::new(GenericModelData::Ref(
arc_f.clone(),
self.data.clone(),
)))
};
GenericModel { data, ptr }
}
/// locks
pub async fn save<Tx: Checkpoint>(&mut self, tx: &mut Tx) -> Result<(), Error> {
let mut data_ref = self.data.lock().await;
for (ptr, value) in if let GenericModelData::Uninitialized(children) = &mut *data_ref {
let mut res = Vec::new();
for child in children {
res.append(&mut child.save_data().await?)
}
res
} else {
vec![(
self.ptr.clone(),
data_ref
.apply(|t| futures::future::ready(serde_json::to_value(t)))
.await?,
)]
} {
tx.put_value(&ptr, &value).await?;
}
Ok(())
}
}