Skip to content

Commit

Permalink
Add CreateAcls Admin API support (#839)
Browse files Browse the repository at this point in the history
This adds a CreateACLs method to the kafka Client
to supports the CreateAcls Admin API.

Signed-off-by: Guillaume Fillon <[email protected]>
  • Loading branch information
zirkome committed Feb 25, 2022
1 parent 8f2199a commit b952e63
Show file tree
Hide file tree
Showing 7 changed files with 273 additions and 17 deletions.
35 changes: 28 additions & 7 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,10 @@ jobs:
ports:
- 9092:9092
- 9093:9093
environment: *environment
environment:
<<: *environment
KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer'
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
steps: *steps

kafka-211:
Expand All @@ -145,7 +148,10 @@ jobs:
ports:
- 9092:9092
- 9093:9093
environment: *environment
environment:
<<: *environment
KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer'
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
steps: *steps

kafka-222:
Expand All @@ -161,7 +167,10 @@ jobs:
ports:
- 9092:9092
- 9093:9093
environment: *environment
environment:
<<: *environment
KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer'
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
steps: *steps

kafka-231:
Expand All @@ -177,7 +186,10 @@ jobs:
ports:
- 9092:9092
- 9093:9093
environment: *environment
environment:
<<: *environment
KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer'
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
steps: *steps

kafka-241:
Expand All @@ -204,7 +216,10 @@ jobs:
ports:
- 9092:9092
- 9093:9093
environment: *environment
environment:
<<: *environment
KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer'
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
steps: *steps

kafka-260:
Expand All @@ -231,7 +246,10 @@ jobs:
ports:
- 9092:9092
- 9093:9093
environment: *environment
environment:
<<: *environment
KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer'
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
steps: *steps

kafka-271:
Expand All @@ -258,7 +276,10 @@ jobs:
ports:
- 9092:9092
- 9093:9093
environment: *environment
environment:
<<: *environment
KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer'
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
steps: *steps

workflows:
Expand Down
49 changes: 49 additions & 0 deletions createacl_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package kafka

import (
"context"
"testing"

ktesting "github.com/segmentio/kafka-go/testing"
)

func TestClientCreateACLs(t *testing.T) {
if !ktesting.KafkaIsAtLeast("2.0.1") {
return
}

client, shutdown := newLocalClient()
defer shutdown()

res, err := client.CreateACLs(context.Background(), &CreateACLsRequest{
ACLs: []ACLEntry{
{
Principal: "User:alice",
PermissionType: ACLPermissionTypeAllow,
Operation: ACLOperationTypeRead,
ResourceType: ResourceTypeTopic,
ResourcePatternType: PatternTypeLiteral,
ResourceName: "fake-topic-for-alice",
Host: "*",
},
{
Principal: "User:bob",
PermissionType: ACLPermissionTypeAllow,
Operation: ACLOperationTypeRead,
ResourceType: ResourceTypeGroup,
ResourcePatternType: PatternTypeLiteral,
ResourceName: "fake-group-for-bob",
Host: "*",
},
},
})
if err != nil {
t.Fatal(err)
}

for _, err := range res.Errors {
if err != nil {
t.Error(err)
}
}
}
108 changes: 108 additions & 0 deletions createacls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package kafka

import (
"context"
"fmt"
"net"
"time"

"github.com/segmentio/kafka-go/protocol/createacls"
)

// CreateACLsRequest represents a request sent to a kafka broker to add
// new ACLs.
type CreateACLsRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr

// List of ACL to create.
ACLs []ACLEntry
}

// CreateACLsResponse represents a response from a kafka broker to an ACL
// creation request.
type CreateACLsResponse struct {
// The amount of time that the broker throttled the request.
Throttle time.Duration

// List of errors that occurred while attempting to create
// the ACLs.
//
// The errors contain the kafka error code. Programs may use the standard
// errors.Is function to test the error against kafka error codes.
Errors []error
}

type ACLPermissionType int8

const (
ACLPermissionTypeUnknown ACLPermissionType = 0
ACLPermissionTypeAny ACLPermissionType = 1
ACLPermissionTypeDeny ACLPermissionType = 2
ACLPermissionTypeAllow ACLPermissionType = 3
)

type ACLOperationType int8

const (
ACLOperationTypeUnknown ACLOperationType = 0
ACLOperationTypeAny ACLOperationType = 1
ACLOperationTypeAll ACLOperationType = 2
ACLOperationTypeRead ACLOperationType = 3
ACLOperationTypeWrite ACLOperationType = 4
ACLOperationTypeCreate ACLOperationType = 5
ACLOperationTypeDelete ACLOperationType = 6
ACLOperationTypeAlter ACLOperationType = 7
ACLOperationTypeDescribe ACLOperationType = 8
ACLOperationTypeClusterAction ACLOperationType = 9
ACLOperationTypeDescribeConfigs ACLOperationType = 10
ACLOperationTypeAlterConfigs ACLOperationType = 11
ACLOperationTypeIdempotentWrite ACLOperationType = 12
)

type ACLEntry struct {
ResourceType ResourceType
ResourceName string
ResourcePatternType PatternType
Principal string
Host string
Operation ACLOperationType
PermissionType ACLPermissionType
}

