mirror of
https://github.com/Start9Labs/start-os.git
synced 2026-03-26 02:11:53 +00:00
fix: potential fix for the docker leaking the errors and such (#1496)
* fix: potential fix for the docker leaking the errors and such * chore: Make sure that the buffer during reading the output will not exceed 10mb ish * Chore: Add testing
This commit is contained in:
22
backend/Cargo.lock
generated
22
backend/Cargo.lock
generated
@@ -110,6 +110,27 @@ dependencies = [
|
||||
"syn 1.0.96",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-stream"
|
||||
version = "0.3.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dad5c83079eae9969be7fadefe640a1c566901f05ff91ab221de4b6f68d9507e"
|
||||
dependencies = [
|
||||
"async-stream-impl",
|
||||
"futures-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-stream-impl"
|
||||
version = "0.3.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "10f203db73a71dfa2fb6dd22763990fa26f3d2625a6da2da900d23b87d26be27"
|
||||
dependencies = [
|
||||
"proc-macro2 1.0.39",
|
||||
"quote 1.0.18",
|
||||
"syn 1.0.96",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-trait"
|
||||
version = "0.1.56"
|
||||
@@ -1017,6 +1038,7 @@ name = "embassy-os"
|
||||
version = "0.3.1"
|
||||
dependencies = [
|
||||
"aes",
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
"avahi-sys",
|
||||
"base32",
|
||||
|
||||
@@ -48,6 +48,7 @@ unstable = ["patch-db/unstable"]
|
||||
[dependencies]
|
||||
aes = { version = "0.7.5", features = ["ctr"] }
|
||||
async-trait = "0.1.51"
|
||||
async-stream = "0.3.3"
|
||||
avahi-sys = { git = "https://github.com/Start9Labs/avahi-sys", version = "0.10.0", branch = "feature/dynamic-linking", features = [
|
||||
"dynamic",
|
||||
], optional = true }
|
||||
|
||||
@@ -5,12 +5,19 @@ use std::net::Ipv4Addr;
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
|
||||
use async_stream::stream;
|
||||
use bollard::container::RemoveContainerOptions;
|
||||
use color_eyre::eyre::eyre;
|
||||
use color_eyre::Report;
|
||||
use futures::future::Either as EitherFuture;
|
||||
use futures::TryStreamExt;
|
||||
use helpers::NonDetachingJoinHandle;
|
||||
use nix::sys::signal;
|
||||
use nix::unistd::Pid;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use std::collections::VecDeque;
|
||||
use tokio::io::{AsyncBufRead, AsyncBufReadExt, BufReader};
|
||||
use tracing::instrument;
|
||||
|
||||
use crate::context::RpcContext;
|
||||
@@ -170,14 +177,63 @@ impl DockerProcedure {
|
||||
Done(T),
|
||||
TimedOut,
|
||||
}
|
||||
|
||||
let io_format = self.io_format;
|
||||
let output = BufReader::new(
|
||||
handle
|
||||
.stdout
|
||||
.take()
|
||||
.ok_or_else(|| eyre!("Can't takeout stout"))
|
||||
.with_kind(crate::ErrorKind::Docker)?,
|
||||
);
|
||||
let output = NonDetachingJoinHandle::from(tokio::spawn(async move {
|
||||
if let Some(format) = io_format {
|
||||
let buffer = max_buffer(output, None).await?;
|
||||
return Ok::<Value, Error>(match format.from_reader(&*buffer) {
|
||||
Ok(a) => a,
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
"Failed to deserialize stdout from {}: {}, falling back to UTF-8 string.",
|
||||
format,
|
||||
e
|
||||
);
|
||||
String::from_utf8(buffer)
|
||||
.with_kind(crate::ErrorKind::Deserialization)?
|
||||
.into()
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
let lines = buf_reader_to_lines(output, 1000).await?;
|
||||
if lines.is_empty() {
|
||||
return Ok(Value::Null);
|
||||
}
|
||||
|
||||
let joined_output = lines.join("\n");
|
||||
Ok(joined_output.into())
|
||||
}));
|
||||
let err_output = BufReader::new(
|
||||
handle
|
||||
.stderr
|
||||
.take()
|
||||
.ok_or_else(|| eyre!("Can't takeout std err"))
|
||||
.with_kind(crate::ErrorKind::Docker)?,
|
||||
);
|
||||
|
||||
let err_output = NonDetachingJoinHandle::from(tokio::spawn(async move {
|
||||
let lines = buf_reader_to_lines(err_output, 1000).await?;
|
||||
let joined_output = lines.join("\n");
|
||||
Ok::<_, Error>(joined_output)
|
||||
}));
|
||||
|
||||
let res = tokio::select! {
|
||||
res = handle.wait_with_output() => Race::Done(res.with_kind(crate::ErrorKind::Docker)?),
|
||||
res = handle.wait() => Race::Done(res.with_kind(crate::ErrorKind::Docker)?),
|
||||
res = timeout_fut => {
|
||||
res?;
|
||||
Race::TimedOut
|
||||
},
|
||||
};
|
||||
let res = match res {
|
||||
let exit_status = match res {
|
||||
Race::Done(x) => x,
|
||||
Race::TimedOut => {
|
||||
if let Some(id) = id {
|
||||
@@ -187,32 +243,19 @@ impl DockerProcedure {
|
||||
return Ok(Err((143, "Timed out. Retrying soon...".to_owned())));
|
||||
}
|
||||
};
|
||||
Ok(if res.status.success() || res.status.code() == Some(143) {
|
||||
Ok(if let Some(format) = self.io_format {
|
||||
match format.from_slice(&res.stdout) {
|
||||
Ok(a) => a,
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
"Failed to deserialize stdout from {}: {}, falling back to UTF-8 string.",
|
||||
format,
|
||||
e
|
||||
);
|
||||
serde_json::from_value(String::from_utf8(res.stdout)?.into())
|
||||
.with_kind(crate::ErrorKind::Deserialization)?
|
||||
}
|
||||
}
|
||||
} else if res.stdout.is_empty() {
|
||||
serde_json::from_value(Value::Null).with_kind(crate::ErrorKind::Deserialization)?
|
||||
} else {
|
||||
serde_json::from_value(String::from_utf8(res.stdout)?.into())
|
||||
.with_kind(crate::ErrorKind::Deserialization)?
|
||||
})
|
||||
Ok(
|
||||
if exit_status.success() || exit_status.code() == Some(143) {
|
||||
Ok(
|
||||
serde_json::from_value(output.await.with_kind(crate::ErrorKind::Unknown)??)
|
||||
.with_kind(crate::ErrorKind::Deserialization)?,
|
||||
)
|
||||
} else {
|
||||
Err((
|
||||
res.status.code().unwrap_or_default(),
|
||||
String::from_utf8(res.stderr)?,
|
||||
exit_status.code().unwrap_or_default(),
|
||||
err_output.await.with_kind(crate::ErrorKind::Unknown)??,
|
||||
))
|
||||
})
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
#[instrument(skip(ctx, input))]
|
||||
@@ -247,13 +290,32 @@ impl DockerProcedure {
|
||||
.await
|
||||
.with_kind(crate::ErrorKind::Docker)?;
|
||||
}
|
||||
let res = handle
|
||||
.wait_with_output()
|
||||
.await
|
||||
.with_kind(crate::ErrorKind::Docker)?;
|
||||
Ok(if res.status.success() || res.status.code() == Some(143) {
|
||||
Ok(if let Some(format) = &self.io_format {
|
||||
match format.from_slice(&res.stdout) {
|
||||
|
||||
let err_output = BufReader::new(
|
||||
handle
|
||||
.stderr
|
||||
.take()
|
||||
.ok_or_else(|| eyre!("Can't takeout std err"))
|
||||
.with_kind(crate::ErrorKind::Docker)?,
|
||||
);
|
||||
let err_output = NonDetachingJoinHandle::from(tokio::spawn(async move {
|
||||
let lines = buf_reader_to_lines(err_output, 1000).await?;
|
||||
let joined_output = lines.join("\n");
|
||||
Ok::<_, Error>(joined_output)
|
||||
}));
|
||||
|
||||
let io_format = self.io_format;
|
||||
let output = BufReader::new(
|
||||
handle
|
||||
.stdout
|
||||
.take()
|
||||
.ok_or_else(|| eyre!("Can't takeout stout"))
|
||||
.with_kind(crate::ErrorKind::Docker)?,
|
||||
);
|
||||
let output = NonDetachingJoinHandle::from(tokio::spawn(async move {
|
||||
if let Some(format) = io_format {
|
||||
let buffer = max_buffer(output, None).await?;
|
||||
return Ok::<Value, Error>(match format.from_reader(&*buffer) {
|
||||
Ok(a) => a,
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
@@ -261,22 +323,36 @@ impl DockerProcedure {
|
||||
format,
|
||||
e
|
||||
);
|
||||
serde_json::from_value(String::from_utf8(res.stdout)?.into())
|
||||
String::from_utf8(buffer)
|
||||
.with_kind(crate::ErrorKind::Deserialization)?
|
||||
.into()
|
||||
}
|
||||
});
|
||||
}
|
||||
} else if res.stdout.is_empty() {
|
||||
serde_json::from_value(Value::Null).with_kind(crate::ErrorKind::Deserialization)?
|
||||
} else {
|
||||
serde_json::from_value(String::from_utf8(res.stdout)?.into())
|
||||
.with_kind(crate::ErrorKind::Deserialization)?
|
||||
})
|
||||
|
||||
let lines = buf_reader_to_lines(output, 1000).await?;
|
||||
if lines.is_empty() {
|
||||
return Ok(Value::Null);
|
||||
}
|
||||
|
||||
let joined_output = lines.join("\n");
|
||||
Ok(joined_output.into())
|
||||
}));
|
||||
|
||||
let exit_status = handle.wait().await.with_kind(crate::ErrorKind::Docker)?;
|
||||
Ok(
|
||||
if exit_status.success() || exit_status.code() == Some(143) {
|
||||
Ok(
|
||||
serde_json::from_value(output.await.with_kind(crate::ErrorKind::Unknown)??)
|
||||
.with_kind(crate::ErrorKind::Deserialization)?,
|
||||
)
|
||||
} else {
|
||||
Err((
|
||||
res.status.code().unwrap_or_default(),
|
||||
String::from_utf8(res.stderr)?,
|
||||
exit_status.code().unwrap_or_default(),
|
||||
err_output.await.with_kind(crate::ErrorKind::Unknown)??,
|
||||
))
|
||||
})
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
pub fn container_name(pkg_id: &PackageId, name: Option<&str>) -> String {
|
||||
@@ -288,9 +364,9 @@ impl DockerProcedure {
|
||||
}
|
||||
|
||||
pub fn uncontainer_name(name: &str) -> Option<(PackageId<&str>, Option<&str>)> {
|
||||
let (pre_tld, _) = name.split_once(".")?;
|
||||
let (pre_tld, _) = name.split_once('.')?;
|
||||
if pre_tld.contains('_') {
|
||||
let (pkg, name) = name.split_once("_")?;
|
||||
let (pkg, name) = name.split_once('_')?;
|
||||
Some((Id::try_from(pkg).ok()?.into(), Some(name)))
|
||||
} else {
|
||||
Some((Id::try_from(pre_tld).ok()?.into(), None))
|
||||
@@ -356,3 +432,93 @@ impl DockerProcedure {
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
struct RingVec<T> {
|
||||
value: VecDeque<T>,
|
||||
capacity: usize,
|
||||
}
|
||||
impl<T> RingVec<T> {
|
||||
fn new(capacity: usize) -> Self {
|
||||
RingVec {
|
||||
value: VecDeque::with_capacity(capacity),
|
||||
capacity,
|
||||
}
|
||||
}
|
||||
fn push(&mut self, item: T) -> Option<T> {
|
||||
let popped_item = if self.value.len() == self.capacity {
|
||||
self.value.pop_front()
|
||||
} else {
|
||||
None
|
||||
};
|
||||
self.value.push_back(item);
|
||||
popped_item
|
||||
}
|
||||
}
|
||||
|
||||
async fn buf_reader_to_lines(
|
||||
reader: impl AsyncBufRead + Unpin,
|
||||
limit: impl Into<Option<usize>>,
|
||||
) -> Result<Vec<String>, Error> {
|
||||
let lines = stream! {
|
||||
let mut lines = reader.lines();
|
||||
while let Some(line) = lines.next_line().await? {
|
||||
yield Ok::<_, Report>(line);
|
||||
}
|
||||
};
|
||||
let output: RingVec<String> = lines
|
||||
.try_fold(
|
||||
RingVec::new(limit.into().unwrap_or(1000)),
|
||||
|mut acc, line| async move {
|
||||
acc.push(line);
|
||||
Ok(acc)
|
||||
},
|
||||
)
|
||||
.await
|
||||
.with_kind(crate::ErrorKind::Unknown)?;
|
||||
let output: Vec<String> = output.value.into_iter().collect();
|
||||
Ok(output)
|
||||
}
|
||||
|
||||
async fn max_buffer(
|
||||
reader: impl AsyncBufRead + Unpin,
|
||||
max_items: impl Into<Option<usize>>,
|
||||
) -> Result<Vec<u8>, Error> {
|
||||
let mut buffer = Vec::new();
|
||||
|
||||
let mut lines = reader.lines();
|
||||
let max_items = max_items.into().unwrap_or(10_000_000);
|
||||
while let Some(line) = lines.next_line().await? {
|
||||
let mut line = line.into_bytes();
|
||||
buffer.append(&mut line);
|
||||
if buffer.len() >= max_items {
|
||||
return Err(Error::new(
|
||||
color_eyre::eyre::eyre!("Reading the buffer exceeding limits of {}", max_items),
|
||||
crate::ErrorKind::Unknown,
|
||||
));
|
||||
}
|
||||
}
|
||||
Ok(buffer)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
/// Note, this size doesn't mean the vec will match. The vec will go to the next size, 0 -> 7 = 7 and so forth 7-15 = 15
|
||||
/// Just how the vec with capacity works.
|
||||
const CAPACITY_IN: usize = 7;
|
||||
#[test]
|
||||
fn default_capacity_is_set() {
|
||||
let ring: RingVec<usize> = RingVec::new(CAPACITY_IN);
|
||||
assert_eq!(CAPACITY_IN, ring.value.capacity());
|
||||
assert_eq!(0, ring.value.len());
|
||||
}
|
||||
#[test]
|
||||
fn capacity_can_not_be_exceeded() {
|
||||
let mut ring = RingVec::new(CAPACITY_IN);
|
||||
for i in 1..100usize {
|
||||
ring.push(i);
|
||||
}
|
||||
assert_eq!(CAPACITY_IN, ring.value.capacity());
|
||||
assert_eq!(CAPACITY_IN, ring.value.len());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user