Skip to content

Commit

Permalink
Cache headers and logs using accepted depth instead of LRU (#1151)
Browse files Browse the repository at this point in the history
  • Loading branch information
darioush authored Dec 5, 2022
1 parent 37769e2 commit e665724
Show file tree
Hide file tree
Showing 15 changed files with 239 additions and 99 deletions.
80 changes: 64 additions & 16 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ type CacheConfig struct {
SnapshotVerify bool // Verify generated snapshots
SkipSnapshotRebuild bool // Whether to skip rebuilding the snapshot in favor of returning an error (only set to true for tests)
Preimages bool // Whether to store preimage of trie key to the disk
AcceptedCacheSize int // Depth of accepted headers cache and accepted logs cache at the accepted tip
TxLookupLimit uint64 // Number of recent blocks for which to maintain transaction lookup indices
}

Expand All @@ -169,6 +170,7 @@ var DefaultCacheConfig = &CacheConfig{
CommitInterval: 4096,
AcceptorQueueLimit: 64, // Provides 2 minutes of buffer (2s block target) for a commit delay
SnapshotLimit: 256,
AcceptedCacheSize: 32,
}

// BlockChain represents the canonical chain given a database with a genesis
Expand Down Expand Up @@ -269,6 +271,9 @@ type BlockChain struct {
// [flattenLock] prevents the [acceptor] from flattening snapshots while
// a block is being verified.
flattenLock sync.Mutex

// [acceptedLogsCache] stores recently accepted logs to improve the performance of eth_getLogs.
acceptedLogsCache FIFOCache[common.Hash, [][]*types.Log]
}

// NewBlockChain returns a fully initialised block chain using information
Expand Down Expand Up @@ -297,23 +302,24 @@ func NewBlockChain(
Preimages: cacheConfig.Preimages,
StatsPrefix: trieCleanCacheStatsNamespace,
}),
bodyCache: bodyCache,
receiptsCache: receiptsCache,
blockCache: blockCache,
txLookupCache: txLookupCache,
engine: engine,
vmConfig: vmConfig,
badBlocks: badBlocks,
senderCacher: newTxSenderCacher(runtime.NumCPU()),
acceptorQueue: make(chan *types.Block, cacheConfig.AcceptorQueueLimit),
quit: make(chan struct{}),
bodyCache: bodyCache,
receiptsCache: receiptsCache,
blockCache: blockCache,
txLookupCache: txLookupCache,
engine: engine,
vmConfig: vmConfig,
badBlocks: badBlocks,
senderCacher: newTxSenderCacher(runtime.NumCPU()),
acceptorQueue: make(chan *types.Block, cacheConfig.AcceptorQueueLimit),
quit: make(chan struct{}),
acceptedLogsCache: NewFIFOCache[common.Hash, [][]*types.Log](cacheConfig.AcceptedCacheSize),
}
bc.validator = NewBlockValidator(chainConfig, bc, engine)
bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine)
bc.processor = NewStateProcessor(chainConfig, bc, engine)

var err error
bc.hc, err = NewHeaderChain(db, chainConfig, engine)
bc.hc, err = NewHeaderChain(db, chainConfig, cacheConfig, engine)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -371,6 +377,9 @@ func NewBlockChain(
bc.initSnapshot(head)
}

// Warm up [hc.acceptedNumberCache] and [acceptedLogsCache]
bc.warmAcceptedCaches()

// Start processing accepted blocks effects in the background
go bc.startAcceptor()

Expand Down Expand Up @@ -499,6 +508,41 @@ func (bc *BlockChain) flattenSnapshot(postAbortWork func() error, hash common.Ha
return bc.snaps.Flatten(hash)
}

