un-workspace

This commit is contained in:
Aiden McClelland
2024-05-03 13:15:49 -06:00
parent 320d832359
commit 5374aef88d
28 changed files with 43 additions and 2601 deletions

284
src/cli.rs Normal file
View File

@@ -0,0 +1,284 @@
use std::any::TypeId;
use std::collections::VecDeque;
use std::ffi::OsString;
use clap::{CommandFactory, FromArgMatches};
use futures::Future;
use imbl_value::{InOMap, Value};
use reqwest::header::{ACCEPT, CONTENT_LENGTH, CONTENT_TYPE};
use reqwest::{Client, Method};
use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
use url::Url;
use yajrc::{Id, RpcError};
use crate::util::{internal_error, parse_error, PhantomData};
use crate::{
AnyHandler, CliBindingsAny, DynHandler, Empty, HandleAny, HandleAnyArgs, Handler, HandlerArgs,
HandlerArgsFor, HandlerTypes, IntoContext, Name, ParentHandler, PrintCliResult,
};
type GenericRpcMethod<'a> = yajrc::GenericRpcMethod<&'a str, Value, Value>;
type RpcRequest<'a> = yajrc::RpcRequest<GenericRpcMethod<'a>>;
type RpcResponse<'a> = yajrc::RpcResponse<GenericRpcMethod<'static>>;
pub struct CliApp<Context: crate::Context + Clone, Config: CommandFactory + FromArgMatches> {
_phantom: PhantomData<(Context, Config)>,
make_ctx: Box<dyn FnOnce(Config) -> Result<Context, RpcError> + Send + Sync>,
root_handler: ParentHandler,
}
impl<Context: crate::Context + Clone, Config: CommandFactory + FromArgMatches>
CliApp<Context, Config>
{
pub fn new<MakeCtx: FnOnce(Config) -> Result<Context, RpcError> + Send + Sync + 'static>(
make_ctx: MakeCtx,
root_handler: ParentHandler,
) -> Self {
Self {
_phantom: PhantomData::new(),
make_ctx: Box::new(make_ctx),
root_handler,
}
}
pub fn run(self, args: impl IntoIterator<Item = OsString>) -> Result<(), RpcError> {
let ctx_ty = TypeId::of::<Context>();
let mut cmd = Config::command();
for (name, handlers) in &self.root_handler.subcommands.0 {
if let (Name(Some(name)), Some(DynHandler::WithCli(handler))) = (
name,
if let Some(handler) = handlers.get(&Some(ctx_ty)) {
Some(handler)
} else if let Some(handler) = handlers.get(&None) {
Some(handler)
} else {
None
},
) {
cmd = cmd.subcommand(handler.cli_command(ctx_ty).name(name));
}
}
let matches = cmd.get_matches_from(args);
let config = Config::from_arg_matches(&matches)?;
let ctx = (self.make_ctx)(config)?;
let root_handler = AnyHandler::new(self.root_handler);
let (method, params) = root_handler.cli_parse(&matches, ctx_ty)?;
let res = root_handler.handle_sync(HandleAnyArgs {
context: ctx.clone().upcast(),
parent_method: VecDeque::new(),
method: method.clone(),
params: params.clone(),
inherited: crate::Empty {},
})?;
root_handler.cli_display(
HandleAnyArgs {
context: ctx.upcast(),
parent_method: VecDeque::new(),
method,
params,
inherited: crate::Empty {},
},
res,
)?;
Ok(())
}
}
pub trait CallRemote<RemoteContext, Extra = Empty>: crate::Context {
fn call_remote(
&self,
method: &str,
params: Value,
extra: Extra,
) -> impl Future<Output = Result<Value, RpcError>> + Send;
}
pub async fn call_remote_http(
client: &Client,
url: Url,
method: &str,
params: Value,
) -> Result<Value, RpcError> {
let rpc_req = RpcRequest {
id: Some(Id::Number(0.into())),
method: GenericRpcMethod::new(method),
params,
};
let mut req = client.request(Method::POST, url);
let body;
#[cfg(feature = "cbor")]
{
req = req.header(CONTENT_TYPE, "application/cbor");
req = req.header(ACCEPT, "application/cbor, application/json");
body = serde_cbor::to_vec(&rpc_req)?;
}
#[cfg(not(feature = "cbor"))]
{
req = req.header(CONTENT_TYPE, "application/json");
req = req.header(ACCEPT, "application/json");
body = serde_json::to_vec(&rpc_req)?;
}
let res = req
.header(CONTENT_LENGTH, body.len())
.body(body)
.send()
.await?;
match res
.headers()
.get(CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
{
Some("application/json") => {
serde_json::from_slice::<RpcResponse>(&*res.bytes().await.map_err(internal_error)?)
.map_err(parse_error)?
.result
}
#[cfg(feature = "cbor")]
Some("application/cbor") => {
serde_cbor::from_slice::<RpcResponse>(&*res.bytes().await.map_err(internal_error)?)
.map_err(parse_error)?
.result
}
_ => Err(internal_error("missing content type")),
}
}
pub async fn call_remote_socket(
connection: impl AsyncRead + AsyncWrite,
method: &str,
params: Value,
) -> Result<Value, RpcError> {
let rpc_req = RpcRequest {
id: Some(Id::Number(0.into())),
method: GenericRpcMethod::new(method),
params,
};
let conn = connection;
tokio::pin!(conn);
let mut buf = serde_json::to_vec(&rpc_req).map_err(|e| RpcError {
data: Some(e.to_string().into()),
..yajrc::INTERNAL_ERROR
})?;
buf.push(b'\n');
conn.write_all(&buf).await.map_err(|e| RpcError {
data: Some(e.to_string().into()),
..yajrc::INTERNAL_ERROR
})?;
let mut line = String::new();
BufReader::new(conn).read_line(&mut line).await?;
serde_json::from_str::<RpcResponse>(&line)
.map_err(parse_error)?
.result
}
struct CallRemoteHandler<Context, RemoteHandler> {
_phantom: PhantomData<Context>,
handler: RemoteHandler,
}
impl<Context, RemoteHandler> CallRemoteHandler<Context, RemoteHandler> {
pub fn new(handler: RemoteHandler) -> Self {
Self {
_phantom: PhantomData::new(),
handler: handler,
}
}
}
impl<Context, RemoteHandler: Clone> Clone for CallRemoteHandler<Context, RemoteHandler> {
fn clone(&self) -> Self {
Self {
_phantom: PhantomData::new(),
handler: self.handler.clone(),
}
}
}
impl<Context, RemoteHandler> std::fmt::Debug for CallRemoteHandler<Context, RemoteHandler> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("CallRemoteHandler").finish()
}
}
impl<Context, RemoteHandler> HandlerTypes for CallRemoteHandler<Context, RemoteHandler>
where
RemoteHandler: HandlerTypes,
RemoteHandler::Params: Serialize,
RemoteHandler::InheritedParams: Serialize,
RemoteHandler::Ok: DeserializeOwned,
RemoteHandler::Err: From<RpcError>,
{
type Params = RemoteHandler::Params;
type InheritedParams = RemoteHandler::InheritedParams;
type Ok = RemoteHandler::Ok;
type Err = RemoteHandler::Err;
}
impl<Context, RemoteHandler> Handler for CallRemoteHandler<Context, RemoteHandler>
where
Context: CallRemote<RemoteHandler::Context>,
RemoteHandler: Handler,
RemoteHandler::Params: Serialize,
RemoteHandler::InheritedParams: Serialize,
RemoteHandler::Ok: DeserializeOwned,
RemoteHandler::Err: From<RpcError>,
{
type Context = Context;
async fn handle_async(
&self,
handle_args: HandlerArgsFor<Context, Self>,
) -> Result<Self::Ok, Self::Err> {
let full_method = handle_args
.parent_method
.into_iter()
.chain(handle_args.method)
.collect::<Vec<_>>();
match handle_args
.context
.call_remote(
&full_method.join("."),
handle_args.raw_params.clone(),
Empty {},
)
.await
{
Ok(a) => imbl_value::from_value(a)
.map_err(internal_error)
.map_err(Self::Err::from),
Err(e) => Err(Self::Err::from(e)),
}
}
}
impl<Context, RemoteHandler> PrintCliResult for CallRemoteHandler<Context, RemoteHandler>
where
Context: CallRemote<RemoteHandler::Context>,
RemoteHandler: PrintCliResult<Context = Context>,
RemoteHandler::Params: Serialize,
RemoteHandler::InheritedParams: Serialize,
RemoteHandler::Ok: DeserializeOwned,
RemoteHandler::Err: From<RpcError>,
{
type Context = Context;
fn print(
&self,
HandlerArgs {
context,
parent_method,
method,
params,
inherited_params,
raw_params,
}: HandlerArgsFor<Self::Context, Self>,
result: Self::Ok,
) -> Result<(), Self::Err> {
self.handler.print(
HandlerArgs {
context,
parent_method,
method,
params,
inherited_params,
raw_params,
},
result,
)
}
}

33
src/command_helpers.rs Normal file
View File

@@ -0,0 +1,33 @@
use std::fmt::Display;
use std::io::Stdin;
use std::str::FromStr;
use clap::ArgMatches;
pub use {clap, serde};
pub fn default_arg_parser<T>(arg: &str, _: &ArgMatches) -> Result<T, clap::Error>
where
T: FromStr,
T::Err: Display,
{
arg.parse()
.map_err(|e| clap::Error::raw(clap::error::ErrorKind::ValueValidation, e))
}
pub fn default_stdin_parser<T>(stdin: &mut Stdin, _: &ArgMatches) -> Result<T, clap::Error>
where
T: FromStr,
T::Err: Display,
{
let mut s = String::new();
stdin
.read_line(&mut s)
.map_err(|e| clap::Error::raw(clap::error::ErrorKind::Io, e))?;
if let Some(s) = s.strip_suffix("\n") {
s
} else {
&s
}
.parse()
.map_err(|e| clap::Error::raw(clap::error::ErrorKind::ValueValidation, e))
}

122
src/context.rs Normal file
View File

@@ -0,0 +1,122 @@
use std::any::{Any, TypeId};
use imbl_value::imbl::OrdSet;
use tokio::runtime::Handle;
pub trait Context: Any + Send + Sync + 'static {
fn inner_type_id(&self) -> TypeId {
<Self as Any>::type_id(&self)
}
fn runtime(&self) -> Handle {
Handle::current()
}
}
#[allow(private_bounds)]
pub trait IntoContext: sealed::Sealed + Any + Send + Sync + Sized + 'static {
fn runtime(&self) -> Handle;
fn type_ids() -> Option<OrdSet<TypeId>>;
fn inner_type_id(&self) -> TypeId;
fn upcast(self) -> AnyContext;
fn downcast(value: AnyContext) -> Result<Self, AnyContext>;
}
impl<C: Context + Sized> IntoContext for C {
fn runtime(&self) -> Handle {
<C as Context>::runtime(&self)
}
fn type_ids() -> Option<OrdSet<TypeId>> {
let mut set = OrdSet::new();
set.insert(TypeId::of::<C>());
Some(set)
}
fn inner_type_id(&self) -> TypeId {
TypeId::of::<C>()
}
fn upcast(self) -> AnyContext {
AnyContext::new(self)
}
fn downcast(value: AnyContext) -> Result<Self, AnyContext> {
if value.0.inner_type_id() == TypeId::of::<C>() {
unsafe { Ok(value.downcast_unchecked::<C>()) }
} else {
Err(value)
}
}
}
pub enum EitherContext<C1, C2> {
C1(C1),
C2(C2),
}
impl<C1: IntoContext, C2: IntoContext> IntoContext for EitherContext<C1, C2> {
fn runtime(&self) -> Handle {
match self {
Self::C1(a) => a.runtime(),
Self::C2(a) => a.runtime(),
}
}
fn type_ids() -> Option<OrdSet<TypeId>> {
let mut set = OrdSet::new();
set.extend(C1::type_ids()?);
set.extend(C2::type_ids()?);
Some(set)
}
fn inner_type_id(&self) -> TypeId {
match self {
EitherContext::C1(c) => c.inner_type_id(),
EitherContext::C2(c) => c.inner_type_id(),
}
}
fn downcast(value: AnyContext) -> Result<Self, AnyContext> {
match C1::downcast(value) {
Ok(a) => Ok(EitherContext::C1(a)),
Err(value) => match C2::downcast(value) {
Ok(a) => Ok(EitherContext::C2(a)),
Err(value) => Err(value),
},
}
}
fn upcast(self) -> AnyContext {
match self {
Self::C1(c) => c.upcast(),
Self::C2(c) => c.upcast(),
}
}
}
pub struct AnyContext(Box<dyn Context>);
impl AnyContext {
pub fn new<C: Context>(value: C) -> Self {
Self(Box::new(value))
}
unsafe fn downcast_unchecked<C: Context>(self) -> C {
let raw: *mut dyn Context = Box::into_raw(self.0);
*Box::from_raw(raw as *mut C)
}
}
impl IntoContext for AnyContext {
fn runtime(&self) -> Handle {
self.0.runtime()
}
fn type_ids() -> Option<OrdSet<TypeId>> {
None
}
fn inner_type_id(&self) -> TypeId {
self.0.inner_type_id()
}
fn downcast(value: AnyContext) -> Result<Self, AnyContext> {
Ok(value)
}
fn upcast(self) -> AnyContext {
self
}
}
mod sealed {
pub(crate) trait Sealed {}
impl<C: super::Context> Sealed for C {}
impl<C1: super::IntoContext, C2: super::IntoContext> Sealed for super::EitherContext<C1, C2> {}
impl Sealed for super::AnyContext {}
}

