Skip to content

Commit 15a97a2

Browse files
authored
feat: add header filter feature (#105)
* feat: add header filter feature * chore: add skipMessageByHeaderFn to the README.md * chore: fix lint
1 parent 9ec9054 commit 15a97a2

File tree

7 files changed

+158
-4
lines changed

7 files changed

+158
-4
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap
223223
|--------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------|-----------------------------|
224224
| `reader` | [Describes all segmentio kafka reader configurations](https://pkg.go.dev/github.com/segmentio/kafka-go#ReaderConfig) | |
225225
| `consumeFn` | Kafka consumer function, if retry enabled it, is also used to consume retriable messages | |
226+
| `skipMessageByHeaderFn` | Function to filter messages based on headers, return true if you want to skip the message | nil |
226227
| `logLevel` | Describes log level; valid options are `debug`, `info`, `warn`, and `error` | info |
227228
| `concurrency` | Number of goroutines used at listeners | 1 |
228229
| `retryEnabled` | Retry/Exception consumer is working or not | false |

consumer_base.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ type base struct {
5454
context context.Context
5555
r Reader
5656
cancelFn context.CancelFunc
57+
skipMessageByHeaderFn SkipMessageByHeaderFn
5758
metric *ConsumerMetric
5859
pause chan struct{}
5960
quit chan struct{}
@@ -107,6 +108,7 @@ func newBase(cfg *ConsumerConfig, messageChSize int) (*base, error) {
107108
singleConsumingStream: make(chan *Message, cfg.Concurrency),
108109
batchConsumingStream: make(chan []*Message, cfg.Concurrency),
109110
consumerState: stateRunning,
111+
skipMessageByHeaderFn: cfg.SkipMessageByHeaderFn,
110112
}
111113

112114
if cfg.DistributedTracingEnabled {
@@ -159,6 +161,14 @@ func (c *base) startConsume() {
159161
continue
160162
}
161163

164+
if c.skipMessageByHeaderFn != nil && c.skipMessageByHeaderFn(m.Headers) {
165+
c.logger.Infof("Message is not processed. Header filter applied. Headers: %v", m.Headers)
166+
if err = c.r.CommitMessages([]kafka.Message{*m}); err != nil {
167+
c.logger.Errorf("Commit Error %s,", err.Error())
168+
}
169+
continue
170+
}
171+
162172
incomingMessage := &IncomingMessage{
163173
kafkaMessage: m,
164174
message: fromKafkaMessage(m),

consumer_base_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,45 @@ func Test_base_startConsume(t *testing.T) {
5757
t.Error(diff)
5858
}
5959
})
60+
61+
t.Run("Skip_Incoming_Messages_When_SkipMessageByHeaderFn_Is_Applied", func(t *testing.T) {
62+
// Given
63+
mc := mockReader{}
64+
skipMessageCh := make(chan struct{})
65+
b := base{
66+
wg: sync.WaitGroup{},
67+
r: &mc,
68+
logger: NewZapLogger(LogLevelDebug),
69+
incomingMessageStream: make(chan *IncomingMessage),
70+
skipMessageByHeaderFn: func(header []kafka.Header) bool {
71+
defer func() {
72+
skipMessageCh <- struct{}{}
73+
}()
74+
75+
for _, h := range header {
76+
if h.Key == "header" {
77+
return true
78+
}
79+
}
80+
return false
81+
},
82+
}
83+
84+
b.wg.Add(1)
85+
86+
// When
87+
go b.startConsume()
88+
89+
// Then
90+
<-skipMessageCh
91+
92+
// assert incomingMessageStream does not receive any value because message is skipped
93+
select {
94+
case <-b.incomingMessageStream:
95+
t.Fatal("incoming message stream must equal to 0")
96+
case <-time.After(1 * time.Second):
97+
}
98+
})
6099
}
61100

62101
func Test_base_Pause(t *testing.T) {

consumer_config.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ type PreBatchFn func([]*Message) []*Message
2121

2222
type ConsumeFn func(*Message) error
2323

24+
type SkipMessageByHeaderFn func(header []kafka.Header) bool
25+
2426
type DialConfig struct {
2527
Timeout time.Duration
2628
KeepAlive time.Duration
@@ -36,6 +38,7 @@ type ConsumerConfig struct {
3638
Dial *DialConfig
3739
BatchConfiguration *BatchConfiguration
3840
ConsumeFn ConsumeFn
41+
SkipMessageByHeaderFn SkipMessageByHeaderFn
3942
TransactionalRetry *bool
4043
RetryConfiguration RetryConfiguration
4144
LogLevel LogLevel
@@ -116,8 +119,6 @@ type DistributedTracingConfiguration struct {
116119
Propagator propagation.TextMapPropagator
117120
}
118121

119-
type SkipMessageByHeaderFn func(headers []Header) bool
120-
121122
func toHeaders(cronsumerHeaders []kcronsumer.Header) []Header {
122123
headers := make([]Header, 0, len(cronsumerHeaders))
123124
for i := range cronsumerHeaders {
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"github.com/Trendyol/kafka-konsumer/v2"
6+
"os"
7+
"os/signal"
8+
)
9+
10+
func main() {
11+
consumerCfg := &kafka.ConsumerConfig{
12+
Concurrency: 1,
13+
Reader: kafka.ReaderConfig{
14+
Brokers: []string{"localhost:29092"},
15+
Topic: "standart-topic",
16+
GroupID: "standart-cg",
17+
},
18+
RetryEnabled: false,
19+
SkipMessageByHeaderFn: skipMessageByHeaderFn,
20+
ConsumeFn: consumeFn,
21+
}
22+
23+
consumer, _ := kafka.NewConsumer(consumerCfg)
24+
defer consumer.Stop()
25+
26+
consumer.Consume()
27+
28+
fmt.Println("Consumer started...!")
29+
30+
c := make(chan os.Signal, 1)
31+
signal.Notify(c, os.Interrupt)
32+
<-c
33+
}
34+
35+
func skipMessageByHeaderFn(headers []kafka.Header) bool {
36+
for _, header := range headers {
37+
if header.Key == "SkipMessage" {
38+
return true
39+
}
40+
}
41+
return false
42+
}
43+
44+
func consumeFn(message *kafka.Message) error {
45+
fmt.Printf("Message From %s with value %s\n", message.Topic, string(message.Value))
46+
return nil
47+
}

test/integration/go.sum

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
github.com/Trendyol/kafka-cronsumer v1.4.6 h1:Hc6afln69+cCAyaYJSQRnjzH5gZ9dpNa/PsBeXiL5GY=
2-
github.com/Trendyol/kafka-cronsumer v1.4.6/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
1+
github.com/Trendyol/kafka-cronsumer v1.4.7 h1:xmjxSBJzRRkuaO8k0S4baePyVVLJf3apl7nRiMXFnUY=
32
github.com/Trendyol/kafka-cronsumer v1.4.7/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
43
github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4=
54
github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0=

test/integration/integration_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,63 @@ func Test_Should_Batch_Consume_With_PreBatch_Enabled(t *testing.T) {
436436
}
437437
}
438438

439+
func Test_Should_Skip_Message_When_Header_Filter_Given(t *testing.T) {
440+
// Given
441+
topic := "header-filter-topic"
442+
consumerGroup := "header-filter-cg"
443+
brokerAddress := "localhost:9092"
444+
445+
incomingMessage := []segmentio.Message{
446+
{
447+
Topic: topic,
448+
Headers: []segmentio.Header{
449+
{Key: "SkipMessage", Value: []byte("any")},
450+
},
451+
Key: []byte("1"),
452+
Value: []byte(`foo`),
453+
},
454+
}
455+
456+
_, cleanUp := createTopicAndWriteMessages(t, topic, incomingMessage)
457+
defer cleanUp()
458+
459+
consumeCh := make(chan struct{})
460+
skipMessageCh := make(chan struct{})
461+
462+
consumerCfg := &kafka.ConsumerConfig{
463+
Reader: kafka.ReaderConfig{Brokers: []string{brokerAddress}, Topic: topic, GroupID: consumerGroup},
464+
SkipMessageByHeaderFn: func(header []kafka.Header) bool {
465+
defer func() {
466+
skipMessageCh <- struct{}{}
467+
}()
468+
for _, h := range header {
469+
if h.Key == "SkipMessage" {
470+
return true
471+
}
472+
}
473+
return false
474+
},
475+
ConsumeFn: func(message *kafka.Message) error {
476+
consumeCh <- struct{}{}
477+
return nil
478+
},
479+
}
480+
481+
consumer, _ := kafka.NewConsumer(consumerCfg)
482+
defer consumer.Stop()
483+
484+
consumer.Consume()
485+
486+
// Then
487+
<-skipMessageCh
488+
489+
select {
490+
case <-consumeCh:
491+
t.Fatal("Message must be skipped! consumeCh mustn't receive any value")
492+
case <-time.After(1 * time.Second):
493+
}
494+
}
495+
439496
func createTopicAndWriteMessages(t *testing.T, topicName string, messages []segmentio.Message) (*segmentio.Conn, func()) {
440497
t.Helper()
441498

0 commit comments

Comments
 (0)