-
Notifications
You must be signed in to change notification settings - Fork 0
Fix async shutdown #14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -98,6 +98,7 @@ type Loader struct { | |
| cond *sync.Cond | ||
| activeFlushes int | ||
| maxParallelFlushes int | ||
| isShuttingDown bool | ||
|
|
||
| // onFlush is an optional observer called when a flush completes successfully. | ||
| // It receives the number of rows flushed and the total duration of the flush. | ||
|
|
@@ -247,19 +248,42 @@ func (l *Loader) WaitForAllFlushes() { | |
| l.cond.L.Lock() | ||
| defer l.cond.L.Unlock() | ||
|
|
||
| for l.activeFlushes > 0 { | ||
| l.cond.Wait() | ||
| // Set the shutdown flag to prevent new flushes while we're waiting | ||
| if !l.isShuttingDown { | ||
| l.logger.Info("marking loader as shutting down during wait for flushes") | ||
| l.isShuttingDown = true | ||
| } | ||
|
|
||
| if l.activeFlushes > 0 { | ||
| start := time.Now() | ||
| for l.activeFlushes > 0 { | ||
| l.logger.Info("waiting for in-flight async flushes to complete", | ||
| zap.Int("active_flushes", l.activeFlushes), | ||
| zap.Duration("waited_for", time.Since(start))) | ||
| l.cond.Wait() | ||
| } | ||
| l.logger.Info("all async flushes completed") | ||
| } | ||
| } | ||
|
|
||
| // FlushAsync triggers a non-blocking flush. Blocks if the maximum number of parallel flushes is reached until a flush is completed. | ||
| func (l *Loader) FlushAsync(ctx context.Context, outputModuleHash string, cursor *sink.Cursor, lastFinalBlock uint64) { | ||
| l.logger.Debug("async flush: starting flush", zap.Int("active_flushes", l.activeFlushes), zap.Uint64("last_final_block", lastFinalBlock)) | ||
| l.cond.L.Lock() | ||
| defer l.cond.L.Unlock() | ||
|
|
||
| for l.activeFlushes >= l.maxParallelFlushes { | ||
| l.logger.Debug("async flush: maximum number of parallel flushes reached, waiting for a flush to complete") | ||
| l.cond.Wait() | ||
| } | ||
|
|
||
| if l.isShuttingDown { | ||
| l.logger.Info("async flush: loader is shutting down after wait, skipping new flush", | ||
| zap.Uint64("block", lastFinalBlock), | ||
| zap.Int("active_flushes", l.activeFlushes)) | ||
| return | ||
| } | ||
|
|
||
| // Snapshot entries and replace with a fresh buffer | ||
| snapshot := l.entries | ||
| l.entries = NewOrderedMap[string, *OrderedMap[string, *Operation]]() | ||
|
|
@@ -268,18 +292,9 @@ func (l *Loader) FlushAsync(ctx context.Context, outputModuleHash string, cursor | |
| // Build a lightweight loader for flushing while still under lock to avoid racy reads of fields. | ||
| flushLoader := l.newFlushLoader(snapshot) | ||
|
|
||
| l.cond.L.Unlock() | ||
|
|
||
| l.logger.Debug("async flush started", zap.Int("active_flushes", l.activeFlushes)) | ||
|
|
||
| go func() { | ||
| // cleanup defer | ||
| defer func() { | ||
| l.cond.L.Lock() | ||
| l.activeFlushes-- | ||
| l.cond.Broadcast() | ||
| l.cond.L.Unlock() | ||
| }() | ||
|
|
||
| // Disallow cancellation of the context to prevent holes in the data with parallel flushes | ||
| if l.maxParallelFlushes > 1 { | ||
|
|
@@ -288,15 +303,28 @@ func (l *Loader) FlushAsync(ctx context.Context, outputModuleHash string, cursor | |
|
|
||
| start := time.Now() | ||
| rowFlushedCount, err := flushLoader.Flush(ctx, outputModuleHash, cursor, lastFinalBlock) | ||
| took := time.Since(start) | ||
|
|
||
| if err != nil { | ||
| l.logger.Warn("async flush failed after retries", zap.Error(err)) | ||
|
|
||
| l.cond.L.Lock() | ||
| l.isShuttingDown = true | ||
|
||
| l.activeFlushes-- | ||
| l.cond.Broadcast() | ||
| l.cond.L.Unlock() | ||
|
|
||
| if l.onFlushError != nil { | ||
| l.onFlushError(err) | ||
| } | ||
| return | ||
| } | ||
|
|
||
| took := time.Since(start) | ||
| l.cond.L.Lock() | ||
| l.activeFlushes-- | ||
| l.cond.Broadcast() | ||
| l.cond.L.Unlock() | ||
|
|
||
| l.logger.Debug("async flush complete", | ||
| zap.Int("row_count", rowFlushedCount), | ||
| zap.Duration("took", took)) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -241,9 +241,15 @@ func (s *SQLSinker) HandleBlockRangeCompletion(ctx context.Context, cursor *sink | |
| s.logger.Warn("completion flush skipped: context canceled, exiting without final flush", zap.Error(err)) | ||
| return nil | ||
| } | ||
|
|
||
| s.logger.Info("stream completed, flushing remaining entries to database", zap.Stringer("block", cursor.Block())) | ||
| _, err := s.loader.Flush(ctx, s.OutputModuleHash(), cursor, cursor.Block().Num()) | ||
| if err != nil { | ||
| // Check if this is a shutdown-related error and handle accordingly | ||
| if strings.Contains(err.Error(), "shutting down") { | ||
|
||
| s.logger.Info("final flush skipped because loader is shutting down", zap.Stringer("block", cursor.Block())) | ||
| return nil | ||
| } | ||
| return fmt.Errorf("failed to flush %s block on completion: %w", cursor.Block(), err) | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reading `l.activeFlushes` without holding the lock is a data race. Acquire `l.cond.L` before accessing any shared state — including values read only for logging, such as the `active_flushes` count in the debug log at the top of `FlushAsync`.