Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ public enum RebalanceStatus {
REBALANCING(1),
FAILED(2),
COMPLETED(3),
CANCELED(4);
CANCELED(4),
TIMEOUT(5);

public static final Set<RebalanceStatus> FINAL_STATUSES =
new HashSet<>(Arrays.asList(COMPLETED, CANCELED, FAILED));
new HashSet<>(Arrays.asList(COMPLETED, CANCELED, FAILED, TIMEOUT));

private final int code;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.apache.fluss.server.coordinator.event.NotifyLakeTableOffsetEvent;
import org.apache.fluss.server.coordinator.event.NotifyLeaderAndIsrResponseReceivedEvent;
import org.apache.fluss.server.coordinator.event.RebalanceEvent;
import org.apache.fluss.server.coordinator.event.RebalanceTaskTimeoutEvent;
import org.apache.fluss.server.coordinator.event.RemoveServerTagEvent;
import org.apache.fluss.server.coordinator.event.SchemaChangeEvent;
import org.apache.fluss.server.coordinator.event.TableRegistrationChangeEvent;
Expand Down Expand Up @@ -122,6 +123,7 @@
import org.apache.fluss.server.zk.data.lake.LakeTableHelper;
import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
import org.apache.fluss.utils.AutoPartitionStrategy;
import org.apache.fluss.utils.clock.SystemClock;
import org.apache.fluss.utils.types.Tuple2;

import org.slf4j.Logger;
Expand Down Expand Up @@ -249,7 +251,9 @@ public CoordinatorEventProcessor(
this.lakeTableTieringManager = lakeTableTieringManager;
this.coordinatorMetricGroup = coordinatorMetricGroup;
this.internalListenerName = conf.getString(ConfigOptions.INTERNAL_LISTENER_NAME);
this.rebalanceManager = new RebalanceManager(this, zooKeeperClient);
this.rebalanceManager =
new RebalanceManager(
this, zooKeeperClient, coordinatorEventManager, SystemClock.getInstance());
this.ioExecutor = ioExecutor;
this.lakeTableHelper =
new LakeTableHelper(zooKeeperClient, conf.getString(ConfigOptions.REMOTE_DATA_DIR));
Expand Down Expand Up @@ -302,6 +306,7 @@ public void startup() {

// start rebalance manager.
rebalanceManager.startup();
rebalanceManager.start();
}

public void shutdown() {
Expand Down Expand Up @@ -645,6 +650,13 @@ public void process(CoordinatorEvent event) {
completeFromCallable(
cancelRebalanceEvent.getRespCallback(),
() -> processCancelRebalance(cancelRebalanceEvent));
} else if (event instanceof RebalanceTaskTimeoutEvent) {
RebalanceTaskTimeoutEvent timeoutEvent = (RebalanceTaskTimeoutEvent) event;
LOG.warn(
"Rebalance task for {} timed out. Treating as timeout.",
timeoutEvent.getTableBucket());
rebalanceManager.finishRebalanceTask(
timeoutEvent.getTableBucket(), RebalanceStatus.TIMEOUT);
} else if (event instanceof ListRebalanceProgressEvent) {
ListRebalanceProgressEvent listRebalanceProgressEvent =
(ListRebalanceProgressEvent) event;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.server.coordinator.event;

import org.apache.fluss.metadata.TableBucket;

/** An event fired when a rebalance task exceeds the timeout without completing. */
public class RebalanceTaskTimeoutEvent implements CoordinatorEvent {

private final TableBucket tableBucket;

public RebalanceTaskTimeoutEvent(TableBucket tableBucket) {
this.tableBucket = tableBucket;
}

public TableBucket getTableBucket() {
return tableBucket;
}

@Override
public String toString() {
return "RebalanceTaskTimeoutEvent{tableBucket=" + tableBucket + "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.server.coordinator.CoordinatorContext;
import org.apache.fluss.server.coordinator.CoordinatorEventProcessor;
import org.apache.fluss.server.coordinator.event.EventManager;
import org.apache.fluss.server.coordinator.event.RebalanceTaskTimeoutEvent;
import org.apache.fluss.server.coordinator.rebalance.goal.Goal;
import org.apache.fluss.server.coordinator.rebalance.goal.GoalOptimizer;
import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
Expand All @@ -36,6 +38,9 @@
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.server.zk.data.LeaderAndIsr;
import org.apache.fluss.server.zk.data.RebalanceTask;
import org.apache.fluss.utils.clock.Clock;
import org.apache.fluss.utils.clock.SystemClock;
import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -53,6 +58,9 @@
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static org.apache.fluss.cluster.rebalance.RebalanceStatus.CANCELED;
import static org.apache.fluss.cluster.rebalance.RebalanceStatus.COMPLETED;
Expand All @@ -70,8 +78,17 @@
public class RebalanceManager {
private static final Logger LOG = LoggerFactory.getLogger(RebalanceManager.class);

/** Hardcoded timeout for an in-flight rebalance task: 2 minutes. */
private static final long REBALANCE_TASK_TIMEOUT_MS = 2 * 60 * 1000L;
Comment thread
swuferhong marked this conversation as resolved.

/** Hardcoded interval for the periodic timeout check: 30 seconds. */
private static final long TIMEOUT_CHECK_INTERVAL_MS = 30 * 1000L;

private final ZooKeeperClient zkClient;
private final CoordinatorEventProcessor eventProcessor;
private final EventManager eventManager;
private final Clock clock;
private final ScheduledExecutorService timeoutChecker;

/** A queue of in progress table bucket to rebalance. */
private final Queue<TableBucket> inProgressRebalanceTasksQueue = new ArrayDeque<>();
Expand All @@ -90,9 +107,46 @@ public class RebalanceManager {
private volatile @Nullable String currentRebalanceId;
private volatile boolean isClosed = false;

public RebalanceManager(CoordinatorEventProcessor eventProcessor, ZooKeeperClient zkClient) {
/**
* Timestamp when the current in-flight task was started, or -1 if idle.
*
* <p>Write ordering contract (volatile publication idiom): always write {@code
* inflightTaskStartMs} BEFORE {@code inflightTaskBucket} when setting, and clear {@code
* inflightTaskBucket} BEFORE {@code inflightTaskStartMs} when resetting. The timeout checker
* reads in reverse order (bucket first, then startMs), ensuring it never observes a stale
* startMs paired with a new bucket.
*/
private volatile long inflightTaskStartMs = -1;

/** The bucket of the current in-flight task, or null if idle. Acts as the "gate" variable. */
private volatile @Nullable TableBucket inflightTaskBucket;

public RebalanceManager(
CoordinatorEventProcessor eventProcessor,
ZooKeeperClient zkClient,
EventManager eventManager,
Clock clock) {
this(
eventProcessor,
zkClient,
eventManager,
clock,
Executors.newScheduledThreadPool(
1, new ExecutorThreadFactory("rebalance-timeout")));
}

@VisibleForTesting
RebalanceManager(
CoordinatorEventProcessor eventProcessor,
ZooKeeperClient zkClient,
EventManager eventManager,
Clock clock,
ScheduledExecutorService timeoutChecker) {
this.eventProcessor = eventProcessor;
this.zkClient = zkClient;
this.eventManager = eventManager;
this.clock = clock == null ? SystemClock.getInstance() : clock;
this.timeoutChecker = timeoutChecker;
this.goalOptimizer = new GoalOptimizer();
}

Expand All @@ -101,6 +155,19 @@ public void startup() {
initialize();
}

/** Starts the periodic timeout checker. Call after {@link #startup()}. */
public void start() {
timeoutChecker.scheduleWithFixedDelay(
this::checkTimeoutSafely,
TIMEOUT_CHECK_INTERVAL_MS,
TIMEOUT_CHECK_INTERVAL_MS,
TimeUnit.MILLISECONDS);
LOG.info(
"RebalanceManager timeout checker started: timeoutMs={}, checkIntervalMs={}",
REBALANCE_TASK_TIMEOUT_MS,
TIMEOUT_CHECK_INTERVAL_MS);
}

public @Nullable String getRebalanceId() {
return currentRebalanceId;
}
Expand Down Expand Up @@ -132,6 +199,9 @@ public void registerRebalance(
inProgressRebalanceTasks.clear();
inProgressRebalanceTasksQueue.clear();
finishedRebalanceTasks.clear();
// Clear gate (bucket) first, then data (startMs).
inflightTaskBucket = null;
inflightTaskStartMs = -1;

currentRebalanceId = rebalanceId;
if (rebalancePlan.isEmpty()) {
Expand Down Expand Up @@ -170,6 +240,9 @@ public void finishRebalanceTask(TableBucket tableBucket, RebalanceStatus statusF
finishedRebalanceTasks.put(
tableBucket,
RebalanceResultForBucket.of(resultForBucket.plan(), statusForBucket));
// Clear gate (bucket) first, then data (startMs).
inflightTaskBucket = null;
inflightTaskStartMs = -1;
LOG.info(
"Rebalance task {} in progress: {} tasks pending, {} completed.",
currentRebalanceId,
Expand Down Expand Up @@ -250,6 +323,9 @@ public void cancelRebalance(@Nullable String rebalanceId) {
rebalanceStatus = CANCELED;
inProgressRebalanceTasksQueue.clear();
inProgressRebalanceTasks.clear();
// Clear gate (bucket) first, then data (startMs).
inflightTaskBucket = null;
inflightTaskStartMs = -1;
// Here, it will not clear finishedRebalanceTasks, because it will be used by
// listRebalanceProgress. It will be cleared when next register.

Expand Down Expand Up @@ -302,6 +378,9 @@ public RebalanceTask generateRebalanceTask(List<Goal> goalsByPriority) {
private void processNewRebalanceTask() {
TableBucket tableBucket = inProgressRebalanceTasksQueue.peek();
if (tableBucket != null && inProgressRebalanceTasks.containsKey(tableBucket)) {
// Write data (startMs) first, then publish gate (bucket).
inflightTaskStartMs = clock.milliseconds();
Copy link
Copy Markdown
Contributor

@LiebingYu LiebingYu Jun 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: inflightTaskBucket and inflightTaskStartMs rely on a fragile implicit write-ordering invariant

Currently this is safe because every clear path writes inflightTaskStartMs = -1 before clearing inflightTaskBucket, and processNewRebalanceTask() writes inflightTaskBucket before inflightTaskStartMs = now(). Since checkTimeout() reads inflightTaskStartMs first and returns immediately if it is -1, no dangerous interleaving can occur today.

However, this correctness depends on the write order being maintained across four separate call sites, with no compile-time enforcement. If a future change swaps the write order in any of these paths (e.g., writes inflightTaskStartMs before inflightTaskBucket in processNewRebalanceTask), the timeout thread could read a stale bucket or a stale startMs and fire a spurious timeout for the wrong task.

Consider combining both fields into a single immutable holder behind an AtomicReference to make the invariant structurally unbreakable rather than relying on write ordering:

private final AtomicReference<InflightTask> inflightTask = new AtomicReference<>();

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch on the ordering fragility. Fixed by applying the volatile publication idiom — inflightTaskBucket now acts as the "gate" variable: written last when setting, cleared first when resetting. The timeout checker reads it first, so JMM happens-before guarantees it always sees a consistent (bucket, startMs) pair. Added comments documenting the contract.

inflightTaskBucket = tableBucket;
RebalanceResultForBucket resultForBucket = inProgressRebalanceTasks.get(tableBucket);
RebalanceResultForBucket rebalanceResultForBucket =
RebalanceResultForBucket.of(resultForBucket.plan(), REBALANCING);
Expand Down Expand Up @@ -400,12 +479,46 @@ private ClusterModel initialClusterModel(Map<Integer, ServerModel> serverModelMa
return new ClusterModel(servers);
}

private void checkTimeoutSafely() {
try {
checkTimeout();
} catch (Throwable t) {
LOG.error("Unexpected error in RebalanceManager timeout check.", t);
}
}

@VisibleForTesting
void checkTimeout() {
// Read gate (bucket) first, then data (startMs).
// If bucket is non-null, happens-before guarantees startMs is at least as
// fresh as the value written before bucket was published.
TableBucket bucket = inflightTaskBucket;
long startMs = inflightTaskStartMs;
if (bucket == null || startMs < 0) {
return;
}
long elapsed = clock.milliseconds() - startMs;
if (elapsed > REBALANCE_TASK_TIMEOUT_MS) {
LOG.warn(
"In-flight rebalance task for {} timed out after {}ms. "
+ "Treating it as timed out and advancing to the next task.",
bucket,
elapsed);
// Clear gate (bucket) first, then data (startMs), matching the
// publication idiom so the next checkTimeout sees bucket==null.
inflightTaskBucket = null;
inflightTaskStartMs = -1;
eventManager.put(new RebalanceTaskTimeoutEvent(bucket));
}
}

private void checkNotClosed() {
checkArgument(!isClosed, "RebalanceManager is already closed.");
}

public void close() {
isClosed = true;
timeoutChecker.shutdownNow();
}

@VisibleForTesting
Expand Down
Loading