036 migration (#2750)

* chore: convert to use a value, cause why not

* wip: Add the up for this going up

* wip: trait changes

* wip: Add in some more of the private transformations

* chore(wip): Adding the ssh_keys todo

* wip: Add cifs

* fix migration structure

* chore: Fix the trait for the version

* wip(feat): Notifications are in the system

* fix marker trait hell

* handle key todos

* wip: Testing the migration in a system.

* fix pubkey parser

* fix: migration works

* wip: Trying to get the migration stuff?

* fix: Can now install the packages that we wanted, yay!"

* Merge branch 'next/minor' of github.com:Start9Labs/start-os into feat/migration

---------

Co-authored-by: Aiden McClelland <me@drbonez.dev>
This commit is contained in:
Jade
2024-10-16 10:09:30 -06:00
committed by GitHub
parent 9fc082d1e6
commit fb074c8c32
30 changed files with 1056 additions and 370 deletions

View File

@@ -1,10 +1,15 @@
use std::any::Any;
use std::cmp::Ordering;
use std::panic::{RefUnwindSafe, UnwindSafe};
use color_eyre::eyre::eyre;
use futures::future::BoxFuture;
use futures::{Future, FutureExt};
use imbl_value::InternedString;
use imbl::Vector;
use imbl_value::{to_value, InternedString};
use patch_db::json_ptr::{JsonPointer, ROOT};
use crate::context::RpcContext;
use crate::db::model::Database;
use crate::prelude::*;
use crate::progress::PhaseProgressTrackerHandle;
@@ -20,10 +25,68 @@ mod v0_3_6_alpha_3;
mod v0_3_6_alpha_4;
mod v0_3_6_alpha_5;
mod v0_3_6_alpha_6;
mod v0_3_6_alpha_7;
pub type Current = v0_3_6_alpha_6::Version; // VERSION_BUMP
impl Current {
#[instrument(skip(self, db))]
pub async fn pre_init(self, db: &PatchDb) -> Result<(), Error> {
let from = from_value::<Version>(
version_accessor(&mut db.dump(&ROOT).await.value)
.or_not_found("`version` in db")?
.clone(),
)?
.as_version_t()?;
match from.semver().cmp(&self.semver()) {
Ordering::Greater => {
db.apply_function(|mut db| {
rollback_to_unchecked(&from, &self, &mut db)?;
Ok::<_, Error>((db, ()))
})
.await?;
}
Ordering::Less => {
let pre_ups = PreUps::load(&from, &self).await?;
db.apply_function(|mut db| {
migrate_from_unchecked(&from, &self, pre_ups, &mut db)?;
Ok::<_, Error>((db, ()))
})
.await?;
}
Ordering::Equal => (),
}
Ok(())
}
}
pub async fn post_init(ctx: &RpcContext) -> Result<(), Error> {
let mut peek;
while let Some(version) = {
peek = ctx.db.peek().await;
peek.as_public()
.as_server_info()
.as_post_init_migration_todos()
.de()?
.first()
.cloned()
.map(Version::from_exver_version)
.as_ref()
.map(Version::as_version_t)
.transpose()?
} {
version.0.post_up(ctx).await?;
ctx.db
.mutate(|db| {
db.as_public_mut()
.as_server_info_mut()
.as_post_init_migration_todos_mut()
.mutate(|m| Ok(m.remove(&version.0.semver())))
})
.await?;
}
Ok(())
}
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
#[serde(untagged)]
#[allow(non_camel_case_types)]
@@ -39,7 +102,6 @@ enum Version {
V0_3_6_alpha_4(Wrapper<v0_3_6_alpha_4::Version>),
V0_3_6_alpha_5(Wrapper<v0_3_6_alpha_5::Version>),
V0_3_6_alpha_6(Wrapper<v0_3_6_alpha_6::Version>),
V0_3_6_alpha_7(Wrapper<v0_3_6_alpha_7::Version>),
Other(exver::Version),
}
@@ -52,6 +114,32 @@ impl Version {
Version::Other(version)
})
}
fn as_version_t(&self) -> Result<DynVersion, Error> {
Ok(match self {
Self::LT0_3_5(_) => {
return Err(Error::new(
eyre!("cannot migrate from versions before 0.3.5"),
ErrorKind::MigrationFailed,
))
}
Self::V0_3_5(v) => DynVersion(Box::new(v.0)),
Self::V0_3_5_1(v) => DynVersion(Box::new(v.0)),
Self::V0_3_5_2(v) => DynVersion(Box::new(v.0)),
Self::V0_3_6_alpha_0(v) => DynVersion(Box::new(v.0)),
Self::V0_3_6_alpha_1(v) => DynVersion(Box::new(v.0)),
Self::V0_3_6_alpha_2(v) => DynVersion(Box::new(v.0)),
Self::V0_3_6_alpha_3(v) => DynVersion(Box::new(v.0)),
Self::V0_3_6_alpha_4(v) => DynVersion(Box::new(v.0)),
Self::V0_3_6_alpha_5(v) => DynVersion(Box::new(v.0)),
Self::V0_3_6_alpha_6(v) => DynVersion(Box::new(v.0)),
Self::Other(v) => {
return Err(Error::new(
eyre!("unknown version {v}"),
ErrorKind::MigrationFailed,
))
}
})
}
#[cfg(test)]
fn as_exver(&self) -> exver::Version {
match self {
@@ -66,116 +154,246 @@ impl Version {
Version::V0_3_6_alpha_4(Wrapper(x)) => x.semver(),
Version::V0_3_6_alpha_5(Wrapper(x)) => x.semver(),
Version::V0_3_6_alpha_6(Wrapper(x)) => x.semver(),
Version::V0_3_6_alpha_7(Wrapper(x)) => x.semver(),
Version::Other(x) => x.clone(),
}
}
}
fn version_accessor(db: &mut Value) -> Option<&mut Value> {
if db.get("public").is_some() {
db.get_mut("public")?
.get_mut("serverInfo")?
.get_mut("version")
} else {
db.get_mut("server-info")?.get_mut("version")
}
}
fn version_compat_accessor(db: &mut Value) -> Option<&mut Value> {
if db.get("public").is_some() {
db.get_mut("public")?
.get_mut("serverInfo")?
.get_mut("versionCompat")
} else {
db.get_mut("server-info")?.get_mut("eos-version-compat")
}
}
fn post_init_migration_todos_accessor(db: &mut Value) -> Option<&mut Value> {
let server_info = if db.get("public").is_some() {
db.get_mut("public")?.get_mut("serverInfo")?
} else {
db.get_mut("server-info")?
};
if server_info.get("postInitMigrationTodos").is_none() {
server_info
.as_object_mut()?
.insert("postInitMigrationTodos".into(), Value::Array(Vector::new()));
}
server_info.get_mut("postInitMigrationTodos")
}
struct PreUps {
prev: Option<Box<PreUps>>,
value: Box<dyn Any + UnwindSafe + Send + 'static>,
}
impl PreUps {
#[instrument(skip(from, to))]
fn load<'a, VFrom: DynVersionT + ?Sized, VTo: DynVersionT + ?Sized>(
from: &'a VFrom,
to: &'a VTo,
) -> BoxFuture<'a, Result<Self, Error>> {
async {
let previous = to.previous();
let prev = match from.semver().cmp(&previous.semver()) {
Ordering::Less => Some(Box::new(PreUps::load(from, &previous).await?)),
Ordering::Greater => {
return Err(Error::new(
eyre!(
"NO PATH FROM {}, THIS IS LIKELY A MISTAKE IN THE VERSION DEFINITION",
from.semver()
),
crate::ErrorKind::MigrationFailed,
))
}
Ordering::Equal => None,
};
Ok(Self {
prev,
value: to.pre_up().await?,
})
}
.boxed()
}
}
fn migrate_from_unchecked<VFrom: DynVersionT + ?Sized, VTo: DynVersionT + ?Sized>(
from: &VFrom,
to: &VTo,
pre_ups: PreUps,
db: &mut Value,
) -> Result<(), Error> {
let previous = to.previous();
match pre_ups.prev {
Some(prev) if from.semver() < previous.semver() => {
migrate_from_unchecked(from, &previous, *prev, db)?
}
_ if from.semver() > previous.semver() => {
return Err(Error::new(
eyre!(
"NO PATH FROM {}, THIS IS LIKELY A MISTAKE IN THE VERSION DEFINITION",
from.semver()
),
crate::ErrorKind::MigrationFailed,
));
}
_ => (),
};
to.up(db, pre_ups.value)?;
to.commit(db)?;
Ok(())
}
fn rollback_to_unchecked<VFrom: DynVersionT + ?Sized, VTo: DynVersionT + ?Sized>(
from: &VFrom,
to: &VTo,
db: &mut Value,
) -> Result<(), Error> {
let previous = from.previous();
from.down(db)?;
previous.commit(db)?;
if to.semver() < previous.semver() {
rollback_to_unchecked(&previous, to, db)?
} else if to.semver() > previous.semver() {
return Err(Error::new(
eyre!(
"NO PATH TO {}, THIS IS LIKELY A MISTAKE IN THE VERSION DEFINITION",
to.semver()
),
crate::ErrorKind::MigrationFailed,
));
}
Ok(())
}
pub trait VersionT
where
Self: Sized + Send + Sync,
Self: Default + Copy + Sized + RefUnwindSafe + Send + Sync + 'static,
{
type Previous: VersionT;
fn new() -> Self;
type PreUpRes: Send + UnwindSafe;
fn semver(self) -> exver::Version;
fn compat(self) -> &'static exver::VersionRange;
/// MUST NOT change system state. Intended for async I/O reads
fn pre_up(self) -> impl Future<Output = Result<Self::PreUpRes, Error>> + Send + 'static;
fn up(self, db: &mut Value, input: Self::PreUpRes) -> Result<(), Error> {
Ok(())
}
/// MUST be idempotent, and is run after *all* db migrations
fn post_up(self, ctx: &RpcContext) -> impl Future<Output = Result<(), Error>> + Send + 'static {
async { Ok(()) }
}
fn down(self, db: &mut Value) -> Result<(), Error> {
Err(Error::new(
eyre!("downgrades prohibited"),
ErrorKind::InvalidRequest,
))
}
fn commit(self, db: &mut Value) -> Result<(), Error> {
*version_accessor(db).or_not_found("`version` in db")? = to_value(&self.semver())?;
*version_compat_accessor(db).or_not_found("`versionCompat` in db")? =
to_value(self.compat())?;
post_init_migration_todos_accessor(db)
.or_not_found("`serverInfo` in db")?
.as_array_mut()
.ok_or_else(|| {
Error::new(
eyre!("postInitMigrationTodos is not an array"),
ErrorKind::Database,
)
})?
.push_back(to_value(&self.semver())?);
Ok(())
}
}
struct DynVersion(Box<dyn DynVersionT>);
unsafe impl Send for DynVersion {}
trait DynVersionT: RefUnwindSafe + Send + Sync {
fn previous(&self) -> DynVersion;
fn semver(&self) -> exver::Version;
fn compat(&self) -> &'static exver::VersionRange;
fn up(&self, db: &TypedPatchDb<Database>) -> impl Future<Output = Result<(), Error>> + Send;
fn down(&self, db: &TypedPatchDb<Database>) -> impl Future<Output = Result<(), Error>> + Send;
fn commit(
&self,
db: &TypedPatchDb<Database>,
) -> impl Future<Output = Result<(), Error>> + Send {
async {
let semver = self.semver().into();
let compat = self.compat().clone();
db.mutate(|d| {
d.as_public_mut()
.as_server_info_mut()
.as_version_mut()
.ser(&semver)?;
d.as_public_mut()
.as_server_info_mut()
.as_eos_version_compat_mut()
.ser(&compat)?;
Ok(())
})
.await?;
Ok(())
}
fn pre_up(&self) -> BoxFuture<'static, Result<Box<dyn Any + UnwindSafe + Send>, Error>>;
fn up(&self, db: &mut Value, input: Box<dyn Any + Send>) -> Result<(), Error>;
fn post_up<'a>(&self, ctx: &'a RpcContext) -> BoxFuture<'a, Result<(), Error>>;
fn down(&self, db: &mut Value) -> Result<(), Error>;
fn commit(&self, db: &mut Value) -> Result<(), Error>;
}
impl<T> DynVersionT for T
where
T: VersionT,
{
fn previous(&self) -> DynVersion {
DynVersion(Box::new(<Self as VersionT>::Previous::default()))
}
fn migrate_to<V: VersionT>(
&self,
version: &V,
db: &TypedPatchDb<Database>,
progress: &mut PhaseProgressTrackerHandle,
) -> impl Future<Output = Result<(), Error>> + Send {
async {
match self.semver().cmp(&version.semver()) {
Ordering::Greater => self.rollback_to_unchecked(version, db, progress).await,
Ordering::Less => version.migrate_from_unchecked(self, db, progress).await,
Ordering::Equal => Ok(()),
}
}
fn semver(&self) -> exver::Version {
VersionT::semver(*self)
}
fn migrate_from_unchecked<'a, V: VersionT>(
&'a self,
version: &'a V,
db: &'a TypedPatchDb<Database>,
progress: &'a mut PhaseProgressTrackerHandle,
) -> BoxFuture<'a, Result<(), Error>> {
progress.add_total(1);
async {
let previous = Self::Previous::new();
if version.semver() < previous.semver() {
previous
.migrate_from_unchecked(version, db, progress)
.await?;
} else if version.semver() > previous.semver() {
return Err(Error::new(
eyre!(
"NO PATH FROM {}, THIS IS LIKELY A MISTAKE IN THE VERSION DEFINITION",
version.semver()
),
crate::ErrorKind::MigrationFailed,
));
}
tracing::info!("{} -> {}", previous.semver(), self.semver(),);
self.up(db).await?;
self.commit(db).await?;
*progress += 1;
Ok(())
}
.boxed()
fn compat(&self) -> &'static exver::VersionRange {
VersionT::compat(*self)
}
fn rollback_to_unchecked<'a, V: VersionT>(
&'a self,
version: &'a V,
db: &'a TypedPatchDb<Database>,
progress: &'a mut PhaseProgressTrackerHandle,
) -> BoxFuture<'a, Result<(), Error>> {
async {
let previous = Self::Previous::new();
tracing::info!("{} -> {}", self.semver(), previous.semver(),);
self.down(db).await?;
previous.commit(db).await?;
*progress += 1;
if version.semver() < previous.semver() {
previous
.rollback_to_unchecked(version, db, progress)
.await?;
} else if version.semver() > previous.semver() {
return Err(Error::new(
eyre!(
"NO PATH TO {}, THIS IS LIKELY A MISTAKE IN THE VERSION DEFINITION",
version.semver()
),
crate::ErrorKind::MigrationFailed,
));
}
Ok(())
}
.boxed()
fn pre_up(&self) -> BoxFuture<'static, Result<Box<dyn Any + UnwindSafe + Send>, Error>> {
let v = *self;
async move { Ok(Box::new(VersionT::pre_up(v).await?) as Box<dyn Any + UnwindSafe + Send>) }
.boxed()
}
fn up(&self, db: &mut Value, input: Box<dyn Any + Send>) -> Result<(), Error> {
VersionT::up(
*self,
db,
*input.downcast().map_err(|_| {
Error::new(
eyre!("pre_up returned unexpected type"),
ErrorKind::Incoherent,
)
})?,
)
}
fn post_up<'a>(&self, ctx: &'a RpcContext) -> BoxFuture<'a, Result<(), Error>> {
VersionT::post_up(*self, ctx).boxed()
}
fn down(&self, db: &mut Value) -> Result<(), Error> {
VersionT::down(*self, db)
}
fn commit(&self, db: &mut Value) -> Result<(), Error> {
VersionT::commit(*self, db)
}
}
impl DynVersionT for DynVersion {
fn previous(&self) -> DynVersion {
self.0.previous()
}
fn semver(&self) -> exver::Version {
self.0.semver()
}
fn compat(&self) -> &'static exver::VersionRange {
self.0.compat()
}
fn pre_up(&self) -> BoxFuture<'static, Result<Box<dyn Any + UnwindSafe + Send>, Error>> {
self.0.pre_up()
}
fn up(&self, db: &mut Value, input: Box<dyn Any + Send>) -> Result<(), Error> {
self.0.up(db, input)
}
fn post_up<'a>(&self, ctx: &'a RpcContext) -> BoxFuture<'a, Result<(), Error>> {
self.0.post_up(ctx)
}
fn down(&self, db: &mut Value) -> Result<(), Error> {
self.0.down(db)
}
fn commit(&self, db: &mut Value) -> Result<(), Error> {
self.0.commit(db)
}
}
@@ -195,7 +413,7 @@ where
{
fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
let v = exver::Version::deserialize(deserializer)?;
let version = T::new();
let version = T::default();
if v < version.semver() {
Ok(Self(version, v))
} else {
@@ -220,7 +438,7 @@ where
{
fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
let v = exver::Version::deserialize(deserializer)?;
let version = T::new();
let version = T::default();
if v == version.semver() {
Ok(Wrapper(version))
} else {
@@ -229,62 +447,6 @@ where
}
}
pub async fn init(
db: &TypedPatchDb<Database>,
mut progress: PhaseProgressTrackerHandle,
) -> Result<(), Error> {
progress.start();
db.mutate(|db| {
db.as_public_mut()
.as_server_info_mut()
.as_version_mut()
.map_mutate(|v| {
Ok(if v == exver::Version::new([0, 3, 6], []) {
v0_3_6_alpha_0::Version::new().semver()
} else {
v
})
})
})
.await?; // TODO: remove before releasing 0.3.6
let version = Version::from_exver_version(
db.peek()
.await
.as_public()
.as_server_info()
.as_version()
.de()?,
);
match version {
Version::LT0_3_5(_) => {
return Err(Error::new(
eyre!("Cannot migrate from pre-0.3.5. Please update to v0.3.5 first."),
ErrorKind::MigrationFailed,
));
}
Version::V0_3_5(v) => v.0.migrate_to(&Current::new(), &db, &mut progress).await?,
Version::V0_3_5_1(v) => v.0.migrate_to(&Current::new(), &db, &mut progress).await?,
Version::V0_3_5_2(v) => v.0.migrate_to(&Current::new(), &db, &mut progress).await?,
Version::V0_3_6_alpha_0(v) => v.0.migrate_to(&Current::new(), &db, &mut progress).await?,
Version::V0_3_6_alpha_1(v) => v.0.migrate_to(&Current::new(), &db, &mut progress).await?,
Version::V0_3_6_alpha_2(v) => v.0.migrate_to(&Current::new(), &db, &mut progress).await?,
Version::V0_3_6_alpha_3(v) => v.0.migrate_to(&Current::new(), &db, &mut progress).await?,
Version::V0_3_6_alpha_4(v) => v.0.migrate_to(&Current::new(), &db, &mut progress).await?,
Version::V0_3_6_alpha_5(v) => v.0.migrate_to(&Current::new(), &db, &mut progress).await?,
Version::V0_3_6_alpha_6(v) => v.0.migrate_to(&Current::new(), &db, &mut progress).await?,
Version::V0_3_6_alpha_7(v) => v.0.migrate_to(&Current::new(), &db, &mut progress).await?,
Version::Other(_) => {
return Err(Error::new(
eyre!("Cannot downgrade"),
crate::ErrorKind::InvalidRequest,
))
}
}
progress.complete();
Ok(())
}
pub const COMMIT_HASH: &str =
include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../GIT_HASH.txt"));
@@ -315,17 +477,17 @@ mod tests {
fn versions() -> impl Strategy<Value = Version> {
prop_oneof![
Just(Version::V0_3_5(Wrapper(v0_3_5::Version::new()))),
Just(Version::V0_3_5_1(Wrapper(v0_3_5_1::Version::new()))),
Just(Version::V0_3_5_2(Wrapper(v0_3_5_2::Version::new()))),
Just(Version::V0_3_5(Wrapper(v0_3_5::Version::default()))),
Just(Version::V0_3_5_1(Wrapper(v0_3_5_1::Version::default()))),
Just(Version::V0_3_5_2(Wrapper(v0_3_5_2::Version::default()))),
Just(Version::V0_3_6_alpha_0(Wrapper(
v0_3_6_alpha_0::Version::new()
v0_3_6_alpha_0::Version::default()
))),
Just(Version::V0_3_6_alpha_1(Wrapper(
v0_3_6_alpha_1::Version::new()
v0_3_6_alpha_1::Version::default()
))),
Just(Version::V0_3_6_alpha_2(Wrapper(
v0_3_6_alpha_2::Version::new()
v0_3_6_alpha_2::Version::default()
))),
em_version().prop_map(Version::Other),
]