From a1311e98036210f2f3db0d3c29a1bdc3abff3ed7 Mon Sep 17 00:00:00 2001 From: Aiden McClelland Date: Thu, 23 Sep 2021 18:59:00 -0600 Subject: [PATCH] simplify process_queue --- patch-db-macro-internals/src/lib.rs | 15 ++++++++++ patch-db/src/locker.rs | 44 +++++++++++------------------ patch-db/src/model.rs | 43 ++++++++++++++++++++++++---- 3 files changed, 70 insertions(+), 32 deletions(-) diff --git a/patch-db-macro-internals/src/lib.rs b/patch-db-macro-internals/src/lib.rs index a4b1310..391f2a5 100644 --- a/patch-db-macro-internals/src/lib.rs +++ b/patch-db-macro-internals/src/lib.rs @@ -181,6 +181,11 @@ fn build_model_struct( &self.0 } } + impl core::ops::DerefMut for #model_name { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } + } impl From for #model_name { fn from(ptr: patch_db::json_ptr::JsonPointer) -> Self { #model_name(#inner_model::from(ptr)) @@ -276,6 +281,11 @@ fn build_model_struct( &self.0 } } + impl core::ops::DerefMut for #model_name { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } + } impl #model_name { #( pub fn #child_fn_name(self) -> #child_model { @@ -327,6 +337,11 @@ fn build_model_enum(base: &DeriveInput, _: &DataEnum, model_name: Option) &self.0 } } + impl core::ops::DerefMut for #model_name { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } + } impl From for #model_name { fn from(ptr: patch_db::json_ptr::JsonPointer) -> Self { #model_name(From::from(ptr)) diff --git a/patch-db/src/locker.rs b/patch-db/src/locker.rs index 86db8b8..37d4c55 100644 --- a/patch-db/src/locker.rs +++ b/patch-db/src/locker.rs @@ -1,4 +1,4 @@ -use std::collections::{HashMap, HashSet, VecDeque}; +use std::collections::{HashMap, HashSet}; use json_ptr::JsonPointer; use tokio::sync::{mpsc, oneshot}; @@ -61,7 +61,7 @@ impl Drop for Guard { struct Node { readers: Vec, writers: HashSet, - reqs: VecDeque>, + reqs: Vec, } impl Node { fn write_free(&self, id: usize) -> bool { @@ -70,11 +70,17 @@ impl Node { fn read_free(&self, id: usize) -> bool { self.readers.is_empty() || (self.readers.iter().filter(|a| a != &&id).count() == 0) } + // allow a lock to skip the queue if a lock is already held by the same handle + fn can_jump_queue(&self, id: usize) -> bool { + self.writers.contains(&id) || self.readers.contains(&id) + } fn write_available(&self, id: usize) -> bool { - self.write_free(id) && self.read_free(id) && self.reqs.is_empty() + self.write_free(id) + && self.read_free(id) + && (self.reqs.is_empty() || self.can_jump_queue(id)) } fn read_available(&self, id: usize) -> bool { - self.write_free(id) && self.reqs.is_empty() + self.write_free(id) && (self.reqs.is_empty() || self.can_jump_queue(id)) } fn handle_request( &mut self, @@ -88,7 +94,7 @@ impl Node { self.readers.push(req.lock_info.handle_id); req.process(returned_locks) } else { - self.reqs.push_back(Some(req)); + self.reqs.push(req); None } } @@ -120,32 +126,16 @@ impl Node { &mut self, returned_locks: &mut Vec>, ) -> Vec { - let mut ids_processed = HashSet::new(); - let mut only_matching = false; let mut res = Vec::new(); - let mut tmp_reqs = std::mem::take(&mut self.reqs); - for req_opt in &mut tmp_reqs { - if let Some(req) = req_opt { - if !only_matching || ids_processed.contains(&req.lock_info.handle_id) { - if (req.lock_info.write() && self.write_available(req.lock_info.handle_id)) - || self.read_available(req.lock_info.handle_id) - { - ids_processed.insert(req.lock_info.handle_id); - if let Some(req) = req_opt.take() { - if let Some(req) = self.handle_request(req, returned_locks) { - res.push(req); - } - } - } else { - only_matching = true; - } + for req in std::mem::take(&mut self.reqs) { + if (req.lock_info.write() && self.write_available(req.lock_info.handle_id)) + || self.read_available(req.lock_info.handle_id) + { + if let Some(req) = self.handle_request(req, returned_locks) { + res.push(req); } } } - self.reqs = tmp_reqs; - while matches!(self.reqs.get(0), Some(&None)) { - self.reqs.pop_front(); - } res } } diff --git a/patch-db/src/model.rs b/patch-db/src/model.rs index 22d7c62..ebb493f 100644 --- a/patch-db/src/model.rs +++ b/patch-db/src/model.rs @@ -22,7 +22,7 @@ impl Deserialize<'de>> Deref for ModelData { } } impl Deserialize<'de>> ModelData { - pub fn to_owned(self) -> T { + pub fn into_owned(self) -> T { self.0 } } @@ -167,6 +167,20 @@ impl< { } +macro_rules! impl_simple_has_model { + ($($ty:ty),*) => { + $( + impl HasModel for $ty { + type Model = Model<$ty>; + } + )* + }; +} + +impl_simple_has_model!( + bool, char, f32, f64, i128, i16, i32, i64, i8, isize, u128, u16, u32, u64, u8, usize, String +); + #[derive(Debug)] pub struct BoxModel Deserialize<'de>>(T::Model); impl Deserialize<'de>> Deref for BoxModel { @@ -175,6 +189,11 @@ impl Deserialize<'de>> Deref for BoxModel &self.0 } } +impl Deserialize<'de>> DerefMut for BoxModel { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} impl Deserialize<'de>> From>> for BoxModel { fn from(model: Model>) -> Self { BoxModel(T::Model::from(JsonPointer::from(model))) @@ -260,13 +279,13 @@ impl Deserialize<'de>> OptionModel { } pub fn and_then< - F: FnOnce(T::Model) -> U::Model, + F: FnOnce(T::Model) -> OptionModel, U: Serialize + for<'de> Deserialize<'de> + HasModel, >( self, f: F, ) -> OptionModel { - OptionModel(f(self.0)) + f(self.0) } pub async fn delete(&self, db: &mut Db) -> Result>, Error> { @@ -277,14 +296,14 @@ impl Deserialize<'de>> OptionModel { impl OptionModel where T: HasModel + Serialize + for<'de> Deserialize<'de>, - T::Model: AsMut>, + T::Model: DerefMut>, { pub async fn check(mut self, db: &mut Db) -> Result, Error> { let lock = db .locker() .lock(db.id(), self.0.as_ref().clone(), false) .await; - self.0.as_mut().lock = Some(Arc::new(lock)); + self.0.lock = Some(Arc::new(lock)); Ok(if self.exists(db, false).await? { Some(self.0) } else { @@ -355,6 +374,11 @@ impl Deserialize<'de>> Deref for VecModel { &self.0 } } +impl Deserialize<'de>> DerefMut for VecModel { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} impl Deserialize<'de>> VecModel { pub fn idx(self, idx: usize) -> Model> { self.0.child(&format!("{}", idx)) @@ -443,6 +467,15 @@ where &self.0 } } +impl DerefMut for MapModel +where + T: Serialize + for<'de> Deserialize<'de> + Map, + T::Value: Serialize + for<'de> Deserialize<'de>, +{ + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} impl std::clone::Clone for MapModel where T: Serialize + for<'de> Deserialize<'de> + Map,