first pass at deadlock detection

This commit is contained in:
Keagan McClelland
2021-12-30 17:49:04 -07:00
committed by Aiden McClelland
parent 89d2b31f41
commit 766e45dc5a

View File

@@ -22,7 +22,7 @@ impl Locker {
// futures::future::select_all will panic if the list is empty
// instead we want it to block forever by adding a channel that will never recv
let (_dummy_send, dummy_recv) = oneshot::channel();
let mut locks_on_lease = vec![dummy_recv];
let mut unlock_receivers = vec![dummy_recv];
let (_dummy_send, dummy_recv) = oneshot::channel();
let mut cancellations = vec![dummy_recv];
@@ -30,14 +30,14 @@ impl Locker {
let mut lock_order_enforcer = LockOrderEnforcer::new();
while let Some(action) =
get_action(&mut new_requests, &mut locks_on_lease, &mut cancellations).await
get_action(&mut new_requests, &mut unlock_receivers, &mut cancellations).await
{
#[cfg(feature = "tracing")]
fn display_session_set(set: &OrdSet<HandleId>) -> String {
use std::fmt::Write;
let mut display = String::from("{");
for session in set.iter() {
write!(display, "{},", session.id);
write!(display, "{},", session.id).unwrap();
}
display.replace_range(display.len() - 1.., "}");
display
@@ -49,7 +49,7 @@ impl Locker {
hot_seat: Option<&(Request, OrdSet<HandleId>)>,
req: Request,
trie: &mut Trie,
locks_on_lease: &mut Vec<oneshot::Receiver<LockInfo>>,
unlock_receivers: &mut Vec<oneshot::Receiver<LockInfo>>,
request_queue: &mut VecDeque<(Request, OrdSet<HandleId>)>,
) {
match hot_seat {
@@ -72,7 +72,8 @@ impl Locker {
&hot_req.lock_info.handle_id.id
);
}
request_queue.push_back((req, OrdSet::new()))
request_queue.push_back((req, OrdSet::new()));
process_deadlocks(request_queue, &*trie);
}
// otherwise we try and service it immediately, only pushing to the queue if it fails
_ => match trie.try_lock(&req.lock_info) {
@@ -85,7 +86,7 @@ impl Locker {
&req.lock_info.ptr,
);
let lease = req.complete();
locks_on_lease.push(lease);
unlock_receivers.push(lease);
}
Err(blocking_sessions) => {
#[cfg(feature = "tracing")]
@@ -101,7 +102,8 @@ impl Locker {
&display_session_set(&blocking_sessions)
);
}
request_queue.push_back((req, blocking_sessions))
request_queue.push_back((req, blocking_sessions));
process_deadlocks(request_queue, &*trie);
}
},
}
@@ -121,7 +123,7 @@ impl Locker {
hot_seat.as_ref(),
req,
&mut trie,
&mut locks_on_lease,
&mut unlock_receivers,
&mut request_queue,
);
if let Some(hot_seat) = hot_seat {
@@ -157,7 +159,7 @@ impl Locker {
&r.lock_info.ptr,
);
let lease = r.complete();
locks_on_lease.push(lease);
unlock_receivers.push(lease);
}
Err(new_blocking_sessions) => {
// set the hot seat and proceed to step two
@@ -174,7 +176,7 @@ impl Locker {
hot_seat.as_ref(),
r,
&mut trie,
&mut locks_on_lease,
&mut unlock_receivers,
&mut request_queue,
)
}
@@ -266,6 +268,97 @@ struct RequestQueue {
closed: bool,
recv: mpsc::UnboundedReceiver<Request>,
}
fn deadlock_scan<'a>(queue: &'a VecDeque<(Request, OrdSet<HandleId>)>) -> Vec<&'a Request> {
let (wait_map, mut req_map) = queue
.iter()
.map(|(req, set)| ((&req.lock_info.handle_id, set, req)))
.fold(
(
OrdMap::<&'a HandleId, &'a OrdSet<HandleId>>::new(),
OrdMap::<&'a HandleId, &'a Request>::new(),
),
|(mut wmap, mut rmap), (id, wset, req)| {
(
{
wmap.insert(id, wset);
wmap
},
{
rmap.insert(id, req);
rmap
},
)
},
);
fn path_to<'a>(
graph: &OrdMap<&'a HandleId, &'a OrdSet<HandleId>>,
root: &'a HandleId,
node: &'a HandleId,
) -> OrdSet<&'a HandleId> {
if node == root {
return ordset![root];
}
match graph.get(node) {
None => ordset![],
Some(s) => s
.iter()
.find_map(|h| Some(path_to(graph, root, h)).filter(|s| s.is_empty()))
.map_or(ordset![], |mut s| {
s.insert(node);
s
}),
}
}
for (root, wait_set) in wait_map.iter() {
let cycle = wait_set
.iter()
.find_map(|start| Some(path_to(&wait_map, root, start)).filter(|s| s.is_empty()));
match cycle {
None => {
continue;
}
Some(c) => {
return c
.into_iter()
.map(|id| req_map.remove(id).unwrap())
.collect();
}
}
}
vec![]
}
/// Runs a deadlock scan over `request_queue` and rejects every request that is
/// part of the detected cycle.
///
/// When a cycle is found, each participating request is rejected with a
/// `LockError::DeadlockDetected` carrying the locks those requests were waiting
/// for and the lock info collected from `trie`. All other requests stay in the
/// queue in their original order. When no cycle is found the queue is untouched.
fn process_deadlocks(request_queue: &mut VecDeque<(Request, OrdSet<HandleId>)>, trie: &Trie) {
    let cycle = deadlock_scan(request_queue);
    if cycle.is_empty() {
        return;
    }
    #[cfg(feature = "tracing")]
    tracing::info!("Deadlock Detected: {:?}", cycle);

    let waiting = cycle.iter().map(|req| req.lock_info.clone()).collect();
    let err = LockError::DeadlockDetected {
        locks_waiting: LockSet(waiting),
        locks_held: LockSet(trie.subtree_lock_info()),
    };

    // `cycle` holds references into the queue, so membership is decided by address.
    // The mask must be built before the queue is drained, while those references
    // are still valid.
    let doomed: Vec<bool> = request_queue
        .iter()
        .map(|(queued, _)| cycle.iter().any(|hit| std::ptr::eq(*hit, queued)))
        .collect();

    let drained = std::mem::take(request_queue);
    for ((req, blockers), is_doomed) in drained.into_iter().zip(doomed) {
        if is_doomed {
            req.reject(err.clone());
        } else {
            request_queue.push_back((req, blockers));
        }
    }
}
#[derive(Debug)]
enum Action {
@@ -486,6 +579,50 @@ impl Trie {
.fold(OrdSet::new(), OrdSet::union);
self.state.sessions().union(children)
}
/// Collects a `LockInfo` for every session recorded in this node's state and in
/// the states of all of its descendants.
///
/// Entries gathered from a child have that child's key applied to their pointer
/// via `push_start`; entries for this node itself use `JsonPointer::default()`.
fn subtree_lock_info<'a>(&'a self) -> OrdSet<LockInfo> {
    let mut collected: OrdSet<LockInfo> = OrdSet::new();

    // Gather each child's subtree, re-rooting its pointers under the child's key.
    for (segment, child) in self.children.iter() {
        let prefixed: OrdSet<LockInfo> = child
            .subtree_lock_info()
            .into_iter()
            .map(|mut info| {
                info.ptr.push_start(segment);
                info
            })
            .collect();
        collected = collected.union(prefixed);
    }

    // Locks recorded directly on this node, one lock type at a time.
    collected.extend(self.state.write_session().map(|session| LockInfo {
        handle_id: session.clone(),
        ptr: JsonPointer::default(),
        ty: LockType::Write,
    }));
    collected.extend(
        self.state
            .read_sessions()
            .into_iter()
            .map(|session| LockInfo {
                handle_id: session.clone(),
                ptr: JsonPointer::default(),
                ty: LockType::Read,
            }),
    );
    collected.extend(
        self.state
            .exist_sessions()
            .into_iter()
            .map(|session| LockInfo {
                handle_id: session.clone(),
                ptr: JsonPointer::default(),
                ty: LockType::Exist,
            }),
    );
    collected
}
fn ancestors_and_trie<'a, S: AsRef<str>, V: SegList>(
&'a self,
ptr: &JsonPointer<S, V>,
@@ -1010,11 +1147,11 @@ impl Default for LockState {
}
}
/// Describes one lock: the session it belongs to (`handle_id`), the pointer it
/// applies to (`ptr`) and the kind of lock (`ty`).
// NOTE(review): this hunk shows both the removed and the added form of the derive
// line and of the `handle_id` field. Derived `PartialOrd`/`Ord` compare fields in
// declaration order, so where `handle_id` sits decides how `LockInfo` values sort.
#[derive(Debug, Clone, Default, PartialEq)]
#[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord)]
struct LockInfo {
handle_id: HandleId,
ptr: JsonPointer,
ty: LockType,
handle_id: HandleId,
}
impl LockInfo {
fn conflicts_with(&self, other: &LockInfo) -> bool {
@@ -1056,6 +1193,45 @@ impl LockInfo {
}
}
}
impl std::fmt::Display for LockInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}{}{}", self.handle_id.id, self.ty, self.ptr)
}
}
/// An ordered set of [`LockInfo`] entries.
///
/// Its `Display` implementation prints the locks grouped by session.
#[derive(Debug, Clone)]
pub struct LockSet(OrdSet<LockInfo>);
impl std::fmt::Display for LockSet {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let by_session = self
.0
.iter()
.map(|i| (&i.handle_id, ordset![(&i.ptr, &i.ty)]))
.fold(
ordmap! {},
|m: OrdMap<&HandleId, OrdSet<(&JsonPointer, &LockType)>>, (id, s)| {
m.update_with(&id, s, OrdSet::union)
},
);
let num_sessions = by_session.len();
for (i, (session, set)) in by_session.into_iter().enumerate() {
write!(f, "{}: {{ ", session.id)?;
let num_entries = set.len();
for (j, (ptr, ty)) in set.into_iter().enumerate() {
write!(f, "{}{}", ty, ptr)?;
if j == num_entries - 1 {
write!(f, " }}")?;
} else {
write!(f, ", ")?;
}
}
if i != num_sessions - 1 {
write!(f, "\n")?;
}
}
Ok(())
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LockType {
@@ -1110,6 +1286,11 @@ pub enum LockError {
first: JsonPointer,
second: JsonPointer,
},
#[error("Deadlock Detected: Locks Held = {locks_held}, Locks Waiting = {locks_waiting}")]
DeadlockDetected {
locks_held: LockSet,
locks_waiting: LockSet,
},
}
#[derive(Debug)]
@@ -1172,8 +1353,8 @@ proptest! {
// Property: starting from `LockState::Free`, taking a lock and then releasing it
// with the same session id and lock type leaves the state equal to `LockState::Free`.
#[test]
fn unlock_after_lock_is_identity(session in 0..10u64, typ in lock_type_gen()) {
let mut orig = LockState::Free;
orig.try_lock(HandleId{id: session }, &typ);
orig.try_unlock(&HandleId{id: session }, &typ);
orig.try_lock(HandleId{id: session}, &typ);
orig.try_unlock(&HandleId{id: session}, &typ);
prop_assert_eq!(orig, LockState::Free);
}
}