Skip to content

Commit

Permalink
Merge branch 'IBM:main' into napallday/incremental-cooperative-balanc…
Browse files Browse the repository at this point in the history
…e-strategy
  • Loading branch information
napallday committed Aug 23, 2023
2 parents 5214325 + b1bf950 commit 6b95cec
Show file tree
Hide file tree
Showing 14 changed files with 167 additions and 79 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
GOFLAGS: -tags=functional
uses: golangci/golangci-lint-action@v3
with:
version: v1.54.0
version: v1.54.2
test:
name: Unit Testing with Go ${{ matrix.go-version }}
runs-on: ubuntu-latest
Expand Down
22 changes: 8 additions & 14 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,24 @@ linters-settings:
gocritic:
enabled-tags:
- diagnostic
- performance
# - experimental
# - opinionated
# - performance
# - style
enabled-checks:
- importShadow
- nestingReduce
- stringsCompare
# - unnamedResult
# - whyNoLint
disabled-checks:
- assignOp
- appendAssign
- commentedOutCode
- hugeParam
- ifElseChain
- singleCaseSwitch
- sloppyReassign
- wrapperFunc
funlen:
lines: 300
statements: 300
Expand All @@ -50,37 +56,25 @@ linters:
disable-all: true
enable:
- bodyclose
# - deadcode
- depguard
- exportloopref
- dogsled
# - dupl
- errcheck
- errorlint
- funlen
- gochecknoinits
# - goconst
- gocritic
- gocyclo
- gofmt
- goimports
# - golint
- gosec
# - gosimple
- govet
# - ineffassign
- misspell
# - nakedret
- nilerr
# - paralleltest
# - scopelint
- staticcheck
# - structcheck
# - stylecheck
- typecheck
- unconvert
- unused
# - varcheck
- whitespace

issues:
Expand Down
89 changes: 89 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,94 @@
# Changelog

## Version 1.41.0 (2023-08-21)

## What's Changed
### :rotating_light: Breaking Changes

Note: this version of Sarama has had a big overhaul in its adherence to the use of the right Kafka protocol versions for the given Config Version. It has also bumped the default Version set in Config (where one is not supplied) to 2.1.0. This is in preparation for Kafka 4.0 dropping support for protocol versions older than 2.1. If you are using Sarama against Kafka clusters older than v2.1.0, or using it against Azure EventHubs then you will likely have to change your application code to pin to the appropriate Version.

