Skip to content

Simulate impact of shard movement using shard-level write load #131406

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 26 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
0ebd75d
Estimate impact of shard movement using node-level write load
nicktindall Jul 17, 2025
e589db4
Naming
nicktindall Jul 17, 2025
beb2611
More randomness
nicktindall Jul 17, 2025
4e0fd1d
Merge remote-tracking branch 'origin/main' into ES-12000_add_write_lo…
nicktindall Jul 17, 2025
3f90889
Pedantry
nicktindall Jul 17, 2025
0b1d4a2
Naming
nicktindall Jul 17, 2025
9527720
Merge remote-tracking branch 'origin/main' into ES-12000_add_write_lo…
nicktindall Jul 17, 2025
9e36975
Merge branch 'main' into ES-12000_add_write_load_modeling_to_balancer
nicktindall Jul 21, 2025
9ca9b4b
Use shard write loads instead of estimating
nicktindall Jul 21, 2025
988ac3c
Add javadoc to WriteLoadPerShardSimulator
nicktindall Jul 24, 2025
f5ed735
Explain simulateShardStarted better for the new shard case
nicktindall Jul 24, 2025
8501b37
Assert on scale of utilisation change
nicktindall Jul 24, 2025
8c21cc0
Improve description of relocation
nicktindall Jul 24, 2025
519d1dd
Typo
nicktindall Jul 24, 2025
58e84a2
Rename test to indicate it also tests missing write loads
nicktindall Jul 24, 2025
faccc3d
Always simulate based on original write loads and thread pool stats
nicktindall Jul 24, 2025
edc259a
Use for-loop instead of stream
nicktindall Jul 24, 2025
f60029f
Consolidate similar tests
nicktindall Jul 24, 2025
94687e3
Naming/description of nodeUsageStatsForThreadPools
nicktindall Jul 24, 2025
466a7e0
Naming of test utility methods
nicktindall Jul 24, 2025
827f637
WriteLoadPerShardSimulator -> ShardMovementWriteLoadSimulator
nicktindall Jul 24, 2025
1c876e7
Merge remote-tracking branch 'origin/main' into ES-12000_add_write_lo…
nicktindall Jul 24, 2025
a072aaf
Increase likelihood of write loads and utilizations being 0, floor ut…
nicktindall Jul 24, 2025
6ae4aa4
Pedantry
nicktindall Jul 24, 2025
1dbee7f
Merge branch 'main' into ES-12000_add_write_load_modeling_to_balancer
nicktindall Jul 24, 2025
a4d89b9
Merge branch 'main' into ES-12000_add_write_load_modeling_to_balancer
nicktindall Jul 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package org.elasticsearch.cluster;

