-
Notifications
You must be signed in to change notification settings - Fork 557
[server] Add per-task timeout for RebalanceManager to prevent indefinite blocking #3429
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.fluss.server.coordinator.event; | ||
|
|
||
| import org.apache.fluss.metadata.TableBucket; | ||
|
|
||
| /** An event fired when a rebalance task exceeds the timeout without completing. It carries only the {@link TableBucket} of the task that timed out, exposed through {@link #getTableBucket()}. */ | ||
| public class RebalanceTaskTimeoutEvent implements CoordinatorEvent { | ||
|
|
||
| /** The bucket whose rebalance task timed out; assigned once in the constructor. */ private final TableBucket tableBucket; | ||
|
|
||
| /** Creates the event for the given bucket. The argument is stored as-is (no null check is performed here). */ public RebalanceTaskTimeoutEvent(TableBucket tableBucket) { | ||
| this.tableBucket = tableBucket; | ||
| } | ||
|
|
||
| /** Returns the bucket that was passed to the constructor. */ public TableBucket getTableBucket() { | ||
| return tableBucket; | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return "RebalanceTaskTimeoutEvent{tableBucket=" + tableBucket + "}"; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,6 +27,8 @@ | |
| import org.apache.fluss.metadata.TableBucket; | ||
| import org.apache.fluss.server.coordinator.CoordinatorContext; | ||
| import org.apache.fluss.server.coordinator.CoordinatorEventProcessor; | ||
| import org.apache.fluss.server.coordinator.event.EventManager; | ||
| import org.apache.fluss.server.coordinator.event.RebalanceTaskTimeoutEvent; | ||
| import org.apache.fluss.server.coordinator.rebalance.goal.Goal; | ||
| import org.apache.fluss.server.coordinator.rebalance.goal.GoalOptimizer; | ||
| import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; | ||
|
|
@@ -36,6 +38,9 @@ | |
| import org.apache.fluss.server.zk.ZooKeeperClient; | ||
| import org.apache.fluss.server.zk.data.LeaderAndIsr; | ||
| import org.apache.fluss.server.zk.data.RebalanceTask; | ||
| import org.apache.fluss.utils.clock.Clock; | ||
| import org.apache.fluss.utils.clock.SystemClock; | ||
| import org.apache.fluss.utils.concurrent.ExecutorThreadFactory; | ||
|
|
||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
@@ -53,6 +58,9 @@ | |
| import java.util.TreeSet; | ||
| import java.util.UUID; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.ScheduledExecutorService; | ||
| import java.util.concurrent.TimeUnit; | ||
|
|
||
| import static org.apache.fluss.cluster.rebalance.RebalanceStatus.CANCELED; | ||
| import static org.apache.fluss.cluster.rebalance.RebalanceStatus.COMPLETED; | ||
|
|
@@ -70,8 +78,17 @@ | |
| public class RebalanceManager { | ||
| private static final Logger LOG = LoggerFactory.getLogger(RebalanceManager.class); | ||
|
|
||
| /** Hardcoded timeout for an in-flight rebalance task: 2 minutes. */ | ||
| private static final long REBALANCE_TASK_TIMEOUT_MS = 2 * 60 * 1000L; | ||
|
|
||
| /** Hardcoded interval for the periodic timeout check: 30 seconds. */ | ||
| private static final long TIMEOUT_CHECK_INTERVAL_MS = 30 * 1000L; | ||
|
|
||
| private final ZooKeeperClient zkClient; | ||
| private final CoordinatorEventProcessor eventProcessor; | ||
| private final EventManager eventManager; | ||
| private final Clock clock; | ||
| private final ScheduledExecutorService timeoutChecker; | ||
|
|
||
| /** A queue of in progress table bucket to rebalance. */ | ||
| private final Queue<TableBucket> inProgressRebalanceTasksQueue = new ArrayDeque<>(); | ||
|
|
@@ -90,9 +107,46 @@ public class RebalanceManager { | |
| private volatile @Nullable String currentRebalanceId; | ||
| private volatile boolean isClosed = false; | ||
|
|
||
| public RebalanceManager(CoordinatorEventProcessor eventProcessor, ZooKeeperClient zkClient) { | ||
| /** | ||
| * Timestamp when the current in-flight task was started, or -1 if idle. | ||
| * | ||
| * <p>Write ordering contract (volatile publication idiom): always write {@code | ||
| * inflightTaskStartMs} BEFORE {@code inflightTaskBucket} when setting, and clear {@code | ||
| * inflightTaskBucket} BEFORE {@code inflightTaskStartMs} when resetting. The timeout checker | ||
| * reads in reverse order (bucket first, then startMs), ensuring it never observes a stale | ||
| * startMs paired with a new bucket. | ||
| */ | ||
| private volatile long inflightTaskStartMs = -1; | ||
|
|
||
| /** The bucket of the current in-flight task, or null if idle. Acts as the "gate" variable. */ | ||
| private volatile @Nullable TableBucket inflightTaskBucket; | ||
|
|
||
| /** Creates a manager whose timeout checker is a newly created scheduled thread pool of size 1, built with an {@code ExecutorThreadFactory("rebalance-timeout")}. All other arguments are passed unchanged to the five-argument constructor. */ public RebalanceManager( | ||
| CoordinatorEventProcessor eventProcessor, | ||
| ZooKeeperClient zkClient, | ||
| EventManager eventManager, | ||
| Clock clock) { | ||
| this( | ||
| eventProcessor, | ||
| zkClient, | ||
| eventManager, | ||
| clock, | ||
| Executors.newScheduledThreadPool( | ||
| 1, new ExecutorThreadFactory("rebalance-timeout"))); | ||
| } | ||
|
|
||
| /** Creates a manager with an explicitly supplied executor for the timeout check, so tests can control scheduling. A {@code null} clock falls back to {@code SystemClock.getInstance()}; the other arguments are stored as given. Nothing is scheduled here; scheduling happens in {@link #start()}. */ @VisibleForTesting | ||
| RebalanceManager( | ||
| CoordinatorEventProcessor eventProcessor, | ||
| ZooKeeperClient zkClient, | ||
| EventManager eventManager, | ||
| Clock clock, | ||
| ScheduledExecutorService timeoutChecker) { | ||
| this.eventProcessor = eventProcessor; | ||
| this.zkClient = zkClient; | ||
| this.eventManager = eventManager; | ||
| this.clock = clock == null ? SystemClock.getInstance() : clock; | ||
| this.timeoutChecker = timeoutChecker; | ||
| this.goalOptimizer = new GoalOptimizer(); | ||
| } | ||
|
|
||
|
|
@@ -101,6 +155,19 @@ public void startup() { | |
| initialize(); | ||
| } | ||
|
|
||
| /** Starts the periodic timeout checker: schedules {@code checkTimeoutSafely} with fixed delay, using {@code TIMEOUT_CHECK_INTERVAL_MS} as both the initial delay and the delay between runs, then logs the configured timeout and interval. Call after {@link #startup()}. NOTE(review): each call schedules another periodic check, so callers presumably invoke this once; confirm. */ | ||
| public void start() { | ||
| timeoutChecker.scheduleWithFixedDelay( | ||
| this::checkTimeoutSafely, | ||
| TIMEOUT_CHECK_INTERVAL_MS, | ||
| TIMEOUT_CHECK_INTERVAL_MS, | ||
| TimeUnit.MILLISECONDS); | ||
| LOG.info( | ||
| "RebalanceManager timeout checker started: timeoutMs={}, checkIntervalMs={}", | ||
| REBALANCE_TASK_TIMEOUT_MS, | ||
| TIMEOUT_CHECK_INTERVAL_MS); | ||
| } | ||
|
|
||
| public @Nullable String getRebalanceId() { | ||
| return currentRebalanceId; | ||
| } | ||
|
|
@@ -132,6 +199,9 @@ public void registerRebalance( | |
| inProgressRebalanceTasks.clear(); | ||
| inProgressRebalanceTasksQueue.clear(); | ||
| finishedRebalanceTasks.clear(); | ||
| // Clear gate (bucket) first, then data (startMs). | ||
| inflightTaskBucket = null; | ||
| inflightTaskStartMs = -1; | ||
|
|
||
| currentRebalanceId = rebalanceId; | ||
| if (rebalancePlan.isEmpty()) { | ||
|
|
@@ -170,6 +240,9 @@ public void finishRebalanceTask(TableBucket tableBucket, RebalanceStatus statusF | |
| finishedRebalanceTasks.put( | ||
| tableBucket, | ||
| RebalanceResultForBucket.of(resultForBucket.plan(), statusForBucket)); | ||
| // Clear gate (bucket) first, then data (startMs). | ||
| inflightTaskBucket = null; | ||
| inflightTaskStartMs = -1; | ||
| LOG.info( | ||
| "Rebalance task {} in progress: {} tasks pending, {} completed.", | ||
| currentRebalanceId, | ||
|
|
@@ -250,6 +323,9 @@ public void cancelRebalance(@Nullable String rebalanceId) { | |
| rebalanceStatus = CANCELED; | ||
| inProgressRebalanceTasksQueue.clear(); | ||
| inProgressRebalanceTasks.clear(); | ||
| // Clear gate (bucket) first, then data (startMs). | ||
| inflightTaskBucket = null; | ||
| inflightTaskStartMs = -1; | ||
| // Here, it will not clear finishedRebalanceTasks, because it will be used by | ||
| // listRebalanceProgress. It will be cleared when next register. | ||
|
|
||
|
|
@@ -302,6 +378,9 @@ public RebalanceTask generateRebalanceTask(List<Goal> goalsByPriority) { | |
| private void processNewRebalanceTask() { | ||
| TableBucket tableBucket = inProgressRebalanceTasksQueue.peek(); | ||
| if (tableBucket != null && inProgressRebalanceTasks.containsKey(tableBucket)) { | ||
| // Write data (startMs) first, then publish gate (bucket). | ||
| inflightTaskStartMs = clock.milliseconds(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nit: `inflightTaskBucket` and `inflightTaskStartMs` rely on a fragile, implicit write-ordering invariant. Currently this is safe because every clear path writes `inflightTaskStartMs = -1` before clearing `inflightTaskBucket`, and `processNewRebalanceTask()` writes `inflightTaskBucket` before `inflightTaskStartMs = now()`. Since `checkTimeout()` reads `inflightTaskStartMs` first and returns immediately if it is -1, no dangerous interleaving can occur today. However, this correctness depends on the write order being maintained across four separate call sites, with no compile-time enforcement. If a future change swaps the write order in any of these paths (e.g., writes `inflightTaskStartMs` before `inflightTaskBucket` in `processNewRebalanceTask`), the timeout thread could read a stale bucket or a stale startMs and fire a spurious timeout for the wrong task. Consider combining both fields into a single immutable holder behind an `AtomicReference` to make the invariant structurally unbreakable rather than relying on write ordering:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Good catch on the ordering fragility. Fixed by applying the volatile publication idiom — |
||
| inflightTaskBucket = tableBucket; | ||
| RebalanceResultForBucket resultForBucket = inProgressRebalanceTasks.get(tableBucket); | ||
| RebalanceResultForBucket rebalanceResultForBucket = | ||
| RebalanceResultForBucket.of(resultForBucket.plan(), REBALANCING); | ||
|
|
@@ -400,12 +479,46 @@ private ClusterModel initialClusterModel(Map<Integer, ServerModel> serverModelMa | |
| return new ClusterModel(servers); | ||
| } | ||
|
|
||
| /** Runs {@link #checkTimeout()} and logs any {@code Throwable} instead of propagating it. This is the task scheduled by {@link #start()}; per the {@code ScheduledExecutorService} contract an exception escaping a periodic task suppresses its subsequent executions, so nothing is allowed to escape here. */ private void checkTimeoutSafely() { | ||
| try { | ||
| checkTimeout(); | ||
| } catch (Throwable t) { | ||
| LOG.error("Unexpected error in RebalanceManager timeout check.", t); | ||
| } | ||
| } | ||
|
|
||
| /** Checks whether the in-flight rebalance task has been running longer than {@code REBALANCE_TASK_TIMEOUT_MS}. If so, logs a warning, clears the in-flight tracking fields and puts a {@link RebalanceTaskTimeoutEvent} for that bucket on the event manager. Returns without doing anything when no task is in flight or the timeout has not been exceeded. */ @VisibleForTesting | ||
| void checkTimeout() { | ||
| // Snapshot the in-flight task. Read gate (bucket) first, then data (startMs): the setter | ||
| // writes startMs before publishing bucket, so a non-null bucket guarantees startMs is at | ||
| // least as fresh as the value written for it. Null bucket or negative startMs means idle. | ||
| TableBucket bucket = inflightTaskBucket; | ||
| long startMs = inflightTaskStartMs; | ||
| if (bucket == null || startMs < 0) { | ||
| return; | ||
| } | ||
| long elapsed = clock.milliseconds() - startMs; | ||
| if (elapsed > REBALANCE_TASK_TIMEOUT_MS) { | ||
| LOG.warn( | ||
| "In-flight rebalance task for {} timed out after {}ms. " | ||
| + "Treating it as timed out and advancing to the next task.", | ||
| bucket, | ||
| elapsed); | ||
| // Clear gate (bucket) first, then data (startMs), so the next checkTimeout sees bucket==null and does not fire again for this task. | ||
| // NOTE(review): both writes are unconditional. If another thread finished this task and started the next one after the snapshot above, that newer task's tracking would be cleared too; confirm whether this window matters. | ||
| inflightTaskBucket = null; | ||
| inflightTaskStartMs = -1; | ||
| eventManager.put(new RebalanceTaskTimeoutEvent(bucket)); | ||
| } | ||
| } | ||
|
|
||
| /** Guard for operations that must not run after {@link #close()}: passes {@code !isClosed} to {@code checkArgument} with the message below (presumably raising IllegalArgumentException when closed; confirm against the Preconditions helper). */ private void checkNotClosed() { | ||
| checkArgument(!isClosed, "RebalanceManager is already closed."); | ||
| } | ||
|
|
||
| /** Marks this manager as closed, then calls {@code shutdownNow()} on the timeout checker executor so no further timeout checks are scheduled. */ public void close() { | ||
| isClosed = true; | ||
| timeoutChecker.shutdownNow(); | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.