use trie for locking

This commit is contained in:
Aiden McClelland
2021-09-23 17:29:56 -06:00
committed by Aiden McClelland
parent b112d59759
commit e731d091b8
5 changed files with 328 additions and 191 deletions

View File

@@ -7,16 +7,17 @@ use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::sync::{broadcast::Receiver, RwLock, RwLockReadGuard};
use crate::{locker::LockerGuard, Locker, PatchDb, Revision, Store, Transaction};
use crate::{locker::Guard, Locker, PatchDb, Revision, Store, Transaction};
use crate::{patch::DiffPatch, Error};
#[async_trait]
pub trait DbHandle: Send + Sync {
async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error>;
fn id(&self) -> usize;
fn rebase(&mut self) -> Result<(), Error>;
fn store(&self) -> Arc<RwLock<Store>>;
fn subscribe(&self) -> Receiver<Arc<Revision>>;
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, Option<LockerGuard>)]>);
fn locker(&self) -> &Locker;
async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
&mut self,
ptr: &JsonPointer<S, V>,
@@ -38,7 +39,7 @@ pub trait DbHandle: Send + Sync {
value: &Value,
) -> Result<Option<Arc<Revision>>, Error>;
async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error>;
async fn lock(&mut self, ptr: &JsonPointer) -> ();
async fn lock(&mut self, ptr: JsonPointer, write: bool) -> ();
async fn get<
T: for<'de> Deserialize<'de>,
S: AsRef<str> + Send + Sync,
@@ -67,14 +68,18 @@ impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
..
} = (*self).begin().await?;
Ok(Transaction {
id: self.id(),
parent: self,
locks,
updates,
sub,
})
}
fn id(&self) -> usize {
(**self).id()
}
fn rebase(&mut self) -> Result<(), Error> {
(*self).rebase()
(**self).rebase()
}
fn store(&self) -> Arc<RwLock<Store>> {
(**self).store()
@@ -82,8 +87,8 @@ impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
fn subscribe(&self) -> Receiver<Arc<Revision>> {
(**self).subscribe()
}
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, Option<LockerGuard>)]>) {
(*self).locker_and_locks()
fn locker(&self) -> &Locker {
(**self).locker()
}
async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
&mut self,
@@ -116,8 +121,8 @@ impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error> {
(*self).apply(patch).await
}
async fn lock(&mut self, ptr: &JsonPointer) {
(*self).lock(ptr).await
async fn lock(&mut self, ptr: JsonPointer, write: bool) {
(*self).lock(ptr, write).await
}
async fn get<
T: for<'de> Deserialize<'de>,
@@ -143,8 +148,9 @@ impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
}
pub struct PatchDbHandle {
pub(crate) id: usize,
pub(crate) db: PatchDb,
pub(crate) locks: Vec<(JsonPointer, Option<LockerGuard>)>,
pub(crate) locks: Vec<Guard>,
}
#[async_trait]
@@ -152,11 +158,15 @@ impl DbHandle for PatchDbHandle {
async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error> {
Ok(Transaction {
sub: self.subscribe(),
id: self.id(),
parent: self,
locks: Vec::new(),
updates: DiffPatch::default(),
})
}
fn id(&self) -> usize {
self.id
}
fn rebase(&mut self) -> Result<(), Error> {
Ok(())
}
@@ -166,8 +176,8 @@ impl DbHandle for PatchDbHandle {
fn subscribe(&self) -> Receiver<Arc<Revision>> {
self.db.subscribe()
}
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, Option<LockerGuard>)]>) {
(&self.db.locker, vec![self.locks.as_mut_slice()])
fn locker(&self) -> &Locker {
&self.db.locker
}
async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
&mut self,
@@ -212,8 +222,9 @@ impl DbHandle for PatchDbHandle {
async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error> {
self.db.apply(patch, None, None).await
}
async fn lock(&mut self, ptr: &JsonPointer) {
self.db.locker.add_lock(ptr, &mut self.locks, &mut []).await;
async fn lock(&mut self, ptr: JsonPointer, write: bool) {
self.locks
.push(self.db.locker.lock(self.id, ptr, write).await);
}
async fn get<
T: for<'de> Deserialize<'de>,

View File

@@ -1,162 +1,266 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::{Arc, Weak};
use std::task::Poll;
use std::collections::{HashMap, HashSet, VecDeque};
use futures::future::BoxFuture;
use futures::FutureExt;
use json_ptr::{JsonPointer, SegList};
use qutex::{Guard, Qutex};
use tokio::runtime::Handle;
use tokio::sync::Mutex;
use tokio::sync::Notify;
use json_ptr::JsonPointer;
use tokio::sync::{mpsc, oneshot};
#[derive(Debug)]
pub struct LockerGuard(Arc<NotifyGuard>, Guard<()>, Arc<LockerInner>);
#[derive(Debug)]
struct NotifyGuard(Arc<Notify>);
impl Drop for NotifyGuard {
fn drop(&mut self) {
self.0.notify_one();
#[derive(Debug, Default)]
struct LockInfo {
ptr: JsonPointer,
segments_handled: usize,
write: bool,
handle_id: usize,
}
impl LockInfo {
fn write(&self) -> bool {
self.write && self.segments_handled == self.ptr.len()
}
fn next_seg(&self) -> Option<&str> {
self.ptr.get_segment(self.segments_handled)
}
}
#[derive(Debug)]
pub struct Locker(Arc<LockerInner>);
#[derive(Debug)]
struct LockerInner {
map: Mutex<HashMap<String, Weak<LockerInner>>>,
self_lock: Qutex<()>,
children_lock: Arc<Notify>,
guard: Mutex<Weak<NotifyGuard>>,
parent: Option<(Arc<NotifyGuard>, String, Arc<LockerInner>)>,
struct Request {
lock_info: LockInfo,
completion: oneshot::Sender<Guard>,
}
impl Drop for LockerInner {
fn drop(&mut self) {
if let Some((_, idx, parent)) = self.parent.take() {
Handle::current().block_on(parent.map.lock()).remove(&idx);
}
}
}
impl Locker {
pub fn new() -> Self {
Locker(Arc::new(LockerInner {
map: Mutex::new(HashMap::new()),
self_lock: Qutex::new(()),
children_lock: {
let notify = Arc::new(Notify::new());
notify.notify_one();
notify
},
guard: Mutex::new(Weak::new()),
parent: None,
}))
}
async fn notify_guard(&self) -> Arc<NotifyGuard> {
let mut lock = self.0.guard.lock().await;
if let Some(n) = lock.upgrade() {
Arc::new(NotifyGuard(n.0.clone()))
} else {
let res = Arc::new(NotifyGuard(self.0.children_lock.clone()));
*lock = Arc::downgrade(&res);
res
}
}
async fn child<'a, S: Into<Cow<'a, str>>>(&self, name: S) -> Locker {
let name: Cow<'a, str> = name.into();
let mut lock = self.0.map.lock().await;
if let Some(child) = lock.get(name.as_ref()).and_then(|w| w.upgrade()) {
Locker(child)
} else {
let name = name.into_owned();
let res = Arc::new(LockerInner {
map: Mutex::new(HashMap::new()),
self_lock: Qutex::new(()),
children_lock: {
let notify = Arc::new(Notify::new());
notify.notify_one();
notify
},
guard: Mutex::new(Weak::new()),
parent: Some((self.notify_guard().await, name.clone(), self.0.clone())),
impl Request {
fn process(mut self, returned_locks: &mut Vec<oneshot::Receiver<LockInfo>>) -> Option<Self> {
if self.lock_info.ptr.len() == self.lock_info.segments_handled {
let (sender, receiver) = oneshot::channel();
returned_locks.push(receiver);
self.lock_info.segments_handled = 0;
let _ = self.completion.send(Guard {
lock_info: self.lock_info,
sender: Some(sender),
});
lock.insert(name, Arc::downgrade(&res));
Locker(res)
}
}
/// await once: in the queue, await twice: all children dropped
async fn wait_for_children<'a>(&'a self) -> BoxFuture<'a, ()> {
let children = self.0.guard.lock().await;
let mut fut = if children.strong_count() == 0 {
futures::future::ready(()).boxed()
None
} else {
self.0.children_lock.notified().boxed()
};
if matches!(futures::poll!(&mut fut), Poll::Ready(_)) {
return futures::future::ready(()).boxed();
self.lock_info.segments_handled += 1;
Some(self)
}
drop(children);
fut
}
async fn acquire_and_trade(self, guards: Vec<LockerGuard>) -> LockerGuard {
let children_dropped = self.wait_for_children().await;
guards.into_iter().for_each(drop);
children_dropped.await;
let guard = self.0.self_lock.clone().lock().await.unwrap();
LockerGuard(self.notify_guard().await, guard, self.0.clone())
}
#[derive(Debug)]
pub struct Guard {
lock_info: LockInfo,
sender: Option<oneshot::Sender<LockInfo>>,
}
impl Drop for Guard {
fn drop(&mut self) {
let _ = self
.sender
.take()
.unwrap()
.send(std::mem::take(&mut self.lock_info));
}
pub async fn lock<S: AsRef<str>, V: SegList>(
&self,
ptr: &JsonPointer<S, V>,
guards: Vec<LockerGuard>,
) -> LockerGuard {
let mut locker = Locker(self.0.clone());
for seg in ptr.iter() {
locker = locker.child(seg).await;
}
#[derive(Default)]
struct Node {
readers: Vec<usize>,
writers: HashSet<usize>,
reqs: VecDeque<Option<Request>>,
}
impl Node {
fn write_free(&self, id: usize) -> bool {
self.writers.is_empty() || (self.writers.len() == 1 && self.writers.contains(&id))
}
fn read_free(&self, id: usize) -> bool {
self.readers.is_empty() || (self.readers.iter().filter(|a| a != &&id).count() == 0)
}
fn write_available(&self, id: usize) -> bool {
self.write_free(id) && self.read_free(id) && self.reqs.is_empty()
}
fn read_available(&self, id: usize) -> bool {
self.write_free(id) && self.reqs.is_empty()
}
fn handle_request(
&mut self,
req: Request,
returned_locks: &mut Vec<oneshot::Receiver<LockInfo>>,
) -> Option<Request> {
if req.lock_info.write() && self.write_available(req.lock_info.handle_id) {
self.writers.insert(req.lock_info.handle_id);
req.process(returned_locks)
} else if !req.lock_info.write() && self.read_available(req.lock_info.handle_id) {
self.readers.push(req.lock_info.handle_id);
req.process(returned_locks)
} else {
self.reqs.push_back(Some(req));
None
}
let res = locker.acquire_and_trade(guards).await;
let mut guards = Vec::with_capacity(ptr.len());
let mut cur = res.2.parent.as_ref().map(|(_, _, p)| p);
while let Some(parent) = cur {
guards.push(parent.self_lock.clone().lock().await.unwrap());
cur = parent.parent.as_ref().map(|(_, _, p)| p);
}
res
}
/// TODO: DRAGONS!!!
/// Acquiring a lock to a node above something you already held will do so at the transaction level of the lock you already held!
/// This means even though you dropped the sub tx in which you acquired the higher level lock,
/// the higher lock could still be held by the parent tx which originally held the lower lock.
pub async fn add_lock(
&self,
ptr: &JsonPointer,
locks: &mut Vec<(JsonPointer, Option<LockerGuard>)>, // tx locks
extra_locks: &mut [&mut [(JsonPointer, Option<LockerGuard>)]], // tx parent locks
) {
let mut lock_dest = None;
let mut guards = Vec::new();
for lock in extra_locks
.iter_mut()
.flat_map(|a| a.iter_mut())
.chain(locks.iter_mut())
fn handle_release(
&mut self,
mut lock_info: LockInfo,
returned_locks: &mut Vec<oneshot::Receiver<LockInfo>>,
) -> (Option<LockInfo>, Vec<Request>) {
if lock_info.write() {
self.writers.remove(&lock_info.handle_id);
} else if let Some(idx) = self
.readers
.iter()
.enumerate()
.find(|(_, id)| id == &&lock_info.handle_id)
.map(|(idx, _)| idx)
{
if ptr.starts_with(&lock.0) {
return;
}
if lock.0.starts_with(&ptr) {
if let Some(guard) = lock.1.take() {
guards.push(guard);
lock_dest = Some(lock);
self.readers.swap_remove(idx);
}
let new_reqs = self.process_queue(returned_locks);
if lock_info.ptr.len() == lock_info.segments_handled {
(None, new_reqs)
} else {
lock_info.segments_handled += 1;
(Some(lock_info), new_reqs)
}
}
fn process_queue(
&mut self,
returned_locks: &mut Vec<oneshot::Receiver<LockInfo>>,
) -> Vec<Request> {
let mut ids_processed = HashSet::new();
let mut only_matching = false;
let mut res = Vec::new();
let mut tmp_reqs = std::mem::take(&mut self.reqs);
for req_opt in &mut tmp_reqs {
if let Some(req) = req_opt {
if !only_matching || ids_processed.contains(&req.lock_info.handle_id) {
if (req.lock_info.write() && self.write_available(req.lock_info.handle_id))
|| self.read_available(req.lock_info.handle_id)
{
ids_processed.insert(req.lock_info.handle_id);
if let Some(req) = req_opt.take() {
if let Some(req) = self.handle_request(req, returned_locks) {
res.push(req);
}
}
} else {
only_matching = true;
}
}
}
}
let guard = self.lock(ptr, guards).await;
if let Some(lock) = lock_dest {
lock.0 = ptr.clone();
lock.1 = Some(guard);
} else {
locks.push((ptr.clone(), Some(guard)));
self.reqs = tmp_reqs;
while matches!(self.reqs.get(0), Some(&None)) {
self.reqs.pop_front();
}
res
}
}
#[derive(Default)]
struct Trie {
node: Node,
children: HashMap<String, Trie>,
}
impl Trie {
fn child_mut(&mut self, name: &str) -> &mut Self {
if !self.children.contains_key(name) {
self.children.insert(name.to_owned(), Trie::default());
}
self.children.get_mut(name).unwrap()
}
fn handle_request(
&mut self,
req: Request,
returned_locks: &mut Vec<oneshot::Receiver<LockInfo>>,
) {
if let Some(req) = self.node.handle_request(req, returned_locks) {
if let Some(seg) = req.lock_info.next_seg() {
self.child_mut(seg).handle_request(req, returned_locks)
}
}
}
fn handle_release(
&mut self,
lock_info: LockInfo,
returned_locks: &mut Vec<oneshot::Receiver<LockInfo>>,
) {
let (release, reqs) = self.node.handle_release(lock_info, returned_locks);
for req in reqs {
self.handle_request(req, returned_locks);
}
if let Some(release) = release {
if let Some(seg) = release.next_seg() {
self.child_mut(seg).handle_release(release, returned_locks)
}
}
}
}
pub struct Locker {
sender: mpsc::UnboundedSender<Request>,
}
impl Locker {
pub fn new() -> Self {
let (sender, receiver) = mpsc::unbounded_channel();
tokio::spawn(async move {
let mut trie = Trie::default();
let mut new_requests = RequestQueue {
closed: false,
recv: receiver,
};
let mut returned_locks = Vec::new();
while let Some(action) = get_action(&mut new_requests, &mut returned_locks).await {
match action {
Action::HandleRequest(req) => trie.handle_request(req, &mut returned_locks),
Action::HandleRelease(lock_info) => {
trie.handle_release(lock_info, &mut returned_locks)
}
}
}
});
Locker { sender }
}
pub async fn lock(&self, handle_id: usize, ptr: JsonPointer, write: bool) -> Guard {
let (send, recv) = oneshot::channel();
self.sender
.send(Request {
lock_info: LockInfo {
handle_id,
ptr,
write,
segments_handled: 0,
},
completion: send,
})
.unwrap();
recv.await.unwrap()
}
}
struct RequestQueue {
closed: bool,
recv: mpsc::UnboundedReceiver<Request>,
}
enum Action {
HandleRequest(Request),
HandleRelease(LockInfo),
}
async fn get_action(
new_requests: &mut RequestQueue,
returned_locks: &mut Vec<oneshot::Receiver<LockInfo>>,
) -> Option<Action> {
loop {
if new_requests.closed && returned_locks.is_empty() {
return None;
}
tokio::select! {
a = new_requests.recv.recv() => {
if let Some(a) = a {
return Some(Action::HandleRequest(a));
} else {
new_requests.closed = true;
}
}
(a, idx, _) = futures::future::select_all(returned_locks.iter_mut()) => {
returned_locks.swap_remove(idx);
return Some(Action::HandleRelease(a.unwrap()))
}
}
}
}

View File

@@ -10,6 +10,7 @@ use json_ptr::JsonPointer;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use crate::locker::Guard;
use crate::{DbHandle, DiffPatch, Error, Revision};
#[derive(Debug)]
@@ -59,24 +60,25 @@ impl<T: Serialize + for<'de> Deserialize<'de>> DerefMut for ModelDataMut<T> {
pub struct Model<T: Serialize + for<'de> Deserialize<'de>> {
ptr: JsonPointer,
phantom: PhantomData<T>,
lock: Option<Arc<Guard>>,
}
impl<T> Model<T>
where
T: Serialize + for<'de> Deserialize<'de>,
{
pub async fn lock<Db: DbHandle>(&self, db: &mut Db) {
db.lock(&self.ptr).await
pub async fn lock<Db: DbHandle>(&self, db: &mut Db, write: bool) {
db.lock(self.ptr.clone(), write).await
}
pub async fn get<Db: DbHandle>(&self, db: &mut Db, lock: bool) -> Result<ModelData<T>, Error> {
if lock {
self.lock(db).await;
self.lock(db, false).await;
}
Ok(ModelData(db.get(&self.ptr).await?))
}
pub async fn get_mut<Db: DbHandle>(&self, db: &mut Db) -> Result<ModelDataMut<T>, Error> {
self.lock(db).await;
self.lock(db, true).await;
let original = db.get_value(&self.ptr, None).await?;
let current = serde_json::from_value(original.clone())?;
Ok(ModelDataMut {
@@ -92,6 +94,7 @@ where
Model {
ptr,
phantom: PhantomData,
lock: self.lock,
}
}
}
@@ -104,7 +107,7 @@ where
db: &mut Db,
value: &T,
) -> Result<Option<Arc<Revision>>, Error> {
self.lock(db).await;
self.lock(db, true).await;
db.put(&self.ptr, value).await
}
}
@@ -116,6 +119,7 @@ where
Self {
ptr,
phantom: PhantomData,
lock: None,
}
}
}
@@ -143,6 +147,7 @@ where
Model {
ptr: self.ptr.clone(),
phantom: PhantomData,
lock: self.lock.clone(),
}
}
}
@@ -208,8 +213,8 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> HasModel for Box<T> {
#[derive(Debug)]
pub struct OptionModel<T: HasModel + Serialize + for<'de> Deserialize<'de>>(T::Model);
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
pub async fn lock<Db: DbHandle>(&self, db: &mut Db) {
db.lock(self.0.as_ref()).await
pub async fn lock<Db: DbHandle>(&self, db: &mut Db, write: bool) {
db.lock(self.0.as_ref().clone(), write).await
}
pub async fn get<Db: DbHandle>(
@@ -218,7 +223,7 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
lock: bool,
) -> Result<ModelData<Option<T>>, Error> {
if lock {
self.lock(db).await;
self.lock(db, false).await;
}
Ok(ModelData(db.get(self.0.as_ref()).await?))
}
@@ -227,7 +232,7 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
&self,
db: &mut Db,
) -> Result<ModelDataMut<Option<T>>, Error> {
self.lock(db).await;
self.lock(db, true).await;
let original = db.get_value(self.0.as_ref(), None).await?;
let current = serde_json::from_value(original.clone())?;
Ok(ModelDataMut {
@@ -239,35 +244,48 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
pub async fn exists<Db: DbHandle>(&self, db: &mut Db, lock: bool) -> Result<bool, Error> {
if lock {
db.lock(self.0.as_ref()).await;
db.lock(self.0.as_ref().clone(), false).await;
}
Ok(db.exists(&self.as_ref(), None).await?)
}
pub fn map<
F: FnOnce(T::Model) -> V,
F: FnOnce(T::Model) -> U::Model,
U: Serialize + for<'de> Deserialize<'de> + HasModel,
V: ModelFor<U>,
>(
self,
f: F,
) -> OptionModel<U> {
Into::<JsonPointer>::into(f(self.0)).into()
OptionModel(f(self.0))
}
pub fn and_then<
F: FnOnce(T::Model) -> V,
U: Serialize + for<'de> Deserialize<'de>,
V: ModelFor<Option<U>>,
F: FnOnce(T::Model) -> U::Model,
U: Serialize + for<'de> Deserialize<'de> + HasModel,
>(
self,
f: F,
) -> V {
Into::<JsonPointer>::into(f(self.0)).into()
) -> OptionModel<U> {
OptionModel(f(self.0))
}
pub async fn check<Db: DbHandle>(self, db: &mut Db) -> Result<Option<T::Model>, Error> {
Ok(if self.exists(db, true).await? {
pub async fn delete<Db: DbHandle>(&self, db: &mut Db) -> Result<Option<Arc<Revision>>, Error> {
db.lock(self.as_ref().clone(), true).await;
db.put(self.as_ref(), &Value::Null).await
}
}
impl<T> OptionModel<T>
where
T: HasModel + Serialize + for<'de> Deserialize<'de>,
T::Model: AsMut<Model<T>>,
{
pub async fn check<Db: DbHandle>(mut self, db: &mut Db) -> Result<Option<T::Model>, Error> {
let lock = db
.locker()
.lock(db.id(), self.0.as_ref().clone(), false)
.await;
self.0.as_mut().lock = Some(Arc::new(lock));
Ok(if self.exists(db, false).await? {
Some(self.0)
} else {
None
@@ -281,11 +299,6 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
Err(Error::NodeDoesNotExist(self.0.into()))
}
}
pub async fn delete<Db: DbHandle>(&self, db: &mut Db) -> Result<Option<Arc<Revision>>, Error> {
db.lock(self.as_ref()).await;
db.put(self.as_ref(), &Value::Null).await
}
}
impl<T> OptionModel<T>
where
@@ -296,7 +309,7 @@ where
db: &mut Db,
value: &T,
) -> Result<Option<Arc<Revision>>, Error> {
db.lock(self.as_ref()).await;
db.lock(self.as_ref().clone(), true).await;
db.put(self.as_ref(), value).await
}
}
@@ -460,7 +473,7 @@ where
lock: bool,
) -> Result<IndexSet<T::Key>, Error> {
if lock {
db.lock(self.as_ref()).await;
db.lock(self.as_ref().clone(), false).await;
}
let set = db.keys(self.as_ref(), None).await?;
Ok(set
@@ -469,7 +482,7 @@ where
.collect::<Result<_, _>>()?)
}
pub async fn remove<Db: DbHandle>(&self, db: &mut Db, key: &T::Key) -> Result<(), Error> {
db.lock(self.as_ref()).await;
db.lock(self.as_ref().clone(), true).await;
db.apply(DiffPatch(Patch(vec![PatchOperation::Remove(
RemoveOperation {
path: self.as_ref().clone().join_end(key.as_ref()),

View File

@@ -2,6 +2,7 @@ use std::collections::HashMap;
use std::fs::OpenOptions;
use std::io::Error as IOError;
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use fd_lock_rs::FdLock;
@@ -210,6 +211,7 @@ pub struct PatchDb {
pub(crate) store: Arc<RwLock<Store>>,
subscriber: Arc<Sender<Arc<Revision>>>,
pub(crate) locker: Arc<Locker>,
handle_id: Arc<AtomicUsize>,
}
impl PatchDb {
pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
@@ -219,6 +221,7 @@ impl PatchDb {
store: Arc::new(RwLock::new(Store::open(path).await?)),
locker: Arc::new(Locker::new()),
subscriber: Arc::new(subscriber),
handle_id: Arc::new(AtomicUsize::new(0)),
})
}
pub async fn dump(&self) -> Dump {
@@ -282,6 +285,9 @@ impl PatchDb {
}
pub fn handle(&self) -> PatchDbHandle {
PatchDbHandle {
id: self
.handle_id
.fetch_add(1, std::sync::atomic::Ordering::SeqCst),
db: self.clone(),
locks: Vec::new(),
}

View File

@@ -12,7 +12,7 @@ use tokio::sync::{RwLock, RwLockReadGuard};
use crate::store::Store;
use crate::Error;
use crate::{
locker::{Locker, LockerGuard},
locker::{Guard, Locker},
DbHandle,
};
use crate::{
@@ -21,8 +21,9 @@ use crate::{
};
pub struct Transaction<Parent: DbHandle> {
pub(crate) id: usize,
pub(crate) parent: Parent,
pub(crate) locks: Vec<(JsonPointer, Option<LockerGuard>)>,
pub(crate) locks: Vec<Guard>,
pub(crate) updates: DiffPatch,
pub(crate) sub: Receiver<Arc<Revision>>,
}
@@ -71,12 +72,16 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
let sub = self.parent.subscribe();
drop(store);
Ok(Transaction {
id: self.id(),
parent: self,
locks: Vec::new(),
updates: DiffPatch::default(),
sub,
})
}
fn id(&self) -> usize {
self.id
}
fn rebase(&mut self) -> Result<(), Error> {
self.parent.rebase()?;
while let Some(rev) = match self.sub.try_recv() {
@@ -94,10 +99,8 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
fn subscribe(&self) -> Receiver<Arc<Revision>> {
self.parent.subscribe()
}
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, Option<LockerGuard>)]>) {
let (locker, mut locks) = self.parent.locker_and_locks();
locks.push(&mut self.locks);
(locker, locks)
fn locker(&self) -> &Locker {
self.parent.locker()
}
async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
&mut self,
@@ -162,9 +165,9 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
self.updates.append(patch);
Ok(None)
}
async fn lock(&mut self, ptr: &JsonPointer) {
let (locker, mut locks) = self.parent.locker_and_locks();
locker.add_lock(ptr, &mut self.locks, &mut locks).await
async fn lock(&mut self, ptr: JsonPointer, write: bool) {
self.locks
.push(self.parent.locker().lock(self.id, ptr, write).await)
}
async fn get<
T: for<'de> Deserialize<'de>,