From d5bd843e72027b2a2d389ed378e5bcf51846fd82 Mon Sep 17 00:00:00 2001 From: Aiden McClelland Date: Fri, 24 Sep 2021 14:20:45 -0600 Subject: [PATCH] address pr comments --- patch-db/src/handle.rs | 8 +- patch-db/src/locker.rs | 394 ++++++++++++++++++------------------ patch-db/src/store.rs | 6 +- patch-db/src/transaction.rs | 4 +- 4 files changed, 211 insertions(+), 201 deletions(-) diff --git a/patch-db/src/handle.rs b/patch-db/src/handle.rs index 4405bf5..c843962 100644 --- a/patch-db/src/handle.rs +++ b/patch-db/src/handle.rs @@ -13,7 +13,7 @@ use crate::{patch::DiffPatch, Error}; #[async_trait] pub trait DbHandle: Send + Sync { async fn begin<'a>(&'a mut self) -> Result, Error>; - fn id(&self) -> usize; + fn id(&self) -> u64; fn rebase(&mut self) -> Result<(), Error>; fn store(&self) -> Arc>; fn subscribe(&self) -> Receiver>; @@ -75,7 +75,7 @@ impl DbHandle for &mut Handle { sub, }) } - fn id(&self) -> usize { + fn id(&self) -> u64 { (**self).id() } fn rebase(&mut self) -> Result<(), Error> { @@ -148,7 +148,7 @@ impl DbHandle for &mut Handle { } pub struct PatchDbHandle { - pub(crate) id: usize, + pub(crate) id: u64, pub(crate) db: PatchDb, pub(crate) locks: Vec, } @@ -164,7 +164,7 @@ impl DbHandle for PatchDbHandle { updates: DiffPatch::default(), }) } - fn id(&self) -> usize { + fn id(&self) -> u64 { self.id } fn rebase(&mut self) -> Result<(), Error> { diff --git a/patch-db/src/locker.rs b/patch-db/src/locker.rs index 73b3a6d..d0ded00 100644 --- a/patch-db/src/locker.rs +++ b/patch-db/src/locker.rs @@ -3,187 +3,6 @@ use std::collections::{HashMap, HashSet}; use json_ptr::JsonPointer; use tokio::sync::{mpsc, oneshot}; -#[derive(Debug, Default)] -struct LockInfo { - ptr: JsonPointer, - segments_handled: usize, - write: bool, - handle_id: usize, -} -impl LockInfo { - fn write(&self) -> bool { - self.write && self.segments_handled == self.ptr.len() - } - fn current_seg(&self) -> Option<&str> { - if self.segments_handled == 0 { - Some("") // root - } else { - self.ptr.get_segment(self.segments_handled - 1) - } - } -} - -#[derive(Debug)] -struct Request { - lock_info: LockInfo, - completion: oneshot::Sender, -} -impl Request { - fn process(mut self, returned_locks: &mut Vec>) -> Option { - if self.lock_info.ptr.len() == self.lock_info.segments_handled { - let (sender, receiver) = oneshot::channel(); - returned_locks.push(receiver); - self.lock_info.segments_handled = 0; - let _ = self.completion.send(Guard { - lock_info: self.lock_info, - sender: Some(sender), - }); - None - } else { - self.lock_info.segments_handled += 1; - Some(self) - } - } -} - -#[derive(Debug)] -pub struct Guard { - lock_info: LockInfo, - sender: Option>, -} -impl Drop for Guard { - fn drop(&mut self) { - let _ = self - .sender - .take() - .unwrap() - .send(std::mem::take(&mut self.lock_info)); - } -} - -#[derive(Debug, Default)] -struct Node { - readers: Vec, - writers: HashSet, - reqs: Vec, -} -impl Node { - fn write_free(&self, id: usize) -> bool { - self.writers.is_empty() || (self.writers.len() == 1 && self.writers.contains(&id)) - } - fn read_free(&self, id: usize) -> bool { - self.readers.is_empty() || (self.readers.iter().filter(|a| a != &&id).count() == 0) - } - // allow a lock to skip the queue if a lock is already held by the same handle - fn can_jump_queue(&self, id: usize) -> bool { - self.writers.contains(&id) || self.readers.contains(&id) - } - fn write_available(&self, id: usize) -> bool { - self.write_free(id) - && self.read_free(id) - && (self.reqs.is_empty() || self.can_jump_queue(id)) - } - fn read_available(&self, id: usize) -> bool { - self.write_free(id) && (self.reqs.is_empty() || self.can_jump_queue(id)) - } - fn handle_request( - &mut self, - req: Request, - returned_locks: &mut Vec>, - ) -> Option { - if req.lock_info.write() && self.write_available(req.lock_info.handle_id) { - self.writers.insert(req.lock_info.handle_id); - req.process(returned_locks) - } else if !req.lock_info.write() && self.read_available(req.lock_info.handle_id) { - self.readers.push(req.lock_info.handle_id); - req.process(returned_locks) - } else { - self.reqs.push(req); - None - } - } - fn handle_release( - &mut self, - mut lock_info: LockInfo, - returned_locks: &mut Vec>, - ) -> (Option, Vec) { - if lock_info.write() { - self.writers.remove(&lock_info.handle_id); - } else if let Some(idx) = self - .readers - .iter() - .enumerate() - .find(|(_, id)| id == &&lock_info.handle_id) - .map(|(idx, _)| idx) - { - self.readers.swap_remove(idx); - } - let new_reqs = self.process_queue(returned_locks); - if lock_info.ptr.len() == lock_info.segments_handled { - (None, new_reqs) - } else { - lock_info.segments_handled += 1; - (Some(lock_info), new_reqs) - } - } - fn process_queue( - &mut self, - returned_locks: &mut Vec>, - ) -> Vec { - let mut res = Vec::new(); - for req in std::mem::take(&mut self.reqs) { - if (req.lock_info.write() && self.write_available(req.lock_info.handle_id)) - || self.read_available(req.lock_info.handle_id) - { - if let Some(req) = self.handle_request(req, returned_locks) { - res.push(req); - } - } - } - res - } -} - -#[derive(Debug, Default)] -struct Trie { - node: Node, - children: HashMap, -} -impl Trie { - fn child_mut(&mut self, name: &str) -> &mut Self { - if !self.children.contains_key(name) { - self.children.insert(name.to_owned(), Trie::default()); - } - self.children.get_mut(name).unwrap() - } - fn handle_request( - &mut self, - req: Request, - returned_locks: &mut Vec>, - ) { - if let Some(req) = self.node.handle_request(req, returned_locks) { - if let Some(seg) = req.lock_info.current_seg() { - self.child_mut(seg).handle_request(req, returned_locks) - } - } - } - fn handle_release( - &mut self, - lock_info: LockInfo, - returned_locks: &mut Vec>, - ) { - let (release, reqs) = self.node.handle_release(lock_info, returned_locks); - for req in reqs { - self.handle_request(req, returned_locks); - } - if let Some(release) = release { - if let Some(seg) = release.current_seg() { - self.child_mut(seg).handle_release(release, returned_locks) - } - } - } -} - pub struct Locker { sender: mpsc::UnboundedSender, } @@ -196,15 +15,17 @@ impl Locker { closed: false, recv: receiver, }; - let (_non_empty_send, non_empty_recv) = oneshot::channel(); - let mut returned_locks = vec![non_empty_recv]; - while let Some(action) = get_action(&mut new_requests, &mut returned_locks).await { + // futures::future::select_all will panic if the list is empty + // instead we want it to block forever by adding a channel that will never recv + let (_dummy_send, dummy_recv) = oneshot::channel(); + let mut locks_on_lease = vec![dummy_recv]; + while let Some(action) = get_action(&mut new_requests, &mut locks_on_lease).await { #[cfg(feature = "log")] log::trace!("Locker Action: {:#?}", action); match action { - Action::HandleRequest(req) => trie.handle_request(req, &mut returned_locks), + Action::HandleRequest(req) => trie.handle_request(req, &mut locks_on_lease), Action::HandleRelease(lock_info) => { - trie.handle_release(lock_info, &mut returned_locks) + trie.handle_release(lock_info, &mut locks_on_lease) } } #[cfg(feature = "log")] @@ -213,7 +34,7 @@ impl Locker { }); Locker { sender } } - pub async fn lock(&self, handle_id: usize, ptr: JsonPointer, write: bool) -> Guard { + pub async fn lock(&self, handle_id: u64, ptr: JsonPointer, write: bool) -> Guard { let (send, recv) = oneshot::channel(); self.sender .send(Request { @@ -239,13 +60,12 @@ enum Action { HandleRequest(Request), HandleRelease(LockInfo), } - async fn get_action( new_requests: &mut RequestQueue, - returned_locks: &mut Vec>, + locks_on_lease: &mut Vec>, ) -> Option { loop { - if new_requests.closed && returned_locks.is_empty() { + if new_requests.closed && locks_on_lease.is_empty() { return None; } tokio::select! { @@ -256,10 +76,200 @@ async fn get_action( new_requests.closed = true; } } - (a, idx, _) = futures::future::select_all(returned_locks.iter_mut()) => { - returned_locks.swap_remove(idx); + (a, idx, _) = futures::future::select_all(locks_on_lease.iter_mut()) => { + locks_on_lease.swap_remove(idx); return Some(Action::HandleRelease(a.unwrap())) } } } } + +#[derive(Debug, Default)] +struct Trie { + node: Node, + children: HashMap, +} +impl Trie { + fn child_mut(&mut self, name: &str) -> &mut Self { + if !self.children.contains_key(name) { + self.children.insert(name.to_owned(), Trie::default()); + } + self.children.get_mut(name).unwrap() + } + fn handle_request( + &mut self, + req: Request, + locks_on_lease: &mut Vec>, + ) { + if let Some(req) = self.node.handle_request(req, locks_on_lease) { + self.child_mut(req.lock_info.current_seg()) + .handle_request(req, locks_on_lease) + } + } + fn handle_release( + &mut self, + lock_info: LockInfo, + locks_on_lease: &mut Vec>, + ) { + let (release, reqs) = self.node.handle_release(lock_info, locks_on_lease); + for req in reqs { + self.handle_request(req, locks_on_lease); + } + if let Some(release) = release { + self.child_mut(release.current_seg()) + .handle_release(release, locks_on_lease) + } + } +} + +#[derive(Debug, Default)] +struct Node { + readers: Vec, + writers: HashSet, + reqs: Vec, +} +impl Node { + // true: If there are any writers, it is `id`. + fn write_free(&self, id: u64) -> bool { + self.writers.is_empty() || (self.writers.len() == 1 && self.writers.contains(&id)) + } + // true: If there are any readers, it is `id`. + fn read_free(&self, id: u64) -> bool { + self.readers.is_empty() || (self.readers.iter().filter(|a| a != &&id).count() == 0) + } + // allow a lock to skip the queue if a lock is already held by the same handle + fn can_jump_queue(&self, id: u64) -> bool { + self.writers.contains(&id) || self.readers.contains(&id) + } + // `id` is capable of acquiring this node for writing + fn write_available(&self, id: u64) -> bool { + self.write_free(id) + && self.read_free(id) + && (self.reqs.is_empty() || self.can_jump_queue(id)) + } + // `id` is capable of acquiring this node for reading + fn read_available(&self, id: u64) -> bool { + self.write_free(id) && (self.reqs.is_empty() || self.can_jump_queue(id)) + } + fn handle_request( + &mut self, + req: Request, + locks_on_lease: &mut Vec>, + ) -> Option { + if req.lock_info.write() && self.write_available(req.lock_info.handle_id) { + self.writers.insert(req.lock_info.handle_id); + req.process(locks_on_lease) + } else if !req.lock_info.write() && self.read_available(req.lock_info.handle_id) { + self.readers.push(req.lock_info.handle_id); + req.process(locks_on_lease) + } else { + self.reqs.push(req); + None + } + } + fn release(&mut self, mut lock_info: LockInfo) -> Option { + if lock_info.write() { + self.writers.remove(&lock_info.handle_id); + } else if let Some(idx) = self + .readers + .iter() + .enumerate() + .find(|(_, id)| id == &&lock_info.handle_id) + .map(|(idx, _)| idx) + { + self.readers.swap_remove(idx); + } + if lock_info.ptr.len() == lock_info.segments_handled { + None + } else { + lock_info.segments_handled += 1; + Some(lock_info) + } + } + fn handle_release( + &mut self, + lock_info: LockInfo, + locks_on_lease: &mut Vec>, + ) -> (Option, Vec) { + (self.release(lock_info), self.process_queue(locks_on_lease)) + } + fn process_queue( + &mut self, + locks_on_lease: &mut Vec>, + ) -> Vec { + let mut res = Vec::new(); + for req in std::mem::take(&mut self.reqs) { + if (req.lock_info.write() && self.write_available(req.lock_info.handle_id)) + || self.read_available(req.lock_info.handle_id) + { + if let Some(req) = self.handle_request(req, locks_on_lease) { + res.push(req); + } + } + } + res + } +} + +#[derive(Debug, Default)] +struct LockInfo { + ptr: JsonPointer, + segments_handled: usize, + write: bool, + handle_id: u64, +} +impl LockInfo { + fn write(&self) -> bool { + self.write && self.segments_handled == self.ptr.len() + } + fn current_seg(&self) -> &str { + if self.segments_handled == 0 { + "" // root + } else { + self.ptr + .get_segment(self.segments_handled - 1) + .unwrap_or_default() + } + } + fn reset(mut self) -> Self { + self.segments_handled = 0; + self + } +} + +#[derive(Debug)] +struct Request { + lock_info: LockInfo, + completion: oneshot::Sender, +} +impl Request { + fn process(mut self, locks_on_lease: &mut Vec>) -> Option { + if self.lock_info.ptr.len() == self.lock_info.segments_handled { + let (sender, receiver) = oneshot::channel(); + locks_on_lease.push(receiver); + let _ = self.completion.send(Guard { + lock_info: self.lock_info.reset(), + sender: Some(sender), + }); + None + } else { + self.lock_info.segments_handled += 1; + Some(self) + } + } +} + +#[derive(Debug)] +pub struct Guard { + lock_info: LockInfo, + sender: Option>, +} +impl Drop for Guard { + fn drop(&mut self) { + let _ = self + .sender + .take() + .unwrap() + .send(std::mem::take(&mut self.lock_info)); + } +} diff --git a/patch-db/src/store.rs b/patch-db/src/store.rs index 971bb2c..b96dede 100644 --- a/patch-db/src/store.rs +++ b/patch-db/src/store.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::fs::OpenOptions; use std::io::Error as IOError; use std::path::{Path, PathBuf}; -use std::sync::atomic::AtomicUsize; +use std::sync::atomic::AtomicU64; use std::sync::Arc; use fd_lock_rs::FdLock; @@ -210,7 +210,7 @@ pub struct PatchDb { pub(crate) store: Arc>, subscriber: Arc>>, pub(crate) locker: Arc, - handle_id: Arc, + handle_id: Arc, } impl PatchDb { pub async fn open>(path: P) -> Result { @@ -220,7 +220,7 @@ impl PatchDb { store: Arc::new(RwLock::new(Store::open(path).await?)), locker: Arc::new(Locker::new()), subscriber: Arc::new(subscriber), - handle_id: Arc::new(AtomicUsize::new(0)), + handle_id: Arc::new(AtomicU64::new(0)), }) } pub async fn dump(&self) -> Dump { diff --git a/patch-db/src/transaction.rs b/patch-db/src/transaction.rs index 80685af..5035c8d 100644 --- a/patch-db/src/transaction.rs +++ b/patch-db/src/transaction.rs @@ -21,7 +21,7 @@ use crate::{ }; pub struct Transaction { - pub(crate) id: usize, + pub(crate) id: u64, pub(crate) parent: Parent, pub(crate) locks: Vec, pub(crate) updates: DiffPatch, @@ -79,7 +79,7 @@ impl DbHandle for Transaction { sub, }) } - fn id(&self) -> usize { + fn id(&self) -> u64 { self.id } fn rebase(&mut self) -> Result<(), Error> {