Skip to content

Commit 3830f4b

Browse files
get users subcommand (#153)
* bump kafka-go to include acl apis * add acl interfaces and aclinfo type stub * pull latest kafka-go and use kafka-go aclresource type * wip * fix test * fix typos * get acls working * getacls working * upgrade cobra to latest * finish separating get into separate subcommands * remove unneeded variables * wip * pr feedback * Revert "upgrade cobra to latest" This reverts commit 7b8ee42. * use getCliRunnerAndCtx in get acls * more consistent variable names * custom cobra type * bring in new kafka-go * support resource pattern type * add support for acloperationtype and remove options for unknown * improve descriptions * support permissiontype and host filters * add resource name filter and fix permission type formatting * support principal filtering * improve docs * add examples * remove comment * remove TODOs that are complete * remove TODOs that are complete * update README * fix test * wip * fix error handling * error handling for zk * more consistent error msg * clean up createacl * add TestBrokerClientCreateACLReadOnly * improve zk tests * run acl tests in ci * enable acls for kafka 2.4.1 in ci * fix zk tests * skip TestBrokerClientCreateACLReadOnly on old versions of kafka * try to debug * handle nested errors from createacls * operations -> operation * operations -> operation * remove setting log level in test * clean up allowed types in help command * fix merge conflict * bump kafka-go to version on main * wip * fix test * basic tests * start on getusers cmd * add json annotations * add json annotations * get users working * wip * add todos and fix type annotaitons * improve test * use CanTestBrokerAdminSecurity to feature flag test * update README * bump kafka-go to version on main * wip * basic tests * start on getusers cmd * add json annotations * get users working * wip * add todos and fix type annotaitons * improve test * use CanTestBrokerAdminSecurity to feature flag test * update README * use released version of kafka-go * createacl -> createacls * add minimal repl support * add sleep to stop flaky test failures * remove sleep * bump kafka-go to version on main * wip * basic tests * start on getusers cmd * add json annotations * get users working * wip * add todos and fix type annotaitons * improve test * use CanTestBrokerAdminSecurity to feature flag test * update README * bump kafka-go to version on main * wip * basic tests * add json annotations * get users working * add todos and fix type annotaitons * fix merge conflcits * test against kafka 2.7.1 * fix test for kafka 0.10 * fix supported features * add repl support * than -> then * CreateUser -> UpsertUser
1 parent bd46e16 commit 3830f4b

File tree

15 files changed

+375
-11
lines changed

15 files changed

+375
-11
lines changed

.github/workflows/ci.yml

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ jobs:
117117
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
118118

119119

120-
test241:
120+
test271:
121121
runs-on: ubuntu-latest
122122
container:
123123
image: cimg/go:1.19
@@ -148,7 +148,7 @@ jobs:
148148
- "2181:2181"
149149

150150
kafka1:
151-
image: wurstmeister/kafka:2.12-2.4.1
151+
image: wurstmeister/kafka:2.13-2.7.1
152152
ports:
153153
- "9092:9092"
154154
env:
@@ -161,7 +161,7 @@ jobs:
161161
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true
162162

163163
kafka2:
164-
image: wurstmeister/kafka:2.12-2.4.1
164+
image: wurstmeister/kafka:2.13-2.7.1
165165
ports:
166166
- "9093:9092"
167167
env:
@@ -174,7 +174,7 @@ jobs:
174174
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true
175175

176176
kafka3:
177-
image: wurstmeister/kafka:2.12-2.4.1
177+
image: wurstmeister/kafka:2.13-2.7.1
178178
ports:
179179
- "9094:9092"
180180
env:
@@ -187,7 +187,7 @@ jobs:
187187
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true
188188

189189
kafka4:
190-
image: wurstmeister/kafka:2.12-2.4.1
190+
image: wurstmeister/kafka:2.13-2.7.1
191191
ports:
192192
- "9095:9092"
193193
env:
@@ -200,7 +200,7 @@ jobs:
200200
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true
201201

202202
kafka5:
203-
image: wurstmeister/kafka:2.12-2.4.1
203+
image: wurstmeister/kafka:2.13-2.7.1
204204
ports:
205205
- "9096:9092"
206206
env:
@@ -213,7 +213,7 @@ jobs:
213213
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true
214214

215215
kafka6:
216-
image: wurstmeister/kafka:2.12-2.4.1
216+
image: wurstmeister/kafka:2.13-2.7.1
217217
ports:
218218
- "9097:9092"
219219
env:
@@ -227,7 +227,7 @@ jobs:
227227

228228
snyk:
229229
runs-on: ubuntu-latest
230-
needs: [test010, test241]
230+
needs: [test010, test271]
231231
steps:
232232
- uses: actions/checkout@v3
233233
- name: Run Snyk to check for vulnerabilities

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ resource type in the cluster. Currently, the following operations are supported:
176176
| `get offsets [topic]` | Number of messages per partition along with start and end times |
177177
| `get topics` | All topics in the cluster |
178178
| `get acls [flags]` | Describe access control levels (ACLs) in the cluster |
179+
| `get users` | All users in the cluster |
179180

180181
#### rebalance
181182

cmd/topicctl/subcmd/get.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ func init() {
6868
offsetsCmd(),
6969
topicsCmd(),
7070
aclsCmd(),
71+
usersCmd(),
7172
)
7273
RootCmd.AddCommand(getCmd)
7374
}
@@ -414,3 +415,24 @@ $ topicctl get acls --host 198.51.100.0
414415
)
415416
return cmd
416417
}
418+
419+
func usersCmd() *cobra.Command {
420+
return &cobra.Command{
421+
Use: "users",
422+
Short: "Displays information for all users in the cluster.",
423+
Args: cobra.NoArgs,
424+
RunE: func(cmd *cobra.Command, args []string) error {
425+
ctx := context.Background()
426+
sess := session.Must(session.NewSession())
427+
428+
adminClient, err := getConfig.shared.getAdminClient(ctx, sess, true)
429+
if err != nil {
430+
return err
431+
}
432+
defer adminClient.Close()
433+
434+
cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner)
435+
return cliRunner.GetUsers(ctx, nil)
436+
},
437+
}
438+
}

