From 21fed0c0beefe95d40fbca25de95d3f34d90a4a4 Mon Sep 17 00:00:00 2001 From: Keagan McClelland Date: Thu, 20 Jan 2022 16:06:22 -0700 Subject: [PATCH] fixes issue where locker actions may be reordered if they fire at close enough times --- patch-db/src/locker/action_mux.rs | 31 +++++++++++++- patch-db/src/locker/mod.rs | 59 +++++++++++++++------------ patch-db/src/locker/order_enforcer.rs | 1 + patch-db/src/locker/proptest.rs | 20 +++++++++ patch-db/src/test.rs | 15 ++++++- 5 files changed, 99 insertions(+), 27 deletions(-) diff --git a/patch-db/src/locker/action_mux.rs b/patch-db/src/locker/action_mux.rs index 57299cc..af96a9d 100644 --- a/patch-db/src/locker/action_mux.rs +++ b/patch-db/src/locker/action_mux.rs @@ -38,7 +38,7 @@ impl ActionMux { _dummy_senders: vec![unlock_dummy_send, cancel_dummy_send], } } - pub async fn get_action(&mut self) -> Option { + async fn get_action(&mut self) -> Option { loop { if self.inbound_request_queue.closed && self.unlock_receivers.len() == 1 @@ -68,6 +68,35 @@ impl ActionMux { } } + pub async fn get_prioritized_action_queue(&mut self) -> Vec { + if let Some(action) = self.get_action().await { + let mut actions = Vec::new(); + // find all serviceable lock releases + for mut r in std::mem::take(&mut self.unlock_receivers) { + if let Ok(lock_info) = r.try_recv() { + actions.push(Action::HandleRelease(lock_info)); + } else { + self.unlock_receivers.push(r); + } + } + + // find all serviceable lock cancellations + for mut r in std::mem::take(&mut self.cancellation_receivers) { + if let Ok(lock_info) = r.try_recv() { + actions.push(Action::HandleCancel(lock_info)); + } else { + self.cancellation_receivers.push(r); + } + } + + // finally add the action that started it all + actions.push(action); + actions + } else { + Vec::new() + } + } + pub fn push_unlock_receivers>>( &mut self, recv: T, diff --git a/patch-db/src/locker/mod.rs b/patch-db/src/locker/mod.rs index ce9d142..739c3e0 100644 --- a/patch-db/src/locker/mod.rs +++ b/patch-db/src/locker/mod.rs @@ -28,36 +28,45 @@ impl Locker { let mut action_mux = ActionMux::new(receiver); let mut lock_server = LockBookkeeper::new(); - while let Some(action) = action_mux.get_action().await { - #[cfg(feature = "tracing")] - trace!("Locker Action: {:#?}", action); - match action { - Action::HandleRequest(mut req) => { - #[cfg(feature = "tracing")] - debug!("New lock request: {}", &req.lock_info); + loop { + let actions = action_mux.get_prioritized_action_queue().await; + if actions.is_empty() { + break; + } + for action in actions { + #[cfg(feature = "tracing")] + trace!("Locker Action: {:#?}", action); + match action { + Action::HandleRequest(mut req) => { + #[cfg(feature = "tracing")] + debug!("New lock request: {}", &req.lock_info); - // Pertinent Logic - let req_cancel = req.cancel.take().expect("Request Cancellation Stolen"); - match lock_server.lease(req) { - Ok(Some(recv)) => { - action_mux.push_unlock_receivers(std::iter::once(recv)) + // Pertinent Logic + let req_cancel = + req.cancel.take().expect("Request Cancellation Stolen"); + match lock_server.lease(req) { + Ok(Some(recv)) => { + action_mux.push_unlock_receivers(std::iter::once(recv)) + } + Ok(None) => action_mux.push_cancellation_receiver(req_cancel), + Err(_) => {} } - Ok(None) => action_mux.push_cancellation_receiver(req_cancel), - Err(_) => {} } - } - Action::HandleRelease(lock_info) => { - #[cfg(feature = "tracing")] - debug!("New lock release: {}", &lock_info); + Action::HandleRelease(lock_info) => { + #[cfg(feature = "tracing")] + debug!("New lock release: {}", &lock_info); - let new_unlock_receivers = lock_server.ret(&lock_info); - action_mux.push_unlock_receivers(new_unlock_receivers); - } - Action::HandleCancel(lock_info) => { - #[cfg(feature = "tracing")] - debug!("New request canceled: {}", &lock_info); + println!("Release Called {}", &lock_info); - lock_server.cancel(&lock_info) + let new_unlock_receivers = lock_server.ret(&lock_info); + action_mux.push_unlock_receivers(new_unlock_receivers); + } + Action::HandleCancel(lock_info) => { + #[cfg(feature = "tracing")] + debug!("New request canceled: {}", &lock_info); + + lock_server.cancel(&lock_info) + } } } } diff --git a/patch-db/src/locker/order_enforcer.rs b/patch-db/src/locker/order_enforcer.rs index a0bdfde..a1c2d0b 100644 --- a/patch-db/src/locker/order_enforcer.rs +++ b/patch-db/src/locker/order_enforcer.rs @@ -7,6 +7,7 @@ use super::{LockError, LockInfo}; use crate::handle::HandleId; use crate::LockType; +#[derive(Debug, PartialEq, Eq)] pub(super) struct LockOrderEnforcer { locks_held: OrdMap>, } diff --git a/patch-db/src/locker/proptest.rs b/patch-db/src/locker/proptest.rs index 1f6daf8..79ce88e 100644 --- a/patch-db/src/locker/proptest.rs +++ b/patch-db/src/locker/proptest.rs @@ -289,6 +289,26 @@ mod tests { } } + proptest! { + #[test] + fn enforcer_lock_inverse_identity(lock_order in proptest::collection::vec(arb_lock_info(1,3), 1..30)) { + use crate::locker::order_enforcer::LockOrderEnforcer; + use rand::seq::SliceRandom; + let mut enforcer = LockOrderEnforcer::new(); + for i in &lock_order { + enforcer.try_insert(i); + } + let mut release_order = lock_order.clone(); + let slice: &mut [LockInfo] = &mut release_order[..]; + slice.shuffle(&mut rand::thread_rng()); + prop_assert!(enforcer != LockOrderEnforcer::new()); + for i in &release_order { + enforcer.remove(i); + } + prop_assert_eq!(enforcer, LockOrderEnforcer::new()); + } + } + proptest! { #[test] fn existence_ancestors_dont_block_descendent_writes(s0 in arb_handle_id(10), s1 in arb_handle_id(10), mut ptr0 in arb_json_ptr(3), ptr1 in arb_json_ptr(3)) { diff --git a/patch-db/src/test.rs b/patch-db/src/test.rs index 9febaa8..b39ef37 100644 --- a/patch-db/src/test.rs +++ b/patch-db/src/test.rs @@ -8,7 +8,7 @@ use serde_json::Value; use tokio::fs; use tokio::runtime::Builder; -use crate::{self as patch_db, DbHandle}; +use crate::{self as patch_db, DbHandle, LockType}; async fn init_db(db_name: String) -> PatchDb { cleanup_db(&db_name).await; @@ -101,3 +101,16 @@ pub struct Child { #[derive(Debug, serde::Deserialize, serde::Serialize, HasModel)] pub struct NewType(Option>); + +#[tokio::test] +async fn locks_dropped_from_enforcer_on_tx_save() { + let db = init_db("test.db".to_string()).await; + let mut handle = db.handle(); + let mut tx = handle.begin().await.unwrap(); + let ptr_a: JsonPointer = "/a".parse().unwrap(); + let ptr_b: JsonPointer = "/b".parse().unwrap(); + tx.lock(ptr_b, LockType::Write).await.unwrap(); + tx.save().await.unwrap(); + handle.lock(ptr_a, LockType::Write).await.unwrap(); + cleanup_db("test.db").await; +}