From 6505c4054fc2d18b553155dd9561ea33dab99305 Mon Sep 17 00:00:00 2001 From: Stephen Chavez Date: Thu, 18 Aug 2022 10:22:51 -0600 Subject: [PATCH] Feat: HttpReader (#1733) * The start of implementing http_reader, issue #1644 * structure done * v1 of http_reader * first http_header poc * Add AsyncSeek trait * remove import * move import * fix typo * add error checking for async seek * fix handling of positions * rename variables * Update backend/src/util/http_reader.rs Co-authored-by: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com> * clean up work * improve error handling, and change if statement. Co-authored-by: Stephen Chavez Co-authored-by: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com> --- backend/src/error.rs | 10 ++ backend/src/util/http_reader.rs | 300 ++++++++++++++++++++++++++++++++ backend/src/util/mod.rs | 1 + system-images/compat/Cargo.lock | 157 ++++++----------- 4 files changed, 365 insertions(+), 103 deletions(-) create mode 100644 backend/src/util/http_reader.rs diff --git a/backend/src/error.rs b/backend/src/error.rs index 3cc39a458..82b14dcac 100644 --- a/backend/src/error.rs +++ b/backend/src/error.rs @@ -67,6 +67,10 @@ pub enum ErrorKind { LanPortConflict = 58, Javascript = 59, Pem = 60, + TLSInit = 61, + HttpRange = 62, + ContentLength = 63, + BytesError = 64 } impl ErrorKind { pub fn as_str(&self) -> &'static str { @@ -132,6 +136,10 @@ impl ErrorKind { LanPortConflict => "Incompatible LAN Port Configuration", Javascript => "Javascript Engine Error", Pem => "PEM Encoding Error", + TLSInit => "TLS Backend Initialize Error", + HttpRange => "No Support for Web Server HTTP Ranges", + ContentLength => "Request has no content length header", + BytesError => "Could not get the bytes for this request" } } } @@ -236,6 +244,8 @@ impl From for Error { fn from(e: openssl::error::ErrorStack) -> Self { Error::new(eyre!("OpenSSL ERROR:\n{}", e), ErrorKind::OpenSsl) } + + } impl From for RpcError { fn from(e: Error) -> Self { diff --git a/backend/src/util/http_reader.rs b/backend/src/util/http_reader.rs new file mode 100644 index 000000000..2ed64a9c1 --- /dev/null +++ b/backend/src/util/http_reader.rs @@ -0,0 +1,300 @@ +use std::cmp::min; +use std::convert::TryFrom; +use std::fmt::Display; +use std::io::Error as StdIOError; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use color_eyre::eyre::eyre; +use futures::future::BoxFuture; +use futures::FutureExt; +use http::header::{ACCEPT_RANGES, CONTENT_LENGTH, RANGE}; +use pin_project::pin_project; +use reqwest::{Client, Url}; +use tokio::io::{AsyncRead, AsyncSeek}; +use tracing::trace; + +use crate::{Error, ResultExt}; + +#[pin_project] +pub struct HttpReader { + http_url: Url, + cursor_pos: usize, + http_client: Client, + total_bytes: usize, + range_unit: Option, + read_in_progress: Option, Error>>>, +} + +// If we want to add support for units other than Accept-Ranges: bytes, we can use this enum +#[derive(Clone, Copy)] +enum RangeUnit { + Bytes, +} + +impl Display for RangeUnit { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RangeUnit::Bytes => write!(f, "bytes"), + } + } +} + +impl HttpReader { + pub async fn new(http_url: Url) -> Result { + let http_client = Client::builder() + .build() + .with_kind(crate::ErrorKind::TLSInit)?; + + // Make a head request so that we can get the file size and check for http range support. + let head_request = http_client + .head(http_url.clone()) + .send() + .await + .with_kind(crate::ErrorKind::InvalidRequest)?; + + let accept_ranges = head_request.headers().get(ACCEPT_RANGES); + + let range_unit = match accept_ranges { + Some(range_type) => { + // as per rfc, header will contain data but not always UTF8 characters. + + let value = range_type + .to_str() + .map_err(|err| Error::new(err, crate::ErrorKind::Utf8))?; + + match value { + "bytes" => Some(RangeUnit::Bytes), + _ => { + return Err(Error::new( + eyre!( + "{} HTTP range downloading not supported with this unit {value}", + http_url + ), + crate::ErrorKind::HttpRange, + )); + } + } + } + + // None can mean just get entire contents, but we currently error out. + None => { + return Err(Error::new( + eyre!( + "{} HTTP range downloading not supported with this url", + http_url + ), + crate::ErrorKind::HttpRange, + )) + } + }; + + let total_bytes_option = head_request.headers().get(CONTENT_LENGTH); + + let total_bytes = match total_bytes_option { + Some(bytes) => bytes + .to_str() + .map_err(|err| Error::new(err, crate::ErrorKind::Utf8))? + .parse::()?, + None => { + return Err(Error::new( + eyre!("No content length headers for {}", http_url), + crate::ErrorKind::ContentLength, + )) + } + }; + + Ok(HttpReader { + http_url, + cursor_pos: 0, + http_client, + total_bytes, + range_unit, + read_in_progress: None, + }) + } + + // https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests + async fn get_range( + range_unit: Option, + http_client: Client, + http_url: Url, + start: usize, + len: usize, + total_bytes: usize, + ) -> Result, Error> { + let mut data = Vec::with_capacity(len); + + let end = min(start + len, total_bytes) - 1; + + if start > end { + return Ok(data); + } + + match range_unit { + Some(unit) => { + let data_range = format!("{}={}-{} ", unit, start, end); + trace!("get range alive? {}", data_range); + + let data_resp = http_client + .get(http_url) + .header(RANGE, data_range) + .send() + .await + .with_kind(crate::ErrorKind::InvalidRequest)?; + + let status_code = data_resp.status(); + //let data_res = data_resp.bytes().await; + if status_code.is_success() { + data = data_resp + .bytes() + .await + .with_kind(crate::ErrorKind::BytesError)? + .to_vec(); + } + } + + None => unreachable!(), + } + + Ok(data) + } +} + +impl AsyncRead for HttpReader { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + let this = self.project(); + + let mut fut = if let Some(fut) = this.read_in_progress.take() { + fut + } else { + HttpReader::get_range( + *this.range_unit, + this.http_client.clone(), + this.http_url.clone(), + *this.cursor_pos, + buf.remaining(), + *this.total_bytes, + ) + .boxed() + }; + + let res_poll = fut.as_mut().poll(cx); + trace!("Polled with remaining bytes in buf: {}", buf.remaining()); + + match res_poll { + Poll::Ready(result) => match result { + Ok(data_chunk) => { + trace!("data chunk: len: {}", data_chunk.len()); + trace!("buf filled len: {}", buf.filled().len()); + + if data_chunk.len() <= buf.remaining() { + buf.put_slice(&data_chunk); + *this.cursor_pos += data_chunk.len(); + + Poll::Ready(Ok(())) + } else { + buf.put_slice(&data_chunk); + + Poll::Ready(Ok(())) + } + } + Err(err) => Poll::Ready(Err(StdIOError::new( + std::io::ErrorKind::Interrupted, + Box::::from(err.source), + ))), + }, + Poll::Pending => { + *this.read_in_progress = Some(fut); + + Poll::Pending + } + } + } +} + +impl AsyncSeek for HttpReader { + fn start_seek(self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> { + let this = self.project(); + + match position { + std::io::SeekFrom::Start(offset) => { + let pos_res = usize::try_from(offset); + + match pos_res { + Ok(pos) => { + if pos > *this.total_bytes { + StdIOError::new( + std::io::ErrorKind::InvalidInput, + format!( + "The offset: {} cannot be greater than {} bytes", + pos, *this.total_bytes + ), + ); + } + *this.cursor_pos = pos; + } + Err(err) => return Err(StdIOError::new(std::io::ErrorKind::InvalidInput, err)), + } + Ok(()) + } + std::io::SeekFrom::Current(offset) => { + // We explicitly check if we read before byte 0. + let new_pos = i64::try_from(*this.cursor_pos) + .map_err(|err| StdIOError::new(std::io::ErrorKind::InvalidInput, err))? + + offset; + + if new_pos < 0 { + return Err(StdIOError::new( + std::io::ErrorKind::InvalidInput, + "Can't read before byte 0", + )); + } + + *this.cursor_pos = new_pos as usize; + Ok(()) + } + + std::io::SeekFrom::End(offset) => { + // We explicitly check if we read before byte 0. + let new_pos = i64::try_from(*this.total_bytes) + .map_err(|err| StdIOError::new(std::io::ErrorKind::InvalidInput, err))? + + offset; + + if new_pos < 0 { + return Err(StdIOError::new( + std::io::ErrorKind::InvalidInput, + "Can't read before byte 0", + )); + } + + *this.cursor_pos = new_pos as usize; + Ok(()) + } + } + } + + fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(self.cursor_pos as u64)) + } +} + +#[tokio::test] +async fn main_test() { + use tokio::io::AsyncReadExt; + let http_url = Url::parse("https://start9.com/latest/_static/css/main.css").unwrap(); + + println!("Getting this resource: {}", http_url); + let mut test_reader = HttpReader::new(http_url).await.unwrap(); + + let mut buf = vec![0; test_reader.total_bytes]; + let bytes_read = test_reader.read(&mut buf).await.unwrap(); + + println!("bytes read: {}", bytes_read); + + //println!("{}", String::from_utf8(buf).unwrap()); +} diff --git a/backend/src/util/mod.rs b/backend/src/util/mod.rs index 48fbe38b8..abb637df0 100644 --- a/backend/src/util/mod.rs +++ b/backend/src/util/mod.rs @@ -25,6 +25,7 @@ use crate::shutdown::Shutdown; use crate::{Error, ErrorKind, ResultExt as _}; pub mod config; pub mod io; +pub mod http_reader; pub mod logger; pub mod serde; diff --git a/system-images/compat/Cargo.lock b/system-images/compat/Cargo.lock index ccb5de1f2..a5db73869 100644 --- a/system-images/compat/Cargo.lock +++ b/system-images/compat/Cargo.lock @@ -331,7 +331,7 @@ checksum = "d82e7850583ead5f8bbef247e2a3c37a19bd576e8420cd262a6711921827e1e5" dependencies = [ "base64", "bollard-stubs", - "bytes 1.1.0", + "bytes", "futures-core", "futures-util", "hex", @@ -339,13 +339,13 @@ dependencies = [ "hyper", "hyperlocal", "log", - "pin-project-lite 0.2.9", + "pin-project-lite", "serde", "serde_derive", "serde_json", "serde_urlencoded", "thiserror", - "tokio 1.19.2", + "tokio", "tokio-util", "url", "winapi", @@ -391,12 +391,6 @@ version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" -[[package]] -name = "bytes" -version = "0.5.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" - [[package]] name = "bytes" version = "1.1.0" @@ -547,7 +541,7 @@ dependencies = [ "lazy_static", "linear-map", "log", - "nix 0.24.1", + "nix 0.24.2", "pest", "pest_derive", "rand 0.8.5", @@ -971,7 +965,7 @@ dependencies = [ "libc", "log", "models", - "nix 0.24.1", + "nix 0.24.2", "nom 7.1.1", "num", "num_enum", @@ -1004,8 +998,7 @@ dependencies = [ "stderrlog", "tar", "thiserror", - "tokio 1.19.2", - "tokio-compat-02", + "tokio", "tokio-stream", "tokio-tar", "tokio-tungstenite", @@ -1143,11 +1136,11 @@ dependencies = [ [[package]] name = "fd-lock-rs" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32a24e88458a5abfcd1cb81622511306f1aea43b900ddb9b34ff5ad8857a7685" +checksum = "ef0f547e1d79e058664f2ea7d3a6d82b2ddd5fea4a6650b97b70c38979f34db3" dependencies = [ - "nix 0.19.1", + "nix 0.24.2", ] [[package]] @@ -1321,7 +1314,7 @@ dependencies = [ "futures-sink", "futures-task", "memchr", - "pin-project-lite 0.2.9", + "pin-project-lite", "pin-utils", "slab", ] @@ -1401,7 +1394,7 @@ version = "0.3.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37a82c6d637fc9515a4694bbf1cb2457b79d81ce52b3108bdeea58b07dd34a57" dependencies = [ - "bytes 1.1.0", + "bytes", "fnv", "futures-core", "futures-sink", @@ -1409,7 +1402,7 @@ dependencies = [ "http", "indexmap", "slab", - "tokio 1.19.2", + "tokio", "tokio-util", "tracing", ] @@ -1466,9 +1459,11 @@ dependencies = [ name = "helpers" version = "0.1.0" dependencies = [ + "color-eyre", + "futures", "models", "pin-project", - "tokio 1.19.2", + "tokio", ] [[package]] @@ -1511,7 +1506,7 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399" dependencies = [ - "bytes 1.1.0", + "bytes", "fnv", "itoa 1.0.2", ] @@ -1522,9 +1517,9 @@ version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ - "bytes 1.1.0", + "bytes", "http", - "pin-project-lite 0.2.9", + "pin-project-lite", ] [[package]] @@ -1551,7 +1546,7 @@ version = "0.14.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "02c929dc5c39e335a03c405292728118860721b10190d98c2a0f0efd5baafbac" dependencies = [ - "bytes 1.1.0", + "bytes", "futures-channel", "futures-core", "futures-util", @@ -1561,9 +1556,9 @@ dependencies = [ "httparse", "httpdate", "itoa 1.0.2", - "pin-project-lite 0.2.9", + "pin-project-lite", "socket2", - "tokio 1.19.2", + "tokio", "tower-service", "tracing", "want", @@ -1575,10 +1570,10 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ - "bytes 1.1.0", + "bytes", "hyper", "native-tls", - "tokio 1.19.2", + "tokio", "tokio-native-tls", ] @@ -1595,7 +1590,7 @@ dependencies = [ "hyper", "log", "sha-1 0.10.0", - "tokio 1.19.2", + "tokio", "tokio-tungstenite", ] @@ -1609,7 +1604,7 @@ dependencies = [ "hex", "hyper", "pin-project", - "tokio 1.19.2", + "tokio", ] [[package]] @@ -2018,18 +2013,6 @@ dependencies = [ "smallvec", ] -[[package]] -name = "nix" -version = "0.19.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2ccba0cfe4fdf15982d1674c69b1fd80bad427d293849982668dfe454bd61f2" -dependencies = [ - "bitflags", - "cc", - "cfg-if", - "libc", -] - [[package]] name = "nix" version = "0.23.1" @@ -2045,9 +2028,9 @@ dependencies = [ [[package]] name = "nix" -version = "0.24.1" +version = "0.24.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f17df307904acd05aa8e32e97bb20f2a0df1728bbc2d771ae8f9a90463441e9" +checksum = "195cdbc1741b8134346d515b3a56a1c94b0912758009cfd53f99ea0f57b065fc" dependencies = [ "bitflags", "cfg-if", @@ -2383,7 +2366,7 @@ dependencies = [ "serde_cbor 0.11.1", "serde_json", "thiserror", - "tokio 1.19.2", + "tokio", "tracing", "tracing-error 0.1.2", ] @@ -2522,12 +2505,6 @@ dependencies = [ "syn 1.0.98", ] -[[package]] -name = "pin-project-lite" -version = "0.1.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "257b64915a082f7811703966789728173279bdebb956b143dbcd23f6f970a777" - [[package]] name = "pin-project-lite" version = "0.2.9" @@ -2879,7 +2856,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b75aa69a3f06bbcc66ede33af2af253c6f7a86b1ca0033f60c580a27074fbf92" dependencies = [ "base64", - "bytes 1.1.0", + "bytes", "cookie", "cookie_store", "encoding_rs", @@ -2897,12 +2874,12 @@ dependencies = [ "mime", "native-tls", "percent-encoding", - "pin-project-lite 0.2.9", + "pin-project-lite", "proc-macro-hack", "serde", "serde_json", "serde_urlencoded", - "tokio 1.19.2", + "tokio", "tokio-native-tls", "tokio-socks", "tokio-util", @@ -2920,7 +2897,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5fc7d9a4063adcdfbb061aa3011b813cce95d3a8990f9ccb5ea85017916a1b0b" dependencies = [ - "bytes 1.1.0", + "bytes", "cookie", "cookie_store", "reqwest", @@ -2971,7 +2948,7 @@ dependencies = [ "serde_cbor 0.11.2", "serde_json", "thiserror", - "tokio 1.19.2", + "tokio", "url", "yajrc", ] @@ -3440,7 +3417,7 @@ dependencies = [ "atoi", "bitflags", "byteorder", - "bytes 1.1.0", + "bytes", "chrono", "crc", "crossbeam-queue", @@ -3506,7 +3483,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "874e93a365a598dc3dadb197565952cb143ae4aa716f7bcc933a8d836f6bf89f" dependencies = [ "once_cell", - "tokio 1.19.2", + "tokio", "tokio-rustls", ] @@ -3774,52 +3751,26 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" -[[package]] -name = "tokio" -version = "0.2.25" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6703a273949a90131b290be1fe7b039d0fc884aa1935860dfcbe056f28cd8092" -dependencies = [ - "bytes 0.5.6", - "num_cpus", - "pin-project-lite 0.1.12", - "slab", -] - [[package]] name = "tokio" version = "1.19.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c51a52ed6686dd62c320f9b89299e9dfb46f730c7a48e635c19f21d116cb1439" dependencies = [ - "bytes 1.1.0", + "bytes", "libc", "memchr", "mio", "num_cpus", "once_cell", "parking_lot 0.12.0", - "pin-project-lite 0.2.9", + "pin-project-lite", "signal-hook-registry", "socket2", "tokio-macros", "winapi", ] -[[package]] -name = "tokio-compat-02" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7d4237822b7be8fff0a7a27927462fad435dcb6650f95cea9e946bf6bdc7e07" -dependencies = [ - "bytes 0.5.6", - "once_cell", - "pin-project-lite 0.2.9", - "tokio 0.2.25", - "tokio 1.19.2", - "tokio-stream", -] - [[package]] name = "tokio-macros" version = "1.7.0" @@ -3838,7 +3789,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b" dependencies = [ "native-tls", - "tokio 1.19.2", + "tokio", ] [[package]] @@ -3848,7 +3799,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" dependencies = [ "rustls", - "tokio 1.19.2", + "tokio", "webpki", ] @@ -3861,7 +3812,7 @@ dependencies = [ "either", "futures-util", "thiserror", - "tokio 1.19.2", + "tokio", ] [[package]] @@ -3871,8 +3822,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df54d54117d6fdc4e4fea40fe1e4e566b3505700e148a6827e59b34b0d2600d9" dependencies = [ "futures-core", - "pin-project-lite 0.2.9", - "tokio 1.19.2", + "pin-project-lite", + "tokio", "tokio-util", ] @@ -3885,7 +3836,7 @@ dependencies = [ "futures-core", "libc", "redox_syscall 0.2.13", - "tokio 1.19.2", + "tokio", "tokio-stream", "xattr", ] @@ -3898,7 +3849,7 @@ checksum = "06cda1232a49558c46f8a504d5b93101d42c0bf7f911f12a105ba48168f821ae" dependencies = [ "futures-util", "log", - "tokio 1.19.2", + "tokio", "tungstenite", ] @@ -3908,11 +3859,11 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cc463cd8deddc3770d20f9852143d50bf6094e640b485cb2e189a2099085ff45" dependencies = [ - "bytes 1.1.0", + "bytes", "futures-core", "futures-sink", - "pin-project-lite 0.2.9", - "tokio 1.19.2", + "pin-project-lite", + "tokio", "tracing", ] @@ -3942,7 +3893,7 @@ dependencies = [ "serde_derive", "sha2 0.9.9", "sha3", - "tokio 1.19.2", + "tokio", ] [[package]] @@ -3958,7 +3909,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a400e31aa60b9d44a52a8ee0343b5b18566b03a8321e0d321f695cf56e940160" dependencies = [ "cfg-if", - "pin-project-lite 0.2.9", + "pin-project-lite", "tracing-attributes", "tracing-core", ] @@ -4079,7 +4030,7 @@ dependencies = [ "rand 0.8.5", "thiserror", "time 0.3.11", - "tokio 1.19.2", + "tokio", "trust-dns-proto", ] @@ -4104,7 +4055,7 @@ dependencies = [ "smallvec", "thiserror", "tinyvec", - "tokio 1.19.2", + "tokio", "url", ] @@ -4115,7 +4066,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a395a2e0fd8aac9b4613767a5b4ba4b2040de1b767fa03ace8c9d6f351d60b2d" dependencies = [ "async-trait", - "bytes 1.1.0", + "bytes", "cfg-if", "enum-as-inner", "env_logger", @@ -4125,7 +4076,7 @@ dependencies = [ "serde", "thiserror", "time 0.3.11", - "tokio 1.19.2", + "tokio", "toml", "trust-dns-client", "trust-dns-proto", @@ -4145,7 +4096,7 @@ checksum = "d96a2dea40e7570482f28eb57afbe42d97551905da6a9400acc5c328d24004f5" dependencies = [ "base64", "byteorder", - "bytes 1.1.0", + "bytes", "http", "httparse", "log",