From 7c0f4a5fe80f4575d4b3b383919811ea9d7de17c Mon Sep 17 00:00:00 2001 From: Jhonatan Hidalgo <108900469+jhonatan-kmt@users.noreply.github.com> Date: Wed, 24 Jul 2024 18:09:01 -0500 Subject: [PATCH] Use bloking IO for health indicator connection --- .../DomainRabbitReactiveHealthIndicator.java | 31 +++++++++---------- ...mainRabbitReactiveHealthIndicatorTest.java | 4 ++- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/health/DomainRabbitReactiveHealthIndicator.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/health/DomainRabbitReactiveHealthIndicator.java index e27f87ab..45794c64 100644 --- a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/health/DomainRabbitReactiveHealthIndicator.java +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/health/DomainRabbitReactiveHealthIndicator.java @@ -2,30 +2,35 @@ import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; -import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.log4j.Log4j2; -import org.reactivecommons.async.rabbit.config.ConnectionFactoryProvider; import org.reactivecommons.async.rabbit.config.ConnectionManager; import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator; import org.springframework.boot.actuate.health.Health; import reactor.core.publisher.Mono; import java.net.SocketException; +import java.util.Map; import java.util.stream.Collectors; @Log4j2 -@RequiredArgsConstructor public class DomainRabbitReactiveHealthIndicator extends AbstractReactiveHealthIndicator { private static final String VERSION = "version"; - private final ConnectionManager manager; + private final Map domainProviders; + + public DomainRabbitReactiveHealthIndicator(ConnectionManager manager) { + this.domainProviders = manager.getProviders().entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> { + ConnectionFactory connection = entry.getValue().getProvider().getConnectionFactory().clone(); + connection.useBlockingIo(); + return connection; + })); + } @Override protected Mono doHealthCheck(Health.Builder builder) { - return Mono.zip(manager.getProviders() - .entrySet() - .stream() - .map(entry -> checkSingle(entry.getKey(), entry.getValue().getProvider())) + return Mono.zip(domainProviders.entrySet().stream() + .map(entry -> checkSingle(entry.getKey(), entry.getValue())) .collect(Collectors.toList()), this::merge); } @@ -38,17 +43,11 @@ private Health merge(Object[] results) { return builder.build(); } - private Mono checkSingle(String domain, ConnectionFactoryProvider provider) { - return Mono.defer(() -> getVersion(provider)) + private Mono checkSingle(String domain, ConnectionFactory connectionFactory) { + return Mono.defer(() -> Mono.just(getRawVersion(connectionFactory))) .map(version -> Status.builder().version(version).domain(domain).build()); } - private Mono getVersion(ConnectionFactoryProvider provider) { - return Mono.just(provider) - .map(ConnectionFactoryProvider::getConnectionFactory) - .map(this::getRawVersion); - } - @SneakyThrows private String getRawVersion(ConnectionFactory factory) { Connection connection = null; diff --git a/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/health/DomainRabbitReactiveHealthIndicatorTest.java b/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/health/DomainRabbitReactiveHealthIndicatorTest.java index fb533280..cff1e660 100644 --- a/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/health/DomainRabbitReactiveHealthIndicatorTest.java +++ b/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/health/DomainRabbitReactiveHealthIndicatorTest.java @@ -37,12 +37,14 @@ public class DomainRabbitReactiveHealthIndicatorTest { @BeforeEach void setup() { + when(provider.getConnectionFactory()).thenReturn(factory); + when(factory.clone()).thenReturn(factory); + ConnectionManager connectionManager = new ConnectionManager(); connectionManager.addDomain(DEFAULT_DOMAIN, null, null, provider); connectionManager.addDomain("domain2", null, null, provider); connectionManager.addDomain("domain3", null, null, provider); indicator = new DomainRabbitReactiveHealthIndicator(connectionManager); - when(provider.getConnectionFactory()).thenReturn(factory); } @Test