From 217d8dbdef298cfb464786d5992b506ce36a8bab Mon Sep 17 00:00:00 2001 From: Keagan McClelland Date: Tue, 4 Jan 2022 16:53:37 -0700 Subject: [PATCH] deadlock detection tested --- patch-db/src/locker/bookkeeper.rs | 67 +++++++------ patch-db/src/locker/log_utils.rs | 4 +- patch-db/src/locker/mod.rs | 8 +- patch-db/src/locker/order_enforcer.rs | 5 +- patch-db/src/locker/proptest.rs | 133 +++++++++++++++++++++++++- 5 files changed, 182 insertions(+), 35 deletions(-) diff --git a/patch-db/src/locker/bookkeeper.rs b/patch-db/src/locker/bookkeeper.rs index 1cbc040..8c924ad 100644 --- a/patch-db/src/locker/bookkeeper.rs +++ b/patch-db/src/locker/bookkeeper.rs @@ -1,17 +1,19 @@ use std::collections::VecDeque; -use crate::{ - handle::HandleId, - locker::{ - log_utils::{display_session_set, fmt_acquired, fmt_cancelled, fmt_deferred, fmt_released}, - LockSet, - }, -}; use imbl::{ordmap, ordset, OrdMap, OrdSet}; use tokio::sync::oneshot; +#[cfg(feature = "tracing")] use tracing::{debug, error, info, warn}; -use super::{order_enforcer::LockOrderEnforcer, trie::LockTrie, LockError, LockInfo, Request}; +use super::order_enforcer::LockOrderEnforcer; +use super::trie::LockTrie; +use super::{LockError, LockInfo, Request}; +use crate::handle::HandleId; +#[cfg(feature = "tracing")] +use crate::locker::log_utils::{ + display_session_set, fmt_acquired, fmt_cancelled, fmt_deferred, fmt_released, +}; +use crate::locker::LockSet; // solely responsible for managing the bookkeeping requirements of requests pub(super) struct LockBookkeeper { @@ -187,7 +189,8 @@ fn process_new_req( fn kill_deadlocked(request_queue: &mut VecDeque<(Request, OrdSet)>, trie: &LockTrie) { // TODO optimize this, it is unlikely that we are anywhere close to as efficient as we can be here. let deadlocked_reqs = deadlock_scan(request_queue); - if !deadlocked_reqs.is_empty() { + let last = request_queue.back().unwrap(); + if !deadlocked_reqs.is_empty() && deadlocked_reqs.iter().any(|r| std::ptr::eq(*r, &last.0)) { let locks_waiting = LockSet( deadlocked_reqs .iter() @@ -200,20 +203,24 @@ fn kill_deadlocked(request_queue: &mut VecDeque<(Request, OrdSet)>, tr locks_waiting, locks_held: LockSet(trie.subtree_lock_info()), }; - let mut indices_to_remove = Vec::with_capacity(deadlocked_reqs.len()); - for (i, (req, _)) in request_queue.iter().enumerate() { - if deadlocked_reqs.iter().any(|r| std::ptr::eq(*r, req)) { - indices_to_remove.push(i) - } - } - let old = std::mem::take(request_queue); - for (i, (r, s)) in old.into_iter().enumerate() { - if indices_to_remove.contains(&i) { - r.reject(err.clone()) - } else { - request_queue.push_back((r, s)) - } - } + + request_queue.pop_back().unwrap().0.reject(err); + + // This commented logic is for if we want to kill the whole cycle, rather than the most recent addition + // let mut indices_to_remove = Vec::with_capacity(deadlocked_reqs.len()); + // for (i, (req, _)) in request_queue.iter().enumerate() { + // if deadlocked_reqs.iter().any(|r| std::ptr::eq(*r, req)) { + // indices_to_remove.push(i) + // } + // } + // let old = std::mem::take(request_queue); + // for (i, (r, s)) in old.into_iter().enumerate() { + // if indices_to_remove.contains(&i) { + // r.reject(err.clone()) + // } else { + // request_queue.push_back((r, s)) + // } + // } } } @@ -239,9 +246,9 @@ pub(super) fn deadlock_scan<'a>( }, ); for (root, wait_set) in wait_map.iter() { - let cycle = wait_set - .iter() - .find_map(|start| Some(path_to(&wait_map, root, start)).filter(|s| s.is_empty())); + let cycle = wait_set.iter().find_map(|start| { + Some(path_to(&wait_map, ordset![], root, start)).filter(|s| !s.is_empty()) + }); match cycle { None => { continue; @@ -259,17 +266,23 @@ pub(super) fn deadlock_scan<'a>( pub(super) fn path_to<'a>( graph: &OrdMap<&'a HandleId, &'a OrdSet>, + visited: OrdSet<&'a HandleId>, root: &'a HandleId, node: &'a HandleId, ) -> OrdSet<&'a HandleId> { if node == root { return ordset![root]; } + if visited.contains(node) { + return ordset![]; + } match graph.get(node) { None => ordset![], Some(s) => s .iter() - .find_map(|h| Some(path_to(graph, root, h)).filter(|s| s.is_empty())) + .find_map(|h| { + Some(path_to(graph, visited.update(node), root, h)).filter(|s| !s.is_empty()) + }) .map_or(ordset![], |mut s| { s.insert(node); s diff --git a/patch-db/src/locker/log_utils.rs b/patch-db/src/locker/log_utils.rs index 434e1f5..ac43bbd 100644 --- a/patch-db/src/locker/log_utils.rs +++ b/patch-db/src/locker/log_utils.rs @@ -1,6 +1,6 @@ -use super::LockInfo; use imbl::OrdSet; +use super::LockInfo; use crate::handle::HandleId; #[cfg(feature = "tracing")] @@ -31,6 +31,7 @@ pub(super) fn fmt_deferred(deferred_lock_info: &LockInfo) -> String { ) } +#[cfg(feature = "tracing")] pub(super) fn fmt_released(released_lock_info: &LockInfo) -> String { format!( "Released: session {} - {} lock on {}", @@ -38,6 +39,7 @@ pub(super) fn fmt_released(released_lock_info: &LockInfo) -> String { ) } +#[cfg(feature = "tracing")] pub(super) fn fmt_cancelled(cancelled_lock_info: &LockInfo) -> String { format!( "Canceled: session {} - {} lock on {}", diff --git a/patch-db/src/locker/mod.rs b/patch-db/src/locker/mod.rs index 31b60fa..96346c5 100644 --- a/patch-db/src/locker/mod.rs +++ b/patch-db/src/locker/mod.rs @@ -10,11 +10,13 @@ mod trie; use imbl::{ordmap, ordset, OrdMap, OrdSet}; use json_ptr::JsonPointer; use tokio::sync::{mpsc, oneshot}; +#[cfg(feature = "tracing")] use tracing::{debug, trace, warn}; -use crate::{handle::HandleId, locker::action_mux::Action}; - -use self::{action_mux::ActionMux, bookkeeper::LockBookkeeper}; +use self::action_mux::ActionMux; +use self::bookkeeper::LockBookkeeper; +use crate::handle::HandleId; +use crate::locker::action_mux::Action; pub struct Locker { sender: mpsc::UnboundedSender, diff --git a/patch-db/src/locker/order_enforcer.rs b/patch-db/src/locker/order_enforcer.rs index 1f24e70..a0bdfde 100644 --- a/patch-db/src/locker/order_enforcer.rs +++ b/patch-db/src/locker/order_enforcer.rs @@ -1,10 +1,11 @@ use imbl::{ordmap, OrdMap}; use json_ptr::JsonPointer; +#[cfg(feature = "tracing")] use tracing::warn; -use crate::{handle::HandleId, LockType}; - use super::{LockError, LockInfo}; +use crate::handle::HandleId; +use crate::LockType; pub(super) struct LockOrderEnforcer { locks_held: OrdMap>, diff --git a/patch-db/src/locker/proptest.rs b/patch-db/src/locker/proptest.rs index 1eb2999..7e1cfeb 100644 --- a/patch-db/src/locker/proptest.rs +++ b/patch-db/src/locker/proptest.rs @@ -1,13 +1,17 @@ #[cfg(test)] mod tests { - use std::collections::HashMap; + use std::collections::{HashMap, VecDeque}; + use imbl::{ordmap, ordset, OrdMap, OrdSet}; use json_ptr::JsonPointer; + use proptest::prelude::*; + use proptest::strategy::ValueTree; + use proptest::test_runner::{Config, TestRunner}; use tokio::sync::oneshot; use crate::handle::HandleId; + use crate::locker::bookkeeper::{deadlock_scan, path_to}; use crate::locker::{CancelGuard, Guard, LockInfo, LockType, Request}; - use proptest::prelude::*; // enum Action { // Acquire { @@ -96,6 +100,131 @@ mod tests { } } + proptest! { + #[test] + fn path_to_base_case(a in arb_handle_id(20), b in arb_handle_id(20)) { + let b_set = ordset![b.clone()]; + let root = &b; + let node = &a; + let graph = ordmap!{&a => &b_set}; + prop_assert_eq!(path_to(&graph, ordset![], root, node), ordset![root, node]); + } + } + + proptest! { + #[test] + fn path_to_transitive_existence(v in proptest::collection::vec((arb_handle_id(5), arb_handle_id(5)).prop_filter("Self Dependency", |(a, b)| a != b), 1..20), x0 in arb_handle_id(5), x1 in arb_handle_id(5), x2 in arb_handle_id(5)) { + let graph_owned = v.into_iter().fold(ordmap!{}, |m, (a, b)| m.update_with(a, ordset![b], OrdSet::union)); + let graph: OrdMap<&HandleId, &OrdSet> = graph_owned.iter().map(|(k, v)| (k, v)).collect(); + let avg_set_size = graph.values().fold(0, |a, b| a + b.len()) / graph.len(); + prop_assume!(avg_set_size >= 2); + let k0 = path_to(&graph, ordset![], &x0, &x1); + let k1 = path_to(&graph, ordset![], &x1, &x2); + prop_assume!(!k0.is_empty()); + prop_assume!(!k1.is_empty()); + prop_assert!(!path_to(&graph, ordset![], &x0, &x2).is_empty()); + } + } + + proptest! { + #[test] + fn path_to_bounds_inclusion(v in proptest::collection::vec((arb_handle_id(5), arb_handle_id(5)).prop_filter("Self Dependency", |(a, b)| a != b), 1..20), x0 in arb_handle_id(5), x1 in arb_handle_id(5)) { + let graph_owned = v.into_iter().fold(ordmap!{}, |m, (a, b)| m.update_with(a, ordset![b], OrdSet::union)); + let graph: OrdMap<&HandleId, &OrdSet> = graph_owned.iter().map(|(k, v)| (k, v)).collect(); + let avg_set_size = graph.values().fold(0, |a, b| a + b.len()) / graph.len(); + prop_assume!(avg_set_size >= 2); + let k0 = path_to(&graph, ordset![], &x0, &x1); + prop_assume!(!k0.is_empty()); + prop_assert!(k0.contains(&x0)); + prop_assert!(k0.contains(&x1)); + } + } + + #[test] + fn deadlock_scan_base_case() { + let mut harness = TestRunner::new(Config::default()); + let _ = harness.run(&proptest::bool::ANY, |_| { + let mut runner = TestRunner::new(Config::default()); + let n = (2..10u64).new_tree(&mut runner).unwrap().current(); + println!("Begin"); + let mut c = VecDeque::default(); + let mut queue = VecDeque::default(); + for i in 0..n { + let mut req = arb_request(1, 5).new_tree(&mut runner).unwrap().current(); + req.0.lock_info.handle_id.id = i; + let dep = if i == n - 1 { 0 } else { i + 1 }; + queue.push_back(( + req.0, + ordset![HandleId { + id: dep, + #[cfg(feature = "tracing-error")] + trace: None + }], + )); + c.push_back(req.1); + } + for i in &queue { + println!("{} => {:?}", i.0.lock_info.handle_id.id, i.1) + } + let set = deadlock_scan(&queue); + println!("{:?}", set); + assert!(!set.is_empty()); + Ok(()) + }); + } + + #[test] + fn deadlock_scan_inductive() { + let mut harness = TestRunner::new(Config::default()); + let _ = harness.run(&proptest::bool::ANY, |_| { + let mut runner = TestRunner::new(Config::default()); + let mut cancels = VecDeque::default(); + let mut queue = VecDeque::default(); + let (r, c) = arb_request(5, 5).new_tree(&mut runner).unwrap().current(); + queue.push_back((r, ordset![])); + cancels.push_back(c); + loop { + if proptest::bool::ANY.new_tree(&mut runner).unwrap().current() { + // add new edge + let h = arb_handle_id(5).new_tree(&mut runner).unwrap().current(); + let i = (0..queue.len()).new_tree(&mut runner).unwrap().current(); + if let Some((r, s)) = queue.get_mut(i) { + if r.lock_info.handle_id != h { + s.insert(h); + } else { + continue; + } + } + } else { + // add new node + let (r, c) = arb_request(5, 5).new_tree(&mut runner).unwrap().current(); + // but only if the session hasn't yet been used + if queue + .iter() + .all(|(qr, _)| qr.lock_info.handle_id.id != r.lock_info.handle_id.id) + { + queue.push_back((r, ordset![])); + cancels.push_back(c); + } + } + let cycle = deadlock_scan(&queue) + .into_iter() + .map(|r| &r.lock_info.handle_id) + .collect::>(); + if !cycle.is_empty() { + println!("Cycle: {:?}", cycle); + for (r, s) in &queue { + if cycle.contains(&r.lock_info.handle_id) { + assert!(s.iter().any(|h| cycle.contains(h))) + } + } + break; + } + } + Ok(()) + }); + } + proptest! { #[test] fn zero_or_one_write_lock_per_traversal(x in 0..10) {