728
src/handler/adapters.rs Normal file
View File

@@ -0,0 +1,728 @@
use std::any::TypeId;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::sync::Arc;
use clap::{CommandFactory, FromArgMatches};
use imbl_value::imbl::{OrdMap, OrdSet};
use imbl_value::Value;
use serde::de::DeserializeOwned;
use serde::Serialize;
use yajrc::RpcError;
use crate::util::{internal_error, invalid_params, without, Flat, PhantomData};
use crate::{
iter_from_ctx_and_handler, AnyContext, AnyHandler, CallRemote, CliBindings, DynHandler,
EitherContext, Empty, Handler, HandlerArgs, HandlerArgsFor, HandlerTypes, IntoContext,
IntoHandlers, OrEmpty, PrintCliResult,
};
pub trait HandlerExt: Handler + Sized {
fn no_cli(self) -> NoCli<Self>;
fn no_display(self) -> NoDisplay<Self>;
fn with_custom_display<P>(self, display: P) -> CustomDisplay<P, Self>
where
P: PrintCliResult<
Params = Self::Params,
InheritedParams = Self::InheritedParams,
Ok = Self::Ok,
Err = Self::Err,
>;
fn with_custom_display_fn<Context: IntoContext, F>(
self,
display: F,
) -> CustomDisplayFn<F, Self, Context>
where
F: Fn(HandlerArgsFor<Context, Self>, Self::Ok) -> Result<(), Self::Err>;
fn with_inherited<Params, InheritedParams, F>(
self,
f: F,
) -> InheritanceHandler<Params, InheritedParams, Self, F>
where
F: Fn(Params, InheritedParams) -> Self::InheritedParams;
fn with_call_remote<Context, Extra>(self) -> RemoteCaller<Context, Self, Extra>;
}
impl<T: Handler + Sized> HandlerExt for T {
fn no_cli(self) -> NoCli<Self> {
NoCli(self)
}
fn no_display(self) -> NoDisplay<Self> {
NoDisplay(self)
}
fn with_custom_display<P>(self, display: P) -> CustomDisplay<P, Self>
where
P: PrintCliResult<
Params = Self::Params,
InheritedParams = Self::InheritedParams,
Ok = Self::Ok,
Err = Self::Err,
>,
{
CustomDisplay {
print: display,
handler: self,
}
}
fn with_custom_display_fn<Context: IntoContext, F>(
self,
display: F,
) -> CustomDisplayFn<F, Self, Context>
where
F: Fn(HandlerArgsFor<Context, Self>, Self::Ok) -> Result<(), Self::Err>,
{
CustomDisplayFn {
_phantom: PhantomData::new(),
print: display,
handler: self,
}
}
fn with_inherited<Params, InheritedParams, F>(
self,
f: F,
) -> InheritanceHandler<Params, InheritedParams, Self, F>
where
F: Fn(Params, InheritedParams) -> Self::InheritedParams,
{
InheritanceHandler {
_phantom: PhantomData::new(),
handler: self,
inherit: f,
}
}
fn with_call_remote<Context, Extra>(self) -> RemoteCaller<Context, Self, Extra> {
RemoteCaller {
_phantom: PhantomData::new(),
handler: self,
}
}
}
#[derive(Debug, Clone)]
pub struct NoCli<H>(pub H);
impl<H: HandlerTypes> HandlerTypes for NoCli<H> {
type Params = H::Params;
type InheritedParams = H::InheritedParams;
type Ok = H::Ok;
type Err = H::Err;
}
impl<H, A, B> IntoHandlers<Flat<A, B>> for NoCli<H>
where
H: Handler,
H::Params: DeserializeOwned,
H::InheritedParams: OrEmpty<Flat<A, B>>,
H::Ok: Serialize + DeserializeOwned,
RpcError: From<H::Err>,
A: Send + Sync + 'static,
B: Send + Sync + 'static,
{
fn into_handlers(self) -> impl IntoIterator<Item = (Option<TypeId>, DynHandler<Flat<A, B>>)> {
iter_from_ctx_and_handler(
self.0.contexts(),
DynHandler::WithoutCli(Arc::new(AnyHandler::new(
self.0.with_inherited(|a, b| OrEmpty::from_t(Flat(a, b))),
))),
)
}
}
#[derive(Debug, Clone)]
pub struct NoDisplay<H>(pub H);
impl<H: HandlerTypes> HandlerTypes for NoDisplay<H> {
type Params = H::Params;
type InheritedParams = H::InheritedParams;
type Ok = H::Ok;
type Err = H::Err;
}
impl<H> Handler for NoDisplay<H>
where
H: Handler,
{
type Context = H::Context;
fn handle_sync(
&self,
HandlerArgs {
context,
parent_method,
method,
params,
inherited_params,
raw_params,
}: HandlerArgsFor<Self::Context, Self>,
) -> Result<Self::Ok, Self::Err> {
self.0.handle_sync(HandlerArgs {
context,
parent_method,
method,
params,
inherited_params,
raw_params,
})
}
async fn handle_async(
&self,
HandlerArgs {
context,
parent_method,
method,
params,
inherited_params,
raw_params,
}: HandlerArgsFor<Self::Context, Self>,
) -> Result<Self::Ok, Self::Err> {
self.0
.handle_async(HandlerArgs {
context,
parent_method,
method,
params,
inherited_params,
raw_params,
})
.await
}
fn metadata(
&self,
method: VecDeque<&'static str>,
ctx_ty: TypeId,
) -> OrdMap<&'static str, Value> {
self.0.metadata(method, ctx_ty)
}
fn contexts(&self) -> Option<OrdSet<TypeId>> {
self.0.contexts()
}
fn method_from_dots(&self, method: &str, ctx_ty: TypeId) -> Option<VecDeque<&'static str>> {
self.0.method_from_dots(method, ctx_ty)
}
}
impl<H> PrintCliResult for NoDisplay<H>
where
H: HandlerTypes,
H::Params: FromArgMatches + CommandFactory + Serialize,
{
type Context = AnyContext;
fn print(&self, _: HandlerArgsFor<Self::Context, Self>, _: Self::Ok) -> Result<(), Self::Err> {
Ok(())
}
}
#[derive(Clone, Debug)]
pub struct CustomDisplay<P, H> {
print: P,
handler: H,
}
impl<P, H> HandlerTypes for CustomDisplay<P, H>
where
H: HandlerTypes,
{
type Params = H::Params;
type InheritedParams = H::InheritedParams;
type Ok = H::Ok;
type Err = H::Err;
}
impl<P, H> Handler for CustomDisplay<P, H>
where
H: Handler,
P: Send + Sync + Clone + 'static,
{
type Context = H::Context;
fn handle_sync(
&self,
HandlerArgs {
context,
parent_method,
method,
params,
inherited_params,
raw_params,
}: HandlerArgsFor<Self::Context, Self>,
) -> Result<Self::Ok, Self::Err> {
self.handler.handle_sync(HandlerArgs {
context,
parent_method,
method,
params,
inherited_params,
raw_params,
})
}
async fn handle_async(
&self,
HandlerArgs {
context,
parent_method,
method,
params,
inherited_params,
raw_params,
}: HandlerArgsFor<Self::Context, Self>,
) -> Result<Self::Ok, Self::Err> {
self.handler
.handle_async(HandlerArgs {
context,
parent_method,
method,
params,
inherited_params,
raw_params,
})
.await
}
fn metadata(
&self,
method: VecDeque<&'static str>,
ctx_ty: TypeId,
) -> OrdMap<&'static str, Value> {
self.handler.metadata(method, ctx_ty)
}
fn contexts(&self) -> Option<OrdSet<TypeId>> {
self.handler.contexts()
}
fn method_from_dots(&self, method: &str, ctx_ty: TypeId) -> Option<VecDeque<&'static str>> {
self.handler.method_from_dots(method, ctx_ty)
}
}
impl<P, H> PrintCliResult for CustomDisplay<P, H>
where
H: HandlerTypes,
P: PrintCliResult<
Params = H::Params,
InheritedParams = H::InheritedParams,
Ok = H::Ok,
Err = H::Err,
> + Send
+ Sync
+ Clone
+ 'static,
{
type Context = P::Context;
fn print(
&self,
HandlerArgs {
context,
parent_method,
method,
params,
inherited_params,
raw_params,
}: HandlerArgsFor<Self::Context, Self>,
result: Self::Ok,
) -> Result<(), Self::Err> {
self.print.print(
HandlerArgs {
context,
parent_method,
method,
params,
inherited_params,
raw_params,
},
result,
)
}
}
pub struct CustomDisplayFn<F, H, Context = AnyContext> {
_phantom: PhantomData<Context>,
print: F,
handler: H,
}
impl<Context, F: Clone, H: Clone> Clone for CustomDisplayFn<F, H, Context> {
fn clone(&self) -> Self {
Self {
_phantom: PhantomData::new(),
print: self.print.clone(),
handler: self.handler.clone(),
}
}
}
impl<Context, F: Debug, H: Debug> Debug for CustomDisplayFn<F, H, Context> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CustomDisplayFn")
.field("print", &self.print)
.field("handler", &self.handler)
.finish()
}
}
impl<F, H, Context> HandlerTypes for CustomDisplayFn<F, H, Context>
where
H: HandlerTypes,
{
type Params = H::Params;
type InheritedParams = H::InheritedParams;
type Ok = H::Ok;
type Err = H::Err;
}
impl<F, H, Context> Handler for CustomDisplayFn<F, H, Context>
where
Context: Send + Sync + 'static,
H: Handler,
F: Send + Sync + Clone + 'static,
{
type Context = H::Context;
fn handle_sync(
&self,
HandlerArgs {
context,
parent_method,
method,
params,
inherited_params,
raw_params,
}: HandlerArgsFor<Self::Context, Self>,
) -> Result<Self::Ok, Self::Err> {
self.handler.handle_sync(HandlerArgs {
context,
parent_method,
method,
params,
inherited_params,
raw_params,
})
}
async fn handle_async(
&self,
HandlerArgs {
context,
parent_method,
method,
params,
inherited_params,
raw_params,
}: HandlerArgsFor<Self::Context, Self>,
) -> Result<Self::Ok, Self::Err> {
self.handler
.handle_async(HandlerArgs {
context,
parent_method,
method,
params,
inherited_params,
raw_params,
})
.await
}
fn metadata(
&self,
method: VecDeque<&'static str>,
ctx_ty: TypeId,
) -> OrdMap<&'static str, Value> {
self.handler.metadata(method, ctx_ty)
}
fn contexts(&self) -> Option<OrdSet<TypeId>> {
self.handler.contexts()
}
fn method_from_dots(&self, method: &str, ctx_ty: TypeId) -> Option<VecDeque<&'static str>> {
self.handler.method_from_dots(method, ctx_ty)
}
}
impl<F, H, Context> PrintCliResult for CustomDisplayFn<F, H, Context>
where
Context: IntoContext,
H: HandlerTypes,
F: Fn(HandlerArgsFor<Context, H>, H::Ok) -> Result<(), H::Err> + Send + Sync + Clone + 'static,
{
type Context = Context;
fn print(
&self,
HandlerArgs {
context,
parent_method,
method,
params,
inherited_params,
raw_params,
}: HandlerArgsFor<Context, Self>,
result: Self::Ok,
) -> Result<(), Self::Err> {
(self.print)(
HandlerArgs {
context,
parent_method,
method,
params,
inherited_params,
raw_params,
},
result,
)
}
}
pub struct RemoteCaller<Context, H, Extra = Empty> {
_phantom: PhantomData<(Context, Extra)>,
handler: H,
}
impl<Context, H: Clone, Extra> Clone for RemoteCaller<Context, H, Extra> {
fn clone(&self) -> Self {
Self {
_phantom: PhantomData::new(),
handler: self.handler.clone(),
}
}
}
impl<Context, H: Debug, Extra> Debug for RemoteCaller<Context, H, Extra> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("RemoteCaller").field(&self.handler).finish()
}
}
impl<Context, H, Extra> HandlerTypes for RemoteCaller<Context, H, Extra>
where
H: HandlerTypes,
Extra: Send + Sync + 'static,
{
type Params = Flat<H::Params, Extra>;
type InheritedParams = H::InheritedParams;
type Ok = H::Ok;
type Err = H::Err;
}
impl<Context, H, Extra> Handler for RemoteCaller<Context, H, Extra>
where
Context: CallRemote<H::Context, Extra>,
H: Handler,
H::Params: Serialize,
H::InheritedParams: Serialize,
H::Ok: DeserializeOwned,
H::Err: From<RpcError>,
Extra: Serialize + Send + Sync + 'static,
{
type Context = EitherContext<Context, H::Context>;
async fn handle_async(
&self,
HandlerArgs {
context,
parent_method,
method,
params: Flat(params, extra),
inherited_params,
raw_params,
}: HandlerArgsFor<Self::Context, Self>,
) -> Result<Self::Ok, Self::Err> {
match context {
EitherContext::C1(context) => {
let full_method = parent_method.into_iter().chain(method).collect::<Vec<_>>();
match context
.call_remote(
&full_method.join("."),
without(raw_params, &extra).map_err(invalid_params)?,
extra,
)
.await
{
Ok(a) => imbl_value::from_value(a)
.map_err(internal_error)
.map_err(Self::Err::from),
Err(e) => Err(Self::Err::from(e)),
}
}
EitherContext::C2(context) => {
self.handler
.handle_async(HandlerArgs {
context,
parent_method,
method,
params,
inherited_params,
raw_params,
})
.await
}
}
}
fn metadata(
&self,
method: VecDeque<&'static str>,
ctx_ty: TypeId,
) -> OrdMap<&'static str, Value> {
self.handler.metadata(method, ctx_ty)
}
fn contexts(&self) -> Option<OrdSet<TypeId>> {
Self::Context::type_ids()
}
fn method_from_dots(&self, method: &str, ctx_ty: TypeId) -> Option<VecDeque<&'static str>> {
self.handler.method_from_dots(method, ctx_ty)
}
}
impl<Context, H> PrintCliResult for RemoteCaller<Context, H>
where
Context: IntoContext,
H: PrintCliResult,
{
type Context = H::Context;
fn print(
&self,
HandlerArgs {
context,
parent_method,
method,
params: Flat(params, _),
inherited_params,
raw_params,
}: HandlerArgsFor<Self::Context, Self>,
result: Self::Ok,
) -> Result<(), Self::Err> {
self.handler.print(
HandlerArgs {
context,
parent_method,
method,
params,
inherited_params,
raw_params,
},
result,
)
}
}
pub struct InheritanceHandler<Params, InheritedParams, H, F> {
_phantom: PhantomData<(Params, InheritedParams)>,
handler: H,
inherit: F,
}
impl<Params, InheritedParams, H: Clone, F: Clone> Clone
for InheritanceHandler<Params, InheritedParams, H, F>
{
fn clone(&self) -> Self {
Self {
_phantom: PhantomData::new(),
handler: self.handler.clone(),
inherit: self.inherit.clone(),
}
}
}
impl<Params, InheritedParams, H: std::fmt::Debug, F> std::fmt::Debug
for InheritanceHandler<Params, InheritedParams, H, F>
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("InheritanceHandler")
.field(&self.handler)
.finish()
}
}
impl<Params, InheritedParams, H, F> HandlerTypes
for InheritanceHandler<Params, InheritedParams, H, F>
where
H: HandlerTypes,
Params: Send + Sync,
InheritedParams: Send + Sync,
{
type Params = H::Params;
type InheritedParams = Flat<Params, InheritedParams>;
type Ok = H::Ok;
type Err = H::Err;
}
impl<Params, InheritedParams, H, F> Handler for InheritanceHandler<Params, InheritedParams, H, F>
where
Params: Send + Sync + 'static,
InheritedParams: Send + Sync + 'static,
H: Handler,
F: Fn(Params, InheritedParams) -> H::InheritedParams + Send + Sync + Clone + 'static,
{
type Context = H::Context;
fn handle_sync(
&self,
HandlerArgs {
context,
parent_method,
method,
params,
inherited_params,
raw_params,
}: HandlerArgsFor<Self::Context, Self>,
) -> Result<Self::Ok, Self::Err> {
self.handler.handle_sync(HandlerArgs {
context,
parent_method,
method,
params,
inherited_params: (self.inherit)(inherited_params.0, inherited_params.1),
raw_params,
})
}
async fn handle_async(
&self,
HandlerArgs {
context,
parent_method,
method,
params,
inherited_params,
raw_params,
}: HandlerArgsFor<Self::Context, Self>,
) -> Result<Self::Ok, Self::Err> {
self.handler
.handle_async(HandlerArgs {
context,
parent_method,
method,
params,
inherited_params: (self.inherit)(inherited_params.0, inherited_params.1),
raw_params,
})
.await
}
fn metadata(
&self,
method: VecDeque<&'static str>,
ctx_ty: TypeId,
) -> OrdMap<&'static str, Value> {
self.handler.metadata(method, ctx_ty)
}
fn contexts(&self) -> Option<OrdSet<TypeId>> {
self.handler.contexts()
}
fn method_from_dots(&self, method: &str, ctx_ty: TypeId) -> Option<VecDeque<&'static str>> {
self.handler.method_from_dots(method, ctx_ty)
}
}
impl<Params, InheritedParams, H, F> CliBindings
for InheritanceHandler<Params, InheritedParams, H, F>
where
Params: Send + Sync + 'static,
InheritedParams: Send + Sync + 'static,
H: CliBindings,
F: Fn(Params, InheritedParams) -> H::InheritedParams + Send + Sync + Clone + 'static,
{
type Context = H::Context;
fn cli_command(&self, ctx_ty: TypeId) -> clap::Command {
self.handler.cli_command(ctx_ty)
}
fn cli_parse(
&self,
matches: &clap::ArgMatches,
ctx_ty: TypeId,
) -> Result<(VecDeque<&'static str>, Value), clap::Error> {
self.handler.cli_parse(matches, ctx_ty)
}
fn cli_display(
&self,
HandlerArgs {
context,
parent_method,
method,
params,
inherited_params,
raw_params,
}: HandlerArgsFor<Self::Context, Self>,
result: Self::Ok,
) -> Result<(), Self::Err> {
self.handler.cli_display(
HandlerArgs {
context,
parent_method,
method,
params,
inherited_params: (self.inherit)(inherited_params.0, inherited_params.1),
raw_params,
},
result,
)
}
}

