Merge branch 'next/major' of github.com:Start9Labs/start-os into rebase/feat/domains

This commit is contained in:
Matt Hill
2023-11-13 16:14:31 -07:00
286 changed files with 3765 additions and 1718 deletions

20
core/helpers/Cargo.toml Normal file
View File

@@ -0,0 +1,20 @@
[package]
name = "helpers"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# async fns in traits (used by the OsApi trait in src/os_api.rs)
async-trait = "0.1.64"
# eyre!/Report error handling used throughout src/lib.rs and src/os_api.rs
color-eyre = "0.6.2"
# BoxFuture, FutureExt, StreamExt combinators
futures = "0.3.28"
# connection pool backing UnixRpcClient (src/rpc_client.rs)
lazy_async_pool = "0.3.3"
# shared project types (PackageId, InterfaceId, Error, ...)
models = { path = "../models" }
# structural pinning for ByteReplacementReader / NonDetachingJoinHandle
pin-project = "1.1.3"
serde = { version = "1.0", features = ["derive", "rc"] }
serde_json = "1.0"
tokio = { version = "1", features = ["full"] }
# WatchStream for rsync progress reporting
tokio-stream = { version = "0.1.14", features = ["io-util", "sync"] }
tracing = "0.1.39"
# JSON-RPC types used by RpcClient; pinned to a git branch
yajrc = { version = "*", git = "https://github.com/dr-bonez/yajrc.git", branch = "develop" }

View File

@@ -0,0 +1,31 @@
use std::task::Poll;
use tokio::io::{AsyncRead, ReadBuf};
/// `AsyncRead` adapter that rewrites every occurrence of one byte to another
/// as data streams through. Used in this crate to convert rsync's `\r`
/// progress separators into `\n` so output can be consumed line-by-line.
#[pin_project::pin_project]
pub struct ByteReplacementReader<R> {
    // Byte to search for.
    pub replace: u8,
    // Byte written in its place.
    pub with: u8,
    #[pin]
    pub inner: R,
}
impl<R: AsyncRead> AsyncRead for ByteReplacementReader<R> {
    /// Delegates to the inner reader, then substitutes `replace` with `with`
    /// in the bytes produced by this call.
    fn poll_read(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        let this = self.project();
        // Remember how much of the buffer was filled before this read so we
        // only post-process the newly read bytes. The original scanned the
        // whole filled region, which would re-touch data from earlier reads
        // if the caller hands in a partially filled buffer.
        let already_filled = buf.filled().len();
        match this.inner.poll_read(cx, buf) {
            Poll::Ready(Ok(())) => {
                for byte in &mut buf.filled_mut()[already_filled..] {
                    if *byte == *this.replace {
                        *byte = *this.with;
                    }
                }
                Poll::Ready(Ok(()))
            }
            // Pending or error: pass straight through.
            other => other,
        }
    }
}

264
core/helpers/src/lib.rs Normal file
View File

