don't store locks on models

This commit is contained in:
Aiden McClelland
2021-09-28 12:33:17 -06:00
committed by Aiden McClelland
parent 89b8278273
commit 53ed4cd614
6 changed files with 43 additions and 43 deletions

View File

@@ -11,6 +11,8 @@ repository = "https://github.com/Start9Labs/patch-db"
version = "0.1.0" version = "0.1.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
debug = ["log"]
[dependencies] [dependencies]
async-trait = "0.1.42" async-trait = "0.1.42"
@@ -20,7 +22,7 @@ json-patch = { path = "../json-patch" }
json-ptr = { path = "../json-ptr" } json-ptr = { path = "../json-ptr" }
lazy_static = "1.4.0" lazy_static = "1.4.0"
log = { version = "*", optional = true } log = { version = "*", optional = true }
nix = "0.20.0" nix = "0.22.1"
patch-db-macro = { path = "../patch-db-macro" } patch-db-macro = { path = "../patch-db-macro" }
serde = { version = "1.0.118", features = ["rc"] } serde = { version = "1.0.118", features = ["rc"] }
serde_cbor = { path = "../cbor" } serde_cbor = { path = "../cbor" }

View File

@@ -10,10 +10,13 @@ use tokio::sync::{broadcast::Receiver, RwLock, RwLockReadGuard};
use crate::{locker::Guard, Locker, PatchDb, Revision, Store, Transaction}; use crate::{locker::Guard, Locker, PatchDb, Revision, Store, Transaction};
use crate::{patch::DiffPatch, Error}; use crate::{patch::DiffPatch, Error};
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct HandleId(pub(crate) u64);
#[async_trait] #[async_trait]
pub trait DbHandle: Send + Sync { pub trait DbHandle: Send + Sync {
async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error>; async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error>;
fn id(&self) -> u64; fn id(&self) -> HandleId;
fn rebase(&mut self) -> Result<(), Error>; fn rebase(&mut self) -> Result<(), Error>;
fn store(&self) -> Arc<RwLock<Store>>; fn store(&self) -> Arc<RwLock<Store>>;
fn subscribe(&self) -> Receiver<Arc<Revision>>; fn subscribe(&self) -> Receiver<Arc<Revision>>;
@@ -75,7 +78,7 @@ impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
sub, sub,
}) })
} }
fn id(&self) -> u64 { fn id(&self) -> HandleId {
(**self).id() (**self).id()
} }
fn rebase(&mut self) -> Result<(), Error> { fn rebase(&mut self) -> Result<(), Error> {
@@ -148,7 +151,7 @@ impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
} }
pub struct PatchDbHandle { pub struct PatchDbHandle {
pub(crate) id: u64, pub(crate) id: HandleId,
pub(crate) db: PatchDb, pub(crate) db: PatchDb,
pub(crate) locks: Vec<Guard>, pub(crate) locks: Vec<Guard>,
} }
@@ -171,8 +174,8 @@ impl DbHandle for PatchDbHandle {
updates: DiffPatch::default(), updates: DiffPatch::default(),
}) })
} }
fn id(&self) -> u64 { fn id(&self) -> HandleId {
self.id self.id.clone()
} }
fn rebase(&mut self) -> Result<(), Error> { fn rebase(&mut self) -> Result<(), Error> {
Ok(()) Ok(())
@@ -231,7 +234,7 @@ impl DbHandle for PatchDbHandle {
} }
async fn lock(&mut self, ptr: JsonPointer, write: bool) { async fn lock(&mut self, ptr: JsonPointer, write: bool) {
self.locks self.locks
.push(self.db.locker.lock(self.id, ptr, write).await); .push(self.db.locker.lock(self.id.clone(), ptr, write).await);
} }
async fn get< async fn get<
T: for<'de> Deserialize<'de>, T: for<'de> Deserialize<'de>,

View File

@@ -1,8 +1,10 @@
use std::collections::{HashMap, HashSet}; use std::collections::BTreeMap;
use json_ptr::JsonPointer; use json_ptr::JsonPointer;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use crate::handle::HandleId;
pub struct Locker { pub struct Locker {
sender: mpsc::UnboundedSender<Request>, sender: mpsc::UnboundedSender<Request>,
} }
@@ -34,7 +36,7 @@ impl Locker {
}); });
Locker { sender } Locker { sender }
} }
pub async fn lock(&self, handle_id: u64, ptr: JsonPointer, write: bool) -> Guard { pub async fn lock(&self, handle_id: HandleId, ptr: JsonPointer, write: bool) -> Guard {
let (send, recv) = oneshot::channel(); let (send, recv) = oneshot::channel();
self.sender self.sender
.send(Request { .send(Request {
@@ -88,7 +90,7 @@ async fn get_action(
#[derive(Debug, Default)] #[derive(Debug, Default)]
struct Trie { struct Trie {
node: Node, node: Node,
children: HashMap<String, Trie>, children: BTreeMap<String, Trie>,
} }
impl Trie { impl Trie {
fn child_mut(&mut self, name: &str) -> &mut Self { fn child_mut(&mut self, name: &str) -> &mut Self {
@@ -125,31 +127,31 @@ impl Trie {
#[derive(Debug, Default)] #[derive(Debug, Default)]
struct Node { struct Node {
readers: Vec<u64>, readers: Vec<HandleId>,
writers: Vec<u64>, writers: Vec<HandleId>,
reqs: Vec<Request>, reqs: Vec<Request>,
} }
impl Node { impl Node {
// true: If there are any writers, it is `id`. // true: If there are any writers, it is `id`.
fn write_free(&self, id: u64) -> bool { fn write_free(&self, id: &HandleId) -> bool {
self.writers.is_empty() || (self.writers.iter().filter(|a| a != &&id).count() == 0) self.writers.is_empty() || (self.writers.iter().filter(|a| a != &id).count() == 0)
} }
// true: If there are any readers, it is `id`. // true: If there are any readers, it is `id`.
fn read_free(&self, id: u64) -> bool { fn read_free(&self, id: &HandleId) -> bool {
self.readers.is_empty() || (self.readers.iter().filter(|a| a != &&id).count() == 0) self.readers.is_empty() || (self.readers.iter().filter(|a| a != &id).count() == 0)
} }
// allow a lock to skip the queue if a lock is already held by the same handle // allow a lock to skip the queue if a lock is already held by the same handle
fn can_jump_queue(&self, id: u64) -> bool { fn can_jump_queue(&self, id: &HandleId) -> bool {
self.writers.contains(&id) || self.readers.contains(&id) self.writers.contains(&id) || self.readers.contains(&id)
} }
// `id` is capable of acquiring this node for writing // `id` is capable of acquiring this node for writing
fn write_available(&self, id: u64) -> bool { fn write_available(&self, id: &HandleId) -> bool {
self.write_free(id) self.write_free(id)
&& self.read_free(id) && self.read_free(id)
&& (self.reqs.is_empty() || self.can_jump_queue(id)) && (self.reqs.is_empty() || self.can_jump_queue(id))
} }
// `id` is capable of acquiring this node for reading // `id` is capable of acquiring this node for reading
fn read_available(&self, id: u64) -> bool { fn read_available(&self, id: &HandleId) -> bool {
self.write_free(id) && (self.reqs.is_empty() || self.can_jump_queue(id)) self.write_free(id) && (self.reqs.is_empty() || self.can_jump_queue(id))
} }
fn handle_request( fn handle_request(
@@ -157,11 +159,11 @@ impl Node {
req: Request, req: Request,
locks_on_lease: &mut Vec<oneshot::Receiver<LockInfo>>, locks_on_lease: &mut Vec<oneshot::Receiver<LockInfo>>,
) -> Option<Request> { ) -> Option<Request> {
if req.lock_info.write() && self.write_available(req.lock_info.handle_id) { if req.lock_info.write() && self.write_available(&req.lock_info.handle_id) {
self.writers.push(req.lock_info.handle_id); self.writers.push(req.lock_info.handle_id.clone());
req.process(locks_on_lease) req.process(locks_on_lease)
} else if !req.lock_info.write() && self.read_available(req.lock_info.handle_id) { } else if !req.lock_info.write() && self.read_available(&req.lock_info.handle_id) {
self.readers.push(req.lock_info.handle_id); self.readers.push(req.lock_info.handle_id.clone());
req.process(locks_on_lease) req.process(locks_on_lease)
} else { } else {
self.reqs.push(req); self.reqs.push(req);
@@ -202,7 +204,7 @@ struct LockInfo {
ptr: JsonPointer, ptr: JsonPointer,
segments_handled: usize, segments_handled: usize,
write: bool, write: bool,
handle_id: u64, handle_id: HandleId,
} }
impl LockInfo { impl LockInfo {
fn write(&self) -> bool { fn write(&self) -> bool {

View File

@@ -9,7 +9,6 @@ use json_ptr::JsonPointer;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
use crate::locker::Guard;
use crate::{DbHandle, DiffPatch, Error, Revision}; use crate::{DbHandle, DiffPatch, Error, Revision};
#[derive(Debug)] #[derive(Debug)]
@@ -59,7 +58,6 @@ impl<T: Serialize + for<'de> Deserialize<'de>> DerefMut for ModelDataMut<T> {
pub struct Model<T: Serialize + for<'de> Deserialize<'de>> { pub struct Model<T: Serialize + for<'de> Deserialize<'de>> {
ptr: JsonPointer, ptr: JsonPointer,
phantom: PhantomData<T>, phantom: PhantomData<T>,
lock: Option<Arc<Guard>>,
} }
impl<T> Model<T> impl<T> Model<T>
where where
@@ -93,7 +91,6 @@ where
Model { Model {
ptr, ptr,
phantom: PhantomData, phantom: PhantomData,
lock: self.lock,
} }
} }
} }
@@ -118,7 +115,6 @@ where
Self { Self {
ptr, ptr,
phantom: PhantomData, phantom: PhantomData,
lock: None,
} }
} }
} }
@@ -146,7 +142,6 @@ where
Model { Model {
ptr: self.ptr.clone(), ptr: self.ptr.clone(),
phantom: PhantomData, phantom: PhantomData,
lock: self.lock.clone(),
} }
} }
} }
@@ -297,13 +292,8 @@ where
T: HasModel + Serialize + for<'de> Deserialize<'de>, T: HasModel + Serialize + for<'de> Deserialize<'de>,
T::Model: DerefMut<Target = Model<T>>, T::Model: DerefMut<Target = Model<T>>,
{ {
pub async fn check<Db: DbHandle>(mut self, db: &mut Db) -> Result<Option<T::Model>, Error> { pub async fn check<Db: DbHandle>(self, db: &mut Db) -> Result<Option<T::Model>, Error> {
let lock = db Ok(if self.exists(db, true).await? {
.locker()
.lock(db.id(), self.0.as_ref().clone(), false)
.await;
self.0.lock = Some(Arc::new(lock));
Ok(if self.exists(db, false).await? {
Some(self.0) Some(self.0)
} else { } else {
None None

View File

@@ -15,6 +15,7 @@ use tokio::fs::File;
use tokio::sync::broadcast::{Receiver, Sender}; use tokio::sync::broadcast::{Receiver, Sender};
use tokio::sync::{Mutex, OwnedMutexGuard, RwLock, RwLockWriteGuard}; use tokio::sync::{Mutex, OwnedMutexGuard, RwLock, RwLockWriteGuard};
use crate::handle::HandleId;
use crate::patch::{diff, DiffPatch, Dump, Revision}; use crate::patch::{diff, DiffPatch, Dump, Revision};
use crate::Error; use crate::Error;
use crate::{locker::Locker, PatchDbHandle}; use crate::{locker::Locker, PatchDbHandle};
@@ -284,9 +285,10 @@ impl PatchDb {
} }
pub fn handle(&self) -> PatchDbHandle { pub fn handle(&self) -> PatchDbHandle {
PatchDbHandle { PatchDbHandle {
id: self id: HandleId(
.handle_id self.handle_id
.fetch_add(1, std::sync::atomic::Ordering::SeqCst), .fetch_add(1, std::sync::atomic::Ordering::SeqCst),
),
db: self.clone(), db: self.clone(),
locks: Vec::new(), locks: Vec::new(),
} }

View File

@@ -9,6 +9,7 @@ use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::broadcast::Receiver; use tokio::sync::broadcast::Receiver;
use tokio::sync::{RwLock, RwLockReadGuard}; use tokio::sync::{RwLock, RwLockReadGuard};
use crate::handle::HandleId;
use crate::store::Store; use crate::store::Store;
use crate::Error; use crate::Error;
use crate::{ use crate::{
@@ -21,7 +22,7 @@ use crate::{
}; };
pub struct Transaction<Parent: DbHandle> { pub struct Transaction<Parent: DbHandle> {
pub(crate) id: u64, pub(crate) id: HandleId,
pub(crate) parent: Parent, pub(crate) parent: Parent,
pub(crate) locks: Vec<Guard>, pub(crate) locks: Vec<Guard>,
pub(crate) updates: DiffPatch, pub(crate) updates: DiffPatch,
@@ -79,8 +80,8 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
sub, sub,
}) })
} }
fn id(&self) -> u64 { fn id(&self) -> HandleId {
self.id self.id.clone()
} }
fn rebase(&mut self) -> Result<(), Error> { fn rebase(&mut self) -> Result<(), Error> {
self.parent.rebase()?; self.parent.rebase()?;
@@ -167,7 +168,7 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
} }
async fn lock(&mut self, ptr: JsonPointer, write: bool) { async fn lock(&mut self, ptr: JsonPointer, write: bool) {
self.locks self.locks
.push(self.parent.locker().lock(self.id, ptr, write).await) .push(self.parent.locker().lock(self.id.clone(), ptr, write).await)
} }
async fn get< async fn get<
T: for<'de> Deserialize<'de>, T: for<'de> Deserialize<'de>,