mirror of
https://github.com/Start9Labs/patch-db.git
synced 2026-03-26 02:11:54 +00:00
first attempt at fixing locking semantics to prevent livelocks
This commit is contained in:
committed by
Aiden McClelland
parent
678d7b3322
commit
449fbf66d4
2
json-ptr
2
json-ptr
Submodule json-ptr updated: 33a2af147e...a8ac596f06
@@ -19,6 +19,7 @@ trace = ["debug", "tracing-error"]
|
||||
async-trait = "0.1.42"
|
||||
fd-lock-rs = "0.1.3"
|
||||
futures = "0.3.8"
|
||||
im = "*"
|
||||
json-patch = { path = "../json-patch" }
|
||||
json-ptr = { path = "../json-ptr" }
|
||||
lazy_static = "1.4.0"
|
||||
|
||||
@@ -24,6 +24,16 @@ impl PartialEq for HandleId {
|
||||
}
|
||||
}
|
||||
impl Eq for HandleId {}
|
||||
impl PartialOrd for HandleId {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
|
||||
self.id.partial_cmp(&other.id)
|
||||
}
|
||||
}
|
||||
impl Ord for HandleId {
|
||||
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
|
||||
self.id.cmp(&other.id)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait DbHandle: Send + Sync {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::{BTreeMap, VecDeque};
|
||||
|
||||
use json_ptr::JsonPointer;
|
||||
use im::{ordset, OrdSet};
|
||||
use json_ptr::{JsonPointer, SegList};
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
|
||||
use crate::handle::HandleId;
|
||||
@@ -23,21 +24,109 @@ impl Locker {
|
||||
let mut locks_on_lease = vec![dummy_recv];
|
||||
let (_dummy_send, dummy_recv) = oneshot::channel();
|
||||
let mut cancellations = vec![dummy_recv];
|
||||
|
||||
let mut request_queue = VecDeque::<(Request, OrdSet<HandleId>)>::new();
|
||||
while let Some(action) =
|
||||
get_action(&mut new_requests, &mut locks_on_lease, &mut cancellations).await
|
||||
{
|
||||
// to prevent starvation we privilege the front of the queue and only allow requests that
|
||||
// conflict with the request at the front to go through if they are requested by sessions that
|
||||
// are *currently blocking* the front of the queue
|
||||
fn process_new_req(
|
||||
hot_seat: Option<&(Request, OrdSet<HandleId>)>,
|
||||
req: Request,
|
||||
trie: &mut Trie,
|
||||
locks_on_lease: &mut Vec<oneshot::Receiver<LockInfo>>,
|
||||
request_queue: &mut VecDeque<(Request, OrdSet<HandleId>)>,
|
||||
) {
|
||||
match hot_seat {
|
||||
// hot seat conflicts and request session isn't in current blocking sessions
|
||||
// so we push it to the queue
|
||||
Some((hot_req, hot_blockers))
|
||||
if hot_req.lock_info.conflicts_with(&req.lock_info)
|
||||
&& !hot_blockers.contains(&req.lock_info.handle_id) =>
|
||||
{
|
||||
request_queue.push_back((req, OrdSet::new()))
|
||||
}
|
||||
// otherwise we try and service it immediately, only pushing to the queue if it fails
|
||||
_ => match trie.try_lock(&req.lock_info) {
|
||||
Ok(()) => {
|
||||
let lease = req.complete();
|
||||
locks_on_lease.push(lease);
|
||||
}
|
||||
Err(blocking_sessions) => {
|
||||
request_queue.push_back((req, blocking_sessions))
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "tracing")]
|
||||
tracing::trace!("Locker Action: {:#?}", action);
|
||||
match action {
|
||||
Action::HandleRequest(mut req) => {
|
||||
cancellations.extend(req.cancel.take());
|
||||
trie.handle_request(req, &mut locks_on_lease)
|
||||
let hot_seat = request_queue.pop_front();
|
||||
process_new_req(
|
||||
hot_seat.as_ref(),
|
||||
req,
|
||||
&mut trie,
|
||||
&mut locks_on_lease,
|
||||
&mut request_queue,
|
||||
);
|
||||
hot_seat.map_or((), |a| request_queue.push_front(a));
|
||||
}
|
||||
Action::HandleRelease(lock_info) => {
|
||||
trie.handle_release(lock_info, &mut locks_on_lease)
|
||||
// release actual lock
|
||||
if trie.try_unlock(&lock_info) {
|
||||
// try to pop off as many requests off the front of the queue as we can
|
||||
let mut hot_seat = None;
|
||||
while let Some((r, _)) = request_queue.pop_front() {
|
||||
match trie.try_lock(&r.lock_info) {
|
||||
Ok(()) => {
|
||||
let lease = r.complete();
|
||||
locks_on_lease.push(lease);
|
||||
}
|
||||
Err(new_blocking_sessions) => {
|
||||
// set the hot seat and proceed to step two
|
||||
hot_seat = Some((r, new_blocking_sessions));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
// when we can no longer do so, try and service the rest of the queue with the new hot seat
|
||||
let old_request_queue = request_queue;
|
||||
request_queue = VecDeque::new();
|
||||
for (r, _) in old_request_queue {
|
||||
// we now want to process each request in the queue as if it was new
|
||||
process_new_req(
|
||||
hot_seat.as_ref(),
|
||||
r,
|
||||
&mut trie,
|
||||
&mut locks_on_lease,
|
||||
&mut request_queue,
|
||||
)
|
||||
}
|
||||
hot_seat.map_or((), |a| request_queue.push_front(a));
|
||||
}
|
||||
}
|
||||
Action::HandleCancel(lock_info) => {
|
||||
trie.handle_cancel(lock_info, &mut locks_on_lease)
|
||||
// trie.handle_cancel(lock_info, &mut locks_on_lease)
|
||||
let entry = request_queue
|
||||
.iter()
|
||||
.enumerate()
|
||||
.find(|(_, (r, _))| r.lock_info == lock_info);
|
||||
match entry {
|
||||
None => {
|
||||
#[cfg(feature = "tracing")]
|
||||
tracing::warn!(
|
||||
"Received cancellation for a lock not currently waiting: {}",
|
||||
lock_info.ptr
|
||||
);
|
||||
}
|
||||
Some((i, _)) => {
|
||||
request_queue.remove(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "tracing")]
|
||||
@@ -66,7 +155,6 @@ impl Locker {
|
||||
handle_id,
|
||||
ptr,
|
||||
ty: lock_type,
|
||||
segments_handled: 0,
|
||||
};
|
||||
let (send, recv) = oneshot::channel();
|
||||
let (cancel_send, cancel_recv) = oneshot::channel();
|
||||
@@ -132,255 +220,581 @@ async fn get_action(
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct Trie {
|
||||
node: Node,
|
||||
state: LockState,
|
||||
children: BTreeMap<String, Trie>,
|
||||
}
|
||||
impl Trie {
|
||||
fn child_mut(&mut self, name: &str) -> &mut Self {
|
||||
if !self.children.contains_key(name) {
|
||||
self.children.insert(name.to_owned(), Trie::default());
|
||||
fn all<F: Fn(&LockState) -> bool>(&self, f: F) -> bool {
|
||||
f(&self.state) && self.children.values().all(|t| t.all(&f))
|
||||
}
|
||||
self.children.get_mut(name).unwrap()
|
||||
fn any<F: Fn(&LockState) -> bool>(&self, f: F) -> bool {
|
||||
f(&self.state) || self.children.values().any(|t| t.any(&f))
|
||||
}
|
||||
fn handle_request(
|
||||
&mut self,
|
||||
req: Request,
|
||||
locks_on_lease: &mut Vec<oneshot::Receiver<LockInfo>>,
|
||||
) {
|
||||
if let Some(req) = self.node.handle_request(req, locks_on_lease) {
|
||||
self.child_mut(req.lock_info.current_seg())
|
||||
.handle_request(req, locks_on_lease)
|
||||
fn subtree_is_lock_free_for(&self, session: &HandleId) -> bool {
|
||||
self.all(|s| s.sessions().difference(ordset![session]).is_empty())
|
||||
}
|
||||
fn subtree_is_exclusive_free_for(&self, session: &HandleId) -> bool {
|
||||
self.all(|s| match s.clone().erase(session) {
|
||||
LockState::Exclusive { .. } => false,
|
||||
_ => true,
|
||||
})
|
||||
}
|
||||
fn subtree_write_sessions<'a>(&'a self) -> OrdSet<&'a HandleId> {
|
||||
match &self.state {
|
||||
LockState::Exclusive { w_lessee, .. } => ordset![w_lessee],
|
||||
_ => self
|
||||
.children
|
||||
.values()
|
||||
.map(|t| t.subtree_write_sessions())
|
||||
.fold(OrdSet::new(), OrdSet::union),
|
||||
}
|
||||
}
|
||||
fn handle_release(
|
||||
&mut self,
|
||||
lock_info: LockInfo,
|
||||
locks_on_lease: &mut Vec<oneshot::Receiver<LockInfo>>,
|
||||
) {
|
||||
let release = self.node.release(lock_info);
|
||||
for req in std::mem::take(&mut self.node.reqs) {
|
||||
self.handle_request(req, locks_on_lease);
|
||||
fn subtree_sessions<'a>(&'a self) -> OrdSet<&'a HandleId> {
|
||||
let children = self
|
||||
.children
|
||||
.values()
|
||||
.map(Trie::subtree_sessions)
|
||||
.fold(OrdSet::new(), OrdSet::union);
|
||||
self.state.sessions().union(children)
|
||||
}
|
||||
if let Some(release) = release {
|
||||
self.child_mut(release.current_seg())
|
||||
.handle_release(release, locks_on_lease)
|
||||
fn ancestors_and_trie<'a, S: AsRef<str>, V: SegList>(
|
||||
&'a self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
) -> (Vec<&'a LockState>, Option<&'a Trie>) {
|
||||
match ptr.uncons() {
|
||||
None => (Vec::new(), Some(self)),
|
||||
Some((first, rest)) => match self.children.get(first) {
|
||||
None => (vec![&self.state], None),
|
||||
Some(t) => {
|
||||
let (mut v, t) = t.ancestors_and_trie(&rest);
|
||||
v.push(&self.state);
|
||||
(v, t)
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
fn handle_cancel(
|
||||
&mut self,
|
||||
lock_info: LockInfo,
|
||||
locks_on_lease: &mut Vec<oneshot::Receiver<LockInfo>>,
|
||||
) {
|
||||
let cancel = self.node.cancel(lock_info);
|
||||
for req in std::mem::take(&mut self.node.reqs) {
|
||||
self.handle_request(req, locks_on_lease);
|
||||
// no writes in ancestor set, no writes at node
|
||||
fn can_acquire_exist(&self, ptr: &JsonPointer, session: &HandleId) -> bool {
|
||||
let (v, t) = self.ancestors_and_trie(ptr);
|
||||
let ancestor_write_free = v
|
||||
.into_iter()
|
||||
.cloned()
|
||||
.map(|s| s.erase(session))
|
||||
.all(|s| s.write_free());
|
||||
ancestor_write_free && t.map_or(true, |t| t.state.clone().erase(session).write_free())
|
||||
}
|
||||
if let Some(cancel) = cancel {
|
||||
self.child_mut(cancel.current_seg())
|
||||
.handle_cancel(cancel, locks_on_lease)
|
||||
// no writes in ancestor set, no writes in subtree
|
||||
fn can_acquire_read(&self, ptr: &JsonPointer, session: &HandleId) -> bool {
|
||||
let (v, t) = self.ancestors_and_trie(ptr);
|
||||
let ancestor_write_free = v
|
||||
.into_iter()
|
||||
.cloned()
|
||||
.map(|s| s.erase(session))
|
||||
.all(|s| s.write_free());
|
||||
ancestor_write_free && t.map_or(true, |t| t.subtree_is_exclusive_free_for(session))
|
||||
}
|
||||
// no reads or writes in ancestor set, no locks in subtree
|
||||
fn can_acquire_write(&self, ptr: &JsonPointer, session: &HandleId) -> bool {
|
||||
let (v, t) = self.ancestors_and_trie(ptr);
|
||||
let ancestor_rw_free = v
|
||||
.into_iter()
|
||||
.cloned()
|
||||
.map(|s| s.erase(session))
|
||||
.all(|s| s.write_free() && s.read_free());
|
||||
ancestor_rw_free && t.map_or(true, |t| t.subtree_is_lock_free_for(session))
|
||||
}
|
||||
// ancestors with writes and writes on the node
|
||||
fn session_blocking_exist<'a>(
|
||||
&'a self,
|
||||
ptr: &JsonPointer,
|
||||
session: &HandleId,
|
||||
) -> Option<&'a HandleId> {
|
||||
let (v, _t) = self.ancestors_and_trie(ptr);
|
||||
v.into_iter().find_map(|s| s.write_session()).and_then(|h| {
|
||||
if h == session {
|
||||
None
|
||||
} else {
|
||||
Some(h)
|
||||
}
|
||||
})
|
||||
}
|
||||
// ancestors with writes, subtrees with writes
|
||||
fn sessions_blocking_read<'a>(
|
||||
&'a self,
|
||||
ptr: &JsonPointer,
|
||||
session: &HandleId,
|
||||
) -> OrdSet<&'a HandleId> {
|
||||
let (v, t) = self.ancestors_and_trie(ptr);
|
||||
let ancestor_writes = v
|
||||
.into_iter()
|
||||
.map(|s| s.write_session().into_iter().collect::<OrdSet<_>>())
|
||||
.fold(OrdSet::new(), OrdSet::union);
|
||||
let relevant_write_sessions = match t {
|
||||
None => ancestor_writes,
|
||||
Some(t) => ancestor_writes.union(t.subtree_write_sessions()),
|
||||
};
|
||||
relevant_write_sessions.without(session)
|
||||
}
|
||||
// ancestors with reads or writes, subtrees with anything
|
||||
fn sessions_blocking_write<'a>(
|
||||
&'a self,
|
||||
ptr: &JsonPointer,
|
||||
session: &HandleId,
|
||||
) -> OrdSet<&'a HandleId> {
|
||||
let (v, t) = self.ancestors_and_trie(ptr);
|
||||
let ancestors = v
|
||||
.into_iter()
|
||||
.map(|s| {
|
||||
s.read_sessions()
|
||||
.union(s.write_session().into_iter().collect())
|
||||
})
|
||||
.fold(OrdSet::new(), OrdSet::union);
|
||||
let subtree = t.map_or(OrdSet::new(), |t| t.subtree_sessions());
|
||||
ancestors.union(subtree).without(session)
|
||||
}
|
||||
|
||||
fn child_mut<S: AsRef<str>, V: SegList>(&mut self, ptr: &JsonPointer<S, V>) -> &mut Self {
|
||||
match ptr.uncons() {
|
||||
None => self,
|
||||
Some((first, rest)) => {
|
||||
if !self.children.contains_key(first) {
|
||||
self.children.insert(first.to_owned(), Trie::default());
|
||||
}
|
||||
self.children.get_mut(first).unwrap().child_mut(&rest)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn sessions_blocking_lock<'a>(&'a self, lock_info: &LockInfo) -> OrdSet<&'a HandleId> {
|
||||
match &lock_info.ty {
|
||||
LockType::Exist => self
|
||||
.session_blocking_exist(&lock_info.ptr, &lock_info.handle_id)
|
||||
.into_iter()
|
||||
.collect(),
|
||||
LockType::Read => self.sessions_blocking_read(&lock_info.ptr, &lock_info.handle_id),
|
||||
LockType::Write => self.sessions_blocking_write(&lock_info.ptr, &lock_info.handle_id),
|
||||
}
|
||||
}
|
||||
|
||||
fn try_lock<'a>(&'a mut self, lock_info: &LockInfo) -> Result<(), OrdSet<HandleId>> {
|
||||
let blocking_sessions = self.sessions_blocking_lock(lock_info);
|
||||
if !blocking_sessions.is_empty() {
|
||||
Err(blocking_sessions.into_iter().cloned().collect())
|
||||
} else {
|
||||
drop(blocking_sessions);
|
||||
self.child_mut(&lock_info.ptr)
|
||||
.state
|
||||
.lock(lock_info.handle_id.clone(), &lock_info.ty);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn try_unlock(&mut self, lock_info: &LockInfo) -> bool {
|
||||
let t = self.child_mut(&lock_info.ptr);
|
||||
let success = t.state.unlock(&lock_info.handle_id, &lock_info.ty);
|
||||
if success {
|
||||
self.prune();
|
||||
}
|
||||
success
|
||||
}
|
||||
|
||||
fn prunable(&self) -> bool {
|
||||
self.children.is_empty() && self.state == LockState::Free
|
||||
}
|
||||
|
||||
fn prune(&mut self) {
|
||||
self.children.retain(|_, t| {
|
||||
t.prune();
|
||||
!t.prunable()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct Node {
|
||||
reader_parents: Vec<HandleId>,
|
||||
readers: Vec<HandleId>,
|
||||
writer_parents: Vec<HandleId>,
|
||||
writers: Vec<HandleId>,
|
||||
reqs: Vec<Request>,
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
struct Natural(usize);
|
||||
impl Natural {
|
||||
fn one() -> Self {
|
||||
Natural(1)
|
||||
}
|
||||
fn of(n: usize) -> Option<Self> {
|
||||
if n == 0 {
|
||||
None
|
||||
} else {
|
||||
Some(Natural(n))
|
||||
}
|
||||
}
|
||||
fn inc(&mut self) {
|
||||
self.0 += 1;
|
||||
}
|
||||
fn dec(mut self) -> Option<Natural> {
|
||||
if self.0 == 0 {
|
||||
None
|
||||
} else {
|
||||
self.0 -= 1;
|
||||
Some(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
impl Node {
|
||||
// true: If there are any writer_parents, they are `id`.
|
||||
fn write_parent_free(&self, id: &HandleId) -> bool {
|
||||
self.writer_parents.is_empty() || (self.writer_parents.iter().find(|a| a != &id).is_none())
|
||||
}
|
||||
// true: If there are any writers, they are `id`.
|
||||
fn write_free(&self, id: &HandleId) -> bool {
|
||||
self.writers.is_empty() || (self.writers.iter().find(|a| a != &id).is_none())
|
||||
}
|
||||
// true: If there are any reader_parents, they are `id`.
|
||||
fn read_parent_free(&self, id: &HandleId) -> bool {
|
||||
self.reader_parents.is_empty() || (self.reader_parents.iter().find(|a| a != &id).is_none())
|
||||
}
|
||||
// true: If there are any readers, they are `id`.
|
||||
fn read_free(&self, id: &HandleId) -> bool {
|
||||
self.readers.is_empty() || (self.readers.iter().find(|a| a != &id).is_none())
|
||||
}
|
||||
// allow a lock to skip the queue if a lock is already held by the same handle
|
||||
fn can_jump_queue(&self, id: &HandleId) -> bool {
|
||||
self.writers.contains(&id)
|
||||
|| self.writer_parents.contains(&id)
|
||||
|| self.readers.contains(&id)
|
||||
|| self.reader_parents.contains(&id)
|
||||
}
|
||||
// `id` is capable of acquiring this node for the purpose of writing to a child
|
||||
fn write_parent_available(&self, id: &HandleId) -> bool {
|
||||
self.write_free(id)
|
||||
&& self.read_free(id)
|
||||
&& (self.reqs.is_empty() || self.can_jump_queue(id))
|
||||
}
|
||||
// `id` is capable of acquiring this node for writing
|
||||
fn write_available(&self, id: &HandleId) -> bool {
|
||||
self.write_free(id)
|
||||
&& self.write_parent_free(id)
|
||||
&& self.read_free(id)
|
||||
&& self.read_parent_free(id)
|
||||
&& (self.reqs.is_empty() || self.can_jump_queue(id))
|
||||
}
|
||||
fn read_parent_available(&self, id: &HandleId) -> bool {
|
||||
self.write_free(id) && (self.reqs.is_empty() || self.can_jump_queue(id))
|
||||
}
|
||||
// `id` is capable of acquiring this node for reading
|
||||
fn read_available(&self, id: &HandleId) -> bool {
|
||||
self.write_free(id)
|
||||
&& self.write_parent_free(id)
|
||||
&& (self.reqs.is_empty() || self.can_jump_queue(id))
|
||||
}
|
||||
fn handle_request(
|
||||
&mut self,
|
||||
req: Request,
|
||||
locks_on_lease: &mut Vec<oneshot::Receiver<LockInfo>>,
|
||||
) -> Option<Request> {
|
||||
if req.completion.is_closed() {
|
||||
return None;
|
||||
}
|
||||
match (
|
||||
req.lock_info.ty,
|
||||
req.lock_info.segments_handled == req.lock_info.ptr.len(),
|
||||
) {
|
||||
(LockType::Write, true) if self.write_available(&req.lock_info.handle_id) => {
|
||||
self.writers.push(req.lock_info.handle_id.clone());
|
||||
req.process(locks_on_lease)
|
||||
}
|
||||
(LockType::DeepRead, true) if self.read_available(&req.lock_info.handle_id) => {
|
||||
self.readers.push(req.lock_info.handle_id.clone());
|
||||
req.process(locks_on_lease)
|
||||
}
|
||||
(LockType::Write, false) if self.write_parent_available(&req.lock_info.handle_id) => {
|
||||
self.writer_parents.push(req.lock_info.handle_id.clone());
|
||||
req.process(locks_on_lease)
|
||||
}
|
||||
(LockType::DeepRead, false) | (LockType::ShallowRead, _)
|
||||
if self.read_parent_available(&req.lock_info.handle_id) =>
|
||||
{
|
||||
self.reader_parents.push(req.lock_info.handle_id.clone());
|
||||
req.process(locks_on_lease)
|
||||
}
|
||||
_ => {
|
||||
self.reqs.push(req);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
fn release(&mut self, mut lock_info: LockInfo) -> Option<LockInfo> {
|
||||
match (
|
||||
lock_info.ty,
|
||||
lock_info.segments_handled == lock_info.ptr.len(),
|
||||
) {
|
||||
(LockType::Write, true) => {
|
||||
if let Some(idx) = self
|
||||
.writers
|
||||
.iter()
|
||||
.enumerate()
|
||||
.find(|(_, id)| id == &&lock_info.handle_id)
|
||||
.map(|(idx, _)| idx)
|
||||
{
|
||||
self.writers.swap_remove(idx);
|
||||
}
|
||||
}
|
||||
(LockType::DeepRead, true) => {
|
||||
if let Some(idx) = self
|
||||
.readers
|
||||
.iter()
|
||||
.enumerate()
|
||||
.find(|(_, id)| id == &&lock_info.handle_id)
|
||||
.map(|(idx, _)| idx)
|
||||
{
|
||||
self.readers.swap_remove(idx);
|
||||
}
|
||||
}
|
||||
(LockType::Write, false) => {
|
||||
if let Some(idx) = self
|
||||
.writer_parents
|
||||
.iter()
|
||||
.enumerate()
|
||||
.find(|(_, id)| id == &&lock_info.handle_id)
|
||||
.map(|(idx, _)| idx)
|
||||
{
|
||||
self.writer_parents.swap_remove(idx);
|
||||
}
|
||||
}
|
||||
(LockType::DeepRead, false) | (LockType::ShallowRead, _) => {
|
||||
if let Some(idx) = self
|
||||
.reader_parents
|
||||
.iter()
|
||||
.enumerate()
|
||||
.find(|(_, id)| id == &&lock_info.handle_id)
|
||||
.map(|(idx, _)| idx)
|
||||
{
|
||||
self.reader_parents.swap_remove(idx);
|
||||
}
|
||||
}
|
||||
}
|
||||
if lock_info.ptr.len() == lock_info.segments_handled {
|
||||
None
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
enum LockState {
|
||||
Free,
|
||||
Shared {
|
||||
e_lessees: BTreeMap<HandleId, Natural>,
|
||||
r_lessees: BTreeMap<HandleId, Natural>,
|
||||
},
|
||||
Exclusive {
|
||||
w_lessee: HandleId,
|
||||
w_session_count: Natural, // should never be 0
|
||||
r_session_count: usize,
|
||||
e_session_count: usize,
|
||||
},
|
||||
}
|
||||
impl LockState {
|
||||
fn erase(self, session: &HandleId) -> LockState {
|
||||
match self {
|
||||
LockState::Free => LockState::Free,
|
||||
LockState::Shared {
|
||||
mut e_lessees,
|
||||
mut r_lessees,
|
||||
} => {
|
||||
e_lessees.remove(session);
|
||||
r_lessees.remove(session);
|
||||
if e_lessees.is_empty() && r_lessees.is_empty() {
|
||||
LockState::Free
|
||||
} else {
|
||||
lock_info.segments_handled += 1;
|
||||
Some(lock_info)
|
||||
LockState::Shared {
|
||||
e_lessees,
|
||||
r_lessees,
|
||||
}
|
||||
}
|
||||
fn cancel(&mut self, mut lock_info: LockInfo) -> Option<LockInfo> {
|
||||
let mut idx = 0;
|
||||
while idx < self.reqs.len() {
|
||||
if self.reqs[idx].completion.is_closed() && self.reqs[idx].lock_info == lock_info {
|
||||
self.reqs.swap_remove(idx);
|
||||
return None;
|
||||
}
|
||||
LockState::Exclusive { ref w_lessee, .. } => {
|
||||
if w_lessee == session {
|
||||
LockState::Free
|
||||
} else {
|
||||
idx += 1;
|
||||
self
|
||||
}
|
||||
}
|
||||
if lock_info.ptr.len() == lock_info.segments_handled {
|
||||
None
|
||||
}
|
||||
}
|
||||
fn write_free(&self) -> bool {
|
||||
match self {
|
||||
LockState::Exclusive { .. } => false,
|
||||
_ => true,
|
||||
}
|
||||
}
|
||||
fn read_free(&self) -> bool {
|
||||
match self {
|
||||
LockState::Exclusive {
|
||||
r_session_count, ..
|
||||
} => *r_session_count == 0,
|
||||
LockState::Shared { r_lessees, .. } => r_lessees.is_empty(),
|
||||
_ => true,
|
||||
}
|
||||
}
|
||||
fn sessions<'a>(&'a self) -> OrdSet<&'a HandleId> {
|
||||
match self {
|
||||
LockState::Free => OrdSet::new(),
|
||||
LockState::Shared {
|
||||
e_lessees,
|
||||
r_lessees,
|
||||
} => e_lessees.keys().chain(r_lessees.keys()).collect(),
|
||||
LockState::Exclusive { w_lessee, .. } => ordset![w_lessee],
|
||||
}
|
||||
}
|
||||
fn exist_sessions<'a>(&'a self) -> OrdSet<&'a HandleId> {
|
||||
match self {
|
||||
LockState::Free => OrdSet::new(),
|
||||
LockState::Shared { e_lessees, .. } => e_lessees.keys().collect(),
|
||||
LockState::Exclusive {
|
||||
w_lessee,
|
||||
e_session_count,
|
||||
..
|
||||
} => {
|
||||
if *e_session_count > 0 {
|
||||
ordset![w_lessee]
|
||||
} else {
|
||||
lock_info.segments_handled += 1;
|
||||
Some(lock_info)
|
||||
OrdSet::new()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
fn read_sessions<'a>(&'a self) -> OrdSet<&'a HandleId> {
|
||||
match self {
|
||||
LockState::Free => OrdSet::new(),
|
||||
LockState::Shared { r_lessees, .. } => r_lessees.keys().collect(),
|
||||
LockState::Exclusive {
|
||||
w_lessee,
|
||||
r_session_count,
|
||||
..
|
||||
} => {
|
||||
if *r_session_count > 0 {
|
||||
ordset![w_lessee]
|
||||
} else {
|
||||
OrdSet::new()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
fn write_session<'a>(&'a self) -> Option<&'a HandleId> {
|
||||
match self {
|
||||
LockState::Exclusive { w_lessee, .. } => Some(w_lessee),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
// note this is not necessarily safe in the overall trie locking model
|
||||
// this function will return true if the state changed as a result of the call
|
||||
// if it returns false it technically means that the call was invalid and did not
|
||||
// change the lock state at all
|
||||
fn lock(&mut self, session: HandleId, typ: &LockType) -> bool {
|
||||
match (&mut *self, typ) {
|
||||
(LockState::Free, LockType::Exist) => {
|
||||
*self = LockState::Shared {
|
||||
e_lessees: [(session, Natural::one())].into(),
|
||||
r_lessees: BTreeMap::new(),
|
||||
};
|
||||
true
|
||||
}
|
||||
(LockState::Free, LockType::Read) => {
|
||||
*self = LockState::Shared {
|
||||
e_lessees: BTreeMap::new(),
|
||||
r_lessees: [(session, Natural::one())].into(),
|
||||
};
|
||||
true
|
||||
}
|
||||
(LockState::Free, LockType::Write) => {
|
||||
*self = LockState::Exclusive {
|
||||
w_lessee: session,
|
||||
w_session_count: Natural::one(),
|
||||
r_session_count: 0,
|
||||
e_session_count: 0,
|
||||
};
|
||||
true
|
||||
}
|
||||
(LockState::Shared { e_lessees, .. }, LockType::Exist) => {
|
||||
match e_lessees.get_mut(&session) {
|
||||
None => {
|
||||
e_lessees.insert(session, Natural::one());
|
||||
}
|
||||
Some(v) => v.inc(),
|
||||
};
|
||||
true
|
||||
}
|
||||
(LockState::Shared { r_lessees, .. }, LockType::Read) => {
|
||||
match r_lessees.get_mut(&session) {
|
||||
None => {
|
||||
r_lessees.insert(session, Natural::one());
|
||||
}
|
||||
Some(v) => v.inc(),
|
||||
}
|
||||
true
|
||||
}
|
||||
(
|
||||
LockState::Shared {
|
||||
e_lessees,
|
||||
r_lessees,
|
||||
},
|
||||
LockType::Write,
|
||||
) => {
|
||||
for hdl in e_lessees.keys() {
|
||||
if hdl != &session {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
for hdl in r_lessees.keys() {
|
||||
if hdl != &session {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
*self = LockState::Exclusive {
|
||||
r_session_count: r_lessees.remove(&session).map_or(0, |x| x.0),
|
||||
e_session_count: e_lessees.remove(&session).map_or(0, |x| x.0),
|
||||
w_lessee: session,
|
||||
w_session_count: Natural::one(),
|
||||
};
|
||||
true
|
||||
}
|
||||
(
|
||||
LockState::Exclusive {
|
||||
w_lessee,
|
||||
e_session_count,
|
||||
..
|
||||
},
|
||||
LockType::Exist,
|
||||
) => {
|
||||
if w_lessee != &session {
|
||||
return false;
|
||||
}
|
||||
*e_session_count += 1;
|
||||
true
|
||||
}
|
||||
(
|
||||
LockState::Exclusive {
|
||||
w_lessee,
|
||||
r_session_count,
|
||||
..
|
||||
},
|
||||
LockType::Read,
|
||||
) => {
|
||||
if w_lessee != &session {
|
||||
return false;
|
||||
}
|
||||
*r_session_count += 1;
|
||||
true
|
||||
}
|
||||
(
|
||||
LockState::Exclusive {
|
||||
w_lessee,
|
||||
w_session_count,
|
||||
..
|
||||
},
|
||||
LockType::Write,
|
||||
) => {
|
||||
if w_lessee != &session {
|
||||
return false;
|
||||
}
|
||||
w_session_count.inc();
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// there are many ways for this function to be called in an invalid way: Notably releasing locks that you never
|
||||
// had to begin with.
|
||||
fn unlock(&mut self, session: &HandleId, typ: &LockType) -> bool {
|
||||
match (&mut *self, typ) {
|
||||
(LockState::Free, _) => false,
|
||||
(LockState::Shared { e_lessees, .. }, LockType::Exist) => {
|
||||
match e_lessees.remove_entry(session) {
|
||||
None => false,
|
||||
Some((k, v)) => match v.dec() {
|
||||
None => true,
|
||||
Some(n) => {
|
||||
e_lessees.insert(k, n);
|
||||
true
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
(LockState::Shared { r_lessees, .. }, LockType::Read) => {
|
||||
match r_lessees.remove_entry(session) {
|
||||
None => false,
|
||||
Some((k, v)) => match v.dec() {
|
||||
None => true,
|
||||
Some(n) => {
|
||||
r_lessees.insert(k, n);
|
||||
true
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
(LockState::Shared { .. }, LockType::Write) => false,
|
||||
(
|
||||
LockState::Exclusive {
|
||||
w_lessee,
|
||||
e_session_count,
|
||||
..
|
||||
},
|
||||
LockType::Exist,
|
||||
) => {
|
||||
if w_lessee != session || *e_session_count == 0 {
|
||||
return false;
|
||||
}
|
||||
*e_session_count -= 1;
|
||||
true
|
||||
}
|
||||
(
|
||||
LockState::Exclusive {
|
||||
w_lessee,
|
||||
r_session_count,
|
||||
..
|
||||
},
|
||||
LockType::Read,
|
||||
) => {
|
||||
if w_lessee != session || *r_session_count == 0 {
|
||||
return false;
|
||||
}
|
||||
*r_session_count -= 1;
|
||||
true
|
||||
}
|
||||
(
|
||||
LockState::Exclusive {
|
||||
w_lessee,
|
||||
w_session_count,
|
||||
r_session_count,
|
||||
e_session_count,
|
||||
},
|
||||
LockType::Write,
|
||||
) => {
|
||||
if w_lessee != session {
|
||||
return false;
|
||||
}
|
||||
match w_session_count.dec() {
|
||||
None => {
|
||||
let mut e_lessees = BTreeMap::new();
|
||||
if let Some(n) = Natural::of(*e_session_count) {
|
||||
e_lessees.insert(session.clone(), n);
|
||||
}
|
||||
let mut r_lessees = BTreeMap::new();
|
||||
if let Some(n) = Natural::of(*r_session_count) {
|
||||
r_lessees.insert(session.clone(), n);
|
||||
}
|
||||
*self = LockState::Shared {
|
||||
e_lessees,
|
||||
r_lessees,
|
||||
};
|
||||
}
|
||||
Some(n) => *w_session_count = n,
|
||||
}
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
impl Default for LockState {
|
||||
fn default() -> Self {
|
||||
LockState::Free
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default, PartialEq)]
|
||||
struct LockInfo {
|
||||
ptr: JsonPointer,
|
||||
segments_handled: usize,
|
||||
ty: LockType,
|
||||
handle_id: HandleId,
|
||||
}
|
||||
impl LockInfo {
|
||||
fn current_seg(&self) -> &str {
|
||||
if self.segments_handled == 0 {
|
||||
"" // root
|
||||
} else {
|
||||
self.ptr
|
||||
.get_segment(self.segments_handled - 1)
|
||||
.unwrap_or_default()
|
||||
fn conflicts_with(&self, other: &LockInfo) -> bool {
|
||||
self.handle_id != other.handle_id
|
||||
&& match (self.ty, other.ty) {
|
||||
(LockType::Exist, LockType::Exist) => false,
|
||||
(LockType::Exist, LockType::Read) => false,
|
||||
(LockType::Exist, LockType::Write) => self.ptr.starts_with(&other.ptr),
|
||||
(LockType::Read, LockType::Exist) => false,
|
||||
(LockType::Read, LockType::Read) => false,
|
||||
(LockType::Read, LockType::Write) => {
|
||||
self.ptr.starts_with(&other.ptr) || other.ptr.starts_with(&self.ptr)
|
||||
}
|
||||
(LockType::Write, LockType::Exist) => other.ptr.starts_with(&self.ptr),
|
||||
(LockType::Write, LockType::Read) => {
|
||||
self.ptr.starts_with(&other.ptr) || other.ptr.starts_with(&self.ptr)
|
||||
}
|
||||
(LockType::Write, LockType::Write) => {
|
||||
self.ptr.starts_with(&other.ptr) || other.ptr.starts_with(&self.ptr)
|
||||
}
|
||||
}
|
||||
fn reset(mut self) -> Self {
|
||||
self.segments_handled = 0;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
pub enum LockType {
|
||||
ShallowRead,
|
||||
DeepRead,
|
||||
Exist,
|
||||
Read,
|
||||
Write,
|
||||
}
|
||||
impl Default for LockType {
|
||||
fn default() -> Self {
|
||||
LockType::ShallowRead
|
||||
LockType::Exist
|
||||
}
|
||||
}
|
||||
|
||||
@@ -391,25 +805,16 @@ struct Request {
|
||||
completion: oneshot::Sender<Guard>,
|
||||
}
|
||||
impl Request {
|
||||
fn process(mut self, locks_on_lease: &mut Vec<oneshot::Receiver<LockInfo>>) -> Option<Self> {
|
||||
if self.lock_info.ptr.len() == self.lock_info.segments_handled {
|
||||
self.complete(locks_on_lease);
|
||||
None
|
||||
} else {
|
||||
self.lock_info.segments_handled += 1;
|
||||
Some(self)
|
||||
}
|
||||
}
|
||||
fn complete(self, locks_on_lease: &mut Vec<oneshot::Receiver<LockInfo>>) {
|
||||
fn complete(self) -> oneshot::Receiver<LockInfo> {
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
locks_on_lease.push(receiver);
|
||||
if let Err(_) = self.completion.send(Guard {
|
||||
lock_info: self.lock_info.reset(),
|
||||
lock_info: self.lock_info,
|
||||
sender: Some(sender),
|
||||
}) {
|
||||
#[cfg(feature = "tracing")]
|
||||
tracing::warn!("Completion sent to closed channel.")
|
||||
}
|
||||
receiver
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -71,7 +71,7 @@ where
|
||||
|
||||
pub async fn get<Db: DbHandle>(&self, db: &mut Db, lock: bool) -> Result<ModelData<T>, Error> {
|
||||
if lock {
|
||||
self.lock(db, LockType::DeepRead).await;
|
||||
self.lock(db, LockType::Read).await;
|
||||
}
|
||||
Ok(ModelData(db.get(&self.ptr).await?))
|
||||
}
|
||||
@@ -238,7 +238,7 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
|
||||
lock: bool,
|
||||
) -> Result<ModelData<Option<T>>, Error> {
|
||||
if lock {
|
||||
self.lock(db, LockType::DeepRead).await;
|
||||
self.lock(db, LockType::Read).await;
|
||||
}
|
||||
Ok(ModelData(db.get(self.0.as_ref()).await?))
|
||||
}
|
||||
@@ -259,8 +259,7 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
|
||||
|
||||
pub async fn exists<Db: DbHandle>(&self, db: &mut Db, lock: bool) -> Result<bool, Error> {
|
||||
if lock {
|
||||
db.lock(self.0.as_ref().clone(), LockType::ShallowRead)
|
||||
.await;
|
||||
db.lock(self.0.as_ref().clone(), LockType::Exist).await;
|
||||
}
|
||||
Ok(db.exists(&self.as_ref(), None).await?)
|
||||
}
|
||||
@@ -491,7 +490,7 @@ where
|
||||
lock: bool,
|
||||
) -> Result<BTreeSet<T::Key>, Error> {
|
||||
if lock {
|
||||
db.lock(self.as_ref().clone(), LockType::ShallowRead).await;
|
||||
db.lock(self.as_ref().clone(), LockType::Exist).await;
|
||||
}
|
||||
let set = db.keys(self.as_ref(), None).await?;
|
||||
Ok(set
|
||||
|
||||
Reference in New Issue
Block a user