From 50657d63e2b70e6b1ebb84468c09a04a9b9e78ac Mon Sep 17 00:00:00 2001 From: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com> Date: Mon, 26 Sep 2022 13:35:13 -0600 Subject: [PATCH] remove barrage (#49) --- patch-db/Cargo.toml | 1 - patch-db/src/handle.rs | 13 +------------ patch-db/src/lib.rs | 11 +++-------- patch-db/src/store.rs | 32 ++++++++++++++------------------ patch-db/src/subscriber.rs | 35 +++++++++++++++++++++++++++++++++++ patch-db/src/test.rs | 7 +++---- patch-db/src/transaction.rs | 14 +++++--------- 7 files changed, 61 insertions(+), 52 deletions(-) create mode 100644 patch-db/src/subscriber.rs diff --git a/patch-db/Cargo.toml b/patch-db/Cargo.toml index 5f31523..273619c 100644 --- a/patch-db/Cargo.toml +++ b/patch-db/Cargo.toml @@ -18,7 +18,6 @@ unstable = [] [dependencies] async-trait = "0.1.42" -barrage = "0.2.3" fd-lock-rs = "0.1.3" futures = "0.3.8" imbl = "1.0.1" diff --git a/patch-db/src/handle.rs b/patch-db/src/handle.rs index 5967f9d..22d1f58 100644 --- a/patch-db/src/handle.rs +++ b/patch-db/src/handle.rs @@ -2,7 +2,6 @@ use std::collections::BTreeSet; use std::sync::Arc; use async_trait::async_trait; -use barrage::Receiver; use json_ptr::{JsonPointer, SegList}; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -48,7 +47,6 @@ pub trait DbHandle: Send + Sync + Sized { fn id(&self) -> HandleId; fn rebase(&mut self); fn store(&self) -> Arc>; - fn subscribe(&self) -> Receiver>; fn locker(&self) -> &Locker; async fn exists + Send + Sync, V: SegList + Send + Sync>( &mut self, @@ -120,9 +118,6 @@ impl DbHandle for &mut Handle { fn store(&self) -> Arc> { (**self).store() } - fn subscribe(&self) -> Receiver> { - (**self).subscribe() - } fn locker(&self) -> &Locker { (**self).locker() } @@ -211,7 +206,7 @@ impl std::fmt::Debug for PatchDbHandle { impl DbHandle for PatchDbHandle { async fn begin<'a>(&'a mut self) -> Result, Error> { Ok(Transaction { - sub: self.subscribe(), + sub: self.db.subscribe().await, id: self.id(), parent: self, locks: Vec::new(), @@ -225,9 +220,6 @@ impl DbHandle for PatchDbHandle { fn store(&self) -> Arc> { self.db.store.clone() } - fn subscribe(&self) -> Receiver> { - self.db.subscribe() - } fn locker(&self) -> &Locker { &self.db.locker } @@ -342,9 +334,6 @@ pub mod test_utils { fn store(&self) -> Arc> { unimplemented!() } - fn subscribe(&self) -> Receiver> { - unimplemented!() - } fn locker(&self) -> &Locker { unimplemented!() } diff --git a/patch-db/src/lib.rs b/patch-db/src/lib.rs index 509a609..e5026d9 100644 --- a/patch-db/src/lib.rs +++ b/patch-db/src/lib.rs @@ -1,7 +1,6 @@ use std::io::Error as IOError; use std::sync::Arc; -use barrage::Disconnected; use json_ptr::JsonPointer; use locker::LockError; use thiserror::Error; @@ -15,6 +14,7 @@ mod model; mod model_paths; mod patch; mod store; +mod subscriber; mod transaction; #[cfg(test)] @@ -34,7 +34,7 @@ pub use {json_patch, json_ptr}; pub use bulk_locks::{LockReceipt, LockTarget, LockTargetId, Verifier}; -pub type Subscriber = barrage::Receiver>; +pub type Subscriber = tokio::sync::mpsc::UnboundedReceiver>; pub mod test_utils { use super::*; @@ -60,7 +60,7 @@ pub enum Error { #[error("Database Cache Corrupted: {0}")] CacheCorrupted(Arc), #[error("Subscriber Error: {0:?}")] - Subscriber(Disconnected), + Subscriber(#[from] tokio::sync::mpsc::error::TryRecvError), #[error("Node Does Not Exist: {0}")] NodeDoesNotExist(JsonPointer), #[error("Invalid Lock Request: {0}")] @@ -68,8 +68,3 @@ pub enum Error { #[error("Invalid Lock Request: {0}")] Locker(String), } -impl From for Error { - fn from(e: Disconnected) -> Self { - Error::Subscriber(e) - } -} diff --git a/patch-db/src/store.rs b/patch-db/src/store.rs index d299e4f..2e5c487 100644 --- a/patch-db/src/store.rs +++ b/patch-db/src/store.rs @@ -5,7 +5,6 @@ use std::path::{Path, PathBuf}; use std::sync::atomic::AtomicU64; use std::sync::Arc; -use barrage::{Receiver, Sender}; use fd_lock_rs::FdLock; use json_ptr::{JsonPointer, SegList}; use lazy_static::lazy_static; @@ -18,7 +17,8 @@ use tokio::sync::{Mutex, OwnedMutexGuard, RwLock, RwLockWriteGuard}; use crate::handle::HandleId; use crate::locker::Locker; use crate::patch::{diff, DiffPatch, Dump, Revision}; -use crate::{Error, PatchDbHandle}; +use crate::subscriber::Broadcast; +use crate::{Error, PatchDbHandle, Subscriber}; lazy_static! { static ref OPEN_STORES: Mutex>>> = Mutex::new(HashMap::new()); @@ -64,6 +64,7 @@ pub struct Store { persistent: Value, revision: u64, revision_cache: RevisionCache, + broadcast: Broadcast>, } impl Store { pub(crate) async fn open>(path: P) -> Result { @@ -121,6 +122,7 @@ impl Store { persistent, revision, revision_cache: RevisionCache::with_capacity(64), + broadcast: Broadcast::new(), }) }) .await??; @@ -168,6 +170,9 @@ impl Store { value: self.persistent.clone(), }) } + pub(crate) fn subscribe(&mut self) -> Subscriber { + self.broadcast.subscribe() + } pub(crate) async fn put, V: SegList>( &mut self, ptr: &JsonPointer, @@ -247,6 +252,7 @@ impl Store { let id = self.revision; let res = Arc::new(Revision { id, patch }); self.revision_cache.push(res.clone()); + self.broadcast.send(&res); Ok(Some(res)) } @@ -255,18 +261,14 @@ impl Store { #[derive(Clone)] pub struct PatchDb { pub(crate) store: Arc>, - subscriber: Arc<(Sender>, Receiver>)>, pub(crate) locker: Arc, handle_id: Arc, } impl PatchDb { pub async fn open>(path: P) -> Result { - let subscriber = barrage::unbounded(); - Ok(PatchDb { store: Arc::new(RwLock::new(Store::open(path).await?)), locker: Arc::new(Locker::new()), - subscriber: Arc::new(subscriber), handle_id: Arc::new(AtomicU64::new(0)), }) } @@ -281,11 +283,14 @@ impl PatchDb { pub async fn dump(&self) -> Result { self.store.read().await.dump() } - pub async fn dump_and_sub(&self) -> Result<(Dump, Receiver>), Error> { - let store = self.store.read().await; - let sub = self.subscriber.1.clone(); + pub async fn dump_and_sub(&self) -> Result<(Dump, Subscriber), Error> { + let mut store = self.store.write().await; + let sub = store.broadcast.subscribe(); Ok((store.dump()?, sub)) } + pub async fn subscribe(&self) -> Subscriber { + self.store.write().await.subscribe() + } pub async fn exists, V: SegList>(&self, ptr: &JsonPointer) -> bool { self.store.read().await.exists(ptr) } @@ -311,9 +316,6 @@ impl PatchDb { ) -> Result>, Error> { let mut store = self.store.write().await; let rev = store.put(ptr, value).await?; - if let Some(rev) = rev.as_ref() { - self.subscriber.0.send(rev.clone()).unwrap_or_default(); - } Ok(rev) } pub async fn apply( @@ -327,14 +329,8 @@ impl PatchDb { self.store.write().await }; let rev = store.apply(patch).await?; - if let Some(rev) = rev.as_ref() { - self.subscriber.0.send(rev.clone()).unwrap_or_default(); // ignore errors - } Ok(rev) } - pub fn subscribe(&self) -> Receiver> { - self.subscriber.1.clone() - } pub fn handle(&self) -> PatchDbHandle { PatchDbHandle { id: HandleId { diff --git a/patch-db/src/subscriber.rs b/patch-db/src/subscriber.rs new file mode 100644 index 0000000..bc04aa5 --- /dev/null +++ b/patch-db/src/subscriber.rs @@ -0,0 +1,35 @@ +use tokio::sync::mpsc; + +#[derive(Debug)] +pub struct Broadcast { + listeners: Vec>, +} +impl Default for Broadcast { + fn default() -> Self { + Self { + listeners: Vec::new(), + } + } +} +impl Broadcast { + pub fn new() -> Self { + Default::default() + } + + pub fn send(&mut self, value: &T) { + let mut i = 0; + while i < self.listeners.len() { + if self.listeners[i].send(value.clone()).is_err() { + self.listeners.swap_remove(i); + } else { + i += 1; + } + } + } + + pub fn subscribe(&mut self) -> mpsc::UnboundedReceiver { + let (send, recv) = mpsc::unbounded_channel(); + self.listeners.push(send); + recv + } +} diff --git a/patch-db/src/test.rs b/patch-db/src/test.rs index 279249f..6327afc 100644 --- a/patch-db/src/test.rs +++ b/patch-db/src/test.rs @@ -23,7 +23,6 @@ async fn init_db(db_name: String) -> PatchDb { c: NewType(None), }, }, - None, ) .await .unwrap(); @@ -35,7 +34,7 @@ async fn cleanup_db(db_name: &str) { } async fn put_string_into_root(db: PatchDb, s: String) -> Arc { - db.put(&JsonPointer::<&'static str>::default(), &s, None) + db.put(&JsonPointer::<&'static str>::default(), &s) .await .unwrap() .unwrap() @@ -47,7 +46,7 @@ async fn basic() { let ptr: JsonPointer = "/b/b".parse().unwrap(); let mut get_res: Value = db.get(&ptr).await.unwrap(); assert_eq!(get_res, 1); - db.put(&ptr, "hello", None).await.unwrap(); + db.put(&ptr, "hello").await.unwrap(); get_res = db.get(&ptr).await.unwrap(); assert_eq!(get_res, "hello"); cleanup_db("test.db").await; @@ -61,7 +60,7 @@ async fn transaction() { let ptr: JsonPointer = "/b/b".parse().unwrap(); tx.put(&ptr, &(2 as usize)).await.unwrap(); tx.put(&ptr, &(1 as usize)).await.unwrap(); - let _res = tx.commit(None).await.unwrap(); + let _res = tx.commit().await.unwrap(); println!("res = {:?}", _res); cleanup_db("test.db").await; } diff --git a/patch-db/src/transaction.rs b/patch-db/src/transaction.rs index a4c54c4..d1253e0 100644 --- a/patch-db/src/transaction.rs +++ b/patch-db/src/transaction.rs @@ -2,7 +2,6 @@ use std::collections::BTreeSet; use std::sync::Arc; use async_trait::async_trait; -use barrage::Receiver; use json_ptr::{JsonPointer, SegList}; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -15,14 +14,14 @@ use crate::{ locker::{Guard, LockType, Locker}, }; use crate::{handle::HandleId, model_paths::JsonGlob}; -use crate::{DbHandle, Error, PatchDbHandle}; +use crate::{DbHandle, Error, PatchDbHandle, Subscriber}; pub struct Transaction { pub(crate) id: HandleId, pub(crate) parent: Parent, pub(crate) locks: Vec, pub(crate) updates: DiffPatch, - pub(crate) sub: Receiver>, + pub(crate) sub: Subscriber, } impl Transaction<&mut PatchDbHandle> { pub async fn commit(mut self) -> Result>, Error> { @@ -56,9 +55,9 @@ impl Transaction { impl DbHandle for Transaction { async fn begin<'a>(&'a mut self) -> Result, Error> { let store_lock = self.parent.store(); - let store = store_lock.read().await; + let mut store = store_lock.write().await; self.rebase(); - let sub = self.parent.subscribe(); + let sub = store.subscribe(); drop(store); Ok(Transaction { id: self.id(), @@ -73,16 +72,13 @@ impl DbHandle for Transaction { } fn rebase(&mut self) { self.parent.rebase(); - while let Some(rev) = self.sub.try_recv().unwrap() { + while let Ok(rev) = self.sub.try_recv() { self.updates.rebase(&rev.patch); } } fn store(&self) -> Arc> { self.parent.store() } - fn subscribe(&self) -> Receiver> { - self.parent.subscribe() - } fn locker(&self) -> &Locker { self.parent.locker() }