@@ -211,8 +211,10 @@ func (client *DataSetClient) isProcessingEvents() bool {
211
211
return client .eventsEnqueued .Load () > client .eventsProcessed .Load ()
212
212
}
213
213
214
- // Shutdown stops processing of new events and waits until all the events that are
215
- // being processed are really processed (sent to DataSet).
214
+ // Shutdown takes care of shutdown of client. It does following steps
215
+ // - stops processing of new events,
216
+ // - tries (with 1st half of shutdownMaxTimeout period) to process (add into buffers) all the events,
217
+ // - tries (with 2nd half of shutdownMaxTimeout period) to send processed events (buffers) into DataSet
216
218
func (client * DataSetClient ) Shutdown () error {
217
219
client .Logger .Info ("Shutting down - BEGIN" )
218
220
// mark as finished to prevent processing of further events
@@ -222,33 +224,25 @@ func (client *DataSetClient) Shutdown() error {
222
224
client .logStatistics ()
223
225
224
226
var lastError error = nil
227
+ shutdownTimeout := minDuration (client .Config .BufferSettings .RetryMaxElapsedTime , client .Config .BufferSettings .RetryShutdownTimeout )
225
228
expBackoff := backoff.ExponentialBackOff {
226
229
InitialInterval : client .Config .BufferSettings .RetryInitialInterval ,
227
230
RandomizationFactor : client .Config .BufferSettings .RetryRandomizationFactor ,
228
231
Multiplier : client .Config .BufferSettings .RetryMultiplier ,
229
232
MaxInterval : client .Config .BufferSettings .RetryMaxInterval ,
230
- MaxElapsedTime : client . Config . BufferSettings . RetryMaxElapsedTime ,
233
+ MaxElapsedTime : shutdownTimeout / 2 ,
231
234
Stop : backoff .Stop ,
232
235
Clock : backoff .SystemClock ,
233
236
}
234
237
expBackoff .Reset ()
235
238
236
- // first we wait until all the events in buffers are added into buffers
237
- // then we are waiting until all the buffers are processed
238
- // if some progress is made we restart the waiting times
239
-
240
- // do wait for all events to be processed
239
+ // try (with timeout) to process (add into buffers) events,
241
240
retryNum := 0
242
- lastProcessed := client . eventsProcessed . Load ()
241
+ processingStart := time . Now ()
243
242
for client .isProcessingEvents () {
244
243
// log statistics
245
244
client .logStatistics ()
246
245
247
- // if some events were processed restart retry interval
248
- if client .eventsProcessed .Load () != lastProcessed {
249
- expBackoff .Reset ()
250
- }
251
- lastProcessed = client .eventsProcessed .Load ()
252
246
backoffDelay := expBackoff .NextBackOff ()
253
247
client .Logger .Info (
254
248
"Shutting down - processing events" ,
@@ -279,22 +273,26 @@ func (client *DataSetClient) Shutdown() error {
279
273
client .Logger .Info ("Shutting down - publishing all buffers" )
280
274
client .publishAllBuffers ()
281
275
282
- // do wait for all buffers to be processed
276
+ // reinitialize expBackoff with MaxElapsedTime based on actually elapsed time of processing (previous phase)
277
+ processingElapsed := time .Since (processingStart )
278
+ remainingShutdownTimeout := maxDuration (shutdownTimeout - processingElapsed , shutdownTimeout / 2 )
279
+ expBackoff = backoff.ExponentialBackOff {
280
+ InitialInterval : client .Config .BufferSettings .RetryInitialInterval ,
281
+ RandomizationFactor : client .Config .BufferSettings .RetryRandomizationFactor ,
282
+ Multiplier : client .Config .BufferSettings .RetryMultiplier ,
283
+ MaxInterval : client .Config .BufferSettings .RetryMaxInterval ,
284
+ MaxElapsedTime : remainingShutdownTimeout ,
285
+ Stop : backoff .Stop ,
286
+ Clock : backoff .SystemClock ,
287
+ }
288
+ // do wait (with timeout) for all buffers to be sent to the server
283
289
retryNum = 0
284
290
expBackoff .Reset ()
285
- lastProcessed = client .buffersProcessed .Load ()
286
- lastDropped := client .buffersDropped .Load ()
287
- initialDropped := lastDropped
291
+ initialDropped := client .buffersDropped .Load ()
288
292
for client .isProcessingBuffers () {
289
293
// log statistics
290
294
client .logStatistics ()
291
295
292
- // if some buffers were processed restart retry interval
293
- if client .buffersProcessed .Load ()+ lastDropped != lastProcessed + client .buffersDropped .Load () {
294
- expBackoff .Reset ()
295
- }
296
- lastProcessed = client .buffersProcessed .Load ()
297
- lastDropped = client .buffersDropped .Load ()
298
296
backoffDelay := expBackoff .NextBackOff ()
299
297
client .Logger .Info (
300
298
"Shutting down - processing buffers" ,
@@ -476,3 +474,17 @@ func truncateText(text string, length int) string {
476
474
477
475
return text
478
476
}
477
+
478
+ func minDuration (a , b time.Duration ) time.Duration {
479
+ if a <= b {
480
+ return a
481
+ }
482
+ return b
483
+ }
484
+
485
+ func maxDuration (a , b time.Duration ) time.Duration {
486
+ if a >= b {
487
+ return a
488
+ }
489
+ return b
490
+ }
0 commit comments