Skip to content

Commit 517219d

Browse files
committed
cleanup before calling onFlushError() to avoid deadlock
1 parent 426df23 commit 517219d

File tree

1 file changed

+7
-8
lines changed

1 file changed

+7
-8
lines changed

db/db.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -273,13 +273,6 @@ func (l *Loader) FlushAsync(ctx context.Context, outputModuleHash string, cursor
273273
l.logger.Debug("async flush started", zap.Int("active_flushes", l.activeFlushes))
274274

275275
go func() {
276-
// cleanup defer
277-
defer func() {
278-
l.cond.L.Lock()
279-
l.activeFlushes--
280-
l.cond.Broadcast()
281-
l.cond.L.Unlock()
282-
}()
283276

284277
// Disallow cancellation of the context to prevent holes in the data with parallel flushes
285278
if l.maxParallelFlushes > 1 {
@@ -288,6 +281,13 @@ func (l *Loader) FlushAsync(ctx context.Context, outputModuleHash string, cursor
288281

289282
start := time.Now()
290283
rowFlushedCount, err := flushLoader.Flush(ctx, outputModuleHash, cursor, lastFinalBlock)
284+
took := time.Since(start)
285+
286+
l.cond.L.Lock()
287+
l.activeFlushes--
288+
l.cond.Broadcast()
289+
l.cond.L.Unlock()
290+
291291
if err != nil {
292292
l.logger.Warn("async flush failed after retries", zap.Error(err))
293293
if l.onFlushError != nil {
@@ -296,7 +296,6 @@ func (l *Loader) FlushAsync(ctx context.Context, outputModuleHash string, cursor
296296
return
297297
}
298298

299-
took := time.Since(start)
300299
l.logger.Debug("async flush complete",
301300
zap.Int("row_count", rowFlushedCount),
302301
zap.Duration("took", took))

0 commit comments

Comments
 (0)