// warmAcceptedCaches fetches previously accepted headers and logs from disk to
// pre-populate [hc.acceptedNumberCache] and [acceptedLogsCache].
func (bc *BlockChain) warmAcceptedCaches() {
var (
startTime = time.Now()
lastAccepted = bc.LastAcceptedBlock().NumberU64()
startIndex = uint64(1)
targetCacheSize = uint64(bc.cacheConfig.AcceptedCacheSize)
)
if targetCacheSize == 0 {
log.Info("Not warming accepted cache because disabled")
return
}
if lastAccepted < startIndex {
// This could occur if we haven't accepted any blocks yet
log.Info("Not warming accepted cache because there are no accepted blocks")
return
}
cacheDiff := targetCacheSize - 1 // last accepted lookback is inclusive, so we reduce size by 1
if cacheDiff < lastAccepted {
startIndex = lastAccepted - cacheDiff
}
for i := startIndex; i <= lastAccepted; i++ {
header := bc.GetHeaderByNumber(i)
if header == nil {
// This could happen if a node state-synced
log.Info("Exiting accepted cache warming early because header is nil", "height", i, "t", time.Since(startTime))
break
}
bc.hc.acceptedNumberCache.Put(header.Number.Uint64(), header)
bc.acceptedLogsCache.Put(header.Hash(), rawdb.ReadLogs(bc.db, header.Hash(), header.Number.Uint64()))
}
log.Info("Warmed accepted caches", "start", startIndex, "end", lastAccepted, "t", time.Since(startTime))
}