555
src/handler/from_fn.rs Normal file
View File

@@ -0,0 +1,555 @@
use std::any::TypeId;
use std::collections::VecDeque;
use std::fmt::Display;
use futures::Future;
use imbl_value::imbl::OrdMap;
use imbl_value::Value;
use serde::de::DeserializeOwned;
use crate::util::PhantomData;
use crate::{
AnyContext, Empty, Handler, HandlerArgs, HandlerArgsFor, HandlerTypes, IntoContext,
PrintCliResult,
};
pub struct FromFn<F, T, E, Args> {
_phantom: PhantomData<(T, E, Args)>,
function: F,
blocking: bool,
metadata: OrdMap<&'static str, Value>,
}
impl<F, T, E, Args> FromFn<F, T, E, Args> {
pub fn with_metadata(mut self, key: &'static str, value: Value) -> Self {
self.metadata.insert(key, value);
self
}
}
impl<F: Clone, T, E, Args> Clone for FromFn<F, T, E, Args> {
fn clone(&self) -> Self {
Self {
_phantom: PhantomData::new(),
function: self.function.clone(),
blocking: self.blocking,
metadata: self.metadata.clone(),
}
}
}
impl<F, T, E, Args> std::fmt::Debug for FromFn<F, T, E, Args> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FromFn")
.field("blocking", &self.blocking)
.finish()
}
}
impl<F, T, E, Args> PrintCliResult for FromFn<F, T, E, Args>
where
Self: HandlerTypes,
<Self as HandlerTypes>::Ok: Display,
{
type Context = AnyContext;
fn print(
&self,
_: HandlerArgsFor<Self::Context, Self>,
result: Self::Ok,
) -> Result<(), Self::Err> {
Ok(println!("{result}"))
}
}
pub fn from_fn<F, T, E, Args>(function: F) -> FromFn<F, T, E, Args>
where
FromFn<F, T, E, Args>: HandlerTypes,
{
FromFn {
function,
_phantom: PhantomData::new(),
blocking: false,
metadata: OrdMap::new(),
}
}
pub fn from_fn_blocking<F, T, E, Args>(function: F) -> FromFn<F, T, E, Args>
where
FromFn<F, T, E, Args>: HandlerTypes,
{
FromFn {
function,
_phantom: PhantomData::new(),
blocking: true,
metadata: OrdMap::new(),
}
}
pub struct FromFnAsync<F, Fut, T, E, Args> {
_phantom: PhantomData<(Fut, T, E, Args)>,
function: F,
metadata: OrdMap<&'static str, Value>,
}
impl<F, Fut, T, E, Args> FromFnAsync<F, Fut, T, E, Args> {
pub fn with_metadata(mut self, key: &'static str, value: Value) -> Self {
self.metadata.insert(key, value);
self
}
}
impl<F: Clone, Fut, T, E, Args> Clone for FromFnAsync<F, Fut, T, E, Args> {
fn clone(&self) -> Self {
Self {
_phantom: PhantomData::new(),
function: self.function.clone(),
metadata: self.metadata.clone(),
}
}
}
impl<F, Fut, T, E, Args> std::fmt::Debug for FromFnAsync<F, Fut, T, E, Args> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FromFnAsync").finish()
}
}
impl<F, Fut, T, E, Args> PrintCliResult for FromFnAsync<F, Fut, T, E, Args>
where
Self: HandlerTypes,
<Self as HandlerTypes>::Ok: Display,
{
type Context = AnyContext;
fn print(
&self,
_: HandlerArgsFor<Self::Context, Self>,
result: Self::Ok,
) -> Result<(), Self::Err> {
Ok(println!("{result}"))
}
}
pub fn from_fn_async<F, Fut, T, E, Args>(function: F) -> FromFnAsync<F, Fut, T, E, Args>
where
FromFnAsync<F, Fut, T, E, Args>: HandlerTypes,
{
FromFnAsync {
function,
_phantom: PhantomData::new(),
metadata: OrdMap::new(),
}
}
impl<F, T, E, Context, Params, InheritedParams> HandlerTypes
for FromFn<F, T, E, HandlerArgs<Context, Params, InheritedParams>>
where
F: Fn(HandlerArgs<Context, Params, InheritedParams>) -> Result<T, E>
+ Send
+ Sync
+ Clone
+ 'static,
T: Send + Sync + 'static,
E: Send + Sync + 'static,
Context: IntoContext,
Params: Send + Sync,
InheritedParams: Send + Sync,
{
type Params = Params;
type InheritedParams = InheritedParams;
type Ok = T;
type Err = E;
}
impl<F, T, E, Context, Params, InheritedParams> Handler
for FromFn<F, T, E, HandlerArgs<Context, Params, InheritedParams>>
where
F: Fn(HandlerArgs<Context, Params, InheritedParams>) -> Result<T, E>
+ Send
+ Sync
+ Clone
+ 'static,
T: Send + Sync + 'static,
E: Send + Sync + 'static,
Context: IntoContext,
Params: Send + Sync + 'static,
InheritedParams: Send + Sync + 'static,
{
type Context = Context;
fn handle_sync(
&self,
handle_args: HandlerArgsFor<Self::Context, Self>,
) -> Result<Self::Ok, Self::Err> {
(self.function)(handle_args)
}
async fn handle_async(
&self,
handle_args: HandlerArgsFor<Self::Context, Self>,
) -> Result<Self::Ok, Self::Err> {
if self.blocking {
self.handle_async_with_sync_blocking(handle_args).await
} else {
self.handle_async_with_sync(handle_args).await
}
}
fn metadata(&self, _: VecDeque<&'static str>, _: TypeId) -> OrdMap<&'static str, Value> {
self.metadata.clone()
}
}
impl<F, Fut, T, E, Context, Params, InheritedParams> HandlerTypes
for FromFnAsync<F, Fut, T, E, HandlerArgs<Context, Params, InheritedParams>>
where
F: Fn(HandlerArgs<Context, Params, InheritedParams>) -> Fut + Send + Sync + Clone + 'static,
Fut: Future<Output = Result<T, E>> + Send + 'static,
T: Send + Sync + 'static,
E: Send + Sync + 'static,
Context: IntoContext,
Params: Send + Sync,
InheritedParams: Send + Sync,
{
type Params = Params;
type InheritedParams = InheritedParams;
type Ok = T;
type Err = E;
}
impl<F, Fut, T, E, Context, Params, InheritedParams> Handler
for FromFnAsync<F, Fut, T, E, HandlerArgs<Context, Params, InheritedParams>>
where
F: Fn(HandlerArgs<Context, Params, InheritedParams>) -> Fut + Send + Sync + Clone + 'static,
Fut: Future<Output = Result<T, E>> + Send + 'static,
T: Send + Sync + 'static,
E: Send + Sync + 'static,
Context: IntoContext,
Params: Send + Sync + 'static,
InheritedParams: Send + Sync + 'static,
{
type Context = Context;
async fn handle_async(
&self,
handle_args: HandlerArgsFor<Self::Context, Self>,
) -> Result<Self::Ok, Self::Err> {
(self.function)(handle_args).await
}
fn metadata(&self, _: VecDeque<&'static str>, _: TypeId) -> OrdMap<&'static str, Value> {
self.metadata.clone()
}
}
impl<F, T, E> HandlerTypes for FromFn<F, T, E, ()>
where
F: Fn() -> Result<T, E> + Send + Sync + Clone + 'static,
T: Send + Sync + 'static,
E: Send + Sync + 'static,
{
type Params = Empty;
type InheritedParams = Empty;
type Ok = T;
type Err = E;
}
impl<F, T, E> Handler for FromFn<F, T, E, ()>
where
F: Fn() -> Result<T, E> + Send + Sync + Clone + 'static,
T: Send + Sync + 'static,
E: Send + Sync + 'static,
{
type Context = AnyContext;
fn handle_sync(&self, _: HandlerArgsFor<Self::Context, Self>) -> Result<Self::Ok, Self::Err> {
(self.function)()
}
async fn handle_async(
&self,
handle_args: HandlerArgsFor<Self::Context, Self>,
) -> Result<Self::Ok, Self::Err> {
if self.blocking {
self.handle_async_with_sync_blocking(handle_args).await
} else {
self.handle_async_with_sync(handle_args).await
}
}
fn metadata(&self, _: VecDeque<&'static str>, _: TypeId) -> OrdMap<&'static str, Value> {
self.metadata.clone()
}
}
impl<F, Fut, T, E> HandlerTypes for FromFnAsync<F, Fut, T, E, ()>
where
F: Fn() -> Fut + Send + Sync + Clone + 'static,
Fut: Future<Output = Result<T, E>> + Send + 'static,
T: Send + Sync + 'static,
E: Send + Sync + 'static,
{
type Params = Empty;
type InheritedParams = Empty;
type Ok = T;
type Err = E;
}
impl<F, Fut, T, E> Handler for FromFnAsync<F, Fut, T, E, ()>
where
F: Fn() -> Fut + Send + Sync + Clone + 'static,
Fut: Future<Output = Result<T, E>> + Send + 'static,
T: Send + Sync + 'static,
E: Send + Sync + 'static,
{
type Context = AnyContext;
async fn handle_async(
&self,
_: HandlerArgsFor<Self::Context, Self>,
) -> Result<Self::Ok, Self::Err> {
(self.function)().await
}
fn metadata(&self, _: VecDeque<&'static str>, _: TypeId) -> OrdMap<&'static str, Value> {
self.metadata.clone()
}
}
impl<Context, F, T, E> HandlerTypes for FromFn<F, T, E, (Context,)>
where
Context: IntoContext,
F: Fn(Context) -> Result<T, E> + Send + Sync + Clone + 'static,
T: Send + Sync + 'static,
E: Send + Sync + 'static,
{
type Params = Empty;
type InheritedParams = Empty;
type Ok = T;
type Err = E;
}
impl<Context, F, T, E> Handler for FromFn<F, T, E, (Context,)>
where
Context: IntoContext,
F: Fn(Context) -> Result<T, E> + Send + Sync + Clone + 'static,
T: Send + Sync + 'static,
E: Send + Sync + 'static,
{
type Context = Context;
fn handle_sync(
&self,
handle_args: HandlerArgsFor<Self::Context, Self>,
) -> Result<Self::Ok, Self::Err> {
(self.function)(handle_args.context)
}
async fn handle_async(
&self,
handle_args: HandlerArgsFor<Self::Context, Self>,
) -> Result<Self::Ok, Self::Err> {
if self.blocking {
self.handle_async_with_sync_blocking(handle_args).await
} else {
self.handle_async_with_sync(handle_args).await
}
}
fn metadata(&self, _: VecDeque<&'static str>, _: TypeId) -> OrdMap<&'static str, Value> {
self.metadata.clone()
}
}
impl<Context, F, Fut, T, E> HandlerTypes for FromFnAsync<F, Fut, T, E, (Context,)>
where
Context: IntoContext,
F: Fn(Context) -> Fut + Send + Sync + Clone + 'static,
Fut: Future<Output = Result<T, E>> + Send + 'static,
T: Send + Sync + 'static,
E: Send + Sync + 'static,
{
type Params = Empty;
type InheritedParams = Empty;
type Ok = T;
type Err = E;
}
impl<Context, F, Fut, T, E> Handler for FromFnAsync<F, Fut, T, E, (Context,)>
where
Context: IntoContext,
F: Fn(Context) -> Fut + Send + Sync + Clone + 'static,
Fut: Future<Output = Result<T, E>> + Send + 'static,
T: Send + Sync + 'static,
E: Send + Sync + 'static,
{
type Context = Context;
async fn handle_async(
&self,
handle_args: HandlerArgsFor<Self::Context, Self>,
) -> Result<Self::Ok, Self::Err> {
(self.function)(handle_args.context).await
}
fn metadata(&self, _: VecDeque<&'static str>, _: TypeId) -> OrdMap<&'static str, Value> {
self.metadata.clone()
}
}
impl<Context, F, T, E, Params> HandlerTypes for FromFn<F, T, E, (Context, Params)>
where
Context: IntoContext,
F: Fn(Context, Params) -> Result<T, E> + Send + Sync + Clone + 'static,
Params: DeserializeOwned + Send + Sync + 'static,
T: Send + Sync + 'static,
E: Send + Sync + 'static,
{
type Params = Params;
type InheritedParams = Empty;
type Ok = T;
type Err = E;
}
impl<Context, F, T, E, Params> Handler for FromFn<F, T, E, (Context, Params)>
where
Context: IntoContext,
F: Fn(Context, Params) -> Result<T, E> + Send + Sync + Clone + 'static,
Params: DeserializeOwned + Send + Sync + 'static,
T: Send + Sync + 'static,
E: Send + Sync + 'static,
{
type Context = Context;
fn handle_sync(
&self,
handle_args: HandlerArgsFor<Self::Context, Self>,
) -> Result<Self::Ok, Self::Err> {
let HandlerArgs {
context, params, ..
} = handle_args;
(self.function)(context, params)
}
async fn handle_async(
&self,
handle_args: HandlerArgsFor<Self::Context, Self>,
) -> Result<Self::Ok, Self::Err> {
if self.blocking {
self.handle_async_with_sync_blocking(handle_args).await
} else {
self.handle_async_with_sync(handle_args).await
}
}
fn metadata(&self, _: VecDeque<&'static str>, _: TypeId) -> OrdMap<&'static str, Value> {
self.metadata.clone()
}
}
impl<Context, F, Fut, T, E, Params> HandlerTypes for FromFnAsync<F, Fut, T, E, (Context, Params)>
where
Context: IntoContext,
F: Fn(Context, Params) -> Fut + Send + Sync + Clone + 'static,
Fut: Future<Output = Result<T, E>> + Send + 'static,
Params: DeserializeOwned + Send + Sync + 'static,
T: Send + Sync + 'static,
E: Send + Sync + 'static,
{
type Params = Params;
type InheritedParams = Empty;
type Ok = T;
type Err = E;
}
impl<Context, F, Fut, T, E, Params> Handler for FromFnAsync<F, Fut, T, E, (Context, Params)>
where
Context: IntoContext,
F: Fn(Context, Params) -> Fut + Send + Sync + Clone + 'static,
Fut: Future<Output = Result<T, E>> + Send + 'static,
Params: DeserializeOwned + Send + Sync + 'static,
T: Send + Sync + 'static,
E: Send + Sync + 'static,
{
type Context = Context;
async fn handle_async(
&self,
handle_args: HandlerArgsFor<Self::Context, Self>,
) -> Result<Self::Ok, Self::Err> {
let HandlerArgs {
context, params, ..
} = handle_args;
(self.function)(context, params).await
}
fn metadata(&self, _: VecDeque<&'static str>, _: TypeId) -> OrdMap<&'static str, Value> {
self.metadata.clone()
}
}
impl<Context, F, T, E, Params, InheritedParams> HandlerTypes
for FromFn<F, T, E, (Context, Params, InheritedParams)>
where
Context: IntoContext,
F: Fn(Context, Params, InheritedParams) -> Result<T, E> + Send + Sync + Clone + 'static,
Params: DeserializeOwned + Send + Sync + 'static,
InheritedParams: Send + Sync + 'static,
T: Send + Sync + 'static,
E: Send + Sync + 'static,
{
type Params = Params;
type InheritedParams = InheritedParams;
type Ok = T;
type Err = E;
}
impl<Context, F, T, E, Params, InheritedParams> Handler
for FromFn<F, T, E, (Context, Params, InheritedParams)>
where
Context: IntoContext,
F: Fn(Context, Params, InheritedParams) -> Result<T, E> + Send + Sync + Clone + 'static,
Params: DeserializeOwned + Send + Sync + 'static,
InheritedParams: Send + Sync + 'static,
T: Send + Sync + 'static,
E: Send + Sync + 'static,
{
type Context = Context;
fn handle_sync(
&self,
handle_args: HandlerArgsFor<Self::Context, Self>,
) -> Result<Self::Ok, Self::Err> {
let HandlerArgs {
context,
params,
inherited_params,
..
} = handle_args;
(self.function)(context, params, inherited_params)
}
async fn handle_async(
&self,
handle_args: HandlerArgsFor<Self::Context, Self>,
) -> Result<Self::Ok, Self::Err> {
if self.blocking {
self.handle_async_with_sync_blocking(handle_args).await
} else {
self.handle_async_with_sync(handle_args).await
}
}
fn metadata(&self, _: VecDeque<&'static str>, _: TypeId) -> OrdMap<&'static str, Value> {
self.metadata.clone()
}
}
impl<Context, F, Fut, T, E, Params, InheritedParams> HandlerTypes
for FromFnAsync<F, Fut, T, E, (Context, Params, InheritedParams)>
where
Context: IntoContext,
F: Fn(Context, Params, InheritedParams) -> Fut + Send + Sync + Clone + 'static,
Fut: Future<Output = Result<T, E>> + Send + 'static,
Params: DeserializeOwned + Send + Sync + 'static,
InheritedParams: Send + Sync + 'static,
T: Send + Sync + 'static,
E: Send + Sync + 'static,
{
type Params = Params;
type InheritedParams = InheritedParams;
type Ok = T;
type Err = E;
}
impl<Context, F, Fut, T, E, Params, InheritedParams> Handler
for FromFnAsync<F, Fut, T, E, (Context, Params, InheritedParams)>
where
Context: IntoContext,
F: Fn(Context, Params, InheritedParams) -> Fut + Send + Sync + Clone + 'static,
Fut: Future<Output = Result<T, E>> + Send + 'static,
Params: DeserializeOwned + Send + Sync + 'static,
InheritedParams: Send + Sync + 'static,
T: Send + Sync + 'static,
E: Send + Sync + 'static,
{
type Context = Context;
async fn handle_async(
&self,
handle_args: HandlerArgsFor<Self::Context, Self>,
) -> Result<Self::Ok, Self::Err> {
let HandlerArgs {
context,
params,
inherited_params,
..
} = handle_args;
(self.function)(context, params, inherited_params).await
}
fn metadata(&self, _: VecDeque<&'static str>, _: TypeId) -> OrdMap<&'static str, Value> {
self.metadata.clone()
}
}

429
src/handler/mod.rs Normal file
View File

@@ -0,0 +1,429 @@
use std::any::TypeId;
use std::collections::VecDeque;
use std::ops::Deref;
use std::sync::Arc;
use clap::{ArgMatches, Command, CommandFactory, FromArgMatches, Parser};
use futures::Future;
use imbl_value::imbl::{OrdMap, OrdSet};
use imbl_value::Value;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use yajrc::RpcError;
use crate::context::{AnyContext, IntoContext};
use crate::util::{internal_error, invalid_params, Flat};
pub mod adapters;
pub mod from_fn;
pub mod parent;
pub use adapters::*;
pub use from_fn::*;
pub use parent::*;
pub(crate) struct HandleAnyArgs<Inherited> {
pub(crate) context: AnyContext,
pub(crate) parent_method: VecDeque<&'static str>,
pub(crate) method: VecDeque<&'static str>,
pub(crate) params: Value,
pub(crate) inherited: Inherited,
}
impl<Inherited: Send + Sync> HandleAnyArgs<Inherited> {
fn downcast<Context: IntoContext, H>(
self,
) -> Result<HandlerArgsFor<Context, H>, imbl_value::Error>
where
H: HandlerTypes<InheritedParams = Inherited>,
H::Params: DeserializeOwned,
{
let Self {
context,
parent_method,
method,
params,
inherited,
} = self;
Ok(HandlerArgs {
context: Context::downcast(context).map_err(|_| imbl_value::Error {
kind: imbl_value::ErrorKind::Deserialization,
source: serde::ser::Error::custom("context does not match expected"),
})?,
parent_method,
method,
params: imbl_value::from_value(params.clone())?,
inherited_params: inherited,
raw_params: params,
})
}
}
#[async_trait::async_trait]
pub(crate) trait HandleAny: Send + Sync {
type Inherited: Send;
fn handle_sync(&self, handle_args: HandleAnyArgs<Self::Inherited>) -> Result<Value, RpcError>;
async fn handle_async(
&self,
handle_args: HandleAnyArgs<Self::Inherited>,
) -> Result<Value, RpcError>;
fn metadata(
&self,
method: VecDeque<&'static str>,
ctx_ty: TypeId,
) -> OrdMap<&'static str, Value>;
fn method_from_dots(&self, method: &str, ctx_ty: TypeId) -> Option<VecDeque<&'static str>>;
}
#[async_trait::async_trait]
impl<T: HandleAny> HandleAny for Arc<T> {
type Inherited = T::Inherited;
fn handle_sync(&self, handle_args: HandleAnyArgs<Self::Inherited>) -> Result<Value, RpcError> {
self.deref().handle_sync(handle_args)
}
async fn handle_async(
&self,
handle_args: HandleAnyArgs<Self::Inherited>,
) -> Result<Value, RpcError> {
self.deref().handle_async(handle_args).await
}
fn metadata(
&self,
method: VecDeque<&'static str>,
ctx_ty: TypeId,
) -> OrdMap<&'static str, Value> {
self.deref().metadata(method, ctx_ty)
}
fn method_from_dots(&self, method: &str, ctx_ty: TypeId) -> Option<VecDeque<&'static str>> {
self.deref().method_from_dots(method, ctx_ty)
}
}
pub(crate) trait CliBindingsAny {
type Inherited;
fn cli_command(&self, ctx_ty: TypeId) -> Command;
fn cli_parse(
&self,
matches: &ArgMatches,
ctx_ty: TypeId,
) -> Result<(VecDeque<&'static str>, Value), clap::Error>;
fn cli_display(
&self,
handle_args: HandleAnyArgs<Self::Inherited>,
result: Value,
) -> Result<(), RpcError>;
}
pub trait CliBindings: HandlerTypes {
type Context: IntoContext;
fn cli_command(&self, ctx_ty: TypeId) -> Command;
fn cli_parse(
&self,
matches: &ArgMatches,
ctx_ty: TypeId,
) -> Result<(VecDeque<&'static str>, Value), clap::Error>;
fn cli_display(
&self,
handle_args: HandlerArgsFor<Self::Context, Self>,
result: Self::Ok,
) -> Result<(), Self::Err>;
}
pub trait PrintCliResult: HandlerTypes {
type Context: IntoContext;
fn print(
&self,
handle_args: HandlerArgsFor<Self::Context, Self>,
result: Self::Ok,
) -> Result<(), Self::Err>;
}
impl<T> CliBindings for T
where
T: HandlerTypes,
T::Params: CommandFactory + FromArgMatches + Serialize,
T: PrintCliResult,
{
type Context = T::Context;
fn cli_command(&self, _: TypeId) -> clap::Command {
Self::Params::command()
}
fn cli_parse(
&self,
matches: &clap::ArgMatches,
_: TypeId,
) -> Result<(VecDeque<&'static str>, Value), clap::Error> {
Self::Params::from_arg_matches(matches).and_then(|a| {
Ok((
VecDeque::new(),
imbl_value::to_value(&a)
.map_err(|e| clap::Error::raw(clap::error::ErrorKind::ValueValidation, e))?,
))
})
}
fn cli_display(
&self,
HandlerArgs {
context,
parent_method,
method,
params,
inherited_params,
raw_params,
}: HandlerArgsFor<Self::Context, Self>,
result: Self::Ok,
) -> Result<(), Self::Err> {
self.print(
HandlerArgs {
context,
parent_method,
method,
params,
inherited_params,
raw_params,
},
result,
)
}
}
pub(crate) trait HandleAnyWithCli<Inherited>:
HandleAny<Inherited = Inherited> + CliBindingsAny<Inherited = Inherited>
{
}
impl<Inherited, T: HandleAny<Inherited = Inherited> + CliBindingsAny<Inherited = Inherited>>
HandleAnyWithCli<Inherited> for T
{
}
#[allow(private_interfaces)]
pub enum DynHandler<Inherited> {
WithoutCli(Arc<dyn HandleAny<Inherited = Inherited>>),
WithCli(Arc<dyn HandleAnyWithCli<Inherited>>),
}
impl<Inherited> Clone for DynHandler<Inherited> {
fn clone(&self) -> Self {
match self {
Self::WithCli(a) => Self::WithCli(a.clone()),
Self::WithoutCli(a) => Self::WithoutCli(a.clone()),
}
}
}
#[async_trait::async_trait]
impl<Inherited: Send> HandleAny for DynHandler<Inherited> {
type Inherited = Inherited;
fn handle_sync(&self, handle_args: HandleAnyArgs<Self::Inherited>) -> Result<Value, RpcError> {
match self {
DynHandler::WithoutCli(h) => h.handle_sync(handle_args),
DynHandler::WithCli(h) => h.handle_sync(handle_args),
}
}
async fn handle_async(
&self,
handle_args: HandleAnyArgs<Self::Inherited>,
) -> Result<Value, RpcError> {
match self {
DynHandler::WithoutCli(h) => h.handle_async(handle_args).await,
DynHandler::WithCli(h) => h.handle_async(handle_args).await,
}
}
fn metadata(
&self,
method: VecDeque<&'static str>,
ctx_ty: TypeId,
) -> OrdMap<&'static str, Value> {
match self {
DynHandler::WithoutCli(h) => h.metadata(method, ctx_ty),
DynHandler::WithCli(h) => h.metadata(method, ctx_ty),
}
}
fn method_from_dots(&self, method: &str, ctx_ty: TypeId) -> Option<VecDeque<&'static str>> {
match self {
DynHandler::WithoutCli(h) => h.method_from_dots(method, ctx_ty),
DynHandler::WithCli(h) => h.method_from_dots(method, ctx_ty),
}
}
}
#[allow(type_alias_bounds)]
pub type HandlerArgsFor<Context: IntoContext, H: HandlerTypes + ?Sized> =
HandlerArgs<Context, H::Params, H::InheritedParams>;
#[derive(Debug, Clone)]
pub struct HandlerArgs<
Context: IntoContext,
Params: Send + Sync = Empty,
InheritedParams: Send + Sync = Empty,
> {
pub context: Context,
pub parent_method: VecDeque<&'static str>,
pub method: VecDeque<&'static str>,
pub params: Params,
pub inherited_params: InheritedParams,
pub raw_params: Value,
}
pub trait HandlerTypes {
type Params: Send + Sync;
type InheritedParams: Send + Sync;
type Ok: Send + Sync;
type Err: Send + Sync;
}
pub trait Handler: HandlerTypes + Clone + Send + Sync + 'static {
type Context: IntoContext;
fn handle_sync(
&self,
handle_args: HandlerArgsFor<Self::Context, Self>,
) -> Result<Self::Ok, Self::Err> {
handle_args
.context
.runtime()
.block_on(self.handle_async(handle_args))
}
fn handle_async(
&self,
handle_args: HandlerArgsFor<Self::Context, Self>,
) -> impl Future<Output = Result<Self::Ok, Self::Err>> + Send;
fn handle_async_with_sync<'a>(
&'a self,
handle_args: HandlerArgsFor<Self::Context, Self>,
) -> impl Future<Output = Result<Self::Ok, Self::Err>> + Send + 'a {
async move { self.handle_sync(handle_args) }
}
fn handle_async_with_sync_blocking<'a>(
&'a self,
handle_args: HandlerArgsFor<Self::Context, Self>,
) -> impl Future<Output = Result<Self::Ok, Self::Err>> + Send + 'a {
async move {
let s = self.clone();
handle_args
.context
.runtime()
.spawn_blocking(move || s.handle_sync(handle_args))
.await
.unwrap()
}
}
#[allow(unused_variables)]
fn metadata(
&self,
method: VecDeque<&'static str>,
ctx_ty: TypeId,
) -> OrdMap<&'static str, Value> {
OrdMap::new()
}
fn contexts(&self) -> Option<OrdSet<TypeId>> {
Self::Context::type_ids()
}
#[allow(unused_variables)]
fn method_from_dots(&self, method: &str, ctx_ty: TypeId) -> Option<VecDeque<&'static str>> {
if method.is_empty() {
Some(VecDeque::new())
} else {
None
}
}
}
pub(crate) struct AnyHandler<H>(H);
impl<H> AnyHandler<H> {
pub(crate) fn new(handler: H) -> Self {
Self(handler)
}
}
impl<H: std::fmt::Debug> std::fmt::Debug for AnyHandler<H> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("AnyHandler").field(&self.0).finish()
}
}
#[async_trait::async_trait]
impl<H: Handler> HandleAny for AnyHandler<H>
where
H::Params: DeserializeOwned,
H::Ok: Serialize,
RpcError: From<H::Err>,
{
type Inherited = H::InheritedParams;
fn handle_sync(&self, handle_args: HandleAnyArgs<Self::Inherited>) -> Result<Value, RpcError> {
imbl_value::to_value(
&self
.0
.handle_sync(handle_args.downcast::<_, H>().map_err(invalid_params)?)?,
)
.map_err(internal_error)
}
async fn handle_async(
&self,
handle_args: HandleAnyArgs<Self::Inherited>,
) -> Result<Value, RpcError> {
imbl_value::to_value(
&self
.0
.handle_async(handle_args.downcast::<_, H>().map_err(invalid_params)?)
.await?,
)
.map_err(internal_error)
}
fn metadata(
&self,
method: VecDeque<&'static str>,
ctx_ty: TypeId,
) -> OrdMap<&'static str, Value> {
self.0.metadata(method, ctx_ty)
}
fn method_from_dots(&self, method: &str, ctx_ty: TypeId) -> Option<VecDeque<&'static str>> {
self.0.method_from_dots(method, ctx_ty)
}
}
impl<H: CliBindings> CliBindingsAny for AnyHandler<H>
where
H: CliBindings,
H::Params: DeserializeOwned,
H::Ok: Serialize + DeserializeOwned,
RpcError: From<H::Err>,
{
type Inherited = H::InheritedParams;
fn cli_command(&self, ctx_ty: TypeId) -> Command {
self.0.cli_command(ctx_ty)
}
fn cli_parse(
&self,
matches: &ArgMatches,
ctx_ty: TypeId,
) -> Result<(VecDeque<&'static str>, Value), clap::Error> {
self.0.cli_parse(matches, ctx_ty)
}
fn cli_display(
&self,
handle_args: HandleAnyArgs<Self::Inherited>,
result: Value,
) -> Result<(), RpcError> {
self.0
.cli_display(
handle_args.downcast::<_, H>().map_err(invalid_params)?,
imbl_value::from_value(result).map_err(internal_error)?,
)
.map_err(RpcError::from)
}
}
#[derive(Debug, Clone, Copy, Deserialize, Serialize, Parser)]
pub struct Empty {}
pub(crate) trait OrEmpty<T> {
fn from_t(t: T) -> Self;
}
impl<T> OrEmpty<T> for T {
fn from_t(t: T) -> Self {
t
}
}
impl<A, B> OrEmpty<Flat<A, B>> for Empty {
fn from_t(t: Flat<A, B>) -> Self {
Empty {}
}
}
#[derive(Debug, Clone, Copy, Deserialize, Serialize, Parser)]
pub enum Never {}

