diff --git a/patch-db/Cargo.toml b/patch-db/Cargo.toml index 70285c0..6b658ef 100644 --- a/patch-db/Cargo.toml +++ b/patch-db/Cargo.toml @@ -11,6 +11,8 @@ repository = "https://github.com/Start9Labs/patch-db" version = "0.1.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +debug = ["log"] [dependencies] async-trait = "0.1.42" @@ -20,7 +22,7 @@ json-patch = { path = "../json-patch" } json-ptr = { path = "../json-ptr" } lazy_static = "1.4.0" log = { version = "*", optional = true } -nix = "0.20.0" +nix = "0.22.1" patch-db-macro = { path = "../patch-db-macro" } serde = { version = "1.0.118", features = ["rc"] } serde_cbor = { path = "../cbor" } diff --git a/patch-db/src/handle.rs b/patch-db/src/handle.rs index d5f7ce5..ebf982e 100644 --- a/patch-db/src/handle.rs +++ b/patch-db/src/handle.rs @@ -10,10 +10,13 @@ use tokio::sync::{broadcast::Receiver, RwLock, RwLockReadGuard}; use crate::{locker::Guard, Locker, PatchDb, Revision, Store, Transaction}; use crate::{patch::DiffPatch, Error}; +#[derive(Debug, Clone, PartialEq, Eq, Default)] +pub struct HandleId(pub(crate) u64); + #[async_trait] pub trait DbHandle: Send + Sync { async fn begin<'a>(&'a mut self) -> Result, Error>; - fn id(&self) -> u64; + fn id(&self) -> HandleId; fn rebase(&mut self) -> Result<(), Error>; fn store(&self) -> Arc>; fn subscribe(&self) -> Receiver>; @@ -75,7 +78,7 @@ impl DbHandle for &mut Handle { sub, }) } - fn id(&self) -> u64 { + fn id(&self) -> HandleId { (**self).id() } fn rebase(&mut self) -> Result<(), Error> { @@ -148,7 +151,7 @@ impl DbHandle for &mut Handle { } pub struct PatchDbHandle { - pub(crate) id: u64, + pub(crate) id: HandleId, pub(crate) db: PatchDb, pub(crate) locks: Vec, } @@ -171,8 +174,8 @@ impl DbHandle for PatchDbHandle { updates: DiffPatch::default(), }) } - fn id(&self) -> u64 { - self.id + fn id(&self) -> HandleId { + self.id.clone() } fn rebase(&mut self) -> Result<(), Error> { Ok(()) @@ -231,7 +234,7 @@ impl DbHandle for PatchDbHandle { } async fn lock(&mut self, ptr: JsonPointer, write: bool) { self.locks - .push(self.db.locker.lock(self.id, ptr, write).await); + .push(self.db.locker.lock(self.id.clone(), ptr, write).await); } async fn get< T: for<'de> Deserialize<'de>, diff --git a/patch-db/src/locker.rs b/patch-db/src/locker.rs index 3f3fcf2..b3be181 100644 --- a/patch-db/src/locker.rs +++ b/patch-db/src/locker.rs @@ -1,8 +1,10 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::BTreeMap; use json_ptr::JsonPointer; use tokio::sync::{mpsc, oneshot}; +use crate::handle::HandleId; + pub struct Locker { sender: mpsc::UnboundedSender, } @@ -34,7 +36,7 @@ impl Locker { }); Locker { sender } } - pub async fn lock(&self, handle_id: u64, ptr: JsonPointer, write: bool) -> Guard { + pub async fn lock(&self, handle_id: HandleId, ptr: JsonPointer, write: bool) -> Guard { let (send, recv) = oneshot::channel(); self.sender .send(Request { @@ -88,7 +90,7 @@ async fn get_action( #[derive(Debug, Default)] struct Trie { node: Node, - children: HashMap, + children: BTreeMap, } impl Trie { fn child_mut(&mut self, name: &str) -> &mut Self { @@ -125,31 +127,31 @@ impl Trie { #[derive(Debug, Default)] struct Node { - readers: Vec, - writers: Vec, + readers: Vec, + writers: Vec, reqs: Vec, } impl Node { // true: If there are any writers, it is `id`. - fn write_free(&self, id: u64) -> bool { - self.writers.is_empty() || (self.writers.iter().filter(|a| a != &&id).count() == 0) + fn write_free(&self, id: &HandleId) -> bool { + self.writers.is_empty() || (self.writers.iter().filter(|a| a != &id).count() == 0) } // true: If there are any readers, it is `id`. - fn read_free(&self, id: u64) -> bool { - self.readers.is_empty() || (self.readers.iter().filter(|a| a != &&id).count() == 0) + fn read_free(&self, id: &HandleId) -> bool { + self.readers.is_empty() || (self.readers.iter().filter(|a| a != &id).count() == 0) } // allow a lock to skip the queue if a lock is already held by the same handle - fn can_jump_queue(&self, id: u64) -> bool { + fn can_jump_queue(&self, id: &HandleId) -> bool { self.writers.contains(&id) || self.readers.contains(&id) } // `id` is capable of acquiring this node for writing - fn write_available(&self, id: u64) -> bool { + fn write_available(&self, id: &HandleId) -> bool { self.write_free(id) && self.read_free(id) && (self.reqs.is_empty() || self.can_jump_queue(id)) } // `id` is capable of acquiring this node for reading - fn read_available(&self, id: u64) -> bool { + fn read_available(&self, id: &HandleId) -> bool { self.write_free(id) && (self.reqs.is_empty() || self.can_jump_queue(id)) } fn handle_request( @@ -157,11 +159,11 @@ impl Node { req: Request, locks_on_lease: &mut Vec>, ) -> Option { - if req.lock_info.write() && self.write_available(req.lock_info.handle_id) { - self.writers.push(req.lock_info.handle_id); + if req.lock_info.write() && self.write_available(&req.lock_info.handle_id) { + self.writers.push(req.lock_info.handle_id.clone()); req.process(locks_on_lease) - } else if !req.lock_info.write() && self.read_available(req.lock_info.handle_id) { - self.readers.push(req.lock_info.handle_id); + } else if !req.lock_info.write() && self.read_available(&req.lock_info.handle_id) { + self.readers.push(req.lock_info.handle_id.clone()); req.process(locks_on_lease) } else { self.reqs.push(req); @@ -202,7 +204,7 @@ struct LockInfo { ptr: JsonPointer, segments_handled: usize, write: bool, - handle_id: u64, + handle_id: HandleId, } impl LockInfo { fn write(&self) -> bool { diff --git a/patch-db/src/model.rs b/patch-db/src/model.rs index 98aa4f7..2e36787 100644 --- a/patch-db/src/model.rs +++ b/patch-db/src/model.rs @@ -9,7 +9,6 @@ use json_ptr::JsonPointer; use serde::{Deserialize, Serialize}; use serde_json::Value; -use crate::locker::Guard; use crate::{DbHandle, DiffPatch, Error, Revision}; #[derive(Debug)] @@ -59,7 +58,6 @@ impl Deserialize<'de>> DerefMut for ModelDataMut { pub struct Model Deserialize<'de>> { ptr: JsonPointer, phantom: PhantomData, - lock: Option>, } impl Model where @@ -93,7 +91,6 @@ where Model { ptr, phantom: PhantomData, - lock: self.lock, } } } @@ -118,7 +115,6 @@ where Self { ptr, phantom: PhantomData, - lock: None, } } } @@ -146,7 +142,6 @@ where Model { ptr: self.ptr.clone(), phantom: PhantomData, - lock: self.lock.clone(), } } } @@ -297,13 +292,8 @@ where T: HasModel + Serialize + for<'de> Deserialize<'de>, T::Model: DerefMut>, { - pub async fn check(mut self, db: &mut Db) -> Result, Error> { - let lock = db - .locker() - .lock(db.id(), self.0.as_ref().clone(), false) - .await; - self.0.lock = Some(Arc::new(lock)); - Ok(if self.exists(db, false).await? { + pub async fn check(self, db: &mut Db) -> Result, Error> { + Ok(if self.exists(db, true).await? { Some(self.0) } else { None diff --git a/patch-db/src/store.rs b/patch-db/src/store.rs index 26b5f70..b699907 100644 --- a/patch-db/src/store.rs +++ b/patch-db/src/store.rs @@ -15,6 +15,7 @@ use tokio::fs::File; use tokio::sync::broadcast::{Receiver, Sender}; use tokio::sync::{Mutex, OwnedMutexGuard, RwLock, RwLockWriteGuard}; +use crate::handle::HandleId; use crate::patch::{diff, DiffPatch, Dump, Revision}; use crate::Error; use crate::{locker::Locker, PatchDbHandle}; @@ -284,9 +285,10 @@ impl PatchDb { } pub fn handle(&self) -> PatchDbHandle { PatchDbHandle { - id: self - .handle_id - .fetch_add(1, std::sync::atomic::Ordering::SeqCst), + id: HandleId( + self.handle_id + .fetch_add(1, std::sync::atomic::Ordering::SeqCst), + ), db: self.clone(), locks: Vec::new(), } diff --git a/patch-db/src/transaction.rs b/patch-db/src/transaction.rs index cc6d766..209f728 100644 --- a/patch-db/src/transaction.rs +++ b/patch-db/src/transaction.rs @@ -9,6 +9,7 @@ use tokio::sync::broadcast::error::TryRecvError; use tokio::sync::broadcast::Receiver; use tokio::sync::{RwLock, RwLockReadGuard}; +use crate::handle::HandleId; use crate::store::Store; use crate::Error; use crate::{ @@ -21,7 +22,7 @@ use crate::{ }; pub struct Transaction { - pub(crate) id: u64, + pub(crate) id: HandleId, pub(crate) parent: Parent, pub(crate) locks: Vec, pub(crate) updates: DiffPatch, @@ -79,8 +80,8 @@ impl DbHandle for Transaction { sub, }) } - fn id(&self) -> u64 { - self.id + fn id(&self) -> HandleId { + self.id.clone() } fn rebase(&mut self) -> Result<(), Error> { self.parent.rebase()?; @@ -167,7 +168,7 @@ impl DbHandle for Transaction { } async fn lock(&mut self, ptr: JsonPointer, write: bool) { self.locks - .push(self.parent.locker().lock(self.id, ptr, write).await) + .push(self.parent.locker().lock(self.id.clone(), ptr, write).await) } async fn get< T: for<'de> Deserialize<'de>,