Draft pull request, 23 commits:
e53b771  core: implemented witness state caching (pratikspatil024, Oct 10, 2025)
3d66730  core/state: added helper functions (pratikspatil024, Oct 10, 2025)
57ba58f  Merge branch 'develop' of https://github.com/maticnetwork/bor into ps… (pratikspatil024, Oct 13, 2025)
62bd494  core: updates in witness state caching (pratikspatil024, Oct 13, 2025)
d4707da  core: updated the witness state caching mechanism to now use 2 slidin… (pratikspatil024, Oct 23, 2025)
46139f2  core: added a way for full nodes to replicate the witness state cache (pratikspatil024, Oct 24, 2025)
7492ecc  Merge branch 'develop' of https://github.com/maticnetwork/bor into ps… (pratikspatil024, Oct 27, 2025)
c6f6ac8  Merge branch 'psp-pos-2955' of https://github.com/maticnetwork/bor in… (pratikspatil024, Oct 27, 2025)
464072b  eth/protocols/wit: added message type to get and send reduced witnesses (pratikspatil024, Oct 27, 2025)
4d42f38  eth, core: added logic for reduced witnesses (pratikspatil024, Oct 28, 2025)
211e129  core, eth: sending reduced witness (pratikspatil024, Oct 29, 2025)
3dfe7ad  eth: updated mock witness peer (pratikspatil024, Oct 29, 2025)
3683987  fix lint (pratikspatil024, Oct 29, 2025)
737856f  updated name reduced -> compact (pratikspatil024, Oct 30, 2025)
f05239e  core: made compactWitnessCacheWindowSize and compactWitnessCacheOverl… (pratikspatil024, Oct 30, 2025)
40a9051  core: added timer metric here to understand the performance impact of… (pratikspatil024, Oct 30, 2025)
bee6136  eth: added caching for compact witnesses (pratikspatil024, Oct 30, 2025)
ee7342f  core, eth: made parallel stateless import and compact witness (witnes… (pratikspatil024, Oct 31, 2025)
61b1a40  core: bug fix in witness caching - prevent cache bloat from recaching… (pratikspatil024, Oct 31, 2025)
eddf235  core, eth: bug fix - setting the correct start block accross the nodes (pratikspatil024, Oct 31, 2025)
d5a365d  Merge branch 'psp-pos-2955' of https://github.com/maticnetwork/bor in… (pratikspatil024, Oct 31, 2025)
f95b0d6  eth: use full witness if peer is wit/1 (pratikspatil024, Nov 3, 2025)
6127cb1  eth: bug fix: fixed compatibility between wit/1 and wit/2 caused by u… (pratikspatil024, Nov 3, 2025)
161 changes: 147 additions & 14 deletions core/blockchain.go
@@ -56,6 +56,7 @@
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie/trienode"
"github.com/ethereum/go-ethereum/triedb"
"github.com/ethereum/go-ethereum/triedb/hashdb"
"github.com/ethereum/go-ethereum/triedb/pathdb"
@@ -344,6 +345,10 @@
// future blocks are blocks added for later processing
futureBlocks *lru.Cache[common.Hash, *types.Block]

// Span state cache for reducing witness bandwidth
spanStateCache *lru.Cache[string, struct{}] // Cache state nodes within current span
currentcacheSpan uint64 // Track current span for boundary detection

wg sync.WaitGroup
quit chan struct{} // shutdown signal, closed in Stop.
stopping atomic.Bool // false if chain is running, true when stopped
@@ -405,20 +410,26 @@
log.Info("")

bc := &BlockChain{
chainConfig: chainConfig,
cfg: cfg,
db: db,
triedb: triedb,
triegc: prque.New[int64, common.Hash](nil),
quit: make(chan struct{}),
chainmu: syncx.NewClosableMutex(),
bodyCache: lru.NewCache[common.Hash, *types.Body](bodyCacheLimit),
bodyRLPCache: lru.NewCache[common.Hash, rlp.RawValue](bodyCacheLimit),
receiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit),
blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit),
txLookupCache: lru.NewCache[common.Hash, txLookup](txLookupCacheLimit),
futureBlocks: lru.NewCache[common.Hash, *types.Block](maxFutureBlocks),
engine: engine,
chainConfig: chainConfig,
cfg: cfg,
db: db,
triedb: triedb,
triegc: prque.New[int64, common.Hash](nil),
quit: make(chan struct{}),
chainmu: syncx.NewClosableMutex(),
bodyCache: lru.NewCache[common.Hash, *types.Body](bodyCacheLimit),
bodyRLPCache: lru.NewCache[common.Hash, rlp.RawValue](bodyCacheLimit),
receiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit),
blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit),
txLookupCache: lru.NewCache[common.Hash, txLookup](txLookupCacheLimit),
futureBlocks: lru.NewCache[common.Hash, *types.Block](maxFutureBlocks),
engine: engine,

// Initialize span state cache with sufficient capacity for one span
// Typical span has ~100K-500K unique state nodes
spanStateCache: lru.NewCache[string, struct{}](1000000),
currentcacheSpan: 0, // Will be set on first block processing

borReceiptsCache: lru.NewCache[common.Hash, *types.Receipt](receiptsCacheLimit),
borReceiptsRLPCache: lru.NewCache[common.Hash, rlp.RawValue](receiptsCacheLimit),
logger: cfg.VmConfig.Tracer,
@@ -3885,12 +3896,91 @@
return bc.scope.Track(bc.chain2HeadFeed.Subscribe(ch))
}

// mergeSpanCacheIntoWitness merges cached state nodes from the span cache into the witness.
// This allows reduced witnesses to work by adding previously cached state data.
func (bc *BlockChain) mergeSpanCacheIntoWitness(witness *stateless.Witness) int {
if bc.spanStateCache == nil || witness == nil {
return 0
}

mergedCount := 0
keys := bc.spanStateCache.Keys()

for _, stateNode := range keys {
// Add any cached node that is missing from the witness
if _, exists := witness.State[stateNode]; !exists {
witness.State[stateNode] = struct{}{}
mergedCount++
}
}

return mergedCount
}
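
For intuition, here is a sketch (hypothetical, not part of the diff) of the complementary sending side: because producer and receiver maintain the same span cache, a compact witness can omit exactly the nodes the receiver is guaranteed to restore via the merge above.

// Illustrative only: build the compact state set a sender could ship,
// assuming its spanStateCache mirrors the receiver's.
compact := make(map[string]struct{})
for node := range witness.State {
	if _, cached := bc.spanStateCache.Get(node); !cached {
		compact[node] = struct{}{} // only uncached nodes need to travel
	}
}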

const (
defaultSpanLength = 6400 // Default span length, i.e., the number of bor blocks in a span
zerothSpanEnd = 255 // End block of 0th span
)

func isSpanEnd(blockNum uint64) bool {
if blockNum > zerothSpanEnd {
return (blockNum-zerothSpanEnd)%defaultSpanLength == 0
}
return blockNum == zerothSpanEnd
}

(CI: the GitHub Actions lint job (ubuntu-22.04) fails on core/blockchain.go line 3927: "func isSpanEnd is unused".)

func isSpanStart(blockNum uint64) bool {
if blockNum > zerothSpanEnd {
return (blockNum-zerothSpanEnd-1)%defaultSpanLength == 0
}
return blockNum == 0
}

func getSpanNum(blockNum uint64) uint64 {
if blockNum > zerothSpanEnd {
return (blockNum - zerothSpanEnd) / defaultSpanLength
}
return 0
}
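
A quick numeric check of the helpers (hypothetical snippet, not part of the diff, assuming fmt is imported; values follow from defaultSpanLength = 6400 and zerothSpanEnd = 255):

fmt.Println(isSpanStart(0), isSpanEnd(255))    // true true: span 0 covers blocks 0..255
fmt.Println(isSpanStart(256), isSpanEnd(6655)) // true true: span 1 covers blocks 256..6655
fmt.Println(getSpanNum(256), getSpanNum(6655)) // 0 1: getSpanNum only ticks over at a
                                               // span's final block, so at a span's first
                                               // block it still reports the span that ended

One consequence worth noting: the spanNum logged at the boundary purge below is computed at a span's first block, so it names the span that just completed rather than the one being entered.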

// ProcessBlockWithWitnesses processes a block in stateless mode using the provided witnesses.
func (bc *BlockChain) ProcessBlockWithWitnesses(block *types.Block, witness *stateless.Witness) (*state.StateDB, *ProcessResult, error) {
if witness == nil {
return nil, nil, errors.New("nil witness")
}

blockNum := block.Number().Uint64()

// Span boundary detection and cache management using isSpanStart
if bc.spanStateCache != nil && isSpanStart(blockNum) {
oldCacheSize := bc.spanStateCache.Len()
bc.spanStateCache.Purge()

// Calculate span number for logging
spanNum := getSpanNum(blockNum)
bc.currentcacheSpan = spanNum

log.Info("Span boundary: flushed state cache",
"block", blockNum,
"spanNum", spanNum,
"cachedStates", oldCacheSize)
}

// Merge cached states into the witness for blocks that aren't the first in a span
if !isSpanStart(blockNum) {
witnessStatesBefore := len(witness.State)
mergedCount := bc.mergeSpanCacheIntoWitness(witness)
if mergedCount > 0 {
log.Debug("Merged cached states into witness",
"block", blockNum,
"witnessStatesBefore", witnessStatesBefore,
"mergedFromCache", mergedCount,
"witnessStatesAfter", len(witness.State))
}
}

// Validate witness.
Review thread on this line:

pratikspatil024 (author): I think we should move this before the merge (cache -> witness.state). Let me know if you think the same. @cffls @lucca30

Contributor: Not sure if I understood the reason behind it.

pratikspatil024 (author): We validate the witness before the execution. Now that we have the cache, I merge the cache into the witness, so my question was whether to do this witness validation before the merge or leave it after the merge.
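
A minimal sketch of the alternative ordering raised in this thread (hypothetical; the diff as written merges first and validates after):

// Hypothetical reordering: validate the witness exactly as received from
// the peer, then augment it from the span cache before execution.
if err := stateless.ValidateWitnessPreState(witness, bc); err != nil {
	return nil, nil, err
}
if !isSpanStart(blockNum) {
	bc.mergeSpanCacheIntoWitness(witness)
}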

if err := stateless.ValidateWitnessPreState(witness, bc); err != nil {
log.Error("Witness validation failed during stateless processing", "blockNumber", block.Number(), "blockHash", block.Hash(), "err", err)
@@ -3925,6 +4015,49 @@
err = fmt.Errorf("stateless self-validation receipt root mismatch: remote %x != local %x", block.ReceiptHash(), crossReceiptRoot)
return nil, nil, err
}

// Cache witness states and updated states for subsequent blocks in this span
if bc.spanStateCache != nil {
cachedWitnessStates := 0
cachedUpdatedStates := 0

// Cache all witness state nodes
for stateNode := range witness.State {
if _, exists := bc.spanStateCache.Get(stateNode); !exists {
bc.spanStateCache.Add(stateNode, struct{}{})
cachedWitnessStates++
}
}

// Extract and cache updated states from execution
if statedb != nil {
_, stateUpdate, _ := statedb.CommitAndReturnStateUpdate(blockNum, bc.chainConfig.IsEIP158(block.Number()))
if stateUpdate != nil && stateUpdate.Nodes != nil {
nodes := stateUpdate.Nodes

// Iterate through all node sets
for owner := range nodes.Sets {
subset := nodes.Sets[owner]
subset.ForEachWithOrder(func(path string, n *trienode.Node) {
if !n.IsDeleted() && n.Blob != nil {
stateNode := string(n.Blob)
if _, exists := bc.spanStateCache.Get(stateNode); !exists {
bc.spanStateCache.Add(stateNode, struct{}{})
cachedUpdatedStates++
}
}
})
}
}
}

log.Debug("Cached states for span",
"block", blockNum,
"cachedWitnessStates", cachedWitnessStates,
"cachedUpdatedStates", cachedUpdatedStates,
"totalCacheSize", bc.spanStateCache.Len())
}

return statedb, res, nil
}

10 changes: 9 additions & 1 deletion core/state/statedb.go
@@ -1792,7 +1792,7 @@ func (s *StateDB) commitAndFlush(block uint64, deleteEmptyObjects bool, noStorag
// If trie database is enabled, commit the state update as a new layer
if db := s.db.TrieDB(); db != nil {
start := time.Now()
if err := db.Update(ret.root, ret.originRoot, block, ret.nodes, ret.stateSet()); err != nil {
if err := db.Update(ret.root, ret.originRoot, block, ret.Nodes, ret.stateSet()); err != nil {
return nil, err
}
s.TrieDBCommits += time.Since(start)
@@ -1802,6 +1802,14 @@
return ret, err
}

// CommitAndReturnStateUpdate commits the state mutations the same way Commit
// does, but also returns the resulting stateUpdate so callers can inspect the
// aggregated dirty trie nodes.
func (s *StateDB) CommitAndReturnStateUpdate(block uint64, deleteEmptyObjects bool) (common.Hash, *stateUpdate, error) {
ret, err := s.commitAndFlush(block, deleteEmptyObjects, true)
if err != nil {
return common.Hash{}, nil, err
}
return ret.root, ret, nil
}
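
For context, a hypothetical caller (mirroring the blockchain.go hunk above) commits and then walks the returned update's node sets; exposing those sets is why the stateupdate.go diff below renames nodes to Nodes:

root, update, err := statedb.CommitAndReturnStateUpdate(blockNum, deleteEmptyObjects)
if err != nil {
	return err
}
_ = root // new state root after the commit
if update != nil && update.Nodes != nil {
	for owner := range update.Nodes.Sets {
		_ = owner // each set holds one trie's aggregated dirty nodes
	}
}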

// Commit writes the state mutations into the configured data stores.
//
// Once the state is committed, tries cached in stateDB (including account
4 changes: 2 additions & 2 deletions core/state/stateupdate.go
@@ -81,7 +81,7 @@ type stateUpdate struct {
rawStorageKey bool

codes map[common.Address]contractCode // codes contains the set of dirty codes
nodes *trienode.MergedNodeSet // Aggregated dirty nodes caused by state changes
Nodes *trienode.MergedNodeSet // Aggregated dirty nodes caused by state changes
}

// empty returns a flag indicating the state transition is empty or not.
@@ -170,7 +170,7 @@ func newStateUpdate(rawStorageKey bool, originRoot common.Hash, root common.Hash
storagesOrigin: storagesOrigin,
rawStorageKey: rawStorageKey,
codes: codes,
nodes: nodes,
Nodes: nodes,
}
}
