cache corrupted error

This commit is contained in:
Aiden McClelland
2021-01-08 15:40:44 -07:00
parent ecdb1d706c
commit 4dbad8a7c5

View File

@@ -39,6 +39,8 @@ pub enum Error {
Join(#[from] tokio::task::JoinError), Join(#[from] tokio::task::JoinError),
#[error("FD Lock Error: {0}")] #[error("FD Lock Error: {0}")]
FDLock(#[from] fd_lock_rs::Error), FDLock(#[from] fd_lock_rs::Error),
#[error("Database Cache Corrupted: {0}")]
CacheCorrupted(Arc<IOError>),
} }
#[derive(Debug, Clone, Deserialize, Serialize)] #[derive(Debug, Clone, Deserialize, Serialize)]
@@ -52,6 +54,7 @@ pub struct DiffPatch(Patch);
pub struct Store { pub struct Store {
file: FdLock<File>, file: FdLock<File>,
cache_corrupted: Option<Arc<IOError>>,
data: Value, data: Value,
revision: u64, revision: u64,
} }
@@ -95,12 +98,28 @@ impl Store {
Ok::<_, Error>(Store { Ok::<_, Error>(Store {
file: f.map(File::from_std), file: f.map(File::from_std),
cache_corrupted: None,
data, data,
revision, revision,
}) })
}) })
.await??) .await??)
} }
fn check_cache_corrupted(&self) -> Result<(), Error> {
if let Some(ref err) = self.cache_corrupted {
Err(Error::CacheCorrupted(err.clone()))
} else {
Ok(())
}
}
fn get_data(&self) -> Result<&Value, Error> {
self.check_cache_corrupted()?;
Ok(&self.data)
}
fn get_data_mut(&mut self) -> Result<&mut Value, Error> {
self.check_cache_corrupted()?;
Ok(&mut self.data)
}
pub async fn close(mut self) -> Result<(), Error> { pub async fn close(mut self) -> Result<(), Error> {
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
@@ -114,11 +133,11 @@ impl Store {
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
) -> Result<T, Error> { ) -> Result<T, Error> {
Ok(serde_json::from_value( Ok(serde_json::from_value(
ptr.get(&self.data).unwrap_or(&Value::Null).clone(), ptr.get(self.get_data()?).unwrap_or(&Value::Null).clone(),
)?) )?)
} }
pub fn dump(&self) -> Value { pub fn dump(&self) -> Value {
self.data.clone() self.get_data().unwrap().clone()
} }
pub async fn put<T: Serialize, S: AsRef<str>, V: SegList>( pub async fn put<T: Serialize, S: AsRef<str>, V: SegList>(
&mut self, &mut self,
@@ -126,7 +145,7 @@ impl Store {
value: &T, value: &T,
) -> Result<Arc<Revision>, Error> { ) -> Result<Arc<Revision>, Error> {
let mut patch = DiffPatch(json_patch::diff( let mut patch = DiffPatch(json_patch::diff(
ptr.get(&self.data).unwrap_or(&Value::Null), ptr.get(self.get_data()?).unwrap_or(&Value::Null),
&serde_json::to_value(value)?, &serde_json::to_value(value)?,
)); ));
patch.0.prepend(ptr); patch.0.prepend(ptr);
@@ -135,8 +154,9 @@ impl Store {
pub async fn apply(&mut self, patch: DiffPatch) -> Result<Arc<Revision>, Error> { pub async fn apply(&mut self, patch: DiffPatch) -> Result<Arc<Revision>, Error> {
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
self.check_cache_corrupted()?;
let patch_bin = serde_cbor::to_vec(&patch.0)?; let patch_bin = serde_cbor::to_vec(&patch.0)?;
json_patch::patch(&mut self.data, &patch.0)?; json_patch::patch(self.get_data_mut()?, &patch.0)?;
async fn sync_to_disk(file: &mut File, patch_bin: &[u8]) -> Result<(), IOError> { async fn sync_to_disk(file: &mut File, patch_bin: &[u8]) -> Result<(), IOError> {
file.write_all(patch_bin).await?; file.write_all(patch_bin).await?;
@@ -145,8 +165,9 @@ impl Store {
Ok(()) Ok(())
} }
if let Err(e) = sync_to_disk(&mut *self.file, &patch_bin).await { if let Err(e) = sync_to_disk(&mut *self.file, &patch_bin).await {
eprintln!("I/O Error: {}", e); let e = Arc::new(e);
panic!("Failed to sync data to disk after successfully applying changes in memory."); self.cache_corrupted = Some(e.clone());
return Err(Error::CacheCorrupted(e));
// TODO: try to recover. // TODO: try to recover.
} }
@@ -223,7 +244,7 @@ impl Transaction {
ptr: &JsonPointer<S, V>, ptr: &JsonPointer<S, V>,
) -> Result<Value, Error> { ) -> Result<Value, Error> {
let mut data: Value = ptr let mut data: Value = ptr
.get(&self.db.store.read().await.data) .get(self.db.store.read().await.get_data()?)
.unwrap_or(&Value::Null) .unwrap_or(&Value::Null)
.clone(); .clone();
for op in (self.updates.0).0.iter() { for op in (self.updates.0).0.iter() {