This commit is contained in:
Aiden McClelland
2023-12-12 20:43:04 -07:00
parent b246ea4179
commit 64a6c00344
10 changed files with 329 additions and 24 deletions

26
Cargo.lock generated
View File

@@ -84,6 +84,28 @@ version = "1.0.75"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6"
[[package]]
name = "async-stream"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51"
dependencies = [
"async-stream-impl",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "async-stream-impl"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.40",
]
[[package]]
name = "async-trait"
version = "0.1.74"
@@ -596,7 +618,8 @@ dependencies = [
[[package]]
name = "imbl-value"
version = "0.1.0"
source = "git+https://github.com/Start9Labs/imbl-value.git#929395141c3a882ac366c12ac9402d0ebaa2201b"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b6d3d8cdfd1ac46aab6195692baf9c65dd34dcc7e7ddfb426a2c014736ba90c"
dependencies = [
"imbl",
"serde",
@@ -929,6 +952,7 @@ dependencies = [
name = "rpc-toolkit"
version = "0.2.3"
dependencies = [
"async-stream",
"async-trait",
"clap",
"futures",

View File

@@ -16,11 +16,12 @@ cbor = ["serde_cbor"]
default = ["cbor"]
[dependencies]
async-stream = "0.3"
async-trait = "0.1"
clap = "4"
futures = "0.3"
hyper = { version = "1", features = ["server", "http1", "http2", "client"] }
imbl-value = { git = "https://github.com/Start9Labs/imbl-value.git" }
imbl-value = "0.1"
lazy_static = "1.4"
openssl = { version = "0.10", features = ["vendored"] }
reqwest = { version = "0.11" }

View File

@@ -14,7 +14,7 @@ use crate::command::{AsyncCommand, DynCommand, LeafCommand, ParentInfo};
use crate::util::{combine, invalid_params, parse_error};
use crate::{CliBindings, ParentChain};
impl<Context> DynCommand<Context> {
impl<Context: crate::Context> DynCommand<Context> {
fn cli_app(&self) -> Option<clap::Command> {
if let Some(cli) = &self.cli {
Some(
@@ -54,11 +54,11 @@ impl<Context> DynCommand<Context> {
}
}
struct CliApp<Context> {
struct CliApp<Context: crate::Context> {
cli: CliBindings,
commands: Vec<DynCommand<Context>>,
}
impl<Context> CliApp<Context> {
impl<Context: crate::Context> CliApp<Context> {
pub fn new<Cmd: FromArgMatches + CommandFactory + Serialize>(
commands: Vec<DynCommand<Context>>,
) -> Self {
@@ -90,11 +90,11 @@ impl<Context> CliApp<Context> {
}
}
pub struct CliAppAsync<Context> {
pub struct CliAppAsync<Context: crate::Context> {
app: CliApp<Context>,
make_ctx: Box<dyn FnOnce(Value) -> BoxFuture<'static, Result<Context, RpcError>> + Send>,
}
impl<Context> CliAppAsync<Context> {
impl<Context: crate::Context> CliAppAsync<Context> {
pub fn new<
Cmd: FromArgMatches + CommandFactory + Serialize + DeserializeOwned + Send,
F: FnOnce(Cmd) -> Fut + Send + 'static,
@@ -140,11 +140,11 @@ impl<Context> CliAppAsync<Context> {
}
}
pub struct CliAppSync<Context> {
pub struct CliAppSync<Context: crate::Context> {
app: CliApp<Context>,
make_ctx: Box<dyn FnOnce(Value) -> Result<Context, RpcError> + Send>,
}
impl<Context> CliAppSync<Context> {
impl<Context: crate::Context> CliAppSync<Context> {
pub fn new<
Cmd: FromArgMatches + CommandFactory + Serialize + DeserializeOwned + Send,
F: FnOnce(Cmd) -> Result<Context, RpcError> + Send + 'static,
@@ -187,11 +187,11 @@ impl<Context> CliAppSync<Context> {
}
#[async_trait::async_trait]
pub trait CliContext {
pub trait CliContext: crate::Context {
async fn call_remote(&self, method: &str, params: Value) -> Result<Value, RpcError>;
}
pub trait CliContextHttp {
pub trait CliContextHttp: crate::Context {
fn client(&self) -> &Client;
fn url(&self) -> Url;
}
@@ -243,6 +243,7 @@ impl<T: CliContextHttp + Sync> CliContext for T {
}
pub trait RemoteCommand<Context: CliContext>: LeafCommand {
fn metadata() -> Context::Metadata;
fn subcommands(chain: ParentChain<Self>) -> Vec<DynCommand<Context>> {
drop(chain);
Vec::new()
@@ -257,6 +258,9 @@ where
T::Err: From<RpcError>,
Context: CliContext + Send + 'static,
{
fn metadata() -> Context::Metadata {
T::metadata()
}
async fn implementation(
self,
ctx: Context,
@@ -276,4 +280,7 @@ where
)
.map_err(parse_error)?)
}
fn subcommands(chain: ParentChain<Self>) -> Vec<DynCommand<Context>> {
T::subcommands(chain)
}
}

View File

@@ -1,4 +1,5 @@
use std::marker::PhantomData;
use std::sync::Arc;
use clap::{ArgMatches, CommandFactory, FromArgMatches};
use futures::future::BoxFuture;
@@ -12,18 +13,22 @@ use crate::util::{extract, Flat};
/// Stores a command's implementation for a given context
/// Can be created from anything that implements ParentCommand, AsyncCommand, or SyncCommand
pub struct DynCommand<Context> {
pub struct DynCommand<Context: crate::Context> {
pub(crate) name: &'static str,
pub(crate) metadata: Context::Metadata,
pub(crate) implementation: Option<Implementation<Context>>,
pub(crate) cli: Option<CliBindings>,
pub(crate) subcommands: Vec<Self>,
}
pub(crate) struct Implementation<Context> {
pub(crate) async_impl: Box<
dyn Fn(Context, Vec<&'static str>, Value) -> BoxFuture<'static, Result<Value, RpcError>>,
pub(crate) async_impl: Arc<
dyn Fn(Context, Vec<&'static str>, Value) -> BoxFuture<'static, Result<Value, RpcError>>
+ Send
+ Sync,
>,
pub(crate) sync_impl: Box<dyn Fn(Context, Vec<&'static str>, Value) -> Result<Value, RpcError>>,
pub(crate) sync_impl:
Box<dyn Fn(Context, Vec<&'static str>, Value) -> Result<Value, RpcError> + Send + Sync>,
}
pub(crate) struct CliBindings {
@@ -107,15 +112,17 @@ where
}
/// Implement this for a command that has no implementation, but simply exists to organize subcommands
pub trait ParentCommand<Context>: Command {
pub trait ParentCommand<Context: crate::Context>: Command {
fn metadata() -> Context::Metadata;
fn subcommands(chain: ParentChain<Self>) -> Vec<DynCommand<Context>>;
}
impl<Context> DynCommand<Context> {
impl<Context: crate::Context> DynCommand<Context> {
pub fn from_parent<
Cmd: ParentCommand<Context> + FromArgMatches + CommandFactory + Serialize,
>() -> Self {
Self {
name: Cmd::NAME,
metadata: Cmd::metadata(),
implementation: None,
cli: Some(CliBindings::from_parent::<Cmd>()),
subcommands: Cmd::subcommands(ParentChain::<Cmd>(PhantomData)),
@@ -124,6 +131,7 @@ impl<Context> DynCommand<Context> {
pub fn from_parent_no_cli<Cmd: ParentCommand<Context>>() -> Self {
Self {
name: Cmd::NAME,
metadata: Cmd::metadata(),
implementation: None,
cli: None,
subcommands: Cmd::subcommands(ParentChain::<Cmd>(PhantomData)),
@@ -140,7 +148,8 @@ pub trait LeafCommand: Command {
/// Implement this if your Command's implementation is async
#[async_trait::async_trait]
pub trait AsyncCommand<Context>: LeafCommand {
pub trait AsyncCommand<Context: crate::Context>: LeafCommand {
fn metadata() -> Context::Metadata;
async fn implementation(
self,
ctx: Context,
@@ -155,7 +164,7 @@ impl<Context: crate::Context> Implementation<Context> {
fn for_async<Cmd: AsyncCommand<Context>>(contains: Contains<Cmd::Parent>) -> Self {
drop(contains);
Self {
async_impl: Box::new(|ctx, parent_method, params| {
async_impl: Arc::new(|ctx, parent_method, params| {
async move {
let parent_params = extract::<Cmd::Parent>(&params)?;
imbl_value::to_value(
@@ -211,6 +220,7 @@ impl<Context: crate::Context> DynCommand<Context> {
) -> Self {
Self {
name: Cmd::NAME,
metadata: Cmd::metadata(),
implementation: Some(Implementation::for_async::<Cmd>(contains)),
cli: Some(CliBindings::from_leaf::<Cmd>()),
subcommands: Cmd::subcommands(ParentChain::<Cmd>(PhantomData)),
@@ -219,6 +229,7 @@ impl<Context: crate::Context> DynCommand<Context> {
pub fn from_async_no_cli<Cmd: AsyncCommand<Context>>(contains: Contains<Cmd::Parent>) -> Self {
Self {
name: Cmd::NAME,
metadata: Cmd::metadata(),
implementation: Some(Implementation::for_async::<Cmd>(contains)),
cli: None,
subcommands: Cmd::subcommands(ParentChain::<Cmd>(PhantomData)),
@@ -227,8 +238,9 @@ impl<Context: crate::Context> DynCommand<Context> {
}
/// Implement this if your Command's implementation is not async
pub trait SyncCommand<Context>: LeafCommand {
pub trait SyncCommand<Context: crate::Context>: LeafCommand {
const BLOCKING: bool;
fn metadata() -> Context::Metadata;
fn implementation(
self,
ctx: Context,
@@ -239,12 +251,12 @@ pub trait SyncCommand<Context>: LeafCommand {
Vec::new()
}
}
impl<Context: Send + 'static> Implementation<Context> {
impl<Context: crate::Context> Implementation<Context> {
fn for_sync<Cmd: SyncCommand<Context>>(contains: Contains<Cmd::Parent>) -> Self {
drop(contains);
Self {
async_impl: if Cmd::BLOCKING {
Box::new(|ctx, parent_method, params| {
Arc::new(|ctx, parent_method, params| {
tokio::task::spawn_blocking(move || {
let parent_params = extract::<Cmd::Parent>(&params)?;
imbl_value::to_value(
@@ -276,7 +288,7 @@ impl<Context: Send + 'static> Implementation<Context> {
.boxed()
})
} else {
Box::new(|ctx, parent_method, params| {
Arc::new(|ctx, parent_method, params| {
async move {
let parent_params = extract::<Cmd::Parent>(&params)?;
imbl_value::to_value(
@@ -327,12 +339,13 @@ impl<Context: Send + 'static> Implementation<Context> {
}
}
}
impl<Context: Send + 'static> DynCommand<Context> {
impl<Context: crate::Context> DynCommand<Context> {
pub fn from_sync<Cmd: SyncCommand<Context> + FromArgMatches + CommandFactory + Serialize>(
contains: Contains<Cmd::Parent>,
) -> Self {
Self {
name: Cmd::NAME,
metadata: Cmd::metadata(),
implementation: Some(Implementation::for_sync::<Cmd>(contains)),
cli: Some(CliBindings::from_leaf::<Cmd>()),
subcommands: Cmd::subcommands(ParentChain::<Cmd>(PhantomData)),
@@ -341,6 +354,7 @@ impl<Context: Send + 'static> DynCommand<Context> {
pub fn from_sync_no_cli<Cmd: SyncCommand<Context>>(contains: Contains<Cmd::Parent>) -> Self {
Self {
name: Cmd::NAME,
metadata: Cmd::metadata(),
implementation: Some(Implementation::for_sync::<Cmd>(contains)),
cli: None,
subcommands: Cmd::subcommands(ParentChain::<Cmd>(PhantomData)),

View File

@@ -1,6 +1,7 @@
use tokio::runtime::Handle;
pub trait Context: Send + 'static {
type Metadata: Default;
fn runtime(&self) -> Handle {
Handle::current()
}

View File

@@ -23,6 +23,7 @@ pub use context::Context;
///
/// See also: [arg](rpc_toolkit_macro::arg), [context](rpc_toolkit_macro::context)
pub use rpc_toolkit_macro::command;
pub use server::*;
pub use {clap, futures, hyper, reqwest, serde, serde_json, tokio, url, yajrc};
mod cli;
@@ -31,4 +32,5 @@ mod command;
mod context;
// mod metadata;
// pub mod rpc_server_helpers;
mod server;
mod util;

View File

View File

@@ -0,0 +1,224 @@
use std::sync::Arc;
use futures::future::{join_all, BoxFuture};
use futures::stream::{BoxStream, Fuse};
use futures::{Future, FutureExt, Stream, StreamExt, TryStreamExt};
use imbl_value::Value;
use tokio::runtime::Handle;
use tokio::task::JoinHandle;
use yajrc::{AnyParams, RpcError, RpcMethod, RpcRequest, RpcResponse, SingleOrBatchRpcRequest};
use crate::util::{invalid_request, parse_error};
use crate::DynCommand;
mod http;
mod socket;
pub use http::*;
pub use socket::*;
impl<Context: crate::Context> DynCommand<Context> {
fn cmd_from_method(
&self,
method: &[&str],
parent_method: Vec<&'static str>,
) -> Result<(Vec<&'static str>, &DynCommand<Context>), RpcError> {
let mut ret_method = parent_method;
ret_method.push(self.name);
if let Some((cmd, rest)) = method.split_first() {
self.subcommands
.iter()
.find(|c| c.name == *cmd)
.ok_or(yajrc::METHOD_NOT_FOUND_ERROR)?
.cmd_from_method(rest, ret_method)
} else {
Ok((ret_method, self))
}
}
}
pub struct Server<Context: crate::Context> {
commands: Vec<DynCommand<Context>>,
make_ctx: Arc<dyn Fn() -> BoxFuture<'static, Result<Context, RpcError>> + Send + Sync>,
}
impl<Context: crate::Context> Server<Context> {
pub fn new<
F: Fn() -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<Context, RpcError>> + Send + 'static,
>(
commands: Vec<DynCommand<Context>>,
make_ctx: F,
) -> Self {
Server {
commands,
make_ctx: Arc::new(move || make_ctx().boxed()),
}
}
pub fn handle_command(
&self,
method: &str,
params: Value,
) -> impl Future<Output = Result<Value, RpcError>> + Send + 'static {
let from_self = (|| {
let method: Vec<_> = method.split(".").collect();
let (cmd, rest) = method.split_first().ok_or(yajrc::METHOD_NOT_FOUND_ERROR)?;
let (method, cmd) = self
.commands
.iter()
.find(|c| c.name == *cmd)
.ok_or(yajrc::METHOD_NOT_FOUND_ERROR)?
.cmd_from_method(rest, Vec::new())?;
Ok::<_, RpcError>((
cmd.implementation
.as_ref()
.ok_or(yajrc::METHOD_NOT_FOUND_ERROR)?
.async_impl
.clone(),
self.make_ctx.clone(),
method,
params,
))
})();
async move {
let (implementation, make_ctx, method, params) = from_self?;
implementation(make_ctx().await?, method, params).await
}
}
fn handle_single_request(
&self,
RpcRequest { id, method, params }: RpcRequest,
) -> impl Future<Output = RpcResponse> + Send + 'static {
let handle = (|| {
Ok::<_, RpcError>(self.handle_command(
method.as_str(),
match params {
AnyParams::Named(a) => serde_json::Value::Object(a).into(),
_ => {
return Err(RpcError {
data: Some("positional parameters unsupported".into()),
..yajrc::INVALID_PARAMS_ERROR
})
}
},
))
})();
async move {
RpcResponse {
id,
result: match handle {
Ok(handle) => handle.await.map(serde_json::Value::from),
Err(e) => Err(e),
},
}
}
}
pub fn handle(&self, request: Value) -> BoxFuture<'static, Result<Value, RpcError>> {
let request =
imbl_value::from_value::<SingleOrBatchRpcRequest>(request).map_err(invalid_request);
match request {
Ok(SingleOrBatchRpcRequest::Single(req)) => {
let fut = self.handle_single_request(req);
async { imbl_value::to_value(&fut.await).map_err(parse_error) }.boxed()
}
Ok(SingleOrBatchRpcRequest::Batch(reqs)) => {
let futs: Vec<_> = reqs
.into_iter()
.map(|req| self.handle_single_request(req))
.collect();
async { imbl_value::to_value(&join_all(futs).await).map_err(parse_error) }.boxed()
}
Err(e) => async { Err(e) }.boxed(),
}
}
pub fn stream<'a>(
&'a self,
requests: impl Stream<Item = Result<Value, RpcError>> + Send + 'a,
) -> impl Stream<Item = Result<Value, RpcError>> + 'a {
let mut running = RunningCommands::default();
let mut requests = requests.boxed().fuse();
async fn next<'a, Context: crate::Context>(
server: &'a Server<Context>,
running: &mut RunningCommands,
requests: &mut Fuse<BoxStream<'a, Result<Value, RpcError>>>,
) -> Result<Option<Value>, RpcError> {
loop {
tokio::select! {
req = requests.try_next() => {
let req = req?;
if let Some(req) = req {
running.running.push(tokio::spawn(server.handle(req)));
} else {
running.closed = true;
}
}
res = running.try_next() => {
return res;
}
}
}
}
async_stream::try_stream! {
while let Some(res) = next(self, &mut running, &mut requests).await? {
yield res;
}
}
}
}
#[derive(Default)]
struct RunningCommands {
closed: bool,
running: Vec<JoinHandle<Result<Value, RpcError>>>,
}
impl Stream for RunningCommands {
type Item = Result<Value, RpcError>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let item = self
.running
.iter_mut()
.enumerate()
.find_map(|(i, f)| match f.poll_unpin(cx) {
std::task::Poll::Pending => None,
std::task::Poll::Ready(e) => Some((
i,
e.map_err(|e| RpcError {
data: Some(e.to_string().into()),
..yajrc::INTERNAL_ERROR
})
.and_then(|a| a),
)),
});
match item {
Some((idx, res)) => {
drop(self.running.swap_remove(idx));
std::task::Poll::Ready(Some(res))
}
None => {
if !self.closed || !self.running.is_empty() {
std::task::Poll::Pending
} else {
std::task::Poll::Ready(None)
}
}
}
}
}
impl Drop for RunningCommands {
fn drop(&mut self) {
for hdl in &self.running {
hdl.abort();
}
if let Ok(rt) = Handle::try_current() {
rt.block_on(join_all(std::mem::take(&mut self.running).into_iter()));
}
}
}

View File

@@ -0,0 +1,25 @@
use futures::{AsyncWrite, Future, Stream};
use tokio::io::AsyncRead;
use tokio::sync::oneshot;
use yajrc::RpcError;
use crate::Server;
pub struct ShutdownHandle(oneshot::Sender<()>);
pub struct SocketServer<Context: crate::Context> {
server: Server<Context>,
}
impl<Context: crate::Context> SocketServer<Context> {
pub fn run_json<T: AsyncRead + AsyncWrite>(
&self,
listener: impl Stream<Item = T>,
) -> (ShutdownHandle, impl Future<Output = Result<(), RpcError>>) {
let (shutdown_send, shutdown_recv) = oneshot::channel();
(ShutdownHandle(shutdown_send), async move {
//asdf
//adf
Ok(())
})
}
}

View File

@@ -35,6 +35,13 @@ pub fn invalid_params(e: imbl_value::Error) -> RpcError {
}
}
pub fn invalid_request(e: imbl_value::Error) -> RpcError {
RpcError {
data: Some(e.to_string().into()),
..yajrc::INVALID_REQUEST_ERROR
}
}
pub fn parse_error(e: imbl_value::Error) -> RpcError {
RpcError {
data: Some(e.to_string().into()),