@@ -254,7 +254,7 @@ impl v1alpha1::KafkaCluster {
254254 match & self . spec . cluster_config . metadata_manager {
255255 Some ( manager) => match manager. clone ( ) {
256256 MetadataManager :: ZooKeeper => {
257- if self . spec . image . product_version ( ) . starts_with ( "4 \\ ." ) {
257+ if ! self . spec . image . product_version ( ) . starts_with ( "3 ." ) {
258258 Err ( Error :: Kafka4RequiresKraftMetadataManager )
259259 } else {
260260 Ok ( MetadataManager :: ZooKeeper )
@@ -263,10 +263,10 @@ impl v1alpha1::KafkaCluster {
263263 _ => Ok ( MetadataManager :: KRaft ) ,
264264 } ,
265265 None => {
266- if self . spec . image . product_version ( ) . starts_with ( "4\\ ." ) {
267- Ok ( MetadataManager :: KRaft )
268- } else {
266+ if self . spec . image . product_version ( ) . starts_with ( "3." ) {
269267 Ok ( MetadataManager :: ZooKeeper )
268+ } else {
269+ Ok ( MetadataManager :: KRaft )
270270 }
271271 }
272272 }
@@ -508,6 +508,8 @@ pub enum MetadataManager {
508508
509509#[ cfg( test) ]
510510mod tests {
511+ use rstest:: rstest;
512+
511513 use super :: * ;
512514
513515 fn get_server_secret_class ( kafka : & v1alpha1:: KafkaCluster ) -> Option < String > {
@@ -686,4 +688,66 @@ mod tests {
686688 tls:: internal_tls_default( )
687689 ) ;
688690 }
691+
692+ #[ rstest]
693+ #[ case( "3.9.1" , None , Ok ( MetadataManager :: ZooKeeper ) ) ]
694+ #[ case(
695+ "3.9.1" ,
696+ Some ( MetadataManager :: ZooKeeper ) ,
697+ Ok ( MetadataManager :: ZooKeeper )
698+ ) ]
699+ #[ case( "3.9.1" , Some ( MetadataManager :: KRaft ) , Ok ( MetadataManager :: KRaft ) ) ]
700+ #[ case( "4.1.1" , None , Ok ( MetadataManager :: KRaft ) ) ]
701+ #[ case(
702+ "4.1.1" ,
703+ Some ( MetadataManager :: ZooKeeper ) ,
704+ Err ( Error :: Kafka4RequiresKraftMetadataManager )
705+ ) ]
706+ #[ case( "4.1.1" , Some ( MetadataManager :: KRaft ) , Ok ( MetadataManager :: KRaft ) ) ]
707+ fn test_effective_metadata_manager (
708+ #[ case] product_version : & str ,
709+ #[ case] metadata_manager : Option < MetadataManager > ,
710+ #[ case] expected : Result < MetadataManager , Error > ,
711+ ) {
712+ let input = format ! (
713+ r#"
714+ apiVersion: kafka.stackable.tech/v1alpha1
715+ kind: KafkaCluster
716+ metadata:
717+ name: kafka
718+ spec:
719+ image:
720+ productVersion: {product_version}
721+ controllers:
722+ roleGroups:
723+ default:
724+ replicas: 1
725+ brokers:
726+ roleGroups:
727+ default:
728+ replicas: 1
729+ "#
730+ ) ;
731+ let mut kafka: v1alpha1:: KafkaCluster =
732+ serde_yaml:: from_str ( & input) . expect ( "illegal test input" ) ;
733+
734+ if metadata_manager. is_some ( ) {
735+ kafka. spec . cluster_config . metadata_manager = metadata_manager;
736+ }
737+
738+ match kafka. effective_metadata_manager ( ) {
739+ Ok ( manager) => match expected {
740+ Ok ( expected_manager) => assert_eq ! ( manager, expected_manager) ,
741+ Err ( _) => {
742+ panic ! ( "expected error but got metadata manager : {}" , manager)
743+ }
744+ } ,
745+ Err ( err) => match expected {
746+ Ok ( _) => panic ! ( "expected Ok but got error: {}" , err) ,
747+ Err ( expected_err) => {
748+ assert_eq ! ( format!( "{}" , err) , format!( "{}" , expected_err) )
749+ }
750+ } ,
751+ } ;
752+ }
689753}
0 commit comments