Skip to content

Commit

Permalink
test: Eliminate deadlock in DebugStashFactory (#22117)
Browse files Browse the repository at this point in the history
Previously, in the Drop impl of DebugStashFactory we were cleaning up
the test schema created by the DebugStashFactory. In order to run the
DROP command, we were blocking the async runtime, since the Drop impl
can't be async. There were often other async tasks that had active CRDB
transactions, especially when tests were run in parallel, that were now
sitting idle because the runtime was blocked. The DROP SCHEMA command
was waiting for these transactions to complete, but they could not make
progress while the runtime was blocked, which caused a deadlock.
CRDB eventually aborted active transactions after 5 minutes, but this
was too long for tests that should take under a minute. A previous fix
was to lower the idle transaction timeout to 1 second.

This commit updates DebugStashFactory so that the CRDB state is cleaned
up in an async function that must be called before dropping the
DebugStashFactory. The solution no longer blocks the entire runtime, so
we don't have to abort transactions from parallel tests.

Works towards resolving #21891
  • Loading branch information
jkosh44 authored Oct 2, 2023
1 parent 001808d commit 2f59227
Show file tree
Hide file tree
Showing 12 changed files with 1,222 additions and 1,184 deletions.
6 changes: 5 additions & 1 deletion src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3506,7 +3506,9 @@ impl Catalog {
let catalog = Self::open_debug_stash_catalog_factory(&debug_stash_factory, now)
.await
.expect("unable to open debug stash");
f(catalog).await
let res = f(catalog).await;
debug_stash_factory.drop().await;
res
}

/// Opens a debug stash backed catalog using `debug_stash_factory`.
Expand Down Expand Up @@ -8051,6 +8053,7 @@ mod tests {
// Re-opening the same stash resets the transient_revision to 1.
assert_eq!(catalog.transient_revision(), 1);
}
debug_stash_factory.drop().await;
}

#[mz_ore::test(tokio::test)]
Expand Down Expand Up @@ -8944,6 +8947,7 @@ mod tests {
item => panic!("expected view, got {}", item.typ()),
}
}
debug_stash_factory.drop().await;
}

#[mz_ore::test]
Expand Down
20 changes: 15 additions & 5 deletions src/catalog/tests/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,10 @@ use mz_stash::DebugStashFactory;
#[mz_ore::test(tokio::test)]
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
async fn test_is_initialized() {
let (_debug_factory, stash_config) = stash_config().await;
let (debug_factory, stash_config) = stash_config().await;
let openable_state = stash_backed_catalog_state(stash_config);
is_initialized(openable_state).await;
debug_factory.drop().await;
}

#[mz_ore::test(tokio::test)]
Expand All @@ -108,6 +109,7 @@ async fn test_debug_is_initialized() {
let debug_factory = DebugStashFactory::new().await;
let debug_openable_state = debug_stash_backed_catalog_state(&debug_factory);
is_initialized(debug_openable_state).await;
debug_factory.drop().await;
}

