diff --git a/patch-db/src/locker/action_mux.rs b/patch-db/src/locker/action_mux.rs index 6367313..57299cc 100644 --- a/patch-db/src/locker/action_mux.rs +++ b/patch-db/src/locker/action_mux.rs @@ -1,7 +1,5 @@ -use tokio::sync::{ - mpsc::{self, UnboundedReceiver}, - oneshot, -}; +use tokio::sync::mpsc::{self, UnboundedReceiver}; +use tokio::sync::oneshot; use super::{LockInfo, Request}; @@ -20,13 +18,16 @@ pub(super) struct ActionMux { inbound_request_queue: InboundRequestQueue, unlock_receivers: Vec>, cancellation_receivers: Vec>, + _dummy_senders: Vec>, } impl ActionMux { pub fn new(inbound_receiver: UnboundedReceiver) -> Self { // futures::future::select_all will panic if the list is empty // instead we want it to block forever by adding a channel that will never recv - let unlock_receivers = vec![oneshot::channel().1]; - let cancellation_receivers = vec![oneshot::channel().1]; + let (unlock_dummy_send, unlock_dummy_recv) = oneshot::channel(); + let unlock_receivers = vec![unlock_dummy_recv]; + let (cancel_dummy_send, cancel_dummy_recv) = oneshot::channel(); + let cancellation_receivers = vec![cancel_dummy_recv]; ActionMux { inbound_request_queue: InboundRequestQueue { recv: inbound_receiver, @@ -34,6 +35,7 @@ impl ActionMux { }, unlock_receivers, cancellation_receivers, + _dummy_senders: vec![unlock_dummy_send, cancel_dummy_send], } } pub async fn get_action(&mut self) -> Option { diff --git a/patch-db/src/locker/bookkeeper.rs b/patch-db/src/locker/bookkeeper.rs index 8c924ad..478ef0b 100644 --- a/patch-db/src/locker/bookkeeper.rs +++ b/patch-db/src/locker/bookkeeper.rs @@ -53,6 +53,7 @@ impl LockBookkeeper { if let Some(hot_seat) = hot_seat { self.deferred_request_queue.push_front(hot_seat); + kill_deadlocked(&mut self.deferred_request_queue, &mut self.trie); } Ok(res) } @@ -126,6 +127,7 @@ impl LockBookkeeper { } if let Some(hot_seat) = hot_seat { self.deferred_request_queue.push_front(hot_seat); + kill_deadlocked(&mut self.deferred_request_queue, &mut self.trie); } new_unlock_receivers } @@ -157,7 +159,6 @@ fn process_new_req( } request_queue.push_back((req, ordset![])); - kill_deadlocked(request_queue, &*trie); None } // otherwise we try and service it immediately, only pushing to the queue if it fails @@ -179,7 +180,6 @@ fn process_new_req( } request_queue.push_back((req, blocking_sessions)); - kill_deadlocked(request_queue, &*trie); None } }, @@ -189,8 +189,7 @@ fn process_new_req( fn kill_deadlocked(request_queue: &mut VecDeque<(Request, OrdSet)>, trie: &LockTrie) { // TODO optimize this, it is unlikely that we are anywhere close to as efficient as we can be here. let deadlocked_reqs = deadlock_scan(request_queue); - let last = request_queue.back().unwrap(); - if !deadlocked_reqs.is_empty() && deadlocked_reqs.iter().any(|r| std::ptr::eq(*r, &last.0)) { + if !deadlocked_reqs.is_empty() { let locks_waiting = LockSet( deadlocked_reqs .iter() @@ -204,23 +203,20 @@ fn kill_deadlocked(request_queue: &mut VecDeque<(Request, OrdSet)>, tr locks_held: LockSet(trie.subtree_lock_info()), }; - request_queue.pop_back().unwrap().0.reject(err); - - // This commented logic is for if we want to kill the whole cycle, rather than the most recent addition - // let mut indices_to_remove = Vec::with_capacity(deadlocked_reqs.len()); - // for (i, (req, _)) in request_queue.iter().enumerate() { - // if deadlocked_reqs.iter().any(|r| std::ptr::eq(*r, req)) { - // indices_to_remove.push(i) - // } - // } - // let old = std::mem::take(request_queue); - // for (i, (r, s)) in old.into_iter().enumerate() { - // if indices_to_remove.contains(&i) { - // r.reject(err.clone()) - // } else { - // request_queue.push_back((r, s)) - // } - // } + let mut indices_to_remove = Vec::with_capacity(deadlocked_reqs.len()); + for (i, (req, _)) in request_queue.iter().enumerate() { + if deadlocked_reqs.iter().any(|r| std::ptr::eq(*r, req)) { + indices_to_remove.push(i) + } + } + let old = std::mem::take(request_queue); + for (i, (r, s)) in old.into_iter().enumerate() { + if indices_to_remove.contains(&i) { + r.reject(err.clone()) + } else { + request_queue.push_back((r, s)) + } + } } } diff --git a/patch-db/src/locker/mod.rs b/patch-db/src/locker/mod.rs index 96346c5..3c8fb81 100644 --- a/patch-db/src/locker/mod.rs +++ b/patch-db/src/locker/mod.rs @@ -76,6 +76,7 @@ impl Locker { ptr, ty: lock_type, }; + println!("{}", &lock_info); let (send, recv) = oneshot::channel(); let (cancel_send, cancel_recv) = oneshot::channel(); let mut cancel_guard = CancelGuard { @@ -249,7 +250,7 @@ pub enum LockError { first: JsonPointer, second: JsonPointer, }, - #[error("Deadlock Detected: Locks Held = {locks_held}, Locks Waiting = {locks_waiting}")] + #[error("Deadlock Detected:\nLocks Held =\n{locks_held},\nLocks Waiting =\n{locks_waiting}")] DeadlockDetected { locks_held: LockSet, locks_waiting: LockSet, diff --git a/patch-db/src/locker/proptest.rs b/patch-db/src/locker/proptest.rs index 7e1cfeb..fb999bd 100644 --- a/patch-db/src/locker/proptest.rs +++ b/patch-db/src/locker/proptest.rs @@ -11,7 +11,8 @@ mod tests { use crate::handle::HandleId; use crate::locker::bookkeeper::{deadlock_scan, path_to}; - use crate::locker::{CancelGuard, Guard, LockInfo, LockType, Request}; + use crate::locker::{CancelGuard, Guard, LockError, LockInfo, LockType, Request}; + use crate::Locker; // enum Action { // Acquire { @@ -157,7 +158,7 @@ mod tests { req.0, ordset![HandleId { id: dep, - #[cfg(feature = "tracing-error")] + #[cfg(feature = "trace")] trace: None }], )); @@ -225,6 +226,50 @@ mod tests { }); } + #[tokio::test] + async fn deadlock_kill_live() { + let locker = Locker::new(); + let s0 = HandleId { + id: 0, + #[cfg(feature = "trace")] + trace: None, + }; + let s1 = HandleId { + id: 1, + #[cfg(feature = "trace")] + trace: None, + }; + let x = locker + .lock(s0.clone(), "/a/b".parse().unwrap(), LockType::Read) + .await; + assert!(x.is_ok()); + let y = locker + .lock(s1.clone(), "/a/b".parse().unwrap(), LockType::Read) + .await; + assert!(y.is_ok()); + let x = tokio::select! { + r0 = locker.lock(s0, "/a/b".parse().unwrap(), LockType::Write) => r0, + r1 = locker.lock(s1, "/a/b".parse().unwrap(), LockType::Write) => r1, + }; + match x { + Ok(g) => { + println!("wat"); + drop(g); + assert!(false); + } + Err(e) => match e { + LockError::DeadlockDetected { .. } => { + println!("{}", e); + } + _ => { + println!("{}", e); + #[cfg(not(feature = "unstable"))] + assert!(false); + } + }, + } + } + proptest! { #[test] fn zero_or_one_write_lock_per_traversal(x in 0..10) {