* chore(config): make DefaultVersion V2_0_0_0 by @dnwe in https://github.com/IBM/sarama/pull/2572
* chore(config): make DefaultVersion V2_1_0_0 by @dnwe in https://github.com/IBM/sarama/pull/2574
### :tada: New Features / Improvements
* Implement resolve_canonical_bootstrap_servers_only by @gebn in https://github.com/IBM/sarama/pull/2156
* feat: sleep when throttled (KIP-219) by @hindessm in https://github.com/IBM/sarama/pull/2536
* feat: add isValidVersion to protocol types by @dnwe in https://github.com/IBM/sarama/pull/2538
* fix(consumer): use newer LeaveGroup as appropriate by @dnwe in https://github.com/IBM/sarama/pull/2544
* Add support for up to version 4 List Groups API by @prestona in https://github.com/IBM/sarama/pull/2541
* fix(producer): use newer ProduceReq as appropriate by @dnwe in https://github.com/IBM/sarama/pull/2546
* fix(proto): ensure req+resp requiredVersion match by @dnwe in https://github.com/IBM/sarama/pull/2548
* chore(proto): permit CreatePartitionsRequest V1 by @dnwe in https://github.com/IBM/sarama/pull/2549
* chore(proto): permit AlterConfigsRequest V1 by @dnwe in https://github.com/IBM/sarama/pull/2550
* chore(proto): permit DeleteGroupsRequest V1 by @dnwe in https://github.com/IBM/sarama/pull/2551
* fix(proto): correct JoinGroup usage for wider version range by @dnwe in https://github.com/IBM/sarama/pull/2553
* fix(consumer): use full range of FetchRequest vers by @dnwe in https://github.com/IBM/sarama/pull/2554
* fix(proto): use range of OffsetCommitRequest vers by @dnwe in https://github.com/IBM/sarama/pull/2555
* fix(proto): use full range of MetadataRequest by @dnwe in https://github.com/IBM/sarama/pull/2556
* fix(proto): use fuller ranges of supported proto by @dnwe in https://github.com/IBM/sarama/pull/2558
* fix(proto): use full range of SyncGroupRequest by @dnwe in https://github.com/IBM/sarama/pull/2565
* fix(proto): use full range of ListGroupsRequest by @dnwe in https://github.com/IBM/sarama/pull/2568
* feat(proto): support for Metadata V6-V10 by @dnwe in https://github.com/IBM/sarama/pull/2566
* fix(proto): use full ranges for remaining proto by @dnwe in https://github.com/IBM/sarama/pull/2570
* feat(proto): add remaining protocol for V2.1 by @dnwe in https://github.com/IBM/sarama/pull/2573
* feat: add new error for MockDeleteTopicsResponse by @javiercri in https://github.com/IBM/sarama/pull/2475
* feat(gzip): switch to klauspost/compress gzip by @dnwe in https://github.com/IBM/sarama/pull/2600
### :bug: Fixes
* fix: correct unsupported version check by @hindessm in https://github.com/IBM/sarama/pull/2528
* fix: avoiding burning cpu if all partitions are paused by @napallday in https://github.com/IBM/sarama/pull/2532
* extend throttling metric scope by @hindessm in https://github.com/IBM/sarama/pull/2533
* Fix printing of final metrics by @prestona in https://github.com/IBM/sarama/pull/2545
* fix(consumer): cannot automatically fetch newly-added partitions unless restart by @napallday in https://github.com/IBM/sarama/pull/2563
* bug: implement unsigned modulus for partitioning with crc32 hashing by @csm8118 in https://github.com/IBM/sarama/pull/2560
* fix: avoid logging value of proxy.Dialer by @prestona in https://github.com/IBM/sarama/pull/2569
* fix(test): add missing closes to admin client tests by @dnwe in https://github.com/IBM/sarama/pull/2594
* fix(test): ensure some more clients are closed by @dnwe in https://github.com/IBM/sarama/pull/2595
* fix(examples): sync exactly_once and consumergroup by @dnwe in https://github.com/IBM/sarama/pull/2614
* fix(fvt): fresh metrics registry for each test by @dnwe in https://github.com/IBM/sarama/pull/2616
* fix(test): flaky test TestFuncOffsetManager by @napallday in https://github.com/IBM/sarama/pull/2609
### :package: Dependency updates
* chore(deps): bump the golang-org-x group with 1 update by @dependabot in https://github.com/IBM/sarama/pull/2542
* chore(deps): bump the golang-org-x group with 1 update by @dependabot in https://github.com/IBM/sarama/pull/2561
* chore(deps): bump module github.com/pierrec/lz4/v4 to v4.1.18 by @dnwe in https://github.com/IBM/sarama/pull/2589
* chore(deps): bump module github.com/jcmturner/gokrb5/v8 to v8.4.4 by @dnwe in https://github.com/IBM/sarama/pull/2587
* chore(deps): bump github.com/eapache/go-xerial-snappy digest to c322873 by @dnwe in https://github.com/IBM/sarama/pull/2586
* chore(deps): bump module github.com/klauspost/compress to v1.16.7 by @dnwe in https://github.com/IBM/sarama/pull/2588
* chore(deps): bump github.com/eapache/go-resiliency from 1.3.0 to 1.4.0 by @dependabot in https://github.com/IBM/sarama/pull/2598
### :wrench: Maintenance
* fix(fvt): ensure fully-replicated at test start by @hindessm in https://github.com/IBM/sarama/pull/2531
* chore: rollup fvt kafka to latest three by @dnwe in https://github.com/IBM/sarama/pull/2537
* Merge the two CONTRIBUTING.md's by @prestona in https://github.com/IBM/sarama/pull/2543
* fix(test): test timing error by @hindessm in https://github.com/IBM/sarama/pull/2552
* chore(ci): tidyup and improve actions workflows by @dnwe in https://github.com/IBM/sarama/pull/2557
* fix(test): shutdown MockBroker by @dnwe in https://github.com/IBM/sarama/pull/2571
* chore(proto): match HeartbeatResponse version by @dnwe in https://github.com/IBM/sarama/pull/2576
* chore(test): ensure MockBroker closed within test by @dnwe in https://github.com/IBM/sarama/pull/2575
* chore(test): ensure all mockresponses use version by @dnwe in https://github.com/IBM/sarama/pull/2578
* chore(ci): use latest Go in actions by @dnwe in https://github.com/IBM/sarama/pull/2580
* chore(test): speedup some slow tests by @dnwe in https://github.com/IBM/sarama/pull/2579
* chore(test): use modern protocol versions in FVT by @dnwe in https://github.com/IBM/sarama/pull/2581
* chore(test): fix a couple of leaks by @dnwe in https://github.com/IBM/sarama/pull/2591
* feat(fvt): experiment with per-kafka-version image by @dnwe in https://github.com/IBM/sarama/pull/2592
* chore(ci): replace toxiproxy client dep by @dnwe in https://github.com/IBM/sarama/pull/2593
* feat(fvt): add healthcheck, depends_on and --wait by @dnwe in https://github.com/IBM/sarama/pull/2601
* fix(fvt): handle msgset vs batchset by @dnwe in https://github.com/IBM/sarama/pull/2603
* fix(fvt): Metadata version in ensureFullyReplicated by @dnwe in https://github.com/IBM/sarama/pull/2612
* fix(fvt): versioned cfg for invalid topic producer by @dnwe in https://github.com/IBM/sarama/pull/2613
* chore(fvt): tweak to work across more versions by @dnwe in https://github.com/IBM/sarama/pull/2615
* feat(fvt): test wider range of kafkas by @dnwe in https://github.com/IBM/sarama/pull/2605
### :memo: Documentation
* fix(example): check if msg channel is closed by @ioanzicu in https://github.com/IBM/sarama/pull/2479
* chore: use go install for installing sarama tools by @vigith in https://github.com/IBM/sarama/pull/2599

