mirror of
https://github.com/Start9Labs/patch-db.git
synced 2026-03-26 02:11:54 +00:00
fixes issue where locker actions may be reordered if they fire at close enough times
This commit is contained in:
committed by
Aiden McClelland
parent
28966308bc
commit
21fed0c0be
@@ -38,7 +38,7 @@ impl ActionMux {
|
||||
_dummy_senders: vec![unlock_dummy_send, cancel_dummy_send],
|
||||
}
|
||||
}
|
||||
pub async fn get_action(&mut self) -> Option<Action> {
|
||||
async fn get_action(&mut self) -> Option<Action> {
|
||||
loop {
|
||||
if self.inbound_request_queue.closed
|
||||
&& self.unlock_receivers.len() == 1
|
||||
@@ -68,6 +68,35 @@ impl ActionMux {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_prioritized_action_queue(&mut self) -> Vec<Action> {
|
||||
if let Some(action) = self.get_action().await {
|
||||
let mut actions = Vec::new();
|
||||
// find all serviceable lock releases
|
||||
for mut r in std::mem::take(&mut self.unlock_receivers) {
|
||||
if let Ok(lock_info) = r.try_recv() {
|
||||
actions.push(Action::HandleRelease(lock_info));
|
||||
} else {
|
||||
self.unlock_receivers.push(r);
|
||||
}
|
||||
}
|
||||
|
||||
// find all serviceable lock cancellations
|
||||
for mut r in std::mem::take(&mut self.cancellation_receivers) {
|
||||
if let Ok(lock_info) = r.try_recv() {
|
||||
actions.push(Action::HandleCancel(lock_info));
|
||||
} else {
|
||||
self.cancellation_receivers.push(r);
|
||||
}
|
||||
}
|
||||
|
||||
// finally add the action that started it all
|
||||
actions.push(action);
|
||||
actions
|
||||
} else {
|
||||
Vec::new()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn push_unlock_receivers<T: IntoIterator<Item = oneshot::Receiver<LockInfo>>>(
|
||||
&mut self,
|
||||
recv: T,
|
||||
|
||||
@@ -28,36 +28,45 @@ impl Locker {
|
||||
let mut action_mux = ActionMux::new(receiver);
|
||||
let mut lock_server = LockBookkeeper::new();
|
||||
|
||||
while let Some(action) = action_mux.get_action().await {
|
||||
#[cfg(feature = "tracing")]
|
||||
trace!("Locker Action: {:#?}", action);
|
||||
match action {
|
||||
Action::HandleRequest(mut req) => {
|
||||
#[cfg(feature = "tracing")]
|
||||
debug!("New lock request: {}", &req.lock_info);
|
||||
loop {
|
||||
let actions = action_mux.get_prioritized_action_queue().await;
|
||||
if actions.is_empty() {
|
||||
break;
|
||||
}
|
||||
for action in actions {
|
||||
#[cfg(feature = "tracing")]
|
||||
trace!("Locker Action: {:#?}", action);
|
||||
match action {
|
||||
Action::HandleRequest(mut req) => {
|
||||
#[cfg(feature = "tracing")]
|
||||
debug!("New lock request: {}", &req.lock_info);
|
||||
|
||||
// Pertinent Logic
|
||||
let req_cancel = req.cancel.take().expect("Request Cancellation Stolen");
|
||||
match lock_server.lease(req) {
|
||||
Ok(Some(recv)) => {
|
||||
action_mux.push_unlock_receivers(std::iter::once(recv))
|
||||
// Pertinent Logic
|
||||
let req_cancel =
|
||||
req.cancel.take().expect("Request Cancellation Stolen");
|
||||
match lock_server.lease(req) {
|
||||
Ok(Some(recv)) => {
|
||||
action_mux.push_unlock_receivers(std::iter::once(recv))
|
||||
}
|
||||
Ok(None) => action_mux.push_cancellation_receiver(req_cancel),
|
||||
Err(_) => {}
|
||||
}
|
||||
Ok(None) => action_mux.push_cancellation_receiver(req_cancel),
|
||||
Err(_) => {}
|
||||
}
|
||||
}
|
||||
Action::HandleRelease(lock_info) => {
|
||||
#[cfg(feature = "tracing")]
|
||||
debug!("New lock release: {}", &lock_info);
|
||||
Action::HandleRelease(lock_info) => {
|
||||
#[cfg(feature = "tracing")]
|
||||
debug!("New lock release: {}", &lock_info);
|
||||
|
||||
let new_unlock_receivers = lock_server.ret(&lock_info);
|
||||
action_mux.push_unlock_receivers(new_unlock_receivers);
|
||||
}
|
||||
Action::HandleCancel(lock_info) => {
|
||||
#[cfg(feature = "tracing")]
|
||||
debug!("New request canceled: {}", &lock_info);
|
||||
println!("Release Called {}", &lock_info);
|
||||
|
||||
lock_server.cancel(&lock_info)
|
||||
let new_unlock_receivers = lock_server.ret(&lock_info);
|
||||
action_mux.push_unlock_receivers(new_unlock_receivers);
|
||||
}
|
||||
Action::HandleCancel(lock_info) => {
|
||||
#[cfg(feature = "tracing")]
|
||||
debug!("New request canceled: {}", &lock_info);
|
||||
|
||||
lock_server.cancel(&lock_info)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ use super::{LockError, LockInfo};
|
||||
use crate::handle::HandleId;
|
||||
use crate::LockType;
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub(super) struct LockOrderEnforcer {
|
||||
locks_held: OrdMap<HandleId, OrdMap<(JsonPointer, LockType), usize>>,
|
||||
}
|
||||
|
||||
@@ -289,6 +289,26 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
proptest! {
|
||||
#[test]
|
||||
fn enforcer_lock_inverse_identity(lock_order in proptest::collection::vec(arb_lock_info(1,3), 1..30)) {
|
||||
use crate::locker::order_enforcer::LockOrderEnforcer;
|
||||
use rand::seq::SliceRandom;
|
||||
let mut enforcer = LockOrderEnforcer::new();
|
||||
for i in &lock_order {
|
||||
enforcer.try_insert(i);
|
||||
}
|
||||
let mut release_order = lock_order.clone();
|
||||
let slice: &mut [LockInfo] = &mut release_order[..];
|
||||
slice.shuffle(&mut rand::thread_rng());
|
||||
prop_assert!(enforcer != LockOrderEnforcer::new());
|
||||
for i in &release_order {
|
||||
enforcer.remove(i);
|
||||
}
|
||||
prop_assert_eq!(enforcer, LockOrderEnforcer::new());
|
||||
}
|
||||
}
|
||||
|
||||
proptest! {
|
||||
#[test]
|
||||
fn existence_ancestors_dont_block_descendent_writes(s0 in arb_handle_id(10), s1 in arb_handle_id(10), mut ptr0 in arb_json_ptr(3), ptr1 in arb_json_ptr(3)) {
|
||||
|
||||
@@ -8,7 +8,7 @@ use serde_json::Value;
|
||||
use tokio::fs;
|
||||
use tokio::runtime::Builder;
|
||||
|
||||
use crate::{self as patch_db, DbHandle};
|
||||
use crate::{self as patch_db, DbHandle, LockType};
|
||||
|
||||
async fn init_db(db_name: String) -> PatchDb {
|
||||
cleanup_db(&db_name).await;
|
||||
@@ -101,3 +101,16 @@ pub struct Child {
|
||||
|
||||
#[derive(Debug, serde::Deserialize, serde::Serialize, HasModel)]
|
||||
pub struct NewType(Option<Box<Sample>>);
|
||||
|
||||
#[tokio::test]
|
||||
async fn locks_dropped_from_enforcer_on_tx_save() {
|
||||
let db = init_db("test.db".to_string()).await;
|
||||
let mut handle = db.handle();
|
||||
let mut tx = handle.begin().await.unwrap();
|
||||
let ptr_a: JsonPointer = "/a".parse().unwrap();
|
||||
let ptr_b: JsonPointer = "/b".parse().unwrap();
|
||||
tx.lock(ptr_b, LockType::Write).await.unwrap();
|
||||
tx.save().await.unwrap();
|
||||
handle.lock(ptr_a, LockType::Write).await.unwrap();
|
||||
cleanup_db("test.db").await;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user