remove analyticsd, clean up script

This commit is contained in:
Aiden McClelland
2024-06-24 16:15:32 -06:00
parent b0c0cd7fda
commit 00f7fa507b
48 changed files with 11 additions and 1211 deletions

View File

@@ -1,121 +0,0 @@
use std::net::{Ipv4Addr, SocketAddr};
use std::ops::Deref;
use std::path::PathBuf;
use std::sync::Arc;
use clap::Parser;
use reqwest::{Client, Proxy};
use rpc_toolkit::yajrc::RpcError;
use rpc_toolkit::{call_remote_http, CallRemote, Context, Empty};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use tokio::sync::broadcast::Sender;
use tracing::instrument;
use url::Url;
use crate::context::config::{ContextConfig, CONFIG_PATH};
use crate::context::RpcContext;
use crate::prelude::*;
use crate::rpc_continuations::RpcContinuations;
#[derive(Debug, Clone, Default, Deserialize, Serialize, Parser)]
#[serde(rename_all = "kebab-case")]
#[command(rename_all = "kebab-case")]
pub struct AnalyticsConfig {
#[arg(short = 'c', long = "config")]
pub config: Option<PathBuf>,
#[arg(short = 'l', long = "listen")]
pub listen: Option<SocketAddr>,
#[arg(short = 'p', long = "proxy")]
pub tor_proxy: Option<Url>,
#[arg(short = 'd', long = "dbconnect")]
pub dbconnect: Option<Url>,
}
impl ContextConfig for AnalyticsConfig {
fn next(&mut self) -> Option<PathBuf> {
self.config.take()
}
fn merge_with(&mut self, other: Self) {
self.listen = self.listen.take().or(other.listen);
self.tor_proxy = self.tor_proxy.take().or(other.tor_proxy);
self.dbconnect = self.dbconnect.take().or(other.dbconnect);
}
}
impl AnalyticsConfig {
pub fn load(mut self) -> Result<Self, Error> {
let path = self.next();
self.load_path_rec(path)?;
self.load_path_rec(Some(CONFIG_PATH))?;
Ok(self)
}
}
pub struct AnalyticsContextSeed {
pub listen: SocketAddr,
pub db: PgPool,
pub rpc_continuations: RpcContinuations,
pub client: Client,
pub shutdown: Sender<()>,
}
#[derive(Clone)]
pub struct AnalyticsContext(Arc<AnalyticsContextSeed>);
impl AnalyticsContext {
#[instrument(skip_all)]
pub async fn init(config: &AnalyticsConfig) -> Result<Self, Error> {
let (shutdown, _) = tokio::sync::broadcast::channel(1);
let dbconnect = config
.dbconnect
.clone()
.unwrap_or_else(|| "postgres://localhost/analytics".parse().unwrap())
.to_owned();
let db = PgPool::connect(dbconnect.as_str()).await?;
let tor_proxy_url = config
.tor_proxy
.clone()
.map(Ok)
.unwrap_or_else(|| "socks5h://localhost:9050".parse())?;
Ok(Self(Arc::new(AnalyticsContextSeed {
listen: config
.listen
.unwrap_or(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 5959)),
db,
rpc_continuations: RpcContinuations::new(),
client: Client::builder()
.proxy(Proxy::custom(move |url| {
if url.host_str().map_or(false, |h| h.ends_with(".onion")) {
Some(tor_proxy_url.clone())
} else {
None
}
}))
.build()
.with_kind(crate::ErrorKind::ParseUrl)?,
shutdown,
})))
}
}
impl AsRef<RpcContinuations> for AnalyticsContext {
fn as_ref(&self) -> &RpcContinuations {
&self.rpc_continuations
}
}
impl Context for AnalyticsContext {}
impl Deref for AnalyticsContext {
type Target = AnalyticsContextSeed;
fn deref(&self) -> &Self::Target {
&*self.0
}
}
impl CallRemote<AnalyticsContext> for RpcContext {
async fn call_remote(&self, method: &str, params: Value, _: Empty) -> Result<Value, RpcError> {
if let Some(analytics_url) = self.analytics_url.clone() {
call_remote_http(&self.client, analytics_url, method, params).await
} else {
Ok(Value::Null)
}
}
}

