Files
start-os/core/src/registry/context.rs
Aiden McClelland 3ae24e63e2 perf: add O_DIRECT uploads and stabilize RPC continuation shutdown
Implements DirectIoFile for faster package uploads by bypassing page cache.
Refactors RpcContinuations to support graceful WebSocket shutdown via
broadcast signal, improving stability during daemon restart.
2026-03-16 13:40:13 -06:00

373 lines
12 KiB
Rust

use std::net::{Ipv4Addr, SocketAddr};
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use chrono::Utc;
use clap::Parser;
use cookie::{Cookie, Expiration, SameSite};
use http::HeaderMap;
use imbl::OrdMap;
use imbl_value::InternedString;
use patch_db::PatchDb;
use patch_db::json_ptr::ROOT;
use reqwest::{Client, Proxy};
use rpc_toolkit::yajrc::RpcError;
use rpc_toolkit::{CallRemote, Context, Empty, RpcRequest};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use tokio::sync::broadcast::Sender;
use tracing::instrument;
use ts_rs::TS;
use url::Url;
use crate::context::config::{CONFIG_PATH, ContextConfig};
use crate::context::{CliContext, RpcContext};
use crate::middleware::auth::DbContext;
use crate::middleware::auth::local::LocalAuthContext;
use crate::middleware::auth::signature::SignatureAuthContext;
use crate::prelude::*;
use crate::registry::RegistryDatabase;
use crate::registry::device_info::{DEVICE_INFO_HEADER, DeviceInfo};
use crate::registry::migrations::run_migrations;
use crate::registry::signer::SignerInfo;
use crate::rpc_continuations::RpcContinuations;
use crate::sign::AnyVerifyingKey;
use crate::util::io::{append_file, read_file_to_string};
const DEFAULT_REGISTRY_LISTEN: SocketAddr =
SocketAddr::new(std::net::IpAddr::V4(Ipv4Addr::LOCALHOST), 5959);
#[derive(Debug, Clone, Default, Deserialize, Serialize, Parser)]
#[serde(rename_all = "kebab-case")]
#[command(rename_all = "kebab-case")]
pub struct RegistryConfig {
#[arg(short = 'c', long = "config", help = "help.arg.config-file-path")]
pub config: Option<PathBuf>,
#[arg(
short = 'l',
long = "listen",
help = "help.arg.registry-listen-address"
)]
pub registry_listen: Option<SocketAddr>,
#[arg(short = 'H', long = "hostname", help = "help.arg.registry-hostname")]
pub registry_hostname: Vec<InternedString>,
#[arg(short = 'p', long = "tor-proxy", help = "help.arg.tor-proxy-url")]
pub tor_proxy: Option<Url>,
#[arg(short = 'd', long = "datadir", help = "help.arg.data-directory")]
pub datadir: Option<PathBuf>,
#[arg(
short = 'u',
long = "pg-connection-url",
help = "help.arg.postgres-connection-url"
)]
pub pg_connection_url: Option<String>,
}
impl ContextConfig for RegistryConfig {
fn next(&mut self) -> Option<PathBuf> {
self.config.take()
}
fn merge_with(&mut self, mut other: Self) {
self.registry_listen = self.registry_listen.take().or(other.registry_listen);
self.registry_hostname.append(&mut other.registry_hostname);
self.tor_proxy = self.tor_proxy.take().or(other.tor_proxy);
self.datadir = self.datadir.take().or(other.datadir);
}
}
impl RegistryConfig {
pub fn load(mut self) -> Result<Self, Error> {
let path = self.next();
self.load_path_rec(path)?;
self.load_path_rec(Some(CONFIG_PATH))?;
Ok(self)
}
}
pub struct RegistryContextSeed {
pub hostnames: Vec<InternedString>,
pub listen: SocketAddr,
pub db: TypedPatchDb<RegistryDatabase>,
pub datadir: PathBuf,
pub rpc_continuations: RpcContinuations,
pub client: Client,
pub shutdown: Sender<()>,
pub pool: Option<PgPool>,
}
#[derive(Clone)]
pub struct RegistryContext(Arc<RegistryContextSeed>);
impl RegistryContext {
#[instrument(skip_all)]
pub async fn init(config: &RegistryConfig) -> Result<Self, Error> {
let (shutdown, _) = tokio::sync::broadcast::channel(1);
let datadir = config
.datadir
.as_deref()
.unwrap_or_else(|| Path::new("/var/lib/startos"))
.to_owned();
if tokio::fs::metadata(&datadir).await.is_err() {
tokio::fs::create_dir_all(&datadir).await?;
}
let db_path = datadir.join("registry.db");
let db = TypedPatchDb::<RegistryDatabase>::load_unchecked(PatchDb::open(&db_path).await?);
if db.dump(&ROOT).await.value.is_null() {
db.put(&ROOT, &RegistryDatabase::init()).await?;
}
db.mutate(|db| run_migrations(db)).await.result?;
Self::init_auth_cookie().await?;
let tor_proxy_url = config
.tor_proxy
.clone()
.map(Ok)
.unwrap_or_else(|| "socks5h://localhost:9050".parse())?;
let pool: Option<PgPool> = match &config.pg_connection_url {
Some(url) => match PgPool::connect(url.as_str()).await {
Ok(pool) => Some(pool),
Err(_) => None,
},
None => None,
};
if config.registry_hostname.is_empty() {
return Err(Error::new(
eyre!("{}", t!("registry.context.missing-hostname")),
ErrorKind::NotFound,
));
}
Ok(Self(Arc::new(RegistryContextSeed {
hostnames: config.registry_hostname.clone(),
listen: config.registry_listen.unwrap_or(DEFAULT_REGISTRY_LISTEN),
db,
datadir,
rpc_continuations: RpcContinuations::new(None),
client: Client::builder()
.proxy(Proxy::custom(move |url| {
if url.host_str().map_or(false, |h| h.ends_with(".onion")) {
Some(tor_proxy_url.clone())
} else {
None
}
}))
.build()
.with_kind(crate::ErrorKind::ParseUrl)?,
shutdown,
pool,
})))
}
}
impl AsRef<RpcContinuations> for RegistryContext {
fn as_ref(&self) -> &RpcContinuations {
&self.rpc_continuations
}
}
impl Context for RegistryContext {}
impl Deref for RegistryContext {
type Target = RegistryContextSeed;
fn deref(&self) -> &Self::Target {
&*self.0
}
}
#[derive(Debug, Deserialize, Serialize, Parser)]
pub struct RegistryUrlParams {
#[arg(help = "help.arg.registry-url")]
pub registry: Url,
}
impl CallRemote<RegistryContext> for CliContext {
async fn call_remote(
&self,
mut method: &str,
_: OrdMap<&'static str, Value>,
params: Value,
_: Empty,
) -> Result<Value, RpcError> {
let cookie = read_file_to_string(RegistryContext::LOCAL_AUTH_COOKIE_PATH).await;
let url = if let Some(url) = self.registry_url.clone() {
url
} else if cookie.is_ok() || !self.registry_hostname.is_empty() {
let mut url: Url = format!(
"http://{}",
self.registry_listen.unwrap_or(DEFAULT_REGISTRY_LISTEN)
)
.parse()
.map_err(Error::from)?;
url.path_segments_mut()
.map_err(|_| eyre!("Url cannot be base"))
.with_kind(crate::ErrorKind::ParseUrl)?
.push("rpc")
.push("v0");
url
} else {
return Err(Error::new(
eyre!("{}", t!("registry.context.registry-required")),
ErrorKind::InvalidRequest,
)
.into());
};
if let Ok(local) = cookie {
let cookie_url = match url.host() {
Some(url::Host::Ipv4(ip)) if ip.is_loopback() => url.clone(),
Some(url::Host::Ipv6(ip)) if ip.is_loopback() => url.clone(),
_ => format!("http://{DEFAULT_REGISTRY_LISTEN}").parse()?,
};
self.cookie_store
.lock()
.unwrap()
.insert_raw(
&Cookie::build(("local", local))
.domain(cookie_url.host_str().unwrap_or("localhost"))
.expires(Expiration::Session)
.same_site(SameSite::Strict)
.build(),
&cookie_url,
)
.with_kind(crate::ErrorKind::Network)?;
}
method = method.strip_prefix("registry.").unwrap_or(method);
let sig_context = self
.registry_hostname
.get(0)
.cloned()
.or_else(|| url.host().as_ref().map(InternedString::from_display));
crate::middleware::auth::signature::call_remote(
self,
url,
HeaderMap::new(),
sig_context.as_deref(),
method,
params,
)
.await
}
}
impl CallRemote<RegistryContext, RegistryUrlParams> for RpcContext {
async fn call_remote(
&self,
mut method: &str,
metadata: OrdMap<&'static str, Value>,
params: Value,
RegistryUrlParams { mut registry }: RegistryUrlParams,
) -> Result<Value, RpcError> {
let mut headers = HeaderMap::new();
let mut device_info = None;
if metadata
.get("get_device_info")
.and_then(|m| m.as_bool())
.unwrap_or(false)
{
let di = DeviceInfo::load(self).await?;
headers.insert(DEVICE_INFO_HEADER, di.to_header_value());
device_info = Some(di);
}
registry
.path_segments_mut()
.map_err(|_| Error::new(eyre!("cannot extend URL path"), ErrorKind::ParseUrl))?
.push("rpc")
.push("v0");
method = method.strip_prefix("registry.").unwrap_or(method);
let sig_context = registry.host_str().map(InternedString::from);
let mut res = crate::middleware::auth::signature::call_remote(
self,
registry,
headers,
sig_context.as_deref(),
method,
params.clone(),
)
.await?;
if let Some(device_info) = device_info {
device_info.filter_for_hardware(method, params, &mut res)?;
}
Ok(res)
}
}
#[derive(Deserialize)]
pub struct RegistryAuthMetadata {
#[serde(default)]
admin: bool,
}
#[derive(Serialize, Deserialize, TS)]
pub struct AdminLogRecord {
pub timestamp: String,
pub name: String,
#[ts(type = "{ id: string | number | null; method: string; params: any }")]
pub request: RpcRequest,
pub key: AnyVerifyingKey,
}
impl DbContext for RegistryContext {
type Database = RegistryDatabase;
fn db(&self) -> &TypedPatchDb<Self::Database> {
&self.db
}
}
impl LocalAuthContext for RegistryContext {
const LOCAL_AUTH_COOKIE_PATH: &str = "/run/startos/registry.authcookie";
const LOCAL_AUTH_COOKIE_OWNERSHIP: &str = "root:root";
}
impl SignatureAuthContext for RegistryContext {
type AdditionalMetadata = RegistryAuthMetadata;
type CheckPubkeyRes = Option<(AnyVerifyingKey, SignerInfo)>;
async fn sig_context(
&self,
) -> impl IntoIterator<Item = Result<impl AsRef<str> + Send, Error>> + Send {
self.hostnames.iter().map(Ok)
}
fn check_pubkey(
db: &Model<Self::Database>,
pubkey: Option<&AnyVerifyingKey>,
metadata: Self::AdditionalMetadata,
) -> Result<Self::CheckPubkeyRes, Error> {
if let Some(pubkey) = pubkey {
let (guid, admin) = db.as_index().as_signers().get_signer_info(pubkey)?;
if !metadata.admin || db.as_admins().de()?.contains(&guid) {
return Ok(Some((pubkey.clone(), admin)));
}
}
Err(Error::new(
eyre!("{}", t!("registry.context.unauthorized")),
ErrorKind::Authorization,
))
}
async fn post_auth_hook(
&self,
check_pubkey_res: Self::CheckPubkeyRes,
request: &RpcRequest,
) -> Result<(), Error> {
use tokio::io::AsyncWriteExt;
if let Some((pubkey, admin)) = check_pubkey_res {
let mut log = append_file(self.datadir.join("admin.log")).await?;
log.write_all(
(serde_json::to_string(&AdminLogRecord {
timestamp: Utc::now().to_rfc3339(),
name: admin.name,
request: request.clone(),
key: pubkey,
})
.with_kind(ErrorKind::Serialization)?
+ "\n")
.as_bytes(),
)
.await?;
}
Ok(())
}
}