Skip to content

Commit

Permalink
Using RemoteDirectory#delete to clear all segments during migration
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna committed Feb 12, 2025
1 parent d0a65d3 commit b7fe0b8
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.repositories.fs.ReloadableFsRepository.BLOCK_SEGMENT_SETTING;
import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -113,6 +114,26 @@ protected void setFailRate(String repoName, int value) throws ExecutionException
createRepository(repoName, ReloadableFsRepository.TYPE, settings);
}

/**
 * Re-creates the repository {@code repoName} as a {@link ReloadableFsRepository} with
 * {@code BLOCK_SEGMENT_SETTING} set to {@code true}.
 *
 * @param repoName name of the repository to update; it is looked up first, so it is expected to exist already
 * @throws ExecutionException   if the get-repositories call completes exceptionally
 * @throws InterruptedException if the thread is interrupted while waiting for the get-repositories response
 */
protected void setBlockOnSegments(String repoName) throws ExecutionException, InterruptedException {
    updateBlockOnSegments(repoName, true);
}

/**
 * Re-creates the repository {@code repoName} as a {@link ReloadableFsRepository} with
 * {@code BLOCK_SEGMENT_SETTING} set to {@code false}.
 *
 * @param repoName name of the repository to update; it is looked up first, so it is expected to exist already
 * @throws ExecutionException   if the get-repositories call completes exceptionally
 * @throws InterruptedException if the thread is interrupted while waiting for the get-repositories response
 */
protected void unsetBlockOnSegments(String repoName) throws ExecutionException, InterruptedException {
    updateBlockOnSegments(repoName, false);
}

/**
 * Shared implementation for {@link #setBlockOnSegments(String)} and {@link #unsetBlockOnSegments(String)}:
 * fetches the current metadata of {@code repoName} and calls {@code createRepository} again with the same
 * {@code location} and the requested value of {@code BLOCK_SEGMENT_SETTING}.
 */
private void updateBlockOnSegments(String repoName, boolean blockSegments) throws ExecutionException, InterruptedException {
    GetRepositoriesRequest request = new GetRepositoriesRequest(new String[] { repoName });
    GetRepositoriesResponse response = client().admin().cluster().getRepositories(request).get();
    // Only one repository name was requested, so the first entry is the one we asked for.
    RepositoryMetadata metadata = response.repositories().get(0);
    // Only "location" is carried over from the existing settings; any other setting is not copied.
    Settings.Builder settings = Settings.builder()
        .put("location", metadata.settings().get("location"))
        .put(BLOCK_SEGMENT_SETTING.getKey(), blockSegments);
    createRepository(repoName, ReloadableFsRepository.TYPE, settings);
}

