@@ -4,14 +4,10 @@ import (
44 "context"
55 "database/sql"
66 "fmt"
7- "strings"
87 "time"
98
10- "sync"
11-
129 "github.com/jimsmart/schema"
1310 "github.com/streamingfast/logging"
14- sink "github.com/streamingfast/substreams-sink"
1511 orderedmap "github.com/wk8/go-ordered-map/v2"
1612 "go.uber.org/zap"
1713 "go.uber.org/zap/zapcore"
@@ -26,43 +22,6 @@ type OrderedMap[K comparable, V any] struct {
2622 * orderedmap.OrderedMap [K , V ]
2723}
2824
29- // newFlushLoader creates a minimal Loader used exclusively to perform a flush
30- // using the provided entries snapshot. It intentionally does not copy
31- // synchronization primitives.
32- func (l * Loader ) newFlushLoader (snapshot * OrderedMap [string , * OrderedMap [string , * Operation ]]) * Loader {
33- return & Loader {
34- DB : l .DB ,
35- database : l .database ,
36- schema : l .schema ,
37- entries : snapshot ,
38- entriesCount : l .entriesCount ,
39- tables : l .tables ,
40- cursorTable : l .cursorTable ,
41- handleReorgs : l .handleReorgs ,
42- batchBlockFlushInterval : l .batchBlockFlushInterval ,
43- batchRowFlushInterval : l .batchRowFlushInterval ,
44- liveBlockFlushInterval : l .liveBlockFlushInterval ,
45- moduleMismatchMode : l .moduleMismatchMode ,
46- maxFlushRetries : l .maxFlushRetries ,
47- sleepBetweenFlushRetries : l .sleepBetweenFlushRetries ,
48- logger : l .logger ,
49- tracer : l .tracer ,
50- testTx : l .testTx ,
51- }
52- }
53-
54- // SetOnFlush sets an optional observer invoked on successful flush completion.
55- // The callback receives the number of rows flushed and the flush duration.
56- func (l * Loader ) SetOnFlush (cb func (rows int , dur time.Duration )) {
57- l .onFlush = cb
58- }
59-
60- // SetOnFlushError sets an optional observer invoked when an async flush fails.
61- // This enables the caller to handle shutdown or retries.
62- func (l * Loader ) SetOnFlushError (cb func (err error )) {
63- l .onFlushError = cb
64- }
65-
6625func NewOrderedMap [K comparable , V any ]() * OrderedMap [K , V ] {
6726 return & OrderedMap [K , V ]{OrderedMap : orderedmap .New [K , V ]()}
6827}
@@ -93,27 +52,13 @@ type Loader struct {
9352 tracer logging.Tracer
9453
9554 testTx * TestTx // used for testing: if non-nil, 'loader.BeginTx()' will return this object instead of a real *sql.Tx
96-
97- // async flush
98- cond * sync.Cond
99- activeFlushes int
100- maxParallelFlushes int
101-
102- // onFlush is an optional observer called when a flush completes successfully.
103- // It receives the number of rows flushed and the total duration of the flush.
104- onFlush func (rows int , dur time.Duration )
105-
106- // onFlushError is an optional observer invoked when an async flush fails.
107- // If set, it should trigger appropriate shutdown from the caller side.
108- onFlushError func (err error )
10955}
11056
11157func NewLoader (
11258 psqlDsn string ,
11359 batchBlockFlushInterval int ,
11460 batchRowFlushInterval int ,
11561 liveBlockFlushInterval int ,
116- maxParallelFlushes int ,
11762 moduleMismatchMode OnModuleHashMismatch ,
11863 handleReorgs * bool ,
11964 logger * zap.Logger ,
@@ -129,10 +74,6 @@ func NewLoader(
12974 return nil , fmt .Errorf ("open db connection: %w" , err )
13075 }
13176
132- if maxParallelFlushes < 1 {
133- maxParallelFlushes = 1
134- }
135-
13677 l := & Loader {
13778 DB : db ,
13879 database : dsn .database ,
@@ -147,9 +88,7 @@ func NewLoader(
14788 moduleMismatchMode : moduleMismatchMode ,
14889 logger : logger ,
14990 tracer : tracer ,
150- maxParallelFlushes : maxParallelFlushes ,
15191 }
152- l .cond = sync .NewCond (& sync.Mutex {})
15392 _ , err = l .tryDialect ()
15493 if err != nil {
15594 return nil , fmt .Errorf ("dialect not found: %s" , err )
@@ -170,7 +109,6 @@ func NewLoader(
170109 zap .Int ("batch_block_flush_interval" , batchBlockFlushInterval ),
171110 zap .Int ("batch_row_flush_interval" , batchRowFlushInterval ),
172111 zap .Int ("live_block_flush_interval" , liveBlockFlushInterval ),
173- zap .Int ("max_parallel_flushes" , maxParallelFlushes ),
174112 zap .String ("driver" , dsn .driver ),
175113 zap .String ("database" , dsn .database ),
176114 zap .String ("schema" , dsn .schema ),
@@ -212,28 +150,7 @@ func (l *Loader) LiveBlockFlushInterval() int {
212150 return l .liveBlockFlushInterval
213151}
214152
215- func (l * Loader ) GetPrimaryKey (tableName string , pk string ) (map [string ]string , error ) {
216- table , found := l .tables [tableName ]
217- if ! found {
218- return nil , fmt .Errorf ("unknown table %q" , tableName )
219- }
220- if len (table .primaryColumns ) == 1 {
221- return map [string ]string {table .primaryColumns [0 ].name : pk }, nil
222- }
223- parts := strings .Split (pk , "/" )
224- if len (parts ) != len (table .primaryColumns ) {
225- return nil , fmt .Errorf ("composite primary key value count mismatch for table %q: got %d parts, expected %d" , tableName , len (parts ), len (table .primaryColumns ))
226- }
227- res := make (map [string ]string , len (parts ))
228- for i , col := range table .primaryColumns {
229- res [col .name ] = parts [i ]
230- }
231- return res , nil
232- }
233-
234153func (l * Loader ) FlushNeeded () bool {
235- l .cond .L .Lock ()
236- defer l .cond .L .Unlock ()
237154 totalRows := 0
238155 // todo keep a running count when inserting/deleting rows directly
239156 for pair := l .entries .Oldest (); pair != nil ; pair = pair .Next () {
@@ -242,72 +159,6 @@ func (l *Loader) FlushNeeded() bool {
242159 return totalRows > l .batchRowFlushInterval
243160}
244161
245- // WaitForAllFlushes blocks until there are no in-flight async flushes.
246- func (l * Loader ) WaitForAllFlushes () {
247- l .cond .L .Lock ()
248- defer l .cond .L .Unlock ()
249-
250- for l .activeFlushes > 0 {
251- l .cond .Wait ()
252- }
253- }
254-
255- // FlushAsync triggers a non-blocking flush. Blocks if the maximum number of parallel flushes is reached until a flush is completed.
256- func (l * Loader ) FlushAsync (ctx context.Context , outputModuleHash string , cursor * sink.Cursor , lastFinalBlock uint64 ) {
257- l .logger .Debug ("async flush: starting flush" , zap .Int ("active_flushes" , l .activeFlushes ), zap .Uint64 ("last_final_block" , lastFinalBlock ))
258- l .cond .L .Lock ()
259- for l .activeFlushes >= l .maxParallelFlushes {
260- l .logger .Debug ("async flush: maximum number of parallel flushes reached, waiting for a flush to complete" )
261- l .cond .Wait ()
262- }
263- // Snapshot entries and replace with a fresh buffer
264- snapshot := l .entries
265- l .entries = NewOrderedMap [string , * OrderedMap [string , * Operation ]]()
266- l .activeFlushes ++
267-
268- // Build a lightweight loader for flushing while still under lock to avoid racy reads of fields.
269- flushLoader := l .newFlushLoader (snapshot )
270-
271- l .cond .L .Unlock ()
272-
273- l .logger .Debug ("async flush started" , zap .Int ("active_flushes" , l .activeFlushes ))
274-
275- go func () {
276- // cleanup defer
277- defer func () {
278- l .cond .L .Lock ()
279- l .activeFlushes --
280- l .cond .Broadcast ()
281- l .cond .L .Unlock ()
282- }()
283-
284- // Disallow cancellation of the context to prevent holes in the data with parallel flushes
285- if l .maxParallelFlushes > 1 {
286- ctx = context .WithoutCancel (ctx )
287- }
288-
289- start := time .Now ()
290- rowFlushedCount , err := flushLoader .Flush (ctx , outputModuleHash , cursor , lastFinalBlock )
291- if err != nil {
292- l .logger .Warn ("async flush failed after retries" , zap .Error (err ))
293- if l .onFlushError != nil {
294- l .onFlushError (err )
295- }
296- return
297- }
298-
299- took := time .Since (start )
300- l .logger .Debug ("async flush complete" ,
301- zap .Int ("row_count" , rowFlushedCount ),
302- zap .Duration ("took" , took ))
303-
304- // Notify observer outside the lock
305- if l .onFlush != nil {
306- l .onFlush (rowFlushedCount , took )
307- }
308- }()
309- }
310-
311162func (l * Loader ) LoadTables () error {
312163 schemaTables , err := schema .Tables (l .DB )
313164 if err != nil {
@@ -453,13 +304,6 @@ func (l *Loader) HasTable(tableName string) bool {
453304 return false
454305}
455306
456- // GetActiveFlushesNum returns the current number of in-flight async flushes.
457- func (l * Loader ) GetActiveFlushesNum () int {
458- l .cond .L .Lock ()
459- defer l .cond .L .Unlock ()
460- return l .activeFlushes
461- }
462-
463307func (l * Loader ) MarshalLogObject (encoder zapcore.ObjectEncoder ) error {
464308 encoder .AddUint64 ("entries_count" , l .entriesCount )
465309 return nil
0 commit comments