Skip to content

Commit 3558156

Browse files
authored
Merge pull request #11 from Trendyol/feature/#7
Implement batch consuming
2 parents b442a01 + 364bdb6 commit 3558156

File tree

24 files changed

+100587
-1325
lines changed

24 files changed

+100587
-1325
lines changed

.github/images/grafana.jpg

-121 KB
Binary file not shown.

.github/images/grafana.png

641 KB
Loading

README.md

Lines changed: 126 additions & 84 deletions
Large diffs are not rendered by default.

batch_consumer.go

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
8+
"github.com/Trendyol/kafka-konsumer/instrumentation"
9+
"github.com/segmentio/kafka-go"
10+
)
11+
12+
// batchConsumer is the batching implementation of Consumer: it embeds the
// shared base (reader, worker bookkeeping, retry/API subprocesses) and invokes
// consumeFn with groups of messages instead of one message at a time.
type batchConsumer struct {
	*base

	// consumeFn is the user-supplied callback invoked with each batch.
	consumeFn func([]Message) error

	// messageGroupLimit caps how many messages a batch may hold;
	// messageGroupDuration bounds how long a partial batch waits before
	// being flushed to consumeFn.
	messageGroupLimit    int
	messageGroupDuration time.Duration
}
20+
21+
func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) {
22+
consumerBase, err := newBase(cfg)
23+
if err != nil {
24+
return nil, err
25+
}
26+
27+
c := batchConsumer{
28+
base: consumerBase,
29+
consumeFn: cfg.BatchConfiguration.BatchConsumeFn,
30+
messageGroupLimit: cfg.BatchConfiguration.MessageGroupLimit,
31+
messageGroupDuration: cfg.BatchConfiguration.MessageGroupDuration,
32+
}
33+
34+
if cfg.RetryEnabled {
35+
c.base.setupCronsumer(cfg, func(message kcronsumer.Message) error {
36+
return c.consumeFn([]Message{toMessage(message)})
37+
})
38+
}
39+
40+
if cfg.APIEnabled {
41+
c.base.setupAPI(cfg)
42+
}
43+
44+
return &c, nil
45+
}
46+
47+
func (b *batchConsumer) Consume() {
48+
go b.base.subprocesses.Start()
49+
go b.startConsume()
50+
51+
for i := 0; i < b.concurrency; i++ {
52+
b.wg.Add(1)
53+
go b.startBatch()
54+
}
55+
}
56+
57+
func (b *batchConsumer) startBatch() {
58+
defer b.wg.Done()
59+
60+
ticker := time.NewTicker(b.messageGroupDuration)
61+
messages := make([]Message, 0, b.messageGroupLimit)
62+
63+
for {
64+
select {
65+
case <-ticker.C:
66+
if len(messages) == 0 {
67+
continue
68+
}
69+
70+
b.process(messages)
71+
messages = messages[:0]
72+
case msg, ok := <-b.messageCh:
73+
if !ok {
74+
return
75+
}
76+
77+
messages = append(messages, msg)
78+
79+
if len(messages) == b.messageGroupLimit {
80+
b.process(messages)
81+
messages = messages[:0]
82+
}
83+
}
84+
}
85+
}
86+
87+
func (b *batchConsumer) process(messages []Message) {
88+
consumeErr := b.consumeFn(messages)
89+
if consumeErr != nil && b.retryEnabled {
90+
b.logger.Warnf("Consume Function Err %s, Messages will be retried", consumeErr.Error())
91+
92+
// Try to process same message again
93+
if consumeErr = b.consumeFn(messages); consumeErr != nil {
94+
b.logger.Warnf("Consume Function Again Err %s, messages are sending to exception/retry topic %s", consumeErr.Error(), b.retryTopic)
95+
96+
cronsumerMessages := make([]kcronsumer.Message, 0, len(messages))
97+
for i := range messages {
98+
cronsumerMessages = append(cronsumerMessages, messages[i].toRetryableMessage(b.retryTopic))
99+
}
100+
101+
if produceErr := b.base.cronsumer.ProduceBatch(cronsumerMessages); produceErr != nil {
102+
b.logger.Errorf("Error producing messages to exception/retry topic %s", produceErr.Error())
103+
}
104+
}
105+
}
106+
107+
segmentioMessages := make([]kafka.Message, 0, len(messages))
108+
for i := range messages {
109+
segmentioMessages = append(segmentioMessages, kafka.Message(messages[i]))
110+
}
111+
112+
commitErr := b.r.CommitMessages(context.Background(), segmentioMessages...)
113+
if commitErr != nil {
114+
instrumentation.TotalUnprocessedBatchMessagesCounter.Inc()
115+
b.logger.Error("Error Committing messages %s", commitErr.Error())
116+
return
117+
}
118+
119+
instrumentation.TotalProcessedBatchMessagesCounter.Inc()
120+
}

consumer.go