// CreateACLs sends ACLs creation request to a kafka broker and returns the
// response.
func (c *Client) CreateACLs(ctx context.Context, req *CreateACLsRequest) (*CreateACLsResponse, error) {
acls := make([]createacls.RequestACLs, 0, len(req.ACLs))

for _, acl := range req.ACLs {
acls = append(acls, createacls.RequestACLs{
ResourceType: int8(acl.ResourceType),
ResourceName: acl.ResourceName,
ResourcePatternType: int8(acl.ResourcePatternType),
Principal: acl.Principal,
Host: acl.Host,
Operation: int8(acl.Operation),
PermissionType: int8(acl.PermissionType),
})
}

m, err := c.roundTrip(ctx, req.Addr, &createacls.Request{
Creations: acls,
})
if err != nil {
return nil, fmt.Errorf("kafka.(*Client).CreateACLs: %w", err)
}

res := m.(*createacls.Response)
ret := &CreateACLsResponse{
Throttle: makeDuration(res.ThrottleTimeMs),
Errors: make([]error, 0, len(res.Results)),
}

for _, t := range res.Results {
ret.Errors = append(ret.Errors, makeError(t.ErrorCode, t.ErrorMessage))
}

return ret, nil
}
10 changes: 0 additions & 10 deletions describeconfigs.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,6 @@ type DescribeConfigsRequest struct {
IncludeDocumentation bool
}

type ResourceType int8

const (
// See https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java#L36
ResourceTypeUnknown ResourceType = 0
ResourceTypeTopic ResourceType = 2
ResourceTypeBroker ResourceType = 4
)

type DescribeConfigRequestResource struct {
// Resource Type
ResourceType ResourceType
Expand Down Expand Up @@ -122,7 +113,6 @@ func (c *Client) DescribeConfigs(ctx context.Context, req *DescribeConfigsReques
IncludeSynonyms: req.IncludeSynonyms,
IncludeDocumentation: req.IncludeDocumentation,
})

if err != nil {
return nil, fmt.Errorf("kafka.(*Client).DescribeConfigs: %w", err)
}
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ services:
KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer'
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
CUSTOM_INIT_SCRIPT: |-
echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
Expand Down
49 changes: 49 additions & 0 deletions protocol/createacls/createacls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package createacls

import "github.com/segmentio/kafka-go/protocol"

func init() {
protocol.Register(&Request{}, &Response{})
}

type Request struct {
// We need at least one tagged field to indicate that v2+ uses "flexible"
// messages.
_ struct{} `kafka:"min=v2,max=v2,tag"`

Creations []RequestACLs `kafka:"min=v0,max=v2"`
}

func (r *Request) ApiKey() protocol.ApiKey { return protocol.CreateAcls }

func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) {
return cluster.Brokers[cluster.Controller], nil
}

type RequestACLs struct {
ResourceType int8 `kafka:"min=v0,max=v2"`
ResourceName string `kafka:"min=v0,max=v2"`
ResourcePatternType int8 `kafka:"min=v0,max=v2"`
Principal string `kafka:"min=v0,max=v2"`
Host string `kafka:"min=v0,max=v2"`
Operation int8 `kafka:"min=v0,max=v2"`
PermissionType int8 `kafka:"min=v0,max=v2"`
}

type Response struct {
// We need at least one tagged field to indicate that v2+ uses "flexible"
// messages.
_ struct{} `kafka:"min=v2,max=v2,tag"`

ThrottleTimeMs int32 `kafka:"min=v0,max=v2"`
Results []ResponseACLs `kafka:"min=v0,max=v2"`
}

func (r *Response) ApiKey() protocol.ApiKey { return protocol.CreateAcls }

type ResponseACLs struct {
ErrorCode int16 `kafka:"min=v0,max=v2"`
ErrorMessage string `kafka:"min=v0,max=v2,nullable"`
}

var _ protocol.BrokerMessage = (*Request)(nil)
37 changes: 37 additions & 0 deletions resource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package kafka

// https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
type ResourceType int8

const (
ResourceTypeUnknown ResourceType = 0
ResourceTypeAny ResourceType = 1
ResourceTypeTopic ResourceType = 2
ResourceTypeGroup ResourceType = 3
// See https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java#L36
ResourceTypeBroker ResourceType = 4
ResourceTypeCluster ResourceType = 4
ResourceTypeTransactionalID ResourceType = 5
ResourceTypeDelegationToken ResourceType = 6
)

// https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/PatternType.java
type PatternType int8

const (
// PatternTypeUnknown represents any PatternType which this client cannot
// understand.
PatternTypeUnknown PatternType = 0
// PatternTypeAny matches any resource pattern type.
PatternTypeAny PatternType = 1
// PatternTypeMatch perform pattern matching.
PatternTypeMatch PatternType = 2
// PatternTypeLiteral represents a literal name.
// A literal name defines the full name of a resource, e.g. topic with name
// 'foo', or group with name 'bob'.
PatternTypeLiteral PatternType = 3
// PatternTypePrefixed represents a prefixed name.
// A prefixed name defines a prefix for a resource, e.g. topics with names
// that start with 'foo'.
PatternTypePrefixed PatternType = 4
)

0 comments on commit b952e63

Please sign in to comment.