do not start progress at 0 before diff complete (#1999)

* do not start progress at 0 before diff complete

* fix: Test rsync

* don't report progress of 0

* drop initialization value from progress stream

Co-authored-by: BluJ <mogulslayer@gmail.com>
This commit is contained in:
Aiden McClelland
2022-11-29 19:13:32 -07:00
committed by GitHub
parent 71d1418559
commit aafcce871e
4 changed files with 80 additions and 14 deletions

View File

@@ -463,7 +463,8 @@ async fn migrate(
ignore_existing: false,
exclude: Vec::new(),
},
)?;
)
.await?;
let mut package_data_transfer = Rsync::new(
"/media/embassy/migrate/package-data/",
"/embassy-data/package-data/",
@@ -473,7 +474,8 @@ async fn migrate(
ignore_existing: false,
exclude: vec!["tmp".to_owned()],
},
)?;
)
.await?;
let mut main_prog = 0.0;
let mut main_complete = false;

View File

@@ -200,7 +200,8 @@ async fn do_update(ctx: RpcContext, eos_url: EosUrl) -> Result<(), Error> {
eos_url.rsync_path()?,
"/media/embassy/next",
Default::default(),
)?;
)
.await?;
while let Some(progress) = rsync.progress.next().await {
crate::db::DatabaseModel::new()
.server_info()
@@ -305,7 +306,8 @@ async fn sync_boot() -> Result<(), Error> {
ignore_existing: true,
exclude: Vec::new(),
},
)?
)
.await?
.wait()
.await?;
if !*IS_RASPBERRY_PI {

View File

@@ -1,6 +1,7 @@
use std::path::Path;
use color_eyre::eyre::eyre;
use futures::StreamExt;
use models::{Error, ErrorKind};
use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
use tokio::process::{Child, Command};
@@ -39,7 +40,7 @@ pub struct Rsync {
pub progress: WatchStream<f64>,
}
impl Rsync {
pub fn new(
pub async fn new(
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
options: RsyncOptions,
@@ -60,6 +61,7 @@ impl Rsync {
let mut command = cmd
.arg("-acAXH")
.arg("--info=progress2")
.arg("--no-inc-recursive")
.arg(src.as_ref())
.arg(dst.as_ref())
.kill_on_drop(true)
@@ -99,12 +101,9 @@ impl Rsync {
})
.lines();
while let Some(line) = lines.next_line().await? {
if line.contains(" to-chk=0/") {
if let Some(percentage) = line
.split_ascii_whitespace()
.find_map(|col| col.strip_suffix("%"))
{
if let Err(err) = send.send(percentage.parse::<f64>()? / 100.0) {
if let Some(percentage) = parse_percentage(&line) {
if percentage > 0.0 {
if let Err(err) = send.send(percentage / 100.0) {
return Err(Error::new(
eyre!("rsync progress send error: {}", err),
ErrorKind::Filesystem,
@@ -116,11 +115,13 @@ impl Rsync {
Ok(())
})
.into();
let mut progress = WatchStream::new(recv);
progress.next().await;
Ok(Rsync {
command,
_progress_task: progress_task,
stderr,
progress: WatchStream::new(recv),
progress,
})
}
pub async fn wait(mut self) -> Result<(), Error> {
@@ -149,3 +150,63 @@ impl Rsync {
Ok(())
}
}
fn parse_percentage(line: &str) -> Option<f64> {
if let Some(percentage) = line
.split_ascii_whitespace()
.find_map(|col| col.strip_suffix("%"))
{
return percentage.parse().ok();
}
None
}
#[test]
fn test_parse() {
let input = " 1.07G 57% 95.20MB/s 0:00:10 (xfr#1, to-chk=0/2)";
assert_eq!(Some(57.0), parse_percentage(input));
}
#[tokio::test]
async fn test_rsync() {
use futures::StreamExt;
use tokio::fs;
let mut seen_zero = false;
let mut seen_in_between = false;
let mut seen_hundred = false;
fs::remove_dir_all("/tmp/test_rsync")
.await
.unwrap_or_default();
fs::create_dir_all("/tmp/test_rsync/a").await.unwrap();
fs::create_dir_all("/tmp/test_rsync/b").await.unwrap();
for i in 0..100 {
tokio::io::copy(
&mut fs::File::open("/dev/urandom").await.unwrap().take(100_000),
&mut fs::File::create(format!("/tmp/test_rsync/a/sample.{i}.bin"))
.await
.unwrap(),
)
.await
.unwrap();
}
let mut rsync = Rsync::new(
"/tmp/test_rsync/a/",
"/tmp/test_rsync/b/",
Default::default(),
)
.await
.unwrap();
while let Some(progress) = rsync.progress.next().await {
if progress <= 0.05 {
seen_zero = true;
} else if progress > 0.05 && progress < 1.0 {
seen_in_between = true
} else {
seen_hundred = true;
}
}
rsync.wait().await.unwrap();
assert!(seen_zero, "seen zero");
assert!(seen_in_between, "seen in between 0 and 100");
assert!(seen_hundred, "seen 100");
}

View File

@@ -707,8 +707,9 @@ mod fns {
);
}
let running_rsync =
Rsync::new(src, dst, options).map_err(|e| anyhow::anyhow!("{:?}", e.source))?;
let running_rsync = Rsync::new(src, dst, options)
.await
.map_err(|e| anyhow::anyhow!("{:?}", e.source))?;
let insert_id = {
let mut rsyncs = rsyncs.lock().await;
let next = rsyncs.0 + 1;