Skip to content

Commit 695a1d6

Browse files
authored
[SEGDP-2963] Add support for topic deletion (#240)
1 parent dfbbe28 commit 695a1d6

File tree

8 files changed

+174
-1
lines changed

8 files changed

+174
-1
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ Currently, the following operations are supported:
178178
| Subcommand | Description |
179179
| --------- | ----------- |
180180
| `delete acls [flags]` | Deletes ACL(s) in the cluster matching the provided flags |
181+
| `delete topic [topic]` | Deletes a single topic in the cluster |
181182

182183
#### get
183184

cmd/topicctl/subcmd/delete.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ func init() {
4444
addSharedFlags(deleteCmd, &deleteConfig.shared)
4545
deleteCmd.AddCommand(
4646
deleteACLCmd(),
47+
deleteTopicCmd(),
4748
)
4849
RootCmd.AddCommand(deleteCmd)
4950
}
@@ -150,3 +151,24 @@ $ topicctl delete acls --resource-type topic --resource-pattern-type literal --r
150151
cmd.MarkFlagRequired("resource-type")
151152
return cmd
152153
}
154+
155+
func deleteTopicCmd() *cobra.Command {
156+
return &cobra.Command{
157+
Use: "topic [topic name]",
158+
Short: "Delete a topic",
159+
Args: cobra.ExactArgs(1),
160+
RunE: func(cmd *cobra.Command, args []string) error {
161+
ctx := context.Background()
162+
sess := session.Must(session.NewSession())
163+
164+
adminClient, err := deleteConfig.shared.getAdminClient(ctx, sess, false)
165+
if err != nil {
166+
return err
167+
}
168+
defer adminClient.Close()
169+
170+
cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner)
171+
return cliRunner.DeleteTopic(ctx, args[0])
172+
},
173+
}
174+
}

pkg/admin/brokerclient.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -616,6 +616,31 @@ func (c *BrokerAdminClient) AssignPartitions(
616616
return err
617617
}
618618

