pub mod model; pub mod prelude; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use axum::extract::ws; use clap::Parser; use imbl_value::InternedString; use itertools::Itertools; use patch_db::json_ptr::{JsonPointer, ROOT}; use patch_db::{DiffPatch, Dump, Revision}; use rpc_toolkit::yajrc::RpcError; use rpc_toolkit::{Context, HandlerArgs, HandlerExt, ParentHandler, from_fn_async}; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc::{self, UnboundedReceiver}; use tokio::sync::watch; use tracing::instrument; use ts_rs::TS; use crate::context::{CliContext, RpcContext}; use crate::prelude::*; use crate::rpc_continuations::{Guid, RpcContinuation}; use crate::util::serde::{HandlerExtSerde, apply_expr}; lazy_static::lazy_static! { static ref PUBLIC: JsonPointer = "/public".parse().unwrap(); } pub trait DbAccess: Sized { fn access<'a>(db: &'a Model) -> &'a Model; } pub trait DbAccessMut: DbAccess { fn access_mut<'a>(db: &'a mut Model) -> &'a mut Model; } pub trait DbAccessByKey: Sized { type Key<'a>; fn access_by_key<'a>(db: &'a Model, key: Self::Key<'_>) -> Option<&'a Model>; } pub trait DbAccessMutByKey: DbAccessByKey { fn access_mut_by_key<'a>( db: &'a mut Model, key: Self::Key<'_>, ) -> Option<&'a mut Model>; } pub fn db() -> ParentHandler { ParentHandler::new() .subcommand( "dump", from_fn_async(cli_dump) .with_display_serializable() .with_about("about.filter-query-db"), ) .subcommand("dump", from_fn_async(dump).no_cli()) .subcommand( "subscribe", from_fn_async(subscribe) .with_metadata("get_session", Value::Bool(true)) .no_cli(), ) .subcommand( "put", put::().with_about("about.command-add-ui-record-db"), ) .subcommand( "apply", from_fn_async(cli_apply) .no_display() .with_about("about.update-db-record"), ) .subcommand("apply", from_fn_async(apply).no_cli()) } #[derive(Deserialize, Serialize)] #[serde(untagged)] pub enum RevisionsRes { Revisions(Vec>), Dump(Dump), } #[derive(Deserialize, Serialize, Parser)] #[serde(rename_all = "camelCase")] #[command(rename_all = "kebab-case")] pub struct CliDumpParams { #[arg( long = "include-private", short = 'p', help = "help.arg.include-private-data" )] #[serde(default)] include_private: bool, #[arg(help = "help.arg.db-path")] path: Option, } #[instrument(skip_all)] async fn cli_dump( HandlerArgs { context, parent_method, method, params: CliDumpParams { include_private, path, }, .. }: HandlerArgs, ) -> Result { let dump = if let Some(path) = path { PatchDb::open(path).await?.dump(&ROOT).await } else { let method = parent_method.into_iter().chain(method).join("."); from_value::( context .call_remote::( &method, imbl_value::json!({ "pointer": if include_private { AsRef::::as_ref(&ROOT) } else { AsRef::::as_ref(&*PUBLIC) } }), ) .await?, )? }; Ok(dump) } #[derive(Deserialize, Serialize, TS)] #[serde(rename_all = "camelCase")] pub struct DumpParams { #[ts(type = "string | null")] pointer: Option, } pub async fn dump(ctx: RpcContext, DumpParams { pointer }: DumpParams) -> Result { Ok(ctx.db.dump(pointer.as_ref().unwrap_or(&*PUBLIC)).await) } #[derive(Deserialize, Serialize, TS)] #[serde(rename_all = "camelCase")] pub struct SubscribeParams { #[ts(type = "string | null")] pointer: Option, #[ts(skip)] #[serde(rename = "__Auth_session")] session: Option, } #[derive(Deserialize, Serialize, TS)] #[serde(rename_all = "camelCase")] pub struct SubscribeRes { #[ts(type = "{ id: number; value: unknown }")] pub dump: Dump, pub guid: Guid, } struct DbSubscriber { rev: u64, sub: UnboundedReceiver, sync_db: watch::Receiver, } impl DbSubscriber { async fn recv(&mut self) -> Option { loop { tokio::select! { rev = self.sub.recv() => { if let Some(rev) = rev.as_ref() { self.rev = rev.id; } return rev } _ = self.sync_db.changed() => { let id = *self.sync_db.borrow(); if id > self.rev { match self.sub.try_recv() { Ok(rev) => { self.rev = rev.id; return Some(rev) } Err(mpsc::error::TryRecvError::Disconnected) => { return None } Err(mpsc::error::TryRecvError::Empty) => { return Some(Revision { id, patch: DiffPatch::default() }) } } } } } } } } pub async fn subscribe( ctx: RpcContext, SubscribeParams { pointer, session }: SubscribeParams, ) -> Result { let (dump, sub) = ctx .db .dump_and_sub(pointer.unwrap_or_else(|| PUBLIC.clone())) .await; let mut sub = DbSubscriber { rev: dump.id, sub, sync_db: ctx.sync_db.subscribe(), }; let guid = Guid::new(); ctx.rpc_continuations .add( guid.clone(), RpcContinuation::ws_authed( &ctx, session, |mut ws| async move { if let Err(e) = async { loop { tokio::select! { rev = sub.recv() => { if let Some(rev) = rev { ws.send(ws::Message::Text( serde_json::to_string(&rev) .with_kind(ErrorKind::Serialization)? .into(), )) .await .with_kind(ErrorKind::Network)?; } else { return ws.normal_close("complete").await; } } msg = ws.recv() => { if msg.transpose().with_kind(ErrorKind::Network)?.is_none() { return Ok(()) } } } } } .await { tracing::error!("Error in db websocket: {e}"); tracing::debug!("{e:?}"); } }, Duration::from_secs(30), ), ) .await; Ok(SubscribeRes { dump, guid }) } #[derive(Deserialize, Serialize, Parser)] #[serde(rename_all = "camelCase")] #[command(rename_all = "kebab-case")] pub struct CliApplyParams { #[arg(long, help = "help.arg.allow-model-mismatch")] allow_model_mismatch: bool, #[arg(help = "help.arg.db-apply-expr")] expr: String, #[arg(help = "help.arg.db-path")] path: Option, } #[instrument(skip_all)] async fn cli_apply( HandlerArgs { context, parent_method, method, params: CliApplyParams { allow_model_mismatch, expr, path, }, .. }: HandlerArgs, ) -> Result<(), RpcError> { if let Some(path) = path { PatchDb::open(path) .await? .apply_function(|db| { let res = apply_expr( serde_json::to_value(patch_db::Value::from(db)) .with_kind(ErrorKind::Deserialization)? .into(), &expr, )?; let value = if allow_model_mismatch { serde_json::from_value::(res.clone().into()).with_ctx(|_| { ( crate::ErrorKind::Deserialization, "result does not match database model", ) })? } else { to_value( &serde_json::from_value::(res.clone().into()).with_ctx( |_| { ( crate::ErrorKind::Deserialization, "result does not match database model", ) }, )?, )? }; Ok::<_, Error>((value, ())) }) .await .result?; } else { let method = parent_method.into_iter().chain(method).join("."); context .call_remote::(&method, imbl_value::json!({ "expr": expr })) .await?; } Ok(()) } #[derive(Deserialize, Serialize, Parser, TS)] #[serde(rename_all = "camelCase")] #[command(rename_all = "kebab-case")] pub struct ApplyParams { #[arg(help = "help.arg.db-apply-expr")] expr: String, } pub async fn apply(ctx: RpcContext, ApplyParams { expr }: ApplyParams) -> Result<(), Error> { ctx.db .mutate(|db| { let res = apply_expr( serde_json::to_value(patch_db::Value::from(db.clone())) .with_kind(ErrorKind::Deserialization)? .into(), &expr, )?; db.ser( &serde_json::from_value::(res.clone().into()).with_ctx(|_| { ( crate::ErrorKind::Deserialization, "result does not match database model", ) })?, ) }) .await .result } pub fn put() -> ParentHandler { ParentHandler::new().subcommand( "ui", from_fn_async(ui) .with_display_serializable() .with_about("about.add-path-value-db") .with_call_remote::(), ) } #[derive(Deserialize, Serialize, Parser, TS)] #[serde(rename_all = "camelCase")] #[command(rename_all = "kebab-case")] pub struct UiParams { #[arg(help = "help.arg.json-pointer")] #[ts(type = "string")] pointer: JsonPointer, #[arg(help = "help.arg.json-value")] #[ts(type = "any")] value: Value, } // #[command(display(display_serializable))] #[instrument(skip_all)] pub async fn ui(ctx: RpcContext, UiParams { pointer, value, .. }: UiParams) -> Result<(), Error> { let ptr = "/public/ui" .parse::() .with_kind(ErrorKind::Database)? + &pointer; ctx.db.put(&ptr, &value).await?; Ok(()) }