mirror of
https://github.com/Start9Labs/patch-db.git
synced 2026-03-26 02:11:54 +00:00
deadlock detection tested
This commit is contained in:
@@ -1,17 +1,19 @@
|
||||
use std::collections::VecDeque;
|
||||
|
||||
use crate::{
|
||||
handle::HandleId,
|
||||
locker::{
|
||||
log_utils::{display_session_set, fmt_acquired, fmt_cancelled, fmt_deferred, fmt_released},
|
||||
LockSet,
|
||||
},
|
||||
};
|
||||
use imbl::{ordmap, ordset, OrdMap, OrdSet};
|
||||
use tokio::sync::oneshot;
|
||||
#[cfg(feature = "tracing")]
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use super::{order_enforcer::LockOrderEnforcer, trie::LockTrie, LockError, LockInfo, Request};
|
||||
use super::order_enforcer::LockOrderEnforcer;
|
||||
use super::trie::LockTrie;
|
||||
use super::{LockError, LockInfo, Request};
|
||||
use crate::handle::HandleId;
|
||||
#[cfg(feature = "tracing")]
|
||||
use crate::locker::log_utils::{
|
||||
display_session_set, fmt_acquired, fmt_cancelled, fmt_deferred, fmt_released,
|
||||
};
|
||||
use crate::locker::LockSet;
|
||||
|
||||
// solely responsible for managing the bookkeeping requirements of requests
|
||||
pub(super) struct LockBookkeeper {
|
||||
@@ -187,7 +189,8 @@ fn process_new_req(
|
||||
fn kill_deadlocked(request_queue: &mut VecDeque<(Request, OrdSet<HandleId>)>, trie: &LockTrie) {
|
||||
// TODO optimize this, it is unlikely that we are anywhere close to as efficient as we can be here.
|
||||
let deadlocked_reqs = deadlock_scan(request_queue);
|
||||
if !deadlocked_reqs.is_empty() {
|
||||
let last = request_queue.back().unwrap();
|
||||
if !deadlocked_reqs.is_empty() && deadlocked_reqs.iter().any(|r| std::ptr::eq(*r, &last.0)) {
|
||||
let locks_waiting = LockSet(
|
||||
deadlocked_reqs
|
||||
.iter()
|
||||
@@ -200,20 +203,24 @@ fn kill_deadlocked(request_queue: &mut VecDeque<(Request, OrdSet<HandleId>)>, tr
|
||||
locks_waiting,
|
||||
locks_held: LockSet(trie.subtree_lock_info()),
|
||||
};
|
||||
let mut indices_to_remove = Vec::with_capacity(deadlocked_reqs.len());
|
||||
for (i, (req, _)) in request_queue.iter().enumerate() {
|
||||
if deadlocked_reqs.iter().any(|r| std::ptr::eq(*r, req)) {
|
||||
indices_to_remove.push(i)
|
||||
}
|
||||
}
|
||||
let old = std::mem::take(request_queue);
|
||||
for (i, (r, s)) in old.into_iter().enumerate() {
|
||||
if indices_to_remove.contains(&i) {
|
||||
r.reject(err.clone())
|
||||
} else {
|
||||
request_queue.push_back((r, s))
|
||||
}
|
||||
}
|
||||
|
||||
request_queue.pop_back().unwrap().0.reject(err);
|
||||
|
||||
// This commented logic is for if we want to kill the whole cycle, rather than the most recent addition
|
||||
// let mut indices_to_remove = Vec::with_capacity(deadlocked_reqs.len());
|
||||
// for (i, (req, _)) in request_queue.iter().enumerate() {
|
||||
// if deadlocked_reqs.iter().any(|r| std::ptr::eq(*r, req)) {
|
||||
// indices_to_remove.push(i)
|
||||
// }
|
||||
// }
|
||||
// let old = std::mem::take(request_queue);
|
||||
// for (i, (r, s)) in old.into_iter().enumerate() {
|
||||
// if indices_to_remove.contains(&i) {
|
||||
// r.reject(err.clone())
|
||||
// } else {
|
||||
// request_queue.push_back((r, s))
|
||||
// }
|
||||
// }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -239,9 +246,9 @@ pub(super) fn deadlock_scan<'a>(
|
||||
},
|
||||
);
|
||||
for (root, wait_set) in wait_map.iter() {
|
||||
let cycle = wait_set
|
||||
.iter()
|
||||
.find_map(|start| Some(path_to(&wait_map, root, start)).filter(|s| s.is_empty()));
|
||||
let cycle = wait_set.iter().find_map(|start| {
|
||||
Some(path_to(&wait_map, ordset![], root, start)).filter(|s| !s.is_empty())
|
||||
});
|
||||
match cycle {
|
||||
None => {
|
||||
continue;
|
||||
@@ -259,17 +266,23 @@ pub(super) fn deadlock_scan<'a>(
|
||||
|
||||
pub(super) fn path_to<'a>(
|
||||
graph: &OrdMap<&'a HandleId, &'a OrdSet<HandleId>>,
|
||||
visited: OrdSet<&'a HandleId>,
|
||||
root: &'a HandleId,
|
||||
node: &'a HandleId,
|
||||
) -> OrdSet<&'a HandleId> {
|
||||
if node == root {
|
||||
return ordset![root];
|
||||
}
|
||||
if visited.contains(node) {
|
||||
return ordset![];
|
||||
}
|
||||
match graph.get(node) {
|
||||
None => ordset![],
|
||||
Some(s) => s
|
||||
.iter()
|
||||
.find_map(|h| Some(path_to(graph, root, h)).filter(|s| s.is_empty()))
|
||||
.find_map(|h| {
|
||||
Some(path_to(graph, visited.update(node), root, h)).filter(|s| !s.is_empty())
|
||||
})
|
||||
.map_or(ordset![], |mut s| {
|
||||
s.insert(node);
|
||||
s
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use super::LockInfo;
|
||||
use imbl::OrdSet;
|
||||
|
||||
use super::LockInfo;
|
||||
use crate::handle::HandleId;
|
||||
|
||||
#[cfg(feature = "tracing")]
|
||||
@@ -31,6 +31,7 @@ pub(super) fn fmt_deferred(deferred_lock_info: &LockInfo) -> String {
|
||||
)
|
||||
}
|
||||
|
||||
#[cfg(feature = "tracing")]
|
||||
pub(super) fn fmt_released(released_lock_info: &LockInfo) -> String {
|
||||
format!(
|
||||
"Released: session {} - {} lock on {}",
|
||||
@@ -38,6 +39,7 @@ pub(super) fn fmt_released(released_lock_info: &LockInfo) -> String {
|
||||
)
|
||||
}
|
||||
|
||||
#[cfg(feature = "tracing")]
|
||||
pub(super) fn fmt_cancelled(cancelled_lock_info: &LockInfo) -> String {
|
||||
format!(
|
||||
"Canceled: session {} - {} lock on {}",
|
||||
|
||||
@@ -10,11 +10,13 @@ mod trie;
|
||||
use imbl::{ordmap, ordset, OrdMap, OrdSet};
|
||||
use json_ptr::JsonPointer;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
#[cfg(feature = "tracing")]
|
||||
use tracing::{debug, trace, warn};
|
||||
|
||||
use crate::{handle::HandleId, locker::action_mux::Action};
|
||||
|
||||
use self::{action_mux::ActionMux, bookkeeper::LockBookkeeper};
|
||||
use self::action_mux::ActionMux;
|
||||
use self::bookkeeper::LockBookkeeper;
|
||||
use crate::handle::HandleId;
|
||||
use crate::locker::action_mux::Action;
|
||||
|
||||
pub struct Locker {
|
||||
sender: mpsc::UnboundedSender<Request>,
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
use imbl::{ordmap, OrdMap};
|
||||
use json_ptr::JsonPointer;
|
||||
#[cfg(feature = "tracing")]
|
||||
use tracing::warn;
|
||||
|
||||
use crate::{handle::HandleId, LockType};
|
||||
|
||||
use super::{LockError, LockInfo};
|
||||
use crate::handle::HandleId;
|
||||
use crate::LockType;
|
||||
|
||||
pub(super) struct LockOrderEnforcer {
|
||||
locks_held: OrdMap<HandleId, OrdMap<(JsonPointer, LockType), usize>>,
|
||||
|
||||
@@ -1,13 +1,17 @@
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
|
||||
use imbl::{ordmap, ordset, OrdMap, OrdSet};
|
||||
use json_ptr::JsonPointer;
|
||||
use proptest::prelude::*;
|
||||
use proptest::strategy::ValueTree;
|
||||
use proptest::test_runner::{Config, TestRunner};
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
use crate::handle::HandleId;
|
||||
use crate::locker::bookkeeper::{deadlock_scan, path_to};
|
||||
use crate::locker::{CancelGuard, Guard, LockInfo, LockType, Request};
|
||||
use proptest::prelude::*;
|
||||
|
||||
// enum Action {
|
||||
// Acquire {
|
||||
@@ -96,6 +100,131 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
proptest! {
|
||||
#[test]
|
||||
fn path_to_base_case(a in arb_handle_id(20), b in arb_handle_id(20)) {
|
||||
let b_set = ordset![b.clone()];
|
||||
let root = &b;
|
||||
let node = &a;
|
||||
let graph = ordmap!{&a => &b_set};
|
||||
prop_assert_eq!(path_to(&graph, ordset![], root, node), ordset![root, node]);
|
||||
}
|
||||
}
|
||||
|
||||
proptest! {
|
||||
#[test]
|
||||
fn path_to_transitive_existence(v in proptest::collection::vec((arb_handle_id(5), arb_handle_id(5)).prop_filter("Self Dependency", |(a, b)| a != b), 1..20), x0 in arb_handle_id(5), x1 in arb_handle_id(5), x2 in arb_handle_id(5)) {
|
||||
let graph_owned = v.into_iter().fold(ordmap!{}, |m, (a, b)| m.update_with(a, ordset![b], OrdSet::union));
|
||||
let graph: OrdMap<&HandleId, &OrdSet<HandleId>> = graph_owned.iter().map(|(k, v)| (k, v)).collect();
|
||||
let avg_set_size = graph.values().fold(0, |a, b| a + b.len()) / graph.len();
|
||||
prop_assume!(avg_set_size >= 2);
|
||||
let k0 = path_to(&graph, ordset![], &x0, &x1);
|
||||
let k1 = path_to(&graph, ordset![], &x1, &x2);
|
||||
prop_assume!(!k0.is_empty());
|
||||
prop_assume!(!k1.is_empty());
|
||||
prop_assert!(!path_to(&graph, ordset![], &x0, &x2).is_empty());
|
||||
}
|
||||
}
|
||||
|
||||
proptest! {
|
||||
#[test]
|
||||
fn path_to_bounds_inclusion(v in proptest::collection::vec((arb_handle_id(5), arb_handle_id(5)).prop_filter("Self Dependency", |(a, b)| a != b), 1..20), x0 in arb_handle_id(5), x1 in arb_handle_id(5)) {
|
||||
let graph_owned = v.into_iter().fold(ordmap!{}, |m, (a, b)| m.update_with(a, ordset![b], OrdSet::union));
|
||||
let graph: OrdMap<&HandleId, &OrdSet<HandleId>> = graph_owned.iter().map(|(k, v)| (k, v)).collect();
|
||||
let avg_set_size = graph.values().fold(0, |a, b| a + b.len()) / graph.len();
|
||||
prop_assume!(avg_set_size >= 2);
|
||||
let k0 = path_to(&graph, ordset![], &x0, &x1);
|
||||
prop_assume!(!k0.is_empty());
|
||||
prop_assert!(k0.contains(&x0));
|
||||
prop_assert!(k0.contains(&x1));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn deadlock_scan_base_case() {
|
||||
let mut harness = TestRunner::new(Config::default());
|
||||
let _ = harness.run(&proptest::bool::ANY, |_| {
|
||||
let mut runner = TestRunner::new(Config::default());
|
||||
let n = (2..10u64).new_tree(&mut runner).unwrap().current();
|
||||
println!("Begin");
|
||||
let mut c = VecDeque::default();
|
||||
let mut queue = VecDeque::default();
|
||||
for i in 0..n {
|
||||
let mut req = arb_request(1, 5).new_tree(&mut runner).unwrap().current();
|
||||
req.0.lock_info.handle_id.id = i;
|
||||
let dep = if i == n - 1 { 0 } else { i + 1 };
|
||||
queue.push_back((
|
||||
req.0,
|
||||
ordset![HandleId {
|
||||
id: dep,
|
||||
#[cfg(feature = "tracing-error")]
|
||||
trace: None
|
||||
}],
|
||||
));
|
||||
c.push_back(req.1);
|
||||
}
|
||||
for i in &queue {
|
||||
println!("{} => {:?}", i.0.lock_info.handle_id.id, i.1)
|
||||
}
|
||||
let set = deadlock_scan(&queue);
|
||||
println!("{:?}", set);
|
||||
assert!(!set.is_empty());
|
||||
Ok(())
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn deadlock_scan_inductive() {
|
||||
let mut harness = TestRunner::new(Config::default());
|
||||
let _ = harness.run(&proptest::bool::ANY, |_| {
|
||||
let mut runner = TestRunner::new(Config::default());
|
||||
let mut cancels = VecDeque::default();
|
||||
let mut queue = VecDeque::default();
|
||||
let (r, c) = arb_request(5, 5).new_tree(&mut runner).unwrap().current();
|
||||
queue.push_back((r, ordset![]));
|
||||
cancels.push_back(c);
|
||||
loop {
|
||||
if proptest::bool::ANY.new_tree(&mut runner).unwrap().current() {
|
||||
// add new edge
|
||||
let h = arb_handle_id(5).new_tree(&mut runner).unwrap().current();
|
||||
let i = (0..queue.len()).new_tree(&mut runner).unwrap().current();
|
||||
if let Some((r, s)) = queue.get_mut(i) {
|
||||
if r.lock_info.handle_id != h {
|
||||
s.insert(h);
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// add new node
|
||||
let (r, c) = arb_request(5, 5).new_tree(&mut runner).unwrap().current();
|
||||
// but only if the session hasn't yet been used
|
||||
if queue
|
||||
.iter()
|
||||
.all(|(qr, _)| qr.lock_info.handle_id.id != r.lock_info.handle_id.id)
|
||||
{
|
||||
queue.push_back((r, ordset![]));
|
||||
cancels.push_back(c);
|
||||
}
|
||||
}
|
||||
let cycle = deadlock_scan(&queue)
|
||||
.into_iter()
|
||||
.map(|r| &r.lock_info.handle_id)
|
||||
.collect::<OrdSet<&HandleId>>();
|
||||
if !cycle.is_empty() {
|
||||
println!("Cycle: {:?}", cycle);
|
||||
for (r, s) in &queue {
|
||||
if cycle.contains(&r.lock_info.handle_id) {
|
||||
assert!(s.iter().any(|h| cycle.contains(h)))
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
});
|
||||
}
|
||||
|
||||
proptest! {
|
||||
#[test]
|
||||
fn zero_or_one_write_lock_per_traversal(x in 0..10) {
|
||||
|
||||
Reference in New Issue
Block a user