@@ -44,6 +44,8 @@ import org.junit.Assume
4444import java.nio.file.Files
4545import java.util.concurrent.TimeUnit
4646import org.opensearch.replication.ANALYZERS_NOT_ACCESSIBLE_FOR_REMOTE_CLUSTERS
47+ import org.junit.BeforeClass
48+ import org.junit.AfterClass
4749
4850@MultiClusterAnnotations.ClusterConfigurations (
4951 MultiClusterAnnotations .ClusterConfiguration (clusterName = LEADER ),
@@ -57,6 +59,42 @@ class ResumeReplicationIT: MultiClusterRestTestCase() {
5759 private val buildDir = System .getProperty(" build.dir" )
5860 private val synonymsJson = " /analyzers/synonym_setting.json"
5961
62+ companion object {
63+ private val allSynonymPaths = mutableListOf< java.nio.file.Path > ()
64+ private val buildDir = System .getProperty(" build.dir" )
65+
66+ @BeforeClass
67+ @JvmStatic
68+ fun setupSynonymFiles () {
69+ val testClustersDir = PathUtils .get(buildDir, " testclusters" )
70+
71+ Files .walk(testClustersDir, 1 )
72+ .filter { Files .isDirectory(it) && (it.fileName.toString().startsWith(" leaderCluster" ) || it.fileName.toString().startsWith(" followCluster" )) }
73+ .forEach { clusterDir ->
74+ val configDir = clusterDir.resolve(" config" )
75+ if (Files .exists(configDir)) {
76+ // Copy all required synonym files
77+ listOf (" synonyms.txt" , " synonyms_new.txt" , " synonyms_follower.txt" ).forEach { filename ->
78+ val targetPath = configDir.resolve(filename)
79+ ResumeReplicationIT ::class .java.getResourceAsStream(" /analyzers/synonyms.txt" )?.use { synonymStream ->
80+ Files .copy(synonymStream, targetPath)
81+ allSynonymPaths.add(targetPath)
82+ }
83+ }
84+ }
85+ }
86+ }
87+
88+ @AfterClass
89+ @JvmStatic
90+ fun cleanupSynonymFiles () {
91+ allSynonymPaths.forEach { if (Files .exists(it)) Files .delete(it) }
92+ allSynonymPaths.clear()
93+ }
94+ }
95+
96+
97+
6098 fun `test pause and resume replication in following state and empty index` () {
6199 val followerClient = getClientForCluster(FOLLOWER )
62100 val leaderClient = getClientForCluster(LEADER )
@@ -245,56 +283,36 @@ class ResumeReplicationIT: MultiClusterRestTestCase() {
245283
246284 Assume .assumeFalse(ANALYZERS_NOT_ACCESSIBLE_FOR_REMOTE_CLUSTERS , checkifIntegTestRemote())
247285
248- val synonyms = javaClass.getResourceAsStream(" /analyzers/synonyms.txt" )
249- val config = PathUtils .get(buildDir, leaderClusterPath, " config" )
250- val synonymPath = config.resolve(" synonyms.txt" )
251- val newSynonymPath = config.resolve(" synonyms_new.txt" )
252- val followerConfig = PathUtils .get(buildDir, followerClusterPath, " config" )
253286 val followerSynonymFilename = " synonyms_follower.txt"
254- val followerSynonymPath = followerConfig.resolve(followerSynonymFilename)
255287 val leaderClient = getClientForCluster(LEADER )
256288 val followerClient = getClientForCluster(FOLLOWER )
289+
290+ var settings: Settings = Settings .builder().loadFromStream(synonymsJson, javaClass.getResourceAsStream(synonymsJson), false )
291+ .build()
257292 try {
258- Files .copy(synonyms, synonymPath)
259- Files .copy(synonyms, followerSynonymPath)
260- var settings: Settings = Settings .builder().loadFromStream(synonymsJson, javaClass.getResourceAsStream(synonymsJson), false )
261- .build()
262- try {
263- val createIndexResponse = leaderClient.indices().create(CreateIndexRequest (leaderIndexName).settings(settings), RequestOptions .DEFAULT )
264- assertThat(createIndexResponse.isAcknowledged).isTrue()
265- } catch (e: Exception ) {
266- assumeNoException(" Ignored test as analyzer setting could not be added" , e)
267- }
268- createConnectionBetweenClusters(FOLLOWER , LEADER )
269- val overriddenSettings: Settings = Settings .builder()
270- .put(" index.analysis.filter.my_filter.synonyms_path" , followerSynonymFilename)
271- .build()
272- followerClient.startReplication(StartReplicationRequest (" source" , leaderIndexName, followerIndexName, overriddenSettings), waitForRestore = true )
273- followerClient.pauseReplication(followerIndexName)
274- leaderClient.indices().close(CloseIndexRequest (leaderIndexName), RequestOptions .DEFAULT );
275- Files .copy(synonyms, newSynonymPath)
276- settings = Settings .builder()
277- .put(" index.analysis.filter.my_filter.synonyms_path" , " synonyms_new.txt" )
278- .build()
279- try {
280- leaderClient.indices().putSettings(UpdateSettingsRequest (leaderIndexName).settings(settings), RequestOptions .DEFAULT )
281- } catch (e: Exception ) {
282- assumeNoException(" Ignored test as analyzer setting could not be added" , e)
283- }
284- leaderClient.indices().open(OpenIndexRequest (leaderIndexName), RequestOptions .DEFAULT );
285- followerClient.resumeReplication(followerIndexName)
286- var statusResp = followerClient.replicationStatus(followerIndexName)
287- `validate status syncing response`(statusResp)
288- } finally {
289- if (Files .exists(synonymPath)) {
290- Files .delete(synonymPath)
291- }
292- if (Files .exists(followerSynonymPath)) {
293- Files .delete(followerSynonymPath)
294- }
295- if (Files .exists(newSynonymPath)) {
296- Files .delete(newSynonymPath)
297- }
293+ val createIndexResponse = leaderClient.indices().create(CreateIndexRequest (leaderIndexName).settings(settings), RequestOptions .DEFAULT )
294+ assertThat(createIndexResponse.isAcknowledged).isTrue()
295+ } catch (e: Exception ) {
296+ assumeNoException(" Ignored test as analyzer setting could not be added" , e)
297+ }
298+ createConnectionBetweenClusters(FOLLOWER , LEADER )
299+ val overriddenSettings: Settings = Settings .builder()
300+ .put(" index.analysis.filter.my_filter.synonyms_path" , followerSynonymFilename)
301+ .build()
302+ followerClient.startReplication(StartReplicationRequest (" source" , leaderIndexName, followerIndexName, overriddenSettings), waitForRestore = true )
303+ followerClient.pauseReplication(followerIndexName)
304+ leaderClient.indices().close(CloseIndexRequest (leaderIndexName), RequestOptions .DEFAULT );
305+ settings = Settings .builder()
306+ .put(" index.analysis.filter.my_filter.synonyms_path" , " synonyms_new.txt" )
307+ .build()
308+ try {
309+ leaderClient.indices().putSettings(UpdateSettingsRequest (leaderIndexName).settings(settings), RequestOptions .DEFAULT )
310+ } catch (e: Exception ) {
311+ assumeNoException(" Ignored test as analyzer setting could not be added" , e)
298312 }
313+ leaderClient.indices().open(OpenIndexRequest (leaderIndexName), RequestOptions .DEFAULT );
314+ followerClient.resumeReplication(followerIndexName)
315+ var statusResp = followerClient.replicationStatus(followerIndexName)
316+ `validate status syncing response`(statusResp)
299317 }
300318}
0 commit comments