Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restrict Search Replicas to Allocate only to Search dedicated node #17457

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added ability to retrieve value from DocValues in a flat_object filed([#16802](https://github.com/opensearch-project/OpenSearch/pull/16802))
- Improve performace of NumericTermAggregation by avoiding unnecessary sorting([#17252](https://github.com/opensearch-project/OpenSearch/pull/17252))
- [Rule Based Auto-tagging] Add in-memory attribute value store ([#17342](https://github.com/opensearch-project/OpenSearch/pull/17342))
- Restrict Search Replica Allocation to Search-Dedicated Nodes ([#17457](https://github.com/opensearch-project/OpenSearch/pull/17457))

### Dependencies
- Bump `org.awaitility:awaitility` from 4.2.0 to 4.2.2 ([#17230](https://github.com/opensearch-project/OpenSearch/pull/17230))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,7 @@ public void testSearchReplicaDedicatedIncludes() {
final String node_2 = nodesIds.get(2);
assertEquals(3, cluster().size());

client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node_1 + "," + node_0)
)
.execute()
.actionGet();
setSearchDedicatedNodeSettings(node_1 + "," + node_0);

createIndex(
"test",
Expand All @@ -65,12 +58,7 @@ public void testSearchReplicaDedicatedIncludes() {
String emptyAllowedNode = existingSearchReplicaNode.equals(node_0) ? node_1 : node_0;

// set the included nodes to the other open node, search replica should relocate to that node.
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", emptyAllowedNode))
.execute()
.actionGet();
setSearchDedicatedNodeSettings(emptyAllowedNode);
ensureGreen("test");

routingTable = getRoutingTable();
Expand All @@ -86,12 +74,7 @@ public void testSearchReplicaDedicatedIncludes_DoNotAssignToOtherNodes() {
assertEquals(3, cluster().size());

// set filter on 1 node and set search replica count to 2 - should leave 1 unassigned
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node_1))
.execute()
.actionGet();
setSearchDedicatedNodeSettings(node_1);

logger.info("--> creating an index with no replicas");
createIndex(
Expand All @@ -115,12 +98,49 @@ public void testSearchReplicaDedicatedIncludes_DoNotAssignToOtherNodes() {
assertEquals(1, routingTable.searchOnlyReplicas().stream().filter(ShardRouting::unassigned).count());
}

public void testSearchReplicaDedicatedIncludes_WhenNotSetDoNotAssign() {
List<String> nodesIds = internalCluster().startNodes(2);
final String node_0 = nodesIds.get(0);
final String node_1 = nodesIds.get(1);
assertEquals(2, cluster().size());

createIndex(
"test",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build()
);
ensureYellowAndNoInitializingShards("test");
IndexShardRoutingTable routingTable = getRoutingTable();
assertNull(routingTable.searchOnlyReplicas().get(0).currentNodeId());

String primaryAssignedNodeName = getNodeName(routingTable.primaryShard().currentNodeId());
String emptyAllowedNode = primaryAssignedNodeName.equals(node_0) ? node_1 : node_0;

// set search only role on another node where primary not assigned
setSearchDedicatedNodeSettings(emptyAllowedNode);

ensureGreen("test");
assertEquals(emptyAllowedNode, getNodeName(getRoutingTable().searchOnlyReplicas().get(0).currentNodeId()));
}

private IndexShardRoutingTable getRoutingTable() {
IndexShardRoutingTable routingTable = getClusterState().routingTable().index("test").getShards().get(0);
return routingTable;
return getClusterState().routingTable().index("test").getShards().get(0);
}

private String getNodeName(String id) {
return getClusterState().nodes().get(id).getName();
}

private void setSearchDedicatedNodeSettings(String nodeName) {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", nodeName))
.execute()
.actionGet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ public void testReplication() throws Exception {
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startDataOnlyNode();

// set search only role on node
setSearchDedicatedNodeSettings(replica);

ensureGreen(INDEX_NAME);

final int docCount = 10;
Expand All @@ -107,6 +111,10 @@ public void testSegmentReplicationStatsResponseWithSearchReplica() throws Except
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build()
);

// set search only role on node
setSearchDedicatedNodeSettings(nodes.get(0));

ensureGreen(INDEX_NAME);

final int docCount = 5;
Expand Down Expand Up @@ -145,12 +153,7 @@ public void testSearchReplicaRecovery() throws Exception {
final String replica = internalCluster().startDataOnlyNode();

// ensure search replicas are only allocated to "replica" node.
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", replica))
.execute()
.actionGet();
setSearchDedicatedNodeSettings(replica);

createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);
Expand Down Expand Up @@ -191,6 +194,10 @@ public void testRecoveryAfterDocsIndexed() throws Exception {
refresh(INDEX_NAME);

final String replica = internalCluster().startDataOnlyNode();

// search node setting
setSearchDedicatedNodeSettings(replica);

ensureGreen(INDEX_NAME);
assertDocCounts(10, replica);

Expand Down Expand Up @@ -258,6 +265,10 @@ public void testStopPrimary_RestoreOnNewNode() throws Exception {
assertDocCounts(docCount, primary);

final String replica = internalCluster().startDataOnlyNode();

// search node setting
setSearchDedicatedNodeSettings(replica);

ensureGreen(INDEX_NAME);
assertDocCounts(docCount, replica);
// stop the primary
Expand Down Expand Up @@ -294,6 +305,10 @@ public void testFailoverToNewPrimaryWithPollingReplication() throws Exception {
refresh(INDEX_NAME);

final String replica = internalCluster().startDataOnlyNode();

// search node setting
setSearchDedicatedNodeSettings(replica);

ensureGreen(INDEX_NAME);
assertDocCounts(10, replica);

Expand Down Expand Up @@ -322,4 +337,13 @@ public void testFailoverToNewPrimaryWithPollingReplication() throws Exception {
refresh(INDEX_NAME);
assertBusy(() -> assertDocCounts(20, replica, writer_replica));
}

private void setSearchDedicatedNodeSettings(String nodeName) {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", nodeName))
.execute()
.actionGet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

Expand Down Expand Up @@ -72,7 +73,11 @@ public void testSearchReplicaRestore_WhenSnapshotOnSegRep_RestoreOnSegRepWithSea
Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1).build()
);
ensureYellowAndNoInitializingShards(RESTORED_INDEX_NAME);
internalCluster().startDataOnlyNode();
String replicaNode = internalCluster().startDataOnlyNode();

// search node setting
setSearchDedicatedNodeSettings(replicaNode);

ensureGreen(RESTORED_INDEX_NAME);
assertEquals(1, getNumberOfSearchReplicas(RESTORED_INDEX_NAME));

Expand All @@ -98,7 +103,7 @@ public void testSearchReplicaRestore_WhenSnapshotOnSegRepWithSearchReplica_Resto
}

private void bootstrapIndexWithOutSearchReplicas(ReplicationType replicationType) throws InterruptedException {
startCluster(2);
internalCluster().startNodes(2);

Settings settings = Settings.builder()
.put(super.indexSettings())
Expand All @@ -114,8 +119,8 @@ private void bootstrapIndexWithOutSearchReplicas(ReplicationType replicationType
ensureGreen(INDEX_NAME);
}

private void bootstrapIndexWithSearchReplicas() throws InterruptedException {
startCluster(3);
private void bootstrapIndexWithSearchReplicas() {
List<String> nodes = internalCluster().startNodes(3);

Settings settings = Settings.builder()
.put(super.indexSettings())
Expand All @@ -126,16 +131,24 @@ private void bootstrapIndexWithSearchReplicas() throws InterruptedException {
.build();

createIndex(INDEX_NAME, settings);

// search node setting
setSearchDedicatedNodeSettings(nodes.get(0));

ensureGreen(INDEX_NAME);
for (int i = 0; i < DOC_COUNT; i++) {
client().prepareIndex(INDEX_NAME).setId(String.valueOf(i)).setSource("foo", "bar").get();
}
flushAndRefresh(INDEX_NAME);
}

private void startCluster(int numOfNodes) {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNodes(numOfNodes);
private void setSearchDedicatedNodeSettings(String nodeName) {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", nodeName))
.execute()
.actionGet();
}

private void createRepoAndSnapshot(String repositoryName, String repositoryType, String snapshotName, String indexName) {
Expand Down
Loading
Loading