diff --git a/backend/src/setup.rs b/backend/src/setup.rs index e3c5c2538..f656c0e88 100644 --- a/backend/src/setup.rs +++ b/backend/src/setup.rs @@ -463,7 +463,8 @@ async fn migrate( ignore_existing: false, exclude: Vec::new(), }, - )?; + ) + .await?; let mut package_data_transfer = Rsync::new( "/media/embassy/migrate/package-data/", "/embassy-data/package-data/", @@ -473,7 +474,8 @@ async fn migrate( ignore_existing: false, exclude: vec!["tmp".to_owned()], }, - )?; + ) + .await?; let mut main_prog = 0.0; let mut main_complete = false; diff --git a/backend/src/update/mod.rs b/backend/src/update/mod.rs index d29ec8ce7..fe8e8bb4b 100644 --- a/backend/src/update/mod.rs +++ b/backend/src/update/mod.rs @@ -200,7 +200,8 @@ async fn do_update(ctx: RpcContext, eos_url: EosUrl) -> Result<(), Error> { eos_url.rsync_path()?, "/media/embassy/next", Default::default(), - )?; + ) + .await?; while let Some(progress) = rsync.progress.next().await { crate::db::DatabaseModel::new() .server_info() @@ -305,7 +306,8 @@ async fn sync_boot() -> Result<(), Error> { ignore_existing: true, exclude: Vec::new(), }, - )? + ) + .await? .wait() .await?; if !*IS_RASPBERRY_PI { diff --git a/libs/helpers/src/rsync.rs b/libs/helpers/src/rsync.rs index 70e2b7daf..e317416ff 100644 --- a/libs/helpers/src/rsync.rs +++ b/libs/helpers/src/rsync.rs @@ -1,6 +1,7 @@ use std::path::Path; use color_eyre::eyre::eyre; +use futures::StreamExt; use models::{Error, ErrorKind}; use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader}; use tokio::process::{Child, Command}; @@ -39,7 +40,7 @@ pub struct Rsync { pub progress: WatchStream, } impl Rsync { - pub fn new( + pub async fn new( src: impl AsRef, dst: impl AsRef, options: RsyncOptions, @@ -60,6 +61,7 @@ impl Rsync { let mut command = cmd .arg("-acAXH") .arg("--info=progress2") + .arg("--no-inc-recursive") .arg(src.as_ref()) .arg(dst.as_ref()) .kill_on_drop(true) @@ -99,12 +101,9 @@ impl Rsync { }) .lines(); while let Some(line) = lines.next_line().await? { - if line.contains(" to-chk=0/") { - if let Some(percentage) = line - .split_ascii_whitespace() - .find_map(|col| col.strip_suffix("%")) - { - if let Err(err) = send.send(percentage.parse::()? / 100.0) { + if let Some(percentage) = parse_percentage(&line) { + if percentage > 0.0 { + if let Err(err) = send.send(percentage / 100.0) { return Err(Error::new( eyre!("rsync progress send error: {}", err), ErrorKind::Filesystem, @@ -116,11 +115,13 @@ impl Rsync { Ok(()) }) .into(); + let mut progress = WatchStream::new(recv); + progress.next().await; Ok(Rsync { command, _progress_task: progress_task, stderr, - progress: WatchStream::new(recv), + progress, }) } pub async fn wait(mut self) -> Result<(), Error> { @@ -149,3 +150,63 @@ impl Rsync { Ok(()) } } + +fn parse_percentage(line: &str) -> Option { + if let Some(percentage) = line + .split_ascii_whitespace() + .find_map(|col| col.strip_suffix("%")) + { + return percentage.parse().ok(); + } + None +} + +#[test] +fn test_parse() { + let input = " 1.07G 57% 95.20MB/s 0:00:10 (xfr#1, to-chk=0/2)"; + assert_eq!(Some(57.0), parse_percentage(input)); +} + +#[tokio::test] +async fn test_rsync() { + use futures::StreamExt; + use tokio::fs; + let mut seen_zero = false; + let mut seen_in_between = false; + let mut seen_hundred = false; + fs::remove_dir_all("/tmp/test_rsync") + .await + .unwrap_or_default(); + fs::create_dir_all("/tmp/test_rsync/a").await.unwrap(); + fs::create_dir_all("/tmp/test_rsync/b").await.unwrap(); + for i in 0..100 { + tokio::io::copy( + &mut fs::File::open("/dev/urandom").await.unwrap().take(100_000), + &mut fs::File::create(format!("/tmp/test_rsync/a/sample.{i}.bin")) + .await + .unwrap(), + ) + .await + .unwrap(); + } + let mut rsync = Rsync::new( + "/tmp/test_rsync/a/", + "/tmp/test_rsync/b/", + Default::default(), + ) + .await + .unwrap(); + while let Some(progress) = rsync.progress.next().await { + if progress <= 0.05 { + seen_zero = true; + } else if progress > 0.05 && progress < 1.0 { + seen_in_between = true + } else { + seen_hundred = true; + } + } + rsync.wait().await.unwrap(); + assert!(seen_zero, "seen zero"); + assert!(seen_in_between, "seen in between 0 and 100"); + assert!(seen_hundred, "seen 100"); +} diff --git a/libs/js_engine/src/lib.rs b/libs/js_engine/src/lib.rs index 8d3029b8a..0639e1598 100644 --- a/libs/js_engine/src/lib.rs +++ b/libs/js_engine/src/lib.rs @@ -707,8 +707,9 @@ mod fns { ); } - let running_rsync = - Rsync::new(src, dst, options).map_err(|e| anyhow::anyhow!("{:?}", e.source))?; + let running_rsync = Rsync::new(src, dst, options) + .await + .map_err(|e| anyhow::anyhow!("{:?}", e.source))?; let insert_id = { let mut rsyncs = rsyncs.lock().await; let next = rsyncs.0 + 1;