diff --git a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java index e54615cc77..8be03d2280 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.message.BrokerHeartbeatRequestData; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.controller.stream.NodeState; import org.apache.kafka.controller.stream.OverloadCircuitBreaker; import org.apache.kafka.metadata.placement.UsableBroker; @@ -83,6 +84,16 @@ static class BrokerHeartbeatState { */ private long controlledShutdownOffset; + // AutoMQ inject start + /** + * The last time the broker was controlled shutdown, in monotonic nanoseconds, or 0 + * if the broker has never been controlled shutdown since the most recent start. + * It will be updated on receiving a broker heartbeat with controlled shutdown request. + * It will be reset to 0 when the broker is active again. + */ + private long lastControlledShutdownNs; + // AutoMQ inject end + /** * The previous entry in the unfenced list, or null if the broker is not in that list. */ @@ -100,6 +111,9 @@ static class BrokerHeartbeatState { this.next = null; this.metadataOffset = -1; this.controlledShutdownOffset = -1; + // AutoMQ inject start + this.lastControlledShutdownNs = 0; + // AutoMQ inject end } /** @@ -122,6 +136,12 @@ boolean fenced() { boolean shuttingDown() { return controlledShutdownOffset >= 0; } + + // AutoMQ inject start + long lastControlledShutdownNs() { + return lastControlledShutdownNs; + } + // AutoMQ inject end } static class MetadataOffsetComparator implements Comparator { @@ -441,6 +461,9 @@ void maybeUpdateControlledShutdownOffset(int brokerId, long controlledShutDownOf throw new RuntimeException("Fenced brokers cannot enter controlled shutdown."); } active.remove(broker); + // AutoMQ inject start + broker.lastControlledShutdownNs = time.nanoseconds(); + // AutoMQ inject end if (broker.controlledShutdownOffset < 0) { broker.controlledShutdownOffset = controlledShutDownOffset; log.debug("Updated the controlled shutdown offset for broker {} to {}.", @@ -489,6 +512,24 @@ Iterator usableBrokers( } // AutoMQ inject start + public NodeState brokerState(int brokerId, long shutdownTimeoutNs) { + BrokerHeartbeatState broker = brokers.get(brokerId); + if (broker == null) { + return NodeState.UNKNOWN; + } + if (broker.shuttingDown()) { + return NodeState.CONTROLLED_SHUTDOWN; + } + if (broker.fenced()) { + if (broker.lastControlledShutdownNs() + shutdownTimeoutNs > time.nanoseconds()) { + // The broker is still in controlled shutdown. + return NodeState.CONTROLLED_SHUTDOWN; + } + return NodeState.FENCED; + } + return NodeState.ACTIVE; + } + long nextCheckTimeNs() { if (overloadCircuitBreaker.isOverload()) { return Long.MAX_VALUE; diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java index 428891ec4d..7bd2ad27ed 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java @@ -1003,5 +1003,9 @@ public List getActiveBrokers() { .filter(b -> isActive(b.id())) .collect(Collectors.toList()); } + + public BrokerHeartbeatManager getHeartbeatManager() { + return heartbeatManager; + } // AutoMQ inject end } diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/DefaultNodeRuntimeInfoGetter.java b/metadata/src/main/java/org/apache/kafka/controller/stream/DefaultNodeRuntimeInfoGetter.java index e71d391afa..51a84026c7 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/DefaultNodeRuntimeInfoGetter.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/DefaultNodeRuntimeInfoGetter.java @@ -11,10 +11,14 @@ package org.apache.kafka.controller.stream; +import org.apache.kafka.controller.BrokerHeartbeatManager; import org.apache.kafka.controller.ClusterControlManager; -import org.apache.kafka.metadata.BrokerRegistration; + +import java.util.concurrent.TimeUnit; public class DefaultNodeRuntimeInfoGetter implements NodeRuntimeInfoGetter { + private static final long SHUTDOWN_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(60); + private final ClusterControlManager clusterControlManager; private final StreamControlManager streamControlManager; @@ -25,17 +29,12 @@ public DefaultNodeRuntimeInfoGetter(ClusterControlManager clusterControlManager, @Override public NodeState state(int nodeId) { - BrokerRegistration brokerRegistration = clusterControlManager.registration(nodeId); - if (brokerRegistration == null) { + BrokerHeartbeatManager brokerHeartbeatManager = clusterControlManager.getHeartbeatManager(); + if (null == brokerHeartbeatManager) { + // This controller is not the active controller, so we don't have the heartbeat manager. return NodeState.UNKNOWN; } - if (brokerRegistration.fenced()) { - return NodeState.FENCED; - } - if (brokerRegistration.inControlledShutdown()) { - return NodeState.CONTROLLED_SHUTDOWN; - } - return NodeState.ACTIVE; + return brokerHeartbeatManager.brokerState(nodeId, SHUTDOWN_TIMEOUT_NS); } @Override diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/NodeState.java b/metadata/src/main/java/org/apache/kafka/controller/stream/NodeState.java index 108b08d28e..c21fcd8114 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/NodeState.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/NodeState.java @@ -11,6 +11,25 @@ package org.apache.kafka.controller.stream; +import org.apache.kafka.controller.BrokerControlState; + public enum NodeState { - ACTIVE, FENCED, CONTROLLED_SHUTDOWN, UNKNOWN + /** + * The node is active and can handle requests. + */ + ACTIVE, + /** + * The node is shut down and cannot handle requests. + */ + FENCED, + /** + * The node is shutting down in a controlled manner. + * Note: In AutoMQ, this state is different from {@link BrokerControlState#CONTROLLED_SHUTDOWN}. In some cases, + * a node in {@link BrokerControlState#FENCED} state may still be shutting down in a controlled manner. + */ + CONTROLLED_SHUTDOWN, + /** + * The state of the node is unknown, possibly because it has not yet registered. + */ + UNKNOWN } diff --git a/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java index 9a8776f721..9c96de0b8b 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.controller.BrokerHeartbeatManager.BrokerHeartbeatStateIterator; import org.apache.kafka.controller.BrokerHeartbeatManager.BrokerHeartbeatStateList; import org.apache.kafka.controller.BrokerHeartbeatManager.UsableBrokerIterator; +import org.apache.kafka.controller.stream.NodeState; import org.apache.kafka.metadata.placement.UsableBroker; import org.junit.jupiter.api.Test; @@ -362,4 +363,43 @@ public void testTouchThrowsExceptionUnlessRegistered() { assertThrows(IllegalStateException.class, () -> manager.touch(4, false, 0)).getMessage()); } + + // AutoMQ inject start + @Test + public void testBrokerState() { + final long shutdownTimeoutNs = 10_000_000; // 10ms + // init + BrokerHeartbeatManager manager = newBrokerHeartbeatManager(); + manager.time().sleep(1000); + manager.register(0, true); + + // FENCED Broker + assertEquals(NodeState.FENCED, manager.brokerState(0, shutdownTimeoutNs)); + + // UNFENCED Broker + manager.touch(0, false, 100); + assertEquals(NodeState.ACTIVE, manager.brokerState(0, shutdownTimeoutNs)); + + // CONTROLLED_SHUTDOWN Broker + manager.maybeUpdateControlledShutdownOffset(0, 100); + assertEquals(NodeState.CONTROLLED_SHUTDOWN, manager.brokerState(0, shutdownTimeoutNs)); + + // SHUTDOWN_NOW Broker within shutdownTimeoutNs + manager.touch(0, true, 100); + manager.time().sleep(5); + assertEquals(NodeState.CONTROLLED_SHUTDOWN, manager.brokerState(0, shutdownTimeoutNs)); + + // SHUTDOWN_NOW Broker after shutdownTimeoutNs + manager.time().sleep(6); + assertEquals(NodeState.FENCED, manager.brokerState(0, shutdownTimeoutNs)); + + // UNFENCED Broker after SHUTDOWN + manager.touch(0, false, 100); + assertEquals(NodeState.ACTIVE, manager.brokerState(0, shutdownTimeoutNs)); + + // UNREGISTERED Broker + manager.remove(0); + assertEquals(NodeState.UNKNOWN, manager.brokerState(0, shutdownTimeoutNs)); + } + // AutoMQ inject end } diff --git a/metadata/src/test/java/org/apache/kafka/controller/stream/NodeControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/stream/NodeControlManagerTest.java index 7aadb9403a..b725d9e69a 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/stream/NodeControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/stream/NodeControlManagerTest.java @@ -107,6 +107,7 @@ public void testRegister() { assertEquals(0, nodes.get(0).nodeId()); assertEquals(2L, nodes.get(0).nodeEpoch()); assertEquals("wal2", nodes.get(0).walConfig()); + assertEquals(NodeState.FENCED.name(), nodes.get(0).state()); } AutomqRegisterNodeRequestData.TagCollection tags(Map tags) {