provide context to middleware in all stages

This commit is contained in:
Aiden McClelland
2024-01-03 13:24:10 -07:00
parent 4bcaf67a03
commit 408c3f683a

View File

@@ -54,15 +54,16 @@ pub trait Middleware<Context: Send + 'static>: Clone + Send + Sync + 'static {
#[allow(unused_variables)] #[allow(unused_variables)]
async fn process_rpc_request( async fn process_rpc_request(
&mut self, &mut self,
context: &Context,
metadata: Self::Metadata, metadata: Self::Metadata,
request: &mut RpcRequest, request: &mut RpcRequest,
) -> Result<(), RpcResponse> { ) -> Result<(), RpcResponse> {
Ok(()) Ok(())
} }
#[allow(unused_variables)] #[allow(unused_variables)]
async fn process_rpc_response(&mut self, response: &mut RpcResponse) {} async fn process_rpc_response(&mut self, context: &Context, response: &mut RpcResponse) {}
#[allow(unused_variables)] #[allow(unused_variables)]
async fn process_http_response(&mut self, response: &mut Response<Bytes>) {} async fn process_http_response(&mut self, context: &Context, response: &mut Response<Bytes>) {}
} }
#[allow(private_bounds)] #[allow(private_bounds)]
@@ -75,12 +76,19 @@ trait _Middleware<Context>: Send + Sync {
) -> BoxFuture<'a, Result<(), Response<Bytes>>>; ) -> BoxFuture<'a, Result<(), Response<Bytes>>>;
fn process_rpc_request<'a>( fn process_rpc_request<'a>(
&'a mut self, &'a mut self,
context: &'a Context,
metadata: Value, metadata: Value,
request: &'a mut RpcRequest, request: &'a mut RpcRequest,
) -> BoxFuture<'a, Result<(), RpcResponse>>; ) -> BoxFuture<'a, Result<(), RpcResponse>>;
fn process_rpc_response<'a>(&'a mut self, response: &'a mut RpcResponse) -> BoxFuture<'a, ()>; fn process_rpc_response<'a>(
&'a mut self,
context: &'a Context,
response: &'a mut RpcResponse,
) -> BoxFuture<'a, ()>;
fn process_http_response<'a>( fn process_http_response<'a>(
&'a mut self, &'a mut self,
context: &'a Context,
response: &'a mut Response<Bytes>, response: &'a mut Response<Bytes>,
) -> BoxFuture<'a, ()>; ) -> BoxFuture<'a, ()>;
} }
@@ -97,11 +105,13 @@ impl<Context: Send + 'static, T: Middleware<Context> + Send + Sync> _Middleware<
} }
fn process_rpc_request<'a>( fn process_rpc_request<'a>(
&'a mut self, &'a mut self,
context: &'a Context,
metadata: Value, metadata: Value,
request: &'a mut RpcRequest, request: &'a mut RpcRequest,
) -> BoxFuture<'a, Result<(), RpcResponse>> { ) -> BoxFuture<'a, Result<(), RpcResponse>> {
<Self as Middleware<Context>>::process_rpc_request( <Self as Middleware<Context>>::process_rpc_request(
self, self,
context,
match imbl_value::from_value(metadata) { match imbl_value::from_value(metadata) {
Ok(a) => a, Ok(a) => a,
Err(e) => return async { Err(internal_error(e).into()) }.boxed(), Err(e) => return async { Err(internal_error(e).into()) }.boxed(),
@@ -109,14 +119,19 @@ impl<Context: Send + 'static, T: Middleware<Context> + Send + Sync> _Middleware<
request, request,
) )
} }
fn process_rpc_response<'a>(&'a mut self, response: &'a mut RpcResponse) -> BoxFuture<'a, ()> { fn process_rpc_response<'a>(
<Self as Middleware<Context>>::process_rpc_response(self, response) &'a mut self,
context: &'a Context,
response: &'a mut RpcResponse,
) -> BoxFuture<'a, ()> {
<Self as Middleware<Context>>::process_rpc_response(self, context, response)
} }
fn process_http_response<'a>( fn process_http_response<'a>(
&'a mut self, &'a mut self,
context: &'a Context,
response: &'a mut Response<Bytes>, response: &'a mut Response<Bytes>,
) -> BoxFuture<'a, ()> { ) -> BoxFuture<'a, ()> {
<Self as Middleware<Context>>::process_http_response(self, response) <Self as Middleware<Context>>::process_http_response(self, context, response)
} }
} }
@@ -171,10 +186,11 @@ impl<Context: crate::Context> HttpServer<Context> {
.map_err(parse_error)? .map_err(parse_error)?
{ {
SingleOrBatchRpcRequest::Single(rpc_req) => { SingleOrBatchRpcRequest::Single(rpc_req) => {
let mut res = let mut res = json_http_response(
json_http_response(&self.process_rpc_request(&mut mid, rpc_req).await); &self.process_rpc_request(&ctx, &mut mid, rpc_req).await,
);
for middleware in mid.iter_mut() { for middleware in mid.iter_mut() {
middleware.0.process_http_response(&mut res).await; middleware.0.process_http_response(&ctx, &mut res).await;
} }
Ok(res) Ok(res)
} }
@@ -182,7 +198,7 @@ impl<Context: crate::Context> HttpServer<Context> {
let (mids, rpc_res): (Vec<_>, Vec<_>) = let (mids, rpc_res): (Vec<_>, Vec<_>) =
join_all(rpc_reqs.into_iter().map(|rpc_req| async { join_all(rpc_reqs.into_iter().map(|rpc_req| async {
let mut mid = mid.clone(); let mut mid = mid.clone();
let res = self.process_rpc_request(&mut mid, rpc_req).await; let res = self.process_rpc_request(&ctx, &mut mid, rpc_req).await;
(mid, res) (mid, res)
})) }))
.await .await
@@ -199,7 +215,7 @@ impl<Context: crate::Context> HttpServer<Context> {
}, },
) { ) {
for middleware in mid.iter_mut() { for middleware in mid.iter_mut() {
middleware.0.process_http_response(&mut res).await; middleware.0.process_http_response(&ctx, &mut res).await;
} }
} }
Ok(res) Ok(res)
@@ -217,6 +233,7 @@ impl<Context: crate::Context> HttpServer<Context> {
} }
async fn process_rpc_request( async fn process_rpc_request(
&self, &self,
ctx: &Context,
mid: &mut Vec<DynMiddleware<Context>>, mid: &mut Vec<DynMiddleware<Context>>,
mut req: RpcRequest, mut req: RpcRequest,
) -> RpcResponse { ) -> RpcResponse {
@@ -247,7 +264,7 @@ impl<Context: crate::Context> HttpServer<Context> {
for middleware in mid.iter_mut().rev() { for middleware in mid.iter_mut().rev() {
if let Err(res) = middleware if let Err(res) = middleware
.0 .0
.process_rpc_request(metadata.clone(), &mut req) .process_rpc_request(ctx, metadata.clone(), &mut req)
.await .await
{ {
return res; return res;
@@ -257,7 +274,7 @@ impl<Context: crate::Context> HttpServer<Context> {
} }
.await; .await;
for middleware in mid.iter_mut() { for middleware in mid.iter_mut() {
middleware.0.process_rpc_response(&mut res).await; middleware.0.process_rpc_response(ctx, &mut res).await;
} }
res res
} }