disable concurrency and delete tmpdir before retry (#1846)

* disable concurrency and delete tmpdir before retry

* undo retry

* really limit usage of pgloader

* configurable

* no migrate notifications
This commit is contained in:
Aiden McClelland
2022-10-04 15:14:15 -06:00
committed by GitHub
parent 1388632562
commit 0c3d0dd525
4 changed files with 34 additions and 20 deletions

View File

@@ -41,6 +41,8 @@ use crate::{Error, ErrorKind, ResultExt};
 #[derive(Debug, Default, Deserialize)]
 #[serde(rename_all = "kebab-case")]
 pub struct RpcContextConfig {
+    pub migration_batch_rows: Option<usize>,
+    pub migration_prefetch_rows: Option<usize>,
     pub bind_rpc: Option<SocketAddr>,
     pub bind_ws: Option<SocketAddr>,
     pub bind_static: Option<SocketAddr>,
@@ -100,14 +102,12 @@ impl RpcContextConfig {
             .with_kind(crate::ErrorKind::Database)?;
         let old_db_path = self.datadir().join("main/secrets.db");
         if tokio::fs::metadata(&old_db_path).await.is_ok() {
-            let mut res = Ok(());
-            for _ in 0..5 {
-                res = pgloader(&old_db_path).await;
-                if res.is_ok() {
-                    break;
-                }
-            }
-            res?
+            pgloader(
+                &old_db_path,
+                self.migration_batch_rows.unwrap_or(25000),
+                self.migration_prefetch_rows.unwrap_or(100_000),
+            )
+            .await?;
         }
         Ok(secret_store)
     }

View File

@@ -36,6 +36,8 @@ pub struct SetupResult {
 #[derive(Debug, Default, Deserialize)]
 #[serde(rename_all = "kebab-case")]
 pub struct SetupContextConfig {
+    pub migration_batch_rows: Option<usize>,
+    pub migration_prefetch_rows: Option<usize>,
     pub bind_rpc: Option<SocketAddr>,
     pub datadir: Option<PathBuf>,
 }
@@ -64,6 +66,8 @@ impl SetupContextConfig {
 pub struct SetupContextSeed {
     pub config_path: Option<PathBuf>,
+    pub migration_batch_rows: usize,
+    pub migration_prefetch_rows: usize,
     pub bind_rpc: SocketAddr,
     pub shutdown: Sender<()>,
     pub datadir: PathBuf,
@@ -92,6 +96,8 @@ impl SetupContext {
         let datadir = cfg.datadir().to_owned();
         Ok(Self(Arc::new(SetupContextSeed {
             config_path: path.as_ref().map(|p| p.as_ref().to_owned()),
+            migration_batch_rows: cfg.migration_batch_rows.unwrap_or(25000),
+            migration_prefetch_rows: cfg.migration_prefetch_rows.unwrap_or(100_000),
             bind_rpc: cfg.bind_rpc.unwrap_or(([127, 0, 0, 1], 5959).into()),
             shutdown,
             datadir,
@@ -141,14 +147,12 @@ impl SetupContext {
             .with_kind(crate::ErrorKind::Database)?;
         let old_db_path = self.datadir.join("main/secrets.db");
         if tokio::fs::metadata(&old_db_path).await.is_ok() {
-            let mut res = Ok(());
-            for _ in 0..5 {
-                res = pgloader(&old_db_path).await;
-                if res.is_ok() {
-                    break;
-                }
-            }
-            res?
+            pgloader(
+                &old_db_path,
+                self.migration_batch_rows,
+                self.migration_prefetch_rows,
+            )
+            .await?;
         }
         Ok(secret_store)
     }

View File

@@ -75,15 +75,25 @@ impl InitReceipts {
 }
 }
-pub async fn pgloader(old_db_path: impl AsRef<Path>) -> Result<(), Error> {
+pub async fn pgloader(
+    old_db_path: impl AsRef<Path>,
+    batch_rows: usize,
+    prefetch_rows: usize,
+) -> Result<(), Error> {
     tokio::fs::write(
         "/etc/embassy/migrate.load",
         format!(
             include_str!("migrate.load"),
-            sqlite_path = old_db_path.as_ref().display()
+            sqlite_path = old_db_path.as_ref().display(),
+            batch_rows = batch_rows,
+            prefetch_rows = prefetch_rows
         ),
     )
     .await?;
+    match tokio::fs::remove_dir_all("/tmp/pgloader").await {
+        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
+        a => a,
+    }?;
     tracing::info!("Running pgloader");
     let out = Command::new("pgloader")
         .arg("-v")

View File

@@ -2,6 +2,6 @@ load database
 from sqlite://{sqlite_path}
 into postgresql://root@unix:/var/run/postgresql:5432/secrets
-with include no drop, truncate, reset sequences, data only
+with include no drop, truncate, reset sequences, data only, workers = 1, concurrency = 1, max parallel create index = 1, batch rows = {batch_rows}, prefetch rows = {prefetch_rows}
-excluding table names like '_sqlx_migrations';
+excluding table names like '_sqlx_migrations', 'notifications';