@@ -0,0 +1,264 @@
use std::future::Future;
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};
use std::time::Duration;
use color_eyre::eyre::{eyre, Context, Error};
use futures::future::BoxFuture;
use futures::FutureExt;
use tokio::fs::File;
use tokio::sync::oneshot;
use tokio::task::{JoinError, JoinHandle, LocalSet};
mod byte_replacement_reader;
mod os_api;
mod rpc_client;
mod rsync;
mod script_dir;
pub use byte_replacement_reader::*;
pub use os_api::*;
pub use rpc_client::{RpcClient, UnixRpcClient};
pub use rsync::*;
pub use script_dir::*;
/// Always returns `true`. Exists so `#[serde(default = "const_true")]` can
/// default boolean fields to true (serde defaults must be paths to functions).
pub fn const_true() -> bool {
    true
}
/// Derives the temp-file sibling used while atomically writing `path`: the
/// file name gains a leading `.` and a `.tmp` suffix, in the same directory.
/// Errors when `path` has no parent or no UTF-8 file name.
pub fn to_tmp_path(path: impl AsRef<Path>) -> Result<PathBuf, Error> {
    let path = path.as_ref();
    let parent = path.parent();
    let name = path.file_name().and_then(|f| f.to_str());
    match (parent, name) {
        (Some(dir), Some(name)) => Ok(dir.join(format!(".{}.tmp", name))),
        _ => Err(eyre!("invalid path: {}", path.display())),
    }
}
/// Resolves `path` to a canonical absolute path, tolerating a final component
/// that does not exist yet: in that case the parent is canonicalized (and
/// created, recursively, when `create_parent` is set) and the file name is
/// re-appended unresolved.
pub async fn canonicalize(
    path: impl AsRef<Path> + Send + Sync,
    create_parent: bool,
) -> Result<PathBuf, Error> {
    // Canonicalize (creating missing ancestors), then create the directory
    // itself. Boxed future because async recursion needs indirection.
    fn create_canonical_folder<'a>(
        path: impl AsRef<Path> + Send + Sync + 'a,
    ) -> BoxFuture<'a, Result<PathBuf, Error>> {
        async move {
            let path = canonicalize(path, true).await?;
            tokio::fs::create_dir(&path)
                .await
                .with_context(|| path.display().to_string())?;
            Ok(path)
        }
        .boxed()
    }
    let path = path.as_ref();
    // metadata() failing is used as "does not exist" (it can also mean
    // permission errors; in that case the fallthrough canonicalize below
    // would fail anyway).
    if tokio::fs::metadata(path).await.is_err() {
        if let (Some(parent), Some(file_name)) = (path.parent(), path.file_name()) {
            if create_parent && tokio::fs::metadata(parent).await.is_err() {
                return Ok(create_canonical_folder(parent).await?.join(file_name));
            } else {
                return Ok(tokio::fs::canonicalize(parent)
                    .await
                    .with_context(|| parent.display().to_string())?
                    .join(file_name));
            }
        }
    }
    // Path exists (or has no parent/file_name): canonicalize directly.
    tokio::fs::canonicalize(&path)
        .await
        .with_context(|| path.display().to_string())
}
/// `JoinHandle` wrapper that aborts the task when the handle is dropped,
/// instead of letting the task keep running detached.
#[pin_project::pin_project(PinnedDrop)]
pub struct NonDetachingJoinHandle<T>(#[pin] JoinHandle<T>);
impl<T> NonDetachingJoinHandle<T> {
    /// Requests abort, then awaits the task's final join result.
    pub async fn wait_for_abort(self) -> Result<T, JoinError> {
        self.abort();
        self.await
    }
}
impl<T> From<JoinHandle<T>> for NonDetachingJoinHandle<T> {
    fn from(t: JoinHandle<T>) -> Self {
        NonDetachingJoinHandle(t)
    }
}
// Deref to the inner JoinHandle so its API (abort, is_finished, ...) is
// usable directly on the wrapper.
impl<T> Deref for NonDetachingJoinHandle<T> {
    type Target = JoinHandle<T>;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
impl<T> DerefMut for NonDetachingJoinHandle<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
#[pin_project::pinned_drop]
impl<T> PinnedDrop for NonDetachingJoinHandle<T> {
    // The "non-detaching" part: dropping the handle aborts the task.
    fn drop(self: std::pin::Pin<&mut Self>) {
        let this = self.project();
        this.0.into_ref().get_ref().abort()
    }
}
// Awaiting the wrapper forwards to the inner JoinHandle.
impl<T> Future for NonDetachingJoinHandle<T> {
    type Output = Result<T, JoinError>;
    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        let this = self.project();
        this.0.poll(cx)
    }
}
/// File written to a temp path and only renamed into place on `save`;
/// dropping without saving discards the temp file.
pub struct AtomicFile {
    // Temporary path the data is written to first.
    tmp_path: PathBuf,
    // Final destination, only touched by the rename in `save`.
    path: PathBuf,
    // `None` once consumed by `save`/`rollback`/`Drop`.
    file: Option<File>,
}
impl AtomicFile {
    /// Opens an atomic writer for `path`. Writes go to `tmp_path` (or a
    /// derived `.<name>.tmp` sibling when not provided) until `save`.
    pub async fn new(
        path: impl AsRef<Path> + Send + Sync,
        tmp_path: Option<impl AsRef<Path> + Send + Sync>,
    ) -> Result<Self, Error> {
        let path = canonicalize(&path, true).await?;
        let tmp_path = if let Some(tmp_path) = tmp_path {
            canonicalize(&tmp_path, true).await?
        } else {
            to_tmp_path(&path)?
        };
        let file = File::create(&tmp_path)
            .await
            .with_context(|| tmp_path.display().to_string())?;
        Ok(Self {
            tmp_path,
            path,
            file: Some(file),
        })
    }
    /// Abandons the write: closes and deletes the temp file. The destination
    /// is never touched.
    pub async fn rollback(mut self) -> Result<(), Error> {
        drop(self.file.take());
        tokio::fs::remove_file(&self.tmp_path)
            .await
            .with_context(|| format!("rm {}", self.tmp_path.display()))?;
        Ok(())
    }
    /// Commits the write: flushes and fsyncs the temp file, closes it, then
    /// renames it over the destination. NOTE(review): rename is only atomic
    /// when both paths are on the same filesystem — confirm for callers.
    pub async fn save(mut self) -> Result<(), Error> {
        use tokio::io::AsyncWriteExt;
        if let Some(file) = self.file.as_mut() {
            file.flush().await?;
            file.shutdown().await?;
            file.sync_all().await?;
        }
        // Close the handle before renaming; also prevents Drop cleanup.
        drop(self.file.take());
        tokio::fs::rename(&self.tmp_path, &self.path)
            .await
            .with_context(|| {
                format!("mv {} -> {}", self.tmp_path.display(), self.path.display())
            })?;
        Ok(())
    }
}
// Deref to the underlying File so callers can write through the wrapper.
// The unwrap is safe while the value is alive: `file` only becomes `None`
// inside the consuming methods (`save`/`rollback`) and `Drop`.
impl std::ops::Deref for AtomicFile {
    type Target = File;
    fn deref(&self) -> &Self::Target {
        self.file.as_ref().unwrap()
    }
}
impl std::ops::DerefMut for AtomicFile {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.file.as_mut().unwrap()
    }
}
impl Drop for AtomicFile {
fn drop(&mut self) {
if let Some(file) = self.file.take() {
drop(file);
let path = std::mem::take(&mut self.tmp_path);
tokio::spawn(async move { tokio::fs::remove_file(path).await.unwrap() });
}
}
}
/// Holds a value for a bounded time: a background task drops (or destructs)
/// the resource when the timer fires unless `get` claims it first.
pub struct TimedResource<T: 'static + Send> {
    // Task that races the timer against the claim signal.
    handle: NonDetachingJoinHandle<Option<T>>,
    // Fired by `get` to claim the resource before the deadline.
    ready: oneshot::Sender<()>,
}
impl<T: 'static + Send> TimedResource<T> {
    /// Holds `resource` for at most `timer`; if `get` is not called in time
    /// the resource is simply dropped.
    pub fn new(resource: T, timer: Duration) -> Self {
        // Deduplicated: this is `new_with_destructor` with a plain drop as
        // the destructor (the original repeated the whole select! task).
        Self::new_with_destructor(resource, timer, |resource| async move {
            drop(resource);
        })
    }
    /// Like `new`, but runs `destructor(resource)` when the timer expires.
    pub fn new_with_destructor<
        Fn: FnOnce(T) -> Fut + Send + 'static,
        Fut: Future<Output = ()> + Send,
    >(
        resource: T,
        timer: Duration,
        destructor: Fn,
    ) -> Self {
        let (send, recv) = oneshot::channel();
        let handle = tokio::spawn(async move {
            tokio::select! {
                // Timer won: dispose of the resource; `get` yields None.
                _ = tokio::time::sleep(timer) => {
                    destructor(resource).await;
                    None
                },
                // `get` signalled in time: hand the resource back.
                _ = recv => Some(resource),
            }
        });
        Self {
            handle: handle.into(),
            ready: send,
        }
    }
    /// Claims the resource; returns `None` if the timer already fired.
    pub async fn get(self) -> Option<T> {
        let _ = self.ready.send(());
        self.handle.await.unwrap()
    }
    /// True once the background task has finished its select (observed via
    /// the oneshot receiver being dropped) — i.e. the resource was either
    /// destroyed by the timer or already claimed.
    pub fn is_timed_out(&self) -> bool {
        self.ready.is_closed()
    }
}
/// Runs a possibly-`!Send` future by spawning a dedicated OS thread with its
/// own current-thread tokio runtime and `LocalSet`, returning a handle to the
/// spawned task. Note only the factory closure must be `Send`, not the
/// future it produces.
pub async fn spawn_local<
    T: 'static + Send,
    F: FnOnce() -> Fut + Send + 'static,
    Fut: Future<Output = T> + 'static,
