better transfer progress (#2350)

* better transfer progress

* frontend for calculating transfer size

* fixes from testing

* improve internal api

---------

Co-authored-by: Matt Hill <mattnine@protonmail.com>
This commit is contained in:
Aiden McClelland
2023-07-13 19:40:53 -06:00
committed by GitHub
parent cc0e525dc5
commit 2c07cf50fa
8 changed files with 203 additions and 149 deletions

View File

@@ -370,7 +370,7 @@ async fn perform_backup<Db: DbHandle>(
}
let luks_folder = Path::new("/media/embassy/config/luks");
if tokio::fs::metadata(&luks_folder).await.is_ok() {
dir_copy(&luks_folder, &luks_folder_bak).await?;
dir_copy(&luks_folder, &luks_folder_bak, None).await?;
}
let timestamp = Some(Utc::now());

View File

@@ -109,7 +109,7 @@ async fn approximate_progress(
if tokio::fs::metadata(&dir).await.is_err() {
*size = 0;
} else {
*size = dir_size(&dir).await?;
*size = dir_size(&dir, None).await?;
}
}
Ok(())
@@ -285,7 +285,7 @@ async fn restore_packages(
progress_info.package_installs.insert(id.clone(), progress);
progress_info
.src_volume_size
.insert(id.clone(), dir_size(backup_dir(&id)).await?);
.insert(id.clone(), dir_size(backup_dir(&id), None).await?);
progress_info.target_volume_size.insert(id.clone(), 0);
let package_id = id.clone();
tasks.push(

View File

@@ -1,9 +1,9 @@
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use color_eyre::eyre::eyre;
use futures::StreamExt;
use helpers::{Rsync, RsyncOptions};
use josekit::jwk::Jwk;
use openssl::x509::X509;
use patch_db::DbHandle;
@@ -13,6 +13,7 @@ use serde::{Deserialize, Serialize};
use sqlx::Connection;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use tokio::try_join;
use torut::onion::OnionAddressV3;
use tracing::instrument;
@@ -32,6 +33,7 @@ use crate::disk::REPAIR_DISK_PATH;
use crate::hostname::Hostname;
use crate::init::{init, InitResult};
use crate::middleware::encrypt::EncryptedWire;
use crate::util::io::{dir_copy, dir_size, Counter};
use crate::{Error, ErrorKind, ResultExt};
#[command(subcommands(status, disk, attach, execute, cifs, complete, get_pubkey, exit))]
@@ -420,70 +422,70 @@ async fn migrate(
)
.await?;
let mut main_transfer = Rsync::new(
"/media/embassy/migrate/main/",
"/embassy-data/main/",
RsyncOptions {
delete: true,
force: true,
ignore_existing: false,
exclude: Vec::new(),
no_permissions: false,
no_owner: false,
},
)
.await?;
let mut package_data_transfer = Rsync::new(
let main_transfer_args = ("/media/embassy/migrate/main/", "/embassy-data/main/");
let package_data_transfer_args = (
"/media/embassy/migrate/package-data/",
"/embassy-data/package-data/",
RsyncOptions {
delete: true,
force: true,
ignore_existing: false,
exclude: vec!["tmp".to_owned()],
no_permissions: false,
no_owner: false,
},
)
.await?;
);
let mut main_prog = 0.0;
let mut main_complete = false;
let mut pkg_prog = 0.0;
let mut pkg_complete = false;
let tmpdir = Path::new(package_data_transfer_args.0).join("tmp");
if tokio::fs::metadata(&tmpdir).await.is_ok() {
tokio::fs::remove_dir_all(&tmpdir).await?;
}
let ordering = std::sync::atomic::Ordering::Relaxed;
let main_transfer_size = Counter::new(0, ordering);
let package_data_transfer_size = Counter::new(0, ordering);
let size = tokio::select! {
res = async {
let (main_size, package_data_size) = try_join!(
dir_size(main_transfer_args.0, Some(&main_transfer_size)),
dir_size(package_data_transfer_args.0, Some(&package_data_transfer_size))
)?;
Ok::<_, Error>(main_size + package_data_size)
} => { res? },
res = async {
loop {
tokio::select! {
p = main_transfer.progress.next() => {
if let Some(p) = p {
main_prog = p;
} else {
main_prog = 1.0;
main_complete = true;
}
}
p = package_data_transfer.progress.next() => {
if let Some(p) = p {
pkg_prog = p;
} else {
pkg_prog = 1.0;
pkg_complete = true;
}
}
}
if main_prog > 0.0 && pkg_prog > 0.0 {
tokio::time::sleep(Duration::from_secs(1)).await;
*ctx.setup_status.write().await = Some(Ok(SetupStatus {
bytes_transferred: ((main_prog * 50.0) + (pkg_prog * 950.0)) as u64,
total_bytes: Some(1000),
bytes_transferred: 0,
total_bytes: Some(main_transfer_size.load() + package_data_transfer_size.load()),
complete: false,
}));
}
if main_complete && pkg_complete {
break;
}
}
} => res,
};
main_transfer.wait().await?;
package_data_transfer.wait().await?;
*ctx.setup_status.write().await = Some(Ok(SetupStatus {
bytes_transferred: 0,
total_bytes: Some(size),
complete: false,
}));
let main_transfer_progress = Counter::new(0, ordering);
let package_data_transfer_progress = Counter::new(0, ordering);
tokio::select! {
res = async {
try_join!(
dir_copy(main_transfer_args.0, main_transfer_args.1, Some(&main_transfer_progress)),
dir_copy(package_data_transfer_args.0, package_data_transfer_args.1, Some(&package_data_transfer_progress))
)?;
Ok::<_, Error>(())
} => { res? },
res = async {
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
*ctx.setup_status.write().await = Some(Ok(SetupStatus {
bytes_transferred: main_transfer_progress.load() + package_data_transfer_progress.load(),
total_bytes: Some(size),
complete: false,
}));
}
} => res,
}
let (hostname, tor_addr, root_ca) = setup_init(&ctx, Some(embassy_password)).await?;

View File

@@ -2,6 +2,7 @@ use std::future::Future;
use std::io::Cursor;
use std::os::unix::prelude::MetadataExt;
use std::path::Path;
use std::sync::atomic::AtomicU64;
use std::task::Poll;
use futures::future::{BoxFuture, Fuse};
@@ -224,6 +225,7 @@ pub async fn copy_and_shutdown<R: AsyncRead + Unpin, W: AsyncWrite + Unpin>(
pub fn dir_size<'a, P: AsRef<Path> + 'a + Send + Sync>(
path: P,
ctr: Option<&'a Counter>,
) -> BoxFuture<'a, Result<u64, std::io::Error>> {
async move {
tokio_stream::wrappers::ReadDirStream::new(tokio::fs::read_dir(path.as_ref()).await?)
@@ -231,9 +233,12 @@ pub fn dir_size<'a, P: AsRef<Path> + 'a + Send + Sync>(
let m = e.metadata().await?;
Ok(acc
+ if m.is_file() {
if let Some(ctr) = ctr {
ctr.add(m.len());
}
m.len()
} else if m.is_dir() {
dir_size(e.path()).await?
dir_size(e.path(), ctr).await?
} else {
0
})
@@ -419,9 +424,60 @@ impl<T: AsyncWrite> AsyncWrite for BackTrackingReader<T> {
}
}
pub struct Counter {
atomic: AtomicU64,
ordering: std::sync::atomic::Ordering,
}
impl Counter {
pub fn new(init: u64, ordering: std::sync::atomic::Ordering) -> Self {
Self {
atomic: AtomicU64::new(init),
ordering,
}
}
pub fn load(&self) -> u64 {
self.atomic.load(self.ordering)
}
pub fn add(&self, value: u64) {
self.atomic.fetch_add(value, self.ordering);
}
}
#[pin_project::pin_project]
pub struct CountingReader<'a, R> {
ctr: &'a Counter,
#[pin]
rdr: R,
}
impl<'a, R> CountingReader<'a, R> {
pub fn new(rdr: R, ctr: &'a Counter) -> Self {
Self { ctr, rdr }
}
pub fn into_inner(self) -> R {
self.rdr
}
}
impl<'a, R: AsyncRead> AsyncRead for CountingReader<'a, R> {
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
let this = self.project();
let start = buf.filled().len();
let res = this.rdr.poll_read(cx, buf);
let len = buf.filled().len() - start;
if len > 0 {
this.ctr.add(len as u64);
}
res
}
}
pub fn dir_copy<'a, P0: AsRef<Path> + 'a + Send + Sync, P1: AsRef<Path> + 'a + Send + Sync>(
src: P0,
dst: P1,
ctr: Option<&'a Counter>,
) -> BoxFuture<'a, Result<(), crate::Error>> {
async move {
let m = tokio::fs::metadata(&src).await?;
@@ -471,16 +527,17 @@ pub fn dir_copy<'a, P0: AsRef<Path> + 'a + Send + Sync, P1: AsRef<Path> + 'a + S
format!("create {}", dst_path.display()),
)
})?;
tokio::io::copy(
&mut tokio::fs::File::open(&src_path).await.with_ctx(|_| {
let mut rdr = tokio::fs::File::open(&src_path).await.with_ctx(|_| {
(
crate::ErrorKind::Filesystem,
format!("open {}", src_path.display()),
)
})?,
&mut dst_file,
)
.await
})?;
if let Some(ctr) = ctr {
tokio::io::copy(&mut CountingReader::new(rdr, ctr), &mut dst_file).await
} else {
tokio::io::copy(&mut rdr, &mut dst_file).await
}
.with_ctx(|_| {
(
crate::ErrorKind::Filesystem,
@@ -508,7 +565,7 @@ pub fn dir_copy<'a, P0: AsRef<Path> + 'a + Send + Sync, P1: AsRef<Path> + 'a + S
)
})?;
} else if m.is_dir() {
dir_copy(src_path, dst_path).await?;
dir_copy(src_path, dst_path, ctr).await?;
} else if m.file_type().is_symlink() {
tokio::fs::symlink(
tokio::fs::read_link(&src_path).await.with_ctx(|_| {

View File

@@ -2,15 +2,12 @@
<ion-grid>
<ion-row class="ion-align-items-center">
<ion-col class="ion-text-center">
<ion-card
*ngIf="{ decimal: progress$ | async } as progress"
color="dark"
>
<ion-card *ngIf="progress$ | async as progress" color="dark">
<ion-card-header>
<ion-card-title>Initializing StartOS</ion-card-title>
<div class="center-wrapper">
<ion-card-subtitle *ngIf="progress.decimal as decimal">
Progress: {{ (decimal * 100).toFixed(0)}}%
<ion-card-subtitle>
{{ progress.transferred | toMessage }}
</ion-card-subtitle>
</div>
</ion-card-header>
@@ -18,16 +15,22 @@
<ion-card-content class="ion-margin">
<ion-progress-bar
color="tertiary"
style="
max-width: 700px;
margin: auto;
padding-bottom: 20px;
margin-bottom: 40px;
"
[type]="progress.decimal && progress.decimal < 1 ? 'determinate' : 'indeterminate'"
[value]="progress.decimal || 0"
style="max-width: 700px; margin: auto; margin-bottom: 36px"
[type]="progress.transferred && progress.transferred < 1 ? 'determinate' : 'indeterminate'"
[value]="progress.transferred || 0"
></ion-progress-bar>
<p>{{ progress.decimal | toMessage }}</p>
<p>
<ng-container *ngIf="progress.totalBytes as total">
<ng-container
*ngIf="progress.transferred as transferred; else calculating"
>
Progress: {{ (transferred * 100).toFixed() }}%
</ng-container>
<ng-template #calculating>
{{ (progress.totalBytes / 1073741824).toFixed(2) }} GB
</ng-template>
</ng-container>
</p>
</ion-card-content>
</ion-card>
</ion-col>

View File

@@ -2,6 +2,14 @@ import { Component } from '@angular/core'
import { NavController } from '@ionic/angular'
import { StateService } from 'src/app/services/state.service'
import { Pipe, PipeTransform } from '@angular/core'
import { BehaviorSubject } from 'rxjs'
import { ApiService } from 'src/app/services/api/api.service'
import { ErrorToastService, pauseFor } from '@start9labs/shared'
type Progress = {
totalBytes: number | null
transferred: number
}
@Component({
selector: 'app-loading',
@@ -9,23 +17,49 @@ import { Pipe, PipeTransform } from '@angular/core'
styleUrls: ['loading.page.scss'],
})
export class LoadingPage {
readonly progress$ = this.stateService.dataProgress$
readonly progress$ = new BehaviorSubject<Progress>({
totalBytes: null,
transferred: 0,
})
constructor(
private readonly stateService: StateService,
private readonly navCtrl: NavController,
private readonly api: ApiService,
private readonly errorToastService: ErrorToastService,
) {}
ngOnInit() {
this.stateService.pollDataTransferProgress()
const progSub = this.stateService.dataCompletionSubject$.subscribe(
async complete => {
if (complete) {
progSub.unsubscribe()
await this.navCtrl.navigateForward(`/success`)
this.poll()
}
async poll() {
try {
const progress = await this.api.getStatus()
if (!progress) return
const {
'total-bytes': totalBytes,
'bytes-transferred': bytesTransferred,
} = progress
this.progress$.next({
totalBytes,
transferred: totalBytes ? bytesTransferred / totalBytes : 0,
})
if (progress.complete) {
this.navCtrl.navigateForward(`/success`)
this.progress$.complete()
return
}
await pauseFor(250)
setTimeout(() => this.poll(), 0) // prevent call stack from growing
} catch (e: any) {
this.errorToastService.present(e)
}
},
)
}
}
@@ -41,7 +75,7 @@ export class ToMessagePipe implements PipeTransform {
}
if (!progress) {
return 'Preparing data. This can take a while'
return 'Calculating size'
} else if (progress < 1) {
return 'Copying data'
} else {

View File

@@ -17,8 +17,6 @@ let tries: number
export class MockApiService extends ApiService {
async getStatus() {
const restoreOrMigrate = true
const total = 4
await pauseFor(1000)
if (tries === undefined) {
@@ -27,7 +25,9 @@ export class MockApiService extends ApiService {
}
tries++
const progress = tries - 1
const total = tries <= 4 ? tries * 268435456 : 1073741824
const progress = tries > 4 ? (tries - 4) * 268435456 : 0
return {
'bytes-transferred': restoreOrMigrate ? progress : 0,

View File

@@ -1,7 +1,5 @@
import { Injectable } from '@angular/core'
import { BehaviorSubject } from 'rxjs'
import { ApiService, RecoverySource } from './api/api.service'
import { pauseFor, ErrorToastService } from '@start9labs/shared'
@Injectable({
providedIn: 'root',
@@ -12,47 +10,7 @@ export class StateService {
recoverySource?: RecoverySource
recoveryPassword?: string
dataTransferProgress?: {
bytesTransferred: number
totalBytes: number | null
complete: boolean
}
dataProgress$ = new BehaviorSubject<number>(0)
dataCompletionSubject$ = new BehaviorSubject(false)
constructor(
private readonly api: ApiService,
private readonly errorToastService: ErrorToastService,
) {}
async pollDataTransferProgress() {
await pauseFor(500)
if (this.dataTransferProgress?.complete) {
this.dataCompletionSubject$.next(true)
return
}
try {
const progress = await this.api.getStatus()
if (!progress) return
this.dataTransferProgress = {
bytesTransferred: progress['bytes-transferred'],
totalBytes: progress['total-bytes'],
complete: progress.complete,
}
if (this.dataTransferProgress.totalBytes) {
this.dataProgress$.next(
this.dataTransferProgress.bytesTransferred /
this.dataTransferProgress.totalBytes,
)
}
} catch (e: any) {
this.errorToastService.present(e)
}
setTimeout(() => this.pollDataTransferProgress(), 0) // prevent call stack from growing
}
constructor(private readonly api: ApiService) {}
async importDrive(guid: string, password: string): Promise<void> {
await this.api.attach({