372
src/handler/parent.rs Normal file
View File

@@ -0,0 +1,372 @@
use std::any::TypeId;
use std::collections::VecDeque;
use std::sync::Arc;
use clap::{ArgMatches, Command, CommandFactory, FromArgMatches};
use imbl_value::imbl::{OrdMap, OrdSet};
use imbl_value::Value;
use serde::de::DeserializeOwned;
use serde::Serialize;
use yajrc::RpcError;
use crate::util::{combine, Flat, PhantomData};
use crate::{
AnyContext, AnyHandler, CliBindings, DynHandler, Empty, HandleAny, HandleAnyArgs, Handler,
HandlerArgs, HandlerArgsFor, HandlerExt, HandlerTypes, IntoContext, OrEmpty,
};
pub trait IntoHandlers<Inherited>: HandlerTypes {
fn into_handlers(self) -> impl IntoIterator<Item = (Option<TypeId>, DynHandler<Inherited>)>;
}
impl<H, A, B> IntoHandlers<Flat<A, B>> for H
where
H: Handler + CliBindings,
H::Params: DeserializeOwned,
H::InheritedParams: OrEmpty<Flat<A, B>>,
H::Ok: Serialize + DeserializeOwned,
RpcError: From<H::Err>,
A: Send + Sync + 'static,
B: Send + Sync + 'static,
{
fn into_handlers(self) -> impl IntoIterator<Item = (Option<TypeId>, DynHandler<Flat<A, B>>)> {
iter_from_ctx_and_handler(
intersect_type_ids(self.contexts(), <Self as CliBindings>::Context::type_ids()),
DynHandler::WithCli(Arc::new(AnyHandler::new(
self.with_inherited(|a, b| OrEmpty::from_t(Flat(a, b))),
))),
)
}
}
pub(crate) fn iter_from_ctx_and_handler<Inherited>(
ctx: Option<OrdSet<TypeId>>,
handler: DynHandler<Inherited>,
) -> impl IntoIterator<Item = (Option<TypeId>, DynHandler<Inherited>)> {
if let Some(ctx) = ctx {
itertools::Either::Left(ctx.into_iter().map(Some))
} else {
itertools::Either::Right(std::iter::once(None))
}
.map(move |ctx| (ctx, handler.clone()))
}
pub(crate) fn intersect_type_ids(
a: Option<OrdSet<TypeId>>,
b: Option<OrdSet<TypeId>>,
) -> Option<OrdSet<TypeId>> {
match (a, b) {
(None, None) => None,
(Some(a), None) => Some(a),
(None, Some(b)) => Some(b),
(Some(a), Some(b)) => Some(a.intersection(b)),
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct Name(pub(crate) Option<&'static str>);
impl<'a> std::borrow::Borrow<Option<&'a str>> for Name {
fn borrow(&self) -> &Option<&'a str> {
&self.0
}
}
pub(crate) struct SubcommandMap<Inherited>(
pub(crate) OrdMap<Name, OrdMap<Option<TypeId>, DynHandler<Inherited>>>,
);
impl<Inherited> Clone for SubcommandMap<Inherited> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<Inherited> SubcommandMap<Inherited> {
fn insert(
&mut self,
name: Option<&'static str>,
handlers: impl IntoIterator<Item = (Option<TypeId>, DynHandler<Inherited>)>,
) {
let mut for_name = self.0.remove(&name).unwrap_or_default();
for (ctx_ty, handler) in handlers {
for_name.insert(ctx_ty, handler);
}
self.0.insert(Name(name), for_name);
}
fn get<'a>(
&'a self,
ctx_ty: TypeId,
name: Option<&str>,
) -> Option<(Name, &'a DynHandler<Inherited>)> {
if let Some((name, for_name)) = self.0.get_key_value(&name) {
if let Some(for_ctx) = for_name.get(&Some(ctx_ty)) {
Some((*name, for_ctx))
} else {
for_name.get(&None).map(|h| (*name, h))
}
} else {
None
}
}
}
pub struct ParentHandler<Params = Empty, InheritedParams = Empty> {
_phantom: PhantomData<Params>,
pub(crate) subcommands: SubcommandMap<Flat<Params, InheritedParams>>,
metadata: OrdMap<&'static str, Value>,
}
impl<Params, InheritedParams> ParentHandler<Params, InheritedParams> {
pub fn new() -> Self {
Self {
_phantom: PhantomData::new(),
subcommands: SubcommandMap(OrdMap::new()),
metadata: OrdMap::new(),
}
}
pub fn with_metadata(mut self, key: &'static str, value: Value) -> Self {
self.metadata.insert(key, value);
self
}
}
impl<Params, InheritedParams> Clone for ParentHandler<Params, InheritedParams> {
fn clone(&self) -> Self {
Self {
_phantom: PhantomData::new(),
subcommands: self.subcommands.clone(),
metadata: self.metadata.clone(),
}
}
}
impl<Params, InheritedParams> std::fmt::Debug for ParentHandler<Params, InheritedParams> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("ParentHandler")
// .field(&self.subcommands)
.finish()
}
}
impl<Params, InheritedParams> ParentHandler<Params, InheritedParams> {
fn get_contexts(&self) -> Option<OrdSet<TypeId>> {
let mut set = OrdSet::new();
for ctx_ty in self.subcommands.0.values().flat_map(|c| c.keys()) {
set.insert((*ctx_ty)?);
}
Some(set)
}
#[allow(private_bounds)]
pub fn subcommand<H>(mut self, name: &'static str, handler: H) -> Self
where
H: IntoHandlers<Flat<Params, InheritedParams>>,
{
self.subcommands
.insert(name.into(), handler.into_handlers());
self
}
#[allow(private_bounds)]
pub fn root_handler<H>(mut self, handler: H) -> Self
where
H: IntoHandlers<Flat<Params, InheritedParams>> + HandlerTypes<Params = Empty>,
{
self.subcommands.insert(None, handler.into_handlers());
self
}
}
impl<Params, InheritedParams> HandlerTypes for ParentHandler<Params, InheritedParams>
where
Params: Send + Sync,
InheritedParams: Send + Sync,
{
type Params = Params;
type InheritedParams = InheritedParams;
type Ok = Value;
type Err = RpcError;
}
impl<Params, InheritedParams> Handler for ParentHandler<Params, InheritedParams>
where
Params: Send + Sync + 'static,
InheritedParams: Serialize + Send + Sync + 'static,
{
type Context = AnyContext;
fn handle_sync(
&self,
HandlerArgs {
context,
mut parent_method,
mut method,
params,
inherited_params,
raw_params,
}: HandlerArgsFor<AnyContext, Self>,
) -> Result<Self::Ok, Self::Err> {
let cmd = method.pop_front();
if let Some(cmd) = cmd {
parent_method.push_back(cmd);
}
if let Some((_, sub_handler)) = &self.subcommands.get(context.inner_type_id(), cmd) {
sub_handler.handle_sync(HandleAnyArgs {
context,
parent_method,
method,
params: raw_params,
inherited: Flat(params, inherited_params),
})
} else {
Err(yajrc::METHOD_NOT_FOUND_ERROR)
}
}
async fn handle_async(
&self,
HandlerArgs {
context,
mut parent_method,
mut method,
params,
inherited_params,
raw_params,
}: HandlerArgsFor<AnyContext, Self>,
) -> Result<Self::Ok, Self::Err> {
let cmd = method.pop_front();
if let Some(cmd) = cmd {
parent_method.push_back(cmd);
}
if let Some((_, sub_handler)) = self.subcommands.get(context.inner_type_id(), cmd) {
sub_handler
.handle_async(HandleAnyArgs {
context,
parent_method,
method,
params: raw_params,
inherited: Flat(params, inherited_params),
})
.await
} else {
Err(yajrc::METHOD_NOT_FOUND_ERROR)
}
}
fn metadata(
&self,
mut method: VecDeque<&'static str>,
ctx_ty: TypeId,
) -> OrdMap<&'static str, Value> {
let metadata = self.metadata.clone();
if let Some((_, handler)) = self.subcommands.get(ctx_ty, method.pop_front()) {
handler.metadata(method, ctx_ty).union(metadata)
} else {
metadata
}
}
fn contexts(&self) -> Option<OrdSet<TypeId>> {
self.get_contexts()
}
fn method_from_dots(&self, method: &str, ctx_ty: TypeId) -> Option<VecDeque<&'static str>> {
let (head, tail) = if method.is_empty() {
(None, None)
} else {
method
.split_once(".")
.map(|(head, tail)| (Some(head), Some(tail)))
.unwrap_or((Some(method), None))
};
let (Name(name), h) = self.subcommands.get(ctx_ty, head)?;
let mut res = VecDeque::new();
if let Some(name) = name {
res.push_back(name);
}
if let Some(tail) = tail {
res.append(&mut h.method_from_dots(tail, ctx_ty)?);
}
Some(res)
}
}
impl<Params, InheritedParams> CliBindings for ParentHandler<Params, InheritedParams>
where
Params: FromArgMatches + CommandFactory + Serialize + Send + Sync + 'static,
InheritedParams: Serialize + Send + Sync + 'static,
{
type Context = AnyContext;
fn cli_command(&self, ctx_ty: TypeId) -> Command {
let mut base = Params::command().subcommand_required(true);
for (name, handlers) in &self.subcommands.0 {
match (
name,
if let Some(handler) = handlers.get(&Some(ctx_ty)) {
Some(handler)
} else if let Some(handler) = handlers.get(&None) {
Some(handler)
} else {
None
},
) {
(Name(Some(name)), Some(DynHandler::WithCli(handler))) => {
base = base.subcommand(handler.cli_command(ctx_ty).name(name));
}
(Name(None), Some(DynHandler::WithCli(_))) => {
base = base.subcommand_required(false);
}
_ => (),
}
}
base
}
fn cli_parse(
&self,
matches: &ArgMatches,
ctx_ty: TypeId,
) -> Result<(VecDeque<&'static str>, Value), clap::Error> {
let root_params = imbl_value::to_value(&Params::from_arg_matches(matches)?)
.map_err(|e| clap::Error::raw(clap::error::ErrorKind::ValueValidation, e))?;
let (name, matches) = match matches.subcommand() {
Some((name, matches)) => (Some(name), matches),
None => (None, matches),
};
if let Some((Name(Some(name)), DynHandler::WithCli(handler))) =
self.subcommands.get(ctx_ty, name)
{
let (mut method, params) = handler.cli_parse(matches, ctx_ty)?;
method.push_front(name);
Ok((
method,
combine(root_params, params)
.map_err(|e| clap::Error::raw(clap::error::ErrorKind::ArgumentConflict, e))?,
))
} else {
Ok((VecDeque::new(), root_params))
}
}
fn cli_display(
&self,
HandlerArgs {
context,
mut parent_method,
mut method,
params,
inherited_params,
raw_params,
}: HandlerArgsFor<AnyContext, Self>,
result: Self::Ok,
) -> Result<(), Self::Err> {
let cmd = method.pop_front();
if let Some(cmd) = cmd {
parent_method.push_back(cmd);
}
if let Some((_, DynHandler::WithCli(sub_handler))) =
self.subcommands.get(context.inner_type_id(), cmd)
{
sub_handler.cli_display(
HandleAnyArgs {
context,
parent_method,
method,
params: raw_params,
inherited: Flat(params, inherited_params),
},
result,
)
} else {
Err(yajrc::METHOD_NOT_FOUND_ERROR)
}
}
}

