Commit 81762bd

feat: add producer interceptor interface (#145)
* feat: add producer interceptor interface
* fix: redpanda docker image
* fix: producer tests
* fix: linter
* feat: add multi interceptor support
* fix: refactoring code
* docs: add producer interceptor example
1 parent a40194d commit 81762bd
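
The change lets callers register one or more interceptors when constructing a producer; each interceptor can mutate the outgoing message (for example, to add headers) before it is handed to the writer. A minimal caller-side sketch based on the code in this commit — the loggingInterceptor is an illustrative extra, not part of the diff:

package main

import (
	"context"
	"fmt"

	"github.com/Trendyol/kafka-konsumer/v2"
)

// headerInterceptor stamps every outgoing message with a static header,
// mirroring the example shipped with this commit.
type headerInterceptor struct{}

func (i *headerInterceptor) OnProduce(ctx kafka.ProducerInterceptorContext) {
	ctx.Message.Headers = append(ctx.Message.Headers, kafka.Header{
		Key:   "x-source-app",
		Value: []byte("kafka-konsumer"),
	})
}

// loggingInterceptor is a hypothetical second interceptor, included only to
// show the multi-interceptor support mentioned in the commit message.
type loggingInterceptor struct{}

func (i *loggingInterceptor) OnProduce(ctx kafka.ProducerInterceptorContext) {
	fmt.Printf("producing to topic %s\n", ctx.Message.Topic)
}

func main() {
	// Interceptors are variadic and run in the order they are passed,
	// once per message, before the message is written.
	producer, err := kafka.NewProducer(&kafka.ProducerConfig{
		Writer: kafka.WriterConfig{
			Brokers: []string{"localhost:29092"},
		},
	}, &headerInterceptor{}, &loggingInterceptor{})
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	_ = producer.Produce(context.Background(), kafka.Message{
		Topic: "standart-topic",
		Value: []byte(`{ "foo": "bar" }`),
	})
}

Because the interceptors run synchronously inside Produce and ProduceBatch, any work done in OnProduce adds latency to every publish.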

File tree

7 files changed: +271 −37 lines

README.md

Lines changed: 4 additions & 0 deletions
@@ -196,6 +196,10 @@ After running `docker-compose up` command, you can run any application you want.
 
 </details>
 
+#### With Producer Interceptor
+
+Please refer to [Producer Interceptor Example](examples/with-kafka-producer-interceptor)
+
 #### With Distributed Tracing Support
 
 Please refer to [Tracing Example](examples/with-tracing/README.md)
Lines changed: 38 additions & 0 deletions (new example file under examples/with-kafka-producer-interceptor)
package main

import (
	"context"
	"fmt"
	"github.com/Trendyol/kafka-konsumer/v2"
)

func main() {
	// Interceptors are supplied when the producer is constructed.
	producer, _ := kafka.NewProducer(&kafka.ProducerConfig{
		Writer: kafka.WriterConfig{
			Brokers: []string{"localhost:29092"},
		},
	}, newProducerInterceptor()...)

	const topicName = "standart-topic"

	_ = producer.Produce(context.Background(), kafka.Message{
		Topic: topicName,
		Key:   []byte("1"),
		Value: []byte(`{ "foo": "bar" }`),
	})

	_ = producer.ProduceBatch(context.Background(), []kafka.Message{
		{
			Topic: topicName,
			Key:   []byte("1"),
			Value: []byte(`{ "foo": "bar" }`),
		},
		{
			Topic: topicName,
			Key:   []byte("2"),
			Value: []byte(`{ "foo2": "bar2" }`),
		},
	})

	fmt.Println("Messages sent!")
}
Lines changed: 16 additions & 0 deletions (new example file under examples/with-kafka-producer-interceptor)
package main

import "github.com/Trendyol/kafka-konsumer/v2"

type producerInterceptor struct{}

func (i *producerInterceptor) OnProduce(ctx kafka.ProducerInterceptorContext) {
	ctx.Message.Headers = append(ctx.Message.Headers, kafka.Header{
		Key:   "x-source-app",
		Value: []byte("kafka-konsumer"),
	})
}

func newProducerInterceptor() []kafka.ProducerInterceptor {
	return []kafka.ProducerInterceptor{&producerInterceptor{}}
}

producer.go

Lines changed: 25 additions & 9 deletions
@@ -18,10 +18,11 @@ type Writer interface {
 }
 
 type producer struct {
-	w Writer
+	w            Writer
+	interceptors []ProducerInterceptor
 }
 
-func NewProducer(cfg *ProducerConfig) (Producer, error) {
+func NewProducer(cfg *ProducerConfig, interceptors ...ProducerInterceptor) (Producer, error) {
 	kafkaWriter := &kafka.Writer{
 		Addr:  kafka.TCP(cfg.Writer.Brokers...),
 		Topic: cfg.Writer.Topic,
@@ -51,7 +52,7 @@ func NewProducer(cfg *ProducerConfig) (Producer, error) {
 		kafkaWriter.Transport = transport
 	}
 
-	p := &producer{w: kafkaWriter}
+	p := &producer{w: kafkaWriter, interceptors: interceptors}
 
 	if cfg.DistributedTracingEnabled {
 		otelWriter, err := NewOtelProducer(cfg, kafkaWriter)
@@ -64,18 +65,33 @@ func NewProducer(cfg *ProducerConfig) (Producer, error) {
 	return p, nil
 }
 
-func (c *producer) Produce(ctx context.Context, message Message) error {
-	return c.w.WriteMessages(ctx, message.toKafkaMessage())
+func (p *producer) Produce(ctx context.Context, message Message) error {
+	if len(p.interceptors) > 0 {
+		p.executeInterceptors(ctx, &message)
+	}
+
+	return p.w.WriteMessages(ctx, message.toKafkaMessage())
 }
 
-func (c *producer) ProduceBatch(ctx context.Context, messages []Message) error {
+func (p *producer) ProduceBatch(ctx context.Context, messages []Message) error {
 	kafkaMessages := make([]kafka.Message, 0, len(messages))
 	for i := range messages {
+		if len(p.interceptors) > 0 {
+			p.executeInterceptors(ctx, &messages[i])
+		}
+
 		kafkaMessages = append(kafkaMessages, messages[i].toKafkaMessage())
 	}
-	return c.w.WriteMessages(ctx, kafkaMessages...)
+
+	return p.w.WriteMessages(ctx, kafkaMessages...)
+}
+
+func (p *producer) executeInterceptors(ctx context.Context, message *Message) {
+	for _, interceptor := range p.interceptors {
+		interceptor.OnProduce(ProducerInterceptorContext{Context: ctx, Message: message})
+	}
 }
 
-func (c *producer) Close() error {
-	return c.w.Close()
+func (p *producer) Close() error {
+	return p.w.Close()
 }

producer_interceptor.go

Lines changed: 14 additions & 0 deletions
package kafka

import (
	"context"
)

type ProducerInterceptorContext struct {
	Context context.Context
	Message *Message
}

type ProducerInterceptor interface {
	OnProduce(ctx ProducerInterceptorContext)
}
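
Since the context travels alongside the message, an interceptor can also lift request-scoped values into headers. A hedged sketch of such an interceptor written as a user-side package against the types above — the package name and the ctxKeyCorrelationID context key are illustrative assumptions, not part of this commit:

// Package interceptors is a hypothetical user-side package.
package interceptors

import (
	"github.com/Trendyol/kafka-konsumer/v2"
)

// ctxKeyCorrelationID is an illustrative context key, not part of the library.
type ctxKeyCorrelationID struct{}

// CorrelationInterceptor copies a request-scoped correlation ID, if one is
// present in the context, onto the outgoing message as a header.
type CorrelationInterceptor struct{}

// Compile-time check that the interceptor satisfies the new interface.
var _ kafka.ProducerInterceptor = (*CorrelationInterceptor)(nil)

func (i *CorrelationInterceptor) OnProduce(ctx kafka.ProducerInterceptorContext) {
	if id, ok := ctx.Context.Value(ctxKeyCorrelationID{}).(string); ok && id != "" {
		ctx.Message.Headers = append(ctx.Message.Headers, kafka.Header{
			Key:   "x-correlation-id",
			Value: []byte(id),
		})
	}
}

Callers would attach the ID with context.WithValue before calling Produce or ProduceBatch, and the header then appears on every message that passes through the interceptor.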

producer_test.go

Lines changed: 50 additions & 1 deletion
@@ -4,6 +4,8 @@ import (
 	"context"
 	"testing"
 
+	"github.com/gofiber/fiber/v2/utils"
+
 	"github.com/segmentio/kafka-go"
 )
 
@@ -20,6 +22,26 @@ func Test_producer_Produce_Successfully(t *testing.T) {
 	}
 }
 
+func Test_producer_Produce_interceptor_Successfully(t *testing.T) {
+	// Given
+	mw := &mockWriter{}
+	msg := Message{Headers: make([]Header, 0)}
+	msg.Headers = append(msg.Headers, kafka.Header{
+		Key:   "x-correlation-id",
+		Value: []byte(utils.UUIDv4()),
+	})
+	interceptor := newMockProducerInterceptor()
+
+	p := producer{w: mw, interceptors: interceptor}
+
+	// When
+	err := p.Produce(context.Background(), msg)
+	// Then
+	if err != nil {
+		t.Fatalf("Producing err %s", err.Error())
+	}
+}
+
 func Test_producer_ProduceBatch_Successfully(t *testing.T) {
 	// Given
 	mw := &mockWriter{}
@@ -33,6 +55,20 @@ func Test_producer_ProduceBatch_Successfully(t *testing.T) {
 	}
 }
 
+func Test_producer_ProduceBatch_interceptor_Successfully(t *testing.T) {
+	// Given
+	mw := &mockWriter{}
+	interceptor := newMockProducerInterceptor()
+	p := producer{w: mw, interceptors: interceptor}
+
+	// When
+	err := p.ProduceBatch(context.Background(), []Message{{}, {}, {}})
+	// Then
+	if err != nil {
+		t.Fatalf("Batch Producing err %s", err.Error())
+	}
+}
+
 func Test_producer_Close_Successfully(t *testing.T) {
 	// Given
 	mw := &mockWriter{}
@@ -48,10 +84,23 @@ func Test_producer_Close_Successfully(t *testing.T) {
 
 type mockWriter struct{}
 
-func (m *mockWriter) WriteMessages(_ context.Context, _ ...kafka.Message) error {
+func (m *mockWriter) WriteMessages(_ context.Context, msg ...kafka.Message) error {
 	return nil
 }
 
 func (m *mockWriter) Close() error {
 	return nil
 }
+
+type mockProducerInterceptor struct{}
+
+func (i *mockProducerInterceptor) OnProduce(ctx ProducerInterceptorContext) {
+	ctx.Message.Headers = append(ctx.Message.Headers, kafka.Header{
+		Key:   "test",
+		Value: []byte("test"),
+	})
+}
+
+func newMockProducerInterceptor() []ProducerInterceptor {
+	return []ProducerInterceptor{&mockProducerInterceptor{}}
+}
