Feature/simple syncdb (#2464)

* simplify db sync on rpc endpoints

* switch to patch-db master

* update fe for websocket only stream

* fix api

---------

Co-authored-by: Matt Hill <mattnine@protonmail.com>
This commit is contained in:
Aiden McClelland
2023-10-17 09:49:58 -06:00
committed by GitHub
parent afbab293a8
commit 202695096a
29 changed files with 101 additions and 168 deletions

View File

@@ -134,7 +134,7 @@ pub async fn action(
let manifest = ctx
.db
.peek()
.await?
.await
.as_package_data()
.as_idx(&pkg_id)
.or_not_found(&pkg_id)?

View File

@@ -56,7 +56,7 @@ pub async fn backup_all(
package_ids: Option<OrdSet<PackageId>>,
#[arg] password: crate::auth::PasswordType,
) -> Result<(), Error> {
let db = ctx.db.peek().await?;
let db = ctx.db.peek().await;
let old_password_decrypted = old_password
.as_ref()
.unwrap_or(&password)
@@ -265,7 +265,7 @@ async fn perform_backup(
}
}
let ui = ctx.db.peek().await?.into_ui().de()?;
let ui = ctx.db.peek().await.into_ui().de()?;
let mut os_backup_file = AtomicFile::new(
backup_guard.lock().await.as_ref().join("os-backup.cbor"),

View File

@@ -134,7 +134,7 @@ impl BackupActions {
let marketplace_url = ctx
.db
.peek()
.await?
.await
.as_package_data()
.as_idx(&pkg_id)
.or_not_found(pkg_id)?

View File

@@ -310,7 +310,7 @@ async fn assure_restoring(
let mut insert_packages = BTreeMap::new();
for id in ids {
let peek = ctx.db.peek().await?;
let peek = ctx.db.peek().await;
let model = peek.as_package_data().as_idx(&id);

View File

@@ -167,7 +167,7 @@ pub async fn get(
#[arg(long = "format")]
format: Option<IoFormat>,
) -> Result<ConfigRes, Error> {
let db = ctx.db.peek().await?;
let db = ctx.db.peek().await;
let manifest = db
.as_package_data()
.as_idx(&id)
@@ -256,7 +256,7 @@ pub async fn configure(
id: &PackageId,
configure_context: ConfigureContext,
) -> Result<BTreeMap<PackageId, String>, Error> {
let db = ctx.db.peek().await?;
let db = ctx.db.peek().await;
let package = db
.as_package_data()
.as_idx(id)

View File

@@ -1696,7 +1696,6 @@ impl TorAddressPointer {
.db
.peek()
.await
.map_err(|e| ConfigurationError::SystemError(e))?
.as_package_data()
.as_idx(&self.package_id)
.and_then(|pde| pde.as_installed())
@@ -1739,7 +1738,6 @@ impl LanAddressPointer {
.db
.peek()
.await
.map_err(|e| ConfigurationError::SystemError(e))?
.as_package_data()
.as_idx(&self.package_id)
.and_then(|pde| pde.as_installed())
@@ -1775,11 +1773,7 @@ impl ConfigPointer {
Ok(self.select(&Value::Object(cfg.clone())))
} else {
let id = &self.package_id;
let db = ctx
.db
.peek()
.await
.map_err(|e| ConfigurationError::SystemError(e))?;
let db = ctx.db.peek().await;
let manifest = db.as_package_data().as_idx(id).map(|pde| pde.as_manifest());
let cfg_actions = manifest.and_then(|m| m.as_config().transpose_ref());
if let (Some(manifest), Some(cfg_actions)) = (manifest, cfg_actions) {

View File

@@ -276,7 +276,7 @@ impl RpcContext {
})
.await?;
let peek = self.db.peek().await?;
let peek = self.db.peek().await;
for (package_id, package) in peek.as_package_data().as_entries()?.into_iter() {
let action = match package.as_match() {

View File

@@ -12,7 +12,7 @@ use crate::Error;
#[command(display(display_none), metadata(sync_db = true))]
#[instrument(skip_all)]
pub async fn start(#[context] ctx: RpcContext, #[arg] id: PackageId) -> Result<(), Error> {
let peek = ctx.db.peek().await?;
let peek = ctx.db.peek().await;
let version = peek
.as_package_data()
.as_idx(&id)
@@ -35,7 +35,7 @@ pub async fn start(#[context] ctx: RpcContext, #[arg] id: PackageId) -> Result<(
#[command(display(display_none), metadata(sync_db = true))]
pub async fn stop(#[context] ctx: RpcContext, #[arg] id: PackageId) -> Result<MainStatus, Error> {
let peek = ctx.db.peek().await?;
let peek = ctx.db.peek().await;
let version = peek
.as_package_data()
.as_idx(&id)
@@ -71,7 +71,7 @@ pub async fn stop(#[context] ctx: RpcContext, #[arg] id: PackageId) -> Result<Ma
#[command(display(display_none), metadata(sync_db = true))]
pub async fn restart(#[context] ctx: RpcContext, #[arg] id: PackageId) -> Result<(), Error> {
let peek = ctx.db.peek().await?;
let peek = ctx.db.peek().await;
let version = peek
.as_package_data()
.as_idx(&id)

View File

@@ -37,7 +37,7 @@ async fn ws_handler<
session: Option<(HasValidSession, HashSessionToken)>,
ws_fut: WSFut,
) -> Result<(), Error> {
let (dump, sub) = ctx.db.dump_and_sub().await?;
let (dump, sub) = ctx.db.dump_and_sub().await;
let mut stream = ws_fut
.await
.with_kind(ErrorKind::Network)?
@@ -174,7 +174,7 @@ pub async fn subscribe(ctx: RpcContext, req: Request<Body>) -> Result<Response<B
Ok(res)
}
#[command(subcommands(revisions, dump, put, apply))]
#[command(subcommands(dump, put, apply))]
pub fn db() -> Result<(), RpcError> {
Ok(())
}
@@ -186,20 +186,6 @@ pub enum RevisionsRes {
Dump(Dump),
}
#[command(display(display_serializable))]
pub async fn revisions(
#[context] ctx: RpcContext,
#[arg] since: u64,
#[allow(unused_variables)]
#[arg(long = "format")]
format: Option<IoFormat>,
) -> Result<RevisionsRes, Error> {
Ok(match ctx.db.sync(since).await? {
Ok(revs) => RevisionsRes::Revisions(revs),
Err(dump) => RevisionsRes::Dump(dump),
})
}
#[instrument(skip_all)]
async fn cli_dump(
ctx: CliContext,
@@ -207,7 +193,7 @@ async fn cli_dump(
path: Option<PathBuf>,
) -> Result<Dump, RpcError> {
let dump = if let Some(path) = path {
PatchDb::open(path).await?.dump().await?
PatchDb::open(path).await?.dump().await
} else {
rpc_toolkit::command_helpers::call_remote(
ctx,
@@ -235,7 +221,7 @@ pub async fn dump(
#[arg]
path: Option<PathBuf>,
) -> Result<Dump, Error> {
Ok(ctx.db.dump().await?)
Ok(ctx.db.dump().await)
}
fn apply_expr(input: jaq_core::Val, expr: &str) -> Result<jaq_core::Val, Error> {

View File

@@ -28,7 +28,7 @@ where
#[async_trait::async_trait]
pub trait PatchDbExt {
async fn peek(&self) -> Result<DatabaseModel, Error>;
async fn peek(&self) -> DatabaseModel;
async fn mutate<U: UnwindSafe + Send>(
&self,
f: impl FnOnce(&mut DatabaseModel) -> Result<U, Error> + UnwindSafe + Send,
@@ -40,8 +40,8 @@ pub trait PatchDbExt {
}
#[async_trait::async_trait]
impl PatchDbExt for PatchDb {
async fn peek(&self) -> Result<DatabaseModel, Error> {
Ok(DatabaseModel::from(self.dump().await?.value))
async fn peek(&self) -> DatabaseModel {
DatabaseModel::from(self.dump().await.value)
}
async fn mutate<U: UnwindSafe + Send>(
&self,

View File

@@ -170,7 +170,7 @@ pub async fn configure_logic(
ctx: RpcContext,
(pkg_id, dependency_id): (PackageId, PackageId),
) -> Result<ConfigDryRes, Error> {
let db = ctx.db.peek().await?;
let db = ctx.db.peek().await;
let pkg = db
.as_package_data()
.as_idx(&pkg_id)

View File

@@ -204,7 +204,7 @@ pub async fn init(cfg: &RpcContextConfig) -> Result<InitResult, Error> {
let account = AccountInfo::load(&secret_store).await?;
let db = cfg.db(&account).await?;
tracing::info!("Opened PatchDB");
let peek = db.peek().await?;
let peek = db.peek().await;
let mut server_info = peek.as_server_info().de()?;
// write to ca cert store

View File

@@ -62,7 +62,7 @@ pub async fn cleanup_failed(ctx: &RpcContext, id: &PackageId) -> Result<(), Erro
if let Some(version) = match ctx
.db
.peek()
.await?
.await
.as_package_data()
.as_idx(id)
.or_not_found(id)?
@@ -141,7 +141,7 @@ pub async fn uninstall<Ex>(ctx: &RpcContext, secrets: &mut Ex, id: &PackageId) -
where
for<'a> &'a mut Ex: Executor<'a, Database = Postgres>,
{
let db = ctx.db.peek().await?;
let db = ctx.db.peek().await;
let entry = db
.as_package_data()
.as_idx(id)

View File

@@ -64,7 +64,7 @@ pub const PKG_WASM_DIR: &str = "package-data/wasm";
#[command(display(display_serializable))]
pub async fn list(#[context] ctx: RpcContext) -> Result<Value, Error> {
Ok(ctx.db.peek().await?.as_package_data().as_entries()?
Ok(ctx.db.peek().await.as_package_data().as_entries()?
.iter()
.filter_map(|(id, pde)| {
let status = match pde.as_match() {
@@ -666,7 +666,7 @@ pub async fn download_install_s9pk(
) -> Result<(), Error> {
let pkg_id = &temp_manifest.id;
let version = &temp_manifest.version;
let db = ctx.db.peek().await?;
let db = ctx.db.peek().await;
if let Result::<(), Error>::Err(e) = {
let ctx = ctx.clone();
@@ -786,7 +786,7 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin + Send + Sync>(
rdr.validated();
let developer_key = rdr.developer_key().clone();
rdr.reset().await?;
let db = ctx.db.peek().await?;
let db = ctx.db.peek().await;
tracing::info!("Install {}@{}: Unpacking Manifest", pkg_id, version);
let manifest = progress
@@ -1017,7 +1017,7 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin + Send + Sync>(
)
.await?;
let peek = ctx.db.peek().await?;
let peek = ctx.db.peek().await;
let prev = peek
.as_package_data()
.as_idx(pkg_id)

View File

@@ -11,7 +11,7 @@ use crate::Error;
#[instrument(skip_all)]
pub async fn check(ctx: &RpcContext, id: &PackageId) -> Result<(), Error> {
let (manifest, started) = {
let peeked = ctx.db.peek().await?;
let peeked = ctx.db.peek().await;
let pde = peeked
.as_package_data()
.as_idx(id)

View File

@@ -53,7 +53,7 @@ impl ManageContainer {
let current_state = Arc::new(watch::channel(StartStop::Stop).0);
let desired_state = Arc::new(
watch::channel::<StartStop>(
get_status(seed.ctx.db.peek().await?, &seed.manifest).into(),
get_status(seed.ctx.db.peek().await, &seed.manifest).into(),
)
.0,
);
@@ -103,7 +103,7 @@ impl ManageContainer {
&self,
seed: &manager_seed::ManagerSeed,
) -> Result<(), Error> {
let current_state = get_status(seed.ctx.db.peek().await?, &seed.manifest);
let current_state = get_status(seed.ctx.db.peek().await, &seed.manifest);
self.override_main_status
.send_modify(|x| *x = Some(current_state));
Ok(())

View File

@@ -267,7 +267,7 @@ impl Manager {
let manage_container = self.manage_container.clone();
let seed = self.seed.clone();
async move {
let peek = seed.ctx.db.peek().await?;
let peek = seed.ctx.db.peek().await;
let state_reverter = DesiredStateReverter::new(manage_container.clone());
let override_guard =
manage_container.set_override(get_status(peek, &seed.manifest).backing_up())?;
@@ -338,7 +338,7 @@ async fn configure(
id: PackageId,
mut configure_context: ConfigureContext,
) -> Result<BTreeMap<PackageId, String>, Error> {
let db = ctx.db.peek().await?;
let db = ctx.db.peek().await;
let id = &id;
let ctx = &ctx;
let overrides = &mut configure_context.overrides;

View File

@@ -1,4 +1,3 @@
use color_eyre::eyre::eyre;
use futures::future::BoxFuture;
use futures::FutureExt;
use http::HeaderValue;
@@ -11,7 +10,6 @@ use rpc_toolkit::yajrc::RpcMethod;
use rpc_toolkit::Metadata;
use crate::context::RpcContext;
use crate::{Error, ResultExt};
pub fn db<M: Metadata>(ctx: RpcContext) -> DynMiddleware<M> {
Box::new(
@@ -22,49 +20,19 @@ pub fn db<M: Metadata>(ctx: RpcContext) -> DynMiddleware<M> {
async move {
let m2: DynMiddlewareStage2 = Box::new(move |req, rpc_req| {
async move {
let seq = req.headers.remove("x-patch-sequence");
let sync_db = metadata
.get(rpc_req.method.as_str(), "sync_db")
.unwrap_or(false);
let m3: DynMiddlewareStage3 = Box::new(move |res, _| {
async move {
if sync_db && seq.is_some() {
match async {
let seq = seq
.ok_or_else(|| {
Error::new(
eyre!("Missing X-Patch-Sequence"),
crate::ErrorKind::InvalidRequest,
)
})?
.to_str()
.with_kind(crate::ErrorKind::InvalidRequest)?
.parse()?;
let res = ctx.db.sync(seq).await?;
let json = match res {
Ok(revs) => serde_json::to_vec(&revs),
Err(dump) => serde_json::to_vec(&[dump]),
}
.with_kind(crate::ErrorKind::Serialization)?;
Ok::<_, Error>(base64::encode_config(
&json,
base64::URL_SAFE,
))
}
.await
{
Ok(a) => res
.headers
.append("X-Patch-Updates", HeaderValue::from_str(&a)?),
Err(e) => res.headers.append(
"X-Patch-Error",
HeaderValue::from_str(&base64::encode_config(
&e.to_string(),
base64::URL_SAFE,
))?,
),
};
if sync_db {
res.headers.append(
"X-Patch-Sequence",
HeaderValue::from_str(
&ctx.db.sequence().await.to_string(),
)?,
);
}
Ok(Ok(noop4()))
}

View File

@@ -236,7 +236,7 @@ impl NotificationManager {
subtype: T,
debounce_interval: Option<u32>,
) -> Result<(), Error> {
let peek = db.peek().await?;
let peek = db.peek().await;
if !self
.should_notify(&package_id, &level, &title, debounce_interval)
.await

View File

@@ -21,7 +21,7 @@ pub async fn properties(#[context] ctx: RpcContext, #[arg] id: PackageId) -> Res
#[instrument(skip_all)]
pub async fn fetch_properties(ctx: RpcContext, id: PackageId) -> Result<Value, Error> {
let peek = ctx.db.peek().await?;
let peek = ctx.db.peek().await;
let manifest = peek
.as_package_data()

View File

@@ -58,7 +58,7 @@ pub async fn enable_zram() -> Result<(), Error> {
#[command(display(display_none))]
pub async fn zram(#[context] ctx: RpcContext, #[arg] enable: bool) -> Result<(), Error> {
let db = ctx.db.peek().await?;
let db = ctx.db.peek().await;
let zram = db.as_server_info().as_zram().de()?;
if enable == zram {

View File

@@ -76,7 +76,7 @@ fn display_update_result(status: UpdateResult, _: &ArgMatches) {
#[instrument(skip_all)]
async fn maybe_do_update(ctx: RpcContext, marketplace_url: Url) -> Result<Option<()>, Error> {
let peeked = ctx.db.peek().await?;
let peeked = ctx.db.peek().await;
let latest_version: Version = ctx
.client
.get(with_query_params(

View File

@@ -163,7 +163,7 @@ where
}
pub async fn init(db: &PatchDb, secrets: &PgPool) -> Result<(), Error> {
let version = Version::from_util_version(db.peek().await?.as_server_info().as_version().de()?);
let version = Version::from_util_version(db.peek().await.as_server_info().as_version().de()?);
match version {
Version::V0_3_4(v) => v.0.migrate_to(&Current::new(), db.clone(), secrets).await?,

View File

@@ -28,7 +28,7 @@ impl VersionT for Version {
&V0_3_0_COMPAT
}
async fn up(&self, db: PatchDb, _secrets: &PgPool) -> Result<(), Error> {
let peek = db.peek().await?;
let peek = db.peek().await;
let mut url_replacements = BTreeMap::new();
for (_, pde) in peek.as_package_data().as_entries()? {
for (dependency, info) in pde

View File

@@ -1,4 +1,4 @@
import { Observable, Subject } from 'rxjs'
import { Observable } from 'rxjs'
import { Update } from 'patch-db-client'
import { RR } from './api.types'
import { DataModel } from 'src/app/services/patch-db/data-model'
@@ -6,8 +6,6 @@ import { Log } from '@start9labs/shared'
import { WebSocketSubjectConfig } from 'rxjs/webSocket'
export abstract class ApiService {
readonly patchStream$ = new Subject<Update<DataModel>[]>()
// http
// for getting static files: ex icons, instructions, licenses

View File

@@ -1,6 +1,5 @@
import { Inject, Injectable } from '@angular/core'
import {
decodeBase64,
HttpOptions,
HttpService,
isRpcError,
@@ -14,7 +13,7 @@ import { RR } from './api.types'
import { parsePropertiesPermissive } from 'src/app/util/properties.util'
import { ConfigService } from '../config.service'
import { webSocket, WebSocketSubjectConfig } from 'rxjs/webSocket'
import { Observable } from 'rxjs'
import { Observable, filter, firstValueFrom } from 'rxjs'
import { AuthService } from '../auth.service'
import { DOCUMENT } from '@angular/common'
import { DataModel } from '../patch-db/data-model'
@@ -67,7 +66,7 @@ export class LiveApiService extends ApiService {
// auth
async login(params: RR.LoginReq): Promise<RR.loginRes> {
return this.rpcRequest({ method: 'auth.login', params }, false)
return this.rpcRequest({ method: 'auth.login', params })
}
async logout(params: RR.LogoutReq): Promise<RR.LogoutRes> {
@@ -91,7 +90,7 @@ export class LiveApiService extends ApiService {
// server
async echo(params: RR.EchoReq, urlOverride?: string): Promise<RR.EchoRes> {
return this.rpcRequest({ method: 'echo', params }, false, urlOverride)
return this.rpcRequest({ method: 'echo', params }, urlOverride)
}
openPatchWebsocket$(): Observable<Update<DataModel>> {
@@ -434,42 +433,28 @@ export class LiveApiService extends ApiService {
private async rpcRequest<T>(
options: RPCOptions,
addHeader = true,
urlOverride?: string,
): Promise<T> {
if (addHeader) {
options.headers = {
'x-patch-sequence': String(this.patch.cache$.value.sequence),
...(options.headers || {}),
}
}
const res = await this.http.rpcRequest<T>(options, urlOverride)
const encodedUpdates = res.headers.get('x-patch-updates')
const encodedError = res.headers.get('x-patch-error')
const body = res.body
if (encodedUpdates) {
const decoded = decodeBase64(encodedUpdates)
const updates: Update<DataModel>[] = JSON.parse(decoded)
this.patchStream$.next(updates)
}
if (encodedError) {
const error = decodeBase64(encodedError)
console.error(error)
}
const rpcRes = res.body
if (isRpcError(rpcRes)) {
if (rpcRes.error.code === 34) {
if (isRpcError(body)) {
if (body.error.code === 34) {
console.error('Unauthenticated, logging out')
this.auth.setUnverified()
}
throw new RpcError(rpcRes.error)
throw new RpcError(body.error)
}
return rpcRes.result
const patchSequence = res.headers.get('x-patch-sequence')
if (patchSequence)
await firstValueFrom(
this.patch.cache$.pipe(
filter(({ sequence }) => sequence >= Number(patchSequence)),
),
)
return body.result
}
private async httpRequest<T>(opts: HttpOptions): Promise<T> {

View File

@@ -65,7 +65,6 @@ export class MockApiService extends ApiService {
.pipe(
tap(() => {
this.sequence = 0
this.patchStream$.next([])
}),
switchMap(verified =>
iif(
@@ -108,7 +107,9 @@ export class MockApiService extends ApiService {
value: params.value,
},
]
return this.withRevision(patch)
this.mockRevision(patch)
return null
}
// auth
@@ -289,7 +290,9 @@ export class MockApiService extends ApiService {
value: initialProgress,
},
]
return this.withRevision(patch, 'updating')
this.mockRevision(patch)
return 'updating'
}
async restartServer(
@@ -332,7 +335,9 @@ export class MockApiService extends ApiService {
value: params.enable,
},
]
return this.withRevision(patch, null)
this.mockRevision(patch)
return null
}
// marketplace URLs
@@ -385,7 +390,9 @@ export class MockApiService extends ApiService {
value: 0,
},
]
return this.withRevision(patch, Mock.Notifications)
this.mockRevision(patch)
return Mock.Notifications
}
async deleteNotification(
@@ -564,7 +571,9 @@ export class MockApiService extends ApiService {
},
]
return this.withRevision(originalPatch)
this.mockRevision(originalPatch)
return null
}
// package
@@ -629,7 +638,9 @@ export class MockApiService extends ApiService {
},
},
]
return this.withRevision(patch)
this.mockRevision(patch)
return null
}
async getPackageConfig(
@@ -660,7 +671,9 @@ export class MockApiService extends ApiService {
value: true,
},
]
return this.withRevision(patch)
this.mockRevision(patch)
return null
}
async restorePackages(
@@ -684,7 +697,9 @@ export class MockApiService extends ApiService {
}
})
return this.withRevision(patch)
this.mockRevision(patch)
return null
}
async executePackageAction(
@@ -768,7 +783,9 @@ export class MockApiService extends ApiService {
},
]
return this.withRevision(originalPatch)
this.mockRevision(originalPatch)
return null
}
async restartPackage(
@@ -845,7 +862,9 @@ export class MockApiService extends ApiService {
},
]
return this.withRevision(patch)
this.mockRevision(patch)
return null
}
async stopPackage(params: RR.StopPackageReq): Promise<RR.StopPackageRes> {
@@ -876,7 +895,9 @@ export class MockApiService extends ApiService {
},
]
return this.withRevision(patch)
this.mockRevision(patch)
return null
}
async uninstallPackage(
@@ -902,7 +923,9 @@ export class MockApiService extends ApiService {
},
]
return this.withRevision(patch)
this.mockRevision(patch)
return null
}
async dryConfigureDependency(
@@ -1057,23 +1080,4 @@ export class MockApiService extends ApiService {
}
this.mockWsSource$.next(revision)
}
private async withRevision<T>(
patch: Operation<unknown>[],
response: T | null = null,
): Promise<T> {
if (!this.sequence) {
const { sequence } = this.bootstrapper.init()
this.sequence = sequence
}
this.patchStream$.next([
{
id: ++this.sequence,
patch,
},
])
return response as T
}
}

View File

@@ -9,7 +9,7 @@ import {
} from 'rxjs/operators'
import { Update } from 'patch-db-client'
import { DataModel } from './data-model'
import { defer, EMPTY, from, interval, merge, Observable } from 'rxjs'
import { defer, EMPTY, from, interval, Observable } from 'rxjs'
import { AuthService } from '../auth.service'
import { ConnectionService } from '../connection.service'
import { ApiService } from '../api/embassy-api.service'
@@ -51,9 +51,7 @@ export function sourceFactory(
)
return authService.isVerified$.pipe(
switchMap(verified =>
verified ? merge(websocket$, api.patchStream$) : EMPTY,
),
switchMap(verified => (verified ? websocket$ : EMPTY)),
)
})
}