Skip to content

Commit

Permalink
fix(node): fix node restart (#3797)
Browse files Browse the repository at this point in the history
  • Loading branch information
jimjbrettj authored Mar 28, 2024
1 parent 476115a commit 101de8f
Show file tree
Hide file tree
Showing 11 changed files with 192 additions and 22 deletions.
29 changes: 29 additions & 0 deletions dot/state/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ type BlockState struct {
unfinalisedBlocks *hashToBlockMap
tries *Tries

// State variables
pausedLock sync.RWMutex
pause chan struct{}

// block notifiers
imported map[chan *types.Block]struct{}
finalised map[chan *types.FinalisationInfo]struct{}
Expand All @@ -85,6 +89,7 @@ func NewBlockState(db database.Database, trs *Tries, telemetry Telemetry) (*Bloc
finalised: make(map[chan *types.FinalisationInfo]struct{}),
runtimeUpdateSubscriptions: make(map[uint32]chan<- runtime.Version),
telemetry: telemetry,
pause: make(chan struct{}),
}

gh, err := bs.db.Get(headerHashKey(0))
Expand Down Expand Up @@ -120,6 +125,7 @@ func NewBlockStateFromGenesis(db database.Database, trs *Tries, header *types.He
genesisHash: header.Hash(),
lastFinalised: header.Hash(),
telemetry: telemetryMailer,
pause: make(chan struct{}),
}

if err := bs.setArrivalTime(header.Hash(), time.Now()); err != nil {
Expand Down Expand Up @@ -153,6 +159,29 @@ func NewBlockStateFromGenesis(db database.Database, trs *Tries, header *types.He
return bs, nil
}

// Pause marks the block state as paused by closing the pause channel.
// The check and the close both happen while pausedLock is held, so the
// channel is closed at most once; calling Pause on an already paused
// block state is a no-op. The returned error is always nil.
func (bs *BlockState) Pause() error {
	bs.pausedLock.Lock()
	defer bs.pausedLock.Unlock()

	// Closing an already closed channel panics, so only close when the
	// pause signal has not been raised yet.
	if !bs.IsPaused() {
		close(bs.pause)
	}
	return nil
}

// IsPaused reports whether the pause channel has been closed, i.e. whether
// Pause has been called on this block state. It never blocks: when the
// channel is still open the default branch is taken and false is returned.
func (bs *BlockState) IsPaused() bool {
	paused := false
	select {
	case <-bs.pause:
		// A receive on a closed channel succeeds immediately.
		paused = true
	default:
	}
	return paused
}

// encodeBlockNumber encodes a block number as big endian uint64
func encodeBlockNumber(number uint64) []byte {
enc := make([]byte, 8) // encoding results in 8 bytes
Expand Down
4 changes: 4 additions & 0 deletions dot/state/block_finalisation.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ func (bs *BlockState) SetFinalisedHash(hash common.Hash, round, setID uint64) er
bs.lock.Lock()
defer bs.lock.Unlock()

if bs.IsPaused() {
return errors.New("blockstate service is paused")
}

has, err := bs.HasHeader(hash)
if err != nil {
return fmt.Errorf("could not check header for hash %s: %w", hash, err)
Expand Down
5 changes: 5 additions & 0 deletions dot/state/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ type Service struct {
BabeThresholdDenominator uint64
}

// Pause pauses the state service. It delegates to s.Block.Pause and
// returns whatever error that call reports.
func (s *Service) Pause() error {
return s.Block.Pause()
}

// Config is the default configuration used by state service.
type Config struct {
Path string
Expand Down
81 changes: 66 additions & 15 deletions dot/sync/chain_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,10 @@ func (cs *chainSync) bootstrapSync() {
cs.workerPool.useConnectedPeers()
err = cs.requestMaxBlocksFrom(currentBlock, networkInitialSync)
if err != nil {
if errors.Is(err, errBlockStatePaused) {
logger.Debugf("exiting bootstrap sync: %s", err)
return
}
logger.Errorf("requesting max blocks from best block header: %s", err)
}

Expand Down Expand Up @@ -411,8 +415,11 @@ func (cs *chainSync) requestChainBlocks(announcedHeader, bestBlockHeader *types.
}

resultsQueue := make(chan *syncTaskResult)
cs.workerPool.submitRequest(request, &peerWhoAnnounced, resultsQueue)
err := cs.handleWorkersResults(resultsQueue, networkBroadcast, startAtBlock, totalBlocks)
err := cs.submitRequest(request, &peerWhoAnnounced, resultsQueue)
if err != nil {
return err
}
err = cs.handleWorkersResults(resultsQueue, networkBroadcast, startAtBlock, totalBlocks)
if err != nil {
return fmt.Errorf("while handling workers results: %w", err)
}
Expand Down Expand Up @@ -449,8 +456,10 @@ func (cs *chainSync) requestForkBlocks(bestBlockHeader, highestFinalizedHeader,
gapLength, peerWhoAnnounced, announcedHeader.Number, announcedHash.Short())

resultsQueue := make(chan *syncTaskResult)
cs.workerPool.submitRequest(request, &peerWhoAnnounced, resultsQueue)

err = cs.submitRequest(request, &peerWhoAnnounced, resultsQueue)
if err != nil {
return err
}
err = cs.handleWorkersResults(resultsQueue, networkBroadcast, startAtBlock, gapLength)
if err != nil {
return fmt.Errorf("while handling workers results: %w", err)
Expand Down Expand Up @@ -499,8 +508,10 @@ func (cs *chainSync) requestPendingBlocks(highestFinalizedHeader *types.Header)
// the `requests` in the tip sync are not related necessarily
// this is why we need to treat them separately
resultsQueue := make(chan *syncTaskResult)
cs.workerPool.submitRequest(descendingGapRequest, nil, resultsQueue)

err = cs.submitRequest(descendingGapRequest, nil, resultsQueue)
if err != nil {
return err
}
// TODO: we should handle the requests concurrently
// one way to achieve that is by constructing a new `handleWorkersResults` for
// handling only tip sync requests
Expand Down Expand Up @@ -536,15 +547,38 @@ func (cs *chainSync) requestMaxBlocksFrom(bestBlockHeader *types.Header, origin
}
}

resultsQueue := cs.workerPool.submitRequests(requests)
err := cs.handleWorkersResults(resultsQueue, origin, startRequestAt, expectedAmountOfBlocks)
resultsQueue, err := cs.submitRequests(requests)
if err != nil {
return err
}
err = cs.handleWorkersResults(resultsQueue, origin, startRequestAt, expectedAmountOfBlocks)
if err != nil {
return fmt.Errorf("while handling workers results: %w", err)
}

return nil
}

// submitRequest forwards a single block request to the worker pool unless
// the block state is paused. When paused, nothing is submitted and an error
// wrapping errBlockStatePaused is returned so callers can detect it with
// errors.Is.
func (cs *chainSync) submitRequest(
	request *network.BlockRequestMessage,
	who *peer.ID,
	resultCh chan<- *syncTaskResult,
) error {
	if cs.blockState.IsPaused() {
		return fmt.Errorf("submitting request: %w", errBlockStatePaused)
	}

	cs.workerPool.submitRequest(request, who, resultCh)
	return nil
}

// submitRequests forwards a batch of block requests to the worker pool and
// returns the channel the pool hands back. When the block state is paused,
// nothing is submitted and a nil channel is returned together with an error
// wrapping errBlockStatePaused.
func (cs *chainSync) submitRequests(requests []*network.BlockRequestMessage) (
	resultCh chan *syncTaskResult, err error) {
	if cs.blockState.IsPaused() {
		return nil, fmt.Errorf("submitting requests: %w", errBlockStatePaused)
	}

	return cs.workerPool.submitRequests(requests), nil
}

func (cs *chainSync) showSyncStats(syncBegin time.Time, syncedBlocks int) {
finalisedHeader, err := cs.blockState.GetHighestFinalisedHeader()
if err != nil {
Expand Down Expand Up @@ -581,7 +615,6 @@ func (cs *chainSync) showSyncStats(syncBegin time.Time, syncedBlocks int) {
func (cs *chainSync) handleWorkersResults(
workersResults chan *syncTaskResult, origin blockOrigin, startAtBlock uint, expectedSyncedBlocks uint32) error {
startTime := time.Now()

syncingChain := make([]*types.BlockData, expectedSyncedBlocks)
// the total numbers of blocks is missing in the syncing chain
waitingBlocks := expectedSyncedBlocks
Expand Down Expand Up @@ -627,7 +660,10 @@ taskResultLoop:
}

// TODO: avoid the same peer to get the same task
cs.workerPool.submitRequest(request, nil, workersResults)
err := cs.submitRequest(request, nil, workersResults)
if err != nil {
return err
}
continue
}

Expand All @@ -648,22 +684,31 @@ taskResultLoop:
}, who)
}

cs.workerPool.submitRequest(taskResult.request, nil, workersResults)
err = cs.submitRequest(taskResult.request, nil, workersResults)
if err != nil {
return err
}
continue taskResultLoop
}

isChain := isResponseAChain(response.BlockData)
if !isChain {
logger.Criticalf("response from %s is not a chain", who)
cs.workerPool.submitRequest(taskResult.request, nil, workersResults)
err = cs.submitRequest(taskResult.request, nil, workersResults)
if err != nil {
return err
}
continue taskResultLoop
}

grows := doResponseGrowsTheChain(response.BlockData, syncingChain,
startAtBlock, expectedSyncedBlocks)
if !grows {
logger.Criticalf("response from %s does not grows the ongoing chain", who)
cs.workerPool.submitRequest(taskResult.request, nil, workersResults)
err = cs.submitRequest(taskResult.request, nil, workersResults)
if err != nil {
return err
}
continue taskResultLoop
}

Expand All @@ -678,7 +723,10 @@ taskResultLoop:
}, who)

cs.workerPool.ignorePeerAsWorker(taskResult.who)
cs.workerPool.submitRequest(taskResult.request, nil, workersResults)
err = cs.submitRequest(taskResult.request, nil, workersResults)
if err != nil {
return err
}
continue taskResultLoop
}

Expand Down Expand Up @@ -708,7 +756,10 @@ taskResultLoop:
Direction: network.Ascending,
Max: &difference,
}
cs.workerPool.submitRequest(taskResult.request, nil, workersResults)
err = cs.submitRequest(taskResult.request, nil, workersResults)
if err != nil {
return err
}
continue taskResultLoop
}
}
Expand Down
12 changes: 12 additions & 0 deletions dot/sync/chain_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func Test_chainSync_onBlockAnnounce(t *testing.T) {
blockStateMock.EXPECT().
HasHeader(block2AnnounceHeader.Hash()).
Return(false, nil)
blockStateMock.EXPECT().IsPaused().Return(false)

blockStateMock.EXPECT().
BestBlockHeader().
Expand Down Expand Up @@ -272,6 +273,8 @@ func Test_chainSync_onBlockAnnounceHandshake_tipModeNeedToCatchup(t *testing.T)
Return(block1AnnounceHeader, nil).
Times(3)

blockStateMock.EXPECT().IsPaused().Return(false).Times(2)

expectedRequest := network.NewAscendingBlockRequests(
block1AnnounceHeader.Number+1,
block2AnnounceHeader.Number, network.BootstrapRequestData)
Expand Down Expand Up @@ -532,6 +535,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithOneWorker(t *testing.T) {

mockedBlockState := NewMockBlockState(ctrl)
mockedBlockState.EXPECT().GetFinalisedNotifierChannel().Return(make(chan *types.FinalisationInfo))
mockedBlockState.EXPECT().IsPaused().Return(false)

mockBabeVerifier := NewMockBabeVerifier(ctrl)
mockStorageState := NewMockStorageState(ctrl)
Expand Down Expand Up @@ -588,6 +592,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithTwoWorkers(t *testing.T) {
mockTelemetry := NewMockTelemetry(ctrl)

mockBlockState.EXPECT().GetHighestFinalisedHeader().Return(types.NewEmptyHeader(), nil).Times(1)
mockBlockState.EXPECT().IsPaused().Return(false)
mockNetwork.EXPECT().Peers().Return([]common.PeerInfo{}).Times(1)

// this test expects two workers responding each request with 128 blocks which means
Expand Down Expand Up @@ -664,6 +669,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithOneWorkerFailing(t *testing.
ctrl := gomock.NewController(t)
mockBlockState := NewMockBlockState(ctrl)
mockBlockState.EXPECT().GetFinalisedNotifierChannel().Return(make(chan *types.FinalisationInfo))
mockBlockState.EXPECT().IsPaused().Return(false).Times(2)
mockedGenesisHeader := types.NewHeader(common.NewHash([]byte{0}), trie.EmptyHash,
trie.EmptyHash, 0, types.NewDigest())

Expand All @@ -676,6 +682,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithOneWorkerFailing(t *testing.
mockTelemetry := NewMockTelemetry(ctrl)

mockBlockState.EXPECT().GetHighestFinalisedHeader().Return(types.NewEmptyHeader(), nil).Times(1)

mockNetwork.EXPECT().Peers().Return([]common.PeerInfo{}).Times(1)

// this test expects two workers responding each request with 128 blocks which means
Expand Down Expand Up @@ -760,6 +767,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithProtocolNotSupported(t *test
ctrl := gomock.NewController(t)
mockBlockState := NewMockBlockState(ctrl)
mockBlockState.EXPECT().GetFinalisedNotifierChannel().Return(make(chan *types.FinalisationInfo))
mockBlockState.EXPECT().IsPaused().Return(false).Times(2)
mockBlockState.EXPECT().
GetHighestFinalisedHeader().
Return(types.NewEmptyHeader(), nil).
Expand Down Expand Up @@ -864,6 +872,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithNilHeaderInResponse(t *testi
ctrl := gomock.NewController(t)
mockBlockState := NewMockBlockState(ctrl)
mockBlockState.EXPECT().GetFinalisedNotifierChannel().Return(make(chan *types.FinalisationInfo))
mockBlockState.EXPECT().IsPaused().Return(false).Times(2)
mockBlockState.EXPECT().
GetHighestFinalisedHeader().
Return(types.NewEmptyHeader(), nil).
Expand Down Expand Up @@ -970,6 +979,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithResponseIsNotAChain(t *testi
ctrl := gomock.NewController(t)
mockBlockState := NewMockBlockState(ctrl)
mockBlockState.EXPECT().GetFinalisedNotifierChannel().Return(make(chan *types.FinalisationInfo))
mockBlockState.EXPECT().IsPaused().Return(false).Times(2)
mockBlockState.EXPECT().
GetHighestFinalisedHeader().
Return(types.NewEmptyHeader(), nil).
Expand Down Expand Up @@ -1073,6 +1083,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithReceivedBadBlock(t *testing.
ctrl := gomock.NewController(t)
mockBlockState := NewMockBlockState(ctrl)
mockBlockState.EXPECT().GetFinalisedNotifierChannel().Return(make(chan *types.FinalisationInfo))
mockBlockState.EXPECT().IsPaused().Return(false).Times(2)
mockBlockState.EXPECT().
GetHighestFinalisedHeader().
Return(types.NewEmptyHeader(), nil).
Expand Down Expand Up @@ -1197,6 +1208,7 @@ func TestChainSync_BootstrapSync_SucessfulSync_ReceivedPartialBlockData(t *testi
ctrl := gomock.NewController(t)
mockBlockState := NewMockBlockState(ctrl)
mockBlockState.EXPECT().GetFinalisedNotifierChannel().Return(make(chan *types.FinalisationInfo))
mockBlockState.EXPECT().IsPaused().Return(false).Times(2)
mockBlockState.EXPECT().
GetHighestFinalisedHeader().
Return(types.NewEmptyHeader(), nil).
Expand Down
6 changes: 1 addition & 5 deletions dot/sync/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,7 @@ import (
)

var (
// ErrServiceStopped is returned when the service has been stopped
ErrServiceStopped = errors.New("service has been stopped")

// ErrInvalidBlock is returned when a block cannot be verified
ErrInvalidBlock = errors.New("could not verify block")
errBlockStatePaused = errors.New("blockstate service has been paused")

// ErrInvalidBlockRequest is returned when an invalid block request is received
ErrInvalidBlockRequest = errors.New("invalid block request")
Expand Down
3 changes: 3 additions & 0 deletions dot/sync/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ type BlockState interface {
GetHeaderByNumber(num uint) (*types.Header, error)
GetAllBlocksAtNumber(num uint) ([]common.Hash, error)
IsDescendantOf(parent, child common.Hash) (bool, error)

IsPaused() bool
Pause() error
}

// StorageState is the interface for the storage state
Expand Down
28 changes: 28 additions & 0 deletions dot/sync/mocks_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 101de8f

Please sign in to comment.