>(
    fut: F,
) -> NonDetachingJoinHandle<T> {
    let (send, recv) = tokio::sync::oneshot::channel();
    std::thread::spawn(move || {
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap()
            .block_on(async move {
                let set = LocalSet::new();
                // Hand the task handle back to the caller; unreachable!()
                // asserts the receiver is still alive (the caller awaits
                // `recv` immediately below).
                send.send(set.spawn_local(fut()).into())
                    .unwrap_or_else(|_| unreachable!());
                // Drive the LocalSet until the spawned task completes.
                set.await
            })
    });
    recv.await.unwrap()
}

View File

@@ -0,0 +1,82 @@
use std::sync::Arc;
use color_eyre::Report;
use models::InterfaceId;
use models::PackageId;
use serde_json::Value;
use tokio::sync::mpsc;
/// Error: the runtime side of the callback channel is gone.
pub struct RuntimeDropped;
/// Handle for invoking a named callback over an unbounded mpsc channel;
/// each invocation sends (callback id, args).
pub struct Callback {
    // Identifier shared (via Arc) with every invocation sent.
    id: Arc<String>,
    sender: mpsc::UnboundedSender<(Arc<String>, Vec<Value>)>,
}
impl Callback {
    /// Creates a callback handle with the given id over `sender`.
    pub fn new(id: String, sender: mpsc::UnboundedSender<(Arc<String>, Vec<Value>)>) -> Self {
        Self {
            id: Arc::new(id),
            sender,
        }
    }
    /// Whether the receiving side is still alive.
    ///
    /// BUG FIX: the original returned `self.sender.is_closed()` — `true`
    /// exactly when nobody is listening, the opposite of the method's name.
    pub fn is_listening(&self) -> bool {
        !self.sender.is_closed()
    }
    /// Sends `args` to the callback; errors if the runtime dropped the
    /// receiving end.
    pub fn call(&self, args: Vec<Value>) -> Result<(), RuntimeDropped> {
        self.sender
            .send((self.id.clone(), args))
            .map_err(|_| RuntimeDropped)
    }
}
/// Parameters for requesting an onion binding (see `OsApi::bind_onion`).
#[derive(serde::Deserialize, serde::Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct AddressSchemaOnion {
    pub id: InterfaceId,
    pub external_port: u16,
}
/// Parameters for requesting a local binding (see `OsApi::bind_local`).
#[derive(serde::Deserialize, serde::Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct AddressSchemaLocal {
    pub id: InterfaceId,
    pub external_port: u16,
}
/// Newtype over the string form of a bound address.
#[derive(serde::Deserialize, serde::Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Address(pub String);
/// Placeholder type — carries no data yet.
#[derive(serde::Deserialize, serde::Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Domain;
/// Placeholder type — carries no data yet.
#[derive(serde::Deserialize, serde::Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Name;
/// Host-OS API surface exposed to service runtimes. Implementations live
/// elsewhere; comments below describe only what the signatures establish.
#[async_trait::async_trait]
#[allow(unused_variables)]
pub trait OsApi: Send + Sync + 'static {
    /// Fetches config values at `path` for package `id`. `callback`, when
    /// given, is presumably invoked on later changes — TODO confirm against
    /// implementors.
    async fn get_service_config(
        &self,
        id: PackageId,
        path: &str,
        callback: Option<Callback>,
    ) -> Result<Vec<Value>, Report>;
    /// Binds `internal_port` per the local address schema.
    async fn bind_local(
        &self,
        internal_port: u16,
        address_schema: AddressSchemaLocal,
    ) -> Result<Address, Report>;
    /// Binds `internal_port` per the onion address schema.
    async fn bind_onion(
        &self,
        internal_port: u16,
        address_schema: AddressSchemaOnion,
    ) -> Result<Address, Report>;
    /// Releases a local binding previously created for `id`/`external`.
    async fn unbind_local(&self, id: InterfaceId, external: u16) -> Result<(), Report>;
    /// Releases an onion binding previously created for `id`/`external`.
    async fn unbind_onion(&self, id: InterfaceId, external: u16) -> Result<(), Report>;
    /// Marks the service as started (synchronous, unlike the other methods).
    fn set_started(&self) -> Result<(), Report>;
    async fn restart(&self) -> Result<(), Report>;
    async fn start(&self) -> Result<(), Report>;
    async fn stop(&self) -> Result<(), Report>;
}

