From 0fc45b33d1c141541f8a7459f7ac7ec259e8e0e9 Mon Sep 17 00:00:00 2001 From: Justin Lei Date: Thu, 22 Feb 2024 16:43:30 -0800 Subject: [PATCH] Allow null replicas in RequestPartition The `replicas` slice was declared as an empty slice which precludes passing a nil slice in the case of wanting to cancel an ongoing assignment. --- alterpartitionreassignments.go | 2 +- alterpartitionreassignments_test.go | 59 +++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 1 deletion(-) diff --git a/alterpartitionreassignments.go b/alterpartitionreassignments.go index dd67d003..91eecb09 100644 --- a/alterpartitionreassignments.go +++ b/alterpartitionreassignments.go @@ -81,7 +81,7 @@ func (c *Client) AlterPartitionReassignments( apiTopicMap[topic] = apiTopic } - replicas := []int32{} + var replicas []int32 for _, brokerID := range assignment.BrokerIDs { replicas = append(replicas, int32(brokerID)) } diff --git a/alterpartitionreassignments_test.go b/alterpartitionreassignments_test.go index 7bbce8ff..7a00e12b 100644 --- a/alterpartitionreassignments_test.go +++ b/alterpartitionreassignments_test.go @@ -2,8 +2,12 @@ package kafka import ( "context" + "net" "testing" + "github.com/stretchr/testify/require" + + "github.com/segmentio/kafka-go/protocol/alterpartitionreassignments" ktesting "github.com/segmentio/kafka-go/testing" ) @@ -117,3 +121,58 @@ func TestClientAlterPartitionReassignmentsMultiTopics(t *testing.T) { ) } } + +type mockRoundTripper struct { + t *testing.T + expected alterpartitionreassignments.Request +} + +var _ RoundTripper = &mockRoundTripper{} + +func (m mockRoundTripper) RoundTrip(_ context.Context, _ net.Addr, req Request) (Response, error) { + switch msg := req.(type) { + case *alterpartitionreassignments.Request: + require.Equal(m.t, m.expected, *msg) + default: + m.t.Error( + "Unexpected req type", + "expected", "*alterpartitionreassignments.Request", + "got", msg, + ) + } + + return &alterpartitionreassignments.Response{}, nil +} + +func TestClientAlterPartitionReassignmentsNilReplicas(t *testing.T) { + client := &Client{ + Addr: TCP("nohost"), + Transport: mockRoundTripper{ + t: t, + expected: alterpartitionreassignments.Request{ + Topics: []alterpartitionreassignments.RequestTopic{ + { + Name: "foobar", + Partitions: []alterpartitionreassignments.RequestPartition{ + { + PartitionIndex: 0, + Replicas: nil, + }, + }, + }, + }, + }, + }, + } + + _, err := client.AlterPartitionReassignments(context.Background(), &AlterPartitionReassignmentsRequest{ + Assignments: []AlterPartitionReassignmentsRequestAssignment{ + { + Topic: "foobar", + PartitionID: 0, + BrokerIDs: nil, + }, + }, + }) + require.NoError(t, err) +}