Feat: HttpReader (#1733)

* The start of implementing http_reader, issue #1644

* structure done

* v1 of http_reader

* first http_header poc

* Add AsyncSeek trait

* remove import

* move import

* fix typo

* add error checking for async seek

* fix handling of positions

* rename variables

* Update backend/src/util/http_reader.rs

Co-authored-by: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com>

* clean up work

* improve error handling, and change if statement.

Co-authored-by: Stephen Chavez <stephen@start9labs.com>
Co-authored-by: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com>
This commit is contained in:
Stephen Chavez
2022-08-18 10:22:51 -06:00
committed by GitHub
parent e1c30a918b
commit 6505c4054f
4 changed files with 365 additions and 103 deletions

View File

@@ -67,6 +67,10 @@ pub enum ErrorKind {
LanPortConflict = 58,
Javascript = 59,
Pem = 60,
TLSInit = 61,
HttpRange = 62,
ContentLength = 63,
BytesError = 64
}
impl ErrorKind {
pub fn as_str(&self) -> &'static str {
@@ -132,6 +136,10 @@ impl ErrorKind {
LanPortConflict => "Incompatible LAN Port Configuration",
Javascript => "Javascript Engine Error",
Pem => "PEM Encoding Error",
TLSInit => "TLS Backend Initialize Error",
HttpRange => "No Support for Web Server HTTP Ranges",
ContentLength => "Request has no content length header",
BytesError => "Could not get the bytes for this request"
}
}
}
@@ -236,6 +244,8 @@ impl From<openssl::error::ErrorStack> for Error {
fn from(e: openssl::error::ErrorStack) -> Self {
Error::new(eyre!("OpenSSL ERROR:\n{}", e), ErrorKind::OpenSsl)
}
}
impl From<Error> for RpcError {
fn from(e: Error) -> Self {

View File

@@ -0,0 +1,300 @@
use std::cmp::min;
use std::convert::TryFrom;
use std::fmt::Display;
use std::io::Error as StdIOError;
use std::pin::Pin;
use std::task::{Context, Poll};
use color_eyre::eyre::eyre;
use futures::future::BoxFuture;
use futures::FutureExt;
use http::header::{ACCEPT_RANGES, CONTENT_LENGTH, RANGE};
use pin_project::pin_project;
use reqwest::{Client, Url};
use tokio::io::{AsyncRead, AsyncSeek};
use tracing::trace;
use crate::{Error, ResultExt};
#[pin_project]
pub struct HttpReader {
http_url: Url,
cursor_pos: usize,
http_client: Client,
total_bytes: usize,
range_unit: Option<RangeUnit>,
read_in_progress: Option<BoxFuture<'static, Result<Vec<u8>, Error>>>,
}
// If we want to add support for units other than Accept-Ranges: bytes, we can use this enum
#[derive(Clone, Copy)]
enum RangeUnit {
Bytes,
}
impl Display for RangeUnit {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RangeUnit::Bytes => write!(f, "bytes"),
}
}
}
impl HttpReader {
pub async fn new(http_url: Url) -> Result<Self, Error> {
let http_client = Client::builder()
.build()
.with_kind(crate::ErrorKind::TLSInit)?;
// Make a head request so that we can get the file size and check for http range support.
let head_request = http_client
.head(http_url.clone())
.send()
.await
.with_kind(crate::ErrorKind::InvalidRequest)?;
let accept_ranges = head_request.headers().get(ACCEPT_RANGES);
let range_unit = match accept_ranges {
Some(range_type) => {
// as per rfc, header will contain data but not always UTF8 characters.
let value = range_type
.to_str()
.map_err(|err| Error::new(err, crate::ErrorKind::Utf8))?;
match value {
"bytes" => Some(RangeUnit::Bytes),
_ => {
return Err(Error::new(
eyre!(
"{} HTTP range downloading not supported with this unit {value}",
http_url
),
crate::ErrorKind::HttpRange,
));
}
}
}
// None can mean just get entire contents, but we currently error out.
None => {
return Err(Error::new(
eyre!(
"{} HTTP range downloading not supported with this url",
http_url
),
crate::ErrorKind::HttpRange,
))
}
};
let total_bytes_option = head_request.headers().get(CONTENT_LENGTH);
let total_bytes = match total_bytes_option {
Some(bytes) => bytes
.to_str()
.map_err(|err| Error::new(err, crate::ErrorKind::Utf8))?
.parse::<usize>()?,
None => {
return Err(Error::new(
eyre!("No content length headers for {}", http_url),
crate::ErrorKind::ContentLength,
))
}
};
Ok(HttpReader {
http_url,
cursor_pos: 0,
http_client,
total_bytes,
range_unit,
read_in_progress: None,
})
}
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests
async fn get_range(
range_unit: Option<RangeUnit>,
http_client: Client,
http_url: Url,
start: usize,
len: usize,
total_bytes: usize,
) -> Result<Vec<u8>, Error> {
let mut data = Vec::with_capacity(len);
let end = min(start + len, total_bytes) - 1;
if start > end {
return Ok(data);
}
match range_unit {
Some(unit) => {
let data_range = format!("{}={}-{} ", unit, start, end);
trace!("get range alive? {}", data_range);
let data_resp = http_client
.get(http_url)
.header(RANGE, data_range)
.send()
.await
.with_kind(crate::ErrorKind::InvalidRequest)?;
let status_code = data_resp.status();
//let data_res = data_resp.bytes().await;
if status_code.is_success() {
data = data_resp
.bytes()
.await
.with_kind(crate::ErrorKind::BytesError)?
.to_vec();
}
}
None => unreachable!(),
}
Ok(data)
}
}
impl AsyncRead for HttpReader {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
let this = self.project();
let mut fut = if let Some(fut) = this.read_in_progress.take() {
fut
} else {
HttpReader::get_range(
*this.range_unit,
this.http_client.clone(),
this.http_url.clone(),
*this.cursor_pos,
buf.remaining(),
*this.total_bytes,
)
.boxed()
};
let res_poll = fut.as_mut().poll(cx);
trace!("Polled with remaining bytes in buf: {}", buf.remaining());
match res_poll {
Poll::Ready(result) => match result {
Ok(data_chunk) => {
trace!("data chunk: len: {}", data_chunk.len());
trace!("buf filled len: {}", buf.filled().len());
if data_chunk.len() <= buf.remaining() {
buf.put_slice(&data_chunk);
*this.cursor_pos += data_chunk.len();
Poll::Ready(Ok(()))
} else {
buf.put_slice(&data_chunk);
Poll::Ready(Ok(()))
}
}
Err(err) => Poll::Ready(Err(StdIOError::new(
std::io::ErrorKind::Interrupted,
Box::<dyn std::error::Error + Send + Sync>::from(err.source),
))),
},
Poll::Pending => {
*this.read_in_progress = Some(fut);
Poll::Pending
}
}
}
}
impl AsyncSeek for HttpReader {
fn start_seek(self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> {
let this = self.project();
match position {
std::io::SeekFrom::Start(offset) => {
let pos_res = usize::try_from(offset);
match pos_res {
Ok(pos) => {
if pos > *this.total_bytes {
StdIOError::new(
std::io::ErrorKind::InvalidInput,
format!(
"The offset: {} cannot be greater than {} bytes",
pos, *this.total_bytes
),
);
}
*this.cursor_pos = pos;
}
Err(err) => return Err(StdIOError::new(std::io::ErrorKind::InvalidInput, err)),
}
Ok(())
}
std::io::SeekFrom::Current(offset) => {
// We explicitly check if we read before byte 0.
let new_pos = i64::try_from(*this.cursor_pos)
.map_err(|err| StdIOError::new(std::io::ErrorKind::InvalidInput, err))?
+ offset;
if new_pos < 0 {
return Err(StdIOError::new(
std::io::ErrorKind::InvalidInput,
"Can't read before byte 0",
));
}
*this.cursor_pos = new_pos as usize;
Ok(())
}
std::io::SeekFrom::End(offset) => {
// We explicitly check if we read before byte 0.
let new_pos = i64::try_from(*this.total_bytes)
.map_err(|err| StdIOError::new(std::io::ErrorKind::InvalidInput, err))?
+ offset;
if new_pos < 0 {
return Err(StdIOError::new(
std::io::ErrorKind::InvalidInput,
"Can't read before byte 0",
));
}
*this.cursor_pos = new_pos as usize;
Ok(())
}
}
}
fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
Poll::Ready(Ok(self.cursor_pos as u64))
}
}
#[tokio::test]
async fn main_test() {
use tokio::io::AsyncReadExt;
let http_url = Url::parse("https://start9.com/latest/_static/css/main.css").unwrap();
println!("Getting this resource: {}", http_url);
let mut test_reader = HttpReader::new(http_url).await.unwrap();
let mut buf = vec![0; test_reader.total_bytes];
let bytes_read = test_reader.read(&mut buf).await.unwrap();
println!("bytes read: {}", bytes_read);
//println!("{}", String::from_utf8(buf).unwrap());
}