View File

@@ -0,0 +1,192 @@
use std::collections::BTreeMap;
use std::path::PathBuf;
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, Weak};
use futures::future::BoxFuture;
use futures::{FutureExt, TryFutureExt};
use lazy_async_pool::Pool;
use models::{Error, ErrorKind, ResultExt};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
use tokio::net::UnixStream;
use tokio::runtime::Handle;
use tokio::sync::{oneshot, Mutex};
use yajrc::{Id, RpcError, RpcMethod, RpcRequest, RpcResponse};
use crate::NonDetachingJoinHandle;
// Boxed writer for the request side of the connection.
type DynWrite = Box<dyn AsyncWrite + Unpin + Send + Sync + 'static>;
// In-flight requests: JSON-RPC id -> oneshot delivering the response.
type ResponseMap = BTreeMap<Id, oneshot::Sender<Result<Value, RpcError>>>;
// Retry budget for UnixRpcClient::request.
const MAX_TRIES: u64 = 3;
/// A single JSON-RPC connection over any reader/writer pair, with a
/// background task routing responses by id to pending `request` calls.
pub struct RpcClient {
    // Request-id counter; shared (Arc) so ids stay unique across the
    // pooled clients created by UnixRpcClient.
    id: Arc<AtomicUsize>,
    // Reader task; aborted when the client is dropped (NonDetaching).
    _handler: NonDetachingJoinHandle<()>,
    writer: DynWrite,
    // Weak: once the client is dropped, the reader task's map is gone.
    responses: Weak<Mutex<ResponseMap>>,
}
impl RpcClient {
pub fn new<
W: AsyncWrite + Unpin + Send + Sync + 'static,
R: AsyncRead + Unpin + Send + Sync + 'static,
>(
writer: W,
reader: R,
id: Arc<AtomicUsize>,
) -> Self {
let writer: DynWrite = Box::new(writer);
let responses = Arc::new(Mutex::new(ResponseMap::new()));
let weak_responses = Arc::downgrade(&responses);
RpcClient {
id,
_handler: tokio::spawn(async move {
let mut lines = BufReader::new(reader).lines();
while let Some(line) = lines.next_line().await.transpose() {
match line.map_err(Error::from).and_then(|l| {
serde_json::from_str::<RpcResponse>(&l)
.with_kind(ErrorKind::Deserialization)
}) {
Ok(l) => {
if let Some(id) = l.id {
if let Some(res) = responses.lock().await.remove(&id) {
if let Err(e) = res.send(l.result) {
tracing::warn!(
"RpcClient Response for Unknown ID: {:?}",
e
);
}
} else {
tracing::warn!(
"RpcClient Response for Unknown ID: {:?}",
l.result
);
}
} else {
tracing::info!("RpcClient Notification: {:?}", l);
}
}
Err(e) => {
tracing::error!("RpcClient Error: {}", e);
tracing::debug!("{:?}", e);
}
}
}
})
.into(),
writer,
responses: weak_responses,
}
}
pub async fn request<T: RpcMethod>(
&mut self,
method: T,
params: T::Params,
) -> Result<T::Response, RpcError>
where
T: Serialize,
T::Params: Serialize,
T::Response: for<'de> Deserialize<'de>,
{
let id = Id::Number(
self.id
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
.into(),
);
let request = RpcRequest {
id: Some(id.clone()),
method,
params,
};
if let Some(w) = self.responses.upgrade() {
let (send, recv) = oneshot::channel();
w.lock().await.insert(id.clone(), send);
self.writer
.write_all((serde_json::to_string(&request)? + "\n").as_bytes())
.await
.map_err(|e| {
let mut err = yajrc::INTERNAL_ERROR.clone();
err.data = Some(json!(e.to_string()));
err
})?;
match recv.await {
Ok(val) => {
return Ok(serde_json::from_value(val?)?);
}
Err(_err) => {
tokio::task::yield_now().await;
}
}
}
tracing::debug!(
"Client has finished {:?}",
futures::poll!(&mut self._handler)
);
let mut err = yajrc::INTERNAL_ERROR.clone();
err.data = Some(json!("RpcClient thread has terminated"));
Err(err)
}
}
/// JSON-RPC client over a unix socket, backed by a lazily-growing pool of
/// `RpcClient` connections so requests can retry on a fresh connection.
pub struct UnixRpcClient {
    pool: Pool<
        RpcClient,
        // Factory that dials the socket and builds a new RpcClient.
        Box<dyn Fn() -> BoxFuture<'static, Result<RpcClient, std::io::Error>> + Send + Sync>,
        BoxFuture<'static, Result<RpcClient, std::io::Error>>,
        std::io::Error,
    >,
}
impl UnixRpcClient {
    /// Creates a client pool for the unix socket at `path`. Must be called
    /// from within a tokio runtime (`Handle::current`); connections are
    /// dialed on demand.
    pub fn new(path: PathBuf) -> Self {
        let rt = Handle::current();
        // One shared id counter so request ids are unique across all pooled
        // connections.
        let id = Arc::new(AtomicUsize::new(0));
        Self {
            pool: Pool::new(
                0,
                Box::new(move || {
                    let path = path.clone();
                    let id = id.clone();
                    // Dial on the captured runtime handle; flatten the
                    // JoinHandle result into an io::Error.
                    rt.spawn(async move {
                        let (r, w) = UnixStream::connect(&path).await?.into_split();
                        Ok(RpcClient::new(w, r, id))
                    })
                    .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
                    .and_then(|x| async move { x })
                    .boxed()
                }),
            ),
        }
    }
    /// Sends a request on a pooled connection, retrying with a fresh
    /// connection on INTERNAL_ERROR (the code RpcClient uses when its
    /// transport or reader task has died).
    pub async fn request<T: RpcMethod>(
        &self,
        method: T,
        params: T::Params,
    ) -> Result<T::Response, RpcError>
    where
        T: Serialize + Clone,
        T::Params: Serialize + Clone,
        T::Response: for<'de> Deserialize<'de>,
    {
        let mut tries = 0;
        let res = loop {
            tries += 1;
            let mut client = self.pool.clone().get().await?;
            let res = client.request(method.clone(), params.clone()).await;
            match &res {
                Err(e) if e.code == yajrc::INTERNAL_ERROR.code => {
                    // Connection considered broken: evict it from the pool.
                    client.destroy();
                }
                _ => break res,
            }
            // NOTE(review): checked after the attempt, so up to
            // MAX_TRIES + 1 attempts occur — confirm that is intended.
            if tries > MAX_TRIES {
                tracing::warn!("Max Tries exceeded");
                break res;
            }
        };
        res
    }
}

