-
Notifications
You must be signed in to change notification settings - Fork 2
/
kafka_handler.go
114 lines (105 loc) · 3.21 KB
/
kafka_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package kafka_go
import (
"context"
"github.com/Shopify/sarama"
)
// newConsumerHandler builds a consumerHandler from the per-topic handlers,
// per-topic fallback handlers, middleware chain and interceptor chain.
//
// handlers and fallback are stored as given (not copied). meta controls
// whether a Meta map is allocated on each SubscriberMessage. When no
// interceptors are supplied the handler is invoked directly through
// noOpInterceptor; otherwise the interceptors are composed so that
// interceptor[0] is the outermost wrapper and the topic handler is the
// innermost call.
func newConsumerHandler(handlers map[string]TopicHandler, fallback map[string]TopicHandler,
	middleware []ConsumerMiddleware, interceptor []ConsumerInterceptor, meta bool) *consumerHandler {
	h := &consumerHandler{
		ready:      make(chan bool),
		handlers:   handlers,
		fallbacks:  fallback,
		middleware: middleware,
		// len of a nil slice is 0, so no separate nil check is needed.
		middlewarePresent: len(middleware) > 0,
		messageMeta:       meta,
		interceptor:       noOpInterceptor,
	}
	if len(interceptor) == 0 {
		return h
	}

	h.interceptor = func(ctx context.Context, msg *SubscriberMessage, handler func(context.Context, *SubscriberMessage) bool) bool {
		// Wrap from the last interceptor to the first so the first one ends
		// up on the outside of the chain.
		next := handler
		for i := len(interceptor) - 1; i >= 0; i-- {
			// Fresh per-iteration copies so each closure keeps its own pair.
			wrap, inner := interceptor[i], next
			next = func(ctx context.Context, msg *SubscriberMessage) bool {
				return wrap(ctx, msg, inner)
			}
		}
		return next(ctx, msg)
	}
	return h
}
// consumerHandler holds the state used by the Setup, Cleanup and
// ConsumeClaim methods to dispatch consumed Kafka messages to per-topic
// handlers.
type consumerHandler struct {
	// ready is closed by Setup.
	ready chan bool
	// retryCount is reset to zero by Setup and Cleanup.
	retryCount int8
	// handlers maps a topic name to the handler for its messages.
	handlers map[string]TopicHandler
	// fallbacks maps a topic name to the handler tried when the primary
	// handling of a message reports failure.
	fallbacks map[string]TopicHandler
	// middleware is run, in order, on every message before it is handled.
	middleware []ConsumerMiddleware
	// interceptor wraps the invocation of the topic handler.
	interceptor ConsumerInterceptor
	// middlewarePresent is true when middleware is non-empty.
	middlewarePresent bool
	// messageMeta, when true, makes each message carry an allocated Meta map.
	messageMeta bool
}
// Setup resets the retry counter and closes the ready channel, releasing any
// receiver blocked on it. It always returns nil.
//
// sarama runs Setup at the start of every consumer-group session, so it can
// be called more than once on the same handler. Closing an already-closed
// channel panics, so the channel is closed only if it is still open.
func (ch *consumerHandler) Setup(sarama.ConsumerGroupSession) error {
	ch.retryCount = 0
	select {
	case <-ch.ready:
		// Already closed by an earlier session; nothing to signal.
	default:
		close(ch.ready)
	}
	return nil
}
// Cleanup resets the retry counter. The session argument is unused and the
// method always returns nil.
func (ch *consumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
	ch.retryCount = 0
	return nil
}
// ConsumeClaim reads messages from claim and dispatches each one to the
// handler registered for its topic, until either the claim's message channel
// is closed or the session context is done. It always returns nil.
//
// For every message with a registered handler: a SubscriberMessage is built
// (with a Meta map when messageMeta is set), the middleware chain runs if
// present, and the handler is invoked through the interceptor. If that
// reports failure, the topic's fallback handler (if any) is invoked. The
// message is marked as consumed in every case, including when no handler is
// registered for its topic.
func (ch *consumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	ctx := session.Context()
	messageChan := claim.Messages()
	for {
		select {
		case message, ok := <-messageChan:
			if !ok {
				// The channel was closed (the claim was released). A closed
				// channel yields nil forever, so returning here avoids a
				// busy loop and lets the session wind down.
				return nil
			}
			if message == nil {
				continue
			}

			handler := ch.handlers[message.Topic]
			if handler == nil {
				Logger.Printf("No handler found for topic %v", message.Topic)
				session.MarkMessage(message, "")
				continue
			}

			msg := &SubscriberMessage{
				Topic:     message.Topic,
				Partition: message.Partition,
				Offset:    message.Offset,
				Key:       message.Key,
				Value:     message.Value,
			}
			if ch.messageMeta {
				msg.Meta = make(map[string]interface{})
			}
			// Invoke middleware if present.
			if ch.middlewarePresent {
				ch.invokeMiddleware(ctx, msg)
			}
			if !ch.interceptor(ctx, msg, handler.Handle) {
				// Handling failed; try the fallback handler, if any. Its
				// result is deliberately ignored: the message is marked
				// either way.
				if fbHandler := ch.fallbacks[message.Topic]; fbHandler != nil {
					fbHandler.Handle(ctx, msg)
				}
			}
			session.MarkMessage(message, "")
		case <-ctx.Done():
			Logger.Println("Releasing kafka consumer handler")
			return nil
		}
	}
}
// invokeMiddleware calls every configured middleware, in slice order, with
// the given context and message.
func (ch *consumerHandler) invokeMiddleware(ctx context.Context, msg *SubscriberMessage) {
	chain := ch.middleware
	for i := 0; i < len(chain); i++ {
		chain[i](ctx, msg)
	}
}
// noOpInterceptor is the pass-through interceptor: it calls handler with ctx
// and msg unchanged and returns the handler's result.
func noOpInterceptor(ctx context.Context, msg *SubscriberMessage, handler func(context.Context, *SubscriberMessage) bool) bool {
	handled := handler(ctx, msg)
	return handled
}