import org.elasticsearch.cluster.ClusterInfo.NodeAndShard;
import org.elasticsearch.cluster.routing.ShardMovementWriteLoadSimulator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.util.CopyOnFirstWriteMap;
Expand All @@ -34,7 +35,7 @@ public class ClusterInfoSimulator {
private final Map<ShardId, Long> shardDataSetSizes;
private final Map<NodeAndShard, String> dataPath;
private final Map<String, EstimatedHeapUsage> estimatedHeapUsages;
private final Map<String, NodeUsageStatsForThreadPools> nodeThreadPoolUsageStats;
private final ShardMovementWriteLoadSimulator shardMovementWriteLoadSimulator;

public ClusterInfoSimulator(RoutingAllocation allocation) {
this.allocation = allocation;
Expand All @@ -44,7 +45,7 @@ public ClusterInfoSimulator(RoutingAllocation allocation) {
this.shardDataSetSizes = Map.copyOf(allocation.clusterInfo().shardDataSetSizes);
this.dataPath = Map.copyOf(allocation.clusterInfo().dataPath);
this.estimatedHeapUsages = allocation.clusterInfo().getEstimatedHeapUsages();
this.nodeThreadPoolUsageStats = allocation.clusterInfo().getNodeUsageStatsForThreadPools();
this.shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation);
}

/**
Expand Down Expand Up @@ -115,6 +116,7 @@ public void simulateShardStarted(ShardRouting shard) {
shardSizes.put(shardIdentifierFromRouting(shard), project.getIndexSafe(shard.index()).ignoreDiskWatermarks() ? 0 : size);
}
}
shardMovementWriteLoadSimulator.simulateShardStarted(shard);
}

private void modifyDiskUsage(String nodeId, long freeDelta) {
Expand Down Expand Up @@ -159,7 +161,7 @@ public ClusterInfo getClusterInfo() {
dataPath,
Map.of(),
estimatedHeapUsages,
nodeThreadPoolUsageStats,
shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools(),
allocation.clusterInfo().getShardWriteLoads()
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.routing;

import com.carrotsearch.hppc.ObjectDoubleHashMap;
import com.carrotsearch.hppc.ObjectDoubleMap;

import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Collections;
import java.util.Map;

/**
* Simulates the impact to each node's write-load in response to the movement of individual
* shards around the cluster.
*/
/**
 * Simulates the impact to each node's write-load in response to the movement of individual
 * shards around the cluster.
 *
 * <p>Shard movements are recorded as per-node write-load deltas; the original node usage
 * stats captured at construction time are never mutated. The adjusted view is produced on
 * demand by {@link #simulatedNodeUsageStatsForThreadPools()}.
 */
public class ShardMovementWriteLoadSimulator {

    /** Node usage stats as reported in the cluster info at construction time; never mutated. */
    private final Map<String, NodeUsageStatsForThreadPools> originalNodeUsageStatsForThreadPools;
    /** Accumulated simulated write-load change, keyed by node ID. */
    private final ObjectDoubleMap<String> simulatedWriteLoadDeltas;
    /** Observed write load per shard, from the cluster info. */
    private final Map<ShardId, Double> writeLoadsPerShard;

    public ShardMovementWriteLoadSimulator(RoutingAllocation routingAllocation) {
        this.originalNodeUsageStatsForThreadPools = routingAllocation.clusterInfo().getNodeUsageStatsForThreadPools();
        this.writeLoadsPerShard = routingAllocation.clusterInfo().getShardWriteLoads();
        this.simulatedWriteLoadDeltas = new ObjectDoubleHashMap<>();
    }

    /**
     * Record the write-load impact of a shard that has started (in simulation).
     *
     * <p>For a relocation, the shard's write load is subtracted from the source node's delta
     * and added to the destination node's. For a shard that is not relocating, the load (if
     * known) is added to the node it started on. Shards with no known write load are ignored.
     *
     * @param shardRouting the shard routing entry that just started
     */
    public void simulateShardStarted(ShardRouting shardRouting) {
        final Double writeLoadForShard = writeLoadsPerShard.get(shardRouting.shardId());
        if (writeLoadForShard != null) {
            if (shardRouting.relocatingNodeId() != null) {
                // This is a shard being relocated: its load leaves the source node and arrives at the destination
                simulatedWriteLoadDeltas.addTo(shardRouting.relocatingNodeId(), -writeLoadForShard);
                simulatedWriteLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard);
            } else {
                // This is a new shard starting, it's unlikely we'll have a write-load value for a new
                // shard, but we may be able to estimate if the new shard is created as part of a datastream
                // rollover. See https://elasticco.atlassian.net/browse/ES-12469
                simulatedWriteLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard);
            }
        }
    }

    /**
     * Get the node usage stats with the simulated shard movements applied.
     *
     * <p>Nodes with no accumulated delta are passed through unchanged (same instance); for
     * the others, only the write thread-pool entry is replaced.
     *
     * @return an unmodifiable map of node ID to (possibly adjusted) usage stats
     */
    public Map<String, NodeUsageStatsForThreadPools> simulatedNodeUsageStatsForThreadPools() {
        final Map<String, NodeUsageStatsForThreadPools> adjustedNodeUsageStatsForThreadPools = Maps.newMapWithExpectedSize(
            originalNodeUsageStatsForThreadPools.size()
        );
        for (Map.Entry<String, NodeUsageStatsForThreadPools> entry : originalNodeUsageStatsForThreadPools.entrySet()) {
            if (simulatedWriteLoadDeltas.containsKey(entry.getKey())) {
                var adjustedValue = new NodeUsageStatsForThreadPools(
                    entry.getKey(),
                    Maps.copyMapWithAddedOrReplacedEntry(
                        entry.getValue().threadPoolUsageStatsMap(),
                        // Use the same pool-name constant as the lookup in replaceWritePoolStats (was the literal "write")
                        ThreadPool.Names.WRITE,
                        replaceWritePoolStats(entry.getValue(), simulatedWriteLoadDeltas.get(entry.getKey()))
                    )
                );
                adjustedNodeUsageStatsForThreadPools.put(entry.getKey(), adjustedValue);
            } else {
                adjustedNodeUsageStatsForThreadPools.put(entry.getKey(), entry.getValue());
            }
        }
        return Collections.unmodifiableMap(adjustedNodeUsageStatsForThreadPools);
    }

    /**
     * Build a new write-pool stats entry with {@code writeLoadDelta} spread across the pool's
     * threads and applied to the average utilization. The adjusted utilization is floored at
     * zero; the thread count and queue latency are carried over unchanged.
     *
     * <p>NOTE(review): assumes a node with a recorded delta always has a write-pool entry in
     * its stats map — a node without one would NPE here; confirm with the stats producer.
     *
     * @param value          the node's original usage stats
     * @param writeLoadDelta the accumulated simulated write-load change for the node
     * @return the adjusted write thread-pool stats
     */
    private static NodeUsageStatsForThreadPools.ThreadPoolUsageStats replaceWritePoolStats(
        NodeUsageStatsForThreadPools value,
        double writeLoadDelta
    ) {
        final NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = value.threadPoolUsageStatsMap()
            .get(ThreadPool.Names.WRITE);
        return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
            writeThreadPoolStats.totalThreadPoolThreads(),
            (float) Math.max(
                (writeThreadPoolStats.averageThreadPoolUtilization() + (writeLoadDelta / writeThreadPoolStats.totalThreadPoolThreads())),
                0.0
            ),
            writeThreadPoolStats.averageThreadPoolQueueLatencyMillis()
        );
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.routing;

