Skip to content

Commit cd70d3d

Browse files
Merge pull request #18 from segmentio/yolken-usability-improvements4
Misc. usability improvements
2 parents a714c3b + b31ee95 commit cd70d3d

File tree

12 files changed

+433
-49
lines changed

12 files changed

+433
-49
lines changed

cmd/topicctl/subcmd/apply.go

Lines changed: 46 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,26 @@ import (
1818
)
1919

2020
var applyCmd = &cobra.Command{
21-
Use: "apply [topic configs]",
22-
Short: "apply one or more topic configs",
23-
Args: cobra.MinimumNArgs(1),
24-
RunE: applyRun,
21+
Use: "apply [topic configs]",
22+
Short: "apply one or more topic configs",
23+
Args: cobra.MinimumNArgs(1),
24+
PreRunE: applyPreRun,
25+
RunE: applyRun,
2526
}
2627

2728
type applyCmdConfig struct {
28-
brokersToRemove []int
29-
brokerThrottleMBsOverride int
30-
clusterConfig string
31-
dryRun bool
32-
partitionBatchSizeOverride int
33-
pathPrefix string
34-
rebalance bool
35-
skipConfirm bool
36-
sleepLoopTime time.Duration
29+
brokersToRemove []int
30+
brokerThrottleMBsOverride int
31+
clusterConfig string
32+
dryRun bool
33+
partitionBatchSizeOverride int
34+
pathPrefix string
35+
rebalance bool
36+
retentionDropStepDurationStr string
37+
skipConfirm bool
38+
sleepLoopDuration time.Duration
39+
40+
retentionDropStepDuration time.Duration
3741
}
3842

3943
var applyConfig applyCmdConfig
@@ -69,17 +73,23 @@ func init() {
6973
0,
7074
"Partition batch size override",
7175
)
76+
applyCmd.Flags().StringVar(
77+
&applyConfig.pathPrefix,
78+
"path-prefix",
79+
os.Getenv("TOPICCTL_APPLY_PATH_PREFIX"),
80+
"Prefix for topic config paths",
81+
)
7282
applyCmd.Flags().BoolVar(
7383
&applyConfig.rebalance,
7484
"rebalance",
7585
false,
7686
"Explicitly rebalance broker partition assignments",
7787
)
7888
applyCmd.Flags().StringVar(
79-
&applyConfig.pathPrefix,
80-
"path-prefix",
81-
os.Getenv("TOPICCTL_APPLY_PATH_PREFIX"),
82-
"Prefix for topic config paths",
89+
&applyConfig.retentionDropStepDurationStr,
90+
"retention-drop-step-duration",
91+
"",
92+
"Amount of time to use for retention drop steps",
8393
)
8494
applyCmd.Flags().BoolVar(
8595
&applyConfig.skipConfirm,
@@ -88,15 +98,30 @@ func init() {
8898
"Skip confirmation prompts during apply process",
8999
)
90100
applyCmd.Flags().DurationVar(
91-
&applyConfig.sleepLoopTime,
92-
"sleep-loop-time",
101+
&applyConfig.sleepLoopDuration,
102+
"sleep-loop-duration",
93103
10*time.Second,
94104
"Amount of time to wait between partition checks",
95105
)
96106

97107
RootCmd.AddCommand(applyCmd)
98108
}
99109

