mirror of
https://github.com/Start9Labs/patch-db.git
synced 2026-03-26 10:21:53 +00:00
use trie for locking
This commit is contained in:
@@ -7,16 +7,17 @@ use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use tokio::sync::{broadcast::Receiver, RwLock, RwLockReadGuard};
|
||||
|
||||
use crate::{locker::LockerGuard, Locker, PatchDb, Revision, Store, Transaction};
|
||||
use crate::{locker::Guard, Locker, PatchDb, Revision, Store, Transaction};
|
||||
use crate::{patch::DiffPatch, Error};
|
||||
|
||||
#[async_trait]
|
||||
pub trait DbHandle: Send + Sync {
|
||||
async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error>;
|
||||
fn id(&self) -> usize;
|
||||
fn rebase(&mut self) -> Result<(), Error>;
|
||||
fn store(&self) -> Arc<RwLock<Store>>;
|
||||
fn subscribe(&self) -> Receiver<Arc<Revision>>;
|
||||
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, Option<LockerGuard>)]>);
|
||||
fn locker(&self) -> &Locker;
|
||||
async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
@@ -38,7 +39,7 @@ pub trait DbHandle: Send + Sync {
|
||||
value: &Value,
|
||||
) -> Result<Option<Arc<Revision>>, Error>;
|
||||
async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error>;
|
||||
async fn lock(&mut self, ptr: &JsonPointer) -> ();
|
||||
async fn lock(&mut self, ptr: JsonPointer, write: bool) -> ();
|
||||
async fn get<
|
||||
T: for<'de> Deserialize<'de>,
|
||||
S: AsRef<str> + Send + Sync,
|
||||
@@ -67,14 +68,18 @@ impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
|
||||
..
|
||||
} = (*self).begin().await?;
|
||||
Ok(Transaction {
|
||||
id: self.id(),
|
||||
parent: self,
|
||||
locks,
|
||||
updates,
|
||||
sub,
|
||||
})
|
||||
}
|
||||
fn id(&self) -> usize {
|
||||
(**self).id()
|
||||
}
|
||||
fn rebase(&mut self) -> Result<(), Error> {
|
||||
(*self).rebase()
|
||||
(**self).rebase()
|
||||
}
|
||||
fn store(&self) -> Arc<RwLock<Store>> {
|
||||
(**self).store()
|
||||
@@ -82,8 +87,8 @@ impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
|
||||
fn subscribe(&self) -> Receiver<Arc<Revision>> {
|
||||
(**self).subscribe()
|
||||
}
|
||||
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, Option<LockerGuard>)]>) {
|
||||
(*self).locker_and_locks()
|
||||
fn locker(&self) -> &Locker {
|
||||
(**self).locker()
|
||||
}
|
||||
async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||
&mut self,
|
||||
@@ -116,8 +121,8 @@ impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
|
||||
async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error> {
|
||||
(*self).apply(patch).await
|
||||
}
|
||||
async fn lock(&mut self, ptr: &JsonPointer) {
|
||||
(*self).lock(ptr).await
|
||||
async fn lock(&mut self, ptr: JsonPointer, write: bool) {
|
||||
(*self).lock(ptr, write).await
|
||||
}
|
||||
async fn get<
|
||||
T: for<'de> Deserialize<'de>,
|
||||
@@ -143,8 +148,9 @@ impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
|
||||
}
|
||||
|
||||
pub struct PatchDbHandle {
|
||||
pub(crate) id: usize,
|
||||
pub(crate) db: PatchDb,
|
||||
pub(crate) locks: Vec<(JsonPointer, Option<LockerGuard>)>,
|
||||
pub(crate) locks: Vec<Guard>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -152,11 +158,15 @@ impl DbHandle for PatchDbHandle {
|
||||
async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error> {
|
||||
Ok(Transaction {
|
||||
sub: self.subscribe(),
|
||||
id: self.id(),
|
||||
parent: self,
|
||||
locks: Vec::new(),
|
||||
updates: DiffPatch::default(),
|
||||
})
|
||||
}
|
||||
fn id(&self) -> usize {
|
||||
self.id
|
||||
}
|
||||
fn rebase(&mut self) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
@@ -166,8 +176,8 @@ impl DbHandle for PatchDbHandle {
|
||||
fn subscribe(&self) -> Receiver<Arc<Revision>> {
|
||||
self.db.subscribe()
|
||||
}
|
||||
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, Option<LockerGuard>)]>) {
|
||||
(&self.db.locker, vec![self.locks.as_mut_slice()])
|
||||
fn locker(&self) -> &Locker {
|
||||
&self.db.locker
|
||||
}
|
||||
async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||
&mut self,
|
||||
@@ -212,8 +222,9 @@ impl DbHandle for PatchDbHandle {
|
||||
async fn apply(&mut self, patch: DiffPatch) -> Result<Option<Arc<Revision>>, Error> {
|
||||
self.db.apply(patch, None, None).await
|
||||
}
|
||||
async fn lock(&mut self, ptr: &JsonPointer) {
|
||||
self.db.locker.add_lock(ptr, &mut self.locks, &mut []).await;
|
||||
async fn lock(&mut self, ptr: JsonPointer, write: bool) {
|
||||
self.locks
|
||||
.push(self.db.locker.lock(self.id, ptr, write).await);
|
||||
}
|
||||
async fn get<
|
||||
T: for<'de> Deserialize<'de>,
|
||||
|
||||
@@ -1,162 +1,266 @@
|
||||
use std::borrow::Cow;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::task::Poll;
|
||||
use std::collections::{HashMap, HashSet, VecDeque};
|
||||
|
||||
use futures::future::BoxFuture;
|
||||
use futures::FutureExt;
|
||||
use json_ptr::{JsonPointer, SegList};
|
||||
use qutex::{Guard, Qutex};
|
||||
use tokio::runtime::Handle;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::Notify;
|
||||
use json_ptr::JsonPointer;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct LockerGuard(Arc<NotifyGuard>, Guard<()>, Arc<LockerInner>);
|
||||
|
||||
#[derive(Debug)]
|
||||
struct NotifyGuard(Arc<Notify>);
|
||||
impl Drop for NotifyGuard {
|
||||
fn drop(&mut self) {
|
||||
self.0.notify_one();
|
||||
#[derive(Debug, Default)]
|
||||
struct LockInfo {
|
||||
ptr: JsonPointer,
|
||||
segments_handled: usize,
|
||||
write: bool,
|
||||
handle_id: usize,
|
||||
}
|
||||
impl LockInfo {
|
||||
fn write(&self) -> bool {
|
||||
self.write && self.segments_handled == self.ptr.len()
|
||||
}
|
||||
fn next_seg(&self) -> Option<&str> {
|
||||
self.ptr.get_segment(self.segments_handled)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Locker(Arc<LockerInner>);
|
||||
#[derive(Debug)]
|
||||
struct LockerInner {
|
||||
map: Mutex<HashMap<String, Weak<LockerInner>>>,
|
||||
self_lock: Qutex<()>,
|
||||
children_lock: Arc<Notify>,
|
||||
guard: Mutex<Weak<NotifyGuard>>,
|
||||
parent: Option<(Arc<NotifyGuard>, String, Arc<LockerInner>)>,
|
||||
struct Request {
|
||||
lock_info: LockInfo,
|
||||
completion: oneshot::Sender<Guard>,
|
||||
}
|
||||
impl Drop for LockerInner {
|
||||
fn drop(&mut self) {
|
||||
if let Some((_, idx, parent)) = self.parent.take() {
|
||||
Handle::current().block_on(parent.map.lock()).remove(&idx);
|
||||
}
|
||||
}
|
||||
}
|
||||
impl Locker {
|
||||
pub fn new() -> Self {
|
||||
Locker(Arc::new(LockerInner {
|
||||
map: Mutex::new(HashMap::new()),
|
||||
self_lock: Qutex::new(()),
|
||||
children_lock: {
|
||||
let notify = Arc::new(Notify::new());
|
||||
notify.notify_one();
|
||||
notify
|
||||
},
|
||||
guard: Mutex::new(Weak::new()),
|
||||
parent: None,
|
||||
}))
|
||||
}
|
||||
async fn notify_guard(&self) -> Arc<NotifyGuard> {
|
||||
let mut lock = self.0.guard.lock().await;
|
||||
if let Some(n) = lock.upgrade() {
|
||||
Arc::new(NotifyGuard(n.0.clone()))
|
||||
} else {
|
||||
let res = Arc::new(NotifyGuard(self.0.children_lock.clone()));
|
||||
*lock = Arc::downgrade(&res);
|
||||
res
|
||||
}
|
||||
}
|
||||
async fn child<'a, S: Into<Cow<'a, str>>>(&self, name: S) -> Locker {
|
||||
let name: Cow<'a, str> = name.into();
|
||||
let mut lock = self.0.map.lock().await;
|
||||
if let Some(child) = lock.get(name.as_ref()).and_then(|w| w.upgrade()) {
|
||||
Locker(child)
|
||||
} else {
|
||||
let name = name.into_owned();
|
||||
let res = Arc::new(LockerInner {
|
||||
map: Mutex::new(HashMap::new()),
|
||||
self_lock: Qutex::new(()),
|
||||
children_lock: {
|
||||
let notify = Arc::new(Notify::new());
|
||||
notify.notify_one();
|
||||
notify
|
||||
},
|
||||
guard: Mutex::new(Weak::new()),
|
||||
parent: Some((self.notify_guard().await, name.clone(), self.0.clone())),
|
||||
impl Request {
|
||||
fn process(mut self, returned_locks: &mut Vec<oneshot::Receiver<LockInfo>>) -> Option<Self> {
|
||||
if self.lock_info.ptr.len() == self.lock_info.segments_handled {
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
returned_locks.push(receiver);
|
||||
self.lock_info.segments_handled = 0;
|
||||
let _ = self.completion.send(Guard {
|
||||
lock_info: self.lock_info,
|
||||
sender: Some(sender),
|
||||
});
|
||||
lock.insert(name, Arc::downgrade(&res));
|
||||
Locker(res)
|
||||
}
|
||||
}
|
||||
/// await once: in the queue, await twice: all children dropped
|
||||
async fn wait_for_children<'a>(&'a self) -> BoxFuture<'a, ()> {
|
||||
let children = self.0.guard.lock().await;
|
||||
let mut fut = if children.strong_count() == 0 {
|
||||
futures::future::ready(()).boxed()
|
||||
None
|
||||
} else {
|
||||
self.0.children_lock.notified().boxed()
|
||||
};
|
||||
if matches!(futures::poll!(&mut fut), Poll::Ready(_)) {
|
||||
return futures::future::ready(()).boxed();
|
||||
self.lock_info.segments_handled += 1;
|
||||
Some(self)
|
||||
}
|
||||
drop(children);
|
||||
fut
|
||||
}
|
||||
async fn acquire_and_trade(self, guards: Vec<LockerGuard>) -> LockerGuard {
|
||||
let children_dropped = self.wait_for_children().await;
|
||||
guards.into_iter().for_each(drop);
|
||||
children_dropped.await;
|
||||
let guard = self.0.self_lock.clone().lock().await.unwrap();
|
||||
LockerGuard(self.notify_guard().await, guard, self.0.clone())
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Guard {
|
||||
lock_info: LockInfo,
|
||||
sender: Option<oneshot::Sender<LockInfo>>,
|
||||
}
|
||||
impl Drop for Guard {
|
||||
fn drop(&mut self) {
|
||||
let _ = self
|
||||
.sender
|
||||
.take()
|
||||
.unwrap()
|
||||
.send(std::mem::take(&mut self.lock_info));
|
||||
}
|
||||
pub async fn lock<S: AsRef<str>, V: SegList>(
|
||||
&self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
guards: Vec<LockerGuard>,
|
||||
) -> LockerGuard {
|
||||
let mut locker = Locker(self.0.clone());
|
||||
for seg in ptr.iter() {
|
||||
locker = locker.child(seg).await;
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct Node {
|
||||
readers: Vec<usize>,
|
||||
writers: HashSet<usize>,
|
||||
reqs: VecDeque<Option<Request>>,
|
||||
}
|
||||
impl Node {
|
||||
fn write_free(&self, id: usize) -> bool {
|
||||
self.writers.is_empty() || (self.writers.len() == 1 && self.writers.contains(&id))
|
||||
}
|
||||
fn read_free(&self, id: usize) -> bool {
|
||||
self.readers.is_empty() || (self.readers.iter().filter(|a| a != &&id).count() == 0)
|
||||
}
|
||||
fn write_available(&self, id: usize) -> bool {
|
||||
self.write_free(id) && self.read_free(id) && self.reqs.is_empty()
|
||||
}
|
||||
fn read_available(&self, id: usize) -> bool {
|
||||
self.write_free(id) && self.reqs.is_empty()
|
||||
}
|
||||
fn handle_request(
|
||||
&mut self,
|
||||
req: Request,
|
||||
returned_locks: &mut Vec<oneshot::Receiver<LockInfo>>,
|
||||
) -> Option<Request> {
|
||||
if req.lock_info.write() && self.write_available(req.lock_info.handle_id) {
|
||||
self.writers.insert(req.lock_info.handle_id);
|
||||
req.process(returned_locks)
|
||||
} else if !req.lock_info.write() && self.read_available(req.lock_info.handle_id) {
|
||||
self.readers.push(req.lock_info.handle_id);
|
||||
req.process(returned_locks)
|
||||
} else {
|
||||
self.reqs.push_back(Some(req));
|
||||
None
|
||||
}
|
||||
let res = locker.acquire_and_trade(guards).await;
|
||||
let mut guards = Vec::with_capacity(ptr.len());
|
||||
let mut cur = res.2.parent.as_ref().map(|(_, _, p)| p);
|
||||
while let Some(parent) = cur {
|
||||
guards.push(parent.self_lock.clone().lock().await.unwrap());
|
||||
cur = parent.parent.as_ref().map(|(_, _, p)| p);
|
||||
}
|
||||
res
|
||||
}
|
||||
/// TODO: DRAGONS!!!
|
||||
/// Acquiring a lock to a node above something you already held will do so at the transaction level of the lock you already held!
|
||||
/// This means even though you dropped the sub tx in which you acquired the higher level lock,
|
||||
/// the higher lock could still be held by the parent tx which originally held the lower lock.
|
||||
pub async fn add_lock(
|
||||
&self,
|
||||
ptr: &JsonPointer,
|
||||
locks: &mut Vec<(JsonPointer, Option<LockerGuard>)>, // tx locks
|
||||
extra_locks: &mut [&mut [(JsonPointer, Option<LockerGuard>)]], // tx parent locks
|
||||
) {
|
||||
let mut lock_dest = None;
|
||||
let mut guards = Vec::new();
|
||||
for lock in extra_locks
|
||||
.iter_mut()
|
||||
.flat_map(|a| a.iter_mut())
|
||||
.chain(locks.iter_mut())
|
||||
fn handle_release(
|
||||
&mut self,
|
||||
mut lock_info: LockInfo,
|
||||
returned_locks: &mut Vec<oneshot::Receiver<LockInfo>>,
|
||||
) -> (Option<LockInfo>, Vec<Request>) {
|
||||
if lock_info.write() {
|
||||
self.writers.remove(&lock_info.handle_id);
|
||||
} else if let Some(idx) = self
|
||||
.readers
|
||||
.iter()
|
||||
.enumerate()
|
||||
.find(|(_, id)| id == &&lock_info.handle_id)
|
||||
.map(|(idx, _)| idx)
|
||||
{
|
||||
if ptr.starts_with(&lock.0) {
|
||||
return;
|
||||
}
|
||||
if lock.0.starts_with(&ptr) {
|
||||
if let Some(guard) = lock.1.take() {
|
||||
guards.push(guard);
|
||||
lock_dest = Some(lock);
|
||||
self.readers.swap_remove(idx);
|
||||
}
|
||||
let new_reqs = self.process_queue(returned_locks);
|
||||
if lock_info.ptr.len() == lock_info.segments_handled {
|
||||
(None, new_reqs)
|
||||
} else {
|
||||
lock_info.segments_handled += 1;
|
||||
(Some(lock_info), new_reqs)
|
||||
}
|
||||
}
|
||||
fn process_queue(
|
||||
&mut self,
|
||||
returned_locks: &mut Vec<oneshot::Receiver<LockInfo>>,
|
||||
) -> Vec<Request> {
|
||||
let mut ids_processed = HashSet::new();
|
||||
let mut only_matching = false;
|
||||
let mut res = Vec::new();
|
||||
let mut tmp_reqs = std::mem::take(&mut self.reqs);
|
||||
for req_opt in &mut tmp_reqs {
|
||||
if let Some(req) = req_opt {
|
||||
if !only_matching || ids_processed.contains(&req.lock_info.handle_id) {
|
||||
if (req.lock_info.write() && self.write_available(req.lock_info.handle_id))
|
||||
|| self.read_available(req.lock_info.handle_id)
|
||||
{
|
||||
ids_processed.insert(req.lock_info.handle_id);
|
||||
if let Some(req) = req_opt.take() {
|
||||
if let Some(req) = self.handle_request(req, returned_locks) {
|
||||
res.push(req);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
only_matching = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
let guard = self.lock(ptr, guards).await;
|
||||
if let Some(lock) = lock_dest {
|
||||
lock.0 = ptr.clone();
|
||||
lock.1 = Some(guard);
|
||||
} else {
|
||||
locks.push((ptr.clone(), Some(guard)));
|
||||
self.reqs = tmp_reqs;
|
||||
while matches!(self.reqs.get(0), Some(&None)) {
|
||||
self.reqs.pop_front();
|
||||
}
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct Trie {
|
||||
node: Node,
|
||||
children: HashMap<String, Trie>,
|
||||
}
|
||||
impl Trie {
|
||||
fn child_mut(&mut self, name: &str) -> &mut Self {
|
||||
if !self.children.contains_key(name) {
|
||||
self.children.insert(name.to_owned(), Trie::default());
|
||||
}
|
||||
self.children.get_mut(name).unwrap()
|
||||
}
|
||||
fn handle_request(
|
||||
&mut self,
|
||||
req: Request,
|
||||
returned_locks: &mut Vec<oneshot::Receiver<LockInfo>>,
|
||||
) {
|
||||
if let Some(req) = self.node.handle_request(req, returned_locks) {
|
||||
if let Some(seg) = req.lock_info.next_seg() {
|
||||
self.child_mut(seg).handle_request(req, returned_locks)
|
||||
}
|
||||
}
|
||||
}
|
||||
fn handle_release(
|
||||
&mut self,
|
||||
lock_info: LockInfo,
|
||||
returned_locks: &mut Vec<oneshot::Receiver<LockInfo>>,
|
||||
) {
|
||||
let (release, reqs) = self.node.handle_release(lock_info, returned_locks);
|
||||
for req in reqs {
|
||||
self.handle_request(req, returned_locks);
|
||||
}
|
||||
if let Some(release) = release {
|
||||
if let Some(seg) = release.next_seg() {
|
||||
self.child_mut(seg).handle_release(release, returned_locks)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Locker {
|
||||
sender: mpsc::UnboundedSender<Request>,
|
||||
}
|
||||
impl Locker {
|
||||
pub fn new() -> Self {
|
||||
let (sender, receiver) = mpsc::unbounded_channel();
|
||||
tokio::spawn(async move {
|
||||
let mut trie = Trie::default();
|
||||
let mut new_requests = RequestQueue {
|
||||
closed: false,
|
||||
recv: receiver,
|
||||
};
|
||||
let mut returned_locks = Vec::new();
|
||||
while let Some(action) = get_action(&mut new_requests, &mut returned_locks).await {
|
||||
match action {
|
||||
Action::HandleRequest(req) => trie.handle_request(req, &mut returned_locks),
|
||||
Action::HandleRelease(lock_info) => {
|
||||
trie.handle_release(lock_info, &mut returned_locks)
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
Locker { sender }
|
||||
}
|
||||
pub async fn lock(&self, handle_id: usize, ptr: JsonPointer, write: bool) -> Guard {
|
||||
let (send, recv) = oneshot::channel();
|
||||
self.sender
|
||||
.send(Request {
|
||||
lock_info: LockInfo {
|
||||
handle_id,
|
||||
ptr,
|
||||
write,
|
||||
segments_handled: 0,
|
||||
},
|
||||
completion: send,
|
||||
})
|
||||
.unwrap();
|
||||
recv.await.unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
struct RequestQueue {
|
||||
closed: bool,
|
||||
recv: mpsc::UnboundedReceiver<Request>,
|
||||
}
|
||||
|
||||
enum Action {
|
||||
HandleRequest(Request),
|
||||
HandleRelease(LockInfo),
|
||||
}
|
||||
|
||||
async fn get_action(
|
||||
new_requests: &mut RequestQueue,
|
||||
returned_locks: &mut Vec<oneshot::Receiver<LockInfo>>,
|
||||
) -> Option<Action> {
|
||||
loop {
|
||||
if new_requests.closed && returned_locks.is_empty() {
|
||||
return None;
|
||||
}
|
||||
tokio::select! {
|
||||
a = new_requests.recv.recv() => {
|
||||
if let Some(a) = a {
|
||||
return Some(Action::HandleRequest(a));
|
||||
} else {
|
||||
new_requests.closed = true;
|
||||
}
|
||||
}
|
||||
(a, idx, _) = futures::future::select_all(returned_locks.iter_mut()) => {
|
||||
returned_locks.swap_remove(idx);
|
||||
return Some(Action::HandleRelease(a.unwrap()))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ use json_ptr::JsonPointer;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
|
||||
use crate::locker::Guard;
|
||||
use crate::{DbHandle, DiffPatch, Error, Revision};
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -59,24 +60,25 @@ impl<T: Serialize + for<'de> Deserialize<'de>> DerefMut for ModelDataMut<T> {
|
||||
pub struct Model<T: Serialize + for<'de> Deserialize<'de>> {
|
||||
ptr: JsonPointer,
|
||||
phantom: PhantomData<T>,
|
||||
lock: Option<Arc<Guard>>,
|
||||
}
|
||||
impl<T> Model<T>
|
||||
where
|
||||
T: Serialize + for<'de> Deserialize<'de>,
|
||||
{
|
||||
pub async fn lock<Db: DbHandle>(&self, db: &mut Db) {
|
||||
db.lock(&self.ptr).await
|
||||
pub async fn lock<Db: DbHandle>(&self, db: &mut Db, write: bool) {
|
||||
db.lock(self.ptr.clone(), write).await
|
||||
}
|
||||
|
||||
pub async fn get<Db: DbHandle>(&self, db: &mut Db, lock: bool) -> Result<ModelData<T>, Error> {
|
||||
if lock {
|
||||
self.lock(db).await;
|
||||
self.lock(db, false).await;
|
||||
}
|
||||
Ok(ModelData(db.get(&self.ptr).await?))
|
||||
}
|
||||
|
||||
pub async fn get_mut<Db: DbHandle>(&self, db: &mut Db) -> Result<ModelDataMut<T>, Error> {
|
||||
self.lock(db).await;
|
||||
self.lock(db, true).await;
|
||||
let original = db.get_value(&self.ptr, None).await?;
|
||||
let current = serde_json::from_value(original.clone())?;
|
||||
Ok(ModelDataMut {
|
||||
@@ -92,6 +94,7 @@ where
|
||||
Model {
|
||||
ptr,
|
||||
phantom: PhantomData,
|
||||
lock: self.lock,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -104,7 +107,7 @@ where
|
||||
db: &mut Db,
|
||||
value: &T,
|
||||
) -> Result<Option<Arc<Revision>>, Error> {
|
||||
self.lock(db).await;
|
||||
self.lock(db, true).await;
|
||||
db.put(&self.ptr, value).await
|
||||
}
|
||||
}
|
||||
@@ -116,6 +119,7 @@ where
|
||||
Self {
|
||||
ptr,
|
||||
phantom: PhantomData,
|
||||
lock: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -143,6 +147,7 @@ where
|
||||
Model {
|
||||
ptr: self.ptr.clone(),
|
||||
phantom: PhantomData,
|
||||
lock: self.lock.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -208,8 +213,8 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> HasModel for Box<T> {
|
||||
#[derive(Debug)]
|
||||
pub struct OptionModel<T: HasModel + Serialize + for<'de> Deserialize<'de>>(T::Model);
|
||||
impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
|
||||
pub async fn lock<Db: DbHandle>(&self, db: &mut Db) {
|
||||
db.lock(self.0.as_ref()).await
|
||||
pub async fn lock<Db: DbHandle>(&self, db: &mut Db, write: bool) {
|
||||
db.lock(self.0.as_ref().clone(), write).await
|
||||
}
|
||||
|
||||
pub async fn get<Db: DbHandle>(
|
||||
@@ -218,7 +223,7 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
|
||||
lock: bool,
|
||||
) -> Result<ModelData<Option<T>>, Error> {
|
||||
if lock {
|
||||
self.lock(db).await;
|
||||
self.lock(db, false).await;
|
||||
}
|
||||
Ok(ModelData(db.get(self.0.as_ref()).await?))
|
||||
}
|
||||
@@ -227,7 +232,7 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
|
||||
&self,
|
||||
db: &mut Db,
|
||||
) -> Result<ModelDataMut<Option<T>>, Error> {
|
||||
self.lock(db).await;
|
||||
self.lock(db, true).await;
|
||||
let original = db.get_value(self.0.as_ref(), None).await?;
|
||||
let current = serde_json::from_value(original.clone())?;
|
||||
Ok(ModelDataMut {
|
||||
@@ -239,35 +244,48 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
|
||||
|
||||
pub async fn exists<Db: DbHandle>(&self, db: &mut Db, lock: bool) -> Result<bool, Error> {
|
||||
if lock {
|
||||
db.lock(self.0.as_ref()).await;
|
||||
db.lock(self.0.as_ref().clone(), false).await;
|
||||
}
|
||||
Ok(db.exists(&self.as_ref(), None).await?)
|
||||
}
|
||||
|
||||
pub fn map<
|
||||
F: FnOnce(T::Model) -> V,
|
||||
F: FnOnce(T::Model) -> U::Model,
|
||||
U: Serialize + for<'de> Deserialize<'de> + HasModel,
|
||||
V: ModelFor<U>,
|
||||
>(
|
||||
self,
|
||||
f: F,
|
||||
) -> OptionModel<U> {
|
||||
Into::<JsonPointer>::into(f(self.0)).into()
|
||||
OptionModel(f(self.0))
|
||||
}
|
||||
|
||||
pub fn and_then<
|
||||
F: FnOnce(T::Model) -> V,
|
||||
U: Serialize + for<'de> Deserialize<'de>,
|
||||
V: ModelFor<Option<U>>,
|
||||
F: FnOnce(T::Model) -> U::Model,
|
||||
U: Serialize + for<'de> Deserialize<'de> + HasModel,
|
||||
>(
|
||||
self,
|
||||
f: F,
|
||||
) -> V {
|
||||
Into::<JsonPointer>::into(f(self.0)).into()
|
||||
) -> OptionModel<U> {
|
||||
OptionModel(f(self.0))
|
||||
}
|
||||
|
||||
pub async fn check<Db: DbHandle>(self, db: &mut Db) -> Result<Option<T::Model>, Error> {
|
||||
Ok(if self.exists(db, true).await? {
|
||||
pub async fn delete<Db: DbHandle>(&self, db: &mut Db) -> Result<Option<Arc<Revision>>, Error> {
|
||||
db.lock(self.as_ref().clone(), true).await;
|
||||
db.put(self.as_ref(), &Value::Null).await
|
||||
}
|
||||
}
|
||||
impl<T> OptionModel<T>
|
||||
where
|
||||
T: HasModel + Serialize + for<'de> Deserialize<'de>,
|
||||
T::Model: AsMut<Model<T>>,
|
||||
{
|
||||
pub async fn check<Db: DbHandle>(mut self, db: &mut Db) -> Result<Option<T::Model>, Error> {
|
||||
let lock = db
|
||||
.locker()
|
||||
.lock(db.id(), self.0.as_ref().clone(), false)
|
||||
.await;
|
||||
self.0.as_mut().lock = Some(Arc::new(lock));
|
||||
Ok(if self.exists(db, false).await? {
|
||||
Some(self.0)
|
||||
} else {
|
||||
None
|
||||
@@ -281,11 +299,6 @@ impl<T: HasModel + Serialize + for<'de> Deserialize<'de>> OptionModel<T> {
|
||||
Err(Error::NodeDoesNotExist(self.0.into()))
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn delete<Db: DbHandle>(&self, db: &mut Db) -> Result<Option<Arc<Revision>>, Error> {
|
||||
db.lock(self.as_ref()).await;
|
||||
db.put(self.as_ref(), &Value::Null).await
|
||||
}
|
||||
}
|
||||
impl<T> OptionModel<T>
|
||||
where
|
||||
@@ -296,7 +309,7 @@ where
|
||||
db: &mut Db,
|
||||
value: &T,
|
||||
) -> Result<Option<Arc<Revision>>, Error> {
|
||||
db.lock(self.as_ref()).await;
|
||||
db.lock(self.as_ref().clone(), true).await;
|
||||
db.put(self.as_ref(), value).await
|
||||
}
|
||||
}
|
||||
@@ -460,7 +473,7 @@ where
|
||||
lock: bool,
|
||||
) -> Result<IndexSet<T::Key>, Error> {
|
||||
if lock {
|
||||
db.lock(self.as_ref()).await;
|
||||
db.lock(self.as_ref().clone(), false).await;
|
||||
}
|
||||
let set = db.keys(self.as_ref(), None).await?;
|
||||
Ok(set
|
||||
@@ -469,7 +482,7 @@ where
|
||||
.collect::<Result<_, _>>()?)
|
||||
}
|
||||
pub async fn remove<Db: DbHandle>(&self, db: &mut Db, key: &T::Key) -> Result<(), Error> {
|
||||
db.lock(self.as_ref()).await;
|
||||
db.lock(self.as_ref().clone(), true).await;
|
||||
db.apply(DiffPatch(Patch(vec![PatchOperation::Remove(
|
||||
RemoveOperation {
|
||||
path: self.as_ref().clone().join_end(key.as_ref()),
|
||||
|
||||
@@ -2,6 +2,7 @@ use std::collections::HashMap;
|
||||
use std::fs::OpenOptions;
|
||||
use std::io::Error as IOError;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::Arc;
|
||||
|
||||
use fd_lock_rs::FdLock;
|
||||
@@ -210,6 +211,7 @@ pub struct PatchDb {
|
||||
pub(crate) store: Arc<RwLock<Store>>,
|
||||
subscriber: Arc<Sender<Arc<Revision>>>,
|
||||
pub(crate) locker: Arc<Locker>,
|
||||
handle_id: Arc<AtomicUsize>,
|
||||
}
|
||||
impl PatchDb {
|
||||
pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
|
||||
@@ -219,6 +221,7 @@ impl PatchDb {
|
||||
store: Arc::new(RwLock::new(Store::open(path).await?)),
|
||||
locker: Arc::new(Locker::new()),
|
||||
subscriber: Arc::new(subscriber),
|
||||
handle_id: Arc::new(AtomicUsize::new(0)),
|
||||
})
|
||||
}
|
||||
pub async fn dump(&self) -> Dump {
|
||||
@@ -282,6 +285,9 @@ impl PatchDb {
|
||||
}
|
||||
pub fn handle(&self) -> PatchDbHandle {
|
||||
PatchDbHandle {
|
||||
id: self
|
||||
.handle_id
|
||||
.fetch_add(1, std::sync::atomic::Ordering::SeqCst),
|
||||
db: self.clone(),
|
||||
locks: Vec::new(),
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ use tokio::sync::{RwLock, RwLockReadGuard};
|
||||
use crate::store::Store;
|
||||
use crate::Error;
|
||||
use crate::{
|
||||
locker::{Locker, LockerGuard},
|
||||
locker::{Guard, Locker},
|
||||
DbHandle,
|
||||
};
|
||||
use crate::{
|
||||
@@ -21,8 +21,9 @@ use crate::{
|
||||
};
|
||||
|
||||
pub struct Transaction<Parent: DbHandle> {
|
||||
pub(crate) id: usize,
|
||||
pub(crate) parent: Parent,
|
||||
pub(crate) locks: Vec<(JsonPointer, Option<LockerGuard>)>,
|
||||
pub(crate) locks: Vec<Guard>,
|
||||
pub(crate) updates: DiffPatch,
|
||||
pub(crate) sub: Receiver<Arc<Revision>>,
|
||||
}
|
||||
@@ -71,12 +72,16 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
|
||||
let sub = self.parent.subscribe();
|
||||
drop(store);
|
||||
Ok(Transaction {
|
||||
id: self.id(),
|
||||
parent: self,
|
||||
locks: Vec::new(),
|
||||
updates: DiffPatch::default(),
|
||||
sub,
|
||||
})
|
||||
}
|
||||
fn id(&self) -> usize {
|
||||
self.id
|
||||
}
|
||||
fn rebase(&mut self) -> Result<(), Error> {
|
||||
self.parent.rebase()?;
|
||||
while let Some(rev) = match self.sub.try_recv() {
|
||||
@@ -94,10 +99,8 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
|
||||
fn subscribe(&self) -> Receiver<Arc<Revision>> {
|
||||
self.parent.subscribe()
|
||||
}
|
||||
fn locker_and_locks(&mut self) -> (&Locker, Vec<&mut [(JsonPointer, Option<LockerGuard>)]>) {
|
||||
let (locker, mut locks) = self.parent.locker_and_locks();
|
||||
locks.push(&mut self.locks);
|
||||
(locker, locks)
|
||||
fn locker(&self) -> &Locker {
|
||||
self.parent.locker()
|
||||
}
|
||||
async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||
&mut self,
|
||||
@@ -162,9 +165,9 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
|
||||
self.updates.append(patch);
|
||||
Ok(None)
|
||||
}
|
||||
async fn lock(&mut self, ptr: &JsonPointer) {
|
||||
let (locker, mut locks) = self.parent.locker_and_locks();
|
||||
locker.add_lock(ptr, &mut self.locks, &mut locks).await
|
||||
async fn lock(&mut self, ptr: JsonPointer, write: bool) {
|
||||
self.locks
|
||||
.push(self.parent.locker().lock(self.id, ptr, write).await)
|
||||
}
|
||||
async fn get<
|
||||
T: for<'de> Deserialize<'de>,
|
||||
|
||||
Reference in New Issue
Block a user