35
src/lib.rs Normal file
View File

@@ -0,0 +1,35 @@
pub use cli::*;
// pub use command::*;
pub use context::*;
pub use handler::*;
/// `#[command(...)]`
/// - `#[command(cli_only)]` -> executed by CLI instead of RPC server (leaf commands only)
/// - `#[command(rpc_only)]` -> no CLI bindings (leaf commands only)
/// - `#[command(local)]` -> executed wherever it was invoked. (By RPC when hit from RPC server, by cli when invoked from CLI)
/// - `#[command(blocking)]` -> run with [spawn_blocking](tokio::task::spawn_blocking) if in an async context
/// - `#[command(about = "About text")]` -> Set about text for the command
/// - `#[command(rename = "new_name")]` -> Set the name of the command to `new_name` in the RPC and CLI
/// - `#[command(subcommands(...))]` -> Set this as the parent command for the listed subcommands
/// - note: the return type of the decorated function must be the [Context] required by its subcommands, and all args must implement [Clone](std::clone::Clone).
/// - `#[command(subcommands(self(self_command_impl)))]` -> If no subcommand is provided, call this function
/// - `self_command_impl :: Context ctx, Display res, Into<RpcError> err => ctx -> Result<res, err>`
/// - note: [Display](std::fmt::Display) is not required for `res` if it has a custom display function that will take it as input
/// - if `self_command_impl` is async, write `self(self_command_impl(async))`
/// - if `self_command_impl` is blocking, write `self(self_command_impl(blocking))`
/// - default: require a subcommand if subcommands are specified
/// - `#[command(display(custom_display_fn))]` -> Use the function `custom_display_fn` to display the command's result (leaf commands only)
/// - `custom_display_fn :: res -> ()`
/// - note: `res` is the type of the decorated command's output
/// - default: `default_display`
///
/// See also: [arg](rpc_toolkit_macro::arg), [context](rpc_toolkit_macro::context)
pub use rpc_toolkit_macro::command;
pub use server::*;
pub use {clap, futures, reqwest, serde, serde_json, tokio, url, yajrc};
mod cli;
pub mod command_helpers;
mod context;
mod handler;
mod server;
mod util;