View File

@@ -25,6 +25,7 @@ use crate::shutdown::Shutdown;
use crate::{Error, ErrorKind, ResultExt as _};
pub mod config;
pub mod io;
pub mod http_reader;
pub mod logger;
pub mod serde;

View File

@@ -331,7 +331,7 @@ checksum = "d82e7850583ead5f8bbef247e2a3c37a19bd576e8420cd262a6711921827e1e5"
dependencies = [
"base64",
"bollard-stubs",
"bytes 1.1.0",
"bytes",
"futures-core",
"futures-util",
"hex",
@@ -339,13 +339,13 @@ dependencies = [
"hyper",
"hyperlocal",
"log",
"pin-project-lite 0.2.9",
"pin-project-lite",
"serde",
"serde_derive",
"serde_json",
"serde_urlencoded",
"thiserror",
"tokio 1.19.2",
"tokio",
"tokio-util",
"url",
"winapi",
@@ -391,12 +391,6 @@ version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "bytes"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38"
[[package]]
name = "bytes"
version = "1.1.0"
@@ -547,7 +541,7 @@ dependencies = [
"lazy_static",
"linear-map",
"log",
"nix 0.24.1",
"nix 0.24.2",
"pest",
"pest_derive",
"rand 0.8.5",
@@ -971,7 +965,7 @@ dependencies = [
"libc",
"log",
"models",
"nix 0.24.1",
"nix 0.24.2",
"nom 7.1.1",
"num",
"num_enum",
@@ -1004,8 +998,7 @@ dependencies = [
"stderrlog",
"tar",
"thiserror",
"tokio 1.19.2",
"tokio-compat-02",
"tokio",
"tokio-stream",
"tokio-tar",
"tokio-tungstenite",
@@ -1143,11 +1136,11 @@ dependencies = [
[[package]]
name = "fd-lock-rs"
version = "0.1.3"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32a24e88458a5abfcd1cb81622511306f1aea43b900ddb9b34ff5ad8857a7685"
checksum = "ef0f547e1d79e058664f2ea7d3a6d82b2ddd5fea4a6650b97b70c38979f34db3"
dependencies = [
"nix 0.19.1",
"nix 0.24.2",
]
[[package]]
@@ -1321,7 +1314,7 @@ dependencies = [
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite 0.2.9",
"pin-project-lite",
"pin-utils",
"slab",
]
@@ -1401,7 +1394,7 @@ version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37a82c6d637fc9515a4694bbf1cb2457b79d81ce52b3108bdeea58b07dd34a57"
dependencies = [
"bytes 1.1.0",
"bytes",
"fnv",
"futures-core",
"futures-sink",
@@ -1409,7 +1402,7 @@ dependencies = [
"http",
"indexmap",
"slab",
"tokio 1.19.2",
"tokio",
"tokio-util",
"tracing",
]
@@ -1466,9 +1459,11 @@ dependencies = [
name = "helpers"
version = "0.1.0"
dependencies = [
"color-eyre",
"futures",
"models",
"pin-project",
"tokio 1.19.2",
"tokio",
]
[[package]]
@@ -1511,7 +1506,7 @@ version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399"
dependencies = [
"bytes 1.1.0",
"bytes",
"fnv",
"itoa 1.0.2",
]
@@ -1522,9 +1517,9 @@ version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1"
dependencies = [
"bytes 1.1.0",
"bytes",
"http",
"pin-project-lite 0.2.9",
"pin-project-lite",
]
[[package]]
@@ -1551,7 +1546,7 @@ version = "0.14.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "02c929dc5c39e335a03c405292728118860721b10190d98c2a0f0efd5baafbac"
dependencies = [
"bytes 1.1.0",
"bytes",
"futures-channel",
"futures-core",
"futures-util",
@@ -1561,9 +1556,9 @@ dependencies = [
"httparse",
"httpdate",
"itoa 1.0.2",
"pin-project-lite 0.2.9",
"pin-project-lite",
"socket2",
"tokio 1.19.2",
"tokio",
"tower-service",
"tracing",
"want",
@@ -1575,10 +1570,10 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"
dependencies = [
"bytes 1.1.0",
"bytes",
"hyper",
"native-tls",
"tokio 1.19.2",
"tokio",
"tokio-native-tls",
]
@@ -1595,7 +1590,7 @@ dependencies = [
"hyper",
"log",
"sha-1 0.10.0",
"tokio 1.19.2",
"tokio",
"tokio-tungstenite",
]
@@ -1609,7 +1604,7 @@ dependencies = [
"hex",
"hyper",
"pin-project",
"tokio 1.19.2",
"tokio",
]
[[package]]
@@ -2018,18 +2013,6 @@ dependencies = [
"smallvec",
]
[[package]]
name = "nix"
version = "0.19.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2ccba0cfe4fdf15982d1674c69b1fd80bad427d293849982668dfe454bd61f2"
dependencies = [
"bitflags",
"cc",
"cfg-if",
"libc",
]
[[package]]
name = "nix"
version = "0.23.1"
@@ -2045,9 +2028,9 @@ dependencies = [
[[package]]
name = "nix"
version = "0.24.1"
version = "0.24.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f17df307904acd05aa8e32e97bb20f2a0df1728bbc2d771ae8f9a90463441e9"
checksum = "195cdbc1741b8134346d515b3a56a1c94b0912758009cfd53f99ea0f57b065fc"
dependencies = [
"bitflags",
"cfg-if",
@@ -2383,7 +2366,7 @@ dependencies = [
"serde_cbor 0.11.1",
"serde_json",
"thiserror",
"tokio 1.19.2",
"tokio",
"tracing",
"tracing-error 0.1.2",
]
@@ -2522,12 +2505,6 @@ dependencies = [
"syn 1.0.98",
]
[[package]]
name = "pin-project-lite"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "257b64915a082f7811703966789728173279bdebb956b143dbcd23f6f970a777"
[[package]]
name = "pin-project-lite"
version = "0.2.9"
@@ -2879,7 +2856,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b75aa69a3f06bbcc66ede33af2af253c6f7a86b1ca0033f60c580a27074fbf92"
dependencies = [
"base64",
"bytes 1.1.0",
"bytes",
"cookie",
"cookie_store",
"encoding_rs",
@@ -2897,12 +2874,12 @@ dependencies = [
"mime",
"native-tls",
"percent-encoding",
"pin-project-lite 0.2.9",
"pin-project-lite",
"proc-macro-hack",
"serde",
"serde_json",
"serde_urlencoded",
"tokio 1.19.2",
"tokio",
"tokio-native-tls",
"tokio-socks",
"tokio-util",
@@ -2920,7 +2897,7 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fc7d9a4063adcdfbb061aa3011b813cce95d3a8990f9ccb5ea85017916a1b0b"
dependencies = [
"bytes 1.1.0",
"bytes",
"cookie",
"cookie_store",
"reqwest",
@@ -2971,7 +2948,7 @@ dependencies = [
"serde_cbor 0.11.2",
"serde_json",
"thiserror",
"tokio 1.19.2",
"tokio",
"url",
"yajrc",
]
@@ -3440,7 +3417,7 @@ dependencies = [
"atoi",
"bitflags",
"byteorder",
"bytes 1.1.0",
"bytes",
"chrono",
"crc",
"crossbeam-queue",
@@ -3506,7 +3483,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "874e93a365a598dc3dadb197565952cb143ae4aa716f7bcc933a8d836f6bf89f"
dependencies = [
"once_cell",
"tokio 1.19.2",
"tokio",
"tokio-rustls",
]
@@ -3774,52 +3751,26 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "0.2.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6703a273949a90131b290be1fe7b039d0fc884aa1935860dfcbe056f28cd8092"
dependencies = [
"bytes 0.5.6",
"num_cpus",
"pin-project-lite 0.1.12",
"slab",
]
[[package]]
name = "tokio"
version = "1.19.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c51a52ed6686dd62c320f9b89299e9dfb46f730c7a48e635c19f21d116cb1439"
dependencies = [
"bytes 1.1.0",
"bytes",
"libc",
"memchr",
"mio",
"num_cpus",
"once_cell",
"parking_lot 0.12.0",
"pin-project-lite 0.2.9",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"winapi",
]
[[package]]
name = "tokio-compat-02"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7d4237822b7be8fff0a7a27927462fad435dcb6650f95cea9e946bf6bdc7e07"
dependencies = [
"bytes 0.5.6",
"once_cell",
"pin-project-lite 0.2.9",
"tokio 0.2.25",
"tokio 1.19.2",
"tokio-stream",
]
[[package]]
name = "tokio-macros"
version = "1.7.0"
@@ -3838,7 +3789,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b"
dependencies = [
"native-tls",
"tokio 1.19.2",
"tokio",
]
[[package]]
@@ -3848,7 +3799,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59"
dependencies = [
"rustls",
"tokio 1.19.2",
"tokio",
"webpki",
]
@@ -3861,7 +3812,7 @@ dependencies = [
"either",
"futures-util",
"thiserror",
"tokio 1.19.2",
"tokio",
]
[[package]]
@@ -3871,8 +3822,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df54d54117d6fdc4e4fea40fe1e4e566b3505700e148a6827e59b34b0d2600d9"
dependencies = [
"futures-core",
"pin-project-lite 0.2.9",
"tokio 1.19.2",
"pin-project-lite",
"tokio",
"tokio-util",
]
@@ -3885,7 +3836,7 @@ dependencies = [
"futures-core",
"libc",
"redox_syscall 0.2.13",
"tokio 1.19.2",
"tokio",
"tokio-stream",
"xattr",
]
@@ -3898,7 +3849,7 @@ checksum = "06cda1232a49558c46f8a504d5b93101d42c0bf7f911f12a105ba48168f821ae"
dependencies = [
"futures-util",
"log",
"tokio 1.19.2",
"tokio",
"tungstenite",
]
@@ -3908,11 +3859,11 @@ version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc463cd8deddc3770d20f9852143d50bf6094e640b485cb2e189a2099085ff45"
dependencies = [
"bytes 1.1.0",
"bytes",
"futures-core",
"futures-sink",
"pin-project-lite 0.2.9",
"tokio 1.19.2",
"pin-project-lite",
"tokio",
"tracing",
]
@@ -3942,7 +3893,7 @@ dependencies = [
"serde_derive",
"sha2 0.9.9",
"sha3",
"tokio 1.19.2",
"tokio",
]
[[package]]
@@ -3958,7 +3909,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a400e31aa60b9d44a52a8ee0343b5b18566b03a8321e0d321f695cf56e940160"
dependencies = [
"cfg-if",
"pin-project-lite 0.2.9",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
]
@@ -4079,7 +4030,7 @@ dependencies = [
"rand 0.8.5",
"thiserror",
"time 0.3.11",
"tokio 1.19.2",
"tokio",
"trust-dns-proto",
]
@@ -4104,7 +4055,7 @@ dependencies = [
"smallvec",
"thiserror",
"tinyvec",
"tokio 1.19.2",
"tokio",
"url",
]
@@ -4115,7 +4066,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a395a2e0fd8aac9b4613767a5b4ba4b2040de1b767fa03ace8c9d6f351d60b2d"
dependencies = [
"async-trait",
"bytes 1.1.0",
"bytes",
"cfg-if",
"enum-as-inner",
"env_logger",
@@ -4125,7 +4076,7 @@ dependencies = [
"serde",
"thiserror",
"time 0.3.11",
"tokio 1.19.2",
"tokio",
"toml",
"trust-dns-client",
"trust-dns-proto",
@@ -4145,7 +4096,7 @@ checksum = "d96a2dea40e7570482f28eb57afbe42d97551905da6a9400acc5c328d24004f5"
dependencies = [
"base64",
"byteorder",
"bytes 1.1.0",
"bytes",
"http",
"httparse",
"log",