Skip to content
This repository has been archived by the owner on Jun 11, 2021. It is now read-only.

Commit

Permalink
Backport pr95 (#102)
Browse files Browse the repository at this point in the history
  • Loading branch information
travis-minke-sap authored Mar 31, 2020
1 parent 1585b5c commit c2a58a0
Show file tree
Hide file tree
Showing 25 changed files with 126 additions and 58 deletions.
6 changes: 3 additions & 3 deletions components/channel/Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion components/channel/Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

[[constraint]]
name = "github.com/kyma-incubator/knative-kafka"
branch = "master"
branch = "release-0.12"

# Match Azure version in common component
[[override]]
Expand Down
2 changes: 1 addition & 1 deletion components/channel/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,4 @@ ci-release: docker-push

ci-pr: docker-build

.PHONY: ci-release ci-pr
.PHONY: ci-master ci-release ci-pr

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion components/common/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,4 @@ ci-release: docker-build

ci-pr: docker-build

.PHONY: ci-release ci-pr
.PHONY: ci-master ci-release ci-pr
6 changes: 3 additions & 3 deletions components/controller/Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion components/controller/Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -140,4 +140,4 @@ required = [

[[constraint]]
name = "github.com/kyma-incubator/knative-kafka"
branch = "master"
branch = "release-0.12"
2 changes: 1 addition & 1 deletion components/controller/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,4 @@ ci-release: docker-push

ci-pr: docker-build

.PHONY: ci-release ci-pr
.PHONY: ci-master ci-release ci-pr

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions components/dispatcher/Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion components/dispatcher/Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

[[constraint]]
name = "github.com/kyma-incubator/knative-kafka"
branch = "master"
branch = "release-0.12"

# Match Azure version in common component
[[override]]
Expand Down
2 changes: 1 addition & 1 deletion components/dispatcher/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,4 @@ ci-release: docker-push

ci-pr: docker-build

.PHONY: ci-release ci-pr
.PHONY: ci-master ci-release ci-pr
9 changes: 6 additions & 3 deletions components/dispatcher/internal/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (d *Dispatcher) handleKafkaMessages(consumerOffset ConsumerOffset, subscrip
_ = d.Client.Dispatch(*cloudEvent, subscription.URI) // Ignore Errors - Dispatcher Will Retry And We're Moving On!

// Update Stored Offsets Based On The Processed Message
d.updateOffsets(consumerOffset.consumer, e)
d.updateOffsets(logger, consumerOffset.consumer, e)
currentTimeDuration := time.Now().Sub(consumerOffset.lastOffsetCommit)

// If "OffsetCommitCount" Number Of Messages Have Been Processed Since Last Offset Commit, Then Do One Now
Expand Down Expand Up @@ -186,11 +186,14 @@ func (d *Dispatcher) handleKafkaMessages(consumerOffset ConsumerOffset, subscrip
}

// Store Updated Offsets For The Partition If Consumer Still Has It Assigned
func (d *Dispatcher) updateOffsets(consumer kafkaconsumer.ConsumerInterface, message *kafka.Message) {
func (d *Dispatcher) updateOffsets(logger *zap.Logger, consumer kafkaconsumer.ConsumerInterface, message *kafka.Message) {
// Store The Updated Offsets
offsets := []kafka.TopicPartition{message.TopicPartition}
offsets[0].Offset++
consumer.StoreOffsets(offsets)
topicPartitions, err := consumer.StoreOffsets(offsets)
if err != nil {
logger.Error("Kafka Consumer Failed To Store Offsets", zap.Any("TopicPartitions", topicPartitions), zap.Error(err))
}
}

// Commit The Stored Offsets For Partitions Still Assigned To This Consumer
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions components/dispatcher/vendor/github.com/cloudevents/sdk-go/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit c2a58a0

Please sign in to comment.