appmgr: update to tokio 0.3.4

This commit is contained in:
Aiden McClelland
2020-12-01 15:44:02 -07:00
committed by Keagan McClelland
parent 3efb38a742
commit 3a082c108b
7 changed files with 310 additions and 129 deletions

View File

@@ -2,6 +2,7 @@ use std::path::Path;
use failure::ResultExt as _;
use futures::stream::StreamExt;
use tokio_compat_02::IoCompat;
use tokio_tar as tar;
use crate::config::{ConfigRuleEntry, ConfigSpec};
@@ -48,7 +49,7 @@ pub async fn info_full<P: AsRef<Path>>(
.with_context(|e| format!("{}: {}", p.display(), e))
.with_code(crate::error::FILESYSTEM_ERROR)?;
log::info!("Extracting archive.");
let mut pkg = tar::Archive::new(r);
let mut pkg = tar::Archive::new(IoCompat::new(r));
let mut entries = pkg.entries()?;
log::info!("Opening manifest from archive.");
let manifest = entries
@@ -62,7 +63,7 @@ pub async fn info_full<P: AsRef<Path>>(
"Package File Invalid or Corrupted"
);
log::trace!("Deserializing manifest.");
let manifest: Manifest = from_cbor_async_reader(manifest).await?;
let manifest: Manifest = from_cbor_async_reader(IoCompat::new(manifest)).await?;
let manifest = manifest.into_latest();
crate::ensure_code!(
crate::version::Current::new()
@@ -93,7 +94,7 @@ pub async fn info_full<P: AsRef<Path>>(
"Package File Invalid or Corrupted"
);
log::trace!("Deserializing config spec.");
let spec = from_cbor_async_reader(spec).await?;
let spec = from_cbor_async_reader(IoCompat::new(spec)).await?;
log::info!("Opening config rules from archive.");
let rules = entries
.next()
@@ -108,7 +109,7 @@ pub async fn info_full<P: AsRef<Path>>(
"Package File Invalid or Corrupted"
);
log::trace!("Deserializing config rules.");
let rules = from_cbor_async_reader(rules).await?;
let rules = from_cbor_async_reader(IoCompat::new(rules)).await?;
Some(AppConfig { spec, rules })
} else {
None
@@ -124,7 +125,7 @@ pub async fn print_instructions<P: AsRef<Path>>(path: P) -> Result<(), Error> {
.with_context(|e| format!("{}: {}", p.display(), e))
.with_code(crate::error::FILESYSTEM_ERROR)?;
log::info!("Extracting archive.");
let mut pkg = tar::Archive::new(r);
let mut pkg = tar::Archive::new(IoCompat::new(r));
let mut entries = pkg.entries()?;
log::info!("Opening manifest from archive.");
let manifest = entries
@@ -138,7 +139,7 @@ pub async fn print_instructions<P: AsRef<Path>>(path: P) -> Result<(), Error> {
"Package File Invalid or Corrupted"
);
log::trace!("Deserializing manifest.");
let manifest: Manifest = from_cbor_async_reader(manifest).await?;
let manifest: Manifest = from_cbor_async_reader(IoCompat::new(manifest)).await?;
let manifest = manifest.into_latest();
crate::ensure_code!(
crate::version::Current::new()
@@ -166,7 +167,7 @@ pub async fn print_instructions<P: AsRef<Path>>(path: P) -> Result<(), Error> {
if manifest.has_instructions {
use tokio::io::AsyncWriteExt;
let mut instructions = entries
let instructions = entries
.next()
.await
.ok_or(crate::install::Error::CorruptedPkgFile(
@@ -175,7 +176,7 @@ pub async fn print_instructions<P: AsRef<Path>>(path: P) -> Result<(), Error> {
.no_code()??;
let mut stdout = tokio::io::stdout();
tokio::io::copy(&mut instructions, &mut stdout)
tokio::io::copy(&mut IoCompat::new(instructions), &mut stdout)
.await
.with_code(crate::error::FILESYSTEM_ERROR)?;
stdout

View File

@@ -14,8 +14,9 @@ use std::time::Duration;
use failure::ResultExt as _;
use futures::stream::StreamExt;
use futures::stream::TryStreamExt;
use tokio::io::AsyncRead;
use tokio::io::AsyncWriteExt;
use tokio::io::{AsyncRead, ReadBuf};
use tokio_compat_02::IoCompat;
use tokio_tar as tar;
use crate::config::{ConfigRuleEntry, ConfigSpec};
@@ -62,13 +63,13 @@ where
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<std::io::Result<usize>> {
buf: &mut ReadBuf,
) -> Poll<std::io::Result<()>> {
let atomic = self.as_ref().1.clone(); // TODO: not efficient
match unsafe { self.map_unchecked_mut(|a| &mut a.0) }.poll_read(cx, buf) {
Poll::Ready(Ok(res)) => {
atomic.fetch_add(res as u64, atomic::Ordering::SeqCst);
Poll::Ready(Ok(res))
Poll::Ready(Ok(())) => {
atomic.fetch_add(buf.filled().len() as u64, atomic::Ordering::SeqCst);
Poll::Ready(Ok(()))
}
a => a,
}
@@ -141,7 +142,7 @@ pub async fn download(url: &str, name: Option<&str>) -> Result<PathBuf, crate::E
if is_done {
break;
}
tokio::time::delay_for(Duration::from_millis(10)).await;
tokio::time::sleep(Duration::from_millis(10)).await;
}
if !*crate::QUIET.read().await {
println!("\rDownloading... 100%");
@@ -191,7 +192,7 @@ pub async fn install_path<P: AsRef<Path>>(p: P, name: Option<&str>) -> Result<()
if is_done {
break;
}
tokio::time::delay_for(Duration::from_millis(10)).await;
tokio::time::sleep(Duration::from_millis(10)).await;
}
if !*crate::QUIET.read().await {
println!("\rInstalling... 100%");
@@ -213,7 +214,7 @@ pub async fn install<R: AsyncRead + Unpin + Send + Sync>(
name: Option<&str>,
) -> Result<(), crate::Error> {
log::info!("Extracting archive.");
let mut pkg = tar::Archive::new(r);
let mut pkg = tar::Archive::new(IoCompat::new(r));
let mut entries = pkg.entries()?;
log::info!("Opening manifest from archive.");
let manifest = entries
@@ -227,7 +228,9 @@ pub async fn install<R: AsyncRead + Unpin + Send + Sync>(
"Package File Invalid or Corrupted"
);
log::trace!("Deserializing manifest.");
let manifest: Manifest = from_cbor_async_reader(manifest).await.no_code()?;
let manifest: Manifest = from_cbor_async_reader(IoCompat::new(manifest))
.await
.no_code()?;
match manifest {
Manifest::V0(m) => install_v0(m, entries, name).await?,
};
@@ -236,7 +239,7 @@ pub async fn install<R: AsyncRead + Unpin + Send + Sync>(
pub async fn install_v0<R: AsyncRead + Unpin + Send + Sync>(
manifest: ManifestV0,
mut entries: tar::Entries<R>,
mut entries: tar::Entries<IoCompat<R>>,
name: Option<&str>,
) -> Result<(), crate::Error> {
crate::ensure_code!(
@@ -291,7 +294,7 @@ pub async fn install_v0<R: AsyncRead + Unpin + Send + Sync>(
"Package File Invalid or Corrupted"
);
log::trace!("Deserializing config spec.");
let config_spec: ConfigSpec = from_cbor_async_reader(config_spec).await?;
let config_spec: ConfigSpec = from_cbor_async_reader(IoCompat::new(config_spec)).await?;
log::info!("Saving config spec.");
let mut config_spec_out = app_dir.join("config_spec.yaml").write(None).await?;
to_yaml_async_writer(&mut *config_spec_out, &config_spec).await?;
@@ -308,14 +311,15 @@ pub async fn install_v0<R: AsyncRead + Unpin + Send + Sync>(
"Package File Invalid or Corrupted"
);
log::trace!("Deserializing config rules.");
let config_rules: Vec<ConfigRuleEntry> = from_cbor_async_reader(config_rules).await?;
let config_rules: Vec<ConfigRuleEntry> =
from_cbor_async_reader(IoCompat::new(config_rules)).await?;
log::info!("Saving config rules.");
let mut config_rules_out = app_dir.join("config_rules.yaml").write(None).await?;
to_yaml_async_writer(&mut *config_rules_out, &config_rules).await?;
config_rules_out.commit().await?;
if manifest.has_instructions {
log::info!("Opening instructions from archive.");
let mut instructions = entries
let instructions = entries
.next()
.await
.ok_or(Error::CorruptedPkgFile("missing config rules"))
@@ -327,7 +331,7 @@ pub async fn install_v0<R: AsyncRead + Unpin + Send + Sync>(
);
log::info!("Saving instructions.");
let mut instructions_out = app_dir.join("instructions.md").write(None).await?;
tokio::io::copy(&mut instructions, &mut *instructions_out)
tokio::io::copy(&mut IoCompat::new(instructions), &mut *instructions_out)
.await
.with_code(crate::error::FILESYSTEM_ERROR)?;
instructions_out.commit().await?;
@@ -407,11 +411,13 @@ pub async fn install_v0<R: AsyncRead + Unpin + Send + Sync>(
.arg("stop")
.arg(&manifest.id)
.spawn()?
.wait()
.await?;
tokio::process::Command::new("docker")
.arg("rm")
.arg(&manifest.id)
.spawn()?
.wait()
.await?;
crate::ensure_code!(
tokio::process::Command::new("docker")
@@ -426,7 +432,7 @@ pub async fn install_v0<R: AsyncRead + Unpin + Send + Sync>(
)
}
log::info!("Opening image.tar from archive.");
let mut image = entries
let image = entries
.next()
.await
.ok_or(Error::CorruptedPkgFile("missing image.tar"))
@@ -452,12 +458,12 @@ pub async fn install_v0<R: AsyncRead + Unpin + Send + Sync>(
})
.spawn()?;
let mut child_in = child.stdin.take().unwrap();
tokio::io::copy(&mut image, &mut child_in).await?;
tokio::io::copy(&mut IoCompat::new(image), &mut child_in).await?;
child_in.flush().await?;
child_in.shutdown().await?;
drop(child_in);
crate::ensure_code!(
child.await?.success(),
child.wait().await?.success(),
crate::error::DOCKER_ERROR,
"Failed to Load Docker Image From Tar"
);

View File

@@ -5,6 +5,7 @@ use failure::ResultExt;
use futures::stream::StreamExt;
use linear_map::LinearMap;
use rand::SeedableRng;
use tokio_compat_02::IoCompat;
use tokio_tar as tar;
use crate::config::{ConfigRuleEntry, ConfigSpec};
@@ -41,7 +42,7 @@ pub async fn pack(path: &str, output: &str) -> Result<(), failure::Error> {
output.display(),
);
let out_file = tokio::fs::File::create(output).await?;
let mut out = tar::Builder::new(out_file);
let mut out = tar::Builder::new(IoCompat::new(out_file));
log::info!("Reading {}/manifest.yaml.", path.display());
let manifest: Manifest = crate::util::from_yaml_async_reader(
tokio::fs::File::open(path.join("manifest.yaml"))
@@ -130,7 +131,7 @@ pub async fn pack(path: &str, output: &str) -> Result<(), failure::Error> {
h.set_size(0);
h.set_path(format!("APPMGR_DIR_END:{}", asset.src.display()))?;
h.set_cksum();
out.append(&h, tokio::io::empty()).await?;
out.append(&h, IoCompat::new(tokio::io::empty())).await?;
} else {
out.append_path_with_name(&file_path, &asset.src).await?;
}
@@ -144,7 +145,8 @@ pub async fn pack(path: &str, output: &str) -> Result<(), failure::Error> {
log::info!("Writing image.tar to archive.");
let mut header = tar::Header::new_gnu();
header.set_size(image.metadata().await?.len());
out.append_data(&mut header, "image.tar", image).await?;
out.append_data(&mut header, "image.tar", IoCompat::new(image))
.await?;
}
}
out.into_inner().await?;
@@ -199,7 +201,7 @@ pub async fn verify(path: &str) -> Result<(), failure::Error> {
.await
.with_context(|e| format!("{}: {}", path.display(), e))?;
log::info!("Extracting archive.");
let mut pkg = tar::Archive::new(r);
let mut pkg = tar::Archive::new(IoCompat::new(r));
let mut entries = pkg.entries()?;
log::info!("Opening manifest from archive.");
let manifest = entries
@@ -212,7 +214,7 @@ pub async fn verify(path: &str) -> Result<(), failure::Error> {
manifest.path()?.display()
);
log::trace!("Deserializing manifest.");
let manifest: Manifest = from_cbor_async_reader(manifest).await?;
let manifest: Manifest = from_cbor_async_reader(IoCompat::new(manifest)).await?;
let manifest = manifest.into_latest();
ensure!(
crate::version::Current::new()
@@ -245,7 +247,7 @@ pub async fn verify(path: &str) -> Result<(), failure::Error> {
config_spec.path()?.display()
);
log::trace!("Deserializing config spec.");
let config_spec: ConfigSpec = from_cbor_async_reader(config_spec).await?;
let config_spec: ConfigSpec = from_cbor_async_reader(IoCompat::new(config_spec)).await?;
log::trace!("Validating config spec.");
config_spec.validate(&manifest)?;
let config = config_spec.gen(&mut rand::rngs::StdRng::from_entropy(), &None)?;
@@ -261,7 +263,8 @@ pub async fn verify(path: &str) -> Result<(), failure::Error> {
config_rules.path()?.display()
);
log::trace!("Deserializing config rules.");
let config_rules: Vec<ConfigRuleEntry> = from_cbor_async_reader(config_rules).await?;
let config_rules: Vec<ConfigRuleEntry> =
from_cbor_async_reader(IoCompat::new(config_rules)).await?;
log::trace!("Validating config rules against config spec.");
let mut cfgs = LinearMap::new();
cfgs.insert(name, Cow::Borrowed(&config));
@@ -375,7 +378,7 @@ pub async fn verify(path: &str) -> Result<(), failure::Error> {
.await
.ok_or_else(|| format_err!("image.tar is missing manifest.json"))??;
let image_manifest: Vec<DockerManifest> =
from_json_async_reader(image_manifest).await?;
from_json_async_reader(IoCompat::new(image_manifest)).await?;
image_manifest
.into_iter()
.flat_map(|a| a.repo_tags)

View File

@@ -198,7 +198,7 @@ pub async fn read_tor_address(name: &str, timeout: Option<Duration>) -> Result<S
}
}
} {
tokio::time::delay_for(Duration::from_millis(100)).await;
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
let tor_addr = match tokio::fs::read_to_string(&addr_path).await {
@@ -238,7 +238,7 @@ pub async fn read_tor_key(
}
}
} {
tokio::time::delay_for(Duration::from_millis(100)).await;
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
let tor_key = match version {

View File

@@ -5,7 +5,7 @@ use std::path::{Path, PathBuf};
use failure::ResultExt as _;
use file_lock::FileLock;
use tokio::fs::File;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
use crate::Error;
use crate::ResultExt as _;
@@ -244,8 +244,8 @@ impl tokio::io::AsyncRead for UpdateHandle<ForRead> {
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut [u8],
) -> std::task::Poll<std::io::Result<usize>> {
buf: &mut ReadBuf,
) -> std::task::Poll<std::io::Result<()>> {
unsafe { self.map_unchecked_mut(|a| a.file.file.as_mut().unwrap()) }.poll_read(cx, buf)
}
}
@@ -371,7 +371,13 @@ where
cx: &mut std::task::Context<'_>,
buf: &mut [u8],
) -> std::task::Poll<std::io::Result<usize>> {
tokio::io::AsyncRead::poll_read(unsafe { self.map_unchecked_mut(|a| &mut a.0) }, cx, buf)
let mut read_buf = ReadBuf::new(buf);
tokio::io::AsyncRead::poll_read(
unsafe { self.map_unchecked_mut(|a| &mut a.0) },
cx,
&mut read_buf,
)
.map(|res| res.map(|_| read_buf.filled().len()))
}
}
impl<T> tokio::io::AsyncRead for AsyncCompat<T>
@@ -381,9 +387,14 @@ where
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut [u8],
) -> std::task::Poll<std::io::Result<usize>> {
futures::io::AsyncRead::poll_read(unsafe { self.map_unchecked_mut(|a| &mut a.0) }, cx, buf)
buf: &mut ReadBuf,
) -> std::task::Poll<std::io::Result<()>> {
futures::io::AsyncRead::poll_read(
unsafe { self.map_unchecked_mut(|a| &mut a.0) },
cx,
buf.initialize_unfilled(),
)
.map(|res| res.map(|len| buf.set_filled(len)))
}
}
impl<T> futures::io::AsyncWrite for AsyncCompat<T>