Bugfix/websockets (#2808)

* retry logic for init status

* fix login flashing and sideload hanging

* add logging

* misc backend bugfixes

* use closingObserver instead

* always show reinstall button

* go back to endWith

* show error if sideload fails

* refactor more watch channels

* navigate to services page on sideload complete

* handle error closure events properly

* handle error scenario better in sideload websocket

* remove a clone

---------

Co-authored-by: Matt Hill <mattnine@protonmail.com>
This commit is contained in:
Aiden McClelland
2025-01-14 03:39:52 +00:00
committed by GitHub
parent eb1f3a0ced
commit 5d759f810c
22 changed files with 451 additions and 293 deletions

View File

@@ -1,5 +1,5 @@
use std::ops::Deref; use std::ops::Deref;
use std::path::{Path}; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@@ -162,21 +162,30 @@ impl SetupContext {
if let Err(e) = async { if let Err(e) = async {
let mut stream = let mut stream =
progress_tracker.stream(Some(Duration::from_millis(100))); progress_tracker.stream(Some(Duration::from_millis(100)));
while let Some(progress) = stream.next().await { loop {
ws.send(ws::Message::Text( tokio::select! {
serde_json::to_string(&progress) progress = stream.next() => {
.with_kind(ErrorKind::Serialization)?, if let Some(progress) = progress {
)) ws.send(ws::Message::Text(
.await serde_json::to_string(&progress)
.with_kind(ErrorKind::Network)?; .with_kind(ErrorKind::Serialization)?,
if progress.overall.is_complete() { ))
break; .await
.with_kind(ErrorKind::Network)?;
if progress.overall.is_complete() {
return ws.normal_close("complete").await;
}
} else {
return ws.normal_close("complete").await;
}
}
msg = ws.recv() => {
if msg.transpose().with_kind(ErrorKind::Network)?.is_none() {
return Ok(())
}
}
} }
} }
ws.normal_close("complete").await?;
Ok::<_, Error>(())
} }
.await .await
{ {

View File

@@ -198,17 +198,26 @@ pub async fn subscribe(
session, session,
|mut ws| async move { |mut ws| async move {
if let Err(e) = async { if let Err(e) = async {
while let Some(rev) = sub.recv().await { loop {
ws.send(ws::Message::Text( tokio::select! {
serde_json::to_string(&rev).with_kind(ErrorKind::Serialization)?, rev = sub.recv() => {
)) if let Some(rev) = rev {
.await ws.send(ws::Message::Text(
.with_kind(ErrorKind::Network)?; serde_json::to_string(&rev).with_kind(ErrorKind::Serialization)?,
))
.await
.with_kind(ErrorKind::Network)?;
} else {
return ws.normal_close("complete").await;
}
}
msg = ws.recv() => {
if msg.transpose().with_kind(ErrorKind::Network)?.is_none() {
return Ok(())
}
}
}
} }
ws.normal_close("complete").await?;
Ok::<_, Error>(())
} }
.await .await
{ {

View File

@@ -2,6 +2,7 @@ use std::ops::Deref;
use std::path::PathBuf; use std::path::PathBuf;
use std::time::Duration; use std::time::Duration;
use axum::extract::ws;
use clap::builder::ValueParserFactory; use clap::builder::ValueParserFactory;
use clap::{value_parser, CommandFactory, FromArgMatches, Parser}; use clap::{value_parser, CommandFactory, FromArgMatches, Parser};
use color_eyre::eyre::eyre; use color_eyre::eyre::eyre;
@@ -12,7 +13,7 @@ use itertools::Itertools;
use models::{FromStrParser, VersionString}; use models::{FromStrParser, VersionString};
use reqwest::header::{HeaderMap, CONTENT_LENGTH}; use reqwest::header::{HeaderMap, CONTENT_LENGTH};
use reqwest::Url; use reqwest::Url;
use rpc_toolkit::yajrc::{GenericRpcMethod, RpcError}; use rpc_toolkit::yajrc::RpcError;
use rpc_toolkit::HandlerArgs; use rpc_toolkit::HandlerArgs;
use rustyline_async::ReadlineEvent; use rustyline_async::ReadlineEvent;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@@ -188,7 +189,7 @@ pub async fn sideload(
SideloadParams { session }: SideloadParams, SideloadParams { session }: SideloadParams,
) -> Result<SideloadResponse, Error> { ) -> Result<SideloadResponse, Error> {
let (upload, file) = upload(&ctx, session.clone()).await?; let (upload, file) = upload(&ctx, session.clone()).await?;
let (err_send, err_recv) = oneshot::channel::<Error>(); let (err_send, mut err_recv) = oneshot::channel::<Error>();
let progress = Guid::new(); let progress = Guid::new();
let progress_tracker = FullProgressTracker::new(); let progress_tracker = FullProgressTracker::new();
let mut progress_listener = progress_tracker.stream(Some(Duration::from_millis(200))); let mut progress_listener = progress_tracker.stream(Some(Duration::from_millis(200)));
@@ -198,40 +199,44 @@ pub async fn sideload(
RpcContinuation::ws_authed( RpcContinuation::ws_authed(
&ctx, &ctx,
session, session,
|mut ws| { |mut ws| async move {
use axum::extract::ws::Message; if let Err(e) = async {
async move { loop {
if let Err(e) = async {
tokio::select! { tokio::select! {
res = async { progress = progress_listener.next() => {
while let Some(progress) = progress_listener.next().await { if let Some(progress) = progress {
ws.send(Message::Text( ws.send(ws::Message::Text(
serde_json::to_string(&progress) serde_json::to_string(&progress)
.with_kind(ErrorKind::Serialization)?, .with_kind(ErrorKind::Serialization)?,
)) ))
.await .await
.with_kind(ErrorKind::Network)?; .with_kind(ErrorKind::Network)?;
if progress.overall.is_complete() {
return ws.normal_close("complete").await;
}
} else {
return ws.normal_close("complete").await;
} }
Ok::<_, Error>(()) }
} => res?, msg = ws.recv() => {
err = err_recv => { if msg.transpose().with_kind(ErrorKind::Network)?.is_none() {
return Ok(())
}
}
err = (&mut err_recv) => {
if let Ok(e) = err { if let Ok(e) = err {
ws.close_result(Err::<&str, _>(e.clone_output())).await?; ws.close_result(Err::<&str, _>(e.clone_output())).await?;
return Err(e) return Err(e)
} }
} }
} }
ws.normal_close("complete").await?;
Ok::<_, Error>(())
}
.await
{
tracing::error!("Error tracking sideload progress: {e}");
tracing::debug!("{e:?}");
} }
} }
.await
{
tracing::error!("Error tracking sideload progress: {e}");
tracing::debug!("{e:?}");
}
}, },
Duration::from_secs(600), Duration::from_secs(600),
), ),
@@ -255,9 +260,9 @@ pub async fn sideload(
} }
.await .await
{ {
let _ = err_send.send(e.clone_output());
tracing::error!("Error sideloading package: {e}"); tracing::error!("Error sideloading package: {e}");
tracing::debug!("{e:?}"); tracing::debug!("{e:?}");
let _ = err_send.send(e);
} }
}); });
Ok(SideloadResponse { upload, progress }) Ok(SideloadResponse { upload, progress })

View File

@@ -30,6 +30,7 @@ use crate::error::ResultExt;
use crate::lxc::ContainerId; use crate::lxc::ContainerId;
use crate::prelude::*; use crate::prelude::*;
use crate::rpc_continuations::{Guid, RpcContinuation, RpcContinuations}; use crate::rpc_continuations::{Guid, RpcContinuation, RpcContinuations};
use crate::util::net::WebSocketExt;
use crate::util::serde::Reversible; use crate::util::serde::Reversible;
use crate::util::Invoke; use crate::util::Invoke;
@@ -80,34 +81,28 @@ async fn ws_handler(
.with_kind(ErrorKind::Network)?; .with_kind(ErrorKind::Network)?;
} }
let mut ws_closed = false; loop {
while let Some(entry) = tokio::select! { tokio::select! {
a = logs.try_next() => Some(a?), entry = logs.try_next() => {
a = stream.try_next() => { a.with_kind(crate::ErrorKind::Network)?; ws_closed = true; None } if let Some(entry) = entry? {
} { let (_, log_entry) = entry.log_entry()?;
if let Some(entry) = entry { stream
let (_, log_entry) = entry.log_entry()?; .send(ws::Message::Text(
stream serde_json::to_string(&log_entry).with_kind(ErrorKind::Serialization)?,
.send(ws::Message::Text( ))
serde_json::to_string(&log_entry).with_kind(ErrorKind::Serialization)?, .await
)) .with_kind(ErrorKind::Network)?;
.await } else {
.with_kind(ErrorKind::Network)?; return stream.normal_close("complete").await;
}
},
msg = stream.try_next() => {
if msg.with_kind(crate::ErrorKind::Network)?.is_none() {
return Ok(())
}
}
} }
} }
if !ws_closed {
stream
.send(ws::Message::Close(Some(ws::CloseFrame {
code: ws::close_code::NORMAL,
reason: "Log Stream Finished".into(),
})))
.await
.with_kind(ErrorKind::Network)?;
drop(stream);
}
Ok(())
} }
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] #[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]

View File

@@ -8,10 +8,11 @@ use id_pool::IdPool;
use imbl_value::InternedString; use imbl_value::InternedString;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::process::Command; use tokio::process::Command;
use tokio::sync::{mpsc, watch}; use tokio::sync::mpsc;
use crate::db::model::public::NetworkInterfaceInfo; use crate::db::model::public::NetworkInterfaceInfo;
use crate::prelude::*; use crate::prelude::*;
use crate::util::sync::Watch;
use crate::util::Invoke; use crate::util::Invoke;
pub const START9_BRIDGE_IFACE: &str = "lxcbr0"; pub const START9_BRIDGE_IFACE: &str = "lxcbr0";
@@ -147,17 +148,16 @@ pub struct LanPortForwardController {
_thread: NonDetachingJoinHandle<()>, _thread: NonDetachingJoinHandle<()>,
} }
impl LanPortForwardController { impl LanPortForwardController {
pub fn new( pub fn new(mut ip_info: Watch<BTreeMap<InternedString, NetworkInterfaceInfo>>) -> Self {
mut net_iface: watch::Receiver<BTreeMap<InternedString, NetworkInterfaceInfo>>,
) -> Self {
let (req_send, mut req_recv) = mpsc::unbounded_channel(); let (req_send, mut req_recv) = mpsc::unbounded_channel();
let thread = NonDetachingJoinHandle::from(tokio::spawn(async move { let thread = NonDetachingJoinHandle::from(tokio::spawn(async move {
let mut state = ForwardState::default(); let mut state = ForwardState::default();
let mut interfaces = net_iface let mut interfaces = ip_info.peek_and_mark_seen(|ip_info| {
.borrow_and_update() ip_info
.iter() .iter()
.map(|(iface, info)| (iface.clone(), info.public())) .map(|(iface, info)| (iface.clone(), info.public()))
.collect(); .collect()
});
let mut reply: Option<oneshot::Sender<Result<(), Error>>> = None; let mut reply: Option<oneshot::Sender<Result<(), Error>>> = None;
loop { loop {
tokio::select! { tokio::select! {
@@ -171,12 +171,13 @@ impl LanPortForwardController {
break; break;
} }
} }
_ = net_iface.changed() => { _ = ip_info.changed() => {
interfaces = net_iface interfaces = ip_info.peek(|ip_info| {
.borrow() ip_info
.iter() .iter()
.map(|(iface, info)| (iface.clone(), info.public())) .map(|(iface, info)| (iface.clone(), info.public()))
.collect(); .collect()
});
} }
} }
let res = state.sync(&interfaces).await; let res = state.sync(&interfaces).await;

View File

@@ -1,7 +1,5 @@
use std::collections::{BTreeMap, BTreeSet}; use std::collections::{BTreeMap, BTreeSet};
use std::future::Future;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV6}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV6};
use std::pin::Pin;
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use std::task::Poll; use std::task::Poll;
use std::time::Duration; use std::time::Duration;
@@ -19,7 +17,6 @@ use serde::{Deserialize, Serialize};
use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio::process::Command; use tokio::process::Command;
use tokio::sync::watch;
use ts_rs::TS; use ts_rs::TS;
use zbus::proxy::{PropertyChanged, PropertyStream, SignalStream}; use zbus::proxy::{PropertyChanged, PropertyStream, SignalStream};
use zbus::zvariant::{ use zbus::zvariant::{
@@ -35,7 +32,7 @@ use crate::prelude::*;
use crate::util::future::Until; use crate::util::future::Until;
use crate::util::io::open_file; use crate::util::io::open_file;
use crate::util::serde::{display_serializable, HandlerExtSerde}; use crate::util::serde::{display_serializable, HandlerExtSerde};
use crate::util::sync::SyncMutex; use crate::util::sync::{SyncMutex, Watch};
use crate::util::Invoke; use crate::util::Invoke;
pub fn network_interface_api<C: Context>() -> ParentHandler<C> { pub fn network_interface_api<C: Context>() -> ParentHandler<C> {
@@ -112,7 +109,7 @@ pub fn network_interface_api<C: Context>() -> ParentHandler<C> {
async fn list_interfaces( async fn list_interfaces(
ctx: RpcContext, ctx: RpcContext,
) -> Result<BTreeMap<InternedString, NetworkInterfaceInfo>, Error> { ) -> Result<BTreeMap<InternedString, NetworkInterfaceInfo>, Error> {
Ok(ctx.net_controller.net_iface.ip_info.borrow().clone()) Ok(ctx.net_controller.net_iface.ip_info.read())
} }
#[derive(Debug, Clone, Deserialize, Serialize, Parser, TS)] #[derive(Debug, Clone, Deserialize, Serialize, Parser, TS)]
@@ -322,7 +319,7 @@ impl<'a> StubStream<'a> for SignalStream<'a> {
} }
#[instrument(skip_all)] #[instrument(skip_all)]
async fn watcher(write_to: watch::Sender<BTreeMap<InternedString, NetworkInterfaceInfo>>) { async fn watcher(write_to: Watch<BTreeMap<InternedString, NetworkInterfaceInfo>>) {
loop { loop {
let res: Result<(), Error> = async { let res: Result<(), Error> = async {
let connection = Connection::system().await?; let connection = Connection::system().await?;
@@ -425,7 +422,7 @@ async fn watch_ip(
connection: &Connection, connection: &Connection,
device_proxy: device::DeviceProxy<'_>, device_proxy: device::DeviceProxy<'_>,
iface: InternedString, iface: InternedString,
write_to: &watch::Sender<BTreeMap<InternedString, NetworkInterfaceInfo>>, write_to: &Watch<BTreeMap<InternedString, NetworkInterfaceInfo>>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let mut until = Until::new() let mut until = Until::new()
.with_stream( .with_stream(
@@ -593,13 +590,13 @@ async fn watch_ip(
pub struct NetworkInterfaceController { pub struct NetworkInterfaceController {
db: TypedPatchDb<Database>, db: TypedPatchDb<Database>,
ip_info: watch::Sender<BTreeMap<InternedString, NetworkInterfaceInfo>>, ip_info: Watch<BTreeMap<InternedString, NetworkInterfaceInfo>>,
_watcher: NonDetachingJoinHandle<()>, _watcher: NonDetachingJoinHandle<()>,
listeners: SyncMutex<BTreeMap<u16, Weak<()>>>, listeners: SyncMutex<BTreeMap<u16, Weak<()>>>,
} }
impl NetworkInterfaceController { impl NetworkInterfaceController {
pub fn subscribe(&self) -> watch::Receiver<BTreeMap<InternedString, NetworkInterfaceInfo>> { pub fn subscribe(&self) -> Watch<BTreeMap<InternedString, NetworkInterfaceInfo>> {
self.ip_info.subscribe() self.ip_info.clone_unseen()
} }
async fn sync( async fn sync(
@@ -667,7 +664,7 @@ impl NetworkInterfaceController {
Ok(()) Ok(())
} }
pub fn new(db: TypedPatchDb<Database>) -> Self { pub fn new(db: TypedPatchDb<Database>) -> Self {
let (ip_info, mut recv) = watch::channel(BTreeMap::new()); let mut ip_info = Watch::new(BTreeMap::new());
Self { Self {
db: db.clone(), db: db.clone(),
ip_info: ip_info.clone(), ip_info: ip_info.clone(),
@@ -695,7 +692,7 @@ impl NetworkInterfaceController {
let res: Result<(), Error> = async { let res: Result<(), Error> = async {
loop { loop {
if let Err(e) = async { if let Err(e) = async {
let ip_info = { recv.borrow().clone() }; let ip_info = ip_info.read();
Self::sync(&db, &ip_info).boxed().await?; Self::sync(&db, &ip_info).boxed().await?;
Ok::<_, Error>(()) Ok::<_, Error>(())
@@ -706,7 +703,7 @@ impl NetworkInterfaceController {
tracing::debug!("{e:?}"); tracing::debug!("{e:?}");
} }
let _ = recv.changed().await; let _ = ip_info.changed().await;
} }
} }
.await; .await;
@@ -733,12 +730,10 @@ impl NetworkInterfaceController {
l.insert(port, Arc::downgrade(&arc)); l.insert(port, Arc::downgrade(&arc));
Ok(()) Ok(())
})?; })?;
let mut ip_info = self.ip_info.subscribe(); let ip_info = self.ip_info.clone_unseen();
ip_info.mark_changed();
Ok(NetworkInterfaceListener { Ok(NetworkInterfaceListener {
_arc: arc, _arc: arc,
ip_info, ip_info,
changed: None,
listeners: ListenerMap::new(port), listeners: ListenerMap::new(port),
}) })
} }
@@ -760,12 +755,11 @@ impl NetworkInterfaceController {
l.insert(port, Arc::downgrade(&arc)); l.insert(port, Arc::downgrade(&arc));
Ok(()) Ok(())
})?; })?;
let mut ip_info = self.ip_info.subscribe(); let ip_info = self.ip_info.clone_unseen();
ip_info.mark_changed(); ip_info.mark_changed();
Ok(NetworkInterfaceListener { Ok(NetworkInterfaceListener {
_arc: arc, _arc: arc,
ip_info, ip_info,
changed: None,
listeners, listeners,
}) })
} }
@@ -985,9 +979,8 @@ impl ListenerMap {
} }
pub struct NetworkInterfaceListener { pub struct NetworkInterfaceListener {
ip_info: watch::Receiver<BTreeMap<InternedString, NetworkInterfaceInfo>>, ip_info: Watch<BTreeMap<InternedString, NetworkInterfaceInfo>>,
listeners: ListenerMap, listeners: ListenerMap,
changed: Option<Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>>,
_arc: Arc<()>, _arc: Arc<()>,
} }
impl NetworkInterfaceListener { impl NetworkInterfaceListener {
@@ -995,29 +988,14 @@ impl NetworkInterfaceListener {
self.listeners.port self.listeners.port
} }
fn poll_ip_info_changed(&mut self, cx: &mut std::task::Context<'_>) -> Poll<()> {
let mut changed = if let Some(changed) = self.changed.take() {
changed
} else {
let mut ip_info = self.ip_info.clone();
Box::pin(async move {
let _ = ip_info.changed().await;
})
};
let res = changed.poll_unpin(cx);
if res.is_pending() {
self.changed = Some(changed);
}
res
}
pub fn poll_accept( pub fn poll_accept(
&mut self, &mut self,
cx: &mut std::task::Context<'_>, cx: &mut std::task::Context<'_>,
public: bool, public: bool,
) -> Poll<Result<Accepted, Error>> { ) -> Poll<Result<Accepted, Error>> {
if self.poll_ip_info_changed(cx).is_ready() || public != self.listeners.prev_public { while self.ip_info.poll_changed(cx).is_ready() || public != self.listeners.prev_public {
self.listeners.update(&*self.ip_info.borrow(), public)?; self.ip_info
.peek(|ip_info| self.listeners.update(ip_info, public))?;
} }
self.listeners.poll_accept(cx) self.listeners.poll_accept(cx)
} }

View File

@@ -7,13 +7,12 @@ use std::task::Poll;
use std::time::Duration; use std::time::Duration;
use axum::Router; use axum::Router;
use futures::future::{BoxFuture, Either}; use futures::future::Either;
use futures::FutureExt; use futures::FutureExt;
use helpers::NonDetachingJoinHandle; use helpers::NonDetachingJoinHandle;
use hyper_util::rt::{TokioIo, TokioTimer}; use hyper_util::rt::{TokioIo, TokioTimer};
use hyper_util::service::TowerToHyperService;
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{oneshot, watch}; use tokio::sync::oneshot;
use crate::context::{DiagnosticContext, InitContext, InstallContext, RpcContext, SetupContext}; use crate::context::{DiagnosticContext, InitContext, InstallContext, RpcContext, SetupContext};
use crate::net::network_interface::NetworkInterfaceListener; use crate::net::network_interface::NetworkInterfaceListener;
@@ -23,6 +22,7 @@ use crate::net::static_server::{
}; };
use crate::prelude::*; use crate::prelude::*;
use crate::util::actor::background::BackgroundJobQueue; use crate::util::actor::background::BackgroundJobQueue;
use crate::util::sync::Watch;
pub struct Accepted { pub struct Accepted {
pub https_redirect: bool, pub https_redirect: bool,
@@ -76,42 +76,22 @@ impl<A: Accept> Accept for Option<A> {
#[pin_project::pin_project] #[pin_project::pin_project]
pub struct Acceptor<A: Accept> { pub struct Acceptor<A: Accept> {
acceptor: (watch::Sender<A>, watch::Receiver<A>), acceptor: Watch<A>,
changed: Option<BoxFuture<'static, ()>>,
} }
impl<A: Accept + Send + Sync + 'static> Acceptor<A> { impl<A: Accept + Send + Sync + 'static> Acceptor<A> {
pub fn new(acceptor: A) -> Self { pub fn new(acceptor: A) -> Self {
Self { Self {
acceptor: watch::channel(acceptor), acceptor: Watch::new(acceptor),
changed: None,
} }
} }
fn poll_changed(&mut self, cx: &mut std::task::Context<'_>) -> Poll<()> { fn poll_changed(&mut self, cx: &mut std::task::Context<'_>) -> Poll<()> {
let mut changed = if let Some(changed) = self.changed.take() { self.acceptor.poll_changed(cx)
changed
} else {
let mut recv = self.acceptor.1.clone();
async move {
let _ = recv.changed().await;
}
.boxed()
};
let res = changed.poll_unpin(cx);
if res.is_pending() {
self.changed = Some(changed);
}
res
} }
fn poll_accept(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<Accepted, Error>> { fn poll_accept(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<Accepted, Error>> {
let _ = self.poll_changed(cx); let _ = self.poll_changed(cx);
let mut res = Poll::Pending; self.acceptor.peek_mut(|a| a.poll_accept(cx))
self.acceptor.0.send_if_modified(|a| {
res = a.poll_accept(cx);
false
});
res
} }
async fn accept(&mut self) -> Result<Accepted, Error> { async fn accept(&mut self) -> Result<Accepted, Error> {
@@ -139,7 +119,7 @@ impl Acceptor<UpgradableListener> {
} }
pub struct WebServerAcceptorSetter<A: Accept> { pub struct WebServerAcceptorSetter<A: Accept> {
acceptor: watch::Sender<A>, acceptor: Watch<A>,
} }
impl<A: Accept, B: Accept> WebServerAcceptorSetter<Option<Either<A, B>>> { impl<A: Accept, B: Accept> WebServerAcceptorSetter<Option<Either<A, B>>> {
pub fn try_upgrade<F: FnOnce(A) -> Result<B, Error>>(&self, f: F) -> Result<(), Error> { pub fn try_upgrade<F: FnOnce(A) -> Result<B, Error>>(&self, f: F) -> Result<(), Error> {
@@ -160,7 +140,7 @@ impl<A: Accept, B: Accept> WebServerAcceptorSetter<Option<Either<A, B>>> {
} }
} }
impl<A: Accept> Deref for WebServerAcceptorSetter<A> { impl<A: Accept> Deref for WebServerAcceptorSetter<A> {
type Target = watch::Sender<A>; type Target = Watch<A>;
fn deref(&self) -> &Self::Target { fn deref(&self) -> &Self::Target {
&self.acceptor &self.acceptor
} }
@@ -168,8 +148,8 @@ impl<A: Accept> Deref for WebServerAcceptorSetter<A> {
pub struct WebServer<A: Accept> { pub struct WebServer<A: Accept> {
shutdown: oneshot::Sender<()>, shutdown: oneshot::Sender<()>,
router: watch::Sender<Option<Router>>, router: Watch<Option<Router>>,
acceptor: watch::Sender<A>, acceptor: Watch<A>,
thread: NonDetachingJoinHandle<()>, thread: NonDetachingJoinHandle<()>,
} }
impl<A: Accept + Send + Sync + 'static> WebServer<A> { impl<A: Accept + Send + Sync + 'static> WebServer<A> {
@@ -180,8 +160,9 @@ impl<A: Accept + Send + Sync + 'static> WebServer<A> {
} }
pub fn new(mut acceptor: Acceptor<A>) -> Self { pub fn new(mut acceptor: Acceptor<A>) -> Self {
let acceptor_send = acceptor.acceptor.0.clone(); let acceptor_send = acceptor.acceptor.clone();
let (router, service) = watch::channel::<Option<Router>>(None); let router = Watch::<Option<Router>>::new(None);
let service = router.clone_unseen();
let (shutdown, shutdown_recv) = oneshot::channel(); let (shutdown, shutdown_recv) = oneshot::channel();
let thread = NonDetachingJoinHandle::from(tokio::spawn(async move { let thread = NonDetachingJoinHandle::from(tokio::spawn(async move {
#[derive(Clone)] #[derive(Clone)]
@@ -201,6 +182,34 @@ impl<A: Accept + Send + Sync + 'static> WebServer<A> {
} }
} }
struct SwappableRouter(Watch<Option<Router>>, bool);
impl hyper::service::Service<hyper::Request<hyper::body::Incoming>> for SwappableRouter {
type Response = <Router as tower_service::Service<
hyper::Request<hyper::body::Incoming>,
>>::Response;
type Error = <Router as tower_service::Service<
hyper::Request<hyper::body::Incoming>,
>>::Error;
type Future = <Router as tower_service::Service<
hyper::Request<hyper::body::Incoming>,
>>::Future;
fn call(&self, req: hyper::Request<hyper::body::Incoming>) -> Self::Future {
use tower_service::Service;
if self.1 {
redirecter().call(req)
} else {
let router = self.0.read();
if let Some(mut router) = router {
router.call(req)
} else {
refresher().call(req)
}
}
}
}
let accept = AtomicBool::new(true); let accept = AtomicBool::new(true);
let queue_cell = Arc::new(RwLock::new(None)); let queue_cell = Arc::new(RwLock::new(None));
let graceful = hyper_util::server::graceful::GracefulShutdown::new(); let graceful = hyper_util::server::graceful::GracefulShutdown::new();
@@ -224,45 +233,16 @@ impl<A: Accept + Send + Sync + 'static> WebServer<A> {
loop { loop {
if let Err(e) = async { if let Err(e) = async {
let accepted = acceptor.accept().await?; let accepted = acceptor.accept().await?;
if accepted.https_redirect { queue.add_job(
queue.add_job( graceful.watch(
graceful.watch( server
server .serve_connection_with_upgrades(
.serve_connection_with_upgrades( TokioIo::new(accepted.stream),
TokioIo::new(accepted.stream), SwappableRouter(service.clone(), accepted.https_redirect),
TowerToHyperService::new(redirecter().into_service()), )
) .into_owned(),
.into_owned(), ),
), );
);
} else {
let service = { service.borrow().clone() };
if let Some(service) = service {
queue.add_job(
graceful.watch(
server
.serve_connection_with_upgrades(
TokioIo::new(accepted.stream),
TowerToHyperService::new(service.into_service()),
)
.into_owned(),
),
);
} else {
queue.add_job(
graceful.watch(
server
.serve_connection_with_upgrades(
TokioIo::new(accepted.stream),
TowerToHyperService::new(
refresher().into_service(),
),
)
.into_owned(),
),
);
}
}
Ok::<_, Error>(()) Ok::<_, Error>(())
} }
@@ -303,7 +283,7 @@ impl<A: Accept + Send + Sync + 'static> WebServer<A> {
} }
pub fn serve_router(&mut self, router: Router) { pub fn serve_router(&mut self, router: Router) {
self.router.send_replace(Some(router)); self.router.send(Some(router))
} }
pub fn serve_main(&mut self, ctx: RpcContext) { pub fn serve_main(&mut self, ctx: RpcContext) {

View File

@@ -66,9 +66,7 @@ impl Actor for ServiceActor {
tracing::debug!("{e:?}"); tracing::debug!("{e:?}");
} }
if ip_info.changed().await.is_err() { ip_info.changed().await;
break;
};
} }
}); });
} }

View File

@@ -20,7 +20,7 @@ use ts_rs::TS;
use crate::context::{CliContext, RpcContext}; use crate::context::{CliContext, RpcContext};
use crate::disk::mount::filesystem::bind::Bind; use crate::disk::mount::filesystem::bind::Bind;
use crate::disk::mount::filesystem::block_dev::BlockDev; use crate::disk::mount::filesystem::block_dev::BlockDev;
use crate::disk::mount::filesystem::efivarfs::{ EfiVarFs}; use crate::disk::mount::filesystem::efivarfs::EfiVarFs;
use crate::disk::mount::filesystem::overlayfs::OverlayGuard; use crate::disk::mount::filesystem::overlayfs::OverlayGuard;
use crate::disk::mount::filesystem::MountType; use crate::disk::mount::filesystem::MountType;
use crate::disk::mount::guard::{GenericMountGuard, MountGuard, TmpMountGuard}; use crate::disk::mount::guard::{GenericMountGuard, MountGuard, TmpMountGuard};
@@ -106,7 +106,7 @@ pub async fn update_system(
.with_kind(ErrorKind::Database)?, .with_kind(ErrorKind::Database)?,
) )
.await; .await;
while { loop {
let progress = ctx let progress = ctx
.db .db
.peek() .peek()
@@ -122,14 +122,22 @@ pub async fn update_system(
)) ))
.await .await
.with_kind(ErrorKind::Network)?; .with_kind(ErrorKind::Network)?;
progress.is_some() if progress.is_none() {
} { return ws.normal_close("complete").await;
sub.recv().await; }
tokio::select! {
_ = sub.recv() => (),
res = async {
loop {
if ws.recv().await.transpose().with_kind(ErrorKind::Network)?.is_none() {
return Ok(())
}
}
} => {
return res
}
}
} }
ws.normal_close("complete").await?;
Ok::<_, Error>(())
} }
.await .await
{ {

View File

@@ -19,13 +19,8 @@ pub trait WebSocketExt {
} }
impl WebSocketExt for ws::WebSocket { impl WebSocketExt for ws::WebSocket {
async fn normal_close(mut self, msg: impl Into<Cow<'static, str>> + Send) -> Result<(), Error> { async fn normal_close(self, msg: impl Into<Cow<'static, str>> + Send) -> Result<(), Error> {
self.send(ws::Message::Close(Some(CloseFrame { self.close_result(Ok::<_, Error>(msg)).await
code: 1000,
reason: msg.into(),
})))
.await
.with_kind(ErrorKind::Network)
} }
async fn close_result( async fn close_result(
mut self, mut self,
@@ -38,15 +33,23 @@ impl WebSocketExt for ws::WebSocket {
reason: msg.into(), reason: msg.into(),
}))) })))
.await .await
.with_kind(ErrorKind::Network), .with_kind(ErrorKind::Network)?,
Err(e) => self Err(e) => self
.send(ws::Message::Close(Some(CloseFrame { .send(ws::Message::Close(Some(CloseFrame {
code: 1011, code: 1011,
reason: e.to_string().into(), reason: e.to_string().into(),
}))) })))
.await .await
.with_kind(ErrorKind::Network), .with_kind(ErrorKind::Network)?,
} }
while !matches!(
self.recv()
.await
.transpose()
.with_kind(ErrorKind::Network)?,
Some(ws::Message::Close(_)) | None
) {}
Ok(())
} }
} }

View File

@@ -1,3 +1,7 @@
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Poll, Waker};
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct SyncMutex<T>(std::sync::Mutex<T>); pub struct SyncMutex<T>(std::sync::Mutex<T>);
impl<T> SyncMutex<T> { impl<T> SyncMutex<T> {
@@ -11,3 +15,140 @@ impl<T> SyncMutex<T> {
f(&*self.0.lock().unwrap()) f(&*self.0.lock().unwrap())
} }
} }
struct WatchShared<T> {
version: u64,
data: T,
wakers: Vec<Waker>,
}
impl<T> WatchShared<T> {
fn modified(&mut self) {
self.version += 1;
for waker in self.wakers.drain(..) {
waker.wake();
}
}
}
#[pin_project::pin_project]
pub struct Watch<T> {
shared: Arc<SyncMutex<WatchShared<T>>>,
version: u64,
}
impl<T> Clone for Watch<T> {
fn clone(&self) -> Self {
Self {
shared: self.shared.clone(),
version: self.version,
}
}
}
impl<T> Watch<T> {
pub fn new(init: T) -> Self {
Self {
shared: Arc::new(SyncMutex::new(WatchShared {
version: 1,
data: init,
wakers: Vec::new(),
})),
version: 0,
}
}
pub fn clone_unseen(&self) -> Self {
Self {
shared: self.shared.clone(),
version: 0,
}
}
pub fn poll_changed(&mut self, cx: &mut std::task::Context<'_>) -> Poll<()> {
self.shared.mutate(|shared| {
if shared.version != self.version {
self.version = shared.version;
Poll::Ready(())
} else {
let waker = cx.waker();
if !shared.wakers.iter().any(|w| w.will_wake(waker)) {
shared.wakers.push(waker.clone());
}
Poll::Pending
}
})
}
pub async fn changed(&mut self) {
futures::future::poll_fn(|cx| self.poll_changed(cx)).await
}
pub fn send_if_modified<F: FnOnce(&mut T) -> bool>(&self, modify: F) -> bool {
self.shared.mutate(|shared| {
let changed = modify(&mut shared.data);
if changed {
shared.modified();
}
changed
})
}
pub fn send_modify<U, F: FnOnce(&mut T) -> U>(&self, modify: F) -> U {
self.shared.mutate(|shared| {
let res = modify(&mut shared.data);
shared.modified();
res
})
}
pub fn send_replace(&self, new: T) -> T {
self.send_modify(|a| std::mem::replace(a, new))
}
pub fn send(&self, new: T) {
self.send_replace(new);
}
pub fn mark_changed(&self) {
self.shared.mutate(|shared| shared.modified())
}
pub fn mark_unseen(&mut self) {
self.version = 0;
}
pub fn mark_seen(&mut self) {
self.shared.peek(|shared| {
self.version = shared.version;
})
}
pub fn peek<U, F: FnOnce(&T) -> U>(&self, f: F) -> U {
self.shared.peek(|shared| f(&shared.data))
}
pub fn peek_and_mark_seen<U, F: FnOnce(&T) -> U>(&mut self, f: F) -> U {
self.shared.peek(|shared| {
self.version = shared.version;
f(&shared.data)
})
}
pub fn peek_mut<U, F: FnOnce(&mut T) -> U>(&self, f: F) -> U {
self.shared.mutate(|shared| f(&mut shared.data))
}
}
impl<T: Clone> Watch<T> {
pub fn read(&self) -> T {
self.peek(|a| a.clone())
}
}
impl<T: Clone> futures::Stream for Watch<T> {
type Item = T;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let this = self.project();
this.shared.mutate(|shared| {
if shared.version != *this.version {
*this.version = shared.version;
Poll::Ready(Some(shared.data.clone()))
} else {
let waker = cx.waker();
if !shared.wakers.iter().any(|w| w.will_wake(waker)) {
shared.wakers.push(waker.clone());
}
Poll::Pending
}
})
}
fn size_hint(&self) -> (usize, Option<usize>) {
(1, None)
}
}

View File

@@ -17,17 +17,16 @@ StartOS v0.3.6 is a complete rewrite of the OS internals (everything you don't s
## Changelog ## Changelog
- [Switch to lxc-based container runtime](#lxc) - [Switch to lxc-based container runtime](#lxc)
- [Update s9pk archive format](#new-s9pk-archive-format) - [Update s9pk archive format](#s9pk-archive-format)
- [Improve config](#better-config) - [Improve Actions](#actions)
- [Unify Actions](#unify-actions)
- [Use squashfs images for OS updates](#squashfs-updates) - [Use squashfs images for OS updates](#squashfs-updates)
- [Introduce Typescript package API and SDK](#typescript-package-api-and-sdk) - [Introduce Typescript package API and SDK](#typescript-sdk)
- [Remove Postgresql](#remove-postgressql) - [Remove Postgresql](#remove-postgressql)
- [Implement detailed progress reporting](#progress-reporting) - [Implement detailed progress reporting](#progress-reporting)
- [Improve registry protocol](#registry-protocol) - [Improve registry protocol](#registry-protocol)
- [Replace unique .local URLs with unique ports](#lan-port-forwarding) - [Replace unique .local URLs with unique ports](#lan-port-forwarding)
- [Use start-fs Fuse module for improved backups](#improved-backups) - [Use start-fs Fuse module for improved backups](#improved-backups)
- [Switch to Exver for versioning](#Exver) - [Switch to Exver for versioning](#exver)
- [Support clearnet hosting via start-cli](#clearnet) - [Support clearnet hosting via start-cli](#clearnet)
### LXC ### LXC
@@ -38,21 +37,17 @@ StartOS now uses a nested container paradigm based on LXC for the outer containe
The S9PK archive format has been overhauled to allow for signature verification of partial downloads, and allow direct mounting of container images without unpacking the s9pk. The S9PK archive format has been overhauled to allow for signature verification of partial downloads, and allow direct mounting of container images without unpacking the s9pk.
### Better config
Expanded support for input types and a new UI makes configuring services easier and more powerful.
### Actions ### Actions
Actions take arbitrary form input _and_ return arbitrary responses, thus satisfying the needs of both Config and Properties, which will be removed in a future release. This gives packages developers the ability to break up Config and Properties into smaller, more specific formats, or to exclude them entirely without polluting the UI. Actions take arbitrary form input and return arbitrary responses, thus satisfying the needs of both Config and Properties, which have been removed. The new actions API gives packages developers the ability to break up Config and Properties into smaller, more specific formats, or to exclude them entirely without polluting the UI. Improved form design and new input types round out the actions experience.
### Squashfs updates ### Squashfs updates
StartOS now uses squashfs images to represent OS updates. This allows for better update verification, and improved reliability over rsync updates. StartOS now uses squashfs images to represent OS updates. This allows for better update verification, and improved reliability over rsync updates.
### Typescript package API and SDK ### Typescript SDK
StartOS now exposes a Typescript API. Package developers can take advantage in a simple, typesafe way using the new start-sdk. A barebones StartOS package (s9pk) can be produced in minutes with minimal knowledge or skill. More advanced developers can use the SDK to create highly customized user experiences with their service. Package developers can now take advantage of StartOS APIs using the new start-sdk, available in Typescript. A barebones StartOS package (s9pk) can be produced in minutes with minimal knowledge or skill. More advanced developers can use the SDK to create highly customized user experiences with their service.
### Remove PostgresSQL ### Remove PostgresSQL
@@ -76,8 +71,14 @@ The new start-fs fuse module unifies file system expectations for various platfo
### Exver ### Exver
StartOS now uses Extended Versioning (Exver), which consists of three parts, separated by semicolons: (1) a Semver-compliant upstream version, (2) a Semver-compliant wrapper version, and (3) an optional "flavor" prefix. Flavors can be thought of as alternative implementations of services, where a user would only want one or the other installed, and data can feasibly be migrating beetween the two. Another common characteristic of flavors is that they satisfy the same API requirement of dependents, though this is not strictly necessary. A valid Exver looks something like this: `#knots:28.0.:1.0-beta.1`. This would translate to "the first beta release of StartOS wrapper version 1.0 of Bitcoin Knots version 27.0". StartOS now uses Extended Versioning (Exver), which consists of three parts: (1) a Semver-compliant upstream version, (2) a Semver-compliant wrapper version, and (3) an optional "flavor" prefix. Flavors can be thought of as alternative implementations of services, where a user would only want one or the other installed, and data can feasibly be migrating between the two. Another common characteristic of flavors is that they satisfy the same API requirement of dependents, though this is not strictly necessary. A valid Exver looks something like this: `#knots:28.0.:1.0-beta.1`. This would translate to "the first beta release of StartOS wrapper version 1.0 of Bitcoin Knots version 27.0".
### Clearnet ### Clearnet
It is now possible, and quite easy, to expose specific services interfaces to the public Internet on a standard domain using start-cli. This functionality will be expanded upon and moved into the StartOS UI in a future release. It is now possible, and quite easy, to expose service interfaces to the public Internet on a standard domain using start-cli. In addition to choosing which service interfaces to expose on which domains/subdomains, users have two options:
1. Open ports on their router. This option is free and easy to accomplish with most routers. The drawback is that the user's home IP address is revealed to anyone accessing the exposes resources. For example, hosting a blog in this way would reveal your home IP address, and therefor your approximate location on Earth, to your readers.
2. Use a Wireguard VPN to proxy web traffic. This option requires the user to provision a $5-$10/month remote VPS and perform a few, simple commands. The result is the successful obfuscation of the users home IP address.
The CLI-driven clearnet functionality will be expanded upon and moved into the main StartOS UI in a future release.

View File

@@ -12,7 +12,7 @@ const routes: Routes = [
}, },
{ {
path: 'login', path: 'login',
canActivate: [UnauthGuard], canActivate: [UnauthGuard, stateNot(['error', 'initializing'])],
loadChildren: () => loadChildren: () =>
import('./pages/login/login.module').then(m => m.LoginPageModule), import('./pages/login/login.module').then(m => m.LoginPageModule),
}, },

View File

@@ -1,10 +1,8 @@
import { inject, Injectable } from '@angular/core' import { inject, Injectable } from '@angular/core'
import { ErrorService } from '@start9labs/shared'
import { T } from '@start9labs/start-sdk' import { T } from '@start9labs/start-sdk'
import { import {
catchError, catchError,
defer, defer,
EMPTY,
from, from,
map, map,
Observable, Observable,
@@ -24,40 +22,46 @@ interface MappedProgress {
export class InitService extends Observable<MappedProgress> { export class InitService extends Observable<MappedProgress> {
private readonly state = inject(StateService) private readonly state = inject(StateService)
private readonly api = inject(ApiService) private readonly api = inject(ApiService)
private readonly errorService = inject(ErrorService)
private readonly progress$ = defer(() => private readonly progress$ = defer(() =>
from(this.api.initGetProgress()), from(this.api.initGetProgress()),
).pipe( ).pipe(
switchMap(({ guid, progress }) => switchMap(({ guid, progress }) =>
this.api.openWebsocket$<T.FullProgress>(guid).pipe(startWith(progress)), this.api
.openWebsocket$<T.FullProgress>(guid, {
closeObserver: {
next: () => {
this.state.syncState()
},
},
})
.pipe(startWith(progress)),
), ),
map(({ phases, overall }) => { map(({ phases, overall }) => ({
return { total: getOverallDecimal(overall),
total: getOverallDecimal(overall), message: phases
message: phases .filter(
.filter( (
( p,
p, ): p is {
): p is { name: string
name: string progress: {
progress: { done: number
done: number total: number | null
total: number | null }
} } => p.progress !== true && p.progress !== null,
} => p.progress !== true && p.progress !== null, )
) .map(p => `<b>${p.name}</b>${getPhaseBytes(p.progress)}`)
.map(p => `<b>${p.name}</b>${getPhaseBytes(p.progress)}`) .join(', '),
.join(', '), })),
}
}),
tap(({ total }) => { tap(({ total }) => {
if (total === 1) { if (total === 1) {
this.state.syncState() this.state.syncState()
} }
}), }),
catchError(e => { catchError((e, caught$) => {
console.error(e) console.error(e)
return EMPTY this.state.syncState()
return caught$
}), }),
) )

View File

@@ -35,19 +35,17 @@
> >
Downgrade Downgrade
</ion-button> </ion-button>
<ng-container *ngIf="showDevTools$ | async"> <ion-button
<ion-button *ngIf="(manifest.version | compareExver : pkg.version) === 0"
*ngIf="(manifest.version | compareExver : pkg.version) === 0" expand="block"
expand="block" color="success"
color="success" (click)="tryInstall()"
(click)="tryInstall()" >
> Reinstall
Reinstall </ion-button>
</ion-button>
</ng-container>
</ng-container> </ng-container>
</ng-container> </ng-container>
<ng-template #install> <ng-template #install>
<ion-button <ion-button
expand="block" expand="block"

View File

@@ -18,7 +18,6 @@ import {
} from '@start9labs/shared' } from '@start9labs/shared'
import { PatchDB } from 'patch-db-client' import { PatchDB } from 'patch-db-client'
import { firstValueFrom } from 'rxjs' import { firstValueFrom } from 'rxjs'
import { ClientStorageService } from 'src/app/services/client-storage.service'
import { MarketplaceService } from 'src/app/services/marketplace.service' import { MarketplaceService } from 'src/app/services/marketplace.service'
import { import {
DataModel, DataModel,
@@ -50,11 +49,8 @@ export class MarketplaceShowControlsComponent {
@Input() @Input()
conflict?: string | null conflict?: string | null
readonly showDevTools$ = this.ClientStorageService.showDevTools$
constructor( constructor(
private readonly alertCtrl: AlertController, private readonly alertCtrl: AlertController,
private readonly ClientStorageService: ClientStorageService,
@Inject(AbstractMarketplaceService) @Inject(AbstractMarketplaceService)
private readonly marketplaceService: MarketplaceService, private readonly marketplaceService: MarketplaceService,
private readonly loader: LoadingService, private readonly loader: LoadingService,

View File

@@ -1,13 +1,12 @@
import { Component } from '@angular/core' import { Component } from '@angular/core'
import { isPlatform } from '@ionic/angular' import { isPlatform } from '@ionic/angular'
import { ErrorService, LoadingService } from '@start9labs/shared' import { ErrorService, LoadingService } from '@start9labs/shared'
import { S9pk, T } from '@start9labs/start-sdk' import { S9pk } from '@start9labs/start-sdk'
import cbor from 'cbor' import cbor from 'cbor'
import { ApiService } from 'src/app/services/api/embassy-api.service' import { ApiService } from 'src/app/services/api/embassy-api.service'
import { ConfigService } from 'src/app/services/config.service' import { ConfigService } from 'src/app/services/config.service'
import { SideloadService } from './sideload.service' import { SideloadService } from './sideload.service'
import { filter, firstValueFrom } from 'rxjs' import { filter, firstValueFrom } from 'rxjs'
import mime from 'mime'
interface Positions { interface Positions {
[key: string]: [bigint, bigint] // [position, length] [key: string]: [bigint, bigint] // [position, length]
@@ -121,10 +120,8 @@ export class SideloadPage {
try { try {
const res = await this.api.sideloadPackage() const res = await this.api.sideloadPackage()
this.sideloadService.followProgress(res.progress) this.sideloadService.followProgress(res.progress)
this.api await this.api.uploadPackage(res.upload, this.toUpload.file!)
.uploadPackage(res.upload, this.toUpload.file!) await firstValueFrom(this.progress$)
.catch(e => console.error(e))
await firstValueFrom(this.progress$.pipe(filter(Boolean)))
} catch (e: any) { } catch (e: any) {
this.errorService.handleError(e) this.errorService.handleError(e)
} finally { } finally {

View File

@@ -1,6 +1,16 @@
import { Injectable } from '@angular/core' import { inject, Injectable } from '@angular/core'
import { Router } from '@angular/router'
import { ErrorService } from '@start9labs/shared'
import { T } from '@start9labs/start-sdk' import { T } from '@start9labs/start-sdk'
import { BehaviorSubject, endWith, shareReplay, Subject, switchMap } from 'rxjs' import {
catchError,
EMPTY,
endWith,
shareReplay,
Subject,
switchMap,
tap,
} from 'rxjs'
import { ApiService } from 'src/app/services/api/embassy-api.service' import { ApiService } from 'src/app/services/api/embassy-api.service'
@Injectable({ @Injectable({
@@ -8,11 +18,34 @@ import { ApiService } from 'src/app/services/api/embassy-api.service'
}) })
export class SideloadService { export class SideloadService {
private readonly guid$ = new Subject<string>() private readonly guid$ = new Subject<string>()
private readonly errorService = inject(ErrorService)
private readonly router = inject(Router)
readonly progress$ = this.guid$.pipe( readonly progress$ = this.guid$.pipe(
switchMap(guid => switchMap(guid =>
this.api.openWebsocket$<T.FullProgress>(guid).pipe(endWith(null)), this.api
.openWebsocket$<T.FullProgress>(guid, {
closeObserver: {
next: event => {
if (event.code !== 1000) {
this.errorService.handleError(event.reason)
}
},
},
})
.pipe(
tap(p => {
if (p.overall === true) {
this.router.navigate([''], { replaceUrl: true })
}
}),
endWith(null),
),
), ),
catchError(e => {
this.errorService.handleError('Websocket connection broken. Try again.')
return EMPTY
}),
shareReplay(1), shareReplay(1),
) )

View File

@@ -7,6 +7,7 @@ import {
GetPackagesRes, GetPackagesRes,
MarketplacePkg, MarketplacePkg,
} from '@start9labs/marketplace' } from '@start9labs/marketplace'
import { WebSocketSubject } from 'rxjs/webSocket'
export abstract class ApiService { export abstract class ApiService {
// http // http
@@ -30,7 +31,7 @@ export abstract class ApiService {
abstract openWebsocket$<T>( abstract openWebsocket$<T>(
guid: string, guid: string,
config?: RR.WebsocketConfig<T>, config?: RR.WebsocketConfig<T>,
): Observable<T> ): WebSocketSubject<T>
// state // state

View File

@@ -11,7 +11,7 @@ import { PATCH_CACHE } from 'src/app/services/patch-db/patch-db-source'
import { ApiService } from './embassy-api.service' import { ApiService } from './embassy-api.service'
import { RR } from './api.types' import { RR } from './api.types'
import { ConfigService } from '../config.service' import { ConfigService } from '../config.service'
import { webSocket } from 'rxjs/webSocket' import { webSocket, WebSocketSubject } from 'rxjs/webSocket'
import { Observable, filter, firstValueFrom } from 'rxjs' import { Observable, filter, firstValueFrom } from 'rxjs'
import { AuthService } from '../auth.service' import { AuthService } from '../auth.service'
import { DOCUMENT } from '@angular/common' import { DOCUMENT } from '@angular/common'
@@ -85,7 +85,7 @@ export class LiveApiService extends ApiService {
openWebsocket$<T>( openWebsocket$<T>(
guid: string, guid: string,
config: RR.WebsocketConfig<T> = {}, config: RR.WebsocketConfig<T> = {},
): Observable<T> { ): WebSocketSubject<T> {
const { location } = this.document.defaultView! const { location } = this.document.defaultView!
const protocol = location.protocol === 'http:' ? 'ws' : 'wss' const protocol = location.protocol === 'http:' ? 'ws' : 'wss'
const host = location.host const host = location.host

View File

@@ -37,6 +37,7 @@ import {
GetPackagesRes, GetPackagesRes,
MarketplacePkg, MarketplacePkg,
} from '@start9labs/marketplace' } from '@start9labs/marketplace'
import { WebSocketSubject } from 'rxjs/webSocket'
const PROGRESS: T.FullProgress = { const PROGRESS: T.FullProgress = {
overall: { overall: {
@@ -107,11 +108,11 @@ export class MockApiService extends ApiService {
openWebsocket$<T>( openWebsocket$<T>(
guid: string, guid: string,
config: RR.WebsocketConfig<T> = {}, config: RR.WebsocketConfig<T> = {},
): Observable<T> { ): WebSocketSubject<T> {
if (guid === 'db-guid') { if (guid === 'db-guid') {
return this.mockWsSource$.pipe<any>( return this.mockWsSource$.pipe<any>(
shareReplay({ bufferSize: 1, refCount: true }), shareReplay({ bufferSize: 1, refCount: true }),
) ) as WebSocketSubject<T>
} else if (guid === 'logs-guid') { } else if (guid === 'logs-guid') {
return interval(50).pipe<any>( return interval(50).pipe<any>(
map((_, index) => { map((_, index) => {
@@ -120,16 +121,16 @@ export class MockApiService extends ApiService {
if (index === 100) throw new Error('HAAHHA') if (index === 100) throw new Error('HAAHHA')
return Mock.ServerLogs[0] return Mock.ServerLogs[0]
}), }),
) ) as WebSocketSubject<T>
} else if (guid === 'init-progress-guid') { } else if (guid === 'init-progress-guid') {
return from(this.initProgress()).pipe( return from(this.initProgress()).pipe(
startWith(PROGRESS), startWith(PROGRESS),
) as Observable<T> ) as WebSocketSubject<T>
} else if (guid === 'sideload-progress-guid') { } else if (guid === 'sideload-progress-guid') {
config.openObserver?.next(new Event('')) config.openObserver?.next(new Event(''))
return from(this.initProgress()).pipe( return from(this.initProgress()).pipe(
startWith(PROGRESS), startWith(PROGRESS),
) as Observable<T> ) as WebSocketSubject<T>
} else { } else {
throw new Error('invalid guid type') throw new Error('invalid guid type')
} }

View File

@@ -27,9 +27,9 @@ export class AuthService {
) {} ) {}
init(): void { init(): void {
const loggedIn = this.storage.get(this.LOGGED_IN_KEY) const loggedIn = this.storage.get<boolean>(this.LOGGED_IN_KEY)
if (loggedIn) { if (loggedIn) {
this.setVerified() this.authState$.next(AuthState.VERIFIED)
} else { } else {
this.setUnverified() this.setUnverified()
} }