diff --git a/chains/manager.go b/chains/manager.go index 61e40f789ddf..01dbd352ba45 100644 --- a/chains/manager.go +++ b/chains/manager.go @@ -35,6 +35,7 @@ import ( "github.com/ava-labs/avalanchego/snow/engine/common/tracker" "github.com/ava-labs/avalanchego/snow/engine/snowman/block" "github.com/ava-labs/avalanchego/snow/engine/snowman/syncer" + "github.com/ava-labs/avalanchego/snow/engine/unified" "github.com/ava-labs/avalanchego/snow/networking/handler" "github.com/ava-labs/avalanchego/snow/networking/router" "github.com/ava-labs/avalanchego/snow/networking/sender" @@ -63,7 +64,6 @@ import ( p2ppb "github.com/ava-labs/avalanchego/proto/pb/p2p" smcon "github.com/ava-labs/avalanchego/snow/consensus/snowman" - aveng "github.com/ava-labs/avalanchego/snow/engine/avalanche" avbootstrap "github.com/ava-labs/avalanchego/snow/engine/avalanche/bootstrap" avagetter "github.com/ava-labs/avalanchego/snow/engine/avalanche/getter" smeng "github.com/ava-labs/avalanchego/snow/engine/snowman" @@ -933,7 +933,6 @@ func (m *manager) createAvalancheChain( // to make sure start callbacks are duly initialized snowmanEngineConfig := smeng.Config{ Ctx: ctx, - AllGetsServer: snowGetHandler, VM: vmWrappingProposerVM, Sender: snowmanMessageSender, Validators: vdrs, @@ -941,15 +940,6 @@ func (m *manager) createAvalancheChain( Params: consensusParams, Consensus: snowmanConsensus, } - var snowmanEngine common.Engine - snowmanEngine, err = smeng.New(snowmanEngineConfig) - if err != nil { - return nil, fmt.Errorf("error initializing snowman engine: %w", err) - } - - if m.TracingEnabled { - snowmanEngine = common.TraceEngine(snowmanEngine, m.Tracer) - } // create bootstrap gear bootstrapCfg := smbootstrap.Config{ @@ -968,18 +958,6 @@ func (m *manager) createAvalancheChain( DB: blockBootstrappingDB, VM: vmWrappingProposerVM, } - var snowmanBootstrapper common.BootstrapableEngine - snowmanBootstrapper, err = smbootstrap.New( - bootstrapCfg, - snowmanEngine.Start, - ) - if err != nil { - return nil, fmt.Errorf("error initializing snowman bootstrapper: %w", err) - } - - if m.TracingEnabled { - snowmanBootstrapper = common.TraceBootstrapableEngine(snowmanBootstrapper, m.Tracer) - } avaGetHandler, err := avagetter.New( vtxManager, @@ -993,12 +971,6 @@ func (m *manager) createAvalancheChain( return nil, fmt.Errorf("couldn't initialize avalanche base message handler: %w", err) } - // create engine gear - avalancheEngine := aveng.New(ctx, avaGetHandler, linearizableVM) - if m.TracingEnabled { - avalancheEngine = common.TraceEngine(avalancheEngine, m.Tracer) - } - // create bootstrap gear avalancheBootstrapperConfig := avbootstrap.Config{ AllGetsServer: avaGetHandler, @@ -1016,32 +988,33 @@ func (m *manager) createAvalancheChain( avalancheBootstrapperConfig.StopVertexID = m.Upgrades.CortinaXChainStopVertexID } - var avalancheBootstrapper common.BootstrapableEngine - avalancheBootstrapper, err = avbootstrap.New( - avalancheBootstrapperConfig, - snowmanBootstrapper.Start, - avalancheMetrics, - ) + ef := &unified.EngineFactory{ + TracingEnabled: m.TracingEnabled, + GetServer: snowGetHandler, + AvaAncestorGetter: avaGetHandler, + AvaMetrics: avalancheMetrics, + Tracer: m.Tracer, + BootConfig: bootstrapCfg, + AvaBootConfig: avalancheBootstrapperConfig, + SnowmanConfig: snowmanEngineConfig, + Logger: ctx.Log, + } + + ctx.State.Set(snow.EngineState{ + Type: p2ppb.EngineType_ENGINE_TYPE_AVALANCHE, + State: snow.Bootstrapping, + }) + + ue, err := unified.EngineFromEngines(ctx, ef, vm) if err != nil { - return nil, fmt.Errorf("error initializing avalanche bootstrapper: %w", err) + return nil, fmt.Errorf("error initializing unified engine: %w", err) } + engine := common.Engine(ue) if m.TracingEnabled { - avalancheBootstrapper = common.TraceBootstrapableEngine(avalancheBootstrapper, m.Tracer) + engine = common.TraceEngine(ue, m.Tracer) } - - h.SetEngineManager(&handler.EngineManager{ - Avalanche: &handler.Engine{ - StateSyncer: nil, - Bootstrapper: avalancheBootstrapper, - Consensus: avalancheEngine, - }, - Snowman: &handler.Engine{ - StateSyncer: nil, - Bootstrapper: snowmanBootstrapper, - Consensus: snowmanEngine, - }, - }) + h.SetEngine(engine) // Register health check for this chain if err := m.Health.RegisterHealthCheck(primaryAlias, h, ctx.SubnetID.String()); err != nil { @@ -1327,7 +1300,6 @@ func (m *manager) createSnowmanChain( // to make sure start callbacks are duly initialized engineConfig := smeng.Config{ Ctx: ctx, - AllGetsServer: snowGetHandler, VM: vm, Sender: messageSender, Validators: vdrs, @@ -1336,15 +1308,6 @@ func (m *manager) createSnowmanChain( Consensus: consensus, PartialSync: m.PartialSyncPrimaryNetwork && ctx.ChainID == constants.PlatformChainID, } - var engine common.Engine - engine, err = smeng.New(engineConfig) - if err != nil { - return nil, fmt.Errorf("error initializing snowman engine: %w", err) - } - - if m.TracingEnabled { - engine = common.TraceEngine(engine, m.Tracer) - } // create bootstrap gear bootstrapCfg := smbootstrap.Config{ @@ -1364,18 +1327,6 @@ func (m *manager) createSnowmanChain( VM: vm, Bootstrapped: bootstrapFunc, } - var bootstrapper common.BootstrapableEngine - bootstrapper, err = smbootstrap.New( - bootstrapCfg, - engine.Start, - ) - if err != nil { - return nil, fmt.Errorf("error initializing snowman bootstrapper: %w", err) - } - - if m.TracingEnabled { - bootstrapper = common.TraceBootstrapableEngine(bootstrapper, m.Tracer) - } // create state sync gear stateSyncCfg, err := syncer.NewConfig( @@ -1392,24 +1343,42 @@ func (m *manager) createSnowmanChain( if err != nil { return nil, fmt.Errorf("couldn't initialize state syncer configuration: %w", err) } - stateSyncer := syncer.New( - stateSyncCfg, - bootstrapper.Start, - ) - if m.TracingEnabled { - stateSyncer = common.TraceStateSyncer(stateSyncer, m.Tracer) + ef := &unified.EngineFactory{ + TracingEnabled: m.TracingEnabled, + GetServer: snowGetHandler, + AvaAncestorGetter: invalidEngineAncestorsGetter{}, + StateSync: hasStateSync(stateSyncCfg), + Tracer: m.Tracer, + BootConfig: bootstrapCfg, + SnowmanConfig: engineConfig, + StateSyncConfig: stateSyncCfg, + Logger: ctx.Log, } - h.SetEngineManager(&handler.EngineManager{ - Avalanche: nil, - Snowman: &handler.Engine{ - StateSyncer: stateSyncer, - Bootstrapper: bootstrapper, - Consensus: engine, - }, + ctx.State.Set(snow.EngineState{ + Type: p2ppb.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.StateSyncing, }) + if !ef.HasStateSync() { + ctx.State.Set(snow.EngineState{ + Type: p2ppb.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.Bootstrapping, + }) + } + + ue, err := unified.EngineFromEngines(ctx, ef, vm) + if err != nil { + return nil, fmt.Errorf("error initializing unified engine: %w", err) + } + + engine := common.Engine(ue) + if m.TracingEnabled { + engine = common.TraceEngine(ue, m.Tracer) + } + h.SetEngine(engine) + // Register health checks if err := m.Health.RegisterHealthCheck(primaryAlias, h, ctx.SubnetID.String()); err != nil { return nil, fmt.Errorf("couldn't add health check for chain %s: %w", primaryAlias, err) @@ -1423,6 +1392,17 @@ func (m *manager) createSnowmanChain( }, nil } +func hasStateSync(stateSyncCfg syncer.Config) bool { + var hasStateSync bool + ssVM, isStateSyncable := stateSyncCfg.VM.(block.StateSyncableVM) + if isStateSyncable { + if enabled, err := ssVM.StateSyncEnabled(context.Background()); err == nil && enabled { + hasStateSync = true + } + } + return hasStateSync +} + func (m *manager) IsBootstrapped(id ids.ID) bool { m.chainsLock.Lock() chain, exists := m.chains[id] @@ -1581,3 +1561,9 @@ func (m *manager) getOrMakeVMRegisterer(vmID ids.ID, chainAlias string) (metrics ) return chainReg, err } + +type invalidEngineAncestorsGetter struct{} + +func (invalidEngineAncestorsGetter) GetAncestors(_ context.Context, _ ids.NodeID, _ uint32, _ ids.ID, engineType p2ppb.EngineType) error { + return fmt.Errorf("this engine does not run %s", engineType) +} diff --git a/scripts/mocks.mockgen.txt b/scripts/mocks.mockgen.txt index 97efc98b846e..9061c4a34312 100644 --- a/scripts/mocks.mockgen.txt +++ b/scripts/mocks.mockgen.txt @@ -11,6 +11,7 @@ github.com/ava-labs/avalanchego/snow/engine/snowman/block=BuildBlockWithContextC github.com/ava-labs/avalanchego/snow/engine/snowman/block=ChainVM=snow/engine/snowman/block/blockmock/chain_vm.go github.com/ava-labs/avalanchego/snow/engine/snowman/block=StateSyncableVM=snow/engine/snowman/block/blockmock/state_syncable_vm.go github.com/ava-labs/avalanchego/snow/engine/snowman/block=WithVerifyContext=snow/engine/snowman/block/blockmock/with_verify_context.go +github.com/ava-labs/avalanchego/snow/engine/unified=Factory=snow/engine/unified/mocks/factory.go github.com/ava-labs/avalanchego/snow/networking/handler=Handler=snow/networking/handler/handlermock/handler.go github.com/ava-labs/avalanchego/snow/networking/timeout=Manager=snow/networking/timeout/timeoutmock/manager.go github.com/ava-labs/avalanchego/snow/networking/tracker=Targeter=snow/networking/tracker/trackermock/targeter.go diff --git a/snow/engine/avalanche/bootstrap/bootstrapper.go b/snow/engine/avalanche/bootstrap/bootstrapper.go index e8393274c0f8..0730300040c9 100644 --- a/snow/engine/avalanche/bootstrap/bootstrapper.go +++ b/snow/engine/avalanche/bootstrap/bootstrapper.go @@ -13,7 +13,6 @@ import ( "github.com/ava-labs/avalanchego/cache" "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/proto/pb/p2p" "github.com/ava-labs/avalanchego/snow" "github.com/ava-labs/avalanchego/snow/choices" "github.com/ava-labs/avalanchego/snow/consensus/avalanche" @@ -43,7 +42,7 @@ const ( epsilon = 1e-6 // small amount to add to time to avoid division by 0 ) -var _ common.BootstrapableEngine = (*Bootstrapper)(nil) +var _ common.AvalancheBootstrapableEngine = (*Bootstrapper)(nil) func New( config Config, @@ -53,15 +52,6 @@ func New( b := &Bootstrapper{ Config: config, - StateSummaryFrontierHandler: common.NewNoOpStateSummaryFrontierHandler(config.Ctx.Log), - AcceptedStateSummaryHandler: common.NewNoOpAcceptedStateSummaryHandler(config.Ctx.Log), - AcceptedFrontierHandler: common.NewNoOpAcceptedFrontierHandler(config.Ctx.Log), - AcceptedHandler: common.NewNoOpAcceptedHandler(config.Ctx.Log), - PutHandler: common.NewNoOpPutHandler(config.Ctx.Log), - QueryHandler: common.NewNoOpQueryHandler(config.Ctx.Log), - ChitsHandler: common.NewNoOpChitsHandler(config.Ctx.Log), - AppHandler: config.VM, - outstandingRequests: bimap.New[common.Request, ids.ID](), outstandingRequestTimes: make(map[common.Request]time.Time), @@ -77,16 +67,6 @@ type Bootstrapper struct { Config common.Halter - // list of NoOpsHandler for messages dropped by Bootstrapper - common.StateSummaryFrontierHandler - common.AcceptedStateSummaryHandler - common.AcceptedFrontierHandler - common.AcceptedHandler - common.PutHandler - common.QueryHandler - common.ChitsHandler - common.AppHandler - metrics // tracks which validators were asked for which containers in which requests @@ -320,10 +300,6 @@ func (*Bootstrapper) Notify(context.Context, common.Message) error { func (b *Bootstrapper) Start(ctx context.Context, startReqID uint32) error { b.Ctx.Log.Info("starting bootstrap") - b.Ctx.State.Set(snow.EngineState{ - Type: p2p.EngineType_ENGINE_TYPE_AVALANCHE, - State: snow.Bootstrapping, - }) if err := b.VM.SetState(ctx, snow.Bootstrapping); err != nil { return fmt.Errorf("failed to notify VM that bootstrapping has started: %w", err) diff --git a/snow/engine/avalanche/bootstrap/bootstrapper_test.go b/snow/engine/avalanche/bootstrap/bootstrapper_test.go index 6a185afa8e39..b643c63ff53b 100644 --- a/snow/engine/avalanche/bootstrap/bootstrapper_test.go +++ b/snow/engine/avalanche/bootstrap/bootstrapper_test.go @@ -617,7 +617,6 @@ func TestBootstrapperIncompleteAncestors(t *testing.T) { require.Equal(vtxID1, requested) require.NoError(bs.Ancestors(context.Background(), peerID, *reqIDPtr, [][]byte{vtxBytes1})) // Provide vtx1; should request vtx0 - require.Equal(snow.Bootstrapping, bs.Context().State.Get().State) require.Equal(vtxID0, requested) manager.StopVertexAcceptedF = func(context.Context) (bool, error) { diff --git a/snow/engine/avalanche/engine.go b/snow/engine/avalanche/engine.go deleted file mode 100644 index 530a319e0fc6..000000000000 --- a/snow/engine/avalanche/engine.go +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package avalanche - -import ( - "context" - "errors" - - "github.com/ava-labs/avalanchego/snow" - "github.com/ava-labs/avalanchego/snow/engine/common" -) - -var ( - _ common.Engine = (*engine)(nil) - - errUnexpectedStart = errors.New("unexpectedly started engine") -) - -type engine struct { - common.AllGetsServer - - // list of NoOpsHandler for messages dropped by engine - common.StateSummaryFrontierHandler - common.AcceptedStateSummaryHandler - common.AcceptedFrontierHandler - common.AcceptedHandler - common.AncestorsHandler - common.PutHandler - common.QueryHandler - common.ChitsHandler - common.AppHandler - common.InternalHandler - - ctx *snow.ConsensusContext - vm common.VM -} - -func New( - ctx *snow.ConsensusContext, - gets common.AllGetsServer, - vm common.VM, -) common.Engine { - return &engine{ - AllGetsServer: gets, - StateSummaryFrontierHandler: common.NewNoOpStateSummaryFrontierHandler(ctx.Log), - AcceptedStateSummaryHandler: common.NewNoOpAcceptedStateSummaryHandler(ctx.Log), - AcceptedFrontierHandler: common.NewNoOpAcceptedFrontierHandler(ctx.Log), - AcceptedHandler: common.NewNoOpAcceptedHandler(ctx.Log), - AncestorsHandler: common.NewNoOpAncestorsHandler(ctx.Log), - PutHandler: common.NewNoOpPutHandler(ctx.Log), - QueryHandler: common.NewNoOpQueryHandler(ctx.Log), - ChitsHandler: common.NewNoOpChitsHandler(ctx.Log), - AppHandler: common.NewNoOpAppHandler(ctx.Log), - InternalHandler: common.NewNoOpInternalHandler(ctx.Log), - ctx: ctx, - vm: vm, - } -} - -func (*engine) Start(context.Context, uint32) error { - return errUnexpectedStart -} - -func (e *engine) Context() *snow.ConsensusContext { - return e.ctx -} - -func (*engine) HealthCheck(context.Context) (interface{}, error) { - return nil, nil -} diff --git a/snow/engine/avalanche/getter/getter.go b/snow/engine/avalanche/getter/getter.go index f91324a1c85d..712ada520b70 100644 --- a/snow/engine/avalanche/getter/getter.go +++ b/snow/engine/avalanche/getter/getter.go @@ -12,6 +12,7 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/message" + "github.com/ava-labs/avalanchego/proto/pb/p2p" "github.com/ava-labs/avalanchego/snow/choices" "github.com/ava-labs/avalanchego/snow/consensus/avalanche" "github.com/ava-labs/avalanchego/snow/engine/avalanche/vertex" @@ -101,7 +102,7 @@ func (gh *getter) GetAccepted(_ context.Context, nodeID ids.NodeID, requestID ui return nil } -func (gh *getter) GetAncestors(ctx context.Context, nodeID ids.NodeID, requestID uint32, vtxID ids.ID) error { +func (gh *getter) GetAncestors(ctx context.Context, nodeID ids.NodeID, requestID uint32, vtxID ids.ID, _ p2p.EngineType) error { startTime := time.Now() gh.log.Verbo("called GetAncestors", zap.Stringer("nodeID", nodeID), diff --git a/snow/engine/common/bootstrapable.go b/snow/engine/common/bootstrapable.go index 517eba2aa154..5b6a3b2966d4 100644 --- a/snow/engine/common/bootstrapable.go +++ b/snow/engine/common/bootstrapable.go @@ -3,10 +3,40 @@ package common -import "context" +import ( + "context" + + "github.com/ava-labs/avalanchego/api/health" +) type BootstrapableEngine interface { - Engine + AcceptedFrontierHandler + AcceptedHandler + AncestorsHandler + InternalHandler + + // Start engine operations from given request ID + Start(ctx context.Context, startReqID uint32) error + + // Returns nil if the engine is healthy. + // Periodically called and reported through the health API + health.Checker + + // Clear removes all containers to be processed upon bootstrapping + Clear(ctx context.Context) error +} + +type AvalancheBootstrapableEngine interface { + AncestorsHandler + + InternalHandler + + // Start engine operations from given request ID + Start(ctx context.Context, startReqID uint32) error + + // Returns nil if the engine is healthy. + // Periodically called and reported through the health API + health.Checker // Clear removes all containers to be processed upon bootstrapping Clear(ctx context.Context) error diff --git a/snow/engine/common/engine.go b/snow/engine/common/engine.go index dc39504dc76d..0f5e182fbd60 100644 --- a/snow/engine/common/engine.go +++ b/snow/engine/common/engine.go @@ -9,6 +9,7 @@ import ( "github.com/ava-labs/avalanchego/api/health" "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/proto/pb/p2p" "github.com/ava-labs/avalanchego/snow/validators" "github.com/ava-labs/avalanchego/utils/set" ) @@ -30,6 +31,26 @@ type Engine interface { health.Checker } +// ConsensusEngine describes the standard interface of a consensus engine. +// +// All nodeIDs are assumed to be authenticated. +// +// A consensus engine may recover after returning an error, but it isn't +// required. +type ConsensusEngine interface { + PutHandler + QueryHandler + ChitsHandler + InternalHandler + + // Start engine operations from given request ID + Start(ctx context.Context, startReqID uint32) error + + // Returns nil if the engine is healthy. + // Periodically called and reported through the health API + health.Checker +} + type Handler interface { AllGetsServer StateSummaryFrontierHandler @@ -132,6 +153,11 @@ type AcceptedStateSummaryHandler interface { ) error } +type StateHandler interface { + StateSummaryFrontierHandler + AcceptedStateSummaryHandler +} + type GetAcceptedFrontierHandler interface { // Notify this engine of a request for an AcceptedFrontier message with the // same requestID and the ID of the most recently accepted container. @@ -215,6 +241,7 @@ type GetAncestorsHandler interface { nodeID ids.NodeID, requestID uint32, containerID ids.ID, + engineType p2p.EngineType, ) error } diff --git a/snow/engine/common/state_syncer.go b/snow/engine/common/state_syncer.go index a6d159bb6949..949ab76e3a8c 100644 --- a/snow/engine/common/state_syncer.go +++ b/snow/engine/common/state_syncer.go @@ -3,14 +3,29 @@ package common -import "context" +import ( + "context" + + "github.com/ava-labs/avalanchego/api/health" +) // StateSyncer controls the selection and verification of state summaries // to drive VM state syncing. It collects the latest state summaries and elicit // votes on them, making sure that a qualified majority of nodes support the // selected state summary. type StateSyncer interface { - Engine + StateSummaryFrontierHandler + + AcceptedStateSummaryHandler + + InternalHandler + + // Start engine operations from given request ID + Start(ctx context.Context, startReqID uint32) error + + // Returns nil if the engine is healthy. + // Periodically called and reported through the health API + health.Checker // IsEnabled returns true if the underlying VM wants to perform state sync. // Any returned error will be considered fatal. diff --git a/snow/engine/common/traced_bootstrapable_engine.go b/snow/engine/common/traced_bootstrapable_engine.go deleted file mode 100644 index 4185d0291a96..000000000000 --- a/snow/engine/common/traced_bootstrapable_engine.go +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package common - -import ( - "context" - - "github.com/ava-labs/avalanchego/trace" -) - -var _ BootstrapableEngine = (*tracedBootstrapableEngine)(nil) - -type tracedBootstrapableEngine struct { - Engine - bootstrapableEngine BootstrapableEngine - tracer trace.Tracer -} - -func TraceBootstrapableEngine(bootstrapableEngine BootstrapableEngine, tracer trace.Tracer) BootstrapableEngine { - return &tracedBootstrapableEngine{ - Engine: TraceEngine(bootstrapableEngine, tracer), - bootstrapableEngine: bootstrapableEngine, - tracer: tracer, - } -} - -func (e *tracedBootstrapableEngine) Clear(ctx context.Context) error { - ctx, span := e.tracer.Start(ctx, "tracedBootstrapableEngine.Clear") - defer span.End() - - return e.bootstrapableEngine.Clear(ctx) -} diff --git a/snow/engine/common/traced_clearer.go b/snow/engine/common/traced_clearer.go new file mode 100644 index 000000000000..6a068f05924c --- /dev/null +++ b/snow/engine/common/traced_clearer.go @@ -0,0 +1,33 @@ +// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package common + +import ( + "context" + + "github.com/ava-labs/avalanchego/trace" +) + +type Clearer interface { + Clear(ctx context.Context) error +} + +type tracedClearer struct { + backend Clearer + tracer trace.Tracer +} + +func (c *tracedClearer) Clear(ctx context.Context) error { + ctx, span := c.tracer.Start(ctx, "tracedClearer.Clear") + defer span.End() + + return c.backend.Clear(ctx) +} + +func NewTracedClearer(backend Clearer, tracer trace.Tracer) Clearer { + return &tracedClearer{ + backend: backend, + tracer: tracer, + } +} diff --git a/snow/engine/common/traced_engine.go b/snow/engine/common/traced_engine.go index a6ab2dee1878..f0b9b89efa1c 100644 --- a/snow/engine/common/traced_engine.go +++ b/snow/engine/common/traced_engine.go @@ -10,6 +10,7 @@ import ( "go.opentelemetry.io/otel/attribute" "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/proto/pb/p2p" "github.com/ava-labs/avalanchego/trace" "github.com/ava-labs/avalanchego/utils/set" "github.com/ava-labs/avalanchego/version" @@ -157,7 +158,7 @@ func (e *tracedEngine) GetAcceptedFailed(ctx context.Context, nodeID ids.NodeID, return e.engine.GetAcceptedFailed(ctx, nodeID, requestID) } -func (e *tracedEngine) GetAncestors(ctx context.Context, nodeID ids.NodeID, requestID uint32, containerID ids.ID) error { +func (e *tracedEngine) GetAncestors(ctx context.Context, nodeID ids.NodeID, requestID uint32, containerID ids.ID, engineType p2p.EngineType) error { ctx, span := e.tracer.Start(ctx, "tracedEngine.GetAncestors", oteltrace.WithAttributes( attribute.Stringer("nodeID", nodeID), attribute.Int64("requestID", int64(requestID)), @@ -165,7 +166,7 @@ func (e *tracedEngine) GetAncestors(ctx context.Context, nodeID ids.NodeID, requ )) defer span.End() - return e.engine.GetAncestors(ctx, nodeID, requestID, containerID) + return e.engine.GetAncestors(ctx, nodeID, requestID, containerID, engineType) } func (e *tracedEngine) Ancestors(ctx context.Context, nodeID ids.NodeID, requestID uint32, containers [][]byte) error { diff --git a/snow/engine/common/traced_is_enabled.go b/snow/engine/common/traced_is_enabled.go new file mode 100644 index 000000000000..9091d3928d21 --- /dev/null +++ b/snow/engine/common/traced_is_enabled.go @@ -0,0 +1,33 @@ +// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package common + +import ( + "context" + + "github.com/ava-labs/avalanchego/trace" +) + +type Enabler interface { + IsEnabled(ctx context.Context) (bool, error) +} + +type tracedIsEnabled struct { + backend Enabler + tracer trace.Tracer +} + +func NewTracedIsEnabled(backend Enabler, tracer trace.Tracer) Enabler { + return &tracedIsEnabled{ + backend: backend, + tracer: tracer, + } +} + +func (e *tracedIsEnabled) IsEnabled(ctx context.Context) (bool, error) { + ctx, span := e.tracer.Start(ctx, "tracedIsEnabled.IsEnabled") + defer span.End() + + return e.backend.IsEnabled(ctx) +} diff --git a/snow/engine/common/traced_state_syncer.go b/snow/engine/common/traced_state_syncer.go deleted file mode 100644 index e598b6094076..000000000000 --- a/snow/engine/common/traced_state_syncer.go +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package common - -import ( - "context" - - "github.com/ava-labs/avalanchego/trace" -) - -var _ StateSyncer = (*tracedStateSyncer)(nil) - -type tracedStateSyncer struct { - Engine - stateSyncer StateSyncer - tracer trace.Tracer -} - -func TraceStateSyncer(stateSyncer StateSyncer, tracer trace.Tracer) StateSyncer { - return &tracedStateSyncer{ - Engine: TraceEngine(stateSyncer, tracer), - stateSyncer: stateSyncer, - tracer: tracer, - } -} - -func (e *tracedStateSyncer) IsEnabled(ctx context.Context) (bool, error) { - ctx, span := e.tracer.Start(ctx, "tracedStateSyncer.IsEnabled") - defer span.End() - - return e.stateSyncer.IsEnabled(ctx) -} diff --git a/snow/engine/enginetest/engine.go b/snow/engine/enginetest/engine.go index 4bafbfd27b8b..b589071f8f54 100644 --- a/snow/engine/enginetest/engine.go +++ b/snow/engine/enginetest/engine.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/require" "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/proto/pb/p2p" "github.com/ava-labs/avalanchego/snow" "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/utils/set" @@ -111,7 +112,8 @@ type Engine struct { HaltF func(context.Context) TimeoutF, GossipF, ShutdownF func(context.Context) error NotifyF func(context.Context, common.Message) error - GetF, GetAncestorsF func(ctx context.Context, nodeID ids.NodeID, requestID uint32, containerID ids.ID) error + GetF func(ctx context.Context, nodeID ids.NodeID, requestID uint32, containerID ids.ID) error + GetAncestorsF func(ctx context.Context, nodeID ids.NodeID, requestID uint32, containerID ids.ID, engineType p2p.EngineType) error PullQueryF func(ctx context.Context, nodeID ids.NodeID, requestID uint32, containerID ids.ID, requestedHeight uint64) error PutF func(ctx context.Context, nodeID ids.NodeID, requestID uint32, container []byte) error PushQueryF func(ctx context.Context, nodeID ids.NodeID, requestID uint32, container []byte, requestedHeight uint64) error @@ -410,9 +412,9 @@ func (e *Engine) Get(ctx context.Context, nodeID ids.NodeID, requestID uint32, c return errGet } -func (e *Engine) GetAncestors(ctx context.Context, nodeID ids.NodeID, requestID uint32, containerID ids.ID) error { +func (e *Engine) GetAncestors(ctx context.Context, nodeID ids.NodeID, requestID uint32, containerID ids.ID, engineType p2p.EngineType) error { if e.GetAncestorsF != nil { - return e.GetAncestorsF(ctx, nodeID, requestID, containerID) + return e.GetAncestorsF(ctx, nodeID, requestID, containerID, engineType) } if !e.CantGetAncestors { return nil diff --git a/snow/engine/snowman/bootstrap/bootstrapper.go b/snow/engine/snowman/bootstrap/bootstrapper.go index bece6f32a1b2..833a6861053c 100644 --- a/snow/engine/snowman/bootstrap/bootstrapper.go +++ b/snow/engine/snowman/bootstrap/bootstrapper.go @@ -71,14 +71,6 @@ type Bootstrapper struct { shouldHalt func() bool *metrics - // list of NoOpsHandler for messages dropped by bootstrapper - common.StateSummaryFrontierHandler - common.AcceptedStateSummaryHandler - common.PutHandler - common.QueryHandler - common.ChitsHandler - common.AppHandler - requestID uint32 // Tracks the last requestID that was used in a request started bool @@ -120,16 +112,10 @@ type Bootstrapper struct { func New(config Config, onFinished func(ctx context.Context, lastReqID uint32) error) (*Bootstrapper, error) { metrics, err := newMetrics(config.Ctx.Registerer) return &Bootstrapper{ - shouldHalt: config.ShouldHalt, - nonVerifyingParser: config.NonVerifyingParse, - Config: config, - metrics: metrics, - StateSummaryFrontierHandler: common.NewNoOpStateSummaryFrontierHandler(config.Ctx.Log), - AcceptedStateSummaryHandler: common.NewNoOpAcceptedStateSummaryHandler(config.Ctx.Log), - PutHandler: common.NewNoOpPutHandler(config.Ctx.Log), - QueryHandler: common.NewNoOpQueryHandler(config.Ctx.Log), - ChitsHandler: common.NewNoOpChitsHandler(config.Ctx.Log), - AppHandler: config.VM, + shouldHalt: config.ShouldHalt, + nonVerifyingParser: config.NonVerifyingParse, + Config: config, + metrics: metrics, minority: bootstrapper.Noop, majority: bootstrapper.Noop, diff --git a/snow/engine/snowman/config.go b/snow/engine/snowman/config.go index 3162471a2476..487f4356ede1 100644 --- a/snow/engine/snowman/config.go +++ b/snow/engine/snowman/config.go @@ -15,8 +15,7 @@ import ( // Config wraps all the parameters needed for a snowman engine type Config struct { - common.AllGetsServer - + AllGetsServer common.AllGetsServer Ctx *snow.ConsensusContext VM block.ChainVM Sender common.Sender diff --git a/snow/engine/snowman/engine.go b/snow/engine/snowman/engine.go index b1c1698b53da..600ea1966f6f 100644 --- a/snow/engine/snowman/engine.go +++ b/snow/engine/snowman/engine.go @@ -36,7 +36,7 @@ const ( errInsufficientStake = "insufficient connected stake" ) -var _ common.Engine = (*Engine)(nil) +var _ common.ConsensusEngine = (*Engine)(nil) func cachedBlockSize(_ ids.ID, blk snowman.Block) int { return ids.IDLen + len(blk.Bytes()) + constants.PointerOverhead @@ -48,13 +48,6 @@ type Engine struct { Config *metrics - // list of NoOpsHandler for messages dropped by engine - common.StateSummaryFrontierHandler - common.AcceptedStateSummaryHandler - common.AcceptedFrontierHandler - common.AcceptedHandler - common.AncestorsHandler - common.AppHandler validators.Connector requestID uint32 @@ -133,23 +126,17 @@ func New(config Config) (*Engine, error) { } return &Engine{ - Config: config, - metrics: metrics, - StateSummaryFrontierHandler: common.NewNoOpStateSummaryFrontierHandler(config.Ctx.Log), - AcceptedStateSummaryHandler: common.NewNoOpAcceptedStateSummaryHandler(config.Ctx.Log), - AcceptedFrontierHandler: common.NewNoOpAcceptedFrontierHandler(config.Ctx.Log), - AcceptedHandler: common.NewNoOpAcceptedHandler(config.Ctx.Log), - AncestorsHandler: common.NewNoOpAncestorsHandler(config.Ctx.Log), - AppHandler: config.VM, - Connector: config.VM, - pending: make(map[ids.ID]snowman.Block), - unverifiedIDToAncestor: ancestor.NewTree(), - unverifiedBlockCache: nonVerifiedCache, - acceptedFrontiers: acceptedFrontiers, - blocked: job.NewScheduler[ids.ID](), - polls: polls, - blkReqs: bimap.New[common.Request, ids.ID](), - blkReqSourceMetric: make(map[common.Request]prometheus.Counter), + Config: config, + metrics: metrics, + Connector: config.VM, + pending: make(map[ids.ID]snowman.Block), + unverifiedIDToAncestor: ancestor.NewTree(), + unverifiedBlockCache: nonVerifiedCache, + acceptedFrontiers: acceptedFrontiers, + blocked: job.NewScheduler[ids.ID](), + polls: polls, + blkReqs: bimap.New[common.Request, ids.ID](), + blkReqSourceMetric: make(map[common.Request]prometheus.Counter), }, nil } diff --git a/snow/engine/snowman/engine_test.go b/snow/engine/snowman/engine_test.go index 758153c13514..73baacd92e9b 100644 --- a/snow/engine/snowman/engine_test.go +++ b/snow/engine/snowman/engine_test.go @@ -514,7 +514,7 @@ func TestEngineRespondsToGetRequest(t *testing.T) { require.Equal(snowmantest.GenesisBytes, blk) } - require.NoError(te.Get(context.Background(), vdr, 123, snowmantest.GenesisID)) + require.NoError(te.AllGetsServer.Get(context.Background(), vdr, 123, snowmantest.GenesisID)) require.True(sentPut) } diff --git a/snow/engine/snowman/getter/getter.go b/snow/engine/snowman/getter/getter.go index 7af3d3a2263e..dd1b384eb56b 100644 --- a/snow/engine/snowman/getter/getter.go +++ b/snow/engine/snowman/getter/getter.go @@ -11,6 +11,7 @@ import ( "go.uber.org/zap" "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/proto/pb/p2p" "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/snow/engine/snowman/block" "github.com/ava-labs/avalanchego/utils/constants" @@ -182,7 +183,7 @@ func (gh *getter) GetAccepted(ctx context.Context, nodeID ids.NodeID, requestID return nil } -func (gh *getter) GetAncestors(ctx context.Context, nodeID ids.NodeID, requestID uint32, blkID ids.ID) error { +func (gh *getter) GetAncestors(ctx context.Context, nodeID ids.NodeID, requestID uint32, blkID ids.ID, _ p2p.EngineType) error { ancestorsBytes, err := block.GetAncestors( ctx, gh.log, diff --git a/snow/engine/snowman/syncer/state_syncer.go b/snow/engine/snowman/syncer/state_syncer.go index 76e647e73a64..09a8d060f8b5 100644 --- a/snow/engine/snowman/syncer/state_syncer.go +++ b/snow/engine/snowman/syncer/state_syncer.go @@ -12,7 +12,6 @@ import ( "github.com/ava-labs/avalanchego/database" "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/proto/pb/p2p" "github.com/ava-labs/avalanchego/snow" "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/snow/engine/snowman/block" @@ -39,15 +38,6 @@ type weightedSummary struct { type stateSyncer struct { Config - // list of NoOpsHandler for messages dropped by state syncer - common.AcceptedFrontierHandler - common.AcceptedHandler - common.AncestorsHandler - common.PutHandler - common.QueryHandler - common.ChitsHandler - common.AppHandler - started bool // Tracks the last requestID that was used in a request @@ -96,26 +86,15 @@ func New( ) common.StateSyncer { ssVM, _ := cfg.VM.(block.StateSyncableVM) return &stateSyncer{ - Config: cfg, - AcceptedFrontierHandler: common.NewNoOpAcceptedFrontierHandler(cfg.Ctx.Log), - AcceptedHandler: common.NewNoOpAcceptedHandler(cfg.Ctx.Log), - AncestorsHandler: common.NewNoOpAncestorsHandler(cfg.Ctx.Log), - PutHandler: common.NewNoOpPutHandler(cfg.Ctx.Log), - QueryHandler: common.NewNoOpQueryHandler(cfg.Ctx.Log), - ChitsHandler: common.NewNoOpChitsHandler(cfg.Ctx.Log), - AppHandler: cfg.VM, - stateSyncVM: ssVM, - onDoneStateSyncing: onDoneStateSyncing, + Config: cfg, + stateSyncVM: ssVM, + onDoneStateSyncing: onDoneStateSyncing, } } func (ss *stateSyncer) Start(ctx context.Context, startReqID uint32) error { ss.Ctx.Log.Info("starting state sync") - ss.Ctx.State.Set(snow.EngineState{ - Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, - State: snow.StateSyncing, - }) if err := ss.VM.SetState(ctx, snow.StateSyncing); err != nil { return fmt.Errorf("failed to notify VM that state syncing has started: %w", err) } diff --git a/snow/engine/unified/engine.go b/snow/engine/unified/engine.go new file mode 100644 index 000000000000..18b37d1320fb --- /dev/null +++ b/snow/engine/unified/engine.go @@ -0,0 +1,455 @@ +// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package unified + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/proto/pb/p2p" + "github.com/ava-labs/avalanchego/snow" + "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/utils/set" + "github.com/ava-labs/avalanchego/version" +) + +var _ common.Engine = (*Engine)(nil) + +type OnFinishedFunc func(ctx context.Context, lastReqID uint32) error + +type Factory interface { + ClearBootstrapDB() error + + HasStateSync() bool + + NewStateSyncer(OnFinishedFunc) (common.StateSyncer, error) + + NewSnowman() (common.ConsensusEngine, error) + + NewSnowBootstrapper(OnFinishedFunc) (common.BootstrapableEngine, error) + + NewAvalancheSyncer(OnFinishedFunc) (common.AvalancheBootstrapableEngine, error) + + NewAvalancheAncestorsGetter() common.GetAncestorsHandler + + AllGetServer() common.AllGetsServer +} + +func EngineFromEngines(ctx *snow.ConsensusContext, engineFactory Factory, vm common.VM) (*Engine, error) { + noopStateSyncer := &noopStateSyncer{ + StateSummaryFrontierHandler: common.NewNoOpStateSummaryFrontierHandler(ctx.Log), + InternalHandler: common.NewNoOpInternalHandler(ctx.Log), + AcceptedStateSummaryHandler: common.NewNoOpAcceptedStateSummaryHandler(ctx.Log), + } + + noopBootstrapper := &noopBootstrapper{ + AcceptedFrontierHandler: common.NewNoOpAcceptedFrontierHandler(ctx.Log), + InternalHandler: common.NewNoOpInternalHandler(ctx.Log), + AncestorsHandler: common.NewNoOpAncestorsHandler(ctx.Log), + AcceptedHandler: common.NewNoOpAcceptedHandler(ctx.Log), + } + + noopSnowmanEngine := &noopSnowmanEngine{ + PutHandler: common.NewNoOpPutHandler(ctx.Log), + InternalHandler: common.NewNoOpInternalHandler(ctx.Log), + ChitsHandler: common.NewNoOpChitsHandler(ctx.Log), + QueryHandler: common.NewNoOpQueryHandler(ctx.Log), + } + + return &Engine{ + vm: vm, + noopSnowmanEngine: noopSnowmanEngine, + noopStateSyncer: noopStateSyncer, + noopBootstrapper: noopBootstrapper, + avalancheAncestorsGetter: engineFactory.NewAvalancheAncestorsGetter(), + AllGetsServer: engineFactory.AllGetServer(), + ef: engineFactory, + ctx: ctx, + Log: ctx.Log, + }, nil +} + +type Engine struct { + vm common.VM + ctx *snow.ConsensusContext + Log logging.Logger + common.AllGetsServer + avalancheAncestorsGetter common.GetAncestorsHandler + ef Factory + stateSyncer common.StateSyncer + bootstrapper common.BootstrapableEngine + avalancheBootstrapper common.AvalancheBootstrapableEngine + snowman common.ConsensusEngine + + noopStateSyncer common.StateSyncer + noopBootstrapper common.BootstrapableEngine + noopSnowmanEngine common.ConsensusEngine +} + +type noopStateSyncer struct { + noOpEngine + common.StateSummaryFrontierHandler + common.AcceptedStateSummaryHandler + common.InternalHandler +} + +type noOpEngine struct{} + +func (noOpEngine) Start(_ context.Context, _ uint32) error { + return nil +} + +func (noOpEngine) HealthCheck(_ context.Context) (interface{}, error) { + return nil, nil +} + +func (noOpEngine) IsEnabled(_ context.Context) (bool, error) { + return false, nil +} + +type noopBootstrapper struct { + noOpEngine + common.AcceptedFrontierHandler + common.AcceptedHandler + common.AncestorsHandler + common.InternalHandler +} + +func (noopBootstrapper) Clear(context.Context) error { + return nil +} + +type noopSnowmanEngine struct { + noOpEngine + common.PutHandler + common.QueryHandler + common.ChitsHandler + common.InternalHandler +} + +func (e *Engine) GetAncestors( + ctx context.Context, + nodeID ids.NodeID, + requestID uint32, + containerID ids.ID, + engineType p2p.EngineType, +) error { + switch engineType { + case p2p.EngineType_ENGINE_TYPE_AVALANCHE: + return e.avalancheAncestorsGetter.GetAncestors(ctx, nodeID, requestID, containerID, engineType) + default: + return e.AllGetsServer.GetAncestors(ctx, nodeID, requestID, containerID, engineType) + } +} + +func (e *Engine) StateSummaryFrontier(ctx context.Context, nodeID ids.NodeID, requestID uint32, summary []byte) error { + return e.stateSyncer.StateSummaryFrontier(ctx, nodeID, requestID, summary) +} + +func (e *Engine) GetStateSummaryFrontierFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error { + return e.stateSyncer.GetStateSummaryFrontierFailed(ctx, nodeID, requestID) +} + +func (e *Engine) AcceptedStateSummary(ctx context.Context, nodeID ids.NodeID, requestID uint32, summaryIDs set.Set[ids.ID]) error { + return e.stateSyncer.AcceptedStateSummary(ctx, nodeID, requestID, summaryIDs) +} + +func (e *Engine) GetAcceptedStateSummaryFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error { + return e.stateSyncer.GetAcceptedStateSummaryFailed(ctx, nodeID, requestID) +} + +func (e *Engine) AcceptedFrontier(ctx context.Context, nodeID ids.NodeID, requestID uint32, containerID ids.ID) error { + if e.ctx.State.Get().Type == p2p.EngineType_ENGINE_TYPE_AVALANCHE { + return e.noopBootstrapper.AcceptedFrontier(ctx, nodeID, requestID, containerID) + } + return e.bootstrapper.AcceptedFrontier(ctx, nodeID, requestID, containerID) +} + +func (e *Engine) GetAcceptedFrontierFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error { + if e.ctx.State.Get().Type == p2p.EngineType_ENGINE_TYPE_AVALANCHE { + return e.noopBootstrapper.GetAcceptedFrontierFailed(ctx, nodeID, requestID) + } + return e.bootstrapper.GetAcceptedFrontierFailed(ctx, nodeID, requestID) +} + +func (e *Engine) Accepted(ctx context.Context, nodeID ids.NodeID, requestID uint32, containerIDs set.Set[ids.ID]) error { + if e.ctx.State.Get().Type == p2p.EngineType_ENGINE_TYPE_AVALANCHE { + return e.noopBootstrapper.Accepted(ctx, nodeID, requestID, containerIDs) + } + return e.bootstrapper.Accepted(ctx, nodeID, requestID, containerIDs) +} + +func (e *Engine) GetAcceptedFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error { + if e.ctx.State.Get().Type == p2p.EngineType_ENGINE_TYPE_AVALANCHE { + return e.noopBootstrapper.GetAcceptedFailed(ctx, nodeID, requestID) + } + return e.bootstrapper.GetAcceptedFailed(ctx, nodeID, requestID) +} + +func (e *Engine) Ancestors(ctx context.Context, nodeID ids.NodeID, requestID uint32, containers [][]byte) error { + if e.ctx.State.Get().Type == p2p.EngineType_ENGINE_TYPE_AVALANCHE { + return e.avalancheBootstrapper.Ancestors(ctx, nodeID, requestID, containers) + } + return e.bootstrapper.Ancestors(ctx, nodeID, requestID, containers) +} + +func (e *Engine) GetAncestorsFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error { + if e.ctx.State.Get().Type == p2p.EngineType_ENGINE_TYPE_AVALANCHE { + return e.avalancheBootstrapper.GetAncestorsFailed(ctx, nodeID, requestID) + } + return e.bootstrapper.GetAncestorsFailed(ctx, nodeID, requestID) +} + +func (e *Engine) Put(ctx context.Context, nodeID ids.NodeID, requestID uint32, container []byte) error { + return e.snowman.Put(ctx, nodeID, requestID, container) +} + +func (e *Engine) GetFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error { + return e.snowman.GetFailed(ctx, nodeID, requestID) +} + +func (e *Engine) PullQuery(ctx context.Context, nodeID ids.NodeID, requestID uint32, containerID ids.ID, requestedHeight uint64) error { + return e.snowman.PullQuery(ctx, nodeID, requestID, containerID, requestedHeight) +} + +func (e *Engine) PushQuery(ctx context.Context, nodeID ids.NodeID, requestID uint32, container []byte, requestedHeight uint64) error { + return e.snowman.PushQuery(ctx, nodeID, requestID, container, requestedHeight) +} + +func (e *Engine) Chits(ctx context.Context, nodeID ids.NodeID, requestID uint32, preferredID ids.ID, preferredIDAtHeight ids.ID, acceptedID ids.ID, acceptedHeight uint64) error { + return e.snowman.Chits(ctx, nodeID, requestID, preferredID, preferredIDAtHeight, acceptedID, acceptedHeight) +} + +func (e *Engine) QueryFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error { + return e.snowman.QueryFailed(ctx, nodeID, requestID) +} + +func (e *Engine) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, deadline time.Time, request []byte) error { + return e.vm.AppRequest(ctx, nodeID, requestID, deadline, request) +} + +func (e *Engine) AppResponse(ctx context.Context, nodeID ids.NodeID, requestID uint32, response []byte) error { + return e.vm.AppResponse(ctx, nodeID, requestID, response) +} + +func (e *Engine) AppRequestFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32, appErr *common.AppError) error { + return e.vm.AppRequestFailed(ctx, nodeID, requestID, appErr) +} + +func (e *Engine) AppGossip(ctx context.Context, nodeID ids.NodeID, msg []byte) error { + return e.vm.AppGossip(ctx, nodeID, msg) +} + +func (e *Engine) Timeout(ctx context.Context) error { + return e.bootstrapper.Timeout(ctx) +} + +func (e *Engine) Gossip(ctx context.Context) error { + return e.snowman.Gossip(ctx) +} + +func (e *Engine) Shutdown(ctx context.Context) error { + var err1, err2, err3, err4 error + if e.stateSyncer != nil { + err1 = e.stateSyncer.Shutdown(ctx) + } + if e.bootstrapper != nil { + err2 = e.bootstrapper.Shutdown(ctx) + } + if e.snowman != nil { + err3 = e.snowman.Shutdown(ctx) + } + if e.avalancheBootstrapper != nil { + err4 = e.avalancheBootstrapper.Shutdown(ctx) + } + return errors.Join(err1, err2, err3, err4) +} + +func (e *Engine) HealthCheck(ctx context.Context) (interface{}, error) { + if e.ctx.State.Get().Type == p2p.EngineType_ENGINE_TYPE_AVALANCHE { + return e.avalancheBootstrapper.HealthCheck(ctx) + } + + state := e.ctx.State.Get() + + switch state.State { + case snow.StateSyncing: + return e.stateSyncer.HealthCheck(ctx) + case snow.NormalOp: + return e.snowman.HealthCheck(ctx) + case snow.Bootstrapping: + return e.bootstrapper.HealthCheck(ctx) + default: + return nil, errors.New("initializing") + } +} + +func (e *Engine) Connected(ctx context.Context, nodeID ids.NodeID, nodeVersion *version.Application) error { + if e.ctx.State.Get().Type == p2p.EngineType_ENGINE_TYPE_AVALANCHE { + return e.avalancheBootstrapper.Connected(ctx, nodeID, nodeVersion) + } + state := e.ctx.State.Get().State + switch state { + case snow.NormalOp: + return e.snowman.Connected(ctx, nodeID, nodeVersion) + case snow.Bootstrapping: + return e.bootstrapper.Connected(ctx, nodeID, nodeVersion) + case snow.StateSyncing: + return e.stateSyncer.Connected(ctx, nodeID, nodeVersion) + default: + return e.noopSnowmanEngine.Connected(ctx, nodeID, nodeVersion) + } +} + +func (e *Engine) Disconnected(ctx context.Context, nodeID ids.NodeID) error { + if e.ctx.State.Get().Type == p2p.EngineType_ENGINE_TYPE_AVALANCHE { + return e.avalancheBootstrapper.Disconnected(ctx, nodeID) + } + state := e.ctx.State.Get().State + switch state { + case snow.NormalOp: + return e.snowman.Disconnected(ctx, nodeID) + case snow.Bootstrapping: + return e.bootstrapper.Disconnected(ctx, nodeID) + case snow.StateSyncing: + return e.stateSyncer.Disconnected(ctx, nodeID) + default: + return e.noopSnowmanEngine.Disconnected(ctx, nodeID) + } +} + +func (e *Engine) Notify(ctx context.Context, message common.Message) error { + if e.ctx.State.Get().Type == p2p.EngineType_ENGINE_TYPE_AVALANCHE { + return e.avalancheBootstrapper.Notify(ctx, message) + } + state := e.ctx.State.Get().State + switch state { + case snow.NormalOp: + return e.snowman.Notify(ctx, message) + case snow.Bootstrapping: + return e.bootstrapper.Notify(ctx, message) + case snow.StateSyncing: + return e.stateSyncer.Notify(ctx, message) + default: + return e.noopSnowmanEngine.Notify(ctx, message) + } +} + +func (e *Engine) Start(ctx context.Context, startReqID uint32) error { + e.Log.Info("Starting state sync engine") + + state := e.setNoOpEngines() + + switch state.Type { + case p2p.EngineType_ENGINE_TYPE_AVALANCHE: + return e.startAvalancheBootstrapper(ctx, startReqID) + default: + switch state.State { + case snow.StateSyncing: + return e.startStateSyncer(ctx, startReqID) + case snow.NormalOp: + return e.startSnowman(ctx, startReqID) + case snow.Bootstrapping: + return e.startSnowBootstrapper(ctx, startReqID) + default: + return fmt.Errorf("invalid state: %s", state.State) + } + } +} + +func (e *Engine) setNoOpEngines() snow.EngineState { + state := e.ctx.State.Get() + + switch state.Type { + case p2p.EngineType_ENGINE_TYPE_AVALANCHE: + e.stateSyncer = e.noopStateSyncer + e.snowman = e.noopSnowmanEngine + e.bootstrapper = e.noopBootstrapper + default: + switch state.State { + case snow.StateSyncing: + e.bootstrapper = e.noopBootstrapper + e.snowman = e.noopSnowmanEngine + e.avalancheBootstrapper = e.noopBootstrapper + case snow.NormalOp: + e.bootstrapper = e.noopBootstrapper + e.stateSyncer = e.noopStateSyncer + e.avalancheBootstrapper = e.noopBootstrapper + case snow.Bootstrapping: + e.snowman = e.noopSnowmanEngine + e.stateSyncer = e.noopStateSyncer + e.avalancheBootstrapper = e.noopBootstrapper + } + } + return state +} + +func (e *Engine) startAvalancheBootstrapper(ctx context.Context, startReqID uint32) error { + e.ctx.State.Set(snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_AVALANCHE, + State: snow.Bootstrapping, + }) + e.setNoOpEngines() + + syncer, err := e.ef.NewAvalancheSyncer(e.startSnowBootstrapper) + if err != nil { + return err + } + e.avalancheBootstrapper = syncer + return e.avalancheBootstrapper.Start(ctx, startReqID) +} + +func (e *Engine) startStateSyncer(ctx context.Context, startReqID uint32) error { + e.ctx.State.Set(snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.StateSyncing, + }) + e.setNoOpEngines() + + syncer, err := e.ef.NewStateSyncer(e.startSnowBootstrapper) + if err != nil { + return err + } + e.stateSyncer = syncer + + if err := e.ef.ClearBootstrapDB(); err != nil { + return err + } + + return e.stateSyncer.Start(ctx, startReqID) +} + +func (e *Engine) startSnowBootstrapper(ctx context.Context, lastReqID uint32) error { + e.ctx.State.Set(snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.Bootstrapping, + }) + e.setNoOpEngines() + + bs, err := e.ef.NewSnowBootstrapper(e.startSnowman) + if err != nil { + return err + } + e.bootstrapper = bs + return e.bootstrapper.Start(ctx, lastReqID) +} + +func (e *Engine) startSnowman(ctx context.Context, lastReqID uint32) error { + e.ctx.State.Set(snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.NormalOp, + }) + e.setNoOpEngines() + + snowman, err := e.ef.NewSnowman() + if err != nil { + return err + } + e.snowman = snowman + return e.snowman.Start(ctx, lastReqID) +} diff --git a/snow/engine/unified/engine_test.go b/snow/engine/unified/engine_test.go new file mode 100644 index 000000000000..2b09601cbace --- /dev/null +++ b/snow/engine/unified/engine_test.go @@ -0,0 +1,936 @@ +// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package unified_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/proto/pb/p2p" + "github.com/ava-labs/avalanchego/snow" + "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/snow/engine/enginetest" + "github.com/ava-labs/avalanchego/snow/engine/unified" + "github.com/ava-labs/avalanchego/snow/snowtest" + "github.com/ava-labs/avalanchego/utils/set" + "github.com/ava-labs/avalanchego/version" + + mock "github.com/ava-labs/avalanchego/snow/engine/unified/mocks" +) + +type mockStateSyncer struct { + *enginetest.Engine +} + +func (*mockStateSyncer) Clear(_ context.Context) error { + return nil +} + +func (*mockStateSyncer) IsEnabled(_ context.Context) (bool, error) { + return true, nil +} + +type engineStates []snow.EngineState + +func (es engineStates) exclude(state snow.EngineState) []snow.EngineState { + result := make([]snow.EngineState, 0, 6) + for _, s := range es { + if s == state { + continue + } + result = append(result, s) + } + return result +} + +func (es engineStates) contains(state snow.EngineState) bool { + for _, s := range es { + if s == state { + return true + } + } + + return false +} + +var engineStateSpace = engineStates{ + { + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.StateSyncing, + }, + { + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.Bootstrapping, + }, + { + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.NormalOp, + }, + { + Type: p2p.EngineType_ENGINE_TYPE_AVALANCHE, + State: snow.StateSyncing, + }, + { + Type: p2p.EngineType_ENGINE_TYPE_AVALANCHE, + State: snow.Bootstrapping, + }, + { + Type: p2p.EngineType_ENGINE_TYPE_AVALANCHE, + State: snow.NormalOp, + }, +} + +func TestEngine(t *testing.T) { + for _, testCase := range []struct { + name string + invoke func(e *unified.Engine) + setup func(engine *enginetest.Engine, invoked *bool) + state snow.EngineState + shouldBeIgnored bool + shouldBeRoutedToVM bool + method string + }{ + { + name: "pull query", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.NormalOp, + }, + invoke: func(e *unified.Engine) { + require.NoError(t, e.PullQuery(context.Background(), ids.EmptyNodeID, 0, ids.Empty, 0)) + }, + setup: func(e *enginetest.Engine, invoked *bool) { + e.PullQueryF = func(_ context.Context, _ ids.NodeID, _ uint32, _ ids.ID, _ uint64) error { + *invoked = true + return nil + } + }, + }, + { + name: "push query", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.NormalOp, + }, + invoke: func(e *unified.Engine) { + require.NoError(t, e.PushQuery(context.Background(), ids.EmptyNodeID, 0, nil, 0)) + }, + setup: func(e *enginetest.Engine, invoked *bool) { + e.PushQueryF = func(_ context.Context, _ ids.NodeID, _ uint32, _ []byte, _ uint64) error { + *invoked = true + return nil + } + }, + }, + { + name: "put", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.NormalOp, + }, + invoke: func(e *unified.Engine) { + require.NoError(t, e.Put(context.Background(), ids.EmptyNodeID, 0, nil)) + }, + setup: func(e *enginetest.Engine, invoked *bool) { + e.PutF = func(_ context.Context, _ ids.NodeID, _ uint32, _ []byte) error { + *invoked = true + return nil + } + }, + }, + { + name: "get failed", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.NormalOp, + }, + invoke: func(e *unified.Engine) { + require.NoError(t, e.GetFailed(context.Background(), ids.EmptyNodeID, 0)) + }, + setup: func(e *enginetest.Engine, invoked *bool) { + e.GetFailedF = func(_ context.Context, _ ids.NodeID, _ uint32) error { + *invoked = true + return nil + } + }, + }, + { + name: "chits", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.NormalOp, + }, + invoke: func(e *unified.Engine) { + require.NoError(t, e.Chits(context.Background(), ids.EmptyNodeID, 0, ids.Empty, ids.Empty, ids.Empty, 0)) + }, + setup: func(e *enginetest.Engine, invoked *bool) { + e.ChitsF = func(_ context.Context, _ ids.NodeID, _ uint32, _ ids.ID, _ ids.ID, _ ids.ID, _ uint64) error { + *invoked = true + return nil + } + }, + }, + { + name: "query failed", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.NormalOp, + }, + invoke: func(e *unified.Engine) { + require.NoError(t, e.QueryFailed(context.Background(), ids.EmptyNodeID, 0)) + }, + setup: func(e *enginetest.Engine, invoked *bool) { + e.QueryFailedF = func(_ context.Context, _ ids.NodeID, _ uint32) error { + *invoked = true + return nil + } + }, + }, + { + name: "gossip", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.NormalOp, + }, + invoke: func(e *unified.Engine) { + require.NoError(t, e.Gossip(context.Background())) + }, + setup: func(e *enginetest.Engine, invoked *bool) { + e.GossipF = func(_ context.Context) error { + *invoked = true + return nil + } + }, + }, + { + name: "state summary frontier", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.StateSyncing, + }, + invoke: func(e *unified.Engine) { + require.NoError(t, e.StateSummaryFrontier(context.Background(), ids.EmptyNodeID, 0, nil)) + }, + setup: func(e *enginetest.Engine, invoked *bool) { + e.StateSummaryFrontierF = func(context.Context, ids.NodeID, uint32, []byte) error { + *invoked = true + return nil + } + }, + }, + { + name: "get state summary frontier failed", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.StateSyncing, + }, + invoke: func(e *unified.Engine) { + require.NoError(t, e.GetStateSummaryFrontierFailed(context.Background(), ids.EmptyNodeID, 0)) + }, + setup: func(e *enginetest.Engine, invoked *bool) { + e.GetStateSummaryFrontierFailedF = func(context.Context, ids.NodeID, uint32) error { + *invoked = true + return nil + } + }, + }, + { + name: "accepted state summary", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.StateSyncing, + }, + invoke: func(e *unified.Engine) { + require.NoError(t, e.AcceptedStateSummary(context.Background(), ids.EmptyNodeID, 0, nil)) + }, + setup: func(e *enginetest.Engine, invoked *bool) { + e.AcceptedStateSummaryF = func(context.Context, ids.NodeID, uint32, set.Set[ids.ID]) error { + *invoked = true + return nil + } + }, + }, + { + name: "get accepted state summary failed", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.StateSyncing, + }, + invoke: func(e *unified.Engine) { + require.NoError(t, e.GetAcceptedStateSummaryFailed(context.Background(), ids.EmptyNodeID, 0)) + }, + setup: func(e *enginetest.Engine, invoked *bool) { + e.GetAcceptedStateSummaryFailedF = func(context.Context, ids.NodeID, uint32) error { + *invoked = true + return nil + } + }, + }, + { + name: "accepted frontier", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.Bootstrapping, + }, + invoke: func(e *unified.Engine) { + require.NoError(t, e.AcceptedFrontier(context.Background(), ids.EmptyNodeID, 0, ids.Empty)) + }, + setup: func(e *enginetest.Engine, invoked *bool) { + e.AcceptedFrontierF = func(_ context.Context, _ ids.NodeID, _ uint32, _ ids.ID) error { + *invoked = true + return nil + } + }, + }, + { + name: "get accepted frontier failed", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.Bootstrapping, + }, + invoke: func(e *unified.Engine) { + require.NoError(t, e.GetAcceptedFrontierFailed(context.Background(), ids.EmptyNodeID, 0)) + }, + setup: func(e *enginetest.Engine, invoked *bool) { + e.GetAcceptedFrontierFailedF = func(_ context.Context, _ ids.NodeID, _ uint32) error { + *invoked = true + return nil + } + }, + }, + { + name: "timeout", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.Bootstrapping, + }, + invoke: func(e *unified.Engine) { + require.NoError(t, e.Timeout(context.Background())) + }, + setup: func(e *enginetest.Engine, invoked *bool) { + e.TimeoutF = func(_ context.Context) error { + *invoked = true + return nil + } + }, + }, + { + name: "accepted", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.Bootstrapping, + }, + invoke: func(e *unified.Engine) { + require.NoError(t, e.Accepted(context.Background(), ids.EmptyNodeID, 0, set.NewSet[ids.ID](0))) + }, + setup: func(e *enginetest.Engine, invoked *bool) { + e.AcceptedF = func(_ context.Context, _ ids.NodeID, _ uint32, _ set.Set[ids.ID]) error { + *invoked = true + return nil + } + }, + }, + { + name: "get accepted failed", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.Bootstrapping, + }, + invoke: func(e *unified.Engine) { + require.NoError(t, e.GetAcceptedFailed(context.Background(), ids.EmptyNodeID, 0)) + }, + setup: func(e *enginetest.Engine, invoked *bool) { + e.GetAcceptedFailedF = func(_ context.Context, _ ids.NodeID, _ uint32) error { + *invoked = true + return nil + } + }, + }, + { + name: "app request", + shouldBeRoutedToVM: true, + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_UNSPECIFIED, + State: snow.NormalOp, + }, + invoke: func(e *unified.Engine) { + require.NoError(t, e.AppRequest(context.Background(), ids.EmptyNodeID, 0, time.Time{}, nil)) + }, + setup: func(e *enginetest.Engine, invoked *bool) { + e.AppRequestF = func(_ context.Context, _ ids.NodeID, _ uint32, _ time.Time, _ []byte) error { + *invoked = true + return nil + } + }, + }, + { + name: "app gossip", + shouldBeRoutedToVM: true, + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_UNSPECIFIED, + State: snow.NormalOp, + }, + invoke: func(e *unified.Engine) { + require.NoError(t, e.AppGossip(context.Background(), ids.EmptyNodeID, nil)) + }, + setup: func(e *enginetest.Engine, invoked *bool) { + e.AppGossipF = func(_ context.Context, _ ids.NodeID, _ []byte) error { + *invoked = true + return nil + } + }, + }, + { + name: "app response", + shouldBeRoutedToVM: true, + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_UNSPECIFIED, + State: snow.NormalOp, + }, + invoke: func(e *unified.Engine) { + require.NoError(t, e.AppResponse(context.Background(), ids.EmptyNodeID, 0, nil)) + }, + setup: func(e *enginetest.Engine, invoked *bool) { + e.AppResponseF = func(_ context.Context, _ ids.NodeID, _ uint32, _ []byte) error { + *invoked = true + return nil + } + }, + }, + { + name: "app request failed", + shouldBeRoutedToVM: true, + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_UNSPECIFIED, + State: snow.NormalOp, + }, + invoke: func(e *unified.Engine) { + require.NoError(t, e.AppRequestFailed(context.Background(), ids.EmptyNodeID, 0, nil)) + }, + setup: func(e *enginetest.Engine, invoked *bool) { + e.AppRequestFailedF = func(_ context.Context, _ ids.NodeID, _ uint32, _ *common.AppError) error { + *invoked = true + return nil + } + }, + }, + } { + t.Run(testCase.name, func(t *testing.T) { + require := require.New(t) + + // Cartesian product of state and expectation + stateToShouldBeIgnored := make(map[snow.EngineState]bool) + + // We take the complement of the state space defined in the test case, + // and expect the inverse result. + states := engineStateSpace.exclude(testCase.state) + for _, state := range states { + stateToShouldBeIgnored[state] = !testCase.shouldBeIgnored + } + // In the end, we also add the actual test case, but only if it's a valid state + if engineStateSpace.contains(testCase.state) { + stateToShouldBeIgnored[testCase.state] = testCase.shouldBeIgnored + } + + for state, shouldBeIgnored := range stateToShouldBeIgnored { + var et enginetest.Engine + var invoked bool + + testCase.setup(&et, &invoked) + + var mockSS mockStateSyncer + mockSS.Engine = &et + + ctrl := gomock.NewController(t) + ef := mock.NewFactory(ctrl) + ef.EXPECT().AllGetServer().Return(&et).AnyTimes() + ef.EXPECT().NewAvalancheAncestorsGetter().Return(&et).AnyTimes() + ef.EXPECT().NewSnowman().Return(&et, nil).AnyTimes() + ef.EXPECT().NewStateSyncer(gomock.Any()).Return(&mockSS, nil).AnyTimes() + ef.EXPECT().NewAvalancheSyncer(gomock.Any()).Return(&mockSS, nil).AnyTimes() + ef.EXPECT().NewSnowBootstrapper(gomock.Any()).Return(&mockSS, nil).AnyTimes() + ef.EXPECT().ClearBootstrapDB().Return(nil).AnyTimes() + + snowCtx := snowtest.Context(t, snowtest.CChainID) + ctx := snowtest.ConsensusContext(snowCtx) + ctx.State.Set(state) + + var vm enginetest.VM + routedToVM := configureVMRouting(&vm) + + engine, err := unified.EngineFromEngines(ctx, ef, &vm) + require.NoError(err) + + require.NoError(engine.Start(context.Background(), 0)) + + testCase.invoke(engine) + + fmt.Println(shouldBeIgnored, state) + + ignored := !invoked + require.Equal(shouldBeIgnored, ignored) + require.Equal(testCase.shouldBeRoutedToVM, *routedToVM) + } + }) + } +} + +func TestEngineDispatch(t *testing.T) { + for _, testCase := range []struct { + name string + method string + instance string + state snow.EngineState + shouldBeIgnored bool + }{ + { + name: "ancestors x avalanche", + method: "Ancestors", + instance: "avalanche", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_AVALANCHE, + State: snow.Bootstrapping, + }, + }, + { + name: "ancestors x bootstrapper", + method: "Ancestors", + instance: "bootstrapper", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.Bootstrapping, + }, + }, + { + name: "ancestors x state sync", + method: "Ancestors", + instance: "bootstrapper", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.StateSyncing, + }, + shouldBeIgnored: true, + }, + { + name: "getAncestorsFailed x avalanche", + method: "GetAncestorsFailed", + instance: "avalanche", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_AVALANCHE, + State: snow.Bootstrapping, + }, + }, + { + name: "getAncestorsFailed x bootstrapper", + method: "GetAncestorsFailed", + instance: "bootstrapper", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.Bootstrapping, + }, + }, + { + name: "getAncestorsFailed x state sync", + method: "GetAncestorsFailed", + instance: "state-syncer", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.StateSyncing, + }, + shouldBeIgnored: true, + }, + { + name: "connected x bootstrapper", + method: "Connected", + instance: "bootstrapper", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.Bootstrapping, + }, + }, + { + name: "connected x state-syncer", + method: "Connected", + instance: "state-syncer", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.StateSyncing, + }, + }, + { + name: "connected x snowman", + method: "Connected", + instance: "snowman", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.NormalOp, + }, + }, + { + name: "connected x avalanche", + method: "Connected", + instance: "avalanche", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_AVALANCHE, + }, + }, + { + name: "disconnected x bootstrapper", + method: "Disconnected", + instance: "bootstrapper", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.Bootstrapping, + }, + }, + { + name: "disconnected x state-syncer", + method: "Disconnected", + instance: "state-syncer", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.StateSyncing, + }, + }, + { + name: "disconnected x snowman", + method: "Disconnected", + instance: "snowman", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.NormalOp, + }, + }, + { + name: "disconnected x avalanche", + method: "Disconnected", + instance: "avalanche", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_AVALANCHE, + }, + }, + { + name: "notify x bootstrapper", + method: "Notify", + instance: "bootstrapper", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.Bootstrapping, + }, + }, + { + name: "notify x state-syncer", + method: "Notify", + instance: "state-syncer", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.StateSyncing, + }, + }, + { + name: "notify x snowman", + method: "Notify", + instance: "snowman", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.NormalOp, + }, + }, + { + name: "notify x avalanche", + method: "Notify", + instance: "avalanche", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_AVALANCHE, + }, + }, + { + name: "healthcheck x bootstrapper", + method: "HealthCheck", + instance: "bootstrapper", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.Bootstrapping, + }, + }, + { + name: "healthcheck x state-syncer", + method: "HealthCheck", + instance: "state-syncer", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.StateSyncing, + }, + }, + { + name: "healthcheck x snowman", + method: "HealthCheck", + instance: "snowman", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.NormalOp, + }, + }, + { + name: "healthcheck x avalanche", + method: "HealthCheck", + instance: "avalanche", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_AVALANCHE, + }, + }, + { + name: "getAncestors x avalanche", + method: "GetAncestors(avalanche)", + instance: "avalanche", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_AVALANCHE, + }, + }, + { + name: "getAncestors x avalanche", + method: "GetAncestors(snowman)", + instance: "all-get-server", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.Bootstrapping, + }, + }, + { + name: "shutdown x bootstrapper", + method: "Shutdown", + instance: "bootstrapper", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.Bootstrapping, + }, + }, + { + name: "shutdown x state-syncer", + method: "Shutdown", + instance: "state-syncer", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.StateSyncing, + }, + }, + { + name: "shutdown x snowman", + method: "Shutdown", + instance: "snowman", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.NormalOp, + }, + }, + { + name: "shutdown x avalanche", + method: "Shutdown", + instance: "avalanche", + state: snow.EngineState{ + Type: p2p.EngineType_ENGINE_TYPE_AVALANCHE, + }, + }, + } { + t.Run(testCase.name, func(t *testing.T) { + var sm enginetest.Engine + var ss enginetest.Engine + var bs enginetest.Engine + var as enginetest.Engine + var gs enginetest.Engine + + var getServer mockStateSyncer + getServer.Engine = &gs + + var snowman mockStateSyncer + snowman.Engine = &sm + + var stateSyncer mockStateSyncer + stateSyncer.Engine = &ss + + var bootstrapper mockStateSyncer + bootstrapper.Engine = &bs + + var avalancheSyncer mockStateSyncer + avalancheSyncer.Engine = &as + + // {avalanche | bootstrapper | state-syncer | snowman} x method --> struct{} + // registers that a method has been invoked on a given backend instance + invokedTable := setupInvocationTables(avalancheSyncer, bootstrapper, stateSyncer, snowman, getServer) + + ef := createEngineFactory(t, gs, as, sm, stateSyncer, avalancheSyncer, bootstrapper) + + snowCtx := snowtest.Context(t, snowtest.CChainID) + ctx := snowtest.ConsensusContext(snowCtx) + ctx.State.Set(testCase.state) + + var vm enginetest.VM + configureVMRouting(&vm) + + engine, err := unified.EngineFromEngines(ctx, ef, &vm) + require.NoError(t, err) + + // method --> func(*unified.Engine), + // executes a requested method of the engine instance + invocationMap := createEngineInvocationMap(t) + + require.NoError(t, engine.Start(context.Background(), 0)) + + // Ensure invoked table is empty before invocation + require.Empty(t, invokedTable) + // Invoke the method as registered in the invocation map + invocationMap[testCase.method](engine) + // Ensure it was routed to the correct engine unless it should have been ignored + if testCase.shouldBeIgnored { + require.Empty(t, invokedTable) + } else { + _, ok := invokedTable[testCase.instance+testCase.method] + require.True(t, ok) + } + }) + } +} + +func setupInvocationTables(avalancheSyncer mockStateSyncer, bootstrapper mockStateSyncer, stateSyncer mockStateSyncer, snowman mockStateSyncer, getServer mockStateSyncer) map[string]struct{} { + invokedTable := map[string]struct{}{} + setupInvokeTable(&avalancheSyncer, invokedTable, "avalanche") + setupInvokeTable(&bootstrapper, invokedTable, "bootstrapper") + setupInvokeTable(&stateSyncer, invokedTable, "state-syncer") + setupInvokeTable(&snowman, invokedTable, "snowman") + setupInvokeTable(&getServer, invokedTable, "all-get-server") + return invokedTable +} + +func createEngineFactory(t *testing.T, gs enginetest.Engine, as enginetest.Engine, sm enginetest.Engine, stateSyncer mockStateSyncer, avalancheSyncer mockStateSyncer, bootstrapper mockStateSyncer) *mock.Factory { + ctrl := gomock.NewController(t) + ef := mock.NewFactory(ctrl) + ef.EXPECT().AllGetServer().Return(&gs).AnyTimes() + ef.EXPECT().NewAvalancheAncestorsGetter().Return(&as).AnyTimes() + ef.EXPECT().NewSnowman().Return(&sm, nil).AnyTimes() + ef.EXPECT().NewStateSyncer(gomock.Any()).Return(&stateSyncer, nil).AnyTimes() + ef.EXPECT().NewAvalancheSyncer(gomock.Any()).Return(&avalancheSyncer, nil).AnyTimes() + ef.EXPECT().NewSnowBootstrapper(gomock.Any()).Return(&bootstrapper, nil).AnyTimes() + ef.EXPECT().ClearBootstrapDB().Return(nil).AnyTimes() + return ef +} + +func createEngineInvocationMap(t *testing.T) map[string]func(*unified.Engine) { + m := make(map[string]func(*unified.Engine)) + + m["Ancestors"] = func(e *unified.Engine) { + require.NoError(t, e.Ancestors(context.Background(), ids.EmptyNodeID, 0, nil)) + } + + m["GetAncestorsFailed"] = func(e *unified.Engine) { + require.NoError(t, e.GetAncestorsFailed(context.Background(), ids.EmptyNodeID, 0)) + } + + m["Connected"] = func(e *unified.Engine) { + require.NoError(t, e.Connected(context.Background(), ids.EmptyNodeID, nil)) + } + + m["Disconnected"] = func(e *unified.Engine) { + require.NoError(t, e.Disconnected(context.Background(), ids.EmptyNodeID)) + } + + m["Notify"] = func(e *unified.Engine) { + require.NoError(t, e.Notify(context.Background(), 0)) + } + + m["HealthCheck"] = func(e *unified.Engine) { + _, err := e.HealthCheck(context.Background()) + require.NoError(t, err) + } + + m["GetAncestors(avalanche)"] = func(e *unified.Engine) { + require.NoError(t, e.GetAncestors(context.Background(), ids.EmptyNodeID, 0, ids.Empty, p2p.EngineType_ENGINE_TYPE_AVALANCHE)) + } + + m["GetAncestors(snowman)"] = func(e *unified.Engine) { + require.NoError(t, e.GetAncestors(context.Background(), ids.EmptyNodeID, 0, ids.Empty, p2p.EngineType_ENGINE_TYPE_SNOWMAN)) + } + + m["Shutdown"] = func(e *unified.Engine) { + require.NoError(t, e.Shutdown(context.Background())) + } + + return m +} + +func setupInvokeTable(ss *mockStateSyncer, invokeTable map[string]struct{}, instance string) { + ss.AncestorsF = func(context.Context, ids.NodeID, uint32, [][]byte) error { + invokeTable[instance+"Ancestors"] = struct{}{} + return nil + } + + ss.GetAncestorsFailedF = func(context.Context, ids.NodeID, uint32) error { + invokeTable[instance+"GetAncestorsFailed"] = struct{}{} + return nil + } + + ss.ConnectedF = func(context.Context, ids.NodeID, *version.Application) error { + invokeTable[instance+"Connected"] = struct{}{} + return nil + } + + ss.DisconnectedF = func(context.Context, ids.NodeID) error { + invokeTable[instance+"Disconnected"] = struct{}{} + return nil + } + + ss.NotifyF = func(context.Context, common.Message) error { + invokeTable[instance+"Notify"] = struct{}{} + return nil + } + + ss.HealthF = func(context.Context) (interface{}, error) { + invokeTable[instance+"HealthCheck"] = struct{}{} + return nil, nil + } + + ss.GetAncestorsF = func(_ context.Context, _ ids.NodeID, _ uint32, _ ids.ID, engineType p2p.EngineType) error { + switch engineType { + case p2p.EngineType_ENGINE_TYPE_SNOWMAN: + invokeTable[instance+"GetAncestors(snowman)"] = struct{}{} + case p2p.EngineType_ENGINE_TYPE_AVALANCHE: + invokeTable[instance+"GetAncestors(avalanche)"] = struct{}{} + } + return nil + } + + ss.ShutdownF = func(context.Context) error { + invokeTable[instance+"Shutdown"] = struct{}{} + return nil + } +} + +func configureVMRouting(vm *enginetest.VM) *bool { + var routedToVM bool + vm.AppRequestF = func(context.Context, ids.NodeID, uint32, time.Time, []byte) error { + routedToVM = true + return nil + } + + vm.AppRequestFailedF = func(context.Context, ids.NodeID, uint32, *common.AppError) error { + routedToVM = true + return nil + } + + vm.AppGossipF = func(context.Context, ids.NodeID, []byte) error { + routedToVM = true + return nil + } + + vm.AppResponseF = func(context.Context, ids.NodeID, uint32, []byte) error { + routedToVM = true + return nil + } + return &routedToVM +} diff --git a/snow/engine/unified/factory.go b/snow/engine/unified/factory.go new file mode 100644 index 000000000000..62d39d3d5d53 --- /dev/null +++ b/snow/engine/unified/factory.go @@ -0,0 +1,141 @@ +// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package unified + +import ( + "context" + + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" + + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/snow/engine/snowman" + "github.com/ava-labs/avalanchego/snow/engine/snowman/bootstrap" + "github.com/ava-labs/avalanchego/snow/engine/snowman/syncer" + "github.com/ava-labs/avalanchego/trace" + "github.com/ava-labs/avalanchego/utils/logging" + + avbootstrap "github.com/ava-labs/avalanchego/snow/engine/avalanche/bootstrap" +) + +type EngineFactory struct { + TracingEnabled bool + Tracer trace.Tracer + StateSync bool + Logger logging.Logger + StateSyncConfig syncer.Config + BootConfig bootstrap.Config + SnowmanConfig snowman.Config + GetServer common.AllGetsServer + AvaBootConfig avbootstrap.Config + AvaAncestorGetter common.GetAncestorsHandler + AvaMetrics prometheus.Registerer +} + +func (ef *EngineFactory) ClearBootstrapDB() error { + return database.AtomicClear(ef.BootConfig.DB, ef.BootConfig.DB) +} + +func (ef *EngineFactory) NewAvalancheAncestorsGetter() common.GetAncestorsHandler { + return ef.AvaAncestorGetter +} + +func (ef *EngineFactory) AllGetServer() common.AllGetsServer { + return ef.GetServer +} + +func (ef *EngineFactory) HasStateSync() bool { + return ef.StateSync +} + +func (ef *EngineFactory) NewStateSyncer(f OnFinishedFunc) (common.StateSyncer, error) { + stateSyncer := syncer.New(ef.StateSyncConfig, f) + + if ef.TracingEnabled { + stateSyncer = &tracedStateSyncer{ + Enabler: common.NewTracedIsEnabled(stateSyncer, ef.Tracer), + StateSyncer: stateSyncer, + } + } + + return stateSyncer, nil +} + +func (ef *EngineFactory) NewAvalancheSyncer(f OnFinishedFunc) (common.AvalancheBootstrapableEngine, error) { + var avalancheBootstrapper common.AvalancheBootstrapableEngine + var err error + + avalancheBootstrapper, err = avbootstrap.New( + ef.AvaBootConfig, + f, + ef.AvaMetrics, + ) + if err != nil { + ef.Logger.Fatal("error initializing avalanche bootstrapper:", zap.Error(err)) + return nil, err + } + + if ef.TracingEnabled { + avalancheBootstrapper = &tracedClearer{ + Clearer: common.NewTracedClearer(avalancheBootstrapper, ef.Tracer), + AvalancheBootstrapableEngine: avalancheBootstrapper, + } + } + + return avalancheBootstrapper, nil +} + +func (ef *EngineFactory) NewSnowman() (common.ConsensusEngine, error) { + snowmanEngine, err := snowman.New(ef.SnowmanConfig) + if err != nil { + ef.Logger.Fatal("error initializing snowman engine:", zap.Error(err)) + return nil, err + } + return snowmanEngine, nil +} + +func (ef *EngineFactory) NewSnowBootstrapper(f OnFinishedFunc) (common.BootstrapableEngine, error) { + var bootstrapper common.BootstrapableEngine + var err error + bootstrapper, err = bootstrap.New( + ef.BootConfig, + f, + ) + if err != nil { + ef.Logger.Fatal("error initializing snowman bootstrapper:", zap.Error(err)) + return nil, err + } + + if ef.TracingEnabled { + bootstrapper = &tracedClearer{ + Clearer: common.NewTracedClearer(bootstrapper, ef.Tracer), + AvalancheBootstrapableEngine: bootstrapper, + AcceptedHandler: bootstrapper, + AcceptedFrontierHandler: bootstrapper, + } + } + + return bootstrapper, nil +} + +type tracedStateSyncer struct { + common.Enabler + common.StateSyncer +} + +func (tss *tracedStateSyncer) IsEnabled(ctx context.Context) (bool, error) { + return tss.Enabler.IsEnabled(ctx) +} + +type tracedClearer struct { + common.Clearer + common.AvalancheBootstrapableEngine + common.AcceptedFrontierHandler + common.AcceptedHandler +} + +func (tc *tracedClearer) Clear(ctx context.Context) error { + return tc.Clearer.Clear(ctx) +} diff --git a/snow/engine/unified/factory_test.go b/snow/engine/unified/factory_test.go new file mode 100644 index 000000000000..51aaf28a37dd --- /dev/null +++ b/snow/engine/unified/factory_test.go @@ -0,0 +1,94 @@ +// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package unified_test + +import ( + "context" + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + + "github.com/ava-labs/avalanchego/snow" + "github.com/ava-labs/avalanchego/snow/engine/snowman" + "github.com/ava-labs/avalanchego/snow/engine/snowman/syncer" + "github.com/ava-labs/avalanchego/snow/engine/unified" + "github.com/ava-labs/avalanchego/snow/snowtest" + "github.com/ava-labs/avalanchego/snow/validators" + "github.com/ava-labs/avalanchego/trace" + + avbootstrap "github.com/ava-labs/avalanchego/snow/engine/avalanche/bootstrap" + smbootstrap "github.com/ava-labs/avalanchego/snow/engine/snowman/bootstrap" +) + +func TestFactory(t *testing.T) { + ctx := snowtest.Context(t, snowtest.PChainID) + + vals := validators.NewManager() + metrics := prometheus.NewRegistry() + + snowConfig := snowman.Config{ + Validators: vals, + Ctx: &snow.ConsensusContext{ + Registerer: metrics, + Context: ctx, + }, + } + + bootConfig := smbootstrap.Config{ + Ctx: &snow.ConsensusContext{ + Registerer: metrics, + Context: ctx, + }, + } + + var getServer mockStateSyncer + var avaAncestorGetter mockStateSyncer + + ef := &unified.EngineFactory{ + TracingEnabled: true, + Tracer: trace.Noop, + GetServer: &getServer, + AvaAncestorGetter: &avaAncestorGetter, + AvaMetrics: metrics, + AvaBootConfig: avbootstrap.Config{ + Ctx: &snow.ConsensusContext{ + Context: ctx, + Registerer: metrics, + }, + }, + Logger: ctx.Log, + BootConfig: bootConfig, + SnowmanConfig: snowConfig, + StateSyncConfig: syncer.Config{}, + } + + snowman, err := ef.NewSnowman() + require.NoError(t, err) + require.NotNil(t, snowman) + + ss, err := ef.NewStateSyncer(func(_ context.Context, _ uint32) error { + return nil + }) + require.NoError(t, err) + require.NotNil(t, ss) + + avaBoot, err := ef.NewAvalancheSyncer(func(_ context.Context, _ uint32) error { + return nil + }) + require.NoError(t, err) + require.NotNil(t, avaBoot) + + bs, err := ef.NewSnowBootstrapper(func(_ context.Context, _ uint32) error { + return nil + }) + require.NoError(t, err) + require.NotNil(t, bs) + + gs := ef.AllGetServer() + require.Equal(t, gs, &getServer) + + ag := ef.NewAvalancheAncestorsGetter() + require.Equal(t, ag, &avaAncestorGetter) +} diff --git a/snow/engine/unified/mocks/factory.go b/snow/engine/unified/mocks/factory.go new file mode 100644 index 000000000000..72be030aec7d --- /dev/null +++ b/snow/engine/unified/mocks/factory.go @@ -0,0 +1,157 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/ava-labs/avalanchego/snow/engine/unified (interfaces: Factory) +// +// Generated by this command: +// +// mockgen -package=mocks -destination=snow/engine/unified/mocks/factory.go -mock_names=Factory=Factory github.com/ava-labs/avalanchego/snow/engine/unified Factory +// + +// Package mocks is a generated GoMock package. +package mocks + +import ( + reflect "reflect" + + common "github.com/ava-labs/avalanchego/snow/engine/common" + unified "github.com/ava-labs/avalanchego/snow/engine/unified" + gomock "go.uber.org/mock/gomock" +) + +// Factory is a mock of Factory interface. +type Factory struct { + ctrl *gomock.Controller + recorder *FactoryMockRecorder +} + +// FactoryMockRecorder is the mock recorder for Factory. +type FactoryMockRecorder struct { + mock *Factory +} + +// NewFactory creates a new mock instance. +func NewFactory(ctrl *gomock.Controller) *Factory { + mock := &Factory{ctrl: ctrl} + mock.recorder = &FactoryMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *Factory) EXPECT() *FactoryMockRecorder { + return m.recorder +} + +// AllGetServer mocks base method. +func (m *Factory) AllGetServer() common.AllGetsServer { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AllGetServer") + ret0, _ := ret[0].(common.AllGetsServer) + return ret0 +} + +// AllGetServer indicates an expected call of AllGetServer. +func (mr *FactoryMockRecorder) AllGetServer() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AllGetServer", reflect.TypeOf((*Factory)(nil).AllGetServer)) +} + +// ClearBootstrapDB mocks base method. +func (m *Factory) ClearBootstrapDB() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ClearBootstrapDB") + ret0, _ := ret[0].(error) + return ret0 +} + +// ClearBootstrapDB indicates an expected call of ClearBootstrapDB. +func (mr *FactoryMockRecorder) ClearBootstrapDB() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClearBootstrapDB", reflect.TypeOf((*Factory)(nil).ClearBootstrapDB)) +} + +// HasStateSync mocks base method. +func (m *Factory) HasStateSync() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "HasStateSync") + ret0, _ := ret[0].(bool) + return ret0 +} + +// HasStateSync indicates an expected call of HasStateSync. +func (mr *FactoryMockRecorder) HasStateSync() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasStateSync", reflect.TypeOf((*Factory)(nil).HasStateSync)) +} + +// NewAvalancheAncestorsGetter mocks base method. +func (m *Factory) NewAvalancheAncestorsGetter() common.GetAncestorsHandler { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewAvalancheAncestorsGetter") + ret0, _ := ret[0].(common.GetAncestorsHandler) + return ret0 +} + +// NewAvalancheAncestorsGetter indicates an expected call of NewAvalancheAncestorsGetter. +func (mr *FactoryMockRecorder) NewAvalancheAncestorsGetter() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewAvalancheAncestorsGetter", reflect.TypeOf((*Factory)(nil).NewAvalancheAncestorsGetter)) +} + +// NewAvalancheSyncer mocks base method. +func (m *Factory) NewAvalancheSyncer(arg0 unified.OnFinishedFunc) (common.AvalancheBootstrapableEngine, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewAvalancheSyncer", arg0) + ret0, _ := ret[0].(common.AvalancheBootstrapableEngine) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// NewAvalancheSyncer indicates an expected call of NewAvalancheSyncer. +func (mr *FactoryMockRecorder) NewAvalancheSyncer(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewAvalancheSyncer", reflect.TypeOf((*Factory)(nil).NewAvalancheSyncer), arg0) +} + +// NewSnowBootstrapper mocks base method. +func (m *Factory) NewSnowBootstrapper(arg0 unified.OnFinishedFunc) (common.BootstrapableEngine, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewSnowBootstrapper", arg0) + ret0, _ := ret[0].(common.BootstrapableEngine) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// NewSnowBootstrapper indicates an expected call of NewSnowBootstrapper. +func (mr *FactoryMockRecorder) NewSnowBootstrapper(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewSnowBootstrapper", reflect.TypeOf((*Factory)(nil).NewSnowBootstrapper), arg0) +} + +// NewSnowman mocks base method. +func (m *Factory) NewSnowman() (common.ConsensusEngine, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewSnowman") + ret0, _ := ret[0].(common.ConsensusEngine) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// NewSnowman indicates an expected call of NewSnowman. +func (mr *FactoryMockRecorder) NewSnowman() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewSnowman", reflect.TypeOf((*Factory)(nil).NewSnowman)) +} + +// NewStateSyncer mocks base method. +func (m *Factory) NewStateSyncer(arg0 unified.OnFinishedFunc) (common.StateSyncer, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewStateSyncer", arg0) + ret0, _ := ret[0].(common.StateSyncer) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// NewStateSyncer indicates an expected call of NewStateSyncer. +func (mr *FactoryMockRecorder) NewStateSyncer(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewStateSyncer", reflect.TypeOf((*Factory)(nil).NewStateSyncer), arg0) +} diff --git a/snow/networking/handler/engine.go b/snow/networking/handler/engine.go index e3de84ac8989..515739f211a2 100644 --- a/snow/networking/handler/engine.go +++ b/snow/networking/handler/engine.go @@ -5,7 +5,6 @@ package handler import ( "github.com/ava-labs/avalanchego/proto/pb/p2p" - "github.com/ava-labs/avalanchego/snow" "github.com/ava-labs/avalanchego/snow/engine/common" ) @@ -18,7 +17,7 @@ type Engine struct { // Get returns the engine corresponding to the provided state, // and whether its corresponding engine is initialized (not nil). -func (e *Engine) Get(state snow.State) (common.Engine, bool) { +/*func (e *Engine) Get(state snow.State) (common.Engine, bool) { if e == nil { return nil, false } @@ -32,7 +31,7 @@ func (e *Engine) Get(state snow.State) (common.Engine, bool) { default: return nil, false } -} +}*/ // EngineManager resolves the engine that should be used given the current // execution context of the chain. diff --git a/snow/networking/handler/handler.go b/snow/networking/handler/handler.go index 224e0abc51b6..bad9dc1be4ea 100644 --- a/snow/networking/handler/handler.go +++ b/snow/networking/handler/handler.go @@ -5,7 +5,6 @@ package handler import ( "context" - "errors" "fmt" "sync" "sync/atomic" @@ -20,7 +19,6 @@ import ( "github.com/ava-labs/avalanchego/api/health" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/message" - "github.com/ava-labs/avalanchego/network/p2p" "github.com/ava-labs/avalanchego/snow" "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/snow/networking/tracker" @@ -29,6 +27,7 @@ import ( "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/set" "github.com/ava-labs/avalanchego/utils/timer/mockable" + "github.com/ava-labs/avalanchego/version" p2ppb "github.com/ava-labs/avalanchego/proto/pb/p2p" commontracker "github.com/ava-labs/avalanchego/snow/engine/common/tracker" @@ -41,12 +40,7 @@ const ( syncProcessingTimeWarnLimit = 30 * time.Second ) -var ( - _ Handler = (*handler)(nil) - - errMissingEngine = errors.New("missing engine") - errNoStartingGear = errors.New("failed to select starting gear") -) +var _ Handler = (*handler)(nil) type Handler interface { common.Timer @@ -58,8 +52,7 @@ type Handler interface { // this chain, the message should be dropped. ShouldHandle(nodeID ids.NodeID) bool - SetEngineManager(engineManager *EngineManager) - GetEngineManager() *EngineManager + SetEngine(engine common.Engine) SetOnStopped(onStopped func()) Start(ctx context.Context, recoverPanic bool) @@ -93,7 +86,7 @@ type handler struct { preemptTimeouts chan struct{} gossipFrequency time.Duration - engineManager *EngineManager + engine common.Engine // onStopped is called in a goroutine when this handler finishes shutting // down. If it is nil then it is skipped. @@ -124,7 +117,15 @@ type handler struct { // Tracks the peers that are currently connected to this subnet peerTracker commontracker.Peers - p2pTracker *p2p.PeerTracker + p2pTracker peerTracker +} + +type peerTracker interface { + Connected( + nodeID ids.NodeID, + nodeVersion *version.Application, + ) + Disconnected(nodeID ids.NodeID) } // Initialize this consensus handler @@ -138,7 +139,7 @@ func New( resourceTracker tracker.ResourceTracker, subnet subnets.Subnet, peerTracker commontracker.Peers, - p2pTracker *p2p.PeerTracker, + p2pTracker peerTracker, reg prometheus.Registerer, haltBootstrapping func(), ) (Handler, error) { @@ -200,53 +201,17 @@ func (h *handler) ShouldHandle(nodeID ids.NodeID) bool { return h.subnet.IsAllowed(nodeID, ok) } -func (h *handler) SetEngineManager(engineManager *EngineManager) { - h.engineManager = engineManager -} - -func (h *handler) GetEngineManager() *EngineManager { - return h.engineManager +func (h *handler) SetEngine(engine common.Engine) { + h.engine = engine } func (h *handler) SetOnStopped(onStopped func()) { h.onStopped = onStopped } -func (h *handler) selectStartingGear(ctx context.Context) (common.Engine, error) { - state := h.ctx.State.Get() - engines := h.engineManager.Get(state.Type) - if engines == nil { - return nil, errNoStartingGear - } - if engines.StateSyncer == nil { - return engines.Bootstrapper, nil - } - - stateSyncEnabled, err := engines.StateSyncer.IsEnabled(ctx) - if err != nil { - return nil, err - } - - if !stateSyncEnabled { - return engines.Bootstrapper, nil - } - - // drop bootstrap state from previous runs before starting state sync - return engines.StateSyncer, engines.Bootstrapper.Clear(ctx) -} - func (h *handler) Start(ctx context.Context, recoverPanic bool) { - gear, err := h.selectStartingGear(ctx) - if err != nil { - h.ctx.Log.Error("chain failed to select starting gear", - zap.Error(err), - ) - h.shutdown(ctx, h.clock.Time()) - return - } - h.ctx.Lock.Lock() - err = gear.Start(ctx, 0) + err := h.engine.Start(ctx, 0) h.ctx.Lock.Unlock() if err != nil { h.ctx.Log.Error("chain failed to start", @@ -511,37 +476,6 @@ func (h *handler) handleSyncMsg(ctx context.Context, msg Message) error { return nil } - var engineType p2ppb.EngineType - switch msg.EngineType { - case p2ppb.EngineType_ENGINE_TYPE_AVALANCHE, p2ppb.EngineType_ENGINE_TYPE_SNOWMAN: - // The peer is requesting an engine type that has been initialized, so - // we should attempt to honor the request. - engineType = msg.EngineType - default: - // Note: [msg.EngineType] may have been provided by the peer as an - // invalid option. I.E. not one of AVALANCHE, SNOWMAN, or UNSPECIFIED. - // In this case, we treat the value the same way as UNSPECIFIED. - // - // If the peer didn't request a specific engine type, we default to the - // current engine. - engineType = currentState.Type - } - - engine, ok := h.engineManager.Get(engineType).Get(currentState.State) - if !ok { - // This should only happen if the peer is not following the protocol. - // This can happen if the chain only has a Snowman engine and the peer - // requested an Avalanche engine handle the message. - h.ctx.Log.Debug("dropping sync message", - zap.String("reason", "uninitialized engine state"), - zap.String("messageOp", op), - zap.Stringer("currentEngineType", currentState.Type), - zap.Stringer("requestedEngineType", msg.EngineType), - zap.Stringer("engineState", currentState.State), - ) - return nil - } - // Invariant: Response messages can never be dropped here. This is because // the timeout has already been cleared. This means the engine // should be invoked with a failure message if parsing of the @@ -549,16 +483,16 @@ func (h *handler) handleSyncMsg(ctx context.Context, msg Message) error { switch msg := body.(type) { // State messages should always be sent to the snowman engine case *p2ppb.GetStateSummaryFrontier: - return engine.GetStateSummaryFrontier(ctx, nodeID, msg.RequestId) + return h.engine.GetStateSummaryFrontier(ctx, nodeID, msg.RequestId) case *p2ppb.StateSummaryFrontier: - return engine.StateSummaryFrontier(ctx, nodeID, msg.RequestId, msg.Summary) + return h.engine.StateSummaryFrontier(ctx, nodeID, msg.RequestId, msg.Summary) case *message.GetStateSummaryFrontierFailed: - return engine.GetStateSummaryFrontierFailed(ctx, nodeID, msg.RequestID) + return h.engine.GetStateSummaryFrontierFailed(ctx, nodeID, msg.RequestID) case *p2ppb.GetAcceptedStateSummary: - return engine.GetAcceptedStateSummary( + return h.engine.GetAcceptedStateSummary( ctx, nodeID, msg.RequestId, @@ -575,18 +509,18 @@ func (h *handler) handleSyncMsg(ctx context.Context, msg Message) error { zap.String("field", "SummaryIDs"), zap.Error(err), ) - return engine.GetAcceptedStateSummaryFailed(ctx, nodeID, msg.RequestId) + return h.engine.GetAcceptedStateSummaryFailed(ctx, nodeID, msg.RequestId) } - return engine.AcceptedStateSummary(ctx, nodeID, msg.RequestId, summaryIDs) + return h.engine.AcceptedStateSummary(ctx, nodeID, msg.RequestId, summaryIDs) case *message.GetAcceptedStateSummaryFailed: - return engine.GetAcceptedStateSummaryFailed(ctx, nodeID, msg.RequestID) + return h.engine.GetAcceptedStateSummaryFailed(ctx, nodeID, msg.RequestID) // Bootstrapping messages may be forwarded to either avalanche or snowman - // engines, depending on the EngineType field + // h.engines, depending on the h.h.engineType field case *p2ppb.GetAcceptedFrontier: - return engine.GetAcceptedFrontier(ctx, nodeID, msg.RequestId) + return h.engine.GetAcceptedFrontier(ctx, nodeID, msg.RequestId) case *p2ppb.AcceptedFrontier: containerID, err := ids.ToID(msg.ContainerId) @@ -598,13 +532,13 @@ func (h *handler) handleSyncMsg(ctx context.Context, msg Message) error { zap.String("field", "ContainerID"), zap.Error(err), ) - return engine.GetAcceptedFrontierFailed(ctx, nodeID, msg.RequestId) + return h.engine.GetAcceptedFrontierFailed(ctx, nodeID, msg.RequestId) } - return engine.AcceptedFrontier(ctx, nodeID, msg.RequestId, containerID) + return h.engine.AcceptedFrontier(ctx, nodeID, msg.RequestId, containerID) case *message.GetAcceptedFrontierFailed: - return engine.GetAcceptedFrontierFailed(ctx, nodeID, msg.RequestID) + return h.engine.GetAcceptedFrontierFailed(ctx, nodeID, msg.RequestID) case *p2ppb.GetAccepted: containerIDs, err := getIDs(msg.ContainerIds) @@ -619,7 +553,7 @@ func (h *handler) handleSyncMsg(ctx context.Context, msg Message) error { return nil } - return engine.GetAccepted(ctx, nodeID, msg.RequestId, containerIDs) + return h.engine.GetAccepted(ctx, nodeID, msg.RequestId, containerIDs) case *p2ppb.Accepted: containerIDs, err := getIDs(msg.ContainerIds) @@ -631,13 +565,13 @@ func (h *handler) handleSyncMsg(ctx context.Context, msg Message) error { zap.String("field", "ContainerIDs"), zap.Error(err), ) - return engine.GetAcceptedFailed(ctx, nodeID, msg.RequestId) + return h.engine.GetAcceptedFailed(ctx, nodeID, msg.RequestId) } - return engine.Accepted(ctx, nodeID, msg.RequestId, containerIDs) + return h.engine.Accepted(ctx, nodeID, msg.RequestId, containerIDs) case *message.GetAcceptedFailed: - return engine.GetAcceptedFailed(ctx, nodeID, msg.RequestID) + return h.engine.GetAcceptedFailed(ctx, nodeID, msg.RequestID) case *p2ppb.GetAncestors: containerID, err := ids.ToID(msg.ContainerId) @@ -652,13 +586,13 @@ func (h *handler) handleSyncMsg(ctx context.Context, msg Message) error { return nil } - return engine.GetAncestors(ctx, nodeID, msg.RequestId, containerID) + return h.engine.GetAncestors(ctx, nodeID, msg.RequestId, containerID, msg.EngineType) case *message.GetAncestorsFailed: - return engine.GetAncestorsFailed(ctx, nodeID, msg.RequestID) + return h.engine.GetAncestorsFailed(ctx, nodeID, msg.RequestID) case *p2ppb.Ancestors: - return engine.Ancestors(ctx, nodeID, msg.RequestId, msg.Containers) + return h.engine.Ancestors(ctx, nodeID, msg.RequestId, msg.Containers) case *p2ppb.Get: containerID, err := ids.ToID(msg.ContainerId) @@ -673,16 +607,16 @@ func (h *handler) handleSyncMsg(ctx context.Context, msg Message) error { return nil } - return engine.Get(ctx, nodeID, msg.RequestId, containerID) + return h.engine.Get(ctx, nodeID, msg.RequestId, containerID) case *message.GetFailed: - return engine.GetFailed(ctx, nodeID, msg.RequestID) + return h.engine.GetFailed(ctx, nodeID, msg.RequestID) case *p2ppb.Put: - return engine.Put(ctx, nodeID, msg.RequestId, msg.Container) + return h.engine.Put(ctx, nodeID, msg.RequestId, msg.Container) case *p2ppb.PushQuery: - return engine.PushQuery(ctx, nodeID, msg.RequestId, msg.Container, msg.RequestedHeight) + return h.engine.PushQuery(ctx, nodeID, msg.RequestId, msg.Container, msg.RequestedHeight) case *p2ppb.PullQuery: containerID, err := ids.ToID(msg.ContainerId) @@ -697,7 +631,7 @@ func (h *handler) handleSyncMsg(ctx context.Context, msg Message) error { return nil } - return engine.PullQuery(ctx, nodeID, msg.RequestId, containerID, msg.RequestedHeight) + return h.engine.PullQuery(ctx, nodeID, msg.RequestId, containerID, msg.RequestedHeight) case *p2ppb.Chits: preferredID, err := ids.ToID(msg.PreferredId) @@ -709,7 +643,7 @@ func (h *handler) handleSyncMsg(ctx context.Context, msg Message) error { zap.String("field", "PreferredID"), zap.Error(err), ) - return engine.QueryFailed(ctx, nodeID, msg.RequestId) + return h.engine.QueryFailed(ctx, nodeID, msg.RequestId) } preferredIDAtHeight, err := ids.ToID(msg.PreferredIdAtHeight) @@ -721,7 +655,7 @@ func (h *handler) handleSyncMsg(ctx context.Context, msg Message) error { zap.String("field", "PreferredIDAtHeight"), zap.Error(err), ) - return engine.QueryFailed(ctx, nodeID, msg.RequestId) + return h.engine.QueryFailed(ctx, nodeID, msg.RequestId) } acceptedID, err := ids.ToID(msg.AcceptedId) @@ -733,13 +667,13 @@ func (h *handler) handleSyncMsg(ctx context.Context, msg Message) error { zap.String("field", "AcceptedID"), zap.Error(err), ) - return engine.QueryFailed(ctx, nodeID, msg.RequestId) + return h.engine.QueryFailed(ctx, nodeID, msg.RequestId) } - return engine.Chits(ctx, nodeID, msg.RequestId, preferredID, preferredIDAtHeight, acceptedID, msg.AcceptedHeight) + return h.engine.Chits(ctx, nodeID, msg.RequestId, preferredID, preferredIDAtHeight, acceptedID, msg.AcceptedHeight) case *message.QueryFailed: - return engine.QueryFailed(ctx, nodeID, msg.RequestID) + return h.engine.QueryFailed(ctx, nodeID, msg.RequestID) // Connection messages can be sent to the currently executing engine case *message.Connected: @@ -748,7 +682,7 @@ func (h *handler) handleSyncMsg(ctx context.Context, msg Message) error { return err } h.p2pTracker.Connected(nodeID, msg.NodeVersion) - return engine.Connected(ctx, nodeID, msg.NodeVersion) + return h.engine.Connected(ctx, nodeID, msg.NodeVersion) case *message.Disconnected: err := h.peerTracker.Disconnected(ctx, nodeID) @@ -756,7 +690,7 @@ func (h *handler) handleSyncMsg(ctx context.Context, msg Message) error { return err } h.p2pTracker.Disconnected(nodeID) - return engine.Disconnected(ctx, nodeID) + return h.engine.Disconnected(ctx, nodeID) default: return fmt.Errorf( @@ -819,20 +753,9 @@ func (h *handler) executeAsyncMsg(ctx context.Context, msg Message) error { ) }() - state := h.ctx.State.Get() - engine, ok := h.engineManager.Get(state.Type).Get(state.State) - if !ok { - return fmt.Errorf( - "%w %s running %s", - errMissingEngine, - state.State, - state.Type, - ) - } - switch m := body.(type) { case *p2ppb.AppRequest: - return engine.AppRequest( + return h.engine.AppRequest( ctx, nodeID, m.RequestId, @@ -841,7 +764,7 @@ func (h *handler) executeAsyncMsg(ctx context.Context, msg Message) error { ) case *p2ppb.AppResponse: - return engine.AppResponse(ctx, nodeID, m.RequestId, m.AppBytes) + return h.engine.AppResponse(ctx, nodeID, m.RequestId, m.AppBytes) case *p2ppb.AppError: err := &common.AppError{ @@ -849,7 +772,7 @@ func (h *handler) executeAsyncMsg(ctx context.Context, msg Message) error { Message: m.ErrorMessage, } - return engine.AppRequestFailed( + return h.engine.AppRequestFailed( ctx, nodeID, m.RequestId, @@ -857,7 +780,7 @@ func (h *handler) executeAsyncMsg(ctx context.Context, msg Message) error { ) case *p2ppb.AppGossip: - return engine.AppGossip(ctx, nodeID, m.AppBytes) + return h.engine.AppGossip(ctx, nodeID, m.AppBytes) default: return fmt.Errorf( @@ -918,26 +841,15 @@ func (h *handler) handleChanMsg(msg message.InboundMessage) error { } }() - state := h.ctx.State.Get() - engine, ok := h.engineManager.Get(state.Type).Get(state.State) - if !ok { - return fmt.Errorf( - "%w %s running %s", - errMissingEngine, - state.State, - state.Type, - ) - } - switch msg := body.(type) { case *message.VMMessage: - return engine.Notify(context.TODO(), common.Message(msg.Notification)) + return h.engine.Notify(context.TODO(), common.Message(msg.Notification)) case *message.GossipRequest: - return engine.Gossip(context.TODO()) + return h.engine.Gossip(context.TODO()) case *message.Timeout: - return engine.Timeout(context.TODO()) + return h.engine.Timeout(context.TODO()) default: return fmt.Errorf( @@ -1000,17 +912,7 @@ func (h *handler) shutdown(ctx context.Context, startClosingTime time.Time) { close(h.closed) }() - state := h.ctx.State.Get() - engine, ok := h.engineManager.Get(state.Type).Get(state.State) - if !ok { - h.ctx.Log.Error("failed fetching current engine during shutdown", - zap.Stringer("type", state.Type), - zap.Stringer("state", state.State), - ) - return - } - - if err := engine.Shutdown(ctx); err != nil { + if err := h.engine.Shutdown(ctx); err != nil { h.ctx.Log.Error("failed while shutting down the chain", zap.Error(err), ) diff --git a/snow/networking/handler/handler_test.go b/snow/networking/handler/handler_test.go index 39947a0c9503..8a9b234af2ce 100644 --- a/snow/networking/handler/handler_test.go +++ b/snow/networking/handler/handler_test.go @@ -99,11 +99,8 @@ func TestHandlerDropsTimedOutMessages(t *testing.T) { called <- struct{}{} return nil } - handler.SetEngineManager(&EngineManager{ - Snowman: &Engine{ - Bootstrapper: bootstrapper, - }, - }) + handler.SetEngine(bootstrapper) + ctx.State.Set(snow.EngineState{ Type: p2ppb.EngineType_ENGINE_TYPE_SNOWMAN, State: snow.Bootstrapping, // assumed bootstrap is ongoing @@ -213,12 +210,7 @@ func TestHandlerClosesOnError(t *testing.T) { return ctx } - handler.SetEngineManager(&EngineManager{ - Snowman: &Engine{ - Bootstrapper: bootstrapper, - Consensus: engine, - }, - }) + handler.SetEngine(bootstrapper) // assume bootstrapping is ongoing so that InboundGetAcceptedFrontier // should normally be handled @@ -307,11 +299,8 @@ func TestHandlerDropsGossipDuringBootstrapping(t *testing.T) { closed <- struct{}{} return nil } - handler.SetEngineManager(&EngineManager{ - Snowman: &Engine{ - Bootstrapper: bootstrapper, - }, - }) + + handler.SetEngine(bootstrapper) ctx.State.Set(snow.EngineState{ Type: p2ppb.EngineType_ENGINE_TYPE_SNOWMAN, State: snow.Bootstrapping, // assumed bootstrap is ongoing @@ -401,12 +390,7 @@ func TestHandlerDispatchInternal(t *testing.T) { return nil } - handler.SetEngineManager(&EngineManager{ - Snowman: &Engine{ - Bootstrapper: bootstrapper, - Consensus: engine, - }, - }) + handler.SetEngine(engine) ctx.State.Set(snow.EngineState{ Type: p2ppb.EngineType_ENGINE_TYPE_SNOWMAN, @@ -425,183 +409,104 @@ func TestHandlerDispatchInternal(t *testing.T) { // Tests that messages are routed to the correct engine type func TestDynamicEngineTypeDispatch(t *testing.T) { - tests := []struct { - name string - currentEngineType p2ppb.EngineType - requestedEngineType p2ppb.EngineType - setup func( - h Handler, - b common.BootstrapableEngine, - e common.Engine, - ) - }{ - { - name: "current - avalanche, requested - unspecified", - currentEngineType: p2ppb.EngineType_ENGINE_TYPE_AVALANCHE, - requestedEngineType: p2ppb.EngineType_ENGINE_TYPE_UNSPECIFIED, - setup: func(h Handler, b common.BootstrapableEngine, e common.Engine) { - h.SetEngineManager(&EngineManager{ - Avalanche: &Engine{ - StateSyncer: nil, - Bootstrapper: b, - Consensus: e, - }, - Snowman: nil, - }) - }, - }, - { - name: "current - avalanche, requested - avalanche", - currentEngineType: p2ppb.EngineType_ENGINE_TYPE_AVALANCHE, - requestedEngineType: p2ppb.EngineType_ENGINE_TYPE_AVALANCHE, - setup: func(h Handler, b common.BootstrapableEngine, e common.Engine) { - h.SetEngineManager(&EngineManager{ - Avalanche: &Engine{ - StateSyncer: nil, - Bootstrapper: b, - Consensus: e, - }, - Snowman: nil, - }) - }, - }, - { - name: "current - snowman, requested - unspecified", - currentEngineType: p2ppb.EngineType_ENGINE_TYPE_SNOWMAN, - requestedEngineType: p2ppb.EngineType_ENGINE_TYPE_UNSPECIFIED, - setup: func(h Handler, b common.BootstrapableEngine, e common.Engine) { - h.SetEngineManager(&EngineManager{ - Avalanche: nil, - Snowman: &Engine{ - StateSyncer: nil, - Bootstrapper: b, - Consensus: e, - }, - }) - }, - }, - { - name: "current - snowman, requested - avalanche", - currentEngineType: p2ppb.EngineType_ENGINE_TYPE_SNOWMAN, - requestedEngineType: p2ppb.EngineType_ENGINE_TYPE_AVALANCHE, - setup: func(h Handler, b common.BootstrapableEngine, e common.Engine) { - h.SetEngineManager(&EngineManager{ - Avalanche: &Engine{ - StateSyncer: nil, - Bootstrapper: nil, - Consensus: e, - }, - Snowman: &Engine{ - StateSyncer: nil, - Bootstrapper: b, - Consensus: nil, - }, - }) - }, - }, - { - name: "current - snowman, requested - snowman", - currentEngineType: p2ppb.EngineType_ENGINE_TYPE_SNOWMAN, - requestedEngineType: p2ppb.EngineType_ENGINE_TYPE_SNOWMAN, - setup: func(h Handler, b common.BootstrapableEngine, e common.Engine) { - h.SetEngineManager(&EngineManager{ - Avalanche: nil, - Snowman: &Engine{ - StateSyncer: nil, - Bootstrapper: b, - Consensus: e, - }, - }) - }, + require := require.New(t) + + snowCtx := snowtest.Context(t, snowtest.CChainID) + ctx := snowtest.ConsensusContext(snowCtx) + vdrs := validators.NewManager() + require.NoError(vdrs.AddStaker(ctx.SubnetID, ids.GenerateTestNodeID(), nil, ids.Empty, 1)) + + resourceTracker, err := tracker.NewResourceTracker( + prometheus.NewRegistry(), + resource.NoUsage, + meter.ContinuousFactory{}, + time.Second, + ) + require.NoError(err) + + peerTracker, err := p2p.NewPeerTracker( + logging.NoLog{}, + "", + prometheus.NewRegistry(), + nil, + version.CurrentApp, + ) + require.NoError(err) + + handler, err := New( + ctx, + vdrs, + nil, + time.Second, + testThreadPoolSize, + resourceTracker, + subnets.New(ids.EmptyNodeID, subnets.Config{}), + commontracker.NewPeers(), + peerTracker, + prometheus.NewRegistry(), + func() {}, + ) + require.NoError(err) + + bootstrapper := &enginetest.Bootstrapper{ + Engine: enginetest.Engine{ + T: t, }, } + bootstrapper.Default(false) - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - require := require.New(t) - - messageReceived := make(chan struct{}) - snowCtx := snowtest.Context(t, snowtest.CChainID) - ctx := snowtest.ConsensusContext(snowCtx) - vdrs := validators.NewManager() - require.NoError(vdrs.AddStaker(ctx.SubnetID, ids.GenerateTestNodeID(), nil, ids.Empty, 1)) - - resourceTracker, err := tracker.NewResourceTracker( - prometheus.NewRegistry(), - resource.NoUsage, - meter.ContinuousFactory{}, - time.Second, - ) - require.NoError(err) - - peerTracker, err := p2p.NewPeerTracker( - logging.NoLog{}, - "", - prometheus.NewRegistry(), - nil, - version.CurrentApp, - ) - require.NoError(err) - - handler, err := New( - ctx, - vdrs, - nil, - time.Second, - testThreadPoolSize, - resourceTracker, - subnets.New(ids.EmptyNodeID, subnets.Config{}), - commontracker.NewPeers(), - peerTracker, - prometheus.NewRegistry(), - func() {}, - ) - require.NoError(err) - - bootstrapper := &enginetest.Bootstrapper{ - Engine: enginetest.Engine{ - T: t, - }, - } - bootstrapper.Default(false) - - engine := &enginetest.Engine{T: t} - engine.Default(false) - engine.ContextF = func() *snow.ConsensusContext { - return ctx - } - engine.ChitsF = func(context.Context, ids.NodeID, uint32, ids.ID, ids.ID, ids.ID, uint64) error { - close(messageReceived) - return nil - } - - test.setup(handler, bootstrapper, engine) - - ctx.State.Set(snow.EngineState{ - Type: test.currentEngineType, - State: snow.NormalOp, // assumed bootstrap is done - }) - - bootstrapper.StartF = func(context.Context, uint32) error { - return nil - } - - handler.Start(context.Background(), false) - handler.Push(context.Background(), Message{ - InboundMessage: message.InboundChits( - ids.Empty, - uint32(0), - ids.Empty, - ids.Empty, - ids.Empty, - ids.EmptyNodeID, - ), - EngineType: test.requestedEngineType, - }) - - <-messageReceived - }) + engine := &enginetest.Engine{T: t} + engine.Default(false) + engine.ContextF = func() *snow.ConsensusContext { + return ctx } + + appReqChan := make(chan struct{}) + chitsChan := make(chan struct{}, 1) + + engine.AppRequestF = func(context.Context, ids.NodeID, uint32, time.Time, []byte) error { + // Wait for chits message to be dispatched before proceeding with processing + <-chitsChan + close(appReqChan) + return nil + } + engine.ChitsF = func(context.Context, ids.NodeID, uint32, ids.ID, ids.ID, ids.ID, uint64) error { + chitsChan <- struct{}{} + return nil + } + + ctx.State.Set(snow.EngineState{ + Type: p2ppb.EngineType_ENGINE_TYPE_SNOWMAN, + State: snow.NormalOp, + }) + + bootstrapper.StartF = func(context.Context, uint32) error { + return nil + } + + handler.SetEngine(engine) + handler.Start(context.Background(), false) + + // Ensure app requests are dispatched on a different goroutine than the chits. + // Dispatch app request first, but have it wait for the chits message to be processed. + // If they are dispatched on the same goroutine, a deadlock would occur. + handler.Push(context.Background(), Message{ + InboundMessage: message.InboundAppRequest(ids.Empty, uint32(0), time.Hour, nil, ids.EmptyNodeID), + EngineType: p2ppb.EngineType_ENGINE_TYPE_SNOWMAN, + }) + handler.Push(context.Background(), Message{ + InboundMessage: message.InboundChits( + ids.Empty, + uint32(0), + ids.Empty, + ids.Empty, + ids.Empty, + ids.EmptyNodeID, + ), + EngineType: p2ppb.EngineType_ENGINE_TYPE_SNOWMAN, + }) + + <-appReqChan } func TestHandlerStartError(t *testing.T) { @@ -643,7 +548,11 @@ func TestHandlerStartError(t *testing.T) { // Starting a handler with an unprovided engine should immediately cause the // handler to shutdown. - handler.SetEngineManager(&EngineManager{}) + + handler.SetEngine(&enginetest.Engine{StartF: func(_ context.Context, _ uint32) error { + return errors.New("oops") + }}) + ctx.State.Set(snow.EngineState{ Type: p2ppb.EngineType_ENGINE_TYPE_SNOWMAN, State: snow.Initializing, diff --git a/snow/networking/handler/handlermock/handler.go b/snow/networking/handler/handlermock/handler.go index d8e3c23bb0f4..379bc93b379d 100644 --- a/snow/networking/handler/handlermock/handler.go +++ b/snow/networking/handler/handlermock/handler.go @@ -16,6 +16,7 @@ import ( ids "github.com/ava-labs/avalanchego/ids" snow "github.com/ava-labs/avalanchego/snow" + common "github.com/ava-labs/avalanchego/snow/engine/common" handler "github.com/ava-labs/avalanchego/snow/networking/handler" gomock "go.uber.org/mock/gomock" ) @@ -72,20 +73,6 @@ func (mr *HandlerMockRecorder) Context() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*Handler)(nil).Context)) } -// GetEngineManager mocks base method. -func (m *Handler) GetEngineManager() *handler.EngineManager { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetEngineManager") - ret0, _ := ret[0].(*handler.EngineManager) - return ret0 -} - -// GetEngineManager indicates an expected call of GetEngineManager. -func (mr *HandlerMockRecorder) GetEngineManager() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetEngineManager", reflect.TypeOf((*Handler)(nil).GetEngineManager)) -} - // HealthCheck mocks base method. func (m *Handler) HealthCheck(arg0 context.Context) (any, error) { m.ctrl.T.Helper() @@ -139,16 +126,16 @@ func (mr *HandlerMockRecorder) RegisterTimeout(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterTimeout", reflect.TypeOf((*Handler)(nil).RegisterTimeout), arg0) } -// SetEngineManager mocks base method. -func (m *Handler) SetEngineManager(arg0 *handler.EngineManager) { +// SetEngine mocks base method. +func (m *Handler) SetEngine(arg0 common.Engine) { m.ctrl.T.Helper() - m.ctrl.Call(m, "SetEngineManager", arg0) + m.ctrl.Call(m, "SetEngine", arg0) } -// SetEngineManager indicates an expected call of SetEngineManager. -func (mr *HandlerMockRecorder) SetEngineManager(arg0 any) *gomock.Call { +// SetEngine indicates an expected call of SetEngine. +func (mr *HandlerMockRecorder) SetEngine(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetEngineManager", reflect.TypeOf((*Handler)(nil).SetEngineManager), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetEngine", reflect.TypeOf((*Handler)(nil).SetEngine), arg0) } // SetOnStopped mocks base method. diff --git a/snow/networking/handler/health.go b/snow/networking/handler/health.go index 0dbcb844fb95..8350bbcc3ce5 100644 --- a/snow/networking/handler/health.go +++ b/snow/networking/handler/health.go @@ -15,17 +15,7 @@ import ( var ErrNotConnectedEnoughStake = errors.New("not connected to enough stake") func (h *handler) HealthCheck(ctx context.Context) (interface{}, error) { - state := h.ctx.State.Get() - engine, ok := h.engineManager.Get(state.Type).Get(state.State) - if !ok { - return nil, fmt.Errorf( - "%w %s running %s", - errMissingEngine, - state.State, - state.Type, - ) - } - engineIntf, engineErr := engine.HealthCheck(ctx) + engineIntf, engineErr := h.engine.HealthCheck(ctx) networkingIntf, networkingErr := h.networkHealthCheck() intf := map[string]interface{}{ "engine": engineIntf, diff --git a/snow/networking/handler/health_test.go b/snow/networking/handler/health_test.go index 4e7e5732b8de..686ec78f6ed9 100644 --- a/snow/networking/handler/health_test.go +++ b/snow/networking/handler/health_test.go @@ -110,12 +110,7 @@ func TestHealthCheckSubnet(t *testing.T) { return ctx } - handlerIntf.SetEngineManager(&EngineManager{ - Snowman: &Engine{ - Bootstrapper: bootstrapper, - Consensus: engine, - }, - }) + handlerIntf.SetEngine(engine) ctx.State.Set(snow.EngineState{ Type: p2ppb.EngineType_ENGINE_TYPE_SNOWMAN, diff --git a/snow/networking/router/chain_router_test.go b/snow/networking/router/chain_router_test.go index 149be6dc7b52..07228f5024c9 100644 --- a/snow/networking/router/chain_router_test.go +++ b/snow/networking/router/chain_router_test.go @@ -151,18 +151,8 @@ func TestShutdown(t *testing.T) { return nil } engine.HaltF = func(context.Context) {} - h.SetEngineManager(&handler.EngineManager{ - Avalanche: &handler.Engine{ - StateSyncer: nil, - Bootstrapper: bootstrapper, - Consensus: engine, - }, - Snowman: &handler.Engine{ - StateSyncer: nil, - Bootstrapper: bootstrapper, - Consensus: engine, - }, - }) + h.SetEngine(bootstrapper) + chainCtx.State.Set(snow.EngineState{ Type: engineType, State: snow.NormalOp, // assumed bootstrapping is done @@ -268,18 +258,8 @@ func TestConnectedAfterShutdownErrorLogRegression(t *testing.T) { CantClear: true, } - h.SetEngineManager(&handler.EngineManager{ - Avalanche: &handler.Engine{ - StateSyncer: nil, - Bootstrapper: bootstrapper, - Consensus: &engine, - }, - Snowman: &handler.Engine{ - StateSyncer: nil, - Bootstrapper: bootstrapper, - Consensus: &engine, - }, - }) + h.SetEngine(bootstrapper) + chainCtx.State.Set(snow.EngineState{ Type: engineType, State: snow.NormalOp, // assumed bootstrapping is done @@ -409,18 +389,8 @@ func TestShutdownTimesOut(t *testing.T) { *closed++ return nil } - h.SetEngineManager(&handler.EngineManager{ - Avalanche: &handler.Engine{ - StateSyncer: nil, - Bootstrapper: bootstrapper, - Consensus: engine, - }, - Snowman: &handler.Engine{ - StateSyncer: nil, - Bootstrapper: bootstrapper, - Consensus: engine, - }, - }) + h.SetEngine(engine) + ctx.State.Set(snow.EngineState{ Type: engineType, State: snow.NormalOp, // assumed bootstrapping is done @@ -612,18 +582,8 @@ func TestRouterTimeout(t *testing.T) { bootstrapper.StartF = func(context.Context, uint32) error { return nil } - h.SetEngineManager(&handler.EngineManager{ - Avalanche: &handler.Engine{ - StateSyncer: nil, - Bootstrapper: bootstrapper, - Consensus: nil, - }, - Snowman: &handler.Engine{ - StateSyncer: nil, - Bootstrapper: bootstrapper, - Consensus: nil, - }, - }) + h.SetEngine(bootstrapper) + h.Start(context.Background(), false) nodeID := ids.GenerateTestNodeID() @@ -1102,18 +1062,7 @@ func TestValidatorOnlyMessageDrops(t *testing.T) { return ctx } engine.Default(false) - h.SetEngineManager(&handler.EngineManager{ - Avalanche: &handler.Engine{ - StateSyncer: nil, - Bootstrapper: bootstrapper, - Consensus: engine, - }, - Snowman: &handler.Engine{ - StateSyncer: nil, - Bootstrapper: bootstrapper, - Consensus: engine, - }, - }) + h.SetEngine(bootstrapper) chainRouter.AddChain(context.Background(), h) @@ -1268,12 +1217,7 @@ func TestValidatorOnlyAllowedNodeMessageDrops(t *testing.T) { } engine.Default(false) - h.SetEngineManager(&handler.EngineManager{ - Avalanche: &handler.Engine{ - Bootstrapper: bootstrapper, - Consensus: engine, - }, - }) + h.SetEngine(bootstrapper) chainRouter.AddChain(context.Background(), h) @@ -1511,18 +1455,8 @@ func newChainRouterTest(t *testing.T) (*ChainRouter, *enginetest.Engine) { engine.ContextF = func() *snow.ConsensusContext { return ctx } - h.SetEngineManager(&handler.EngineManager{ - Avalanche: &handler.Engine{ - StateSyncer: nil, - Bootstrapper: bootstrapper, - Consensus: engine, - }, - Snowman: &handler.Engine{ - StateSyncer: nil, - Bootstrapper: bootstrapper, - Consensus: engine, - }, - }) + h.SetEngine(engine) + ctx.State.Set(snow.EngineState{ Type: p2ppb.EngineType_ENGINE_TYPE_SNOWMAN, State: snow.NormalOp, // assumed bootstrapping is done diff --git a/snow/networking/sender/sender_test.go b/snow/networking/sender/sender_test.go index 1f7469575690..8ccdfb32e4d7 100644 --- a/snow/networking/sender/sender_test.go +++ b/snow/networking/sender/sender_test.go @@ -157,18 +157,8 @@ func TestTimeout(t *testing.T) { bootstrapper.ConnectedF = func(context.Context, ids.NodeID, *version.Application) error { return nil } - h.SetEngineManager(&handler.EngineManager{ - Avalanche: &handler.Engine{ - StateSyncer: nil, - Bootstrapper: bootstrapper, - Consensus: nil, - }, - Snowman: &handler.Engine{ - StateSyncer: nil, - Bootstrapper: bootstrapper, - Consensus: nil, - }, - }) + h.SetEngine(bootstrapper) + ctx2.State.Set(snow.EngineState{ Type: p2ppb.EngineType_ENGINE_TYPE_SNOWMAN, State: snow.Bootstrapping, // assumed bootstrap is ongoing @@ -426,18 +416,8 @@ func TestReliableMessages(t *testing.T) { return nil } bootstrapper.CantGossip = false - h.SetEngineManager(&handler.EngineManager{ - Avalanche: &handler.Engine{ - StateSyncer: nil, - Bootstrapper: bootstrapper, - Consensus: nil, - }, - Snowman: &handler.Engine{ - StateSyncer: nil, - Bootstrapper: bootstrapper, - Consensus: nil, - }, - }) + h.SetEngine(bootstrapper) + ctx2.State.Set(snow.EngineState{ Type: p2ppb.EngineType_ENGINE_TYPE_SNOWMAN, State: snow.Bootstrapping, // assumed bootstrap is ongoing @@ -582,18 +562,8 @@ func TestReliableMessagesToMyself(t *testing.T) { close(awaiting[int(reqID)]) return nil } - h.SetEngineManager(&handler.EngineManager{ - Avalanche: &handler.Engine{ - StateSyncer: nil, - Bootstrapper: bootstrapper, - Consensus: nil, - }, - Snowman: &handler.Engine{ - StateSyncer: nil, - Bootstrapper: bootstrapper, - Consensus: nil, - }, - }) + h.SetEngine(bootstrapper) + ctx2.State.Set(snow.EngineState{ Type: p2ppb.EngineType_ENGINE_TYPE_SNOWMAN, State: snow.Bootstrapping, // assumed bootstrap is ongoing diff --git a/vms/platformvm/vm_test.go b/vms/platformvm/vm_test.go index 2d1740a650e0..17ed2adf094c 100644 --- a/vms/platformvm/vm_test.go +++ b/vms/platformvm/vm_test.go @@ -26,6 +26,7 @@ import ( "github.com/ava-labs/avalanchego/snow/engine/common/tracker" "github.com/ava-labs/avalanchego/snow/engine/enginetest" "github.com/ava-labs/avalanchego/snow/engine/snowman/bootstrap" + "github.com/ava-labs/avalanchego/snow/engine/unified" "github.com/ava-labs/avalanchego/snow/networking/benchlist" "github.com/ava-labs/avalanchego/snow/networking/handler" "github.com/ava-labs/avalanchego/snow/networking/router" @@ -1378,33 +1379,24 @@ func TestBootstrapPartiallyAccepted(t *testing.T) { }, Consensus: &smcon.Topological{}, } - engine, err := smeng.New(engineConfig) - require.NoError(err) - - bootstrapper, err := bootstrap.New( - bootstrapConfig, - engine.Start, - ) - require.NoError(err) - h.SetEngineManager(&handler.EngineManager{ - Avalanche: &handler.Engine{ - StateSyncer: nil, - Bootstrapper: bootstrapper, - Consensus: engine, - }, - Snowman: &handler.Engine{ - StateSyncer: nil, - Bootstrapper: bootstrapper, - Consensus: engine, - }, - }) + factory := &unified.EngineFactory{ + GetServer: snowGetHandler, + SnowmanConfig: engineConfig, + BootConfig: bootstrapConfig, + Logger: ctx.Log, + } consensusCtx.State.Set(snow.EngineState{ Type: p2ppb.EngineType_ENGINE_TYPE_SNOWMAN, - State: snow.NormalOp, + State: snow.Bootstrapping, }) + engine, err := unified.EngineFromEngines(engineConfig.Ctx, factory, vm) + require.NoError(err) + + h.SetEngine(engine) + // Allow incoming messages to be routed to the new chain chainRouter.AddChain(context.Background(), h) ctx.Lock.Unlock() @@ -1426,7 +1418,7 @@ func TestBootstrapPartiallyAccepted(t *testing.T) { } peerTracker.Connected(peerID, version.CurrentApp) - require.NoError(bootstrapper.Connected(context.Background(), peerID, version.CurrentApp)) + require.NoError(engine.Connected(context.Background(), peerID, version.CurrentApp)) externalSender.SendF = func(msg message.OutboundMessage, config common.SendConfig, _ ids.ID, _ subnets.Allower) set.Set[ids.NodeID] { inMsgIntf, err := mc.Parse(msg.Bytes(), ctx.NodeID, func() {}) @@ -1438,7 +1430,7 @@ func TestBootstrapPartiallyAccepted(t *testing.T) { return config.NodeIDs } - require.NoError(bootstrapper.AcceptedFrontier(context.Background(), peerID, reqID, advanceTimeBlkID)) + require.NoError(engine.AcceptedFrontier(context.Background(), peerID, reqID, advanceTimeBlkID)) externalSender.SendF = func(msg message.OutboundMessage, config common.SendConfig, _ ids.ID, _ subnets.Allower) set.Set[ids.NodeID] { inMsgIntf, err := mc.Parse(msg.Bytes(), ctx.NodeID, func() {}) @@ -1455,7 +1447,7 @@ func TestBootstrapPartiallyAccepted(t *testing.T) { } frontier := set.Of(advanceTimeBlkID) - require.NoError(bootstrapper.Accepted(context.Background(), peerID, reqID, frontier)) + require.NoError(engine.Accepted(context.Background(), peerID, reqID, frontier)) externalSender.SendF = func(msg message.OutboundMessage, config common.SendConfig, _ ids.ID, _ subnets.Allower) set.Set[ids.NodeID] { inMsg, err := mc.Parse(msg.Bytes(), ctx.NodeID, func() {}) @@ -1469,7 +1461,7 @@ func TestBootstrapPartiallyAccepted(t *testing.T) { return config.NodeIDs } - require.NoError(bootstrapper.Ancestors(context.Background(), peerID, reqID, [][]byte{advanceTimeBlkBytes})) + require.NoError(engine.Ancestors(context.Background(), peerID, reqID, [][]byte{advanceTimeBlkBytes})) externalSender.SendF = func(msg message.OutboundMessage, config common.SendConfig, _ ids.ID, _ subnets.Allower) set.Set[ids.NodeID] { inMsgIntf, err := mc.Parse(msg.Bytes(), ctx.NodeID, func() {}) @@ -1481,12 +1473,12 @@ func TestBootstrapPartiallyAccepted(t *testing.T) { return config.NodeIDs } - require.NoError(bootstrapper.AcceptedFrontier(context.Background(), peerID, reqID, advanceTimeBlkID)) + require.NoError(engine.AcceptedFrontier(context.Background(), peerID, reqID, advanceTimeBlkID)) externalSender.SendF = nil externalSender.CantSend = false - require.NoError(bootstrapper.Accepted(context.Background(), peerID, reqID, frontier)) + require.NoError(engine.Accepted(context.Background(), peerID, reqID, frontier)) require.Equal(advanceTimeBlk.ID(), vm.manager.Preferred()) ctx.Lock.Unlock()