add middleware and metadata

This commit is contained in:
Aiden McClelland
2023-12-28 17:26:46 -07:00
parent 434d521c74
commit 496491a2f9
4 changed files with 431 additions and 55 deletions

View File

@@ -1,4 +1,5 @@
use std::any::TypeId; use std::any::TypeId;
use std::collections::VecDeque;
use std::ffi::OsString; use std::ffi::OsString;
use std::marker::PhantomData; use std::marker::PhantomData;
@@ -63,14 +64,14 @@ impl<Context: crate::Context + Clone, Config: CommandFactory + FromArgMatches>
let (method, params) = root_handler.cli_parse(&matches, ctx_ty)?; let (method, params) = root_handler.cli_parse(&matches, ctx_ty)?;
let res = root_handler.handle_sync(HandleAnyArgs { let res = root_handler.handle_sync(HandleAnyArgs {
context: ctx.clone().upcast(), context: ctx.clone().upcast(),
parent_method: Vec::new(), parent_method: VecDeque::new(),
method: method.clone(), method: method.clone(),
params: params.clone(), params: params.clone(),
})?; })?;
root_handler.cli_display( root_handler.cli_display(
HandleAnyArgs { HandleAnyArgs {
context: ctx.upcast(), context: ctx.upcast(),
parent_method: Vec::new(), parent_method: VecDeque::new(),
method, method,
params, params,
}, },

View File

@@ -7,6 +7,7 @@ use std::sync::Arc;
use clap::{ArgMatches, Command, CommandFactory, FromArgMatches, Parser}; use clap::{ArgMatches, Command, CommandFactory, FromArgMatches, Parser};
use futures::Future; use futures::Future;
use imbl_value::imbl::OrdMap;
use imbl_value::Value; use imbl_value::Value;
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@@ -18,7 +19,7 @@ use crate::{CallRemote, CallRemoteHandler};
pub(crate) struct HandleAnyArgs { pub(crate) struct HandleAnyArgs {
pub(crate) context: AnyContext, pub(crate) context: AnyContext,
pub(crate) parent_method: Vec<&'static str>, pub(crate) parent_method: VecDeque<&'static str>,
pub(crate) method: VecDeque<&'static str>, pub(crate) method: VecDeque<&'static str>,
pub(crate) params: Value, pub(crate) params: Value,
} }
@@ -53,6 +54,11 @@ impl HandleAnyArgs {
pub(crate) trait HandleAny: std::fmt::Debug + Send + Sync { pub(crate) trait HandleAny: std::fmt::Debug + Send + Sync {
fn handle_sync(&self, handle_args: HandleAnyArgs) -> Result<Value, RpcError>; fn handle_sync(&self, handle_args: HandleAnyArgs) -> Result<Value, RpcError>;
async fn handle_async(&self, handle_args: HandleAnyArgs) -> Result<Value, RpcError>; async fn handle_async(&self, handle_args: HandleAnyArgs) -> Result<Value, RpcError>;
fn metadata(
&self,
method: VecDeque<&'static str>,
ctx_ty: TypeId,
) -> OrdMap<&'static str, Value>;
fn method_from_dots(&self, method: &str, ctx_ty: TypeId) -> Option<VecDeque<&'static str>>; fn method_from_dots(&self, method: &str, ctx_ty: TypeId) -> Option<VecDeque<&'static str>>;
} }
#[async_trait::async_trait] #[async_trait::async_trait]
@@ -63,6 +69,13 @@ impl<T: HandleAny> HandleAny for Arc<T> {
async fn handle_async(&self, handle_args: HandleAnyArgs) -> Result<Value, RpcError> { async fn handle_async(&self, handle_args: HandleAnyArgs) -> Result<Value, RpcError> {
self.deref().handle_async(handle_args).await self.deref().handle_async(handle_args).await
} }
fn metadata(
&self,
method: VecDeque<&'static str>,
ctx_ty: TypeId,
) -> OrdMap<&'static str, Value> {
self.deref().metadata(method, ctx_ty)
}
fn method_from_dots(&self, method: &str, ctx_ty: TypeId) -> Option<VecDeque<&'static str>> { fn method_from_dots(&self, method: &str, ctx_ty: TypeId) -> Option<VecDeque<&'static str>> {
self.deref().method_from_dots(method, ctx_ty) self.deref().method_from_dots(method, ctx_ty)
} }
@@ -122,6 +135,16 @@ impl HandleAny for DynHandler {
DynHandler::WithCli(h) => h.handle_async(handle_args).await, DynHandler::WithCli(h) => h.handle_async(handle_args).await,
} }
} }
fn metadata(
&self,
method: VecDeque<&'static str>,
ctx_ty: TypeId,
) -> OrdMap<&'static str, Value> {
match self {
DynHandler::WithoutCli(h) => h.metadata(method, ctx_ty),
DynHandler::WithCli(h) => h.metadata(method, ctx_ty),
}
}
fn method_from_dots(&self, method: &str, ctx_ty: TypeId) -> Option<VecDeque<&'static str>> { fn method_from_dots(&self, method: &str, ctx_ty: TypeId) -> Option<VecDeque<&'static str>> {
match self { match self {
DynHandler::WithoutCli(h) => h.method_from_dots(method, ctx_ty), DynHandler::WithoutCli(h) => h.method_from_dots(method, ctx_ty),
@@ -133,7 +156,7 @@ impl HandleAny for DynHandler {
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct HandleArgs<Context: IntoContext, H: HandlerTypes + ?Sized> { pub struct HandleArgs<Context: IntoContext, H: HandlerTypes + ?Sized> {
pub context: Context, pub context: Context,
pub parent_method: Vec<&'static str>, pub parent_method: VecDeque<&'static str>,
pub method: VecDeque<&'static str>, pub method: VecDeque<&'static str>,
pub params: H::Params, pub params: H::Params,
pub inherited_params: H::InheritedParams, pub inherited_params: H::InheritedParams,
@@ -179,6 +202,14 @@ pub trait Handler<Context: IntoContext>:
.await .await
.unwrap() .unwrap()
} }
#[allow(unused_variables)]
fn metadata(
&self,
method: VecDeque<&'static str>,
ctx_ty: TypeId,
) -> OrdMap<&'static str, Value> {
OrdMap::new()
}
fn contexts(&self) -> Option<BTreeSet<TypeId>> { fn contexts(&self) -> Option<BTreeSet<TypeId>> {
Context::type_ids_for(self) Context::type_ids_for(self)
} }
@@ -235,6 +266,13 @@ where
) )
.map_err(internal_error) .map_err(internal_error)
} }
fn metadata(
&self,
method: VecDeque<&'static str>,
ctx_ty: TypeId,
) -> OrdMap<&'static str, Value> {
self.handler.metadata(method, ctx_ty)
}
fn method_from_dots(&self, method: &str, ctx_ty: TypeId) -> Option<VecDeque<&'static str>> { fn method_from_dots(&self, method: &str, ctx_ty: TypeId) -> Option<VecDeque<&'static str>> {
self.handler.method_from_dots(method, ctx_ty) self.handler.method_from_dots(method, ctx_ty)
} }
@@ -318,20 +356,27 @@ impl SubcommandMap {
pub struct ParentHandler<Params = NoParams, InheritedParams = NoParams> { pub struct ParentHandler<Params = NoParams, InheritedParams = NoParams> {
_phantom: PhantomData<(Params, InheritedParams)>, _phantom: PhantomData<(Params, InheritedParams)>,
pub(crate) subcommands: SubcommandMap, pub(crate) subcommands: SubcommandMap,
metadata: OrdMap<&'static str, Value>,
} }
impl<Params, InheritedParams> ParentHandler<Params, InheritedParams> { impl<Params, InheritedParams> ParentHandler<Params, InheritedParams> {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
_phantom: PhantomData, _phantom: PhantomData,
subcommands: SubcommandMap(BTreeMap::new()), subcommands: SubcommandMap(BTreeMap::new()),
metadata: OrdMap::new(),
} }
} }
pub fn with_metadata(mut self, key: &'static str, value: Value) -> Self {
self.metadata.insert(key, value);
self
}
} }
impl<Params, InheritedParams> Clone for ParentHandler<Params, InheritedParams> { impl<Params, InheritedParams> Clone for ParentHandler<Params, InheritedParams> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
_phantom: PhantomData, _phantom: PhantomData,
subcommands: self.subcommands.clone(), subcommands: self.subcommands.clone(),
metadata: self.metadata.clone(),
} }
} }
} }
@@ -431,6 +476,19 @@ where
raw_params, raw_params,
}) })
} }
fn metadata(
&self,
method: VecDeque<&'static str>,
ctx_ty: TypeId,
) -> OrdMap<&'static str, Value> {
self.handler.metadata(method, ctx_ty)
}
fn contexts(&self) -> Option<BTreeSet<TypeId>> {
self.handler.contexts()
}
fn method_from_dots(&self, method: &str, ctx_ty: TypeId) -> Option<VecDeque<&'static str>> {
self.handler.method_from_dots(method, ctx_ty)
}
} }
impl<Context, Params, InheritedParams, H, F> CliBindings<Context> impl<Context, Params, InheritedParams, H, F> CliBindings<Context>
@@ -742,7 +800,7 @@ where
) -> Result<Self::Ok, Self::Err> { ) -> Result<Self::Ok, Self::Err> {
let cmd = method.pop_front(); let cmd = method.pop_front();
if let Some(cmd) = cmd { if let Some(cmd) = cmd {
parent_method.push(cmd); parent_method.push_back(cmd);
} }
if let Some((_, sub_handler)) = &self.subcommands.get(context.inner_type_id(), cmd) { if let Some((_, sub_handler)) = &self.subcommands.get(context.inner_type_id(), cmd) {
sub_handler.handle_sync(HandleAnyArgs { sub_handler.handle_sync(HandleAnyArgs {
@@ -767,7 +825,7 @@ where
) -> Result<Self::Ok, Self::Err> { ) -> Result<Self::Ok, Self::Err> {
let cmd = method.pop_front(); let cmd = method.pop_front();
if let Some(cmd) = cmd { if let Some(cmd) = cmd {
parent_method.push(cmd); parent_method.push_back(cmd);
} }
if let Some((_, sub_handler)) = self.subcommands.get(context.inner_type_id(), cmd) { if let Some((_, sub_handler)) = self.subcommands.get(context.inner_type_id(), cmd) {
sub_handler sub_handler
@@ -782,6 +840,18 @@ where
Err(yajrc::METHOD_NOT_FOUND_ERROR) Err(yajrc::METHOD_NOT_FOUND_ERROR)
} }
} }
fn metadata(
&self,
mut method: VecDeque<&'static str>,
ctx_ty: TypeId,
) -> OrdMap<&'static str, Value> {
let mut metadata = self.metadata.clone();
if let Some((_, handler)) = self.subcommands.get(ctx_ty, method.pop_front()) {
handler.metadata(method, ctx_ty).union(metadata)
} else {
metadata
}
}
fn contexts(&self) -> Option<BTreeSet<TypeId>> { fn contexts(&self) -> Option<BTreeSet<TypeId>> {
let mut set = BTreeSet::new(); let mut set = BTreeSet::new();
for ctx_ty in self.subcommands.0.values().flat_map(|c| c.keys()) { for ctx_ty in self.subcommands.0.values().flat_map(|c| c.keys()) {
@@ -872,7 +942,7 @@ where
) -> Result<(), Self::Err> { ) -> Result<(), Self::Err> {
let cmd = method.pop_front(); let cmd = method.pop_front();
if let Some(cmd) = cmd { if let Some(cmd) = cmd {
parent_method.push(cmd); parent_method.push_back(cmd);
} }
if let Some((_, DynHandler::WithCli(sub_handler))) = if let Some((_, DynHandler::WithCli(sub_handler))) =
self.subcommands.get(context.inner_type_id(), cmd) self.subcommands.get(context.inner_type_id(), cmd)
@@ -896,6 +966,13 @@ pub struct FromFn<F, T, E, Args> {
_phantom: PhantomData<(T, E, Args)>, _phantom: PhantomData<(T, E, Args)>,
function: F, function: F,
blocking: bool, blocking: bool,
metadata: OrdMap<&'static str, Value>,
}
impl<F, T, E, Args> FromFn<F, T, E, Args> {
pub fn with_metadata(mut self, key: &'static str, value: Value) -> Self {
self.metadata.insert(key, value);
self
}
} }
impl<F: Clone, T, E, Args> Clone for FromFn<F, T, E, Args> { impl<F: Clone, T, E, Args> Clone for FromFn<F, T, E, Args> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
@@ -903,6 +980,7 @@ impl<F: Clone, T, E, Args> Clone for FromFn<F, T, E, Args> {
_phantom: PhantomData, _phantom: PhantomData,
function: self.function.clone(), function: self.function.clone(),
blocking: self.blocking, blocking: self.blocking,
metadata: self.metadata.clone(),
} }
} }
} }
@@ -929,6 +1007,7 @@ pub fn from_fn<F, T, E, Args>(function: F) -> FromFn<F, T, E, Args> {
function, function,
_phantom: PhantomData, _phantom: PhantomData,
blocking: false, blocking: false,
metadata: OrdMap::new(),
} }
} }
@@ -937,18 +1016,21 @@ pub fn from_fn_blocking<F, T, E, Args>(function: F) -> FromFn<F, T, E, Args> {
function, function,
_phantom: PhantomData, _phantom: PhantomData,
blocking: true, blocking: true,
metadata: OrdMap::new(),
} }
} }
pub struct FromFnAsync<F, Fut, T, E, Args> { pub struct FromFnAsync<F, Fut, T, E, Args> {
_phantom: PhantomData<(Fut, T, E, Args)>, _phantom: PhantomData<(Fut, T, E, Args)>,
function: F, function: F,
metadata: OrdMap<&'static str, Value>,
} }
impl<F: Clone, Fut, T, E, Args> Clone for FromFnAsync<F, Fut, T, E, Args> { impl<F: Clone, Fut, T, E, Args> Clone for FromFnAsync<F, Fut, T, E, Args> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
_phantom: PhantomData, _phantom: PhantomData,
function: self.function.clone(), function: self.function.clone(),
metadata: self.metadata.clone(),
} }
} }
} }
@@ -972,6 +1054,7 @@ pub fn from_fn_async<F, Fut, T, E, Args>(function: F) -> FromFnAsync<F, Fut, T,
FromFnAsync { FromFnAsync {
function, function,
_phantom: PhantomData, _phantom: PhantomData,
metadata: OrdMap::new(),
} }
} }
@@ -1007,6 +1090,13 @@ where
self.handle_async_with_sync(handle_args).await self.handle_async_with_sync(handle_args).await
} }
} }
fn metadata(
&self,
mut method: VecDeque<&'static str>,
ctx_ty: TypeId,
) -> OrdMap<&'static str, Value> {
self.metadata.clone()
}
} }
impl<F, Fut, T, E> HandlerTypes for FromFnAsync<F, Fut, T, E, ()> impl<F, Fut, T, E> HandlerTypes for FromFnAsync<F, Fut, T, E, ()>
where where
@@ -1032,6 +1122,13 @@ where
async fn handle_async(&self, _: HandleArgs<Context, Self>) -> Result<Self::Ok, Self::Err> { async fn handle_async(&self, _: HandleArgs<Context, Self>) -> Result<Self::Ok, Self::Err> {
(self.function)().await (self.function)().await
} }
fn metadata(
&self,
mut method: VecDeque<&'static str>,
ctx_ty: TypeId,
) -> OrdMap<&'static str, Value> {
self.metadata.clone()
}
} }
impl<Context, F, T, E> HandlerTypes for FromFn<F, T, E, (Context,)> impl<Context, F, T, E> HandlerTypes for FromFn<F, T, E, (Context,)>
@@ -1067,6 +1164,13 @@ where
self.handle_async_with_sync(handle_args).await self.handle_async_with_sync(handle_args).await
} }
} }
fn metadata(
&self,
mut method: VecDeque<&'static str>,
ctx_ty: TypeId,
) -> OrdMap<&'static str, Value> {
self.metadata.clone()
}
} }
impl<Context, F, Fut, T, E> HandlerTypes for FromFnAsync<F, Fut, T, E, (Context,)> impl<Context, F, Fut, T, E> HandlerTypes for FromFnAsync<F, Fut, T, E, (Context,)>
where where
@@ -1096,6 +1200,13 @@ where
) -> Result<Self::Ok, Self::Err> { ) -> Result<Self::Ok, Self::Err> {
(self.function)(handle_args.context).await (self.function)(handle_args.context).await
} }
fn metadata(
&self,
mut method: VecDeque<&'static str>,
ctx_ty: TypeId,
) -> OrdMap<&'static str, Value> {
self.metadata.clone()
}
} }
impl<Context, F, T, E, Params> HandlerTypes for FromFn<F, T, E, (Context, Params)> impl<Context, F, T, E, Params> HandlerTypes for FromFn<F, T, E, (Context, Params)>
@@ -1136,6 +1247,13 @@ where
self.handle_async_with_sync(handle_args).await self.handle_async_with_sync(handle_args).await
} }
} }
fn metadata(
&self,
mut method: VecDeque<&'static str>,
ctx_ty: TypeId,
) -> OrdMap<&'static str, Value> {
self.metadata.clone()
}
} }
impl<Context, F, Fut, T, E, Params> HandlerTypes for FromFnAsync<F, Fut, T, E, (Context, Params)> impl<Context, F, Fut, T, E, Params> HandlerTypes for FromFnAsync<F, Fut, T, E, (Context, Params)>
@@ -1172,6 +1290,13 @@ where
} = handle_args; } = handle_args;
(self.function)(context, params).await (self.function)(context, params).await
} }
fn metadata(
&self,
mut method: VecDeque<&'static str>,
ctx_ty: TypeId,
) -> OrdMap<&'static str, Value> {
self.metadata.clone()
}
} }
impl<Context, F, T, E, Params, InheritedParams> HandlerTypes impl<Context, F, T, E, Params, InheritedParams> HandlerTypes
@@ -1219,6 +1344,13 @@ where
self.handle_async_with_sync(handle_args).await self.handle_async_with_sync(handle_args).await
} }
} }
fn metadata(
&self,
mut method: VecDeque<&'static str>,
ctx_ty: TypeId,
) -> OrdMap<&'static str, Value> {
self.metadata.clone()
}
} }
impl<Context, F, Fut, T, E, Params, InheritedParams> HandlerTypes impl<Context, F, Fut, T, E, Params, InheritedParams> HandlerTypes
@@ -1261,6 +1393,13 @@ where
} = handle_args; } = handle_args;
(self.function)(context, params, inherited_params).await (self.function)(context, params, inherited_params).await
} }
fn metadata(
&self,
mut method: VecDeque<&'static str>,
ctx_ty: TypeId,
) -> OrdMap<&'static str, Value> {
self.metadata.clone()
}
} }
impl<Context, F, T, E, Args> CliBindings<Context> for FromFn<F, T, E, Args> impl<Context, F, T, E, Args> CliBindings<Context> for FromFn<F, T, E, Args>

