remove barrage

This commit is contained in:
Aiden McClelland
2022-09-26 12:21:39 -06:00
parent e74f36f073
commit 41863d99f1
7 changed files with 61 additions and 52 deletions

View File

@@ -18,7 +18,6 @@ unstable = []
[dependencies]
async-trait = "0.1.42"
barrage = "0.2.3"
fd-lock-rs = "0.1.3"
futures = "0.3.8"
imbl = "1.0.1"

View File

@@ -2,7 +2,6 @@ use std::collections::BTreeSet;
use std::sync::Arc;
use async_trait::async_trait;
use barrage::Receiver;
use json_ptr::{JsonPointer, SegList};
use serde::{Deserialize, Serialize};
use serde_json::Value;
@@ -48,7 +47,6 @@ pub trait DbHandle: Send + Sync + Sized {
fn id(&self) -> HandleId;
fn rebase(&mut self);
fn store(&self) -> Arc<RwLock<Store>>;
fn subscribe(&self) -> Receiver<Arc<Revision>>;
fn locker(&self) -> &Locker;
async fn exists<S: AsRef<str> + Send + Sync, V: SegList + Send + Sync>(
&mut self,
@@ -120,9 +118,6 @@ impl<Handle: DbHandle + ?Sized> DbHandle for &mut Handle {
fn store(&self) -> Arc<RwLock<Store>> {
(**self).store()
}
fn subscribe(&self) -> Receiver<Arc<Revision>> {
(**self).subscribe()
}
fn locker(&self) -> &Locker {
(**self).locker()
}
@@ -211,7 +206,7 @@ impl std::fmt::Debug for PatchDbHandle {
impl DbHandle for PatchDbHandle {
async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error> {
Ok(Transaction {
sub: self.subscribe(),
sub: self.db.subscribe().await,
id: self.id(),
parent: self,
locks: Vec::new(),
@@ -225,9 +220,6 @@ impl DbHandle for PatchDbHandle {
fn store(&self) -> Arc<RwLock<Store>> {
self.db.store.clone()
}
fn subscribe(&self) -> Receiver<Arc<Revision>> {
self.db.subscribe()
}
fn locker(&self) -> &Locker {
&self.db.locker
}
@@ -342,9 +334,6 @@ pub mod test_utils {
fn store(&self) -> Arc<RwLock<Store>> {
unimplemented!()
}
fn subscribe(&self) -> Receiver<Arc<Revision>> {
unimplemented!()
}
fn locker(&self) -> &Locker {
unimplemented!()
}

View File

@@ -1,7 +1,6 @@
use std::io::Error as IOError;
use std::sync::Arc;
use barrage::Disconnected;
use json_ptr::JsonPointer;
use locker::LockError;
use thiserror::Error;
@@ -15,6 +14,7 @@ mod model;
mod model_paths;
mod patch;
mod store;
mod subscriber;
mod transaction;
#[cfg(test)]
@@ -34,7 +34,7 @@ pub use {json_patch, json_ptr};
pub use bulk_locks::{LockReceipt, LockTarget, LockTargetId, Verifier};
pub type Subscriber = barrage::Receiver<Arc<Revision>>;
pub type Subscriber = tokio::sync::mpsc::UnboundedReceiver<Arc<Revision>>;
pub mod test_utils {
use super::*;
@@ -60,7 +60,7 @@ pub enum Error {
#[error("Database Cache Corrupted: {0}")]
CacheCorrupted(Arc<Error>),
#[error("Subscriber Error: {0:?}")]
Subscriber(Disconnected),
Subscriber(#[from] tokio::sync::mpsc::error::TryRecvError),
#[error("Node Does Not Exist: {0}")]
NodeDoesNotExist(JsonPointer),
#[error("Invalid Lock Request: {0}")]
@@ -68,8 +68,3 @@ pub enum Error {
#[error("Invalid Lock Request: {0}")]
Locker(String),
}
impl From<Disconnected> for Error {
fn from(e: Disconnected) -> Self {
Error::Subscriber(e)
}
}

View File

@@ -5,7 +5,6 @@ use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use barrage::{Receiver, Sender};
use fd_lock_rs::FdLock;
use json_ptr::{JsonPointer, SegList};
use lazy_static::lazy_static;
@@ -18,7 +17,8 @@ use tokio::sync::{Mutex, OwnedMutexGuard, RwLock, RwLockWriteGuard};
use crate::handle::HandleId;
use crate::locker::Locker;
use crate::patch::{diff, DiffPatch, Dump, Revision};
use crate::{Error, PatchDbHandle};
use crate::subscriber::Broadcast;
use crate::{Error, PatchDbHandle, Subscriber};
lazy_static! {
static ref OPEN_STORES: Mutex<HashMap<PathBuf, Arc<Mutex<()>>>> = Mutex::new(HashMap::new());
@@ -64,6 +64,7 @@ pub struct Store {
persistent: Value,
revision: u64,
revision_cache: RevisionCache,
broadcast: Broadcast<Arc<Revision>>,
}
impl Store {
pub(crate) async fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
@@ -121,6 +122,7 @@ impl Store {
persistent,
revision,
revision_cache: RevisionCache::with_capacity(64),
broadcast: Broadcast::new(),
})
})
.await??;
@@ -168,6 +170,9 @@ impl Store {
value: self.persistent.clone(),
})
}
pub(crate) fn subscribe(&mut self) -> Subscriber {
self.broadcast.subscribe()
}
pub(crate) async fn put<T: Serialize + ?Sized, S: AsRef<str>, V: SegList>(
&mut self,
ptr: &JsonPointer<S, V>,
@@ -247,6 +252,7 @@ impl Store {
let id = self.revision;
let res = Arc::new(Revision { id, patch });
self.revision_cache.push(res.clone());
self.broadcast.send(&res);
Ok(Some(res))
}
@@ -255,18 +261,14 @@ impl Store {
#[derive(Clone)]
pub struct PatchDb {
pub(crate) store: Arc<RwLock<Store>>,
subscriber: Arc<(Sender<Arc<Revision>>, Receiver<Arc<Revision>>)>,
pub(crate) locker: Arc<Locker>,
handle_id: Arc<AtomicU64>,
}
impl PatchDb {
pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
let subscriber = barrage::unbounded();
Ok(PatchDb {
store: Arc::new(RwLock::new(Store::open(path).await?)),
locker: Arc::new(Locker::new()),
subscriber: Arc::new(subscriber),
handle_id: Arc::new(AtomicU64::new(0)),
})
}
@@ -281,11 +283,14 @@ impl PatchDb {
pub async fn dump(&self) -> Result<Dump, Error> {
self.store.read().await.dump()
}
pub async fn dump_and_sub(&self) -> Result<(Dump, Receiver<Arc<Revision>>), Error> {
let store = self.store.read().await;
let sub = self.subscriber.1.clone();
pub async fn dump_and_sub(&self) -> Result<(Dump, Subscriber), Error> {
let mut store = self.store.write().await;
let sub = store.broadcast.subscribe();
Ok((store.dump()?, sub))
}
pub async fn subscribe(&self) -> Subscriber {
self.store.write().await.subscribe()
}
pub async fn exists<S: AsRef<str>, V: SegList>(&self, ptr: &JsonPointer<S, V>) -> bool {
self.store.read().await.exists(ptr)
}
@@ -311,9 +316,6 @@ impl PatchDb {
) -> Result<Option<Arc<Revision>>, Error> {
let mut store = self.store.write().await;
let rev = store.put(ptr, value).await?;
if let Some(rev) = rev.as_ref() {
self.subscriber.0.send(rev.clone()).unwrap_or_default();
}
Ok(rev)
}
pub async fn apply(
@@ -327,14 +329,8 @@ impl PatchDb {
self.store.write().await
};
let rev = store.apply(patch).await?;
if let Some(rev) = rev.as_ref() {
self.subscriber.0.send(rev.clone()).unwrap_or_default(); // ignore errors
}
Ok(rev)
}
pub fn subscribe(&self) -> Receiver<Arc<Revision>> {
self.subscriber.1.clone()
}
pub fn handle(&self) -> PatchDbHandle {
PatchDbHandle {
id: HandleId {

View File

@@ -0,0 +1,35 @@
use tokio::sync::mpsc;
#[derive(Debug)]
pub struct Broadcast<T: Clone> {
listeners: Vec<mpsc::UnboundedSender<T>>,
}
impl<T: Clone> Default for Broadcast<T> {
fn default() -> Self {
Self {
listeners: Vec::new(),
}
}
}
impl<T: Clone> Broadcast<T> {
pub fn new() -> Self {
Default::default()
}
pub fn send(&mut self, value: &T) {
let mut i = 0;
while i < self.listeners.len() {
if self.listeners[i].send(value.clone()).is_err() {
self.listeners.swap_remove(i);
} else {
i += 1;
}
}
}
pub fn subscribe(&mut self) -> mpsc::UnboundedReceiver<T> {
let (send, recv) = mpsc::unbounded_channel();
self.listeners.push(send);
recv
}
}

View File

@@ -23,7 +23,6 @@ async fn init_db(db_name: String) -> PatchDb {
c: NewType(None),
},
},
None,
)
.await
.unwrap();
@@ -35,7 +34,7 @@ async fn cleanup_db(db_name: &str) {
}
async fn put_string_into_root(db: PatchDb, s: String) -> Arc<Revision> {
db.put(&JsonPointer::<&'static str>::default(), &s, None)
db.put(&JsonPointer::<&'static str>::default(), &s)
.await
.unwrap()
.unwrap()
@@ -47,7 +46,7 @@ async fn basic() {
let ptr: JsonPointer = "/b/b".parse().unwrap();
let mut get_res: Value = db.get(&ptr).await.unwrap();
assert_eq!(get_res, 1);
db.put(&ptr, "hello", None).await.unwrap();
db.put(&ptr, "hello").await.unwrap();
get_res = db.get(&ptr).await.unwrap();
assert_eq!(get_res, "hello");
cleanup_db("test.db").await;
@@ -61,7 +60,7 @@ async fn transaction() {
let ptr: JsonPointer = "/b/b".parse().unwrap();
tx.put(&ptr, &(2 as usize)).await.unwrap();
tx.put(&ptr, &(1 as usize)).await.unwrap();
let _res = tx.commit(None).await.unwrap();
let _res = tx.commit().await.unwrap();
println!("res = {:?}", _res);
cleanup_db("test.db").await;
}

View File

@@ -2,7 +2,6 @@ use std::collections::BTreeSet;
use std::sync::Arc;
use async_trait::async_trait;
use barrage::Receiver;
use json_ptr::{JsonPointer, SegList};
use serde::{Deserialize, Serialize};
use serde_json::Value;
@@ -15,14 +14,14 @@ use crate::{
locker::{Guard, LockType, Locker},
};
use crate::{handle::HandleId, model_paths::JsonGlob};
use crate::{DbHandle, Error, PatchDbHandle};
use crate::{DbHandle, Error, PatchDbHandle, Subscriber};
pub struct Transaction<Parent: DbHandle> {
pub(crate) id: HandleId,
pub(crate) parent: Parent,
pub(crate) locks: Vec<Guard>,
pub(crate) updates: DiffPatch,
pub(crate) sub: Receiver<Arc<Revision>>,
pub(crate) sub: Subscriber,
}
impl Transaction<&mut PatchDbHandle> {
pub async fn commit(mut self) -> Result<Option<Arc<Revision>>, Error> {
@@ -56,9 +55,9 @@ impl<Parent: DbHandle + Send + Sync> Transaction<Parent> {
impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
async fn begin<'a>(&'a mut self) -> Result<Transaction<&'a mut Self>, Error> {
let store_lock = self.parent.store();
let store = store_lock.read().await;
let mut store = store_lock.write().await;
self.rebase();
let sub = self.parent.subscribe();
let sub = store.subscribe();
drop(store);
Ok(Transaction {
id: self.id(),
@@ -73,16 +72,13 @@ impl<Parent: DbHandle + Send + Sync> DbHandle for Transaction<Parent> {
}
fn rebase(&mut self) {
self.parent.rebase();
while let Some(rev) = self.sub.try_recv().unwrap() {
while let Ok(rev) = self.sub.try_recv() {
self.updates.rebase(&rev.patch);
}
}
fn store(&self) -> Arc<RwLock<Store>> {
self.parent.store()
}
fn subscribe(&self) -> Receiver<Arc<Revision>> {
self.parent.subscribe()
}
fn locker(&self) -> &Locker {
self.parent.locker()
}