@@ -314,6 +314,8 @@ proc updateBase(c: ForkedChainRef, base: BlockRef): uint =
314314 # No update, return
315315 return
316316
317+ let startTime = Moment .now ()
318+
317319 # State root sanity check is performed to verify, before writing to disk,
318320 # that optimistically checked blocks indeed end up being stored with a
319321 # consistent state root.
@@ -338,8 +340,7 @@ with --debug-eager-state-root."""
338340
339341 # Cleanup in-memory blocks starting from base backward
340342 # e.g. B2 backward.
341- var
342- count = 0 'u
343+ var count = 0 'u
343344
344345 for it in ancestors (base.parent):
345346 c.removeBlockFromCache (it)
@@ -352,6 +353,33 @@ with --debug-eager-state-root."""
352353 # Base block always have finalized marker
353354 c.base.finalize ()
354355
356+ if c.dynamicBatchSize:
357+ # Dynamicly adjust the persistBatchSize based on the recorded run time.
358+ # The goal here is use the maximum batch size possible without blocking the
359+ # event loop for too long which could negatively impact the p2p networking.
360+ # Increasing the batch size can improve performance because the stateroot
361+ # computation and persist calls are performed less frequently.
362+ const
363+ targetTime = 500 .milliseconds
364+ targetTimeDelta = 200 .milliseconds
365+ targetTimeLowerBound = (targetTime - targetTimeDelta).milliseconds
366+ targetTimeUpperBound = (targetTime + targetTimeDelta).milliseconds
367+ batchSizeLowerBound = 4
368+ batchSizeUpperBound = 512
369+
370+ let
371+ finishTime = Moment .now ()
372+ runTime = (finishTime - startTime).milliseconds
373+
374+ if runTime < targetTimeLowerBound and c.persistBatchSize <= batchSizeUpperBound:
375+ c.persistBatchSize *= 2
376+ info " Increased persistBatchSize" , runTime, targetTime,
377+ persistBatchSize = c.persistBatchSize
378+ elif runTime > targetTimeUpperBound and c.persistBatchSize >= batchSizeLowerBound:
379+ c.persistBatchSize = c.persistBatchSize div 2
380+ info " Decreased persistBatchSize" , runTime, targetTime,
381+ persistBatchSize = c.persistBatchSize
382+
355383 count
356384
357385proc processUpdateBase (c: ForkedChainRef ): Future [Result [void , string ]] {.async : (raises: [CancelledError ]).} =
@@ -407,20 +435,20 @@ proc queueUpdateBase(c: ForkedChainRef, base: BlockRef)
407435 else :
408436 c.base
409437
410- if prevQueuedBase.number = = base.number:
438+ if prevQueuedBase.number > = base.number:
411439 return
412440
413441 var
414- number = base.number - min (base.number, PersistBatchSize )
415- steps = newSeqOfCap [BlockRef ]((base.number- prevQueuedBase.number) div PersistBatchSize + 1 )
442+ number = base.number - min (base.number, c.persistBatchSize )
443+ steps = newSeqOfCap [BlockRef ]((base.number - prevQueuedBase.number) div c.persistBatchSize + 1 )
416444 it = base
417445
418446 steps.add base
419447
420448 while it.number > prevQueuedBase.number:
421449 if it.number == number:
422450 steps.add it
423- number -= min (number, PersistBatchSize )
451+ number -= min (number, c.persistBatchSize )
424452 it = it.parent
425453
426454 for i in countdown (steps.len- 1 , 0 ):
@@ -501,8 +529,15 @@ proc validateBlock(c: ForkedChainRef,
501529 # Entering base auto forward mode while avoiding forkChoice
502530 # handled region(head - baseDistance)
503531 # e.g. live syncing with the tip very far from from our latest head
532+ let
533+ offset = c.baseDistance + c.persistBatchSize
534+ number =
535+ if offset >= c.latestFinalizedBlockNumber:
536+ 0 .uint64
537+ else :
538+ c.latestFinalizedBlockNumber - offset
504539 if c.pendingFCU != zeroHash32 and
505- c.base.number < c.latestFinalizedBlockNumber - c.baseDistance - c.persistBatchSize :
540+ c.base.number < number :
506541 let
507542 base = c.calculateNewBase (c.latestFinalizedBlockNumber, c.latest)
508543 prevBase = c.base.number
@@ -579,6 +614,7 @@ proc init*(
579614 com: CommonRef ;
580615 baseDistance = BaseDistance ;
581616 persistBatchSize = PersistBatchSize ;
617+ dynamicBatchSize = false ;
582618 eagerStateRoot = false ;
583619 enableQueue = false ;
584620 ): T =
@@ -594,6 +630,8 @@ proc init*(
594630 # # This constructor also works well when resuming import after running
595631 # # `persistentBlocks()` used for `Era1` or `Era` import.
596632 # #
633+ doAssert (persistBatchSize > 0 )
634+
597635 let
598636 baseTxFrame = com.db.baseTxFrame ()
599637 base = baseTxFrame.getSavedStateBlockNumber
@@ -610,19 +648,20 @@ proc init*(
610648 fcuSafe = baseTxFrame.fcuSafe ().valueOr:
611649 FcuHashAndNumber (hash: baseHash, number: base)
612650 fc = T (
613- com: com,
614- base: baseBlock,
615- latest: baseBlock,
616- heads: @ [baseBlock],
617- hashToBlock: {baseHash: baseBlock}.toTable,
618- baseTxFrame: baseTxFrame,
619- baseDistance: baseDistance,
620- persistBatchSize:persistBatchSize,
621- quarantine: Quarantine .init (),
622- fcuHead: fcuHead,
623- fcuSafe: fcuSafe,
624- baseQueue: initDeque [BlockRef ](),
625- lastBaseLogTime: EthTime .now (),
651+ com: com,
652+ base: baseBlock,
653+ latest: baseBlock,
654+ heads: @ [baseBlock],
655+ hashToBlock: {baseHash: baseBlock}.toTable,
656+ baseTxFrame: baseTxFrame,
657+ baseDistance: baseDistance,
658+ persistBatchSize: persistBatchSize,
659+ dynamicBatchSize: dynamicBatchSize,
660+ quarantine: Quarantine .init (),
661+ fcuHead: fcuHead,
662+ fcuSafe: fcuSafe,
663+ baseQueue: initDeque [BlockRef ](),
664+ lastBaseLogTime: EthTime .now (),
626665 )
627666
628667 # updateFinalized will stop ancestor lineage
@@ -712,10 +751,8 @@ proc forkChoice*(c: ForkedChainRef,
712751 c.updateHead (head)
713752 c.updateFinalized (finalized, head)
714753
715- let
716- base = c.calculateNewBase (finalized.number, head)
717-
718- if base.number == c.base.number:
754+ let base = c.calculateNewBase (finalized.number, head)
755+ if base.number <= c.base.number:
719756 # The base is not updated, return.
720757 return ok ()
721758
0 commit comments