diff --git a/Cargo.lock b/Cargo.lock index 6b3d782..208f7ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -84,6 +84,28 @@ version = "1.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.40", +] + [[package]] name = "async-trait" version = "0.1.74" @@ -596,7 +618,8 @@ dependencies = [ [[package]] name = "imbl-value" version = "0.1.0" -source = "git+https://github.com/Start9Labs/imbl-value.git#929395141c3a882ac366c12ac9402d0ebaa2201b" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b6d3d8cdfd1ac46aab6195692baf9c65dd34dcc7e7ddfb426a2c014736ba90c" dependencies = [ "imbl", "serde", @@ -929,6 +952,7 @@ dependencies = [ name = "rpc-toolkit" version = "0.2.3" dependencies = [ + "async-stream", "async-trait", "clap", "futures", diff --git a/rpc-toolkit/Cargo.toml b/rpc-toolkit/Cargo.toml index c537815..aefba8e 100644 --- a/rpc-toolkit/Cargo.toml +++ b/rpc-toolkit/Cargo.toml @@ -16,11 +16,12 @@ cbor = ["serde_cbor"] default = ["cbor"] [dependencies] +async-stream = "0.3" async-trait = "0.1" clap = "4" futures = "0.3" hyper = { version = "1", features = ["server", "http1", "http2", "client"] } -imbl-value = { git = "https://github.com/Start9Labs/imbl-value.git" } +imbl-value = "0.1" lazy_static = "1.4" openssl = { version = "0.10", features = ["vendored"] } reqwest = { version = "0.11" } diff --git a/rpc-toolkit/src/cli.rs b/rpc-toolkit/src/cli.rs index eebeae3..8afdc6d 100644 --- a/rpc-toolkit/src/cli.rs +++ b/rpc-toolkit/src/cli.rs @@ -14,7 +14,7 @@ use crate::command::{AsyncCommand, DynCommand, LeafCommand, ParentInfo}; use crate::util::{combine, invalid_params, parse_error}; use crate::{CliBindings, ParentChain}; -impl DynCommand { +impl DynCommand { fn cli_app(&self) -> Option { if let Some(cli) = &self.cli { Some( @@ -54,11 +54,11 @@ impl DynCommand { } } -struct CliApp { +struct CliApp { cli: CliBindings, commands: Vec>, } -impl CliApp { +impl CliApp { pub fn new( commands: Vec>, ) -> Self { @@ -90,11 +90,11 @@ impl CliApp { } } -pub struct CliAppAsync { +pub struct CliAppAsync { app: CliApp, make_ctx: Box BoxFuture<'static, Result> + Send>, } -impl CliAppAsync { +impl CliAppAsync { pub fn new< Cmd: FromArgMatches + CommandFactory + Serialize + DeserializeOwned + Send, F: FnOnce(Cmd) -> Fut + Send + 'static, @@ -140,11 +140,11 @@ impl CliAppAsync { } } -pub struct CliAppSync { +pub struct CliAppSync { app: CliApp, make_ctx: Box Result + Send>, } -impl CliAppSync { +impl CliAppSync { pub fn new< Cmd: FromArgMatches + CommandFactory + Serialize + DeserializeOwned + Send, F: FnOnce(Cmd) -> Result + Send + 'static, @@ -187,11 +187,11 @@ impl CliAppSync { } #[async_trait::async_trait] -pub trait CliContext { +pub trait CliContext: crate::Context { async fn call_remote(&self, method: &str, params: Value) -> Result; } -pub trait CliContextHttp { +pub trait CliContextHttp: crate::Context { fn client(&self) -> &Client; fn url(&self) -> Url; } @@ -243,6 +243,7 @@ impl CliContext for T { } pub trait RemoteCommand: LeafCommand { + fn metadata() -> Context::Metadata; fn subcommands(chain: ParentChain) -> Vec> { drop(chain); Vec::new() @@ -257,6 +258,9 @@ where T::Err: From, Context: CliContext + Send + 'static, { + fn metadata() -> Context::Metadata { + T::metadata() + } async fn implementation( self, ctx: Context, @@ -276,4 +280,7 @@ where ) .map_err(parse_error)?) } + fn subcommands(chain: ParentChain) -> Vec> { + T::subcommands(chain) + } } diff --git a/rpc-toolkit/src/command.rs b/rpc-toolkit/src/command.rs index fefd057..05e2549 100644 --- a/rpc-toolkit/src/command.rs +++ b/rpc-toolkit/src/command.rs @@ -1,4 +1,5 @@ use std::marker::PhantomData; +use std::sync::Arc; use clap::{ArgMatches, CommandFactory, FromArgMatches}; use futures::future::BoxFuture; @@ -12,18 +13,22 @@ use crate::util::{extract, Flat}; /// Stores a command's implementation for a given context /// Can be created from anything that implements ParentCommand, AsyncCommand, or SyncCommand -pub struct DynCommand { +pub struct DynCommand { pub(crate) name: &'static str, + pub(crate) metadata: Context::Metadata, pub(crate) implementation: Option>, pub(crate) cli: Option, pub(crate) subcommands: Vec, } pub(crate) struct Implementation { - pub(crate) async_impl: Box< - dyn Fn(Context, Vec<&'static str>, Value) -> BoxFuture<'static, Result>, + pub(crate) async_impl: Arc< + dyn Fn(Context, Vec<&'static str>, Value) -> BoxFuture<'static, Result> + + Send + + Sync, >, - pub(crate) sync_impl: Box, Value) -> Result>, + pub(crate) sync_impl: + Box, Value) -> Result + Send + Sync>, } pub(crate) struct CliBindings { @@ -107,15 +112,17 @@ where } /// Implement this for a command that has no implementation, but simply exists to organize subcommands -pub trait ParentCommand: Command { +pub trait ParentCommand: Command { + fn metadata() -> Context::Metadata; fn subcommands(chain: ParentChain) -> Vec>; } -impl DynCommand { +impl DynCommand { pub fn from_parent< Cmd: ParentCommand + FromArgMatches + CommandFactory + Serialize, >() -> Self { Self { name: Cmd::NAME, + metadata: Cmd::metadata(), implementation: None, cli: Some(CliBindings::from_parent::()), subcommands: Cmd::subcommands(ParentChain::(PhantomData)), @@ -124,6 +131,7 @@ impl DynCommand { pub fn from_parent_no_cli>() -> Self { Self { name: Cmd::NAME, + metadata: Cmd::metadata(), implementation: None, cli: None, subcommands: Cmd::subcommands(ParentChain::(PhantomData)), @@ -140,7 +148,8 @@ pub trait LeafCommand: Command { /// Implement this if your Command's implementation is async #[async_trait::async_trait] -pub trait AsyncCommand: LeafCommand { +pub trait AsyncCommand: LeafCommand { + fn metadata() -> Context::Metadata; async fn implementation( self, ctx: Context, @@ -155,7 +164,7 @@ impl Implementation { fn for_async>(contains: Contains) -> Self { drop(contains); Self { - async_impl: Box::new(|ctx, parent_method, params| { + async_impl: Arc::new(|ctx, parent_method, params| { async move { let parent_params = extract::(¶ms)?; imbl_value::to_value( @@ -211,6 +220,7 @@ impl DynCommand { ) -> Self { Self { name: Cmd::NAME, + metadata: Cmd::metadata(), implementation: Some(Implementation::for_async::(contains)), cli: Some(CliBindings::from_leaf::()), subcommands: Cmd::subcommands(ParentChain::(PhantomData)), @@ -219,6 +229,7 @@ impl DynCommand { pub fn from_async_no_cli>(contains: Contains) -> Self { Self { name: Cmd::NAME, + metadata: Cmd::metadata(), implementation: Some(Implementation::for_async::(contains)), cli: None, subcommands: Cmd::subcommands(ParentChain::(PhantomData)), @@ -227,8 +238,9 @@ impl DynCommand { } /// Implement this if your Command's implementation is not async -pub trait SyncCommand: LeafCommand { +pub trait SyncCommand: LeafCommand { const BLOCKING: bool; + fn metadata() -> Context::Metadata; fn implementation( self, ctx: Context, @@ -239,12 +251,12 @@ pub trait SyncCommand: LeafCommand { Vec::new() } } -impl Implementation { +impl Implementation { fn for_sync>(contains: Contains) -> Self { drop(contains); Self { async_impl: if Cmd::BLOCKING { - Box::new(|ctx, parent_method, params| { + Arc::new(|ctx, parent_method, params| { tokio::task::spawn_blocking(move || { let parent_params = extract::(¶ms)?; imbl_value::to_value( @@ -276,7 +288,7 @@ impl Implementation { .boxed() }) } else { - Box::new(|ctx, parent_method, params| { + Arc::new(|ctx, parent_method, params| { async move { let parent_params = extract::(¶ms)?; imbl_value::to_value( @@ -327,12 +339,13 @@ impl Implementation { } } } -impl DynCommand { +impl DynCommand { pub fn from_sync + FromArgMatches + CommandFactory + Serialize>( contains: Contains, ) -> Self { Self { name: Cmd::NAME, + metadata: Cmd::metadata(), implementation: Some(Implementation::for_sync::(contains)), cli: Some(CliBindings::from_leaf::()), subcommands: Cmd::subcommands(ParentChain::(PhantomData)), @@ -341,6 +354,7 @@ impl DynCommand { pub fn from_sync_no_cli>(contains: Contains) -> Self { Self { name: Cmd::NAME, + metadata: Cmd::metadata(), implementation: Some(Implementation::for_sync::(contains)), cli: None, subcommands: Cmd::subcommands(ParentChain::(PhantomData)), diff --git a/rpc-toolkit/src/context.rs b/rpc-toolkit/src/context.rs index 57035cb..b718129 100644 --- a/rpc-toolkit/src/context.rs +++ b/rpc-toolkit/src/context.rs @@ -1,6 +1,7 @@ use tokio::runtime::Handle; pub trait Context: Send + 'static { + type Metadata: Default; fn runtime(&self) -> Handle { Handle::current() } diff --git a/rpc-toolkit/src/lib.rs b/rpc-toolkit/src/lib.rs index e4e47f9..7c5807e 100644 --- a/rpc-toolkit/src/lib.rs +++ b/rpc-toolkit/src/lib.rs @@ -23,6 +23,7 @@ pub use context::Context; /// /// See also: [arg](rpc_toolkit_macro::arg), [context](rpc_toolkit_macro::context) pub use rpc_toolkit_macro::command; +pub use server::*; pub use {clap, futures, hyper, reqwest, serde, serde_json, tokio, url, yajrc}; mod cli; @@ -31,4 +32,5 @@ mod command; mod context; // mod metadata; // pub mod rpc_server_helpers; +mod server; mod util; diff --git a/rpc-toolkit/src/server/http.rs b/rpc-toolkit/src/server/http.rs new file mode 100644 index 0000000..e69de29 diff --git a/rpc-toolkit/src/server/mod.rs b/rpc-toolkit/src/server/mod.rs new file mode 100644 index 0000000..be25722 --- /dev/null +++ b/rpc-toolkit/src/server/mod.rs @@ -0,0 +1,224 @@ +use std::sync::Arc; + +use futures::future::{join_all, BoxFuture}; +use futures::stream::{BoxStream, Fuse}; +use futures::{Future, FutureExt, Stream, StreamExt, TryStreamExt}; +use imbl_value::Value; +use tokio::runtime::Handle; +use tokio::task::JoinHandle; +use yajrc::{AnyParams, RpcError, RpcMethod, RpcRequest, RpcResponse, SingleOrBatchRpcRequest}; + +use crate::util::{invalid_request, parse_error}; +use crate::DynCommand; + +mod http; +mod socket; + +pub use http::*; +pub use socket::*; + +impl DynCommand { + fn cmd_from_method( + &self, + method: &[&str], + parent_method: Vec<&'static str>, + ) -> Result<(Vec<&'static str>, &DynCommand), RpcError> { + let mut ret_method = parent_method; + ret_method.push(self.name); + if let Some((cmd, rest)) = method.split_first() { + self.subcommands + .iter() + .find(|c| c.name == *cmd) + .ok_or(yajrc::METHOD_NOT_FOUND_ERROR)? + .cmd_from_method(rest, ret_method) + } else { + Ok((ret_method, self)) + } + } +} + +pub struct Server { + commands: Vec>, + make_ctx: Arc BoxFuture<'static, Result> + Send + Sync>, +} +impl Server { + pub fn new< + F: Fn() -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, + >( + commands: Vec>, + make_ctx: F, + ) -> Self { + Server { + commands, + make_ctx: Arc::new(move || make_ctx().boxed()), + } + } + + pub fn handle_command( + &self, + method: &str, + params: Value, + ) -> impl Future> + Send + 'static { + let from_self = (|| { + let method: Vec<_> = method.split(".").collect(); + let (cmd, rest) = method.split_first().ok_or(yajrc::METHOD_NOT_FOUND_ERROR)?; + let (method, cmd) = self + .commands + .iter() + .find(|c| c.name == *cmd) + .ok_or(yajrc::METHOD_NOT_FOUND_ERROR)? + .cmd_from_method(rest, Vec::new())?; + Ok::<_, RpcError>(( + cmd.implementation + .as_ref() + .ok_or(yajrc::METHOD_NOT_FOUND_ERROR)? + .async_impl + .clone(), + self.make_ctx.clone(), + method, + params, + )) + })(); + + async move { + let (implementation, make_ctx, method, params) = from_self?; + implementation(make_ctx().await?, method, params).await + } + } + + fn handle_single_request( + &self, + RpcRequest { id, method, params }: RpcRequest, + ) -> impl Future + Send + 'static { + let handle = (|| { + Ok::<_, RpcError>(self.handle_command( + method.as_str(), + match params { + AnyParams::Named(a) => serde_json::Value::Object(a).into(), + _ => { + return Err(RpcError { + data: Some("positional parameters unsupported".into()), + ..yajrc::INVALID_PARAMS_ERROR + }) + } + }, + )) + })(); + async move { + RpcResponse { + id, + result: match handle { + Ok(handle) => handle.await.map(serde_json::Value::from), + Err(e) => Err(e), + }, + } + } + } + + pub fn handle(&self, request: Value) -> BoxFuture<'static, Result> { + let request = + imbl_value::from_value::(request).map_err(invalid_request); + match request { + Ok(SingleOrBatchRpcRequest::Single(req)) => { + let fut = self.handle_single_request(req); + async { imbl_value::to_value(&fut.await).map_err(parse_error) }.boxed() + } + Ok(SingleOrBatchRpcRequest::Batch(reqs)) => { + let futs: Vec<_> = reqs + .into_iter() + .map(|req| self.handle_single_request(req)) + .collect(); + async { imbl_value::to_value(&join_all(futs).await).map_err(parse_error) }.boxed() + } + Err(e) => async { Err(e) }.boxed(), + } + } + + pub fn stream<'a>( + &'a self, + requests: impl Stream> + Send + 'a, + ) -> impl Stream> + 'a { + let mut running = RunningCommands::default(); + let mut requests = requests.boxed().fuse(); + async fn next<'a, Context: crate::Context>( + server: &'a Server, + running: &mut RunningCommands, + requests: &mut Fuse>>, + ) -> Result, RpcError> { + loop { + tokio::select! { + req = requests.try_next() => { + let req = req?; + if let Some(req) = req { + running.running.push(tokio::spawn(server.handle(req))); + } else { + running.closed = true; + } + } + res = running.try_next() => { + return res; + } + } + } + } + async_stream::try_stream! { + while let Some(res) = next(self, &mut running, &mut requests).await? { + yield res; + } + } + } +} + +#[derive(Default)] +struct RunningCommands { + closed: bool, + running: Vec>>, +} + +impl Stream for RunningCommands { + type Item = Result; + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let item = self + .running + .iter_mut() + .enumerate() + .find_map(|(i, f)| match f.poll_unpin(cx) { + std::task::Poll::Pending => None, + std::task::Poll::Ready(e) => Some(( + i, + e.map_err(|e| RpcError { + data: Some(e.to_string().into()), + ..yajrc::INTERNAL_ERROR + }) + .and_then(|a| a), + )), + }); + match item { + Some((idx, res)) => { + drop(self.running.swap_remove(idx)); + std::task::Poll::Ready(Some(res)) + } + None => { + if !self.closed || !self.running.is_empty() { + std::task::Poll::Pending + } else { + std::task::Poll::Ready(None) + } + } + } + } +} +impl Drop for RunningCommands { + fn drop(&mut self) { + for hdl in &self.running { + hdl.abort(); + } + if let Ok(rt) = Handle::try_current() { + rt.block_on(join_all(std::mem::take(&mut self.running).into_iter())); + } + } +} diff --git a/rpc-toolkit/src/server/socket.rs b/rpc-toolkit/src/server/socket.rs new file mode 100644 index 0000000..42e8aa3 --- /dev/null +++ b/rpc-toolkit/src/server/socket.rs @@ -0,0 +1,25 @@ +use futures::{AsyncWrite, Future, Stream}; +use tokio::io::AsyncRead; +use tokio::sync::oneshot; +use yajrc::RpcError; + +use crate::Server; + +pub struct ShutdownHandle(oneshot::Sender<()>); + +pub struct SocketServer { + server: Server, +} +impl SocketServer { + pub fn run_json( + &self, + listener: impl Stream, + ) -> (ShutdownHandle, impl Future>) { + let (shutdown_send, shutdown_recv) = oneshot::channel(); + (ShutdownHandle(shutdown_send), async move { + //asdf + //adf + Ok(()) + }) + } +} diff --git a/rpc-toolkit/src/util.rs b/rpc-toolkit/src/util.rs index eccfad9..b56cb47 100644 --- a/rpc-toolkit/src/util.rs +++ b/rpc-toolkit/src/util.rs @@ -35,6 +35,13 @@ pub fn invalid_params(e: imbl_value::Error) -> RpcError { } } +pub fn invalid_request(e: imbl_value::Error) -> RpcError { + RpcError { + data: Some(e.to_string().into()), + ..yajrc::INVALID_REQUEST_ERROR + } +} + pub fn parse_error(e: imbl_value::Error) -> RpcError { RpcError { data: Some(e.to_string().into()),