// startAcceptor starts processing items on the [acceptorQueue]. If a [nil]
// object is placed on the [acceptorQueue], the [startAcceptor] will exit.
func (bc *BlockChain) startAcceptor() {
Expand All @@ -519,13 +563,16 @@ func (bc *BlockChain) startAcceptor() {
log.Crit("failed to write accepted block effects", "err", err)
}

// Fetch block logs
logs := bc.gatherBlockLogs(next.Hash(), next.NumberU64(), false)
// Ensure [hc.acceptedNumberCache] and [acceptedLogsCache] have latest content
bc.hc.acceptedNumberCache.Put(next.NumberU64(), next.Header())
logs := rawdb.ReadLogs(bc.db, next.Hash(), next.NumberU64())
bc.acceptedLogsCache.Put(next.Hash(), logs)

// Update accepted feeds
bc.chainAcceptedFeed.Send(ChainEvent{Block: next, Hash: next.Hash(), Logs: logs})
if len(logs) > 0 {
bc.logsAcceptedFeed.Send(logs)
flattenedLogs := types.FlattenLogs(logs)
bc.chainAcceptedFeed.Send(ChainEvent{Block: next, Hash: next.Hash(), Logs: flattenedLogs})
if len(flattenedLogs) > 0 {
bc.logsAcceptedFeed.Send(flattenedLogs)
}
if len(next.Transactions()) != 0 {
bc.txAcceptedFeed.Send(NewTxsEvent{next.Transactions()})
Expand Down Expand Up @@ -978,6 +1025,7 @@ func (bc *BlockChain) Accept(block *types.Block) error {
}
}

// Enqueue block in the acceptor
bc.lastAccepted = block
bc.addAcceptorQueue(block)
acceptedBlockGasUsedCounter.Inc(int64(block.GasUsed()))
Expand Down
9 changes: 9 additions & 0 deletions core/blockchain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,3 +334,12 @@ func (bc *BlockChain) SubscribeAcceptedLogsEvent(ch chan<- []*types.Log) event.S
func (bc *BlockChain) SubscribeAcceptedTransactionEvent(ch chan<- NewTxsEvent) event.Subscription {
return bc.scope.Track(bc.txAcceptedFeed.Subscribe(ch))
}

// GetLogs fetches all logs from a given block.
func (bc *BlockChain) GetLogs(hash common.Hash, number uint64) [][]*types.Log {
logs, ok := bc.acceptedLogsCache.Get(hash) // this cache is thread-safe
if ok {
return logs
}
return rawdb.ReadLogs(bc.db, hash, number)
}
56 changes: 33 additions & 23 deletions core/bounded_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,45 +3,55 @@

package core

import (
"github.com/ethereum/go-ethereum/common"
)

// BoundedBuffer keeps [size] common.Hash entries in a buffer and calls
// [callback] on any item that is evicted. This is typically used for
// BoundedBuffer keeps [size] entries of type [K] in a buffer and calls
// [callback] on any item that is overwritten. This is typically used for
// dereferencing old roots during block processing.
type BoundedBuffer struct {
//
// BoundedBuffer is not thread-safe and requires the caller synchronize usage.
type BoundedBuffer[K any] struct {
lastPos int
size int
callback func(common.Hash)
buffer []common.Hash
callback func(K)
buffer []K

cycled bool
}

// NewBoundedBuffer creates a new [BoundedBuffer].
func NewBoundedBuffer(size int, callback func(common.Hash)) *BoundedBuffer {
return &BoundedBuffer{
func NewBoundedBuffer[K any](size int, callback func(K)) *BoundedBuffer[K] {
return &BoundedBuffer[K]{
lastPos: -1,
size: size,
callback: callback,
buffer: make([]common.Hash, size),
buffer: make([]K, size),
}
}

// Insert adds a new common.Hash to the buffer. If the buffer is full, the
// oldest common.Hash will be evicted and [callback] will be invoked.
//
// WARNING: BoundedBuffer does not support the insertion of empty common.Hash.
// Inserting such data will cause unintended behavior.
func (b *BoundedBuffer) Insert(h common.Hash) {
nextPos := (b.lastPos + 1) % b.size // the first item added to the buffer will be at position 1
if b.buffer[nextPos] != (common.Hash{}) {
// Insert adds a new value to the buffer. If the buffer is full, the
// oldest value will be overwritten and [callback] will be invoked.
func (b *BoundedBuffer[K]) Insert(h K) {
nextPos := b.lastPos + 1 // the first item added to the buffer will be at position 0
if nextPos == b.size {
nextPos = 0
// Set [cycled] since we are back to the 0th element
b.cycled = true
}
if b.cycled {
// We ensure we have cycled through the buffer once before invoking the
// [callback] to ensure we don't call it with unset values.
b.callback(b.buffer[nextPos])
}
b.buffer[nextPos] = h
b.lastPos = nextPos
}

// Last retrieves the last item added to the buffer.
// If no items have been added to the buffer, Last returns an empty hash.
func (b *BoundedBuffer) Last() common.Hash {
return b.buffer[b.lastPos]
//
// If no items have been added to the buffer, Last returns the default value of
// [K] and [false].
func (b *BoundedBuffer[K]) Last() (K, bool) {
if b.lastPos == -1 {
return *new(K), false
}
return b.buffer[b.lastPos], true
}
70 changes: 70 additions & 0 deletions core/fifo_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// (c) 2021, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package core

import "sync"

var (
_ FIFOCache[int, int] = (*BufferFIFOCache[int, int])(nil)
_ FIFOCache[int, int] = (*NoOpFIFOCache[int, int])(nil)
)

// FIFOCache evicts the oldest element added to it after [limit] items are
// added.
type FIFOCache[K comparable, V any] interface {
Put(K, V)
Get(K) (V, bool)
}

// NewFIFOCache creates a new First-In-First-Out cache of size [limit].
//
// If a [limit] of 0 is passed as an argument, a no-op cache is returned that
// does nothing.
func NewFIFOCache[K comparable, V any](limit int) FIFOCache[K, V] {
if limit <= 0 {
return &NoOpFIFOCache[K, V]{}
}

c := &BufferFIFOCache[K, V]{
m: make(map[K]V, limit),
}
c.buffer = NewBoundedBuffer(limit, c.remove)
return c
}

type BufferFIFOCache[K comparable, V any] struct {
l sync.RWMutex

buffer *BoundedBuffer[K]
m map[K]V
}

func (f *BufferFIFOCache[K, V]) Put(key K, val V) {
f.l.Lock()
defer f.l.Unlock()

f.buffer.Insert(key) // Insert will remove the oldest [K] if we are at the [limit]
f.m[key] = val
}

func (f *BufferFIFOCache[K, V]) Get(key K) (V, bool) {
f.l.RLock()
defer f.l.RUnlock()

v, ok := f.m[key]
return v, ok
}

// remove is used as the callback in [BoundedBuffer]. It is assumed that the
// [WriteLock] is held when this is accessed.
func (f *BufferFIFOCache[K, V]) remove(key K) {
delete(f.m, key)
}

type NoOpFIFOCache[K comparable, V any] struct{}

func (f *NoOpFIFOCache[K, V]) Put(_ K, _ V) {}
func (f *NoOpFIFOCache[K, V]) Get(_ K) (V, bool) {
return *new(V), false
}
28 changes: 17 additions & 11 deletions core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,22 @@ type HeaderChain struct {
currentHeader atomic.Value // Current head of the header chain (may be above the block chain!)
currentHeaderHash common.Hash // Hash of the current head of the header chain (prevent recomputing all the time)

headerCache *lru.Cache // Cache for the most recent block headers
tdCache *lru.Cache // Cache for the most recent block total difficulties
numberCache *lru.Cache // Cache for the most recent block numbers
headerCache *lru.Cache // Cache for the most recent block headers
tdCache *lru.Cache // Cache for the most recent block total difficulties
numberCache *lru.Cache // Cache for the most recent block numbers
acceptedNumberCache FIFOCache[uint64, *types.Header] // Cache for most recent accepted heights to headers (only modified in accept)

rand *mrand.Rand
engine consensus.Engine
}

// NewHeaderChain creates a new HeaderChain structure. ProcInterrupt points
// to the parent's interrupt semaphore.
func NewHeaderChain(chainDb ethdb.Database, config *params.ChainConfig, engine consensus.Engine) (*HeaderChain, error) {
func NewHeaderChain(chainDb ethdb.Database, config *params.ChainConfig, cacheConfig *CacheConfig, engine consensus.Engine) (*HeaderChain, error) {
headerCache, _ := lru.New(headerCacheLimit)
tdCache, _ := lru.New(tdCacheLimit)
numberCache, _ := lru.New(numberCacheLimit)
acceptedNumberCache := NewFIFOCache[uint64, *types.Header](cacheConfig.AcceptedCacheSize)

// Seed a fast but crypto originating random generator
seed, err := crand.Int(crand.Reader, big.NewInt(math.MaxInt64))
Expand All @@ -92,13 +94,14 @@ func NewHeaderChain(chainDb ethdb.Database, config *params.ChainConfig, engine c
}

hc := &HeaderChain{
config: config,
chainDb: chainDb,
headerCache: headerCache,
tdCache: tdCache,
numberCache: numberCache,
rand: mrand.New(mrand.NewSource(seed.Int64())),
engine: engine,
config: config,
chainDb: chainDb,
headerCache: headerCache,
tdCache: tdCache,
numberCache: numberCache,
acceptedNumberCache: acceptedNumberCache,
rand: mrand.New(mrand.NewSource(seed.Int64())),
engine: engine,
}

hc.genesisHeader = hc.GetHeaderByNumber(0)
Expand Down Expand Up @@ -170,6 +173,9 @@ func (hc *HeaderChain) HasHeader(hash common.Hash, number uint64) bool {
// GetHeaderByNumber retrieves a block header from the database by number,
// caching it (associated with its hash) if found.
func (hc *HeaderChain) GetHeaderByNumber(number uint64) *types.Header {
if cachedHeader, ok := hc.acceptedNumberCache.Get(number); ok {
return cachedHeader
}
hash := rawdb.ReadCanonicalHash(hc.chainDb, number)
if hash == (common.Hash{}) {
return nil
Expand Down
6 changes: 3 additions & 3 deletions core/state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ type cappedMemoryTrieWriter struct {
imageCap common.StorageSize
commitInterval uint64

tipBuffer *BoundedBuffer
tipBuffer *BoundedBuffer[common.Hash]
}

func (cm *cappedMemoryTrieWriter) InsertTrie(block *types.Block) error {
Expand Down Expand Up @@ -192,8 +192,8 @@ func (cm *cappedMemoryTrieWriter) RejectTrie(block *types.Block) error {
func (cm *cappedMemoryTrieWriter) Shutdown() error {
// If [tipBuffer] entry is empty, no need to do any cleanup on
// shutdown.
last := cm.tipBuffer.Last()
if last == (common.Hash{}) {
last, exists := cm.tipBuffer.Last()
if !exists {
return nil
}

Expand Down
9 changes: 9 additions & 0 deletions core/types/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,12 @@ func (l *LogForStorage) DecodeRLP(s *rlp.Stream) error {
}
return err
}

// FlattenLogs converts a nested array of logs to a single array of logs.
func FlattenLogs(list [][]*Log) []*Log {
var flat []*Log
for _, logs := range list {
flat = append(flat, logs...)
}
return flat
}
Loading

0 comments on commit e665724

Please sign in to comment.