From e95e97e96de2b476de847cb151f00debc55bff8f Mon Sep 17 00:00:00 2001 From: Keagan McClelland Date: Wed, 15 Dec 2021 09:53:31 -0700 Subject: [PATCH] fixed bugs in locking exclusion logic --- json-ptr | 2 +- patch-db/src/locker.rs | 255 +++++++++++++++++++++++++++++++---------- 2 files changed, 193 insertions(+), 64 deletions(-) diff --git a/json-ptr b/json-ptr index 54be8bc..f1d671b 160000 --- a/json-ptr +++ b/json-ptr @@ -1 +1 @@ -Subproject commit 54be8bcf512d9da90ee754d79b57200bfaceb4f1 +Subproject commit f1d671bd5194a99fcfee918b7c568355c2c3459a diff --git a/patch-db/src/locker.rs b/patch-db/src/locker.rs index 24a3907..d63ac9d 100644 --- a/patch-db/src/locker.rs +++ b/patch-db/src/locker.rs @@ -2,10 +2,11 @@ use std::collections::{BTreeMap, VecDeque}; use imbl::{ordset, OrdSet}; use json_ptr::{JsonPointer, SegList}; +#[cfg(test)] +use proptest::prelude::*; use tokio::sync::{mpsc, oneshot}; use crate::handle::HandleId; - pub struct Locker { sender: mpsc::UnboundedSender, } @@ -29,6 +30,16 @@ impl Locker { while let Some(action) = get_action(&mut new_requests, &mut locks_on_lease, &mut cancellations).await { + #[cfg(feature = "tracing")] + fn display_session_set(set: &OrdSet) -> String { + use std::fmt::Write; + let mut display = String::from("{"); + for session in set.iter() { + write!(display, "{},", session.id); + } + display.replace_range(display.len() - 1.., "}"); + display + } // to prevent starvation we privilege the front of the queue and only allow requests that // conflict with the request at the front to go through if they are requested by sessions that // are *currently blocking* the front of the queue @@ -46,15 +57,48 @@ impl Locker { if hot_req.lock_info.conflicts_with(&req.lock_info) && !hot_blockers.contains(&req.lock_info.handle_id) => { + #[cfg(feature = "tracing")] + { + tracing::info!( + "Deferred: session {} - {} lock on {}", + &req.lock_info.handle_id.id, + &req.lock_info.ty, + &req.lock_info.ptr, + ); + tracing::info!( + "Must wait on hot seat request from session {}", + &hot_req.lock_info.handle_id.id + ); + } request_queue.push_back((req, OrdSet::new())) } // otherwise we try and service it immediately, only pushing to the queue if it fails _ => match trie.try_lock(&req.lock_info) { Ok(()) => { + #[cfg(feature = "tracing")] + tracing::info!( + "Acquired: session {} - {} lock on {}", + &req.lock_info.handle_id.id, + &req.lock_info.ty, + &req.lock_info.ptr, + ); let lease = req.complete(); locks_on_lease.push(lease); } Err(blocking_sessions) => { + #[cfg(feature = "tracing")] + { + tracing::info!( + "Deferred: session {} - {} lock on {}", + &req.lock_info.handle_id.id, + &req.lock_info.ty, + &req.lock_info.ptr, + ); + tracing::info!( + "Must wait on sessions {}", + &display_session_set(&blocking_sessions) + ); + } request_queue.push_back((req, blocking_sessions)) } }, @@ -64,6 +108,8 @@ impl Locker { tracing::trace!("Locker Action: {:#?}", action); match action { Action::HandleRequest(mut req) => { + #[cfg(feature = "tracing")] + tracing::debug!("New lock request"); cancellations.extend(req.cancel.take()); let hot_seat = request_queue.pop_front(); process_new_req( @@ -79,38 +125,54 @@ impl Locker { } Action::HandleRelease(lock_info) => { // release actual lock - if trie.try_unlock(&lock_info) { - // try to pop off as many requests off the front of the queue as we can - let mut hot_seat = None; - while let Some((r, _)) = request_queue.pop_front() { - match trie.try_lock(&r.lock_info) { - Ok(()) => { - let lease = r.complete(); - locks_on_lease.push(lease); - } - Err(new_blocking_sessions) => { - // set the hot seat and proceed to step two - hot_seat = Some((r, new_blocking_sessions)); - break; - } + trie.unlock(&lock_info); + #[cfg(feature = "tracing")] + { + tracing::info!( + "Released: session {} - {} lock on {} ", + &lock_info.handle_id.id, + &lock_info.ty, + &lock_info.ptr, + ); + tracing::debug!("Subtree sessions: {:?}", trie.subtree_sessions()); + tracing::debug!("Processing request queue backlog"); + } + // try to pop off as many requests off the front of the queue as we can + let mut hot_seat = None; + while let Some((r, _)) = request_queue.pop_front() { + match trie.try_lock(&r.lock_info) { + Ok(()) => { + #[cfg(feature = "tracing")] + tracing::info!( + "Acquired: session {} - {} lock on {}", + &r.lock_info.handle_id.id, + &r.lock_info.ty, + &r.lock_info.ptr, + ); + let lease = r.complete(); + locks_on_lease.push(lease); + } + Err(new_blocking_sessions) => { + // set the hot seat and proceed to step two + hot_seat = Some((r, new_blocking_sessions)); + break; } } - // when we can no longer do so, try and service the rest of the queue with the new hot seat - let old_request_queue = request_queue; - request_queue = VecDeque::new(); - for (r, _) in old_request_queue { - // we now want to process each request in the queue as if it was new - process_new_req( - hot_seat.as_ref(), - r, - &mut trie, - &mut locks_on_lease, - &mut request_queue, - ) - } - if let Some(hot_seat) = hot_seat { - request_queue.push_front(hot_seat); - } + } + // when we can no longer do so, try and service the rest of the queue with the new hot seat + let old_request_queue = std::mem::take(&mut request_queue); + for (r, _) in old_request_queue { + // we now want to process each request in the queue as if it was new + process_new_req( + hot_seat.as_ref(), + r, + &mut trie, + &mut locks_on_lease, + &mut request_queue, + ) + } + if let Some(hot_seat) = hot_seat { + request_queue.push_front(hot_seat); } } Action::HandleCancel(lock_info) => { @@ -127,7 +189,15 @@ impl Locker { lock_info.ptr ); } - Some((i, _)) => { + #[allow(unused_variables)] + Some((i, (req, _))) => { + #[cfg(feature = "tracing")] + tracing::info!( + "Canceled: session {} - {} lock on {}", + &req.lock_info.handle_id.id, + &req.lock_info.ty, + &req.lock_info.ptr + ); request_queue.remove(i); } } @@ -228,15 +298,19 @@ struct Trie { children: BTreeMap, } impl Trie { + #[allow(dead_code)] fn all bool>(&self, f: F) -> bool { f(&self.state) && self.children.values().all(|t| t.all(&f)) } + #[allow(dead_code)] fn any bool>(&self, f: F) -> bool { f(&self.state) || self.children.values().any(|t| t.any(&f)) } + #[allow(dead_code)] fn subtree_is_lock_free_for(&self, session: &HandleId) -> bool { self.all(|s| s.sessions().difference(ordset![session]).is_empty()) } + #[allow(dead_code)] fn subtree_is_exclusive_free_for(&self, session: &HandleId) -> bool { self.all(|s| match s.clone().erase(session) { LockState::Exclusive { .. } => false, @@ -278,6 +352,7 @@ impl Trie { } } // no writes in ancestor set, no writes at node + #[allow(dead_code)] fn can_acquire_exist(&self, ptr: &JsonPointer, session: &HandleId) -> bool { let (v, t) = self.ancestors_and_trie(ptr); let ancestor_write_free = v @@ -288,6 +363,7 @@ impl Trie { ancestor_write_free && t.map_or(true, |t| t.state.clone().erase(session).write_free()) } // no writes in ancestor set, no writes in subtree + #[allow(dead_code)] fn can_acquire_read(&self, ptr: &JsonPointer, session: &HandleId) -> bool { let (v, t) = self.ancestors_and_trie(ptr); let ancestor_write_free = v @@ -298,6 +374,7 @@ impl Trie { ancestor_write_free && t.map_or(true, |t| t.subtree_is_exclusive_free_for(session)) } // no reads or writes in ancestor set, no locks in subtree + #[allow(dead_code)] fn can_acquire_write(&self, ptr: &JsonPointer, session: &HandleId) -> bool { let (v, t) = self.ancestors_and_trie(ptr); let ancestor_rw_free = v @@ -313,14 +390,13 @@ impl Trie { ptr: &JsonPointer, session: &HandleId, ) -> Option<&'a HandleId> { - let (v, _t) = self.ancestors_and_trie(ptr); - v.into_iter().find_map(|s| s.write_session()).and_then(|h| { - if h == session { - None - } else { - Some(h) - } - }) + let (v, t) = self.ancestors_and_trie(ptr); + // there can only be one write session per traversal + let ancestor_write = v.into_iter().find_map(|s| s.write_session()); + let node_write = t.and_then(|t| t.state.write_session()); + ancestor_write + .or(node_write) + .and_then(|s| if s == session { None } else { Some(s) }) } // ancestors with writes, subtrees with writes fn sessions_blocking_read<'a>( @@ -386,20 +462,20 @@ impl Trie { Err(blocking_sessions.into_iter().cloned().collect()) } else { drop(blocking_sessions); - self.child_mut(&lock_info.ptr) + let success = self + .child_mut(&lock_info.ptr) .state - .lock(lock_info.handle_id.clone(), &lock_info.ty); + .try_lock(lock_info.handle_id.clone(), &lock_info.ty); + assert!(success); Ok(()) } } - fn try_unlock(&mut self, lock_info: &LockInfo) -> bool { + fn unlock(&mut self, lock_info: &LockInfo) { let t = self.child_mut(&lock_info.ptr); - let success = t.state.unlock(&lock_info.handle_id, &lock_info.ty); - if success { - self.prune(); - } - success + let success = t.state.try_unlock(&lock_info.handle_id, &lock_info.ty); + assert!(success); + self.prune(); } fn prunable(&self) -> bool { @@ -431,10 +507,10 @@ impl Natural { self.0 += 1; } fn dec(mut self) -> Option { + self.0 -= 1; if self.0 == 0 { None } else { - self.0 -= 1; Some(self) } } @@ -506,6 +582,7 @@ impl LockState { LockState::Exclusive { w_lessee, .. } => ordset![w_lessee], } } + #[allow(dead_code)] fn exist_sessions<'a>(&'a self) -> OrdSet<&'a HandleId> { match self { LockState::Free => OrdSet::new(), @@ -546,11 +623,23 @@ impl LockState { _ => None, } } + + fn normalize(&mut self) { + match &*self { + LockState::Shared { + e_lessees, + r_lessees, + } if e_lessees.is_empty() && r_lessees.is_empty() => { + *self = LockState::Free; + } + _ => {} + } + } // note this is not necessarily safe in the overall trie locking model // this function will return true if the state changed as a result of the call // if it returns false it technically means that the call was invalid and did not // change the lock state at all - fn lock(&mut self, session: HandleId, typ: &LockType) -> bool { + fn try_lock(&mut self, session: HandleId, typ: &LockType) -> bool { match (&mut *self, typ) { (LockState::Free, LockType::Exist) => { *self = LockState::Shared { @@ -665,31 +754,39 @@ impl LockState { // there are many ways for this function to be called in an invalid way: Notably releasing locks that you never // had to begin with. - fn unlock(&mut self, session: &HandleId, typ: &LockType) -> bool { + fn try_unlock(&mut self, session: &HandleId, typ: &LockType) -> bool { match (&mut *self, typ) { (LockState::Free, _) => false, (LockState::Shared { e_lessees, .. }, LockType::Exist) => { match e_lessees.remove_entry(session) { None => false, - Some((k, v)) => match v.dec() { - None => true, - Some(n) => { - e_lessees.insert(k, n); - true + Some((k, v)) => { + match v.dec() { + None => { + self.normalize(); + } + Some(n) => { + e_lessees.insert(k, n); + } } - }, + true + } } } (LockState::Shared { r_lessees, .. }, LockType::Read) => { match r_lessees.remove_entry(session) { None => false, - Some((k, v)) => match v.dec() { - None => true, - Some(n) => { - r_lessees.insert(k, n); - true + Some((k, v)) => { + match v.dec() { + None => { + self.normalize(); + } + Some(n) => { + r_lessees.insert(k, n); + } } - }, + true + } } } (LockState::Shared { .. }, LockType::Write) => false, @@ -747,6 +844,7 @@ impl LockState { e_lessees, r_lessees, }; + self.normalize(); } Some(n) => *w_session_count = n, } @@ -801,6 +899,16 @@ impl Default for LockType { LockType::Exist } } +impl std::fmt::Display for LockType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let show = match self { + LockType::Exist => "E", + LockType::Read => "R", + LockType::Write => "W", + }; + write!(f, "{}", show) + } +} #[derive(Debug)] struct Request { @@ -840,3 +948,24 @@ impl Drop for Guard { } } } + +#[cfg(test)] +fn lock_type_gen() -> BoxedStrategy { + proptest::prop_oneof![ + proptest::strategy::Just(crate::LockType::Exist), + proptest::strategy::Just(crate::LockType::Read), + proptest::strategy::Just(crate::LockType::Write), + ] + .boxed() +} + +#[cfg(test)] +proptest! { + #[test] + fn unlock_after_lock_is_identity(session in 0..10u64, typ in lock_type_gen()) { + let mut orig = LockState::Free; + orig.try_lock(HandleId{id: session }, &typ); + orig.try_unlock(&HandleId{id: session }, &typ); + prop_assert_eq!(orig, LockState::Free); + } +}