110+
// applyPreRun is registered as the apply command's PreRunE hook. It converts the raw string
// value of the --retention-drop-step-duration flag into a time.Duration and stores the result
// in applyConfig.retentionDropStepDuration for later use when building the applier config.
//
// An empty flag value is treated as "not provided": nothing is parsed and the duration keeps
// its zero value. A non-empty value that time.ParseDuration rejects is returned as the error.
//
// NOTE(review): an explicit "0" parses to the zero duration, which is the same value the field
// has when the flag is unset, so the two cases cannot be told apart after this function
// returns. Confirm that downstream handling of a zero duration matches the intent.
func applyPreRun(cmd *cobra.Command, args []string) error {
111+
// Only parse when the user actually supplied a value.
if applyConfig.retentionDropStepDurationStr != "" {
112+
var err error
113+
applyConfig.retentionDropStepDuration, err = time.ParseDuration(
114+
applyConfig.retentionDropStepDurationStr,
115+
)
116+
117+
if err != nil {
118+
return err
119+
}
120+
}
121+
122+
return nil
123+
}
124+
100125
func applyRun(cmd *cobra.Command, args []string) error {
101126
ctx, cancel := context.WithCancel(context.Background())
102127
defer cancel()
@@ -189,8 +214,9 @@ func applyTopic(
189214
DryRun: applyConfig.dryRun,
190215
PartitionBatchSizeOverride: applyConfig.partitionBatchSizeOverride,
191216
Rebalance: applyConfig.rebalance,
217+
RetentionDropStepDuration: applyConfig.retentionDropStepDuration,
192218
SkipConfirm: applyConfig.skipConfirm,
193-
SleepLoopTime: applyConfig.sleepLoopTime,
219+
SleepLoopDuration: applyConfig.sleepLoopDuration,
194220
TopicConfig: topicConfig,
195221
}
196222

cmd/topicctl/subcmd/get.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ func getRun(cmd *cobra.Command, args []string) error {
153153
return fmt.Errorf("Must provide topic and groupID as additional positional arguments")
154154
}
155155

156-
return cliRunner.GetMemberLags(ctx, args[1], args[2])
156+
return cliRunner.GetMemberLags(ctx, args[1], args[2], getConfig.full)
157157
case "members":
158158
if len(args) != 2 {
159159
return fmt.Errorf("Must provide group ID as second positional argument")

pkg/apply/apply.go

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"path/filepath"
88
"reflect"
9+
"strings"
910
"time"
1011

1112
"github.com/hashicorp/go-multierror"
@@ -29,8 +30,9 @@ type TopicApplierConfig struct {
2930
DryRun bool
3031
PartitionBatchSizeOverride int
3132
Rebalance bool
33+
RetentionDropStepDuration time.Duration
3234
SkipConfirm bool
33-
SleepLoopTime time.Duration
35+
SleepLoopDuration time.Duration
3436
TopicConfig config.TopicConfig
3537
}
3638

@@ -168,7 +170,7 @@ func (t *TopicApplier) applyNewTopic(ctx context.Context) error {
168170
}
169171

170172
// Just do a short sleep to ensure that zk is updated before we check
171-
if err := interruptableSleep(ctx, t.config.SleepLoopTime/5); err != nil {
173+
if err := interruptableSleep(ctx, t.config.SleepLoopDuration/5); err != nil {
172174
return err
173175
}
174176

@@ -354,6 +356,25 @@ func (t *TopicApplier) updateSettings(
354356
return err
355357
}
356358

359+
var retentionDropStepDuration time.Duration
360+
if t.config.RetentionDropStepDuration != 0 {
361+
retentionDropStepDuration = t.config.RetentionDropStepDuration
362+
} else {
363+
var err error
364+
retentionDropStepDuration, err = t.config.ClusterConfig.GetDefaultRetentionDropStepDuration()
365+
if err != nil {
366+
return err
367+
}
368+
}
369+
370+
reduced, err := topicSettings.ReduceRetentionDrop(
371+
topicInfo.Config,
372+
retentionDropStepDuration,
373+
)
374+
if err != nil {
375+
return err
376+
}
377+
357378
if len(diffKeys) > 0 {
358379
diffsTable, err := FormatSettingsDiff(topicSettings, topicInfo.Config, diffKeys)
359380
if err != nil {
@@ -366,6 +387,18 @@ func (t *TopicApplier) updateSettings(
366387
diffsTable,
367388
)
368389

390+
if reduced {
391+
log.Infof(
392+
strings.Join(
393+
[]string{
394+
"Note: Retention drop has been reduced to minimize cluster disruption.",
395+
"Re-run apply afterwards to keep dropping retention to configured value or run with --retention-drop-step-duration=0 to not do gradual step-down.",
396+
},
397+
" ",
398+
),
399+
)
400+
}
401+
369402
if t.config.DryRun {
370403
log.Infof("Skipping update because dryRun is set to true")
371404
return nil
@@ -887,7 +920,7 @@ func (t *TopicApplier) updatePartitionsIteration(
887920
return err
888921
}
889922

890-
checkTimer := time.NewTicker(t.config.SleepLoopTime)
923+
checkTimer := time.NewTicker(t.config.SleepLoopDuration)
891924
defer checkTimer.Stop()
892925

893926
log.Info("Sleeping then entering check loop")
@@ -945,7 +978,7 @@ outerLoop:
945978
len(assignmentsToUpdate),
946979
admin.FormatTopicPartitions(notReady, t.brokers),
947980
)
948-
log.Infof("Sleeping for %s", t.config.SleepLoopTime.String())
981+
log.Infof("Sleeping for %s", t.config.SleepLoopDuration.String())
949982
case <-ctx.Done():
950983
return ctx.Err()
951984
}
@@ -1181,7 +1214,7 @@ func (t *TopicApplier) updateLeadersIteration(
11811214
return err
11821215
}
11831216

1184-
checkTimer := time.NewTicker(t.config.SleepLoopTime)
1217+
checkTimer := time.NewTicker(t.config.SleepLoopDuration)
11851218
defer checkTimer.Stop()
11861219

11871220
log.Info("Sleeping then entering check loop")
@@ -1212,7 +1245,7 @@ outerLoop:
12121245
admin.FormatTopicPartitions(wrongLeaders, t.brokers),
12131246
)
12141247

1215-
log.Infof("Sleeping for %s", t.config.SleepLoopTime.String())
1248+
log.Infof("Sleeping for %s", t.config.SleepLoopDuration.String())
12161249
case <-ctx.Done():
12171250
return ctx.Err()
12181251
}

pkg/apply/apply_test.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ func TestApplyBasicUpdates(t *testing.T) {
4545
}
4646

4747
applier := testApplier(ctx, t, topicConfig)
48+
applier.config.RetentionDropStepDuration = 50 * time.Minute
49+
4850
assert.Equal(t, 3, applier.maxBatchSize)
4951
assert.Equal(t, int64(2000000), applier.throttleBytes)
5052

@@ -62,13 +64,15 @@ func TestApplyBasicUpdates(t *testing.T) {
6264
assert.Equal(t, "compact", topicInfo.Config["cleanup.policy"])
6365

6466
// Update retention and settings
65-
applier.topicConfig.Spec.RetentionMinutes = 501
67+
applier.topicConfig.Spec.RetentionMinutes = 400
6668
applier.topicConfig.Spec.Settings["cleanup.policy"] = "delete"
6769
err = applier.Apply(ctx)
6870
require.Nil(t, err)
6971
topicInfo, err = applier.adminClient.GetTopic(ctx, topicName, true)
7072
require.Nil(t, err)
71-
assert.Equal(t, "30060000", topicInfo.Config[admin.RetentionKey])
73+
74+
// Retention only dropped to 450 minutes (27000000 ms), not the configured 400, because the drop is limited by the retention drop step duration
75+
assert.Equal(t, "27000000", topicInfo.Config[admin.RetentionKey])
7276
assert.Equal(t, "delete", topicInfo.Config["cleanup.policy"])
7377

7478
// Updating replication factor not allowed
@@ -868,7 +872,7 @@ func TestApplyOverrides(t *testing.T) {
868872
TopicConfig: topicConfig,
869873
DryRun: false,
870874
SkipConfirm: true,
871-
SleepLoopTime: 500 * time.Millisecond,
875+
SleepLoopDuration: 500 * time.Millisecond,
872876
PartitionBatchSizeOverride: 8,
873877
},
874878
)
@@ -906,11 +910,11 @@ func testApplier(
906910
ctx,
907911
adminClient,
908912
TopicApplierConfig{
909-
ClusterConfig: clusterConfig,
910-
TopicConfig: topicConfig,
911-
DryRun: false,
912-
SkipConfirm: true,
913-
SleepLoopTime: 500 * time.Millisecond,
913+
ClusterConfig: clusterConfig,
914+
TopicConfig: topicConfig,
915+
DryRun: false,
916+
SkipConfirm: true,
917+
SleepLoopDuration: 500 * time.Millisecond,
914918
},
915919
)
916920
require.Nil(t, err)

pkg/check/check_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,11 @@ func TestCheck(t *testing.T) {
5858
ctx,
5959
adminClient,
6060
apply.TopicApplierConfig{
61-
ClusterConfig: clusterConfig,
62-
TopicConfig: topicConfig,
63-
DryRun: false,
64-
SkipConfirm: true,
65-
SleepLoopTime: 500 * time.Millisecond,
61+
ClusterConfig: clusterConfig,
62+
TopicConfig: topicConfig,
63+
DryRun: false,
64+
SkipConfirm: true,
65+
SleepLoopDuration: 500 * time.Millisecond,
6666
},
6767
)
6868
require.Nil(t, err)

pkg/cli/cli.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,12 @@ func (c *CLIRunner) GetGroupMembers(ctx context.Context, groupID string, full bo
352352

353353
// GetMemberLags fetches and prints a summary of the consumer group lag for each partition
354354
// in a single topic.
355-
func (c *CLIRunner) GetMemberLags(ctx context.Context, topic string, groupID string) error {
355+
func (c *CLIRunner) GetMemberLags(
356+
ctx context.Context,
357+
topic string,
358+
groupID string,
359+
full bool,
360+
) error {
356361
c.startSpinner()
357362

358363
// Check that topic exists before getting offsets; otherwise, the topic get
@@ -370,7 +375,7 @@ func (c *CLIRunner) GetMemberLags(ctx context.Context, topic string, groupID str
370375
return err
371376
}
372377

373-
c.printer("Group member lags:\n%s", groups.FormatMemberLags(memberLags))
378+
c.printer("Group member lags:\n%s", groups.FormatMemberLags(memberLags, full))
374379
return nil
375380
}
376381

pkg/cli/repl.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ func (r *Repl) executor(in string) {
262262
log.Errorf("Error: %+v", err)
263263
return
264264
}
265-
if err := r.cliRunner.GetMemberLags(ctx, words[2], words[3]); err != nil {
265+
if err := r.cliRunner.GetMemberLags(ctx, words[2], words[3], false); err != nil {
266266
log.Errorf("Error: %+v", err)
267267
return
268268
}

pkg/config/cluster.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package config
33
import (
44
"context"
55
"errors"
6+
"fmt"
7+
"time"
68

79
"github.com/aws/aws-sdk-go/aws/session"
810
"github.com/hashicorp/go-multierror"
@@ -68,6 +70,10 @@ type ClusterSpec struct {
6870
// DefaultThrottleMB is the default broker throttle used for migrations in this
6971
// cluster. If unset, then a reasonable default is used instead.
7072
DefaultThrottleMB int64 `json:"defaultThrottleMB"`
73+
74+
// DefaultRetentionDropStepDurationStr is the default amount of time that retention drops will be
75+
// limited by, expressed as a duration string (e.g. "24h"). If unset, no retention drop limiting will be applied.
76+
DefaultRetentionDropStepDurationStr string `json:"defaultRetentionDropStepDuration"`
7177
}
7278

7379
// Validate evaluates whether the cluster config is valid.
@@ -98,9 +104,25 @@ func (c ClusterConfig) Validate() error {
98104
multierror.Append(err, errors.New("MajorVersion must be v0.10 or v2"))
99105
}
100106

107+
_, parseErr := c.GetDefaultRetentionDropStepDuration()
108+
if parseErr != nil {
109+
err = multierror.Append(
110+
err,
111+
fmt.Errorf("Error parsing retention drop step retention: %+v", parseErr),
112+
)
113+
}
114+
101115
return err
102116
}
103117

118+
// GetDefaultRetentionDropStepDuration returns the cluster-level default retention drop step
// duration, parsed from Spec.DefaultRetentionDropStepDurationStr using time.ParseDuration.
//
// When the spec field is empty it returns a zero duration and a nil error. When the field is
// set but is not a valid duration string, the error from time.ParseDuration is returned.
func (c ClusterConfig) GetDefaultRetentionDropStepDuration() (time.Duration, error) {
119+
// An empty string means no default was configured; report that as a zero duration.
if c.Spec.DefaultRetentionDropStepDurationStr == "" {
120+
return 0, nil
121+
}
122+
123+
return time.ParseDuration(c.Spec.DefaultRetentionDropStepDurationStr)
124+
}
125+
104126
// NewAdminClient returns a new admin client using the parameters in the current cluster config.
105127
func (c ClusterConfig) NewAdminClient(
106128
ctx context.Context,

0 commit comments

Comments
 (0)