View File

@@ -1,60 +1,287 @@
use std::any::TypeId;
use std::task::Context; use std::task::Context;
use futures::future::BoxFuture; use futures::future::{join_all, BoxFuture};
use futures::FutureExt;
use http::header::{CONTENT_LENGTH, CONTENT_TYPE};
use http::request::Parts; use http::request::Parts;
use http_body_util::BodyExt;
use hyper::body::{Bytes, Incoming}; use hyper::body::{Bytes, Incoming};
use hyper::service::Service;
use hyper::{Request, Response}; use hyper::{Request, Response};
use yajrc::{RpcRequest, RpcResponse}; use imbl_value::Value;
use serde::de::DeserializeOwned;
use serde::Serialize;
use yajrc::{RpcError, RpcMethod};
use crate::server::{RpcRequest, RpcResponse, SingleOrBatchRpcRequest};
use crate::util::{internal_error, parse_error};
use crate::{handler, HandleAny, Server};
const FALLBACK_ERROR: &str = "{\"error\":{\"code\":-32603,\"message\":\"Internal error\",\"data\":\"Failed to serialize rpc response\"}}";
pub fn fallback_rpc_error_response() -> Response<Bytes> {
Response::builder()
.header(CONTENT_TYPE, "application/json")
.header(CONTENT_LENGTH, FALLBACK_ERROR.len())
.body(Bytes::from_static(FALLBACK_ERROR.as_bytes()))
.unwrap()
}
pub fn json_http_response<T: Serialize>(t: &T) -> Response<Bytes> {
let body = match serde_json::to_vec(t) {
Ok(a) => a,
Err(_) => return fallback_rpc_error_response(),
};
Response::builder()
.header(CONTENT_TYPE, "application/json")
.header(CONTENT_LENGTH, body.len())
.body(Bytes::from(body))
.unwrap_or_else(|_| fallback_rpc_error_response())
}
type BoxBody = http_body_util::combinators::BoxBody<Bytes, hyper::Error>; type BoxBody = http_body_util::combinators::BoxBody<Bytes, hyper::Error>;
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait Middleware<Context: crate::Context> { pub trait Middleware<Context: Send + 'static>: Clone + Send + Sync + 'static {
type ProcessHttpRequestResult; type Metadata: DeserializeOwned + Send + 'static;
#[allow(unused_variables)]
async fn process_http_request( async fn process_http_request(
&self, &mut self,
req: &mut Request<BoxBody>, context: &Context,
) -> Result<Self::ProcessHttpRequestResult, hyper::Result<Response<Bytes>>>; request: &mut Request<BoxBody>,
type ProcessRpcRequestResult; ) -> Result<(), Response<Bytes>> {
Ok(())
}
#[allow(unused_variables)]
async fn process_rpc_request( async fn process_rpc_request(
&self, &mut self,
prev: Self::ProcessHttpRequestResult, metadata: Self::Metadata,
// metadata: &Context::Metadata, request: &mut RpcRequest,
req: &mut RpcRequest, ) -> Result<(), RpcResponse> {
) -> Result<Self::ProcessRpcRequestResult, RpcResponse>; Ok(())
type ProcessRpcResponseResult; }
async fn process_rpc_response( #[allow(unused_variables)]
&self, async fn process_rpc_response(&mut self, response: &mut RpcResponse) {}
prev: Self::ProcessRpcRequestResult, #[allow(unused_variables)]
res: &mut RpcResponse, async fn process_http_response(&mut self, response: &mut Response<Bytes>) {}
) -> Self::ProcessRpcResponseResult;
async fn process_http_response(
&self,
prev: Self::ProcessRpcResponseResult,
res: &mut Response<Bytes>,
);
} }
// pub struct DynMiddleware<Context: crate::Context> { #[allow(private_bounds)]
// process_http_request: Box< trait _Middleware<Context>: Send + Sync {
// dyn for<'a> Fn( fn dyn_clone(&self) -> DynMiddleware<Context>;
// &'a mut Request<BoxBody>, fn process_http_request<'a>(
// ) -> BoxFuture< &'a mut self,
// 'a, context: &'a Context,
// Result<DynProcessRpcRequest<Context>, hyper::Result<Response<Bytes>>>, request: &'a mut Request<BoxBody>,
// > + Send ) -> BoxFuture<'a, Result<(), Response<Bytes>>>;
// + Sync, fn process_rpc_request<'a>(
// >, &'a mut self,
// } metadata: Value,
// type DynProcessRpcRequest<'m, Context: crate::Context> = Box< request: &'a mut RpcRequest,
// dyn for<'a> FnOnce( ) -> BoxFuture<'a, Result<(), RpcResponse>>;
// &'a Context::Metadata, fn process_rpc_response<'a>(&'a mut self, response: &'a mut RpcResponse) -> BoxFuture<'a, ()>;
// &'a mut RpcRequest, fn process_http_response<'a>(
// ) &'a mut self,
// -> BoxFuture<'a, Result<DynProcessRpcResponse<'m>, DynSkipHandler<'m>>> response: &'a mut Response<Bytes>,
// + Send ) -> BoxFuture<'a, ()>;
// + Sync }
// + 'm, impl<Context: Send + 'static, T: Middleware<Context> + Send + Sync> _Middleware<Context> for T {
// >; fn dyn_clone(&self) -> DynMiddleware<Context> {
// type DynProcessRpcResponse<'m> = DynMiddleware(Box::new(<Self as Clone>::clone(&self)))
// Box<dyn for<'a> FnOnce(&'a mut RpcResponse) -> BoxFuture<'a, DynProcessHttpResponse<'m>>>; }
fn process_http_request<'a>(
&'a mut self,
context: &'a Context,
request: &'a mut Request<BoxBody>,
) -> BoxFuture<'a, Result<(), Response<Bytes>>> {
<Self as Middleware<Context>>::process_http_request(self, context, request)
}
fn process_rpc_request<'a>(
&'a mut self,
metadata: Value,
request: &'a mut RpcRequest,
) -> BoxFuture<'a, Result<(), RpcResponse>> {
<Self as Middleware<Context>>::process_rpc_request(
self,
match imbl_value::from_value(metadata) {
Ok(a) => a,
Err(e) => return async { Err(internal_error(e).into()) }.boxed(),
},
request,
)
}
fn process_rpc_response<'a>(&'a mut self, response: &'a mut RpcResponse) -> BoxFuture<'a, ()> {
<Self as Middleware<Context>>::process_rpc_response(self, response)
}
fn process_http_response<'a>(
&'a mut self,
response: &'a mut Response<Bytes>,
) -> BoxFuture<'a, ()> {
<Self as Middleware<Context>>::process_http_response(self, response)
}
}
struct DynMiddleware<Context>(Box<dyn _Middleware<Context>>);
impl<Context> Clone for DynMiddleware<Context> {
fn clone(&self) -> Self {
self.0.dyn_clone()
}
}
pub struct HttpServer<Context: crate::Context> {
inner: Server<Context>,
middleware: Vec<DynMiddleware<Context>>,
}
impl<Context: crate::Context> Clone for HttpServer<Context> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
middleware: self.middleware.clone(),
}
}
}
impl<Context: crate::Context> Server<Context> {
pub fn for_http(self) -> HttpServer<Context> {
HttpServer {
inner: self,
middleware: Vec::new(),
}
}
pub fn middleware<T: Middleware<Context>>(self, middleware: T) -> HttpServer<Context> {
self.for_http().middleware(middleware)
}
}
impl<Context: crate::Context> HttpServer<Context> {
pub fn middleware<T: Middleware<Context>>(mut self, middleware: T) -> Self {
self.middleware.push(DynMiddleware(Box::new(middleware)));
self
}
async fn process_http_request(&self, mut req: Request<BoxBody>) -> Response<Bytes> {
let mut mid = self.middleware.clone();
match async {
let ctx = (self.inner.make_ctx)().await?;
for middleware in mid.iter_mut().rev() {
if let Err(e) = middleware.0.process_http_request(&ctx, &mut req).await {
return Ok::<_, RpcError>(e);
}
}
let (_, body) = req.into_parts();
match serde_json::from_slice::<SingleOrBatchRpcRequest>(
&*body.collect().await.map_err(internal_error)?.to_bytes(),
)
.map_err(parse_error)?
{
SingleOrBatchRpcRequest::Single(rpc_req) => {
let mut res =
json_http_response(&self.process_rpc_request(&mut mid, rpc_req).await);
for middleware in mid.iter_mut() {
middleware.0.process_http_response(&mut res).await;
}
Ok(res)
}
SingleOrBatchRpcRequest::Batch(rpc_reqs) => {
let (mids, rpc_res): (Vec<_>, Vec<_>) =
join_all(rpc_reqs.into_iter().map(|rpc_req| async {
let mut mid = mid.clone();
let res = self.process_rpc_request(&mut mid, rpc_req).await;
(mid, res)
}))
.await
.into_iter()
.unzip();
let mut res = json_http_response(&rpc_res);
for mut mid in mids.into_iter().fold(
vec![Vec::with_capacity(rpc_res.len()); mid.len()],
|mut acc, x| {
for (idx, middleware) in x.into_iter().enumerate() {
acc[idx].push(middleware);
}
acc
},
) {
for middleware in mid.iter_mut() {
middleware.0.process_http_response(&mut res).await;
}
}
Ok(res)
}
}
}
.await
{
Ok(a) => a,
Err(e) => json_http_response(&RpcResponse {
id: None,
result: Err(e),
}),
}
}
async fn process_rpc_request(
&self,
mid: &mut Vec<DynMiddleware<Context>>,
mut req: RpcRequest,
) -> RpcResponse {
let metadata = Value::Object(
self.inner
.root_handler
.metadata(
match self
.inner
.root_handler
.method_from_dots(req.method.as_str(), TypeId::of::<Context>())
{
Some(a) => a,
None => {
return RpcResponse {
id: req.id,
result: Err(yajrc::METHOD_NOT_FOUND_ERROR),
}
}
},
TypeId::of::<Context>(),
)
.into_iter()
.map(|(key, value)| (key.into(), value))
.collect(),
);
let mut res = async {
for middleware in mid.iter_mut().rev() {
if let Err(res) = middleware
.0
.process_rpc_request(metadata.clone(), &mut req)
.await
{
return res;
}
}
self.inner.handle_single_request(req).await
}
.await;
for middleware in mid.iter_mut() {
middleware.0.process_rpc_response(&mut res).await;
}
res
}
pub fn handle(&self, req: Request<Incoming>) -> BoxFuture<'static, Response<Bytes>> {
let server = self.clone();
async move {
server
.process_http_request(req.map(|b| BoxBody::new(b)))
.await
}
.boxed()
}
}
impl<Context: crate::Context> Service<Request<Incoming>> for HttpServer<Context> {
type Response = Response<Bytes>;
type Error = hyper::Error;
type Future = futures::future::Map<
BoxFuture<'static, Self::Response>,
fn(Self::Response) -> Result<Self::Response, Self::Error>,
>;
fn call(&self, req: Request<Incoming>) -> Self::Future {
self.handle(req).map(Ok)
}
}

