Skip to content

Commit

Permalink
Merge pull request #128 from Arooba-git/fix-blocking-teardown-code
Browse files Browse the repository at this point in the history
Execute blocking teardown code in vert.x worker pool
  • Loading branch information
aureamunoz authored Jul 21, 2023
2 parents bc0e0af + 57b693a commit 3139a41
Showing 1 changed file with 11 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import dev.snowdrop.vertx.kafka.KafkaProducerFactory;
import dev.snowdrop.vertx.kafka.KafkaProperties;
import dev.snowdrop.vertx.kafka.ProducerRecord;
import io.vertx.core.Vertx;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import reactor.core.publisher.Mono;

Expand All @@ -37,12 +38,16 @@ protected void setUp(KafkaProducerFactory producerFactory, KafkaConsumerFactory
}

protected void tearDown() {
producersToCleanup.stream()
.map(KafkaProducer::close)
.forEach(Mono::block);
consumersToCleanup.stream()
.map(KafkaConsumer::close)
.forEach(Mono::block);
Vertx.vertx().executeBlocking(future -> {
producersToCleanup.stream()
.map(KafkaProducer::close)
.forEach(Mono::block);
consumersToCleanup.stream()
.map(KafkaConsumer::close)
.forEach(Mono::block);

future.complete();
});
}

protected <K, V> KafkaProducer<K, V> createProducer() {
Expand Down

0 comments on commit 3139a41

Please sign in to comment.