diff --git a/.circleci/config.yml b/.circleci/config.yml index 67d03a7be..c16afcf63 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -178,6 +178,83 @@ jobs: entrypoint: *entrypoint steps: *steps + kafka-370-kraft: + working_directory: *working_directory + environment: + KAFKA_VERSION: "3.7.0" + + # Need to skip nettest to avoid these kinds of errors: + # --- FAIL: TestConn/nettest (17.56s) + # --- FAIL: TestConn/nettest/PingPong (7.40s) + # conntest.go:112: unexpected Read error: [7] Request Timed Out: the request exceeded the user-specified time limit in the request + # conntest.go:118: mismatching value: got 77, want 78 + # conntest.go:118: mismatching value: got 78, want 79 + # ... + # + # TODO: Figure out why these are happening and fix them (they don't appear to be new). + KAFKA_SKIP_NETTEST: "1" + docker: + - image: circleci/golang + - image: bitnami/kafka:3.7.0 + ports: + - 9092:9092 + - 9093:9093 + environment: &kraft-env + KAFKA_KRAFT_MODE: "true" + KAFKA_CFG_NODE_ID: 1 + KAFKA_CFG_BROKER_ID: 1 + KAFKA_CLUSTER_ID: 1 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_CFG_PROCESS_ROLES: broker,controller + KAFKA_CFG_ADVERTISED_HOST_NAME: 'localhost' + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAIN:PLAINTEXT,SASL:SASL_PLAINTEXT + KAFKA_CFG_LISTENERS: CONTROLLER://:9094,PLAIN://:9092,SASL://:9093 + KAFKA_CFG_ADVERTISED_LISTENERS: PLAIN://localhost:9092,SASL://localhost:9093 + KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAIN + KAFKA_CFG_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512' + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@localhost:9094 + ALLOW_PLAINTEXT_LISTENER: yes + KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true + KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: true + KAFKA_CFG_DELETE_TOPIC_ENABLE: true + KAFKA_CFG_MESSAGE_MAX_BYTES: '200000000' + KAFKA_CFG_AUTHORIZER_CLASS_NAME: 'org.apache.kafka.metadata.authorizer.StandardAuthorizer' + KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf" + KAFKA_CFG_SUPER_USERS: User:adminscram256;User:adminscram512;User:adminplain + KAFKA_CLIENT_USERS: adminscram256,adminscram512,adminplain + KAFKA_CLIENT_PASSWORDS: admin-secret-256,admin-secret-512,admin-secret + KAFKA_CLIENT_SASL_MECHANISMS: SCRAM-SHA-256,SCRAM-SHA-512,PLAIN + KAFKA_INTER_BROKER_USER: adminscram512 + KAFKA_INTER_BROKER_PASSWORD: admin-secret-512 + KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: SCRAM-SHA-512 + entrypoint: *entrypoint + steps: *steps + + kafka-400: + working_directory: *working_directory + environment: + KAFKA_VERSION: "4.0.0" + + # Need to skip nettest to avoid these kinds of errors: + # --- FAIL: TestConn/nettest (17.56s) + # --- FAIL: TestConn/nettest/PingPong (7.40s) + # conntest.go:112: unexpected Read error: [7] Request Timed Out: the request exceeded the user-specified time limit in the request + # conntest.go:118: mismatching value: got 77, want 78 + # conntest.go:118: mismatching value: got 78, want 79 + # ... + # + # TODO: Figure out why these are happening and fix them (they don't appear to be new). + KAFKA_SKIP_NETTEST: "1" + docker: + - image: circleci/golang + - image: bitnami/kafka:4.0.0 + ports: + - 9092:9092 + - 9093:9093 + environment: *kraft-env + steps: *steps workflows: version: 2 run: @@ -187,3 +264,5 @@ workflows: - kafka-270 - kafka-281 - kafka-370 + - kafka-370-kraft + - kafka-400 \ No newline at end of file diff --git a/alterclientquotas_test.go b/alterclientquotas_test.go index d61c745e3..3bb1023e2 100644 --- a/alterclientquotas_test.go +++ b/alterclientquotas_test.go @@ -3,9 +3,11 @@ package kafka import ( "context" "testing" + "time" - ktesting "github.com/segmentio/kafka-go/testing" "github.com/stretchr/testify/assert" + + ktesting "github.com/segmentio/kafka-go/testing" ) func TestClientAlterClientQuotas(t *testing.T) { @@ -65,6 +67,8 @@ func TestClientAlterClientQuotas(t *testing.T) { assert.Equal(t, expectedAlterResp, *alterResp) + time.Sleep(1 * time.Second) // wait for the quota to be applie (Kafka 4.0.0+) + describeResp, err := client.DescribeClientQuotas(context.Background(), &DescribeClientQuotasRequest{ Components: []DescribeClientQuotasRequestComponent{ { diff --git a/alterpartitionreassignments_test.go b/alterpartitionreassignments_test.go index 7bbce8fff..48974c7c5 100644 --- a/alterpartitionreassignments_test.go +++ b/alterpartitionreassignments_test.go @@ -3,6 +3,7 @@ package kafka import ( "context" "testing" + "time" ktesting "github.com/segmentio/kafka-go/testing" ) @@ -35,6 +36,7 @@ func TestClientAlterPartitionReassignments(t *testing.T) { BrokerIDs: []int{1}, }, }, + Timeout: 5 * time.Second, }, ) @@ -96,6 +98,7 @@ func TestClientAlterPartitionReassignmentsMultiTopics(t *testing.T) { BrokerIDs: []int{1}, }, }, + Timeout: 5 * time.Second, }, ) diff --git a/conn.go b/conn.go index 2b51afbd5..9f9f25903 100644 --- a/conn.go +++ b/conn.go @@ -306,7 +306,7 @@ func (c *Conn) Brokers() ([]Broker, error) { // DeleteTopics deletes the specified topics. func (c *Conn) DeleteTopics(topics ...string) error { - _, err := c.deleteTopics(deleteTopicsRequestV0{ + _, err := c.deleteTopics(deleteTopicsRequest{ Topics: topics, }) return err @@ -368,12 +368,17 @@ func (c *Conn) heartbeat(request heartbeatRequestV0) (heartbeatResponseV0, error // joinGroup attempts to join a consumer group // // See http://kafka.apache.org/protocol.html#The_Messages_JoinGroup -func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error) { - var response joinGroupResponseV1 +func (c *Conn) joinGroup(request joinGroupRequest) (joinGroupResponse, error) { + version, err := c.negotiateVersion(joinGroup, v1, v2) + if err != nil { + return joinGroupResponse{}, err + } - err := c.writeOperation( + response := joinGroupResponse{v: version} + + err = c.writeOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(joinGroup, v1, id, request) + return c.writeRequest(joinGroup, version, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -382,10 +387,10 @@ func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error }, ) if err != nil { - return joinGroupResponseV1{}, err + return joinGroupResponse{}, err } if response.ErrorCode != 0 { - return joinGroupResponseV1{}, Error(response.ErrorCode) + return joinGroupResponse{}, Error(response.ErrorCode) } return response, nil diff --git a/conn_test.go b/conn_test.go index bdce327e0..ef5ce3071 100644 --- a/conn_test.go +++ b/conn_test.go @@ -13,8 +13,9 @@ import ( "testing" "time" - ktesting "github.com/segmentio/kafka-go/testing" "golang.org/x/net/nettest" + + ktesting "github.com/segmentio/kafka-go/testing" ) type timeout struct{} @@ -679,10 +680,10 @@ func waitForCoordinator(t *testing.T, conn *Conn, groupID string) { func createGroup(t *testing.T, conn *Conn, groupID string) (generationID int32, memberID string, stop func()) { waitForCoordinator(t, conn, groupID) - join := func() (joinGroup joinGroupResponseV1) { + join := func() (joinGroup joinGroupResponse) { var err error for attempt := 0; attempt < 10; attempt++ { - joinGroup, err = conn.joinGroup(joinGroupRequestV1{ + joinGroup, err = conn.joinGroup(joinGroupRequest{ GroupID: groupID, SessionTimeout: int32(time.Minute / time.Millisecond), RebalanceTimeout: int32(time.Second / time.Millisecond), @@ -770,7 +771,7 @@ func testConnFindCoordinator(t *testing.T, conn *Conn) { } func testConnJoinGroupInvalidGroupID(t *testing.T, conn *Conn) { - _, err := conn.joinGroup(joinGroupRequestV1{}) + _, err := conn.joinGroup(joinGroupRequest{}) if !errors.Is(err, InvalidGroupId) && !errors.Is(err, NotCoordinatorForGroup) { t.Fatalf("expected %v or %v; got %v", InvalidGroupId, NotCoordinatorForGroup, err) } @@ -780,7 +781,7 @@ func testConnJoinGroupInvalidSessionTimeout(t *testing.T, conn *Conn) { groupID := makeGroupID() waitForCoordinator(t, conn, groupID) - _, err := conn.joinGroup(joinGroupRequestV1{ + _, err := conn.joinGroup(joinGroupRequest{ GroupID: groupID, }) if !errors.Is(err, InvalidSessionTimeout) && !errors.Is(err, NotCoordinatorForGroup) { @@ -792,7 +793,7 @@ func testConnJoinGroupInvalidRefreshTimeout(t *testing.T, conn *Conn) { groupID := makeGroupID() waitForCoordinator(t, conn, groupID) - _, err := conn.joinGroup(joinGroupRequestV1{ + _, err := conn.joinGroup(joinGroupRequest{ GroupID: groupID, SessionTimeout: int32(3 * time.Second / time.Millisecond), }) diff --git a/consumergroup.go b/consumergroup.go index f4bb382cb..b32f90162 100644 --- a/consumergroup.go +++ b/consumergroup.go @@ -555,7 +555,7 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) { type coordinator interface { io.Closer findCoordinator(findCoordinatorRequestV0) (findCoordinatorResponseV0, error) - joinGroup(joinGroupRequestV1) (joinGroupResponseV1, error) + joinGroup(joinGroupRequest) (joinGroupResponse, error) syncGroup(syncGroupRequestV0) (syncGroupResponseV0, error) leaveGroup(leaveGroupRequestV0) (leaveGroupResponseV0, error) heartbeat(heartbeatRequestV0) (heartbeatResponseV0, error) @@ -588,11 +588,11 @@ func (t *timeoutCoordinator) findCoordinator(req findCoordinatorRequestV0) (find return t.conn.findCoordinator(req) } -func (t *timeoutCoordinator) joinGroup(req joinGroupRequestV1) (joinGroupResponseV1, error) { +func (t *timeoutCoordinator) joinGroup(req joinGroupRequest) (joinGroupResponse, error) { // in the case of join group, the consumer group coordinator may wait up // to rebalance timeout in order to wait for all members to join. if err := t.conn.SetDeadline(time.Now().Add(t.timeout + t.rebalanceTimeout)); err != nil { - return joinGroupResponseV1{}, err + return joinGroupResponse{}, err } return t.conn.joinGroup(req) } @@ -932,7 +932,7 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) { // * InvalidSessionTimeout: // * GroupAuthorizationFailed: func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) { - request, err := cg.makeJoinGroupRequestV1(memberID) + request, err := cg.makeJoinGroupRequest(memberID) if err != nil { return "", 0, nil, err } @@ -978,8 +978,8 @@ func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, i // makeJoinGroupRequestV1 handles the logic of constructing a joinGroup // request. -func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupRequestV1, error) { - request := joinGroupRequestV1{ +func (cg *ConsumerGroup) makeJoinGroupRequest(memberID string) (joinGroupRequest, error) { + request := joinGroupRequest{ GroupID: cg.config.ID, MemberID: memberID, SessionTimeout: int32(cg.config.SessionTimeout / time.Millisecond), @@ -990,7 +990,7 @@ func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupReque for _, balancer := range cg.config.GroupBalancers { userData, err := balancer.UserData() if err != nil { - return joinGroupRequestV1{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err) + return joinGroupRequest{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err) } request.GroupProtocols = append(request.GroupProtocols, joinGroupRequestGroupProtocolV1{ ProtocolName: balancer.ProtocolName(), @@ -1007,7 +1007,7 @@ func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupReque // assignTopicPartitions uses the selected GroupBalancer to assign members to // their various partitions. -func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroupResponseV1) (GroupMemberAssignments, error) { +func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroupResponse) (GroupMemberAssignments, error) { cg.withLogger(func(l Logger) { l.Printf("selected as leader for group, %s\n", cg.config.ID) }) @@ -1050,7 +1050,7 @@ func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroup } // makeMemberProtocolMetadata maps encoded member metadata ([]byte) into []GroupMember. -func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMemberV1) ([]GroupMember, error) { +func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMember) ([]GroupMember, error) { members := make([]GroupMember, 0, len(in)) for _, item := range in { metadata := groupMetadata{} diff --git a/consumergroup_test.go b/consumergroup_test.go index 0d3e290a9..da41fc3df 100644 --- a/consumergroup_test.go +++ b/consumergroup_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "reflect" + "strconv" "strings" "sync" "testing" @@ -15,7 +16,7 @@ var _ coordinator = mockCoordinator{} type mockCoordinator struct { closeFunc func() error findCoordinatorFunc func(findCoordinatorRequestV0) (findCoordinatorResponseV0, error) - joinGroupFunc func(joinGroupRequestV1) (joinGroupResponseV1, error) + joinGroupFunc func(joinGroupRequest) (joinGroupResponse, error) syncGroupFunc func(syncGroupRequestV0) (syncGroupResponseV0, error) leaveGroupFunc func(leaveGroupRequestV0) (leaveGroupResponseV0, error) heartbeatFunc func(heartbeatRequestV0) (heartbeatResponseV0, error) @@ -38,9 +39,9 @@ func (c mockCoordinator) findCoordinator(req findCoordinatorRequestV0) (findCoor return c.findCoordinatorFunc(req) } -func (c mockCoordinator) joinGroup(req joinGroupRequestV1) (joinGroupResponseV1, error) { +func (c mockCoordinator) joinGroup(req joinGroupRequest) (joinGroupResponse, error) { if c.joinGroupFunc == nil { - return joinGroupResponseV1{}, errors.New("no joinGroup behavior specified") + return joinGroupResponse{}, errors.New("no joinGroup behavior specified") } return c.joinGroupFunc(req) } @@ -140,33 +141,36 @@ func TestReaderAssignTopicPartitions(t *testing.T) { }, } - newJoinGroupResponseV1 := func(topicsByMemberID map[string][]string) joinGroupResponseV1 { - resp := joinGroupResponseV1{ - GroupProtocol: RoundRobinGroupBalancer{}.ProtocolName(), - } + newJoinGroupResponse := func(topicsByMemberID map[string][]string) func(v apiVersion) joinGroupResponse { + return func(v apiVersion) joinGroupResponse { + resp := joinGroupResponse{ + v: v, + GroupProtocol: RoundRobinGroupBalancer{}.ProtocolName(), + } - for memberID, topics := range topicsByMemberID { - resp.Members = append(resp.Members, joinGroupResponseMemberV1{ - MemberID: memberID, - MemberMetadata: groupMetadata{ - Topics: topics, - }.bytes(), - }) - } + for memberID, topics := range topicsByMemberID { + resp.Members = append(resp.Members, joinGroupResponseMember{ + MemberID: memberID, + MemberMetadata: groupMetadata{ + Topics: topics, + }.bytes(), + }) + } - return resp + return resp + } } testCases := map[string]struct { - Members joinGroupResponseV1 + MembersFunc func(v apiVersion) joinGroupResponse Assignments GroupMemberAssignments }{ "nil": { - Members: newJoinGroupResponseV1(nil), + MembersFunc: newJoinGroupResponse(nil), Assignments: GroupMemberAssignments{}, }, "one member, one topic": { - Members: newJoinGroupResponseV1(map[string][]string{ + MembersFunc: newJoinGroupResponse(map[string][]string{ "member-1": {"topic-1"}, }), Assignments: GroupMemberAssignments{ @@ -176,7 +180,7 @@ func TestReaderAssignTopicPartitions(t *testing.T) { }, }, "one member, two topics": { - Members: newJoinGroupResponseV1(map[string][]string{ + MembersFunc: newJoinGroupResponse(map[string][]string{ "member-1": {"topic-1", "topic-2"}, }), Assignments: GroupMemberAssignments{ @@ -187,7 +191,7 @@ func TestReaderAssignTopicPartitions(t *testing.T) { }, }, "two members, one topic": { - Members: newJoinGroupResponseV1(map[string][]string{ + MembersFunc: newJoinGroupResponse(map[string][]string{ "member-1": {"topic-1"}, "member-2": {"topic-1"}, }), @@ -201,7 +205,7 @@ func TestReaderAssignTopicPartitions(t *testing.T) { }, }, "two members, two unshared topics": { - Members: newJoinGroupResponseV1(map[string][]string{ + MembersFunc: newJoinGroupResponse(map[string][]string{ "member-1": {"topic-1"}, "member-2": {"topic-2"}, }), @@ -216,21 +220,24 @@ func TestReaderAssignTopicPartitions(t *testing.T) { }, } + supportedVersions := []apiVersion{v1, v2} // joinGroup versions for label, tc := range testCases { - t.Run(label, func(t *testing.T) { - cg := ConsumerGroup{} - cg.config.GroupBalancers = []GroupBalancer{ - RangeGroupBalancer{}, - RoundRobinGroupBalancer{}, - } - assignments, err := cg.assignTopicPartitions(conn, tc.Members) - if err != nil { - t.Fatalf("bad err: %v", err) - } - if !reflect.DeepEqual(tc.Assignments, assignments) { - t.Errorf("expected %v; got %v", tc.Assignments, assignments) - } - }) + for _, v := range supportedVersions { + t.Run(label+"_v"+strconv.Itoa(int(v)), func(t *testing.T) { + cg := ConsumerGroup{} + cg.config.GroupBalancers = []GroupBalancer{ + RangeGroupBalancer{}, + RoundRobinGroupBalancer{}, + } + assignments, err := cg.assignTopicPartitions(conn, tc.MembersFunc(v)) + if err != nil { + t.Fatalf("bad err: %v", err) + } + if !reflect.DeepEqual(tc.Assignments, assignments) { + t.Errorf("expected %v; got %v", tc.Assignments, assignments) + } + }) + } } } @@ -243,12 +250,12 @@ func TestConsumerGroup(t *testing.T) { scenario: "Next returns generations", function: func(t *testing.T, ctx context.Context, cg *ConsumerGroup) { gen1, err := cg.Next(ctx) - if gen1 == nil { - t.Fatalf("expected generation 1 not to be nil") - } if err != nil { t.Fatalf("expected no error, but got %+v", err) } + if gen1 == nil { + t.Fatalf("expected generation 1 not to be nil") + } // returning from this function should cause the generation to // exit. gen1.Start(func(context.Context) {}) @@ -419,8 +426,8 @@ func TestConsumerGroupErrors(t *testing.T) { }, }, nil } - mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponseV1, error) { - return joinGroupResponseV1{}, errors.New("join group failed") + mc.joinGroupFunc = func(joinGroupRequest) (joinGroupResponse, error) { + return joinGroupResponse{}, errors.New("join group failed") } // NOTE : no stub for leaving the group b/c the member never joined. }, @@ -449,8 +456,8 @@ func TestConsumerGroupErrors(t *testing.T) { }, }, nil } - mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponseV1, error) { - return joinGroupResponseV1{ + mc.joinGroupFunc = func(joinGroupRequest) (joinGroupResponse, error) { + return joinGroupResponse{ ErrorCode: int16(InvalidTopic), }, nil } @@ -472,8 +479,8 @@ func TestConsumerGroupErrors(t *testing.T) { { scenario: "fails to join group (leader, unsupported protocol)", prepare: func(mc *mockCoordinator) { - mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponseV1, error) { - return joinGroupResponseV1{ + mc.joinGroupFunc = func(joinGroupRequest) (joinGroupResponse, error) { + return joinGroupResponse{ GenerationID: 12345, GroupProtocol: "foo", LeaderID: "abc", @@ -498,8 +505,8 @@ func TestConsumerGroupErrors(t *testing.T) { { scenario: "fails to sync group (general error)", prepare: func(mc *mockCoordinator) { - mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponseV1, error) { - return joinGroupResponseV1{ + mc.joinGroupFunc = func(joinGroupRequest) (joinGroupResponse, error) { + return joinGroupResponse{ GenerationID: 12345, GroupProtocol: "range", LeaderID: "abc", diff --git a/createtopics.go b/createtopics.go index 4244b3117..40f180f58 100644 --- a/createtopics.go +++ b/createtopics.go @@ -262,7 +262,9 @@ func (t createTopicsRequestV0Topic) writeTo(wb *writeBuffer) { } // See http://kafka.apache.org/protocol.html#The_Messages_CreateTopics -type createTopicsRequestV0 struct { +type createTopicsRequest struct { + v apiVersion // v0, v1, v2 + // Topics contains n array of single topic creation requests. Can not // have multiple entries for the same topic. Topics []createTopicsRequestV0Topic @@ -270,86 +272,136 @@ type createTopicsRequestV0 struct { // Timeout ms to wait for a topic to be completely created on the // controller node. Values <= 0 will trigger topic creation and return immediately Timeout int32 + + // If true, check that the topics can be created as specified, but don't create anything. + // Internal use only for Kafka 4.0 support. + ValidateOnly bool } -func (t createTopicsRequestV0) size() int32 { - return sizeofArray(len(t.Topics), func(i int) int32 { return t.Topics[i].size() }) + +func (t createTopicsRequest) size() int32 { + sz := sizeofArray(len(t.Topics), func(i int) int32 { return t.Topics[i].size() }) + sizeofInt32(t.Timeout) + if t.v >= v1 { + sz += 1 + } + return sz } -func (t createTopicsRequestV0) writeTo(wb *writeBuffer) { +func (t createTopicsRequest) writeTo(wb *writeBuffer) { wb.writeArray(len(t.Topics), func(i int) { t.Topics[i].writeTo(wb) }) wb.writeInt32(t.Timeout) + if t.v >= v1 { + wb.writeBool(t.ValidateOnly) + } } -type createTopicsResponseV0TopicError struct { +type createTopicsResponseTopicError struct { + v apiVersion + // Topic name Topic string // ErrorCode holds response error code ErrorCode int16 + + // ErrorMessage holds responce error message string + ErrorMessage string } -func (t createTopicsResponseV0TopicError) size() int32 { - return sizeofString(t.Topic) + +func (t createTopicsResponseTopicError) size() int32 { + sz := sizeofString(t.Topic) + sizeofInt16(t.ErrorCode) + if t.v >= v1 { + sz += sizeofString(t.ErrorMessage) + } + return sz } -func (t createTopicsResponseV0TopicError) writeTo(wb *writeBuffer) { +func (t createTopicsResponseTopicError) writeTo(wb *writeBuffer) { wb.writeString(t.Topic) wb.writeInt16(t.ErrorCode) + if t.v >= v1 { + wb.writeString(t.ErrorMessage) + } } -func (t *createTopicsResponseV0TopicError) readFrom(r *bufio.Reader, size int) (remain int, err error) { +func (t *createTopicsResponseTopicError) readFrom(r *bufio.Reader, size int) (remain int, err error) { if remain, err = readString(r, size, &t.Topic); err != nil { return } if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil { return } + if t.v >= v1 { + if remain, err = readString(r, remain, &t.ErrorMessage); err != nil { + return + } + } return } // See http://kafka.apache.org/protocol.html#The_Messages_CreateTopics -type createTopicsResponseV0 struct { - TopicErrors []createTopicsResponseV0TopicError +type createTopicsResponse struct { + v apiVersion + + ThrottleTime int32 // v2+ + TopicErrors []createTopicsResponseTopicError } -func (t createTopicsResponseV0) size() int32 { - return sizeofArray(len(t.TopicErrors), func(i int) int32 { return t.TopicErrors[i].size() }) +func (t createTopicsResponse) size() int32 { + sz := sizeofArray(len(t.TopicErrors), func(i int) int32 { return t.TopicErrors[i].size() }) + if t.v >= v2 { + sz += sizeofInt32(t.ThrottleTime) + } + return sz } -func (t createTopicsResponseV0) writeTo(wb *writeBuffer) { +func (t createTopicsResponse) writeTo(wb *writeBuffer) { + if t.v >= v2 { + wb.writeInt32(t.ThrottleTime) + } wb.writeArray(len(t.TopicErrors), func(i int) { t.TopicErrors[i].writeTo(wb) }) } -func (t *createTopicsResponseV0) readFrom(r *bufio.Reader, size int) (remain int, err error) { +func (t *createTopicsResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) { fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) { - var topic createTopicsResponseV0TopicError - if fnRemain, fnErr = (&topic).readFrom(r, size); err != nil { + topic := createTopicsResponseTopicError{v: t.v} + if fnRemain, fnErr = (&topic).readFrom(r, size); fnErr != nil { return } t.TopicErrors = append(t.TopicErrors, topic) return } - if remain, err = readArrayWith(r, size, fn); err != nil { + remain = size + if t.v >= v2 { + if remain, err = readInt32(r, size, &t.ThrottleTime); err != nil { + return + } + } + if remain, err = readArrayWith(r, remain, fn); err != nil { return } return } -func (c *Conn) createTopics(request createTopicsRequestV0) (createTopicsResponseV0, error) { - var response createTopicsResponseV0 +func (c *Conn) createTopics(request createTopicsRequest) (createTopicsResponse, error) { + version, err := c.negotiateVersion(createTopics, v0, v1, v2) + if err != nil { + return createTopicsResponse{}, err + } + + request.v = version + response := createTopicsResponse{v: version} - err := c.writeOperation( + err = c.writeOperation( func(deadline time.Time, id int32) error { if request.Timeout == 0 { now := time.Now() deadline = adjustDeadlineForRTT(deadline, now, defaultRTT) request.Timeout = milliseconds(deadlineToTimeout(deadline, now)) } - return c.writeRequest(createTopics, v0, id, request) + return c.writeRequest(createTopics, version, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -383,7 +435,7 @@ func (c *Conn) CreateTopics(topics ...TopicConfig) error { t.toCreateTopicsRequestV0Topic()) } - _, err := c.createTopics(createTopicsRequestV0{ + _, err := c.createTopics(createTopicsRequest{ Topics: requestV0Topics, }) return err diff --git a/createtopics_test.go b/createtopics_test.go index 38819c382..119d17094 100644 --- a/createtopics_test.go +++ b/createtopics_test.go @@ -160,32 +160,37 @@ func TestClientCreateTopics(t *testing.T) { } } -func TestCreateTopicsResponseV0(t *testing.T) { - item := createTopicsResponseV0{ - TopicErrors: []createTopicsResponseV0TopicError{ - { - Topic: "topic", - ErrorCode: 2, +func TestCreateTopicsResponse(t *testing.T) { + supportedVersions := []apiVersion{v0, v1, v2} + for _, v := range supportedVersions { + item := createTopicsResponse{ + v: v, + TopicErrors: []createTopicsResponseTopicError{ + { + v: v, + Topic: "topic", + ErrorCode: 2, + }, }, - }, - } + } - b := bytes.NewBuffer(nil) - w := &writeBuffer{w: b} - item.writeTo(w) + b := bytes.NewBuffer(nil) + w := &writeBuffer{w: b} + item.writeTo(w) - var found createTopicsResponseV0 - remain, err := (&found).readFrom(bufio.NewReader(b), b.Len()) - if err != nil { - t.Error(err) - t.FailNow() - } - if remain != 0 { - t.Errorf("expected 0 remain, got %v", remain) - t.FailNow() - } - if !reflect.DeepEqual(item, found) { - t.Error("expected item and found to be the same") - t.FailNow() + found := createTopicsResponse{v: v} + remain, err := (&found).readFrom(bufio.NewReader(b), b.Len()) + if err != nil { + t.Error(err) + t.FailNow() + } + if remain != 0 { + t.Errorf("expected 0 remain, got %v", remain) + t.FailNow() + } + if !reflect.DeepEqual(item, found) { + t.Error("expected item and found to be the same") + t.FailNow() + } } } diff --git a/deletetopics.go b/deletetopics.go index d758d9fd6..ff73d553b 100644 --- a/deletetopics.go +++ b/deletetopics.go @@ -67,7 +67,7 @@ func (c *Client) DeleteTopics(ctx context.Context, req *DeleteTopicsRequest) (*D } // See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics -type deleteTopicsRequestV0 struct { +type deleteTopicsRequest struct { // Topics holds the topic names Topics []string @@ -77,41 +77,57 @@ type deleteTopicsRequestV0 struct { Timeout int32 } -func (t deleteTopicsRequestV0) size() int32 { +func (t deleteTopicsRequest) size() int32 { return sizeofStringArray(t.Topics) + sizeofInt32(t.Timeout) } -func (t deleteTopicsRequestV0) writeTo(wb *writeBuffer) { +func (t deleteTopicsRequest) writeTo(wb *writeBuffer) { wb.writeStringArray(t.Topics) wb.writeInt32(t.Timeout) } -type deleteTopicsResponseV0 struct { +type deleteTopicsResponse struct { + v apiVersion // v0, v1 + + ThrottleTime int32 // TopicErrorCodes holds per topic error codes TopicErrorCodes []deleteTopicsResponseV0TopicErrorCode } -func (t deleteTopicsResponseV0) size() int32 { - return sizeofArray(len(t.TopicErrorCodes), func(i int) int32 { return t.TopicErrorCodes[i].size() }) +func (t deleteTopicsResponse) size() int32 { + sz := sizeofArray(len(t.TopicErrorCodes), func(i int) int32 { return t.TopicErrorCodes[i].size() }) + if t.v >= v1 { + sz += sizeofInt32(t.ThrottleTime) + } + return sz } -func (t *deleteTopicsResponseV0) readFrom(r *bufio.Reader, size int) (remain int, err error) { +func (t *deleteTopicsResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) { fn := func(withReader *bufio.Reader, withSize int) (fnRemain int, fnErr error) { var item deleteTopicsResponseV0TopicErrorCode - if fnRemain, fnErr = (&item).readFrom(withReader, withSize); err != nil { + if fnRemain, fnErr = (&item).readFrom(withReader, withSize); fnErr != nil { return } t.TopicErrorCodes = append(t.TopicErrorCodes, item) return } - if remain, err = readArrayWith(r, size, fn); err != nil { + remain = size + if t.v >= v1 { + if remain, err = readInt32(r, size, &t.ThrottleTime); err != nil { + return + } + } + if remain, err = readArrayWith(r, remain, fn); err != nil { return } return } -func (t deleteTopicsResponseV0) writeTo(wb *writeBuffer) { +func (t deleteTopicsResponse) writeTo(wb *writeBuffer) { + if t.v >= v1 { + wb.writeInt32(t.ThrottleTime) + } wb.writeArray(len(t.TopicErrorCodes), func(i int) { t.TopicErrorCodes[i].writeTo(wb) }) } @@ -146,16 +162,24 @@ func (t deleteTopicsResponseV0TopicErrorCode) writeTo(wb *writeBuffer) { // deleteTopics deletes the specified topics. // // See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics -func (c *Conn) deleteTopics(request deleteTopicsRequestV0) (deleteTopicsResponseV0, error) { - var response deleteTopicsResponseV0 - err := c.writeOperation( +func (c *Conn) deleteTopics(request deleteTopicsRequest) (deleteTopicsResponse, error) { + version, err := c.negotiateVersion(deleteTopics, v0, v1) + if err != nil { + return deleteTopicsResponse{}, err + } + + response := deleteTopicsResponse{ + v: version, + } + + err = c.writeOperation( func(deadline time.Time, id int32) error { if request.Timeout == 0 { now := time.Now() deadline = adjustDeadlineForRTT(deadline, now, defaultRTT) request.Timeout = milliseconds(deadlineToTimeout(deadline, now)) } - return c.writeRequest(deleteTopics, v0, id, request) + return c.writeRequest(deleteTopics, version, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -164,7 +188,7 @@ func (c *Conn) deleteTopics(request deleteTopicsRequestV0) (deleteTopicsResponse }, ) if err != nil { - return deleteTopicsResponseV0{}, err + return deleteTopicsResponse{}, err } for _, c := range response.TopicErrorCodes { if c.ErrorCode != 0 { diff --git a/deletetopics_test.go b/deletetopics_test.go index 3caffe840..4dc681831 100644 --- a/deletetopics_test.go +++ b/deletetopics_test.go @@ -29,7 +29,7 @@ func TestClientDeleteTopics(t *testing.T) { } func TestDeleteTopicsResponseV1(t *testing.T) { - item := deleteTopicsResponseV0{ + item := deleteTopicsResponse{ TopicErrorCodes: []deleteTopicsResponseV0TopicErrorCode{ { Topic: "a", @@ -42,7 +42,7 @@ func TestDeleteTopicsResponseV1(t *testing.T) { w := &writeBuffer{w: b} item.writeTo(w) - var found deleteTopicsResponseV0 + var found deleteTopicsResponse remain, err := (&found).readFrom(bufio.NewReader(b), b.Len()) if err != nil { t.Fatal(err) diff --git a/describegroups_test.go b/describegroups_test.go index ad5890988..5d907366a 100644 --- a/describegroups_test.go +++ b/describegroups_test.go @@ -32,7 +32,7 @@ func TestClientDescribeGroups(t *testing.T) { Topic: topic, }) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // longer timeout for kafka-4.0 defer cancel() err := w.WriteMessages( diff --git a/electleaders_test.go b/electleaders_test.go index 3dbaa4704..8933bb4c1 100644 --- a/electleaders_test.go +++ b/electleaders_test.go @@ -3,6 +3,7 @@ package kafka import ( "context" "testing" + "time" ktesting "github.com/segmentio/kafka-go/testing" ) @@ -26,6 +27,7 @@ func TestClientElectLeaders(t *testing.T) { &ElectLeadersRequest{ Topic: topic, Partitions: []int{0, 1}, + Timeout: 5 * time.Second, }, ) diff --git a/joingroup.go b/joingroup.go index 30823a69a..f3d90a937 100644 --- a/joingroup.go +++ b/joingroup.go @@ -241,7 +241,7 @@ func (t joinGroupRequestGroupProtocolV1) writeTo(wb *writeBuffer) { wb.writeBytes(t.ProtocolMetadata) } -type joinGroupRequestV1 struct { +type joinGroupRequest struct { // GroupID holds the unique group identifier GroupID string @@ -264,7 +264,7 @@ type joinGroupRequestV1 struct { GroupProtocols []joinGroupRequestGroupProtocolV1 } -func (t joinGroupRequestV1) size() int32 { +func (t joinGroupRequest) size() int32 { return sizeofString(t.GroupID) + sizeofInt32(t.SessionTimeout) + sizeofInt32(t.RebalanceTimeout) + @@ -273,7 +273,7 @@ func (t joinGroupRequestV1) size() int32 { sizeofArray(len(t.GroupProtocols), func(i int) int32 { return t.GroupProtocols[i].size() }) } -func (t joinGroupRequestV1) writeTo(wb *writeBuffer) { +func (t joinGroupRequest) writeTo(wb *writeBuffer) { wb.writeString(t.GroupID) wb.writeInt32(t.SessionTimeout) wb.writeInt32(t.RebalanceTimeout) @@ -282,23 +282,23 @@ func (t joinGroupRequestV1) writeTo(wb *writeBuffer) { wb.writeArray(len(t.GroupProtocols), func(i int) { t.GroupProtocols[i].writeTo(wb) }) } -type joinGroupResponseMemberV1 struct { +type joinGroupResponseMember struct { // MemberID assigned by the group coordinator MemberID string MemberMetadata []byte } -func (t joinGroupResponseMemberV1) size() int32 { +func (t joinGroupResponseMember) size() int32 { return sizeofString(t.MemberID) + sizeofBytes(t.MemberMetadata) } -func (t joinGroupResponseMemberV1) writeTo(wb *writeBuffer) { +func (t joinGroupResponseMember) writeTo(wb *writeBuffer) { wb.writeString(t.MemberID) wb.writeBytes(t.MemberMetadata) } -func (t *joinGroupResponseMemberV1) readFrom(r *bufio.Reader, size int) (remain int, err error) { +func (t *joinGroupResponseMember) readFrom(r *bufio.Reader, size int) (remain int, err error) { if remain, err = readString(r, size, &t.MemberID); err != nil { return } @@ -308,7 +308,11 @@ func (t *joinGroupResponseMemberV1) readFrom(r *bufio.Reader, size int) (remain return } -type joinGroupResponseV1 struct { +type joinGroupResponse struct { + v apiVersion // v1, v2 + + ThrottleTime int32 + // ErrorCode holds response error code ErrorCode int16 @@ -323,19 +327,26 @@ type joinGroupResponseV1 struct { // MemberID assigned by the group coordinator MemberID string - Members []joinGroupResponseMemberV1 + Members []joinGroupResponseMember } -func (t joinGroupResponseV1) size() int32 { - return sizeofInt16(t.ErrorCode) + +func (t joinGroupResponse) size() int32 { + sz := sizeofInt16(t.ErrorCode) + sizeofInt32(t.GenerationID) + sizeofString(t.GroupProtocol) + sizeofString(t.LeaderID) + sizeofString(t.MemberID) + sizeofArray(len(t.MemberID), func(i int) int32 { return t.Members[i].size() }) + if t.v >= v2 { + sz += sizeofInt32(t.ThrottleTime) + } + return sz } -func (t joinGroupResponseV1) writeTo(wb *writeBuffer) { +func (t joinGroupResponse) writeTo(wb *writeBuffer) { + if t.v >= v2 { + wb.writeInt32(t.ThrottleTime) + } wb.writeInt16(t.ErrorCode) wb.writeInt32(t.GenerationID) wb.writeString(t.GroupProtocol) @@ -344,8 +355,14 @@ func (t joinGroupResponseV1) writeTo(wb *writeBuffer) { wb.writeArray(len(t.Members), func(i int) { t.Members[i].writeTo(wb) }) } -func (t *joinGroupResponseV1) readFrom(r *bufio.Reader, size int) (remain int, err error) { - if remain, err = readInt16(r, size, &t.ErrorCode); err != nil { +func (t *joinGroupResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) { + remain = size + if t.v >= v2 { + if remain, err = readInt32(r, remain, &t.ThrottleTime); err != nil { + return + } + } + if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil { return } if remain, err = readInt32(r, remain, &t.GenerationID); err != nil { @@ -362,7 +379,7 @@ func (t *joinGroupResponseV1) readFrom(r *bufio.Reader, size int) (remain int, e } fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) { - var item joinGroupResponseMemberV1 + var item joinGroupResponseMember if fnRemain, fnErr = (&item).readFrom(r, size); fnErr != nil { return } diff --git a/joingroup_test.go b/joingroup_test.go index 926f5b4a6..73922d6a0 100644 --- a/joingroup_test.go +++ b/joingroup_test.go @@ -217,37 +217,41 @@ func TestMemberMetadata(t *testing.T) { } } -func TestJoinGroupResponseV1(t *testing.T) { - item := joinGroupResponseV1{ - ErrorCode: 2, - GenerationID: 3, - GroupProtocol: "a", - LeaderID: "b", - MemberID: "c", - Members: []joinGroupResponseMemberV1{ - { - MemberID: "d", - MemberMetadata: []byte("blah"), +func TestJoinGroupResponse(t *testing.T) { + supportedVersions := []apiVersion{v1, v2} + for _, v := range supportedVersions { + item := joinGroupResponse{ + v: v, + ErrorCode: 2, + GenerationID: 3, + GroupProtocol: "a", + LeaderID: "b", + MemberID: "c", + Members: []joinGroupResponseMember{ + { + MemberID: "d", + MemberMetadata: []byte("blah"), + }, }, - }, - } + } - b := bytes.NewBuffer(nil) - w := &writeBuffer{w: b} - item.writeTo(w) + b := bytes.NewBuffer(nil) + w := &writeBuffer{w: b} + item.writeTo(w) - var found joinGroupResponseV1 - remain, err := (&found).readFrom(bufio.NewReader(b), b.Len()) - if err != nil { - t.Error(err) - t.FailNow() - } - if remain != 0 { - t.Errorf("expected 0 remain, got %v", remain) - t.FailNow() - } - if !reflect.DeepEqual(item, found) { - t.Error("expected item and found to be the same") - t.FailNow() + found := joinGroupResponse{v: v} + remain, err := (&found).readFrom(bufio.NewReader(b), b.Len()) + if err != nil { + t.Error(err) + t.FailNow() + } + if remain != 0 { + t.Errorf("expected 0 remain, got %v", remain) + t.FailNow() + } + if !reflect.DeepEqual(item, found) { + t.Error("expected item and found to be the same") + t.FailNow() + } } } diff --git a/listgroups.go b/listgroups.go index 229de9352..5034b5440 100644 --- a/listgroups.go +++ b/listgroups.go @@ -125,7 +125,7 @@ func (t *listGroupsResponseV1) readFrom(r *bufio.Reader, size int) (remain int, fn := func(withReader *bufio.Reader, withSize int) (fnRemain int, fnErr error) { var item listGroupsResponseGroupV1 - if fnRemain, fnErr = (&item).readFrom(withReader, withSize); err != nil { + if fnRemain, fnErr = (&item).readFrom(withReader, withSize); fnErr != nil { return } t.Groups = append(t.Groups, item) diff --git a/listgroups_test.go b/listgroups_test.go index 8c389d712..a148718be 100644 --- a/listgroups_test.go +++ b/listgroups_test.go @@ -55,7 +55,7 @@ func TestClientListGroups(t *testing.T) { Topic: topic, }) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // longer timeout for kafka-4.0 defer cancel() err := w.WriteMessages( diff --git a/offsetfetch.go b/offsetfetch.go index b85bc5c83..ce80213f8 100644 --- a/offsetfetch.go +++ b/offsetfetch.go @@ -229,7 +229,7 @@ func (t *offsetFetchResponseV1Response) readFrom(r *bufio.Reader, size int) (rem fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) { item := offsetFetchResponseV1PartitionResponse{} - if fnRemain, fnErr = (&item).readFrom(r, size); err != nil { + if fnRemain, fnErr = (&item).readFrom(r, size); fnErr != nil { return } t.PartitionResponses = append(t.PartitionResponses, item) diff --git a/reader_test.go b/reader_test.go index d179c2858..63d81816e 100644 --- a/reader_test.go +++ b/reader_test.go @@ -301,7 +301,7 @@ func createTopic(t *testing.T, topic string, partitions int) { conn.SetDeadline(time.Now().Add(10 * time.Second)) - _, err = conn.createTopics(createTopicsRequestV0{ + _, err = conn.createTopics(createTopicsRequest{ Topics: []createTopicsRequestV0Topic{ { Topic: topic, diff --git a/sasl/sasl_test.go b/sasl/sasl_test.go index a4101391a..57ff8b7cf 100644 --- a/sasl/sasl_test.go +++ b/sasl/sasl_test.go @@ -18,6 +18,11 @@ const ( ) func TestSASL(t *testing.T) { + scramUsers := map[scram.Algorithm]string{scram.SHA256: "adminscram", scram.SHA512: "adminscram"} + // kafka 4.0.0 test environment supports only different users for different scram algorithms. + if ktesting.KafkaIsAtLeast("4.0.0") { + scramUsers = map[scram.Algorithm]string{scram.SHA256: "adminscram256", scram.SHA512: "adminscram512"} + } tests := []struct { valid func() sasl.Mechanism invalid func() sasl.Mechanism @@ -39,22 +44,22 @@ func TestSASL(t *testing.T) { }, { valid: func() sasl.Mechanism { - mech, _ := scram.Mechanism(scram.SHA256, "adminscram", "admin-secret-256") + mech, _ := scram.Mechanism(scram.SHA256, scramUsers[scram.SHA256], "admin-secret-256") return mech }, invalid: func() sasl.Mechanism { - mech, _ := scram.Mechanism(scram.SHA256, "adminscram", "badpassword") + mech, _ := scram.Mechanism(scram.SHA256, scramUsers[scram.SHA256], "badpassword") return mech }, minKafka: "0.10.2.0", }, { valid: func() sasl.Mechanism { - mech, _ := scram.Mechanism(scram.SHA512, "adminscram", "admin-secret-512") + mech, _ := scram.Mechanism(scram.SHA512, scramUsers[scram.SHA512], "admin-secret-512") return mech }, invalid: func() sasl.Mechanism { - mech, _ := scram.Mechanism(scram.SHA512, "adminscram", "badpassword") + mech, _ := scram.Mechanism(scram.SHA512, scramUsers[scram.SHA512], "badpassword") return mech }, minKafka: "0.10.2.0", diff --git a/writer_test.go b/writer_test.go index 6f894ecd3..def7d8a63 100644 --- a/writer_test.go +++ b/writer_test.go @@ -856,7 +856,8 @@ func testWriterAutoCreateTopic(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() err = w.WriteMessages(ctx, msg) - if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) { + if errors.Is(err, LeaderNotAvailable) || errors.Is(err, UnknownTopicOrPartition) || + errors.Is(err, context.DeadlineExceeded) { time.Sleep(time.Millisecond * 250) continue } @@ -924,7 +925,8 @@ func testWriterSasl(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() err = w.WriteMessages(ctx, msg) - if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) { + if errors.Is(err, LeaderNotAvailable) || errors.Is(err, UnknownTopicOrPartition) || + errors.Is(err, context.DeadlineExceeded) { time.Sleep(time.Millisecond * 250) continue }