misc fixes

This commit is contained in:
Aiden McClelland
2021-09-01 14:47:49 -06:00
committed by Aiden McClelland
parent be5952cb67
commit e9faf1f74d
9 changed files with 141 additions and 32 deletions

View File

@@ -4,7 +4,7 @@ pub mod util;
use std::future::Future;
use std::sync::Arc;
use futures::{SinkExt, StreamExt};
use futures::{FutureExt, SinkExt, StreamExt};
use patch_db::json_ptr::JsonPointer;
use patch_db::{Dump, Revision};
use rpc_toolkit::command;
@@ -34,7 +34,17 @@ async fn ws_handler<
.await
.with_kind(crate::ErrorKind::Network)?
.with_kind(crate::ErrorKind::Unknown)?;
stream.next().await;
loop {
if let Some(Message::Text(_)) = stream
.next()
.await
.transpose()
.with_kind(crate::ErrorKind::Network)?
{
// TODO: check auth
break;
}
}
stream
.send(Message::Text(
rpc_toolkit::serde_json::to_string(&dump).with_kind(crate::ErrorKind::Serialization)?,
@@ -43,20 +53,44 @@ async fn ws_handler<
.with_kind(crate::ErrorKind::Network)?;
loop {
let rev = sub.recv().await.with_kind(crate::ErrorKind::Database)?;
stream
.send(Message::Text(
rpc_toolkit::serde_json::to_string(&rev)
.with_kind(crate::ErrorKind::Serialization)?,
))
.await
.with_kind(crate::ErrorKind::Network)?;
futures::select! {
new_rev = sub.recv().fuse() => {
let rev = new_rev.with_kind(crate::ErrorKind::Database)?;
stream
.send(Message::Text(
rpc_toolkit::serde_json::to_string(&rev)
.with_kind(crate::ErrorKind::Serialization)?,
))
.await
.with_kind(crate::ErrorKind::Network)?;
}
message = stream.next().fuse() => {
match message.transpose().with_kind(crate::ErrorKind::Network)? {
Some(Message::Ping(a)) => {
stream
.send(Message::Pong(a))
.await
.with_kind(crate::ErrorKind::Network)?;
}
Some(Message::Close(frame)) => {
if let Some(reason) = frame.as_ref() {
log::info!("Closing WebSocket: Reason: {} {}", reason.code, reason.reason);
}
stream
.send(Message::Close(frame))
.await
.with_kind(crate::ErrorKind::Network)?;
return Ok(())
}
_ => (),
}
}
}
}
}
pub async fn subscribe(ctx: RpcContext, req: Request<Body>) -> Result<Response<Body>, Error> {
let (parts, body) = req.into_parts();
// is_authed(&ctx, &parts).await?;
let req = Request::from_parts(parts, body);
let (res, ws_fut) = hyper_ws_listener::create_ws(req).with_kind(crate::ErrorKind::Network)?;
if let Some(ws_fut) = ws_fut {