From 5b91b5f4367961b811d9123ed041ce8aed2c1c0d Mon Sep 17 00:00:00 2001 From: J M <2364004+Blu-J@users.noreply.github.com> Date: Tue, 15 Nov 2022 13:44:53 -0700 Subject: [PATCH] feat: For ota update using rsyncd (#1938) * feat: For ota update using rsyncd * chore: Fix where we rsync to. * chore: Getting rsync to work * chore: Add in the is raspberry pi * chore: Update is raspberry pi --- Makefile | 2 +- backend/src/lib.rs | 3 + backend/src/net/net_utils.rs | 9 +- backend/src/update/mod.rs | 70 +++++-------- build/registry/resyncRsyncRegistry | 54 ++++++++++ libs/helpers/Cargo.toml | 2 +- libs/helpers/src/rsync.rs | 2 +- libs/models/src/js_engine_types.rs | 163 +++++++++++++++++++++++++++++ 8 files changed, 255 insertions(+), 50 deletions(-) create mode 100755 build/registry/resyncRsyncRegistry create mode 100644 libs/models/src/js_engine_types.rs diff --git a/Makefile b/Makefile index ab9082752..767883eb8 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,7 @@ EMBASSY_SRC := backend/embassyd.service backend/embassy-init.service $(EMBASSY_U COMPAT_SRC := $(shell find system-images/compat/ -not -path 'system-images/compat/target/*' -and -not -name *.tar -and -not -name target) UTILS_SRC := $(shell find system-images/utils/ -not -name *.tar) BINFMT_SRC := $(shell find system-images/binfmt/ -not -name *.tar) -BACKEND_SRC := $(shell find backend/src) $(shell find backend/migrations) $(shell find patch-db/*/src) backend/Cargo.toml backend/Cargo.lock +BACKEND_SRC := $(shell find backend/src) $(shell find backend/migrations) $(shell find patch-db/*/src) $(shell find libs/*/src) libs/*/Cargo.toml backend/Cargo.toml backend/Cargo.lock FRONTEND_SHARED_SRC := $(shell find frontend/projects/shared) $(shell ls -p frontend/ | grep -v / | sed 's/^/frontend\//g') frontend/package.json frontend/node_modules frontend/config.json patch-db/client/dist frontend/patchdb-ui-seed.json FRONTEND_UI_SRC := $(shell find frontend/projects/ui) FRONTEND_SETUP_WIZARD_SRC := $(shell find frontend/projects/setup-wizard) diff --git a/backend/src/lib.rs b/backend/src/lib.rs index 8bca91f20..00c8b7457 100644 --- a/backend/src/lib.rs +++ b/backend/src/lib.rs @@ -8,6 +8,9 @@ lazy_static::lazy_static! { let (arch, _) = TARGET.split_once("-").unwrap(); arch }; + pub static ref IS_RASPBERRY_PI: bool = { + *ARCH == "aarch64" + }; } pub mod action; diff --git a/backend/src/net/net_utils.rs b/backend/src/net/net_utils.rs index b6d3434a5..73d97675b 100644 --- a/backend/src/net/net_utils.rs +++ b/backend/src/net/net_utils.rs @@ -18,7 +18,7 @@ pub fn host_addr_fqdn(req: &Request) -> Result { .map_err(|e| Error::new(eyre!("{}", e), crate::ErrorKind::AsciiError))? .to_string(); - let host_uri: ResourceFqdn = host_str.parse()?; + let host_uri: ResourceFqdn = host_str.split(':').next().unwrap().parse()?; Ok(host_uri) } @@ -35,7 +35,7 @@ pub enum ResourceFqdn { root: String, tld: Tld, }, - LocalHost + LocalHost, } impl fmt::Display for ResourceFqdn { @@ -51,7 +51,7 @@ impl fmt::Display for ResourceFqdn { } => { write!(f, "{}", full_uri) } - ResourceFqdn::LocalHost => write!(f, "localhost") + ResourceFqdn::LocalHost => write!(f, "localhost"), } } } @@ -77,11 +77,10 @@ impl FromStr for ResourceFqdn { type Err = Error; fn from_str(input: &str) -> Result { - if input == "localhost" { return Ok(ResourceFqdn::LocalHost); } - + if let Ok(ip) = input.parse::() { return Ok(ResourceFqdn::IpAddr(ip)); } diff --git a/backend/src/update/mod.rs b/backend/src/update/mod.rs index b28a775b5..2324c40b0 100644 --- a/backend/src/update/mod.rs +++ b/backend/src/update/mod.rs @@ -1,3 +1,4 @@ +use std::path::PathBuf; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -16,11 +17,8 @@ use tracing::instrument; use crate::context::RpcContext; use crate::db::model::UpdateProgress; use crate::disk::mount::filesystem::bind::Bind; -use crate::disk::mount::filesystem::block_dev::BlockDev; -use crate::disk::mount::filesystem::httpdirfs::HttpDirFS; -use crate::disk::mount::filesystem::ReadOnly; use crate::disk::mount::filesystem::ReadWrite; -use crate::disk::mount::guard::{MountGuard, TmpMountGuard}; +use crate::disk::mount::guard::MountGuard; use crate::notifications::NotificationLevel; use crate::sound::{ CIRCLE_OF_5THS_SHORT, UPDATE_FAILED_1, UPDATE_FAILED_2, UPDATE_FAILED_3, UPDATE_FAILED_4, @@ -29,7 +27,7 @@ use crate::update::latest_information::LatestInformation; use crate::util::Invoke; use crate::version::{Current, VersionT}; -use crate::{Error, ErrorKind, ResultExt}; +use crate::{Error, ErrorKind, ResultExt, IS_RASPBERRY_PI}; mod latest_information; @@ -131,23 +129,10 @@ async fn maybe_do_update( // validate (hash) fs // kernel update? // swap selected fs - let new_block_dev = TmpMountGuard::mount( - &HttpDirFS::new( - EosUrl { - base: marketplace_url, - version: latest_version, - } - .to_string() - .parse()?, - ), - ReadOnly, - ) - .await?; - let new_fs = TmpMountGuard::mount( - &BlockDev::new(new_block_dev.as_ref().join("eos.img")), - ReadOnly, - ) - .await?; + let eos_url = EosUrl { + base: marketplace_url, + version: latest_version, + }; status.update_progress = Some(UpdateProgress { size: Some(100), @@ -157,7 +142,7 @@ async fn maybe_do_update( let rev = tx.commit().await?; tokio::spawn(async move { - let res = do_update(ctx.clone(), new_fs, new_block_dev).await; + let res = do_update(ctx.clone(), eos_url).await; let mut db = ctx.db.handle(); let mut status = crate::db::DatabaseModel::new() .server_info() @@ -212,14 +197,10 @@ async fn maybe_do_update( Ok(rev) } -#[instrument(skip(ctx, new_fs, new_block_dev))] -async fn do_update( - ctx: RpcContext, - new_fs: TmpMountGuard, - new_block_dev: TmpMountGuard, -) -> Result<(), Error> { +#[instrument(skip(ctx, eos_url))] +async fn do_update(ctx: RpcContext, eos_url: EosUrl) -> Result<(), Error> { let mut rsync = Rsync::new( - new_fs.as_ref().join(""), + eos_url.rsync_path()?, "/media/embassy/next", Default::default(), )?; @@ -238,8 +219,6 @@ async fn do_update( .await?; } rsync.wait().await?; - new_fs.unmount().await?; - new_block_dev.unmount().await?; copy_fstab().await?; copy_machine_id().await?; @@ -255,16 +234,23 @@ struct EosUrl { base: Url, version: Version, } -impl std::fmt::Display for EosUrl { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "{}/eos/v0/eos.img?spec=={}&eos-version={}&arch={}", - self.base, - self.version, - Current::new().semver(), - &*crate::ARCH, - ) + +impl EosUrl { + #[instrument()] + pub fn rsync_path(&self) -> Result { + let host = self + .base + .host_str() + .ok_or_else(|| Error::new(eyre!("Could not get host of base"), ErrorKind::ParseUrl))?; + let version: &Version = &self.version; + let arch = if *IS_RASPBERRY_PI { + "raspberry_pi" + } else { + *crate::ARCH + }; + Ok(format!("{host}::{version}/{arch}/") + .parse() + .map_err(|e| Error::new(eyre!("Could not parse path"), ErrorKind::ParseUrl))?) } } diff --git a/build/registry/resyncRsyncRegistry b/build/registry/resyncRsyncRegistry new file mode 100755 index 000000000..88e4b07b9 --- /dev/null +++ b/build/registry/resyncRsyncRegistry @@ -0,0 +1,54 @@ +#!/bin/sh + +# Expecting that the eos files are sitting in ~/resources/eos/*/*.{arch}.squashfs +# and that the arch could be aarm64 or x86_64 + +# Then we are going to make sure that each of these files is then put on the rsyncd server +# so the embassies can pull them down + + +cat > /etc/rsyncd.conf << RD +uid = root +gid = root +use chroot = yes +max connections = 4 +pid file = /var/run/rsyncd.pid +exclude = lost+found/ +transfer logging = yes +timeout = 900 +ignore nonreadable = yes +dont compress = *.gz *.tgz *.zip *.z *.Z *.rpm *.deb *.bz2 + +RD + +for dir in ~/resources/eos/*/*.squashfs # list directories in the form "/tmp/dirname/" +do + directory=${dir%/*} + cd $directory + filename=${dir##*/} + version=$(echo $directory | sed -r 's/.*\///') + version_dir="/srv/rsync/$version" + type=$(echo "$filename" | sed -r "s/^.*?\.(\w+)\.squashfs$/\1/") + new_dir="$version_dir/$type" + + + echo "Making new dir $new_dir" + mkdir -p $new_dir + + if ! test -n "$(mount -l | grep $new_dir)" + then + echo "Mounting $filename to $new_dir" + mount $filename $new_dir + fi + +cat >> /etc/rsyncd.conf << INSERTING +[$version] +path = $version_dir +read only = yes + +INSERTING + +done + +echo "Created rsyncd.conf file, restarting service" +systemctl restart rsync diff --git a/libs/helpers/Cargo.toml b/libs/helpers/Cargo.toml index 31bb07326..08549c670 100644 --- a/libs/helpers/Cargo.toml +++ b/libs/helpers/Cargo.toml @@ -13,4 +13,4 @@ pin-project = "1.0.11" serde = { version = "1.0", features = ["derive", "rc"] } tokio = { version = "1.19.2", features = ["full"] } tokio-stream = { version = "0.1.9", features = ["io-util", "sync"] } -tracing = "0.1.35" +tracing = "0.1.35" \ No newline at end of file diff --git a/libs/helpers/src/rsync.rs b/libs/helpers/src/rsync.rs index 40a76eb6e..9a817ecd0 100644 --- a/libs/helpers/src/rsync.rs +++ b/libs/helpers/src/rsync.rs @@ -57,7 +57,7 @@ impl Rsync { cmd.arg(format!("--exclude={}", exclude)); } let mut command = cmd - .arg("-ac") + .arg("-acAXH") .arg("--info=progress2") .arg(src.as_ref()) .arg(dst.as_ref()) diff --git a/libs/models/src/js_engine_types.rs b/libs/models/src/js_engine_types.rs new file mode 100644 index 000000000..ab3b44c06 --- /dev/null +++ b/libs/models/src/js_engine_types.rs @@ -0,0 +1,163 @@ +use std::{future::Future, pin::Pin, sync::Arc, time::Duration}; + +use color_eyre::eyre::bail; +use embassy_container_init::{Input, Output, ProcessId, RpcId}; +use tokio::sync::{ + mpsc::{UnboundedReceiver, UnboundedSender}, + Mutex, +}; + +/// Used by the js-executor, it is the ability to just create a command in an already running exec +pub type ExecCommand = Arc< + dyn Fn( + String, + Vec, + UnboundedSender, + Option, + ) -> Pin> + 'static>> + + Send + + Sync + + 'static, +>; + +/// Used by the js-executor, it is the ability to just create a command in an already running exec +pub type SendKillSignal = Arc< + dyn Fn(RpcId, u32) -> Pin> + 'static>> + + Send + + Sync + + 'static, +>; + +pub trait CommandInserter { + fn insert_command( + &self, + command: String, + args: Vec, + sender: UnboundedSender, + timeout: Option, + ) -> Pin>>>; + + fn send_signal(&self, id: RpcId, command: u32) -> Pin>>; +} + +pub type ArcCommandInserter = Arc>>>; + +pub struct ExecutingCommand { + rpc_id: RpcId, + /// Will exist until killed + command_inserter: Arc>>, + owned_futures: Arc>>>>>, +} + +impl ExecutingCommand { + pub async fn new( + command_inserter: ArcCommandInserter, + command: String, + args: Vec, + timeout: Option, + ) -> Result { + let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::(); + let rpc_id = { + let locked_command_inserter = command_inserter.lock().await; + let locked_command_inserter = match &*locked_command_inserter { + Some(a) => a, + None => bail!("Expecting containers.main in the package manifest".to_string()), + }; + match locked_command_inserter + .insert_command(command, args, sender, timeout) + .await + { + Some(a) => a, + None => bail!("Couldn't get command started ".to_string()), + } + }; + let executing_commands = ExecutingCommand { + rpc_id, + command_inserter: Arc::new(Mutex::new(Some(command_inserter.clone()))), + owned_futures: Default::default(), + }; + // let waiting = self.wait() + Ok(executing_commands) + } + + async fn wait( + rpc_id: RpcId, + mut outputs: UnboundedReceiver, + ) -> Result, String)> { + let (process_id_send, process_id_recv) = tokio::sync::oneshot::channel::(); + let mut answer = String::new(); + let mut command_error = String::new(); + let mut status: Option = None; + let mut process_id_send = Some(process_id_send); + while let Some(output) = outputs.recv().await { + match output { + Output::ProcessId(process_id) => { + if let Some(process_id_send) = process_id_send.take() { + if let Err(err) = process_id_send.send(process_id) { + tracing::error!( + "Could not get a process id {process_id:?} sent for {rpc_id:?}" + ); + tracing::debug!("{err:?}"); + } + } + } + Output::Line(value) => { + answer.push_str(&value); + answer.push('\n'); + } + Output::Error(error) => { + command_error.push_str(&error); + command_error.push('\n'); + } + Output::Done(error_code) => { + status = error_code; + break; + } + } + } + if !command_error.is_empty() { + return Err((status, command_error)); + } + + Ok(answer) + } + + async fn send_signal(&self, signal: u32) { + let locked = self.command_inserter.lock().await; + let inner = match &*locked { + Some(a) => a, + None => return, + }; + let locked = inner.lock().await; + let command_inserter = match &*locked { + Some(a) => a, + None => return, + }; + command_inserter.send_signal(self.rpc_id, signal); + } + /// Should only be called when output::done + async fn killed(&self) { + *self.owned_futures.lock().await = Default::default(); + *self.command_inserter.lock().await = Default::default(); + } + pub fn rpc_id(&self) -> RpcId { + self.rpc_id + } +} + +impl Drop for ExecutingCommand { + fn drop(&mut self) { + let command_inserter = self.command_inserter.clone(); + let rpc_id = self.rpc_id.clone(); + tokio::spawn(async move { + let command_inserter_lock = command_inserter.lock().await; + let command_inserter = match &*command_inserter_lock { + Some(a) => a, + None => { + return; + } + }; + command_inserter.send_kill_command(rpc_id, 9).await; + }); + } +}