start-os/core/startos/src/progress.rs
Aiden McClelland fab13db4b4 Feature/lxc container runtime (#2514)
Co-authored-by: J H <dragondef@gmail.com>
Co-authored-by: J H <Blu-J@users.noreply.github.com>
Co-authored-by: J H <2364004+Blu-J@users.noreply.github.com>
2024-02-17 18:14:14 +00:00


use std::panic::UnwindSafe;
use std::sync::Arc;
use std::time::Duration;

use futures::Future;
use imbl_value::{InOMap, InternedString};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncSeek, AsyncWrite};
use tokio::sync::{mpsc, watch};

use crate::db::model::DatabaseModel;
use crate::prelude::*;

lazy_static::lazy_static! {
    static ref SPINNER: ProgressStyle = ProgressStyle::with_template("{spinner} {msg}...").unwrap();
    static ref PERCENTAGE: ProgressStyle = ProgressStyle::with_template("{msg} {percent}% {wide_bar} [{bytes}/{total_bytes}] [{binary_bytes_per_sec} {eta}]").unwrap();
    static ref BYTES: ProgressStyle = ProgressStyle::with_template("{spinner} {wide_msg} [{bytes}/?] [{binary_bytes_per_sec} {elapsed}]").unwrap();
}

/// Progress of a single task.
///
/// Serialized untagged: `Complete` becomes a bare boolean and `Progress`
/// becomes an object with `done` and `total` fields.
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq, PartialOrd, Ord)]
#[serde(untagged)]
pub enum Progress {
    Complete(bool),
    Progress { done: u64, total: Option<u64> },
}
impl Progress {
    pub fn new() -> Self {
        Progress::Complete(false)
    }
    /// Applies this state to an `indicatif` bar, picking the matching style.
    pub fn update_bar(self, bar: &ProgressBar) {
        match self {
            Self::Complete(false) => {
                bar.set_style(SPINNER.clone());
                bar.tick();
            }
            Self::Complete(true) => {
                bar.finish();
            }
            Self::Progress { done, total: None } => {
                bar.set_style(BYTES.clone());
                bar.set_position(done);
                bar.tick();
            }
            Self::Progress {
                done,
                total: Some(total),
            } => {
                bar.set_style(PERCENTAGE.clone());
                bar.set_position(done);
                bar.set_length(total);
                bar.tick();
            }
        }
    }
    pub fn set_done(&mut self, done: u64) {
        *self = match *self {
            Self::Complete(false) => Self::Progress { done, total: None },
            // Keep the existing total and clamp the new `done` to it. Binding
            // `done` in the pattern here would shadow the argument and leave
            // the stored value unchanged.
            Self::Progress { total, .. } => Self::Progress {
                done: total.map_or(done, |total| done.min(total)),
                total,
            },
            Self::Complete(true) => Self::Complete(true),
        };
    }
    pub fn set_total(&mut self, total: u64) {
        *self = match *self {
            Self::Complete(false) => Self::Progress {
                done: 0,
                total: Some(total),
            },
            Self::Progress { done, .. } => Self::Progress {
                done,
                total: Some(total),
            },
            Self::Complete(true) => Self::Complete(true),
        }
    }
    pub fn add_total(&mut self, total: u64) {
        if let Self::Progress {
            done,
            total: Some(old),
        } = *self
        {
            *self = Self::Progress {
                done,
                total: Some(old + total),
            };
        } else {
            self.set_total(total)
        }
    }
    pub fn complete(&mut self) {
        *self = Self::Complete(true);
    }
}
impl std::ops::Add<u64> for Progress {
    type Output = Self;
    fn add(self, rhs: u64) -> Self::Output {
        match self {
            Self::Complete(false) => Self::Progress {
                done: rhs,
                total: None,
            },
            Self::Progress { done, total } => {
                let mut done = done + rhs;
                if let Some(total) = total {
                    if done > total {
                        done = total;
                    }
                }
                Self::Progress { done, total }
            }
            Self::Complete(true) => Self::Complete(true),
        }
    }
}
impl std::ops::AddAssign<u64> for Progress {
    fn add_assign(&mut self, rhs: u64) {
        *self = *self + rhs;
    }
}
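/*
A minimal, std-only sketch of the clamping behaviour of the `Add<u64>` impl
above. The enum here is a simplified copy of `Progress` (serde derives
dropped) and `demo` is a hypothetical helper, not part of this module.

```rust
// Simplified copy of `Progress` for illustration only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Progress {
    Complete(bool),
    Progress { done: u64, total: Option<u64> },
}

impl std::ops::Add<u64> for Progress {
    type Output = Self;
    fn add(self, rhs: u64) -> Self {
        match self {
            // Nothing reported yet: start counting with an unknown total.
            Progress::Complete(false) => Progress::Progress {
                done: rhs,
                total: None,
            },
            // Already counting: add, then clamp to the total if one is known.
            Progress::Progress { done, total } => {
                let done = done + rhs;
                Progress::Progress {
                    done: total.map_or(done, |t| done.min(t)),
                    total,
                }
            }
            // Finished work stays finished.
            Progress::Complete(true) => Progress::Complete(true),
        }
    }
}

// Hypothetical helper exercising the three cases.
pub fn demo() -> [Progress; 3] {
    let unknown = Progress::Complete(false) + 10;
    let bounded = Progress::Progress { done: 90, total: Some(100) } + 25;
    let finished = Progress::Complete(true) + 5;
    [unknown, bounded, finished]
}
```

Adding to a bounded value never pushes `done` past `total`, and a completed
value ignores further additions.
*/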

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct NamedProgress {
    pub name: InternedString,
    pub progress: Progress,
}

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct FullProgress {
    pub overall: Progress,
    pub phases: Vec<NamedProgress>,
}
impl FullProgress {
    pub fn new() -> Self {
        Self {
            overall: Progress::new(),
            phases: Vec::new(),
        }
    }
}

pub struct FullProgressTracker {
    overall: Arc<watch::Sender<Progress>>,
    overall_recv: watch::Receiver<Progress>,
    phases: InOMap<InternedString, watch::Receiver<Progress>>,
    new_phase: (
        mpsc::UnboundedSender<(InternedString, watch::Receiver<Progress>)>,
        mpsc::UnboundedReceiver<(InternedString, watch::Receiver<Progress>)>,
    ),
}
impl FullProgressTracker {
    pub fn new() -> Self {
        let (overall, overall_recv) = watch::channel(Progress::new());
        Self {
            overall: Arc::new(overall),
            overall_recv,
            phases: InOMap::new(),
            new_phase: mpsc::unbounded_channel(),
        }
    }
    fn fill_phases(&mut self) -> bool {
        let mut changed = false;
        while let Ok((name, phase)) = self.new_phase.1.try_recv() {
            self.phases.insert(name, phase);
            changed = true;
        }
        changed
    }
    pub fn snapshot(&mut self) -> FullProgress {
        self.fill_phases();
        FullProgress {
            overall: *self.overall.borrow(),
            phases: self
                .phases
                .iter()
                .map(|(name, progress)| NamedProgress {
                    name: name.clone(),
                    progress: *progress.borrow(),
                })
                .collect(),
        }
    }
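    pub async fn changed(&mut self) {
        if self.fill_phases() {
            return;
        }
        let phases = self
            .phases
            .iter_mut()
            .map(|(_, p)| Box::pin(p.changed()))
            .collect_vec();
        // `select_all` panics when given an empty iterator, so only use it
        // once at least one phase has been registered.
        if phases.is_empty() {
            let _ = self.overall_recv.changed().await;
        } else {
            tokio::select! {
                _ = self.overall_recv.changed() => (),
                _ = futures::future::select_all(phases) => (),
            }
        }
    }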
    pub fn handle(&self) -> FullProgressTrackerHandle {
        FullProgressTrackerHandle {
            overall: self.overall.clone(),
            new_phase: self.new_phase.0.clone(),
        }
    }
    pub fn sync_to_db<DerefFn>(
        mut self,
        db: PatchDb,
        deref: DerefFn,
        min_interval: Option<Duration>,
    ) -> impl Future<Output = Result<(), Error>> + 'static
    where
        DerefFn: Fn(&mut DatabaseModel) -> Option<&mut Model<FullProgress>> + 'static,
        for<'a> &'a DerefFn: UnwindSafe + Send,
    {
        async move {
            loop {
                let progress = self.snapshot();
                if db
                    .mutate(|v| {
                        if let Some(p) = deref(v) {
                            p.ser(&progress)?;
                            Ok(false)
                        } else {
                            Ok(true)
                        }
                    })
                    .await?
                {
                    break;
                }
                tokio::join!(self.changed(), async {
                    if let Some(interval) = min_interval {
                        tokio::time::sleep(interval).await
                    } else {
                        futures::future::ready(()).await
                    }
                });
            }
            Ok(())
        }
    }
}

#[derive(Clone)]
pub struct FullProgressTrackerHandle {
    overall: Arc<watch::Sender<Progress>>,
    new_phase: mpsc::UnboundedSender<(InternedString, watch::Receiver<Progress>)>,
}
impl FullProgressTrackerHandle {
    pub fn add_phase(
        &self,
        name: InternedString,
        overall_contribution: Option<u64>,
    ) -> PhaseProgressTrackerHandle {
        if let Some(overall_contribution) = overall_contribution {
            self.overall
                .send_modify(|o| o.add_total(overall_contribution));
        }
        let (send, recv) = watch::channel(Progress::new());
        let _ = self.new_phase.send((name, recv));
        PhaseProgressTrackerHandle {
            overall: self.overall.clone(),
            overall_contribution,
            contributed: 0,
            progress: send,
        }
    }
    pub fn complete(&self) {
        self.overall.send_modify(|o| o.complete());
    }
}

pub struct PhaseProgressTrackerHandle {
    overall: Arc<watch::Sender<Progress>>,
    overall_contribution: Option<u64>,
    contributed: u64,
    progress: watch::Sender<Progress>,
}
impl PhaseProgressTrackerHandle {
    fn update_overall(&mut self) {
        if let Some(overall_contribution) = self.overall_contribution {
            let contribution = match *self.progress.borrow() {
                Progress::Complete(true) => overall_contribution,
                Progress::Progress {
                    done,
                    total: Some(total),
                } => ((done as f64 / total as f64) * overall_contribution as f64) as u64,
                _ => 0,
            };
            if contribution > self.contributed {
                self.overall
                    .send_modify(|o| *o += contribution - self.contributed);
                self.contributed = contribution;
            }
        }
    }
    pub fn set_done(&mut self, done: u64) {
        self.progress.send_modify(|p| p.set_done(done));
        self.update_overall();
    }
    pub fn set_total(&mut self, total: u64) {
        self.progress.send_modify(|p| p.set_total(total));
        self.update_overall();
    }
    pub fn add_total(&mut self, total: u64) {
        self.progress.send_modify(|p| p.add_total(total));
        self.update_overall();
    }
    pub fn complete(&mut self) {
        self.progress.send_modify(|p| p.complete());
        self.update_overall();
    }
}
impl std::ops::AddAssign<u64> for PhaseProgressTrackerHandle {
    fn add_assign(&mut self, rhs: u64) {
        self.progress.send_modify(|p| *p += rhs);
        self.update_overall();
    }
}
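/*
A rough, std-only sketch of the arithmetic in `update_overall` above: a
phase's own progress is scaled into its share of the overall bar, and only
increases are forwarded. `scaled_contribution`, `overall_delta` and `demo`
are hypothetical names introduced for this illustration.

```rust
// Hypothetical helper mirroring the scaling expression in `update_overall`.
pub fn scaled_contribution(done: u64, total: u64, overall_contribution: u64) -> u64 {
    ((done as f64 / total as f64) * overall_contribution as f64) as u64
}

// Hypothetical helper: returns the amount to add to the overall counter and
// records the new high-water mark, so the overall value never moves backwards.
pub fn overall_delta(contributed: &mut u64, contribution: u64) -> u64 {
    if contribution > *contributed {
        let delta = contribution - *contributed;
        *contributed = contribution;
        delta
    } else {
        0
    }
}

// A phase worth 10 units of overall progress reports 50, then 25, then 200
// out of 200; the function returns the delta forwarded after each report.
pub fn demo() -> Vec<u64> {
    let mut contributed = 0;
    let mut deltas = Vec::new();
    for done in [50, 25, 200] {
        let contribution = scaled_contribution(done, 200, 10);
        deltas.push(overall_delta(&mut contributed, contribution));
    }
    deltas
}
```

The cast truncates toward zero, so partial units are withheld until the phase
advances further, and a report that goes backwards contributes nothing.
*/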

#[pin_project::pin_project]
pub struct ProgressTrackerWriter<W> {
    #[pin]
    writer: W,
    progress: PhaseProgressTrackerHandle,
}
impl<W> ProgressTrackerWriter<W> {
    pub fn new(writer: W, progress: PhaseProgressTrackerHandle) -> Self {
        Self { writer, progress }
    }
    pub fn into_inner(self) -> (W, PhaseProgressTrackerHandle) {
        (self.writer, self.progress)
    }
}
impl<W: AsyncWrite> AsyncWrite for ProgressTrackerWriter<W> {
    fn poll_write(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> std::task::Poll<Result<usize, std::io::Error>> {
        let this = self.project();
        match this.writer.poll_write(cx, buf) {
            std::task::Poll::Ready(Ok(n)) => {
                *this.progress += n as u64;
                std::task::Poll::Ready(Ok(n))
            }
            a => a,
        }
    }
    fn poll_flush(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), std::io::Error>> {
        self.project().writer.poll_flush(cx)
    }
    fn poll_shutdown(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), std::io::Error>> {
        self.project().writer.poll_shutdown(cx)
    }
    fn is_write_vectored(&self) -> bool {
        self.writer.is_write_vectored()
    }
    fn poll_write_vectored(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        bufs: &[std::io::IoSlice<'_>],
    ) -> std::task::Poll<Result<usize, std::io::Error>> {
        // Count vectored writes too, so they are not missing from the progress.
        let this = self.project();
        match this.writer.poll_write_vectored(cx, bufs) {
            std::task::Poll::Ready(Ok(n)) => {
                *this.progress += n as u64;
                std::task::Poll::Ready(Ok(n))
            }
            a => a,
        }
    }
}
impl<W: AsyncSeek> AsyncSeek for ProgressTrackerWriter<W> {
    fn start_seek(
        self: std::pin::Pin<&mut Self>,
        position: std::io::SeekFrom,
    ) -> std::io::Result<()> {
        self.project().writer.start_seek(position)
    }
    fn poll_complete(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<std::io::Result<u64>> {
        let this = self.project();
        match this.writer.poll_complete(cx) {
            std::task::Poll::Ready(Ok(n)) => {
                this.progress.set_done(n);
                std::task::Poll::Ready(Ok(n))
            }
            a => a,
        }
    }
}
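/*
For readers less familiar with the wrapper pattern used by
`ProgressTrackerWriter` above, here is a synchronous analogue built on
`std::io::Write` instead of tokio's `AsyncWrite`. It is only a sketch of the
idea: `CountingWriter` and `demo` are hypothetical names, and a plain `u64`
counter stands in for `PhaseProgressTrackerHandle`.

```rust
use std::io::{self, Write};

// Hypothetical synchronous stand-in for `ProgressTrackerWriter`.
pub struct CountingWriter<W> {
    inner: W,
    written: u64,
}

impl<W: Write> CountingWriter<W> {
    pub fn new(inner: W) -> Self {
        Self { inner, written: 0 }
    }
    pub fn written(&self) -> u64 {
        self.written
    }
    pub fn into_inner(self) -> W {
        self.inner
    }
}

impl<W: Write> Write for CountingWriter<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Count only the bytes the inner writer actually accepted.
        let n = self.inner.write(buf)?;
        self.written += n as u64;
        Ok(n)
    }
    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}

// Writes two chunks into an in-memory buffer and returns the byte count
// together with the buffer contents.
pub fn demo() -> (u64, Vec<u8>) {
    let mut writer = CountingWriter::new(Vec::new());
    writer.write_all(b"hello").unwrap();
    writer.write_all(b", world").unwrap();
    let written = writer.written();
    (written, writer.into_inner())
}
```

As in the async version, progress advances by the number of bytes reported by
the inner writer rather than by the size of the buffer passed in.
*/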

pub struct PhasedProgressBar {
    multi: MultiProgress,
    overall: ProgressBar,
    phases: InOMap<InternedString, ProgressBar>,
}
impl PhasedProgressBar {
    pub fn new(name: &str) -> Self {
        let multi = MultiProgress::new();
        Self {
            overall: multi.add(
                ProgressBar::new(0)
                    .with_style(SPINNER.clone())
                    .with_message(name.to_owned()),
            ),
            multi,
            phases: InOMap::new(),
        }
    }
    pub fn update(&mut self, progress: &FullProgress) {
        for phase in progress.phases.iter() {
            if !self.phases.contains_key(&phase.name) {
                self.phases.insert(
                    phase.name.clone(),
                    self.multi
                        .add(ProgressBar::new(0).with_style(SPINNER.clone()))
                        .with_message((&*phase.name).to_owned()),
                );
            }
        }
        progress.overall.update_bar(&self.overall);
        for (name, bar) in self.phases.iter() {
            if let Some(progress) = progress.phases.iter().find_map(|p| {
                if &p.name == name {
                    Some(p.progress)
                } else {
                    None
                }
            }) {
                progress.update_bar(bar);
            }
        }
    }
}