From 449fbf66d4454a02026dd04b6d3614a50662169a Mon Sep 17 00:00:00 2001 From: Keagan McClelland Date: Wed, 8 Dec 2021 18:20:49 -0700 Subject: [PATCH] first attempt at fixing locking semantics to prevent livelocks --- json-ptr | 2 +- patch-db/Cargo.toml | 1 + patch-db/src/handle.rs | 10 + patch-db/src/locker.rs | 853 ++++++++++++++++++++++++++++++----------- patch-db/src/model.rs | 9 +- 5 files changed, 645 insertions(+), 230 deletions(-) diff --git a/json-ptr b/json-ptr index 33a2af1..a8ac596 160000 --- a/json-ptr +++ b/json-ptr @@ -1 +1 @@ -Subproject commit 33a2af147e5266fa97e373e5e412583be1781991 +Subproject commit a8ac596f063ce1794c68d14aecbc8f801fe95c7b diff --git a/patch-db/Cargo.toml b/patch-db/Cargo.toml index c32522b..abe2de4 100644 --- a/patch-db/Cargo.toml +++ b/patch-db/Cargo.toml @@ -19,6 +19,7 @@ trace = ["debug", "tracing-error"] async-trait = "0.1.42" fd-lock-rs = "0.1.3" futures = "0.3.8" +im = "*" json-patch = { path = "../json-patch" } json-ptr = { path = "../json-ptr" } lazy_static = "1.4.0" diff --git a/patch-db/src/handle.rs b/patch-db/src/handle.rs index 6ed461d..7630b01 100644 --- a/patch-db/src/handle.rs +++ b/patch-db/src/handle.rs @@ -24,6 +24,16 @@ impl PartialEq for HandleId { } } impl Eq for HandleId {} +impl PartialOrd for HandleId { + fn partial_cmp(&self, other: &Self) -> Option { + self.id.partial_cmp(&other.id) + } +} +impl Ord for HandleId { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.id.cmp(&other.id) + } +} #[async_trait] pub trait DbHandle: Send + Sync { diff --git a/patch-db/src/locker.rs b/patch-db/src/locker.rs index 4c0e983..394de2d 100644 --- a/patch-db/src/locker.rs +++ b/patch-db/src/locker.rs @@ -1,6 +1,7 @@ -use std::collections::BTreeMap; +use std::collections::{BTreeMap, VecDeque}; -use json_ptr::JsonPointer; +use im::{ordset, OrdSet}; +use json_ptr::{JsonPointer, SegList}; use tokio::sync::{mpsc, oneshot}; use crate::handle::HandleId; @@ -23,21 +24,109 @@ impl Locker { let mut locks_on_lease = vec![dummy_recv]; let (_dummy_send, dummy_recv) = oneshot::channel(); let mut cancellations = vec![dummy_recv]; + + let mut request_queue = VecDeque::<(Request, OrdSet)>::new(); while let Some(action) = get_action(&mut new_requests, &mut locks_on_lease, &mut cancellations).await { + // to prevent starvation we privilege the front of the queue and only allow requests that + // conflict with the request at the front to go through if they are requested by sessions that + // are *currently blocking* the front of the queue + fn process_new_req( + hot_seat: Option<&(Request, OrdSet)>, + req: Request, + trie: &mut Trie, + locks_on_lease: &mut Vec>, + request_queue: &mut VecDeque<(Request, OrdSet)>, + ) { + match hot_seat { + // hot seat conflicts and request session isn't in current blocking sessions + // so we push it to the queue + Some((hot_req, hot_blockers)) + if hot_req.lock_info.conflicts_with(&req.lock_info) + && !hot_blockers.contains(&req.lock_info.handle_id) => + { + request_queue.push_back((req, OrdSet::new())) + } + // otherwise we try and service it immediately, only pushing to the queue if it fails + _ => match trie.try_lock(&req.lock_info) { + Ok(()) => { + let lease = req.complete(); + locks_on_lease.push(lease); + } + Err(blocking_sessions) => { + request_queue.push_back((req, blocking_sessions)) + } + }, + } + } #[cfg(feature = "tracing")] tracing::trace!("Locker Action: {:#?}", action); match action { Action::HandleRequest(mut req) => { cancellations.extend(req.cancel.take()); - trie.handle_request(req, &mut locks_on_lease) + let hot_seat = request_queue.pop_front(); + process_new_req( + hot_seat.as_ref(), + req, + &mut trie, + &mut locks_on_lease, + &mut request_queue, + ); + hot_seat.map_or((), |a| request_queue.push_front(a)); } Action::HandleRelease(lock_info) => { - trie.handle_release(lock_info, &mut locks_on_lease) + // release actual lock + if trie.try_unlock(&lock_info) { + // try to pop off as many requests off the front of the queue as we can + let mut hot_seat = None; + while let Some((r, _)) = request_queue.pop_front() { + match trie.try_lock(&r.lock_info) { + Ok(()) => { + let lease = r.complete(); + locks_on_lease.push(lease); + } + Err(new_blocking_sessions) => { + // set the hot seat and proceed to step two + hot_seat = Some((r, new_blocking_sessions)); + break; + } + } + } + // when we can no longer do so, try and service the rest of the queue with the new hot seat + let old_request_queue = request_queue; + request_queue = VecDeque::new(); + for (r, _) in old_request_queue { + // we now want to process each request in the queue as if it was new + process_new_req( + hot_seat.as_ref(), + r, + &mut trie, + &mut locks_on_lease, + &mut request_queue, + ) + } + hot_seat.map_or((), |a| request_queue.push_front(a)); + } } Action::HandleCancel(lock_info) => { - trie.handle_cancel(lock_info, &mut locks_on_lease) + // trie.handle_cancel(lock_info, &mut locks_on_lease) + let entry = request_queue + .iter() + .enumerate() + .find(|(_, (r, _))| r.lock_info == lock_info); + match entry { + None => { + #[cfg(feature = "tracing")] + tracing::warn!( + "Received cancellation for a lock not currently waiting: {}", + lock_info.ptr + ); + } + Some((i, _)) => { + request_queue.remove(i); + } + } } } #[cfg(feature = "tracing")] @@ -66,7 +155,6 @@ impl Locker { handle_id, ptr, ty: lock_type, - segments_handled: 0, }; let (send, recv) = oneshot::channel(); let (cancel_send, cancel_recv) = oneshot::channel(); @@ -132,255 +220,581 @@ async fn get_action( #[derive(Debug, Default)] struct Trie { - node: Node, + state: LockState, children: BTreeMap, } impl Trie { - fn child_mut(&mut self, name: &str) -> &mut Self { - if !self.children.contains_key(name) { - self.children.insert(name.to_owned(), Trie::default()); - } - self.children.get_mut(name).unwrap() + fn all bool>(&self, f: F) -> bool { + f(&self.state) && self.children.values().all(|t| t.all(&f)) } - fn handle_request( - &mut self, - req: Request, - locks_on_lease: &mut Vec>, - ) { - if let Some(req) = self.node.handle_request(req, locks_on_lease) { - self.child_mut(req.lock_info.current_seg()) - .handle_request(req, locks_on_lease) + fn any bool>(&self, f: F) -> bool { + f(&self.state) || self.children.values().any(|t| t.any(&f)) + } + fn subtree_is_lock_free_for(&self, session: &HandleId) -> bool { + self.all(|s| s.sessions().difference(ordset![session]).is_empty()) + } + fn subtree_is_exclusive_free_for(&self, session: &HandleId) -> bool { + self.all(|s| match s.clone().erase(session) { + LockState::Exclusive { .. } => false, + _ => true, + }) + } + fn subtree_write_sessions<'a>(&'a self) -> OrdSet<&'a HandleId> { + match &self.state { + LockState::Exclusive { w_lessee, .. } => ordset![w_lessee], + _ => self + .children + .values() + .map(|t| t.subtree_write_sessions()) + .fold(OrdSet::new(), OrdSet::union), } } - fn handle_release( - &mut self, - lock_info: LockInfo, - locks_on_lease: &mut Vec>, - ) { - let release = self.node.release(lock_info); - for req in std::mem::take(&mut self.node.reqs) { - self.handle_request(req, locks_on_lease); - } - if let Some(release) = release { - self.child_mut(release.current_seg()) - .handle_release(release, locks_on_lease) + fn subtree_sessions<'a>(&'a self) -> OrdSet<&'a HandleId> { + let children = self + .children + .values() + .map(Trie::subtree_sessions) + .fold(OrdSet::new(), OrdSet::union); + self.state.sessions().union(children) + } + fn ancestors_and_trie<'a, S: AsRef, V: SegList>( + &'a self, + ptr: &JsonPointer, + ) -> (Vec<&'a LockState>, Option<&'a Trie>) { + match ptr.uncons() { + None => (Vec::new(), Some(self)), + Some((first, rest)) => match self.children.get(first) { + None => (vec![&self.state], None), + Some(t) => { + let (mut v, t) = t.ancestors_and_trie(&rest); + v.push(&self.state); + (v, t) + } + }, } } - fn handle_cancel( - &mut self, - lock_info: LockInfo, - locks_on_lease: &mut Vec>, - ) { - let cancel = self.node.cancel(lock_info); - for req in std::mem::take(&mut self.node.reqs) { - self.handle_request(req, locks_on_lease); + // no writes in ancestor set, no writes at node + fn can_acquire_exist(&self, ptr: &JsonPointer, session: &HandleId) -> bool { + let (v, t) = self.ancestors_and_trie(ptr); + let ancestor_write_free = v + .into_iter() + .cloned() + .map(|s| s.erase(session)) + .all(|s| s.write_free()); + ancestor_write_free && t.map_or(true, |t| t.state.clone().erase(session).write_free()) + } + // no writes in ancestor set, no writes in subtree + fn can_acquire_read(&self, ptr: &JsonPointer, session: &HandleId) -> bool { + let (v, t) = self.ancestors_and_trie(ptr); + let ancestor_write_free = v + .into_iter() + .cloned() + .map(|s| s.erase(session)) + .all(|s| s.write_free()); + ancestor_write_free && t.map_or(true, |t| t.subtree_is_exclusive_free_for(session)) + } + // no reads or writes in ancestor set, no locks in subtree + fn can_acquire_write(&self, ptr: &JsonPointer, session: &HandleId) -> bool { + let (v, t) = self.ancestors_and_trie(ptr); + let ancestor_rw_free = v + .into_iter() + .cloned() + .map(|s| s.erase(session)) + .all(|s| s.write_free() && s.read_free()); + ancestor_rw_free && t.map_or(true, |t| t.subtree_is_lock_free_for(session)) + } + // ancestors with writes and writes on the node + fn session_blocking_exist<'a>( + &'a self, + ptr: &JsonPointer, + session: &HandleId, + ) -> Option<&'a HandleId> { + let (v, _t) = self.ancestors_and_trie(ptr); + v.into_iter().find_map(|s| s.write_session()).and_then(|h| { + if h == session { + None + } else { + Some(h) + } + }) + } + // ancestors with writes, subtrees with writes + fn sessions_blocking_read<'a>( + &'a self, + ptr: &JsonPointer, + session: &HandleId, + ) -> OrdSet<&'a HandleId> { + let (v, t) = self.ancestors_and_trie(ptr); + let ancestor_writes = v + .into_iter() + .map(|s| s.write_session().into_iter().collect::>()) + .fold(OrdSet::new(), OrdSet::union); + let relevant_write_sessions = match t { + None => ancestor_writes, + Some(t) => ancestor_writes.union(t.subtree_write_sessions()), + }; + relevant_write_sessions.without(session) + } + // ancestors with reads or writes, subtrees with anything + fn sessions_blocking_write<'a>( + &'a self, + ptr: &JsonPointer, + session: &HandleId, + ) -> OrdSet<&'a HandleId> { + let (v, t) = self.ancestors_and_trie(ptr); + let ancestors = v + .into_iter() + .map(|s| { + s.read_sessions() + .union(s.write_session().into_iter().collect()) + }) + .fold(OrdSet::new(), OrdSet::union); + let subtree = t.map_or(OrdSet::new(), |t| t.subtree_sessions()); + ancestors.union(subtree).without(session) + } + + fn child_mut, V: SegList>(&mut self, ptr: &JsonPointer) -> &mut Self { + match ptr.uncons() { + None => self, + Some((first, rest)) => { + if !self.children.contains_key(first) { + self.children.insert(first.to_owned(), Trie::default()); + } + self.children.get_mut(first).unwrap().child_mut(&rest) + } } - if let Some(cancel) = cancel { - self.child_mut(cancel.current_seg()) - .handle_cancel(cancel, locks_on_lease) + } + + fn sessions_blocking_lock<'a>(&'a self, lock_info: &LockInfo) -> OrdSet<&'a HandleId> { + match &lock_info.ty { + LockType::Exist => self + .session_blocking_exist(&lock_info.ptr, &lock_info.handle_id) + .into_iter() + .collect(), + LockType::Read => self.sessions_blocking_read(&lock_info.ptr, &lock_info.handle_id), + LockType::Write => self.sessions_blocking_write(&lock_info.ptr, &lock_info.handle_id), } } + + fn try_lock<'a>(&'a mut self, lock_info: &LockInfo) -> Result<(), OrdSet> { + let blocking_sessions = self.sessions_blocking_lock(lock_info); + if !blocking_sessions.is_empty() { + Err(blocking_sessions.into_iter().cloned().collect()) + } else { + drop(blocking_sessions); + self.child_mut(&lock_info.ptr) + .state + .lock(lock_info.handle_id.clone(), &lock_info.ty); + Ok(()) + } + } + + fn try_unlock(&mut self, lock_info: &LockInfo) -> bool { + let t = self.child_mut(&lock_info.ptr); + let success = t.state.unlock(&lock_info.handle_id, &lock_info.ty); + if success { + self.prune(); + } + success + } + + fn prunable(&self) -> bool { + self.children.is_empty() && self.state == LockState::Free + } + + fn prune(&mut self) { + self.children.retain(|_, t| { + t.prune(); + !t.prunable() + }) + } } -#[derive(Debug, Default)] -struct Node { - reader_parents: Vec, - readers: Vec, - writer_parents: Vec, - writers: Vec, - reqs: Vec, +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +struct Natural(usize); +impl Natural { + fn one() -> Self { + Natural(1) + } + fn of(n: usize) -> Option { + if n == 0 { + None + } else { + Some(Natural(n)) + } + } + fn inc(&mut self) { + self.0 += 1; + } + fn dec(mut self) -> Option { + if self.0 == 0 { + None + } else { + self.0 -= 1; + Some(self) + } + } } -impl Node { - // true: If there are any writer_parents, they are `id`. - fn write_parent_free(&self, id: &HandleId) -> bool { - self.writer_parents.is_empty() || (self.writer_parents.iter().find(|a| a != &id).is_none()) - } - // true: If there are any writers, they are `id`. - fn write_free(&self, id: &HandleId) -> bool { - self.writers.is_empty() || (self.writers.iter().find(|a| a != &id).is_none()) - } - // true: If there are any reader_parents, they are `id`. - fn read_parent_free(&self, id: &HandleId) -> bool { - self.reader_parents.is_empty() || (self.reader_parents.iter().find(|a| a != &id).is_none()) - } - // true: If there are any readers, they are `id`. - fn read_free(&self, id: &HandleId) -> bool { - self.readers.is_empty() || (self.readers.iter().find(|a| a != &id).is_none()) - } - // allow a lock to skip the queue if a lock is already held by the same handle - fn can_jump_queue(&self, id: &HandleId) -> bool { - self.writers.contains(&id) - || self.writer_parents.contains(&id) - || self.readers.contains(&id) - || self.reader_parents.contains(&id) - } - // `id` is capable of acquiring this node for the purpose of writing to a child - fn write_parent_available(&self, id: &HandleId) -> bool { - self.write_free(id) - && self.read_free(id) - && (self.reqs.is_empty() || self.can_jump_queue(id)) - } - // `id` is capable of acquiring this node for writing - fn write_available(&self, id: &HandleId) -> bool { - self.write_free(id) - && self.write_parent_free(id) - && self.read_free(id) - && self.read_parent_free(id) - && (self.reqs.is_empty() || self.can_jump_queue(id)) - } - fn read_parent_available(&self, id: &HandleId) -> bool { - self.write_free(id) && (self.reqs.is_empty() || self.can_jump_queue(id)) - } - // `id` is capable of acquiring this node for reading - fn read_available(&self, id: &HandleId) -> bool { - self.write_free(id) - && self.write_parent_free(id) - && (self.reqs.is_empty() || self.can_jump_queue(id)) - } - fn handle_request( - &mut self, - req: Request, - locks_on_lease: &mut Vec>, - ) -> Option { - if req.completion.is_closed() { - return None; - } - match ( - req.lock_info.ty, - req.lock_info.segments_handled == req.lock_info.ptr.len(), - ) { - (LockType::Write, true) if self.write_available(&req.lock_info.handle_id) => { - self.writers.push(req.lock_info.handle_id.clone()); - req.process(locks_on_lease) - } - (LockType::DeepRead, true) if self.read_available(&req.lock_info.handle_id) => { - self.readers.push(req.lock_info.handle_id.clone()); - req.process(locks_on_lease) - } - (LockType::Write, false) if self.write_parent_available(&req.lock_info.handle_id) => { - self.writer_parents.push(req.lock_info.handle_id.clone()); - req.process(locks_on_lease) - } - (LockType::DeepRead, false) | (LockType::ShallowRead, _) - if self.read_parent_available(&req.lock_info.handle_id) => - { - self.reader_parents.push(req.lock_info.handle_id.clone()); - req.process(locks_on_lease) - } - _ => { - self.reqs.push(req); - None - } - } - } - fn release(&mut self, mut lock_info: LockInfo) -> Option { - match ( - lock_info.ty, - lock_info.segments_handled == lock_info.ptr.len(), - ) { - (LockType::Write, true) => { - if let Some(idx) = self - .writers - .iter() - .enumerate() - .find(|(_, id)| id == &&lock_info.handle_id) - .map(|(idx, _)| idx) - { - self.writers.swap_remove(idx); +#[derive(Debug, Clone, PartialEq, Eq)] +enum LockState { + Free, + Shared { + e_lessees: BTreeMap, + r_lessees: BTreeMap, + }, + Exclusive { + w_lessee: HandleId, + w_session_count: Natural, // should never be 0 + r_session_count: usize, + e_session_count: usize, + }, +} +impl LockState { + fn erase(self, session: &HandleId) -> LockState { + match self { + LockState::Free => LockState::Free, + LockState::Shared { + mut e_lessees, + mut r_lessees, + } => { + e_lessees.remove(session); + r_lessees.remove(session); + if e_lessees.is_empty() && r_lessees.is_empty() { + LockState::Free + } else { + LockState::Shared { + e_lessees, + r_lessees, + } } } - (LockType::DeepRead, true) => { - if let Some(idx) = self - .readers - .iter() - .enumerate() - .find(|(_, id)| id == &&lock_info.handle_id) - .map(|(idx, _)| idx) - { - self.readers.swap_remove(idx); - } - } - (LockType::Write, false) => { - if let Some(idx) = self - .writer_parents - .iter() - .enumerate() - .find(|(_, id)| id == &&lock_info.handle_id) - .map(|(idx, _)| idx) - { - self.writer_parents.swap_remove(idx); - } - } - (LockType::DeepRead, false) | (LockType::ShallowRead, _) => { - if let Some(idx) = self - .reader_parents - .iter() - .enumerate() - .find(|(_, id)| id == &&lock_info.handle_id) - .map(|(idx, _)| idx) - { - self.reader_parents.swap_remove(idx); + LockState::Exclusive { ref w_lessee, .. } => { + if w_lessee == session { + LockState::Free + } else { + self } } } - if lock_info.ptr.len() == lock_info.segments_handled { - None - } else { - lock_info.segments_handled += 1; - Some(lock_info) + } + fn write_free(&self) -> bool { + match self { + LockState::Exclusive { .. } => false, + _ => true, } } - fn cancel(&mut self, mut lock_info: LockInfo) -> Option { - let mut idx = 0; - while idx < self.reqs.len() { - if self.reqs[idx].completion.is_closed() && self.reqs[idx].lock_info == lock_info { - self.reqs.swap_remove(idx); - return None; - } else { - idx += 1; + fn read_free(&self) -> bool { + match self { + LockState::Exclusive { + r_session_count, .. + } => *r_session_count == 0, + LockState::Shared { r_lessees, .. } => r_lessees.is_empty(), + _ => true, + } + } + fn sessions<'a>(&'a self) -> OrdSet<&'a HandleId> { + match self { + LockState::Free => OrdSet::new(), + LockState::Shared { + e_lessees, + r_lessees, + } => e_lessees.keys().chain(r_lessees.keys()).collect(), + LockState::Exclusive { w_lessee, .. } => ordset![w_lessee], + } + } + fn exist_sessions<'a>(&'a self) -> OrdSet<&'a HandleId> { + match self { + LockState::Free => OrdSet::new(), + LockState::Shared { e_lessees, .. } => e_lessees.keys().collect(), + LockState::Exclusive { + w_lessee, + e_session_count, + .. + } => { + if *e_session_count > 0 { + ordset![w_lessee] + } else { + OrdSet::new() + } } } - if lock_info.ptr.len() == lock_info.segments_handled { - None - } else { - lock_info.segments_handled += 1; - Some(lock_info) + } + fn read_sessions<'a>(&'a self) -> OrdSet<&'a HandleId> { + match self { + LockState::Free => OrdSet::new(), + LockState::Shared { r_lessees, .. } => r_lessees.keys().collect(), + LockState::Exclusive { + w_lessee, + r_session_count, + .. + } => { + if *r_session_count > 0 { + ordset![w_lessee] + } else { + OrdSet::new() + } + } } } + fn write_session<'a>(&'a self) -> Option<&'a HandleId> { + match self { + LockState::Exclusive { w_lessee, .. } => Some(w_lessee), + _ => None, + } + } + // note this is not necessarily safe in the overall trie locking model + // this function will return true if the state changed as a result of the call + // if it returns false it technically means that the call was invalid and did not + // change the lock state at all + fn lock(&mut self, session: HandleId, typ: &LockType) -> bool { + match (&mut *self, typ) { + (LockState::Free, LockType::Exist) => { + *self = LockState::Shared { + e_lessees: [(session, Natural::one())].into(), + r_lessees: BTreeMap::new(), + }; + true + } + (LockState::Free, LockType::Read) => { + *self = LockState::Shared { + e_lessees: BTreeMap::new(), + r_lessees: [(session, Natural::one())].into(), + }; + true + } + (LockState::Free, LockType::Write) => { + *self = LockState::Exclusive { + w_lessee: session, + w_session_count: Natural::one(), + r_session_count: 0, + e_session_count: 0, + }; + true + } + (LockState::Shared { e_lessees, .. }, LockType::Exist) => { + match e_lessees.get_mut(&session) { + None => { + e_lessees.insert(session, Natural::one()); + } + Some(v) => v.inc(), + }; + true + } + (LockState::Shared { r_lessees, .. }, LockType::Read) => { + match r_lessees.get_mut(&session) { + None => { + r_lessees.insert(session, Natural::one()); + } + Some(v) => v.inc(), + } + true + } + ( + LockState::Shared { + e_lessees, + r_lessees, + }, + LockType::Write, + ) => { + for hdl in e_lessees.keys() { + if hdl != &session { + return false; + } + } + for hdl in r_lessees.keys() { + if hdl != &session { + return false; + } + } + *self = LockState::Exclusive { + r_session_count: r_lessees.remove(&session).map_or(0, |x| x.0), + e_session_count: e_lessees.remove(&session).map_or(0, |x| x.0), + w_lessee: session, + w_session_count: Natural::one(), + }; + true + } + ( + LockState::Exclusive { + w_lessee, + e_session_count, + .. + }, + LockType::Exist, + ) => { + if w_lessee != &session { + return false; + } + *e_session_count += 1; + true + } + ( + LockState::Exclusive { + w_lessee, + r_session_count, + .. + }, + LockType::Read, + ) => { + if w_lessee != &session { + return false; + } + *r_session_count += 1; + true + } + ( + LockState::Exclusive { + w_lessee, + w_session_count, + .. + }, + LockType::Write, + ) => { + if w_lessee != &session { + return false; + } + w_session_count.inc(); + true + } + } + } + + // there are many ways for this function to be called in an invalid way: Notably releasing locks that you never + // had to begin with. + fn unlock(&mut self, session: &HandleId, typ: &LockType) -> bool { + match (&mut *self, typ) { + (LockState::Free, _) => false, + (LockState::Shared { e_lessees, .. }, LockType::Exist) => { + match e_lessees.remove_entry(session) { + None => false, + Some((k, v)) => match v.dec() { + None => true, + Some(n) => { + e_lessees.insert(k, n); + true + } + }, + } + } + (LockState::Shared { r_lessees, .. }, LockType::Read) => { + match r_lessees.remove_entry(session) { + None => false, + Some((k, v)) => match v.dec() { + None => true, + Some(n) => { + r_lessees.insert(k, n); + true + } + }, + } + } + (LockState::Shared { .. }, LockType::Write) => false, + ( + LockState::Exclusive { + w_lessee, + e_session_count, + .. + }, + LockType::Exist, + ) => { + if w_lessee != session || *e_session_count == 0 { + return false; + } + *e_session_count -= 1; + true + } + ( + LockState::Exclusive { + w_lessee, + r_session_count, + .. + }, + LockType::Read, + ) => { + if w_lessee != session || *r_session_count == 0 { + return false; + } + *r_session_count -= 1; + true + } + ( + LockState::Exclusive { + w_lessee, + w_session_count, + r_session_count, + e_session_count, + }, + LockType::Write, + ) => { + if w_lessee != session { + return false; + } + match w_session_count.dec() { + None => { + let mut e_lessees = BTreeMap::new(); + if let Some(n) = Natural::of(*e_session_count) { + e_lessees.insert(session.clone(), n); + } + let mut r_lessees = BTreeMap::new(); + if let Some(n) = Natural::of(*r_session_count) { + r_lessees.insert(session.clone(), n); + } + *self = LockState::Shared { + e_lessees, + r_lessees, + }; + } + Some(n) => *w_session_count = n, + } + true + } + } + } +} +impl Default for LockState { + fn default() -> Self { + LockState::Free + } } #[derive(Debug, Clone, Default, PartialEq)] struct LockInfo { ptr: JsonPointer, - segments_handled: usize, ty: LockType, handle_id: HandleId, } impl LockInfo { - fn current_seg(&self) -> &str { - if self.segments_handled == 0 { - "" // root - } else { - self.ptr - .get_segment(self.segments_handled - 1) - .unwrap_or_default() - } - } - fn reset(mut self) -> Self { - self.segments_handled = 0; - self + fn conflicts_with(&self, other: &LockInfo) -> bool { + self.handle_id != other.handle_id + && match (self.ty, other.ty) { + (LockType::Exist, LockType::Exist) => false, + (LockType::Exist, LockType::Read) => false, + (LockType::Exist, LockType::Write) => self.ptr.starts_with(&other.ptr), + (LockType::Read, LockType::Exist) => false, + (LockType::Read, LockType::Read) => false, + (LockType::Read, LockType::Write) => { + self.ptr.starts_with(&other.ptr) || other.ptr.starts_with(&self.ptr) + } + (LockType::Write, LockType::Exist) => other.ptr.starts_with(&self.ptr), + (LockType::Write, LockType::Read) => { + self.ptr.starts_with(&other.ptr) || other.ptr.starts_with(&self.ptr) + } + (LockType::Write, LockType::Write) => { + self.ptr.starts_with(&other.ptr) || other.ptr.starts_with(&self.ptr) + } + } } } #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub enum LockType { - ShallowRead, - DeepRead, + Exist, + Read, Write, } impl Default for LockType { fn default() -> Self { - LockType::ShallowRead + LockType::Exist } } @@ -391,25 +805,16 @@ struct Request { completion: oneshot::Sender, } impl Request { - fn process(mut self, locks_on_lease: &mut Vec>) -> Option { - if self.lock_info.ptr.len() == self.lock_info.segments_handled { - self.complete(locks_on_lease); - None - } else { - self.lock_info.segments_handled += 1; - Some(self) - } - } - fn complete(self, locks_on_lease: &mut Vec>) { + fn complete(self) -> oneshot::Receiver { let (sender, receiver) = oneshot::channel(); - locks_on_lease.push(receiver); if let Err(_) = self.completion.send(Guard { - lock_info: self.lock_info.reset(), + lock_info: self.lock_info, sender: Some(sender), }) { #[cfg(feature = "tracing")] tracing::warn!("Completion sent to closed channel.") } + receiver } } diff --git a/patch-db/src/model.rs b/patch-db/src/model.rs index 1791005..eb424a8 100644 --- a/patch-db/src/model.rs +++ b/patch-db/src/model.rs @@ -71,7 +71,7 @@ where pub async fn get(&self, db: &mut Db, lock: bool) -> Result, Error> { if lock { - self.lock(db, LockType::DeepRead).await; + self.lock(db, LockType::Read).await; } Ok(ModelData(db.get(&self.ptr).await?)) } @@ -238,7 +238,7 @@ impl Deserialize<'de>> OptionModel { lock: bool, ) -> Result>, Error> { if lock { - self.lock(db, LockType::DeepRead).await; + self.lock(db, LockType::Read).await; } Ok(ModelData(db.get(self.0.as_ref()).await?)) } @@ -259,8 +259,7 @@ impl Deserialize<'de>> OptionModel { pub async fn exists(&self, db: &mut Db, lock: bool) -> Result { if lock { - db.lock(self.0.as_ref().clone(), LockType::ShallowRead) - .await; + db.lock(self.0.as_ref().clone(), LockType::Exist).await; } Ok(db.exists(&self.as_ref(), None).await?) } @@ -491,7 +490,7 @@ where lock: bool, ) -> Result, Error> { if lock { - db.lock(self.as_ref().clone(), LockType::ShallowRead).await; + db.lock(self.as_ref().clone(), LockType::Exist).await; } let set = db.keys(self.as_ref(), None).await?; Ok(set