diff --git a/backend/src/backup/backup_bulk.rs b/backend/src/backup/backup_bulk.rs index 92aca651e..6e4fb88dd 100644 --- a/backend/src/backup/backup_bulk.rs +++ b/backend/src/backup/backup_bulk.rs @@ -370,7 +370,7 @@ async fn perform_backup( } let luks_folder = Path::new("/media/embassy/config/luks"); if tokio::fs::metadata(&luks_folder).await.is_ok() { - dir_copy(&luks_folder, &luks_folder_bak).await?; + dir_copy(&luks_folder, &luks_folder_bak, None).await?; } let timestamp = Some(Utc::now()); diff --git a/backend/src/backup/restore.rs b/backend/src/backup/restore.rs index 55bca58ca..1589aabe6 100644 --- a/backend/src/backup/restore.rs +++ b/backend/src/backup/restore.rs @@ -109,7 +109,7 @@ async fn approximate_progress( if tokio::fs::metadata(&dir).await.is_err() { *size = 0; } else { - *size = dir_size(&dir).await?; + *size = dir_size(&dir, None).await?; } } Ok(()) @@ -285,7 +285,7 @@ async fn restore_packages( progress_info.package_installs.insert(id.clone(), progress); progress_info .src_volume_size - .insert(id.clone(), dir_size(backup_dir(&id)).await?); + .insert(id.clone(), dir_size(backup_dir(&id), None).await?); progress_info.target_volume_size.insert(id.clone(), 0); let package_id = id.clone(); tasks.push( diff --git a/backend/src/setup.rs b/backend/src/setup.rs index 0ac21cce2..afe92670f 100644 --- a/backend/src/setup.rs +++ b/backend/src/setup.rs @@ -1,9 +1,9 @@ -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::sync::Arc; +use std::time::Duration; use color_eyre::eyre::eyre; use futures::StreamExt; -use helpers::{Rsync, RsyncOptions}; use josekit::jwk::Jwk; use openssl::x509::X509; use patch_db::DbHandle; @@ -13,6 +13,7 @@ use serde::{Deserialize, Serialize}; use sqlx::Connection; use tokio::fs::File; use tokio::io::AsyncWriteExt; +use tokio::try_join; use torut::onion::OnionAddressV3; use tracing::instrument; @@ -32,6 +33,7 @@ use crate::disk::REPAIR_DISK_PATH; use crate::hostname::Hostname; use crate::init::{init, InitResult}; use crate::middleware::encrypt::EncryptedWire; +use crate::util::io::{dir_copy, dir_size, Counter}; use crate::{Error, ErrorKind, ResultExt}; #[command(subcommands(status, disk, attach, execute, cifs, complete, get_pubkey, exit))] @@ -420,70 +422,70 @@ async fn migrate( ) .await?; - let mut main_transfer = Rsync::new( - "/media/embassy/migrate/main/", - "/embassy-data/main/", - RsyncOptions { - delete: true, - force: true, - ignore_existing: false, - exclude: Vec::new(), - no_permissions: false, - no_owner: false, - }, - ) - .await?; - let mut package_data_transfer = Rsync::new( + let main_transfer_args = ("/media/embassy/migrate/main/", "/embassy-data/main/"); + let package_data_transfer_args = ( "/media/embassy/migrate/package-data/", "/embassy-data/package-data/", - RsyncOptions { - delete: true, - force: true, - ignore_existing: false, - exclude: vec!["tmp".to_owned()], - no_permissions: false, - no_owner: false, - }, - ) - .await?; + ); - let mut main_prog = 0.0; - let mut main_complete = false; - let mut pkg_prog = 0.0; - let mut pkg_complete = false; - loop { - tokio::select! { - p = main_transfer.progress.next() => { - if let Some(p) = p { - main_prog = p; - } else { - main_prog = 1.0; - main_complete = true; - } - } - p = package_data_transfer.progress.next() => { - if let Some(p) = p { - pkg_prog = p; - } else { - pkg_prog = 1.0; - pkg_complete = true; - } - } - } - if main_prog > 0.0 && pkg_prog > 0.0 { - *ctx.setup_status.write().await = Some(Ok(SetupStatus { - bytes_transferred: ((main_prog * 50.0) + (pkg_prog * 950.0)) as u64, - total_bytes: Some(1000), - complete: false, - })); - } - if main_complete && pkg_complete { - break; - } + let tmpdir = Path::new(package_data_transfer_args.0).join("tmp"); + if tokio::fs::metadata(&tmpdir).await.is_ok() { + tokio::fs::remove_dir_all(&tmpdir).await?; } - main_transfer.wait().await?; - package_data_transfer.wait().await?; + let ordering = std::sync::atomic::Ordering::Relaxed; + + let main_transfer_size = Counter::new(0, ordering); + let package_data_transfer_size = Counter::new(0, ordering); + + let size = tokio::select! { + res = async { + let (main_size, package_data_size) = try_join!( + dir_size(main_transfer_args.0, Some(&main_transfer_size)), + dir_size(package_data_transfer_args.0, Some(&package_data_transfer_size)) + )?; + Ok::<_, Error>(main_size + package_data_size) + } => { res? }, + res = async { + loop { + tokio::time::sleep(Duration::from_secs(1)).await; + *ctx.setup_status.write().await = Some(Ok(SetupStatus { + bytes_transferred: 0, + total_bytes: Some(main_transfer_size.load() + package_data_transfer_size.load()), + complete: false, + })); + } + } => res, + }; + + *ctx.setup_status.write().await = Some(Ok(SetupStatus { + bytes_transferred: 0, + total_bytes: Some(size), + complete: false, + })); + + let main_transfer_progress = Counter::new(0, ordering); + let package_data_transfer_progress = Counter::new(0, ordering); + + tokio::select! { + res = async { + try_join!( + dir_copy(main_transfer_args.0, main_transfer_args.1, Some(&main_transfer_progress)), + dir_copy(package_data_transfer_args.0, package_data_transfer_args.1, Some(&package_data_transfer_progress)) + )?; + Ok::<_, Error>(()) + } => { res? }, + res = async { + loop { + tokio::time::sleep(Duration::from_secs(1)).await; + *ctx.setup_status.write().await = Some(Ok(SetupStatus { + bytes_transferred: main_transfer_progress.load() + package_data_transfer_progress.load(), + total_bytes: Some(size), + complete: false, + })); + } + } => res, + } let (hostname, tor_addr, root_ca) = setup_init(&ctx, Some(embassy_password)).await?; diff --git a/backend/src/util/io.rs b/backend/src/util/io.rs index f96d7731a..3727598cf 100644 --- a/backend/src/util/io.rs +++ b/backend/src/util/io.rs @@ -2,6 +2,7 @@ use std::future::Future; use std::io::Cursor; use std::os::unix::prelude::MetadataExt; use std::path::Path; +use std::sync::atomic::AtomicU64; use std::task::Poll; use futures::future::{BoxFuture, Fuse}; @@ -224,6 +225,7 @@ pub async fn copy_and_shutdown( pub fn dir_size<'a, P: AsRef + 'a + Send + Sync>( path: P, + ctr: Option<&'a Counter>, ) -> BoxFuture<'a, Result> { async move { tokio_stream::wrappers::ReadDirStream::new(tokio::fs::read_dir(path.as_ref()).await?) @@ -231,9 +233,12 @@ pub fn dir_size<'a, P: AsRef + 'a + Send + Sync>( let m = e.metadata().await?; Ok(acc + if m.is_file() { + if let Some(ctr) = ctr { + ctr.add(m.len()); + } m.len() } else if m.is_dir() { - dir_size(e.path()).await? + dir_size(e.path(), ctr).await? } else { 0 }) @@ -419,9 +424,60 @@ impl AsyncWrite for BackTrackingReader { } } +pub struct Counter { + atomic: AtomicU64, + ordering: std::sync::atomic::Ordering, +} +impl Counter { + pub fn new(init: u64, ordering: std::sync::atomic::Ordering) -> Self { + Self { + atomic: AtomicU64::new(init), + ordering, + } + } + pub fn load(&self) -> u64 { + self.atomic.load(self.ordering) + } + pub fn add(&self, value: u64) { + self.atomic.fetch_add(value, self.ordering); + } +} + +#[pin_project::pin_project] +pub struct CountingReader<'a, R> { + ctr: &'a Counter, + #[pin] + rdr: R, +} +impl<'a, R> CountingReader<'a, R> { + pub fn new(rdr: R, ctr: &'a Counter) -> Self { + Self { ctr, rdr } + } + pub fn into_inner(self) -> R { + self.rdr + } +} +impl<'a, R: AsyncRead> AsyncRead for CountingReader<'a, R> { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let this = self.project(); + let start = buf.filled().len(); + let res = this.rdr.poll_read(cx, buf); + let len = buf.filled().len() - start; + if len > 0 { + this.ctr.add(len as u64); + } + res + } +} + pub fn dir_copy<'a, P0: AsRef + 'a + Send + Sync, P1: AsRef + 'a + Send + Sync>( src: P0, dst: P1, + ctr: Option<&'a Counter>, ) -> BoxFuture<'a, Result<(), crate::Error>> { async move { let m = tokio::fs::metadata(&src).await?; @@ -471,16 +527,17 @@ pub fn dir_copy<'a, P0: AsRef + 'a + Send + Sync, P1: AsRef + 'a + S format!("create {}", dst_path.display()), ) })?; - tokio::io::copy( - &mut tokio::fs::File::open(&src_path).await.with_ctx(|_| { - ( - crate::ErrorKind::Filesystem, - format!("open {}", src_path.display()), - ) - })?, - &mut dst_file, - ) - .await + let mut rdr = tokio::fs::File::open(&src_path).await.with_ctx(|_| { + ( + crate::ErrorKind::Filesystem, + format!("open {}", src_path.display()), + ) + })?; + if let Some(ctr) = ctr { + tokio::io::copy(&mut CountingReader::new(rdr, ctr), &mut dst_file).await + } else { + tokio::io::copy(&mut rdr, &mut dst_file).await + } .with_ctx(|_| { ( crate::ErrorKind::Filesystem, @@ -508,7 +565,7 @@ pub fn dir_copy<'a, P0: AsRef + 'a + Send + Sync, P1: AsRef + 'a + S ) })?; } else if m.is_dir() { - dir_copy(src_path, dst_path).await?; + dir_copy(src_path, dst_path, ctr).await?; } else if m.file_type().is_symlink() { tokio::fs::symlink( tokio::fs::read_link(&src_path).await.with_ctx(|_| { diff --git a/frontend/projects/setup-wizard/src/app/pages/loading/loading.page.html b/frontend/projects/setup-wizard/src/app/pages/loading/loading.page.html index 36a039300..fd7fcc24c 100644 --- a/frontend/projects/setup-wizard/src/app/pages/loading/loading.page.html +++ b/frontend/projects/setup-wizard/src/app/pages/loading/loading.page.html @@ -2,15 +2,12 @@ - + Initializing StartOS
- - Progress: {{ (decimal * 100).toFixed(0)}}% + + {{ progress.transferred | toMessage }}
@@ -18,16 +15,22 @@ -

{{ progress.decimal | toMessage }}

+

+ + + Progress: {{ (transferred * 100).toFixed() }}% + + + {{ (progress.totalBytes / 1073741824).toFixed(2) }} GB + + +

diff --git a/frontend/projects/setup-wizard/src/app/pages/loading/loading.page.ts b/frontend/projects/setup-wizard/src/app/pages/loading/loading.page.ts index fd09c84d1..10ec47d31 100644 --- a/frontend/projects/setup-wizard/src/app/pages/loading/loading.page.ts +++ b/frontend/projects/setup-wizard/src/app/pages/loading/loading.page.ts @@ -2,6 +2,14 @@ import { Component } from '@angular/core' import { NavController } from '@ionic/angular' import { StateService } from 'src/app/services/state.service' import { Pipe, PipeTransform } from '@angular/core' +import { BehaviorSubject } from 'rxjs' +import { ApiService } from 'src/app/services/api/api.service' +import { ErrorToastService, pauseFor } from '@start9labs/shared' + +type Progress = { + totalBytes: number | null + transferred: number +} @Component({ selector: 'app-loading', @@ -9,23 +17,49 @@ import { Pipe, PipeTransform } from '@angular/core' styleUrls: ['loading.page.scss'], }) export class LoadingPage { - readonly progress$ = this.stateService.dataProgress$ + readonly progress$ = new BehaviorSubject({ + totalBytes: null, + transferred: 0, + }) constructor( - private readonly stateService: StateService, private readonly navCtrl: NavController, + private readonly api: ApiService, + private readonly errorToastService: ErrorToastService, ) {} ngOnInit() { - this.stateService.pollDataTransferProgress() - const progSub = this.stateService.dataCompletionSubject$.subscribe( - async complete => { - if (complete) { - progSub.unsubscribe() - await this.navCtrl.navigateForward(`/success`) - } - }, - ) + this.poll() + } + + async poll() { + try { + const progress = await this.api.getStatus() + + if (!progress) return + + const { + 'total-bytes': totalBytes, + 'bytes-transferred': bytesTransferred, + } = progress + + this.progress$.next({ + totalBytes, + transferred: totalBytes ? bytesTransferred / totalBytes : 0, + }) + + if (progress.complete) { + this.navCtrl.navigateForward(`/success`) + this.progress$.complete() + return + } + + await pauseFor(250) + + setTimeout(() => this.poll(), 0) // prevent call stack from growing + } catch (e: any) { + this.errorToastService.present(e) + } } } @@ -41,7 +75,7 @@ export class ToMessagePipe implements PipeTransform { } if (!progress) { - return 'Preparing data. This can take a while' + return 'Calculating size' } else if (progress < 1) { return 'Copying data' } else { diff --git a/frontend/projects/setup-wizard/src/app/services/api/mock-api.service.ts b/frontend/projects/setup-wizard/src/app/services/api/mock-api.service.ts index d7ab07bd6..e576e9e1d 100644 --- a/frontend/projects/setup-wizard/src/app/services/api/mock-api.service.ts +++ b/frontend/projects/setup-wizard/src/app/services/api/mock-api.service.ts @@ -17,8 +17,6 @@ let tries: number export class MockApiService extends ApiService { async getStatus() { const restoreOrMigrate = true - const total = 4 - await pauseFor(1000) if (tries === undefined) { @@ -27,7 +25,9 @@ export class MockApiService extends ApiService { } tries++ - const progress = tries - 1 + + const total = tries <= 4 ? tries * 268435456 : 1073741824 + const progress = tries > 4 ? (tries - 4) * 268435456 : 0 return { 'bytes-transferred': restoreOrMigrate ? progress : 0, diff --git a/frontend/projects/setup-wizard/src/app/services/state.service.ts b/frontend/projects/setup-wizard/src/app/services/state.service.ts index 4379ee13d..e70478559 100644 --- a/frontend/projects/setup-wizard/src/app/services/state.service.ts +++ b/frontend/projects/setup-wizard/src/app/services/state.service.ts @@ -1,7 +1,5 @@ import { Injectable } from '@angular/core' -import { BehaviorSubject } from 'rxjs' import { ApiService, RecoverySource } from './api/api.service' -import { pauseFor, ErrorToastService } from '@start9labs/shared' @Injectable({ providedIn: 'root', @@ -12,47 +10,7 @@ export class StateService { recoverySource?: RecoverySource recoveryPassword?: string - dataTransferProgress?: { - bytesTransferred: number - totalBytes: number | null - complete: boolean - } - dataProgress$ = new BehaviorSubject(0) - dataCompletionSubject$ = new BehaviorSubject(false) - - constructor( - private readonly api: ApiService, - private readonly errorToastService: ErrorToastService, - ) {} - - async pollDataTransferProgress() { - await pauseFor(500) - - if (this.dataTransferProgress?.complete) { - this.dataCompletionSubject$.next(true) - return - } - - try { - const progress = await this.api.getStatus() - if (!progress) return - - this.dataTransferProgress = { - bytesTransferred: progress['bytes-transferred'], - totalBytes: progress['total-bytes'], - complete: progress.complete, - } - if (this.dataTransferProgress.totalBytes) { - this.dataProgress$.next( - this.dataTransferProgress.bytesTransferred / - this.dataTransferProgress.totalBytes, - ) - } - } catch (e: any) { - this.errorToastService.present(e) - } - setTimeout(() => this.pollDataTransferProgress(), 0) // prevent call stack from growing - } + constructor(private readonly api: ApiService) {} async importDrive(guid: string, password: string): Promise { await this.api.attach({