import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.sameInstance;

public class ShardMovementWriteLoadSimulatorTests extends ESTestCase {

    private static final RoutingChangesObserver NOOP = new RoutingChangesObserver() {
    };
    private static final String[] INDICES = { "indexOne", "indexTwo", "indexThree" };

    /**
     * We should not adjust the values if there's no movement
     */
    public void testNoShardMovement() {
        final var originalNode0ThreadPoolStats = randomThreadPoolUsageStats();
        final var originalNode1ThreadPoolStats = randomThreadPoolUsageStats();
        final var allocation = createRoutingAllocationWithRandomisedWriteLoads(
            originalNode0ThreadPoolStats,
            originalNode1ThreadPoolStats,
            Set.of()
        );

        final var shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation);
        final var calculatedNodeUsageStates = shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools();
        assertThat(calculatedNodeUsageStates, Matchers.aMapWithSize(2));
        // With no simulated movement the simulator must pass the original stats through untouched (same instances)
        assertThat(
            calculatedNodeUsageStates.get("node_0").threadPoolUsageStatsMap().get("write"),
            sameInstance(originalNode0ThreadPoolStats)
        );
        assertThat(
            calculatedNodeUsageStates.get("node_1").threadPoolUsageStatsMap().get("write"),
            sameInstance(originalNode1ThreadPoolStats)
        );
    }

    public void testMovementOfAShardWillMoveThreadPoolUtilisation() {
        final var originalNode0ThreadPoolStats = randomThreadPoolUsageStats();
        final var originalNode1ThreadPoolStats = randomThreadPoolUsageStats();
        final var allocation = createRoutingAllocationWithRandomisedWriteLoads(
            originalNode0ThreadPoolStats,
            originalNode1ThreadPoolStats,
            Set.of()
        );
        final var shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation);

        // Relocate a random shard from node_0 to node_1
        final var randomShard = randomFrom(StreamSupport.stream(allocation.routingNodes().node("node_0").spliterator(), false).toList());
        final var expectedShardSize = randomNonNegativeLong();
        final var moveShardTuple = allocation.routingNodes().relocateShard(randomShard, "node_1", expectedShardSize, "testing", NOOP);
        shardMovementWriteLoadSimulator.simulateShardStarted(moveShardTuple.v2());
        final ShardRouting movedAndStartedShard = allocation.routingNodes().startShard(moveShardTuple.v2(), NOOP, expectedShardSize);

        final var calculatedNodeUsageStats = shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools();
        assertThat(calculatedNodeUsageStats, Matchers.aMapWithSize(2));

        // The shard's write load is spread across each node's write pool threads
        final var shardWriteLoad = allocation.clusterInfo().getShardWriteLoads().get(randomShard.shardId());
        final var expectedUtilisationReductionAtSource = shardWriteLoad / originalNode0ThreadPoolStats.totalThreadPoolThreads();
        final var expectedUtilisationIncreaseAtDestination = shardWriteLoad / originalNode1ThreadPoolStats.totalThreadPoolThreads();

        // Some node_0 utilization should have been moved to node_1
        if (expectedUtilisationReductionAtSource > originalNode0ThreadPoolStats.averageThreadPoolUtilization()) {
            // We don't return utilization less than zero because that makes no sense
            assertThat(getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_0"), equalTo(0.0f));
        } else {
            assertThat(
                (double) originalNode0ThreadPoolStats.averageThreadPoolUtilization() - getAverageWritePoolUtilization(
                    shardMovementWriteLoadSimulator,
                    "node_0"
                ),
                closeTo(expectedUtilisationReductionAtSource, 0.001f)
            );
        }
        assertThat(
            (double) getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_1") - originalNode1ThreadPoolStats
                .averageThreadPoolUtilization(),
            closeTo(expectedUtilisationIncreaseAtDestination, 0.001f)
        );

        // Then move it back
        final var moveBackTuple = allocation.routingNodes()
            .relocateShard(movedAndStartedShard, "node_0", expectedShardSize, "testing", NOOP);
        shardMovementWriteLoadSimulator.simulateShardStarted(moveBackTuple.v2());

        // The utilization numbers should return to their original values
        assertThat(
            getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_0"),
            equalTo(originalNode0ThreadPoolStats.averageThreadPoolUtilization())
        );
        assertThat(
            getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_1"),
            equalTo(originalNode1ThreadPoolStats.averageThreadPoolUtilization())
        );
    }

    public void testMovementBetweenNodesWithNoThreadPoolAndWriteLoadStats() {
        // Either node may be missing thread-pool stats, and any subset of indices may be missing write loads
        final var originalNode0ThreadPoolStats = randomBoolean() ? randomThreadPoolUsageStats() : null;
        final var originalNode1ThreadPoolStats = randomBoolean() ? randomThreadPoolUsageStats() : null;
        final var allocation = createRoutingAllocationWithRandomisedWriteLoads(
            originalNode0ThreadPoolStats,
            originalNode1ThreadPoolStats,
            new HashSet<>(randomSubsetOf(Arrays.asList(INDICES)))
        );
        final var shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation);

        // Relocate a random shard from node_0 to node_1
        final var expectedShardSize = randomNonNegativeLong();
        final var randomShard = randomFrom(StreamSupport.stream(allocation.routingNodes().node("node_0").spliterator(), false).toList());
        final var moveShardTuple = allocation.routingNodes().relocateShard(randomShard, "node_1", expectedShardSize, "testing", NOOP);
        shardMovementWriteLoadSimulator.simulateShardStarted(moveShardTuple.v2());
        allocation.routingNodes().startShard(moveShardTuple.v2(), NOOP, expectedShardSize);

        // Nodes only appear in the simulated view if they reported usage stats in the first place
        final var simulated = shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools();
        assertThat(simulated.containsKey("node_0"), equalTo(originalNode0ThreadPoolStats != null));
        assertThat(simulated.containsKey("node_1"), equalTo(originalNode1ThreadPoolStats != null));
    }

    /**
     * Look up the simulated average write-pool utilization for {@code nodeId}.
     */
    private static float getAverageWritePoolUtilization(ShardMovementWriteLoadSimulator shardMovementWriteLoadSimulator, String nodeId) {
        final var generatedNodeUsageStates = shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools();
        // renamed from node0WritePoolStats: this helper is used for any node, not just node_0
        final var writePoolStats = generatedNodeUsageStates.get(nodeId).threadPoolUsageStatsMap().get("write");
        return writePoolStats.averageThreadPoolUtilization();
    }

    /**
     * Random write-pool stats: 4-16 threads, utilization that is zero or in [0.1, 1.0],
     * and a queue latency of up to a minute.
     */
    private static NodeUsageStatsForThreadPools.ThreadPoolUsageStats randomThreadPoolUsageStats() {
        return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
            randomIntBetween(4, 16),
            randomBoolean() ? 0.0f : randomFloatBetween(0.1f, 1.0f, true),
            randomLongBetween(0, 60_000)
        );
    }

    /**
     * Build a mutable {@link RoutingAllocation} over two nodes and {@link #INDICES}, with the
     * given per-node write-pool stats (either may be null to simulate a node that reported no
     * stats) and random write loads for every shard of every index not in
     * {@code indicesWithNoWriteLoad}.
     */
    private static RoutingAllocation createRoutingAllocationWithRandomisedWriteLoads(
        NodeUsageStatsForThreadPools.ThreadPoolUsageStats node0ThreadPoolStats,
        NodeUsageStatsForThreadPools.ThreadPoolUsageStats node1ThreadPoolStats,
        Set<String> indicesWithNoWriteLoad
    ) {
        final Map<String, NodeUsageStatsForThreadPools> nodeUsageStats = new HashMap<>();
        if (node0ThreadPoolStats != null) {
            nodeUsageStats.put("node_0", new NodeUsageStatsForThreadPools("node_0", Map.of("write", node0ThreadPoolStats)));
        }
        if (node1ThreadPoolStats != null) {
            nodeUsageStats.put("node_1", new NodeUsageStatsForThreadPools("node_1", Map.of("write", node1ThreadPoolStats)));
        }

        final ClusterState clusterState = createClusterState();
        final ClusterInfo clusterInfo = ClusterInfo.builder()
            .nodeUsageStatsForThreadPools(nodeUsageStats)
            .shardWriteLoads(
                clusterState.metadata()
                    .getProject(ProjectId.DEFAULT)
                    .stream()
                    .filter(index -> indicesWithNoWriteLoad.contains(index.getIndex().getName()) == false)
                    .flatMap(index -> IntStream.range(0, 3).mapToObj(shardNum -> new ShardId(index.getIndex(), shardNum)))
                    .collect(
                        Collectors.toUnmodifiableMap(
                            shardId -> shardId,
                            shardId -> randomBoolean() ? 0.0f : randomDoubleBetween(0.1, 5.0, true)
                        )
                    )
            )
            .build();

        return new RoutingAllocation(
            new AllocationDeciders(List.of()),
            clusterState,
            clusterInfo,
            SnapshotShardSizeInfo.EMPTY,
            System.nanoTime()
        ).mutableCloneForSimulation();
    }

    /**
     * Cluster state with three indices of three assigned primary shards each, and no replicas.
     */
    private static ClusterState createClusterState() {
        return ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(INDICES, 3, 0);
    }
}