chore: Use send_modify instead of send

This commit is contained in:
BluJ
2023-07-10 10:36:16 -06:00
parent 29b0850a94
commit b7abd878ac
4 changed files with 70 additions and 16 deletions

View File

@@ -443,9 +443,11 @@ async fn restore_package<'a>(
Ok((
progress.clone(),
async move {
tracing::error!("BLUJ downloading {id}");
download_install_s9pk(&ctx, &manifest, None, progress, file, None).await?;
tracing::error!("BLUJ downlowded {id}");
guard.unmount().await?;
tracing::error!("BLUJ unmounted {id}");
Ok(())
}

View File

@@ -18,6 +18,7 @@ use patch_db::{DbHandle, LockType};
use reqwest::Url;
use rpc_toolkit::command;
use rpc_toolkit::yajrc::RpcError;
use serde_json::{json, Value};
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt};
use tokio::process::Command;
@@ -60,7 +61,7 @@ pub const PKG_PUBLIC_DIR: &str = "package-data/public";
pub const PKG_WASM_DIR: &str = "package-data/wasm";
#[command(display(display_serializable))]
pub async fn list(#[context] ctx: RpcContext) -> Result<Vec<(PackageId, Version)>, Error> {
pub async fn list(#[context] ctx: RpcContext) -> Result<Value, Error> {
let mut hdl = ctx.db.handle();
let package_data = crate::db::DatabaseModel::new()
.package_data()
@@ -70,11 +71,25 @@ pub async fn list(#[context] ctx: RpcContext) -> Result<Vec<(PackageId, Version)
Ok(package_data
.0
.iter()
.filter_map(|(id, pde)| match pde {
PackageDataEntry::Installed { installed, .. } => {
Some((id.clone(), installed.manifest.version.clone()))
}
_ => None,
.filter_map(|(id, pde)| {
serde_json::to_value(match pde {
PackageDataEntry::Installed { installed, .. } => {
json!({ "status":"installed","id": id.clone(), "version": installed.manifest.version.clone()})
}
PackageDataEntry::Installing { manifest, install_progress, .. } => {
json!({ "status":"installing","id": id.clone(), "version": manifest.version.clone(), "progress": install_progress.clone()})
}
PackageDataEntry::Updating { manifest, installed, install_progress, .. } => {
json!({ "status":"updating","id": id.clone(), "version": installed.manifest.version.clone(), "progress": install_progress.clone()})
}
PackageDataEntry::Restoring { manifest, install_progress, .. } => {
json!({ "status":"restoring","id": id.clone(), "version": manifest.version.clone(), "progress": install_progress.clone()})
}
PackageDataEntry::Removing { manifest, .. } => {
json!({ "status":"removing", "id": id.clone(), "version": manifest.version.clone()})
}
})
.ok()
})
.collect())
}
@@ -1152,6 +1167,7 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin + Send + Sync>(
})
.collect(),
);
tracing::error!("BLUJ 1");
let current_dependents = {
let mut deps = BTreeMap::new();
for package in crate::db::DatabaseModel::new()
@@ -1199,12 +1215,14 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin + Send + Sync>(
}
CurrentDependents(deps)
};
tracing::error!("BLUJ 2");
let mut pde = model
.clone()
.expect(&mut tx)
.await?
.get_mut(&mut tx)
.await?;
tracing::error!("BLUJ 3");
let installed = InstalledPackageDataEntry {
status: Status {
configured: manifest.config.is_none(),
@@ -1231,7 +1249,7 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin + Send + Sync>(
current_dependencies: current_dependencies.clone(),
interface_addresses,
};
tracing::error!("BLUJ 4");
let prev = std::mem::replace(
&mut *pde,
PackageDataEntry::Installed {
@@ -1240,7 +1258,9 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin + Send + Sync>(
static_files,
},
);
tracing::error!("BLUJ 5");
pde.save(&mut tx).await?;
tracing::error!("BLUJ 6");
let receipts = InstallS9Receipts::new(&mut tx).await?;
// UpdateDependencyReceipts
let mut dep_errs = model
@@ -1253,6 +1273,7 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin + Send + Sync>(
.dependency_errors()
.get_mut(&mut tx)
.await?;
tracing::error!("BLUJ 7");
*dep_errs = DependencyErrors::init(
ctx,
&mut tx,
@@ -1261,7 +1282,9 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin + Send + Sync>(
&receipts.config.try_heal_receipts,
)
.await?;
tracing::error!("BLUJ 8");
dep_errs.save(&mut tx).await?;
tracing::error!("BLUJ 9");
if let PackageDataEntry::Updating {
installed: prev, ..
@@ -1297,6 +1320,7 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin + Send + Sync>(
migration.or(prev_migration)
};
tracing::error!("BLUJ 9.1");
remove_from_current_dependents_lists(
&mut tx,
pkg_id,
@@ -1304,12 +1328,14 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin + Send + Sync>(
&receipts.config.current_dependents,
)
.await?; // remove previous
tracing::error!("BLUJ 9.2");
let configured = if let Some(f) = viable_migration {
f.await?.configured && prev_is_configured
} else {
false
};
tracing::error!("BLUJ 9.3");
if configured && manifest.config.is_some() {
let breakages = BTreeMap::new();
let overrides = Default::default();
@@ -1322,6 +1348,7 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin + Send + Sync>(
overrides,
};
crate::config::configure(&ctx, pkg_id, configure_context).await?;
tracing::error!("BLUJ 9.4");
} else {
add_dependent_to_current_dependents_lists(
&mut tx,
@@ -1330,6 +1357,7 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin + Send + Sync>(
&receipts.config.current_dependents,
)
.await?; // add new
tracing::error!("BLUJ 9.5");
}
if configured || manifest.config.is_none() {
let mut main_status = crate::db::DatabaseModel::new()
@@ -1344,8 +1372,10 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin + Send + Sync>(
.main()
.get_mut(&mut tx)
.await?;
tracing::error!("BLUJ 9.6");
*main_status = prev.status.main;
main_status.save(&mut tx).await?;
tracing::error!("BLUJ 9.7");
}
update_dependency_errors_of_dependents(
ctx,
@@ -1359,10 +1389,13 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin + Send + Sync>(
&receipts.config.update_dependency_receipts,
)
.await?;
tracing::error!("BLUJ 9.8");
if &prev.manifest.version != version {
cleanup(ctx, &prev.manifest.id, &prev.manifest.version).await?;
}
tracing::error!("BLUJ 9.9");
} else if let PackageDataEntry::Restoring { .. } = prev {
tracing::error!("BLUJ 9.10");
manifest
.backup
.restore(
@@ -1374,6 +1407,7 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin + Send + Sync>(
&manifest.volumes,
)
.await?;
tracing::error!("BLUJ 9.11");
add_dependent_to_current_dependents_lists(
&mut tx,
pkg_id,
@@ -1381,6 +1415,7 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin + Send + Sync>(
&receipts.config.current_dependents,
)
.await?;
tracing::error!("BLUJ 9.12");
update_dependency_errors_of_dependents(
ctx,
&mut tx,
@@ -1389,7 +1424,9 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin + Send + Sync>(
&receipts.config.update_dependency_receipts,
)
.await?;
tracing::error!("BLUJ 9.13");
} else {
tracing::error!("BLUJ 9.14");
add_dependent_to_current_dependents_lists(
&mut tx,
pkg_id,
@@ -1397,6 +1434,7 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin + Send + Sync>(
&receipts.config.current_dependents,
)
.await?;
tracing::error!("BLUJ 9.15");
update_dependency_errors_of_dependents(
ctx,
&mut tx,
@@ -1405,15 +1443,20 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin + Send + Sync>(
&receipts.config.update_dependency_receipts,
)
.await?;
tracing::error!("BLUJ 9.16");
}
tracing::error!("BLUJ 10");
if let Some(installed) = pde.installed() {
reconfigure_dependents_with_live_pointers(ctx, &mut tx, &receipts.config, installed)
.await?;
}
tracing::error!("BLUJ 11");
sql_tx.commit().await?;
tracing::error!("BLUJ 12");
tx.commit().await?;
tracing::error!("BLUJ 13");
tracing::info!("Install {}@{}: Complete", pkg_id, version);

View File

@@ -61,17 +61,16 @@ impl ManageContainer {
pub fn set_override(&self, override_status: Option<MainStatus>) -> GeneralBoxedGuard {
self.override_main_status
.send(override_status)
.unwrap_or_default();
.send_modify(|x| *x = override_status);
let override_main_status = self.override_main_status.clone();
let guard = GeneralBoxedGuard::new(move || {
override_main_status.send(None).unwrap_or_default();
override_main_status.send_modify(|x| *x = None);
});
guard
}
pub fn to_desired(&self, new_state: StartStop) {
self.desired_state.send(new_state).unwrap_or_default();
self.desired_state.send_modify(|x| *x = new_state);
}
pub async fn wait_for_desired(&self, new_state: StartStop) {
@@ -123,7 +122,7 @@ async fn create_service_manager(
}
}
}
current_state.send(StartStop::Stop).unwrap_or_default();
current_state.send_modify(|x| *x = StartStop::Stop);
}
(StartStop::Stop, StartStop::Start) => starting_service(
current_state.clone(),
@@ -201,10 +200,10 @@ fn starting_service(
let set_running = {
let current_state = current_state.clone();
Arc::new(move || {
current_state.send(StartStop::Start).unwrap_or_default();
current_state.send_modify(|x| *x = StartStop::Start);
})
};
let set_stopped = { move || current_state.send(StartStop::Stop) };
let set_stopped = { move || current_state.send_modify(|x| *x = StartStop::Stop) };
let running_main_loop = async move {
while desired_state.borrow().is_start() {
let result = run_main(
@@ -213,7 +212,7 @@ fn starting_service(
set_running.clone(),
)
.await;
set_stopped().unwrap_or_default();
set_stopped();
run_main_log_result(result, seed.clone()).await;
}
};

View File

@@ -228,10 +228,12 @@ impl DockerProcedure {
input: Option<I>,
timeout: Option<Duration>,
) -> Result<Result<O, (i32, String)>, Error> {
tracing::error!("BLUJ Backup 1");
let name = name.docker_name();
let name: Option<&str> = name.as_ref().map(|x| &**x);
let mut cmd = tokio::process::Command::new("docker");
let container_name = Self::container_name(pkg_id, name);
tracing::error!("BLUJ Backup 2");
cmd.arg("run")
.arg("--rm")
.arg("--network=start9")
@@ -241,6 +243,7 @@ impl DockerProcedure {
.arg(format!("--hostname={}", &container_name))
.arg("--no-healthcheck")
.kill_on_drop(true);
tracing::error!("BLUJ Backup 3");
match ctx
.docker
.remove_container(
@@ -260,7 +263,9 @@ impl DockerProcedure {
}) => Ok(()),
Err(e) => Err(e),
}?;
tracing::error!("BLUJ Backup 4");
cmd.args(self.docker_args(ctx, pkg_id, pkg_version, volumes).await?);
tracing::error!("BLUJ Backup 5");
let input_buf = if let (Some(input), Some(format)) = (&input, &self.io_format) {
cmd.stdin(std::process::Stdio::piped());
Some(format.to_vec(input)?)
@@ -276,7 +281,9 @@ impl DockerProcedure {
.collect::<Vec<&str>>()
.join(" ")
);
tracing::error!("BLUJ Backup 6");
let mut handle = cmd.spawn().with_kind(crate::ErrorKind::Docker)?;
tracing::error!("BLUJ Backup 7");
let id = handle.id();
let timeout_fut = if let Some(timeout) = timeout {
EitherFuture::Right(async move {
@@ -289,12 +296,15 @@ impl DockerProcedure {
};
if let (Some(input), Some(mut stdin)) = (&input_buf, handle.stdin.take()) {
use tokio::io::AsyncWriteExt;
tracing::error!("BLUJ Backup 8");
stdin
.write_all(input)
.await
.with_kind(crate::ErrorKind::Docker)?;
stdin.flush().await?;
tracing::error!("BLUJ Backup 9");
stdin.shutdown().await?;
tracing::error!("BLUJ Backup 10");
drop(stdin);
}
enum Race<T> {