From 9c9c28c41dc4532b4f6f8b08060f66a2154282f1 Mon Sep 17 00:00:00 2001
From: Rodrigo Villar
Date: Fri, 31 Oct 2025 11:39:30 -0400
Subject: [PATCH 01/15] refactor(reexecute/c): generic VM reexecution

---
 tests/reexecute/benchmark_executor.go  | 221 ++++++++++++
 tests/reexecute/c/vm_reexecute_test.go | 471 ++-----------------------
 tests/reexecute/export.go              |  41 +++
 tests/reexecute/metrics.go             | 127 +++++++
 tests/reexecute/vm_executor.go         | 195 ++++++++++
 5 files changed, 622 insertions(+), 433 deletions(-)
 create mode 100644 tests/reexecute/benchmark_executor.go
 create mode 100644 tests/reexecute/export.go
 create mode 100644 tests/reexecute/metrics.go
 create mode 100644 tests/reexecute/vm_executor.go

diff --git a/tests/reexecute/benchmark_executor.go b/tests/reexecute/benchmark_executor.go
new file mode 100644
index 000000000000..5635768a3191
--- /dev/null
+++ b/tests/reexecute/benchmark_executor.go
@@ -0,0 +1,221 @@
+// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+
+package reexecute
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/stretchr/testify/require"
+
+	"github.com/ava-labs/avalanchego/api/metrics"
+	"github.com/ava-labs/avalanchego/chains/atomic"
+	"github.com/ava-labs/avalanchego/database"
+	"github.com/ava-labs/avalanchego/database/prefixdb"
+	"github.com/ava-labs/avalanchego/ids"
+	"github.com/ava-labs/avalanchego/snow"
+	"github.com/ava-labs/avalanchego/snow/engine/enginetest"
+	"github.com/ava-labs/avalanchego/snow/engine/snowman/block"
+	"github.com/ava-labs/avalanchego/snow/validators/validatorstest"
+	"github.com/ava-labs/avalanchego/tests"
+	"github.com/ava-labs/avalanchego/upgrade"
+	"github.com/ava-labs/avalanchego/utils/constants"
+	"github.com/ava-labs/avalanchego/utils/crypto/bls/signer/localsigner"
+	"github.com/ava-labs/avalanchego/utils/logging"
+	"github.com/ava-labs/avalanchego/vms"
+	"github.com/ava-labs/avalanchego/vms/platformvm/warp"
+)
+
+var (
+	MainnetCChainID = ids.FromStringOrPanic("2q9e4r6Mu3U68nU1fYjgbR6JvwrRx36CohpAX5UQxse55x1Q5")
+
+	mainnetXChainID    = ids.FromStringOrPanic("2oYMBNV4eNHyqk2fjjV5nVQLDbtmNJzq5s3qs3Lo6ftnC6FByM")
+	mainnetAvaxAssetID = ids.FromStringOrPanic("FvwEAhmxKfeiG8SnEvq42hc6whRyY3EFYAvebMqDNDGCgxN5Z")
+)
+
+type BenchmarkExecutorConfig struct {
+	// BlockDir is the directory where blocks will be sourced from.
+	BlockDir string
+
+	// StartBlock is the height of the first block that will be executed.
+	StartBlock uint64
+	// EndBlock is the height of the last block that will be executed.
+	EndBlock uint64
+	// ChanSize is the size of the channel to use for block processing.
+	ChanSize int
+	// ExecutionTimeout is the duration to run the reexecution test for before
+	// terminating (without error). If 0, no timeout is applied.
+	ExecutionTimeout time.Duration
+
+	// PrefixGatherer is the top-most gatherer where all metrics can be derived
+	// from (e.g. VM metrics, consensus metrics). If MetricsServerEnabled is
+	// true, then the metrics server will export metrics from this gatherer.
+	PrefixGatherer metrics.MultiGatherer
+	// ConsensusRegistry is the registry where a subset of the metrics from snowman consensus
+	// [engine](../../snow/engine/snowman/metrics.go) will be registered.
+	ConsensusRegistry prometheus.Registerer
+
+	// MetricsServerEnabled determines whether to enable a Prometheus server
+	// exporting VM metrics.
+	MetricsServerEnabled bool
+	// MetricsPort is the port on which the metrics server will listen.
+	MetricsPort uint64
+	// MetricsCollectorEnabled determines whether to start a Prometheus
+	// collector.
+	MetricsCollectorEnabled bool
+	// MetricsLabels represents the set of labels that will be attached to all
+	// exported metrics.
+	MetricsLabels map[string]string
+	// SDConfigName is the name of the SDConfig file.
+	SDConfigName string
+	// NetworkUUID is the unique identifier corresponding to the benchmark.
+	NetworkUUID string
+	// DashboardPath is the relative Grafana dashboard path used to construct
+	// the metrics visualization URL when MetricsCollectorEnabled is true.
+	// Expected format: "d/dashboard-id/dashboard-name" (e.g., "d/Gl1I20mnk/c-chain").
+	DashboardPath string
+}
+
+// BenchmarkExecutor is a tool for executing a sequence of blocks against a VM.
+type BenchmarkExecutor struct {
+	config BenchmarkExecutorConfig
+}
+
+func NewBenchmarkExecutor(config BenchmarkExecutorConfig) BenchmarkExecutor {
+	return BenchmarkExecutor{config: config}
+}
+
+// Run executes a sequence of blocks from StartBlock to EndBlock against the
+// provided VM. It also manages metrics collection and reporting by optionally
+// starting a Prometheus server and collector based on the executor's
+// configuration.
+func (e BenchmarkExecutor) Run(b testing.TB, log logging.Logger, vm block.ChainVM) {
+	r := require.New(b)
+	ctx := b.Context()
+
+	if e.config.MetricsServerEnabled {
+		serverAddr := startServer(b, log, e.config.PrefixGatherer, e.config.MetricsPort)
+
+		if e.config.MetricsCollectorEnabled {
+			startCollector(
+				b,
+				log,
+				e.config.SDConfigName,
+				e.config.MetricsLabels,
+				serverAddr,
+				e.config.NetworkUUID,
+				e.config.DashboardPath,
+			)
+		}
+	}
+
+	blockChan, err := createBlockChanFromLevelDB(
+		b,
+		e.config.BlockDir,
+		e.config.StartBlock,
+		e.config.EndBlock,
+		e.config.ChanSize,
+	)
+	r.NoError(err)
+
+	vmExecutor, err := newVMExecutor(
+		tests.NewDefaultLogger("vm-executor"),
+		vm,
+		e.config.ConsensusRegistry,
+		e.config.ExecutionTimeout,
+		e.config.StartBlock,
+		e.config.EndBlock,
+	)
+	r.NoError(err)
+
+	r.NoError(vmExecutor.executeSequence(ctx, blockChan))
+}
+
+// NewMainnetVM creates and initializes a VM configured for mainnet block
+// reexecution tests. The VM is initialized with mainnet-specific settings
+// including the mainnet network ID, upgrade schedule, and chain configurations.
+// Both subnetID and chainID must correspond to subnets/chains that exist on mainnet.
+func NewMainnetVM( + ctx context.Context, + factory vms.Factory, + db database.Database, + chainDataDir string, + genesisBytes []byte, + upgradeBytes []byte, + configBytes []byte, + subnetID ids.ID, + chainID ids.ID, + metricsGatherer metrics.MultiGatherer, +) (block.ChainVM, error) { + vmIntf, err := factory.New(logging.NoLog{}) + if err != nil { + return nil, fmt.Errorf("failed to create VM from factory: %w", err) + } + vm := vmIntf.(block.ChainVM) + + blsKey, err := localsigner.New() + if err != nil { + return nil, fmt.Errorf("failed to create BLS key: %w", err) + } + + blsPublicKey := blsKey.PublicKey() + warpSigner := warp.NewSigner(blsKey, constants.MainnetID, chainID) + + sharedMemoryDB := prefixdb.New([]byte("sharedmemory"), db) + atomicMemory := atomic.NewMemory(sharedMemoryDB) + + chainIDToSubnetID := map[ids.ID]ids.ID{ + mainnetXChainID: constants.PrimaryNetworkID, + MainnetCChainID: constants.PrimaryNetworkID, + chainID: subnetID, + ids.Empty: constants.PrimaryNetworkID, + } + + if err := vm.Initialize( + ctx, + &snow.Context{ + NetworkID: constants.MainnetID, + SubnetID: subnetID, + ChainID: chainID, + NodeID: ids.GenerateTestNodeID(), + PublicKey: blsPublicKey, + NetworkUpgrades: upgrade.Mainnet, + + XChainID: mainnetXChainID, + CChainID: MainnetCChainID, + AVAXAssetID: mainnetAvaxAssetID, + + Log: tests.NewDefaultLogger("mainnet-vm-reexecution"), + SharedMemory: atomicMemory.NewSharedMemory(chainID), + BCLookup: ids.NewAliaser(), + Metrics: metricsGatherer, + + WarpSigner: warpSigner, + + ValidatorState: &validatorstest.State{ + GetSubnetIDF: func(_ context.Context, chainID ids.ID) (ids.ID, error) { + subnetID, ok := chainIDToSubnetID[chainID] + if ok { + return subnetID, nil + } + return ids.Empty, fmt.Errorf("unknown chainID: %s", chainID) + }, + }, + ChainDataDir: chainDataDir, + }, + prefixdb.New([]byte("vm"), db), + genesisBytes, + upgradeBytes, + configBytes, + nil, + &enginetest.Sender{}, + ); err != nil { + return nil, fmt.Errorf("failed to initialize VM: %w", err) + } + + return vm, nil +} diff --git a/tests/reexecute/c/vm_reexecute_test.go b/tests/reexecute/c/vm_reexecute_test.go index 124ad589e571..64b5e9a53f55 100644 --- a/tests/reexecute/c/vm_reexecute_test.go +++ b/tests/reexecute/c/vm_reexecute_test.go @@ -4,15 +4,12 @@ package vm import ( - "context" - "encoding/binary" "flag" "fmt" "maps" "os" "path/filepath" "slices" - "strconv" "strings" "testing" "time" @@ -25,31 +22,11 @@ import ( "go.uber.org/zap" "github.com/ava-labs/avalanchego/api/metrics" - "github.com/ava-labs/avalanchego/chains/atomic" - "github.com/ava-labs/avalanchego/database" "github.com/ava-labs/avalanchego/database/leveldb" - "github.com/ava-labs/avalanchego/database/prefixdb" "github.com/ava-labs/avalanchego/genesis" - "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/snow" - "github.com/ava-labs/avalanchego/snow/engine/enginetest" - "github.com/ava-labs/avalanchego/snow/engine/snowman/block" - "github.com/ava-labs/avalanchego/snow/validators/validatorstest" "github.com/ava-labs/avalanchego/tests" - "github.com/ava-labs/avalanchego/tests/fixture/tmpnet" - "github.com/ava-labs/avalanchego/upgrade" + "github.com/ava-labs/avalanchego/tests/reexecute" "github.com/ava-labs/avalanchego/utils/constants" - "github.com/ava-labs/avalanchego/utils/crypto/bls/signer/localsigner" - "github.com/ava-labs/avalanchego/utils/logging" - "github.com/ava-labs/avalanchego/utils/timer" - "github.com/ava-labs/avalanchego/utils/units" - "github.com/ava-labs/avalanchego/vms/platformvm/warp" 
-) - -var ( - mainnetXChainID = ids.FromStringOrPanic("2oYMBNV4eNHyqk2fjjV5nVQLDbtmNJzq5s3qs3Lo6ftnC6FByM") - mainnetCChainID = ids.FromStringOrPanic("2q9e4r6Mu3U68nU1fYjgbR6JvwrRx36CohpAX5UQxse55x1Q5") - mainnetAvaxAssetID = ids.FromStringOrPanic("FvwEAhmxKfeiG8SnEvq42hc6whRyY3EFYAvebMqDNDGCgxN5Z") ) var ( @@ -192,35 +169,15 @@ func benchmarkReexecuteRange( consensusRegistry := prometheus.NewRegistry() r.NoError(prefixGatherer.Register("avalanche_snowman", consensusRegistry)) - log := tests.NewDefaultLogger("c-chain-reexecution") - - if metricsServerEnabled { - serverAddr := startServer(b, log, prefixGatherer, metricsPort) + genesisConfig := genesis.GetConfig(constants.MainnetID) - if metricsCollectorEnabled { - startCollector(b, log, "c-chain-reexecution", labels, serverAddr) - } - } + log := tests.NewDefaultLogger("c-chain-reexecution") + dbLogger := tests.NewDefaultLogger("db") var ( vmDBDir = filepath.Join(currentStateDir, "db") chainDataDir = filepath.Join(currentStateDir, "chain-data-dir") ) - - log.Info("re-executing block range with params", - zap.String("block-dir", blockDir), - zap.String("vm-db-dir", vmDBDir), - zap.String("chain-data-dir", chainDataDir), - zap.Uint64("start-block", startBlock), - zap.Uint64("end-block", endBlock), - zap.Int("chan-size", chanSize), - ) - - blockChan, err := createBlockChanFromLevelDB(b, blockDir, startBlock, endBlock, chanSize) - r.NoError(err) - - dbLogger := tests.NewDefaultLogger("db") - db, err := leveldb.New(vmDBDir, nil, dbLogger, prometheus.NewRegistry()) r.NoError(err) defer func() { @@ -228,11 +185,16 @@ func benchmarkReexecuteRange( r.NoError(db.Close()) }() - vm, err := newMainnetCChainVM( + vm, err := reexecute.NewMainnetVM( ctx, + &factory.Factory{}, db, chainDataDir, + []byte(genesisConfig.CChainGenesis), + nil, configBytes, + constants.PrimaryNetworkID, + reexecute.MainnetCChainID, vmMultiGatherer, ) r.NoError(err) @@ -241,401 +203,44 @@ func benchmarkReexecuteRange( r.NoError(vm.Shutdown(ctx)) }() - config := vmExecutorConfig{ - Log: tests.NewDefaultLogger("vm-executor"), - Registry: consensusRegistry, - ExecutionTimeout: executionTimeout, - StartBlock: startBlock, - EndBlock: endBlock, + config := reexecute.BenchmarkExecutorConfig{ + BlockDir: blockDir, + StartBlock: startBlock, + EndBlock: endBlock, + ChanSize: chanSize, + ExecutionTimeout: executionTimeout, + PrefixGatherer: prefixGatherer, + ConsensusRegistry: consensusRegistry, + MetricsServerEnabled: metricsServerEnabled, + MetricsPort: metricsPort, + MetricsCollectorEnabled: metricsCollectorEnabled, + MetricsLabels: labels, + SDConfigName: "c-chain-reexecution", + NetworkUUID: networkUUID, + DashboardPath: "d/Gl1I20mnk/c-chain", } - executor, err := newVMExecutor(vm, config) - r.NoError(err) + + log.Info("re-executing block range with params", + zap.String("block-dir", blockDir), + zap.String("vm-db-dir", vmDBDir), + zap.String("chain-data-dir", chainDataDir), + zap.Uint64("start-block", startBlock), + zap.Uint64("end-block", endBlock), + zap.Int("chan-size", chanSize), + ) + + executor := reexecute.NewBenchmarkExecutor(config) start := time.Now() - r.NoError(executor.executeSequence(ctx, blockChan)) + executor.Run(b, log, vm) elapsed := time.Since(start) b.ReportMetric(0, "ns/op") // Set default ns/op to 0 to hide from the output getTopLevelMetrics(b, prefixGatherer, elapsed) // Report the desired top-level metrics } -func newMainnetCChainVM( - ctx context.Context, - vmAndSharedMemoryDB database.Database, - chainDataDir string, - configBytes []byte, - metricsGatherer 
metrics.MultiGatherer, -) (block.ChainVM, error) { - factory := factory.Factory{} - vmIntf, err := factory.New(logging.NoLog{}) - if err != nil { - return nil, fmt.Errorf("failed to create VM from factory: %w", err) - } - vm := vmIntf.(block.ChainVM) - - blsKey, err := localsigner.New() - if err != nil { - return nil, fmt.Errorf("failed to create BLS key: %w", err) - } - - blsPublicKey := blsKey.PublicKey() - warpSigner := warp.NewSigner(blsKey, constants.MainnetID, mainnetCChainID) - - genesisConfig := genesis.GetConfig(constants.MainnetID) - - sharedMemoryDB := prefixdb.New([]byte("sharedmemory"), vmAndSharedMemoryDB) - atomicMemory := atomic.NewMemory(sharedMemoryDB) - - chainIDToSubnetID := map[ids.ID]ids.ID{ - mainnetXChainID: constants.PrimaryNetworkID, - mainnetCChainID: constants.PrimaryNetworkID, - ids.Empty: constants.PrimaryNetworkID, - } - - if err := vm.Initialize( - ctx, - &snow.Context{ - NetworkID: constants.MainnetID, - SubnetID: constants.PrimaryNetworkID, - ChainID: mainnetCChainID, - NodeID: ids.GenerateTestNodeID(), - PublicKey: blsPublicKey, - NetworkUpgrades: upgrade.Mainnet, - - XChainID: mainnetXChainID, - CChainID: mainnetCChainID, - AVAXAssetID: mainnetAvaxAssetID, - - Log: tests.NewDefaultLogger("mainnet-vm-reexecution"), - SharedMemory: atomicMemory.NewSharedMemory(mainnetCChainID), - BCLookup: ids.NewAliaser(), - Metrics: metricsGatherer, - - WarpSigner: warpSigner, - - ValidatorState: &validatorstest.State{ - GetSubnetIDF: func(_ context.Context, chainID ids.ID) (ids.ID, error) { - subnetID, ok := chainIDToSubnetID[chainID] - if ok { - return subnetID, nil - } - return ids.Empty, fmt.Errorf("unknown chainID: %s", chainID) - }, - }, - ChainDataDir: chainDataDir, - }, - prefixdb.New([]byte("vm"), vmAndSharedMemoryDB), - []byte(genesisConfig.CChainGenesis), - nil, - configBytes, - nil, - &enginetest.Sender{}, - ); err != nil { - return nil, fmt.Errorf("failed to initialize VM: %w", err) - } - - return vm, nil -} - -type blockResult struct { - BlockBytes []byte - Height uint64 - Err error -} - -type vmExecutorConfig struct { - Log logging.Logger - // Registry is the registry to register the metrics with. - Registry prometheus.Registerer - // ExecutionTimeout is the maximum timeout to continue executing blocks. - // If 0, no timeout is applied. If non-zero, the executor will exit early - // WITHOUT error after hitting the timeout. - // This is useful to provide consistent duration benchmarks. - ExecutionTimeout time.Duration - - // [StartBlock, EndBlock] defines the range (inclusive) of blocks to execute. 
- StartBlock, EndBlock uint64 -} - -type vmExecutor struct { - config vmExecutorConfig - vm block.ChainVM - metrics *consensusMetrics -} - -func newVMExecutor(vm block.ChainVM, config vmExecutorConfig) (*vmExecutor, error) { - metrics, err := newConsensusMetrics(config.Registry) - if err != nil { - return nil, fmt.Errorf("failed to create consensus metrics: %w", err) - } - - return &vmExecutor{ - vm: vm, - metrics: metrics, - config: config, - }, nil -} - -func (e *vmExecutor) execute(ctx context.Context, blockBytes []byte) error { - blk, err := e.vm.ParseBlock(ctx, blockBytes) - if err != nil { - return fmt.Errorf("failed to parse block: %w", err) - } - if err := blk.Verify(ctx); err != nil { - return fmt.Errorf("failed to verify block %s at height %d: %w", blk.ID(), blk.Height(), err) - } - - if err := blk.Accept(ctx); err != nil { - return fmt.Errorf("failed to accept block %s at height %d: %w", blk.ID(), blk.Height(), err) - } - e.metrics.lastAcceptedHeight.Set(float64(blk.Height())) - - return nil -} - -func (e *vmExecutor) executeSequence(ctx context.Context, blkChan <-chan blockResult) error { - blkID, err := e.vm.LastAccepted(ctx) - if err != nil { - return fmt.Errorf("failed to get last accepted block: %w", err) - } - blk, err := e.vm.GetBlock(ctx, blkID) - if err != nil { - return fmt.Errorf("failed to get last accepted block by blkID %s: %w", blkID, err) - } - - start := time.Now() - e.config.Log.Info("last accepted block", - zap.Stringer("blkID", blkID), - zap.Uint64("height", blk.Height()), - ) - - if e.config.ExecutionTimeout > 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, e.config.ExecutionTimeout) - defer cancel() - } - - for blkResult := range blkChan { - if blkResult.Err != nil { - return blkResult.Err - } - - if blkResult.Height%1000 == 0 { - eta := timer.EstimateETA( - start, - blkResult.Height-e.config.StartBlock, - e.config.EndBlock-e.config.StartBlock, - ) - e.config.Log.Info("executing block", - zap.Uint64("height", blkResult.Height), - zap.Duration("eta", eta), - ) - } - if err := e.execute(ctx, blkResult.BlockBytes); err != nil { - return err - } - - if err := ctx.Err(); err != nil { - e.config.Log.Info("exiting early due to context timeout", - zap.Duration("elapsed", time.Since(start)), - zap.Duration("execution-timeout", e.config.ExecutionTimeout), - zap.Error(ctx.Err()), - ) - return nil - } - } - e.config.Log.Info("finished executing sequence") - - return nil -} - -func createBlockChanFromLevelDB(tb testing.TB, sourceDir string, startBlock, endBlock uint64, chanSize int) (<-chan blockResult, error) { - r := require.New(tb) - ch := make(chan blockResult, chanSize) - - db, err := leveldb.New(sourceDir, nil, logging.NoLog{}, prometheus.NewRegistry()) - if err != nil { - return nil, fmt.Errorf("failed to create leveldb database from %q: %w", sourceDir, err) - } - tb.Cleanup(func() { - r.NoError(db.Close()) - }) - - go func() { - defer close(ch) - - iter := db.NewIteratorWithStart(blockKey(startBlock)) - defer iter.Release() - - currentHeight := startBlock - - for iter.Next() { - key := iter.Key() - if len(key) != database.Uint64Size { - ch <- blockResult{ - BlockBytes: nil, - Err: fmt.Errorf("expected key length %d while looking for block at height %d, got %d", database.Uint64Size, currentHeight, len(key)), - } - return - } - height := binary.BigEndian.Uint64(key) - if height != currentHeight { - ch <- blockResult{ - BlockBytes: nil, - Err: fmt.Errorf("expected next height %d, got %d", currentHeight, height), - } - return - } - ch <- 
blockResult{ - BlockBytes: iter.Value(), - Height: height, - } - currentHeight++ - if currentHeight > endBlock { - break - } - } - if iter.Error() != nil { - ch <- blockResult{ - BlockBytes: nil, - Err: fmt.Errorf("failed to iterate over blocks at height %d: %w", currentHeight, iter.Error()), - } - return - } - }() - - return ch, nil -} - -func blockKey(height uint64) []byte { - return binary.BigEndian.AppendUint64(nil, height) -} - func TestExportBlockRange(t *testing.T) { - exportBlockRange(t, blockDirSrcArg, blockDirDstArg, startBlockArg, endBlockArg, chanSizeArg) -} - -func exportBlockRange(tb testing.TB, blockDirSrc string, blockDirDst string, startBlock, endBlock uint64, chanSize int) { - r := require.New(tb) - blockChan, err := createBlockChanFromLevelDB(tb, blockDirSrc, startBlock, endBlock, chanSize) - r.NoError(err) - - db, err := leveldb.New(blockDirDst, nil, logging.NoLog{}, prometheus.NewRegistry()) - r.NoError(err) - tb.Cleanup(func() { - r.NoError(db.Close()) - }) - - batch := db.NewBatch() - for blkResult := range blockChan { - r.NoError(batch.Put(blockKey(blkResult.Height), blkResult.BlockBytes)) - - if batch.Size() > 10*units.MiB { - r.NoError(batch.Write()) - batch = db.NewBatch() - } - } - - r.NoError(batch.Write()) -} - -type consensusMetrics struct { - lastAcceptedHeight prometheus.Gauge -} - -// newConsensusMetrics creates a subset of the metrics from snowman consensus -// [engine](../../snow/engine/snowman/metrics.go). -// -// The registry passed in is expected to be registered with the prefix -// "avalanche_snowman" and the chain label (ex. chain="C") that would be handled -// by the[chain manager](../../../chains/manager.go). -func newConsensusMetrics(registry prometheus.Registerer) (*consensusMetrics, error) { - m := &consensusMetrics{ - lastAcceptedHeight: prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "last_accepted_height", - Help: "last height accepted", - }), - } - if err := registry.Register(m.lastAcceptedHeight); err != nil { - return nil, fmt.Errorf("failed to register last accepted height metric: %w", err) - } - return m, nil -} - -// startServer starts a Prometheus server for the provided gatherer and returns -// the server address. -func startServer( - tb testing.TB, - log logging.Logger, - gatherer prometheus.Gatherer, - port uint64, -) string { - r := require.New(tb) - - server, err := tests.NewPrometheusServerWithPort(gatherer, port) - r.NoError(err) - - log.Info("metrics endpoint available", - zap.String("url", fmt.Sprintf("http://%s/ext/metrics", server.Address())), - ) - - tb.Cleanup(func() { - r.NoError(server.Stop()) - }) - - return server.Address() -} - -// startCollector starts a Prometheus collector configured to scrape the server -// listening on serverAddr. startCollector also attaches the provided labels + -// Github labels if available to the collected metrics. -func startCollector(tb testing.TB, log logging.Logger, name string, labels map[string]string, serverAddr string) { - r := require.New(tb) - - startPromCtx, cancel := context.WithTimeout(tb.Context(), tests.DefaultTimeout) - defer cancel() - - logger := tests.NewDefaultLogger("prometheus") - r.NoError(tmpnet.StartPrometheus(startPromCtx, logger)) - - var sdConfigFilePath string - tb.Cleanup(func() { - // Ensure a final metrics scrape. - // This default delay is set above the default scrape interval used by StartPrometheus. 
- time.Sleep(tmpnet.NetworkShutdownDelay) - - r.NoError(func() error { - if sdConfigFilePath != "" { - return os.Remove(sdConfigFilePath) - } - return nil - }(), - ) - - //nolint:usetesting // t.Context() is already canceled inside the cleanup function - checkMetricsCtx, cancel := context.WithTimeout(context.Background(), tests.DefaultTimeout) - defer cancel() - r.NoError(tmpnet.CheckMetricsExist(checkMetricsCtx, logger, networkUUID)) - }) - - sdConfigFilePath, err := tmpnet.WritePrometheusSDConfig(name, tmpnet.SDConfig{ - Targets: []string{serverAddr}, - Labels: labels, - }, true /* withGitHubLabels */) - r.NoError(err) - - var ( - dashboardPath = "d/Gl1I20mnk/c-chain" - grafanaURI = tmpnet.DefaultBaseGrafanaURI + dashboardPath - startTime = strconv.FormatInt(time.Now().UnixMilli(), 10) - ) - - log.Info("metrics available via grafana", - zap.String( - "url", - tmpnet.NewGrafanaURI(networkUUID, startTime, "", grafanaURI), - ), - ) + reexecute.ExportBlockRange(t, blockDirSrcArg, blockDirDstArg, startBlockArg, endBlockArg, chanSizeArg) } // parseCustomLabels parses a comma-separated list of key-value pairs into a map diff --git a/tests/reexecute/export.go b/tests/reexecute/export.go new file mode 100644 index 000000000000..3050ba83556a --- /dev/null +++ b/tests/reexecute/export.go @@ -0,0 +1,41 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package reexecute + +import ( + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + + "github.com/ava-labs/avalanchego/database/leveldb" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/utils/units" +) + +// ExportBlockRange copies blocks from a source LevelDB directory to a +// destination LevelDB directory for the specified block range [startBlock, endBlock]. +func ExportBlockRange(tb testing.TB, blockDirSrc string, blockDirDst string, startBlock, endBlock uint64, chanSize int) { + r := require.New(tb) + blockChan, err := createBlockChanFromLevelDB(tb, blockDirSrc, startBlock, endBlock, chanSize) + r.NoError(err) + + db, err := leveldb.New(blockDirDst, nil, logging.NoLog{}, prometheus.NewRegistry()) + r.NoError(err) + tb.Cleanup(func() { + r.NoError(db.Close()) + }) + + batch := db.NewBatch() + for blkResult := range blockChan { + r.NoError(batch.Put(blockKey(blkResult.height), blkResult.blockBytes)) + + if batch.Size() > 10*units.MiB { + r.NoError(batch.Write()) + batch = db.NewBatch() + } + } + + r.NoError(batch.Write()) +} diff --git a/tests/reexecute/metrics.go b/tests/reexecute/metrics.go new file mode 100644 index 000000000000..628bf811f41d --- /dev/null +++ b/tests/reexecute/metrics.go @@ -0,0 +1,127 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package reexecute + +import ( + "context" + "fmt" + "os" + "strconv" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/ava-labs/avalanchego/tests" + "github.com/ava-labs/avalanchego/tests/fixture/tmpnet" + "github.com/ava-labs/avalanchego/utils/logging" +) + +// startServer starts a Prometheus server for the provided gatherer and returns +// the server address. 
+func startServer(
+	tb testing.TB,
+	log logging.Logger,
+	gatherer prometheus.Gatherer,
+	port uint64,
+) string {
+	r := require.New(tb)
+
+	server, err := tests.NewPrometheusServerWithPort(gatherer, port)
+	r.NoError(err)
+
+	log.Info("metrics endpoint available",
+		zap.String("url", fmt.Sprintf("http://%s/ext/metrics", server.Address())),
+	)
+
+	tb.Cleanup(func() {
+		r.NoError(server.Stop())
+	})
+
+	return server.Address()
+}
+
+// startCollector starts a Prometheus collector configured to scrape the server
+// listening on serverAddr. startCollector also attaches the provided labels +
+// Github labels if available to the collected metrics.
+func startCollector(
+	tb testing.TB,
+	log logging.Logger,
+	name string,
+	labels map[string]string,
+	serverAddr string,
+	networkUUID string,
+	dashboardPath string,
+) {
+	r := require.New(tb)
+
+	startPromCtx, cancel := context.WithTimeout(tb.Context(), tests.DefaultTimeout)
+	defer cancel()
+
+	logger := tests.NewDefaultLogger("prometheus")
+	r.NoError(tmpnet.StartPrometheus(startPromCtx, logger))
+
+	var sdConfigFilePath string
+	tb.Cleanup(func() {
+		// Ensure a final metrics scrape.
+		// This default delay is set above the default scrape interval used by StartPrometheus.
+		time.Sleep(tmpnet.NetworkShutdownDelay)
+
+		r.NoError(func() error {
+			if sdConfigFilePath != "" {
+				return os.Remove(sdConfigFilePath)
+			}
+			return nil
+		}(),
+		)
+
+		//nolint:usetesting // t.Context() is already canceled inside the cleanup function
+		checkMetricsCtx, cancel := context.WithTimeout(context.Background(), tests.DefaultTimeout)
+		defer cancel()
+		r.NoError(tmpnet.CheckMetricsExist(checkMetricsCtx, logger, networkUUID))
+	})
+
+	sdConfigFilePath, err := tmpnet.WritePrometheusSDConfig(name, tmpnet.SDConfig{
+		Targets: []string{serverAddr},
+		Labels:  labels,
+	}, true /* withGitHubLabels */)
+	r.NoError(err)
+
+	var (
+		grafanaURI = tmpnet.DefaultBaseGrafanaURI + dashboardPath
+		startTime  = strconv.FormatInt(time.Now().UnixMilli(), 10)
+	)
+
+	log.Info("metrics available via grafana",
+		zap.String(
+			"url",
+			tmpnet.NewGrafanaURI(networkUUID, startTime, "", grafanaURI),
+		),
+	)
+}
+
+type consensusMetrics struct {
+	lastAcceptedHeight prometheus.Gauge
+}
+
+// newConsensusMetrics creates a subset of the metrics from snowman consensus
+// [engine](../../snow/engine/snowman/metrics.go).
+//
+// The registry passed in is expected to be registered with the prefix
+// "avalanche_snowman" and the chain label (ex. chain="C") that would be handled
+// by the [chain manager](../../../chains/manager.go).
+func newConsensusMetrics(registry prometheus.Registerer) (*consensusMetrics, error) {
+	m := &consensusMetrics{
+		lastAcceptedHeight: prometheus.NewGauge(prometheus.GaugeOpts{
+			Name: "last_accepted_height",
+			Help: "last height accepted",
+		}),
+	}
+	if err := registry.Register(m.lastAcceptedHeight); err != nil {
+		return nil, fmt.Errorf("failed to register last accepted height metric: %w", err)
+	}
+	return m, nil
+}
diff --git a/tests/reexecute/vm_executor.go b/tests/reexecute/vm_executor.go
new file mode 100644
index 000000000000..0691e6a7b0e9
--- /dev/null
+++ b/tests/reexecute/vm_executor.go
@@ -0,0 +1,195 @@
+// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+ +package reexecute + +import ( + "context" + "encoding/binary" + "fmt" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/database/leveldb" + "github.com/ava-labs/avalanchego/snow/engine/snowman/block" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/utils/timer" +) + +type vmExecutor struct { + log logging.Logger + vm block.ChainVM + metrics *consensusMetrics + executionTimeout time.Duration + startBlock uint64 + endBlock uint64 +} + +func newVMExecutor( + log logging.Logger, + vm block.ChainVM, + registry prometheus.Registerer, + executionTimeout time.Duration, + startBlock uint64, + endBlock uint64, +) (*vmExecutor, error) { + metrics, err := newConsensusMetrics(registry) + if err != nil { + return nil, fmt.Errorf("failed to create consensus metrics: %w", err) + } + + return &vmExecutor{ + log: log, + vm: vm, + metrics: metrics, + executionTimeout: executionTimeout, + startBlock: startBlock, + endBlock: endBlock, + }, nil +} + +func (e *vmExecutor) execute(ctx context.Context, blockBytes []byte) error { + blk, err := e.vm.ParseBlock(ctx, blockBytes) + if err != nil { + return fmt.Errorf("failed to parse block: %w", err) + } + if err := blk.Verify(ctx); err != nil { + return fmt.Errorf("failed to verify block %s at height %d: %w", blk.ID(), blk.Height(), err) + } + + if err := blk.Accept(ctx); err != nil { + return fmt.Errorf("failed to accept block %s at height %d: %w", blk.ID(), blk.Height(), err) + } + e.metrics.lastAcceptedHeight.Set(float64(blk.Height())) + + return nil +} + +func (e *vmExecutor) executeSequence(ctx context.Context, blkChan <-chan blockResult) error { + blkID, err := e.vm.LastAccepted(ctx) + if err != nil { + return fmt.Errorf("failed to get last accepted block: %w", err) + } + blk, err := e.vm.GetBlock(ctx, blkID) + if err != nil { + return fmt.Errorf("failed to get last accepted block by blkID %s: %w", blkID, err) + } + + start := time.Now() + e.log.Info("last accepted block", + zap.Stringer("blkID", blkID), + zap.Uint64("height", blk.Height()), + ) + + if e.executionTimeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, e.executionTimeout) + defer cancel() + } + + for blkResult := range blkChan { + if blkResult.err != nil { + return blkResult.err + } + + if blkResult.height%1000 == 0 { + eta := timer.EstimateETA( + start, + blkResult.height-e.startBlock, + e.endBlock-e.startBlock, + ) + e.log.Info("executing block", + zap.Uint64("height", blkResult.height), + zap.Duration("eta", eta), + ) + } + if err := e.execute(ctx, blkResult.blockBytes); err != nil { + return err + } + + if err := ctx.Err(); err != nil { + e.log.Info("exiting early due to context timeout", + zap.Duration("elapsed", time.Since(start)), + zap.Duration("execution-timeout", e.executionTimeout), + zap.Error(ctx.Err()), + ) + return nil + } + } + e.log.Info("finished executing sequence") + + return nil +} + +type blockResult struct { + blockBytes []byte + height uint64 + err error +} + +func createBlockChanFromLevelDB(tb testing.TB, sourceDir string, startBlock, endBlock uint64, chanSize int) (<-chan blockResult, error) { + r := require.New(tb) + ch := make(chan blockResult, chanSize) + + db, err := leveldb.New(sourceDir, nil, logging.NoLog{}, prometheus.NewRegistry()) + if err != nil { + return nil, fmt.Errorf("failed to create leveldb database from %q: %w", 
sourceDir, err) + } + tb.Cleanup(func() { + r.NoError(db.Close()) + }) + + go func() { + defer close(ch) + + iter := db.NewIteratorWithStart(blockKey(startBlock)) + defer iter.Release() + + currentHeight := startBlock + + for iter.Next() { + key := iter.Key() + if len(key) != database.Uint64Size { + ch <- blockResult{ + blockBytes: nil, + err: fmt.Errorf("expected key length %d while looking for block at height %d, got %d", database.Uint64Size, currentHeight, len(key)), + } + return + } + height := binary.BigEndian.Uint64(key) + if height != currentHeight { + ch <- blockResult{ + blockBytes: nil, + err: fmt.Errorf("expected next height %d, got %d", currentHeight, height), + } + return + } + ch <- blockResult{ + blockBytes: iter.Value(), + height: height, + } + currentHeight++ + if currentHeight > endBlock { + break + } + } + if iter.Error() != nil { + ch <- blockResult{ + blockBytes: nil, + err: fmt.Errorf("failed to iterate over blocks at height %d: %w", currentHeight, iter.Error()), + } + return + } + }() + + return ch, nil +} + +func blockKey(height uint64) []byte { + return binary.BigEndian.AppendUint64(nil, height) +} From 11db32d573c04f9cb2f136663b6baaf6ed5f402a Mon Sep 17 00:00:00 2001 From: Rodrigo Villar Date: Fri, 31 Oct 2025 14:51:30 -0400 Subject: [PATCH 02/15] chore: add VMParams --- tests/reexecute/benchmark_executor.go | 36 ++++++++++++++------------ tests/reexecute/c/vm_reexecute_test.go | 13 ++++++---- 2 files changed, 28 insertions(+), 21 deletions(-) diff --git a/tests/reexecute/benchmark_executor.go b/tests/reexecute/benchmark_executor.go index 5635768a3191..878e8f14c276 100644 --- a/tests/reexecute/benchmark_executor.go +++ b/tests/reexecute/benchmark_executor.go @@ -135,6 +135,14 @@ func (e BenchmarkExecutor) Run(b testing.TB, log logging.Logger, vm block.ChainV r.NoError(vmExecutor.executeSequence(ctx, blockChan)) } +type VMParams struct { + GenesisBytes []byte + UpgradeBytes []byte + ConfigBytes []byte + SubnetID ids.ID + ChainID ids.ID +} + // NewMainnetVM creates and initializes a VM configured for mainnet block // reexecution tests. The VM is initialized with mainnet-specific settings // including the mainnet network ID, upgrade schedule, and chain configurations. 
@@ -144,12 +152,8 @@ func NewMainnetVM( factory vms.Factory, db database.Database, chainDataDir string, - genesisBytes []byte, - upgradeBytes []byte, - configBytes []byte, - subnetID ids.ID, - chainID ids.ID, metricsGatherer metrics.MultiGatherer, + vmParams VMParams, ) (block.ChainVM, error) { vmIntf, err := factory.New(logging.NoLog{}) if err != nil { @@ -163,24 +167,24 @@ func NewMainnetVM( } blsPublicKey := blsKey.PublicKey() - warpSigner := warp.NewSigner(blsKey, constants.MainnetID, chainID) + warpSigner := warp.NewSigner(blsKey, constants.MainnetID, vmParams.ChainID) sharedMemoryDB := prefixdb.New([]byte("sharedmemory"), db) atomicMemory := atomic.NewMemory(sharedMemoryDB) chainIDToSubnetID := map[ids.ID]ids.ID{ - mainnetXChainID: constants.PrimaryNetworkID, - MainnetCChainID: constants.PrimaryNetworkID, - chainID: subnetID, - ids.Empty: constants.PrimaryNetworkID, + mainnetXChainID: constants.PrimaryNetworkID, + MainnetCChainID: constants.PrimaryNetworkID, + vmParams.ChainID: vmParams.SubnetID, + ids.Empty: constants.PrimaryNetworkID, } if err := vm.Initialize( ctx, &snow.Context{ NetworkID: constants.MainnetID, - SubnetID: subnetID, - ChainID: chainID, + SubnetID: vmParams.SubnetID, + ChainID: vmParams.ChainID, NodeID: ids.GenerateTestNodeID(), PublicKey: blsPublicKey, NetworkUpgrades: upgrade.Mainnet, @@ -190,7 +194,7 @@ func NewMainnetVM( AVAXAssetID: mainnetAvaxAssetID, Log: tests.NewDefaultLogger("mainnet-vm-reexecution"), - SharedMemory: atomicMemory.NewSharedMemory(chainID), + SharedMemory: atomicMemory.NewSharedMemory(vmParams.ChainID), BCLookup: ids.NewAliaser(), Metrics: metricsGatherer, @@ -208,9 +212,9 @@ func NewMainnetVM( ChainDataDir: chainDataDir, }, prefixdb.New([]byte("vm"), db), - genesisBytes, - upgradeBytes, - configBytes, + vmParams.GenesisBytes, + vmParams.UpgradeBytes, + vmParams.ConfigBytes, nil, &enginetest.Sender{}, ); err != nil { diff --git a/tests/reexecute/c/vm_reexecute_test.go b/tests/reexecute/c/vm_reexecute_test.go index 64b5e9a53f55..6467deb57416 100644 --- a/tests/reexecute/c/vm_reexecute_test.go +++ b/tests/reexecute/c/vm_reexecute_test.go @@ -185,17 +185,20 @@ func benchmarkReexecuteRange( r.NoError(db.Close()) }() + vmParams := reexecute.VMParams{ + GenesisBytes: []byte(genesisConfig.CChainGenesis), + ConfigBytes: configBytes, + SubnetID: constants.PrimaryNetworkID, + ChainID: reexecute.MainnetCChainID, + } + vm, err := reexecute.NewMainnetVM( ctx, &factory.Factory{}, db, chainDataDir, - []byte(genesisConfig.CChainGenesis), - nil, - configBytes, - constants.PrimaryNetworkID, - reexecute.MainnetCChainID, vmMultiGatherer, + vmParams, ) r.NoError(err) defer func() { From 409cf6106b6cc5f161a4bb1792aad9f8d05a8f4f Mon Sep 17 00:00:00 2001 From: Rodrigo Villar Date: Fri, 31 Oct 2025 14:54:15 -0400 Subject: [PATCH 03/15] refactor: nuke metrics.go --- tests/reexecute/benchmark_executor.go | 110 ++++++++++++++++++++++ tests/reexecute/metrics.go | 127 -------------------------- 2 files changed, 110 insertions(+), 127 deletions(-) delete mode 100644 tests/reexecute/metrics.go diff --git a/tests/reexecute/benchmark_executor.go b/tests/reexecute/benchmark_executor.go index 878e8f14c276..dab795025581 100644 --- a/tests/reexecute/benchmark_executor.go +++ b/tests/reexecute/benchmark_executor.go @@ -6,11 +6,14 @@ package reexecute import ( "context" "fmt" + "os" + "strconv" "testing" "time" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" + "go.uber.org/zap" "github.com/ava-labs/avalanchego/api/metrics" 
"github.com/ava-labs/avalanchego/chains/atomic" @@ -22,6 +25,7 @@ import ( "github.com/ava-labs/avalanchego/snow/engine/snowman/block" "github.com/ava-labs/avalanchego/snow/validators/validatorstest" "github.com/ava-labs/avalanchego/tests" + "github.com/ava-labs/avalanchego/tests/fixture/tmpnet" "github.com/ava-labs/avalanchego/upgrade" "github.com/ava-labs/avalanchego/utils/constants" "github.com/ava-labs/avalanchego/utils/crypto/bls/signer/localsigner" @@ -223,3 +227,109 @@ func NewMainnetVM( return vm, nil } + +// startServer starts a Prometheus server for the provided gatherer and returns +// the server address. +func startServer( + tb testing.TB, + log logging.Logger, + gatherer prometheus.Gatherer, + port uint64, +) string { + r := require.New(tb) + + server, err := tests.NewPrometheusServerWithPort(gatherer, port) + r.NoError(err) + + log.Info("metrics endpoint available", + zap.String("url", fmt.Sprintf("http://%s/ext/metrics", server.Address())), + ) + + tb.Cleanup(func() { + r.NoError(server.Stop()) + }) + + return server.Address() +} + +// startCollector starts a Prometheus collector configured to scrape the server +// listening on serverAddr. startCollector also attaches the provided labels + +// Github labels if available to the collected metrics. +func startCollector( + tb testing.TB, + log logging.Logger, + name string, + labels map[string]string, + serverAddr string, + networkUUID string, + dashboardPath string, +) { + r := require.New(tb) + + startPromCtx, cancel := context.WithTimeout(tb.Context(), tests.DefaultTimeout) + defer cancel() + + logger := tests.NewDefaultLogger("prometheus") + r.NoError(tmpnet.StartPrometheus(startPromCtx, logger)) + + var sdConfigFilePath string + tb.Cleanup(func() { + // Ensure a final metrics scrape. + // This default delay is set above the default scrape interval used by StartPrometheus. + time.Sleep(tmpnet.NetworkShutdownDelay) + + r.NoError(func() error { + if sdConfigFilePath != "" { + return os.Remove(sdConfigFilePath) + } + return nil + }(), + ) + + //nolint:usetesting // t.Context() is already canceled inside the cleanup function + checkMetricsCtx, cancel := context.WithTimeout(context.Background(), tests.DefaultTimeout) + defer cancel() + r.NoError(tmpnet.CheckMetricsExist(checkMetricsCtx, logger, networkUUID)) + }) + + sdConfigFilePath, err := tmpnet.WritePrometheusSDConfig(name, tmpnet.SDConfig{ + Targets: []string{serverAddr}, + Labels: labels, + }, true /* withGitHubLabels */) + r.NoError(err) + + var ( + grafanaURI = tmpnet.DefaultBaseGrafanaURI + dashboardPath + startTime = strconv.FormatInt(time.Now().UnixMilli(), 10) + ) + + log.Info("metrics available via grafana", + zap.String( + "url", + tmpnet.NewGrafanaURI(networkUUID, startTime, "", grafanaURI), + ), + ) +} + +type consensusMetrics struct { + lastAcceptedHeight prometheus.Gauge +} + +// newConsensusMetrics creates a subset of the metrics from snowman consensus +// [engine](../../snow/engine/snowman/metrics.go). +// +// The registry passed in is expected to be registered with the prefix +// "avalanche_snowman" and the chain label (ex. chain="C") that would be handled +// by the[chain manager](../../../chains/manager.go). 
+func newConsensusMetrics(registry prometheus.Registerer) (*consensusMetrics, error) { + m := &consensusMetrics{ + lastAcceptedHeight: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "last_accepted_height", + Help: "last height accepted", + }), + } + if err := registry.Register(m.lastAcceptedHeight); err != nil { + return nil, fmt.Errorf("failed to register last accepted height metric: %w", err) + } + return m, nil +} diff --git a/tests/reexecute/metrics.go b/tests/reexecute/metrics.go deleted file mode 100644 index 628bf811f41d..000000000000 --- a/tests/reexecute/metrics.go +++ /dev/null @@ -1,127 +0,0 @@ -// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package reexecute - -import ( - "context" - "fmt" - "os" - "strconv" - "testing" - "time" - - "github.com/prometheus/client_golang/prometheus" - "github.com/stretchr/testify/require" - "go.uber.org/zap" - - "github.com/ava-labs/avalanchego/tests" - "github.com/ava-labs/avalanchego/tests/fixture/tmpnet" - "github.com/ava-labs/avalanchego/utils/logging" -) - -// startServer starts a Prometheus server for the provided gatherer and returns -// the server address. -func startServer( - tb testing.TB, - log logging.Logger, - gatherer prometheus.Gatherer, - port uint64, -) string { - r := require.New(tb) - - server, err := tests.NewPrometheusServerWithPort(gatherer, port) - r.NoError(err) - - log.Info("metrics endpoint available", - zap.String("url", fmt.Sprintf("http://%s/ext/metrics", server.Address())), - ) - - tb.Cleanup(func() { - r.NoError(server.Stop()) - }) - - return server.Address() -} - -// startCollector starts a Prometheus collector configured to scrape the server -// listening on serverAddr. startCollector also attaches the provided labels + -// Github labels if available to the collected metrics. -func startCollector( - tb testing.TB, - log logging.Logger, - name string, - labels map[string]string, - serverAddr string, - networkUUID string, - dashboardPath string, -) { - r := require.New(tb) - - startPromCtx, cancel := context.WithTimeout(tb.Context(), tests.DefaultTimeout) - defer cancel() - - logger := tests.NewDefaultLogger("prometheus") - r.NoError(tmpnet.StartPrometheus(startPromCtx, logger)) - - var sdConfigFilePath string - tb.Cleanup(func() { - // Ensure a final metrics scrape. - // This default delay is set above the default scrape interval used by StartPrometheus. 
-		time.Sleep(tmpnet.NetworkShutdownDelay)
-
-		r.NoError(func() error {
-			if sdConfigFilePath != "" {
-				return os.Remove(sdConfigFilePath)
-			}
-			return nil
-		}(),
-		)
-
-		//nolint:usetesting // t.Context() is already canceled inside the cleanup function
-		checkMetricsCtx, cancel := context.WithTimeout(context.Background(), tests.DefaultTimeout)
-		defer cancel()
-		r.NoError(tmpnet.CheckMetricsExist(checkMetricsCtx, logger, networkUUID))
-	})
-
-	sdConfigFilePath, err := tmpnet.WritePrometheusSDConfig(name, tmpnet.SDConfig{
-		Targets: []string{serverAddr},
-		Labels:  labels,
-	}, true /* withGitHubLabels */)
-	r.NoError(err)
-
-	var (
-		grafanaURI = tmpnet.DefaultBaseGrafanaURI + dashboardPath
-		startTime  = strconv.FormatInt(time.Now().UnixMilli(), 10)
-	)
-
-	log.Info("metrics available via grafana",
-		zap.String(
-			"url",
-			tmpnet.NewGrafanaURI(networkUUID, startTime, "", grafanaURI),
-		),
-	)
-}
-
-type consensusMetrics struct {
-	lastAcceptedHeight prometheus.Gauge
-}
-
-// newConsensusMetrics creates a subset of the metrics from snowman consensus
-// [engine](../../snow/engine/snowman/metrics.go).
-//
-// The registry passed in is expected to be registered with the prefix
-// "avalanche_snowman" and the chain label (ex. chain="C") that would be handled
-// by the [chain manager](../../../chains/manager.go).
-func newConsensusMetrics(registry prometheus.Registerer) (*consensusMetrics, error) {
-	m := &consensusMetrics{
-		lastAcceptedHeight: prometheus.NewGauge(prometheus.GaugeOpts{
-			Name: "last_accepted_height",
-			Help: "last height accepted",
-		}),
-	}
-	if err := registry.Register(m.lastAcceptedHeight); err != nil {
-		return nil, fmt.Errorf("failed to register last accepted height metric: %w", err)
-	}
-	return m, nil
-}

From 7d17718052de75cb6b415f50e55de05dedcf0619 Mon Sep 17 00:00:00 2001
From: Rodrigo Villar
Date: Fri, 31 Oct 2025 14:57:52 -0400
Subject: [PATCH 04/15] chore: reduce diff

---
 tests/reexecute/c/vm_reexecute_test.go | 25 +++++++++++++------------
 1 file changed, 13 insertions(+), 12 deletions(-)

diff --git a/tests/reexecute/c/vm_reexecute_test.go b/tests/reexecute/c/vm_reexecute_test.go
index 6467deb57416..91f30357440c 100644
--- a/tests/reexecute/c/vm_reexecute_test.go
+++ b/tests/reexecute/c/vm_reexecute_test.go
@@ -169,15 +169,24 @@ func benchmarkReexecuteRange(
 	consensusRegistry := prometheus.NewRegistry()
 	r.NoError(prefixGatherer.Register("avalanche_snowman", consensusRegistry))
 
-	genesisConfig := genesis.GetConfig(constants.MainnetID)
-
 	log := tests.NewDefaultLogger("c-chain-reexecution")
-	dbLogger := tests.NewDefaultLogger("db")
 
 	var (
 		vmDBDir      = filepath.Join(currentStateDir, "db")
 		chainDataDir = filepath.Join(currentStateDir, "chain-data-dir")
 	)
+
+	log.Info("re-executing block range with params",
+		zap.String("block-dir", blockDir),
+		zap.String("vm-db-dir", vmDBDir),
+		zap.String("chain-data-dir", chainDataDir),
+		zap.Uint64("start-block", startBlock),
+		zap.Uint64("end-block", endBlock),
+		zap.Int("chan-size", chanSize),
+	)
+
+	dbLogger := tests.NewDefaultLogger("db")
+
 	db, err := leveldb.New(vmDBDir, nil, dbLogger, prometheus.NewRegistry())
 	r.NoError(err)
 	defer func() {
@@ -185,6 +194,7 @@ func benchmarkReexecuteRange(
 		r.NoError(db.Close())
 	}()
 
+	genesisConfig := genesis.GetConfig(constants.MainnetID)
 	vmParams := reexecute.VMParams{
 		GenesisBytes: []byte(genesisConfig.CChainGenesis),
 		ConfigBytes:  configBytes,
@@ -223,15 +233,6 @@ func benchmarkReexecuteRange(
 		DashboardPath:           "d/Gl1I20mnk/c-chain",
 	}
 
-	log.Info("re-executing block range with params",
-		
zap.String("block-dir", blockDir), - zap.String("vm-db-dir", vmDBDir), - zap.String("chain-data-dir", chainDataDir), - zap.Uint64("start-block", startBlock), - zap.Uint64("end-block", endBlock), - zap.Int("chan-size", chanSize), - ) - executor := reexecute.NewBenchmarkExecutor(config) start := time.Now() From 7dc426a9cdbb8bed0d2d327fd878b6c5e6254ca2 Mon Sep 17 00:00:00 2001 From: Rodrigo Villar Date: Fri, 31 Oct 2025 15:00:36 -0400 Subject: [PATCH 05/15] chore: reduce diff again --- tests/reexecute/c/vm_reexecute_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/reexecute/c/vm_reexecute_test.go b/tests/reexecute/c/vm_reexecute_test.go index 91f30357440c..adc8a0039257 100644 --- a/tests/reexecute/c/vm_reexecute_test.go +++ b/tests/reexecute/c/vm_reexecute_test.go @@ -232,7 +232,6 @@ func benchmarkReexecuteRange( NetworkUUID: networkUUID, DashboardPath: "d/Gl1I20mnk/c-chain", } - executor := reexecute.NewBenchmarkExecutor(config) start := time.Now() From 7d7c12a55bacf82af2be59106b116aa67d946c2b Mon Sep 17 00:00:00 2001 From: Rodrigo Villar Date: Fri, 31 Oct 2025 17:51:27 -0400 Subject: [PATCH 06/15] chore: delegate consensus metrics to BenchmarkExecutor --- tests/reexecute/benchmark_executor.go | 8 ++++---- tests/reexecute/c/vm_reexecute_test.go | 6 ------ 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/tests/reexecute/benchmark_executor.go b/tests/reexecute/benchmark_executor.go index dab795025581..3e2efe43e479 100644 --- a/tests/reexecute/benchmark_executor.go +++ b/tests/reexecute/benchmark_executor.go @@ -59,9 +59,6 @@ type BenchmarkExecutorConfig struct { // from (e.g. VM metrics, consensus metrics). If MetricsServerEnabled is // true, then the metrics server will export metrics from this gatherer. PrefixGatherer metrics.MultiGatherer - // ConsensusRegistry is the registry where a subset of the metrics from snowman consensus - // [engine](../../snow/engine/snowman/metrics.go) will be registered. - ConsensusRegistry prometheus.Registerer // MetricsServerEnabled determines whether to enable a Prometheus server // exporting VM metrics. @@ -126,10 +123,13 @@ func (e BenchmarkExecutor) Run(b testing.TB, log logging.Logger, vm block.ChainV ) r.NoError(err) + consensusRegistry := prometheus.NewRegistry() + r.NoError(e.config.PrefixGatherer.Register("avalanche_snowman", consensusRegistry)) + vmExecutor, err := newVMExecutor( tests.NewDefaultLogger("vm-executor"), vm, - e.config.ConsensusRegistry, + consensusRegistry, e.config.ExecutionTimeout, e.config.StartBlock, e.config.EndBlock, diff --git a/tests/reexecute/c/vm_reexecute_test.go b/tests/reexecute/c/vm_reexecute_test.go index adc8a0039257..e30649fe99d5 100644 --- a/tests/reexecute/c/vm_reexecute_test.go +++ b/tests/reexecute/c/vm_reexecute_test.go @@ -164,11 +164,6 @@ func benchmarkReexecuteRange( vmMultiGatherer := metrics.NewPrefixGatherer() r.NoError(prefixGatherer.Register("avalanche_evm", vmMultiGatherer)) - // consensusRegistry includes the chain="C" label and the prefix "avalanche_snowman". - // The consensus registry is passed to the executor to mimic a subset of consensus metrics. 
-	consensusRegistry := prometheus.NewRegistry()
-	r.NoError(prefixGatherer.Register("avalanche_snowman", consensusRegistry))
-
 	log := tests.NewDefaultLogger("c-chain-reexecution")
 
 	var (
@@ -223,7 +218,6 @@ func benchmarkReexecuteRange(
 		ChanSize:                chanSize,
 		ExecutionTimeout:        executionTimeout,
 		PrefixGatherer:          prefixGatherer,
-		ConsensusRegistry:       consensusRegistry,
 		MetricsServerEnabled:    metricsServerEnabled,
 		MetricsPort:             metricsPort,
 		MetricsCollectorEnabled: metricsCollectorEnabled,

From ab9674a9ad08d4fb4ae568165c80566622b9ba21 Mon Sep 17 00:00:00 2001
From: Rodrigo Villar
Date: Sun, 2 Nov 2025 12:22:54 -0500
Subject: [PATCH 07/15] chore: remove metrics from BenchmarkExecutor

---
 tests/reexecute/benchmark_executor.go  | 57 +++++---------------------
 tests/reexecute/c/vm_reexecute_test.go | 38 ++++++++++-------
 2 files changed, 34 insertions(+), 61 deletions(-)

diff --git a/tests/reexecute/benchmark_executor.go b/tests/reexecute/benchmark_executor.go
index 3e2efe43e479..1abbf7e43599 100644
--- a/tests/reexecute/benchmark_executor.go
+++ b/tests/reexecute/benchmark_executor.go
@@ -55,30 +55,9 @@ type BenchmarkExecutorConfig struct {
 	// terminating (without error). If 0, no timeout is applied.
 	ExecutionTimeout time.Duration
 
-	// PrefixGatherer is the top-most gatherer where all metrics can be derived
-	// from (e.g. VM metrics, consensus metrics). If MetricsServerEnabled is
-	// true, then the metrics server will export metrics from this gatherer.
+	// PrefixGatherer is the gatherer to which the benchmark's consensus
+	// metrics are attached.
 	PrefixGatherer metrics.MultiGatherer
-
-	// MetricsServerEnabled determines whether to enable a Prometheus server
-	// exporting VM metrics.
-	MetricsServerEnabled bool
-	// MetricsPort is the port on which the metrics server will listen.
-	MetricsPort uint64
-	// MetricsCollectorEnabled determines whether to start a Prometheus
-	// collector.
-	MetricsCollectorEnabled bool
-	// MetricsLabels represents the set of labels that will be attached to all
-	// exported metrics.
-	MetricsLabels map[string]string
-	// SDConfigName is the name of the SDConfig file.
-	SDConfigName string
-	// NetworkUUID is the unique identifier corresponding to the benchmark.
-	NetworkUUID string
-	// DashboardPath is the relative Grafana dashboard path used to construct
-	// the metrics visualization URL when MetricsCollectorEnabled is true.
-	// Expected format: "d/dashboard-id/dashboard-name" (e.g., "d/Gl1I20mnk/c-chain").
-	DashboardPath string
 }
 
 // BenchmarkExecutor is a tool for executing a sequence of blocks against a VM.
@@ -94,26 +73,10 @@ func NewBenchmarkExecutor(config BenchmarkExecutorConfig) BenchmarkExecutor {
 
 // Run executes a sequence of blocks from StartBlock to EndBlock against the
-// provided VM. It also manages metrics collection and reporting by optionally
-// starting a Prometheus server and collector based on the executor's
-// configuration.
+// provided VM, streaming blocks from the configured BlockDir and recording
+// consensus metrics on the configured PrefixGatherer while executing the
+// sequence.
-func (e BenchmarkExecutor) Run(b testing.TB, log logging.Logger, vm block.ChainVM) { +func (e BenchmarkExecutor) Run(b testing.TB, vm block.ChainVM) { r := require.New(b) ctx := b.Context() - if e.config.MetricsServerEnabled { - serverAddr := startServer(b, log, e.config.PrefixGatherer, e.config.MetricsPort) - - if e.config.MetricsCollectorEnabled { - startCollector( - b, - log, - e.config.SDConfigName, - e.config.MetricsLabels, - serverAddr, - e.config.NetworkUUID, - e.config.DashboardPath, - ) - } - } - blockChan, err := createBlockChanFromLevelDB( b, e.config.BlockDir, @@ -228,9 +191,9 @@ func NewMainnetVM( return vm, nil } -// startServer starts a Prometheus server for the provided gatherer and returns +// StartServer starts a Prometheus server for the provided gatherer and returns // the server address. -func startServer( +func StartServer( tb testing.TB, log logging.Logger, gatherer prometheus.Gatherer, @@ -252,13 +215,13 @@ func startServer( return server.Address() } -// startCollector starts a Prometheus collector configured to scrape the server -// listening on serverAddr. startCollector also attaches the provided labels + +// StartCollector starts a Prometheus collector configured to scrape the server +// listening on serverAddr. StartCollector also attaches the provided labels + // Github labels if available to the collected metrics. -func startCollector( +func StartCollector( tb testing.TB, log logging.Logger, - name string, + sdConfigName string, labels map[string]string, serverAddr string, networkUUID string, @@ -292,7 +255,7 @@ func startCollector( r.NoError(tmpnet.CheckMetricsExist(checkMetricsCtx, logger, networkUUID)) }) - sdConfigFilePath, err := tmpnet.WritePrometheusSDConfig(name, tmpnet.SDConfig{ + sdConfigFilePath, err := tmpnet.WritePrometheusSDConfig(sdConfigName, tmpnet.SDConfig{ Targets: []string{serverAddr}, Labels: labels, }, true /* withGitHubLabels */) diff --git a/tests/reexecute/c/vm_reexecute_test.go b/tests/reexecute/c/vm_reexecute_test.go index e30649fe99d5..6d3ec1c354ad 100644 --- a/tests/reexecute/c/vm_reexecute_test.go +++ b/tests/reexecute/c/vm_reexecute_test.go @@ -166,6 +166,23 @@ func benchmarkReexecuteRange( log := tests.NewDefaultLogger("c-chain-reexecution") + if metricsServerEnabled { + serverAddr := reexecute.StartServer(b, log, prefixGatherer, metricsPort) + + if metricsCollectorEnabled { + dashboardPath := "d/Gl1I20mnk/c-chain" + reexecute.StartCollector( + b, + log, + "c-chain-reexecution", + labels, + serverAddr, + networkUUID, + dashboardPath, + ) + } + } + var ( vmDBDir = filepath.Join(currentStateDir, "db") chainDataDir = filepath.Join(currentStateDir, "chain-data-dir") @@ -212,24 +229,17 @@ func benchmarkReexecuteRange( }() config := reexecute.BenchmarkExecutorConfig{ - BlockDir: blockDir, - StartBlock: startBlock, - EndBlock: endBlock, - ChanSize: chanSize, - ExecutionTimeout: executionTimeout, - PrefixGatherer: prefixGatherer, - MetricsServerEnabled: metricsServerEnabled, - MetricsPort: metricsPort, - MetricsCollectorEnabled: metricsCollectorEnabled, - MetricsLabels: labels, - SDConfigName: "c-chain-reexecution", - NetworkUUID: networkUUID, - DashboardPath: "d/Gl1I20mnk/c-chain", + BlockDir: blockDir, + StartBlock: startBlock, + EndBlock: endBlock, + ChanSize: chanSize, + ExecutionTimeout: executionTimeout, + PrefixGatherer: prefixGatherer, } executor := reexecute.NewBenchmarkExecutor(config) start := time.Now() - executor.Run(b, log, vm) + executor.Run(b, vm) elapsed := time.Since(start) b.ReportMetric(0, "ns/op") // Set default 
ns/op to 0 to hide from the output From 2fde997a4e220cc67cd0aa18e9753ffca1fb05bc Mon Sep 17 00:00:00 2001 From: Rodrigo Villar Date: Sun, 2 Nov 2025 13:04:33 -0500 Subject: [PATCH 08/15] chore: unify executors --- tests/reexecute/benchmark_executor.go | 298 ------------------------- tests/reexecute/c/vm_reexecute_test.go | 21 +- tests/reexecute/vm_executor.go | 266 ++++++++++++++++++++-- 3 files changed, 261 insertions(+), 324 deletions(-) delete mode 100644 tests/reexecute/benchmark_executor.go diff --git a/tests/reexecute/benchmark_executor.go b/tests/reexecute/benchmark_executor.go deleted file mode 100644 index 1abbf7e43599..000000000000 --- a/tests/reexecute/benchmark_executor.go +++ /dev/null @@ -1,298 +0,0 @@ -// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package reexecute - -import ( - "context" - "fmt" - "os" - "strconv" - "testing" - "time" - - "github.com/prometheus/client_golang/prometheus" - "github.com/stretchr/testify/require" - "go.uber.org/zap" - - "github.com/ava-labs/avalanchego/api/metrics" - "github.com/ava-labs/avalanchego/chains/atomic" - "github.com/ava-labs/avalanchego/database" - "github.com/ava-labs/avalanchego/database/prefixdb" - "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/snow" - "github.com/ava-labs/avalanchego/snow/engine/enginetest" - "github.com/ava-labs/avalanchego/snow/engine/snowman/block" - "github.com/ava-labs/avalanchego/snow/validators/validatorstest" - "github.com/ava-labs/avalanchego/tests" - "github.com/ava-labs/avalanchego/tests/fixture/tmpnet" - "github.com/ava-labs/avalanchego/upgrade" - "github.com/ava-labs/avalanchego/utils/constants" - "github.com/ava-labs/avalanchego/utils/crypto/bls/signer/localsigner" - "github.com/ava-labs/avalanchego/utils/logging" - "github.com/ava-labs/avalanchego/vms" - "github.com/ava-labs/avalanchego/vms/platformvm/warp" -) - -var ( - MainnetCChainID = ids.FromStringOrPanic("2q9e4r6Mu3U68nU1fYjgbR6JvwrRx36CohpAX5UQxse55x1Q5") - - mainnetXChainID = ids.FromStringOrPanic("2oYMBNV4eNHyqk2fjjV5nVQLDbtmNJzq5s3qs3Lo6ftnC6FByM") - mainnetAvaxAssetID = ids.FromStringOrPanic("FvwEAhmxKfeiG8SnEvq42hc6whRyY3EFYAvebMqDNDGCgxN5Z") -) - -type BenchmarkExecutorConfig struct { - // The directory where blocks will be sourced from. - BlockDir string - - // StartBlock is the height of the first block that will be executed. - StartBlock uint64 - // EndBlock is the height of the last block that will be executed. - EndBlock uint64 - // ChanSize is the size of the channel to use for block processing. - ChanSize int - // ExecutionTimeout is the duration to run the reexecution test for before - // termininating (without error). If 0, no timeout is applied. - ExecutionTimeout time.Duration - - // PrefixGatherer is the gatherer where consensus metrics from the benchmark - // test will be attached to. - PrefixGatherer metrics.MultiGatherer -} - -// BenchmarkExecutor is a tool for executing a sequence of blocks against a VM. -type BenchmarkExecutor struct { - config BenchmarkExecutorConfig -} - -func NewBenchmarkExecutor(config BenchmarkExecutorConfig) BenchmarkExecutor { - return BenchmarkExecutor{config: config} -} - -// Run executes a sequence of blocks from StartBlock to EndBlock against the -// provided VM. It also manages metrics collection and reporting by optionally -// starting a Prometheus server and collector based on the executor's -// configuration. 
-func (e BenchmarkExecutor) Run(b testing.TB, vm block.ChainVM) { - r := require.New(b) - ctx := b.Context() - - blockChan, err := createBlockChanFromLevelDB( - b, - e.config.BlockDir, - e.config.StartBlock, - e.config.EndBlock, - e.config.ChanSize, - ) - r.NoError(err) - - consensusRegistry := prometheus.NewRegistry() - r.NoError(e.config.PrefixGatherer.Register("avalanche_snowman", consensusRegistry)) - - vmExecutor, err := newVMExecutor( - tests.NewDefaultLogger("vm-executor"), - vm, - consensusRegistry, - e.config.ExecutionTimeout, - e.config.StartBlock, - e.config.EndBlock, - ) - r.NoError(err) - - r.NoError(vmExecutor.executeSequence(ctx, blockChan)) -} - -type VMParams struct { - GenesisBytes []byte - UpgradeBytes []byte - ConfigBytes []byte - SubnetID ids.ID - ChainID ids.ID -} - -// NewMainnetVM creates and initializes a VM configured for mainnet block -// reexecution tests. The VM is initialized with mainnet-specific settings -// including the mainnet network ID, upgrade schedule, and chain configurations. -// Both subnetID and chainID must correspond to subnets/chains that exist on mainnet. -func NewMainnetVM( - ctx context.Context, - factory vms.Factory, - db database.Database, - chainDataDir string, - metricsGatherer metrics.MultiGatherer, - vmParams VMParams, -) (block.ChainVM, error) { - vmIntf, err := factory.New(logging.NoLog{}) - if err != nil { - return nil, fmt.Errorf("failed to create VM from factory: %w", err) - } - vm := vmIntf.(block.ChainVM) - - blsKey, err := localsigner.New() - if err != nil { - return nil, fmt.Errorf("failed to create BLS key: %w", err) - } - - blsPublicKey := blsKey.PublicKey() - warpSigner := warp.NewSigner(blsKey, constants.MainnetID, vmParams.ChainID) - - sharedMemoryDB := prefixdb.New([]byte("sharedmemory"), db) - atomicMemory := atomic.NewMemory(sharedMemoryDB) - - chainIDToSubnetID := map[ids.ID]ids.ID{ - mainnetXChainID: constants.PrimaryNetworkID, - MainnetCChainID: constants.PrimaryNetworkID, - vmParams.ChainID: vmParams.SubnetID, - ids.Empty: constants.PrimaryNetworkID, - } - - if err := vm.Initialize( - ctx, - &snow.Context{ - NetworkID: constants.MainnetID, - SubnetID: vmParams.SubnetID, - ChainID: vmParams.ChainID, - NodeID: ids.GenerateTestNodeID(), - PublicKey: blsPublicKey, - NetworkUpgrades: upgrade.Mainnet, - - XChainID: mainnetXChainID, - CChainID: MainnetCChainID, - AVAXAssetID: mainnetAvaxAssetID, - - Log: tests.NewDefaultLogger("mainnet-vm-reexecution"), - SharedMemory: atomicMemory.NewSharedMemory(vmParams.ChainID), - BCLookup: ids.NewAliaser(), - Metrics: metricsGatherer, - - WarpSigner: warpSigner, - - ValidatorState: &validatorstest.State{ - GetSubnetIDF: func(_ context.Context, chainID ids.ID) (ids.ID, error) { - subnetID, ok := chainIDToSubnetID[chainID] - if ok { - return subnetID, nil - } - return ids.Empty, fmt.Errorf("unknown chainID: %s", chainID) - }, - }, - ChainDataDir: chainDataDir, - }, - prefixdb.New([]byte("vm"), db), - vmParams.GenesisBytes, - vmParams.UpgradeBytes, - vmParams.ConfigBytes, - nil, - &enginetest.Sender{}, - ); err != nil { - return nil, fmt.Errorf("failed to initialize VM: %w", err) - } - - return vm, nil -} - -// StartServer starts a Prometheus server for the provided gatherer and returns -// the server address. 
-func StartServer( - tb testing.TB, - log logging.Logger, - gatherer prometheus.Gatherer, - port uint64, -) string { - r := require.New(tb) - - server, err := tests.NewPrometheusServerWithPort(gatherer, port) - r.NoError(err) - - log.Info("metrics endpoint available", - zap.String("url", fmt.Sprintf("http://%s/ext/metrics", server.Address())), - ) - - tb.Cleanup(func() { - r.NoError(server.Stop()) - }) - - return server.Address() -} - -// StartCollector starts a Prometheus collector configured to scrape the server -// listening on serverAddr. StartCollector also attaches the provided labels + -// Github labels if available to the collected metrics. -func StartCollector( - tb testing.TB, - log logging.Logger, - sdConfigName string, - labels map[string]string, - serverAddr string, - networkUUID string, - dashboardPath string, -) { - r := require.New(tb) - - startPromCtx, cancel := context.WithTimeout(tb.Context(), tests.DefaultTimeout) - defer cancel() - - logger := tests.NewDefaultLogger("prometheus") - r.NoError(tmpnet.StartPrometheus(startPromCtx, logger)) - - var sdConfigFilePath string - tb.Cleanup(func() { - // Ensure a final metrics scrape. - // This default delay is set above the default scrape interval used by StartPrometheus. - time.Sleep(tmpnet.NetworkShutdownDelay) - - r.NoError(func() error { - if sdConfigFilePath != "" { - return os.Remove(sdConfigFilePath) - } - return nil - }(), - ) - - //nolint:usetesting // t.Context() is already canceled inside the cleanup function - checkMetricsCtx, cancel := context.WithTimeout(context.Background(), tests.DefaultTimeout) - defer cancel() - r.NoError(tmpnet.CheckMetricsExist(checkMetricsCtx, logger, networkUUID)) - }) - - sdConfigFilePath, err := tmpnet.WritePrometheusSDConfig(sdConfigName, tmpnet.SDConfig{ - Targets: []string{serverAddr}, - Labels: labels, - }, true /* withGitHubLabels */) - r.NoError(err) - - var ( - grafanaURI = tmpnet.DefaultBaseGrafanaURI + dashboardPath - startTime = strconv.FormatInt(time.Now().UnixMilli(), 10) - ) - - log.Info("metrics available via grafana", - zap.String( - "url", - tmpnet.NewGrafanaURI(networkUUID, startTime, "", grafanaURI), - ), - ) -} - -type consensusMetrics struct { - lastAcceptedHeight prometheus.Gauge -} - -// newConsensusMetrics creates a subset of the metrics from snowman consensus -// [engine](../../snow/engine/snowman/metrics.go). -// -// The registry passed in is expected to be registered with the prefix -// "avalanche_snowman" and the chain label (ex. chain="C") that would be handled -// by the[chain manager](../../../chains/manager.go). 
-func newConsensusMetrics(registry prometheus.Registerer) (*consensusMetrics, error) { - m := &consensusMetrics{ - lastAcceptedHeight: prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "last_accepted_height", - Help: "last height accepted", - }), - } - if err := registry.Register(m.lastAcceptedHeight); err != nil { - return nil, fmt.Errorf("failed to register last accepted height metric: %w", err) - } - return m, nil -} diff --git a/tests/reexecute/c/vm_reexecute_test.go b/tests/reexecute/c/vm_reexecute_test.go index 6d3ec1c354ad..44639fe02ba8 100644 --- a/tests/reexecute/c/vm_reexecute_test.go +++ b/tests/reexecute/c/vm_reexecute_test.go @@ -228,18 +228,19 @@ func benchmarkReexecuteRange( r.NoError(vm.Shutdown(ctx)) }() - config := reexecute.BenchmarkExecutorConfig{ - BlockDir: blockDir, - StartBlock: startBlock, - EndBlock: endBlock, - ChanSize: chanSize, - ExecutionTimeout: executionTimeout, - PrefixGatherer: prefixGatherer, - } - executor := reexecute.NewBenchmarkExecutor(config) + executor := reexecute.NewVMExecutor( + b, + vm, + blockDir, + startBlock, + endBlock, + chanSize, + executionTimeout, + prefixGatherer, + ) start := time.Now() - executor.Run(b, vm) + r.NoError(executor.Run(ctx)) elapsed := time.Since(start) b.ReportMetric(0, "ns/op") // Set default ns/op to 0 to hide from the output diff --git a/tests/reexecute/vm_executor.go b/tests/reexecute/vm_executor.go index 1f609bd6c01e..6c3c6c40d7a9 100644 --- a/tests/reexecute/vm_executor.go +++ b/tests/reexecute/vm_executor.go @@ -7,6 +7,8 @@ import ( "context" "encoding/binary" "fmt" + "os" + "strconv" "testing" "time" @@ -14,51 +16,88 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" + "github.com/ava-labs/avalanchego/api/metrics" + "github.com/ava-labs/avalanchego/chains/atomic" "github.com/ava-labs/avalanchego/database" "github.com/ava-labs/avalanchego/database/leveldb" + "github.com/ava-labs/avalanchego/database/prefixdb" + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/snow" + "github.com/ava-labs/avalanchego/snow/engine/enginetest" "github.com/ava-labs/avalanchego/snow/engine/snowman/block" + "github.com/ava-labs/avalanchego/snow/validators/validatorstest" + "github.com/ava-labs/avalanchego/tests" + "github.com/ava-labs/avalanchego/tests/fixture/tmpnet" + "github.com/ava-labs/avalanchego/upgrade" + "github.com/ava-labs/avalanchego/utils/constants" + "github.com/ava-labs/avalanchego/utils/crypto/bls/signer/localsigner" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/timer" + "github.com/ava-labs/avalanchego/vms" + "github.com/ava-labs/avalanchego/vms/platformvm/warp" ) -type vmExecutor struct { +var ( + MainnetCChainID = ids.FromStringOrPanic("2q9e4r6Mu3U68nU1fYjgbR6JvwrRx36CohpAX5UQxse55x1Q5") + + mainnetXChainID = ids.FromStringOrPanic("2oYMBNV4eNHyqk2fjjV5nVQLDbtmNJzq5s3qs3Lo6ftnC6FByM") + mainnetAvaxAssetID = ids.FromStringOrPanic("FvwEAhmxKfeiG8SnEvq42hc6whRyY3EFYAvebMqDNDGCgxN5Z") +) + +type VMExecutor struct { log logging.Logger vm block.ChainVM metrics *consensusMetrics executionTimeout time.Duration startBlock uint64 endBlock uint64 + blkChan <-chan blockResult etaTracker *timer.EtaTracker } -func newVMExecutor( - log logging.Logger, +func NewVMExecutor( + tb testing.TB, vm block.ChainVM, - registry prometheus.Registerer, - executionTimeout time.Duration, + blockDir string, startBlock uint64, endBlock uint64, -) (*vmExecutor, error) { - metrics, err := newConsensusMetrics(registry) - if err != nil { - return nil, fmt.Errorf("failed to 
create consensus metrics: %w", err) - } + chanSize int, + executionTimeout time.Duration, + prefixGatherer metrics.MultiGatherer, +) *VMExecutor { + r := require.New(tb) - return &vmExecutor{ - log: log, + blockChan, err := createBlockChanFromLevelDB( + tb, + blockDir, + startBlock, + endBlock, + chanSize, + ) + r.NoError(err) + + consensusRegistry := prometheus.NewRegistry() + r.NoError(prefixGatherer.Register("avalanche_snowman", consensusRegistry)) + + metrics, err := newConsensusMetrics(consensusRegistry) + r.NoError(err) + + return &VMExecutor{ + log: tests.NewDefaultLogger("vm-executor"), vm: vm, metrics: metrics, executionTimeout: executionTimeout, startBlock: startBlock, endBlock: endBlock, + blkChan: blockChan, // ETA tracker uses a 10-sample moving window to smooth rate estimates, // and a 1.2 slowdown factor to slightly pad ETA early in the run, // tapering to 1.0 as progress approaches 100%. etaTracker: timer.NewEtaTracker(10, 1.2), - }, nil + } } -func (e *vmExecutor) execute(ctx context.Context, blockBytes []byte) error { +func (e *VMExecutor) execute(ctx context.Context, blockBytes []byte) error { blk, err := e.vm.ParseBlock(ctx, blockBytes) if err != nil { return fmt.Errorf("failed to parse block: %w", err) @@ -75,7 +114,7 @@ func (e *vmExecutor) execute(ctx context.Context, blockBytes []byte) error { return nil } -func (e *vmExecutor) executeSequence(ctx context.Context, blkChan <-chan blockResult) error { +func (e *VMExecutor) Run(ctx context.Context) error { blkID, err := e.vm.LastAccepted(ctx) if err != nil { return fmt.Errorf("failed to get last accepted block: %w", err) @@ -101,7 +140,7 @@ func (e *vmExecutor) executeSequence(ctx context.Context, blkChan <-chan blockRe defer cancel() } - for blkResult := range blkChan { + for blkResult := range e.blkChan { if blkResult.err != nil { return blkResult.err } @@ -140,6 +179,178 @@ func (e *vmExecutor) executeSequence(ctx context.Context, blkChan <-chan blockRe return nil } +type VMParams struct { + GenesisBytes []byte + UpgradeBytes []byte + ConfigBytes []byte + SubnetID ids.ID + ChainID ids.ID +} + +// NewMainnetVM creates and initializes a VM configured for mainnet block +// reexecution tests. The VM is initialized with mainnet-specific settings +// including the mainnet network ID, upgrade schedule, and chain configurations. +// Both subnetID and chainID must correspond to subnets/chains that exist on mainnet. 
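Before the VM construction helper below, a usage sketch of the unified executor, condensed from the vm_reexecute_test.go hunk above; b, r (require), ctx, vm, and the benchmark parameters are assumed to be in scope:

    executor := reexecute.NewVMExecutor(
        b, vm,
        blockDir,
        startBlock, endBlock,
        chanSize,
        executionTimeout,
        prefixGatherer,
    )
    start := time.Now()
    r.NoError(executor.Run(ctx))
    elapsed := time.Since(start) // used later by the test's custom reporting (not shown in this hunk)

    b.ReportMetric(0, "ns/op") // zero the default metric to hide it, as in the test
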
+func NewMainnetVM( + ctx context.Context, + factory vms.Factory, + db database.Database, + chainDataDir string, + metricsGatherer metrics.MultiGatherer, + vmParams VMParams, +) (block.ChainVM, error) { + vmIntf, err := factory.New(logging.NoLog{}) + if err != nil { + return nil, fmt.Errorf("failed to create VM from factory: %w", err) + } + vm := vmIntf.(block.ChainVM) + + blsKey, err := localsigner.New() + if err != nil { + return nil, fmt.Errorf("failed to create BLS key: %w", err) + } + + blsPublicKey := blsKey.PublicKey() + warpSigner := warp.NewSigner(blsKey, constants.MainnetID, vmParams.ChainID) + + sharedMemoryDB := prefixdb.New([]byte("sharedmemory"), db) + atomicMemory := atomic.NewMemory(sharedMemoryDB) + + chainIDToSubnetID := map[ids.ID]ids.ID{ + mainnetXChainID: constants.PrimaryNetworkID, + MainnetCChainID: constants.PrimaryNetworkID, + vmParams.ChainID: vmParams.SubnetID, + ids.Empty: constants.PrimaryNetworkID, + } + + if err := vm.Initialize( + ctx, + &snow.Context{ + NetworkID: constants.MainnetID, + SubnetID: vmParams.SubnetID, + ChainID: vmParams.ChainID, + NodeID: ids.GenerateTestNodeID(), + PublicKey: blsPublicKey, + NetworkUpgrades: upgrade.Mainnet, + + XChainID: mainnetXChainID, + CChainID: MainnetCChainID, + AVAXAssetID: mainnetAvaxAssetID, + + Log: tests.NewDefaultLogger("mainnet-vm-reexecution"), + SharedMemory: atomicMemory.NewSharedMemory(vmParams.ChainID), + BCLookup: ids.NewAliaser(), + Metrics: metricsGatherer, + + WarpSigner: warpSigner, + + ValidatorState: &validatorstest.State{ + GetSubnetIDF: func(_ context.Context, chainID ids.ID) (ids.ID, error) { + subnetID, ok := chainIDToSubnetID[chainID] + if ok { + return subnetID, nil + } + return ids.Empty, fmt.Errorf("unknown chainID: %s", chainID) + }, + }, + ChainDataDir: chainDataDir, + }, + prefixdb.New([]byte("vm"), db), + vmParams.GenesisBytes, + vmParams.UpgradeBytes, + vmParams.ConfigBytes, + nil, + &enginetest.Sender{}, + ); err != nil { + return nil, fmt.Errorf("failed to initialize VM: %w", err) + } + + return vm, nil +} + +// StartServer starts a Prometheus server for the provided gatherer and returns +// the server address. +func StartServer( + tb testing.TB, + log logging.Logger, + gatherer prometheus.Gatherer, + port uint64, +) string { + r := require.New(tb) + + server, err := tests.NewPrometheusServerWithPort(gatherer, port) + r.NoError(err) + + log.Info("metrics endpoint available", + zap.String("url", fmt.Sprintf("http://%s/ext/metrics", server.Address())), + ) + + tb.Cleanup(func() { + r.NoError(server.Stop()) + }) + + return server.Address() +} + +// StartCollector starts a Prometheus collector configured to scrape the server +// listening on serverAddr. StartCollector also attaches the provided labels + +// Github labels if available to the collected metrics. +func StartCollector( + tb testing.TB, + log logging.Logger, + sdConfigName string, + labels map[string]string, + serverAddr string, + networkUUID string, + dashboardPath string, +) { + r := require.New(tb) + + startPromCtx, cancel := context.WithTimeout(tb.Context(), tests.DefaultTimeout) + defer cancel() + + logger := tests.NewDefaultLogger("prometheus") + r.NoError(tmpnet.StartPrometheus(startPromCtx, logger)) + + var sdConfigFilePath string + tb.Cleanup(func() { + // Ensure a final metrics scrape. + // This default delay is set above the default scrape interval used by StartPrometheus. 
+ time.Sleep(tmpnet.NetworkShutdownDelay) + + r.NoError(func() error { + if sdConfigFilePath != "" { + return os.Remove(sdConfigFilePath) + } + return nil + }(), + ) + + //nolint:usetesting // t.Context() is already canceled inside the cleanup function + checkMetricsCtx, cancel := context.WithTimeout(context.Background(), tests.DefaultTimeout) + defer cancel() + r.NoError(tmpnet.CheckMetricsExist(checkMetricsCtx, logger, networkUUID)) + }) + + sdConfigFilePath, err := tmpnet.WritePrometheusSDConfig(sdConfigName, tmpnet.SDConfig{ + Targets: []string{serverAddr}, + Labels: labels, + }, true /* withGitHubLabels */) + r.NoError(err) + + var ( + grafanaURI = tmpnet.DefaultBaseGrafanaURI + dashboardPath + startTime = strconv.FormatInt(time.Now().UnixMilli(), 10) + ) + + log.Info("metrics available via grafana", + zap.String( + "url", + tmpnet.NewGrafanaURI(networkUUID, startTime, "", grafanaURI), + ), + ) +} + type blockResult struct { blockBytes []byte height uint64 @@ -207,3 +418,26 @@ func createBlockChanFromLevelDB(tb testing.TB, sourceDir string, startBlock, end func blockKey(height uint64) []byte { return binary.BigEndian.AppendUint64(nil, height) } + +type consensusMetrics struct { + lastAcceptedHeight prometheus.Gauge +} + +// newConsensusMetrics creates a subset of the metrics from snowman consensus +// [engine](../../snow/engine/snowman/metrics.go). +// +// The registry passed in is expected to be registered with the prefix +// "avalanche_snowman" and the chain label (ex. chain="C") that would be handled +// by the[chain manager](../../../chains/manager.go). +func newConsensusMetrics(registry prometheus.Registerer) (*consensusMetrics, error) { + m := &consensusMetrics{ + lastAcceptedHeight: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "last_accepted_height", + Help: "last height accepted", + }), + } + if err := registry.Register(m.lastAcceptedHeight); err != nil { + return nil, fmt.Errorf("failed to register last accepted height metric: %w", err) + } + return m, nil +} From f7bd20e3aea5e449cab32f5d5d08474f4db9ae04 Mon Sep 17 00:00:00 2001 From: Rodrigo Villar Date: Sun, 2 Nov 2025 13:08:54 -0500 Subject: [PATCH 09/15] chore: unnuke metrics.go --- tests/reexecute/metrics.go | 104 +++++++++++++++++++++++++++++++++ tests/reexecute/vm_executor.go | 86 --------------------------- 2 files changed, 104 insertions(+), 86 deletions(-) create mode 100644 tests/reexecute/metrics.go diff --git a/tests/reexecute/metrics.go b/tests/reexecute/metrics.go new file mode 100644 index 000000000000..c62dc93e02f8 --- /dev/null +++ b/tests/reexecute/metrics.go @@ -0,0 +1,104 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package reexecute + +import ( + "context" + "fmt" + "os" + "strconv" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/ava-labs/avalanchego/tests" + "github.com/ava-labs/avalanchego/tests/fixture/tmpnet" + "github.com/ava-labs/avalanchego/utils/logging" +) + +// StartServer starts a Prometheus server for the provided gatherer and returns +// the server address. 
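StartServer exports whatever has been registered on the provided gatherer, which for these tests includes the consensus gauge wired up in vm_executor.go. A sketch of the path from registration to scrape, assuming the prefix gatherer joins the prefix and metric name with an underscore (so the exported series would be avalanche_snowman_last_accepted_height):

    consensusRegistry := prometheus.NewRegistry()
    r.NoError(prefixGatherer.Register("avalanche_snowman", consensusRegistry))

    m, err := newConsensusMetrics(consensusRegistry)
    r.NoError(err)

    m.lastAcceptedHeight.Set(float64(blk.Height())) // as in VMExecutor.execute; visible at http://<serverAddr>/ext/metrics
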
+func StartServer( + tb testing.TB, + log logging.Logger, + gatherer prometheus.Gatherer, + port uint64, +) string { + r := require.New(tb) + + server, err := tests.NewPrometheusServerWithPort(gatherer, port) + r.NoError(err) + + log.Info("metrics endpoint available", + zap.String("url", fmt.Sprintf("http://%s/ext/metrics", server.Address())), + ) + + tb.Cleanup(func() { + r.NoError(server.Stop()) + }) + + return server.Address() +} + +// StartCollector starts a Prometheus collector configured to scrape the server +// listening on serverAddr. StartCollector also attaches the provided labels + +// Github labels if available to the collected metrics. +func StartCollector( + tb testing.TB, + log logging.Logger, + sdConfigName string, + labels map[string]string, + serverAddr string, + networkUUID string, + dashboardPath string, +) { + r := require.New(tb) + + startPromCtx, cancel := context.WithTimeout(tb.Context(), tests.DefaultTimeout) + defer cancel() + + logger := tests.NewDefaultLogger("prometheus") + r.NoError(tmpnet.StartPrometheus(startPromCtx, logger)) + + var sdConfigFilePath string + tb.Cleanup(func() { + // Ensure a final metrics scrape. + // This default delay is set above the default scrape interval used by StartPrometheus. + time.Sleep(tmpnet.NetworkShutdownDelay) + + r.NoError(func() error { + if sdConfigFilePath != "" { + return os.Remove(sdConfigFilePath) + } + return nil + }(), + ) + + //nolint:usetesting // t.Context() is already canceled inside the cleanup function + checkMetricsCtx, cancel := context.WithTimeout(context.Background(), tests.DefaultTimeout) + defer cancel() + r.NoError(tmpnet.CheckMetricsExist(checkMetricsCtx, logger, networkUUID)) + }) + + sdConfigFilePath, err := tmpnet.WritePrometheusSDConfig(sdConfigName, tmpnet.SDConfig{ + Targets: []string{serverAddr}, + Labels: labels, + }, true /* withGitHubLabels */) + r.NoError(err) + + var ( + grafanaURI = tmpnet.DefaultBaseGrafanaURI + dashboardPath + startTime = strconv.FormatInt(time.Now().UnixMilli(), 10) + ) + + log.Info("metrics available via grafana", + zap.String( + "url", + tmpnet.NewGrafanaURI(networkUUID, startTime, "", grafanaURI), + ), + ) +} diff --git a/tests/reexecute/vm_executor.go b/tests/reexecute/vm_executor.go index 6c3c6c40d7a9..41e57df50bcc 100644 --- a/tests/reexecute/vm_executor.go +++ b/tests/reexecute/vm_executor.go @@ -7,8 +7,6 @@ import ( "context" "encoding/binary" "fmt" - "os" - "strconv" "testing" "time" @@ -27,7 +25,6 @@ import ( "github.com/ava-labs/avalanchego/snow/engine/snowman/block" "github.com/ava-labs/avalanchego/snow/validators/validatorstest" "github.com/ava-labs/avalanchego/tests" - "github.com/ava-labs/avalanchego/tests/fixture/tmpnet" "github.com/ava-labs/avalanchego/upgrade" "github.com/ava-labs/avalanchego/utils/constants" "github.com/ava-labs/avalanchego/utils/crypto/bls/signer/localsigner" @@ -268,89 +265,6 @@ func NewMainnetVM( return vm, nil } -// StartServer starts a Prometheus server for the provided gatherer and returns -// the server address. 
-func StartServer( - tb testing.TB, - log logging.Logger, - gatherer prometheus.Gatherer, - port uint64, -) string { - r := require.New(tb) - - server, err := tests.NewPrometheusServerWithPort(gatherer, port) - r.NoError(err) - - log.Info("metrics endpoint available", - zap.String("url", fmt.Sprintf("http://%s/ext/metrics", server.Address())), - ) - - tb.Cleanup(func() { - r.NoError(server.Stop()) - }) - - return server.Address() -} - -// StartCollector starts a Prometheus collector configured to scrape the server -// listening on serverAddr. StartCollector also attaches the provided labels + -// Github labels if available to the collected metrics. -func StartCollector( - tb testing.TB, - log logging.Logger, - sdConfigName string, - labels map[string]string, - serverAddr string, - networkUUID string, - dashboardPath string, -) { - r := require.New(tb) - - startPromCtx, cancel := context.WithTimeout(tb.Context(), tests.DefaultTimeout) - defer cancel() - - logger := tests.NewDefaultLogger("prometheus") - r.NoError(tmpnet.StartPrometheus(startPromCtx, logger)) - - var sdConfigFilePath string - tb.Cleanup(func() { - // Ensure a final metrics scrape. - // This default delay is set above the default scrape interval used by StartPrometheus. - time.Sleep(tmpnet.NetworkShutdownDelay) - - r.NoError(func() error { - if sdConfigFilePath != "" { - return os.Remove(sdConfigFilePath) - } - return nil - }(), - ) - - //nolint:usetesting // t.Context() is already canceled inside the cleanup function - checkMetricsCtx, cancel := context.WithTimeout(context.Background(), tests.DefaultTimeout) - defer cancel() - r.NoError(tmpnet.CheckMetricsExist(checkMetricsCtx, logger, networkUUID)) - }) - - sdConfigFilePath, err := tmpnet.WritePrometheusSDConfig(sdConfigName, tmpnet.SDConfig{ - Targets: []string{serverAddr}, - Labels: labels, - }, true /* withGitHubLabels */) - r.NoError(err) - - var ( - grafanaURI = tmpnet.DefaultBaseGrafanaURI + dashboardPath - startTime = strconv.FormatInt(time.Now().UnixMilli(), 10) - ) - - log.Info("metrics available via grafana", - zap.String( - "url", - tmpnet.NewGrafanaURI(networkUUID, startTime, "", grafanaURI), - ), - ) -} - type blockResult struct { blockBytes []byte height uint64 From d0872e6db3c3ea1419771dc58bf9f7c23c6087c9 Mon Sep 17 00:00:00 2001 From: Rodrigo Villar Date: Sun, 2 Nov 2025 13:10:11 -0500 Subject: [PATCH 10/15] chore: exported methods go first --- tests/reexecute/vm_executor.go | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/tests/reexecute/vm_executor.go b/tests/reexecute/vm_executor.go index 41e57df50bcc..bdb160ce1f99 100644 --- a/tests/reexecute/vm_executor.go +++ b/tests/reexecute/vm_executor.go @@ -94,23 +94,6 @@ func NewVMExecutor( } } -func (e *VMExecutor) execute(ctx context.Context, blockBytes []byte) error { - blk, err := e.vm.ParseBlock(ctx, blockBytes) - if err != nil { - return fmt.Errorf("failed to parse block: %w", err) - } - if err := blk.Verify(ctx); err != nil { - return fmt.Errorf("failed to verify block %s at height %d: %w", blk.ID(), blk.Height(), err) - } - - if err := blk.Accept(ctx); err != nil { - return fmt.Errorf("failed to accept block %s at height %d: %w", blk.ID(), blk.Height(), err) - } - e.metrics.lastAcceptedHeight.Set(float64(blk.Height())) - - return nil -} - func (e *VMExecutor) Run(ctx context.Context) error { blkID, err := e.vm.LastAccepted(ctx) if err != nil { @@ -176,6 +159,23 @@ func (e *VMExecutor) Run(ctx context.Context) error { return nil } +func (e *VMExecutor) 
execute(ctx context.Context, blockBytes []byte) error { + blk, err := e.vm.ParseBlock(ctx, blockBytes) + if err != nil { + return fmt.Errorf("failed to parse block: %w", err) + } + if err := blk.Verify(ctx); err != nil { + return fmt.Errorf("failed to verify block %s at height %d: %w", blk.ID(), blk.Height(), err) + } + + if err := blk.Accept(ctx); err != nil { + return fmt.Errorf("failed to accept block %s at height %d: %w", blk.ID(), blk.Height(), err) + } + e.metrics.lastAcceptedHeight.Set(float64(blk.Height())) + + return nil +} + type VMParams struct { GenesisBytes []byte UpgradeBytes []byte From ea2676bc564f161e259bcad0cf57676c0c4972fc Mon Sep 17 00:00:00 2001 From: Rodrigo Villar Date: Mon, 3 Nov 2025 19:44:10 -0500 Subject: [PATCH 11/15] chore: remove err from createBlockChanFromLevelDB() --- tests/reexecute/export.go | 3 +-- tests/reexecute/vm_executor.go | 11 ++++------- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/tests/reexecute/export.go b/tests/reexecute/export.go index 3050ba83556a..1e0042e9604a 100644 --- a/tests/reexecute/export.go +++ b/tests/reexecute/export.go @@ -18,8 +18,7 @@ import ( // destination LevelDB directory for the specified block range [startBlock, endBlock]. func ExportBlockRange(tb testing.TB, blockDirSrc string, blockDirDst string, startBlock, endBlock uint64, chanSize int) { r := require.New(tb) - blockChan, err := createBlockChanFromLevelDB(tb, blockDirSrc, startBlock, endBlock, chanSize) - r.NoError(err) + blockChan := createBlockChanFromLevelDB(tb, blockDirSrc, startBlock, endBlock, chanSize) db, err := leveldb.New(blockDirDst, nil, logging.NoLog{}, prometheus.NewRegistry()) r.NoError(err) diff --git a/tests/reexecute/vm_executor.go b/tests/reexecute/vm_executor.go index bdb160ce1f99..6173ee83c814 100644 --- a/tests/reexecute/vm_executor.go +++ b/tests/reexecute/vm_executor.go @@ -64,14 +64,13 @@ func NewVMExecutor( ) *VMExecutor { r := require.New(tb) - blockChan, err := createBlockChanFromLevelDB( + blockChan := createBlockChanFromLevelDB( tb, blockDir, startBlock, endBlock, chanSize, ) - r.NoError(err) consensusRegistry := prometheus.NewRegistry() r.NoError(prefixGatherer.Register("avalanche_snowman", consensusRegistry)) @@ -271,14 +270,12 @@ type blockResult struct { err error } -func createBlockChanFromLevelDB(tb testing.TB, sourceDir string, startBlock, endBlock uint64, chanSize int) (<-chan blockResult, error) { +func createBlockChanFromLevelDB(tb testing.TB, sourceDir string, startBlock, endBlock uint64, chanSize int) <-chan blockResult { r := require.New(tb) ch := make(chan blockResult, chanSize) db, err := leveldb.New(sourceDir, nil, logging.NoLog{}, prometheus.NewRegistry()) - if err != nil { - return nil, fmt.Errorf("failed to create leveldb database from %q: %w", sourceDir, err) - } + r.NoError(err, "failed to create leveldb database from %q", sourceDir) tb.Cleanup(func() { r.NoError(db.Close()) }) @@ -326,7 +323,7 @@ func createBlockChanFromLevelDB(tb testing.TB, sourceDir string, startBlock, end } }() - return ch, nil + return ch } func blockKey(height uint64) []byte { From 4c21f9603d1cf803f2597204463b113148205327 Mon Sep 17 00:00:00 2001 From: Rodrigo Villar Date: Mon, 3 Nov 2025 19:57:30 -0500 Subject: [PATCH 12/15] docs: nits --- tests/reexecute/c/vm_reexecute_test.go | 6 ++++-- tests/reexecute/metrics.go | 4 ++-- tests/reexecute/vm_executor.go | 5 +++++ 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/tests/reexecute/c/vm_reexecute_test.go b/tests/reexecute/c/vm_reexecute_test.go index 
44639fe02ba8..22d011a8c8b3 100644 --- a/tests/reexecute/c/vm_reexecute_test.go +++ b/tests/reexecute/c/vm_reexecute_test.go @@ -170,7 +170,9 @@ func benchmarkReexecuteRange( serverAddr := reexecute.StartServer(b, log, prefixGatherer, metricsPort) if metricsCollectorEnabled { - dashboardPath := "d/Gl1I20mnk/c-chain" + // cChainDashboardPath is the Grafana dashboard path for the C-Chain + // metrics visualization. + cChainDashboardPath := "d/Gl1I20mnk/c-chain" reexecute.StartCollector( b, log, @@ -178,7 +180,7 @@ func benchmarkReexecuteRange( labels, serverAddr, networkUUID, - dashboardPath, + cChainDashboardPath, ) } } diff --git a/tests/reexecute/metrics.go b/tests/reexecute/metrics.go index c62dc93e02f8..22684a7f8bd1 100644 --- a/tests/reexecute/metrics.go +++ b/tests/reexecute/metrics.go @@ -54,7 +54,7 @@ func StartCollector( labels map[string]string, serverAddr string, networkUUID string, - dashboardPath string, + grafanaDashboardPath string, ) { r := require.New(tb) @@ -91,7 +91,7 @@ func StartCollector( r.NoError(err) var ( - grafanaURI = tmpnet.DefaultBaseGrafanaURI + dashboardPath + grafanaURI = tmpnet.DefaultBaseGrafanaURI + grafanaDashboardPath startTime = strconv.FormatInt(time.Now().UnixMilli(), 10) ) diff --git a/tests/reexecute/vm_executor.go b/tests/reexecute/vm_executor.go index 6173ee83c814..dcba630ed880 100644 --- a/tests/reexecute/vm_executor.go +++ b/tests/reexecute/vm_executor.go @@ -52,6 +52,9 @@ type VMExecutor struct { etaTracker *timer.EtaTracker } +// NewVMExecutor creates a VMExecutor that reexecutes blocks from startBlock to +// endBlock. NewVMExecutor starts reading blocks from blockDir and sets up +// metrics for the reexecution test. func NewVMExecutor( tb testing.TB, vm block.ChainVM, @@ -93,6 +96,8 @@ func NewVMExecutor( } } +// Run reexecutes the blocks against the VM. It parses, verifies, and accepts +// each block while tracking progress and respecting the execution timeout if configured. func (e *VMExecutor) Run(ctx context.Context) error { blkID, err := e.vm.LastAccepted(ctx) if err != nil { From ee16107ecc558217540c6c2b7918aa3d8f645352 Mon Sep 17 00:00:00 2001 From: Rodrigo Villar Date: Tue, 4 Nov 2025 09:05:14 -0500 Subject: [PATCH 13/15] chore: address copilot review --- tests/reexecute/vm_executor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/reexecute/vm_executor.go b/tests/reexecute/vm_executor.go index dcba630ed880..f403883b2e09 100644 --- a/tests/reexecute/vm_executor.go +++ b/tests/reexecute/vm_executor.go @@ -344,7 +344,7 @@ type consensusMetrics struct { // // The registry passed in is expected to be registered with the prefix // "avalanche_snowman" and the chain label (ex. chain="C") that would be handled -// by the[chain manager](../../../chains/manager.go). +// by the [chain manager](../../chains/manager.go). 
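One consequence of the earlier createBlockChanFromLevelDB() change deserves spelling out: setup failures (e.g., opening the source LevelDB directory) now fail the test directly through require, so the only error a consumer still has to handle is the per-block blockResult.err delivered on the channel. A sketch of a consuming loop under that contract; process is a hypothetical consumer:

    blockChan := createBlockChanFromLevelDB(tb, blockDir, startBlock, endBlock, chanSize)
    for blkResult := range blockChan {
        if blkResult.err != nil {
            return blkResult.err // read/lookup failure surfaced mid-stream
        }
        process(blkResult.height, blkResult.blockBytes) // hypothetical consumer
    }
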
func newConsensusMetrics(registry prometheus.Registerer) (*consensusMetrics, error) { m := &consensusMetrics{ lastAcceptedHeight: prometheus.NewGauge(prometheus.GaugeOpts{ From 3b45a8d4fb4f3f82856c216a2e48fae5aea8fdec Mon Sep 17 00:00:00 2001 From: Rodrigo Villar Date: Wed, 5 Nov 2025 09:06:06 -0500 Subject: [PATCH 14/15] chore: nuke export.go --- tests/reexecute/export.go | 40 ---------------------------------- tests/reexecute/vm_executor.go | 26 ++++++++++++++++++++++ 2 files changed, 26 insertions(+), 40 deletions(-) delete mode 100644 tests/reexecute/export.go diff --git a/tests/reexecute/export.go b/tests/reexecute/export.go deleted file mode 100644 index 1e0042e9604a..000000000000 --- a/tests/reexecute/export.go +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package reexecute - -import ( - "testing" - - "github.com/prometheus/client_golang/prometheus" - "github.com/stretchr/testify/require" - - "github.com/ava-labs/avalanchego/database/leveldb" - "github.com/ava-labs/avalanchego/utils/logging" - "github.com/ava-labs/avalanchego/utils/units" -) - -// ExportBlockRange copies blocks from a source LevelDB directory to a -// destination LevelDB directory for the specified block range [startBlock, endBlock]. -func ExportBlockRange(tb testing.TB, blockDirSrc string, blockDirDst string, startBlock, endBlock uint64, chanSize int) { - r := require.New(tb) - blockChan := createBlockChanFromLevelDB(tb, blockDirSrc, startBlock, endBlock, chanSize) - - db, err := leveldb.New(blockDirDst, nil, logging.NoLog{}, prometheus.NewRegistry()) - r.NoError(err) - tb.Cleanup(func() { - r.NoError(db.Close()) - }) - - batch := db.NewBatch() - for blkResult := range blockChan { - r.NoError(batch.Put(blockKey(blkResult.height), blkResult.blockBytes)) - - if batch.Size() > 10*units.MiB { - r.NoError(batch.Write()) - batch = db.NewBatch() - } - } - - r.NoError(batch.Write()) -} diff --git a/tests/reexecute/vm_executor.go b/tests/reexecute/vm_executor.go index f403883b2e09..6731bdd884ce 100644 --- a/tests/reexecute/vm_executor.go +++ b/tests/reexecute/vm_executor.go @@ -30,6 +30,7 @@ import ( "github.com/ava-labs/avalanchego/utils/crypto/bls/signer/localsigner" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/timer" + "github.com/ava-labs/avalanchego/utils/units" "github.com/ava-labs/avalanchego/vms" "github.com/ava-labs/avalanchego/vms/platformvm/warp" ) @@ -269,6 +270,31 @@ func NewMainnetVM( return vm, nil } +// ExportBlockRange copies blocks from a source LevelDB directory to a +// destination LevelDB directory for the specified block range [startBlock, endBlock]. 
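The relocated helper keeps its signature, so existing callers are unaffected. A hypothetical invocation that carves a 10,000-block subrange out of a full archive for a quicker local run; both directory paths are placeholders:

    reexecute.ExportBlockRange(
        t,
        "/data/mainnet-blocks",     // source LevelDB dir (placeholder)
        "/data/mainnet-blocks-10k", // destination LevelDB dir (placeholder)
        1, 10_000,                  // [startBlock, endBlock], inclusive
        100,                        // block channel buffer size
    )
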
+func ExportBlockRange(tb testing.TB, blockDirSrc string, blockDirDst string, startBlock, endBlock uint64, chanSize int) { + r := require.New(tb) + blockChan := createBlockChanFromLevelDB(tb, blockDirSrc, startBlock, endBlock, chanSize) + + db, err := leveldb.New(blockDirDst, nil, logging.NoLog{}, prometheus.NewRegistry()) + r.NoError(err) + tb.Cleanup(func() { + r.NoError(db.Close()) + }) + + batch := db.NewBatch() + for blkResult := range blockChan { + r.NoError(batch.Put(blockKey(blkResult.height), blkResult.blockBytes)) + + if batch.Size() > 10*units.MiB { + r.NoError(batch.Write()) + batch = db.NewBatch() + } + } + + r.NoError(batch.Write()) +} + type blockResult struct { blockBytes []byte height uint64 From ced37fe2868d0f8f51ab74809f00010b97501dbc Mon Sep 17 00:00:00 2001 From: Rodrigo Villar Date: Thu, 6 Nov 2025 09:16:12 -0500 Subject: [PATCH 15/15] chore: chainToSubnet --- tests/reexecute/c/vm_reexecute_test.go | 6 ++++++ tests/reexecute/vm_executor.go | 16 ++++++---------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/tests/reexecute/c/vm_reexecute_test.go b/tests/reexecute/c/vm_reexecute_test.go index 22d011a8c8b3..4a38d041ef65 100644 --- a/tests/reexecute/c/vm_reexecute_test.go +++ b/tests/reexecute/c/vm_reexecute_test.go @@ -24,6 +24,7 @@ import ( "github.com/ava-labs/avalanchego/api/metrics" "github.com/ava-labs/avalanchego/database/leveldb" "github.com/ava-labs/avalanchego/genesis" + "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/tests" "github.com/ava-labs/avalanchego/tests/reexecute" "github.com/ava-labs/avalanchego/utils/constants" @@ -214,6 +215,11 @@ func benchmarkReexecuteRange( ConfigBytes: configBytes, SubnetID: constants.PrimaryNetworkID, ChainID: reexecute.MainnetCChainID, + ChainToSubnet: map[ids.ID]ids.ID{ + reexecute.MainnetXChainID: constants.PrimaryNetworkID, + reexecute.MainnetCChainID: constants.PrimaryNetworkID, + ids.Empty: constants.PrimaryNetworkID, + }, } vm, err := reexecute.NewMainnetVM( diff --git a/tests/reexecute/vm_executor.go b/tests/reexecute/vm_executor.go index 6731bdd884ce..60c676f75b70 100644 --- a/tests/reexecute/vm_executor.go +++ b/tests/reexecute/vm_executor.go @@ -37,8 +37,8 @@ import ( var ( MainnetCChainID = ids.FromStringOrPanic("2q9e4r6Mu3U68nU1fYjgbR6JvwrRx36CohpAX5UQxse55x1Q5") + MainnetXChainID = ids.FromStringOrPanic("2oYMBNV4eNHyqk2fjjV5nVQLDbtmNJzq5s3qs3Lo6ftnC6FByM") - mainnetXChainID = ids.FromStringOrPanic("2oYMBNV4eNHyqk2fjjV5nVQLDbtmNJzq5s3qs3Lo6ftnC6FByM") mainnetAvaxAssetID = ids.FromStringOrPanic("FvwEAhmxKfeiG8SnEvq42hc6whRyY3EFYAvebMqDNDGCgxN5Z") ) @@ -187,6 +187,9 @@ type VMParams struct { ConfigBytes []byte SubnetID ids.ID ChainID ids.ID + // ChainToSubnet maps chain IDs to their subnet IDs. This mapping is used by + // the VM to validate cross-chain operations and warp messages. 
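This mapping replaces the chainIDToSubnetID table that NewMainnetVM used to build internally (removed in the hunk below). Note one behavioral difference: the old table always included vmParams.ChainID -> vmParams.SubnetID automatically, while the new GetSubnetIDF consults only ChainToSubnet, so a caller must now include its own chain's pair. A sketch mirroring the updated test above, where the C-Chain entry covers the VM's own ChainID:

    params := reexecute.VMParams{
        SubnetID: constants.PrimaryNetworkID,
        ChainID:  reexecute.MainnetCChainID,
        ChainToSubnet: map[ids.ID]ids.ID{
            reexecute.MainnetXChainID: constants.PrimaryNetworkID,
            reexecute.MainnetCChainID: constants.PrimaryNetworkID, // covers params.ChainID
            ids.Empty:                 constants.PrimaryNetworkID,
        },
        // GenesisBytes, UpgradeBytes, ConfigBytes as required by the target chain
    }
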
+ ChainToSubnet map[ids.ID]ids.ID } // NewMainnetVM creates and initializes a VM configured for mainnet block @@ -218,13 +221,6 @@ func NewMainnetVM( sharedMemoryDB := prefixdb.New([]byte("sharedmemory"), db) atomicMemory := atomic.NewMemory(sharedMemoryDB) - chainIDToSubnetID := map[ids.ID]ids.ID{ - mainnetXChainID: constants.PrimaryNetworkID, - MainnetCChainID: constants.PrimaryNetworkID, - vmParams.ChainID: vmParams.SubnetID, - ids.Empty: constants.PrimaryNetworkID, - } - if err := vm.Initialize( ctx, &snow.Context{ @@ -235,7 +231,7 @@ func NewMainnetVM( PublicKey: blsPublicKey, NetworkUpgrades: upgrade.Mainnet, - XChainID: mainnetXChainID, + XChainID: MainnetXChainID, CChainID: MainnetCChainID, AVAXAssetID: mainnetAvaxAssetID, @@ -248,7 +244,7 @@ func NewMainnetVM( ValidatorState: &validatorstest.State{ GetSubnetIDF: func(_ context.Context, chainID ids.ID) (ids.ID, error) { - subnetID, ok := chainIDToSubnetID[chainID] + subnetID, ok := vmParams.ChainToSubnet[chainID] if ok { return subnetID, nil }
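
End to end, the series leaves the reexecution harness looking roughly like this; a condensed sketch in which factory, db, chainDataDir, params, and the gatherers are assumed to be set up as in the test, and vmGatherer is a stand-in name for the VM-metrics gatherer:

    vm, err := reexecute.NewMainnetVM(ctx, factory, db, chainDataDir, vmGatherer, params)
    r.NoError(err)
    defer func() {
        r.NoError(vm.Shutdown(ctx))
    }()

    executor := reexecute.NewVMExecutor(
        b, vm,
        blockDir,
        startBlock, endBlock,
        chanSize,
        executionTimeout,
        prefixGatherer,
    )
    r.NoError(executor.Run(ctx))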