diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 27a9ec7412..29a3ec1615 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -324,8 +324,8 @@ func NewLevelDBDatabase(file string, cache int, handles int, namespace string, r // NewPebbleDBDatabase creates a persistent key-value database without a freezer // moving immutable chain segments into cold storage. -func NewPebbleDBDatabase(file string, cache int, handles int, namespace string, readonly, ephemeral bool) (ethdb.Database, error) { - db, err := pebble.New(file, cache, handles, namespace, readonly, ephemeral) +func NewPebbleDBDatabase(file string, cache int, handles int, namespace string, readonly, ephemeral bool, extraOptions *pebble.ExtraOptions) (ethdb.Database, error) { + db, err := pebble.New(file, cache, handles, namespace, readonly, ephemeral, extraOptions) if err != nil { return nil, err } @@ -366,6 +366,8 @@ type OpenOptions struct { // Ephemeral means that filesystem sync operations should be avoided: data integrity in the face of // a crash is not important. This option should typically be used in tests. Ephemeral bool + + PebbleExtraOptions *pebble.ExtraOptions } // openKeyValueDatabase opens a disk-based key-value database, e.g. leveldb or pebble. @@ -387,7 +389,7 @@ func openKeyValueDatabase(o OpenOptions) (ethdb.Database, error) { } if o.Type == dbPebble || existingDb == dbPebble { log.Info("Using pebble as the backing database") - return NewPebbleDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly, o.Ephemeral) + return NewPebbleDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly, o.Ephemeral, o.PebbleExtraOptions) } if o.Type == dbLeveldb || existingDb == dbLeveldb { log.Info("Using leveldb as the backing database") @@ -395,7 +397,7 @@ func openKeyValueDatabase(o OpenOptions) (ethdb.Database, error) { } // No pre-existing database, no user-requested one either. Default to Pebble. 
log.Info("Defaulting to pebble as the backing database") - return NewPebbleDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly, o.Ephemeral) + return NewPebbleDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly, o.Ephemeral, o.PebbleExtraOptions) } // Open opens both a disk-based key-value database such as leveldb or pebble, but also diff --git a/ethdb/pebble/extraoptions.go b/ethdb/pebble/extraoptions.go new file mode 100644 index 0000000000..787167c1cc --- /dev/null +++ b/ethdb/pebble/extraoptions.go @@ -0,0 +1,35 @@ +package pebble + +import "time" + +type ExtraOptions struct { + BytesPerSync int + L0CompactionFileThreshold int + L0CompactionThreshold int + L0StopWritesThreshold int + LBaseMaxBytes int64 + MemTableStopWritesThreshold int + MaxConcurrentCompactions func() int + DisableAutomaticCompactions bool + WALBytesPerSync int + WALDir string + WALMinSyncInterval func() time.Duration + TargetByteDeletionRate int + Experimental ExtraOptionsExperimental + Levels []ExtraLevelOptions +} + +type ExtraOptionsExperimental struct { + L0CompactionConcurrency int + CompactionDebtConcurrency uint64 + ReadCompactionRate int64 + ReadSamplingMultiplier int64 + MaxWriterConcurrency int + ForceWriterParallelism bool +} + +type ExtraLevelOptions struct { + BlockSize int + IndexBlockSize int + TargetFileSize int64 +} diff --git a/ethdb/pebble/pebble.go b/ethdb/pebble/pebble.go index af4686cf5b..e091d4543a 100644 --- a/ethdb/pebble/pebble.go +++ b/ethdb/pebble/pebble.go @@ -68,6 +68,25 @@ type Database struct { seekCompGauge metrics.Gauge // Gauge for tracking the number of table compaction caused by read opt manualMemAllocGauge metrics.Gauge // Gauge for tracking amount of non-managed memory currently allocated + compDebtGauge metrics.Gauge + compInProgressGauge metrics.Gauge + + commitCountMeter metrics.Meter + commitTotalDurationMeter metrics.Meter + commitSemaphoreWaitMeter metrics.Meter + commitMemTableWriteStallMeter metrics.Meter + 
commitL0ReadAmpWriteStallMeter metrics.Meter + commitWALRotationMeter metrics.Meter + commitWaitMeter metrics.Meter + + commitCount atomic.Int64 + commitTotalDuration atomic.Int64 + commitSemaphoreWait atomic.Int64 + commitMemTableWriteStall atomic.Int64 + commitL0ReadAmpWriteStall atomic.Int64 + commitWALRotation atomic.Int64 + commitWait atomic.Int64 + levelsGauge []metrics.Gauge // Gauge for tracking the number of tables in levels quitLock sync.RWMutex // Mutex protecting the quit channel and the closed flag @@ -135,7 +154,38 @@ func (l panicLogger) Fatalf(format string, args ...interface{}) { // New returns a wrapped pebble DB object. The namespace is the prefix that the // metrics reporting should use for surfacing internal stats. -func New(file string, cache int, handles int, namespace string, readonly bool, ephemeral bool) (*Database, error) { +func New(file string, cache int, handles int, namespace string, readonly bool, ephemeral bool, extraOptions *ExtraOptions) (*Database, error) { + if extraOptions == nil { + extraOptions = &ExtraOptions{} + } + if extraOptions.MemTableStopWritesThreshold <= 0 { + extraOptions.MemTableStopWritesThreshold = 2 + } + if extraOptions.MaxConcurrentCompactions == nil { + extraOptions.MaxConcurrentCompactions = func() int { return runtime.NumCPU() } + } + var levels []pebble.LevelOptions + if len(extraOptions.Levels) == 0 { + levels = []pebble.LevelOptions{ + {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, + {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, + {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, + {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, + {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, + {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, + {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, + } + } else { + for _, level := range 
extraOptions.Levels { + levels = append(levels, pebble.LevelOptions{ + BlockSize: level.BlockSize, + IndexBlockSize: level.IndexBlockSize, + TargetFileSize: level.TargetFileSize, + FilterPolicy: bloom.FilterPolicy(10), + }) + } + } + // Ensure we have some minimal caching and file guarantees if cache < minCache { cache = minCache @@ -160,7 +210,7 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e // Two memory tables is configured which is identical to leveldb, // including a frozen memory table and another live one. - memTableLimit := 2 + memTableLimit := extraOptions.MemTableStopWritesThreshold memTableSize := cache * 1024 * 1024 / 2 / memTableLimit // The memory table size is currently capped at maxMemTableSize-1 due to a @@ -198,19 +248,11 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e // The default compaction concurrency(1 thread), // Here use all available CPUs for faster compaction. - MaxConcurrentCompactions: func() int { return runtime.NumCPU() }, + MaxConcurrentCompactions: extraOptions.MaxConcurrentCompactions, - // Per-level options. Options for at least one level must be specified. The - // options for the last level are used for all subsequent levels. - Levels: []pebble.LevelOptions{ - {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, - {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, - {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, - {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, - {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, - {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, - {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, - }, + // Per-level options. Options for at least one level must be specified. The + // options for the last level are used for all subsequent levels. 
+ Levels: levels, ReadOnly: readonly, EventListener: &pebble.EventListener{ CompactionBegin: db.onCompactionBegin, @@ -219,11 +261,31 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e WriteStallEnd: db.onWriteStallEnd, }, Logger: panicLogger{}, // TODO(karalabe): Delete when this is upstreamed in Pebble + + BytesPerSync: extraOptions.BytesPerSync, + L0CompactionFileThreshold: extraOptions.L0CompactionFileThreshold, + L0CompactionThreshold: extraOptions.L0CompactionThreshold, + L0StopWritesThreshold: extraOptions.L0StopWritesThreshold, + LBaseMaxBytes: extraOptions.LBaseMaxBytes, + DisableAutomaticCompactions: extraOptions.DisableAutomaticCompactions, + WALBytesPerSync: extraOptions.WALBytesPerSync, + WALDir: extraOptions.WALDir, + WALMinSyncInterval: extraOptions.WALMinSyncInterval, + TargetByteDeletionRate: extraOptions.TargetByteDeletionRate, } // Disable seek compaction explicitly. Check https://github.com/ethereum/go-ethereum/pull/20130 // for more details. 
opt.Experimental.ReadSamplingMultiplier = -1 + if extraOptions.Experimental.ReadSamplingMultiplier != 0 { // honor an explicit override; zero keeps the -1 default (seek compaction disabled) + opt.Experimental.ReadSamplingMultiplier = extraOptions.Experimental.ReadSamplingMultiplier + } + opt.Experimental.L0CompactionConcurrency = extraOptions.Experimental.L0CompactionConcurrency + opt.Experimental.CompactionDebtConcurrency = extraOptions.Experimental.CompactionDebtConcurrency + opt.Experimental.ReadCompactionRate = extraOptions.Experimental.ReadCompactionRate + opt.Experimental.MaxWriterConcurrency = extraOptions.Experimental.MaxWriterConcurrency + opt.Experimental.ForceWriterParallelism = extraOptions.Experimental.ForceWriterParallelism + // Open the db and recover any potential corruptions innerDB, err := pebble.Open(file, opt) if err != nil { @@ -245,6 +307,17 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e db.seekCompGauge = metrics.NewRegisteredGauge(namespace+"compact/seek", nil) db.manualMemAllocGauge = metrics.NewRegisteredGauge(namespace+"memory/manualalloc", nil) + db.compDebtGauge = metrics.GetOrRegisterGauge(namespace+"compact/debt", nil) + db.compInProgressGauge = metrics.GetOrRegisterGauge(namespace+"compact/inprogress", nil) + + db.commitCountMeter = metrics.GetOrRegisterMeter(namespace+"commit/counter", nil) + db.commitTotalDurationMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/total", nil) + db.commitSemaphoreWaitMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/semaphorewait", nil) + db.commitMemTableWriteStallMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/memtablewritestall", nil) + db.commitL0ReadAmpWriteStallMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/l0readampwritestall", nil) + db.commitWALRotationMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/walrotation", nil) + db.commitWaitMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/commitwait", nil) + // Start up the metrics gathering and return go 
db.meter(metricsGatheringInterval, namespace) return db, nil @@ -457,6 +530,14 @@ func (d *Database) meter(refresh time.Duration, namespace string) { compReads [2]int64 nWrites [2]int64 + + commitCounts [2]int64 + commitTotalDurations [2]int64 + commitSemaphoreWaits [2]int64 + commitMemTableWriteStalls [2]int64 + commitL0ReadAmpWriteStalls [2]int64 + commitWALRotations [2]int64 + commitWaits [2]int64 ) // Iterate ad infinitum and collect the stats @@ -472,6 +553,14 @@ func (d *Database) meter(refresh time.Duration, namespace string) { writeDelayTime = d.writeDelayTime.Load() nonLevel0CompCount = int64(d.nonLevel0Comp.Load()) level0CompCount = int64(d.level0Comp.Load()) + + commitCount = d.commitCount.Load() + commitTotalDuration = d.commitTotalDuration.Load() + commitSemaphoreWait = d.commitSemaphoreWait.Load() + commitMemTableWriteStall = d.commitMemTableWriteStall.Load() + commitL0ReadAmpWriteStall = d.commitL0ReadAmpWriteStall.Load() + commitWALRotation = d.commitWALRotation.Load() + commitWait = d.commitWait.Load() ) writeDelayTimes[i%2] = writeDelayTime writeDelayCounts[i%2] = writeDelayCount @@ -522,6 +611,25 @@ func (d *Database) meter(refresh time.Duration, namespace string) { d.level0CompGauge.Update(level0CompCount) d.seekCompGauge.Update(stats.Compact.ReadCount) + commitCounts[i%2] = commitCount + commitTotalDurations[i%2] = commitTotalDuration + commitSemaphoreWaits[i%2] = commitSemaphoreWait + commitMemTableWriteStalls[i%2] = commitMemTableWriteStall + commitL0ReadAmpWriteStalls[i%2] = commitL0ReadAmpWriteStall + commitWALRotations[i%2] = commitWALRotation + commitWaits[i%2] = commitWait + + d.commitCountMeter.Mark(commitCounts[i%2] - commitCounts[(i-1)%2]) + d.commitTotalDurationMeter.Mark(commitTotalDurations[i%2] - commitTotalDurations[(i-1)%2]) + d.commitSemaphoreWaitMeter.Mark(commitSemaphoreWaits[i%2] - commitSemaphoreWaits[(i-1)%2]) + d.commitMemTableWriteStallMeter.Mark(commitMemTableWriteStalls[i%2] - commitMemTableWriteStalls[(i-1)%2]) + 
d.commitL0ReadAmpWriteStallMeter.Mark(commitL0ReadAmpWriteStalls[i%2] - commitL0ReadAmpWriteStalls[(i-1)%2]) + d.commitWALRotationMeter.Mark(commitWALRotations[i%2] - commitWALRotations[(i-1)%2]) + d.commitWaitMeter.Mark(commitWaits[i%2] - commitWaits[(i-1)%2]) + + d.compDebtGauge.Update(int64(stats.Compact.EstimatedDebt)) + d.compInProgressGauge.Update(stats.Compact.NumInProgress) + for i, level := range stats.Levels { // Append metrics for additional layers if i >= len(d.levelsGauge) { @@ -576,7 +684,20 @@ func (b *batch) Write() error { if b.db.closed { return pebble.ErrClosed } - return b.b.Commit(b.db.writeOptions) + err := b.b.Commit(b.db.writeOptions) + if err != nil { + return err + } + stats := b.b.CommitStats() + b.db.commitCount.Add(1) + b.db.commitTotalDuration.Add(int64(stats.TotalDuration)) + b.db.commitSemaphoreWait.Add(int64(stats.SemaphoreWaitDuration)) + b.db.commitMemTableWriteStall.Add(int64(stats.MemTableWriteStallDuration)) + b.db.commitL0ReadAmpWriteStall.Add(int64(stats.L0ReadAmpWriteStallDuration)) + b.db.commitWALRotation.Add(int64(stats.WALRotationDuration)) + b.db.commitWait.Add(int64(stats.CommitWaitDuration)) + // TODO: add a metric for stats.WALQueueWaitDuration once pebble starts populating it (currently it is always 0) + return nil } // Reset resets the batch for reuse. diff --git a/node/node.go b/node/node.go index 4dc856c345..51fbce0c00 100644 --- a/node/node.go +++ b/node/node.go @@ -33,6 +33,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/ethdb/pebble" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" @@ -714,6 +715,10 @@ func (n *Node) EventMux() *event.TypeMux { // previous can be found) from within the node's instance directory. If the node is // ephemeral, a memory database is returned. 
func (n *Node) OpenDatabase(name string, cache, handles int, namespace string, readonly bool) (ethdb.Database, error) { + return n.OpenDatabaseWithExtraOptions(name, cache, handles, namespace, readonly, nil) +} + +func (n *Node) OpenDatabaseWithExtraOptions(name string, cache, handles int, namespace string, readonly bool, pebbleExtraOptions *pebble.ExtraOptions) (ethdb.Database, error) { n.lock.Lock() defer n.lock.Unlock() if n.state == closedState { @@ -726,12 +731,13 @@ func (n *Node) OpenDatabase(name string, cache, handles int, namespace string, r db = rawdb.NewMemoryDatabase() } else { db, err = rawdb.Open(rawdb.OpenOptions{ - Type: n.config.DBEngine, - Directory: n.ResolvePath(name), - Namespace: namespace, - Cache: cache, - Handles: handles, - ReadOnly: readonly, + Type: n.config.DBEngine, + Directory: n.ResolvePath(name), + Namespace: namespace, + Cache: cache, + Handles: handles, + ReadOnly: readonly, + PebbleExtraOptions: pebbleExtraOptions, }) } @@ -747,6 +753,10 @@ func (n *Node) OpenDatabase(name string, cache, handles int, namespace string, r // database to immutable append-only files. If the node is an ephemeral one, a // memory database is returned. 
func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, ancient string, namespace string, readonly bool) (ethdb.Database, error) { + return n.OpenDatabaseWithFreezerWithExtraOptions(name, cache, handles, ancient, namespace, readonly, nil) +} + +func (n *Node) OpenDatabaseWithFreezerWithExtraOptions(name string, cache, handles int, ancient string, namespace string, readonly bool, pebbleExtraOptions *pebble.ExtraOptions) (ethdb.Database, error) { n.lock.Lock() defer n.lock.Unlock() if n.state == closedState { @@ -758,13 +768,14 @@ func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, ancient db = rawdb.NewMemoryDatabase() } else { db, err = rawdb.Open(rawdb.OpenOptions{ - Type: n.config.DBEngine, - Directory: n.ResolvePath(name), - AncientsDirectory: n.ResolveAncient(name, ancient), - Namespace: namespace, - Cache: cache, - Handles: handles, - ReadOnly: readonly, + Type: n.config.DBEngine, + Directory: n.ResolvePath(name), + AncientsDirectory: n.ResolveAncient(name, ancient), + Namespace: namespace, + Cache: cache, + Handles: handles, + ReadOnly: readonly, + PebbleExtraOptions: pebbleExtraOptions, }) }