Skip to content
This repository was archived by the owner on Jun 11, 2021. It is now read-only.

Commit fdd27e9

Browse files
commit offsets and wait for consumers to stop (#98)
1 parent 7b0ebc7 commit fdd27e9

File tree

4 files changed

+54
-18
lines changed

4 files changed

+54
-18
lines changed

components/dispatcher/cmd/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,9 +172,11 @@ func main() {
172172
healthServer.SetAlive(true)
173173
healthServer.SetDispatcherReady(true)
174174

175+
// Start The Controllers
175176
logger.Info("Starting controllers.")
176177
kncontroller.StartAll(stopCh, controllers[:]...)
177178

179+
// Block On Signal Handler Stop Channel
178180
<-stopCh
179181

180182
// Reset The Liveness and Readiness Flags In Preparation For Shutdown

components/dispatcher/internal/controller/kafkachannel_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,9 @@ func TestAllCases(t *testing.T) {
137137
KafkaClientSet: kafkaClient,
138138
}
139139
}))
140+
141+
// Pause to let async go processes finish logging :(
142+
time.Sleep(1 * time.Second)
140143
}
141144

142145
func NewTestDispatcher(t *testing.T, channelKey string) *dispatcher.Dispatcher {

components/dispatcher/internal/dispatcher/dispatcher.go

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ type ConsumerOffset struct {
4040
lastOffsetCommit time.Time
4141
offsets map[int32]kafka.Offset
4242
stopCh chan bool
43+
stoppedCh chan bool
4344
}
4445

4546
// Define a Dispatcher Struct to hold Dispatcher Config and dispatcher implementation details
@@ -62,17 +63,19 @@ func NewDispatcher(dispatcherConfig DispatcherConfig) *Dispatcher {
6263
return dispatcher
6364
}
6465

65-
// Close The Running Consumers Connections
66+
// Stop All Consumers
6667
func (d *Dispatcher) StopConsumers() {
6768
for subscription, _ := range d.consumers {
6869
d.stopConsumer(subscription)
6970
}
7071
}
7172

73+
// Stop An Individual Consumer
7274
func (d *Dispatcher) stopConsumer(subscription Subscription) {
7375
d.Logger.Info("Stopping Consumer", zap.String("GroupId", subscription.GroupId), zap.String("topic", d.Topic), zap.String("URI", subscription.URI))
74-
75-
d.consumers[subscription].stopCh <- true
76+
consumerOffset := d.consumers[subscription]
77+
consumerOffset.stopCh <- true // Send Stop Signal
78+
<-consumerOffset.stoppedCh // Wait Until Stop Completes
7679
delete(d.consumers, subscription)
7780
}
7881

@@ -94,8 +97,14 @@ func (d *Dispatcher) initConsumer(subscription Subscription) (*ConsumerOffset, e
9497
return nil, err
9598
}
9699

97-
stopCh := make(chan bool)
98-
consumerOffset := ConsumerOffset{consumer: consumer, lastOffsetCommit: time.Now(), offsets: make(map[int32]kafka.Offset), stopCh: stopCh}
100+
// Create The ConsumerOffset For Tracking State
101+
consumerOffset := ConsumerOffset{
102+
consumer: consumer,
103+
lastOffsetCommit: time.Now(),
104+
offsets: make(map[int32]kafka.Offset),
105+
stopCh: make(chan bool),
106+
stoppedCh: make(chan bool),
107+
}
99108

100109
// Start Consuming Messages From Topic (Async)
101110
go d.handleKafkaMessages(consumerOffset, subscription)
@@ -110,23 +119,26 @@ func (d *Dispatcher) handleKafkaMessages(consumerOffset ConsumerOffset, subscrip
110119
// Configure The Logger
111120
logger := d.Logger.With(zap.String("GroupID", subscription.GroupId))
112121

113-
stopped := false
114122
// Message Processing Loop
123+
stopped := false
115124
for !stopped {
116125

117126
// Poll For A New Event Message Until We Get One (Timeout is how long to wait before requesting again)
118127
event := consumerOffset.consumer.Poll(d.PollTimeoutMillis)
119128

120129
select {
121-
// Handle Shutdown Case
130+
131+
// Non-Blocking Channel Check
122132
case <-consumerOffset.stopCh:
123-
stopped = true
133+
stopped = true // Handle Shutdown Case - Break Out Of Loop
134+
124135
default:
125136

137+
// No Events To Process At The Moment
126138
if event == nil {
139+
127140
// Commit Offsets If The Amount Of Time Since Last Commit Is "OffsetCommitDuration" Or More
128141
currentTimeDuration := time.Now().Sub(consumerOffset.lastOffsetCommit)
129-
130142
if currentTimeDuration > d.OffsetCommitDurationMinimum && currentTimeDuration >= d.OffsetCommitDuration {
131143
d.commitOffsets(logger, &consumerOffset)
132144
}
@@ -177,12 +189,17 @@ func (d *Dispatcher) handleKafkaMessages(consumerOffset ConsumerOffset, subscrip
177189
}
178190
}
179191

192+
// Commit Offsets One Last Time
193+
logger.Debug("Final Offset Commit")
194+
d.commitOffsets(logger, &consumerOffset)
195+
180196
// Safe To Shutdown The Consumer
181197
logger.Debug("Shutting Down Consumer")
182198
err := consumerOffset.consumer.Close()
183199
if err != nil {
184200
logger.Error("Unable To Stop Consumer", zap.Error(err))
185201
}
202+
consumerOffset.stoppedCh <- true
186203
}
187204

188205
// Store Updated Offsets For The Partition If Consumer Still Has It Assigned
@@ -213,6 +230,7 @@ func (d *Dispatcher) commitOffsets(logger *zap.Logger, consumerOffset *ConsumerO
213230
consumerOffset.lastOffsetCommit = time.Now()
214231
}
215232

233+
// Update The Dispatcher's Subscriptions
216234
func (d *Dispatcher) UpdateSubscriptions(subscriptions []Subscription) map[Subscription]error {
217235

218236
failedSubscriptions := make(map[Subscription]error)

components/dispatcher/internal/dispatcher/dispatcher_test.go

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ const (
2424
testTopic = "TestTopic"
2525
testPartition = 33
2626
testOffset = "latest"
27-
testGroupId = "TestGroupId"
27+
testGroupId1 = "TestGroupId1"
2828
testGroupId2 = "TestGroupId2"
2929
testGroupId3 = "TestGroupId3"
3030
testKey = "TestKey"
@@ -113,7 +113,7 @@ func TestDispatcher(t *testing.T) {
113113
subscriptionResults := testDispatcher.UpdateSubscriptions([]Subscription{
114114
{
115115
URI: testSubscriberUri1,
116-
GroupId: testGroupId,
116+
GroupId: testGroupId1,
117117
},
118118
})
119119

@@ -131,7 +131,7 @@ func TestDispatcher(t *testing.T) {
131131
subscriptionResults = testDispatcher.UpdateSubscriptions([]Subscription{
132132
{
133133
URI: testSubscriberUri1,
134-
GroupId: testGroupId,
134+
GroupId: testGroupId1,
135135
},
136136
{
137137
URI: testSubscriberUri2,
@@ -153,7 +153,7 @@ func TestDispatcher(t *testing.T) {
153153
sendMessagesToConsumers(t, testDispatcher.consumers, testMessagesToSend)
154154

155155
// Wait For Consumers To Process Messages
156-
waitForConsumersToProcessEvents(t, testDispatcher.consumers)
156+
waitForConsumersToProcessEvents(t, testDispatcher.consumers, testMessagesToSend)
157157

158158
// Verify The Consumer Offset Commit Messages
159159
verifyConsumerCommits(t, testDispatcher.consumers, 3)
@@ -162,7 +162,7 @@ func TestDispatcher(t *testing.T) {
162162
subscriptionResults = testDispatcher.UpdateSubscriptions([]Subscription{
163163
{
164164
URI: testSubscriberUri1,
165-
GroupId: testGroupId,
165+
GroupId: testGroupId1,
166166
},
167167
})
168168
verifyConsumersCount(t, testDispatcher.consumers, 1)
@@ -393,21 +393,34 @@ func createMessageHeaders(context cloudevents.EventContext) []kafka.Header {
393393
}
394394

395395
// Wait For The Consumers To Process All The Events
396-
func waitForConsumersToProcessEvents(t *testing.T, consumers map[Subscription]*ConsumerOffset) {
396+
func waitForConsumersToProcessEvents(t *testing.T, consumers map[Subscription]*ConsumerOffset, expectedOffset kafka.Offset) {
397+
398+
// Track Wait Start Time
397399
startTime := time.Now()
400+
401+
// Loop Over All The Specified Consumers
398402
for _, consumer := range consumers {
403+
404+
// Verified Cast Of Consumer As MockConsumer
399405
mockConsumer, ok := consumer.consumer.(*MockConsumer)
400406
assert.True(t, ok)
407+
408+
// Infinite Loop - Broken By Internal Timeout Failure
401409
for {
410+
411+
// Get The Partition's Committed Offset (Minimal Synchronization To Avoid Deadlocking The Commit Updates!)
402412
mockConsumer.offsetsMutex.Lock()
403-
if mockConsumer.getCommits()[testPartition] >= testMessagesToSend+1 {
413+
commitOffset := mockConsumer.getCommits()[testPartition]
414+
mockConsumer.offsetsMutex.Unlock()
415+
416+
// Process Committed Offset - Either Stop Looking, Wait, Or Timeout
417+
if commitOffset >= expectedOffset+1 {
404418
break
405-
} else if time.Now().Sub(startTime) > (4 * time.Second) {
419+
} else if time.Now().Sub(startTime) > (5 * time.Second) {
406420
assert.FailNow(t, "Timed-out Waiting For Consumers To Process Events")
407421
} else {
408422
time.Sleep(100 * time.Millisecond)
409423
}
410-
mockConsumer.offsetsMutex.Unlock()
411424
}
412425
}
413426
}

0 commit comments

Comments
 (0)