Skip to content

Commit

Permalink
Upstream fetch
Browse files Browse the repository at this point in the history
Signed-off-by: Prudhvi Godithi <[email protected]>
  • Loading branch information
prudhvigodithi committed Feb 12, 2025
1 parent db5212b commit 470c0ea
Show file tree
Hide file tree
Showing 8 changed files with 339 additions and 251 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import org.opensearch.action.ActionRequestBuilder;
import org.opensearch.action.support.clustermanager.AcknowledgedResponse;
import org.opensearch.client.OpenSearchClient;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.transport.client.OpenSearchClient;

@PublicApi(since = "1.0.0")
public class SearchOnlyRequestBuilder extends ActionRequestBuilder<SearchOnlyRequest, AcknowledgedResponse> {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ protected void clusterManagerOperation(
final String customDataPath = IndexMetadata.INDEX_DATA_PATH_SETTING.get(state.metadata().index(index).getSettings());
for (IndexShardRoutingTable routing : indexShardRoutingTables) {
final int shardId = routing.shardId().id();
ClusterShardHealth shardHealth = new ClusterShardHealth(shardId, routing);
ClusterShardHealth shardHealth = new ClusterShardHealth(shardId, routing, state.metadata().index(index));
if (request.shardStatuses().contains(shardHealth.getStatus())) {
shardsToFetch.add(Tuple.tuple(routing.shardId(), customDataPath));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public ClusterIndexHealth(final IndexMetadata indexMetadata, final IndexRoutingT
shards = new HashMap<>();
for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
int shardId = shardRoutingTable.shardId().id();
shards.put(shardId, new ClusterShardHealth(shardId, shardRoutingTable));
shards.put(shardId, new ClusterShardHealth(shardId, shardRoutingTable, indexMetadata));
}

// update the index status
Expand Down Expand Up @@ -218,7 +218,7 @@ public ClusterIndexHealth(
if (isShardLevelHealthRequired) {
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
int shardId = indexShardRoutingTable.shardId().id();
ClusterShardHealth shardHealth = new ClusterShardHealth(shardId, indexShardRoutingTable);
ClusterShardHealth shardHealth = new ClusterShardHealth(shardId, indexShardRoutingTable, indexMetadata);
if (shardHealth.isPrimaryActive()) {
computeActivePrimaryShards++;
}
Expand Down Expand Up @@ -268,7 +268,8 @@ public ClusterIndexHealth(
ClusterHealthStatus shardHealth = ClusterShardHealth.getShardHealth(
primaryShard,
activeShardsPerShardId,
shardRoutingCountPerShardId
shardRoutingCountPerShardId,
indexMetadata
);
computeStatus = getIndexHealthStatus(shardHealth, computeStatus);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.cluster.health;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.ShardRouting;
Expand Down Expand Up @@ -113,7 +114,7 @@ public final class ClusterShardHealth implements Writeable, ToXContentFragment {
private int delayedUnassignedShards;
private final boolean primaryActive;

public ClusterShardHealth(final int shardId, final IndexShardRoutingTable shardRoutingTable) {
public ClusterShardHealth(final int shardId, final IndexShardRoutingTable shardRoutingTable, final IndexMetadata indexMetadata) {
this.shardId = shardId;
int computeActiveShards = 0;
int computeRelocatingShards = 0;
Expand All @@ -139,13 +140,13 @@ public ClusterShardHealth(final int shardId, final IndexShardRoutingTable shardR
}
}
final ShardRouting primaryRouting = shardRoutingTable.primaryShard();
this.status = getShardHealth(primaryRouting, computeActiveShards, shardRoutingTable.size());
this.status = getShardHealth(primaryRouting, computeActiveShards, shardRoutingTable.size(), indexMetadata);
this.activeShards = computeActiveShards;
this.relocatingShards = computeRelocatingShards;
this.initializingShards = computeInitializingShards;
this.unassignedShards = computeUnassignedShards;
this.delayedUnassignedShards = computeDelayedUnassignedShards;
this.primaryActive = primaryRouting.active();
this.primaryActive = primaryRouting != null && primaryRouting.active();
}

public ClusterShardHealth(final StreamInput in) throws IOException {
Expand Down Expand Up @@ -230,9 +231,17 @@ public void writeTo(final StreamOutput out) throws IOException {
* Shard health is RED when the primary is not active.
* </p>
*/
public static ClusterHealthStatus getShardHealth(final ShardRouting primaryRouting, final int activeShards, final int totalShards) {
// TO DO
// assert primaryRouting != null : "Primary shard routing can't be null";
public static ClusterHealthStatus getShardHealth(
final ShardRouting primaryRouting,
final int activeShards,
final int totalShards,
final IndexMetadata indexMetadata
) {
if (primaryRouting == null) {
boolean isSearchOnlyEnabled = indexMetadata.getSettings()
.getAsBoolean(IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey(), false);
return isSearchOnlyEnabled ? ClusterHealthStatus.GREEN : ClusterHealthStatus.RED;
}
if (primaryRouting.active()) {
if (activeShards == totalShards) {
return ClusterHealthStatus.GREEN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,11 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;
import static org.opensearch.common.util.IndexUtils.filterIndices;
import static org.opensearch.repositories.blobstore.BlobStoreRepository.SYSTEM_REPOSITORY_SETTING;

Expand Down Expand Up @@ -142,17 +140,14 @@ public RemoteRestoreResult restore(
) {
Map<String, Tuple<Boolean, IndexMetadata>> indexMetadataMap = new HashMap<>();
ClusterState remoteState = null;
boolean metadataFromRemoteStore = (restoreClusterUUID == null
|| restoreClusterUUID.isEmpty()
|| restoreClusterUUID.isBlank()) == false;
boolean metadataFromRemoteStore = restoreClusterUUID != null && !restoreClusterUUID.isEmpty();

if (metadataFromRemoteStore) {
try {
// Restore with current cluster UUID will fail as same indices would be present in the cluster which we are trying to
// restore
if (currentState.metadata().clusterUUID().equals(restoreClusterUUID)) {
throw new IllegalArgumentException("clusterUUID to restore from should be different from current cluster UUID");
throw new IllegalArgumentException("Cluster UUID for restore must be different from the current cluster UUID.");
}
logger.info("Restoring cluster state from remote store from cluster UUID : [{}]", restoreClusterUUID);
logger.info("Restoring cluster state from remote store for cluster UUID: [{}]", restoreClusterUUID);
remoteState = remoteClusterStateService.getLatestClusterState(
currentState.getClusterName().value(),
restoreClusterUUID,
Expand All @@ -170,25 +165,33 @@ public RemoteRestoreResult restore(
indexNames,
IndicesOptions.fromOptions(true, true, true, true)
);

boolean allSearchOnly = true;
for (String indexName : filteredIndices) {
IndexMetadata indexMetadata = currentState.metadata().index(indexName);
if (indexMetadata == null) {
logger.warn("Index restore is not supported for non-existent index. Skipping: {}", indexName);
} else if (indexMetadata.getSettings().getAsBoolean(SETTING_REMOTE_STORE_ENABLED, false) == false) {
logger.warn("Remote store is not enabled for index: {}", indexName);
} else if (restoreAllShards && IndexMetadata.State.CLOSE.equals(indexMetadata.getState()) == false) {
throw new IllegalStateException(
String.format(
Locale.ROOT,
"cannot restore index [%s] because an open index with same name/uuid already exists in the cluster.",
indexName
) + " Close the existing index."
);
logger.warn("Skipping restore: index [{}] does not exist.", indexName);
continue;
}

boolean isSearchOnly = indexMetadata.getSettings()
.getAsBoolean(IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey(), false);

if (isSearchOnly) {
logger.warn("Skipping _remotestore/_restore for index [{}] as search-only mode is enabled.", indexName);
} else {
allSearchOnly = false;
indexMetadataMap.put(indexName, new Tuple<>(false, indexMetadata));
}
}

if (allSearchOnly) {
throw new IllegalArgumentException(
"Skipping _remotestore/_restore for all selected indices as search-only mode is enabled."
);
}
}

return executeRestore(currentState, indexMetadataMap, restoreAllShards, remoteState);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package org.opensearch.rest.action.admin.indices;

import org.opensearch.client.node.NodeClient;
import org.opensearch.core.common.Strings;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;
import org.opensearch.transport.client.node.NodeClient;

import java.util.List;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@

package org.opensearch.cluster.health;

import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.TestShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
Expand Down Expand Up @@ -64,8 +67,19 @@ public void testClusterShardGreenHealth() {
indexShardRoutingBuilder.addShard(
TestShardRouting.newShardRouting(indexName, shardID, "node_1", null, false, ShardRoutingState.STARTED)
);

IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName)
.settings(
Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
)
.creationDate(System.currentTimeMillis());
IndexMetadata indexMetadata = indexMetadataBuilder.build();
IndexShardRoutingTable indexShardRoutingTable = indexShardRoutingBuilder.build();
ClusterShardHealth clusterShardHealth = new ClusterShardHealth(shardID, indexShardRoutingTable);

ClusterShardHealth clusterShardHealth = new ClusterShardHealth(shardID, indexShardRoutingTable, indexMetadata);
assertEquals(2, clusterShardHealth.getActiveShards());
assertEquals(0, clusterShardHealth.getInitializingShards());
assertEquals(0, clusterShardHealth.getRelocatingShards());
Expand Down Expand Up @@ -112,7 +126,18 @@ public void testClusterShardYellowHealth() {
)
);
IndexShardRoutingTable indexShardRoutingTable = indexShardRoutingBuilder.build();
ClusterShardHealth clusterShardHealth = new ClusterShardHealth(shardID, indexShardRoutingTable);

IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName)
.settings(
Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
)
.creationDate(System.currentTimeMillis());
IndexMetadata indexMetadata = indexMetadataBuilder.build();

ClusterShardHealth clusterShardHealth = new ClusterShardHealth(shardID, indexShardRoutingTable, indexMetadata);
assertEquals(2, clusterShardHealth.getActiveShards());
assertEquals(1, clusterShardHealth.getInitializingShards());
assertEquals(1, clusterShardHealth.getRelocatingShards());
Expand Down Expand Up @@ -150,7 +175,18 @@ public void testClusterShardRedHealth() {
TestShardRouting.newShardRouting(indexName, shardID, null, null, false, ShardRoutingState.UNASSIGNED)
);
IndexShardRoutingTable indexShardRoutingTable = indexShardRoutingBuilder.build();
ClusterShardHealth clusterShardHealth = new ClusterShardHealth(shardID, indexShardRoutingTable);

IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName)
.settings(
Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
)
.creationDate(System.currentTimeMillis());
IndexMetadata indexMetadata = indexMetadataBuilder.build();

ClusterShardHealth clusterShardHealth = new ClusterShardHealth(shardID, indexShardRoutingTable, indexMetadata);
assertEquals(0, clusterShardHealth.getActiveShards());
assertEquals(0, clusterShardHealth.getInitializingShards());
assertEquals(0, clusterShardHealth.getRelocatingShards());
Expand All @@ -161,7 +197,16 @@ public void testClusterShardRedHealth() {
}

public void testShardRoutingNullCheck() {
assertThrows(AssertionError.class, () -> ClusterShardHealth.getShardHealth(null, 0, 0));
IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder("test")
.settings(
Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
);
IndexMetadata indexMetadata = indexMetadataBuilder.build();

assertThrows(AssertionError.class, () -> ClusterShardHealth.getShardHealth(null, 0, 0, indexMetadata));
}

@Override
Expand Down

0 comments on commit 470c0ea

Please sign in to comment.