View File

@@ -1,78 +0,0 @@
use std::net::SocketAddr;
use axum::Router;
use futures::future::ready;
use rpc_toolkit::{Context, ParentHandler, Server};
use crate::analytics::context::AnalyticsContext;
use crate::middleware::cors::Cors;
use crate::net::static_server::{bad_request, not_found, server_error};
use crate::net::web_server::WebServer;
use crate::rpc_continuations::Guid;
pub mod context;
pub fn analytics_api<C: Context>() -> ParentHandler<C> {
ParentHandler::new()
}
pub fn analytics_server_router(ctx: AnalyticsContext) -> Router {
use axum::extract as x;
use axum::routing::{any, get, post};
Router::new()
.route("/rpc/*path", {
let ctx = ctx.clone();
post(
Server::new(move || ready(Ok(ctx.clone())), analytics_api())
.middleware(Cors::new())
)
})
.route(
"/ws/rpc/*path",
get({
let ctx = ctx.clone();
move |x::Path(path): x::Path<String>,
ws: axum::extract::ws::WebSocketUpgrade| async move {
match Guid::from(&path) {
None => {
tracing::debug!("No Guid Path");
bad_request()
}
Some(guid) => match ctx.rpc_continuations.get_ws_handler(&guid).await {
Some(cont) => ws.on_upgrade(cont),
_ => not_found(),
},
}
}
}),
)
.route(
"/rest/rpc/*path",
any({
let ctx = ctx.clone();
move |request: x::Request| async move {
let path = request
.uri()
.path()
.strip_prefix("/rest/rpc/")
.unwrap_or_default();
match Guid::from(&path) {
None => {
tracing::debug!("No Guid Path");
bad_request()
}
Some(guid) => match ctx.rpc_continuations.get_rest_handler(&guid).await {
None => not_found(),
Some(cont) => cont(request).await.unwrap_or_else(server_error),
},
}
}
}),
)
}
impl WebServer {
pub fn analytics(bind: SocketAddr, ctx: AnalyticsContext) -> Self {
Self::new(bind, analytics_server_router(ctx))
}
}

View File

@@ -1,86 +0,0 @@
use std::ffi::OsString;
use clap::Parser;
use futures::FutureExt;
use tokio::signal::unix::signal;
use tracing::instrument;
use crate::analytics::context::{AnalyticsConfig, AnalyticsContext};
use crate::net::web_server::WebServer;
use crate::prelude::*;
use crate::util::logger::EmbassyLogger;
#[instrument(skip_all)]
async fn inner_main(config: &AnalyticsConfig) -> Result<(), Error> {
let server = async {
let ctx = AnalyticsContext::init(config).await?;
let server = WebServer::analytics(ctx.listen, ctx.clone());
let mut shutdown_recv = ctx.shutdown.subscribe();
let sig_handler_ctx = ctx;
let sig_handler = tokio::spawn(async move {
use tokio::signal::unix::SignalKind;
futures::future::select_all(
[
SignalKind::interrupt(),
SignalKind::quit(),
SignalKind::terminate(),
]
.iter()
.map(|s| {
async move {
signal(*s)
.unwrap_or_else(|_| panic!("register {:?} handler", s))
.recv()
.await
}
.boxed()
}),
)
.await;
sig_handler_ctx
.shutdown
.send(())
.map_err(|_| ())
.expect("send shutdown signal");
});
shutdown_recv
.recv()
.await
.with_kind(crate::ErrorKind::Unknown)?;
sig_handler.abort();
Ok::<_, Error>(server)
}
.await?;
server.shutdown().await;
Ok(())
}
pub fn main(args: impl IntoIterator<Item = OsString>) {
EmbassyLogger::init();
let config = AnalyticsConfig::parse_from(args).load().unwrap();
let res = {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("failed to initialize runtime");
rt.block_on(inner_main(&config))
};
match res {
Ok(()) => (),
Err(e) => {
eprintln!("{}", e.source);
tracing::debug!("{:?}", e.source);
drop(e.source);
std::process::exit(e.kind as i32)
}
}
}

View File