public void initDocRepToRemoteMigration() {
assertTrue(
internalCluster().client()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.hamcrest.OpenSearchAssertions;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.client.Client;
import org.opensearch.transport.client.Requests;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -195,4 +198,87 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {
.setTransientSettings(Settings.builder().put(RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.getKey(), (String) null))
.get();
}

/**
 * Relocates the primary of index {@code test} from a docrep node to a remote node while the remote
 * node rejects the recovery FINALIZE action with an {@link IOException}.
 * <p>
 * The test asserts that, once the failed relocation settles, no shard is relocating and the primary
 * is still on the docrep node. It then lets FINALIZE through, issues the move again and waits for
 * the relocation to finish.
 */
public void testMixedModeRelocation_FailInFinalize() throws Exception {
    String docRepNode = internalCluster().startNode();
    ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
    updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed"));
    assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

    // create the index; indexSettings() is expected to yield 1 shard and 0 replicas
    client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get();
    ensureGreen("test");

    // keep indexing in the background for the duration of the relocation attempts
    AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test");
    asyncIndexingService.startIndexing();

    refresh("test");

    // add remote node in mixed mode cluster
    setAddRemote(true);
    String remoteNode = internalCluster().startNode();
    internalCluster().validateClusterFormed();

    // while this flag is true, every FINALIZE request reaching the remote node is failed
    AtomicBoolean failFinalize = new AtomicBoolean(true);

    MockTransportService remoteNodeTransportService = (MockTransportService) internalCluster().getInstance(
        TransportService.class,
        remoteNode
    );

    remoteNodeTransportService.addRequestHandlingBehavior(
        PeerRecoveryTargetService.Actions.FINALIZE,
        (handler, request, channel, task) -> {
            if (failFinalize.get()) {
                throw new IOException("Failing finalize");
            } else {
                handler.messageReceived(request, channel, task);
            }
        }
    );

    client().admin()
        .cluster()
        .prepareUpdateSettings()
        .setTransientSettings(Settings.builder().put(RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.getKey(), "40s"))
        .get();

    // Change direction to remote store
    updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store"));
    assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

    logger.info("--> relocating from {} to {} ", docRepNode, remoteNode);
    client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, docRepNode, remoteNode)).execute().actionGet();
    // Short (5s) wait: the relocation is expected to still be in flight when this returns.
    ClusterHealthResponse clusterHealthResponse = client().admin()
        .cluster()
        .prepareHealth()
        .setTimeout(TimeValue.timeValueSeconds(5))
        .setWaitForEvents(Priority.LANGUID)
        .setWaitForNoRelocatingShards(true)
        .execute()
        .actionGet();

    // assertEquals (expected first) reports the actual count on failure, unlike assertTrue(x == 1)
    assertEquals(1, clusterHealthResponse.getRelocatingShards());

    // Now wait until the failed relocation has settled.
    ClusterHealthRequest healthRequest = Requests.clusterHealthRequest()
        .waitForNoRelocatingShards(true)
        .waitForNoInitializingShards(true);
    ClusterHealthResponse actionGet = client().admin().cluster().health(healthRequest).actionGet();
    assertEquals(0, actionGet.getRelocatingShards());
    // the primary must not have moved because finalize was failed
    assertEquals(docRepNode, primaryNodeName("test"));

    // now unblock it
    logger.info("Unblocking the finalize recovery now");
    failFinalize.set(false);

    client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, docRepNode, remoteNode)).execute().actionGet();
    waitForRelocation();

    asyncIndexingService.stopIndexing();
    // reset the transient upload timeout changed earlier in this test
    client().admin()
        .cluster()
        .prepareUpdateSettings()
        .setTransientSettings(Settings.builder().put(RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.getKey(), (String) null))
        .get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ public void writeBlobAtomic(final String blobName, final InputStream inputStream
}

private void writeToPath(InputStream inputStream, Path tempBlobPath, long blobSize) throws IOException {
Files.createDirectories(path);
try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) {
final int bufferSize = blobStore.bufferSizeInBytes();
org.opensearch.common.util.io.Streams.copy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5056,7 +5056,7 @@ public void deleteTranslogFilesFromRemoteTranslog() throws IOException {
*/
public void deleteRemoteStoreContents() throws IOException {
// Remote translog files are removed first, then the remote segment directory is cleaned up.
deleteTranslogFilesFromRemoteTranslog();
// NOTE(review): this listing was extracted from a diff view whose +/- markers were lost. The hunk
// header (-5056,7 +5056,7) and the commit title suggest the deleteStaleSegments(0) call below is the
// removed line and delete() is its replacement — confirm against the actual file before relying on both.
getRemoteDirectory().deleteStaleSegments(0);
getRemoteDirectory().delete();
}

public void syncTranslogFilesFromRemoteTranslog() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ public void onFailure(Exception e) {
throw new IOException("Exception in listFilesByPrefixInLexicographicOrder with prefix: " + filenamePrefix, e);
}
if (exception.get() != null) {
if (exception.get() instanceof NoSuchFileException) {
return sortedBlobList;
}
throw new IOException(exception.get());
} else {
return sortedBlobList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -925,7 +925,7 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
.filter(metadataFile -> allLockFiles.contains(metadataFile) == false)
.collect(Collectors.toList());

logger.debug(
logger.info(
"metadataFilesEligibleToDelete={} metadataFilesToBeDeleted={}",
metadataFilesEligibleToDelete,
metadataFilesToBeDeleted
Expand Down Expand Up @@ -1061,7 +1061,7 @@ private boolean deleteIfEmpty() throws IOException {
return delete();
}

private boolean delete() {
public boolean delete() {
try {
remoteDataDirectory.delete();
remoteMetadataDirectory.delete();
Expand Down

0 comments on commit b7fe0b8

Please sign in to comment.