docker-compose-auth.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ services:
1616
- "2181:2181"
1717

1818
kafka:
19-
image: wurstmeister/kafka:2.12-2.4.1
19+
image: wurstmeister/kafka:2.13-2.7.1
2020
restart: on-failure:3
2121
links:
2222
- zookeeper

pkg/admin/brokerclient.go

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,11 +99,17 @@ func NewBrokerAdminClient(
9999
supportedFeatures.DynamicBrokerConfigs = true
100100
}
101101

102-
// If we have DescribeAcls, than we're running a version of Kafka > 2.0.1,
102+
// If we have DescribeAcls, then we're running a version of Kafka > 2.0.1,
103103
// that will have support for all ACLs APIs.
104104
if _, ok := maxVersions["DescribeAcls"]; ok {
105105
supportedFeatures.ACLs = true
106106
}
107+
108+
// If we have DescribeUserScramCredentials, than we're running a version of Kafka > 2.7.1,
109+
// that will have support for all User APIs.
110+
if _, ok := maxVersions["DescribeUserScramCredentials"]; ok {
111+
supportedFeatures.Users = true
112+
}
107113
log.Debugf("Supported features: %+v", supportedFeatures)
108114

109115
adminClient := &BrokerAdminClient{
@@ -379,6 +385,72 @@ func (c *BrokerAdminClient) GetTopic(
379385
return topicInfos[0], nil
380386
}
381387

388+
func (c *BrokerAdminClient) GetUsers(
389+
ctx context.Context,
390+
names []string,
391+
) ([]UserInfo, error) {
392+
var users []kafka.UserScramCredentialsUser
393+
for _, name := range names {
394+
users = append(users, kafka.UserScramCredentialsUser{
395+
Name: name,
396+
})
397+
}
398+
399+
req := kafka.DescribeUserScramCredentialsRequest{
400+
Users: users,
401+
}
402+
log.Debugf("DescribeUserScramCredentials request: %+v", req)
403+
404+
resp, err := c.client.DescribeUserScramCredentials(ctx, &req)
405+
log.Debugf("DescribeUserScramCredentials response: %+v (%+v)", resp, err)
406+
if err != nil {
407+
return nil, err
408+
}
409+
410+
if err = util.DescribeUserScramCredentialsResponseResultsError(resp.Results); err != nil {
411+
return nil, err
412+
}
413+
414+
results := []UserInfo{}
415+
416+
for _, result := range resp.Results {
417+
var credentials []CredentialInfo
418+
for _, credential := range result.CredentialInfos {
419+
credentials = append(credentials, CredentialInfo{
420+
ScramMechanism: ScramMechanism(credential.Mechanism),
421+
Iterations: credential.Iterations,
422+
})
423+
}
424+
results = append(results, UserInfo{
425+
Name: result.User,
426+
CredentialInfos: credentials,
427+
})
428+
}
429+
return results, err
430+
}
431+
432+
func (c *BrokerAdminClient) UpsertUser(
433+
ctx context.Context,
434+
user kafka.UserScramCredentialsUpsertion,
435+
) error {
436+
if c.config.ReadOnly {
437+
return errors.New("Cannot create user in read-only mode")
438+
}
439+
req := kafka.AlterUserScramCredentialsRequest{
440+
Upsertions: []kafka.UserScramCredentialsUpsertion{user},
441+
}
442+
log.Debugf("AlterUserScramCredentials request: %+v", req)
443+
resp, err := c.client.AlterUserScramCredentials(ctx, &req)
444+
log.Debugf("AlterUserScramCredentials response: %+v", resp)
445+
if err != nil {
446+
return err
447+
}
448+
if err = resp.Results[0].Error; err != nil {
449+
return err
450+
}
451+
return nil
452+
}
453+
382454
// UpdateTopicConfig updates the configuration for the argument topic. It returns the config
383455
// keys that were updated.
384456
func (c *BrokerAdminClient) UpdateTopicConfig(

pkg/admin/brokerclient_test.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -664,6 +664,94 @@ func TestBrokerClientCreateGetACL(t *testing.T) {
664664
assert.Equal(t, expected, aclsInfo)
665665
}
666666

667+
func TestBrokerClientCreateGetUsers(t *testing.T) {
668+
if !util.CanTestBrokerAdminSecurity() {
669+
t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN_SECURITY is not set")
670+
}
671+
ctx := context.Background()
672+
client, err := NewBrokerAdminClient(
673+
ctx,
674+
BrokerAdminClientConfig{
675+
ConnectorConfig: ConnectorConfig{
676+
BrokerAddr: util.TestKafkaAddr(),
677+
},
678+
},
679+
)
680+
require.NoError(t, err)
681+
682+
name := util.RandomString("test-user-", 6)
683+
mechanism := kafka.ScramMechanismSha512
684+
685+
defer func() {
686+
resp, err := client.client.AlterUserScramCredentials(
687+
ctx,
688+
&kafka.AlterUserScramCredentialsRequest{
689+
Deletions: []kafka.UserScramCredentialsDeletion{
690+
{
691+
Name: name,
692+
Mechanism: mechanism,
693+
},
694+
},
695+
},
696+
)
697+
698+
if err != nil {
699+
t.Fatal(fmt.Errorf("failed to clean up user, err: %v", err))
700+
}
701+
for _, response := range resp.Results {
702+
if err = response.Error; err != nil {
703+
t.Fatal(fmt.Errorf("failed to clean up user, err: %v", err))
704+
}
705+
}
706+
707+
}()
708+
709+
err = client.UpsertUser(ctx, kafka.UserScramCredentialsUpsertion{
710+
Name: name,
711+
Mechanism: mechanism,
712+
Iterations: 15000,
713+
Salt: []byte("my-salt"),
714+
SaltedPassword: []byte("my-salted-password"),
715+
})
716+
717+
require.NoError(t, err)
718+
719+
resp, err := client.GetUsers(ctx, []string{name})
720+
require.NoError(t, err)
721+
assert.Equal(t, []UserInfo{
722+
{
723+
Name: name,
724+
CredentialInfos: []CredentialInfo{
725+
{
726+
ScramMechanism: ScramMechanism(mechanism),
727+
Iterations: 15000,
728+
},
729+
},
730+
},
731+
}, resp)
732+
}
733+
734+
func TestBrokerClientUpsertUserReadOnly(t *testing.T) {
735+
if !util.CanTestBrokerAdminSecurity() {
736+
t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN_SECURITY is not set")
737+
}
738+
739+
ctx := context.Background()
740+
client, err := NewBrokerAdminClient(
741+
ctx,
742+
BrokerAdminClientConfig{
743+
ConnectorConfig: ConnectorConfig{
744+
BrokerAddr: util.TestKafkaAddr(),
745+
},
746+
ReadOnly: true,
747+
},
748+
)
749+
require.NoError(t, err)
750+
err = client.UpsertUser(ctx, kafka.UserScramCredentialsUpsertion{})
751+
752+
assert.Equal(t, errors.New("Cannot create user in read-only mode"), err)
753+
}
754+
667755
func TestBrokerClientCreateACLReadOnly(t *testing.T) {
668756
if !util.CanTestBrokerAdmin() {
669757
t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN is not set")

pkg/admin/client.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,12 @@ type Client interface {
4747
// GetAllTopicsMetadata performs kafka-go metadata call to get topic information
4848
GetAllTopicsMetadata(ctx context.Context) (*kafka.MetadataResponse, error)
4949

50+
// GetUsers gets information about users in the cluster.
51+
GetUsers(
52+
ctx context.Context,
53+
names []string,
54+
) ([]UserInfo, error)
55+
5056
// UpdateTopicConfig updates the configuration for the argument topic. It returns the config
5157
// keys that were updated.
5258
UpdateTopicConfig(
@@ -77,6 +83,12 @@ type Client interface {
7783
acls []kafka.ACLEntry,
7884
) error
7985

86+
// UpsertUser creates or updates an user in zookeeper.
87+
UpsertUser(
88+
ctx context.Context,
89+
user kafka.UserScramCredentialsUpsertion,
90+
) error
91+
8092
// AssignPartitions sets the replica broker IDs for one or more partitions in a topic.
8193
AssignPartitions(
8294
ctx context.Context,

pkg/admin/format.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1000,6 +1000,51 @@ func FormatACLs(acls []ACLInfo) string {
10001000
return string(bytes.TrimRight(buf.Bytes(), "\n"))
10011001
}
10021002

1003+
// FormatUsers creates a pretty table that lists the details of the
1004+
// argument users.
1005+
func FormatUsers(users []UserInfo) string {
1006+
buf := &bytes.Buffer{}
1007+
1008+
headers := []string{
1009+
"Name",
1010+
"Mechanism",
1011+
"Iterations",
1012+
}
1013+
1014+
table := tablewriter.NewWriter(buf)
1015+
table.SetHeader(headers)
1016+
table.SetAutoWrapText(false)
1017+
table.SetColumnAlignment(
1018+
[]int{
1019+
tablewriter.ALIGN_LEFT,
1020+
tablewriter.ALIGN_LEFT,
1021+
},
1022+
)
1023+
table.SetBorders(
1024+
tablewriter.Border{
1025+
Left: false,
1026+
Top: true,
1027+
Right: false,
1028+
Bottom: true,
1029+
},
1030+
)
1031+
1032+
for _, user := range users {
1033+
for _, credential := range user.CredentialInfos {
1034+
row := []string{
1035+
user.Name,
1036+
credential.ScramMechanism.String(),
1037+
fmt.Sprintf("%d", credential.Iterations),
1038+
}
1039+
1040+
table.Append(row)
1041+
}
1042+
}
1043+
1044+
table.Render()
1045+
return string(bytes.TrimRight(buf.Bytes(), "\n"))
1046+
}
1047+
10031048
func prettyConfig(config map[string]string) string {
10041049
rows := []string{}
10051050

pkg/admin/support.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,7 @@ type SupportedFeatures struct {
1919

2020
// ACLs indicates whether the client supports access control levels.
2121
ACLs bool
22+
23+
// Users indicates whether the client supports SASL Users.
24+
Users bool
2225
}

0 commit comments

Comments
 (0)