Fix/memory leak docker (#1505)

* fix: potential fix for the docker leaking the errors and such

* chore: Ensure the buffer used while reading the output does not exceed roughly 10 MB

* Chore: Add testing

* fix: Docker buffer reading to lines now works

* chore: fixing the broken responses
This commit is contained in:
J M
2022-06-07 12:58:12 -06:00
committed by GitHub
parent f234f894af
commit 9217d00528

View File

@@ -188,8 +188,8 @@ impl DockerProcedure {
); );
let output = NonDetachingJoinHandle::from(tokio::spawn(async move { let output = NonDetachingJoinHandle::from(tokio::spawn(async move {
if let Some(format) = io_format { if let Some(format) = io_format {
let buffer = max_buffer(output, None).await?; let buffer = max_by_lines(output, None).await?;
return Ok::<Value, Error>(match format.from_reader(&*buffer) { return Ok::<Value, Error>(match format.from_slice(buffer.as_bytes()) {
Ok(a) => a, Ok(a) => a,
Err(e) => { Err(e) => {
tracing::warn!( tracing::warn!(
@@ -197,9 +197,7 @@ impl DockerProcedure {
format, format,
e e
); );
String::from_utf8(buffer) Value::String(buffer)
.with_kind(crate::ErrorKind::Deserialization)?
.into()
} }
}); });
} }
@@ -210,7 +208,7 @@ impl DockerProcedure {
} }
let joined_output = lines.join("\n"); let joined_output = lines.join("\n");
Ok(joined_output.into()) Ok(Value::String(joined_output))
})); }));
let err_output = BufReader::new( let err_output = BufReader::new(
handle handle
@@ -314,8 +312,8 @@ impl DockerProcedure {
); );
let output = NonDetachingJoinHandle::from(tokio::spawn(async move { let output = NonDetachingJoinHandle::from(tokio::spawn(async move {
if let Some(format) = io_format { if let Some(format) = io_format {
let buffer = max_buffer(output, None).await?; let buffer = max_by_lines(output, None).await?;
return Ok::<Value, Error>(match format.from_reader(&*buffer) { return Ok::<Value, Error>(match format.from_slice(&buffer.as_bytes()) {
Ok(a) => a, Ok(a) => a,
Err(e) => { Err(e) => {
tracing::warn!( tracing::warn!(
@@ -323,9 +321,7 @@ impl DockerProcedure {
format, format,
e e
); );
String::from_utf8(buffer) Value::String(buffer)
.with_kind(crate::ErrorKind::Deserialization)?
.into()
} }
}); });
} }
@@ -336,7 +332,7 @@ impl DockerProcedure {
} }
let joined_output = lines.join("\n"); let joined_output = lines.join("\n");
Ok(joined_output.into()) Ok(Value::String(joined_output))
})); }));
let exit_status = handle.wait().await.with_kind(crate::ErrorKind::Docker)?; let exit_status = handle.wait().await.with_kind(crate::ErrorKind::Docker)?;
@@ -479,27 +475,28 @@ async fn buf_reader_to_lines(
Ok(output) Ok(output)
} }
async fn max_buffer( async fn max_by_lines(
reader: impl AsyncBufRead + Unpin, reader: impl AsyncBufRead + Unpin,
max_items: impl Into<Option<usize>>, max_items: impl Into<Option<usize>>,
) -> Result<Vec<u8>, Error> { ) -> Result<String, Error> {
let mut buffer = Vec::new(); let mut answer = String::new();
let mut lines = reader.lines(); let mut lines = reader.lines();
let max_items = max_items.into().unwrap_or(10_000_000); let max_items = max_items.into().unwrap_or(10_000_000);
while let Some(line) = lines.next_line().await? { while let Some(line) = lines.next_line().await? {
let mut line = line.into_bytes(); if !answer.is_empty() {
buffer.append(&mut line); answer.push('\n');
if buffer.len() >= max_items { }
answer.push_str(&line);
if answer.len() >= max_items {
return Err(Error::new( return Err(Error::new(
color_eyre::eyre::eyre!("Reading the buffer exceeding limits of {}", max_items), color_eyre::eyre::eyre!("Reading the buffer exceeding limits of {}", max_items),
crate::ErrorKind::Unknown, crate::ErrorKind::Unknown,
)); ));
} }
} }
Ok(buffer) Ok(answer)
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;