52 changes: 40 additions & 12 deletions db/db.go
@@ -98,6 +98,7 @@ type Loader struct {
cond *sync.Cond
activeFlushes int
maxParallelFlushes int
isShuttingDown bool

// onFlush is an optional observer called when a flush completes successfully.
// It receives the number of rows flushed and the total duration of the flush.
@@ -247,19 +248,42 @@ func (l *Loader) WaitForAllFlushes() {
l.cond.L.Lock()
defer l.cond.L.Unlock()

for l.activeFlushes > 0 {
l.cond.Wait()
// Set the shutdown flag to prevent new flushes while we're waiting
if !l.isShuttingDown {
l.logger.Info("marking loader as shutting down during wait for flushes")
l.isShuttingDown = true
}

if l.activeFlushes > 0 {
start := time.Now()
for l.activeFlushes > 0 {
l.logger.Info("waiting for in-flight async flushes to complete",
zap.Int("active_flushes", l.activeFlushes),
zap.Duration("waited_for", time.Since(start)))
l.cond.Wait()
}
l.logger.Info("all async flushes completed")
}
}

// FlushAsync triggers a non-blocking flush. Blocks if the maximum number of parallel flushes is reached until a flush is completed.
func (l *Loader) FlushAsync(ctx context.Context, outputModuleHash string, cursor *sink.Cursor, lastFinalBlock uint64) {
l.logger.Debug("async flush: starting flush", zap.Int("active_flushes", l.activeFlushes), zap.Uint64("last_final_block", lastFinalBlock))
l.cond.L.Lock()
Comment on lines 271 to 272

Copilot AI Sep 9, 2025

Reading l.activeFlushes without holding the lock creates a race condition. The lock should be acquired before accessing any shared state including for logging.
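A minimal sketch of the reordering this comment suggests, reusing only fields already present in this diff: acquire l.cond.L before emitting the debug log so the read of l.activeFlushes is synchronized.

	l.cond.L.Lock()
	defer l.cond.L.Unlock()
	// activeFlushes is now read while holding l.cond.L, so the log no longer
	// races with the flush goroutines that decrement it.
	l.logger.Debug("async flush: starting flush",
		zap.Int("active_flushes", l.activeFlushes),
		zap.Uint64("last_final_block", lastFinalBlock))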
defer l.cond.L.Unlock()

for l.activeFlushes >= l.maxParallelFlushes {
l.logger.Debug("async flush: maximum number of parallel flushes reached, waiting for a flush to complete")
l.cond.Wait()
}

if l.isShuttingDown {
l.logger.Info("async flush: loader is shutting down after wait, skipping new flush",
zap.Uint64("block", lastFinalBlock),
zap.Int("active_flushes", l.activeFlushes))
return
}

// Snapshot entries and replace with a fresh buffer
snapshot := l.entries
l.entries = NewOrderedMap[string, *OrderedMap[string, *Operation]]()
@@ -268,18 +292,9 @@ func (l *Loader) FlushAsync(ctx context.Context, outputModuleHash string, cursor
// Build a lightweight loader for flushing while still under lock to avoid racy reads of fields.
flushLoader := l.newFlushLoader(snapshot)

l.cond.L.Unlock()

l.logger.Debug("async flush started", zap.Int("active_flushes", l.activeFlushes))

go func() {
// cleanup defer
defer func() {
l.cond.L.Lock()
l.activeFlushes--
l.cond.Broadcast()
l.cond.L.Unlock()
}()

// Disallow cancellation of the context to prevent holes in the data with parallel flushes
if l.maxParallelFlushes > 1 {
Expand All @@ -288,15 +303,28 @@ func (l *Loader) FlushAsync(ctx context.Context, outputModuleHash string, cursor

start := time.Now()
rowFlushedCount, err := flushLoader.Flush(ctx, outputModuleHash, cursor, lastFinalBlock)
took := time.Since(start)

if err != nil {
l.logger.Warn("async flush failed after retries", zap.Error(err))

l.cond.L.Lock()
l.isShuttingDown = true
Copilot AI Sep 9, 2025

Setting isShuttingDown flag on flush error without checking if already shutting down could cause race conditions. Consider checking the flag state before setting it or using atomic operations.
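A sketch of the atomic-flag alternative this comment suggests. The shuttingDown field is hypothetical (it would replace the isShuttingDown bool introduced in this diff) and relies on sync/atomic's Bool so the flag can be set and read without holding l.cond.L.

	// Hypothetical replacement field on Loader: shuttingDown atomic.Bool
	// (import "sync/atomic").
	//
	// CompareAndSwap performs the false -> true transition at most once across
	// goroutines, so the shutdown log fires only for the first failed flush.
	if l.shuttingDown.CompareAndSwap(false, true) {
		l.logger.Info("async flush: marking loader as shutting down after flush failure")
	}
	// Readers elsewhere would call l.shuttingDown.Load() instead of checking
	// the bool under the mutex.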
l.activeFlushes--
l.cond.Broadcast()
l.cond.L.Unlock()

if l.onFlushError != nil {
l.onFlushError(err)
}
return
}

took := time.Since(start)
l.cond.L.Lock()
l.activeFlushes--
l.cond.Broadcast()
l.cond.L.Unlock()

l.logger.Debug("async flush complete",
zap.Int("row_count", rowFlushedCount),
zap.Duration("took", took))
6 changes: 6 additions & 0 deletions sinker/sinker.go
@@ -241,9 +241,15 @@ func (s *SQLSinker) HandleBlockRangeCompletion(ctx context.Context, cursor *sink
s.logger.Warn("completion flush skipped: context canceled, exiting without final flush", zap.Error(err))
return nil
}

s.logger.Info("stream completed, flushing remaining entries to database", zap.Stringer("block", cursor.Block()))
_, err := s.loader.Flush(ctx, s.OutputModuleHash(), cursor, cursor.Block().Num())
if err != nil {
// Check if this is a shutdown-related error and handle accordingly
if strings.Contains(err.Error(), "shutting down") {
Copilot AI Sep 9, 2025

Using string matching on error messages is fragile and error-prone. Consider defining a specific error type or using error wrapping with errors.Is() for more robust error handling.
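A sketch of the sentinel-error approach this comment suggests. db.ErrShuttingDown is a hypothetical exported error that the loader would wrap into its failures (e.g. fmt.Errorf("flush rejected: %w", db.ErrShuttingDown)), letting the sinker test for it with errors.Is instead of matching message text.

	// In package db (hypothetical):
	// var ErrShuttingDown = errors.New("loader is shutting down")

	if errors.Is(err, db.ErrShuttingDown) {
		s.logger.Info("final flush skipped because loader is shutting down", zap.Stringer("block", cursor.Block()))
		return nil
	}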
s.logger.Info("final flush skipped because loader is shutting down", zap.Stringer("block", cursor.Block()))
return nil
}
return fmt.Errorf("failed to flush %s block on completion: %w", cursor.Block(), err)
}

12 changes: 12 additions & 0 deletions sinker/stats.go
@@ -1,6 +1,7 @@
package sinker

import (
"sync"
"time"

"github.com/streamingfast/bstream"
@@ -13,6 +14,8 @@ import (
type Stats struct {
*shutter.Shutter

// mutex protects access to metrics that can be modified concurrently
mutex sync.Mutex
dbFlushRate *dmetrics.AvgRatePromCounter
dbFlushAvgDuration *dmetrics.AvgDurationCounter
flushedRows *dmetrics.ValueFromMetric
@@ -36,17 +39,23 @@ func NewStats(logger *zap.Logger) *Stats {
}

func (s *Stats) RecordBlock(block bstream.BlockRef) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.lastBlock = block
}

func (s *Stats) RecordFlushDuration(duration time.Duration) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.dbFlushAvgDuration.AddDuration(duration)
}

func (s *Stats) Start(each time.Duration, cursor *sink.Cursor) {
s.mutex.Lock()
if !cursor.IsBlank() {
s.lastBlock = cursor.Block()
}
s.mutex.Unlock()

if s.IsTerminating() || s.IsTerminated() {
panic("already shutdown, refusing to start again")
@@ -68,6 +77,9 @@ }
}

func (s *Stats) LogNow() {
s.mutex.Lock()
defer s.mutex.Unlock()

// Logging fields order is important as it affects the final rendering, we carefully ordered
// them so the development logs looks nicer.
s.logger.Info("postgres sink stats",