1515 * limitations under the License.
1616 */
1717
18- package kafka .server ;
18+ package org . apache . kafka .server ;
1919
20+ import org .apache .kafka .clients .admin .AddRaftVoterOptions ;
2021import org .apache .kafka .clients .admin .Admin ;
2122import org .apache .kafka .clients .admin .FeatureMetadata ;
2223import org .apache .kafka .clients .admin .QuorumInfo ;
2324import org .apache .kafka .clients .admin .RaftVoterEndpoint ;
25+ import org .apache .kafka .clients .admin .RemoveRaftVoterOptions ;
2426import org .apache .kafka .common .Uuid ;
27+ import org .apache .kafka .common .errors .InconsistentClusterIdException ;
2528import org .apache .kafka .common .test .KafkaClusterTestKit ;
2629import org .apache .kafka .common .test .TestKitNodes ;
2730import org .apache .kafka .common .test .api .TestKitDefaults ;
2831import org .apache .kafka .raft .QuorumConfig ;
2932import org .apache .kafka .server .common .KRaftVersion ;
3033import org .apache .kafka .test .TestUtils ;
3134
35+ import org .junit .jupiter .api .Tag ;
3236import org .junit .jupiter .api .Test ;
3337
3438import java .util .HashMap ;
3539import java .util .Map ;
40+ import java .util .Optional ;
3641import java .util .Set ;
3742import java .util .TreeMap ;
3843
4146import static org .junit .jupiter .api .Assertions .assertNotEquals ;
4247import static org .junit .jupiter .api .Assertions .assertTrue ;
4348
49+ @ Tag ("integration" )
4450public class ReconfigurableQuorumIntegrationTest {
4551 static void checkKRaftVersions (Admin admin , short finalized ) throws Exception {
4652 FeatureMetadata featureMetadata = admin .describeFeatures ().featureMetadata ().get ();
@@ -70,7 +76,7 @@ public void testCreateAndDestroyNonReconfigurableCluster() throws Exception {
7076 ).build ()) {
7177 cluster .format ();
7278 cluster .startup ();
73- try (Admin admin = Admin .create (cluster .clientProperties ())) {
79+ try (var admin = Admin .create (cluster .clientProperties ())) {
7480 TestUtils .retryOnExceptionWithTimeout (30_000 , () -> {
7581 checkKRaftVersions (admin , KRaftVersion .KRAFT_VERSION_0 .featureLevel ());
7682 });
@@ -88,7 +94,7 @@ public void testCreateAndDestroyReconfigurableCluster() throws Exception {
8894 ).setStandalone (true ).build ()) {
8995 cluster .format ();
9096 cluster .startup ();
91- try (Admin admin = Admin .create (cluster .clientProperties ())) {
97+ try (var admin = Admin .create (cluster .clientProperties ())) {
9298 TestUtils .retryOnExceptionWithTimeout (30_000 , () -> {
9399 checkKRaftVersions (admin , KRaftVersion .KRAFT_VERSION_1 .featureLevel ());
94100 });
@@ -126,7 +132,7 @@ public void testRemoveController() throws Exception {
126132 ) {
127133 cluster .format ();
128134 cluster .startup ();
129- try (Admin admin = Admin .create (cluster .clientProperties ())) {
135+ try (var admin = Admin .create (cluster .clientProperties ())) {
130136 TestUtils .retryOnExceptionWithTimeout (30_000 , 10 , () -> {
131137 Map <Integer , Uuid > voters = findVoterDirs (admin );
132138 assertEquals (Set .of (3000 , 3001 , 3002 ), voters .keySet ());
@@ -161,7 +167,7 @@ public void testRemoveAndAddSameController() throws Exception {
161167 ) {
162168 cluster .format ();
163169 cluster .startup ();
164- try (Admin admin = Admin .create (cluster .clientProperties ())) {
170+ try (var admin = Admin .create (cluster .clientProperties ())) {
165171 TestUtils .retryOnExceptionWithTimeout (30_000 , 10 , () -> {
166172 Map <Integer , Uuid > voters = findVoterDirs (admin );
167173 assertEquals (Set .of (3000 , 3001 , 3002 , 3003 ), voters .keySet ());
@@ -200,7 +206,7 @@ public void testControllersAutoJoinStandaloneVoter() throws Exception {
200206 ) {
201207 cluster .format ();
202208 cluster .startup ();
203- try (Admin admin = Admin .create (cluster .clientProperties ())) {
209+ try (var admin = Admin .create (cluster .clientProperties ())) {
204210 TestUtils .retryOnExceptionWithTimeout (30_000 , 10 , () -> {
205211 Map <Integer , Uuid > voters = findVoterDirs (admin );
206212 assertEquals (Set .of (3000 , 3001 , 3002 ), voters .keySet ());
@@ -238,7 +244,7 @@ public void testNewVoterAutoRemovesAndAdds() throws Exception {
238244 ) {
239245 cluster .format ();
240246 cluster .startup ();
241- try (Admin admin = Admin .create (cluster .clientProperties ())) {
247+ try (var admin = Admin .create (cluster .clientProperties ())) {
242248 TestUtils .retryOnExceptionWithTimeout (30_000 , 10 , () -> {
243249 Map <Integer , Uuid > voters = findVoterDirs (admin );
244250 assertEquals (Set .of (3000 , 3001 , 3002 ), voters .keySet ());
@@ -249,4 +255,95 @@ public void testNewVoterAutoRemovesAndAdds() throws Exception {
249255 }
250256 }
251257 }
252- }
258+
259+ @ Test
260+ public void testRemoveAndAddVoterWithValidClusterId () throws Exception {
261+ final var nodes = new TestKitNodes .Builder ()
262+ .setClusterId ("test-cluster" )
263+ .setNumBrokerNodes (1 )
264+ .setNumControllerNodes (3 )
265+ .build ();
266+
267+ final Map <Integer , Uuid > initialVoters = new HashMap <>();
268+ for (final var controllerNode : nodes .controllerNodes ().values ()) {
269+ initialVoters .put (
270+ controllerNode .id (),
271+ controllerNode .metadataDirectoryId ()
272+ );
273+ }
274+
275+ try (var cluster = new KafkaClusterTestKit .Builder (nodes ).setInitialVoterSet (initialVoters ).build ()) {
276+ cluster .format ();
277+ cluster .startup ();
278+ try (var admin = Admin .create (cluster .clientProperties ())) {
279+ TestUtils .retryOnExceptionWithTimeout (30_000 , 10 , () -> {
280+ Map <Integer , Uuid > voters = findVoterDirs (admin );
281+ assertEquals (Set .of (3000 , 3001 , 3002 ), voters .keySet ());
282+ for (int replicaId : new int [] {3000 , 3001 , 3002 }) {
283+ assertNotEquals (Uuid .ZERO_UUID , voters .get (replicaId ));
284+ }
285+ });
286+
287+ Uuid dirId = cluster .nodes ().controllerNodes ().get (3000 ).metadataDirectoryId ();
288+ admin .removeRaftVoter (
289+ 3000 ,
290+ dirId ,
291+ new RemoveRaftVoterOptions ().setClusterId (Optional .of ("test-cluster" ))
292+ ).all ().get ();
293+ TestUtils .retryOnExceptionWithTimeout (30_000 , 10 , () -> {
294+ Map <Integer , Uuid > voters = findVoterDirs (admin );
295+ assertEquals (Set .of (3001 , 3002 ), voters .keySet ());
296+ for (int replicaId : new int [] {3001 , 3002 }) {
297+ assertNotEquals (Uuid .ZERO_UUID , voters .get (replicaId ));
298+ }
299+ });
300+
301+ admin .addRaftVoter (
302+ 3000 ,
303+ dirId ,
304+ Set .of (new RaftVoterEndpoint ("CONTROLLER" , "example.com" , 8080 )),
305+ new AddRaftVoterOptions ().setClusterId (Optional .of ("test-cluster" ))
306+ ).all ().get ();
307+ }
308+ }
309+ }
310+
311+ @ Test
312+ public void testRemoveAndAddVoterWithInconsistentClusterId () throws Exception {
313+ final var nodes = new TestKitNodes .Builder ()
314+ .setClusterId ("test-cluster" )
315+ .setNumBrokerNodes (1 )
316+ .setNumControllerNodes (3 )
317+ .build ();
318+
319+ final Map <Integer , Uuid > initialVoters = new HashMap <>();
320+ for (final var controllerNode : nodes .controllerNodes ().values ()) {
321+ initialVoters .put (
322+ controllerNode .id (),
323+ controllerNode .metadataDirectoryId ()
324+ );
325+ }
326+
327+ try (var cluster = new KafkaClusterTestKit .Builder (nodes ).setInitialVoterSet (initialVoters ).build ()) {
328+ cluster .format ();
329+ cluster .startup ();
330+ try (var admin = Admin .create (cluster .clientProperties ())) {
331+ Uuid dirId = cluster .nodes ().controllerNodes ().get (3000 ).metadataDirectoryId ();
332+ var removeFuture = admin .removeRaftVoter (
333+ 3000 ,
334+ dirId ,
335+ new RemoveRaftVoterOptions ().setClusterId (Optional .of ("inconsistent" ))
336+ ).all ();
337+ TestUtils .assertFutureThrows (InconsistentClusterIdException .class , removeFuture );
338+
339+ var addFuture = admin .addRaftVoter (
340+ 3000 ,
341+ dirId ,
342+ Set .of (new RaftVoterEndpoint ("CONTROLLER" , "example.com" , 8080 )),
343+ new AddRaftVoterOptions ().setClusterId (Optional .of ("inconsistent" ))
344+ ).all ();
345+ TestUtils .assertFutureThrows (InconsistentClusterIdException .class , addFuture );
346+ }
347+ }
348+ }
349+ }
0 commit comments