diff --git a/patch-db/Cargo.toml b/patch-db/Cargo.toml index a38bfd1..f5a070b 100644 --- a/patch-db/Cargo.toml +++ b/patch-db/Cargo.toml @@ -12,6 +12,7 @@ version = "0.1.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] +default = ["trace", "unstable"] debug = ["tracing"] trace = ["debug", "tracing-error"] unstable = [] diff --git a/patch-db/src/lib.rs b/patch-db/src/lib.rs index abd2958..1f98bc9 100644 --- a/patch-db/src/lib.rs +++ b/patch-db/src/lib.rs @@ -15,8 +15,6 @@ mod patch; mod store; mod transaction; -#[cfg(test)] -mod proptest; #[cfg(test)] mod test; diff --git a/patch-db/src/locker.rs b/patch-db/src/locker.rs deleted file mode 100644 index e8d2d71..0000000 --- a/patch-db/src/locker.rs +++ /dev/null @@ -1,1360 +0,0 @@ -use std::collections::{BTreeMap, VecDeque}; - -use imbl::{ordmap, ordset, OrdMap, OrdSet}; -use json_ptr::{JsonPointer, SegList}; -#[cfg(test)] -use proptest::prelude::*; -use tokio::sync::{mpsc, oneshot}; - -use crate::handle::HandleId; -pub struct Locker { - sender: mpsc::UnboundedSender, -} -impl Locker { - pub fn new() -> Self { - let (sender, receiver) = mpsc::unbounded_channel(); - tokio::spawn(async move { - let mut trie = Trie::default(); - let mut new_requests = RequestQueue { - closed: false, - recv: receiver, - }; - // futures::future::select_all will panic if the list is empty - // instead we want it to block forever by adding a channel that will never recv - let (_dummy_send, dummy_recv) = oneshot::channel(); - let mut unlock_receivers = vec![dummy_recv]; - let (_dummy_send, dummy_recv) = oneshot::channel(); - let mut cancellations = vec![dummy_recv]; - - let mut request_queue = VecDeque::<(Request, OrdSet)>::new(); - let mut lock_order_enforcer = LockOrderEnforcer::new(); - - while let Some(action) = - get_action(&mut new_requests, &mut unlock_receivers, &mut cancellations).await - { - #[cfg(feature = "tracing")] - fn display_session_set(set: &OrdSet) -> String { - use std::fmt::Write; - let mut display = String::from("{"); - for session in set.iter() { - write!(display, "{},", session.id).unwrap(); - } - display.replace_range(display.len() - 1.., "}"); - display - } - // to prevent starvation we privilege the front of the queue and only allow requests that - // conflict with the request at the front to go through if they are requested by sessions that - // are *currently blocking* the front of the queue - fn process_new_req( - hot_seat: Option<&(Request, OrdSet)>, - req: Request, - trie: &mut Trie, - unlock_receivers: &mut Vec>, - request_queue: &mut VecDeque<(Request, OrdSet)>, - ) { - match hot_seat { - // hot seat conflicts and request session isn't in current blocking sessions - // so we push it to the queue - Some((hot_req, hot_blockers)) - if hot_req.lock_info.conflicts_with(&req.lock_info) - && !hot_blockers.contains(&req.lock_info.handle_id) => - { - #[cfg(feature = "tracing")] - { - tracing::info!( - "Deferred: session {} - {} lock on {}", - &req.lock_info.handle_id.id, - &req.lock_info.ty, - &req.lock_info.ptr, - ); - tracing::info!( - "Must wait on hot seat request from session {}", - &hot_req.lock_info.handle_id.id - ); - } - request_queue.push_back((req, OrdSet::new())); - process_deadlocks(request_queue, &*trie); - } - // otherwise we try and service it immediately, only pushing to the queue if it fails - _ => match trie.try_lock(&req.lock_info) { - Ok(()) => { - #[cfg(feature = "tracing")] - tracing::info!( - "Acquired: session {} - {} lock on {}", - &req.lock_info.handle_id.id, - &req.lock_info.ty, - &req.lock_info.ptr, - ); - let lease = req.complete(); - unlock_receivers.push(lease); - } - Err(blocking_sessions) => { - #[cfg(feature = "tracing")] - { - tracing::info!( - "Deferred: session {} - {} lock on {}", - &req.lock_info.handle_id.id, - &req.lock_info.ty, - &req.lock_info.ptr, - ); - tracing::info!( - "Must wait on sessions {}", - &display_session_set(&blocking_sessions) - ); - } - request_queue.push_back((req, blocking_sessions)); - process_deadlocks(request_queue, &*trie); - } - }, - } - } - #[cfg(feature = "tracing")] - tracing::trace!("Locker Action: {:#?}", action); - match action { - Action::HandleRequest(mut req) => { - #[cfg(feature = "tracing")] - tracing::debug!("New lock request"); - if let Err(e) = lock_order_enforcer.try_insert(&req.lock_info) { - req.reject(e) - } else { - cancellations.extend(req.cancel.take()); - let hot_seat = request_queue.pop_front(); - process_new_req( - hot_seat.as_ref(), - req, - &mut trie, - &mut unlock_receivers, - &mut request_queue, - ); - if let Some(hot_seat) = hot_seat { - request_queue.push_front(hot_seat); - } - } - } - Action::HandleRelease(lock_info) => { - // release actual lock - lock_order_enforcer.remove(&lock_info); - trie.unlock(&lock_info); - #[cfg(feature = "tracing")] - { - tracing::info!( - "Released: session {} - {} lock on {} ", - &lock_info.handle_id.id, - &lock_info.ty, - &lock_info.ptr, - ); - tracing::debug!("Subtree sessions: {:?}", trie.subtree_sessions()); - tracing::debug!("Processing request queue backlog"); - } - // try to pop off as many requests off the front of the queue as we can - let mut hot_seat = None; - while let Some((r, _)) = request_queue.pop_front() { - match trie.try_lock(&r.lock_info) { - Ok(()) => { - #[cfg(feature = "tracing")] - tracing::info!( - "Acquired: session {} - {} lock on {}", - &r.lock_info.handle_id.id, - &r.lock_info.ty, - &r.lock_info.ptr, - ); - let lease = r.complete(); - unlock_receivers.push(lease); - } - Err(new_blocking_sessions) => { - // set the hot seat and proceed to step two - hot_seat = Some((r, new_blocking_sessions)); - break; - } - } - } - // when we can no longer do so, try and service the rest of the queue with the new hot seat - let old_request_queue = std::mem::take(&mut request_queue); - for (r, _) in old_request_queue { - // we now want to process each request in the queue as if it was new - process_new_req( - hot_seat.as_ref(), - r, - &mut trie, - &mut unlock_receivers, - &mut request_queue, - ) - } - if let Some(hot_seat) = hot_seat { - request_queue.push_front(hot_seat); - } - } - Action::HandleCancel(lock_info) => { - lock_order_enforcer.remove(&lock_info); - let entry = request_queue - .iter() - .enumerate() - .find(|(_, (r, _))| r.lock_info == lock_info); - match entry { - None => { - #[cfg(feature = "tracing")] - tracing::warn!( - "Received cancellation for a lock not currently waiting: {}", - lock_info.ptr - ); - } - #[allow(unused_variables)] - Some((i, (req, _))) => { - #[cfg(feature = "tracing")] - tracing::info!( - "Canceled: session {} - {} lock on {}", - &req.lock_info.handle_id.id, - &req.lock_info.ty, - &req.lock_info.ptr - ); - request_queue.remove(i); - } - } - } - } - #[cfg(feature = "tracing")] - tracing::trace!("Locker Trie: {:#?}", trie); - } - }); - Locker { sender } - } - pub async fn lock( - &self, - handle_id: HandleId, - ptr: JsonPointer, - lock_type: LockType, - ) -> Result { - struct CancelGuard { - lock_info: Option, - channel: Option>, - recv: oneshot::Receiver>, - } - impl Drop for CancelGuard { - fn drop(&mut self) { - if let (Some(lock_info), Some(channel)) = - (self.lock_info.take(), self.channel.take()) - { - self.recv.close(); - let _ = channel.send(lock_info); - } - } - } - let lock_info = LockInfo { - handle_id, - ptr, - ty: lock_type, - }; - let (send, recv) = oneshot::channel(); - let (cancel_send, cancel_recv) = oneshot::channel(); - let mut cancel_guard = CancelGuard { - lock_info: Some(lock_info.clone()), - channel: Some(cancel_send), - recv, - }; - self.sender - .send(Request { - lock_info, - cancel: Some(cancel_recv), - completion: send, - }) - .unwrap(); - let res = (&mut cancel_guard.recv).await.unwrap(); - cancel_guard.channel.take(); - res - } -} - -struct RequestQueue { - closed: bool, - recv: mpsc::UnboundedReceiver, -} -fn deadlock_scan<'a>(queue: &'a VecDeque<(Request, OrdSet)>) -> Vec<&'a Request> { - let (wait_map, mut req_map) = queue - .iter() - .map(|(req, set)| ((&req.lock_info.handle_id, set, req))) - .fold( - ( - OrdMap::<&'a HandleId, &'a OrdSet>::new(), - OrdMap::<&'a HandleId, &'a Request>::new(), - ), - |(mut wmap, mut rmap), (id, wset, req)| { - ( - { - wmap.insert(id, wset); - wmap - }, - { - rmap.insert(id, req); - rmap - }, - ) - }, - ); - fn path_to<'a>( - graph: &OrdMap<&'a HandleId, &'a OrdSet>, - root: &'a HandleId, - node: &'a HandleId, - ) -> OrdSet<&'a HandleId> { - if node == root { - return ordset![root]; - } - match graph.get(node) { - None => ordset![], - Some(s) => s - .iter() - .find_map(|h| Some(path_to(graph, root, h)).filter(|s| s.is_empty())) - .map_or(ordset![], |mut s| { - s.insert(node); - s - }), - } - } - for (root, wait_set) in wait_map.iter() { - let cycle = wait_set - .iter() - .find_map(|start| Some(path_to(&wait_map, root, start)).filter(|s| s.is_empty())); - match cycle { - None => { - continue; - } - Some(c) => { - return c - .into_iter() - .map(|id| req_map.remove(id).unwrap()) - .collect(); - } - } - } - vec![] -} - -fn process_deadlocks(request_queue: &mut VecDeque<(Request, OrdSet)>, trie: &Trie) { - let deadlocked_reqs = deadlock_scan(request_queue); - if !deadlocked_reqs.is_empty() { - #[cfg(feature = "tracing")] - tracing::info!("Deadlock Detected: {:?}", deadlocked_reqs); - let locks_waiting = LockSet( - deadlocked_reqs - .iter() - .map(|r| r.lock_info.clone()) - .collect(), - ); - let err = LockError::DeadlockDetected { - locks_waiting, - locks_held: LockSet(trie.subtree_lock_info()), - }; - let mut indices_to_remove = Vec::with_capacity(deadlocked_reqs.len()); - for (i, (req, _)) in request_queue.iter().enumerate() { - if deadlocked_reqs.iter().any(|r| std::ptr::eq(*r, req)) { - indices_to_remove.push(i) - } - } - let old = std::mem::take(request_queue); - for (i, (r, s)) in old.into_iter().enumerate() { - if indices_to_remove.contains(&i) { - r.reject(err.clone()) - } else { - request_queue.push_back((r, s)) - } - } - } -} - -#[derive(Debug)] -enum Action { - HandleRequest(Request), - HandleRelease(LockInfo), - HandleCancel(LockInfo), -} -async fn get_action( - new_requests: &mut RequestQueue, - locks_on_lease: &mut Vec>, - cancellations: &mut Vec>, -) -> Option { - loop { - if new_requests.closed && locks_on_lease.len() == 1 && cancellations.len() == 1 { - return None; - } - tokio::select! { - a = new_requests.recv.recv() => { - if let Some(a) = a { - return Some(Action::HandleRequest(a)); - } else { - new_requests.closed = true; - } - } - (a, idx, _) = futures::future::select_all(locks_on_lease.iter_mut()) => { - locks_on_lease.swap_remove(idx); - return Some(Action::HandleRelease(a.unwrap())) - } - (a, idx, _) = futures::future::select_all(cancellations.iter_mut()) => { - cancellations.swap_remove(idx); - if let Ok(a) = a { - return Some(Action::HandleCancel(a)) - } - } - } - } -} - -struct LockOrderEnforcer { - locks_held: OrdMap>, -} -impl LockOrderEnforcer { - fn new() -> Self { - LockOrderEnforcer { - locks_held: ordmap! {}, - } - } - // locks must be acquired in lexicographic order for the pointer, and reverse order for type - fn validate(&self, req: &LockInfo) -> Result<(), LockError> { - // the following notation is used to denote an example sequence that can cause deadlocks - // - // Individual Lock Requests - // 1W/A/B - // |||> Node whose lock is being acquired: /A/B (strings prefixed by slashes, indicating descent path) - // ||> Type of Lock: W (E/R/W) - // |> Session Number: 1 (any natural number) - // - // Sequences - // LockRequest >> LockRequest - #[cfg(not(feature = "unstable"))] - return Ok(()); - #[cfg(feature = "unstable")] - match self.locks_held.get(&req.handle_id) { - None => Ok(()), - Some(m) => { - // quick accept - for (ptr, ty) in m.keys() { - let tmp = LockInfo { - ptr: ptr.clone(), - ty: *ty, - handle_id: req.handle_id.clone(), - }; - if tmp.implicitly_grants(req) { - return Ok(()); - } - } - let err = m.keys().find_map(|(ptr, ty)| match ptr.cmp(&req.ptr) { - std::cmp::Ordering::Less => { - if req.ptr.starts_with(ptr) - && req.ty == LockType::Write - && *ty == LockType::Read - { - // 1R/A >> 2R/A >> 1W/A/A >> 2W/A/B - Some(LockError::LockTypeEscalationImplicit { - session: req.handle_id.clone(), - first_ptr: ptr.clone(), - first_type: *ty, - second_ptr: req.ptr.clone(), - second_type: req.ty, - }) - } else { - None - } - } - std::cmp::Ordering::Equal => { - if req.ty > *ty { - // 1R/A >> 2R/A >> 1W/A >> 1W/A - Some(LockError::LockTypeEscalation { - session: req.handle_id.clone(), - ptr: ptr.clone(), - first: *ty, - second: req.ty, - }) - } else { - None - } - } - std::cmp::Ordering::Greater => Some(if ptr.starts_with(&req.ptr) { - // 1W/A/A >> 2W/A/B >> 1R/A >> 2R/A - LockError::LockTaxonomyEscalation { - session: req.handle_id.clone(), - first: ptr.clone(), - second: req.ptr.clone(), - } - } else { - // 1W/A >> 2W/B >> 1W/B >> 2W/A - LockError::NonCanonicalOrdering { - session: req.handle_id.clone(), - first: ptr.clone(), - second: req.ptr.clone(), - } - }), - }); - err.map_or(Ok(()), Err) - } - } - } - fn try_insert(&mut self, req: &LockInfo) -> Result<(), LockError> { - self.validate(req)?; - match self.locks_held.get_mut(&req.handle_id) { - None => { - self.locks_held.insert( - req.handle_id.clone(), - ordmap![(req.ptr.clone(), req.ty) => 1], - ); - } - Some(locks) => { - let k = (req.ptr.clone(), req.ty); - match locks.get_mut(&k) { - None => { - locks.insert(k, 1); - } - Some(n) => { - *n += 1; - } - } - } - } - Ok(()) - } - fn remove(&mut self, req: &LockInfo) { - match self.locks_held.remove_with_key(&req.handle_id) { - None => { - #[cfg(feature = "tracing")] - tracing::warn!("Invalid removal from session manager: {:?}", req); - } - Some((hdl, mut locks)) => { - let k = (req.ptr.clone(), req.ty); - match locks.remove_with_key(&k) { - None => { - #[cfg(feature = "tracing")] - tracing::warn!("Invalid removal from session manager: {:?}", req); - } - Some((k, n)) => { - if n - 1 > 0 { - locks.insert(k, n - 1); - } - } - } - if !locks.is_empty() { - self.locks_held.insert(hdl, locks); - } - } - } - } -} - -#[derive(Debug, Default)] -struct Trie { - state: LockState, - children: BTreeMap, -} -impl Trie { - #[allow(dead_code)] - fn all bool>(&self, f: F) -> bool { - f(&self.state) && self.children.values().all(|t| t.all(&f)) - } - #[allow(dead_code)] - fn any bool>(&self, f: F) -> bool { - f(&self.state) || self.children.values().any(|t| t.any(&f)) - } - #[allow(dead_code)] - fn subtree_is_lock_free_for(&self, session: &HandleId) -> bool { - self.all(|s| s.sessions().difference(ordset![session]).is_empty()) - } - #[allow(dead_code)] - fn subtree_is_exclusive_free_for(&self, session: &HandleId) -> bool { - self.all(|s| match s.clone().erase(session) { - LockState::Exclusive { .. } => false, - _ => true, - }) - } - fn subtree_write_sessions<'a>(&'a self) -> OrdSet<&'a HandleId> { - match &self.state { - LockState::Exclusive { w_lessee, .. } => ordset![w_lessee], - _ => self - .children - .values() - .map(|t| t.subtree_write_sessions()) - .fold(OrdSet::new(), OrdSet::union), - } - } - fn subtree_sessions<'a>(&'a self) -> OrdSet<&'a HandleId> { - let children = self - .children - .values() - .map(Trie::subtree_sessions) - .fold(OrdSet::new(), OrdSet::union); - self.state.sessions().union(children) - } - fn subtree_lock_info<'a>(&'a self) -> OrdSet { - let mut acc = self - .children - .iter() - .map(|(s, t)| { - t.subtree_lock_info() - .into_iter() - .map(|mut i| LockInfo { - ty: i.ty, - handle_id: i.handle_id, - ptr: { - i.ptr.push_start(s); - i.ptr - }, - }) - .collect() - }) - .fold(ordset![], OrdSet::union); - let self_writes = self.state.write_session().map(|session| LockInfo { - handle_id: session.clone(), - ptr: JsonPointer::default(), - ty: LockType::Write, - }); - let self_reads = self - .state - .read_sessions() - .into_iter() - .map(|session| LockInfo { - handle_id: session.clone(), - ptr: JsonPointer::default(), - ty: LockType::Read, - }); - let self_exists = self - .state - .exist_sessions() - .into_iter() - .map(|session| LockInfo { - handle_id: session.clone(), - ptr: JsonPointer::default(), - ty: LockType::Exist, - }); - acc.extend(self_writes.into_iter().chain(self_reads).chain(self_exists)); - acc - } - fn ancestors_and_trie<'a, S: AsRef, V: SegList>( - &'a self, - ptr: &JsonPointer, - ) -> (Vec<&'a LockState>, Option<&'a Trie>) { - match ptr.uncons() { - None => (Vec::new(), Some(self)), - Some((first, rest)) => match self.children.get(first) { - None => (vec![&self.state], None), - Some(t) => { - let (mut v, t) = t.ancestors_and_trie(&rest); - v.push(&self.state); - (v, t) - } - }, - } - } - // no writes in ancestor set, no writes at node - #[allow(dead_code)] - fn can_acquire_exist(&self, ptr: &JsonPointer, session: &HandleId) -> bool { - let (v, t) = self.ancestors_and_trie(ptr); - let ancestor_write_free = v - .into_iter() - .cloned() - .map(|s| s.erase(session)) - .all(|s| s.write_free()); - ancestor_write_free && t.map_or(true, |t| t.state.clone().erase(session).write_free()) - } - // no writes in ancestor set, no writes in subtree - #[allow(dead_code)] - fn can_acquire_read(&self, ptr: &JsonPointer, session: &HandleId) -> bool { - let (v, t) = self.ancestors_and_trie(ptr); - let ancestor_write_free = v - .into_iter() - .cloned() - .map(|s| s.erase(session)) - .all(|s| s.write_free()); - ancestor_write_free && t.map_or(true, |t| t.subtree_is_exclusive_free_for(session)) - } - // no reads or writes in ancestor set, no locks in subtree - #[allow(dead_code)] - fn can_acquire_write(&self, ptr: &JsonPointer, session: &HandleId) -> bool { - let (v, t) = self.ancestors_and_trie(ptr); - let ancestor_rw_free = v - .into_iter() - .cloned() - .map(|s| s.erase(session)) - .all(|s| s.write_free() && s.read_free()); - ancestor_rw_free && t.map_or(true, |t| t.subtree_is_lock_free_for(session)) - } - // ancestors with writes and writes on the node - fn session_blocking_exist<'a>( - &'a self, - ptr: &JsonPointer, - session: &HandleId, - ) -> Option<&'a HandleId> { - let (v, t) = self.ancestors_and_trie(ptr); - // there can only be one write session per traversal - let ancestor_write = v.into_iter().find_map(|s| s.write_session()); - let node_write = t.and_then(|t| t.state.write_session()); - ancestor_write - .or(node_write) - .and_then(|s| if s == session { None } else { Some(s) }) - } - // ancestors with writes, subtrees with writes - fn sessions_blocking_read<'a>( - &'a self, - ptr: &JsonPointer, - session: &HandleId, - ) -> OrdSet<&'a HandleId> { - let (v, t) = self.ancestors_and_trie(ptr); - let ancestor_writes = v - .into_iter() - .map(|s| s.write_session().into_iter().collect::>()) - .fold(OrdSet::new(), OrdSet::union); - let relevant_write_sessions = match t { - None => ancestor_writes, - Some(t) => ancestor_writes.union(t.subtree_write_sessions()), - }; - relevant_write_sessions.without(session) - } - // ancestors with reads or writes, subtrees with anything - fn sessions_blocking_write<'a>( - &'a self, - ptr: &JsonPointer, - session: &HandleId, - ) -> OrdSet<&'a HandleId> { - let (v, t) = self.ancestors_and_trie(ptr); - let ancestors = v - .into_iter() - .map(|s| { - s.read_sessions() - .union(s.write_session().into_iter().collect()) - }) - .fold(OrdSet::new(), OrdSet::union); - let subtree = t.map_or(OrdSet::new(), |t| t.subtree_sessions()); - ancestors.union(subtree).without(session) - } - - fn child_mut, V: SegList>(&mut self, ptr: &JsonPointer) -> &mut Self { - match ptr.uncons() { - None => self, - Some((first, rest)) => { - if !self.children.contains_key(first) { - self.children.insert(first.to_owned(), Trie::default()); - } - self.children.get_mut(first).unwrap().child_mut(&rest) - } - } - } - - fn sessions_blocking_lock<'a>(&'a self, lock_info: &LockInfo) -> OrdSet<&'a HandleId> { - match &lock_info.ty { - LockType::Exist => self - .session_blocking_exist(&lock_info.ptr, &lock_info.handle_id) - .into_iter() - .collect(), - LockType::Read => self.sessions_blocking_read(&lock_info.ptr, &lock_info.handle_id), - LockType::Write => self.sessions_blocking_write(&lock_info.ptr, &lock_info.handle_id), - } - } - - fn try_lock<'a>(&'a mut self, lock_info: &LockInfo) -> Result<(), OrdSet> { - let blocking_sessions = self.sessions_blocking_lock(lock_info); - if !blocking_sessions.is_empty() { - Err(blocking_sessions.into_iter().cloned().collect()) - } else { - drop(blocking_sessions); - let success = self - .child_mut(&lock_info.ptr) - .state - .try_lock(lock_info.handle_id.clone(), &lock_info.ty); - assert!(success); - Ok(()) - } - } - - fn unlock(&mut self, lock_info: &LockInfo) { - let t = self.child_mut(&lock_info.ptr); - let success = t.state.try_unlock(&lock_info.handle_id, &lock_info.ty); - assert!(success); - self.prune(); - } - - fn prunable(&self) -> bool { - self.children.is_empty() && self.state == LockState::Free - } - - fn prune(&mut self) { - self.children.retain(|_, t| { - t.prune(); - !t.prunable() - }) - } -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -struct Natural(usize); -impl Natural { - fn one() -> Self { - Natural(1) - } - fn of(n: usize) -> Option { - if n == 0 { - None - } else { - Some(Natural(n)) - } - } - fn inc(&mut self) { - self.0 += 1; - } - fn dec(mut self) -> Option { - self.0 -= 1; - if self.0 == 0 { - None - } else { - Some(self) - } - } -} -#[derive(Debug, Clone, PartialEq, Eq)] -enum LockState { - Free, - Shared { - e_lessees: BTreeMap, - r_lessees: BTreeMap, - }, - Exclusive { - w_lessee: HandleId, - w_session_count: Natural, // should never be 0 - r_session_count: usize, - e_session_count: usize, - }, -} -impl LockState { - fn erase(self, session: &HandleId) -> LockState { - match self { - LockState::Free => LockState::Free, - LockState::Shared { - mut e_lessees, - mut r_lessees, - } => { - e_lessees.remove(session); - r_lessees.remove(session); - if e_lessees.is_empty() && r_lessees.is_empty() { - LockState::Free - } else { - LockState::Shared { - e_lessees, - r_lessees, - } - } - } - LockState::Exclusive { ref w_lessee, .. } => { - if w_lessee == session { - LockState::Free - } else { - self - } - } - } - } - fn write_free(&self) -> bool { - match self { - LockState::Exclusive { .. } => false, - _ => true, - } - } - fn read_free(&self) -> bool { - match self { - LockState::Exclusive { - r_session_count, .. - } => *r_session_count == 0, - LockState::Shared { r_lessees, .. } => r_lessees.is_empty(), - _ => true, - } - } - fn sessions<'a>(&'a self) -> OrdSet<&'a HandleId> { - match self { - LockState::Free => OrdSet::new(), - LockState::Shared { - e_lessees, - r_lessees, - } => e_lessees.keys().chain(r_lessees.keys()).collect(), - LockState::Exclusive { w_lessee, .. } => ordset![w_lessee], - } - } - #[allow(dead_code)] - fn exist_sessions<'a>(&'a self) -> OrdSet<&'a HandleId> { - match self { - LockState::Free => OrdSet::new(), - LockState::Shared { e_lessees, .. } => e_lessees.keys().collect(), - LockState::Exclusive { - w_lessee, - e_session_count, - .. - } => { - if *e_session_count > 0 { - ordset![w_lessee] - } else { - OrdSet::new() - } - } - } - } - fn read_sessions<'a>(&'a self) -> OrdSet<&'a HandleId> { - match self { - LockState::Free => OrdSet::new(), - LockState::Shared { r_lessees, .. } => r_lessees.keys().collect(), - LockState::Exclusive { - w_lessee, - r_session_count, - .. - } => { - if *r_session_count > 0 { - ordset![w_lessee] - } else { - OrdSet::new() - } - } - } - } - fn write_session<'a>(&'a self) -> Option<&'a HandleId> { - match self { - LockState::Exclusive { w_lessee, .. } => Some(w_lessee), - _ => None, - } - } - - fn normalize(&mut self) { - match &*self { - LockState::Shared { - e_lessees, - r_lessees, - } if e_lessees.is_empty() && r_lessees.is_empty() => { - *self = LockState::Free; - } - _ => {} - } - } - // note this is not necessarily safe in the overall trie locking model - // this function will return true if the state changed as a result of the call - // if it returns false it technically means that the call was invalid and did not - // change the lock state at all - fn try_lock(&mut self, session: HandleId, typ: &LockType) -> bool { - match (&mut *self, typ) { - (LockState::Free, LockType::Exist) => { - *self = LockState::Shared { - e_lessees: [(session, Natural::one())].into(), - r_lessees: BTreeMap::new(), - }; - true - } - (LockState::Free, LockType::Read) => { - *self = LockState::Shared { - e_lessees: BTreeMap::new(), - r_lessees: [(session, Natural::one())].into(), - }; - true - } - (LockState::Free, LockType::Write) => { - *self = LockState::Exclusive { - w_lessee: session, - w_session_count: Natural::one(), - r_session_count: 0, - e_session_count: 0, - }; - true - } - (LockState::Shared { e_lessees, .. }, LockType::Exist) => { - match e_lessees.get_mut(&session) { - None => { - e_lessees.insert(session, Natural::one()); - } - Some(v) => v.inc(), - }; - true - } - (LockState::Shared { r_lessees, .. }, LockType::Read) => { - match r_lessees.get_mut(&session) { - None => { - r_lessees.insert(session, Natural::one()); - } - Some(v) => v.inc(), - } - true - } - ( - LockState::Shared { - e_lessees, - r_lessees, - }, - LockType::Write, - ) => { - for hdl in e_lessees.keys() { - if hdl != &session { - return false; - } - } - for hdl in r_lessees.keys() { - if hdl != &session { - return false; - } - } - *self = LockState::Exclusive { - r_session_count: r_lessees.remove(&session).map_or(0, |x| x.0), - e_session_count: e_lessees.remove(&session).map_or(0, |x| x.0), - w_lessee: session, - w_session_count: Natural::one(), - }; - true - } - ( - LockState::Exclusive { - w_lessee, - e_session_count, - .. - }, - LockType::Exist, - ) => { - if w_lessee != &session { - return false; - } - *e_session_count += 1; - true - } - ( - LockState::Exclusive { - w_lessee, - r_session_count, - .. - }, - LockType::Read, - ) => { - if w_lessee != &session { - return false; - } - *r_session_count += 1; - true - } - ( - LockState::Exclusive { - w_lessee, - w_session_count, - .. - }, - LockType::Write, - ) => { - if w_lessee != &session { - return false; - } - w_session_count.inc(); - true - } - } - } - - // there are many ways for this function to be called in an invalid way: Notably releasing locks that you never - // had to begin with. - fn try_unlock(&mut self, session: &HandleId, typ: &LockType) -> bool { - match (&mut *self, typ) { - (LockState::Free, _) => false, - (LockState::Shared { e_lessees, .. }, LockType::Exist) => { - match e_lessees.remove_entry(session) { - None => false, - Some((k, v)) => { - match v.dec() { - None => { - self.normalize(); - } - Some(n) => { - e_lessees.insert(k, n); - } - } - true - } - } - } - (LockState::Shared { r_lessees, .. }, LockType::Read) => { - match r_lessees.remove_entry(session) { - None => false, - Some((k, v)) => { - match v.dec() { - None => { - self.normalize(); - } - Some(n) => { - r_lessees.insert(k, n); - } - } - true - } - } - } - (LockState::Shared { .. }, LockType::Write) => false, - ( - LockState::Exclusive { - w_lessee, - e_session_count, - .. - }, - LockType::Exist, - ) => { - if w_lessee != session || *e_session_count == 0 { - return false; - } - *e_session_count -= 1; - true - } - ( - LockState::Exclusive { - w_lessee, - r_session_count, - .. - }, - LockType::Read, - ) => { - if w_lessee != session || *r_session_count == 0 { - return false; - } - *r_session_count -= 1; - true - } - ( - LockState::Exclusive { - w_lessee, - w_session_count, - r_session_count, - e_session_count, - }, - LockType::Write, - ) => { - if w_lessee != session { - return false; - } - match w_session_count.dec() { - None => { - let mut e_lessees = BTreeMap::new(); - if let Some(n) = Natural::of(*e_session_count) { - e_lessees.insert(session.clone(), n); - } - let mut r_lessees = BTreeMap::new(); - if let Some(n) = Natural::of(*r_session_count) { - r_lessees.insert(session.clone(), n); - } - *self = LockState::Shared { - e_lessees, - r_lessees, - }; - self.normalize(); - } - Some(n) => *w_session_count = n, - } - true - } - } - } -} -impl Default for LockState { - fn default() -> Self { - LockState::Free - } -} - -#[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord)] -struct LockInfo { - handle_id: HandleId, - ptr: JsonPointer, - ty: LockType, -} -impl LockInfo { - fn conflicts_with(&self, other: &LockInfo) -> bool { - self.handle_id != other.handle_id - && match (self.ty, other.ty) { - (LockType::Exist, LockType::Exist) => false, - (LockType::Exist, LockType::Read) => false, - (LockType::Exist, LockType::Write) => self.ptr.starts_with(&other.ptr), - (LockType::Read, LockType::Exist) => false, - (LockType::Read, LockType::Read) => false, - (LockType::Read, LockType::Write) => { - self.ptr.starts_with(&other.ptr) || other.ptr.starts_with(&self.ptr) - } - (LockType::Write, LockType::Exist) => other.ptr.starts_with(&self.ptr), - (LockType::Write, LockType::Read) => { - self.ptr.starts_with(&other.ptr) || other.ptr.starts_with(&self.ptr) - } - (LockType::Write, LockType::Write) => { - self.ptr.starts_with(&other.ptr) || other.ptr.starts_with(&self.ptr) - } - } - } - fn implicitly_grants(&self, other: &LockInfo) -> bool { - self.handle_id == other.handle_id - && match self.ty { - LockType::Exist => other.ty == LockType::Exist && self.ptr.starts_with(&other.ptr), - LockType::Read => { - // E's in the ancestry - other.ty == LockType::Exist && self.ptr.starts_with(&other.ptr) - // nonexclusive locks in the subtree - || other.ty != LockType::Write && other.ptr.starts_with(&self.ptr) - } - LockType::Write => { - // E's in the ancestry - other.ty == LockType::Exist && self.ptr.starts_with(&other.ptr) - // anything in the subtree - || other.ptr.starts_with(&self.ptr) - } - } - } -} -impl std::fmt::Display for LockInfo { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}{}{}", self.handle_id.id, self.ty, self.ptr) - } -} - -#[derive(Debug, Clone)] -pub struct LockSet(OrdSet); -impl std::fmt::Display for LockSet { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let by_session = self - .0 - .iter() - .map(|i| (&i.handle_id, ordset![(&i.ptr, &i.ty)])) - .fold( - ordmap! {}, - |m: OrdMap<&HandleId, OrdSet<(&JsonPointer, &LockType)>>, (id, s)| { - m.update_with(&id, s, OrdSet::union) - }, - ); - let num_sessions = by_session.len(); - for (i, (session, set)) in by_session.into_iter().enumerate() { - write!(f, "{}: {{ ", session.id)?; - let num_entries = set.len(); - for (j, (ptr, ty)) in set.into_iter().enumerate() { - write!(f, "{}{}", ty, ptr)?; - if j == num_entries - 1 { - write!(f, " }}")?; - } else { - write!(f, ", ")?; - } - } - if i != num_sessions - 1 { - write!(f, "\n")?; - } - } - Ok(()) - } -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub enum LockType { - Exist, - Read, - Write, -} -impl Default for LockType { - fn default() -> Self { - LockType::Exist - } -} -impl std::fmt::Display for LockType { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let show = match self { - LockType::Exist => "E", - LockType::Read => "R", - LockType::Write => "W", - }; - write!(f, "{}", show) - } -} - -#[derive(Debug, Clone, thiserror::Error)] -pub enum LockError { - #[error("Lock Taxonomy Escalation: Session = {session:?}, First = {first}, Second = {second}")] - LockTaxonomyEscalation { - session: HandleId, - first: JsonPointer, - second: JsonPointer, - }, - #[error("Lock Type Escalation: Session = {session:?}, Pointer = {ptr}, First = {first}, Second = {second}")] - LockTypeEscalation { - session: HandleId, - ptr: JsonPointer, - first: LockType, - second: LockType, - }, - #[error("Lock Type Escalation Implicit: Session = {session:?}, First = {first_ptr}:{first_type}, Second = {second_ptr}:{second_type}")] - LockTypeEscalationImplicit { - session: HandleId, - first_ptr: JsonPointer, - first_type: LockType, - second_ptr: JsonPointer, - second_type: LockType, - }, - #[error( - "Non-Canonical Lock Ordering: Session = {session:?}, First = {first}, Second = {second}" - )] - NonCanonicalOrdering { - session: HandleId, - first: JsonPointer, - second: JsonPointer, - }, - #[error("Deadlock Detected: Locks Held = {locks_held}, Locks Waiting = {locks_waiting}")] - DeadlockDetected { - locks_held: LockSet, - locks_waiting: LockSet, - }, -} - -#[derive(Debug)] -struct Request { - lock_info: LockInfo, - cancel: Option>, - completion: oneshot::Sender>, -} -impl Request { - fn complete(self) -> oneshot::Receiver { - let (sender, receiver) = oneshot::channel(); - if let Err(_) = self.completion.send(Ok(Guard { - lock_info: self.lock_info, - sender: Some(sender), - })) { - #[cfg(feature = "tracing")] - tracing::warn!("Completion sent to closed channel.") - } - receiver - } - fn reject(self, err: LockError) { - if let Err(_) = self.completion.send(Err(err)) { - #[cfg(feature = "tracing")] - tracing::warn!("Rejection sent to closed channel.") - } - } -} - -#[derive(Debug)] -pub struct Guard { - lock_info: LockInfo, - sender: Option>, -} -impl Drop for Guard { - fn drop(&mut self) { - if let Err(_e) = self - .sender - .take() - .unwrap() - .send(std::mem::take(&mut self.lock_info)) - { - #[cfg(feature = "tracing")] - tracing::warn!("Failed to release lock: {:?}", _e) - } - } -} - -#[cfg(test)] -fn lock_type_gen() -> BoxedStrategy { - proptest::prop_oneof![ - proptest::strategy::Just(crate::LockType::Exist), - proptest::strategy::Just(crate::LockType::Read), - proptest::strategy::Just(crate::LockType::Write), - ] - .boxed() -} - -#[cfg(test)] -proptest! { - #[test] - fn unlock_after_lock_is_identity(session in 0..10u64, typ in lock_type_gen()) { - let mut orig = LockState::Free; - orig.try_lock(HandleId{id: session}, &typ); - orig.try_unlock(&HandleId{id: session}, &typ); - prop_assert_eq!(orig, LockState::Free); - } -} diff --git a/patch-db/src/locker/action_mux.rs b/patch-db/src/locker/action_mux.rs new file mode 100644 index 0000000..6367313 --- /dev/null +++ b/patch-db/src/locker/action_mux.rs @@ -0,0 +1,79 @@ +use tokio::sync::{ + mpsc::{self, UnboundedReceiver}, + oneshot, +}; + +use super::{LockInfo, Request}; + +#[derive(Debug)] +pub(super) enum Action { + HandleRequest(Request), + HandleRelease(LockInfo), + HandleCancel(LockInfo), +} + +struct InboundRequestQueue { + closed: bool, + recv: mpsc::UnboundedReceiver, +} +pub(super) struct ActionMux { + inbound_request_queue: InboundRequestQueue, + unlock_receivers: Vec>, + cancellation_receivers: Vec>, +} +impl ActionMux { + pub fn new(inbound_receiver: UnboundedReceiver) -> Self { + // futures::future::select_all will panic if the list is empty + // instead we want it to block forever by adding a channel that will never recv + let unlock_receivers = vec![oneshot::channel().1]; + let cancellation_receivers = vec![oneshot::channel().1]; + ActionMux { + inbound_request_queue: InboundRequestQueue { + recv: inbound_receiver, + closed: false, + }, + unlock_receivers, + cancellation_receivers, + } + } + pub async fn get_action(&mut self) -> Option { + loop { + if self.inbound_request_queue.closed + && self.unlock_receivers.len() == 1 + && self.cancellation_receivers.len() == 1 + { + return None; + } + tokio::select! { + a = self.inbound_request_queue.recv.recv() => { + if let Some(a) = a { + return Some(Action::HandleRequest(a)); + } else { + self.inbound_request_queue.closed = true; + } + } + (a, idx, _) = futures::future::select_all(self.unlock_receivers.iter_mut()) => { + self.unlock_receivers.swap_remove(idx); + return Some(Action::HandleRelease(a.unwrap())) + } + (a, idx, _) = futures::future::select_all(self.cancellation_receivers.iter_mut()) => { + self.cancellation_receivers.swap_remove(idx); + if let Ok(a) = a { + return Some(Action::HandleCancel(a)) + } + } + } + } + } + + pub fn push_unlock_receivers>>( + &mut self, + recv: T, + ) { + self.unlock_receivers.extend(recv) + } + + pub fn push_cancellation_receiver(&mut self, recv: oneshot::Receiver) { + self.cancellation_receivers.push(recv) + } +} diff --git a/patch-db/src/locker/bookkeeper.rs b/patch-db/src/locker/bookkeeper.rs new file mode 100644 index 0000000..84991c8 --- /dev/null +++ b/patch-db/src/locker/bookkeeper.rs @@ -0,0 +1,278 @@ +use std::collections::VecDeque; + +use crate::{ + handle::HandleId, + locker::{ + log_utils::{display_session_set, fmt_acquired, fmt_cancelled, fmt_deferred, fmt_released}, + LockSet, + }, +}; +use imbl::{ordset, OrdMap, OrdSet}; +use tokio::sync::oneshot; +use tracing::{debug, error, info, warn}; + +use super::{order_enforcer::LockOrderEnforcer, trie::LockTrie, LockError, LockInfo, Request}; + +// solely responsible for managing the bookkeeping requirements of requests +pub(super) struct LockBookkeeper { + trie: LockTrie, + deferred_request_queue: VecDeque<(Request, OrdSet)>, + #[cfg(feature = "unstable")] + order_enforcer: LockOrderEnforcer, +} +impl LockBookkeeper { + pub fn new() -> Self { + LockBookkeeper { + trie: LockTrie::default(), + deferred_request_queue: VecDeque::new(), + #[cfg(feature = "unstable")] + order_enforcer: LockOrderEnforcer::new(), + } + } + + pub fn lease( + &mut self, + req: Request, + ) -> Result>, LockError> { + #[cfg(feature = "unstable")] + if let Err(e) = self.order_enforcer.try_insert(&req.lock_info) { + req.reject(e.clone()); + return Err(e); + } + + // In normal operation we start here + let hot_seat = self.deferred_request_queue.pop_front(); + let res = process_new_req( + req, + hot_seat.as_ref(), + &mut self.trie, + &mut self.deferred_request_queue, + ); + + if let Some(hot_seat) = hot_seat { + self.deferred_request_queue.push_front(hot_seat); + } + Ok(res) + } + + pub fn cancel(&mut self, info: &LockInfo) { + #[cfg(feature = "unstable")] + self.order_enforcer.remove(&info); + + let entry = self + .deferred_request_queue + .iter() + .enumerate() + .find(|(_, (r, _))| &r.lock_info == info); + match entry { + None => { + #[cfg(feature = "tracing")] + warn!( + "Received cancellation for a lock not currently waiting: {}", + info.ptr + ); + } + Some((i, (req, _))) => { + #[cfg(feature = "tracing")] + info!("{}", fmt_cancelled(&req.lock_info)); + + self.deferred_request_queue.remove(i); + } + } + } + + pub fn ret(&mut self, info: &LockInfo) -> Vec> { + #[cfg(feature = "unstable")] + self.order_enforcer.remove(&info); + self.trie.unlock(&info); + + #[cfg(feature = "tracing")] + { + info!("{}", fmt_released(&info)); + debug!("Reexamining request queue backlog..."); + } + + // try to pop off as many requests off the front of the queue as we can + let mut new_unlock_receivers = vec![]; + let mut hot_seat = None; + while let Some((r, _)) = self.deferred_request_queue.pop_front() { + match self.trie.try_lock(&r.lock_info) { + Ok(()) => { + let recv = r.complete(); + new_unlock_receivers.push(recv); + } + Err(new_blocking_sessions) => { + // set the hot seat and proceed to step two + hot_seat = Some((r, new_blocking_sessions)); + break; + } + } + } + // when we can no longer do so, try and service the rest of the queue with the new hot seat + let old_request_queue = std::mem::take(&mut self.deferred_request_queue); + for (r, _) in old_request_queue { + // we now want to process each request in the queue as if it was new + let res = process_new_req( + r, + hot_seat.as_ref(), + &mut self.trie, + &mut self.deferred_request_queue, + ); + if let Some(recv) = res { + new_unlock_receivers.push(recv); + } + } + if let Some(hot_seat) = hot_seat { + self.deferred_request_queue.push_front(hot_seat); + } + new_unlock_receivers + } +} + +// to prevent starvation we privilege the front of the queue and only allow requests that +// conflict with the request at the front to go through if they are requested by sessions that +// are *currently blocking* the front of the queue +fn process_new_req( + req: Request, + hot_seat: Option<&(Request, OrdSet)>, + trie: &mut LockTrie, + request_queue: &mut VecDeque<(Request, OrdSet)>, +) -> Option> { + match hot_seat { + // hot seat conflicts and request session isn't in current blocking sessions + // so we push it to the queue + Some((hot_req, hot_blockers)) + if hot_req.lock_info.conflicts_with(&req.lock_info) + && !hot_blockers.contains(&req.lock_info.handle_id) => + { + #[cfg(feature = "tracing")] + { + info!("{}", fmt_deferred(&req.lock_info)); + debug!( + "Must wait on hot seat request from session {}", + &hot_req.lock_info.handle_id.id + ); + } + + request_queue.push_back((req, ordset![])); + kill_deadlocked(request_queue, &*trie); + None + } + // otherwise we try and service it immediately, only pushing to the queue if it fails + _ => match trie.try_lock(&req.lock_info) { + Ok(()) => { + #[cfg(feature = "tracing")] + info!("{}", fmt_acquired(&req.lock_info)); + + Some(req.complete()) + } + Err(blocking_sessions) => { + #[cfg(feature = "tracing")] + { + info!("{}", fmt_deferred(&req.lock_info)); + debug!( + "Must wait on sessions {}", + display_session_set(&blocking_sessions) + ) + } + + request_queue.push_back((req, blocking_sessions)); + kill_deadlocked(request_queue, &*trie); + None + } + }, + } +} + +fn kill_deadlocked(request_queue: &mut VecDeque<(Request, OrdSet)>, trie: &LockTrie) { + // TODO optimize this, it is unlikely that we are anywhere close to as efficient as we can be here. + let deadlocked_reqs = deadlock_scan(request_queue); + if !deadlocked_reqs.is_empty() { + let locks_waiting = LockSet( + deadlocked_reqs + .iter() + .map(|r| r.lock_info.clone()) + .collect(), + ); + #[cfg(feature = "tracing")] + error!("Deadlock Detected: {:?}", locks_waiting); + let err = LockError::DeadlockDetected { + locks_waiting, + locks_held: LockSet(trie.subtree_lock_info()), + }; + let mut indices_to_remove = Vec::with_capacity(deadlocked_reqs.len()); + for (i, (req, _)) in request_queue.iter().enumerate() { + if deadlocked_reqs.iter().any(|r| std::ptr::eq(*r, req)) { + indices_to_remove.push(i) + } + } + let old = std::mem::take(request_queue); + for (i, (r, s)) in old.into_iter().enumerate() { + if indices_to_remove.contains(&i) { + r.reject(err.clone()) + } else { + request_queue.push_back((r, s)) + } + } + } +} + +fn deadlock_scan<'a>(queue: &'a VecDeque<(Request, OrdSet)>) -> Vec<&'a Request> { + let (wait_map, mut req_map) = queue + .iter() + .map(|(req, set)| ((&req.lock_info.handle_id, set, req))) + .fold( + ( + OrdMap::<&'a HandleId, &'a OrdSet>::new(), + OrdMap::<&'a HandleId, &'a Request>::new(), + ), + |(mut wmap, mut rmap), (id, wset, req)| { + ( + { + wmap.insert(id, wset); + wmap + }, + { + rmap.insert(id, req); + rmap + }, + ) + }, + ); + fn path_to<'a>( + graph: &OrdMap<&'a HandleId, &'a OrdSet>, + root: &'a HandleId, + node: &'a HandleId, + ) -> OrdSet<&'a HandleId> { + if node == root { + return ordset![root]; + } + match graph.get(node) { + None => ordset![], + Some(s) => s + .iter() + .find_map(|h| Some(path_to(graph, root, h)).filter(|s| s.is_empty())) + .map_or(ordset![], |mut s| { + s.insert(node); + s + }), + } + } + for (root, wait_set) in wait_map.iter() { + let cycle = wait_set + .iter() + .find_map(|start| Some(path_to(&wait_map, root, start)).filter(|s| s.is_empty())); + match cycle { + None => { + continue; + } + Some(c) => { + return c + .into_iter() + .map(|id| req_map.remove(id).unwrap()) + .collect(); + } + } + } + vec![] +} diff --git a/patch-db/src/locker/log_utils.rs b/patch-db/src/locker/log_utils.rs new file mode 100644 index 0000000..434e1f5 --- /dev/null +++ b/patch-db/src/locker/log_utils.rs @@ -0,0 +1,46 @@ +use super::LockInfo; +use imbl::OrdSet; + +use crate::handle::HandleId; + +#[cfg(feature = "tracing")] +pub fn display_session_set(set: &OrdSet) -> String { + use std::fmt::Write; + + let mut display = String::from("{"); + for session in set.iter() { + write!(display, "{},", session.id).unwrap(); + } + display.replace_range(display.len() - 1.., "}"); + display +} + +#[cfg(feature = "tracing")] +pub(super) fn fmt_acquired(lock_info: &LockInfo) -> String { + format!( + "Acquired: session {} - {} lock on {}", + lock_info.handle_id.id, lock_info.ty, lock_info.ptr, + ) +} + +#[cfg(feature = "tracing")] +pub(super) fn fmt_deferred(deferred_lock_info: &LockInfo) -> String { + format!( + "Deferred: session {} - {} lock on {}", + deferred_lock_info.handle_id.id, deferred_lock_info.ty, deferred_lock_info.ptr, + ) +} + +pub(super) fn fmt_released(released_lock_info: &LockInfo) -> String { + format!( + "Released: session {} - {} lock on {}", + released_lock_info.handle_id.id, released_lock_info.ty, released_lock_info.ptr + ) +} + +pub(super) fn fmt_cancelled(cancelled_lock_info: &LockInfo) -> String { + format!( + "Canceled: session {} - {} lock on {}", + cancelled_lock_info.handle_id.id, cancelled_lock_info.ty, cancelled_lock_info.ptr + ) +} diff --git a/patch-db/src/locker/mod.rs b/patch-db/src/locker/mod.rs new file mode 100644 index 0000000..e957bef --- /dev/null +++ b/patch-db/src/locker/mod.rs @@ -0,0 +1,301 @@ +mod action_mux; +mod bookkeeper; +mod log_utils; +mod natural; +mod order_enforcer; +mod trie; + +use imbl::{ordmap, ordset, OrdMap, OrdSet}; +use json_ptr::JsonPointer; +use tokio::sync::{mpsc, oneshot}; +use tracing::{debug, trace, warn}; + +use crate::{handle::HandleId, locker::action_mux::Action}; + +use self::{action_mux::ActionMux, bookkeeper::LockBookkeeper}; + +pub struct Locker { + sender: mpsc::UnboundedSender, +} +impl Locker { + pub fn new() -> Self { + let (sender, receiver) = mpsc::unbounded_channel(); + tokio::spawn(async move { + let mut action_mux = ActionMux::new(receiver); + let mut lock_server = LockBookkeeper::new(); + + while let Some(action) = action_mux.get_action().await { + #[cfg(feature = "tracing")] + trace!("Locker Action: {:#?}", action); + match action { + Action::HandleRequest(mut req) => { + #[cfg(feature = "tracing")] + debug!("New lock request: {}", &req.lock_info); + + // Pertinent Logic + let req_cancel = req.cancel.take().expect("Request Cancellation Stolen"); + match lock_server.lease(req) { + Ok(Some(recv)) => { + action_mux.push_unlock_receivers(std::iter::once(recv)) + } + Ok(None) => action_mux.push_cancellation_receiver(req_cancel), + Err(_) => {} + } + } + Action::HandleRelease(lock_info) => { + #[cfg(feature = "tracing")] + debug!("New lock release: {}", &lock_info); + + let new_unlock_receivers = lock_server.ret(&lock_info); + action_mux.push_unlock_receivers(new_unlock_receivers); + } + Action::HandleCancel(lock_info) => { + #[cfg(feature = "tracing")] + debug!("New request canceled: {}", &lock_info); + + lock_server.cancel(&lock_info) + } + } + } + }); + Locker { sender } + } + pub async fn lock( + &self, + handle_id: HandleId, + ptr: JsonPointer, + lock_type: LockType, + ) -> Result { + // Local Definitions + struct CancelGuard { + lock_info: Option, + channel: Option>, + recv: oneshot::Receiver>, + } + impl Drop for CancelGuard { + fn drop(&mut self) { + if let (Some(lock_info), Some(channel)) = + (self.lock_info.take(), self.channel.take()) + { + self.recv.close(); + let _ = channel.send(lock_info); + } + } + } + + // Pertinent Logic + let lock_info = LockInfo { + handle_id, + ptr, + ty: lock_type, + }; + let (send, recv) = oneshot::channel(); + let (cancel_send, cancel_recv) = oneshot::channel(); + let mut cancel_guard = CancelGuard { + lock_info: Some(lock_info.clone()), + channel: Some(cancel_send), + recv, + }; + self.sender + .send(Request { + lock_info, + cancel: Some(cancel_recv), + completion: send, + }) + .unwrap(); + let res = (&mut cancel_guard.recv).await.unwrap(); + cancel_guard.channel.take(); + res + } +} + +#[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord)] +struct LockInfo { + handle_id: HandleId, + ptr: JsonPointer, + ty: LockType, +} +impl LockInfo { + fn conflicts_with(&self, other: &LockInfo) -> bool { + self.handle_id != other.handle_id + && match (self.ty, other.ty) { + (LockType::Exist, LockType::Exist) => false, + (LockType::Exist, LockType::Read) => false, + (LockType::Exist, LockType::Write) => self.ptr.starts_with(&other.ptr), + (LockType::Read, LockType::Exist) => false, + (LockType::Read, LockType::Read) => false, + (LockType::Read, LockType::Write) => { + self.ptr.starts_with(&other.ptr) || other.ptr.starts_with(&self.ptr) + } + (LockType::Write, LockType::Exist) => other.ptr.starts_with(&self.ptr), + (LockType::Write, LockType::Read) => { + self.ptr.starts_with(&other.ptr) || other.ptr.starts_with(&self.ptr) + } + (LockType::Write, LockType::Write) => { + self.ptr.starts_with(&other.ptr) || other.ptr.starts_with(&self.ptr) + } + } + } + fn implicitly_grants(&self, other: &LockInfo) -> bool { + self.handle_id == other.handle_id + && match self.ty { + LockType::Exist => other.ty == LockType::Exist && self.ptr.starts_with(&other.ptr), + LockType::Read => { + // E's in the ancestry + other.ty == LockType::Exist && self.ptr.starts_with(&other.ptr) + // nonexclusive locks in the subtree + || other.ty != LockType::Write && other.ptr.starts_with(&self.ptr) + } + LockType::Write => { + // E's in the ancestry + other.ty == LockType::Exist && self.ptr.starts_with(&other.ptr) + // anything in the subtree + || other.ptr.starts_with(&self.ptr) + } + } + } +} +impl std::fmt::Display for LockInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}{}{}", self.handle_id.id, self.ty, self.ptr) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum LockType { + Exist, + Read, + Write, +} +impl Default for LockType { + fn default() -> Self { + LockType::Exist + } +} +impl std::fmt::Display for LockType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let show = match self { + LockType::Exist => "E", + LockType::Read => "R", + LockType::Write => "W", + }; + write!(f, "{}", show) + } +} + +#[derive(Debug, Clone)] +pub struct LockSet(OrdSet); +impl std::fmt::Display for LockSet { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let by_session = self + .0 + .iter() + .map(|i| (&i.handle_id, ordset![(&i.ptr, &i.ty)])) + .fold( + ordmap! {}, + |m: OrdMap<&HandleId, OrdSet<(&JsonPointer, &LockType)>>, (id, s)| { + m.update_with(&id, s, OrdSet::union) + }, + ); + let num_sessions = by_session.len(); + for (i, (session, set)) in by_session.into_iter().enumerate() { + write!(f, "{}: {{ ", session.id)?; + let num_entries = set.len(); + for (j, (ptr, ty)) in set.into_iter().enumerate() { + write!(f, "{}{}", ty, ptr)?; + if j == num_entries - 1 { + write!(f, " }}")?; + } else { + write!(f, ", ")?; + } + } + if i != num_sessions - 1 { + write!(f, "\n")?; + } + } + Ok(()) + } +} + +#[derive(Debug, Clone, thiserror::Error)] +pub enum LockError { + #[error("Lock Taxonomy Escalation: Session = {session:?}, First = {first}, Second = {second}")] + LockTaxonomyEscalation { + session: HandleId, + first: JsonPointer, + second: JsonPointer, + }, + #[error("Lock Type Escalation: Session = {session:?}, Pointer = {ptr}, First = {first}, Second = {second}")] + LockTypeEscalation { + session: HandleId, + ptr: JsonPointer, + first: LockType, + second: LockType, + }, + #[error("Lock Type Escalation Implicit: Session = {session:?}, First = {first_ptr}:{first_type}, Second = {second_ptr}:{second_type}")] + LockTypeEscalationImplicit { + session: HandleId, + first_ptr: JsonPointer, + first_type: LockType, + second_ptr: JsonPointer, + second_type: LockType, + }, + #[error( + "Non-Canonical Lock Ordering: Session = {session:?}, First = {first}, Second = {second}" + )] + NonCanonicalOrdering { + session: HandleId, + first: JsonPointer, + second: JsonPointer, + }, + #[error("Deadlock Detected: Locks Held = {locks_held}, Locks Waiting = {locks_waiting}")] + DeadlockDetected { + locks_held: LockSet, + locks_waiting: LockSet, + }, +} + +#[derive(Debug)] +struct Request { + lock_info: LockInfo, + cancel: Option>, + completion: oneshot::Sender>, +} +impl Request { + fn complete(self) -> oneshot::Receiver { + let (sender, receiver) = oneshot::channel(); + if let Err(_) = self.completion.send(Ok(Guard { + lock_info: self.lock_info, + sender: Some(sender), + })) { + #[cfg(feature = "tracing")] + warn!("Completion sent to closed channel.") + } + receiver + } + fn reject(self, err: LockError) { + if let Err(_) = self.completion.send(Err(err)) { + #[cfg(feature = "tracing")] + warn!("Rejection sent to closed channel.") + } + } +} + +#[derive(Debug)] +pub struct Guard { + lock_info: LockInfo, + sender: Option>, +} +impl Drop for Guard { + fn drop(&mut self) { + if let Err(_e) = self + .sender + .take() + .unwrap() + .send(std::mem::take(&mut self.lock_info)) + { + #[cfg(feature = "tracing")] + warn!("Failed to release lock: {:?}", _e) + } + } +} diff --git a/patch-db/src/locker/natural.rs b/patch-db/src/locker/natural.rs new file mode 100644 index 0000000..afc4dbe --- /dev/null +++ b/patch-db/src/locker/natural.rs @@ -0,0 +1,28 @@ +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(super) struct Natural(usize); +impl Natural { + pub fn one() -> Self { + Natural(1) + } + pub fn of(n: usize) -> Option { + if n == 0 { + None + } else { + Some(Natural(n)) + } + } + pub fn inc(&mut self) { + self.0 += 1; + } + pub fn dec(mut self) -> Option { + self.0 -= 1; + if self.0 == 0 { + None + } else { + Some(self) + } + } + pub fn into_usize(self) -> usize { + self.0 + } +} diff --git a/patch-db/src/locker/order_enforcer.rs b/patch-db/src/locker/order_enforcer.rs new file mode 100644 index 0000000..1f24e70 --- /dev/null +++ b/patch-db/src/locker/order_enforcer.rs @@ -0,0 +1,143 @@ +use imbl::{ordmap, OrdMap}; +use json_ptr::JsonPointer; +use tracing::warn; + +use crate::{handle::HandleId, LockType}; + +use super::{LockError, LockInfo}; + +pub(super) struct LockOrderEnforcer { + locks_held: OrdMap>, +} +impl LockOrderEnforcer { + pub fn new() -> Self { + LockOrderEnforcer { + locks_held: ordmap! {}, + } + } + // locks must be acquired in lexicographic order for the pointer, and reverse order for type + fn validate(&self, req: &LockInfo) -> Result<(), LockError> { + // the following notation is used to denote an example sequence that can cause deadlocks + // + // Individual Lock Requests + // 1W/A/B + // |||> Node whose lock is being acquired: /A/B (strings prefixed by slashes, indicating descent path) + // ||> Type of Lock: W (E/R/W) + // |> Session Number: 1 (any natural number) + // + // Sequences + // LockRequest >> LockRequest + match self.locks_held.get(&req.handle_id) { + None => Ok(()), + Some(m) => { + // quick accept + for (ptr, ty) in m.keys() { + let tmp = LockInfo { + ptr: ptr.clone(), + ty: *ty, + handle_id: req.handle_id.clone(), + }; + if tmp.implicitly_grants(req) { + return Ok(()); + } + } + let err = m.keys().find_map(|(ptr, ty)| match ptr.cmp(&req.ptr) { + std::cmp::Ordering::Less => { + if req.ptr.starts_with(ptr) + && req.ty == LockType::Write + && *ty == LockType::Read + { + // 1R/A >> 2R/A >> 1W/A/A >> 2W/A/B + Some(LockError::LockTypeEscalationImplicit { + session: req.handle_id.clone(), + first_ptr: ptr.clone(), + first_type: *ty, + second_ptr: req.ptr.clone(), + second_type: req.ty, + }) + } else { + None + } + } + std::cmp::Ordering::Equal => { + if req.ty > *ty { + // 1R/A >> 2R/A >> 1W/A >> 1W/A + Some(LockError::LockTypeEscalation { + session: req.handle_id.clone(), + ptr: ptr.clone(), + first: *ty, + second: req.ty, + }) + } else { + None + } + } + std::cmp::Ordering::Greater => Some(if ptr.starts_with(&req.ptr) { + // 1W/A/A >> 2W/A/B >> 1R/A >> 2R/A + LockError::LockTaxonomyEscalation { + session: req.handle_id.clone(), + first: ptr.clone(), + second: req.ptr.clone(), + } + } else { + // 1W/A >> 2W/B >> 1W/B >> 2W/A + LockError::NonCanonicalOrdering { + session: req.handle_id.clone(), + first: ptr.clone(), + second: req.ptr.clone(), + } + }), + }); + err.map_or(Ok(()), Err) + } + } + } + pub(super) fn try_insert(&mut self, req: &LockInfo) -> Result<(), LockError> { + self.validate(req)?; + match self.locks_held.get_mut(&req.handle_id) { + None => { + self.locks_held.insert( + req.handle_id.clone(), + ordmap![(req.ptr.clone(), req.ty) => 1], + ); + } + Some(locks) => { + let k = (req.ptr.clone(), req.ty); + match locks.get_mut(&k) { + None => { + locks.insert(k, 1); + } + Some(n) => { + *n += 1; + } + } + } + } + Ok(()) + } + pub(super) fn remove(&mut self, req: &LockInfo) { + match self.locks_held.remove_with_key(&req.handle_id) { + None => { + #[cfg(feature = "tracing")] + warn!("Invalid removal from session manager: {:?}", req); + } + Some((hdl, mut locks)) => { + let k = (req.ptr.clone(), req.ty); + match locks.remove_with_key(&k) { + None => { + #[cfg(feature = "tracing")] + warn!("Invalid removal from session manager: {:?}", req); + } + Some((k, n)) => { + if n - 1 > 0 { + locks.insert(k, n - 1); + } + } + } + if !locks.is_empty() { + self.locks_held.insert(hdl, locks); + } + } + } + } +} diff --git a/patch-db/src/proptest.rs b/patch-db/src/locker/proptest.rs similarity index 100% rename from patch-db/src/proptest.rs rename to patch-db/src/locker/proptest.rs diff --git a/patch-db/src/locker/trie.rs b/patch-db/src/locker/trie.rs new file mode 100644 index 0000000..10f3d9f --- /dev/null +++ b/patch-db/src/locker/trie.rs @@ -0,0 +1,628 @@ +use std::collections::BTreeMap; + +use imbl::{ordset, OrdSet}; +use json_ptr::{JsonPointer, SegList}; + +use crate::{handle::HandleId, LockType}; + +use super::{natural::Natural, LockInfo}; + +#[derive(Debug, Clone, PartialEq, Eq)] +enum LockState { + Free, + Shared { + e_lessees: BTreeMap, + r_lessees: BTreeMap, + }, + Exclusive { + w_lessee: HandleId, + w_session_count: Natural, // should never be 0 + r_session_count: usize, + e_session_count: usize, + }, +} +impl LockState { + fn erase(self, session: &HandleId) -> LockState { + match self { + LockState::Free => LockState::Free, + LockState::Shared { + mut e_lessees, + mut r_lessees, + } => { + e_lessees.remove(session); + r_lessees.remove(session); + if e_lessees.is_empty() && r_lessees.is_empty() { + LockState::Free + } else { + LockState::Shared { + e_lessees, + r_lessees, + } + } + } + LockState::Exclusive { ref w_lessee, .. } => { + if w_lessee == session { + LockState::Free + } else { + self + } + } + } + } + fn write_free(&self) -> bool { + match self { + LockState::Exclusive { .. } => false, + _ => true, + } + } + fn read_free(&self) -> bool { + match self { + LockState::Exclusive { + r_session_count, .. + } => *r_session_count == 0, + LockState::Shared { r_lessees, .. } => r_lessees.is_empty(), + _ => true, + } + } + fn sessions<'a>(&'a self) -> OrdSet<&'a HandleId> { + match self { + LockState::Free => OrdSet::new(), + LockState::Shared { + e_lessees, + r_lessees, + } => e_lessees.keys().chain(r_lessees.keys()).collect(), + LockState::Exclusive { w_lessee, .. } => ordset![w_lessee], + } + } + #[allow(dead_code)] + fn exist_sessions<'a>(&'a self) -> OrdSet<&'a HandleId> { + match self { + LockState::Free => OrdSet::new(), + LockState::Shared { e_lessees, .. } => e_lessees.keys().collect(), + LockState::Exclusive { + w_lessee, + e_session_count, + .. + } => { + if *e_session_count > 0 { + ordset![w_lessee] + } else { + OrdSet::new() + } + } + } + } + fn read_sessions<'a>(&'a self) -> OrdSet<&'a HandleId> { + match self { + LockState::Free => OrdSet::new(), + LockState::Shared { r_lessees, .. } => r_lessees.keys().collect(), + LockState::Exclusive { + w_lessee, + r_session_count, + .. + } => { + if *r_session_count > 0 { + ordset![w_lessee] + } else { + OrdSet::new() + } + } + } + } + fn write_session<'a>(&'a self) -> Option<&'a HandleId> { + match self { + LockState::Exclusive { w_lessee, .. } => Some(w_lessee), + _ => None, + } + } + + fn normalize(&mut self) { + match &*self { + LockState::Shared { + e_lessees, + r_lessees, + } if e_lessees.is_empty() && r_lessees.is_empty() => { + *self = LockState::Free; + } + _ => {} + } + } + // note this is not necessarily safe in the overall trie locking model + // this function will return true if the state changed as a result of the call + // if it returns false it technically means that the call was invalid and did not + // change the lock state at all + fn try_lock(&mut self, session: HandleId, typ: &LockType) -> bool { + match (&mut *self, typ) { + (LockState::Free, LockType::Exist) => { + *self = LockState::Shared { + e_lessees: [(session, Natural::one())].into(), + r_lessees: BTreeMap::new(), + }; + true + } + (LockState::Free, LockType::Read) => { + *self = LockState::Shared { + e_lessees: BTreeMap::new(), + r_lessees: [(session, Natural::one())].into(), + }; + true + } + (LockState::Free, LockType::Write) => { + *self = LockState::Exclusive { + w_lessee: session, + w_session_count: Natural::one(), + r_session_count: 0, + e_session_count: 0, + }; + true + } + (LockState::Shared { e_lessees, .. }, LockType::Exist) => { + match e_lessees.get_mut(&session) { + None => { + e_lessees.insert(session, Natural::one()); + } + Some(v) => v.inc(), + }; + true + } + (LockState::Shared { r_lessees, .. }, LockType::Read) => { + match r_lessees.get_mut(&session) { + None => { + r_lessees.insert(session, Natural::one()); + } + Some(v) => v.inc(), + } + true + } + ( + LockState::Shared { + e_lessees, + r_lessees, + }, + LockType::Write, + ) => { + for hdl in e_lessees.keys() { + if hdl != &session { + return false; + } + } + for hdl in r_lessees.keys() { + if hdl != &session { + return false; + } + } + *self = LockState::Exclusive { + r_session_count: r_lessees.remove(&session).map_or(0, Natural::into_usize), + e_session_count: e_lessees.remove(&session).map_or(0, Natural::into_usize), + w_lessee: session, + w_session_count: Natural::one(), + }; + true + } + ( + LockState::Exclusive { + w_lessee, + e_session_count, + .. + }, + LockType::Exist, + ) => { + if w_lessee != &session { + return false; + } + *e_session_count += 1; + true + } + ( + LockState::Exclusive { + w_lessee, + r_session_count, + .. + }, + LockType::Read, + ) => { + if w_lessee != &session { + return false; + } + *r_session_count += 1; + true + } + ( + LockState::Exclusive { + w_lessee, + w_session_count, + .. + }, + LockType::Write, + ) => { + if w_lessee != &session { + return false; + } + w_session_count.inc(); + true + } + } + } + + // there are many ways for this function to be called in an invalid way: Notably releasing locks that you never + // had to begin with. + fn try_unlock(&mut self, session: &HandleId, typ: &LockType) -> bool { + match (&mut *self, typ) { + (LockState::Free, _) => false, + (LockState::Shared { e_lessees, .. }, LockType::Exist) => { + match e_lessees.remove_entry(session) { + None => false, + Some((k, v)) => { + match v.dec() { + None => { + self.normalize(); + } + Some(n) => { + e_lessees.insert(k, n); + } + } + true + } + } + } + (LockState::Shared { r_lessees, .. }, LockType::Read) => { + match r_lessees.remove_entry(session) { + None => false, + Some((k, v)) => { + match v.dec() { + None => { + self.normalize(); + } + Some(n) => { + r_lessees.insert(k, n); + } + } + true + } + } + } + (LockState::Shared { .. }, LockType::Write) => false, + ( + LockState::Exclusive { + w_lessee, + e_session_count, + .. + }, + LockType::Exist, + ) => { + if w_lessee != session || *e_session_count == 0 { + return false; + } + *e_session_count -= 1; + true + } + ( + LockState::Exclusive { + w_lessee, + r_session_count, + .. + }, + LockType::Read, + ) => { + if w_lessee != session || *r_session_count == 0 { + return false; + } + *r_session_count -= 1; + true + } + ( + LockState::Exclusive { + w_lessee, + w_session_count, + r_session_count, + e_session_count, + }, + LockType::Write, + ) => { + if w_lessee != session { + return false; + } + match w_session_count.dec() { + None => { + let mut e_lessees = BTreeMap::new(); + if let Some(n) = Natural::of(*e_session_count) { + e_lessees.insert(session.clone(), n); + } + let mut r_lessees = BTreeMap::new(); + if let Some(n) = Natural::of(*r_session_count) { + r_lessees.insert(session.clone(), n); + } + *self = LockState::Shared { + e_lessees, + r_lessees, + }; + self.normalize(); + } + Some(n) => *w_session_count = n, + } + true + } + } + } +} +impl Default for LockState { + fn default() -> Self { + LockState::Free + } +} + +#[derive(Debug, Default)] + +pub(super) struct LockTrie { + state: LockState, + children: BTreeMap, +} +impl LockTrie { + #[allow(dead_code)] + fn all bool>(&self, f: F) -> bool { + f(&self.state) && self.children.values().all(|t| t.all(&f)) + } + #[allow(dead_code)] + fn any bool>(&self, f: F) -> bool { + f(&self.state) || self.children.values().any(|t| t.any(&f)) + } + #[allow(dead_code)] + fn subtree_is_lock_free_for(&self, session: &HandleId) -> bool { + self.all(|s| s.sessions().difference(ordset![session]).is_empty()) + } + #[allow(dead_code)] + fn subtree_is_exclusive_free_for(&self, session: &HandleId) -> bool { + self.all(|s| match s.clone().erase(session) { + LockState::Exclusive { .. } => false, + _ => true, + }) + } + fn subtree_write_sessions<'a>(&'a self) -> OrdSet<&'a HandleId> { + match &self.state { + LockState::Exclusive { w_lessee, .. } => ordset![w_lessee], + _ => self + .children + .values() + .map(|t| t.subtree_write_sessions()) + .fold(OrdSet::new(), OrdSet::union), + } + } + fn subtree_sessions<'a>(&'a self) -> OrdSet<&'a HandleId> { + let children = self + .children + .values() + .map(LockTrie::subtree_sessions) + .fold(OrdSet::new(), OrdSet::union); + self.state.sessions().union(children) + } + pub fn subtree_lock_info<'a>(&'a self) -> OrdSet { + let mut acc = self + .children + .iter() + .map(|(s, t)| { + t.subtree_lock_info() + .into_iter() + .map(|mut i| LockInfo { + ty: i.ty, + handle_id: i.handle_id, + ptr: { + i.ptr.push_start(s); + i.ptr + }, + }) + .collect() + }) + .fold(ordset![], OrdSet::union); + let self_writes = self.state.write_session().map(|session| LockInfo { + handle_id: session.clone(), + ptr: JsonPointer::default(), + ty: LockType::Write, + }); + let self_reads = self + .state + .read_sessions() + .into_iter() + .map(|session| LockInfo { + handle_id: session.clone(), + ptr: JsonPointer::default(), + ty: LockType::Read, + }); + let self_exists = self + .state + .exist_sessions() + .into_iter() + .map(|session| LockInfo { + handle_id: session.clone(), + ptr: JsonPointer::default(), + ty: LockType::Exist, + }); + acc.extend(self_writes.into_iter().chain(self_reads).chain(self_exists)); + acc + } + fn ancestors_and_trie<'a, S: AsRef, V: SegList>( + &'a self, + ptr: &JsonPointer, + ) -> (Vec<&'a LockState>, Option<&'a LockTrie>) { + match ptr.uncons() { + None => (Vec::new(), Some(self)), + Some((first, rest)) => match self.children.get(first) { + None => (vec![&self.state], None), + Some(t) => { + let (mut v, t) = t.ancestors_and_trie(&rest); + v.push(&self.state); + (v, t) + } + }, + } + } + // no writes in ancestor set, no writes at node + #[allow(dead_code)] + fn can_acquire_exist(&self, ptr: &JsonPointer, session: &HandleId) -> bool { + let (v, t) = self.ancestors_and_trie(ptr); + let ancestor_write_free = v + .into_iter() + .cloned() + .map(|s| s.erase(session)) + .all(|s| s.write_free()); + ancestor_write_free && t.map_or(true, |t| t.state.clone().erase(session).write_free()) + } + // no writes in ancestor set, no writes in subtree + #[allow(dead_code)] + fn can_acquire_read(&self, ptr: &JsonPointer, session: &HandleId) -> bool { + let (v, t) = self.ancestors_and_trie(ptr); + let ancestor_write_free = v + .into_iter() + .cloned() + .map(|s| s.erase(session)) + .all(|s| s.write_free()); + ancestor_write_free && t.map_or(true, |t| t.subtree_is_exclusive_free_for(session)) + } + // no reads or writes in ancestor set, no locks in subtree + #[allow(dead_code)] + fn can_acquire_write(&self, ptr: &JsonPointer, session: &HandleId) -> bool { + let (v, t) = self.ancestors_and_trie(ptr); + let ancestor_rw_free = v + .into_iter() + .cloned() + .map(|s| s.erase(session)) + .all(|s| s.write_free() && s.read_free()); + ancestor_rw_free && t.map_or(true, |t| t.subtree_is_lock_free_for(session)) + } + // ancestors with writes and writes on the node + fn session_blocking_exist<'a>( + &'a self, + ptr: &JsonPointer, + session: &HandleId, + ) -> Option<&'a HandleId> { + let (v, t) = self.ancestors_and_trie(ptr); + // there can only be one write session per traversal + let ancestor_write = v.into_iter().find_map(|s| s.write_session()); + let node_write = t.and_then(|t| t.state.write_session()); + ancestor_write + .or(node_write) + .and_then(|s| if s == session { None } else { Some(s) }) + } + // ancestors with writes, subtrees with writes + fn sessions_blocking_read<'a>( + &'a self, + ptr: &JsonPointer, + session: &HandleId, + ) -> OrdSet<&'a HandleId> { + let (v, t) = self.ancestors_and_trie(ptr); + let ancestor_writes = v + .into_iter() + .map(|s| s.write_session().into_iter().collect::>()) + .fold(OrdSet::new(), OrdSet::union); + let relevant_write_sessions = match t { + None => ancestor_writes, + Some(t) => ancestor_writes.union(t.subtree_write_sessions()), + }; + relevant_write_sessions.without(session) + } + // ancestors with reads or writes, subtrees with anything + fn sessions_blocking_write<'a>( + &'a self, + ptr: &JsonPointer, + session: &HandleId, + ) -> OrdSet<&'a HandleId> { + let (v, t) = self.ancestors_and_trie(ptr); + let ancestors = v + .into_iter() + .map(|s| { + s.read_sessions() + .union(s.write_session().into_iter().collect()) + }) + .fold(OrdSet::new(), OrdSet::union); + let subtree = t.map_or(OrdSet::new(), |t| t.subtree_sessions()); + ancestors.union(subtree).without(session) + } + + fn child_mut, V: SegList>(&mut self, ptr: &JsonPointer) -> &mut Self { + match ptr.uncons() { + None => self, + Some((first, rest)) => { + if !self.children.contains_key(first) { + self.children.insert(first.to_owned(), LockTrie::default()); + } + self.children.get_mut(first).unwrap().child_mut(&rest) + } + } + } + + fn sessions_blocking_lock<'a>(&'a self, lock_info: &LockInfo) -> OrdSet<&'a HandleId> { + match &lock_info.ty { + LockType::Exist => self + .session_blocking_exist(&lock_info.ptr, &lock_info.handle_id) + .into_iter() + .collect(), + LockType::Read => self.sessions_blocking_read(&lock_info.ptr, &lock_info.handle_id), + LockType::Write => self.sessions_blocking_write(&lock_info.ptr, &lock_info.handle_id), + } + } + + pub fn try_lock<'a>(&'a mut self, lock_info: &LockInfo) -> Result<(), OrdSet> { + let blocking_sessions = self.sessions_blocking_lock(lock_info); + if !blocking_sessions.is_empty() { + Err(blocking_sessions.into_iter().cloned().collect()) + } else { + drop(blocking_sessions); + let success = self + .child_mut(&lock_info.ptr) + .state + .try_lock(lock_info.handle_id.clone(), &lock_info.ty); + assert!(success); + Ok(()) + } + } + + pub fn unlock(&mut self, lock_info: &LockInfo) { + let t = self.child_mut(&lock_info.ptr); + let success = t.state.try_unlock(&lock_info.handle_id, &lock_info.ty); + assert!(success); + self.prune(); + } + + fn prunable(&self) -> bool { + self.children.is_empty() && self.state == LockState::Free + } + + fn prune(&mut self) { + self.children.retain(|_, t| { + t.prune(); + !t.prunable() + }) + } +} + +#[cfg(test)] +mod proptest { + use super::*; + use ::proptest::prelude::*; + + fn lock_type_gen() -> BoxedStrategy { + prop_oneof![ + Just(crate::LockType::Exist), + Just(crate::LockType::Read), + Just(crate::LockType::Write), + ] + .boxed() + } + + proptest! { + #[test] + fn unlock_after_lock_is_identity(session in 0..10u64, typ in lock_type_gen()) { + let mut orig = LockState::Free; + orig.try_lock(HandleId{ + id: session, + #[cfg(feature = "tracing")] + trace: None + }, &typ); + orig.try_unlock(&HandleId{ + id: session, + #[cfg(feature="tracing")] + trace:None + }, &typ); + prop_assert_eq!(orig, LockState::Free); + } + } +}