example wip

This commit is contained in:
Aiden McClelland
2023-12-13 01:33:57 -07:00
parent 64a6c00344
commit be51ffdd87
11 changed files with 769 additions and 406 deletions

74
Cargo.lock generated
View File

@@ -103,7 +103,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.40",
"syn 2.0.41",
]
[[package]]
@@ -114,7 +114,7 @@ checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.40",
"syn 2.0.41",
]
[[package]]
@@ -196,6 +196,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfaff671f6b22ca62406885ece523383b9b64022e341e53e009a62ebc47a45f2"
dependencies = [
"clap_builder",
"clap_derive",
]
[[package]]
@@ -210,6 +211,18 @@ dependencies = [
"strsim",
]
[[package]]
name = "clap_derive"
version = "4.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf9804afaaf59a91e75b022a30fb7229a7901f60c755489cc61c9b423b836442"
dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.41",
]
[[package]]
name = "clap_lex"
version = "0.6.0"
@@ -355,7 +368,7 @@ checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.40",
"syn 2.0.41",
]
[[package]]
@@ -464,6 +477,12 @@ version = "0.14.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604"
[[package]]
name = "heck"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
[[package]]
name = "hermit-abi"
version = "0.3.3"
@@ -513,6 +532,19 @@ dependencies = [
"http 1.0.0",
]
[[package]]
name = "http-body-util"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41cb79eb393015dadd30fc252023adb0b2400a0caee0fa2a077e6e21a551e840"
dependencies = [
"bytes",
"futures-util",
"http 1.0.0",
"http-body 1.0.0",
"pin-project-lite",
]
[[package]]
name = "httparse"
version = "1.8.0"
@@ -790,7 +822,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.40",
"syn 2.0.41",
]
[[package]]
@@ -956,6 +988,8 @@ dependencies = [
"async-trait",
"clap",
"futures",
"http 1.0.0",
"http-body-util",
"hyper 1.0.1",
"imbl-value",
"lazy_static",
@@ -967,6 +1001,7 @@ dependencies = [
"serde_json",
"thiserror",
"tokio",
"tokio-stream",
"url",
"yajrc",
]
@@ -1079,7 +1114,7 @@ checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.40",
"syn 2.0.41",
]
[[package]]
@@ -1168,9 +1203,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.40"
version = "2.0.41"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13fa70a4ee923979ffb522cacce59d34421ebdea5625e1073c4326ef9d2dd42e"
checksum = "44c8b28c477cc3bf0e7966561e3460130e1255f7a1cf71931075f1c5e7a7e269"
dependencies = [
"proc-macro2",
"quote",
@@ -1228,7 +1263,7 @@ checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.40",
"syn 2.0.41",
]
[[package]]
@@ -1273,7 +1308,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.40",
"syn 2.0.41",
]
[[package]]
@@ -1286,6 +1321,17 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-stream"
version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]]
name = "tokio-util"
version = "0.7.10"
@@ -1417,7 +1463,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.40",
"syn 2.0.41",
"wasm-bindgen-shared",
]
@@ -1451,7 +1497,7 @@ checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.40",
"syn 2.0.41",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
@@ -1638,9 +1684,9 @@ dependencies = [
[[package]]
name = "yajrc"
version = "0.1.1"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc08b562507a1674a1ef886a1aedeeb19d41462386ae09f634995d41bbef87d3"
checksum = "47cb33cb21fb6923a0dd074fd20dfd98fc3758103b7e2607db1354b4a86ef37c"
dependencies = [
"anyhow",
"serde",
@@ -1677,5 +1723,5 @@ checksum = "be912bf68235a88fbefd1b73415cb218405958d1655b2ece9035a19920bdf6ba"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.40",
"syn 2.0.41",
]

View File

@@ -18,8 +18,10 @@ default = ["cbor"]
[dependencies]
async-stream = "0.3"
async-trait = "0.1"
clap = "4"
clap = { version = "4", features = ["derive"] }
futures = "0.3"
http = "1"
http-body-util = "0.1"
hyper = { version = "1", features = ["server", "http1", "http2", "client"] }
imbl-value = "0.1"
lazy_static = "1.4"
@@ -31,5 +33,6 @@ serde_cbor = { version = "0.11", optional = true }
serde_json = "1.0"
thiserror = "1.0"
tokio = { version = "1", features = ["full"] }
tokio-stream = { version = "0.1", features = ["io-util", "net"] }
url = "2"
yajrc = "0.1"

View File

@@ -7,12 +7,17 @@ use imbl_value::Value;
use reqwest::{Client, Method};
use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
use url::Url;
use yajrc::{GenericRpcMethod, Id, RpcError, RpcRequest};
use yajrc::{Id, RpcError};
use crate::command::{AsyncCommand, DynCommand, LeafCommand, ParentInfo};
use crate::util::{combine, invalid_params, parse_error};
use crate::{CliBindings, ParentChain};
use crate::util::{combine, internal_error, invalid_params, parse_error};
use crate::{CliBindings, SyncCommand};
type GenericRpcMethod<'a> = yajrc::GenericRpcMethod<&'a str, Value, Value>;
type RpcRequest<'a> = yajrc::RpcRequest<GenericRpcMethod<'a>>;
type RpcResponse<'a> = yajrc::RpcResponse<GenericRpcMethod<'static>>;
impl<Context: crate::Context> DynCommand<Context> {
fn cli_app(&self) -> Option<clap::Command> {
@@ -55,7 +60,7 @@ impl<Context: crate::Context> DynCommand<Context> {
}
struct CliApp<Context: crate::Context> {
cli: CliBindings,
cli: CliBindings<Context>,
commands: Vec<DynCommand<Context>>,
}
impl<Context: crate::Context> CliApp<Context> {
@@ -110,6 +115,8 @@ impl<Context: crate::Context> CliAppAsync<Context> {
}),
}
}
}
impl<Context: crate::Context + Clone> CliAppAsync<Context> {
pub async fn run(self, args: Vec<OsString>) -> Result<(), RpcError> {
let cmd = self
.app
@@ -130,10 +137,10 @@ impl<Context: crate::Context> CliAppAsync<Context> {
.implementation
.as_ref()
.ok_or(yajrc::METHOD_NOT_FOUND_ERROR)?
.async_impl)(ctx, parent_method, params)
.async_impl)(ctx.clone(), parent_method.clone(), params.clone())
.await?;
if let Some(display) = display {
display(res).map_err(parse_error)
display(ctx, parent_method, params, res).map_err(parse_error)
} else {
Ok(())
}
@@ -157,6 +164,8 @@ impl<Context: crate::Context> CliAppSync<Context> {
make_ctx: Box::new(|args| make_ctx(imbl_value::from_value(args).map_err(parse_error)?)),
}
}
}
impl<Context: crate::Context + Clone> CliAppSync<Context> {
pub async fn run(self, args: Vec<OsString>) -> Result<(), RpcError> {
let cmd = self
.app
@@ -177,9 +186,9 @@ impl<Context: crate::Context> CliAppSync<Context> {
.implementation
.as_ref()
.ok_or(yajrc::METHOD_NOT_FOUND_ERROR)?
.sync_impl)(ctx, parent_method, params)?;
.sync_impl)(ctx.clone(), parent_method.clone(), params.clone())?;
if let Some(display) = display {
display(res).map_err(parse_error)
display(ctx, parent_method, params, res).map_err(parse_error)
} else {
Ok(())
}
@@ -191,14 +200,12 @@ pub trait CliContext: crate::Context {
async fn call_remote(&self, method: &str, params: Value) -> Result<Value, RpcError>;
}
#[async_trait::async_trait]
pub trait CliContextHttp: crate::Context {
fn client(&self) -> &Client;
fn url(&self) -> Url;
}
#[async_trait::async_trait]
impl<T: CliContextHttp + Sync> CliContext for T {
async fn call_remote(&self, method: &str, params: Value) -> Result<Value, RpcError> {
let rpc_req: RpcRequest<GenericRpcMethod<&str, Value, Value>> = RpcRequest {
let rpc_req = RpcRequest {
id: Some(Id::Number(0.into())),
method: GenericRpcMethod::new(method),
params,
@@ -222,33 +229,61 @@ impl<T: CliContextHttp + Sync> CliContext for T {
.body(body)
.send()
.await?;
Ok(
match res
.headers()
.get("content-type")
.and_then(|v| v.to_str().ok())
{
Some("application/json") => serde_json::from_slice(&*res.bytes().await?)?,
#[cfg(feature = "cbor")]
Some("application/cbor") => serde_cbor::from_slice(&*res.bytes().await?)?,
_ => {
return Err(RpcError {
data: Some("missing content type".into()),
..yajrc::INTERNAL_ERROR
})
}
},
)
match res
.headers()
.get("content-type")
.and_then(|v| v.to_str().ok())
{
Some("application/json") => {
serde_json::from_slice::<RpcResponse>(&*res.bytes().await.map_err(internal_error)?)
.map_err(parse_error)?
.result
}
#[cfg(feature = "cbor")]
Some("application/cbor") => {
serde_cbor::from_slice::<RpcResponse>(&*res.bytes().await.map_err(internal_error)?)
.map_err(parse_error)?
.result
}
_ => Err(internal_error("missing content type")),
}
}
}
pub trait RemoteCommand<Context: CliContext>: LeafCommand {
fn metadata() -> Context::Metadata;
fn subcommands(chain: ParentChain<Self>) -> Vec<DynCommand<Context>> {
drop(chain);
Vec::new()
#[async_trait::async_trait]
pub trait CliContextSocket: crate::Context {
type Stream: AsyncRead + AsyncWrite + Send;
async fn connect(&self) -> std::io::Result<Self::Stream>;
async fn call_remote(&self, method: &str, params: Value) -> Result<Value, RpcError> {
let rpc_req = RpcRequest {
id: Some(Id::Number(0.into())),
method: GenericRpcMethod::new(method),
params,
};
let conn = self.connect().await.map_err(|e| RpcError {
data: Some(e.to_string().into()),
..yajrc::INTERNAL_ERROR
})?;
tokio::pin!(conn);
let mut buf = serde_json::to_vec(&rpc_req).map_err(|e| RpcError {
data: Some(e.to_string().into()),
..yajrc::INTERNAL_ERROR
})?;
buf.push(b'\n');
conn.write_all(&buf).await.map_err(|e| RpcError {
data: Some(e.to_string().into()),
..yajrc::INTERNAL_ERROR
})?;
let mut line = String::new();
BufReader::new(conn).read_line(&mut line).await?;
serde_json::from_str::<RpcResponse>(&line)
.map_err(parse_error)?
.result
}
}
pub trait RemoteCommand<Context: CliContext>: LeafCommand<Context> {}
#[async_trait::async_trait]
impl<T, Context> AsyncCommand<Context> for T
where
@@ -258,9 +293,6 @@ where
T::Err: From<RpcError>,
Context: CliContext + Send + 'static,
{
fn metadata() -> Context::Metadata {
T::metadata()
}
async fn implementation(
self,
ctx: Context,
@@ -280,7 +312,33 @@ where
)
.map_err(parse_error)?)
}
fn subcommands(chain: ParentChain<Self>) -> Vec<DynCommand<Context>> {
T::subcommands(chain)
}
impl<T, Context> SyncCommand<Context> for T
where
T: RemoteCommand<Context> + Send + Serialize,
T::Parent: Serialize,
T::Ok: DeserializeOwned,
T::Err: From<RpcError>,
Context: CliContext + Send + 'static,
{
const BLOCKING: bool = true;
fn implementation(
self,
ctx: Context,
parent: ParentInfo<Self::Parent>,
) -> Result<Self::Ok, Self::Err> {
let mut method = parent.method;
method.push(Self::NAME);
Ok(
imbl_value::from_value(ctx.runtime().block_on(ctx.call_remote(
&method.join("."),
combine(
imbl_value::to_value(&self).map_err(invalid_params)?,
imbl_value::to_value(&parent.params).map_err(invalid_params)?,
)?,
))?)
.map_err(parse_error)?,
)
}
}

View File

@@ -17,7 +17,7 @@ pub struct DynCommand<Context: crate::Context> {
pub(crate) name: &'static str,
pub(crate) metadata: Context::Metadata,
pub(crate) implementation: Option<Implementation<Context>>,
pub(crate) cli: Option<CliBindings>,
pub(crate) cli: Option<CliBindings<Context>>,
pub(crate) subcommands: Vec<Self>,
}
@@ -31,12 +31,18 @@ pub(crate) struct Implementation<Context> {
Box<dyn Fn(Context, Vec<&'static str>, Value) -> Result<Value, RpcError> + Send + Sync>,
}
pub(crate) struct CliBindings {
pub(crate) struct CliBindings<Context> {
pub(crate) cmd: clap::Command,
pub(crate) parser: Box<dyn for<'a> Fn(&'a ArgMatches) -> Result<Value, RpcError> + Send + Sync>,
pub(crate) display: Option<Box<dyn Fn(Value) -> Result<(), imbl_value::Error> + Send + Sync>>,
pub(crate) display: Option<
Box<
dyn Fn(Context, Vec<&'static str>, Value, Value) -> Result<(), imbl_value::Error>
+ Send
+ Sync,
>,
>,
}
impl CliBindings {
impl<Context: crate::Context> CliBindings<Context> {
pub(crate) fn from_parent<Cmd: FromArgMatches + CommandFactory + Serialize>() -> Self {
Self {
cmd: Cmd::command(),
@@ -53,10 +59,19 @@ impl CliBindings {
display: None,
}
}
fn from_leaf<Cmd: FromArgMatches + CommandFactory + Serialize + LeafCommand>() -> Self {
fn from_leaf<Cmd: FromArgMatches + CommandFactory + Serialize + LeafCommand<Context>>() -> Self
{
Self {
display: Some(Box::new(|res| {
Ok(Cmd::display(imbl_value::from_value(res)?))
display: Some(Box::new(|ctx, parent_method, params, res| {
let parent_params = imbl_value::from_value(params.clone())?;
Ok(imbl_value::from_value::<Cmd>(params)?.display(
ctx,
ParentInfo {
method: parent_method,
params: parent_params,
},
imbl_value::from_value(res)?,
))
})),
..Self::from_parent::<Cmd>()
}
@@ -113,13 +128,18 @@ where
/// Implement this for a command that has no implementation, but simply exists to organize subcommands
pub trait ParentCommand<Context: crate::Context>: Command {
fn metadata() -> Context::Metadata;
fn metadata() -> Context::Metadata {
Context::Metadata::default()
}
fn subcommands(chain: ParentChain<Self>) -> Vec<DynCommand<Context>>;
}
impl<Context: crate::Context> DynCommand<Context> {
pub fn from_parent<
Cmd: ParentCommand<Context> + FromArgMatches + CommandFactory + Serialize,
>() -> Self {
>(
contains: Contains<Cmd::Parent>,
) -> Self {
drop(contains);
Self {
name: Cmd::NAME,
metadata: Cmd::metadata(),
@@ -128,7 +148,10 @@ impl<Context: crate::Context> DynCommand<Context> {
subcommands: Cmd::subcommands(ParentChain::<Cmd>(PhantomData)),
}
}
pub fn from_parent_no_cli<Cmd: ParentCommand<Context>>() -> Self {
pub fn from_parent_no_cli<Cmd: ParentCommand<Context>>(
contains: Contains<Cmd::Parent>,
) -> Self {
drop(contains);
Self {
name: Cmd::NAME,
metadata: Cmd::metadata(),
@@ -140,25 +163,27 @@ impl<Context: crate::Context> DynCommand<Context> {
}
/// Implement this for any command with an implementation
pub trait LeafCommand: Command {
pub trait LeafCommand<Context: crate::Context>: Command {
type Ok: DeserializeOwned + Serialize + Send;
type Err: From<RpcError> + Into<RpcError> + Send;
fn display(res: Self::Ok);
fn metadata() -> Context::Metadata {
Context::Metadata::default()
}
fn display(self, ctx: Context, parent: ParentInfo<Self::Parent>, res: Self::Ok);
fn subcommands(chain: ParentChain<Self>) -> Vec<DynCommand<Context>> {
drop(chain);
Vec::new()
}
}
/// Implement this if your Command's implementation is async
#[async_trait::async_trait]
pub trait AsyncCommand<Context: crate::Context>: LeafCommand {
fn metadata() -> Context::Metadata;
pub trait AsyncCommand<Context: crate::Context>: LeafCommand<Context> {
async fn implementation(
self,
ctx: Context,
parent: ParentInfo<Self::Parent>,
) -> Result<Self::Ok, Self::Err>;
fn subcommands(chain: ParentChain<Self>) -> Vec<DynCommand<Context>> {
drop(chain);
Vec::new()
}
}
impl<Context: crate::Context> Implementation<Context> {
fn for_async<Cmd: AsyncCommand<Context>>(contains: Contains<Cmd::Parent>) -> Self {
@@ -238,18 +263,13 @@ impl<Context: crate::Context> DynCommand<Context> {
}
/// Implement this if your Command's implementation is not async
pub trait SyncCommand<Context: crate::Context>: LeafCommand {
pub trait SyncCommand<Context: crate::Context>: LeafCommand<Context> {
const BLOCKING: bool;
fn metadata() -> Context::Metadata;
fn implementation(
self,
ctx: Context,
parent: ParentInfo<Self::Parent>,
) -> Result<Self::Ok, Self::Err>;
fn subcommands(chain: ParentChain<Self>) -> Vec<DynCommand<Context>> {
drop(chain);
Vec::new()
}
}
impl<Context: crate::Context> Implementation<Context> {
fn for_sync<Cmd: SyncCommand<Context>>(contains: Contains<Cmd::Parent>) -> Self {

View File

@@ -1,7 +1,7 @@
use tokio::runtime::Handle;
pub trait Context: Send + 'static {
type Metadata: Default;
type Metadata: Default + Send + Sync;
fn runtime(&self) -> Handle {
Handle::current()
}

View File

@@ -0,0 +1,60 @@
use std::task::Context;
use futures::future::BoxFuture;
use http::request::Parts;
use hyper::body::{Bytes, Incoming};
use hyper::{Request, Response};
use yajrc::{RpcRequest, RpcResponse};
type BoxBody = http_body_util::combinators::BoxBody<Bytes, hyper::Error>;
#[async_trait::async_trait]
pub trait Middleware<Context: crate::Context> {
type ProcessHttpRequestResult;
async fn process_http_request(
&self,
req: &mut Request<BoxBody>,
) -> Result<Self::ProcessHttpRequestResult, hyper::Result<Response<Bytes>>>;
type ProcessRpcRequestResult;
async fn process_rpc_request(
&self,
prev: Self::ProcessHttpRequestResult,
metadata: &Context::Metadata,
req: &mut RpcRequest,
) -> Result<Self::ProcessRpcRequestResult, RpcResponse>;
type ProcessRpcResponseResult;
async fn process_rpc_response(
&self,
prev: Self::ProcessRpcRequestResult,
res: &mut RpcResponse,
) -> Self::ProcessRpcResponseResult;
async fn process_http_response(
&self,
prev: Self::ProcessRpcResponseResult,
res: &mut Response<Bytes>,
);
}
// pub struct DynMiddleware<Context: crate::Context> {
// process_http_request: Box<
// dyn for<'a> Fn(
// &'a mut Request<BoxBody>,
// ) -> BoxFuture<
// 'a,
// Result<DynProcessRpcRequest<Context>, hyper::Result<Response<Bytes>>>,
// > + Send
// + Sync,
// >,
// }
// type DynProcessRpcRequest<'m, Context: crate::Context> = Box<
// dyn for<'a> FnOnce(
// &'a Context::Metadata,
// &'a mut RpcRequest,
// )
// -> BoxFuture<'a, Result<DynProcessRpcResponse<'m>, DynSkipHandler<'m>>>
// + Send
// + Sync
// + 'm,
// >;
// type DynProcessRpcResponse<'m> =
// Box<dyn for<'a> FnOnce(&'a mut RpcResponse) -> BoxFuture<'a, DynProcessHttpResponse<'m>>>;

View File

@@ -1,16 +1,19 @@
use std::borrow::Cow;
use std::sync::Arc;
use futures::future::{join_all, BoxFuture};
use futures::stream::{BoxStream, Fuse};
use futures::{Future, FutureExt, Stream, StreamExt, TryStreamExt};
use futures::{Future, FutureExt, Stream, StreamExt};
use imbl_value::Value;
use tokio::runtime::Handle;
use tokio::task::JoinHandle;
use yajrc::{AnyParams, RpcError, RpcMethod, RpcRequest, RpcResponse, SingleOrBatchRpcRequest};
use yajrc::{AnyParams, AnyRpcMethod, RpcError, RpcMethod};
use crate::util::{invalid_request, parse_error};
use crate::util::{invalid_request, JobRunner};
use crate::DynCommand;
type GenericRpcMethod = yajrc::GenericRpcMethod<String, Value, Value>;
type RpcRequest = yajrc::RpcRequest<GenericRpcMethod>;
type RpcResponse = yajrc::RpcResponse<GenericRpcMethod>;
type SingleOrBatchRpcRequest = yajrc::SingleOrBatchRpcRequest<GenericRpcMethod>;
mod http;
mod socket;
@@ -91,134 +94,58 @@ impl<Context: crate::Context> Server<Context> {
&self,
RpcRequest { id, method, params }: RpcRequest,
) -> impl Future<Output = RpcResponse> + Send + 'static {
let handle = (|| {
Ok::<_, RpcError>(self.handle_command(
method.as_str(),
match params {
AnyParams::Named(a) => serde_json::Value::Object(a).into(),
_ => {
return Err(RpcError {
data: Some("positional parameters unsupported".into()),
..yajrc::INVALID_PARAMS_ERROR
})
}
},
))
})();
let handle = (|| Ok::<_, RpcError>(self.handle_command(method.as_str(), params)))();
async move {
RpcResponse {
id,
result: match handle {
Ok(handle) => handle.await.map(serde_json::Value::from),
Ok(handle) => handle.await,
Err(e) => Err(e),
},
}
}
}
pub fn handle(&self, request: Value) -> BoxFuture<'static, Result<Value, RpcError>> {
let request =
imbl_value::from_value::<SingleOrBatchRpcRequest>(request).map_err(invalid_request);
match request {
pub fn handle(
&self,
request: Result<Value, RpcError>,
) -> BoxFuture<'static, Result<Value, imbl_value::Error>> {
match request.and_then(|request| {
imbl_value::from_value::<SingleOrBatchRpcRequest>(request).map_err(invalid_request)
}) {
Ok(SingleOrBatchRpcRequest::Single(req)) => {
let fut = self.handle_single_request(req);
async { imbl_value::to_value(&fut.await).map_err(parse_error) }.boxed()
async { imbl_value::to_value(&fut.await) }.boxed()
}
Ok(SingleOrBatchRpcRequest::Batch(reqs)) => {
let futs: Vec<_> = reqs
.into_iter()
.map(|req| self.handle_single_request(req))
.collect();
async { imbl_value::to_value(&join_all(futs).await).map_err(parse_error) }.boxed()
async { imbl_value::to_value(&join_all(futs).await) }.boxed()
}
Err(e) => async { Err(e) }.boxed(),
Err(e) => async {
imbl_value::to_value(&RpcResponse {
id: None,
result: Err(e),
})
}
.boxed(),
}
}
pub fn stream<'a>(
&'a self,
requests: impl Stream<Item = Result<Value, RpcError>> + Send + 'a,
) -> impl Stream<Item = Result<Value, RpcError>> + 'a {
let mut running = RunningCommands::default();
let mut requests = requests.boxed().fuse();
async fn next<'a, Context: crate::Context>(
server: &'a Server<Context>,
running: &mut RunningCommands,
requests: &mut Fuse<BoxStream<'a, Result<Value, RpcError>>>,
) -> Result<Option<Value>, RpcError> {
loop {
tokio::select! {
req = requests.try_next() => {
let req = req?;
if let Some(req) = req {
running.running.push(tokio::spawn(server.handle(req)));
} else {
running.closed = true;
}
}
res = running.try_next() => {
return res;
}
}
}
}
) -> impl Stream<Item = Result<Value, imbl_value::Error>> + 'a {
async_stream::try_stream! {
while let Some(res) = next(self, &mut running, &mut requests).await? {
let mut runner = JobRunner::new();
let requests = requests.fuse().map(|req| self.handle(req));
tokio::pin!(requests);
while let Some(res) = runner.next_result(&mut requests).await.transpose()? {
yield res;
}
}
}
}
#[derive(Default)]
struct RunningCommands {
closed: bool,
running: Vec<JoinHandle<Result<Value, RpcError>>>,
}
impl Stream for RunningCommands {
type Item = Result<Value, RpcError>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let item = self
.running
.iter_mut()
.enumerate()
.find_map(|(i, f)| match f.poll_unpin(cx) {
std::task::Poll::Pending => None,
std::task::Poll::Ready(e) => Some((
i,
e.map_err(|e| RpcError {
data: Some(e.to_string().into()),
..yajrc::INTERNAL_ERROR
})
.and_then(|a| a),
)),
});
match item {
Some((idx, res)) => {
drop(self.running.swap_remove(idx));
std::task::Poll::Ready(Some(res))
}
None => {
if !self.closed || !self.running.is_empty() {
std::task::Poll::Pending
} else {
std::task::Poll::Ready(None)
}
}
}
}
}
impl Drop for RunningCommands {
fn drop(&mut self) {
for hdl in &self.running {
hdl.abort();
}
if let Ok(rt) = Handle::try_current() {
rt.block_on(join_all(std::mem::take(&mut self.running).into_iter()));
}
}
}

View File

@@ -1,25 +1,95 @@
use futures::{AsyncWrite, Future, Stream};
use tokio::io::AsyncRead;
use tokio::sync::oneshot;
use std::path::Path;
use std::sync::Arc;
use futures::{Future, Stream, StreamExt, TryStreamExt};
use imbl_value::Value;
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
use tokio::net::{TcpListener, ToSocketAddrs, UnixListener};
use tokio::sync::OnceCell;
use yajrc::RpcError;
use crate::util::{parse_error, JobRunner};
use crate::Server;
pub struct ShutdownHandle(oneshot::Sender<()>);
pub struct SocketServer<Context: crate::Context> {
server: Server<Context>,
}
impl<Context: crate::Context> SocketServer<Context> {
pub fn run_json<T: AsyncRead + AsyncWrite>(
&self,
listener: impl Stream<Item = T>,
) -> (ShutdownHandle, impl Future<Output = Result<(), RpcError>>) {
let (shutdown_send, shutdown_recv) = oneshot::channel();
(ShutdownHandle(shutdown_send), async move {
//asdf
//adf
Ok(())
})
#[derive(Clone)]
pub struct ShutdownHandle(Arc<OnceCell<()>>);
impl ShutdownHandle {
pub fn shutdown(self) {
let _ = self.0.set(());
}
}
impl<Context: crate::Context> Server<Context> {
pub fn run_socket<'a, T: AsyncRead + AsyncWrite + Send>(
&'a self,
listener: impl Stream<Item = std::io::Result<T>> + 'a,
error_handler: impl Fn(std::io::Error) + Sync + 'a,
) -> (ShutdownHandle, impl Future<Output = ()> + 'a) {
let shutdown = Arc::new(OnceCell::new());
(ShutdownHandle(shutdown.clone()), async move {
let mut runner = JobRunner::<std::io::Result<()>>::new();
let jobs = listener.map(|pipe| async {
let pipe = pipe?;
let (r, mut w) = tokio::io::split(pipe);
let stream = self.stream(
tokio_stream::wrappers::LinesStream::new(BufReader::new(r).lines())
.map_err(|e| RpcError {
data: Some(e.to_string().into()),
..yajrc::INTERNAL_ERROR
})
.try_filter_map(|a| async move {
Ok(if a.is_empty() {
None
} else {
Some(serde_json::from_str::<Value>(&a).map_err(parse_error)?)
})
}),
);
tokio::pin!(stream);
while let Some(res) = stream.next().await {
if let Err(e) = async {
let mut buf = serde_json::to_vec(
&res.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?,
)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
buf.push(b'\n');
w.write_all(&buf).await
}
.await
{
error_handler(e)
}
}
Ok(())
});
tokio::pin!(jobs);
while let Some(res) = runner.next_result(&mut jobs).await {
if let Err(e) = res {
error_handler(e)
}
}
})
}
pub fn run_unix<'a>(
&'a self,
path: impl AsRef<Path> + 'a,
error_handler: impl Fn(std::io::Error) + Sync + 'a,
) -> std::io::Result<(ShutdownHandle, impl Future<Output = ()> + 'a)> {
let listener = UnixListener::bind(path)?;
Ok(self.run_socket(
tokio_stream::wrappers::UnixListenerStream::new(listener),
error_handler,
))
}
pub async fn run_tcp<'a>(
&'a self,
addr: impl ToSocketAddrs + 'a,
error_handler: impl Fn(std::io::Error) + Sync + 'a,
) -> std::io::Result<(ShutdownHandle, impl Future<Output = ()> + 'a)> {
let listener = TcpListener::bind(addr).await?;
Ok(self.run_socket(
tokio_stream::wrappers::TcpListenerStream::new(listener),
error_handler,
))
}
}

View File

@@ -1,3 +1,7 @@
use std::fmt::Display;
use futures::future::BoxFuture;
use futures::{Future, FutureExt, Stream, StreamExt};
use imbl_value::Value;
use serde::de::DeserializeOwned;
use serde::Deserialize;
@@ -42,13 +46,20 @@ pub fn invalid_request(e: imbl_value::Error) -> RpcError {
}
}
pub fn parse_error(e: imbl_value::Error) -> RpcError {
pub fn parse_error(e: impl Display) -> RpcError {
RpcError {
data: Some(e.to_string().into()),
..yajrc::PARSE_ERROR
}
}
pub fn internal_error(e: impl Display) -> RpcError {
RpcError {
data: Some(e.to_string().into()),
..yajrc::INTERNAL_ERROR
}
}
pub struct Flat<A, B>(pub A, pub B);
impl<'de, A, B> Deserialize<'de> for Flat<A, B>
where
@@ -65,3 +76,72 @@ where
Ok(Flat(a, b))
}
}
pub fn poll_select_all<'a, T>(
futs: &mut Vec<BoxFuture<'a, T>>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<T> {
let item = futs
.iter_mut()
.enumerate()
.find_map(|(i, f)| match f.poll_unpin(cx) {
std::task::Poll::Pending => None,
std::task::Poll::Ready(e) => Some((i, e)),
});
match item {
Some((idx, res)) => {
drop(futs.swap_remove(idx));
std::task::Poll::Ready(res)
}
None => std::task::Poll::Pending,
}
}
pub struct JobRunner<'a, T> {
closed: bool,
running: Vec<BoxFuture<'a, T>>,
}
impl<'a, T> JobRunner<'a, T> {
pub fn new() -> Self {
JobRunner {
closed: false,
running: Vec::new(),
}
}
pub async fn next_result<
Src: Stream<Item = Fut> + Unpin,
Fut: Future<Output = T> + Send + 'a,
>(
&mut self,
job_source: &mut Src,
) -> Option<T> {
loop {
tokio::select! {
job = job_source.next() => {
if let Some(job) = job {
self.running.push(job.boxed());
} else {
self.closed = true;
}
}
res = self.next() => {
return res;
}
}
}
}
}
impl<'a, T> Stream for JobRunner<'a, T> {
type Item = T;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
match poll_select_all(&mut self.running, cx) {
std::task::Poll::Pending if self.closed && self.running.is_empty() => {
std::task::Poll::Ready(None)
}
a => a.map(Some),
}
}
}

View File

@@ -1,214 +1,205 @@
use std::fmt::Display;
use std::str::FromStr;
use std::sync::Arc;
// use std::fmt::Display;
// use std::str::FromStr;
// use std::sync::Arc;
use futures::FutureExt;
use hyper::Request;
use rpc_toolkit::clap::Arg;
use rpc_toolkit::hyper::http::Error as HttpError;
use rpc_toolkit::hyper::{Body, Response};
use rpc_toolkit::rpc_server_helpers::{
DynMiddlewareStage2, DynMiddlewareStage3, DynMiddlewareStage4,
};
use rpc_toolkit::serde::{Deserialize, Serialize};
use rpc_toolkit::url::Host;
use rpc_toolkit::yajrc::RpcError;
use rpc_toolkit::{command, rpc_server, run_cli, Context, Metadata};
// use futures::FutureExt;
// use hyper::Request;
// use rpc_toolkit::clap::Arg;
// use rpc_toolkit::hyper::http::Error as HttpError;
// use rpc_toolkit::hyper::{Body, Response};
// use rpc_toolkit::rpc_server_helpers::{
// DynMiddlewareStage2, DynMiddlewareStage3, DynMiddlewareStage4,
// };
// use rpc_toolkit::serde::{Deserialize, Serialize};
// use rpc_toolkit::url::Host;
// use rpc_toolkit::yajrc::RpcError;
// use rpc_toolkit::{command, rpc_server, run_cli, Context, Metadata};
#[derive(Debug, Clone)]
pub struct AppState(Arc<ConfigSeed>);
impl From<AppState> for () {
fn from(_: AppState) -> Self {
()
}
}
// #[derive(Debug, Clone)]
// pub struct AppState(Arc<ConfigSeed>);
// impl From<AppState> for () {
// fn from(_: AppState) -> Self {
// ()
// }
// }
#[derive(Debug)]
pub struct ConfigSeed {
host: Host,
port: u16,
}
// #[derive(Debug)]
// pub struct ConfigSeed {
// host: Host,
// port: u16,
// }
impl Context for AppState {
fn host(&self) -> Host<&str> {
match &self.0.host {
Host::Domain(s) => Host::Domain(s.as_str()),
Host::Ipv4(i) => Host::Ipv4(*i),
Host::Ipv6(i) => Host::Ipv6(*i),
}
}
fn port(&self) -> u16 {
self.0.port
}
}
// impl Context for AppState {
// type Metadata = ();
// }
fn test_string() -> String {
"test".to_owned()
}
// fn test_string() -> String {
// "test".to_owned()
// }
#[command(
about = "Does the thing",
subcommands("dothething2::<U, E>", self(dothething_impl(async)))
)]
async fn dothething<
U: Serialize + for<'a> Deserialize<'a> + FromStr<Err = E> + Clone + 'static,
E: Display,
>(
#[context] _ctx: AppState,
#[arg(short = 'a')] arg1: Option<String>,
#[arg(short = 'b', default = "test_string")] val: String,
#[arg(short = 'c', help = "I am the flag `c`!", default)] arg3: bool,
#[arg(stdin)] structured: U,
) -> Result<(Option<String>, String, bool, U), RpcError> {
Ok((arg1, val, arg3, structured))
}
// #[command(
// about = "Does the thing",
// subcommands("dothething2::<U, E>", self(dothething_impl(async)))
// )]
// async fn dothething<
// U: Serialize + for<'a> Deserialize<'a> + FromStr<Err = E> + Clone + 'static,
// E: Display,
// >(
// #[context] _ctx: AppState,
// #[arg(short = 'a')] arg1: Option<String>,
// #[arg(short = 'b', default = "test_string")] val: String,
// #[arg(short = 'c', help = "I am the flag `c`!", default)] arg3: bool,
// #[arg(stdin)] structured: U,
// ) -> Result<(Option<String>, String, bool, U), RpcError> {
// Ok((arg1, val, arg3, structured))
// }
async fn dothething_impl<U: Serialize>(
ctx: AppState,
parent_data: (Option<String>, String, bool, U),
) -> Result<String, RpcError> {
Ok(format!(
"{:?}, {:?}, {}, {}, {}",
ctx,
parent_data.0,
parent_data.1,
parent_data.2,
serde_json::to_string_pretty(&parent_data.3)?
))
}
// async fn dothething_impl<U: Serialize>(
// ctx: AppState,
// parent_data: (Option<String>, String, bool, U),
// ) -> Result<String, RpcError> {
// Ok(format!(
// "{:?}, {:?}, {}, {}, {}",
// ctx,
// parent_data.0,
// parent_data.1,
// parent_data.2,
// serde_json::to_string_pretty(&parent_data.3)?
// ))
// }
#[command(about = "Does the thing")]
fn dothething2<U: Serialize + for<'a> Deserialize<'a> + FromStr<Err = E>, E: Display>(
#[parent_data] parent_data: (Option<String>, String, bool, U),
#[arg(stdin)] structured2: U,
) -> Result<String, RpcError> {
Ok(format!(
"{:?}, {}, {}, {}, {}",
parent_data.0,
parent_data.1,
parent_data.2,
serde_json::to_string_pretty(&parent_data.3)?,
serde_json::to_string_pretty(&structured2)?,
))
}
// #[command(about = "Does the thing")]
// fn dothething2<U: Serialize + for<'a> Deserialize<'a> + FromStr<Err = E>, E: Display>(
// #[parent_data] parent_data: (Option<String>, String, bool, U),
// #[arg(stdin)] structured2: U,
// ) -> Result<String, RpcError> {
// Ok(format!(
// "{:?}, {}, {}, {}, {}",
// parent_data.0,
// parent_data.1,
// parent_data.2,
// serde_json::to_string_pretty(&parent_data.3)?,
// serde_json::to_string_pretty(&structured2)?,
// ))
// }
async fn cors<M: Metadata + 'static>(
req: &mut Request<Body>,
_: M,
) -> Result<Result<DynMiddlewareStage2, Response<Body>>, HttpError> {
if req.method() == hyper::Method::OPTIONS {
Ok(Err(Response::builder()
.header("Access-Control-Allow-Origin", "*")
.body(Body::empty())?))
} else {
Ok(Ok(Box::new(|_, _| {
async move {
let res: DynMiddlewareStage3 = Box::new(|_, _| {
async move {
let res: DynMiddlewareStage4 = Box::new(|res| {
async move {
res.headers_mut()
.insert("Access-Control-Allow-Origin", "*".parse()?);
Ok::<_, HttpError>(())
}
.boxed()
});
Ok::<_, HttpError>(Ok(res))
}
.boxed()
});
Ok::<_, HttpError>(Ok(res))
}
.boxed()
})))
}
}
// async fn cors<M: Metadata + 'static>(
// req: &mut Request<Body>,
// _: M,
// ) -> Result<Result<DynMiddlewareStage2, Response<Body>>, HttpError> {
// if req.method() == hyper::Method::OPTIONS {
// Ok(Err(Response::builder()
// .header("Access-Control-Allow-Origin", "*")
// .body(Body::empty())?))
// } else {
// Ok(Ok(Box::new(|_, _| {
// async move {
// let res: DynMiddlewareStage3 = Box::new(|_, _| {
// async move {
// let res: DynMiddlewareStage4 = Box::new(|res| {
// async move {
// res.headers_mut()
// .insert("Access-Control-Allow-Origin", "*".parse()?);
// Ok::<_, HttpError>(())
// }
// .boxed()
// });
// Ok::<_, HttpError>(Ok(res))
// }
// .boxed()
// });
// Ok::<_, HttpError>(Ok(res))
// }
// .boxed()
// })))
// }
// }
#[tokio::test]
async fn test_rpc() {
use tokio::io::AsyncWriteExt;
// #[tokio::test]
// async fn test_rpc() {
// use tokio::io::AsyncWriteExt;
let seed = Arc::new(ConfigSeed {
host: Host::parse("localhost").unwrap(),
port: 8000,
});
let server = rpc_server!({
command: dothething::<String, _>,
context: AppState(seed),
middleware: [
cors,
],
});
let handle = tokio::spawn(server);
let mut cmd = tokio::process::Command::new("cargo")
.arg("test")
.arg("--package")
.arg("rpc-toolkit")
.arg("--test")
.arg("test")
.arg("--")
.arg("cli_test")
.arg("--exact")
.arg("--nocapture")
.arg("--")
// .arg("-b")
// .arg("test")
.arg("dothething2")
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.spawn()
.unwrap();
cmd.stdin
.take()
.unwrap()
.write_all(b"TEST\nHAHA")
.await
.unwrap();
let out = cmd.wait_with_output().await.unwrap();
assert!(out.status.success());
assert!(dbg!(std::str::from_utf8(&out.stdout).unwrap())
.contains("\nNone, test, false, \"TEST\", \"HAHA\"\n"));
handle.abort();
}
// let seed = Arc::new(ConfigSeed {
// host: Host::parse("localhost").unwrap(),
// port: 8000,
// });
// let server = rpc_server!({
// command: dothething::<String, _>,
// context: AppState(seed),
// middleware: [
// cors,
// ],
// });
// let handle = tokio::spawn(server);
// let mut cmd = tokio::process::Command::new("cargo")
// .arg("test")
// .arg("--package")
// .arg("rpc-toolkit")
// .arg("--test")
// .arg("test")
// .arg("--")
// .arg("cli_test")
// .arg("--exact")
// .arg("--nocapture")
// .arg("--")
// // .arg("-b")
// // .arg("test")
// .arg("dothething2")
// .stdin(std::process::Stdio::piped())
// .stdout(std::process::Stdio::piped())
// .spawn()
// .unwrap();
// cmd.stdin
// .take()
// .unwrap()
// .write_all(b"TEST\nHAHA")
// .await
// .unwrap();
// let out = cmd.wait_with_output().await.unwrap();
// assert!(out.status.success());
// assert!(dbg!(std::str::from_utf8(&out.stdout).unwrap())
// .contains("\nNone, test, false, \"TEST\", \"HAHA\"\n"));
// handle.abort();
// }
#[test]
fn cli_test() {
let app = dothething::build_app();
let mut skip = true;
let args = std::iter::once(std::ffi::OsString::from("cli_test"))
.chain(std::env::args_os().into_iter().skip_while(|a| {
if a == "--" {
skip = false;
return true;
}
skip
}))
.collect::<Vec<_>>();
if skip {
return;
}
let matches = app.get_matches_from(args);
let seed = Arc::new(ConfigSeed {
host: Host::parse("localhost").unwrap(),
port: 8000,
});
dothething::cli_handler::<String, _, _, _>(AppState(seed), (), None, &matches, "".into(), ())
.unwrap();
}
// #[test]
// fn cli_test() {
// let app = dothething::build_app();
// let mut skip = true;
// let args = std::iter::once(std::ffi::OsString::from("cli_test"))
// .chain(std::env::args_os().into_iter().skip_while(|a| {
// if a == "--" {
// skip = false;
// return true;
// }
// skip
// }))
// .collect::<Vec<_>>();
// if skip {
// return;
// }
// let matches = app.get_matches_from(args);
// let seed = Arc::new(ConfigSeed {
// host: Host::parse("localhost").unwrap(),
// port: 8000,
// });
// dothething::cli_handler::<String, _, _, _>(AppState(seed), (), None, &matches, "".into(), ())
// .unwrap();
// }
#[test]
#[ignore]
fn cli_example() {
run_cli! ({
command: dothething::<String, _>,
app: app => app
.arg(Arg::with_name("host").long("host").short('h').takes_value(true))
.arg(Arg::with_name("port").long("port").short('p').takes_value(true)),
context: matches => AppState(Arc::new(ConfigSeed {
host: Host::parse(matches.value_of("host").unwrap_or("localhost")).unwrap(),
port: matches.value_of("port").unwrap_or("8000").parse().unwrap(),
}))
})
}
// #[test]
// #[ignore]
// fn cli_example() {
// run_cli! ({
// command: dothething::<String, _>,
// app: app => app
// .arg(Arg::with_name("host").long("host").short('h').takes_value(true))
// .arg(Arg::with_name("port").long("port").short('p').takes_value(true)),
// context: matches => AppState(Arc::new(ConfigSeed {
// host: Host::parse(matches.value_of("host").unwrap_or("localhost")).unwrap(),
// port: matches.value_of("port").unwrap_or("8000").parse().unwrap(),
// }))
// })
// }
////////////////////////////////////////////////
// ////////////////////////////////////////////////

View File

@@ -1 +1,109 @@
pub struct App;
use std::path::PathBuf;
use clap::Parser;
use futures::Future;
use rpc_toolkit::{
AsyncCommand, CliContextSocket, Command, Contains, Context, DynCommand, LeafCommand, NoParent,
ParentCommand, ParentInfo, Server, ShutdownHandle,
};
use serde::{Deserialize, Serialize};
use tokio::net::UnixStream;
use yajrc::RpcError;
struct ServerContext;
impl Context for ServerContext {
type Metadata = ();
}
struct CliContext(PathBuf);
impl Context for CliContext {
type Metadata = ();
}
#[async_trait::async_trait]
impl CliContextSocket for CliContext {
type Stream = UnixStream;
async fn connect(&self) -> std::io::Result<Self::Stream> {
UnixStream::connect(&self.0).await
}
}
#[async_trait::async_trait]
impl rpc_toolkit::CliContext for CliContext {
async fn call_remote(
&self,
method: &str,
params: imbl_value::Value,
) -> Result<imbl_value::Value, RpcError> {
<Self as CliContextSocket>::call_remote(self, method, params).await
}
}
async fn run_server() {
Server::new(
vec![
DynCommand::from_parent::<Group>(Contains::none()),
DynCommand::from_async::<Thing1>(Contains::none()),
// DynCommand::from_async::<Thing2>(Contains::none()),
// DynCommand::from_sync::<Thing3>(Contains::none()),
// DynCommand::from_sync::<Thing4>(Contains::none()),
],
|| async { Ok(ServerContext) },
)
.run_unix("./test.sock", |e| eprintln!("{e}"))
.unwrap()
.1
.await
}
#[derive(Debug, Deserialize, Serialize, Parser)]
struct Group {
#[arg(short, long)]
verbose: bool,
}
impl Command for Group {
const NAME: &'static str = "group";
type Parent = NoParent;
}
impl<Ctx> ParentCommand<Ctx> for Group
where
Ctx: Context,
// SubThing: AsyncCommand<Ctx>,
Thing1: AsyncCommand<Ctx>,
{
fn subcommands(chain: rpc_toolkit::ParentChain<Self>) -> Vec<DynCommand<Ctx>> {
vec![
// DynCommand::from_async::<SubThing>(chain.child()),
DynCommand::from_async::<Thing1>(Contains::none()),
]
}
}
#[derive(Debug, Deserialize, Serialize, Parser)]
struct Thing1 {
thing: String,
}
impl Command for Thing1 {
const NAME: &'static str = "thing1";
type Parent = NoParent;
}
impl LeafCommand<ServerContext> for Thing1 {
type Ok = String;
type Err = RpcError;
fn display(self, _: ServerContext, _: rpc_toolkit::ParentInfo<Self::Parent>, res: Self::Ok) {
println!("{}", res);
}
}
#[async_trait::async_trait]
impl AsyncCommand<ServerContext> for Thing1 {
async fn implementation(
self,
_: ServerContext,
_: ParentInfo<Self::Parent>,
) -> Result<Self::Ok, Self::Err> {
Ok(format!("Thing1 is {}", self.thing))
}
}
#[tokio::test]
async fn test() {
let server = tokio::spawn(run_server());
}