simplify process_queue

This commit is contained in:
Aiden McClelland
2021-09-23 18:59:00 -06:00
parent 898d055f72
commit a1311e9803
3 changed files with 70 additions and 32 deletions

View File

@@ -181,6 +181,11 @@ fn build_model_struct(
&self.0
}
}
impl core::ops::DerefMut for #model_name {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl From<patch_db::json_ptr::JsonPointer> for #model_name {
fn from(ptr: patch_db::json_ptr::JsonPointer) -> Self {
#model_name(#inner_model::from(ptr))
@@ -276,6 +281,11 @@ fn build_model_struct(
&self.0
}
}
impl core::ops::DerefMut for #model_name {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl #model_name {
#(
pub fn #child_fn_name(self) -> #child_model {
@@ -327,6 +337,11 @@ fn build_model_enum(base: &DeriveInput, _: &DataEnum, model_name: Option<Ident>)
&self.0
}
}
impl core::ops::DerefMut for #model_name {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl From<patch_db::json_ptr::JsonPointer> for #model_name {
fn from(ptr: patch_db::json_ptr::JsonPointer) -> Self {
#model_name(From::from(ptr))

View File

@@ -1,4 +1,4 @@
use std::collections::{HashMap, HashSet, VecDeque};
use std::collections::{HashMap, HashSet};
use json_ptr::JsonPointer;
use tokio::sync::{mpsc, oneshot};
@@ -61,7 +61,7 @@ impl Drop for Guard {
struct Node {
readers: Vec<usize>,
writers: HashSet<usize>,
reqs: VecDeque<Option<Request>>,
reqs: Vec<Request>,
}
impl Node {
fn write_free(&self, id: usize) -> bool {
@@ -70,11 +70,17 @@ impl Node {
fn read_free(&self, id: usize) -> bool {
self.readers.is_empty() || (self.readers.iter().filter(|a| a != &&id).count() == 0)
}
// allow a lock to skip the queue if a lock is already held by the same handle
fn can_jump_queue(&self, id: usize) -> bool {
self.writers.contains(&id) || self.readers.contains(&id)
}
fn write_available(&self, id: usize) -> bool {
self.write_free(id) && self.read_free(id) && self.reqs.is_empty()
self.write_free(id)
&& self.read_free(id)
&& (self.reqs.is_empty() || self.can_jump_queue(id))
}
fn read_available(&self, id: usize) -> bool {
self.write_free(id) && self.reqs.is_empty()
self.write_free(id) && (self.reqs.is_empty() || self.can_jump_queue(id))
}
fn handle_request(
&mut self,
@@ -88,7 +94,7 @@ impl Node {
self.readers.push(req.lock_info.handle_id);
req.process(returned_locks)
} else {
self.reqs.push_back(Some(req));
self.reqs.push(req);
None
}
}
@@ -120,32 +126,16 @@ impl Node {
&mut self,
returned_locks: &mut Vec<oneshot::Receiver<LockInfo>>,
) -> Vec<Request> {
let mut ids_processed = HashSet::new();
let mut only_matching = false;
let mut res = Vec::new();
let mut tmp_reqs = std::mem::take(&mut self.reqs);
for req_opt in &mut tmp_reqs {
if let Some(req) = req_opt {
if !only_matching || ids_processed.contains(&req.lock_info.handle_id) {
if (req.lock_info.write() && self.write_available(req.lock_info.handle_id))
|| self.read_available(req.lock_info.handle_id)
{
ids_processed.insert(req.lock_info.handle_id);
if let Some(req) = req_opt.take() {
if let Some(req) = self.handle_request(req, returned_locks) {
res.push(req);
}
}
} else {
only_matching = true;
}
for req in std::mem::take(&mut self.reqs) {
if (req.lock_info.write() && self.write_available(req.lock_info.handle_id))
|| self.read_available(req.lock_info.handle_id)
{
if let Some(req) = self.handle_request(req, returned_locks) {
res.push(req);
}
}
}
self.reqs = tmp_reqs;
while matches!(self.reqs.get(0), Some(&None)) {
self.reqs.pop_front();
}
res
}
}

View File

@@ -22,7 +22,7 @@ impl<T: Serialize + for<'de> Deserialize<'de>> Deref for ModelData<T> {
}
}
impl<T: Serialize + for<'de> Deserialize<'de>> ModelData<T> {
pub fn to_owned(self) -> T {
pub fn into_owned(self) -> T {
self.0
}
}
@@ -167,6 +167,20 @@ impl<
{
}
macro_rules! impl_simple_has_model {
($($ty:ty),*) => {
$(
impl HasModel for $ty {
type Model = Model<$ty>;
}
)*
};
}
impl_simple_has_model!(
bool, char, f32, f64, i128, i16, i32, i64, i8, isize, u128, u16, u32, u64, u8, usize, String
);
#[derive(Debug)]
pub struct BoxModel<T: HasModel + Serialize + for<'de> Deserialize<'de>>(T::Model);
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> Deref for BoxModel<T> {
@@ -175,6 +189,11 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> Deref for BoxModel<T>
&self.0
}
}
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> DerefMut for BoxModel<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> From<Model<Box<T>>> for BoxModel<T> {
fn from(model: Model<Box<T>>) -> Self {
BoxModel(T::Model::from(JsonPointer::from(model)))
@@ -260,13 +279,13 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
}
pub fn and_then<
F: FnOnce(T::Model) -> U::Model,
F: FnOnce(T::Model) -> OptionModel<U>,
U: Serialize + for<'de> Deserialize<'de> + HasModel,
>(
self,
f: F,
) -> OptionModel<U> {
OptionModel(f(self.0))
f(self.0)
}
pub async fn delete<Db: DbHandle>(&self, db: &mut Db) -> Result<Option<Arc<Revision>>, Error> {
@@ -277,14 +296,14 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
impl<T> OptionModel<T>
where
T: HasModel + Serialize + for<'de> Deserialize<'de>,
T::Model: AsMut<Model<T>>,
T::Model: DerefMut<Target = Model<T>>,
{
pub async fn check<Db: DbHandle>(mut self, db: &mut Db) -> Result<Option<T::Model>, Error> {
let lock = db
.locker()
.lock(db.id(), self.0.as_ref().clone(), false)
.await;
self.0.as_mut().lock = Some(Arc::new(lock));
self.0.lock = Some(Arc::new(lock));
Ok(if self.exists(db, false).await? {
Some(self.0)
} else {
@@ -355,6 +374,11 @@ impl<T: Serialize + for<'de> Deserialize<'de>> Deref for VecModel<T> {
&self.0
}
}
impl<T: Serialize + for<'de> Deserialize<'de>> DerefMut for VecModel<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<T: Serialize + for<'de> Deserialize<'de>> VecModel<T> {
pub fn idx(self, idx: usize) -> Model<Option<T>> {
self.0.child(&format!("{}", idx))
@@ -443,6 +467,15 @@ where
&self.0
}
}
impl<T> DerefMut for MapModel<T>
where
T: Serialize + for<'de> Deserialize<'de> + Map,
T::Value: Serialize + for<'de> Deserialize<'de>,
{
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<T> std::clone::Clone for MapModel<T>
where
T: Serialize + for<'de> Deserialize<'de> + Map,