View File

@@ -1,4 +1,5 @@
use std::any::TypeId; use std::any::TypeId;
use std::collections::VecDeque;
use std::sync::Arc; use std::sync::Arc;
use futures::future::{join_all, BoxFuture}; use futures::future::{join_all, BoxFuture};
@@ -24,6 +25,14 @@ pub struct Server<Context: crate::Context> {
make_ctx: Arc<dyn Fn() -> BoxFuture<'static, Result<Context, RpcError>> + Send + Sync>, make_ctx: Arc<dyn Fn() -> BoxFuture<'static, Result<Context, RpcError>> + Send + Sync>,
root_handler: Arc<AnyHandler<Context, ParentHandler>>, root_handler: Arc<AnyHandler<Context, ParentHandler>>,
} }
impl<Context: crate::Context> Clone for Server<Context> {
fn clone(&self) -> Self {
Self {
make_ctx: self.make_ctx.clone(),
root_handler: self.root_handler.clone(),
}
}
}
impl<Context: crate::Context> Server<Context> { impl<Context: crate::Context> Server<Context> {
pub fn new< pub fn new<
MakeCtx: Fn() -> Fut + Send + Sync + 'static, MakeCtx: Fn() -> Fut + Send + Sync + 'static,
@@ -54,7 +63,7 @@ impl<Context: crate::Context> Server<Context> {
root_handler root_handler
.handle_async(HandleAnyArgs { .handle_async(HandleAnyArgs {
context: make_ctx().await?.upcast(), context: make_ctx().await?.upcast(),
parent_method: Vec::new(), parent_method: VecDeque::new(),
method: method.ok_or_else(|| yajrc::METHOD_NOT_FOUND_ERROR)?, method: method.ok_or_else(|| yajrc::METHOD_NOT_FOUND_ERROR)?,
params, params,
}) })