diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java index d9779b5a9c564..3942d599586aa 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java @@ -103,7 +103,7 @@ public static void closeCluster() { private static StreamsBuilder builder; private static Properties properties; private static String appId = ""; - public static final Duration DEFAULT_DURATION = Duration.ofSeconds(30); + public static final Duration DEFAULT_DURATION = Duration.ofSeconds(60); @BeforeEach public void setup(final TestInfo testInfo) { @@ -272,8 +272,8 @@ public void shouldAddAndRemoveThreadsMultipleTimes() throws InterruptedException stateTransitionHistory.clear(); final CountDownLatch latch = new CountDownLatch(2); - final Thread one = adjustCountHelperThread(kafkaStreams, 4, latch); - final Thread two = adjustCountHelperThread(kafkaStreams, 6, latch); + final Thread one = adjustCountHelperThread(kafkaStreams, 1, latch); + final Thread two = adjustCountHelperThread(kafkaStreams, 1, latch); Set threadMetadata = null; AssertionError testError = null; @@ -284,13 +284,14 @@ public void shouldAddAndRemoveThreadsMultipleTimes() throws InterruptedException assertTrue(latch.await(30, TimeUnit.SECONDS)); one.join(); two.join(); + waitForCondition( () -> kafkaStreams.metadataForLocalThreads().size() == oldThreadCount && kafkaStreams.state() == KafkaStreams.State.RUNNING, DEFAULT_DURATION.toMillis(), "Kafka Streams did not stabilize at the expected thread count and RUNNING state." ); - + threadMetadata = kafkaStreams.metadataForLocalThreads(); assertThat(threadMetadata.size(), equalTo(oldThreadCount)); } catch (final AssertionError e) {