mirror of
https://github.com/Start9Labs/start-os.git
synced 2026-03-26 02:11:53 +00:00
mute unexpected eof & protect against fd leaks (#2369)
This commit is contained in:
@@ -3,6 +3,7 @@ use std::convert::Infallible;
|
||||
use std::net::{IpAddr, Ipv6Addr, SocketAddr};
|
||||
use std::str::FromStr;
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::time::Duration;
|
||||
|
||||
use color_eyre::eyre::eyre;
|
||||
use helpers::NonDetachingJoinHandle;
|
||||
@@ -19,7 +20,7 @@ use tokio_rustls::{LazyConfigAcceptor, TlsConnector};
|
||||
use crate::net::keys::Key;
|
||||
use crate::net::ssl::SslManager;
|
||||
use crate::net::utils::SingleAccept;
|
||||
use crate::util::io::BackTrackingReader;
|
||||
use crate::util::io::{BackTrackingReader, TimeoutStream};
|
||||
use crate::Error;
|
||||
|
||||
// not allowed: <=1024, >=32768, 5355, 5432, 9050, 6010, 9051, 5353
|
||||
@@ -104,6 +105,8 @@ impl VHostServer {
|
||||
loop {
|
||||
match listener.accept().await {
|
||||
Ok((stream, _)) => {
|
||||
let stream =
|
||||
Box::pin(TimeoutStream::new(stream, Duration::from_secs(300)));
|
||||
let mut stream = BackTrackingReader::new(stream);
|
||||
stream.start_buffering();
|
||||
let mapping = mapping.clone();
|
||||
@@ -271,7 +274,7 @@ impl VHostServer {
|
||||
&mut tls_stream,
|
||||
&mut target_stream,
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
}
|
||||
Err(AlpnInfo::Reflect) => {
|
||||
for proto in
|
||||
@@ -286,7 +289,7 @@ impl VHostServer {
|
||||
&mut tls_stream,
|
||||
&mut tcp_stream,
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
}
|
||||
Err(AlpnInfo::Specified(alpn)) => {
|
||||
cfg.alpn_protocols = alpn;
|
||||
@@ -297,9 +300,16 @@ impl VHostServer {
|
||||
&mut tls_stream,
|
||||
&mut tcp_stream,
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
}
|
||||
}
|
||||
.map_or_else(
|
||||
|e| match e.kind() {
|
||||
std::io::ErrorKind::UnexpectedEof => Ok(()),
|
||||
_ => Err(e),
|
||||
},
|
||||
|_| Ok(()),
|
||||
)?;
|
||||
} else {
|
||||
// 503
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ use std::os::unix::prelude::MetadataExt;
|
||||
use std::path::Path;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::task::Poll;
|
||||
use std::time::Duration;
|
||||
|
||||
use futures::future::{BoxFuture, Fuse};
|
||||
use futures::{AsyncSeek, FutureExt, TryStreamExt};
|
||||
@@ -12,6 +13,8 @@ use nix::unistd::{Gid, Uid};
|
||||
use tokio::io::{
|
||||
duplex, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, DuplexStream, ReadBuf, WriteHalf,
|
||||
};
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::time::{Instant, Sleep};
|
||||
|
||||
use crate::ResultExt;
|
||||
|
||||
@@ -520,13 +523,12 @@ pub fn dir_copy<'a, P0: AsRef<Path> + 'a + Send + Sync, P1: AsRef<Path> + 'a + S
|
||||
let dst_path = dst_path.join(e.file_name());
|
||||
if m.is_file() {
|
||||
let len = m.len();
|
||||
let mut dst_file =
|
||||
&mut tokio::fs::File::create(&dst_path).await.with_ctx(|_| {
|
||||
(
|
||||
crate::ErrorKind::Filesystem,
|
||||
format!("create {}", dst_path.display()),
|
||||
)
|
||||
})?;
|
||||
let mut dst_file = tokio::fs::File::create(&dst_path).await.with_ctx(|_| {
|
||||
(
|
||||
crate::ErrorKind::Filesystem,
|
||||
format!("create {}", dst_path.display()),
|
||||
)
|
||||
})?;
|
||||
let mut rdr = tokio::fs::File::open(&src_path).await.with_ctx(|_| {
|
||||
(
|
||||
crate::ErrorKind::Filesystem,
|
||||
@@ -592,3 +594,77 @@ pub fn dir_copy<'a, P0: AsRef<Path> + 'a + Send + Sync, P1: AsRef<Path> + 'a + S
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
|
||||
#[pin_project::pin_project]
|
||||
pub struct TimeoutStream<S: AsyncRead + AsyncWrite = TcpStream> {
|
||||
timeout: Duration,
|
||||
#[pin]
|
||||
sleep: Sleep,
|
||||
#[pin]
|
||||
stream: S,
|
||||
}
|
||||
impl<S: AsyncRead + AsyncWrite> TimeoutStream<S> {
|
||||
pub fn new(stream: S, timeout: Duration) -> Self {
|
||||
Self {
|
||||
timeout,
|
||||
sleep: tokio::time::sleep(timeout),
|
||||
stream,
|
||||
}
|
||||
}
|
||||
}
|
||||
impl<S: AsyncRead + AsyncWrite> AsyncRead for TimeoutStream<S> {
|
||||
fn poll_read(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
buf: &mut tokio::io::ReadBuf<'_>,
|
||||
) -> std::task::Poll<std::io::Result<()>> {
|
||||
let mut this = self.project();
|
||||
if let std::task::Poll::Ready(_) = this.sleep.as_mut().poll(cx) {
|
||||
return std::task::Poll::Ready(Err(std::io::Error::new(
|
||||
std::io::ErrorKind::TimedOut,
|
||||
"timed out",
|
||||
)));
|
||||
}
|
||||
let res = this.stream.poll_read(cx, buf);
|
||||
if res.is_ready() {
|
||||
this.sleep.reset(Instant::now() + *this.timeout);
|
||||
}
|
||||
res
|
||||
}
|
||||
}
|
||||
impl<S: AsyncRead + AsyncWrite> AsyncWrite for TimeoutStream<S> {
|
||||
fn poll_write(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> std::task::Poll<Result<usize, std::io::Error>> {
|
||||
let mut this = self.project();
|
||||
let res = this.stream.poll_write(cx, buf);
|
||||
if res.is_ready() {
|
||||
this.sleep.reset(Instant::now() + *this.timeout);
|
||||
}
|
||||
res
|
||||
}
|
||||
fn poll_flush(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Result<(), std::io::Error>> {
|
||||
let mut this = self.project();
|
||||
let res = this.stream.poll_flush(cx);
|
||||
if res.is_ready() {
|
||||
this.sleep.reset(Instant::now() + *this.timeout);
|
||||
}
|
||||
res
|
||||
}
|
||||
fn poll_shutdown(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Result<(), std::io::Error>> {
|
||||
let mut this = self.project();
|
||||
let res = this.stream.poll_shutdown(cx);
|
||||
if res.is_ready() {
|
||||
this.sleep.reset(Instant::now() + *this.timeout);
|
||||
}
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ RestartSec=3
|
||||
ManagedOOMPreference=avoid
|
||||
CPUAccounting=true
|
||||
CPUWeight=1000
|
||||
LimitNOFILE=65536
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
|
||||
Reference in New Issue
Block a user