@@ -2,8 +2,6 @@ use std::collections::VecDeque;
use std::ffi::OsString;
use std::path::Path;
#[cfg(feature = "analytics")]
pub mod analytics;
#[cfg(feature = "container-runtime")]
pub mod container_cli;
pub mod deprecated;
@@ -26,8 +24,6 @@ fn select_executable(name: &str) -> Option<fn(VecDeque<OsString>)> {
"startd" => Some(startd::main),
#[cfg(feature = "registry")]
"registry" => Some(registry::main),
#[cfg(feature = "analyticsd")]
"analyticsd" => Some(analyticsd::main),
"embassy-cli" => Some(|_| deprecated::renamed("embassy-cli", "start-cli")),
"embassy-sdk" => Some(|_| deprecated::renamed("embassy-sdk", "start-sdk")),
"embassyd" => Some(|_| deprecated::renamed("embassyd", "startd")),

View File

@@ -113,8 +113,6 @@ pub struct ServerConfig {
pub datadir: Option<PathBuf>,
#[arg(long = "disable-encryption")]
pub disable_encryption: Option<bool>,
#[arg(short = 'a', long = "analytics-url")]
pub analytics_url: Option<Url>,
}
impl ContextConfig for ServerConfig {
fn next(&mut self) -> Option<PathBuf> {
@@ -133,7 +131,6 @@ impl ContextConfig for ServerConfig {
.or(other.revision_cache_size);
self.datadir = self.datadir.take().or(other.datadir);
self.disable_encryption = self.disable_encryption.take().or(other.disable_encryption);
self.analytics_url = self.analytics_url.take().or(other.analytics_url);
}
}

View File

@@ -58,7 +58,6 @@ pub struct RpcContextSeed {
pub start_time: Instant,
#[cfg(feature = "dev")]
pub dev: Dev,
pub analytics_url: Option<Url>,
}
pub struct Dev {
@@ -191,7 +190,6 @@ impl RpcContext {
dev: Dev {
lxc: Mutex::new(BTreeMap::new()),
},
analytics_url: config.analytics_url.clone(),
});
let res = Self(seed.clone());

View File

@@ -24,7 +24,6 @@ lazy_static::lazy_static! {
pub mod account;
pub mod action;
pub mod analytics;
pub mod auth;
pub mod backup;
pub mod bins;

View File

@@ -281,10 +281,6 @@ pub async fn execute<C: Context>(
IoFormat::Yaml.to_vec(&ServerConfig {
os_partitions: Some(part_info.clone()),
ethernet_interface: Some(eth_iface),
#[cfg(feature = "dev")]
analytics_url: None,
#[cfg(not(feature = "dev"))]
analytics_url: Some("https://analytics.start9.com".parse()?), // TODO: FullMetal
..Default::default()
})?,
)

View File

@@ -1,6 +1,6 @@
#!/bin/bash
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )"
cd "$(dirname "${BASH_SOURCE[0]}")"
TMP_DIR=$(mktemp -d)
mkdir $TMP_DIR/pgdata
docker run -d --rm --name=tmp_postgres -e POSTGRES_PASSWORD=password -v $TMP_DIR/pgdata:/var/lib/postgresql/data postgres
@@ -8,17 +8,15 @@ docker run -d --rm --name=tmp_postgres -e POSTGRES_PASSWORD=password -v $TMP_DIR
(
set -e
ctr=0
until docker exec tmp_postgres psql -U postgres || [ $ctr -ge 5 ]; do
until docker exec tmp_postgres psql -U postgres 2> /dev/null || [ $ctr -ge 5 ]; do
ctr=$[ctr + 1]
sleep 5;
done
PG_IP=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' tmp_postgres)
SCHEMA_DUMP="registry_schema.sql"
DATABASE_URL=postgres://postgres:password@$PG_IP/postgres
psql $DATABASE_URL -f "$SCRIPT_DIR/$SCHEMA_DUMP"
cat "./registry_schema.sql" | docker exec -i tmp_postgres psql -U postgres -d postgres -f-
cd ../../..
DATABASE_URL=postgres://postgres:password@$PG_IP/postgres PLATFORM=$(uname -m) cargo sqlx prepare -- --lib --profile=test --workspace
echo "Subscript Complete"
)