mirror of
https://github.com/Start9Labs/patch-db.git
synced 2026-03-26 02:11:54 +00:00
address pr comments
This commit is contained in:
@@ -13,7 +13,7 @@ use crate::{patch::DiffPatch, Error};
|
||||
#[async_trait]
|
||||
pub trait DbHandle: Send + Sync {
|
||||
async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error>;
|
||||
fn id(&self) -> usize;
|
||||
fn id(&self) -> u64;
|
||||
fn rebase(&mut self) -> Result<(), Error>;
|
||||
fn store(&self) -> Arc<RwLock<Store>>;
|
||||
fn subscribe(&self) -> Receiver<Arc<Revision>>;
|
||||
@@ -75,7 +75,7 @@ impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
|
||||
sub,
|
||||
})
|
||||
}
|
||||
fn id(&self) -> usize {
|
||||
fn id(&self) -> u64 {
|
||||
(**self).id()
|
||||
}
|
||||
fn rebase(&mut self) -> Result<(), Error> {
|
||||
@@ -148,7 +148,7 @@ impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
|
||||
}
|
||||
|
||||
pub struct PatchDbHandle {
|
||||
pub(crate) id: usize,
|
||||
pub(crate) id: u64,
|
||||
pub(crate) db: PatchDb,
|
||||
pub(crate) locks: Vec<Guard>,
|
||||
}
|
||||
@@ -164,7 +164,7 @@ impl DbHandle for PatchDbHandle {
|
||||
updates: DiffPatch::default(),
|
||||
})
|
||||
}
|
||||
fn id(&self) -> usize {
|
||||
fn id(&self) -> u64 {
|
||||
self.id
|
||||
}
|
||||
fn rebase(&mut self) -> Result<(), Error> {
|
||||
|
||||
@@ -3,187 +3,6 @@ use std::collections::{HashMap, HashSet};
|
||||
use json_ptr::JsonPointer;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct LockInfo {
|
||||
ptr: JsonPointer,
|
||||
segments_handled: usize,
|
||||
write: bool,
|
||||
handle_id: usize,
|
||||
}
|
||||
impl LockInfo {
|
||||
fn write(&self) -> bool {
|
||||
self.write && self.segments_handled == self.ptr.len()
|
||||
}
|
||||
fn current_seg(&self) -> Option<&str> {
|
||||
if self.segments_handled == 0 {
|
||||
Some("") // root
|
||||
} else {
|
||||
self.ptr.get_segment(self.segments_handled - 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Request {
|
||||
lock_info: LockInfo,
|
||||
completion: oneshot::Sender<Guard>,
|
||||
}
|
||||
impl Request {
|
||||
fn process(mut self, returned_locks: &mut Vec<oneshot::Receiver<LockInfo>>) -> Option<Self> {
|
||||
if self.lock_info.ptr.len() == self.lock_info.segments_handled {
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
returned_locks.push(receiver);
|
||||
self.lock_info.segments_handled = 0;
|
||||
let _ = self.completion.send(Guard {
|
||||
lock_info: self.lock_info,
|
||||
sender: Some(sender),
|
||||
});
|
||||
None
|
||||
} else {
|
||||
self.lock_info.segments_handled += 1;
|
||||
Some(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Guard {
|
||||
lock_info: LockInfo,
|
||||
sender: Option<oneshot::Sender<LockInfo>>,
|
||||
}
|
||||
impl Drop for Guard {
|
||||
fn drop(&mut self) {
|
||||
let _ = self
|
||||
.sender
|
||||
.take()
|
||||
.unwrap()
|
||||
.send(std::mem::take(&mut self.lock_info));
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct Node {
|
||||
readers: Vec<usize>,
|
||||
writers: HashSet<usize>,
|
||||
reqs: Vec<Request>,
|
||||
}
|
||||
impl Node {
|
||||
fn write_free(&self, id: usize) -> bool {
|
||||
self.writers.is_empty() || (self.writers.len() == 1 && self.writers.contains(&id))
|
||||
}
|
||||
fn read_free(&self, id: usize) -> bool {
|
||||
self.readers.is_empty() || (self.readers.iter().filter(|a| a != &&id).count() == 0)
|
||||
}
|
||||
// allow a lock to skip the queue if a lock is already held by the same handle
|
||||
fn can_jump_queue(&self, id: usize) -> bool {
|
||||
self.writers.contains(&id) || self.readers.contains(&id)
|
||||
}
|
||||
fn write_available(&self, id: usize) -> bool {
|
||||
self.write_free(id)
|
||||
&& self.read_free(id)
|
||||
&& (self.reqs.is_empty() || self.can_jump_queue(id))
|
||||
}
|
||||
fn read_available(&self, id: usize) -> bool {
|
||||
self.write_free(id) && (self.reqs.is_empty() || self.can_jump_queue(id))
|
||||
}
|
||||
fn handle_request(
|
||||
&mut self,
|
||||
req: Request,
|
||||
returned_locks: &mut Vec<oneshot::Receiver<LockInfo>>,
|
||||
) -> Option<Request> {
|
||||
if req.lock_info.write() && self.write_available(req.lock_info.handle_id) {
|
||||
self.writers.insert(req.lock_info.handle_id);
|
||||
req.process(returned_locks)
|
||||
} else if !req.lock_info.write() && self.read_available(req.lock_info.handle_id) {
|
||||
self.readers.push(req.lock_info.handle_id);
|
||||
req.process(returned_locks)
|
||||
} else {
|
||||
self.reqs.push(req);
|
||||
None
|
||||
}
|
||||
}
|
||||
fn handle_release(
|
||||
&mut self,
|
||||
mut lock_info: LockInfo,
|
||||
returned_locks: &mut Vec<oneshot::Receiver<LockInfo>>,
|
||||
) -> (Option<LockInfo>, Vec<Request>) {
|
||||
if lock_info.write() {
|
||||
self.writers.remove(&lock_info.handle_id);
|
||||
} else if let Some(idx) = self
|
||||
.readers
|
||||
.iter()
|
||||
.enumerate()
|
||||
.find(|(_, id)| id == &&lock_info.handle_id)
|
||||
.map(|(idx, _)| idx)
|
||||
{
|
||||
self.readers.swap_remove(idx);
|
||||
}
|
||||
let new_reqs = self.process_queue(returned_locks);
|
||||
if lock_info.ptr.len() == lock_info.segments_handled {
|
||||
(None, new_reqs)
|
||||
} else {
|
||||
lock_info.segments_handled += 1;
|
||||
(Some(lock_info), new_reqs)
|
||||
}
|
||||
}
|
||||
fn process_queue(
|
||||
&mut self,
|
||||
returned_locks: &mut Vec<oneshot::Receiver<LockInfo>>,
|
||||
) -> Vec<Request> {
|
||||
let mut res = Vec::new();
|
||||
for req in std::mem::take(&mut self.reqs) {
|
||||
if (req.lock_info.write() && self.write_available(req.lock_info.handle_id))
|
||||
|| self.read_available(req.lock_info.handle_id)
|
||||
{
|
||||
if let Some(req) = self.handle_request(req, returned_locks) {
|
||||
res.push(req);
|
||||
}
|
||||
}
|
||||
}
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct Trie {
|
||||
node: Node,
|
||||
children: HashMap<String, Trie>,
|
||||
}
|
||||
impl Trie {
|
||||
fn child_mut(&mut self, name: &str) -> &mut Self {
|
||||
if !self.children.contains_key(name) {
|
||||
self.children.insert(name.to_owned(), Trie::default());
|
||||
}
|
||||
self.children.get_mut(name).unwrap()
|
||||
}
|
||||
fn handle_request(
|
||||
&mut self,
|
||||
req: Request,
|
||||
returned_locks: &mut Vec<oneshot::Receiver<LockInfo>>,
|
||||
) {
|
||||
if let Some(req) = self.node.handle_request(req, returned_locks) {
|
||||
if let Some(seg) = req.lock_info.current_seg() {
|
||||
self.child_mut(seg).handle_request(req, returned_locks)
|
||||
}
|
||||
}
|
||||
}
|
||||
fn handle_release(
|
||||
&mut self,
|
||||
lock_info: LockInfo,
|
||||
returned_locks: &mut Vec<oneshot::Receiver<LockInfo>>,
|
||||
) {
|
||||
let (release, reqs) = self.node.handle_release(lock_info, returned_locks);
|
||||
for req in reqs {
|
||||
self.handle_request(req, returned_locks);
|
||||
}
|
||||
if let Some(release) = release {
|
||||
if let Some(seg) = release.current_seg() {
|
||||
self.child_mut(seg).handle_release(release, returned_locks)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Locker {
|
||||
sender: mpsc::UnboundedSender<Request>,
|
||||
}
|
||||
@@ -196,15 +15,17 @@ impl Locker {
|
||||
closed: false,
|
||||
recv: receiver,
|
||||
};
|
||||
let (_non_empty_send, non_empty_recv) = oneshot::channel();
|
||||
let mut returned_locks = vec![non_empty_recv];
|
||||
while let Some(action) = get_action(&mut new_requests, &mut returned_locks).await {
|
||||
// futures::future::select_all will panic if the list is empty
|
||||
// instead we want it to block forever by adding a channel that will never recv
|
||||
let (_dummy_send, dummy_recv) = oneshot::channel();
|
||||
let mut locks_on_lease = vec![dummy_recv];
|
||||
while let Some(action) = get_action(&mut new_requests, &mut locks_on_lease).await {
|
||||
#[cfg(feature = "log")]
|
||||
log::trace!("Locker Action: {:#?}", action);
|
||||
match action {
|
||||
Action::HandleRequest(req) => trie.handle_request(req, &mut returned_locks),
|
||||
Action::HandleRequest(req) => trie.handle_request(req, &mut locks_on_lease),
|
||||
Action::HandleRelease(lock_info) => {
|
||||
trie.handle_release(lock_info, &mut returned_locks)
|
||||
trie.handle_release(lock_info, &mut locks_on_lease)
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "log")]
|
||||
@@ -213,7 +34,7 @@ impl Locker {
|
||||
});
|
||||
Locker { sender }
|
||||
}
|
||||
pub async fn lock(&self, handle_id: usize, ptr: JsonPointer, write: bool) -> Guard {
|
||||
pub async fn lock(&self, handle_id: u64, ptr: JsonPointer, write: bool) -> Guard {
|
||||
let (send, recv) = oneshot::channel();
|
||||
self.sender
|
||||
.send(Request {
|
||||
@@ -239,13 +60,12 @@ enum Action {
|
||||
HandleRequest(Request),
|
||||
HandleRelease(LockInfo),
|
||||
}
|
||||
|
||||
async fn get_action(
|
||||
new_requests: &mut RequestQueue,
|
||||
returned_locks: &mut Vec<oneshot::Receiver<LockInfo>>,
|
||||
locks_on_lease: &mut Vec<oneshot::Receiver<LockInfo>>,
|
||||
) -> Option<Action> {
|
||||
loop {
|
||||
if new_requests.closed && returned_locks.is_empty() {
|
||||
if new_requests.closed && locks_on_lease.is_empty() {
|
||||
return None;
|
||||
}
|
||||
tokio::select! {
|
||||
@@ -256,10 +76,200 @@ async fn get_action(
|
||||
new_requests.closed = true;
|
||||
}
|
||||
}
|
||||
(a, idx, _) = futures::future::select_all(returned_locks.iter_mut()) => {
|
||||
returned_locks.swap_remove(idx);
|
||||
(a, idx, _) = futures::future::select_all(locks_on_lease.iter_mut()) => {
|
||||
locks_on_lease.swap_remove(idx);
|
||||
return Some(Action::HandleRelease(a.unwrap()))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct Trie {
|
||||
node: Node,
|
||||
children: HashMap<String, Trie>,
|
||||
}
|
||||
impl Trie {
|
||||
fn child_mut(&mut self, name: &str) -> &mut Self {
|
||||
if !self.children.contains_key(name) {
|
||||
self.children.insert(name.to_owned(), Trie::default());
|
||||
}
|
||||
self.children.get_mut(name).unwrap()
|
||||
}
|
||||
fn handle_request(
|
||||
&mut self,
|
||||
req: Request,
|
||||
locks_on_lease: &mut Vec<oneshot::Receiver<LockInfo>>,
|
||||
) {
|
||||
if let Some(req) = self.node.handle_request(req, locks_on_lease) {
|
||||
self.child_mut(req.lock_info.current_seg())
|
||||
.handle_request(req, locks_on_lease)
|
||||
}
|
||||
}
|
||||
fn handle_release(
|
||||
&mut self,
|
||||
lock_info: LockInfo,
|
||||
locks_on_lease: &mut Vec<oneshot::Receiver<LockInfo>>,
|
||||
) {
|
||||
let (release, reqs) = self.node.handle_release(lock_info, locks_on_lease);
|
||||
for req in reqs {
|
||||
self.handle_request(req, locks_on_lease);
|
||||
}
|
||||
if let Some(release) = release {
|
||||
self.child_mut(release.current_seg())
|
||||
.handle_release(release, locks_on_lease)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct Node {
|
||||
readers: Vec<u64>,
|
||||
writers: HashSet<u64>,
|
||||
reqs: Vec<Request>,
|
||||
}
|
||||
impl Node {
|
||||
// true: If there are any writers, it is `id`.
|
||||
fn write_free(&self, id: u64) -> bool {
|
||||
self.writers.is_empty() || (self.writers.len() == 1 && self.writers.contains(&id))
|
||||
}
|
||||
// true: If there are any readers, it is `id`.
|
||||
fn read_free(&self, id: u64) -> bool {
|
||||
self.readers.is_empty() || (self.readers.iter().filter(|a| a != &&id).count() == 0)
|
||||
}
|
||||
// allow a lock to skip the queue if a lock is already held by the same handle
|
||||
fn can_jump_queue(&self, id: u64) -> bool {
|
||||
self.writers.contains(&id) || self.readers.contains(&id)
|
||||
}
|
||||
// `id` is capable of acquiring this node for writing
|
||||
fn write_available(&self, id: u64) -> bool {
|
||||
self.write_free(id)
|
||||
&& self.read_free(id)
|
||||
&& (self.reqs.is_empty() || self.can_jump_queue(id))
|
||||
}
|
||||
// `id` is capable of acquiring this node for reading
|
||||
fn read_available(&self, id: u64) -> bool {
|
||||
self.write_free(id) && (self.reqs.is_empty() || self.can_jump_queue(id))
|
||||
}
|
||||
fn handle_request(
|
||||
&mut self,
|
||||
req: Request,
|
||||
locks_on_lease: &mut Vec<oneshot::Receiver<LockInfo>>,
|
||||
) -> Option<Request> {
|
||||
if req.lock_info.write() && self.write_available(req.lock_info.handle_id) {
|
||||
self.writers.insert(req.lock_info.handle_id);
|
||||
req.process(locks_on_lease)
|
||||
} else if !req.lock_info.write() && self.read_available(req.lock_info.handle_id) {
|
||||
self.readers.push(req.lock_info.handle_id);
|
||||
req.process(locks_on_lease)
|
||||
} else {
|
||||
self.reqs.push(req);
|
||||
None
|
||||
}
|
||||
}
|
||||
fn release(&mut self, mut lock_info: LockInfo) -> Option<LockInfo> {
|
||||
if lock_info.write() {
|
||||
self.writers.remove(&lock_info.handle_id);
|
||||
} else if let Some(idx) = self
|
||||
.readers
|
||||
.iter()
|
||||
.enumerate()
|
||||
.find(|(_, id)| id == &&lock_info.handle_id)
|
||||
.map(|(idx, _)| idx)
|
||||
{
|
||||
self.readers.swap_remove(idx);
|
||||
}
|
||||
if lock_info.ptr.len() == lock_info.segments_handled {
|
||||
None
|
||||
} else {
|
||||
lock_info.segments_handled += 1;
|
||||
Some(lock_info)
|
||||
}
|
||||
}
|
||||
fn handle_release(
|
||||
&mut self,
|
||||
lock_info: LockInfo,
|
||||
locks_on_lease: &mut Vec<oneshot::Receiver<LockInfo>>,
|
||||
) -> (Option<LockInfo>, Vec<Request>) {
|
||||
(self.release(lock_info), self.process_queue(locks_on_lease))
|
||||
}
|
||||
fn process_queue(
|
||||
&mut self,
|
||||
locks_on_lease: &mut Vec<oneshot::Receiver<LockInfo>>,
|
||||
) -> Vec<Request> {
|
||||
let mut res = Vec::new();
|
||||
for req in std::mem::take(&mut self.reqs) {
|
||||
if (req.lock_info.write() && self.write_available(req.lock_info.handle_id))
|
||||
|| self.read_available(req.lock_info.handle_id)
|
||||
{
|
||||
if let Some(req) = self.handle_request(req, locks_on_lease) {
|
||||
res.push(req);
|
||||
}
|
||||
}
|
||||
}
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct LockInfo {
|
||||
ptr: JsonPointer,
|
||||
segments_handled: usize,
|
||||
write: bool,
|
||||
handle_id: u64,
|
||||
}
|
||||
impl LockInfo {
|
||||
fn write(&self) -> bool {
|
||||
self.write && self.segments_handled == self.ptr.len()
|
||||
}
|
||||
fn current_seg(&self) -> &str {
|
||||
if self.segments_handled == 0 {
|
||||
"" // root
|
||||
} else {
|
||||
self.ptr
|
||||
.get_segment(self.segments_handled - 1)
|
||||
.unwrap_or_default()
|
||||
}
|
||||
}
|
||||
fn reset(mut self) -> Self {
|
||||
self.segments_handled = 0;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Request {
|
||||
lock_info: LockInfo,
|
||||
completion: oneshot::Sender<Guard>,
|
||||
}
|
||||
impl Request {
|
||||
fn process(mut self, locks_on_lease: &mut Vec<oneshot::Receiver<LockInfo>>) -> Option<Self> {
|
||||
if self.lock_info.ptr.len() == self.lock_info.segments_handled {
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
locks_on_lease.push(receiver);
|
||||
let _ = self.completion.send(Guard {
|
||||
lock_info: self.lock_info.reset(),
|
||||
sender: Some(sender),
|
||||
});
|
||||
None
|
||||
} else {
|
||||
self.lock_info.segments_handled += 1;
|
||||
Some(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Guard {
|
||||
lock_info: LockInfo,
|
||||
sender: Option<oneshot::Sender<LockInfo>>,
|
||||
}
|
||||
impl Drop for Guard {
|
||||
fn drop(&mut self) {
|
||||
let _ = self
|
||||
.sender
|
||||
.take()
|
||||
.unwrap()
|
||||
.send(std::mem::take(&mut self.lock_info));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@ use std::collections::HashMap;
|
||||
use std::fs::OpenOptions;
|
||||
use std::io::Error as IOError;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::Arc;
|
||||
|
||||
use fd_lock_rs::FdLock;
|
||||
@@ -210,7 +210,7 @@ pub struct PatchDb {
|
||||
pub(crate) store: Arc<RwLock<Store>>,
|
||||
subscriber: Arc<Sender<Arc<Revision>>>,
|
||||
pub(crate) locker: Arc<Locker>,
|
||||
handle_id: Arc<AtomicUsize>,
|
||||
handle_id: Arc<AtomicU64>,
|
||||
}
|
||||
impl PatchDb {
|
||||
pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
|
||||
@@ -220,7 +220,7 @@ impl PatchDb {
|
||||
store: Arc::new(RwLock::new(Store::open(path).await?)),
|
||||
locker: Arc::new(Locker::new()),
|
||||
subscriber: Arc::new(subscriber),
|
||||
handle_id: Arc::new(AtomicUsize::new(0)),
|
||||
handle_id: Arc::new(AtomicU64::new(0)),
|
||||
})
|
||||
}
|
||||
pub async fn dump(&self) -> Dump {
|
||||
|
||||
@@ -21,7 +21,7 @@ use crate::{
|
||||
};
|
||||
|
||||
pub struct Transaction<Parent: DbHandle> {
|
||||
pub(crate) id: usize,
|
||||
pub(crate) id: u64,
|
||||
pub(crate) parent: Parent,
|
||||
pub(crate) locks: Vec<Guard>,
|
||||
pub(crate) updates: DiffPatch,
|
||||
@@ -79,7 +79,7 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
|
||||
sub,
|
||||
})
|
||||
}
|
||||
fn id(&self) -> usize {
|
||||
fn id(&self) -> u64 {
|
||||
self.id
|
||||
}
|
||||
fn rebase(&mut self) -> Result<(), Error> {
|
||||
|
||||
Reference in New Issue
Block a user