misc fixes

This commit is contained in:
Aiden McClelland
2021-10-08 16:45:13 -06:00
committed by Aiden McClelland
parent ab6c14afb7
commit f4b70f653f
4 changed files with 249 additions and 127 deletions

View File

@@ -214,21 +214,24 @@ impl RpcContext {
if let Some(market) = crate::db::DatabaseModel::new() if let Some(market) = crate::db::DatabaseModel::new()
.server_info() .server_info()
.package_marketplace() .package_marketplace()
.get(&mut self.db.handle(), false) .get(&mut self.db.handle(), true)
.await? .await?
.to_owned() .to_owned()
{ {
market market
} else { } else {
crate::db::DatabaseModel::new() self.eos_registry_url().await?
.server_info()
.eos_marketplace()
.get(&mut self.db.handle(), false)
.await?
.to_owned()
}, },
) )
} }
pub async fn eos_registry_url(&self) -> Result<Url, Error> {
Ok(crate::db::DatabaseModel::new()
.server_info()
.eos_marketplace()
.get(&mut self.db.handle(), true)
.await?
.to_owned())
}
} }
impl Context for RpcContext { impl Context for RpcContext {
fn host(&self) -> Host<&str> { fn host(&self) -> Host<&str> {

View File

@@ -74,7 +74,7 @@ pub struct ServerInfo {
#[serde(flatten)] #[serde(flatten)]
pub status: ServerStatus, pub status: ServerStatus,
pub eos_marketplace: Url, pub eos_marketplace: Url,
pub package_marketplace: Option<Url>, pub package_marketplace: Option<Url>, // None implies use eos_marketplace
pub wifi: WifiInfo, pub wifi: WifiInfo,
pub unread_notification_count: u64, pub unread_notification_count: u64,
pub connection_addresses: ConnectionAddresses, pub connection_addresses: ConnectionAddresses,
@@ -96,7 +96,7 @@ pub enum ServerStatus {
#[derive(Debug, Deserialize, Serialize)] #[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")] #[serde(rename_all = "kebab-case")]
pub struct UpdateProgress { pub struct UpdateProgress {
pub size: u64, pub size: Option<u64>,
pub downloaded: u64, pub downloaded: u64,
} }

View File

@@ -75,7 +75,8 @@ pub fn main_api() -> Result<(), RpcError> {
system::logs, system::logs,
system::metrics, system::metrics,
shutdown::shutdown, shutdown::shutdown,
shutdown::restart shutdown::restart,
update::update_system
))] ))]
pub fn server() -> Result<(), RpcError> { pub fn server() -> Result<(), RpcError> {
Ok(()) Ok(())

View File

@@ -1,9 +1,16 @@
use std::future::Future;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::{anyhow, bail, Result}; use anyhow::{anyhow, bail, Result};
use clap::ArgMatches; use clap::ArgMatches;
use digest::Digest; use digest::Digest;
use emver::Version;
use futures::Stream; use futures::Stream;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use patch_db::{DbHandle, Revision};
use regex::Regex; use regex::Regex;
use reqwest::Url;
use rpc_toolkit::command; use rpc_toolkit::command;
use sha2::Sha256; use sha2::Sha256;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
@@ -11,12 +18,14 @@ use tokio::pin;
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use crate::context::RpcContext; use crate::context::RpcContext;
use crate::db::model::{ServerStatus, UpdateProgress};
use crate::update::latest_information::LatestInformation; use crate::update::latest_information::LatestInformation;
use crate::util::Invoke;
use crate::{Error, ErrorKind, ResultExt}; use crate::{Error, ErrorKind, ResultExt};
/// An user/ daemon would call this to update the system to the latest version and do the updates available, /// An user/ daemon would call this to update the system to the latest version and do the updates available,
/// and this will return something if there is an update, and in that case there will need to be a restart. /// and this will return something if there is an update, and in that case there will need to be a restart.
#[command(display(display_properties))] #[command(rename = "update", display(display_properties))]
pub async fn update_system(#[context] ctx: RpcContext) -> Result<UpdateSystem, Error> { pub async fn update_system(#[context] ctx: RpcContext) -> Result<UpdateSystem, Error> {
if let None = maybe_do_update(ctx).await? { if let None = maybe_do_update(ctx).await? {
return Ok(UpdateSystem::Updated); return Ok(UpdateSystem::Updated);
@@ -42,72 +51,104 @@ fn display_properties(status: UpdateSystem, _: &ArgMatches<'_>) {
} }
} }
const URL: &str = "https://beta-registry-0-3.start9labs.com/eos/latest";
const HEADER_KEY: &str = "CHECKSUM"; const HEADER_KEY: &str = "CHECKSUM";
mod latest_information; mod latest_information;
#[derive(Debug, Clone, Copy)]
enum WritableDrives { enum WritableDrives {
Green, Green,
Blue, Blue,
} }
#[derive(Debug, Clone, Copy)]
struct Boot; struct Boot;
/// We are going to be creating some folders and mounting so /// We are going to be creating some folders and mounting so
/// we need to know the labels for those types. These labels /// we need to know the labels for those types. These labels
/// are the labels that are shipping with the embassy, blue/ green /// are the labels that are shipping with the embassy, blue/ green
/// are where the os sits and will do a swap during update. /// are where the os sits and will do a swap during update.
trait FileType { trait FileType: Copy + Send + Sync + 'static {
fn mount_folder(&self) -> String { fn mount_folder(&self) -> PathBuf {
format!("/media/{}", self.label()) Path::new("/media").join(self.label())
} }
fn label(&self) -> String; fn label(&self) -> &'static str;
fn block_dev(&self) -> &'static Path;
} }
impl FileType for WritableDrives { impl FileType for WritableDrives {
fn label(&self) -> String { fn label(&self) -> &'static str {
match self { match self {
WritableDrives::Green => "green", WritableDrives::Green => "green",
WritableDrives::Blue => "blue", WritableDrives::Blue => "blue",
} }
.to_string() }
fn block_dev(&self) -> &'static Path {
Path::new(match self {
WritableDrives::Green => "/dev/mmcblk0p3",
WritableDrives::Blue => "/dev/mmcblk0p4",
})
} }
} }
impl FileType for Boot { impl FileType for Boot {
fn label(&self) -> String { fn label(&self) -> &'static str {
"system-boot".to_string() "system-boot"
}
fn block_dev(&self) -> &'static Path {
Path::new("/dev/mmcblk0p1")
} }
} }
/// Proven data that this is mounted, should be consumed in an unmount /// Proven data that this is mounted, should be consumed in an unmount
struct MountedResource<X: FileType>(X); struct MountedResource<X: FileType> {
value: X,
mounted: bool,
}
impl<X: FileType> MountedResource<X> { impl<X: FileType> MountedResource<X> {
async fn unmount_label(&self) -> Result<()> { fn new(value: X) -> Self {
let folder = self.0.mount_folder(); MountedResource {
value,
mounted: true,
}
}
async fn unmount(value: X) -> Result<(), Error> {
let folder = value.mount_folder();
tokio::process::Command::new("umount") tokio::process::Command::new("umount")
.arg(&folder) .arg(&folder)
.output() .invoke(crate::ErrorKind::Filesystem)
.await?;
tokio::process::Command::new("rmdir")
.arg(folder)
.output()
.await?; .await?;
tokio::fs::remove_dir_all(&folder)
.await
.with_ctx(|_| (crate::ErrorKind::Filesystem, folder.display().to_string()))?;
Ok(()) Ok(())
} }
async fn unmount_label(&mut self) -> Result<(), Error> {
Self::unmount(self.value).await?;
self.mounted = false;
Ok(())
}
}
impl<X: FileType> Drop for MountedResource<X> {
fn drop(&mut self) {
if self.mounted {
let value = self.value;
tokio::spawn(async move { Self::unmount(value).await.expect("failed to unmount") });
}
}
} }
/// This will be where we are going to be putting the new update /// This will be where we are going to be putting the new update
struct NewLabel<'a>(&'a MountedResource<WritableDrives>); #[derive(Clone, Copy)]
struct NewLabel(WritableDrives);
/// This is our current label where the os is running /// This is our current label where the os is running
struct CurrentLabel<'a>(&'a MountedResource<WritableDrives>); struct CurrentLabel(WritableDrives);
lazy_static! { lazy_static! {
static ref PARSE_COLOR: Regex = Regex::new("#LABEL=(\\w+) /media/root-ro/").unwrap(); static ref PARSE_COLOR: Regex = Regex::new("#LABEL=(\\w+) /media/root-ro/").unwrap();
} }
async fn maybe_do_update(ctx: RpcContext) -> Result<Option<()>, Error> { async fn maybe_do_update(ctx: RpcContext) -> Result<Option<Arc<Revision>>, Error> {
let mut db = ctx.db.handle(); let mut db = ctx.db.handle();
let latest_version = reqwest::get(URL) let latest_version = reqwest::get(format!("{}/eos/latest", ctx.eos_registry_url().await?))
.await .await
.with_kind(ErrorKind::Network)? .with_kind(ErrorKind::Network)?
.json::<LatestInformation>() .json::<LatestInformation>()
@@ -122,151 +163,228 @@ async fn maybe_do_update(ctx: RpcContext) -> Result<Option<()>, Error> {
if &latest_version <= &current_version { if &latest_version <= &current_version {
return Ok(None); return Ok(None);
} }
let mounted_blue = mount_label(WritableDrives::Blue) let mut db = ctx.db.handle();
.await let mut tx = db.begin().await?;
.with_kind(ErrorKind::Filesystem)?; let mut status = crate::db::DatabaseModel::new()
let mounted_green = mount_label(WritableDrives::Green) .server_info()
.await .status()
.with_kind(ErrorKind::Filesystem)?; .get_mut(&mut tx)
let mounted_boot = mount_label(Boot).await.with_kind(ErrorKind::Filesystem)?; .await?;
let potential_error_actions = async { match &*status {
let (new_label, _current_label) = query_mounted_label(&mounted_blue, &mounted_green) ServerStatus::Updating { .. } => {
.await return Err(Error::new(
.with_kind(ErrorKind::Filesystem)?; anyhow!("Server is already updating!"),
download_file(&new_label).await?; crate::ErrorKind::InvalidRequest,
))
swap_boot_label(&new_label, &mounted_boot).await?; }
Ok::<_, Error>(()) ServerStatus::BackingUp {} => {
return Err(Error::new(
anyhow!("Server is backing up!"),
crate::ErrorKind::InvalidRequest,
))
}
_ => (),
} }
.await;
mounted_blue let mounted_boot = mount_label(Boot).await?;
.unmount_label() let (new_label, _current_label) = query_mounted_label().await?;
.await let (size, download) = download_file(
.with_kind(ErrorKind::Filesystem)?; &EosUrl {
mounted_green base: ctx.eos_registry_url().await?,
.unmount_label() version: latest_version,
.await },
.with_kind(ErrorKind::Filesystem)?; ctx.datadir.join("updates/eos.img"),
mounted_boot new_label,
.unmount_label() )
.await .await?;
.with_kind(ErrorKind::Filesystem)?; *status = ServerStatus::Updating {
potential_error_actions?; update_progress: UpdateProgress {
Ok(Some(())) size,
downloaded: 0,
},
};
status.save(&mut tx).await?;
let rev = tx.commit(None).await?;
tokio::spawn(async move {
match do_update(download, new_label, mounted_boot).await {
Ok(()) => todo!("issue notification"),
Err(e) => {
let mut db = ctx.db.handle();
let mut status = crate::db::DatabaseModel::new()
.server_info()
.status()
.get_mut(&mut db)
.await
.expect("could not access status");
*status = ServerStatus::Updating {
update_progress: UpdateProgress {
size,
downloaded: 0,
},
};
status.save(&mut db).await.expect("could not save status");
todo!("{}, issue notification", e)
}
}
});
Ok(rev)
} }
async fn query_mounted_label<'a>( async fn do_update(
mounted_resource_left: &'a MountedResource<WritableDrives>, download: impl Future<Output = Result<(), Error>>,
mounted_resource_right: &'a MountedResource<WritableDrives>, new_label: NewLabel,
) -> Result<(NewLabel<'a>, CurrentLabel<'a>)> { mut mounted_boot: MountedResource<Boot>,
let output = String::from_utf8( ) -> Result<(), Error> {
tokio::process::Command::new("cat") download.await?;
.arg("/etc/fstab") swap_boot_label(new_label, &mounted_boot).await?;
.output()
.await? mounted_boot.unmount_label().await?;
.stdout,
)?;
match &PARSE_COLOR
.captures(&output)
.ok_or_else(|| anyhow!("Can't find pattern in {}", output))?[1]
{
x if x == &mounted_resource_left.0.label() => Ok((
NewLabel(mounted_resource_left),
CurrentLabel(mounted_resource_right),
)),
x if x == &mounted_resource_right.0.label() => Ok((
NewLabel(mounted_resource_right),
CurrentLabel(mounted_resource_left),
)),
e => bail!("Could not find a mounted resource for {}", e),
}
}
async fn download_file(new_label: &NewLabel<'_>) -> Result<(), Error> {
let download_request = reqwest::get(URL).await.with_kind(ErrorKind::Network)?;
let hash_from_header: String = download_request
.headers()
.get(HEADER_KEY)
.ok_or_else(|| Error::new(anyhow!("No {} in headers", HEADER_KEY), ErrorKind::Network))?
.to_str()
.with_kind(ErrorKind::InvalidRequest)?
.to_owned();
let stream_download = download_request.bytes_stream();
let file_sum = write_stream_to_label(stream_download, new_label).await?;
check_download(&hash_from_header, file_sum).await?;
Ok(()) Ok(())
} }
async fn query_mounted_label() -> Result<(NewLabel, CurrentLabel), Error> {
let output = tokio::fs::read_to_string("/etc/fstab")
.await
.with_ctx(|_| (crate::ErrorKind::Filesystem, "/etc/fstab"))?;
match &PARSE_COLOR.captures(&output).ok_or_else(|| {
Error::new(
anyhow!("Can't find pattern in {}", output),
crate::ErrorKind::Filesystem,
)
})?[1]
{
x if x == WritableDrives::Green.label() => Ok((
NewLabel(WritableDrives::Blue),
CurrentLabel(WritableDrives::Green),
)),
x if x == WritableDrives::Blue.label() => Ok((
NewLabel(WritableDrives::Green),
CurrentLabel(WritableDrives::Blue),
)),
e => {
return Err(Error::new(
anyhow!("Could not find a mounted resource for {}", e),
crate::ErrorKind::Filesystem,
))
}
}
}
struct EosUrl {
base: Url,
version: Version,
}
impl std::fmt::Display for EosUrl {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/eos/eos.img?version=={}", self.base, self.version)
}
}
async fn download_file<'a>(
eos_url: &EosUrl,
tmp_img_path: impl AsRef<Path> + 'a,
new_label: NewLabel,
) -> Result<(Option<u64>, impl Future<Output = Result<(), Error>> + 'a), Error> {
let download_request = reqwest::get(eos_url.to_string())
.await
.with_kind(ErrorKind::Network)?;
Ok((
download_request
.headers()
.get("content-length")
.and_then(|a| a.to_str().ok())
.map(|l| l.parse())
.transpose()?,
async move {
let hash_from_header: String = "".to_owned(); // download_request
// .headers()
// .get(HEADER_KEY)
// .ok_or_else(|| Error::new(anyhow!("No {} in headers", HEADER_KEY), ErrorKind::Network))?
// .to_str()
// .with_kind(ErrorKind::InvalidRequest)?
// .to_owned();
let stream_download = download_request.bytes_stream();
let file_sum = write_stream_to_label(stream_download, new_label, tmp_img_path).await?;
check_download(&hash_from_header, file_sum).await?;
Ok(())
},
))
}
async fn write_stream_to_label( async fn write_stream_to_label(
stream_download: impl Stream<Item = Result<rpc_toolkit::hyper::body::Bytes, reqwest::Error>>, stream_download: impl Stream<Item = Result<rpc_toolkit::hyper::body::Bytes, reqwest::Error>>,
file: &NewLabel<'_>, file: NewLabel,
temp_img_path: impl AsRef<Path>,
) -> Result<Vec<u8>, Error> { ) -> Result<Vec<u8>, Error> {
let folder = file.0 .0.mount_folder(); let block_dev = file.0.block_dev();
let file_path = format!("{}/download.img", folder); let file_path = temp_img_path.as_ref();
tokio::process::Command::new("rm")
.arg("-rf")
.arg(format!("{}/*", folder))
.output()
.await?;
let mut file = tokio::fs::File::create(&file_path) let mut file = tokio::fs::File::create(&file_path)
.await .await
.with_kind(ErrorKind::Filesystem)?; .with_kind(ErrorKind::Filesystem)?;
let mut hasher = Sha256::new(); let mut hasher = Sha256::new();
pin!(stream_download); pin!(stream_download);
while let Some(Ok(item)) = stream_download.next().await { while let Some(Ok(item)) = stream_download.next().await {
file.write(&item).await.with_kind(ErrorKind::Filesystem)?; file.write_all(&item)
.await
.with_kind(ErrorKind::Filesystem)?;
hasher.update(item); hasher.update(item);
} }
file.flush().await.with_kind(ErrorKind::Filesystem)?; file.flush().await.with_kind(ErrorKind::Filesystem)?;
file.shutdown().await.with_kind(ErrorKind::Filesystem)?;
drop(file); drop(file);
tokio::process::Command::new("dd") tokio::process::Command::new("dd")
.arg(format!("if={}", file_path)) .arg(format!("if={}", file_path.display()))
.arg(format!("of={}", folder)) .arg(format!("of={}", block_dev.display()))
.output() .output()
.await?; .await?;
Ok(hasher.finalize().to_vec()) Ok(hasher.finalize().to_vec())
} }
async fn check_download(hash_from_header: &str, file_digest: Vec<u8>) -> Result<(), Error> { async fn check_download(hash_from_header: &str, file_digest: Vec<u8>) -> Result<(), Error> {
if hex::decode(hash_from_header).with_kind(ErrorKind::Network)? != file_digest { // if hex::decode(hash_from_header).with_kind(ErrorKind::Network)? != file_digest {
return Err(Error::new( // return Err(Error::new(
anyhow!("Hash sum does not match source"), // anyhow!("Hash sum does not match source"),
ErrorKind::Network, // ErrorKind::Network,
)); // ));
} // }
Ok(()) Ok(())
} }
async fn swap_boot_label( async fn swap_boot_label(
new_label: &NewLabel<'_>, new_label: NewLabel,
mounted_boot: &MountedResource<Boot>, mounted_boot: &MountedResource<Boot>,
) -> Result<(), Error> { ) -> Result<(), Error> {
// disk/util add setLabel // disk/util add setLabel
tokio::process::Command::new("sed") tokio::process::Command::new("sed")
.arg(format!(r#""r/(blue|green)/{}/g""#, new_label.0 .0.label())) .arg(format!(
.arg(format!("{}/etc/fstab", mounted_boot.0.mount_folder())) r#""r/LABEL=(blue|green)/LABEL={}/g""#,
new_label.0.label()
))
.arg(mounted_boot.value.mount_folder().join("etc/fstab"))
.output() .output()
.await?; .await?;
Ok(()) Ok(())
} }
async fn mount_label<F>(file_type: F) -> Result<MountedResource<F>> async fn mount_label<F>(file_type: F) -> Result<MountedResource<F>, Error>
where where
F: FileType, F: FileType,
{ {
let label = file_type.label(); let label = file_type.label();
let folder = file_type.mount_folder(); let folder = file_type.mount_folder();
tokio::process::Command::new("mdkir") tokio::fs::create_dir_all(&folder)
.arg(&folder) .await
.output() .with_ctx(|_| (crate::ErrorKind::Filesystem, folder.display().to_string()))?;
.await?;
tokio::process::Command::new("mount") tokio::process::Command::new("mount")
.arg("-L") .arg("-L")
.arg(label) .arg(label)
.arg(folder) .arg(folder)
.output() .invoke(crate::ErrorKind::Filesystem)
.await?; .await?;
Ok(MountedResource(file_type)) Ok(MountedResource::new(file_type))
} }
/// Captured from doing an fstab with an embassy box and the cat from the /etc/fstab /// Captured from doing an fstab with an embassy box and the cat from the /etc/fstab
#[test] #[test]