simplify process_queue

This commit is contained in:
Aiden McClelland
2021-09-23 18:59:00 -06:00
committed by Aiden McClelland
parent 561a3ceafc
commit 843e7febcb
3 changed files with 70 additions and 32 deletions

View File

@@ -181,6 +181,11 @@ fn build_model_struct(
&self.0 &self.0
} }
} }
impl core::ops::DerefMut for #model_name {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl From<patch_db::json_ptr::JsonPointer> for #model_name { impl From<patch_db::json_ptr::JsonPointer> for #model_name {
fn from(ptr: patch_db::json_ptr::JsonPointer) -> Self { fn from(ptr: patch_db::json_ptr::JsonPointer) -> Self {
#model_name(#inner_model::from(ptr)) #model_name(#inner_model::from(ptr))
@@ -276,6 +281,11 @@ fn build_model_struct(
&self.0 &self.0
} }
} }
impl core::ops::DerefMut for #model_name {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl #model_name { impl #model_name {
#( #(
pub fn #child_fn_name(self) -> #child_model { pub fn #child_fn_name(self) -> #child_model {
@@ -327,6 +337,11 @@ fn build_model_enum(base: &DeriveInput, _: &DataEnum, model_name: Option<Ident>)
&self.0 &self.0
} }
} }
impl core::ops::DerefMut for #model_name {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl From<patch_db::json_ptr::JsonPointer> for #model_name { impl From<patch_db::json_ptr::JsonPointer> for #model_name {
fn from(ptr: patch_db::json_ptr::JsonPointer) -> Self { fn from(ptr: patch_db::json_ptr::JsonPointer) -> Self {
#model_name(From::from(ptr)) #model_name(From::from(ptr))

View File

@@ -1,4 +1,4 @@
use std::collections::{HashMap, HashSet, VecDeque}; use std::collections::{HashMap, HashSet};
use json_ptr::JsonPointer; use json_ptr::JsonPointer;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
@@ -61,7 +61,7 @@ impl Drop for Guard {
struct Node { struct Node {
readers: Vec<usize>, readers: Vec<usize>,
writers: HashSet<usize>, writers: HashSet<usize>,
reqs: VecDeque<Option<Request>>, reqs: Vec<Request>,
} }
impl Node { impl Node {
fn write_free(&self, id: usize) -> bool { fn write_free(&self, id: usize) -> bool {
@@ -70,11 +70,17 @@ impl Node {
fn read_free(&self, id: usize) -> bool { fn read_free(&self, id: usize) -> bool {
self.readers.is_empty() || (self.readers.iter().filter(|a| a != &&id).count() == 0) self.readers.is_empty() || (self.readers.iter().filter(|a| a != &&id).count() == 0)
} }
// allow a lock to skip the queue if a lock is already held by the same handle
fn can_jump_queue(&self, id: usize) -> bool {
self.writers.contains(&id) || self.readers.contains(&id)
}
fn write_available(&self, id: usize) -> bool { fn write_available(&self, id: usize) -> bool {
self.write_free(id) && self.read_free(id) && self.reqs.is_empty() self.write_free(id)
&& self.read_free(id)
&& (self.reqs.is_empty() || self.can_jump_queue(id))
} }
fn read_available(&self, id: usize) -> bool { fn read_available(&self, id: usize) -> bool {
self.write_free(id) && self.reqs.is_empty() self.write_free(id) && (self.reqs.is_empty() || self.can_jump_queue(id))
} }
fn handle_request( fn handle_request(
&mut self, &mut self,
@@ -88,7 +94,7 @@ impl Node {
self.readers.push(req.lock_info.handle_id); self.readers.push(req.lock_info.handle_id);
req.process(returned_locks) req.process(returned_locks)
} else { } else {
self.reqs.push_back(Some(req)); self.reqs.push(req);
None None
} }
} }
@@ -120,32 +126,16 @@ impl Node {
&mut self, &mut self,
returned_locks: &mut Vec<oneshot::Receiver<LockInfo>>, returned_locks: &mut Vec<oneshot::Receiver<LockInfo>>,
) -> Vec<Request> { ) -> Vec<Request> {
let mut ids_processed = HashSet::new();
let mut only_matching = false;
let mut res = Vec::new(); let mut res = Vec::new();
let mut tmp_reqs = std::mem::take(&mut self.reqs); for req in std::mem::take(&mut self.reqs) {
for req_opt in &mut tmp_reqs { if (req.lock_info.write() && self.write_available(req.lock_info.handle_id))
if let Some(req) = req_opt { || self.read_available(req.lock_info.handle_id)
if !only_matching || ids_processed.contains(&req.lock_info.handle_id) { {
if (req.lock_info.write() && self.write_available(req.lock_info.handle_id)) if let Some(req) = self.handle_request(req, returned_locks) {
|| self.read_available(req.lock_info.handle_id) res.push(req);
{
ids_processed.insert(req.lock_info.handle_id);
if let Some(req) = req_opt.take() {
if let Some(req) = self.handle_request(req, returned_locks) {
res.push(req);
}
}
} else {
only_matching = true;
}
} }
} }
} }
self.reqs = tmp_reqs;
while matches!(self.reqs.get(0), Some(&None)) {
self.reqs.pop_front();
}
res res
} }
} }

View File

@@ -22,7 +22,7 @@ impl<T: Serialize + for<'de> Deserialize<'de>> Deref for ModelData<T> {
} }
} }
impl<T: Serialize + for<'de> Deserialize<'de>> ModelData<T> { impl<T: Serialize + for<'de> Deserialize<'de>> ModelData<T> {
pub fn to_owned(self) -> T { pub fn into_owned(self) -> T {
self.0 self.0
} }
} }
@@ -167,6 +167,20 @@ impl<
{ {
} }
macro_rules! impl_simple_has_model {
($($ty:ty),*) => {
$(
impl HasModel for $ty {
type Model = Model<$ty>;
}
)*
};
}
impl_simple_has_model!(
bool, char, f32, f64, i128, i16, i32, i64, i8, isize, u128, u16, u32, u64, u8, usize, String
);
#[derive(Debug)] #[derive(Debug)]
pub struct BoxModel<T: HasModel + Serialize + for<'de> Deserialize<'de>>(T::Model); pub struct BoxModel<T: HasModel + Serialize + for<'de> Deserialize<'de>>(T::Model);
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> Deref for BoxModel<T> { impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> Deref for BoxModel<T> {
@@ -175,6 +189,11 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> Deref for BoxModel<T>
&self.0 &self.0
} }
} }
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> DerefMut for BoxModel<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> From<Model<Box<T>>> for BoxModel<T> { impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> From<Model<Box<T>>> for BoxModel<T> {
fn from(model: Model<Box<T>>) -> Self { fn from(model: Model<Box<T>>) -> Self {
BoxModel(T::Model::from(JsonPointer::from(model))) BoxModel(T::Model::from(JsonPointer::from(model)))
@@ -260,13 +279,13 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
} }
pub fn and_then< pub fn and_then<
F: FnOnce(T::Model) -> U::Model, F: FnOnce(T::Model) -> OptionModel<U>,
U: Serialize + for<'de> Deserialize<'de> + HasModel, U: Serialize + for<'de> Deserialize<'de> + HasModel,
>( >(
self, self,
f: F, f: F,
) -> OptionModel<U> { ) -> OptionModel<U> {
OptionModel(f(self.0)) f(self.0)
} }
pub async fn delete<Db: DbHandle>(&self, db: &mut Db) -> Result<Option<Arc<Revision>>, Error> { pub async fn delete<Db: DbHandle>(&self, db: &mut Db) -> Result<Option<Arc<Revision>>, Error> {
@@ -277,14 +296,14 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
impl<T> OptionModel<T> impl<T> OptionModel<T>
where where
T: HasModel + Serialize + for<'de> Deserialize<'de>, T: HasModel + Serialize + for<'de> Deserialize<'de>,
T::Model: AsMut<Model<T>>, T::Model: DerefMut<Target = Model<T>>,
{ {
pub async fn check<Db: DbHandle>(mut self, db: &mut Db) -> Result<Option<T::Model>, Error> { pub async fn check<Db: DbHandle>(mut self, db: &mut Db) -> Result<Option<T::Model>, Error> {
let lock = db let lock = db
.locker() .locker()
.lock(db.id(), self.0.as_ref().clone(), false) .lock(db.id(), self.0.as_ref().clone(), false)
.await; .await;
self.0.as_mut().lock = Some(Arc::new(lock)); self.0.lock = Some(Arc::new(lock));
Ok(if self.exists(db, false).await? { Ok(if self.exists(db, false).await? {
Some(self.0) Some(self.0)
} else { } else {
@@ -355,6 +374,11 @@ impl<T: Serialize + for<'de> Deserialize<'de>> Deref for VecModel<T> {
&self.0 &self.0
} }
} }
impl<T: Serialize + for<'de> Deserialize<'de>> DerefMut for VecModel<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<T: Serialize + for<'de> Deserialize<'de>> VecModel<T> { impl<T: Serialize + for<'de> Deserialize<'de>> VecModel<T> {
pub fn idx(self, idx: usize) -> Model<Option<T>> { pub fn idx(self, idx: usize) -> Model<Option<T>> {
self.0.child(&format!("{}", idx)) self.0.child(&format!("{}", idx))
@@ -443,6 +467,15 @@ where
&self.0 &self.0
} }
} }
impl<T> DerefMut for MapModel<T>
where
T: Serialize + for<'de> Deserialize<'de> + Map,
T::Value: Serialize + for<'de> Deserialize<'de>,
{
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<T> std::clone::Clone for MapModel<T> impl<T> std::clone::Clone for MapModel<T>
where where
T: Serialize + for<'de> Deserialize<'de> + Map, T: Serialize + for<'de> Deserialize<'de> + Map,