Skip to content

Commit 01c619c

Browse files
authored
[DP-1774] - topicctl get partitions to display under replicated and offline (#155)
* [DP-1774] - topicctl get partitions to display under replicated and offline * [DP-1774] - topicctl get partitions to display under replicated and offline * [DP-1774] - topicctl get partitions to display under replicated and offline * [DP-1774] - topicctl get partitions to display under replicated and offline * MInor code fixtures * Minor fixtures. Modifying get metadata from all topics to nil * Minor fixtures. Modifying get metadata from all topics to nil
1 parent 0075b97 commit 01c619c

File tree

9 files changed

+634
-15
lines changed

9 files changed

+634
-15
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ topicctl repl --cluster-config=examples/local-cluster/cluster.yaml
9797
```
9898
get brokers
9999
get topics
100+
get partitions
100101
get partitions topic-default
101102
get offsets topic-default
102103
tail topic-default
@@ -171,7 +172,7 @@ resource type in the cluster. Currently, the following operations are supported:
171172
| `get groups` | All consumer groups in the cluster |
172173
| `get lags [topic] [group]` | Lag for each topic partition for a consumer group |
173174
| `get members [group]` | Details of each member in a consumer group |
174-
| `get partitions [topic]` | All partitions in a topic |
175+
| `get partitions [optional: topics]` | Get all partitions for topics |
175176
| `get offsets [topic]` | Number of messages per partition along with start and end times |
176177
| `get topics` | All topics in the cluster |
177178

cmd/topicctl/subcmd/get.go

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@ package subcmd
22

33
import (
44
"context"
5+
"fmt"
56
"strings"
67

78
"github.com/aws/aws-sdk-go/aws/session"
9+
"github.com/segmentio/topicctl/pkg/admin"
810
"github.com/segmentio/topicctl/pkg/cli"
911
log "github.com/sirupsen/logrus"
1012
"github.com/spf13/cobra"
@@ -31,6 +33,15 @@ type getCmdConfig struct {
3133

3234
var getConfig getCmdConfig
3335

36+
type partitionsCmdConfig struct {
37+
status admin.PartitionStatus
38+
summary bool
39+
}
40+
41+
var partitionsConfig partitionsCmdConfig
42+
43+
var partitionsStatusHelpText = "Allowed values: ok, offline, under-replicated"
44+
3445
func init() {
3546
getCmd.PersistentFlags().BoolVar(
3647
&getConfig.full,
@@ -211,10 +222,10 @@ func membersCmd() *cobra.Command {
211222
}
212223

213224
func partitionsCmd() *cobra.Command {
214-
return &cobra.Command{
215-
Use: "partitions [topic]",
216-
Short: "Displays partition information for the specified topic.",
217-
Args: cobra.ExactArgs(1),
225+
partitionsCommand := &cobra.Command{
226+
Use: "partitions [optional: topics]",
227+
Short: "Get all partitions information for topics",
228+
Args: cobra.MinimumNArgs(0),
218229
RunE: func(cmd *cobra.Command, args []string) error {
219230
ctx := context.Background()
220231
sess := session.Must(session.NewSession())
@@ -225,10 +236,35 @@ func partitionsCmd() *cobra.Command {
225236
}
226237
defer adminClient.Close()
227238

239+
topics := []string{}
240+
for _, arg := range args {
241+
topics = append(topics, arg)
242+
}
243+
228244
cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner)
229-
return cliRunner.GetPartitions(ctx, args[0])
245+
return cliRunner.GetPartitions(
246+
ctx,
247+
topics,
248+
partitionsConfig.status,
249+
partitionsConfig.summary,
250+
)
230251
},
231252
}
253+
254+
partitionsCommand.Flags().Var(
255+
&partitionsConfig.status,
256+
"status",
257+
fmt.Sprintf("partition status\n%s", partitionsStatusHelpText),
258+
)
259+
260+
partitionsCommand.Flags().BoolVar(
261+
&partitionsConfig.summary,
262+
"summary",
263+
false,
264+
fmt.Sprintf("Display summary of partitions"),
265+
)
266+
267+
return partitionsCommand
232268
}
233269

234270
func offsetsCmd() *cobra.Command {

pkg/admin/brokerclient.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -690,3 +690,20 @@ func configEntriesToAPIConfigs(
690690

691691
return apiConfigs
692692
}
693+
694+
func (c *BrokerAdminClient) GetAllTopicsMetadata(
695+
ctx context.Context,
696+
) (*kafka.MetadataResponse, error) {
697+
client := c.GetConnector().KafkaClient
698+
req := kafka.MetadataRequest{
699+
Topics: nil,
700+
}
701+
702+
log.Debugf("Metadata request: %+v", req)
703+
metadata, err := client.Metadata(ctx, &req)
704+
if err != nil {
705+
return nil, fmt.Errorf("Error fetching all topics metadata: %+v", err)
706+
}
707+
708+
return metadata, nil
709+
}

pkg/admin/client.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ type Client interface {
3838
detailed bool,
3939
) (TopicInfo, error)
4040

41+
// GetAllTopicsMetadata performs kafka-go metadata call to get topic information
42+
GetAllTopicsMetadata(ctx context.Context) (*kafka.MetadataResponse, error)
43+
4144
// UpdateTopicConfig updates the configuration for the argument topic. It returns the config
4245
// keys that were updated.
4346
UpdateTopicConfig(

pkg/admin/format.go

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,205 @@ func FormatTopicPartitions(partitions []PartitionInfo, brokers []BrokerInfo) str
504504
return string(bytes.TrimRight(buf.Bytes(), "\n"))
505505
}
506506

507+
// FormatTopicsPartitionsSummary creates a pretty table with summary of the
508+
// partitions for topics.
509+
func FormatTopicsPartitionsSummary(
510+
topicsPartitionsStatusSummary map[string]map[PartitionStatus][]int,
511+
) string {
512+
buf := &bytes.Buffer{}
513+
514+
headers := []string{
515+
"Topic",
516+
"Status",
517+
"Count",
518+
"IDs",
519+
}
520+
columnAligment := []int{
521+
tablewriter.ALIGN_LEFT,
522+
tablewriter.ALIGN_LEFT,
523+
tablewriter.ALIGN_LEFT,
524+
tablewriter.ALIGN_LEFT,
525+
}
526+
527+
table := tablewriter.NewWriter(buf)
528+
table.SetHeader(headers)
529+
table.SetAutoWrapText(true)
530+
table.SetColumnAlignment(columnAligment)
531+
table.SetBorders(
532+
tablewriter.Border{
533+
Left: false,
534+
Top: true,
535+
Right: false,
536+
Bottom: true,
537+
},
538+
)
539+
540+
topicNames := []string{}
541+
tableData := make(map[string][][]string)
542+
for topicName, partitionsStatusSummary := range topicsPartitionsStatusSummary {
543+
topicTableRows := [][]string{}
544+
545+
for partitionStatus, partitionStatusIDs := range partitionsStatusSummary {
546+
topicTableRows = append(topicTableRows, []string{
547+
fmt.Sprintf("%s", topicName),
548+
fmt.Sprintf("%s", partitionStatus),
549+
fmt.Sprintf("%d", len(partitionStatusIDs)),
550+
fmt.Sprintf("%+v", partitionStatusIDs),
551+
})
552+
}
553+
554+
// sort the topicTableRows by partitionStatus
555+
statusSort := func(i, j int) bool {
556+
// second element in the row is of type PartitionStatus
557+
return string(topicTableRows[i][1]) < string(topicTableRows[j][1])
558+
}
559+
560+
sort.Slice(topicTableRows, statusSort)
561+
562+
tableData[topicName] = topicTableRows
563+
topicNames = append(topicNames, topicName)
564+
}
565+
566+
sort.Strings(topicNames)
567+
for _, topicName := range topicNames {
568+
_, exists := tableData[topicName]
569+
if exists {
570+
for _, topicTableRow := range tableData[topicName] {
571+
table.Append(topicTableRow)
572+
}
573+
}
574+
}
575+
576+
table.Render()
577+
return string(bytes.TrimRight(buf.Bytes(), "\n"))
578+
}
579+
580+
// FormatTopicsPartitions creates a pretty table with information on all of the
581+
// partitions for topics.
582+
func FormatTopicsPartitions(
583+
topicsPartitionsStatusInfo map[string][]PartitionStatusInfo,
584+
brokers []BrokerInfo,
585+
) string {
586+
buf := &bytes.Buffer{}
587+
588+
headers := []string{
589+
"Topic",
590+
"ID",
591+
"Leader",
592+
"ISR",
593+
"Replicas",
594+
"Distinct\nRacks",
595+
"Racks",
596+
"Status",
597+
}
598+
columnAligment := []int{
599+
tablewriter.ALIGN_LEFT,
600+
tablewriter.ALIGN_LEFT,
601+
tablewriter.ALIGN_LEFT,
602+
tablewriter.ALIGN_LEFT,
603+
tablewriter.ALIGN_LEFT,
604+
tablewriter.ALIGN_LEFT,
605+
tablewriter.ALIGN_LEFT,
606+
tablewriter.ALIGN_LEFT,
607+
}
608+
609+
table := tablewriter.NewWriter(buf)
610+
table.SetHeader(headers)
611+
table.SetAutoWrapText(false)
612+
table.SetColumnAlignment(columnAligment)
613+
table.SetBorders(
614+
tablewriter.Border{
615+
Left: false,
616+
Top: true,
617+
Right: false,
618+
Bottom: true,
619+
},
620+
)
621+
622+
topicNames := []string{}
623+
brokerRacks := BrokerRacks(brokers)
624+
tableData := make(map[string][][]string)
625+
for topicName, partitionsStatusInfo := range topicsPartitionsStatusInfo {
626+
topicTableRows := [][]string{}
627+
for _, partitionStatusInfo := range partitionsStatusInfo {
628+
racks := partitionStatusInfo.Racks(brokerRacks)
629+
630+
distinctRacks := make(map[string]int)
631+
for _, rack := range racks {
632+
distinctRacks[rack] += 1
633+
}
634+
635+
partitionIsrs := []int{}
636+
for _, partitionStatusIsr := range partitionStatusInfo.Partition.Isr {
637+
partitionIsrs = append(partitionIsrs, partitionStatusIsr.ID)
638+
}
639+
640+
partitionReplicas := []int{}
641+
for _, partitionReplica := range partitionStatusInfo.Partition.Replicas {
642+
partitionReplicas = append(partitionReplicas, partitionReplica.ID)
643+
}
644+
645+
inSync := true
646+
if partitionStatusInfo.Status != Ok {
647+
inSync = false
648+
}
649+
650+
correctLeader := true
651+
if partitionStatusInfo.LeaderState != CorrectLeader {
652+
correctLeader = false
653+
}
654+
655+
var statusPrinter func(f string, a ...interface{}) string
656+
if !util.InTerminal() || inSync {
657+
statusPrinter = fmt.Sprintf
658+
} else if !inSync {
659+
statusPrinter = color.New(color.FgRed).SprintfFunc()
660+
}
661+
662+
var statePrinter func(f string, a ...interface{}) string
663+
if !util.InTerminal() || correctLeader {
664+
statePrinter = fmt.Sprintf
665+
} else if !correctLeader {
666+
statePrinter = color.New(color.FgCyan).SprintfFunc()
667+
}
668+
669+
leaderStateString := fmt.Sprintf("%d", partitionStatusInfo.Partition.Leader.ID)
670+
if !correctLeader {
671+
leaderStateString = fmt.Sprintf("%d %+v", partitionStatusInfo.Partition.Leader.ID,
672+
statePrinter("(%s)", string(partitionStatusInfo.LeaderState)),
673+
)
674+
}
675+
676+
topicTableRows = append(topicTableRows, []string{
677+
fmt.Sprintf("%s", topicName),
678+
fmt.Sprintf("%d", partitionStatusInfo.Partition.ID),
679+
leaderStateString,
680+
fmt.Sprintf("%+v", partitionIsrs),
681+
fmt.Sprintf("%+v", partitionReplicas),
682+
fmt.Sprintf("%d", len(distinctRacks)),
683+
fmt.Sprintf("%+v", racks),
684+
fmt.Sprintf("%v", statusPrinter("%s", string(partitionStatusInfo.Status))),
685+
})
686+
}
687+
688+
tableData[topicName] = topicTableRows
689+
topicNames = append(topicNames, topicName)
690+
}
691+
692+
sort.Strings(topicNames)
693+
for _, topicName := range topicNames {
694+
_, exists := tableData[topicName]
695+
if exists {
696+
for _, topicTableRow := range tableData[topicName] {
697+
table.Append(topicTableRow)
698+
}
699+
}
700+
}
701+
702+
table.Render()
703+
return string(bytes.TrimRight(buf.Bytes(), "\n"))
704+
}
705+
507706
// FormatConfig creates a pretty table with all of the keys and values in a topic or
508707
// broker config.
509708
func FormatConfig(configMap map[string]string) string {

0 commit comments

Comments
 (0)