mirror of
https://github.com/Start9Labs/patch-db.git
synced 2026-03-26 02:11:54 +00:00
works to the best of my knowledge
This commit is contained in:
@@ -1,7 +1,5 @@
|
||||
use tokio::sync::{
|
||||
mpsc::{self, UnboundedReceiver},
|
||||
oneshot,
|
||||
};
|
||||
use tokio::sync::mpsc::{self, UnboundedReceiver};
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
use super::{LockInfo, Request};
|
||||
|
||||
@@ -20,13 +18,16 @@ pub(super) struct ActionMux {
|
||||
inbound_request_queue: InboundRequestQueue,
|
||||
unlock_receivers: Vec<oneshot::Receiver<LockInfo>>,
|
||||
cancellation_receivers: Vec<oneshot::Receiver<LockInfo>>,
|
||||
_dummy_senders: Vec<oneshot::Sender<LockInfo>>,
|
||||
}
|
||||
impl ActionMux {
|
||||
pub fn new(inbound_receiver: UnboundedReceiver<Request>) -> Self {
|
||||
// futures::future::select_all will panic if the list is empty
|
||||
// instead we want it to block forever by adding a channel that will never recv
|
||||
let unlock_receivers = vec![oneshot::channel().1];
|
||||
let cancellation_receivers = vec![oneshot::channel().1];
|
||||
let (unlock_dummy_send, unlock_dummy_recv) = oneshot::channel();
|
||||
let unlock_receivers = vec![unlock_dummy_recv];
|
||||
let (cancel_dummy_send, cancel_dummy_recv) = oneshot::channel();
|
||||
let cancellation_receivers = vec![cancel_dummy_recv];
|
||||
ActionMux {
|
||||
inbound_request_queue: InboundRequestQueue {
|
||||
recv: inbound_receiver,
|
||||
@@ -34,6 +35,7 @@ impl ActionMux {
|
||||
},
|
||||
unlock_receivers,
|
||||
cancellation_receivers,
|
||||
_dummy_senders: vec![unlock_dummy_send, cancel_dummy_send],
|
||||
}
|
||||
}
|
||||
pub async fn get_action(&mut self) -> Option<Action> {
|
||||
|
||||
@@ -53,6 +53,7 @@ impl LockBookkeeper {
|
||||
|
||||
if let Some(hot_seat) = hot_seat {
|
||||
self.deferred_request_queue.push_front(hot_seat);
|
||||
kill_deadlocked(&mut self.deferred_request_queue, &mut self.trie);
|
||||
}
|
||||
Ok(res)
|
||||
}
|
||||
@@ -126,6 +127,7 @@ impl LockBookkeeper {
|
||||
}
|
||||
if let Some(hot_seat) = hot_seat {
|
||||
self.deferred_request_queue.push_front(hot_seat);
|
||||
kill_deadlocked(&mut self.deferred_request_queue, &mut self.trie);
|
||||
}
|
||||
new_unlock_receivers
|
||||
}
|
||||
@@ -157,7 +159,6 @@ fn process_new_req(
|
||||
}
|
||||
|
||||
request_queue.push_back((req, ordset![]));
|
||||
kill_deadlocked(request_queue, &*trie);
|
||||
None
|
||||
}
|
||||
// otherwise we try and service it immediately, only pushing to the queue if it fails
|
||||
@@ -179,7 +180,6 @@ fn process_new_req(
|
||||
}
|
||||
|
||||
request_queue.push_back((req, blocking_sessions));
|
||||
kill_deadlocked(request_queue, &*trie);
|
||||
None
|
||||
}
|
||||
},
|
||||
@@ -189,8 +189,7 @@ fn process_new_req(
|
||||
fn kill_deadlocked(request_queue: &mut VecDeque<(Request, OrdSet<HandleId>)>, trie: &LockTrie) {
|
||||
// TODO optimize this, it is unlikely that we are anywhere close to as efficient as we can be here.
|
||||
let deadlocked_reqs = deadlock_scan(request_queue);
|
||||
let last = request_queue.back().unwrap();
|
||||
if !deadlocked_reqs.is_empty() && deadlocked_reqs.iter().any(|r| std::ptr::eq(*r, &last.0)) {
|
||||
if !deadlocked_reqs.is_empty() {
|
||||
let locks_waiting = LockSet(
|
||||
deadlocked_reqs
|
||||
.iter()
|
||||
@@ -204,23 +203,20 @@ fn kill_deadlocked(request_queue: &mut VecDeque<(Request, OrdSet<HandleId>)>, tr
|
||||
locks_held: LockSet(trie.subtree_lock_info()),
|
||||
};
|
||||
|
||||
request_queue.pop_back().unwrap().0.reject(err);
|
||||
|
||||
// This commented logic is for if we want to kill the whole cycle, rather than the most recent addition
|
||||
// let mut indices_to_remove = Vec::with_capacity(deadlocked_reqs.len());
|
||||
// for (i, (req, _)) in request_queue.iter().enumerate() {
|
||||
// if deadlocked_reqs.iter().any(|r| std::ptr::eq(*r, req)) {
|
||||
// indices_to_remove.push(i)
|
||||
// }
|
||||
// }
|
||||
// let old = std::mem::take(request_queue);
|
||||
// for (i, (r, s)) in old.into_iter().enumerate() {
|
||||
// if indices_to_remove.contains(&i) {
|
||||
// r.reject(err.clone())
|
||||
// } else {
|
||||
// request_queue.push_back((r, s))
|
||||
// }
|
||||
// }
|
||||
let mut indices_to_remove = Vec::with_capacity(deadlocked_reqs.len());
|
||||
for (i, (req, _)) in request_queue.iter().enumerate() {
|
||||
if deadlocked_reqs.iter().any(|r| std::ptr::eq(*r, req)) {
|
||||
indices_to_remove.push(i)
|
||||
}
|
||||
}
|
||||
let old = std::mem::take(request_queue);
|
||||
for (i, (r, s)) in old.into_iter().enumerate() {
|
||||
if indices_to_remove.contains(&i) {
|
||||
r.reject(err.clone())
|
||||
} else {
|
||||
request_queue.push_back((r, s))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -76,6 +76,7 @@ impl Locker {
|
||||
ptr,
|
||||
ty: lock_type,
|
||||
};
|
||||
println!("{}", &lock_info);
|
||||
let (send, recv) = oneshot::channel();
|
||||
let (cancel_send, cancel_recv) = oneshot::channel();
|
||||
let mut cancel_guard = CancelGuard {
|
||||
@@ -249,7 +250,7 @@ pub enum LockError {
|
||||
first: JsonPointer,
|
||||
second: JsonPointer,
|
||||
},
|
||||
#[error("Deadlock Detected: Locks Held = {locks_held}, Locks Waiting = {locks_waiting}")]
|
||||
#[error("Deadlock Detected:\nLocks Held =\n{locks_held},\nLocks Waiting =\n{locks_waiting}")]
|
||||
DeadlockDetected {
|
||||
locks_held: LockSet,
|
||||
locks_waiting: LockSet,
|
||||
|
||||
@@ -11,7 +11,8 @@ mod tests {
|
||||
|
||||
use crate::handle::HandleId;
|
||||
use crate::locker::bookkeeper::{deadlock_scan, path_to};
|
||||
use crate::locker::{CancelGuard, Guard, LockInfo, LockType, Request};
|
||||
use crate::locker::{CancelGuard, Guard, LockError, LockInfo, LockType, Request};
|
||||
use crate::Locker;
|
||||
|
||||
// enum Action {
|
||||
// Acquire {
|
||||
@@ -157,7 +158,7 @@ mod tests {
|
||||
req.0,
|
||||
ordset![HandleId {
|
||||
id: dep,
|
||||
#[cfg(feature = "tracing-error")]
|
||||
#[cfg(feature = "trace")]
|
||||
trace: None
|
||||
}],
|
||||
));
|
||||
@@ -225,6 +226,50 @@ mod tests {
|
||||
});
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn deadlock_kill_live() {
|
||||
let locker = Locker::new();
|
||||
let s0 = HandleId {
|
||||
id: 0,
|
||||
#[cfg(feature = "trace")]
|
||||
trace: None,
|
||||
};
|
||||
let s1 = HandleId {
|
||||
id: 1,
|
||||
#[cfg(feature = "trace")]
|
||||
trace: None,
|
||||
};
|
||||
let x = locker
|
||||
.lock(s0.clone(), "/a/b".parse().unwrap(), LockType::Read)
|
||||
.await;
|
||||
assert!(x.is_ok());
|
||||
let y = locker
|
||||
.lock(s1.clone(), "/a/b".parse().unwrap(), LockType::Read)
|
||||
.await;
|
||||
assert!(y.is_ok());
|
||||
let x = tokio::select! {
|
||||
r0 = locker.lock(s0, "/a/b".parse().unwrap(), LockType::Write) => r0,
|
||||
r1 = locker.lock(s1, "/a/b".parse().unwrap(), LockType::Write) => r1,
|
||||
};
|
||||
match x {
|
||||
Ok(g) => {
|
||||
println!("wat");
|
||||
drop(g);
|
||||
assert!(false);
|
||||
}
|
||||
Err(e) => match e {
|
||||
LockError::DeadlockDetected { .. } => {
|
||||
println!("{}", e);
|
||||
}
|
||||
_ => {
|
||||
println!("{}", e);
|
||||
#[cfg(not(feature = "unstable"))]
|
||||
assert!(false);
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
proptest! {
|
||||
#[test]
|
||||
fn zero_or_one_write_lock_per_traversal(x in 0..10) {
|
||||
|
||||
Reference in New Issue
Block a user