Skip to content

Commit

Permalink
refactor(controller): consider brokers that has recently `CONTROLLED_…
Browse files Browse the repository at this point in the history
…SHUTDOWN` as `SHUTTING_DOWN` (#2261)

* refactor(controller): consider brokers that has recently `CONTROLLED_SHUTDOWN` as `SHUTTING_DOWN`

Signed-off-by: Ning Yu <[email protected]>

* test: test `BrokerHeartbeatManager#brokerState`

Signed-off-by: Ning Yu <[email protected]>

* revert(NodeState): revert `SHUTDOWN` and `SHUTTING_DOWN` to `FENCED` and `CONTROLLED_SHUTDOWN`

Signed-off-by: Ning Yu <[email protected]>

---------

Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 committed Jan 8, 2025
1 parent 273c134 commit 6b47359
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.stream.NodeState;
import org.apache.kafka.controller.stream.OverloadCircuitBreaker;
import org.apache.kafka.metadata.placement.UsableBroker;

Expand Down Expand Up @@ -83,6 +84,16 @@ static class BrokerHeartbeatState {
*/
private long controlledShutdownOffset;

// AutoMQ inject start
/**
* The last time the broker was controlled shutdown, in monotonic nanoseconds, or 0
* if the broker has never been controlled shutdown since the most recent start.
* It will be updated on receiving a broker heartbeat with controlled shutdown request.
* It will be reset to 0 when the broker is active again.
*/
private long lastControlledShutdownNs;
// AutoMQ inject end

/**
* The previous entry in the unfenced list, or null if the broker is not in that list.
*/
Expand All @@ -100,6 +111,9 @@ static class BrokerHeartbeatState {
this.next = null;
this.metadataOffset = -1;
this.controlledShutdownOffset = -1;
// AutoMQ inject start
this.lastControlledShutdownNs = 0;
// AutoMQ inject end
}

/**
Expand All @@ -122,6 +136,12 @@ boolean fenced() {
boolean shuttingDown() {
return controlledShutdownOffset >= 0;
}

// AutoMQ inject start
long lastControlledShutdownNs() {
return lastControlledShutdownNs;
}
// AutoMQ inject end
}

static class MetadataOffsetComparator implements Comparator<BrokerHeartbeatState> {
Expand Down Expand Up @@ -441,6 +461,9 @@ void maybeUpdateControlledShutdownOffset(int brokerId, long controlledShutDownOf
throw new RuntimeException("Fenced brokers cannot enter controlled shutdown.");
}
active.remove(broker);
// AutoMQ inject start
broker.lastControlledShutdownNs = time.nanoseconds();
// AutoMQ inject end
if (broker.controlledShutdownOffset < 0) {
broker.controlledShutdownOffset = controlledShutDownOffset;
log.debug("Updated the controlled shutdown offset for broker {} to {}.",
Expand Down Expand Up @@ -489,6 +512,24 @@ Iterator<UsableBroker> usableBrokers(
}

// AutoMQ inject start
public NodeState brokerState(int brokerId, long shutdownTimeoutNs) {
BrokerHeartbeatState broker = brokers.get(brokerId);
if (broker == null) {
return NodeState.UNKNOWN;
}
if (broker.shuttingDown()) {
return NodeState.CONTROLLED_SHUTDOWN;
}
if (broker.fenced()) {
if (broker.lastControlledShutdownNs() + shutdownTimeoutNs > time.nanoseconds()) {
// The broker is still in controlled shutdown.
return NodeState.CONTROLLED_SHUTDOWN;
}
return NodeState.FENCED;
}
return NodeState.ACTIVE;
}

long nextCheckTimeNs() {
if (overloadCircuitBreaker.isOverload()) {
return Long.MAX_VALUE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1003,5 +1003,9 @@ public List<BrokerRegistration> getActiveBrokers() {
.filter(b -> isActive(b.id()))
.collect(Collectors.toList());
}

public BrokerHeartbeatManager getHeartbeatManager() {
return heartbeatManager;
}
// AutoMQ inject end
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@

package org.apache.kafka.controller.stream;

import org.apache.kafka.controller.BrokerHeartbeatManager;
import org.apache.kafka.controller.ClusterControlManager;
import org.apache.kafka.metadata.BrokerRegistration;

import java.util.concurrent.TimeUnit;

public class DefaultNodeRuntimeInfoGetter implements NodeRuntimeInfoGetter {
private static final long SHUTDOWN_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(60);

private final ClusterControlManager clusterControlManager;
private final StreamControlManager streamControlManager;

Expand All @@ -25,17 +29,12 @@ public DefaultNodeRuntimeInfoGetter(ClusterControlManager clusterControlManager,

@Override
public NodeState state(int nodeId) {
BrokerRegistration brokerRegistration = clusterControlManager.registration(nodeId);
if (brokerRegistration == null) {
BrokerHeartbeatManager brokerHeartbeatManager = clusterControlManager.getHeartbeatManager();
if (null == brokerHeartbeatManager) {
// This controller is not the active controller, so we don't have the heartbeat manager.
return NodeState.UNKNOWN;
}
if (brokerRegistration.fenced()) {
return NodeState.FENCED;
}
if (brokerRegistration.inControlledShutdown()) {
return NodeState.CONTROLLED_SHUTDOWN;
}
return NodeState.ACTIVE;
return brokerHeartbeatManager.brokerState(nodeId, SHUTDOWN_TIMEOUT_NS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,25 @@

package org.apache.kafka.controller.stream;

import org.apache.kafka.controller.BrokerControlState;

public enum NodeState {
ACTIVE, FENCED, CONTROLLED_SHUTDOWN, UNKNOWN
/**
* The node is active and can handle requests.
*/
ACTIVE,
/**
* The node is shut down and cannot handle requests.
*/
FENCED,
/**
* The node is shutting down in a controlled manner.
* Note: In AutoMQ, this state is different from {@link BrokerControlState#CONTROLLED_SHUTDOWN}. In some cases,
* a node in {@link BrokerControlState#FENCED} state may still be shutting down in a controlled manner.
*/
CONTROLLED_SHUTDOWN,
/**
* The state of the node is unknown, possibly because it has not yet registered.
*/
UNKNOWN
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.kafka.controller.BrokerHeartbeatManager.BrokerHeartbeatStateIterator;
import org.apache.kafka.controller.BrokerHeartbeatManager.BrokerHeartbeatStateList;
import org.apache.kafka.controller.BrokerHeartbeatManager.UsableBrokerIterator;
import org.apache.kafka.controller.stream.NodeState;
import org.apache.kafka.metadata.placement.UsableBroker;

import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -362,4 +363,43 @@ public void testTouchThrowsExceptionUnlessRegistered() {
assertThrows(IllegalStateException.class,
() -> manager.touch(4, false, 0)).getMessage());
}

// AutoMQ inject start
@Test
public void testBrokerState() {
final long shutdownTimeoutNs = 10_000_000; // 10ms
// init
BrokerHeartbeatManager manager = newBrokerHeartbeatManager();
manager.time().sleep(1000);
manager.register(0, true);

// FENCED Broker
assertEquals(NodeState.FENCED, manager.brokerState(0, shutdownTimeoutNs));

// UNFENCED Broker
manager.touch(0, false, 100);
assertEquals(NodeState.ACTIVE, manager.brokerState(0, shutdownTimeoutNs));

// CONTROLLED_SHUTDOWN Broker
manager.maybeUpdateControlledShutdownOffset(0, 100);
assertEquals(NodeState.CONTROLLED_SHUTDOWN, manager.brokerState(0, shutdownTimeoutNs));

// SHUTDOWN_NOW Broker within shutdownTimeoutNs
manager.touch(0, true, 100);
manager.time().sleep(5);
assertEquals(NodeState.CONTROLLED_SHUTDOWN, manager.brokerState(0, shutdownTimeoutNs));

// SHUTDOWN_NOW Broker after shutdownTimeoutNs
manager.time().sleep(6);
assertEquals(NodeState.FENCED, manager.brokerState(0, shutdownTimeoutNs));

// UNFENCED Broker after SHUTDOWN
manager.touch(0, false, 100);
assertEquals(NodeState.ACTIVE, manager.brokerState(0, shutdownTimeoutNs));

// UNREGISTERED Broker
manager.remove(0);
assertEquals(NodeState.UNKNOWN, manager.brokerState(0, shutdownTimeoutNs));
}
// AutoMQ inject end
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public void testRegister() {
assertEquals(0, nodes.get(0).nodeId());
assertEquals(2L, nodes.get(0).nodeEpoch());
assertEquals("wal2", nodes.get(0).walConfig());
assertEquals(NodeState.FENCED.name(), nodes.get(0).state());
}

AutomqRegisterNodeRequestData.TagCollection tags(Map<String, String> tags) {
Expand Down

0 comments on commit 6b47359

Please sign in to comment.