Lines changed: 32 additions & 165 deletions
Original file line numberDiff line numberDiff line change
@@ -2,215 +2,82 @@ package kafka
22

33
import (
44
"context"
5-
"sync"
65

7-
cronsumer "github.com/Trendyol/kafka-cronsumer"
86
kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
97
"github.com/Trendyol/kafka-konsumer/instrumentation"
108
"github.com/segmentio/kafka-go"
119
)
1210

13-
type Consumer interface {
14-
Consume()
15-
WithLogger(logger LoggerInterface)
16-
Stop() error
17-
}
18-
19-
type subprocess interface {
20-
Start()
21-
Stop()
22-
}
23-
2411
// consumer is the single-message implementation of Consumer: it embeds the
// shared base (reader, worker bookkeeping, retry/API subprocesses) and invokes
// consumeFn once per received message.
type consumer struct {
	*base

	// consumeFn is the user-supplied callback invoked for every message.
	consumeFn func(Message) error
}
4516

46-
var _ Consumer = (*consumer)(nil)
47-
48-
func NewConsumer(cfg *ConsumerConfig) (Consumer, error) {
49-
logger := NewZapLogger(cfg.LogLevel)
50-
reader, err := cfg.newKafkaReader()
17+
func newSingleConsumer(cfg *ConsumerConfig) (Consumer, error) {
18+
consumerBase, err := newBase(cfg)
5119
if err != nil {
52-
logger.Errorf("Error when initializing kafka reader %v", err)
5320
return nil, err
5421
}
5522

5623
c := consumer{
57-
messageCh: make(chan Message, cfg.Concurrency),
58-
quit: make(chan struct{}),
59-
concurrency: cfg.Concurrency,
60-
consumeFn: cfg.ConsumeFn,
61-
retryEnabled: cfg.RetryEnabled,
62-
logger: logger,
63-
subprocesses: []subprocess{},
64-
r: reader,
24+
base: consumerBase,
25+
consumeFn: cfg.ConsumeFn,
6526
}
6627

6728
if cfg.RetryEnabled {
68-
c.logger.Debug("Konsumer retry enabled mode active!")
69-
kcronsumerCfg := kcronsumer.Config{
70-
Brokers: cfg.RetryConfiguration.Brokers,
71-
Consumer: kcronsumer.ConsumerConfig{
72-
GroupID: cfg.Reader.GroupID,
73-
Topic: cfg.RetryConfiguration.Topic,
74-
Cron: cfg.RetryConfiguration.StartTimeCron,
75-
Duration: cfg.RetryConfiguration.WorkDuration,
76-
Concurrency: cfg.Concurrency,
77-
MinBytes: cfg.Reader.MinBytes,
78-
MaxBytes: cfg.Reader.MaxBytes,
79-
MaxRetry: cfg.RetryConfiguration.MaxRetry,
80-
MaxWait: cfg.Reader.MaxWait,
81-
CommitInterval: cfg.Reader.CommitInterval,
82-
HeartbeatInterval: cfg.Reader.HeartbeatInterval,
83-
SessionTimeout: cfg.Reader.SessionTimeout,
84-
RebalanceTimeout: cfg.Reader.RebalanceTimeout,
85-
StartOffset: kcronsumer.ToStringOffset(cfg.Reader.StartOffset),
86-
RetentionTime: cfg.Reader.RetentionTime,
87-
},
88-
LogLevel: "info",
89-
}
90-
91-
if !cfg.RetryConfiguration.SASL.IsEmpty() {
92-
c.logger.Debug("Setting cronsumer SASL configurations...")
93-
kcronsumerCfg.SASL.Enabled = true
94-
kcronsumerCfg.SASL.AuthType = string(cfg.RetryConfiguration.SASL.Type)
95-
kcronsumerCfg.SASL.Username = cfg.RetryConfiguration.SASL.Username
96-
kcronsumerCfg.SASL.Password = cfg.RetryConfiguration.SASL.Password
97-
kcronsumerCfg.SASL.Rack = cfg.RetryConfiguration.Rack
98-
}
99-
100-
if !cfg.RetryConfiguration.TLS.IsEmpty() {
101-
c.logger.Debug("Setting cronsumer TLS configurations...")
102-
kcronsumerCfg.SASL.RootCAPath = cfg.RetryConfiguration.TLS.RootCAPath
103-
kcronsumerCfg.SASL.IntermediateCAPath = cfg.RetryConfiguration.TLS.IntermediateCAPath
104-
}
105-
106-
c.retryTopic = cfg.RetryConfiguration.Topic
107-
c.retryFn = func(message kcronsumer.Message) error {
108-
consumeErr := c.consumeFn(ToMessage(message))
109-
instrumentation.TotalProcessedRetryableMessagesCounter.Inc()
110-
return consumeErr
111-
}
112-
113-
c.cronsumer = cronsumer.New(&kcronsumerCfg, c.retryFn)
114-
c.subprocesses = append(c.subprocesses, c.cronsumer)
29+
c.base.setupCronsumer(cfg, func(message kcronsumer.Message) error {
30+
return c.consumeFn(toMessage(message))
31+
})
11532
}
11633

11734
if cfg.APIEnabled {
118-
c.logger.Debug("Metrics API Enabled!")
119-
c.api = NewAPI(cfg)
120-
c.subprocesses = append(c.subprocesses, c.api)
35+
c.base.setupAPI(cfg)
12136
}
12237

12338
return &c, nil
12439
}
12540

12641
func (c *consumer) Consume() {
127-
go c.startSubprocesses()
128-
129-
ctx, cancel := context.WithCancel(context.Background())
130-
c.cancelFn = cancel
131-
132-
go c.consume(ctx)
42+
go c.subprocesses.Start()
43+
go c.base.startConsume()
13344

13445
for i := 0; i < c.concurrency; i++ {
13546
c.wg.Add(1)
13647
go func() {
13748
defer c.wg.Done()
13849

13950
for message := range c.messageCh {
140-
err := c.consumeFn(message)
141-
instrumentation.TotalProcessedMessagesCounter.Inc()
142-
143-
if err != nil && c.retryEnabled {
144-
c.logger.Warnf("Consume Function Err %s, Message is sending to exception/retry topic %s", err.Error(), c.retryTopic)
145-
146-
retryableMsg := message.ToRetryableMessage(c.retryTopic)
147-
if err = c.retryFn(retryableMsg); err != nil {
148-
if err = c.cronsumer.Produce(retryableMsg); err != nil {
149-
c.logger.Errorf("Error producing message %s to exception/retry topic %v",
150-
string(retryableMsg.Value), err.Error())
151-
}
152-
}
153-
}
154-
155-
if err = c.r.CommitMessages(context.Background(), kafka.Message(message)); err != nil {
156-
c.logger.Errorf("Error Committing message %s", string(message.Value))
157-
}
51+
c.process(message)
15852
}
15953
}()
16054
}
16155
}
16256

163-
func (c *consumer) Stop() error {
164-
c.logger.Debug("Consuming is closing!")
165-
var err error
166-
c.once.Do(func() {
167-
c.stopSubprocesses()
168-
c.cancelFn()
169-
c.quit <- struct{}{}
170-
close(c.messageCh)
171-
c.wg.Wait()
172-
err = c.r.Close()
173-
})
174-
175-
return err
176-
}
57+
func (c *consumer) process(message Message) {
58+
consumeErr := c.consumeFn(message)
17759

178-
func (c *consumer) WithLogger(logger LoggerInterface) {
179-
c.logger = logger
180-
}
181-
182-
func (c *consumer) startSubprocesses() {
183-
for i := range c.subprocesses {
184-
c.subprocesses[i].Start()
185-
}
186-
}
60+
if consumeErr != nil && c.retryEnabled {
61+
c.logger.Warnf("Consume Function Err %s, Message will be retried", consumeErr.Error())
18762

188-
func (c *consumer) stopSubprocesses() {
189-
for i := range c.subprocesses {
190-
c.subprocesses[i].Stop()
191-
}
192-
}
63+
// Try to process same message again
64+
if consumeErr = c.consumeFn(message); consumeErr != nil {
65+
c.logger.Warnf("Consume Function Again Err %s, message is sending to exception/retry topic %s", consumeErr.Error(), c.retryTopic)
19366

194-
func (c *consumer) consume(ctx context.Context) {
195-
c.logger.Debug("Consuming is starting")
196-
c.wg.Add(1)
197-
defer c.wg.Done()
198-
199-
for {
200-
select {
201-
case <-c.quit:
202-
return
203-
default:
204-
message, err := c.r.FetchMessage(ctx)
205-
if err != nil {
206-
if ctx.Err() != nil {
207-
continue
208-
}
209-
c.logger.Errorf("Message could not read, err %s", err.Error())
210-
continue
67+
retryableMsg := message.toRetryableMessage(c.retryTopic)
68+
if produceErr := c.cronsumer.Produce(retryableMsg); produceErr != nil {
69+
c.logger.Errorf("Error producing message %s to exception/retry topic %s",
70+
string(retryableMsg.Value), produceErr.Error())
21171
}
212-
213-
c.messageCh <- Message(message)
21472
}
21573
}
74+
75+
commitErr := c.r.CommitMessages(context.Background(), kafka.Message(message))
76+
if commitErr != nil {
77+
instrumentation.TotalUnprocessedMessagesCounter.Inc()
78+
c.logger.Errorf("Error Committing message %s, %s", string(message.Value), commitErr.Error())
79+
return
80+
}
81+
82+
instrumentation.TotalProcessedMessagesCounter.Inc()
21683
}

0 commit comments

Comments
 (0)