Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify engines #3405

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 71 additions & 85 deletions chains/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/ava-labs/avalanchego/snow/engine/common/tracker"
"github.com/ava-labs/avalanchego/snow/engine/snowman/block"
"github.com/ava-labs/avalanchego/snow/engine/snowman/syncer"
"github.com/ava-labs/avalanchego/snow/engine/unified"
"github.com/ava-labs/avalanchego/snow/networking/handler"
"github.com/ava-labs/avalanchego/snow/networking/router"
"github.com/ava-labs/avalanchego/snow/networking/sender"
Expand Down Expand Up @@ -63,7 +64,6 @@ import (

p2ppb "github.com/ava-labs/avalanchego/proto/pb/p2p"
smcon "github.com/ava-labs/avalanchego/snow/consensus/snowman"
aveng "github.com/ava-labs/avalanchego/snow/engine/avalanche"
avbootstrap "github.com/ava-labs/avalanchego/snow/engine/avalanche/bootstrap"
avagetter "github.com/ava-labs/avalanchego/snow/engine/avalanche/getter"
smeng "github.com/ava-labs/avalanchego/snow/engine/snowman"
Expand Down Expand Up @@ -933,23 +933,13 @@ func (m *manager) createAvalancheChain(
// to make sure start callbacks are duly initialized
snowmanEngineConfig := smeng.Config{
Ctx: ctx,
AllGetsServer: snowGetHandler,
VM: vmWrappingProposerVM,
Sender: snowmanMessageSender,
Validators: vdrs,
ConnectedValidators: connectedValidators,
Params: consensusParams,
Consensus: snowmanConsensus,
}
var snowmanEngine common.Engine
snowmanEngine, err = smeng.New(snowmanEngineConfig)
if err != nil {
return nil, fmt.Errorf("error initializing snowman engine: %w", err)
}

if m.TracingEnabled {
snowmanEngine = common.TraceEngine(snowmanEngine, m.Tracer)
}

// create bootstrap gear
bootstrapCfg := smbootstrap.Config{
Expand All @@ -968,18 +958,6 @@ func (m *manager) createAvalancheChain(
DB: blockBootstrappingDB,
VM: vmWrappingProposerVM,
}
var snowmanBootstrapper common.BootstrapableEngine
snowmanBootstrapper, err = smbootstrap.New(
bootstrapCfg,
snowmanEngine.Start,
)
if err != nil {
return nil, fmt.Errorf("error initializing snowman bootstrapper: %w", err)
}

if m.TracingEnabled {
snowmanBootstrapper = common.TraceBootstrapableEngine(snowmanBootstrapper, m.Tracer)
}

avaGetHandler, err := avagetter.New(
vtxManager,
Expand All @@ -993,12 +971,6 @@ func (m *manager) createAvalancheChain(
return nil, fmt.Errorf("couldn't initialize avalanche base message handler: %w", err)
}

// create engine gear
avalancheEngine := aveng.New(ctx, avaGetHandler, linearizableVM)
if m.TracingEnabled {
avalancheEngine = common.TraceEngine(avalancheEngine, m.Tracer)
}

// create bootstrap gear
avalancheBootstrapperConfig := avbootstrap.Config{
AllGetsServer: avaGetHandler,
Expand All @@ -1016,32 +988,33 @@ func (m *manager) createAvalancheChain(
avalancheBootstrapperConfig.StopVertexID = m.Upgrades.CortinaXChainStopVertexID
}

var avalancheBootstrapper common.BootstrapableEngine
avalancheBootstrapper, err = avbootstrap.New(
avalancheBootstrapperConfig,
snowmanBootstrapper.Start,
avalancheMetrics,
)
ef := &unified.EngineFactory{
TracingEnabled: m.TracingEnabled,
GetServer: snowGetHandler,
AvaAncestorGetter: avaGetHandler,
AvaMetrics: avalancheMetrics,
Tracer: m.Tracer,
BootConfig: bootstrapCfg,
AvaBootConfig: avalancheBootstrapperConfig,
SnowmanConfig: snowmanEngineConfig,
Logger: ctx.Log,
}

ctx.State.Set(snow.EngineState{
Type: p2ppb.EngineType_ENGINE_TYPE_AVALANCHE,
State: snow.Bootstrapping,
})

ue, err := unified.EngineFromEngines(ctx, ef, vm)
if err != nil {
return nil, fmt.Errorf("error initializing avalanche bootstrapper: %w", err)
return nil, fmt.Errorf("error initializing unified engine: %w", err)
}

engine := common.Engine(ue)
if m.TracingEnabled {
avalancheBootstrapper = common.TraceBootstrapableEngine(avalancheBootstrapper, m.Tracer)
engine = common.TraceEngine(ue, m.Tracer)
}

h.SetEngineManager(&handler.EngineManager{
Avalanche: &handler.Engine{
StateSyncer: nil,
Bootstrapper: avalancheBootstrapper,
Consensus: avalancheEngine,
},
Snowman: &handler.Engine{
StateSyncer: nil,
Bootstrapper: snowmanBootstrapper,
Consensus: snowmanEngine,
},
})
h.SetEngine(engine)

// Register health check for this chain
if err := m.Health.RegisterHealthCheck(primaryAlias, h, ctx.SubnetID.String()); err != nil {
Expand Down Expand Up @@ -1327,7 +1300,6 @@ func (m *manager) createSnowmanChain(
// to make sure start callbacks are duly initialized
engineConfig := smeng.Config{
Ctx: ctx,
AllGetsServer: snowGetHandler,
VM: vm,
Sender: messageSender,
Validators: vdrs,
Expand All @@ -1336,15 +1308,6 @@ func (m *manager) createSnowmanChain(
Consensus: consensus,
PartialSync: m.PartialSyncPrimaryNetwork && ctx.ChainID == constants.PlatformChainID,
}
var engine common.Engine
engine, err = smeng.New(engineConfig)
if err != nil {
return nil, fmt.Errorf("error initializing snowman engine: %w", err)
}

if m.TracingEnabled {
engine = common.TraceEngine(engine, m.Tracer)
}

// create bootstrap gear
bootstrapCfg := smbootstrap.Config{
Expand All @@ -1364,18 +1327,6 @@ func (m *manager) createSnowmanChain(
VM: vm,
Bootstrapped: bootstrapFunc,
}
var bootstrapper common.BootstrapableEngine
bootstrapper, err = smbootstrap.New(
bootstrapCfg,
engine.Start,
)
if err != nil {
return nil, fmt.Errorf("error initializing snowman bootstrapper: %w", err)
}

if m.TracingEnabled {
bootstrapper = common.TraceBootstrapableEngine(bootstrapper, m.Tracer)
}

// create state sync gear
stateSyncCfg, err := syncer.NewConfig(
Expand All @@ -1392,24 +1343,42 @@ func (m *manager) createSnowmanChain(
if err != nil {
return nil, fmt.Errorf("couldn't initialize state syncer configuration: %w", err)
}
stateSyncer := syncer.New(
stateSyncCfg,
bootstrapper.Start,
)

if m.TracingEnabled {
stateSyncer = common.TraceStateSyncer(stateSyncer, m.Tracer)
ef := &unified.EngineFactory{
TracingEnabled: m.TracingEnabled,
GetServer: snowGetHandler,
AvaAncestorGetter: invalidEngineAncestorsGetter{},
StateSync: hasStateSync(stateSyncCfg),
Tracer: m.Tracer,
BootConfig: bootstrapCfg,
SnowmanConfig: engineConfig,
StateSyncConfig: stateSyncCfg,
Logger: ctx.Log,
}

h.SetEngineManager(&handler.EngineManager{
Avalanche: nil,
Snowman: &handler.Engine{
StateSyncer: stateSyncer,
Bootstrapper: bootstrapper,
Consensus: engine,
},
ctx.State.Set(snow.EngineState{
Type: p2ppb.EngineType_ENGINE_TYPE_SNOWMAN,
State: snow.StateSyncing,
})

if !ef.HasStateSync() {
ctx.State.Set(snow.EngineState{
Type: p2ppb.EngineType_ENGINE_TYPE_SNOWMAN,
State: snow.Bootstrapping,
})
}

ue, err := unified.EngineFromEngines(ctx, ef, vm)
if err != nil {
return nil, fmt.Errorf("error initializing unified engine: %w", err)
}

engine := common.Engine(ue)
if m.TracingEnabled {
engine = common.TraceEngine(ue, m.Tracer)
}
h.SetEngine(engine)

// Register health checks
if err := m.Health.RegisterHealthCheck(primaryAlias, h, ctx.SubnetID.String()); err != nil {
return nil, fmt.Errorf("couldn't add health check for chain %s: %w", primaryAlias, err)
Expand All @@ -1423,6 +1392,17 @@ func (m *manager) createSnowmanChain(
}, nil
}

func hasStateSync(stateSyncCfg syncer.Config) bool {
var hasStateSync bool
ssVM, isStateSyncable := stateSyncCfg.VM.(block.StateSyncableVM)
if isStateSyncable {
if enabled, err := ssVM.StateSyncEnabled(context.Background()); err == nil && enabled {
hasStateSync = true
}
}
return hasStateSync
}

func (m *manager) IsBootstrapped(id ids.ID) bool {
m.chainsLock.Lock()
chain, exists := m.chains[id]
Expand Down Expand Up @@ -1581,3 +1561,9 @@ func (m *manager) getOrMakeVMRegisterer(vmID ids.ID, chainAlias string) (metrics
)
return chainReg, err
}

type invalidEngineAncestorsGetter struct{}

func (invalidEngineAncestorsGetter) GetAncestors(_ context.Context, _ ids.NodeID, _ uint32, _ ids.ID, engineType p2ppb.EngineType) error {
return fmt.Errorf("this engine does not run %s", engineType)
}
1 change: 1 addition & 0 deletions scripts/mocks.mockgen.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ github.com/ava-labs/avalanchego/snow/engine/snowman/block=BuildBlockWithContextC
github.com/ava-labs/avalanchego/snow/engine/snowman/block=ChainVM=snow/engine/snowman/block/blockmock/chain_vm.go
github.com/ava-labs/avalanchego/snow/engine/snowman/block=StateSyncableVM=snow/engine/snowman/block/blockmock/state_syncable_vm.go
github.com/ava-labs/avalanchego/snow/engine/snowman/block=WithVerifyContext=snow/engine/snowman/block/blockmock/with_verify_context.go
github.com/ava-labs/avalanchego/snow/engine/unified=Factory=snow/engine/unified/mocks/mock_factory.go
github.com/ava-labs/avalanchego/snow/networking/handler=Handler=snow/networking/handler/handlermock/handler.go
github.com/ava-labs/avalanchego/snow/networking/timeout=Manager=snow/networking/timeout/timeoutmock/manager.go
github.com/ava-labs/avalanchego/snow/networking/tracker=Targeter=snow/networking/tracker/trackermock/targeter.go
Expand Down
26 changes: 1 addition & 25 deletions snow/engine/avalanche/bootstrap/bootstrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/ava-labs/avalanchego/cache"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/proto/pb/p2p"
"github.com/ava-labs/avalanchego/snow"
"github.com/ava-labs/avalanchego/snow/choices"
"github.com/ava-labs/avalanchego/snow/consensus/avalanche"
Expand Down Expand Up @@ -43,7 +42,7 @@ const (
epsilon = 1e-6 // small amount to add to time to avoid division by 0
)

var _ common.BootstrapableEngine = (*Bootstrapper)(nil)
var _ common.AvalancheBootstrapableEngine = (*Bootstrapper)(nil)

func New(
config Config,
Expand All @@ -53,15 +52,6 @@ func New(
b := &Bootstrapper{
Config: config,

StateSummaryFrontierHandler: common.NewNoOpStateSummaryFrontierHandler(config.Ctx.Log),
AcceptedStateSummaryHandler: common.NewNoOpAcceptedStateSummaryHandler(config.Ctx.Log),
AcceptedFrontierHandler: common.NewNoOpAcceptedFrontierHandler(config.Ctx.Log),
AcceptedHandler: common.NewNoOpAcceptedHandler(config.Ctx.Log),
PutHandler: common.NewNoOpPutHandler(config.Ctx.Log),
QueryHandler: common.NewNoOpQueryHandler(config.Ctx.Log),
ChitsHandler: common.NewNoOpChitsHandler(config.Ctx.Log),
AppHandler: config.VM,

outstandingRequests: bimap.New[common.Request, ids.ID](),
outstandingRequestTimes: make(map[common.Request]time.Time),

Expand All @@ -77,16 +67,6 @@ type Bootstrapper struct {
Config
common.Halter

// list of NoOpsHandler for messages dropped by Bootstrapper
common.StateSummaryFrontierHandler
common.AcceptedStateSummaryHandler
common.AcceptedFrontierHandler
common.AcceptedHandler
common.PutHandler
common.QueryHandler
common.ChitsHandler
common.AppHandler

metrics

// tracks which validators were asked for which containers in which requests
Expand Down Expand Up @@ -320,10 +300,6 @@ func (*Bootstrapper) Notify(context.Context, common.Message) error {
func (b *Bootstrapper) Start(ctx context.Context, startReqID uint32) error {
b.Ctx.Log.Info("starting bootstrap")

b.Ctx.State.Set(snow.EngineState{
Type: p2p.EngineType_ENGINE_TYPE_AVALANCHE,
State: snow.Bootstrapping,
})
if err := b.VM.SetState(ctx, snow.Bootstrapping); err != nil {
return fmt.Errorf("failed to notify VM that bootstrapping has started: %w",
err)
Expand Down
1 change: 0 additions & 1 deletion snow/engine/avalanche/bootstrap/bootstrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,6 @@ func TestBootstrapperIncompleteAncestors(t *testing.T) {
require.Equal(vtxID1, requested)

require.NoError(bs.Ancestors(context.Background(), peerID, *reqIDPtr, [][]byte{vtxBytes1})) // Provide vtx1; should request vtx0
require.Equal(snow.Bootstrapping, bs.Context().State.Get().State)
require.Equal(vtxID0, requested)

manager.StopVertexAcceptedF = func(context.Context) (bool, error) {
Expand Down
Loading