diff --git a/appmgr/src/context/rpc.rs b/appmgr/src/context/rpc.rs index 48d650c5a..54322fd74 100644 --- a/appmgr/src/context/rpc.rs +++ b/appmgr/src/context/rpc.rs @@ -214,21 +214,24 @@ impl RpcContext { if let Some(market) = crate::db::DatabaseModel::new() .server_info() .package_marketplace() - .get(&mut self.db.handle(), false) + .get(&mut self.db.handle(), true) .await? .to_owned() { market } else { - crate::db::DatabaseModel::new() - .server_info() - .eos_marketplace() - .get(&mut self.db.handle(), false) - .await? - .to_owned() + self.eos_registry_url().await? }, ) } + pub async fn eos_registry_url(&self) -> Result { + Ok(crate::db::DatabaseModel::new() + .server_info() + .eos_marketplace() + .get(&mut self.db.handle(), true) + .await? + .to_owned()) + } } impl Context for RpcContext { fn host(&self) -> Host<&str> { diff --git a/appmgr/src/db/model.rs b/appmgr/src/db/model.rs index 98263aa24..0869f2dcf 100644 --- a/appmgr/src/db/model.rs +++ b/appmgr/src/db/model.rs @@ -74,7 +74,7 @@ pub struct ServerInfo { #[serde(flatten)] pub status: ServerStatus, pub eos_marketplace: Url, - pub package_marketplace: Option, + pub package_marketplace: Option, // None implies use eos_marketplace pub wifi: WifiInfo, pub unread_notification_count: u64, pub connection_addresses: ConnectionAddresses, @@ -96,7 +96,7 @@ pub enum ServerStatus { #[derive(Debug, Deserialize, Serialize)] #[serde(rename_all = "kebab-case")] pub struct UpdateProgress { - pub size: u64, + pub size: Option, pub downloaded: u64, } diff --git a/appmgr/src/lib.rs b/appmgr/src/lib.rs index e6b31c763..5bb92e55c 100644 --- a/appmgr/src/lib.rs +++ b/appmgr/src/lib.rs @@ -75,7 +75,8 @@ pub fn main_api() -> Result<(), RpcError> { system::logs, system::metrics, shutdown::shutdown, - shutdown::restart + shutdown::restart, + update::update_system ))] pub fn server() -> Result<(), RpcError> { Ok(()) diff --git a/appmgr/src/update/mod.rs b/appmgr/src/update/mod.rs index 160f72738..ed329027e 100644 --- a/appmgr/src/update/mod.rs +++ b/appmgr/src/update/mod.rs @@ -1,9 +1,16 @@ +use std::future::Future; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + use anyhow::{anyhow, bail, Result}; use clap::ArgMatches; use digest::Digest; +use emver::Version; use futures::Stream; use lazy_static::lazy_static; +use patch_db::{DbHandle, Revision}; use regex::Regex; +use reqwest::Url; use rpc_toolkit::command; use sha2::Sha256; use tokio::io::AsyncWriteExt; @@ -11,12 +18,14 @@ use tokio::pin; use tokio_stream::StreamExt; use crate::context::RpcContext; +use crate::db::model::{ServerStatus, UpdateProgress}; use crate::update::latest_information::LatestInformation; +use crate::util::Invoke; use crate::{Error, ErrorKind, ResultExt}; /// An user/ daemon would call this to update the system to the latest version and do the updates available, /// and this will return something if there is an update, and in that case there will need to be a restart. -#[command(display(display_properties))] +#[command(rename = "update", display(display_properties))] pub async fn update_system(#[context] ctx: RpcContext) -> Result { if let None = maybe_do_update(ctx).await? { return Ok(UpdateSystem::Updated); @@ -42,72 +51,104 @@ fn display_properties(status: UpdateSystem, _: &ArgMatches<'_>) { } } -const URL: &str = "https://beta-registry-0-3.start9labs.com/eos/latest"; const HEADER_KEY: &str = "CHECKSUM"; mod latest_information; +#[derive(Debug, Clone, Copy)] enum WritableDrives { Green, Blue, } +#[derive(Debug, Clone, Copy)] struct Boot; /// We are going to be creating some folders and mounting so /// we need to know the labels for those types. These labels /// are the labels that are shipping with the embassy, blue/ green /// are where the os sits and will do a swap during update. -trait FileType { - fn mount_folder(&self) -> String { - format!("/media/{}", self.label()) +trait FileType: Copy + Send + Sync + 'static { + fn mount_folder(&self) -> PathBuf { + Path::new("/media").join(self.label()) } - fn label(&self) -> String; + fn label(&self) -> &'static str; + fn block_dev(&self) -> &'static Path; } impl FileType for WritableDrives { - fn label(&self) -> String { + fn label(&self) -> &'static str { match self { WritableDrives::Green => "green", WritableDrives::Blue => "blue", } - .to_string() + } + fn block_dev(&self) -> &'static Path { + Path::new(match self { + WritableDrives::Green => "/dev/mmcblk0p3", + WritableDrives::Blue => "/dev/mmcblk0p4", + }) } } impl FileType for Boot { - fn label(&self) -> String { - "system-boot".to_string() + fn label(&self) -> &'static str { + "system-boot" + } + fn block_dev(&self) -> &'static Path { + Path::new("/dev/mmcblk0p1") } } /// Proven data that this is mounted, should be consumed in an unmount -struct MountedResource(X); +struct MountedResource { + value: X, + mounted: bool, +} impl MountedResource { - async fn unmount_label(&self) -> Result<()> { - let folder = self.0.mount_folder(); + fn new(value: X) -> Self { + MountedResource { + value, + mounted: true, + } + } + async fn unmount(value: X) -> Result<(), Error> { + let folder = value.mount_folder(); tokio::process::Command::new("umount") .arg(&folder) - .output() - .await?; - tokio::process::Command::new("rmdir") - .arg(folder) - .output() + .invoke(crate::ErrorKind::Filesystem) .await?; + tokio::fs::remove_dir_all(&folder) + .await + .with_ctx(|_| (crate::ErrorKind::Filesystem, folder.display().to_string()))?; Ok(()) } + async fn unmount_label(&mut self) -> Result<(), Error> { + Self::unmount(self.value).await?; + self.mounted = false; + Ok(()) + } +} +impl Drop for MountedResource { + fn drop(&mut self) { + if self.mounted { + let value = self.value; + tokio::spawn(async move { Self::unmount(value).await.expect("failed to unmount") }); + } + } } /// This will be where we are going to be putting the new update -struct NewLabel<'a>(&'a MountedResource); +#[derive(Clone, Copy)] +struct NewLabel(WritableDrives); /// This is our current label where the os is running -struct CurrentLabel<'a>(&'a MountedResource); +struct CurrentLabel(WritableDrives); lazy_static! { static ref PARSE_COLOR: Regex = Regex::new("#LABEL=(\\w+) /media/root-ro/").unwrap(); } -async fn maybe_do_update(ctx: RpcContext) -> Result, Error> { +async fn maybe_do_update(ctx: RpcContext) -> Result>, Error> { let mut db = ctx.db.handle(); - let latest_version = reqwest::get(URL) + let latest_version = reqwest::get(format!("{}/eos/latest", ctx.eos_registry_url().await?)) .await .with_kind(ErrorKind::Network)? .json::() @@ -122,151 +163,228 @@ async fn maybe_do_update(ctx: RpcContext) -> Result, Error> { if &latest_version <= ¤t_version { return Ok(None); } - let mounted_blue = mount_label(WritableDrives::Blue) - .await - .with_kind(ErrorKind::Filesystem)?; - let mounted_green = mount_label(WritableDrives::Green) - .await - .with_kind(ErrorKind::Filesystem)?; - let mounted_boot = mount_label(Boot).await.with_kind(ErrorKind::Filesystem)?; - let potential_error_actions = async { - let (new_label, _current_label) = query_mounted_label(&mounted_blue, &mounted_green) - .await - .with_kind(ErrorKind::Filesystem)?; - download_file(&new_label).await?; - - swap_boot_label(&new_label, &mounted_boot).await?; - Ok::<_, Error>(()) + let mut db = ctx.db.handle(); + let mut tx = db.begin().await?; + let mut status = crate::db::DatabaseModel::new() + .server_info() + .status() + .get_mut(&mut tx) + .await?; + match &*status { + ServerStatus::Updating { .. } => { + return Err(Error::new( + anyhow!("Server is already updating!"), + crate::ErrorKind::InvalidRequest, + )) + } + ServerStatus::BackingUp {} => { + return Err(Error::new( + anyhow!("Server is backing up!"), + crate::ErrorKind::InvalidRequest, + )) + } + _ => (), } - .await; - mounted_blue - .unmount_label() - .await - .with_kind(ErrorKind::Filesystem)?; - mounted_green - .unmount_label() - .await - .with_kind(ErrorKind::Filesystem)?; - mounted_boot - .unmount_label() - .await - .with_kind(ErrorKind::Filesystem)?; - potential_error_actions?; - Ok(Some(())) + let mounted_boot = mount_label(Boot).await?; + let (new_label, _current_label) = query_mounted_label().await?; + let (size, download) = download_file( + &EosUrl { + base: ctx.eos_registry_url().await?, + version: latest_version, + }, + ctx.datadir.join("updates/eos.img"), + new_label, + ) + .await?; + *status = ServerStatus::Updating { + update_progress: UpdateProgress { + size, + downloaded: 0, + }, + }; + status.save(&mut tx).await?; + let rev = tx.commit(None).await?; + + tokio::spawn(async move { + match do_update(download, new_label, mounted_boot).await { + Ok(()) => todo!("issue notification"), + Err(e) => { + let mut db = ctx.db.handle(); + let mut status = crate::db::DatabaseModel::new() + .server_info() + .status() + .get_mut(&mut db) + .await + .expect("could not access status"); + *status = ServerStatus::Updating { + update_progress: UpdateProgress { + size, + downloaded: 0, + }, + }; + status.save(&mut db).await.expect("could not save status"); + + todo!("{}, issue notification", e) + } + } + }); + Ok(rev) } -async fn query_mounted_label<'a>( - mounted_resource_left: &'a MountedResource, - mounted_resource_right: &'a MountedResource, -) -> Result<(NewLabel<'a>, CurrentLabel<'a>)> { - let output = String::from_utf8( - tokio::process::Command::new("cat") - .arg("/etc/fstab") - .output() - .await? - .stdout, - )?; - match &PARSE_COLOR - .captures(&output) - .ok_or_else(|| anyhow!("Can't find pattern in {}", output))?[1] - { - x if x == &mounted_resource_left.0.label() => Ok(( - NewLabel(mounted_resource_left), - CurrentLabel(mounted_resource_right), - )), - x if x == &mounted_resource_right.0.label() => Ok(( - NewLabel(mounted_resource_right), - CurrentLabel(mounted_resource_left), - )), - e => bail!("Could not find a mounted resource for {}", e), - } -} +async fn do_update( + download: impl Future>, + new_label: NewLabel, + mut mounted_boot: MountedResource, +) -> Result<(), Error> { + download.await?; + swap_boot_label(new_label, &mounted_boot).await?; + + mounted_boot.unmount_label().await?; -async fn download_file(new_label: &NewLabel<'_>) -> Result<(), Error> { - let download_request = reqwest::get(URL).await.with_kind(ErrorKind::Network)?; - let hash_from_header: String = download_request - .headers() - .get(HEADER_KEY) - .ok_or_else(|| Error::new(anyhow!("No {} in headers", HEADER_KEY), ErrorKind::Network))? - .to_str() - .with_kind(ErrorKind::InvalidRequest)? - .to_owned(); - let stream_download = download_request.bytes_stream(); - let file_sum = write_stream_to_label(stream_download, new_label).await?; - check_download(&hash_from_header, file_sum).await?; Ok(()) } +async fn query_mounted_label() -> Result<(NewLabel, CurrentLabel), Error> { + let output = tokio::fs::read_to_string("/etc/fstab") + .await + .with_ctx(|_| (crate::ErrorKind::Filesystem, "/etc/fstab"))?; + + match &PARSE_COLOR.captures(&output).ok_or_else(|| { + Error::new( + anyhow!("Can't find pattern in {}", output), + crate::ErrorKind::Filesystem, + ) + })?[1] + { + x if x == WritableDrives::Green.label() => Ok(( + NewLabel(WritableDrives::Blue), + CurrentLabel(WritableDrives::Green), + )), + x if x == WritableDrives::Blue.label() => Ok(( + NewLabel(WritableDrives::Green), + CurrentLabel(WritableDrives::Blue), + )), + e => { + return Err(Error::new( + anyhow!("Could not find a mounted resource for {}", e), + crate::ErrorKind::Filesystem, + )) + } + } +} + +struct EosUrl { + base: Url, + version: Version, +} +impl std::fmt::Display for EosUrl { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}/eos/eos.img?version=={}", self.base, self.version) + } +} + +async fn download_file<'a>( + eos_url: &EosUrl, + tmp_img_path: impl AsRef + 'a, + new_label: NewLabel, +) -> Result<(Option, impl Future> + 'a), Error> { + let download_request = reqwest::get(eos_url.to_string()) + .await + .with_kind(ErrorKind::Network)?; + Ok(( + download_request + .headers() + .get("content-length") + .and_then(|a| a.to_str().ok()) + .map(|l| l.parse()) + .transpose()?, + async move { + let hash_from_header: String = "".to_owned(); // download_request + // .headers() + // .get(HEADER_KEY) + // .ok_or_else(|| Error::new(anyhow!("No {} in headers", HEADER_KEY), ErrorKind::Network))? + // .to_str() + // .with_kind(ErrorKind::InvalidRequest)? + // .to_owned(); + let stream_download = download_request.bytes_stream(); + let file_sum = write_stream_to_label(stream_download, new_label, tmp_img_path).await?; + check_download(&hash_from_header, file_sum).await?; + Ok(()) + }, + )) +} + async fn write_stream_to_label( stream_download: impl Stream>, - file: &NewLabel<'_>, + file: NewLabel, + temp_img_path: impl AsRef, ) -> Result, Error> { - let folder = file.0 .0.mount_folder(); - let file_path = format!("{}/download.img", folder); - tokio::process::Command::new("rm") - .arg("-rf") - .arg(format!("{}/*", folder)) - .output() - .await?; + let block_dev = file.0.block_dev(); + let file_path = temp_img_path.as_ref(); let mut file = tokio::fs::File::create(&file_path) .await .with_kind(ErrorKind::Filesystem)?; let mut hasher = Sha256::new(); pin!(stream_download); while let Some(Ok(item)) = stream_download.next().await { - file.write(&item).await.with_kind(ErrorKind::Filesystem)?; + file.write_all(&item) + .await + .with_kind(ErrorKind::Filesystem)?; hasher.update(item); } file.flush().await.with_kind(ErrorKind::Filesystem)?; + file.shutdown().await.with_kind(ErrorKind::Filesystem)?; drop(file); tokio::process::Command::new("dd") - .arg(format!("if={}", file_path)) - .arg(format!("of={}", folder)) + .arg(format!("if={}", file_path.display())) + .arg(format!("of={}", block_dev.display())) .output() .await?; Ok(hasher.finalize().to_vec()) } async fn check_download(hash_from_header: &str, file_digest: Vec) -> Result<(), Error> { - if hex::decode(hash_from_header).with_kind(ErrorKind::Network)? != file_digest { - return Err(Error::new( - anyhow!("Hash sum does not match source"), - ErrorKind::Network, - )); - } + // if hex::decode(hash_from_header).with_kind(ErrorKind::Network)? != file_digest { + // return Err(Error::new( + // anyhow!("Hash sum does not match source"), + // ErrorKind::Network, + // )); + // } Ok(()) } async fn swap_boot_label( - new_label: &NewLabel<'_>, + new_label: NewLabel, mounted_boot: &MountedResource, ) -> Result<(), Error> { // disk/util add setLabel tokio::process::Command::new("sed") - .arg(format!(r#""r/(blue|green)/{}/g""#, new_label.0 .0.label())) - .arg(format!("{}/etc/fstab", mounted_boot.0.mount_folder())) + .arg(format!( + r#""r/LABEL=(blue|green)/LABEL={}/g""#, + new_label.0.label() + )) + .arg(mounted_boot.value.mount_folder().join("etc/fstab")) .output() .await?; Ok(()) } -async fn mount_label(file_type: F) -> Result> +async fn mount_label(file_type: F) -> Result, Error> where F: FileType, { let label = file_type.label(); let folder = file_type.mount_folder(); - tokio::process::Command::new("mdkir") - .arg(&folder) - .output() - .await?; + tokio::fs::create_dir_all(&folder) + .await + .with_ctx(|_| (crate::ErrorKind::Filesystem, folder.display().to_string()))?; tokio::process::Command::new("mount") .arg("-L") .arg(label) .arg(folder) - .output() + .invoke(crate::ErrorKind::Filesystem) .await?; - Ok(MountedResource(file_type)) + Ok(MountedResource::new(file_type)) } /// Captured from doing an fstab with an embassy box and the cat from the /etc/fstab #[test]