227
core/helpers/src/rsync.rs Normal file
View File

@@ -0,0 +1,227 @@
use std::path::Path;
use color_eyre::eyre::eyre;
use futures::StreamExt;
use models::{Error, ErrorKind};
use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
use tokio::process::{Child, Command};
use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;
use crate::{const_true, ByteReplacementReader, NonDetachingJoinHandle};
/// Options mapped one-to-one onto rsync command-line flags in `Rsync::new`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RsyncOptions {
    // --delete
    #[serde(default = "const_true")]
    pub delete: bool,
    // --force
    #[serde(default = "const_true")]
    pub force: bool,
    // --ignore-existing
    #[serde(default)]
    pub ignore_existing: bool,
    // one --exclude=<pattern> per entry
    #[serde(default)]
    pub exclude: Vec<String>,
    // --no-perms
    // NOTE(review): serde default is `true` (const_true) but the Default
    // impl below uses `false` — confirm which is intended.
    #[serde(default = "const_true")]
    pub no_permissions: bool,
    // --no-owner. Same serde-vs-Default mismatch as `no_permissions`.
    #[serde(default = "const_true")]
    pub no_owner: bool,
}
impl Default for RsyncOptions {
    fn default() -> Self {
        Self {
            delete: true,
            force: true,
            ignore_existing: false,
            exclude: Vec::new(),
            // NOTE(review): these two deserialize to `true` by default (via
            // const_true) but default to `false` here — confirm intended.
            no_permissions: false,
            no_owner: false,
        }
    }
}
/// A running rsync transfer: the child process, background stdout/stderr
/// reader tasks, and a stream of fractional (0.0..=1.0) progress updates.
pub struct Rsync {
    pub command: Child,
    // Parses --info=progress2 stdout and feeds the progress watch channel.
    _progress_task: NonDetachingJoinHandle<Result<(), Error>>,
    // Captures stderr so `wait` can report it.
    stderr: NonDetachingJoinHandle<Result<String, Error>>,
    pub progress: WatchStream<f64>,
}
impl Rsync {
pub async fn new(
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
options: RsyncOptions,
) -> Result<Self, Error> {
let mut cmd = Command::new("rsync");
if options.delete {
cmd.arg("--delete");
}
if options.force {
cmd.arg("--force");
}
if options.ignore_existing {
cmd.arg("--ignore-existing");
}
if options.no_permissions {
cmd.arg("--no-perms");
}
if options.no_owner {
cmd.arg("--no-owner");
}
for exclude in options.exclude {
cmd.arg(format!("--exclude={}", exclude));
}
if options.no_permissions {
cmd.arg("--no-perms");
}
let mut command = cmd
.arg("-actAXH")
.arg("--info=progress2")
.arg("--no-inc-recursive")
.arg(src.as_ref())
.arg(dst.as_ref())
.kill_on_drop(true)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()?;
let cmd_stdout = match command.stdout.take() {
None => {
return Err(Error::new(
eyre!("rsync command stdout is none"),
ErrorKind::Filesystem,
))
}
Some(a) => a,
};
let mut cmd_stderr = match command.stderr.take() {
None => {
return Err(Error::new(
eyre!("rsync command stderr is none"),
ErrorKind::Filesystem,
))
}
Some(a) => a,
};
let (send, recv) = watch::channel(0.0);
let stderr = tokio::spawn(async move {
let mut res = String::new();
cmd_stderr.read_to_string(&mut res).await?;
Ok(res)
})
.into();
let progress_task = tokio::spawn(async move {
let mut lines = BufReader::new(ByteReplacementReader {
replace: b'\r',
with: b'\n',
inner: cmd_stdout,
})
.lines();
while let Some(line) = lines.next_line().await? {
if let Some(percentage) = parse_percentage(&line) {
if percentage > 0.0 {
if let Err(err) = send.send(percentage / 100.0) {
return Err(Error::new(
eyre!("rsync progress send error: {}", err),
ErrorKind::Filesystem,
));
}
}
}
}
Ok(())
})
.into();
let mut progress = WatchStream::new(recv);
progress.next().await;
Ok(Rsync {
command,
_progress_task: progress_task,
stderr,
progress,
})
}
pub async fn wait(mut self) -> Result<(), Error> {
let status = self.command.wait().await?;
let stderr = match self.stderr.await {
Err(err) => {
return Err(Error::new(
eyre!("rsync stderr error: {}", err),
ErrorKind::Filesystem,
))
}
Ok(a) => a?,
};
if status.success() {
tracing::info!("rsync: {}", stderr);
} else {
return Err(Error::new(
eyre!(
"rsync error: {} {} ",
status.code().map(|x| x.to_string()).unwrap_or_default(),
stderr
),
ErrorKind::Filesystem,
));
}
Ok(())
}
}
/// Extracts the percentage from an rsync `--info=progress2` line: the first
/// whitespace-separated column ending in `%`, parsed as f64 (without the
/// `%` sign). Returns None if no such column exists or it fails to parse.
fn parse_percentage(line: &str) -> Option<f64> {
    line.split_ascii_whitespace()
        .find_map(|col| col.strip_suffix('%'))
        .and_then(|pct| pct.parse().ok())
}
// Spot-check percentage extraction against a real progress2-style line.
#[test]
fn test_parse() {
    let input = " 1.07G 57% 95.20MB/s 0:00:10 (xfr#1, to-chk=0/2)";
    assert_eq!(Some(57.0), parse_percentage(input));
}
// End-to-end test: syncs 100 random ~100KB files between temp dirs and
// checks the progress stream reports low, intermediate, and final values.
// Requires `rsync` on PATH and write access to /tmp.
#[tokio::test]
async fn test_rsync() {
    use futures::StreamExt;
    use tokio::fs;
    let mut seen_zero = false;
    let mut seen_in_between = false;
    let mut seen_hundred = false;
    // Start from a clean slate; ignore failure if the dir doesn't exist.
    fs::remove_dir_all("/tmp/test_rsync")
        .await
        .unwrap_or_default();
    fs::create_dir_all("/tmp/test_rsync/a").await.unwrap();
    fs::create_dir_all("/tmp/test_rsync/b").await.unwrap();
    for i in 0..100 {
        tokio::io::copy(
            &mut fs::File::open("/dev/urandom").await.unwrap().take(100_000),
            &mut fs::File::create(format!("/tmp/test_rsync/a/sample.{i}.bin"))
                .await
                .unwrap(),
        )
        .await
        .unwrap();
    }
    let mut rsync = Rsync::new(
        "/tmp/test_rsync/a/",
        "/tmp/test_rsync/b/",
        Default::default(),
    )
    .await
    .unwrap();
    // Bucket every progress update; all three buckets should be hit.
    while let Some(progress) = rsync.progress.next().await {
        if progress <= 0.05 {
            seen_zero = true;
        } else if progress > 0.05 && progress < 1.0 {
            seen_in_between = true
        } else {
            seen_hundred = true;
        }
    }
    rsync.wait().await.unwrap();
    assert!(seen_zero, "seen zero");
    assert!(seen_in_between, "seen in between 0 and 100");
    assert!(seen_hundred, "seen 100");
}

View File

@@ -0,0 +1,13 @@
use std::path::{Path, PathBuf};
use models::{PackageId, Version};
/// Location of per-package scripts relative to the data directory.
pub const PKG_SCRIPT_DIR: &str = "package-data/scripts";
/// Builds `<datadir>/package-data/scripts/<pkg_id>/<version>`.
pub fn script_dir<P: AsRef<Path>>(datadir: P, pkg_id: &PackageId, version: &Version) -> PathBuf {
    datadir
        .as_ref()
        // (removed a needless `&*` re-borrow of the &'static str const)
        .join(PKG_SCRIPT_DIR)
        .join(pkg_id)
        .join(version.as_str())
}