mirror of
https://github.com/Start9Labs/patch-db.git
synced 2026-03-26 02:11:54 +00:00
remove barrage (#49)
This commit is contained in:
@@ -18,7 +18,6 @@ unstable = []
|
||||
|
||||
[dependencies]
|
||||
async-trait = "0.1.42"
|
||||
barrage = "0.2.3"
|
||||
fd-lock-rs = "0.1.3"
|
||||
futures = "0.3.8"
|
||||
imbl = "1.0.1"
|
||||
|
||||
@@ -2,7 +2,6 @@ use std::collections::BTreeSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use barrage::Receiver;
|
||||
use json_ptr::{JsonPointer, SegList};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
@@ -48,7 +47,6 @@ pub trait DbHandle: Send + Sync + Sized {
|
||||
fn id(&self) -> HandleId;
|
||||
fn rebase(&mut self);
|
||||
fn store(&self) -> Arc<RwLock<Store>>;
|
||||
fn subscribe(&self) -> Receiver<Arc<Revision>>;
|
||||
fn locker(&self) -> &Locker;
|
||||
async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
|
||||
&mut self,
|
||||
@@ -120,9 +118,6 @@ impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
|
||||
fn store(&self) -> Arc<RwLock<Store>> {
|
||||
(**self).store()
|
||||
}
|
||||
fn subscribe(&self) -> Receiver<Arc<Revision>> {
|
||||
(**self).subscribe()
|
||||
}
|
||||
fn locker(&self) -> &Locker {
|
||||
(**self).locker()
|
||||
}
|
||||
@@ -211,7 +206,7 @@ impl std::fmt::Debug for PatchDbHandle {
|
||||
impl DbHandle for PatchDbHandle {
|
||||
async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error> {
|
||||
Ok(Transaction {
|
||||
sub: self.subscribe(),
|
||||
sub: self.db.subscribe().await,
|
||||
id: self.id(),
|
||||
parent: self,
|
||||
locks: Vec::new(),
|
||||
@@ -225,9 +220,6 @@ impl DbHandle for PatchDbHandle {
|
||||
fn store(&self) -> Arc<RwLock<Store>> {
|
||||
self.db.store.clone()
|
||||
}
|
||||
fn subscribe(&self) -> Receiver<Arc<Revision>> {
|
||||
self.db.subscribe()
|
||||
}
|
||||
fn locker(&self) -> &Locker {
|
||||
&self.db.locker
|
||||
}
|
||||
@@ -342,9 +334,6 @@ pub mod test_utils {
|
||||
fn store(&self) -> Arc<RwLock<Store>> {
|
||||
unimplemented!()
|
||||
}
|
||||
fn subscribe(&self) -> Receiver<Arc<Revision>> {
|
||||
unimplemented!()
|
||||
}
|
||||
fn locker(&self) -> &Locker {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use std::io::Error as IOError;
|
||||
use std::sync::Arc;
|
||||
|
||||
use barrage::Disconnected;
|
||||
use json_ptr::JsonPointer;
|
||||
use locker::LockError;
|
||||
use thiserror::Error;
|
||||
@@ -15,6 +14,7 @@ mod model;
|
||||
mod model_paths;
|
||||
mod patch;
|
||||
mod store;
|
||||
mod subscriber;
|
||||
mod transaction;
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -34,7 +34,7 @@ pub use {json_patch, json_ptr};
|
||||
|
||||
pub use bulk_locks::{LockReceipt, LockTarget, LockTargetId, Verifier};
|
||||
|
||||
pub type Subscriber = barrage::Receiver<Arc<Revision>>;
|
||||
pub type Subscriber = tokio::sync::mpsc::UnboundedReceiver<Arc<Revision>>;
|
||||
|
||||
pub mod test_utils {
|
||||
use super::*;
|
||||
@@ -60,7 +60,7 @@ pub enum Error {
|
||||
#[error("Database Cache Corrupted: {0}")]
|
||||
CacheCorrupted(Arc<Error>),
|
||||
#[error("Subscriber Error: {0:?}")]
|
||||
Subscriber(Disconnected),
|
||||
Subscriber(#[from] tokio::sync::mpsc::error::TryRecvError),
|
||||
#[error("Node Does Not Exist: {0}")]
|
||||
NodeDoesNotExist(JsonPointer),
|
||||
#[error("Invalid Lock Request: {0}")]
|
||||
@@ -68,8 +68,3 @@ pub enum Error {
|
||||
#[error("Invalid Lock Request: {0}")]
|
||||
Locker(String),
|
||||
}
|
||||
impl From<Disconnected> for Error {
|
||||
fn from(e: Disconnected) -> Self {
|
||||
Error::Subscriber(e)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,7 +5,6 @@ use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::Arc;
|
||||
|
||||
use barrage::{Receiver, Sender};
|
||||
use fd_lock_rs::FdLock;
|
||||
use json_ptr::{JsonPointer, SegList};
|
||||
use lazy_static::lazy_static;
|
||||
@@ -18,7 +17,8 @@ use tokio::sync::{Mutex, OwnedMutexGuard, RwLock, RwLockWriteGuard};
|
||||
use crate::handle::HandleId;
|
||||
use crate::locker::Locker;
|
||||
use crate::patch::{diff, DiffPatch, Dump, Revision};
|
||||
use crate::{Error, PatchDbHandle};
|
||||
use crate::subscriber::Broadcast;
|
||||
use crate::{Error, PatchDbHandle, Subscriber};
|
||||
|
||||
lazy_static! {
|
||||
static ref OPEN_STORES: Mutex<HashMap<PathBuf, Arc<Mutex<()>>>> = Mutex::new(HashMap::new());
|
||||
@@ -64,6 +64,7 @@ pub struct Store {
|
||||
persistent: Value,
|
||||
revision: u64,
|
||||
revision_cache: RevisionCache,
|
||||
broadcast: Broadcast<Arc<Revision>>,
|
||||
}
|
||||
impl Store {
|
||||
pub(crate) async fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
|
||||
@@ -121,6 +122,7 @@ impl Store {
|
||||
persistent,
|
||||
revision,
|
||||
revision_cache: RevisionCache::with_capacity(64),
|
||||
broadcast: Broadcast::new(),
|
||||
})
|
||||
})
|
||||
.await??;
|
||||
@@ -168,6 +170,9 @@ impl Store {
|
||||
value: self.persistent.clone(),
|
||||
})
|
||||
}
|
||||
pub(crate) fn subscribe(&mut self) -> Subscriber {
|
||||
self.broadcast.subscribe()
|
||||
}
|
||||
pub(crate) async fn put<T: Serialize + ?Sized, S: AsRef<str>, V: SegList>(
|
||||
&mut self,
|
||||
ptr: &JsonPointer<S, V>,
|
||||
@@ -247,6 +252,7 @@ impl Store {
|
||||
let id = self.revision;
|
||||
let res = Arc::new(Revision { id, patch });
|
||||
self.revision_cache.push(res.clone());
|
||||
self.broadcast.send(&res);
|
||||
|
||||
Ok(Some(res))
|
||||
}
|
||||
@@ -255,18 +261,14 @@ impl Store {
|
||||
#[derive(Clone)]
|
||||
pub struct PatchDb {
|
||||
pub(crate) store: Arc<RwLock<Store>>,
|
||||
subscriber: Arc<(Sender<Arc<Revision>>, Receiver<Arc<Revision>>)>,
|
||||
pub(crate) locker: Arc<Locker>,
|
||||
handle_id: Arc<AtomicU64>,
|
||||
}
|
||||
impl PatchDb {
|
||||
pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
|
||||
let subscriber = barrage::unbounded();
|
||||
|
||||
Ok(PatchDb {
|
||||
store: Arc::new(RwLock::new(Store::open(path).await?)),
|
||||
locker: Arc::new(Locker::new()),
|
||||
subscriber: Arc::new(subscriber),
|
||||
handle_id: Arc::new(AtomicU64::new(0)),
|
||||
})
|
||||
}
|
||||
@@ -281,11 +283,14 @@ impl PatchDb {
|
||||
pub async fn dump(&self) -> Result<Dump, Error> {
|
||||
self.store.read().await.dump()
|
||||
}
|
||||
pub async fn dump_and_sub(&self) -> Result<(Dump, Receiver<Arc<Revision>>), Error> {
|
||||
let store = self.store.read().await;
|
||||
let sub = self.subscriber.1.clone();
|
||||
pub async fn dump_and_sub(&self) -> Result<(Dump, Subscriber), Error> {
|
||||
let mut store = self.store.write().await;
|
||||
let sub = store.broadcast.subscribe();
|
||||
Ok((store.dump()?, sub))
|
||||
}
|
||||
pub async fn subscribe(&self) -> Subscriber {
|
||||
self.store.write().await.subscribe()
|
||||
}
|
||||
pub async fn exists<S: AsRef<str>, V: SegList>(&self, ptr: &JsonPointer<S, V>) -> bool {
|
||||
self.store.read().await.exists(ptr)
|
||||
}
|
||||
@@ -311,9 +316,6 @@ impl PatchDb {
|
||||
) -> Result<Option<Arc<Revision>>, Error> {
|
||||
let mut store = self.store.write().await;
|
||||
let rev = store.put(ptr, value).await?;
|
||||
if let Some(rev) = rev.as_ref() {
|
||||
self.subscriber.0.send(rev.clone()).unwrap_or_default();
|
||||
}
|
||||
Ok(rev)
|
||||
}
|
||||
pub async fn apply(
|
||||
@@ -327,14 +329,8 @@ impl PatchDb {
|
||||
self.store.write().await
|
||||
};
|
||||
let rev = store.apply(patch).await?;
|
||||
if let Some(rev) = rev.as_ref() {
|
||||
self.subscriber.0.send(rev.clone()).unwrap_or_default(); // ignore errors
|
||||
}
|
||||
Ok(rev)
|
||||
}
|
||||
pub fn subscribe(&self) -> Receiver<Arc<Revision>> {
|
||||
self.subscriber.1.clone()
|
||||
}
|
||||
pub fn handle(&self) -> PatchDbHandle {
|
||||
PatchDbHandle {
|
||||
id: HandleId {
|
||||
|
||||
35
patch-db/src/subscriber.rs
Normal file
35
patch-db/src/subscriber.rs
Normal file
@@ -0,0 +1,35 @@
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Broadcast<T: Clone> {
|
||||
listeners: Vec<mpsc::UnboundedSender<T>>,
|
||||
}
|
||||
impl<T: Clone> Default for Broadcast<T> {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
listeners: Vec::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
impl<T: Clone> Broadcast<T> {
|
||||
pub fn new() -> Self {
|
||||
Default::default()
|
||||
}
|
||||
|
||||
pub fn send(&mut self, value: &T) {
|
||||
let mut i = 0;
|
||||
while i < self.listeners.len() {
|
||||
if self.listeners[i].send(value.clone()).is_err() {
|
||||
self.listeners.swap_remove(i);
|
||||
} else {
|
||||
i += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn subscribe(&mut self) -> mpsc::UnboundedReceiver<T> {
|
||||
let (send, recv) = mpsc::unbounded_channel();
|
||||
self.listeners.push(send);
|
||||
recv
|
||||
}
|
||||
}
|
||||
@@ -23,7 +23,6 @@ async fn init_db(db_name: String) -> PatchDb {
|
||||
c: NewType(None),
|
||||
},
|
||||
},
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -35,7 +34,7 @@ async fn cleanup_db(db_name: &str) {
|
||||
}
|
||||
|
||||
async fn put_string_into_root(db: PatchDb, s: String) -> Arc<Revision> {
|
||||
db.put(&JsonPointer::<&'static str>::default(), &s, None)
|
||||
db.put(&JsonPointer::<&'static str>::default(), &s)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
@@ -47,7 +46,7 @@ async fn basic() {
|
||||
let ptr: JsonPointer = "/b/b".parse().unwrap();
|
||||
let mut get_res: Value = db.get(&ptr).await.unwrap();
|
||||
assert_eq!(get_res, 1);
|
||||
db.put(&ptr, "hello", None).await.unwrap();
|
||||
db.put(&ptr, "hello").await.unwrap();
|
||||
get_res = db.get(&ptr).await.unwrap();
|
||||
assert_eq!(get_res, "hello");
|
||||
cleanup_db("test.db").await;
|
||||
@@ -61,7 +60,7 @@ async fn transaction() {
|
||||
let ptr: JsonPointer = "/b/b".parse().unwrap();
|
||||
tx.put(&ptr, &(2 as usize)).await.unwrap();
|
||||
tx.put(&ptr, &(1 as usize)).await.unwrap();
|
||||
let _res = tx.commit(None).await.unwrap();
|
||||
let _res = tx.commit().await.unwrap();
|
||||
println!("res = {:?}", _res);
|
||||
cleanup_db("test.db").await;
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@ use std::collections::BTreeSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use barrage::Receiver;
|
||||
use json_ptr::{JsonPointer, SegList};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
@@ -15,14 +14,14 @@ use crate::{
|
||||
locker::{Guard, LockType, Locker},
|
||||
};
|
||||
use crate::{handle::HandleId, model_paths::JsonGlob};
|
||||
use crate::{DbHandle, Error, PatchDbHandle};
|
||||
use crate::{DbHandle, Error, PatchDbHandle, Subscriber};
|
||||
|
||||
pub struct Transaction<Parent: DbHandle> {
|
||||
pub(crate) id: HandleId,
|
||||
pub(crate) parent: Parent,
|
||||
pub(crate) locks: Vec<Guard>,
|
||||
pub(crate) updates: DiffPatch,
|
||||
pub(crate) sub: Receiver<Arc<Revision>>,
|
||||
pub(crate) sub: Subscriber,
|
||||
}
|
||||
impl Transaction<&mut PatchDbHandle> {
|
||||
pub async fn commit(mut self) -> Result<Option<Arc<Revision>>, Error> {
|
||||
@@ -56,9 +55,9 @@ impl<Parent: DbHandle + Send + Sync> Transaction<Parent> {
|
||||
impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
|
||||
async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error> {
|
||||
let store_lock = self.parent.store();
|
||||
let store = store_lock.read().await;
|
||||
let mut store = store_lock.write().await;
|
||||
self.rebase();
|
||||
let sub = self.parent.subscribe();
|
||||
let sub = store.subscribe();
|
||||
drop(store);
|
||||
Ok(Transaction {
|
||||
id: self.id(),
|
||||
@@ -73,16 +72,13 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
|
||||
}
|
||||
fn rebase(&mut self) {
|
||||
self.parent.rebase();
|
||||
while let Some(rev) = self.sub.try_recv().unwrap() {
|
||||
while let Ok(rev) = self.sub.try_recv() {
|
||||
self.updates.rebase(&rev.patch);
|
||||
}
|
||||
}
|
||||
fn store(&self) -> Arc<RwLock<Store>> {
|
||||
self.parent.store()
|
||||
}
|
||||
fn subscribe(&self) -> Receiver<Arc<Revision>> {
|
||||
self.parent.subscribe()
|
||||
}
|
||||
fn locker(&self) -> &Locker {
|
||||
self.parent.locker()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user