mirror of
https://github.com/Start9Labs/start-os.git
synced 2026-03-26 10:21:52 +00:00
[Fix] websocket connecting and patchDB connection monitoring (#1738)
* refactor how we handle rpc responses and patchdb connection monitoring * websockets only * remove unused global error handlers * chore: clear storage inside auth service * feat: convert all global toasts to declarative approach (#1754) * no more reference to serverID Co-authored-by: Aiden McClelland <me@drbonez.dev> Co-authored-by: waterplea <alexander@inkin.ru>
This commit is contained in:
@@ -2,23 +2,22 @@ pub mod model;
|
||||
pub mod package;
|
||||
pub mod util;
|
||||
|
||||
use std::borrow::Cow;
|
||||
use std::future::Future;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use color_eyre::eyre::eyre;
|
||||
use futures::{FutureExt, SinkExt, StreamExt};
|
||||
use patch_db::json_ptr::JsonPointer;
|
||||
use patch_db::{Dump, Revision};
|
||||
use rpc_toolkit::command;
|
||||
use rpc_toolkit::hyper::upgrade::Upgraded;
|
||||
use rpc_toolkit::hyper::{Body, Error as HyperError, Request, Response};
|
||||
use rpc_toolkit::yajrc::{GenericRpcMethod, RpcError, RpcResponse};
|
||||
use rpc_toolkit::yajrc::RpcError;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use tokio::sync::{broadcast, oneshot};
|
||||
use tokio::task::JoinError;
|
||||
use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
|
||||
use tokio_tungstenite::tungstenite::protocol::CloseFrame;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
use tokio_tungstenite::WebSocketStream;
|
||||
use tracing::instrument;
|
||||
@@ -30,11 +29,12 @@ use crate::middleware::auth::{HasValidSession, HashSessionToken};
|
||||
use crate::util::serde::{display_serializable, IoFormat};
|
||||
use crate::{Error, ResultExt};
|
||||
|
||||
#[instrument(skip(ctx, ws_fut))]
|
||||
#[instrument(skip(ctx, session, ws_fut))]
|
||||
async fn ws_handler<
|
||||
WSFut: Future<Output = Result<Result<WebSocketStream<Upgraded>, HyperError>, JoinError>>,
|
||||
>(
|
||||
ctx: RpcContext,
|
||||
session: Option<(HasValidSession, HashSessionToken)>,
|
||||
ws_fut: WSFut,
|
||||
) -> Result<(), Error> {
|
||||
let (dump, sub) = ctx.db.dump_and_sub().await;
|
||||
@@ -43,50 +43,21 @@ async fn ws_handler<
|
||||
.with_kind(crate::ErrorKind::Network)?
|
||||
.with_kind(crate::ErrorKind::Unknown)?;
|
||||
|
||||
let (has_valid_session, token) = loop {
|
||||
if let Some(Message::Text(cookie)) = stream
|
||||
.next()
|
||||
if let Some((session, token)) = session {
|
||||
let kill = subscribe_to_session_kill(&ctx, token).await;
|
||||
send_dump(session, &mut stream, dump).await?;
|
||||
|
||||
deal_with_messages(session, kill, sub, stream).await?;
|
||||
} else {
|
||||
stream
|
||||
.close(Some(CloseFrame {
|
||||
code: CloseCode::Error,
|
||||
reason: "UNAUTHORIZED".into(),
|
||||
}))
|
||||
.await
|
||||
.transpose()
|
||||
.with_kind(crate::ErrorKind::Network)?
|
||||
{
|
||||
let cookie_str = serde_json::from_str::<Cow<str>>(&cookie)
|
||||
.with_kind(crate::ErrorKind::Deserialization)?;
|
||||
.with_kind(crate::ErrorKind::Network)?;
|
||||
}
|
||||
|
||||
let id = basic_cookies::Cookie::parse(&cookie_str)
|
||||
.with_kind(crate::ErrorKind::Authorization)?
|
||||
.into_iter()
|
||||
.find(|c| c.get_name() == "session")
|
||||
.ok_or_else(|| {
|
||||
Error::new(eyre!("UNAUTHORIZED"), crate::ErrorKind::Authorization)
|
||||
})?;
|
||||
let authenticated_session = HashSessionToken::from_cookie(&id);
|
||||
match HasValidSession::from_session(&authenticated_session, &ctx).await {
|
||||
Err(e) => {
|
||||
stream
|
||||
.send(Message::Text(
|
||||
serde_json::to_string(
|
||||
&RpcResponse::<GenericRpcMethod<String>>::from_result(Err::<
|
||||
_,
|
||||
RpcError,
|
||||
>(
|
||||
e.into()
|
||||
)),
|
||||
)
|
||||
.with_kind(crate::ErrorKind::Serialization)?,
|
||||
))
|
||||
.await
|
||||
.with_kind(crate::ErrorKind::Network)?;
|
||||
return Ok(());
|
||||
}
|
||||
Ok(has_validation) => break (has_validation, authenticated_session),
|
||||
}
|
||||
}
|
||||
};
|
||||
let kill = subscribe_to_session_kill(&ctx, token).await;
|
||||
send_dump(has_valid_session, &mut stream, dump).await?;
|
||||
|
||||
deal_with_messages(has_valid_session, kill, sub, stream).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -115,39 +86,25 @@ async fn deal_with_messages(
|
||||
futures::select! {
|
||||
_ = (&mut kill).fuse() => {
|
||||
tracing::info!("Closing WebSocket: Reason: Session Terminated");
|
||||
stream
|
||||
.close(Some(CloseFrame {
|
||||
code: CloseCode::Error,
|
||||
reason: "UNAUTHORIZED".into(),
|
||||
}))
|
||||
.await
|
||||
.with_kind(crate::ErrorKind::Network)?;
|
||||
return Ok(())
|
||||
}
|
||||
new_rev = sub.recv().fuse() => {
|
||||
let rev = new_rev.with_kind(crate::ErrorKind::Database)?;
|
||||
stream
|
||||
.send(Message::Text(
|
||||
serde_json::to_string(
|
||||
&RpcResponse::<GenericRpcMethod<String>>::from_result(Ok::<_, RpcError>(
|
||||
serde_json::to_value(&rev).with_kind(crate::ErrorKind::Serialization)?,
|
||||
)),
|
||||
)
|
||||
.with_kind(crate::ErrorKind::Serialization)?,
|
||||
))
|
||||
.send(Message::Text(serde_json::to_string(&rev).with_kind(crate::ErrorKind::Serialization)?))
|
||||
.await
|
||||
.with_kind(crate::ErrorKind::Network)?;
|
||||
}
|
||||
message = stream.next().fuse() => {
|
||||
let message = message.transpose().with_kind(crate::ErrorKind::Network)?;
|
||||
match message {
|
||||
Some(Message::Ping(a)) => {
|
||||
stream
|
||||
.send(Message::Pong(a))
|
||||
.await
|
||||
.with_kind(crate::ErrorKind::Network)?;
|
||||
}
|
||||
Some(Message::Close(frame)) => {
|
||||
if let Some(reason) = frame.as_ref() {
|
||||
tracing::info!("Closing WebSocket: Reason: {} {}", reason.code, reason.reason);
|
||||
} else {
|
||||
tracing::info!("Closing WebSocket: Reason: Unknown");
|
||||
}
|
||||
return Ok(())
|
||||
}
|
||||
None => {
|
||||
tracing::info!("Closing WebSocket: Stream Finished");
|
||||
return Ok(())
|
||||
@@ -155,12 +112,6 @@ async fn deal_with_messages(
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
_ = tokio::time::sleep(Duration::from_secs(10)).fuse() => {
|
||||
stream
|
||||
.send(Message::Ping(Vec::new()))
|
||||
.await
|
||||
.with_kind(crate::ErrorKind::Network)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -172,13 +123,7 @@ async fn send_dump(
|
||||
) -> Result<(), Error> {
|
||||
stream
|
||||
.send(Message::Text(
|
||||
serde_json::to_string(&RpcResponse::<GenericRpcMethod<String>>::from_result(Ok::<
|
||||
_,
|
||||
RpcError,
|
||||
>(
|
||||
serde_json::to_value(&dump).with_kind(crate::ErrorKind::Serialization)?,
|
||||
)))
|
||||
.with_kind(crate::ErrorKind::Serialization)?,
|
||||
serde_json::to_string(&dump).with_kind(crate::ErrorKind::Serialization)?,
|
||||
))
|
||||
.await
|
||||
.with_kind(crate::ErrorKind::Network)?;
|
||||
@@ -187,11 +132,27 @@ async fn send_dump(
|
||||
|
||||
pub async fn subscribe(ctx: RpcContext, req: Request<Body>) -> Result<Response<Body>, Error> {
|
||||
let (parts, body) = req.into_parts();
|
||||
let session = match async {
|
||||
let token = HashSessionToken::from_request_parts(&parts)?;
|
||||
let session = HasValidSession::from_session(&token, &ctx).await?;
|
||||
Ok::<_, Error>((session, token))
|
||||
}
|
||||
.await
|
||||
{
|
||||
Ok(a) => Some(a),
|
||||
Err(e) => {
|
||||
if e.kind != crate::ErrorKind::Authorization {
|
||||
tracing::error!("Error Authenticating Websocket: {}", e);
|
||||
tracing::debug!("{:?}", e);
|
||||
}
|
||||
None
|
||||
}
|
||||
};
|
||||
let req = Request::from_parts(parts, body);
|
||||
let (res, ws_fut) = hyper_ws_listener::create_ws(req).with_kind(crate::ErrorKind::Network)?;
|
||||
if let Some(ws_fut) = ws_fut {
|
||||
tokio::task::spawn(async move {
|
||||
match ws_handler(ctx, ws_fut).await {
|
||||
match ws_handler(ctx, session, ws_fut).await {
|
||||
Ok(()) => (),
|
||||
Err(e) => {
|
||||
tracing::error!("WebSocket Closed: {}", e);
|
||||
|
||||
@@ -1,15 +1,13 @@
|
||||
use std::future::Future;
|
||||
use std::marker::PhantomData;
|
||||
use std::ops::Deref;
|
||||
use std::ops::DerefMut;
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::process::Stdio;
|
||||
use std::time::{Duration, UNIX_EPOCH};
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use color_eyre::eyre::eyre;
|
||||
use futures::stream::BoxStream;
|
||||
use futures::Stream;
|
||||
use futures::{FutureExt, SinkExt, StreamExt, TryStreamExt};
|
||||
use futures::{FutureExt, SinkExt, Stream, StreamExt, TryStreamExt};
|
||||
use hyper::upgrade::Upgraded;
|
||||
use hyper::Error as HyperError;
|
||||
use rpc_toolkit::command;
|
||||
@@ -30,7 +28,8 @@ use crate::core::rpc_continuations::{RequestGuid, RpcContinuation};
|
||||
use crate::error::ResultExt;
|
||||
use crate::procedure::docker::DockerProcedure;
|
||||
use crate::s9pk::manifest::PackageId;
|
||||
use crate::util::{display_none, serde::Reversible};
|
||||
use crate::util::display_none;
|
||||
use crate::util::serde::Reversible;
|
||||
use crate::{Error, ErrorKind};
|
||||
|
||||
#[pin_project::pin_project]
|
||||
|
||||
@@ -12,7 +12,9 @@ use rpc_toolkit::command_helpers::prelude::RequestParts;
|
||||
use rpc_toolkit::hyper::header::COOKIE;
|
||||
use rpc_toolkit::hyper::http::Error as HttpError;
|
||||
use rpc_toolkit::hyper::{Body, Request, Response};
|
||||
use rpc_toolkit::rpc_server_helpers::{noop3, to_response, DynMiddleware, DynMiddlewareStage2};
|
||||
use rpc_toolkit::rpc_server_helpers::{
|
||||
noop4, to_response, DynMiddleware, DynMiddlewareStage2, DynMiddlewareStage3,
|
||||
};
|
||||
use rpc_toolkit::yajrc::RpcMethod;
|
||||
use rpc_toolkit::Metadata;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -198,8 +200,7 @@ pub fn auth<M: Metadata>(ctx: RpcContext) -> DynMiddleware<M> {
|
||||
|_| StatusCode::OK,
|
||||
)?));
|
||||
} else if rpc_req.method.as_str() == "auth.login" {
|
||||
let mut guard = rate_limiter.lock().await;
|
||||
guard.0 += 1;
|
||||
let guard = rate_limiter.lock().await;
|
||||
if guard.1.elapsed() < Duration::from_secs(20) {
|
||||
if guard.0 >= 3 {
|
||||
let (res_parts, _) = Response::new(()).into_parts();
|
||||
@@ -216,13 +217,25 @@ pub fn auth<M: Metadata>(ctx: RpcContext) -> DynMiddleware<M> {
|
||||
|_| StatusCode::OK,
|
||||
)?));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
let m3: DynMiddlewareStage3 = Box::new(move |_, res| {
|
||||
async move {
|
||||
let mut guard = rate_limiter.lock().await;
|
||||
if guard.1.elapsed() < Duration::from_secs(20) {
|
||||
if res.is_err() {
|
||||
guard.0 += 1;
|
||||
}
|
||||
} else {
|
||||
guard.0 = 0;
|
||||
}
|
||||
guard.1 = Instant::now();
|
||||
Ok(Ok(noop4()))
|
||||
}
|
||||
}
|
||||
Ok(Ok(noop3()))
|
||||
.boxed()
|
||||
});
|
||||
Ok(Ok(m3))
|
||||
}
|
||||
.boxed()
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user