619+
// DeleteTopic deletes a topic in the cluster.
620+
func (c *BrokerAdminClient) DeleteTopic(ctx context.Context, topic string) error {
621+
if c.config.ReadOnly {
622+
return errors.New("Cannot delete topics in read-only mode")
623+
}
624+
625+
req := &kafka.DeleteTopicsRequest{
626+
Topics: []string{topic},
627+
}
628+
log.Debugf("DeleteTopics request: %+v", req)
629+
630+
resp, err := c.client.DeleteTopics(ctx, req)
631+
log.Debugf("DeleteTopics response: %+v (%+v)", resp, err)
632+
633+
if err != nil {
634+
return err
635+
}
636+
637+
if err, ok := resp.Errors[topic]; ok {
638+
return err
639+
}
640+
641+
return nil
642+
}
643+
619644
// AddPartitions extends a topic by adding one or more new partitions to it.
620645
func (c *BrokerAdminClient) AddPartitions(
621646
ctx context.Context,

pkg/admin/brokerclient_test.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,70 @@ func TestBrokerClientAddPartitions(t *testing.T) {
419419
assert.Equal(t, []int{6, 1}, topicInfo.Partitions[4].Replicas)
420420
}
421421

422+
func TestBrokerDeleteTopic(t *testing.T) {
423+
if !util.CanTestBrokerAdmin() {
424+
t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN is not set")
425+
}
426+
427+
ctx := context.Background()
428+
client, err := NewBrokerAdminClient(
429+
ctx,
430+
BrokerAdminClientConfig{
431+
ConnectorConfig: ConnectorConfig{
432+
BrokerAddr: util.TestKafkaAddr(),
433+
},
434+
},
435+
)
436+
require.NoError(t, err)
437+
438+
topicName := util.RandomString("topic-delete-", 6)
439+
err = client.CreateTopic(
440+
ctx,
441+
kafka.TopicConfig{
442+
Topic: topicName,
443+
NumPartitions: -1,
444+
ReplicationFactor: -1,
445+
ReplicaAssignments: []kafka.ReplicaAssignment{
446+
{
447+
Partition: 0,
448+
Replicas: []int{1, 2},
449+
},
450+
{
451+
Partition: 1,
452+
Replicas: []int{2, 3},
453+
},
454+
{
455+
Partition: 2,
456+
Replicas: []int{3, 4},
457+
},
458+
},
459+
ConfigEntries: []kafka.ConfigEntry{
460+
{
461+
ConfigName: "flush.ms",
462+
ConfigValue: "2000",
463+
},
464+
{
465+
ConfigName: "retention.ms",
466+
ConfigValue: "10000000",
467+
},
468+
},
469+
},
470+
)
471+
require.NoError(t, err)
472+
util.RetryUntil(t, 5*time.Second, func() error {
473+
_, err := client.GetTopic(ctx, topicName, true)
474+
return err
475+
})
476+
477+
err = client.DeleteTopic(ctx, topicName)
478+
require.NoError(t, err)
479+
480+
time.Sleep(time.Second * 10)
481+
482+
_, err = client.GetTopic(ctx, topicName, false)
483+
require.Error(t, err)
484+
}
485+
422486
func TestBrokerClientAlterAssignments(t *testing.T) {
423487
if !util.CanTestBrokerAdmin() {
424488
t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN is not set")

pkg/admin/client.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ type Client interface {
4141
detailed bool,
4242
) (TopicInfo, error)
4343

44+
// DeleteTopic deletes a single topic in the cluster.
45+
DeleteTopic(ctx context.Context, topic string) error
46+
4447
// GetACLs gets full information about each ACL in the cluster.
4548
GetACLs(
4649
ctx context.Context,

pkg/admin/zkclient.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -645,6 +645,29 @@ func (c *ZKAdminClient) CreateTopic(
645645
return err
646646
}
647647

648+
func (c *ZKAdminClient) DeleteTopic(ctx context.Context, topic string) error {
649+
if c.readOnly {
650+
return errors.New("Cannot delete topics in read-only mode")
651+
}
652+
653+
req := kafka.DeleteTopicsRequest{
654+
Topics: []string{topic},
655+
}
656+
log.Debugf("DeleteTopics request: %+v", req)
657+
658+
resp, err := c.Connector.KafkaClient.DeleteTopics(ctx, &req)
659+
log.Debugf("DeleteTopics response: %+v (%+v)", resp, err)
660+
if err != nil {
661+
return err
662+
}
663+
664+
if err, ok := resp.Errors[topic]; ok {
665+
return err
666+
}
667+
668+
return nil
669+
}
670+
648671
// AssignPartitions notifies the cluster to begin a partition reassignment.
649672
// This should only be used for existing partitions; to create new partitions,
650673
// use the AddPartitions method.

pkg/cli/cli.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/segmentio/topicctl/pkg/config"
2222
"github.com/segmentio/topicctl/pkg/groups"
2323
"github.com/segmentio/topicctl/pkg/messages"
24+
"github.com/segmentio/topicctl/pkg/util"
2425
log "github.com/sirupsen/logrus"
2526
)
2627

@@ -608,6 +609,40 @@ func (c *CLIRunner) GetTopics(ctx context.Context, full bool) error {
608609
return nil
609610
}
610611

612+
// DeleteTopic deletes a single topic.
613+
func (c *CLIRunner) DeleteTopic(ctx context.Context, topic string) error {
614+
c.printer("Checking if topic %s exists...", topic)
615+
c.startSpinner()
616+
// First check that topic exists
617+
_, err := c.adminClient.GetTopic(ctx, topic, false)
618+
if err != nil {
619+
c.stopSpinner()
620+
return fmt.Errorf("error fetching topic info: %+v", err)
621+
}
622+
c.stopSpinner()
623+
c.printer("Topic %s exists in the cluster!", topic)
624+
625+
confirm, err := util.Confirm(fmt.Sprintf("Delete topic \"%s\"", topic), false)
626+
if err != nil {
627+
return err
628+
}
629+
630+
if !confirm {
631+
return nil
632+
}
633+
634+
c.startSpinner()
635+
err = c.adminClient.DeleteTopic(ctx, topic)
636+
c.stopSpinner()
637+
if err != nil {
638+
return err
639+
}
640+
641+
c.printer("Topic %s successfully deleted", topic)
642+
643+
return nil
644+
}
645+
611646
// GerUsers fetches the details of each user in the cluster and prints out a table of them.
612647
func (c *CLIRunner) GetUsers(ctx context.Context, names []string) error {
613648
c.startSpinner()

pkg/version/version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
package version
22

33
// Version is the current topicctl version.
4-
const Version = "1.20.2"
4+
const Version = "1.21.0"

0 commit comments

Comments
 (0)