diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 46d1abfaa..dfdb08d53 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -80,7 +80,7 @@ embassy_container_init = { path = "../libs/embassy_container_init" } hex = "0.4.3" hmac = "0.12.1" http = "0.2.8" -hyper = "0.14.20" +hyper = { version = "0.14.20", features = ["full"] } hyper-ws-listener = "0.2.0" imbl = "2.0.0" indexmap = { version = "1.9.1", features = ["serde"] } diff --git a/backend/src/net/mod.rs b/backend/src/net/mod.rs index a4e762391..613c530a8 100644 --- a/backend/src/net/mod.rs +++ b/backend/src/net/mod.rs @@ -2,7 +2,7 @@ use std::collections::BTreeMap; use std::sync::Arc; use futures::future::BoxFuture; -use hyper::{Body, Client, Error as HyperError, Request, Response}; +use hyper::{Body, Error as HyperError, Request, Response}; use indexmap::IndexSet; use rpc_toolkit::command; @@ -51,5 +51,3 @@ pub struct GeneratedCertificateMountPoint(()); pub type HttpHandler = Arc< dyn Fn(Request) -> BoxFuture<'static, Result, HyperError>> + Send + Sync, >; - -pub type HttpClient = Client; diff --git a/backend/src/net/net_utils.rs b/backend/src/net/net_utils.rs index 380ec5f9b..32bc846c7 100644 --- a/backend/src/net/net_utils.rs +++ b/backend/src/net/net_utils.rs @@ -126,3 +126,14 @@ impl TryFrom for ResourceFqdn { Self::from_str(&value.to_string()) } } + +pub fn is_upgrade_req(req: &Request) -> bool { + req.headers() + .get("connection") + .and_then(|c| c.to_str().ok()) + .map(|c| { + c.split(",") + .any(|c| c.trim().eq_ignore_ascii_case("upgrade")) + }) + .unwrap_or(false) +} diff --git a/backend/src/net/proxy_controller.rs b/backend/src/net/proxy_controller.rs index c166bd46d..3212c1225 100644 --- a/backend/src/net/proxy_controller.rs +++ b/backend/src/net/proxy_controller.rs @@ -5,20 +5,19 @@ use std::sync::Arc; use color_eyre::eyre::eyre; use futures::FutureExt; -use http::{Method, Request, Response}; -use hyper::upgrade::Upgraded; +use http::uri::{Authority, Scheme}; +use http::{Request, Response, Uri}; use hyper::{Body, Error as HyperError}; use models::{InterfaceId, PackageId}; use openssl::pkey::{PKey, Private}; use openssl::x509::X509; -use tokio::net::TcpStream; use tokio::sync::Mutex; -use tracing::{error, info, instrument}; +use tracing::{error, instrument}; -use crate::net::net_utils::ResourceFqdn; +use crate::net::net_utils::{is_upgrade_req, ResourceFqdn}; use crate::net::ssl::SslManager; use crate::net::vhost_controller::VHOSTController; -use crate::net::{HttpClient, HttpHandler, InterfaceMetadata, PackageNetInfo}; +use crate::net::{HttpHandler, InterfaceMetadata, PackageNetInfo}; use crate::{Error, ResultExt}; pub struct ProxyController { @@ -85,59 +84,43 @@ impl ProxyController { self.inner.lock().await.get_embassy_hostname() } - pub async fn proxy( - client: HttpClient, - req: Request, + async fn proxy( + client: &hyper::Client, + mut req: Request, + addr: SocketAddr, ) -> Result, HyperError> { - if Method::CONNECT == req.method() { - // Received an HTTP request like: - // ``` - // CONNECT www.domain.com:443 HTTP/1.1s - // Host: www.domain.com:443 - // Proxy-Connection: Keep-Alive - // ``` - // - // When HTTP method is CONNECT we should return an empty body - // then we can eventually upgrade the connection and talk a new protocol. - // - // Note: only after client received an empty body with STATUS_OK can the - // connection be upgraded, so we can't return a response inside - // `on_upgrade` future. + let mut uri = std::mem::take(req.uri_mut()).into_parts(); - tokio::task::spawn(async move { - let addr = req.uri().clone(); + uri.scheme = Some(Scheme::HTTP); + uri.authority = Authority::from_str(&addr.to_string()).ok(); + match Uri::from_parts(uri) { + Ok(uri) => *req.uri_mut() = uri, + Err(e) => error!("Error rewriting uri: {}", e), + } + let addr = dbg!(req.uri().to_string()); - match hyper::upgrade::on(req).await { - Ok(upgraded) => { - if let Err(e) = Self::tunnel(upgraded, addr.to_string()).await { - error!("server io error: {}", e); - } - } - Err(e) => error!("upgrade error: {}", e), + if is_upgrade_req(&req) { + let upgraded_req = hyper::upgrade::on(&mut req); + let mut res = client.request(req).await?; + let upgraded_res = hyper::upgrade::on(&mut res); + tokio::spawn(async move { + if let Err(e) = async { + let mut req = upgraded_req.await?; + let mut res = upgraded_res.await?; + tokio::io::copy_bidirectional(&mut req, &mut res).await?; + + Ok::<_, color_eyre::eyre::Report>(()) + } + .await + { + error!("error binding together tcp streams for {}: {}", addr, e); } }); - - Ok(Response::new(Body::empty())) + Ok(res) } else { client.request(req).await } } - - // Create a TCP connection to host:port, build a tunnel between the connection and - // the upgraded connection - async fn tunnel(mut upgraded: Upgraded, addr: String) -> std::io::Result<()> { - let mut server = TcpStream::connect(addr).await?; - - let (from_client, from_server) = - tokio::io::copy_bidirectional(&mut upgraded, &mut server).await?; - - info!( - "client wrote {} bytes and received {} bytes", - from_client, from_server - ); - - Ok(()) - } } struct ProxyControllerInner { ssl_manager: SslManager, @@ -263,7 +246,7 @@ impl ProxyControllerInner { .await?; let svc_handler = - Self::create_docker_handle(docker_ipv4.to_string(), lan_port_config.internal) + Self::create_docker_handle((docker_ipv4, lan_port_config.internal).into()) .await; self.add_handle( @@ -282,28 +265,12 @@ impl ProxyControllerInner { Ok(()) } - async fn create_docker_handle(internal_ip: String, port: u16) -> HttpHandler { - let svc_handler: HttpHandler = Arc::new(move |mut req| { - let proxy_addr = internal_ip.clone(); - async move { - let client = HttpClient::new(); - - let uri_string = format!( - "http://{}:{}{}", - proxy_addr, - port, - req.uri() - .path_and_query() - .map(|x| x.as_str()) - .unwrap_or("/") - ); - - let uri = uri_string.parse().unwrap(); - *req.uri_mut() = uri; - - ProxyController::proxy(client, req).await - } - .boxed() + async fn create_docker_handle(internal_addr: SocketAddr) -> HttpHandler { + let svc_handler: HttpHandler = Arc::new(move |req| { + let client = hyper::client::Client::builder() + .set_host(false) + .build_http(); + async move { ProxyController::proxy(&client, req, internal_addr).await }.boxed() }); svc_handler