diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 779378d12..9368da7e3 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -110,6 +110,27 @@ dependencies = [ "syn 1.0.96", ] +[[package]] +name = "async-stream" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dad5c83079eae9969be7fadefe640a1c566901f05ff91ab221de4b6f68d9507e" +dependencies = [ + "async-stream-impl", + "futures-core", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10f203db73a71dfa2fb6dd22763990fa26f3d2625a6da2da900d23b87d26be27" +dependencies = [ + "proc-macro2 1.0.39", + "quote 1.0.18", + "syn 1.0.96", +] + [[package]] name = "async-trait" version = "0.1.56" @@ -1017,6 +1038,7 @@ name = "embassy-os" version = "0.3.1" dependencies = [ "aes", + "async-stream", "async-trait", "avahi-sys", "base32", diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 4dc6282cf..51c1e1f07 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -48,6 +48,7 @@ unstable = ["patch-db/unstable"] [dependencies] aes = { version = "0.7.5", features = ["ctr"] } async-trait = "0.1.51" +async-stream = "0.3.3" avahi-sys = { git = "https://github.com/Start9Labs/avahi-sys", version = "0.10.0", branch = "feature/dynamic-linking", features = [ "dynamic", ], optional = true } diff --git a/backend/src/procedure/docker.rs b/backend/src/procedure/docker.rs index b80eba930..542cc7873 100644 --- a/backend/src/procedure/docker.rs +++ b/backend/src/procedure/docker.rs @@ -5,12 +5,19 @@ use std::net::Ipv4Addr; use std::path::PathBuf; use std::time::Duration; +use async_stream::stream; use bollard::container::RemoveContainerOptions; +use color_eyre::eyre::eyre; +use color_eyre::Report; use futures::future::Either as EitherFuture; +use futures::TryStreamExt; +use helpers::NonDetachingJoinHandle; use nix::sys::signal; use nix::unistd::Pid; use serde::{Deserialize, Serialize}; use serde_json::Value; +use std::collections::VecDeque; +use tokio::io::{AsyncBufRead, AsyncBufReadExt, BufReader}; use tracing::instrument; use crate::context::RpcContext; @@ -170,14 +177,63 @@ impl DockerProcedure { Done(T), TimedOut, } + + let io_format = self.io_format; + let output = BufReader::new( + handle + .stdout + .take() + .ok_or_else(|| eyre!("Can't takeout stout")) + .with_kind(crate::ErrorKind::Docker)?, + ); + let output = NonDetachingJoinHandle::from(tokio::spawn(async move { + if let Some(format) = io_format { + let buffer = max_buffer(output, None).await?; + return Ok::(match format.from_reader(&*buffer) { + Ok(a) => a, + Err(e) => { + tracing::warn!( + "Failed to deserialize stdout from {}: {}, falling back to UTF-8 string.", + format, + e + ); + String::from_utf8(buffer) + .with_kind(crate::ErrorKind::Deserialization)? + .into() + } + }); + } + + let lines = buf_reader_to_lines(output, 1000).await?; + if lines.is_empty() { + return Ok(Value::Null); + } + + let joined_output = lines.join("\n"); + Ok(joined_output.into()) + })); + let err_output = BufReader::new( + handle + .stderr + .take() + .ok_or_else(|| eyre!("Can't takeout std err")) + .with_kind(crate::ErrorKind::Docker)?, + ); + + let err_output = NonDetachingJoinHandle::from(tokio::spawn(async move { + let lines = buf_reader_to_lines(err_output, 1000).await?; + let joined_output = lines.join("\n"); + Ok::<_, Error>(joined_output) + })); + let res = tokio::select! { - res = handle.wait_with_output() => Race::Done(res.with_kind(crate::ErrorKind::Docker)?), + res = handle.wait() => Race::Done(res.with_kind(crate::ErrorKind::Docker)?), res = timeout_fut => { res?; Race::TimedOut }, }; - let res = match res { + let exit_status = match res { Race::Done(x) => x, Race::TimedOut => { if let Some(id) = id { @@ -187,32 +243,19 @@ impl DockerProcedure { return Ok(Err((143, "Timed out. Retrying soon...".to_owned()))); } }; - Ok(if res.status.success() || res.status.code() == Some(143) { - Ok(if let Some(format) = self.io_format { - match format.from_slice(&res.stdout) { - Ok(a) => a, - Err(e) => { - tracing::warn!( - "Failed to deserialize stdout from {}: {}, falling back to UTF-8 string.", - format, - e - ); - serde_json::from_value(String::from_utf8(res.stdout)?.into()) - .with_kind(crate::ErrorKind::Deserialization)? - } - } - } else if res.stdout.is_empty() { - serde_json::from_value(Value::Null).with_kind(crate::ErrorKind::Deserialization)? + Ok( + if exit_status.success() || exit_status.code() == Some(143) { + Ok( + serde_json::from_value(output.await.with_kind(crate::ErrorKind::Unknown)??) + .with_kind(crate::ErrorKind::Deserialization)?, + ) } else { - serde_json::from_value(String::from_utf8(res.stdout)?.into()) - .with_kind(crate::ErrorKind::Deserialization)? - }) - } else { - Err(( - res.status.code().unwrap_or_default(), - String::from_utf8(res.stderr)?, - )) - }) + Err(( + exit_status.code().unwrap_or_default(), + err_output.await.with_kind(crate::ErrorKind::Unknown)??, + )) + }, + ) } #[instrument(skip(ctx, input))] @@ -247,13 +290,32 @@ impl DockerProcedure { .await .with_kind(crate::ErrorKind::Docker)?; } - let res = handle - .wait_with_output() - .await - .with_kind(crate::ErrorKind::Docker)?; - Ok(if res.status.success() || res.status.code() == Some(143) { - Ok(if let Some(format) = &self.io_format { - match format.from_slice(&res.stdout) { + + let err_output = BufReader::new( + handle + .stderr + .take() + .ok_or_else(|| eyre!("Can't takeout std err")) + .with_kind(crate::ErrorKind::Docker)?, + ); + let err_output = NonDetachingJoinHandle::from(tokio::spawn(async move { + let lines = buf_reader_to_lines(err_output, 1000).await?; + let joined_output = lines.join("\n"); + Ok::<_, Error>(joined_output) + })); + + let io_format = self.io_format; + let output = BufReader::new( + handle + .stdout + .take() + .ok_or_else(|| eyre!("Can't takeout stout")) + .with_kind(crate::ErrorKind::Docker)?, + ); + let output = NonDetachingJoinHandle::from(tokio::spawn(async move { + if let Some(format) = io_format { + let buffer = max_buffer(output, None).await?; + return Ok::(match format.from_reader(&*buffer) { Ok(a) => a, Err(e) => { tracing::warn!( @@ -261,22 +323,36 @@ impl DockerProcedure { format, e ); - serde_json::from_value(String::from_utf8(res.stdout)?.into()) + String::from_utf8(buffer) .with_kind(crate::ErrorKind::Deserialization)? + .into() } - } - } else if res.stdout.is_empty() { - serde_json::from_value(Value::Null).with_kind(crate::ErrorKind::Deserialization)? + }); + } + + let lines = buf_reader_to_lines(output, 1000).await?; + if lines.is_empty() { + return Ok(Value::Null); + } + + let joined_output = lines.join("\n"); + Ok(joined_output.into()) + })); + + let exit_status = handle.wait().await.with_kind(crate::ErrorKind::Docker)?; + Ok( + if exit_status.success() || exit_status.code() == Some(143) { + Ok( + serde_json::from_value(output.await.with_kind(crate::ErrorKind::Unknown)??) + .with_kind(crate::ErrorKind::Deserialization)?, + ) } else { - serde_json::from_value(String::from_utf8(res.stdout)?.into()) - .with_kind(crate::ErrorKind::Deserialization)? - }) - } else { - Err(( - res.status.code().unwrap_or_default(), - String::from_utf8(res.stderr)?, - )) - }) + Err(( + exit_status.code().unwrap_or_default(), + err_output.await.with_kind(crate::ErrorKind::Unknown)??, + )) + }, + ) } pub fn container_name(pkg_id: &PackageId, name: Option<&str>) -> String { @@ -288,9 +364,9 @@ impl DockerProcedure { } pub fn uncontainer_name(name: &str) -> Option<(PackageId<&str>, Option<&str>)> { - let (pre_tld, _) = name.split_once(".")?; + let (pre_tld, _) = name.split_once('.')?; if pre_tld.contains('_') { - let (pkg, name) = name.split_once("_")?; + let (pkg, name) = name.split_once('_')?; Some((Id::try_from(pkg).ok()?.into(), Some(name))) } else { Some((Id::try_from(pre_tld).ok()?.into(), None)) @@ -356,3 +432,93 @@ impl DockerProcedure { res } } + +struct RingVec { + value: VecDeque, + capacity: usize, +} +impl RingVec { + fn new(capacity: usize) -> Self { + RingVec { + value: VecDeque::with_capacity(capacity), + capacity, + } + } + fn push(&mut self, item: T) -> Option { + let popped_item = if self.value.len() == self.capacity { + self.value.pop_front() + } else { + None + }; + self.value.push_back(item); + popped_item + } +} + +async fn buf_reader_to_lines( + reader: impl AsyncBufRead + Unpin, + limit: impl Into>, +) -> Result, Error> { + let lines = stream! { + let mut lines = reader.lines(); + while let Some(line) = lines.next_line().await? { + yield Ok::<_, Report>(line); + } + }; + let output: RingVec = lines + .try_fold( + RingVec::new(limit.into().unwrap_or(1000)), + |mut acc, line| async move { + acc.push(line); + Ok(acc) + }, + ) + .await + .with_kind(crate::ErrorKind::Unknown)?; + let output: Vec = output.value.into_iter().collect(); + Ok(output) +} + +async fn max_buffer( + reader: impl AsyncBufRead + Unpin, + max_items: impl Into>, +) -> Result, Error> { + let mut buffer = Vec::new(); + + let mut lines = reader.lines(); + let max_items = max_items.into().unwrap_or(10_000_000); + while let Some(line) = lines.next_line().await? { + let mut line = line.into_bytes(); + buffer.append(&mut line); + if buffer.len() >= max_items { + return Err(Error::new( + color_eyre::eyre::eyre!("Reading the buffer exceeding limits of {}", max_items), + crate::ErrorKind::Unknown, + )); + } + } + Ok(buffer) +} + +#[cfg(test)] +mod tests { + use super::*; + /// Note, this size doesn't mean the vec will match. The vec will go to the next size, 0 -> 7 = 7 and so forth 7-15 = 15 + /// Just how the vec with capacity works. + const CAPACITY_IN: usize = 7; + #[test] + fn default_capacity_is_set() { + let ring: RingVec = RingVec::new(CAPACITY_IN); + assert_eq!(CAPACITY_IN, ring.value.capacity()); + assert_eq!(0, ring.value.len()); + } + #[test] + fn capacity_can_not_be_exceeded() { + let mut ring = RingVec::new(CAPACITY_IN); + for i in 1..100usize { + ring.push(i); + } + assert_eq!(CAPACITY_IN, ring.value.capacity()); + assert_eq!(CAPACITY_IN, ring.value.len()); + } +}