Address PR review comments

This commit is contained in:
Aiden McClelland
2021-09-24 14:20:45 -06:00
committed by Aiden McClelland
parent 162438ab6b
commit 4e4d6fe31a
4 changed files with 211 additions and 201 deletions

View File

@@ -13,7 +13,7 @@ use crate::{patch::DiffPatch, Error};
#[async_trait]
pub trait DbHandle: Send + Sync {
async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error>;
fn id(&self) -> usize;
fn id(&self) -> u64;
fn rebase(&mut self) -> Result<(), Error>;
fn store(&self) -> Arc<RwLock<Store>>;
fn subscribe(&self) -> Receiver<Arc<Revision>>;
@@ -75,7 +75,7 @@ impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
sub,
})
}
fn id(&self) -> usize {
fn id(&self) -> u64 {
(**self).id()
}
fn rebase(&mut self) -> Result<(), Error> {
@@ -148,7 +148,7 @@ impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
}
pub struct PatchDbHandle {
pub(crate) id: usize,
pub(crate) id: u64,
pub(crate) db: PatchDb,
pub(crate) locks: Vec<Guard>,
}
@@ -164,7 +164,7 @@ impl DbHandle for PatchDbHandle {
updates: DiffPatch::default(),
})
}
fn id(&self) -> usize {
fn id(&self) -> u64 {
self.id
}
fn rebase(&mut self) -> Result<(), Error> {

View File

@@ -3,187 +3,6 @@ use std::collections::{HashMap, HashSet};
use json_ptr::JsonPointer;
use tokio::sync::{mpsc, oneshot};
/// Tracks one lock's position while it walks the lock trie: the JSON pointer
/// being locked, how many of its segments have been claimed so far, whether
/// the final node is taken for writing, and the owning handle's id.
#[derive(Debug, Default)]
struct LockInfo {
    ptr: JsonPointer,
    segments_handled: usize,
    write: bool,
    handle_id: usize,
}
impl LockInfo {
    /// A lock acts as a write lock only at its leaf node — i.e. once every
    /// segment of the pointer has been handled. At interior nodes it behaves
    /// like a read lock.
    fn write(&self) -> bool {
        self.segments_handled == self.ptr.len() && self.write
    }
    /// The path segment for the trie node currently being processed, or
    /// `None` when the pointer has no segment at that depth.
    fn current_seg(&self) -> Option<&str> {
        match self.segments_handled {
            0 => Some(""), // root
            n => self.ptr.get_segment(n - 1),
        }
    }
}
/// A pending lock acquisition: the walk state plus the channel used to hand
/// the finished [`Guard`] back to the waiting task.
#[derive(Debug)]
struct Request {
    lock_info: LockInfo,
    completion: oneshot::Sender<Guard>,
}
impl Request {
    /// Advance this request by one trie level.
    ///
    /// Returns `Some(self)` while path segments remain (the caller must keep
    /// descending). Once every segment has been claimed the lock is granted:
    /// a release channel is parked in `returned_locks`, the `Guard` is sent
    /// to the requester, and `None` is returned.
    fn process(mut self, returned_locks: &mut Vec<oneshot::Receiver<LockInfo>>) -> Option<Self> {
        if self.lock_info.segments_handled != self.lock_info.ptr.len() {
            // Not at the leaf yet — record the step and continue the walk.
            self.lock_info.segments_handled += 1;
            return Some(self);
        }
        // Fully acquired: keep the receiver so the locker learns when the
        // guard is dropped, then hand the guard out.
        let (sender, receiver) = oneshot::channel();
        returned_locks.push(receiver);
        self.lock_info.segments_handled = 0;
        // The requester may have stopped waiting; a closed channel is fine.
        let _ = self.completion.send(Guard {
            lock_info: self.lock_info,
            sender: Some(sender),
        });
        None
    }
}
/// RAII handle for a held lock. Dropping it sends the `LockInfo` back to the
/// locker task, which releases the lock along the trie path.
#[derive(Debug)]
pub struct Guard {
    lock_info: LockInfo,
    sender: Option<oneshot::Sender<LockInfo>>,
}
impl Drop for Guard {
    fn drop(&mut self) {
        // `sender` is always `Some` after construction, but never unwrap()
        // inside Drop: a panic here while unwinding aborts the process.
        // `take()` also guards against any hypothetical double-release.
        if let Some(sender) = self.sender.take() {
            // The locker task may already be gone; ignore a closed channel.
            let _ = sender.send(std::mem::take(&mut self.lock_info));
        }
    }
}
/// Per-trie-node lock state: which handles currently hold this node for
/// reading or writing, plus the FIFO queue of requests waiting on it.
#[derive(Debug, Default)]
struct Node {
    readers: Vec<usize>,
    writers: HashSet<usize>,
    reqs: Vec<Request>,
}
impl Node {
    // true: if there are any writers, it is `id`
    fn write_free(&self, id: usize) -> bool {
        self.writers.is_empty() || (self.writers.len() == 1 && self.writers.contains(&id))
    }
    // true: if there are any readers, they are all `id`
    fn read_free(&self, id: usize) -> bool {
        // `all` is vacuously true for an empty reader list, so the explicit
        // is_empty() check of the old version is unnecessary.
        self.readers.iter().all(|a| a == &id)
    }
    // allow a lock to skip the queue if a lock is already held by the same handle
    fn can_jump_queue(&self, id: usize) -> bool {
        self.writers.contains(&id) || self.readers.contains(&id)
    }
    // `id` is capable of acquiring this node for writing
    fn write_available(&self, id: usize) -> bool {
        self.write_free(id)
            && self.read_free(id)
            && (self.reqs.is_empty() || self.can_jump_queue(id))
    }
    // `id` is capable of acquiring this node for reading
    fn read_available(&self, id: usize) -> bool {
        self.write_free(id) && (self.reqs.is_empty() || self.can_jump_queue(id))
    }
    /// Try to satisfy `req` at this node: grant it (recording the
    /// reader/writer) and hand it back for descent via `Request::process`,
    /// or queue it here when the node is not available.
    fn handle_request(
        &mut self,
        req: Request,
        returned_locks: &mut Vec<oneshot::Receiver<LockInfo>>,
    ) -> Option<Request> {
        if req.lock_info.write() && self.write_available(req.lock_info.handle_id) {
            self.writers.insert(req.lock_info.handle_id);
            req.process(returned_locks)
        } else if !req.lock_info.write() && self.read_available(req.lock_info.handle_id) {
            self.readers.push(req.lock_info.handle_id);
            req.process(returned_locks)
        } else {
            self.reqs.push(req);
            None
        }
    }
    /// Drop `lock_info`'s hold on this node, wake whatever queued requests
    /// can now proceed, and return the release to forward to the child (if
    /// any segments remain) plus the newly granted requests that must descend.
    fn handle_release(
        &mut self,
        mut lock_info: LockInfo,
        returned_locks: &mut Vec<oneshot::Receiver<LockInfo>>,
    ) -> (Option<LockInfo>, Vec<Request>) {
        if lock_info.write() {
            self.writers.remove(&lock_info.handle_id);
        } else if let Some(idx) = self
            .readers
            .iter()
            .position(|id| id == &lock_info.handle_id)
        {
            self.readers.swap_remove(idx);
        }
        let new_reqs = self.process_queue(returned_locks);
        if lock_info.ptr.len() == lock_info.segments_handled {
            (None, new_reqs)
        } else {
            lock_info.segments_handled += 1;
            (Some(lock_info), new_reqs)
        }
    }
    /// Re-run the waiting queue after state changed. Requests granted at this
    /// node are returned so the caller can walk them into child nodes.
    fn process_queue(
        &mut self,
        returned_locks: &mut Vec<oneshot::Receiver<LockInfo>>,
    ) -> Vec<Request> {
        let mut res = Vec::new();
        // BUG FIX: the previous version only forwarded requests that passed
        // an availability pre-check and silently DROPPED the rest, closing
        // their completion channels and stranding the waiting tasks.
        // `handle_request` already re-queues anything it cannot grant, so
        // simply feed every queued request back through it.
        for req in std::mem::take(&mut self.reqs) {
            if let Some(req) = self.handle_request(req, returned_locks) {
                res.push(req);
            }
        }
        res
    }
}
/// Lock trie keyed by JSON-pointer segments; each node carries its own
/// reader/writer bookkeeping.
#[derive(Debug, Default)]
struct Trie {
    node: Node,
    children: HashMap<String, Trie>,
}
impl Trie {
    /// Fetch (creating on demand) the child trie for path segment `name`.
    fn child_mut(&mut self, name: &str) -> &mut Self {
        // entry() replaces the contains_key / insert / get_mut triple with a
        // single hash lookup (and removes the unwrap).
        self.children.entry(name.to_owned()).or_default()
    }
    /// Walk `req` down the trie, acquiring each node in turn; the walk stops
    /// where the request either gets queued or is fully granted.
    fn handle_request(
        &mut self,
        req: Request,
        returned_locks: &mut Vec<oneshot::Receiver<LockInfo>>,
    ) {
        if let Some(req) = self.node.handle_request(req, returned_locks) {
            if let Some(seg) = req.lock_info.current_seg() {
                self.child_mut(seg).handle_request(req, returned_locks)
            }
        }
    }
    /// Release a lock along its path, re-dispatching any requests that the
    /// release unblocked at each level.
    fn handle_release(
        &mut self,
        lock_info: LockInfo,
        returned_locks: &mut Vec<oneshot::Receiver<LockInfo>>,
    ) {
        let (release, reqs) = self.node.handle_release(lock_info, returned_locks);
        for req in reqs {
            self.handle_request(req, returned_locks);
        }
        if let Some(release) = release {
            if let Some(seg) = release.current_seg() {
                self.child_mut(seg).handle_release(release, returned_locks)
            }
        }
    }
}
pub struct Locker {
sender: mpsc::UnboundedSender<Request>,
}
@@ -196,15 +15,17 @@ impl Locker {
closed: false,
recv: receiver,
};
let (_non_empty_send, non_empty_recv) = oneshot::channel();
let mut returned_locks = vec![non_empty_recv];
while let Some(action) = get_action(&mut new_requests, &mut returned_locks).await {
// futures::future::select_all will panic if the list is empty
// instead we want it to block forever by adding a channel that will never recv
let (_dummy_send, dummy_recv) = oneshot::channel();
let mut locks_on_lease = vec![dummy_recv];
while let Some(action) = get_action(&mut new_requests, &mut locks_on_lease).await {
#[cfg(feature = "log")]
log::trace!("Locker Action: {:#?}", action);
match action {
Action::HandleRequest(req) => trie.handle_request(req, &mut returned_locks),
Action::HandleRequest(req) => trie.handle_request(req, &mut locks_on_lease),
Action::HandleRelease(lock_info) => {
trie.handle_release(lock_info, &mut returned_locks)
trie.handle_release(lock_info, &mut locks_on_lease)
}
}
#[cfg(feature = "log")]
@@ -213,7 +34,7 @@ impl Locker {
});
Locker { sender }
}
pub async fn lock(&self, handle_id: usize, ptr: JsonPointer, write: bool) -> Guard {
pub async fn lock(&self, handle_id: u64, ptr: JsonPointer, write: bool) -> Guard {
let (send, recv) = oneshot::channel();
self.sender
.send(Request {
@@ -239,13 +60,12 @@ enum Action {
HandleRequest(Request),
HandleRelease(LockInfo),
}
async fn get_action(
new_requests: &mut RequestQueue,
returned_locks: &mut Vec<oneshot::Receiver<LockInfo>>,
locks_on_lease: &mut Vec<oneshot::Receiver<LockInfo>>,
) -> Option<Action> {
loop {
if new_requests.closed && returned_locks.is_empty() {
if new_requests.closed && locks_on_lease.is_empty() {
return None;
}
tokio::select! {
@@ -256,10 +76,200 @@ async fn get_action(
new_requests.closed = true;
}
}
(a, idx, _) = futures::future::select_all(returned_locks.iter_mut()) => {
returned_locks.swap_remove(idx);
(a, idx, _) = futures::future::select_all(locks_on_lease.iter_mut()) => {
locks_on_lease.swap_remove(idx);
return Some(Action::HandleRelease(a.unwrap()))
}
}
}
}
/// Lock trie keyed by JSON-pointer segments; each node carries its own
/// reader/writer bookkeeping.
#[derive(Debug, Default)]
struct Trie {
    node: Node,
    children: HashMap<String, Trie>,
}
impl Trie {
    /// Fetch (creating on demand) the child trie for path segment `name`.
    fn child_mut(&mut self, name: &str) -> &mut Self {
        // entry() replaces the contains_key / insert / get_mut triple with a
        // single hash lookup (and removes the unwrap).
        self.children.entry(name.to_owned()).or_default()
    }
    /// Walk `req` down the trie, acquiring each node in turn; the walk stops
    /// where the request either gets queued or is fully granted.
    fn handle_request(
        &mut self,
        req: Request,
        locks_on_lease: &mut Vec<oneshot::Receiver<LockInfo>>,
    ) {
        if let Some(req) = self.node.handle_request(req, locks_on_lease) {
            self.child_mut(req.lock_info.current_seg())
                .handle_request(req, locks_on_lease)
        }
    }
    /// Release a lock along its path, re-dispatching any requests that the
    /// release unblocked at each level.
    fn handle_release(
        &mut self,
        lock_info: LockInfo,
        locks_on_lease: &mut Vec<oneshot::Receiver<LockInfo>>,
    ) {
        let (release, reqs) = self.node.handle_release(lock_info, locks_on_lease);
        for req in reqs {
            self.handle_request(req, locks_on_lease);
        }
        if let Some(release) = release {
            self.child_mut(release.current_seg())
                .handle_release(release, locks_on_lease)
        }
    }
}
/// Per-trie-node lock state: which handles currently hold this node for
/// reading or writing, plus the FIFO queue of requests waiting on it.
#[derive(Debug, Default)]
struct Node {
    readers: Vec<u64>,
    writers: HashSet<u64>,
    reqs: Vec<Request>,
}
impl Node {
    // true: If there are any writers, it is `id`.
    fn write_free(&self, id: u64) -> bool {
        self.writers.is_empty() || (self.writers.len() == 1 && self.writers.contains(&id))
    }
    // true: If there are any readers, they are all `id`.
    fn read_free(&self, id: u64) -> bool {
        // `all` is vacuously true for an empty reader list, so the explicit
        // is_empty() check of the old version is unnecessary.
        self.readers.iter().all(|a| a == &id)
    }
    // allow a lock to skip the queue if a lock is already held by the same handle
    fn can_jump_queue(&self, id: u64) -> bool {
        self.writers.contains(&id) || self.readers.contains(&id)
    }
    // `id` is capable of acquiring this node for writing
    fn write_available(&self, id: u64) -> bool {
        self.write_free(id)
            && self.read_free(id)
            && (self.reqs.is_empty() || self.can_jump_queue(id))
    }
    // `id` is capable of acquiring this node for reading
    fn read_available(&self, id: u64) -> bool {
        self.write_free(id) && (self.reqs.is_empty() || self.can_jump_queue(id))
    }
    /// Try to satisfy `req` at this node: grant it (recording the
    /// reader/writer) and hand it back for descent via `Request::process`,
    /// or queue it here when the node is not available.
    fn handle_request(
        &mut self,
        req: Request,
        locks_on_lease: &mut Vec<oneshot::Receiver<LockInfo>>,
    ) -> Option<Request> {
        if req.lock_info.write() && self.write_available(req.lock_info.handle_id) {
            self.writers.insert(req.lock_info.handle_id);
            req.process(locks_on_lease)
        } else if !req.lock_info.write() && self.read_available(req.lock_info.handle_id) {
            self.readers.push(req.lock_info.handle_id);
            req.process(locks_on_lease)
        } else {
            self.reqs.push(req);
            None
        }
    }
    /// Drop `lock_info`'s hold on this node; returns the release to forward
    /// to the child trie, or `None` once the final segment is released.
    fn release(&mut self, mut lock_info: LockInfo) -> Option<LockInfo> {
        if lock_info.write() {
            self.writers.remove(&lock_info.handle_id);
        } else if let Some(idx) = self
            .readers
            .iter()
            .position(|id| id == &lock_info.handle_id)
        {
            self.readers.swap_remove(idx);
        }
        if lock_info.ptr.len() == lock_info.segments_handled {
            None
        } else {
            lock_info.segments_handled += 1;
            Some(lock_info)
        }
    }
    /// Release plus queue re-dispatch. Note the evaluation order: the hold is
    /// released before the queue is re-run, so waiters can take its place.
    fn handle_release(
        &mut self,
        lock_info: LockInfo,
        locks_on_lease: &mut Vec<oneshot::Receiver<LockInfo>>,
    ) -> (Option<LockInfo>, Vec<Request>) {
        (self.release(lock_info), self.process_queue(locks_on_lease))
    }
    /// Re-run the waiting queue after state changed. Requests granted at this
    /// node are returned so the caller can walk them into child nodes.
    fn process_queue(
        &mut self,
        locks_on_lease: &mut Vec<oneshot::Receiver<LockInfo>>,
    ) -> Vec<Request> {
        let mut res = Vec::new();
        // BUG FIX: the previous version only forwarded requests that passed
        // an availability pre-check and silently DROPPED the rest, closing
        // their completion channels and stranding the waiting tasks.
        // `handle_request` already re-queues anything it cannot grant, so
        // simply feed every queued request back through it.
        for req in std::mem::take(&mut self.reqs) {
            if let Some(req) = self.handle_request(req, locks_on_lease) {
                res.push(req);
            }
        }
        res
    }
}
/// Tracks one lock's position while it walks the lock trie: the JSON pointer
/// being locked, how many of its segments have been claimed so far, whether
/// the final node is taken for writing, and the owning handle's id.
#[derive(Debug, Default)]
struct LockInfo {
    ptr: JsonPointer,
    segments_handled: usize,
    write: bool,
    handle_id: u64,
}
impl LockInfo {
    /// A lock acts as a write lock only at its leaf node — i.e. once every
    /// segment of the pointer has been handled. At interior nodes it behaves
    /// like a read lock.
    fn write(&self) -> bool {
        self.segments_handled == self.ptr.len() && self.write
    }
    /// The path segment for the trie node currently being processed; the
    /// empty string stands in for the root (and for an out-of-range depth).
    fn current_seg(&self) -> &str {
        match self.segments_handled {
            0 => "", // root
            n => self.ptr.get_segment(n - 1).unwrap_or(""),
        }
    }
    /// Rewind the walk to the root so the info can ride inside a `Guard` and
    /// be replayed on release.
    fn reset(mut self) -> Self {
        self.segments_handled = 0;
        self
    }
}
/// A pending lock acquisition: the walk state plus the channel used to hand
/// the finished [`Guard`] back to the waiting task.
#[derive(Debug)]
struct Request {
    lock_info: LockInfo,
    completion: oneshot::Sender<Guard>,
}
impl Request {
    /// Advance this request by one trie level.
    ///
    /// Returns `Some(self)` while path segments remain (the caller must keep
    /// descending). Once every segment has been claimed the lock is granted:
    /// a release channel is parked in `locks_on_lease`, the `Guard` is sent
    /// to the requester, and `None` is returned.
    fn process(mut self, locks_on_lease: &mut Vec<oneshot::Receiver<LockInfo>>) -> Option<Self> {
        if self.lock_info.segments_handled != self.lock_info.ptr.len() {
            // Not at the leaf yet — record the step and continue the walk.
            self.lock_info.segments_handled += 1;
            return Some(self);
        }
        // Fully acquired: keep the receiver so the locker learns when the
        // guard is dropped, then hand the guard out.
        let (sender, receiver) = oneshot::channel();
        locks_on_lease.push(receiver);
        // The requester may have stopped waiting; a closed channel is fine.
        let _ = self.completion.send(Guard {
            lock_info: self.lock_info.reset(),
            sender: Some(sender),
        });
        None
    }
}
/// RAII handle for a held lock. Dropping it sends the `LockInfo` back to the
/// locker task, which releases the lock along the trie path.
#[derive(Debug)]
pub struct Guard {
    lock_info: LockInfo,
    sender: Option<oneshot::Sender<LockInfo>>,
}
impl Drop for Guard {
    fn drop(&mut self) {
        // `sender` is always `Some` after construction, but never unwrap()
        // inside Drop: a panic here while unwinding aborts the process.
        // `take()` also guards against any hypothetical double-release.
        if let Some(sender) = self.sender.take() {
            // The locker task may already be gone; ignore a closed channel.
            let _ = sender.send(std::mem::take(&mut self.lock_info));
        }
    }
}

View File

@@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::fs::OpenOptions;
use std::io::Error as IOError;
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use fd_lock_rs::FdLock;
@@ -210,7 +210,7 @@ pub struct PatchDb {
pub(crate) store: Arc<RwLock<Store>>,
subscriber: Arc<Sender<Arc<Revision>>>,
pub(crate) locker: Arc<Locker>,
handle_id: Arc<AtomicUsize>,
handle_id: Arc<AtomicU64>,
}
impl PatchDb {
pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
@@ -220,7 +220,7 @@ impl PatchDb {
store: Arc::new(RwLock::new(Store::open(path).await?)),
locker: Arc::new(Locker::new()),
subscriber: Arc::new(subscriber),
handle_id: Arc::new(AtomicUsize::new(0)),
handle_id: Arc::new(AtomicU64::new(0)),
})
}
pub async fn dump(&self) -> Dump {

View File

@@ -21,7 +21,7 @@ use crate::{
};
pub struct Transaction<Parent: DbHandle> {
pub(crate) id: usize,
pub(crate) id: u64,
pub(crate) parent: Parent,
pub(crate) locks: Vec<Guard>,
pub(crate) updates: DiffPatch,
@@ -79,7 +79,7 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
sub,
})
}
fn id(&self) -> usize {
fn id(&self) -> u64 {
self.id
}
fn rebase(&mut self) -> Result<(), Error> {