fixed bugs in locking exclusion logic

This commit is contained in:
Keagan McClelland
2021-12-15 09:53:31 -07:00
committed by Aiden McClelland
parent 69d9632cac
commit e95e97e96d
2 changed files with 193 additions and 64 deletions

View File

@@ -2,10 +2,11 @@ use std::collections::{BTreeMap, VecDeque};
use imbl::{ordset, OrdSet}; use imbl::{ordset, OrdSet};
use json_ptr::{JsonPointer, SegList}; use json_ptr::{JsonPointer, SegList};
#[cfg(test)]
use proptest::prelude::*;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use crate::handle::HandleId; use crate::handle::HandleId;
pub struct Locker { pub struct Locker {
sender: mpsc::UnboundedSender<Request>, sender: mpsc::UnboundedSender<Request>,
} }
@@ -29,6 +30,16 @@ impl Locker {
while let Some(action) = while let Some(action) =
get_action(&mut new_requests, &mut locks_on_lease, &mut cancellations).await get_action(&mut new_requests, &mut locks_on_lease, &mut cancellations).await
{ {
#[cfg(feature = "tracing")]
fn display_session_set(set: &OrdSet<HandleId>) -> String {
use std::fmt::Write;
let mut display = String::from("{");
for session in set.iter() {
write!(display, "{},", session.id);
}
display.replace_range(display.len() - 1.., "}");
display
}
// to prevent starvation we privilege the front of the queue and only allow requests that // to prevent starvation we privilege the front of the queue and only allow requests that
// conflict with the request at the front to go through if they are requested by sessions that // conflict with the request at the front to go through if they are requested by sessions that
// are *currently blocking* the front of the queue // are *currently blocking* the front of the queue
@@ -46,15 +57,48 @@ impl Locker {
if hot_req.lock_info.conflicts_with(&req.lock_info) if hot_req.lock_info.conflicts_with(&req.lock_info)
&& !hot_blockers.contains(&req.lock_info.handle_id) => && !hot_blockers.contains(&req.lock_info.handle_id) =>
{ {
#[cfg(feature = "tracing")]
{
tracing::info!(
"Deferred: session {} - {} lock on {}",
&req.lock_info.handle_id.id,
&req.lock_info.ty,
&req.lock_info.ptr,
);
tracing::info!(
"Must wait on hot seat request from session {}",
&hot_req.lock_info.handle_id.id
);
}
request_queue.push_back((req, OrdSet::new())) request_queue.push_back((req, OrdSet::new()))
} }
// otherwise we try and service it immediately, only pushing to the queue if it fails // otherwise we try and service it immediately, only pushing to the queue if it fails
_ => match trie.try_lock(&req.lock_info) { _ => match trie.try_lock(&req.lock_info) {
Ok(()) => { Ok(()) => {
#[cfg(feature = "tracing")]
tracing::info!(
"Acquired: session {} - {} lock on {}",
&req.lock_info.handle_id.id,
&req.lock_info.ty,
&req.lock_info.ptr,
);
let lease = req.complete(); let lease = req.complete();
locks_on_lease.push(lease); locks_on_lease.push(lease);
} }
Err(blocking_sessions) => { Err(blocking_sessions) => {
#[cfg(feature = "tracing")]
{
tracing::info!(
"Deferred: session {} - {} lock on {}",
&req.lock_info.handle_id.id,
&req.lock_info.ty,
&req.lock_info.ptr,
);
tracing::info!(
"Must wait on sessions {}",
&display_session_set(&blocking_sessions)
);
}
request_queue.push_back((req, blocking_sessions)) request_queue.push_back((req, blocking_sessions))
} }
}, },
@@ -64,6 +108,8 @@ impl Locker {
tracing::trace!("Locker Action: {:#?}", action); tracing::trace!("Locker Action: {:#?}", action);
match action { match action {
Action::HandleRequest(mut req) => { Action::HandleRequest(mut req) => {
#[cfg(feature = "tracing")]
tracing::debug!("New lock request");
cancellations.extend(req.cancel.take()); cancellations.extend(req.cancel.take());
let hot_seat = request_queue.pop_front(); let hot_seat = request_queue.pop_front();
process_new_req( process_new_req(
@@ -79,38 +125,54 @@ impl Locker {
} }
Action::HandleRelease(lock_info) => { Action::HandleRelease(lock_info) => {
// release actual lock // release actual lock
if trie.try_unlock(&lock_info) { trie.unlock(&lock_info);
// try to pop off as many requests off the front of the queue as we can #[cfg(feature = "tracing")]
let mut hot_seat = None; {
while let Some((r, _)) = request_queue.pop_front() { tracing::info!(
match trie.try_lock(&r.lock_info) { "Released: session {} - {} lock on {} ",
Ok(()) => { &lock_info.handle_id.id,
let lease = r.complete(); &lock_info.ty,
locks_on_lease.push(lease); &lock_info.ptr,
} );
Err(new_blocking_sessions) => { tracing::debug!("Subtree sessions: {:?}", trie.subtree_sessions());
// set the hot seat and proceed to step two tracing::debug!("Processing request queue backlog");
hot_seat = Some((r, new_blocking_sessions)); }
break; // try to pop off as many requests off the front of the queue as we can
} let mut hot_seat = None;
while let Some((r, _)) = request_queue.pop_front() {
match trie.try_lock(&r.lock_info) {
Ok(()) => {
#[cfg(feature = "tracing")]
tracing::info!(
"Acquired: session {} - {} lock on {}",
&r.lock_info.handle_id.id,
&r.lock_info.ty,
&r.lock_info.ptr,
);
let lease = r.complete();
locks_on_lease.push(lease);
}
Err(new_blocking_sessions) => {
// set the hot seat and proceed to step two
hot_seat = Some((r, new_blocking_sessions));
break;
} }
} }
// when we can no longer do so, try and service the rest of the queue with the new hot seat }
let old_request_queue = request_queue; // when we can no longer do so, try and service the rest of the queue with the new hot seat
request_queue = VecDeque::new(); let old_request_queue = std::mem::take(&mut request_queue);
for (r, _) in old_request_queue { for (r, _) in old_request_queue {
// we now want to process each request in the queue as if it was new // we now want to process each request in the queue as if it was new
process_new_req( process_new_req(
hot_seat.as_ref(), hot_seat.as_ref(),
r, r,
&mut trie, &mut trie,
&mut locks_on_lease, &mut locks_on_lease,
&mut request_queue, &mut request_queue,
) )
} }
if let Some(hot_seat) = hot_seat { if let Some(hot_seat) = hot_seat {
request_queue.push_front(hot_seat); request_queue.push_front(hot_seat);
}
} }
} }
Action::HandleCancel(lock_info) => { Action::HandleCancel(lock_info) => {
@@ -127,7 +189,15 @@ impl Locker {
lock_info.ptr lock_info.ptr
); );
} }
Some((i, _)) => { #[allow(unused_variables)]
Some((i, (req, _))) => {
#[cfg(feature = "tracing")]
tracing::info!(
"Canceled: session {} - {} lock on {}",
&req.lock_info.handle_id.id,
&req.lock_info.ty,
&req.lock_info.ptr
);
request_queue.remove(i); request_queue.remove(i);
} }
} }
@@ -228,15 +298,19 @@ struct Trie {
children: BTreeMap<String, Trie>, children: BTreeMap<String, Trie>,
} }
impl Trie { impl Trie {
#[allow(dead_code)]
fn all<F: Fn(&LockState) -> bool>(&self, f: F) -> bool { fn all<F: Fn(&LockState) -> bool>(&self, f: F) -> bool {
f(&self.state) && self.children.values().all(|t| t.all(&f)) f(&self.state) && self.children.values().all(|t| t.all(&f))
} }
#[allow(dead_code)]
fn any<F: Fn(&LockState) -> bool>(&self, f: F) -> bool { fn any<F: Fn(&LockState) -> bool>(&self, f: F) -> bool {
f(&self.state) || self.children.values().any(|t| t.any(&f)) f(&self.state) || self.children.values().any(|t| t.any(&f))
} }
#[allow(dead_code)]
fn subtree_is_lock_free_for(&self, session: &HandleId) -> bool { fn subtree_is_lock_free_for(&self, session: &HandleId) -> bool {
self.all(|s| s.sessions().difference(ordset![session]).is_empty()) self.all(|s| s.sessions().difference(ordset![session]).is_empty())
} }
#[allow(dead_code)]
fn subtree_is_exclusive_free_for(&self, session: &HandleId) -> bool { fn subtree_is_exclusive_free_for(&self, session: &HandleId) -> bool {
self.all(|s| match s.clone().erase(session) { self.all(|s| match s.clone().erase(session) {
LockState::Exclusive { .. } => false, LockState::Exclusive { .. } => false,
@@ -278,6 +352,7 @@ impl Trie {
} }
} }
// no writes in ancestor set, no writes at node // no writes in ancestor set, no writes at node
#[allow(dead_code)]
fn can_acquire_exist(&self, ptr: &JsonPointer, session: &HandleId) -> bool { fn can_acquire_exist(&self, ptr: &JsonPointer, session: &HandleId) -> bool {
let (v, t) = self.ancestors_and_trie(ptr); let (v, t) = self.ancestors_and_trie(ptr);
let ancestor_write_free = v let ancestor_write_free = v
@@ -288,6 +363,7 @@ impl Trie {
ancestor_write_free && t.map_or(true, |t| t.state.clone().erase(session).write_free()) ancestor_write_free && t.map_or(true, |t| t.state.clone().erase(session).write_free())
} }
// no writes in ancestor set, no writes in subtree // no writes in ancestor set, no writes in subtree
#[allow(dead_code)]
fn can_acquire_read(&self, ptr: &JsonPointer, session: &HandleId) -> bool { fn can_acquire_read(&self, ptr: &JsonPointer, session: &HandleId) -> bool {
let (v, t) = self.ancestors_and_trie(ptr); let (v, t) = self.ancestors_and_trie(ptr);
let ancestor_write_free = v let ancestor_write_free = v
@@ -298,6 +374,7 @@ impl Trie {
ancestor_write_free && t.map_or(true, |t| t.subtree_is_exclusive_free_for(session)) ancestor_write_free && t.map_or(true, |t| t.subtree_is_exclusive_free_for(session))
} }
// no reads or writes in ancestor set, no locks in subtree // no reads or writes in ancestor set, no locks in subtree
#[allow(dead_code)]
fn can_acquire_write(&self, ptr: &JsonPointer, session: &HandleId) -> bool { fn can_acquire_write(&self, ptr: &JsonPointer, session: &HandleId) -> bool {
let (v, t) = self.ancestors_and_trie(ptr); let (v, t) = self.ancestors_and_trie(ptr);
let ancestor_rw_free = v let ancestor_rw_free = v
@@ -313,14 +390,13 @@ impl Trie {
ptr: &JsonPointer, ptr: &JsonPointer,
session: &HandleId, session: &HandleId,
) -> Option<&'a HandleId> { ) -> Option<&'a HandleId> {
let (v, _t) = self.ancestors_and_trie(ptr); let (v, t) = self.ancestors_and_trie(ptr);
v.into_iter().find_map(|s| s.write_session()).and_then(|h| { // there can only be one write session per traversal
if h == session { let ancestor_write = v.into_iter().find_map(|s| s.write_session());
None let node_write = t.and_then(|t| t.state.write_session());
} else { ancestor_write
Some(h) .or(node_write)
} .and_then(|s| if s == session { None } else { Some(s) })
})
} }
// ancestors with writes, subtrees with writes // ancestors with writes, subtrees with writes
fn sessions_blocking_read<'a>( fn sessions_blocking_read<'a>(
@@ -386,20 +462,20 @@ impl Trie {
Err(blocking_sessions.into_iter().cloned().collect()) Err(blocking_sessions.into_iter().cloned().collect())
} else { } else {
drop(blocking_sessions); drop(blocking_sessions);
self.child_mut(&lock_info.ptr) let success = self
.child_mut(&lock_info.ptr)
.state .state
.lock(lock_info.handle_id.clone(), &lock_info.ty); .try_lock(lock_info.handle_id.clone(), &lock_info.ty);
assert!(success);
Ok(()) Ok(())
} }
} }
fn try_unlock(&mut self, lock_info: &LockInfo) -> bool { fn unlock(&mut self, lock_info: &LockInfo) {
let t = self.child_mut(&lock_info.ptr); let t = self.child_mut(&lock_info.ptr);
let success = t.state.unlock(&lock_info.handle_id, &lock_info.ty); let success = t.state.try_unlock(&lock_info.handle_id, &lock_info.ty);
if success { assert!(success);
self.prune(); self.prune();
}
success
} }
fn prunable(&self) -> bool { fn prunable(&self) -> bool {
@@ -431,10 +507,10 @@ impl Natural {
self.0 += 1; self.0 += 1;
} }
fn dec(mut self) -> Option<Natural> { fn dec(mut self) -> Option<Natural> {
self.0 -= 1;
if self.0 == 0 { if self.0 == 0 {
None None
} else { } else {
self.0 -= 1;
Some(self) Some(self)
} }
} }
@@ -506,6 +582,7 @@ impl LockState {
LockState::Exclusive { w_lessee, .. } => ordset![w_lessee], LockState::Exclusive { w_lessee, .. } => ordset![w_lessee],
} }
} }
#[allow(dead_code)]
fn exist_sessions<'a>(&'a self) -> OrdSet<&'a HandleId> { fn exist_sessions<'a>(&'a self) -> OrdSet<&'a HandleId> {
match self { match self {
LockState::Free => OrdSet::new(), LockState::Free => OrdSet::new(),
@@ -546,11 +623,23 @@ impl LockState {
_ => None, _ => None,
} }
} }
fn normalize(&mut self) {
match &*self {
LockState::Shared {
e_lessees,
r_lessees,
} if e_lessees.is_empty() && r_lessees.is_empty() => {
*self = LockState::Free;
}
_ => {}
}
}
// note this is not necessarily safe in the overall trie locking model // note this is not necessarily safe in the overall trie locking model
// this function will return true if the state changed as a result of the call // this function will return true if the state changed as a result of the call
// if it returns false it technically means that the call was invalid and did not // if it returns false it technically means that the call was invalid and did not
// change the lock state at all // change the lock state at all
fn lock(&mut self, session: HandleId, typ: &LockType) -> bool { fn try_lock(&mut self, session: HandleId, typ: &LockType) -> bool {
match (&mut *self, typ) { match (&mut *self, typ) {
(LockState::Free, LockType::Exist) => { (LockState::Free, LockType::Exist) => {
*self = LockState::Shared { *self = LockState::Shared {
@@ -665,31 +754,39 @@ impl LockState {
// there are many ways for this function to be called in an invalid way: Notably releasing locks that you never // there are many ways for this function to be called in an invalid way: Notably releasing locks that you never
// had to begin with. // had to begin with.
fn unlock(&mut self, session: &HandleId, typ: &LockType) -> bool { fn try_unlock(&mut self, session: &HandleId, typ: &LockType) -> bool {
match (&mut *self, typ) { match (&mut *self, typ) {
(LockState::Free, _) => false, (LockState::Free, _) => false,
(LockState::Shared { e_lessees, .. }, LockType::Exist) => { (LockState::Shared { e_lessees, .. }, LockType::Exist) => {
match e_lessees.remove_entry(session) { match e_lessees.remove_entry(session) {
None => false, None => false,
Some((k, v)) => match v.dec() { Some((k, v)) => {
None => true, match v.dec() {
Some(n) => { None => {
e_lessees.insert(k, n); self.normalize();
true }
Some(n) => {
e_lessees.insert(k, n);
}
} }
}, true
}
} }
} }
(LockState::Shared { r_lessees, .. }, LockType::Read) => { (LockState::Shared { r_lessees, .. }, LockType::Read) => {
match r_lessees.remove_entry(session) { match r_lessees.remove_entry(session) {
None => false, None => false,
Some((k, v)) => match v.dec() { Some((k, v)) => {
None => true, match v.dec() {
Some(n) => { None => {
r_lessees.insert(k, n); self.normalize();
true }
Some(n) => {
r_lessees.insert(k, n);
}
} }
}, true
}
} }
} }
(LockState::Shared { .. }, LockType::Write) => false, (LockState::Shared { .. }, LockType::Write) => false,
@@ -747,6 +844,7 @@ impl LockState {
e_lessees, e_lessees,
r_lessees, r_lessees,
}; };
self.normalize();
} }
Some(n) => *w_session_count = n, Some(n) => *w_session_count = n,
} }
@@ -801,6 +899,16 @@ impl Default for LockType {
LockType::Exist LockType::Exist
} }
} }
impl std::fmt::Display for LockType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let show = match self {
LockType::Exist => "E",
LockType::Read => "R",
LockType::Write => "W",
};
write!(f, "{}", show)
}
}
#[derive(Debug)] #[derive(Debug)]
struct Request { struct Request {
@@ -840,3 +948,24 @@ impl Drop for Guard {
} }
} }
} }
#[cfg(test)]
fn lock_type_gen() -> BoxedStrategy<crate::LockType> {
proptest::prop_oneof![
proptest::strategy::Just(crate::LockType::Exist),
proptest::strategy::Just(crate::LockType::Read),
proptest::strategy::Just(crate::LockType::Write),
]
.boxed()
}
#[cfg(test)]
proptest! {
#[test]
fn unlock_after_lock_is_identity(session in 0..10u64, typ in lock_type_gen()) {
let mut orig = LockState::Free;
orig.try_lock(HandleId{id: session }, &typ);
orig.try_unlock(&HandleId{id: session }, &typ);
prop_assert_eq!(orig, LockState::Free);
}
}