From 7f5b20b4d56c2f6a250d6b981acbdfd592dbc935 Mon Sep 17 00:00:00 2001 From: Keagan McClelland Date: Thu, 30 Dec 2021 17:49:04 -0700 Subject: [PATCH] first pass at deadlock detection --- patch-db/src/locker.rs | 209 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 195 insertions(+), 14 deletions(-) diff --git a/patch-db/src/locker.rs b/patch-db/src/locker.rs index 5dcc89a..e8d2d71 100644 --- a/patch-db/src/locker.rs +++ b/patch-db/src/locker.rs @@ -22,7 +22,7 @@ impl Locker { // futures::future::select_all will panic if the list is empty // instead we want it to block forever by adding a channel that will never recv let (_dummy_send, dummy_recv) = oneshot::channel(); - let mut locks_on_lease = vec![dummy_recv]; + let mut unlock_receivers = vec![dummy_recv]; let (_dummy_send, dummy_recv) = oneshot::channel(); let mut cancellations = vec![dummy_recv]; @@ -30,14 +30,14 @@ impl Locker { let mut lock_order_enforcer = LockOrderEnforcer::new(); while let Some(action) = - get_action(&mut new_requests, &mut locks_on_lease, &mut cancellations).await + get_action(&mut new_requests, &mut unlock_receivers, &mut cancellations).await { #[cfg(feature = "tracing")] fn display_session_set(set: &OrdSet) -> String { use std::fmt::Write; let mut display = String::from("{"); for session in set.iter() { - write!(display, "{},", session.id); + write!(display, "{},", session.id).unwrap(); } display.replace_range(display.len() - 1.., "}"); display @@ -49,7 +49,7 @@ impl Locker { hot_seat: Option<&(Request, OrdSet)>, req: Request, trie: &mut Trie, - locks_on_lease: &mut Vec>, + unlock_receivers: &mut Vec>, request_queue: &mut VecDeque<(Request, OrdSet)>, ) { match hot_seat { @@ -72,7 +72,8 @@ impl Locker { &hot_req.lock_info.handle_id.id ); } - request_queue.push_back((req, OrdSet::new())) + request_queue.push_back((req, OrdSet::new())); + process_deadlocks(request_queue, &*trie); } // otherwise we try and service it immediately, only pushing to the queue if it fails _ => match trie.try_lock(&req.lock_info) { @@ -85,7 +86,7 @@ impl Locker { &req.lock_info.ptr, ); let lease = req.complete(); - locks_on_lease.push(lease); + unlock_receivers.push(lease); } Err(blocking_sessions) => { #[cfg(feature = "tracing")] @@ -101,7 +102,8 @@ impl Locker { &display_session_set(&blocking_sessions) ); } - request_queue.push_back((req, blocking_sessions)) + request_queue.push_back((req, blocking_sessions)); + process_deadlocks(request_queue, &*trie); } }, } @@ -121,7 +123,7 @@ impl Locker { hot_seat.as_ref(), req, &mut trie, - &mut locks_on_lease, + &mut unlock_receivers, &mut request_queue, ); if let Some(hot_seat) = hot_seat { @@ -157,7 +159,7 @@ impl Locker { &r.lock_info.ptr, ); let lease = r.complete(); - locks_on_lease.push(lease); + unlock_receivers.push(lease); } Err(new_blocking_sessions) => { // set the hot seat and proceed to step two @@ -174,7 +176,7 @@ impl Locker { hot_seat.as_ref(), r, &mut trie, - &mut locks_on_lease, + &mut unlock_receivers, &mut request_queue, ) } @@ -266,6 +268,97 @@ struct RequestQueue { closed: bool, recv: mpsc::UnboundedReceiver, } +fn deadlock_scan<'a>(queue: &'a VecDeque<(Request, OrdSet)>) -> Vec<&'a Request> { + let (wait_map, mut req_map) = queue + .iter() + .map(|(req, set)| ((&req.lock_info.handle_id, set, req))) + .fold( + ( + OrdMap::<&'a HandleId, &'a OrdSet>::new(), + OrdMap::<&'a HandleId, &'a Request>::new(), + ), + |(mut wmap, mut rmap), (id, wset, req)| { + ( + { + wmap.insert(id, wset); + wmap + }, + { + rmap.insert(id, req); + rmap + }, + ) + }, + ); + fn path_to<'a>( + graph: &OrdMap<&'a HandleId, &'a OrdSet>, + root: &'a HandleId, + node: &'a HandleId, + ) -> OrdSet<&'a HandleId> { + if node == root { + return ordset![root]; + } + match graph.get(node) { + None => ordset![], + Some(s) => s + .iter() + .find_map(|h| Some(path_to(graph, root, h)).filter(|s| s.is_empty())) + .map_or(ordset![], |mut s| { + s.insert(node); + s + }), + } + } + for (root, wait_set) in wait_map.iter() { + let cycle = wait_set + .iter() + .find_map(|start| Some(path_to(&wait_map, root, start)).filter(|s| s.is_empty())); + match cycle { + None => { + continue; + } + Some(c) => { + return c + .into_iter() + .map(|id| req_map.remove(id).unwrap()) + .collect(); + } + } + } + vec![] +} + +fn process_deadlocks(request_queue: &mut VecDeque<(Request, OrdSet)>, trie: &Trie) { + let deadlocked_reqs = deadlock_scan(request_queue); + if !deadlocked_reqs.is_empty() { + #[cfg(feature = "tracing")] + tracing::info!("Deadlock Detected: {:?}", deadlocked_reqs); + let locks_waiting = LockSet( + deadlocked_reqs + .iter() + .map(|r| r.lock_info.clone()) + .collect(), + ); + let err = LockError::DeadlockDetected { + locks_waiting, + locks_held: LockSet(trie.subtree_lock_info()), + }; + let mut indices_to_remove = Vec::with_capacity(deadlocked_reqs.len()); + for (i, (req, _)) in request_queue.iter().enumerate() { + if deadlocked_reqs.iter().any(|r| std::ptr::eq(*r, req)) { + indices_to_remove.push(i) + } + } + let old = std::mem::take(request_queue); + for (i, (r, s)) in old.into_iter().enumerate() { + if indices_to_remove.contains(&i) { + r.reject(err.clone()) + } else { + request_queue.push_back((r, s)) + } + } + } +} #[derive(Debug)] enum Action { @@ -486,6 +579,50 @@ impl Trie { .fold(OrdSet::new(), OrdSet::union); self.state.sessions().union(children) } + fn subtree_lock_info<'a>(&'a self) -> OrdSet { + let mut acc = self + .children + .iter() + .map(|(s, t)| { + t.subtree_lock_info() + .into_iter() + .map(|mut i| LockInfo { + ty: i.ty, + handle_id: i.handle_id, + ptr: { + i.ptr.push_start(s); + i.ptr + }, + }) + .collect() + }) + .fold(ordset![], OrdSet::union); + let self_writes = self.state.write_session().map(|session| LockInfo { + handle_id: session.clone(), + ptr: JsonPointer::default(), + ty: LockType::Write, + }); + let self_reads = self + .state + .read_sessions() + .into_iter() + .map(|session| LockInfo { + handle_id: session.clone(), + ptr: JsonPointer::default(), + ty: LockType::Read, + }); + let self_exists = self + .state + .exist_sessions() + .into_iter() + .map(|session| LockInfo { + handle_id: session.clone(), + ptr: JsonPointer::default(), + ty: LockType::Exist, + }); + acc.extend(self_writes.into_iter().chain(self_reads).chain(self_exists)); + acc + } fn ancestors_and_trie<'a, S: AsRef, V: SegList>( &'a self, ptr: &JsonPointer, @@ -1010,11 +1147,11 @@ impl Default for LockState { } } -#[derive(Debug, Clone, Default, PartialEq)] +#[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord)] struct LockInfo { + handle_id: HandleId, ptr: JsonPointer, ty: LockType, - handle_id: HandleId, } impl LockInfo { fn conflicts_with(&self, other: &LockInfo) -> bool { @@ -1056,6 +1193,45 @@ impl LockInfo { } } } +impl std::fmt::Display for LockInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}{}{}", self.handle_id.id, self.ty, self.ptr) + } +} + +#[derive(Debug, Clone)] +pub struct LockSet(OrdSet); +impl std::fmt::Display for LockSet { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let by_session = self + .0 + .iter() + .map(|i| (&i.handle_id, ordset![(&i.ptr, &i.ty)])) + .fold( + ordmap! {}, + |m: OrdMap<&HandleId, OrdSet<(&JsonPointer, &LockType)>>, (id, s)| { + m.update_with(&id, s, OrdSet::union) + }, + ); + let num_sessions = by_session.len(); + for (i, (session, set)) in by_session.into_iter().enumerate() { + write!(f, "{}: {{ ", session.id)?; + let num_entries = set.len(); + for (j, (ptr, ty)) in set.into_iter().enumerate() { + write!(f, "{}{}", ty, ptr)?; + if j == num_entries - 1 { + write!(f, " }}")?; + } else { + write!(f, ", ")?; + } + } + if i != num_sessions - 1 { + write!(f, "\n")?; + } + } + Ok(()) + } +} #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub enum LockType { @@ -1110,6 +1286,11 @@ pub enum LockError { first: JsonPointer, second: JsonPointer, }, + #[error("Deadlock Detected: Locks Held = {locks_held}, Locks Waiting = {locks_waiting}")] + DeadlockDetected { + locks_held: LockSet, + locks_waiting: LockSet, + }, } #[derive(Debug)] @@ -1172,8 +1353,8 @@ proptest! { #[test] fn unlock_after_lock_is_identity(session in 0..10u64, typ in lock_type_gen()) { let mut orig = LockState::Free; - orig.try_lock(HandleId{id: session }, &typ); - orig.try_unlock(&HandleId{id: session }, &typ); + orig.try_lock(HandleId{id: session}, &typ); + orig.try_unlock(&HandleId{id: session}, &typ); prop_assert_eq!(orig, LockState::Free); } }