mirror of
https://github.com/Start9Labs/patch-db.git
synced 2026-03-26 02:11:54 +00:00
remove revision cache
This commit is contained in:
@@ -1,5 +1,3 @@
|
|||||||
use std::path::Path;
|
|
||||||
|
|
||||||
use patch_db::json_ptr::{JsonPointer, PtrSegment};
|
use patch_db::json_ptr::{JsonPointer, PtrSegment};
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
|
||||||
@@ -25,7 +23,7 @@ async fn main() {
|
|||||||
Some(("dump", matches)) => {
|
Some(("dump", matches)) => {
|
||||||
let path = matches.value_of("PATH").unwrap();
|
let path = matches.value_of("PATH").unwrap();
|
||||||
let db = patch_db::PatchDb::open(path).await.unwrap();
|
let db = patch_db::PatchDb::open(path).await.unwrap();
|
||||||
let dump = db.dump().await.unwrap();
|
let dump = db.dump().await;
|
||||||
serde_json::to_writer_pretty(&mut std::io::stdout(), &dump.value).unwrap();
|
serde_json::to_writer_pretty(&mut std::io::stdout(), &dump.value).unwrap();
|
||||||
println!();
|
println!();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
use std::collections::{BTreeSet, HashMap, VecDeque};
|
use std::collections::{BTreeSet, HashMap};
|
||||||
use std::fs::OpenOptions;
|
use std::fs::OpenOptions;
|
||||||
use std::io::{SeekFrom, Write};
|
use std::io::{SeekFrom, Write};
|
||||||
use std::panic::UnwindSafe;
|
use std::panic::UnwindSafe;
|
||||||
@@ -24,38 +24,6 @@ lazy_static! {
|
|||||||
static ref OPEN_STORES: Mutex<HashMap<PathBuf, Arc<Mutex<()>>>> = Mutex::new(HashMap::new());
|
static ref OPEN_STORES: Mutex<HashMap<PathBuf, Arc<Mutex<()>>>> = Mutex::new(HashMap::new());
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct RevisionCache {
|
|
||||||
cache: VecDeque<Arc<Revision>>,
|
|
||||||
capacity: usize,
|
|
||||||
}
|
|
||||||
impl RevisionCache {
|
|
||||||
pub fn with_capacity(capacity: usize) -> Self {
|
|
||||||
RevisionCache {
|
|
||||||
cache: VecDeque::with_capacity(capacity),
|
|
||||||
capacity,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pub fn push(&mut self, revision: Arc<Revision>) {
|
|
||||||
while self.capacity > 0 && self.cache.len() >= self.capacity {
|
|
||||||
self.cache.pop_front();
|
|
||||||
}
|
|
||||||
self.cache.push_back(revision);
|
|
||||||
}
|
|
||||||
pub fn since(&self, id: u64) -> Option<Vec<Arc<Revision>>> {
|
|
||||||
let start = self.cache.get(0).map(|rev| rev.id)?;
|
|
||||||
if id < start - 1 {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
Some(
|
|
||||||
self.cache
|
|
||||||
.iter()
|
|
||||||
.skip((id - start + 1) as usize)
|
|
||||||
.cloned()
|
|
||||||
.collect(),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct Store {
|
pub struct Store {
|
||||||
path: PathBuf,
|
path: PathBuf,
|
||||||
file: FdLock<File>,
|
file: FdLock<File>,
|
||||||
@@ -63,7 +31,6 @@ pub struct Store {
|
|||||||
_lock: OwnedMutexGuard<()>,
|
_lock: OwnedMutexGuard<()>,
|
||||||
persistent: Value,
|
persistent: Value,
|
||||||
revision: u64,
|
revision: u64,
|
||||||
revision_cache: RevisionCache,
|
|
||||||
broadcast: Broadcast<Arc<Revision>>,
|
broadcast: Broadcast<Arc<Revision>>,
|
||||||
}
|
}
|
||||||
impl Store {
|
impl Store {
|
||||||
@@ -132,7 +99,6 @@ impl Store {
|
|||||||
_lock,
|
_lock,
|
||||||
persistent,
|
persistent,
|
||||||
revision,
|
revision,
|
||||||
revision_cache: RevisionCache::with_capacity(64),
|
|
||||||
broadcast: Broadcast::new(),
|
broadcast: Broadcast::new(),
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
@@ -140,12 +106,6 @@ impl Store {
|
|||||||
res.compress().await?;
|
res.compress().await?;
|
||||||
Ok(res)
|
Ok(res)
|
||||||
}
|
}
|
||||||
pub(crate) fn get_revisions_since(&self, id: u64) -> Option<Vec<Arc<Revision>>> {
|
|
||||||
if id >= self.revision {
|
|
||||||
return Some(Vec::new());
|
|
||||||
}
|
|
||||||
self.revision_cache.since(id)
|
|
||||||
}
|
|
||||||
pub async fn close(mut self) -> Result<(), Error> {
|
pub async fn close(mut self) -> Result<(), Error> {
|
||||||
use tokio::io::AsyncWriteExt;
|
use tokio::io::AsyncWriteExt;
|
||||||
|
|
||||||
@@ -175,11 +135,14 @@ impl Store {
|
|||||||
) -> Result<T, Error> {
|
) -> Result<T, Error> {
|
||||||
Ok(imbl_value::from_value(self.get_value(ptr))?)
|
Ok(imbl_value::from_value(self.get_value(ptr))?)
|
||||||
}
|
}
|
||||||
pub(crate) fn dump(&self) -> Result<Dump, Error> {
|
pub(crate) fn dump(&self) -> Dump {
|
||||||
Ok(Dump {
|
Dump {
|
||||||
id: self.revision,
|
id: self.revision,
|
||||||
value: self.persistent.clone(),
|
value: self.persistent.clone(),
|
||||||
})
|
}
|
||||||
|
}
|
||||||
|
pub(crate) fn sequence(&self) -> u64 {
|
||||||
|
self.revision
|
||||||
}
|
}
|
||||||
pub(crate) fn subscribe(&mut self) -> Subscriber {
|
pub(crate) fn subscribe(&mut self) -> Subscriber {
|
||||||
self.broadcast.subscribe()
|
self.broadcast.subscribe()
|
||||||
@@ -285,7 +248,6 @@ impl Store {
|
|||||||
|
|
||||||
let id = self.revision;
|
let id = self.revision;
|
||||||
let res = Arc::new(Revision { id, patch });
|
let res = Arc::new(Revision { id, patch });
|
||||||
self.revision_cache.push(res.clone());
|
|
||||||
self.broadcast.send(&res);
|
self.broadcast.send(&res);
|
||||||
|
|
||||||
Ok(Some(res))
|
Ok(Some(res))
|
||||||
@@ -302,21 +264,16 @@ impl PatchDb {
|
|||||||
store: Arc::new(RwLock::new(Store::open(path).await?)),
|
store: Arc::new(RwLock::new(Store::open(path).await?)),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
pub async fn sync(&self, sequence: u64) -> Result<Result<Vec<Arc<Revision>>, Dump>, Error> {
|
pub async fn dump(&self) -> Dump {
|
||||||
let store = self.store.read().await;
|
|
||||||
if let Some(revs) = store.get_revisions_since(sequence) {
|
|
||||||
Ok(Ok(revs))
|
|
||||||
} else {
|
|
||||||
Ok(Err(store.dump()?))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pub async fn dump(&self) -> Result<Dump, Error> {
|
|
||||||
self.store.read().await.dump()
|
self.store.read().await.dump()
|
||||||
}
|
}
|
||||||
pub async fn dump_and_sub(&self) -> Result<(Dump, Subscriber), Error> {
|
pub async fn sequence(&self) -> u64 {
|
||||||
|
self.store.read().await.sequence()
|
||||||
|
}
|
||||||
|
pub async fn dump_and_sub(&self) -> (Dump, Subscriber) {
|
||||||
let mut store = self.store.write().await;
|
let mut store = self.store.write().await;
|
||||||
let sub = store.broadcast.subscribe();
|
let sub = store.broadcast.subscribe();
|
||||||
Ok((store.dump()?, sub))
|
(store.dump(), sub)
|
||||||
}
|
}
|
||||||
pub async fn subscribe(&self) -> Subscriber {
|
pub async fn subscribe(&self) -> Subscriber {
|
||||||
self.store.write().await.subscribe()
|
self.store.write().await.subscribe()
|
||||||
|
|||||||
Reference in New Issue
Block a user