diff --git a/appmgr/src/bin/embassyd.rs b/appmgr/src/bin/embassyd.rs index 55a8aa70a..947a16954 100644 --- a/appmgr/src/bin/embassyd.rs +++ b/appmgr/src/bin/embassyd.rs @@ -3,6 +3,7 @@ use std::time::Duration; use color_eyre::eyre::eyre; use embassy::context::{DiagnosticContext, RpcContext}; +use embassy::core::rpc_continuations::RequestGuid; use embassy::db::subscribe; use embassy::middleware::auth::auth; use embassy::middleware::cors::cors; @@ -153,10 +154,45 @@ async fn inner_main(cfg_path: Option<&str>) -> Result, Error> { move |req| { let ctx = ctx.clone(); async move { + tracing::debug!("Request to {}", req.uri().path()); match req.uri().path() { - "/db" => { + "/ws/db" => { Ok(subscribe(ctx, req).await.unwrap_or_else(err_to_500)) } + path if path.starts_with("/rest/rpc/") => { + match RequestGuid::from( + path.strip_prefix("/rest/rpc/").unwrap(), + ) { + None => { + tracing::debug!("No Guid Path"); + Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Body::empty()) + } + Some(guid) => { + match ctx + .rpc_stream_continuations + .lock() + .await + .remove(&guid) + { + None => Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::empty()), + Some(cont) => { + match (cont.handler)(req).await { + Ok(r) => Ok(r), + Err(e) => Response::builder() + .status( + StatusCode::INTERNAL_SERVER_ERROR, + ) + .body(Body::from(format!("{}", e))), + } + } + } + } + } + } _ => Response::builder() .status(StatusCode::NOT_FOUND) .body(Body::empty()), diff --git a/appmgr/src/context/cli.rs b/appmgr/src/context/cli.rs index 3ea300cc1..2d325b6d7 100644 --- a/appmgr/src/context/cli.rs +++ b/appmgr/src/context/cli.rs @@ -29,7 +29,8 @@ pub struct CliContextConfig { #[derive(Debug)] pub struct CliContextSeed { - pub url: Url, + pub base_url: Url, + pub rpc_url: Url, pub client: Client, pub cookie_store: Arc, pub cookie_path: PathBuf, @@ -69,14 +70,14 @@ impl CliContext { } else { CliContextConfig::default() }; - let url = if let Some(host) = matches.value_of("host") { + let mut url = if let Some(host) = matches.value_of("host") { host.parse()? } else if let Some(host) = base.host { host } else { format!( "http://{}", - base.bind_rpc.unwrap_or(([127, 0, 0, 1], 5959).into()) + base.bind_rpc.unwrap_or(([127, 0, 0, 1], 80).into()) ) .parse()? }; @@ -100,7 +101,15 @@ impl CliContext { CookieStore::default() })); Ok(CliContext(Arc::new(CliContextSeed { - url, + base_url: url.clone(), + rpc_url: { + url.path_segments_mut() + .map_err(|_| eyre!("Url cannot be base")) + .with_kind(crate::ErrorKind::ParseUrl)? + .push("rpc") + .push("v1"); + dbg!(url) + }, client: { let mut builder = Client::builder().cookie_provider(cookie_store.clone()); if let Some(proxy) = proxy { @@ -122,19 +131,19 @@ impl std::ops::Deref for CliContext { } impl Context for CliContext { fn protocol(&self) -> &str { - self.0.url.scheme() + self.0.base_url.scheme() } fn host(&self) -> Host<&str> { - self.0.url.host().unwrap_or(DEFAULT_HOST) + self.0.base_url.host().unwrap_or(DEFAULT_HOST) } fn port(&self) -> u16 { - self.0.url.port().unwrap_or(DEFAULT_PORT) + self.0.base_url.port().unwrap_or(DEFAULT_PORT) } fn path(&self) -> &str { - self.0.url.path() + self.0.rpc_url.path() } fn url(&self) -> Url { - self.0.url.clone() + self.0.rpc_url.clone() } fn client(&self) -> &Client { &self.0.client diff --git a/appmgr/src/context/rpc.rs b/appmgr/src/context/rpc.rs index b119fc673..f003e0f73 100644 --- a/appmgr/src/context/rpc.rs +++ b/appmgr/src/context/rpc.rs @@ -19,6 +19,7 @@ use tokio::fs::File; use tokio::sync::{broadcast, oneshot, Mutex, RwLock}; use tracing::instrument; +use crate::core::rpc_continuations::{RequestGuid, RpcContinuation}; use crate::db::model::Database; use crate::hostname::{get_hostname, get_id}; use crate::manager::ManagerMap; @@ -122,6 +123,7 @@ pub struct RpcContextSeed { pub tor_socks: SocketAddr, pub notification_manager: NotificationManager, pub open_authed_websockets: Mutex>>>, + pub rpc_stream_continuations: Mutex>, } #[derive(Clone)] @@ -187,6 +189,7 @@ impl RpcContext { ))), notification_manager, open_authed_websockets: Mutex::new(BTreeMap::new()), + rpc_stream_continuations: Mutex::new(BTreeMap::new()), }); let metrics_seed = seed.clone(); tokio::spawn(async move { diff --git a/appmgr/src/core/mod.rs b/appmgr/src/core/mod.rs new file mode 100644 index 000000000..7c2dbbb06 --- /dev/null +++ b/appmgr/src/core/mod.rs @@ -0,0 +1 @@ +pub mod rpc_continuations; diff --git a/appmgr/src/core/rpc_continuations.rs b/appmgr/src/core/rpc_continuations.rs new file mode 100644 index 000000000..da4a8c7b4 --- /dev/null +++ b/appmgr/src/core/rpc_continuations.rs @@ -0,0 +1,53 @@ +use std::time::Instant; + +use futures::future::BoxFuture; +use http::{Request, Response}; +use hyper::Body; +use rand::RngCore; + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize)] +pub struct RequestGuid = String>(T); +impl RequestGuid { + pub fn new() -> Self { + let mut buf = [0; 40]; + rand::thread_rng().fill_bytes(&mut buf); + RequestGuid(base32::encode( + base32::Alphabet::RFC4648 { padding: false }, + &buf, + )) + } + + pub fn from(r: &str) -> Option { + if r.len() != 64 { + return None; + } + for c in r.chars() { + if !(c >= 'A' && c <= 'Z' || c >= '2' && c <= '7') { + return None; + } + } + Some(RequestGuid(r.to_owned())) + } +} +#[test] +fn parse_guid() { + println!( + "{:?}", + RequestGuid::from(&format!("{}", RequestGuid::new())) + ) +} + +impl> std::fmt::Display for RequestGuid { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.as_ref().fmt(f) + } +} + +pub struct RpcContinuation { + pub created_at: Instant, + pub handler: Box< + dyn FnOnce(Request) -> BoxFuture<'static, Result, crate::Error>> + + Send + + Sync, + >, +} diff --git a/appmgr/src/install/mod.rs b/appmgr/src/install/mod.rs index c48613c01..3cc5cd2a1 100644 --- a/appmgr/src/install/mod.rs +++ b/appmgr/src/install/mod.rs @@ -1,17 +1,22 @@ use std::collections::{BTreeMap, BTreeSet}; use std::io::SeekFrom; -use std::path::Path; +use std::marker::PhantomData; +use std::path::{Path, PathBuf}; use std::process::Stdio; use std::sync::atomic::Ordering; use std::sync::Arc; +use std::time::{Duration, Instant}; use color_eyre::eyre::eyre; use emver::VersionRange; use futures::future::BoxFuture; use futures::{FutureExt, StreamExt, TryStreamExt}; -use http::StatusCode; +use http::header::CONTENT_LENGTH; +use http::{Request, Response, StatusCode}; +use hyper::Body; use patch_db::{DbHandle, LockType}; -use rpc_toolkit::command; +use rpc_toolkit::yajrc::RpcError; +use rpc_toolkit::{command, Context}; use tokio::fs::{File, OpenOptions}; use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt}; use tokio::process::Command; @@ -19,7 +24,8 @@ use tokio_stream::wrappers::ReadDirStream; use tracing::instrument; use self::cleanup::cleanup_failed; -use crate::context::RpcContext; +use crate::context::{CliContext, RpcContext}; +use crate::core::rpc_continuations::{RequestGuid, RpcContinuation}; use crate::db::model::{ CurrentDependencyInfo, InstalledPackageDataEntry, PackageDataEntry, RecoveredPackageInfo, StaticDependencyInfo, StaticFiles, @@ -36,10 +42,10 @@ use crate::s9pk::manifest::{Manifest, PackageId}; use crate::s9pk::reader::S9pkReader; use crate::status::{MainStatus, Status}; use crate::util::io::copy_and_shutdown; -use crate::util::{display_none, display_serializable, AsyncFileExt, Version}; +use crate::util::{display_none, display_serializable, AsyncFileExt, IoFormat, Version}; use crate::version::{Current, VersionT}; use crate::volume::asset_dir; -use crate::{Error, ResultExt}; +use crate::{Error, ErrorKind, ResultExt}; pub mod cleanup; pub mod progress; @@ -69,7 +75,10 @@ pub async fn list(#[context] ctx: RpcContext) -> Result Result { + let new_ctx = ctx.clone(); + let guid = RequestGuid::new(); + let handler = Box::new(|req: Request| { + async move { + let content_length = match req.headers().get(CONTENT_LENGTH).map(|a| a.to_str()) { + None => None, + Some(Err(_)) => { + return Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Body::from("Invalid Content Length")) + .with_kind(ErrorKind::Network) + } + Some(Ok(a)) => match a.parse::() { + Err(_) => { + return Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Body::from("Invalid Content Length")) + .with_kind(ErrorKind::Network) + } + Ok(a) => Some(a), + }, + }; + let progress = InstallProgress::new(content_length); + + let mut hdl = new_ctx.db.handle(); + let mut tx = hdl.begin().await?; + + let mut pde = crate::db::DatabaseModel::new() + .package_data() + .idx_model(&manifest.id) + .get_mut(&mut tx) + .await?; + match pde.take() { + Some(PackageDataEntry::Installed { + installed, + manifest, + static_files, + }) => { + *pde = Some(PackageDataEntry::Updating { + install_progress: progress.clone(), + installed, + manifest, + static_files, + }) + } + None => { + *pde = Some(PackageDataEntry::Installing { + install_progress: progress.clone(), + static_files: StaticFiles::local( + &manifest.id, + &manifest.version, + &manifest.assets.icon_type(), + ), + manifest: manifest.clone(), + }) + } + _ => { + return Err(Error::new( + eyre!("Cannot install over a package in a transient state"), + crate::ErrorKind::InvalidRequest, + )) + } + } + pde.save(&mut tx).await?; + tx.commit(None).await?; + drop(hdl); + + download_install_s9pk( + &new_ctx, + &manifest, + progress, + tokio_util::io::StreamReader::new(req.into_body().map_err(|e| { + std::io::Error::new( + match &e { + e if e.is_connect() => std::io::ErrorKind::ConnectionRefused, + e if e.is_timeout() => std::io::ErrorKind::TimedOut, + _ => std::io::ErrorKind::Other, + }, + e, + ) + })), + ) + .await?; + Response::builder() + .status(StatusCode::OK) + .body(Body::empty()) + .with_kind(ErrorKind::Network) + } + .boxed() + }); + let cont = RpcContinuation { + created_at: Instant::now(), // TODO + handler: handler, + }; + // gc the map + let mut guard = ctx.rpc_stream_continuations.lock().await; + let gced = std::mem::take(&mut *guard) + .into_iter() + .filter(|(_, v)| v.created_at.elapsed() < Duration::from_secs(30)) + .collect::>(); + *guard = gced; + drop(guard); + // insert the new continuation + ctx.rpc_stream_continuations + .lock() + .await + .insert(guid.clone(), cont); + Ok(guid) +} + +#[instrument(skip(ctx))] +async fn cli_install(ctx: CliContext, target: String) -> Result<(), RpcError> { + if target.ends_with(".s9pk") { + let path = PathBuf::from(target); + + // inspect manifest no verify + let manifest = crate::inspect::manifest(path.clone(), true, Some(IoFormat::Json)).await?; + + // rpc call remote sideload + tracing::debug!("calling package.sideload"); + let guid = rpc_toolkit::command_helpers::call_remote( + ctx.clone(), + "package.sideload", + serde_json::json!({ "manifest": manifest }), + PhantomData::, + ) + .await? + .result?; + tracing::debug!("package.sideload succeeded {:?}", guid); + + // hit continuation api with guid that comes back + let file = tokio::fs::File::open(path).await?; + let content_length = file.metadata().await?.len(); + let body = Body::wrap_stream(tokio_util::io::ReaderStream::new(file)); + let client = reqwest::Client::new(); + let res = client + .post(dbg!(format!( + "{}://{}/rest/rpc/{}", + ctx.protocol(), + ctx.host(), + guid + ))) + .header(CONTENT_LENGTH, content_length) + .body(body) + .send() + .await?; + if res.status().as_u16() == 200 { + tracing::info!("Package Uploaded") + } else { + tracing::info!("Package Upload failed: {}", res.text().await?) + } + } else { + tracing::debug!("calling package.install"); + rpc_toolkit::command_helpers::call_remote( + ctx, + "package.install", + serde_json::json!({ "id": target }), + PhantomData::<()>, + ) + .await? + .result?; + tracing::debug!("package.install succeeded"); + } + Ok(()) +} + #[command( subcommands(self(uninstall_impl(async)), uninstall_dry), display(display_none) diff --git a/appmgr/src/lib.rs b/appmgr/src/lib.rs index 4a5e75ff3..a2888d30a 100644 --- a/appmgr/src/lib.rs +++ b/appmgr/src/lib.rs @@ -8,6 +8,7 @@ pub mod backup; pub mod config; pub mod context; pub mod control; +pub mod core; pub mod db; pub mod dependencies; pub mod developer; @@ -83,6 +84,7 @@ pub fn server() -> Result<(), RpcError> { #[command(subcommands( action::action, install::install, + install::sideload, install::uninstall, install::list, config::config, diff --git a/appmgr/src/nginx/main-ui.conf.template b/appmgr/src/nginx/main-ui.conf.template index 3a21ae5c1..8a3e27145 100644 --- a/appmgr/src/nginx/main-ui.conf.template +++ b/appmgr/src/nginx/main-ui.conf.template @@ -21,11 +21,18 @@ server {{ }} location /ws/ {{ - proxy_pass http://127.0.0.1:5960/; + proxy_pass http://127.0.0.1:5960$request_uri; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "Upgrade"; }} + location /rest/ {{ + proxy_pass http://127.0.0.1:5960$request_uri; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "Upgrade"; + client_max_body_size 0; + }} + location /public/ {{ proxy_pass http://127.0.0.1:5961/; }} @@ -72,6 +79,13 @@ server {{ proxy_set_header Connection "Upgrade"; }} + location /rest/ {{ + proxy_pass http://127.0.0.1:5960$request_uri; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "Upgrade"; + client_max_body_size 0; + }} + location /public/ {{ proxy_pass http://127.0.0.1:5961/; }}