68
src/metadata.rs Normal file
View File

@@ -0,0 +1,68 @@
macro_rules! getter_for {
($($name:ident => $t:ty,)*) => {
$(
#[allow(unused_variables)]
fn $name(&self, command: &str, key: &str) -> Option<$t> {
None
}
)*
};
}
pub trait Metadata: Copy + Default + Send + Sync + 'static {
fn get<Ty: Primitive>(&self, command: &str, key: &str) -> Option<Ty> {
Ty::from_metadata(self, command, key)
}
getter_for!(
get_bool => bool,
get_u8 => u8,
get_u16 => u16,
get_u32 => u32,
get_u64 => u64,
get_usize => usize,
get_i8 => i8,
get_i16 => i16,
get_i32 => i32,
get_i64 => i64,
get_isize => isize,
get_f32 => f32,
get_f64 => f64,
get_char => char,
get_str => &'static str,
get_bstr => &'static [u8],
);
}
macro_rules! impl_primitive_for {
($($name:ident => $t:ty,)*) => {
$(
impl Primitive for $t {
fn from_metadata<M: Metadata + ?Sized>(m: &M, command: &str, key: &str) -> Option<Self> {
m.$name(command, key)
}
}
)*
};
}
pub trait Primitive: Copy {
fn from_metadata<M: Metadata + ?Sized>(m: &M, command: &str, key: &str) -> Option<Self>;
}
impl_primitive_for!(
get_bool => bool,
get_u8 => u8,
get_u16 => u16,
get_u32 => u32,
get_u64 => u64,
get_usize => usize,
get_i8 => i8,
get_i16 => i16,
get_i32 => i32,
get_i64 => i64,
get_isize => isize,
get_f32 => f32,
get_f64 => f64,
get_char => char,
get_str => &'static str,
get_bstr => &'static [u8],
);

