From 234f0d75e86e79c9280336f9e291afbb01312f80 Mon Sep 17 00:00:00 2001 From: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com> Date: Thu, 20 Jul 2023 11:40:30 -0600 Subject: [PATCH] mute unexpected eof & protect against fd leaks (#2369) --- backend/src/net/vhost.rs | 18 ++++++-- backend/src/util/io.rs | 90 ++++++++++++++++++++++++++++++++++++---- backend/startd.service | 1 + 3 files changed, 98 insertions(+), 11 deletions(-) diff --git a/backend/src/net/vhost.rs b/backend/src/net/vhost.rs index c80f17e18..e8e754baf 100644 --- a/backend/src/net/vhost.rs +++ b/backend/src/net/vhost.rs @@ -3,6 +3,7 @@ use std::convert::Infallible; use std::net::{IpAddr, Ipv6Addr, SocketAddr}; use std::str::FromStr; use std::sync::{Arc, Weak}; +use std::time::Duration; use color_eyre::eyre::eyre; use helpers::NonDetachingJoinHandle; @@ -19,7 +20,7 @@ use tokio_rustls::{LazyConfigAcceptor, TlsConnector}; use crate::net::keys::Key; use crate::net::ssl::SslManager; use crate::net::utils::SingleAccept; -use crate::util::io::BackTrackingReader; +use crate::util::io::{BackTrackingReader, TimeoutStream}; use crate::Error; // not allowed: <=1024, >=32768, 5355, 5432, 9050, 6010, 9051, 5353 @@ -104,6 +105,8 @@ impl VHostServer { loop { match listener.accept().await { Ok((stream, _)) => { + let stream = + Box::pin(TimeoutStream::new(stream, Duration::from_secs(300))); let mut stream = BackTrackingReader::new(stream); stream.start_buffering(); let mapping = mapping.clone(); @@ -271,7 +274,7 @@ impl VHostServer { &mut tls_stream, &mut target_stream, ) - .await?; + .await } Err(AlpnInfo::Reflect) => { for proto in @@ -286,7 +289,7 @@ impl VHostServer { &mut tls_stream, &mut tcp_stream, ) - .await?; + .await } Err(AlpnInfo::Specified(alpn)) => { cfg.alpn_protocols = alpn; @@ -297,9 +300,16 @@ impl VHostServer { &mut tls_stream, &mut tcp_stream, ) - .await?; + .await } } + .map_or_else( + |e| match e.kind() { + std::io::ErrorKind::UnexpectedEof => Ok(()), + _ => Err(e), + }, + |_| Ok(()), + )?; } else { // 503 } diff --git a/backend/src/util/io.rs b/backend/src/util/io.rs index 3727598cf..ad831d14f 100644 --- a/backend/src/util/io.rs +++ b/backend/src/util/io.rs @@ -4,6 +4,7 @@ use std::os::unix::prelude::MetadataExt; use std::path::Path; use std::sync::atomic::AtomicU64; use std::task::Poll; +use std::time::Duration; use futures::future::{BoxFuture, Fuse}; use futures::{AsyncSeek, FutureExt, TryStreamExt}; @@ -12,6 +13,8 @@ use nix::unistd::{Gid, Uid}; use tokio::io::{ duplex, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, DuplexStream, ReadBuf, WriteHalf, }; +use tokio::net::TcpStream; +use tokio::time::{Instant, Sleep}; use crate::ResultExt; @@ -520,13 +523,12 @@ pub fn dir_copy<'a, P0: AsRef + 'a + Send + Sync, P1: AsRef + 'a + S let dst_path = dst_path.join(e.file_name()); if m.is_file() { let len = m.len(); - let mut dst_file = - &mut tokio::fs::File::create(&dst_path).await.with_ctx(|_| { - ( - crate::ErrorKind::Filesystem, - format!("create {}", dst_path.display()), - ) - })?; + let mut dst_file = tokio::fs::File::create(&dst_path).await.with_ctx(|_| { + ( + crate::ErrorKind::Filesystem, + format!("create {}", dst_path.display()), + ) + })?; let mut rdr = tokio::fs::File::open(&src_path).await.with_ctx(|_| { ( crate::ErrorKind::Filesystem, @@ -592,3 +594,77 @@ pub fn dir_copy<'a, P0: AsRef + 'a + Send + Sync, P1: AsRef + 'a + S } .boxed() } + +#[pin_project::pin_project] +pub struct TimeoutStream { + timeout: Duration, + #[pin] + sleep: Sleep, + #[pin] + stream: S, +} +impl TimeoutStream { + pub fn new(stream: S, timeout: Duration) -> Self { + Self { + timeout, + sleep: tokio::time::sleep(timeout), + stream, + } + } +} +impl AsyncRead for TimeoutStream { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + let mut this = self.project(); + if let std::task::Poll::Ready(_) = this.sleep.as_mut().poll(cx) { + return std::task::Poll::Ready(Err(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "timed out", + ))); + } + let res = this.stream.poll_read(cx, buf); + if res.is_ready() { + this.sleep.reset(Instant::now() + *this.timeout); + } + res + } +} +impl AsyncWrite for TimeoutStream { + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + let mut this = self.project(); + let res = this.stream.poll_write(cx, buf); + if res.is_ready() { + this.sleep.reset(Instant::now() + *this.timeout); + } + res + } + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let mut this = self.project(); + let res = this.stream.poll_flush(cx); + if res.is_ready() { + this.sleep.reset(Instant::now() + *this.timeout); + } + res + } + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let mut this = self.project(); + let res = this.stream.poll_shutdown(cx); + if res.is_ready() { + this.sleep.reset(Instant::now() + *this.timeout); + } + res + } +} diff --git a/backend/startd.service b/backend/startd.service index 2bebd23b8..894298e54 100644 --- a/backend/startd.service +++ b/backend/startd.service @@ -13,6 +13,7 @@ RestartSec=3 ManagedOOMPreference=avoid CPUAccounting=true CPUWeight=1000 +LimitNOFILE=65536 [Install] WantedBy=multi-user.target