remove revision cache (#62)

This commit is contained in:
Aiden McClelland
2023-10-16 16:02:02 -06:00
committed by GitHub
parent 3e5cc22bb6
commit 6af2221add
2 changed files with 14 additions and 59 deletions

View File

@@ -1,5 +1,3 @@
use std::path::Path;
use patch_db::json_ptr::{JsonPointer, PtrSegment}; use patch_db::json_ptr::{JsonPointer, PtrSegment};
use serde_json::Value; use serde_json::Value;
@@ -25,7 +23,7 @@ async fn main() {
Some(("dump", matches)) => { Some(("dump", matches)) => {
let path = matches.value_of("PATH").unwrap(); let path = matches.value_of("PATH").unwrap();
let db = patch_db::PatchDb::open(path).await.unwrap(); let db = patch_db::PatchDb::open(path).await.unwrap();
let dump = db.dump().await.unwrap(); let dump = db.dump().await;
serde_json::to_writer_pretty(&mut std::io::stdout(), &dump.value).unwrap(); serde_json::to_writer_pretty(&mut std::io::stdout(), &dump.value).unwrap();
println!(); println!();
} }

View File

@@ -1,4 +1,4 @@
use std::collections::{BTreeSet, HashMap, VecDeque}; use std::collections::{BTreeSet, HashMap};
use std::fs::OpenOptions; use std::fs::OpenOptions;
use std::io::{SeekFrom, Write}; use std::io::{SeekFrom, Write};
use std::panic::UnwindSafe; use std::panic::UnwindSafe;
@@ -24,38 +24,6 @@ lazy_static! {
static ref OPEN_STORES: Mutex<HashMap<PathBuf, Arc<Mutex<()>>>> = Mutex::new(HashMap::new()); static ref OPEN_STORES: Mutex<HashMap<PathBuf, Arc<Mutex<()>>>> = Mutex::new(HashMap::new());
} }
pub struct RevisionCache {
cache: VecDeque<Arc<Revision>>,
capacity: usize,
}
impl RevisionCache {
pub fn with_capacity(capacity: usize) -> Self {
RevisionCache {
cache: VecDeque::with_capacity(capacity),
capacity,
}
}
pub fn push(&mut self, revision: Arc<Revision>) {
while self.capacity > 0 && self.cache.len() >= self.capacity {
self.cache.pop_front();
}
self.cache.push_back(revision);
}
pub fn since(&self, id: u64) -> Option<Vec<Arc<Revision>>> {
let start = self.cache.get(0).map(|rev| rev.id)?;
if id < start - 1 {
return None;
}
Some(
self.cache
.iter()
.skip((id - start + 1) as usize)
.cloned()
.collect(),
)
}
}
pub struct Store { pub struct Store {
path: PathBuf, path: PathBuf,
file: FdLock<File>, file: FdLock<File>,
@@ -63,7 +31,6 @@ pub struct Store {
_lock: OwnedMutexGuard<()>, _lock: OwnedMutexGuard<()>,
persistent: Value, persistent: Value,
revision: u64, revision: u64,
revision_cache: RevisionCache,
broadcast: Broadcast<Arc<Revision>>, broadcast: Broadcast<Arc<Revision>>,
} }
impl Store { impl Store {
@@ -132,7 +99,6 @@ impl Store {
_lock, _lock,
persistent, persistent,
revision, revision,
revision_cache: RevisionCache::with_capacity(64),
broadcast: Broadcast::new(), broadcast: Broadcast::new(),
}) })
}) })
@@ -140,12 +106,6 @@ impl Store {
res.compress().await?; res.compress().await?;
Ok(res) Ok(res)
} }
pub(crate) fn get_revisions_since(&self, id: u64) -> Option<Vec<Arc<Revision>>> {
if id >= self.revision {
return Some(Vec::new());
}
self.revision_cache.since(id)
}
pub async fn close(mut self) -> Result<(), Error> { pub async fn close(mut self) -> Result<(), Error> {
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
@@ -175,11 +135,14 @@ impl Store {
) -> Result<T, Error> { ) -> Result<T, Error> {
Ok(imbl_value::from_value(self.get_value(ptr))?) Ok(imbl_value::from_value(self.get_value(ptr))?)
} }
pub(crate) fn dump(&self) -> Result<Dump, Error> { pub(crate) fn dump(&self) -> Dump {
Ok(Dump { Dump {
id: self.revision, id: self.revision,
value: self.persistent.clone(), value: self.persistent.clone(),
}) }
}
pub(crate) fn sequence(&self) -> u64 {
self.revision
} }
pub(crate) fn subscribe(&mut self) -> Subscriber { pub(crate) fn subscribe(&mut self) -> Subscriber {
self.broadcast.subscribe() self.broadcast.subscribe()
@@ -285,7 +248,6 @@ impl Store {
let id = self.revision; let id = self.revision;
let res = Arc::new(Revision { id, patch }); let res = Arc::new(Revision { id, patch });
self.revision_cache.push(res.clone());
self.broadcast.send(&res); self.broadcast.send(&res);
Ok(Some(res)) Ok(Some(res))
@@ -302,21 +264,16 @@ impl PatchDb {
store: Arc::new(RwLock::new(Store::open(path).await?)), store: Arc::new(RwLock::new(Store::open(path).await?)),
}) })
} }
pub async fn sync(&self, sequence: u64) -> Result<Result<Vec<Arc<Revision>>, Dump>, Error> { pub async fn dump(&self) -> Dump {
let store = self.store.read().await;
if let Some(revs) = store.get_revisions_since(sequence) {
Ok(Ok(revs))
} else {
Ok(Err(store.dump()?))
}
}
pub async fn dump(&self) -> Result<Dump, Error> {
self.store.read().await.dump() self.store.read().await.dump()
} }
pub async fn dump_and_sub(&self) -> Result<(Dump, Subscriber), Error> { pub async fn sequence(&self) -> u64 {
self.store.read().await.sequence()
}
pub async fn dump_and_sub(&self) -> (Dump, Subscriber) {
let mut store = self.store.write().await; let mut store = self.store.write().await;
let sub = store.broadcast.subscribe(); let sub = store.broadcast.subscribe();
Ok((store.dump()?, sub)) (store.dump(), sub)
} }
pub async fn subscribe(&self) -> Subscriber { pub async fn subscribe(&self) -> Subscriber {
self.store.write().await.subscribe() self.store.write().await.subscribe()