diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java index 03a0309a10e..e24950105bb 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java @@ -30,6 +30,7 @@ import com.mongodb.event.ServerHeartbeatSucceededEvent; import com.mongodb.event.ServerMonitorListener; import com.mongodb.internal.TimeoutContext; +import com.mongodb.internal.VisibleForTesting; import com.mongodb.internal.diagnostics.logging.Logger; import com.mongodb.internal.diagnostics.logging.Loggers; import com.mongodb.internal.inject.Provider; @@ -55,6 +56,7 @@ import static com.mongodb.connection.ServerType.UNKNOWN; import static com.mongodb.internal.Locks.checkedWithLock; import static com.mongodb.internal.Locks.withLock; +import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE; import static com.mongodb.internal.connection.CommandHelper.HELLO; import static com.mongodb.internal.connection.CommandHelper.LEGACY_HELLO; import static com.mongodb.internal.connection.CommandHelper.executeCommand; @@ -149,8 +151,14 @@ public void cancelCurrentCheck() { monitor.cancelCurrentCheck(); } + @VisibleForTesting(otherwise = PRIVATE) + ServerMonitor getServerMonitor() { + return monitor; + } + class ServerMonitor extends Thread implements AutoCloseable { private volatile InternalConnection connection = null; + private volatile boolean alreadyLoggedHeartBeatStarted = false; private volatile boolean currentCheckCancelled; ServerMonitor() { @@ -213,9 +221,13 @@ public void run() { private ServerDescription lookupServerDescription(final ServerDescription currentServerDescription) { try { + boolean shouldStreamResponses = shouldStreamResponses(currentServerDescription); if (connection == null || connection.isClosed()) { + alreadyLoggedHeartBeatStarted = true; currentCheckCancelled = false; InternalConnection newConnection = internalConnectionFactory.create(serverId); + serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent( + newConnection.getDescription().getConnectionId(), shouldStreamResponses)); newConnection.open(operationContextFactory.create()); connection = newConnection; roundTripTimeSampler.addSample(connection.getInitialServerDescription().getRoundTripTimeNanos()); @@ -225,9 +237,11 @@ private ServerDescription lookupServerDescription(final ServerDescription curren if (LOGGER.isDebugEnabled()) { LOGGER.debug(format("Checking status of %s", serverId.getAddress())); } - boolean shouldStreamResponses = shouldStreamResponses(currentServerDescription); - serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent( - connection.getDescription().getConnectionId(), shouldStreamResponses)); + if (!alreadyLoggedHeartBeatStarted) { + serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent( + connection.getDescription().getConnectionId(), shouldStreamResponses)); + } + alreadyLoggedHeartBeatStarted = false; long start = System.nanoTime(); try { diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerMonitorSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerMonitorSpecification.groovy deleted file mode 100644 index c452d757a28..00000000000 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerMonitorSpecification.groovy +++ /dev/null @@ -1,272 +0,0 @@ -/* - * Copyright 2008-present MongoDB, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.mongodb.internal.connection - -import com.mongodb.MongoSocketReadTimeoutException -import com.mongodb.ServerAddress -import com.mongodb.connection.ClusterConnectionMode -import com.mongodb.connection.ClusterId -import com.mongodb.connection.ConnectionDescription -import com.mongodb.connection.ServerConnectionState -import com.mongodb.connection.ServerDescription -import com.mongodb.connection.ServerId -import com.mongodb.connection.ServerSettings -import com.mongodb.connection.ServerType -import com.mongodb.event.ServerHeartbeatFailedEvent -import com.mongodb.event.ServerHeartbeatStartedEvent -import com.mongodb.event.ServerHeartbeatSucceededEvent -import com.mongodb.event.ServerMonitorListener -import com.mongodb.internal.inject.SameObjectProvider -import org.bson.BsonDocument -import org.bson.ByteBufNIO -import spock.lang.Specification - -import java.nio.ByteBuffer -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit - -import static com.mongodb.ClusterFixture.OPERATION_CONTEXT_FACTORY -import static com.mongodb.internal.connection.MessageHelper.LEGACY_HELLO_LOWER - -@SuppressWarnings('BusyWait') -class DefaultServerMonitorSpecification extends Specification { - - DefaultServerMonitor monitor - - def 'close should not send a sendStateChangedEvent'() { - given: - def stateChanged = false - def sdam = new SdamServerDescriptionManager() { - @Override - void update(final ServerDescription candidateDescription) { - assert candidateDescription != null - stateChanged = true - } - - @Override - void handleExceptionBeforeHandshake(final SdamServerDescriptionManager.SdamIssue sdamIssue) { - throw new UnsupportedOperationException() - } - - @Override - void handleExceptionAfterHandshake(final SdamServerDescriptionManager.SdamIssue sdamIssue) { - throw new UnsupportedOperationException() - } - - @Override - SdamServerDescriptionManager.SdamIssue.Context context() { - throw new UnsupportedOperationException() - } - - @Override - SdamServerDescriptionManager.SdamIssue.Context context(final InternalConnection connection) { - throw new UnsupportedOperationException() - } - } - def internalConnectionFactory = Mock(InternalConnectionFactory) { - create(_) >> { - Mock(InternalConnection) { - open(_) >> { sleep(100) } - } - } - } - monitor = new DefaultServerMonitor(new ServerId(new ClusterId(), new ServerAddress()), ServerSettings.builder().build(), - internalConnectionFactory, ClusterConnectionMode.SINGLE, null, false, SameObjectProvider.initialized(sdam), - OPERATION_CONTEXT_FACTORY) - - monitor.start() - - when: - monitor.close() - monitor.monitor.join() - - then: - !stateChanged - } - - def 'should send started and succeeded heartbeat events'() { - given: - def latch = new CountDownLatch(1) - def startedEvent - def succeededEvent - def failedEvent - - def serverMonitorListener = new ServerMonitorListener() { - @Override - void serverHearbeatStarted(final ServerHeartbeatStartedEvent event) { - startedEvent = event - } - - @Override - void serverHeartbeatSucceeded(final ServerHeartbeatSucceededEvent event) { - succeededEvent = event - latch.countDown() - } - - @Override - void serverHeartbeatFailed(final ServerHeartbeatFailedEvent event) { - failedEvent = event - latch.countDown() - } - } - - def connectionDescription = new ConnectionDescription(new ServerId(new ClusterId(''), new ServerAddress())) - def initialServerDescription = ServerDescription.builder() - .ok(true) - .address(new ServerAddress()) - .type(ServerType.STANDALONE) - .state(ServerConnectionState.CONNECTED) - .build() - - def helloResponse = '{' + - "$LEGACY_HELLO_LOWER: true," + - 'maxBsonObjectSize : 16777216, ' + - 'maxMessageSizeBytes : 48000000, ' + - 'maxWriteBatchSize : 1000, ' + - 'localTime : ISODate("2016-04-05T20:36:36.082Z"), ' + - 'maxWireVersion : 4, ' + - 'minWireVersion : 0, ' + - 'ok : 1 ' + - '}' - - def internalConnectionFactory = Mock(InternalConnectionFactory) { - create(_) >> { - Mock(InternalConnection) { - open(_) >> { } - - getBuffer(_) >> { int size -> - new ByteBufNIO(ByteBuffer.allocate(size)) - } - - getDescription() >> { - connectionDescription - } - - getInitialServerDescription() >> { - initialServerDescription - } - - send(_, _, _) >> { } - - receive(_, _) >> { - BsonDocument.parse(helloResponse) - } - } - } - } - monitor = new DefaultServerMonitor(new ServerId(new ClusterId(), new ServerAddress()), - ServerSettings.builder().heartbeatFrequency(1, TimeUnit.SECONDS).addServerMonitorListener(serverMonitorListener).build(), - internalConnectionFactory, ClusterConnectionMode.SINGLE, null, false, mockSdamProvider(), OPERATION_CONTEXT_FACTORY) - - when: - monitor.start() - latch.await(30, TimeUnit.SECONDS) - - then: - failedEvent == null - startedEvent.connectionId == connectionDescription.connectionId - succeededEvent.connectionId == connectionDescription.connectionId - succeededEvent.reply == BsonDocument.parse(helloResponse) - succeededEvent.getElapsedTime(TimeUnit.NANOSECONDS) > 0 - - cleanup: - monitor?.close() - } - - def 'should send started and failed heartbeat events'() { - given: - def latch = new CountDownLatch(1) - def startedEvent - def succeededEvent - def failedEvent - - def serverMonitorListener = new ServerMonitorListener() { - @Override - void serverHearbeatStarted(final ServerHeartbeatStartedEvent event) { - startedEvent = event - } - - @Override - void serverHeartbeatSucceeded(final ServerHeartbeatSucceededEvent event) { - succeededEvent = event - latch.countDown() - } - - @Override - void serverHeartbeatFailed(final ServerHeartbeatFailedEvent event) { - failedEvent = event - latch.countDown() - } - } - - def connectionDescription = new ConnectionDescription(new ServerId(new ClusterId(''), new ServerAddress())) - def initialServerDescription = ServerDescription.builder() - .ok(true) - .address(new ServerAddress()) - .type(ServerType.STANDALONE) - .state(ServerConnectionState.CONNECTED) - .build() - def exception = new MongoSocketReadTimeoutException('read timeout', new ServerAddress(), new IOException()) - - def internalConnectionFactory = Mock(InternalConnectionFactory) { - create(_) >> { - Mock(InternalConnection) { - open(_) >> { } - - getBuffer(_) >> { int size -> - new ByteBufNIO(ByteBuffer.allocate(size)) - } - - getDescription() >> { - connectionDescription - } - - getInitialServerDescription() >> { - initialServerDescription - } - - send(_, _, _) >> { } - - receive(_, _) >> { - throw exception - } - } - } - } - monitor = new DefaultServerMonitor(new ServerId(new ClusterId(), new ServerAddress()), - ServerSettings.builder().heartbeatFrequency(1, TimeUnit.SECONDS).addServerMonitorListener(serverMonitorListener).build(), - internalConnectionFactory, ClusterConnectionMode.SINGLE, null, false, mockSdamProvider(), OPERATION_CONTEXT_FACTORY) - - when: - monitor.start() - latch.await(30, TimeUnit.SECONDS) - - then: - succeededEvent == null - startedEvent.connectionId == connectionDescription.connectionId - failedEvent.connectionId == connectionDescription.connectionId - failedEvent.throwable == exception - failedEvent.getElapsedTime(TimeUnit.NANOSECONDS) > 0 - - cleanup: - monitor?.close() - } - - private mockSdamProvider() { - SameObjectProvider.initialized(Mock(SdamServerDescriptionManager)) - } -} diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerMonitorTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerMonitorTest.java new file mode 100644 index 00000000000..c6bc469cc55 --- /dev/null +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerMonitorTest.java @@ -0,0 +1,300 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.mongodb.internal.connection; + +import com.mongodb.MongoSocketReadTimeoutException; +import com.mongodb.ServerAddress; +import com.mongodb.connection.ClusterConnectionMode; +import com.mongodb.connection.ClusterId; +import com.mongodb.connection.ConnectionDescription; +import com.mongodb.connection.ServerConnectionState; +import com.mongodb.connection.ServerDescription; +import com.mongodb.connection.ServerId; +import com.mongodb.connection.ServerSettings; +import com.mongodb.connection.ServerType; +import com.mongodb.event.ServerHeartbeatFailedEvent; +import com.mongodb.event.ServerHeartbeatStartedEvent; +import com.mongodb.event.ServerHeartbeatSucceededEvent; +import com.mongodb.event.ServerMonitorListener; +import com.mongodb.event.TestServerMonitorListener; +import com.mongodb.internal.inject.SameObjectProvider; +import org.bson.BsonDocument; +import org.bson.ByteBufNIO; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.opentest4j.AssertionFailedError; + +import java.io.IOException; +import java.net.SocketException; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static com.mongodb.ClusterFixture.OPERATION_CONTEXT_FACTORY; +import static com.mongodb.assertions.Assertions.assertFalse; +import static com.mongodb.internal.connection.MessageHelper.LEGACY_HELLO_LOWER; +import static java.util.Arrays.asList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class DefaultServerMonitorTest { + + private DefaultServerMonitor monitor; + + @AfterEach + void tearDown() throws InterruptedException { + if (monitor != null) { + monitor.close(); + monitor.getServerMonitor().join(); + } + } + + @Test + void closeShouldNotSendStateChangedEvent() throws Exception { + // Given + AtomicBoolean stateChanged = new AtomicBoolean(false); + + SdamServerDescriptionManager sdamManager = new SdamServerDescriptionManager() { + @Override + public void update(final ServerDescription candidateDescription) { + assertNotNull(candidateDescription); + stateChanged.set(true); + } + + @Override + public void handleExceptionBeforeHandshake(final SdamServerDescriptionManager.SdamIssue sdamIssue) { + throw new UnsupportedOperationException(); + } + + @Override + public void handleExceptionAfterHandshake(final SdamServerDescriptionManager.SdamIssue sdamIssue) { + throw new UnsupportedOperationException(); + } + + @Override + public SdamServerDescriptionManager.SdamIssue.Context context() { + throw new UnsupportedOperationException(); + } + + @Override + public SdamServerDescriptionManager.SdamIssue.Context context(final InternalConnection connection) { + throw new UnsupportedOperationException(); + } + }; + + InternalConnection mockConnection = mock(InternalConnection.class); + doAnswer(invocation -> { + Thread.sleep(100); + return null; + }).when(mockConnection).open(any()); + + InternalConnectionFactory factory = createConnectionFactory(mockConnection); + + monitor = new DefaultServerMonitor( + new ServerId(new ClusterId(), new ServerAddress()), + ServerSettings.builder().build(), + factory, + ClusterConnectionMode.SINGLE, + null, + false, + SameObjectProvider.initialized(sdamManager), + OPERATION_CONTEXT_FACTORY); + + // When + monitor.start(); + monitor.close(); + + // Then + assertFalse(stateChanged.get()); + } + + @Test + void shouldSendStartedAndSucceededHeartbeatEvents() throws Exception { + // Given + ConnectionDescription connectionDescription = createDefaultConnectionDescription(); + ServerDescription initialServerDescription = createDefaultServerDescription(); + + String helloResponse = "{" + + LEGACY_HELLO_LOWER + ": true," + + "maxBsonObjectSize : 16777216, " + + "maxMessageSizeBytes : 48000000, " + + "maxWriteBatchSize : 1000, " + + "localTime : ISODate(\"2016-04-05T20:36:36.082Z\"), " + + "maxWireVersion : 4, " + + "minWireVersion : 0, " + + "ok : 1 " + + "}"; + + InternalConnection mockConnection = mock(InternalConnection.class); + when(mockConnection.getDescription()).thenReturn(connectionDescription); + when(mockConnection.getInitialServerDescription()).thenReturn(initialServerDescription); + when(mockConnection.getBuffer(anyInt())).thenReturn(new ByteBufNIO(ByteBuffer.allocate(1024))); + when(mockConnection.receive(any(), any())).thenReturn(BsonDocument.parse(helloResponse)); + + // When + TestServerMonitorListener listener = createTestServerMonitorListener(); + monitor = createAndStartMonitor(createConnectionFactory(mockConnection), listener); + + listener.waitForEvents(ServerHeartbeatSucceededEvent.class, event -> true, 1, Duration.ofSeconds(30)); + ServerHeartbeatStartedEvent startedEvent = getEvent(ServerHeartbeatStartedEvent.class, listener); + ServerHeartbeatSucceededEvent succeededEvent = getEvent(ServerHeartbeatSucceededEvent.class, listener); + + // Then + assertEquals(connectionDescription.getConnectionId(), startedEvent.getConnectionId()); + assertEquals(connectionDescription.getConnectionId(), succeededEvent.getConnectionId()); + assertEquals(BsonDocument.parse(helloResponse), succeededEvent.getReply()); + assertTrue(succeededEvent.getElapsedTime(TimeUnit.NANOSECONDS) > 0); + } + + @Test + void shouldSendStartedAndFailedHeartbeatEvents() throws Exception { + // Given + ConnectionDescription connectionDescription = createDefaultConnectionDescription(); + ServerDescription initialServerDescription = createDefaultServerDescription(); + MongoSocketReadTimeoutException exception = new MongoSocketReadTimeoutException("read timeout", + new ServerAddress(), new IOException()); + + InternalConnection mockConnection = mock(InternalConnection.class); + when(mockConnection.getDescription()).thenReturn(connectionDescription); + when(mockConnection.getInitialServerDescription()).thenReturn(initialServerDescription); + when(mockConnection.getBuffer(anyInt())).thenReturn(new ByteBufNIO(ByteBuffer.allocate(1024))); + when(mockConnection.receive(any(), any())).thenThrow(exception); + + // When + TestServerMonitorListener listener = createTestServerMonitorListener(); + monitor = createAndStartMonitor(createConnectionFactory(mockConnection), listener); + + listener.waitForEvents(ServerHeartbeatFailedEvent.class, event -> true, 1, Duration.ofSeconds(30)); + ServerHeartbeatStartedEvent startedEvent = getEvent(ServerHeartbeatStartedEvent.class, listener); + ServerHeartbeatFailedEvent failedEvent = getEvent(ServerHeartbeatFailedEvent.class, listener); + + // Then + assertEquals(connectionDescription.getConnectionId(), startedEvent.getConnectionId()); + assertEquals(connectionDescription.getConnectionId(), failedEvent.getConnectionId()); + assertEquals(exception, failedEvent.getThrowable()); + assertTrue(failedEvent.getElapsedTime(TimeUnit.NANOSECONDS) > 0); + } + + @Test + void shouldEmitHeartbeatStartedBeforeSocketIsConnected() throws Exception { + // Given + InternalConnection mockConnection = mock(InternalConnection.class); + CountDownLatch latch = new CountDownLatch(1); + List events = new ArrayList<>(); + ServerMonitorListener listener = new ServerMonitorListener() { + @Override + public void serverHearbeatStarted(final ServerHeartbeatStartedEvent event) { + events.add("serverHeartbeatStartedEvent"); + } + + @Override + public void serverHeartbeatSucceeded(final ServerHeartbeatSucceededEvent event) { + events.add("serverHeartbeatSucceededEvent"); + latch.countDown(); + } + + @Override + public void serverHeartbeatFailed(final ServerHeartbeatFailedEvent event) { + events.add("serverHeartbeatFailedEvent"); + latch.countDown(); + } + }; + + doAnswer(invocation -> { + events.add("client connected"); + return null; + }).when(mockConnection).open(any()); + + when(mockConnection.getBuffer(anyInt())).thenReturn(new ByteBufNIO(ByteBuffer.allocate(1024))); + when(mockConnection.getDescription()).thenReturn(createDefaultConnectionDescription()); + when(mockConnection.getInitialServerDescription()).thenReturn(createDefaultServerDescription()); + + doAnswer(invocation -> { + events.add("client hello received"); + throw new SocketException("Socket error"); + }).when(mockConnection).receive(any(), any()); + + // When + monitor = createAndStartMonitor(createConnectionFactory(mockConnection), listener); + assertTrue(latch.await(5, TimeUnit.SECONDS), "Timed out waiting for heartbeat"); + + // Then + List expectedEvents = asList("serverHeartbeatStartedEvent", "client connected", "client hello received", "serverHeartbeatFailedEvent"); + assertEquals(expectedEvents, events); + } + + + private InternalConnectionFactory createConnectionFactory(final InternalConnection connection) { + InternalConnectionFactory factory = mock(InternalConnectionFactory.class); + when(factory.create(any())).thenReturn(connection); + return factory; + } + + private ServerDescription createDefaultServerDescription() { + return ServerDescription.builder() + .ok(true) + .address(new ServerAddress()) + .type(ServerType.STANDALONE) + .state(ServerConnectionState.CONNECTED) + .build(); + } + + private ConnectionDescription createDefaultConnectionDescription() { + return new ConnectionDescription(new ServerId(new ClusterId(""), new ServerAddress())); + } + + private DefaultServerMonitor createAndStartMonitor(final InternalConnectionFactory factory, final ServerMonitorListener listener) { + DefaultServerMonitor monitor = new DefaultServerMonitor( + new ServerId(new ClusterId(), new ServerAddress()), + ServerSettings.builder() + .heartbeatFrequency(500, TimeUnit.MILLISECONDS) + .addServerMonitorListener(listener) + .build(), + factory, + ClusterConnectionMode.SINGLE, + null, + false, + SameObjectProvider.initialized(mock(SdamServerDescriptionManager.class)), + OPERATION_CONTEXT_FACTORY); + monitor.start(); + return monitor; + } + + private T getEvent(final Class clazz, final TestServerMonitorListener listener) { + return listener.getEvents() + .stream() + .filter(clazz::isInstance) + .map(clazz::cast) + .findFirst() + .orElseThrow(AssertionFailedError::new); + } + + private TestServerMonitorListener createTestServerMonitorListener() { + return new TestServerMonitorListener(asList("serverHeartbeatStartedEvent", "serverHeartbeatSucceededEvent", + "serverHeartbeatFailedEvent")); + } +} diff --git a/driver-sync/src/test/functional/com/mongodb/client/ClusterEventPublishingTest.java b/driver-sync/src/test/functional/com/mongodb/client/ClusterEventPublishingTest.java index 6b10e475249..e390d4c3afc 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/ClusterEventPublishingTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/ClusterEventPublishingTest.java @@ -155,17 +155,18 @@ public void serverDescriptionChanged(final ServerDescriptionChangedEvent event) @Override public void serverHearbeatStarted(final ServerHeartbeatStartedEvent event) { events.add(event); - heartbeatLatch.countDown(); } @Override public void serverHeartbeatSucceeded(final ServerHeartbeatSucceededEvent event) { events.add(event); + heartbeatLatch.countDown(); } @Override public void serverHeartbeatFailed(final ServerHeartbeatFailedEvent event) { events.add(event); + heartbeatLatch.countDown(); } } } diff --git a/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringProseTests.java b/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringProseTests.java index 1887b2006cd..77883b6be73 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringProseTests.java +++ b/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringProseTests.java @@ -166,7 +166,7 @@ public void serverDescriptionChanged(final ServerDescriptionChangedEvent event) * Connection Pool Management. */ @Test - @Ignore + @Ignore("JAVA-4484 - events are not guaranteed to be delivered in order") @SuppressWarnings("try") public void testConnectionPoolManagement() throws InterruptedException { assumeTrue(serverVersionAtLeast(4, 3)); @@ -232,7 +232,7 @@ public void connectionPoolCleared(final ConnectionPoolClearedEvent event) { */ @Test @SuppressWarnings("try") - public void monitorsSleepAtLeastMinHeartbeatFreqencyMSBetweenChecks() { + public void monitorsSleepAtLeastMinHeartbeatFrequencyMSBetweenChecks() { assumeTrue(serverVersionAtLeast(4, 3)); assumeFalse(isServerlessTest()); long defaultMinHeartbeatIntervalMillis = MongoClientSettings.builder().build().getServerSettings() @@ -267,6 +267,13 @@ public void monitorsSleepAtLeastMinHeartbeatFreqencyMSBetweenChecks() { } } + @Test + @Ignore("Run as part of DefaultServerMonitorTest") + public void shouldEmitHeartbeatStartedBeforeSocketIsConnected() { + // The implementation of this test is in DefaultServerMonitorTest.shouldEmitHeartbeatStartedBeforeSocketIsConnected + // As it requires mocking and package access to `com.mongodb.internal.connection` + } + private static void assertPoll(final BlockingQueue queue, @Nullable final Class allowed, final Set> required) throws InterruptedException { assertPoll(queue, allowed, required, Timeout.expiresIn(TEST_WAIT_TIMEOUT_MILLIS, MILLISECONDS, ZERO_DURATION_MEANS_EXPIRED));