-
Notifications
You must be signed in to change notification settings - Fork 25.3k
Simulate impact of shard movement using shard-level write load #131406
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
0ebd75d
e589db4
beb2611
4e0fd1d
3f90889
0b1d4a2
9527720
9e36975
9ca9b4b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the "Elastic License | ||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
* Public License v 1"; you may not use this file except in compliance with, at | ||
* your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
* License v3.0 only", or the "Server Side Public License, v 1". | ||
*/ | ||
|
||
package org.elasticsearch.cluster.routing; | ||
|
||
import com.carrotsearch.hppc.ObjectDoubleHashMap; | ||
import com.carrotsearch.hppc.ObjectDoubleMap; | ||
|
||
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; | ||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; | ||
import org.elasticsearch.common.util.Maps; | ||
import org.elasticsearch.index.shard.ShardId; | ||
import org.elasticsearch.threadpool.ThreadPool; | ||
|
||
import java.util.Map; | ||
import java.util.stream.Collectors; | ||
|
||
public class WriteLoadPerShardSimulator { | ||
|
||
private final ObjectDoubleMap<String> simulatedWriteLoadDeltas; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a performance gain over Map<String, Double>? I'm wondering why use it, essentially. |
||
private final RoutingAllocation routingAllocation; | ||
private final Map<ShardId, Double> writeLoadsPerShard; | ||
|
||
public WriteLoadPerShardSimulator(RoutingAllocation routingAllocation) { | ||
this.routingAllocation = routingAllocation; | ||
this.simulatedWriteLoadDeltas = new ObjectDoubleHashMap<>(); | ||
writeLoadsPerShard = routingAllocation.clusterInfo().getShardWriteLoads(); | ||
} | ||
|
||
public void simulateShardStarted(ShardRouting shardRouting) { | ||
final Double writeLoadForShard = writeLoadsPerShard.get(shardRouting.shardId()); | ||
if (writeLoadForShard != null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you test the case where a shard write load is 0/null? Like would be reported for a non-data stream index shard. |
||
if (shardRouting.relocatingNodeId() != null) { | ||
// relocating | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The ClusterInfoSimulator doesn't explain the to and from nodes, but could you add something here? relocatingNodeId is non-obvious. |
||
simulatedWriteLoadDeltas.addTo(shardRouting.relocatingNodeId(), -1 * writeLoadForShard); | ||
simulatedWriteLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you add some testing to This looks right, but it took a little digging for me to realize that we're tracking free space in DiskUsage and that flipping adding/subtracting here makes sense. So having a bit of testing in I don't feel too strongly about this, if you disagree. |
||
} else { | ||
// not sure how this would come about, perhaps when allocating a replica after a delay? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be good to explain your reasoning here in the comment. If I understand correctly, you're wondering how it would be possible to have a write load estimate for a new index? I haven't verified, but I expect data stream rollover to create a new index with an estimated write load calculated from the previous index in the data stream. But the write load estimate we're feeding into this simulation is currently the peak write load estimate, right? I think we'll want to be able to make an estimate for a new index, eventually. Could you file a ticket to keep track of that work, and throw it into Milestone 3 (https://elasticco.atlassian.net/browse/ES-11977) epic? I'm currently thinking of that milestone as a bucket for follow up optimizations. |
||
simulatedWriteLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard); | ||
} | ||
} | ||
} | ||
|
||
public Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can this method be renamed to convey the node stats are simulated? In the testing you use |
||
return routingAllocation.clusterInfo() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My understanding of streams is that Armin identified a while ago (a tech talk) that they perform poorly compared to for/while loops. Since this code will be run frequently, can we use some kind of loop instead of a stream? |
||
.getNodeUsageStatsForThreadPools() | ||
.entrySet() | ||
.stream() | ||
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So the So, if I am understanding the code correctly, the diffs in this write load simulator will continue to be added to, never reset, but will be applied on top of a ClusterInfo that keeps getting updated by the diffs. This doesn't seem like what we want? The ClusterInfoSimulator never looks at the RoutingAllocation's ClusterInfo again after initializing the ClusterInfoSimulator's private variables as a starting point.. |
||
if (simulatedWriteLoadDeltas.containsKey(e.getKey())) { | ||
return new NodeUsageStatsForThreadPools( | ||
e.getKey(), | ||
Maps.copyMapWithAddedOrReplacedEntry( | ||
e.getValue().threadPoolUsageStatsMap(), | ||
"write", | ||
replaceWritePoolStats(e.getValue(), simulatedWriteLoadDeltas.get(e.getKey())) | ||
) | ||
); | ||
} | ||
return e.getValue(); | ||
})); | ||
} | ||
|
||
private NodeUsageStatsForThreadPools.ThreadPoolUsageStats replaceWritePoolStats( | ||
NodeUsageStatsForThreadPools value, | ||
double writeLoadDelta | ||
) { | ||
final NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = value.threadPoolUsageStatsMap() | ||
.get(ThreadPool.Names.WRITE); | ||
return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( | ||
writeThreadPoolStats.totalThreadPoolThreads(), | ||
(float) (writeThreadPoolStats.averageThreadPoolUtilization() + (writeLoadDelta / writeThreadPoolStats | ||
.totalThreadPoolThreads())), | ||
writeThreadPoolStats.averageThreadPoolQueueLatencyMillis() | ||
); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,204 @@ | ||||||
/* | ||||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||||||
* or more contributor license agreements. Licensed under the "Elastic License | ||||||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||||||
* Public License v 1"; you may not use this file except in compliance with, at | ||||||
* your election, the "Elastic License 2.0", the "GNU Affero General Public | ||||||
* License v3.0 only", or the "Server Side Public License, v 1". | ||||||
*/ | ||||||
|
||||||
package org.elasticsearch.cluster.routing; | ||||||
|
||||||
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; | ||||||
import org.elasticsearch.cluster.ClusterInfo; | ||||||
import org.elasticsearch.cluster.ClusterState; | ||||||
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; | ||||||
import org.elasticsearch.cluster.metadata.ProjectId; | ||||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; | ||||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; | ||||||
import org.elasticsearch.index.shard.ShardId; | ||||||
import org.elasticsearch.snapshots.SnapshotShardSizeInfo; | ||||||
import org.elasticsearch.test.ESTestCase; | ||||||
import org.hamcrest.Matchers; | ||||||
|
||||||
import java.util.Arrays; | ||||||
import java.util.HashMap; | ||||||
import java.util.HashSet; | ||||||
import java.util.List; | ||||||
import java.util.Map; | ||||||
import java.util.Set; | ||||||
import java.util.stream.Collectors; | ||||||
import java.util.stream.IntStream; | ||||||
import java.util.stream.StreamSupport; | ||||||
|
||||||
import static org.hamcrest.Matchers.equalTo; | ||||||
import static org.hamcrest.Matchers.greaterThan; | ||||||
import static org.hamcrest.Matchers.lessThan; | ||||||
import static org.hamcrest.Matchers.sameInstance; | ||||||
|
||||||
public class WriteLoadPerShardSimulatorTests extends ESTestCase { | ||||||
|
||||||
private static final RoutingChangesObserver NOOP = new RoutingChangesObserver() { | ||||||
}; | ||||||
public static final String[] INDICES = { "indexOne", "indexTwo", "indexThree" }; | ||||||
|
||||||
/** | ||||||
* We should not adjust the values if there's no movement | ||||||
*/ | ||||||
public void testNoShardMovement() { | ||||||
final var originalNode0WriteLoadStats = randomUsageStats(); | ||||||
final var originalNode1WriteLoadStats = randomUsageStats(); | ||||||
final var allocation = createRoutingAllocation(originalNode0WriteLoadStats, originalNode1WriteLoadStats, Set.of()); | ||||||
|
||||||
final var writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation); | ||||||
final var calculatedNodeUsageStates = writeLoadPerShardSimulator.nodeUsageStatsForThreadPools(); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
assertThat(calculatedNodeUsageStates, Matchers.aMapWithSize(2)); | ||||||
assertThat( | ||||||
calculatedNodeUsageStates.get("node_0").threadPoolUsageStatsMap().get("write"), | ||||||
sameInstance(originalNode0WriteLoadStats) | ||||||
); | ||||||
assertThat( | ||||||
calculatedNodeUsageStates.get("node_1").threadPoolUsageStatsMap().get("write"), | ||||||
sameInstance(originalNode1WriteLoadStats) | ||||||
); | ||||||
} | ||||||
|
||||||
public void testMovementOfAShardWillReduceThreadPoolUtilisation() { | ||||||
final var originalNode0WriteLoadStats = randomUsageStats(); | ||||||
final var originalNode1WriteLoadStats = randomUsageStats(); | ||||||
final var allocation = createRoutingAllocation(originalNode0WriteLoadStats, originalNode1WriteLoadStats, Set.of()); | ||||||
final var writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation); | ||||||
|
||||||
// Relocate a random shard from node_0 to node_1 | ||||||
final var randomShard = randomFrom(StreamSupport.stream(allocation.routingNodes().node("node_0").spliterator(), false).toList()); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. log randomShard? For debug purposes, then we can match it with the ClusterInfo info I suggest logging in createRoutingAllocation. |
||||||
final var moveShardTuple = allocation.routingNodes().relocateShard(randomShard, "node_1", randomNonNegativeLong(), "testing", NOOP); | ||||||
writeLoadPerShardSimulator.simulateShardStarted(moveShardTuple.v2()); | ||||||
|
||||||
final var calculatedNodeUsageStates = writeLoadPerShardSimulator.nodeUsageStatsForThreadPools(); | ||||||
assertThat(calculatedNodeUsageStates, Matchers.aMapWithSize(2)); | ||||||
|
||||||
// Some node_0 utilization should have been moved to node_1 | ||||||
assertThat( | ||||||
getAverageWritePoolUtilization(writeLoadPerShardSimulator, "node_0"), | ||||||
lessThan(originalNode0WriteLoadStats.averageThreadPoolUtilization()) | ||||||
); | ||||||
assertThat( | ||||||
getAverageWritePoolUtilization(writeLoadPerShardSimulator, "node_1"), | ||||||
greaterThan(originalNode1WriteLoadStats.averageThreadPoolUtilization()) | ||||||
); | ||||||
} | ||||||
|
||||||
public void testMovementFollowedByMovementBackWillNotChangeAnything() { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test is essentially identical to |
||||||
final var originalNode0WriteLoadStats = randomUsageStats(); | ||||||
final var originalNode1WriteLoadStats = randomUsageStats(); | ||||||
final var allocation = createRoutingAllocation(originalNode0WriteLoadStats, originalNode1WriteLoadStats, Set.of()); | ||||||
final var writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation); | ||||||
|
||||||
// Relocate a random shard from node_0 to node_1 | ||||||
final long expectedShardSize = randomNonNegativeLong(); | ||||||
final var randomShard = randomFrom(StreamSupport.stream(allocation.routingNodes().node("node_0").spliterator(), false).toList()); | ||||||
final var moveShardTuple = allocation.routingNodes().relocateShard(randomShard, "node_1", expectedShardSize, "testing", NOOP); | ||||||
writeLoadPerShardSimulator.simulateShardStarted(moveShardTuple.v2()); | ||||||
final ShardRouting movedAndStartedShard = allocation.routingNodes().startShard(moveShardTuple.v2(), NOOP, expectedShardSize); | ||||||
|
||||||
// Some node_0 utilization should have been moved to node_1 | ||||||
assertThat( | ||||||
getAverageWritePoolUtilization(writeLoadPerShardSimulator, "node_0"), | ||||||
lessThan(originalNode0WriteLoadStats.averageThreadPoolUtilization()) | ||||||
); | ||||||
assertThat( | ||||||
getAverageWritePoolUtilization(writeLoadPerShardSimulator, "node_1"), | ||||||
greaterThan(originalNode1WriteLoadStats.averageThreadPoolUtilization()) | ||||||
); | ||||||
|
||||||
// Then move it back | ||||||
final var moveBackTuple = allocation.routingNodes() | ||||||
.relocateShard(movedAndStartedShard, "node_0", expectedShardSize, "testing", NOOP); | ||||||
writeLoadPerShardSimulator.simulateShardStarted(moveBackTuple.v2()); | ||||||
|
||||||
// The utilization numbers should be back to their original values | ||||||
assertThat( | ||||||
getAverageWritePoolUtilization(writeLoadPerShardSimulator, "node_0"), | ||||||
equalTo(originalNode0WriteLoadStats.averageThreadPoolUtilization()) | ||||||
); | ||||||
assertThat( | ||||||
getAverageWritePoolUtilization(writeLoadPerShardSimulator, "node_1"), | ||||||
equalTo(originalNode1WriteLoadStats.averageThreadPoolUtilization()) | ||||||
); | ||||||
} | ||||||
|
||||||
public void testMovementBetweenNodesWithNoThreadPoolStats() { | ||||||
final var originalNode0WriteLoadStats = randomBoolean() ? randomUsageStats() : null; | ||||||
final var originalNode1WriteLoadStats = randomBoolean() ? randomUsageStats() : null; | ||||||
final var allocation = createRoutingAllocation( | ||||||
originalNode0WriteLoadStats, | ||||||
originalNode1WriteLoadStats, | ||||||
new HashSet<>(randomSubsetOf(Arrays.asList(INDICES))) | ||||||
); | ||||||
final var writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation); | ||||||
|
||||||
// Relocate a random shard from node_0 to node_1 | ||||||
final long expectedShardSize = randomNonNegativeLong(); | ||||||
final var randomShard = randomFrom(StreamSupport.stream(allocation.routingNodes().node("node_0").spliterator(), false).toList()); | ||||||
final var moveShardTuple = allocation.routingNodes().relocateShard(randomShard, "node_1", expectedShardSize, "testing", NOOP); | ||||||
writeLoadPerShardSimulator.simulateShardStarted(moveShardTuple.v2()); | ||||||
allocation.routingNodes().startShard(moveShardTuple.v2(), NOOP, expectedShardSize); | ||||||
|
||||||
final var generated = writeLoadPerShardSimulator.nodeUsageStatsForThreadPools(); | ||||||
assertThat(generated.containsKey("node_0"), equalTo(originalNode0WriteLoadStats != null)); | ||||||
assertThat(generated.containsKey("node_1"), equalTo(originalNode1WriteLoadStats != null)); | ||||||
} | ||||||
|
||||||
private float getAverageWritePoolUtilization(WriteLoadPerShardSimulator writeLoadPerShardSimulator, String nodeId) { | ||||||
final var generatedNodeUsageStates = writeLoadPerShardSimulator.nodeUsageStatsForThreadPools(); | ||||||
final var node0WritePoolStats = generatedNodeUsageStates.get(nodeId).threadPoolUsageStatsMap().get("write"); | ||||||
return node0WritePoolStats.averageThreadPoolUtilization(); | ||||||
} | ||||||
|
||||||
private NodeUsageStatsForThreadPools.ThreadPoolUsageStats randomUsageStats() { | ||||||
return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( | ||||||
randomIntBetween(4, 16), | ||||||
randomFloatBetween(0.1f, 1.0f, true), | ||||||
randomLongBetween(0, 60_000) | ||||||
); | ||||||
} | ||||||
|
||||||
private RoutingAllocation createRoutingAllocation( | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we name this |
||||||
NodeUsageStatsForThreadPools.ThreadPoolUsageStats node0WriteLoadStats, | ||||||
NodeUsageStatsForThreadPools.ThreadPoolUsageStats node1WriteLoadStats, | ||||||
Set<String> indicesWithNoWriteLoad | ||||||
) { | ||||||
final Map<String, NodeUsageStatsForThreadPools> nodeUsageStats = new HashMap<>(); | ||||||
if (node0WriteLoadStats != null) { | ||||||
nodeUsageStats.put("node_0", new NodeUsageStatsForThreadPools("node_0", Map.of("write", node0WriteLoadStats))); | ||||||
} | ||||||
if (node1WriteLoadStats != null) { | ||||||
nodeUsageStats.put("node_1", new NodeUsageStatsForThreadPools("node_1", Map.of("write", node1WriteLoadStats))); | ||||||
} | ||||||
|
||||||
final ClusterState clusterState = createClusterState(); | ||||||
final ClusterInfo clusterInfo = ClusterInfo.builder() | ||||||
.nodeUsageStatsForThreadPools(nodeUsageStats) | ||||||
.shardWriteLoads( | ||||||
clusterState.metadata() | ||||||
.getProject(ProjectId.DEFAULT) | ||||||
.stream() | ||||||
.filter(index -> indicesWithNoWriteLoad.contains(index.getIndex().getName()) == false) | ||||||
.flatMap(index -> IntStream.range(0, 3).mapToObj(shardNum -> new ShardId(index.getIndex(), shardNum))) | ||||||
.collect(Collectors.toUnmodifiableMap(shardId -> shardId, shardId -> randomDoubleBetween(0.1, 5.0, true))) | ||||||
) | ||||||
.build(); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you log the ClusterInfo to a string? There isn't any debug information to look at if any of the tests fail (I think?), and some logging of the values might help. |
||||||
|
||||||
return new RoutingAllocation( | ||||||
new AllocationDeciders(List.of()), | ||||||
clusterState, | ||||||
clusterInfo, | ||||||
SnapshotShardSizeInfo.EMPTY, | ||||||
System.nanoTime() | ||||||
).mutableCloneForSimulation(); | ||||||
} | ||||||
|
||||||
private ClusterState createClusterState() { | ||||||
return ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(INDICES, 3, 0); | ||||||
} | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment explaining the purpose/use of the class, please.