Skip to content

Commit c7899a1

Browse files
committed
add isShuttingDown flag
1 parent 517219d commit c7899a1

File tree

2 files changed

+48
-4
lines changed

2 files changed

+48
-4
lines changed

db/db.go

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ type Loader struct {
9898
cond *sync.Cond
9999
activeFlushes int
100100
maxParallelFlushes int
101+
isShuttingDown bool
101102

102103
// onFlush is an optional observer called when a flush completes successfully.
103104
// It receives the number of rows flushed and the total duration of the flush.
@@ -242,24 +243,59 @@ func (l *Loader) FlushNeeded() bool {
242243
return totalRows > l.batchRowFlushInterval
243244
}
244245

246+
// MarkShuttingDown sets the shutdown flag to prevent new flushes from starting
247+
func (l *Loader) MarkShuttingDown() {
248+
l.cond.L.Lock()
249+
// Only log if we're changing the state
250+
if !l.isShuttingDown {
251+
l.logger.Info("marking loader as shutting down",
252+
zap.Int("active_flushes", l.activeFlushes))
253+
l.isShuttingDown = true
254+
}
255+
l.cond.L.Unlock()
256+
}
257+
245258
// WaitForAllFlushes blocks until there are no in-flight async flushes.
246259
func (l *Loader) WaitForAllFlushes() {
247260
l.cond.L.Lock()
248261
defer l.cond.L.Unlock()
249262

250-
for l.activeFlushes > 0 {
251-
l.cond.Wait()
263+
// Set the shutdown flag to prevent new flushes while we're waiting
264+
if !l.isShuttingDown {
265+
l.logger.Info("marking loader as shutting down during wait for flushes")
266+
l.isShuttingDown = true
267+
}
268+
269+
if l.activeFlushes > 0 {
270+
start := time.Now()
271+
for l.activeFlushes > 0 {
272+
l.logger.Info("waiting for in-flight async flushes to complete",
273+
zap.Int("active_flushes", l.activeFlushes),
274+
zap.Duration("waited_for", time.Since(start)))
275+
l.cond.Wait()
276+
}
277+
l.logger.Info("all async flushes completed")
252278
}
253279
}
254280

255281
// FlushAsync triggers a non-blocking flush. Blocks if the maximum number of parallel flushes is reached until a flush is completed.
256282
func (l *Loader) FlushAsync(ctx context.Context, outputModuleHash string, cursor *sink.Cursor, lastFinalBlock uint64) {
257283
l.logger.Debug("async flush: starting flush", zap.Int("active_flushes", l.activeFlushes), zap.Uint64("last_final_block", lastFinalBlock))
258284
l.cond.L.Lock()
285+
defer l.cond.L.Unlock()
286+
259287
for l.activeFlushes >= l.maxParallelFlushes {
260288
l.logger.Debug("async flush: maximum number of parallel flushes reached, waiting for a flush to complete")
261289
l.cond.Wait()
262290
}
291+
292+
if l.isShuttingDown {
293+
l.logger.Info("async flush: loader is shutting down after wait, skipping new flush",
294+
zap.Uint64("block", lastFinalBlock),
295+
zap.Int("active_flushes", l.activeFlushes))
296+
return
297+
}
298+
263299
// Snapshot entries and replace with a fresh buffer
264300
snapshot := l.entries
265301
l.entries = NewOrderedMap[string, *OrderedMap[string, *Operation]]()
@@ -268,8 +304,6 @@ func (l *Loader) FlushAsync(ctx context.Context, outputModuleHash string, cursor
268304
// Build a lightweight loader for flushing while still under lock to avoid racy reads of fields.
269305
flushLoader := l.newFlushLoader(snapshot)
270306

271-
l.cond.L.Unlock()
272-
273307
l.logger.Debug("async flush started", zap.Int("active_flushes", l.activeFlushes))
274308

275309
go func() {

sinker/sinker.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ func New(sink *sink.Sinker, loader *db.Loader, logger *zap.Logger, tracer loggin
5454

5555
// Register an error observer so any async flush failure shuts down the sinker
5656
loader.SetOnFlushError(func(err error) {
57+
// First mark the loader as shutting down to prevent new flushes
58+
loader.MarkShuttingDown()
59+
60+
// Then initiate graceful shutdown
5761
s.Shutdown(fmt.Errorf("async flush failed: %w", err))
5862
})
5963

@@ -241,9 +245,15 @@ func (s *SQLSinker) HandleBlockRangeCompletion(ctx context.Context, cursor *sink
241245
s.logger.Warn("completion flush skipped: context canceled, exiting without final flush", zap.Error(err))
242246
return nil
243247
}
248+
244249
s.logger.Info("stream completed, flushing remaining entries to database", zap.Stringer("block", cursor.Block()))
245250
_, err := s.loader.Flush(ctx, s.OutputModuleHash(), cursor, cursor.Block().Num())
246251
if err != nil {
252+
// Check if this is a shutdown-related error and handle accordingly
253+
if strings.Contains(err.Error(), "shutting down") {
254+
s.logger.Info("final flush skipped because loader is shutting down", zap.Stringer("block", cursor.Block()))
255+
return nil
256+
}
247257
return fmt.Errorf("failed to flush %s block on completion: %w", cursor.Block(), err)
248258
}
249259

0 commit comments

Comments
 (0)