305
src/server/http.rs Normal file
View File

@@ -0,0 +1,305 @@
use std::any::TypeId;
use axum::body::Body;
use axum::extract::Request;
use axum::handler::Handler;
use axum::response::Response;
use futures::future::{join_all, BoxFuture};
use futures::{Future, FutureExt};
use http::header::{CONTENT_LENGTH, CONTENT_TYPE};
use http_body_util::BodyExt;
use imbl_value::imbl::Vector;
use imbl_value::Value;
use serde::de::DeserializeOwned;
use serde::Serialize;
use yajrc::{RpcError, RpcMethod};
use crate::server::{RpcRequest, RpcResponse, SingleOrBatchRpcRequest};
use crate::util::{internal_error, parse_error};
use crate::{HandleAny, Server};
const FALLBACK_ERROR: &str = "{\"error\":{\"code\":-32603,\"message\":\"Internal error\",\"data\":\"Failed to serialize rpc response\"}}";
pub fn fallback_rpc_error_response() -> Response {
Response::builder()
.header(CONTENT_TYPE, "application/json")
.header(CONTENT_LENGTH, FALLBACK_ERROR.len())
.body(Body::from(FALLBACK_ERROR.as_bytes()))
.unwrap()
}
pub fn json_http_response<T: Serialize>(t: &T) -> Response {
let body = match serde_json::to_vec(t) {
Ok(a) => a,
Err(_) => return fallback_rpc_error_response(),
};
Response::builder()
.header(CONTENT_TYPE, "application/json")
.header(CONTENT_LENGTH, body.len())
.body(Body::from(body))
.unwrap_or_else(|_| fallback_rpc_error_response())
}
pub trait Middleware<Context: Send + 'static>: Clone + Send + Sync + 'static {
type Metadata: DeserializeOwned + Send + 'static;
#[allow(unused_variables)]
fn process_http_request(
&mut self,
context: &Context,
request: &mut Request,
) -> impl Future<Output = Result<(), Response>> + Send {
async { Ok(()) }
}
#[allow(unused_variables)]
fn process_rpc_request(
&mut self,
context: &Context,
metadata: Self::Metadata,
request: &mut RpcRequest,
) -> impl Future<Output = Result<(), RpcResponse>> + Send {
async { Ok(()) }
}
#[allow(unused_variables)]
fn process_rpc_response(
&mut self,
context: &Context,
response: &mut RpcResponse,
) -> impl Future<Output = ()> + Send {
async { () }
}
#[allow(unused_variables)]
fn process_http_response(
&mut self,
context: &Context,
response: &mut Response,
) -> impl Future<Output = ()> + Send {
async { () }
}
}
#[allow(private_bounds)]
trait _Middleware<Context>: Send + Sync {
fn dyn_clone(&self) -> DynMiddleware<Context>;
fn process_http_request<'a>(
&'a mut self,
context: &'a Context,
request: &'a mut Request,
) -> BoxFuture<'a, Result<(), Response>>;
fn process_rpc_request<'a>(
&'a mut self,
context: &'a Context,
metadata: Value,
request: &'a mut RpcRequest,
) -> BoxFuture<'a, Result<(), RpcResponse>>;
fn process_rpc_response<'a>(
&'a mut self,
context: &'a Context,
response: &'a mut RpcResponse,
) -> BoxFuture<'a, ()>;
fn process_http_response<'a>(
&'a mut self,
context: &'a Context,
response: &'a mut Response,
) -> BoxFuture<'a, ()>;
}
impl<Context: Send + 'static, T: Middleware<Context> + Send + Sync> _Middleware<Context> for T {
fn dyn_clone(&self) -> DynMiddleware<Context> {
DynMiddleware(Box::new(<Self as Clone>::clone(&self)))
}
fn process_http_request<'a>(
&'a mut self,
context: &'a Context,
request: &'a mut Request,
) -> BoxFuture<'a, Result<(), Response>> {
<Self as Middleware<Context>>::process_http_request(self, context, request).boxed()
}
fn process_rpc_request<'a>(
&'a mut self,
context: &'a Context,
metadata: Value,
request: &'a mut RpcRequest,
) -> BoxFuture<'a, Result<(), RpcResponse>> {
<Self as Middleware<Context>>::process_rpc_request(
self,
context,
match imbl_value::from_value(metadata) {
Ok(a) => a,
Err(e) => return async { Err(internal_error(e).into()) }.boxed(),
},
request,
)
.boxed()
}
fn process_rpc_response<'a>(
&'a mut self,
context: &'a Context,
response: &'a mut RpcResponse,
) -> BoxFuture<'a, ()> {
<Self as Middleware<Context>>::process_rpc_response(self, context, response).boxed()
}
fn process_http_response<'a>(
&'a mut self,
context: &'a Context,
response: &'a mut Response,
) -> BoxFuture<'a, ()> {
<Self as Middleware<Context>>::process_http_response(self, context, response).boxed()
}
}
struct DynMiddleware<Context>(Box<dyn _Middleware<Context>>);
impl<Context> Clone for DynMiddleware<Context> {
fn clone(&self) -> Self {
self.0.dyn_clone()
}
}
pub struct HttpServer<Context: crate::Context> {
inner: Server<Context>,
middleware: Vector<DynMiddleware<Context>>,
}
impl<Context: crate::Context> Clone for HttpServer<Context> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
middleware: self.middleware.clone(),
}
}
}
impl<Context: crate::Context> Server<Context> {
pub fn for_http(self) -> HttpServer<Context> {
HttpServer {
inner: self,
middleware: Vector::new(),
}
}
pub fn middleware<T: Middleware<Context>>(self, middleware: T) -> HttpServer<Context> {
self.for_http().middleware(middleware)
}
}
impl<Context: crate::Context> HttpServer<Context> {
pub fn middleware<T: Middleware<Context>>(mut self, middleware: T) -> Self {
self.middleware
.push_back(DynMiddleware(Box::new(middleware)));
self
}
async fn process_http_request(&self, mut req: Request) -> Response {
let mut mid = self.middleware.clone();
match async {
let ctx = (self.inner.make_ctx)().await?;
for middleware in mid.iter_mut().rev() {
if let Err(e) = middleware.0.process_http_request(&ctx, &mut req).await {
return Ok::<_, RpcError>(e);
}
}
let (_, body) = req.into_parts();
match serde_json::from_slice::<SingleOrBatchRpcRequest>(
&*body.collect().await.map_err(internal_error)?.to_bytes(),
)
.map_err(parse_error)?
{
SingleOrBatchRpcRequest::Single(rpc_req) => {
let mut res = json_http_response(
&self.process_rpc_request(&ctx, &mut mid, rpc_req).await,
);
for middleware in mid.iter_mut() {
middleware.0.process_http_response(&ctx, &mut res).await;
}
Ok(res)
}
SingleOrBatchRpcRequest::Batch(rpc_reqs) => {
let (mids, rpc_res): (Vec<_>, Vec<_>) =
join_all(rpc_reqs.into_iter().map(|rpc_req| async {
let mut mid = mid.clone();
let res = self.process_rpc_request(&ctx, &mut mid, rpc_req).await;
(mid, res)
}))
.await
.into_iter()
.unzip();
let mut res = json_http_response(&rpc_res);
for mut mid in mids.into_iter().fold(
vec![Vec::with_capacity(rpc_res.len()); mid.len()],
|mut acc, x| {
for (idx, middleware) in x.into_iter().enumerate() {
acc[idx].push(middleware);
}
acc
},
) {
for middleware in mid.iter_mut() {
middleware.0.process_http_response(&ctx, &mut res).await;
}
}
Ok(res)
}
}
}
.await
{
Ok(a) => a,
Err(e) => json_http_response(&RpcResponse {
id: None,
result: Err(e),
}),
}
}
async fn process_rpc_request(
&self,
ctx: &Context,
mid: &mut Vector<DynMiddleware<Context>>,
mut req: RpcRequest,
) -> RpcResponse {
let metadata = Value::Object(
self.inner
.root_handler
.metadata(
match self
.inner
.root_handler
.method_from_dots(req.method.as_str(), TypeId::of::<Context>())
{
Some(a) => a,
None => {
return RpcResponse {
id: req.id,
result: Err(yajrc::METHOD_NOT_FOUND_ERROR),
}
}
},
TypeId::of::<Context>(),
)
.into_iter()
.map(|(key, value)| (key.into(), value))
.collect(),
);
let mut res = async {
for middleware in mid.iter_mut().rev() {
if let Err(res) = middleware
.0
.process_rpc_request(ctx, metadata.clone(), &mut req)
.await
{
return res;
}
}
self.inner.handle_single_request(req).await
}
.await;
for middleware in mid.iter_mut() {
middleware.0.process_rpc_response(ctx, &mut res).await;
}
res
}
pub fn handle(&self, req: Request) -> BoxFuture<'static, Response> {
let server = self.clone();
async move { server.process_http_request(req).await }.boxed()
}
}
impl<Context: crate::Context> Handler<(), ()> for HttpServer<Context> {
type Future = BoxFuture<'static, Response>;
fn call(self, req: Request, _: ()) -> Self::Future {
self.handle(req)
}
}

133
src/server/mod.rs Normal file
View File

