mirror of
https://github.com/Start9Labs/patch-db.git
synced 2026-03-26 10:21:53 +00:00
reorganize the entire locker
This commit is contained in:
committed by
Aiden McClelland
parent
766e45dc5a
commit
09697a3c5a
@@ -12,6 +12,7 @@ version = "0.1.0"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
[features]
|
||||
default = ["trace", "unstable"]
|
||||
debug = ["tracing"]
|
||||
trace = ["debug", "tracing-error"]
|
||||
unstable = []
|
||||
|
||||
@@ -15,8 +15,6 @@ mod patch;
|
||||
mod store;
|
||||
mod transaction;
|
||||
|
||||
#[cfg(test)]
|
||||
mod proptest;
|
||||
#[cfg(test)]
|
||||
mod test;
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
79
patch-db/src/locker/action_mux.rs
Normal file
79
patch-db/src/locker/action_mux.rs
Normal file
@@ -0,0 +1,79 @@
|
||||
use tokio::sync::{
|
||||
mpsc::{self, UnboundedReceiver},
|
||||
oneshot,
|
||||
};
|
||||
|
||||
use super::{LockInfo, Request};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(super) enum Action {
|
||||
HandleRequest(Request),
|
||||
HandleRelease(LockInfo),
|
||||
HandleCancel(LockInfo),
|
||||
}
|
||||
|
||||
struct InboundRequestQueue {
|
||||
closed: bool,
|
||||
recv: mpsc::UnboundedReceiver<Request>,
|
||||
}
|
||||
pub(super) struct ActionMux {
|
||||
inbound_request_queue: InboundRequestQueue,
|
||||
unlock_receivers: Vec<oneshot::Receiver<LockInfo>>,
|
||||
cancellation_receivers: Vec<oneshot::Receiver<LockInfo>>,
|
||||
}
|
||||
impl ActionMux {
|
||||
pub fn new(inbound_receiver: UnboundedReceiver<Request>) -> Self {
|
||||
// futures::future::select_all will panic if the list is empty
|
||||
// instead we want it to block forever by adding a channel that will never recv
|
||||
let unlock_receivers = vec![oneshot::channel().1];
|
||||
let cancellation_receivers = vec![oneshot::channel().1];
|
||||
ActionMux {
|
||||
inbound_request_queue: InboundRequestQueue {
|
||||
recv: inbound_receiver,
|
||||
closed: false,
|
||||
},
|
||||
unlock_receivers,
|
||||
cancellation_receivers,
|
||||
}
|
||||
}
|
||||
pub async fn get_action(&mut self) -> Option<Action> {
|
||||
loop {
|
||||
if self.inbound_request_queue.closed
|
||||
&& self.unlock_receivers.len() == 1
|
||||
&& self.cancellation_receivers.len() == 1
|
||||
{
|
||||
return None;
|
||||
}
|
||||
tokio::select! {
|
||||
a = self.inbound_request_queue.recv.recv() => {
|
||||
if let Some(a) = a {
|
||||
return Some(Action::HandleRequest(a));
|
||||
} else {
|
||||
self.inbound_request_queue.closed = true;
|
||||
}
|
||||
}
|
||||
(a, idx, _) = futures::future::select_all(self.unlock_receivers.iter_mut()) => {
|
||||
self.unlock_receivers.swap_remove(idx);
|
||||
return Some(Action::HandleRelease(a.unwrap()))
|
||||
}
|
||||
(a, idx, _) = futures::future::select_all(self.cancellation_receivers.iter_mut()) => {
|
||||
self.cancellation_receivers.swap_remove(idx);
|
||||
if let Ok(a) = a {
|
||||
return Some(Action::HandleCancel(a))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn push_unlock_receivers<T: IntoIterator<Item = oneshot::Receiver<LockInfo>>>(
|
||||
&mut self,
|
||||
recv: T,
|
||||
) {
|
||||
self.unlock_receivers.extend(recv)
|
||||
}
|
||||
|
||||
pub fn push_cancellation_receiver(&mut self, recv: oneshot::Receiver<LockInfo>) {
|
||||
self.cancellation_receivers.push(recv)
|
||||
}
|
||||
}
|
||||
278
patch-db/src/locker/bookkeeper.rs
Normal file
278
patch-db/src/locker/bookkeeper.rs
Normal file
@@ -0,0 +1,278 @@
|
||||
use std::collections::VecDeque;
|
||||
|
||||
use crate::{
|
||||
handle::HandleId,
|
||||
locker::{
|
||||
log_utils::{display_session_set, fmt_acquired, fmt_cancelled, fmt_deferred, fmt_released},
|
||||
LockSet,
|
||||
},
|
||||
};
|
||||
use imbl::{ordset, OrdMap, OrdSet};
|
||||
use tokio::sync::oneshot;
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use super::{order_enforcer::LockOrderEnforcer, trie::LockTrie, LockError, LockInfo, Request};
|
||||
|
||||
// solely responsible for managing the bookkeeping requirements of requests
|
||||
pub(super) struct LockBookkeeper {
|
||||
trie: LockTrie,
|
||||
deferred_request_queue: VecDeque<(Request, OrdSet<HandleId>)>,
|
||||
#[cfg(feature = "unstable")]
|
||||
order_enforcer: LockOrderEnforcer,
|
||||
}
|
||||
impl LockBookkeeper {
|
||||
pub fn new() -> Self {
|
||||
LockBookkeeper {
|
||||
trie: LockTrie::default(),
|
||||
deferred_request_queue: VecDeque::new(),
|
||||
#[cfg(feature = "unstable")]
|
||||
order_enforcer: LockOrderEnforcer::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn lease(
|
||||
&mut self,
|
||||
req: Request,
|
||||
) -> Result<Option<oneshot::Receiver<LockInfo>>, LockError> {
|
||||
#[cfg(feature = "unstable")]
|
||||
if let Err(e) = self.order_enforcer.try_insert(&req.lock_info) {
|
||||
req.reject(e.clone());
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
// In normal operation we start here
|
||||
let hot_seat = self.deferred_request_queue.pop_front();
|
||||
let res = process_new_req(
|
||||
req,
|
||||
hot_seat.as_ref(),
|
||||
&mut self.trie,
|
||||
&mut self.deferred_request_queue,
|
||||
);
|
||||
|
||||
if let Some(hot_seat) = hot_seat {
|
||||
self.deferred_request_queue.push_front(hot_seat);
|
||||
}
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
pub fn cancel(&mut self, info: &LockInfo) {
|
||||
#[cfg(feature = "unstable")]
|
||||
self.order_enforcer.remove(&info);
|
||||
|
||||
let entry = self
|
||||
.deferred_request_queue
|
||||
.iter()
|
||||
.enumerate()
|
||||
.find(|(_, (r, _))| &r.lock_info == info);
|
||||
match entry {
|
||||
None => {
|
||||
#[cfg(feature = "tracing")]
|
||||
warn!(
|
||||
"Received cancellation for a lock not currently waiting: {}",
|
||||
info.ptr
|
||||
);
|
||||
}
|
||||
Some((i, (req, _))) => {
|
||||
#[cfg(feature = "tracing")]
|
||||
info!("{}", fmt_cancelled(&req.lock_info));
|
||||
|
||||
self.deferred_request_queue.remove(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn ret(&mut self, info: &LockInfo) -> Vec<oneshot::Receiver<LockInfo>> {
|
||||
#[cfg(feature = "unstable")]
|
||||
self.order_enforcer.remove(&info);
|
||||
self.trie.unlock(&info);
|
||||
|
||||
#[cfg(feature = "tracing")]
|
||||
{
|
||||
info!("{}", fmt_released(&info));
|
||||
debug!("Reexamining request queue backlog...");
|
||||
}
|
||||
|
||||
// try to pop off as many requests off the front of the queue as we can
|
||||
let mut new_unlock_receivers = vec![];
|
||||
let mut hot_seat = None;
|
||||
while let Some((r, _)) = self.deferred_request_queue.pop_front() {
|
||||
match self.trie.try_lock(&r.lock_info) {
|
||||
Ok(()) => {
|
||||
let recv = r.complete();
|
||||
new_unlock_receivers.push(recv);
|
||||
}
|
||||
Err(new_blocking_sessions) => {
|
||||
// set the hot seat and proceed to step two
|
||||
hot_seat = Some((r, new_blocking_sessions));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
// when we can no longer do so, try and service the rest of the queue with the new hot seat
|
||||
let old_request_queue = std::mem::take(&mut self.deferred_request_queue);
|
||||
for (r, _) in old_request_queue {
|
||||
// we now want to process each request in the queue as if it was new
|
||||
let res = process_new_req(
|
||||
r,
|
||||
hot_seat.as_ref(),
|
||||
&mut self.trie,
|
||||
&mut self.deferred_request_queue,
|
||||
);
|
||||
if let Some(recv) = res {
|
||||
new_unlock_receivers.push(recv);
|
||||
}
|
||||
}
|
||||
if let Some(hot_seat) = hot_seat {
|
||||
self.deferred_request_queue.push_front(hot_seat);
|
||||
}
|
||||
new_unlock_receivers
|
||||
}
|
||||
}
|
||||
|
||||
// to prevent starvation we privilege the front of the queue and only allow requests that
|
||||
// conflict with the request at the front to go through if they are requested by sessions that
|
||||
// are *currently blocking* the front of the queue
|
||||
fn process_new_req(
|
||||
req: Request,
|
||||
hot_seat: Option<&(Request, OrdSet<HandleId>)>,
|
||||
trie: &mut LockTrie,
|
||||
request_queue: &mut VecDeque<(Request, OrdSet<HandleId>)>,
|
||||
) -> Option<oneshot::Receiver<LockInfo>> {
|
||||
match hot_seat {
|
||||
// hot seat conflicts and request session isn't in current blocking sessions
|
||||
// so we push it to the queue
|
||||
Some((hot_req, hot_blockers))
|
||||
if hot_req.lock_info.conflicts_with(&req.lock_info)
|
||||
&& !hot_blockers.contains(&req.lock_info.handle_id) =>
|
||||
{
|
||||
#[cfg(feature = "tracing")]
|
||||
{
|
||||
info!("{}", fmt_deferred(&req.lock_info));
|
||||
debug!(
|
||||
"Must wait on hot seat request from session {}",
|
||||
&hot_req.lock_info.handle_id.id
|
||||
);
|
||||
}
|
||||
|
||||
request_queue.push_back((req, ordset![]));
|
||||
kill_deadlocked(request_queue, &*trie);
|
||||
None
|
||||
}
|
||||
// otherwise we try and service it immediately, only pushing to the queue if it fails
|
||||
_ => match trie.try_lock(&req.lock_info) {
|
||||
Ok(()) => {
|
||||
#[cfg(feature = "tracing")]
|
||||
info!("{}", fmt_acquired(&req.lock_info));
|
||||
|
||||
Some(req.complete())
|
||||
}
|
||||
Err(blocking_sessions) => {
|
||||
#[cfg(feature = "tracing")]
|
||||
{
|
||||
info!("{}", fmt_deferred(&req.lock_info));
|
||||
debug!(
|
||||
"Must wait on sessions {}",
|
||||
display_session_set(&blocking_sessions)
|
||||
)
|
||||
}
|
||||
|
||||
request_queue.push_back((req, blocking_sessions));
|
||||
kill_deadlocked(request_queue, &*trie);
|
||||
None
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn kill_deadlocked(request_queue: &mut VecDeque<(Request, OrdSet<HandleId>)>, trie: &LockTrie) {
|
||||
// TODO optimize this, it is unlikely that we are anywhere close to as efficient as we can be here.
|
||||
let deadlocked_reqs = deadlock_scan(request_queue);
|
||||
if !deadlocked_reqs.is_empty() {
|
||||
let locks_waiting = LockSet(
|
||||
deadlocked_reqs
|
||||
.iter()
|
||||
.map(|r| r.lock_info.clone())
|
||||
.collect(),
|
||||
);
|
||||
#[cfg(feature = "tracing")]
|
||||
error!("Deadlock Detected: {:?}", locks_waiting);
|
||||
let err = LockError::DeadlockDetected {
|
||||
locks_waiting,
|
||||
locks_held: LockSet(trie.subtree_lock_info()),
|
||||
};
|
||||
let mut indices_to_remove = Vec::with_capacity(deadlocked_reqs.len());
|
||||
for (i, (req, _)) in request_queue.iter().enumerate() {
|
||||
if deadlocked_reqs.iter().any(|r| std::ptr::eq(*r, req)) {
|
||||
indices_to_remove.push(i)
|
||||
}
|
||||
}
|
||||
let old = std::mem::take(request_queue);
|
||||
for (i, (r, s)) in old.into_iter().enumerate() {
|
||||
if indices_to_remove.contains(&i) {
|
||||
r.reject(err.clone())
|
||||
} else {
|
||||
request_queue.push_back((r, s))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn deadlock_scan<'a>(queue: &'a VecDeque<(Request, OrdSet<HandleId>)>) -> Vec<&'a Request> {
|
||||
let (wait_map, mut req_map) = queue
|
||||
.iter()
|
||||
.map(|(req, set)| ((&req.lock_info.handle_id, set, req)))
|
||||
.fold(
|
||||
(
|
||||
OrdMap::<&'a HandleId, &'a OrdSet<HandleId>>::new(),
|
||||
OrdMap::<&'a HandleId, &'a Request>::new(),
|
||||
),
|
||||
|(mut wmap, mut rmap), (id, wset, req)| {
|
||||
(
|
||||
{
|
||||
wmap.insert(id, wset);
|
||||
wmap
|
||||
},
|
||||
{
|
||||
rmap.insert(id, req);
|
||||
rmap
|
||||
},
|
||||
)
|
||||
},
|
||||
);
|
||||
fn path_to<'a>(
|
||||
graph: &OrdMap<&'a HandleId, &'a OrdSet<HandleId>>,
|
||||
root: &'a HandleId,
|
||||
node: &'a HandleId,
|
||||
) -> OrdSet<&'a HandleId> {
|
||||
if node == root {
|
||||
return ordset![root];
|
||||
}
|
||||
match graph.get(node) {
|
||||
None => ordset![],
|
||||
Some(s) => s
|
||||
.iter()
|
||||
.find_map(|h| Some(path_to(graph, root, h)).filter(|s| s.is_empty()))
|
||||
.map_or(ordset![], |mut s| {
|
||||
s.insert(node);
|
||||
s
|
||||
}),
|
||||
}
|
||||
}
|
||||
for (root, wait_set) in wait_map.iter() {
|
||||
let cycle = wait_set
|
||||
.iter()
|
||||
.find_map(|start| Some(path_to(&wait_map, root, start)).filter(|s| s.is_empty()));
|
||||
match cycle {
|
||||
None => {
|
||||
continue;
|
||||
}
|
||||
Some(c) => {
|
||||
return c
|
||||
.into_iter()
|
||||
.map(|id| req_map.remove(id).unwrap())
|
||||
.collect();
|
||||
}
|
||||
}
|
||||
}
|
||||
vec![]
|
||||
}
|
||||
46
patch-db/src/locker/log_utils.rs
Normal file
46
patch-db/src/locker/log_utils.rs
Normal file
@@ -0,0 +1,46 @@
|
||||
use super::LockInfo;
|
||||
use imbl::OrdSet;
|
||||
|
||||
use crate::handle::HandleId;
|
||||
|
||||
#[cfg(feature = "tracing")]
|
||||
pub fn display_session_set(set: &OrdSet<HandleId>) -> String {
|
||||
use std::fmt::Write;
|
||||
|
||||
let mut display = String::from("{");
|
||||
for session in set.iter() {
|
||||
write!(display, "{},", session.id).unwrap();
|
||||
}
|
||||
display.replace_range(display.len() - 1.., "}");
|
||||
display
|
||||
}
|
||||
|
||||
#[cfg(feature = "tracing")]
|
||||
pub(super) fn fmt_acquired(lock_info: &LockInfo) -> String {
|
||||
format!(
|
||||
"Acquired: session {} - {} lock on {}",
|
||||
lock_info.handle_id.id, lock_info.ty, lock_info.ptr,
|
||||
)
|
||||
}
|
||||
|
||||
#[cfg(feature = "tracing")]
|
||||
pub(super) fn fmt_deferred(deferred_lock_info: &LockInfo) -> String {
|
||||
format!(
|
||||
"Deferred: session {} - {} lock on {}",
|
||||
deferred_lock_info.handle_id.id, deferred_lock_info.ty, deferred_lock_info.ptr,
|
||||
)
|
||||
}
|
||||
|
||||
pub(super) fn fmt_released(released_lock_info: &LockInfo) -> String {
|
||||
format!(
|
||||
"Released: session {} - {} lock on {}",
|
||||
released_lock_info.handle_id.id, released_lock_info.ty, released_lock_info.ptr
|
||||
)
|
||||
}
|
||||
|
||||
pub(super) fn fmt_cancelled(cancelled_lock_info: &LockInfo) -> String {
|
||||
format!(
|
||||
"Canceled: session {} - {} lock on {}",
|
||||
cancelled_lock_info.handle_id.id, cancelled_lock_info.ty, cancelled_lock_info.ptr
|
||||
)
|
||||
}
|
||||
301
patch-db/src/locker/mod.rs
Normal file
301
patch-db/src/locker/mod.rs
Normal file
@@ -0,0 +1,301 @@
|
||||
mod action_mux;
|
||||
mod bookkeeper;
|
||||
mod log_utils;
|
||||
mod natural;
|
||||
mod order_enforcer;
|
||||
mod trie;
|
||||
|
||||
use imbl::{ordmap, ordset, OrdMap, OrdSet};
|
||||
use json_ptr::JsonPointer;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use tracing::{debug, trace, warn};
|
||||
|
||||
use crate::{handle::HandleId, locker::action_mux::Action};
|
||||
|
||||
use self::{action_mux::ActionMux, bookkeeper::LockBookkeeper};
|
||||
|
||||
pub struct Locker {
|
||||
sender: mpsc::UnboundedSender<Request>,
|
||||
}
|
||||
impl Locker {
|
||||
pub fn new() -> Self {
|
||||
let (sender, receiver) = mpsc::unbounded_channel();
|
||||
tokio::spawn(async move {
|
||||
let mut action_mux = ActionMux::new(receiver);
|
||||
let mut lock_server = LockBookkeeper::new();
|
||||
|
||||
while let Some(action) = action_mux.get_action().await {
|
||||
#[cfg(feature = "tracing")]
|
||||
trace!("Locker Action: {:#?}", action);
|
||||
match action {
|
||||
Action::HandleRequest(mut req) => {
|
||||
#[cfg(feature = "tracing")]
|
||||
debug!("New lock request: {}", &req.lock_info);
|
||||
|
||||
// Pertinent Logic
|
||||
let req_cancel = req.cancel.take().expect("Request Cancellation Stolen");
|
||||
match lock_server.lease(req) {
|
||||
Ok(Some(recv)) => {
|
||||
action_mux.push_unlock_receivers(std::iter::once(recv))
|
||||
}
|
||||
Ok(None) => action_mux.push_cancellation_receiver(req_cancel),
|
||||
Err(_) => {}
|
||||
}
|
||||
}
|
||||
Action::HandleRelease(lock_info) => {
|
||||
#[cfg(feature = "tracing")]
|
||||
debug!("New lock release: {}", &lock_info);
|
||||
|
||||
let new_unlock_receivers = lock_server.ret(&lock_info);
|
||||
action_mux.push_unlock_receivers(new_unlock_receivers);
|
||||
}
|
||||
Action::HandleCancel(lock_info) => {
|
||||
#[cfg(feature = "tracing")]
|
||||
debug!("New request canceled: {}", &lock_info);
|
||||
|
||||
lock_server.cancel(&lock_info)
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
Locker { sender }
|
||||
}
|
||||
pub async fn lock(
|
||||
&self,
|
||||
handle_id: HandleId,
|
||||
ptr: JsonPointer,
|
||||
lock_type: LockType,
|
||||
) -> Result<Guard, LockError> {
|
||||
// Local Definitions
|
||||
struct CancelGuard {
|
||||
lock_info: Option<LockInfo>,
|
||||
channel: Option<oneshot::Sender<LockInfo>>,
|
||||
recv: oneshot::Receiver<Result<Guard, LockError>>,
|
||||
}
|
||||
impl Drop for CancelGuard {
|
||||
fn drop(&mut self) {
|
||||
if let (Some(lock_info), Some(channel)) =
|
||||
(self.lock_info.take(), self.channel.take())
|
||||
{
|
||||
self.recv.close();
|
||||
let _ = channel.send(lock_info);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Pertinent Logic
|
||||
let lock_info = LockInfo {
|
||||
handle_id,
|
||||
ptr,
|
||||
ty: lock_type,
|
||||
};
|
||||
let (send, recv) = oneshot::channel();
|
||||
let (cancel_send, cancel_recv) = oneshot::channel();
|
||||
let mut cancel_guard = CancelGuard {
|
||||
lock_info: Some(lock_info.clone()),
|
||||
channel: Some(cancel_send),
|
||||
recv,
|
||||
};
|
||||
self.sender
|
||||
.send(Request {
|
||||
lock_info,
|
||||
cancel: Some(cancel_recv),
|
||||
completion: send,
|
||||
})
|
||||
.unwrap();
|
||||
let res = (&mut cancel_guard.recv).await.unwrap();
|
||||
cancel_guard.channel.take();
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord)]
|
||||
struct LockInfo {
|
||||
handle_id: HandleId,
|
||||
ptr: JsonPointer,
|
||||
ty: LockType,
|
||||
}
|
||||
impl LockInfo {
|
||||
fn conflicts_with(&self, other: &LockInfo) -> bool {
|
||||
self.handle_id != other.handle_id
|
||||
&& match (self.ty, other.ty) {
|
||||
(LockType::Exist, LockType::Exist) => false,
|
||||
(LockType::Exist, LockType::Read) => false,
|
||||
(LockType::Exist, LockType::Write) => self.ptr.starts_with(&other.ptr),
|
||||
(LockType::Read, LockType::Exist) => false,
|
||||
(LockType::Read, LockType::Read) => false,
|
||||
(LockType::Read, LockType::Write) => {
|
||||
self.ptr.starts_with(&other.ptr) || other.ptr.starts_with(&self.ptr)
|
||||
}
|
||||
(LockType::Write, LockType::Exist) => other.ptr.starts_with(&self.ptr),
|
||||
(LockType::Write, LockType::Read) => {
|
||||
self.ptr.starts_with(&other.ptr) || other.ptr.starts_with(&self.ptr)
|
||||
}
|
||||
(LockType::Write, LockType::Write) => {
|
||||
self.ptr.starts_with(&other.ptr) || other.ptr.starts_with(&self.ptr)
|
||||
}
|
||||
}
|
||||
}
|
||||
fn implicitly_grants(&self, other: &LockInfo) -> bool {
|
||||
self.handle_id == other.handle_id
|
||||
&& match self.ty {
|
||||
LockType::Exist => other.ty == LockType::Exist && self.ptr.starts_with(&other.ptr),
|
||||
LockType::Read => {
|
||||
// E's in the ancestry
|
||||
other.ty == LockType::Exist && self.ptr.starts_with(&other.ptr)
|
||||
// nonexclusive locks in the subtree
|
||||
|| other.ty != LockType::Write && other.ptr.starts_with(&self.ptr)
|
||||
}
|
||||
LockType::Write => {
|
||||
// E's in the ancestry
|
||||
other.ty == LockType::Exist && self.ptr.starts_with(&other.ptr)
|
||||
// anything in the subtree
|
||||
|| other.ptr.starts_with(&self.ptr)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
impl std::fmt::Display for LockInfo {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}{}{}", self.handle_id.id, self.ty, self.ptr)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
pub enum LockType {
|
||||
Exist,
|
||||
Read,
|
||||
Write,
|
||||
}
|
||||
impl Default for LockType {
|
||||
fn default() -> Self {
|
||||
LockType::Exist
|
||||
}
|
||||
}
|
||||
impl std::fmt::Display for LockType {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let show = match self {
|
||||
LockType::Exist => "E",
|
||||
LockType::Read => "R",
|
||||
LockType::Write => "W",
|
||||
};
|
||||
write!(f, "{}", show)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct LockSet(OrdSet<LockInfo>);
|
||||
impl std::fmt::Display for LockSet {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let by_session = self
|
||||
.0
|
||||
.iter()
|
||||
.map(|i| (&i.handle_id, ordset![(&i.ptr, &i.ty)]))
|
||||
.fold(
|
||||
ordmap! {},
|
||||
|m: OrdMap<&HandleId, OrdSet<(&JsonPointer, &LockType)>>, (id, s)| {
|
||||
m.update_with(&id, s, OrdSet::union)
|
||||
},
|
||||
);
|
||||
let num_sessions = by_session.len();
|
||||
for (i, (session, set)) in by_session.into_iter().enumerate() {
|
||||
write!(f, "{}: {{ ", session.id)?;
|
||||
let num_entries = set.len();
|
||||
for (j, (ptr, ty)) in set.into_iter().enumerate() {
|
||||
write!(f, "{}{}", ty, ptr)?;
|
||||
if j == num_entries - 1 {
|
||||
write!(f, " }}")?;
|
||||
} else {
|
||||
write!(f, ", ")?;
|
||||
}
|
||||
}
|
||||
if i != num_sessions - 1 {
|
||||
write!(f, "\n")?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, thiserror::Error)]
|
||||
pub enum LockError {
|
||||
#[error("Lock Taxonomy Escalation: Session = {session:?}, First = {first}, Second = {second}")]
|
||||
LockTaxonomyEscalation {
|
||||
session: HandleId,
|
||||
first: JsonPointer,
|
||||
second: JsonPointer,
|
||||
},
|
||||
#[error("Lock Type Escalation: Session = {session:?}, Pointer = {ptr}, First = {first}, Second = {second}")]
|
||||
LockTypeEscalation {
|
||||
session: HandleId,
|
||||
ptr: JsonPointer,
|
||||
first: LockType,
|
||||
second: LockType,
|
||||
},
|
||||
#[error("Lock Type Escalation Implicit: Session = {session:?}, First = {first_ptr}:{first_type}, Second = {second_ptr}:{second_type}")]
|
||||
LockTypeEscalationImplicit {
|
||||
session: HandleId,
|
||||
first_ptr: JsonPointer,
|
||||
first_type: LockType,
|
||||
second_ptr: JsonPointer,
|
||||
second_type: LockType,
|
||||
},
|
||||
#[error(
|
||||
"Non-Canonical Lock Ordering: Session = {session:?}, First = {first}, Second = {second}"
|
||||
)]
|
||||
NonCanonicalOrdering {
|
||||
session: HandleId,
|
||||
first: JsonPointer,
|
||||
second: JsonPointer,
|
||||
},
|
||||
#[error("Deadlock Detected: Locks Held = {locks_held}, Locks Waiting = {locks_waiting}")]
|
||||
DeadlockDetected {
|
||||
locks_held: LockSet,
|
||||
locks_waiting: LockSet,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Request {
|
||||
lock_info: LockInfo,
|
||||
cancel: Option<oneshot::Receiver<LockInfo>>,
|
||||
completion: oneshot::Sender<Result<Guard, LockError>>,
|
||||
}
|
||||
impl Request {
|
||||
fn complete(self) -> oneshot::Receiver<LockInfo> {
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
if let Err(_) = self.completion.send(Ok(Guard {
|
||||
lock_info: self.lock_info,
|
||||
sender: Some(sender),
|
||||
})) {
|
||||
#[cfg(feature = "tracing")]
|
||||
warn!("Completion sent to closed channel.")
|
||||
}
|
||||
receiver
|
||||
}
|
||||
fn reject(self, err: LockError) {
|
||||
if let Err(_) = self.completion.send(Err(err)) {
|
||||
#[cfg(feature = "tracing")]
|
||||
warn!("Rejection sent to closed channel.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Guard {
|
||||
lock_info: LockInfo,
|
||||
sender: Option<oneshot::Sender<LockInfo>>,
|
||||
}
|
||||
impl Drop for Guard {
|
||||
fn drop(&mut self) {
|
||||
if let Err(_e) = self
|
||||
.sender
|
||||
.take()
|
||||
.unwrap()
|
||||
.send(std::mem::take(&mut self.lock_info))
|
||||
{
|
||||
#[cfg(feature = "tracing")]
|
||||
warn!("Failed to release lock: {:?}", _e)
|
||||
}
|
||||
}
|
||||
}
|
||||
28
patch-db/src/locker/natural.rs
Normal file
28
patch-db/src/locker/natural.rs
Normal file
@@ -0,0 +1,28 @@
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub(super) struct Natural(usize);
|
||||
impl Natural {
|
||||
pub fn one() -> Self {
|
||||
Natural(1)
|
||||
}
|
||||
pub fn of(n: usize) -> Option<Self> {
|
||||
if n == 0 {
|
||||
None
|
||||
} else {
|
||||
Some(Natural(n))
|
||||
}
|
||||
}
|
||||
pub fn inc(&mut self) {
|
||||
self.0 += 1;
|
||||
}
|
||||
pub fn dec(mut self) -> Option<Natural> {
|
||||
self.0 -= 1;
|
||||
if self.0 == 0 {
|
||||
None
|
||||
} else {
|
||||
Some(self)
|
||||
}
|
||||
}
|
||||
pub fn into_usize(self) -> usize {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
143
patch-db/src/locker/order_enforcer.rs
Normal file
143
patch-db/src/locker/order_enforcer.rs
Normal file
@@ -0,0 +1,143 @@
|
||||
use imbl::{ordmap, OrdMap};
|
||||
use json_ptr::JsonPointer;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::{handle::HandleId, LockType};
|
||||
|
||||
use super::{LockError, LockInfo};
|
||||
|
||||
pub(super) struct LockOrderEnforcer {
|
||||
locks_held: OrdMap<HandleId, OrdMap<(JsonPointer, LockType), usize>>,
|
||||
}
|
||||
impl LockOrderEnforcer {
|
||||
pub fn new() -> Self {
|
||||
LockOrderEnforcer {
|
||||
locks_held: ordmap! {},
|
||||
}
|
||||
}
|
||||
// locks must be acquired in lexicographic order for the pointer, and reverse order for type
|
||||
fn validate(&self, req: &LockInfo) -> Result<(), LockError> {
|
||||
// the following notation is used to denote an example sequence that can cause deadlocks
|
||||
//
|
||||
// Individual Lock Requests
|
||||
// 1W/A/B
|
||||
// |||> Node whose lock is being acquired: /A/B (strings prefixed by slashes, indicating descent path)
|
||||
// ||> Type of Lock: W (E/R/W)
|
||||
// |> Session Number: 1 (any natural number)
|
||||
//
|
||||
// Sequences
|
||||
// LockRequest >> LockRequest
|
||||
match self.locks_held.get(&req.handle_id) {
|
||||
None => Ok(()),
|
||||
Some(m) => {
|
||||
// quick accept
|
||||
for (ptr, ty) in m.keys() {
|
||||
let tmp = LockInfo {
|
||||
ptr: ptr.clone(),
|
||||
ty: *ty,
|
||||
handle_id: req.handle_id.clone(),
|
||||
};
|
||||
if tmp.implicitly_grants(req) {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
let err = m.keys().find_map(|(ptr, ty)| match ptr.cmp(&req.ptr) {
|
||||
std::cmp::Ordering::Less => {
|
||||
if req.ptr.starts_with(ptr)
|
||||
&& req.ty == LockType::Write
|
||||
&& *ty == LockType::Read
|
||||
{
|
||||
// 1R/A >> 2R/A >> 1W/A/A >> 2W/A/B
|
||||
Some(LockError::LockTypeEscalationImplicit {
|
||||
session: req.handle_id.clone(),
|
||||
first_ptr: ptr.clone(),
|
||||
first_type: *ty,
|
||||
second_ptr: req.ptr.clone(),
|
||||
second_type: req.ty,
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
std::cmp::Ordering::Equal => {
|
||||
if req.ty > *ty {
|
||||
// 1R/A >> 2R/A >> 1W/A >> 1W/A
|
||||
Some(LockError::LockTypeEscalation {
|
||||
session: req.handle_id.clone(),
|
||||
ptr: ptr.clone(),
|
||||
first: *ty,
|
||||
second: req.ty,
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
std::cmp::Ordering::Greater => Some(if ptr.starts_with(&req.ptr) {
|
||||
// 1W/A/A >> 2W/A/B >> 1R/A >> 2R/A
|
||||
LockError::LockTaxonomyEscalation {
|
||||
session: req.handle_id.clone(),
|
||||
first: ptr.clone(),
|
||||
second: req.ptr.clone(),
|
||||
}
|
||||
} else {
|
||||
// 1W/A >> 2W/B >> 1W/B >> 2W/A
|
||||
LockError::NonCanonicalOrdering {
|
||||
session: req.handle_id.clone(),
|
||||
first: ptr.clone(),
|
||||
second: req.ptr.clone(),
|
||||
}
|
||||
}),
|
||||
});
|
||||
err.map_or(Ok(()), Err)
|
||||
}
|
||||
}
|
||||
}
|
||||
pub(super) fn try_insert(&mut self, req: &LockInfo) -> Result<(), LockError> {
|
||||
self.validate(req)?;
|
||||
match self.locks_held.get_mut(&req.handle_id) {
|
||||
None => {
|
||||
self.locks_held.insert(
|
||||
req.handle_id.clone(),
|
||||
ordmap![(req.ptr.clone(), req.ty) => 1],
|
||||
);
|
||||
}
|
||||
Some(locks) => {
|
||||
let k = (req.ptr.clone(), req.ty);
|
||||
match locks.get_mut(&k) {
|
||||
None => {
|
||||
locks.insert(k, 1);
|
||||
}
|
||||
Some(n) => {
|
||||
*n += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
pub(super) fn remove(&mut self, req: &LockInfo) {
|
||||
match self.locks_held.remove_with_key(&req.handle_id) {
|
||||
None => {
|
||||
#[cfg(feature = "tracing")]
|
||||
warn!("Invalid removal from session manager: {:?}", req);
|
||||
}
|
||||
Some((hdl, mut locks)) => {
|
||||
let k = (req.ptr.clone(), req.ty);
|
||||
match locks.remove_with_key(&k) {
|
||||
None => {
|
||||
#[cfg(feature = "tracing")]
|
||||
warn!("Invalid removal from session manager: {:?}", req);
|
||||
}
|
||||
Some((k, n)) => {
|
||||
if n - 1 > 0 {
|
||||
locks.insert(k, n - 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
if !locks.is_empty() {
|
||||
self.locks_held.insert(hdl, locks);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
628
patch-db/src/locker/trie.rs
Normal file
628
patch-db/src/locker/trie.rs
Normal file
@@ -0,0 +1,628 @@
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use imbl::{ordset, OrdSet};
|
||||
use json_ptr::{JsonPointer, SegList};
|
||||
|
||||
use crate::{handle::HandleId, LockType};
|
||||
|
||||
use super::{natural::Natural, LockInfo};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
enum LockState {
|
||||
Free,
|
||||
Shared {
|
||||
e_lessees: BTreeMap<HandleId, Natural>,
|
||||
r_lessees: BTreeMap<HandleId, Natural>,
|
||||
},
|
||||
Exclusive {
|
||||
w_lessee: HandleId,
|
||||
w_session_count: Natural, // should never be 0
|
||||
r_session_count: usize,
|
||||
e_session_count: usize,
|
||||
},
|
||||
}
|
||||
impl LockState {
|
||||
fn erase(self, session: &HandleId) -> LockState {
|
||||
match self {
|
||||
LockState::Free => LockState::Free,
|
||||
LockState::Shared {
|
||||
mut e_lessees,
|
||||
mut r_lessees,
|
||||
} => {
|
||||
e_lessees.remove(session);
|
||||
r_lessees.remove(session);
|
||||
if e_lessees.is_empty() && r_lessees.is_empty() {
|
||||
LockState::Free
|
||||
} else {
|
||||
LockState::Shared {
|
||||
e_lessees,
|
||||
r_lessees,
|
||||
}
|
||||
}
|
||||
}
|
||||
LockState::Exclusive { ref w_lessee, .. } => {
|
||||
if w_lessee == session {
|
||||
LockState::Free
|
||||
} else {
|
||||
self
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
fn write_free(&self) -> bool {
|
||||
match self {
|
||||
LockState::Exclusive { .. } => false,
|
||||
_ => true,
|
||||
}
|
||||
}
|
||||
fn read_free(&self) -> bool {
|
||||
match self {
|
||||
LockState::Exclusive {
|
||||
r_session_count, ..
|
||||
} => *r_session_count == 0,
|
||||
LockState::Shared { r_lessees, .. } => r_lessees.is_empty(),
|
||||
_ => true,
|
||||
}
|
||||
}
|
||||
fn sessions<'a>(&'a self) -> OrdSet<&'a HandleId> {
|
||||
match self {
|
||||
LockState::Free => OrdSet::new(),
|
||||
LockState::Shared {
|
||||
e_lessees,
|
||||
r_lessees,
|
||||
} => e_lessees.keys().chain(r_lessees.keys()).collect(),
|
||||
LockState::Exclusive { w_lessee, .. } => ordset![w_lessee],
|
||||
}
|
||||
}
|
||||
#[allow(dead_code)]
|
||||
fn exist_sessions<'a>(&'a self) -> OrdSet<&'a HandleId> {
|
||||
match self {
|
||||
LockState::Free => OrdSet::new(),
|
||||
LockState::Shared { e_lessees, .. } => e_lessees.keys().collect(),
|
||||
LockState::Exclusive {
|
||||
w_lessee,
|
||||
e_session_count,
|
||||
..
|
||||
} => {
|
||||
if *e_session_count > 0 {
|
||||
ordset![w_lessee]
|
||||
} else {
|
||||
OrdSet::new()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
fn read_sessions<'a>(&'a self) -> OrdSet<&'a HandleId> {
|
||||
match self {
|
||||
LockState::Free => OrdSet::new(),
|
||||
LockState::Shared { r_lessees, .. } => r_lessees.keys().collect(),
|
||||
LockState::Exclusive {
|
||||
w_lessee,
|
||||
r_session_count,
|
||||
..
|
||||
} => {
|
||||
if *r_session_count > 0 {
|
||||
ordset![w_lessee]
|
||||
} else {
|
||||
OrdSet::new()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
fn write_session<'a>(&'a self) -> Option<&'a HandleId> {
|
||||
match self {
|
||||
LockState::Exclusive { w_lessee, .. } => Some(w_lessee),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn normalize(&mut self) {
|
||||
match &*self {
|
||||
LockState::Shared {
|
||||
e_lessees,
|
||||
r_lessees,
|
||||
} if e_lessees.is_empty() && r_lessees.is_empty() => {
|
||||
*self = LockState::Free;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
// note this is not necessarily safe in the overall trie locking model
|
||||
// this function will return true if the state changed as a result of the call
|
||||
// if it returns false it technically means that the call was invalid and did not
|
||||
// change the lock state at all
|
||||
fn try_lock(&mut self, session: HandleId, typ: &LockType) -> bool {
|
||||
match (&mut *self, typ) {
|
||||
(LockState::Free, LockType::Exist) => {
|
||||
*self = LockState::Shared {
|
||||
e_lessees: [(session, Natural::one())].into(),
|
||||
r_lessees: BTreeMap::new(),
|
||||
};
|
||||
true
|
||||
}
|
||||
(LockState::Free, LockType::Read) => {
|
||||
*self = LockState::Shared {
|
||||
e_lessees: BTreeMap::new(),
|
||||
r_lessees: [(session, Natural::one())].into(),
|
||||
};
|
||||
true
|
||||
}
|
||||
(LockState::Free, LockType::Write) => {
|
||||
*self = LockState::Exclusive {
|
||||
w_lessee: session,
|
||||
w_session_count: Natural::one(),
|
||||
r_session_count: 0,
|
||||
e_session_count: 0,
|
||||
};
|
||||
true
|
||||
}
|
||||
(LockState::Shared { e_lessees, .. }, LockType::Exist) => {
|
||||
match e_lessees.get_mut(&session) {
|
||||
None => {
|
||||
e_lessees.insert(session, Natural::one());
|
||||
}
|
||||
Some(v) => v.inc(),
|
||||
};
|
||||
true
|
||||
}
|
||||
(LockState::Shared { r_lessees, .. }, LockType::Read) => {
|
||||
match r_lessees.get_mut(&session) {
|
||||
None => {
|
||||
r_lessees.insert(session, Natural::one());
|
||||
}
|
||||
Some(v) => v.inc(),
|
||||
}
|
||||
true
|
||||
}
|
||||
(
|
||||
LockState::Shared {
|
||||
e_lessees,
|
||||
r_lessees,
|
||||
},
|
||||
LockType::Write,
|
||||
) => {
|
||||
for hdl in e_lessees.keys() {
|
||||
if hdl != &session {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
for hdl in r_lessees.keys() {
|
||||
if hdl != &session {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
*self = LockState::Exclusive {
|
||||
r_session_count: r_lessees.remove(&session).map_or(0, Natural::into_usize),
|
||||
e_session_count: e_lessees.remove(&session).map_or(0, Natural::into_usize),
|
||||
w_lessee: session,
|
||||
w_session_count: Natural::one(),
|
||||
};
|
||||
true
|
||||
}
|
||||
(
|
||||
LockState::Exclusive {
|
||||
w_lessee,
|
||||
e_session_count,
|
||||
..
|
||||
},
|
||||
LockType::Exist,
|
||||
) => {
|
||||
if w_lessee != &session {
|
||||
return false;
|
||||
}
|
||||
*e_session_count += 1;
|
||||
true
|
||||
}
|
||||
(
|
||||
LockState::Exclusive {
|
||||
w_lessee,
|
||||
r_session_count,
|
||||
..
|
||||
},
|
||||
LockType::Read,
|
||||
) => {
|
||||
if w_lessee != &session {
|
||||
return false;
|
||||
}
|
||||
*r_session_count += 1;
|
||||
true
|
||||
}
|
||||
(
|
||||
LockState::Exclusive {
|
||||
w_lessee,
|
||||
w_session_count,
|
||||
..
|
||||
},
|
||||
LockType::Write,
|
||||
) => {
|
||||
if w_lessee != &session {
|
||||
return false;
|
||||
}
|
||||
w_session_count.inc();
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// there are many ways for this function to be called in an invalid way: Notably releasing locks that you never
|
||||
// had to begin with.
|
||||
fn try_unlock(&mut self, session: &HandleId, typ: &LockType) -> bool {
|
||||
match (&mut *self, typ) {
|
||||
(LockState::Free, _) => false,
|
||||
(LockState::Shared { e_lessees, .. }, LockType::Exist) => {
|
||||
match e_lessees.remove_entry(session) {
|
||||
None => false,
|
||||
Some((k, v)) => {
|
||||
match v.dec() {
|
||||
None => {
|
||||
self.normalize();
|
||||
}
|
||||
Some(n) => {
|
||||
e_lessees.insert(k, n);
|
||||
}
|
||||
}
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
(LockState::Shared { r_lessees, .. }, LockType::Read) => {
|
||||
match r_lessees.remove_entry(session) {
|
||||
None => false,
|
||||
Some((k, v)) => {
|
||||
match v.dec() {
|
||||
None => {
|
||||
self.normalize();
|
||||
}
|
||||
Some(n) => {
|
||||
r_lessees.insert(k, n);
|
||||
}
|
||||
}
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
(LockState::Shared { .. }, LockType::Write) => false,
|
||||
(
|
||||
LockState::Exclusive {
|
||||
w_lessee,
|
||||
e_session_count,
|
||||
..
|
||||
},
|
||||
LockType::Exist,
|
||||
) => {
|
||||
if w_lessee != session || *e_session_count == 0 {
|
||||
return false;
|
||||
}
|
||||
*e_session_count -= 1;
|
||||
true
|
||||
}
|
||||
(
|
||||
LockState::Exclusive {
|
||||
w_lessee,
|
||||
r_session_count,
|
||||
..
|
||||
},
|
||||
LockType::Read,
|
||||
) => {
|
||||
if w_lessee != session || *r_session_count == 0 {
|
||||
return false;
|
||||
}
|
||||
*r_session_count -= 1;
|
||||
true
|
||||
}
|
||||
(
|
||||
LockState::Exclusive {
|
||||
w_lessee,
|
||||
w_session_count,
|
||||
r_session_count,
|
||||
e_session_count,
|
||||
},
|
||||
LockType::Write,
|
||||
) => {
|
||||
if w_lessee != session {
|
||||
return false;
|
||||
}
|
||||
match w_session_count.dec() {
|
||||
None => {
|
||||
let mut e_lessees = BTreeMap::new();
|
||||
if let Some(n) = Natural::of(*e_session_count) {
|
||||
e_lessees.insert(session.clone(), n);
|
||||
}
|
||||
let mut r_lessees = BTreeMap::new();
|
||||
if let Some(n) = Natural::of(*r_session_count) {
|
||||
r_lessees.insert(session.clone(), n);
|
||||
}
|
||||
*self = LockState::Shared {
|
||||
e_lessees,
|
||||
r_lessees,
|
||||
};
|
||||
self.normalize();
|
||||
}
|
||||
Some(n) => *w_session_count = n,
|
||||
}
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
impl Default for LockState {
|
||||
fn default() -> Self {
|
||||
LockState::Free
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
|
||||
pub(super) struct LockTrie {
|
||||
state: LockState,
|
||||
children: BTreeMap<String, LockTrie>,
|
||||
}
|
||||
impl LockTrie {
|
||||
#[allow(dead_code)]
|
||||
fn all<F: Fn(&LockState) -> bool>(&self, f: F) -> bool {
|
||||
f(&self.state) && self.children.values().all(|t| t.all(&f))
|
||||
}
|
||||
#[allow(dead_code)]
|
||||
fn any<F: Fn(&LockState) -> bool>(&self, f: F) -> bool {
|
||||
f(&self.state) || self.children.values().any(|t| t.any(&f))
|
||||
}
|
||||
#[allow(dead_code)]
|
||||
fn subtree_is_lock_free_for(&self, session: &HandleId) -> bool {
|
||||
self.all(|s| s.sessions().difference(ordset![session]).is_empty())
|
||||
}
|
||||
#[allow(dead_code)]
|
||||
fn subtree_is_exclusive_free_for(&self, session: &HandleId) -> bool {
|
||||
self.all(|s| match s.clone().erase(session) {
|
||||
LockState::Exclusive { .. } => false,
|
||||
_ => true,
|
||||
})
|
||||
}
|
||||
fn subtree_write_sessions<'a>(&'a self) -> OrdSet<&'a HandleId> {
|
||||
match &self.state {
|
||||
LockState::Exclusive { w_lessee, .. } => ordset![w_lessee],
|
||||
_ => self
|
||||
.children
|
||||
.values()
|
||||
.map(|t| t.subtree_write_sessions())
|
||||
.fold(OrdSet::new(), OrdSet::union),
|
||||
}
|
||||
}
|
||||
fn subtree_sessions<'a>(&'a self) -> OrdSet<&'a HandleId> {
|
||||
let children = self
|
||||
.children
|
||||
.values()
|
||||
.map(LockTrie::subtree_sessions)
|
||||
.fold(OrdSet::new(), OrdSet::union);
|
||||
self.state.sessions().union(children)
|
||||
}
|
||||
pub fn subtree_lock_info<'a>(&'a self) -> OrdSet<LockInfo> {
|
||||
let mut acc = self
|
||||
.children
|
||||
.iter()
|
||||
.map(|(s, t)| {
|
||||
t.subtree_lock_info()
|
||||
.into_iter()
|
||||
.map(|mut i| LockInfo {
|
||||
ty: i.ty,
|
||||
handle_id: i.handle_id,
|
||||
ptr: {
|
||||
i.ptr.push_start(s);
|
||||
i.ptr
|
||||
},
|
||||
})
|
||||
.collect()
|
||||
})
|
||||
.fold(ordset![], OrdSet::union);
|
||||
let self_writes = self.state.write_session().map(|session| LockInfo {
|
||||
handle_id: session.clone(),
|
||||
ptr: JsonPointer::default(),
|
||||
ty: LockType::Write,
|
||||
});
|
||||
let self_reads = self
|
||||
.state
|
||||
.read_sessions()
|
||||
.into_iter()
|
||||
.map(|session| LockInfo {
|
||||
handle_id: session.clone(),
|
||||
ptr: JsonPointer::default(),
|
||||
ty: LockType::Read,
|
||||
});
|
||||
let self_exists = self
|
||||
.state
|
||||
.exist_sessions()
|
||||
.into_iter()
|
||||
.map(|session| LockInfo {
|
||||
handle_id: session.clone(),
|
||||
ptr: JsonPointer::default(),
|
||||
ty: LockType::Exist,
|
||||
});
|
||||
acc.extend(self_writes.into_iter().chain(self_reads).chain(self_exists));
|
||||
acc
|
||||
}
|
||||
fn ancestors_and_trie<'a, S: AsRef<str>, V: SegList>(
|
||||
&'a self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
) -> (Vec<&'a LockState>, Option<&'a LockTrie>) {
|
||||
match ptr.uncons() {
|
||||
None => (Vec::new(), Some(self)),
|
||||
Some((first, rest)) => match self.children.get(first) {
|
||||
None => (vec![&self.state], None),
|
||||
Some(t) => {
|
||||
let (mut v, t) = t.ancestors_and_trie(&rest);
|
||||
v.push(&self.state);
|
||||
(v, t)
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
// no writes in ancestor set, no writes at node
|
||||
#[allow(dead_code)]
|
||||
fn can_acquire_exist(&self, ptr: &JsonPointer, session: &HandleId) -> bool {
|
||||
let (v, t) = self.ancestors_and_trie(ptr);
|
||||
let ancestor_write_free = v
|
||||
.into_iter()
|
||||
.cloned()
|
||||
.map(|s| s.erase(session))
|
||||
.all(|s| s.write_free());
|
||||
ancestor_write_free && t.map_or(true, |t| t.state.clone().erase(session).write_free())
|
||||
}
|
||||
// no writes in ancestor set, no writes in subtree
|
||||
#[allow(dead_code)]
|
||||
fn can_acquire_read(&self, ptr: &JsonPointer, session: &HandleId) -> bool {
|
||||
let (v, t) = self.ancestors_and_trie(ptr);
|
||||
let ancestor_write_free = v
|
||||
.into_iter()
|
||||
.cloned()
|
||||
.map(|s| s.erase(session))
|
||||
.all(|s| s.write_free());
|
||||
ancestor_write_free && t.map_or(true, |t| t.subtree_is_exclusive_free_for(session))
|
||||
}
|
||||
// no reads or writes in ancestor set, no locks in subtree
|
||||
#[allow(dead_code)]
|
||||
fn can_acquire_write(&self, ptr: &JsonPointer, session: &HandleId) -> bool {
|
||||
let (v, t) = self.ancestors_and_trie(ptr);
|
||||
let ancestor_rw_free = v
|
||||
.into_iter()
|
||||
.cloned()
|
||||
.map(|s| s.erase(session))
|
||||
.all(|s| s.write_free() && s.read_free());
|
||||
ancestor_rw_free && t.map_or(true, |t| t.subtree_is_lock_free_for(session))
|
||||
}
|
||||
// ancestors with writes and writes on the node
|
||||
fn session_blocking_exist<'a>(
|
||||
&'a self,
|
||||
ptr: &JsonPointer,
|
||||
session: &HandleId,
|
||||
) -> Option<&'a HandleId> {
|
||||
let (v, t) = self.ancestors_and_trie(ptr);
|
||||
// there can only be one write session per traversal
|
||||
let ancestor_write = v.into_iter().find_map(|s| s.write_session());
|
||||
let node_write = t.and_then(|t| t.state.write_session());
|
||||
ancestor_write
|
||||
.or(node_write)
|
||||
.and_then(|s| if s == session { None } else { Some(s) })
|
||||
}
|
||||
// ancestors with writes, subtrees with writes
|
||||
fn sessions_blocking_read<'a>(
|
||||
&'a self,
|
||||
ptr: &JsonPointer,
|
||||
session: &HandleId,
|
||||
) -> OrdSet<&'a HandleId> {
|
||||
let (v, t) = self.ancestors_and_trie(ptr);
|
||||
let ancestor_writes = v
|
||||
.into_iter()
|
||||
.map(|s| s.write_session().into_iter().collect::<OrdSet<_>>())
|
||||
.fold(OrdSet::new(), OrdSet::union);
|
||||
let relevant_write_sessions = match t {
|
||||
None => ancestor_writes,
|
||||
Some(t) => ancestor_writes.union(t.subtree_write_sessions()),
|
||||
};
|
||||
relevant_write_sessions.without(session)
|
||||
}
|
||||
// ancestors with reads or writes, subtrees with anything
|
||||
fn sessions_blocking_write<'a>(
|
||||
&'a self,
|
||||
ptr: &JsonPointer,
|
||||
session: &HandleId,
|
||||
) -> OrdSet<&'a HandleId> {
|
||||
let (v, t) = self.ancestors_and_trie(ptr);
|
||||
let ancestors = v
|
||||
.into_iter()
|
||||
.map(|s| {
|
||||
s.read_sessions()
|
||||
.union(s.write_session().into_iter().collect())
|
||||
})
|
||||
.fold(OrdSet::new(), OrdSet::union);
|
||||
let subtree = t.map_or(OrdSet::new(), |t| t.subtree_sessions());
|
||||
ancestors.union(subtree).without(session)
|
||||
}
|
||||
|
||||
fn child_mut<S: AsRef<str>, V: SegList>(&mut self, ptr: &JsonPointer<S, V>) -> &mut Self {
|
||||
match ptr.uncons() {
|
||||
None => self,
|
||||
Some((first, rest)) => {
|
||||
if !self.children.contains_key(first) {
|
||||
self.children.insert(first.to_owned(), LockTrie::default());
|
||||
}
|
||||
self.children.get_mut(first).unwrap().child_mut(&rest)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn sessions_blocking_lock<'a>(&'a self, lock_info: &LockInfo) -> OrdSet<&'a HandleId> {
|
||||
match &lock_info.ty {
|
||||
LockType::Exist => self
|
||||
.session_blocking_exist(&lock_info.ptr, &lock_info.handle_id)
|
||||
.into_iter()
|
||||
.collect(),
|
||||
LockType::Read => self.sessions_blocking_read(&lock_info.ptr, &lock_info.handle_id),
|
||||
LockType::Write => self.sessions_blocking_write(&lock_info.ptr, &lock_info.handle_id),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn try_lock<'a>(&'a mut self, lock_info: &LockInfo) -> Result<(), OrdSet<HandleId>> {
|
||||
let blocking_sessions = self.sessions_blocking_lock(lock_info);
|
||||
if !blocking_sessions.is_empty() {
|
||||
Err(blocking_sessions.into_iter().cloned().collect())
|
||||
} else {
|
||||
drop(blocking_sessions);
|
||||
let success = self
|
||||
.child_mut(&lock_info.ptr)
|
||||
.state
|
||||
.try_lock(lock_info.handle_id.clone(), &lock_info.ty);
|
||||
assert!(success);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn unlock(&mut self, lock_info: &LockInfo) {
|
||||
let t = self.child_mut(&lock_info.ptr);
|
||||
let success = t.state.try_unlock(&lock_info.handle_id, &lock_info.ty);
|
||||
assert!(success);
|
||||
self.prune();
|
||||
}
|
||||
|
||||
fn prunable(&self) -> bool {
|
||||
self.children.is_empty() && self.state == LockState::Free
|
||||
}
|
||||
|
||||
fn prune(&mut self) {
|
||||
self.children.retain(|_, t| {
|
||||
t.prune();
|
||||
!t.prunable()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod proptest {
|
||||
use super::*;
|
||||
use ::proptest::prelude::*;
|
||||
|
||||
fn lock_type_gen() -> BoxedStrategy<crate::LockType> {
|
||||
prop_oneof![
|
||||
Just(crate::LockType::Exist),
|
||||
Just(crate::LockType::Read),
|
||||
Just(crate::LockType::Write),
|
||||
]
|
||||
.boxed()
|
||||
}
|
||||
|
||||
proptest! {
|
||||
#[test]
|
||||
fn unlock_after_lock_is_identity(session in 0..10u64, typ in lock_type_gen()) {
|
||||
let mut orig = LockState::Free;
|
||||
orig.try_lock(HandleId{
|
||||
id: session,
|
||||
#[cfg(feature = "tracing")]
|
||||
trace: None
|
||||
}, &typ);
|
||||
orig.try_unlock(&HandleId{
|
||||
id: session,
|
||||
#[cfg(feature="tracing")]
|
||||
trace:None
|
||||
}, &typ);
|
||||
prop_assert_eq!(orig, LockState::Free);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user