Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

work around incorrect v1 metadata #1172

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 59 additions & 42 deletions consumergroup.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package kafka

import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -13,6 +11,8 @@ import (
"strings"
"sync"
"time"

"github.com/segmentio/kafka-go/protocol/consumer"
)

// ErrGroupClosed is returned by ConsumerGroup.Next when the group has already
Expand Down Expand Up @@ -168,7 +168,6 @@ type ConsumerGroupConfig struct {
// Validate method validates ConsumerGroupConfig properties and sets relevant
// defaults.
func (config *ConsumerGroupConfig) Validate() error {

if len(config.Brokers) == 0 {
return errors.New("cannot create a consumer group with an empty list of broker addresses")
}
Expand Down Expand Up @@ -925,12 +924,12 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) {
// the leader. Otherwise, GroupMemberAssignments will be nil.
//
// Possible kafka error codes returned:
// * GroupLoadInProgress:
// * GroupCoordinatorNotAvailable:
// * NotCoordinatorForGroup:
// * InconsistentGroupProtocol:
// * InvalidSessionTimeout:
// * GroupAuthorizationFailed:
// - GroupLoadInProgress:
// - GroupCoordinatorNotAvailable:
// - NotCoordinatorForGroup:
// - InconsistentGroupProtocol:
// - InvalidSessionTimeout:
// - GroupAuthorizationFailed:
func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) {
request, err := cg.makeJoinGroupRequestV1(memberID)
if err != nil {
Expand All @@ -951,7 +950,6 @@ func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, i
cg.withLogger(func(l Logger) {
l.Printf("joined group %s as member %s in generation %d", cg.config.ID, memberID, generationID)
})

var assignments GroupMemberAssignments
if iAmLeader := response.MemberID == response.LeaderID; iAmLeader {
v, err := cg.assignTopicPartitions(conn, response)
Expand Down Expand Up @@ -990,15 +988,19 @@ func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupReque
for _, balancer := range cg.config.GroupBalancers {
userData, err := balancer.UserData()
if err != nil {
return joinGroupRequestV1{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err)
return joinGroupRequestV1{}, fmt.Errorf("unable to construct protocol metadata user data for member, %v: %w", balancer.ProtocolName(), err)
}
pm, err := (&consumer.Subscription{
Version: 1,
Topics: cg.config.Topics,
UserData: userData,
}).Bytes()
if err != nil {
return joinGroupRequestV1{}, fmt.Errorf("unable to construct protocol metadata subscription for member, %v: %w", balancer.ProtocolName(), err)
}
request.GroupProtocols = append(request.GroupProtocols, joinGroupRequestGroupProtocolV1{
ProtocolName: balancer.ProtocolName(),
ProtocolMetadata: groupMetadata{
Version: 1,
Topics: cg.config.Topics,
UserData: userData,
}.bytes(),
ProtocolName: balancer.ProtocolName(),
ProtocolMetadata: pm,
})
}

Expand Down Expand Up @@ -1053,9 +1055,9 @@ func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroup
func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMemberV1) ([]GroupMember, error) {
members := make([]GroupMember, 0, len(in))
for _, item := range in {
metadata := groupMetadata{}
reader := bufio.NewReader(bytes.NewReader(item.MemberMetadata))
if remain, err := (&metadata).readFrom(reader, len(item.MemberMetadata)); err != nil || remain != 0 {
var metadata consumer.Subscription
err := metadata.FromBytes(item.MemberMetadata)
if err != nil {
return nil, fmt.Errorf("unable to read metadata for member, %v: %w", item.MemberID, err)
}

Expand All @@ -1073,13 +1075,16 @@ func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMember
// Readers subscriptions topic => partitions
//
// Possible kafka error codes returned:
// * GroupCoordinatorNotAvailable:
// * NotCoordinatorForGroup:
// * IllegalGeneration:
// * RebalanceInProgress:
// * GroupAuthorizationFailed:
// - GroupCoordinatorNotAvailable:
// - NotCoordinatorForGroup:
// - IllegalGeneration:
// - RebalanceInProgress:
// - GroupAuthorizationFailed:
func (cg *ConsumerGroup) syncGroup(conn coordinator, memberID string, generationID int32, memberAssignments GroupMemberAssignments) (map[string][]int32, error) {
request := cg.makeSyncGroupRequestV0(memberID, generationID, memberAssignments)
request, err := cg.makeSyncGroupRequestV0(memberID, generationID, memberAssignments)
if err != nil {
return nil, err
}
response, err := conn.syncGroup(request)
if err == nil && response.ErrorCode != 0 {
err = Error(response.ErrorCode)
Expand All @@ -1088,13 +1093,13 @@ func (cg *ConsumerGroup) syncGroup(conn coordinator, memberID string, generation
return nil, err
}

assignments := groupAssignment{}
reader := bufio.NewReader(bytes.NewReader(response.MemberAssignments))
if _, err := (&assignments).readFrom(reader, len(response.MemberAssignments)); err != nil {
var assignment consumer.Assignment
err = assignment.FromBytes(response.MemberAssignments)
if err != nil {
return nil, err
}

if len(assignments.Topics) == 0 {
if len(assignment.AssignedPartitions) == 0 {
cg.withLogger(func(l Logger) {
l.Printf("received empty assignments for group, %v as member %s for generation %d", cg.config.ID, memberID, generationID)
})
Expand All @@ -1104,10 +1109,15 @@ func (cg *ConsumerGroup) syncGroup(conn coordinator, memberID string, generation
l.Printf("sync group finished for group, %v", cg.config.ID)
})

return assignments.Topics, nil
assignments := make(map[string][]int32, len(assignment.AssignedPartitions))
for _, ap := range assignment.AssignedPartitions {
assignments[ap.Topic] = ap.Partitions
}

return assignments, nil
}

func (cg *ConsumerGroup) makeSyncGroupRequestV0(memberID string, generationID int32, memberAssignments GroupMemberAssignments) syncGroupRequestV0 {
func (cg *ConsumerGroup) makeSyncGroupRequestV0(memberID string, generationID int32, memberAssignments GroupMemberAssignments) (syncGroupRequestV0, error) {
request := syncGroupRequestV0{
GroupID: cg.config.ID,
GenerationID: generationID,
Expand All @@ -1118,20 +1128,27 @@ func (cg *ConsumerGroup) makeSyncGroupRequestV0(memberID string, generationID in
request.GroupAssignments = make([]syncGroupRequestGroupAssignmentV0, 0, 1)

for memberID, topics := range memberAssignments {
topics32 := make(map[string][]int32)
assignedPartitions := make([]consumer.TopicPartition, 0, len(topics))
for topic, partitions := range topics {
partitions32 := make([]int32, len(partitions))
topic := consumer.TopicPartition{
Topic: topic,
Partitions: make([]int32, len(partitions)),
}
for i := range partitions {
partitions32[i] = int32(partitions[i])
topic.Partitions[i] = int32(partitions[i])
}
topics32[topic] = partitions32
assignedPartitions = append(assignedPartitions, topic)
}
assignments, err := (&consumer.Assignment{
Version: 1,
AssignedPartitions: assignedPartitions,
}).Bytes()
if err != nil {
return request, err
}
request.GroupAssignments = append(request.GroupAssignments, syncGroupRequestGroupAssignmentV0{
MemberID: memberID,
MemberAssignments: groupAssignment{
Version: 1,
Topics: topics32,
}.bytes(),
MemberID: memberID,
MemberAssignments: assignments,
})
}

Expand All @@ -1140,7 +1157,7 @@ func (cg *ConsumerGroup) makeSyncGroupRequestV0(memberID string, generationID in
})
}

return request
return request, nil
}

func (cg *ConsumerGroup) fetchOffsets(conn coordinator, subs map[string][]int32) (map[string]map[int]int64, error) {
Expand Down
16 changes: 11 additions & 5 deletions consumergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
"sync"
"testing"
"time"

"github.com/segmentio/kafka-go/protocol"
"github.com/segmentio/kafka-go/protocol/consumer"
)

var _ coordinator = mockCoordinator{}
Expand Down Expand Up @@ -146,11 +149,15 @@ func TestReaderAssignTopicPartitions(t *testing.T) {
}

for memberID, topics := range topicsByMemberID {
mm, err := protocol.Marshal(1, consumer.Subscription{
Topics: topics,
})
if err != nil {
t.Errorf("error marshaling consumer subscription: %v", err)
}
resp.Members = append(resp.Members, joinGroupResponseMemberV1{
MemberID: memberID,
MemberMetadata: groupMetadata{
Topics: topics,
}.bytes(),
MemberID: memberID,
MemberMetadata: mm,
})
}

Expand Down Expand Up @@ -553,7 +560,6 @@ func TestConsumerGroupErrors(t *testing.T) {

for _, tt := range tests {
t.Run(tt.scenario, func(t *testing.T) {

tt.prepare(&mc)

group, err := NewConsumerGroup(ConsumerGroupConfig{
Expand Down
126 changes: 26 additions & 100 deletions describegroups.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package kafka

import (
"bufio"
"bytes"
"context"
"fmt"
"net"

"github.com/segmentio/kafka-go/protocol/consumer"
"github.com/segmentio/kafka-go/protocol/describegroups"
)

Expand Down Expand Up @@ -168,54 +166,26 @@ func decodeMemberMetadata(rawMetadata []byte) (DescribeGroupsResponseMemberMetad
return mm, nil
}

buf := bytes.NewBuffer(rawMetadata)
bufReader := bufio.NewReader(buf)
remain := len(rawMetadata)

var err error
var version16 int16

if remain, err = readInt16(bufReader, remain, &version16); err != nil {
return mm, err
}
mm.Version = int(version16)

if remain, err = readStringArray(bufReader, remain, &mm.Topics); err != nil {
return mm, err
}
if remain, err = readBytes(bufReader, remain, &mm.UserData); err != nil {
var sub consumer.Subscription
err := sub.FromBytes(rawMetadata)
if err != nil {
return mm, err
}

if mm.Version == 1 && remain > 0 {
fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
op := DescribeGroupsResponseMemberMetadataOwnedPartition{}
if fnRemain, fnErr = readString(r, size, &op.Topic); fnErr != nil {
return
}

ps := []int32{}
if fnRemain, fnErr = readInt32Array(r, fnRemain, &ps); fnErr != nil {
return
}

for _, p := range ps {
op.Partitions = append(op.Partitions, int(p))
}

mm.OwnedPartitions = append(mm.OwnedPartitions, op)
return
mm.Version = int(sub.Version)
mm.Topics = sub.Topics
mm.UserData = sub.UserData
mm.OwnedPartitions = make([]DescribeGroupsResponseMemberMetadataOwnedPartition, len(sub.OwnedPartitions))
for i, op := range sub.OwnedPartitions {
mm.OwnedPartitions[i] = DescribeGroupsResponseMemberMetadataOwnedPartition{
Topic: op.Topic,
Partitions: make([]int, len(op.Partitions)),
}

if remain, err = readArrayWith(bufReader, remain, fn); err != nil {
return mm, err
for j, part := range op.Partitions {
mm.OwnedPartitions[i].Partitions[j] = int(part)
}
}

if remain != 0 {
return mm, fmt.Errorf("Got non-zero number of bytes remaining: %d", remain)
}

return mm, nil
}

Expand All @@ -231,68 +201,24 @@ func decodeMemberAssignments(rawAssignments []byte) (DescribeGroupsResponseAssig
return ma, nil
}

buf := bytes.NewBuffer(rawAssignments)
bufReader := bufio.NewReader(buf)
remain := len(rawAssignments)

var err error
var version16 int16

if remain, err = readInt16(bufReader, remain, &version16); err != nil {
var assignment consumer.Assignment
err := assignment.FromBytes(rawAssignments)
if err != nil {
return ma, err
}
ma.Version = int(version16)

fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
item := GroupMemberTopic{}

if fnRemain, fnErr = readString(r, size, &item.Topic); fnErr != nil {
return
}

partitions := []int32{}

if fnRemain, fnErr = readInt32Array(r, fnRemain, &partitions); fnErr != nil {
return
ma.Version = int(assignment.Version)
ma.UserData = assignment.UserData
ma.Topics = make([]GroupMemberTopic, len(assignment.AssignedPartitions))
for i, topic := range assignment.AssignedPartitions {
ma.Topics[i] = GroupMemberTopic{
Topic: topic.Topic,
Partitions: make([]int, len(topic.Partitions)),
}
for _, partition := range partitions {
item.Partitions = append(item.Partitions, int(partition))
for j, part := range topic.Partitions {
ma.Topics[i].Partitions[j] = int(part)
}

ma.Topics = append(ma.Topics, item)
return
}
if remain, err = readArrayWith(bufReader, remain, fn); err != nil {
return ma, err
}

if remain, err = readBytes(bufReader, remain, &ma.UserData); err != nil {
return ma, err
}

if remain != 0 {
return ma, fmt.Errorf("Got non-zero number of bytes remaining: %d", remain)
}

return ma, nil
}

// readInt32Array reads an array of int32s. It's adapted from the implementation of
// readStringArray.
func readInt32Array(r *bufio.Reader, sz int, v *[]int32) (remain int, err error) {
var content []int32
fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
var value int32
if fnRemain, fnErr = readInt32(r, size, &value); fnErr != nil {
return
}
content = append(content, value)
return
}
if remain, err = readArrayWith(r, sz, fn); err != nil {
return
}

*v = content
return
}
Loading