diff --git a/core/blockchain.go b/core/blockchain.go index 6fee535c9b..7e0449ff9a 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -158,6 +158,7 @@ type CacheConfig struct { SnapshotVerify bool // Verify generated snapshots SkipSnapshotRebuild bool // Whether to skip rebuilding the snapshot in favor of returning an error (only set to true for tests) Preimages bool // Whether to store preimage of trie key to the disk + AcceptedCacheSize int // Depth of accepted headers cache and accepted logs cache at the accepted tip TxLookupLimit uint64 // Number of recent blocks for which to maintain transaction lookup indices } @@ -169,6 +170,7 @@ var DefaultCacheConfig = &CacheConfig{ CommitInterval: 4096, AcceptorQueueLimit: 64, // Provides 2 minutes of buffer (2s block target) for a commit delay SnapshotLimit: 256, + AcceptedCacheSize: 32, } // BlockChain represents the canonical chain given a database with a genesis @@ -269,6 +271,9 @@ type BlockChain struct { // [flattenLock] prevents the [acceptor] from flattening snapshots while // a block is being verified. flattenLock sync.Mutex + + // [acceptedLogsCache] stores recently accepted logs to improve the performance of eth_getLogs. + acceptedLogsCache FIFOCache[common.Hash, [][]*types.Log] } // NewBlockChain returns a fully initialised block chain using information @@ -297,23 +302,24 @@ func NewBlockChain( Preimages: cacheConfig.Preimages, StatsPrefix: trieCleanCacheStatsNamespace, }), - bodyCache: bodyCache, - receiptsCache: receiptsCache, - blockCache: blockCache, - txLookupCache: txLookupCache, - engine: engine, - vmConfig: vmConfig, - badBlocks: badBlocks, - senderCacher: newTxSenderCacher(runtime.NumCPU()), - acceptorQueue: make(chan *types.Block, cacheConfig.AcceptorQueueLimit), - quit: make(chan struct{}), + bodyCache: bodyCache, + receiptsCache: receiptsCache, + blockCache: blockCache, + txLookupCache: txLookupCache, + engine: engine, + vmConfig: vmConfig, + badBlocks: badBlocks, + senderCacher: newTxSenderCacher(runtime.NumCPU()), + acceptorQueue: make(chan *types.Block, cacheConfig.AcceptorQueueLimit), + quit: make(chan struct{}), + acceptedLogsCache: NewFIFOCache[common.Hash, [][]*types.Log](cacheConfig.AcceptedCacheSize), } bc.validator = NewBlockValidator(chainConfig, bc, engine) bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine) bc.processor = NewStateProcessor(chainConfig, bc, engine) var err error - bc.hc, err = NewHeaderChain(db, chainConfig, engine) + bc.hc, err = NewHeaderChain(db, chainConfig, cacheConfig, engine) if err != nil { return nil, err } @@ -371,6 +377,9 @@ func NewBlockChain( bc.initSnapshot(head) } + // Warm up [hc.acceptedNumberCache] and [acceptedLogsCache] + bc.warmAcceptedCaches() + // Start processing accepted blocks effects in the background go bc.startAcceptor() @@ -499,6 +508,41 @@ func (bc *BlockChain) flattenSnapshot(postAbortWork func() error, hash common.Ha return bc.snaps.Flatten(hash) } +// warmAcceptedCaches fetches previously accepted headers and logs from disk to +// pre-populate [hc.acceptedNumberCache] and [acceptedLogsCache]. +func (bc *BlockChain) warmAcceptedCaches() { + var ( + startTime = time.Now() + lastAccepted = bc.LastAcceptedBlock().NumberU64() + startIndex = uint64(1) + targetCacheSize = uint64(bc.cacheConfig.AcceptedCacheSize) + ) + if targetCacheSize == 0 { + log.Info("Not warming accepted cache because disabled") + return + } + if lastAccepted < startIndex { + // This could occur if we haven't accepted any blocks yet + log.Info("Not warming accepted cache because there are no accepted blocks") + return + } + cacheDiff := targetCacheSize - 1 // last accepted lookback is inclusive, so we reduce size by 1 + if cacheDiff < lastAccepted { + startIndex = lastAccepted - cacheDiff + } + for i := startIndex; i <= lastAccepted; i++ { + header := bc.GetHeaderByNumber(i) + if header == nil { + // This could happen if a node state-synced + log.Info("Exiting accepted cache warming early because header is nil", "height", i, "t", time.Since(startTime)) + break + } + bc.hc.acceptedNumberCache.Put(header.Number.Uint64(), header) + bc.acceptedLogsCache.Put(header.Hash(), rawdb.ReadLogs(bc.db, header.Hash(), header.Number.Uint64())) + } + log.Info("Warmed accepted caches", "start", startIndex, "end", lastAccepted, "t", time.Since(startTime)) +} + // startAcceptor starts processing items on the [acceptorQueue]. If a [nil] // object is placed on the [acceptorQueue], the [startAcceptor] will exit. func (bc *BlockChain) startAcceptor() { @@ -519,13 +563,16 @@ func (bc *BlockChain) startAcceptor() { log.Crit("failed to write accepted block effects", "err", err) } - // Fetch block logs - logs := bc.gatherBlockLogs(next.Hash(), next.NumberU64(), false) + // Ensure [hc.acceptedNumberCache] and [acceptedLogsCache] have latest content + bc.hc.acceptedNumberCache.Put(next.NumberU64(), next.Header()) + logs := rawdb.ReadLogs(bc.db, next.Hash(), next.NumberU64()) + bc.acceptedLogsCache.Put(next.Hash(), logs) // Update accepted feeds - bc.chainAcceptedFeed.Send(ChainEvent{Block: next, Hash: next.Hash(), Logs: logs}) - if len(logs) > 0 { - bc.logsAcceptedFeed.Send(logs) + flattenedLogs := types.FlattenLogs(logs) + bc.chainAcceptedFeed.Send(ChainEvent{Block: next, Hash: next.Hash(), Logs: flattenedLogs}) + if len(flattenedLogs) > 0 { + bc.logsAcceptedFeed.Send(flattenedLogs) } if len(next.Transactions()) != 0 { bc.txAcceptedFeed.Send(NewTxsEvent{next.Transactions()}) @@ -978,6 +1025,7 @@ func (bc *BlockChain) Accept(block *types.Block) error { } } + // Enqueue block in the acceptor bc.lastAccepted = block bc.addAcceptorQueue(block) acceptedBlockGasUsedCounter.Inc(int64(block.GasUsed())) diff --git a/core/blockchain_reader.go b/core/blockchain_reader.go index e812fdd01e..961636949c 100644 --- a/core/blockchain_reader.go +++ b/core/blockchain_reader.go @@ -334,3 +334,12 @@ func (bc *BlockChain) SubscribeAcceptedLogsEvent(ch chan<- []*types.Log) event.S func (bc *BlockChain) SubscribeAcceptedTransactionEvent(ch chan<- NewTxsEvent) event.Subscription { return bc.scope.Track(bc.txAcceptedFeed.Subscribe(ch)) } + +// GetLogs fetches all logs from a given block. +func (bc *BlockChain) GetLogs(hash common.Hash, number uint64) [][]*types.Log { + logs, ok := bc.acceptedLogsCache.Get(hash) // this cache is thread-safe + if ok { + return logs + } + return rawdb.ReadLogs(bc.db, hash, number) +} diff --git a/core/bounded_buffer.go b/core/bounded_buffer.go index c99042fa3e..b6170682d9 100644 --- a/core/bounded_buffer.go +++ b/core/bounded_buffer.go @@ -3,37 +3,42 @@ package core -import ( - "github.com/ethereum/go-ethereum/common" -) - -// BoundedBuffer keeps [size] common.Hash entries in a buffer and calls -// [callback] on any item that is evicted. This is typically used for +// BoundedBuffer keeps [size] entries of type [K] in a buffer and calls +// [callback] on any item that is overwritten. This is typically used for // dereferencing old roots during block processing. -type BoundedBuffer struct { +// +// BoundedBuffer is not thread-safe and requires the caller synchronize usage. +type BoundedBuffer[K any] struct { lastPos int size int - callback func(common.Hash) - buffer []common.Hash + callback func(K) + buffer []K + + cycled bool } // NewBoundedBuffer creates a new [BoundedBuffer]. -func NewBoundedBuffer(size int, callback func(common.Hash)) *BoundedBuffer { - return &BoundedBuffer{ +func NewBoundedBuffer[K any](size int, callback func(K)) *BoundedBuffer[K] { + return &BoundedBuffer[K]{ + lastPos: -1, size: size, callback: callback, - buffer: make([]common.Hash, size), + buffer: make([]K, size), } } -// Insert adds a new common.Hash to the buffer. If the buffer is full, the -// oldest common.Hash will be evicted and [callback] will be invoked. -// -// WARNING: BoundedBuffer does not support the insertion of empty common.Hash. -// Inserting such data will cause unintended behavior. -func (b *BoundedBuffer) Insert(h common.Hash) { - nextPos := (b.lastPos + 1) % b.size // the first item added to the buffer will be at position 1 - if b.buffer[nextPos] != (common.Hash{}) { +// Insert adds a new value to the buffer. If the buffer is full, the +// oldest value will be overwritten and [callback] will be invoked. +func (b *BoundedBuffer[K]) Insert(h K) { + nextPos := b.lastPos + 1 // the first item added to the buffer will be at position 0 + if nextPos == b.size { + nextPos = 0 + // Set [cycled] since we are back to the 0th element + b.cycled = true + } + if b.cycled { + // We ensure we have cycled through the buffer once before invoking the + // [callback] to ensure we don't call it with unset values. b.callback(b.buffer[nextPos]) } b.buffer[nextPos] = h @@ -41,7 +46,12 @@ func (b *BoundedBuffer) Insert(h common.Hash) { } // Last retrieves the last item added to the buffer. -// If no items have been added to the buffer, Last returns an empty hash. -func (b *BoundedBuffer) Last() common.Hash { - return b.buffer[b.lastPos] +// +// If no items have been added to the buffer, Last returns the default value of +// [K] and [false]. +func (b *BoundedBuffer[K]) Last() (K, bool) { + if b.lastPos == -1 { + return *new(K), false + } + return b.buffer[b.lastPos], true } diff --git a/core/fifo_cache.go b/core/fifo_cache.go new file mode 100644 index 0000000000..c941382f95 --- /dev/null +++ b/core/fifo_cache.go @@ -0,0 +1,70 @@ +// (c) 2021, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package core + +import "sync" + +var ( + _ FIFOCache[int, int] = (*BufferFIFOCache[int, int])(nil) + _ FIFOCache[int, int] = (*NoOpFIFOCache[int, int])(nil) +) + +// FIFOCache evicts the oldest element added to it after [limit] items are +// added. +type FIFOCache[K comparable, V any] interface { + Put(K, V) + Get(K) (V, bool) +} + +// NewFIFOCache creates a new First-In-First-Out cache of size [limit]. +// +// If a [limit] of 0 is passed as an argument, a no-op cache is returned that +// does nothing. +func NewFIFOCache[K comparable, V any](limit int) FIFOCache[K, V] { + if limit <= 0 { + return &NoOpFIFOCache[K, V]{} + } + + c := &BufferFIFOCache[K, V]{ + m: make(map[K]V, limit), + } + c.buffer = NewBoundedBuffer(limit, c.remove) + return c +} + +type BufferFIFOCache[K comparable, V any] struct { + l sync.RWMutex + + buffer *BoundedBuffer[K] + m map[K]V +} + +func (f *BufferFIFOCache[K, V]) Put(key K, val V) { + f.l.Lock() + defer f.l.Unlock() + + f.buffer.Insert(key) // Insert will remove the oldest [K] if we are at the [limit] + f.m[key] = val +} + +func (f *BufferFIFOCache[K, V]) Get(key K) (V, bool) { + f.l.RLock() + defer f.l.RUnlock() + + v, ok := f.m[key] + return v, ok +} + +// remove is used as the callback in [BoundedBuffer]. It is assumed that the +// [WriteLock] is held when this is accessed. +func (f *BufferFIFOCache[K, V]) remove(key K) { + delete(f.m, key) +} + +type NoOpFIFOCache[K comparable, V any] struct{} + +func (f *NoOpFIFOCache[K, V]) Put(_ K, _ V) {} +func (f *NoOpFIFOCache[K, V]) Get(_ K) (V, bool) { + return *new(V), false +} diff --git a/core/headerchain.go b/core/headerchain.go index 94b34058a3..b782959a2e 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -70,9 +70,10 @@ type HeaderChain struct { currentHeader atomic.Value // Current head of the header chain (may be above the block chain!) currentHeaderHash common.Hash // Hash of the current head of the header chain (prevent recomputing all the time) - headerCache *lru.Cache // Cache for the most recent block headers - tdCache *lru.Cache // Cache for the most recent block total difficulties - numberCache *lru.Cache // Cache for the most recent block numbers + headerCache *lru.Cache // Cache for the most recent block headers + tdCache *lru.Cache // Cache for the most recent block total difficulties + numberCache *lru.Cache // Cache for the most recent block numbers + acceptedNumberCache FIFOCache[uint64, *types.Header] // Cache for most recent accepted heights to headers (only modified in accept) rand *mrand.Rand engine consensus.Engine @@ -80,10 +81,11 @@ type HeaderChain struct { // NewHeaderChain creates a new HeaderChain structure. ProcInterrupt points // to the parent's interrupt semaphore. -func NewHeaderChain(chainDb ethdb.Database, config *params.ChainConfig, engine consensus.Engine) (*HeaderChain, error) { +func NewHeaderChain(chainDb ethdb.Database, config *params.ChainConfig, cacheConfig *CacheConfig, engine consensus.Engine) (*HeaderChain, error) { headerCache, _ := lru.New(headerCacheLimit) tdCache, _ := lru.New(tdCacheLimit) numberCache, _ := lru.New(numberCacheLimit) + acceptedNumberCache := NewFIFOCache[uint64, *types.Header](cacheConfig.AcceptedCacheSize) // Seed a fast but crypto originating random generator seed, err := crand.Int(crand.Reader, big.NewInt(math.MaxInt64)) @@ -92,13 +94,14 @@ func NewHeaderChain(chainDb ethdb.Database, config *params.ChainConfig, engine c } hc := &HeaderChain{ - config: config, - chainDb: chainDb, - headerCache: headerCache, - tdCache: tdCache, - numberCache: numberCache, - rand: mrand.New(mrand.NewSource(seed.Int64())), - engine: engine, + config: config, + chainDb: chainDb, + headerCache: headerCache, + tdCache: tdCache, + numberCache: numberCache, + acceptedNumberCache: acceptedNumberCache, + rand: mrand.New(mrand.NewSource(seed.Int64())), + engine: engine, } hc.genesisHeader = hc.GetHeaderByNumber(0) @@ -170,6 +173,9 @@ func (hc *HeaderChain) HasHeader(hash common.Hash, number uint64) bool { // GetHeaderByNumber retrieves a block header from the database by number, // caching it (associated with its hash) if found. func (hc *HeaderChain) GetHeaderByNumber(number uint64) *types.Header { + if cachedHeader, ok := hc.acceptedNumberCache.Get(number); ok { + return cachedHeader + } hash := rawdb.ReadCanonicalHash(hc.chainDb, number) if hash == (common.Hash{}) { return nil diff --git a/core/state_manager.go b/core/state_manager.go index db0a7ba3f7..02521aa5f9 100644 --- a/core/state_manager.go +++ b/core/state_manager.go @@ -121,7 +121,7 @@ type cappedMemoryTrieWriter struct { imageCap common.StorageSize commitInterval uint64 - tipBuffer *BoundedBuffer + tipBuffer *BoundedBuffer[common.Hash] } func (cm *cappedMemoryTrieWriter) InsertTrie(block *types.Block) error { @@ -192,8 +192,8 @@ func (cm *cappedMemoryTrieWriter) RejectTrie(block *types.Block) error { func (cm *cappedMemoryTrieWriter) Shutdown() error { // If [tipBuffer] entry is empty, no need to do any cleanup on // shutdown. - last := cm.tipBuffer.Last() - if last == (common.Hash{}) { + last, exists := cm.tipBuffer.Last() + if !exists { return nil } diff --git a/core/types/log.go b/core/types/log.go index 8c429e9c3a..131ef8599a 100644 --- a/core/types/log.go +++ b/core/types/log.go @@ -148,3 +148,12 @@ func (l *LogForStorage) DecodeRLP(s *rlp.Stream) error { } return err } + +// FlattenLogs converts a nested array of logs to a single array of logs. +func FlattenLogs(list [][]*Log) []*Log { + var flat []*Log + for _, logs := range list { + flat = append(flat, logs...) + } + return flat +} diff --git a/eth/api_backend.go b/eth/api_backend.go index 14e1bf1d53..a41930d168 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -247,7 +247,10 @@ func (b *EthAPIBackend) GetReceipts(ctx context.Context, hash common.Hash) (type } func (b *EthAPIBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) { - return rawdb.ReadLogs(b.eth.chainDb, hash, number), nil + if err := ctx.Err(); err != nil { + return nil, err + } + return b.eth.blockchain.GetLogs(hash, number), nil } func (b *EthAPIBackend) GetEVM(ctx context.Context, msg core.Message, state *state.StateDB, header *types.Header, vmConfig *vm.Config) (*vm.EVM, func() error, error) { diff --git a/eth/backend.go b/eth/backend.go index 86eb9016bb..e940616383 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -213,6 +213,7 @@ func New( SnapshotVerify: config.SnapshotVerify, SkipSnapshotRebuild: config.SkipSnapshotRebuild, Preimages: config.Preimages, + AcceptedCacheSize: config.AcceptedCacheSize, TxLookupLimit: config.TxLookupLimit, } ) @@ -281,10 +282,8 @@ func (s *Ethereum) APIs() []rpc.API { apis = append(apis, s.stackRPCs...) // Create [filterSystem] with the log cache size set in the config. - ethcfg := s.APIBackend.eth.config filterSystem := filters.NewFilterSystem(s.APIBackend, filters.Config{ - LogCacheSize: ethcfg.FilterLogCacheSize, - Timeout: 5 * time.Minute, + Timeout: 5 * time.Minute, }) // Append all the local APIs and return diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 131ca7faf0..9733d59769 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -57,7 +57,7 @@ func NewDefaultConfig() Config { TrieDirtyCache: 256, TrieDirtyCommitTarget: 20, SnapshotCache: 256, - FilterLogCacheSize: 32, + AcceptedCacheSize: 32, Miner: miner.Config{}, TxPool: core.DefaultTxPoolConfig, RPCGasCap: 25000000, @@ -101,8 +101,9 @@ type Config struct { SnapshotCache int Preimages bool - // This is the number of blocks for which logs will be cached in the filter system. - FilterLogCacheSize int + // AcceptedCacheSize is the depth of accepted headers cache and accepted + // logs cache at the accepted tip. + AcceptedCacheSize int // Mining options Miner miner.Config diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 563cfcbfb3..17c7b10426 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -264,11 +264,11 @@ func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, e func (f *Filter) blockLogs(ctx context.Context, header *types.Header, skipBloom bool) ([]*types.Log, error) { // Fast track: no filtering criteria if len(f.addresses) == 0 && len(f.topics) == 0 { - list, err := f.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64()) + list, err := f.sys.getLogs(ctx, header.Hash(), header.Number.Uint64()) if err != nil { return nil, err } - return flatten(list), nil + return types.FlattenLogs(list), nil } else if skipBloom || bloomFilter(header.Bloom, f.addresses, f.topics) { return f.checkMatches(ctx, header) } @@ -278,12 +278,12 @@ func (f *Filter) blockLogs(ctx context.Context, header *types.Header, skipBloom // checkMatches checks if the receipts belonging to the given header contain any log events that // match the filter criteria. This function is called when the bloom filter signals a potential match. func (f *Filter) checkMatches(ctx context.Context, header *types.Header) ([]*types.Log, error) { - logsList, err := f.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64()) + logsList, err := f.sys.getLogs(ctx, header.Hash(), header.Number.Uint64()) if err != nil { return nil, err } - unfiltered := flatten(logsList) + unfiltered := types.FlattenLogs(logsList) logs := filterLogs(unfiltered, nil, nil, f.addresses, f.topics) if len(logs) > 0 { // We have matching logs, check if we need to resolve full logs via the light client @@ -377,11 +377,3 @@ func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]commo } return true } - -func flatten(list [][]*types.Log) []*types.Log { - var flat []*types.Log - for _, logs := range list { - flat = append(flat, logs...) - } - return flat -} diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 5019119803..1b5e33fcf3 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -45,22 +45,17 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" - lru "github.com/hashicorp/golang-lru" ) // Config represents the configuration of the filter system. type Config struct { - LogCacheSize int // maximum number of cached blocks (default: 32) - Timeout time.Duration // how long filters stay active (default: 5min) + Timeout time.Duration // how long filters stay active (default: 5min) } func (cfg Config) withDefaults() Config { if cfg.Timeout == 0 { cfg.Timeout = 5 * time.Minute } - if cfg.LogCacheSize == 0 { - cfg.LogCacheSize = 32 - } return cfg } @@ -93,33 +88,22 @@ type Backend interface { // FilterSystem holds resources shared by all filters. type FilterSystem struct { - backend Backend - logsCache *lru.Cache - cfg *Config + backend Backend + cfg *Config } // NewFilterSystem creates a filter system. func NewFilterSystem(backend Backend, config Config) *FilterSystem { config = config.withDefaults() - - cache, err := lru.New(config.LogCacheSize) - if err != nil { - panic(err) - } return &FilterSystem{ - backend: backend, - logsCache: cache, - cfg: &config, + backend: backend, + cfg: &config, } } -// cachedGetLogs loads block logs from the backend and caches the result. -func (sys *FilterSystem) cachedGetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) { - cached, ok := sys.logsCache.Get(blockHash) - if ok { - return cached.([][]*types.Log), nil - } - +// getLogs loads block logs from the backend. The backend is responsible for +// performing any log caching. +func (sys *FilterSystem) getLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) { logs, err := sys.backend.GetLogs(ctx, blockHash, number) if err != nil { return nil, err @@ -127,7 +111,6 @@ func (sys *FilterSystem) cachedGetLogs(ctx context.Context, blockHash common.Has if logs == nil { return nil, fmt.Errorf("failed to get logs for block #%d (0x%s)", number, blockHash.TerminalString()) } - sys.logsCache.Add(blockHash, logs) return logs, nil } @@ -626,7 +609,7 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common. // Get the logs of the block ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - logsList, err := es.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64()) + logsList, err := es.sys.getLogs(ctx, header.Hash(), header.Number.Uint64()) if err != nil { return nil } diff --git a/plugin/evm/atomic_trie.go b/plugin/evm/atomic_trie.go index 88ef781170..3e67a6822e 100644 --- a/plugin/evm/atomic_trie.go +++ b/plugin/evm/atomic_trie.go @@ -118,7 +118,7 @@ type atomicTrie struct { lastAcceptedRoot common.Hash // most recent trie root passed to accept trie or the root of the atomic trie on intialization. codec codec.Manager memoryCap common.StorageSize - tipBuffer *core.BoundedBuffer + tipBuffer *core.BoundedBuffer[common.Hash] } // newAtomicTrie returns a new instance of a atomicTrie with a configurable commitHeightInterval, used in testing. diff --git a/plugin/evm/config.go b/plugin/evm/config.go index 21a6878e96..d25dc1e1be 100644 --- a/plugin/evm/config.go +++ b/plugin/evm/config.go @@ -40,6 +40,7 @@ const ( defaultPopulateMissingTriesParallelism = 1024 defaultMaxOutboundActiveRequests = 16 defaultStateSyncServerTrieCache = 64 // MB + defaultAcceptedCacheSize = 32 // blocks // defaultStateSyncMinBlocks is the minimum number of blocks the blockchain // should be ahead of local last accepted to perform state sync. @@ -163,6 +164,13 @@ type Config struct { // identical state with the pre-upgrade ruleset. SkipUpgradeCheck bool `json:"skip-upgrade-check"` + // AcceptedCacheSize is the depth to keep in the accepted headers cache and the + // accepted logs cache at the accepted tip. + // + // This is particularly useful for improving the performance of eth_getLogs + // on RPC nodes. + AcceptedCacheSize int `json:"accepted-cache-size"` + // TxLookupLimit is the maximum number of blocks from head whose tx indices // are reserved: // * 0: means no limit @@ -209,6 +217,7 @@ func (c *Config) SetDefaults() { c.StateSyncCommitInterval = defaultSyncableCommitInterval c.StateSyncMinBlocks = defaultStateSyncMinBlocks c.AllowUnprotectedTxHashes = defaultAllowUnprotectedTxHashes + c.AcceptedCacheSize = defaultAcceptedCacheSize } func (d *Duration) UnmarshalJSON(data []byte) (err error) { diff --git a/plugin/evm/vm.go b/plugin/evm/vm.go index 9b9067cd5e..5633d0d574 100644 --- a/plugin/evm/vm.go +++ b/plugin/evm/vm.go @@ -453,6 +453,7 @@ func (vm *VM) Initialize( vm.ethConfig.OfflinePruningDataDirectory = vm.config.OfflinePruningDataDirectory vm.ethConfig.CommitInterval = vm.config.CommitInterval vm.ethConfig.SkipUpgradeCheck = vm.config.SkipUpgradeCheck + vm.ethConfig.AcceptedCacheSize = vm.config.AcceptedCacheSize vm.ethConfig.TxLookupLimit = vm.config.TxLookupLimit // Create directory for offline pruning