async fn is_initialized<D: DurableCatalogState>(
Expand Down Expand Up @@ -137,9 +139,10 @@ async fn is_initialized<D: DurableCatalogState>(
#[mz_ore::test(tokio::test)]
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
async fn test_get_deployment_generation() {
let (_debug_factory, stash_config) = stash_config().await;
let (debug_factory, stash_config) = stash_config().await;
let openable_state = stash_backed_catalog_state(stash_config);
get_deployment_generation(openable_state).await;
debug_factory.drop().await;
}

#[mz_ore::test(tokio::test)]
Expand All @@ -148,6 +151,7 @@ async fn test_debug_get_deployment_generation() {
let debug_factory = DebugStashFactory::new().await;
let debug_openable_state = debug_stash_backed_catalog_state(&debug_factory);
get_deployment_generation(debug_openable_state).await;
debug_factory.drop().await;
}

async fn get_deployment_generation<D: DurableCatalogState>(
Expand Down Expand Up @@ -180,9 +184,10 @@ async fn get_deployment_generation<D: DurableCatalogState>(
#[mz_ore::test(tokio::test)]
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
async fn test_open_check() {
let (_debug_factory, stash_config) = stash_config().await;
let (debug_factory, stash_config) = stash_config().await;
let openable_state = stash_backed_catalog_state(stash_config);
open_check(openable_state).await;
debug_factory.drop().await;
}

#[mz_ore::test(tokio::test)]
Expand All @@ -191,6 +196,7 @@ async fn test_debug_open_check() {
let debug_factory = DebugStashFactory::new().await;
let debug_openable_state = debug_stash_backed_catalog_state(&debug_factory);
open_check(debug_openable_state).await;
debug_factory.drop().await;
}

async fn open_check<D: DurableCatalogState>(
Expand Down Expand Up @@ -256,9 +262,10 @@ async fn open_check<D: DurableCatalogState>(
#[mz_ore::test(tokio::test)]
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
async fn test_open_read_only() {
let (_debug_factory, stash_config) = stash_config().await;
let (debug_factory, stash_config) = stash_config().await;
let openable_state = stash_backed_catalog_state(stash_config);
open_read_only(openable_state).await;
debug_factory.drop().await;
}

#[mz_ore::test(tokio::test)]
Expand All @@ -267,6 +274,7 @@ async fn test_debug_open_read_only() {
let debug_factory = DebugStashFactory::new().await;
let debug_openable_state = debug_stash_backed_catalog_state(&debug_factory);
open_read_only(debug_openable_state).await;
debug_factory.drop().await;
}

async fn open_read_only<D: DurableCatalogState>(
Expand Down Expand Up @@ -303,9 +311,10 @@ async fn open_read_only<D: DurableCatalogState>(
#[mz_ore::test(tokio::test)]
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
async fn test_open() {
let (_debug_factory, stash_config) = stash_config().await;
let (debug_factory, stash_config) = stash_config().await;
let openable_state = stash_backed_catalog_state(stash_config);
open(openable_state).await;
debug_factory.drop().await;
}

#[mz_ore::test(tokio::test)]
Expand All @@ -314,6 +323,7 @@ async fn test_debug_open() {
let debug_factory = DebugStashFactory::new().await;
let debug_openable_state = debug_stash_backed_catalog_state(&debug_factory);
open(debug_openable_state).await;
debug_factory.drop().await;
}

async fn open<D: DurableCatalogState>(mut openable_state: impl OpenableDurableCatalogState<D>) {
Expand Down
74 changes: 29 additions & 45 deletions src/stash/src/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,9 @@ impl Stash {
{
let factory = DebugStashFactory::try_new().await?;
let stash = factory.try_open().await?;
Ok(f(stash).await)
let res = Ok(f(stash).await);
factory.drop().await;
res
}

/// Verifies stash invariants. Should only be called by tests.
Expand Down Expand Up @@ -1398,11 +1400,14 @@ pub struct DebugStashFactory {
#[derivative(Debug = "ignore")]
tls: MakeTlsConnector,
stash_factory: StashFactory,
dropped: bool,
}

impl DebugStashFactory {
/// Returns a new factory that will generate a random schema one time, then use it on any
/// opened Stash.
///
/// IMPORTANT: Call [`Self::drop`] when you are done to clean up leftover state in CRDB.
pub async fn try_new() -> Result<DebugStashFactory, StashError> {
let url =
std::env::var("COCKROACH_URL").expect("COCKROACH_URL environment variable is not set");
Expand All @@ -1416,17 +1421,6 @@ impl DebugStashFactory {
tracing::error!("postgres stash connection error: {e}");
}
});
// Running tests in parallel has been causing some deadlock/starvation issue that we haven't
// been able to figure out. For some reason, uncommitted transactions are being left open
// which causes the schema cleanup in the Drop impl to block for an enormous amount of time
// (many minutes). Setting a small idle transaction timeout globally seems to resolve the
// issue somehow. This is a gross hack and we don't really understand what's going on, but
// for now it resolves issues in CI while we debug further.
client
.batch_execute(
"SET CLUSTER SETTING sql.defaults.idle_in_transaction_session_timeout TO '1s'",
)
.await?;
client
.batch_execute(&format!("CREATE SCHEMA {schema}"))
.await?;
Expand All @@ -1438,12 +1432,15 @@ impl DebugStashFactory {
schema,
tls,
stash_factory,
dropped: false,
})
}

/// Returns a new factory that will generate a random schema one time, then use it on any
/// opened Stash.
///
/// IMPORTANT: Call [`Self::drop`] when you are done to clean up leftover state in CRDB.
///
/// # Panics
/// Panics if it is unable to create a new factory.
pub async fn new() -> DebugStashFactory {
Expand Down Expand Up @@ -1497,6 +1494,21 @@ impl DebugStashFactory {
.expect("unable to open debug stash")
}

/// Best effort clean up of testing state in CRDB, any error is ignored.
///
/// Consumes the factory, opens a fresh connection to `self.url`, and issues
/// `DROP SCHEMA <schema> CASCADE` for the schema stored in `self.schema`.
///
/// This must be called before the factory goes out of scope: the `Drop` impl asserts that
/// the `dropped` flag has been set and panics otherwise.
pub async fn drop(mut self) {
    // Record that cleanup was requested *before* any fallible or awaiting step. Otherwise a
    // failed connection (the early return below) would leave the flag unset and the `Drop`
    // impl's assertion would panic, contradicting the "any error is ignored" contract.
    self.dropped = true;

    let Ok((client, connection)) = tokio_postgres::connect(&self.url, self.tls.clone()).await
    else {
        // Could not reach CRDB; nothing more we can do for a best effort clean up.
        return;
    };
    // The connection future must be polled for the client to make progress; its result is
    // deliberately discarded.
    mz_ore::task::spawn(|| "tokio-postgres stash connection", async move {
        let _ = connection.await;
    });
    // Errors from the DROP are deliberately ignored.
    let _ = client
        .batch_execute(&format!("DROP SCHEMA {} CASCADE", &self.schema))
        .await;
}

pub fn url(&self) -> &str {
&self.url
}
Expand All @@ -1513,38 +1525,10 @@ impl DebugStashFactory {

impl Drop for DebugStashFactory {
fn drop(&mut self) {
let url = self.url.clone();
let schema = self.schema.clone();
let tls = self.tls.clone();
let result = std::thread::spawn(move || {
let async_runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
async_runtime.block_on(async {
let (client, connection) = tokio_postgres::connect(&url, tls).await?;
mz_ore::task::spawn(|| "tokio-postgres stash connection", async move {
if let Err(e) = connection.await {
std::panic::resume_unwind(Box::new(e));
}
});
client
.batch_execute(&format!("DROP SCHEMA {} CASCADE", &schema))
.await?;
Ok::<_, StashError>(())
})
})
// Note that we are joining on a tokio task here, which blocks the current runtime from making other progress on the current worker thread.
// Because this only happens on shutdown and is only used in tests, we have determined that it's okay
.join();

match result {
Ok(result) => {
if let Err(e) = result {
std::panic::resume_unwind(Box::new(e));
}
}

Err(e) => std::panic::resume_unwind(e),
}
assert!(
self.dropped,
"You forgot to call `drop()` on a `DebugStashFactory` before dropping it! You may also \
see this if a test panicked before calling `drop()`."
);
}
}
5 changes: 5 additions & 0 deletions src/stash/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,7 @@ async fn test_stash_readonly() {
// The previous stash should still be the leader.
assert!(stash_rw.confirm_leadership().await.is_ok());
stash_rw.verify().await.unwrap();
factory.drop().await;
}

#[mz_ore::test(tokio::test)]
Expand Down Expand Up @@ -726,6 +727,7 @@ async fn test_stash_savepoint() {
BTreeMap::from([(1, 2)])
);
stash_rw.verify().await.unwrap();
factory.drop().await;
}

#[mz_ore::test(tokio::test)]
Expand All @@ -739,6 +741,7 @@ async fn test_stash_fence() {
_ => panic!("expected error"),
});
let _: StashCollection<String, String> = collection(&mut conn2, "c").await.unwrap();
factory.drop().await;
}

#[mz_ore::test(tokio::test)]
Expand All @@ -747,6 +750,7 @@ async fn test_stash_append() {
let factory = DebugStashFactory::try_new().await.expect("must succeed");
test_append(|| async { factory.open().await }).await;
factory.open().await.verify().await.unwrap();
factory.drop().await;
}

#[mz_ore::test(tokio::test)]
Expand All @@ -755,6 +759,7 @@ async fn test_stash_stash() {
let factory = DebugStashFactory::try_new().await.expect("must succeed");
test_stash(|| async { factory.open().await }).await;
factory.open().await.verify().await.unwrap();
factory.drop().await;
}

async fn make_batch<K, V>(
Expand Down
Loading

0 comments on commit 2f59227

Please sign in to comment.