Bugfix/patch db subscriber (#2652)

* fix socket sending empty patches

* do not timeout tcp connections, just poll them more

* switch from poll to tcp keepalive
This commit is contained in:
Aiden McClelland
2024-06-24 16:15:56 -06:00
committed by GitHub
parent 2c255b6dfe
commit 9da49be44d
7 changed files with 83 additions and 64 deletions

2
core/Cargo.lock generated
View File

@@ -4881,6 +4881,7 @@ dependencies = [
"hmac", "hmac",
"http 1.1.0", "http 1.1.0",
"http-body-util", "http-body-util",
"hyper-util",
"id-pool", "id-pool",
"imbl", "imbl",
"imbl-value", "imbl-value",
@@ -4936,6 +4937,7 @@ dependencies = [
"sha2 0.10.8", "sha2 0.10.8",
"shell-words", "shell-words",
"simple-logging", "simple-logging",
"socket2",
"sqlx", "sqlx",
"sscanf", "sscanf",
"ssh-key", "ssh-key",

View File

@@ -97,6 +97,7 @@ hex = "0.4.3"
hmac = "0.12.1" hmac = "0.12.1"
http = "1.0.0" http = "1.0.0"
http-body-util = "0.1" http-body-util = "0.1"
hyper-util = { version = "0.1.5", features = ["tokio", "service"] }
id-pool = { version = "0.2.2", default-features = false, features = [ id-pool = { version = "0.2.2", default-features = false, features = [
"serde", "serde",
"u16", "u16",
@@ -159,6 +160,7 @@ serde_yaml = { package = "serde_yml", version = "0.0.10" }
sha2 = "0.10.2" sha2 = "0.10.2"
shell-words = "1" shell-words = "1"
simple-logging = "2.0.2" simple-logging = "2.0.2"
socket2 = "0.5.7"
sqlx = { version = "0.7.2", features = [ sqlx = { version = "0.7.2", features = [
"chrono", "chrono",
"runtime-tokio-rustls", "runtime-tokio-rustls",

View File

@@ -112,24 +112,6 @@ pub async fn find_eth_iface() -> Result<String, Error> {
)) ))
} }
#[pin_project::pin_project]
pub struct SingleAccept<T>(Option<T>);
impl<T> SingleAccept<T> {
pub fn new(conn: T) -> Self {
Self(Some(conn))
}
}
// impl<T> axum_server::accept::Accept for SingleAccept<T> {
// type Conn = T;
// type Error = Infallible;
// fn poll_accept(
// self: std::pin::Pin<&mut Self>,
// _cx: &mut std::task::Context<'_>,
// ) -> std::task::Poll<Option<Result<Self::Conn, Self::Error>>> {
// std::task::Poll::Ready(self.project().0.take().map(Ok))
// }
// }
pub struct TcpListeners { pub struct TcpListeners {
listeners: Vec<TcpListener>, listeners: Vec<TcpListener>,
} }

View File

@@ -1,10 +1,15 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::net::{IpAddr, Ipv6Addr, SocketAddr}; use std::net::{IpAddr, Ipv6Addr, SocketAddr};
use std::str::FromStr;
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use std::time::Duration; use std::time::Duration;
use axum::body::Body;
use axum::extract::Request;
use axum::response::Response;
use color_eyre::eyre::eyre; use color_eyre::eyre::eyre;
use helpers::NonDetachingJoinHandle; use helpers::NonDetachingJoinHandle;
use http::Uri;
use imbl_value::InternedString; use imbl_value::InternedString;
use models::ResultExt; use models::ResultExt;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@@ -20,8 +25,9 @@ use tracing::instrument;
use ts_rs::TS; use ts_rs::TS;
use crate::db::model::Database; use crate::db::model::Database;
use crate::net::static_server::server_error;
use crate::prelude::*; use crate::prelude::*;
use crate::util::io::{BackTrackingReader, TimeoutStream}; use crate::util::io::BackTrackingReader;
use crate::util::serde::MaybeUtf8String; use crate::util::serde::MaybeUtf8String;
// not allowed: <=1024, >=32768, 5355, 5432, 9050, 6010, 9051, 5353 // not allowed: <=1024, >=32768, 5355, 5432, 9050, 6010, 9051, 5353
@@ -113,8 +119,16 @@ impl VHostServer {
loop { loop {
match listener.accept().await { match listener.accept().await {
Ok((stream, _)) => { Ok((stream, _)) => {
let stream = if let Err(e) = socket2::SockRef::from(&stream).set_tcp_keepalive(
Box::pin(TimeoutStream::new(stream, Duration::from_secs(300))); &socket2::TcpKeepalive::new()
.with_time(Duration::from_secs(900))
.with_interval(Duration::from_secs(60))
.with_retries(5),
) {
tracing::error!("Failed to set tcp keepalive: {e}");
tracing::debug!("{e:?}");
}
let mut stream = BackTrackingReader::new(stream); let mut stream = BackTrackingReader::new(stream);
stream.start_buffering(); stream.start_buffering();
let mapping = mapping.clone(); let mapping = mapping.clone();
@@ -129,38 +143,39 @@ impl VHostServer {
{ {
Ok(a) => a, Ok(a) => a,
Err(_) => { Err(_) => {
// stream.rewind(); stream.rewind();
// return hyper::server::Server::builder( return hyper_util::server::conn::auto::Builder::new(hyper_util::rt::TokioExecutor::new())
// SingleAccept::new(stream), .serve_connection(
// ) hyper_util::rt::TokioIo::new(stream),
// .serve(make_service_fn(|_| async { hyper_util::service::TowerToHyperService::new(axum::Router::new().fallback(
// Ok::<_, Infallible>(service_fn(|req| async move { axum::routing::method_routing::any(move |req: Request| async move {
// let host = req match async move {
// .headers() let host = req
// .get(http::header::HOST) .headers()
// .and_then(|host| host.to_str().ok()); .get(http::header::HOST)
// let uri = Uri::from_parts({ .and_then(|host| host.to_str().ok());
// let mut parts = let uri = Uri::from_parts({
// req.uri().to_owned().into_parts(); let mut parts = req.uri().to_owned().into_parts();
// parts.authority = host parts.authority = host.map(FromStr::from_str).transpose()?;
// .map(FromStr::from_str) parts
// .transpose()?; })?;
// parts Response::builder()
// })?; .status(http::StatusCode::TEMPORARY_REDIRECT)
// Response::builder() .header(http::header::LOCATION, uri.to_string())
// .status( .body(Body::default())
// http::StatusCode::TEMPORARY_REDIRECT, }.await {
// ) Ok(a) => a,
// .header( Err(e) => {
// http::header::LOCATION, tracing::warn!("Error redirecting http request on ssl port: {e}");
// uri.to_string(), tracing::error!("{e:?}");
// ) server_error(Error::new(e, ErrorKind::Network))
// .body(Body::default()) }
// })) }
// })) }),
// .await )),
// .with_kind(crate::ErrorKind::Network); )
todo!() .await
.map_err(|e| Error::new(color_eyre::eyre::Report::msg(e), ErrorKind::Network));
} }
}; };
let target_name = let target_name =

View File

@@ -146,7 +146,7 @@ impl Manifest {
#[ts(export)] #[ts(export)]
pub struct HardwareRequirements { pub struct HardwareRequirements {
#[serde(default)] #[serde(default)]
#[ts(type = "{ [key: string]: string }")] #[ts(type = "{ [key: string]: string }")] // TODO more specific key
pub device: BTreeMap<String, Regex>, pub device: BTreeMap<String, Regex>,
#[ts(type = "number | null")] #[ts(type = "number | null")]
pub ram: Option<u64>, pub ram: Option<u64>,

View File

@@ -1,4 +1,4 @@
use std::collections::{BTreeSet, VecDeque}; use std::collections::VecDeque;
use std::future::Future; use std::future::Future;
use std::io::Cursor; use std::io::Cursor;
use std::os::unix::prelude::MetadataExt; use std::os::unix::prelude::MetadataExt;
@@ -706,16 +706,16 @@ impl<S: AsyncRead + AsyncWrite> AsyncRead for TimeoutStream<S> {
buf: &mut tokio::io::ReadBuf<'_>, buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> { ) -> std::task::Poll<std::io::Result<()>> {
let mut this = self.project(); let mut this = self.project();
if let std::task::Poll::Ready(_) = this.sleep.as_mut().poll(cx) { let timeout = this.sleep.as_mut().poll(cx);
let res = this.stream.poll_read(cx, buf);
if res.is_ready() {
this.sleep.reset(Instant::now() + *this.timeout);
} else if timeout.is_ready() {
return std::task::Poll::Ready(Err(std::io::Error::new( return std::task::Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::TimedOut, std::io::ErrorKind::TimedOut,
"timed out", "timed out",
))); )));
} }
let res = this.stream.poll_read(cx, buf);
if res.is_ready() {
this.sleep.reset(Instant::now() + *this.timeout);
}
res res
} }
} }
@@ -725,10 +725,16 @@ impl<S: AsyncRead + AsyncWrite> AsyncWrite for TimeoutStream<S> {
cx: &mut std::task::Context<'_>, cx: &mut std::task::Context<'_>,
buf: &[u8], buf: &[u8],
) -> std::task::Poll<Result<usize, std::io::Error>> { ) -> std::task::Poll<Result<usize, std::io::Error>> {
let this = self.project(); let mut this = self.project();
let timeout = this.sleep.as_mut().poll(cx);
let res = this.stream.poll_write(cx, buf); let res = this.stream.poll_write(cx, buf);
if res.is_ready() { if res.is_ready() {
this.sleep.reset(Instant::now() + *this.timeout); this.sleep.reset(Instant::now() + *this.timeout);
} else if timeout.is_ready() {
return std::task::Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"timed out",
)));
} }
res res
} }
@@ -736,10 +742,16 @@ impl<S: AsyncRead + AsyncWrite> AsyncWrite for TimeoutStream<S> {
self: std::pin::Pin<&mut Self>, self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>, cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> { ) -> std::task::Poll<Result<(), std::io::Error>> {
let this = self.project(); let mut this = self.project();
let timeout = this.sleep.as_mut().poll(cx);
let res = this.stream.poll_flush(cx); let res = this.stream.poll_flush(cx);
if res.is_ready() { if res.is_ready() {
this.sleep.reset(Instant::now() + *this.timeout); this.sleep.reset(Instant::now() + *this.timeout);
} else if timeout.is_ready() {
return std::task::Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"timed out",
)));
} }
res res
} }
@@ -747,10 +759,16 @@ impl<S: AsyncRead + AsyncWrite> AsyncWrite for TimeoutStream<S> {
self: std::pin::Pin<&mut Self>, self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>, cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> { ) -> std::task::Poll<Result<(), std::io::Error>> {
let this = self.project(); let mut this = self.project();
let timeout = this.sleep.as_mut().poll(cx);
let res = this.stream.poll_shutdown(cx); let res = this.stream.poll_shutdown(cx);
if res.is_ready() { if res.is_ready() {
this.sleep.reset(Instant::now() + *this.timeout); this.sleep.reset(Instant::now() + *this.timeout);
} else if timeout.is_ready() {
return std::task::Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"timed out",
)));
} }
res res
} }