Feature/s9pk sideload (#808)

* adds rpc continuations endpoint, rewires websocket endpoint

* sideload processor finished

* cli portion

* bugfixes

* moar bugfixes

* cleanup

* unfuck rust

Co-authored-by: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com>

* fixes url source

Co-authored-by: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com>
This commit is contained in:
Keagan McClelland
2021-11-19 12:31:22 -07:00
committed by Aiden McClelland
parent fd07d0208d
commit 18e9d260d0
8 changed files with 317 additions and 18 deletions

View File

@@ -3,6 +3,7 @@ use std::time::Duration;
use color_eyre::eyre::eyre;
use embassy::context::{DiagnosticContext, RpcContext};
use embassy::core::rpc_continuations::RequestGuid;
use embassy::db::subscribe;
use embassy::middleware::auth::auth;
use embassy::middleware::cors::cors;
@@ -153,10 +154,45 @@ async fn inner_main(cfg_path: Option<&str>) -> Result<Option<Shutdown>, Error> {
move |req| {
let ctx = ctx.clone();
async move {
tracing::debug!("Request to {}", req.uri().path());
match req.uri().path() {
"/db" => {
"/ws/db" => {
Ok(subscribe(ctx, req).await.unwrap_or_else(err_to_500))
}
path if path.starts_with("/rest/rpc/") => {
match RequestGuid::from(
path.strip_prefix("/rest/rpc/").unwrap(),
) {
None => {
tracing::debug!("No Guid Path");
Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::empty())
}
Some(guid) => {
match ctx
.rpc_stream_continuations
.lock()
.await
.remove(&guid)
{
None => Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::empty()),
Some(cont) => {
match (cont.handler)(req).await {
Ok(r) => Ok(r),
Err(e) => Response::builder()
.status(
StatusCode::INTERNAL_SERVER_ERROR,
)
.body(Body::from(format!("{}", e))),
}
}
}
}
}
}
_ => Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::empty()),

View File

@@ -29,7 +29,8 @@ pub struct CliContextConfig {
#[derive(Debug)]
pub struct CliContextSeed {
pub url: Url,
pub base_url: Url,
pub rpc_url: Url,
pub client: Client,
pub cookie_store: Arc<CookieStoreMutex>,
pub cookie_path: PathBuf,
@@ -69,14 +70,14 @@ impl CliContext {
} else {
CliContextConfig::default()
};
let url = if let Some(host) = matches.value_of("host") {
let mut url = if let Some(host) = matches.value_of("host") {
host.parse()?
} else if let Some(host) = base.host {
host
} else {
format!(
"http://{}",
base.bind_rpc.unwrap_or(([127, 0, 0, 1], 5959).into())
base.bind_rpc.unwrap_or(([127, 0, 0, 1], 80).into())
)
.parse()?
};
@@ -100,7 +101,15 @@ impl CliContext {
CookieStore::default()
}));
Ok(CliContext(Arc::new(CliContextSeed {
url,
base_url: url.clone(),
rpc_url: {
url.path_segments_mut()
.map_err(|_| eyre!("Url cannot be base"))
.with_kind(crate::ErrorKind::ParseUrl)?
.push("rpc")
.push("v1");
dbg!(url)
},
client: {
let mut builder = Client::builder().cookie_provider(cookie_store.clone());
if let Some(proxy) = proxy {
@@ -122,19 +131,19 @@ impl std::ops::Deref for CliContext {
}
impl Context for CliContext {
fn protocol(&self) -> &str {
self.0.url.scheme()
self.0.base_url.scheme()
}
fn host(&self) -> Host<&str> {
self.0.url.host().unwrap_or(DEFAULT_HOST)
self.0.base_url.host().unwrap_or(DEFAULT_HOST)
}
fn port(&self) -> u16 {
self.0.url.port().unwrap_or(DEFAULT_PORT)
self.0.base_url.port().unwrap_or(DEFAULT_PORT)
}
fn path(&self) -> &str {
self.0.url.path()
self.0.rpc_url.path()
}
fn url(&self) -> Url {
self.0.url.clone()
self.0.rpc_url.clone()
}
fn client(&self) -> &Client {
&self.0.client

View File

@@ -19,6 +19,7 @@ use tokio::fs::File;
use tokio::sync::{broadcast, oneshot, Mutex, RwLock};
use tracing::instrument;
use crate::core::rpc_continuations::{RequestGuid, RpcContinuation};
use crate::db::model::Database;
use crate::hostname::{get_hostname, get_id};
use crate::manager::ManagerMap;
@@ -122,6 +123,7 @@ pub struct RpcContextSeed {
pub tor_socks: SocketAddr,
pub notification_manager: NotificationManager,
pub open_authed_websockets: Mutex<BTreeMap<HashSessionToken, Vec<oneshot::Sender<()>>>>,
pub rpc_stream_continuations: Mutex<BTreeMap<RequestGuid, RpcContinuation>>,
}
#[derive(Clone)]
@@ -187,6 +189,7 @@ impl RpcContext {
))),
notification_manager,
open_authed_websockets: Mutex::new(BTreeMap::new()),
rpc_stream_continuations: Mutex::new(BTreeMap::new()),
});
let metrics_seed = seed.clone();
tokio::spawn(async move {

1
appmgr/src/core/mod.rs Normal file
View File

@@ -0,0 +1 @@
pub mod rpc_continuations;

View File

@@ -0,0 +1,53 @@
use std::time::Instant;
use futures::future::BoxFuture;
use http::{Request, Response};
use hyper::Body;
use rand::RngCore;
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize)]
pub struct RequestGuid<T: AsRef<str> = String>(T);
impl RequestGuid {
pub fn new() -> Self {
let mut buf = [0; 40];
rand::thread_rng().fill_bytes(&mut buf);
RequestGuid(base32::encode(
base32::Alphabet::RFC4648 { padding: false },
&buf,
))
}
pub fn from(r: &str) -> Option<RequestGuid> {
if r.len() != 64 {
return None;
}
for c in r.chars() {
if !(c >= 'A' && c <= 'Z' || c >= '2' && c <= '7') {
return None;
}
}
Some(RequestGuid(r.to_owned()))
}
}
#[test]
fn parse_guid() {
println!(
"{:?}",
RequestGuid::from(&format!("{}", RequestGuid::new()))
)
}
impl<T: AsRef<str>> std::fmt::Display for RequestGuid<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.as_ref().fmt(f)
}
}
pub struct RpcContinuation {
pub created_at: Instant,
pub handler: Box<
dyn FnOnce(Request<Body>) -> BoxFuture<'static, Result<Response<Body>, crate::Error>>
+ Send
+ Sync,
>,
}

View File

@@ -1,17 +1,22 @@
use std::collections::{BTreeMap, BTreeSet};
use std::io::SeekFrom;
use std::path::Path;
use std::marker::PhantomData;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::{Duration, Instant};
use color_eyre::eyre::eyre;
use emver::VersionRange;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use http::StatusCode;
use http::header::CONTENT_LENGTH;
use http::{Request, Response, StatusCode};
use hyper::Body;
use patch_db::{DbHandle, LockType};
use rpc_toolkit::command;
use rpc_toolkit::yajrc::RpcError;
use rpc_toolkit::{command, Context};
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt};
use tokio::process::Command;
@@ -19,7 +24,8 @@ use tokio_stream::wrappers::ReadDirStream;
use tracing::instrument;
use self::cleanup::cleanup_failed;
use crate::context::RpcContext;
use crate::context::{CliContext, RpcContext};
use crate::core::rpc_continuations::{RequestGuid, RpcContinuation};
use crate::db::model::{
CurrentDependencyInfo, InstalledPackageDataEntry, PackageDataEntry, RecoveredPackageInfo,
StaticDependencyInfo, StaticFiles,
@@ -36,10 +42,10 @@ use crate::s9pk::manifest::{Manifest, PackageId};
use crate::s9pk::reader::S9pkReader;
use crate::status::{MainStatus, Status};
use crate::util::io::copy_and_shutdown;
use crate::util::{display_none, display_serializable, AsyncFileExt, Version};
use crate::util::{display_none, display_serializable, AsyncFileExt, IoFormat, Version};
use crate::version::{Current, VersionT};
use crate::volume::asset_dir;
use crate::{Error, ResultExt};
use crate::{Error, ErrorKind, ResultExt};
pub mod cleanup;
pub mod progress;
@@ -69,7 +75,10 @@ pub async fn list(#[context] ctx: RpcContext) -> Result<Vec<(PackageId, Version)
.collect())
}
#[command(display(display_none))]
#[command(
custom_cli(cli_install(async, context(CliContext))),
display(display_none)
)]
#[instrument(skip(ctx))]
pub async fn install(
#[context] ctx: RpcContext,
@@ -192,6 +201,178 @@ pub async fn install(
})
}
#[command(rpc_only, display(display_none))]
#[instrument(skip(ctx))]
pub async fn sideload(
#[context] ctx: RpcContext,
#[arg] manifest: Manifest,
) -> Result<RequestGuid, Error> {
let new_ctx = ctx.clone();
let guid = RequestGuid::new();
let handler = Box::new(|req: Request<Body>| {
async move {
let content_length = match req.headers().get(CONTENT_LENGTH).map(|a| a.to_str()) {
None => None,
Some(Err(_)) => {
return Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::from("Invalid Content Length"))
.with_kind(ErrorKind::Network)
}
Some(Ok(a)) => match a.parse::<u64>() {
Err(_) => {
return Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::from("Invalid Content Length"))
.with_kind(ErrorKind::Network)
}
Ok(a) => Some(a),
},
};
let progress = InstallProgress::new(content_length);
let mut hdl = new_ctx.db.handle();
let mut tx = hdl.begin().await?;
let mut pde = crate::db::DatabaseModel::new()
.package_data()
.idx_model(&manifest.id)
.get_mut(&mut tx)
.await?;
match pde.take() {
Some(PackageDataEntry::Installed {
installed,
manifest,
static_files,
}) => {
*pde = Some(PackageDataEntry::Updating {
install_progress: progress.clone(),
installed,
manifest,
static_files,
})
}
None => {
*pde = Some(PackageDataEntry::Installing {
install_progress: progress.clone(),
static_files: StaticFiles::local(
&manifest.id,
&manifest.version,
&manifest.assets.icon_type(),
),
manifest: manifest.clone(),
})
}
_ => {
return Err(Error::new(
eyre!("Cannot install over a package in a transient state"),
crate::ErrorKind::InvalidRequest,
))
}
}
pde.save(&mut tx).await?;
tx.commit(None).await?;
drop(hdl);
download_install_s9pk(
&new_ctx,
&manifest,
progress,
tokio_util::io::StreamReader::new(req.into_body().map_err(|e| {
std::io::Error::new(
match &e {
e if e.is_connect() => std::io::ErrorKind::ConnectionRefused,
e if e.is_timeout() => std::io::ErrorKind::TimedOut,
_ => std::io::ErrorKind::Other,
},
e,
)
})),
)
.await?;
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.with_kind(ErrorKind::Network)
}
.boxed()
});
let cont = RpcContinuation {
created_at: Instant::now(), // TODO
handler: handler,
};
// gc the map
let mut guard = ctx.rpc_stream_continuations.lock().await;
let gced = std::mem::take(&mut *guard)
.into_iter()
.filter(|(_, v)| v.created_at.elapsed() < Duration::from_secs(30))
.collect::<BTreeMap<RequestGuid, RpcContinuation>>();
*guard = gced;
drop(guard);
// insert the new continuation
ctx.rpc_stream_continuations
.lock()
.await
.insert(guid.clone(), cont);
Ok(guid)
}
#[instrument(skip(ctx))]
async fn cli_install(ctx: CliContext, target: String) -> Result<(), RpcError> {
if target.ends_with(".s9pk") {
let path = PathBuf::from(target);
// inspect manifest no verify
let manifest = crate::inspect::manifest(path.clone(), true, Some(IoFormat::Json)).await?;
// rpc call remote sideload
tracing::debug!("calling package.sideload");
let guid = rpc_toolkit::command_helpers::call_remote(
ctx.clone(),
"package.sideload",
serde_json::json!({ "manifest": manifest }),
PhantomData::<RequestGuid>,
)
.await?
.result?;
tracing::debug!("package.sideload succeeded {:?}", guid);
// hit continuation api with guid that comes back
let file = tokio::fs::File::open(path).await?;
let content_length = file.metadata().await?.len();
let body = Body::wrap_stream(tokio_util::io::ReaderStream::new(file));
let client = reqwest::Client::new();
let res = client
.post(dbg!(format!(
"{}://{}/rest/rpc/{}",
ctx.protocol(),
ctx.host(),
guid
)))
.header(CONTENT_LENGTH, content_length)
.body(body)
.send()
.await?;
if res.status().as_u16() == 200 {
tracing::info!("Package Uploaded")
} else {
tracing::info!("Package Upload failed: {}", res.text().await?)
}
} else {
tracing::debug!("calling package.install");
rpc_toolkit::command_helpers::call_remote(
ctx,
"package.install",
serde_json::json!({ "id": target }),
PhantomData::<()>,
)
.await?
.result?;
tracing::debug!("package.install succeeded");
}
Ok(())
}
#[command(
subcommands(self(uninstall_impl(async)), uninstall_dry),
display(display_none)

View File

@@ -8,6 +8,7 @@ pub mod backup;
pub mod config;
pub mod context;
pub mod control;
pub mod core;
pub mod db;
pub mod dependencies;
pub mod developer;
@@ -83,6 +84,7 @@ pub fn server() -> Result<(), RpcError> {
#[command(subcommands(
action::action,
install::install,
install::sideload,
install::uninstall,
install::list,
config::config,

View File

@@ -21,11 +21,18 @@ server {{
}}
location /ws/ {{
proxy_pass http://127.0.0.1:5960/;
proxy_pass http://127.0.0.1:5960$request_uri;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
}}
location /rest/ {{
proxy_pass http://127.0.0.1:5960$request_uri;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
client_max_body_size 0;
}}
location /public/ {{
proxy_pass http://127.0.0.1:5961/;
}}
@@ -72,6 +79,13 @@ server {{
proxy_set_header Connection "Upgrade";
}}
location /rest/ {{
proxy_pass http://127.0.0.1:5960$request_uri;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
client_max_body_size 0;
}}
location /public/ {{
proxy_pass http://127.0.0.1:5961/;
}}