@@ -0,0 +1,133 @@
use std::any::TypeId;
use std::collections::VecDeque;
use std::sync::Arc;
use futures::future::{join_all, BoxFuture};
use futures::{Future, FutureExt, Stream, StreamExt};
use imbl_value::{InOMap, Value};
use yajrc::{RpcError, RpcMethod};
use crate::util::{invalid_request, JobRunner};
use crate::{AnyHandler, Empty, HandleAny, HandleAnyArgs, IntoContext, ParentHandler};
pub type GenericRpcMethod = yajrc::GenericRpcMethod<String, Value, Value>;
pub type RpcRequest = yajrc::RpcRequest<GenericRpcMethod>;
pub type RpcResponse = yajrc::RpcResponse<GenericRpcMethod>;
pub type SingleOrBatchRpcRequest = yajrc::SingleOrBatchRpcRequest<GenericRpcMethod>;
pub mod http;
pub mod socket;
pub use http::*;
pub use socket::*;
pub struct Server<Context: crate::Context> {
make_ctx: Arc<dyn Fn() -> BoxFuture<'static, Result<Context, RpcError>> + Send + Sync>,
root_handler: Arc<AnyHandler<ParentHandler>>,
}
impl<Context: crate::Context> Clone for Server<Context> {
fn clone(&self) -> Self {
Self {
make_ctx: self.make_ctx.clone(),
root_handler: self.root_handler.clone(),
}
}
}
impl<Context: crate::Context> Server<Context> {
pub fn new<
MakeCtx: Fn() -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<Context, RpcError>> + Send + 'static,
>(
make_ctx: MakeCtx,
root_handler: ParentHandler,
) -> Self {
Server {
make_ctx: Arc::new(move || make_ctx().boxed()),
root_handler: Arc::new(AnyHandler::new(root_handler)),
}
}
pub fn handle_command(
&self,
method: &str,
params: Value,
) -> impl Future<Output = Result<Value, RpcError>> + Send + 'static {
let (make_ctx, root_handler, method) = (
self.make_ctx.clone(),
self.root_handler.clone(),
self.root_handler
.method_from_dots(method, TypeId::of::<Context>()),
);
async move {
root_handler
.handle_async(HandleAnyArgs {
context: make_ctx().await?.upcast(),
parent_method: VecDeque::new(),
method: method.ok_or_else(|| yajrc::METHOD_NOT_FOUND_ERROR)?,
params,
inherited: crate::Empty {},
})
.await
}
}
fn handle_single_request(
&self,
RpcRequest { id, method, params }: RpcRequest,
) -> impl Future<Output = RpcResponse> + Send + 'static {
let handle = (|| Ok::<_, RpcError>(self.handle_command(method.as_str(), params)))();
async move {
RpcResponse {
id,
result: match handle {
Ok(handle) => handle.await,
Err(e) => Err(e),
},
}
}
}
pub fn handle(
&self,
request: Result<Value, RpcError>,
) -> BoxFuture<'static, Result<Value, imbl_value::Error>> {
match request.and_then(|request| {
imbl_value::from_value::<SingleOrBatchRpcRequest>(request).map_err(invalid_request)
}) {
Ok(SingleOrBatchRpcRequest::Single(req)) => {
let fut = self.handle_single_request(req);
async { imbl_value::to_value(&fut.await) }.boxed()
}
Ok(SingleOrBatchRpcRequest::Batch(reqs)) => {
let futs: Vec<_> = reqs
.into_iter()
.map(|req| self.handle_single_request(req))
.collect();
async { imbl_value::to_value(&join_all(futs).await) }.boxed()
}
Err(e) => async {
imbl_value::to_value(&RpcResponse {
id: None,
result: Err(e),
})
}
.boxed(),
}
}
pub fn stream<'a>(
&'a self,
requests: impl Stream<Item = Result<Value, RpcError>> + Send + 'a,
) -> impl Stream<Item = Result<Value, imbl_value::Error>> + 'a {
async_stream::try_stream! {
let mut runner = JobRunner::new();
let requests = requests.fuse().map(|req| self.handle(req));
tokio::pin!(requests);
while let Some(res) = runner.next_result(&mut requests).await.transpose()? {
yield res;
}
}
}
}

95
src/server/socket.rs Normal file
View File

@@ -0,0 +1,95 @@
use std::path::Path;
use std::sync::Arc;
use futures::{Future, Stream, StreamExt, TryStreamExt};
use imbl_value::Value;
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
use tokio::net::{TcpListener, ToSocketAddrs, UnixListener};
use tokio::sync::Notify;
use yajrc::RpcError;
use crate::util::{parse_error, JobRunner, StreamUntil};
use crate::Server;
#[derive(Clone)]
pub struct ShutdownHandle(Arc<Notify>);
impl ShutdownHandle {
pub fn shutdown(self) {
self.0.notify_one();
}
}
impl<Context: crate::Context> Server<Context> {
pub fn run_socket<'a, T: AsyncRead + AsyncWrite + Send>(
&'a self,
listener: impl Stream<Item = std::io::Result<T>> + 'a,
error_handler: impl Fn(std::io::Error) + Sync + 'a,
) -> (ShutdownHandle, impl Future<Output = ()> + 'a) {
let shutdown = Arc::new(Notify::new());
(ShutdownHandle(shutdown.clone()), async move {
let mut runner = JobRunner::<std::io::Result<()>>::new();
let jobs = StreamUntil::new(listener, shutdown.notified()).map(|pipe| async {
let pipe = pipe?;
let (r, mut w) = tokio::io::split(pipe);
let stream = self.stream(
tokio_stream::wrappers::LinesStream::new(BufReader::new(r).lines())
.map_err(|e| RpcError {
data: Some(e.to_string().into()),
..yajrc::INTERNAL_ERROR
})
.try_filter_map(|a| async move {
Ok(if a.is_empty() {
None
} else {
Some(serde_json::from_str::<Value>(&a).map_err(parse_error)?)
})
}),
);
tokio::pin!(stream);
while let Some(res) = stream.next().await {
if let Err(e) = async {
let mut buf = serde_json::to_vec(
&res.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?,
)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
buf.push(b'\n');
w.write_all(&buf).await
}
.await
{
error_handler(e)
}
}
Ok(())
});
tokio::pin!(jobs);
while let Some(res) = runner.next_result(&mut jobs).await {
if let Err(e) = res {
error_handler(e)
}
}
})
}
pub fn run_unix<'a>(
&'a self,
path: impl AsRef<Path> + 'a,
error_handler: impl Fn(std::io::Error) + Sync + 'a,
) -> std::io::Result<(ShutdownHandle, impl Future<Output = ()> + 'a)> {
let listener = UnixListener::bind(path)?;
Ok(self.run_socket(
tokio_stream::wrappers::UnixListenerStream::new(listener),
error_handler,
))
}
pub async fn run_tcp<'a>(
&'a self,
addr: impl ToSocketAddrs + 'a,
error_handler: impl Fn(std::io::Error) + Sync + 'a,
) -> std::io::Result<(ShutdownHandle, impl Future<Output = ()> + 'a)> {
let listener = TcpListener::bind(addr).await?;
Ok(self.run_socket(
tokio_stream::wrappers::TcpListenerStream::new(listener),
error_handler,
))
}
}

296
src/util.rs Normal file
View File

@@ -0,0 +1,296 @@
use std::fmt::{Debug, Display};
use futures::future::{BoxFuture, FusedFuture};
use futures::stream::FusedStream;
use futures::{Future, FutureExt, Stream, StreamExt};
use imbl_value::Value;
use serde::de::DeserializeOwned;
use serde::ser::Error;
use serde::{Deserialize, Serialize};
use yajrc::RpcError;
pub fn extract<T: DeserializeOwned>(value: &Value) -> Result<T, RpcError> {
imbl_value::from_value(value.clone()).map_err(invalid_params)
}
pub fn without<T: Serialize>(value: Value, remove: &T) -> Result<Value, imbl_value::Error> {
let to_remove = imbl_value::to_value(remove)?;
let (Value::Object(mut value), Value::Object(to_remove)) = (value, to_remove) else {
return Err(imbl_value::Error {
kind: imbl_value::ErrorKind::Serialization,
source: serde_json::Error::custom("params must be object"),
});
};
for k in to_remove.keys() {
value.remove(k);
}
Ok(Value::Object(value))
}
pub fn combine(v1: Value, v2: Value) -> Result<Value, imbl_value::Error> {
let (Value::Object(mut v1), Value::Object(v2)) = (v1, v2) else {
return Err(imbl_value::Error {
kind: imbl_value::ErrorKind::Serialization,
source: serde_json::Error::custom("params must be object"),
});
};
for (key, value) in v2 {
if v1.insert(key.clone(), value).is_some() {
return Err(imbl_value::Error {
kind: imbl_value::ErrorKind::Serialization,
source: serde_json::Error::custom(lazy_format::lazy_format!(
"duplicate key: {key}"
)),
});
}
}
Ok(Value::Object(v1))
}
pub fn invalid_params(e: imbl_value::Error) -> RpcError {
RpcError {
data: Some(e.to_string().into()),
..yajrc::INVALID_PARAMS_ERROR
}
}
pub fn invalid_request(e: imbl_value::Error) -> RpcError {
RpcError {
data: Some(e.to_string().into()),
..yajrc::INVALID_REQUEST_ERROR
}
}
pub fn parse_error(e: impl Display) -> RpcError {
RpcError {
data: Some(e.to_string().into()),
..yajrc::PARSE_ERROR
}
}
pub fn internal_error(e: impl Display) -> RpcError {
RpcError {
data: Some(e.to_string().into()),
..yajrc::INTERNAL_ERROR
}
}
pub struct Flat<A, B>(pub A, pub B);
impl<'de, A, B> Deserialize<'de> for Flat<A, B>
where
A: DeserializeOwned,
B: DeserializeOwned,
{
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let v = Value::deserialize(deserializer)?;
let a = imbl_value::from_value(v.clone()).map_err(serde::de::Error::custom)?;
let b = imbl_value::from_value(v).map_err(serde::de::Error::custom)?;
Ok(Flat(a, b))
}
}
impl<A, B> Serialize for Flat<A, B>
where
A: Serialize,
B: Serialize,
{
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
#[derive(serde::Serialize)]
struct FlatStruct<'a, A, B> {
#[serde(flatten)]
a: &'a A,
#[serde(flatten)]
b: &'a B,
}
FlatStruct {
a: &self.0,
b: &self.1,
}
.serialize(serializer)
}
}
impl<A, B> clap::CommandFactory for Flat<A, B>
where
A: clap::CommandFactory,
B: clap::Args,
{
fn command() -> clap::Command {
B::augment_args(A::command())
}
fn command_for_update() -> clap::Command {
B::augment_args_for_update(A::command_for_update())
}
}
impl<A, B> clap::FromArgMatches for Flat<A, B>
where
A: clap::FromArgMatches,
B: clap::FromArgMatches,
{
fn from_arg_matches(matches: &clap::ArgMatches) -> Result<Self, clap::Error> {
Ok(Self(
A::from_arg_matches(matches)?,
B::from_arg_matches(matches)?,
))
}
fn from_arg_matches_mut(matches: &mut clap::ArgMatches) -> Result<Self, clap::Error> {
Ok(Self(
A::from_arg_matches_mut(matches)?,
B::from_arg_matches_mut(matches)?,
))
}
fn update_from_arg_matches(&mut self, matches: &clap::ArgMatches) -> Result<(), clap::Error> {
self.0.update_from_arg_matches(matches)?;
self.1.update_from_arg_matches(matches)?;
Ok(())
}
fn update_from_arg_matches_mut(
&mut self,
matches: &mut clap::ArgMatches,
) -> Result<(), clap::Error> {
self.0.update_from_arg_matches_mut(matches)?;
self.1.update_from_arg_matches_mut(matches)?;
Ok(())
}
}
pub fn poll_select_all<'a, T>(
futs: &mut Vec<BoxFuture<'a, T>>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<T> {
let item = futs
.iter_mut()
.enumerate()
.find_map(|(i, f)| match f.poll_unpin(cx) {
std::task::Poll::Pending => None,
std::task::Poll::Ready(e) => Some((i, e)),
});
match item {
Some((idx, res)) => {
drop(futs.swap_remove(idx));
std::task::Poll::Ready(res)
}
None => std::task::Poll::Pending,
}
}
pub struct JobRunner<'a, T> {
closed: bool,
running: Vec<BoxFuture<'a, T>>,
}
impl<'a, T> JobRunner<'a, T> {
pub fn new() -> Self {
JobRunner {
closed: false,
running: Vec::new(),
}
}
pub async fn next_result<
Src: Stream<Item = Fut> + Unpin,
Fut: Future<Output = T> + Send + 'a,
>(
&mut self,
job_source: &mut Src,
) -> Option<T> {
loop {
tokio::select! {
job = job_source.next() => {
if let Some(job) = job {
self.running.push(job.boxed());
} else {
self.closed = true;
if self.running.is_empty() {
return None;
}
}
}
res = self.next() => {
return res;
}
}
}
}
}
impl<'a, T> Stream for JobRunner<'a, T> {
type Item = T;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
match poll_select_all(&mut self.running, cx) {
std::task::Poll::Pending if self.closed && self.running.is_empty() => {
std::task::Poll::Ready(None)
}
a => a.map(Some),
}
}
}
#[pin_project::pin_project]
pub struct StreamUntil<S, F> {
#[pin]
stream: S,
#[pin]
until: F,
done: bool,
}
impl<S, F> StreamUntil<S, F> {
pub fn new(stream: S, until: F) -> Self {
Self {
stream,
until,
done: false,
}
}
}
impl<S, F> Stream for StreamUntil<S, F>
where
S: Stream,
F: Future,
{
type Item = S::Item;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.project();
*this.done = *this.done || this.until.poll(cx).is_ready();
if *this.done {
std::task::Poll::Ready(None)
} else {
this.stream.poll_next(cx)
}
}
}
impl<S, F> FusedStream for StreamUntil<S, F>
where
S: FusedStream,
F: FusedFuture,
{
fn is_terminated(&self) -> bool {
self.done || self.stream.is_terminated() || self.until.is_terminated()
}
}
pub struct PhantomData<T>(std::marker::PhantomData<T>);
impl<T> PhantomData<T> {
pub fn new() -> Self {
PhantomData(std::marker::PhantomData)
}
}
impl<T> Clone for PhantomData<T> {
fn clone(&self) -> Self {
PhantomData::new()
}
}
impl<T> Debug for PhantomData<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
unsafe impl<T> Send for PhantomData<T> {}
unsafe impl<T> Sync for PhantomData<T> {}