## New Contributors
* @gebn made their first contribution in https://github.com/IBM/sarama/pull/2156
* @prestona made their first contribution in https://github.com/IBM/sarama/pull/2543
* @ioanzicu made their first contribution in https://github.com/IBM/sarama/pull/2479
* @csm8118 made their first contribution in https://github.com/IBM/sarama/pull/2560
* @javiercri made their first contribution in https://github.com/IBM/sarama/pull/2475
* @vigith made their first contribution in https://github.com/IBM/sarama/pull/2599

**Full Changelog**: https://github.com/IBM/sarama/compare/v1.40.1...v1.41.0

## Version 1.40.1 (2023-07-27)

## What's Changed
Expand Down
1 change: 1 addition & 0 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
topicDetails.ConfigEntries = make(map[string]*string)

for _, entry := range resource.Configs {
entry := entry
// only include non-default non-sensitive config
// (don't actually think topic config will ever be sensitive)
if entry.Default || entry.Sensitive {
Expand Down
9 changes: 4 additions & 5 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
// leaks and message lost: it will not be garbage-collected automatically when it passes
// out of scope and buffered messages may not be flushed.
type AsyncProducer interface {

// AsyncClose triggers a shutdown of the producer. The shutdown has completed
// when both the Errors and Successes channels have been closed. When calling
// AsyncClose, you *must* continue to read from those channels in order to
Expand Down Expand Up @@ -366,17 +365,17 @@ func (p *asyncProducer) Close() error {
})
}

var errors ProducerErrors
var pErrs ProducerErrors
if p.conf.Producer.Return.Errors {
for event := range p.errors {
errors = append(errors, event)
pErrs = append(pErrs, event)
}
} else {
<-p.errors
}

if len(errors) > 0 {
return errors
if len(pErrs) > 0 {
return pErrs
}
return nil
}
Expand Down
20 changes: 10 additions & 10 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ func closeProducer(t *testing.T, p AsyncProducer) {
closeProducerWithTimeout(t, p, 5*time.Minute)
}

func expectResultsWithTimeout(t *testing.T, p AsyncProducer, successes, errors int, timeout time.Duration) {
func expectResultsWithTimeout(t *testing.T, p AsyncProducer, successCount, errorCount int, timeout time.Duration) {
t.Helper()
expect := successes + errors
expect := successCount + errorCount
defer func() {
if successes != 0 || errors != 0 {
t.Error("Unexpected successes", successes, "or errors", errors)
if successCount != 0 || errorCount != 0 {
t.Error("Unexpected successes", successCount, "or errors", errorCount)
}
}()
timer := time.NewTimer(timeout)
Expand All @@ -84,26 +84,26 @@ func expectResultsWithTimeout(t *testing.T, p AsyncProducer, successes, errors i
if msg.Msg.flags != 0 {
t.Error("Message had flags set")
}
errors--
errorCount--
expect--
if errors < 0 {
if errorCount < 0 {
t.Error(msg.Err)
}
case msg := <-p.Successes():
if msg.flags != 0 {
t.Error("Message had flags set")
}
successes--
successCount--
expect--
if successes < 0 {
if successCount < 0 {
t.Error("Too many successes")
}
}
}
}

func expectResults(t *testing.T, p AsyncProducer, successes, errors int) {
expectResultsWithTimeout(t, p, successes, errors, 5*time.Minute)
func expectResults(t *testing.T, p AsyncProducer, successCount, errorCount int) {
expectResultsWithTimeout(t, p, successCount, errorCount, 5*time.Minute)
}

type testPartitioner chan *int32
Expand Down
27 changes: 14 additions & 13 deletions balance_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -1179,20 +1179,21 @@ func (p *partitionMovements) isLinked(src, dst string, pairs []consumerPair, cur
}

for _, pair := range pairs {
if pair.SrcMemberID == src {
// create a deep copy of the pairs, excluding the current pair
reducedSet := make([]consumerPair, len(pairs)-1)
i := 0
for _, p := range pairs {
if p != pair {
reducedSet[i] = pair
i++
}
if pair.SrcMemberID != src {
continue
}
// create a deep copy of the pairs, excluding the current pair
reducedSet := make([]consumerPair, len(pairs)-1)
i := 0
for _, p := range pairs {
if p != pair {
reducedSet[i] = pair
i++
}

currentPath = append(currentPath, pair.SrcMemberID)
return p.isLinked(pair.DstMemberID, dst, reducedSet, currentPath)
}

currentPath = append(currentPath, pair.SrcMemberID)
return p.isLinked(pair.DstMemberID, dst, reducedSet, currentPath)
}
return currentPath, false
}
Expand Down Expand Up @@ -1292,7 +1293,7 @@ func (pq assignmentPriorityQueue) Len() int { return len(pq) }
func (pq assignmentPriorityQueue) Less(i, j int) bool {
// order assignment priority queue in descending order using assignment-count/member-id
if len(pq[i].assignments) == len(pq[j].assignments) {
return strings.Compare(pq[i].id, pq[j].id) > 0
return pq[i].id > pq[j].id
}
return len(pq[i].assignments) > len(pq[j].assignments)
}
Expand Down
2 changes: 1 addition & 1 deletion broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1480,7 +1480,7 @@ func (b *Broker) sendAndReceiveSASLSCRAMv0() error {
length := len(msg)
authBytes := make([]byte, length+4) // 4 byte length header + auth data
binary.BigEndian.PutUint32(authBytes, uint32(length))
copy(authBytes[4:], []byte(msg))
copy(authBytes[4:], msg)
_, err := b.write(authBytes)
b.updateOutgoingCommunicationMetrics(length + 4)
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,17 +553,17 @@ func (client *client) RefreshMetadata(topics ...string) error {
return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max, deadline)
}

func (client *client) GetOffset(topic string, partitionID int32, time int64) (int64, error) {
func (client *client) GetOffset(topic string, partitionID int32, timestamp int64) (int64, error) {
if client.Closed() {
return -1, ErrClosedClient
}

offset, err := client.getOffset(topic, partitionID, time)
offset, err := client.getOffset(topic, partitionID, timestamp)
if err != nil {
if err := client.RefreshMetadata(topic); err != nil {
return -1, err
}
return client.getOffset(topic, partitionID, time)
return client.getOffset(topic, partitionID, timestamp)
}

return offset, err
Expand Down Expand Up @@ -905,7 +905,7 @@ func (client *client) cachedLeader(topic string, partitionID int32) (*Broker, in
return nil, -1, ErrUnknownTopicOrPartition
}

func (client *client) getOffset(topic string, partitionID int32, time int64) (int64, error) {
func (client *client) getOffset(topic string, partitionID int32, timestamp int64) (int64, error) {
broker, err := client.Leader(topic, partitionID)
if err != nil {
return -1, err
Expand All @@ -927,7 +927,7 @@ func (client *client) getOffset(topic string, partitionID int32, time int64) (in
request.Version = 1
}

request.AddBlock(topic, partitionID, time, 1)
request.AddBlock(topic, partitionID, timestamp, 1)

response, err := broker.GetAvailableOffsets(request)
if err != nil {
Expand Down
Loading

0 comments on commit 6b95cec

Please sign in to comment.